001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.impl; 018 019import java.util.Map; 020 021import org.apache.camel.AsyncCallback; 022import org.apache.camel.AsyncProcessor; 023import org.apache.camel.AsyncProducerCallback; 024import org.apache.camel.CamelContext; 025import org.apache.camel.Endpoint; 026import org.apache.camel.Exchange; 027import org.apache.camel.ExchangePattern; 028import org.apache.camel.FailedToCreateProducerException; 029import org.apache.camel.Processor; 030import org.apache.camel.Producer; 031import org.apache.camel.ProducerCallback; 032import org.apache.camel.ServicePoolAware; 033import org.apache.camel.processor.UnitOfWorkProducer; 034import org.apache.camel.spi.EndpointUtilizationStatistics; 035import org.apache.camel.spi.ServicePool; 036import org.apache.camel.support.ServiceSupport; 037import org.apache.camel.util.AsyncProcessorConverterHelper; 038import org.apache.camel.util.CamelContextHelper; 039import org.apache.camel.util.EventHelper; 040import org.apache.camel.util.LRUCache; 041import org.apache.camel.util.ServiceHelper; 042import org.apache.camel.util.StopWatch; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046/** 047 * Cache containing created {@link Producer}. 048 * 049 * @version 050 */ 051public class ProducerCache extends ServiceSupport { 052 private static final Logger LOG = LoggerFactory.getLogger(ProducerCache.class); 053 054 private final CamelContext camelContext; 055 private final ServicePool<Endpoint, Producer> pool; 056 private final Map<String, Producer> producers; 057 private final Object source; 058 059 private EndpointUtilizationStatistics statistics; 060 private boolean eventNotifierEnabled = true; 061 private boolean extendedStatistics; 062 private int maxCacheSize; 063 private boolean stopServicePool; 064 065 public ProducerCache(Object source, CamelContext camelContext) { 066 this(source, camelContext, CamelContextHelper.getMaximumCachePoolSize(camelContext)); 067 } 068 069 public ProducerCache(Object source, CamelContext camelContext, int cacheSize) { 070 this(source, camelContext, null, createLRUCache(cacheSize)); 071 } 072 073 public ProducerCache(Object source, CamelContext camelContext, Map<String, Producer> cache) { 074 this(source, camelContext, null, cache); 075 } 076 077 public ProducerCache(Object source, CamelContext camelContext, ServicePool<Endpoint, Producer> producerServicePool, Map<String, Producer> cache) { 078 this.source = source; 079 this.camelContext = camelContext; 080 if (producerServicePool == null) { 081 // use shared producer pool which lifecycle is managed by CamelContext 082 this.pool = camelContext.getProducerServicePool(); 083 this.stopServicePool = false; 084 } else { 085 this.pool = producerServicePool; 086 this.stopServicePool = true; 087 } 088 this.producers = cache; 089 if (producers instanceof LRUCache) { 090 maxCacheSize = ((LRUCache) producers).getMaxCacheSize(); 091 } 092 093 // only if JMX is enabled 094 if (camelContext.getManagementStrategy().getManagementAgent() != null) { 095 this.extendedStatistics = camelContext.getManagementStrategy().getManagementAgent().getStatisticsLevel().isExtended(); 096 } else { 097 this.extendedStatistics = false; 098 } 099 } 100 101 public boolean isEventNotifierEnabled() { 102 return eventNotifierEnabled; 103 } 104 105 /** 106 * Whether {@link org.apache.camel.spi.EventNotifier} is enabled 107 */ 108 public void setEventNotifierEnabled(boolean eventNotifierEnabled) { 109 this.eventNotifierEnabled = eventNotifierEnabled; 110 } 111 112 public boolean isExtendedStatistics() { 113 return extendedStatistics; 114 } 115 116 /** 117 * Whether extended JMX statistics is enabled for {@link org.apache.camel.spi.EndpointUtilizationStatistics} 118 */ 119 public void setExtendedStatistics(boolean extendedStatistics) { 120 this.extendedStatistics = extendedStatistics; 121 } 122 123 /** 124 * Creates the {@link LRUCache} to be used. 125 * <p/> 126 * This implementation returns a {@link LRUCache} instance. 127 128 * @param cacheSize the cache size 129 * @return the cache 130 */ 131 protected static LRUCache<String, Producer> createLRUCache(int cacheSize) { 132 // Use a regular cache as we want to ensure that the lifecycle of the producers 133 // being cache is properly handled, such as they are stopped when being evicted 134 // or when this cache is stopped. This is needed as some producers requires to 135 // be stopped so they can shutdown internal resources that otherwise may cause leaks 136 return new LRUCache<String, Producer>(cacheSize); 137 } 138 139 public CamelContext getCamelContext() { 140 return camelContext; 141 } 142 143 /** 144 * Gets the source which uses this cache 145 * 146 * @return the source 147 */ 148 public Object getSource() { 149 return source; 150 } 151 152 /** 153 * Acquires a pooled producer which you <b>must</b> release back again after usage using the 154 * {@link #releaseProducer(org.apache.camel.Endpoint, org.apache.camel.Producer)} method. 155 * 156 * @param endpoint the endpoint 157 * @return the producer 158 */ 159 public Producer acquireProducer(Endpoint endpoint) { 160 return doGetProducer(endpoint, true); 161 } 162 163 /** 164 * Releases an acquired producer back after usage. 165 * 166 * @param endpoint the endpoint 167 * @param producer the producer to release 168 * @throws Exception can be thrown if error stopping producer if that was needed. 169 */ 170 public void releaseProducer(Endpoint endpoint, Producer producer) throws Exception { 171 if (producer instanceof ServicePoolAware) { 172 // release back to the pool 173 pool.release(endpoint, producer); 174 } else if (!producer.isSingleton()) { 175 // stop and shutdown non-singleton producers as we should not leak resources 176 ServiceHelper.stopAndShutdownService(producer); 177 } 178 } 179 180 /** 181 * Starts the {@link Producer} to be used for sending to the given endpoint 182 * <p/> 183 * This can be used to early start the {@link Producer} to ensure it can be created, 184 * such as when Camel is started. This allows to fail fast in case the {@link Producer} 185 * could not be started. 186 * 187 * @param endpoint the endpoint to send the exchange to 188 * @throws Exception is thrown if failed to create or start the {@link Producer} 189 */ 190 public void startProducer(Endpoint endpoint) throws Exception { 191 Producer producer = acquireProducer(endpoint); 192 releaseProducer(endpoint, producer); 193 } 194 195 /** 196 * Sends the exchange to the given endpoint. 197 * <p> 198 * This method will <b>not</b> throw an exception. If processing of the given 199 * Exchange failed then the exception is stored on the provided Exchange 200 * 201 * @param endpoint the endpoint to send the exchange to 202 * @param exchange the exchange to send 203 */ 204 public void send(Endpoint endpoint, Exchange exchange) { 205 sendExchange(endpoint, null, null, exchange); 206 } 207 208 /** 209 * Sends an exchange to an endpoint using a supplied 210 * {@link Processor} to populate the exchange 211 * <p> 212 * This method will <b>not</b> throw an exception. If processing of the given 213 * Exchange failed then the exception is stored on the return Exchange 214 * 215 * @param endpoint the endpoint to send the exchange to 216 * @param processor the transformer used to populate the new exchange 217 * @throws org.apache.camel.CamelExecutionException is thrown if sending failed 218 * @return the exchange 219 */ 220 public Exchange send(Endpoint endpoint, Processor processor) { 221 return sendExchange(endpoint, null, processor, null); 222 } 223 224 /** 225 * Sends an exchange to an endpoint using a supplied 226 * {@link Processor} to populate the exchange 227 * <p> 228 * This method will <b>not</b> throw an exception. If processing of the given 229 * Exchange failed then the exception is stored on the return Exchange 230 * 231 * @param endpoint the endpoint to send the exchange to 232 * @param pattern the message {@link ExchangePattern} such as 233 * {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut} 234 * @param processor the transformer used to populate the new exchange 235 * @return the exchange 236 */ 237 public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) { 238 return sendExchange(endpoint, pattern, processor, null); 239 } 240 241 /** 242 * Sends an exchange to an endpoint using a supplied callback, using the synchronous processing. 243 * <p/> 244 * If an exception was thrown during processing, it would be set on the given Exchange 245 * 246 * @param endpoint the endpoint to send the exchange to 247 * @param exchange the exchange, can be <tt>null</tt> if so then create a new exchange from the producer 248 * @param pattern the exchange pattern, can be <tt>null</tt> 249 * @param callback the callback 250 * @return the response from the callback 251 * @see #doInAsyncProducer(org.apache.camel.Endpoint, org.apache.camel.Exchange, org.apache.camel.ExchangePattern, org.apache.camel.AsyncCallback, org.apache.camel.AsyncProducerCallback) 252 */ 253 public <T> T doInProducer(Endpoint endpoint, Exchange exchange, ExchangePattern pattern, ProducerCallback<T> callback) { 254 T answer = null; 255 256 // get the producer and we do not mind if its pooled as we can handle returning it back to the pool 257 Producer producer = doGetProducer(endpoint, true); 258 259 if (producer == null) { 260 if (isStopped()) { 261 LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange); 262 return null; 263 } else { 264 throw new IllegalStateException("No producer, this processor has not been started: " + this); 265 } 266 } 267 268 try { 269 // invoke the callback 270 answer = callback.doInProducer(producer, exchange, pattern); 271 } catch (Throwable e) { 272 if (exchange != null) { 273 exchange.setException(e); 274 } 275 } finally { 276 if (producer instanceof ServicePoolAware) { 277 // release back to the pool 278 pool.release(endpoint, producer); 279 } else if (!producer.isSingleton()) { 280 // stop and shutdown non-singleton producers as we should not leak resources 281 try { 282 ServiceHelper.stopAndShutdownService(producer); 283 } catch (Exception e) { 284 // ignore and continue 285 LOG.warn("Error stopping/shutting down producer: " + producer, e); 286 } 287 } 288 } 289 290 return answer; 291 } 292 293 /** 294 * Sends an exchange to an endpoint using a supplied callback supporting the asynchronous routing engine. 295 * <p/> 296 * If an exception was thrown during processing, it would be set on the given Exchange 297 * 298 * @param endpoint the endpoint to send the exchange to 299 * @param exchange the exchange, can be <tt>null</tt> if so then create a new exchange from the producer 300 * @param pattern the exchange pattern, can be <tt>null</tt> 301 * @param callback the asynchronous callback 302 * @param producerCallback the producer template callback to be executed 303 * @return (doneSync) <tt>true</tt> to continue execute synchronously, <tt>false</tt> to continue being executed asynchronously 304 */ 305 public boolean doInAsyncProducer(final Endpoint endpoint, final Exchange exchange, final ExchangePattern pattern, 306 final AsyncCallback callback, final AsyncProducerCallback producerCallback) { 307 308 Producer target; 309 try { 310 // get the producer and we do not mind if its pooled as we can handle returning it back to the pool 311 target = doGetProducer(endpoint, true); 312 313 if (target == null) { 314 if (isStopped()) { 315 LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange); 316 callback.done(true); 317 return true; 318 } else { 319 exchange.setException(new IllegalStateException("No producer, this processor has not been started: " + this)); 320 callback.done(true); 321 return true; 322 } 323 } 324 } catch (Throwable e) { 325 exchange.setException(e); 326 callback.done(true); 327 return true; 328 } 329 330 final Producer producer = target; 331 332 // record timing for sending the exchange using the producer 333 final StopWatch watch = eventNotifierEnabled && exchange != null ? new StopWatch() : null; 334 335 try { 336 if (eventNotifierEnabled && exchange != null) { 337 EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint); 338 } 339 // invoke the callback 340 AsyncProcessor asyncProcessor = AsyncProcessorConverterHelper.convert(producer); 341 return producerCallback.doInAsyncProducer(producer, asyncProcessor, exchange, pattern, new AsyncCallback() { 342 @Override 343 public void done(boolean doneSync) { 344 try { 345 if (eventNotifierEnabled && watch != null) { 346 long timeTaken = watch.stop(); 347 // emit event that the exchange was sent to the endpoint 348 EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); 349 } 350 351 if (producer instanceof ServicePoolAware) { 352 // release back to the pool 353 pool.release(endpoint, producer); 354 } else if (!producer.isSingleton()) { 355 // stop and shutdown non-singleton producers as we should not leak resources 356 try { 357 ServiceHelper.stopAndShutdownService(producer); 358 } catch (Exception e) { 359 // ignore and continue 360 LOG.warn("Error stopping/shutting down producer: " + producer, e); 361 } 362 } 363 } finally { 364 callback.done(doneSync); 365 } 366 } 367 }); 368 } catch (Throwable e) { 369 // ensure exceptions is caught and set on the exchange 370 if (exchange != null) { 371 exchange.setException(e); 372 } 373 callback.done(true); 374 return true; 375 } 376 } 377 378 protected Exchange sendExchange(final Endpoint endpoint, ExchangePattern pattern, 379 final Processor processor, Exchange exchange) { 380 return doInProducer(endpoint, exchange, pattern, new ProducerCallback<Exchange>() { 381 public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) { 382 if (exchange == null) { 383 exchange = pattern != null ? producer.getEndpoint().createExchange(pattern) : producer.getEndpoint().createExchange(); 384 } 385 386 if (processor != null) { 387 // lets populate using the processor callback 388 try { 389 processor.process(exchange); 390 } catch (Exception e) { 391 // populate failed so return 392 exchange.setException(e); 393 return exchange; 394 } 395 } 396 397 // now lets dispatch 398 LOG.debug(">>>> {} {}", endpoint, exchange); 399 400 // set property which endpoint we send to 401 exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri()); 402 403 // send the exchange using the processor 404 StopWatch watch = null; 405 try { 406 if (eventNotifierEnabled) { 407 watch = new StopWatch(); 408 EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint); 409 } 410 // ensure we run in an unit of work 411 Producer target = new UnitOfWorkProducer(producer); 412 target.process(exchange); 413 } catch (Throwable e) { 414 // ensure exceptions is caught and set on the exchange 415 exchange.setException(e); 416 } finally { 417 // emit event that the exchange was sent to the endpoint 418 if (eventNotifierEnabled && watch != null) { 419 long timeTaken = watch.stop(); 420 EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); 421 } 422 } 423 return exchange; 424 } 425 }); 426 } 427 428 protected synchronized Producer doGetProducer(Endpoint endpoint, boolean pooled) { 429 String key = endpoint.getEndpointUri(); 430 Producer answer = producers.get(key); 431 if (pooled && answer == null) { 432 // try acquire from connection pool 433 answer = pool.acquire(endpoint); 434 } 435 436 if (answer == null) { 437 // create a new producer 438 try { 439 answer = endpoint.createProducer(); 440 // add as service which will also start the service 441 // (false => we and handling the lifecycle of the producer in this cache) 442 getCamelContext().addService(answer, false); 443 } catch (Exception e) { 444 throw new FailedToCreateProducerException(endpoint, e); 445 } 446 447 // add producer to cache or pool if applicable 448 if (pooled && answer instanceof ServicePoolAware) { 449 LOG.debug("Adding to producer service pool with key: {} for producer: {}", endpoint, answer); 450 answer = pool.addAndAcquire(endpoint, answer); 451 } else if (answer.isSingleton()) { 452 LOG.debug("Adding to producer cache with key: {} for producer: {}", endpoint, answer); 453 producers.put(key, answer); 454 } 455 } 456 457 if (answer != null) { 458 // record statistics 459 if (extendedStatistics) { 460 statistics.onHit(key); 461 } 462 } 463 464 return answer; 465 } 466 467 protected void doStart() throws Exception { 468 if (extendedStatistics) { 469 int max = maxCacheSize == 0 ? CamelContextHelper.getMaximumCachePoolSize(camelContext) : maxCacheSize; 470 statistics = new DefaultEndpointUtilizationStatistics(max); 471 } 472 473 ServiceHelper.startServices(producers.values()); 474 ServiceHelper.startServices(statistics, pool); 475 } 476 477 protected void doStop() throws Exception { 478 // when stopping we intend to shutdown 479 ServiceHelper.stopAndShutdownService(statistics); 480 if (stopServicePool) { 481 ServiceHelper.stopAndShutdownService(pool); 482 } 483 try { 484 ServiceHelper.stopAndShutdownServices(producers.values()); 485 } finally { 486 // ensure producers are removed, and also from JMX 487 for (Producer producer : producers.values()) { 488 getCamelContext().removeService(producer); 489 } 490 } 491 producers.clear(); 492 if (statistics != null) { 493 statistics.clear(); 494 } 495 } 496 497 /** 498 * Returns the current size of the cache 499 * 500 * @return the current size 501 */ 502 public int size() { 503 int size = producers.size(); 504 size += pool.size(); 505 506 LOG.trace("size = {}", size); 507 return size; 508 } 509 510 /** 511 * Gets the maximum cache size (capacity). 512 * <p/> 513 * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used. 514 * 515 * @return the capacity 516 */ 517 public int getCapacity() { 518 int capacity = -1; 519 if (producers instanceof LRUCache) { 520 LRUCache<String, Producer> cache = (LRUCache<String, Producer>)producers; 521 capacity = cache.getMaxCacheSize(); 522 } 523 return capacity; 524 } 525 526 /** 527 * Gets the cache hits statistic 528 * <p/> 529 * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used. 530 * 531 * @return the hits 532 */ 533 public long getHits() { 534 long hits = -1; 535 if (producers instanceof LRUCache) { 536 LRUCache<String, Producer> cache = (LRUCache<String, Producer>)producers; 537 hits = cache.getHits(); 538 } 539 return hits; 540 } 541 542 /** 543 * Gets the cache misses statistic 544 * <p/> 545 * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used. 546 * 547 * @return the misses 548 */ 549 public long getMisses() { 550 long misses = -1; 551 if (producers instanceof LRUCache) { 552 LRUCache<String, Producer> cache = (LRUCache<String, Producer>)producers; 553 misses = cache.getMisses(); 554 } 555 return misses; 556 } 557 558 /** 559 * Gets the cache evicted statistic 560 * <p/> 561 * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used. 562 * 563 * @return the evicted 564 */ 565 public long getEvicted() { 566 long evicted = -1; 567 if (producers instanceof LRUCache) { 568 LRUCache<String, Producer> cache = (LRUCache<String, Producer>)producers; 569 evicted = cache.getEvicted(); 570 } 571 return evicted; 572 } 573 574 /** 575 * Resets the cache statistics 576 */ 577 public void resetCacheStatistics() { 578 if (producers instanceof LRUCache) { 579 LRUCache<String, Producer> cache = (LRUCache<String, Producer>)producers; 580 cache.resetStatistics(); 581 } 582 if (statistics != null) { 583 statistics.clear(); 584 } 585 } 586 587 /** 588 * Purges this cache 589 */ 590 public synchronized void purge() { 591 producers.clear(); 592 pool.purge(); 593 if (statistics != null) { 594 statistics.clear(); 595 } 596 } 597 598 public EndpointUtilizationStatistics getEndpointUtilizationStatistics() { 599 return statistics; 600 } 601 602 @Override 603 public String toString() { 604 return "ProducerCache for source: " + source + ", capacity: " + getCapacity(); 605 } 606}