001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import java.util.Map;
020
021import org.apache.camel.AsyncCallback;
022import org.apache.camel.AsyncProcessor;
023import org.apache.camel.AsyncProducerCallback;
024import org.apache.camel.CamelContext;
025import org.apache.camel.Endpoint;
026import org.apache.camel.Exchange;
027import org.apache.camel.ExchangePattern;
028import org.apache.camel.FailedToCreateProducerException;
029import org.apache.camel.Processor;
030import org.apache.camel.Producer;
031import org.apache.camel.ProducerCallback;
032import org.apache.camel.ServicePoolAware;
033import org.apache.camel.processor.UnitOfWorkProducer;
034import org.apache.camel.spi.EndpointUtilizationStatistics;
035import org.apache.camel.spi.ServicePool;
036import org.apache.camel.support.ServiceSupport;
037import org.apache.camel.util.AsyncProcessorConverterHelper;
038import org.apache.camel.util.CamelContextHelper;
039import org.apache.camel.util.EventHelper;
040import org.apache.camel.util.LRUCache;
041import org.apache.camel.util.ServiceHelper;
042import org.apache.camel.util.StopWatch;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046/**
047 * Cache containing created {@link Producer}.
048 *
049 * @version 
050 */
051public class ProducerCache extends ServiceSupport {
052    private static final Logger LOG = LoggerFactory.getLogger(ProducerCache.class);
053
054    private final CamelContext camelContext;
055    private final ServicePool<Endpoint, Producer> pool;
056    private final Map<String, Producer> producers;
057    private final Object source;
058
059    private EndpointUtilizationStatistics statistics;
060    private boolean eventNotifierEnabled = true;
061    private boolean extendedStatistics;
062    private int maxCacheSize;
063    private boolean stopServicePool;
064
065    public ProducerCache(Object source, CamelContext camelContext) {
066        this(source, camelContext, CamelContextHelper.getMaximumCachePoolSize(camelContext));
067    }
068
069    public ProducerCache(Object source, CamelContext camelContext, int cacheSize) {
070        this(source, camelContext, null, createLRUCache(cacheSize));
071    }
072
073    public ProducerCache(Object source, CamelContext camelContext, Map<String, Producer> cache) {
074        this(source, camelContext, null, cache);
075    }
076
077    public ProducerCache(Object source, CamelContext camelContext, ServicePool<Endpoint, Producer> producerServicePool, Map<String, Producer> cache) {
078        this.source = source;
079        this.camelContext = camelContext;
080        if (producerServicePool == null) {
081            // use shared producer pool which lifecycle is managed by CamelContext
082            this.pool = camelContext.getProducerServicePool();
083            this.stopServicePool = false;
084        } else {
085            this.pool = producerServicePool;
086            this.stopServicePool = true;
087        }
088        this.producers = cache;
089        if (producers instanceof LRUCache) {
090            maxCacheSize = ((LRUCache) producers).getMaxCacheSize();
091        }
092
093        // only if JMX is enabled
094        if (camelContext.getManagementStrategy().getManagementAgent() != null) {
095            this.extendedStatistics = camelContext.getManagementStrategy().getManagementAgent().getStatisticsLevel().isExtended();
096        } else {
097            this.extendedStatistics = false;
098        }
099    }
100
101    public boolean isEventNotifierEnabled() {
102        return eventNotifierEnabled;
103    }
104
105    /**
106     * Whether {@link org.apache.camel.spi.EventNotifier} is enabled
107     */
108    public void setEventNotifierEnabled(boolean eventNotifierEnabled) {
109        this.eventNotifierEnabled = eventNotifierEnabled;
110    }
111
112    public boolean isExtendedStatistics() {
113        return extendedStatistics;
114    }
115
116    /**
117     * Whether extended JMX statistics is enabled for {@link org.apache.camel.spi.EndpointUtilizationStatistics}
118     */
119    public void setExtendedStatistics(boolean extendedStatistics) {
120        this.extendedStatistics = extendedStatistics;
121    }
122
123    /**
124     * Creates the {@link LRUCache} to be used.
125     * <p/>
126     * This implementation returns a {@link LRUCache} instance.
127
128     * @param cacheSize the cache size
129     * @return the cache
130     */
131    protected static LRUCache<String, Producer> createLRUCache(int cacheSize) {
132        // Use a regular cache as we want to ensure that the lifecycle of the producers
133        // being cache is properly handled, such as they are stopped when being evicted
134        // or when this cache is stopped. This is needed as some producers requires to
135        // be stopped so they can shutdown internal resources that otherwise may cause leaks
136        return new LRUCache<String, Producer>(cacheSize);
137    }
138
139    public CamelContext getCamelContext() {
140        return camelContext;
141    }
142
143    /**
144     * Gets the source which uses this cache
145     *
146     * @return the source
147     */
148    public Object getSource() {
149        return source;
150    }
151
152    /**
153     * Acquires a pooled producer which you <b>must</b> release back again after usage using the
154     * {@link #releaseProducer(org.apache.camel.Endpoint, org.apache.camel.Producer)} method.
155     *
156     * @param endpoint the endpoint
157     * @return the producer
158     */
159    public Producer acquireProducer(Endpoint endpoint) {
160        return doGetProducer(endpoint, true);
161    }
162
163    /**
164     * Releases an acquired producer back after usage.
165     *
166     * @param endpoint the endpoint
167     * @param producer the producer to release
168     * @throws Exception can be thrown if error stopping producer if that was needed.
169     */
170    public void releaseProducer(Endpoint endpoint, Producer producer) throws Exception {
171        if (producer instanceof ServicePoolAware) {
172            // release back to the pool
173            pool.release(endpoint, producer);
174        } else if (!producer.isSingleton()) {
175            // stop and shutdown non-singleton producers as we should not leak resources
176            ServiceHelper.stopAndShutdownService(producer);
177        }
178    }
179
180    /**
181     * Starts the {@link Producer} to be used for sending to the given endpoint
182     * <p/>
183     * This can be used to early start the {@link Producer} to ensure it can be created,
184     * such as when Camel is started. This allows to fail fast in case the {@link Producer}
185     * could not be started.
186     *
187     * @param endpoint the endpoint to send the exchange to
188     * @throws Exception is thrown if failed to create or start the {@link Producer}
189     */
190    public void startProducer(Endpoint endpoint) throws Exception {
191        Producer producer = acquireProducer(endpoint);
192        releaseProducer(endpoint, producer);
193    }
194
195    /**
196     * Sends the exchange to the given endpoint.
197     * <p>
198     * This method will <b>not</b> throw an exception. If processing of the given
199     * Exchange failed then the exception is stored on the provided Exchange
200     *
201     * @param endpoint the endpoint to send the exchange to
202     * @param exchange the exchange to send
203     */
204    public void send(Endpoint endpoint, Exchange exchange) {
205        sendExchange(endpoint, null, null, exchange);
206    }
207
208    /**
209     * Sends an exchange to an endpoint using a supplied
210     * {@link Processor} to populate the exchange
211     * <p>
212     * This method will <b>not</b> throw an exception. If processing of the given
213     * Exchange failed then the exception is stored on the return Exchange
214     *
215     * @param endpoint the endpoint to send the exchange to
216     * @param processor the transformer used to populate the new exchange
217     * @throws org.apache.camel.CamelExecutionException is thrown if sending failed
218     * @return the exchange
219     */
220    public Exchange send(Endpoint endpoint, Processor processor) {
221        return sendExchange(endpoint, null, processor, null);
222    }
223
224    /**
225     * Sends an exchange to an endpoint using a supplied
226     * {@link Processor} to populate the exchange
227     * <p>
228     * This method will <b>not</b> throw an exception. If processing of the given
229     * Exchange failed then the exception is stored on the return Exchange
230     *
231     * @param endpoint the endpoint to send the exchange to
232     * @param pattern the message {@link ExchangePattern} such as
233     *   {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
234     * @param processor the transformer used to populate the new exchange
235     * @return the exchange
236     */
237    public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) {
238        return sendExchange(endpoint, pattern, processor, null);
239    }
240
241    /**
242     * Sends an exchange to an endpoint using a supplied callback, using the synchronous processing.
243     * <p/>
244     * If an exception was thrown during processing, it would be set on the given Exchange
245     *
246     * @param endpoint  the endpoint to send the exchange to
247     * @param exchange  the exchange, can be <tt>null</tt> if so then create a new exchange from the producer
248     * @param pattern   the exchange pattern, can be <tt>null</tt>
249     * @param callback  the callback
250     * @return the response from the callback
251     * @see #doInAsyncProducer(org.apache.camel.Endpoint, org.apache.camel.Exchange, org.apache.camel.ExchangePattern, org.apache.camel.AsyncCallback, org.apache.camel.AsyncProducerCallback)
252     */
253    public <T> T doInProducer(Endpoint endpoint, Exchange exchange, ExchangePattern pattern, ProducerCallback<T> callback) {
254        T answer = null;
255
256        // get the producer and we do not mind if its pooled as we can handle returning it back to the pool
257        Producer producer = acquireProducer(endpoint);
258
259        if (producer == null) {
260            if (isStopped()) {
261                LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange);
262                return null;
263            } else {
264                throw new IllegalStateException("No producer, this processor has not been started: " + this);
265            }
266        }
267
268        try {
269            // invoke the callback
270            answer = callback.doInProducer(producer, exchange, pattern);
271        } catch (Throwable e) {
272            if (exchange != null) {
273                exchange.setException(e);
274            }
275        } finally {
276            if (producer instanceof ServicePoolAware) {
277                // release back to the pool
278                pool.release(endpoint, producer);
279            } else if (!producer.isSingleton()) {
280                // stop and shutdown non-singleton producers as we should not leak resources
281                try {
282                    ServiceHelper.stopAndShutdownService(producer);
283                } catch (Exception e) {
284                    // ignore and continue
285                    LOG.warn("Error stopping/shutting down producer: " + producer, e);
286                }
287            }
288        }
289
290        return answer;
291    }
292
293    /**
294     * Sends an exchange to an endpoint using a supplied callback supporting the asynchronous routing engine.
295     * <p/>
296     * If an exception was thrown during processing, it would be set on the given Exchange
297     *
298     * @param endpoint         the endpoint to send the exchange to
299     * @param exchange         the exchange, can be <tt>null</tt> if so then create a new exchange from the producer
300     * @param pattern          the exchange pattern, can be <tt>null</tt>
301     * @param callback         the asynchronous callback
302     * @param producerCallback the producer template callback to be executed
303     * @return (doneSync) <tt>true</tt> to continue execute synchronously, <tt>false</tt> to continue being executed asynchronously
304     */
305    public boolean doInAsyncProducer(final Endpoint endpoint, final Exchange exchange, final ExchangePattern pattern,
306                                     final AsyncCallback callback, final AsyncProducerCallback producerCallback) {
307
308        Producer target;
309        try {
310            // get the producer and we do not mind if its pooled as we can handle returning it back to the pool
311            target = doGetProducer(endpoint, true);
312
313            if (target == null) {
314                if (isStopped()) {
315                    LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange);
316                    callback.done(true);
317                    return true;
318                } else {
319                    exchange.setException(new IllegalStateException("No producer, this processor has not been started: " + this));
320                    callback.done(true);
321                    return true;
322                }
323            }
324        } catch (Throwable e) {
325            exchange.setException(e);
326            callback.done(true);
327            return true;
328        }
329
330        final Producer producer = target;
331
332        // record timing for sending the exchange using the producer
333        final StopWatch watch = eventNotifierEnabled && exchange != null ? new StopWatch() : null;
334
335        try {
336            if (eventNotifierEnabled && exchange != null) {
337                EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint);
338            }
339            // invoke the callback
340            AsyncProcessor asyncProcessor = AsyncProcessorConverterHelper.convert(producer);
341            return producerCallback.doInAsyncProducer(producer, asyncProcessor, exchange, pattern, new AsyncCallback() {
342                @Override
343                public void done(boolean doneSync) {
344                    try {
345                        if (eventNotifierEnabled && watch != null) {
346                            long timeTaken = watch.stop();
347                            // emit event that the exchange was sent to the endpoint
348                            EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
349                        }
350
351                        if (producer instanceof ServicePoolAware) {
352                            // release back to the pool
353                            pool.release(endpoint, producer);
354                        } else if (!producer.isSingleton()) {
355                            // stop and shutdown non-singleton producers as we should not leak resources
356                            try {
357                                ServiceHelper.stopAndShutdownService(producer);
358                            } catch (Exception e) {
359                                // ignore and continue
360                                LOG.warn("Error stopping/shutting down producer: " + producer, e);
361                            }
362                        }
363                    } finally {
364                        callback.done(doneSync);
365                    }
366                }
367            });
368        } catch (Throwable e) {
369            // ensure exceptions is caught and set on the exchange
370            if (exchange != null) {
371                exchange.setException(e);
372            }
373            callback.done(true);
374            return true;
375        }
376    }
377
378    protected Exchange sendExchange(final Endpoint endpoint, ExchangePattern pattern,
379                                    final Processor processor, Exchange exchange) {
380        return doInProducer(endpoint, exchange, pattern, new ProducerCallback<Exchange>() {
381            public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) {
382                if (exchange == null) {
383                    exchange = pattern != null ? producer.getEndpoint().createExchange(pattern) : producer.getEndpoint().createExchange();
384                }
385
386                if (processor != null) {
387                    // lets populate using the processor callback
388                    try {
389                        processor.process(exchange);
390                    } catch (Exception e) {
391                        // populate failed so return
392                        exchange.setException(e);
393                        return exchange;
394                    }
395                }
396
397                // now lets dispatch
398                LOG.debug(">>>> {} {}", endpoint, exchange);
399
400                // set property which endpoint we send to
401                exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri());
402
403                // send the exchange using the processor
404                StopWatch watch = null;
405                try {
406                    if (eventNotifierEnabled) {
407                        watch = new StopWatch();
408                        EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint);
409                    }
410                    // ensure we run in an unit of work
411                    Producer target = new UnitOfWorkProducer(producer);
412                    target.process(exchange);
413                } catch (Throwable e) {
414                    // ensure exceptions is caught and set on the exchange
415                    exchange.setException(e);
416                } finally {
417                    // emit event that the exchange was sent to the endpoint
418                    if (eventNotifierEnabled && watch != null) {
419                        long timeTaken = watch.stop();
420                        EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
421                    }
422                }
423                return exchange;
424            }
425        });
426    }
427
428    protected synchronized Producer doGetProducer(Endpoint endpoint, boolean pooled) {
429        String key = endpoint.getEndpointUri();
430        Producer answer = producers.get(key);
431        if (pooled && answer == null) {
432            // try acquire from connection pool
433            answer = pool.acquire(endpoint);
434        }
435
436        if (answer == null) {
437            // create a new producer
438            try {
439                answer = endpoint.createProducer();
440                // add as service which will also start the service
441                // (false => we and handling the lifecycle of the producer in this cache)
442                getCamelContext().addService(answer, false);
443            } catch (Exception e) {
444                throw new FailedToCreateProducerException(endpoint, e);
445            }
446
447            // add producer to cache or pool if applicable
448            if (pooled && answer instanceof ServicePoolAware) {
449                LOG.debug("Adding to producer service pool with key: {} for producer: {}", endpoint, answer);
450                answer = pool.addAndAcquire(endpoint, answer);
451            } else if (answer.isSingleton()) {
452                LOG.debug("Adding to producer cache with key: {} for producer: {}", endpoint, answer);
453                producers.put(key, answer);
454            }
455        }
456
457        if (answer != null) {
458            // record statistics
459            if (extendedStatistics) {
460                statistics.onHit(key);
461            }
462        }
463
464        return answer;
465    }
466
467    protected void doStart() throws Exception {
468        if (extendedStatistics) {
469            int max = maxCacheSize == 0 ? CamelContextHelper.getMaximumCachePoolSize(camelContext) : maxCacheSize;
470            statistics = new DefaultEndpointUtilizationStatistics(max);
471        }
472
473        ServiceHelper.startServices(producers.values());
474        ServiceHelper.startServices(statistics, pool);
475    }
476
477    protected void doStop() throws Exception {
478        // when stopping we intend to shutdown
479        ServiceHelper.stopAndShutdownService(statistics);
480        if (stopServicePool) {
481            ServiceHelper.stopAndShutdownService(pool);
482        }
483        try {
484            ServiceHelper.stopAndShutdownServices(producers.values());
485        } finally {
486            // ensure producers are removed, and also from JMX
487            for (Producer producer : producers.values()) {
488                getCamelContext().removeService(producer);
489            }
490        }
491        producers.clear();
492        if (statistics != null) {
493            statistics.clear();
494        }
495    }
496
497    /**
498     * Returns the current size of the cache
499     *
500     * @return the current size
501     */
502    public int size() {
503        int size = producers.size();
504        size += pool.size();
505
506        LOG.trace("size = {}", size);
507        return size;
508    }
509
510    /**
511     * Gets the maximum cache size (capacity).
512     * <p/>
513     * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
514     *
515     * @return the capacity
516     */
517    public int getCapacity() {
518        int capacity = -1;
519        if (producers instanceof LRUCache) {
520            LRUCache<String, Producer> cache = (LRUCache<String, Producer>)producers;
521            capacity = cache.getMaxCacheSize();
522        }
523        return capacity;
524    }
525
526    /**
527     * Gets the cache hits statistic
528     * <p/>
529     * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
530     *
531     * @return the hits
532     */
533    public long getHits() {
534        long hits = -1;
535        if (producers instanceof LRUCache) {
536            LRUCache<String, Producer> cache = (LRUCache<String, Producer>)producers;
537            hits = cache.getHits();
538        }
539        return hits;
540    }
541
542    /**
543     * Gets the cache misses statistic
544     * <p/>
545     * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
546     *
547     * @return the misses
548     */
549    public long getMisses() {
550        long misses = -1;
551        if (producers instanceof LRUCache) {
552            LRUCache<String, Producer> cache = (LRUCache<String, Producer>)producers;
553            misses = cache.getMisses();
554        }
555        return misses;
556    }
557
558    /**
559     * Gets the cache evicted statistic
560     * <p/>
561     * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
562     *
563     * @return the evicted
564     */
565    public long getEvicted() {
566        long evicted = -1;
567        if (producers instanceof LRUCache) {
568            LRUCache<String, Producer> cache = (LRUCache<String, Producer>)producers;
569            evicted = cache.getEvicted();
570        }
571        return evicted;
572    }
573
574    /**
575     * Resets the cache statistics
576     */
577    public void resetCacheStatistics() {
578        if (producers instanceof LRUCache) {
579            LRUCache<String, Producer> cache = (LRUCache<String, Producer>)producers;
580            cache.resetStatistics();
581        }
582        if (statistics != null) {
583            statistics.clear();
584        }
585    }
586
587    /**
588     * Purges this cache
589     */
590    public synchronized void purge() {
591        producers.clear();
592        pool.purge();
593        if (statistics != null) {
594            statistics.clear();
595        }
596    }
597
598    public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
599        return statistics;
600    }
601
602    @Override
603    public String toString() {
604        return "ProducerCache for source: " + source + ", capacity: " + getCapacity();
605    }
606}