/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.processor;

import java.util.Locale;
import java.util.concurrent.TimeUnit;

import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A <code>SamplingThrottler</code> is a throttler that lets only a sample of
 * the incoming exchanges continue to the wrapped processor. Unlike a regular
 * throttler it never queues the exchanges that exceed the threshold; they are
 * handed to a {@link StopProcessor} instead and are not forwarded downstream.
 * <p/>
 * Two sampling modes exist, selected by the constructor that is used:
 * <ul>
 *   <li>by count: every <i>n</i>-th exchange received is forwarded;</li>
 *   <li>by time: at most one exchange is forwarded per sample period.</li>
 * </ul>
 * <p/>
 * This is useful for taking a sample from an exchange stream, for roughly
 * consolidating noisy and bursty traffic, or wherever queuing throttled
 * exchanges is undesirable.
 *
 * @version
 */
public class SamplingThrottler extends DelegateAsyncProcessor implements Traceable, IdAware {

    private static final Logger LOG = LoggerFactory.getLogger(SamplingThrottler.class);

    private String id;

    // count based sampling; messageFrequency stays 0 when sampling by time
    private long messageFrequency;
    private long currentMessageCount;

    // time based sampling
    private long samplePeriod;
    private long periodInMillis;
    private TimeUnit units;
    private long timeOfLastExchange;

    private final StopProcessor stopper = new StopProcessor();
    // guards the counters, the timestamp and the trace statistics
    private final Object calculationLock = new Object();
    private final SampleStats sampled = new SampleStats();

    /**
     * Creates a throttler that forwards one exchange out of every
     * <tt>messageFrequency</tt> exchanges received.
     *
     * @param processor        the processor sampled exchanges are routed to
     * @param messageFrequency the number of received exchanges per forwarded exchange
     * @throws IllegalArgumentException if <tt>messageFrequency</tt> is not positive
     */
    public SamplingThrottler(Processor processor, long messageFrequency) {
        super(processor);

        if (messageFrequency < 1) {
            throw new IllegalArgumentException("A positive value is required for the sampling message frequency");
        }
        this.messageFrequency = messageFrequency;
    }

    /**
     * Creates a throttler that forwards at most one exchange per sample period.
     *
     * @param processor    the processor sampled exchanges are routed to
     * @param samplePeriod the length of the sample period, expressed in <tt>units</tt>
     * @param units        the time unit of <tt>samplePeriod</tt>
     * @throws IllegalArgumentException if <tt>samplePeriod</tt> is not positive
     *                                  or <tt>units</tt> is <tt>null</tt>
     */
    public SamplingThrottler(Processor processor, long samplePeriod, TimeUnit units) {
        super(processor);

        if (samplePeriod < 1) {
            throw new IllegalArgumentException("A positive value is required for the sampling period");
        }
        if (units == null) {
            throw new IllegalArgumentException("A invalid null value was supplied for the units of the sampling period");
        }
        this.samplePeriod = samplePeriod;
        this.units = units;
        this.periodInMillis = units.toMillis(samplePeriod);
    }

    @Override
    public String toString() {
        return "SamplingThrottler[" + describeRate() + " -> " + getProcessor() + "]";
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getTraceLabel() {
        return "samplingThrottler[" + describeRate() + "]";
    }

    /**
     * Returns the configured message frequency; it is zero when this
     * throttler was created with a sample period instead.
     */
    public long getMessageFrequency() {
        return messageFrequency;
    }

    /**
     * Returns the configured sample period; it is zero when this throttler
     * was created with a message frequency instead.
     */
    public long getSamplePeriod() {
        return samplePeriod;
    }

    /**
     * Returns the time unit of the sample period; it is <tt>null</tt> when
     * this throttler was created with a message frequency instead.
     */
    public TimeUnit getUnits() {
        return units;
    }

    /**
     * Forwards the exchange to the wrapped processor when it is selected as a
     * sample; otherwise hands it to the stopper and completes synchronously.
     *
     * @param exchange the exchange to sample
     * @param callback the callback to notify once processing is done
     * @return the result of the wrapped processor for a sampled exchange,
     *         or <tt>true</tt> (done synchronously) for a stopped one
     */
    @Override
    public boolean process(Exchange exchange, AsyncCallback callback) {
        if (isSampled()) {
            // continue routing
            return processor.process(exchange, callback);
        }

        // invoked synchronously; the original author notes that the stopper
        // will just set a property on the exchange
        try {
            stopper.process(exchange);
        } catch (Exception e) {
            exchange.setException(e);
        }

        // we are done synchronously
        callback.done(true);
        return true;
    }

    /**
     * Decides, while holding the calculation lock, whether the exchange that
     * is currently being processed should be forwarded. Updates the message
     * counter or the timestamp of the last forwarded exchange as a side effect.
     */
    private boolean isSampled() {
        synchronized (calculationLock) {
            if (messageFrequency > 0) {
                // count based: every messageFrequency-th exchange goes through
                currentMessageCount++;
                return currentMessageCount % messageFrequency == 0;
            }

            // time based: one exchange goes through once the period has elapsed
            long now = System.currentTimeMillis();
            boolean periodElapsed = now >= timeOfLastExchange + periodInMillis;
            if (periodElapsed) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace(sampled.sample());
                }
                timeOfLastExchange = now;
            } else if (LOG.isTraceEnabled()) {
                LOG.trace(sampled.drop());
            }
            return periodElapsed;
        }
    }

    /**
     * Builds the rate description shared by {@link #toString()} and
     * {@link #getTraceLabel()}.
     */
    private String describeRate() {
        if (messageFrequency > 0) {
            return "1 exchange per: " + messageFrequency + " messages received";
        }
        return "1 exchange per: " + samplePeriod + " " + units.toString().toLowerCase(Locale.ENGLISH);
    }

    /**
     * Counters behind the trace log messages of the time based sampling.
     * They are only updated when one of the methods below is invoked.
     */
    private static class SampleStats {
        private long droppedThisPeriod;
        private long totalDropped;
        private long totalSampled;
        private long totalThisPeriod;

        /**
         * Records a dropped exchange and returns the matching log message.
         */
        String drop() {
            totalDropped++;
            totalThisPeriod++;
            droppedThisPeriod++;
            return getDroppedLog();
        }

        /**
         * Records a sampled exchange, which starts a new period, and returns
         * the matching log message.
         */
        String sample() {
            totalSampled++;
            // a new period begins with the sampled exchange itself
            totalThisPeriod = 1;
            droppedThisPeriod = 0;
            return getSampledLog();
        }

        String getSampledLog() {
            long overall = totalSampled + totalDropped;
            return String.format("Sampled %d of %d total exchanges", totalSampled, overall);
        }

        String getDroppedLog() {
            long overall = totalSampled + totalDropped;
            return String.format("Dropped %d of %d exchanges in this period, totalling %d dropped of %d exchanges overall.",
                droppedThisPeriod, totalThisPeriod, totalDropped, overall);
        }
    }

}