001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.management.mbean;
018
019import java.util.Date;
020import java.util.HashMap;
021import java.util.Map;
022import java.util.concurrent.atomic.AtomicLong;
023import javax.management.Notification;
024
025import org.apache.camel.Exchange;
026import org.apache.camel.Message;
027import org.apache.camel.Processor;
028import org.apache.camel.Traceable;
029import org.apache.camel.api.management.NotificationSender;
030import org.apache.camel.api.management.NotificationSenderAware;
031import org.apache.camel.model.ProcessorDefinition;
032import org.apache.camel.processor.interceptor.TraceEventHandler;
033import org.apache.camel.processor.interceptor.TraceInterceptor;
034import org.apache.camel.processor.interceptor.Tracer;
035import org.apache.camel.util.MessageHelper;
036
037public final class JMXNotificationTraceEventHandler implements TraceEventHandler, NotificationSenderAware {
038    private static final int MAX_MESSAGE_LENGTH = 60;
039    private final AtomicLong num = new AtomicLong();
040    private final Tracer tracer;
041    private NotificationSender notificationSender;
042
043    public JMXNotificationTraceEventHandler(Tracer tracer) {
044        this.tracer = tracer;
045    }
046
047    public void traceExchangeOut(ProcessorDefinition<?> node, Processor target, TraceInterceptor traceInterceptor, Exchange exchange, Object traceState) throws Exception {
048        // We do nothing here
049    }
050
051    public Object traceExchangeIn(ProcessorDefinition<?> node, Processor target, TraceInterceptor traceInterceptor, Exchange exchange) throws Exception {
052        // Just trace the exchange as usual
053        traceExchange(node, target, traceInterceptor, exchange);
054        return null;
055    }
056
057    public void traceExchange(ProcessorDefinition<?> node, Processor target, TraceInterceptor traceInterceptor, Exchange exchange) throws Exception {
058        if (notificationSender != null && tracer.isJmxTraceNotifications()) {
059            String body = MessageHelper.extractBodyForLogging(exchange.getIn(), "", false, true, tracer.getTraceBodySize());
060            
061            if (body == null) {
062                body = "";
063            }
064            String message = body.substring(0, Math.min(body.length(), MAX_MESSAGE_LENGTH));
065            Map<String, Object> tm = createTraceMessage(node, exchange, body);
066
067            Notification notification = new Notification("TraceNotification", exchange.toString(), num.getAndIncrement(), System.currentTimeMillis(), message);
068            notification.setUserData(tm);
069
070            notificationSender.sendNotification(notification);
071        }
072
073    }
074
075    private Map<String, Object> createTraceMessage(ProcessorDefinition<?> node, Exchange exchange, String body) {
076        Map<String, Object> mi = new HashMap<String, Object>();
077        mi.put("ExchangeId", exchange.getExchangeId());
078        mi.put("EndpointURI", getEndpointUri(node));
079        mi.put("TimeStamp", new Date(System.currentTimeMillis()));
080        mi.put("Body", body);
081
082        Message message = exchange.getIn();
083        Map<String, Object> sHeaders = message.getHeaders();
084        Map<String, Object> sProperties = exchange.getProperties();
085
086        Map<String, String> headers = new HashMap<String, String>();
087        for (String key : sHeaders.keySet()) {
088            headers.put(key, message.getHeader(key, String.class));
089        }
090        mi.put("Headers", headers);
091
092        Map<String, String> properties = new HashMap<String, String>();
093        for (String key : sProperties.keySet()) {
094            properties.put(key, exchange.getProperty(key, String.class));
095        }
096        mi.put("Properties", properties);
097        return mi;
098    }
099
100    private String getEndpointUri(ProcessorDefinition<?> node) {
101        if (node instanceof Traceable) {
102            Traceable tr = (Traceable)node;
103            return tr.getTraceLabel();
104        } else {
105            return node.getLabel();
106        }
107    }
108
109    @Override
110    public void setNotificationSender(NotificationSender sender) {
111        this.notificationSender = sender;
112    }
113
114}