001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.util; 018 019import java.util.HashMap; 020import java.util.LinkedList; 021import java.util.List; 022import java.util.Map; 023import java.util.concurrent.ExecutionException; 024import java.util.concurrent.Future; 025import java.util.concurrent.TimeUnit; 026import java.util.concurrent.TimeoutException; 027import java.util.function.Predicate; 028 029import org.apache.camel.CamelContext; 030import org.apache.camel.CamelExchangeException; 031import org.apache.camel.CamelExecutionException; 032import org.apache.camel.Endpoint; 033import org.apache.camel.Exchange; 034import org.apache.camel.ExchangePattern; 035import org.apache.camel.InvalidPayloadException; 036import org.apache.camel.Message; 037import org.apache.camel.MessageHistory; 038import org.apache.camel.NoSuchBeanException; 039import org.apache.camel.NoSuchEndpointException; 040import org.apache.camel.NoSuchHeaderException; 041import org.apache.camel.NoSuchPropertyException; 042import org.apache.camel.NoTypeConversionAvailableException; 043import org.apache.camel.Route; 044import org.apache.camel.TypeConversionException; 045import org.apache.camel.TypeConverter; 046import org.apache.camel.impl.DefaultExchange; 047import org.apache.camel.impl.MessageSupport; 048import org.apache.camel.spi.Synchronization; 049import org.apache.camel.spi.UnitOfWork; 050 051/** 052 * Some helper methods for working with {@link Exchange} objects 053 * 054 * @version 055 */ 056public final class ExchangeHelper { 057 058 /** 059 * Utility classes should not have a public constructor. 060 */ 061 private ExchangeHelper() { 062 } 063 064 /** 065 * Extracts the Exchange.BINDING of the given type or null if not present 066 * 067 * @param exchange the message exchange 068 * @param type the expected binding type 069 * @return the binding object of the given type or null if it could not be found or converted 070 */ 071 public static <T> T getBinding(Exchange exchange, Class<T> type) { 072 return exchange != null ? exchange.getProperty(Exchange.BINDING, type) : null; 073 } 074 075 /** 076 * Attempts to resolve the endpoint for the given value 077 * 078 * @param exchange the message exchange being processed 079 * @param value the value which can be an {@link Endpoint} or an object 080 * which provides a String representation of an endpoint via 081 * {@link #toString()} 082 * @return the endpoint 083 * @throws NoSuchEndpointException if the endpoint cannot be resolved 084 */ 085 public static Endpoint resolveEndpoint(Exchange exchange, Object value) throws NoSuchEndpointException { 086 Endpoint endpoint; 087 if (value instanceof Endpoint) { 088 endpoint = (Endpoint) value; 089 } else { 090 String uri = value.toString().trim(); 091 endpoint = CamelContextHelper.getMandatoryEndpoint(exchange.getContext(), uri); 092 } 093 return endpoint; 094 } 095 096 /** 097 * Gets the mandatory property of the exchange of the correct type 098 * 099 * @param exchange the exchange 100 * @param propertyName the property name 101 * @param type the type 102 * @return the property value 103 * @throws TypeConversionException is thrown if error during type conversion 104 * @throws NoSuchPropertyException is thrown if no property exists 105 */ 106 public static <T> T getMandatoryProperty(Exchange exchange, String propertyName, Class<T> type) throws NoSuchPropertyException { 107 T result = exchange.getProperty(propertyName, type); 108 if (result != null) { 109 return result; 110 } 111 throw new NoSuchPropertyException(exchange, propertyName, type); 112 } 113 114 /** 115 * Gets the mandatory inbound header of the correct type 116 * 117 * @param exchange the exchange 118 * @param headerName the header name 119 * @param type the type 120 * @return the header value 121 * @throws TypeConversionException is thrown if error during type conversion 122 * @throws NoSuchHeaderException is thrown if no headers exists 123 */ 124 public static <T> T getMandatoryHeader(Exchange exchange, String headerName, Class<T> type) throws TypeConversionException, NoSuchHeaderException { 125 T answer = exchange.getIn().getHeader(headerName, type); 126 if (answer == null) { 127 throw new NoSuchHeaderException(exchange, headerName, type); 128 } 129 return answer; 130 } 131 132 /** 133 * Gets the mandatory inbound header of the correct type 134 * 135 * @param message the message 136 * @param headerName the header name 137 * @param type the type 138 * @return the header value 139 * @throws TypeConversionException is thrown if error during type conversion 140 * @throws NoSuchHeaderException is thrown if no headers exists 141 */ 142 public static <T> T getMandatoryHeader(Message message, String headerName, Class<T> type) throws TypeConversionException, NoSuchHeaderException { 143 T answer = message.getHeader(headerName, type); 144 if (answer == null) { 145 throw new NoSuchHeaderException(message.getExchange(), headerName, type); 146 } 147 return answer; 148 } 149 150 /** 151 * Gets an header or property of the correct type 152 * 153 * @param exchange the exchange 154 * @param name the name of the header or the property 155 * @param type the type 156 * @return the header or property value 157 * @throws TypeConversionException is thrown if error during type conversion 158 * @throws NoSuchHeaderException is thrown if no headers exists 159 */ 160 public static <T> T getHeaderOrProperty(Exchange exchange, String name, Class<T> type) throws TypeConversionException { 161 T answer = exchange.getIn().getHeader(name, type); 162 if (answer == null) { 163 answer = exchange.getProperty(name, type); 164 } 165 return answer; 166 } 167 168 /** 169 * Returns the mandatory inbound message body of the correct type or throws 170 * an exception if it is not present 171 * 172 * @param exchange the exchange 173 * @return the body, is never <tt>null</tt> 174 * @throws InvalidPayloadException Is thrown if the body being <tt>null</tt> or wrong class type 175 * @deprecated use {@link org.apache.camel.Message#getMandatoryBody()} 176 */ 177 @Deprecated 178 public static Object getMandatoryInBody(Exchange exchange) throws InvalidPayloadException { 179 return exchange.getIn().getMandatoryBody(); 180 } 181 182 /** 183 * Returns the mandatory inbound message body of the correct type or throws 184 * an exception if it is not present 185 * @deprecated use {@link org.apache.camel.Message#getMandatoryBody(Class)} 186 */ 187 @Deprecated 188 public static <T> T getMandatoryInBody(Exchange exchange, Class<T> type) throws InvalidPayloadException { 189 return exchange.getIn().getMandatoryBody(type); 190 } 191 192 /** 193 * Returns the mandatory outbound message body of the correct type or throws 194 * an exception if it is not present 195 * @deprecated use {@link org.apache.camel.Message#getMandatoryBody()} 196 */ 197 @Deprecated 198 public static Object getMandatoryOutBody(Exchange exchange) throws InvalidPayloadException { 199 return exchange.getOut().getMandatoryBody(); 200 } 201 202 /** 203 * Returns the mandatory outbound message body of the correct type or throws 204 * an exception if it is not present 205 * @deprecated use {@link org.apache.camel.Message#getMandatoryBody(Class)} 206 */ 207 @Deprecated 208 public static <T> T getMandatoryOutBody(Exchange exchange, Class<T> type) throws InvalidPayloadException { 209 return exchange.getOut().getMandatoryBody(type); 210 } 211 212 /** 213 * Converts the value to the given expected type or throws an exception 214 * 215 * @return the converted value 216 * @throws TypeConversionException is thrown if error during type conversion 217 * @throws NoTypeConversionAvailableException} if no type converters exists to convert to the given type 218 */ 219 public static <T> T convertToMandatoryType(Exchange exchange, Class<T> type, Object value) 220 throws TypeConversionException, NoTypeConversionAvailableException { 221 CamelContext camelContext = exchange.getContext(); 222 ObjectHelper.notNull(camelContext, "CamelContext of Exchange"); 223 TypeConverter converter = camelContext.getTypeConverter(); 224 if (converter != null) { 225 return converter.mandatoryConvertTo(type, exchange, value); 226 } 227 throw new NoTypeConversionAvailableException(value, type); 228 } 229 230 /** 231 * Converts the value to the given expected type 232 * 233 * @return the converted value 234 * @throws org.apache.camel.TypeConversionException is thrown if error during type conversion 235 */ 236 public static <T> T convertToType(Exchange exchange, Class<T> type, Object value) throws TypeConversionException { 237 CamelContext camelContext = exchange.getContext(); 238 ObjectHelper.notNull(camelContext, "CamelContext of Exchange"); 239 TypeConverter converter = camelContext.getTypeConverter(); 240 if (converter != null) { 241 return converter.convertTo(type, exchange, value); 242 } 243 return null; 244 } 245 246 /** 247 * Creates a new instance and copies from the current message exchange so that it can be 248 * forwarded to another destination as a new instance. Unlike regular copy this operation 249 * will not share the same {@link org.apache.camel.spi.UnitOfWork} so its should be used 250 * for async messaging, where the original and copied exchange are independent. 251 * 252 * @param exchange original copy of the exchange 253 * @param handover whether the on completion callbacks should be handed over to the new copy. 254 */ 255 public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover) { 256 return createCorrelatedCopy(exchange, handover, false); 257 } 258 259 /** 260 * Creates a new instance and copies from the current message exchange so that it can be 261 * forwarded to another destination as a new instance. Unlike regular copy this operation 262 * will not share the same {@link org.apache.camel.spi.UnitOfWork} so its should be used 263 * for async messaging, where the original and copied exchange are independent. 264 * 265 * @param exchange original copy of the exchange 266 * @param handover whether the on completion callbacks should be handed over to the new copy. 267 * @param useSameMessageId whether to use same message id on the copy message. 268 */ 269 public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover, boolean useSameMessageId) { 270 return createCorrelatedCopy(exchange, handover, useSameMessageId, null); 271 } 272 273 /** 274 * Creates a new instance and copies from the current message exchange so that it can be 275 * forwarded to another destination as a new instance. Unlike regular copy this operation 276 * will not share the same {@link org.apache.camel.spi.UnitOfWork} so its should be used 277 * for async messaging, where the original and copied exchange are independent. 278 * 279 * @param exchange original copy of the exchange 280 * @param handover whether the on completion callbacks should be handed over to the new copy. 281 * @param useSameMessageId whether to use same message id on the copy message. 282 * @param filter whether to handover the on completion 283 */ 284 public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover, boolean useSameMessageId, Predicate<Synchronization> filter) { 285 String id = exchange.getExchangeId(); 286 287 // make sure to do a safe copy as the correlated copy can be routed independently of the source. 288 Exchange copy = exchange.copy(true); 289 // do not reuse message id on copy 290 if (!useSameMessageId) { 291 if (copy.hasOut()) { 292 copy.getOut().setMessageId(null); 293 } 294 copy.getIn().setMessageId(null); 295 } 296 // do not share the unit of work 297 copy.setUnitOfWork(null); 298 // do not reuse the message id 299 // hand over on completion to the copy if we got any 300 UnitOfWork uow = exchange.getUnitOfWork(); 301 if (handover && uow != null) { 302 uow.handoverSynchronization(copy, filter); 303 } 304 // set a correlation id so we can track back the original exchange 305 copy.setProperty(Exchange.CORRELATION_ID, id); 306 return copy; 307 } 308 309 /** 310 * Creates a new instance and copies from the current message exchange so that it can be 311 * forwarded to another destination as a new instance. 312 * 313 * @param exchange original copy of the exchange 314 * @param preserveExchangeId whether or not the exchange id should be preserved 315 * @return the copy 316 */ 317 public static Exchange createCopy(Exchange exchange, boolean preserveExchangeId) { 318 Exchange copy = exchange.copy(); 319 if (preserveExchangeId) { 320 // must preserve exchange id 321 copy.setExchangeId(exchange.getExchangeId()); 322 } 323 return copy; 324 } 325 326 /** 327 * Copies the results of a message exchange from the source exchange to the result exchange 328 * which will copy the message contents, exchange properties and the exception. 329 * Notice the {@link ExchangePattern} is <b>not</b> copied/altered. 330 * 331 * @param result the result exchange which will have the output and error state added 332 * @param source the source exchange which is not modified 333 */ 334 public static void copyResults(Exchange result, Exchange source) { 335 336 // -------------------------------------------------------------------- 337 // TODO: merge logic with that of copyResultsPreservePattern() 338 // -------------------------------------------------------------------- 339 340 if (result == source) { 341 // we just need to ensure MEP is as expected (eg copy result to OUT if out capable) 342 // and the result is not failed 343 if (result.getPattern() == ExchangePattern.InOptionalOut) { 344 // keep as is 345 } else if (result.getPattern().isOutCapable() && !result.hasOut() && !result.isFailed()) { 346 // copy IN to OUT as we expect a OUT response 347 result.getOut().copyFrom(source.getIn()); 348 } 349 return; 350 } 351 352 if (result != source) { 353 result.setException(source.getException()); 354 if (source.hasOut()) { 355 result.getOut().copyFrom(source.getOut()); 356 } else if (result.getPattern() == ExchangePattern.InOptionalOut) { 357 // special case where the result is InOptionalOut and with no OUT response 358 // so we should return null to indicate this fact 359 result.setOut(null); 360 } else { 361 // no results so lets copy the last input 362 // as the final processor on a pipeline might not 363 // have created any OUT; such as a mock:endpoint 364 // so lets assume the last IN is the OUT 365 if (result.getPattern().isOutCapable()) { 366 // only set OUT if its OUT capable 367 result.getOut().copyFrom(source.getIn()); 368 } else { 369 // if not replace IN instead to keep the MEP 370 result.getIn().copyFrom(source.getIn()); 371 // clear any existing OUT as the result is on the IN 372 if (result.hasOut()) { 373 result.setOut(null); 374 } 375 } 376 } 377 378 if (source.hasProperties()) { 379 result.getProperties().putAll(source.getProperties()); 380 } 381 } 382 } 383 384 /** 385 * Copies the <code>source</code> exchange to <code>target</code> exchange 386 * preserving the {@link ExchangePattern} of <code>target</code>. 387 * 388 * @param result target exchange. 389 * @param source source exchange. 390 */ 391 public static void copyResultsPreservePattern(Exchange result, Exchange source) { 392 393 // -------------------------------------------------------------------- 394 // TODO: merge logic with that of copyResults() 395 // -------------------------------------------------------------------- 396 397 if (result == source) { 398 // we just need to ensure MEP is as expected (eg copy result to OUT if out capable) 399 // and the result is not failed 400 if (result.getPattern() == ExchangePattern.InOptionalOut) { 401 // keep as is 402 } else if (result.getPattern().isOutCapable() && !result.hasOut() && !result.isFailed()) { 403 // copy IN to OUT as we expect a OUT response 404 result.getOut().copyFrom(source.getIn()); 405 } 406 return; 407 } 408 409 // copy in message 410 result.getIn().copyFrom(source.getIn()); 411 412 // copy out message 413 if (source.hasOut()) { 414 // exchange pattern sensitive 415 Message resultMessage = source.getOut().isFault() ? result.getOut() : getResultMessage(result); 416 resultMessage.copyFrom(source.getOut()); 417 } 418 419 // copy exception 420 result.setException(source.getException()); 421 422 // copy properties 423 if (source.hasProperties()) { 424 result.getProperties().putAll(source.getProperties()); 425 } 426 } 427 428 /** 429 * Returns the message where to write results in an 430 * exchange-pattern-sensitive way. 431 * 432 * @param exchange message exchange. 433 * @return result message. 434 */ 435 public static Message getResultMessage(Exchange exchange) { 436 if (exchange.getPattern().isOutCapable()) { 437 return exchange.getOut(); 438 } else { 439 return exchange.getIn(); 440 } 441 } 442 443 /** 444 * Returns true if the given exchange pattern (if defined) can support OUT messages 445 * 446 * @param exchange the exchange to interrogate 447 * @return true if the exchange is defined as an {@link ExchangePattern} which supports 448 * OUT messages 449 */ 450 public static boolean isOutCapable(Exchange exchange) { 451 ExchangePattern pattern = exchange.getPattern(); 452 return pattern != null && pattern.isOutCapable(); 453 } 454 455 /** 456 * Creates a new instance of the given type from the injector 457 * 458 * @param exchange the exchange 459 * @param type the given type 460 * @return the created instance of the given type 461 */ 462 public static <T> T newInstance(Exchange exchange, Class<T> type) { 463 return exchange.getContext().getInjector().newInstance(type); 464 } 465 466 /** 467 * Creates a Map of the variables which are made available to a script or template 468 * 469 * @param exchange the exchange to make available 470 * @return a Map populated with the require variables 471 */ 472 @Deprecated 473 public static Map<String, Object> createVariableMap(Exchange exchange) { 474 Map<String, Object> answer = new HashMap<>(); 475 populateVariableMap(exchange, answer, true); 476 return answer; 477 } 478 479 /** 480 * Creates a Map of the variables which are made available to a script or template 481 * 482 * @param exchange the exchange to make available 483 * @param allowContextMapAll whether to allow access to all context map or not 484 * (prefer to use false due to security reasons preferred to only allow access to body/headers) 485 * @return a Map populated with the require variables 486 */ 487 public static Map<String, Object> createVariableMap(Exchange exchange, boolean allowContextMapAll) { 488 Map<String, Object> answer = new HashMap<>(); 489 populateVariableMap(exchange, answer, allowContextMapAll); 490 return answer; 491 } 492 493 /** 494 * Populates the Map with the variables which are made available to a script or template 495 * 496 * @param exchange the exchange to make available 497 * @param map the map to populate 498 */ 499 @Deprecated 500 public static void populateVariableMap(Exchange exchange, Map<String, Object> map) { 501 populateVariableMap(exchange, map, true); 502 } 503 504 /** 505 * Populates the Map with the variables which are made available to a script or template 506 * 507 * @param exchange the exchange to make available 508 * @param map the map to populate 509 * @param allowContextMapAll whether to allow access to all context map or not 510 * (prefer to use false due to security reasons preferred to only allow access to body/headers) 511 */ 512 public static void populateVariableMap(Exchange exchange, Map<String, Object> map, boolean allowContextMapAll) { 513 Message in = exchange.getIn(); 514 map.put("headers", in.getHeaders()); 515 map.put("body", in.getBody()); 516 if (allowContextMapAll) { 517 map.put("in", in); 518 map.put("exchange", exchange); 519 map.put("request", in); 520 if (isOutCapable(exchange)) { 521 // if we are out capable then set out and response as well 522 // however only grab OUT if it exists, otherwise reuse IN 523 // this prevents side effects to alter the Exchange if we force creating an OUT message 524 Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn(); 525 map.put("out", msg); 526 map.put("response", msg); 527 } 528 map.put("camelContext", exchange.getContext()); 529 } 530 } 531 532 /** 533 * Returns the MIME content type on the input message or null if one is not defined 534 * 535 * @param exchange the exchange 536 * @return the MIME content type 537 */ 538 public static String getContentType(Exchange exchange) { 539 return MessageHelper.getContentType(exchange.getIn()); 540 } 541 542 /** 543 * Returns the MIME content encoding on the input message or null if one is not defined 544 * 545 * @param exchange the exchange 546 * @return the MIME content encoding 547 */ 548 public static String getContentEncoding(Exchange exchange) { 549 return MessageHelper.getContentEncoding(exchange.getIn()); 550 } 551 552 /** 553 * Performs a lookup in the registry of the mandatory bean name and throws an exception if it could not be found 554 * 555 * @param exchange the exchange 556 * @param name the bean name 557 * @return the bean 558 * @throws NoSuchBeanException if no bean could be found in the registry 559 */ 560 public static Object lookupMandatoryBean(Exchange exchange, String name) throws NoSuchBeanException { 561 Object value = lookupBean(exchange, name); 562 if (value == null) { 563 throw new NoSuchBeanException(name); 564 } 565 return value; 566 } 567 568 /** 569 * Performs a lookup in the registry of the mandatory bean name and throws an exception if it could not be found 570 * 571 * @param exchange the exchange 572 * @param name the bean name 573 * @param type the expected bean type 574 * @return the bean 575 * @throws NoSuchBeanException if no bean could be found in the registry 576 */ 577 public static <T> T lookupMandatoryBean(Exchange exchange, String name, Class<T> type) { 578 T value = lookupBean(exchange, name, type); 579 if (value == null) { 580 throw new NoSuchBeanException(name); 581 } 582 return value; 583 } 584 585 /** 586 * Performs a lookup in the registry of the bean name 587 * 588 * @param exchange the exchange 589 * @param name the bean name 590 * @return the bean, or <tt>null</tt> if no bean could be found 591 */ 592 public static Object lookupBean(Exchange exchange, String name) { 593 return exchange.getContext().getRegistry().lookupByName(name); 594 } 595 596 /** 597 * Performs a lookup in the registry of the bean name and type 598 * 599 * @param exchange the exchange 600 * @param name the bean name 601 * @param type the expected bean type 602 * @return the bean, or <tt>null</tt> if no bean could be found 603 */ 604 public static <T> T lookupBean(Exchange exchange, String name, Class<T> type) { 605 return exchange.getContext().getRegistry().lookupByNameAndType(name, type); 606 } 607 608 /** 609 * Returns the first exchange in the given collection of exchanges which has the same exchange ID as the one given 610 * or null if none could be found 611 * 612 * @param exchanges the exchanges 613 * @param exchangeId the exchangeId to find 614 * @return matching exchange, or <tt>null</tt> if none found 615 */ 616 public static Exchange getExchangeById(Iterable<Exchange> exchanges, String exchangeId) { 617 for (Exchange exchange : exchanges) { 618 String id = exchange.getExchangeId(); 619 if (id != null && id.equals(exchangeId)) { 620 return exchange; 621 } 622 } 623 return null; 624 } 625 626 /** 627 * Prepares the exchanges for aggregation. 628 * <p/> 629 * This implementation will copy the OUT body to the IN body so when you do 630 * aggregation the body is <b>only</b> in the IN body to avoid confusing end users. 631 * 632 * @param oldExchange the old exchange 633 * @param newExchange the new exchange 634 */ 635 public static void prepareAggregation(Exchange oldExchange, Exchange newExchange) { 636 // move body/header from OUT to IN 637 if (oldExchange != null) { 638 if (oldExchange.hasOut()) { 639 oldExchange.setIn(oldExchange.getOut()); 640 oldExchange.setOut(null); 641 } 642 } 643 644 if (newExchange != null) { 645 if (newExchange.hasOut()) { 646 newExchange.setIn(newExchange.getOut()); 647 newExchange.setOut(null); 648 } 649 } 650 } 651 652 /** 653 * Checks whether the exchange has been failure handed 654 * 655 * @param exchange the exchange 656 * @return <tt>true</tt> if failure handled, <tt>false</tt> otherwise 657 */ 658 public static boolean isFailureHandled(Exchange exchange) { 659 return exchange.getProperty(Exchange.FAILURE_HANDLED, false, Boolean.class); 660 } 661 662 /** 663 * Checks whether the exchange {@link UnitOfWork} is exhausted 664 * 665 * @param exchange the exchange 666 * @return <tt>true</tt> if exhausted, <tt>false</tt> otherwise 667 */ 668 public static boolean isUnitOfWorkExhausted(Exchange exchange) { 669 return exchange.getProperty(Exchange.UNIT_OF_WORK_EXHAUSTED, false, Boolean.class); 670 } 671 672 /** 673 * Sets the exchange to be failure handled. 674 * 675 * @param exchange the exchange 676 */ 677 public static void setFailureHandled(Exchange exchange) { 678 exchange.setProperty(Exchange.FAILURE_HANDLED, Boolean.TRUE); 679 // clear exception since its failure handled 680 exchange.setException(null); 681 } 682 683 /** 684 * Checks whether the exchange is redelivery exhausted 685 * 686 * @param exchange the exchange 687 * @return <tt>true</tt> if exhausted, <tt>false</tt> otherwise 688 */ 689 public static boolean isRedeliveryExhausted(Exchange exchange) { 690 return exchange.getProperty(Exchange.REDELIVERY_EXHAUSTED, false, Boolean.class); 691 } 692 693 /** 694 * Checks whether the exchange {@link UnitOfWork} is redelivered 695 * 696 * @param exchange the exchange 697 * @return <tt>true</tt> if redelivered, <tt>false</tt> otherwise 698 */ 699 public static boolean isRedelivered(Exchange exchange) { 700 return exchange.getIn().hasHeaders() && exchange.getIn().getHeader(Exchange.REDELIVERED, false, Boolean.class); 701 } 702 703 /** 704 * Checks whether the exchange {@link UnitOfWork} has been interrupted during processing 705 * 706 * @param exchange the exchange 707 * @return <tt>true</tt> if interrupted, <tt>false</tt> otherwise 708 */ 709 public static boolean isInterrupted(Exchange exchange) { 710 Object value = exchange.getProperty(Exchange.INTERRUPTED); 711 return value != null && Boolean.TRUE == value; 712 } 713 714 /** 715 * Check whether or not stream caching is enabled for the given route or globally. 716 * 717 * @param exchange the exchange 718 * @return <tt>true</tt> if enabled, <tt>false</tt> otherwise 719 */ 720 public static boolean isStreamCachingEnabled(final Exchange exchange) { 721 Route route = exchange.getContext().getRoute(exchange.getFromRouteId()); 722 if (route != null) { 723 return route.getRouteContext().isStreamCaching(); 724 } else { 725 return exchange.getContext().getStreamCachingStrategy().isEnabled(); 726 } 727 } 728 729 /** 730 * Extracts the body from the given exchange. 731 * <p/> 732 * If the exchange pattern is provided it will try to honor it and retrieve the body 733 * from either IN or OUT according to the pattern. 734 * 735 * @param exchange the exchange 736 * @param pattern exchange pattern if given, can be <tt>null</tt> 737 * @return the result body, can be <tt>null</tt>. 738 * @throws CamelExecutionException is thrown if the processing of the exchange failed 739 */ 740 public static Object extractResultBody(Exchange exchange, ExchangePattern pattern) { 741 Object answer = null; 742 if (exchange != null) { 743 // rethrow if there was an exception during execution 744 if (exchange.getException() != null) { 745 throw ObjectHelper.wrapCamelExecutionException(exchange, exchange.getException()); 746 } 747 748 // result could have a fault message 749 if (hasFaultMessage(exchange)) { 750 Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn(); 751 answer = msg.getBody(); 752 return answer; 753 } 754 755 // okay no fault then return the response according to the pattern 756 // try to honor pattern if provided 757 boolean notOut = pattern != null && !pattern.isOutCapable(); 758 boolean hasOut = exchange.hasOut(); 759 if (hasOut && !notOut) { 760 // we have a response in out and the pattern is out capable 761 answer = exchange.getOut().getBody(); 762 } else if (!hasOut && exchange.getPattern() == ExchangePattern.InOptionalOut) { 763 // special case where the result is InOptionalOut and with no OUT response 764 // so we should return null to indicate this fact 765 answer = null; 766 } else { 767 // use IN as the response 768 answer = exchange.getIn().getBody(); 769 } 770 } 771 return answer; 772 } 773 774 /** 775 * Tests whether the exchange has a fault message set and that its not null. 776 * 777 * @param exchange the exchange 778 * @return <tt>true</tt> if fault message exists 779 */ 780 public static boolean hasFaultMessage(Exchange exchange) { 781 Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn(); 782 return msg.isFault() && msg.getBody() != null; 783 } 784 785 /** 786 * Tests whether the exchange has already been handled by the error handler 787 * 788 * @param exchange the exchange 789 * @return <tt>true</tt> if handled already by error handler, <tt>false</tt> otherwise 790 */ 791 public static boolean hasExceptionBeenHandledByErrorHandler(Exchange exchange) { 792 return Boolean.TRUE.equals(exchange.getProperty(Exchange.ERRORHANDLER_HANDLED)); 793 } 794 795 /** 796 * Extracts the body from the given future, that represents a handle to an asynchronous exchange. 797 * <p/> 798 * Will wait until the future task is complete. 799 * 800 * @param context the camel context 801 * @param future the future handle 802 * @param type the expected body response type 803 * @return the result body, can be <tt>null</tt>. 804 * @throws CamelExecutionException is thrown if the processing of the exchange failed 805 */ 806 public static <T> T extractFutureBody(CamelContext context, Future<?> future, Class<T> type) { 807 try { 808 return doExtractFutureBody(context, future.get(), type); 809 } catch (InterruptedException e) { 810 throw ObjectHelper.wrapRuntimeCamelException(e); 811 } catch (ExecutionException e) { 812 // execution failed due to an exception so rethrow the cause 813 throw ObjectHelper.wrapCamelExecutionException(null, e.getCause()); 814 } finally { 815 // its harmless to cancel if task is already completed 816 // and in any case we do not want to get hold of the task a 2nd time 817 // and its recommended to cancel according to Brian Goetz in his Java Concurrency in Practice book 818 future.cancel(true); 819 } 820 } 821 822 /** 823 * Extracts the body from the given future, that represents a handle to an asynchronous exchange. 824 * <p/> 825 * Will wait for the future task to complete, but waiting at most the timeout value. 826 * 827 * @param context the camel context 828 * @param future the future handle 829 * @param timeout timeout value 830 * @param unit timeout unit 831 * @param type the expected body response type 832 * @return the result body, can be <tt>null</tt>. 833 * @throws CamelExecutionException is thrown if the processing of the exchange failed 834 * @throws java.util.concurrent.TimeoutException is thrown if a timeout triggered 835 */ 836 public static <T> T extractFutureBody(CamelContext context, Future<?> future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException { 837 try { 838 if (timeout > 0) { 839 return doExtractFutureBody(context, future.get(timeout, unit), type); 840 } else { 841 return doExtractFutureBody(context, future.get(), type); 842 } 843 } catch (InterruptedException e) { 844 // execution failed due interruption so rethrow the cause 845 throw ObjectHelper.wrapCamelExecutionException(null, e); 846 } catch (ExecutionException e) { 847 // execution failed due to an exception so rethrow the cause 848 throw ObjectHelper.wrapCamelExecutionException(null, e.getCause()); 849 } finally { 850 // its harmless to cancel if task is already completed 851 // and in any case we do not want to get hold of the task a 2nd time 852 // and its recommended to cancel according to Brian Goetz in his Java Concurrency in Practice book 853 future.cancel(true); 854 } 855 } 856 857 private static <T> T doExtractFutureBody(CamelContext context, Object result, Class<T> type) { 858 if (result == null) { 859 return null; 860 } 861 if (type.isAssignableFrom(result.getClass())) { 862 return type.cast(result); 863 } 864 if (result instanceof Exchange) { 865 Exchange exchange = (Exchange) result; 866 Object answer = ExchangeHelper.extractResultBody(exchange, exchange.getPattern()); 867 return context.getTypeConverter().convertTo(type, exchange, answer); 868 } 869 return context.getTypeConverter().convertTo(type, result); 870 } 871 872 /** 873 * @deprecated use org.apache.camel.CamelExchangeException.createExceptionMessage instead 874 */ 875 @Deprecated 876 public static String createExceptionMessage(String message, Exchange exchange, Throwable cause) { 877 return CamelExchangeException.createExceptionMessage(message, exchange, cause); 878 } 879 880 /** 881 * Strategy to prepare results before next iterator or when we are complete, 882 * which is done by copying OUT to IN, so there is only an IN as input 883 * for the next iteration. 884 * 885 * @param exchange the exchange to prepare 886 */ 887 public static void prepareOutToIn(Exchange exchange) { 888 // we are routing using pipes and filters so we need to manually copy OUT to IN 889 if (exchange.hasOut()) { 890 exchange.setIn(exchange.getOut()); 891 exchange.setOut(null); 892 } 893 } 894 895 /** 896 * Gets both the messageId and exchangeId to be used for logging purposes. 897 * <p/> 898 * Logging both ids, can help to correlate exchanges which may be redelivered messages 899 * from for example a JMS broker. 900 * 901 * @param exchange the exchange 902 * @return a log message with both the messageId and exchangeId 903 */ 904 public static String logIds(Exchange exchange) { 905 String msgId = exchange.hasOut() ? exchange.getOut().getMessageId() : exchange.getIn().getMessageId(); 906 return "(MessageId: " + msgId + " on ExchangeId: " + exchange.getExchangeId() + ")"; 907 } 908 909 /** 910 * Copies the exchange but the copy will be tied to the given context 911 * 912 * @param exchange the source exchange 913 * @param context the camel context 914 * @return a copy with the given camel context 915 */ 916 public static Exchange copyExchangeAndSetCamelContext(Exchange exchange, CamelContext context) { 917 return copyExchangeAndSetCamelContext(exchange, context, true); 918 } 919 920 /** 921 * Copies the exchange but the copy will be tied to the given context 922 * 923 * @param exchange the source exchange 924 * @param context the camel context 925 * @param handover whether to handover on completions from the source to the copy 926 * @return a copy with the given camel context 927 */ 928 public static Exchange copyExchangeAndSetCamelContext(Exchange exchange, CamelContext context, boolean handover) { 929 DefaultExchange answer = new DefaultExchange(context, exchange.getPattern()); 930 if (exchange.hasProperties()) { 931 answer.setProperties(safeCopyProperties(exchange.getProperties())); 932 } 933 if (handover) { 934 // Need to hand over the completion for async invocation 935 exchange.handoverCompletions(answer); 936 } 937 answer.setIn(exchange.getIn().copy()); 938 if (exchange.hasOut()) { 939 answer.setOut(exchange.getOut().copy()); 940 } 941 answer.setException(exchange.getException()); 942 return answer; 943 } 944 945 /** 946 * Replaces the existing message with the new message 947 * 948 * @param exchange the exchange 949 * @param newMessage the new message 950 * @param outOnly whether to replace the message as OUT message 951 */ 952 public static void replaceMessage(Exchange exchange, Message newMessage, boolean outOnly) { 953 Message old = exchange.hasOut() ? exchange.getOut() : exchange.getIn(); 954 if (outOnly || exchange.hasOut()) { 955 exchange.setOut(newMessage); 956 } else { 957 exchange.setIn(newMessage); 958 } 959 960 // need to de-reference old from the exchange so it can be GC 961 if (old instanceof MessageSupport) { 962 ((MessageSupport) old).setExchange(null); 963 } 964 } 965 966 /** 967 * Gets the original IN {@link Message} this Unit of Work was started with. 968 * <p/> 969 * The original message is only returned if the option {@link org.apache.camel.RuntimeConfiguration#isAllowUseOriginalMessage()} 970 * is enabled. If its disabled, then <tt>null</tt> is returned. 971 * 972 * @return the original IN {@link Message}, or <tt>null</tt> if using original message is disabled. 973 */ 974 public static Message getOriginalInMessage(Exchange exchange) { 975 Message answer = null; 976 977 // try parent first 978 UnitOfWork uow = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class); 979 if (uow != null) { 980 answer = uow.getOriginalInMessage(); 981 } 982 // fallback to the current exchange 983 if (answer == null) { 984 uow = exchange.getUnitOfWork(); 985 if (uow != null) { 986 answer = uow.getOriginalInMessage(); 987 } 988 } 989 return answer; 990 } 991 992 /** 993 * Resolve the component scheme (aka name) from the given endpoint uri 994 * 995 * @param uri the endpoint uri 996 * @return the component scheme (name), or <tt>null</tt> if not possible to resolve 997 */ 998 public static String resolveScheme(String uri) { 999 String scheme = null; 1000 if (uri != null) { 1001 // Use the URI prefix to find the component. 1002 String splitURI[] = StringHelper.splitOnCharacter(uri, ":", 2); 1003 if (splitURI[1] != null) { 1004 scheme = splitURI[0]; 1005 } 1006 } 1007 return scheme; 1008 } 1009 1010 @SuppressWarnings("unchecked") 1011 private static Map<String, Object> safeCopyProperties(Map<String, Object> properties) { 1012 if (properties == null) { 1013 return null; 1014 } 1015 1016 Map<String, Object> answer = new HashMap<>(properties); 1017 1018 // safe copy message history using a defensive copy 1019 List<MessageHistory> history = (List<MessageHistory>) answer.remove(Exchange.MESSAGE_HISTORY); 1020 if (history != null) { 1021 answer.put(Exchange.MESSAGE_HISTORY, new LinkedList<>(history)); 1022 } 1023 1024 return answer; 1025 } 1026 1027 /** 1028 * @see #getCharsetName(Exchange, boolean) 1029 */ 1030 public static String getCharsetName(Exchange exchange) { 1031 return getCharsetName(exchange, true); 1032 } 1033 1034 /** 1035 * Gets the charset name if set as header or property {@link Exchange#CHARSET_NAME}. <b>Notice:</b> The lookup from 1036 * the header has priority over the property. 1037 * 1038 * @param exchange the exchange 1039 * @param useDefault should we fallback and use JVM default charset if no property existed? 1040 * @return the charset, or <tt>null</tt> if no found 1041 */ 1042 public static String getCharsetName(Exchange exchange, boolean useDefault) { 1043 if (exchange != null) { 1044 // header takes precedence 1045 String charsetName = exchange.getIn().getHeader(Exchange.CHARSET_NAME, String.class); 1046 if (charsetName == null) { 1047 charsetName = exchange.getProperty(Exchange.CHARSET_NAME, String.class); 1048 } 1049 if (charsetName != null) { 1050 return IOHelper.normalizeCharset(charsetName); 1051 } 1052 } 1053 if (useDefault) { 1054 return getDefaultCharsetName(); 1055 } else { 1056 return null; 1057 } 1058 } 1059 1060 private static String getDefaultCharsetName() { 1061 return ObjectHelper.getSystemProperty(Exchange.DEFAULT_CHARSET_PROPERTY, "UTF-8"); 1062 } 1063 1064 1065 1066}