001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.management;
018
019import org.apache.camel.AsyncCallback;
020import org.apache.camel.Exchange;
021import org.apache.camel.api.management.PerformanceCounter;
022import org.apache.camel.management.mbean.ManagedPerformanceCounter;
023import org.apache.camel.processor.DelegateAsyncProcessor;
024import org.apache.camel.util.StopWatch;
025import org.slf4j.Logger;
026import org.slf4j.LoggerFactory;
027
028/**
029 * JMX enabled processor that uses the {@link org.apache.camel.management.mbean.ManagedCounter} for instrumenting
030 * processing of exchanges.
031 *
032 * @version 
033 */
034public class InstrumentationProcessor extends DelegateAsyncProcessor {
035
036    private static final Logger LOG = LoggerFactory.getLogger(InstrumentationProcessor.class);
037    private PerformanceCounter counter;
038    private String type;
039
040    public InstrumentationProcessor() {
041    }
042
043    public InstrumentationProcessor(PerformanceCounter counter) {
044        this.counter = counter;
045    }
046
047    @Override
048    public String toString() {
049        return "Instrumentation" + (type != null ? ":" + type : "") + "[" + processor + "]";
050    }
051
052    public void setCounter(Object counter) {
053        ManagedPerformanceCounter mpc = null;
054        if (counter instanceof ManagedPerformanceCounter) {
055            mpc = (ManagedPerformanceCounter) counter;
056        }
057
058        if (this.counter instanceof DelegatePerformanceCounter) {
059            ((DelegatePerformanceCounter) this.counter).setCounter(mpc);
060        } else if (mpc != null) {
061            this.counter = mpc;
062        } else if (counter instanceof PerformanceCounter) {
063            this.counter = (PerformanceCounter) counter;
064        }
065    }
066
067    @Override
068    public boolean process(final Exchange exchange, final AsyncCallback callback) {
069        // only record time if stats is enabled
070        final StopWatch watch = (counter != null && counter.isStatisticsEnabled()) ? new StopWatch() : null;
071
072        // mark beginning to process the exchange
073        if (watch != null) {
074            beginTime(exchange);
075        }
076
077        return processor.process(exchange, new AsyncCallback() {
078            public void done(boolean doneSync) {
079                try {
080                    // record end time
081                    if (watch != null) {
082                        recordTime(exchange, watch.stop());
083                    }
084                } finally {
085                    // and let the original callback know we are done as well
086                    callback.done(doneSync);
087                }
088            }
089
090            @Override
091            public String toString() {
092                return InstrumentationProcessor.this.toString();
093            }
094        });
095    }
096
097    protected void beginTime(Exchange exchange) {
098        counter.processExchange(exchange);
099    }
100
101    protected void recordTime(Exchange exchange, long duration) {
102        if (LOG.isTraceEnabled()) {
103            LOG.trace("{}Recording duration: {} millis for exchange: {}", new Object[]{type != null ? type + ": " : "", duration, exchange});
104        }
105
106        if (!exchange.isFailed() && exchange.getException() == null) {
107            counter.completedExchange(exchange, duration);
108        } else {
109            counter.failedExchange(exchange);
110        }
111    }
112
113    public String getType() {
114        return type;
115    }
116
117    public void setType(String type) {
118        this.type = type;
119    }
120}