001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.text.NumberFormat; 020 import java.util.concurrent.ScheduledExecutorService; 021 import java.util.concurrent.TimeUnit; 022 import java.util.concurrent.atomic.AtomicInteger; 023 024 import org.apache.camel.CamelContext; 025 import org.apache.camel.Exchange; 026 import org.apache.camel.Processor; 027 import org.apache.camel.support.ServiceSupport; 028 import org.apache.camel.util.CamelLogger; 029 import org.apache.camel.util.ObjectHelper; 030 import org.slf4j.Logger; 031 import org.slf4j.LoggerFactory; 032 033 /** 034 * A logger for logging message throughput. 035 * 036 * @version 037 */ 038 public class ThroughputLogger extends ServiceSupport implements Processor { 039 private static final Logger LOG = LoggerFactory.getLogger(ThroughputLogger.class); 040 041 private final AtomicInteger receivedCounter = new AtomicInteger(); 042 private NumberFormat numberFormat = NumberFormat.getNumberInstance(); 043 private long groupReceivedCount; 044 private boolean groupActiveOnly; 045 private Integer groupSize; 046 private long groupDelay = 1000; 047 private Long groupInterval; 048 private long startTime; 049 private long groupStartTime; 050 private String action = "Received"; 051 private CamelContext camelContext; 052 private ScheduledExecutorService logSchedulerService; 053 private CamelLogger log; 054 055 public ThroughputLogger(CamelLogger log) { 056 this.log = log; 057 } 058 059 public ThroughputLogger(CamelLogger log, Integer groupSize) { 060 this(log); 061 setGroupSize(groupSize); 062 } 063 064 public ThroughputLogger(CamelLogger log, CamelContext camelContext, Long groupInterval, Long groupDelay, Boolean groupActiveOnly) { 065 this(log); 066 this.camelContext = camelContext; 067 setGroupInterval(groupInterval); 068 setGroupActiveOnly(groupActiveOnly); 069 if (groupDelay != null) { 070 setGroupDelay(groupDelay); 071 } 072 } 073 074 @Override 075 public void process(Exchange exchange) { 076 if (startTime == 0) { 077 startTime = System.currentTimeMillis(); 078 } 079 int receivedCount = receivedCounter.incrementAndGet(); 080 081 //only process if groupSize is set...otherwise we're in groupInterval mode 082 if (groupSize != null) { 083 if (receivedCount % groupSize == 0) { 084 log.log(createLogMessage(exchange, receivedCount)); 085 } 086 } 087 } 088 089 public Integer getGroupSize() { 090 return groupSize; 091 } 092 093 public void setGroupSize(Integer groupSize) { 094 if (groupSize == null || groupSize <= 0) { 095 throw new IllegalArgumentException("groupSize must be positive, was: " + groupSize); 096 } 097 this.groupSize = groupSize; 098 } 099 100 public Long getGroupInterval() { 101 return groupInterval; 102 } 103 104 public void setGroupInterval(Long groupInterval) { 105 if (groupInterval == null || groupInterval <= 0) { 106 throw new IllegalArgumentException("groupInterval must be positive, was: " + groupInterval); 107 } 108 this.groupInterval = groupInterval; 109 } 110 111 public long getGroupDelay() { 112 return groupDelay; 113 } 114 115 public void setGroupDelay(long groupDelay) { 116 this.groupDelay = groupDelay; 117 } 118 119 public boolean getGroupActiveOnly() { 120 return groupActiveOnly; 121 } 122 123 private void setGroupActiveOnly(boolean groupActiveOnly) { 124 this.groupActiveOnly = groupActiveOnly; 125 } 126 127 public NumberFormat getNumberFormat() { 128 return numberFormat; 129 } 130 131 public void setNumberFormat(NumberFormat numberFormat) { 132 this.numberFormat = numberFormat; 133 } 134 135 public String getAction() { 136 return action; 137 } 138 139 public void setAction(String action) { 140 this.action = action; 141 } 142 143 @Override 144 public void doStart() throws Exception { 145 // if an interval was specified, create a background thread 146 if (groupInterval != null) { 147 ObjectHelper.notNull(camelContext, "CamelContext", this); 148 149 logSchedulerService = camelContext.getExecutorServiceManager().newScheduledThreadPool(this, "ThroughputLogger", 1); 150 Runnable scheduledLogTask = new ScheduledLogTask(); 151 LOG.info("Scheduling throughput log to run every " + groupInterval + " millis."); 152 // must use fixed rate to have it trigger at every X interval 153 logSchedulerService.scheduleAtFixedRate(scheduledLogTask, groupDelay, groupInterval, TimeUnit.MILLISECONDS); 154 } 155 } 156 157 @Override 158 public void doStop() throws Exception { 159 if (logSchedulerService != null) { 160 camelContext.getExecutorServiceManager().shutdownNow(logSchedulerService); 161 logSchedulerService = null; 162 } 163 } 164 165 protected String createLogMessage(Exchange exchange, int receivedCount) { 166 long time = System.currentTimeMillis(); 167 if (groupStartTime == 0) { 168 groupStartTime = startTime; 169 } 170 171 double rate = messagesPerSecond(groupSize, groupStartTime, time); 172 double average = messagesPerSecond(receivedCount, startTime, time); 173 174 long duration = time - groupStartTime; 175 groupStartTime = time; 176 177 return getAction() + ": " + receivedCount + " messages so far. Last group took: " + duration 178 + " millis which is: " + numberFormat.format(rate) 179 + " messages per second. average: " + numberFormat.format(average); 180 } 181 182 /** 183 * Background task that logs throughput stats. 184 */ 185 private final class ScheduledLogTask implements Runnable { 186 187 public void run() { 188 // only run if CamelContext has been fully started 189 if (!camelContext.getStatus().isStarted()) { 190 LOG.trace("ThroughputLogger cannot start because CamelContext({}) has not been started yet", camelContext.getName()); 191 return; 192 } 193 194 createGroupIntervalLogMessage(); 195 } 196 } 197 198 protected void createGroupIntervalLogMessage() { 199 200 // this indicates that no messages have been received yet...don't log yet 201 if (startTime == 0) { 202 return; 203 } 204 205 int receivedCount = receivedCounter.get(); 206 207 // if configured, hide log messages when no new messages have been received 208 if (groupActiveOnly && receivedCount == groupReceivedCount) { 209 return; 210 } 211 212 long time = System.currentTimeMillis(); 213 if (groupStartTime == 0) { 214 groupStartTime = startTime; 215 } 216 217 long duration = time - groupStartTime; 218 long currentCount = receivedCount - groupReceivedCount; 219 double rate = messagesPerSecond(currentCount, groupStartTime, time); 220 double average = messagesPerSecond(receivedCount, startTime, time); 221 222 groupStartTime = time; 223 groupReceivedCount = receivedCount; 224 225 String message = getAction() + ": " + currentCount + " new messages, with total " + receivedCount + " so far. Last group took: " + duration 226 + " millis which is: " + numberFormat.format(rate) 227 + " messages per second. average: " + numberFormat.format(average); 228 log.log(message); 229 } 230 231 protected double messagesPerSecond(long messageCount, long startTime, long endTime) { 232 // timeOneMessage = elapsed / messageCount 233 // messagePerSend = 1000 / timeOneMessage 234 double rate = messageCount * 1000.0; 235 rate /= endTime - startTime; 236 return rate; 237 } 238 239 }