001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.management; 018 019import java.util.ArrayList; 020import java.util.Collection; 021import java.util.HashMap; 022import java.util.HashSet; 023import java.util.Iterator; 024import java.util.List; 025import java.util.Map; 026import java.util.Set; 027import java.util.concurrent.ThreadPoolExecutor; 028 029import javax.management.JMException; 030import javax.management.MalformedObjectNameException; 031import javax.management.ObjectName; 032 033import org.apache.camel.CamelContext; 034import org.apache.camel.CamelContextAware; 035import org.apache.camel.Channel; 036import org.apache.camel.Component; 037import org.apache.camel.Consumer; 038import org.apache.camel.Endpoint; 039import org.apache.camel.ExtendedCamelContext; 040import org.apache.camel.ManagementStatisticsLevel; 041import org.apache.camel.NamedNode; 042import org.apache.camel.NonManagedService; 043import org.apache.camel.Processor; 044import org.apache.camel.Producer; 045import org.apache.camel.Route; 046import org.apache.camel.RuntimeCamelException; 047import org.apache.camel.Service; 048import org.apache.camel.StartupListener; 049import org.apache.camel.TimerListener; 050import org.apache.camel.VetoCamelContextStartException; 051import org.apache.camel.cluster.CamelClusterService; 052import org.apache.camel.health.HealthCheckRegistry; 053import org.apache.camel.impl.debugger.BacklogDebugger; 054import org.apache.camel.impl.debugger.BacklogTracer; 055import org.apache.camel.management.mbean.ManagedAsyncProcessorAwaitManager; 056import org.apache.camel.management.mbean.ManagedBacklogDebugger; 057import org.apache.camel.management.mbean.ManagedBacklogTracer; 058import org.apache.camel.management.mbean.ManagedBeanIntrospection; 059import org.apache.camel.management.mbean.ManagedCamelContext; 060import org.apache.camel.management.mbean.ManagedConsumerCache; 061import org.apache.camel.management.mbean.ManagedEndpoint; 062import org.apache.camel.management.mbean.ManagedEndpointRegistry; 063import org.apache.camel.management.mbean.ManagedInflightRepository; 064import org.apache.camel.management.mbean.ManagedProducerCache; 065import org.apache.camel.management.mbean.ManagedRestRegistry; 066import org.apache.camel.management.mbean.ManagedRoute; 067import org.apache.camel.management.mbean.ManagedRuntimeEndpointRegistry; 068import org.apache.camel.management.mbean.ManagedService; 069import org.apache.camel.management.mbean.ManagedStreamCachingStrategy; 070import org.apache.camel.management.mbean.ManagedThrottlingExceptionRoutePolicy; 071import org.apache.camel.management.mbean.ManagedThrottlingInflightRoutePolicy; 072import org.apache.camel.management.mbean.ManagedTracer; 073import org.apache.camel.management.mbean.ManagedTransformerRegistry; 074import org.apache.camel.management.mbean.ManagedTypeConverterRegistry; 075import org.apache.camel.management.mbean.ManagedValidatorRegistry; 076import org.apache.camel.model.InterceptDefinition; 077import org.apache.camel.model.OnCompletionDefinition; 078import org.apache.camel.model.OnExceptionDefinition; 079import org.apache.camel.model.PolicyDefinition; 080import org.apache.camel.model.ProcessorDefinition; 081import org.apache.camel.model.ProcessorDefinitionHelper; 082import org.apache.camel.model.RouteDefinition; 083import org.apache.camel.spi.AsyncProcessorAwaitManager; 084import org.apache.camel.spi.BeanIntrospection; 085import org.apache.camel.spi.ConsumerCache; 086import org.apache.camel.spi.DataFormat; 087import org.apache.camel.spi.EndpointRegistry; 088import org.apache.camel.spi.EventNotifier; 089import org.apache.camel.spi.InflightRepository; 090import org.apache.camel.spi.InternalProcessor; 091import org.apache.camel.spi.LifecycleStrategy; 092import org.apache.camel.spi.ManagementAgent; 093import org.apache.camel.spi.ManagementInterceptStrategy.InstrumentationProcessor; 094import org.apache.camel.spi.ManagementNameStrategy; 095import org.apache.camel.spi.ManagementObjectStrategy; 096import org.apache.camel.spi.ManagementStrategy; 097import org.apache.camel.spi.ProducerCache; 098import org.apache.camel.spi.RestRegistry; 099import org.apache.camel.spi.RuntimeEndpointRegistry; 100import org.apache.camel.spi.StreamCachingStrategy; 101import org.apache.camel.spi.Tracer; 102import org.apache.camel.spi.TransformerRegistry; 103import org.apache.camel.spi.TypeConverterRegistry; 104import org.apache.camel.spi.UnitOfWork; 105import org.apache.camel.spi.ValidatorRegistry; 106import org.apache.camel.support.TimerListenerManager; 107import org.apache.camel.support.service.ServiceSupport; 108import org.apache.camel.throttling.ThrottlingExceptionRoutePolicy; 109import org.apache.camel.throttling.ThrottlingInflightRoutePolicy; 110import org.apache.camel.util.KeyValueHolder; 111import org.apache.camel.util.ObjectHelper; 112import org.slf4j.Logger; 113import org.slf4j.LoggerFactory; 114 115/** 116 * Default JMX managed lifecycle strategy that registered objects using the configured 117 * {@link org.apache.camel.spi.ManagementStrategy}. 118 * 119 * @see org.apache.camel.spi.ManagementStrategy 120 */ 121public class JmxManagementLifecycleStrategy extends ServiceSupport implements LifecycleStrategy, CamelContextAware { 122 123 private static final Logger LOG = LoggerFactory.getLogger(JmxManagementLifecycleStrategy.class); 124 125 // the wrapped processors is for performance counters, which are in use for the created routes 126 // when a route is removed, we should remove the associated processors from this map 127 private final Map<Processor, KeyValueHolder<NamedNode, InstrumentationProcessor>> wrappedProcessors = new HashMap<>(); 128 private final List<java.util.function.Consumer<JmxManagementLifecycleStrategy>> preServices = new ArrayList<>(); 129 private final TimerListenerManager loadTimer = new ManagedLoadTimer(); 130 private final TimerListenerManagerStartupListener loadTimerStartupListener = new TimerListenerManagerStartupListener(); 131 private volatile CamelContext camelContext; 132 private volatile ManagedCamelContext camelContextMBean; 133 private volatile boolean initialized; 134 private final Set<String> knowRouteIds = new HashSet<>(); 135 private final Map<BacklogTracer, ManagedBacklogTracer> managedBacklogTracers = new HashMap<>(); 136 private final Map<BacklogDebugger, ManagedBacklogDebugger> managedBacklogDebuggers = new HashMap<>(); 137 private final Map<ThreadPoolExecutor, Object> managedThreadPools = new HashMap<>(); 138 139 public JmxManagementLifecycleStrategy() { 140 } 141 142 public JmxManagementLifecycleStrategy(CamelContext camelContext) { 143 this.camelContext = camelContext; 144 } 145 146 // used for handing over pre-services between a provisional lifecycycle strategy 147 // and then later the actual strategy to be used when using XML 148 List<java.util.function.Consumer<JmxManagementLifecycleStrategy>> getPreServices() { 149 return preServices; 150 } 151 152 // used for handing over pre-services between a provisional lifecycycle strategy 153 // and then later the actual strategy to be used when using XML 154 void addPreService(java.util.function.Consumer<JmxManagementLifecycleStrategy> preService) { 155 preServices.add(preService); 156 } 157 158 @Override 159 public CamelContext getCamelContext() { 160 return camelContext; 161 } 162 163 @Override 164 public void setCamelContext(CamelContext camelContext) { 165 this.camelContext = camelContext; 166 } 167 168 @Override 169 public void onContextStarting(CamelContext context) throws VetoCamelContextStartException { 170 Object mc = getManagementObjectStrategy().getManagedObjectForCamelContext(context); 171 172 String name = context.getName(); 173 String managementName = context.getManagementName(); 174 175 if (managementName == null) { 176 managementName = context.getManagementNameStrategy().getName(); 177 } 178 179 try { 180 boolean done = false; 181 while (!done) { 182 ObjectName on = getManagementStrategy().getManagementObjectNameStrategy() 183 .getObjectNameForCamelContext(managementName, name); 184 boolean exists = getManagementStrategy().isManagedName(on); 185 if (!exists) { 186 done = true; 187 } else { 188 // okay there exists already a CamelContext with this name, we can try to fix it by finding a free name 189 boolean fixed = false; 190 // if we use the default name strategy we can find a free name to use 191 String newName = findFreeName(mc, context.getManagementNameStrategy(), name); 192 if (newName != null) { 193 // use this as the fixed name 194 fixed = true; 195 done = true; 196 managementName = newName; 197 } 198 // we could not fix it so veto starting camel 199 if (!fixed) { 200 throw new VetoCamelContextStartException( 201 "CamelContext (" + context.getName() + ") with ObjectName[" + on + "] is already registered." 202 + " Make sure to use unique names on CamelContext when using multiple CamelContexts in the same MBeanServer.", 203 context); 204 } else { 205 LOG.warn("This CamelContext(" + context.getName() + ") will be registered using the name: " 206 + managementName 207 + " due to clash with an existing name already registered in MBeanServer."); 208 } 209 } 210 } 211 } catch (VetoCamelContextStartException e) { 212 // rethrow veto 213 throw e; 214 } catch (Exception e) { 215 // must rethrow to allow CamelContext fallback to non JMX agent to allow 216 // Camel to continue to run 217 throw RuntimeCamelException.wrapRuntimeCamelException(e); 218 } 219 220 // set the name we are going to use 221 context.setManagementName(managementName); 222 223 // yes we made it and are initialized 224 initialized = true; 225 226 try { 227 manageObject(mc); 228 } catch (Exception e) { 229 // must rethrow to allow CamelContext fallback to non JMX agent to allow 230 // Camel to continue to run 231 throw RuntimeCamelException.wrapRuntimeCamelException(e); 232 } 233 234 if (mc instanceof ManagedCamelContext) { 235 camelContextMBean = (ManagedCamelContext) mc; 236 } 237 238 // register any pre registered now that we are initialized 239 enlistPreRegisteredServices(); 240 241 // register health check if detected 242 HealthCheckRegistry hcr = context.getExtension(HealthCheckRegistry.class); 243 if (hcr != null) { 244 try { 245 Object me = getManagementObjectStrategy().getManagedObjectForCamelHealth(camelContext, hcr); 246 if (me == null) { 247 // endpoint should not be managed 248 return; 249 } 250 manageObject(me); 251 } catch (Exception e) { 252 LOG.warn("Could not register CamelHealth MBean. This exception will be ignored.", e); 253 } 254 } 255 256 try { 257 Object me = getManagementObjectStrategy().getManagedObjectForRouteController(camelContext, 258 camelContext.getRouteController()); 259 if (me == null) { 260 // endpoint should not be managed 261 return; 262 } 263 manageObject(me); 264 } catch (Exception e) { 265 LOG.warn("Could not register RouteController MBean. This exception will be ignored.", e); 266 } 267 } 268 269 private String findFreeName(Object mc, ManagementNameStrategy strategy, String name) throws MalformedObjectNameException { 270 // we cannot find a free name for fixed named strategies 271 if (strategy.isFixedName()) { 272 return null; 273 } 274 275 // okay try to find a free name 276 boolean done = false; 277 String newName = null; 278 while (!done) { 279 // compute the next name 280 newName = strategy.getNextName(); 281 ObjectName on 282 = getManagementStrategy().getManagementObjectNameStrategy().getObjectNameForCamelContext(newName, name); 283 done = !getManagementStrategy().isManagedName(on); 284 if (LOG.isTraceEnabled()) { 285 LOG.trace("Using name: {} in ObjectName[{}] exists? {}", name, on, done); 286 } 287 } 288 return newName; 289 } 290 291 /** 292 * After {@link CamelContext} has been enlisted in JMX using {@link #onContextStart(org.apache.camel.CamelContext)} 293 * then we can enlist any pre registered services as well, as we had to wait for {@link CamelContext} to be enlisted 294 * first. 295 * <p/> 296 * A component/endpoint/service etc. can be pre registered when using dependency injection and annotations such as 297 * {@link org.apache.camel.Produce}, {@link org.apache.camel.EndpointInject}. Therefore we need to capture those 298 * registrations up front, and then afterwards enlist in JMX when {@link CamelContext} is being started. 299 */ 300 private void enlistPreRegisteredServices() { 301 if (preServices.isEmpty()) { 302 return; 303 } 304 305 LOG.debug("Registering {} pre registered services", preServices.size()); 306 for (java.util.function.Consumer<JmxManagementLifecycleStrategy> pre : preServices) { 307 pre.accept(this); 308 } 309 310 // we are done so clear the list 311 preServices.clear(); 312 } 313 314 @Override 315 public void onContextStopped(CamelContext context) { 316 // the agent hasn't been started 317 if (!initialized) { 318 return; 319 } 320 321 try { 322 Object mc = getManagementObjectStrategy().getManagedObjectForRouteController(context, context.getRouteController()); 323 // the context could have been removed already 324 if (getManagementStrategy().isManaged(mc)) { 325 unmanageObject(mc); 326 } 327 } catch (Exception e) { 328 LOG.warn("Could not unregister RouteController MBean", e); 329 } 330 331 try { 332 Object mc = getManagementObjectStrategy().getManagedObjectForCamelContext(context); 333 // the context could have been removed already 334 if (getManagementStrategy().isManaged(mc)) { 335 unmanageObject(mc); 336 } 337 } catch (Exception e) { 338 LOG.warn("Could not unregister CamelContext MBean", e); 339 } 340 341 HealthCheckRegistry hcr = context.getExtension(HealthCheckRegistry.class); 342 if (hcr != null) { 343 try { 344 Object mc = getManagementObjectStrategy().getManagedObjectForCamelHealth(context, hcr); 345 // the context could have been removed already 346 if (getManagementStrategy().isManaged(mc)) { 347 unmanageObject(mc); 348 } 349 } catch (Exception e) { 350 LOG.warn("Could not unregister CamelHealth MBean", e); 351 } 352 } 353 354 camelContextMBean = null; 355 } 356 357 @Override 358 public void onComponentAdd(String name, Component component) { 359 // always register components as there are only a few of those 360 if (!initialized) { 361 // pre register so we can register later when we have been initialized 362 preServices.add(lf -> lf.onComponentAdd(name, component)); 363 return; 364 } 365 try { 366 Object mc = getManagementObjectStrategy().getManagedObjectForComponent(camelContext, component, name); 367 manageObject(mc); 368 } catch (Exception e) { 369 LOG.warn("Could not register Component MBean", e); 370 } 371 } 372 373 @Override 374 public void onComponentRemove(String name, Component component) { 375 // the agent hasn't been started 376 if (!initialized) { 377 return; 378 } 379 try { 380 Object mc = getManagementObjectStrategy().getManagedObjectForComponent(camelContext, component, name); 381 unmanageObject(mc); 382 } catch (Exception e) { 383 LOG.warn("Could not unregister Component MBean", e); 384 } 385 } 386 387 /** 388 * If the endpoint is an instance of ManagedResource then register it with the mbean server, if it is not then wrap 389 * the endpoint in a {@link ManagedEndpoint} and register that with the mbean server. 390 * 391 * @param endpoint the Endpoint attempted to be added 392 */ 393 @Override 394 public void onEndpointAdd(Endpoint endpoint) { 395 if (!initialized) { 396 // pre register so we can register later when we have been initialized 397 preServices.add(lf -> lf.onEndpointAdd(endpoint)); 398 return; 399 } 400 401 if (!shouldRegister(endpoint, null)) { 402 // avoid registering if not needed 403 return; 404 } 405 406 try { 407 Object me = getManagementObjectStrategy().getManagedObjectForEndpoint(camelContext, endpoint); 408 if (me == null) { 409 // endpoint should not be managed 410 return; 411 } 412 manageObject(me); 413 } catch (Exception e) { 414 LOG.warn("Could not register Endpoint MBean for endpoint: " + endpoint + ". This exception will be ignored.", e); 415 } 416 } 417 418 @Override 419 public void onEndpointRemove(Endpoint endpoint) { 420 // the agent hasn't been started 421 if (!initialized) { 422 return; 423 } 424 425 try { 426 Object me = getManagementObjectStrategy().getManagedObjectForEndpoint(camelContext, endpoint); 427 unmanageObject(me); 428 } catch (Exception e) { 429 LOG.warn("Could not unregister Endpoint MBean for endpoint: " + endpoint + ". This exception will be ignored.", e); 430 } 431 } 432 433 @Override 434 public void onServiceAdd(CamelContext context, Service service, Route route) { 435 if (!initialized) { 436 // pre register so we can register later when we have been initialized 437 preServices.add(lf -> lf.onServiceAdd(camelContext, service, route)); 438 return; 439 } 440 441 // services can by any kind of misc type but also processors 442 // so we have special logic when its a processor 443 444 if (!shouldRegister(service, route)) { 445 // avoid registering if not needed 446 return; 447 } 448 449 Object managedObject = getManagedObjectForService(context, service, route); 450 if (managedObject == null) { 451 // service should not be managed 452 return; 453 } 454 455 // skip already managed services, for example if a route has been restarted 456 if (getManagementStrategy().isManaged(managedObject)) { 457 LOG.trace("The service is already managed: {}", service); 458 return; 459 } 460 461 try { 462 manageObject(managedObject); 463 } catch (Exception e) { 464 LOG.warn("Could not register service: " + service + " as Service MBean.", e); 465 } 466 } 467 468 @Override 469 public void onServiceRemove(CamelContext context, Service service, Route route) { 470 // the agent hasn't been started 471 if (!initialized) { 472 return; 473 } 474 475 Object managedObject = getManagedObjectForService(context, service, route); 476 if (managedObject != null) { 477 try { 478 unmanageObject(managedObject); 479 } catch (Exception e) { 480 LOG.warn("Could not unregister service: " + service + " as Service MBean.", e); 481 } 482 } 483 } 484 485 @SuppressWarnings("unchecked") 486 private Object getManagedObjectForService(CamelContext context, Service service, Route route) { 487 // skip channel, UoW and dont double wrap instrumentation 488 if (service instanceof Channel || service instanceof UnitOfWork || service instanceof InstrumentationProcessor) { 489 return null; 490 } 491 492 // skip non managed services 493 if (service instanceof NonManagedService) { 494 return null; 495 } 496 497 Object answer = null; 498 499 if (service instanceof BacklogTracer) { 500 // special for backlog tracer 501 BacklogTracer backlogTracer = (BacklogTracer) service; 502 ManagedBacklogTracer mt = managedBacklogTracers.get(backlogTracer); 503 if (mt == null) { 504 mt = new ManagedBacklogTracer(context, backlogTracer); 505 mt.init(getManagementStrategy()); 506 managedBacklogTracers.put(backlogTracer, mt); 507 } 508 return mt; 509 } else if (service instanceof BacklogDebugger) { 510 // special for backlog debugger 511 BacklogDebugger backlogDebugger = (BacklogDebugger) service; 512 ManagedBacklogDebugger md = managedBacklogDebuggers.get(backlogDebugger); 513 if (md == null) { 514 md = new ManagedBacklogDebugger(context, backlogDebugger); 515 md.init(getManagementStrategy()); 516 managedBacklogDebuggers.put(backlogDebugger, md); 517 } 518 return md; 519 } else if (service instanceof Tracer) { 520 ManagedTracer mt = new ManagedTracer(camelContext, (Tracer) service); 521 mt.init(getManagementStrategy()); 522 answer = mt; 523 } else if (service instanceof DataFormat) { 524 answer = getManagementObjectStrategy().getManagedObjectForDataFormat(context, (DataFormat) service); 525 } else if (service instanceof Producer) { 526 answer = getManagementObjectStrategy().getManagedObjectForProducer(context, (Producer) service); 527 } else if (service instanceof Consumer) { 528 answer = getManagementObjectStrategy().getManagedObjectForConsumer(context, (Consumer) service); 529 } else if (service instanceof Processor) { 530 // special for processors as we need to do some extra work 531 return getManagedObjectForProcessor(context, (Processor) service, route); 532 } else if (service instanceof ThrottlingInflightRoutePolicy) { 533 answer = new ManagedThrottlingInflightRoutePolicy(context, (ThrottlingInflightRoutePolicy) service); 534 } else if (service instanceof ThrottlingExceptionRoutePolicy) { 535 answer = new ManagedThrottlingExceptionRoutePolicy(context, (ThrottlingExceptionRoutePolicy) service); 536 } else if (service instanceof ConsumerCache) { 537 answer = new ManagedConsumerCache(context, (ConsumerCache) service); 538 } else if (service instanceof ProducerCache) { 539 answer = new ManagedProducerCache(context, (ProducerCache) service); 540 } else if (service instanceof EndpointRegistry) { 541 answer = new ManagedEndpointRegistry(context, (EndpointRegistry) service); 542 } else if (service instanceof BeanIntrospection) { 543 answer = new ManagedBeanIntrospection(context, (BeanIntrospection) service); 544 } else if (service instanceof TypeConverterRegistry) { 545 answer = new ManagedTypeConverterRegistry(context, (TypeConverterRegistry) service); 546 } else if (service instanceof RestRegistry) { 547 answer = new ManagedRestRegistry(context, (RestRegistry) service); 548 } else if (service instanceof InflightRepository) { 549 answer = new ManagedInflightRepository(context, (InflightRepository) service); 550 } else if (service instanceof AsyncProcessorAwaitManager) { 551 answer = new ManagedAsyncProcessorAwaitManager(context, (AsyncProcessorAwaitManager) service); 552 } else if (service instanceof RuntimeEndpointRegistry) { 553 answer = new ManagedRuntimeEndpointRegistry(context, (RuntimeEndpointRegistry) service); 554 } else if (service instanceof StreamCachingStrategy) { 555 answer = new ManagedStreamCachingStrategy(context, (StreamCachingStrategy) service); 556 } else if (service instanceof EventNotifier) { 557 answer = getManagementObjectStrategy().getManagedObjectForEventNotifier(context, (EventNotifier) service); 558 } else if (service instanceof TransformerRegistry) { 559 answer = new ManagedTransformerRegistry(context, (TransformerRegistry) service); 560 } else if (service instanceof ValidatorRegistry) { 561 answer = new ManagedValidatorRegistry(context, (ValidatorRegistry) service); 562 } else if (service instanceof CamelClusterService) { 563 answer = getManagementObjectStrategy().getManagedObjectForClusterService(context, (CamelClusterService) service); 564 } else if (service != null) { 565 // fallback as generic service 566 answer = getManagementObjectStrategy().getManagedObjectForService(context, service); 567 } 568 569 if (answer instanceof ManagedService) { 570 ManagedService ms = (ManagedService) answer; 571 ms.setRoute(route); 572 ms.init(getManagementStrategy()); 573 } 574 575 return answer; 576 } 577 578 private Object getManagedObjectForProcessor(CamelContext context, Processor processor, Route route) { 579 // a bit of magic here as the processors we want to manage have already been registered 580 // in the wrapped processors map when Camel have instrumented the route on route initialization 581 // so the idea is now to only manage the processors from the map 582 KeyValueHolder<NamedNode, InstrumentationProcessor> holder = wrappedProcessors.get(processor); 583 if (holder == null) { 584 // skip as its not an well known processor we want to manage anyway, such as Channel/UnitOfWork/Pipeline etc. 585 return null; 586 } 587 588 // get the managed object as it can be a specialized type such as a Delayer/Throttler etc. 589 Object managedObject 590 = getManagementObjectStrategy().getManagedObjectForProcessor(context, processor, holder.getKey(), route); 591 // only manage if we have a name for it as otherwise we do not want to manage it anyway 592 if (managedObject != null) { 593 // is it a performance counter then we need to set our counter 594 if (managedObject instanceof PerformanceCounter) { 595 InstrumentationProcessor counter = holder.getValue(); 596 if (counter != null) { 597 // change counter to us 598 counter.setCounter(managedObject); 599 } 600 } 601 } 602 603 return managedObject; 604 } 605 606 @Override 607 public void onRoutesAdd(Collection<Route> routes) { 608 for (Route route : routes) { 609 610 // if we are starting CamelContext or either of the two options has been 611 // enabled, then enlist the route as a known route 612 if (getCamelContext().getStatus().isStarting() 613 || getManagementStrategy().getManagementAgent().getRegisterAlways() 614 || getManagementStrategy().getManagementAgent().getRegisterNewRoutes()) { 615 // register as known route id 616 knowRouteIds.add(route.getId()); 617 } 618 619 if (!shouldRegister(route, route)) { 620 // avoid registering if not needed, skip to next route 621 continue; 622 } 623 624 Object mr = getManagementObjectStrategy().getManagedObjectForRoute(camelContext, route); 625 626 // skip already managed routes, for example if the route has been restarted 627 if (getManagementStrategy().isManaged(mr)) { 628 LOG.trace("The route is already managed: {}", route); 629 continue; 630 } 631 632 // get the wrapped instrumentation processor from this route 633 // and set me as the counter 634 Processor processor = route.getProcessor(); 635 if (processor instanceof InternalProcessor && mr instanceof ManagedRoute) { 636 InternalProcessor internal = (InternalProcessor) processor; 637 ManagedRoute routeMBean = (ManagedRoute) mr; 638 639 DefaultInstrumentationProcessor task = internal.getAdvice(DefaultInstrumentationProcessor.class); 640 if (task != null) { 641 // we need to wrap the counter with the camel context so we get stats updated on the context as well 642 if (camelContextMBean != null) { 643 CompositePerformanceCounter wrapper = new CompositePerformanceCounter(routeMBean, camelContextMBean); 644 task.setCounter(wrapper); 645 } else { 646 task.setCounter(routeMBean); 647 } 648 } 649 } 650 651 try { 652 manageObject(mr); 653 } catch (JMException e) { 654 LOG.warn("Could not register Route MBean", e); 655 } catch (Exception e) { 656 LOG.warn("Could not create Route MBean", e); 657 } 658 } 659 } 660 661 @Override 662 public void onRoutesRemove(Collection<Route> routes) { 663 // the agent hasn't been started 664 if (!initialized) { 665 return; 666 } 667 668 for (Route route : routes) { 669 Object mr = getManagementObjectStrategy().getManagedObjectForRoute(camelContext, route); 670 671 // skip unmanaged routes 672 if (!getManagementStrategy().isManaged(mr)) { 673 LOG.trace("The route is not managed: {}", route); 674 continue; 675 } 676 677 try { 678 unmanageObject(mr); 679 } catch (Exception e) { 680 LOG.warn("Could not unregister Route MBean", e); 681 } 682 683 // remove from known routes ids, as the route has been removed 684 knowRouteIds.remove(route.getId()); 685 } 686 687 // after the routes has been removed, we should clear the wrapped processors as we no longer need them 688 // as they were just a provisional map used during creation of routes 689 removeWrappedProcessorsForRoutes(routes); 690 } 691 692 @Override 693 public void onThreadPoolAdd( 694 CamelContext camelContext, ThreadPoolExecutor threadPool, String id, 695 String sourceId, String routeId, String threadPoolProfileId) { 696 697 if (!initialized) { 698 // pre register so we can register later when we have been initialized 699 preServices.add(lf -> lf.onThreadPoolAdd(camelContext, threadPool, id, sourceId, routeId, threadPoolProfileId)); 700 return; 701 } 702 703 if (!shouldRegister(threadPool, null)) { 704 // avoid registering if not needed 705 return; 706 } 707 708 Object mtp = getManagementObjectStrategy().getManagedObjectForThreadPool(camelContext, threadPool, id, sourceId, 709 routeId, threadPoolProfileId); 710 711 // skip already managed services, for example if a route has been restarted 712 if (getManagementStrategy().isManaged(mtp)) { 713 LOG.trace("The thread pool is already managed: {}", threadPool); 714 return; 715 } 716 717 try { 718 manageObject(mtp); 719 // store a reference so we can unmanage from JMX when the thread pool is removed 720 // we need to keep track here, as we cannot re-construct the thread pool ObjectName when removing the thread pool 721 managedThreadPools.put(threadPool, mtp); 722 } catch (Exception e) { 723 LOG.warn("Could not register thread pool: " + threadPool + " as ThreadPool MBean.", e); 724 } 725 } 726 727 @Override 728 public void onThreadPoolRemove(CamelContext camelContext, ThreadPoolExecutor threadPool) { 729 if (!initialized) { 730 return; 731 } 732 733 // lookup the thread pool and remove it from JMX 734 Object mtp = managedThreadPools.remove(threadPool); 735 if (mtp != null) { 736 // skip unmanaged routes 737 if (!getManagementStrategy().isManaged(mtp)) { 738 LOG.trace("The thread pool is not managed: {}", threadPool); 739 return; 740 } 741 742 try { 743 unmanageObject(mtp); 744 } catch (Exception e) { 745 LOG.warn("Could not unregister ThreadPool MBean", e); 746 } 747 } 748 } 749 750 @Override 751 public void onRouteContextCreate(Route route) { 752 // Create a map (ProcessorType -> PerformanceCounter) 753 // to be passed to InstrumentationInterceptStrategy. 754 Map<NamedNode, PerformanceCounter> registeredCounters = new HashMap<>(); 755 756 // Each processor in a route will have its own performance counter. 757 // These performance counter will be embedded to InstrumentationProcessor 758 // and wrap the appropriate processor by InstrumentationInterceptStrategy. 759 RouteDefinition routeDefinition = (RouteDefinition) route.getRoute(); 760 761 // register performance counters for all processors and its children 762 for (ProcessorDefinition<?> processor : routeDefinition.getOutputs()) { 763 registerPerformanceCounters(route, processor, registeredCounters); 764 } 765 766 // set this managed intercept strategy that executes the JMX instrumentation for performance metrics 767 // so our registered counters can be used for fine grained performance instrumentation 768 route.setManagementInterceptStrategy(new InstrumentationInterceptStrategy(registeredCounters, wrappedProcessors)); 769 } 770 771 /** 772 * Removes the wrapped processors for the given routes, as they are no longer in use. 773 * <p/> 774 * This is needed to avoid accumulating memory, if a lot of routes is being added and removed. 775 * 776 * @param routes the routes 777 */ 778 private void removeWrappedProcessorsForRoutes(Collection<Route> routes) { 779 // loop the routes, and remove the route associated wrapped processors, as they are no longer in use 780 for (Route route : routes) { 781 String id = route.getId(); 782 783 Iterator<KeyValueHolder<NamedNode, InstrumentationProcessor>> it = wrappedProcessors.values().iterator(); 784 while (it.hasNext()) { 785 KeyValueHolder<NamedNode, InstrumentationProcessor> holder = it.next(); 786 RouteDefinition def = ProcessorDefinitionHelper.getRoute(holder.getKey()); 787 if (def != null && id.equals(def.getId())) { 788 it.remove(); 789 } 790 } 791 } 792 793 } 794 795 private void registerPerformanceCounters( 796 Route route, ProcessorDefinition<?> processor, 797 Map<NamedNode, PerformanceCounter> registeredCounters) { 798 799 // traverse children if any exists 800 List<ProcessorDefinition<?>> children = processor.getOutputs(); 801 for (ProcessorDefinition<?> child : children) { 802 registerPerformanceCounters(route, child, registeredCounters); 803 } 804 805 // skip processors that should not be registered 806 if (!registerProcessor(processor)) { 807 return; 808 } 809 810 // okay this is a processor we would like to manage so create the 811 // a delegate performance counter that acts as the placeholder in the interceptor 812 // that then delegates to the real mbean which we register later in the onServiceAdd method 813 DelegatePerformanceCounter pc = new DelegatePerformanceCounter(); 814 // set statistics enabled depending on the option 815 boolean enabled = camelContext.getManagementStrategy().getManagementAgent().getStatisticsLevel().isDefaultOrExtended(); 816 pc.setStatisticsEnabled(enabled); 817 818 // and add it as a a registered counter that will be used lazy when Camel 819 // does the instrumentation of the route and adds the InstrumentationProcessor 820 // that does the actual performance metrics gatherings at runtime 821 registeredCounters.put(processor, pc); 822 } 823 824 /** 825 * Should the given processor be registered. 826 */ 827 protected boolean registerProcessor(ProcessorDefinition<?> processor) { 828 // skip on exception 829 if (processor instanceof OnExceptionDefinition) { 830 return false; 831 } 832 // skip on completion 833 if (processor instanceof OnCompletionDefinition) { 834 return false; 835 } 836 // skip intercept 837 if (processor instanceof InterceptDefinition) { 838 return false; 839 } 840 // skip policy 841 if (processor instanceof PolicyDefinition) { 842 return false; 843 } 844 845 // only if custom id assigned 846 boolean only = getManagementStrategy().getManagementAgent().getOnlyRegisterProcessorWithCustomId() != null 847 && getManagementStrategy().getManagementAgent().getOnlyRegisterProcessorWithCustomId(); 848 if (only) { 849 return processor.hasCustomIdAssigned(); 850 } 851 852 // use customer filter 853 return getManagementStrategy().manageProcessor(processor); 854 } 855 856 private ManagementStrategy getManagementStrategy() { 857 ObjectHelper.notNull(camelContext, "CamelContext"); 858 return camelContext.getManagementStrategy(); 859 } 860 861 private ManagementObjectStrategy getManagementObjectStrategy() { 862 ObjectHelper.notNull(camelContext, "CamelContext"); 863 return camelContext.getManagementStrategy().getManagementObjectStrategy(); 864 } 865 866 /** 867 * Strategy for managing the object 868 * 869 * @param me the managed object 870 * @throws Exception is thrown if error registering the object for management 871 */ 872 protected void manageObject(Object me) throws Exception { 873 getManagementStrategy().manageObject(me); 874 if (me instanceof TimerListener) { 875 TimerListener timer = (TimerListener) me; 876 loadTimer.addTimerListener(timer); 877 } 878 } 879 880 /** 881 * Un-manages the object. 882 * 883 * @param me the managed object 884 * @throws Exception is thrown if error unregistering the managed object 885 */ 886 protected void unmanageObject(Object me) throws Exception { 887 if (me instanceof TimerListener) { 888 TimerListener timer = (TimerListener) me; 889 loadTimer.removeTimerListener(timer); 890 } 891 getManagementStrategy().unmanageObject(me); 892 } 893 894 /** 895 * Whether or not to register the mbean. 896 * <p/> 897 * The {@link ManagementAgent} has options which controls when to register. This allows us to only register mbeans 898 * accordingly. For example by default any dynamic endpoints is not registered. This avoids to register excessive 899 * mbeans, which most often is not desired. 900 * 901 * @param service the object to register 902 * @param route an optional route the mbean is associated with, can be <tt>null</tt> 903 * @return <tt>true</tt> to register, <tt>false</tt> to skip registering 904 */ 905 protected boolean shouldRegister(Object service, Route route) { 906 // the agent hasn't been started 907 if (!initialized) { 908 return false; 909 } 910 911 LOG.trace("Checking whether to register {} from route: {}", service, route); 912 913 ManagementAgent agent = getManagementStrategy().getManagementAgent(); 914 if (agent == null) { 915 // do not register if no agent 916 return false; 917 } 918 919 // always register if we are starting CamelContext 920 if (getCamelContext().getStatus().isStarting() 921 || getCamelContext().getStatus().isInitializing()) { 922 return true; 923 } 924 925 // always register if we are setting up routes 926 if (getCamelContext().adapt(ExtendedCamelContext.class).isSetupRoutes()) { 927 return true; 928 } 929 930 // register if always is enabled 931 if (agent.getRegisterAlways()) { 932 return true; 933 } 934 935 // is it a known route then always accept 936 if (route != null && knowRouteIds.contains(route.getId())) { 937 return true; 938 } 939 940 // only register if we are starting a new route, and current thread is in starting routes mode 941 if (agent.getRegisterNewRoutes()) { 942 // no specific route, then fallback to see if this thread is starting routes 943 // which is kept as state on the camel context 944 return getCamelContext().getRouteController().isStartingRoutes(); 945 } 946 947 return false; 948 } 949 950 @Override 951 protected void doStart() throws Exception { 952 ObjectHelper.notNull(camelContext, "CamelContext"); 953 954 // defer starting the timer manager until CamelContext has been fully started 955 camelContext.addStartupListener(loadTimerStartupListener); 956 } 957 958 private final class TimerListenerManagerStartupListener implements StartupListener { 959 960 @Override 961 public void onCamelContextStarted(CamelContext context, boolean alreadyStarted) throws Exception { 962 // we are disabled either if configured explicit, or if level is off 963 boolean load = camelContext.getManagementStrategy().getManagementAgent().getLoadStatisticsEnabled() != null 964 && camelContext.getManagementStrategy().getManagementAgent().getLoadStatisticsEnabled(); 965 boolean disabled = !load || camelContext.getManagementStrategy().getManagementAgent().getStatisticsLevel() 966 == ManagementStatisticsLevel.Off; 967 968 LOG.debug("Load performance statistics {}", disabled ? "disabled" : "enabled"); 969 if (!disabled) { 970 // must use 1 sec interval as the load statistics is based on 1 sec calculations 971 loadTimer.setInterval(1000); 972 // we have to defer enlisting timer lister manager as a service until CamelContext has been started 973 getCamelContext().addService(loadTimer); 974 } 975 } 976 } 977 978 @Override 979 protected void doStop() throws Exception { 980 initialized = false; 981 knowRouteIds.clear(); 982 preServices.clear(); 983 wrappedProcessors.clear(); 984 managedBacklogTracers.clear(); 985 managedBacklogDebuggers.clear(); 986 managedThreadPools.clear(); 987 } 988 989 /** 990 * Class which holds any pre registration details. 991 * 992 * @see JmxManagementLifecycleStrategy#enlistPreRegisteredServices() 993 */ 994 static final class PreRegisterService { 995 996 private String name; 997 private Component component; 998 private Endpoint endpoint; 999 private CamelContext camelContext; 1000 private Service service; 1001 private Route route; 1002 private java.util.function.Consumer<JmxManagementLifecycleStrategy> runnable; 1003 1004 public PreRegisterService() { 1005 } 1006 1007 public PreRegisterService(java.util.function.Consumer<JmxManagementLifecycleStrategy> runnable) { 1008 this.runnable = runnable; 1009 } 1010 1011 public void onComponentAdd(String name, Component component) { 1012 this.name = name; 1013 this.component = component; 1014 } 1015 1016 public void onEndpointAdd(Endpoint endpoint) { 1017 this.endpoint = endpoint; 1018 } 1019 1020 public void onServiceAdd(CamelContext camelContext, Service service, Route route) { 1021 this.camelContext = camelContext; 1022 this.service = service; 1023 this.route = route; 1024 } 1025 1026 public String getName() { 1027 return name; 1028 } 1029 1030 public Component getComponent() { 1031 return component; 1032 } 1033 1034 public Endpoint getEndpoint() { 1035 return endpoint; 1036 } 1037 1038 public CamelContext getCamelContext() { 1039 return camelContext; 1040 } 1041 1042 public Service getService() { 1043 return service; 1044 } 1045 1046 public Route getRoute() { 1047 return route; 1048 } 1049 1050 public java.util.function.Consumer<JmxManagementLifecycleStrategy> getRunnable() { 1051 return runnable; 1052 } 1053 1054 } 1055 1056}