/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.fabric;

import java.util.Date;
import java.util.Queue;

import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.processor.DelegateAsyncProcessor;
import org.apache.camel.util.MessageHelper;

/**
 * A {@link DelegateAsyncProcessor} that, before delegating to the wrapped processor,
 * records {@link FabricTracerEventMessage}s on the supplied queue whenever the
 * {@link FabricTracer} reports that the node should be traced.
 */
public class FabricTraceProcessor extends DelegateAsyncProcessor {

    private final Queue<FabricTracerEventMessage> queue;
    private final FabricTracer tracer;
    private final ProcessorDefinition<?> processorDefinition;
    private final ProcessorDefinition<?> routeDefinition;
    private final boolean first;

    /**
     * Creates a tracing wrapper around the given processor.
     *
     * @param queue               the queue trace events are added to
     * @param processor           the processor to delegate to
     * @param processorDefinition the definition of the traced node; its id is used as the event's node id
     * @param routeDefinition     the definition whose id is used for the pseudo "starting" event
     * @param first               whether an additional pseudo event should be recorded ahead of the node event
     * @param tracer              the tracer consulted for whether to trace, the queue size and the trace counter
     */
    public FabricTraceProcessor(Queue<FabricTracerEventMessage> queue, Processor processor,
                                ProcessorDefinition<?> processorDefinition,
                                ProcessorDefinition<?> routeDefinition, boolean first,
                                FabricTracer tracer) {
        super(processor);
        this.queue = queue;
        this.processorDefinition = processorDefinition;
        this.routeDefinition = routeDefinition;
        this.first = first;
        this.tracer = tracer;
    }

    /**
     * Records trace events for the exchange (when the tracer says this node should be traced)
     * and then invokes the wrapped processor.
     * <p/>
     * Any {@link Exception} thrown while tracing or delegating is set on the exchange, the
     * callback is completed with {@code done(true)} and {@code true} is returned.
     *
     * @param exchange the exchange being processed
     * @param callback the callback to signal when processing is done
     * @return the result of the delegate's {@code process} call, or {@code true} if an exception was caught
     */
    @Override
    public boolean process(Exchange exchange, AsyncCallback callback) {
        try {
            if (tracer.shouldTrace(processorDefinition)) {
                recordTrace(exchange);
            }

            // invoke processor
            return super.process(exchange, callback);

        } catch (Exception e) {
            exchange.setException(e);
            callback.done(true);
            return true;
        }
    }

    /**
     * Trims the queue and adds the trace event(s) for the given exchange.
     */
    private void recordTrace(Exchange exchange) {
        // ensure there is space on the queue for the events we are about to add:
        // one for this node, plus one more for the pseudo event when first.
        // (Previously the queue was only trimmed down to the limit, so adding the new
        // events always pushed it above the configured size.)
        int pending = first ? 2 : 1;
        int drain = queue.size() + pending - tracer.getQueueSize();
        for (int i = 0; i < drain; i++) {
            // poll returns null rather than failing if the queue is already empty
            queue.poll();
        }

        Date timestamp = new Date();
        String toNode = processorDefinition.getId();
        String exchangeId = exchange.getExchangeId();
        String messageAsXml = MessageHelper.dumpAsXml(exchange.getIn());

        // if first we should add a pseudo trace message as well, so we have a starting message as well
        if (first) {
            // use the exchange's created timestamp, falling back to the current timestamp if not set
            Date created = exchange.getProperty(Exchange.CREATED_TIMESTAMP, timestamp, Date.class);
            String routeId = routeDefinition.getId();
            FabricTracerEventMessage pseudo = new FabricTracerEventMessage(tracer.incrementTraceCounter(), created, routeId, exchangeId, messageAsXml);
            queue.add(pseudo);
        }
        FabricTracerEventMessage event = new FabricTracerEventMessage(tracer.incrementTraceCounter(), timestamp, toNode, exchangeId, messageAsXml);
        queue.add(event);
    }

    /**
     * Stops this processor, clears the queue and tells the tracer that this processor has stopped.
     *
     * @throws Exception if stopping fails
     */
    @Override
    public void stop() throws Exception {
        super.stop();
        queue.clear();
        // notify tracer we are stopping to not leak resources
        tracer.stopProcessor(this, processorDefinition);
    }

    /**
     * Returns the string representation of the wrapped processor.
     */
    @Override
    public String toString() {
        return processor.toString();
    }

}