001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.nio; 018 019import java.io.IOException; 020import java.nio.channels.SelectionKey; 021import java.nio.channels.Selector; 022import java.util.Iterator; 023import java.util.Set; 024import java.util.concurrent.ConcurrentLinkedQueue; 025import java.util.concurrent.atomic.AtomicInteger; 026 027import org.slf4j.Logger; 028import org.slf4j.LoggerFactory; 029 030public class SelectorWorker implements Runnable { 031 032 private static final Logger LOG = LoggerFactory.getLogger(SelectorWorker.class); 033 034 private static final AtomicInteger NEXT_ID = new AtomicInteger(); 035 036 final SelectorManager manager; 037 final Selector selector; 038 final int id = NEXT_ID.getAndIncrement(); 039 private final int maxChannelsPerWorker; 040 041 final AtomicInteger retainCounter = new AtomicInteger(1); 042 private final ConcurrentLinkedQueue<Runnable> ioTasks = new ConcurrentLinkedQueue<Runnable>(); 043 044 public SelectorWorker(SelectorManager manager) throws IOException { 045 this.manager = manager; 046 selector = Selector.open(); 047 maxChannelsPerWorker = manager.getMaxChannelsPerWorker(); 048 manager.getSelectorExecutor().execute(this); 049 } 050 051 void retain() { 052 if (retainCounter.incrementAndGet() == maxChannelsPerWorker) { 053 manager.onWorkerFullEvent(this); 054 } 055 } 056 057 void release() { 058 int use = retainCounter.decrementAndGet(); 059 if (use == 0) { 060 manager.onWorkerEmptyEvent(this); 061 } else if (use == maxChannelsPerWorker - 1) { 062 manager.onWorkerNotFullEvent(this); 063 } 064 } 065 066 boolean isReleased() { 067 return retainCounter.get() == 0; 068 } 069 070 public void addIoTask(Runnable work) { 071 ioTasks.add(work); 072 selector.wakeup(); 073 } 074 075 private void processIoTasks() { 076 Runnable task; 077 while ((task = ioTasks.poll()) != null) { 078 try { 079 task.run(); 080 } catch (Throwable e) { 081 LOG.debug(e.getMessage(), e); 082 } 083 } 084 } 085 086 @Override 087 public void run() { 088 089 String origName = Thread.currentThread().getName(); 090 try { 091 Thread.currentThread().setName("Selector Worker: " + id); 092 while (!isReleased()) { 093 094 processIoTasks(); 095 096 int count = selector.select(10); 097 098 if (count == 0) { 099 continue; 100 } 101 102 // Get a java.util.Set containing the SelectionKey objects 103 // for all channels that are ready for I/O. 104 Set<SelectionKey> keys = selector.selectedKeys(); 105 106 for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext();) { 107 final SelectionKey key = i.next(); 108 i.remove(); 109 110 final SelectorSelection s = (SelectorSelection) key.attachment(); 111 try { 112 if (key.isValid()) { 113 key.interestOps(0); 114 } 115 116 // Kick off another thread to find newly selected keys 117 // while we process the 118 // currently selected keys 119 manager.getChannelExecutor().execute(new Runnable() { 120 @Override 121 public void run() { 122 try { 123 s.onSelect(); 124 s.enable(); 125 } catch (Throwable e) { 126 s.onError(e); 127 } 128 } 129 }); 130 131 } catch (Throwable e) { 132 s.onError(e); 133 } 134 } 135 } 136 } catch (Throwable e) { 137 // Notify all the selections that the error occurred. 138 Set<SelectionKey> keys = selector.keys(); 139 for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext();) { 140 SelectionKey key = i.next(); 141 SelectorSelection s = (SelectorSelection) key.attachment(); 142 s.onError(e); 143 } 144 } finally { 145 try { 146 manager.onWorkerEmptyEvent(this); 147 selector.close(); 148 } catch (IOException ignore) { 149 LOG.debug(ignore.getMessage(), ignore); 150 } 151 Thread.currentThread().setName(origName); 152 } 153 } 154}