001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.management; 018 019import java.util.ArrayList; 020import java.util.Collection; 021import java.util.HashMap; 022import java.util.HashSet; 023import java.util.Iterator; 024import java.util.List; 025import java.util.Map; 026import java.util.Set; 027import java.util.concurrent.ThreadPoolExecutor; 028import javax.management.JMException; 029import javax.management.MalformedObjectNameException; 030import javax.management.ObjectName; 031 032import org.apache.camel.CamelContext; 033import org.apache.camel.CamelContextAware; 034import org.apache.camel.Channel; 035import org.apache.camel.Component; 036import org.apache.camel.Consumer; 037import org.apache.camel.Endpoint; 038import org.apache.camel.ErrorHandlerFactory; 039import org.apache.camel.ManagementStatisticsLevel; 040import org.apache.camel.NonManagedService; 041import org.apache.camel.Processor; 042import org.apache.camel.Producer; 043import org.apache.camel.Route; 044import org.apache.camel.Service; 045import org.apache.camel.StartupListener; 046import org.apache.camel.TimerListener; 047import org.apache.camel.VetoCamelContextStartException; 048import org.apache.camel.api.management.PerformanceCounter; 049import org.apache.camel.impl.ConsumerCache; 050import org.apache.camel.impl.DefaultCamelContext; 051import org.apache.camel.impl.DefaultEndpointRegistry; 052import org.apache.camel.impl.EventDrivenConsumerRoute; 053import org.apache.camel.impl.ProducerCache; 054import org.apache.camel.impl.ThrottlingInflightRoutePolicy; 055import org.apache.camel.management.mbean.ManagedAsyncProcessorAwaitManager; 056import org.apache.camel.management.mbean.ManagedBacklogDebugger; 057import org.apache.camel.management.mbean.ManagedBacklogTracer; 058import org.apache.camel.management.mbean.ManagedCamelContext; 059import org.apache.camel.management.mbean.ManagedConsumerCache; 060import org.apache.camel.management.mbean.ManagedEndpoint; 061import org.apache.camel.management.mbean.ManagedEndpointRegistry; 062import org.apache.camel.management.mbean.ManagedInflightRepository; 063import org.apache.camel.management.mbean.ManagedProducerCache; 064import org.apache.camel.management.mbean.ManagedRestRegistry; 065import org.apache.camel.management.mbean.ManagedRoute; 066import org.apache.camel.management.mbean.ManagedRuntimeEndpointRegistry; 067import org.apache.camel.management.mbean.ManagedService; 068import org.apache.camel.management.mbean.ManagedStreamCachingStrategy; 069import org.apache.camel.management.mbean.ManagedThrottlingInflightRoutePolicy; 070import org.apache.camel.management.mbean.ManagedTracer; 071import org.apache.camel.management.mbean.ManagedTypeConverterRegistry; 072import org.apache.camel.model.AOPDefinition; 073import org.apache.camel.model.InterceptDefinition; 074import org.apache.camel.model.OnCompletionDefinition; 075import org.apache.camel.model.OnExceptionDefinition; 076import org.apache.camel.model.PolicyDefinition; 077import org.apache.camel.model.ProcessorDefinition; 078import org.apache.camel.model.ProcessorDefinitionHelper; 079import org.apache.camel.model.RouteDefinition; 080import org.apache.camel.processor.CamelInternalProcessor; 081import org.apache.camel.processor.interceptor.BacklogDebugger; 082import org.apache.camel.processor.interceptor.BacklogTracer; 083import org.apache.camel.processor.interceptor.Tracer; 084import org.apache.camel.spi.AsyncProcessorAwaitManager; 085import org.apache.camel.spi.DataFormat; 086import org.apache.camel.spi.EventNotifier; 087import org.apache.camel.spi.InflightRepository; 088import org.apache.camel.spi.LifecycleStrategy; 089import org.apache.camel.spi.ManagementAgent; 090import org.apache.camel.spi.ManagementAware; 091import org.apache.camel.spi.ManagementNameStrategy; 092import org.apache.camel.spi.ManagementObjectStrategy; 093import org.apache.camel.spi.ManagementStrategy; 094import org.apache.camel.spi.RestRegistry; 095import org.apache.camel.spi.RouteContext; 096import org.apache.camel.spi.RuntimeEndpointRegistry; 097import org.apache.camel.spi.StreamCachingStrategy; 098import org.apache.camel.spi.TypeConverterRegistry; 099import org.apache.camel.spi.UnitOfWork; 100import org.apache.camel.support.ServiceSupport; 101import org.apache.camel.support.TimerListenerManager; 102import org.apache.camel.util.KeyValueHolder; 103import org.apache.camel.util.ObjectHelper; 104import org.slf4j.Logger; 105import org.slf4j.LoggerFactory; 106 107/** 108 * Default JMX managed lifecycle strategy that registered objects using the configured 109 * {@link org.apache.camel.spi.ManagementStrategy}. 110 * 111 * @see org.apache.camel.spi.ManagementStrategy 112 * @version 113 */ 114@SuppressWarnings("deprecation") 115public class DefaultManagementLifecycleStrategy extends ServiceSupport implements LifecycleStrategy, CamelContextAware { 116 117 private static final Logger LOG = LoggerFactory.getLogger(DefaultManagementLifecycleStrategy.class); 118 // the wrapped processors is for performance counters, which are in use for the created routes 119 // when a route is removed, we should remove the associated processors from this map 120 private final Map<Processor, KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> wrappedProcessors = 121 new HashMap<Processor, KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>>(); 122 private final List<PreRegisterService> preServices = new ArrayList<PreRegisterService>(); 123 private final TimerListenerManager loadTimer = new ManagedLoadTimer(); 124 private final TimerListenerManagerStartupListener loadTimerStartupListener = new TimerListenerManagerStartupListener(); 125 private volatile CamelContext camelContext; 126 private volatile ManagedCamelContext camelContextMBean; 127 private volatile boolean initialized; 128 private final Set<String> knowRouteIds = new HashSet<String>(); 129 private final Map<Tracer, ManagedTracer> managedTracers = new HashMap<Tracer, ManagedTracer>(); 130 private final Map<BacklogTracer, ManagedBacklogTracer> managedBacklogTracers = new HashMap<BacklogTracer, ManagedBacklogTracer>(); 131 private final Map<BacklogDebugger, ManagedBacklogDebugger> managedBacklogDebuggers = new HashMap<BacklogDebugger, ManagedBacklogDebugger>(); 132 private final Map<ThreadPoolExecutor, Object> managedThreadPools = new HashMap<ThreadPoolExecutor, Object>(); 133 134 public DefaultManagementLifecycleStrategy() { 135 } 136 137 public DefaultManagementLifecycleStrategy(CamelContext camelContext) { 138 this.camelContext = camelContext; 139 } 140 141 public CamelContext getCamelContext() { 142 return camelContext; 143 } 144 145 public void setCamelContext(CamelContext camelContext) { 146 this.camelContext = camelContext; 147 } 148 149 public void onContextStart(CamelContext context) throws VetoCamelContextStartException { 150 Object mc = getManagementObjectStrategy().getManagedObjectForCamelContext(context); 151 152 String name = context.getName(); 153 String managementName = context.getManagementNameStrategy().getName(); 154 155 try { 156 boolean done = false; 157 while (!done) { 158 ObjectName on = getManagementStrategy().getManagementNamingStrategy().getObjectNameForCamelContext(managementName, name); 159 boolean exists = getManagementStrategy().isManaged(mc, on); 160 if (!exists) { 161 done = true; 162 } else { 163 // okay there exists already a CamelContext with this name, we can try to fix it by finding a free name 164 boolean fixed = false; 165 // if we use the default name strategy we can find a free name to use 166 String newName = findFreeName(mc, context.getManagementNameStrategy(), name); 167 if (newName != null) { 168 // use this as the fixed name 169 fixed = true; 170 done = true; 171 managementName = newName; 172 } 173 // we could not fix it so veto starting camel 174 if (!fixed) { 175 throw new VetoCamelContextStartException("CamelContext (" + context.getName() + ") with ObjectName[" + on + "] is already registered." 176 + " Make sure to use unique names on CamelContext when using multiple CamelContexts in the same MBeanServer.", context); 177 } else { 178 LOG.warn("This CamelContext(" + context.getName() + ") will be registered using the name: " + managementName 179 + " due to clash with an existing name already registered in MBeanServer."); 180 } 181 } 182 } 183 } catch (VetoCamelContextStartException e) { 184 // rethrow veto 185 throw e; 186 } catch (Exception e) { 187 // must rethrow to allow CamelContext fallback to non JMX agent to allow 188 // Camel to continue to run 189 throw ObjectHelper.wrapRuntimeCamelException(e); 190 } 191 192 // set the name we are going to use 193 if (context instanceof DefaultCamelContext) { 194 ((DefaultCamelContext) context).setManagementName(managementName); 195 } 196 197 try { 198 manageObject(mc); 199 } catch (Exception e) { 200 // must rethrow to allow CamelContext fallback to non JMX agent to allow 201 // Camel to continue to run 202 throw ObjectHelper.wrapRuntimeCamelException(e); 203 } 204 205 // yes we made it and are initialized 206 initialized = true; 207 208 if (mc instanceof ManagedCamelContext) { 209 camelContextMBean = (ManagedCamelContext) mc; 210 } 211 212 // register any pre registered now that we are initialized 213 enlistPreRegisteredServices(); 214 } 215 216 private String findFreeName(Object mc, ManagementNameStrategy strategy, String name) throws MalformedObjectNameException { 217 // we cannot find a free name for fixed named strategies 218 if (strategy.isFixedName()) { 219 return null; 220 } 221 222 // okay try to find a free name 223 boolean done = false; 224 String newName = null; 225 while (!done) { 226 // compute the next name 227 newName = strategy.getNextName(); 228 ObjectName on = getManagementStrategy().getManagementNamingStrategy().getObjectNameForCamelContext(newName, name); 229 done = !getManagementStrategy().isManaged(mc, on); 230 if (LOG.isTraceEnabled()) { 231 LOG.trace("Using name: {} in ObjectName[{}] exists? {}", new Object[]{name, on, done}); 232 } 233 } 234 return newName; 235 } 236 237 /** 238 * After {@link CamelContext} has been enlisted in JMX using {@link #onContextStart(org.apache.camel.CamelContext)} 239 * then we can enlist any pre registered services as well, as we had to wait for {@link CamelContext} to be 240 * enlisted first. 241 * <p/> 242 * A component/endpoint/service etc. can be pre registered when using dependency injection and annotations such as 243 * {@link org.apache.camel.Produce}, {@link org.apache.camel.EndpointInject}. Therefore we need to capture those 244 * registrations up front, and then afterwards enlist in JMX when {@link CamelContext} is being started. 245 */ 246 private void enlistPreRegisteredServices() { 247 if (preServices.isEmpty()) { 248 return; 249 } 250 251 LOG.debug("Registering {} pre registered services", preServices.size()); 252 for (PreRegisterService pre : preServices) { 253 if (pre.getComponent() != null) { 254 onComponentAdd(pre.getName(), pre.getComponent()); 255 } else if (pre.getEndpoint() != null) { 256 onEndpointAdd(pre.getEndpoint()); 257 } else if (pre.getService() != null) { 258 onServiceAdd(pre.getCamelContext(), pre.getService(), pre.getRoute()); 259 } 260 } 261 262 // we are done so clear the list 263 preServices.clear(); 264 } 265 266 public void onContextStop(CamelContext context) { 267 // the agent hasn't been started 268 if (!initialized) { 269 return; 270 } 271 try { 272 Object mc = getManagementObjectStrategy().getManagedObjectForCamelContext(context); 273 // the context could have been removed already 274 if (getManagementStrategy().isManaged(mc, null)) { 275 unmanageObject(mc); 276 } 277 } catch (Exception e) { 278 LOG.warn("Could not unregister CamelContext MBean", e); 279 } 280 281 camelContextMBean = null; 282 } 283 284 public void onComponentAdd(String name, Component component) { 285 // always register components as there are only a few of those 286 if (!initialized) { 287 // pre register so we can register later when we have been initialized 288 PreRegisterService pre = new PreRegisterService(); 289 pre.onComponentAdd(name, component); 290 preServices.add(pre); 291 return; 292 } 293 try { 294 Object mc = getManagementObjectStrategy().getManagedObjectForComponent(camelContext, component, name); 295 manageObject(mc); 296 } catch (Exception e) { 297 LOG.warn("Could not register Component MBean", e); 298 } 299 } 300 301 public void onComponentRemove(String name, Component component) { 302 // the agent hasn't been started 303 if (!initialized) { 304 return; 305 } 306 try { 307 Object mc = getManagementObjectStrategy().getManagedObjectForComponent(camelContext, component, name); 308 unmanageObject(mc); 309 } catch (Exception e) { 310 LOG.warn("Could not unregister Component MBean", e); 311 } 312 } 313 314 /** 315 * If the endpoint is an instance of ManagedResource then register it with the 316 * mbean server, if it is not then wrap the endpoint in a {@link ManagedEndpoint} and 317 * register that with the mbean server. 318 * 319 * @param endpoint the Endpoint attempted to be added 320 */ 321 public void onEndpointAdd(Endpoint endpoint) { 322 if (!initialized) { 323 // pre register so we can register later when we have been initialized 324 PreRegisterService pre = new PreRegisterService(); 325 pre.onEndpointAdd(endpoint); 326 preServices.add(pre); 327 return; 328 } 329 330 if (!shouldRegister(endpoint, null)) { 331 // avoid registering if not needed 332 return; 333 } 334 335 try { 336 Object me = getManagementObjectStrategy().getManagedObjectForEndpoint(camelContext, endpoint); 337 if (me == null) { 338 // endpoint should not be managed 339 return; 340 } 341 manageObject(me); 342 } catch (Exception e) { 343 LOG.warn("Could not register Endpoint MBean for endpoint: " + endpoint + ". This exception will be ignored.", e); 344 } 345 } 346 347 public void onEndpointRemove(Endpoint endpoint) { 348 // the agent hasn't been started 349 if (!initialized) { 350 return; 351 } 352 353 try { 354 Object me = getManagementObjectStrategy().getManagedObjectForEndpoint(camelContext, endpoint); 355 unmanageObject(me); 356 } catch (Exception e) { 357 LOG.warn("Could not unregister Endpoint MBean for endpoint: " + endpoint + ". This exception will be ignored.", e); 358 } 359 } 360 361 public void onServiceAdd(CamelContext context, Service service, Route route) { 362 if (!initialized) { 363 // pre register so we can register later when we have been initialized 364 PreRegisterService pre = new PreRegisterService(); 365 pre.onServiceAdd(context, service, route); 366 preServices.add(pre); 367 return; 368 } 369 370 // services can by any kind of misc type but also processors 371 // so we have special logic when its a processor 372 373 if (!shouldRegister(service, route)) { 374 // avoid registering if not needed 375 return; 376 } 377 378 Object managedObject = getManagedObjectForService(context, service, route); 379 if (managedObject == null) { 380 // service should not be managed 381 return; 382 } 383 384 // skip already managed services, for example if a route has been restarted 385 if (getManagementStrategy().isManaged(managedObject, null)) { 386 LOG.trace("The service is already managed: {}", service); 387 return; 388 } 389 390 try { 391 manageObject(managedObject); 392 } catch (Exception e) { 393 LOG.warn("Could not register service: " + service + " as Service MBean.", e); 394 } 395 } 396 397 public void onServiceRemove(CamelContext context, Service service, Route route) { 398 // the agent hasn't been started 399 if (!initialized) { 400 return; 401 } 402 403 Object managedObject = getManagedObjectForService(context, service, route); 404 if (managedObject != null) { 405 try { 406 unmanageObject(managedObject); 407 } catch (Exception e) { 408 LOG.warn("Could not unregister service: " + service + " as Service MBean.", e); 409 } 410 } 411 } 412 413 @SuppressWarnings("unchecked") 414 private Object getManagedObjectForService(CamelContext context, Service service, Route route) { 415 // skip channel, UoW and dont double wrap instrumentation 416 if (service instanceof Channel || service instanceof UnitOfWork || service instanceof InstrumentationProcessor) { 417 return null; 418 } 419 420 // skip non managed services 421 if (service instanceof NonManagedService) { 422 return null; 423 } 424 425 Object answer = null; 426 427 if (service instanceof ManagementAware) { 428 return ((ManagementAware<Service>) service).getManagedObject(service); 429 } else if (service instanceof Tracer) { 430 // special for tracer 431 Tracer tracer = (Tracer) service; 432 ManagedTracer mt = managedTracers.get(tracer); 433 if (mt == null) { 434 mt = new ManagedTracer(context, tracer); 435 mt.init(getManagementStrategy()); 436 managedTracers.put(tracer, mt); 437 } 438 return mt; 439 } else if (service instanceof BacklogTracer) { 440 // special for backlog tracer 441 BacklogTracer backlogTracer = (BacklogTracer) service; 442 ManagedBacklogTracer mt = managedBacklogTracers.get(backlogTracer); 443 if (mt == null) { 444 mt = new ManagedBacklogTracer(context, backlogTracer); 445 mt.init(getManagementStrategy()); 446 managedBacklogTracers.put(backlogTracer, mt); 447 } 448 return mt; 449 } else if (service instanceof BacklogDebugger) { 450 // special for backlog debugger 451 BacklogDebugger backlogDebugger = (BacklogDebugger) service; 452 ManagedBacklogDebugger md = managedBacklogDebuggers.get(backlogDebugger); 453 if (md == null) { 454 md = new ManagedBacklogDebugger(context, backlogDebugger); 455 md.init(getManagementStrategy()); 456 managedBacklogDebuggers.put(backlogDebugger, md); 457 } 458 return md; 459 } else if (service instanceof DataFormat) { 460 answer = getManagementObjectStrategy().getManagedObjectForDataFormat(context, (DataFormat) service); 461 } else if (service instanceof Producer) { 462 answer = getManagementObjectStrategy().getManagedObjectForProducer(context, (Producer) service); 463 } else if (service instanceof Consumer) { 464 answer = getManagementObjectStrategy().getManagedObjectForConsumer(context, (Consumer) service); 465 } else if (service instanceof Processor) { 466 // special for processors as we need to do some extra work 467 return getManagedObjectForProcessor(context, (Processor) service, route); 468 } else if (service instanceof ThrottlingInflightRoutePolicy) { 469 answer = new ManagedThrottlingInflightRoutePolicy(context, (ThrottlingInflightRoutePolicy) service); 470 } else if (service instanceof ConsumerCache) { 471 answer = new ManagedConsumerCache(context, (ConsumerCache) service); 472 } else if (service instanceof ProducerCache) { 473 answer = new ManagedProducerCache(context, (ProducerCache) service); 474 } else if (service instanceof DefaultEndpointRegistry) { 475 answer = new ManagedEndpointRegistry(context, (DefaultEndpointRegistry) service); 476 } else if (service instanceof TypeConverterRegistry) { 477 answer = new ManagedTypeConverterRegistry(context, (TypeConverterRegistry) service); 478 } else if (service instanceof RestRegistry) { 479 answer = new ManagedRestRegistry(context, (RestRegistry) service); 480 } else if (service instanceof InflightRepository) { 481 answer = new ManagedInflightRepository(context, (InflightRepository) service); 482 } else if (service instanceof AsyncProcessorAwaitManager) { 483 answer = new ManagedAsyncProcessorAwaitManager(context, (AsyncProcessorAwaitManager) service); 484 } else if (service instanceof RuntimeEndpointRegistry) { 485 answer = new ManagedRuntimeEndpointRegistry(context, (RuntimeEndpointRegistry) service); 486 } else if (service instanceof StreamCachingStrategy) { 487 answer = new ManagedStreamCachingStrategy(context, (StreamCachingStrategy) service); 488 } else if (service instanceof EventNotifier) { 489 answer = getManagementObjectStrategy().getManagedObjectForEventNotifier(context, (EventNotifier) service); 490 } else if (service != null) { 491 // fallback as generic service 492 answer = getManagementObjectStrategy().getManagedObjectForService(context, service); 493 } 494 495 if (answer != null && answer instanceof ManagedService) { 496 ManagedService ms = (ManagedService) answer; 497 ms.setRoute(route); 498 ms.init(getManagementStrategy()); 499 } 500 501 return answer; 502 } 503 504 private Object getManagedObjectForProcessor(CamelContext context, Processor processor, Route route) { 505 // a bit of magic here as the processors we want to manage have already been registered 506 // in the wrapped processors map when Camel have instrumented the route on route initialization 507 // so the idea is now to only manage the processors from the map 508 KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = wrappedProcessors.get(processor); 509 if (holder == null) { 510 // skip as its not an well known processor we want to manage anyway, such as Channel/UnitOfWork/Pipeline etc. 511 return null; 512 } 513 514 // get the managed object as it can be a specialized type such as a Delayer/Throttler etc. 515 Object managedObject = getManagementObjectStrategy().getManagedObjectForProcessor(context, processor, holder.getKey(), route); 516 // only manage if we have a name for it as otherwise we do not want to manage it anyway 517 if (managedObject != null) { 518 // is it a performance counter then we need to set our counter 519 if (managedObject instanceof PerformanceCounter) { 520 InstrumentationProcessor counter = holder.getValue(); 521 if (counter != null) { 522 // change counter to us 523 counter.setCounter(managedObject); 524 } 525 } 526 } 527 528 return managedObject; 529 } 530 531 public void onRoutesAdd(Collection<Route> routes) { 532 for (Route route : routes) { 533 534 // if we are starting CamelContext or either of the two options has been 535 // enabled, then enlist the route as a known route 536 if (getCamelContext().getStatus().isStarting() 537 || getManagementStrategy().getManagementAgent().getRegisterAlways() 538 || getManagementStrategy().getManagementAgent().getRegisterNewRoutes()) { 539 // register as known route id 540 knowRouteIds.add(route.getId()); 541 } 542 543 if (!shouldRegister(route, route)) { 544 // avoid registering if not needed, skip to next route 545 continue; 546 } 547 548 Object mr = getManagementObjectStrategy().getManagedObjectForRoute(camelContext, route); 549 550 // skip already managed routes, for example if the route has been restarted 551 if (getManagementStrategy().isManaged(mr, null)) { 552 LOG.trace("The route is already managed: {}", route); 553 continue; 554 } 555 556 // get the wrapped instrumentation processor from this route 557 // and set me as the counter 558 if (route instanceof EventDrivenConsumerRoute) { 559 EventDrivenConsumerRoute edcr = (EventDrivenConsumerRoute) route; 560 Processor processor = edcr.getProcessor(); 561 if (processor instanceof CamelInternalProcessor && mr instanceof ManagedRoute) { 562 CamelInternalProcessor internal = (CamelInternalProcessor) processor; 563 ManagedRoute routeMBean = (ManagedRoute) mr; 564 565 CamelInternalProcessor.InstrumentationAdvice task = internal.getAdvice(CamelInternalProcessor.InstrumentationAdvice.class); 566 if (task != null) { 567 // we need to wrap the counter with the camel context so we get stats updated on the context as well 568 if (camelContextMBean != null) { 569 CompositePerformanceCounter wrapper = new CompositePerformanceCounter(routeMBean, camelContextMBean); 570 task.setCounter(wrapper); 571 } else { 572 task.setCounter(routeMBean); 573 } 574 } 575 } 576 } 577 578 try { 579 manageObject(mr); 580 } catch (JMException e) { 581 LOG.warn("Could not register Route MBean", e); 582 } catch (Exception e) { 583 LOG.warn("Could not create Route MBean", e); 584 } 585 } 586 } 587 588 public void onRoutesRemove(Collection<Route> routes) { 589 // the agent hasn't been started 590 if (!initialized) { 591 return; 592 } 593 594 for (Route route : routes) { 595 Object mr = getManagementObjectStrategy().getManagedObjectForRoute(camelContext, route); 596 597 // skip unmanaged routes 598 if (!getManagementStrategy().isManaged(mr, null)) { 599 LOG.trace("The route is not managed: {}", route); 600 continue; 601 } 602 603 try { 604 unmanageObject(mr); 605 } catch (Exception e) { 606 LOG.warn("Could not unregister Route MBean", e); 607 } 608 609 // remove from known routes ids, as the route has been removed 610 knowRouteIds.remove(route.getId()); 611 } 612 613 // after the routes has been removed, we should clear the wrapped processors as we no longer need them 614 // as they were just a provisional map used during creation of routes 615 removeWrappedProcessorsForRoutes(routes); 616 } 617 618 public void onErrorHandlerAdd(RouteContext routeContext, Processor errorHandler, ErrorHandlerFactory errorHandlerBuilder) { 619 if (!shouldRegister(errorHandler, null)) { 620 // avoid registering if not needed 621 return; 622 } 623 624 Object me = getManagementObjectStrategy().getManagedObjectForErrorHandler(camelContext, routeContext, errorHandler, errorHandlerBuilder); 625 626 // skip already managed services, for example if a route has been restarted 627 if (getManagementStrategy().isManaged(me, null)) { 628 LOG.trace("The error handler builder is already managed: {}", errorHandlerBuilder); 629 return; 630 } 631 632 try { 633 manageObject(me); 634 } catch (Exception e) { 635 LOG.warn("Could not register error handler builder: " + errorHandlerBuilder + " as ErrorHandler MBean.", e); 636 } 637 } 638 639 public void onErrorHandlerRemove(RouteContext routeContext, Processor errorHandler, ErrorHandlerFactory errorHandlerBuilder) { 640 if (!initialized) { 641 return; 642 } 643 644 Object me = getManagementObjectStrategy().getManagedObjectForErrorHandler(camelContext, routeContext, errorHandler, errorHandlerBuilder); 645 if (me != null) { 646 try { 647 unmanageObject(me); 648 } catch (Exception e) { 649 LOG.warn("Could not unregister error handler: " + me + " as ErrorHandler MBean.", e); 650 } 651 } 652 } 653 654 public void onThreadPoolAdd(CamelContext camelContext, ThreadPoolExecutor threadPool, String id, 655 String sourceId, String routeId, String threadPoolProfileId) { 656 657 if (!shouldRegister(threadPool, null)) { 658 // avoid registering if not needed 659 return; 660 } 661 662 Object mtp = getManagementObjectStrategy().getManagedObjectForThreadPool(camelContext, threadPool, id, sourceId, routeId, threadPoolProfileId); 663 664 // skip already managed services, for example if a route has been restarted 665 if (getManagementStrategy().isManaged(mtp, null)) { 666 LOG.trace("The thread pool is already managed: {}", threadPool); 667 return; 668 } 669 670 try { 671 manageObject(mtp); 672 // store a reference so we can unmanage from JMX when the thread pool is removed 673 // we need to keep track here, as we cannot re-construct the thread pool ObjectName when removing the thread pool 674 managedThreadPools.put(threadPool, mtp); 675 } catch (Exception e) { 676 LOG.warn("Could not register thread pool: " + threadPool + " as ThreadPool MBean.", e); 677 } 678 } 679 680 public void onThreadPoolRemove(CamelContext camelContext, ThreadPoolExecutor threadPool) { 681 if (!initialized) { 682 return; 683 } 684 685 // lookup the thread pool and remove it from JMX 686 Object mtp = managedThreadPools.remove(threadPool); 687 if (mtp != null) { 688 // skip unmanaged routes 689 if (!getManagementStrategy().isManaged(mtp, null)) { 690 LOG.trace("The thread pool is not managed: {}", threadPool); 691 return; 692 } 693 694 try { 695 unmanageObject(mtp); 696 } catch (Exception e) { 697 LOG.warn("Could not unregister ThreadPool MBean", e); 698 } 699 } 700 } 701 702 public void onRouteContextCreate(RouteContext routeContext) { 703 if (!initialized) { 704 return; 705 } 706 707 // Create a map (ProcessorType -> PerformanceCounter) 708 // to be passed to InstrumentationInterceptStrategy. 709 Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters = 710 new HashMap<ProcessorDefinition<?>, PerformanceCounter>(); 711 712 // Each processor in a route will have its own performance counter. 713 // These performance counter will be embedded to InstrumentationProcessor 714 // and wrap the appropriate processor by InstrumentationInterceptStrategy. 715 RouteDefinition route = routeContext.getRoute(); 716 717 // register performance counters for all processors and its children 718 for (ProcessorDefinition<?> processor : route.getOutputs()) { 719 registerPerformanceCounters(routeContext, processor, registeredCounters); 720 } 721 722 // set this managed intercept strategy that executes the JMX instrumentation for performance metrics 723 // so our registered counters can be used for fine grained performance instrumentation 724 routeContext.setManagedInterceptStrategy(new InstrumentationInterceptStrategy(registeredCounters, wrappedProcessors)); 725 } 726 727 /** 728 * Removes the wrapped processors for the given routes, as they are no longer in use. 729 * <p/> 730 * This is needed to avoid accumulating memory, if a lot of routes is being added and removed. 731 * 732 * @param routes the routes 733 */ 734 private void removeWrappedProcessorsForRoutes(Collection<Route> routes) { 735 // loop the routes, and remove the route associated wrapped processors, as they are no longer in use 736 for (Route route : routes) { 737 String id = route.getId(); 738 739 Iterator<KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> it = wrappedProcessors.values().iterator(); 740 while (it.hasNext()) { 741 KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = it.next(); 742 RouteDefinition def = ProcessorDefinitionHelper.getRoute(holder.getKey()); 743 if (def != null && id.equals(def.getId())) { 744 it.remove(); 745 } 746 } 747 } 748 749 } 750 751 private void registerPerformanceCounters(RouteContext routeContext, ProcessorDefinition<?> processor, 752 Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters) { 753 754 // traverse children if any exists 755 List<ProcessorDefinition<?>> children = processor.getOutputs(); 756 for (ProcessorDefinition<?> child : children) { 757 registerPerformanceCounters(routeContext, child, registeredCounters); 758 } 759 760 // skip processors that should not be registered 761 if (!registerProcessor(processor)) { 762 return; 763 } 764 765 // okay this is a processor we would like to manage so create the 766 // a delegate performance counter that acts as the placeholder in the interceptor 767 // that then delegates to the real mbean which we register later in the onServiceAdd method 768 DelegatePerformanceCounter pc = new DelegatePerformanceCounter(); 769 // set statistics enabled depending on the option 770 boolean enabled = camelContext.getManagementStrategy().getManagementAgent().getStatisticsLevel().isDefaultOrExtended(); 771 pc.setStatisticsEnabled(enabled); 772 773 // and add it as a a registered counter that will be used lazy when Camel 774 // does the instrumentation of the route and adds the InstrumentationProcessor 775 // that does the actual performance metrics gatherings at runtime 776 registeredCounters.put(processor, pc); 777 } 778 779 /** 780 * Should the given processor be registered. 781 */ 782 protected boolean registerProcessor(ProcessorDefinition<?> processor) { 783 // skip on exception 784 if (processor instanceof OnExceptionDefinition) { 785 return false; 786 } 787 // skip on completion 788 if (processor instanceof OnCompletionDefinition) { 789 return false; 790 } 791 // skip intercept 792 if (processor instanceof InterceptDefinition) { 793 return false; 794 } 795 // skip aop 796 if (processor instanceof AOPDefinition) { 797 return false; 798 } 799 // skip policy 800 if (processor instanceof PolicyDefinition) { 801 return false; 802 } 803 804 // only if custom id assigned 805 boolean only = getManagementStrategy().getManagementAgent().getOnlyRegisterProcessorWithCustomId() != null 806 && getManagementStrategy().getManagementAgent().getOnlyRegisterProcessorWithCustomId(); 807 if (only) { 808 return processor.hasCustomIdAssigned(); 809 } 810 811 // use customer filter 812 return getManagementStrategy().manageProcessor(processor); 813 } 814 815 private ManagementStrategy getManagementStrategy() { 816 ObjectHelper.notNull(camelContext, "CamelContext"); 817 return camelContext.getManagementStrategy(); 818 } 819 820 private ManagementObjectStrategy getManagementObjectStrategy() { 821 ObjectHelper.notNull(camelContext, "CamelContext"); 822 return camelContext.getManagementStrategy().getManagementObjectStrategy(); 823 } 824 825 /** 826 * Strategy for managing the object 827 * 828 * @param me the managed object 829 * @throws Exception is thrown if error registering the object for management 830 */ 831 protected void manageObject(Object me) throws Exception { 832 getManagementStrategy().manageObject(me); 833 if (me instanceof TimerListener) { 834 TimerListener timer = (TimerListener) me; 835 loadTimer.addTimerListener(timer); 836 } 837 } 838 839 /** 840 * Un-manages the object. 841 * 842 * @param me the managed object 843 * @throws Exception is thrown if error unregistering the managed object 844 */ 845 protected void unmanageObject(Object me) throws Exception { 846 if (me instanceof TimerListener) { 847 TimerListener timer = (TimerListener) me; 848 loadTimer.removeTimerListener(timer); 849 } 850 getManagementStrategy().unmanageObject(me); 851 } 852 853 /** 854 * Whether or not to register the mbean. 855 * <p/> 856 * The {@link ManagementAgent} has options which controls when to register. 857 * This allows us to only register mbeans accordingly. For example by default any 858 * dynamic endpoints is not registered. This avoids to register excessive mbeans, which 859 * most often is not desired. 860 * 861 * @param service the object to register 862 * @param route an optional route the mbean is associated with, can be <tt>null</tt> 863 * @return <tt>true</tt> to register, <tt>false</tt> to skip registering 864 */ 865 protected boolean shouldRegister(Object service, Route route) { 866 // the agent hasn't been started 867 if (!initialized) { 868 return false; 869 } 870 871 LOG.trace("Checking whether to register {} from route: {}", service, route); 872 873 ManagementAgent agent = getManagementStrategy().getManagementAgent(); 874 if (agent == null) { 875 // do not register if no agent 876 return false; 877 } 878 879 // always register if we are starting CamelContext 880 if (getCamelContext().getStatus().isStarting()) { 881 return true; 882 } 883 884 // always register if we are setting up routes 885 if (getCamelContext().isSetupRoutes()) { 886 return true; 887 } 888 889 // register if always is enabled 890 if (agent.getRegisterAlways()) { 891 return true; 892 } 893 894 // is it a known route then always accept 895 if (route != null && knowRouteIds.contains(route.getId())) { 896 return true; 897 } 898 899 // only register if we are starting a new route, and current thread is in starting routes mode 900 if (agent.getRegisterNewRoutes()) { 901 // no specific route, then fallback to see if this thread is starting routes 902 // which is kept as state on the camel context 903 return getCamelContext().isStartingRoutes(); 904 } 905 906 return false; 907 } 908 909 @Override 910 protected void doStart() throws Exception { 911 ObjectHelper.notNull(camelContext, "CamelContext"); 912 913 // defer starting the timer manager until CamelContext has been fully started 914 camelContext.addStartupListener(loadTimerStartupListener); 915 } 916 917 private final class TimerListenerManagerStartupListener implements StartupListener { 918 919 @Override 920 public void onCamelContextStarted(CamelContext context, boolean alreadyStarted) throws Exception { 921 // we are disabled either if configured explicit, or if level is off 922 boolean load = camelContext.getManagementStrategy().getManagementAgent().getLoadStatisticsEnabled() != null 923 && camelContext.getManagementStrategy().getManagementAgent().getLoadStatisticsEnabled(); 924 boolean disabled = !load || camelContext.getManagementStrategy().getStatisticsLevel() == ManagementStatisticsLevel.Off; 925 926 LOG.debug("Load performance statistics {}", disabled ? "disabled" : "enabled"); 927 if (!disabled) { 928 // must use 1 sec interval as the load statistics is based on 1 sec calculations 929 loadTimer.setInterval(1000); 930 // we have to defer enlisting timer lister manager as a service until CamelContext has been started 931 getCamelContext().addService(loadTimer); 932 } 933 } 934 } 935 936 @Override 937 protected void doStop() throws Exception { 938 initialized = false; 939 knowRouteIds.clear(); 940 preServices.clear(); 941 wrappedProcessors.clear(); 942 managedTracers.clear(); 943 managedBacklogTracers.clear(); 944 managedBacklogDebuggers.clear(); 945 managedThreadPools.clear(); 946 } 947 948 /** 949 * Class which holds any pre registration details. 950 * 951 * @see org.apache.camel.management.DefaultManagementLifecycleStrategy#enlistPreRegisteredServices() 952 */ 953 private static final class PreRegisterService { 954 955 private String name; 956 private Component component; 957 private Endpoint endpoint; 958 private CamelContext camelContext; 959 private Service service; 960 private Route route; 961 962 public void onComponentAdd(String name, Component component) { 963 this.name = name; 964 this.component = component; 965 } 966 967 public void onEndpointAdd(Endpoint endpoint) { 968 this.endpoint = endpoint; 969 } 970 971 public void onServiceAdd(CamelContext camelContext, Service service, Route route) { 972 this.camelContext = camelContext; 973 this.service = service; 974 this.route = route; 975 } 976 977 public String getName() { 978 return name; 979 } 980 981 public Component getComponent() { 982 return component; 983 } 984 985 public Endpoint getEndpoint() { 986 return endpoint; 987 } 988 989 public CamelContext getCamelContext() { 990 return camelContext; 991 } 992 993 public Service getService() { 994 return service; 995 } 996 997 public Route getRoute() { 998 return route; 999 } 1000 } 1001 1002} 1003