001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.util; 018 019 import java.util.HashMap; 020 import java.util.Map; 021 import java.util.concurrent.ConcurrentHashMap; 022 import java.util.concurrent.ExecutionException; 023 import java.util.concurrent.Future; 024 import java.util.concurrent.TimeUnit; 025 import java.util.concurrent.TimeoutException; 026 027 import org.apache.camel.CamelContext; 028 import org.apache.camel.CamelExchangeException; 029 import org.apache.camel.CamelExecutionException; 030 import org.apache.camel.Endpoint; 031 import org.apache.camel.Exchange; 032 import org.apache.camel.ExchangePattern; 033 import org.apache.camel.InvalidPayloadException; 034 import org.apache.camel.Message; 035 import org.apache.camel.NoSuchBeanException; 036 import org.apache.camel.NoSuchEndpointException; 037 import org.apache.camel.NoSuchHeaderException; 038 import org.apache.camel.NoSuchPropertyException; 039 import org.apache.camel.NoTypeConversionAvailableException; 040 import org.apache.camel.TypeConversionException; 041 import org.apache.camel.TypeConverter; 042 import org.apache.camel.impl.DefaultExchange; 043 import org.apache.camel.spi.UnitOfWork; 044 045 /** 046 * Some helper methods for working with {@link Exchange} objects 047 * 048 * @version 049 */ 050 public final class ExchangeHelper { 051 052 /** 053 * Utility classes should not have a public constructor. 054 */ 055 private ExchangeHelper() { 056 } 057 058 /** 059 * Extracts the Exchange.BINDING of the given type or null if not present 060 * 061 * @param exchange the message exchange 062 * @param type the expected binding type 063 * @return the binding object of the given type or null if it could not be found or converted 064 */ 065 public static <T> T getBinding(Exchange exchange, Class<T> type) { 066 return exchange != null ? exchange.getProperty(Exchange.BINDING, type) : null; 067 } 068 069 /** 070 * Attempts to resolve the endpoint for the given value 071 * 072 * @param exchange the message exchange being processed 073 * @param value the value which can be an {@link Endpoint} or an object 074 * which provides a String representation of an endpoint via 075 * {@link #toString()} 076 * @return the endpoint 077 * @throws NoSuchEndpointException if the endpoint cannot be resolved 078 */ 079 public static Endpoint resolveEndpoint(Exchange exchange, Object value) throws NoSuchEndpointException { 080 Endpoint endpoint; 081 if (value instanceof Endpoint) { 082 endpoint = (Endpoint) value; 083 } else { 084 String uri = value.toString().trim(); 085 endpoint = CamelContextHelper.getMandatoryEndpoint(exchange.getContext(), uri); 086 } 087 return endpoint; 088 } 089 090 /** 091 * Gets the mandatory property of the exchange of the correct type 092 * 093 * @param exchange the exchange 094 * @param propertyName the property name 095 * @param type the type 096 * @return the property value 097 * @throws TypeConversionException is thrown if error during type conversion 098 * @throws NoSuchPropertyException is thrown if no property exists 099 */ 100 public static <T> T getMandatoryProperty(Exchange exchange, String propertyName, Class<T> type) throws NoSuchPropertyException { 101 T result = exchange.getProperty(propertyName, type); 102 if (result != null) { 103 return result; 104 } 105 throw new NoSuchPropertyException(exchange, propertyName, type); 106 } 107 108 /** 109 * Gets the mandatory inbound header of the correct type 110 * 111 * @param exchange the exchange 112 * @param headerName the header name 113 * @param type the type 114 * @return the header value 115 * @throws TypeConversionException is thrown if error during type conversion 116 * @throws NoSuchHeaderException is thrown if no headers exists 117 */ 118 public static <T> T getMandatoryHeader(Exchange exchange, String headerName, Class<T> type) throws TypeConversionException, NoSuchHeaderException { 119 T answer = exchange.getIn().getHeader(headerName, type); 120 if (answer == null) { 121 throw new NoSuchHeaderException(exchange, headerName, type); 122 } 123 return answer; 124 } 125 126 /** 127 * Returns the mandatory inbound message body of the correct type or throws 128 * an exception if it is not present 129 * 130 * @param exchange the exchange 131 * @return the body, is never <tt>null</tt> 132 * @throws InvalidPayloadException Is thrown if the body being <tt>null</tt> or wrong class type 133 * @deprecated use {@link org.apache.camel.Message#getMandatoryBody()} 134 */ 135 @Deprecated 136 public static Object getMandatoryInBody(Exchange exchange) throws InvalidPayloadException { 137 return exchange.getIn().getMandatoryBody(); 138 } 139 140 /** 141 * Returns the mandatory inbound message body of the correct type or throws 142 * an exception if it is not present 143 * @deprecated use {@link org.apache.camel.Message#getMandatoryBody(Class)} 144 */ 145 @Deprecated 146 public static <T> T getMandatoryInBody(Exchange exchange, Class<T> type) throws InvalidPayloadException { 147 return exchange.getIn().getMandatoryBody(type); 148 } 149 150 /** 151 * Returns the mandatory outbound message body of the correct type or throws 152 * an exception if it is not present 153 * @deprecated use {@link org.apache.camel.Message#getMandatoryBody()} 154 */ 155 @Deprecated 156 public static Object getMandatoryOutBody(Exchange exchange) throws InvalidPayloadException { 157 return exchange.getOut().getMandatoryBody(); 158 } 159 160 /** 161 * Returns the mandatory outbound message body of the correct type or throws 162 * an exception if it is not present 163 * @deprecated use {@link org.apache.camel.Message#getMandatoryBody(Class)} 164 */ 165 @Deprecated 166 public static <T> T getMandatoryOutBody(Exchange exchange, Class<T> type) throws InvalidPayloadException { 167 return exchange.getOut().getMandatoryBody(type); 168 } 169 170 /** 171 * Converts the value to the given expected type or throws an exception 172 * 173 * @return the converted value 174 * @throws TypeConversionException is thrown if error during type conversion 175 * @throws NoTypeConversionAvailableException} if no type converters exists to convert to the given type 176 */ 177 public static <T> T convertToMandatoryType(Exchange exchange, Class<T> type, Object value) 178 throws TypeConversionException, NoTypeConversionAvailableException { 179 CamelContext camelContext = exchange.getContext(); 180 ObjectHelper.notNull(camelContext, "CamelContext of Exchange"); 181 TypeConverter converter = camelContext.getTypeConverter(); 182 if (converter != null) { 183 return converter.mandatoryConvertTo(type, exchange, value); 184 } 185 throw new NoTypeConversionAvailableException(value, type); 186 } 187 188 /** 189 * Converts the value to the given expected type 190 * 191 * @return the converted value 192 * @throws org.apache.camel.TypeConversionException is thrown if error during type conversion 193 */ 194 public static <T> T convertToType(Exchange exchange, Class<T> type, Object value) throws TypeConversionException { 195 CamelContext camelContext = exchange.getContext(); 196 ObjectHelper.notNull(camelContext, "CamelContext of Exchange"); 197 TypeConverter converter = camelContext.getTypeConverter(); 198 if (converter != null) { 199 return converter.convertTo(type, exchange, value); 200 } 201 return null; 202 } 203 204 /** 205 * Creates a new instance and copies from the current message exchange so that it can be 206 * forwarded to another destination as a new instance. Unlike regular copy this operation 207 * will not share the same {@link org.apache.camel.spi.UnitOfWork} so its should be used 208 * for async messaging, where the original and copied exchange are independent. 209 * 210 * @param exchange original copy of the exchange 211 * @param handover whether the on completion callbacks should be handed over to the new copy. 212 */ 213 public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover) { 214 String id = exchange.getExchangeId(); 215 216 Exchange copy = exchange.copy(); 217 // do not share the unit of work 218 copy.setUnitOfWork(null); 219 // hand over on completion to the copy if we got any 220 UnitOfWork uow = exchange.getUnitOfWork(); 221 if (handover && uow != null) { 222 uow.handoverSynchronization(copy); 223 } 224 // set a correlation id so we can track back the original exchange 225 copy.setProperty(Exchange.CORRELATION_ID, id); 226 return copy; 227 } 228 229 /** 230 * Creates a new instance and copies from the current message exchange so that it can be 231 * forwarded to another destination as a new instance. 232 * 233 * @param exchange original copy of the exchange 234 * @param preserveExchangeId whether or not the exchange id should be preserved 235 * @return the copy 236 */ 237 public static Exchange createCopy(Exchange exchange, boolean preserveExchangeId) { 238 Exchange copy = exchange.copy(); 239 if (preserveExchangeId) { 240 // must preserve exchange id 241 copy.setExchangeId(exchange.getExchangeId()); 242 } 243 return copy; 244 } 245 246 /** 247 * Copies the results of a message exchange from the source exchange to the result exchange 248 * which will copy the out and fault message contents and the exception 249 * 250 * @param result the result exchange which will have the output and error state added 251 * @param source the source exchange which is not modified 252 */ 253 public static void copyResults(Exchange result, Exchange source) { 254 255 // -------------------------------------------------------------------- 256 // TODO: merge logic with that of copyResultsPreservePattern() 257 // -------------------------------------------------------------------- 258 259 if (result == source) { 260 // we just need to ensure MEP is as expected (eg copy result to OUT if out capable) 261 // and the result is not failed 262 if (result.getPattern() == ExchangePattern.InOptionalOut) { 263 // keep as is 264 } else if (result.getPattern().isOutCapable() && !result.hasOut() && !result.isFailed()) { 265 // copy IN to OUT as we expect a OUT response 266 result.getOut().copyFrom(source.getIn()); 267 } 268 return; 269 } 270 271 if (result != source) { 272 result.setException(source.getException()); 273 if (source.hasOut()) { 274 result.getOut().copyFrom(source.getOut()); 275 } else if (result.getPattern() == ExchangePattern.InOptionalOut) { 276 // special case where the result is InOptionalOut and with no OUT response 277 // so we should return null to indicate this fact 278 result.setOut(null); 279 } else { 280 // no results so lets copy the last input 281 // as the final processor on a pipeline might not 282 // have created any OUT; such as a mock:endpoint 283 // so lets assume the last IN is the OUT 284 if (result.getPattern().isOutCapable()) { 285 // only set OUT if its OUT capable 286 result.getOut().copyFrom(source.getIn()); 287 } else { 288 // if not replace IN instead to keep the MEP 289 result.getIn().copyFrom(source.getIn()); 290 // clear any existing OUT as the result is on the IN 291 if (result.hasOut()) { 292 result.setOut(null); 293 } 294 } 295 } 296 297 if (source.hasProperties()) { 298 result.getProperties().putAll(source.getProperties()); 299 } 300 } 301 } 302 303 /** 304 * Copies the <code>source</code> exchange to <code>target</code> exchange 305 * preserving the {@link ExchangePattern} of <code>target</code>. 306 * 307 * @param source source exchange. 308 * @param result target exchange. 309 */ 310 public static void copyResultsPreservePattern(Exchange result, Exchange source) { 311 312 // -------------------------------------------------------------------- 313 // TODO: merge logic with that of copyResults() 314 // -------------------------------------------------------------------- 315 316 if (result == source) { 317 // we just need to ensure MEP is as expected (eg copy result to OUT if out capable) 318 // and the result is not failed 319 if (result.getPattern() == ExchangePattern.InOptionalOut) { 320 // keep as is 321 } else if (result.getPattern().isOutCapable() && !result.hasOut() && !result.isFailed()) { 322 // copy IN to OUT as we expect a OUT response 323 result.getOut().copyFrom(source.getIn()); 324 } 325 return; 326 } 327 328 // copy in message 329 result.getIn().copyFrom(source.getIn()); 330 331 // copy out message 332 if (source.hasOut()) { 333 // exchange pattern sensitive 334 Message resultMessage = source.getOut().isFault() ? result.getOut() : getResultMessage(result); 335 resultMessage.copyFrom(source.getOut()); 336 } 337 338 // copy exception 339 result.setException(source.getException()); 340 341 // copy properties 342 if (source.hasProperties()) { 343 result.getProperties().putAll(source.getProperties()); 344 } 345 } 346 347 /** 348 * Returns the message where to write results in an 349 * exchange-pattern-sensitive way. 350 * 351 * @param exchange message exchange. 352 * @return result message. 353 */ 354 public static Message getResultMessage(Exchange exchange) { 355 if (exchange.getPattern().isOutCapable()) { 356 return exchange.getOut(); 357 } else { 358 return exchange.getIn(); 359 } 360 } 361 362 /** 363 * Returns true if the given exchange pattern (if defined) can support OUT messages 364 * 365 * @param exchange the exchange to interrogate 366 * @return true if the exchange is defined as an {@link ExchangePattern} which supports 367 * OUT messages 368 */ 369 public static boolean isOutCapable(Exchange exchange) { 370 ExchangePattern pattern = exchange.getPattern(); 371 return pattern != null && pattern.isOutCapable(); 372 } 373 374 /** 375 * Creates a new instance of the given type from the injector 376 * 377 * @param exchange the exchange 378 * @param type the given type 379 * @return the created instance of the given type 380 */ 381 public static <T> T newInstance(Exchange exchange, Class<T> type) { 382 return exchange.getContext().getInjector().newInstance(type); 383 } 384 385 /** 386 * Creates a Map of the variables which are made available to a script or template 387 * 388 * @param exchange the exchange to make available 389 * @return a Map populated with the require variables 390 */ 391 public static Map<String, Object> createVariableMap(Exchange exchange) { 392 Map<String, Object> answer = new HashMap<String, Object>(); 393 populateVariableMap(exchange, answer); 394 return answer; 395 } 396 397 /** 398 * Populates the Map with the variables which are made available to a script or template 399 * 400 * @param exchange the exchange to make available 401 * @param map the map to populate 402 */ 403 public static void populateVariableMap(Exchange exchange, Map<String, Object> map) { 404 map.put("exchange", exchange); 405 Message in = exchange.getIn(); 406 map.put("in", in); 407 map.put("request", in); 408 map.put("headers", in.getHeaders()); 409 map.put("body", in.getBody()); 410 if (isOutCapable(exchange)) { 411 // if we are out capable then set out and response as well 412 // however only grab OUT if it exists, otherwise reuse IN 413 // this prevents side effects to alter the Exchange if we force creating an OUT message 414 Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn(); 415 map.put("out", msg); 416 map.put("response", msg); 417 } 418 map.put("camelContext", exchange.getContext()); 419 } 420 421 /** 422 * Returns the MIME content type on the input message or null if one is not defined 423 * 424 * @param exchange the exchange 425 * @return the MIME content type 426 */ 427 public static String getContentType(Exchange exchange) { 428 return MessageHelper.getContentType(exchange.getIn()); 429 } 430 431 /** 432 * Returns the MIME content encoding on the input message or null if one is not defined 433 * 434 * @param exchange the exchange 435 * @return the MIME content encoding 436 */ 437 public static String getContentEncoding(Exchange exchange) { 438 return MessageHelper.getContentEncoding(exchange.getIn()); 439 } 440 441 /** 442 * Performs a lookup in the registry of the mandatory bean name and throws an exception if it could not be found 443 * 444 * @param exchange the exchange 445 * @param name the bean name 446 * @return the bean 447 * @throws NoSuchBeanException if no bean could be found in the registry 448 */ 449 public static Object lookupMandatoryBean(Exchange exchange, String name) throws NoSuchBeanException { 450 Object value = lookupBean(exchange, name); 451 if (value == null) { 452 throw new NoSuchBeanException(name); 453 } 454 return value; 455 } 456 457 /** 458 * Performs a lookup in the registry of the mandatory bean name and throws an exception if it could not be found 459 * 460 * @param exchange the exchange 461 * @param name the bean name 462 * @param type the expected bean type 463 * @return the bean 464 * @throws NoSuchBeanException if no bean could be found in the registry 465 */ 466 public static <T> T lookupMandatoryBean(Exchange exchange, String name, Class<T> type) { 467 T value = lookupBean(exchange, name, type); 468 if (value == null) { 469 throw new NoSuchBeanException(name); 470 } 471 return value; 472 } 473 474 /** 475 * Performs a lookup in the registry of the bean name 476 * 477 * @param exchange the exchange 478 * @param name the bean name 479 * @return the bean, or <tt>null</tt> if no bean could be found 480 */ 481 public static Object lookupBean(Exchange exchange, String name) { 482 return exchange.getContext().getRegistry().lookup(name); 483 } 484 485 /** 486 * Performs a lookup in the registry of the bean name and type 487 * 488 * @param exchange the exchange 489 * @param name the bean name 490 * @param type the expected bean type 491 * @return the bean, or <tt>null</tt> if no bean could be found 492 */ 493 public static <T> T lookupBean(Exchange exchange, String name, Class<T> type) { 494 return exchange.getContext().getRegistry().lookup(name, type); 495 } 496 497 /** 498 * Returns the first exchange in the given collection of exchanges which has the same exchange ID as the one given 499 * or null if none could be found 500 * 501 * @param exchanges the exchanges 502 * @param exchangeId the exchangeId to find 503 * @return matching exchange, or <tt>null</tt> if none found 504 */ 505 public static Exchange getExchangeById(Iterable<Exchange> exchanges, String exchangeId) { 506 for (Exchange exchange : exchanges) { 507 String id = exchange.getExchangeId(); 508 if (id != null && id.equals(exchangeId)) { 509 return exchange; 510 } 511 } 512 return null; 513 } 514 515 /** 516 * Prepares the exchanges for aggregation. 517 * <p/> 518 * This implementation will copy the OUT body to the IN body so when you do 519 * aggregation the body is <b>only</b> in the IN body to avoid confusing end users. 520 * 521 * @param oldExchange the old exchange 522 * @param newExchange the new exchange 523 */ 524 public static void prepareAggregation(Exchange oldExchange, Exchange newExchange) { 525 // move body/header from OUT to IN 526 if (oldExchange != null) { 527 if (oldExchange.hasOut()) { 528 oldExchange.setIn(oldExchange.getOut()); 529 oldExchange.setOut(null); 530 } 531 } 532 533 if (newExchange != null) { 534 if (newExchange.hasOut()) { 535 newExchange.setIn(newExchange.getOut()); 536 newExchange.setOut(null); 537 } 538 } 539 } 540 541 /** 542 * Checks whether the exchange has been failure handed 543 * 544 * @param exchange the exchange 545 * @return <tt>true</tt> if failure handled, <tt>false</tt> otherwise 546 */ 547 public static boolean isFailureHandled(Exchange exchange) { 548 return exchange.getProperty(Exchange.FAILURE_HANDLED, false, Boolean.class); 549 } 550 551 /** 552 * Checks whether the exchange {@link UnitOfWork} is exhausted 553 * 554 * @param exchange the exchange 555 * @return <tt>true</tt> if exhausted, <tt>false</tt> otherwise 556 */ 557 public static boolean isUnitOfWorkExhausted(Exchange exchange) { 558 return exchange.getProperty(Exchange.UNIT_OF_WORK_EXHAUSTED, false, Boolean.class); 559 } 560 561 /** 562 * Sets the exchange to be failure handled. 563 * 564 * @param exchange the exchange 565 */ 566 public static void setFailureHandled(Exchange exchange) { 567 exchange.setProperty(Exchange.FAILURE_HANDLED, Boolean.TRUE); 568 // clear exception since its failure handled 569 exchange.setException(null); 570 } 571 572 /** 573 * Checks whether the exchange is redelivery exhausted 574 * 575 * @param exchange the exchange 576 * @return <tt>true</tt> if exhausted, <tt>false</tt> otherwise 577 */ 578 public static boolean isRedeliveryExhausted(Exchange exchange) { 579 return exchange.getProperty(Exchange.REDELIVERY_EXHAUSTED, false, Boolean.class); 580 } 581 582 /** 583 * Checks whether the exchange {@link UnitOfWork} is redelivered 584 * 585 * @param exchange the exchange 586 * @return <tt>true</tt> if redelivered, <tt>false</tt> otherwise 587 */ 588 public static boolean isRedelivered(Exchange exchange) { 589 return exchange.getIn().hasHeaders() && exchange.getIn().getHeader(Exchange.REDELIVERED, false, Boolean.class); 590 } 591 592 /** 593 * Checks whether the exchange {@link UnitOfWork} has been interrupted during processing 594 * 595 * @param exchange the exchange 596 * @return <tt>true</tt> if interrupted, <tt>false</tt> otherwise 597 */ 598 public static boolean isInterrupted(Exchange exchange) { 599 return exchange.getException(InterruptedException.class) != null; 600 } 601 602 /** 603 * Extracts the body from the given exchange. 604 * <p/> 605 * If the exchange pattern is provided it will try to honor it and retrieve the body 606 * from either IN or OUT according to the pattern. 607 * 608 * @param exchange the exchange 609 * @param pattern exchange pattern if given, can be <tt>null</tt> 610 * @return the result body, can be <tt>null</tt>. 611 * @throws CamelExecutionException is thrown if the processing of the exchange failed 612 */ 613 public static Object extractResultBody(Exchange exchange, ExchangePattern pattern) { 614 Object answer = null; 615 if (exchange != null) { 616 // rethrow if there was an exception during execution 617 if (exchange.getException() != null) { 618 throw ObjectHelper.wrapCamelExecutionException(exchange, exchange.getException()); 619 } 620 621 // result could have a fault message 622 if (hasFaultMessage(exchange)) { 623 return exchange.getOut().getBody(); 624 } 625 626 // okay no fault then return the response according to the pattern 627 // try to honor pattern if provided 628 boolean notOut = pattern != null && !pattern.isOutCapable(); 629 boolean hasOut = exchange.hasOut(); 630 if (hasOut && !notOut) { 631 // we have a response in out and the pattern is out capable 632 answer = exchange.getOut().getBody(); 633 } else if (!hasOut && exchange.getPattern() == ExchangePattern.InOptionalOut) { 634 // special case where the result is InOptionalOut and with no OUT response 635 // so we should return null to indicate this fact 636 answer = null; 637 } else { 638 // use IN as the response 639 answer = exchange.getIn().getBody(); 640 } 641 } 642 return answer; 643 } 644 645 /** 646 * Tests whether the exchange has a fault message set and that its not null. 647 * 648 * @param exchange the exchange 649 * @return <tt>true</tt> if fault message exists 650 */ 651 public static boolean hasFaultMessage(Exchange exchange) { 652 return exchange.hasOut() && exchange.getOut().isFault() && exchange.getOut().getBody() != null; 653 } 654 655 /** 656 * Tests whether the exchange has already been handled by the error handler 657 * 658 * @param exchange the exchange 659 * @return <tt>true</tt> if handled already by error handler, <tt>false</tt> otherwise 660 */ 661 public static boolean hasExceptionBeenHandledByErrorHandler(Exchange exchange) { 662 return Boolean.TRUE.equals(exchange.getProperty(Exchange.ERRORHANDLER_HANDLED)); 663 } 664 665 /** 666 * Extracts the body from the given future, that represents a handle to an asynchronous exchange. 667 * <p/> 668 * Will wait until the future task is complete. 669 * 670 * @param context the camel context 671 * @param future the future handle 672 * @param type the expected body response type 673 * @return the result body, can be <tt>null</tt>. 674 * @throws CamelExecutionException is thrown if the processing of the exchange failed 675 */ 676 public static <T> T extractFutureBody(CamelContext context, Future<Object> future, Class<T> type) { 677 try { 678 return doExtractFutureBody(context, future.get(), type); 679 } catch (InterruptedException e) { 680 throw ObjectHelper.wrapRuntimeCamelException(e); 681 } catch (ExecutionException e) { 682 // execution failed due to an exception so rethrow the cause 683 throw ObjectHelper.wrapCamelExecutionException(null, e.getCause()); 684 } finally { 685 // its harmless to cancel if task is already completed 686 // and in any case we do not want to get hold of the task a 2nd time 687 // and its recommended to cancel according to Brian Goetz in his Java Concurrency in Practice book 688 future.cancel(true); 689 } 690 } 691 692 /** 693 * Extracts the body from the given future, that represents a handle to an asynchronous exchange. 694 * <p/> 695 * Will wait for the future task to complete, but waiting at most the timeout value. 696 * 697 * @param context the camel context 698 * @param future the future handle 699 * @param timeout timeout value 700 * @param unit timeout unit 701 * @param type the expected body response type 702 * @return the result body, can be <tt>null</tt>. 703 * @throws CamelExecutionException is thrown if the processing of the exchange failed 704 * @throws java.util.concurrent.TimeoutException is thrown if a timeout triggered 705 */ 706 public static <T> T extractFutureBody(CamelContext context, Future<Object> future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException { 707 try { 708 if (timeout > 0) { 709 return doExtractFutureBody(context, future.get(timeout, unit), type); 710 } else { 711 return doExtractFutureBody(context, future.get(), type); 712 } 713 } catch (InterruptedException e) { 714 // execution failed due interruption so rethrow the cause 715 throw ObjectHelper.wrapCamelExecutionException(null, e); 716 } catch (ExecutionException e) { 717 // execution failed due to an exception so rethrow the cause 718 throw ObjectHelper.wrapCamelExecutionException(null, e.getCause()); 719 } finally { 720 // its harmless to cancel if task is already completed 721 // and in any case we do not want to get hold of the task a 2nd time 722 // and its recommended to cancel according to Brian Goetz in his Java Concurrency in Practice book 723 future.cancel(true); 724 } 725 } 726 727 private static <T> T doExtractFutureBody(CamelContext context, Object result, Class<T> type) { 728 if (result == null) { 729 return null; 730 } 731 if (type.isAssignableFrom(result.getClass())) { 732 return type.cast(result); 733 } 734 if (result instanceof Exchange) { 735 Exchange exchange = (Exchange) result; 736 Object answer = ExchangeHelper.extractResultBody(exchange, exchange.getPattern()); 737 return context.getTypeConverter().convertTo(type, exchange, answer); 738 } 739 return context.getTypeConverter().convertTo(type, result); 740 } 741 742 /** 743 * @deprecated use org.apache.camel.CamelExchangeException.createExceptionMessage instead 744 */ 745 @Deprecated 746 public static String createExceptionMessage(String message, Exchange exchange, Throwable cause) { 747 return CamelExchangeException.createExceptionMessage(message, exchange, cause); 748 } 749 750 /** 751 * Strategy to prepare results before next iterator or when we are complete, 752 * which is done by copying OUT to IN, so there is only an IN as input 753 * for the next iteration. 754 * 755 * @param exchange the exchange to prepare 756 */ 757 public static void prepareOutToIn(Exchange exchange) { 758 // we are routing using pipes and filters so we need to manually copy OUT to IN 759 if (exchange.hasOut()) { 760 exchange.getIn().copyFrom(exchange.getOut()); 761 exchange.setOut(null); 762 } 763 } 764 765 /** 766 * Gets both the messageId and exchangeId to be used for logging purposes. 767 * <p/> 768 * Logging both ids, can help to correlate exchanges which may be redelivered messages 769 * from for example a JMS broker. 770 * 771 * @param exchange the exchange 772 * @return a log message with both the messageId and exchangeId 773 */ 774 public static String logIds(Exchange exchange) { 775 String msgId = exchange.hasOut() ? exchange.getOut().getMessageId() : exchange.getIn().getMessageId(); 776 return "(MessageId: " + msgId + " on ExchangeId: " + exchange.getExchangeId() + ")"; 777 } 778 779 /** 780 * Copies the exchange but the copy will be tied to the given context 781 * 782 * @param exchange the source exchange 783 * @param context the camel context 784 * @return a copy with the given camel context 785 */ 786 public static Exchange copyExchangeAndSetCamelContext(Exchange exchange, CamelContext context) { 787 return copyExchangeAndSetCamelContext(exchange, context, true); 788 } 789 790 /** 791 * Copies the exchange but the copy will be tied to the given context 792 * 793 * @param exchange the source exchange 794 * @param context the camel context 795 * @param handover whether to handover on completions from the source to the copy 796 * @return a copy with the given camel context 797 */ 798 public static Exchange copyExchangeAndSetCamelContext(Exchange exchange, CamelContext context, boolean handover) { 799 DefaultExchange answer = new DefaultExchange(context, exchange.getPattern()); 800 if (exchange.hasProperties()) { 801 answer.setProperties(safeCopy(exchange.getProperties())); 802 } 803 if (handover) { 804 // Need to hand over the completion for async invocation 805 exchange.handoverCompletions(answer); 806 } 807 answer.setIn(exchange.getIn().copy()); 808 if (exchange.hasOut()) { 809 answer.setOut(exchange.getOut().copy()); 810 } 811 answer.setException(exchange.getException()); 812 return answer; 813 } 814 815 private static Map<String, Object> safeCopy(Map<String, Object> properties) { 816 if (properties == null) { 817 return null; 818 } 819 return new ConcurrentHashMap<String, Object>(properties); 820 } 821 }