001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.impl; 018 019import java.util.concurrent.ArrayBlockingQueue; 020import java.util.concurrent.BlockingQueue; 021import java.util.concurrent.LinkedBlockingQueue; 022import java.util.concurrent.RejectedExecutionException; 023import java.util.concurrent.TimeUnit; 024 025import org.apache.camel.Consumer; 026import org.apache.camel.Endpoint; 027import org.apache.camel.Exchange; 028import org.apache.camel.ExchangeTimedOutException; 029import org.apache.camel.IsSingleton; 030import org.apache.camel.PollingConsumerPollingStrategy; 031import org.apache.camel.Processor; 032import org.apache.camel.spi.ExceptionHandler; 033import org.apache.camel.support.LoggingExceptionHandler; 034import org.apache.camel.util.ServiceHelper; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038/** 039 * A default implementation of the {@link org.apache.camel.PollingConsumer} which uses the normal 040 * asynchronous consumer mechanism along with a {@link BlockingQueue} to allow 041 * the caller to pull messages on demand. 042 * 043 * @version 044 */ 045public class EventDrivenPollingConsumer extends PollingConsumerSupport implements Processor, IsSingleton { 046 private static final Logger LOG = LoggerFactory.getLogger(EventDrivenPollingConsumer.class); 047 private final BlockingQueue<Exchange> queue; 048 private ExceptionHandler interruptedExceptionHandler; 049 private Consumer consumer; 050 private boolean blockWhenFull = true; 051 private long blockTimeout; 052 private final int queueCapacity; 053 054 public EventDrivenPollingConsumer(Endpoint endpoint) { 055 this(endpoint, 1000); 056 } 057 058 public EventDrivenPollingConsumer(Endpoint endpoint, int queueSize) { 059 super(endpoint); 060 this.queueCapacity = queueSize; 061 if (queueSize <= 0) { 062 this.queue = new LinkedBlockingQueue<Exchange>(); 063 } else { 064 this.queue = new ArrayBlockingQueue<Exchange>(queueSize); 065 } 066 this.interruptedExceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), EventDrivenPollingConsumer.class); 067 } 068 069 public EventDrivenPollingConsumer(Endpoint endpoint, BlockingQueue<Exchange> queue) { 070 super(endpoint); 071 this.queue = queue; 072 this.queueCapacity = queue.remainingCapacity(); 073 this.interruptedExceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), EventDrivenPollingConsumer.class); 074 } 075 076 public boolean isBlockWhenFull() { 077 return blockWhenFull; 078 } 079 080 public void setBlockWhenFull(boolean blockWhenFull) { 081 this.blockWhenFull = blockWhenFull; 082 } 083 084 public long getBlockTimeout() { 085 return blockTimeout; 086 } 087 088 public void setBlockTimeout(long blockTimeout) { 089 this.blockTimeout = blockTimeout; 090 } 091 092 /** 093 * Gets the queue capacity. 094 */ 095 public int getQueueCapacity() { 096 return queueCapacity; 097 } 098 099 /** 100 * Gets the current queue size (no of elements in the queue). 101 */ 102 public int getQueueSize() { 103 return queue.size(); 104 } 105 106 public Exchange receiveNoWait() { 107 return receive(0); 108 } 109 110 public Exchange receive() { 111 // must be started 112 if (!isRunAllowed() || !isStarted()) { 113 throw new RejectedExecutionException(this + " is not started, but in state: " + getStatus().name()); 114 } 115 116 while (isRunAllowed()) { 117 try { 118 beforePoll(0); 119 // take will block waiting for message 120 return queue.take(); 121 } catch (InterruptedException e) { 122 handleInterruptedException(e); 123 } finally { 124 afterPoll(); 125 } 126 } 127 LOG.trace("Consumer is not running, so returning null"); 128 return null; 129 } 130 131 public Exchange receive(long timeout) { 132 // must be started 133 if (!isRunAllowed() || !isStarted()) { 134 throw new RejectedExecutionException(this + " is not started, but in state: " + getStatus().name()); 135 } 136 137 try { 138 // use the timeout value returned from beforePoll 139 timeout = beforePoll(timeout); 140 return queue.poll(timeout, TimeUnit.MILLISECONDS); 141 } catch (InterruptedException e) { 142 handleInterruptedException(e); 143 return null; 144 } finally { 145 afterPoll(); 146 } 147 } 148 149 public void process(Exchange exchange) throws Exception { 150 if (isBlockWhenFull()) { 151 try { 152 if (getBlockTimeout() <= 0) { 153 queue.put(exchange); 154 } else { 155 boolean added = queue.offer(exchange, getBlockTimeout(), TimeUnit.MILLISECONDS); 156 if (!added) { 157 throw new ExchangeTimedOutException(exchange, getBlockTimeout()); 158 } 159 } 160 } catch (InterruptedException e) { 161 // ignore 162 log.debug("Put interrupted, are we stopping? {}", isStopping() || isStopped()); 163 } 164 } else { 165 queue.add(exchange); 166 } 167 } 168 169 public ExceptionHandler getInterruptedExceptionHandler() { 170 return interruptedExceptionHandler; 171 } 172 173 public void setInterruptedExceptionHandler(ExceptionHandler interruptedExceptionHandler) { 174 this.interruptedExceptionHandler = interruptedExceptionHandler; 175 } 176 177 protected void handleInterruptedException(InterruptedException e) { 178 getInterruptedExceptionHandler().handleException(e); 179 } 180 181 protected long beforePoll(long timeout) { 182 if (consumer instanceof PollingConsumerPollingStrategy) { 183 PollingConsumerPollingStrategy strategy = (PollingConsumerPollingStrategy) consumer; 184 try { 185 timeout = strategy.beforePoll(timeout); 186 } catch (Exception e) { 187 LOG.debug("Error occurred before polling " + consumer + ". This exception will be ignored.", e); 188 } 189 } 190 return timeout; 191 } 192 193 protected void afterPoll() { 194 if (consumer instanceof PollingConsumerPollingStrategy) { 195 PollingConsumerPollingStrategy strategy = (PollingConsumerPollingStrategy) consumer; 196 try { 197 strategy.afterPoll(); 198 } catch (Exception e) { 199 LOG.debug("Error occurred after polling " + consumer + ". This exception will be ignored.", e); 200 } 201 } 202 } 203 204 protected void doStart() throws Exception { 205 // lets add ourselves as a consumer 206 consumer = getEndpoint().createConsumer(this); 207 208 // if the consumer has a polling strategy then invoke that 209 if (consumer instanceof PollingConsumerPollingStrategy) { 210 PollingConsumerPollingStrategy strategy = (PollingConsumerPollingStrategy) consumer; 211 strategy.onInit(); 212 } else { 213 ServiceHelper.startService(consumer); 214 } 215 } 216 217 protected void doStop() throws Exception { 218 ServiceHelper.stopService(consumer); 219 } 220 221 protected void doShutdown() throws Exception { 222 ServiceHelper.stopAndShutdownService(consumer); 223 queue.clear(); 224 } 225 226 @Override 227 // As the consumer could take the messages at once, so we cannot release the consumer 228 public boolean isSingleton() { 229 return true; 230 } 231}