001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.util;
018
019import java.util.HashMap;
020import java.util.LinkedList;
021import java.util.List;
022import java.util.Map;
023import java.util.concurrent.ExecutionException;
024import java.util.concurrent.Future;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.TimeoutException;
027import java.util.function.Predicate;
028
029import org.apache.camel.CamelContext;
030import org.apache.camel.CamelExchangeException;
031import org.apache.camel.CamelExecutionException;
032import org.apache.camel.Endpoint;
033import org.apache.camel.Exchange;
034import org.apache.camel.ExchangePattern;
035import org.apache.camel.InvalidPayloadException;
036import org.apache.camel.Message;
037import org.apache.camel.MessageHistory;
038import org.apache.camel.NoSuchBeanException;
039import org.apache.camel.NoSuchEndpointException;
040import org.apache.camel.NoSuchHeaderException;
041import org.apache.camel.NoSuchPropertyException;
042import org.apache.camel.NoTypeConversionAvailableException;
043import org.apache.camel.Route;
044import org.apache.camel.TypeConversionException;
045import org.apache.camel.TypeConverter;
046import org.apache.camel.impl.DefaultExchange;
047import org.apache.camel.impl.MessageSupport;
048import org.apache.camel.spi.Synchronization;
049import org.apache.camel.spi.UnitOfWork;
050
051/**
052 * Some helper methods for working with {@link Exchange} objects
053 *
054 * @version
055 */
056public final class ExchangeHelper {
057
058    /**
059     * Utility classes should not have a public constructor.
060     */
061    private ExchangeHelper() {
062    }
063
064    /**
065     * Extracts the Exchange.BINDING of the given type or null if not present
066     *
067     * @param exchange the message exchange
068     * @param type     the expected binding type
069     * @return the binding object of the given type or null if it could not be found or converted
070     */
071    public static <T> T getBinding(Exchange exchange, Class<T> type) {
072        return exchange != null ? exchange.getProperty(Exchange.BINDING, type) : null;
073    }
074
075    /**
076     * Attempts to resolve the endpoint for the given value
077     *
078     * @param exchange the message exchange being processed
079     * @param value    the value which can be an {@link Endpoint} or an object
080     *                 which provides a String representation of an endpoint via
081     *                 {@link #toString()}
082     * @return the endpoint
083     * @throws NoSuchEndpointException if the endpoint cannot be resolved
084     */
085    public static Endpoint resolveEndpoint(Exchange exchange, Object value) throws NoSuchEndpointException {
086        Endpoint endpoint;
087        if (value instanceof Endpoint) {
088            endpoint = (Endpoint) value;
089        } else {
090            String uri = value.toString().trim();
091            endpoint = CamelContextHelper.getMandatoryEndpoint(exchange.getContext(), uri);
092        }
093        return endpoint;
094    }
095
096    /**
097     * Gets the mandatory property of the exchange of the correct type
098     *
099     * @param exchange      the exchange
100     * @param propertyName  the property name
101     * @param type          the type
102     * @return the property value
103     * @throws TypeConversionException is thrown if error during type conversion
104     * @throws NoSuchPropertyException is thrown if no property exists
105     */
106    public static <T> T getMandatoryProperty(Exchange exchange, String propertyName, Class<T> type) throws NoSuchPropertyException {
107        T result = exchange.getProperty(propertyName, type);
108        if (result != null) {
109            return result;
110        }
111        throw new NoSuchPropertyException(exchange, propertyName, type);
112    }
113
114    /**
115     * Gets the mandatory inbound header of the correct type
116     *
117     * @param exchange      the exchange
118     * @param headerName    the header name
119     * @param type          the type
120     * @return the header value
121     * @throws TypeConversionException is thrown if error during type conversion
122     * @throws NoSuchHeaderException is thrown if no headers exists
123     */
124    public static <T> T getMandatoryHeader(Exchange exchange, String headerName, Class<T> type) throws TypeConversionException, NoSuchHeaderException {
125        T answer = exchange.getIn().getHeader(headerName, type);
126        if (answer == null) {
127            throw new NoSuchHeaderException(exchange, headerName, type);
128        }
129        return answer;
130    }
131
132    /**
133     * Gets the mandatory inbound header of the correct type
134     *
135     * @param message       the message
136     * @param headerName    the header name
137     * @param type          the type
138     * @return the header value
139     * @throws TypeConversionException is thrown if error during type conversion
140     * @throws NoSuchHeaderException is thrown if no headers exists
141     */
142    public static <T> T getMandatoryHeader(Message message, String headerName, Class<T> type) throws TypeConversionException, NoSuchHeaderException {
143        T answer = message.getHeader(headerName, type);
144        if (answer == null) {
145            throw new NoSuchHeaderException(message.getExchange(), headerName, type);
146        }
147        return answer;
148    }
149
150    /**
151     * Gets an header or property of the correct type
152     *
153     * @param exchange      the exchange
154     * @param name          the name of the header or the property
155     * @param type          the type
156     * @return the header or property value
157     * @throws TypeConversionException is thrown if error during type conversion
158     * @throws NoSuchHeaderException is thrown if no headers exists
159     */
160    public static <T> T getHeaderOrProperty(Exchange exchange, String name, Class<T> type) throws TypeConversionException {
161        T answer = exchange.getIn().getHeader(name, type);
162        if (answer == null) {
163            answer = exchange.getProperty(name, type);
164        }
165        return answer;
166    }
167
168    /**
169     * Returns the mandatory inbound message body of the correct type or throws
170     * an exception if it is not present
171     *
172     * @param exchange the exchange
173     * @return the body, is never <tt>null</tt>
174     * @throws InvalidPayloadException Is thrown if the body being <tt>null</tt> or wrong class type
175     * @deprecated use {@link org.apache.camel.Message#getMandatoryBody()}
176     */
177    @Deprecated
178    public static Object getMandatoryInBody(Exchange exchange) throws InvalidPayloadException {
179        return exchange.getIn().getMandatoryBody();
180    }
181
182    /**
183     * Returns the mandatory inbound message body of the correct type or throws
184     * an exception if it is not present
185     * @deprecated use {@link org.apache.camel.Message#getMandatoryBody(Class)}
186     */
187    @Deprecated
188    public static <T> T getMandatoryInBody(Exchange exchange, Class<T> type) throws InvalidPayloadException {
189        return exchange.getIn().getMandatoryBody(type);
190    }
191
192    /**
193     * Returns the mandatory outbound message body of the correct type or throws
194     * an exception if it is not present
195     * @deprecated use {@link org.apache.camel.Message#getMandatoryBody()}
196     */
197    @Deprecated
198    public static Object getMandatoryOutBody(Exchange exchange) throws InvalidPayloadException {
199        return exchange.getOut().getMandatoryBody();
200    }
201
202    /**
203     * Returns the mandatory outbound message body of the correct type or throws
204     * an exception if it is not present
205     * @deprecated use {@link org.apache.camel.Message#getMandatoryBody(Class)}
206     */
207    @Deprecated
208    public static <T> T getMandatoryOutBody(Exchange exchange, Class<T> type) throws InvalidPayloadException {
209        return exchange.getOut().getMandatoryBody(type);
210    }
211
212    /**
213     * Converts the value to the given expected type or throws an exception
214     *
215     * @return the converted value
216     * @throws TypeConversionException is thrown if error during type conversion
217     * @throws NoTypeConversionAvailableException} if no type converters exists to convert to the given type
218     */
219    public static <T> T convertToMandatoryType(Exchange exchange, Class<T> type, Object value)
220        throws TypeConversionException, NoTypeConversionAvailableException {
221        CamelContext camelContext = exchange.getContext();
222        ObjectHelper.notNull(camelContext, "CamelContext of Exchange");
223        TypeConverter converter = camelContext.getTypeConverter();
224        if (converter != null) {
225            return converter.mandatoryConvertTo(type, exchange, value);
226        }
227        throw new NoTypeConversionAvailableException(value, type);
228    }
229
230    /**
231     * Converts the value to the given expected type
232     *
233     * @return the converted value
234     * @throws org.apache.camel.TypeConversionException is thrown if error during type conversion
235     */
236    public static <T> T convertToType(Exchange exchange, Class<T> type, Object value) throws TypeConversionException {
237        CamelContext camelContext = exchange.getContext();
238        ObjectHelper.notNull(camelContext, "CamelContext of Exchange");
239        TypeConverter converter = camelContext.getTypeConverter();
240        if (converter != null) {
241            return converter.convertTo(type, exchange, value);
242        }
243        return null;
244    }
245
246    /**
247     * Creates a new instance and copies from the current message exchange so that it can be
248     * forwarded to another destination as a new instance. Unlike regular copy this operation
249     * will not share the same {@link org.apache.camel.spi.UnitOfWork} so its should be used
250     * for async messaging, where the original and copied exchange are independent.
251     *
252     * @param exchange original copy of the exchange
253     * @param handover whether the on completion callbacks should be handed over to the new copy.
254     */
255    public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover) {
256        return createCorrelatedCopy(exchange, handover, false);
257    }
258
259    /**
260     * Creates a new instance and copies from the current message exchange so that it can be
261     * forwarded to another destination as a new instance. Unlike regular copy this operation
262     * will not share the same {@link org.apache.camel.spi.UnitOfWork} so its should be used
263     * for async messaging, where the original and copied exchange are independent.
264     *
265     * @param exchange original copy of the exchange
266     * @param handover whether the on completion callbacks should be handed over to the new copy.
267     * @param useSameMessageId whether to use same message id on the copy message.
268     */
269    public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover, boolean useSameMessageId) {
270        return createCorrelatedCopy(exchange, handover, useSameMessageId, null);
271    }
272
273    /**
274     * Creates a new instance and copies from the current message exchange so that it can be
275     * forwarded to another destination as a new instance. Unlike regular copy this operation
276     * will not share the same {@link org.apache.camel.spi.UnitOfWork} so its should be used
277     * for async messaging, where the original and copied exchange are independent.
278     *
279     * @param exchange original copy of the exchange
280     * @param handover whether the on completion callbacks should be handed over to the new copy.
281     * @param useSameMessageId whether to use same message id on the copy message.
282     * @param filter whether to handover the on completion
283     */
284    public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover, boolean useSameMessageId, Predicate<Synchronization> filter) {
285        String id = exchange.getExchangeId();
286
287        // make sure to do a safe copy as the correlated copy can be routed independently of the source.
288        Exchange copy = exchange.copy(true);
289        // do not reuse message id on copy
290        if (!useSameMessageId) {
291            if (copy.hasOut()) {
292                copy.getOut().setMessageId(null);
293            }
294            copy.getIn().setMessageId(null);
295        }
296        // do not share the unit of work
297        copy.setUnitOfWork(null);
298        // do not reuse the message id
299        // hand over on completion to the copy if we got any
300        UnitOfWork uow = exchange.getUnitOfWork();
301        if (handover && uow != null) {
302            uow.handoverSynchronization(copy, filter);
303        }
304        // set a correlation id so we can track back the original exchange
305        copy.setProperty(Exchange.CORRELATION_ID, id);
306        return copy;
307    }
308
309    /**
310     * Creates a new instance and copies from the current message exchange so that it can be
311     * forwarded to another destination as a new instance.
312     *
313     * @param exchange original copy of the exchange
314     * @param preserveExchangeId whether or not the exchange id should be preserved
315     * @return the copy
316     */
317    public static Exchange createCopy(Exchange exchange, boolean preserveExchangeId) {
318        Exchange copy = exchange.copy();
319        if (preserveExchangeId) {
320            // must preserve exchange id
321            copy.setExchangeId(exchange.getExchangeId());
322        }
323        return copy;
324    }
325
326    /**
327     * Copies the results of a message exchange from the source exchange to the result exchange
328     * which will copy the message contents, exchange properties and the exception.
329     * Notice the {@link ExchangePattern} is <b>not</b> copied/altered.
330     *
331     * @param result the result exchange which will have the output and error state added
332     * @param source the source exchange which is not modified
333     */
334    public static void copyResults(Exchange result, Exchange source) {
335
336        // --------------------------------------------------------------------
337        //  TODO: merge logic with that of copyResultsPreservePattern()
338        // --------------------------------------------------------------------
339
340        if (result == source) {
341            // we just need to ensure MEP is as expected (eg copy result to OUT if out capable)
342            // and the result is not failed
343            if (result.getPattern() == ExchangePattern.InOptionalOut) {
344                // keep as is
345            } else if (result.getPattern().isOutCapable() && !result.hasOut() && !result.isFailed()) {
346                // copy IN to OUT as we expect a OUT response
347                result.getOut().copyFrom(source.getIn());
348            }
349            return;
350        }
351
352        if (result != source) {
353            result.setException(source.getException());
354            if (source.hasOut()) {
355                result.getOut().copyFrom(source.getOut());
356            } else if (result.getPattern() == ExchangePattern.InOptionalOut) {
357                // special case where the result is InOptionalOut and with no OUT response
358                // so we should return null to indicate this fact
359                result.setOut(null);
360            } else {
361                // no results so lets copy the last input
362                // as the final processor on a pipeline might not
363                // have created any OUT; such as a mock:endpoint
364                // so lets assume the last IN is the OUT
365                if (result.getPattern().isOutCapable()) {
366                    // only set OUT if its OUT capable
367                    result.getOut().copyFrom(source.getIn());
368                } else {
369                    // if not replace IN instead to keep the MEP
370                    result.getIn().copyFrom(source.getIn());
371                    // clear any existing OUT as the result is on the IN
372                    if (result.hasOut()) {
373                        result.setOut(null);
374                    }
375                }
376            }
377
378            if (source.hasProperties()) {
379                result.getProperties().putAll(source.getProperties());
380            }
381        }
382    }
383
384    /**
385     * Copies the <code>source</code> exchange to <code>target</code> exchange
386     * preserving the {@link ExchangePattern} of <code>target</code>.
387     *
388     * @param result target exchange.
389     * @param source source exchange.
390     */
391    public static void copyResultsPreservePattern(Exchange result, Exchange source) {
392
393        // --------------------------------------------------------------------
394        //  TODO: merge logic with that of copyResults()
395        // --------------------------------------------------------------------
396
397        if (result == source) {
398            // we just need to ensure MEP is as expected (eg copy result to OUT if out capable)
399            // and the result is not failed
400            if (result.getPattern() == ExchangePattern.InOptionalOut) {
401                // keep as is
402            } else if (result.getPattern().isOutCapable() && !result.hasOut() && !result.isFailed()) {
403                // copy IN to OUT as we expect a OUT response
404                result.getOut().copyFrom(source.getIn());
405            }
406            return;
407        }
408
409        // copy in message
410        result.getIn().copyFrom(source.getIn());
411
412        // copy out message
413        if (source.hasOut()) {
414            // exchange pattern sensitive
415            Message resultMessage = source.getOut().isFault() ? result.getOut() : getResultMessage(result);
416            resultMessage.copyFrom(source.getOut());
417        }
418
419        // copy exception
420        result.setException(source.getException());
421
422        // copy properties
423        if (source.hasProperties()) {
424            result.getProperties().putAll(source.getProperties());
425        }
426    }
427
428    /**
429     * Returns the message where to write results in an
430     * exchange-pattern-sensitive way.
431     *
432     * @param exchange message exchange.
433     * @return result message.
434     */
435    public static Message getResultMessage(Exchange exchange) {
436        if (exchange.getPattern().isOutCapable()) {
437            return exchange.getOut();
438        } else {
439            return exchange.getIn();
440        }
441    }
442
443    /**
444     * Returns true if the given exchange pattern (if defined) can support OUT messages
445     *
446     * @param exchange the exchange to interrogate
447     * @return true if the exchange is defined as an {@link ExchangePattern} which supports
448     *         OUT messages
449     */
450    public static boolean isOutCapable(Exchange exchange) {
451        ExchangePattern pattern = exchange.getPattern();
452        return pattern != null && pattern.isOutCapable();
453    }
454
455    /**
456     * Creates a new instance of the given type from the injector
457     *
458     * @param exchange the exchange
459     * @param type     the given type
460     * @return the created instance of the given type
461     */
462    public static <T> T newInstance(Exchange exchange, Class<T> type) {
463        return exchange.getContext().getInjector().newInstance(type);
464    }
465
466    /**
467     * Creates a Map of the variables which are made available to a script or template
468     *
469     * @param exchange the exchange to make available
470     * @return a Map populated with the require variables
471     */
472    @Deprecated
473    public static Map<String, Object> createVariableMap(Exchange exchange) {
474        Map<String, Object> answer = new HashMap<>();
475        populateVariableMap(exchange, answer, true);
476        return answer;
477    }
478
479    /**
480     * Creates a Map of the variables which are made available to a script or template
481     *
482     * @param exchange the exchange to make available
483     * @param allowContextMapAll whether to allow access to all context map or not
484     *                           (prefer to use false due to security reasons preferred to only allow access to body/headers)
485     * @return a Map populated with the require variables
486     */
487    public static Map<String, Object> createVariableMap(Exchange exchange, boolean allowContextMapAll) {
488        Map<String, Object> answer = new HashMap<>();
489        populateVariableMap(exchange, answer, allowContextMapAll);
490        return answer;
491    }
492
493    /**
494     * Populates the Map with the variables which are made available to a script or template
495     *
496     * @param exchange the exchange to make available
497     * @param map      the map to populate
498     */
499    @Deprecated
500    public static void populateVariableMap(Exchange exchange, Map<String, Object> map) {
501        populateVariableMap(exchange, map, true);
502    }
503
504    /**
505     * Populates the Map with the variables which are made available to a script or template
506     *
507     * @param exchange the exchange to make available
508     * @param map      the map to populate
509     * @param allowContextMapAll whether to allow access to all context map or not
510     *                           (prefer to use false due to security reasons preferred to only allow access to body/headers)
511     */
512    public static void populateVariableMap(Exchange exchange, Map<String, Object> map, boolean allowContextMapAll) {
513        Message in = exchange.getIn();
514        map.put("headers", in.getHeaders());
515        map.put("body", in.getBody());
516        if (allowContextMapAll) {
517            map.put("in", in);
518            map.put("exchange", exchange);
519            map.put("request", in);
520            if (isOutCapable(exchange)) {
521                // if we are out capable then set out and response as well
522                // however only grab OUT if it exists, otherwise reuse IN
523                // this prevents side effects to alter the Exchange if we force creating an OUT message
524                Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
525                map.put("out", msg);
526                map.put("response", msg);
527            }
528            map.put("camelContext", exchange.getContext());
529        }
530    }
531
532    /**
533     * Returns the MIME content type on the input message or null if one is not defined
534     *
535     * @param exchange the exchange
536     * @return the MIME content type
537     */
538    public static String getContentType(Exchange exchange) {
539        return MessageHelper.getContentType(exchange.getIn());
540    }
541
542    /**
543     * Returns the MIME content encoding on the input message or null if one is not defined
544     *
545     * @param exchange the exchange
546     * @return the MIME content encoding
547     */
548    public static String getContentEncoding(Exchange exchange) {
549        return MessageHelper.getContentEncoding(exchange.getIn());
550    }
551
552    /**
553     * Performs a lookup in the registry of the mandatory bean name and throws an exception if it could not be found
554     *
555     * @param exchange the exchange
556     * @param name     the bean name
557     * @return the bean
558     * @throws NoSuchBeanException if no bean could be found in the registry
559     */
560    public static Object lookupMandatoryBean(Exchange exchange, String name) throws NoSuchBeanException {
561        Object value = lookupBean(exchange, name);
562        if (value == null) {
563            throw new NoSuchBeanException(name);
564        }
565        return value;
566    }
567
568    /**
569     * Performs a lookup in the registry of the mandatory bean name and throws an exception if it could not be found
570     *
571     * @param exchange the exchange
572     * @param name     the bean name
573     * @param type     the expected bean type
574     * @return the bean
575     * @throws NoSuchBeanException if no bean could be found in the registry
576     */
577    public static <T> T lookupMandatoryBean(Exchange exchange, String name, Class<T> type) {
578        T value = lookupBean(exchange, name, type);
579        if (value == null) {
580            throw new NoSuchBeanException(name);
581        }
582        return value;
583    }
584
585    /**
586     * Performs a lookup in the registry of the bean name
587     *
588     * @param exchange the exchange
589     * @param name     the bean name
590     * @return the bean, or <tt>null</tt> if no bean could be found
591     */
592    public static Object lookupBean(Exchange exchange, String name) {
593        return exchange.getContext().getRegistry().lookupByName(name);
594    }
595
596    /**
597     * Performs a lookup in the registry of the bean name and type
598     *
599     * @param exchange the exchange
600     * @param name     the bean name
601     * @param type     the expected bean type
602     * @return the bean, or <tt>null</tt> if no bean could be found
603     */
604    public static <T> T lookupBean(Exchange exchange, String name, Class<T> type) {
605        return exchange.getContext().getRegistry().lookupByNameAndType(name, type);
606    }
607
608    /**
609     * Returns the first exchange in the given collection of exchanges which has the same exchange ID as the one given
610     * or null if none could be found
611     *
612     * @param exchanges  the exchanges
613     * @param exchangeId the exchangeId to find
614     * @return matching exchange, or <tt>null</tt> if none found
615     */
616    public static Exchange getExchangeById(Iterable<Exchange> exchanges, String exchangeId) {
617        for (Exchange exchange : exchanges) {
618            String id = exchange.getExchangeId();
619            if (id != null && id.equals(exchangeId)) {
620                return exchange;
621            }
622        }
623        return null;
624    }
625
626    /**
627     * Prepares the exchanges for aggregation.
628     * <p/>
629     * This implementation will copy the OUT body to the IN body so when you do
630     * aggregation the body is <b>only</b> in the IN body to avoid confusing end users.
631     *
632     * @param oldExchange the old exchange
633     * @param newExchange the new exchange
634     */
635    public static void prepareAggregation(Exchange oldExchange, Exchange newExchange) {
636        // move body/header from OUT to IN
637        if (oldExchange != null) {
638            if (oldExchange.hasOut()) {
639                oldExchange.setIn(oldExchange.getOut());
640                oldExchange.setOut(null);
641            }
642        }
643
644        if (newExchange != null) {
645            if (newExchange.hasOut()) {
646                newExchange.setIn(newExchange.getOut());
647                newExchange.setOut(null);
648            }
649        }
650    }
651
652    /**
653     * Checks whether the exchange has been failure handed
654     *
655     * @param exchange  the exchange
656     * @return <tt>true</tt> if failure handled, <tt>false</tt> otherwise
657     */
658    public static boolean isFailureHandled(Exchange exchange) {
659        return exchange.getProperty(Exchange.FAILURE_HANDLED, false, Boolean.class);
660    }
661
662    /**
663     * Checks whether the exchange {@link UnitOfWork} is exhausted
664     *
665     * @param exchange  the exchange
666     * @return <tt>true</tt> if exhausted, <tt>false</tt> otherwise
667     */
668    public static boolean isUnitOfWorkExhausted(Exchange exchange) {
669        return exchange.getProperty(Exchange.UNIT_OF_WORK_EXHAUSTED, false, Boolean.class);
670    }
671
672    /**
673     * Sets the exchange to be failure handled.
674     *
675     * @param exchange  the exchange
676     */
677    public static void setFailureHandled(Exchange exchange) {
678        exchange.setProperty(Exchange.FAILURE_HANDLED, Boolean.TRUE);
679        // clear exception since its failure handled
680        exchange.setException(null);
681    }
682
683    /**
684     * Checks whether the exchange is redelivery exhausted
685     *
686     * @param exchange  the exchange
687     * @return <tt>true</tt> if exhausted, <tt>false</tt> otherwise
688     */
689    public static boolean isRedeliveryExhausted(Exchange exchange) {
690        return exchange.getProperty(Exchange.REDELIVERY_EXHAUSTED, false, Boolean.class);
691    }
692
693    /**
694     * Checks whether the exchange {@link UnitOfWork} is redelivered
695     *
696     * @param exchange  the exchange
697     * @return <tt>true</tt> if redelivered, <tt>false</tt> otherwise
698     */
699    public static boolean isRedelivered(Exchange exchange) {
700        return exchange.getIn().hasHeaders() && exchange.getIn().getHeader(Exchange.REDELIVERED, false, Boolean.class);
701    }
702
703    /**
704     * Checks whether the exchange {@link UnitOfWork} has been interrupted during processing
705     *
706     * @param exchange  the exchange
707     * @return <tt>true</tt> if interrupted, <tt>false</tt> otherwise
708     */
709    public static boolean isInterrupted(Exchange exchange) {
710        Object value = exchange.getProperty(Exchange.INTERRUPTED);
711        return value != null && Boolean.TRUE == value;
712    }
713
714    /**
715     * Check whether or not stream caching is enabled for the given route or globally.
716     *
717     * @param exchange  the exchange
718     * @return <tt>true</tt> if enabled, <tt>false</tt> otherwise
719     */
720    public static boolean isStreamCachingEnabled(final Exchange exchange) {
721        Route route = exchange.getContext().getRoute(exchange.getFromRouteId());
722        if (route != null) {
723            return route.getRouteContext().isStreamCaching();
724        } else {
725            return exchange.getContext().getStreamCachingStrategy().isEnabled();
726        }
727    }
728
729    /**
730     * Extracts the body from the given exchange.
731     * <p/>
732     * If the exchange pattern is provided it will try to honor it and retrieve the body
733     * from either IN or OUT according to the pattern.
734     *
735     * @param exchange the exchange
736     * @param pattern  exchange pattern if given, can be <tt>null</tt>
737     * @return the result body, can be <tt>null</tt>.
738     * @throws CamelExecutionException is thrown if the processing of the exchange failed
739     */
740    public static Object extractResultBody(Exchange exchange, ExchangePattern pattern) {
741        Object answer = null;
742        if (exchange != null) {
743            // rethrow if there was an exception during execution
744            if (exchange.getException() != null) {
745                throw ObjectHelper.wrapCamelExecutionException(exchange, exchange.getException());
746            }
747
748            // result could have a fault message
749            if (hasFaultMessage(exchange)) {
750                Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
751                answer = msg.getBody();
752                return answer;
753            }
754
755            // okay no fault then return the response according to the pattern
756            // try to honor pattern if provided
757            boolean notOut = pattern != null && !pattern.isOutCapable();
758            boolean hasOut = exchange.hasOut();
759            if (hasOut && !notOut) {
760                // we have a response in out and the pattern is out capable
761                answer = exchange.getOut().getBody();
762            } else if (!hasOut && exchange.getPattern() == ExchangePattern.InOptionalOut) {
763                // special case where the result is InOptionalOut and with no OUT response
764                // so we should return null to indicate this fact
765                answer = null;
766            } else {
767                // use IN as the response
768                answer = exchange.getIn().getBody();
769            }
770        }
771        return answer;
772    }
773
774    /**
775     * Tests whether the exchange has a fault message set and that its not null.
776     *
777     * @param exchange the exchange
778     * @return <tt>true</tt> if fault message exists
779     */
780    public static boolean hasFaultMessage(Exchange exchange) {
781        Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
782        return msg.isFault() && msg.getBody() != null;
783    }
784
785    /**
786     * Tests whether the exchange has already been handled by the error handler
787     *
788     * @param exchange the exchange
789     * @return <tt>true</tt> if handled already by error handler, <tt>false</tt> otherwise
790     */
791    public static boolean hasExceptionBeenHandledByErrorHandler(Exchange exchange) {
792        return Boolean.TRUE.equals(exchange.getProperty(Exchange.ERRORHANDLER_HANDLED));
793    }
794
795    /**
796     * Extracts the body from the given future, that represents a handle to an asynchronous exchange.
797     * <p/>
798     * Will wait until the future task is complete.
799     *
800     * @param context the camel context
801     * @param future  the future handle
802     * @param type    the expected body response type
803     * @return the result body, can be <tt>null</tt>.
804     * @throws CamelExecutionException is thrown if the processing of the exchange failed
805     */
806    public static <T> T extractFutureBody(CamelContext context, Future<?> future, Class<T> type) {
807        try {
808            return doExtractFutureBody(context, future.get(), type);
809        } catch (InterruptedException e) {
810            throw ObjectHelper.wrapRuntimeCamelException(e);
811        } catch (ExecutionException e) {
812            // execution failed due to an exception so rethrow the cause
813            throw ObjectHelper.wrapCamelExecutionException(null, e.getCause());
814        } finally {
815            // its harmless to cancel if task is already completed
816            // and in any case we do not want to get hold of the task a 2nd time
817            // and its recommended to cancel according to Brian Goetz in his Java Concurrency in Practice book
818            future.cancel(true);
819        }
820    }
821
822    /**
823     * Extracts the body from the given future, that represents a handle to an asynchronous exchange.
824     * <p/>
825     * Will wait for the future task to complete, but waiting at most the timeout value.
826     *
827     * @param context the camel context
828     * @param future  the future handle
829     * @param timeout timeout value
830     * @param unit    timeout unit
831     * @param type    the expected body response type
832     * @return the result body, can be <tt>null</tt>.
833     * @throws CamelExecutionException is thrown if the processing of the exchange failed
834     * @throws java.util.concurrent.TimeoutException is thrown if a timeout triggered
835     */
836    public static <T> T extractFutureBody(CamelContext context, Future<?> future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException {
837        try {
838            if (timeout > 0) {
839                return doExtractFutureBody(context, future.get(timeout, unit), type);
840            } else {
841                return doExtractFutureBody(context, future.get(), type);
842            }
843        } catch (InterruptedException e) {
844            // execution failed due interruption so rethrow the cause
845            throw ObjectHelper.wrapCamelExecutionException(null, e);
846        } catch (ExecutionException e) {
847            // execution failed due to an exception so rethrow the cause
848            throw ObjectHelper.wrapCamelExecutionException(null, e.getCause());
849        } finally {
850            // its harmless to cancel if task is already completed
851            // and in any case we do not want to get hold of the task a 2nd time
852            // and its recommended to cancel according to Brian Goetz in his Java Concurrency in Practice book
853            future.cancel(true);
854        }
855    }
856
857    private static <T> T doExtractFutureBody(CamelContext context, Object result, Class<T> type) {
858        if (result == null) {
859            return null;
860        }
861        if (type.isAssignableFrom(result.getClass())) {
862            return type.cast(result);
863        }
864        if (result instanceof Exchange) {
865            Exchange exchange = (Exchange) result;
866            Object answer = ExchangeHelper.extractResultBody(exchange, exchange.getPattern());
867            return context.getTypeConverter().convertTo(type, exchange, answer);
868        }
869        return context.getTypeConverter().convertTo(type, result);
870    }
871
872    /**
873     * @deprecated use org.apache.camel.CamelExchangeException.createExceptionMessage instead
874     */
875    @Deprecated
876    public static String createExceptionMessage(String message, Exchange exchange, Throwable cause) {
877        return CamelExchangeException.createExceptionMessage(message, exchange, cause);
878    }
879
880    /**
881     * Strategy to prepare results before next iterator or when we are complete,
882     * which is done by copying OUT to IN, so there is only an IN as input
883     * for the next iteration.
884     *
885     * @param exchange the exchange to prepare
886     */
887    public static void prepareOutToIn(Exchange exchange) {
888        // we are routing using pipes and filters so we need to manually copy OUT to IN
889        if (exchange.hasOut()) {
890            exchange.setIn(exchange.getOut());
891            exchange.setOut(null);
892        }
893    }
894
895    /**
896     * Gets both the messageId and exchangeId to be used for logging purposes.
897     * <p/>
898     * Logging both ids, can help to correlate exchanges which may be redelivered messages
899     * from for example a JMS broker.
900     *
901     * @param exchange the exchange
902     * @return a log message with both the messageId and exchangeId
903     */
904    public static String logIds(Exchange exchange) {
905        String msgId = exchange.hasOut() ? exchange.getOut().getMessageId() : exchange.getIn().getMessageId();
906        return "(MessageId: " + msgId + " on ExchangeId: " + exchange.getExchangeId()  + ")";
907    }
908
909    /**
910     * Copies the exchange but the copy will be tied to the given context
911     *
912     * @param exchange  the source exchange
913     * @param context   the camel context
914     * @return a copy with the given camel context
915     */
916    public static Exchange copyExchangeAndSetCamelContext(Exchange exchange, CamelContext context) {
917        return copyExchangeAndSetCamelContext(exchange, context, true);
918    }
919
920    /**
921     * Copies the exchange but the copy will be tied to the given context
922     *
923     * @param exchange  the source exchange
924     * @param context   the camel context
925     * @param handover  whether to handover on completions from the source to the copy
926     * @return a copy with the given camel context
927     */
928    public static Exchange copyExchangeAndSetCamelContext(Exchange exchange, CamelContext context, boolean handover) {
929        DefaultExchange answer = new DefaultExchange(context, exchange.getPattern());
930        if (exchange.hasProperties()) {
931            answer.setProperties(safeCopyProperties(exchange.getProperties()));
932        }
933        if (handover) {
934            // Need to hand over the completion for async invocation
935            exchange.handoverCompletions(answer);
936        }
937        answer.setIn(exchange.getIn().copy());
938        if (exchange.hasOut()) {
939            answer.setOut(exchange.getOut().copy());
940        }
941        answer.setException(exchange.getException());
942        return answer;
943    }
944
945    /**
946     * Replaces the existing message with the new message
947     *
948     * @param exchange  the exchange
949     * @param newMessage the new message
950     * @param outOnly    whether to replace the message as OUT message
951     */
952    public static void replaceMessage(Exchange exchange, Message newMessage, boolean outOnly) {
953        Message old = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
954        if (outOnly || exchange.hasOut()) {
955            exchange.setOut(newMessage);
956        } else {
957            exchange.setIn(newMessage);
958        }
959
960        // need to de-reference old from the exchange so it can be GC
961        if (old instanceof MessageSupport) {
962            ((MessageSupport) old).setExchange(null);
963        }
964    }
965
966    /**
967     * Gets the original IN {@link Message} this Unit of Work was started with.
968     * <p/>
969     * The original message is only returned if the option {@link org.apache.camel.RuntimeConfiguration#isAllowUseOriginalMessage()}
970     * is enabled. If its disabled, then <tt>null</tt> is returned.
971     *
972     * @return the original IN {@link Message}, or <tt>null</tt> if using original message is disabled.
973     */
974    public static Message getOriginalInMessage(Exchange exchange) {
975        Message answer = null;
976
977        // try parent first
978        UnitOfWork uow = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class);
979        if (uow != null) {
980            answer = uow.getOriginalInMessage();
981        }
982        // fallback to the current exchange
983        if (answer == null) {
984            uow = exchange.getUnitOfWork();
985            if (uow != null) {
986                answer = uow.getOriginalInMessage();
987            }
988        }
989        return answer;
990    }
991
992    /**
993     * Resolve the component scheme (aka name) from the given endpoint uri
994     *
995     * @param uri  the endpoint uri
996     * @return     the component scheme (name), or <tt>null</tt> if not possible to resolve
997     */
998    public static String resolveScheme(String uri) {
999        String scheme = null;
1000        if (uri != null) {
1001            // Use the URI prefix to find the component.
1002            String splitURI[] = StringHelper.splitOnCharacter(uri, ":", 2);
1003            if (splitURI[1] != null) {
1004                scheme = splitURI[0];
1005            }
1006        }
1007        return scheme;
1008    }
1009
1010    @SuppressWarnings("unchecked")
1011    private static Map<String, Object> safeCopyProperties(Map<String, Object> properties) {
1012        if (properties == null) {
1013            return null;
1014        }
1015
1016        Map<String, Object> answer = new HashMap<>(properties);
1017
1018        // safe copy message history using a defensive copy
1019        List<MessageHistory> history = (List<MessageHistory>) answer.remove(Exchange.MESSAGE_HISTORY);
1020        if (history != null) {
1021            answer.put(Exchange.MESSAGE_HISTORY, new LinkedList<>(history));
1022        }
1023
1024        return answer;
1025    }
1026}