001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.processor; 018 019import java.util.Iterator; 020import java.util.concurrent.ConcurrentHashMap; 021import java.util.concurrent.ConcurrentMap; 022 023import org.apache.camel.AsyncCallback; 024import org.apache.camel.AsyncProcessor; 025import org.apache.camel.AsyncProducerCallback; 026import org.apache.camel.CamelContext; 027import org.apache.camel.Endpoint; 028import org.apache.camel.ErrorHandlerFactory; 029import org.apache.camel.Exchange; 030import org.apache.camel.ExchangePattern; 031import org.apache.camel.Expression; 032import org.apache.camel.FailedToCreateProducerException; 033import org.apache.camel.Message; 034import org.apache.camel.Processor; 035import org.apache.camel.Producer; 036import org.apache.camel.Traceable; 037import org.apache.camel.builder.ExpressionBuilder; 038import org.apache.camel.impl.DefaultExchange; 039import org.apache.camel.impl.EmptyProducerCache; 040import org.apache.camel.impl.ProducerCache; 041import org.apache.camel.spi.EndpointUtilizationStatistics; 042import org.apache.camel.spi.IdAware; 043import org.apache.camel.spi.RouteContext; 044import org.apache.camel.support.ServiceSupport; 045import org.apache.camel.util.AsyncProcessorHelper; 046import org.apache.camel.util.ExchangeHelper; 047import org.apache.camel.util.KeyValueHolder; 048import org.apache.camel.util.MessageHelper; 049import org.apache.camel.util.ObjectHelper; 050import org.apache.camel.util.ServiceHelper; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054import static org.apache.camel.processor.PipelineHelper.continueProcessing; 055import static org.apache.camel.util.ObjectHelper.notNull; 056 057/** 058 * Implements a <a href="http://camel.apache.org/routing-slip.html">Routing Slip</a> 059 * pattern where the list of actual endpoints to send a message exchange to are 060 * dependent on the value of a message header. 061 * <p/> 062 * This implementation mirrors the logic from the {@link org.apache.camel.processor.Pipeline} in the async variation 063 * as the failover load balancer is a specialized pipeline. So the trick is to keep doing the same as the 064 * pipeline to ensure it works the same and the async routing engine is flawless. 065 */ 066public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Traceable, IdAware { 067 protected final Logger log = LoggerFactory.getLogger(getClass()); 068 protected String id; 069 protected ProducerCache producerCache; 070 protected int cacheSize; 071 protected boolean ignoreInvalidEndpoints; 072 protected String header; 073 protected Expression expression; 074 protected String uriDelimiter; 075 protected final CamelContext camelContext; 076 private final ConcurrentMap<PreparedErrorHandler, AsyncProcessor> errorHandlers = new ConcurrentHashMap<PreparedErrorHandler, AsyncProcessor>(); 077 078 /** 079 * Class that represents prepared fine grained error handlers when processing routingslip/dynamic-router exchanges 080 * <p/> 081 * This is similar to how multicast processor does. 082 */ 083 static final class PreparedErrorHandler extends KeyValueHolder<String, Processor> { 084 085 PreparedErrorHandler(String key, Processor value) { 086 super(key, value); 087 } 088 089 } 090 091 /** 092 * The iterator to be used for retrieving the next routing slip(s) to be used. 093 */ 094 protected interface RoutingSlipIterator { 095 096 /** 097 * Are the more routing slip(s)? 098 * 099 * @param exchange the current exchange 100 * @return <tt>true</tt> if more slips, <tt>false</tt> otherwise. 101 */ 102 boolean hasNext(Exchange exchange); 103 104 /** 105 * Returns the next routing slip(s). 106 * 107 * @param exchange the current exchange 108 * @return the slip(s). 109 */ 110 Object next(Exchange exchange); 111 112 } 113 114 public RoutingSlip(CamelContext camelContext) { 115 notNull(camelContext, "camelContext"); 116 this.camelContext = camelContext; 117 } 118 119 public RoutingSlip(CamelContext camelContext, Expression expression, String uriDelimiter) { 120 notNull(camelContext, "camelContext"); 121 notNull(expression, "expression"); 122 123 this.camelContext = camelContext; 124 this.expression = expression; 125 this.uriDelimiter = uriDelimiter; 126 this.header = null; 127 } 128 129 public String getId() { 130 return id; 131 } 132 133 public void setId(String id) { 134 this.id = id; 135 } 136 137 public Expression getExpression() { 138 return expression; 139 } 140 141 public String getUriDelimiter() { 142 return uriDelimiter; 143 } 144 145 public void setDelimiter(String delimiter) { 146 this.uriDelimiter = delimiter; 147 } 148 149 public boolean isIgnoreInvalidEndpoints() { 150 return ignoreInvalidEndpoints; 151 } 152 153 public void setIgnoreInvalidEndpoints(boolean ignoreInvalidEndpoints) { 154 this.ignoreInvalidEndpoints = ignoreInvalidEndpoints; 155 } 156 157 public int getCacheSize() { 158 return cacheSize; 159 } 160 161 public void setCacheSize(int cacheSize) { 162 this.cacheSize = cacheSize; 163 } 164 165 @Override 166 public String toString() { 167 return "RoutingSlip[expression=" + expression + " uriDelimiter=" + uriDelimiter + "]"; 168 } 169 170 public String getTraceLabel() { 171 return "routingSlip[" + expression + "]"; 172 } 173 174 public void process(Exchange exchange) throws Exception { 175 AsyncProcessorHelper.process(this, exchange); 176 } 177 178 public boolean process(Exchange exchange, AsyncCallback callback) { 179 if (!isStarted()) { 180 exchange.setException(new IllegalStateException("RoutingSlip has not been started: " + this)); 181 callback.done(true); 182 return true; 183 } 184 185 return doRoutingSlipWithExpression(exchange, this.expression, callback); 186 } 187 188 public boolean doRoutingSlip(Exchange exchange, Object routingSlip, AsyncCallback callback) { 189 if (routingSlip instanceof Expression) { 190 return doRoutingSlipWithExpression(exchange, (Expression) routingSlip, callback); 191 } else { 192 return doRoutingSlipWithExpression(exchange, ExpressionBuilder.constantExpression(routingSlip), callback); 193 } 194 } 195 196 /** 197 * Creates the route slip iterator to be used. 198 * 199 * @param exchange the exchange 200 * @param expression the expression 201 * @return the iterator, should never be <tt>null</tt> 202 */ 203 protected RoutingSlipIterator createRoutingSlipIterator(final Exchange exchange, final Expression expression) throws Exception { 204 Object slip = expression.evaluate(exchange, Object.class); 205 if (exchange.getException() != null) { 206 // force any exceptions occurred during evaluation to be thrown 207 throw exchange.getException(); 208 } 209 210 final Iterator<Object> delegate = ObjectHelper.createIterator(slip, uriDelimiter); 211 212 return new RoutingSlipIterator() { 213 public boolean hasNext(Exchange exchange) { 214 return delegate.hasNext(); 215 } 216 217 public Object next(Exchange exchange) { 218 return delegate.next(); 219 } 220 }; 221 } 222 223 private boolean doRoutingSlipWithExpression(final Exchange exchange, final Expression expression, final AsyncCallback callback) { 224 Exchange current = exchange; 225 RoutingSlipIterator iter; 226 try { 227 iter = createRoutingSlipIterator(exchange, expression); 228 } catch (Exception e) { 229 exchange.setException(e); 230 callback.done(true); 231 return true; 232 } 233 234 // ensure the slip is empty when we start 235 if (current.hasProperties()) { 236 current.setProperty(Exchange.SLIP_ENDPOINT, null); 237 } 238 239 while (iter.hasNext(current)) { 240 Endpoint endpoint; 241 try { 242 endpoint = resolveEndpoint(iter, exchange); 243 // if no endpoint was resolved then try the next 244 if (endpoint == null) { 245 continue; 246 } 247 } catch (Exception e) { 248 // error resolving endpoint so we should break out 249 current.setException(e); 250 break; 251 } 252 253 //process and prepare the routing slip 254 boolean sync = processExchange(endpoint, current, exchange, callback, iter); 255 current = prepareExchangeForRoutingSlip(current, endpoint); 256 257 if (!sync) { 258 log.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId()); 259 // the remainder of the routing slip will be completed async 260 // so we break out now, then the callback will be invoked which then continue routing from where we left here 261 return false; 262 } 263 264 log.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId()); 265 266 // we ignore some kind of exceptions and allow us to continue 267 if (isIgnoreInvalidEndpoints()) { 268 FailedToCreateProducerException e = current.getException(FailedToCreateProducerException.class); 269 if (e != null) { 270 if (log.isDebugEnabled()) { 271 log.debug("Endpoint uri is invalid: " + endpoint + ". This exception will be ignored.", e); 272 } 273 current.setException(null); 274 } 275 } 276 277 // Decide whether to continue with the recipients or not; similar logic to the Pipeline 278 // check for error if so we should break out 279 if (!continueProcessing(current, "so breaking out of the routing slip", log)) { 280 break; 281 } 282 } 283 284 // logging nextExchange as it contains the exchange that might have altered the payload and since 285 // we are logging the completion if will be confusing if we log the original instead 286 // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots 287 log.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), current); 288 289 // copy results back to the original exchange 290 ExchangeHelper.copyResults(exchange, current); 291 292 callback.done(true); 293 return true; 294 } 295 296 protected Endpoint resolveEndpoint(RoutingSlipIterator iter, Exchange exchange) throws Exception { 297 Object nextRecipient = iter.next(exchange); 298 Endpoint endpoint = null; 299 try { 300 endpoint = ExchangeHelper.resolveEndpoint(exchange, nextRecipient); 301 } catch (Exception e) { 302 if (isIgnoreInvalidEndpoints()) { 303 log.info("Endpoint uri is invalid: " + nextRecipient + ". This exception will be ignored.", e); 304 } else { 305 throw e; 306 } 307 } 308 return endpoint; 309 } 310 311 protected Exchange prepareExchangeForRoutingSlip(Exchange current, Endpoint endpoint) { 312 Exchange copy = new DefaultExchange(current); 313 // we must use the same id as this is a snapshot strategy where Camel copies a snapshot 314 // before processing the next step in the pipeline, so we have a snapshot of the exchange 315 // just before. This snapshot is used if Camel should do redeliveries (re try) using 316 // DeadLetterChannel. That is why it's important the id is the same, as it is the *same* 317 // exchange being routed. 318 copy.setExchangeId(current.getExchangeId()); 319 copyOutToIn(copy, current); 320 321 // ensure stream caching is reset 322 MessageHelper.resetStreamCache(copy.getIn()); 323 324 return copy; 325 } 326 327 protected AsyncProcessor createErrorHandler(RouteContext routeContext, Exchange exchange, AsyncProcessor processor, Endpoint endpoint) { 328 AsyncProcessor answer = processor; 329 330 boolean tryBlock = exchange.getProperty(Exchange.TRY_ROUTE_BLOCK, false, boolean.class); 331 332 // do not wrap in error handler if we are inside a try block 333 if (!tryBlock && routeContext != null) { 334 // wrap the producer in error handler so we have fine grained error handling on 335 // the output side instead of the input side 336 // this is needed to support redelivery on that output alone and not doing redelivery 337 // for the entire routingslip/dynamic-router block again which will start from scratch again 338 339 // create key for cache 340 final PreparedErrorHandler key = new PreparedErrorHandler(endpoint.getEndpointUri(), processor); 341 342 // lookup cached first to reuse and preserve memory 343 answer = errorHandlers.get(key); 344 if (answer != null) { 345 log.trace("Using existing error handler for: {}", processor); 346 return answer; 347 } 348 349 log.trace("Creating error handler for: {}", processor); 350 ErrorHandlerFactory builder = routeContext.getRoute().getErrorHandlerBuilder(); 351 // create error handler (create error handler directly to keep it light weight, 352 // instead of using ProcessorDefinition.wrapInErrorHandler) 353 try { 354 answer = (AsyncProcessor) builder.createErrorHandler(routeContext, processor); 355 356 // must start the error handler 357 ServiceHelper.startServices(answer); 358 359 // add to cache 360 errorHandlers.putIfAbsent(key, answer); 361 362 } catch (Exception e) { 363 throw ObjectHelper.wrapRuntimeCamelException(e); 364 } 365 } 366 367 return answer; 368 } 369 370 protected boolean processExchange(final Endpoint endpoint, final Exchange exchange, final Exchange original, 371 final AsyncCallback callback, final RoutingSlipIterator iter) { 372 373 // this does the actual processing so log at trace level 374 log.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); 375 376 boolean sync = producerCache.doInAsyncProducer(endpoint, exchange, null, callback, new AsyncProducerCallback() { 377 public boolean doInAsyncProducer(Producer producer, AsyncProcessor asyncProducer, final Exchange exchange, 378 ExchangePattern exchangePattern, final AsyncCallback callback) { 379 380 // rework error handling to support fine grained error handling 381 RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null; 382 asyncProducer = createErrorHandler(routeContext, exchange, asyncProducer, endpoint); 383 384 // set property which endpoint we send to 385 exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri()); 386 exchange.setProperty(Exchange.SLIP_ENDPOINT, endpoint.getEndpointUri()); 387 388 return asyncProducer.process(exchange, new AsyncCallback() { 389 public void done(boolean doneSync) { 390 // we only have to handle async completion of the routing slip 391 if (doneSync) { 392 callback.done(doneSync); 393 return; 394 } 395 396 // continue processing the routing slip asynchronously 397 Exchange current = prepareExchangeForRoutingSlip(exchange, endpoint); 398 399 while (iter.hasNext(current)) { 400 401 // we ignore some kind of exceptions and allow us to continue 402 if (isIgnoreInvalidEndpoints()) { 403 FailedToCreateProducerException e = current.getException(FailedToCreateProducerException.class); 404 if (e != null) { 405 if (log.isDebugEnabled()) { 406 log.debug("Endpoint uri is invalid: " + endpoint + ". This exception will be ignored.", e); 407 } 408 current.setException(null); 409 } 410 } 411 412 // Decide whether to continue with the recipients or not; similar logic to the Pipeline 413 // check for error if so we should break out 414 if (!continueProcessing(current, "so breaking out of the routing slip", log)) { 415 break; 416 } 417 418 Endpoint endpoint; 419 try { 420 endpoint = resolveEndpoint(iter, exchange); 421 // if no endpoint was resolved then try the next 422 if (endpoint == null) { 423 continue; 424 } 425 } catch (Exception e) { 426 // error resolving endpoint so we should break out 427 exchange.setException(e); 428 break; 429 } 430 431 // prepare and process the routing slip 432 boolean sync = processExchange(endpoint, current, original, callback, iter); 433 current = prepareExchangeForRoutingSlip(current, endpoint); 434 435 if (!sync) { 436 log.trace("Processing exchangeId: {} is continued being processed asynchronously", original.getExchangeId()); 437 return; 438 } 439 } 440 441 // logging nextExchange as it contains the exchange that might have altered the payload and since 442 // we are logging the completion if will be confusing if we log the original instead 443 // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots 444 log.trace("Processing complete for exchangeId: {} >>> {}", original.getExchangeId(), current); 445 446 // copy results back to the original exchange 447 ExchangeHelper.copyResults(original, current); 448 callback.done(false); 449 } 450 }); 451 } 452 }); 453 454 return sync; 455 } 456 457 protected void doStart() throws Exception { 458 if (producerCache == null) { 459 if (cacheSize < 0) { 460 producerCache = new EmptyProducerCache(this, camelContext); 461 log.debug("RoutingSlip {} is not using ProducerCache", this); 462 } else if (cacheSize == 0) { 463 producerCache = new ProducerCache(this, camelContext); 464 log.debug("RoutingSlip {} using ProducerCache with default cache size", this); 465 } else { 466 producerCache = new ProducerCache(this, camelContext, cacheSize); 467 log.debug("RoutingSlip {} using ProducerCache with cacheSize={}", this, cacheSize); 468 } 469 } 470 ServiceHelper.startService(producerCache); 471 } 472 473 protected void doStop() throws Exception { 474 ServiceHelper.stopServices(producerCache, errorHandlers); 475 } 476 477 protected void doShutdown() throws Exception { 478 ServiceHelper.stopAndShutdownServices(producerCache, errorHandlers); 479 480 // only clear error handlers when shutting down 481 errorHandlers.clear(); 482 } 483 484 public EndpointUtilizationStatistics getEndpointUtilizationStatistics() { 485 return producerCache.getEndpointUtilizationStatistics(); 486 } 487 488 /** 489 * Returns the outbound message if available. Otherwise return the inbound message. 490 */ 491 private Message getResultMessage(Exchange exchange) { 492 if (exchange.hasOut()) { 493 return exchange.getOut(); 494 } else { 495 // if this endpoint had no out (like a mock endpoint) just take the in 496 return exchange.getIn(); 497 } 498 } 499 500 /** 501 * Copy the outbound data in 'source' to the inbound data in 'result'. 502 */ 503 private void copyOutToIn(Exchange result, Exchange source) { 504 result.setException(source.getException()); 505 result.setIn(getResultMessage(source)); 506 507 result.getProperties().clear(); 508 result.getProperties().putAll(source.getProperties()); 509 } 510}