001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.processor; 018 019import java.util.concurrent.Callable; 020import java.util.concurrent.ExecutorService; 021 022import org.apache.camel.AsyncCallback; 023import org.apache.camel.AsyncProcessor; 024import org.apache.camel.CamelContext; 025import org.apache.camel.Exchange; 026import org.apache.camel.ExchangePattern; 027import org.apache.camel.Message; 028import org.apache.camel.Ordered; 029import org.apache.camel.Predicate; 030import org.apache.camel.Processor; 031import org.apache.camel.Route; 032import org.apache.camel.Traceable; 033import org.apache.camel.spi.IdAware; 034import org.apache.camel.support.ServiceSupport; 035import org.apache.camel.support.SynchronizationAdapter; 036import org.apache.camel.util.AsyncProcessorHelper; 037import org.apache.camel.util.ExchangeHelper; 038import org.apache.camel.util.ServiceHelper; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042import static org.apache.camel.util.ObjectHelper.notNull; 043 044/** 045 * Processor implementing <a href="http://camel.apache.org/oncompletion.html">onCompletion</a>. 046 * 047 * @version 048 */ 049public class OnCompletionProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware { 050 051 private static final Logger LOG = LoggerFactory.getLogger(OnCompletionProcessor.class); 052 private final CamelContext camelContext; 053 private String id; 054 private final Processor processor; 055 private final ExecutorService executorService; 056 private final boolean shutdownExecutorService; 057 private final boolean onCompleteOnly; 058 private final boolean onFailureOnly; 059 private final Predicate onWhen; 060 private final boolean useOriginalBody; 061 private final boolean afterConsumer; 062 063 public OnCompletionProcessor(CamelContext camelContext, Processor processor, ExecutorService executorService, boolean shutdownExecutorService, 064 boolean onCompleteOnly, boolean onFailureOnly, Predicate onWhen, boolean useOriginalBody, boolean afterConsumer) { 065 notNull(camelContext, "camelContext"); 066 notNull(processor, "processor"); 067 this.camelContext = camelContext; 068 this.processor = processor; 069 this.executorService = executorService; 070 this.shutdownExecutorService = shutdownExecutorService; 071 this.onCompleteOnly = onCompleteOnly; 072 this.onFailureOnly = onFailureOnly; 073 this.onWhen = onWhen; 074 this.useOriginalBody = useOriginalBody; 075 this.afterConsumer = afterConsumer; 076 } 077 078 @Override 079 protected void doStart() throws Exception { 080 ServiceHelper.startService(processor); 081 } 082 083 @Override 084 protected void doStop() throws Exception { 085 ServiceHelper.stopService(processor); 086 } 087 088 @Override 089 protected void doShutdown() throws Exception { 090 ServiceHelper.stopAndShutdownService(processor); 091 if (shutdownExecutorService) { 092 getCamelContext().getExecutorServiceManager().shutdownNow(executorService); 093 } 094 } 095 096 public CamelContext getCamelContext() { 097 return camelContext; 098 } 099 100 public String getId() { 101 return id; 102 } 103 104 public void setId(String id) { 105 this.id = id; 106 } 107 108 public void process(Exchange exchange) throws Exception { 109 AsyncProcessorHelper.process(this, exchange); 110 } 111 112 public boolean process(Exchange exchange, AsyncCallback callback) { 113 if (processor != null) { 114 // register callback 115 if (afterConsumer) { 116 exchange.getUnitOfWork().addSynchronization(new OnCompletionSynchronizationAfterConsumer()); 117 } else { 118 exchange.getUnitOfWork().addSynchronization(new OnCompletionSynchronizationBeforeConsumer()); 119 } 120 } 121 122 callback.done(true); 123 return true; 124 } 125 126 protected boolean isCreateCopy() { 127 // we need to create a correlated copy if we run in parallel mode or is in after consumer mode (as the UoW would be done on the original exchange otherwise) 128 return executorService != null || afterConsumer; 129 } 130 131 /** 132 * Processes the exchange by the processors 133 * 134 * @param processor the processor 135 * @param exchange the exchange 136 */ 137 protected static void doProcess(Processor processor, Exchange exchange) { 138 // must remember some properties which we cannot use during onCompletion processing 139 // as otherwise we may cause issues 140 // but keep the caused exception stored as a property (Exchange.EXCEPTION_CAUGHT) on the exchange 141 Object stop = exchange.removeProperty(Exchange.ROUTE_STOP); 142 Object failureHandled = exchange.removeProperty(Exchange.FAILURE_HANDLED); 143 Object errorhandlerHandled = exchange.removeProperty(Exchange.ERRORHANDLER_HANDLED); 144 Object rollbackOnly = exchange.removeProperty(Exchange.ROLLBACK_ONLY); 145 Object rollbackOnlyLast = exchange.removeProperty(Exchange.ROLLBACK_ONLY_LAST); 146 147 Exception cause = exchange.getException(); 148 exchange.setException(null); 149 150 try { 151 processor.process(exchange); 152 } catch (Exception e) { 153 exchange.setException(e); 154 } finally { 155 // restore the options 156 if (stop != null) { 157 exchange.setProperty(Exchange.ROUTE_STOP, stop); 158 } 159 if (failureHandled != null) { 160 exchange.setProperty(Exchange.FAILURE_HANDLED, failureHandled); 161 } 162 if (errorhandlerHandled != null) { 163 exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, errorhandlerHandled); 164 } 165 if (rollbackOnly != null) { 166 exchange.setProperty(Exchange.ROLLBACK_ONLY, rollbackOnly); 167 } 168 if (rollbackOnlyLast != null) { 169 exchange.setProperty(Exchange.ROLLBACK_ONLY_LAST, rollbackOnlyLast); 170 } 171 if (cause != null) { 172 exchange.setException(cause); 173 } 174 } 175 } 176 177 /** 178 * Prepares the {@link Exchange} to send as onCompletion. 179 * 180 * @param exchange the current exchange 181 * @return the exchange to be routed in onComplete 182 */ 183 protected Exchange prepareExchange(Exchange exchange) { 184 Exchange answer; 185 186 if (isCreateCopy()) { 187 // for asynchronous routing we must use a copy as we dont want it 188 // to cause side effects of the original exchange 189 // (the original thread will run in parallel) 190 answer = ExchangeHelper.createCorrelatedCopy(exchange, false); 191 if (answer.hasOut()) { 192 // move OUT to IN (pipes and filters) 193 answer.setIn(answer.getOut()); 194 answer.setOut(null); 195 } 196 // set MEP to InOnly as this onCompletion is a fire and forget 197 answer.setPattern(ExchangePattern.InOnly); 198 } else { 199 // use the exchange as-is 200 answer = exchange; 201 } 202 203 if (useOriginalBody) { 204 LOG.trace("Using the original IN message instead of current"); 205 206 Message original = ExchangeHelper.getOriginalInMessage(exchange); 207 answer.setIn(original); 208 } 209 210 // add a header flag to indicate its a on completion exchange 211 answer.setProperty(Exchange.ON_COMPLETION, Boolean.TRUE); 212 213 return answer; 214 } 215 216 private final class OnCompletionSynchronizationAfterConsumer extends SynchronizationAdapter implements Ordered { 217 218 public int getOrder() { 219 // we want to be last 220 return Ordered.LOWEST; 221 } 222 223 @Override 224 public void onComplete(final Exchange exchange) { 225 if (onFailureOnly) { 226 return; 227 } 228 229 if (onWhen != null && !onWhen.matches(exchange)) { 230 // predicate did not match so do not route the onComplete 231 return; 232 } 233 234 // must use a copy as we dont want it to cause side effects of the original exchange 235 final Exchange copy = prepareExchange(exchange); 236 237 if (executorService != null) { 238 executorService.submit(new Callable<Exchange>() { 239 public Exchange call() throws Exception { 240 LOG.debug("Processing onComplete: {}", copy); 241 doProcess(processor, copy); 242 return copy; 243 } 244 }); 245 } else { 246 // run without thread-pool 247 LOG.debug("Processing onComplete: {}", copy); 248 doProcess(processor, copy); 249 } 250 } 251 252 public void onFailure(final Exchange exchange) { 253 if (onCompleteOnly) { 254 return; 255 } 256 257 if (onWhen != null && !onWhen.matches(exchange)) { 258 // predicate did not match so do not route the onComplete 259 return; 260 } 261 262 263 // must use a copy as we dont want it to cause side effects of the original exchange 264 final Exchange copy = prepareExchange(exchange); 265 final Exception original = copy.getException(); 266 final boolean originalFault = copy.hasOut() ? copy.getOut().isFault() : copy.getIn().isFault(); 267 // must remove exception otherwise onFailure routing will fail as well 268 // the caused exception is stored as a property (Exchange.EXCEPTION_CAUGHT) on the exchange 269 copy.setException(null); 270 // must clear fault otherwise onFailure routing will fail as well 271 if (copy.hasOut()) { 272 copy.getOut().setFault(false); 273 } else { 274 copy.getIn().setFault(false); 275 } 276 277 if (executorService != null) { 278 executorService.submit(new Callable<Exchange>() { 279 public Exchange call() throws Exception { 280 LOG.debug("Processing onFailure: {}", copy); 281 doProcess(processor, copy); 282 // restore exception after processing 283 copy.setException(original); 284 return null; 285 } 286 }); 287 } else { 288 // run without thread-pool 289 LOG.debug("Processing onFailure: {}", copy); 290 doProcess(processor, copy); 291 // restore exception after processing 292 copy.setException(original); 293 // restore fault after processing 294 if (copy.hasOut()) { 295 copy.getOut().setFault(originalFault); 296 } else { 297 copy.getIn().setFault(originalFault); 298 } 299 } 300 } 301 302 @Override 303 public String toString() { 304 if (!onCompleteOnly && !onFailureOnly) { 305 return "onCompleteOrFailure"; 306 } else if (onCompleteOnly) { 307 return "onCompleteOnly"; 308 } else { 309 return "onFailureOnly"; 310 } 311 } 312 } 313 314 private final class OnCompletionSynchronizationBeforeConsumer extends SynchronizationAdapter implements Ordered { 315 316 public int getOrder() { 317 // we want to be last 318 return Ordered.LOWEST; 319 } 320 321 @Override 322 public void onAfterRoute(Route route, Exchange exchange) { 323 if (exchange.isFailed() && onCompleteOnly) { 324 return; 325 } 326 327 if (!exchange.isFailed() && onFailureOnly) { 328 return; 329 } 330 331 if (onWhen != null && !onWhen.matches(exchange)) { 332 // predicate did not match so do not route the onComplete 333 return; 334 } 335 336 // must use a copy as we dont want it to cause side effects of the original exchange 337 final Exchange copy = prepareExchange(exchange); 338 339 if (executorService != null) { 340 executorService.submit(new Callable<Exchange>() { 341 public Exchange call() throws Exception { 342 LOG.debug("Processing onAfterRoute: {}", copy); 343 doProcess(processor, copy); 344 return copy; 345 } 346 }); 347 } else { 348 // run without thread-pool 349 LOG.debug("Processing onAfterRoute: {}", copy); 350 doProcess(processor, copy); 351 } 352 } 353 354 @Override 355 public String toString() { 356 return "onAfterRoute"; 357 } 358 } 359 360 @Override 361 public String toString() { 362 return "OnCompletionProcessor[" + processor + "]"; 363 } 364 365 public String getTraceLabel() { 366 return "onCompletion"; 367 } 368}