001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.concurrent.ScheduledExecutorService;
020    
021    import org.apache.camel.CamelContext;
022    import org.apache.camel.Exchange;
023    import org.apache.camel.Expression;
024    import org.apache.camel.Processor;
025    import org.apache.camel.RuntimeExchangeException;
026    import org.apache.camel.Traceable;
027    import org.apache.camel.util.ObjectHelper;
028    
029    /**
030     * A <a href="http://camel.apache.org/throttler.html">Throttler</a>
031     * will set a limit on the maximum number of message exchanges which can be sent
032     * to a processor within a specific time period.
033     * <p>
034     * This pattern can be extremely useful if you have some external system which
035     * meters access; such as only allowing 100 requests per second; or if huge load
036     * can cause a particular system to malfunction or to reduce its throughput you
037     * might want to introduce some throttling.
038     * <p>
039     * The limit is obtained by evaluating an {@link Expression} against every
040     * exchange, so it may change while the throttler is running. Exchanges are
041     * booked into consecutive {@link TimeSlot}s; when an exchange is booked into a
042     * slot that has not started yet, {@link #calculateDelay(Exchange)} reports the
043     * time remaining until that slot starts.
044     * 
045     * @version 
046     */
047    public class Throttler extends DelayProcessorSupport implements Traceable {
048        // most recently evaluated limit; stays 0 until calculateDelay has stored a value
049        private volatile long maximumRequestsPerPeriod;
050        private Expression maxRequestsPerPeriodExpression;
051        private long timePeriodMillis = 1000;
052        // slot currently being filled; created lazily and replaced inside nextSlot()
053        private volatile TimeSlot slot;
054    
055        /**
056         * Creates a throttler in front of the given processor.
057         *
058         * @param camelContext the camel context, handed to the super class
059         * @param processor the processor to throttle, handed to the super class
060         * @param maxRequestsPerPeriodExpression expression evaluated per exchange to obtain the limit, must not be null
061         * @param timePeriodMillis length of one time period in milliseconds, must be positive
062         * @param executorService executor service handed to the super class
063         * @param shutdownExecutorService flag handed to the super class along with the executor service
064         * @throws IllegalArgumentException if timePeriodMillis is zero or negative
065         */
066        public Throttler(CamelContext camelContext, Processor processor, Expression maxRequestsPerPeriodExpression, long timePeriodMillis,
067                         ScheduledExecutorService executorService, boolean shutdownExecutorService) {
068            super(camelContext, processor, executorService, shutdownExecutorService);
069    
070            ObjectHelper.notNull(maxRequestsPerPeriodExpression, "maxRequestsPerPeriodExpression");
071            this.maxRequestsPerPeriodExpression = maxRequestsPerPeriodExpression;
072            this.timePeriodMillis = requirePositivePeriod(timePeriodMillis);
073        }
074    
075        @Override
076        public String toString() {
077            return "Throttler[requests: " + maxRequestsPerPeriodExpression + " per: " + timePeriodMillis + " (ms) to: "
078                   + getProcessor() + "]";
079        }
080    
081        /**
082         * Returns a short label describing this throttler, built from the limit expression and the time period.
083         */
084        public String getTraceLabel() {
085            return "throttle[" + maxRequestsPerPeriodExpression + " per: " + timePeriodMillis + "]";
086        }
087    
088        // Properties
089        // -----------------------------------------------------------------------
090    
091        /**
092         * Sets the maximum number of requests per time period expression
093         *
094         * @param maxRequestsPerPeriodExpression the new expression, must not be null (checked the same way as in the constructor)
095         */
096        public void setMaximumRequestsPerPeriodExpression(Expression maxRequestsPerPeriodExpression) {
097            // a null expression would otherwise only surface as a NullPointerException on the next exchange
098            ObjectHelper.notNull(maxRequestsPerPeriodExpression, "maxRequestsPerPeriodExpression");
099            this.maxRequestsPerPeriodExpression = maxRequestsPerPeriodExpression;
100        }
101    
102        /**
103         * Gets the expression used to determine the maximum number of requests per time period.
104         */
105        public Expression getMaximumRequestsPerPeriodExpression() {
106            return maxRequestsPerPeriodExpression;
107        }
108    
109        /**
110         * Gets the length of the time period in milliseconds.
111         */
112        public long getTimePeriodMillis() {
113            return timePeriodMillis;
114        }
115    
116        /**
117         * Gets the current maximum request per period value.
118         */
119        public long getCurrentMaximumRequestsPerPeriod() {
120            return maximumRequestsPerPeriod;
121        }
122    
123        /**
124         * Sets the time period during which the maximum number of requests apply.
125         * A time slot copies the period when it is created, so the new value is used
126         * from the next slot onwards.
127         *
128         * @param timePeriodMillis length of one time period in milliseconds, must be positive
129         * @throws IllegalArgumentException if timePeriodMillis is zero or negative
130         */
131        public void setTimePeriodMillis(long timePeriodMillis) {
132            this.timePeriodMillis = requirePositivePeriod(timePeriodMillis);
133        }
134    
135        // Implementation methods
136        // -----------------------------------------------------------------------
137    
138        /**
139         * Evaluates the limit expression for the exchange, books the exchange into a
140         * time slot and works out how long it has to wait for that slot.
141         *
142         * @param exchange the exchange about to be processed
143         * @return 0 if the assigned slot has already started, otherwise the difference
144         *         between the slot's start time and the current system time; never negative
145         * @throws RuntimeExchangeException if the expression evaluates to null
146         * @throws IllegalStateException if the resulting limit is zero or negative
147         */
148        protected long calculateDelay(Exchange exchange) {
149            // evaluate as Object first to see if we get any result at all
150            Object result = maxRequestsPerPeriodExpression.evaluate(exchange, Object.class);
151            if (result == null) {
152                throw new RuntimeExchangeException("The max requests per period expression was evaluated as null: " + maxRequestsPerPeriodExpression, exchange);
153            }
154    
155            // then must convert value to long; if that yields null the previous limit stays in effect
156            Long longValue = exchange.getContext().getTypeConverter().convertTo(Long.class, result);
157            if (longValue != null) {
158                // log if we changed max period after initial setting
159                if (maximumRequestsPerPeriod > 0 && longValue.longValue() != maximumRequestsPerPeriod) {
160                    log.debug("Throttler changed maximum requests per period from {} to {}", maximumRequestsPerPeriod, longValue);
161                }
162                maximumRequestsPerPeriod = longValue;
163            }
164    
165            if (maximumRequestsPerPeriod <= 0) {
166                throw new IllegalStateException("The maximumRequestsPerPeriod must be a positive number, was: " + maximumRequestsPerPeriod);
167            }
168    
169            TimeSlot assigned = nextSlot();
170            if (assigned.isActive()) {
171                return 0;
172            }
173            // the slot may start between the check above and the clock read below,
174            // so clamp the result instead of reporting a negative delay
175            return Math.max(0L, assigned.startTime - currentSystemTime());
176        }
177    
178        /*
179         * Determine what the next available time slot is for handling an Exchange.
180         * The current slot is replaced when it has no capacity left, or when its period
181         * is over: unused capacity of a finished period must not be added on top of
182         * the capacity of a later period.
183         */
184        protected synchronized TimeSlot nextSlot() {
185            if (slot == null) {
186                slot = new TimeSlot();
187            }
188            if (slot.isFull() || slot.isExpired()) {
189                slot = slot.next();
190            }
191            slot.assign();
192            return slot;
193        }
194    
195        /*
196         * Validates a time period, shared by the constructor and the setter.
197         */
198        private static long requirePositivePeriod(long timePeriodMillis) {
199            if (timePeriodMillis <= 0) {
200                throw new IllegalArgumentException("TimePeriodMillis should be a positive number, was: " + timePeriodMillis);
201            }
202            return timePeriodMillis;
203        }
204    
205        /*
206         * A time slot is capable of handling a number of exchanges within a certain period of time.
207         * Capacity and duration are copied from the throttler when the slot is created;
208         * later changes to the throttler only affect slots created afterwards.
209         */
210        protected class TimeSlot {
211    
212            private volatile long capacity = Throttler.this.maximumRequestsPerPeriod;
213            private final long duration = Throttler.this.timePeriodMillis;
214            private final long startTime;
215    
216            protected TimeSlot() {
217                this(System.currentTimeMillis());
218            }
219    
220            protected TimeSlot(long startTime) {
221                this.startTime = startTime;
222            }
223    
224            /*
225             * Book one exchange into this slot. Not atomic on its own; within this
226             * class it is only called from the synchronized nextSlot().
227             */
228            protected void assign() {
229                capacity--;
230            }
231    
232            /*
233             * Start the next time slot either now or in the future
234             * (no time slots are being created in the past)
235             */
236            protected TimeSlot next() {
237                return new TimeSlot(Math.max(System.currentTimeMillis(), this.startTime + this.duration));
238            }
239    
240            /*
241             * Whether the slot has started, i.e. its start time is not in the future.
242             */
243            protected boolean isActive() {
244                return startTime <= System.currentTimeMillis();
245            }
246    
247            /*
248             * Whether the period covered by this slot is over. A slot that has not
249             * started yet is never expired.
250             */
251            protected boolean isExpired() {
252                return System.currentTimeMillis() - startTime >= duration;
253            }
254    
255            /*
256             * Whether all capacity of this slot has been handed out.
257             */
258            protected boolean isFull() {
259                return capacity <= 0;
260            }
261        }
262    }