001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.util;
018    
019    import java.util.HashMap;
020    import java.util.Map;
021    import java.util.concurrent.ConcurrentHashMap;
022    import java.util.concurrent.ExecutionException;
023    import java.util.concurrent.Future;
024    import java.util.concurrent.TimeUnit;
025    import java.util.concurrent.TimeoutException;
026    
027    import org.apache.camel.CamelContext;
028    import org.apache.camel.CamelExchangeException;
029    import org.apache.camel.CamelExecutionException;
030    import org.apache.camel.Endpoint;
031    import org.apache.camel.Exchange;
032    import org.apache.camel.ExchangePattern;
033    import org.apache.camel.InvalidPayloadException;
034    import org.apache.camel.Message;
035    import org.apache.camel.NoSuchBeanException;
036    import org.apache.camel.NoSuchEndpointException;
037    import org.apache.camel.NoSuchHeaderException;
038    import org.apache.camel.NoSuchPropertyException;
039    import org.apache.camel.NoTypeConversionAvailableException;
040    import org.apache.camel.TypeConversionException;
041    import org.apache.camel.TypeConverter;
042    import org.apache.camel.impl.DefaultExchange;
043    import org.apache.camel.spi.UnitOfWork;
044    
045    /**
046     * Some helper methods for working with {@link Exchange} objects
047     *
048     * @version 
049     */
050    public final class ExchangeHelper {
051    
052        /**
053         * Utility classes should not have a public constructor.
054         */
055        private ExchangeHelper() {
056        }
057    
058        /**
059         * Extracts the Exchange.BINDING of the given type or null if not present
060         *
061         * @param exchange the message exchange
062         * @param type     the expected binding type
063         * @return the binding object of the given type or null if it could not be found or converted
064         */
065        public static <T> T getBinding(Exchange exchange, Class<T> type) {
066            return exchange != null ? exchange.getProperty(Exchange.BINDING, type) : null;
067        }
068    
069        /**
070         * Attempts to resolve the endpoint for the given value
071         *
072         * @param exchange the message exchange being processed
073         * @param value    the value which can be an {@link Endpoint} or an object
074         *                 which provides a String representation of an endpoint via
075         *                 {@link #toString()}
076         * @return the endpoint
077         * @throws NoSuchEndpointException if the endpoint cannot be resolved
078         */
079        public static Endpoint resolveEndpoint(Exchange exchange, Object value) throws NoSuchEndpointException {
080            Endpoint endpoint;
081            if (value instanceof Endpoint) {
082                endpoint = (Endpoint) value;
083            } else {
084                String uri = value.toString().trim();
085                endpoint = CamelContextHelper.getMandatoryEndpoint(exchange.getContext(), uri);
086            }
087            return endpoint;
088        }
089    
090        /**
091         * Gets the mandatory property of the exchange of the correct type
092         *
093         * @param exchange      the exchange
094         * @param propertyName  the property name
095         * @param type          the type
096         * @return the property value
097         * @throws TypeConversionException is thrown if error during type conversion
098         * @throws NoSuchPropertyException is thrown if no property exists
099         */
100        public static <T> T getMandatoryProperty(Exchange exchange, String propertyName, Class<T> type) throws NoSuchPropertyException {
101            T result = exchange.getProperty(propertyName, type);
102            if (result != null) {
103                return result;
104            }
105            throw new NoSuchPropertyException(exchange, propertyName, type);
106        }
107    
108        /**
109         * Gets the mandatory inbound header of the correct type
110         *
111         * @param exchange      the exchange
112         * @param headerName    the header name
113         * @param type          the type
114         * @return the header value
115         * @throws TypeConversionException is thrown if error during type conversion
116         * @throws NoSuchHeaderException is thrown if no headers exists
117         */
118        public static <T> T getMandatoryHeader(Exchange exchange, String headerName, Class<T> type) throws TypeConversionException, NoSuchHeaderException {
119            T answer = exchange.getIn().getHeader(headerName, type);
120            if (answer == null) {
121                throw new NoSuchHeaderException(exchange, headerName, type);
122            }
123            return answer;
124        }
125    
126        /**
127         * Returns the mandatory inbound message body of the correct type or throws
128         * an exception if it is not present
129         *
130         * @param exchange the exchange
131         * @return the body, is never <tt>null</tt>
132         * @throws InvalidPayloadException Is thrown if the body being <tt>null</tt> or wrong class type
133         * @deprecated use {@link org.apache.camel.Message#getMandatoryBody()}
134         */
135        @Deprecated
136        public static Object getMandatoryInBody(Exchange exchange) throws InvalidPayloadException {
137            return exchange.getIn().getMandatoryBody();
138        }
139    
140        /**
141         * Returns the mandatory inbound message body of the correct type or throws
142         * an exception if it is not present
143         * @deprecated use {@link org.apache.camel.Message#getMandatoryBody(Class)}
144         */
145        @Deprecated
146        public static <T> T getMandatoryInBody(Exchange exchange, Class<T> type) throws InvalidPayloadException {
147            return exchange.getIn().getMandatoryBody(type);
148        }
149    
150        /**
151         * Returns the mandatory outbound message body of the correct type or throws
152         * an exception if it is not present
153         * @deprecated use {@link org.apache.camel.Message#getMandatoryBody()}
154         */
155        @Deprecated
156        public static Object getMandatoryOutBody(Exchange exchange) throws InvalidPayloadException {
157            return exchange.getOut().getMandatoryBody();
158        }
159    
160        /**
161         * Returns the mandatory outbound message body of the correct type or throws
162         * an exception if it is not present
163         * @deprecated use {@link org.apache.camel.Message#getMandatoryBody(Class)}
164         */
165        @Deprecated
166        public static <T> T getMandatoryOutBody(Exchange exchange, Class<T> type) throws InvalidPayloadException {
167            return exchange.getOut().getMandatoryBody(type);
168        }
169    
170        /**
171         * Converts the value to the given expected type or throws an exception
172         *
173         * @return the converted value
174         * @throws TypeConversionException is thrown if error during type conversion
175         * @throws NoTypeConversionAvailableException} if no type converters exists to convert to the given type
176         */
177        public static <T> T convertToMandatoryType(Exchange exchange, Class<T> type, Object value)
178            throws TypeConversionException, NoTypeConversionAvailableException {
179            CamelContext camelContext = exchange.getContext();
180            ObjectHelper.notNull(camelContext, "CamelContext of Exchange");
181            TypeConverter converter = camelContext.getTypeConverter();
182            if (converter != null) {
183                return converter.mandatoryConvertTo(type, exchange, value);
184            }
185            throw new NoTypeConversionAvailableException(value, type);
186        }
187    
188        /**
189         * Converts the value to the given expected type
190         *
191         * @return the converted value
192         * @throws org.apache.camel.TypeConversionException is thrown if error during type conversion
193         */
194        public static <T> T convertToType(Exchange exchange, Class<T> type, Object value) throws TypeConversionException {
195            CamelContext camelContext = exchange.getContext();
196            ObjectHelper.notNull(camelContext, "CamelContext of Exchange");
197            TypeConverter converter = camelContext.getTypeConverter();
198            if (converter != null) {
199                return converter.convertTo(type, exchange, value);
200            }
201            return null;
202        }
203    
204        /**
205         * Creates a new instance and copies from the current message exchange so that it can be
206         * forwarded to another destination as a new instance. Unlike regular copy this operation
207         * will not share the same {@link org.apache.camel.spi.UnitOfWork} so its should be used
208         * for async messaging, where the original and copied exchange are independent.
209         *
210         * @param exchange original copy of the exchange
211         * @param handover whether the on completion callbacks should be handed over to the new copy.
212         */
213        public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover) {
214            String id = exchange.getExchangeId();
215    
216            Exchange copy = exchange.copy();
217            // do not share the unit of work
218            copy.setUnitOfWork(null);
219            // hand over on completion to the copy if we got any
220            UnitOfWork uow = exchange.getUnitOfWork();
221            if (handover && uow != null) {
222                uow.handoverSynchronization(copy);
223            }
224            // set a correlation id so we can track back the original exchange
225            copy.setProperty(Exchange.CORRELATION_ID, id);
226            return copy;
227        }
228    
229        /**
230         * Creates a new instance and copies from the current message exchange so that it can be
231         * forwarded to another destination as a new instance.
232         *
233         * @param exchange original copy of the exchange
234         * @param preserveExchangeId whether or not the exchange id should be preserved
235         * @return the copy
236         */
237        public static Exchange createCopy(Exchange exchange, boolean preserveExchangeId) {
238            Exchange copy = exchange.copy();
239            if (preserveExchangeId) {
240                // must preserve exchange id
241                copy.setExchangeId(exchange.getExchangeId());
242            }
243            return copy;
244        }
245    
246        /**
247         * Copies the results of a message exchange from the source exchange to the result exchange
248         * which will copy the out and fault message contents and the exception
249         *
250         * @param result the result exchange which will have the output and error state added
251         * @param source the source exchange which is not modified
252         */
253        public static void copyResults(Exchange result, Exchange source) {
254    
255            // --------------------------------------------------------------------
256            //  TODO: merge logic with that of copyResultsPreservePattern()
257            // --------------------------------------------------------------------
258    
259            if (result == source) {
260                // we just need to ensure MEP is as expected (eg copy result to OUT if out capable)
261                // and the result is not failed
262                if (result.getPattern() == ExchangePattern.InOptionalOut) {
263                    // keep as is
264                } else if (result.getPattern().isOutCapable() && !result.hasOut() && !result.isFailed()) {
265                    // copy IN to OUT as we expect a OUT response
266                    result.getOut().copyFrom(source.getIn());
267                }
268                return;
269            }
270    
271            if (result != source) {
272                result.setException(source.getException());
273                if (source.hasOut()) {
274                    result.getOut().copyFrom(source.getOut());
275                } else if (result.getPattern() == ExchangePattern.InOptionalOut) {
276                    // special case where the result is InOptionalOut and with no OUT response
277                    // so we should return null to indicate this fact
278                    result.setOut(null);
279                } else {
280                    // no results so lets copy the last input
281                    // as the final processor on a pipeline might not
282                    // have created any OUT; such as a mock:endpoint
283                    // so lets assume the last IN is the OUT
284                    if (result.getPattern().isOutCapable()) {
285                        // only set OUT if its OUT capable
286                        result.getOut().copyFrom(source.getIn());
287                    } else {
288                        // if not replace IN instead to keep the MEP
289                        result.getIn().copyFrom(source.getIn());
290                        // clear any existing OUT as the result is on the IN
291                        if (result.hasOut()) {
292                            result.setOut(null);
293                        }
294                    }
295                }
296    
297                if (source.hasProperties()) {
298                    result.getProperties().putAll(source.getProperties());
299                }
300            }
301        }
302    
303        /**
304         * Copies the <code>source</code> exchange to <code>target</code> exchange
305         * preserving the {@link ExchangePattern} of <code>target</code>.
306         *
307         * @param source source exchange.
308         * @param result target exchange.
309         */
310        public static void copyResultsPreservePattern(Exchange result, Exchange source) {
311    
312            // --------------------------------------------------------------------
313            //  TODO: merge logic with that of copyResults()
314            // --------------------------------------------------------------------
315    
316            if (result == source) {
317                // we just need to ensure MEP is as expected (eg copy result to OUT if out capable)
318                // and the result is not failed
319                if (result.getPattern() == ExchangePattern.InOptionalOut) {
320                    // keep as is
321                } else if (result.getPattern().isOutCapable() && !result.hasOut() && !result.isFailed()) {
322                    // copy IN to OUT as we expect a OUT response
323                    result.getOut().copyFrom(source.getIn());
324                }
325                return;
326            }
327    
328            // copy in message
329            result.getIn().copyFrom(source.getIn());
330    
331            // copy out message
332            if (source.hasOut()) {
333                // exchange pattern sensitive
334                Message resultMessage = source.getOut().isFault() ? result.getOut() : getResultMessage(result);
335                resultMessage.copyFrom(source.getOut());
336            }
337    
338            // copy exception
339            result.setException(source.getException());
340    
341            // copy properties
342            if (source.hasProperties()) {
343                result.getProperties().putAll(source.getProperties());
344            }
345        }
346    
347        /**
348         * Returns the message where to write results in an
349         * exchange-pattern-sensitive way.
350         *
351         * @param exchange message exchange.
352         * @return result message.
353         */
354        public static Message getResultMessage(Exchange exchange) {
355            if (exchange.getPattern().isOutCapable()) {
356                return exchange.getOut();
357            } else {
358                return exchange.getIn();
359            }
360        }
361    
362        /**
363         * Returns true if the given exchange pattern (if defined) can support OUT messages
364         *
365         * @param exchange the exchange to interrogate
366         * @return true if the exchange is defined as an {@link ExchangePattern} which supports
367         *         OUT messages
368         */
369        public static boolean isOutCapable(Exchange exchange) {
370            ExchangePattern pattern = exchange.getPattern();
371            return pattern != null && pattern.isOutCapable();
372        }
373    
374        /**
375         * Creates a new instance of the given type from the injector
376         *
377         * @param exchange the exchange
378         * @param type     the given type
379         * @return the created instance of the given type
380         */
381        public static <T> T newInstance(Exchange exchange, Class<T> type) {
382            return exchange.getContext().getInjector().newInstance(type);
383        }
384    
385        /**
386         * Creates a Map of the variables which are made available to a script or template
387         *
388         * @param exchange the exchange to make available
389         * @return a Map populated with the require variables
390         */
391        public static Map<String, Object> createVariableMap(Exchange exchange) {
392            Map<String, Object> answer = new HashMap<String, Object>();
393            populateVariableMap(exchange, answer);
394            return answer;
395        }
396    
397        /**
398         * Populates the Map with the variables which are made available to a script or template
399         *
400         * @param exchange the exchange to make available
401         * @param map      the map to populate
402         */
403        public static void populateVariableMap(Exchange exchange, Map<String, Object> map) {
404            map.put("exchange", exchange);
405            Message in = exchange.getIn();
406            map.put("in", in);
407            map.put("request", in);
408            map.put("headers", in.getHeaders());
409            map.put("body", in.getBody());
410            if (isOutCapable(exchange)) {
411                // if we are out capable then set out and response as well
412                // however only grab OUT if it exists, otherwise reuse IN
413                // this prevents side effects to alter the Exchange if we force creating an OUT message
414                Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
415                map.put("out", msg);
416                map.put("response", msg);
417            }
418            map.put("camelContext", exchange.getContext());
419        }
420    
421        /**
422         * Returns the MIME content type on the input message or null if one is not defined
423         *
424         * @param exchange the exchange
425         * @return the MIME content type
426         */
427        public static String getContentType(Exchange exchange) {
428            return MessageHelper.getContentType(exchange.getIn());
429        }
430    
431        /**
432         * Returns the MIME content encoding on the input message or null if one is not defined
433         *
434         * @param exchange the exchange
435         * @return the MIME content encoding
436         */
437        public static String getContentEncoding(Exchange exchange) {
438            return MessageHelper.getContentEncoding(exchange.getIn());
439        }
440    
441        /**
442         * Performs a lookup in the registry of the mandatory bean name and throws an exception if it could not be found
443         *
444         * @param exchange the exchange
445         * @param name     the bean name
446         * @return the bean
447         * @throws NoSuchBeanException if no bean could be found in the registry
448         */
449        public static Object lookupMandatoryBean(Exchange exchange, String name) throws NoSuchBeanException {
450            Object value = lookupBean(exchange, name);
451            if (value == null) {
452                throw new NoSuchBeanException(name);
453            }
454            return value;
455        }
456    
457        /**
458         * Performs a lookup in the registry of the mandatory bean name and throws an exception if it could not be found
459         *
460         * @param exchange the exchange
461         * @param name     the bean name
462         * @param type     the expected bean type
463         * @return the bean
464         * @throws NoSuchBeanException if no bean could be found in the registry
465         */
466        public static <T> T lookupMandatoryBean(Exchange exchange, String name, Class<T> type) {
467            T value = lookupBean(exchange, name, type);
468            if (value == null) {
469                throw new NoSuchBeanException(name);
470            }
471            return value;
472        }
473    
474        /**
475         * Performs a lookup in the registry of the bean name
476         *
477         * @param exchange the exchange
478         * @param name     the bean name
479         * @return the bean, or <tt>null</tt> if no bean could be found
480         */
481        public static Object lookupBean(Exchange exchange, String name) {
482            return exchange.getContext().getRegistry().lookup(name);
483        }
484    
485        /**
486         * Performs a lookup in the registry of the bean name and type
487         *
488         * @param exchange the exchange
489         * @param name     the bean name
490         * @param type     the expected bean type
491         * @return the bean, or <tt>null</tt> if no bean could be found
492         */
493        public static <T> T lookupBean(Exchange exchange, String name, Class<T> type) {
494            return exchange.getContext().getRegistry().lookup(name, type);
495        }
496    
497        /**
498         * Returns the first exchange in the given collection of exchanges which has the same exchange ID as the one given
499         * or null if none could be found
500         *
501         * @param exchanges  the exchanges
502         * @param exchangeId the exchangeId to find
503         * @return matching exchange, or <tt>null</tt> if none found
504         */
505        public static Exchange getExchangeById(Iterable<Exchange> exchanges, String exchangeId) {
506            for (Exchange exchange : exchanges) {
507                String id = exchange.getExchangeId();
508                if (id != null && id.equals(exchangeId)) {
509                    return exchange;
510                }
511            }
512            return null;
513        }
514    
515        /**
516         * Prepares the exchanges for aggregation.
517         * <p/>
518         * This implementation will copy the OUT body to the IN body so when you do
519         * aggregation the body is <b>only</b> in the IN body to avoid confusing end users.
520         *
521         * @param oldExchange the old exchange
522         * @param newExchange the new exchange
523         */
524        public static void prepareAggregation(Exchange oldExchange, Exchange newExchange) {
525            // move body/header from OUT to IN
526            if (oldExchange != null) {
527                if (oldExchange.hasOut()) {
528                    oldExchange.setIn(oldExchange.getOut());
529                    oldExchange.setOut(null);
530                }
531            }
532    
533            if (newExchange != null) {
534                if (newExchange.hasOut()) {
535                    newExchange.setIn(newExchange.getOut());
536                    newExchange.setOut(null);
537                }
538            }
539        }
540    
541        /**
542         * Checks whether the exchange has been failure handed
543         *
544         * @param exchange  the exchange
545         * @return <tt>true</tt> if failure handled, <tt>false</tt> otherwise
546         */
547        public static boolean isFailureHandled(Exchange exchange) {
548            return exchange.getProperty(Exchange.FAILURE_HANDLED, false, Boolean.class);
549        }
550    
551        /**
552         * Checks whether the exchange {@link UnitOfWork} is exhausted
553         *
554         * @param exchange  the exchange
555         * @return <tt>true</tt> if exhausted, <tt>false</tt> otherwise
556         */
557        public static boolean isUnitOfWorkExhausted(Exchange exchange) {
558            return exchange.getProperty(Exchange.UNIT_OF_WORK_EXHAUSTED, false, Boolean.class);
559        }
560    
561        /**
562         * Sets the exchange to be failure handled.
563         *
564         * @param exchange  the exchange
565         */
566        public static void setFailureHandled(Exchange exchange) {
567            exchange.setProperty(Exchange.FAILURE_HANDLED, Boolean.TRUE);
568            // clear exception since its failure handled
569            exchange.setException(null);
570        }
571    
572        /**
573         * Checks whether the exchange is redelivery exhausted
574         *
575         * @param exchange  the exchange
576         * @return <tt>true</tt> if exhausted, <tt>false</tt> otherwise
577         */
578        public static boolean isRedeliveryExhausted(Exchange exchange) {
579            return exchange.getProperty(Exchange.REDELIVERY_EXHAUSTED, false, Boolean.class);
580        }
581    
582        /**
583         * Checks whether the exchange {@link UnitOfWork} is redelivered
584         *
585         * @param exchange  the exchange
586         * @return <tt>true</tt> if redelivered, <tt>false</tt> otherwise
587         */
588        public static boolean isRedelivered(Exchange exchange) {
589            return exchange.getIn().hasHeaders() && exchange.getIn().getHeader(Exchange.REDELIVERED, false, Boolean.class);
590        }
591    
592        /**
593         * Checks whether the exchange {@link UnitOfWork} has been interrupted during processing
594         *
595         * @param exchange  the exchange
596         * @return <tt>true</tt> if interrupted, <tt>false</tt> otherwise
597         */
598        public static boolean isInterrupted(Exchange exchange) {
599            return exchange.getException(InterruptedException.class) != null;
600        }
601    
602        /**
603         * Extracts the body from the given exchange.
604         * <p/>
605         * If the exchange pattern is provided it will try to honor it and retrieve the body
606         * from either IN or OUT according to the pattern.
607         *
608         * @param exchange the exchange
609         * @param pattern  exchange pattern if given, can be <tt>null</tt>
610         * @return the result body, can be <tt>null</tt>.
611         * @throws CamelExecutionException is thrown if the processing of the exchange failed
612         */
613        public static Object extractResultBody(Exchange exchange, ExchangePattern pattern) {
614            Object answer = null;
615            if (exchange != null) {
616                // rethrow if there was an exception during execution
617                if (exchange.getException() != null) {
618                    throw ObjectHelper.wrapCamelExecutionException(exchange, exchange.getException());
619                }
620    
621                // result could have a fault message
622                if (hasFaultMessage(exchange)) {
623                    return exchange.getOut().getBody();
624                }
625    
626                // okay no fault then return the response according to the pattern
627                // try to honor pattern if provided
628                boolean notOut = pattern != null && !pattern.isOutCapable();
629                boolean hasOut = exchange.hasOut();
630                if (hasOut && !notOut) {
631                    // we have a response in out and the pattern is out capable
632                    answer = exchange.getOut().getBody();
633                } else if (!hasOut && exchange.getPattern() == ExchangePattern.InOptionalOut) {
634                    // special case where the result is InOptionalOut and with no OUT response
635                    // so we should return null to indicate this fact
636                    answer = null;
637                } else {
638                    // use IN as the response
639                    answer = exchange.getIn().getBody();
640                }
641            }
642            return answer;
643        }
644    
645        /**
646         * Tests whether the exchange has a fault message set and that its not null.
647         *
648         * @param exchange the exchange
649         * @return <tt>true</tt> if fault message exists
650         */
651        public static boolean hasFaultMessage(Exchange exchange) {
652            return exchange.hasOut() && exchange.getOut().isFault() && exchange.getOut().getBody() != null;
653        }
654    
655        /**
656         * Tests whether the exchange has already been handled by the error handler
657         *
658         * @param exchange the exchange
659         * @return <tt>true</tt> if handled already by error handler, <tt>false</tt> otherwise
660         */
661        public static boolean hasExceptionBeenHandledByErrorHandler(Exchange exchange) {
662            return Boolean.TRUE.equals(exchange.getProperty(Exchange.ERRORHANDLER_HANDLED));
663        }
664    
665        /**
666         * Extracts the body from the given future, that represents a handle to an asynchronous exchange.
667         * <p/>
668         * Will wait until the future task is complete.
669         *
670         * @param context the camel context
671         * @param future  the future handle
672         * @param type    the expected body response type
673         * @return the result body, can be <tt>null</tt>.
674         * @throws CamelExecutionException is thrown if the processing of the exchange failed
675         */
676        public static <T> T extractFutureBody(CamelContext context, Future<Object> future, Class<T> type) {
677            try {
678                return doExtractFutureBody(context, future.get(), type);
679            } catch (InterruptedException e) {
680                throw ObjectHelper.wrapRuntimeCamelException(e);
681            } catch (ExecutionException e) {
682                // execution failed due to an exception so rethrow the cause
683                throw ObjectHelper.wrapCamelExecutionException(null, e.getCause());
684            } finally {
685                // its harmless to cancel if task is already completed
686                // and in any case we do not want to get hold of the task a 2nd time
687                // and its recommended to cancel according to Brian Goetz in his Java Concurrency in Practice book
688                future.cancel(true);
689            }
690        }
691    
692        /**
693         * Extracts the body from the given future, that represents a handle to an asynchronous exchange.
694         * <p/>
695         * Will wait for the future task to complete, but waiting at most the timeout value.
696         *
697         * @param context the camel context
698         * @param future  the future handle
699         * @param timeout timeout value
700         * @param unit    timeout unit
701         * @param type    the expected body response type
702         * @return the result body, can be <tt>null</tt>.
703         * @throws CamelExecutionException is thrown if the processing of the exchange failed
704         * @throws java.util.concurrent.TimeoutException is thrown if a timeout triggered
705         */
706        public static <T> T extractFutureBody(CamelContext context, Future<Object> future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException {
707            try {
708                if (timeout > 0) {
709                    return doExtractFutureBody(context, future.get(timeout, unit), type);
710                } else {
711                    return doExtractFutureBody(context, future.get(), type);
712                }
713            } catch (InterruptedException e) {
714                // execution failed due interruption so rethrow the cause
715                throw ObjectHelper.wrapCamelExecutionException(null, e);
716            } catch (ExecutionException e) {
717                // execution failed due to an exception so rethrow the cause
718                throw ObjectHelper.wrapCamelExecutionException(null, e.getCause());
719            } finally {
720                // its harmless to cancel if task is already completed
721                // and in any case we do not want to get hold of the task a 2nd time
722                // and its recommended to cancel according to Brian Goetz in his Java Concurrency in Practice book
723                future.cancel(true);
724            }
725        }
726    
727        private static <T> T doExtractFutureBody(CamelContext context, Object result, Class<T> type) {
728            if (result == null) {
729                return null;
730            }
731            if (type.isAssignableFrom(result.getClass())) {
732                return type.cast(result);
733            }
734            if (result instanceof Exchange) {
735                Exchange exchange = (Exchange) result;
736                Object answer = ExchangeHelper.extractResultBody(exchange, exchange.getPattern());
737                return context.getTypeConverter().convertTo(type, exchange, answer);
738            }
739            return context.getTypeConverter().convertTo(type, result);
740        }
741    
742        /**
743         * @deprecated use org.apache.camel.CamelExchangeException.createExceptionMessage instead
744         */
745        @Deprecated
746        public static String createExceptionMessage(String message, Exchange exchange, Throwable cause) {
747            return CamelExchangeException.createExceptionMessage(message, exchange, cause);
748        }
749    
750        /**
751         * Strategy to prepare results before next iterator or when we are complete,
752         * which is done by copying OUT to IN, so there is only an IN as input
753         * for the next iteration.
754         *
755         * @param exchange the exchange to prepare
756         */
757        public static void prepareOutToIn(Exchange exchange) {
758            // we are routing using pipes and filters so we need to manually copy OUT to IN
759            if (exchange.hasOut()) {
760                exchange.getIn().copyFrom(exchange.getOut());
761                exchange.setOut(null);
762            }
763        }
764    
765        /**
766         * Gets both the messageId and exchangeId to be used for logging purposes.
767         * <p/>
768         * Logging both ids, can help to correlate exchanges which may be redelivered messages
769         * from for example a JMS broker.
770         *
771         * @param exchange the exchange
772         * @return a log message with both the messageId and exchangeId
773         */
774        public static String logIds(Exchange exchange) {
775            String msgId = exchange.hasOut() ? exchange.getOut().getMessageId() : exchange.getIn().getMessageId();
776            return "(MessageId: " + msgId + " on ExchangeId: " + exchange.getExchangeId()  + ")";
777        }
778    
779        /**
780         * Copies the exchange but the copy will be tied to the given context
781         *
782         * @param exchange  the source exchange
783         * @param context   the camel context
784         * @return a copy with the given camel context
785         */
786        public static Exchange copyExchangeAndSetCamelContext(Exchange exchange, CamelContext context) {
787            return copyExchangeAndSetCamelContext(exchange, context, true);
788        }
789    
790        /**
791         * Copies the exchange but the copy will be tied to the given context
792         *
793         * @param exchange  the source exchange
794         * @param context   the camel context
795         * @param handover  whether to handover on completions from the source to the copy
796         * @return a copy with the given camel context
797         */
798        public static Exchange copyExchangeAndSetCamelContext(Exchange exchange, CamelContext context, boolean handover) {
799            DefaultExchange answer = new DefaultExchange(context, exchange.getPattern());
800            if (exchange.hasProperties()) {
801                answer.setProperties(safeCopy(exchange.getProperties()));
802            }
803            if (handover) {
804                // Need to hand over the completion for async invocation
805                exchange.handoverCompletions(answer);
806            }
807            answer.setIn(exchange.getIn().copy());
808            if (exchange.hasOut()) {
809                answer.setOut(exchange.getOut().copy());
810            }
811            answer.setException(exchange.getException());
812            return answer;
813        }
814    
815        private static Map<String, Object> safeCopy(Map<String, Object> properties) {
816            if (properties == null) {
817                return null;
818            }
819            return new ConcurrentHashMap<String, Object>(properties);
820        }
821    }