001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.management; 018 019 import java.util.ArrayList; 020 import java.util.Collection; 021 import java.util.HashMap; 022 import java.util.HashSet; 023 import java.util.Iterator; 024 import java.util.List; 025 import java.util.Map; 026 import java.util.Set; 027 import java.util.concurrent.ScheduledExecutorService; 028 import java.util.concurrent.ThreadPoolExecutor; 029 import javax.management.JMException; 030 import javax.management.MalformedObjectNameException; 031 import javax.management.ObjectName; 032 033 import org.apache.camel.CamelContext; 034 import org.apache.camel.CamelContextAware; 035 import org.apache.camel.Channel; 036 import org.apache.camel.Component; 037 import org.apache.camel.Consumer; 038 import org.apache.camel.Endpoint; 039 import org.apache.camel.ErrorHandlerFactory; 040 import org.apache.camel.ManagementStatisticsLevel; 041 import org.apache.camel.Processor; 042 import org.apache.camel.Producer; 043 import org.apache.camel.Route; 044 import org.apache.camel.Service; 045 import org.apache.camel.StartupListener; 046 import org.apache.camel.TimerListener; 047 import org.apache.camel.VetoCamelContextStartException; 048 import org.apache.camel.api.management.PerformanceCounter; 049 import org.apache.camel.impl.ConsumerCache; 050 import org.apache.camel.impl.DefaultCamelContext; 051 import org.apache.camel.impl.EndpointRegistry; 052 import org.apache.camel.impl.EventDrivenConsumerRoute; 053 import org.apache.camel.impl.ProducerCache; 054 import org.apache.camel.impl.ThrottlingInflightRoutePolicy; 055 import org.apache.camel.management.mbean.ManagedCamelContext; 056 import org.apache.camel.management.mbean.ManagedConsumerCache; 057 import org.apache.camel.management.mbean.ManagedEndpoint; 058 import org.apache.camel.management.mbean.ManagedEndpointRegistry; 059 import org.apache.camel.management.mbean.ManagedProducerCache; 060 import org.apache.camel.management.mbean.ManagedRoute; 061 import org.apache.camel.management.mbean.ManagedService; 062 import org.apache.camel.management.mbean.ManagedThrottlingInflightRoutePolicy; 063 import org.apache.camel.management.mbean.ManagedTracer; 064 import org.apache.camel.management.mbean.ManagedTypeConverterRegistry; 065 import org.apache.camel.model.AOPDefinition; 066 import org.apache.camel.model.InterceptDefinition; 067 import org.apache.camel.model.OnCompletionDefinition; 068 import org.apache.camel.model.OnExceptionDefinition; 069 import org.apache.camel.model.PolicyDefinition; 070 import org.apache.camel.model.ProcessorDefinition; 071 import org.apache.camel.model.ProcessorDefinitionHelper; 072 import org.apache.camel.model.RouteDefinition; 073 import org.apache.camel.processor.interceptor.Tracer; 074 import org.apache.camel.spi.EventNotifier; 075 import org.apache.camel.spi.LifecycleStrategy; 076 import org.apache.camel.spi.ManagementAgent; 077 import org.apache.camel.spi.ManagementAware; 078 import org.apache.camel.spi.ManagementNameStrategy; 079 import org.apache.camel.spi.ManagementObjectStrategy; 080 import org.apache.camel.spi.ManagementStrategy; 081 import org.apache.camel.spi.RouteContext; 082 import org.apache.camel.spi.TypeConverterRegistry; 083 import org.apache.camel.spi.UnitOfWork; 084 import org.apache.camel.support.ServiceSupport; 085 import org.apache.camel.support.TimerListenerManager; 086 import org.apache.camel.util.KeyValueHolder; 087 import org.apache.camel.util.ObjectHelper; 088 import org.slf4j.Logger; 089 import org.slf4j.LoggerFactory; 090 091 /** 092 * Default JMX managed lifecycle strategy that registered objects using the configured 093 * {@link org.apache.camel.spi.ManagementStrategy}. 094 * 095 * @see org.apache.camel.spi.ManagementStrategy 096 * @version 097 */ 098 @SuppressWarnings("deprecation") 099 public class DefaultManagementLifecycleStrategy extends ServiceSupport implements LifecycleStrategy, CamelContextAware { 100 101 private static final Logger LOG = LoggerFactory.getLogger(DefaultManagementLifecycleStrategy.class); 102 // the wrapped processors is for performance counters, which are in use for the created routes 103 // when a route is removed, we should remove the associated processors from this map 104 private final Map<Processor, KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> wrappedProcessors = 105 new HashMap<Processor, KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>>(); 106 private final List<PreRegisterService> preServices = new ArrayList<PreRegisterService>(); 107 private final TimerListenerManager timerListenerManager = new TimerListenerManager(); 108 private final TimerListenerManagerStartupListener timerManagerStartupListener = new TimerListenerManagerStartupListener(); 109 private volatile CamelContext camelContext; 110 private volatile ManagedCamelContext camelContextMBean; 111 private volatile boolean initialized; 112 private final Set<String> knowRouteIds = new HashSet<String>(); 113 private final Map<Tracer, ManagedTracer> managedTracers = new HashMap<Tracer, ManagedTracer>(); 114 private final Map<ThreadPoolExecutor, Object> managedThreadPools = new HashMap<ThreadPoolExecutor, Object>(); 115 116 public DefaultManagementLifecycleStrategy() { 117 } 118 119 public DefaultManagementLifecycleStrategy(CamelContext camelContext) { 120 this.camelContext = camelContext; 121 } 122 123 public CamelContext getCamelContext() { 124 return camelContext; 125 } 126 127 public void setCamelContext(CamelContext camelContext) { 128 this.camelContext = camelContext; 129 } 130 131 public void onContextStart(CamelContext context) throws VetoCamelContextStartException { 132 Object mc = getManagementObjectStrategy().getManagedObjectForCamelContext(context); 133 134 String name = context.getName(); 135 String managementName = context.getManagementNameStrategy().getName(); 136 137 try { 138 boolean done = false; 139 while (!done) { 140 ObjectName on = getManagementStrategy().getManagementNamingStrategy().getObjectNameForCamelContext(managementName, name); 141 boolean exists = getManagementStrategy().isManaged(mc, on); 142 if (!exists) { 143 done = true; 144 } else { 145 // okay there exists already a CamelContext with this name, we can try to fix it by finding a free name 146 boolean fixed = false; 147 // if we use the default name strategy we can find a free name to use 148 String newName = findFreeName(mc, context.getManagementNameStrategy(), name); 149 if (newName != null) { 150 // use this as the fixed name 151 fixed = true; 152 done = true; 153 managementName = newName; 154 } 155 // we could not fix it so veto starting camel 156 if (!fixed) { 157 throw new VetoCamelContextStartException("CamelContext (" + context.getName() + ") with ObjectName[" + on + "] is already registered." 158 + " Make sure to use unique names on CamelContext when using multiple CamelContexts in the same MBeanServer.", context); 159 } else { 160 LOG.warn("This CamelContext(" + context.getName() + ") will be registered using the name: " + managementName 161 + " due to clash with an existing name already registered in MBeanServer."); 162 } 163 } 164 } 165 } catch (VetoCamelContextStartException e) { 166 // rethrow veto 167 throw e; 168 } catch (Exception e) { 169 // must rethrow to allow CamelContext fallback to non JMX agent to allow 170 // Camel to continue to run 171 throw ObjectHelper.wrapRuntimeCamelException(e); 172 } 173 174 // set the name we are going to use 175 if (context instanceof DefaultCamelContext) { 176 ((DefaultCamelContext) context).setManagementName(managementName); 177 } 178 179 try { 180 manageObject(mc); 181 } catch (Exception e) { 182 // must rethrow to allow CamelContext fallback to non JMX agent to allow 183 // Camel to continue to run 184 throw ObjectHelper.wrapRuntimeCamelException(e); 185 } 186 187 // yes we made it and are initialized 188 initialized = true; 189 190 if (mc instanceof ManagedCamelContext) { 191 camelContextMBean = (ManagedCamelContext) mc; 192 } 193 194 // register any pre registered now that we are initialized 195 enlistPreRegisteredServices(); 196 } 197 198 private String findFreeName(Object mc, ManagementNameStrategy strategy, String name) throws MalformedObjectNameException { 199 // we cannot find a free name for fixed named strategies 200 if (strategy.isFixedName()) { 201 return null; 202 } 203 204 // okay try to find a free name 205 boolean done = false; 206 String newName = null; 207 while (!done) { 208 // compute the next name 209 newName = strategy.getNextName(); 210 ObjectName on = getManagementStrategy().getManagementNamingStrategy().getObjectNameForCamelContext(newName, name); 211 done = !getManagementStrategy().isManaged(mc, on); 212 if (LOG.isTraceEnabled()) { 213 LOG.trace("Using name: {} in ObjectName[{}] exists? {}", new Object[]{name, on, done}); 214 } 215 } 216 return newName; 217 } 218 219 /** 220 * After {@link CamelContext} has been enlisted in JMX using {@link #onContextStart(org.apache.camel.CamelContext)} 221 * then we can enlist any pre registered services as well, as we had to wait for {@link CamelContext} to be 222 * enlisted first. 223 * <p/> 224 * A component/endpoint/service etc. can be pre registered when using dependency injection and annotations such as 225 * {@link org.apache.camel.Produce}, {@link org.apache.camel.EndpointInject}. Therefore we need to capture those 226 * registrations up front, and then afterwards enlist in JMX when {@link CamelContext} is being started. 227 */ 228 private void enlistPreRegisteredServices() { 229 if (preServices.isEmpty()) { 230 return; 231 } 232 233 LOG.debug("Registering {} pre registered services", preServices.size()); 234 for (PreRegisterService pre : preServices) { 235 if (pre.getComponent() != null) { 236 onComponentAdd(pre.getName(), pre.getComponent()); 237 } else if (pre.getEndpoint() != null) { 238 onEndpointAdd(pre.getEndpoint()); 239 } else if (pre.getService() != null) { 240 onServiceAdd(pre.getCamelContext(), pre.getService(), pre.getRoute()); 241 } 242 } 243 244 // we are done so clear the list 245 preServices.clear(); 246 } 247 248 public void onContextStop(CamelContext context) { 249 // the agent hasn't been started 250 if (!initialized) { 251 return; 252 } 253 try { 254 Object mc = getManagementObjectStrategy().getManagedObjectForCamelContext(context); 255 // the context could have been removed already 256 if (getManagementStrategy().isManaged(mc, null)) { 257 unmanageObject(mc); 258 } 259 } catch (Exception e) { 260 LOG.warn("Could not unregister CamelContext MBean", e); 261 } 262 263 camelContextMBean = null; 264 } 265 266 public void onComponentAdd(String name, Component component) { 267 // always register components as there are only a few of those 268 if (!initialized) { 269 // pre register so we can register later when we have been initialized 270 PreRegisterService pre = new PreRegisterService(); 271 pre.onComponentAdd(name, component); 272 preServices.add(pre); 273 return; 274 } 275 try { 276 Object mc = getManagementObjectStrategy().getManagedObjectForComponent(camelContext, component, name); 277 manageObject(mc); 278 } catch (Exception e) { 279 LOG.warn("Could not register Component MBean", e); 280 } 281 } 282 283 public void onComponentRemove(String name, Component component) { 284 // the agent hasn't been started 285 if (!initialized) { 286 return; 287 } 288 try { 289 Object mc = getManagementObjectStrategy().getManagedObjectForComponent(camelContext, component, name); 290 unmanageObject(mc); 291 } catch (Exception e) { 292 LOG.warn("Could not unregister Component MBean", e); 293 } 294 } 295 296 /** 297 * If the endpoint is an instance of ManagedResource then register it with the 298 * mbean server, if it is not then wrap the endpoint in a {@link ManagedEndpoint} and 299 * register that with the mbean server. 300 * 301 * @param endpoint the Endpoint attempted to be added 302 */ 303 public void onEndpointAdd(Endpoint endpoint) { 304 if (!initialized) { 305 // pre register so we can register later when we have been initialized 306 PreRegisterService pre = new PreRegisterService(); 307 pre.onEndpointAdd(endpoint); 308 preServices.add(pre); 309 return; 310 } 311 312 if (!shouldRegister(endpoint, null)) { 313 // avoid registering if not needed 314 return; 315 } 316 317 try { 318 Object me = getManagementObjectStrategy().getManagedObjectForEndpoint(camelContext, endpoint); 319 if (me == null) { 320 // endpoint should not be managed 321 return; 322 } 323 manageObject(me); 324 } catch (Exception e) { 325 LOG.warn("Could not register Endpoint MBean for endpoint: " + endpoint + ". This exception will be ignored.", e); 326 } 327 } 328 329 public void onEndpointRemove(Endpoint endpoint) { 330 // the agent hasn't been started 331 if (!initialized) { 332 return; 333 } 334 335 try { 336 Object me = getManagementObjectStrategy().getManagedObjectForEndpoint(camelContext, endpoint); 337 unmanageObject(me); 338 } catch (Exception e) { 339 LOG.warn("Could not unregister Endpoint MBean for endpoint: " + endpoint + ". This exception will be ignored.", e); 340 } 341 } 342 343 public void onServiceAdd(CamelContext context, Service service, Route route) { 344 if (!initialized) { 345 // pre register so we can register later when we have been initialized 346 PreRegisterService pre = new PreRegisterService(); 347 pre.onServiceAdd(context, service, route); 348 preServices.add(pre); 349 return; 350 } 351 352 // services can by any kind of misc type but also processors 353 // so we have special logic when its a processor 354 355 if (!shouldRegister(service, route)) { 356 // avoid registering if not needed 357 return; 358 } 359 360 Object managedObject = getManagedObjectForService(context, service, route); 361 if (managedObject == null) { 362 // service should not be managed 363 return; 364 } 365 366 // skip already managed services, for example if a route has been restarted 367 if (getManagementStrategy().isManaged(managedObject, null)) { 368 LOG.trace("The service is already managed: {}", service); 369 return; 370 } 371 372 try { 373 manageObject(managedObject); 374 } catch (Exception e) { 375 LOG.warn("Could not register service: " + service + " as Service MBean.", e); 376 } 377 } 378 379 public void onServiceRemove(CamelContext context, Service service, Route route) { 380 // the agent hasn't been started 381 if (!initialized) { 382 return; 383 } 384 385 Object managedObject = getManagedObjectForService(context, service, route); 386 if (managedObject != null) { 387 try { 388 unmanageObject(managedObject); 389 } catch (Exception e) { 390 LOG.warn("Could not unregister service: " + service + " as Service MBean.", e); 391 } 392 } 393 } 394 395 @SuppressWarnings("unchecked") 396 private Object getManagedObjectForService(CamelContext context, Service service, Route route) { 397 // skip channel, UoW and dont double wrap instrumentation 398 if (service instanceof Channel || service instanceof UnitOfWork || service instanceof InstrumentationProcessor) { 399 return null; 400 } 401 402 Object answer = null; 403 404 if (service instanceof ManagementAware) { 405 return ((ManagementAware<Service>) service).getManagedObject(service); 406 } else if (service instanceof Tracer) { 407 // special for tracer 408 Tracer tracer = (Tracer) service; 409 ManagedTracer mt = managedTracers.get(tracer); 410 if (mt == null) { 411 mt = new ManagedTracer(context, tracer); 412 mt.init(getManagementStrategy()); 413 managedTracers.put(tracer, mt); 414 } 415 return mt; 416 } else if (service instanceof EventNotifier) { 417 answer = getManagementObjectStrategy().getManagedObjectForEventNotifier(context, (EventNotifier) service); 418 } else if (service instanceof Producer) { 419 answer = getManagementObjectStrategy().getManagedObjectForProducer(context, (Producer) service); 420 } else if (service instanceof Consumer) { 421 answer = getManagementObjectStrategy().getManagedObjectForConsumer(context, (Consumer) service); 422 } else if (service instanceof Processor) { 423 // special for processors as we need to do some extra work 424 return getManagedObjectForProcessor(context, (Processor) service, route); 425 } else if (service instanceof ThrottlingInflightRoutePolicy) { 426 answer = new ManagedThrottlingInflightRoutePolicy(context, (ThrottlingInflightRoutePolicy) service); 427 } else if (service instanceof ConsumerCache) { 428 answer = new ManagedConsumerCache(context, (ConsumerCache) service); 429 } else if (service instanceof ProducerCache) { 430 answer = new ManagedProducerCache(context, (ProducerCache) service); 431 } else if (service instanceof EndpointRegistry) { 432 answer = new ManagedEndpointRegistry(context, (EndpointRegistry) service); 433 } else if (service instanceof TypeConverterRegistry) { 434 answer = new ManagedTypeConverterRegistry(context, (TypeConverterRegistry) service); 435 } else if (service != null) { 436 // fallback as generic service 437 answer = getManagementObjectStrategy().getManagedObjectForService(context, service); 438 } 439 440 if (answer != null && answer instanceof ManagedService) { 441 ManagedService ms = (ManagedService) answer; 442 ms.setRoute(route); 443 ms.init(getManagementStrategy()); 444 } 445 446 return answer; 447 } 448 449 private Object getManagedObjectForProcessor(CamelContext context, Processor processor, Route route) { 450 // a bit of magic here as the processors we want to manage have already been registered 451 // in the wrapped processors map when Camel have instrumented the route on route initialization 452 // so the idea is now to only manage the processors from the map 453 KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = wrappedProcessors.get(processor); 454 if (holder == null) { 455 // skip as its not an well known processor we want to manage anyway, such as Channel/UnitOfWork/Pipeline etc. 456 return null; 457 } 458 459 // get the managed object as it can be a specialized type such as a Delayer/Throttler etc. 460 Object managedObject = getManagementObjectStrategy().getManagedObjectForProcessor(context, processor, holder.getKey(), route); 461 // only manage if we have a name for it as otherwise we do not want to manage it anyway 462 if (managedObject != null) { 463 // is it a performance counter then we need to set our counter 464 if (managedObject instanceof PerformanceCounter) { 465 InstrumentationProcessor counter = holder.getValue(); 466 if (counter != null) { 467 // change counter to us 468 counter.setCounter(managedObject); 469 } 470 } 471 } 472 473 return managedObject; 474 } 475 476 public void onRoutesAdd(Collection<Route> routes) { 477 for (Route route : routes) { 478 479 // if we are starting CamelContext or either of the two options has been 480 // enabled, then enlist the route as a known route 481 if (getCamelContext().getStatus().isStarting() 482 || getManagementStrategy().getManagementAgent().getRegisterAlways() 483 || getManagementStrategy().getManagementAgent().getRegisterNewRoutes()) { 484 // register as known route id 485 knowRouteIds.add(route.getId()); 486 } 487 488 if (!shouldRegister(route, route)) { 489 // avoid registering if not needed, skip to next route 490 continue; 491 } 492 493 Object mr = getManagementObjectStrategy().getManagedObjectForRoute(camelContext, route); 494 495 // skip already managed routes, for example if the route has been restarted 496 if (getManagementStrategy().isManaged(mr, null)) { 497 LOG.trace("The route is already managed: {}", route); 498 continue; 499 } 500 501 // get the wrapped instrumentation processor from this route 502 // and set me as the counter 503 if (route instanceof EventDrivenConsumerRoute) { 504 EventDrivenConsumerRoute edcr = (EventDrivenConsumerRoute) route; 505 Processor processor = edcr.getProcessor(); 506 if (processor instanceof InstrumentationProcessor && mr instanceof ManagedRoute) { 507 InstrumentationProcessor ip = (InstrumentationProcessor) processor; 508 ManagedRoute routeMBean = (ManagedRoute) mr; 509 510 // we need to wrap the counter with the camel context so we get stats updated on the context as well 511 if (camelContextMBean != null) { 512 CompositePerformanceCounter wrapper = new CompositePerformanceCounter(routeMBean, camelContextMBean); 513 ip.setCounter(wrapper); 514 } else { 515 ip.setCounter(routeMBean); 516 } 517 } 518 } 519 520 try { 521 manageObject(mr); 522 } catch (JMException e) { 523 LOG.warn("Could not register Route MBean", e); 524 } catch (Exception e) { 525 LOG.warn("Could not create Route MBean", e); 526 } 527 } 528 } 529 530 public void onRoutesRemove(Collection<Route> routes) { 531 // the agent hasn't been started 532 if (!initialized) { 533 return; 534 } 535 536 for (Route route : routes) { 537 Object mr = getManagementObjectStrategy().getManagedObjectForRoute(camelContext, route); 538 539 // skip unmanaged routes 540 if (!getManagementStrategy().isManaged(mr, null)) { 541 LOG.trace("The route is not managed: {}", route); 542 continue; 543 } 544 545 try { 546 unmanageObject(mr); 547 } catch (Exception e) { 548 LOG.warn("Could not unregister Route MBean", e); 549 } 550 551 // remove from known routes ids, as the route has been removed 552 knowRouteIds.remove(route.getId()); 553 } 554 555 // after the routes has been removed, we should clear the wrapped processors as we no longer need them 556 // as they were just a provisional map used during creation of routes 557 removeWrappedProcessorsForRoutes(routes); 558 } 559 560 public void onErrorHandlerAdd(RouteContext routeContext, Processor errorHandler, ErrorHandlerFactory errorHandlerBuilder) { 561 if (!shouldRegister(errorHandler, null)) { 562 // avoid registering if not needed 563 return; 564 } 565 566 Object me = getManagementObjectStrategy().getManagedObjectForErrorHandler(camelContext, routeContext, errorHandler, errorHandlerBuilder); 567 568 // skip already managed services, for example if a route has been restarted 569 if (getManagementStrategy().isManaged(me, null)) { 570 LOG.trace("The error handler builder is already managed: {}", errorHandlerBuilder); 571 return; 572 } 573 574 try { 575 manageObject(me); 576 } catch (Exception e) { 577 LOG.warn("Could not register error handler builder: " + errorHandlerBuilder + " as ErrorHandler MBean.", e); 578 } 579 } 580 581 public void onErrorHandlerRemove(RouteContext routeContext, Processor errorHandler, ErrorHandlerFactory errorHandlerBuilder) { 582 if (!initialized) { 583 return; 584 } 585 586 Object me = getManagementObjectStrategy().getManagedObjectForErrorHandler(camelContext, routeContext, errorHandler, errorHandlerBuilder); 587 if (me != null) { 588 try { 589 unmanageObject(me); 590 } catch (Exception e) { 591 LOG.warn("Could not unregister error handler: " + me + " as ErrorHandler MBean.", e); 592 } 593 } 594 } 595 596 public void onThreadPoolAdd(CamelContext camelContext, ThreadPoolExecutor threadPool, String id, 597 String sourceId, String routeId, String threadPoolProfileId) { 598 599 if (!shouldRegister(threadPool, null)) { 600 // avoid registering if not needed 601 return; 602 } 603 604 Object mtp = getManagementObjectStrategy().getManagedObjectForThreadPool(camelContext, threadPool, id, sourceId, routeId, threadPoolProfileId); 605 606 // skip already managed services, for example if a route has been restarted 607 if (getManagementStrategy().isManaged(mtp, null)) { 608 LOG.trace("The thread pool is already managed: {}", threadPool); 609 return; 610 } 611 612 try { 613 manageObject(mtp); 614 // store a reference so we can unmanage from JMX when the thread pool is removed 615 // we need to keep track here, as we cannot re-construct the thread pool ObjectName when removing the thread pool 616 managedThreadPools.put(threadPool, mtp); 617 } catch (Exception e) { 618 LOG.warn("Could not register thread pool: " + threadPool + " as ThreadPool MBean.", e); 619 } 620 } 621 622 public void onThreadPoolRemove(CamelContext camelContext, ThreadPoolExecutor threadPool) { 623 if (!initialized) { 624 return; 625 } 626 627 // lookup the thread pool and remove it from JMX 628 Object mtp = managedThreadPools.remove(threadPool); 629 if (mtp != null) { 630 // skip unmanaged routes 631 if (!getManagementStrategy().isManaged(mtp, null)) { 632 LOG.trace("The thread pool is not managed: {}", threadPool); 633 return; 634 } 635 636 try { 637 unmanageObject(mtp); 638 } catch (Exception e) { 639 LOG.warn("Could not unregister ThreadPool MBean", e); 640 } 641 } 642 } 643 644 public void onRouteContextCreate(RouteContext routeContext) { 645 if (!initialized) { 646 return; 647 } 648 649 // Create a map (ProcessorType -> PerformanceCounter) 650 // to be passed to InstrumentationInterceptStrategy. 651 Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters = 652 new HashMap<ProcessorDefinition<?>, PerformanceCounter>(); 653 654 // Each processor in a route will have its own performance counter. 655 // These performance counter will be embedded to InstrumentationProcessor 656 // and wrap the appropriate processor by InstrumentationInterceptStrategy. 657 RouteDefinition route = routeContext.getRoute(); 658 659 // register performance counters for all processors and its children 660 for (ProcessorDefinition<?> processor : route.getOutputs()) { 661 registerPerformanceCounters(routeContext, processor, registeredCounters); 662 } 663 664 // set this managed intercept strategy that executes the JMX instrumentation for performance metrics 665 // so our registered counters can be used for fine grained performance instrumentation 666 routeContext.setManagedInterceptStrategy(new InstrumentationInterceptStrategy(registeredCounters, wrappedProcessors)); 667 } 668 669 /** 670 * Removes the wrapped processors for the given routes, as they are no longer in use. 671 * <p/> 672 * This is needed to avoid accumulating memory, if a lot of routes is being added and removed. 673 * 674 * @param routes the routes 675 */ 676 private void removeWrappedProcessorsForRoutes(Collection<Route> routes) { 677 // loop the routes, and remove the route associated wrapped processors, as they are no longer in use 678 for (Route route : routes) { 679 String id = route.getId(); 680 681 Iterator<KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> it = wrappedProcessors.values().iterator(); 682 while (it.hasNext()) { 683 KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = it.next(); 684 RouteDefinition def = ProcessorDefinitionHelper.getRoute(holder.getKey()); 685 if (def != null && id.equals(def.getId())) { 686 it.remove(); 687 } 688 } 689 } 690 691 } 692 693 private void registerPerformanceCounters(RouteContext routeContext, ProcessorDefinition<?> processor, 694 Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters) { 695 696 // traverse children if any exists 697 List<ProcessorDefinition<?>> children = processor.getOutputs(); 698 for (ProcessorDefinition<?> child : children) { 699 registerPerformanceCounters(routeContext, child, registeredCounters); 700 } 701 702 // skip processors that should not be registered 703 if (!registerProcessor(processor)) { 704 return; 705 } 706 707 // okay this is a processor we would like to manage so create the 708 // a delegate performance counter that acts as the placeholder in the interceptor 709 // that then delegates to the real mbean which we register later in the onServiceAdd method 710 DelegatePerformanceCounter pc = new DelegatePerformanceCounter(); 711 // set statistics enabled depending on the option 712 boolean enabled = camelContext.getManagementStrategy().getStatisticsLevel() == ManagementStatisticsLevel.All; 713 pc.setStatisticsEnabled(enabled); 714 715 // and add it as a a registered counter that will be used lazy when Camel 716 // does the instrumentation of the route and adds the InstrumentationProcessor 717 // that does the actual performance metrics gatherings at runtime 718 registeredCounters.put(processor, pc); 719 } 720 721 /** 722 * Should the given processor be registered. 723 */ 724 protected boolean registerProcessor(ProcessorDefinition<?> processor) { 725 // skip on exception 726 if (processor instanceof OnExceptionDefinition) { 727 return false; 728 } 729 // skip on completion 730 if (processor instanceof OnCompletionDefinition) { 731 return false; 732 } 733 // skip intercept 734 if (processor instanceof InterceptDefinition) { 735 return false; 736 } 737 // skip aop 738 if (processor instanceof AOPDefinition) { 739 return false; 740 } 741 // skip policy 742 if (processor instanceof PolicyDefinition) { 743 return false; 744 } 745 746 // only if custom id assigned 747 if (getManagementStrategy().isOnlyManageProcessorWithCustomId()) { 748 return processor.hasCustomIdAssigned(); 749 } 750 751 // use customer filter 752 return getManagementStrategy().manageProcessor(processor); 753 } 754 755 private ManagementStrategy getManagementStrategy() { 756 ObjectHelper.notNull(camelContext, "CamelContext"); 757 return camelContext.getManagementStrategy(); 758 } 759 760 private ManagementObjectStrategy getManagementObjectStrategy() { 761 ObjectHelper.notNull(camelContext, "CamelContext"); 762 return camelContext.getManagementStrategy().getManagementObjectStrategy(); 763 } 764 765 /** 766 * Strategy for managing the object 767 * 768 * @param me the managed object 769 * @throws Exception is thrown if error registering the object for management 770 */ 771 protected void manageObject(Object me) throws Exception { 772 getManagementStrategy().manageObject(me); 773 if (timerListenerManager != null && me instanceof TimerListener) { 774 TimerListener timer = (TimerListener) me; 775 timerListenerManager.addTimerListener(timer); 776 } 777 } 778 779 /** 780 * Un-manages the object. 781 * 782 * @param me the managed object 783 * @throws Exception is thrown if error unregistering the managed object 784 */ 785 protected void unmanageObject(Object me) throws Exception { 786 if (timerListenerManager != null && me instanceof TimerListener) { 787 TimerListener timer = (TimerListener) me; 788 timerListenerManager.removeTimerListener(timer); 789 } 790 getManagementStrategy().unmanageObject(me); 791 } 792 793 /** 794 * Whether or not to register the mbean. 795 * <p/> 796 * The {@link ManagementAgent} has options which controls when to register. 797 * This allows us to only register mbeans accordingly. For example by default any 798 * dynamic endpoints is not registered. This avoids to register excessive mbeans, which 799 * most often is not desired. 800 * 801 * @param service the object to register 802 * @param route an optional route the mbean is associated with, can be <tt>null</tt> 803 * @return <tt>true</tt> to register, <tt>false</tt> to skip registering 804 */ 805 protected boolean shouldRegister(Object service, Route route) { 806 // the agent hasn't been started 807 if (!initialized) { 808 return false; 809 } 810 811 LOG.trace("Checking whether to register {} from route: {}", service, route); 812 813 ManagementAgent agent = getManagementStrategy().getManagementAgent(); 814 if (agent == null) { 815 // do not register if no agent 816 return false; 817 } 818 819 // always register if we are starting CamelContext 820 if (getCamelContext().getStatus().isStarting()) { 821 return true; 822 } 823 824 // register if always is enabled 825 if (agent.getRegisterAlways()) { 826 return true; 827 } 828 829 // is it a known route then always accept 830 if (route != null && knowRouteIds.contains(route.getId())) { 831 return true; 832 } 833 834 // only register if we are starting a new route, and current thread is in starting routes mode 835 if (agent.getRegisterNewRoutes()) { 836 // no specific route, then fallback to see if this thread is starting routes 837 // which is kept as state on the camel context 838 return getCamelContext().isStartingRoutes(); 839 } 840 841 return false; 842 } 843 844 @Override 845 protected void doStart() throws Exception { 846 ObjectHelper.notNull(camelContext, "CamelContext"); 847 848 // defer starting the timer manager until CamelContext has been fully started 849 camelContext.addStartupListener(timerManagerStartupListener); 850 } 851 852 private final class TimerListenerManagerStartupListener implements StartupListener { 853 854 @Override 855 public void onCamelContextStarted(CamelContext context, boolean alreadyStarted) throws Exception { 856 boolean enabled = camelContext.getManagementStrategy().getStatisticsLevel() != ManagementStatisticsLevel.Off; 857 if (enabled) { 858 LOG.info("StatisticsLevel at {} so enabling load performance statistics", camelContext.getManagementStrategy().getStatisticsLevel()); 859 // must use 1 sec interval as the load statistics is based on 1 sec calculations 860 timerListenerManager.setInterval(1000); 861 // we have to defer enlisting timer lister manager as a service until CamelContext has been started 862 getCamelContext().addService(timerListenerManager); 863 } 864 } 865 } 866 867 @Override 868 protected void doStop() throws Exception { 869 initialized = false; 870 knowRouteIds.clear(); 871 preServices.clear(); 872 wrappedProcessors.clear(); 873 managedTracers.clear(); 874 managedThreadPools.clear(); 875 } 876 877 /** 878 * Class which holds any pre registration details. 879 * 880 * @see org.apache.camel.management.DefaultManagementLifecycleStrategy#enlistPreRegisteredServices() 881 */ 882 private static final class PreRegisterService { 883 884 private String name; 885 private Component component; 886 private Endpoint endpoint; 887 private CamelContext camelContext; 888 private Service service; 889 private Route route; 890 891 public void onComponentAdd(String name, Component component) { 892 this.name = name; 893 this.component = component; 894 } 895 896 public void onEndpointAdd(Endpoint endpoint) { 897 this.endpoint = endpoint; 898 } 899 900 public void onServiceAdd(CamelContext camelContext, Service service, Route route) { 901 this.camelContext = camelContext; 902 this.service = service; 903 this.route = route; 904 } 905 906 public String getName() { 907 return name; 908 } 909 910 public Component getComponent() { 911 return component; 912 } 913 914 public Endpoint getEndpoint() { 915 return endpoint; 916 } 917 918 public CamelContext getCamelContext() { 919 return camelContext; 920 } 921 922 public Service getService() { 923 return service; 924 } 925 926 public Route getRoute() { 927 return route; 928 } 929 } 930 931 } 932