001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.util.concurrent.RejectedExecutionException; 020 import java.util.concurrent.ScheduledExecutorService; 021 import java.util.concurrent.TimeUnit; 022 023 import org.apache.camel.AsyncCallback; 024 import org.apache.camel.CamelContext; 025 import org.apache.camel.Exchange; 026 import org.apache.camel.Processor; 027 import org.apache.camel.util.ObjectHelper; 028 import org.slf4j.Logger; 029 import org.slf4j.LoggerFactory; 030 031 /** 032 * A useful base class for any processor which provides some kind of throttling 033 * or delayed processing. 034 * <p/> 035 * This implementation will block while waiting. 036 * 037 * @version 038 */ 039 public abstract class DelayProcessorSupport extends DelegateAsyncProcessor { 040 protected final transient Logger log = LoggerFactory.getLogger(getClass()); 041 private final CamelContext camelContext; 042 private final ScheduledExecutorService executorService; 043 private final boolean shutdownExecutorService; 044 private boolean asyncDelayed; 045 private boolean callerRunsWhenRejected = true; 046 047 // TODO: Add option to cancel tasks on shutdown so we can stop fast 048 049 private final class ProcessCall implements Runnable { 050 private final Exchange exchange; 051 private final AsyncCallback callback; 052 053 public ProcessCall(Exchange exchange, AsyncCallback callback) { 054 this.exchange = exchange; 055 this.callback = callback; 056 } 057 058 public void run() { 059 log.trace("Delayed task woke up and continues routing for exchangeId: {}", exchange.getExchangeId()); 060 if (!isRunAllowed()) { 061 exchange.setException(new RejectedExecutionException("Run is not allowed")); 062 } 063 064 // process the exchange now that we woke up 065 DelayProcessorSupport.super.process(exchange, new AsyncCallback() { 066 @Override 067 public void done(boolean doneSync) { 068 log.trace("Delayed task done for exchangeId: {}", exchange.getExchangeId()); 069 // we must done the callback from this async callback as well, to ensure callback is done correctly 070 // must invoke done on callback with false, as that is what the original caller would 071 // expect as we returned false in the process method 072 callback.done(false); 073 } 074 }); 075 } 076 } 077 078 public DelayProcessorSupport(CamelContext camelContext, Processor processor) { 079 this(camelContext, processor, null, false); 080 } 081 082 public DelayProcessorSupport(CamelContext camelContext, Processor processor, ScheduledExecutorService executorService, boolean shutdownExecutorService) { 083 super(processor); 084 this.camelContext = camelContext; 085 this.executorService = executorService; 086 this.shutdownExecutorService = shutdownExecutorService; 087 } 088 089 @Override 090 public boolean process(Exchange exchange, AsyncCallback callback) { 091 if (!isRunAllowed()) { 092 exchange.setException(new RejectedExecutionException("Run is not allowed")); 093 callback.done(true); 094 return true; 095 } 096 097 // calculate delay and wait 098 long delay; 099 try { 100 delay = calculateDelay(exchange); 101 if (delay <= 0) { 102 // no delay then continue routing 103 log.trace("No delay for exchangeId: {}", exchange.getExchangeId()); 104 return super.process(exchange, callback); 105 } 106 } catch (Throwable e) { 107 exchange.setException(e); 108 callback.done(true); 109 return true; 110 } 111 112 if (!isAsyncDelayed() || exchange.isTransacted()) { 113 // use synchronous delay (also required if using transactions) 114 try { 115 delay(delay, exchange); 116 // then continue routing 117 return super.process(exchange, callback); 118 } catch (Exception e) { 119 // exception occurred so we are done 120 exchange.setException(e); 121 callback.done(true); 122 return true; 123 } 124 } else { 125 // asynchronous delay so schedule a process call task 126 ProcessCall call = new ProcessCall(exchange, callback); 127 try { 128 log.trace("Scheduling delayed task to run in {} millis for exchangeId: {}", 129 delay, exchange.getExchangeId()); 130 executorService.schedule(call, delay, TimeUnit.MILLISECONDS); 131 // tell Camel routing engine we continue routing asynchronous 132 return false; 133 } catch (RejectedExecutionException e) { 134 if (isCallerRunsWhenRejected()) { 135 if (!isRunAllowed()) { 136 exchange.setException(new RejectedExecutionException()); 137 } else { 138 log.debug("Scheduling rejected task, so letting caller run, delaying at first for {} millis for exchangeId: {}", delay, exchange.getExchangeId()); 139 // let caller run by processing 140 try { 141 delay(delay, exchange); 142 } catch (InterruptedException ie) { 143 exchange.setException(ie); 144 } 145 // then continue routing 146 return super.process(exchange, callback); 147 } 148 } else { 149 exchange.setException(e); 150 } 151 // caller don't run the task so we are done 152 callback.done(true); 153 return true; 154 } 155 } 156 } 157 158 public boolean isAsyncDelayed() { 159 return asyncDelayed; 160 } 161 162 public void setAsyncDelayed(boolean asyncDelayed) { 163 this.asyncDelayed = asyncDelayed; 164 } 165 166 public boolean isCallerRunsWhenRejected() { 167 return callerRunsWhenRejected; 168 } 169 170 public void setCallerRunsWhenRejected(boolean callerRunsWhenRejected) { 171 this.callerRunsWhenRejected = callerRunsWhenRejected; 172 } 173 174 protected abstract long calculateDelay(Exchange exchange); 175 176 /** 177 * Delays the given time before continuing. 178 * <p/> 179 * This implementation will block while waiting 180 * 181 * @param delay the delay time in millis 182 * @param exchange the exchange being processed 183 */ 184 protected void delay(long delay, Exchange exchange) throws InterruptedException { 185 // only run is we are started 186 if (!isRunAllowed()) { 187 return; 188 } 189 190 if (delay < 0) { 191 return; 192 } else { 193 try { 194 sleep(delay); 195 } catch (InterruptedException e) { 196 handleSleepInterruptedException(e, exchange); 197 } 198 } 199 } 200 201 /** 202 * Called when a sleep is interrupted; allows derived classes to handle this case differently 203 */ 204 protected void handleSleepInterruptedException(InterruptedException e, Exchange exchange) throws InterruptedException { 205 if (log.isDebugEnabled()) { 206 log.debug("Sleep interrupted, are we stopping? {}", isStopping() || isStopped()); 207 } 208 Thread.currentThread().interrupt(); 209 throw e; 210 } 211 212 protected long currentSystemTime() { 213 return System.currentTimeMillis(); 214 } 215 216 private void sleep(long delay) throws InterruptedException { 217 if (delay <= 0) { 218 return; 219 } 220 log.trace("Sleeping for: {} millis", delay); 221 Thread.sleep(delay); 222 } 223 224 @Override 225 protected void doStart() throws Exception { 226 if (isAsyncDelayed()) { 227 ObjectHelper.notNull(executorService, "executorService", this); 228 } 229 super.doStart(); 230 } 231 232 @Override 233 protected void doShutdown() throws Exception { 234 if (shutdownExecutorService && executorService != null) { 235 camelContext.getExecutorServiceManager().shutdownNow(executorService); 236 } 237 super.doShutdown(); 238 } 239 }