001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.nio; 018 019import java.io.IOException; 020import java.nio.channels.SelectionKey; 021import java.nio.channels.Selector; 022import java.util.Iterator; 023import java.util.Set; 024import java.util.concurrent.ConcurrentLinkedQueue; 025import java.util.concurrent.atomic.AtomicInteger; 026 027public class SelectorWorker implements Runnable { 028 029 private static final AtomicInteger NEXT_ID = new AtomicInteger(); 030 031 final SelectorManager manager; 032 final Selector selector; 033 final int id = NEXT_ID.getAndIncrement(); 034 private final int maxChannelsPerWorker; 035 036 final AtomicInteger retainCounter = new AtomicInteger(1); 037 private final ConcurrentLinkedQueue<Runnable> ioTasks = new ConcurrentLinkedQueue<Runnable>(); 038 039 public SelectorWorker(SelectorManager manager) throws IOException { 040 this.manager = manager; 041 selector = Selector.open(); 042 maxChannelsPerWorker = manager.getMaxChannelsPerWorker(); 043 manager.getSelectorExecutor().execute(this); 044 } 045 046 void retain() { 047 if (retainCounter.incrementAndGet() == maxChannelsPerWorker) { 048 manager.onWorkerFullEvent(this); 049 } 050 } 051 052 void release() { 053 int use = retainCounter.decrementAndGet(); 054 if (use == 0) { 055 manager.onWorkerEmptyEvent(this); 056 } else if (use == maxChannelsPerWorker - 1) { 057 manager.onWorkerNotFullEvent(this); 058 } 059 } 060 061 boolean isReleased() { 062 return retainCounter.get()==0; 063 } 064 065 066 public void addIoTask(Runnable work) { 067 ioTasks.add(work); 068 selector.wakeup(); 069 } 070 071 private void processIoTasks() { 072 Runnable task; 073 while( (task= ioTasks.poll()) !=null ) { 074 try { 075 task.run(); 076 } catch (Throwable e) { 077 e.printStackTrace(); 078 } 079 } 080 } 081 082 083 084 public void run() { 085 086 String origName = Thread.currentThread().getName(); 087 try { 088 Thread.currentThread().setName("Selector Worker: " + id); 089 while (!isReleased()) { 090 091 processIoTasks(); 092 093 int count = selector.select(10); 094 095 if (count == 0) { 096 continue; 097 } 098 099 // Get a java.util.Set containing the SelectionKey objects 100 // for all channels that are ready for I/O. 101 Set keys = selector.selectedKeys(); 102 103 for (Iterator i = keys.iterator(); i.hasNext();) { 104 final SelectionKey key = (SelectionKey)i.next(); 105 i.remove(); 106 107 final SelectorSelection s = (SelectorSelection)key.attachment(); 108 try { 109 if( key.isValid() ) { 110 key.interestOps(0); 111 } 112 113 // Kick off another thread to find newly selected keys 114 // while we process the 115 // currently selected keys 116 manager.getChannelExecutor().execute(new Runnable() { 117 public void run() { 118 try { 119 s.onSelect(); 120 s.enable(); 121 } catch (Throwable e) { 122 s.onError(e); 123 } 124 } 125 }); 126 127 } catch (Throwable e) { 128 s.onError(e); 129 } 130 131 } 132 133 } 134 } catch (Throwable e) { 135 e.printStackTrace(); 136 // Notify all the selections that the error occurred. 137 Set keys = selector.keys(); 138 for (Iterator i = keys.iterator(); i.hasNext();) { 139 SelectionKey key = (SelectionKey)i.next(); 140 SelectorSelection s = (SelectorSelection)key.attachment(); 141 s.onError(e); 142 } 143 } finally { 144 try { 145 manager.onWorkerEmptyEvent(this); 146 selector.close(); 147 } catch (IOException ignore) { 148 ignore.printStackTrace(); 149 } 150 Thread.currentThread().setName(origName); 151 } 152 } 153 154}