001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import java.util.HashMap;
020import java.util.Map;
021import java.util.concurrent.ScheduledExecutorService;
022import java.util.concurrent.TimeUnit;
023
024import org.apache.camel.Endpoint;
025import org.apache.camel.Exchange;
026import org.apache.camel.FailedToCreateConsumerException;
027import org.apache.camel.LoggingLevel;
028import org.apache.camel.PollingConsumerPollingStrategy;
029import org.apache.camel.Processor;
030import org.apache.camel.Suspendable;
031import org.apache.camel.SuspendableService;
032import org.apache.camel.spi.PollingConsumerPollStrategy;
033import org.apache.camel.spi.ScheduledPollConsumerScheduler;
034import org.apache.camel.util.IntrospectionSupport;
035import org.apache.camel.util.ObjectHelper;
036import org.apache.camel.util.ServiceHelper;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040/**
041 * A useful base class for any consumer which is polling based
042 */
043public abstract class ScheduledPollConsumer extends DefaultConsumer implements Runnable, Suspendable, PollingConsumerPollingStrategy {
044    private static final Logger LOG = LoggerFactory.getLogger(ScheduledPollConsumer.class);
045
046    private ScheduledPollConsumerScheduler scheduler;
047    private ScheduledExecutorService scheduledExecutorService;
048
049    // if adding more options then align with org.apache.camel.impl.ScheduledPollEndpoint
050    private boolean startScheduler = true;
051    private long initialDelay = 1000;
052    private long delay = 500;
053    private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
054    private boolean useFixedDelay = true;
055    private PollingConsumerPollStrategy pollStrategy = new DefaultPollingConsumerPollStrategy();
056    private LoggingLevel runLoggingLevel = LoggingLevel.TRACE;
057    private boolean sendEmptyMessageWhenIdle;
058    private boolean greedy;
059    private int backoffMultiplier;
060    private int backoffIdleThreshold;
061    private int backoffErrorThreshold;
062    private Map<String, Object> schedulerProperties;
063
064    // state during running
065    private volatile boolean polling;
066    private volatile int backoffCounter;
067    private volatile long idleCounter;
068    private volatile long errorCounter;
069
070    public ScheduledPollConsumer(Endpoint endpoint, Processor processor) {
071        super(endpoint, processor);
072    }
073
074    public ScheduledPollConsumer(Endpoint endpoint, Processor processor, ScheduledExecutorService scheduledExecutorService) {
075        super(endpoint, processor);
076        // we have been given an existing thread pool, so we should not manage its lifecycle
077        // so we should keep shutdownExecutor as false
078        this.scheduledExecutorService = scheduledExecutorService;
079        ObjectHelper.notNull(scheduledExecutorService, "scheduledExecutorService");
080    }
081
082    /**
083     * Invoked whenever we should be polled
084     */
085    public void run() {
086        // avoid this thread to throw exceptions because the thread pool wont re-schedule a new thread
087        try {
088            // log starting
089            if (LoggingLevel.ERROR == runLoggingLevel) {
090                LOG.error("Scheduled task started on:   {}", this.getEndpoint());
091            } else if (LoggingLevel.WARN == runLoggingLevel) {
092                LOG.warn("Scheduled task started on:   {}", this.getEndpoint());
093            } else if (LoggingLevel.INFO == runLoggingLevel) {
094                LOG.info("Scheduled task started on:   {}", this.getEndpoint());
095            } else if (LoggingLevel.DEBUG == runLoggingLevel) {
096                LOG.debug("Scheduled task started on:   {}", this.getEndpoint());
097            } else {
098                LOG.trace("Scheduled task started on:   {}", this.getEndpoint());
099            }
100
101            // execute scheduled task
102            doRun();
103
104            // log completed
105            if (LoggingLevel.ERROR == runLoggingLevel) {
106                LOG.error("Scheduled task completed on: {}", this.getEndpoint());
107            } else if (LoggingLevel.WARN == runLoggingLevel) {
108                LOG.warn("Scheduled task completed on: {}", this.getEndpoint());
109            } else if (LoggingLevel.INFO == runLoggingLevel) {
110                LOG.info("Scheduled task completed on: {}", this.getEndpoint());
111            } else if (LoggingLevel.DEBUG == runLoggingLevel) {
112                LOG.debug("Scheduled task completed on: {}", this.getEndpoint());
113            } else {
114                LOG.trace("Scheduled task completed on: {}", this.getEndpoint());
115            }
116
117        } catch (Error e) {
118            // must catch Error, to ensure the task is re-scheduled
119            LOG.error("Error occurred during running scheduled task on: " + this.getEndpoint() + ", due: " + e.getMessage(), e);
120        }
121    }
122
123    private void doRun() {
124        if (isSuspended()) {
125            LOG.trace("Cannot start to poll: {} as its suspended", this.getEndpoint());
126            return;
127        }
128
129        // should we backoff if its enabled, and either the idle or error counter is > the threshold
130        if (backoffMultiplier > 0
131                // either idle or error threshold could be not in use, so check for that and use MAX_VALUE if not in use
132                && (idleCounter >= (backoffIdleThreshold > 0 ? backoffIdleThreshold : Integer.MAX_VALUE))
133                || errorCounter >= (backoffErrorThreshold > 0 ? backoffErrorThreshold : Integer.MAX_VALUE)) {
134            if (backoffCounter++ < backoffMultiplier) {
135                // yes we should backoff
136                if (idleCounter > 0) {
137                    LOG.debug("doRun() backoff due subsequent {} idles (backoff at {}/{})", new Object[]{idleCounter, backoffCounter, backoffMultiplier});
138                } else {
139                    LOG.debug("doRun() backoff due subsequent {} errors (backoff at {}/{})", new Object[]{errorCounter, backoffCounter, backoffMultiplier});
140                }
141                return;
142            } else {
143                // we are finished with backoff so reset counters
144                idleCounter = 0;
145                errorCounter = 0;
146                backoffCounter = 0;
147                LOG.trace("doRun() backoff finished, resetting counters.");
148            }
149        }
150
151        int retryCounter = -1;
152        boolean done = false;
153        Throwable cause = null;
154        int polledMessages = 0;
155
156        while (!done) {
157            try {
158                cause = null;
159                // eager assume we are done
160                done = true;
161                if (isPollAllowed()) {
162
163                    if (retryCounter == -1) {
164                        LOG.trace("Starting to poll: {}", this.getEndpoint());
165                    } else {
166                        LOG.debug("Retrying attempt {} to poll: {}", retryCounter, this.getEndpoint());
167                    }
168
169                    // mark we are polling which should also include the begin/poll/commit
170                    polling = true;
171                    try {
172                        boolean begin = pollStrategy.begin(this, getEndpoint());
173                        if (begin) {
174                            retryCounter++;
175                            polledMessages = poll();
176                            LOG.trace("Polled {} messages", polledMessages);
177
178                            if (polledMessages == 0 && isSendEmptyMessageWhenIdle()) {
179                                // send an "empty" exchange
180                                processEmptyMessage();
181                            }
182
183                            pollStrategy.commit(this, getEndpoint(), polledMessages);
184
185                            if (polledMessages > 0 && isGreedy()) {
186                                done = false;
187                                retryCounter = -1;
188                                LOG.trace("Greedy polling after processing {} messages", polledMessages);
189                            }
190                        } else {
191                            LOG.debug("Cannot begin polling as pollStrategy returned false: {}", pollStrategy);
192                        }
193                    } finally {
194                        polling = false;
195                    }
196                }
197
198                LOG.trace("Finished polling: {}", this.getEndpoint());
199            } catch (Exception e) {
200                try {
201                    boolean retry = pollStrategy.rollback(this, getEndpoint(), retryCounter, e);
202                    if (retry) {
203                        // do not set cause as we retry
204                        done = false;
205                    } else {
206                        cause = e;
207                        done = true;
208                    }
209                } catch (Throwable t) {
210                    cause = t;
211                    done = true;
212                }
213            } catch (Throwable t) {
214                cause = t;
215                done = true;
216            }
217
218            if (cause != null && isRunAllowed()) {
219                // let exception handler deal with the caused exception
220                // but suppress this during shutdown as the logs may get flooded with exceptions during shutdown/forced shutdown
221                try {
222                    getExceptionHandler().handleException("Consumer " + this + " failed polling endpoint: " + getEndpoint()
223                            + ". Will try again at next poll", cause);
224                } catch (Throwable e) {
225                    LOG.warn("Error handling exception. This exception will be ignored.", e);
226                }
227            }
228        }
229
230        if (cause != null) {
231            idleCounter = 0;
232            errorCounter++;
233        } else {
234            idleCounter = polledMessages == 0 ? ++idleCounter : 0;
235            errorCounter = 0;
236        }
237        LOG.trace("doRun() done with idleCounter={}, errorCounter={}", idleCounter, errorCounter);
238
239        // avoid this thread to throw exceptions because the thread pool wont re-schedule a new thread
240    }
241
242    /**
243     * No messages to poll so send an empty message instead.
244     *
245     * @throws Exception is thrown if error processing the empty message.
246     */
247    protected void processEmptyMessage() throws Exception {
248        Exchange exchange = getEndpoint().createExchange();
249        log.debug("Sending empty message as there were no messages from polling: {}", this.getEndpoint());
250        getProcessor().process(exchange);
251    }
252
253    // Properties
254    // -------------------------------------------------------------------------
255
256    protected boolean isPollAllowed() {
257        return isRunAllowed() && !isSuspended();
258    }
259
260    /**
261     * Whether polling is currently in progress
262     */
263    protected boolean isPolling() {
264        return polling;
265    }
266
267    public ScheduledPollConsumerScheduler getScheduler() {
268        return scheduler;
269    }
270
271    public void setScheduler(ScheduledPollConsumerScheduler scheduler) {
272        this.scheduler = scheduler;
273    }
274
275    public Map<String, Object> getSchedulerProperties() {
276        return schedulerProperties;
277    }
278
279    public void setSchedulerProperties(Map<String, Object> schedulerProperties) {
280        this.schedulerProperties = schedulerProperties;
281    }
282
283    public long getInitialDelay() {
284        return initialDelay;
285    }
286
287    public void setInitialDelay(long initialDelay) {
288        this.initialDelay = initialDelay;
289    }
290
291    public long getDelay() {
292        return delay;
293    }
294
295    public void setDelay(long delay) {
296        this.delay = delay;
297    }
298
299    public TimeUnit getTimeUnit() {
300        return timeUnit;
301    }
302
303    public void setTimeUnit(TimeUnit timeUnit) {
304        this.timeUnit = timeUnit;
305    }
306
307    public boolean isUseFixedDelay() {
308        return useFixedDelay;
309    }
310
311    public void setUseFixedDelay(boolean useFixedDelay) {
312        this.useFixedDelay = useFixedDelay;
313    }
314
315    public LoggingLevel getRunLoggingLevel() {
316        return runLoggingLevel;
317    }
318
319    public void setRunLoggingLevel(LoggingLevel runLoggingLevel) {
320        this.runLoggingLevel = runLoggingLevel;
321    }
322
323    public PollingConsumerPollStrategy getPollStrategy() {
324        return pollStrategy;
325    }
326
327    public void setPollStrategy(PollingConsumerPollStrategy pollStrategy) {
328        this.pollStrategy = pollStrategy;
329    }
330
331    public boolean isStartScheduler() {
332        return startScheduler;
333    }
334
335    public void setStartScheduler(boolean startScheduler) {
336        this.startScheduler = startScheduler;
337    }
338
339    public void setSendEmptyMessageWhenIdle(boolean sendEmptyMessageWhenIdle) {
340        this.sendEmptyMessageWhenIdle = sendEmptyMessageWhenIdle;
341    }
342
343    public boolean isSendEmptyMessageWhenIdle() {
344        return sendEmptyMessageWhenIdle;
345    }
346
347    public boolean isGreedy() {
348        return greedy;
349    }
350
351    public void setGreedy(boolean greedy) {
352        this.greedy = greedy;
353    }
354
355    public int getBackoffCounter() {
356        return backoffCounter;
357    }
358
359    public int getBackoffMultiplier() {
360        return backoffMultiplier;
361    }
362
363    public void setBackoffMultiplier(int backoffMultiplier) {
364        this.backoffMultiplier = backoffMultiplier;
365    }
366
367    public int getBackoffIdleThreshold() {
368        return backoffIdleThreshold;
369    }
370
371    public void setBackoffIdleThreshold(int backoffIdleThreshold) {
372        this.backoffIdleThreshold = backoffIdleThreshold;
373    }
374
375    public int getBackoffErrorThreshold() {
376        return backoffErrorThreshold;
377    }
378
379    public void setBackoffErrorThreshold(int backoffErrorThreshold) {
380        this.backoffErrorThreshold = backoffErrorThreshold;
381    }
382
383    public ScheduledExecutorService getScheduledExecutorService() {
384        return scheduledExecutorService;
385    }
386
387    public boolean isSchedulerStarted() {
388        return scheduler.isSchedulerStarted();
389    }
390
391    public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
392        this.scheduledExecutorService = scheduledExecutorService;
393    }
394
395    // Implementation methods
396    // -------------------------------------------------------------------------
397
398    /**
399     * The polling method which is invoked periodically to poll this consumer
400     *
401     * @return number of messages polled, will be <tt>0</tt> if no message was polled at all.
402     * @throws Exception can be thrown if an exception occurred during polling
403     */
404    protected abstract int poll() throws Exception;
405
406    @Override
407    protected void doStart() throws Exception {
408        super.doStart();
409
410        // validate that if backoff multiplier is in use, the threshold values is set correctly
411        if (backoffMultiplier > 0) {
412            if (backoffIdleThreshold <= 0 && backoffErrorThreshold <= 0) {
413                throw new IllegalArgumentException("backoffIdleThreshold and/or backoffErrorThreshold must be configured to a positive value when using backoffMultiplier");
414            }
415            LOG.debug("Using backoff[multiplier={}, idleThreshold={}, errorThreshold={}] on {}", new Object[]{backoffMultiplier, backoffIdleThreshold, backoffErrorThreshold, getEndpoint()});
416        }
417
418        if (scheduler == null) {
419            scheduler = new DefaultScheduledPollConsumerScheduler(scheduledExecutorService);
420        }
421        scheduler.setCamelContext(getEndpoint().getCamelContext());
422        scheduler.onInit(this);
423        scheduler.scheduleTask(this);
424
425        // configure scheduler with options from this consumer
426        Map<String, Object> properties = new HashMap<String, Object>();
427        IntrospectionSupport.getProperties(this, properties, null);
428        IntrospectionSupport.setProperties(getEndpoint().getCamelContext().getTypeConverter(), scheduler, properties);
429        if (schedulerProperties != null && !schedulerProperties.isEmpty()) {
430            // need to use a copy in case the consumer is restarted so we keep the properties
431            Map<String, Object> copy = new HashMap<String, Object>(schedulerProperties);
432            IntrospectionSupport.setProperties(getEndpoint().getCamelContext().getTypeConverter(), scheduler, copy);
433            if (copy.size() > 0) {
434                throw new FailedToCreateConsumerException(getEndpoint(), "There are " + copy.size()
435                        + " scheduler parameters that couldn't be set on the endpoint."
436                        + " Check the uri if the parameters are spelt correctly and that they are properties of the endpoint."
437                        + " Unknown parameters=[" + copy + "]");
438            }
439        }
440
441        ObjectHelper.notNull(scheduler, "scheduler", this);
442        ObjectHelper.notNull(pollStrategy, "pollStrategy", this);
443
444        ServiceHelper.startService(scheduler);
445
446        if (isStartScheduler()) {
447            startScheduler();
448        }
449    }
450
451    /**
452     * Starts the scheduler.
453     * <p/>
454     * If the scheduler is already started, then this is a noop method call.
455     */
456    public void startScheduler() {
457        scheduler.startScheduler();
458    }
459
460    @Override
461    protected void doStop() throws Exception {
462        if (scheduler != null) {
463            scheduler.unscheduleTask();
464            ServiceHelper.stopAndShutdownServices(scheduler);
465        }
466
467        // clear counters
468        backoffCounter = 0;
469        idleCounter = 0;
470        errorCounter = 0;
471
472        super.doStop();
473    }
474
475    @Override
476    protected void doShutdown() throws Exception {
477        ServiceHelper.stopAndShutdownServices(scheduler);
478        super.doShutdown();
479    }
480
481    @Override
482    protected void doSuspend() throws Exception {
483        // dont stop/cancel the future task since we just check in the run method
484    }
485
486    @Override
487    public void onInit() throws Exception {
488        // make sure the scheduler is starter
489        startScheduler = true;
490    }
491
492    @Override
493    public long beforePoll(long timeout) throws Exception {
494        LOG.trace("Before poll {}", getEndpoint());
495        // resume or start our self
496        if (!ServiceHelper.resumeService(this)) {
497            ServiceHelper.startService(this);
498        }
499
500        // ensure at least timeout is as long as one poll delay
501        return Math.max(timeout, getDelay());
502    }
503
504    @Override
505    public void afterPoll() throws Exception {
506        LOG.trace("After poll {}", getEndpoint());
507        // suspend or stop our self
508        if (!ServiceHelper.suspendService(this)) {
509            ServiceHelper.stopService(this);
510        }
511    }
512
513}