001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.impl; 018 019import java.util.Map; 020import java.util.concurrent.CompletableFuture; 021 022import org.apache.camel.AsyncCallback; 023import org.apache.camel.AsyncProcessor; 024import org.apache.camel.AsyncProducerCallback; 025import org.apache.camel.CamelContext; 026import org.apache.camel.Endpoint; 027import org.apache.camel.Exchange; 028import org.apache.camel.ExchangePattern; 029import org.apache.camel.FailedToCreateProducerException; 030import org.apache.camel.Processor; 031import org.apache.camel.Producer; 032import org.apache.camel.ProducerCallback; 033import org.apache.camel.ServicePoolAware; 034import org.apache.camel.processor.CamelInternalProcessor; 035import org.apache.camel.processor.SharedCamelInternalProcessor; 036import org.apache.camel.spi.EndpointUtilizationStatistics; 037import org.apache.camel.spi.ServicePool; 038import org.apache.camel.support.ServiceSupport; 039import org.apache.camel.util.AsyncProcessorConverterHelper; 040import org.apache.camel.util.CamelContextHelper; 041import org.apache.camel.util.EventHelper; 042import org.apache.camel.util.LRUCache; 043import org.apache.camel.util.LRUCacheFactory; 044import org.apache.camel.util.ServiceHelper; 045import org.apache.camel.util.StopWatch; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049/** 050 * Cache containing created {@link Producer}. 051 * 052 * @version 053 */ 054public class ProducerCache extends ServiceSupport { 055 private static final Logger LOG = LoggerFactory.getLogger(ProducerCache.class); 056 057 private final CamelContext camelContext; 058 private final ServicePool<Endpoint, Producer> pool; 059 private final Map<String, Producer> producers; 060 private final Object source; 061 private final SharedCamelInternalProcessor internalProcessor; 062 063 private EndpointUtilizationStatistics statistics; 064 private boolean eventNotifierEnabled = true; 065 private boolean extendedStatistics; 066 private int maxCacheSize; 067 private boolean stopServicePool; 068 069 public ProducerCache(Object source, CamelContext camelContext) { 070 this(source, camelContext, CamelContextHelper.getMaximumCachePoolSize(camelContext)); 071 } 072 073 public ProducerCache(Object source, CamelContext camelContext, int cacheSize) { 074 this(source, camelContext, null, createLRUCache(cacheSize)); 075 } 076 077 public ProducerCache(Object source, CamelContext camelContext, Map<String, Producer> cache) { 078 this(source, camelContext, null, cache); 079 } 080 081 public ProducerCache(Object source, CamelContext camelContext, ServicePool<Endpoint, Producer> producerServicePool, Map<String, Producer> cache) { 082 this.source = source; 083 this.camelContext = camelContext; 084 if (producerServicePool == null) { 085 // use shared producer pool which lifecycle is managed by CamelContext 086 this.pool = camelContext.getProducerServicePool(); 087 this.stopServicePool = false; 088 } else { 089 this.pool = producerServicePool; 090 this.stopServicePool = true; 091 } 092 this.producers = cache; 093 if (producers instanceof LRUCache) { 094 maxCacheSize = ((LRUCache) producers).getMaxCacheSize(); 095 } 096 097 // only if JMX is enabled 098 if (camelContext.getManagementStrategy().getManagementAgent() != null) { 099 this.extendedStatistics = camelContext.getManagementStrategy().getManagementAgent().getStatisticsLevel().isExtended(); 100 } else { 101 this.extendedStatistics = false; 102 } 103 104 // internal processor used for sending 105 internalProcessor = new SharedCamelInternalProcessor(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null)); 106 } 107 108 public boolean isEventNotifierEnabled() { 109 return eventNotifierEnabled; 110 } 111 112 /** 113 * Whether {@link org.apache.camel.spi.EventNotifier} is enabled 114 */ 115 public void setEventNotifierEnabled(boolean eventNotifierEnabled) { 116 this.eventNotifierEnabled = eventNotifierEnabled; 117 } 118 119 public boolean isExtendedStatistics() { 120 return extendedStatistics; 121 } 122 123 /** 124 * Whether extended JMX statistics is enabled for {@link org.apache.camel.spi.EndpointUtilizationStatistics} 125 */ 126 public void setExtendedStatistics(boolean extendedStatistics) { 127 this.extendedStatistics = extendedStatistics; 128 } 129 130 /** 131 * Creates the {@link LRUCache} to be used. 132 * <p/> 133 * This implementation returns a {@link LRUCache} instance. 134 135 * @param cacheSize the cache size 136 * @return the cache 137 */ 138 @SuppressWarnings("unchecked") 139 protected static LRUCache<String, Producer> createLRUCache(int cacheSize) { 140 // Use a regular cache as we want to ensure that the lifecycle of the producers 141 // being cache is properly handled, such as they are stopped when being evicted 142 // or when this cache is stopped. This is needed as some producers requires to 143 // be stopped so they can shutdown internal resources that otherwise may cause leaks 144 return LRUCacheFactory.newLRUCache(cacheSize); 145 } 146 147 public CamelContext getCamelContext() { 148 return camelContext; 149 } 150 151 /** 152 * Gets the source which uses this cache 153 * 154 * @return the source 155 */ 156 public Object getSource() { 157 return source; 158 } 159 160 /** 161 * Acquires a pooled producer which you <b>must</b> release back again after usage using the 162 * {@link #releaseProducer(org.apache.camel.Endpoint, org.apache.camel.Producer)} method. 163 * 164 * @param endpoint the endpoint 165 * @return the producer 166 */ 167 public Producer acquireProducer(Endpoint endpoint) { 168 return doGetProducer(endpoint, true); 169 } 170 171 /** 172 * Releases an acquired producer back after usage. 173 * 174 * @param endpoint the endpoint 175 * @param producer the producer to release 176 * @throws Exception can be thrown if error stopping producer if that was needed. 177 */ 178 public void releaseProducer(Endpoint endpoint, Producer producer) throws Exception { 179 if (producer instanceof ServicePoolAware) { 180 // release back to the pool 181 pool.release(endpoint, producer); 182 } else if (!producer.isSingleton()) { 183 // stop and shutdown non-singleton producers as we should not leak resources 184 ServiceHelper.stopAndShutdownService(producer); 185 } 186 } 187 188 /** 189 * Starts the {@link Producer} to be used for sending to the given endpoint 190 * <p/> 191 * This can be used to early start the {@link Producer} to ensure it can be created, 192 * such as when Camel is started. This allows to fail fast in case the {@link Producer} 193 * could not be started. 194 * 195 * @param endpoint the endpoint to send the exchange to 196 * @throws Exception is thrown if failed to create or start the {@link Producer} 197 */ 198 public void startProducer(Endpoint endpoint) throws Exception { 199 Producer producer = acquireProducer(endpoint); 200 releaseProducer(endpoint, producer); 201 } 202 203 /** 204 * Sends the exchange to the given endpoint. 205 * <p> 206 * This method will <b>not</b> throw an exception. If processing of the given 207 * Exchange failed then the exception is stored on the provided Exchange 208 * 209 * @param endpoint the endpoint to send the exchange to 210 * @param exchange the exchange to send 211 */ 212 public void send(Endpoint endpoint, Exchange exchange) { 213 sendExchange(endpoint, null, null, null, exchange); 214 } 215 216 /** 217 * Sends an exchange to an endpoint using a supplied 218 * {@link Processor} to populate the exchange 219 * <p> 220 * This method will <b>not</b> throw an exception. If processing of the given 221 * Exchange failed then the exception is stored on the return Exchange 222 * 223 * @param endpoint the endpoint to send the exchange to 224 * @param processor the transformer used to populate the new exchange 225 * @throws org.apache.camel.CamelExecutionException is thrown if sending failed 226 * @return the exchange 227 */ 228 public Exchange send(Endpoint endpoint, Processor processor) { 229 return sendExchange(endpoint, null, processor, null, null); 230 } 231 232 /** 233 * Sends an exchange to an endpoint using a supplied 234 * {@link Processor} to populate the exchange 235 * <p> 236 * This method will <b>not</b> throw an exception. If processing of the given 237 * Exchange failed then the exception is stored on the return Exchange 238 * 239 * @param endpoint the endpoint to send the exchange to 240 * @param pattern the message {@link ExchangePattern} such as 241 * {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut} 242 * @param processor the transformer used to populate the new exchange 243 * @return the exchange 244 */ 245 public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) { 246 return sendExchange(endpoint, pattern, processor, null, null); 247 } 248 249 /** 250 * Sends an exchange to an endpoint using a supplied 251 * {@link Processor} to populate the exchange 252 * <p> 253 * This method will <b>not</b> throw an exception. If processing of the given 254 * Exchange failed then the exception is stored on the return Exchange 255 * 256 * @param endpoint the endpoint to send the exchange to 257 * @param pattern the message {@link ExchangePattern} such as 258 * {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut} 259 * @param processor the transformer used to populate the new exchange 260 * @param resultProcessor a processor to process the exchange when the send is complete. 261 * @return the exchange 262 */ 263 public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor, Processor resultProcessor) { 264 return sendExchange(endpoint, pattern, processor, resultProcessor, null); 265 } 266 267 /** 268 * Asynchronously sends an exchange to an endpoint using a supplied 269 * {@link Processor} to populate the exchange 270 * <p> 271 * This method will <b>neither</b> throw an exception <b>nor</b> complete future exceptionally. 272 * If processing of the given Exchange failed then the exception is stored on the return Exchange 273 * 274 * @param endpoint the endpoint to send the exchange to 275 * @param pattern the message {@link ExchangePattern} such as 276 * {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut} 277 * @param processor the transformer used to populate the new exchange 278 * @param resultProcessor a processor to process the exchange when the send is complete. 279 * @param future the preexisting future to complete when processing is done or null if to create new one 280 * @return future that completes with exchange when processing is done. Either passed into future parameter 281 * or new one if parameter was null 282 */ 283 public CompletableFuture<Exchange> asyncSend(Endpoint endpoint, ExchangePattern pattern, Processor processor, Processor resultProcessor, 284 CompletableFuture<Exchange> future) { 285 return asyncSendExchange(endpoint, pattern, processor, resultProcessor, null, future); 286 } 287 288 /** 289 * Asynchronously sends an exchange to an endpoint using a supplied 290 * {@link Processor} to populate the exchange 291 * <p> 292 * This method will <b>neither</b> throw an exception <b>nor</b> complete future exceptionally. 293 * If processing of the given Exchange failed then the exception is stored on the return Exchange 294 * 295 * @param endpoint the endpoint to send the exchange to 296 * @param pattern the message {@link ExchangePattern} such as 297 * {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut} 298 * @param processor the transformer used to populate the new exchange 299 * @param resultProcessor a processor to process the exchange when the send is complete. 300 * @param exchange an exchange to use in processing. Exchange will be created if parameter is null. 301 * @param future the preexisting future to complete when processing is done or null if to create new one 302 * @return future that completes with exchange when processing is done. Either passed into future parameter 303 * or new one if parameter was null 304 */ 305 public CompletableFuture<Exchange> asyncSendExchange(final Endpoint endpoint, ExchangePattern pattern, 306 final Processor processor, final Processor resultProcessor, Exchange exchange, 307 CompletableFuture<Exchange> future) { 308 AsyncCallbackToCompletableFutureAdapter<Exchange> futureAdapter = new AsyncCallbackToCompletableFutureAdapter<>(future, exchange); 309 doInAsyncProducer(endpoint, exchange, pattern, futureAdapter, 310 (producer, asyncProducer, innerExchange, exchangePattern, producerCallback) -> { 311 if (innerExchange == null) { 312 innerExchange = pattern != null 313 ? producer.getEndpoint().createExchange(pattern) 314 : producer.getEndpoint().createExchange(); 315 futureAdapter.setResult(innerExchange); 316 } 317 318 if (processor != null) { 319 // lets populate using the processor callback 320 AsyncProcessor asyncProcessor = AsyncProcessorConverterHelper.convert(processor); 321 try { 322 final Exchange finalExchange = innerExchange; 323 asyncProcessor.process(innerExchange, 324 doneSync -> asyncDispatchExchange(endpoint, producer, resultProcessor, 325 finalExchange, producerCallback)); 326 return false; 327 } catch (Throwable e) { 328 // populate failed so return 329 innerExchange.setException(e); 330 producerCallback.done(true); 331 return true; 332 } 333 } 334 335 return asyncDispatchExchange(endpoint, producer, resultProcessor, innerExchange, producerCallback); 336 }); 337 return futureAdapter.getFuture(); 338 } 339 340 /** 341 * Sends an exchange to an endpoint using a supplied callback, using the synchronous processing. 342 * <p/> 343 * If an exception was thrown during processing, it would be set on the given Exchange 344 * 345 * @param endpoint the endpoint to send the exchange to 346 * @param exchange the exchange, can be <tt>null</tt> if so then create a new exchange from the producer 347 * @param pattern the exchange pattern, can be <tt>null</tt> 348 * @param callback the callback 349 * @return the response from the callback 350 * @see #doInAsyncProducer(org.apache.camel.Endpoint, org.apache.camel.Exchange, org.apache.camel.ExchangePattern, org.apache.camel.AsyncCallback, org.apache.camel.AsyncProducerCallback) 351 */ 352 public <T> T doInProducer(Endpoint endpoint, Exchange exchange, ExchangePattern pattern, ProducerCallback<T> callback) { 353 T answer = null; 354 355 // get the producer and we do not mind if its pooled as we can handle returning it back to the pool 356 Producer producer = doGetProducer(endpoint, true); 357 358 if (producer == null) { 359 if (isStopped()) { 360 LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange); 361 return null; 362 } else { 363 throw new IllegalStateException("No producer, this processor has not been started: " + this); 364 } 365 } 366 367 try { 368 // invoke the callback 369 answer = callback.doInProducer(producer, exchange, pattern); 370 } catch (Throwable e) { 371 if (exchange != null) { 372 exchange.setException(e); 373 } 374 } finally { 375 if (producer instanceof ServicePoolAware) { 376 // release back to the pool 377 pool.release(endpoint, producer); 378 } else if (!producer.isSingleton()) { 379 // stop and shutdown non-singleton producers as we should not leak resources 380 try { 381 ServiceHelper.stopAndShutdownService(producer); 382 } catch (Exception e) { 383 // ignore and continue 384 LOG.warn("Error stopping/shutting down producer: " + producer, e); 385 } 386 } 387 } 388 389 return answer; 390 } 391 392 /** 393 * Sends an exchange to an endpoint using a supplied callback supporting the asynchronous routing engine. 394 * <p/> 395 * If an exception was thrown during processing, it would be set on the given Exchange 396 * 397 * @param endpoint the endpoint to send the exchange to 398 * @param exchange the exchange, can be <tt>null</tt> if so then create a new exchange from the producer 399 * @param pattern the exchange pattern, can be <tt>null</tt> 400 * @param callback the asynchronous callback 401 * @param producerCallback the producer template callback to be executed 402 * @return (doneSync) <tt>true</tt> to continue execute synchronously, <tt>false</tt> to continue being executed asynchronously 403 */ 404 public boolean doInAsyncProducer(final Endpoint endpoint, final Exchange exchange, final ExchangePattern pattern, 405 final AsyncCallback callback, final AsyncProducerCallback producerCallback) { 406 407 Producer target; 408 try { 409 // get the producer and we do not mind if its pooled as we can handle returning it back to the pool 410 target = doGetProducer(endpoint, true); 411 412 if (target == null) { 413 if (isStopped()) { 414 LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange); 415 callback.done(true); 416 return true; 417 } else { 418 exchange.setException(new IllegalStateException("No producer, this processor has not been started: " + this)); 419 callback.done(true); 420 return true; 421 } 422 } 423 } catch (Throwable e) { 424 exchange.setException(e); 425 callback.done(true); 426 return true; 427 } 428 429 final Producer producer = target; 430 431 try { 432 StopWatch sw = null; 433 if (eventNotifierEnabled && exchange != null) { 434 boolean sending = EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint); 435 if (sending) { 436 sw = new StopWatch(); 437 } 438 } 439 440 // record timing for sending the exchange using the producer 441 final StopWatch watch = sw; 442 443 // invoke the callback 444 AsyncProcessor asyncProcessor = AsyncProcessorConverterHelper.convert(producer); 445 return producerCallback.doInAsyncProducer(producer, asyncProcessor, exchange, pattern, doneSync -> { 446 try { 447 if (eventNotifierEnabled && watch != null) { 448 long timeTaken = watch.taken(); 449 // emit event that the exchange was sent to the endpoint 450 EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); 451 } 452 453 if (producer instanceof ServicePoolAware) { 454 // release back to the pool 455 pool.release(endpoint, producer); 456 } else if (!producer.isSingleton()) { 457 // stop and shutdown non-singleton producers as we should not leak resources 458 try { 459 ServiceHelper.stopAndShutdownService(producer); 460 } catch (Exception e) { 461 // ignore and continue 462 LOG.warn("Error stopping/shutting down producer: " + producer, e); 463 } 464 } 465 } finally { 466 callback.done(doneSync); 467 } 468 }); 469 } catch (Throwable e) { 470 // ensure exceptions is caught and set on the exchange 471 if (exchange != null) { 472 exchange.setException(e); 473 } 474 callback.done(true); 475 return true; 476 } 477 } 478 479 protected boolean asyncDispatchExchange(final Endpoint endpoint, Producer producer, 480 final Processor resultProcessor, Exchange exchange, AsyncCallback callback) { 481 // now lets dispatch 482 LOG.debug(">>>> {} {}", endpoint, exchange); 483 484 // set property which endpoint we send to 485 exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri()); 486 487 // send the exchange using the processor 488 try { 489 if (eventNotifierEnabled) { 490 callback = new EventNotifierCallback(callback, exchange, endpoint); 491 } 492 AsyncProcessor target = prepareProducer(producer); 493 // invoke the asynchronous method 494 return internalProcessor.process(exchange, callback, target, resultProcessor); 495 } catch (Throwable e) { 496 // ensure exceptions is caught and set on the exchange 497 exchange.setException(e); 498 callback.done(true); 499 return true; 500 } 501 502 } 503 504 protected Exchange sendExchange(final Endpoint endpoint, ExchangePattern pattern, 505 final Processor processor, final Processor resultProcessor, Exchange exchange) { 506 return doInProducer(endpoint, exchange, pattern, new ProducerCallback<Exchange>() { 507 public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) { 508 if (exchange == null) { 509 exchange = pattern != null ? producer.getEndpoint().createExchange(pattern) : producer.getEndpoint().createExchange(); 510 } 511 512 if (processor != null) { 513 // lets populate using the processor callback 514 try { 515 processor.process(exchange); 516 } catch (Throwable e) { 517 // populate failed so return 518 exchange.setException(e); 519 return exchange; 520 } 521 } 522 523 // now lets dispatch 524 LOG.debug(">>>> {} {}", endpoint, exchange); 525 526 // set property which endpoint we send to 527 exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri()); 528 529 // send the exchange using the processor 530 StopWatch watch = null; 531 try { 532 if (eventNotifierEnabled) { 533 boolean sending = EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint); 534 if (sending) { 535 watch = new StopWatch(); 536 } 537 } 538 539 AsyncProcessor target = prepareProducer(producer); 540 // invoke the synchronous method 541 internalProcessor.process(exchange, target, resultProcessor); 542 543 } catch (Throwable e) { 544 // ensure exceptions is caught and set on the exchange 545 exchange.setException(e); 546 } finally { 547 // emit event that the exchange was sent to the endpoint 548 if (eventNotifierEnabled && watch != null) { 549 long timeTaken = watch.taken(); 550 EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); 551 } 552 } 553 return exchange; 554 } 555 }); 556 } 557 558 protected AsyncProcessor prepareProducer(Producer producer) { 559 return AsyncProcessorConverterHelper.convert(producer); 560 } 561 562 protected synchronized Producer doGetProducer(Endpoint endpoint, boolean pooled) { 563 String key = endpoint.getEndpointUri(); 564 Producer answer = producers.get(key); 565 if (pooled && answer == null) { 566 // try acquire from connection pool 567 answer = pool.acquire(endpoint); 568 } 569 570 if (answer == null) { 571 // create a new producer 572 try { 573 answer = endpoint.createProducer(); 574 // add as service to CamelContext so its managed via JMX 575 boolean add = answer.isSingleton() || answer instanceof ServicePoolAware; 576 if (add) { 577 // (false => we and handling the lifecycle of the producer in this cache) 578 getCamelContext().addService(answer, false); 579 } else { 580 // fallback and start producer manually 581 ServiceHelper.startService(answer); 582 } 583 } catch (Throwable e) { 584 throw new FailedToCreateProducerException(endpoint, e); 585 } 586 587 // add producer to cache or pool if applicable 588 if (pooled && answer instanceof ServicePoolAware) { 589 LOG.debug("Adding to producer service pool with key: {} for producer: {}", endpoint, answer); 590 answer = pool.addAndAcquire(endpoint, answer); 591 } else if (answer.isSingleton()) { 592 LOG.debug("Adding to producer cache with key: {} for producer: {}", endpoint, answer); 593 producers.put(key, answer); 594 } 595 } 596 597 if (answer != null) { 598 // record statistics 599 if (extendedStatistics) { 600 statistics.onHit(key); 601 } 602 } 603 604 return answer; 605 } 606 607 protected void doStart() throws Exception { 608 if (extendedStatistics) { 609 int max = maxCacheSize == 0 ? CamelContextHelper.getMaximumCachePoolSize(camelContext) : maxCacheSize; 610 statistics = new DefaultEndpointUtilizationStatistics(max); 611 } 612 613 ServiceHelper.startServices(producers.values()); 614 ServiceHelper.startServices(statistics, pool); 615 } 616 617 protected void doStop() throws Exception { 618 // when stopping we intend to shutdown 619 ServiceHelper.stopAndShutdownService(statistics); 620 if (stopServicePool) { 621 ServiceHelper.stopAndShutdownService(pool); 622 } 623 try { 624 ServiceHelper.stopAndShutdownServices(producers.values()); 625 } finally { 626 // ensure producers are removed, and also from JMX 627 for (Producer producer : producers.values()) { 628 getCamelContext().removeService(producer); 629 } 630 } 631 producers.clear(); 632 if (statistics != null) { 633 statistics.clear(); 634 } 635 } 636 637 /** 638 * Returns the current size of the cache 639 * 640 * @return the current size 641 */ 642 public int size() { 643 int size = producers.size(); 644 size += pool.size(); 645 646 LOG.trace("size = {}", size); 647 return size; 648 } 649 650 /** 651 * Gets the maximum cache size (capacity). 652 * <p/> 653 * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used. 654 * 655 * @return the capacity 656 */ 657 public int getCapacity() { 658 int capacity = -1; 659 if (producers instanceof LRUCache) { 660 LRUCache<String, Producer> cache = (LRUCache<String, Producer>) producers; 661 capacity = cache.getMaxCacheSize(); 662 } 663 return capacity; 664 } 665 666 /** 667 * Gets the cache hits statistic 668 * <p/> 669 * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used. 670 * 671 * @return the hits 672 */ 673 public long getHits() { 674 long hits = -1; 675 if (producers instanceof LRUCache) { 676 LRUCache<String, Producer> cache = (LRUCache<String, Producer>) producers; 677 hits = cache.getHits(); 678 } 679 return hits; 680 } 681 682 /** 683 * Gets the cache misses statistic 684 * <p/> 685 * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used. 686 * 687 * @return the misses 688 */ 689 public long getMisses() { 690 long misses = -1; 691 if (producers instanceof LRUCache) { 692 LRUCache<String, Producer> cache = (LRUCache<String, Producer>) producers; 693 misses = cache.getMisses(); 694 } 695 return misses; 696 } 697 698 /** 699 * Gets the cache evicted statistic 700 * <p/> 701 * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used. 702 * 703 * @return the evicted 704 */ 705 public long getEvicted() { 706 long evicted = -1; 707 if (producers instanceof LRUCache) { 708 LRUCache<String, Producer> cache = (LRUCache<String, Producer>) producers; 709 evicted = cache.getEvicted(); 710 } 711 return evicted; 712 } 713 714 /** 715 * Resets the cache statistics 716 */ 717 public void resetCacheStatistics() { 718 if (producers instanceof LRUCache) { 719 LRUCache<String, Producer> cache = (LRUCache<String, Producer>) producers; 720 cache.resetStatistics(); 721 } 722 if (statistics != null) { 723 statistics.clear(); 724 } 725 } 726 727 /** 728 * Purges this cache 729 */ 730 public synchronized void purge() { 731 producers.clear(); 732 pool.purge(); 733 if (statistics != null) { 734 statistics.clear(); 735 } 736 } 737 738 /** 739 * Cleanup the cache (purging stale entries) 740 */ 741 public void cleanUp() { 742 if (producers instanceof LRUCache) { 743 LRUCache<String, Producer> cache = (LRUCache<String, Producer>) producers; 744 cache.cleanUp(); 745 } 746 } 747 748 public EndpointUtilizationStatistics getEndpointUtilizationStatistics() { 749 return statistics; 750 } 751 752 @Override 753 public String toString() { 754 return "ProducerCache for source: " + source + ", capacity: " + getCapacity(); 755 } 756 757}