001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.impl; 018 019 import java.util.concurrent.ArrayBlockingQueue; 020 import java.util.concurrent.BlockingQueue; 021 import java.util.concurrent.RejectedExecutionException; 022 import java.util.concurrent.TimeUnit; 023 024 import org.apache.camel.Consumer; 025 import org.apache.camel.Endpoint; 026 import org.apache.camel.Exchange; 027 import org.apache.camel.PollingConsumerPollingStrategy; 028 import org.apache.camel.Processor; 029 import org.apache.camel.spi.ExceptionHandler; 030 import org.apache.camel.util.ServiceHelper; 031 import org.slf4j.Logger; 032 import org.slf4j.LoggerFactory; 033 034 /** 035 * A default implementation of the {@link org.apache.camel.PollingConsumer} which uses the normal 036 * asynchronous consumer mechanism along with a {@link BlockingQueue} to allow 037 * the caller to pull messages on demand. 038 * 039 * @version 040 */ 041 public class EventDrivenPollingConsumer extends PollingConsumerSupport implements Processor { 042 private static final transient Logger LOG = LoggerFactory.getLogger(EventDrivenPollingConsumer.class); 043 private final BlockingQueue<Exchange> queue; 044 private ExceptionHandler interruptedExceptionHandler = new LoggingExceptionHandler(EventDrivenPollingConsumer.class); 045 private Consumer consumer; 046 047 public EventDrivenPollingConsumer(Endpoint endpoint) { 048 this(endpoint, new ArrayBlockingQueue<Exchange>(1000)); 049 } 050 051 public EventDrivenPollingConsumer(Endpoint endpoint, BlockingQueue<Exchange> queue) { 052 super(endpoint); 053 this.queue = queue; 054 } 055 056 public Exchange receiveNoWait() { 057 return receive(0); 058 } 059 060 public Exchange receive() { 061 // must be started 062 if (!isRunAllowed() || !isStarted()) { 063 throw new RejectedExecutionException(this + " is not started, but in state: " + getStatus().name()); 064 } 065 066 while (isRunAllowed()) { 067 try { 068 beforePoll(0); 069 // take will block waiting for message 070 return queue.take(); 071 } catch (InterruptedException e) { 072 handleInterruptedException(e); 073 } finally { 074 afterPoll(); 075 } 076 } 077 LOG.trace("Consumer is not running, so returning null"); 078 return null; 079 } 080 081 public Exchange receive(long timeout) { 082 // must be started 083 if (!isRunAllowed() || !isStarted()) { 084 throw new RejectedExecutionException(this + " is not started, but in state: " + getStatus().name()); 085 } 086 087 try { 088 // use the timeout value returned from beforePoll 089 timeout = beforePoll(timeout); 090 return queue.poll(timeout, TimeUnit.MILLISECONDS); 091 } catch (InterruptedException e) { 092 handleInterruptedException(e); 093 return null; 094 } finally { 095 afterPoll(); 096 } 097 } 098 099 public void process(Exchange exchange) throws Exception { 100 queue.offer(exchange); 101 } 102 103 public ExceptionHandler getInterruptedExceptionHandler() { 104 return interruptedExceptionHandler; 105 } 106 107 public void setInterruptedExceptionHandler(ExceptionHandler interruptedExceptionHandler) { 108 this.interruptedExceptionHandler = interruptedExceptionHandler; 109 } 110 111 protected void handleInterruptedException(InterruptedException e) { 112 getInterruptedExceptionHandler().handleException(e); 113 } 114 115 protected long beforePoll(long timeout) { 116 if (consumer instanceof PollingConsumerPollingStrategy) { 117 PollingConsumerPollingStrategy strategy = (PollingConsumerPollingStrategy) consumer; 118 try { 119 timeout = strategy.beforePoll(timeout); 120 } catch (Exception e) { 121 LOG.debug("Error occurred before polling " + consumer + ". This exception will be ignored.", e); 122 } 123 } 124 return timeout; 125 } 126 127 protected void afterPoll() { 128 if (consumer instanceof PollingConsumerPollingStrategy) { 129 PollingConsumerPollingStrategy strategy = (PollingConsumerPollingStrategy) consumer; 130 try { 131 strategy.afterPoll(); 132 } catch (Exception e) { 133 LOG.debug("Error occurred after polling " + consumer + ". This exception will be ignored.", e); 134 } 135 } 136 } 137 138 protected void doStart() throws Exception { 139 // lets add ourselves as a consumer 140 consumer = getEndpoint().createConsumer(this); 141 142 // if the consumer has a polling strategy then invoke that 143 if (consumer instanceof PollingConsumerPollingStrategy) { 144 PollingConsumerPollingStrategy strategy = (PollingConsumerPollingStrategy) consumer; 145 strategy.onInit(); 146 } else { 147 // for regular consumers start it 148 ServiceHelper.startService(consumer); 149 } 150 } 151 152 protected void doStop() throws Exception { 153 ServiceHelper.stopService(consumer); 154 } 155 156 protected void doShutdown() throws Exception { 157 ServiceHelper.stopAndShutdownService(consumer); 158 } 159 }