001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.fabric;
018    
019    import java.util.ArrayList;
020    import java.util.HashSet;
021    import java.util.List;
022    import java.util.Queue;
023    import java.util.Set;
024    import java.util.concurrent.ArrayBlockingQueue;
025    import java.util.concurrent.atomic.AtomicLong;
026    
027    import org.apache.camel.CamelContext;
028    import org.apache.camel.Processor;
029    import org.apache.camel.api.management.ManagedAttribute;
030    import org.apache.camel.api.management.ManagedOperation;
031    import org.apache.camel.api.management.ManagedResource;
032    import org.apache.camel.model.ProcessorDefinition;
033    import org.apache.camel.model.ProcessorDefinitionHelper;
034    import org.apache.camel.model.RouteDefinition;
035    import org.apache.camel.model.RouteDefinitionHelper;
036    import org.apache.camel.spi.InterceptStrategy;
037    import org.apache.camel.spi.NodeIdFactory;
038    import org.apache.camel.support.ServiceSupport;
039    
040    /**
041     *
042     */
043    @ManagedResource(description = "FabricTracer")
044    public class FabricTracer extends ServiceSupport implements InterceptStrategy {
045    
046        private final CamelContext camelContext;
047        private boolean enabled;
048        private final AtomicLong traceCounter = new AtomicLong(0);
049        private Queue<FabricTracerEventMessage> queue =  new ArrayBlockingQueue<FabricTracerEventMessage>(1000);
050        private int queueSize = 10;
051        // remember the processors we are tracing, which we need later
052        private final Set<ProcessorDefinition<?>> processors = new HashSet<ProcessorDefinition<?>>();
053    
054        public FabricTracer(CamelContext camelContext) {
055            this.camelContext = camelContext;
056        }
057    
058        @Override
059        public Processor wrapProcessorInInterceptors(CamelContext context, ProcessorDefinition<?> definition, Processor target, Processor nextTarget) throws Exception {
060            // is this the first output from a route, as we want to know this so we can do special logic in first
061            boolean first = false;
062            RouteDefinition route = ProcessorDefinitionHelper.getRoute(definition);
063            if (route != null && !route.getOutputs().isEmpty()) {
064                first = route.getOutputs().get(0) == definition;
065            }
066    
067            processors.add(definition);
068            return new FabricTraceProcessor(queue, target, definition, route, first, this);
069        }
070    
071        /**
072         * Whether or not to trace the given processor definition.
073         *
074         * @param definition the processor definition
075         * @return <tt>true</tt> to trace, <tt>false</tt> to skip tracing
076         */
077        public boolean shouldTrace(ProcessorDefinition<?> definition) {
078            return enabled;
079        }
080    
081        @ManagedAttribute(description = "Is tracing enabled")
082        public boolean isEnabled() {
083            return enabled;
084        }
085    
086        @ManagedAttribute(description = "Is tracing enabled")
087        public void setEnabled(boolean enabled) {
088            // okay tracer is enabled then force auto assigning ids
089            if (enabled) {
090                forceAutoAssigningIds();
091            }
092            this.enabled = enabled;
093        }
094    
095        @ManagedAttribute(description = "Number of traced messages to keep in FIFO queue")
096        public int getQueueSize() {
097            return queueSize;
098        }
099    
100        @ManagedAttribute(description = "Number of traced messages to keep in FIFO queue")
101        public void setQueueSize(int queueSize) {
102            if (queueSize <= 0) {
103                throw new IllegalArgumentException("The queue size must be a positive number, was: " + queueSize);
104            }
105            this.queueSize = queueSize;
106        }
107    
108        @ManagedAttribute(description = "Number of total traced messages")
109        public long getTraceCounter() {
110            return traceCounter.get();
111        }
112    
113        @ManagedOperation(description = "Resets the trace counter")
114        public void resetTraceCounter() {
115            traceCounter.set(0);
116        }
117    
118        @ManagedOperation(description = "Dumps the traced messages for the given node")
119        public List<FabricTracerEventMessage> dumpTracedMessages(String nodeId) {
120            List<FabricTracerEventMessage> answer = new ArrayList<FabricTracerEventMessage>();
121            if (nodeId != null) {
122                for (FabricTracerEventMessage message : queue) {
123                    if (nodeId.equals(message.getToNode())) {
124                        answer.add(message);
125                    }
126                }
127            }
128            return answer;
129        }
130    
131        @ManagedOperation(description = "Dumps the traced messages for the given node in xml format")
132        public String dumpTracedMessagesAsXml(String nodeId) {
133            List<FabricTracerEventMessage> events = dumpTracedMessages(nodeId);
134    
135            StringBuilder sb = new StringBuilder();
136            sb.append("<").append(FabricTracerEventMessage.ROOT_TAG).append("s>");
137            for (FabricTracerEventMessage event : events) {
138                sb.append("\n").append(event.toXml());
139            }
140            sb.append("\n</").append(FabricTracerEventMessage.ROOT_TAG).append("s>");
141            return sb.toString();
142        }
143    
144        @ManagedOperation(description = "Dumps the traced messages for all nodes")
145        public List<FabricTracerEventMessage> dumpAllTracedMessages() {
146            List<FabricTracerEventMessage> answer = new ArrayList<FabricTracerEventMessage>();
147            answer.addAll(queue);
148            queue.clear();
149            return answer;
150        }
151    
152        @ManagedOperation(description = "Dumps the traced messages for all nodes in xml format")
153        public String dumpAllTracedMessagesAsXml() {
154            List<FabricTracerEventMessage> events = dumpAllTracedMessages();
155    
156            StringBuilder sb = new StringBuilder();
157            sb.append("<").append(FabricTracerEventMessage.ROOT_TAG).append("s>");
158            for (FabricTracerEventMessage event : events) {
159                sb.append("\n").append(event.toXml());
160            }
161            sb.append("\n</").append(FabricTracerEventMessage.ROOT_TAG).append("s>");
162            return sb.toString();
163        }
164    
165        long incrementTraceCounter() {
166            return traceCounter.incrementAndGet();
167        }
168        
169        void stopProcessor(FabricTraceProcessor processor, ProcessorDefinition<?> processorDefinition) {
170            this.processors.remove(processorDefinition);
171        }
172    
173        @Override
174        protected void doStart() throws Exception {
175        }
176    
177        @Override
178        protected void doStop() throws Exception {
179            queue.clear();
180        }
181    
182        @Override
183        protected void doShutdown() throws Exception {
184            queue.clear();
185            processors.clear();
186        }
187    
188        private void forceAutoAssigningIds() {
189            NodeIdFactory factory = camelContext.getNodeIdFactory();
190            if (factory != null) {
191                for (ProcessorDefinition<?> child : processors) {
192                    // ensure also the children get ids assigned
193                    RouteDefinitionHelper.forceAssignIds(camelContext, child);
194                }
195            }
196        }
197    
198    }