001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.timer;
018    
019    import java.util.Date;
020    import java.util.Timer;
021    import java.util.TimerTask;
022    import java.util.concurrent.atomic.AtomicLong;
023    
024    import org.apache.camel.Exchange;
025    import org.apache.camel.Processor;
026    import org.apache.camel.impl.DefaultConsumer;
027    import org.slf4j.Logger;
028    import org.slf4j.LoggerFactory;
029    
030    /**
031     * The timer consumer.
032     *
033     * @version 
034     */
035    public class TimerConsumer extends DefaultConsumer {
036        private static final transient Logger LOG = LoggerFactory.getLogger(TimerConsumer.class);
037        private final TimerEndpoint endpoint;
038        private volatile TimerTask task;
039    
040        public TimerConsumer(TimerEndpoint endpoint, Processor processor) {
041            super(endpoint, processor);
042            this.endpoint = endpoint;
043        }
044    
045        @Override
046        protected void doStart() throws Exception {
047            task = new TimerTask() {
048                // counter
049                private final AtomicLong counter = new AtomicLong();
050    
051                @Override
052                public void run() {
053                    if (!isTaskRunAllowed()) {
054                        // do not run timer task as it was not allowed
055                        return;
056                    }
057    
058                    try {
059                        long count = counter.incrementAndGet();
060    
061                        boolean fire = endpoint.getRepeatCount() <= 0 || count <= endpoint.getRepeatCount();
062                        if (fire) {
063                            sendTimerExchange(count);
064                        } else {
065                            // no need to fire anymore as we exceeded repeat count
066                            LOG.debug("Cancelling {} timer as repeat count limit reached after {} counts.", endpoint.getTimerName(), endpoint.getRepeatCount());
067                            cancel();
068                        }
069                    } catch (Throwable e) {
070                        // catch all to avoid the JVM closing the thread and not firing again
071                        LOG.warn("Error processing exchange. This exception will be ignored, to let the timer be able to trigger again.", e);
072                    }
073                }
074            };
075    
076            Timer timer = endpoint.getTimer();
077            configureTask(task, timer);
078        }
079    
080        @Override
081        protected void doStop() throws Exception {
082            if (task != null) {
083                task.cancel();
084            }
085            task = null;
086        }
087    
088        /**
089         * Whether the timer task is allow to run or not
090         */
091        protected boolean isTaskRunAllowed() {
092            // only allow running the timer task if we can run and are not suspended,
093            // and CamelContext must have been fully started
094            return endpoint.getCamelContext().getStatus().isStarted() && isRunAllowed() && !isSuspended();
095        }
096    
097        protected void configureTask(TimerTask task, Timer timer) {
098            if (endpoint.isFixedRate()) {
099                if (endpoint.getTime() != null) {
100                    timer.scheduleAtFixedRate(task, endpoint.getTime(), endpoint.getPeriod());
101                } else {
102                    timer.scheduleAtFixedRate(task, endpoint.getDelay(), endpoint.getPeriod());
103                }
104            } else {
105                if (endpoint.getTime() != null) {
106                    if (endpoint.getPeriod() > 0) {
107                        timer.schedule(task, endpoint.getTime(), endpoint.getPeriod());
108                    } else {
109                        timer.schedule(task, endpoint.getTime());
110                    }
111                } else {
112                    if (endpoint.getPeriod() > 0) {
113                        timer.schedule(task, endpoint.getDelay(), endpoint.getPeriod());
114                    } else {
115                        timer.schedule(task, endpoint.getDelay());
116                    }
117                }
118            }
119        }
120    
121        protected void sendTimerExchange(long counter) {
122            Exchange exchange = endpoint.createExchange();
123            exchange.setProperty(Exchange.TIMER_COUNTER, counter);
124            exchange.setProperty(Exchange.TIMER_NAME, endpoint.getTimerName());
125            exchange.setProperty(Exchange.TIMER_TIME, endpoint.getTime());
126            exchange.setProperty(Exchange.TIMER_PERIOD, endpoint.getPeriod());
127    
128            Date now = new Date();
129            exchange.setProperty(Exchange.TIMER_FIRED_TIME, now);
130            // also set now on in header with same key as quartz to be consistent
131            exchange.getIn().setHeader("firedTime", now);
132    
133            LOG.trace("Timer {} is firing #{} count", endpoint.getTimerName(), counter);
134            try {
135                getProcessor().process(exchange);
136            } catch (Exception e) {
137                exchange.setException(e);
138            }
139    
140            // handle any thrown exception
141            if (exchange.getException() != null) {
142                getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
143            }
144        }
145    }