001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.component.timer;
018
019import java.util.Date;
020import java.util.Timer;
021import java.util.TimerTask;
022import java.util.concurrent.ExecutorService;
023import java.util.concurrent.atomic.AtomicLong;
024
025import org.apache.camel.AsyncCallback;
026import org.apache.camel.CamelContext;
027import org.apache.camel.Exchange;
028import org.apache.camel.Processor;
029import org.apache.camel.StartupListener;
030import org.apache.camel.impl.DefaultConsumer;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 * The timer consumer.
036 *
037 * @version 
038 */
039public class TimerConsumer extends DefaultConsumer implements StartupListener {
040    private static final Logger LOG = LoggerFactory.getLogger(TimerConsumer.class);
041    private final TimerEndpoint endpoint;
042    private volatile TimerTask task;
043    private volatile boolean configured;
044    private ExecutorService executorService;
045
046    public TimerConsumer(TimerEndpoint endpoint, Processor processor) {
047        super(endpoint, processor);
048        this.endpoint = endpoint;
049    }
050
051    @Override
052    public TimerEndpoint getEndpoint() {
053        return (TimerEndpoint) super.getEndpoint();
054    }
055
056    @Override
057    protected void doStart() throws Exception {
058        if (endpoint.getDelay() >= 0) { 
059            task = new TimerTask() {
060                // counter
061                private final AtomicLong counter = new AtomicLong();
062
063                @Override
064                public void run() {
065                    if (!isTaskRunAllowed()) {
066                        // do not run timer task as it was not allowed
067                        LOG.debug("Run now allowed for timer: {}", endpoint);
068                        return;
069                    }
070
071                    try {
072                        long count = counter.incrementAndGet();
073
074                        boolean fire = endpoint.getRepeatCount() <= 0 || count <= endpoint.getRepeatCount();
075                        if (fire) {
076                            sendTimerExchange(count);
077                        } else {
078                            // no need to fire anymore as we exceeded repeat
079                            // count
080                            LOG.debug("Cancelling {} timer as repeat count limit reached after {} counts.", endpoint.getTimerName(), endpoint.getRepeatCount());
081                            cancel();
082                        }
083                    } catch (Throwable e) {
084                        // catch all to avoid the JVM closing the thread and not
085                        // firing again
086                        LOG.warn("Error processing exchange. This exception will be ignored, to let the timer be able to trigger again.", e);
087                    }
088                }
089            };
090
091            // only configure task if CamelContext already started, otherwise
092            // the StartupListener
093            // is configuring the task later
094            if (!configured && endpoint.getCamelContext().getStatus().isStarted()) {
095                Timer timer = endpoint.getTimer(this);
096                configureTask(task, timer);
097            }
098        } else {
099            // if the delay is negative then we use an ExecutorService and fire messages as soon as possible
100            executorService = endpoint.getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, endpoint.getEndpointUri());
101
102            executorService.execute(new Runnable() {
103                public void run() {
104                    final AtomicLong counter = new AtomicLong();
105                    long count = counter.incrementAndGet();
106                    while ((endpoint.getRepeatCount() <= 0 || count <= endpoint.getRepeatCount()) && isRunAllowed()) {
107                        sendTimerExchange(count);
108                        count = counter.incrementAndGet();
109                    }
110                }
111            });
112        }
113    }
114
115    @Override
116    protected void doStop() throws Exception {
117        if (task != null) {
118            task.cancel();
119        }
120        task = null;
121        configured = false;
122
123        // remove timer
124        endpoint.removeTimer(this);
125        
126        // if executorService is instantiated then we shutdown it
127        if (executorService != null) {
128            endpoint.getCamelContext().getExecutorServiceManager().shutdown(executorService);
129            executorService = null;
130        }
131    }
132
133    @Override
134    public void onCamelContextStarted(CamelContext context, boolean alreadyStarted) throws Exception {
135        if (task != null && !configured) {
136            Timer timer = endpoint.getTimer(this);
137            configureTask(task, timer);
138        } 
139    }
140
141    /**
142     * Whether the timer task is allow to run or not
143     */
144    protected boolean isTaskRunAllowed() {
145        // only allow running the timer task if we can run and are not suspended,
146        // and CamelContext must have been fully started
147        return endpoint.getCamelContext().getStatus().isStarted() && isRunAllowed() && !isSuspended();
148    }
149
150    protected void configureTask(TimerTask task, Timer timer) {
151        if (endpoint.isFixedRate()) {
152            if (endpoint.getTime() != null) {
153                timer.scheduleAtFixedRate(task, endpoint.getTime(), endpoint.getPeriod());
154            } else {
155                timer.scheduleAtFixedRate(task, endpoint.getDelay(), endpoint.getPeriod());
156            }
157        } else {
158            if (endpoint.getTime() != null) {
159                if (endpoint.getPeriod() > 0) {
160                    timer.schedule(task, endpoint.getTime(), endpoint.getPeriod());
161                } else {
162                    timer.schedule(task, endpoint.getTime());
163                }
164            } else {
165                if (endpoint.getPeriod() > 0) {
166                    timer.schedule(task, endpoint.getDelay(), endpoint.getPeriod());
167                } else {
168                    timer.schedule(task, endpoint.getDelay());
169                }
170            }
171        }
172        configured = true;
173    }
174
175    protected void sendTimerExchange(long counter) {
176        final Exchange exchange = endpoint.createExchange();
177        exchange.setProperty(Exchange.TIMER_COUNTER, counter);
178        exchange.setProperty(Exchange.TIMER_NAME, endpoint.getTimerName());
179        exchange.setProperty(Exchange.TIMER_TIME, endpoint.getTime());
180        exchange.setProperty(Exchange.TIMER_PERIOD, endpoint.getPeriod());
181
182        Date now = new Date();
183        exchange.setProperty(Exchange.TIMER_FIRED_TIME, now);
184        // also set now on in header with same key as quartz to be consistent
185        exchange.getIn().setHeader("firedTime", now);
186
187        if (LOG.isTraceEnabled()) {
188            LOG.trace("Timer {} is firing #{} count", endpoint.getTimerName(), counter);
189        }
190
191        if (!endpoint.isSynchronous()) {
192            getAsyncProcessor().process(exchange, new AsyncCallback() {
193                @Override
194                public void done(boolean doneSync) {
195                    // handle any thrown exception
196                    if (exchange.getException() != null) {
197                        getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
198                    }
199                }
200            });
201        } else {
202            try {
203                getProcessor().process(exchange);
204            } catch (Exception e) {
205                exchange.setException(e);
206            }
207
208            // handle any thrown exception
209            if (exchange.getException() != null) {
210                getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
211            }
212        }
213    }
214}