001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor;
018
019import java.util.Locale;
020import java.util.concurrent.TimeUnit;
021
022import org.apache.camel.AsyncCallback;
023import org.apache.camel.Exchange;
024import org.apache.camel.Processor;
025import org.apache.camel.Traceable;
026import org.apache.camel.spi.IdAware;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030/**
031 * A <code>SamplingThrottler</code> is a special kind of throttler. It also
032 * limits the number of exchanges sent to a downstream endpoint. It differs from
033 * a normal throttler in that it will not queue exchanges above the threshold
034 * for a given period. Instead these exchanges will be stopped, precluding them
035 * from being processed at all by downstream consumers.
036 * <p/>
037 * This kind of throttling can be useful for taking a sample from
038 * an exchange stream, rough consolidation of noisy and bursty exchange traffic
039 * or where queuing of throttled exchanges is undesirable.
040 *
041 * @version 
042 */
043public class SamplingThrottler extends DelegateAsyncProcessor implements Traceable, IdAware {
044
045    private static final Logger LOG = LoggerFactory.getLogger(SamplingThrottler.class);
046    private String id;
047    private long messageFrequency;
048    private long currentMessageCount;
049    private long samplePeriod;
050    private long periodInMillis;
051    private TimeUnit units;
052    private long timeOfLastExchange;
053    private StopProcessor stopper = new StopProcessor();
054    private final Object calculationLock = new Object();
055    private SampleStats sampled = new SampleStats();
056
057    public SamplingThrottler(Processor processor, long messageFrequency) {
058        super(processor);
059
060        if (messageFrequency <= 0) {
061            throw new IllegalArgumentException("A positive value is required for the sampling message frequency");
062        }
063        this.messageFrequency = messageFrequency;
064    }
065
066    public SamplingThrottler(Processor processor, long samplePeriod, TimeUnit units) {
067        super(processor);
068
069        if (samplePeriod <= 0) {
070            throw new IllegalArgumentException("A positive value is required for the sampling period");
071        }
072        if (units == null) {
073            throw new IllegalArgumentException("A invalid null value was supplied for the units of the sampling period");
074        }
075        this.samplePeriod = samplePeriod;
076        this.units = units;
077        this.periodInMillis = units.toMillis(samplePeriod);
078    }
079
080    @Override
081    public String toString() {
082        if (messageFrequency > 0) {
083            return "SamplingThrottler[1 exchange per: " + messageFrequency + " messages received -> " + getProcessor() + "]";
084        } else {
085            return "SamplingThrottler[1 exchange per: " + samplePeriod + " " + units.toString().toLowerCase(Locale.ENGLISH) + " -> " + getProcessor() + "]";
086        }
087    }
088
089    public String getId() {
090        return id;
091    }
092
093    public void setId(String id) {
094        this.id = id;
095    }
096
097    public String getTraceLabel() {
098        if (messageFrequency > 0) {
099            return "samplingThrottler[1 exchange per: " + messageFrequency + " messages received]";
100        } else {
101            return "samplingThrottler[1 exchange per: " + samplePeriod + " " + units.toString().toLowerCase(Locale.ENGLISH) + "]";
102        }
103    }
104
105    public long getMessageFrequency() {
106        return messageFrequency;
107    }
108
109    public long getSamplePeriod() {
110        return samplePeriod;
111    }
112
113    public TimeUnit getUnits() {
114        return units;
115    }
116
117    @Override
118    public boolean process(Exchange exchange, AsyncCallback callback) {
119        boolean doSend = false;
120
121        synchronized (calculationLock) {
122            
123            if (messageFrequency > 0) {
124                currentMessageCount++;
125                if (currentMessageCount % messageFrequency == 0) {
126                    doSend = true;
127                }
128            } else {
129                long now = System.currentTimeMillis();
130                if (now >= timeOfLastExchange + periodInMillis) {
131                    doSend = true;
132                    if (LOG.isTraceEnabled()) {
133                        LOG.trace(sampled.sample());
134                    }
135                    timeOfLastExchange = now;
136                } else {
137                    if (LOG.isTraceEnabled()) {
138                        LOG.trace(sampled.drop());
139                    }
140                }
141            }
142        }
143
144        if (doSend) {
145            // continue routing
146            return processor.process(exchange, callback);
147        } else {
148            // okay to invoke this synchronously as the stopper
149            // will just set a property
150            try {
151                stopper.process(exchange);
152            } catch (Exception e) {
153                exchange.setException(e);
154            }
155        }
156
157        // we are done synchronously
158        callback.done(true);
159        return true;
160    }
161
162    private static class SampleStats {
163        private long droppedThisPeriod;
164        private long totalDropped;
165        private long totalSampled;
166        private long totalThisPeriod;
167
168        String drop() {
169            droppedThisPeriod++;
170            totalThisPeriod++;
171            totalDropped++;
172            return getDroppedLog();
173        }
174
175        String sample() {
176            totalThisPeriod = 1; // a new period, reset to 1
177            totalSampled++;
178            droppedThisPeriod = 0;
179            return getSampledLog();
180        }
181
182        String getSampledLog() {
183            return String.format("Sampled %d of %d total exchanges", totalSampled, totalSampled + totalDropped);
184        }
185
186        String getDroppedLog() {
187            return String.format("Dropped %d of %d exchanges in this period, totalling %d dropped of %d exchanges overall.",
188                droppedThisPeriod, totalThisPeriod, totalDropped, totalSampled + totalDropped);
189        }
190    }
191
192}