001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.util; 018 019import java.util.HashMap; 020import java.util.LinkedList; 021import java.util.List; 022import java.util.Map; 023import java.util.concurrent.ConcurrentHashMap; 024import java.util.concurrent.ExecutionException; 025import java.util.concurrent.Future; 026import java.util.concurrent.TimeUnit; 027import java.util.concurrent.TimeoutException; 028import java.util.function.Predicate; 029 030import org.apache.camel.CamelContext; 031import org.apache.camel.CamelExchangeException; 032import org.apache.camel.CamelExecutionException; 033import org.apache.camel.Endpoint; 034import org.apache.camel.Exchange; 035import org.apache.camel.ExchangePattern; 036import org.apache.camel.InvalidPayloadException; 037import org.apache.camel.Message; 038import org.apache.camel.MessageHistory; 039import org.apache.camel.NoSuchBeanException; 040import org.apache.camel.NoSuchEndpointException; 041import org.apache.camel.NoSuchHeaderException; 042import org.apache.camel.NoSuchPropertyException; 043import org.apache.camel.NoTypeConversionAvailableException; 044import org.apache.camel.TypeConversionException; 045import org.apache.camel.TypeConverter; 046import org.apache.camel.impl.DefaultExchange; 047import org.apache.camel.impl.MessageSupport; 048import org.apache.camel.spi.Synchronization; 049import org.apache.camel.spi.UnitOfWork; 050 051/** 052 * Some helper methods for working with {@link Exchange} objects 053 * 054 * @version 055 */ 056public final class ExchangeHelper { 057 058 /** 059 * Utility classes should not have a public constructor. 060 */ 061 private ExchangeHelper() { 062 } 063 064 /** 065 * Extracts the Exchange.BINDING of the given type or null if not present 066 * 067 * @param exchange the message exchange 068 * @param type the expected binding type 069 * @return the binding object of the given type or null if it could not be found or converted 070 */ 071 public static <T> T getBinding(Exchange exchange, Class<T> type) { 072 return exchange != null ? exchange.getProperty(Exchange.BINDING, type) : null; 073 } 074 075 /** 076 * Attempts to resolve the endpoint for the given value 077 * 078 * @param exchange the message exchange being processed 079 * @param value the value which can be an {@link Endpoint} or an object 080 * which provides a String representation of an endpoint via 081 * {@link #toString()} 082 * @return the endpoint 083 * @throws NoSuchEndpointException if the endpoint cannot be resolved 084 */ 085 public static Endpoint resolveEndpoint(Exchange exchange, Object value) throws NoSuchEndpointException { 086 Endpoint endpoint; 087 if (value instanceof Endpoint) { 088 endpoint = (Endpoint) value; 089 } else { 090 String uri = value.toString().trim(); 091 endpoint = CamelContextHelper.getMandatoryEndpoint(exchange.getContext(), uri); 092 } 093 return endpoint; 094 } 095 096 /** 097 * Gets the mandatory property of the exchange of the correct type 098 * 099 * @param exchange the exchange 100 * @param propertyName the property name 101 * @param type the type 102 * @return the property value 103 * @throws TypeConversionException is thrown if error during type conversion 104 * @throws NoSuchPropertyException is thrown if no property exists 105 */ 106 public static <T> T getMandatoryProperty(Exchange exchange, String propertyName, Class<T> type) throws NoSuchPropertyException { 107 T result = exchange.getProperty(propertyName, type); 108 if (result != null) { 109 return result; 110 } 111 throw new NoSuchPropertyException(exchange, propertyName, type); 112 } 113 114 /** 115 * Gets the mandatory inbound header of the correct type 116 * 117 * @param exchange the exchange 118 * @param headerName the header name 119 * @param type the type 120 * @return the header value 121 * @throws TypeConversionException is thrown if error during type conversion 122 * @throws NoSuchHeaderException is thrown if no headers exists 123 */ 124 public static <T> T getMandatoryHeader(Exchange exchange, String headerName, Class<T> type) throws TypeConversionException, NoSuchHeaderException { 125 T answer = exchange.getIn().getHeader(headerName, type); 126 if (answer == null) { 127 throw new NoSuchHeaderException(exchange, headerName, type); 128 } 129 return answer; 130 } 131 132 /** 133 * Returns the mandatory inbound message body of the correct type or throws 134 * an exception if it is not present 135 * 136 * @param exchange the exchange 137 * @return the body, is never <tt>null</tt> 138 * @throws InvalidPayloadException Is thrown if the body being <tt>null</tt> or wrong class type 139 * @deprecated use {@link org.apache.camel.Message#getMandatoryBody()} 140 */ 141 @Deprecated 142 public static Object getMandatoryInBody(Exchange exchange) throws InvalidPayloadException { 143 return exchange.getIn().getMandatoryBody(); 144 } 145 146 /** 147 * Returns the mandatory inbound message body of the correct type or throws 148 * an exception if it is not present 149 * @deprecated use {@link org.apache.camel.Message#getMandatoryBody(Class)} 150 */ 151 @Deprecated 152 public static <T> T getMandatoryInBody(Exchange exchange, Class<T> type) throws InvalidPayloadException { 153 return exchange.getIn().getMandatoryBody(type); 154 } 155 156 /** 157 * Returns the mandatory outbound message body of the correct type or throws 158 * an exception if it is not present 159 * @deprecated use {@link org.apache.camel.Message#getMandatoryBody()} 160 */ 161 @Deprecated 162 public static Object getMandatoryOutBody(Exchange exchange) throws InvalidPayloadException { 163 return exchange.getOut().getMandatoryBody(); 164 } 165 166 /** 167 * Returns the mandatory outbound message body of the correct type or throws 168 * an exception if it is not present 169 * @deprecated use {@link org.apache.camel.Message#getMandatoryBody(Class)} 170 */ 171 @Deprecated 172 public static <T> T getMandatoryOutBody(Exchange exchange, Class<T> type) throws InvalidPayloadException { 173 return exchange.getOut().getMandatoryBody(type); 174 } 175 176 /** 177 * Converts the value to the given expected type or throws an exception 178 * 179 * @return the converted value 180 * @throws TypeConversionException is thrown if error during type conversion 181 * @throws NoTypeConversionAvailableException} if no type converters exists to convert to the given type 182 */ 183 public static <T> T convertToMandatoryType(Exchange exchange, Class<T> type, Object value) 184 throws TypeConversionException, NoTypeConversionAvailableException { 185 CamelContext camelContext = exchange.getContext(); 186 ObjectHelper.notNull(camelContext, "CamelContext of Exchange"); 187 TypeConverter converter = camelContext.getTypeConverter(); 188 if (converter != null) { 189 return converter.mandatoryConvertTo(type, exchange, value); 190 } 191 throw new NoTypeConversionAvailableException(value, type); 192 } 193 194 /** 195 * Converts the value to the given expected type 196 * 197 * @return the converted value 198 * @throws org.apache.camel.TypeConversionException is thrown if error during type conversion 199 */ 200 public static <T> T convertToType(Exchange exchange, Class<T> type, Object value) throws TypeConversionException { 201 CamelContext camelContext = exchange.getContext(); 202 ObjectHelper.notNull(camelContext, "CamelContext of Exchange"); 203 TypeConverter converter = camelContext.getTypeConverter(); 204 if (converter != null) { 205 return converter.convertTo(type, exchange, value); 206 } 207 return null; 208 } 209 210 /** 211 * Creates a new instance and copies from the current message exchange so that it can be 212 * forwarded to another destination as a new instance. Unlike regular copy this operation 213 * will not share the same {@link org.apache.camel.spi.UnitOfWork} so its should be used 214 * for async messaging, where the original and copied exchange are independent. 215 * 216 * @param exchange original copy of the exchange 217 * @param handover whether the on completion callbacks should be handed over to the new copy. 218 */ 219 public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover) { 220 return createCorrelatedCopy(exchange, handover, false); 221 } 222 223 /** 224 * Creates a new instance and copies from the current message exchange so that it can be 225 * forwarded to another destination as a new instance. Unlike regular copy this operation 226 * will not share the same {@link org.apache.camel.spi.UnitOfWork} so its should be used 227 * for async messaging, where the original and copied exchange are independent. 228 * 229 * @param exchange original copy of the exchange 230 * @param handover whether the on completion callbacks should be handed over to the new copy. 231 * @param useSameMessageId whether to use same message id on the copy message. 232 */ 233 public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover, boolean useSameMessageId) { 234 return createCorrelatedCopy(exchange, handover, useSameMessageId, null); 235 } 236 237 /** 238 * Creates a new instance and copies from the current message exchange so that it can be 239 * forwarded to another destination as a new instance. Unlike regular copy this operation 240 * will not share the same {@link org.apache.camel.spi.UnitOfWork} so its should be used 241 * for async messaging, where the original and copied exchange are independent. 242 * 243 * @param exchange original copy of the exchange 244 * @param handover whether the on completion callbacks should be handed over to the new copy. 245 * @param useSameMessageId whether to use same message id on the copy message. 246 * @param filter whether to handover the on completion 247 */ 248 public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover, boolean useSameMessageId, Predicate<Synchronization> filter) { 249 String id = exchange.getExchangeId(); 250 251 // make sure to do a safe copy as the correlated copy can be routed independently of the source. 252 Exchange copy = exchange.copy(true); 253 // do not reuse message id on copy 254 if (!useSameMessageId) { 255 if (copy.hasOut()) { 256 copy.getOut().setMessageId(null); 257 } 258 copy.getIn().setMessageId(null); 259 } 260 // do not share the unit of work 261 copy.setUnitOfWork(null); 262 // do not reuse the message id 263 // hand over on completion to the copy if we got any 264 UnitOfWork uow = exchange.getUnitOfWork(); 265 if (handover && uow != null) { 266 uow.handoverSynchronization(copy, filter); 267 } 268 // set a correlation id so we can track back the original exchange 269 copy.setProperty(Exchange.CORRELATION_ID, id); 270 return copy; 271 } 272 273 /** 274 * Creates a new instance and copies from the current message exchange so that it can be 275 * forwarded to another destination as a new instance. 276 * 277 * @param exchange original copy of the exchange 278 * @param preserveExchangeId whether or not the exchange id should be preserved 279 * @return the copy 280 */ 281 public static Exchange createCopy(Exchange exchange, boolean preserveExchangeId) { 282 Exchange copy = exchange.copy(); 283 if (preserveExchangeId) { 284 // must preserve exchange id 285 copy.setExchangeId(exchange.getExchangeId()); 286 } 287 return copy; 288 } 289 290 /** 291 * Copies the results of a message exchange from the source exchange to the result exchange 292 * which will copy the message contents, exchange properties and the exception. 293 * Notice the {@link ExchangePattern} is <b>not</b> copied/altered. 294 * 295 * @param result the result exchange which will have the output and error state added 296 * @param source the source exchange which is not modified 297 */ 298 public static void copyResults(Exchange result, Exchange source) { 299 300 // -------------------------------------------------------------------- 301 // TODO: merge logic with that of copyResultsPreservePattern() 302 // -------------------------------------------------------------------- 303 304 if (result == source) { 305 // we just need to ensure MEP is as expected (eg copy result to OUT if out capable) 306 // and the result is not failed 307 if (result.getPattern() == ExchangePattern.InOptionalOut) { 308 // keep as is 309 } else if (result.getPattern().isOutCapable() && !result.hasOut() && !result.isFailed()) { 310 // copy IN to OUT as we expect a OUT response 311 result.getOut().copyFrom(source.getIn()); 312 } 313 return; 314 } 315 316 if (result != source) { 317 result.setException(source.getException()); 318 if (source.hasOut()) { 319 result.getOut().copyFrom(source.getOut()); 320 } else if (result.getPattern() == ExchangePattern.InOptionalOut) { 321 // special case where the result is InOptionalOut and with no OUT response 322 // so we should return null to indicate this fact 323 result.setOut(null); 324 } else { 325 // no results so lets copy the last input 326 // as the final processor on a pipeline might not 327 // have created any OUT; such as a mock:endpoint 328 // so lets assume the last IN is the OUT 329 if (result.getPattern().isOutCapable()) { 330 // only set OUT if its OUT capable 331 result.getOut().copyFrom(source.getIn()); 332 } else { 333 // if not replace IN instead to keep the MEP 334 result.getIn().copyFrom(source.getIn()); 335 // clear any existing OUT as the result is on the IN 336 if (result.hasOut()) { 337 result.setOut(null); 338 } 339 } 340 } 341 342 if (source.hasProperties()) { 343 result.getProperties().putAll(source.getProperties()); 344 } 345 } 346 } 347 348 /** 349 * Copies the <code>source</code> exchange to <code>target</code> exchange 350 * preserving the {@link ExchangePattern} of <code>target</code>. 351 * 352 * @param source source exchange. 353 * @param result target exchange. 354 */ 355 public static void copyResultsPreservePattern(Exchange result, Exchange source) { 356 357 // -------------------------------------------------------------------- 358 // TODO: merge logic with that of copyResults() 359 // -------------------------------------------------------------------- 360 361 if (result == source) { 362 // we just need to ensure MEP is as expected (eg copy result to OUT if out capable) 363 // and the result is not failed 364 if (result.getPattern() == ExchangePattern.InOptionalOut) { 365 // keep as is 366 } else if (result.getPattern().isOutCapable() && !result.hasOut() && !result.isFailed()) { 367 // copy IN to OUT as we expect a OUT response 368 result.getOut().copyFrom(source.getIn()); 369 } 370 return; 371 } 372 373 // copy in message 374 result.getIn().copyFrom(source.getIn()); 375 376 // copy out message 377 if (source.hasOut()) { 378 // exchange pattern sensitive 379 Message resultMessage = source.getOut().isFault() ? result.getOut() : getResultMessage(result); 380 resultMessage.copyFrom(source.getOut()); 381 } 382 383 // copy exception 384 result.setException(source.getException()); 385 386 // copy properties 387 if (source.hasProperties()) { 388 result.getProperties().putAll(source.getProperties()); 389 } 390 } 391 392 /** 393 * Returns the message where to write results in an 394 * exchange-pattern-sensitive way. 395 * 396 * @param exchange message exchange. 397 * @return result message. 398 */ 399 public static Message getResultMessage(Exchange exchange) { 400 if (exchange.getPattern().isOutCapable()) { 401 return exchange.getOut(); 402 } else { 403 return exchange.getIn(); 404 } 405 } 406 407 /** 408 * Returns true if the given exchange pattern (if defined) can support OUT messages 409 * 410 * @param exchange the exchange to interrogate 411 * @return true if the exchange is defined as an {@link ExchangePattern} which supports 412 * OUT messages 413 */ 414 public static boolean isOutCapable(Exchange exchange) { 415 ExchangePattern pattern = exchange.getPattern(); 416 return pattern != null && pattern.isOutCapable(); 417 } 418 419 /** 420 * Creates a new instance of the given type from the injector 421 * 422 * @param exchange the exchange 423 * @param type the given type 424 * @return the created instance of the given type 425 */ 426 public static <T> T newInstance(Exchange exchange, Class<T> type) { 427 return exchange.getContext().getInjector().newInstance(type); 428 } 429 430 /** 431 * Creates a Map of the variables which are made available to a script or template 432 * 433 * @param exchange the exchange to make available 434 * @return a Map populated with the require variables 435 */ 436 public static Map<String, Object> createVariableMap(Exchange exchange) { 437 Map<String, Object> answer = new HashMap<String, Object>(); 438 populateVariableMap(exchange, answer); 439 return answer; 440 } 441 442 /** 443 * Populates the Map with the variables which are made available to a script or template 444 * 445 * @param exchange the exchange to make available 446 * @param map the map to populate 447 */ 448 public static void populateVariableMap(Exchange exchange, Map<String, Object> map) { 449 map.put("exchange", exchange); 450 Message in = exchange.getIn(); 451 map.put("in", in); 452 map.put("request", in); 453 map.put("headers", in.getHeaders()); 454 map.put("body", in.getBody()); 455 if (isOutCapable(exchange)) { 456 // if we are out capable then set out and response as well 457 // however only grab OUT if it exists, otherwise reuse IN 458 // this prevents side effects to alter the Exchange if we force creating an OUT message 459 Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn(); 460 map.put("out", msg); 461 map.put("response", msg); 462 } 463 map.put("camelContext", exchange.getContext()); 464 } 465 466 /** 467 * Returns the MIME content type on the input message or null if one is not defined 468 * 469 * @param exchange the exchange 470 * @return the MIME content type 471 */ 472 public static String getContentType(Exchange exchange) { 473 return MessageHelper.getContentType(exchange.getIn()); 474 } 475 476 /** 477 * Returns the MIME content encoding on the input message or null if one is not defined 478 * 479 * @param exchange the exchange 480 * @return the MIME content encoding 481 */ 482 public static String getContentEncoding(Exchange exchange) { 483 return MessageHelper.getContentEncoding(exchange.getIn()); 484 } 485 486 /** 487 * Performs a lookup in the registry of the mandatory bean name and throws an exception if it could not be found 488 * 489 * @param exchange the exchange 490 * @param name the bean name 491 * @return the bean 492 * @throws NoSuchBeanException if no bean could be found in the registry 493 */ 494 public static Object lookupMandatoryBean(Exchange exchange, String name) throws NoSuchBeanException { 495 Object value = lookupBean(exchange, name); 496 if (value == null) { 497 throw new NoSuchBeanException(name); 498 } 499 return value; 500 } 501 502 /** 503 * Performs a lookup in the registry of the mandatory bean name and throws an exception if it could not be found 504 * 505 * @param exchange the exchange 506 * @param name the bean name 507 * @param type the expected bean type 508 * @return the bean 509 * @throws NoSuchBeanException if no bean could be found in the registry 510 */ 511 public static <T> T lookupMandatoryBean(Exchange exchange, String name, Class<T> type) { 512 T value = lookupBean(exchange, name, type); 513 if (value == null) { 514 throw new NoSuchBeanException(name); 515 } 516 return value; 517 } 518 519 /** 520 * Performs a lookup in the registry of the bean name 521 * 522 * @param exchange the exchange 523 * @param name the bean name 524 * @return the bean, or <tt>null</tt> if no bean could be found 525 */ 526 public static Object lookupBean(Exchange exchange, String name) { 527 return exchange.getContext().getRegistry().lookupByName(name); 528 } 529 530 /** 531 * Performs a lookup in the registry of the bean name and type 532 * 533 * @param exchange the exchange 534 * @param name the bean name 535 * @param type the expected bean type 536 * @return the bean, or <tt>null</tt> if no bean could be found 537 */ 538 public static <T> T lookupBean(Exchange exchange, String name, Class<T> type) { 539 return exchange.getContext().getRegistry().lookupByNameAndType(name, type); 540 } 541 542 /** 543 * Returns the first exchange in the given collection of exchanges which has the same exchange ID as the one given 544 * or null if none could be found 545 * 546 * @param exchanges the exchanges 547 * @param exchangeId the exchangeId to find 548 * @return matching exchange, or <tt>null</tt> if none found 549 */ 550 public static Exchange getExchangeById(Iterable<Exchange> exchanges, String exchangeId) { 551 for (Exchange exchange : exchanges) { 552 String id = exchange.getExchangeId(); 553 if (id != null && id.equals(exchangeId)) { 554 return exchange; 555 } 556 } 557 return null; 558 } 559 560 /** 561 * Prepares the exchanges for aggregation. 562 * <p/> 563 * This implementation will copy the OUT body to the IN body so when you do 564 * aggregation the body is <b>only</b> in the IN body to avoid confusing end users. 565 * 566 * @param oldExchange the old exchange 567 * @param newExchange the new exchange 568 */ 569 public static void prepareAggregation(Exchange oldExchange, Exchange newExchange) { 570 // move body/header from OUT to IN 571 if (oldExchange != null) { 572 if (oldExchange.hasOut()) { 573 oldExchange.setIn(oldExchange.getOut()); 574 oldExchange.setOut(null); 575 } 576 } 577 578 if (newExchange != null) { 579 if (newExchange.hasOut()) { 580 newExchange.setIn(newExchange.getOut()); 581 newExchange.setOut(null); 582 } 583 } 584 } 585 586 /** 587 * Checks whether the exchange has been failure handed 588 * 589 * @param exchange the exchange 590 * @return <tt>true</tt> if failure handled, <tt>false</tt> otherwise 591 */ 592 public static boolean isFailureHandled(Exchange exchange) { 593 return exchange.getProperty(Exchange.FAILURE_HANDLED, false, Boolean.class); 594 } 595 596 /** 597 * Checks whether the exchange {@link UnitOfWork} is exhausted 598 * 599 * @param exchange the exchange 600 * @return <tt>true</tt> if exhausted, <tt>false</tt> otherwise 601 */ 602 public static boolean isUnitOfWorkExhausted(Exchange exchange) { 603 return exchange.getProperty(Exchange.UNIT_OF_WORK_EXHAUSTED, false, Boolean.class); 604 } 605 606 /** 607 * Sets the exchange to be failure handled. 608 * 609 * @param exchange the exchange 610 */ 611 public static void setFailureHandled(Exchange exchange) { 612 exchange.setProperty(Exchange.FAILURE_HANDLED, Boolean.TRUE); 613 // clear exception since its failure handled 614 exchange.setException(null); 615 } 616 617 /** 618 * Checks whether the exchange is redelivery exhausted 619 * 620 * @param exchange the exchange 621 * @return <tt>true</tt> if exhausted, <tt>false</tt> otherwise 622 */ 623 public static boolean isRedeliveryExhausted(Exchange exchange) { 624 return exchange.getProperty(Exchange.REDELIVERY_EXHAUSTED, false, Boolean.class); 625 } 626 627 /** 628 * Checks whether the exchange {@link UnitOfWork} is redelivered 629 * 630 * @param exchange the exchange 631 * @return <tt>true</tt> if redelivered, <tt>false</tt> otherwise 632 */ 633 public static boolean isRedelivered(Exchange exchange) { 634 return exchange.getIn().hasHeaders() && exchange.getIn().getHeader(Exchange.REDELIVERED, false, Boolean.class); 635 } 636 637 /** 638 * Checks whether the exchange {@link UnitOfWork} has been interrupted during processing 639 * 640 * @param exchange the exchange 641 * @return <tt>true</tt> if interrupted, <tt>false</tt> otherwise 642 */ 643 public static boolean isInterrupted(Exchange exchange) { 644 return exchange.getException(InterruptedException.class) != null; 645 } 646 647 /** 648 * Check whether or not stream caching is enabled for the given route or globally. 649 * 650 * @param exchange the exchange 651 * @return <tt>true</tt> if enabled, <tt>false</tt> otherwise 652 */ 653 public static boolean isStreamCachingEnabled(final Exchange exchange) { 654 if (exchange.getFromRouteId() == null) { 655 return exchange.getContext().getStreamCachingStrategy().isEnabled(); 656 } else { 657 return exchange.getContext().getRoute(exchange.getFromRouteId()).getRouteContext().isStreamCaching(); 658 } 659 } 660 661 /** 662 * Extracts the body from the given exchange. 663 * <p/> 664 * If the exchange pattern is provided it will try to honor it and retrieve the body 665 * from either IN or OUT according to the pattern. 666 * 667 * @param exchange the exchange 668 * @param pattern exchange pattern if given, can be <tt>null</tt> 669 * @return the result body, can be <tt>null</tt>. 670 * @throws CamelExecutionException is thrown if the processing of the exchange failed 671 */ 672 public static Object extractResultBody(Exchange exchange, ExchangePattern pattern) { 673 Object answer = null; 674 if (exchange != null) { 675 // rethrow if there was an exception during execution 676 if (exchange.getException() != null) { 677 throw ObjectHelper.wrapCamelExecutionException(exchange, exchange.getException()); 678 } 679 680 // result could have a fault message 681 if (hasFaultMessage(exchange)) { 682 Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn(); 683 answer = msg.getBody(); 684 return answer; 685 } 686 687 // okay no fault then return the response according to the pattern 688 // try to honor pattern if provided 689 boolean notOut = pattern != null && !pattern.isOutCapable(); 690 boolean hasOut = exchange.hasOut(); 691 if (hasOut && !notOut) { 692 // we have a response in out and the pattern is out capable 693 answer = exchange.getOut().getBody(); 694 } else if (!hasOut && exchange.getPattern() == ExchangePattern.InOptionalOut) { 695 // special case where the result is InOptionalOut and with no OUT response 696 // so we should return null to indicate this fact 697 answer = null; 698 } else { 699 // use IN as the response 700 answer = exchange.getIn().getBody(); 701 } 702 } 703 return answer; 704 } 705 706 /** 707 * Tests whether the exchange has a fault message set and that its not null. 708 * 709 * @param exchange the exchange 710 * @return <tt>true</tt> if fault message exists 711 */ 712 public static boolean hasFaultMessage(Exchange exchange) { 713 Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn(); 714 return msg.isFault() && msg.getBody() != null; 715 } 716 717 /** 718 * Tests whether the exchange has already been handled by the error handler 719 * 720 * @param exchange the exchange 721 * @return <tt>true</tt> if handled already by error handler, <tt>false</tt> otherwise 722 */ 723 public static boolean hasExceptionBeenHandledByErrorHandler(Exchange exchange) { 724 return Boolean.TRUE.equals(exchange.getProperty(Exchange.ERRORHANDLER_HANDLED)); 725 } 726 727 /** 728 * Extracts the body from the given future, that represents a handle to an asynchronous exchange. 729 * <p/> 730 * Will wait until the future task is complete. 731 * 732 * @param context the camel context 733 * @param future the future handle 734 * @param type the expected body response type 735 * @return the result body, can be <tt>null</tt>. 736 * @throws CamelExecutionException is thrown if the processing of the exchange failed 737 */ 738 public static <T> T extractFutureBody(CamelContext context, Future<?> future, Class<T> type) { 739 try { 740 return doExtractFutureBody(context, future.get(), type); 741 } catch (InterruptedException e) { 742 throw ObjectHelper.wrapRuntimeCamelException(e); 743 } catch (ExecutionException e) { 744 // execution failed due to an exception so rethrow the cause 745 throw ObjectHelper.wrapCamelExecutionException(null, e.getCause()); 746 } finally { 747 // its harmless to cancel if task is already completed 748 // and in any case we do not want to get hold of the task a 2nd time 749 // and its recommended to cancel according to Brian Goetz in his Java Concurrency in Practice book 750 future.cancel(true); 751 } 752 } 753 754 /** 755 * Extracts the body from the given future, that represents a handle to an asynchronous exchange. 756 * <p/> 757 * Will wait for the future task to complete, but waiting at most the timeout value. 758 * 759 * @param context the camel context 760 * @param future the future handle 761 * @param timeout timeout value 762 * @param unit timeout unit 763 * @param type the expected body response type 764 * @return the result body, can be <tt>null</tt>. 765 * @throws CamelExecutionException is thrown if the processing of the exchange failed 766 * @throws java.util.concurrent.TimeoutException is thrown if a timeout triggered 767 */ 768 public static <T> T extractFutureBody(CamelContext context, Future<?> future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException { 769 try { 770 if (timeout > 0) { 771 return doExtractFutureBody(context, future.get(timeout, unit), type); 772 } else { 773 return doExtractFutureBody(context, future.get(), type); 774 } 775 } catch (InterruptedException e) { 776 // execution failed due interruption so rethrow the cause 777 throw ObjectHelper.wrapCamelExecutionException(null, e); 778 } catch (ExecutionException e) { 779 // execution failed due to an exception so rethrow the cause 780 throw ObjectHelper.wrapCamelExecutionException(null, e.getCause()); 781 } finally { 782 // its harmless to cancel if task is already completed 783 // and in any case we do not want to get hold of the task a 2nd time 784 // and its recommended to cancel according to Brian Goetz in his Java Concurrency in Practice book 785 future.cancel(true); 786 } 787 } 788 789 private static <T> T doExtractFutureBody(CamelContext context, Object result, Class<T> type) { 790 if (result == null) { 791 return null; 792 } 793 if (type.isAssignableFrom(result.getClass())) { 794 return type.cast(result); 795 } 796 if (result instanceof Exchange) { 797 Exchange exchange = (Exchange) result; 798 Object answer = ExchangeHelper.extractResultBody(exchange, exchange.getPattern()); 799 return context.getTypeConverter().convertTo(type, exchange, answer); 800 } 801 return context.getTypeConverter().convertTo(type, result); 802 } 803 804 /** 805 * @deprecated use org.apache.camel.CamelExchangeException.createExceptionMessage instead 806 */ 807 @Deprecated 808 public static String createExceptionMessage(String message, Exchange exchange, Throwable cause) { 809 return CamelExchangeException.createExceptionMessage(message, exchange, cause); 810 } 811 812 /** 813 * Strategy to prepare results before next iterator or when we are complete, 814 * which is done by copying OUT to IN, so there is only an IN as input 815 * for the next iteration. 816 * 817 * @param exchange the exchange to prepare 818 */ 819 public static void prepareOutToIn(Exchange exchange) { 820 // we are routing using pipes and filters so we need to manually copy OUT to IN 821 if (exchange.hasOut()) { 822 exchange.setIn(exchange.getOut()); 823 exchange.setOut(null); 824 } 825 } 826 827 /** 828 * Gets both the messageId and exchangeId to be used for logging purposes. 829 * <p/> 830 * Logging both ids, can help to correlate exchanges which may be redelivered messages 831 * from for example a JMS broker. 832 * 833 * @param exchange the exchange 834 * @return a log message with both the messageId and exchangeId 835 */ 836 public static String logIds(Exchange exchange) { 837 String msgId = exchange.hasOut() ? exchange.getOut().getMessageId() : exchange.getIn().getMessageId(); 838 return "(MessageId: " + msgId + " on ExchangeId: " + exchange.getExchangeId() + ")"; 839 } 840 841 /** 842 * Copies the exchange but the copy will be tied to the given context 843 * 844 * @param exchange the source exchange 845 * @param context the camel context 846 * @return a copy with the given camel context 847 */ 848 public static Exchange copyExchangeAndSetCamelContext(Exchange exchange, CamelContext context) { 849 return copyExchangeAndSetCamelContext(exchange, context, true); 850 } 851 852 /** 853 * Copies the exchange but the copy will be tied to the given context 854 * 855 * @param exchange the source exchange 856 * @param context the camel context 857 * @param handover whether to handover on completions from the source to the copy 858 * @return a copy with the given camel context 859 */ 860 public static Exchange copyExchangeAndSetCamelContext(Exchange exchange, CamelContext context, boolean handover) { 861 DefaultExchange answer = new DefaultExchange(context, exchange.getPattern()); 862 if (exchange.hasProperties()) { 863 answer.setProperties(safeCopy(exchange.getProperties())); 864 } 865 if (handover) { 866 // Need to hand over the completion for async invocation 867 exchange.handoverCompletions(answer); 868 } 869 answer.setIn(exchange.getIn().copy()); 870 if (exchange.hasOut()) { 871 answer.setOut(exchange.getOut().copy()); 872 } 873 answer.setException(exchange.getException()); 874 return answer; 875 } 876 877 /** 878 * Replaces the existing message with the new message 879 * 880 * @param exchange the exchange 881 * @param newMessage the new message 882 * @param outOnly whether to replace the message as OUT message 883 */ 884 public static void replaceMessage(Exchange exchange, Message newMessage, boolean outOnly) { 885 Message old = exchange.hasOut() ? exchange.getOut() : exchange.getIn(); 886 if (outOnly || exchange.hasOut()) { 887 exchange.setOut(newMessage); 888 } else { 889 exchange.setIn(newMessage); 890 } 891 892 // need to de-reference old from the exchange so it can be GC 893 if (old instanceof MessageSupport) { 894 ((MessageSupport) old).setExchange(null); 895 } 896 } 897 898 /** 899 * Gets the original IN {@link Message} this Unit of Work was started with. 900 * <p/> 901 * The original message is only returned if the option {@link org.apache.camel.RuntimeConfiguration#isAllowUseOriginalMessage()} 902 * is enabled. If its disabled, then <tt>null</tt> is returned. 903 * 904 * @return the original IN {@link Message}, or <tt>null</tt> if using original message is disabled. 905 */ 906 public static Message getOriginalInMessage(Exchange exchange) { 907 Message answer = null; 908 909 // try parent first 910 UnitOfWork uow = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class); 911 if (uow != null) { 912 answer = uow.getOriginalInMessage(); 913 } 914 // fallback to the current exchange 915 if (answer == null) { 916 uow = exchange.getUnitOfWork(); 917 if (uow != null) { 918 answer = uow.getOriginalInMessage(); 919 } 920 } 921 return answer; 922 } 923 924 @SuppressWarnings("unchecked") 925 private static Map<String, Object> safeCopy(Map<String, Object> properties) { 926 if (properties == null) { 927 return null; 928 } 929 930 Map<String, Object> answer = new ConcurrentHashMap<String, Object>(properties); 931 932 // safe copy message history using a defensive copy 933 List<MessageHistory> history = (List<MessageHistory>) answer.remove(Exchange.MESSAGE_HISTORY); 934 if (history != null) { 935 answer.put(Exchange.MESSAGE_HISTORY, new LinkedList<>(history)); 936 } 937 938 return answer; 939 } 940}