001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.component.timer; 018 019import java.util.Date; 020import java.util.Timer; 021import java.util.TimerTask; 022import java.util.concurrent.ExecutorService; 023import java.util.concurrent.atomic.AtomicLong; 024 025import org.apache.camel.AsyncCallback; 026import org.apache.camel.CamelContext; 027import org.apache.camel.Exchange; 028import org.apache.camel.Processor; 029import org.apache.camel.StartupListener; 030import org.apache.camel.impl.DefaultConsumer; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034/** 035 * The timer consumer. 036 * 037 * @version 038 */ 039public class TimerConsumer extends DefaultConsumer implements StartupListener { 040 private static final Logger LOG = LoggerFactory.getLogger(TimerConsumer.class); 041 private final TimerEndpoint endpoint; 042 private volatile TimerTask task; 043 private volatile boolean configured; 044 private ExecutorService executorService; 045 046 public TimerConsumer(TimerEndpoint endpoint, Processor processor) { 047 super(endpoint, processor); 048 this.endpoint = endpoint; 049 } 050 051 @Override 052 public TimerEndpoint getEndpoint() { 053 return (TimerEndpoint) super.getEndpoint(); 054 } 055 056 @Override 057 protected void doStart() throws Exception { 058 if (endpoint.getDelay() >= 0) { 059 task = new TimerTask() { 060 // counter 061 private final AtomicLong counter = new AtomicLong(); 062 063 @Override 064 public void run() { 065 if (!isTaskRunAllowed()) { 066 // do not run timer task as it was not allowed 067 LOG.debug("Run now allowed for timer: {}", endpoint); 068 return; 069 } 070 071 try { 072 long count = counter.incrementAndGet(); 073 074 boolean fire = endpoint.getRepeatCount() <= 0 || count <= endpoint.getRepeatCount(); 075 if (fire) { 076 sendTimerExchange(count); 077 } else { 078 // no need to fire anymore as we exceeded repeat 079 // count 080 LOG.debug("Cancelling {} timer as repeat count limit reached after {} counts.", endpoint.getTimerName(), endpoint.getRepeatCount()); 081 cancel(); 082 } 083 } catch (Throwable e) { 084 // catch all to avoid the JVM closing the thread and not 085 // firing again 086 LOG.warn("Error processing exchange. This exception will be ignored, to let the timer be able to trigger again.", e); 087 } 088 } 089 }; 090 091 // only configure task if CamelContext already started, otherwise 092 // the StartupListener 093 // is configuring the task later 094 if (!configured && endpoint.getCamelContext().getStatus().isStarted()) { 095 Timer timer = endpoint.getTimer(this); 096 configureTask(task, timer); 097 } 098 } else { 099 // if the delay is negative then we use an ExecutorService and fire messages as soon as possible 100 executorService = endpoint.getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, endpoint.getEndpointUri()); 101 102 executorService.execute(new Runnable() { 103 public void run() { 104 final AtomicLong counter = new AtomicLong(); 105 long count = counter.incrementAndGet(); 106 while ((endpoint.getRepeatCount() <= 0 || count <= endpoint.getRepeatCount()) && isRunAllowed()) { 107 sendTimerExchange(count); 108 count = counter.incrementAndGet(); 109 } 110 } 111 }); 112 } 113 } 114 115 @Override 116 protected void doStop() throws Exception { 117 if (task != null) { 118 task.cancel(); 119 } 120 task = null; 121 configured = false; 122 123 // remove timer 124 endpoint.removeTimer(this); 125 126 // if executorService is instantiated then we shutdown it 127 if (executorService != null) { 128 endpoint.getCamelContext().getExecutorServiceManager().shutdown(executorService); 129 executorService = null; 130 } 131 } 132 133 @Override 134 public void onCamelContextStarted(CamelContext context, boolean alreadyStarted) throws Exception { 135 if (task != null && !configured) { 136 Timer timer = endpoint.getTimer(this); 137 configureTask(task, timer); 138 } 139 } 140 141 /** 142 * Whether the timer task is allow to run or not 143 */ 144 protected boolean isTaskRunAllowed() { 145 // only allow running the timer task if we can run and are not suspended, 146 // and CamelContext must have been fully started 147 return endpoint.getCamelContext().getStatus().isStarted() && isRunAllowed() && !isSuspended(); 148 } 149 150 protected void configureTask(TimerTask task, Timer timer) { 151 if (endpoint.isFixedRate()) { 152 if (endpoint.getTime() != null) { 153 timer.scheduleAtFixedRate(task, endpoint.getTime(), endpoint.getPeriod()); 154 } else { 155 timer.scheduleAtFixedRate(task, endpoint.getDelay(), endpoint.getPeriod()); 156 } 157 } else { 158 if (endpoint.getTime() != null) { 159 if (endpoint.getPeriod() > 0) { 160 timer.schedule(task, endpoint.getTime(), endpoint.getPeriod()); 161 } else { 162 timer.schedule(task, endpoint.getTime()); 163 } 164 } else { 165 if (endpoint.getPeriod() > 0) { 166 timer.schedule(task, endpoint.getDelay(), endpoint.getPeriod()); 167 } else { 168 timer.schedule(task, endpoint.getDelay()); 169 } 170 } 171 } 172 configured = true; 173 } 174 175 protected void sendTimerExchange(long counter) { 176 final Exchange exchange = endpoint.createExchange(); 177 exchange.setProperty(Exchange.TIMER_COUNTER, counter); 178 exchange.setProperty(Exchange.TIMER_NAME, endpoint.getTimerName()); 179 exchange.setProperty(Exchange.TIMER_TIME, endpoint.getTime()); 180 exchange.setProperty(Exchange.TIMER_PERIOD, endpoint.getPeriod()); 181 182 Date now = new Date(); 183 exchange.setProperty(Exchange.TIMER_FIRED_TIME, now); 184 // also set now on in header with same key as quartz to be consistent 185 exchange.getIn().setHeader("firedTime", now); 186 187 if (LOG.isTraceEnabled()) { 188 LOG.trace("Timer {} is firing #{} count", endpoint.getTimerName(), counter); 189 } 190 191 if (!endpoint.isSynchronous()) { 192 getAsyncProcessor().process(exchange, new AsyncCallback() { 193 @Override 194 public void done(boolean doneSync) { 195 // handle any thrown exception 196 if (exchange.getException() != null) { 197 getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); 198 } 199 } 200 }); 201 } else { 202 try { 203 getProcessor().process(exchange); 204 } catch (Exception e) { 205 exchange.setException(e); 206 } 207 208 // handle any thrown exception 209 if (exchange.getException() != null) { 210 getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); 211 } 212 } 213 } 214}