001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.io.Serializable;
020    import java.util.Random;
021    
022    import org.apache.camel.Exchange;
023    import org.apache.camel.LoggingLevel;
024    import org.apache.camel.Predicate;
025    import org.apache.camel.util.ObjectHelper;
026    import org.slf4j.Logger;
027    import org.slf4j.LoggerFactory;
028    
029    /**
030     * The policy used to decide how many times to redeliver and the time between
031     * the redeliveries before being sent to a <a
032     * href="http://camel.apache.org/dead-letter-channel.html">Dead Letter
033     * Channel</a>
034     * <p>
035     * The default values are:
036     * <ul>
037     *   <li>maximumRedeliveries = 0</li>
038     *   <li>redeliveryDelay = 1000L (the initial delay)</li>
039     *   <li>maximumRedeliveryDelay = 60 * 1000L</li>
040     *   <li>asyncDelayedRedelivery = false</li>
041     *   <li>backOffMultiplier = 2</li>
042     *   <li>useExponentialBackOff = false</li>
043     *   <li>collisionAvoidanceFactor = 0.15d</li>
044     *   <li>useCollisionAvoidance = false</li>
045     *   <li>retriesExhaustedLogLevel = LoggingLevel.ERROR</li>
046     *   <li>retryAttemptedLogLevel = LoggingLevel.DEBUG</li>
047     *   <li>logRetryAttempted = true</li>
048     *   <li>logRetryStackTrace = false</li>
049     *   <li>logStackTrace = true</li>
050     *   <li>logHandled = false</li>
051     *   <li>logExhausted = true</li>
052     * </ul>
053     * <p/>
054     * Setting the maximumRedeliveries to a negative value such as -1 will then always redeliver (unlimited).
055     * Setting the maximumRedeliveries to 0 will disable redelivery.
056     * <p/>
057     * This policy can be configured either by one of the following two settings:
058     * <ul>
059     *   <li>using conventional options, using all the options defined above</li>
060     *   <li>using delay pattern to declare intervals for delays</li>
061     * </ul>
062     * <p/>
063     * <b>Note:</b> If using delay patterns then the following options is not used (delay, backOffMultiplier, useExponentialBackOff, useCollisionAvoidance)
064     * <p/>
065     * <b>Using delay pattern</b>:
066     * <br/>The delay pattern syntax is: <tt>limit:delay;limit 2:delay 2;limit 3:delay 3;...;limit N:delay N</tt>.
067     * <p/>
068     * How it works is best illustrate with an example with this pattern: <tt>delayPattern=5:1000;10:5000:20:20000</tt>
069     * <br/>The delays will be for attempt in range 0..4 = 0 millis, 5..9 = 1000 millis, 10..19 = 5000 millis, >= 20 = 20000 millis.
070     * <p/>
071     * If you want to set a starting delay, then use 0 as the first limit, eg: <tt>0:1000;5:5000</tt> will use 1 sec delay
072     * until attempt number 5 where it will use 5 seconds going forward.
073     *
074     * @version 
075     */
076    public class RedeliveryPolicy implements Cloneable, Serializable {
077        protected static transient Random randomNumberGenerator;
078        private static final long serialVersionUID = -338222777701473252L;
079        private static final transient Logger LOG = LoggerFactory.getLogger(RedeliveryPolicy.class);
080    
081        protected long redeliveryDelay = 1000L;
082        protected int maximumRedeliveries;
083        protected long maximumRedeliveryDelay = 60 * 1000L;
084        protected double backOffMultiplier = 2;
085        protected boolean useExponentialBackOff;
086        // +/-15% for a 30% spread -cgs
087        protected double collisionAvoidanceFactor = 0.15d;
088        protected boolean useCollisionAvoidance;
089        protected LoggingLevel retriesExhaustedLogLevel = LoggingLevel.ERROR;
090        protected LoggingLevel retryAttemptedLogLevel = LoggingLevel.DEBUG;
091        protected boolean logStackTrace = true;
092        protected boolean logRetryStackTrace;
093        protected boolean logHandled;
094        protected boolean logContinued;
095        protected boolean logExhausted = true;
096        protected boolean logRetryAttempted = true;
097        protected String delayPattern;
098        protected boolean asyncDelayedRedelivery;
099    
100        public RedeliveryPolicy() {
101        }
102    
103        @Override
104        public String toString() {
105            return "RedeliveryPolicy[maximumRedeliveries=" + maximumRedeliveries
106                + ", redeliveryDelay=" + redeliveryDelay
107                + ", maximumRedeliveryDelay=" + maximumRedeliveryDelay
108                + ", asyncDelayedRedelivery=" + asyncDelayedRedelivery
109                + ", retriesExhaustedLogLevel=" + retriesExhaustedLogLevel
110                + ", retryAttemptedLogLevel=" + retryAttemptedLogLevel
111                + ", logRetryAttempted=" + logRetryAttempted
112                + ", logStackTrace=" + logStackTrace
113                + ", logRetryStackTrace=" + logRetryStackTrace
114                + ", logHandled=" + logHandled
115                + ", logContinued=" + logContinued
116                + ", logExhausted=" + logExhausted
117                + ", useExponentialBackOff="  + useExponentialBackOff
118                + ", backOffMultiplier=" + backOffMultiplier
119                + ", useCollisionAvoidance=" + useCollisionAvoidance
120                + ", collisionAvoidanceFactor=" + collisionAvoidanceFactor
121                + ", delayPattern=" + delayPattern + "]";
122        }
123    
124        public RedeliveryPolicy copy() {
125            try {
126                return (RedeliveryPolicy)clone();
127            } catch (CloneNotSupportedException e) {
128                throw new RuntimeException("Could not clone: " + e, e);
129            }
130        }
131    
132        /**
133         * Returns true if the policy decides that the message exchange should be
134         * redelivered.
135         *
136         * @param exchange  the current exchange
137         * @param redeliveryCounter  the current retry counter
138         * @param retryWhile  an optional predicate to determine if we should redeliver or not
139         * @return true to redeliver, false to stop
140         */
141        public boolean shouldRedeliver(Exchange exchange, int redeliveryCounter, Predicate retryWhile) {
142            // predicate is always used if provided
143            if (retryWhile != null) {
144                return retryWhile.matches(exchange);
145            }
146    
147            if (getMaximumRedeliveries() < 0) {
148                // retry forever if negative value
149                return true;
150            }
151            // redeliver until we hit the max
152            return redeliveryCounter <= getMaximumRedeliveries();
153        }
154    
155    
156        /**
157         * Calculates the new redelivery delay based on the last one and then <b>sleeps</b> for the necessary amount of time.
158         * <p/>
159         * This implementation will block while sleeping.
160         *
161         * @param redeliveryDelay  previous redelivery delay
162         * @param redeliveryCounter  number of previous redelivery attempts
163         * @return the calculate delay
164         * @throws InterruptedException is thrown if the sleep is interrupted likely because of shutdown
165         */
166        public long sleep(long redeliveryDelay, int redeliveryCounter) throws InterruptedException {
167            redeliveryDelay = calculateRedeliveryDelay(redeliveryDelay, redeliveryCounter);
168    
169            if (redeliveryDelay > 0) {
170                sleep(redeliveryDelay);
171            }
172            return redeliveryDelay;
173        }
174    
175        /**
176         * Sleeps for the given delay
177         *
178         * @param redeliveryDelay  the delay
179         * @throws InterruptedException is thrown if the sleep is interrupted likely because of shutdown
180         */
181        public void sleep(long redeliveryDelay) throws InterruptedException {
182            LOG.debug("Sleeping for: {} millis until attempting redelivery", redeliveryDelay);
183            Thread.sleep(redeliveryDelay);
184        }
185    
186        /**
187         * Calculates the new redelivery delay based on the last one
188         *
189         * @param previousDelay  previous redelivery delay
190         * @param redeliveryCounter  number of previous redelivery attempts
191         * @return the calculate delay
192         */
193        public long calculateRedeliveryDelay(long previousDelay, int redeliveryCounter) {
194            if (ObjectHelper.isNotEmpty(delayPattern)) {
195                // calculate delay using the pattern
196                return calculateRedeliverDelayUsingPattern(delayPattern, redeliveryCounter);
197            }
198    
199            // calculate the delay using the conventional parameters
200            long redeliveryDelayResult;
201            if (previousDelay == 0) {
202                redeliveryDelayResult = redeliveryDelay;
203            } else if (useExponentialBackOff && backOffMultiplier > 1) {
204                redeliveryDelayResult = Math.round(backOffMultiplier * previousDelay);
205            } else {
206                redeliveryDelayResult = previousDelay;
207            }
208    
209            if (useCollisionAvoidance) {
210    
211                /*
212                 * First random determines +/-, second random determines how far to
213                 * go in that direction. -cgs
214                 */
215                Random random = getRandomNumberGenerator();
216                double variance = (random.nextBoolean() ? collisionAvoidanceFactor : -collisionAvoidanceFactor)
217                                  * random.nextDouble();
218                redeliveryDelayResult += redeliveryDelayResult * variance;
219            }
220    
221            // ensure the calculated result is not bigger than the max delay (if configured)
222            if (maximumRedeliveryDelay > 0 && redeliveryDelayResult > maximumRedeliveryDelay) {
223                redeliveryDelayResult = maximumRedeliveryDelay;
224            }
225    
226            return redeliveryDelayResult;
227        }
228    
229        /**
230         * Calculates the delay using the delay pattern
231         */
232        protected static long calculateRedeliverDelayUsingPattern(String delayPattern, int redeliveryCounter) {
233            String[] groups = delayPattern.split(";");
234            // find the group where the redelivery counter matches
235            long answer = 0;
236            for (String group : groups) {
237                long delay = Long.valueOf(ObjectHelper.after(group, ":"));
238                int count = Integer.valueOf(ObjectHelper.before(group, ":"));
239                if (count > redeliveryCounter) {
240                    break;
241                } else {
242                    answer = delay;
243                }
244            }
245    
246            return answer;
247        }
248    
249        // Builder methods
250        // -------------------------------------------------------------------------
251    
252        /**
253         * Sets the initial redelivery delay in milliseconds
254         *
255         * @deprecated will be removed in the near future. Instead use {@link #redeliveryDelay(long)} instead
256         */
257        @Deprecated
258        public RedeliveryPolicy redeliverDelay(long delay) {
259            return redeliveryDelay(delay);
260        }
261    
262        /**
263         * Sets the initial redelivery delay in milliseconds
264         */
265        public RedeliveryPolicy redeliveryDelay(long delay) {
266            setRedeliveryDelay(delay);
267            return this;
268        }
269    
270        /**
271         * Sets the maximum number of times a message exchange will be redelivered
272         */
273        public RedeliveryPolicy maximumRedeliveries(int maximumRedeliveries) {
274            setMaximumRedeliveries(maximumRedeliveries);
275            return this;
276        }
277    
278        /**
279         * Enables collision avoidance which adds some randomization to the backoff
280         * timings to reduce contention probability
281         */
282        public RedeliveryPolicy useCollisionAvoidance() {
283            setUseCollisionAvoidance(true);
284            return this;
285        }
286    
287        /**
288         * Enables exponential backoff using the {@link #getBackOffMultiplier()} to
289         * increase the time between retries
290         */
291        public RedeliveryPolicy useExponentialBackOff() {
292            setUseExponentialBackOff(true);
293            return this;
294        }
295    
296        /**
297         * Enables exponential backoff and sets the multiplier used to increase the
298         * delay between redeliveries
299         */
300        public RedeliveryPolicy backOffMultiplier(double multiplier) {
301            useExponentialBackOff();
302            setBackOffMultiplier(multiplier);
303            return this;
304        }
305    
306        /**
307         * Enables collision avoidance and sets the percentage used
308         */
309        public RedeliveryPolicy collisionAvoidancePercent(double collisionAvoidancePercent) {
310            useCollisionAvoidance();
311            setCollisionAvoidancePercent(collisionAvoidancePercent);
312            return this;
313        }
314    
315        /**
316         * Sets the maximum redelivery delay if using exponential back off.
317         * Use -1 if you wish to have no maximum
318         */
319        public RedeliveryPolicy maximumRedeliveryDelay(long maximumRedeliveryDelay) {
320            setMaximumRedeliveryDelay(maximumRedeliveryDelay);
321            return this;
322        }
323    
324        /**
325         * Sets the logging level to use for log messages when retries have been exhausted.
326         */
327        public RedeliveryPolicy retriesExhaustedLogLevel(LoggingLevel retriesExhaustedLogLevel) {
328            setRetriesExhaustedLogLevel(retriesExhaustedLogLevel);
329            return this;
330        }    
331    
332        /**
333         * Sets the logging level to use for log messages when retries are attempted.
334         */    
335        public RedeliveryPolicy retryAttemptedLogLevel(LoggingLevel retryAttemptedLogLevel) {
336            setRetryAttemptedLogLevel(retryAttemptedLogLevel);
337            return this;
338        }
339    
340        /**
341         * Sets whether to log retry attempts
342         */
343        public RedeliveryPolicy logRetryAttempted(boolean logRetryAttempted) {
344            setLogRetryAttempted(logRetryAttempted);
345            return this;
346        }
347    
348        /**
349         * Sets whether to log stacktrace for failed messages.
350         */
351        public RedeliveryPolicy logStackTrace(boolean logStackTrace) {
352            setLogStackTrace(logStackTrace);
353            return this;
354        }
355    
356        /**
357         * Sets whether to log stacktrace for failed redelivery attempts
358         */
359        public RedeliveryPolicy logRetryStackTrace(boolean logRetryStackTrace) {
360            setLogRetryStackTrace(logRetryStackTrace);
361            return this;
362        }
363    
364        /**
365         * Sets whether to log errors even if its handled
366         */
367        public RedeliveryPolicy logHandled(boolean logHandled) {
368            setLogHandled(logHandled);
369            return this;
370        }
371    
372        /**
373         * Sets whether to log exhausted errors
374         */
375        public RedeliveryPolicy logExhausted(boolean logExhausted) {
376            setLogExhausted(logExhausted);
377            return this;
378        }
379    
380        /**
381         * Sets the delay pattern with delay intervals.
382         */
383        public RedeliveryPolicy delayPattern(String delayPattern) {
384            setDelayPattern(delayPattern);
385            return this;
386        }
387    
388        /**
389         * Disables redelivery by setting maximum redeliveries to 0.
390         */
391        public RedeliveryPolicy disableRedelivery() {
392            setMaximumRedeliveries(0);
393            return this;
394        }
395    
396        /**
397         * Allow asynchronous delayed redelivery.
398         *
399         * @see #setAsyncDelayedRedelivery(boolean)
400         */
401        public RedeliveryPolicy asyncDelayedRedelivery() {
402            setAsyncDelayedRedelivery(true);
403            return this;
404        }
405    
406        // Properties
407        // -------------------------------------------------------------------------
408    
409        /**
410         * @deprecated will be removed in the near future. Instead use {@link #getRedeliveryDelay()}
411         */
412        @Deprecated
413        public long getRedeliverDelay() {
414            return getRedeliveryDelay();
415        }
416    
417        /**
418         * @deprecated will be removed in the near future. Instead use {@link #setRedeliveryDelay(long)}
419         */
420        @Deprecated
421        public void setRedeliverDelay(long redeliveryDelay) {
422            setRedeliveryDelay(redeliveryDelay);
423        }
424        
425        public long getRedeliveryDelay() {
426            return redeliveryDelay;
427        }
428    
429        /**
430         * Sets the initial redelivery delay in milliseconds
431         */
432        public void setRedeliveryDelay(long redeliverDelay) {
433            this.redeliveryDelay = redeliverDelay;
434            // if max enabled then also set max to this value in case max was too low
435            if (maximumRedeliveryDelay > 0 && redeliverDelay > maximumRedeliveryDelay) {
436                this.maximumRedeliveryDelay = redeliverDelay;
437            }
438        }
439    
440        public double getBackOffMultiplier() {
441            return backOffMultiplier;
442        }
443    
444        /**
445         * Sets the multiplier used to increase the delay between redeliveries if
446         * {@link #setUseExponentialBackOff(boolean)} is enabled
447         */
448        public void setBackOffMultiplier(double backOffMultiplier) {
449            this.backOffMultiplier = backOffMultiplier;
450        }
451    
452        public long getCollisionAvoidancePercent() {
453            return Math.round(collisionAvoidanceFactor * 100);
454        }
455    
456        /**
457         * Sets the percentage used for collision avoidance if enabled via
458         * {@link #setUseCollisionAvoidance(boolean)}
459         */
460        public void setCollisionAvoidancePercent(double collisionAvoidancePercent) {
461            this.collisionAvoidanceFactor = collisionAvoidancePercent * 0.01d;
462        }
463    
464        public double getCollisionAvoidanceFactor() {
465            return collisionAvoidanceFactor;
466        }
467    
468        /**
469         * Sets the factor used for collision avoidance if enabled via
470         * {@link #setUseCollisionAvoidance(boolean)}
471         */
472        public void setCollisionAvoidanceFactor(double collisionAvoidanceFactor) {
473            this.collisionAvoidanceFactor = collisionAvoidanceFactor;
474        }
475    
476        public int getMaximumRedeliveries() {
477            return maximumRedeliveries;
478        }
479    
480        /**
481         * Sets the maximum number of times a message exchange will be redelivered.
482         * Setting a negative value will retry forever.
483         */
484        public void setMaximumRedeliveries(int maximumRedeliveries) {
485            this.maximumRedeliveries = maximumRedeliveries;
486        }
487    
488        public long getMaximumRedeliveryDelay() {
489            return maximumRedeliveryDelay;
490        }
491    
492        /**
493         * Sets the maximum redelivery delay.
494         * Use -1 if you wish to have no maximum
495         */
496        public void setMaximumRedeliveryDelay(long maximumRedeliveryDelay) {
497            this.maximumRedeliveryDelay = maximumRedeliveryDelay;
498        }
499    
500        public boolean isUseCollisionAvoidance() {
501            return useCollisionAvoidance;
502        }
503    
504        /**
505         * Enables/disables collision avoidance which adds some randomization to the
506         * backoff timings to reduce contention probability
507         */
508        public void setUseCollisionAvoidance(boolean useCollisionAvoidance) {
509            this.useCollisionAvoidance = useCollisionAvoidance;
510        }
511    
512        public boolean isUseExponentialBackOff() {
513            return useExponentialBackOff;
514        }
515    
516        /**
517         * Enables/disables exponential backoff using the
518         * {@link #getBackOffMultiplier()} to increase the time between retries
519         */
520        public void setUseExponentialBackOff(boolean useExponentialBackOff) {
521            this.useExponentialBackOff = useExponentialBackOff;
522        }
523    
524        protected static synchronized Random getRandomNumberGenerator() {
525            if (randomNumberGenerator == null) {
526                randomNumberGenerator = new Random();
527            }
528            return randomNumberGenerator;
529        }
530    
531        /**
532         * Sets the logging level to use for log messages when retries have been exhausted.
533         */    
534        public void setRetriesExhaustedLogLevel(LoggingLevel retriesExhaustedLogLevel) {
535            this.retriesExhaustedLogLevel = retriesExhaustedLogLevel;        
536        }
537        
538        public LoggingLevel getRetriesExhaustedLogLevel() {
539            return retriesExhaustedLogLevel;
540        }
541    
542        /**
543         * Sets the logging level to use for log messages when retries are attempted.
544         */    
545        public void setRetryAttemptedLogLevel(LoggingLevel retryAttemptedLogLevel) {
546            this.retryAttemptedLogLevel = retryAttemptedLogLevel;
547        }
548    
549        public LoggingLevel getRetryAttemptedLogLevel() {
550            return retryAttemptedLogLevel;
551        }
552    
553        public String getDelayPattern() {
554            return delayPattern;
555        }
556    
557        /**
558         * Sets an optional delay pattern to use instead of fixed delay.
559         */
560        public void setDelayPattern(String delayPattern) {
561            this.delayPattern = delayPattern;
562        }
563    
564        public boolean isLogStackTrace() {
565            return logStackTrace;
566        }
567    
568        /**
569         * Sets whether stack traces should be logged or not
570         */
571        public void setLogStackTrace(boolean logStackTrace) {
572            this.logStackTrace = logStackTrace;
573        }
574    
575        public boolean isLogRetryStackTrace() {
576            return logRetryStackTrace;
577        }
578    
579        /**
580         * Sets whether stack traces should be logged or not
581         */
582        public void setLogRetryStackTrace(boolean logRetryStackTrace) {
583            this.logRetryStackTrace = logRetryStackTrace;
584        }
585    
586        public boolean isLogHandled() {
587            return logHandled;
588        }
589    
590        /**
591         * Sets whether errors should be logged even if its handled
592         */
593        public void setLogHandled(boolean logHandled) {
594            this.logHandled = logHandled;
595        }
596    
597        public boolean isLogContinued() {
598            return logContinued;
599        }
600    
601        /**
602         * Sets whether errors should be logged even if its continued
603         */
604        public void setLogContinued(boolean logContinued) {
605            this.logContinued = logContinued;
606        }
607    
608        public boolean isLogRetryAttempted() {
609            return logRetryAttempted;
610        }
611    
612        /**
613         * Sets whether retry attempts should be logged or not
614         */
615        public void setLogRetryAttempted(boolean logRetryAttempted) {
616            this.logRetryAttempted = logRetryAttempted;
617        }
618    
619        public boolean isLogExhausted() {
620            return logExhausted;
621        }
622    
623        /**
624         * Sets whether exhausted exceptions should be logged or not
625         */
626        public void setLogExhausted(boolean logExhausted) {
627            this.logExhausted = logExhausted;
628        }
629    
630        public boolean isAsyncDelayedRedelivery() {
631            return asyncDelayedRedelivery;
632        }
633    
634        /**
635         * Sets whether asynchronous delayed redelivery is allowed.
636         * <p/>
637         * This is disabled by default.
638         * <p/>
639         * When enabled it allows Camel to schedule a future task for delayed
640         * redelivery which prevents current thread from blocking while waiting.
641         * <p/>
642         * Exchange which is transacted will however always use synchronous delayed redelivery
643         * because the transaction must execute in the same thread context.
644         *
645         * @param asyncDelayedRedelivery whether asynchronous delayed redelivery is allowed
646         */
647        public void setAsyncDelayedRedelivery(boolean asyncDelayedRedelivery) {
648            this.asyncDelayedRedelivery = asyncDelayedRedelivery;
649        }
650    }