001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.management; 018 019import java.util.ArrayList; 020import java.util.Collection; 021import java.util.HashMap; 022import java.util.HashSet; 023import java.util.Iterator; 024import java.util.List; 025import java.util.Map; 026import java.util.Set; 027import java.util.concurrent.ThreadPoolExecutor; 028import javax.management.JMException; 029import javax.management.MalformedObjectNameException; 030import javax.management.ObjectName; 031 032import org.apache.camel.CamelContext; 033import org.apache.camel.CamelContextAware; 034import org.apache.camel.Channel; 035import org.apache.camel.Component; 036import org.apache.camel.Consumer; 037import org.apache.camel.Endpoint; 038import org.apache.camel.ErrorHandlerFactory; 039import org.apache.camel.ManagementStatisticsLevel; 040import org.apache.camel.NonManagedService; 041import org.apache.camel.Processor; 042import org.apache.camel.Producer; 043import org.apache.camel.Route; 044import org.apache.camel.Service; 045import org.apache.camel.StartupListener; 046import org.apache.camel.TimerListener; 047import org.apache.camel.VetoCamelContextStartException; 048import org.apache.camel.api.management.PerformanceCounter; 049import org.apache.camel.impl.ConsumerCache; 050import org.apache.camel.impl.DefaultCamelContext; 051import org.apache.camel.impl.DefaultEndpointRegistry; 052import org.apache.camel.impl.EventDrivenConsumerRoute; 053import org.apache.camel.impl.ProducerCache; 054import org.apache.camel.impl.ThrottlingExceptionRoutePolicy; 055import org.apache.camel.impl.ThrottlingInflightRoutePolicy; 056import org.apache.camel.management.mbean.ManagedAsyncProcessorAwaitManager; 057import org.apache.camel.management.mbean.ManagedBacklogDebugger; 058import org.apache.camel.management.mbean.ManagedBacklogTracer; 059import org.apache.camel.management.mbean.ManagedCamelContext; 060import org.apache.camel.management.mbean.ManagedConsumerCache; 061import org.apache.camel.management.mbean.ManagedEndpoint; 062import org.apache.camel.management.mbean.ManagedEndpointRegistry; 063import org.apache.camel.management.mbean.ManagedInflightRepository; 064import org.apache.camel.management.mbean.ManagedProducerCache; 065import org.apache.camel.management.mbean.ManagedRestRegistry; 066import org.apache.camel.management.mbean.ManagedRoute; 067import org.apache.camel.management.mbean.ManagedRuntimeEndpointRegistry; 068import org.apache.camel.management.mbean.ManagedService; 069import org.apache.camel.management.mbean.ManagedStreamCachingStrategy; 070import org.apache.camel.management.mbean.ManagedThrottlingExceptionRoutePolicy; 071import org.apache.camel.management.mbean.ManagedThrottlingInflightRoutePolicy; 072import org.apache.camel.management.mbean.ManagedTracer; 073import org.apache.camel.management.mbean.ManagedTypeConverterRegistry; 074import org.apache.camel.model.AOPDefinition; 075import org.apache.camel.model.InterceptDefinition; 076import org.apache.camel.model.OnCompletionDefinition; 077import org.apache.camel.model.OnExceptionDefinition; 078import org.apache.camel.model.PolicyDefinition; 079import org.apache.camel.model.ProcessorDefinition; 080import org.apache.camel.model.ProcessorDefinitionHelper; 081import org.apache.camel.model.RouteDefinition; 082import org.apache.camel.processor.CamelInternalProcessor; 083import org.apache.camel.processor.interceptor.BacklogDebugger; 084import org.apache.camel.processor.interceptor.BacklogTracer; 085import org.apache.camel.processor.interceptor.Tracer; 086import org.apache.camel.spi.AsyncProcessorAwaitManager; 087import org.apache.camel.spi.DataFormat; 088import org.apache.camel.spi.EventNotifier; 089import org.apache.camel.spi.InflightRepository; 090import org.apache.camel.spi.LifecycleStrategy; 091import org.apache.camel.spi.ManagementAgent; 092import org.apache.camel.spi.ManagementAware; 093import org.apache.camel.spi.ManagementNameStrategy; 094import org.apache.camel.spi.ManagementObjectStrategy; 095import org.apache.camel.spi.ManagementStrategy; 096import org.apache.camel.spi.RestRegistry; 097import org.apache.camel.spi.RouteContext; 098import org.apache.camel.spi.RuntimeEndpointRegistry; 099import org.apache.camel.spi.StreamCachingStrategy; 100import org.apache.camel.spi.TypeConverterRegistry; 101import org.apache.camel.spi.UnitOfWork; 102import org.apache.camel.support.ServiceSupport; 103import org.apache.camel.support.TimerListenerManager; 104import org.apache.camel.util.KeyValueHolder; 105import org.apache.camel.util.ObjectHelper; 106import org.slf4j.Logger; 107import org.slf4j.LoggerFactory; 108 109/** 110 * Default JMX managed lifecycle strategy that registered objects using the configured 111 * {@link org.apache.camel.spi.ManagementStrategy}. 112 * 113 * @see org.apache.camel.spi.ManagementStrategy 114 * @version 115 */ 116@SuppressWarnings("deprecation") 117public class DefaultManagementLifecycleStrategy extends ServiceSupport implements LifecycleStrategy, CamelContextAware { 118 119 private static final Logger LOG = LoggerFactory.getLogger(DefaultManagementLifecycleStrategy.class); 120 // the wrapped processors is for performance counters, which are in use for the created routes 121 // when a route is removed, we should remove the associated processors from this map 122 private final Map<Processor, KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> wrappedProcessors = 123 new HashMap<Processor, KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>>(); 124 private final List<PreRegisterService> preServices = new ArrayList<PreRegisterService>(); 125 private final TimerListenerManager loadTimer = new ManagedLoadTimer(); 126 private final TimerListenerManagerStartupListener loadTimerStartupListener = new TimerListenerManagerStartupListener(); 127 private volatile CamelContext camelContext; 128 private volatile ManagedCamelContext camelContextMBean; 129 private volatile boolean initialized; 130 private final Set<String> knowRouteIds = new HashSet<String>(); 131 private final Map<Tracer, ManagedTracer> managedTracers = new HashMap<Tracer, ManagedTracer>(); 132 private final Map<BacklogTracer, ManagedBacklogTracer> managedBacklogTracers = new HashMap<BacklogTracer, ManagedBacklogTracer>(); 133 private final Map<BacklogDebugger, ManagedBacklogDebugger> managedBacklogDebuggers = new HashMap<BacklogDebugger, ManagedBacklogDebugger>(); 134 private final Map<ThreadPoolExecutor, Object> managedThreadPools = new HashMap<ThreadPoolExecutor, Object>(); 135 136 public DefaultManagementLifecycleStrategy() { 137 } 138 139 public DefaultManagementLifecycleStrategy(CamelContext camelContext) { 140 this.camelContext = camelContext; 141 } 142 143 public CamelContext getCamelContext() { 144 return camelContext; 145 } 146 147 public void setCamelContext(CamelContext camelContext) { 148 this.camelContext = camelContext; 149 } 150 151 public void onContextStart(CamelContext context) throws VetoCamelContextStartException { 152 Object mc = getManagementObjectStrategy().getManagedObjectForCamelContext(context); 153 154 String name = context.getName(); 155 String managementName = context.getManagementNameStrategy().getName(); 156 157 try { 158 boolean done = false; 159 while (!done) { 160 ObjectName on = getManagementStrategy().getManagementNamingStrategy().getObjectNameForCamelContext(managementName, name); 161 boolean exists = getManagementStrategy().isManaged(mc, on); 162 if (!exists) { 163 done = true; 164 } else { 165 // okay there exists already a CamelContext with this name, we can try to fix it by finding a free name 166 boolean fixed = false; 167 // if we use the default name strategy we can find a free name to use 168 String newName = findFreeName(mc, context.getManagementNameStrategy(), name); 169 if (newName != null) { 170 // use this as the fixed name 171 fixed = true; 172 done = true; 173 managementName = newName; 174 } 175 // we could not fix it so veto starting camel 176 if (!fixed) { 177 throw new VetoCamelContextStartException("CamelContext (" + context.getName() + ") with ObjectName[" + on + "] is already registered." 178 + " Make sure to use unique names on CamelContext when using multiple CamelContexts in the same MBeanServer.", context); 179 } else { 180 LOG.warn("This CamelContext(" + context.getName() + ") will be registered using the name: " + managementName 181 + " due to clash with an existing name already registered in MBeanServer."); 182 } 183 } 184 } 185 } catch (VetoCamelContextStartException e) { 186 // rethrow veto 187 throw e; 188 } catch (Exception e) { 189 // must rethrow to allow CamelContext fallback to non JMX agent to allow 190 // Camel to continue to run 191 throw ObjectHelper.wrapRuntimeCamelException(e); 192 } 193 194 // set the name we are going to use 195 if (context instanceof DefaultCamelContext) { 196 ((DefaultCamelContext) context).setManagementName(managementName); 197 } 198 199 try { 200 manageObject(mc); 201 } catch (Exception e) { 202 // must rethrow to allow CamelContext fallback to non JMX agent to allow 203 // Camel to continue to run 204 throw ObjectHelper.wrapRuntimeCamelException(e); 205 } 206 207 // yes we made it and are initialized 208 initialized = true; 209 210 if (mc instanceof ManagedCamelContext) { 211 camelContextMBean = (ManagedCamelContext) mc; 212 } 213 214 // register any pre registered now that we are initialized 215 enlistPreRegisteredServices(); 216 } 217 218 private String findFreeName(Object mc, ManagementNameStrategy strategy, String name) throws MalformedObjectNameException { 219 // we cannot find a free name for fixed named strategies 220 if (strategy.isFixedName()) { 221 return null; 222 } 223 224 // okay try to find a free name 225 boolean done = false; 226 String newName = null; 227 while (!done) { 228 // compute the next name 229 newName = strategy.getNextName(); 230 ObjectName on = getManagementStrategy().getManagementNamingStrategy().getObjectNameForCamelContext(newName, name); 231 done = !getManagementStrategy().isManaged(mc, on); 232 if (LOG.isTraceEnabled()) { 233 LOG.trace("Using name: {} in ObjectName[{}] exists? {}", new Object[]{name, on, done}); 234 } 235 } 236 return newName; 237 } 238 239 /** 240 * After {@link CamelContext} has been enlisted in JMX using {@link #onContextStart(org.apache.camel.CamelContext)} 241 * then we can enlist any pre registered services as well, as we had to wait for {@link CamelContext} to be 242 * enlisted first. 243 * <p/> 244 * A component/endpoint/service etc. can be pre registered when using dependency injection and annotations such as 245 * {@link org.apache.camel.Produce}, {@link org.apache.camel.EndpointInject}. Therefore we need to capture those 246 * registrations up front, and then afterwards enlist in JMX when {@link CamelContext} is being started. 247 */ 248 private void enlistPreRegisteredServices() { 249 if (preServices.isEmpty()) { 250 return; 251 } 252 253 LOG.debug("Registering {} pre registered services", preServices.size()); 254 for (PreRegisterService pre : preServices) { 255 if (pre.getComponent() != null) { 256 onComponentAdd(pre.getName(), pre.getComponent()); 257 } else if (pre.getEndpoint() != null) { 258 onEndpointAdd(pre.getEndpoint()); 259 } else if (pre.getService() != null) { 260 onServiceAdd(pre.getCamelContext(), pre.getService(), pre.getRoute()); 261 } 262 } 263 264 // we are done so clear the list 265 preServices.clear(); 266 } 267 268 public void onContextStop(CamelContext context) { 269 // the agent hasn't been started 270 if (!initialized) { 271 return; 272 } 273 try { 274 Object mc = getManagementObjectStrategy().getManagedObjectForCamelContext(context); 275 // the context could have been removed already 276 if (getManagementStrategy().isManaged(mc, null)) { 277 unmanageObject(mc); 278 } 279 } catch (Exception e) { 280 LOG.warn("Could not unregister CamelContext MBean", e); 281 } 282 283 camelContextMBean = null; 284 } 285 286 public void onComponentAdd(String name, Component component) { 287 // always register components as there are only a few of those 288 if (!initialized) { 289 // pre register so we can register later when we have been initialized 290 PreRegisterService pre = new PreRegisterService(); 291 pre.onComponentAdd(name, component); 292 preServices.add(pre); 293 return; 294 } 295 try { 296 Object mc = getManagementObjectStrategy().getManagedObjectForComponent(camelContext, component, name); 297 manageObject(mc); 298 } catch (Exception e) { 299 LOG.warn("Could not register Component MBean", e); 300 } 301 } 302 303 public void onComponentRemove(String name, Component component) { 304 // the agent hasn't been started 305 if (!initialized) { 306 return; 307 } 308 try { 309 Object mc = getManagementObjectStrategy().getManagedObjectForComponent(camelContext, component, name); 310 unmanageObject(mc); 311 } catch (Exception e) { 312 LOG.warn("Could not unregister Component MBean", e); 313 } 314 } 315 316 /** 317 * If the endpoint is an instance of ManagedResource then register it with the 318 * mbean server, if it is not then wrap the endpoint in a {@link ManagedEndpoint} and 319 * register that with the mbean server. 320 * 321 * @param endpoint the Endpoint attempted to be added 322 */ 323 public void onEndpointAdd(Endpoint endpoint) { 324 if (!initialized) { 325 // pre register so we can register later when we have been initialized 326 PreRegisterService pre = new PreRegisterService(); 327 pre.onEndpointAdd(endpoint); 328 preServices.add(pre); 329 return; 330 } 331 332 if (!shouldRegister(endpoint, null)) { 333 // avoid registering if not needed 334 return; 335 } 336 337 try { 338 Object me = getManagementObjectStrategy().getManagedObjectForEndpoint(camelContext, endpoint); 339 if (me == null) { 340 // endpoint should not be managed 341 return; 342 } 343 manageObject(me); 344 } catch (Exception e) { 345 LOG.warn("Could not register Endpoint MBean for endpoint: " + endpoint + ". This exception will be ignored.", e); 346 } 347 } 348 349 public void onEndpointRemove(Endpoint endpoint) { 350 // the agent hasn't been started 351 if (!initialized) { 352 return; 353 } 354 355 try { 356 Object me = getManagementObjectStrategy().getManagedObjectForEndpoint(camelContext, endpoint); 357 unmanageObject(me); 358 } catch (Exception e) { 359 LOG.warn("Could not unregister Endpoint MBean for endpoint: " + endpoint + ". This exception will be ignored.", e); 360 } 361 } 362 363 public void onServiceAdd(CamelContext context, Service service, Route route) { 364 if (!initialized) { 365 // pre register so we can register later when we have been initialized 366 PreRegisterService pre = new PreRegisterService(); 367 pre.onServiceAdd(context, service, route); 368 preServices.add(pre); 369 return; 370 } 371 372 // services can by any kind of misc type but also processors 373 // so we have special logic when its a processor 374 375 if (!shouldRegister(service, route)) { 376 // avoid registering if not needed 377 return; 378 } 379 380 Object managedObject = getManagedObjectForService(context, service, route); 381 if (managedObject == null) { 382 // service should not be managed 383 return; 384 } 385 386 // skip already managed services, for example if a route has been restarted 387 if (getManagementStrategy().isManaged(managedObject, null)) { 388 LOG.trace("The service is already managed: {}", service); 389 return; 390 } 391 392 try { 393 manageObject(managedObject); 394 } catch (Exception e) { 395 LOG.warn("Could not register service: " + service + " as Service MBean.", e); 396 } 397 } 398 399 public void onServiceRemove(CamelContext context, Service service, Route route) { 400 // the agent hasn't been started 401 if (!initialized) { 402 return; 403 } 404 405 Object managedObject = getManagedObjectForService(context, service, route); 406 if (managedObject != null) { 407 try { 408 unmanageObject(managedObject); 409 } catch (Exception e) { 410 LOG.warn("Could not unregister service: " + service + " as Service MBean.", e); 411 } 412 } 413 } 414 415 @SuppressWarnings("unchecked") 416 private Object getManagedObjectForService(CamelContext context, Service service, Route route) { 417 // skip channel, UoW and dont double wrap instrumentation 418 if (service instanceof Channel || service instanceof UnitOfWork || service instanceof InstrumentationProcessor) { 419 return null; 420 } 421 422 // skip non managed services 423 if (service instanceof NonManagedService) { 424 return null; 425 } 426 427 Object answer = null; 428 429 if (service instanceof ManagementAware) { 430 return ((ManagementAware<Service>) service).getManagedObject(service); 431 } else if (service instanceof Tracer) { 432 // special for tracer 433 Tracer tracer = (Tracer) service; 434 ManagedTracer mt = managedTracers.get(tracer); 435 if (mt == null) { 436 mt = new ManagedTracer(context, tracer); 437 mt.init(getManagementStrategy()); 438 managedTracers.put(tracer, mt); 439 } 440 return mt; 441 } else if (service instanceof BacklogTracer) { 442 // special for backlog tracer 443 BacklogTracer backlogTracer = (BacklogTracer) service; 444 ManagedBacklogTracer mt = managedBacklogTracers.get(backlogTracer); 445 if (mt == null) { 446 mt = new ManagedBacklogTracer(context, backlogTracer); 447 mt.init(getManagementStrategy()); 448 managedBacklogTracers.put(backlogTracer, mt); 449 } 450 return mt; 451 } else if (service instanceof BacklogDebugger) { 452 // special for backlog debugger 453 BacklogDebugger backlogDebugger = (BacklogDebugger) service; 454 ManagedBacklogDebugger md = managedBacklogDebuggers.get(backlogDebugger); 455 if (md == null) { 456 md = new ManagedBacklogDebugger(context, backlogDebugger); 457 md.init(getManagementStrategy()); 458 managedBacklogDebuggers.put(backlogDebugger, md); 459 } 460 return md; 461 } else if (service instanceof DataFormat) { 462 answer = getManagementObjectStrategy().getManagedObjectForDataFormat(context, (DataFormat) service); 463 } else if (service instanceof Producer) { 464 answer = getManagementObjectStrategy().getManagedObjectForProducer(context, (Producer) service); 465 } else if (service instanceof Consumer) { 466 answer = getManagementObjectStrategy().getManagedObjectForConsumer(context, (Consumer) service); 467 } else if (service instanceof Processor) { 468 // special for processors as we need to do some extra work 469 return getManagedObjectForProcessor(context, (Processor) service, route); 470 } else if (service instanceof ThrottlingInflightRoutePolicy) { 471 answer = new ManagedThrottlingInflightRoutePolicy(context, (ThrottlingInflightRoutePolicy) service); 472 } else if (service instanceof ThrottlingExceptionRoutePolicy) { 473 answer = new ManagedThrottlingExceptionRoutePolicy(context, (ThrottlingExceptionRoutePolicy) service); 474 } else if (service instanceof ConsumerCache) { 475 answer = new ManagedConsumerCache(context, (ConsumerCache) service); 476 } else if (service instanceof ProducerCache) { 477 answer = new ManagedProducerCache(context, (ProducerCache) service); 478 } else if (service instanceof DefaultEndpointRegistry) { 479 answer = new ManagedEndpointRegistry(context, (DefaultEndpointRegistry) service); 480 } else if (service instanceof TypeConverterRegistry) { 481 answer = new ManagedTypeConverterRegistry(context, (TypeConverterRegistry) service); 482 } else if (service instanceof RestRegistry) { 483 answer = new ManagedRestRegistry(context, (RestRegistry) service); 484 } else if (service instanceof InflightRepository) { 485 answer = new ManagedInflightRepository(context, (InflightRepository) service); 486 } else if (service instanceof AsyncProcessorAwaitManager) { 487 answer = new ManagedAsyncProcessorAwaitManager(context, (AsyncProcessorAwaitManager) service); 488 } else if (service instanceof RuntimeEndpointRegistry) { 489 answer = new ManagedRuntimeEndpointRegistry(context, (RuntimeEndpointRegistry) service); 490 } else if (service instanceof StreamCachingStrategy) { 491 answer = new ManagedStreamCachingStrategy(context, (StreamCachingStrategy) service); 492 } else if (service instanceof EventNotifier) { 493 answer = getManagementObjectStrategy().getManagedObjectForEventNotifier(context, (EventNotifier) service); 494 } else if (service != null) { 495 // fallback as generic service 496 answer = getManagementObjectStrategy().getManagedObjectForService(context, service); 497 } 498 499 if (answer != null && answer instanceof ManagedService) { 500 ManagedService ms = (ManagedService) answer; 501 ms.setRoute(route); 502 ms.init(getManagementStrategy()); 503 } 504 505 return answer; 506 } 507 508 private Object getManagedObjectForProcessor(CamelContext context, Processor processor, Route route) { 509 // a bit of magic here as the processors we want to manage have already been registered 510 // in the wrapped processors map when Camel have instrumented the route on route initialization 511 // so the idea is now to only manage the processors from the map 512 KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = wrappedProcessors.get(processor); 513 if (holder == null) { 514 // skip as its not an well known processor we want to manage anyway, such as Channel/UnitOfWork/Pipeline etc. 515 return null; 516 } 517 518 // get the managed object as it can be a specialized type such as a Delayer/Throttler etc. 519 Object managedObject = getManagementObjectStrategy().getManagedObjectForProcessor(context, processor, holder.getKey(), route); 520 // only manage if we have a name for it as otherwise we do not want to manage it anyway 521 if (managedObject != null) { 522 // is it a performance counter then we need to set our counter 523 if (managedObject instanceof PerformanceCounter) { 524 InstrumentationProcessor counter = holder.getValue(); 525 if (counter != null) { 526 // change counter to us 527 counter.setCounter(managedObject); 528 } 529 } 530 } 531 532 return managedObject; 533 } 534 535 public void onRoutesAdd(Collection<Route> routes) { 536 for (Route route : routes) { 537 538 // if we are starting CamelContext or either of the two options has been 539 // enabled, then enlist the route as a known route 540 if (getCamelContext().getStatus().isStarting() 541 || getManagementStrategy().getManagementAgent().getRegisterAlways() 542 || getManagementStrategy().getManagementAgent().getRegisterNewRoutes()) { 543 // register as known route id 544 knowRouteIds.add(route.getId()); 545 } 546 547 if (!shouldRegister(route, route)) { 548 // avoid registering if not needed, skip to next route 549 continue; 550 } 551 552 Object mr = getManagementObjectStrategy().getManagedObjectForRoute(camelContext, route); 553 554 // skip already managed routes, for example if the route has been restarted 555 if (getManagementStrategy().isManaged(mr, null)) { 556 LOG.trace("The route is already managed: {}", route); 557 continue; 558 } 559 560 // get the wrapped instrumentation processor from this route 561 // and set me as the counter 562 if (route instanceof EventDrivenConsumerRoute) { 563 EventDrivenConsumerRoute edcr = (EventDrivenConsumerRoute) route; 564 Processor processor = edcr.getProcessor(); 565 if (processor instanceof CamelInternalProcessor && mr instanceof ManagedRoute) { 566 CamelInternalProcessor internal = (CamelInternalProcessor) processor; 567 ManagedRoute routeMBean = (ManagedRoute) mr; 568 569 CamelInternalProcessor.InstrumentationAdvice task = internal.getAdvice(CamelInternalProcessor.InstrumentationAdvice.class); 570 if (task != null) { 571 // we need to wrap the counter with the camel context so we get stats updated on the context as well 572 if (camelContextMBean != null) { 573 CompositePerformanceCounter wrapper = new CompositePerformanceCounter(routeMBean, camelContextMBean); 574 task.setCounter(wrapper); 575 } else { 576 task.setCounter(routeMBean); 577 } 578 } 579 } 580 } 581 582 try { 583 manageObject(mr); 584 } catch (JMException e) { 585 LOG.warn("Could not register Route MBean", e); 586 } catch (Exception e) { 587 LOG.warn("Could not create Route MBean", e); 588 } 589 } 590 } 591 592 public void onRoutesRemove(Collection<Route> routes) { 593 // the agent hasn't been started 594 if (!initialized) { 595 return; 596 } 597 598 for (Route route : routes) { 599 Object mr = getManagementObjectStrategy().getManagedObjectForRoute(camelContext, route); 600 601 // skip unmanaged routes 602 if (!getManagementStrategy().isManaged(mr, null)) { 603 LOG.trace("The route is not managed: {}", route); 604 continue; 605 } 606 607 try { 608 unmanageObject(mr); 609 } catch (Exception e) { 610 LOG.warn("Could not unregister Route MBean", e); 611 } 612 613 // remove from known routes ids, as the route has been removed 614 knowRouteIds.remove(route.getId()); 615 } 616 617 // after the routes has been removed, we should clear the wrapped processors as we no longer need them 618 // as they were just a provisional map used during creation of routes 619 removeWrappedProcessorsForRoutes(routes); 620 } 621 622 public void onErrorHandlerAdd(RouteContext routeContext, Processor errorHandler, ErrorHandlerFactory errorHandlerBuilder) { 623 if (!shouldRegister(errorHandler, null)) { 624 // avoid registering if not needed 625 return; 626 } 627 628 Object me = getManagementObjectStrategy().getManagedObjectForErrorHandler(camelContext, routeContext, errorHandler, errorHandlerBuilder); 629 630 // skip already managed services, for example if a route has been restarted 631 if (getManagementStrategy().isManaged(me, null)) { 632 LOG.trace("The error handler builder is already managed: {}", errorHandlerBuilder); 633 return; 634 } 635 636 try { 637 manageObject(me); 638 } catch (Exception e) { 639 LOG.warn("Could not register error handler builder: " + errorHandlerBuilder + " as ErrorHandler MBean.", e); 640 } 641 } 642 643 public void onErrorHandlerRemove(RouteContext routeContext, Processor errorHandler, ErrorHandlerFactory errorHandlerBuilder) { 644 if (!initialized) { 645 return; 646 } 647 648 Object me = getManagementObjectStrategy().getManagedObjectForErrorHandler(camelContext, routeContext, errorHandler, errorHandlerBuilder); 649 if (me != null) { 650 try { 651 unmanageObject(me); 652 } catch (Exception e) { 653 LOG.warn("Could not unregister error handler: " + me + " as ErrorHandler MBean.", e); 654 } 655 } 656 } 657 658 public void onThreadPoolAdd(CamelContext camelContext, ThreadPoolExecutor threadPool, String id, 659 String sourceId, String routeId, String threadPoolProfileId) { 660 661 if (!shouldRegister(threadPool, null)) { 662 // avoid registering if not needed 663 return; 664 } 665 666 Object mtp = getManagementObjectStrategy().getManagedObjectForThreadPool(camelContext, threadPool, id, sourceId, routeId, threadPoolProfileId); 667 668 // skip already managed services, for example if a route has been restarted 669 if (getManagementStrategy().isManaged(mtp, null)) { 670 LOG.trace("The thread pool is already managed: {}", threadPool); 671 return; 672 } 673 674 try { 675 manageObject(mtp); 676 // store a reference so we can unmanage from JMX when the thread pool is removed 677 // we need to keep track here, as we cannot re-construct the thread pool ObjectName when removing the thread pool 678 managedThreadPools.put(threadPool, mtp); 679 } catch (Exception e) { 680 LOG.warn("Could not register thread pool: " + threadPool + " as ThreadPool MBean.", e); 681 } 682 } 683 684 public void onThreadPoolRemove(CamelContext camelContext, ThreadPoolExecutor threadPool) { 685 if (!initialized) { 686 return; 687 } 688 689 // lookup the thread pool and remove it from JMX 690 Object mtp = managedThreadPools.remove(threadPool); 691 if (mtp != null) { 692 // skip unmanaged routes 693 if (!getManagementStrategy().isManaged(mtp, null)) { 694 LOG.trace("The thread pool is not managed: {}", threadPool); 695 return; 696 } 697 698 try { 699 unmanageObject(mtp); 700 } catch (Exception e) { 701 LOG.warn("Could not unregister ThreadPool MBean", e); 702 } 703 } 704 } 705 706 public void onRouteContextCreate(RouteContext routeContext) { 707 if (!initialized) { 708 return; 709 } 710 711 // Create a map (ProcessorType -> PerformanceCounter) 712 // to be passed to InstrumentationInterceptStrategy. 713 Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters = 714 new HashMap<ProcessorDefinition<?>, PerformanceCounter>(); 715 716 // Each processor in a route will have its own performance counter. 717 // These performance counter will be embedded to InstrumentationProcessor 718 // and wrap the appropriate processor by InstrumentationInterceptStrategy. 719 RouteDefinition route = routeContext.getRoute(); 720 721 // register performance counters for all processors and its children 722 for (ProcessorDefinition<?> processor : route.getOutputs()) { 723 registerPerformanceCounters(routeContext, processor, registeredCounters); 724 } 725 726 // set this managed intercept strategy that executes the JMX instrumentation for performance metrics 727 // so our registered counters can be used for fine grained performance instrumentation 728 routeContext.setManagedInterceptStrategy(new InstrumentationInterceptStrategy(registeredCounters, wrappedProcessors)); 729 } 730 731 /** 732 * Removes the wrapped processors for the given routes, as they are no longer in use. 733 * <p/> 734 * This is needed to avoid accumulating memory, if a lot of routes is being added and removed. 735 * 736 * @param routes the routes 737 */ 738 private void removeWrappedProcessorsForRoutes(Collection<Route> routes) { 739 // loop the routes, and remove the route associated wrapped processors, as they are no longer in use 740 for (Route route : routes) { 741 String id = route.getId(); 742 743 Iterator<KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> it = wrappedProcessors.values().iterator(); 744 while (it.hasNext()) { 745 KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = it.next(); 746 RouteDefinition def = ProcessorDefinitionHelper.getRoute(holder.getKey()); 747 if (def != null && id.equals(def.getId())) { 748 it.remove(); 749 } 750 } 751 } 752 753 } 754 755 private void registerPerformanceCounters(RouteContext routeContext, ProcessorDefinition<?> processor, 756 Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters) { 757 758 // traverse children if any exists 759 List<ProcessorDefinition<?>> children = processor.getOutputs(); 760 for (ProcessorDefinition<?> child : children) { 761 registerPerformanceCounters(routeContext, child, registeredCounters); 762 } 763 764 // skip processors that should not be registered 765 if (!registerProcessor(processor)) { 766 return; 767 } 768 769 // okay this is a processor we would like to manage so create the 770 // a delegate performance counter that acts as the placeholder in the interceptor 771 // that then delegates to the real mbean which we register later in the onServiceAdd method 772 DelegatePerformanceCounter pc = new DelegatePerformanceCounter(); 773 // set statistics enabled depending on the option 774 boolean enabled = camelContext.getManagementStrategy().getManagementAgent().getStatisticsLevel().isDefaultOrExtended(); 775 pc.setStatisticsEnabled(enabled); 776 777 // and add it as a a registered counter that will be used lazy when Camel 778 // does the instrumentation of the route and adds the InstrumentationProcessor 779 // that does the actual performance metrics gatherings at runtime 780 registeredCounters.put(processor, pc); 781 } 782 783 /** 784 * Should the given processor be registered. 785 */ 786 protected boolean registerProcessor(ProcessorDefinition<?> processor) { 787 // skip on exception 788 if (processor instanceof OnExceptionDefinition) { 789 return false; 790 } 791 // skip on completion 792 if (processor instanceof OnCompletionDefinition) { 793 return false; 794 } 795 // skip intercept 796 if (processor instanceof InterceptDefinition) { 797 return false; 798 } 799 // skip aop 800 if (processor instanceof AOPDefinition) { 801 return false; 802 } 803 // skip policy 804 if (processor instanceof PolicyDefinition) { 805 return false; 806 } 807 808 // only if custom id assigned 809 boolean only = getManagementStrategy().getManagementAgent().getOnlyRegisterProcessorWithCustomId() != null 810 && getManagementStrategy().getManagementAgent().getOnlyRegisterProcessorWithCustomId(); 811 if (only) { 812 return processor.hasCustomIdAssigned(); 813 } 814 815 // use customer filter 816 return getManagementStrategy().manageProcessor(processor); 817 } 818 819 private ManagementStrategy getManagementStrategy() { 820 ObjectHelper.notNull(camelContext, "CamelContext"); 821 return camelContext.getManagementStrategy(); 822 } 823 824 private ManagementObjectStrategy getManagementObjectStrategy() { 825 ObjectHelper.notNull(camelContext, "CamelContext"); 826 return camelContext.getManagementStrategy().getManagementObjectStrategy(); 827 } 828 829 /** 830 * Strategy for managing the object 831 * 832 * @param me the managed object 833 * @throws Exception is thrown if error registering the object for management 834 */ 835 protected void manageObject(Object me) throws Exception { 836 getManagementStrategy().manageObject(me); 837 if (me instanceof TimerListener) { 838 TimerListener timer = (TimerListener) me; 839 loadTimer.addTimerListener(timer); 840 } 841 } 842 843 /** 844 * Un-manages the object. 845 * 846 * @param me the managed object 847 * @throws Exception is thrown if error unregistering the managed object 848 */ 849 protected void unmanageObject(Object me) throws Exception { 850 if (me instanceof TimerListener) { 851 TimerListener timer = (TimerListener) me; 852 loadTimer.removeTimerListener(timer); 853 } 854 getManagementStrategy().unmanageObject(me); 855 } 856 857 /** 858 * Whether or not to register the mbean. 859 * <p/> 860 * The {@link ManagementAgent} has options which controls when to register. 861 * This allows us to only register mbeans accordingly. For example by default any 862 * dynamic endpoints is not registered. This avoids to register excessive mbeans, which 863 * most often is not desired. 864 * 865 * @param service the object to register 866 * @param route an optional route the mbean is associated with, can be <tt>null</tt> 867 * @return <tt>true</tt> to register, <tt>false</tt> to skip registering 868 */ 869 protected boolean shouldRegister(Object service, Route route) { 870 // the agent hasn't been started 871 if (!initialized) { 872 return false; 873 } 874 875 LOG.trace("Checking whether to register {} from route: {}", service, route); 876 877 ManagementAgent agent = getManagementStrategy().getManagementAgent(); 878 if (agent == null) { 879 // do not register if no agent 880 return false; 881 } 882 883 // always register if we are starting CamelContext 884 if (getCamelContext().getStatus().isStarting()) { 885 return true; 886 } 887 888 // always register if we are setting up routes 889 if (getCamelContext().isSetupRoutes()) { 890 return true; 891 } 892 893 // register if always is enabled 894 if (agent.getRegisterAlways()) { 895 return true; 896 } 897 898 // is it a known route then always accept 899 if (route != null && knowRouteIds.contains(route.getId())) { 900 return true; 901 } 902 903 // only register if we are starting a new route, and current thread is in starting routes mode 904 if (agent.getRegisterNewRoutes()) { 905 // no specific route, then fallback to see if this thread is starting routes 906 // which is kept as state on the camel context 907 return getCamelContext().isStartingRoutes(); 908 } 909 910 return false; 911 } 912 913 @Override 914 protected void doStart() throws Exception { 915 ObjectHelper.notNull(camelContext, "CamelContext"); 916 917 // defer starting the timer manager until CamelContext has been fully started 918 camelContext.addStartupListener(loadTimerStartupListener); 919 } 920 921 private final class TimerListenerManagerStartupListener implements StartupListener { 922 923 @Override 924 public void onCamelContextStarted(CamelContext context, boolean alreadyStarted) throws Exception { 925 // we are disabled either if configured explicit, or if level is off 926 boolean load = camelContext.getManagementStrategy().getManagementAgent().getLoadStatisticsEnabled() != null 927 && camelContext.getManagementStrategy().getManagementAgent().getLoadStatisticsEnabled(); 928 boolean disabled = !load || camelContext.getManagementStrategy().getStatisticsLevel() == ManagementStatisticsLevel.Off; 929 930 LOG.debug("Load performance statistics {}", disabled ? "disabled" : "enabled"); 931 if (!disabled) { 932 // must use 1 sec interval as the load statistics is based on 1 sec calculations 933 loadTimer.setInterval(1000); 934 // we have to defer enlisting timer lister manager as a service until CamelContext has been started 935 getCamelContext().addService(loadTimer); 936 } 937 } 938 } 939 940 @Override 941 protected void doStop() throws Exception { 942 initialized = false; 943 knowRouteIds.clear(); 944 preServices.clear(); 945 wrappedProcessors.clear(); 946 managedTracers.clear(); 947 managedBacklogTracers.clear(); 948 managedBacklogDebuggers.clear(); 949 managedThreadPools.clear(); 950 } 951 952 /** 953 * Class which holds any pre registration details. 954 * 955 * @see org.apache.camel.management.DefaultManagementLifecycleStrategy#enlistPreRegisteredServices() 956 */ 957 private static final class PreRegisterService { 958 959 private String name; 960 private Component component; 961 private Endpoint endpoint; 962 private CamelContext camelContext; 963 private Service service; 964 private Route route; 965 966 public void onComponentAdd(String name, Component component) { 967 this.name = name; 968 this.component = component; 969 } 970 971 public void onEndpointAdd(Endpoint endpoint) { 972 this.endpoint = endpoint; 973 } 974 975 public void onServiceAdd(CamelContext camelContext, Service service, Route route) { 976 this.camelContext = camelContext; 977 this.service = service; 978 this.route = route; 979 } 980 981 public String getName() { 982 return name; 983 } 984 985 public Component getComponent() { 986 return component; 987 } 988 989 public Endpoint getEndpoint() { 990 return endpoint; 991 } 992 993 public CamelContext getCamelContext() { 994 return camelContext; 995 } 996 997 public Service getService() { 998 return service; 999 } 1000 1001 public Route getRoute() { 1002 return route; 1003 } 1004 } 1005 1006} 1007