/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.impl;

import java.util.Map;

import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.FailedToCreateConsumerException;
import org.apache.camel.IsSingleton;
import org.apache.camel.PollingConsumer;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.ServicePoolAware;
import org.apache.camel.spi.EndpointUtilizationStatistics;
import org.apache.camel.spi.ServicePool;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.CamelContextHelper;
import org.apache.camel.util.LRUCache;
import org.apache.camel.util.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Cache containing created {@link PollingConsumer}s, keyed by endpoint URI.
 * <p/>
 * Consumers that are {@link ServicePoolAware} are handed to the {@link ServicePool},
 * consumers that report themselves as singleton are kept in the cache map, and
 * all other consumers are created per use and stopped when released.
 */
public class ConsumerCache extends ServiceSupport {
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerCache.class);

    private final CamelContext camelContext;
    private final ServicePool<Endpoint, PollingConsumer> pool;
    private final Map<String, PollingConsumer> consumers;
    private final Object source;

    // only created in doStart() when extended statistics is enabled, so it may be null
    private EndpointUtilizationStatistics statistics;
    private boolean extendedStatistics;
    // 0 means the size is unknown (the supplied cache is not a LRUCache)
    private int maxCacheSize;

    /**
     * Creates the cache using the maximum cache pool size of the given context.
     *
     * @param source       the source which uses this cache
     * @param camelContext the camel context
     */
    public ConsumerCache(Object source, CamelContext camelContext) {
        this(source, camelContext, CamelContextHelper.getMaximumCachePoolSize(camelContext));
    }

    /**
     * Creates the cache backed by a {@link LRUCache} of the given size.
     *
     * @param source       the source which uses this cache
     * @param camelContext the camel context
     * @param cacheSize    the cache size
     */
    public ConsumerCache(Object source, CamelContext camelContext, int cacheSize) {
        this(source, camelContext, createLRUCache(cacheSize));
    }

    /**
     * Creates the cache backed by the given map and the polling consumer service pool of the context.
     *
     * @param source       the source which uses this cache
     * @param camelContext the camel context
     * @param cache        the map holding the cached consumers
     */
    public ConsumerCache(Object source, CamelContext camelContext, Map<String, PollingConsumer> cache) {
        this(source, camelContext, cache, camelContext.getPollingConsumerServicePool());
    }

    /**
     * Creates the cache backed by the given map and service pool.
     *
     * @param source       the source which uses this cache
     * @param camelContext the camel context
     * @param cache        the map holding the cached consumers
     * @param pool         the service pool used for {@link ServicePoolAware} consumers
     */
    public ConsumerCache(Object source, CamelContext camelContext, Map<String, PollingConsumer> cache,
                         ServicePool<Endpoint, PollingConsumer> pool) {
        this.camelContext = camelContext;
        this.consumers = cache;
        this.source = source;
        this.pool = pool;
        if (consumers instanceof LRUCache) {
            maxCacheSize = ((LRUCache<?, ?>) consumers).getMaxCacheSize();
        }

        // only if JMX is enabled
        if (camelContext.getManagementStrategy().getManagementAgent() != null) {
            this.extendedStatistics = camelContext.getManagementStrategy().getManagementAgent().getStatisticsLevel().isExtended();
        } else {
            this.extendedStatistics = false;
        }
    }

    /**
     * Whether extended JMX statistics is enabled for {@link org.apache.camel.spi.EndpointUtilizationStatistics}
     */
    public boolean isExtendedStatistics() {
        return extendedStatistics;
    }

    /**
     * Whether extended JMX statistics is enabled for {@link org.apache.camel.spi.EndpointUtilizationStatistics}
     * <p/>
     * The statistics instance itself is only created when this cache is started.
     */
    public void setExtendedStatistics(boolean extendedStatistics) {
        this.extendedStatistics = extendedStatistics;
    }

    /**
     * Creates the {@link LRUCache} to be used.
     * <p/>
     * This implementation returns a {@link LRUCache} instance.
     *
     * @param cacheSize the cache size
     * @return the cache
     */
    protected static LRUCache<String, PollingConsumer> createLRUCache(int cacheSize) {
        // Use a regular cache as we want to ensure that the lifecycle of the consumers
        // being cache is properly handled, such as they are stopped when being evicted
        // or when this cache is stopped. This is needed as some consumers requires to
        // be stopped so they can shutdown internal resources that otherwise may cause leaks
        return new LRUCache<String, PollingConsumer>(cacheSize);
    }

    /**
     * Acquires a pooled PollingConsumer which you <b>must</b> release back again after usage using the
     * {@link #releasePollingConsumer(org.apache.camel.Endpoint, org.apache.camel.PollingConsumer)} method.
     *
     * @param endpoint the endpoint
     * @return the PollingConsumer
     */
    public PollingConsumer acquirePollingConsumer(Endpoint endpoint) {
        return doGetPollingConsumer(endpoint, true);
    }

    /**
     * Releases an acquired polling consumer back after usage.
     * <p/>
     * {@link ServicePoolAware} consumers are released back to the pool. Consumers that are
     * not singleton are stopped and shut down. Singleton consumers are left untouched.
     *
     * @param endpoint        the endpoint
     * @param pollingConsumer the pollingConsumer to release
     * @throws RuntimeCamelException if stopping a non-singleton consumer fails
     */
    public void releasePollingConsumer(Endpoint endpoint, PollingConsumer pollingConsumer) {
        if (pollingConsumer instanceof ServicePoolAware) {
            // release back to the pool
            pool.release(endpoint, pollingConsumer);
            return;
        }
        if (isSingleton(pollingConsumer)) {
            return;
        }
        try {
            // stop and shutdown non-singleton consumers as we should not leak resources
            ServiceHelper.stopAndShutdownService(pollingConsumer);
        } catch (RuntimeCamelException ex) {
            throw ex;
        } catch (Exception ex) {
            throw new RuntimeCamelException(ex);
        }
    }

    /**
     * Gets a polling consumer for the given endpoint, with pooling enabled.
     *
     * @param endpoint the endpoint
     * @return the PollingConsumer
     */
    public PollingConsumer getConsumer(Endpoint endpoint) {
        return doGetPollingConsumer(endpoint, true);
    }

    /**
     * Looks up a polling consumer for the endpoint, creating and starting a new one when
     * neither the cache nor (if <tt>pooled</tt>) the service pool can provide one.
     *
     * @param endpoint the endpoint
     * @param pooled   whether the service pool may be used
     * @return the PollingConsumer
     * @throws FailedToCreateConsumerException if the consumer cannot be created or started
     */
    protected synchronized PollingConsumer doGetPollingConsumer(Endpoint endpoint, boolean pooled) {
        String key = endpoint.getEndpointUri();
        PollingConsumer answer = consumers.get(key);
        if (pooled && answer == null) {
            // try to re-use a free consumer from the pool; the acquired consumer must be
            // kept as otherwise it is taken from the pool and never given back
            answer = pool.acquire(endpoint);
        }

        if (answer == null) {
            try {
                answer = endpoint.createPollingConsumer();
                answer.start();
            } catch (Exception e) {
                throw new FailedToCreateConsumerException(endpoint, e);
            }
            if (pooled && answer instanceof ServicePoolAware) {
                LOG.debug("Adding to consumer service pool with key: {} for consumer: {}", endpoint, answer);
                answer = pool.addAndAcquire(endpoint, answer);
            } else if (isSingleton(answer)) {
                LOG.debug("Adding to consumer cache with key: {} for consumer: {}", endpoint, answer);
                consumers.put(key, answer);
            } else {
                LOG.debug("Consumer for endpoint: {} is not singleton and thus not added to consumer cache", key);
            }
        }

        // record statistics; the statistics instance only exists after this cache has been started
        if (answer != null && extendedStatistics && statistics != null) {
            statistics.onHit(key);
        }

        return answer;
    }

    /**
     * Receives an exchange from the endpoint using {@link PollingConsumer#receive()}.
     * The consumer is released again afterwards.
     *
     * @param endpoint the endpoint
     * @return the result of the receive call
     */
    public Exchange receive(Endpoint endpoint) {
        LOG.debug("<<<< {}", endpoint);
        PollingConsumer consumer = null;
        try {
            consumer = acquirePollingConsumer(endpoint);
            return consumer.receive();
        } finally {
            if (consumer != null) {
                releasePollingConsumer(endpoint, consumer);
            }
        }
    }

    /**
     * Receives an exchange from the endpoint using {@link PollingConsumer#receive(long)}.
     * The consumer is released again afterwards.
     *
     * @param endpoint the endpoint
     * @param timeout  the timeout passed on to the consumer
     * @return the result of the receive call
     */
    public Exchange receive(Endpoint endpoint, long timeout) {
        LOG.debug("<<<< {}", endpoint);
        PollingConsumer consumer = null;
        try {
            consumer = acquirePollingConsumer(endpoint);
            return consumer.receive(timeout);
        } finally {
            if (consumer != null) {
                releasePollingConsumer(endpoint, consumer);
            }
        }
    }

    /**
     * Receives an exchange from the endpoint using {@link PollingConsumer#receiveNoWait()}.
     * The consumer is released again afterwards.
     *
     * @param endpoint the endpoint
     * @return the result of the receive call
     */
    public Exchange receiveNoWait(Endpoint endpoint) {
        LOG.debug("<<<< {}", endpoint);
        PollingConsumer consumer = null;
        try {
            consumer = doGetPollingConsumer(endpoint, true);
            return consumer.receiveNoWait();
        } finally {
            if (consumer != null) {
                releasePollingConsumer(endpoint, consumer);
            }
        }
    }

    public CamelContext getCamelContext() {
        return camelContext;
    }

    /**
     * Gets the source which uses this cache
     *
     * @return the source
     */
    public Object getSource() {
        return source;
    }

    /**
     * Returns the current size of the cache
     *
     * @return the current size
     */
    public int size() {
        int size = consumers.size();
        LOG.trace("size = {}", size);
        return size;
    }

    /**
     * Gets the maximum cache size (capacity).
     * <p/>
     * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
     *
     * @return the capacity
     */
    public int getCapacity() {
        LRUCache<String, PollingConsumer> cache = lruCache();
        return cache != null ? cache.getMaxCacheSize() : -1;
    }

    /**
     * Gets the cache hits statistic
     * <p/>
     * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
     *
     * @return the hits
     */
    public long getHits() {
        LRUCache<String, PollingConsumer> cache = lruCache();
        return cache != null ? cache.getHits() : -1;
    }

    /**
     * Gets the cache misses statistic
     * <p/>
     * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
     *
     * @return the misses
     */
    public long getMisses() {
        LRUCache<String, PollingConsumer> cache = lruCache();
        return cache != null ? cache.getMisses() : -1;
    }

    /**
     * Gets the cache evicted statistic
     * <p/>
     * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used.
     *
     * @return the evicted
     */
    public long getEvicted() {
        LRUCache<String, PollingConsumer> cache = lruCache();
        return cache != null ? cache.getEvicted() : -1;
    }

    /**
     * Resets the cache statistics
     */
    public void resetCacheStatistics() {
        LRUCache<String, PollingConsumer> cache = lruCache();
        if (cache != null) {
            cache.resetStatistics();
        }
        if (statistics != null) {
            statistics.clear();
        }
    }

    /**
     * Purges this cache
     */
    public synchronized void purge() {
        consumers.clear();
        if (statistics != null) {
            statistics.clear();
        }
    }

    /**
     * Cleanup the cache (purging stale entries)
     */
    public void cleanUp() {
        LRUCache<String, PollingConsumer> cache = lruCache();
        if (cache != null) {
            cache.cleanUp();
        }
    }

    /**
     * Gets the endpoint utilization statistics.
     *
     * @return the statistics, or <tt>null</tt> if none has been created
     */
    public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
        return statistics;
    }

    @Override
    public String toString() {
        return "ConsumerCache for source: " + source + ", capacity: " + getCapacity();
    }

    @Override
    protected void doStart() throws Exception {
        if (extendedStatistics) {
            // fall back to the context wide maximum when the size of the cache is unknown
            int max = maxCacheSize == 0 ? CamelContextHelper.getMaximumCachePoolSize(camelContext) : maxCacheSize;
            statistics = new DefaultEndpointUtilizationStatistics(max);
        }

        ServiceHelper.startServices(consumers.values());
    }

    @Override
    protected void doStop() throws Exception {
        // when stopping we intend to shutdown
        ServiceHelper.stopAndShutdownServices(statistics, pool);
        try {
            ServiceHelper.stopAndShutdownServices(consumers.values());
        } finally {
            // ensure consumers are removed, and also from JMX
            for (PollingConsumer consumer : consumers.values()) {
                getCamelContext().removeService(consumer);
            }
        }
        consumers.clear();
        if (statistics != null) {
            statistics.clear();
        }
    }

    /**
     * Whether the consumer implements {@link IsSingleton} and reports itself as singleton.
     */
    private static boolean isSingleton(PollingConsumer consumer) {
        return consumer instanceof IsSingleton && ((IsSingleton) consumer).isSingleton();
    }

    /**
     * Returns the backing map as a {@link LRUCache}, or <tt>null</tt> if a custom cache was used.
     */
    private LRUCache<String, PollingConsumer> lruCache() {
        if (consumers instanceof LRUCache) {
            return (LRUCache<String, PollingConsumer>) consumers;
        }
        return null;
    }

}