001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import java.util.ArrayList;
020import java.util.List;
021import java.util.Map;
022import java.util.concurrent.CompletableFuture;
023
024import org.apache.camel.AsyncCallback;
025import org.apache.camel.AsyncProcessor;
026import org.apache.camel.AsyncProducerCallback;
027import org.apache.camel.CamelContext;
028import org.apache.camel.Endpoint;
029import org.apache.camel.Exchange;
030import org.apache.camel.ExchangePattern;
031import org.apache.camel.FailedToCreateProducerException;
032import org.apache.camel.Processor;
033import org.apache.camel.Producer;
034import org.apache.camel.ProducerCallback;
035import org.apache.camel.ServicePoolAware;
036import org.apache.camel.processor.CamelInternalProcessor;
037import org.apache.camel.processor.Pipeline;
038import org.apache.camel.spi.EndpointUtilizationStatistics;
039import org.apache.camel.spi.ServicePool;
040import org.apache.camel.support.ServiceSupport;
041import org.apache.camel.util.AsyncProcessorConverterHelper;
042import org.apache.camel.util.CamelContextHelper;
043import org.apache.camel.util.EventHelper;
044import org.apache.camel.util.LRUCache;
045import org.apache.camel.util.ServiceHelper;
046import org.apache.camel.util.StopWatch;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
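/**
 * Cache containing created {@link Producer}s.
 * <p/>
 * Singleton producers are kept in a map keyed by the endpoint uri, while producers
 * which are {@link ServicePoolAware} are acquired from and released back to a {@link ServicePool}.
 */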
055public class ProducerCache extends ServiceSupport {
    // logger used by every instance of this cache
    private static final Logger LOG = LoggerFactory.getLogger(ProducerCache.class);

    // context used to look up the shared producer pool and to add/remove producers as services
    private final CamelContext camelContext;
    // pool used for producers which are ServicePoolAware
    private final ServicePool<Endpoint, Producer> pool;
    // cache of singleton producers, keyed by endpoint uri
    private final Map<String, Producer> producers;
    // the source which uses this cache
    private final Object source;

    // only created in doStart() when extended statistics is enabled at that time
    private EndpointUtilizationStatistics statistics;
    // whether sending/sent events are emitted; enabled by default
    private boolean eventNotifierEnabled = true;
    // initialised in the constructor from the management agent statistics level (if JMX is enabled)
    private boolean extendedStatistics;
    // taken from the cache when it is a LRUCache, otherwise left at 0
    private int maxCacheSize;
    // true only when a pool was handed to the constructor, in which case doStop() also stops the pool
    private boolean stopServicePool;
068
    /**
     * Creates a cache backed by a {@link LRUCache} whose size is given by
     * {@link CamelContextHelper#getMaximumCachePoolSize(CamelContext)}, using the producer
     * service pool of the {@link CamelContext}.
     *
     * @param source       the source which uses this cache
     * @param camelContext the camel context
     */
    public ProducerCache(Object source, CamelContext camelContext) {
        this(source, camelContext, CamelContextHelper.getMaximumCachePoolSize(camelContext));
    }
072
    /**
     * Creates a cache backed by a {@link LRUCache} of the given size, using the producer
     * service pool of the {@link CamelContext}.
     *
     * @param source       the source which uses this cache
     * @param camelContext the camel context
     * @param cacheSize    the size passed to {@link #createLRUCache(int)}
     */
    public ProducerCache(Object source, CamelContext camelContext, int cacheSize) {
        this(source, camelContext, null, createLRUCache(cacheSize));
    }
076
    /**
     * Creates a cache backed by the given map, using the producer service pool of the
     * {@link CamelContext}.
     *
     * @param source       the source which uses this cache
     * @param camelContext the camel context
     * @param cache        the map used to hold the cached producers
     */
    public ProducerCache(Object source, CamelContext camelContext, Map<String, Producer> cache) {
        this(source, camelContext, null, cache);
    }
080
081    public ProducerCache(Object source, CamelContext camelContext, ServicePool<Endpoint, Producer> producerServicePool, Map<String, Producer> cache) {
082        this.source = source;
083        this.camelContext = camelContext;
084        if (producerServicePool == null) {
085            // use shared producer pool which lifecycle is managed by CamelContext
086            this.pool = camelContext.getProducerServicePool();
087            this.stopServicePool = false;
088        } else {
089            this.pool = producerServicePool;
090            this.stopServicePool = true;
091        }
092        this.producers = cache;
093        if (producers instanceof LRUCache) {
094            maxCacheSize = ((LRUCache) producers).getMaxCacheSize();
095        }
096
097        // only if JMX is enabled
098        if (camelContext.getManagementStrategy().getManagementAgent() != null) {
099            this.extendedStatistics = camelContext.getManagementStrategy().getManagementAgent().getStatisticsLevel().isExtended();
100        } else {
101            this.extendedStatistics = false;
102        }
103    }
104
    /**
     * Whether {@link org.apache.camel.spi.EventNotifier} is enabled. This is enabled by default.
     *
     * @return <tt>true</tt> if exchange sending/sent events are emitted by this cache
     */
    public boolean isEventNotifierEnabled() {
        return eventNotifierEnabled;
    }
108
    /**
     * Sets whether {@link org.apache.camel.spi.EventNotifier} is enabled.
     * <p/>
     * When disabled this cache does not emit the exchange sending/sent events.
     *
     * @param eventNotifierEnabled <tt>true</tt> to emit events, <tt>false</tt> to turn them off
     */
    public void setEventNotifierEnabled(boolean eventNotifierEnabled) {
        this.eventNotifierEnabled = eventNotifierEnabled;
    }
115
    /**
     * Whether extended JMX statistics is enabled for {@link org.apache.camel.spi.EndpointUtilizationStatistics}
     *
     * @return <tt>true</tt> if extended statistics is enabled
     */
    public boolean isExtendedStatistics() {
        return extendedStatistics;
    }
119
    /**
     * Sets whether extended JMX statistics is enabled for {@link org.apache.camel.spi.EndpointUtilizationStatistics}
     * <p/>
     * The statistics instance itself is only created when this cache is started with the option enabled.
     *
     * @param extendedStatistics <tt>true</tt> to enable extended statistics
     */
    public void setExtendedStatistics(boolean extendedStatistics) {
        this.extendedStatistics = extendedStatistics;
    }
126
127    /**
128     * Creates the {@link LRUCache} to be used.
129     * <p/>
130     * This implementation returns a {@link LRUCache} instance.
131
132     * @param cacheSize the cache size
133     * @return the cache
134     */
135    protected static LRUCache<String, Producer> createLRUCache(int cacheSize) {
136        // Use a regular cache as we want to ensure that the lifecycle of the producers
137        // being cache is properly handled, such as they are stopped when being evicted
138        // or when this cache is stopped. This is needed as some producers requires to
139        // be stopped so they can shutdown internal resources that otherwise may cause leaks
140        return new LRUCache<String, Producer>(cacheSize);
141    }
142
    /**
     * Gets the camel context this cache was created with
     *
     * @return the camel context
     */
    public CamelContext getCamelContext() {
        return camelContext;
    }
146
    /**
     * Gets the source which uses this cache
     *
     * @return the source, as given to the constructor
     */
    public Object getSource() {
        return source;
    }
155
    /**
     * Acquires a pooled producer which you <b>must</b> release back again after usage using the
     * {@link #releaseProducer(org.apache.camel.Endpoint, org.apache.camel.Producer)} method.
     * <p/>
     * The producer is looked up in the cache first, then in the pool, and is created if neither has one.
     *
     * @param endpoint the endpoint
     * @return the producer
     * @throws FailedToCreateProducerException if a new producer could not be created or added as service
     */
    public Producer acquireProducer(Endpoint endpoint) {
        return doGetProducer(endpoint, true);
    }
166
167    /**
168     * Releases an acquired producer back after usage.
169     *
170     * @param endpoint the endpoint
171     * @param producer the producer to release
172     * @throws Exception can be thrown if error stopping producer if that was needed.
173     */
174    public void releaseProducer(Endpoint endpoint, Producer producer) throws Exception {
175        if (producer instanceof ServicePoolAware) {
176            // release back to the pool
177            pool.release(endpoint, producer);
178        } else if (!producer.isSingleton()) {
179            // stop and shutdown non-singleton producers as we should not leak resources
180            ServiceHelper.stopAndShutdownService(producer);
181        }
182    }
183
184    /**
185     * Starts the {@link Producer} to be used for sending to the given endpoint
186     * <p/>
187     * This can be used to early start the {@link Producer} to ensure it can be created,
188     * such as when Camel is started. This allows to fail fast in case the {@link Producer}
189     * could not be started.
190     *
191     * @param endpoint the endpoint to send the exchange to
192     * @throws Exception is thrown if failed to create or start the {@link Producer}
193     */
194    public void startProducer(Endpoint endpoint) throws Exception {
195        Producer producer = acquireProducer(endpoint);
196        releaseProducer(endpoint, producer);
197    }
198
    /**
     * Sends the exchange to the given endpoint.
     * <p>
     * A failure while processing the given Exchange is <b>not</b> thrown as an exception,
     * but is stored on the provided Exchange. Failing to obtain a producer can still throw.
     *
     * @param endpoint the endpoint to send the exchange to
     * @param exchange the exchange to send
     */
    public void send(Endpoint endpoint, Exchange exchange) {
        sendExchange(endpoint, null, null, null, exchange);
    }
211
    /**
     * Sends an exchange to an endpoint using a supplied
     * {@link Processor} to populate the exchange
     * <p>
     * A failure while populating or processing the Exchange is <b>not</b> thrown as an exception,
     * but is stored on the returned Exchange. Failing to obtain a producer can still throw.
     *
     * @param endpoint the endpoint to send the exchange to
     * @param processor the transformer used to populate the new exchange
     * @return the exchange, or <tt>null</tt> if no producer was available because this cache is stopped
     */
    public Exchange send(Endpoint endpoint, Processor processor) {
        return sendExchange(endpoint, null, processor, null, null);
    }
227
    /**
     * Sends an exchange to an endpoint using a supplied
     * {@link Processor} to populate the exchange
     * <p>
     * A failure while populating or processing the Exchange is <b>not</b> thrown as an exception,
     * but is stored on the returned Exchange. Failing to obtain a producer can still throw.
     *
     * @param endpoint the endpoint to send the exchange to
     * @param pattern the message {@link ExchangePattern} such as
     *   {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
     * @param processor the transformer used to populate the new exchange
     * @return the exchange, or <tt>null</tt> if no producer was available because this cache is stopped
     */
    public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) {
        return sendExchange(endpoint, pattern, processor, null, null);
    }
244
    /**
     * Sends an exchange to an endpoint using a supplied
     * {@link Processor} to populate the exchange
     * <p>
     * A failure while populating or processing the Exchange is <b>not</b> thrown as an exception,
     * but is stored on the returned Exchange. Failing to obtain a producer can still throw.
     *
     * @param endpoint the endpoint to send the exchange to
     * @param pattern the message {@link ExchangePattern} such as
     *   {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
     * @param processor the transformer used to populate the new exchange
     * @param resultProcessor a processor to process the exchange when the send is complete.
     * @return the exchange, or <tt>null</tt> if no producer was available because this cache is stopped
     */
    public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor, Processor resultProcessor) {
        return sendExchange(endpoint, pattern, processor, resultProcessor, null);
    }
262
    /**
     * Asynchronously sends an exchange to an endpoint using a supplied
     * {@link Processor} to populate the exchange
     * <p>
     * This method will <b>neither</b> throw an exception <b>nor</b> complete future exceptionally.
     * If processing of the given Exchange failed then the exception is stored on the return Exchange
     * <p>
     * A new exchange is always created, as no exchange is passed on to
     * {@link #asyncSendExchange(Endpoint, ExchangePattern, Processor, Processor, Exchange, CompletableFuture)}.
     *
     * @param endpoint        the endpoint to send the exchange to
     * @param pattern         the message {@link ExchangePattern} such as
     *                        {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
     * @param processor       the transformer used to populate the new exchange
     * @param resultProcessor a processor to process the exchange when the send is complete.
     * @param future          the preexisting future to complete when processing is done or null if to create new one
     * @return future that completes with exchange when processing is done. Either passed into future parameter
     *              or new one if parameter was null
     */
    public CompletableFuture<Exchange> asyncSend(Endpoint endpoint, ExchangePattern pattern, Processor processor, Processor resultProcessor,
                                                 CompletableFuture<Exchange> future) {
        return asyncSendExchange(endpoint, pattern, processor, resultProcessor, null, future);
    }
283
284    /**
285     * Asynchronously sends an exchange to an endpoint using a supplied
286     * {@link Processor} to populate the exchange
287     * <p>
288     * This method will <b>neither</b> throw an exception <b>nor</b> complete future exceptionally.
289     * If processing of the given Exchange failed then the exception is stored on the return Exchange
290     *
291     * @param endpoint        the endpoint to send the exchange to
292     * @param pattern         the message {@link ExchangePattern} such as
293     *                        {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
294     * @param processor       the transformer used to populate the new exchange
295     * @param resultProcessor a processor to process the exchange when the send is complete.
296     * @param exchange        an exchange to use in processing. Exchange will be created if parameter is null.
297     * @param future          the preexisting future to complete when processing is done or null if to create new one
298     * @return future that completes with exchange when processing is done. Either passed into future parameter
299     *              or new one if parameter was null
300     */
301    public CompletableFuture<Exchange> asyncSendExchange(final Endpoint endpoint, ExchangePattern pattern,
302                                                         final Processor processor, final Processor resultProcessor, Exchange exchange,
303                                                         CompletableFuture<Exchange> future) {
304        AsyncCallbackToCompletableFutureAdapter<Exchange> futureAdapter = new AsyncCallbackToCompletableFutureAdapter<>(future, exchange);
305        doInAsyncProducer(endpoint, exchange, pattern, futureAdapter,
306            (producer, asyncProducer, innerExchange, exchangePattern, producerCallback) -> {
307                if (innerExchange == null) {
308                    innerExchange = pattern != null
309                            ? producer.getEndpoint().createExchange(pattern)
310                            : producer.getEndpoint().createExchange();
311                    futureAdapter.setResult(innerExchange);
312                }
313
314                if (processor != null) {
315                    // lets populate using the processor callback
316                    AsyncProcessor asyncProcessor = AsyncProcessorConverterHelper.convert(processor);
317                    try {
318                        final Exchange finalExchange = innerExchange;
319                        asyncProcessor.process(innerExchange,
320                            doneSync -> asyncDispatchExchange(endpoint, producer, resultProcessor,
321                                    finalExchange, producerCallback));
322                        return false;
323                    } catch (Exception e) {
324                        // populate failed so return
325                        innerExchange.setException(e);
326                        producerCallback.done(true);
327                        return true;
328                    }
329                }
330
331                return asyncDispatchExchange(endpoint, producer, resultProcessor, innerExchange, producerCallback);
332            });
333        return futureAdapter.getFuture();
334    }
335
336    /**
337     * Sends an exchange to an endpoint using a supplied callback, using the synchronous processing.
338     * <p/>
339     * If an exception was thrown during processing, it would be set on the given Exchange
340     *
341     * @param endpoint  the endpoint to send the exchange to
342     * @param exchange  the exchange, can be <tt>null</tt> if so then create a new exchange from the producer
343     * @param pattern   the exchange pattern, can be <tt>null</tt>
344     * @param callback  the callback
345     * @return the response from the callback
346     * @see #doInAsyncProducer(org.apache.camel.Endpoint, org.apache.camel.Exchange, org.apache.camel.ExchangePattern, org.apache.camel.AsyncCallback, org.apache.camel.AsyncProducerCallback)
347     */
348    public <T> T doInProducer(Endpoint endpoint, Exchange exchange, ExchangePattern pattern, ProducerCallback<T> callback) {
349        T answer = null;
350
351        // get the producer and we do not mind if its pooled as we can handle returning it back to the pool
352        Producer producer = doGetProducer(endpoint, true);
353
354        if (producer == null) {
355            if (isStopped()) {
356                LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange);
357                return null;
358            } else {
359                throw new IllegalStateException("No producer, this processor has not been started: " + this);
360            }
361        }
362
363        try {
364            // invoke the callback
365            answer = callback.doInProducer(producer, exchange, pattern);
366        } catch (Throwable e) {
367            if (exchange != null) {
368                exchange.setException(e);
369            }
370        } finally {
371            if (producer instanceof ServicePoolAware) {
372                // release back to the pool
373                pool.release(endpoint, producer);
374            } else if (!producer.isSingleton()) {
375                // stop and shutdown non-singleton producers as we should not leak resources
376                try {
377                    ServiceHelper.stopAndShutdownService(producer);
378                } catch (Exception e) {
379                    // ignore and continue
380                    LOG.warn("Error stopping/shutting down producer: " + producer, e);
381                }
382            }
383        }
384
385        return answer;
386    }
387
388    /**
389     * Sends an exchange to an endpoint using a supplied callback supporting the asynchronous routing engine.
390     * <p/>
391     * If an exception was thrown during processing, it would be set on the given Exchange
392     *
393     * @param endpoint         the endpoint to send the exchange to
394     * @param exchange         the exchange, can be <tt>null</tt> if so then create a new exchange from the producer
395     * @param pattern          the exchange pattern, can be <tt>null</tt>
396     * @param callback         the asynchronous callback
397     * @param producerCallback the producer template callback to be executed
398     * @return (doneSync) <tt>true</tt> to continue execute synchronously, <tt>false</tt> to continue being executed asynchronously
399     */
400    public boolean doInAsyncProducer(final Endpoint endpoint, final Exchange exchange, final ExchangePattern pattern,
401                                     final AsyncCallback callback, final AsyncProducerCallback producerCallback) {
402
403        Producer target;
404        try {
405            // get the producer and we do not mind if its pooled as we can handle returning it back to the pool
406            target = doGetProducer(endpoint, true);
407
408            if (target == null) {
409                if (isStopped()) {
410                    LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange);
411                    callback.done(true);
412                    return true;
413                } else {
414                    exchange.setException(new IllegalStateException("No producer, this processor has not been started: " + this));
415                    callback.done(true);
416                    return true;
417                }
418            }
419        } catch (Throwable e) {
420            exchange.setException(e);
421            callback.done(true);
422            return true;
423        }
424
425        final Producer producer = target;
426
427        // record timing for sending the exchange using the producer
428        final StopWatch watch = eventNotifierEnabled && exchange != null ? new StopWatch() : null;
429
430        try {
431            if (eventNotifierEnabled && exchange != null) {
432                EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint);
433            }
434            // invoke the callback
435            AsyncProcessor asyncProcessor = AsyncProcessorConverterHelper.convert(producer);
436            return producerCallback.doInAsyncProducer(producer, asyncProcessor, exchange, pattern, doneSync -> {
437                try {
438                    if (eventNotifierEnabled && watch != null) {
439                        long timeTaken = watch.stop();
440                        // emit event that the exchange was sent to the endpoint
441                        EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
442                    }
443
444                    if (producer instanceof ServicePoolAware) {
445                        // release back to the pool
446                        pool.release(endpoint, producer);
447                    } else if (!producer.isSingleton()) {
448                        // stop and shutdown non-singleton producers as we should not leak resources
449                        try {
450                            ServiceHelper.stopAndShutdownService(producer);
451                        } catch (Exception e) {
452                            // ignore and continue
453                            LOG.warn("Error stopping/shutting down producer: " + producer, e);
454                        }
455                    }
456                } finally {
457                    callback.done(doneSync);
458                }
459            });
460        } catch (Throwable e) {
461            // ensure exceptions is caught and set on the exchange
462            if (exchange != null) {
463                exchange.setException(e);
464            }
465            callback.done(true);
466            return true;
467        }
468    }
469
470    protected boolean asyncDispatchExchange(final Endpoint endpoint, Producer producer,
471                                            final Processor resultProcessor, Exchange exchange, AsyncCallback callback) {
472        // now lets dispatch
473        LOG.debug(">>>> {} {}", endpoint, exchange);
474
475        // set property which endpoint we send to
476        exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri());
477
478        // send the exchange using the processor
479        try {
480            if (eventNotifierEnabled) {
481                callback = new EventNotifierCallback(callback, exchange, endpoint);
482            }
483            CamelInternalProcessor internal = prepareInternalProcessor(producer, resultProcessor);
484
485            return internal.process(exchange, callback);
486        } catch (Throwable e) {
487            // ensure exceptions is caught and set on the exchange
488            exchange.setException(e);
489            callback.done(true);
490            return true;
491        }
492
493    }
494
495    protected Exchange sendExchange(final Endpoint endpoint, ExchangePattern pattern,
496                                    final Processor processor, final Processor resultProcessor, Exchange exchange) {
497        return doInProducer(endpoint, exchange, pattern, new ProducerCallback<Exchange>() {
498            public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) {
499                if (exchange == null) {
500                    exchange = pattern != null ? producer.getEndpoint().createExchange(pattern) : producer.getEndpoint().createExchange();
501                }
502
503                if (processor != null) {
504                    // lets populate using the processor callback
505                    try {
506                        processor.process(exchange);
507                    } catch (Exception e) {
508                        // populate failed so return
509                        exchange.setException(e);
510                        return exchange;
511                    }
512                }
513
514                // now lets dispatch
515                LOG.debug(">>>> {} {}", endpoint, exchange);
516
517                // set property which endpoint we send to
518                exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri());
519
520                // send the exchange using the processor
521                StopWatch watch = null;
522                try {
523                    if (eventNotifierEnabled) {
524                        watch = new StopWatch();
525                        EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint);
526                    }
527
528                    CamelInternalProcessor internal = prepareInternalProcessor(producer, resultProcessor);
529                    internal.process(exchange);
530                } catch (Throwable e) {
531                    // ensure exceptions is caught and set on the exchange
532                    exchange.setException(e);
533                } finally {
534                    // emit event that the exchange was sent to the endpoint
535                    if (eventNotifierEnabled && watch != null) {
536                        long timeTaken = watch.stop();
537                        EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
538                    }
539                }
540                return exchange;
541            }
542        });
543    }
544
545    protected CamelInternalProcessor prepareInternalProcessor(Producer producer, Processor resultProcessor) {
546        // if we have a result processor then wrap in pipeline to execute both of them in sequence
547        Processor target;
548        if (resultProcessor != null) {
549            List<Processor> processors = new ArrayList<Processor>(2);
550            processors.add(producer);
551            processors.add(resultProcessor);
552            target = Pipeline.newInstance(getCamelContext(), processors);
553        } else {
554            target = producer;
555        }
556
557        // wrap in unit of work
558        CamelInternalProcessor internal = new CamelInternalProcessor(target);
559        internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null));
560        return internal;
561    }
562
563    protected synchronized Producer doGetProducer(Endpoint endpoint, boolean pooled) {
564        String key = endpoint.getEndpointUri();
565        Producer answer = producers.get(key);
566        if (pooled && answer == null) {
567            // try acquire from connection pool
568            answer = pool.acquire(endpoint);
569        }
570
571        if (answer == null) {
572            // create a new producer
573            try {
574                answer = endpoint.createProducer();
575                // add as service which will also start the service
576                // (false => we and handling the lifecycle of the producer in this cache)
577                getCamelContext().addService(answer, false);
578            } catch (Exception e) {
579                throw new FailedToCreateProducerException(endpoint, e);
580            }
581
582            // add producer to cache or pool if applicable
583            if (pooled && answer instanceof ServicePoolAware) {
584                LOG.debug("Adding to producer service pool with key: {} for producer: {}", endpoint, answer);
585                answer = pool.addAndAcquire(endpoint, answer);
586            } else if (answer.isSingleton()) {
587                LOG.debug("Adding to producer cache with key: {} for producer: {}", endpoint, answer);
588                producers.put(key, answer);
589            }
590        }
591
592        if (answer != null) {
593            // record statistics
594            if (extendedStatistics) {
595                statistics.onHit(key);
596            }
597        }
598
599        return answer;
600    }
601
602    protected void doStart() throws Exception {
603        if (extendedStatistics) {
604            int max = maxCacheSize == 0 ? CamelContextHelper.getMaximumCachePoolSize(camelContext) : maxCacheSize;
605            statistics = new DefaultEndpointUtilizationStatistics(max);
606        }
607
608        ServiceHelper.startServices(producers.values());
609        ServiceHelper.startServices(statistics, pool);
610    }
611
    /**
     * Stops this cache: stops and shuts down the statistics, the pool (only if it was given to the
     * constructor) and the cached producers, removes the producers as services from the
     * {@link CamelContext}, and clears the cache and the statistics.
     */
    protected void doStop() throws Exception {
        // when stopping we intend to shutdown
        ServiceHelper.stopAndShutdownService(statistics);
        // the shared pool from the CamelContext is left alone, only a pool handed to us is stopped
        if (stopServicePool) {
            ServiceHelper.stopAndShutdownService(pool);
        }
        try {
            ServiceHelper.stopAndShutdownServices(producers.values());
        } finally {
            // ensure producers are removed, and also from JMX
            for (Producer producer : producers.values()) {
                getCamelContext().removeService(producer);
            }
        }
        producers.clear();
        if (statistics != null) {
            statistics.clear();
        }
    }
631
632    /**
633     * Returns the current size of the cache
634     *
635     * @return the current size
636     */
637    public int size() {
638        int size = producers.size();
639        size += pool.size();
640
641        LOG.trace("size = {}", size);
642        return size;
643    }
644
645    /**
646     * Gets the maximum cache size (capacity).
647     * <p/>
648     * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
649     *
650     * @return the capacity
651     */
652    public int getCapacity() {
653        int capacity = -1;
654        if (producers instanceof LRUCache) {
655            LRUCache<String, Producer> cache = (LRUCache<String, Producer>) producers;
656            capacity = cache.getMaxCacheSize();
657        }
658        return capacity;
659    }
660
661    /**
662     * Gets the cache hits statistic
663     * <p/>
664     * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
665     *
666     * @return the hits
667     */
668    public long getHits() {
669        long hits = -1;
670        if (producers instanceof LRUCache) {
671            LRUCache<String, Producer> cache = (LRUCache<String, Producer>) producers;
672            hits = cache.getHits();
673        }
674        return hits;
675    }
676
677    /**
678     * Gets the cache misses statistic
679     * <p/>
680     * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
681     *
682     * @return the misses
683     */
684    public long getMisses() {
685        long misses = -1;
686        if (producers instanceof LRUCache) {
687            LRUCache<String, Producer> cache = (LRUCache<String, Producer>) producers;
688            misses = cache.getMisses();
689        }
690        return misses;
691    }
692
693    /**
694     * Gets the cache evicted statistic
695     * <p/>
696     * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
697     *
698     * @return the evicted
699     */
700    public long getEvicted() {
701        long evicted = -1;
702        if (producers instanceof LRUCache) {
703            LRUCache<String, Producer> cache = (LRUCache<String, Producer>) producers;
704            evicted = cache.getEvicted();
705        }
706        return evicted;
707    }
708
709    /**
710     * Resets the cache statistics
711     */
712    public void resetCacheStatistics() {
713        if (producers instanceof LRUCache) {
714            LRUCache<String, Producer> cache = (LRUCache<String, Producer>) producers;
715            cache.resetStatistics();
716        }
717        if (statistics != null) {
718            statistics.clear();
719        }
720    }
721
    /**
     * Purges this cache: clears the cached producers, purges the pool, and clears
     * the endpoint utilization statistics if present.
     */
    public synchronized void purge() {
        producers.clear();
        pool.purge();
        if (statistics != null) {
            statistics.clear();
        }
    }
732
733    /**
734     * Cleanup the cache (purging stale entries)
735     */
736    public void cleanUp() {
737        if (producers instanceof LRUCache) {
738            LRUCache<String, Producer> cache = (LRUCache<String, Producer>) producers;
739            cache.cleanUp();
740        }
741    }
742
    /**
     * Gets the endpoint utilization statistics
     *
     * @return the statistics, or <tt>null</tt> if none has been created, which only happens
     *         when this cache is started with extended statistics enabled
     */
    public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
        return statistics;
    }
746
747    @Override
748    public String toString() {
749        return "ProducerCache for source: " + source + ", capacity: " + getCapacity();
750    }
751
752}