001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.concurrent.RejectedExecutionException;
020    import java.util.concurrent.ScheduledExecutorService;
021    import java.util.concurrent.TimeUnit;
022    
023    import org.apache.camel.AsyncCallback;
024    import org.apache.camel.CamelContext;
025    import org.apache.camel.Exchange;
026    import org.apache.camel.Processor;
027    import org.apache.camel.util.ObjectHelper;
028    import org.slf4j.Logger;
029    import org.slf4j.LoggerFactory;
030    
031    /**
032     * A useful base class for any processor which provides some kind of throttling
033     * or delayed processing.
034     * <p/>
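035     * By default this implementation blocks the calling thread while waiting; a scheduled (asynchronous) delay can be enabled with {@link #setAsyncDelayed(boolean)}.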
036     * 
037     * @version 
038     */
039    public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
040        protected final transient Logger log = LoggerFactory.getLogger(getClass());
041        private final CamelContext camelContext;
042        private final ScheduledExecutorService executorService;
043        private final boolean shutdownExecutorService;
044        private boolean asyncDelayed;
045        private boolean callerRunsWhenRejected = true;
046    
047        // TODO: Add option to cancel tasks on shutdown so we can stop fast
048    
049        private final class ProcessCall implements Runnable {
050            private final Exchange exchange;
051            private final AsyncCallback callback;
052    
053            public ProcessCall(Exchange exchange, AsyncCallback callback) {
054                this.exchange = exchange;
055                this.callback = callback;
056            }
057    
058            public void run() {
059                log.trace("Delayed task woke up and continues routing for exchangeId: {}", exchange.getExchangeId());
060                if (!isRunAllowed()) {
061                    exchange.setException(new RejectedExecutionException("Run is not allowed"));
062                }
063    
064                // process the exchange now that we woke up
065                DelayProcessorSupport.super.process(exchange, new AsyncCallback() {
066                    @Override
067                    public void done(boolean doneSync) {
068                        log.trace("Delayed task done for exchangeId: {}", exchange.getExchangeId());
069                        // we must done the callback from this async callback as well, to ensure callback is done correctly
070                        // must invoke done on callback with false, as that is what the original caller would
071                        // expect as we returned false in the process method
072                        callback.done(false);
073                    }
074                });
075            }
076        }
077    
078        public DelayProcessorSupport(CamelContext camelContext, Processor processor) {
079            this(camelContext, processor, null, false);
080        }
081    
082        public DelayProcessorSupport(CamelContext camelContext, Processor processor, ScheduledExecutorService executorService, boolean shutdownExecutorService) {
083            super(processor);
084            this.camelContext = camelContext;
085            this.executorService = executorService;
086            this.shutdownExecutorService = shutdownExecutorService;
087        }
088    
089        @Override
090        public boolean process(Exchange exchange, AsyncCallback callback) {
091            if (!isRunAllowed()) {
092                exchange.setException(new RejectedExecutionException("Run is not allowed"));
093                callback.done(true);
094                return true;
095            }
096    
097            // calculate delay and wait
098            long delay;
099            try {
100                delay = calculateDelay(exchange);
101                if (delay <= 0) {
102                    // no delay then continue routing
103                    log.trace("No delay for exchangeId: {}", exchange.getExchangeId());
104                    return super.process(exchange, callback);
105                }
106            } catch (Throwable e) {
107                exchange.setException(e);
108                callback.done(true);
109                return true;
110            }
111    
112            if (!isAsyncDelayed() || exchange.isTransacted()) {
113                // use synchronous delay (also required if using transactions)
114                try {
115                    delay(delay, exchange);
116                    // then continue routing
117                    return super.process(exchange, callback);
118                } catch (Exception e) {
119                    // exception occurred so we are done
120                    exchange.setException(e);
121                    callback.done(true);
122                    return true;
123                }
124            } else {
125                // asynchronous delay so schedule a process call task
126                ProcessCall call = new ProcessCall(exchange, callback);
127                try {
128                    log.trace("Scheduling delayed task to run in {} millis for exchangeId: {}",
129                            delay, exchange.getExchangeId());
130                    executorService.schedule(call, delay, TimeUnit.MILLISECONDS);
131                    // tell Camel routing engine we continue routing asynchronous
132                    return false;
133                } catch (RejectedExecutionException e) {
134                    if (isCallerRunsWhenRejected()) {
135                        if (!isRunAllowed()) {
136                            exchange.setException(new RejectedExecutionException());
137                        } else {
138                            log.debug("Scheduling rejected task, so letting caller run, delaying at first for {} millis for exchangeId: {}", delay, exchange.getExchangeId());
139                            // let caller run by processing
140                            try {
141                                delay(delay, exchange);
142                            } catch (InterruptedException ie) {
143                                exchange.setException(ie);
144                            }
145                            // then continue routing
146                            return super.process(exchange, callback);
147                        }
148                    } else {
149                        exchange.setException(e);
150                    }
151                    // caller don't run the task so we are done
152                    callback.done(true);
153                    return true;
154                }
155            }
156        }
157    
158        public boolean isAsyncDelayed() {
159            return asyncDelayed;
160        }
161    
162        public void setAsyncDelayed(boolean asyncDelayed) {
163            this.asyncDelayed = asyncDelayed;
164        }
165    
166        public boolean isCallerRunsWhenRejected() {
167            return callerRunsWhenRejected;
168        }
169    
170        public void setCallerRunsWhenRejected(boolean callerRunsWhenRejected) {
171            this.callerRunsWhenRejected = callerRunsWhenRejected;
172        }
173    
174        protected abstract long calculateDelay(Exchange exchange);
175    
176        /**
177         * Delays the given time before continuing.
178         * <p/>
179         * This implementation will block while waiting
180         * 
181         * @param delay the delay time in millis
182         * @param exchange the exchange being processed
183         */
184        protected void delay(long delay, Exchange exchange) throws InterruptedException {
185            // only run is we are started
186            if (!isRunAllowed()) {
187                return;
188            }
189    
190            if (delay < 0) {
191                return;
192            } else {
193                try {
194                    sleep(delay);
195                } catch (InterruptedException e) {
196                    handleSleepInterruptedException(e, exchange);
197                }
198            }
199        }
200    
201        /**
202         * Called when a sleep is interrupted; allows derived classes to handle this case differently
203         */
204        protected void handleSleepInterruptedException(InterruptedException e, Exchange exchange) throws InterruptedException {
205            if (log.isDebugEnabled()) {
206                log.debug("Sleep interrupted, are we stopping? {}", isStopping() || isStopped());
207            }
208            Thread.currentThread().interrupt();
209            throw e;
210        }
211    
212        protected long currentSystemTime() {
213            return System.currentTimeMillis();
214        }
215    
216        private void sleep(long delay) throws InterruptedException {
217            if (delay <= 0) {
218                return;
219            }
220            log.trace("Sleeping for: {} millis", delay);
221            Thread.sleep(delay);
222        }
223    
224        @Override
225        protected void doStart() throws Exception {
226            if (isAsyncDelayed()) {
227                ObjectHelper.notNull(executorService, "executorService", this);
228            }
229            super.doStart();
230        }
231    
232        @Override
233        protected void doShutdown() throws Exception {
234            if (shutdownExecutorService && executorService != null) {
235                camelContext.getExecutorServiceManager().shutdownNow(executorService);
236            }
237            super.doShutdown();
238        }
239    }