001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.util.concurrent; 018 019import java.util.PriorityQueue; 020import java.util.concurrent.Executor; 021import java.util.concurrent.TimeUnit; 022import java.util.concurrent.atomic.AtomicInteger; 023import java.util.concurrent.locks.Condition; 024import java.util.concurrent.locks.ReentrantLock; 025import java.util.function.Consumer; 026 027public class AsyncCompletionService<V> { 028 029 private final Executor executor; 030 private final boolean ordered; 031 private final PriorityQueue<Task> queue; 032 private final AtomicInteger nextId = new AtomicInteger(); 033 private final AtomicInteger index = new AtomicInteger(); 034 private final ReentrantLock lock; 035 private final Condition available; 036 037 public AsyncCompletionService(Executor executor, boolean ordered) { 038 this(executor, ordered, null, 0); 039 } 040 041 public AsyncCompletionService(Executor executor, boolean ordered, ReentrantLock lock) { 042 this(executor, ordered, lock, 0); 043 } 044 045 public AsyncCompletionService(Executor executor, boolean ordered, ReentrantLock lock, int capacity) { 046 this.executor = executor; 047 this.ordered = ordered; 048 this.lock = lock != null ? lock : new ReentrantLock(); 049 this.available = this.lock.newCondition(); 050 if (capacity > 0) { 051 queue = new PriorityQueue<>(capacity); 052 } else { 053 queue = new PriorityQueue<>(); 054 } 055 } 056 057 public ReentrantLock getLock() { 058 return lock; 059 } 060 061 public void submit(Consumer<Consumer<V>> runner) { 062 Task f = new Task(nextId.getAndIncrement(), runner); 063 this.executor.execute(f); 064 } 065 066 public void skip() { 067 index.incrementAndGet(); 068 } 069 070 public V pollUnordered() { 071 final ReentrantLock lock = this.lock; 072 lock.lock(); 073 try { 074 Task t = queue.poll(); 075 return t != null ? t.result : null; 076 } finally { 077 lock.unlock(); 078 } 079 } 080 081 public V poll() { 082 final ReentrantLock lock = this.lock; 083 lock.lock(); 084 try { 085 Task t = queue.peek(); 086 if (t != null && (!ordered || index.compareAndSet(t.id, t.id + 1))) { 087 queue.poll(); 088 return t.result; 089 } else { 090 return null; 091 } 092 } finally { 093 lock.unlock(); 094 } 095 } 096 097 public V poll(long timeout, TimeUnit unit) throws InterruptedException { 098 long nanos = unit.toNanos(timeout); 099 final ReentrantLock lock = this.lock; 100 lock.lockInterruptibly(); 101 try { 102 for (;;) { 103 Task t = queue.peek(); 104 if (t != null && (!ordered || index.compareAndSet(t.id, t.id + 1))) { 105 queue.poll(); 106 return t.result; 107 } 108 if (nanos <= 0) { 109 return null; 110 } else { 111 nanos = available.awaitNanos(nanos); 112 } 113 } 114 } finally { 115 lock.unlock(); 116 } 117 } 118 119 public V take() throws InterruptedException { 120 final ReentrantLock lock = this.lock; 121 lock.lockInterruptibly(); 122 try { 123 for (;;) { 124 Task t = queue.peek(); 125 if (t != null && (!ordered || index.compareAndSet(t.id, t.id + 1))) { 126 queue.poll(); 127 return t.result; 128 } 129 available.await(); 130 } 131 } finally { 132 lock.unlock(); 133 } 134 } 135 136 private void complete(Task task) { 137 final ReentrantLock lock = this.lock; 138 lock.lock(); 139 try { 140 queue.add(task); 141 available.signalAll(); 142 } finally { 143 lock.unlock(); 144 } 145 } 146 147 private class Task implements Runnable, Comparable<Task>, Consumer<V> { 148 private final int id; 149 private final Consumer<Consumer<V>> runner; 150 private V result; 151 152 Task(int id, Consumer<Consumer<V>> runner) { 153 this.id = id; 154 this.runner = runner; 155 } 156 157 @Override 158 public void run() { 159 runner.accept(this); 160 } 161 162 @Override 163 public void accept(V result) { 164 this.result = result; 165 complete(this); 166 } 167 168 @Override 169 public int compareTo(Task other) { 170 return Integer.compare(this.id, other.id); 171 } 172 173 @Override 174 public String toString() { 175 return "SubmitOrderedTask[" + this.id + "]"; 176 } 177 } 178}