001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.management;
018    
019    import java.util.ArrayList;
020    import java.util.Collection;
021    import java.util.HashMap;
022    import java.util.HashSet;
023    import java.util.Iterator;
024    import java.util.List;
025    import java.util.Map;
026    import java.util.Set;
027    import java.util.concurrent.ScheduledExecutorService;
028    import java.util.concurrent.ThreadPoolExecutor;
029    import javax.management.JMException;
030    import javax.management.MalformedObjectNameException;
031    import javax.management.ObjectName;
032    
033    import org.apache.camel.CamelContext;
034    import org.apache.camel.CamelContextAware;
035    import org.apache.camel.Channel;
036    import org.apache.camel.Component;
037    import org.apache.camel.Consumer;
038    import org.apache.camel.Endpoint;
039    import org.apache.camel.ErrorHandlerFactory;
040    import org.apache.camel.ManagementStatisticsLevel;
041    import org.apache.camel.Processor;
042    import org.apache.camel.Producer;
043    import org.apache.camel.Route;
044    import org.apache.camel.Service;
045    import org.apache.camel.StartupListener;
046    import org.apache.camel.TimerListener;
047    import org.apache.camel.VetoCamelContextStartException;
048    import org.apache.camel.api.management.PerformanceCounter;
049    import org.apache.camel.impl.ConsumerCache;
050    import org.apache.camel.impl.DefaultCamelContext;
051    import org.apache.camel.impl.EndpointRegistry;
052    import org.apache.camel.impl.EventDrivenConsumerRoute;
053    import org.apache.camel.impl.ProducerCache;
054    import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
055    import org.apache.camel.management.mbean.ManagedCamelContext;
056    import org.apache.camel.management.mbean.ManagedConsumerCache;
057    import org.apache.camel.management.mbean.ManagedEndpoint;
058    import org.apache.camel.management.mbean.ManagedEndpointRegistry;
059    import org.apache.camel.management.mbean.ManagedProducerCache;
060    import org.apache.camel.management.mbean.ManagedRoute;
061    import org.apache.camel.management.mbean.ManagedService;
062    import org.apache.camel.management.mbean.ManagedThrottlingInflightRoutePolicy;
063    import org.apache.camel.management.mbean.ManagedTracer;
064    import org.apache.camel.management.mbean.ManagedTypeConverterRegistry;
065    import org.apache.camel.model.AOPDefinition;
066    import org.apache.camel.model.InterceptDefinition;
067    import org.apache.camel.model.OnCompletionDefinition;
068    import org.apache.camel.model.OnExceptionDefinition;
069    import org.apache.camel.model.PolicyDefinition;
070    import org.apache.camel.model.ProcessorDefinition;
071    import org.apache.camel.model.ProcessorDefinitionHelper;
072    import org.apache.camel.model.RouteDefinition;
073    import org.apache.camel.processor.interceptor.Tracer;
074    import org.apache.camel.spi.EventNotifier;
075    import org.apache.camel.spi.LifecycleStrategy;
076    import org.apache.camel.spi.ManagementAgent;
077    import org.apache.camel.spi.ManagementAware;
078    import org.apache.camel.spi.ManagementNameStrategy;
079    import org.apache.camel.spi.ManagementObjectStrategy;
080    import org.apache.camel.spi.ManagementStrategy;
081    import org.apache.camel.spi.RouteContext;
082    import org.apache.camel.spi.TypeConverterRegistry;
083    import org.apache.camel.spi.UnitOfWork;
084    import org.apache.camel.support.ServiceSupport;
085    import org.apache.camel.support.TimerListenerManager;
086    import org.apache.camel.util.KeyValueHolder;
087    import org.apache.camel.util.ObjectHelper;
088    import org.slf4j.Logger;
089    import org.slf4j.LoggerFactory;
090    
/**
 * Default JMX managed lifecycle strategy that registers objects using the configured
 * {@link org.apache.camel.spi.ManagementStrategy}.
 *
 * @see org.apache.camel.spi.ManagementStrategy
 * @version 
 */
098    @SuppressWarnings("deprecation")
099    public class DefaultManagementLifecycleStrategy extends ServiceSupport implements LifecycleStrategy, CamelContextAware {
100    
101        private static final Logger LOG = LoggerFactory.getLogger(DefaultManagementLifecycleStrategy.class);
102        // the wrapped processors is for performance counters, which are in use for the created routes
103        // when a route is removed, we should remove the associated processors from this map
104        private final Map<Processor, KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> wrappedProcessors =
105                new HashMap<Processor, KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>>();
106        private final List<PreRegisterService> preServices = new ArrayList<PreRegisterService>();
107        private final TimerListenerManager timerListenerManager = new TimerListenerManager();
108        private final TimerListenerManagerStartupListener timerManagerStartupListener = new TimerListenerManagerStartupListener();
109        private volatile CamelContext camelContext;
110        private volatile ManagedCamelContext camelContextMBean;
111        private volatile boolean initialized;
112        private final Set<String> knowRouteIds = new HashSet<String>();
113        private final Map<Tracer, ManagedTracer> managedTracers = new HashMap<Tracer, ManagedTracer>();
114        private final Map<ThreadPoolExecutor, Object> managedThreadPools = new HashMap<ThreadPoolExecutor, Object>();
115    
116        public DefaultManagementLifecycleStrategy() {
117        }
118    
119        public DefaultManagementLifecycleStrategy(CamelContext camelContext) {
120            this.camelContext = camelContext;
121        }
122    
123        public CamelContext getCamelContext() {
124            return camelContext;
125        }
126    
127        public void setCamelContext(CamelContext camelContext) {
128            this.camelContext = camelContext;
129        }
130    
131        public void onContextStart(CamelContext context) throws VetoCamelContextStartException {
132            Object mc = getManagementObjectStrategy().getManagedObjectForCamelContext(context);
133    
134            String name = context.getName();
135            String managementName = context.getManagementNameStrategy().getName();
136    
137            try {
138                boolean done = false;
139                while (!done) {
140                    ObjectName on = getManagementStrategy().getManagementNamingStrategy().getObjectNameForCamelContext(managementName, name);
141                    boolean exists = getManagementStrategy().isManaged(mc, on);
142                    if (!exists) {
143                        done = true;
144                    } else {
145                        // okay there exists already a CamelContext with this name, we can try to fix it by finding a free name
146                        boolean fixed = false;
147                        // if we use the default name strategy we can find a free name to use
148                        String newName = findFreeName(mc, context.getManagementNameStrategy(), name);
149                        if (newName != null) {
150                            // use this as the fixed name
151                            fixed = true;
152                            done = true;
153                            managementName = newName;
154                        }
155                        // we could not fix it so veto starting camel
156                        if (!fixed) {
157                            throw new VetoCamelContextStartException("CamelContext (" + context.getName() + ") with ObjectName[" + on + "] is already registered."
158                                + " Make sure to use unique names on CamelContext when using multiple CamelContexts in the same MBeanServer.", context);
159                        } else {
160                            LOG.warn("This CamelContext(" + context.getName() + ") will be registered using the name: " + managementName
161                                + " due to clash with an existing name already registered in MBeanServer.");
162                        }
163                    }
164                }
165            } catch (VetoCamelContextStartException e) {
166                // rethrow veto
167                throw e;
168            } catch (Exception e) {
169                // must rethrow to allow CamelContext fallback to non JMX agent to allow
170                // Camel to continue to run
171                throw ObjectHelper.wrapRuntimeCamelException(e);
172            }
173    
174            // set the name we are going to use
175            if (context instanceof DefaultCamelContext) {
176                ((DefaultCamelContext) context).setManagementName(managementName);
177            }
178    
179            try {
180                manageObject(mc);
181            } catch (Exception e) {
182                // must rethrow to allow CamelContext fallback to non JMX agent to allow
183                // Camel to continue to run
184                throw ObjectHelper.wrapRuntimeCamelException(e);
185            }
186    
187            // yes we made it and are initialized
188            initialized = true;
189    
190            if (mc instanceof ManagedCamelContext) {
191                camelContextMBean = (ManagedCamelContext) mc;
192            }
193    
194            // register any pre registered now that we are initialized
195            enlistPreRegisteredServices();
196        }
197    
198        private String findFreeName(Object mc, ManagementNameStrategy strategy, String name) throws MalformedObjectNameException {
199            // we cannot find a free name for fixed named strategies
200            if (strategy.isFixedName()) {
201                return null;
202            }
203    
204            // okay try to find a free name
205            boolean done = false;
206            String newName = null;
207            while (!done) {
208                // compute the next name
209                newName = strategy.getNextName();
210                ObjectName on = getManagementStrategy().getManagementNamingStrategy().getObjectNameForCamelContext(newName, name);
211                done = !getManagementStrategy().isManaged(mc, on);
212                if (LOG.isTraceEnabled()) {
213                    LOG.trace("Using name: {} in ObjectName[{}] exists? {}", new Object[]{name, on, done});
214                }
215            }
216            return newName;
217        }
218    
219        /**
220         * After {@link CamelContext} has been enlisted in JMX using {@link #onContextStart(org.apache.camel.CamelContext)}
221         * then we can enlist any pre registered services as well, as we had to wait for {@link CamelContext} to be
222         * enlisted first.
223         * <p/>
224         * A component/endpoint/service etc. can be pre registered when using dependency injection and annotations such as
225         * {@link org.apache.camel.Produce}, {@link org.apache.camel.EndpointInject}. Therefore we need to capture those
226         * registrations up front, and then afterwards enlist in JMX when {@link CamelContext} is being started.
227         */
228        private void enlistPreRegisteredServices() {
229            if (preServices.isEmpty()) {
230                return;
231            }
232    
233            LOG.debug("Registering {} pre registered services", preServices.size());
234            for (PreRegisterService pre : preServices) {
235                if (pre.getComponent() != null) {
236                    onComponentAdd(pre.getName(), pre.getComponent());
237                } else if (pre.getEndpoint() != null) {
238                    onEndpointAdd(pre.getEndpoint());
239                } else if (pre.getService() != null) {
240                    onServiceAdd(pre.getCamelContext(), pre.getService(), pre.getRoute());
241                }
242            }
243    
244            // we are done so clear the list
245            preServices.clear();
246        }
247    
248        public void onContextStop(CamelContext context) {
249            // the agent hasn't been started
250            if (!initialized) {
251                return;
252            }
253            try {
254                Object mc = getManagementObjectStrategy().getManagedObjectForCamelContext(context);
255                // the context could have been removed already
256                if (getManagementStrategy().isManaged(mc, null)) {
257                    unmanageObject(mc);
258                }
259            } catch (Exception e) {
260                LOG.warn("Could not unregister CamelContext MBean", e);
261            }
262    
263            camelContextMBean = null;
264        }
265    
266        public void onComponentAdd(String name, Component component) {
267            // always register components as there are only a few of those
268            if (!initialized) {
269                // pre register so we can register later when we have been initialized
270                PreRegisterService pre = new PreRegisterService();
271                pre.onComponentAdd(name, component);
272                preServices.add(pre);
273                return;
274            }
275            try {
276                Object mc = getManagementObjectStrategy().getManagedObjectForComponent(camelContext, component, name);
277                manageObject(mc);
278            } catch (Exception e) {
279                LOG.warn("Could not register Component MBean", e);
280            }
281        }
282    
283        public void onComponentRemove(String name, Component component) {
284            // the agent hasn't been started
285            if (!initialized) {
286                return;
287            }
288            try {
289                Object mc = getManagementObjectStrategy().getManagedObjectForComponent(camelContext, component, name);
290                unmanageObject(mc);
291            } catch (Exception e) {
292                LOG.warn("Could not unregister Component MBean", e);
293            }
294        }
295    
296        /**
297         * If the endpoint is an instance of ManagedResource then register it with the
298         * mbean server, if it is not then wrap the endpoint in a {@link ManagedEndpoint} and
299         * register that with the mbean server.
300         *
301         * @param endpoint the Endpoint attempted to be added
302         */
303        public void onEndpointAdd(Endpoint endpoint) {
304            if (!initialized) {
305                // pre register so we can register later when we have been initialized
306                PreRegisterService pre = new PreRegisterService();
307                pre.onEndpointAdd(endpoint);
308                preServices.add(pre);
309                return;
310            }
311    
312            if (!shouldRegister(endpoint, null)) {
313                // avoid registering if not needed
314                return;
315            }
316    
317            try {
318                Object me = getManagementObjectStrategy().getManagedObjectForEndpoint(camelContext, endpoint);
319                if (me == null) {
320                    // endpoint should not be managed
321                    return;
322                }
323                manageObject(me);
324            } catch (Exception e) {
325                LOG.warn("Could not register Endpoint MBean for endpoint: " + endpoint + ". This exception will be ignored.", e);
326            }
327        }
328    
329        public void onEndpointRemove(Endpoint endpoint) {
330            // the agent hasn't been started
331            if (!initialized) {
332                return;
333            }
334    
335            try {
336                Object me = getManagementObjectStrategy().getManagedObjectForEndpoint(camelContext, endpoint);
337                unmanageObject(me);
338            } catch (Exception e) {
339                LOG.warn("Could not unregister Endpoint MBean for endpoint: " + endpoint + ". This exception will be ignored.", e);
340            }
341        }
342    
343        public void onServiceAdd(CamelContext context, Service service, Route route) {
344            if (!initialized) {
345                // pre register so we can register later when we have been initialized
346                PreRegisterService pre = new PreRegisterService();
347                pre.onServiceAdd(context, service, route);
348                preServices.add(pre);
349                return;
350            }
351    
352            // services can by any kind of misc type but also processors
353            // so we have special logic when its a processor
354    
355            if (!shouldRegister(service, route)) {
356                // avoid registering if not needed
357                return;
358            }
359    
360            Object managedObject = getManagedObjectForService(context, service, route);
361            if (managedObject == null) {
362                // service should not be managed
363                return;
364            }
365    
366            // skip already managed services, for example if a route has been restarted
367            if (getManagementStrategy().isManaged(managedObject, null)) {
368                LOG.trace("The service is already managed: {}", service);
369                return;
370            }
371    
372            try {
373                manageObject(managedObject);
374            } catch (Exception e) {
375                LOG.warn("Could not register service: " + service + " as Service MBean.", e);
376            }
377        }
378    
379        public void onServiceRemove(CamelContext context, Service service, Route route) {
380            // the agent hasn't been started
381            if (!initialized) {
382                return;
383            }
384    
385            Object managedObject = getManagedObjectForService(context, service, route);
386            if (managedObject != null) {
387                try {
388                    unmanageObject(managedObject);
389                } catch (Exception e) {
390                    LOG.warn("Could not unregister service: " + service + " as Service MBean.", e);
391                }
392            }
393        }
394    
395        @SuppressWarnings("unchecked")
396        private Object getManagedObjectForService(CamelContext context, Service service, Route route) {
397            // skip channel, UoW and dont double wrap instrumentation
398            if (service instanceof Channel || service instanceof UnitOfWork || service instanceof InstrumentationProcessor) {
399                return null;
400            }
401    
402            Object answer = null;
403    
404            if (service instanceof ManagementAware) {
405                return ((ManagementAware<Service>) service).getManagedObject(service);
406            } else if (service instanceof Tracer) {
407                // special for tracer
408                Tracer tracer = (Tracer) service;
409                ManagedTracer mt = managedTracers.get(tracer);
410                if (mt == null) {
411                    mt = new ManagedTracer(context, tracer);
412                    mt.init(getManagementStrategy());
413                    managedTracers.put(tracer, mt);
414                }
415                return mt;
416            } else if (service instanceof EventNotifier) {
417                answer = getManagementObjectStrategy().getManagedObjectForEventNotifier(context, (EventNotifier) service);
418            } else if (service instanceof Producer) {
419                answer = getManagementObjectStrategy().getManagedObjectForProducer(context, (Producer) service);
420            } else if (service instanceof Consumer) {
421                answer = getManagementObjectStrategy().getManagedObjectForConsumer(context, (Consumer) service);
422            } else if (service instanceof Processor) {
423                // special for processors as we need to do some extra work
424                return getManagedObjectForProcessor(context, (Processor) service, route);
425            } else if (service instanceof ThrottlingInflightRoutePolicy) {
426                answer = new ManagedThrottlingInflightRoutePolicy(context, (ThrottlingInflightRoutePolicy) service);
427            } else if (service instanceof ConsumerCache) {
428                answer = new ManagedConsumerCache(context, (ConsumerCache) service);
429            } else if (service instanceof ProducerCache) {
430                answer = new ManagedProducerCache(context, (ProducerCache) service);
431            } else if (service instanceof EndpointRegistry) {
432                answer = new ManagedEndpointRegistry(context, (EndpointRegistry) service);
433            } else if (service instanceof TypeConverterRegistry) {
434                answer = new ManagedTypeConverterRegistry(context, (TypeConverterRegistry) service);
435            } else if (service != null) {
436                // fallback as generic service
437                answer = getManagementObjectStrategy().getManagedObjectForService(context, service);
438            }
439    
440            if (answer != null && answer instanceof ManagedService) {
441                ManagedService ms = (ManagedService) answer;
442                ms.setRoute(route);
443                ms.init(getManagementStrategy());
444            }
445    
446            return answer;
447        }
448    
449        private Object getManagedObjectForProcessor(CamelContext context, Processor processor, Route route) {
450            // a bit of magic here as the processors we want to manage have already been registered
451            // in the wrapped processors map when Camel have instrumented the route on route initialization
452            // so the idea is now to only manage the processors from the map
453            KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = wrappedProcessors.get(processor);
454            if (holder == null) {
455                // skip as its not an well known processor we want to manage anyway, such as Channel/UnitOfWork/Pipeline etc.
456                return null;
457            }
458    
459            // get the managed object as it can be a specialized type such as a Delayer/Throttler etc.
460            Object managedObject = getManagementObjectStrategy().getManagedObjectForProcessor(context, processor, holder.getKey(), route);
461            // only manage if we have a name for it as otherwise we do not want to manage it anyway
462            if (managedObject != null) {
463                // is it a performance counter then we need to set our counter
464                if (managedObject instanceof PerformanceCounter) {
465                    InstrumentationProcessor counter = holder.getValue();
466                    if (counter != null) {
467                        // change counter to us
468                        counter.setCounter(managedObject);
469                    }
470                }
471            }
472    
473            return managedObject;
474        }
475    
476        public void onRoutesAdd(Collection<Route> routes) {
477            for (Route route : routes) {
478    
479                // if we are starting CamelContext or either of the two options has been
480                // enabled, then enlist the route as a known route
481                if (getCamelContext().getStatus().isStarting()
482                    || getManagementStrategy().getManagementAgent().getRegisterAlways()
483                    || getManagementStrategy().getManagementAgent().getRegisterNewRoutes()) {
484                    // register as known route id
485                    knowRouteIds.add(route.getId());
486                }
487    
488                if (!shouldRegister(route, route)) {
489                    // avoid registering if not needed, skip to next route
490                    continue;
491                }
492    
493                Object mr = getManagementObjectStrategy().getManagedObjectForRoute(camelContext, route);
494    
495                // skip already managed routes, for example if the route has been restarted
496                if (getManagementStrategy().isManaged(mr, null)) {
497                    LOG.trace("The route is already managed: {}", route);
498                    continue;
499                }
500    
501                // get the wrapped instrumentation processor from this route
502                // and set me as the counter
503                if (route instanceof EventDrivenConsumerRoute) {
504                    EventDrivenConsumerRoute edcr = (EventDrivenConsumerRoute) route;
505                    Processor processor = edcr.getProcessor();
506                    if (processor instanceof InstrumentationProcessor && mr instanceof ManagedRoute) {
507                        InstrumentationProcessor ip = (InstrumentationProcessor) processor;
508                        ManagedRoute routeMBean = (ManagedRoute) mr;
509    
510                        // we need to wrap the counter with the camel context so we get stats updated on the context as well
511                        if (camelContextMBean != null) {
512                            CompositePerformanceCounter wrapper = new CompositePerformanceCounter(routeMBean, camelContextMBean);
513                            ip.setCounter(wrapper);
514                        } else {
515                            ip.setCounter(routeMBean);
516                        }
517                    }
518                }
519    
520                try {
521                    manageObject(mr);
522                } catch (JMException e) {
523                    LOG.warn("Could not register Route MBean", e);
524                } catch (Exception e) {
525                    LOG.warn("Could not create Route MBean", e);
526                }
527            }
528        }
529    
530        public void onRoutesRemove(Collection<Route> routes) {
531            // the agent hasn't been started
532            if (!initialized) {
533                return;
534            }
535    
536            for (Route route : routes) {
537                Object mr = getManagementObjectStrategy().getManagedObjectForRoute(camelContext, route);
538    
539                // skip unmanaged routes
540                if (!getManagementStrategy().isManaged(mr, null)) {
541                    LOG.trace("The route is not managed: {}", route);
542                    continue;
543                }
544    
545                try {
546                    unmanageObject(mr);
547                } catch (Exception e) {
548                    LOG.warn("Could not unregister Route MBean", e);
549                }
550    
551                // remove from known routes ids, as the route has been removed
552                knowRouteIds.remove(route.getId());
553            }
554    
555            // after the routes has been removed, we should clear the wrapped processors as we no longer need them
556            // as they were just a provisional map used during creation of routes
557            removeWrappedProcessorsForRoutes(routes);
558        }
559    
560        public void onErrorHandlerAdd(RouteContext routeContext, Processor errorHandler, ErrorHandlerFactory errorHandlerBuilder) {
561            if (!shouldRegister(errorHandler, null)) {
562                // avoid registering if not needed
563                return;
564            }
565    
566            Object me = getManagementObjectStrategy().getManagedObjectForErrorHandler(camelContext, routeContext, errorHandler, errorHandlerBuilder);
567    
568            // skip already managed services, for example if a route has been restarted
569            if (getManagementStrategy().isManaged(me, null)) {
570                LOG.trace("The error handler builder is already managed: {}", errorHandlerBuilder);
571                return;
572            }
573    
574            try {
575                manageObject(me);
576            } catch (Exception e) {
577                LOG.warn("Could not register error handler builder: " + errorHandlerBuilder + " as ErrorHandler MBean.", e);
578            }
579        }
580    
581        public void onErrorHandlerRemove(RouteContext routeContext, Processor errorHandler, ErrorHandlerFactory errorHandlerBuilder) {
582            if (!initialized) {
583                return;
584            }
585    
586            Object me = getManagementObjectStrategy().getManagedObjectForErrorHandler(camelContext, routeContext, errorHandler, errorHandlerBuilder);
587            if (me != null) {
588                try {
589                    unmanageObject(me);
590                } catch (Exception e) {
591                    LOG.warn("Could not unregister error handler: " + me + " as ErrorHandler MBean.", e);
592                }
593            }
594        }
595    
596        public void onThreadPoolAdd(CamelContext camelContext, ThreadPoolExecutor threadPool, String id,
597                                    String sourceId, String routeId, String threadPoolProfileId) {
598    
599            if (!shouldRegister(threadPool, null)) {
600                // avoid registering if not needed
601                return;
602            }
603    
604            Object mtp = getManagementObjectStrategy().getManagedObjectForThreadPool(camelContext, threadPool, id, sourceId, routeId, threadPoolProfileId);
605    
606            // skip already managed services, for example if a route has been restarted
607            if (getManagementStrategy().isManaged(mtp, null)) {
608                LOG.trace("The thread pool is already managed: {}", threadPool);
609                return;
610            }
611    
612            try {
613                manageObject(mtp);
614                // store a reference so we can unmanage from JMX when the thread pool is removed
615                // we need to keep track here, as we cannot re-construct the thread pool ObjectName when removing the thread pool
616                managedThreadPools.put(threadPool, mtp);
617            } catch (Exception e) {
618                LOG.warn("Could not register thread pool: " + threadPool + " as ThreadPool MBean.", e);
619            }
620        }
621    
622        public void onThreadPoolRemove(CamelContext camelContext, ThreadPoolExecutor threadPool) {
623            if (!initialized) {
624                return;
625            }
626    
627            // lookup the thread pool and remove it from JMX
628            Object mtp = managedThreadPools.remove(threadPool);
629            if (mtp != null) {
630                // skip unmanaged routes
631                if (!getManagementStrategy().isManaged(mtp, null)) {
632                    LOG.trace("The thread pool is not managed: {}", threadPool);
633                    return;
634                }
635    
636                try {
637                    unmanageObject(mtp);
638                } catch (Exception e) {
639                    LOG.warn("Could not unregister ThreadPool MBean", e);
640                }
641            }
642        }
643    
644        public void onRouteContextCreate(RouteContext routeContext) {
645            if (!initialized) {
646                return;
647            }
648    
649            // Create a map (ProcessorType -> PerformanceCounter)
650            // to be passed to InstrumentationInterceptStrategy.
651            Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters =
652                    new HashMap<ProcessorDefinition<?>, PerformanceCounter>();
653    
654            // Each processor in a route will have its own performance counter.
655            // These performance counter will be embedded to InstrumentationProcessor
656            // and wrap the appropriate processor by InstrumentationInterceptStrategy.
657            RouteDefinition route = routeContext.getRoute();
658    
659            // register performance counters for all processors and its children
660            for (ProcessorDefinition<?> processor : route.getOutputs()) {
661                registerPerformanceCounters(routeContext, processor, registeredCounters);
662            }
663    
664            // set this managed intercept strategy that executes the JMX instrumentation for performance metrics
665            // so our registered counters can be used for fine grained performance instrumentation
666            routeContext.setManagedInterceptStrategy(new InstrumentationInterceptStrategy(registeredCounters, wrappedProcessors));
667        }
668    
669        /**
670         * Removes the wrapped processors for the given routes, as they are no longer in use.
671         * <p/>
672         * This is needed to avoid accumulating memory, if a lot of routes is being added and removed.
673         *
674         * @param routes the routes
675         */
676        private void removeWrappedProcessorsForRoutes(Collection<Route> routes) {
677            // loop the routes, and remove the route associated wrapped processors, as they are no longer in use
678            for (Route route : routes) {
679                String id = route.getId();
680    
681                Iterator<KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> it = wrappedProcessors.values().iterator();
682                while (it.hasNext()) {
683                    KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = it.next();
684                    RouteDefinition def = ProcessorDefinitionHelper.getRoute(holder.getKey());
685                    if (def != null && id.equals(def.getId())) {
686                        it.remove();
687                    }
688                }
689            }
690            
691        }
692    
693        private void registerPerformanceCounters(RouteContext routeContext, ProcessorDefinition<?> processor,
694                                                 Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters) {
695    
696            // traverse children if any exists
697            List<ProcessorDefinition<?>> children = processor.getOutputs();
698            for (ProcessorDefinition<?> child : children) {
699                registerPerformanceCounters(routeContext, child, registeredCounters);
700            }
701    
702            // skip processors that should not be registered
703            if (!registerProcessor(processor)) {
704                return;
705            }
706    
707            // okay this is a processor we would like to manage so create the
708            // a delegate performance counter that acts as the placeholder in the interceptor
709            // that then delegates to the real mbean which we register later in the onServiceAdd method
710            DelegatePerformanceCounter pc = new DelegatePerformanceCounter();
711            // set statistics enabled depending on the option
712            boolean enabled = camelContext.getManagementStrategy().getStatisticsLevel() == ManagementStatisticsLevel.All;
713            pc.setStatisticsEnabled(enabled);
714    
715            // and add it as a a registered counter that will be used lazy when Camel
716            // does the instrumentation of the route and adds the InstrumentationProcessor
717            // that does the actual performance metrics gatherings at runtime
718            registeredCounters.put(processor, pc);
719        }
720    
721        /**
722         * Should the given processor be registered.
723         */
724        protected boolean registerProcessor(ProcessorDefinition<?> processor) {
725            // skip on exception
726            if (processor instanceof OnExceptionDefinition) {
727                return false;
728            }
729            // skip on completion
730            if (processor instanceof OnCompletionDefinition) {
731                return false;
732            }
733            // skip intercept
734            if (processor instanceof InterceptDefinition) {
735                return false;
736            }
737            // skip aop
738            if (processor instanceof AOPDefinition) {
739                return false;
740            }
741            // skip policy
742            if (processor instanceof PolicyDefinition) {
743                return false;
744            }
745    
746            // only if custom id assigned
747            if (getManagementStrategy().isOnlyManageProcessorWithCustomId()) {
748                return processor.hasCustomIdAssigned();
749            }
750    
751            // use customer filter
752            return getManagementStrategy().manageProcessor(processor);
753        }
754    
755        private ManagementStrategy getManagementStrategy() {
756            ObjectHelper.notNull(camelContext, "CamelContext");
757            return camelContext.getManagementStrategy();
758        }
759    
760        private ManagementObjectStrategy getManagementObjectStrategy() {
761            ObjectHelper.notNull(camelContext, "CamelContext");
762            return camelContext.getManagementStrategy().getManagementObjectStrategy();
763        }
764    
765        /**
766         * Strategy for managing the object
767         *
768         * @param me the managed object
769         * @throws Exception is thrown if error registering the object for management
770         */
771        protected void manageObject(Object me) throws Exception {
772            getManagementStrategy().manageObject(me);
773            if (timerListenerManager != null && me instanceof TimerListener) {
774                TimerListener timer = (TimerListener) me;
775                timerListenerManager.addTimerListener(timer);
776            }
777        }
778    
779        /**
780         * Un-manages the object.
781         *
782         * @param me the managed object
783         * @throws Exception is thrown if error unregistering the managed object
784         */
785        protected void unmanageObject(Object me) throws Exception {
786            if (timerListenerManager != null && me instanceof TimerListener) {
787                TimerListener timer = (TimerListener) me;
788                timerListenerManager.removeTimerListener(timer);
789            }
790            getManagementStrategy().unmanageObject(me);
791        }
792    
793        /**
794         * Whether or not to register the mbean.
795         * <p/>
796         * The {@link ManagementAgent} has options which controls when to register.
797         * This allows us to only register mbeans accordingly. For example by default any
798         * dynamic endpoints is not registered. This avoids to register excessive mbeans, which
799         * most often is not desired.
800         *
801         * @param service the object to register
802         * @param route   an optional route the mbean is associated with, can be <tt>null</tt>
803         * @return <tt>true</tt> to register, <tt>false</tt> to skip registering
804         */
805        protected boolean shouldRegister(Object service, Route route) {
806            // the agent hasn't been started
807            if (!initialized) {
808                return false;
809            }
810    
811            LOG.trace("Checking whether to register {} from route: {}", service, route);
812    
813            ManagementAgent agent = getManagementStrategy().getManagementAgent();
814            if (agent == null) {
815                // do not register if no agent
816                return false;
817            }
818    
819            // always register if we are starting CamelContext
820            if (getCamelContext().getStatus().isStarting()) {
821                return true;
822            }
823    
824            // register if always is enabled
825            if (agent.getRegisterAlways()) {
826                return true;
827            }
828    
829            // is it a known route then always accept
830            if (route != null && knowRouteIds.contains(route.getId())) {
831                return true;
832            }
833    
834            // only register if we are starting a new route, and current thread is in starting routes mode
835            if (agent.getRegisterNewRoutes()) {
836                // no specific route, then fallback to see if this thread is starting routes
837                // which is kept as state on the camel context
838                return getCamelContext().isStartingRoutes();
839            }
840    
841            return false;
842        }
843    
844        @Override
845        protected void doStart() throws Exception {
846            ObjectHelper.notNull(camelContext, "CamelContext");
847    
848            // defer starting the timer manager until CamelContext has been fully started
849            camelContext.addStartupListener(timerManagerStartupListener);
850        }
851    
852        private final class TimerListenerManagerStartupListener implements StartupListener {
853    
854            @Override
855            public void onCamelContextStarted(CamelContext context, boolean alreadyStarted) throws Exception {
856                boolean enabled = camelContext.getManagementStrategy().getStatisticsLevel() != ManagementStatisticsLevel.Off;
857                if (enabled) {
858                    LOG.info("StatisticsLevel at {} so enabling load performance statistics", camelContext.getManagementStrategy().getStatisticsLevel());
859                    // must use 1 sec interval as the load statistics is based on 1 sec calculations
860                    timerListenerManager.setInterval(1000);
861                    // we have to defer enlisting timer lister manager as a service until CamelContext has been started
862                    getCamelContext().addService(timerListenerManager);
863                }
864            }
865        }
866    
867        @Override
868        protected void doStop() throws Exception {
869            initialized = false;
870            knowRouteIds.clear();
871            preServices.clear();
872            wrappedProcessors.clear();
873            managedTracers.clear();
874            managedThreadPools.clear();
875        }
876    
877        /**
878         * Class which holds any pre registration details.
879         *
880         * @see org.apache.camel.management.DefaultManagementLifecycleStrategy#enlistPreRegisteredServices()
881         */
882        private static final class PreRegisterService {
883    
884            private String name;
885            private Component component;
886            private Endpoint endpoint;
887            private CamelContext camelContext;
888            private Service service;
889            private Route route;
890    
891            public void onComponentAdd(String name, Component component) {
892                this.name = name;
893                this.component = component;
894            }
895    
896            public void onEndpointAdd(Endpoint endpoint) {
897                this.endpoint = endpoint;
898            }
899    
900            public void onServiceAdd(CamelContext camelContext, Service service, Route route) {
901                this.camelContext = camelContext;
902                this.service = service;
903                this.route = route;
904            }
905    
906            public String getName() {
907                return name;
908            }
909    
910            public Component getComponent() {
911                return component;
912            }
913    
914            public Endpoint getEndpoint() {
915                return endpoint;
916            }
917    
918            public CamelContext getCamelContext() {
919                return camelContext;
920            }
921    
922            public Service getService() {
923                return service;
924            }
925    
926            public Route getRoute() {
927                return route;
928            }
929        }
930    
931    }
932