001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import org.apache.camel.AsyncCallback; 020 import org.apache.camel.AsyncProcessor; 021 import org.apache.camel.Exchange; 022 import org.apache.camel.Processor; 023 import org.apache.camel.impl.DefaultUnitOfWork; 024 import org.apache.camel.impl.MDCUnitOfWork; 025 import org.apache.camel.spi.RouteContext; 026 import org.apache.camel.spi.UnitOfWork; 027 import org.apache.camel.util.AsyncProcessorHelper; 028 import org.slf4j.Logger; 029 import org.slf4j.LoggerFactory; 030 031 /** 032 * Ensures the {@link Exchange} is routed under the boundaries of an {@link org.apache.camel.spi.UnitOfWork}. 033 * <p/> 034 * Handles calling the {@link org.apache.camel.spi.UnitOfWork#done(org.apache.camel.Exchange)} method 035 * when processing of an {@link Exchange} is complete. 036 */ 037 public class UnitOfWorkProcessor extends DelegateAsyncProcessor { 038 039 private static final transient Logger LOG = LoggerFactory.getLogger(UnitOfWorkProcessor.class); 040 private final RouteContext routeContext; 041 private final String routeId; 042 043 public UnitOfWorkProcessor(Processor processor) { 044 this(null, processor); 045 } 046 047 public UnitOfWorkProcessor(AsyncProcessor processor) { 048 this(null, processor); 049 } 050 051 public UnitOfWorkProcessor(RouteContext routeContext, Processor processor) { 052 super(processor); 053 this.routeContext = routeContext; 054 if (routeContext != null) { 055 this.routeId = routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory()); 056 } else { 057 this.routeId = null; 058 } 059 } 060 061 public UnitOfWorkProcessor(RouteContext routeContext, AsyncProcessor processor) { 062 super(processor); 063 this.routeContext = routeContext; 064 if (routeContext != null) { 065 this.routeId = routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory()); 066 } else { 067 this.routeId = null; 068 } 069 } 070 071 @Override 072 public String toString() { 073 return "UnitOfWork(" + processor + ")"; 074 } 075 076 public RouteContext getRouteContext() { 077 return routeContext; 078 } 079 080 @Override 081 protected void doStart() throws Exception { 082 // if a route context has been configured, then wrap the processor with a 083 // RouteContextProcessor to ensure we track the route context properly during 084 // processing of the exchange, but only do this once 085 if (routeContext != null && (!(processor instanceof RouteContextProcessor))) { 086 processor = new RouteContextProcessor(routeContext, processor); 087 } 088 super.doStart(); 089 } 090 091 @Override 092 public boolean process(final Exchange exchange, final AsyncCallback callback) { 093 // if the exchange doesn't have from route id set, then set it if it originated 094 // from this unit of work 095 if (routeId != null && exchange.getFromRouteId() == null) { 096 exchange.setFromRouteId(routeId); 097 } 098 099 if (exchange.getUnitOfWork() == null) { 100 // If there is no existing UoW, then we should start one and 101 // terminate it once processing is completed for the exchange. 102 final UnitOfWork uow = createUnitOfWork(exchange); 103 exchange.setUnitOfWork(uow); 104 try { 105 uow.start(); 106 } catch (Exception e) { 107 callback.done(true); 108 exchange.setException(e); 109 return true; 110 } 111 112 Object synchronous = exchange.removeProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC); 113 if (synchronous != null) { 114 // the exchange signalled to process synchronously 115 return processSync(exchange, callback, uow); 116 } else { 117 return processAsync(exchange, callback, uow); 118 } 119 } else { 120 // There was an existing UoW, so we should just pass through.. 121 // so that the guy the initiated the UoW can terminate it. 122 return processor.process(exchange, callback); 123 } 124 } 125 126 protected boolean processSync(final Exchange exchange, final AsyncCallback callback, final UnitOfWork uow) { 127 LOG.trace("Exchange marked UnitOfWork to be processed synchronously: {}", exchange); 128 129 // process the exchange synchronously 130 try { 131 AsyncProcessorHelper.process(processor, exchange); 132 } catch (Throwable e) { 133 exchange.setException(e); 134 } 135 136 try { 137 callback.done(true); 138 } finally { 139 doneUow(uow, exchange); 140 } 141 142 return true; 143 } 144 145 protected boolean processAsync(final Exchange exchange, final AsyncCallback callback, final UnitOfWork uow) { 146 LOG.trace("Processing exchange asynchronously: {}", exchange); 147 148 // process the exchange asynchronously 149 try { 150 return processor.process(exchange, new AsyncCallback() { 151 public void done(boolean doneSync) { 152 // Order here matters. We need to complete the callbacks 153 // since they will likely update the exchange with some final results. 154 try { 155 callback.done(doneSync); 156 } finally { 157 doneUow(uow, exchange); 158 } 159 } 160 }); 161 } catch (Throwable e) { 162 LOG.warn("Caught unhandled exception while processing ExchangeId: " + exchange.getExchangeId(), e); 163 164 // fallback and catch any exceptions the process may not have caught 165 // we must ensure to done the UoW in all cases and issue done on the callback 166 exchange.setException(e); 167 168 // Order here matters. We need to complete the callbacks 169 // since they will likely update the exchange with some final results. 170 try { 171 callback.done(true); 172 } finally { 173 doneUow(uow, exchange); 174 } 175 return true; 176 } 177 } 178 179 /** 180 * Strategy to create the unit of work for the given exchange. 181 * 182 * @param exchange the exchange 183 * @return the created unit of work 184 */ 185 protected UnitOfWork createUnitOfWork(Exchange exchange) { 186 UnitOfWork answer; 187 if (exchange.getContext().isUseMDCLogging()) { 188 answer = new MDCUnitOfWork(exchange); 189 } else { 190 answer = new DefaultUnitOfWork(exchange); 191 } 192 return answer; 193 } 194 195 private void doneUow(UnitOfWork uow, Exchange exchange) { 196 // unit of work is done 197 try { 198 if (uow != null) { 199 uow.done(exchange); 200 } 201 } catch (Throwable e) { 202 LOG.warn("Exception occurred during done UnitOfWork for Exchange: " + exchange 203 + ". This exception will be ignored.", e); 204 } 205 try { 206 if (uow != null) { 207 uow.stop(); 208 } 209 } catch (Throwable e) { 210 LOG.warn("Exception occurred during stopping UnitOfWork for Exchange: " + exchange 211 + ". This exception will be ignored.", e); 212 } 213 214 // remove uow from exchange as its done 215 exchange.setUnitOfWork(null); 216 } 217 218 }