001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.nio; 018 019import java.io.IOException; 020import java.nio.channels.spi.AbstractSelectableChannel; 021import java.util.LinkedList; 022import java.util.concurrent.BlockingQueue; 023import java.util.concurrent.Executor; 024import java.util.concurrent.ExecutorService; 025import java.util.concurrent.LinkedBlockingQueue; 026import java.util.concurrent.RejectedExecutionHandler; 027import java.util.concurrent.SynchronousQueue; 028import java.util.concurrent.ThreadFactory; 029import java.util.concurrent.ThreadPoolExecutor; 030import java.util.concurrent.TimeUnit; 031 032/** 033 * The SelectorManager will manage one Selector and the thread that checks the 034 * selector. 035 * 036 * We may need to consider running more than one thread to check the selector if 037 * servicing the selector takes too long. 038 */ 039public final class SelectorManager { 040 041 public static final SelectorManager SINGLETON = new SelectorManager(); 042 043 private Executor selectorExecutor = createDefaultExecutor(); 044 private Executor channelExecutor = selectorExecutor; 045 private final LinkedList<SelectorWorker> freeWorkers = new LinkedList<SelectorWorker>(); 046 private int maxChannelsPerWorker = -1; 047 048 protected ExecutorService createDefaultExecutor() { 049 ThreadPoolExecutor rc = new ThreadPoolExecutor(getDefaultCorePoolSize(), getDefaultMaximumPoolSize(), getDefaultKeepAliveTime(), TimeUnit.SECONDS, newWorkQueue(), 050 new ThreadFactory() { 051 052 private long i = 0; 053 054 @Override 055 public Thread newThread(Runnable runnable) { 056 Thread t = new Thread(runnable, "ActiveMQ NIO Worker " + (i++)); 057 t.setDaemon(true); 058 return t; 059 } 060 061 }, newRejectionHandler()); 062 return rc; 063 } 064 065 private RejectedExecutionHandler newRejectionHandler() { 066 return canRejectWork() ? new ThreadPoolExecutor.AbortPolicy() : new ThreadPoolExecutor.CallerRunsPolicy(); 067 } 068 069 private BlockingQueue<Runnable> newWorkQueue() { 070 final int workQueueCapicity = getDefaultWorkQueueCapacity(); 071 return workQueueCapicity > 0 ? new LinkedBlockingQueue<Runnable>(workQueueCapicity) : new SynchronousQueue<Runnable>(); 072 } 073 074 private static boolean canRejectWork() { 075 return Boolean.getBoolean("org.apache.activemq.transport.nio.SelectorManager.rejectWork"); 076 } 077 078 private static int getDefaultWorkQueueCapacity() { 079 return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.workQueueCapacity", 0); 080 } 081 082 private static int getDefaultCorePoolSize() { 083 return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.corePoolSize", 10); 084 } 085 086 private static int getDefaultMaximumPoolSize() { 087 return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize", 1024); 088 } 089 090 private static int getDefaultKeepAliveTime() { 091 return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.keepAliveTime", 30); 092 } 093 094 private static int getDefaultMaxChannelsPerWorker() { 095 return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.maxChannelsPerWorker", 1024); 096 } 097 098 public static SelectorManager getInstance() { 099 return SINGLETON; 100 } 101 102 public interface Listener { 103 void onSelect(SelectorSelection selector); 104 105 void onError(SelectorSelection selection, Throwable error); 106 } 107 108 public synchronized SelectorSelection register(AbstractSelectableChannel selectableChannel, Listener listener) throws IOException { 109 SelectorSelection selection = null; 110 while (selection == null) { 111 if (freeWorkers.size() > 0) { 112 SelectorWorker worker = freeWorkers.getFirst(); 113 if (worker.isReleased()) { 114 freeWorkers.remove(worker); 115 } else { 116 worker.retain(); 117 selection = new SelectorSelection(worker, selectableChannel, listener); 118 } 119 } else { 120 // Worker starts /w retain count of 1 121 SelectorWorker worker = new SelectorWorker(this); 122 freeWorkers.addFirst(worker); 123 selection = new SelectorSelection(worker, selectableChannel, listener); 124 } 125 } 126 127 return selection; 128 } 129 130 synchronized void onWorkerFullEvent(SelectorWorker worker) { 131 freeWorkers.remove(worker); 132 } 133 134 public synchronized void onWorkerEmptyEvent(SelectorWorker worker) { 135 freeWorkers.remove(worker); 136 } 137 138 public synchronized void onWorkerNotFullEvent(SelectorWorker worker) { 139 freeWorkers.addFirst(worker); 140 } 141 142 public Executor getChannelExecutor() { 143 return channelExecutor; 144 } 145 146 public void setChannelExecutor(Executor channelExecutor) { 147 this.channelExecutor = channelExecutor; 148 } 149 150 public int getMaxChannelsPerWorker() { 151 return maxChannelsPerWorker >= 0 ? maxChannelsPerWorker : getDefaultMaxChannelsPerWorker(); 152 } 153 154 public void setMaxChannelsPerWorker(int maxChannelsPerWorker) { 155 this.maxChannelsPerWorker = maxChannelsPerWorker; 156 } 157 158 public Executor getSelectorExecutor() { 159 return selectorExecutor; 160 } 161 162 public void setSelectorExecutor(Executor selectorExecutor) { 163 this.selectorExecutor = selectorExecutor; 164 } 165}