001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.Map;
020    import java.util.concurrent.Callable;
021    import java.util.concurrent.ExecutorService;
022    import java.util.concurrent.Future;
023    import java.util.concurrent.TimeUnit;
024    import java.util.concurrent.TimeoutException;
025    
026    import org.apache.camel.CamelContext;
027    import org.apache.camel.CamelExecutionException;
028    import org.apache.camel.Endpoint;
029    import org.apache.camel.Exchange;
030    import org.apache.camel.ExchangePattern;
031    import org.apache.camel.Message;
032    import org.apache.camel.NoSuchEndpointException;
033    import org.apache.camel.Processor;
034    import org.apache.camel.ProducerTemplate;
035    import org.apache.camel.spi.Synchronization;
036    import org.apache.camel.support.ServiceSupport;
037    import org.apache.camel.util.CamelContextHelper;
038    import org.apache.camel.util.ExchangeHelper;
039    import org.apache.camel.util.ObjectHelper;
040    import org.apache.camel.util.ServiceHelper;
041    
042    /**
043     * Template (named like Spring's TransactionTemplate & JmsTemplate
044     * et al) for working with Camel and sending {@link Message} instances in an
045     * {@link Exchange} to an {@link Endpoint}.
046     *
047     * @version 
048     */
049    public class DefaultProducerTemplate extends ServiceSupport implements ProducerTemplate {
050        private final CamelContext camelContext;
051        private volatile ProducerCache producerCache;
052        private volatile ExecutorService executor;
053        private Endpoint defaultEndpoint;
054        private int maximumCacheSize;
055    
056        public DefaultProducerTemplate(CamelContext camelContext) {
057            this.camelContext = camelContext;
058        }
059    
060        public DefaultProducerTemplate(CamelContext camelContext, ExecutorService executor) {
061            this.camelContext = camelContext;
062            this.executor = executor;
063        }
064    
065        public DefaultProducerTemplate(CamelContext camelContext, Endpoint defaultEndpoint) {
066            this(camelContext);
067            this.defaultEndpoint = defaultEndpoint;
068        }
069    
070        public static DefaultProducerTemplate newInstance(CamelContext camelContext, String defaultEndpointUri) {
071            Endpoint endpoint = CamelContextHelper.getMandatoryEndpoint(camelContext, defaultEndpointUri);
072            return new DefaultProducerTemplate(camelContext, endpoint);
073        }
074    
075        public int getMaximumCacheSize() {
076            return maximumCacheSize;
077        }
078    
079        public void setMaximumCacheSize(int maximumCacheSize) {
080            this.maximumCacheSize = maximumCacheSize;
081        }
082    
083        public int getCurrentCacheSize() {
084            if (producerCache == null) {
085                return 0;
086            }
087            return producerCache.size();
088        }
089    
090        public Exchange send(String endpointUri, Exchange exchange) {
091            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
092            return send(endpoint, exchange);
093        }
094    
095        public Exchange send(String endpointUri, Processor processor) {
096            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
097            return send(endpoint, processor);
098        }
099    
100        public Exchange send(String endpointUri, ExchangePattern pattern, Processor processor) {
101            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
102            return send(endpoint, pattern, processor);
103        }
104    
105        public Exchange send(Endpoint endpoint, Exchange exchange) {
106            getProducerCache().send(endpoint, exchange);
107            return exchange;
108        }
109    
110        public Exchange send(Endpoint endpoint, Processor processor) {
111            return getProducerCache().send(endpoint, processor);
112        }
113    
114        public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) {
115            return getProducerCache().send(endpoint, pattern, processor);
116        }
117    
118        public Object sendBody(Endpoint endpoint, ExchangePattern pattern, Object body) {
119            Exchange result = send(endpoint, pattern, createSetBodyProcessor(body));
120            return extractResultBody(result, pattern);
121        }
122    
123        public void sendBody(Endpoint endpoint, Object body) throws CamelExecutionException {
124            Exchange result = send(endpoint, createSetBodyProcessor(body));
125            // must invoke extract result body in case of exception to be rethrown
126            extractResultBody(result);
127        }
128    
129        public void sendBody(String endpointUri, Object body) throws CamelExecutionException {
130            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
131            sendBody(endpoint, body);
132        }
133    
134        public Object sendBody(String endpointUri, ExchangePattern pattern, Object body) throws CamelExecutionException {
135            Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
136            Object result = sendBody(endpoint, pattern, body);
137            if (pattern.isOutCapable()) {
138                return result;
139            } else {
140                // return null if not OUT capable
141                return null;
142            }
143        }
144    
145        public void sendBodyAndHeader(String endpointUri, final Object body, final String header, final Object headerValue) throws CamelExecutionException {
146            sendBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue);
147        }
148    
149        public void sendBodyAndHeader(Endpoint endpoint, final Object body, final String header, final Object headerValue) throws CamelExecutionException {
150            Exchange result = send(endpoint, createBodyAndHeaderProcessor(body, header, headerValue));
151            // must invoke extract result body in case of exception to be rethrown
152            extractResultBody(result);
153        }
154    
155        public Object sendBodyAndHeader(Endpoint endpoint, ExchangePattern pattern, final Object body,
156                                        final String header, final Object headerValue) throws CamelExecutionException {
157            Exchange exchange = send(endpoint, pattern, createBodyAndHeaderProcessor(body, header, headerValue));
158            Object result = extractResultBody(exchange, pattern);
159            if (pattern.isOutCapable()) {
160                return result;
161            } else {
162                // return null if not OUT capable
163                return null;
164            }
165        }
166    
167        public Object sendBodyAndHeader(String endpoint, ExchangePattern pattern, final Object body,
168                                        final String header, final Object headerValue) throws CamelExecutionException {
169            Exchange exchange = send(endpoint, pattern, createBodyAndHeaderProcessor(body, header, headerValue));
170            Object result = extractResultBody(exchange, pattern);
171            if (pattern.isOutCapable()) {
172                return result;
173            } else {
174                // return null if not OUT capable
175                return null;
176            }
177        }
178    
179        public void sendBodyAndProperty(String endpointUri, final Object body,
180                                        final String property, final Object propertyValue) throws CamelExecutionException {
181            sendBodyAndProperty(resolveMandatoryEndpoint(endpointUri), body, property, propertyValue);
182        }
183    
184        public void sendBodyAndProperty(Endpoint endpoint, final Object body,
185                                        final String property, final Object propertyValue) throws CamelExecutionException {
186            Exchange result = send(endpoint, createBodyAndPropertyProcessor(body, property, propertyValue));
187            // must invoke extract result body in case of exception to be rethrown
188            extractResultBody(result);
189        }
190    
191        public Object sendBodyAndProperty(Endpoint endpoint, ExchangePattern pattern, final Object body,
192                                          final String property, final Object propertyValue) throws CamelExecutionException {
193            Exchange exchange = send(endpoint, pattern, createBodyAndPropertyProcessor(body, property, propertyValue));
194            Object result = extractResultBody(exchange, pattern);
195            if (pattern.isOutCapable()) {
196                return result;
197            } else {
198                // return null if not OUT capable
199                return null;
200            }
201        }
202    
203        public Object sendBodyAndProperty(String endpoint, ExchangePattern pattern, final Object body,
204                                          final String property, final Object propertyValue) throws CamelExecutionException {
205            Exchange exchange = send(endpoint, pattern, createBodyAndPropertyProcessor(body, property, propertyValue));
206            Object result = extractResultBody(exchange, pattern);
207            if (pattern.isOutCapable()) {
208                return result;
209            } else {
210                // return null if not OUT capable
211                return null;
212            }
213        }
214    
215        public void sendBodyAndHeaders(String endpointUri, final Object body, final Map<String, Object> headers) throws CamelExecutionException {
216            sendBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers);
217        }
218    
219        public void sendBodyAndHeaders(Endpoint endpoint, final Object body, final Map<String, Object> headers) throws CamelExecutionException {
220            Exchange result = send(endpoint, new Processor() {
221                public void process(Exchange exchange) {
222                    Message in = exchange.getIn();
223                    for (Map.Entry<String, Object> header : headers.entrySet()) {
224                        in.setHeader(header.getKey(), header.getValue());
225                    }
226                    in.setBody(body);
227                }
228            });
229            // must invoke extract result body in case of exception to be rethrown
230            extractResultBody(result);
231        }
232    
233        public Object sendBodyAndHeaders(String endpointUri, ExchangePattern pattern, Object body, Map<String, Object> headers) throws CamelExecutionException {
234            return sendBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), pattern, body, headers);
235        }
236    
237        public Object sendBodyAndHeaders(Endpoint endpoint, ExchangePattern pattern, final Object body, final Map<String, Object> headers) throws CamelExecutionException {
238            Exchange exchange = send(endpoint, pattern, new Processor() {
239                public void process(Exchange exchange) throws Exception {
240                    Message in = exchange.getIn();
241                    for (Map.Entry<String, Object> header : headers.entrySet()) {
242                        in.setHeader(header.getKey(), header.getValue());
243                    }
244                    in.setBody(body);
245                }
246            });
247            Object result = extractResultBody(exchange, pattern);
248            if (pattern.isOutCapable()) {
249                return result;
250            } else {
251                // return null if not OUT capable
252                return null;
253            }
254        }
255    
256        // Methods using an InOut ExchangePattern
257        // -----------------------------------------------------------------------
258    
259        public Exchange request(Endpoint endpoint, Processor processor) {
260            return send(endpoint, ExchangePattern.InOut, processor);
261        }
262    
263        public Object requestBody(Object body) throws CamelExecutionException {
264            return sendBody(getMandatoryDefaultEndpoint(), ExchangePattern.InOut, body);
265        }
266    
267        public Object requestBody(Endpoint endpoint, Object body) throws CamelExecutionException {
268            return sendBody(endpoint, ExchangePattern.InOut, body);
269        }
270    
271        public Object requestBodyAndHeader(Object body, String header, Object headerValue) throws CamelExecutionException {
272            return sendBodyAndHeader(getMandatoryDefaultEndpoint(), ExchangePattern.InOut, body, header, headerValue);
273        }
274    
275        public Object requestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue) throws CamelExecutionException {
276            return sendBodyAndHeader(endpoint, ExchangePattern.InOut, body, header, headerValue);
277        }
278    
279        public Exchange request(String endpoint, Processor processor) throws CamelExecutionException {
280            return send(endpoint, ExchangePattern.InOut, processor);
281        }
282    
283        public Object requestBody(String endpoint, Object body) throws CamelExecutionException {
284            return sendBody(endpoint, ExchangePattern.InOut, body);
285        }
286    
287        public Object requestBodyAndHeader(String endpoint, Object body, String header, Object headerValue) throws CamelExecutionException {
288            return sendBodyAndHeader(endpoint, ExchangePattern.InOut, body, header, headerValue);
289        }
290    
291        public Object requestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers) {
292            return requestBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers);
293        }
294    
295        public Object requestBodyAndHeaders(Endpoint endpoint, final Object body, final Map<String, Object> headers) {
296            return sendBodyAndHeaders(endpoint, ExchangePattern.InOut, body, headers);
297        }
298    
299        public Object requestBodyAndHeaders(final Object body, final Map<String, Object> headers) {
300            return sendBodyAndHeaders(getDefaultEndpoint(), ExchangePattern.InOut, body, headers);
301        }
302    
303        public <T> T requestBody(Object body, Class<T> type) {
304            Object answer = requestBody(body);
305            return camelContext.getTypeConverter().convertTo(type, answer);
306        }
307    
308        public <T> T requestBody(Endpoint endpoint, Object body, Class<T> type) {
309            Object answer = requestBody(endpoint, body);
310            return camelContext.getTypeConverter().convertTo(type, answer);
311        }
312    
313        public <T> T requestBody(String endpointUri, Object body, Class<T> type) {
314            Object answer = requestBody(endpointUri, body);
315            return camelContext.getTypeConverter().convertTo(type, answer);
316        }
317    
318        public <T> T requestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue, Class<T> type) {
319            Object answer = requestBodyAndHeader(endpoint, body, header, headerValue);
320            return camelContext.getTypeConverter().convertTo(type, answer);
321        }
322    
323        public <T> T requestBodyAndHeader(String endpointUri, Object body, String header, Object headerValue, Class<T> type) {
324            Object answer = requestBodyAndHeader(endpointUri, body, header, headerValue);
325            return camelContext.getTypeConverter().convertTo(type, answer);
326        }
327    
328        public <T> T requestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers, Class<T> type) {
329            Object answer = requestBodyAndHeaders(endpointUri, body, headers);
330            return camelContext.getTypeConverter().convertTo(type, answer);
331        }
332    
333        public <T> T requestBodyAndHeaders(Endpoint endpoint, Object body, Map<String, Object> headers, Class<T> type) {
334            Object answer = requestBodyAndHeaders(endpoint, body, headers);
335            return camelContext.getTypeConverter().convertTo(type, answer);
336        }
337    
338        // Methods using the default endpoint
339        // -----------------------------------------------------------------------
340    
341        public void sendBody(Object body) {
342            sendBody(getMandatoryDefaultEndpoint(), body);
343        }
344    
345        public Exchange send(Exchange exchange) {
346            return send(getMandatoryDefaultEndpoint(), exchange);
347        }
348    
349        public Exchange send(Processor processor) {
350            return send(getMandatoryDefaultEndpoint(), processor);
351        }
352    
353        public void sendBodyAndHeader(Object body, String header, Object headerValue) {
354            sendBodyAndHeader(getMandatoryDefaultEndpoint(), body, header, headerValue);
355        }
356    
357        public void sendBodyAndProperty(Object body, String property, Object propertyValue) {
358            sendBodyAndProperty(getMandatoryDefaultEndpoint(), body, property, propertyValue);
359        }
360    
361        public void sendBodyAndHeaders(Object body, Map<String, Object> headers) {
362            sendBodyAndHeaders(getMandatoryDefaultEndpoint(), body, headers);
363        }
364    
365        // Properties
366        // -----------------------------------------------------------------------
367    
368        /**
369         * @deprecated use {@link #getCamelContext()}
370         */
371        @Deprecated
372        public CamelContext getContext() {
373            return getCamelContext();
374        }
375    
376        public CamelContext getCamelContext() {
377            return camelContext;
378        }
379    
380        public Endpoint getDefaultEndpoint() {
381            return defaultEndpoint;
382        }
383    
384        public void setDefaultEndpoint(Endpoint defaultEndpoint) {
385            this.defaultEndpoint = defaultEndpoint;
386        }
387    
388        /**
389         * Sets the default endpoint to use if none is specified
390         */
391        public void setDefaultEndpointUri(String endpointUri) {
392            setDefaultEndpoint(getCamelContext().getEndpoint(endpointUri));
393        }
394    
395        /**
396         * @deprecated use {@link CamelContext#getEndpoint(String, Class)}
397         */
398        @Deprecated
399        public <T extends Endpoint> T getResolvedEndpoint(String endpointUri, Class<T> expectedClass) {
400            return camelContext.getEndpoint(endpointUri, expectedClass);
401        }
402    
403        // Implementation methods
404        // -----------------------------------------------------------------------
405    
406        protected Processor createBodyAndHeaderProcessor(final Object body, final String header, final Object headerValue) {
407            return new Processor() {
408                public void process(Exchange exchange) {
409                    Message in = exchange.getIn();
410                    in.setHeader(header, headerValue);
411                    in.setBody(body);
412                }
413            };
414        }
415    
416        protected Processor createBodyAndPropertyProcessor(final Object body, final String property, final Object propertyValue) {
417            return new Processor() {
418                public void process(Exchange exchange) {
419                    exchange.setProperty(property, propertyValue);
420                    Message in = exchange.getIn();
421                    in.setBody(body);
422                }
423            };
424        }
425    
426        protected Processor createSetBodyProcessor(final Object body) {
427            return new Processor() {
428                public void process(Exchange exchange) {
429                    Message in = exchange.getIn();
430                    in.setBody(body);
431                }
432            };
433        }
434    
435        protected Endpoint resolveMandatoryEndpoint(String endpointUri) {
436            Endpoint endpoint = camelContext.getEndpoint(endpointUri);
437            if (endpoint == null) {
438                throw new NoSuchEndpointException(endpointUri);
439            }
440            return endpoint;
441        }
442    
443        protected Endpoint getMandatoryDefaultEndpoint() {
444            Endpoint answer = getDefaultEndpoint();
445            ObjectHelper.notNull(answer, "defaultEndpoint");
446            return answer;
447        }
448    
449        protected Object extractResultBody(Exchange result) {
450            return extractResultBody(result, null);
451        }
452    
453        protected Object extractResultBody(Exchange result, ExchangePattern pattern) {
454            return ExchangeHelper.extractResultBody(result, pattern);
455        }
456    
457        public void setExecutorService(ExecutorService executorService) {
458            this.executor = executorService;
459        }
460    
461        public Future<Exchange> asyncSend(final String uri, final Exchange exchange) {
462            return asyncSend(resolveMandatoryEndpoint(uri), exchange);
463        }
464    
465        public Future<Exchange> asyncSend(final String uri, final Processor processor) {
466            return asyncSend(resolveMandatoryEndpoint(uri), processor);
467        }
468    
469        public Future<Object> asyncSendBody(final String uri, final Object body) {
470            return asyncSendBody(resolveMandatoryEndpoint(uri), body);
471        }
472    
473        public Future<Object> asyncRequestBody(final String uri, final Object body) {
474            return asyncRequestBody(resolveMandatoryEndpoint(uri), body);
475        }
476    
477        public <T> Future<T> asyncRequestBody(final String uri, final Object body, final Class<T> type) {
478            return asyncRequestBody(resolveMandatoryEndpoint(uri), body, type);
479        }
480    
481        public Future<Object> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue) {
482            return asyncRequestBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue);
483        }
484    
485        public <T> Future<T> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue, final Class<T> type) {
486            return asyncRequestBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue, type);
487        }
488    
489        public Future<Object> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers) {
490            return asyncRequestBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers);
491        }
492    
493        public <T> Future<T> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers, final Class<T> type) {
494            return asyncRequestBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers, type);
495        }
496    
497        public <T> T extractFutureBody(Future<Object> future, Class<T> type) {
498            return ExchangeHelper.extractFutureBody(camelContext, future, type);
499        }
500    
501        public <T> T extractFutureBody(Future<Object> future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException {
502            return ExchangeHelper.extractFutureBody(camelContext, future, timeout, unit, type);
503        }
504    
505        public Future<Object> asyncCallbackSendBody(String uri, Object body, Synchronization onCompletion) {
506            return asyncCallbackSendBody(resolveMandatoryEndpoint(uri), body, onCompletion);
507        }
508    
509        public Future<Object> asyncCallbackSendBody(Endpoint endpoint, Object body, Synchronization onCompletion) {
510            return asyncCallback(endpoint, ExchangePattern.InOnly, body, onCompletion);
511        }
512    
513        public Future<Object> asyncCallbackRequestBody(String uri, Object body, Synchronization onCompletion) {
514            return asyncCallbackRequestBody(resolveMandatoryEndpoint(uri), body, onCompletion);
515        }
516    
517        public Future<Object> asyncCallbackRequestBody(Endpoint endpoint, Object body, Synchronization onCompletion) {
518            return asyncCallback(endpoint, ExchangePattern.InOut, body, onCompletion);
519        }
520    
521        public Future<Exchange> asyncCallback(String uri, Exchange exchange, Synchronization onCompletion) {
522            return asyncCallback(resolveMandatoryEndpoint(uri), exchange, onCompletion);
523        }
524    
525        public Future<Exchange> asyncCallback(String uri, Processor processor, Synchronization onCompletion) {
526            return asyncCallback(resolveMandatoryEndpoint(uri), processor, onCompletion);
527        }
528    
529        public Future<Object> asyncRequestBody(final Endpoint endpoint, final Object body) {
530            Callable<Object> task = new Callable<Object>() {
531                public Object call() throws Exception {
532                    return requestBody(endpoint, body);
533                }
534            };
535            return getExecutorService().submit(task);
536        }
537    
538        public <T> Future<T> asyncRequestBody(final Endpoint endpoint, final Object body, final Class<T> type) {
539            Callable<T> task = new Callable<T>() {
540                public T call() throws Exception {
541                    return requestBody(endpoint, body, type);
542                }
543            };
544            return getExecutorService().submit(task);
545        }
546    
547        public Future<Object> asyncRequestBodyAndHeader(final Endpoint endpoint, final Object body, final String header,
548                                                        final Object headerValue) {
549            Callable<Object> task = new Callable<Object>() {
550                public Object call() throws Exception {
551                    return requestBodyAndHeader(endpoint, body, header, headerValue);
552                }
553            };
554            return getExecutorService().submit(task);
555        }
556    
557        public <T> Future<T> asyncRequestBodyAndHeader(final Endpoint endpoint, final Object body, final String header,
558                                                       final Object headerValue, final Class<T> type) {
559            Callable<T> task = new Callable<T>() {
560                public T call() throws Exception {
561                    return requestBodyAndHeader(endpoint, body, header, headerValue, type);
562                }
563            };
564            return getExecutorService().submit(task);
565        }
566    
567        public Future<Object> asyncRequestBodyAndHeaders(final Endpoint endpoint, final Object body,
568                                                         final Map<String, Object> headers) {
569            Callable<Object> task = new Callable<Object>() {
570                public Object call() throws Exception {
571                    return requestBodyAndHeaders(endpoint, body, headers);
572                }
573            };
574            return getExecutorService().submit(task);
575        }
576    
577        public <T> Future<T> asyncRequestBodyAndHeaders(final Endpoint endpoint, final Object body,
578                                                        final Map<String, Object> headers, final Class<T> type) {
579            Callable<T> task = new Callable<T>() {
580                public T call() throws Exception {
581                    return requestBodyAndHeaders(endpoint, body, headers, type);
582                }
583            };
584            return getExecutorService().submit(task);
585        }
586    
587        public Future<Exchange> asyncSend(final Endpoint endpoint, final Exchange exchange) {
588            Callable<Exchange> task = new Callable<Exchange>() {
589                public Exchange call() throws Exception {
590                    return send(endpoint, exchange);
591                }
592            };
593            return getExecutorService().submit(task);
594        }
595    
596        public Future<Exchange> asyncSend(final Endpoint endpoint, final Processor processor) {
597            Callable<Exchange> task = new Callable<Exchange>() {
598                public Exchange call() throws Exception {
599                    return send(endpoint, processor);
600                }
601            };
602            return getExecutorService().submit(task);
603        }
604    
605        public Future<Object> asyncSendBody(final Endpoint endpoint, final Object body) {
606            Callable<Object> task = new Callable<Object>() {
607                public Object call() throws Exception {
608                    sendBody(endpoint, body);
609                    // its InOnly, so no body to return
610                    return null;
611                }
612            };
613            return getExecutorService().submit(task);
614        }
615    
616        private Future<Object> asyncCallback(final Endpoint endpoint, final ExchangePattern pattern, final Object body, final Synchronization onCompletion) {
617            Callable<Object> task = new Callable<Object>() {
618                public Object call() throws Exception {
619                    Exchange answer = send(endpoint, pattern, createSetBodyProcessor(body));
620    
621                    // invoke callback before returning answer
622                    // as it allows callback to be used without UnitOfWorkProcessor invoking it
623                    // and thus it works directly from a producer template as well, as opposed
624                    // to the UnitOfWorkProcessor that is injected in routes
625                    if (answer.isFailed()) {
626                        onCompletion.onFailure(answer);
627                    } else {
628                        onCompletion.onComplete(answer);
629                    }
630    
631                    Object result = extractResultBody(answer, pattern);
632                    if (pattern.isOutCapable()) {
633                        return result;
634                    } else {
635                        // return null if not OUT capable
636                        return null;
637                    }
638                }
639            };
640            return getExecutorService().submit(task);
641        }
642    
643        public Future<Exchange> asyncCallback(final Endpoint endpoint, final Exchange exchange, final Synchronization onCompletion) {
644            Callable<Exchange> task = new Callable<Exchange>() {
645                public Exchange call() throws Exception {
646                    // process the exchange, any exception occurring will be caught and set on the exchange
647                    send(endpoint, exchange);
648    
649                    // invoke callback before returning answer
650                    // as it allows callback to be used without UnitOfWorkProcessor invoking it
651                    // and thus it works directly from a producer template as well, as opposed
652                    // to the UnitOfWorkProcessor that is injected in routes
653                    if (exchange.isFailed()) {
654                        onCompletion.onFailure(exchange);
655                    } else {
656                        onCompletion.onComplete(exchange);
657                    }
658                    return exchange;
659                }
660            };
661            return getExecutorService().submit(task);
662        }
663    
664        public Future<Exchange> asyncCallback(final Endpoint endpoint, final Processor processor, final Synchronization onCompletion) {
665            Callable<Exchange> task = new Callable<Exchange>() {
666                public Exchange call() throws Exception {
667                    // process the exchange, any exception occurring will be caught and set on the exchange
668                    Exchange answer = send(endpoint, processor);
669    
670                    // invoke callback before returning answer
671                    // as it allows callback to be used without UnitOfWorkProcessor invoking it
672                    // and thus it works directly from a producer template as well, as opposed
673                    // to the UnitOfWorkProcessor that is injected in routes
674                    if (answer.isFailed()) {
675                        onCompletion.onFailure(answer);
676                    } else {
677                        onCompletion.onComplete(answer);
678                    }
679                    return answer;
680                }
681            };
682            return getExecutorService().submit(task);
683        }
684    
685        private ProducerCache getProducerCache() {
686            if (!isStarted()) {
687                throw new IllegalStateException("ProducerTemplate has not been started");
688            }
689            return producerCache;
690        }
691    
692        private ExecutorService getExecutorService() {
693            if (!isStarted()) {
694                throw new IllegalStateException("ProducerTemplate has not been started");
695            }
696    
697            if (executor != null) {
698                return executor;
699            }
700    
701            // create a default executor which must be synchronized
702            synchronized (this) {
703                if (executor != null) {
704                    return executor;
705                }
706                executor = camelContext.getExecutorServiceManager().newDefaultThreadPool(this, "ProducerTemplate");
707            }
708    
709            ObjectHelper.notNull(executor, "ExecutorService");
710            return executor;
711        }
712    
713        protected void doStart() throws Exception {
714            if (producerCache == null) {
715                if (maximumCacheSize > 0) {
716                    producerCache = new ProducerCache(this, camelContext, maximumCacheSize);
717                } else {
718                    producerCache = new ProducerCache(this, camelContext);
719                }
720            }
721            ServiceHelper.startService(producerCache);
722        }
723    
724        protected void doStop() throws Exception {
725            ServiceHelper.stopService(producerCache);
726            producerCache = null;
727    
728            if (executor != null) {
729                camelContext.getExecutorServiceManager().shutdownNow(executor);
730                executor = null;
731            }
732        }
733    
734    }