001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.nio;
018
import java.io.IOException;
import java.nio.channels.spi.AbstractSelectableChannel;
import java.util.LinkedList;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
023
024/**
025 * The SelectorManager will manage one Selector and the thread that checks the
026 * selector.
027 *
028 * We may need to consider running more than one thread to check the selector if
029 * servicing the selector takes too long.
030 */
031public final class SelectorManager {
032
033    public static final SelectorManager SINGLETON = new SelectorManager();
034
035    private Executor selectorExecutor = createDefaultExecutor();
036    private Executor channelExecutor = selectorExecutor;
037    private final LinkedList<SelectorWorker> freeWorkers = new LinkedList<SelectorWorker>();
038    private int maxChannelsPerWorker = 1024;
039
040    protected ExecutorService createDefaultExecutor() {
041        ThreadPoolExecutor rc = new ThreadPoolExecutor(getDefaultCorePoolSize(), getDefaultMaximumPoolSize(), getDefaultKeepAliveTime(), TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
042            new ThreadFactory() {
043
044                private long i = 0;
045
046                @Override
047                public Thread newThread(Runnable runnable) {
048                    this.i++;
049                    final Thread t = new Thread(runnable, "ActiveMQ NIO Worker " + this.i);
050                    return t;
051                }
052            });
053
054        return rc;
055    }
056
057    private static int getDefaultCorePoolSize() {
058            return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.corePoolSize", 10);
059    }
060
061    private static int getDefaultMaximumPoolSize() {
062        return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize", 1024);
063    }
064
065    private static int getDefaultKeepAliveTime() {
066        return Integer.getInteger("org.apache.activemq.transport.nio.SelectorManager.keepAliveTime", 30);
067    }
068
069    public static SelectorManager getInstance() {
070        return SINGLETON;
071    }
072
073    public interface Listener {
074        void onSelect(SelectorSelection selector);
075
076        void onError(SelectorSelection selection, Throwable error);
077    }
078
079    public synchronized SelectorSelection register(AbstractSelectableChannel selectableChannel, Listener listener) throws IOException {
080        SelectorSelection selection = null;
081        while (selection == null) {
082            if (freeWorkers.size() > 0) {
083                SelectorWorker worker = freeWorkers.getFirst();
084                if (worker.isReleased()) {
085                    freeWorkers.remove(worker);
086                } else {
087                    worker.retain();
088                    selection = new SelectorSelection(worker, selectableChannel, listener);
089                }
090            } else {
091                // Worker starts /w retain count of 1
092                SelectorWorker worker = new SelectorWorker(this);
093                freeWorkers.addFirst(worker);
094                selection = new SelectorSelection(worker, selectableChannel, listener);
095            }
096        }
097
098        return selection;
099    }
100
101    synchronized void onWorkerFullEvent(SelectorWorker worker) {
102        freeWorkers.remove(worker);
103    }
104
105    public synchronized void onWorkerEmptyEvent(SelectorWorker worker) {
106        freeWorkers.remove(worker);
107    }
108
109    public synchronized void onWorkerNotFullEvent(SelectorWorker worker) {
110        freeWorkers.addFirst(worker);
111    }
112
113    public Executor getChannelExecutor() {
114        return channelExecutor;
115    }
116
117    public void setChannelExecutor(Executor channelExecutor) {
118        this.channelExecutor = channelExecutor;
119    }
120
121    public int getMaxChannelsPerWorker() {
122        return maxChannelsPerWorker;
123    }
124
125    public void setMaxChannelsPerWorker(int maxChannelsPerWorker) {
126        this.maxChannelsPerWorker = maxChannelsPerWorker;
127    }
128
129    public Executor getSelectorExecutor() {
130        return selectorExecutor;
131    }
132
133    public void setSelectorExecutor(Executor selectorExecutor) {
134        this.selectorExecutor = selectorExecutor;
135    }
136}