001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.nio;
018
019import java.io.IOException;
020import java.nio.channels.SelectionKey;
021import java.nio.channels.Selector;
022import java.util.Iterator;
023import java.util.Set;
024import java.util.concurrent.ConcurrentLinkedQueue;
025import java.util.concurrent.atomic.AtomicInteger;
026
027public class SelectorWorker implements Runnable {
028
029    private static final AtomicInteger NEXT_ID = new AtomicInteger();
030
031    final SelectorManager manager;
032    final Selector selector;
033    final int id = NEXT_ID.getAndIncrement();
034    private final int maxChannelsPerWorker;
035
036    final AtomicInteger retainCounter = new AtomicInteger(1);
037    private final ConcurrentLinkedQueue<Runnable> ioTasks = new ConcurrentLinkedQueue<Runnable>();
038       
039    public SelectorWorker(SelectorManager manager) throws IOException {
040        this.manager = manager;
041        selector = Selector.open();
042        maxChannelsPerWorker = manager.getMaxChannelsPerWorker();
043        manager.getSelectorExecutor().execute(this);
044    }
045
046    void retain() {
047        if (retainCounter.incrementAndGet() == maxChannelsPerWorker) {
048            manager.onWorkerFullEvent(this);
049        }
050    }
051
052    void release() {
053        int use = retainCounter.decrementAndGet();
054        if (use == 0) {
055            manager.onWorkerEmptyEvent(this);
056        } else if (use == maxChannelsPerWorker - 1) {
057            manager.onWorkerNotFullEvent(this);
058        }
059    }
060    
061    boolean isReleased() {
062        return retainCounter.get()==0;
063    }
064
065
066    public void addIoTask(Runnable work) {
067        ioTasks.add(work);
068        selector.wakeup();
069    }
070    
071    private void processIoTasks() {
072        Runnable task; 
073        while( (task= ioTasks.poll()) !=null ) {
074            try {
075                task.run();
076            } catch (Throwable e) {
077                e.printStackTrace();
078            }
079        }
080    }
081
082    
083
084    public void run() {
085
086        String origName = Thread.currentThread().getName();
087        try {
088            Thread.currentThread().setName("Selector Worker: " + id);
089            while (!isReleased()) {
090                
091                processIoTasks();
092                
093                int count = selector.select(10);
094                
095                if (count == 0) {
096                    continue;
097                }
098
099                // Get a java.util.Set containing the SelectionKey objects
100                // for all channels that are ready for I/O.
101                Set keys = selector.selectedKeys();
102
103                for (Iterator i = keys.iterator(); i.hasNext();) {
104                    final SelectionKey key = (SelectionKey)i.next();
105                    i.remove();
106
107                    final SelectorSelection s = (SelectorSelection)key.attachment();
108                    try {
109                        if( key.isValid() ) {
110                            key.interestOps(0);
111                        }
112
113                        // Kick off another thread to find newly selected keys
114                        // while we process the
115                        // currently selected keys
116                        manager.getChannelExecutor().execute(new Runnable() {
117                            public void run() {
118                                try {
119                                    s.onSelect();
120                                    s.enable();
121                                } catch (Throwable e) {
122                                    s.onError(e);
123                                }
124                            }
125                        });
126
127                    } catch (Throwable e) {
128                        s.onError(e);
129                    }
130
131                }
132
133            }
134        } catch (Throwable e) {                 
135            e.printStackTrace();
136            // Notify all the selections that the error occurred.
137            Set keys = selector.keys();
138            for (Iterator i = keys.iterator(); i.hasNext();) {
139                SelectionKey key = (SelectionKey)i.next();
140                SelectorSelection s = (SelectorSelection)key.attachment();
141                s.onError(e);
142            }
143        } finally {
144            try {
145                manager.onWorkerEmptyEvent(this);
146                selector.close();
147            } catch (IOException ignore) {
148                ignore.printStackTrace();
149            }
150            Thread.currentThread().setName(origName);
151        }
152    }
153
154}