001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.fabric; 018 019 import java.util.ArrayList; 020 import java.util.HashSet; 021 import java.util.List; 022 import java.util.Queue; 023 import java.util.Set; 024 import java.util.concurrent.ArrayBlockingQueue; 025 import java.util.concurrent.atomic.AtomicLong; 026 027 import org.apache.camel.CamelContext; 028 import org.apache.camel.Processor; 029 import org.apache.camel.api.management.ManagedAttribute; 030 import org.apache.camel.api.management.ManagedOperation; 031 import org.apache.camel.api.management.ManagedResource; 032 import org.apache.camel.model.ProcessorDefinition; 033 import org.apache.camel.model.ProcessorDefinitionHelper; 034 import org.apache.camel.model.RouteDefinition; 035 import org.apache.camel.model.RouteDefinitionHelper; 036 import org.apache.camel.spi.InterceptStrategy; 037 import org.apache.camel.spi.NodeIdFactory; 038 import org.apache.camel.support.ServiceSupport; 039 040 /** 041 * 042 */ 043 @ManagedResource(description = "FabricTracer") 044 public class FabricTracer extends ServiceSupport implements InterceptStrategy { 045 046 private final CamelContext camelContext; 047 private boolean enabled; 048 private final AtomicLong traceCounter = new AtomicLong(0); 049 private Queue<FabricTracerEventMessage> queue = new ArrayBlockingQueue<FabricTracerEventMessage>(1000); 050 private int queueSize = 10; 051 // remember the processors we are tracing, which we need later 052 private final Set<ProcessorDefinition<?>> processors = new HashSet<ProcessorDefinition<?>>(); 053 054 public FabricTracer(CamelContext camelContext) { 055 this.camelContext = camelContext; 056 } 057 058 @Override 059 public Processor wrapProcessorInInterceptors(CamelContext context, ProcessorDefinition<?> definition, Processor target, Processor nextTarget) throws Exception { 060 // is this the first output from a route, as we want to know this so we can do special logic in first 061 boolean first = false; 062 RouteDefinition route = ProcessorDefinitionHelper.getRoute(definition); 063 if (route != null && !route.getOutputs().isEmpty()) { 064 first = route.getOutputs().get(0) == definition; 065 } 066 067 processors.add(definition); 068 return new FabricTraceProcessor(queue, target, definition, route, first, this); 069 } 070 071 /** 072 * Whether or not to trace the given processor definition. 073 * 074 * @param definition the processor definition 075 * @return <tt>true</tt> to trace, <tt>false</tt> to skip tracing 076 */ 077 public boolean shouldTrace(ProcessorDefinition<?> definition) { 078 return enabled; 079 } 080 081 @ManagedAttribute(description = "Is tracing enabled") 082 public boolean isEnabled() { 083 return enabled; 084 } 085 086 @ManagedAttribute(description = "Is tracing enabled") 087 public void setEnabled(boolean enabled) { 088 // okay tracer is enabled then force auto assigning ids 089 if (enabled) { 090 forceAutoAssigningIds(); 091 } 092 this.enabled = enabled; 093 } 094 095 @ManagedAttribute(description = "Number of traced messages to keep in FIFO queue") 096 public int getQueueSize() { 097 return queueSize; 098 } 099 100 @ManagedAttribute(description = "Number of traced messages to keep in FIFO queue") 101 public void setQueueSize(int queueSize) { 102 if (queueSize <= 0) { 103 throw new IllegalArgumentException("The queue size must be a positive number, was: " + queueSize); 104 } 105 this.queueSize = queueSize; 106 } 107 108 @ManagedAttribute(description = "Number of total traced messages") 109 public long getTraceCounter() { 110 return traceCounter.get(); 111 } 112 113 @ManagedOperation(description = "Resets the trace counter") 114 public void resetTraceCounter() { 115 traceCounter.set(0); 116 } 117 118 @ManagedOperation(description = "Dumps the traced messages for the given node") 119 public List<FabricTracerEventMessage> dumpTracedMessages(String nodeId) { 120 List<FabricTracerEventMessage> answer = new ArrayList<FabricTracerEventMessage>(); 121 if (nodeId != null) { 122 for (FabricTracerEventMessage message : queue) { 123 if (nodeId.equals(message.getToNode())) { 124 answer.add(message); 125 } 126 } 127 } 128 return answer; 129 } 130 131 @ManagedOperation(description = "Dumps the traced messages for the given node in xml format") 132 public String dumpTracedMessagesAsXml(String nodeId) { 133 List<FabricTracerEventMessage> events = dumpTracedMessages(nodeId); 134 135 StringBuilder sb = new StringBuilder(); 136 sb.append("<").append(FabricTracerEventMessage.ROOT_TAG).append("s>"); 137 for (FabricTracerEventMessage event : events) { 138 sb.append("\n").append(event.toXml()); 139 } 140 sb.append("\n</").append(FabricTracerEventMessage.ROOT_TAG).append("s>"); 141 return sb.toString(); 142 } 143 144 @ManagedOperation(description = "Dumps the traced messages for all nodes") 145 public List<FabricTracerEventMessage> dumpAllTracedMessages() { 146 List<FabricTracerEventMessage> answer = new ArrayList<FabricTracerEventMessage>(); 147 answer.addAll(queue); 148 queue.clear(); 149 return answer; 150 } 151 152 @ManagedOperation(description = "Dumps the traced messages for all nodes in xml format") 153 public String dumpAllTracedMessagesAsXml() { 154 List<FabricTracerEventMessage> events = dumpAllTracedMessages(); 155 156 StringBuilder sb = new StringBuilder(); 157 sb.append("<").append(FabricTracerEventMessage.ROOT_TAG).append("s>"); 158 for (FabricTracerEventMessage event : events) { 159 sb.append("\n").append(event.toXml()); 160 } 161 sb.append("\n</").append(FabricTracerEventMessage.ROOT_TAG).append("s>"); 162 return sb.toString(); 163 } 164 165 long incrementTraceCounter() { 166 return traceCounter.incrementAndGet(); 167 } 168 169 void stopProcessor(FabricTraceProcessor processor, ProcessorDefinition<?> processorDefinition) { 170 this.processors.remove(processorDefinition); 171 } 172 173 @Override 174 protected void doStart() throws Exception { 175 } 176 177 @Override 178 protected void doStop() throws Exception { 179 queue.clear(); 180 } 181 182 @Override 183 protected void doShutdown() throws Exception { 184 queue.clear(); 185 processors.clear(); 186 } 187 188 private void forceAutoAssigningIds() { 189 NodeIdFactory factory = camelContext.getNodeIdFactory(); 190 if (factory != null) { 191 for (ProcessorDefinition<?> child : processors) { 192 // ensure also the children get ids assigned 193 RouteDefinitionHelper.forceAssignIds(camelContext, child); 194 } 195 } 196 } 197 198 }