001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.management; 018 019 import org.apache.camel.AsyncCallback; 020 import org.apache.camel.Exchange; 021 import org.apache.camel.Traceable; 022 import org.apache.camel.api.management.PerformanceCounter; 023 import org.apache.camel.management.mbean.ManagedPerformanceCounter; 024 import org.apache.camel.processor.DelegateAsyncProcessor; 025 import org.apache.camel.util.StopWatch; 026 import org.slf4j.Logger; 027 import org.slf4j.LoggerFactory; 028 029 /** 030 * JMX enabled processor that uses the {@link org.apache.camel.management.mbean.ManagedCounter} for instrumenting 031 * processing of exchanges. 032 * 033 * @version 034 */ 035 public class InstrumentationProcessor extends DelegateAsyncProcessor implements Traceable { 036 037 private static final transient Logger LOG = LoggerFactory.getLogger(InstrumentationProcessor.class); 038 private PerformanceCounter counter; 039 private String type; 040 041 public InstrumentationProcessor() { 042 } 043 044 public InstrumentationProcessor(PerformanceCounter counter) { 045 this.counter = counter; 046 } 047 048 @Override 049 public String toString() { 050 return "Instrumentation" + (type != null ? ":" + type : "") + "[" + processor + "]"; 051 } 052 053 public void setCounter(Object counter) { 054 ManagedPerformanceCounter mpc = null; 055 if (counter instanceof ManagedPerformanceCounter) { 056 mpc = (ManagedPerformanceCounter) counter; 057 } 058 059 if (this.counter instanceof DelegatePerformanceCounter) { 060 ((DelegatePerformanceCounter) this.counter).setCounter(mpc); 061 } else if (mpc != null) { 062 this.counter = mpc; 063 } else if (counter instanceof PerformanceCounter) { 064 this.counter = (PerformanceCounter) counter; 065 } 066 } 067 068 @Override 069 public boolean process(final Exchange exchange, final AsyncCallback callback) { 070 // only record time if stats is enabled 071 final StopWatch watch = (counter != null && counter.isStatisticsEnabled()) ? new StopWatch() : null; 072 073 return super.process(exchange, new AsyncCallback() { 074 public void done(boolean doneSync) { 075 try { 076 // record end time 077 if (watch != null) { 078 recordTime(exchange, watch.stop()); 079 } 080 } finally { 081 // and let the original callback know we are done as well 082 callback.done(doneSync); 083 } 084 } 085 086 @Override 087 public String toString() { 088 return InstrumentationProcessor.this.toString(); 089 } 090 }); 091 } 092 093 protected void recordTime(Exchange exchange, long duration) { 094 if (LOG.isTraceEnabled()) { 095 LOG.trace("{}Recording duration: {} millis for exchange: {}", new Object[]{type != null ? type + ": " : "", duration, exchange}); 096 } 097 098 if (!exchange.isFailed() && exchange.getException() == null) { 099 counter.completedExchange(exchange, duration); 100 } else { 101 counter.failedExchange(exchange); 102 } 103 } 104 105 public String getType() { 106 return type; 107 } 108 109 public void setType(String type) { 110 this.type = type; 111 } 112 113 @Override 114 public String getTraceLabel() { 115 if (processor instanceof Traceable) { 116 return ((Traceable)processor).getTraceLabel(); 117 } else { 118 return ""; 119 } 120 121 } 122 }