001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.Map;
020    
021    import org.apache.camel.AsyncCallback;
022    import org.apache.camel.AsyncProcessor;
023    import org.apache.camel.AsyncProducerCallback;
024    import org.apache.camel.CamelContext;
025    import org.apache.camel.Endpoint;
026    import org.apache.camel.Exchange;
027    import org.apache.camel.ExchangePattern;
028    import org.apache.camel.FailedToCreateProducerException;
029    import org.apache.camel.Processor;
030    import org.apache.camel.Producer;
031    import org.apache.camel.ProducerCallback;
032    import org.apache.camel.ServicePoolAware;
033    import org.apache.camel.processor.UnitOfWorkProducer;
034    import org.apache.camel.spi.ServicePool;
035    import org.apache.camel.support.ServiceSupport;
036    import org.apache.camel.util.AsyncProcessorConverterHelper;
037    import org.apache.camel.util.CamelContextHelper;
038    import org.apache.camel.util.EventHelper;
039    import org.apache.camel.util.LRUCache;
040    import org.apache.camel.util.ServiceHelper;
041    import org.apache.camel.util.StopWatch;
042    import org.slf4j.Logger;
043    import org.slf4j.LoggerFactory;
044    
045    /**
046     * Cache containing created {@link Producer}.
047     *
048     * @version 
049     */
050    public class ProducerCache extends ServiceSupport {
051        private static final transient Logger LOG = LoggerFactory.getLogger(ProducerCache.class);
052    
053        private final CamelContext camelContext;
054        private final ServicePool<Endpoint, Producer> pool;
055        private final Map<String, Producer> producers;
056        private final Object source;
057    
058        public ProducerCache(Object source, CamelContext camelContext) {
059            this(source, camelContext, CamelContextHelper.getMaximumCachePoolSize(camelContext));
060        }
061    
062        public ProducerCache(Object source, CamelContext camelContext, int cacheSize) {
063            this(source, camelContext, camelContext.getProducerServicePool(), createLRUCache(cacheSize));
064        }
065    
066        public ProducerCache(Object source, CamelContext camelContext, Map<String, Producer> cache) {
067            this(source, camelContext, camelContext.getProducerServicePool(), cache);
068        }
069    
070        public ProducerCache(Object source, CamelContext camelContext, ServicePool<Endpoint, Producer> producerServicePool, Map<String, Producer> cache) {
071            this.source = source;
072            this.camelContext = camelContext;
073            this.pool = producerServicePool;
074            this.producers = cache;
075        }
076    
077        /**
078         * Creates the {@link LRUCache} to be used.
079         * <p/>
080         * This implementation returns a {@link LRUCache} instance.
081    
082         * @param cacheSize the cache size
083         * @return the cache
084         */
085        protected static LRUCache<String, Producer> createLRUCache(int cacheSize) {
086            // Use a regular cache as we want to ensure that the lifecycle of the producers
087            // being cache is properly handled, such as they are stopped when being evicted
088            // or when this cache is stopped. This is needed as some producers requires to
089            // be stopped so they can shutdown internal resources that otherwise may cause leaks
090            return new LRUCache<String, Producer>(cacheSize);
091        }
092    
093        public CamelContext getCamelContext() {
094            return camelContext;
095        }
096    
097        /**
098         * Gets the source which uses this cache
099         *
100         * @return the source
101         */
102        public Object getSource() {
103            return source;
104        }
105    
106        /**
107         * Acquires a pooled producer which you <b>must</b> release back again after usage using the
108         * {@link #releaseProducer(org.apache.camel.Endpoint, org.apache.camel.Producer)} method.
109         *
110         * @param endpoint the endpoint
111         * @return the producer
112         */
113        public Producer acquireProducer(Endpoint endpoint) {
114            return doGetProducer(endpoint, true);
115        }
116    
117        /**
118         * Releases an acquired producer back after usage.
119         *
120         * @param endpoint the endpoint
121         * @param producer the producer to release
122         * @throws Exception can be thrown if error stopping producer if that was needed.
123         */
124        public void releaseProducer(Endpoint endpoint, Producer producer) throws Exception {
125            if (producer instanceof ServicePoolAware) {
126                // release back to the pool
127                pool.release(endpoint, producer);
128            } else if (!producer.isSingleton()) {
129                // stop non singleton producers as we should not leak resources
130                producer.stop();
131            }
132        }
133    
134        /**
135         * Starts the {@link Producer} to be used for sending to the given endpoint
136         * <p/>
137         * This can be used to early start the {@link Producer} to ensure it can be created,
138         * such as when Camel is started. This allows to fail fast in case the {@link Producer}
139         * could not be started.
140         *
141         * @param endpoint the endpoint to send the exchange to
142         * @throws Exception is thrown if failed to create or start the {@link Producer}
143         */
144        public void startProducer(Endpoint endpoint) throws Exception {
145            Producer producer = acquireProducer(endpoint);
146            releaseProducer(endpoint, producer);
147        }
148    
149        /**
150         * Sends the exchange to the given endpoint.
151         * <p>
152         * This method will <b>not</b> throw an exception. If processing of the given
153         * Exchange failed then the exception is stored on the provided Exchange
154         *
155         * @param endpoint the endpoint to send the exchange to
156         * @param exchange the exchange to send
157         */
158        public void send(Endpoint endpoint, Exchange exchange) {
159            sendExchange(endpoint, null, null, exchange);
160        }
161    
162        /**
163         * Sends an exchange to an endpoint using a supplied
164         * {@link Processor} to populate the exchange
165         * <p>
166         * This method will <b>not</b> throw an exception. If processing of the given
167         * Exchange failed then the exception is stored on the return Exchange
168         *
169         * @param endpoint the endpoint to send the exchange to
170         * @param processor the transformer used to populate the new exchange
171         * @throws org.apache.camel.CamelExecutionException is thrown if sending failed
172         * @return the exchange
173         */
174        public Exchange send(Endpoint endpoint, Processor processor) {
175            return sendExchange(endpoint, null, processor, null);
176        }
177    
178        /**
179         * Sends an exchange to an endpoint using a supplied
180         * {@link Processor} to populate the exchange
181         * <p>
182         * This method will <b>not</b> throw an exception. If processing of the given
183         * Exchange failed then the exception is stored on the return Exchange
184         *
185         * @param endpoint the endpoint to send the exchange to
186         * @param pattern the message {@link ExchangePattern} such as
187         *   {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
188         * @param processor the transformer used to populate the new exchange
189         * @return the exchange
190         */
191        public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) {
192            return sendExchange(endpoint, pattern, processor, null);
193        }
194    
195        /**
196         * Sends an exchange to an endpoint using a supplied callback, using the synchronous processing.
197         * <p/>
198         * If an exception was thrown during processing, it would be set on the given Exchange
199         *
200         * @param endpoint  the endpoint to send the exchange to
201         * @param exchange  the exchange, can be <tt>null</tt> if so then create a new exchange from the producer
202         * @param pattern   the exchange pattern, can be <tt>null</tt>
203         * @param callback  the callback
204         * @return the response from the callback
205         * @see #doInAsyncProducer(org.apache.camel.Endpoint, org.apache.camel.Exchange, org.apache.camel.ExchangePattern, org.apache.camel.AsyncCallback, org.apache.camel.AsyncProducerCallback)
206         */
207        public <T> T doInProducer(Endpoint endpoint, Exchange exchange, ExchangePattern pattern, ProducerCallback<T> callback) {
208            T answer = null;
209    
210            // get the producer and we do not mind if its pooled as we can handle returning it back to the pool
211            Producer producer = doGetProducer(endpoint, true);
212    
213            if (producer == null) {
214                if (isStopped()) {
215                    LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange);
216                    return null;
217                } else {
218                    throw new IllegalStateException("No producer, this processor has not been started: " + this);
219                }
220            }
221    
222            StopWatch watch = null;
223            if (exchange != null) {
224                // record timing for sending the exchange using the producer
225                watch = new StopWatch();
226            }
227    
228            try {
229                if (exchange != null) {
230                    EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint);
231                }
232                // invoke the callback
233                answer = callback.doInProducer(producer, exchange, pattern);
234            } catch (Throwable e) {
235                if (exchange != null) {
236                    exchange.setException(e);
237                }
238            } finally {
239                if (exchange != null) {
240                    long timeTaken = watch.stop();
241                    // emit event that the exchange was sent to the endpoint
242                    EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
243                }
244                if (producer instanceof ServicePoolAware) {
245                    // release back to the pool
246                    pool.release(endpoint, producer);
247                } else if (!producer.isSingleton()) {
248                    // stop non singleton producers as we should not leak resources
249                    try {
250                        ServiceHelper.stopService(producer);
251                    } catch (Exception e) {
252                        // ignore and continue
253                        LOG.warn("Error stopping producer: " + producer, e);
254                    }
255                }
256            }
257    
258            return answer;
259        }
260    
261        /**
262         * Sends an exchange to an endpoint using a supplied callback supporting the asynchronous routing engine.
263         * <p/>
264         * If an exception was thrown during processing, it would be set on the given Exchange
265         *
266         * @param endpoint         the endpoint to send the exchange to
267         * @param exchange         the exchange, can be <tt>null</tt> if so then create a new exchange from the producer
268         * @param pattern          the exchange pattern, can be <tt>null</tt>
269         * @param callback         the asynchronous callback
270         * @param producerCallback the producer template callback to be executed
271         * @return (doneSync) <tt>true</tt> to continue execute synchronously, <tt>false</tt> to continue being executed asynchronously
272         */
273        public boolean doInAsyncProducer(final Endpoint endpoint, final Exchange exchange, final ExchangePattern pattern,
274                                         final AsyncCallback callback, final AsyncProducerCallback producerCallback) {
275            boolean sync = true;
276    
277            // get the producer and we do not mind if its pooled as we can handle returning it back to the pool
278            final Producer producer = doGetProducer(endpoint, true);
279    
280            if (producer == null) {
281                if (isStopped()) {
282                    LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange);
283                    return false;
284                } else {
285                    throw new IllegalStateException("No producer, this processor has not been started: " + this);
286                }
287            }
288    
289            // record timing for sending the exchange using the producer
290            final StopWatch watch = exchange != null ? new StopWatch() : null;
291    
292            try {
293                if (exchange != null) {
294                    EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint);
295                }
296                // invoke the callback
297                AsyncProcessor asyncProcessor = AsyncProcessorConverterHelper.convert(producer);
298                sync = producerCallback.doInAsyncProducer(producer, asyncProcessor, exchange, pattern, new AsyncCallback() {
299                    @Override
300                    public void done(boolean doneSync) {
301                        try {
302                            if (watch != null) {
303                                long timeTaken = watch.stop();
304                                // emit event that the exchange was sent to the endpoint
305                                EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
306                            }
307    
308                            if (producer instanceof ServicePoolAware) {
309                                // release back to the pool
310                                pool.release(endpoint, producer);
311                            } else if (!producer.isSingleton()) {
312                                // stop non singleton producers as we should not leak resources
313                                try {
314                                    ServiceHelper.stopService(producer);
315                                } catch (Exception e) {
316                                    // ignore and continue
317                                    LOG.warn("Error stopping producer: " + producer, e);
318                                }
319                            }
320                        } finally {
321                            callback.done(doneSync);
322                        }
323                    }
324                });
325            } catch (Throwable e) {
326                // ensure exceptions is caught and set on the exchange
327                if (exchange != null) {
328                    exchange.setException(e);
329                }
330            }
331    
332            return sync;
333        }
334    
335        protected Exchange sendExchange(final Endpoint endpoint, ExchangePattern pattern,
336                                        final Processor processor, Exchange exchange) {
337            return doInProducer(endpoint, exchange, pattern, new ProducerCallback<Exchange>() {
338                public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) {
339                    if (exchange == null) {
340                        exchange = pattern != null ? producer.createExchange(pattern) : producer.createExchange();
341                    }
342    
343                    if (processor != null) {
344                        // lets populate using the processor callback
345                        try {
346                            processor.process(exchange);
347                        } catch (Exception e) {
348                            // populate failed so return
349                            exchange.setException(e);
350                            return exchange;
351                        }
352                    }
353    
354                    // now lets dispatch
355                    LOG.debug(">>>> {} {}", endpoint, exchange);
356    
357                    // set property which endpoint we send to
358                    exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri());
359    
360                    // send the exchange using the processor
361                    StopWatch watch = new StopWatch();
362                    try {
363                        EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint);
364                        // ensure we run in an unit of work
365                        Producer target = new UnitOfWorkProducer(producer);
366                        target.process(exchange);
367                    } catch (Throwable e) {
368                        // ensure exceptions is caught and set on the exchange
369                        exchange.setException(e);
370                    } finally {
371                        // emit event that the exchange was sent to the endpoint
372                        long timeTaken = watch.stop();
373                        EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
374                    }
375                    return exchange;
376                }
377            });
378        }
379    
380        protected synchronized Producer doGetProducer(Endpoint endpoint, boolean pooled) {
381            String key = endpoint.getEndpointUri();
382            Producer answer = producers.get(key);
383            if (pooled && answer == null) {
384                // try acquire from connection pool
385                answer = pool.acquire(endpoint);
386            }
387    
388            if (answer == null) {
389                // create a new producer
390                try {
391                    answer = endpoint.createProducer();
392                    // must then start service so producer is ready to be used
393                    ServiceHelper.startService(answer);
394                } catch (Exception e) {
395                    throw new FailedToCreateProducerException(endpoint, e);
396                }
397    
398                // add producer to cache or pool if applicable
399                if (pooled && answer instanceof ServicePoolAware) {
400                    LOG.debug("Adding to producer service pool with key: {} for producer: {}", endpoint, answer);
401                    answer = pool.addAndAcquire(endpoint, answer);
402                } else if (answer.isSingleton()) {
403                    LOG.debug("Adding to producer cache with key: {} for producer: {}", endpoint, answer);
404                    producers.put(key, answer);
405                }
406            }
407    
408            return answer;
409        }
410    
411        protected void doStart() throws Exception {
412            ServiceHelper.startServices(producers.values());
413            ServiceHelper.startServices(pool);
414        }
415    
416        protected void doStop() throws Exception {
417            // when stopping we intend to shutdown
418            ServiceHelper.stopAndShutdownService(pool);
419            ServiceHelper.stopAndShutdownServices(producers.values());
420            producers.clear();
421        }
422    
423        /**
424         * Returns the current size of the cache
425         *
426         * @return the current size
427         */
428        public int size() {
429            int size = producers.size();
430            size += pool.size();
431    
432            LOG.trace("size = {}", size);
433            return size;
434        }
435    
436        /**
437         * Gets the maximum cache size (capacity).
438         * <p/>
439         * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
440         *
441         * @return the capacity
442         */
443        public int getCapacity() {
444            int capacity = -1;
445            if (producers instanceof LRUCache) {
446                LRUCache<String, Producer> cache = (LRUCache<String, Producer>)producers;
447                capacity = cache.getMaxCacheSize();
448            }
449            return capacity;
450        }
451    
452        /**
453         * Gets the cache hits statistic
454         * <p/>
455         * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
456         *
457         * @return the hits
458         */
459        public long getHits() {
460            long hits = -1;
461            if (producers instanceof LRUCache) {
462                LRUCache<String, Producer> cache = (LRUCache<String, Producer>)producers;
463                hits = cache.getHits();
464            }
465            return hits;
466        }
467    
468        /**
469         * Gets the cache misses statistic
470         * <p/>
471         * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
472         *
473         * @return the misses
474         */
475        public long getMisses() {
476            long misses = -1;
477            if (producers instanceof LRUCache) {
478                LRUCache<String, Producer> cache = (LRUCache<String, Producer>)producers;
479                misses = cache.getMisses();
480            }
481            return misses;
482        }
483    
484        /**
485         * Gets the cache evicted statistic
486         * <p/>
487         * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
488         *
489         * @return the evicted
490         */
491        public long getEvicted() {
492            long evicted = -1;
493            if (producers instanceof LRUCache) {
494                LRUCache<String, Producer> cache = (LRUCache<String, Producer>)producers;
495                evicted = cache.getEvicted();
496            }
497            return evicted;
498        }
499    
500        /**
501         * Resets the cache statistics
502         */
503        public void resetCacheStatistics() {
504            if (producers instanceof LRUCache) {
505                LRUCache<String, Producer> cache = (LRUCache<String, Producer>)producers;
506                cache.resetStatistics();
507            }
508        }
509    
510        /**
511         * Purges this cache
512         */
513        public synchronized void purge() {
514            producers.clear();
515            pool.purge();
516        }
517    
518        @Override
519        public String toString() {
520            return "ProducerCache for source: " + source + ", capacity: " + getCapacity();
521        }
522    }