001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.impl; 018 019import java.util.Map; 020 021import org.apache.camel.CamelContext; 022import org.apache.camel.Endpoint; 023import org.apache.camel.Exchange; 024import org.apache.camel.FailedToCreateConsumerException; 025import org.apache.camel.IsSingleton; 026import org.apache.camel.PollingConsumer; 027import org.apache.camel.RuntimeCamelException; 028import org.apache.camel.ServicePoolAware; 029import org.apache.camel.spi.EndpointUtilizationStatistics; 030import org.apache.camel.spi.ServicePool; 031import org.apache.camel.support.ServiceSupport; 032import org.apache.camel.util.CamelContextHelper; 033import org.apache.camel.util.LRUCache; 034import org.apache.camel.util.LRUCacheFactory; 035import org.apache.camel.util.ServiceHelper; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039/** 040 * Cache containing created {@link org.apache.camel.Consumer}. 041 * 042 * @version 043 */ 044public class ConsumerCache extends ServiceSupport { 045 private static final Logger LOG = LoggerFactory.getLogger(ConsumerCache.class); 046 047 private final CamelContext camelContext; 048 private final ServicePool<Endpoint, PollingConsumer> pool; 049 private final Map<String, PollingConsumer> consumers; 050 private final Object source; 051 052 private EndpointUtilizationStatistics statistics; 053 private boolean extendedStatistics; 054 private int maxCacheSize; 055 056 public ConsumerCache(Object source, CamelContext camelContext) { 057 this(source, camelContext, CamelContextHelper.getMaximumCachePoolSize(camelContext)); 058 } 059 060 public ConsumerCache(Object source, CamelContext camelContext, int cacheSize) { 061 this(source, camelContext, createLRUCache(cacheSize)); 062 } 063 064 public ConsumerCache(Object source, CamelContext camelContext, Map<String, PollingConsumer> cache) { 065 this(source, camelContext, cache, camelContext.getPollingConsumerServicePool()); 066 } 067 068 public ConsumerCache(Object source, CamelContext camelContext, Map<String, PollingConsumer> cache, ServicePool<Endpoint, PollingConsumer> pool) { 069 this.camelContext = camelContext; 070 this.consumers = cache; 071 this.source = source; 072 this.pool = pool; 073 if (consumers instanceof LRUCache) { 074 maxCacheSize = ((LRUCache) consumers).getMaxCacheSize(); 075 } 076 077 // only if JMX is enabled 078 if (camelContext.getManagementStrategy().getManagementAgent() != null) { 079 this.extendedStatistics = camelContext.getManagementStrategy().getManagementAgent().getStatisticsLevel().isExtended(); 080 } else { 081 this.extendedStatistics = false; 082 } 083 } 084 085 public boolean isExtendedStatistics() { 086 return extendedStatistics; 087 } 088 089 /** 090 * Whether extended JMX statistics is enabled for {@link org.apache.camel.spi.EndpointUtilizationStatistics} 091 */ 092 public void setExtendedStatistics(boolean extendedStatistics) { 093 this.extendedStatistics = extendedStatistics; 094 } 095 096 /** 097 * Creates the {@link LRUCache} to be used. 098 * <p/> 099 * This implementation returns a {@link LRUCache} instance. 100 101 * @param cacheSize the cache size 102 * @return the cache 103 */ 104 @SuppressWarnings("unchecked") 105 protected static LRUCache<String, PollingConsumer> createLRUCache(int cacheSize) { 106 // Use a regular cache as we want to ensure that the lifecycle of the consumers 107 // being cache is properly handled, such as they are stopped when being evicted 108 // or when this cache is stopped. This is needed as some consumers requires to 109 // be stopped so they can shutdown internal resources that otherwise may cause leaks 110 return LRUCacheFactory.newLRUCache(cacheSize); 111 } 112 113 /** 114 * Acquires a pooled PollingConsumer which you <b>must</b> release back again after usage using the 115 * {@link #releasePollingConsumer(org.apache.camel.Endpoint, org.apache.camel.PollingConsumer)} method. 116 * 117 * @param endpoint the endpoint 118 * @return the PollingConsumer 119 */ 120 public PollingConsumer acquirePollingConsumer(Endpoint endpoint) { 121 return doGetPollingConsumer(endpoint, true); 122 } 123 124 /** 125 * Releases an acquired producer back after usage. 126 * 127 * @param endpoint the endpoint 128 * @param pollingConsumer the pollingConsumer to release 129 */ 130 public void releasePollingConsumer(Endpoint endpoint, PollingConsumer pollingConsumer) { 131 if (pollingConsumer instanceof ServicePoolAware) { 132 // release back to the pool 133 pool.release(endpoint, pollingConsumer); 134 } else { 135 boolean singleton = false; 136 if (pollingConsumer instanceof IsSingleton) { 137 singleton = ((IsSingleton) pollingConsumer).isSingleton(); 138 } 139 String key = endpoint.getEndpointUri(); 140 boolean cached = consumers.containsKey(key); 141 if (!singleton || !cached) { 142 try { 143 // stop and shutdown non-singleton/non-cached consumers as we should not leak resources 144 if (!singleton) { 145 LOG.debug("Released PollingConsumer: {} is stopped as consumer is not singleton", endpoint); 146 } else { 147 LOG.debug("Released PollingConsumer: {} is stopped as consumer cache is full", endpoint); 148 } 149 ServiceHelper.stopAndShutdownService(pollingConsumer); 150 } catch (Throwable ex) { 151 if (ex instanceof RuntimeCamelException) { 152 throw (RuntimeCamelException)ex; 153 } else { 154 throw new RuntimeCamelException(ex); 155 } 156 } 157 } 158 } 159 } 160 161 public PollingConsumer getConsumer(Endpoint endpoint) { 162 return doGetPollingConsumer(endpoint, true); 163 } 164 165 protected synchronized PollingConsumer doGetPollingConsumer(Endpoint endpoint, boolean pooled) { 166 String key = endpoint.getEndpointUri(); 167 PollingConsumer answer = consumers.get(key); 168 if (pooled && answer == null) { 169 pool.acquire(endpoint); 170 } 171 172 if (answer == null) { 173 try { 174 answer = endpoint.createPollingConsumer(); 175 answer.start(); 176 } catch (Throwable e) { 177 throw new FailedToCreateConsumerException(endpoint, e); 178 } 179 if (pooled && answer instanceof ServicePoolAware) { 180 LOG.debug("Adding to producer service pool with key: {} for producer: {}", endpoint, answer); 181 answer = pool.addAndAcquire(endpoint, answer); 182 } else { 183 boolean singleton = false; 184 if (answer instanceof IsSingleton) { 185 singleton = ((IsSingleton) answer).isSingleton(); 186 } 187 if (singleton) { 188 LOG.debug("Adding to consumer cache with key: {} for consumer: {}", endpoint, answer); 189 consumers.put(key, answer); 190 } else { 191 LOG.debug("Consumer for endpoint: {} is not singleton and thus not added to consumer cache", key); 192 } 193 } 194 } 195 196 if (answer != null) { 197 // record statistics 198 if (extendedStatistics) { 199 statistics.onHit(key); 200 } 201 } 202 203 return answer; 204 } 205 206 public Exchange receive(Endpoint endpoint) { 207 LOG.debug("<<<< {}", endpoint); 208 PollingConsumer consumer = null; 209 try { 210 consumer = acquirePollingConsumer(endpoint); 211 return consumer.receive(); 212 } finally { 213 if (consumer != null) { 214 releasePollingConsumer(endpoint, consumer); 215 } 216 } 217 } 218 219 public Exchange receive(Endpoint endpoint, long timeout) { 220 LOG.debug("<<<< {}", endpoint); 221 PollingConsumer consumer = null; 222 try { 223 consumer = acquirePollingConsumer(endpoint); 224 return consumer.receive(timeout); 225 } finally { 226 if (consumer != null) { 227 releasePollingConsumer(endpoint, consumer); 228 } 229 } 230 } 231 232 public Exchange receiveNoWait(Endpoint endpoint) { 233 LOG.debug("<<<< {}", endpoint); 234 PollingConsumer consumer = null; 235 try { 236 consumer = doGetPollingConsumer(endpoint, true); 237 return consumer.receiveNoWait(); 238 } finally { 239 if (consumer != null) { 240 releasePollingConsumer(endpoint, consumer); 241 } 242 } 243 } 244 245 public CamelContext getCamelContext() { 246 return camelContext; 247 } 248 249 /** 250 * Gets the source which uses this cache 251 * 252 * @return the source 253 */ 254 public Object getSource() { 255 return source; 256 } 257 258 /** 259 * Returns the current size of the cache 260 * 261 * @return the current size 262 */ 263 public int size() { 264 int size = consumers.size(); 265 LOG.trace("size = {}", size); 266 return size; 267 } 268 269 /** 270 * Gets the maximum cache size (capacity). 271 * <p/> 272 * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used. 273 * 274 * @return the capacity 275 */ 276 public int getCapacity() { 277 int capacity = -1; 278 if (consumers instanceof LRUCache) { 279 LRUCache<String, PollingConsumer> cache = (LRUCache<String, PollingConsumer>)consumers; 280 capacity = cache.getMaxCacheSize(); 281 } 282 return capacity; 283 } 284 285 /** 286 * Gets the cache hits statistic 287 * <p/> 288 * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used. 289 * 290 * @return the hits 291 */ 292 public long getHits() { 293 long hits = -1; 294 if (consumers instanceof LRUCache) { 295 LRUCache<String, PollingConsumer> cache = (LRUCache<String, PollingConsumer>)consumers; 296 hits = cache.getHits(); 297 } 298 return hits; 299 } 300 301 /** 302 * Gets the cache misses statistic 303 * <p/> 304 * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used. 305 * 306 * @return the misses 307 */ 308 public long getMisses() { 309 long misses = -1; 310 if (consumers instanceof LRUCache) { 311 LRUCache<String, PollingConsumer> cache = (LRUCache<String, PollingConsumer>)consumers; 312 misses = cache.getMisses(); 313 } 314 return misses; 315 } 316 317 /** 318 * Gets the cache evicted statistic 319 * <p/> 320 * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used. 321 * 322 * @return the evicted 323 */ 324 public long getEvicted() { 325 long evicted = -1; 326 if (consumers instanceof LRUCache) { 327 LRUCache<String, PollingConsumer> cache = (LRUCache<String, PollingConsumer>)consumers; 328 evicted = cache.getEvicted(); 329 } 330 return evicted; 331 } 332 333 /** 334 * Resets the cache statistics 335 */ 336 public void resetCacheStatistics() { 337 if (consumers instanceof LRUCache) { 338 LRUCache<String, PollingConsumer> cache = (LRUCache<String, PollingConsumer>)consumers; 339 cache.resetStatistics(); 340 } 341 if (statistics != null) { 342 statistics.clear(); 343 } 344 } 345 346 /** 347 * Purges this cache 348 */ 349 public synchronized void purge() { 350 consumers.clear(); 351 if (statistics != null) { 352 statistics.clear(); 353 } 354 } 355 356 /** 357 * Cleanup the cache (purging stale entries) 358 */ 359 public void cleanUp() { 360 if (consumers instanceof LRUCache) { 361 LRUCache<String, PollingConsumer> cache = (LRUCache<String, PollingConsumer>)consumers; 362 cache.cleanUp(); 363 } 364 } 365 366 public EndpointUtilizationStatistics getEndpointUtilizationStatistics() { 367 return statistics; 368 } 369 370 @Override 371 public String toString() { 372 return "ConsumerCache for source: " + source + ", capacity: " + getCapacity(); 373 } 374 375 protected void doStart() throws Exception { 376 if (extendedStatistics) { 377 int max = maxCacheSize == 0 ? CamelContextHelper.getMaximumCachePoolSize(camelContext) : maxCacheSize; 378 statistics = new DefaultEndpointUtilizationStatistics(max); 379 } 380 381 ServiceHelper.startServices(consumers.values()); 382 } 383 384 protected void doStop() throws Exception { 385 // when stopping we intend to shutdown 386 ServiceHelper.stopAndShutdownServices(statistics, pool); 387 try { 388 ServiceHelper.stopAndShutdownServices(consumers.values()); 389 } finally { 390 // ensure consumers are removed, and also from JMX 391 for (PollingConsumer consumer : consumers.values()) { 392 getCamelContext().removeService(consumer); 393 } 394 } 395 consumers.clear(); 396 if (statistics != null) { 397 statistics.clear(); 398 } 399 } 400 401}