001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.processor; 018 019import java.util.Iterator; 020 021import org.apache.camel.AsyncCallback; 022import org.apache.camel.AsyncProcessor; 023import org.apache.camel.AsyncProducerCallback; 024import org.apache.camel.CamelContext; 025import org.apache.camel.Endpoint; 026import org.apache.camel.Exchange; 027import org.apache.camel.ExchangePattern; 028import org.apache.camel.Expression; 029import org.apache.camel.FailedToCreateProducerException; 030import org.apache.camel.Message; 031import org.apache.camel.Producer; 032import org.apache.camel.Traceable; 033import org.apache.camel.builder.ExpressionBuilder; 034import org.apache.camel.impl.DefaultExchange; 035import org.apache.camel.impl.EmptyProducerCache; 036import org.apache.camel.impl.ProducerCache; 037import org.apache.camel.spi.EndpointUtilizationStatistics; 038import org.apache.camel.spi.IdAware; 039import org.apache.camel.spi.RouteContext; 040import org.apache.camel.support.ServiceSupport; 041import org.apache.camel.util.AsyncProcessorHelper; 042import org.apache.camel.util.ExchangeHelper; 043import org.apache.camel.util.MessageHelper; 044import org.apache.camel.util.ObjectHelper; 045import org.apache.camel.util.ServiceHelper; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049import static org.apache.camel.processor.PipelineHelper.continueProcessing; 050import static org.apache.camel.util.ObjectHelper.notNull; 051 052/** 053 * Implements a <a href="http://camel.apache.org/routing-slip.html">Routing Slip</a> 054 * pattern where the list of actual endpoints to send a message exchange to are 055 * dependent on the value of a message header. 056 * <p/> 057 * This implementation mirrors the logic from the {@link org.apache.camel.processor.Pipeline} in the async variation 058 * as the failover load balancer is a specialized pipeline. So the trick is to keep doing the same as the 059 * pipeline to ensure it works the same and the async routing engine is flawless. 060 */ 061public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Traceable, IdAware { 062 protected final Logger log = LoggerFactory.getLogger(getClass()); 063 protected String id; 064 protected ProducerCache producerCache; 065 protected int cacheSize; 066 protected boolean ignoreInvalidEndpoints; 067 protected String header; 068 protected Expression expression; 069 protected String uriDelimiter; 070 protected final CamelContext camelContext; 071 protected AsyncProcessor errorHandler; 072 protected SendDynamicProcessor sendDynamicProcessor; 073 074 /** 075 * The iterator to be used for retrieving the next routing slip(s) to be used. 076 */ 077 protected interface RoutingSlipIterator { 078 079 /** 080 * Are the more routing slip(s)? 081 * 082 * @param exchange the current exchange 083 * @return <tt>true</tt> if more slips, <tt>false</tt> otherwise. 084 */ 085 boolean hasNext(Exchange exchange); 086 087 /** 088 * Returns the next routing slip(s). 089 * 090 * @param exchange the current exchange 091 * @return the slip(s). 092 */ 093 Object next(Exchange exchange); 094 095 } 096 097 public RoutingSlip(CamelContext camelContext) { 098 notNull(camelContext, "camelContext"); 099 this.camelContext = camelContext; 100 } 101 102 public RoutingSlip(CamelContext camelContext, Expression expression, String uriDelimiter) { 103 notNull(camelContext, "camelContext"); 104 notNull(expression, "expression"); 105 106 this.camelContext = camelContext; 107 this.expression = expression; 108 this.uriDelimiter = uriDelimiter; 109 this.header = null; 110 } 111 112 public String getId() { 113 return id; 114 } 115 116 public void setId(String id) { 117 this.id = id; 118 } 119 120 public Expression getExpression() { 121 return expression; 122 } 123 124 public String getUriDelimiter() { 125 return uriDelimiter; 126 } 127 128 public void setDelimiter(String delimiter) { 129 this.uriDelimiter = delimiter; 130 } 131 132 public boolean isIgnoreInvalidEndpoints() { 133 return ignoreInvalidEndpoints; 134 } 135 136 public void setIgnoreInvalidEndpoints(boolean ignoreInvalidEndpoints) { 137 this.ignoreInvalidEndpoints = ignoreInvalidEndpoints; 138 } 139 140 public int getCacheSize() { 141 return cacheSize; 142 } 143 144 public void setCacheSize(int cacheSize) { 145 this.cacheSize = cacheSize; 146 } 147 148 public AsyncProcessor getErrorHandler() { 149 return errorHandler; 150 } 151 152 public void setErrorHandler(AsyncProcessor errorHandler) { 153 this.errorHandler = errorHandler; 154 } 155 156 @Override 157 public String toString() { 158 return "RoutingSlip[expression=" + expression + " uriDelimiter=" + uriDelimiter + "]"; 159 } 160 161 public String getTraceLabel() { 162 return "routingSlip[" + expression + "]"; 163 } 164 165 public void process(Exchange exchange) throws Exception { 166 AsyncProcessorHelper.process(this, exchange); 167 } 168 169 public boolean process(Exchange exchange, AsyncCallback callback) { 170 if (!isStarted()) { 171 exchange.setException(new IllegalStateException("RoutingSlip has not been started: " + this)); 172 callback.done(true); 173 return true; 174 } 175 176 return doRoutingSlipWithExpression(exchange, this.expression, callback); 177 } 178 179 public boolean doRoutingSlip(Exchange exchange, Object routingSlip, AsyncCallback callback) { 180 if (routingSlip instanceof Expression) { 181 return doRoutingSlipWithExpression(exchange, (Expression) routingSlip, callback); 182 } else { 183 return doRoutingSlipWithExpression(exchange, ExpressionBuilder.constantExpression(routingSlip), callback); 184 } 185 } 186 187 /** 188 * Creates the route slip iterator to be used. 189 * 190 * @param exchange the exchange 191 * @param expression the expression 192 * @return the iterator, should never be <tt>null</tt> 193 */ 194 protected RoutingSlipIterator createRoutingSlipIterator(final Exchange exchange, final Expression expression) throws Exception { 195 Object slip = expression.evaluate(exchange, Object.class); 196 if (exchange.getException() != null) { 197 // force any exceptions occurred during evaluation to be thrown 198 throw exchange.getException(); 199 } 200 201 final Iterator<Object> delegate = ObjectHelper.createIterator(slip, uriDelimiter); 202 203 return new RoutingSlipIterator() { 204 public boolean hasNext(Exchange exchange) { 205 return delegate.hasNext(); 206 } 207 208 public Object next(Exchange exchange) { 209 return delegate.next(); 210 } 211 }; 212 } 213 214 private boolean doRoutingSlipWithExpression(final Exchange exchange, final Expression expression, final AsyncCallback originalCallback) { 215 Exchange current = exchange; 216 RoutingSlipIterator iter; 217 try { 218 iter = createRoutingSlipIterator(exchange, expression); 219 } catch (Exception e) { 220 exchange.setException(e); 221 originalCallback.done(true); 222 return true; 223 } 224 225 // ensure the slip is empty when we start 226 if (current.hasProperties()) { 227 current.setProperty(Exchange.SLIP_ENDPOINT, null); 228 } 229 230 while (iter.hasNext(current)) { 231 Endpoint endpoint; 232 try { 233 endpoint = resolveEndpoint(iter, exchange); 234 // if no endpoint was resolved then try the next 235 if (endpoint == null) { 236 continue; 237 } 238 } catch (Exception e) { 239 // error resolving endpoint so we should break out 240 current.setException(e); 241 break; 242 } 243 244 //process and prepare the routing slip 245 boolean sync = processExchange(endpoint, current, exchange, originalCallback, iter); 246 current = prepareExchangeForRoutingSlip(current, endpoint); 247 248 if (!sync) { 249 log.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId()); 250 // the remainder of the routing slip will be completed async 251 // so we break out now, then the callback will be invoked which then continue routing from where we left here 252 return false; 253 } 254 255 log.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId()); 256 257 // we ignore some kind of exceptions and allow us to continue 258 if (isIgnoreInvalidEndpoints()) { 259 FailedToCreateProducerException e = current.getException(FailedToCreateProducerException.class); 260 if (e != null) { 261 if (log.isDebugEnabled()) { 262 log.debug("Endpoint uri is invalid: " + endpoint + ". This exception will be ignored.", e); 263 } 264 current.setException(null); 265 } 266 } 267 268 // Decide whether to continue with the recipients or not; similar logic to the Pipeline 269 // check for error if so we should break out 270 if (!continueProcessing(current, "so breaking out of the routing slip", log)) { 271 break; 272 } 273 } 274 275 // logging nextExchange as it contains the exchange that might have altered the payload and since 276 // we are logging the completion if will be confusing if we log the original instead 277 // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots 278 log.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), current); 279 280 // copy results back to the original exchange 281 ExchangeHelper.copyResults(exchange, current); 282 283 // okay we are completely done with the routing slip 284 // so we need to signal done on the original callback so it can continue 285 originalCallback.done(true); 286 return true; 287 } 288 289 protected Endpoint resolveEndpoint(RoutingSlipIterator iter, Exchange exchange) throws Exception { 290 Object nextRecipient = iter.next(exchange); 291 Endpoint endpoint = null; 292 try { 293 endpoint = ExchangeHelper.resolveEndpoint(exchange, nextRecipient); 294 } catch (Exception e) { 295 if (isIgnoreInvalidEndpoints()) { 296 log.info("Endpoint uri is invalid: " + nextRecipient + ". This exception will be ignored.", e); 297 } else { 298 throw e; 299 } 300 } 301 return endpoint; 302 } 303 304 protected Exchange prepareExchangeForRoutingSlip(Exchange current, Endpoint endpoint) { 305 Exchange copy = new DefaultExchange(current); 306 // we must use the same id as this is a snapshot strategy where Camel copies a snapshot 307 // before processing the next step in the pipeline, so we have a snapshot of the exchange 308 // just before. This snapshot is used if Camel should do redeliveries (re try) using 309 // DeadLetterChannel. That is why it's important the id is the same, as it is the *same* 310 // exchange being routed. 311 copy.setExchangeId(current.getExchangeId()); 312 copyOutToIn(copy, current); 313 314 // ensure stream caching is reset 315 MessageHelper.resetStreamCache(copy.getIn()); 316 317 return copy; 318 } 319 320 protected AsyncProcessor createErrorHandler(RouteContext routeContext, Exchange exchange, AsyncProcessor processor, Endpoint endpoint) { 321 AsyncProcessor answer = processor; 322 323 boolean tryBlock = exchange.getProperty(Exchange.TRY_ROUTE_BLOCK, false, boolean.class); 324 325 // do not wrap in error handler if we are inside a try block 326 if (!tryBlock && routeContext != null && errorHandler != null) { 327 // wrap the producer in error handler so we have fine grained error handling on 328 // the output side instead of the input side 329 // this is needed to support redelivery on that output alone and not doing redelivery 330 // for the entire routingslip/dynamic-router block again which will start from scratch again 331 answer = errorHandler; 332 } 333 334 return answer; 335 } 336 337 protected boolean processExchange(final Endpoint endpoint, final Exchange exchange, final Exchange original, 338 final AsyncCallback originalCallback, final RoutingSlipIterator iter) { 339 340 // this does the actual processing so log at trace level 341 log.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); 342 343 // routing slip callback which are used when 344 // - routing slip was routed asynchronously 345 // - and we are completely done with the routing slip 346 // so we need to signal done on the original callback so it can continue 347 AsyncCallback callback = new AsyncCallback() { 348 @Override 349 public void done(boolean doneSync) { 350 if (!doneSync) { 351 originalCallback.done(false); 352 } 353 } 354 }; 355 boolean sync = producerCache.doInAsyncProducer(endpoint, exchange, null, callback, new AsyncProducerCallback() { 356 public boolean doInAsyncProducer(Producer producer, AsyncProcessor asyncProducer, final Exchange exchange, 357 ExchangePattern exchangePattern, final AsyncCallback callback) { 358 359 // rework error handling to support fine grained error handling 360 RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null; 361 AsyncProcessor target = createErrorHandler(routeContext, exchange, asyncProducer, endpoint); 362 363 // set property which endpoint we send to and the producer that can do it 364 exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri()); 365 exchange.setProperty(Exchange.SLIP_ENDPOINT, endpoint.getEndpointUri()); 366 exchange.setProperty(Exchange.SLIP_PRODUCER, asyncProducer); 367 368 boolean answer = target.process(exchange, new AsyncCallback() { 369 public void done(boolean doneSync) { 370 // cleanup producer after usage 371 exchange.removeProperty(Exchange.SLIP_PRODUCER); 372 373 // we only have to handle async completion of the routing slip 374 if (doneSync) { 375 callback.done(true); 376 return; 377 } 378 379 try { 380 // continue processing the routing slip asynchronously 381 Exchange current = prepareExchangeForRoutingSlip(exchange, endpoint); 382 383 while (iter.hasNext(current)) { 384 385 // we ignore some kind of exceptions and allow us to continue 386 if (isIgnoreInvalidEndpoints()) { 387 FailedToCreateProducerException e = current.getException(FailedToCreateProducerException.class); 388 if (e != null) { 389 if (log.isDebugEnabled()) { 390 log.debug("Endpoint uri is invalid: " + endpoint + ". This exception will be ignored.", e); 391 } 392 current.setException(null); 393 } 394 } 395 396 // Decide whether to continue with the recipients or not; similar logic to the Pipeline 397 // check for error if so we should break out 398 if (!continueProcessing(current, "so breaking out of the routing slip", log)) { 399 break; 400 } 401 402 Endpoint endpoint; 403 try { 404 endpoint = resolveEndpoint(iter, exchange); 405 // if no endpoint was resolved then try the next 406 if (endpoint == null) { 407 continue; 408 } 409 } catch (Exception e) { 410 // error resolving endpoint so we should break out 411 current.setException(e); 412 break; 413 } 414 415 // prepare and process the routing slip 416 boolean sync = processExchange(endpoint, current, original, callback, iter); 417 current = prepareExchangeForRoutingSlip(current, endpoint); 418 419 if (!sync) { 420 log.trace("Processing exchangeId: {} is continued being processed asynchronously", original.getExchangeId()); 421 return; 422 } 423 } 424 425 // logging nextExchange as it contains the exchange that might have altered the payload and since 426 // we are logging the completion if will be confusing if we log the original instead 427 // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots 428 log.trace("Processing complete for exchangeId: {} >>> {}", original.getExchangeId(), current); 429 430 // copy results back to the original exchange 431 ExchangeHelper.copyResults(original, current); 432 } catch (Throwable e) { 433 exchange.setException(e); 434 } 435 436 // okay we are completely done with the routing slip 437 // so we need to signal done on the original callback so it can continue 438 originalCallback.done(false); 439 } 440 }); 441 442 return answer; 443 } 444 }); 445 446 return sync; 447 } 448 449 protected void doStart() throws Exception { 450 if (producerCache == null) { 451 if (cacheSize < 0) { 452 producerCache = new EmptyProducerCache(this, camelContext); 453 log.debug("RoutingSlip {} is not using ProducerCache", this); 454 } else if (cacheSize == 0) { 455 producerCache = new ProducerCache(this, camelContext); 456 log.debug("RoutingSlip {} using ProducerCache with default cache size", this); 457 } else { 458 producerCache = new ProducerCache(this, camelContext, cacheSize); 459 log.debug("RoutingSlip {} using ProducerCache with cacheSize={}", this, cacheSize); 460 } 461 } 462 463 ServiceHelper.startServices(producerCache, errorHandler); 464 } 465 466 protected void doStop() throws Exception { 467 ServiceHelper.stopServices(producerCache, errorHandler); 468 } 469 470 protected void doShutdown() throws Exception { 471 ServiceHelper.stopAndShutdownServices(producerCache, errorHandler); 472 } 473 474 public EndpointUtilizationStatistics getEndpointUtilizationStatistics() { 475 return producerCache.getEndpointUtilizationStatistics(); 476 } 477 478 /** 479 * Returns the outbound message if available. Otherwise return the inbound message. 480 */ 481 private Message getResultMessage(Exchange exchange) { 482 if (exchange.hasOut()) { 483 return exchange.getOut(); 484 } else { 485 // if this endpoint had no out (like a mock endpoint) just take the in 486 return exchange.getIn(); 487 } 488 } 489 490 /** 491 * Copy the outbound data in 'source' to the inbound data in 'result'. 492 */ 493 private void copyOutToIn(Exchange result, Exchange source) { 494 result.setException(source.getException()); 495 result.setIn(getResultMessage(source)); 496 497 result.getProperties().clear(); 498 result.getProperties().putAll(source.getProperties()); 499 } 500 501 /** 502 * Creates the embedded processor to use when wrapping this routing slip in an error handler. 503 */ 504 public AsyncProcessor newRoutingSlipProcessorForErrorHandler() { 505 return new RoutingSlipProcessor(); 506 } 507 508 /** 509 * Embedded processor that routes to the routing slip that has been set via the 510 * exchange property {@link Exchange#SLIP_PRODUCER}. 511 */ 512 private final class RoutingSlipProcessor implements AsyncProcessor { 513 514 @Override 515 public void process(Exchange exchange) throws Exception { 516 AsyncProcessorHelper.process(this, exchange); 517 } 518 519 @Override 520 public boolean process(Exchange exchange, AsyncCallback callback) { 521 AsyncProcessor producer = exchange.getProperty(Exchange.SLIP_PRODUCER, AsyncProcessor.class); 522 return producer.process(exchange, callback); 523 } 524 525 @Override 526 public String toString() { 527 return "RoutingSlipProcessor"; 528 } 529 } 530}