001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.text.NumberFormat;
020    import java.util.concurrent.ScheduledExecutorService;
021    import java.util.concurrent.TimeUnit;
022    import java.util.concurrent.atomic.AtomicInteger;
023    
024    import org.apache.camel.CamelContext;
025    import org.apache.camel.Exchange;
026    import org.apache.camel.Processor;
027    import org.apache.camel.support.ServiceSupport;
028    import org.apache.camel.util.CamelLogger;
029    import org.apache.camel.util.ObjectHelper;
030    import org.slf4j.Logger;
031    import org.slf4j.LoggerFactory;
032    
033    /**
034     * A logger for logging message throughput.
035     *
036     * @version 
037     */
038    public class ThroughputLogger extends ServiceSupport implements Processor {
039        private static final Logger LOG = LoggerFactory.getLogger(ThroughputLogger.class);
040    
041        private final AtomicInteger receivedCounter = new AtomicInteger();
042        private NumberFormat numberFormat = NumberFormat.getNumberInstance();
043        private long groupReceivedCount;
044        private boolean groupActiveOnly;
045        private Integer groupSize;
046        private long groupDelay = 1000;
047        private Long groupInterval;
048        private long startTime;
049        private long groupStartTime;
050        private String action = "Received";
051        private CamelContext camelContext;
052        private ScheduledExecutorService logSchedulerService;
053        private CamelLogger log;
054    
055        public ThroughputLogger(CamelLogger log) {
056            this.log = log;
057        }
058    
059        public ThroughputLogger(CamelLogger log, Integer groupSize) {
060            this(log);
061            setGroupSize(groupSize);
062        }
063    
064        public ThroughputLogger(CamelLogger log, CamelContext camelContext, Long groupInterval, Long groupDelay, Boolean groupActiveOnly) {
065            this(log);
066            this.camelContext = camelContext;
067            setGroupInterval(groupInterval);
068            setGroupActiveOnly(groupActiveOnly);
069            if (groupDelay != null) {
070                setGroupDelay(groupDelay);
071            }
072        }
073    
074        @Override
075        public void process(Exchange exchange) {
076            if (startTime == 0) {
077                startTime = System.currentTimeMillis();
078            }
079            int receivedCount = receivedCounter.incrementAndGet();
080    
081            //only process if groupSize is set...otherwise we're in groupInterval mode
082            if (groupSize != null) {
083                if (receivedCount % groupSize == 0) {
084                    log.log(createLogMessage(exchange, receivedCount));
085                }
086            }
087        }
088    
089        public Integer getGroupSize() {
090            return groupSize;
091        }
092    
093        public void setGroupSize(Integer groupSize) {
094            if (groupSize == null || groupSize <= 0) {
095                throw new IllegalArgumentException("groupSize must be positive, was: " + groupSize);
096            }
097            this.groupSize = groupSize;
098        }
099    
100        public Long getGroupInterval() {
101            return groupInterval;
102        }
103    
104        public void setGroupInterval(Long groupInterval) {
105            if (groupInterval == null || groupInterval <= 0) {
106                throw new IllegalArgumentException("groupInterval must be positive, was: " + groupInterval);
107            }
108            this.groupInterval = groupInterval;
109        }
110    
111        public long getGroupDelay() {
112            return groupDelay;
113        }
114    
115        public void setGroupDelay(long groupDelay) {
116            this.groupDelay = groupDelay;
117        }
118    
119        public boolean getGroupActiveOnly() {
120            return groupActiveOnly;
121        }
122    
123        private void setGroupActiveOnly(boolean groupActiveOnly) {
124            this.groupActiveOnly = groupActiveOnly;
125        }
126    
127        public NumberFormat getNumberFormat() {
128            return numberFormat;
129        }
130    
131        public void setNumberFormat(NumberFormat numberFormat) {
132            this.numberFormat = numberFormat;
133        }
134    
135        public String getAction() {
136            return action;
137        }
138    
139        public void setAction(String action) {
140            this.action = action;
141        }
142        
143        @Override
144        public void doStart() throws Exception {
145            // if an interval was specified, create a background thread
146            if (groupInterval != null) {
147                ObjectHelper.notNull(camelContext, "CamelContext", this);
148    
149                logSchedulerService = camelContext.getExecutorServiceManager().newScheduledThreadPool(this, "ThroughputLogger", 1);
150                Runnable scheduledLogTask = new ScheduledLogTask();
151                LOG.info("Scheduling throughput log to run every " + groupInterval + " millis.");
152                // must use fixed rate to have it trigger at every X interval
153                logSchedulerService.scheduleAtFixedRate(scheduledLogTask, groupDelay, groupInterval, TimeUnit.MILLISECONDS);
154            }
155        }
156    
157        @Override
158        public void doStop() throws Exception {
159            if (logSchedulerService != null) {
160                camelContext.getExecutorServiceManager().shutdownNow(logSchedulerService);
161                logSchedulerService = null;
162            }
163        }
164    
165        protected String createLogMessage(Exchange exchange, int receivedCount) {
166            long time = System.currentTimeMillis();
167            if (groupStartTime == 0) {
168                groupStartTime = startTime;
169            }
170    
171            double rate = messagesPerSecond(groupSize, groupStartTime, time);
172            double average = messagesPerSecond(receivedCount, startTime, time);
173    
174            long duration = time - groupStartTime;
175            groupStartTime = time;
176    
177            return getAction() + ": " + receivedCount + " messages so far. Last group took: " + duration
178                    + " millis which is: " + numberFormat.format(rate)
179                    + " messages per second. average: " + numberFormat.format(average);
180        }
181    
182        /**
183         * Background task that logs throughput stats.
184         */
185        private final class ScheduledLogTask implements Runnable {
186    
187            public void run() {
188                // only run if CamelContext has been fully started
189                if (!camelContext.getStatus().isStarted()) {
190                    LOG.trace("ThroughputLogger cannot start because CamelContext({}) has not been started yet", camelContext.getName());
191                    return;
192                }
193    
194                createGroupIntervalLogMessage();
195            }
196        }
197    
198        protected void createGroupIntervalLogMessage() {
199            
200            // this indicates that no messages have been received yet...don't log yet
201            if (startTime == 0) {
202                return;
203            }
204            
205            int receivedCount = receivedCounter.get();
206    
207            // if configured, hide log messages when no new messages have been received
208            if (groupActiveOnly && receivedCount == groupReceivedCount) {
209                return;
210            }
211    
212            long time = System.currentTimeMillis();
213            if (groupStartTime == 0) {
214                groupStartTime = startTime;
215            }
216    
217            long duration = time - groupStartTime;
218            long currentCount = receivedCount - groupReceivedCount;
219            double rate = messagesPerSecond(currentCount, groupStartTime, time);
220            double average = messagesPerSecond(receivedCount, startTime, time);
221    
222            groupStartTime = time;
223            groupReceivedCount = receivedCount;
224    
225            String message = getAction() + ": " + currentCount + " new messages, with total " + receivedCount + " so far. Last group took: " + duration
226                    + " millis which is: " + numberFormat.format(rate)
227                    + " messages per second. average: " + numberFormat.format(average);
228            log.log(message);
229        }
230    
231        protected double messagesPerSecond(long messageCount, long startTime, long endTime) {
232            // timeOneMessage = elapsed / messageCount
233            // messagePerSend = 1000 / timeOneMessage
234            double rate = messageCount * 1000.0;
235            rate /= endTime - startTime;
236            return rate;
237        }
238    
239    }