001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import java.util.Map;
020import java.util.concurrent.CompletableFuture;
021
022import org.apache.camel.AsyncCallback;
023import org.apache.camel.AsyncProcessor;
024import org.apache.camel.AsyncProducerCallback;
025import org.apache.camel.CamelContext;
026import org.apache.camel.Endpoint;
027import org.apache.camel.Exchange;
028import org.apache.camel.ExchangePattern;
029import org.apache.camel.FailedToCreateProducerException;
030import org.apache.camel.Processor;
031import org.apache.camel.Producer;
032import org.apache.camel.ProducerCallback;
033import org.apache.camel.ServicePoolAware;
034import org.apache.camel.processor.CamelInternalProcessor;
035import org.apache.camel.processor.SharedCamelInternalProcessor;
036import org.apache.camel.spi.EndpointUtilizationStatistics;
037import org.apache.camel.spi.ServicePool;
038import org.apache.camel.support.ServiceSupport;
039import org.apache.camel.util.AsyncProcessorConverterHelper;
040import org.apache.camel.util.CamelContextHelper;
041import org.apache.camel.util.EventHelper;
042import org.apache.camel.util.LRUCache;
043import org.apache.camel.util.LRUCacheFactory;
044import org.apache.camel.util.ServiceHelper;
045import org.apache.camel.util.StopWatch;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049/**
050 * Cache containing created {@link Producer}.
051 *
052 * @version
053 */
054public class ProducerCache extends ServiceSupport {
055    private static final Logger LOG = LoggerFactory.getLogger(ProducerCache.class);
056
057    private final CamelContext camelContext;
058    private final ServicePool<Endpoint, Producer> pool;
059    private final Map<String, Producer> producers;
060    private final Object source;
061    private final SharedCamelInternalProcessor internalProcessor;
062
063    private EndpointUtilizationStatistics statistics;
064    private boolean eventNotifierEnabled = true;
065    private boolean extendedStatistics;
066    private int maxCacheSize;
067    private boolean stopServicePool;
068
069    public ProducerCache(Object source, CamelContext camelContext) {
070        this(source, camelContext, CamelContextHelper.getMaximumCachePoolSize(camelContext));
071    }
072
073    public ProducerCache(Object source, CamelContext camelContext, int cacheSize) {
074        this(source, camelContext, null, createLRUCache(cacheSize));
075    }
076
077    public ProducerCache(Object source, CamelContext camelContext, Map<String, Producer> cache) {
078        this(source, camelContext, null, cache);
079    }
080
081    public ProducerCache(Object source, CamelContext camelContext, ServicePool<Endpoint, Producer> producerServicePool, Map<String, Producer> cache) {
082        this.source = source;
083        this.camelContext = camelContext;
084        if (producerServicePool == null) {
085            // use shared producer pool which lifecycle is managed by CamelContext
086            this.pool = camelContext.getProducerServicePool();
087            this.stopServicePool = false;
088        } else {
089            this.pool = producerServicePool;
090            this.stopServicePool = true;
091        }
092        this.producers = cache;
093        if (producers instanceof LRUCache) {
094            maxCacheSize = ((LRUCache) producers).getMaxCacheSize();
095        }
096
097        // only if JMX is enabled
098        if (camelContext.getManagementStrategy().getManagementAgent() != null) {
099            this.extendedStatistics = camelContext.getManagementStrategy().getManagementAgent().getStatisticsLevel().isExtended();
100        } else {
101            this.extendedStatistics = false;
102        }
103
104        // internal processor used for sending
105        internalProcessor = new SharedCamelInternalProcessor(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null));
106    }
107
108    public boolean isEventNotifierEnabled() {
109        return eventNotifierEnabled;
110    }
111
112    /**
113     * Whether {@link org.apache.camel.spi.EventNotifier} is enabled
114     */
115    public void setEventNotifierEnabled(boolean eventNotifierEnabled) {
116        this.eventNotifierEnabled = eventNotifierEnabled;
117    }
118
119    public boolean isExtendedStatistics() {
120        return extendedStatistics;
121    }
122
123    /**
124     * Whether extended JMX statistics is enabled for {@link org.apache.camel.spi.EndpointUtilizationStatistics}
125     */
126    public void setExtendedStatistics(boolean extendedStatistics) {
127        this.extendedStatistics = extendedStatistics;
128    }
129
130    /**
131     * Creates the {@link LRUCache} to be used.
132     * <p/>
133     * This implementation returns a {@link LRUCache} instance.
134
135     * @param cacheSize the cache size
136     * @return the cache
137     */
138    @SuppressWarnings("unchecked")
139    protected static Map<String, Producer> createLRUCache(int cacheSize) {
140        // Use a regular cache as we want to ensure that the lifecycle of the producers
141        // being cache is properly handled, such as they are stopped when being evicted
142        // or when this cache is stopped. This is needed as some producers requires to
143        // be stopped so they can shutdown internal resources that otherwise may cause leaks
144        return LRUCacheFactory.newLRUCache(cacheSize);
145    }
146
147    public CamelContext getCamelContext() {
148        return camelContext;
149    }
150
151    /**
152     * Gets the source which uses this cache
153     *
154     * @return the source
155     */
156    public Object getSource() {
157        return source;
158    }
159
160    /**
161     * Acquires a pooled producer which you <b>must</b> release back again after usage using the
162     * {@link #releaseProducer(org.apache.camel.Endpoint, org.apache.camel.Producer)} method.
163     *
164     * @param endpoint the endpoint
165     * @return the producer
166     */
167    public Producer acquireProducer(Endpoint endpoint) {
168        return doGetProducer(endpoint, true);
169    }
170
171    /**
172     * Releases an acquired producer back after usage.
173     *
174     * @param endpoint the endpoint
175     * @param producer the producer to release
176     * @throws Exception can be thrown if error stopping producer if that was needed.
177     */
178    public void releaseProducer(Endpoint endpoint, Producer producer) throws Exception {
179        if (producer instanceof ServicePoolAware) {
180            // release back to the pool
181            pool.release(endpoint, producer);
182        } else if (!producer.isSingleton()) {
183            // stop and shutdown non-singleton producers as we should not leak resources
184            ServiceHelper.stopAndShutdownService(producer);
185        }
186    }
187
188    /**
189     * Starts the {@link Producer} to be used for sending to the given endpoint
190     * <p/>
191     * This can be used to early start the {@link Producer} to ensure it can be created,
192     * such as when Camel is started. This allows to fail fast in case the {@link Producer}
193     * could not be started.
194     *
195     * @param endpoint the endpoint to send the exchange to
196     * @throws Exception is thrown if failed to create or start the {@link Producer}
197     */
198    public void startProducer(Endpoint endpoint) throws Exception {
199        Producer producer = acquireProducer(endpoint);
200        releaseProducer(endpoint, producer);
201    }
202
203    /**
204     * Sends the exchange to the given endpoint.
205     * <p>
206     * This method will <b>not</b> throw an exception. If processing of the given
207     * Exchange failed then the exception is stored on the provided Exchange
208     *
209     * @param endpoint the endpoint to send the exchange to
210     * @param exchange the exchange to send
211     */
212    public void send(Endpoint endpoint, Exchange exchange) {
213        sendExchange(endpoint, null, null, null, exchange);
214    }
215
216    /**
217     * Sends an exchange to an endpoint using a supplied
218     * {@link Processor} to populate the exchange
219     * <p>
220     * This method will <b>not</b> throw an exception. If processing of the given
221     * Exchange failed then the exception is stored on the return Exchange
222     *
223     * @param endpoint the endpoint to send the exchange to
224     * @param processor the transformer used to populate the new exchange
225     * @throws org.apache.camel.CamelExecutionException is thrown if sending failed
226     * @return the exchange
227     */
228    public Exchange send(Endpoint endpoint, Processor processor) {
229        return sendExchange(endpoint, null, processor, null, null);
230    }
231
232    /**
233     * Sends an exchange to an endpoint using a supplied
234     * {@link Processor} to populate the exchange
235     * <p>
236     * This method will <b>not</b> throw an exception. If processing of the given
237     * Exchange failed then the exception is stored on the return Exchange
238     *
239     * @param endpoint the endpoint to send the exchange to
240     * @param pattern the message {@link ExchangePattern} such as
241     *   {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
242     * @param processor the transformer used to populate the new exchange
243     * @return the exchange
244     */
245    public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) {
246        return sendExchange(endpoint, pattern, processor, null, null);
247    }
248
249    /**
250     * Sends an exchange to an endpoint using a supplied
251     * {@link Processor} to populate the exchange
252     * <p>
253     * This method will <b>not</b> throw an exception. If processing of the given
254     * Exchange failed then the exception is stored on the return Exchange
255     *
256     * @param endpoint the endpoint to send the exchange to
257     * @param pattern the message {@link ExchangePattern} such as
258     *   {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
259     * @param processor the transformer used to populate the new exchange
260     * @param resultProcessor a processor to process the exchange when the send is complete.
261     * @return the exchange
262     */
263    public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor, Processor resultProcessor) {
264        return sendExchange(endpoint, pattern, processor, resultProcessor, null);
265    }
266
267    /**
268     * Asynchronously sends an exchange to an endpoint using a supplied
269     * {@link Processor} to populate the exchange
270     * <p>
271     * This method will <b>neither</b> throw an exception <b>nor</b> complete future exceptionally.
272     * If processing of the given Exchange failed then the exception is stored on the return Exchange
273     *
274     * @param endpoint        the endpoint to send the exchange to
275     * @param pattern         the message {@link ExchangePattern} such as
276     *                        {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
277     * @param processor       the transformer used to populate the new exchange
278     * @param resultProcessor a processor to process the exchange when the send is complete.
279     * @param future          the preexisting future to complete when processing is done or null if to create new one
280     * @return future that completes with exchange when processing is done. Either passed into future parameter
281     *              or new one if parameter was null
282     */
283    public CompletableFuture<Exchange> asyncSend(Endpoint endpoint, ExchangePattern pattern, Processor processor, Processor resultProcessor,
284                                                 CompletableFuture<Exchange> future) {
285        return asyncSendExchange(endpoint, pattern, processor, resultProcessor, null, future);
286    }
287
288    /**
289     * Asynchronously sends an exchange to an endpoint using a supplied
290     * {@link Processor} to populate the exchange
291     * <p>
292     * This method will <b>neither</b> throw an exception <b>nor</b> complete future exceptionally.
293     * If processing of the given Exchange failed then the exception is stored on the return Exchange
294     *
295     * @param endpoint        the endpoint to send the exchange to
296     * @param pattern         the message {@link ExchangePattern} such as
297     *                        {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
298     * @param processor       the transformer used to populate the new exchange
299     * @param resultProcessor a processor to process the exchange when the send is complete.
300     * @param exchange        an exchange to use in processing. Exchange will be created if parameter is null.
301     * @param future          the preexisting future to complete when processing is done or null if to create new one
302     * @return future that completes with exchange when processing is done. Either passed into future parameter
303     *              or new one if parameter was null
304     */
305    public CompletableFuture<Exchange> asyncSendExchange(final Endpoint endpoint, ExchangePattern pattern,
306                                                         final Processor processor, final Processor resultProcessor, Exchange exchange,
307                                                         CompletableFuture<Exchange> future) {
308        AsyncCallbackToCompletableFutureAdapter<Exchange> futureAdapter = new AsyncCallbackToCompletableFutureAdapter<>(future, exchange);
309        doInAsyncProducer(endpoint, exchange, pattern, futureAdapter,
310            (producer, asyncProducer, innerExchange, exchangePattern, producerCallback) -> {
311                if (innerExchange == null) {
312                    innerExchange = pattern != null
313                            ? producer.getEndpoint().createExchange(pattern)
314                            : producer.getEndpoint().createExchange();
315                    futureAdapter.setResult(innerExchange);
316                }
317
318                if (processor != null) {
319                    // lets populate using the processor callback
320                    AsyncProcessor asyncProcessor = AsyncProcessorConverterHelper.convert(processor);
321                    try {
322                        final Exchange finalExchange = innerExchange;
323                        asyncProcessor.process(innerExchange,
324                            doneSync -> asyncDispatchExchange(endpoint, producer, resultProcessor,
325                                    finalExchange, producerCallback));
326                        return false;
327                    } catch (Throwable e) {
328                        // populate failed so return
329                        innerExchange.setException(e);
330                        producerCallback.done(true);
331                        return true;
332                    }
333                }
334
335                return asyncDispatchExchange(endpoint, producer, resultProcessor, innerExchange, producerCallback);
336            });
337        return futureAdapter.getFuture();
338    }
339
340    /**
341     * Sends an exchange to an endpoint using a supplied callback, using the synchronous processing.
342     * <p/>
343     * If an exception was thrown during processing, it would be set on the given Exchange
344     *
345     * @param endpoint  the endpoint to send the exchange to
346     * @param exchange  the exchange, can be <tt>null</tt> if so then create a new exchange from the producer
347     * @param pattern   the exchange pattern, can be <tt>null</tt>
348     * @param callback  the callback
349     * @return the response from the callback
350     * @see #doInAsyncProducer(org.apache.camel.Endpoint, org.apache.camel.Exchange, org.apache.camel.ExchangePattern, org.apache.camel.AsyncCallback, org.apache.camel.AsyncProducerCallback)
351     */
352    public <T> T doInProducer(Endpoint endpoint, Exchange exchange, ExchangePattern pattern, ProducerCallback<T> callback) {
353        T answer = null;
354
355        // get the producer and we do not mind if its pooled as we can handle returning it back to the pool
356        Producer producer = doGetProducer(endpoint, true);
357
358        if (producer == null) {
359            if (isStopped()) {
360                LOG.warn("Ignoring exchange sent after processor is stopped: {}", exchange);
361                return null;
362            } else {
363                throw new IllegalStateException("No producer, this processor has not been started: " + this);
364            }
365        }
366
367        try {
368            // invoke the callback
369            answer = callback.doInProducer(producer, exchange, pattern);
370        } catch (Throwable e) {
371            if (exchange != null) {
372                exchange.setException(e);
373            }
374        } finally {
375            if (producer instanceof ServicePoolAware) {
376                // release back to the pool
377                pool.release(endpoint, producer);
378            } else if (!producer.isSingleton()) {
379                // stop and shutdown non-singleton producers as we should not leak resources
380                try {
381                    ServiceHelper.stopAndShutdownService(producer);
382                } catch (Exception e) {
383                    // ignore and continue
384                    LOG.warn("Error stopping/shutting down producer: {}", producer, e);
385                }
386            }
387        }
388
389        return answer;
390    }
391
392    /**
393     * Sends an exchange to an endpoint using a supplied callback supporting the asynchronous routing engine.
394     * <p/>
395     * If an exception was thrown during processing, it would be set on the given Exchange
396     *
397     * @param endpoint         the endpoint to send the exchange to
398     * @param exchange         the exchange, can be <tt>null</tt> if so then create a new exchange from the producer
399     * @param pattern          the exchange pattern, can be <tt>null</tt>
400     * @param callback         the asynchronous callback
401     * @param producerCallback the producer template callback to be executed
402     * @return (doneSync) <tt>true</tt> to continue execute synchronously, <tt>false</tt> to continue being executed asynchronously
403     */
404    public boolean doInAsyncProducer(final Endpoint endpoint, final Exchange exchange, final ExchangePattern pattern,
405                                     final AsyncCallback callback, final AsyncProducerCallback producerCallback) {
406
407        Producer target;
408        try {
409            // get the producer and we do not mind if its pooled as we can handle returning it back to the pool
410            target = doGetProducer(endpoint, true);
411
412            if (target == null) {
413                if (isStopped()) {
414                    LOG.warn("Ignoring exchange sent after processor is stopped: {}", exchange);
415                    callback.done(true);
416                    return true;
417                } else {
418                    exchange.setException(new IllegalStateException("No producer, this processor has not been started: " + this));
419                    callback.done(true);
420                    return true;
421                }
422            }
423        } catch (Throwable e) {
424            exchange.setException(e);
425            callback.done(true);
426            return true;
427        }
428
429        final Producer producer = target;
430
431        try {
432            StopWatch sw = null;
433            if (eventNotifierEnabled && exchange != null) {
434                boolean sending = EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint);
435                if (sending) {
436                    sw = new StopWatch();
437                }
438            }
439
440            // record timing for sending the exchange using the producer
441            final StopWatch watch = sw;
442
443            // invoke the callback
444            AsyncProcessor asyncProcessor = AsyncProcessorConverterHelper.convert(producer);
445            return producerCallback.doInAsyncProducer(producer, asyncProcessor, exchange, pattern, doneSync -> {
446                try {
447                    if (eventNotifierEnabled && watch != null) {
448                        long timeTaken = watch.taken();
449                        // emit event that the exchange was sent to the endpoint
450                        EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
451                    }
452
453                    if (producer instanceof ServicePoolAware) {
454                        // release back to the pool
455                        pool.release(endpoint, producer);
456                    } else if (!producer.isSingleton()) {
457                        // stop and shutdown non-singleton producers as we should not leak resources
458                        try {
459                            ServiceHelper.stopAndShutdownService(producer);
460                        } catch (Exception e) {
461                            // ignore and continue
462                            LOG.warn("Error stopping/shutting down producer: {}", producer, e);
463                        }
464                    }
465                } finally {
466                    callback.done(doneSync);
467                }
468            });
469        } catch (Throwable e) {
470            // ensure exceptions is caught and set on the exchange
471            if (exchange != null) {
472                exchange.setException(e);
473            }
474            callback.done(true);
475            return true;
476        }
477    }
478
479    protected boolean asyncDispatchExchange(final Endpoint endpoint, Producer producer,
480                                            final Processor resultProcessor, Exchange exchange, AsyncCallback callback) {
481        // now lets dispatch
482        LOG.debug(">>>> {} {}", endpoint, exchange);
483
484        // set property which endpoint we send to
485        exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri());
486
487        // send the exchange using the processor
488        try {
489            if (eventNotifierEnabled) {
490                callback = new EventNotifierCallback(callback, exchange, endpoint);
491            }
492            AsyncProcessor target = prepareProducer(producer);
493            // invoke the asynchronous method
494            return internalProcessor.process(exchange, callback, target, resultProcessor);
495        } catch (Throwable e) {
496            // ensure exceptions is caught and set on the exchange
497            exchange.setException(e);
498            callback.done(true);
499            return true;
500        }
501
502    }
503
504    protected Exchange sendExchange(final Endpoint endpoint, ExchangePattern pattern,
505                                    final Processor processor, final Processor resultProcessor, Exchange exchange) {
506        return doInProducer(endpoint, exchange, pattern, new ProducerCallback<Exchange>() {
507            public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) {
508                if (exchange == null) {
509                    exchange = pattern != null ? producer.getEndpoint().createExchange(pattern) : producer.getEndpoint().createExchange();
510                }
511
512                if (processor != null) {
513                    // lets populate using the processor callback
514                    try {
515                        processor.process(exchange);
516                    } catch (Throwable e) {
517                        // populate failed so return
518                        exchange.setException(e);
519                        return exchange;
520                    }
521                }
522
523                // now lets dispatch
524                LOG.debug(">>>> {} {}", endpoint, exchange);
525
526                // set property which endpoint we send to
527                exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri());
528
529                // send the exchange using the processor
530                StopWatch watch = null;
531                try {
532                    if (eventNotifierEnabled) {
533                        boolean sending = EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint);
534                        if (sending) {
535                            watch = new StopWatch();
536                        }
537                    }
538
539                    AsyncProcessor target = prepareProducer(producer);
540                    // invoke the synchronous method
541                    internalProcessor.process(exchange, target, resultProcessor);
542
543                } catch (Throwable e) {
544                    // ensure exceptions is caught and set on the exchange
545                    exchange.setException(e);
546                } finally {
547                    // emit event that the exchange was sent to the endpoint
548                    if (eventNotifierEnabled && watch != null) {
549                        long timeTaken = watch.taken();
550                        EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
551                    }
552                }
553                return exchange;
554            }
555        });
556    }
557
558    protected AsyncProcessor prepareProducer(Producer producer) {
559        return AsyncProcessorConverterHelper.convert(producer);
560    }
561
562    protected synchronized Producer doGetProducer(Endpoint endpoint, boolean pooled) {
563        String key = endpoint.getEndpointUri();
564        Producer answer = producers.get(key);
565        if (pooled && answer == null) {
566            // try acquire from connection pool
567            answer = pool.acquire(endpoint);
568        }
569
570        if (answer == null) {
571            // create a new producer
572            try {
573                answer = endpoint.createProducer();
574                // add as service to CamelContext so its managed via JMX
575                boolean add = answer.isSingleton() || answer instanceof ServicePoolAware;
576                if (add) {
577                    // (false => we and handling the lifecycle of the producer in this cache)
578                    getCamelContext().addService(answer, false);
579                } else {
580                    // fallback and start producer manually
581                    ServiceHelper.startService(answer);
582                }
583            } catch (Throwable e) {
584                throw new FailedToCreateProducerException(endpoint, e);
585            }
586
587            // add producer to cache or pool if applicable
588            if (pooled && answer instanceof ServicePoolAware) {
589                LOG.debug("Adding to producer service pool with key: {} for producer: {}", endpoint, answer);
590                answer = pool.addAndAcquire(endpoint, answer);
591            } else if (answer.isSingleton()) {
592                LOG.debug("Adding to producer cache with key: {} for producer: {}", endpoint, answer);
593                producers.put(key, answer);
594            }
595        }
596
597        if (answer != null) {
598            // record statistics
599            if (extendedStatistics) {
600                statistics.onHit(key);
601            }
602        }
603
604        return answer;
605    }
606
607    protected void doStart() throws Exception {
608        if (extendedStatistics) {
609            int max = maxCacheSize == 0 ? CamelContextHelper.getMaximumCachePoolSize(camelContext) : maxCacheSize;
610            statistics = new DefaultEndpointUtilizationStatistics(max);
611        }
612
613        ServiceHelper.startServices(producers.values());
614        ServiceHelper.startServices(statistics, pool);
615    }
616
617    protected void doStop() throws Exception {
618        // when stopping we intend to shutdown
619        ServiceHelper.stopAndShutdownService(statistics);
620        if (stopServicePool) {
621            ServiceHelper.stopAndShutdownService(pool);
622        }
623        try {
624            ServiceHelper.stopAndShutdownServices(producers.values());
625        } finally {
626            // ensure producers are removed, and also from JMX
627            for (Producer producer : producers.values()) {
628                getCamelContext().removeService(producer);
629            }
630        }
631        producers.clear();
632        if (statistics != null) {
633            statistics.clear();
634        }
635    }
636
637    /**
638     * Returns the current size of the cache
639     *
640     * @return the current size
641     */
642    public int size() {
643        int size = producers.size();
644        size += pool.size();
645
646        LOG.trace("size = {}", size);
647        return size;
648    }
649
650    /**
651     * Gets the maximum cache size (capacity).
652     * <p/>
653     * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
654     *
655     * @return the capacity
656     */
657    public int getCapacity() {
658        int capacity = -1;
659        if (producers instanceof LRUCache) {
660            LRUCache<String, Producer> cache = (LRUCache<String, Producer>) producers;
661            capacity = cache.getMaxCacheSize();
662        }
663        return capacity;
664    }
665
666    /**
667     * Gets the cache hits statistic
668     * <p/>
669     * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
670     *
671     * @return the hits
672     */
673    public long getHits() {
674        long hits = -1;
675        if (producers instanceof LRUCache) {
676            LRUCache<String, Producer> cache = (LRUCache<String, Producer>) producers;
677            hits = cache.getHits();
678        }
679        return hits;
680    }
681
682    /**
683     * Gets the cache misses statistic
684     * <p/>
685     * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
686     *
687     * @return the misses
688     */
689    public long getMisses() {
690        long misses = -1;
691        if (producers instanceof LRUCache) {
692            LRUCache<String, Producer> cache = (LRUCache<String, Producer>) producers;
693            misses = cache.getMisses();
694        }
695        return misses;
696    }
697
698    /**
699     * Gets the cache evicted statistic
700     * <p/>
701     * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
702     *
703     * @return the evicted
704     */
705    public long getEvicted() {
706        long evicted = -1;
707        if (producers instanceof LRUCache) {
708            LRUCache<String, Producer> cache = (LRUCache<String, Producer>) producers;
709            evicted = cache.getEvicted();
710        }
711        return evicted;
712    }
713
714    /**
715     * Resets the cache statistics
716     */
717    public void resetCacheStatistics() {
718        if (producers instanceof LRUCache) {
719            LRUCache<String, Producer> cache = (LRUCache<String, Producer>) producers;
720            cache.resetStatistics();
721        }
722        if (statistics != null) {
723            statistics.clear();
724        }
725    }
726
727    /**
728     * Purges this cache
729     */
730    public synchronized void purge() {
731        producers.clear();
732        pool.purge();
733        if (statistics != null) {
734            statistics.clear();
735        }
736    }
737
738    /**
739     * Cleanup the cache (purging stale entries)
740     */
741    public void cleanUp() {
742        if (producers instanceof LRUCache) {
743            LRUCache<String, Producer> cache = (LRUCache<String, Producer>) producers;
744            cache.cleanUp();
745        }
746    }
747
748    public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
749        return statistics;
750    }
751
752    @Override
753    public String toString() {
754        return "ProducerCache for source: " + source + ", capacity: " + getCapacity();
755    }
756
757}