001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.nio;
018
019import java.io.IOException;
020import java.nio.channels.SelectionKey;
021import java.nio.channels.Selector;
022import java.util.Iterator;
023import java.util.Set;
024import java.util.concurrent.ConcurrentLinkedQueue;
025import java.util.concurrent.atomic.AtomicInteger;
026
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030public class SelectorWorker implements Runnable {
031
032    private static final Logger LOG = LoggerFactory.getLogger(SelectorWorker.class);
033
034    private static final AtomicInteger NEXT_ID = new AtomicInteger();
035
036    final SelectorManager manager;
037    final Selector selector;
038    final int id = NEXT_ID.getAndIncrement();
039    private final int maxChannelsPerWorker;
040
041    final AtomicInteger retainCounter = new AtomicInteger(1);
042    private final ConcurrentLinkedQueue<Runnable> ioTasks = new ConcurrentLinkedQueue<Runnable>();
043
044    public SelectorWorker(SelectorManager manager) throws IOException {
045        this.manager = manager;
046        selector = Selector.open();
047        maxChannelsPerWorker = manager.getMaxChannelsPerWorker();
048        manager.getSelectorExecutor().execute(this);
049    }
050
051    void retain() {
052        if (retainCounter.incrementAndGet() == maxChannelsPerWorker) {
053            manager.onWorkerFullEvent(this);
054        }
055    }
056
057    void release() {
058        int use = retainCounter.decrementAndGet();
059        if (use == 0) {
060            manager.onWorkerEmptyEvent(this);
061        } else if (use == maxChannelsPerWorker - 1) {
062            manager.onWorkerNotFullEvent(this);
063        }
064    }
065
066    boolean isReleased() {
067        return retainCounter.get() == 0;
068    }
069
070    public void addIoTask(Runnable work) {
071        ioTasks.add(work);
072        selector.wakeup();
073    }
074
075    private void processIoTasks() {
076        Runnable task;
077        while ((task = ioTasks.poll()) != null) {
078            try {
079                task.run();
080            } catch (Throwable e) {
081                LOG.debug(e.getMessage(), e);
082            }
083        }
084    }
085
086    @Override
087    public void run() {
088
089        String origName = Thread.currentThread().getName();
090        try {
091            Thread.currentThread().setName("Selector Worker: " + id);
092            while (!isReleased()) {
093
094                processIoTasks();
095
096                int count = selector.select(10);
097
098                if (count == 0) {
099                    continue;
100                }
101
102                // Get a java.util.Set containing the SelectionKey objects
103                // for all channels that are ready for I/O.
104                Set<SelectionKey> keys = selector.selectedKeys();
105
106                for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext();) {
107                    final SelectionKey key = i.next();
108                    i.remove();
109
110                    final SelectorSelection s = (SelectorSelection) key.attachment();
111                    try {
112                        if (key.isValid()) {
113                            key.interestOps(0);
114                        }
115
116                        // Kick off another thread to find newly selected keys
117                        // while we process the
118                        // currently selected keys
119                        manager.getChannelExecutor().execute(new Runnable() {
120                            @Override
121                            public void run() {
122                                try {
123                                    s.onSelect();
124                                    s.enable();
125                                } catch (Throwable e) {
126                                    s.onError(e);
127                                }
128                            }
129                        });
130
131                    } catch (Throwable e) {
132                        s.onError(e);
133                    }
134                }
135            }
136        } catch (Throwable e) {
137            // Notify all the selections that the error occurred.
138            Set<SelectionKey> keys = selector.keys();
139            for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext();) {
140                SelectionKey key = i.next();
141                SelectorSelection s = (SelectorSelection) key.attachment();
142                s.onError(e);
143            }
144        } finally {
145            try {
146                manager.onWorkerEmptyEvent(this);
147                selector.close();
148            } catch (IOException ignore) {
149                LOG.debug(ignore.getMessage(), ignore);
150            }
151            Thread.currentThread().setName(origName);
152        }
153    }
154}