001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.impl; 018 019import java.util.ArrayList; 020import java.util.List; 021import java.util.Map; 022import java.util.concurrent.CompletableFuture; 023 024import org.apache.camel.AsyncCallback; 025import org.apache.camel.AsyncProcessor; 026import org.apache.camel.AsyncProducerCallback; 027import org.apache.camel.CamelContext; 028import org.apache.camel.Endpoint; 029import org.apache.camel.Exchange; 030import org.apache.camel.ExchangePattern; 031import org.apache.camel.FailedToCreateProducerException; 032import org.apache.camel.Processor; 033import org.apache.camel.Producer; 034import org.apache.camel.ProducerCallback; 035import org.apache.camel.ServicePoolAware; 036import org.apache.camel.processor.CamelInternalProcessor; 037import org.apache.camel.processor.Pipeline; 038import org.apache.camel.spi.EndpointUtilizationStatistics; 039import org.apache.camel.spi.ServicePool; 040import org.apache.camel.support.ServiceSupport; 041import org.apache.camel.util.AsyncProcessorConverterHelper; 042import org.apache.camel.util.CamelContextHelper; 043import org.apache.camel.util.EventHelper; 044import org.apache.camel.util.LRUCache; 045import org.apache.camel.util.ServiceHelper; 046import org.apache.camel.util.StopWatch; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050/** 051 * Cache containing created {@link Producer}. 052 * 053 * @version 054 */ 055public class ProducerCache extends ServiceSupport { 056 private static final Logger LOG = LoggerFactory.getLogger(ProducerCache.class); 057 058 private final CamelContext camelContext; 059 private final ServicePool<Endpoint, Producer> pool; 060 private final Map<String, Producer> producers; 061 private final Object source; 062 063 private EndpointUtilizationStatistics statistics; 064 private boolean eventNotifierEnabled = true; 065 private boolean extendedStatistics; 066 private int maxCacheSize; 067 private boolean stopServicePool; 068 069 public ProducerCache(Object source, CamelContext camelContext) { 070 this(source, camelContext, CamelContextHelper.getMaximumCachePoolSize(camelContext)); 071 } 072 073 public ProducerCache(Object source, CamelContext camelContext, int cacheSize) { 074 this(source, camelContext, null, createLRUCache(cacheSize)); 075 } 076 077 public ProducerCache(Object source, CamelContext camelContext, Map<String, Producer> cache) { 078 this(source, camelContext, null, cache); 079 } 080 081 public ProducerCache(Object source, CamelContext camelContext, ServicePool<Endpoint, Producer> producerServicePool, Map<String, Producer> cache) { 082 this.source = source; 083 this.camelContext = camelContext; 084 if (producerServicePool == null) { 085 // use shared producer pool which lifecycle is managed by CamelContext 086 this.pool = camelContext.getProducerServicePool(); 087 this.stopServicePool = false; 088 } else { 089 this.pool = producerServicePool; 090 this.stopServicePool = true; 091 } 092 this.producers = cache; 093 if (producers instanceof LRUCache) { 094 maxCacheSize = ((LRUCache) producers).getMaxCacheSize(); 095 } 096 097 // only if JMX is enabled 098 if (camelContext.getManagementStrategy().getManagementAgent() != null) { 099 this.extendedStatistics = camelContext.getManagementStrategy().getManagementAgent().getStatisticsLevel().isExtended(); 100 } else { 101 this.extendedStatistics = false; 102 } 103 } 104 105 public boolean isEventNotifierEnabled() { 106 return eventNotifierEnabled; 107 } 108 109 /** 110 * Whether {@link org.apache.camel.spi.EventNotifier} is enabled 111 */ 112 public void setEventNotifierEnabled(boolean eventNotifierEnabled) { 113 this.eventNotifierEnabled = eventNotifierEnabled; 114 } 115 116 public boolean isExtendedStatistics() { 117 return extendedStatistics; 118 } 119 120 /** 121 * Whether extended JMX statistics is enabled for {@link org.apache.camel.spi.EndpointUtilizationStatistics} 122 */ 123 public void setExtendedStatistics(boolean extendedStatistics) { 124 this.extendedStatistics = extendedStatistics; 125 } 126 127 /** 128 * Creates the {@link LRUCache} to be used. 129 * <p/> 130 * This implementation returns a {@link LRUCache} instance. 131 132 * @param cacheSize the cache size 133 * @return the cache 134 */ 135 protected static LRUCache<String, Producer> createLRUCache(int cacheSize) { 136 // Use a regular cache as we want to ensure that the lifecycle of the producers 137 // being cache is properly handled, such as they are stopped when being evicted 138 // or when this cache is stopped. This is needed as some producers requires to 139 // be stopped so they can shutdown internal resources that otherwise may cause leaks 140 return new LRUCache<String, Producer>(cacheSize); 141 } 142 143 public CamelContext getCamelContext() { 144 return camelContext; 145 } 146 147 /** 148 * Gets the source which uses this cache 149 * 150 * @return the source 151 */ 152 public Object getSource() { 153 return source; 154 } 155 156 /** 157 * Acquires a pooled producer which you <b>must</b> release back again after usage using the 158 * {@link #releaseProducer(org.apache.camel.Endpoint, org.apache.camel.Producer)} method. 159 * 160 * @param endpoint the endpoint 161 * @return the producer 162 */ 163 public Producer acquireProducer(Endpoint endpoint) { 164 return doGetProducer(endpoint, true); 165 } 166 167 /** 168 * Releases an acquired producer back after usage. 169 * 170 * @param endpoint the endpoint 171 * @param producer the producer to release 172 * @throws Exception can be thrown if error stopping producer if that was needed. 173 */ 174 public void releaseProducer(Endpoint endpoint, Producer producer) throws Exception { 175 if (producer instanceof ServicePoolAware) { 176 // release back to the pool 177 pool.release(endpoint, producer); 178 } else if (!producer.isSingleton()) { 179 // stop and shutdown non-singleton producers as we should not leak resources 180 ServiceHelper.stopAndShutdownService(producer); 181 } 182 } 183 184 /** 185 * Starts the {@link Producer} to be used for sending to the given endpoint 186 * <p/> 187 * This can be used to early start the {@link Producer} to ensure it can be created, 188 * such as when Camel is started. This allows to fail fast in case the {@link Producer} 189 * could not be started. 190 * 191 * @param endpoint the endpoint to send the exchange to 192 * @throws Exception is thrown if failed to create or start the {@link Producer} 193 */ 194 public void startProducer(Endpoint endpoint) throws Exception { 195 Producer producer = acquireProducer(endpoint); 196 releaseProducer(endpoint, producer); 197 } 198 199 /** 200 * Sends the exchange to the given endpoint. 201 * <p> 202 * This method will <b>not</b> throw an exception. If processing of the given 203 * Exchange failed then the exception is stored on the provided Exchange 204 * 205 * @param endpoint the endpoint to send the exchange to 206 * @param exchange the exchange to send 207 */ 208 public void send(Endpoint endpoint, Exchange exchange) { 209 sendExchange(endpoint, null, null, null, exchange); 210 } 211 212 /** 213 * Sends an exchange to an endpoint using a supplied 214 * {@link Processor} to populate the exchange 215 * <p> 216 * This method will <b>not</b> throw an exception. If processing of the given 217 * Exchange failed then the exception is stored on the return Exchange 218 * 219 * @param endpoint the endpoint to send the exchange to 220 * @param processor the transformer used to populate the new exchange 221 * @throws org.apache.camel.CamelExecutionException is thrown if sending failed 222 * @return the exchange 223 */ 224 public Exchange send(Endpoint endpoint, Processor processor) { 225 return sendExchange(endpoint, null, processor, null, null); 226 } 227 228 /** 229 * Sends an exchange to an endpoint using a supplied 230 * {@link Processor} to populate the exchange 231 * <p> 232 * This method will <b>not</b> throw an exception. If processing of the given 233 * Exchange failed then the exception is stored on the return Exchange 234 * 235 * @param endpoint the endpoint to send the exchange to 236 * @param pattern the message {@link ExchangePattern} such as 237 * {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut} 238 * @param processor the transformer used to populate the new exchange 239 * @return the exchange 240 */ 241 public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) { 242 return sendExchange(endpoint, pattern, processor, null, null); 243 } 244 245 /** 246 * Sends an exchange to an endpoint using a supplied 247 * {@link Processor} to populate the exchange 248 * <p> 249 * This method will <b>not</b> throw an exception. If processing of the given 250 * Exchange failed then the exception is stored on the return Exchange 251 * 252 * @param endpoint the endpoint to send the exchange to 253 * @param pattern the message {@link ExchangePattern} such as 254 * {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut} 255 * @param processor the transformer used to populate the new exchange 256 * @param resultProcessor a processor to process the exchange when the send is complete. 257 * @return the exchange 258 */ 259 public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor, Processor resultProcessor) { 260 return sendExchange(endpoint, pattern, processor, resultProcessor, null); 261 } 262 263 /** 264 * Asynchronously sends an exchange to an endpoint using a supplied 265 * {@link Processor} to populate the exchange 266 * <p> 267 * This method will <b>neither</b> throw an exception <b>nor</b> complete future exceptionally. 268 * If processing of the given Exchange failed then the exception is stored on the return Exchange 269 * 270 * @param endpoint the endpoint to send the exchange to 271 * @param pattern the message {@link ExchangePattern} such as 272 * {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut} 273 * @param processor the transformer used to populate the new exchange 274 * @param resultProcessor a processor to process the exchange when the send is complete. 275 * @param future the preexisting future to complete when processing is done or null if to create new one 276 * @return future that completes with exchange when processing is done. Either passed into future parameter 277 * or new one if parameter was null 278 */ 279 public CompletableFuture<Exchange> asyncSend(Endpoint endpoint, ExchangePattern pattern, Processor processor, Processor resultProcessor, 280 CompletableFuture<Exchange> future) { 281 return asyncSendExchange(endpoint, pattern, processor, resultProcessor, null, future); 282 } 283 284 /** 285 * Asynchronously sends an exchange to an endpoint using a supplied 286 * {@link Processor} to populate the exchange 287 * <p> 288 * This method will <b>neither</b> throw an exception <b>nor</b> complete future exceptionally. 289 * If processing of the given Exchange failed then the exception is stored on the return Exchange 290 * 291 * @param endpoint the endpoint to send the exchange to 292 * @param pattern the message {@link ExchangePattern} such as 293 * {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut} 294 * @param processor the transformer used to populate the new exchange 295 * @param resultProcessor a processor to process the exchange when the send is complete. 296 * @param exchange an exchange to use in processing. Exchange will be created if parameter is null. 297 * @param future the preexisting future to complete when processing is done or null if to create new one 298 * @return future that completes with exchange when processing is done. Either passed into future parameter 299 * or new one if parameter was null 300 */ 301 public CompletableFuture<Exchange> asyncSendExchange(final Endpoint endpoint, ExchangePattern pattern, 302 final Processor processor, final Processor resultProcessor, Exchange exchange, 303 CompletableFuture<Exchange> future) { 304 AsyncCallbackToCompletableFutureAdapter<Exchange> futureAdapter = new AsyncCallbackToCompletableFutureAdapter<>(future, exchange); 305 doInAsyncProducer(endpoint, exchange, pattern, futureAdapter, 306 (producer, asyncProducer, innerExchange, exchangePattern, producerCallback) -> { 307 if (innerExchange == null) { 308 innerExchange = pattern != null 309 ? producer.getEndpoint().createExchange(pattern) 310 : producer.getEndpoint().createExchange(); 311 futureAdapter.setResult(innerExchange); 312 } 313 314 if (processor != null) { 315 // lets populate using the processor callback 316 AsyncProcessor asyncProcessor = AsyncProcessorConverterHelper.convert(processor); 317 try { 318 final Exchange finalExchange = innerExchange; 319 asyncProcessor.process(innerExchange, 320 doneSync -> asyncDispatchExchange(endpoint, producer, resultProcessor, 321 finalExchange, producerCallback)); 322 return false; 323 } catch (Exception e) { 324 // populate failed so return 325 innerExchange.setException(e); 326 producerCallback.done(true); 327 return true; 328 } 329 } 330 331 return asyncDispatchExchange(endpoint, producer, resultProcessor, innerExchange, producerCallback); 332 }); 333 return futureAdapter.getFuture(); 334 } 335 336 /** 337 * Sends an exchange to an endpoint using a supplied callback, using the synchronous processing. 338 * <p/> 339 * If an exception was thrown during processing, it would be set on the given Exchange 340 * 341 * @param endpoint the endpoint to send the exchange to 342 * @param exchange the exchange, can be <tt>null</tt> if so then create a new exchange from the producer 343 * @param pattern the exchange pattern, can be <tt>null</tt> 344 * @param callback the callback 345 * @return the response from the callback 346 * @see #doInAsyncProducer(org.apache.camel.Endpoint, org.apache.camel.Exchange, org.apache.camel.ExchangePattern, org.apache.camel.AsyncCallback, org.apache.camel.AsyncProducerCallback) 347 */ 348 public <T> T doInProducer(Endpoint endpoint, Exchange exchange, ExchangePattern pattern, ProducerCallback<T> callback) { 349 T answer = null; 350 351 // get the producer and we do not mind if its pooled as we can handle returning it back to the pool 352 Producer producer = doGetProducer(endpoint, true); 353 354 if (producer == null) { 355 if (isStopped()) { 356 LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange); 357 return null; 358 } else { 359 throw new IllegalStateException("No producer, this processor has not been started: " + this); 360 } 361 } 362 363 try { 364 // invoke the callback 365 answer = callback.doInProducer(producer, exchange, pattern); 366 } catch (Throwable e) { 367 if (exchange != null) { 368 exchange.setException(e); 369 } 370 } finally { 371 if (producer instanceof ServicePoolAware) { 372 // release back to the pool 373 pool.release(endpoint, producer); 374 } else if (!producer.isSingleton()) { 375 // stop and shutdown non-singleton producers as we should not leak resources 376 try { 377 ServiceHelper.stopAndShutdownService(producer); 378 } catch (Exception e) { 379 // ignore and continue 380 LOG.warn("Error stopping/shutting down producer: " + producer, e); 381 } 382 } 383 } 384 385 return answer; 386 } 387 388 /** 389 * Sends an exchange to an endpoint using a supplied callback supporting the asynchronous routing engine. 390 * <p/> 391 * If an exception was thrown during processing, it would be set on the given Exchange 392 * 393 * @param endpoint the endpoint to send the exchange to 394 * @param exchange the exchange, can be <tt>null</tt> if so then create a new exchange from the producer 395 * @param pattern the exchange pattern, can be <tt>null</tt> 396 * @param callback the asynchronous callback 397 * @param producerCallback the producer template callback to be executed 398 * @return (doneSync) <tt>true</tt> to continue execute synchronously, <tt>false</tt> to continue being executed asynchronously 399 */ 400 public boolean doInAsyncProducer(final Endpoint endpoint, final Exchange exchange, final ExchangePattern pattern, 401 final AsyncCallback callback, final AsyncProducerCallback producerCallback) { 402 403 Producer target; 404 try { 405 // get the producer and we do not mind if its pooled as we can handle returning it back to the pool 406 target = doGetProducer(endpoint, true); 407 408 if (target == null) { 409 if (isStopped()) { 410 LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange); 411 callback.done(true); 412 return true; 413 } else { 414 exchange.setException(new IllegalStateException("No producer, this processor has not been started: " + this)); 415 callback.done(true); 416 return true; 417 } 418 } 419 } catch (Throwable e) { 420 exchange.setException(e); 421 callback.done(true); 422 return true; 423 } 424 425 final Producer producer = target; 426 427 // record timing for sending the exchange using the producer 428 final StopWatch watch = eventNotifierEnabled && exchange != null ? new StopWatch() : null; 429 430 try { 431 if (eventNotifierEnabled && exchange != null) { 432 EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint); 433 } 434 // invoke the callback 435 AsyncProcessor asyncProcessor = AsyncProcessorConverterHelper.convert(producer); 436 return producerCallback.doInAsyncProducer(producer, asyncProcessor, exchange, pattern, doneSync -> { 437 try { 438 if (eventNotifierEnabled && watch != null) { 439 long timeTaken = watch.stop(); 440 // emit event that the exchange was sent to the endpoint 441 EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); 442 } 443 444 if (producer instanceof ServicePoolAware) { 445 // release back to the pool 446 pool.release(endpoint, producer); 447 } else if (!producer.isSingleton()) { 448 // stop and shutdown non-singleton producers as we should not leak resources 449 try { 450 ServiceHelper.stopAndShutdownService(producer); 451 } catch (Exception e) { 452 // ignore and continue 453 LOG.warn("Error stopping/shutting down producer: " + producer, e); 454 } 455 } 456 } finally { 457 callback.done(doneSync); 458 } 459 }); 460 } catch (Throwable e) { 461 // ensure exceptions is caught and set on the exchange 462 if (exchange != null) { 463 exchange.setException(e); 464 } 465 callback.done(true); 466 return true; 467 } 468 } 469 470 protected boolean asyncDispatchExchange(final Endpoint endpoint, Producer producer, 471 final Processor resultProcessor, Exchange exchange, AsyncCallback callback) { 472 // now lets dispatch 473 LOG.debug(">>>> {} {}", endpoint, exchange); 474 475 // set property which endpoint we send to 476 exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri()); 477 478 // send the exchange using the processor 479 try { 480 if (eventNotifierEnabled) { 481 callback = new EventNotifierCallback(callback, exchange, endpoint); 482 } 483 CamelInternalProcessor internal = prepareInternalProcessor(producer, resultProcessor); 484 485 return internal.process(exchange, callback); 486 } catch (Throwable e) { 487 // ensure exceptions is caught and set on the exchange 488 exchange.setException(e); 489 callback.done(true); 490 return true; 491 } 492 493 } 494 495 protected Exchange sendExchange(final Endpoint endpoint, ExchangePattern pattern, 496 final Processor processor, final Processor resultProcessor, Exchange exchange) { 497 return doInProducer(endpoint, exchange, pattern, new ProducerCallback<Exchange>() { 498 public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) { 499 if (exchange == null) { 500 exchange = pattern != null ? producer.getEndpoint().createExchange(pattern) : producer.getEndpoint().createExchange(); 501 } 502 503 if (processor != null) { 504 // lets populate using the processor callback 505 try { 506 processor.process(exchange); 507 } catch (Exception e) { 508 // populate failed so return 509 exchange.setException(e); 510 return exchange; 511 } 512 } 513 514 // now lets dispatch 515 LOG.debug(">>>> {} {}", endpoint, exchange); 516 517 // set property which endpoint we send to 518 exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri()); 519 520 // send the exchange using the processor 521 StopWatch watch = null; 522 try { 523 if (eventNotifierEnabled) { 524 watch = new StopWatch(); 525 EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint); 526 } 527 528 CamelInternalProcessor internal = prepareInternalProcessor(producer, resultProcessor); 529 internal.process(exchange); 530 } catch (Throwable e) { 531 // ensure exceptions is caught and set on the exchange 532 exchange.setException(e); 533 } finally { 534 // emit event that the exchange was sent to the endpoint 535 if (eventNotifierEnabled && watch != null) { 536 long timeTaken = watch.stop(); 537 EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); 538 } 539 } 540 return exchange; 541 } 542 }); 543 } 544 545 protected CamelInternalProcessor prepareInternalProcessor(Producer producer, Processor resultProcessor) { 546 // if we have a result processor then wrap in pipeline to execute both of them in sequence 547 Processor target; 548 if (resultProcessor != null) { 549 List<Processor> processors = new ArrayList<Processor>(2); 550 processors.add(producer); 551 processors.add(resultProcessor); 552 target = Pipeline.newInstance(getCamelContext(), processors); 553 } else { 554 target = producer; 555 } 556 557 // wrap in unit of work 558 CamelInternalProcessor internal = new CamelInternalProcessor(target); 559 internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null)); 560 return internal; 561 } 562 563 protected synchronized Producer doGetProducer(Endpoint endpoint, boolean pooled) { 564 String key = endpoint.getEndpointUri(); 565 Producer answer = producers.get(key); 566 if (pooled && answer == null) { 567 // try acquire from connection pool 568 answer = pool.acquire(endpoint); 569 } 570 571 if (answer == null) { 572 // create a new producer 573 try { 574 answer = endpoint.createProducer(); 575 // add as service which will also start the service 576 // (false => we and handling the lifecycle of the producer in this cache) 577 getCamelContext().addService(answer, false); 578 } catch (Exception e) { 579 throw new FailedToCreateProducerException(endpoint, e); 580 } 581 582 // add producer to cache or pool if applicable 583 if (pooled && answer instanceof ServicePoolAware) { 584 LOG.debug("Adding to producer service pool with key: {} for producer: {}", endpoint, answer); 585 answer = pool.addAndAcquire(endpoint, answer); 586 } else if (answer.isSingleton()) { 587 LOG.debug("Adding to producer cache with key: {} for producer: {}", endpoint, answer); 588 producers.put(key, answer); 589 } 590 } 591 592 if (answer != null) { 593 // record statistics 594 if (extendedStatistics) { 595 statistics.onHit(key); 596 } 597 } 598 599 return answer; 600 } 601 602 protected void doStart() throws Exception { 603 if (extendedStatistics) { 604 int max = maxCacheSize == 0 ? CamelContextHelper.getMaximumCachePoolSize(camelContext) : maxCacheSize; 605 statistics = new DefaultEndpointUtilizationStatistics(max); 606 } 607 608 ServiceHelper.startServices(producers.values()); 609 ServiceHelper.startServices(statistics, pool); 610 } 611 612 protected void doStop() throws Exception { 613 // when stopping we intend to shutdown 614 ServiceHelper.stopAndShutdownService(statistics); 615 if (stopServicePool) { 616 ServiceHelper.stopAndShutdownService(pool); 617 } 618 try { 619 ServiceHelper.stopAndShutdownServices(producers.values()); 620 } finally { 621 // ensure producers are removed, and also from JMX 622 for (Producer producer : producers.values()) { 623 getCamelContext().removeService(producer); 624 } 625 } 626 producers.clear(); 627 if (statistics != null) { 628 statistics.clear(); 629 } 630 } 631 632 /** 633 * Returns the current size of the cache 634 * 635 * @return the current size 636 */ 637 public int size() { 638 int size = producers.size(); 639 size += pool.size(); 640 641 LOG.trace("size = {}", size); 642 return size; 643 } 644 645 /** 646 * Gets the maximum cache size (capacity). 647 * <p/> 648 * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used. 649 * 650 * @return the capacity 651 */ 652 public int getCapacity() { 653 int capacity = -1; 654 if (producers instanceof LRUCache) { 655 LRUCache<String, Producer> cache = (LRUCache<String, Producer>) producers; 656 capacity = cache.getMaxCacheSize(); 657 } 658 return capacity; 659 } 660 661 /** 662 * Gets the cache hits statistic 663 * <p/> 664 * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used. 665 * 666 * @return the hits 667 */ 668 public long getHits() { 669 long hits = -1; 670 if (producers instanceof LRUCache) { 671 LRUCache<String, Producer> cache = (LRUCache<String, Producer>) producers; 672 hits = cache.getHits(); 673 } 674 return hits; 675 } 676 677 /** 678 * Gets the cache misses statistic 679 * <p/> 680 * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used. 681 * 682 * @return the misses 683 */ 684 public long getMisses() { 685 long misses = -1; 686 if (producers instanceof LRUCache) { 687 LRUCache<String, Producer> cache = (LRUCache<String, Producer>) producers; 688 misses = cache.getMisses(); 689 } 690 return misses; 691 } 692 693 /** 694 * Gets the cache evicted statistic 695 * <p/> 696 * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used. 697 * 698 * @return the evicted 699 */ 700 public long getEvicted() { 701 long evicted = -1; 702 if (producers instanceof LRUCache) { 703 LRUCache<String, Producer> cache = (LRUCache<String, Producer>) producers; 704 evicted = cache.getEvicted(); 705 } 706 return evicted; 707 } 708 709 /** 710 * Resets the cache statistics 711 */ 712 public void resetCacheStatistics() { 713 if (producers instanceof LRUCache) { 714 LRUCache<String, Producer> cache = (LRUCache<String, Producer>) producers; 715 cache.resetStatistics(); 716 } 717 if (statistics != null) { 718 statistics.clear(); 719 } 720 } 721 722 /** 723 * Purges this cache 724 */ 725 public synchronized void purge() { 726 producers.clear(); 727 pool.purge(); 728 if (statistics != null) { 729 statistics.clear(); 730 } 731 } 732 733 /** 734 * Cleanup the cache (purging stale entries) 735 */ 736 public void cleanUp() { 737 if (producers instanceof LRUCache) { 738 LRUCache<String, Producer> cache = (LRUCache<String, Producer>) producers; 739 cache.cleanUp(); 740 } 741 } 742 743 public EndpointUtilizationStatistics getEndpointUtilizationStatistics() { 744 return statistics; 745 } 746 747 @Override 748 public String toString() { 749 return "ProducerCache for source: " + source + ", capacity: " + getCapacity(); 750 } 751 752}