001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.ArrayList;
020    import java.util.Date;
021    import java.util.Iterator;
022    import java.util.LinkedHashSet;
023    import java.util.List;
024    import java.util.Set;
025    import java.util.Stack;
026    
027    import org.apache.camel.AsyncCallback;
028    import org.apache.camel.CamelContext;
029    import org.apache.camel.CamelUnitOfWorkException;
030    import org.apache.camel.Exchange;
031    import org.apache.camel.Message;
032    import org.apache.camel.Processor;
033    import org.apache.camel.Service;
034    import org.apache.camel.spi.RouteContext;
035    import org.apache.camel.spi.SubUnitOfWork;
036    import org.apache.camel.spi.SubUnitOfWorkCallback;
037    import org.apache.camel.spi.Synchronization;
038    import org.apache.camel.spi.SynchronizationVetoable;
039    import org.apache.camel.spi.TracedRouteNodes;
040    import org.apache.camel.spi.UnitOfWork;
041    import org.apache.camel.util.EventHelper;
042    import org.apache.camel.util.UnitOfWorkHelper;
043    import org.slf4j.Logger;
044    import org.slf4j.LoggerFactory;
045    
046    /**
047     * The default implementation of {@link org.apache.camel.spi.UnitOfWork}
048     */
049    public class DefaultUnitOfWork implements UnitOfWork, Service {
050        private static final Logger LOG = LoggerFactory.getLogger(DefaultUnitOfWork.class);
051    
052        // TODO: This implementation seems to have transformed itself into a to broad concern
053        // where unit of work is doing a bit more work than the transactional aspect that ties
054        // to its name. Maybe this implementation should be named ExchangeContext and we can
055        // introduce a simpler UnitOfWork concept. This would also allow us to refactor the
056        // SubUnitOfWork into a general parent/child unit of work concept. However this
057        // requires API changes and thus is best kept for Camel 3.0
058    
059        private UnitOfWork parent;
060        private String id;
061        private CamelContext context;
062        private List<Synchronization> synchronizations;
063        private Message originalInMessage;
064        private final TracedRouteNodes tracedRouteNodes;
065        private Set<Object> transactedBy;
066        private final Stack<RouteContext> routeContextStack = new Stack<RouteContext>();
067        private Stack<DefaultSubUnitOfWork> subUnitOfWorks;
068        private final transient Logger log;
069        
070        public DefaultUnitOfWork(Exchange exchange) {
071            this(exchange, LOG);
072        }
073    
074        protected DefaultUnitOfWork(Exchange exchange, Logger logger) {
075            log = logger;
076            if (log.isTraceEnabled()) {
077                log.trace("UnitOfWork created for ExchangeId: {} with {}", exchange.getExchangeId(), exchange);
078            }
079            tracedRouteNodes = new DefaultTracedRouteNodes();
080            context = exchange.getContext();
081    
082            // TODO: Camel 3.0: the copy on facade strategy will help us here in the future
083            // TODO: optimize to only copy original message if enabled to do so in the route
084            // special for JmsMessage as it can cause it to loose headers later.
085            // This will be resolved when we get the message facade with copy on write implemented
086            if (exchange.getIn().getClass().getSimpleName().equals("JmsMessage")) {
087                this.originalInMessage = new DefaultMessage();
088                this.originalInMessage.setBody(exchange.getIn().getBody());
089                this.originalInMessage.setHeaders(exchange.getIn().getHeaders());
090            } else {
091                this.originalInMessage = exchange.getIn().copy();
092            }
093    
094            // TODO: Optimize to only copy if useOriginalMessage has been enabled
095    
096            // mark the creation time when this Exchange was created
097            if (exchange.getProperty(Exchange.CREATED_TIMESTAMP) == null) {
098                exchange.setProperty(Exchange.CREATED_TIMESTAMP, new Date());
099            }
100    
101            // inject breadcrumb header if enabled
102            if (exchange.getContext().isUseBreadcrumb()) {
103                // create or use existing breadcrumb
104                String breadcrumbId = exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
105                if (breadcrumbId == null) {
106                    // no existing breadcrumb, so create a new one based on the message id
107                    breadcrumbId = exchange.getIn().getMessageId();
108                    exchange.getIn().setHeader(Exchange.BREADCRUMB_ID, breadcrumbId);
109                }
110            }
111            
112            // setup whether the exchange is externally redelivered or not (if not initialized before)
113            // store as property so we know that the origin exchange was redelivered
114            if (exchange.getProperty(Exchange.EXTERNAL_REDELIVERED) == null) {
115                exchange.setProperty(Exchange.EXTERNAL_REDELIVERED, exchange.isExternalRedelivered());
116            }
117    
118            // fire event
119            try {
120                EventHelper.notifyExchangeCreated(exchange.getContext(), exchange);
121            } catch (Throwable e) {
122                // must catch exceptions to ensure the exchange is not failing due to notification event failed
123                log.warn("Exception occurred during event notification. This exception will be ignored.", e);
124            }
125    
126            // register to inflight registry
127            if (exchange.getContext() != null) {
128                exchange.getContext().getInflightRepository().add(exchange);
129            }
130        }
131    
132        UnitOfWork newInstance(Exchange exchange) {
133            return new DefaultUnitOfWork(exchange);
134        }
135    
136        @Override
137        public void setParentUnitOfWork(UnitOfWork parentUnitOfWork) {
138            this.parent = parentUnitOfWork;
139        }
140    
141        public UnitOfWork createChildUnitOfWork(Exchange childExchange) {
142            // create a new child unit of work, and mark me as its parent
143            UnitOfWork answer = newInstance(childExchange);
144            answer.setParentUnitOfWork(this);
145            return answer;
146        }
147    
148        public void start() throws Exception {
149            id = null;
150        }
151    
152        public void stop() throws Exception {
153            // need to clean up when we are stopping to not leak memory
154            if (synchronizations != null) {
155                synchronizations.clear();
156            }
157            if (tracedRouteNodes != null) {
158                tracedRouteNodes.clear();
159            }
160            if (transactedBy != null) {
161                transactedBy.clear();
162            }
163            synchronized (routeContextStack) {
164                if (!routeContextStack.isEmpty()) {
165                    routeContextStack.clear();
166                }
167            }
168            if (subUnitOfWorks != null) {
169                subUnitOfWorks.clear();
170            }
171            originalInMessage = null;
172            parent = null;
173            id = null;
174        }
175    
176        public synchronized void addSynchronization(Synchronization synchronization) {
177            if (synchronizations == null) {
178                synchronizations = new ArrayList<Synchronization>();
179            }
180            log.trace("Adding synchronization {}", synchronization);
181            synchronizations.add(synchronization);
182        }
183    
184        public synchronized void removeSynchronization(Synchronization synchronization) {
185            if (synchronizations != null) {
186                synchronizations.remove(synchronization);
187            }
188        }
189    
190        public synchronized boolean containsSynchronization(Synchronization synchronization) {
191            return synchronizations != null && synchronizations.contains(synchronization);
192        }
193    
194        public void handoverSynchronization(Exchange target) {
195            if (synchronizations == null || synchronizations.isEmpty()) {
196                return;
197            }
198    
199            Iterator<Synchronization> it = synchronizations.iterator();
200            while (it.hasNext()) {
201                Synchronization synchronization = it.next();
202    
203                boolean handover = true;
204                if (synchronization instanceof SynchronizationVetoable) {
205                    SynchronizationVetoable veto = (SynchronizationVetoable) synchronization;
206                    handover = veto.allowHandover();
207                }
208    
209                if (handover) {
210                    log.trace("Handover synchronization {} to: {}", synchronization, target);
211                    target.addOnCompletion(synchronization);
212                    // remove it if its handed over
213                    it.remove();
214                } else {
215                    log.trace("Handover not allow for synchronization {}", synchronization);
216                }
217            }
218        }
219    
220        public void done(Exchange exchange) {
221            log.trace("UnitOfWork done for ExchangeId: {} with {}", exchange.getExchangeId(), exchange);
222    
223            boolean failed = exchange.isFailed();
224    
225            // at first done the synchronizations
226            UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations, log);
227    
228            // notify uow callback if in use
229            try {
230                SubUnitOfWorkCallback uowCallback = getSubUnitOfWorkCallback();
231                if (uowCallback != null) {
232                    uowCallback.onDone(exchange);
233                }
234            } catch (Throwable e) {
235                // must catch exceptions to ensure synchronizations is also invoked
236                log.warn("Exception occurred during savepoint onDone. This exception will be ignored.", e);
237            }
238    
239            // unregister from inflight registry, before signalling we are done
240            if (exchange.getContext() != null) {
241                exchange.getContext().getInflightRepository().remove(exchange);
242            }
243    
244            // then fire event to signal the exchange is done
245            try {
246                if (failed) {
247                    EventHelper.notifyExchangeFailed(exchange.getContext(), exchange);
248                } else {
249                    EventHelper.notifyExchangeDone(exchange.getContext(), exchange);
250                }
251            } catch (Throwable e) {
252                // must catch exceptions to ensure synchronizations is also invoked
253                log.warn("Exception occurred during event notification. This exception will be ignored.", e);
254            }
255        }
256    
257        public String getId() {
258            if (id == null) {
259                id = context.getUuidGenerator().generateUuid();
260            }
261            return id;
262        }
263    
264        public Message getOriginalInMessage() {
265            return originalInMessage;
266        }
267    
268        public TracedRouteNodes getTracedRouteNodes() {
269            return tracedRouteNodes;
270        }
271    
272        public boolean isTransacted() {
273            return transactedBy != null && !transactedBy.isEmpty();
274        }
275    
276        public boolean isTransactedBy(Object key) {
277            return getTransactedBy().contains(key);
278        }
279    
280        public void beginTransactedBy(Object key) {
281            getTransactedBy().add(key);
282        }
283    
284        public void endTransactedBy(Object key) {
285            getTransactedBy().remove(key);
286        }
287    
288        public RouteContext getRouteContext() {
289            synchronized (routeContextStack) {
290                if (routeContextStack.isEmpty()) {
291                    return null;
292                }
293                return routeContextStack.peek();
294            }
295        }
296    
297        public void pushRouteContext(RouteContext routeContext) {
298            synchronized (routeContextStack) {
299                routeContextStack.add(routeContext);
300            }
301        }
302    
303        public RouteContext popRouteContext() {
304            synchronized (routeContextStack) {
305                if (routeContextStack.isEmpty()) {
306                    return null;
307                }
308                return routeContextStack.pop();
309            }
310        }
311    
312        public AsyncCallback beforeProcess(Processor processor, Exchange exchange, AsyncCallback callback) {
313            // no wrapping needed
314            return callback;
315        }
316    
317        public void afterProcess(Processor processor, Exchange exchange, AsyncCallback callback, boolean doneSync) {
318        }
319    
320        @Override
321        public void beginSubUnitOfWork(Exchange exchange) {
322            if (log.isTraceEnabled()) {
323                log.trace("beginSubUnitOfWork exchangeId: {}", exchange.getExchangeId());
324            }
325    
326            if (subUnitOfWorks == null) {
327                subUnitOfWorks = new Stack<DefaultSubUnitOfWork>();
328            }
329            subUnitOfWorks.push(new DefaultSubUnitOfWork());
330        }
331    
332        @Override
333        public void endSubUnitOfWork(Exchange exchange) {
334            if (log.isTraceEnabled()) {
335                log.trace("endSubUnitOfWork exchangeId: {}", exchange.getExchangeId());
336            }
337    
338            if (subUnitOfWorks == null || subUnitOfWorks.isEmpty()) {
339                return;
340            }
341    
342            // pop last sub unit of work as its now ended
343            SubUnitOfWork subUoW = subUnitOfWorks.pop();
344            if (subUoW.isFailed()) {
345                // the sub unit of work failed so set an exception containing all the caused exceptions
346                // and mark the exchange for rollback only
347    
348                // if there are multiple exceptions then wrap those into another exception with them all
349                Exception cause;
350                List<Exception> list = subUoW.getExceptions();
351                if (list != null) {
352                    if (list.size() == 1) {
353                        cause = list.get(0);
354                    } else {
355                        cause = new CamelUnitOfWorkException(exchange, list);
356                    }
357                    exchange.setException(cause);
358                }
359                // mark it as rollback and that the unit of work is exhausted. This ensures that we do not try
360                // to redeliver this exception (again)
361                exchange.setProperty(Exchange.ROLLBACK_ONLY, true);
362                exchange.setProperty(Exchange.UNIT_OF_WORK_EXHAUSTED, true);
363                // and remove any indications of error handled which will prevent this exception to be noticed
364                // by the error handler which we want to react with the result of the sub unit of work
365                exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, null);
366                exchange.setProperty(Exchange.FAILURE_HANDLED, null);
367                if (log.isTraceEnabled()) {
368                    log.trace("endSubUnitOfWork exchangeId: {} with {} caused exceptions.", exchange.getExchangeId(), list != null ? list.size() : 0);
369                }
370            }
371        }
372    
373        @Override
374        public SubUnitOfWorkCallback getSubUnitOfWorkCallback() {
375            // if there is a parent-child relationship between unit of works
376            // then we should use the callback strategies from the parent
377            if (parent != null) {
378                return parent.getSubUnitOfWorkCallback();
379            }
380    
381            if (subUnitOfWorks == null || subUnitOfWorks.isEmpty()) {
382                return null;
383            }
384            return subUnitOfWorks.peek();
385        }
386    
387        private Set<Object> getTransactedBy() {
388            if (transactedBy == null) {
389                transactedBy = new LinkedHashSet<Object>();
390            }
391            return transactedBy;
392        }
393    
394        @Override
395        public String toString() {
396            return "DefaultUnitOfWork";
397        }
398    }