001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.util; 018 019import java.util.ArrayList; 020import java.util.HashMap; 021import java.util.List; 022import java.util.Map; 023import java.util.concurrent.ConcurrentHashMap; 024import java.util.concurrent.ExecutionException; 025import java.util.concurrent.Future; 026import java.util.concurrent.TimeUnit; 027import java.util.concurrent.TimeoutException; 028 029import org.apache.camel.CamelContext; 030import org.apache.camel.CamelExchangeException; 031import org.apache.camel.CamelExecutionException; 032import org.apache.camel.Endpoint; 033import org.apache.camel.Exchange; 034import org.apache.camel.ExchangePattern; 035import org.apache.camel.InvalidPayloadException; 036import org.apache.camel.Message; 037import org.apache.camel.MessageHistory; 038import org.apache.camel.NoSuchBeanException; 039import org.apache.camel.NoSuchEndpointException; 040import org.apache.camel.NoSuchHeaderException; 041import org.apache.camel.NoSuchPropertyException; 042import org.apache.camel.NoTypeConversionAvailableException; 043import org.apache.camel.TypeConversionException; 044import org.apache.camel.TypeConverter; 045import org.apache.camel.impl.DefaultExchange; 046import org.apache.camel.impl.MessageSupport; 047import org.apache.camel.spi.UnitOfWork; 048 049/** 050 * Some helper methods for working with {@link Exchange} objects 051 * 052 * @version 053 */ 054public final class ExchangeHelper { 055 056 /** 057 * Utility classes should not have a public constructor. 058 */ 059 private ExchangeHelper() { 060 } 061 062 /** 063 * Extracts the Exchange.BINDING of the given type or null if not present 064 * 065 * @param exchange the message exchange 066 * @param type the expected binding type 067 * @return the binding object of the given type or null if it could not be found or converted 068 */ 069 public static <T> T getBinding(Exchange exchange, Class<T> type) { 070 return exchange != null ? exchange.getProperty(Exchange.BINDING, type) : null; 071 } 072 073 /** 074 * Attempts to resolve the endpoint for the given value 075 * 076 * @param exchange the message exchange being processed 077 * @param value the value which can be an {@link Endpoint} or an object 078 * which provides a String representation of an endpoint via 079 * {@link #toString()} 080 * @return the endpoint 081 * @throws NoSuchEndpointException if the endpoint cannot be resolved 082 */ 083 public static Endpoint resolveEndpoint(Exchange exchange, Object value) throws NoSuchEndpointException { 084 Endpoint endpoint; 085 if (value instanceof Endpoint) { 086 endpoint = (Endpoint) value; 087 } else { 088 String uri = value.toString().trim(); 089 endpoint = CamelContextHelper.getMandatoryEndpoint(exchange.getContext(), uri); 090 } 091 return endpoint; 092 } 093 094 /** 095 * Gets the mandatory property of the exchange of the correct type 096 * 097 * @param exchange the exchange 098 * @param propertyName the property name 099 * @param type the type 100 * @return the property value 101 * @throws TypeConversionException is thrown if error during type conversion 102 * @throws NoSuchPropertyException is thrown if no property exists 103 */ 104 public static <T> T getMandatoryProperty(Exchange exchange, String propertyName, Class<T> type) throws NoSuchPropertyException { 105 T result = exchange.getProperty(propertyName, type); 106 if (result != null) { 107 return result; 108 } 109 throw new NoSuchPropertyException(exchange, propertyName, type); 110 } 111 112 /** 113 * Gets the mandatory inbound header of the correct type 114 * 115 * @param exchange the exchange 116 * @param headerName the header name 117 * @param type the type 118 * @return the header value 119 * @throws TypeConversionException is thrown if error during type conversion 120 * @throws NoSuchHeaderException is thrown if no headers exists 121 */ 122 public static <T> T getMandatoryHeader(Exchange exchange, String headerName, Class<T> type) throws TypeConversionException, NoSuchHeaderException { 123 T answer = exchange.getIn().getHeader(headerName, type); 124 if (answer == null) { 125 throw new NoSuchHeaderException(exchange, headerName, type); 126 } 127 return answer; 128 } 129 130 /** 131 * Returns the mandatory inbound message body of the correct type or throws 132 * an exception if it is not present 133 * 134 * @param exchange the exchange 135 * @return the body, is never <tt>null</tt> 136 * @throws InvalidPayloadException Is thrown if the body being <tt>null</tt> or wrong class type 137 * @deprecated use {@link org.apache.camel.Message#getMandatoryBody()} 138 */ 139 @Deprecated 140 public static Object getMandatoryInBody(Exchange exchange) throws InvalidPayloadException { 141 return exchange.getIn().getMandatoryBody(); 142 } 143 144 /** 145 * Returns the mandatory inbound message body of the correct type or throws 146 * an exception if it is not present 147 * @deprecated use {@link org.apache.camel.Message#getMandatoryBody(Class)} 148 */ 149 @Deprecated 150 public static <T> T getMandatoryInBody(Exchange exchange, Class<T> type) throws InvalidPayloadException { 151 return exchange.getIn().getMandatoryBody(type); 152 } 153 154 /** 155 * Returns the mandatory outbound message body of the correct type or throws 156 * an exception if it is not present 157 * @deprecated use {@link org.apache.camel.Message#getMandatoryBody()} 158 */ 159 @Deprecated 160 public static Object getMandatoryOutBody(Exchange exchange) throws InvalidPayloadException { 161 return exchange.getOut().getMandatoryBody(); 162 } 163 164 /** 165 * Returns the mandatory outbound message body of the correct type or throws 166 * an exception if it is not present 167 * @deprecated use {@link org.apache.camel.Message#getMandatoryBody(Class)} 168 */ 169 @Deprecated 170 public static <T> T getMandatoryOutBody(Exchange exchange, Class<T> type) throws InvalidPayloadException { 171 return exchange.getOut().getMandatoryBody(type); 172 } 173 174 /** 175 * Converts the value to the given expected type or throws an exception 176 * 177 * @return the converted value 178 * @throws TypeConversionException is thrown if error during type conversion 179 * @throws NoTypeConversionAvailableException} if no type converters exists to convert to the given type 180 */ 181 public static <T> T convertToMandatoryType(Exchange exchange, Class<T> type, Object value) 182 throws TypeConversionException, NoTypeConversionAvailableException { 183 CamelContext camelContext = exchange.getContext(); 184 ObjectHelper.notNull(camelContext, "CamelContext of Exchange"); 185 TypeConverter converter = camelContext.getTypeConverter(); 186 if (converter != null) { 187 return converter.mandatoryConvertTo(type, exchange, value); 188 } 189 throw new NoTypeConversionAvailableException(value, type); 190 } 191 192 /** 193 * Converts the value to the given expected type 194 * 195 * @return the converted value 196 * @throws org.apache.camel.TypeConversionException is thrown if error during type conversion 197 */ 198 public static <T> T convertToType(Exchange exchange, Class<T> type, Object value) throws TypeConversionException { 199 CamelContext camelContext = exchange.getContext(); 200 ObjectHelper.notNull(camelContext, "CamelContext of Exchange"); 201 TypeConverter converter = camelContext.getTypeConverter(); 202 if (converter != null) { 203 return converter.convertTo(type, exchange, value); 204 } 205 return null; 206 } 207 208 /** 209 * Creates a new instance and copies from the current message exchange so that it can be 210 * forwarded to another destination as a new instance. Unlike regular copy this operation 211 * will not share the same {@link org.apache.camel.spi.UnitOfWork} so its should be used 212 * for async messaging, where the original and copied exchange are independent. 213 * 214 * @param exchange original copy of the exchange 215 * @param handover whether the on completion callbacks should be handed over to the new copy. 216 */ 217 public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover) { 218 return createCorrelatedCopy(exchange, handover, false); 219 } 220 221 /** 222 * Creates a new instance and copies from the current message exchange so that it can be 223 * forwarded to another destination as a new instance. Unlike regular copy this operation 224 * will not share the same {@link org.apache.camel.spi.UnitOfWork} so its should be used 225 * for async messaging, where the original and copied exchange are independent. 226 * 227 * @param exchange original copy of the exchange 228 * @param handover whether the on completion callbacks should be handed over to the new copy. 229 * @param useSameMessageId whether to use same message id on the copy message. 230 */ 231 public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover, boolean useSameMessageId) { 232 String id = exchange.getExchangeId(); 233 234 // make sure to do a safe copy as the correlated copy can be routed independently of the source. 235 Exchange copy = exchange.copy(true); 236 // do not reuse message id on copy 237 if (!useSameMessageId) { 238 if (copy.hasOut()) { 239 copy.getOut().setMessageId(null); 240 } 241 copy.getIn().setMessageId(null); 242 } 243 // do not share the unit of work 244 copy.setUnitOfWork(null); 245 // do not reuse the message id 246 // hand over on completion to the copy if we got any 247 UnitOfWork uow = exchange.getUnitOfWork(); 248 if (handover && uow != null) { 249 uow.handoverSynchronization(copy); 250 } 251 // set a correlation id so we can track back the original exchange 252 copy.setProperty(Exchange.CORRELATION_ID, id); 253 return copy; 254 } 255 256 /** 257 * Creates a new instance and copies from the current message exchange so that it can be 258 * forwarded to another destination as a new instance. 259 * 260 * @param exchange original copy of the exchange 261 * @param preserveExchangeId whether or not the exchange id should be preserved 262 * @return the copy 263 */ 264 public static Exchange createCopy(Exchange exchange, boolean preserveExchangeId) { 265 Exchange copy = exchange.copy(); 266 if (preserveExchangeId) { 267 // must preserve exchange id 268 copy.setExchangeId(exchange.getExchangeId()); 269 } 270 return copy; 271 } 272 273 /** 274 * Copies the results of a message exchange from the source exchange to the result exchange 275 * which will copy the message contents, exchange properties and the exception. 276 * Notice the {@link ExchangePattern} is <b>not</b> copied/altered. 277 * 278 * @param result the result exchange which will have the output and error state added 279 * @param source the source exchange which is not modified 280 */ 281 public static void copyResults(Exchange result, Exchange source) { 282 283 // -------------------------------------------------------------------- 284 // TODO: merge logic with that of copyResultsPreservePattern() 285 // -------------------------------------------------------------------- 286 287 if (result == source) { 288 // we just need to ensure MEP is as expected (eg copy result to OUT if out capable) 289 // and the result is not failed 290 if (result.getPattern() == ExchangePattern.InOptionalOut) { 291 // keep as is 292 } else if (result.getPattern().isOutCapable() && !result.hasOut() && !result.isFailed()) { 293 // copy IN to OUT as we expect a OUT response 294 result.getOut().copyFrom(source.getIn()); 295 } 296 return; 297 } 298 299 if (result != source) { 300 result.setException(source.getException()); 301 if (source.hasOut()) { 302 result.getOut().copyFrom(source.getOut()); 303 } else if (result.getPattern() == ExchangePattern.InOptionalOut) { 304 // special case where the result is InOptionalOut and with no OUT response 305 // so we should return null to indicate this fact 306 result.setOut(null); 307 } else { 308 // no results so lets copy the last input 309 // as the final processor on a pipeline might not 310 // have created any OUT; such as a mock:endpoint 311 // so lets assume the last IN is the OUT 312 if (result.getPattern().isOutCapable()) { 313 // only set OUT if its OUT capable 314 result.getOut().copyFrom(source.getIn()); 315 } else { 316 // if not replace IN instead to keep the MEP 317 result.getIn().copyFrom(source.getIn()); 318 // clear any existing OUT as the result is on the IN 319 if (result.hasOut()) { 320 result.setOut(null); 321 } 322 } 323 } 324 325 if (source.hasProperties()) { 326 result.getProperties().putAll(source.getProperties()); 327 } 328 } 329 } 330 331 /** 332 * Copies the <code>source</code> exchange to <code>target</code> exchange 333 * preserving the {@link ExchangePattern} of <code>target</code>. 334 * 335 * @param source source exchange. 336 * @param result target exchange. 337 */ 338 public static void copyResultsPreservePattern(Exchange result, Exchange source) { 339 340 // -------------------------------------------------------------------- 341 // TODO: merge logic with that of copyResults() 342 // -------------------------------------------------------------------- 343 344 if (result == source) { 345 // we just need to ensure MEP is as expected (eg copy result to OUT if out capable) 346 // and the result is not failed 347 if (result.getPattern() == ExchangePattern.InOptionalOut) { 348 // keep as is 349 } else if (result.getPattern().isOutCapable() && !result.hasOut() && !result.isFailed()) { 350 // copy IN to OUT as we expect a OUT response 351 result.getOut().copyFrom(source.getIn()); 352 } 353 return; 354 } 355 356 // copy in message 357 result.getIn().copyFrom(source.getIn()); 358 359 // copy out message 360 if (source.hasOut()) { 361 // exchange pattern sensitive 362 Message resultMessage = source.getOut().isFault() ? result.getOut() : getResultMessage(result); 363 resultMessage.copyFrom(source.getOut()); 364 } 365 366 // copy exception 367 result.setException(source.getException()); 368 369 // copy properties 370 if (source.hasProperties()) { 371 result.getProperties().putAll(source.getProperties()); 372 } 373 } 374 375 /** 376 * Returns the message where to write results in an 377 * exchange-pattern-sensitive way. 378 * 379 * @param exchange message exchange. 380 * @return result message. 381 */ 382 public static Message getResultMessage(Exchange exchange) { 383 if (exchange.getPattern().isOutCapable()) { 384 return exchange.getOut(); 385 } else { 386 return exchange.getIn(); 387 } 388 } 389 390 /** 391 * Returns true if the given exchange pattern (if defined) can support OUT messages 392 * 393 * @param exchange the exchange to interrogate 394 * @return true if the exchange is defined as an {@link ExchangePattern} which supports 395 * OUT messages 396 */ 397 public static boolean isOutCapable(Exchange exchange) { 398 ExchangePattern pattern = exchange.getPattern(); 399 return pattern != null && pattern.isOutCapable(); 400 } 401 402 /** 403 * Creates a new instance of the given type from the injector 404 * 405 * @param exchange the exchange 406 * @param type the given type 407 * @return the created instance of the given type 408 */ 409 public static <T> T newInstance(Exchange exchange, Class<T> type) { 410 return exchange.getContext().getInjector().newInstance(type); 411 } 412 413 /** 414 * Creates a Map of the variables which are made available to a script or template 415 * 416 * @param exchange the exchange to make available 417 * @return a Map populated with the require variables 418 */ 419 @Deprecated 420 public static Map<String, Object> createVariableMap(Exchange exchange) { 421 Map<String, Object> answer = new HashMap<String, Object>(); 422 populateVariableMap(exchange, answer, true); 423 return answer; 424 } 425 426 /** 427 * Creates a Map of the variables which are made available to a script or template 428 * 429 * @param exchange the exchange to make available 430 * @param allowContextMapAll whether to allow access to all context map or not 431 * (prefer to use false due to security reasons preferred to only allow access to body/headers) 432 * @return a Map populated with the require variables 433 */ 434 public static Map<String, Object> createVariableMap(Exchange exchange, boolean allowContextMapAll) { 435 Map<String, Object> answer = new HashMap<>(); 436 populateVariableMap(exchange, answer, allowContextMapAll); 437 return answer; 438 } 439 440 /** 441 * Populates the Map with the variables which are made available to a script or template 442 * 443 * @param exchange the exchange to make available 444 * @param map the map to populate 445 */ 446 @Deprecated 447 public static void populateVariableMap(Exchange exchange, Map<String, Object> map) { 448 populateVariableMap(exchange, map, true); 449 } 450 451 /** 452 * Populates the Map with the variables which are made available to a script or template 453 * 454 * @param exchange the exchange to make available 455 * @param map the map to populate 456 * @param allowContextMapAll whether to allow access to all context map or not 457 * (prefer to use false due to security reasons preferred to only allow access to body/headers) 458 */ 459 public static void populateVariableMap(Exchange exchange, Map<String, Object> map, boolean allowContextMapAll) { 460 Message in = exchange.getIn(); 461 map.put("headers", in.getHeaders()); 462 map.put("body", in.getBody()); 463 if (allowContextMapAll) { 464 map.put("in", in); 465 map.put("exchange", exchange); 466 map.put("request", in); 467 if (isOutCapable(exchange)) { 468 // if we are out capable then set out and response as well 469 // however only grab OUT if it exists, otherwise reuse IN 470 // this prevents side effects to alter the Exchange if we force creating an OUT message 471 Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn(); 472 map.put("out", msg); 473 map.put("response", msg); 474 } 475 map.put("camelContext", exchange.getContext()); 476 } 477 } 478 479 /** 480 * Returns the MIME content type on the input message or null if one is not defined 481 * 482 * @param exchange the exchange 483 * @return the MIME content type 484 */ 485 public static String getContentType(Exchange exchange) { 486 return MessageHelper.getContentType(exchange.getIn()); 487 } 488 489 /** 490 * Returns the MIME content encoding on the input message or null if one is not defined 491 * 492 * @param exchange the exchange 493 * @return the MIME content encoding 494 */ 495 public static String getContentEncoding(Exchange exchange) { 496 return MessageHelper.getContentEncoding(exchange.getIn()); 497 } 498 499 /** 500 * Performs a lookup in the registry of the mandatory bean name and throws an exception if it could not be found 501 * 502 * @param exchange the exchange 503 * @param name the bean name 504 * @return the bean 505 * @throws NoSuchBeanException if no bean could be found in the registry 506 */ 507 public static Object lookupMandatoryBean(Exchange exchange, String name) throws NoSuchBeanException { 508 Object value = lookupBean(exchange, name); 509 if (value == null) { 510 throw new NoSuchBeanException(name); 511 } 512 return value; 513 } 514 515 /** 516 * Performs a lookup in the registry of the mandatory bean name and throws an exception if it could not be found 517 * 518 * @param exchange the exchange 519 * @param name the bean name 520 * @param type the expected bean type 521 * @return the bean 522 * @throws NoSuchBeanException if no bean could be found in the registry 523 */ 524 public static <T> T lookupMandatoryBean(Exchange exchange, String name, Class<T> type) { 525 T value = lookupBean(exchange, name, type); 526 if (value == null) { 527 throw new NoSuchBeanException(name); 528 } 529 return value; 530 } 531 532 /** 533 * Performs a lookup in the registry of the bean name 534 * 535 * @param exchange the exchange 536 * @param name the bean name 537 * @return the bean, or <tt>null</tt> if no bean could be found 538 */ 539 public static Object lookupBean(Exchange exchange, String name) { 540 return exchange.getContext().getRegistry().lookupByName(name); 541 } 542 543 /** 544 * Performs a lookup in the registry of the bean name and type 545 * 546 * @param exchange the exchange 547 * @param name the bean name 548 * @param type the expected bean type 549 * @return the bean, or <tt>null</tt> if no bean could be found 550 */ 551 public static <T> T lookupBean(Exchange exchange, String name, Class<T> type) { 552 return exchange.getContext().getRegistry().lookupByNameAndType(name, type); 553 } 554 555 /** 556 * Returns the first exchange in the given collection of exchanges which has the same exchange ID as the one given 557 * or null if none could be found 558 * 559 * @param exchanges the exchanges 560 * @param exchangeId the exchangeId to find 561 * @return matching exchange, or <tt>null</tt> if none found 562 */ 563 public static Exchange getExchangeById(Iterable<Exchange> exchanges, String exchangeId) { 564 for (Exchange exchange : exchanges) { 565 String id = exchange.getExchangeId(); 566 if (id != null && id.equals(exchangeId)) { 567 return exchange; 568 } 569 } 570 return null; 571 } 572 573 /** 574 * Prepares the exchanges for aggregation. 575 * <p/> 576 * This implementation will copy the OUT body to the IN body so when you do 577 * aggregation the body is <b>only</b> in the IN body to avoid confusing end users. 578 * 579 * @param oldExchange the old exchange 580 * @param newExchange the new exchange 581 */ 582 public static void prepareAggregation(Exchange oldExchange, Exchange newExchange) { 583 // move body/header from OUT to IN 584 if (oldExchange != null) { 585 if (oldExchange.hasOut()) { 586 oldExchange.setIn(oldExchange.getOut()); 587 oldExchange.setOut(null); 588 } 589 } 590 591 if (newExchange != null) { 592 if (newExchange.hasOut()) { 593 newExchange.setIn(newExchange.getOut()); 594 newExchange.setOut(null); 595 } 596 } 597 } 598 599 /** 600 * Checks whether the exchange has been failure handed 601 * 602 * @param exchange the exchange 603 * @return <tt>true</tt> if failure handled, <tt>false</tt> otherwise 604 */ 605 public static boolean isFailureHandled(Exchange exchange) { 606 return exchange.getProperty(Exchange.FAILURE_HANDLED, false, Boolean.class); 607 } 608 609 /** 610 * Checks whether the exchange {@link UnitOfWork} is exhausted 611 * 612 * @param exchange the exchange 613 * @return <tt>true</tt> if exhausted, <tt>false</tt> otherwise 614 */ 615 public static boolean isUnitOfWorkExhausted(Exchange exchange) { 616 return exchange.getProperty(Exchange.UNIT_OF_WORK_EXHAUSTED, false, Boolean.class); 617 } 618 619 /** 620 * Sets the exchange to be failure handled. 621 * 622 * @param exchange the exchange 623 */ 624 public static void setFailureHandled(Exchange exchange) { 625 exchange.setProperty(Exchange.FAILURE_HANDLED, Boolean.TRUE); 626 // clear exception since its failure handled 627 exchange.setException(null); 628 } 629 630 /** 631 * Checks whether the exchange is redelivery exhausted 632 * 633 * @param exchange the exchange 634 * @return <tt>true</tt> if exhausted, <tt>false</tt> otherwise 635 */ 636 public static boolean isRedeliveryExhausted(Exchange exchange) { 637 return exchange.getProperty(Exchange.REDELIVERY_EXHAUSTED, false, Boolean.class); 638 } 639 640 /** 641 * Checks whether the exchange {@link UnitOfWork} is redelivered 642 * 643 * @param exchange the exchange 644 * @return <tt>true</tt> if redelivered, <tt>false</tt> otherwise 645 */ 646 public static boolean isRedelivered(Exchange exchange) { 647 return exchange.getIn().hasHeaders() && exchange.getIn().getHeader(Exchange.REDELIVERED, false, Boolean.class); 648 } 649 650 /** 651 * Checks whether the exchange {@link UnitOfWork} has been interrupted during processing 652 * 653 * @param exchange the exchange 654 * @return <tt>true</tt> if interrupted, <tt>false</tt> otherwise 655 */ 656 public static boolean isInterrupted(Exchange exchange) { 657 return exchange.getException(InterruptedException.class) != null; 658 } 659 660 /** 661 * Check whether or not stream caching is enabled for the given route or globally. 662 * 663 * @param exchange the exchange 664 * @return <tt>true</tt> if enabled, <tt>false</tt> otherwise 665 */ 666 public static boolean isStreamCachingEnabled(final Exchange exchange) { 667 if (exchange.getFromRouteId() == null) { 668 return exchange.getContext().getStreamCachingStrategy().isEnabled(); 669 } else { 670 return exchange.getContext().getRoute(exchange.getFromRouteId()).getRouteContext().isStreamCaching(); 671 } 672 } 673 674 /** 675 * Extracts the body from the given exchange. 676 * <p/> 677 * If the exchange pattern is provided it will try to honor it and retrieve the body 678 * from either IN or OUT according to the pattern. 679 * 680 * @param exchange the exchange 681 * @param pattern exchange pattern if given, can be <tt>null</tt> 682 * @return the result body, can be <tt>null</tt>. 683 * @throws CamelExecutionException is thrown if the processing of the exchange failed 684 */ 685 public static Object extractResultBody(Exchange exchange, ExchangePattern pattern) { 686 Object answer = null; 687 if (exchange != null) { 688 // rethrow if there was an exception during execution 689 if (exchange.getException() != null) { 690 throw ObjectHelper.wrapCamelExecutionException(exchange, exchange.getException()); 691 } 692 693 // result could have a fault message 694 if (hasFaultMessage(exchange)) { 695 Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn(); 696 answer = msg.getBody(); 697 return answer; 698 } 699 700 // okay no fault then return the response according to the pattern 701 // try to honor pattern if provided 702 boolean notOut = pattern != null && !pattern.isOutCapable(); 703 boolean hasOut = exchange.hasOut(); 704 if (hasOut && !notOut) { 705 // we have a response in out and the pattern is out capable 706 answer = exchange.getOut().getBody(); 707 } else if (!hasOut && exchange.getPattern() == ExchangePattern.InOptionalOut) { 708 // special case where the result is InOptionalOut and with no OUT response 709 // so we should return null to indicate this fact 710 answer = null; 711 } else { 712 // use IN as the response 713 answer = exchange.getIn().getBody(); 714 } 715 } 716 return answer; 717 } 718 719 /** 720 * Tests whether the exchange has a fault message set and that its not null. 721 * 722 * @param exchange the exchange 723 * @return <tt>true</tt> if fault message exists 724 */ 725 public static boolean hasFaultMessage(Exchange exchange) { 726 Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn(); 727 return msg.isFault() && msg.getBody() != null; 728 } 729 730 /** 731 * Tests whether the exchange has already been handled by the error handler 732 * 733 * @param exchange the exchange 734 * @return <tt>true</tt> if handled already by error handler, <tt>false</tt> otherwise 735 */ 736 public static boolean hasExceptionBeenHandledByErrorHandler(Exchange exchange) { 737 return Boolean.TRUE.equals(exchange.getProperty(Exchange.ERRORHANDLER_HANDLED)); 738 } 739 740 /** 741 * Extracts the body from the given future, that represents a handle to an asynchronous exchange. 742 * <p/> 743 * Will wait until the future task is complete. 744 * 745 * @param context the camel context 746 * @param future the future handle 747 * @param type the expected body response type 748 * @return the result body, can be <tt>null</tt>. 749 * @throws CamelExecutionException is thrown if the processing of the exchange failed 750 */ 751 public static <T> T extractFutureBody(CamelContext context, Future<Object> future, Class<T> type) { 752 try { 753 return doExtractFutureBody(context, future.get(), type); 754 } catch (InterruptedException e) { 755 throw ObjectHelper.wrapRuntimeCamelException(e); 756 } catch (ExecutionException e) { 757 // execution failed due to an exception so rethrow the cause 758 throw ObjectHelper.wrapCamelExecutionException(null, e.getCause()); 759 } finally { 760 // its harmless to cancel if task is already completed 761 // and in any case we do not want to get hold of the task a 2nd time 762 // and its recommended to cancel according to Brian Goetz in his Java Concurrency in Practice book 763 future.cancel(true); 764 } 765 } 766 767 /** 768 * Extracts the body from the given future, that represents a handle to an asynchronous exchange. 769 * <p/> 770 * Will wait for the future task to complete, but waiting at most the timeout value. 771 * 772 * @param context the camel context 773 * @param future the future handle 774 * @param timeout timeout value 775 * @param unit timeout unit 776 * @param type the expected body response type 777 * @return the result body, can be <tt>null</tt>. 778 * @throws CamelExecutionException is thrown if the processing of the exchange failed 779 * @throws java.util.concurrent.TimeoutException is thrown if a timeout triggered 780 */ 781 public static <T> T extractFutureBody(CamelContext context, Future<Object> future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException { 782 try { 783 if (timeout > 0) { 784 return doExtractFutureBody(context, future.get(timeout, unit), type); 785 } else { 786 return doExtractFutureBody(context, future.get(), type); 787 } 788 } catch (InterruptedException e) { 789 // execution failed due interruption so rethrow the cause 790 throw ObjectHelper.wrapCamelExecutionException(null, e); 791 } catch (ExecutionException e) { 792 // execution failed due to an exception so rethrow the cause 793 throw ObjectHelper.wrapCamelExecutionException(null, e.getCause()); 794 } finally { 795 // its harmless to cancel if task is already completed 796 // and in any case we do not want to get hold of the task a 2nd time 797 // and its recommended to cancel according to Brian Goetz in his Java Concurrency in Practice book 798 future.cancel(true); 799 } 800 } 801 802 private static <T> T doExtractFutureBody(CamelContext context, Object result, Class<T> type) { 803 if (result == null) { 804 return null; 805 } 806 if (type.isAssignableFrom(result.getClass())) { 807 return type.cast(result); 808 } 809 if (result instanceof Exchange) { 810 Exchange exchange = (Exchange) result; 811 Object answer = ExchangeHelper.extractResultBody(exchange, exchange.getPattern()); 812 return context.getTypeConverter().convertTo(type, exchange, answer); 813 } 814 return context.getTypeConverter().convertTo(type, result); 815 } 816 817 /** 818 * @deprecated use org.apache.camel.CamelExchangeException.createExceptionMessage instead 819 */ 820 @Deprecated 821 public static String createExceptionMessage(String message, Exchange exchange, Throwable cause) { 822 return CamelExchangeException.createExceptionMessage(message, exchange, cause); 823 } 824 825 /** 826 * Strategy to prepare results before next iterator or when we are complete, 827 * which is done by copying OUT to IN, so there is only an IN as input 828 * for the next iteration. 829 * 830 * @param exchange the exchange to prepare 831 */ 832 public static void prepareOutToIn(Exchange exchange) { 833 // we are routing using pipes and filters so we need to manually copy OUT to IN 834 if (exchange.hasOut()) { 835 exchange.setIn(exchange.getOut()); 836 exchange.setOut(null); 837 } 838 } 839 840 /** 841 * Gets both the messageId and exchangeId to be used for logging purposes. 842 * <p/> 843 * Logging both ids, can help to correlate exchanges which may be redelivered messages 844 * from for example a JMS broker. 845 * 846 * @param exchange the exchange 847 * @return a log message with both the messageId and exchangeId 848 */ 849 public static String logIds(Exchange exchange) { 850 String msgId = exchange.hasOut() ? exchange.getOut().getMessageId() : exchange.getIn().getMessageId(); 851 return "(MessageId: " + msgId + " on ExchangeId: " + exchange.getExchangeId() + ")"; 852 } 853 854 /** 855 * Copies the exchange but the copy will be tied to the given context 856 * 857 * @param exchange the source exchange 858 * @param context the camel context 859 * @return a copy with the given camel context 860 */ 861 public static Exchange copyExchangeAndSetCamelContext(Exchange exchange, CamelContext context) { 862 return copyExchangeAndSetCamelContext(exchange, context, true); 863 } 864 865 /** 866 * Copies the exchange but the copy will be tied to the given context 867 * 868 * @param exchange the source exchange 869 * @param context the camel context 870 * @param handover whether to handover on completions from the source to the copy 871 * @return a copy with the given camel context 872 */ 873 public static Exchange copyExchangeAndSetCamelContext(Exchange exchange, CamelContext context, boolean handover) { 874 DefaultExchange answer = new DefaultExchange(context, exchange.getPattern()); 875 if (exchange.hasProperties()) { 876 answer.setProperties(safeCopy(exchange.getProperties())); 877 } 878 if (handover) { 879 // Need to hand over the completion for async invocation 880 exchange.handoverCompletions(answer); 881 } 882 answer.setIn(exchange.getIn().copy()); 883 if (exchange.hasOut()) { 884 answer.setOut(exchange.getOut().copy()); 885 } 886 answer.setException(exchange.getException()); 887 return answer; 888 } 889 890 /** 891 * Replaces the existing message with the new message 892 * 893 * @param exchange the exchange 894 * @param newMessage the new message 895 * @param outOnly whether to replace the message as OUT message 896 */ 897 public static void replaceMessage(Exchange exchange, Message newMessage, boolean outOnly) { 898 Message old = exchange.hasOut() ? exchange.getOut() : exchange.getIn(); 899 if (outOnly || exchange.hasOut()) { 900 exchange.setOut(newMessage); 901 } else { 902 exchange.setIn(newMessage); 903 } 904 905 // need to de-reference old from the exchange so it can be GC 906 if (old instanceof MessageSupport) { 907 ((MessageSupport) old).setExchange(null); 908 } 909 } 910 911 /** 912 * Gets the original IN {@link Message} this Unit of Work was started with. 913 * <p/> 914 * The original message is only returned if the option {@link org.apache.camel.RuntimeConfiguration#isAllowUseOriginalMessage()} 915 * is enabled. If its disabled, then <tt>null</tt> is returned. 916 * 917 * @return the original IN {@link Message}, or <tt>null</tt> if using original message is disabled. 918 */ 919 public static Message getOriginalInMessage(Exchange exchange) { 920 Message answer = null; 921 922 // try parent first 923 UnitOfWork uow = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class); 924 if (uow != null) { 925 answer = uow.getOriginalInMessage(); 926 } 927 // fallback to the current exchange 928 if (answer == null) { 929 uow = exchange.getUnitOfWork(); 930 if (uow != null) { 931 answer = uow.getOriginalInMessage(); 932 } 933 } 934 return answer; 935 } 936 937 @SuppressWarnings("unchecked") 938 private static Map<String, Object> safeCopy(Map<String, Object> properties) { 939 if (properties == null) { 940 return null; 941 } 942 943 Map<String, Object> answer = new ConcurrentHashMap<String, Object>(properties); 944 945 // safe copy message history using a defensive copy 946 List<MessageHistory> history = (List<MessageHistory>) answer.remove(Exchange.MESSAGE_HISTORY); 947 if (history != null) { 948 answer.put(Exchange.MESSAGE_HISTORY, new ArrayList<MessageHistory>(history)); 949 } 950 951 return answer; 952 } 953}