/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017package org.apache.camel.management;
018
019import java.util.ArrayList;
020import java.util.Collection;
021import java.util.HashMap;
022import java.util.HashSet;
023import java.util.Iterator;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027import java.util.concurrent.ThreadPoolExecutor;
028
029import javax.management.JMException;
030import javax.management.MalformedObjectNameException;
031import javax.management.ObjectName;
032
033import org.apache.camel.CamelContext;
034import org.apache.camel.CamelContextAware;
035import org.apache.camel.Channel;
036import org.apache.camel.Component;
037import org.apache.camel.Consumer;
038import org.apache.camel.Endpoint;
039import org.apache.camel.ErrorHandlerFactory;
040import org.apache.camel.ManagementStatisticsLevel;
041import org.apache.camel.NonManagedService;
042import org.apache.camel.Processor;
043import org.apache.camel.Producer;
044import org.apache.camel.Route;
045import org.apache.camel.Service;
046import org.apache.camel.StartupListener;
047import org.apache.camel.TimerListener;
048import org.apache.camel.VetoCamelContextStartException;
049import org.apache.camel.api.management.PerformanceCounter;
050import org.apache.camel.cluster.CamelClusterService;
051import org.apache.camel.impl.ConsumerCache;
052import org.apache.camel.impl.DefaultCamelContext;
053import org.apache.camel.impl.DefaultEndpointRegistry;
054import org.apache.camel.impl.EventDrivenConsumerRoute;
055import org.apache.camel.impl.ProducerCache;
056import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
057import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
058import org.apache.camel.management.mbean.ManagedAsyncProcessorAwaitManager;
059import org.apache.camel.management.mbean.ManagedBacklogDebugger;
060import org.apache.camel.management.mbean.ManagedBacklogTracer;
061import org.apache.camel.management.mbean.ManagedCamelContext;
062import org.apache.camel.management.mbean.ManagedConsumerCache;
063import org.apache.camel.management.mbean.ManagedEndpoint;
064import org.apache.camel.management.mbean.ManagedEndpointRegistry;
065import org.apache.camel.management.mbean.ManagedInflightRepository;
066import org.apache.camel.management.mbean.ManagedProducerCache;
067import org.apache.camel.management.mbean.ManagedRestRegistry;
068import org.apache.camel.management.mbean.ManagedRoute;
069import org.apache.camel.management.mbean.ManagedRuntimeCamelCatalog;
070import org.apache.camel.management.mbean.ManagedRuntimeEndpointRegistry;
071import org.apache.camel.management.mbean.ManagedService;
072import org.apache.camel.management.mbean.ManagedStreamCachingStrategy;
073import org.apache.camel.management.mbean.ManagedThrottlingExceptionRoutePolicy;
074import org.apache.camel.management.mbean.ManagedThrottlingInflightRoutePolicy;
075import org.apache.camel.management.mbean.ManagedTracer;
076import org.apache.camel.management.mbean.ManagedTransformerRegistry;
077import org.apache.camel.management.mbean.ManagedTypeConverterRegistry;
078import org.apache.camel.management.mbean.ManagedValidatorRegistry;
079import org.apache.camel.model.AOPDefinition;
080import org.apache.camel.model.InterceptDefinition;
081import org.apache.camel.model.OnCompletionDefinition;
082import org.apache.camel.model.OnExceptionDefinition;
083import org.apache.camel.model.PolicyDefinition;
084import org.apache.camel.model.ProcessorDefinition;
085import org.apache.camel.model.ProcessorDefinitionHelper;
086import org.apache.camel.model.RouteDefinition;
087import org.apache.camel.processor.CamelInternalProcessor;
088import org.apache.camel.processor.interceptor.BacklogDebugger;
089import org.apache.camel.processor.interceptor.BacklogTracer;
090import org.apache.camel.processor.interceptor.Tracer;
091import org.apache.camel.runtimecatalog.RuntimeCamelCatalog;
092import org.apache.camel.spi.AsyncProcessorAwaitManager;
093import org.apache.camel.spi.DataFormat;
094import org.apache.camel.spi.EventNotifier;
095import org.apache.camel.spi.InflightRepository;
096import org.apache.camel.spi.LifecycleStrategy;
097import org.apache.camel.spi.ManagementAgent;
098import org.apache.camel.spi.ManagementAware;
099import org.apache.camel.spi.ManagementNameStrategy;
100import org.apache.camel.spi.ManagementObjectStrategy;
101import org.apache.camel.spi.ManagementStrategy;
102import org.apache.camel.spi.RestRegistry;
103import org.apache.camel.spi.RouteContext;
104import org.apache.camel.spi.RuntimeEndpointRegistry;
105import org.apache.camel.spi.StreamCachingStrategy;
106import org.apache.camel.spi.TransformerRegistry;
107import org.apache.camel.spi.TypeConverterRegistry;
108import org.apache.camel.spi.UnitOfWork;
109import org.apache.camel.spi.ValidatorRegistry;
110import org.apache.camel.support.ServiceSupport;
111import org.apache.camel.support.TimerListenerManager;
112import org.apache.camel.util.KeyValueHolder;
113import org.apache.camel.util.ObjectHelper;
114import org.slf4j.Logger;
115import org.slf4j.LoggerFactory;
116
/**
 * Default JMX managed lifecycle strategy that registers objects using the configured
 * {@link org.apache.camel.spi.ManagementStrategy}.
 *
 * @see org.apache.camel.spi.ManagementStrategy
 * @version
 */
124@SuppressWarnings("deprecation")
125public class DefaultManagementLifecycleStrategy extends ServiceSupport implements LifecycleStrategy, CamelContextAware {
126
    // logger shared by all instances of this strategy
    private static final Logger LOG = LoggerFactory.getLogger(DefaultManagementLifecycleStrategy.class);
    // the wrapped processors is for performance counters, which are in use for the created routes
    // when a route is removed, we should remove the associated processors from this map
    private final Map<Processor, KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> wrappedProcessors = new HashMap<>();
    // components/endpoints/services reported before this strategy was initialized;
    // they are replayed by enlistPreRegisteredServices() once the CamelContext MBean is registered
    private final List<PreRegisterService> preServices = new ArrayList<>();
    // NOTE(review): load timer and its startup listener are not used in the visible part of the file - confirm usage further down
    private final TimerListenerManager loadTimer = new ManagedLoadTimer();
    private final TimerListenerManagerStartupListener loadTimerStartupListener = new TimerListenerManagerStartupListener();
    // the CamelContext this strategy works on (set via constructor or setCamelContext)
    private volatile CamelContext camelContext;
    // the CamelContext MBean, set in onContextStart when it is a ManagedCamelContext and cleared in onContextStop
    private volatile ManagedCamelContext camelContextMBean;
    // flipped to true in onContextStart after the CamelContext MBean has been registered
    private volatile boolean initialized;
    // ids of routes enlisted as "known" in onRoutesAdd and dropped again in onRoutesRemove
    private final Set<String> knowRouteIds = new HashSet<>();
    // caches of the managed wrappers created for tracers/debuggers, so the same wrapper is reused per instance
    private final Map<Tracer, ManagedTracer> managedTracers = new HashMap<>();
    private final Map<BacklogTracer, ManagedBacklogTracer> managedBacklogTracers = new HashMap<>();
    private final Map<BacklogDebugger, ManagedBacklogDebugger> managedBacklogDebuggers = new HashMap<>();
    // NOTE(review): presumably maps thread pools to their managed objects - not used in the visible part of the file, confirm
    private final Map<ThreadPoolExecutor, Object> managedThreadPools = new HashMap<>();
142
143    public DefaultManagementLifecycleStrategy() {
144    }
145
146    public DefaultManagementLifecycleStrategy(CamelContext camelContext) {
147        this.camelContext = camelContext;
148    }
149
150    public CamelContext getCamelContext() {
151        return camelContext;
152    }
153
154    public void setCamelContext(CamelContext camelContext) {
155        this.camelContext = camelContext;
156    }
157
158    public void onContextStart(CamelContext context) throws VetoCamelContextStartException {
159        Object mc = getManagementObjectStrategy().getManagedObjectForCamelContext(context);
160
161        String name = context.getName();
162        String managementName = context.getManagementNameStrategy().getName();
163
164        try {
165            boolean done = false;
166            while (!done) {
167                ObjectName on = getManagementStrategy().getManagementNamingStrategy().getObjectNameForCamelContext(managementName, name);
168                boolean exists = getManagementStrategy().isManaged(mc, on);
169                if (!exists) {
170                    done = true;
171                } else {
172                    // okay there exists already a CamelContext with this name, we can try to fix it by finding a free name
173                    boolean fixed = false;
174                    // if we use the default name strategy we can find a free name to use
175                    String newName = findFreeName(mc, context.getManagementNameStrategy(), name);
176                    if (newName != null) {
177                        // use this as the fixed name
178                        fixed = true;
179                        done = true;
180                        managementName = newName;
181                    }
182                    // we could not fix it so veto starting camel
183                    if (!fixed) {
184                        throw new VetoCamelContextStartException("CamelContext (" + context.getName() + ") with ObjectName[" + on + "] is already registered."
185                            + " Make sure to use unique names on CamelContext when using multiple CamelContexts in the same MBeanServer.", context);
186                    } else {
187                        LOG.warn("This CamelContext(" + context.getName() + ") will be registered using the name: " + managementName
188                            + " due to clash with an existing name already registered in MBeanServer.");
189                    }
190                }
191            }
192        } catch (VetoCamelContextStartException e) {
193            // rethrow veto
194            throw e;
195        } catch (Exception e) {
196            // must rethrow to allow CamelContext fallback to non JMX agent to allow
197            // Camel to continue to run
198            throw ObjectHelper.wrapRuntimeCamelException(e);
199        }
200
201        // set the name we are going to use
202        if (context instanceof DefaultCamelContext) {
203            ((DefaultCamelContext) context).setManagementName(managementName);
204        }
205
206        try {
207            manageObject(mc);
208        } catch (Exception e) {
209            // must rethrow to allow CamelContext fallback to non JMX agent to allow
210            // Camel to continue to run
211            throw ObjectHelper.wrapRuntimeCamelException(e);
212        }
213
214        // yes we made it and are initialized
215        initialized = true;
216
217        if (mc instanceof ManagedCamelContext) {
218            camelContextMBean = (ManagedCamelContext) mc;
219        }
220
221        // register any pre registered now that we are initialized
222        enlistPreRegisteredServices();
223
224        try {
225            Object me = getManagementObjectStrategy().getManagedObjectForCamelHealth(camelContext);
226            if (me == null) {
227                // endpoint should not be managed
228                return;
229            }
230            manageObject(me);
231        } catch (Exception e) {
232            LOG.warn("Could not register CamelHealth MBean. This exception will be ignored.", e);
233        }
234
235        try {
236            Object me = getManagementObjectStrategy().getManagedObjectForRouteController(camelContext);
237            if (me == null) {
238                // endpoint should not be managed
239                return;
240            }
241            manageObject(me);
242        } catch (Exception e) {
243            LOG.warn("Could not register RouteController MBean. This exception will be ignored.", e);
244        }
245    }
246
247    private String findFreeName(Object mc, ManagementNameStrategy strategy, String name) throws MalformedObjectNameException {
248        // we cannot find a free name for fixed named strategies
249        if (strategy.isFixedName()) {
250            return null;
251        }
252
253        // okay try to find a free name
254        boolean done = false;
255        String newName = null;
256        while (!done) {
257            // compute the next name
258            newName = strategy.getNextName();
259            ObjectName on = getManagementStrategy().getManagementNamingStrategy().getObjectNameForCamelContext(newName, name);
260            done = !getManagementStrategy().isManaged(mc, on);
261            if (LOG.isTraceEnabled()) {
262                LOG.trace("Using name: {} in ObjectName[{}] exists? {}", name, on, done);
263            }
264        }
265        return newName;
266    }
267
268    /**
269     * After {@link CamelContext} has been enlisted in JMX using {@link #onContextStart(org.apache.camel.CamelContext)}
270     * then we can enlist any pre registered services as well, as we had to wait for {@link CamelContext} to be
271     * enlisted first.
272     * <p/>
273     * A component/endpoint/service etc. can be pre registered when using dependency injection and annotations such as
274     * {@link org.apache.camel.Produce}, {@link org.apache.camel.EndpointInject}. Therefore we need to capture those
275     * registrations up front, and then afterwards enlist in JMX when {@link CamelContext} is being started.
276     */
277    private void enlistPreRegisteredServices() {
278        if (preServices.isEmpty()) {
279            return;
280        }
281
282        LOG.debug("Registering {} pre registered services", preServices.size());
283        for (PreRegisterService pre : preServices) {
284            if (pre.getComponent() != null) {
285                onComponentAdd(pre.getName(), pre.getComponent());
286            } else if (pre.getEndpoint() != null) {
287                onEndpointAdd(pre.getEndpoint());
288            } else if (pre.getService() != null) {
289                onServiceAdd(pre.getCamelContext(), pre.getService(), pre.getRoute());
290            }
291        }
292
293        // we are done so clear the list
294        preServices.clear();
295    }
296
297    public void onContextStop(CamelContext context) {
298        // the agent hasn't been started
299        if (!initialized) {
300            return;
301        }
302
303        try {
304            Object mc = getManagementObjectStrategy().getManagedObjectForRouteController(context);
305            // the context could have been removed already
306            if (getManagementStrategy().isManaged(mc, null)) {
307                unmanageObject(mc);
308            }
309        } catch (Exception e) {
310            LOG.warn("Could not unregister RouteController MBean", e);
311        }
312
313        try {
314            Object mc = getManagementObjectStrategy().getManagedObjectForCamelHealth(context);
315            // the context could have been removed already
316            if (getManagementStrategy().isManaged(mc, null)) {
317                unmanageObject(mc);
318            }
319        } catch (Exception e) {
320            LOG.warn("Could not unregister CamelHealth MBean", e);
321        }
322
323        try {
324            Object mc = getManagementObjectStrategy().getManagedObjectForCamelContext(context);
325            // the context could have been removed already
326            if (getManagementStrategy().isManaged(mc, null)) {
327                unmanageObject(mc);
328            }
329        } catch (Exception e) {
330            LOG.warn("Could not unregister CamelContext MBean", e);
331        }
332
333        camelContextMBean = null;
334    }
335
336    public void onComponentAdd(String name, Component component) {
337        // always register components as there are only a few of those
338        if (!initialized) {
339            // pre register so we can register later when we have been initialized
340            PreRegisterService pre = new PreRegisterService();
341            pre.onComponentAdd(name, component);
342            preServices.add(pre);
343            return;
344        }
345        try {
346            Object mc = getManagementObjectStrategy().getManagedObjectForComponent(camelContext, component, name);
347            manageObject(mc);
348        } catch (Exception e) {
349            LOG.warn("Could not register Component MBean", e);
350        }
351    }
352
353    public void onComponentRemove(String name, Component component) {
354        // the agent hasn't been started
355        if (!initialized) {
356            return;
357        }
358        try {
359            Object mc = getManagementObjectStrategy().getManagedObjectForComponent(camelContext, component, name);
360            unmanageObject(mc);
361        } catch (Exception e) {
362            LOG.warn("Could not unregister Component MBean", e);
363        }
364    }
365
366    /**
367     * If the endpoint is an instance of ManagedResource then register it with the
368     * mbean server, if it is not then wrap the endpoint in a {@link ManagedEndpoint} and
369     * register that with the mbean server.
370     *
371     * @param endpoint the Endpoint attempted to be added
372     */
373    public void onEndpointAdd(Endpoint endpoint) {
374        if (!initialized) {
375            // pre register so we can register later when we have been initialized
376            PreRegisterService pre = new PreRegisterService();
377            pre.onEndpointAdd(endpoint);
378            preServices.add(pre);
379            return;
380        }
381
382        if (!shouldRegister(endpoint, null)) {
383            // avoid registering if not needed
384            return;
385        }
386
387        try {
388            Object me = getManagementObjectStrategy().getManagedObjectForEndpoint(camelContext, endpoint);
389            if (me == null) {
390                // endpoint should not be managed
391                return;
392            }
393            manageObject(me);
394        } catch (Exception e) {
395            LOG.warn("Could not register Endpoint MBean for endpoint: " + endpoint + ". This exception will be ignored.", e);
396        }
397    }
398
399    public void onEndpointRemove(Endpoint endpoint) {
400        // the agent hasn't been started
401        if (!initialized) {
402            return;
403        }
404
405        try {
406            Object me = getManagementObjectStrategy().getManagedObjectForEndpoint(camelContext, endpoint);
407            unmanageObject(me);
408        } catch (Exception e) {
409            LOG.warn("Could not unregister Endpoint MBean for endpoint: " + endpoint + ". This exception will be ignored.", e);
410        }
411    }
412
413    public void onServiceAdd(CamelContext context, Service service, Route route) {
414        if (!initialized) {
415            // pre register so we can register later when we have been initialized
416            PreRegisterService pre = new PreRegisterService();
417            pre.onServiceAdd(context, service, route);
418            preServices.add(pre);
419            return;
420        }
421
422        // services can by any kind of misc type but also processors
423        // so we have special logic when its a processor
424
425        if (!shouldRegister(service, route)) {
426            // avoid registering if not needed
427            return;
428        }
429
430        Object managedObject = getManagedObjectForService(context, service, route);
431        if (managedObject == null) {
432            // service should not be managed
433            return;
434        }
435
436        // skip already managed services, for example if a route has been restarted
437        if (getManagementStrategy().isManaged(managedObject, null)) {
438            LOG.trace("The service is already managed: {}", service);
439            return;
440        }
441
442        try {
443            manageObject(managedObject);
444        } catch (Exception e) {
445            LOG.warn("Could not register service: " + service + " as Service MBean.", e);
446        }
447    }
448
449    public void onServiceRemove(CamelContext context, Service service, Route route) {
450        // the agent hasn't been started
451        if (!initialized) {
452            return;
453        }
454
455        Object managedObject = getManagedObjectForService(context, service, route);
456        if (managedObject != null) {
457            try {
458                unmanageObject(managedObject);
459            } catch (Exception e) {
460                LOG.warn("Could not unregister service: " + service + " as Service MBean.", e);
461            }
462        }
463    }
464
465    @SuppressWarnings("unchecked")
466    private Object getManagedObjectForService(CamelContext context, Service service, Route route) {
467        // skip channel, UoW and dont double wrap instrumentation
468        if (service instanceof Channel || service instanceof UnitOfWork || service instanceof InstrumentationProcessor) {
469            return null;
470        }
471
472        // skip non managed services
473        if (service instanceof NonManagedService) {
474            return null;
475        }
476
477        Object answer = null;
478
479        if (service instanceof ManagementAware) {
480            return ((ManagementAware<Service>) service).getManagedObject(service);
481        } else if (service instanceof Tracer) {
482            // special for tracer
483            Tracer tracer = (Tracer) service;
484            ManagedTracer mt = managedTracers.get(tracer);
485            if (mt == null) {
486                mt = new ManagedTracer(context, tracer);
487                mt.init(getManagementStrategy());
488                managedTracers.put(tracer, mt);
489            }
490            return mt;
491        } else if (service instanceof BacklogTracer) {
492            // special for backlog tracer
493            BacklogTracer backlogTracer = (BacklogTracer) service;
494            ManagedBacklogTracer mt = managedBacklogTracers.get(backlogTracer);
495            if (mt == null) {
496                mt = new ManagedBacklogTracer(context, backlogTracer);
497                mt.init(getManagementStrategy());
498                managedBacklogTracers.put(backlogTracer, mt);
499            }
500            return mt;
501        } else if (service instanceof BacklogDebugger) {
502            // special for backlog debugger
503            BacklogDebugger backlogDebugger = (BacklogDebugger) service;
504            ManagedBacklogDebugger md = managedBacklogDebuggers.get(backlogDebugger);
505            if (md == null) {
506                md = new ManagedBacklogDebugger(context, backlogDebugger);
507                md.init(getManagementStrategy());
508                managedBacklogDebuggers.put(backlogDebugger, md);
509            }
510            return md;
511        } else if (service instanceof DataFormat) {
512            answer = getManagementObjectStrategy().getManagedObjectForDataFormat(context, (DataFormat) service);
513        } else if (service instanceof Producer) {
514            answer = getManagementObjectStrategy().getManagedObjectForProducer(context, (Producer) service);
515        } else if (service instanceof Consumer) {
516            answer = getManagementObjectStrategy().getManagedObjectForConsumer(context, (Consumer) service);
517        } else if (service instanceof Processor) {
518            // special for processors as we need to do some extra work
519            return getManagedObjectForProcessor(context, (Processor) service, route);
520        } else if (service instanceof ThrottlingInflightRoutePolicy) {
521            answer = new ManagedThrottlingInflightRoutePolicy(context, (ThrottlingInflightRoutePolicy) service);
522        } else if (service instanceof ThrottlingExceptionRoutePolicy) {
523            answer = new ManagedThrottlingExceptionRoutePolicy(context, (ThrottlingExceptionRoutePolicy) service);
524        } else if (service instanceof ConsumerCache) {
525            answer = new ManagedConsumerCache(context, (ConsumerCache) service);
526        } else if (service instanceof ProducerCache) {
527            answer = new ManagedProducerCache(context, (ProducerCache) service);
528        } else if (service instanceof DefaultEndpointRegistry) {
529            answer = new ManagedEndpointRegistry(context, (DefaultEndpointRegistry) service);
530        } else if (service instanceof TypeConverterRegistry) {
531            answer = new ManagedTypeConverterRegistry(context, (TypeConverterRegistry) service);
532        } else if (service instanceof RestRegistry) {
533            answer = new ManagedRestRegistry(context, (RestRegistry) service);
534        } else if (service instanceof InflightRepository) {
535            answer = new ManagedInflightRepository(context, (InflightRepository) service);
536        } else if (service instanceof AsyncProcessorAwaitManager) {
537            answer = new ManagedAsyncProcessorAwaitManager(context, (AsyncProcessorAwaitManager) service);
538        } else if (service instanceof RuntimeEndpointRegistry) {
539            answer = new ManagedRuntimeEndpointRegistry(context, (RuntimeEndpointRegistry) service);
540        } else if (service instanceof StreamCachingStrategy) {
541            answer = new ManagedStreamCachingStrategy(context, (StreamCachingStrategy) service);
542        } else if (service instanceof EventNotifier) {
543            answer = getManagementObjectStrategy().getManagedObjectForEventNotifier(context, (EventNotifier) service);
544        } else if (service instanceof TransformerRegistry) {
545            answer = new ManagedTransformerRegistry(context, (TransformerRegistry)service);
546        } else if (service instanceof ValidatorRegistry) {
547            answer = new ManagedValidatorRegistry(context, (ValidatorRegistry)service);
548        } else if (service instanceof RuntimeCamelCatalog) {
549            answer = new ManagedRuntimeCamelCatalog(context, (RuntimeCamelCatalog) service);
550        } else if (service instanceof CamelClusterService) {
551            answer = getManagementObjectStrategy().getManagedObjectForClusterService(context, (CamelClusterService)service);
552        } else if (service != null) {
553            // fallback as generic service
554            answer = getManagementObjectStrategy().getManagedObjectForService(context, service);
555        }
556
557        if (answer instanceof ManagedService) {
558            ManagedService ms = (ManagedService) answer;
559            ms.setRoute(route);
560            ms.init(getManagementStrategy());
561        }
562
563        return answer;
564    }
565
566    private Object getManagedObjectForProcessor(CamelContext context, Processor processor, Route route) {
567        // a bit of magic here as the processors we want to manage have already been registered
568        // in the wrapped processors map when Camel have instrumented the route on route initialization
569        // so the idea is now to only manage the processors from the map
570        KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = wrappedProcessors.get(processor);
571        if (holder == null) {
572            // skip as its not an well known processor we want to manage anyway, such as Channel/UnitOfWork/Pipeline etc.
573            return null;
574        }
575
576        // get the managed object as it can be a specialized type such as a Delayer/Throttler etc.
577        Object managedObject = getManagementObjectStrategy().getManagedObjectForProcessor(context, processor, holder.getKey(), route);
578        // only manage if we have a name for it as otherwise we do not want to manage it anyway
579        if (managedObject != null) {
580            // is it a performance counter then we need to set our counter
581            if (managedObject instanceof PerformanceCounter) {
582                InstrumentationProcessor counter = holder.getValue();
583                if (counter != null) {
584                    // change counter to us
585                    counter.setCounter(managedObject);
586                }
587            }
588        }
589
590        return managedObject;
591    }
592
593    public void onRoutesAdd(Collection<Route> routes) {
594        for (Route route : routes) {
595
596            // if we are starting CamelContext or either of the two options has been
597            // enabled, then enlist the route as a known route
598            if (getCamelContext().getStatus().isStarting()
599                || getManagementStrategy().getManagementAgent().getRegisterAlways()
600                || getManagementStrategy().getManagementAgent().getRegisterNewRoutes()) {
601                // register as known route id
602                knowRouteIds.add(route.getId());
603            }
604
605            if (!shouldRegister(route, route)) {
606                // avoid registering if not needed, skip to next route
607                continue;
608            }
609
610            Object mr = getManagementObjectStrategy().getManagedObjectForRoute(camelContext, route);
611
612            // skip already managed routes, for example if the route has been restarted
613            if (getManagementStrategy().isManaged(mr, null)) {
614                LOG.trace("The route is already managed: {}", route);
615                continue;
616            }
617
618            // get the wrapped instrumentation processor from this route
619            // and set me as the counter
620            if (route instanceof EventDrivenConsumerRoute) {
621                EventDrivenConsumerRoute edcr = (EventDrivenConsumerRoute) route;
622                Processor processor = edcr.getProcessor();
623                if (processor instanceof CamelInternalProcessor && mr instanceof ManagedRoute) {
624                    CamelInternalProcessor internal = (CamelInternalProcessor) processor;
625                    ManagedRoute routeMBean = (ManagedRoute) mr;
626
627                    CamelInternalProcessor.InstrumentationAdvice task = internal.getAdvice(CamelInternalProcessor.InstrumentationAdvice.class);
628                    if (task != null) {
629                        // we need to wrap the counter with the camel context so we get stats updated on the context as well
630                        if (camelContextMBean != null) {
631                            CompositePerformanceCounter wrapper = new CompositePerformanceCounter(routeMBean, camelContextMBean);
632                            task.setCounter(wrapper);
633                        } else {
634                            task.setCounter(routeMBean);
635                        }
636                    }
637                }
638            }
639
640            try {
641                manageObject(mr);
642            } catch (JMException e) {
643                LOG.warn("Could not register Route MBean", e);
644            } catch (Exception e) {
645                LOG.warn("Could not create Route MBean", e);
646            }
647        }
648    }
649
650    public void onRoutesRemove(Collection<Route> routes) {
651        // the agent hasn't been started
652        if (!initialized) {
653            return;
654        }
655
656        for (Route route : routes) {
657            Object mr = getManagementObjectStrategy().getManagedObjectForRoute(camelContext, route);
658
659            // skip unmanaged routes
660            if (!getManagementStrategy().isManaged(mr, null)) {
661                LOG.trace("The route is not managed: {}", route);
662                continue;
663            }
664
665            try {
666                unmanageObject(mr);
667            } catch (Exception e) {
668                LOG.warn("Could not unregister Route MBean", e);
669            }
670
671            // remove from known routes ids, as the route has been removed
672            knowRouteIds.remove(route.getId());
673        }
674
675        // after the routes has been removed, we should clear the wrapped processors as we no longer need them
676        // as they were just a provisional map used during creation of routes
677        removeWrappedProcessorsForRoutes(routes);
678    }
679
680    public void onErrorHandlerAdd(RouteContext routeContext, Processor errorHandler, ErrorHandlerFactory errorHandlerBuilder) {
681        if (!shouldRegister(errorHandler, null)) {
682            // avoid registering if not needed
683            return;
684        }
685
686        Object me = getManagementObjectStrategy().getManagedObjectForErrorHandler(camelContext, routeContext, errorHandler, errorHandlerBuilder);
687
688        // skip already managed services, for example if a route has been restarted
689        if (getManagementStrategy().isManaged(me, null)) {
690            LOG.trace("The error handler builder is already managed: {}", errorHandlerBuilder);
691            return;
692        }
693
694        try {
695            manageObject(me);
696        } catch (Exception e) {
697            LOG.warn("Could not register error handler builder: " + errorHandlerBuilder + " as ErrorHandler MBean.", e);
698        }
699    }
700
701    public void onErrorHandlerRemove(RouteContext routeContext, Processor errorHandler, ErrorHandlerFactory errorHandlerBuilder) {
702        if (!initialized) {
703            return;
704        }
705
706        Object me = getManagementObjectStrategy().getManagedObjectForErrorHandler(camelContext, routeContext, errorHandler, errorHandlerBuilder);
707        if (me != null) {
708            try {
709                unmanageObject(me);
710            } catch (Exception e) {
711                LOG.warn("Could not unregister error handler: " + me + " as ErrorHandler MBean.", e);
712            }
713        }
714    }
715
716    public void onThreadPoolAdd(CamelContext camelContext, ThreadPoolExecutor threadPool, String id,
717                                String sourceId, String routeId, String threadPoolProfileId) {
718
719        if (!shouldRegister(threadPool, null)) {
720            // avoid registering if not needed
721            return;
722        }
723
724        Object mtp = getManagementObjectStrategy().getManagedObjectForThreadPool(camelContext, threadPool, id, sourceId, routeId, threadPoolProfileId);
725
726        // skip already managed services, for example if a route has been restarted
727        if (getManagementStrategy().isManaged(mtp, null)) {
728            LOG.trace("The thread pool is already managed: {}", threadPool);
729            return;
730        }
731
732        try {
733            manageObject(mtp);
734            // store a reference so we can unmanage from JMX when the thread pool is removed
735            // we need to keep track here, as we cannot re-construct the thread pool ObjectName when removing the thread pool
736            managedThreadPools.put(threadPool, mtp);
737        } catch (Exception e) {
738            LOG.warn("Could not register thread pool: " + threadPool + " as ThreadPool MBean.", e);
739        }
740    }
741
742    public void onThreadPoolRemove(CamelContext camelContext, ThreadPoolExecutor threadPool) {
743        if (!initialized) {
744            return;
745        }
746
747        // lookup the thread pool and remove it from JMX
748        Object mtp = managedThreadPools.remove(threadPool);
749        if (mtp != null) {
750            // skip unmanaged routes
751            if (!getManagementStrategy().isManaged(mtp, null)) {
752                LOG.trace("The thread pool is not managed: {}", threadPool);
753                return;
754            }
755
756            try {
757                unmanageObject(mtp);
758            } catch (Exception e) {
759                LOG.warn("Could not unregister ThreadPool MBean", e);
760            }
761        }
762    }
763
764    public void onRouteContextCreate(RouteContext routeContext) {
765        if (!initialized) {
766            return;
767        }
768
769        // Create a map (ProcessorType -> PerformanceCounter)
770        // to be passed to InstrumentationInterceptStrategy.
771        Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters = new HashMap<>();
772
773        // Each processor in a route will have its own performance counter.
774        // These performance counter will be embedded to InstrumentationProcessor
775        // and wrap the appropriate processor by InstrumentationInterceptStrategy.
776        RouteDefinition route = routeContext.getRoute();
777
778        // register performance counters for all processors and its children
779        for (ProcessorDefinition<?> processor : route.getOutputs()) {
780            registerPerformanceCounters(routeContext, processor, registeredCounters);
781        }
782
783        // set this managed intercept strategy that executes the JMX instrumentation for performance metrics
784        // so our registered counters can be used for fine grained performance instrumentation
785        routeContext.setManagedInterceptStrategy(new InstrumentationInterceptStrategy(registeredCounters, wrappedProcessors));
786    }
787
788    /**
789     * Removes the wrapped processors for the given routes, as they are no longer in use.
790     * <p/>
791     * This is needed to avoid accumulating memory, if a lot of routes is being added and removed.
792     *
793     * @param routes the routes
794     */
795    private void removeWrappedProcessorsForRoutes(Collection<Route> routes) {
796        // loop the routes, and remove the route associated wrapped processors, as they are no longer in use
797        for (Route route : routes) {
798            String id = route.getId();
799
800            Iterator<KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> it = wrappedProcessors.values().iterator();
801            while (it.hasNext()) {
802                KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = it.next();
803                RouteDefinition def = ProcessorDefinitionHelper.getRoute(holder.getKey());
804                if (def != null && id.equals(def.getId())) {
805                    it.remove();
806                }
807            }
808        }
809        
810    }
811
812    private void registerPerformanceCounters(RouteContext routeContext, ProcessorDefinition<?> processor,
813                                             Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters) {
814
815        // traverse children if any exists
816        List<ProcessorDefinition<?>> children = processor.getOutputs();
817        for (ProcessorDefinition<?> child : children) {
818            registerPerformanceCounters(routeContext, child, registeredCounters);
819        }
820
821        // skip processors that should not be registered
822        if (!registerProcessor(processor)) {
823            return;
824        }
825
826        // okay this is a processor we would like to manage so create the
827        // a delegate performance counter that acts as the placeholder in the interceptor
828        // that then delegates to the real mbean which we register later in the onServiceAdd method
829        DelegatePerformanceCounter pc = new DelegatePerformanceCounter();
830        // set statistics enabled depending on the option
831        boolean enabled = camelContext.getManagementStrategy().getManagementAgent().getStatisticsLevel().isDefaultOrExtended();
832        pc.setStatisticsEnabled(enabled);
833
834        // and add it as a a registered counter that will be used lazy when Camel
835        // does the instrumentation of the route and adds the InstrumentationProcessor
836        // that does the actual performance metrics gatherings at runtime
837        registeredCounters.put(processor, pc);
838    }
839
840    /**
841     * Should the given processor be registered.
842     */
843    protected boolean registerProcessor(ProcessorDefinition<?> processor) {
844        // skip on exception
845        if (processor instanceof OnExceptionDefinition) {
846            return false;
847        }
848        // skip on completion
849        if (processor instanceof OnCompletionDefinition) {
850            return false;
851        }
852        // skip intercept
853        if (processor instanceof InterceptDefinition) {
854            return false;
855        }
856        // skip aop
857        if (processor instanceof AOPDefinition) {
858            return false;
859        }
860        // skip policy
861        if (processor instanceof PolicyDefinition) {
862            return false;
863        }
864
865        // only if custom id assigned
866        boolean only = getManagementStrategy().getManagementAgent().getOnlyRegisterProcessorWithCustomId() != null
867                && getManagementStrategy().getManagementAgent().getOnlyRegisterProcessorWithCustomId();
868        if (only) {
869            return processor.hasCustomIdAssigned();
870        }
871
872        // use customer filter
873        return getManagementStrategy().manageProcessor(processor);
874    }
875
876    private ManagementStrategy getManagementStrategy() {
877        ObjectHelper.notNull(camelContext, "CamelContext");
878        return camelContext.getManagementStrategy();
879    }
880
881    private ManagementObjectStrategy getManagementObjectStrategy() {
882        ObjectHelper.notNull(camelContext, "CamelContext");
883        return camelContext.getManagementStrategy().getManagementObjectStrategy();
884    }
885
886    /**
887     * Strategy for managing the object
888     *
889     * @param me the managed object
890     * @throws Exception is thrown if error registering the object for management
891     */
892    protected void manageObject(Object me) throws Exception {
893        getManagementStrategy().manageObject(me);
894        if (me instanceof TimerListener) {
895            TimerListener timer = (TimerListener) me;
896            loadTimer.addTimerListener(timer);
897        }
898    }
899
900    /**
901     * Un-manages the object.
902     *
903     * @param me the managed object
904     * @throws Exception is thrown if error unregistering the managed object
905     */
906    protected void unmanageObject(Object me) throws Exception {
907        if (me instanceof TimerListener) {
908            TimerListener timer = (TimerListener) me;
909            loadTimer.removeTimerListener(timer);
910        }
911        getManagementStrategy().unmanageObject(me);
912    }
913
914    /**
915     * Whether or not to register the mbean.
916     * <p/>
917     * The {@link ManagementAgent} has options which controls when to register.
918     * This allows us to only register mbeans accordingly. For example by default any
919     * dynamic endpoints is not registered. This avoids to register excessive mbeans, which
920     * most often is not desired.
921     *
922     * @param service the object to register
923     * @param route   an optional route the mbean is associated with, can be <tt>null</tt>
924     * @return <tt>true</tt> to register, <tt>false</tt> to skip registering
925     */
926    protected boolean shouldRegister(Object service, Route route) {
927        // the agent hasn't been started
928        if (!initialized) {
929            return false;
930        }
931
932        LOG.trace("Checking whether to register {} from route: {}", service, route);
933
934        ManagementAgent agent = getManagementStrategy().getManagementAgent();
935        if (agent == null) {
936            // do not register if no agent
937            return false;
938        }
939
940        // always register if we are starting CamelContext
941        if (getCamelContext().getStatus().isStarting()) {
942            return true;
943        }
944
945        // always register if we are setting up routes
946        if (getCamelContext().isSetupRoutes()) {
947            return true;
948        }
949
950        // register if always is enabled
951        if (agent.getRegisterAlways()) {
952            return true;
953        }
954
955        // is it a known route then always accept
956        if (route != null && knowRouteIds.contains(route.getId())) {
957            return true;
958        }
959
960        // only register if we are starting a new route, and current thread is in starting routes mode
961        if (agent.getRegisterNewRoutes()) {
962            // no specific route, then fallback to see if this thread is starting routes
963            // which is kept as state on the camel context
964            return getCamelContext().isStartingRoutes();
965        }
966
967        return false;
968    }
969
970    @Override
971    protected void doStart() throws Exception {
972        ObjectHelper.notNull(camelContext, "CamelContext");
973
974        // defer starting the timer manager until CamelContext has been fully started
975        camelContext.addStartupListener(loadTimerStartupListener);
976    }
977
978    private final class TimerListenerManagerStartupListener implements StartupListener {
979
980        @Override
981        public void onCamelContextStarted(CamelContext context, boolean alreadyStarted) throws Exception {
982            // we are disabled either if configured explicit, or if level is off
983            boolean load = camelContext.getManagementStrategy().getManagementAgent().getLoadStatisticsEnabled() != null
984                    && camelContext.getManagementStrategy().getManagementAgent().getLoadStatisticsEnabled();
985            boolean disabled = !load || camelContext.getManagementStrategy().getStatisticsLevel() == ManagementStatisticsLevel.Off;
986
987            LOG.debug("Load performance statistics {}", disabled ? "disabled" : "enabled");
988            if (!disabled) {
989                // must use 1 sec interval as the load statistics is based on 1 sec calculations
990                loadTimer.setInterval(1000);
991                // we have to defer enlisting timer lister manager as a service until CamelContext has been started
992                getCamelContext().addService(loadTimer);
993            }
994        }
995    }
996
997    @Override
998    protected void doStop() throws Exception {
999        initialized = false;
1000        knowRouteIds.clear();
1001        preServices.clear();
1002        wrappedProcessors.clear();
1003        managedTracers.clear();
1004        managedBacklogTracers.clear();
1005        managedBacklogDebuggers.clear();
1006        managedThreadPools.clear();
1007    }
1008
1009    /**
1010     * Class which holds any pre registration details.
1011     *
1012     * @see org.apache.camel.management.DefaultManagementLifecycleStrategy#enlistPreRegisteredServices()
1013     */
1014    private static final class PreRegisterService {
1015
1016        private String name;
1017        private Component component;
1018        private Endpoint endpoint;
1019        private CamelContext camelContext;
1020        private Service service;
1021        private Route route;
1022
1023        public void onComponentAdd(String name, Component component) {
1024            this.name = name;
1025            this.component = component;
1026        }
1027
1028        public void onEndpointAdd(Endpoint endpoint) {
1029            this.endpoint = endpoint;
1030        }
1031
1032        public void onServiceAdd(CamelContext camelContext, Service service, Route route) {
1033            this.camelContext = camelContext;
1034            this.service = service;
1035            this.route = route;
1036        }
1037
1038        public String getName() {
1039            return name;
1040        }
1041
1042        public Component getComponent() {
1043            return component;
1044        }
1045
1046        public Endpoint getEndpoint() {
1047            return endpoint;
1048        }
1049
1050        public CamelContext getCamelContext() {
1051            return camelContext;
1052        }
1053
1054        public Service getService() {
1055            return service;
1056        }
1057
1058        public Route getRoute() {
1059            return route;
1060        }
1061    }
1062
1063}
1064