001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.Locale;
020    import java.util.concurrent.ScheduledExecutorService;
021    import java.util.concurrent.ScheduledFuture;
022    import java.util.concurrent.TimeUnit;
023    
024    import org.apache.camel.Endpoint;
025    import org.apache.camel.Exchange;
026    import org.apache.camel.LoggingLevel;
027    import org.apache.camel.PollingConsumerPollingStrategy;
028    import org.apache.camel.Processor;
029    import org.apache.camel.StatefulService;
030    import org.apache.camel.SuspendableService;
031    import org.apache.camel.spi.PollingConsumerPollStrategy;
032    import org.apache.camel.util.ObjectHelper;
033    import org.apache.camel.util.ServiceHelper;
034    import org.slf4j.Logger;
035    import org.slf4j.LoggerFactory;
036    
037    /**
038     * A useful base class for any consumer which is polling based
039     * 
040     * @version 
041     */
042    public abstract class ScheduledPollConsumer extends DefaultConsumer implements Runnable, SuspendableService, PollingConsumerPollingStrategy {
043        private static final transient Logger LOG = LoggerFactory.getLogger(ScheduledPollConsumer.class);
044    
045        private ScheduledExecutorService scheduledExecutorService;
046        private boolean shutdownExecutor;
047        private ScheduledFuture<?> future;
048    
049        // if adding more options then align with ScheduledPollEndpoint#configureScheduledPollConsumerProperties
050        private boolean startScheduler = true;
051        private long initialDelay = 1000;
052        private long delay = 500;
053        private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
054        private boolean useFixedDelay = true;
055        private PollingConsumerPollStrategy pollStrategy = new DefaultPollingConsumerPollStrategy();
056        private LoggingLevel runLoggingLevel = LoggingLevel.TRACE;
057        private boolean sendEmptyMessageWhenIdle;
058        private volatile boolean polling;
059    
060        public ScheduledPollConsumer(Endpoint endpoint, Processor processor) {
061            super(endpoint, processor);
062        }
063    
064        public ScheduledPollConsumer(Endpoint endpoint, Processor processor, ScheduledExecutorService scheduledExecutorService) {
065            super(endpoint, processor);
066            // we have been given an existing thread pool, so we should not manage its lifecycle
067            // so we should keep shutdownExecutor as false
068            this.scheduledExecutorService = scheduledExecutorService;
069            ObjectHelper.notNull(scheduledExecutorService, "scheduledExecutorService");
070        }
071    
072        /**
073         * Invoked whenever we should be polled
074         */
075        public void run() {
076            // avoid this thread to throw exceptions because the thread pool wont re-schedule a new thread
077            try {
078                // log starting
079                if (LoggingLevel.ERROR == runLoggingLevel) {
080                    LOG.error("Scheduled task started on:   {}", this.getEndpoint());
081                } else if (LoggingLevel.WARN == runLoggingLevel) {
082                    LOG.warn("Scheduled task started on:   {}", this.getEndpoint());
083                } else if (LoggingLevel.INFO == runLoggingLevel) {
084                    LOG.info("Scheduled task started on:   {}", this.getEndpoint());
085                } else if (LoggingLevel.DEBUG == runLoggingLevel) {
086                    LOG.debug("Scheduled task started on:   {}", this.getEndpoint());
087                } else {
088                    LOG.trace("Scheduled task started on:   {}", this.getEndpoint());
089                }
090    
091                // execute scheduled task
092                doRun();
093    
094                // log completed
095                if (LoggingLevel.ERROR == runLoggingLevel) {
096                    LOG.error("Scheduled task completed on: {}", this.getEndpoint());
097                } else if (LoggingLevel.WARN == runLoggingLevel) {
098                    LOG.warn("Scheduled task completed on: {}", this.getEndpoint());
099                } else if (LoggingLevel.INFO == runLoggingLevel) {
100                    LOG.info("Scheduled task completed on: {}", this.getEndpoint());
101                } else if (LoggingLevel.DEBUG == runLoggingLevel) {
102                    LOG.debug("Scheduled task completed on: {}", this.getEndpoint());
103                } else {
104                    LOG.trace("Scheduled task completed on: {}", this.getEndpoint());
105                }
106    
107            } catch (Error e) {
108                // must catch Error, to ensure the task is re-scheduled
109                LOG.error("Error occurred during running scheduled task on: " + this.getEndpoint() + ", due: " + e.getMessage(), e);
110            }
111        }
112    
113        private void doRun() {
114            if (isSuspended()) {
115                LOG.trace("Cannot start to poll: {} as its suspended", this.getEndpoint());
116                return;
117            }
118    
119            int retryCounter = -1;
120            boolean done = false;
121            Throwable cause = null;
122    
123            while (!done) {
124                try {
125                    cause = null;
126                    // eager assume we are done
127                    done = true;
128                    if (isPollAllowed()) {
129    
130                        if (retryCounter == -1) {
131                            LOG.trace("Starting to poll: {}", this.getEndpoint());
132                        } else {
133                            LOG.debug("Retrying attempt {} to poll: {}", retryCounter, this.getEndpoint());
134                        }
135    
136                        // mark we are polling which should also include the begin/poll/commit
137                        polling = true;
138                        try {
139                            boolean begin = pollStrategy.begin(this, getEndpoint());
140                            if (begin) {
141                                retryCounter++;
142                                int polledMessages = poll();
143    
144                                if (polledMessages == 0 && isSendEmptyMessageWhenIdle()) {
145                                    // send an "empty" exchange
146                                    processEmptyMessage();
147                                }
148    
149                                pollStrategy.commit(this, getEndpoint(), polledMessages);
150                            } else {
151                                LOG.debug("Cannot begin polling as pollStrategy returned false: {}", pollStrategy);
152                            }
153                        } finally {
154                            polling = false;
155                        }
156                    }
157    
158                    LOG.trace("Finished polling: {}", this.getEndpoint());
159                } catch (Exception e) {
160                    try {
161                        boolean retry = pollStrategy.rollback(this, getEndpoint(), retryCounter, e);
162                        if (retry) {
163                            // do not set cause as we retry
164                            done = false;
165                        } else {
166                            cause = e;
167                            done = true;
168                        }
169                    } catch (Throwable t) {
170                        cause = t;
171                        done = true;
172                    }
173                } catch (Throwable t) {
174                    cause = t;
175                    done = true;
176                }
177    
178                if (cause != null && isRunAllowed()) {
179                    // let exception handler deal with the caused exception
180                    // but suppress this during shutdown as the logs may get flooded with exceptions during shutdown/forced shutdown
181                    try {
182                        getExceptionHandler().handleException("Consumer " + this +  " failed polling endpoint: " + getEndpoint()
183                                + ". Will try again at next poll", cause);
184                    } catch (Throwable e) {
185                        LOG.warn("Error handling exception. This exception will be ignored.", e);
186                    }
187                    cause = null;
188                }
189            }
190    
191            // avoid this thread to throw exceptions because the thread pool wont re-schedule a new thread
192        }
193    
194        /**
195         * No messages to poll so send an empty message instead.
196         *
197         * @throws Exception is thrown if error processing the empty message.
198         */
199        protected void processEmptyMessage() throws Exception {
200            Exchange exchange = getEndpoint().createExchange();
201            log.debug("Sending empty message as there were no messages from polling: {}", this.getEndpoint());
202            getProcessor().process(exchange);
203        }
204    
205        // Properties
206        // -------------------------------------------------------------------------
207    
208        protected boolean isPollAllowed() {
209            return isRunAllowed() && !isSuspended();
210        }
211    
212        /**
213         * Whether polling is currently in progress
214         */
215        protected boolean isPolling() {
216            return polling;
217        }
218    
219        public long getInitialDelay() {
220            return initialDelay;
221        }
222    
223        public void setInitialDelay(long initialDelay) {
224            this.initialDelay = initialDelay;
225        }
226    
227        public long getDelay() {
228            return delay;
229        }
230    
231        public void setDelay(long delay) {
232            this.delay = delay;
233        }
234    
235        public TimeUnit getTimeUnit() {
236            return timeUnit;
237        }
238    
239        /**
240         * Sets the time unit to use.
241         * <p/>
242         * Notice that both {@link #getDelay()} and {@link #getInitialDelay()} are using
243         * the same time unit. So if you change this value, then take into account that the
244         * default value of {@link #getInitialDelay()} is 1000. So you may to adjust this value accordingly.
245         *
246         * @param timeUnit the time unit.
247         */
248        public void setTimeUnit(TimeUnit timeUnit) {
249            this.timeUnit = timeUnit;
250        }
251    
252        public boolean isUseFixedDelay() {
253            return useFixedDelay;
254        }
255    
256        public void setUseFixedDelay(boolean useFixedDelay) {
257            this.useFixedDelay = useFixedDelay;
258        }
259    
260        public LoggingLevel getRunLoggingLevel() {
261            return runLoggingLevel;
262        }
263    
264        public void setRunLoggingLevel(LoggingLevel runLoggingLevel) {
265            this.runLoggingLevel = runLoggingLevel;
266        }
267    
268        public PollingConsumerPollStrategy getPollStrategy() {
269            return pollStrategy;
270        }
271    
272        public void setPollStrategy(PollingConsumerPollStrategy pollStrategy) {
273            this.pollStrategy = pollStrategy;
274        }
275    
276        public boolean isStartScheduler() {
277            return startScheduler;
278        }
279    
280        /**
281         * Sets whether the scheduler should be started when this consumer starts.
282         * <p/>
283         * This option is default true.
284         *
285         * @param startScheduler whether to start scheduler
286         */
287        public void setStartScheduler(boolean startScheduler) {
288            this.startScheduler = startScheduler;
289        }
290        
291        public void setSendEmptyMessageWhenIdle(boolean sendEmptyMessageWhenIdle) {
292            this.sendEmptyMessageWhenIdle = sendEmptyMessageWhenIdle;
293        }
294        
295        public boolean isSendEmptyMessageWhenIdle() {
296            return sendEmptyMessageWhenIdle;
297        }
298    
299        public ScheduledExecutorService getScheduledExecutorService() {
300            return scheduledExecutorService;
301        }
302    
303        /**
304         * Sets a custom shared {@link ScheduledExecutorService} to use as thread pool
305         * <p/>
306         * <b>Notice: </b> When using a custom thread pool, then the lifecycle of this thread
307         * pool is not controlled by this consumer (eg this consumer will not start/stop the thread pool
308         * when the consumer is started/stopped etc.)
309         *
310         * @param scheduledExecutorService the custom thread pool to use
311         */
312        public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
313            this.scheduledExecutorService = scheduledExecutorService;
314        }
315    
316        // Implementation methods
317        // -------------------------------------------------------------------------
318    
319        /**
320         * The polling method which is invoked periodically to poll this consumer
321         *
322         * @return number of messages polled, will be <tt>0</tt> if no message was polled at all.
323         * @throws Exception can be thrown if an exception occurred during polling
324         */
325        protected abstract int poll() throws Exception;
326    
327        @Override
328        protected void doStart() throws Exception {
329            super.doStart();
330    
331            // if no existing executor provided, then create a new thread pool ourselves
332            if (scheduledExecutorService == null) {
333                // we only need one thread in the pool to schedule this task
334                this.scheduledExecutorService = getEndpoint().getCamelContext().getExecutorServiceManager()
335                        .newScheduledThreadPool(this, getEndpoint().getEndpointUri(), 1);
336                // and we should shutdown the thread pool when no longer needed
337                this.shutdownExecutor = true;
338            }
339    
340            ObjectHelper.notNull(scheduledExecutorService, "scheduledExecutorService", this);
341            ObjectHelper.notNull(pollStrategy, "pollStrategy", this);
342    
343            if (isStartScheduler()) {
344                startScheduler();
345            }
346        }
347    
348        protected void startScheduler() {
349            if (isUseFixedDelay()) {
350                if (LOG.isDebugEnabled()) {
351                    LOG.debug("Scheduling poll (fixed delay) with initialDelay: {}, delay: {} ({}) for: {}",
352                            new Object[]{getInitialDelay(), getDelay(), getTimeUnit().name().toLowerCase(Locale.ENGLISH), getEndpoint()});
353                }
354                future = scheduledExecutorService.scheduleWithFixedDelay(this, getInitialDelay(), getDelay(), getTimeUnit());
355            } else {
356                if (LOG.isDebugEnabled()) {
357                    LOG.debug("Scheduling poll (fixed rate) with initialDelay: {}, delay: {} ({}) for: {}",
358                            new Object[]{getInitialDelay(), getDelay(), getTimeUnit().name().toLowerCase(Locale.ENGLISH), getEndpoint()});
359                }
360                future = scheduledExecutorService.scheduleAtFixedRate(this, getInitialDelay(), getDelay(), getTimeUnit());
361            }
362        }
363    
364        @Override
365        protected void doStop() throws Exception {
366            if (future != null) {
367                LOG.debug("This consumer is stopping, so cancelling scheduled task: " + future);
368                future.cancel(false);
369            }
370            super.doStop();
371        }
372    
373        @Override
374        protected void doShutdown() throws Exception {
375            if (shutdownExecutor && scheduledExecutorService != null) {
376                getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(scheduledExecutorService);
377                scheduledExecutorService = null;
378                future = null;
379            }
380            super.doShutdown();
381        }
382    
383        @Override
384        protected void doSuspend() throws Exception {
385            // dont stop/cancel the future task since we just check in the run method
386        }
387    
388        @Override
389        public void onInit() throws Exception {
390            // noop
391        }
392    
393        @Override
394        public long beforePoll(long timeout) throws Exception {
395            LOG.trace("Before poll {}", getEndpoint());
396            // resume or start our self
397            if (!ServiceHelper.resumeService(this)) {
398                ServiceHelper.startService(this);
399            }
400    
401            // ensure at least timeout is as long as one poll delay
402            return Math.max(timeout, getDelay());
403        }
404    
405        @Override
406        public void afterPoll() throws Exception {
407            LOG.trace("After poll {}", getEndpoint());
408            // suspend or stop our self
409            if (!ServiceHelper.suspendService(this)) {
410                ServiceHelper.stopService(this);
411            }
412        }
413    
414    }