001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.management; 018 019import java.util.ArrayList; 020import java.util.Collection; 021import java.util.HashMap; 022import java.util.HashSet; 023import java.util.Iterator; 024import java.util.List; 025import java.util.Map; 026import java.util.Set; 027import java.util.concurrent.ThreadPoolExecutor; 028 029import javax.management.JMException; 030import javax.management.MalformedObjectNameException; 031import javax.management.ObjectName; 032 033import org.apache.camel.CamelContext; 034import org.apache.camel.CamelContextAware; 035import org.apache.camel.Channel; 036import org.apache.camel.Component; 037import org.apache.camel.Consumer; 038import org.apache.camel.Endpoint; 039import org.apache.camel.ErrorHandlerFactory; 040import org.apache.camel.ManagementStatisticsLevel; 041import org.apache.camel.NonManagedService; 042import org.apache.camel.Processor; 043import org.apache.camel.Producer; 044import org.apache.camel.Route; 045import org.apache.camel.Service; 046import org.apache.camel.StartupListener; 047import org.apache.camel.TimerListener; 048import org.apache.camel.VetoCamelContextStartException; 049import org.apache.camel.api.management.PerformanceCounter; 050import org.apache.camel.cluster.CamelClusterService; 051import org.apache.camel.impl.ConsumerCache; 052import org.apache.camel.impl.DefaultCamelContext; 053import org.apache.camel.impl.DefaultEndpointRegistry; 054import org.apache.camel.impl.EventDrivenConsumerRoute; 055import org.apache.camel.impl.ProducerCache; 056import org.apache.camel.impl.ThrottlingExceptionRoutePolicy; 057import org.apache.camel.impl.ThrottlingInflightRoutePolicy; 058import org.apache.camel.management.mbean.ManagedAsyncProcessorAwaitManager; 059import org.apache.camel.management.mbean.ManagedBacklogDebugger; 060import org.apache.camel.management.mbean.ManagedBacklogTracer; 061import org.apache.camel.management.mbean.ManagedCamelContext; 062import org.apache.camel.management.mbean.ManagedConsumerCache; 063import org.apache.camel.management.mbean.ManagedEndpoint; 064import org.apache.camel.management.mbean.ManagedEndpointRegistry; 065import org.apache.camel.management.mbean.ManagedInflightRepository; 066import org.apache.camel.management.mbean.ManagedProducerCache; 067import org.apache.camel.management.mbean.ManagedRestRegistry; 068import org.apache.camel.management.mbean.ManagedRoute; 069import org.apache.camel.management.mbean.ManagedRuntimeCamelCatalog; 070import org.apache.camel.management.mbean.ManagedRuntimeEndpointRegistry; 071import org.apache.camel.management.mbean.ManagedService; 072import org.apache.camel.management.mbean.ManagedStreamCachingStrategy; 073import org.apache.camel.management.mbean.ManagedThrottlingExceptionRoutePolicy; 074import org.apache.camel.management.mbean.ManagedThrottlingInflightRoutePolicy; 075import org.apache.camel.management.mbean.ManagedTracer; 076import org.apache.camel.management.mbean.ManagedTransformerRegistry; 077import org.apache.camel.management.mbean.ManagedTypeConverterRegistry; 078import org.apache.camel.management.mbean.ManagedValidatorRegistry; 079import org.apache.camel.model.AOPDefinition; 080import org.apache.camel.model.InterceptDefinition; 081import org.apache.camel.model.OnCompletionDefinition; 082import org.apache.camel.model.OnExceptionDefinition; 083import org.apache.camel.model.PolicyDefinition; 084import org.apache.camel.model.ProcessorDefinition; 085import org.apache.camel.model.ProcessorDefinitionHelper; 086import org.apache.camel.model.RouteDefinition; 087import org.apache.camel.processor.CamelInternalProcessor; 088import org.apache.camel.processor.interceptor.BacklogDebugger; 089import org.apache.camel.processor.interceptor.BacklogTracer; 090import org.apache.camel.processor.interceptor.Tracer; 091import org.apache.camel.runtimecatalog.RuntimeCamelCatalog; 092import org.apache.camel.spi.AsyncProcessorAwaitManager; 093import org.apache.camel.spi.DataFormat; 094import org.apache.camel.spi.EventNotifier; 095import org.apache.camel.spi.InflightRepository; 096import org.apache.camel.spi.LifecycleStrategy; 097import org.apache.camel.spi.ManagementAgent; 098import org.apache.camel.spi.ManagementAware; 099import org.apache.camel.spi.ManagementNameStrategy; 100import org.apache.camel.spi.ManagementObjectStrategy; 101import org.apache.camel.spi.ManagementStrategy; 102import org.apache.camel.spi.RestRegistry; 103import org.apache.camel.spi.RouteContext; 104import org.apache.camel.spi.RuntimeEndpointRegistry; 105import org.apache.camel.spi.StreamCachingStrategy; 106import org.apache.camel.spi.TransformerRegistry; 107import org.apache.camel.spi.TypeConverterRegistry; 108import org.apache.camel.spi.UnitOfWork; 109import org.apache.camel.spi.ValidatorRegistry; 110import org.apache.camel.support.ServiceSupport; 111import org.apache.camel.support.TimerListenerManager; 112import org.apache.camel.util.KeyValueHolder; 113import org.apache.camel.util.ObjectHelper; 114import org.slf4j.Logger; 115import org.slf4j.LoggerFactory; 116 117/** 118 * Default JMX managed lifecycle strategy that registered objects using the configured 119 * {@link org.apache.camel.spi.ManagementStrategy}. 120 * 121 * @see org.apache.camel.spi.ManagementStrategy 122 * @version 123 */ 124@SuppressWarnings("deprecation") 125public class DefaultManagementLifecycleStrategy extends ServiceSupport implements LifecycleStrategy, CamelContextAware { 126 127 private static final Logger LOG = LoggerFactory.getLogger(DefaultManagementLifecycleStrategy.class); 128 // the wrapped processors is for performance counters, which are in use for the created routes 129 // when a route is removed, we should remove the associated processors from this map 130 private final Map<Processor, KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> wrappedProcessors = new HashMap<>(); 131 private final List<PreRegisterService> preServices = new ArrayList<>(); 132 private final TimerListenerManager loadTimer = new ManagedLoadTimer(); 133 private final TimerListenerManagerStartupListener loadTimerStartupListener = new TimerListenerManagerStartupListener(); 134 private volatile CamelContext camelContext; 135 private volatile ManagedCamelContext camelContextMBean; 136 private volatile boolean initialized; 137 private final Set<String> knowRouteIds = new HashSet<>(); 138 private final Map<Tracer, ManagedTracer> managedTracers = new HashMap<>(); 139 private final Map<BacklogTracer, ManagedBacklogTracer> managedBacklogTracers = new HashMap<>(); 140 private final Map<BacklogDebugger, ManagedBacklogDebugger> managedBacklogDebuggers = new HashMap<>(); 141 private final Map<ThreadPoolExecutor, Object> managedThreadPools = new HashMap<>(); 142 143 public DefaultManagementLifecycleStrategy() { 144 } 145 146 public DefaultManagementLifecycleStrategy(CamelContext camelContext) { 147 this.camelContext = camelContext; 148 } 149 150 public CamelContext getCamelContext() { 151 return camelContext; 152 } 153 154 public void setCamelContext(CamelContext camelContext) { 155 this.camelContext = camelContext; 156 } 157 158 public void onContextStart(CamelContext context) throws VetoCamelContextStartException { 159 Object mc = getManagementObjectStrategy().getManagedObjectForCamelContext(context); 160 161 String name = context.getName(); 162 String managementName = context.getManagementNameStrategy().getName(); 163 164 try { 165 boolean done = false; 166 while (!done) { 167 ObjectName on = getManagementStrategy().getManagementNamingStrategy().getObjectNameForCamelContext(managementName, name); 168 boolean exists = getManagementStrategy().isManaged(mc, on); 169 if (!exists) { 170 done = true; 171 } else { 172 // okay there exists already a CamelContext with this name, we can try to fix it by finding a free name 173 boolean fixed = false; 174 // if we use the default name strategy we can find a free name to use 175 String newName = findFreeName(mc, context.getManagementNameStrategy(), name); 176 if (newName != null) { 177 // use this as the fixed name 178 fixed = true; 179 done = true; 180 managementName = newName; 181 } 182 // we could not fix it so veto starting camel 183 if (!fixed) { 184 throw new VetoCamelContextStartException("CamelContext (" + context.getName() + ") with ObjectName[" + on + "] is already registered." 185 + " Make sure to use unique names on CamelContext when using multiple CamelContexts in the same MBeanServer.", context); 186 } else { 187 LOG.warn("This CamelContext(" + context.getName() + ") will be registered using the name: " + managementName 188 + " due to clash with an existing name already registered in MBeanServer."); 189 } 190 } 191 } 192 } catch (VetoCamelContextStartException e) { 193 // rethrow veto 194 throw e; 195 } catch (Exception e) { 196 // must rethrow to allow CamelContext fallback to non JMX agent to allow 197 // Camel to continue to run 198 throw ObjectHelper.wrapRuntimeCamelException(e); 199 } 200 201 // set the name we are going to use 202 if (context instanceof DefaultCamelContext) { 203 ((DefaultCamelContext) context).setManagementName(managementName); 204 } 205 206 try { 207 manageObject(mc); 208 } catch (Exception e) { 209 // must rethrow to allow CamelContext fallback to non JMX agent to allow 210 // Camel to continue to run 211 throw ObjectHelper.wrapRuntimeCamelException(e); 212 } 213 214 // yes we made it and are initialized 215 initialized = true; 216 217 if (mc instanceof ManagedCamelContext) { 218 camelContextMBean = (ManagedCamelContext) mc; 219 } 220 221 // register any pre registered now that we are initialized 222 enlistPreRegisteredServices(); 223 224 try { 225 Object me = getManagementObjectStrategy().getManagedObjectForCamelHealth(camelContext); 226 if (me == null) { 227 // endpoint should not be managed 228 return; 229 } 230 manageObject(me); 231 } catch (Exception e) { 232 LOG.warn("Could not register CamelHealth MBean. This exception will be ignored.", e); 233 } 234 235 try { 236 Object me = getManagementObjectStrategy().getManagedObjectForRouteController(camelContext); 237 if (me == null) { 238 // endpoint should not be managed 239 return; 240 } 241 manageObject(me); 242 } catch (Exception e) { 243 LOG.warn("Could not register RouteController MBean. This exception will be ignored.", e); 244 } 245 } 246 247 private String findFreeName(Object mc, ManagementNameStrategy strategy, String name) throws MalformedObjectNameException { 248 // we cannot find a free name for fixed named strategies 249 if (strategy.isFixedName()) { 250 return null; 251 } 252 253 // okay try to find a free name 254 boolean done = false; 255 String newName = null; 256 while (!done) { 257 // compute the next name 258 newName = strategy.getNextName(); 259 ObjectName on = getManagementStrategy().getManagementNamingStrategy().getObjectNameForCamelContext(newName, name); 260 done = !getManagementStrategy().isManaged(mc, on); 261 if (LOG.isTraceEnabled()) { 262 LOG.trace("Using name: {} in ObjectName[{}] exists? {}", name, on, done); 263 } 264 } 265 return newName; 266 } 267 268 /** 269 * After {@link CamelContext} has been enlisted in JMX using {@link #onContextStart(org.apache.camel.CamelContext)} 270 * then we can enlist any pre registered services as well, as we had to wait for {@link CamelContext} to be 271 * enlisted first. 272 * <p/> 273 * A component/endpoint/service etc. can be pre registered when using dependency injection and annotations such as 274 * {@link org.apache.camel.Produce}, {@link org.apache.camel.EndpointInject}. Therefore we need to capture those 275 * registrations up front, and then afterwards enlist in JMX when {@link CamelContext} is being started. 276 */ 277 private void enlistPreRegisteredServices() { 278 if (preServices.isEmpty()) { 279 return; 280 } 281 282 LOG.debug("Registering {} pre registered services", preServices.size()); 283 for (PreRegisterService pre : preServices) { 284 if (pre.getComponent() != null) { 285 onComponentAdd(pre.getName(), pre.getComponent()); 286 } else if (pre.getEndpoint() != null) { 287 onEndpointAdd(pre.getEndpoint()); 288 } else if (pre.getService() != null) { 289 onServiceAdd(pre.getCamelContext(), pre.getService(), pre.getRoute()); 290 } 291 } 292 293 // we are done so clear the list 294 preServices.clear(); 295 } 296 297 public void onContextStop(CamelContext context) { 298 // the agent hasn't been started 299 if (!initialized) { 300 return; 301 } 302 303 try { 304 Object mc = getManagementObjectStrategy().getManagedObjectForRouteController(context); 305 // the context could have been removed already 306 if (getManagementStrategy().isManaged(mc, null)) { 307 unmanageObject(mc); 308 } 309 } catch (Exception e) { 310 LOG.warn("Could not unregister RouteController MBean", e); 311 } 312 313 try { 314 Object mc = getManagementObjectStrategy().getManagedObjectForCamelHealth(context); 315 // the context could have been removed already 316 if (getManagementStrategy().isManaged(mc, null)) { 317 unmanageObject(mc); 318 } 319 } catch (Exception e) { 320 LOG.warn("Could not unregister CamelHealth MBean", e); 321 } 322 323 try { 324 Object mc = getManagementObjectStrategy().getManagedObjectForCamelContext(context); 325 // the context could have been removed already 326 if (getManagementStrategy().isManaged(mc, null)) { 327 unmanageObject(mc); 328 } 329 } catch (Exception e) { 330 LOG.warn("Could not unregister CamelContext MBean", e); 331 } 332 333 camelContextMBean = null; 334 } 335 336 public void onComponentAdd(String name, Component component) { 337 // always register components as there are only a few of those 338 if (!initialized) { 339 // pre register so we can register later when we have been initialized 340 PreRegisterService pre = new PreRegisterService(); 341 pre.onComponentAdd(name, component); 342 preServices.add(pre); 343 return; 344 } 345 try { 346 Object mc = getManagementObjectStrategy().getManagedObjectForComponent(camelContext, component, name); 347 manageObject(mc); 348 } catch (Exception e) { 349 LOG.warn("Could not register Component MBean", e); 350 } 351 } 352 353 public void onComponentRemove(String name, Component component) { 354 // the agent hasn't been started 355 if (!initialized) { 356 return; 357 } 358 try { 359 Object mc = getManagementObjectStrategy().getManagedObjectForComponent(camelContext, component, name); 360 unmanageObject(mc); 361 } catch (Exception e) { 362 LOG.warn("Could not unregister Component MBean", e); 363 } 364 } 365 366 /** 367 * If the endpoint is an instance of ManagedResource then register it with the 368 * mbean server, if it is not then wrap the endpoint in a {@link ManagedEndpoint} and 369 * register that with the mbean server. 370 * 371 * @param endpoint the Endpoint attempted to be added 372 */ 373 public void onEndpointAdd(Endpoint endpoint) { 374 if (!initialized) { 375 // pre register so we can register later when we have been initialized 376 PreRegisterService pre = new PreRegisterService(); 377 pre.onEndpointAdd(endpoint); 378 preServices.add(pre); 379 return; 380 } 381 382 if (!shouldRegister(endpoint, null)) { 383 // avoid registering if not needed 384 return; 385 } 386 387 try { 388 Object me = getManagementObjectStrategy().getManagedObjectForEndpoint(camelContext, endpoint); 389 if (me == null) { 390 // endpoint should not be managed 391 return; 392 } 393 manageObject(me); 394 } catch (Exception e) { 395 LOG.warn("Could not register Endpoint MBean for endpoint: " + endpoint + ". This exception will be ignored.", e); 396 } 397 } 398 399 public void onEndpointRemove(Endpoint endpoint) { 400 // the agent hasn't been started 401 if (!initialized) { 402 return; 403 } 404 405 try { 406 Object me = getManagementObjectStrategy().getManagedObjectForEndpoint(camelContext, endpoint); 407 unmanageObject(me); 408 } catch (Exception e) { 409 LOG.warn("Could not unregister Endpoint MBean for endpoint: " + endpoint + ". This exception will be ignored.", e); 410 } 411 } 412 413 public void onServiceAdd(CamelContext context, Service service, Route route) { 414 if (!initialized) { 415 // pre register so we can register later when we have been initialized 416 PreRegisterService pre = new PreRegisterService(); 417 pre.onServiceAdd(context, service, route); 418 preServices.add(pre); 419 return; 420 } 421 422 // services can by any kind of misc type but also processors 423 // so we have special logic when its a processor 424 425 if (!shouldRegister(service, route)) { 426 // avoid registering if not needed 427 return; 428 } 429 430 Object managedObject = getManagedObjectForService(context, service, route); 431 if (managedObject == null) { 432 // service should not be managed 433 return; 434 } 435 436 // skip already managed services, for example if a route has been restarted 437 if (getManagementStrategy().isManaged(managedObject, null)) { 438 LOG.trace("The service is already managed: {}", service); 439 return; 440 } 441 442 try { 443 manageObject(managedObject); 444 } catch (Exception e) { 445 LOG.warn("Could not register service: " + service + " as Service MBean.", e); 446 } 447 } 448 449 public void onServiceRemove(CamelContext context, Service service, Route route) { 450 // the agent hasn't been started 451 if (!initialized) { 452 return; 453 } 454 455 Object managedObject = getManagedObjectForService(context, service, route); 456 if (managedObject != null) { 457 try { 458 unmanageObject(managedObject); 459 } catch (Exception e) { 460 LOG.warn("Could not unregister service: " + service + " as Service MBean.", e); 461 } 462 } 463 } 464 465 @SuppressWarnings("unchecked") 466 private Object getManagedObjectForService(CamelContext context, Service service, Route route) { 467 // skip channel, UoW and dont double wrap instrumentation 468 if (service instanceof Channel || service instanceof UnitOfWork || service instanceof InstrumentationProcessor) { 469 return null; 470 } 471 472 // skip non managed services 473 if (service instanceof NonManagedService) { 474 return null; 475 } 476 477 Object answer = null; 478 479 if (service instanceof ManagementAware) { 480 return ((ManagementAware<Service>) service).getManagedObject(service); 481 } else if (service instanceof Tracer) { 482 // special for tracer 483 Tracer tracer = (Tracer) service; 484 ManagedTracer mt = managedTracers.get(tracer); 485 if (mt == null) { 486 mt = new ManagedTracer(context, tracer); 487 mt.init(getManagementStrategy()); 488 managedTracers.put(tracer, mt); 489 } 490 return mt; 491 } else if (service instanceof BacklogTracer) { 492 // special for backlog tracer 493 BacklogTracer backlogTracer = (BacklogTracer) service; 494 ManagedBacklogTracer mt = managedBacklogTracers.get(backlogTracer); 495 if (mt == null) { 496 mt = new ManagedBacklogTracer(context, backlogTracer); 497 mt.init(getManagementStrategy()); 498 managedBacklogTracers.put(backlogTracer, mt); 499 } 500 return mt; 501 } else if (service instanceof BacklogDebugger) { 502 // special for backlog debugger 503 BacklogDebugger backlogDebugger = (BacklogDebugger) service; 504 ManagedBacklogDebugger md = managedBacklogDebuggers.get(backlogDebugger); 505 if (md == null) { 506 md = new ManagedBacklogDebugger(context, backlogDebugger); 507 md.init(getManagementStrategy()); 508 managedBacklogDebuggers.put(backlogDebugger, md); 509 } 510 return md; 511 } else if (service instanceof DataFormat) { 512 answer = getManagementObjectStrategy().getManagedObjectForDataFormat(context, (DataFormat) service); 513 } else if (service instanceof Producer) { 514 answer = getManagementObjectStrategy().getManagedObjectForProducer(context, (Producer) service); 515 } else if (service instanceof Consumer) { 516 answer = getManagementObjectStrategy().getManagedObjectForConsumer(context, (Consumer) service); 517 } else if (service instanceof Processor) { 518 // special for processors as we need to do some extra work 519 return getManagedObjectForProcessor(context, (Processor) service, route); 520 } else if (service instanceof ThrottlingInflightRoutePolicy) { 521 answer = new ManagedThrottlingInflightRoutePolicy(context, (ThrottlingInflightRoutePolicy) service); 522 } else if (service instanceof ThrottlingExceptionRoutePolicy) { 523 answer = new ManagedThrottlingExceptionRoutePolicy(context, (ThrottlingExceptionRoutePolicy) service); 524 } else if (service instanceof ConsumerCache) { 525 answer = new ManagedConsumerCache(context, (ConsumerCache) service); 526 } else if (service instanceof ProducerCache) { 527 answer = new ManagedProducerCache(context, (ProducerCache) service); 528 } else if (service instanceof DefaultEndpointRegistry) { 529 answer = new ManagedEndpointRegistry(context, (DefaultEndpointRegistry) service); 530 } else if (service instanceof TypeConverterRegistry) { 531 answer = new ManagedTypeConverterRegistry(context, (TypeConverterRegistry) service); 532 } else if (service instanceof RestRegistry) { 533 answer = new ManagedRestRegistry(context, (RestRegistry) service); 534 } else if (service instanceof InflightRepository) { 535 answer = new ManagedInflightRepository(context, (InflightRepository) service); 536 } else if (service instanceof AsyncProcessorAwaitManager) { 537 answer = new ManagedAsyncProcessorAwaitManager(context, (AsyncProcessorAwaitManager) service); 538 } else if (service instanceof RuntimeEndpointRegistry) { 539 answer = new ManagedRuntimeEndpointRegistry(context, (RuntimeEndpointRegistry) service); 540 } else if (service instanceof StreamCachingStrategy) { 541 answer = new ManagedStreamCachingStrategy(context, (StreamCachingStrategy) service); 542 } else if (service instanceof EventNotifier) { 543 answer = getManagementObjectStrategy().getManagedObjectForEventNotifier(context, (EventNotifier) service); 544 } else if (service instanceof TransformerRegistry) { 545 answer = new ManagedTransformerRegistry(context, (TransformerRegistry)service); 546 } else if (service instanceof ValidatorRegistry) { 547 answer = new ManagedValidatorRegistry(context, (ValidatorRegistry)service); 548 } else if (service instanceof RuntimeCamelCatalog) { 549 answer = new ManagedRuntimeCamelCatalog(context, (RuntimeCamelCatalog) service); 550 } else if (service instanceof CamelClusterService) { 551 answer = getManagementObjectStrategy().getManagedObjectForClusterService(context, (CamelClusterService)service); 552 } else if (service != null) { 553 // fallback as generic service 554 answer = getManagementObjectStrategy().getManagedObjectForService(context, service); 555 } 556 557 if (answer instanceof ManagedService) { 558 ManagedService ms = (ManagedService) answer; 559 ms.setRoute(route); 560 ms.init(getManagementStrategy()); 561 } 562 563 return answer; 564 } 565 566 private Object getManagedObjectForProcessor(CamelContext context, Processor processor, Route route) { 567 // a bit of magic here as the processors we want to manage have already been registered 568 // in the wrapped processors map when Camel have instrumented the route on route initialization 569 // so the idea is now to only manage the processors from the map 570 KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = wrappedProcessors.get(processor); 571 if (holder == null) { 572 // skip as its not an well known processor we want to manage anyway, such as Channel/UnitOfWork/Pipeline etc. 573 return null; 574 } 575 576 // get the managed object as it can be a specialized type such as a Delayer/Throttler etc. 577 Object managedObject = getManagementObjectStrategy().getManagedObjectForProcessor(context, processor, holder.getKey(), route); 578 // only manage if we have a name for it as otherwise we do not want to manage it anyway 579 if (managedObject != null) { 580 // is it a performance counter then we need to set our counter 581 if (managedObject instanceof PerformanceCounter) { 582 InstrumentationProcessor counter = holder.getValue(); 583 if (counter != null) { 584 // change counter to us 585 counter.setCounter(managedObject); 586 } 587 } 588 } 589 590 return managedObject; 591 } 592 593 public void onRoutesAdd(Collection<Route> routes) { 594 for (Route route : routes) { 595 596 // if we are starting CamelContext or either of the two options has been 597 // enabled, then enlist the route as a known route 598 if (getCamelContext().getStatus().isStarting() 599 || getManagementStrategy().getManagementAgent().getRegisterAlways() 600 || getManagementStrategy().getManagementAgent().getRegisterNewRoutes()) { 601 // register as known route id 602 knowRouteIds.add(route.getId()); 603 } 604 605 if (!shouldRegister(route, route)) { 606 // avoid registering if not needed, skip to next route 607 continue; 608 } 609 610 Object mr = getManagementObjectStrategy().getManagedObjectForRoute(camelContext, route); 611 612 // skip already managed routes, for example if the route has been restarted 613 if (getManagementStrategy().isManaged(mr, null)) { 614 LOG.trace("The route is already managed: {}", route); 615 continue; 616 } 617 618 // get the wrapped instrumentation processor from this route 619 // and set me as the counter 620 if (route instanceof EventDrivenConsumerRoute) { 621 EventDrivenConsumerRoute edcr = (EventDrivenConsumerRoute) route; 622 Processor processor = edcr.getProcessor(); 623 if (processor instanceof CamelInternalProcessor && mr instanceof ManagedRoute) { 624 CamelInternalProcessor internal = (CamelInternalProcessor) processor; 625 ManagedRoute routeMBean = (ManagedRoute) mr; 626 627 CamelInternalProcessor.InstrumentationAdvice task = internal.getAdvice(CamelInternalProcessor.InstrumentationAdvice.class); 628 if (task != null) { 629 // we need to wrap the counter with the camel context so we get stats updated on the context as well 630 if (camelContextMBean != null) { 631 CompositePerformanceCounter wrapper = new CompositePerformanceCounter(routeMBean, camelContextMBean); 632 task.setCounter(wrapper); 633 } else { 634 task.setCounter(routeMBean); 635 } 636 } 637 } 638 } 639 640 try { 641 manageObject(mr); 642 } catch (JMException e) { 643 LOG.warn("Could not register Route MBean", e); 644 } catch (Exception e) { 645 LOG.warn("Could not create Route MBean", e); 646 } 647 } 648 } 649 650 public void onRoutesRemove(Collection<Route> routes) { 651 // the agent hasn't been started 652 if (!initialized) { 653 return; 654 } 655 656 for (Route route : routes) { 657 Object mr = getManagementObjectStrategy().getManagedObjectForRoute(camelContext, route); 658 659 // skip unmanaged routes 660 if (!getManagementStrategy().isManaged(mr, null)) { 661 LOG.trace("The route is not managed: {}", route); 662 continue; 663 } 664 665 try { 666 unmanageObject(mr); 667 } catch (Exception e) { 668 LOG.warn("Could not unregister Route MBean", e); 669 } 670 671 // remove from known routes ids, as the route has been removed 672 knowRouteIds.remove(route.getId()); 673 } 674 675 // after the routes has been removed, we should clear the wrapped processors as we no longer need them 676 // as they were just a provisional map used during creation of routes 677 removeWrappedProcessorsForRoutes(routes); 678 } 679 680 public void onErrorHandlerAdd(RouteContext routeContext, Processor errorHandler, ErrorHandlerFactory errorHandlerBuilder) { 681 if (!shouldRegister(errorHandler, null)) { 682 // avoid registering if not needed 683 return; 684 } 685 686 Object me = getManagementObjectStrategy().getManagedObjectForErrorHandler(camelContext, routeContext, errorHandler, errorHandlerBuilder); 687 688 // skip already managed services, for example if a route has been restarted 689 if (getManagementStrategy().isManaged(me, null)) { 690 LOG.trace("The error handler builder is already managed: {}", errorHandlerBuilder); 691 return; 692 } 693 694 try { 695 manageObject(me); 696 } catch (Exception e) { 697 LOG.warn("Could not register error handler builder: " + errorHandlerBuilder + " as ErrorHandler MBean.", e); 698 } 699 } 700 701 public void onErrorHandlerRemove(RouteContext routeContext, Processor errorHandler, ErrorHandlerFactory errorHandlerBuilder) { 702 if (!initialized) { 703 return; 704 } 705 706 Object me = getManagementObjectStrategy().getManagedObjectForErrorHandler(camelContext, routeContext, errorHandler, errorHandlerBuilder); 707 if (me != null) { 708 try { 709 unmanageObject(me); 710 } catch (Exception e) { 711 LOG.warn("Could not unregister error handler: " + me + " as ErrorHandler MBean.", e); 712 } 713 } 714 } 715 716 public void onThreadPoolAdd(CamelContext camelContext, ThreadPoolExecutor threadPool, String id, 717 String sourceId, String routeId, String threadPoolProfileId) { 718 719 if (!shouldRegister(threadPool, null)) { 720 // avoid registering if not needed 721 return; 722 } 723 724 Object mtp = getManagementObjectStrategy().getManagedObjectForThreadPool(camelContext, threadPool, id, sourceId, routeId, threadPoolProfileId); 725 726 // skip already managed services, for example if a route has been restarted 727 if (getManagementStrategy().isManaged(mtp, null)) { 728 LOG.trace("The thread pool is already managed: {}", threadPool); 729 return; 730 } 731 732 try { 733 manageObject(mtp); 734 // store a reference so we can unmanage from JMX when the thread pool is removed 735 // we need to keep track here, as we cannot re-construct the thread pool ObjectName when removing the thread pool 736 managedThreadPools.put(threadPool, mtp); 737 } catch (Exception e) { 738 LOG.warn("Could not register thread pool: " + threadPool + " as ThreadPool MBean.", e); 739 } 740 } 741 742 public void onThreadPoolRemove(CamelContext camelContext, ThreadPoolExecutor threadPool) { 743 if (!initialized) { 744 return; 745 } 746 747 // lookup the thread pool and remove it from JMX 748 Object mtp = managedThreadPools.remove(threadPool); 749 if (mtp != null) { 750 // skip unmanaged routes 751 if (!getManagementStrategy().isManaged(mtp, null)) { 752 LOG.trace("The thread pool is not managed: {}", threadPool); 753 return; 754 } 755 756 try { 757 unmanageObject(mtp); 758 } catch (Exception e) { 759 LOG.warn("Could not unregister ThreadPool MBean", e); 760 } 761 } 762 } 763 764 public void onRouteContextCreate(RouteContext routeContext) { 765 if (!initialized) { 766 return; 767 } 768 769 // Create a map (ProcessorType -> PerformanceCounter) 770 // to be passed to InstrumentationInterceptStrategy. 771 Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters = new HashMap<>(); 772 773 // Each processor in a route will have its own performance counter. 774 // These performance counter will be embedded to InstrumentationProcessor 775 // and wrap the appropriate processor by InstrumentationInterceptStrategy. 776 RouteDefinition route = routeContext.getRoute(); 777 778 // register performance counters for all processors and its children 779 for (ProcessorDefinition<?> processor : route.getOutputs()) { 780 registerPerformanceCounters(routeContext, processor, registeredCounters); 781 } 782 783 // set this managed intercept strategy that executes the JMX instrumentation for performance metrics 784 // so our registered counters can be used for fine grained performance instrumentation 785 routeContext.setManagedInterceptStrategy(new InstrumentationInterceptStrategy(registeredCounters, wrappedProcessors)); 786 } 787 788 /** 789 * Removes the wrapped processors for the given routes, as they are no longer in use. 790 * <p/> 791 * This is needed to avoid accumulating memory, if a lot of routes is being added and removed. 792 * 793 * @param routes the routes 794 */ 795 private void removeWrappedProcessorsForRoutes(Collection<Route> routes) { 796 // loop the routes, and remove the route associated wrapped processors, as they are no longer in use 797 for (Route route : routes) { 798 String id = route.getId(); 799 800 Iterator<KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> it = wrappedProcessors.values().iterator(); 801 while (it.hasNext()) { 802 KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = it.next(); 803 RouteDefinition def = ProcessorDefinitionHelper.getRoute(holder.getKey()); 804 if (def != null && id.equals(def.getId())) { 805 it.remove(); 806 } 807 } 808 } 809 810 } 811 812 private void registerPerformanceCounters(RouteContext routeContext, ProcessorDefinition<?> processor, 813 Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters) { 814 815 // traverse children if any exists 816 List<ProcessorDefinition<?>> children = processor.getOutputs(); 817 for (ProcessorDefinition<?> child : children) { 818 registerPerformanceCounters(routeContext, child, registeredCounters); 819 } 820 821 // skip processors that should not be registered 822 if (!registerProcessor(processor)) { 823 return; 824 } 825 826 // okay this is a processor we would like to manage so create the 827 // a delegate performance counter that acts as the placeholder in the interceptor 828 // that then delegates to the real mbean which we register later in the onServiceAdd method 829 DelegatePerformanceCounter pc = new DelegatePerformanceCounter(); 830 // set statistics enabled depending on the option 831 boolean enabled = camelContext.getManagementStrategy().getManagementAgent().getStatisticsLevel().isDefaultOrExtended(); 832 pc.setStatisticsEnabled(enabled); 833 834 // and add it as a a registered counter that will be used lazy when Camel 835 // does the instrumentation of the route and adds the InstrumentationProcessor 836 // that does the actual performance metrics gatherings at runtime 837 registeredCounters.put(processor, pc); 838 } 839 840 /** 841 * Should the given processor be registered. 842 */ 843 protected boolean registerProcessor(ProcessorDefinition<?> processor) { 844 // skip on exception 845 if (processor instanceof OnExceptionDefinition) { 846 return false; 847 } 848 // skip on completion 849 if (processor instanceof OnCompletionDefinition) { 850 return false; 851 } 852 // skip intercept 853 if (processor instanceof InterceptDefinition) { 854 return false; 855 } 856 // skip aop 857 if (processor instanceof AOPDefinition) { 858 return false; 859 } 860 // skip policy 861 if (processor instanceof PolicyDefinition) { 862 return false; 863 } 864 865 // only if custom id assigned 866 boolean only = getManagementStrategy().getManagementAgent().getOnlyRegisterProcessorWithCustomId() != null 867 && getManagementStrategy().getManagementAgent().getOnlyRegisterProcessorWithCustomId(); 868 if (only) { 869 return processor.hasCustomIdAssigned(); 870 } 871 872 // use customer filter 873 return getManagementStrategy().manageProcessor(processor); 874 } 875 876 private ManagementStrategy getManagementStrategy() { 877 ObjectHelper.notNull(camelContext, "CamelContext"); 878 return camelContext.getManagementStrategy(); 879 } 880 881 private ManagementObjectStrategy getManagementObjectStrategy() { 882 ObjectHelper.notNull(camelContext, "CamelContext"); 883 return camelContext.getManagementStrategy().getManagementObjectStrategy(); 884 } 885 886 /** 887 * Strategy for managing the object 888 * 889 * @param me the managed object 890 * @throws Exception is thrown if error registering the object for management 891 */ 892 protected void manageObject(Object me) throws Exception { 893 getManagementStrategy().manageObject(me); 894 if (me instanceof TimerListener) { 895 TimerListener timer = (TimerListener) me; 896 loadTimer.addTimerListener(timer); 897 } 898 } 899 900 /** 901 * Un-manages the object. 902 * 903 * @param me the managed object 904 * @throws Exception is thrown if error unregistering the managed object 905 */ 906 protected void unmanageObject(Object me) throws Exception { 907 if (me instanceof TimerListener) { 908 TimerListener timer = (TimerListener) me; 909 loadTimer.removeTimerListener(timer); 910 } 911 getManagementStrategy().unmanageObject(me); 912 } 913 914 /** 915 * Whether or not to register the mbean. 916 * <p/> 917 * The {@link ManagementAgent} has options which controls when to register. 918 * This allows us to only register mbeans accordingly. For example by default any 919 * dynamic endpoints is not registered. This avoids to register excessive mbeans, which 920 * most often is not desired. 921 * 922 * @param service the object to register 923 * @param route an optional route the mbean is associated with, can be <tt>null</tt> 924 * @return <tt>true</tt> to register, <tt>false</tt> to skip registering 925 */ 926 protected boolean shouldRegister(Object service, Route route) { 927 // the agent hasn't been started 928 if (!initialized) { 929 return false; 930 } 931 932 LOG.trace("Checking whether to register {} from route: {}", service, route); 933 934 ManagementAgent agent = getManagementStrategy().getManagementAgent(); 935 if (agent == null) { 936 // do not register if no agent 937 return false; 938 } 939 940 // always register if we are starting CamelContext 941 if (getCamelContext().getStatus().isStarting()) { 942 return true; 943 } 944 945 // always register if we are setting up routes 946 if (getCamelContext().isSetupRoutes()) { 947 return true; 948 } 949 950 // register if always is enabled 951 if (agent.getRegisterAlways()) { 952 return true; 953 } 954 955 // is it a known route then always accept 956 if (route != null && knowRouteIds.contains(route.getId())) { 957 return true; 958 } 959 960 // only register if we are starting a new route, and current thread is in starting routes mode 961 if (agent.getRegisterNewRoutes()) { 962 // no specific route, then fallback to see if this thread is starting routes 963 // which is kept as state on the camel context 964 return getCamelContext().isStartingRoutes(); 965 } 966 967 return false; 968 } 969 970 @Override 971 protected void doStart() throws Exception { 972 ObjectHelper.notNull(camelContext, "CamelContext"); 973 974 // defer starting the timer manager until CamelContext has been fully started 975 camelContext.addStartupListener(loadTimerStartupListener); 976 } 977 978 private final class TimerListenerManagerStartupListener implements StartupListener { 979 980 @Override 981 public void onCamelContextStarted(CamelContext context, boolean alreadyStarted) throws Exception { 982 // we are disabled either if configured explicit, or if level is off 983 boolean load = camelContext.getManagementStrategy().getManagementAgent().getLoadStatisticsEnabled() != null 984 && camelContext.getManagementStrategy().getManagementAgent().getLoadStatisticsEnabled(); 985 boolean disabled = !load || camelContext.getManagementStrategy().getStatisticsLevel() == ManagementStatisticsLevel.Off; 986 987 LOG.debug("Load performance statistics {}", disabled ? "disabled" : "enabled"); 988 if (!disabled) { 989 // must use 1 sec interval as the load statistics is based on 1 sec calculations 990 loadTimer.setInterval(1000); 991 // we have to defer enlisting timer lister manager as a service until CamelContext has been started 992 getCamelContext().addService(loadTimer); 993 } 994 } 995 } 996 997 @Override 998 protected void doStop() throws Exception { 999 initialized = false; 1000 knowRouteIds.clear(); 1001 preServices.clear(); 1002 wrappedProcessors.clear(); 1003 managedTracers.clear(); 1004 managedBacklogTracers.clear(); 1005 managedBacklogDebuggers.clear(); 1006 managedThreadPools.clear(); 1007 } 1008 1009 /** 1010 * Class which holds any pre registration details. 1011 * 1012 * @see org.apache.camel.management.DefaultManagementLifecycleStrategy#enlistPreRegisteredServices() 1013 */ 1014 private static final class PreRegisterService { 1015 1016 private String name; 1017 private Component component; 1018 private Endpoint endpoint; 1019 private CamelContext camelContext; 1020 private Service service; 1021 private Route route; 1022 1023 public void onComponentAdd(String name, Component component) { 1024 this.name = name; 1025 this.component = component; 1026 } 1027 1028 public void onEndpointAdd(Endpoint endpoint) { 1029 this.endpoint = endpoint; 1030 } 1031 1032 public void onServiceAdd(CamelContext camelContext, Service service, Route route) { 1033 this.camelContext = camelContext; 1034 this.service = service; 1035 this.route = route; 1036 } 1037 1038 public String getName() { 1039 return name; 1040 } 1041 1042 public Component getComponent() { 1043 return component; 1044 } 1045 1046 public Endpoint getEndpoint() { 1047 return endpoint; 1048 } 1049 1050 public CamelContext getCamelContext() { 1051 return camelContext; 1052 } 1053 1054 public Service getService() { 1055 return service; 1056 } 1057 1058 public Route getRoute() { 1059 return route; 1060 } 1061 } 1062 1063} 1064