001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.util.concurrent;
018
019import java.util.concurrent.Callable;
020import java.util.concurrent.CompletionService;
021import java.util.concurrent.DelayQueue;
022import java.util.concurrent.Delayed;
023import java.util.concurrent.Executor;
024import java.util.concurrent.Future;
025import java.util.concurrent.FutureTask;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.atomic.AtomicInteger;
028
/**
 * A {@link java.util.concurrent.CompletionService} that hands back completed tasks
 * in the same order as they were submitted, regardless of the order in which they finish.
 * <p>
 * Every submitted task gets an incrementing id. Completed tasks are parked in a
 * {@link DelayQueue}; a task only becomes available once the consumer side
 * ({@link #take()}, {@link #poll()}, {@link #poll(long, TimeUnit)} or {@link #timeoutTask()})
 * has advanced the internal index up to the id of that task.
 * <p>
 * NOTE(review): the index is advanced and the queue is consulted in two separate steps,
 * so the ordering presumably relies on a single consumer thread - confirm against callers.
 */
public class SubmitOrderedCompletionService<V> implements CompletionService<V> {

    // Delay (in nanoseconds) reported for a completed task whose turn has not come yet.
    // The real waiting time is unknown, so this only controls how often a blocked consumer re-checks.
    private static final long NOT_READY_DELAY_NANOS = TimeUnit.MICROSECONDS.toNanos(100);

    private final Executor executor;

    // the idea to order the completed tasks in the same order as they were submitted is to leverage
    // the delay queue. With the delay queue we can control the order by the getDelay and compareTo methods
    // where we can order the tasks in the same order as they were submitted.
    private final DelayQueue<SubmitOrderFutureTask> completionQueue = new DelayQueue<SubmitOrderFutureTask>();

    // id is the unique id that determines the order in which tasks were submitted (incrementing)
    private final AtomicInteger id = new AtomicInteger();
    // index is the index of the next id that should expire and thus be ready to take from the delayed queue
    private final AtomicInteger index = new AtomicInteger();

    /**
     * A future task that remembers its submission id and adds itself to the
     * completion queue when it is done.
     */
    private class SubmitOrderFutureTask extends FutureTask<V> implements Delayed {

        // the id this task was assigned (1 for the first submitted task)
        private final long id;

        SubmitOrderFutureTask(long id, Callable<V> voidCallable) {
            super(voidCallable);
            this.id = id;
        }

        SubmitOrderFutureTask(long id, Runnable runnable, V result) {
            super(runnable, result);
            this.id = id;
        }

        /**
         * Reports whether it is this task's turn to be handed out.
         *
         * @param unit the unit the answer must be expressed in
         * @return zero or a negative value when the index has reached this task's id,
         *         otherwise a small positive delay expressed in <tt>unit</tt>
         */
        public long getDelay(TimeUnit unit) {
            long ahead = id - index.get();
            if (ahead <= 0) {
                // the index has reached (or passed) this task, so it is ready to be taken
                return ahead;
            }
            // Not our turn yet and we cannot know when it will be. Answer with a short, fixed
            // delay in the requested unit instead of the raw id distance, which (read as
            // nanoseconds by the queue) made a blocked consumer re-check almost continuously.
            long delay = unit.convert(NOT_READY_DELAY_NANOS, TimeUnit.NANOSECONDS);
            // never round down to 0 for coarse units, as 0 would mean "ready"
            return delay > 0 ? delay : 1;
        }

        /**
         * Orders tasks by their submission id, so the earliest submitted task is the head of the queue.
         */
        @SuppressWarnings("unchecked")
        public int compareTo(Delayed o) {
            SubmitOrderFutureTask other = (SubmitOrderFutureTask) o;
            // explicit comparison instead of subtract-and-narrow, which can overflow
            if (this.id < other.id) {
                return -1;
            }
            return this.id == other.id ? 0 : 1;
        }

        @Override
        protected void done() {
            // when we are done add to the completion queue
            completionQueue.add(this);
        }

        @Override
        public String toString() {
            // output using zero-based index
            return "SubmitOrderedFutureTask[" + (id - 1) + "]";
        }
    }

    /**
     * Creates a completion service that runs the submitted tasks on the given executor.
     *
     * @param executor the executor used to execute submitted tasks
     */
    public SubmitOrderedCompletionService(Executor executor) {
        this.executor = executor;
    }

    /**
     * Submits a task for execution and assigns it the next submission id.
     *
     * @param task the task to execute
     * @return a future representing the task
     * @throws IllegalArgumentException if <tt>task</tt> is <tt>null</tt>
     */
    public Future<V> submit(Callable<V> task) {
        if (task == null) {
            throw new IllegalArgumentException("Task must be provided");
        }
        SubmitOrderFutureTask f = new SubmitOrderFutureTask(id.incrementAndGet(), task);
        executor.execute(f);
        return f;
    }

    /**
     * Submits a runnable for execution and assigns it the next submission id.
     *
     * @param task   the task to execute
     * @param result the value the returned future yields once the task has completed
     * @return a future representing the task
     * @throws IllegalArgumentException if <tt>task</tt> is <tt>null</tt>
     */
    @SuppressWarnings("unchecked")
    public Future<V> submit(Runnable task, Object result) {
        if (task == null) {
            throw new IllegalArgumentException("Task must be provided");
        }
        // The parameter stays typed as Object to keep the existing signature; the cast is unchecked.
        // The result used to be dropped (null was passed), so the future always yielded null.
        SubmitOrderFutureTask f = new SubmitOrderFutureTask(id.incrementAndGet(), task, (V) result);
        executor.execute(f);
        return f;
    }

    /**
     * Retrieves the next task in submission order, waiting until it has completed.
     *
     * @return the future of the next task in submission order
     * @throws InterruptedException if interrupted while waiting
     */
    public Future<V> take() throws InterruptedException {
        index.incrementAndGet();
        Future<V> answer = null;
        try {
            answer = completionQueue.take();
        } finally {
            if (answer == null) {
                // nothing was taken (e.g. we were interrupted), so give the turn back
                index.decrementAndGet();
            }
        }
        return answer;
    }

    /**
     * Retrieves the next task in submission order if it has already completed.
     *
     * @return the future of the next task, or <tt>null</tt> if it has not completed yet
     */
    public Future<V> poll() {
        index.incrementAndGet();
        Future<V> answer = null;
        try {
            answer = completionQueue.poll();
        } finally {
            if (answer == null) {
                // decrease counter if we didn't get any data
                index.decrementAndGet();
            }
        }
        return answer;
    }

    /**
     * Retrieves the next task in submission order, waiting up to the given time for it to complete.
     *
     * @param timeout how long to wait before giving up
     * @param unit    the unit of <tt>timeout</tt>
     * @return the future of the next task, or <tt>null</tt> if it did not complete in time
     * @throws InterruptedException if interrupted while waiting
     */
    public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
        index.incrementAndGet();
        Future<V> answer = null;
        try {
            answer = completionQueue.poll(timeout, unit);
        } finally {
            if (answer == null) {
                // decrease counter if we didn't get any data (timed out, or failed with an exception)
                index.decrementAndGet();
            }
        }
        return answer;
    }

    /**
     * Marks the current task as timed out, which allows you to poll the next
     * tasks which may already have been completed.
     */
    public void timeoutTask() {
        index.incrementAndGet();
    }

}