001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.util.Iterator; 020 021 import org.apache.camel.AsyncCallback; 022 import org.apache.camel.AsyncProcessor; 023 import org.apache.camel.AsyncProducerCallback; 024 import org.apache.camel.CamelContext; 025 import org.apache.camel.Endpoint; 026 import org.apache.camel.Exchange; 027 import org.apache.camel.ExchangePattern; 028 import org.apache.camel.Expression; 029 import org.apache.camel.FailedToCreateProducerException; 030 import org.apache.camel.Message; 031 import org.apache.camel.Producer; 032 import org.apache.camel.Traceable; 033 import org.apache.camel.builder.ExpressionBuilder; 034 import org.apache.camel.impl.DefaultExchange; 035 import org.apache.camel.impl.ProducerCache; 036 import org.apache.camel.support.ServiceSupport; 037 import org.apache.camel.util.AsyncProcessorHelper; 038 import org.apache.camel.util.ExchangeHelper; 039 import org.apache.camel.util.ObjectHelper; 040 import org.apache.camel.util.ServiceHelper; 041 import org.slf4j.Logger; 042 import org.slf4j.LoggerFactory; 043 044 import static org.apache.camel.processor.PipelineHelper.continueProcessing; 045 import static org.apache.camel.util.ObjectHelper.notNull; 046 047 /** 048 * Implements a <a href="http://camel.apache.org/routing-slip.html">Routing Slip</a> 049 * pattern where the list of actual endpoints to send a message exchange to are 050 * dependent on the value of a message header. 051 * <p/> 052 * This implementation mirrors the logic from the {@link org.apache.camel.processor.Pipeline} in the async variation 053 * as the failover load balancer is a specialized pipeline. So the trick is to keep doing the same as the 054 * pipeline to ensure it works the same and the async routing engine is flawless. 055 */ 056 public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Traceable { 057 protected final transient Logger log = LoggerFactory.getLogger(getClass()); 058 protected ProducerCache producerCache; 059 protected boolean ignoreInvalidEndpoints; 060 protected String header; 061 protected Expression expression; 062 protected String uriDelimiter; 063 protected final CamelContext camelContext; 064 065 /** 066 * The iterator to be used for retrieving the next routing slip(s) to be used. 067 */ 068 protected interface RoutingSlipIterator { 069 070 /** 071 * Are the more routing slip(s)? 072 * 073 * @param exchange the current exchange 074 * @return <tt>true</tt> if more slips, <tt>false</tt> otherwise. 075 */ 076 boolean hasNext(Exchange exchange); 077 078 /** 079 * Returns the next routing slip(s). 080 * 081 * @param exchange the current exchange 082 * @return the slip(s). 083 */ 084 Object next(Exchange exchange); 085 086 } 087 088 public RoutingSlip(CamelContext camelContext) { 089 notNull(camelContext, "camelContext"); 090 this.camelContext = camelContext; 091 } 092 093 public RoutingSlip(CamelContext camelContext, Expression expression, String uriDelimiter) { 094 notNull(camelContext, "camelContext"); 095 notNull(expression, "expression"); 096 097 this.camelContext = camelContext; 098 this.expression = expression; 099 this.uriDelimiter = uriDelimiter; 100 this.header = null; 101 } 102 103 public void setDelimiter(String delimiter) { 104 this.uriDelimiter = delimiter; 105 } 106 107 public boolean isIgnoreInvalidEndpoints() { 108 return ignoreInvalidEndpoints; 109 } 110 111 public void setIgnoreInvalidEndpoints(boolean ignoreInvalidEndpoints) { 112 this.ignoreInvalidEndpoints = ignoreInvalidEndpoints; 113 } 114 115 @Override 116 public String toString() { 117 return "RoutingSlip[expression=" + expression + " uriDelimiter=" + uriDelimiter + "]"; 118 } 119 120 public String getTraceLabel() { 121 return "routingSlip[" + expression + "]"; 122 } 123 124 public void process(Exchange exchange) throws Exception { 125 AsyncProcessorHelper.process(this, exchange); 126 } 127 128 public boolean process(Exchange exchange, AsyncCallback callback) { 129 if (!isStarted()) { 130 exchange.setException(new IllegalStateException("RoutingSlip has not been started: " + this)); 131 callback.done(true); 132 return true; 133 } 134 135 return doRoutingSlip(exchange, callback); 136 } 137 138 public boolean doRoutingSlip(Exchange exchange, Object routingSlip, AsyncCallback callback) { 139 if (routingSlip instanceof Expression) { 140 this.expression = (Expression) routingSlip; 141 } else { 142 this.expression = ExpressionBuilder.constantExpression(routingSlip); 143 } 144 return doRoutingSlip(exchange, callback); 145 } 146 147 /** 148 * Creates the route slip iterator to be used. 149 * 150 * @param exchange the exchange 151 * @return the iterator, should never be <tt>null</tt> 152 */ 153 protected RoutingSlipIterator createRoutingSlipIterator(final Exchange exchange) throws Exception { 154 Object slip = expression.evaluate(exchange, Object.class); 155 if (exchange.getException() != null) { 156 // force any exceptions occurred during evaluation to be thrown 157 throw exchange.getException(); 158 } 159 160 final Iterator<Object> delegate = ObjectHelper.createIterator(slip, uriDelimiter); 161 162 return new RoutingSlipIterator() { 163 public boolean hasNext(Exchange exchange) { 164 return delegate.hasNext(); 165 } 166 167 public Object next(Exchange exchange) { 168 return delegate.next(); 169 } 170 }; 171 } 172 173 private boolean doRoutingSlip(final Exchange exchange, final AsyncCallback callback) { 174 Exchange current = exchange; 175 RoutingSlipIterator iter; 176 try { 177 iter = createRoutingSlipIterator(exchange); 178 } catch (Exception e) { 179 exchange.setException(e); 180 callback.done(true); 181 return true; 182 } 183 184 // ensure the slip is empty when we start 185 if (current.hasProperties()) { 186 current.setProperty(Exchange.SLIP_ENDPOINT, null); 187 } 188 189 while (iter.hasNext(current)) { 190 Endpoint endpoint; 191 try { 192 endpoint = resolveEndpoint(iter, exchange); 193 // if no endpoint was resolved then try the next 194 if (endpoint == null) { 195 continue; 196 } 197 } catch (Exception e) { 198 // error resolving endpoint so we should break out 199 current.setException(e); 200 break; 201 } 202 203 // prepare and process the routing slip 204 Exchange copy = prepareExchangeForRoutingSlip(current, endpoint); 205 boolean sync = processExchange(endpoint, copy, exchange, callback, iter); 206 current = copy; 207 208 if (!sync) { 209 log.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId()); 210 // the remainder of the routing slip will be completed async 211 // so we break out now, then the callback will be invoked which then continue routing from where we left here 212 return false; 213 } 214 215 log.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId()); 216 217 // we ignore some kind of exceptions and allow us to continue 218 if (isIgnoreInvalidEndpoints()) { 219 FailedToCreateProducerException e = current.getException(FailedToCreateProducerException.class); 220 if (e != null) { 221 if (log.isDebugEnabled()) { 222 log.debug("Endpoint uri is invalid: " + endpoint + ". This exception will be ignored.", e); 223 } 224 current.setException(null); 225 } 226 } 227 228 // Decide whether to continue with the recipients or not; similar logic to the Pipeline 229 // check for error if so we should break out 230 if (!continueProcessing(current, "so breaking out of the routing slip", log)) { 231 break; 232 } 233 } 234 235 // logging nextExchange as it contains the exchange that might have altered the payload and since 236 // we are logging the completion if will be confusing if we log the original instead 237 // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots 238 log.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), current); 239 240 // copy results back to the original exchange 241 ExchangeHelper.copyResults(exchange, current); 242 243 callback.done(true); 244 return true; 245 } 246 247 protected Endpoint resolveEndpoint(RoutingSlipIterator iter, Exchange exchange) throws Exception { 248 Object nextRecipient = iter.next(exchange); 249 Endpoint endpoint = null; 250 try { 251 endpoint = ExchangeHelper.resolveEndpoint(exchange, nextRecipient); 252 } catch (Exception e) { 253 if (isIgnoreInvalidEndpoints()) { 254 log.info("Endpoint uri is invalid: " + nextRecipient + ". This exception will be ignored.", e); 255 } else { 256 throw e; 257 } 258 } 259 return endpoint; 260 } 261 262 protected Exchange prepareExchangeForRoutingSlip(Exchange current, Endpoint endpoint) { 263 Exchange copy = new DefaultExchange(current); 264 // we must use the same id as this is a snapshot strategy where Camel copies a snapshot 265 // before processing the next step in the pipeline, so we have a snapshot of the exchange 266 // just before. This snapshot is used if Camel should do redeliveries (re try) using 267 // DeadLetterChannel. That is why it's important the id is the same, as it is the *same* 268 // exchange being routed. 269 copy.setExchangeId(current.getExchangeId()); 270 copyOutToIn(copy, current); 271 return copy; 272 } 273 274 protected boolean processExchange(final Endpoint endpoint, final Exchange exchange, final Exchange original, 275 final AsyncCallback callback, final RoutingSlipIterator iter) { 276 277 // this does the actual processing so log at trace level 278 log.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); 279 280 boolean sync = producerCache.doInAsyncProducer(endpoint, exchange, null, callback, new AsyncProducerCallback() { 281 public boolean doInAsyncProducer(Producer producer, AsyncProcessor asyncProducer, final Exchange exchange, 282 ExchangePattern exchangePattern, final AsyncCallback callback) { 283 // set property which endpoint we send to 284 exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri()); 285 exchange.setProperty(Exchange.SLIP_ENDPOINT, endpoint.getEndpointUri()); 286 287 boolean sync = AsyncProcessorHelper.process(asyncProducer, exchange, new AsyncCallback() { 288 public void done(boolean doneSync) { 289 // we only have to handle async completion of the routing slip 290 if (doneSync) { 291 return; 292 } 293 294 // continue processing the routing slip asynchronously 295 Exchange current = exchange; 296 297 while (iter.hasNext(current)) { 298 299 // we ignore some kind of exceptions and allow us to continue 300 if (isIgnoreInvalidEndpoints()) { 301 FailedToCreateProducerException e = current.getException(FailedToCreateProducerException.class); 302 if (e != null) { 303 if (log.isDebugEnabled()) { 304 log.debug("Endpoint uri is invalid: " + endpoint + ". This exception will be ignored.", e); 305 } 306 current.setException(null); 307 } 308 } 309 310 // Decide whether to continue with the recipients or not; similar logic to the Pipeline 311 // check for error if so we should break out 312 if (!continueProcessing(current, "so breaking out of the routing slip", log)) { 313 break; 314 } 315 316 Endpoint endpoint; 317 try { 318 endpoint = resolveEndpoint(iter, exchange); 319 // if no endpoint was resolved then try the next 320 if (endpoint == null) { 321 continue; 322 } 323 } catch (Exception e) { 324 // error resolving endpoint so we should break out 325 exchange.setException(e); 326 break; 327 } 328 329 // prepare and process the routing slip 330 Exchange copy = prepareExchangeForRoutingSlip(current, endpoint); 331 boolean sync = processExchange(endpoint, copy, original, callback, iter); 332 current = copy; 333 334 if (!sync) { 335 log.trace("Processing exchangeId: {} is continued being processed asynchronously", original.getExchangeId()); 336 return; 337 } 338 } 339 340 // logging nextExchange as it contains the exchange that might have altered the payload and since 341 // we are logging the completion if will be confusing if we log the original instead 342 // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots 343 log.trace("Processing complete for exchangeId: {} >>> {}", original.getExchangeId(), current); 344 345 // copy results back to the original exchange 346 ExchangeHelper.copyResults(original, current); 347 callback.done(false); 348 } 349 }); 350 351 return sync; 352 } 353 }); 354 355 return sync; 356 } 357 358 protected void doStart() throws Exception { 359 if (producerCache == null) { 360 producerCache = new ProducerCache(this, camelContext); 361 } 362 ServiceHelper.startService(producerCache); 363 } 364 365 protected void doStop() throws Exception { 366 ServiceHelper.stopService(producerCache); 367 } 368 369 protected void doShutdown() throws Exception { 370 ServiceHelper.stopAndShutdownService(producerCache); 371 } 372 373 /** 374 * Returns the outbound message if available. Otherwise return the inbound message. 375 */ 376 private Message getResultMessage(Exchange exchange) { 377 if (exchange.hasOut()) { 378 return exchange.getOut(); 379 } else { 380 // if this endpoint had no out (like a mock endpoint) just take the in 381 return exchange.getIn(); 382 } 383 } 384 385 /** 386 * Copy the outbound data in 'source' to the inbound data in 'result'. 387 */ 388 private void copyOutToIn(Exchange result, Exchange source) { 389 result.setException(source.getException()); 390 391 if (source.hasOut() && source.getOut().isFault()) { 392 result.getOut().copyFrom(source.getOut()); 393 } 394 395 result.setIn(getResultMessage(source)); 396 397 result.getProperties().clear(); 398 result.getProperties().putAll(source.getProperties()); 399 } 400 }