001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.management;
018
019import java.util.ArrayList;
020import java.util.Collection;
021import java.util.HashMap;
022import java.util.HashSet;
023import java.util.Iterator;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027import java.util.concurrent.ThreadPoolExecutor;
028import javax.management.JMException;
029import javax.management.MalformedObjectNameException;
030import javax.management.ObjectName;
031
032import org.apache.camel.CamelContext;
033import org.apache.camel.CamelContextAware;
034import org.apache.camel.Channel;
035import org.apache.camel.Component;
036import org.apache.camel.Consumer;
037import org.apache.camel.Endpoint;
038import org.apache.camel.ErrorHandlerFactory;
039import org.apache.camel.ManagementStatisticsLevel;
040import org.apache.camel.NonManagedService;
041import org.apache.camel.Processor;
042import org.apache.camel.Producer;
043import org.apache.camel.Route;
044import org.apache.camel.Service;
045import org.apache.camel.StartupListener;
046import org.apache.camel.TimerListener;
047import org.apache.camel.VetoCamelContextStartException;
048import org.apache.camel.api.management.PerformanceCounter;
049import org.apache.camel.impl.ConsumerCache;
050import org.apache.camel.impl.DefaultCamelContext;
051import org.apache.camel.impl.DefaultEndpointRegistry;
052import org.apache.camel.impl.EventDrivenConsumerRoute;
053import org.apache.camel.impl.ProducerCache;
054import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
055import org.apache.camel.management.mbean.ManagedAsyncProcessorAwaitManager;
056import org.apache.camel.management.mbean.ManagedBacklogDebugger;
057import org.apache.camel.management.mbean.ManagedBacklogTracer;
058import org.apache.camel.management.mbean.ManagedCamelContext;
059import org.apache.camel.management.mbean.ManagedConsumerCache;
060import org.apache.camel.management.mbean.ManagedEndpoint;
061import org.apache.camel.management.mbean.ManagedEndpointRegistry;
062import org.apache.camel.management.mbean.ManagedInflightRepository;
063import org.apache.camel.management.mbean.ManagedProducerCache;
064import org.apache.camel.management.mbean.ManagedRestRegistry;
065import org.apache.camel.management.mbean.ManagedRoute;
066import org.apache.camel.management.mbean.ManagedRuntimeEndpointRegistry;
067import org.apache.camel.management.mbean.ManagedService;
068import org.apache.camel.management.mbean.ManagedStreamCachingStrategy;
069import org.apache.camel.management.mbean.ManagedThrottlingInflightRoutePolicy;
070import org.apache.camel.management.mbean.ManagedTracer;
071import org.apache.camel.management.mbean.ManagedTypeConverterRegistry;
072import org.apache.camel.model.AOPDefinition;
073import org.apache.camel.model.InterceptDefinition;
074import org.apache.camel.model.OnCompletionDefinition;
075import org.apache.camel.model.OnExceptionDefinition;
076import org.apache.camel.model.PolicyDefinition;
077import org.apache.camel.model.ProcessorDefinition;
078import org.apache.camel.model.ProcessorDefinitionHelper;
079import org.apache.camel.model.RouteDefinition;
080import org.apache.camel.processor.CamelInternalProcessor;
081import org.apache.camel.processor.interceptor.BacklogDebugger;
082import org.apache.camel.processor.interceptor.BacklogTracer;
083import org.apache.camel.processor.interceptor.Tracer;
084import org.apache.camel.spi.AsyncProcessorAwaitManager;
085import org.apache.camel.spi.DataFormat;
086import org.apache.camel.spi.EventNotifier;
087import org.apache.camel.spi.InflightRepository;
088import org.apache.camel.spi.LifecycleStrategy;
089import org.apache.camel.spi.ManagementAgent;
090import org.apache.camel.spi.ManagementAware;
091import org.apache.camel.spi.ManagementNameStrategy;
092import org.apache.camel.spi.ManagementObjectStrategy;
093import org.apache.camel.spi.ManagementStrategy;
094import org.apache.camel.spi.RestRegistry;
095import org.apache.camel.spi.RouteContext;
096import org.apache.camel.spi.RuntimeEndpointRegistry;
097import org.apache.camel.spi.StreamCachingStrategy;
098import org.apache.camel.spi.TypeConverterRegistry;
099import org.apache.camel.spi.UnitOfWork;
100import org.apache.camel.support.ServiceSupport;
101import org.apache.camel.support.TimerListenerManager;
102import org.apache.camel.util.KeyValueHolder;
103import org.apache.camel.util.ObjectHelper;
104import org.slf4j.Logger;
105import org.slf4j.LoggerFactory;
106
107/**
108 * Default JMX managed lifecycle strategy that registers objects using the configured
109 * {@link org.apache.camel.spi.ManagementStrategy}.
110 *
111 * @see org.apache.camel.spi.ManagementStrategy
112 * @version 
113 */
114@SuppressWarnings("deprecation")
115public class DefaultManagementLifecycleStrategy extends ServiceSupport implements LifecycleStrategy, CamelContextAware {
116
117    private static final Logger LOG = LoggerFactory.getLogger(DefaultManagementLifecycleStrategy.class);
118    // the wrapped processors are used for performance counters, which are in use for the created routes;
119    // when a route is removed, we should remove the associated processors from this map
120    private final Map<Processor, KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> wrappedProcessors =
121            new HashMap<Processor, KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>>();
122    private final List<PreRegisterService> preServices = new ArrayList<PreRegisterService>();
123    private final TimerListenerManager loadTimer = new ManagedLoadTimer();
124    private final TimerListenerManagerStartupListener loadTimerStartupListener = new TimerListenerManagerStartupListener();
125    private volatile CamelContext camelContext;
126    private volatile ManagedCamelContext camelContextMBean;
127    private volatile boolean initialized;
128    private final Set<String> knowRouteIds = new HashSet<String>();
129    private final Map<Tracer, ManagedTracer> managedTracers = new HashMap<Tracer, ManagedTracer>();
130    private final Map<BacklogTracer, ManagedBacklogTracer> managedBacklogTracers = new HashMap<BacklogTracer, ManagedBacklogTracer>();
131    private final Map<BacklogDebugger, ManagedBacklogDebugger> managedBacklogDebuggers = new HashMap<BacklogDebugger, ManagedBacklogDebugger>();
132    private final Map<ThreadPoolExecutor, Object> managedThreadPools = new HashMap<ThreadPoolExecutor, Object>();
133
134    public DefaultManagementLifecycleStrategy() {
135    }
136
137    public DefaultManagementLifecycleStrategy(CamelContext camelContext) {
138        this.camelContext = camelContext;
139    }
140
141    public CamelContext getCamelContext() {
142        return camelContext;
143    }
144
145    public void setCamelContext(CamelContext camelContext) {
146        this.camelContext = camelContext;
147    }
148
149    public void onContextStart(CamelContext context) throws VetoCamelContextStartException {
150        Object mc = getManagementObjectStrategy().getManagedObjectForCamelContext(context);
151
152        String name = context.getName();
153        String managementName = context.getManagementNameStrategy().getName();
154
155        try {
156            boolean done = false;
157            while (!done) {
158                ObjectName on = getManagementStrategy().getManagementNamingStrategy().getObjectNameForCamelContext(managementName, name);
159                boolean exists = getManagementStrategy().isManaged(mc, on);
160                if (!exists) {
161                    done = true;
162                } else {
163                    // okay there exists already a CamelContext with this name, we can try to fix it by finding a free name
164                    boolean fixed = false;
165                    // if we use the default name strategy we can find a free name to use
166                    String newName = findFreeName(mc, context.getManagementNameStrategy(), name);
167                    if (newName != null) {
168                        // use this as the fixed name
169                        fixed = true;
170                        done = true;
171                        managementName = newName;
172                    }
173                    // we could not fix it so veto starting camel
174                    if (!fixed) {
175                        throw new VetoCamelContextStartException("CamelContext (" + context.getName() + ") with ObjectName[" + on + "] is already registered."
176                            + " Make sure to use unique names on CamelContext when using multiple CamelContexts in the same MBeanServer.", context);
177                    } else {
178                        LOG.warn("This CamelContext(" + context.getName() + ") will be registered using the name: " + managementName
179                            + " due to clash with an existing name already registered in MBeanServer.");
180                    }
181                }
182            }
183        } catch (VetoCamelContextStartException e) {
184            // rethrow veto
185            throw e;
186        } catch (Exception e) {
187            // must rethrow to allow CamelContext fallback to non JMX agent to allow
188            // Camel to continue to run
189            throw ObjectHelper.wrapRuntimeCamelException(e);
190        }
191
192        // set the name we are going to use
193        if (context instanceof DefaultCamelContext) {
194            ((DefaultCamelContext) context).setManagementName(managementName);
195        }
196
197        try {
198            manageObject(mc);
199        } catch (Exception e) {
200            // must rethrow to allow CamelContext fallback to non JMX agent to allow
201            // Camel to continue to run
202            throw ObjectHelper.wrapRuntimeCamelException(e);
203        }
204
205        // yes we made it and are initialized
206        initialized = true;
207
208        if (mc instanceof ManagedCamelContext) {
209            camelContextMBean = (ManagedCamelContext) mc;
210        }
211
212        // register any pre registered now that we are initialized
213        enlistPreRegisteredServices();
214    }
215
216    private String findFreeName(Object mc, ManagementNameStrategy strategy, String name) throws MalformedObjectNameException {
217        // we cannot find a free name for fixed named strategies
218        if (strategy.isFixedName()) {
219            return null;
220        }
221
222        // okay try to find a free name
223        boolean done = false;
224        String newName = null;
225        while (!done) {
226            // compute the next name
227            newName = strategy.getNextName();
228            ObjectName on = getManagementStrategy().getManagementNamingStrategy().getObjectNameForCamelContext(newName, name);
229            done = !getManagementStrategy().isManaged(mc, on);
230            if (LOG.isTraceEnabled()) {
231                LOG.trace("Using name: {} in ObjectName[{}] exists? {}", new Object[]{name, on, done});
232            }
233        }
234        return newName;
235    }
236
237    /**
238     * After {@link CamelContext} has been enlisted in JMX using {@link #onContextStart(org.apache.camel.CamelContext)}
239     * then we can enlist any pre registered services as well, as we had to wait for {@link CamelContext} to be
240     * enlisted first.
241     * <p/>
242     * A component/endpoint/service etc. can be pre registered when using dependency injection and annotations such as
243     * {@link org.apache.camel.Produce}, {@link org.apache.camel.EndpointInject}. Therefore we need to capture those
244     * registrations up front, and then afterwards enlist in JMX when {@link CamelContext} is being started.
245     */
246    private void enlistPreRegisteredServices() {
247        if (preServices.isEmpty()) {
248            return;
249        }
250
251        LOG.debug("Registering {} pre registered services", preServices.size());
252        for (PreRegisterService pre : preServices) {
253            if (pre.getComponent() != null) {
254                onComponentAdd(pre.getName(), pre.getComponent());
255            } else if (pre.getEndpoint() != null) {
256                onEndpointAdd(pre.getEndpoint());
257            } else if (pre.getService() != null) {
258                onServiceAdd(pre.getCamelContext(), pre.getService(), pre.getRoute());
259            }
260        }
261
262        // we are done so clear the list
263        preServices.clear();
264    }
265
266    public void onContextStop(CamelContext context) {
267        // the agent hasn't been started
268        if (!initialized) {
269            return;
270        }
271        try {
272            Object mc = getManagementObjectStrategy().getManagedObjectForCamelContext(context);
273            // the context could have been removed already
274            if (getManagementStrategy().isManaged(mc, null)) {
275                unmanageObject(mc);
276            }
277        } catch (Exception e) {
278            LOG.warn("Could not unregister CamelContext MBean", e);
279        }
280
281        camelContextMBean = null;
282    }
283
284    public void onComponentAdd(String name, Component component) {
285        // always register components as there are only a few of those
286        if (!initialized) {
287            // pre register so we can register later when we have been initialized
288            PreRegisterService pre = new PreRegisterService();
289            pre.onComponentAdd(name, component);
290            preServices.add(pre);
291            return;
292        }
293        try {
294            Object mc = getManagementObjectStrategy().getManagedObjectForComponent(camelContext, component, name);
295            manageObject(mc);
296        } catch (Exception e) {
297            LOG.warn("Could not register Component MBean", e);
298        }
299    }
300
301    public void onComponentRemove(String name, Component component) {
302        // the agent hasn't been started
303        if (!initialized) {
304            return;
305        }
306        try {
307            Object mc = getManagementObjectStrategy().getManagedObjectForComponent(camelContext, component, name);
308            unmanageObject(mc);
309        } catch (Exception e) {
310            LOG.warn("Could not unregister Component MBean", e);
311        }
312    }
313
314    /**
315     * If the endpoint is an instance of ManagedResource then register it with the
316     * mbean server, if it is not then wrap the endpoint in a {@link ManagedEndpoint} and
317     * register that with the mbean server.
318     *
319     * @param endpoint the Endpoint attempted to be added
320     */
321    public void onEndpointAdd(Endpoint endpoint) {
322        if (!initialized) {
323            // pre register so we can register later when we have been initialized
324            PreRegisterService pre = new PreRegisterService();
325            pre.onEndpointAdd(endpoint);
326            preServices.add(pre);
327            return;
328        }
329
330        if (!shouldRegister(endpoint, null)) {
331            // avoid registering if not needed
332            return;
333        }
334
335        try {
336            Object me = getManagementObjectStrategy().getManagedObjectForEndpoint(camelContext, endpoint);
337            if (me == null) {
338                // endpoint should not be managed
339                return;
340            }
341            manageObject(me);
342        } catch (Exception e) {
343            LOG.warn("Could not register Endpoint MBean for endpoint: " + endpoint + ". This exception will be ignored.", e);
344        }
345    }
346
347    public void onEndpointRemove(Endpoint endpoint) {
348        // the agent hasn't been started
349        if (!initialized) {
350            return;
351        }
352
353        try {
354            Object me = getManagementObjectStrategy().getManagedObjectForEndpoint(camelContext, endpoint);
355            unmanageObject(me);
356        } catch (Exception e) {
357            LOG.warn("Could not unregister Endpoint MBean for endpoint: " + endpoint + ". This exception will be ignored.", e);
358        }
359    }
360
361    public void onServiceAdd(CamelContext context, Service service, Route route) {
362        if (!initialized) {
363            // pre register so we can register later when we have been initialized
364            PreRegisterService pre = new PreRegisterService();
365            pre.onServiceAdd(context, service, route);
366            preServices.add(pre);
367            return;
368        }
369
370        // services can by any kind of misc type but also processors
371        // so we have special logic when its a processor
372
373        if (!shouldRegister(service, route)) {
374            // avoid registering if not needed
375            return;
376        }
377
378        Object managedObject = getManagedObjectForService(context, service, route);
379        if (managedObject == null) {
380            // service should not be managed
381            return;
382        }
383
384        // skip already managed services, for example if a route has been restarted
385        if (getManagementStrategy().isManaged(managedObject, null)) {
386            LOG.trace("The service is already managed: {}", service);
387            return;
388        }
389
390        try {
391            manageObject(managedObject);
392        } catch (Exception e) {
393            LOG.warn("Could not register service: " + service + " as Service MBean.", e);
394        }
395    }
396
397    public void onServiceRemove(CamelContext context, Service service, Route route) {
398        // the agent hasn't been started
399        if (!initialized) {
400            return;
401        }
402
403        Object managedObject = getManagedObjectForService(context, service, route);
404        if (managedObject != null) {
405            try {
406                unmanageObject(managedObject);
407            } catch (Exception e) {
408                LOG.warn("Could not unregister service: " + service + " as Service MBean.", e);
409            }
410        }
411    }
412
413    @SuppressWarnings("unchecked")
414    private Object getManagedObjectForService(CamelContext context, Service service, Route route) {
415        // skip channel, UoW and dont double wrap instrumentation
416        if (service instanceof Channel || service instanceof UnitOfWork || service instanceof InstrumentationProcessor) {
417            return null;
418        }
419
420        // skip non managed services
421        if (service instanceof NonManagedService) {
422            return null;
423        }
424
425        Object answer = null;
426
427        if (service instanceof ManagementAware) {
428            return ((ManagementAware<Service>) service).getManagedObject(service);
429        } else if (service instanceof Tracer) {
430            // special for tracer
431            Tracer tracer = (Tracer) service;
432            ManagedTracer mt = managedTracers.get(tracer);
433            if (mt == null) {
434                mt = new ManagedTracer(context, tracer);
435                mt.init(getManagementStrategy());
436                managedTracers.put(tracer, mt);
437            }
438            return mt;
439        } else if (service instanceof BacklogTracer) {
440            // special for backlog tracer
441            BacklogTracer backlogTracer = (BacklogTracer) service;
442            ManagedBacklogTracer mt = managedBacklogTracers.get(backlogTracer);
443            if (mt == null) {
444                mt = new ManagedBacklogTracer(context, backlogTracer);
445                mt.init(getManagementStrategy());
446                managedBacklogTracers.put(backlogTracer, mt);
447            }
448            return mt;
449        } else if (service instanceof BacklogDebugger) {
450            // special for backlog debugger
451            BacklogDebugger backlogDebugger = (BacklogDebugger) service;
452            ManagedBacklogDebugger md = managedBacklogDebuggers.get(backlogDebugger);
453            if (md == null) {
454                md = new ManagedBacklogDebugger(context, backlogDebugger);
455                md.init(getManagementStrategy());
456                managedBacklogDebuggers.put(backlogDebugger, md);
457            }
458            return md;
459        } else if (service instanceof DataFormat) {
460            answer = getManagementObjectStrategy().getManagedObjectForDataFormat(context, (DataFormat) service);
461        } else if (service instanceof Producer) {
462            answer = getManagementObjectStrategy().getManagedObjectForProducer(context, (Producer) service);
463        } else if (service instanceof Consumer) {
464            answer = getManagementObjectStrategy().getManagedObjectForConsumer(context, (Consumer) service);
465        } else if (service instanceof Processor) {
466            // special for processors as we need to do some extra work
467            return getManagedObjectForProcessor(context, (Processor) service, route);
468        } else if (service instanceof ThrottlingInflightRoutePolicy) {
469            answer = new ManagedThrottlingInflightRoutePolicy(context, (ThrottlingInflightRoutePolicy) service);
470        } else if (service instanceof ConsumerCache) {
471            answer = new ManagedConsumerCache(context, (ConsumerCache) service);
472        } else if (service instanceof ProducerCache) {
473            answer = new ManagedProducerCache(context, (ProducerCache) service);
474        } else if (service instanceof DefaultEndpointRegistry) {
475            answer = new ManagedEndpointRegistry(context, (DefaultEndpointRegistry) service);
476        } else if (service instanceof TypeConverterRegistry) {
477            answer = new ManagedTypeConverterRegistry(context, (TypeConverterRegistry) service);
478        } else if (service instanceof RestRegistry) {
479            answer = new ManagedRestRegistry(context, (RestRegistry) service);
480        } else if (service instanceof InflightRepository) {
481            answer = new ManagedInflightRepository(context, (InflightRepository) service);
482        } else if (service instanceof AsyncProcessorAwaitManager) {
483            answer = new ManagedAsyncProcessorAwaitManager(context, (AsyncProcessorAwaitManager) service);
484        } else if (service instanceof RuntimeEndpointRegistry) {
485            answer = new ManagedRuntimeEndpointRegistry(context, (RuntimeEndpointRegistry) service);
486        } else if (service instanceof StreamCachingStrategy) {
487            answer = new ManagedStreamCachingStrategy(context, (StreamCachingStrategy) service);
488        } else if (service instanceof EventNotifier) {
489            answer = getManagementObjectStrategy().getManagedObjectForEventNotifier(context, (EventNotifier) service);
490        } else if (service != null) {
491            // fallback as generic service
492            answer = getManagementObjectStrategy().getManagedObjectForService(context, service);
493        }
494
495        if (answer != null && answer instanceof ManagedService) {
496            ManagedService ms = (ManagedService) answer;
497            ms.setRoute(route);
498            ms.init(getManagementStrategy());
499        }
500
501        return answer;
502    }
503
504    private Object getManagedObjectForProcessor(CamelContext context, Processor processor, Route route) {
505        // a bit of magic here as the processors we want to manage have already been registered
506        // in the wrapped processors map when Camel have instrumented the route on route initialization
507        // so the idea is now to only manage the processors from the map
508        KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = wrappedProcessors.get(processor);
509        if (holder == null) {
510            // skip as its not an well known processor we want to manage anyway, such as Channel/UnitOfWork/Pipeline etc.
511            return null;
512        }
513
514        // get the managed object as it can be a specialized type such as a Delayer/Throttler etc.
515        Object managedObject = getManagementObjectStrategy().getManagedObjectForProcessor(context, processor, holder.getKey(), route);
516        // only manage if we have a name for it as otherwise we do not want to manage it anyway
517        if (managedObject != null) {
518            // is it a performance counter then we need to set our counter
519            if (managedObject instanceof PerformanceCounter) {
520                InstrumentationProcessor counter = holder.getValue();
521                if (counter != null) {
522                    // change counter to us
523                    counter.setCounter(managedObject);
524                }
525            }
526        }
527
528        return managedObject;
529    }
530
531    public void onRoutesAdd(Collection<Route> routes) {
532        for (Route route : routes) {
533
534            // if we are starting CamelContext or either of the two options has been
535            // enabled, then enlist the route as a known route
536            if (getCamelContext().getStatus().isStarting()
537                || getManagementStrategy().getManagementAgent().getRegisterAlways()
538                || getManagementStrategy().getManagementAgent().getRegisterNewRoutes()) {
539                // register as known route id
540                knowRouteIds.add(route.getId());
541            }
542
543            if (!shouldRegister(route, route)) {
544                // avoid registering if not needed, skip to next route
545                continue;
546            }
547
548            Object mr = getManagementObjectStrategy().getManagedObjectForRoute(camelContext, route);
549
550            // skip already managed routes, for example if the route has been restarted
551            if (getManagementStrategy().isManaged(mr, null)) {
552                LOG.trace("The route is already managed: {}", route);
553                continue;
554            }
555
556            // get the wrapped instrumentation processor from this route
557            // and set me as the counter
558            if (route instanceof EventDrivenConsumerRoute) {
559                EventDrivenConsumerRoute edcr = (EventDrivenConsumerRoute) route;
560                Processor processor = edcr.getProcessor();
561                if (processor instanceof CamelInternalProcessor && mr instanceof ManagedRoute) {
562                    CamelInternalProcessor internal = (CamelInternalProcessor) processor;
563                    ManagedRoute routeMBean = (ManagedRoute) mr;
564
565                    CamelInternalProcessor.InstrumentationAdvice task = internal.getAdvice(CamelInternalProcessor.InstrumentationAdvice.class);
566                    if (task != null) {
567                        // we need to wrap the counter with the camel context so we get stats updated on the context as well
568                        if (camelContextMBean != null) {
569                            CompositePerformanceCounter wrapper = new CompositePerformanceCounter(routeMBean, camelContextMBean);
570                            task.setCounter(wrapper);
571                        } else {
572                            task.setCounter(routeMBean);
573                        }
574                    }
575                }
576            }
577
578            try {
579                manageObject(mr);
580            } catch (JMException e) {
581                LOG.warn("Could not register Route MBean", e);
582            } catch (Exception e) {
583                LOG.warn("Could not create Route MBean", e);
584            }
585        }
586    }
587
588    public void onRoutesRemove(Collection<Route> routes) {
589        // the agent hasn't been started
590        if (!initialized) {
591            return;
592        }
593
594        for (Route route : routes) {
595            Object mr = getManagementObjectStrategy().getManagedObjectForRoute(camelContext, route);
596
597            // skip unmanaged routes
598            if (!getManagementStrategy().isManaged(mr, null)) {
599                LOG.trace("The route is not managed: {}", route);
600                continue;
601            }
602
603            try {
604                unmanageObject(mr);
605            } catch (Exception e) {
606                LOG.warn("Could not unregister Route MBean", e);
607            }
608
609            // remove from known routes ids, as the route has been removed
610            knowRouteIds.remove(route.getId());
611        }
612
613        // after the routes has been removed, we should clear the wrapped processors as we no longer need them
614        // as they were just a provisional map used during creation of routes
615        removeWrappedProcessorsForRoutes(routes);
616    }
617
618    public void onErrorHandlerAdd(RouteContext routeContext, Processor errorHandler, ErrorHandlerFactory errorHandlerBuilder) {
619        if (!shouldRegister(errorHandler, null)) {
620            // avoid registering if not needed
621            return;
622        }
623
624        Object me = getManagementObjectStrategy().getManagedObjectForErrorHandler(camelContext, routeContext, errorHandler, errorHandlerBuilder);
625
626        // skip already managed services, for example if a route has been restarted
627        if (getManagementStrategy().isManaged(me, null)) {
628            LOG.trace("The error handler builder is already managed: {}", errorHandlerBuilder);
629            return;
630        }
631
632        try {
633            manageObject(me);
634        } catch (Exception e) {
635            LOG.warn("Could not register error handler builder: " + errorHandlerBuilder + " as ErrorHandler MBean.", e);
636        }
637    }
638
639    public void onErrorHandlerRemove(RouteContext routeContext, Processor errorHandler, ErrorHandlerFactory errorHandlerBuilder) {
640        if (!initialized) {
641            return;
642        }
643
644        Object me = getManagementObjectStrategy().getManagedObjectForErrorHandler(camelContext, routeContext, errorHandler, errorHandlerBuilder);
645        if (me != null) {
646            try {
647                unmanageObject(me);
648            } catch (Exception e) {
649                LOG.warn("Could not unregister error handler: " + me + " as ErrorHandler MBean.", e);
650            }
651        }
652    }
653
654    public void onThreadPoolAdd(CamelContext camelContext, ThreadPoolExecutor threadPool, String id,
655                                String sourceId, String routeId, String threadPoolProfileId) {
656
657        if (!shouldRegister(threadPool, null)) {
658            // avoid registering if not needed
659            return;
660        }
661
662        Object mtp = getManagementObjectStrategy().getManagedObjectForThreadPool(camelContext, threadPool, id, sourceId, routeId, threadPoolProfileId);
663
664        // skip already managed services, for example if a route has been restarted
665        if (getManagementStrategy().isManaged(mtp, null)) {
666            LOG.trace("The thread pool is already managed: {}", threadPool);
667            return;
668        }
669
670        try {
671            manageObject(mtp);
672            // store a reference so we can unmanage from JMX when the thread pool is removed
673            // we need to keep track here, as we cannot re-construct the thread pool ObjectName when removing the thread pool
674            managedThreadPools.put(threadPool, mtp);
675        } catch (Exception e) {
676            LOG.warn("Could not register thread pool: " + threadPool + " as ThreadPool MBean.", e);
677        }
678    }
679
680    public void onThreadPoolRemove(CamelContext camelContext, ThreadPoolExecutor threadPool) {
681        if (!initialized) {
682            return;
683        }
684
685        // lookup the thread pool and remove it from JMX
686        Object mtp = managedThreadPools.remove(threadPool);
687        if (mtp != null) {
688            // skip unmanaged routes
689            if (!getManagementStrategy().isManaged(mtp, null)) {
690                LOG.trace("The thread pool is not managed: {}", threadPool);
691                return;
692            }
693
694            try {
695                unmanageObject(mtp);
696            } catch (Exception e) {
697                LOG.warn("Could not unregister ThreadPool MBean", e);
698            }
699        }
700    }
701
702    public void onRouteContextCreate(RouteContext routeContext) {
703        if (!initialized) {
704            return;
705        }
706
707        // Create a map (ProcessorType -> PerformanceCounter)
708        // to be passed to InstrumentationInterceptStrategy.
709        Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters =
710                new HashMap<ProcessorDefinition<?>, PerformanceCounter>();
711
712        // Each processor in a route will have its own performance counter.
713        // These performance counter will be embedded to InstrumentationProcessor
714        // and wrap the appropriate processor by InstrumentationInterceptStrategy.
715        RouteDefinition route = routeContext.getRoute();
716
717        // register performance counters for all processors and its children
718        for (ProcessorDefinition<?> processor : route.getOutputs()) {
719            registerPerformanceCounters(routeContext, processor, registeredCounters);
720        }
721
722        // set this managed intercept strategy that executes the JMX instrumentation for performance metrics
723        // so our registered counters can be used for fine grained performance instrumentation
724        routeContext.setManagedInterceptStrategy(new InstrumentationInterceptStrategy(registeredCounters, wrappedProcessors));
725    }
726
727    /**
728     * Removes the wrapped processors for the given routes, as they are no longer in use.
729     * <p/>
730     * This is needed to avoid accumulating memory, if a lot of routes is being added and removed.
731     *
732     * @param routes the routes
733     */
734    private void removeWrappedProcessorsForRoutes(Collection<Route> routes) {
735        // loop the routes, and remove the route associated wrapped processors, as they are no longer in use
736        for (Route route : routes) {
737            String id = route.getId();
738
739            Iterator<KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> it = wrappedProcessors.values().iterator();
740            while (it.hasNext()) {
741                KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = it.next();
742                RouteDefinition def = ProcessorDefinitionHelper.getRoute(holder.getKey());
743                if (def != null && id.equals(def.getId())) {
744                    it.remove();
745                }
746            }
747        }
748        
749    }
750
751    private void registerPerformanceCounters(RouteContext routeContext, ProcessorDefinition<?> processor,
752                                             Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters) {
753
754        // traverse children if any exists
755        List<ProcessorDefinition<?>> children = processor.getOutputs();
756        for (ProcessorDefinition<?> child : children) {
757            registerPerformanceCounters(routeContext, child, registeredCounters);
758        }
759
760        // skip processors that should not be registered
761        if (!registerProcessor(processor)) {
762            return;
763        }
764
765        // okay this is a processor we would like to manage so create the
766        // a delegate performance counter that acts as the placeholder in the interceptor
767        // that then delegates to the real mbean which we register later in the onServiceAdd method
768        DelegatePerformanceCounter pc = new DelegatePerformanceCounter();
769        // set statistics enabled depending on the option
770        boolean enabled = camelContext.getManagementStrategy().getManagementAgent().getStatisticsLevel().isDefaultOrExtended();
771        pc.setStatisticsEnabled(enabled);
772
773        // and add it as a a registered counter that will be used lazy when Camel
774        // does the instrumentation of the route and adds the InstrumentationProcessor
775        // that does the actual performance metrics gatherings at runtime
776        registeredCounters.put(processor, pc);
777    }
778
779    /**
780     * Should the given processor be registered.
781     */
782    protected boolean registerProcessor(ProcessorDefinition<?> processor) {
783        // skip on exception
784        if (processor instanceof OnExceptionDefinition) {
785            return false;
786        }
787        // skip on completion
788        if (processor instanceof OnCompletionDefinition) {
789            return false;
790        }
791        // skip intercept
792        if (processor instanceof InterceptDefinition) {
793            return false;
794        }
795        // skip aop
796        if (processor instanceof AOPDefinition) {
797            return false;
798        }
799        // skip policy
800        if (processor instanceof PolicyDefinition) {
801            return false;
802        }
803
804        // only if custom id assigned
805        boolean only = getManagementStrategy().getManagementAgent().getOnlyRegisterProcessorWithCustomId() != null
806                && getManagementStrategy().getManagementAgent().getOnlyRegisterProcessorWithCustomId();
807        if (only) {
808            return processor.hasCustomIdAssigned();
809        }
810
811        // use customer filter
812        return getManagementStrategy().manageProcessor(processor);
813    }
814
815    private ManagementStrategy getManagementStrategy() {
816        ObjectHelper.notNull(camelContext, "CamelContext");
817        return camelContext.getManagementStrategy();
818    }
819
820    private ManagementObjectStrategy getManagementObjectStrategy() {
821        ObjectHelper.notNull(camelContext, "CamelContext");
822        return camelContext.getManagementStrategy().getManagementObjectStrategy();
823    }
824
825    /**
826     * Strategy for managing the object
827     *
828     * @param me the managed object
829     * @throws Exception is thrown if error registering the object for management
830     */
831    protected void manageObject(Object me) throws Exception {
832        getManagementStrategy().manageObject(me);
833        if (me instanceof TimerListener) {
834            TimerListener timer = (TimerListener) me;
835            loadTimer.addTimerListener(timer);
836        }
837    }
838
839    /**
840     * Un-manages the object.
841     *
842     * @param me the managed object
843     * @throws Exception is thrown if error unregistering the managed object
844     */
845    protected void unmanageObject(Object me) throws Exception {
846        if (me instanceof TimerListener) {
847            TimerListener timer = (TimerListener) me;
848            loadTimer.removeTimerListener(timer);
849        }
850        getManagementStrategy().unmanageObject(me);
851    }
852
853    /**
854     * Whether or not to register the mbean.
855     * <p/>
856     * The {@link ManagementAgent} has options which controls when to register.
857     * This allows us to only register mbeans accordingly. For example by default any
858     * dynamic endpoints is not registered. This avoids to register excessive mbeans, which
859     * most often is not desired.
860     *
861     * @param service the object to register
862     * @param route   an optional route the mbean is associated with, can be <tt>null</tt>
863     * @return <tt>true</tt> to register, <tt>false</tt> to skip registering
864     */
865    protected boolean shouldRegister(Object service, Route route) {
866        // the agent hasn't been started
867        if (!initialized) {
868            return false;
869        }
870
871        LOG.trace("Checking whether to register {} from route: {}", service, route);
872
873        ManagementAgent agent = getManagementStrategy().getManagementAgent();
874        if (agent == null) {
875            // do not register if no agent
876            return false;
877        }
878
879        // always register if we are starting CamelContext
880        if (getCamelContext().getStatus().isStarting()) {
881            return true;
882        }
883
884        // always register if we are setting up routes
885        if (getCamelContext().isSetupRoutes()) {
886            return true;
887        }
888
889        // register if always is enabled
890        if (agent.getRegisterAlways()) {
891            return true;
892        }
893
894        // is it a known route then always accept
895        if (route != null && knowRouteIds.contains(route.getId())) {
896            return true;
897        }
898
899        // only register if we are starting a new route, and current thread is in starting routes mode
900        if (agent.getRegisterNewRoutes()) {
901            // no specific route, then fallback to see if this thread is starting routes
902            // which is kept as state on the camel context
903            return getCamelContext().isStartingRoutes();
904        }
905
906        return false;
907    }
908
909    @Override
910    protected void doStart() throws Exception {
911        ObjectHelper.notNull(camelContext, "CamelContext");
912
913        // defer starting the timer manager until CamelContext has been fully started
914        camelContext.addStartupListener(loadTimerStartupListener);
915    }
916
917    private final class TimerListenerManagerStartupListener implements StartupListener {
918
919        @Override
920        public void onCamelContextStarted(CamelContext context, boolean alreadyStarted) throws Exception {
921            // we are disabled either if configured explicit, or if level is off
922            boolean load = camelContext.getManagementStrategy().getManagementAgent().getLoadStatisticsEnabled() != null
923                    && camelContext.getManagementStrategy().getManagementAgent().getLoadStatisticsEnabled();
924            boolean disabled = !load || camelContext.getManagementStrategy().getStatisticsLevel() == ManagementStatisticsLevel.Off;
925
926            LOG.debug("Load performance statistics {}", disabled ? "disabled" : "enabled");
927            if (!disabled) {
928                // must use 1 sec interval as the load statistics is based on 1 sec calculations
929                loadTimer.setInterval(1000);
930                // we have to defer enlisting timer lister manager as a service until CamelContext has been started
931                getCamelContext().addService(loadTimer);
932            }
933        }
934    }
935
936    @Override
937    protected void doStop() throws Exception {
938        initialized = false;
939        knowRouteIds.clear();
940        preServices.clear();
941        wrappedProcessors.clear();
942        managedTracers.clear();
943        managedBacklogTracers.clear();
944        managedBacklogDebuggers.clear();
945        managedThreadPools.clear();
946    }
947
948    /**
949     * Class which holds any pre registration details.
950     *
951     * @see org.apache.camel.management.DefaultManagementLifecycleStrategy#enlistPreRegisteredServices()
952     */
953    private static final class PreRegisterService {
954
955        private String name;
956        private Component component;
957        private Endpoint endpoint;
958        private CamelContext camelContext;
959        private Service service;
960        private Route route;
961
962        public void onComponentAdd(String name, Component component) {
963            this.name = name;
964            this.component = component;
965        }
966
967        public void onEndpointAdd(Endpoint endpoint) {
968            this.endpoint = endpoint;
969        }
970
971        public void onServiceAdd(CamelContext camelContext, Service service, Route route) {
972            this.camelContext = camelContext;
973            this.service = service;
974            this.route = route;
975        }
976
977        public String getName() {
978            return name;
979        }
980
981        public Component getComponent() {
982            return component;
983        }
984
985        public Endpoint getEndpoint() {
986            return endpoint;
987        }
988
989        public CamelContext getCamelContext() {
990            return camelContext;
991        }
992
993        public Service getService() {
994            return service;
995        }
996
997        public Route getRoute() {
998            return route;
999        }
1000    }
1001
1002}
1003