001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.util.ArrayList; 020 import java.util.Collection; 021 import java.util.Iterator; 022 import java.util.List; 023 024 import org.apache.camel.AsyncCallback; 025 import org.apache.camel.AsyncProcessor; 026 import org.apache.camel.Exchange; 027 import org.apache.camel.Navigate; 028 import org.apache.camel.Processor; 029 import org.apache.camel.Traceable; 030 import org.apache.camel.support.ServiceSupport; 031 import org.apache.camel.util.AsyncProcessorConverterHelper; 032 import org.apache.camel.util.AsyncProcessorHelper; 033 import org.apache.camel.util.ExchangeHelper; 034 import org.apache.camel.util.ServiceHelper; 035 import org.slf4j.Logger; 036 import org.slf4j.LoggerFactory; 037 038 /** 039 * Implements try/catch/finally type processing 040 * 041 * @version 042 */ 043 public class TryProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable { 044 private static final transient Logger LOG = LoggerFactory.getLogger(TryProcessor.class); 045 046 protected final AsyncProcessor tryProcessor; 047 protected final DoCatchProcessor catchProcessor; 048 protected final DoFinallyProcessor finallyProcessor; 049 private List<AsyncProcessor> processors; 050 051 public TryProcessor(Processor tryProcessor, List<CatchProcessor> catchClauses, Processor finallyProcessor) { 052 this.tryProcessor = AsyncProcessorConverterHelper.convert(tryProcessor); 053 this.catchProcessor = new DoCatchProcessor(catchClauses); 054 this.finallyProcessor = new DoFinallyProcessor(finallyProcessor); 055 } 056 057 public String toString() { 058 String finallyText = (finallyProcessor == null) ? "" : " Finally {" + finallyProcessor + "}"; 059 return "Try {" + tryProcessor + "} " + (catchProcessor != null ? catchProcessor : "") + finallyText; 060 } 061 062 public String getTraceLabel() { 063 return "doTry"; 064 } 065 066 public void process(Exchange exchange) throws Exception { 067 AsyncProcessorHelper.process(this, exchange); 068 } 069 070 public boolean process(Exchange exchange, AsyncCallback callback) { 071 Iterator<AsyncProcessor> processors = getProcessors().iterator(); 072 073 while (continueRouting(processors, exchange)) { 074 ExchangeHelper.prepareOutToIn(exchange); 075 076 // process the next processor 077 AsyncProcessor processor = processors.next(); 078 boolean sync = process(exchange, callback, processor, processors); 079 080 // continue as long its being processed synchronously 081 if (!sync) { 082 LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId()); 083 // the remainder of the try .. catch .. finally will be completed async 084 // so we break out now, then the callback will be invoked which then continue routing from where we left here 085 return false; 086 } 087 088 LOG.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId()); 089 } 090 091 ExchangeHelper.prepareOutToIn(exchange); 092 LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); 093 callback.done(true); 094 return true; 095 } 096 097 protected boolean process(final Exchange exchange, final AsyncCallback callback, 098 final AsyncProcessor processor, final Iterator<AsyncProcessor> processors) { 099 // this does the actual processing so log at trace level 100 LOG.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); 101 102 // implement asynchronous routing logic in callback so we can have the callback being 103 // triggered and then continue routing where we left 104 boolean sync = AsyncProcessorHelper.process(processor, exchange, new AsyncCallback() { 105 public void done(boolean doneSync) { 106 // we only have to handle async completion of the pipeline 107 if (doneSync) { 108 return; 109 } 110 111 // continue processing the try .. catch .. finally asynchronously 112 while (continueRouting(processors, exchange)) { 113 ExchangeHelper.prepareOutToIn(exchange); 114 115 // process the next processor 116 AsyncProcessor processor = processors.next(); 117 doneSync = process(exchange, callback, processor, processors); 118 119 if (!doneSync) { 120 LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId()); 121 // the remainder of the try .. catch .. finally will be completed async 122 // so we break out now, then the callback will be invoked which then continue routing from where we left here 123 return; 124 } 125 } 126 127 ExchangeHelper.prepareOutToIn(exchange); 128 LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); 129 callback.done(false); 130 } 131 }); 132 133 return sync; 134 } 135 136 protected Collection<AsyncProcessor> getProcessors() { 137 return processors; 138 } 139 140 protected boolean continueRouting(Iterator<AsyncProcessor> it, Exchange exchange) { 141 Object stop = exchange.getProperty(Exchange.ROUTE_STOP); 142 if (stop != null) { 143 boolean doStop = exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop); 144 if (doStop) { 145 LOG.debug("Exchange is marked to stop routing: {}", exchange); 146 return false; 147 } 148 } 149 150 // continue if there are more processors to route 151 return it.hasNext(); 152 } 153 154 protected void doStart() throws Exception { 155 processors = new ArrayList<AsyncProcessor>(); 156 processors.add(tryProcessor); 157 processors.add(catchProcessor); 158 processors.add(finallyProcessor); 159 ServiceHelper.startServices(tryProcessor, catchProcessor, finallyProcessor); 160 } 161 162 protected void doStop() throws Exception { 163 ServiceHelper.stopServices(finallyProcessor, catchProcessor, tryProcessor); 164 processors.clear(); 165 } 166 167 public List<Processor> next() { 168 if (!hasNext()) { 169 return null; 170 } 171 List<Processor> answer = new ArrayList<Processor>(); 172 if (tryProcessor != null) { 173 answer.add(tryProcessor); 174 } 175 if (catchProcessor != null) { 176 answer.add(catchProcessor); 177 } 178 if (finallyProcessor != null) { 179 answer.add(finallyProcessor); 180 } 181 return answer; 182 } 183 184 public boolean hasNext() { 185 return tryProcessor != null; 186 } 187 188 /** 189 * Processor to handle do catch supporting asynchronous routing engine 190 */ 191 private final class DoCatchProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable { 192 193 private final List<CatchProcessor> catchClauses; 194 195 private DoCatchProcessor(List<CatchProcessor> catchClauses) { 196 this.catchClauses = catchClauses; 197 } 198 199 public void process(Exchange exchange) throws Exception { 200 AsyncProcessorHelper.process(this, exchange); 201 } 202 203 public boolean process(final Exchange exchange, final AsyncCallback callback) { 204 Exception e = exchange.getException(); 205 206 if (catchClauses == null || e == null) { 207 return true; 208 } 209 210 // find a catch clause to use 211 CatchProcessor processor = null; 212 for (CatchProcessor catchClause : catchClauses) { 213 Throwable caught = catchClause.catches(exchange, e); 214 if (caught != null) { 215 if (LOG.isTraceEnabled()) { 216 LOG.trace("This TryProcessor catches the exception: {} caused by: {}", caught.getClass().getName(), e.getMessage()); 217 } 218 processor = catchClause; 219 break; 220 } 221 } 222 223 if (processor != null) { 224 // create the handle processor which performs the actual logic 225 // this processor just lookup the right catch clause to use and then let the 226 // HandleDoCatchProcessor do all the hard work (separate of concerns) 227 HandleDoCatchProcessor cool = new HandleDoCatchProcessor(processor); 228 return AsyncProcessorHelper.process(cool, exchange, callback); 229 } else { 230 if (LOG.isTraceEnabled()) { 231 LOG.trace("This TryProcessor does not catch the exception: {} caused by: {}", e.getClass().getName(), e.getMessage()); 232 } 233 } 234 235 return true; 236 } 237 238 @Override 239 protected void doStart() throws Exception { 240 ServiceHelper.startService(catchClauses); 241 } 242 243 @Override 244 protected void doStop() throws Exception { 245 ServiceHelper.stopServices(catchClauses); 246 } 247 248 @Override 249 public String toString() { 250 return "Catches{" + catchClauses + "}"; 251 } 252 253 public String getTraceLabel() { 254 return "doCatch"; 255 } 256 257 public List<Processor> next() { 258 List<Processor> answer = new ArrayList<Processor>(); 259 if (catchProcessor != null) { 260 answer.addAll(catchClauses); 261 } 262 return answer; 263 } 264 265 public boolean hasNext() { 266 return catchClauses != null && catchClauses.size() > 0; 267 } 268 } 269 270 /** 271 * Processor to handle do finally supporting asynchronous routing engine 272 */ 273 private final class DoFinallyProcessor extends DelegateAsyncProcessor implements Traceable { 274 275 private DoFinallyProcessor(Processor processor) { 276 super(processor); 277 } 278 279 @Override 280 protected boolean processNext(final Exchange exchange, final AsyncCallback callback) { 281 // clear exception so finally block can be executed 282 final Exception e = exchange.getException(); 283 exchange.setException(null); 284 // but store the caught exception as a property 285 if (e != null) { 286 exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e); 287 } 288 // store the last to endpoint as the failure endpoint 289 if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) { 290 exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); 291 } 292 293 boolean sync = super.processNext(exchange, new AsyncCallback() { 294 public void done(boolean doneSync) { 295 // we only have to handle async completion of the pipeline 296 if (doneSync) { 297 return; 298 } 299 300 if (e == null) { 301 exchange.removeProperty(Exchange.FAILURE_ENDPOINT); 302 } else { 303 // set exception back on exchange 304 exchange.setException(e); 305 exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e); 306 } 307 308 // signal callback to continue routing async 309 ExchangeHelper.prepareOutToIn(exchange); 310 LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); 311 callback.done(false); 312 } 313 }); 314 315 if (sync) { 316 if (e == null) { 317 exchange.removeProperty(Exchange.FAILURE_ENDPOINT); 318 } else { 319 // set exception back on exchange 320 exchange.setException(e); 321 exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e); 322 } 323 } 324 325 return sync; 326 } 327 328 @Override 329 public String toString() { 330 return "Finally{" + getProcessor() + "}"; 331 } 332 333 public String getTraceLabel() { 334 return "doFinally"; 335 } 336 } 337 338 /** 339 * Processor to handle do catch supporting asynchronous routing engine 340 */ 341 private final class HandleDoCatchProcessor extends DelegateAsyncProcessor { 342 343 private final CatchProcessor catchClause; 344 345 private HandleDoCatchProcessor(CatchProcessor processor) { 346 super(processor); 347 this.catchClause = processor; 348 } 349 350 @Override 351 protected boolean processNext(final Exchange exchange, final AsyncCallback callback) { 352 final Exception caught = exchange.getException(); 353 if (caught == null) { 354 return true; 355 } 356 357 // store the last to endpoint as the failure endpoint 358 if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) { 359 exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); 360 } 361 // give the rest of the pipeline another chance 362 exchange.setProperty(Exchange.EXCEPTION_CAUGHT, caught); 363 exchange.setException(null); 364 // and we should not be regarded as exhausted as we are in a try .. catch block 365 exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED); 366 367 // is the exception handled by the catch clause 368 final Boolean handled = catchClause.handles(exchange); 369 370 if (LOG.isDebugEnabled()) { 371 LOG.debug("The exception is handled: {} for the exception: {} caused by: {}", 372 new Object[]{handled, caught.getClass().getName(), caught.getMessage()}); 373 } 374 375 boolean sync = super.processNext(exchange, new AsyncCallback() { 376 public void done(boolean doneSync) { 377 // we only have to handle async completion of the pipeline 378 if (doneSync) { 379 return; 380 } 381 382 if (!handled) { 383 if (exchange.getException() == null) { 384 exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class)); 385 } 386 } 387 // always clear redelivery exhausted in a catch clause 388 exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED); 389 390 // signal callback to continue routing async 391 ExchangeHelper.prepareOutToIn(exchange); 392 callback.done(false); 393 } 394 }); 395 396 if (sync) { 397 // set exception back on exchange 398 if (!handled) { 399 if (exchange.getException() == null) { 400 exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class)); 401 } 402 } 403 // always clear redelivery exhausted in a catch clause 404 exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED); 405 } 406 407 return sync; 408 } 409 } 410 411 }