/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.impl;

import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.camel.CamelContext;
import org.apache.camel.CamelExecutionException;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.NoSuchEndpointException;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.CamelContextHelper;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;

/**
 * Template (named like Spring's TransactionTemplate and JmsTemplate
 * et al) for working with Camel and sending {@link Message} instances in an
 * {@link Exchange} to an {@link Endpoint}.
 * <p/>
 * The template must be started before use: both the producer cache and the
 * executor service accessors throw {@link IllegalStateException} when
 * {@link #isStarted()} is <tt>false</tt>.
 *
 * @version
 */
public class DefaultProducerTemplate extends ServiceSupport implements ProducerTemplate {
    private final CamelContext camelContext;
    // created in doStart() and cleared in doStop()
    private volatile ProducerCache producerCache;
    // either supplied by the caller or lazily created in getExecutorService()
    private volatile ExecutorService executor;
    private Endpoint defaultEndpoint;
    // a value <= 0 means the ProducerCache is created without an explicit size
    private int maximumCacheSize;

    /**
     * Creates a template without a default endpoint.
     *
     * @param camelContext the context used to resolve endpoints and convert types
     */
    public DefaultProducerTemplate(CamelContext camelContext) {
        this.camelContext = camelContext;
    }

    /**
     * Creates a template that uses the given executor for the asynchronous operations.
     *
     * @param camelContext the context used to resolve endpoints and convert types
     * @param executor     the executor the async tasks are submitted to
     */
    public DefaultProducerTemplate(CamelContext camelContext, ExecutorService executor) {
        this.camelContext = camelContext;
        this.executor = executor;
    }

    /**
     * Creates a template with a default endpoint.
     *
     * @param camelContext    the context used to resolve endpoints and convert types
     * @param defaultEndpoint the endpoint used by the operations that take no endpoint argument
     */
    public DefaultProducerTemplate(CamelContext camelContext, Endpoint defaultEndpoint) {
        this(camelContext);
        this.defaultEndpoint = defaultEndpoint;
    }

    /**
     * Creates a template whose default endpoint is looked up with
     * {@link CamelContextHelper#getMandatoryEndpoint(CamelContext, String)}.
     *
     * @param camelContext       the context used to resolve the endpoint
     * @param defaultEndpointUri the uri of the default endpoint
     * @return the new template
     */
    public static DefaultProducerTemplate newInstance(CamelContext camelContext, String defaultEndpointUri) {
        Endpoint endpoint = CamelContextHelper.getMandatoryEndpoint(camelContext, defaultEndpointUri);
        return new DefaultProducerTemplate(camelContext, endpoint);
    }

    public int getMaximumCacheSize() {
        return maximumCacheSize;
    }

    /**
     * Sets the size passed to the {@link ProducerCache} created in {@link #doStart()}.
     * Only a value greater than zero is passed on.
     */
    public void setMaximumCacheSize(int maximumCacheSize) {
        this.maximumCacheSize = maximumCacheSize;
    }

    /**
     * Returns the size reported by the producer cache, or <tt>0</tt> when there is no cache.
     */
    public int getCurrentCacheSize() {
        // read the volatile field once, so a concurrent doStop() cannot null it
        // between the null check and the call to size()
        ProducerCache cache = producerCache;
        return cache == null ? 0 : cache.size();
    }

    // Methods sending an Exchange or using a Processor
    // -----------------------------------------------------------------------

    public Exchange send(String endpointUri, Exchange exchange) {
        Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
        return send(endpoint, exchange);
    }

    public Exchange send(String endpointUri, Processor processor) {
        Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
        return send(endpoint, processor);
    }

    public Exchange send(String endpointUri, ExchangePattern pattern, Processor processor) {
        Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
        return send(endpoint, pattern, processor);
    }

    /**
     * Sends the exchange through the producer cache and returns the very same exchange instance.
     */
    public Exchange send(Endpoint endpoint, Exchange exchange) {
        getProducerCache().send(endpoint, exchange);
        return exchange;
    }

    public Exchange send(Endpoint endpoint, Processor processor) {
        return getProducerCache().send(endpoint, processor);
    }

    public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) {
        return getProducerCache().send(endpoint, pattern, processor);
    }

    // Methods sending a body
    // -----------------------------------------------------------------------

    public Object sendBody(Endpoint endpoint, ExchangePattern pattern, Object body) {
        Exchange result = send(endpoint, pattern, createSetBodyProcessor(body));
        return extractResultBody(result, pattern);
    }

    public void sendBody(Endpoint endpoint, Object body) throws CamelExecutionException {
        Exchange result = send(endpoint, createSetBodyProcessor(body));
        // must invoke extract result body in case of exception to be rethrown
        extractResultBody(result);
    }

    public void sendBody(String endpointUri, Object body) throws CamelExecutionException {
        Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
        sendBody(endpoint, body);
    }

    public Object sendBody(String endpointUri, ExchangePattern pattern, Object body) throws CamelExecutionException {
        Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
        Object result = sendBody(endpoint, pattern, body);
        // return null if not OUT capable
        return pattern.isOutCapable() ? result : null;
    }

    // Methods sending a body and a single header
    // -----------------------------------------------------------------------

    public void sendBodyAndHeader(String endpointUri, final Object body, final String header, final Object headerValue) throws CamelExecutionException {
        sendBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue);
    }

    public void sendBodyAndHeader(Endpoint endpoint, final Object body, final String header, final Object headerValue) throws CamelExecutionException {
        Exchange result = send(endpoint, createBodyAndHeaderProcessor(body, header, headerValue));
        // must invoke extract result body in case of exception to be rethrown
        extractResultBody(result);
    }

    public Object sendBodyAndHeader(Endpoint endpoint, ExchangePattern pattern, final Object body,
                                    final String header, final Object headerValue) throws CamelExecutionException {
        Exchange exchange = send(endpoint, pattern, createBodyAndHeaderProcessor(body, header, headerValue));
        return extractResultBodyIfOutCapable(exchange, pattern);
    }

    public Object sendBodyAndHeader(String endpoint, ExchangePattern pattern, final Object body,
                                    final String header, final Object headerValue) throws CamelExecutionException {
        Exchange exchange = send(endpoint, pattern, createBodyAndHeaderProcessor(body, header, headerValue));
        return extractResultBodyIfOutCapable(exchange, pattern);
    }

    // Methods sending a body and an exchange property
    // -----------------------------------------------------------------------

    public void sendBodyAndProperty(String endpointUri, final Object body,
                                    final String property, final Object propertyValue) throws CamelExecutionException {
        sendBodyAndProperty(resolveMandatoryEndpoint(endpointUri), body, property, propertyValue);
    }

    public void sendBodyAndProperty(Endpoint endpoint, final Object body,
                                    final String property, final Object propertyValue) throws CamelExecutionException {
        Exchange result = send(endpoint, createBodyAndPropertyProcessor(body, property, propertyValue));
        // must invoke extract result body in case of exception to be rethrown
        extractResultBody(result);
    }

    public Object sendBodyAndProperty(Endpoint endpoint, ExchangePattern pattern, final Object body,
                                      final String property, final Object propertyValue) throws CamelExecutionException {
        Exchange exchange = send(endpoint, pattern, createBodyAndPropertyProcessor(body, property, propertyValue));
        return extractResultBodyIfOutCapable(exchange, pattern);
    }

    public Object sendBodyAndProperty(String endpoint, ExchangePattern pattern, final Object body,
                                      final String property, final Object propertyValue) throws CamelExecutionException {
        Exchange exchange = send(endpoint, pattern, createBodyAndPropertyProcessor(body, property, propertyValue));
        return extractResultBodyIfOutCapable(exchange, pattern);
    }

    // Methods sending a body and a map of headers
    // -----------------------------------------------------------------------

    public void sendBodyAndHeaders(String endpointUri, final Object body, final Map<String, Object> headers) throws CamelExecutionException {
        sendBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers);
    }

    public void sendBodyAndHeaders(Endpoint endpoint, final Object body, final Map<String, Object> headers) throws CamelExecutionException {
        Exchange result = send(endpoint, createBodyAndHeadersProcessor(body, headers));
        // must invoke extract result body in case of exception to be rethrown
        extractResultBody(result);
    }

    public Object sendBodyAndHeaders(String endpointUri, ExchangePattern pattern, Object body, Map<String, Object> headers) throws CamelExecutionException {
        return sendBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), pattern, body, headers);
    }

    public Object sendBodyAndHeaders(Endpoint endpoint, ExchangePattern pattern, final Object body, final Map<String, Object> headers) throws CamelExecutionException {
        Exchange exchange = send(endpoint, pattern, createBodyAndHeadersProcessor(body, headers));
        return extractResultBodyIfOutCapable(exchange, pattern);
    }

    // Methods using an InOut ExchangePattern
    // -----------------------------------------------------------------------

    public Exchange request(Endpoint endpoint, Processor processor) {
        return send(endpoint, ExchangePattern.InOut, processor);
    }

    public Object requestBody(Object body) throws CamelExecutionException {
        return sendBody(getMandatoryDefaultEndpoint(), ExchangePattern.InOut, body);
    }

    public Object requestBody(Endpoint endpoint, Object body) throws CamelExecutionException {
        return sendBody(endpoint, ExchangePattern.InOut, body);
    }

    public Object requestBodyAndHeader(Object body, String header, Object headerValue) throws CamelExecutionException {
        return sendBodyAndHeader(getMandatoryDefaultEndpoint(), ExchangePattern.InOut, body, header, headerValue);
    }

    public Object requestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue) throws CamelExecutionException {
        return sendBodyAndHeader(endpoint, ExchangePattern.InOut, body, header, headerValue);
    }

    public Exchange request(String endpoint, Processor processor) throws CamelExecutionException {
        return send(endpoint, ExchangePattern.InOut, processor);
    }

    public Object requestBody(String endpoint, Object body) throws CamelExecutionException {
        return sendBody(endpoint, ExchangePattern.InOut, body);
    }

    public Object requestBodyAndHeader(String endpoint, Object body, String header, Object headerValue) throws CamelExecutionException {
        return sendBodyAndHeader(endpoint, ExchangePattern.InOut, body, header, headerValue);
    }

    public Object requestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers) {
        return requestBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers);
    }

    public Object requestBodyAndHeaders(Endpoint endpoint, final Object body, final Map<String, Object> headers) {
        return sendBodyAndHeaders(endpoint, ExchangePattern.InOut, body, headers);
    }

    /**
     * Sends the body and headers to the default endpoint using {@link ExchangePattern#InOut}.
     * <p/>
     * The default endpoint is mandatory here, as it is for the other operations
     * that take no endpoint argument.
     */
    public Object requestBodyAndHeaders(final Object body, final Map<String, Object> headers) {
        return sendBodyAndHeaders(getMandatoryDefaultEndpoint(), ExchangePattern.InOut, body, headers);
    }

    // Methods using an InOut ExchangePattern and converting the reply with the
    // type converter of the CamelContext
    // -----------------------------------------------------------------------

    public <T> T requestBody(Object body, Class<T> type) {
        Object answer = requestBody(body);
        return camelContext.getTypeConverter().convertTo(type, answer);
    }

    public <T> T requestBody(Endpoint endpoint, Object body, Class<T> type) {
        Object answer = requestBody(endpoint, body);
        return camelContext.getTypeConverter().convertTo(type, answer);
    }

    public <T> T requestBody(String endpointUri, Object body, Class<T> type) {
        Object answer = requestBody(endpointUri, body);
        return camelContext.getTypeConverter().convertTo(type, answer);
    }

    public <T> T requestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue, Class<T> type) {
        Object answer = requestBodyAndHeader(endpoint, body, header, headerValue);
        return camelContext.getTypeConverter().convertTo(type, answer);
    }

    public <T> T requestBodyAndHeader(String endpointUri, Object body, String header, Object headerValue, Class<T> type) {
        Object answer = requestBodyAndHeader(endpointUri, body, header, headerValue);
        return camelContext.getTypeConverter().convertTo(type, answer);
    }

    public <T> T requestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers, Class<T> type) {
        Object answer = requestBodyAndHeaders(endpointUri, body, headers);
        return camelContext.getTypeConverter().convertTo(type, answer);
    }

    public <T> T requestBodyAndHeaders(Endpoint endpoint, Object body, Map<String, Object> headers, Class<T> type) {
        Object answer = requestBodyAndHeaders(endpoint, body, headers);
        return camelContext.getTypeConverter().convertTo(type, answer);
    }

    // Methods using the default endpoint
    // -----------------------------------------------------------------------

    public void sendBody(Object body) {
        sendBody(getMandatoryDefaultEndpoint(), body);
    }

    public Exchange send(Exchange exchange) {
        return send(getMandatoryDefaultEndpoint(), exchange);
    }

    public Exchange send(Processor processor) {
        return send(getMandatoryDefaultEndpoint(), processor);
    }

    public void sendBodyAndHeader(Object body, String header, Object headerValue) {
        sendBodyAndHeader(getMandatoryDefaultEndpoint(), body, header, headerValue);
    }

    public void sendBodyAndProperty(Object body, String property, Object propertyValue) {
        sendBodyAndProperty(getMandatoryDefaultEndpoint(), body, property, propertyValue);
    }

    public void sendBodyAndHeaders(Object body, Map<String, Object> headers) {
        sendBodyAndHeaders(getMandatoryDefaultEndpoint(), body, headers);
    }

    // Properties
    // -----------------------------------------------------------------------

    /**
     * @deprecated use {@link #getCamelContext()}
     */
    @Deprecated
    public CamelContext getContext() {
        return getCamelContext();
    }

    public CamelContext getCamelContext() {
        return camelContext;
    }

    public Endpoint getDefaultEndpoint() {
        return defaultEndpoint;
    }

    public void setDefaultEndpoint(Endpoint defaultEndpoint) {
        this.defaultEndpoint = defaultEndpoint;
    }

    /**
     * Sets the default endpoint to use if none is specified
     */
    public void setDefaultEndpointUri(String endpointUri) {
        setDefaultEndpoint(getCamelContext().getEndpoint(endpointUri));
    }

    /**
     * @deprecated use {@link CamelContext#getEndpoint(String, Class)}
     */
    @Deprecated
    public <T extends Endpoint> T getResolvedEndpoint(String endpointUri, Class<T> expectedClass) {
        return camelContext.getEndpoint(endpointUri, expectedClass);
    }

    // Implementation methods
    // -----------------------------------------------------------------------

    /**
     * Creates a processor that sets the given header and then the given body on the IN message.
     */
    protected Processor createBodyAndHeaderProcessor(final Object body, final String header, final Object headerValue) {
        return new Processor() {
            public void process(Exchange exchange) {
                Message in = exchange.getIn();
                in.setHeader(header, headerValue);
                in.setBody(body);
            }
        };
    }

    /**
     * Creates a processor that sets the given exchange property and then the given body on the IN message.
     */
    protected Processor createBodyAndPropertyProcessor(final Object body, final String property, final Object propertyValue) {
        return new Processor() {
            public void process(Exchange exchange) {
                exchange.setProperty(property, propertyValue);
                Message in = exchange.getIn();
                in.setBody(body);
            }
        };
    }

    /**
     * Creates a processor that sets the given body on the IN message.
     */
    protected Processor createSetBodyProcessor(final Object body) {
        return new Processor() {
            public void process(Exchange exchange) {
                Message in = exchange.getIn();
                in.setBody(body);
            }
        };
    }

    /**
     * Creates a processor that copies every entry of the given map as a header
     * and then sets the given body on the IN message.
     * <p/>
     * The map is only read when the processor is invoked, not when it is created.
     */
    private Processor createBodyAndHeadersProcessor(final Object body, final Map<String, Object> headers) {
        return new Processor() {
            public void process(Exchange exchange) {
                Message in = exchange.getIn();
                for (Map.Entry<String, Object> header : headers.entrySet()) {
                    in.setHeader(header.getKey(), header.getValue());
                }
                in.setBody(body);
            }
        };
    }

    /**
     * Looks up the endpoint for the given uri in the CamelContext.
     *
     * @throws NoSuchEndpointException if the CamelContext returned <tt>null</tt> for the uri
     */
    protected Endpoint resolveMandatoryEndpoint(String endpointUri) {
        Endpoint endpoint = camelContext.getEndpoint(endpointUri);
        if (endpoint == null) {
            throw new NoSuchEndpointException(endpointUri);
        }
        return endpoint;
    }

    /**
     * Returns {@link #getDefaultEndpoint()} after validating it with
     * {@link ObjectHelper#notNull(Object, String)}.
     */
    protected Endpoint getMandatoryDefaultEndpoint() {
        Endpoint answer = getDefaultEndpoint();
        ObjectHelper.notNull(answer, "defaultEndpoint");
        return answer;
    }

    protected Object extractResultBody(Exchange result) {
        return extractResultBody(result, null);
    }

    /**
     * Delegates to {@link ExchangeHelper#extractResultBody(Exchange, ExchangePattern)}.
     */
    protected Object extractResultBody(Exchange result, ExchangePattern pattern) {
        return ExchangeHelper.extractResultBody(result, pattern);
    }

    /**
     * Extracts the result body and returns it only when the pattern is OUT capable.
     * <p/>
     * The body is always extracted first, so anything thrown by
     * {@link #extractResultBody(Exchange, ExchangePattern)} still reaches the caller
     * for patterns that are not OUT capable.
     *
     * @return the result body, or <tt>null</tt> if the pattern is not OUT capable
     */
    private Object extractResultBodyIfOutCapable(Exchange exchange, ExchangePattern pattern) {
        Object result = extractResultBody(exchange, pattern);
        // return null if not OUT capable
        return pattern.isOutCapable() ? result : null;
    }

    public void setExecutorService(ExecutorService executorService) {
        this.executor = executorService;
    }

    // Asynchronous methods resolving the endpoint from an uri
    // -----------------------------------------------------------------------

    public Future<Exchange> asyncSend(final String uri, final Exchange exchange) {
        return asyncSend(resolveMandatoryEndpoint(uri), exchange);
    }

    public Future<Exchange> asyncSend(final String uri, final Processor processor) {
        return asyncSend(resolveMandatoryEndpoint(uri), processor);
    }

    public Future<Object> asyncSendBody(final String uri, final Object body) {
        return asyncSendBody(resolveMandatoryEndpoint(uri), body);
    }

    public Future<Object> asyncRequestBody(final String uri, final Object body) {
        return asyncRequestBody(resolveMandatoryEndpoint(uri), body);
    }

    public <T> Future<T> asyncRequestBody(final String uri, final Object body, final Class<T> type) {
        return asyncRequestBody(resolveMandatoryEndpoint(uri), body, type);
    }

    public Future<Object> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue) {
        return asyncRequestBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue);
    }

    public <T> Future<T> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue, final Class<T> type) {
        return asyncRequestBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue, type);
    }

    public Future<Object> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers) {
        return asyncRequestBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers);
    }

    public <T> Future<T> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers, final Class<T> type) {
        return asyncRequestBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers, type);
    }

    public <T> T extractFutureBody(Future<Object> future, Class<T> type) {
        return ExchangeHelper.extractFutureBody(camelContext, future, type);
    }

    public <T> T extractFutureBody(Future<Object> future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException {
        return ExchangeHelper.extractFutureBody(camelContext, future, timeout, unit, type);
    }

    public Future<Object> asyncCallbackSendBody(String uri, Object body, Synchronization onCompletion) {
        return asyncCallbackSendBody(resolveMandatoryEndpoint(uri), body, onCompletion);
    }

    public Future<Object> asyncCallbackSendBody(Endpoint endpoint, Object body, Synchronization onCompletion) {
        return asyncCallback(endpoint, ExchangePattern.InOnly, body, onCompletion);
    }

    public Future<Object> asyncCallbackRequestBody(String uri, Object body, Synchronization onCompletion) {
        return asyncCallbackRequestBody(resolveMandatoryEndpoint(uri), body, onCompletion);
    }

    public Future<Object> asyncCallbackRequestBody(Endpoint endpoint, Object body, Synchronization onCompletion) {
        return asyncCallback(endpoint, ExchangePattern.InOut, body, onCompletion);
    }

    public Future<Exchange> asyncCallback(String uri, Exchange exchange, Synchronization onCompletion) {
        return asyncCallback(resolveMandatoryEndpoint(uri), exchange, onCompletion);
    }

    public Future<Exchange> asyncCallback(String uri, Processor processor, Synchronization onCompletion) {
        return asyncCallback(resolveMandatoryEndpoint(uri), processor, onCompletion);
    }

    // Asynchronous methods submitting the synchronous operation as a task
    // to the executor service
    // -----------------------------------------------------------------------

    public Future<Object> asyncRequestBody(final Endpoint endpoint, final Object body) {
        Callable<Object> task = new Callable<Object>() {
            public Object call() throws Exception {
                return requestBody(endpoint, body);
            }
        };
        return getExecutorService().submit(task);
    }

    public <T> Future<T> asyncRequestBody(final Endpoint endpoint, final Object body, final Class<T> type) {
        Callable<T> task = new Callable<T>() {
            public T call() throws Exception {
                return requestBody(endpoint, body, type);
            }
        };
        return getExecutorService().submit(task);
    }

    public Future<Object> asyncRequestBodyAndHeader(final Endpoint endpoint, final Object body, final String header,
                                                    final Object headerValue) {
        Callable<Object> task = new Callable<Object>() {
            public Object call() throws Exception {
                return requestBodyAndHeader(endpoint, body, header, headerValue);
            }
        };
        return getExecutorService().submit(task);
    }

    public <T> Future<T> asyncRequestBodyAndHeader(final Endpoint endpoint, final Object body, final String header,
                                                   final Object headerValue, final Class<T> type) {
        Callable<T> task = new Callable<T>() {
            public T call() throws Exception {
                return requestBodyAndHeader(endpoint, body, header, headerValue, type);
            }
        };
        return getExecutorService().submit(task);
    }

    public Future<Object> asyncRequestBodyAndHeaders(final Endpoint endpoint, final Object body,
                                                     final Map<String, Object> headers) {
        Callable<Object> task = new Callable<Object>() {
            public Object call() throws Exception {
                return requestBodyAndHeaders(endpoint, body, headers);
            }
        };
        return getExecutorService().submit(task);
    }

    public <T> Future<T> asyncRequestBodyAndHeaders(final Endpoint endpoint, final Object body,
                                                    final Map<String, Object> headers, final Class<T> type) {
        Callable<T> task = new Callable<T>() {
            public T call() throws Exception {
                return requestBodyAndHeaders(endpoint, body, headers, type);
            }
        };
        return getExecutorService().submit(task);
    }

    public Future<Exchange> asyncSend(final Endpoint endpoint, final Exchange exchange) {
        Callable<Exchange> task = new Callable<Exchange>() {
            public Exchange call() throws Exception {
                return send(endpoint, exchange);
            }
        };
        return getExecutorService().submit(task);
    }

    public Future<Exchange> asyncSend(final Endpoint endpoint, final Processor processor) {
        Callable<Exchange> task = new Callable<Exchange>() {
            public Exchange call() throws Exception {
                return send(endpoint, processor);
            }
        };
        return getExecutorService().submit(task);
    }

    public Future<Object> asyncSendBody(final Endpoint endpoint, final Object body) {
        Callable<Object> task = new Callable<Object>() {
            public Object call() throws Exception {
                sendBody(endpoint, body);
                // its InOnly, so no body to return
                return null;
            }
        };
        return getExecutorService().submit(task);
    }

    /**
     * Submits a task that sends the body with the given pattern, invokes the
     * callback, and then extracts the result body.
     *
     * @return a future holding the result body, or <tt>null</tt> if the pattern is not OUT capable
     */
    private Future<Object> asyncCallback(final Endpoint endpoint, final ExchangePattern pattern, final Object body, final Synchronization onCompletion) {
        Callable<Object> task = new Callable<Object>() {
            public Object call() throws Exception {
                Exchange answer = send(endpoint, pattern, createSetBodyProcessor(body));

                // invoke callback before returning answer
                // as it allows callback to be used without UnitOfWorkProcessor invoking it
                // and thus it works directly from a producer template as well, as opposed
                // to the UnitOfWorkProcessor that is injected in routes
                if (answer.isFailed()) {
                    onCompletion.onFailure(answer);
                } else {
                    onCompletion.onComplete(answer);
                }

                return extractResultBodyIfOutCapable(answer, pattern);
            }
        };
        return getExecutorService().submit(task);
    }

    public Future<Exchange> asyncCallback(final Endpoint endpoint, final Exchange exchange, final Synchronization onCompletion) {
        Callable<Exchange> task = new Callable<Exchange>() {
            public Exchange call() throws Exception {
                // process the exchange, any exception occurring will be caught and set on the exchange
                send(endpoint, exchange);

                // invoke callback before returning answer
                // as it allows callback to be used without UnitOfWorkProcessor invoking it
                // and thus it works directly from a producer template as well, as opposed
                // to the UnitOfWorkProcessor that is injected in routes
                if (exchange.isFailed()) {
                    onCompletion.onFailure(exchange);
                } else {
                    onCompletion.onComplete(exchange);
                }
                return exchange;
            }
        };
        return getExecutorService().submit(task);
    }

    public Future<Exchange> asyncCallback(final Endpoint endpoint, final Processor processor, final Synchronization onCompletion) {
        Callable<Exchange> task = new Callable<Exchange>() {
            public Exchange call() throws Exception {
                // process the exchange, any exception occurring will be caught and set on the exchange
                Exchange answer = send(endpoint, processor);

                // invoke callback before returning answer
                // as it allows callback to be used without UnitOfWorkProcessor invoking it
                // and thus it works directly from a producer template as well, as opposed
                // to the UnitOfWorkProcessor that is injected in routes
                if (answer.isFailed()) {
                    onCompletion.onFailure(answer);
                } else {
                    onCompletion.onComplete(answer);
                }
                return answer;
            }
        };
        return getExecutorService().submit(task);
    }

    // Lifecycle
    // -----------------------------------------------------------------------

    /**
     * Returns the producer cache.
     *
     * @throws IllegalStateException if this template has not been started
     */
    private ProducerCache getProducerCache() {
        if (!isStarted()) {
            throw new IllegalStateException("ProducerTemplate has not been started");
        }
        return producerCache;
    }

    /**
     * Returns the executor used by the asynchronous operations. If none has been
     * set, a default thread pool is created from the executor service manager
     * of the CamelContext.
     *
     * @throws IllegalStateException if this template has not been started
     */
    private ExecutorService getExecutorService() {
        if (!isStarted()) {
            throw new IllegalStateException("ProducerTemplate has not been started");
        }

        if (executor != null) {
            return executor;
        }

        // create a default executor which must be synchronized
        synchronized (this) {
            // check again, as another thread may have created it while we waited for the lock
            if (executor != null) {
                return executor;
            }
            executor = camelContext.getExecutorServiceManager().newDefaultThreadPool(this, "ProducerTemplate");
        }

        ObjectHelper.notNull(executor, "ExecutorService");
        return executor;
    }

    /**
     * Creates the producer cache if there is none, using the maximum cache size
     * when it is greater than zero, and starts it.
     */
    protected void doStart() throws Exception {
        if (producerCache == null) {
            if (maximumCacheSize > 0) {
                producerCache = new ProducerCache(this, camelContext, maximumCacheSize);
            } else {
                producerCache = new ProducerCache(this, camelContext);
            }
        }
        ServiceHelper.startService(producerCache);
    }

    /**
     * Stops and discards the producer cache, then shuts down and discards the executor.
     * <p/>
     * Whatever executor is currently set is shut down, whether it was created here
     * or passed in through the constructor or {@link #setExecutorService(ExecutorService)}.
     */
    protected void doStop() throws Exception {
        ServiceHelper.stopService(producerCache);
        producerCache = null;

        if (executor != null) {
            camelContext.getExecutorServiceManager().shutdownNow(executor);
            executor = null;
        }
    }

}