001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.util.concurrent;
018
019import java.util.PriorityQueue;
020import java.util.concurrent.Executor;
021import java.util.concurrent.TimeUnit;
022import java.util.concurrent.atomic.AtomicInteger;
023import java.util.concurrent.locks.Condition;
024import java.util.concurrent.locks.ReentrantLock;
025import java.util.function.Consumer;
026
027public class AsyncCompletionService<V> {
028
029    private final Executor executor;
030    private final boolean ordered;
031    private final PriorityQueue<Task> queue;
032    private final AtomicInteger nextId = new AtomicInteger();
033    private final AtomicInteger index = new AtomicInteger();
034    private final ReentrantLock lock;
035    private final Condition available;
036
037    public AsyncCompletionService(Executor executor, boolean ordered) {
038        this(executor, ordered, null, 0);
039    }
040
041    public AsyncCompletionService(Executor executor, boolean ordered, ReentrantLock lock) {
042        this(executor, ordered, lock, 0);
043    }
044
045    public AsyncCompletionService(Executor executor, boolean ordered, ReentrantLock lock, int capacity) {
046        this.executor = executor;
047        this.ordered = ordered;
048        this.lock = lock != null ? lock : new ReentrantLock();
049        this.available = this.lock.newCondition();
050        if (capacity > 0) {
051            queue = new PriorityQueue<>(capacity);
052        } else {
053            queue = new PriorityQueue<>();
054        }
055    }
056
057    public ReentrantLock getLock() {
058        return lock;
059    }
060
061    public void submit(Consumer<Consumer<V>> runner) {
062        Task f = new Task(nextId.getAndIncrement(), runner);
063        this.executor.execute(f);
064    }
065
066    public void skip() {
067        index.incrementAndGet();
068    }
069
070    public V pollUnordered() {
071        final ReentrantLock lock = this.lock;
072        lock.lock();
073        try {
074            Task t = queue.poll();
075            return t != null ? t.result : null;
076        } finally {
077            lock.unlock();
078        }
079    }
080
081    public V poll() {
082        final ReentrantLock lock = this.lock;
083        lock.lock();
084        try {
085            Task t = queue.peek();
086            if (t != null && (!ordered || index.compareAndSet(t.id, t.id + 1))) {
087                queue.poll();
088                return t.result;
089            } else {
090                return null;
091            }
092        } finally {
093            lock.unlock();
094        }
095    }
096
097    public V poll(long timeout, TimeUnit unit) throws InterruptedException {
098        long nanos = unit.toNanos(timeout);
099        final ReentrantLock lock = this.lock;
100        lock.lockInterruptibly();
101        try {
102            for (;;) {
103                Task t = queue.peek();
104                if (t != null && (!ordered || index.compareAndSet(t.id, t.id + 1))) {
105                    queue.poll();
106                    return t.result;
107                }
108                if (nanos <= 0) {
109                    return null;
110                } else {
111                    nanos = available.awaitNanos(nanos);
112                }
113            }
114        } finally {
115            lock.unlock();
116        }
117    }
118
119    public V take() throws InterruptedException {
120        final ReentrantLock lock = this.lock;
121        lock.lockInterruptibly();
122        try {
123            for (;;) {
124                Task t = queue.peek();
125                if (t != null && (!ordered || index.compareAndSet(t.id, t.id + 1))) {
126                    queue.poll();
127                    return t.result;
128                }
129                available.await();
130            }
131        } finally {
132            lock.unlock();
133        }
134    }
135
136    private void complete(Task task) {
137        final ReentrantLock lock = this.lock;
138        lock.lock();
139        try {
140            queue.add(task);
141            available.signalAll();
142        } finally {
143            lock.unlock();
144        }
145    }
146
147    private class Task implements Runnable, Comparable<Task>, Consumer<V> {
148        private final int id;
149        private final Consumer<Consumer<V>> runner;
150        private V result;
151
152        Task(int id, Consumer<Consumer<V>> runner) {
153            this.id = id;
154            this.runner = runner;
155        }
156
157        @Override
158        public void run() {
159            runner.accept(this);
160        }
161
162        @Override
163        public void accept(V result) {
164            this.result = result;
165            complete(this);
166        }
167
168        @Override
169        public int compareTo(Task other) {
170            return Integer.compare(this.id, other.id);
171        }
172
173        @Override
174        public String toString() {
175            return "SubmitOrderedTask[" + this.id + "]";
176        }
177    }
178}