001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import java.util.Map;
020
021import org.apache.camel.CamelContext;
022import org.apache.camel.Endpoint;
023import org.apache.camel.Exchange;
024import org.apache.camel.FailedToCreateConsumerException;
025import org.apache.camel.IsSingleton;
026import org.apache.camel.PollingConsumer;
027import org.apache.camel.RuntimeCamelException;
028import org.apache.camel.ServicePoolAware;
029import org.apache.camel.spi.EndpointUtilizationStatistics;
030import org.apache.camel.spi.ServicePool;
031import org.apache.camel.support.ServiceSupport;
032import org.apache.camel.util.CamelContextHelper;
033import org.apache.camel.util.LRUCache;
034import org.apache.camel.util.ServiceHelper;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 * Cache containing created {@link org.apache.camel.Consumer}.
040 *
041 * @version 
042 */
043public class ConsumerCache extends ServiceSupport {
044    private static final Logger LOG = LoggerFactory.getLogger(ConsumerCache.class);
045
046    private final CamelContext camelContext;
047    private final ServicePool<Endpoint, PollingConsumer> pool;
048    private final Map<String, PollingConsumer> consumers;
049    private final Object source;
050
051    private EndpointUtilizationStatistics statistics;
052    private boolean extendedStatistics;
053    private int maxCacheSize;
054
055    public ConsumerCache(Object source, CamelContext camelContext) {
056        this(source, camelContext, CamelContextHelper.getMaximumCachePoolSize(camelContext));
057    }
058
059    public ConsumerCache(Object source, CamelContext camelContext, int cacheSize) {
060        this(source, camelContext, createLRUCache(cacheSize));
061    }
062    
063    public ConsumerCache(Object source, CamelContext camelContext, Map<String, PollingConsumer> cache) {
064        this(source, camelContext, cache, camelContext.getPollingConsumerServicePool());
065    }
066
067    public ConsumerCache(Object source, CamelContext camelContext, Map<String, PollingConsumer> cache, ServicePool<Endpoint, PollingConsumer> pool) {
068        this.camelContext = camelContext;
069        this.consumers = cache;
070        this.source = source;
071        this.pool = pool;
072        if (consumers instanceof LRUCache) {
073            maxCacheSize = ((LRUCache) consumers).getMaxCacheSize();
074        }
075
076        // only if JMX is enabled
077        if (camelContext.getManagementStrategy().getManagementAgent() != null) {
078            this.extendedStatistics = camelContext.getManagementStrategy().getManagementAgent().getStatisticsLevel().isExtended();
079        } else {
080            this.extendedStatistics = false;
081        }
082    }
083
084    public boolean isExtendedStatistics() {
085        return extendedStatistics;
086    }
087
088    /**
089     * Whether extended JMX statistics is enabled for {@link org.apache.camel.spi.EndpointUtilizationStatistics}
090     */
091    public void setExtendedStatistics(boolean extendedStatistics) {
092        this.extendedStatistics = extendedStatistics;
093    }
094
095    /**
096     * Creates the {@link LRUCache} to be used.
097     * <p/>
098     * This implementation returns a {@link LRUCache} instance.
099
100     * @param cacheSize the cache size
101     * @return the cache
102     */
103    protected static LRUCache<String, PollingConsumer> createLRUCache(int cacheSize) {
104        // Use a regular cache as we want to ensure that the lifecycle of the consumers
105        // being cache is properly handled, such as they are stopped when being evicted
106        // or when this cache is stopped. This is needed as some consumers requires to
107        // be stopped so they can shutdown internal resources that otherwise may cause leaks
108        return new LRUCache<String, PollingConsumer>(cacheSize);
109    }
110    
111    /**
112     * Acquires a pooled PollingConsumer which you <b>must</b> release back again after usage using the
113     * {@link #releasePollingConsumer(org.apache.camel.Endpoint, org.apache.camel.PollingConsumer)} method.
114     *
115     * @param endpoint the endpoint
116     * @return the PollingConsumer
117     */
118    public PollingConsumer acquirePollingConsumer(Endpoint endpoint) {
119        return doGetPollingConsumer(endpoint, true);
120    }
121
122    /**
123     * Releases an acquired producer back after usage.
124     *
125     * @param endpoint the endpoint
126     * @param pollingConsumer the pollingConsumer to release
127     */
128    public void releasePollingConsumer(Endpoint endpoint, PollingConsumer pollingConsumer) {
129        if (pollingConsumer instanceof ServicePoolAware) {
130            // release back to the pool
131            pool.release(endpoint, pollingConsumer);
132        } else {
133            boolean singleton = false;
134            if (pollingConsumer instanceof IsSingleton) {
135                singleton = ((IsSingleton) pollingConsumer).isSingleton();
136            }
137            if (!singleton) {
138                try {
139                    // stop and shutdown non-singleton producers as we should not leak resources
140                    ServiceHelper.stopAndShutdownService(pollingConsumer);
141                } catch (Exception ex) {
142                    if (ex instanceof RuntimeCamelException) {
143                        throw (RuntimeCamelException)ex;
144                    } else {
145                        throw new RuntimeCamelException(ex);
146                    }
147                }
148            }
149        }
150    }
151
152    public PollingConsumer getConsumer(Endpoint endpoint) {
153        return doGetPollingConsumer(endpoint, true);
154    }
155    
156    protected synchronized PollingConsumer doGetPollingConsumer(Endpoint endpoint, boolean pooled) {
157        String key = endpoint.getEndpointUri();
158        PollingConsumer answer = consumers.get(key);
159        if (pooled && answer == null) {
160            pool.acquire(endpoint);
161        }  
162        
163        if (answer == null) {
164            try {
165                answer = endpoint.createPollingConsumer();
166                answer.start();
167            } catch (Exception e) {
168                throw new FailedToCreateConsumerException(endpoint, e);
169            }
170            if (pooled && answer instanceof ServicePoolAware) {
171                LOG.debug("Adding to producer service pool with key: {} for producer: {}", endpoint, answer);
172                answer = pool.addAndAcquire(endpoint, answer);
173            } else {
174                boolean singleton = false;
175                if (answer instanceof IsSingleton) {
176                    singleton = ((IsSingleton) answer).isSingleton();
177                }
178                if (singleton) {
179                    LOG.debug("Adding to consumer cache with key: {} for consumer: {}", endpoint, answer);
180                    consumers.put(key, answer);
181                } else {
182                    LOG.debug("Consumer for endpoint: {} is not singleton and thus not added to consumer cache", key);
183                }
184            }
185        }
186
187        if (answer != null) {
188            // record statistics
189            if (extendedStatistics) {
190                statistics.onHit(key);
191            }
192        }
193
194        return answer;
195    }
196 
197    public Exchange receive(Endpoint endpoint) {
198        LOG.debug("<<<< {}", endpoint);
199        PollingConsumer consumer = null;
200        try {
201            consumer = acquirePollingConsumer(endpoint);
202            return consumer.receive();
203        } finally {
204            if (consumer != null) {
205                releasePollingConsumer(endpoint, consumer);
206            }
207        }
208    }
209
210    public Exchange receive(Endpoint endpoint, long timeout) {
211        LOG.debug("<<<< {}", endpoint);
212        PollingConsumer consumer = null;
213        try {
214            consumer = acquirePollingConsumer(endpoint);
215            return consumer.receive(timeout);
216        } finally {
217            if (consumer != null) {
218                releasePollingConsumer(endpoint, consumer);
219            }
220        }
221    }
222
223    public Exchange receiveNoWait(Endpoint endpoint) {
224        LOG.debug("<<<< {}", endpoint);
225        PollingConsumer consumer = null;
226        try {
227            consumer = doGetPollingConsumer(endpoint, true);
228            return consumer.receiveNoWait();
229        } finally {
230            if (consumer != null) {
231                releasePollingConsumer(endpoint, consumer);
232            }
233        }
234    }
235    
236    public CamelContext getCamelContext() {
237        return camelContext;
238    }
239
240    /**
241     * Gets the source which uses this cache
242     *
243     * @return the source
244     */
245    public Object getSource() {
246        return source;
247    }
248
249    /**
250     * Returns the current size of the cache
251     *
252     * @return the current size
253     */
254    public int size() {
255        int size = consumers.size();
256        LOG.trace("size = {}", size);
257        return size;
258    }
259
260    /**
261     * Gets the maximum cache size (capacity).
262     * <p/>
263     * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
264     *
265     * @return the capacity
266     */
267    public int getCapacity() {
268        int capacity = -1;
269        if (consumers instanceof LRUCache) {
270            LRUCache<String, PollingConsumer> cache = (LRUCache<String, PollingConsumer>)consumers;
271            capacity = cache.getMaxCacheSize();
272        }
273        return capacity;
274    }
275
276    /**
277     * Gets the cache hits statistic
278     * <p/>
279     * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
280     *
281     * @return the hits
282     */
283    public long getHits() {
284        long hits = -1;
285        if (consumers instanceof LRUCache) {
286            LRUCache<String, PollingConsumer> cache = (LRUCache<String, PollingConsumer>)consumers;
287            hits = cache.getHits();
288        }
289        return hits;
290    }
291
292    /**
293     * Gets the cache misses statistic
294     * <p/>
295     * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
296     *
297     * @return the misses
298     */
299    public long getMisses() {
300        long misses = -1;
301        if (consumers instanceof LRUCache) {
302            LRUCache<String, PollingConsumer> cache = (LRUCache<String, PollingConsumer>)consumers;
303            misses = cache.getMisses();
304        }
305        return misses;
306    }
307
308    /**
309     * Gets the cache evicted statistic
310     * <p/>
311     * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
312     *
313     * @return the evicted
314     */
315    public long getEvicted() {
316        long evicted = -1;
317        if (consumers instanceof LRUCache) {
318            LRUCache<String, PollingConsumer> cache = (LRUCache<String, PollingConsumer>)consumers;
319            evicted = cache.getEvicted();
320        }
321        return evicted;
322    }
323
324    /**
325     * Resets the cache statistics
326     */
327    public void resetCacheStatistics() {
328        if (consumers instanceof LRUCache) {
329            LRUCache<String, PollingConsumer> cache = (LRUCache<String, PollingConsumer>)consumers;
330            cache.resetStatistics();
331        }
332        if (statistics != null) {
333            statistics.clear();
334        }
335    }
336
337    /**
338     * Purges this cache
339     */
340    public synchronized void purge() {
341        consumers.clear();
342        if (statistics != null) {
343            statistics.clear();
344        }
345    }
346
347    public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
348        return statistics;
349    }
350
351    @Override
352    public String toString() {
353        return "ConsumerCache for source: " + source + ", capacity: " + getCapacity();
354    }
355
356    protected void doStart() throws Exception {
357        if (extendedStatistics) {
358            int max = maxCacheSize == 0 ? CamelContextHelper.getMaximumCachePoolSize(camelContext) : maxCacheSize;
359            statistics = new DefaultEndpointUtilizationStatistics(max);
360        }
361
362        ServiceHelper.startServices(consumers.values());
363    }
364
365    protected void doStop() throws Exception {
366        // when stopping we intend to shutdown
367        ServiceHelper.stopAndShutdownServices(statistics, pool);
368        try {
369            ServiceHelper.stopAndShutdownServices(consumers.values());
370        } finally {
371            // ensure consumers are removed, and also from JMX
372            for (PollingConsumer consumer : consumers.values()) {
373                getCamelContext().removeService(consumer);
374            }
375        }
376        consumers.clear();
377        if (statistics != null) {
378            statistics.clear();
379        }
380    }
381
382}