001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.impl; 018 019 import java.util.Map; 020 021 import org.apache.camel.AsyncCallback; 022 import org.apache.camel.AsyncProcessor; 023 import org.apache.camel.AsyncProducerCallback; 024 import org.apache.camel.CamelContext; 025 import org.apache.camel.Endpoint; 026 import org.apache.camel.Exchange; 027 import org.apache.camel.ExchangePattern; 028 import org.apache.camel.FailedToCreateProducerException; 029 import org.apache.camel.Processor; 030 import org.apache.camel.Producer; 031 import org.apache.camel.ProducerCallback; 032 import org.apache.camel.ServicePoolAware; 033 import org.apache.camel.processor.UnitOfWorkProducer; 034 import org.apache.camel.spi.ServicePool; 035 import org.apache.camel.support.ServiceSupport; 036 import org.apache.camel.util.AsyncProcessorConverterHelper; 037 import org.apache.camel.util.CamelContextHelper; 038 import org.apache.camel.util.EventHelper; 039 import org.apache.camel.util.LRUCache; 040 import org.apache.camel.util.ServiceHelper; 041 import org.apache.camel.util.StopWatch; 042 import org.slf4j.Logger; 043 import org.slf4j.LoggerFactory; 044 045 /** 046 * Cache containing created {@link Producer}. 047 * 048 * @version 049 */ 050 public class ProducerCache extends ServiceSupport { 051 private static final transient Logger LOG = LoggerFactory.getLogger(ProducerCache.class); 052 053 private final CamelContext camelContext; 054 private final ServicePool<Endpoint, Producer> pool; 055 private final Map<String, Producer> producers; 056 private final Object source; 057 058 public ProducerCache(Object source, CamelContext camelContext) { 059 this(source, camelContext, CamelContextHelper.getMaximumCachePoolSize(camelContext)); 060 } 061 062 public ProducerCache(Object source, CamelContext camelContext, int cacheSize) { 063 this(source, camelContext, camelContext.getProducerServicePool(), createLRUCache(cacheSize)); 064 } 065 066 public ProducerCache(Object source, CamelContext camelContext, Map<String, Producer> cache) { 067 this(source, camelContext, camelContext.getProducerServicePool(), cache); 068 } 069 070 public ProducerCache(Object source, CamelContext camelContext, ServicePool<Endpoint, Producer> producerServicePool, Map<String, Producer> cache) { 071 this.source = source; 072 this.camelContext = camelContext; 073 this.pool = producerServicePool; 074 this.producers = cache; 075 } 076 077 /** 078 * Creates the {@link LRUCache} to be used. 079 * <p/> 080 * This implementation returns a {@link LRUCache} instance. 081 082 * @param cacheSize the cache size 083 * @return the cache 084 */ 085 protected static LRUCache<String, Producer> createLRUCache(int cacheSize) { 086 // Use a regular cache as we want to ensure that the lifecycle of the producers 087 // being cache is properly handled, such as they are stopped when being evicted 088 // or when this cache is stopped. This is needed as some producers requires to 089 // be stopped so they can shutdown internal resources that otherwise may cause leaks 090 return new LRUCache<String, Producer>(cacheSize); 091 } 092 093 public CamelContext getCamelContext() { 094 return camelContext; 095 } 096 097 /** 098 * Gets the source which uses this cache 099 * 100 * @return the source 101 */ 102 public Object getSource() { 103 return source; 104 } 105 106 /** 107 * Acquires a pooled producer which you <b>must</b> release back again after usage using the 108 * {@link #releaseProducer(org.apache.camel.Endpoint, org.apache.camel.Producer)} method. 109 * 110 * @param endpoint the endpoint 111 * @return the producer 112 */ 113 public Producer acquireProducer(Endpoint endpoint) { 114 return doGetProducer(endpoint, true); 115 } 116 117 /** 118 * Releases an acquired producer back after usage. 119 * 120 * @param endpoint the endpoint 121 * @param producer the producer to release 122 * @throws Exception can be thrown if error stopping producer if that was needed. 123 */ 124 public void releaseProducer(Endpoint endpoint, Producer producer) throws Exception { 125 if (producer instanceof ServicePoolAware) { 126 // release back to the pool 127 pool.release(endpoint, producer); 128 } else if (!producer.isSingleton()) { 129 // stop non singleton producers as we should not leak resources 130 producer.stop(); 131 } 132 } 133 134 /** 135 * Starts the {@link Producer} to be used for sending to the given endpoint 136 * <p/> 137 * This can be used to early start the {@link Producer} to ensure it can be created, 138 * such as when Camel is started. This allows to fail fast in case the {@link Producer} 139 * could not be started. 140 * 141 * @param endpoint the endpoint to send the exchange to 142 * @throws Exception is thrown if failed to create or start the {@link Producer} 143 */ 144 public void startProducer(Endpoint endpoint) throws Exception { 145 Producer producer = acquireProducer(endpoint); 146 releaseProducer(endpoint, producer); 147 } 148 149 /** 150 * Sends the exchange to the given endpoint. 151 * <p> 152 * This method will <b>not</b> throw an exception. If processing of the given 153 * Exchange failed then the exception is stored on the provided Exchange 154 * 155 * @param endpoint the endpoint to send the exchange to 156 * @param exchange the exchange to send 157 */ 158 public void send(Endpoint endpoint, Exchange exchange) { 159 sendExchange(endpoint, null, null, exchange); 160 } 161 162 /** 163 * Sends an exchange to an endpoint using a supplied 164 * {@link Processor} to populate the exchange 165 * <p> 166 * This method will <b>not</b> throw an exception. If processing of the given 167 * Exchange failed then the exception is stored on the return Exchange 168 * 169 * @param endpoint the endpoint to send the exchange to 170 * @param processor the transformer used to populate the new exchange 171 * @throws org.apache.camel.CamelExecutionException is thrown if sending failed 172 * @return the exchange 173 */ 174 public Exchange send(Endpoint endpoint, Processor processor) { 175 return sendExchange(endpoint, null, processor, null); 176 } 177 178 /** 179 * Sends an exchange to an endpoint using a supplied 180 * {@link Processor} to populate the exchange 181 * <p> 182 * This method will <b>not</b> throw an exception. If processing of the given 183 * Exchange failed then the exception is stored on the return Exchange 184 * 185 * @param endpoint the endpoint to send the exchange to 186 * @param pattern the message {@link ExchangePattern} such as 187 * {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut} 188 * @param processor the transformer used to populate the new exchange 189 * @return the exchange 190 */ 191 public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) { 192 return sendExchange(endpoint, pattern, processor, null); 193 } 194 195 /** 196 * Sends an exchange to an endpoint using a supplied callback, using the synchronous processing. 197 * <p/> 198 * If an exception was thrown during processing, it would be set on the given Exchange 199 * 200 * @param endpoint the endpoint to send the exchange to 201 * @param exchange the exchange, can be <tt>null</tt> if so then create a new exchange from the producer 202 * @param pattern the exchange pattern, can be <tt>null</tt> 203 * @param callback the callback 204 * @return the response from the callback 205 * @see #doInAsyncProducer(org.apache.camel.Endpoint, org.apache.camel.Exchange, org.apache.camel.ExchangePattern, org.apache.camel.AsyncCallback, org.apache.camel.AsyncProducerCallback) 206 */ 207 public <T> T doInProducer(Endpoint endpoint, Exchange exchange, ExchangePattern pattern, ProducerCallback<T> callback) { 208 T answer = null; 209 210 // get the producer and we do not mind if its pooled as we can handle returning it back to the pool 211 Producer producer = doGetProducer(endpoint, true); 212 213 if (producer == null) { 214 if (isStopped()) { 215 LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange); 216 return null; 217 } else { 218 throw new IllegalStateException("No producer, this processor has not been started: " + this); 219 } 220 } 221 222 StopWatch watch = null; 223 if (exchange != null) { 224 // record timing for sending the exchange using the producer 225 watch = new StopWatch(); 226 } 227 228 try { 229 if (exchange != null) { 230 EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint); 231 } 232 // invoke the callback 233 answer = callback.doInProducer(producer, exchange, pattern); 234 } catch (Throwable e) { 235 if (exchange != null) { 236 exchange.setException(e); 237 } 238 } finally { 239 if (exchange != null) { 240 long timeTaken = watch.stop(); 241 // emit event that the exchange was sent to the endpoint 242 EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); 243 } 244 if (producer instanceof ServicePoolAware) { 245 // release back to the pool 246 pool.release(endpoint, producer); 247 } else if (!producer.isSingleton()) { 248 // stop non singleton producers as we should not leak resources 249 try { 250 ServiceHelper.stopService(producer); 251 } catch (Exception e) { 252 // ignore and continue 253 LOG.warn("Error stopping producer: " + producer, e); 254 } 255 } 256 } 257 258 return answer; 259 } 260 261 /** 262 * Sends an exchange to an endpoint using a supplied callback supporting the asynchronous routing engine. 263 * <p/> 264 * If an exception was thrown during processing, it would be set on the given Exchange 265 * 266 * @param endpoint the endpoint to send the exchange to 267 * @param exchange the exchange, can be <tt>null</tt> if so then create a new exchange from the producer 268 * @param pattern the exchange pattern, can be <tt>null</tt> 269 * @param callback the asynchronous callback 270 * @param producerCallback the producer template callback to be executed 271 * @return (doneSync) <tt>true</tt> to continue execute synchronously, <tt>false</tt> to continue being executed asynchronously 272 */ 273 public boolean doInAsyncProducer(final Endpoint endpoint, final Exchange exchange, final ExchangePattern pattern, 274 final AsyncCallback callback, final AsyncProducerCallback producerCallback) { 275 boolean sync = true; 276 277 // get the producer and we do not mind if its pooled as we can handle returning it back to the pool 278 final Producer producer = doGetProducer(endpoint, true); 279 280 if (producer == null) { 281 if (isStopped()) { 282 LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange); 283 return false; 284 } else { 285 throw new IllegalStateException("No producer, this processor has not been started: " + this); 286 } 287 } 288 289 // record timing for sending the exchange using the producer 290 final StopWatch watch = exchange != null ? new StopWatch() : null; 291 292 try { 293 if (exchange != null) { 294 EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint); 295 } 296 // invoke the callback 297 AsyncProcessor asyncProcessor = AsyncProcessorConverterHelper.convert(producer); 298 sync = producerCallback.doInAsyncProducer(producer, asyncProcessor, exchange, pattern, new AsyncCallback() { 299 @Override 300 public void done(boolean doneSync) { 301 try { 302 if (watch != null) { 303 long timeTaken = watch.stop(); 304 // emit event that the exchange was sent to the endpoint 305 EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); 306 } 307 308 if (producer instanceof ServicePoolAware) { 309 // release back to the pool 310 pool.release(endpoint, producer); 311 } else if (!producer.isSingleton()) { 312 // stop non singleton producers as we should not leak resources 313 try { 314 ServiceHelper.stopService(producer); 315 } catch (Exception e) { 316 // ignore and continue 317 LOG.warn("Error stopping producer: " + producer, e); 318 } 319 } 320 } finally { 321 callback.done(doneSync); 322 } 323 } 324 }); 325 } catch (Throwable e) { 326 // ensure exceptions is caught and set on the exchange 327 if (exchange != null) { 328 exchange.setException(e); 329 } 330 } 331 332 return sync; 333 } 334 335 protected Exchange sendExchange(final Endpoint endpoint, ExchangePattern pattern, 336 final Processor processor, Exchange exchange) { 337 return doInProducer(endpoint, exchange, pattern, new ProducerCallback<Exchange>() { 338 public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) { 339 if (exchange == null) { 340 exchange = pattern != null ? producer.createExchange(pattern) : producer.createExchange(); 341 } 342 343 if (processor != null) { 344 // lets populate using the processor callback 345 try { 346 processor.process(exchange); 347 } catch (Exception e) { 348 // populate failed so return 349 exchange.setException(e); 350 return exchange; 351 } 352 } 353 354 // now lets dispatch 355 LOG.debug(">>>> {} {}", endpoint, exchange); 356 357 // set property which endpoint we send to 358 exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri()); 359 360 // send the exchange using the processor 361 StopWatch watch = new StopWatch(); 362 try { 363 EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint); 364 // ensure we run in an unit of work 365 Producer target = new UnitOfWorkProducer(producer); 366 target.process(exchange); 367 } catch (Throwable e) { 368 // ensure exceptions is caught and set on the exchange 369 exchange.setException(e); 370 } finally { 371 // emit event that the exchange was sent to the endpoint 372 long timeTaken = watch.stop(); 373 EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); 374 } 375 return exchange; 376 } 377 }); 378 } 379 380 protected synchronized Producer doGetProducer(Endpoint endpoint, boolean pooled) { 381 String key = endpoint.getEndpointUri(); 382 Producer answer = producers.get(key); 383 if (pooled && answer == null) { 384 // try acquire from connection pool 385 answer = pool.acquire(endpoint); 386 } 387 388 if (answer == null) { 389 // create a new producer 390 try { 391 answer = endpoint.createProducer(); 392 // must then start service so producer is ready to be used 393 ServiceHelper.startService(answer); 394 } catch (Exception e) { 395 throw new FailedToCreateProducerException(endpoint, e); 396 } 397 398 // add producer to cache or pool if applicable 399 if (pooled && answer instanceof ServicePoolAware) { 400 LOG.debug("Adding to producer service pool with key: {} for producer: {}", endpoint, answer); 401 answer = pool.addAndAcquire(endpoint, answer); 402 } else if (answer.isSingleton()) { 403 LOG.debug("Adding to producer cache with key: {} for producer: {}", endpoint, answer); 404 producers.put(key, answer); 405 } 406 } 407 408 return answer; 409 } 410 411 protected void doStart() throws Exception { 412 ServiceHelper.startServices(producers.values()); 413 ServiceHelper.startServices(pool); 414 } 415 416 protected void doStop() throws Exception { 417 // when stopping we intend to shutdown 418 ServiceHelper.stopAndShutdownService(pool); 419 ServiceHelper.stopAndShutdownServices(producers.values()); 420 producers.clear(); 421 } 422 423 /** 424 * Returns the current size of the cache 425 * 426 * @return the current size 427 */ 428 public int size() { 429 int size = producers.size(); 430 size += pool.size(); 431 432 LOG.trace("size = {}", size); 433 return size; 434 } 435 436 /** 437 * Gets the maximum cache size (capacity). 438 * <p/> 439 * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used. 440 * 441 * @return the capacity 442 */ 443 public int getCapacity() { 444 int capacity = -1; 445 if (producers instanceof LRUCache) { 446 LRUCache<String, Producer> cache = (LRUCache<String, Producer>)producers; 447 capacity = cache.getMaxCacheSize(); 448 } 449 return capacity; 450 } 451 452 /** 453 * Gets the cache hits statistic 454 * <p/> 455 * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used. 456 * 457 * @return the hits 458 */ 459 public long getHits() { 460 long hits = -1; 461 if (producers instanceof LRUCache) { 462 LRUCache<String, Producer> cache = (LRUCache<String, Producer>)producers; 463 hits = cache.getHits(); 464 } 465 return hits; 466 } 467 468 /** 469 * Gets the cache misses statistic 470 * <p/> 471 * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used. 472 * 473 * @return the misses 474 */ 475 public long getMisses() { 476 long misses = -1; 477 if (producers instanceof LRUCache) { 478 LRUCache<String, Producer> cache = (LRUCache<String, Producer>)producers; 479 misses = cache.getMisses(); 480 } 481 return misses; 482 } 483 484 /** 485 * Gets the cache evicted statistic 486 * <p/> 487 * Will return <tt>-1</tt> if it cannot determine this if a custom cache was used. 488 * 489 * @return the evicted 490 */ 491 public long getEvicted() { 492 long evicted = -1; 493 if (producers instanceof LRUCache) { 494 LRUCache<String, Producer> cache = (LRUCache<String, Producer>)producers; 495 evicted = cache.getEvicted(); 496 } 497 return evicted; 498 } 499 500 /** 501 * Resets the cache statistics 502 */ 503 public void resetCacheStatistics() { 504 if (producers instanceof LRUCache) { 505 LRUCache<String, Producer> cache = (LRUCache<String, Producer>)producers; 506 cache.resetStatistics(); 507 } 508 } 509 510 /** 511 * Purges this cache 512 */ 513 public synchronized void purge() { 514 producers.clear(); 515 pool.purge(); 516 } 517 518 @Override 519 public String toString() { 520 return "ProducerCache for source: " + source + ", capacity: " + getCapacity(); 521 } 522 }