001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import java.util.ArrayList;
020import java.util.Date;
021import java.util.Iterator;
022import java.util.LinkedHashSet;
023import java.util.List;
024import java.util.Set;
025import java.util.Stack;
026import java.util.function.Predicate;
027
028import org.apache.camel.AsyncCallback;
029import org.apache.camel.CamelContext;
030import org.apache.camel.CamelUnitOfWorkException;
031import org.apache.camel.Exchange;
032import org.apache.camel.Message;
033import org.apache.camel.Processor;
034import org.apache.camel.Route;
035import org.apache.camel.Service;
036import org.apache.camel.spi.RouteContext;
037import org.apache.camel.spi.SubUnitOfWork;
038import org.apache.camel.spi.SubUnitOfWorkCallback;
039import org.apache.camel.spi.Synchronization;
040import org.apache.camel.spi.SynchronizationVetoable;
041import org.apache.camel.spi.TracedRouteNodes;
042import org.apache.camel.spi.UnitOfWork;
043import org.apache.camel.util.EventHelper;
044import org.apache.camel.util.UnitOfWorkHelper;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048/**
049 * The default implementation of {@link org.apache.camel.spi.UnitOfWork}
050 */
051public class DefaultUnitOfWork implements UnitOfWork, Service {
052    private static final Logger LOG = LoggerFactory.getLogger(DefaultUnitOfWork.class);
053
054    // TODO: This implementation seems to have transformed itself into a to broad concern
055    // where unit of work is doing a bit more work than the transactional aspect that ties
056    // to its name. Maybe this implementation should be named ExchangeContext and we can
057    // introduce a simpler UnitOfWork concept. This would also allow us to refactor the
058    // SubUnitOfWork into a general parent/child unit of work concept. However this
059    // requires API changes and thus is best kept for Camel 3.0
060
061    private UnitOfWork parent;
062    private String id;
063    private CamelContext context;
064    private List<Synchronization> synchronizations;
065    private Message originalInMessage;
066    private final TracedRouteNodes tracedRouteNodes;
067    private Set<Object> transactedBy;
068    private final Stack<RouteContext> routeContextStack = new Stack<RouteContext>();
069    private Stack<DefaultSubUnitOfWork> subUnitOfWorks;
070    private final transient Logger log;
071    
072    public DefaultUnitOfWork(Exchange exchange) {
073        this(exchange, LOG);
074    }
075
076    protected DefaultUnitOfWork(Exchange exchange, Logger logger) {
077        log = logger;
078        if (log.isTraceEnabled()) {
079            log.trace("UnitOfWork created for ExchangeId: {} with {}", exchange.getExchangeId(), exchange);
080        }
081        tracedRouteNodes = new DefaultTracedRouteNodes();
082        context = exchange.getContext();
083
084        if (context.isAllowUseOriginalMessage()) {
085            // special for JmsMessage as it can cause it to loose headers later.
086            if (exchange.getIn().getClass().getName().equals("org.apache.camel.component.jms.JmsMessage")) {
087                this.originalInMessage = new DefaultMessage();
088                this.originalInMessage.setBody(exchange.getIn().getBody());
089                this.originalInMessage.getHeaders().putAll(exchange.getIn().getHeaders());
090            } else {
091                this.originalInMessage = exchange.getIn().copy();
092            }
093        }
094
095        // mark the creation time when this Exchange was created
096        if (exchange.getProperty(Exchange.CREATED_TIMESTAMP) == null) {
097            exchange.setProperty(Exchange.CREATED_TIMESTAMP, new Date());
098        }
099
100        // inject breadcrumb header if enabled
101        if (exchange.getContext().isUseBreadcrumb()) {
102            // create or use existing breadcrumb
103            String breadcrumbId = exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
104            if (breadcrumbId == null) {
105                // no existing breadcrumb, so create a new one based on the message id
106                breadcrumbId = exchange.getIn().getMessageId();
107                exchange.getIn().setHeader(Exchange.BREADCRUMB_ID, breadcrumbId);
108            }
109        }
110        
111        // setup whether the exchange is externally redelivered or not (if not initialized before)
112        // store as property so we know that the origin exchange was redelivered
113        if (exchange.getProperty(Exchange.EXTERNAL_REDELIVERED) == null) {
114            exchange.setProperty(Exchange.EXTERNAL_REDELIVERED, exchange.isExternalRedelivered());
115        }
116
117        // fire event
118        try {
119            EventHelper.notifyExchangeCreated(exchange.getContext(), exchange);
120        } catch (Throwable e) {
121            // must catch exceptions to ensure the exchange is not failing due to notification event failed
122            log.warn("Exception occurred during event notification. This exception will be ignored.", e);
123        }
124
125        // register to inflight registry
126        if (exchange.getContext() != null) {
127            exchange.getContext().getInflightRepository().add(exchange);
128        }
129    }
130
131    UnitOfWork newInstance(Exchange exchange) {
132        return new DefaultUnitOfWork(exchange);
133    }
134
135    @Override
136    public void setParentUnitOfWork(UnitOfWork parentUnitOfWork) {
137        this.parent = parentUnitOfWork;
138    }
139
140    public UnitOfWork createChildUnitOfWork(Exchange childExchange) {
141        // create a new child unit of work, and mark me as its parent
142        UnitOfWork answer = newInstance(childExchange);
143        answer.setParentUnitOfWork(this);
144        return answer;
145    }
146
147    public void start() throws Exception {
148        id = null;
149    }
150
151    public void stop() throws Exception {
152        // need to clean up when we are stopping to not leak memory
153        if (synchronizations != null) {
154            synchronizations.clear();
155        }
156        if (tracedRouteNodes != null) {
157            tracedRouteNodes.clear();
158        }
159        if (transactedBy != null) {
160            transactedBy.clear();
161        }
162        synchronized (routeContextStack) {
163            if (!routeContextStack.isEmpty()) {
164                routeContextStack.clear();
165            }
166        }
167        if (subUnitOfWorks != null) {
168            subUnitOfWorks.clear();
169        }
170        originalInMessage = null;
171        parent = null;
172        id = null;
173    }
174
175    public synchronized void addSynchronization(Synchronization synchronization) {
176        if (synchronizations == null) {
177            synchronizations = new ArrayList<Synchronization>();
178        }
179        log.trace("Adding synchronization {}", synchronization);
180        synchronizations.add(synchronization);
181    }
182
183    public synchronized void removeSynchronization(Synchronization synchronization) {
184        if (synchronizations != null) {
185            synchronizations.remove(synchronization);
186        }
187    }
188
189    public synchronized boolean containsSynchronization(Synchronization synchronization) {
190        return synchronizations != null && synchronizations.contains(synchronization);
191    }
192
193    public void handoverSynchronization(Exchange target) {
194        handoverSynchronization(target, null);
195    }
196
197    @Override
198    public void handoverSynchronization(Exchange target, Predicate<Synchronization> filter) {
199        if (synchronizations == null || synchronizations.isEmpty()) {
200            return;
201        }
202
203        Iterator<Synchronization> it = synchronizations.iterator();
204        while (it.hasNext()) {
205            Synchronization synchronization = it.next();
206
207            boolean handover = true;
208            if (synchronization instanceof SynchronizationVetoable) {
209                SynchronizationVetoable veto = (SynchronizationVetoable) synchronization;
210                handover = veto.allowHandover();
211            }
212
213            if (handover && (filter == null || filter.test(synchronization))) {
214                log.trace("Handover synchronization {} to: {}", synchronization, target);
215                target.addOnCompletion(synchronization);
216                // remove it if its handed over
217                it.remove();
218            } else {
219                log.trace("Handover not allow for synchronization {}", synchronization);
220            }
221        }
222    }
223
224    public void done(Exchange exchange) {
225        log.trace("UnitOfWork done for ExchangeId: {} with {}", exchange.getExchangeId(), exchange);
226
227        boolean failed = exchange.isFailed();
228
229        // at first done the synchronizations
230        UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations, log);
231
232        // notify uow callback if in use
233        try {
234            SubUnitOfWorkCallback uowCallback = getSubUnitOfWorkCallback();
235            if (uowCallback != null) {
236                uowCallback.onDone(exchange);
237            }
238        } catch (Throwable e) {
239            // must catch exceptions to ensure synchronizations is also invoked
240            log.warn("Exception occurred during savepoint onDone. This exception will be ignored.", e);
241        }
242
243        // unregister from inflight registry, before signalling we are done
244        if (exchange.getContext() != null) {
245            exchange.getContext().getInflightRepository().remove(exchange);
246        }
247
248        // then fire event to signal the exchange is done
249        try {
250            if (failed) {
251                EventHelper.notifyExchangeFailed(exchange.getContext(), exchange);
252            } else {
253                EventHelper.notifyExchangeDone(exchange.getContext(), exchange);
254            }
255        } catch (Throwable e) {
256            // must catch exceptions to ensure synchronizations is also invoked
257            log.warn("Exception occurred during event notification. This exception will be ignored.", e);
258        }
259    }
260
261    @Override
262    public void beforeRoute(Exchange exchange, Route route) {
263        if (log.isTraceEnabled()) {
264            log.trace("UnitOfWork beforeRoute: {} for ExchangeId: {} with {}", new Object[]{route.getId(), exchange.getExchangeId(), exchange});
265        }
266        UnitOfWorkHelper.beforeRouteSynchronizations(route, exchange, synchronizations, log);
267    }
268
269    @Override
270    public void afterRoute(Exchange exchange, Route route) {
271        if (log.isTraceEnabled()) {
272            log.trace("UnitOfWork afterRoute: {} for ExchangeId: {} with {}", new Object[]{route.getId(), exchange.getExchangeId(), exchange});
273        }
274        UnitOfWorkHelper.afterRouteSynchronizations(route, exchange, synchronizations, log);
275    }
276
277    public String getId() {
278        if (id == null) {
279            id = context.getUuidGenerator().generateUuid();
280        }
281        return id;
282    }
283
284    public Message getOriginalInMessage() {
285        if (originalInMessage == null && !context.isAllowUseOriginalMessage()) {
286            throw new IllegalStateException("AllowUseOriginalMessage is disabled. Cannot access the original message.");
287        }
288        return originalInMessage;
289    }
290
291    public TracedRouteNodes getTracedRouteNodes() {
292        return tracedRouteNodes;
293    }
294
295    public boolean isTransacted() {
296        return transactedBy != null && !transactedBy.isEmpty();
297    }
298
299    public boolean isTransactedBy(Object key) {
300        return getTransactedBy().contains(key);
301    }
302
303    public void beginTransactedBy(Object key) {
304        getTransactedBy().add(key);
305    }
306
307    public void endTransactedBy(Object key) {
308        getTransactedBy().remove(key);
309    }
310
311    public RouteContext getRouteContext() {
312        synchronized (routeContextStack) {
313            if (routeContextStack.isEmpty()) {
314                return null;
315            }
316            return routeContextStack.peek();
317        }
318    }
319
320    public void pushRouteContext(RouteContext routeContext) {
321        synchronized (routeContextStack) {
322            routeContextStack.add(routeContext);
323        }
324    }
325
326    public RouteContext popRouteContext() {
327        synchronized (routeContextStack) {
328            if (routeContextStack.isEmpty()) {
329                return null;
330            }
331            return routeContextStack.pop();
332        }
333    }
334
335    public AsyncCallback beforeProcess(Processor processor, Exchange exchange, AsyncCallback callback) {
336        // no wrapping needed
337        return callback;
338    }
339
340    public void afterProcess(Processor processor, Exchange exchange, AsyncCallback callback, boolean doneSync) {
341    }
342
343    @Override
344    public void beginSubUnitOfWork(Exchange exchange) {
345        if (log.isTraceEnabled()) {
346            log.trace("beginSubUnitOfWork exchangeId: {}", exchange.getExchangeId());
347        }
348
349        if (subUnitOfWorks == null) {
350            subUnitOfWorks = new Stack<DefaultSubUnitOfWork>();
351        }
352        subUnitOfWorks.push(new DefaultSubUnitOfWork());
353    }
354
355    @Override
356    public void endSubUnitOfWork(Exchange exchange) {
357        if (log.isTraceEnabled()) {
358            log.trace("endSubUnitOfWork exchangeId: {}", exchange.getExchangeId());
359        }
360
361        if (subUnitOfWorks == null || subUnitOfWorks.isEmpty()) {
362            return;
363        }
364
365        // pop last sub unit of work as its now ended
366        SubUnitOfWork subUoW = subUnitOfWorks.pop();
367        if (subUoW.isFailed()) {
368            // the sub unit of work failed so set an exception containing all the caused exceptions
369            // and mark the exchange for rollback only
370
371            // if there are multiple exceptions then wrap those into another exception with them all
372            Exception cause;
373            List<Exception> list = subUoW.getExceptions();
374            if (list != null) {
375                if (list.size() == 1) {
376                    cause = list.get(0);
377                } else {
378                    cause = new CamelUnitOfWorkException(exchange, list);
379                }
380                exchange.setException(cause);
381            }
382            // mark it as rollback and that the unit of work is exhausted. This ensures that we do not try
383            // to redeliver this exception (again)
384            exchange.setProperty(Exchange.ROLLBACK_ONLY, true);
385            exchange.setProperty(Exchange.UNIT_OF_WORK_EXHAUSTED, true);
386            // and remove any indications of error handled which will prevent this exception to be noticed
387            // by the error handler which we want to react with the result of the sub unit of work
388            exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, null);
389            exchange.setProperty(Exchange.FAILURE_HANDLED, null);
390            if (log.isTraceEnabled()) {
391                log.trace("endSubUnitOfWork exchangeId: {} with {} caused exceptions.", exchange.getExchangeId(), list != null ? list.size() : 0);
392            }
393        }
394    }
395
396    @Override
397    public SubUnitOfWorkCallback getSubUnitOfWorkCallback() {
398        // if there is a parent-child relationship between unit of works
399        // then we should use the callback strategies from the parent
400        if (parent != null) {
401            return parent.getSubUnitOfWorkCallback();
402        }
403
404        if (subUnitOfWorks == null || subUnitOfWorks.isEmpty()) {
405            return null;
406        }
407        return subUnitOfWorks.peek();
408    }
409
410    private Set<Object> getTransactedBy() {
411        if (transactedBy == null) {
412            transactedBy = new LinkedHashSet<Object>();
413        }
414        return transactedBy;
415    }
416
417    @Override
418    public String toString() {
419        return "DefaultUnitOfWork";
420    }
421}