001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.timer; 018 019 import java.util.Date; 020 import java.util.Timer; 021 import java.util.TimerTask; 022 import java.util.concurrent.atomic.AtomicLong; 023 024 import org.apache.camel.Exchange; 025 import org.apache.camel.Processor; 026 import org.apache.camel.impl.DefaultConsumer; 027 import org.slf4j.Logger; 028 import org.slf4j.LoggerFactory; 029 030 /** 031 * The timer consumer. 032 * 033 * @version 034 */ 035 public class TimerConsumer extends DefaultConsumer { 036 private static final transient Logger LOG = LoggerFactory.getLogger(TimerConsumer.class); 037 private final TimerEndpoint endpoint; 038 private volatile TimerTask task; 039 040 public TimerConsumer(TimerEndpoint endpoint, Processor processor) { 041 super(endpoint, processor); 042 this.endpoint = endpoint; 043 } 044 045 @Override 046 protected void doStart() throws Exception { 047 task = new TimerTask() { 048 // counter 049 private final AtomicLong counter = new AtomicLong(); 050 051 @Override 052 public void run() { 053 if (!isTaskRunAllowed()) { 054 // do not run timer task as it was not allowed 055 return; 056 } 057 058 try { 059 long count = counter.incrementAndGet(); 060 061 boolean fire = endpoint.getRepeatCount() <= 0 || count <= endpoint.getRepeatCount(); 062 if (fire) { 063 sendTimerExchange(count); 064 } else { 065 // no need to fire anymore as we exceeded repeat count 066 LOG.debug("Cancelling {} timer as repeat count limit reached after {} counts.", endpoint.getTimerName(), endpoint.getRepeatCount()); 067 cancel(); 068 } 069 } catch (Throwable e) { 070 // catch all to avoid the JVM closing the thread and not firing again 071 LOG.warn("Error processing exchange. This exception will be ignored, to let the timer be able to trigger again.", e); 072 } 073 } 074 }; 075 076 Timer timer = endpoint.getTimer(); 077 configureTask(task, timer); 078 } 079 080 @Override 081 protected void doStop() throws Exception { 082 if (task != null) { 083 task.cancel(); 084 } 085 task = null; 086 } 087 088 /** 089 * Whether the timer task is allow to run or not 090 */ 091 protected boolean isTaskRunAllowed() { 092 // only allow running the timer task if we can run and are not suspended, 093 // and CamelContext must have been fully started 094 return endpoint.getCamelContext().getStatus().isStarted() && isRunAllowed() && !isSuspended(); 095 } 096 097 protected void configureTask(TimerTask task, Timer timer) { 098 if (endpoint.isFixedRate()) { 099 if (endpoint.getTime() != null) { 100 timer.scheduleAtFixedRate(task, endpoint.getTime(), endpoint.getPeriod()); 101 } else { 102 timer.scheduleAtFixedRate(task, endpoint.getDelay(), endpoint.getPeriod()); 103 } 104 } else { 105 if (endpoint.getTime() != null) { 106 if (endpoint.getPeriod() > 0) { 107 timer.schedule(task, endpoint.getTime(), endpoint.getPeriod()); 108 } else { 109 timer.schedule(task, endpoint.getTime()); 110 } 111 } else { 112 if (endpoint.getPeriod() > 0) { 113 timer.schedule(task, endpoint.getDelay(), endpoint.getPeriod()); 114 } else { 115 timer.schedule(task, endpoint.getDelay()); 116 } 117 } 118 } 119 } 120 121 protected void sendTimerExchange(long counter) { 122 Exchange exchange = endpoint.createExchange(); 123 exchange.setProperty(Exchange.TIMER_COUNTER, counter); 124 exchange.setProperty(Exchange.TIMER_NAME, endpoint.getTimerName()); 125 exchange.setProperty(Exchange.TIMER_TIME, endpoint.getTime()); 126 exchange.setProperty(Exchange.TIMER_PERIOD, endpoint.getPeriod()); 127 128 Date now = new Date(); 129 exchange.setProperty(Exchange.TIMER_FIRED_TIME, now); 130 // also set now on in header with same key as quartz to be consistent 131 exchange.getIn().setHeader("firedTime", now); 132 133 LOG.trace("Timer {} is firing #{} count", endpoint.getTimerName(), counter); 134 try { 135 getProcessor().process(exchange); 136 } catch (Exception e) { 137 exchange.setException(e); 138 } 139 140 // handle any thrown exception 141 if (exchange.getException() != null) { 142 getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); 143 } 144 } 145 }