001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.management;
018
019import java.util.ArrayList;
020import java.util.Collection;
021import java.util.HashMap;
022import java.util.HashSet;
023import java.util.Iterator;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027import java.util.concurrent.ThreadPoolExecutor;
028
029import javax.management.JMException;
030import javax.management.MalformedObjectNameException;
031import javax.management.ObjectName;
032
033import org.apache.camel.CamelContext;
034import org.apache.camel.CamelContextAware;
035import org.apache.camel.Channel;
036import org.apache.camel.Component;
037import org.apache.camel.Consumer;
038import org.apache.camel.Endpoint;
039import org.apache.camel.ExtendedCamelContext;
040import org.apache.camel.ManagementStatisticsLevel;
041import org.apache.camel.NamedNode;
042import org.apache.camel.NonManagedService;
043import org.apache.camel.Processor;
044import org.apache.camel.Producer;
045import org.apache.camel.Route;
046import org.apache.camel.RuntimeCamelException;
047import org.apache.camel.Service;
048import org.apache.camel.StartupListener;
049import org.apache.camel.TimerListener;
050import org.apache.camel.VetoCamelContextStartException;
051import org.apache.camel.cluster.CamelClusterService;
052import org.apache.camel.health.HealthCheckRegistry;
053import org.apache.camel.impl.debugger.BacklogDebugger;
054import org.apache.camel.impl.debugger.BacklogTracer;
055import org.apache.camel.management.mbean.ManagedAsyncProcessorAwaitManager;
056import org.apache.camel.management.mbean.ManagedBacklogDebugger;
057import org.apache.camel.management.mbean.ManagedBacklogTracer;
058import org.apache.camel.management.mbean.ManagedBeanIntrospection;
059import org.apache.camel.management.mbean.ManagedCamelContext;
060import org.apache.camel.management.mbean.ManagedConsumerCache;
061import org.apache.camel.management.mbean.ManagedEndpoint;
062import org.apache.camel.management.mbean.ManagedEndpointRegistry;
063import org.apache.camel.management.mbean.ManagedInflightRepository;
064import org.apache.camel.management.mbean.ManagedProducerCache;
065import org.apache.camel.management.mbean.ManagedRestRegistry;
066import org.apache.camel.management.mbean.ManagedRoute;
067import org.apache.camel.management.mbean.ManagedRuntimeEndpointRegistry;
068import org.apache.camel.management.mbean.ManagedService;
069import org.apache.camel.management.mbean.ManagedStreamCachingStrategy;
070import org.apache.camel.management.mbean.ManagedThrottlingExceptionRoutePolicy;
071import org.apache.camel.management.mbean.ManagedThrottlingInflightRoutePolicy;
072import org.apache.camel.management.mbean.ManagedTracer;
073import org.apache.camel.management.mbean.ManagedTransformerRegistry;
074import org.apache.camel.management.mbean.ManagedTypeConverterRegistry;
075import org.apache.camel.management.mbean.ManagedValidatorRegistry;
076import org.apache.camel.model.InterceptDefinition;
077import org.apache.camel.model.OnCompletionDefinition;
078import org.apache.camel.model.OnExceptionDefinition;
079import org.apache.camel.model.PolicyDefinition;
080import org.apache.camel.model.ProcessorDefinition;
081import org.apache.camel.model.ProcessorDefinitionHelper;
082import org.apache.camel.model.RouteDefinition;
083import org.apache.camel.spi.AsyncProcessorAwaitManager;
084import org.apache.camel.spi.BeanIntrospection;
085import org.apache.camel.spi.ConsumerCache;
086import org.apache.camel.spi.DataFormat;
087import org.apache.camel.spi.EndpointRegistry;
088import org.apache.camel.spi.EventNotifier;
089import org.apache.camel.spi.InflightRepository;
090import org.apache.camel.spi.InternalProcessor;
091import org.apache.camel.spi.LifecycleStrategy;
092import org.apache.camel.spi.ManagementAgent;
093import org.apache.camel.spi.ManagementInterceptStrategy.InstrumentationProcessor;
094import org.apache.camel.spi.ManagementNameStrategy;
095import org.apache.camel.spi.ManagementObjectStrategy;
096import org.apache.camel.spi.ManagementStrategy;
097import org.apache.camel.spi.ProducerCache;
098import org.apache.camel.spi.RestRegistry;
099import org.apache.camel.spi.RuntimeEndpointRegistry;
100import org.apache.camel.spi.StreamCachingStrategy;
101import org.apache.camel.spi.Tracer;
102import org.apache.camel.spi.TransformerRegistry;
103import org.apache.camel.spi.TypeConverterRegistry;
104import org.apache.camel.spi.UnitOfWork;
105import org.apache.camel.spi.ValidatorRegistry;
106import org.apache.camel.support.TimerListenerManager;
107import org.apache.camel.support.service.ServiceSupport;
108import org.apache.camel.throttling.ThrottlingExceptionRoutePolicy;
109import org.apache.camel.throttling.ThrottlingInflightRoutePolicy;
110import org.apache.camel.util.KeyValueHolder;
111import org.apache.camel.util.ObjectHelper;
112import org.slf4j.Logger;
113import org.slf4j.LoggerFactory;
114
115/**
116 * Default JMX managed lifecycle strategy that registered objects using the configured
117 * {@link org.apache.camel.spi.ManagementStrategy}.
118 *
119 * @see org.apache.camel.spi.ManagementStrategy
120 */
121public class JmxManagementLifecycleStrategy extends ServiceSupport implements LifecycleStrategy, CamelContextAware {
122
123    private static final Logger LOG = LoggerFactory.getLogger(JmxManagementLifecycleStrategy.class);
124
125    // the wrapped processors is for performance counters, which are in use for the created routes
126    // when a route is removed, we should remove the associated processors from this map
127    private final Map<Processor, KeyValueHolder<NamedNode, InstrumentationProcessor>> wrappedProcessors = new HashMap<>();
128    private final List<java.util.function.Consumer<JmxManagementLifecycleStrategy>> preServices = new ArrayList<>();
129    private final TimerListenerManager loadTimer = new ManagedLoadTimer();
130    private final TimerListenerManagerStartupListener loadTimerStartupListener = new TimerListenerManagerStartupListener();
131    private volatile CamelContext camelContext;
132    private volatile ManagedCamelContext camelContextMBean;
133    private volatile boolean initialized;
134    private final Set<String> knowRouteIds = new HashSet<>();
135    private final Map<BacklogTracer, ManagedBacklogTracer> managedBacklogTracers = new HashMap<>();
136    private final Map<BacklogDebugger, ManagedBacklogDebugger> managedBacklogDebuggers = new HashMap<>();
137    private final Map<ThreadPoolExecutor, Object> managedThreadPools = new HashMap<>();
138
139    public JmxManagementLifecycleStrategy() {
140    }
141
142    public JmxManagementLifecycleStrategy(CamelContext camelContext) {
143        this.camelContext = camelContext;
144    }
145
146    // used for handing over pre-services between a provisional lifecycycle strategy
147    // and then later the actual strategy to be used when using XML
148    List<java.util.function.Consumer<JmxManagementLifecycleStrategy>> getPreServices() {
149        return preServices;
150    }
151
152    // used for handing over pre-services between a provisional lifecycycle strategy
153    // and then later the actual strategy to be used when using XML
154    void addPreService(java.util.function.Consumer<JmxManagementLifecycleStrategy> preService) {
155        preServices.add(preService);
156    }
157
158    @Override
159    public CamelContext getCamelContext() {
160        return camelContext;
161    }
162
163    @Override
164    public void setCamelContext(CamelContext camelContext) {
165        this.camelContext = camelContext;
166    }
167
168    @Override
169    public void onContextStarting(CamelContext context) throws VetoCamelContextStartException {
170        Object mc = getManagementObjectStrategy().getManagedObjectForCamelContext(context);
171
172        String name = context.getName();
173        String managementName = context.getManagementName();
174
175        if (managementName == null) {
176            managementName = context.getManagementNameStrategy().getName();
177        }
178
179        try {
180            boolean done = false;
181            while (!done) {
182                ObjectName on = getManagementStrategy().getManagementObjectNameStrategy()
183                        .getObjectNameForCamelContext(managementName, name);
184                boolean exists = getManagementStrategy().isManagedName(on);
185                if (!exists) {
186                    done = true;
187                } else {
188                    // okay there exists already a CamelContext with this name, we can try to fix it by finding a free name
189                    boolean fixed = false;
190                    // if we use the default name strategy we can find a free name to use
191                    String newName = findFreeName(mc, context.getManagementNameStrategy(), name);
192                    if (newName != null) {
193                        // use this as the fixed name
194                        fixed = true;
195                        done = true;
196                        managementName = newName;
197                    }
198                    // we could not fix it so veto starting camel
199                    if (!fixed) {
200                        throw new VetoCamelContextStartException(
201                                "CamelContext (" + context.getName() + ") with ObjectName[" + on + "] is already registered."
202                                                                 + " Make sure to use unique names on CamelContext when using multiple CamelContexts in the same MBeanServer.",
203                                context);
204                    } else {
205                        LOG.warn("This CamelContext(" + context.getName() + ") will be registered using the name: "
206                                 + managementName
207                                 + " due to clash with an existing name already registered in MBeanServer.");
208                    }
209                }
210            }
211        } catch (VetoCamelContextStartException e) {
212            // rethrow veto
213            throw e;
214        } catch (Exception e) {
215            // must rethrow to allow CamelContext fallback to non JMX agent to allow
216            // Camel to continue to run
217            throw RuntimeCamelException.wrapRuntimeCamelException(e);
218        }
219
220        // set the name we are going to use
221        context.setManagementName(managementName);
222
223        // yes we made it and are initialized
224        initialized = true;
225
226        try {
227            manageObject(mc);
228        } catch (Exception e) {
229            // must rethrow to allow CamelContext fallback to non JMX agent to allow
230            // Camel to continue to run
231            throw RuntimeCamelException.wrapRuntimeCamelException(e);
232        }
233
234        if (mc instanceof ManagedCamelContext) {
235            camelContextMBean = (ManagedCamelContext) mc;
236        }
237
238        // register any pre registered now that we are initialized
239        enlistPreRegisteredServices();
240
241        // register health check if detected
242        HealthCheckRegistry hcr = context.getExtension(HealthCheckRegistry.class);
243        if (hcr != null) {
244            try {
245                Object me = getManagementObjectStrategy().getManagedObjectForCamelHealth(camelContext, hcr);
246                if (me == null) {
247                    // endpoint should not be managed
248                    return;
249                }
250                manageObject(me);
251            } catch (Exception e) {
252                LOG.warn("Could not register CamelHealth MBean. This exception will be ignored.", e);
253            }
254        }
255
256        try {
257            Object me = getManagementObjectStrategy().getManagedObjectForRouteController(camelContext,
258                    camelContext.getRouteController());
259            if (me == null) {
260                // endpoint should not be managed
261                return;
262            }
263            manageObject(me);
264        } catch (Exception e) {
265            LOG.warn("Could not register RouteController MBean. This exception will be ignored.", e);
266        }
267    }
268
269    private String findFreeName(Object mc, ManagementNameStrategy strategy, String name) throws MalformedObjectNameException {
270        // we cannot find a free name for fixed named strategies
271        if (strategy.isFixedName()) {
272            return null;
273        }
274
275        // okay try to find a free name
276        boolean done = false;
277        String newName = null;
278        while (!done) {
279            // compute the next name
280            newName = strategy.getNextName();
281            ObjectName on
282                    = getManagementStrategy().getManagementObjectNameStrategy().getObjectNameForCamelContext(newName, name);
283            done = !getManagementStrategy().isManagedName(on);
284            if (LOG.isTraceEnabled()) {
285                LOG.trace("Using name: {} in ObjectName[{}] exists? {}", name, on, done);
286            }
287        }
288        return newName;
289    }
290
291    /**
292     * After {@link CamelContext} has been enlisted in JMX using {@link #onContextStart(org.apache.camel.CamelContext)}
293     * then we can enlist any pre registered services as well, as we had to wait for {@link CamelContext} to be enlisted
294     * first.
295     * <p/>
296     * A component/endpoint/service etc. can be pre registered when using dependency injection and annotations such as
297     * {@link org.apache.camel.Produce}, {@link org.apache.camel.EndpointInject}. Therefore we need to capture those
298     * registrations up front, and then afterwards enlist in JMX when {@link CamelContext} is being started.
299     */
300    private void enlistPreRegisteredServices() {
301        if (preServices.isEmpty()) {
302            return;
303        }
304
305        LOG.debug("Registering {} pre registered services", preServices.size());
306        for (java.util.function.Consumer<JmxManagementLifecycleStrategy> pre : preServices) {
307            pre.accept(this);
308        }
309
310        // we are done so clear the list
311        preServices.clear();
312    }
313
314    @Override
315    public void onContextStopped(CamelContext context) {
316        // the agent hasn't been started
317        if (!initialized) {
318            return;
319        }
320
321        try {
322            Object mc = getManagementObjectStrategy().getManagedObjectForRouteController(context, context.getRouteController());
323            // the context could have been removed already
324            if (getManagementStrategy().isManaged(mc)) {
325                unmanageObject(mc);
326            }
327        } catch (Exception e) {
328            LOG.warn("Could not unregister RouteController MBean", e);
329        }
330
331        try {
332            Object mc = getManagementObjectStrategy().getManagedObjectForCamelContext(context);
333            // the context could have been removed already
334            if (getManagementStrategy().isManaged(mc)) {
335                unmanageObject(mc);
336            }
337        } catch (Exception e) {
338            LOG.warn("Could not unregister CamelContext MBean", e);
339        }
340
341        HealthCheckRegistry hcr = context.getExtension(HealthCheckRegistry.class);
342        if (hcr != null) {
343            try {
344                Object mc = getManagementObjectStrategy().getManagedObjectForCamelHealth(context, hcr);
345                // the context could have been removed already
346                if (getManagementStrategy().isManaged(mc)) {
347                    unmanageObject(mc);
348                }
349            } catch (Exception e) {
350                LOG.warn("Could not unregister CamelHealth MBean", e);
351            }
352        }
353
354        camelContextMBean = null;
355    }
356
357    @Override
358    public void onComponentAdd(String name, Component component) {
359        // always register components as there are only a few of those
360        if (!initialized) {
361            // pre register so we can register later when we have been initialized
362            preServices.add(lf -> lf.onComponentAdd(name, component));
363            return;
364        }
365        try {
366            Object mc = getManagementObjectStrategy().getManagedObjectForComponent(camelContext, component, name);
367            manageObject(mc);
368        } catch (Exception e) {
369            LOG.warn("Could not register Component MBean", e);
370        }
371    }
372
373    @Override
374    public void onComponentRemove(String name, Component component) {
375        // the agent hasn't been started
376        if (!initialized) {
377            return;
378        }
379        try {
380            Object mc = getManagementObjectStrategy().getManagedObjectForComponent(camelContext, component, name);
381            unmanageObject(mc);
382        } catch (Exception e) {
383            LOG.warn("Could not unregister Component MBean", e);
384        }
385    }
386
387    /**
388     * If the endpoint is an instance of ManagedResource then register it with the mbean server, if it is not then wrap
389     * the endpoint in a {@link ManagedEndpoint} and register that with the mbean server.
390     *
391     * @param endpoint the Endpoint attempted to be added
392     */
393    @Override
394    public void onEndpointAdd(Endpoint endpoint) {
395        if (!initialized) {
396            // pre register so we can register later when we have been initialized
397            preServices.add(lf -> lf.onEndpointAdd(endpoint));
398            return;
399        }
400
401        if (!shouldRegister(endpoint, null)) {
402            // avoid registering if not needed
403            return;
404        }
405
406        try {
407            Object me = getManagementObjectStrategy().getManagedObjectForEndpoint(camelContext, endpoint);
408            if (me == null) {
409                // endpoint should not be managed
410                return;
411            }
412            manageObject(me);
413        } catch (Exception e) {
414            LOG.warn("Could not register Endpoint MBean for endpoint: " + endpoint + ". This exception will be ignored.", e);
415        }
416    }
417
418    @Override
419    public void onEndpointRemove(Endpoint endpoint) {
420        // the agent hasn't been started
421        if (!initialized) {
422            return;
423        }
424
425        try {
426            Object me = getManagementObjectStrategy().getManagedObjectForEndpoint(camelContext, endpoint);
427            unmanageObject(me);
428        } catch (Exception e) {
429            LOG.warn("Could not unregister Endpoint MBean for endpoint: " + endpoint + ". This exception will be ignored.", e);
430        }
431    }
432
433    @Override
434    public void onServiceAdd(CamelContext context, Service service, Route route) {
435        if (!initialized) {
436            // pre register so we can register later when we have been initialized
437            preServices.add(lf -> lf.onServiceAdd(camelContext, service, route));
438            return;
439        }
440
441        // services can by any kind of misc type but also processors
442        // so we have special logic when its a processor
443
444        if (!shouldRegister(service, route)) {
445            // avoid registering if not needed
446            return;
447        }
448
449        Object managedObject = getManagedObjectForService(context, service, route);
450        if (managedObject == null) {
451            // service should not be managed
452            return;
453        }
454
455        // skip already managed services, for example if a route has been restarted
456        if (getManagementStrategy().isManaged(managedObject)) {
457            LOG.trace("The service is already managed: {}", service);
458            return;
459        }
460
461        try {
462            manageObject(managedObject);
463        } catch (Exception e) {
464            LOG.warn("Could not register service: " + service + " as Service MBean.", e);
465        }
466    }
467
468    @Override
469    public void onServiceRemove(CamelContext context, Service service, Route route) {
470        // the agent hasn't been started
471        if (!initialized) {
472            return;
473        }
474
475        Object managedObject = getManagedObjectForService(context, service, route);
476        if (managedObject != null) {
477            try {
478                unmanageObject(managedObject);
479            } catch (Exception e) {
480                LOG.warn("Could not unregister service: " + service + " as Service MBean.", e);
481            }
482        }
483    }
484
485    @SuppressWarnings("unchecked")
486    private Object getManagedObjectForService(CamelContext context, Service service, Route route) {
487        // skip channel, UoW and dont double wrap instrumentation
488        if (service instanceof Channel || service instanceof UnitOfWork || service instanceof InstrumentationProcessor) {
489            return null;
490        }
491
492        // skip non managed services
493        if (service instanceof NonManagedService) {
494            return null;
495        }
496
497        Object answer = null;
498
499        if (service instanceof BacklogTracer) {
500            // special for backlog tracer
501            BacklogTracer backlogTracer = (BacklogTracer) service;
502            ManagedBacklogTracer mt = managedBacklogTracers.get(backlogTracer);
503            if (mt == null) {
504                mt = new ManagedBacklogTracer(context, backlogTracer);
505                mt.init(getManagementStrategy());
506                managedBacklogTracers.put(backlogTracer, mt);
507            }
508            return mt;
509        } else if (service instanceof BacklogDebugger) {
510            // special for backlog debugger
511            BacklogDebugger backlogDebugger = (BacklogDebugger) service;
512            ManagedBacklogDebugger md = managedBacklogDebuggers.get(backlogDebugger);
513            if (md == null) {
514                md = new ManagedBacklogDebugger(context, backlogDebugger);
515                md.init(getManagementStrategy());
516                managedBacklogDebuggers.put(backlogDebugger, md);
517            }
518            return md;
519        } else if (service instanceof Tracer) {
520            ManagedTracer mt = new ManagedTracer(camelContext, (Tracer) service);
521            mt.init(getManagementStrategy());
522            answer = mt;
523        } else if (service instanceof DataFormat) {
524            answer = getManagementObjectStrategy().getManagedObjectForDataFormat(context, (DataFormat) service);
525        } else if (service instanceof Producer) {
526            answer = getManagementObjectStrategy().getManagedObjectForProducer(context, (Producer) service);
527        } else if (service instanceof Consumer) {
528            answer = getManagementObjectStrategy().getManagedObjectForConsumer(context, (Consumer) service);
529        } else if (service instanceof Processor) {
530            // special for processors as we need to do some extra work
531            return getManagedObjectForProcessor(context, (Processor) service, route);
532        } else if (service instanceof ThrottlingInflightRoutePolicy) {
533            answer = new ManagedThrottlingInflightRoutePolicy(context, (ThrottlingInflightRoutePolicy) service);
534        } else if (service instanceof ThrottlingExceptionRoutePolicy) {
535            answer = new ManagedThrottlingExceptionRoutePolicy(context, (ThrottlingExceptionRoutePolicy) service);
536        } else if (service instanceof ConsumerCache) {
537            answer = new ManagedConsumerCache(context, (ConsumerCache) service);
538        } else if (service instanceof ProducerCache) {
539            answer = new ManagedProducerCache(context, (ProducerCache) service);
540        } else if (service instanceof EndpointRegistry) {
541            answer = new ManagedEndpointRegistry(context, (EndpointRegistry) service);
542        } else if (service instanceof BeanIntrospection) {
543            answer = new ManagedBeanIntrospection(context, (BeanIntrospection) service);
544        } else if (service instanceof TypeConverterRegistry) {
545            answer = new ManagedTypeConverterRegistry(context, (TypeConverterRegistry) service);
546        } else if (service instanceof RestRegistry) {
547            answer = new ManagedRestRegistry(context, (RestRegistry) service);
548        } else if (service instanceof InflightRepository) {
549            answer = new ManagedInflightRepository(context, (InflightRepository) service);
550        } else if (service instanceof AsyncProcessorAwaitManager) {
551            answer = new ManagedAsyncProcessorAwaitManager(context, (AsyncProcessorAwaitManager) service);
552        } else if (service instanceof RuntimeEndpointRegistry) {
553            answer = new ManagedRuntimeEndpointRegistry(context, (RuntimeEndpointRegistry) service);
554        } else if (service instanceof StreamCachingStrategy) {
555            answer = new ManagedStreamCachingStrategy(context, (StreamCachingStrategy) service);
556        } else if (service instanceof EventNotifier) {
557            answer = getManagementObjectStrategy().getManagedObjectForEventNotifier(context, (EventNotifier) service);
558        } else if (service instanceof TransformerRegistry) {
559            answer = new ManagedTransformerRegistry(context, (TransformerRegistry) service);
560        } else if (service instanceof ValidatorRegistry) {
561            answer = new ManagedValidatorRegistry(context, (ValidatorRegistry) service);
562        } else if (service instanceof CamelClusterService) {
563            answer = getManagementObjectStrategy().getManagedObjectForClusterService(context, (CamelClusterService) service);
564        } else if (service != null) {
565            // fallback as generic service
566            answer = getManagementObjectStrategy().getManagedObjectForService(context, service);
567        }
568
569        if (answer instanceof ManagedService) {
570            ManagedService ms = (ManagedService) answer;
571            ms.setRoute(route);
572            ms.init(getManagementStrategy());
573        }
574
575        return answer;
576    }
577
578    private Object getManagedObjectForProcessor(CamelContext context, Processor processor, Route route) {
579        // a bit of magic here as the processors we want to manage have already been registered
580        // in the wrapped processors map when Camel have instrumented the route on route initialization
581        // so the idea is now to only manage the processors from the map
582        KeyValueHolder<NamedNode, InstrumentationProcessor> holder = wrappedProcessors.get(processor);
583        if (holder == null) {
584            // skip as its not an well known processor we want to manage anyway, such as Channel/UnitOfWork/Pipeline etc.
585            return null;
586        }
587
588        // get the managed object as it can be a specialized type such as a Delayer/Throttler etc.
589        Object managedObject
590                = getManagementObjectStrategy().getManagedObjectForProcessor(context, processor, holder.getKey(), route);
591        // only manage if we have a name for it as otherwise we do not want to manage it anyway
592        if (managedObject != null) {
593            // is it a performance counter then we need to set our counter
594            if (managedObject instanceof PerformanceCounter) {
595                InstrumentationProcessor counter = holder.getValue();
596                if (counter != null) {
597                    // change counter to us
598                    counter.setCounter(managedObject);
599                }
600            }
601        }
602
603        return managedObject;
604    }
605
606    @Override
607    public void onRoutesAdd(Collection<Route> routes) {
608        for (Route route : routes) {
609
610            // if we are starting CamelContext or either of the two options has been
611            // enabled, then enlist the route as a known route
612            if (getCamelContext().getStatus().isStarting()
613                    || getManagementStrategy().getManagementAgent().getRegisterAlways()
614                    || getManagementStrategy().getManagementAgent().getRegisterNewRoutes()) {
615                // register as known route id
616                knowRouteIds.add(route.getId());
617            }
618
619            if (!shouldRegister(route, route)) {
620                // avoid registering if not needed, skip to next route
621                continue;
622            }
623
624            Object mr = getManagementObjectStrategy().getManagedObjectForRoute(camelContext, route);
625
626            // skip already managed routes, for example if the route has been restarted
627            if (getManagementStrategy().isManaged(mr)) {
628                LOG.trace("The route is already managed: {}", route);
629                continue;
630            }
631
632            // get the wrapped instrumentation processor from this route
633            // and set me as the counter
634            Processor processor = route.getProcessor();
635            if (processor instanceof InternalProcessor && mr instanceof ManagedRoute) {
636                InternalProcessor internal = (InternalProcessor) processor;
637                ManagedRoute routeMBean = (ManagedRoute) mr;
638
639                DefaultInstrumentationProcessor task = internal.getAdvice(DefaultInstrumentationProcessor.class);
640                if (task != null) {
641                    // we need to wrap the counter with the camel context so we get stats updated on the context as well
642                    if (camelContextMBean != null) {
643                        CompositePerformanceCounter wrapper = new CompositePerformanceCounter(routeMBean, camelContextMBean);
644                        task.setCounter(wrapper);
645                    } else {
646                        task.setCounter(routeMBean);
647                    }
648                }
649            }
650
651            try {
652                manageObject(mr);
653            } catch (JMException e) {
654                LOG.warn("Could not register Route MBean", e);
655            } catch (Exception e) {
656                LOG.warn("Could not create Route MBean", e);
657            }
658        }
659    }
660
661    @Override
662    public void onRoutesRemove(Collection<Route> routes) {
663        // the agent hasn't been started
664        if (!initialized) {
665            return;
666        }
667
668        for (Route route : routes) {
669            Object mr = getManagementObjectStrategy().getManagedObjectForRoute(camelContext, route);
670
671            // skip unmanaged routes
672            if (!getManagementStrategy().isManaged(mr)) {
673                LOG.trace("The route is not managed: {}", route);
674                continue;
675            }
676
677            try {
678                unmanageObject(mr);
679            } catch (Exception e) {
680                LOG.warn("Could not unregister Route MBean", e);
681            }
682
683            // remove from known routes ids, as the route has been removed
684            knowRouteIds.remove(route.getId());
685        }
686
687        // after the routes has been removed, we should clear the wrapped processors as we no longer need them
688        // as they were just a provisional map used during creation of routes
689        removeWrappedProcessorsForRoutes(routes);
690    }
691
692    @Override
693    public void onThreadPoolAdd(
694            CamelContext camelContext, ThreadPoolExecutor threadPool, String id,
695            String sourceId, String routeId, String threadPoolProfileId) {
696
697        if (!initialized) {
698            // pre register so we can register later when we have been initialized
699            preServices.add(lf -> lf.onThreadPoolAdd(camelContext, threadPool, id, sourceId, routeId, threadPoolProfileId));
700            return;
701        }
702
703        if (!shouldRegister(threadPool, null)) {
704            // avoid registering if not needed
705            return;
706        }
707
708        Object mtp = getManagementObjectStrategy().getManagedObjectForThreadPool(camelContext, threadPool, id, sourceId,
709                routeId, threadPoolProfileId);
710
711        // skip already managed services, for example if a route has been restarted
712        if (getManagementStrategy().isManaged(mtp)) {
713            LOG.trace("The thread pool is already managed: {}", threadPool);
714            return;
715        }
716
717        try {
718            manageObject(mtp);
719            // store a reference so we can unmanage from JMX when the thread pool is removed
720            // we need to keep track here, as we cannot re-construct the thread pool ObjectName when removing the thread pool
721            managedThreadPools.put(threadPool, mtp);
722        } catch (Exception e) {
723            LOG.warn("Could not register thread pool: " + threadPool + " as ThreadPool MBean.", e);
724        }
725    }
726
727    @Override
728    public void onThreadPoolRemove(CamelContext camelContext, ThreadPoolExecutor threadPool) {
729        if (!initialized) {
730            return;
731        }
732
733        // lookup the thread pool and remove it from JMX
734        Object mtp = managedThreadPools.remove(threadPool);
735        if (mtp != null) {
736            // skip unmanaged routes
737            if (!getManagementStrategy().isManaged(mtp)) {
738                LOG.trace("The thread pool is not managed: {}", threadPool);
739                return;
740            }
741
742            try {
743                unmanageObject(mtp);
744            } catch (Exception e) {
745                LOG.warn("Could not unregister ThreadPool MBean", e);
746            }
747        }
748    }
749
750    @Override
751    public void onRouteContextCreate(Route route) {
752        // Create a map (ProcessorType -> PerformanceCounter)
753        // to be passed to InstrumentationInterceptStrategy.
754        Map<NamedNode, PerformanceCounter> registeredCounters = new HashMap<>();
755
756        // Each processor in a route will have its own performance counter.
757        // These performance counter will be embedded to InstrumentationProcessor
758        // and wrap the appropriate processor by InstrumentationInterceptStrategy.
759        RouteDefinition routeDefinition = (RouteDefinition) route.getRoute();
760
761        // register performance counters for all processors and its children
762        for (ProcessorDefinition<?> processor : routeDefinition.getOutputs()) {
763            registerPerformanceCounters(route, processor, registeredCounters);
764        }
765
766        // set this managed intercept strategy that executes the JMX instrumentation for performance metrics
767        // so our registered counters can be used for fine grained performance instrumentation
768        route.setManagementInterceptStrategy(new InstrumentationInterceptStrategy(registeredCounters, wrappedProcessors));
769    }
770
771    /**
772     * Removes the wrapped processors for the given routes, as they are no longer in use.
773     * <p/>
774     * This is needed to avoid accumulating memory, if a lot of routes is being added and removed.
775     *
776     * @param routes the routes
777     */
778    private void removeWrappedProcessorsForRoutes(Collection<Route> routes) {
779        // loop the routes, and remove the route associated wrapped processors, as they are no longer in use
780        for (Route route : routes) {
781            String id = route.getId();
782
783            Iterator<KeyValueHolder<NamedNode, InstrumentationProcessor>> it = wrappedProcessors.values().iterator();
784            while (it.hasNext()) {
785                KeyValueHolder<NamedNode, InstrumentationProcessor> holder = it.next();
786                RouteDefinition def = ProcessorDefinitionHelper.getRoute(holder.getKey());
787                if (def != null && id.equals(def.getId())) {
788                    it.remove();
789                }
790            }
791        }
792
793    }
794
795    private void registerPerformanceCounters(
796            Route route, ProcessorDefinition<?> processor,
797            Map<NamedNode, PerformanceCounter> registeredCounters) {
798
799        // traverse children if any exists
800        List<ProcessorDefinition<?>> children = processor.getOutputs();
801        for (ProcessorDefinition<?> child : children) {
802            registerPerformanceCounters(route, child, registeredCounters);
803        }
804
805        // skip processors that should not be registered
806        if (!registerProcessor(processor)) {
807            return;
808        }
809
810        // okay this is a processor we would like to manage so create the
811        // a delegate performance counter that acts as the placeholder in the interceptor
812        // that then delegates to the real mbean which we register later in the onServiceAdd method
813        DelegatePerformanceCounter pc = new DelegatePerformanceCounter();
814        // set statistics enabled depending on the option
815        boolean enabled = camelContext.getManagementStrategy().getManagementAgent().getStatisticsLevel().isDefaultOrExtended();
816        pc.setStatisticsEnabled(enabled);
817
818        // and add it as a a registered counter that will be used lazy when Camel
819        // does the instrumentation of the route and adds the InstrumentationProcessor
820        // that does the actual performance metrics gatherings at runtime
821        registeredCounters.put(processor, pc);
822    }
823
824    /**
825     * Should the given processor be registered.
826     */
827    protected boolean registerProcessor(ProcessorDefinition<?> processor) {
828        // skip on exception
829        if (processor instanceof OnExceptionDefinition) {
830            return false;
831        }
832        // skip on completion
833        if (processor instanceof OnCompletionDefinition) {
834            return false;
835        }
836        // skip intercept
837        if (processor instanceof InterceptDefinition) {
838            return false;
839        }
840        // skip policy
841        if (processor instanceof PolicyDefinition) {
842            return false;
843        }
844
845        // only if custom id assigned
846        boolean only = getManagementStrategy().getManagementAgent().getOnlyRegisterProcessorWithCustomId() != null
847                && getManagementStrategy().getManagementAgent().getOnlyRegisterProcessorWithCustomId();
848        if (only) {
849            return processor.hasCustomIdAssigned();
850        }
851
852        // use customer filter
853        return getManagementStrategy().manageProcessor(processor);
854    }
855
856    private ManagementStrategy getManagementStrategy() {
857        ObjectHelper.notNull(camelContext, "CamelContext");
858        return camelContext.getManagementStrategy();
859    }
860
861    private ManagementObjectStrategy getManagementObjectStrategy() {
862        ObjectHelper.notNull(camelContext, "CamelContext");
863        return camelContext.getManagementStrategy().getManagementObjectStrategy();
864    }
865
866    /**
867     * Strategy for managing the object
868     *
869     * @param  me        the managed object
870     * @throws Exception is thrown if error registering the object for management
871     */
872    protected void manageObject(Object me) throws Exception {
873        getManagementStrategy().manageObject(me);
874        if (me instanceof TimerListener) {
875            TimerListener timer = (TimerListener) me;
876            loadTimer.addTimerListener(timer);
877        }
878    }
879
880    /**
881     * Un-manages the object.
882     *
883     * @param  me        the managed object
884     * @throws Exception is thrown if error unregistering the managed object
885     */
886    protected void unmanageObject(Object me) throws Exception {
887        if (me instanceof TimerListener) {
888            TimerListener timer = (TimerListener) me;
889            loadTimer.removeTimerListener(timer);
890        }
891        getManagementStrategy().unmanageObject(me);
892    }
893
894    /**
895     * Whether or not to register the mbean.
896     * <p/>
897     * The {@link ManagementAgent} has options which controls when to register. This allows us to only register mbeans
898     * accordingly. For example by default any dynamic endpoints is not registered. This avoids to register excessive
899     * mbeans, which most often is not desired.
900     *
901     * @param  service the object to register
902     * @param  route   an optional route the mbean is associated with, can be <tt>null</tt>
903     * @return         <tt>true</tt> to register, <tt>false</tt> to skip registering
904     */
905    protected boolean shouldRegister(Object service, Route route) {
906        // the agent hasn't been started
907        if (!initialized) {
908            return false;
909        }
910
911        LOG.trace("Checking whether to register {} from route: {}", service, route);
912
913        ManagementAgent agent = getManagementStrategy().getManagementAgent();
914        if (agent == null) {
915            // do not register if no agent
916            return false;
917        }
918
919        // always register if we are starting CamelContext
920        if (getCamelContext().getStatus().isStarting()
921                || getCamelContext().getStatus().isInitializing()) {
922            return true;
923        }
924
925        // always register if we are setting up routes
926        if (getCamelContext().adapt(ExtendedCamelContext.class).isSetupRoutes()) {
927            return true;
928        }
929
930        // register if always is enabled
931        if (agent.getRegisterAlways()) {
932            return true;
933        }
934
935        // is it a known route then always accept
936        if (route != null && knowRouteIds.contains(route.getId())) {
937            return true;
938        }
939
940        // only register if we are starting a new route, and current thread is in starting routes mode
941        if (agent.getRegisterNewRoutes()) {
942            // no specific route, then fallback to see if this thread is starting routes
943            // which is kept as state on the camel context
944            return getCamelContext().getRouteController().isStartingRoutes();
945        }
946
947        return false;
948    }
949
950    @Override
951    protected void doStart() throws Exception {
952        ObjectHelper.notNull(camelContext, "CamelContext");
953
954        // defer starting the timer manager until CamelContext has been fully started
955        camelContext.addStartupListener(loadTimerStartupListener);
956    }
957
958    private final class TimerListenerManagerStartupListener implements StartupListener {
959
960        @Override
961        public void onCamelContextStarted(CamelContext context, boolean alreadyStarted) throws Exception {
962            // we are disabled either if configured explicit, or if level is off
963            boolean load = camelContext.getManagementStrategy().getManagementAgent().getLoadStatisticsEnabled() != null
964                    && camelContext.getManagementStrategy().getManagementAgent().getLoadStatisticsEnabled();
965            boolean disabled = !load || camelContext.getManagementStrategy().getManagementAgent().getStatisticsLevel()
966                                        == ManagementStatisticsLevel.Off;
967
968            LOG.debug("Load performance statistics {}", disabled ? "disabled" : "enabled");
969            if (!disabled) {
970                // must use 1 sec interval as the load statistics is based on 1 sec calculations
971                loadTimer.setInterval(1000);
972                // we have to defer enlisting timer lister manager as a service until CamelContext has been started
973                getCamelContext().addService(loadTimer);
974            }
975        }
976    }
977
978    @Override
979    protected void doStop() throws Exception {
980        initialized = false;
981        knowRouteIds.clear();
982        preServices.clear();
983        wrappedProcessors.clear();
984        managedBacklogTracers.clear();
985        managedBacklogDebuggers.clear();
986        managedThreadPools.clear();
987    }
988
989    /**
990     * Class which holds any pre registration details.
991     *
992     * @see JmxManagementLifecycleStrategy#enlistPreRegisteredServices()
993     */
994    static final class PreRegisterService {
995
996        private String name;
997        private Component component;
998        private Endpoint endpoint;
999        private CamelContext camelContext;
1000        private Service service;
1001        private Route route;
1002        private java.util.function.Consumer<JmxManagementLifecycleStrategy> runnable;
1003
1004        public PreRegisterService() {
1005        }
1006
1007        public PreRegisterService(java.util.function.Consumer<JmxManagementLifecycleStrategy> runnable) {
1008            this.runnable = runnable;
1009        }
1010
1011        public void onComponentAdd(String name, Component component) {
1012            this.name = name;
1013            this.component = component;
1014        }
1015
1016        public void onEndpointAdd(Endpoint endpoint) {
1017            this.endpoint = endpoint;
1018        }
1019
1020        public void onServiceAdd(CamelContext camelContext, Service service, Route route) {
1021            this.camelContext = camelContext;
1022            this.service = service;
1023            this.route = route;
1024        }
1025
1026        public String getName() {
1027            return name;
1028        }
1029
1030        public Component getComponent() {
1031            return component;
1032        }
1033
1034        public Endpoint getEndpoint() {
1035            return endpoint;
1036        }
1037
1038        public CamelContext getCamelContext() {
1039            return camelContext;
1040        }
1041
1042        public Service getService() {
1043            return service;
1044        }
1045
1046        public Route getRoute() {
1047            return route;
1048        }
1049
1050        public java.util.function.Consumer<JmxManagementLifecycleStrategy> getRunnable() {
1051            return runnable;
1052        }
1053
1054    }
1055
1056}