001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.management;
018
019import java.util.ArrayList;
020import java.util.Collection;
021import java.util.HashMap;
022import java.util.HashSet;
023import java.util.Iterator;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027import java.util.concurrent.ThreadPoolExecutor;
028import javax.management.JMException;
029import javax.management.MalformedObjectNameException;
030import javax.management.ObjectName;
031
032import org.apache.camel.CamelContext;
033import org.apache.camel.CamelContextAware;
034import org.apache.camel.Channel;
035import org.apache.camel.Component;
036import org.apache.camel.Consumer;
037import org.apache.camel.Endpoint;
038import org.apache.camel.ErrorHandlerFactory;
039import org.apache.camel.ManagementStatisticsLevel;
040import org.apache.camel.NonManagedService;
041import org.apache.camel.Processor;
042import org.apache.camel.Producer;
043import org.apache.camel.Route;
044import org.apache.camel.Service;
045import org.apache.camel.StartupListener;
046import org.apache.camel.TimerListener;
047import org.apache.camel.VetoCamelContextStartException;
048import org.apache.camel.api.management.PerformanceCounter;
049import org.apache.camel.impl.ConsumerCache;
050import org.apache.camel.impl.DefaultCamelContext;
051import org.apache.camel.impl.DefaultEndpointRegistry;
052import org.apache.camel.impl.EventDrivenConsumerRoute;
053import org.apache.camel.impl.ProducerCache;
054import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
055import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
056import org.apache.camel.management.mbean.ManagedAsyncProcessorAwaitManager;
057import org.apache.camel.management.mbean.ManagedBacklogDebugger;
058import org.apache.camel.management.mbean.ManagedBacklogTracer;
059import org.apache.camel.management.mbean.ManagedCamelContext;
060import org.apache.camel.management.mbean.ManagedConsumerCache;
061import org.apache.camel.management.mbean.ManagedEndpoint;
062import org.apache.camel.management.mbean.ManagedEndpointRegistry;
063import org.apache.camel.management.mbean.ManagedInflightRepository;
064import org.apache.camel.management.mbean.ManagedProducerCache;
065import org.apache.camel.management.mbean.ManagedRestRegistry;
066import org.apache.camel.management.mbean.ManagedRoute;
067import org.apache.camel.management.mbean.ManagedRuntimeEndpointRegistry;
068import org.apache.camel.management.mbean.ManagedService;
069import org.apache.camel.management.mbean.ManagedStreamCachingStrategy;
070import org.apache.camel.management.mbean.ManagedThrottlingExceptionRoutePolicy;
071import org.apache.camel.management.mbean.ManagedThrottlingInflightRoutePolicy;
072import org.apache.camel.management.mbean.ManagedTracer;
073import org.apache.camel.management.mbean.ManagedTypeConverterRegistry;
074import org.apache.camel.model.AOPDefinition;
075import org.apache.camel.model.InterceptDefinition;
076import org.apache.camel.model.OnCompletionDefinition;
077import org.apache.camel.model.OnExceptionDefinition;
078import org.apache.camel.model.PolicyDefinition;
079import org.apache.camel.model.ProcessorDefinition;
080import org.apache.camel.model.ProcessorDefinitionHelper;
081import org.apache.camel.model.RouteDefinition;
082import org.apache.camel.processor.CamelInternalProcessor;
083import org.apache.camel.processor.interceptor.BacklogDebugger;
084import org.apache.camel.processor.interceptor.BacklogTracer;
085import org.apache.camel.processor.interceptor.Tracer;
086import org.apache.camel.spi.AsyncProcessorAwaitManager;
087import org.apache.camel.spi.DataFormat;
088import org.apache.camel.spi.EventNotifier;
089import org.apache.camel.spi.InflightRepository;
090import org.apache.camel.spi.LifecycleStrategy;
091import org.apache.camel.spi.ManagementAgent;
092import org.apache.camel.spi.ManagementAware;
093import org.apache.camel.spi.ManagementNameStrategy;
094import org.apache.camel.spi.ManagementObjectStrategy;
095import org.apache.camel.spi.ManagementStrategy;
096import org.apache.camel.spi.RestRegistry;
097import org.apache.camel.spi.RouteContext;
098import org.apache.camel.spi.RuntimeEndpointRegistry;
099import org.apache.camel.spi.StreamCachingStrategy;
100import org.apache.camel.spi.TypeConverterRegistry;
101import org.apache.camel.spi.UnitOfWork;
102import org.apache.camel.support.ServiceSupport;
103import org.apache.camel.support.TimerListenerManager;
104import org.apache.camel.util.KeyValueHolder;
105import org.apache.camel.util.ObjectHelper;
106import org.slf4j.Logger;
107import org.slf4j.LoggerFactory;
108
109/**
110 * Default JMX managed lifecycle strategy that registers objects using the configured
111 * {@link org.apache.camel.spi.ManagementStrategy}.
112 *
113 * @see org.apache.camel.spi.ManagementStrategy
114 * @version 
115 */
116@SuppressWarnings("deprecation")
117public class DefaultManagementLifecycleStrategy extends ServiceSupport implements LifecycleStrategy, CamelContextAware {
118
119    private static final Logger LOG = LoggerFactory.getLogger(DefaultManagementLifecycleStrategy.class);
120    // the wrapped processors is for performance counters, which are in use for the created routes
121    // when a route is removed, we should remove the associated processors from this map
122    private final Map<Processor, KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> wrappedProcessors =
123            new HashMap<Processor, KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>>();
124    private final List<PreRegisterService> preServices = new ArrayList<PreRegisterService>();
125    private final TimerListenerManager loadTimer = new ManagedLoadTimer();
126    private final TimerListenerManagerStartupListener loadTimerStartupListener = new TimerListenerManagerStartupListener();
127    private volatile CamelContext camelContext;
128    private volatile ManagedCamelContext camelContextMBean;
129    private volatile boolean initialized;
130    private final Set<String> knowRouteIds = new HashSet<String>();
131    private final Map<Tracer, ManagedTracer> managedTracers = new HashMap<Tracer, ManagedTracer>();
132    private final Map<BacklogTracer, ManagedBacklogTracer> managedBacklogTracers = new HashMap<BacklogTracer, ManagedBacklogTracer>();
133    private final Map<BacklogDebugger, ManagedBacklogDebugger> managedBacklogDebuggers = new HashMap<BacklogDebugger, ManagedBacklogDebugger>();
134    private final Map<ThreadPoolExecutor, Object> managedThreadPools = new HashMap<ThreadPoolExecutor, Object>();
135
136    public DefaultManagementLifecycleStrategy() {
137    }
138
139    public DefaultManagementLifecycleStrategy(CamelContext camelContext) {
140        this.camelContext = camelContext;
141    }
142
143    public CamelContext getCamelContext() {
144        return camelContext;
145    }
146
147    public void setCamelContext(CamelContext camelContext) {
148        this.camelContext = camelContext;
149    }
150
151    public void onContextStart(CamelContext context) throws VetoCamelContextStartException {
152        Object mc = getManagementObjectStrategy().getManagedObjectForCamelContext(context);
153
154        String name = context.getName();
155        String managementName = context.getManagementNameStrategy().getName();
156
157        try {
158            boolean done = false;
159            while (!done) {
160                ObjectName on = getManagementStrategy().getManagementNamingStrategy().getObjectNameForCamelContext(managementName, name);
161                boolean exists = getManagementStrategy().isManaged(mc, on);
162                if (!exists) {
163                    done = true;
164                } else {
165                    // okay there exists already a CamelContext with this name, we can try to fix it by finding a free name
166                    boolean fixed = false;
167                    // if we use the default name strategy we can find a free name to use
168                    String newName = findFreeName(mc, context.getManagementNameStrategy(), name);
169                    if (newName != null) {
170                        // use this as the fixed name
171                        fixed = true;
172                        done = true;
173                        managementName = newName;
174                    }
175                    // we could not fix it so veto starting camel
176                    if (!fixed) {
177                        throw new VetoCamelContextStartException("CamelContext (" + context.getName() + ") with ObjectName[" + on + "] is already registered."
178                            + " Make sure to use unique names on CamelContext when using multiple CamelContexts in the same MBeanServer.", context);
179                    } else {
180                        LOG.warn("This CamelContext(" + context.getName() + ") will be registered using the name: " + managementName
181                            + " due to clash with an existing name already registered in MBeanServer.");
182                    }
183                }
184            }
185        } catch (VetoCamelContextStartException e) {
186            // rethrow veto
187            throw e;
188        } catch (Exception e) {
189            // must rethrow to allow CamelContext fallback to non JMX agent to allow
190            // Camel to continue to run
191            throw ObjectHelper.wrapRuntimeCamelException(e);
192        }
193
194        // set the name we are going to use
195        if (context instanceof DefaultCamelContext) {
196            ((DefaultCamelContext) context).setManagementName(managementName);
197        }
198
199        try {
200            manageObject(mc);
201        } catch (Exception e) {
202            // must rethrow to allow CamelContext fallback to non JMX agent to allow
203            // Camel to continue to run
204            throw ObjectHelper.wrapRuntimeCamelException(e);
205        }
206
207        // yes we made it and are initialized
208        initialized = true;
209
210        if (mc instanceof ManagedCamelContext) {
211            camelContextMBean = (ManagedCamelContext) mc;
212        }
213
214        // register any pre registered now that we are initialized
215        enlistPreRegisteredServices();
216    }
217
218    private String findFreeName(Object mc, ManagementNameStrategy strategy, String name) throws MalformedObjectNameException {
219        // we cannot find a free name for fixed named strategies
220        if (strategy.isFixedName()) {
221            return null;
222        }
223
224        // okay try to find a free name
225        boolean done = false;
226        String newName = null;
227        while (!done) {
228            // compute the next name
229            newName = strategy.getNextName();
230            ObjectName on = getManagementStrategy().getManagementNamingStrategy().getObjectNameForCamelContext(newName, name);
231            done = !getManagementStrategy().isManaged(mc, on);
232            if (LOG.isTraceEnabled()) {
233                LOG.trace("Using name: {} in ObjectName[{}] exists? {}", new Object[]{name, on, done});
234            }
235        }
236        return newName;
237    }
238
239    /**
240     * After {@link CamelContext} has been enlisted in JMX using {@link #onContextStart(org.apache.camel.CamelContext)}
241     * then we can enlist any pre registered services as well, as we had to wait for {@link CamelContext} to be
242     * enlisted first.
243     * <p/>
244     * A component/endpoint/service etc. can be pre registered when using dependency injection and annotations such as
245     * {@link org.apache.camel.Produce}, {@link org.apache.camel.EndpointInject}. Therefore we need to capture those
246     * registrations up front, and then afterwards enlist in JMX when {@link CamelContext} is being started.
247     */
248    private void enlistPreRegisteredServices() {
249        if (preServices.isEmpty()) {
250            return;
251        }
252
253        LOG.debug("Registering {} pre registered services", preServices.size());
254        for (PreRegisterService pre : preServices) {
255            if (pre.getComponent() != null) {
256                onComponentAdd(pre.getName(), pre.getComponent());
257            } else if (pre.getEndpoint() != null) {
258                onEndpointAdd(pre.getEndpoint());
259            } else if (pre.getService() != null) {
260                onServiceAdd(pre.getCamelContext(), pre.getService(), pre.getRoute());
261            }
262        }
263
264        // we are done so clear the list
265        preServices.clear();
266    }
267
268    public void onContextStop(CamelContext context) {
269        // the agent hasn't been started
270        if (!initialized) {
271            return;
272        }
273        try {
274            Object mc = getManagementObjectStrategy().getManagedObjectForCamelContext(context);
275            // the context could have been removed already
276            if (getManagementStrategy().isManaged(mc, null)) {
277                unmanageObject(mc);
278            }
279        } catch (Exception e) {
280            LOG.warn("Could not unregister CamelContext MBean", e);
281        }
282
283        camelContextMBean = null;
284    }
285
286    public void onComponentAdd(String name, Component component) {
287        // always register components as there are only a few of those
288        if (!initialized) {
289            // pre register so we can register later when we have been initialized
290            PreRegisterService pre = new PreRegisterService();
291            pre.onComponentAdd(name, component);
292            preServices.add(pre);
293            return;
294        }
295        try {
296            Object mc = getManagementObjectStrategy().getManagedObjectForComponent(camelContext, component, name);
297            manageObject(mc);
298        } catch (Exception e) {
299            LOG.warn("Could not register Component MBean", e);
300        }
301    }
302
303    public void onComponentRemove(String name, Component component) {
304        // the agent hasn't been started
305        if (!initialized) {
306            return;
307        }
308        try {
309            Object mc = getManagementObjectStrategy().getManagedObjectForComponent(camelContext, component, name);
310            unmanageObject(mc);
311        } catch (Exception e) {
312            LOG.warn("Could not unregister Component MBean", e);
313        }
314    }
315
316    /**
317     * If the endpoint is an instance of ManagedResource then register it with the
318     * mbean server, if it is not then wrap the endpoint in a {@link ManagedEndpoint} and
319     * register that with the mbean server.
320     *
321     * @param endpoint the Endpoint attempted to be added
322     */
323    public void onEndpointAdd(Endpoint endpoint) {
324        if (!initialized) {
325            // pre register so we can register later when we have been initialized
326            PreRegisterService pre = new PreRegisterService();
327            pre.onEndpointAdd(endpoint);
328            preServices.add(pre);
329            return;
330        }
331
332        if (!shouldRegister(endpoint, null)) {
333            // avoid registering if not needed
334            return;
335        }
336
337        try {
338            Object me = getManagementObjectStrategy().getManagedObjectForEndpoint(camelContext, endpoint);
339            if (me == null) {
340                // endpoint should not be managed
341                return;
342            }
343            manageObject(me);
344        } catch (Exception e) {
345            LOG.warn("Could not register Endpoint MBean for endpoint: " + endpoint + ". This exception will be ignored.", e);
346        }
347    }
348
349    public void onEndpointRemove(Endpoint endpoint) {
350        // the agent hasn't been started
351        if (!initialized) {
352            return;
353        }
354
355        try {
356            Object me = getManagementObjectStrategy().getManagedObjectForEndpoint(camelContext, endpoint);
357            unmanageObject(me);
358        } catch (Exception e) {
359            LOG.warn("Could not unregister Endpoint MBean for endpoint: " + endpoint + ". This exception will be ignored.", e);
360        }
361    }
362
363    public void onServiceAdd(CamelContext context, Service service, Route route) {
364        if (!initialized) {
365            // pre register so we can register later when we have been initialized
366            PreRegisterService pre = new PreRegisterService();
367            pre.onServiceAdd(context, service, route);
368            preServices.add(pre);
369            return;
370        }
371
372        // services can by any kind of misc type but also processors
373        // so we have special logic when its a processor
374
375        if (!shouldRegister(service, route)) {
376            // avoid registering if not needed
377            return;
378        }
379
380        Object managedObject = getManagedObjectForService(context, service, route);
381        if (managedObject == null) {
382            // service should not be managed
383            return;
384        }
385
386        // skip already managed services, for example if a route has been restarted
387        if (getManagementStrategy().isManaged(managedObject, null)) {
388            LOG.trace("The service is already managed: {}", service);
389            return;
390        }
391
392        try {
393            manageObject(managedObject);
394        } catch (Exception e) {
395            LOG.warn("Could not register service: " + service + " as Service MBean.", e);
396        }
397    }
398
399    public void onServiceRemove(CamelContext context, Service service, Route route) {
400        // the agent hasn't been started
401        if (!initialized) {
402            return;
403        }
404
405        Object managedObject = getManagedObjectForService(context, service, route);
406        if (managedObject != null) {
407            try {
408                unmanageObject(managedObject);
409            } catch (Exception e) {
410                LOG.warn("Could not unregister service: " + service + " as Service MBean.", e);
411            }
412        }
413    }
414
415    @SuppressWarnings("unchecked")
416    private Object getManagedObjectForService(CamelContext context, Service service, Route route) {
417        // skip channel, UoW and dont double wrap instrumentation
418        if (service instanceof Channel || service instanceof UnitOfWork || service instanceof InstrumentationProcessor) {
419            return null;
420        }
421
422        // skip non managed services
423        if (service instanceof NonManagedService) {
424            return null;
425        }
426
427        Object answer = null;
428
429        if (service instanceof ManagementAware) {
430            return ((ManagementAware<Service>) service).getManagedObject(service);
431        } else if (service instanceof Tracer) {
432            // special for tracer
433            Tracer tracer = (Tracer) service;
434            ManagedTracer mt = managedTracers.get(tracer);
435            if (mt == null) {
436                mt = new ManagedTracer(context, tracer);
437                mt.init(getManagementStrategy());
438                managedTracers.put(tracer, mt);
439            }
440            return mt;
441        } else if (service instanceof BacklogTracer) {
442            // special for backlog tracer
443            BacklogTracer backlogTracer = (BacklogTracer) service;
444            ManagedBacklogTracer mt = managedBacklogTracers.get(backlogTracer);
445            if (mt == null) {
446                mt = new ManagedBacklogTracer(context, backlogTracer);
447                mt.init(getManagementStrategy());
448                managedBacklogTracers.put(backlogTracer, mt);
449            }
450            return mt;
451        } else if (service instanceof BacklogDebugger) {
452            // special for backlog debugger
453            BacklogDebugger backlogDebugger = (BacklogDebugger) service;
454            ManagedBacklogDebugger md = managedBacklogDebuggers.get(backlogDebugger);
455            if (md == null) {
456                md = new ManagedBacklogDebugger(context, backlogDebugger);
457                md.init(getManagementStrategy());
458                managedBacklogDebuggers.put(backlogDebugger, md);
459            }
460            return md;
461        } else if (service instanceof DataFormat) {
462            answer = getManagementObjectStrategy().getManagedObjectForDataFormat(context, (DataFormat) service);
463        } else if (service instanceof Producer) {
464            answer = getManagementObjectStrategy().getManagedObjectForProducer(context, (Producer) service);
465        } else if (service instanceof Consumer) {
466            answer = getManagementObjectStrategy().getManagedObjectForConsumer(context, (Consumer) service);
467        } else if (service instanceof Processor) {
468            // special for processors as we need to do some extra work
469            return getManagedObjectForProcessor(context, (Processor) service, route);
470        } else if (service instanceof ThrottlingInflightRoutePolicy) {
471            answer = new ManagedThrottlingInflightRoutePolicy(context, (ThrottlingInflightRoutePolicy) service);
472        } else if (service instanceof ThrottlingExceptionRoutePolicy) {
473            answer = new ManagedThrottlingExceptionRoutePolicy(context, (ThrottlingExceptionRoutePolicy) service);
474        } else if (service instanceof ConsumerCache) {
475            answer = new ManagedConsumerCache(context, (ConsumerCache) service);
476        } else if (service instanceof ProducerCache) {
477            answer = new ManagedProducerCache(context, (ProducerCache) service);
478        } else if (service instanceof DefaultEndpointRegistry) {
479            answer = new ManagedEndpointRegistry(context, (DefaultEndpointRegistry) service);
480        } else if (service instanceof TypeConverterRegistry) {
481            answer = new ManagedTypeConverterRegistry(context, (TypeConverterRegistry) service);
482        } else if (service instanceof RestRegistry) {
483            answer = new ManagedRestRegistry(context, (RestRegistry) service);
484        } else if (service instanceof InflightRepository) {
485            answer = new ManagedInflightRepository(context, (InflightRepository) service);
486        } else if (service instanceof AsyncProcessorAwaitManager) {
487            answer = new ManagedAsyncProcessorAwaitManager(context, (AsyncProcessorAwaitManager) service);
488        } else if (service instanceof RuntimeEndpointRegistry) {
489            answer = new ManagedRuntimeEndpointRegistry(context, (RuntimeEndpointRegistry) service);
490        } else if (service instanceof StreamCachingStrategy) {
491            answer = new ManagedStreamCachingStrategy(context, (StreamCachingStrategy) service);
492        } else if (service instanceof EventNotifier) {
493            answer = getManagementObjectStrategy().getManagedObjectForEventNotifier(context, (EventNotifier) service);
494        } else if (service != null) {
495            // fallback as generic service
496            answer = getManagementObjectStrategy().getManagedObjectForService(context, service);
497        }
498
499        if (answer != null && answer instanceof ManagedService) {
500            ManagedService ms = (ManagedService) answer;
501            ms.setRoute(route);
502            ms.init(getManagementStrategy());
503        }
504
505        return answer;
506    }
507
508    private Object getManagedObjectForProcessor(CamelContext context, Processor processor, Route route) {
509        // a bit of magic here as the processors we want to manage have already been registered
510        // in the wrapped processors map when Camel have instrumented the route on route initialization
511        // so the idea is now to only manage the processors from the map
512        KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = wrappedProcessors.get(processor);
513        if (holder == null) {
514            // skip as its not an well known processor we want to manage anyway, such as Channel/UnitOfWork/Pipeline etc.
515            return null;
516        }
517
518        // get the managed object as it can be a specialized type such as a Delayer/Throttler etc.
519        Object managedObject = getManagementObjectStrategy().getManagedObjectForProcessor(context, processor, holder.getKey(), route);
520        // only manage if we have a name for it as otherwise we do not want to manage it anyway
521        if (managedObject != null) {
522            // is it a performance counter then we need to set our counter
523            if (managedObject instanceof PerformanceCounter) {
524                InstrumentationProcessor counter = holder.getValue();
525                if (counter != null) {
526                    // change counter to us
527                    counter.setCounter(managedObject);
528                }
529            }
530        }
531
532        return managedObject;
533    }
534
535    public void onRoutesAdd(Collection<Route> routes) {
536        for (Route route : routes) {
537
538            // if we are starting CamelContext or either of the two options has been
539            // enabled, then enlist the route as a known route
540            if (getCamelContext().getStatus().isStarting()
541                || getManagementStrategy().getManagementAgent().getRegisterAlways()
542                || getManagementStrategy().getManagementAgent().getRegisterNewRoutes()) {
543                // register as known route id
544                knowRouteIds.add(route.getId());
545            }
546
547            if (!shouldRegister(route, route)) {
548                // avoid registering if not needed, skip to next route
549                continue;
550            }
551
552            Object mr = getManagementObjectStrategy().getManagedObjectForRoute(camelContext, route);
553
554            // skip already managed routes, for example if the route has been restarted
555            if (getManagementStrategy().isManaged(mr, null)) {
556                LOG.trace("The route is already managed: {}", route);
557                continue;
558            }
559
560            // get the wrapped instrumentation processor from this route
561            // and set me as the counter
562            if (route instanceof EventDrivenConsumerRoute) {
563                EventDrivenConsumerRoute edcr = (EventDrivenConsumerRoute) route;
564                Processor processor = edcr.getProcessor();
565                if (processor instanceof CamelInternalProcessor && mr instanceof ManagedRoute) {
566                    CamelInternalProcessor internal = (CamelInternalProcessor) processor;
567                    ManagedRoute routeMBean = (ManagedRoute) mr;
568
569                    CamelInternalProcessor.InstrumentationAdvice task = internal.getAdvice(CamelInternalProcessor.InstrumentationAdvice.class);
570                    if (task != null) {
571                        // we need to wrap the counter with the camel context so we get stats updated on the context as well
572                        if (camelContextMBean != null) {
573                            CompositePerformanceCounter wrapper = new CompositePerformanceCounter(routeMBean, camelContextMBean);
574                            task.setCounter(wrapper);
575                        } else {
576                            task.setCounter(routeMBean);
577                        }
578                    }
579                }
580            }
581
582            try {
583                manageObject(mr);
584            } catch (JMException e) {
585                LOG.warn("Could not register Route MBean", e);
586            } catch (Exception e) {
587                LOG.warn("Could not create Route MBean", e);
588            }
589        }
590    }
591
592    public void onRoutesRemove(Collection<Route> routes) {
593        // the agent hasn't been started
594        if (!initialized) {
595            return;
596        }
597
598        for (Route route : routes) {
599            Object mr = getManagementObjectStrategy().getManagedObjectForRoute(camelContext, route);
600
601            // skip unmanaged routes
602            if (!getManagementStrategy().isManaged(mr, null)) {
603                LOG.trace("The route is not managed: {}", route);
604                continue;
605            }
606
607            try {
608                unmanageObject(mr);
609            } catch (Exception e) {
610                LOG.warn("Could not unregister Route MBean", e);
611            }
612
613            // remove from known routes ids, as the route has been removed
614            knowRouteIds.remove(route.getId());
615        }
616
617        // after the routes has been removed, we should clear the wrapped processors as we no longer need them
618        // as they were just a provisional map used during creation of routes
619        removeWrappedProcessorsForRoutes(routes);
620    }
621
622    public void onErrorHandlerAdd(RouteContext routeContext, Processor errorHandler, ErrorHandlerFactory errorHandlerBuilder) {
623        if (!shouldRegister(errorHandler, null)) {
624            // avoid registering if not needed
625            return;
626        }
627
628        Object me = getManagementObjectStrategy().getManagedObjectForErrorHandler(camelContext, routeContext, errorHandler, errorHandlerBuilder);
629
630        // skip already managed services, for example if a route has been restarted
631        if (getManagementStrategy().isManaged(me, null)) {
632            LOG.trace("The error handler builder is already managed: {}", errorHandlerBuilder);
633            return;
634        }
635
636        try {
637            manageObject(me);
638        } catch (Exception e) {
639            LOG.warn("Could not register error handler builder: " + errorHandlerBuilder + " as ErrorHandler MBean.", e);
640        }
641    }
642
643    public void onErrorHandlerRemove(RouteContext routeContext, Processor errorHandler, ErrorHandlerFactory errorHandlerBuilder) {
644        if (!initialized) {
645            return;
646        }
647
648        Object me = getManagementObjectStrategy().getManagedObjectForErrorHandler(camelContext, routeContext, errorHandler, errorHandlerBuilder);
649        if (me != null) {
650            try {
651                unmanageObject(me);
652            } catch (Exception e) {
653                LOG.warn("Could not unregister error handler: " + me + " as ErrorHandler MBean.", e);
654            }
655        }
656    }
657
658    public void onThreadPoolAdd(CamelContext camelContext, ThreadPoolExecutor threadPool, String id,
659                                String sourceId, String routeId, String threadPoolProfileId) {
660
661        if (!shouldRegister(threadPool, null)) {
662            // avoid registering if not needed
663            return;
664        }
665
666        Object mtp = getManagementObjectStrategy().getManagedObjectForThreadPool(camelContext, threadPool, id, sourceId, routeId, threadPoolProfileId);
667
668        // skip already managed services, for example if a route has been restarted
669        if (getManagementStrategy().isManaged(mtp, null)) {
670            LOG.trace("The thread pool is already managed: {}", threadPool);
671            return;
672        }
673
674        try {
675            manageObject(mtp);
676            // store a reference so we can unmanage from JMX when the thread pool is removed
677            // we need to keep track here, as we cannot re-construct the thread pool ObjectName when removing the thread pool
678            managedThreadPools.put(threadPool, mtp);
679        } catch (Exception e) {
680            LOG.warn("Could not register thread pool: " + threadPool + " as ThreadPool MBean.", e);
681        }
682    }
683
684    public void onThreadPoolRemove(CamelContext camelContext, ThreadPoolExecutor threadPool) {
685        if (!initialized) {
686            return;
687        }
688
689        // lookup the thread pool and remove it from JMX
690        Object mtp = managedThreadPools.remove(threadPool);
691        if (mtp != null) {
692            // skip unmanaged routes
693            if (!getManagementStrategy().isManaged(mtp, null)) {
694                LOG.trace("The thread pool is not managed: {}", threadPool);
695                return;
696            }
697
698            try {
699                unmanageObject(mtp);
700            } catch (Exception e) {
701                LOG.warn("Could not unregister ThreadPool MBean", e);
702            }
703        }
704    }
705
706    public void onRouteContextCreate(RouteContext routeContext) {
707        if (!initialized) {
708            return;
709        }
710
711        // Create a map (ProcessorType -> PerformanceCounter)
712        // to be passed to InstrumentationInterceptStrategy.
713        Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters =
714                new HashMap<ProcessorDefinition<?>, PerformanceCounter>();
715
716        // Each processor in a route will have its own performance counter.
717        // These performance counter will be embedded to InstrumentationProcessor
718        // and wrap the appropriate processor by InstrumentationInterceptStrategy.
719        RouteDefinition route = routeContext.getRoute();
720
721        // register performance counters for all processors and its children
722        for (ProcessorDefinition<?> processor : route.getOutputs()) {
723            registerPerformanceCounters(routeContext, processor, registeredCounters);
724        }
725
726        // set this managed intercept strategy that executes the JMX instrumentation for performance metrics
727        // so our registered counters can be used for fine grained performance instrumentation
728        routeContext.setManagedInterceptStrategy(new InstrumentationInterceptStrategy(registeredCounters, wrappedProcessors));
729    }
730
731    /**
732     * Removes the wrapped processors for the given routes, as they are no longer in use.
733     * <p/>
734     * This is needed to avoid accumulating memory, if a lot of routes is being added and removed.
735     *
736     * @param routes the routes
737     */
738    private void removeWrappedProcessorsForRoutes(Collection<Route> routes) {
739        // loop the routes, and remove the route associated wrapped processors, as they are no longer in use
740        for (Route route : routes) {
741            String id = route.getId();
742
743            Iterator<KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> it = wrappedProcessors.values().iterator();
744            while (it.hasNext()) {
745                KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = it.next();
746                RouteDefinition def = ProcessorDefinitionHelper.getRoute(holder.getKey());
747                if (def != null && id.equals(def.getId())) {
748                    it.remove();
749                }
750            }
751        }
752        
753    }
754
755    private void registerPerformanceCounters(RouteContext routeContext, ProcessorDefinition<?> processor,
756                                             Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters) {
757
758        // traverse children if any exists
759        List<ProcessorDefinition<?>> children = processor.getOutputs();
760        for (ProcessorDefinition<?> child : children) {
761            registerPerformanceCounters(routeContext, child, registeredCounters);
762        }
763
764        // skip processors that should not be registered
765        if (!registerProcessor(processor)) {
766            return;
767        }
768
769        // okay this is a processor we would like to manage so create the
770        // a delegate performance counter that acts as the placeholder in the interceptor
771        // that then delegates to the real mbean which we register later in the onServiceAdd method
772        DelegatePerformanceCounter pc = new DelegatePerformanceCounter();
773        // set statistics enabled depending on the option
774        boolean enabled = camelContext.getManagementStrategy().getManagementAgent().getStatisticsLevel().isDefaultOrExtended();
775        pc.setStatisticsEnabled(enabled);
776
777        // and add it as a a registered counter that will be used lazy when Camel
778        // does the instrumentation of the route and adds the InstrumentationProcessor
779        // that does the actual performance metrics gatherings at runtime
780        registeredCounters.put(processor, pc);
781    }
782
783    /**
784     * Should the given processor be registered.
785     */
786    protected boolean registerProcessor(ProcessorDefinition<?> processor) {
787        // skip on exception
788        if (processor instanceof OnExceptionDefinition) {
789            return false;
790        }
791        // skip on completion
792        if (processor instanceof OnCompletionDefinition) {
793            return false;
794        }
795        // skip intercept
796        if (processor instanceof InterceptDefinition) {
797            return false;
798        }
799        // skip aop
800        if (processor instanceof AOPDefinition) {
801            return false;
802        }
803        // skip policy
804        if (processor instanceof PolicyDefinition) {
805            return false;
806        }
807
808        // only if custom id assigned
809        boolean only = getManagementStrategy().getManagementAgent().getOnlyRegisterProcessorWithCustomId() != null
810                && getManagementStrategy().getManagementAgent().getOnlyRegisterProcessorWithCustomId();
811        if (only) {
812            return processor.hasCustomIdAssigned();
813        }
814
815        // use customer filter
816        return getManagementStrategy().manageProcessor(processor);
817    }
818
819    private ManagementStrategy getManagementStrategy() {
820        ObjectHelper.notNull(camelContext, "CamelContext");
821        return camelContext.getManagementStrategy();
822    }
823
824    private ManagementObjectStrategy getManagementObjectStrategy() {
825        ObjectHelper.notNull(camelContext, "CamelContext");
826        return camelContext.getManagementStrategy().getManagementObjectStrategy();
827    }
828
829    /**
830     * Strategy for managing the object
831     *
832     * @param me the managed object
833     * @throws Exception is thrown if error registering the object for management
834     */
835    protected void manageObject(Object me) throws Exception {
836        getManagementStrategy().manageObject(me);
837        if (me instanceof TimerListener) {
838            TimerListener timer = (TimerListener) me;
839            loadTimer.addTimerListener(timer);
840        }
841    }
842
843    /**
844     * Un-manages the object.
845     *
846     * @param me the managed object
847     * @throws Exception is thrown if error unregistering the managed object
848     */
849    protected void unmanageObject(Object me) throws Exception {
850        if (me instanceof TimerListener) {
851            TimerListener timer = (TimerListener) me;
852            loadTimer.removeTimerListener(timer);
853        }
854        getManagementStrategy().unmanageObject(me);
855    }
856
857    /**
858     * Whether or not to register the mbean.
859     * <p/>
860     * The {@link ManagementAgent} has options which controls when to register.
861     * This allows us to only register mbeans accordingly. For example by default any
862     * dynamic endpoints is not registered. This avoids to register excessive mbeans, which
863     * most often is not desired.
864     *
865     * @param service the object to register
866     * @param route   an optional route the mbean is associated with, can be <tt>null</tt>
867     * @return <tt>true</tt> to register, <tt>false</tt> to skip registering
868     */
869    protected boolean shouldRegister(Object service, Route route) {
870        // the agent hasn't been started
871        if (!initialized) {
872            return false;
873        }
874
875        LOG.trace("Checking whether to register {} from route: {}", service, route);
876
877        ManagementAgent agent = getManagementStrategy().getManagementAgent();
878        if (agent == null) {
879            // do not register if no agent
880            return false;
881        }
882
883        // always register if we are starting CamelContext
884        if (getCamelContext().getStatus().isStarting()) {
885            return true;
886        }
887
888        // always register if we are setting up routes
889        if (getCamelContext().isSetupRoutes()) {
890            return true;
891        }
892
893        // register if always is enabled
894        if (agent.getRegisterAlways()) {
895            return true;
896        }
897
898        // is it a known route then always accept
899        if (route != null && knowRouteIds.contains(route.getId())) {
900            return true;
901        }
902
903        // only register if we are starting a new route, and current thread is in starting routes mode
904        if (agent.getRegisterNewRoutes()) {
905            // no specific route, then fallback to see if this thread is starting routes
906            // which is kept as state on the camel context
907            return getCamelContext().isStartingRoutes();
908        }
909
910        return false;
911    }
912
913    @Override
914    protected void doStart() throws Exception {
915        ObjectHelper.notNull(camelContext, "CamelContext");
916
917        // defer starting the timer manager until CamelContext has been fully started
918        camelContext.addStartupListener(loadTimerStartupListener);
919    }
920
921    private final class TimerListenerManagerStartupListener implements StartupListener {
922
923        @Override
924        public void onCamelContextStarted(CamelContext context, boolean alreadyStarted) throws Exception {
925            // we are disabled either if configured explicit, or if level is off
926            boolean load = camelContext.getManagementStrategy().getManagementAgent().getLoadStatisticsEnabled() != null
927                    && camelContext.getManagementStrategy().getManagementAgent().getLoadStatisticsEnabled();
928            boolean disabled = !load || camelContext.getManagementStrategy().getStatisticsLevel() == ManagementStatisticsLevel.Off;
929
930            LOG.debug("Load performance statistics {}", disabled ? "disabled" : "enabled");
931            if (!disabled) {
932                // must use 1 sec interval as the load statistics is based on 1 sec calculations
933                loadTimer.setInterval(1000);
934                // we have to defer enlisting timer lister manager as a service until CamelContext has been started
935                getCamelContext().addService(loadTimer);
936            }
937        }
938    }
939
940    @Override
941    protected void doStop() throws Exception {
942        initialized = false;
943        knowRouteIds.clear();
944        preServices.clear();
945        wrappedProcessors.clear();
946        managedTracers.clear();
947        managedBacklogTracers.clear();
948        managedBacklogDebuggers.clear();
949        managedThreadPools.clear();
950    }
951
952    /**
953     * Class which holds any pre registration details.
954     *
955     * @see org.apache.camel.management.DefaultManagementLifecycleStrategy#enlistPreRegisteredServices()
956     */
957    private static final class PreRegisterService {
958
959        private String name;
960        private Component component;
961        private Endpoint endpoint;
962        private CamelContext camelContext;
963        private Service service;
964        private Route route;
965
966        public void onComponentAdd(String name, Component component) {
967            this.name = name;
968            this.component = component;
969        }
970
971        public void onEndpointAdd(Endpoint endpoint) {
972            this.endpoint = endpoint;
973        }
974
975        public void onServiceAdd(CamelContext camelContext, Service service, Route route) {
976            this.camelContext = camelContext;
977            this.service = service;
978            this.route = route;
979        }
980
981        public String getName() {
982            return name;
983        }
984
985        public Component getComponent() {
986            return component;
987        }
988
989        public Endpoint getEndpoint() {
990            return endpoint;
991        }
992
993        public CamelContext getCamelContext() {
994            return camelContext;
995        }
996
997        public Service getService() {
998            return service;
999        }
1000
1001        public Route getRoute() {
1002            return route;
1003        }
1004    }
1005
1006}
1007