001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import java.util.concurrent.ArrayBlockingQueue;
020import java.util.concurrent.BlockingQueue;
021import java.util.concurrent.LinkedBlockingQueue;
022import java.util.concurrent.RejectedExecutionException;
023import java.util.concurrent.TimeUnit;
024
025import org.apache.camel.Consumer;
026import org.apache.camel.Endpoint;
027import org.apache.camel.Exchange;
028import org.apache.camel.ExchangeTimedOutException;
029import org.apache.camel.IsSingleton;
030import org.apache.camel.PollingConsumerPollingStrategy;
031import org.apache.camel.Processor;
032import org.apache.camel.spi.ExceptionHandler;
033import org.apache.camel.support.LoggingExceptionHandler;
034import org.apache.camel.util.ServiceHelper;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 * A default implementation of the {@link org.apache.camel.PollingConsumer} which uses the normal
040 * asynchronous consumer mechanism along with a {@link BlockingQueue} to allow
041 * the caller to pull messages on demand.
042 *
043 * @version 
044 */
045public class EventDrivenPollingConsumer extends PollingConsumerSupport implements Processor, IsSingleton {
046    private static final Logger LOG = LoggerFactory.getLogger(EventDrivenPollingConsumer.class);
047    private final BlockingQueue<Exchange> queue;
048    private ExceptionHandler interruptedExceptionHandler;
049    private Consumer consumer;
050    private boolean blockWhenFull = true;
051    private long blockTimeout;
052    private final int queueCapacity;
053
054    public EventDrivenPollingConsumer(Endpoint endpoint) {
055        this(endpoint, 1000);
056    }
057
058    public EventDrivenPollingConsumer(Endpoint endpoint, int queueSize) {
059        super(endpoint);
060        this.queueCapacity = queueSize;
061        if (queueSize <= 0) {
062            this.queue = new LinkedBlockingQueue<Exchange>();
063        } else {
064            this.queue = new ArrayBlockingQueue<Exchange>(queueSize);
065        }
066        this.interruptedExceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), EventDrivenPollingConsumer.class);
067    }
068
069    public EventDrivenPollingConsumer(Endpoint endpoint, BlockingQueue<Exchange> queue) {
070        super(endpoint);
071        this.queue = queue;
072        this.queueCapacity = queue.remainingCapacity();
073        this.interruptedExceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), EventDrivenPollingConsumer.class);
074    }
075
076    public boolean isBlockWhenFull() {
077        return blockWhenFull;
078    }
079
080    public void setBlockWhenFull(boolean blockWhenFull) {
081        this.blockWhenFull = blockWhenFull;
082    }
083
084    public long getBlockTimeout() {
085        return blockTimeout;
086    }
087
088    public void setBlockTimeout(long blockTimeout) {
089        this.blockTimeout = blockTimeout;
090    }
091
092    /**
093     * Gets the queue capacity.
094     */
095    public int getQueueCapacity() {
096        return queueCapacity;
097    }
098
099    /**
100     * Gets the current queue size (no of elements in the queue).
101     */
102    public int getQueueSize() {
103        return queue.size();
104    }
105
106    public Exchange receiveNoWait() {
107        return receive(0);
108    }
109
110    public Exchange receive() {
111        // must be started
112        if (!isRunAllowed() || !isStarted()) {
113            throw new RejectedExecutionException(this + " is not started, but in state: " + getStatus().name());
114        }
115
116        while (isRunAllowed()) {
117            try {
118                beforePoll(0);
119                // take will block waiting for message
120                return queue.take();
121            } catch (InterruptedException e) {
122                handleInterruptedException(e);
123            } finally {
124                afterPoll();
125            }
126        }
127        LOG.trace("Consumer is not running, so returning null");
128        return null;
129    }
130
131    public Exchange receive(long timeout) {
132        // must be started
133        if (!isRunAllowed() || !isStarted()) {
134            throw new RejectedExecutionException(this + " is not started, but in state: " + getStatus().name());
135        }
136
137        try {
138            // use the timeout value returned from beforePoll
139            timeout = beforePoll(timeout);
140            return queue.poll(timeout, TimeUnit.MILLISECONDS);
141        } catch (InterruptedException e) {
142            handleInterruptedException(e);
143            return null;
144        } finally {
145            afterPoll();
146        }
147    }
148
149    public void process(Exchange exchange) throws Exception {
150        if (isBlockWhenFull()) {
151            try {
152                if (getBlockTimeout() <= 0) {
153                    queue.put(exchange);
154                } else {
155                    boolean added = queue.offer(exchange, getBlockTimeout(), TimeUnit.MILLISECONDS);
156                    if (!added) {
157                        throw new ExchangeTimedOutException(exchange, getBlockTimeout());
158                    }
159                }
160            } catch (InterruptedException e) {
161                // ignore
162                log.debug("Put interrupted, are we stopping? {}", isStopping() || isStopped());
163            }
164        } else {
165            queue.add(exchange);
166        }
167    }
168
169    public ExceptionHandler getInterruptedExceptionHandler() {
170        return interruptedExceptionHandler;
171    }
172
173    public void setInterruptedExceptionHandler(ExceptionHandler interruptedExceptionHandler) {
174        this.interruptedExceptionHandler = interruptedExceptionHandler;
175    }
176
177    protected void handleInterruptedException(InterruptedException e) {
178        getInterruptedExceptionHandler().handleException(e);
179    }
180
181    protected long beforePoll(long timeout) {
182        if (consumer instanceof PollingConsumerPollingStrategy) {
183            PollingConsumerPollingStrategy strategy = (PollingConsumerPollingStrategy) consumer;
184            try {
185                timeout = strategy.beforePoll(timeout);
186            } catch (Exception e) {
187                LOG.debug("Error occurred before polling " + consumer + ". This exception will be ignored.", e);
188            }
189        }
190        return timeout;
191    }
192
193    protected void afterPoll() {
194        if (consumer instanceof PollingConsumerPollingStrategy) {
195            PollingConsumerPollingStrategy strategy = (PollingConsumerPollingStrategy) consumer;
196            try {
197                strategy.afterPoll();
198            } catch (Exception e) {
199                LOG.debug("Error occurred after polling " + consumer + ". This exception will be ignored.", e);
200            }
201        }
202    }
203
204    protected void doStart() throws Exception {
205        // lets add ourselves as a consumer
206        consumer = getEndpoint().createConsumer(this);
207
208        // if the consumer has a polling strategy then invoke that
209        if (consumer instanceof PollingConsumerPollingStrategy) {
210            PollingConsumerPollingStrategy strategy = (PollingConsumerPollingStrategy) consumer;
211            strategy.onInit();
212        } else {
213            ServiceHelper.startService(consumer);
214        }
215    }
216
217    protected void doStop() throws Exception {
218        ServiceHelper.stopService(consumer);
219    }
220
221    protected void doShutdown() throws Exception {
222        ServiceHelper.stopAndShutdownService(consumer);
223        queue.clear();
224    }
225
226    @Override
227    // As the consumer could take the messages at once, so we cannot release the consumer
228    public boolean isSingleton() {
229        return true;
230    }
231}