001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import org.apache.camel.AsyncCallback;
020    import org.apache.camel.AsyncProcessor;
021    import org.apache.camel.Exchange;
022    import org.apache.camel.Processor;
023    import org.apache.camel.impl.DefaultUnitOfWork;
024    import org.apache.camel.impl.MDCUnitOfWork;
025    import org.apache.camel.spi.RouteContext;
026    import org.apache.camel.spi.UnitOfWork;
027    import org.apache.camel.util.AsyncProcessorHelper;
028    import org.slf4j.Logger;
029    import org.slf4j.LoggerFactory;
030    
031    /**
032     * Ensures the {@link Exchange} is routed under the boundaries of an {@link org.apache.camel.spi.UnitOfWork}.
033     * <p/>
034     * Handles calling the {@link org.apache.camel.spi.UnitOfWork#done(org.apache.camel.Exchange)} method
035     * when processing of an {@link Exchange} is complete.
036     */
037    public class UnitOfWorkProcessor extends DelegateAsyncProcessor {
038    
039        private static final transient Logger LOG = LoggerFactory.getLogger(UnitOfWorkProcessor.class);
040        private final RouteContext routeContext;
041        private final String routeId;
042    
043        public UnitOfWorkProcessor(Processor processor) {
044            this(null, processor);
045        }
046    
047        public UnitOfWorkProcessor(AsyncProcessor processor) {
048            this(null, processor);
049        }
050    
051        public UnitOfWorkProcessor(RouteContext routeContext, Processor processor) {
052            super(processor);
053            this.routeContext = routeContext;
054            if (routeContext != null) {
055                this.routeId = routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory());
056            } else {
057                this.routeId = null;
058            }
059        }
060    
061        public UnitOfWorkProcessor(RouteContext routeContext, AsyncProcessor processor) {
062            super(processor);
063            this.routeContext = routeContext;
064            if (routeContext != null) {
065                this.routeId = routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory());
066            } else {
067                this.routeId = null;
068            }
069        }
070    
071        @Override
072        public String toString() {
073            return "UnitOfWork(" + processor + ")";
074        }
075    
076        public RouteContext getRouteContext() {
077            return routeContext;
078        }
079    
080        @Override
081        protected void doStart() throws Exception {
082            // if a route context has been configured, then wrap the processor with a
083            // RouteContextProcessor to ensure we track the route context properly during
084            // processing of the exchange, but only do this once
085            if (routeContext != null && (!(processor instanceof RouteContextProcessor))) {
086                processor = new RouteContextProcessor(routeContext, processor);
087            }
088            super.doStart();
089        }
090    
091        @Override
092        public boolean process(final Exchange exchange, final AsyncCallback callback) {
093            // if the exchange doesn't have from route id set, then set it if it originated
094            // from this unit of work
095            if (routeId != null && exchange.getFromRouteId() == null) {
096                exchange.setFromRouteId(routeId);
097            }
098    
099            if (exchange.getUnitOfWork() == null) {
100                // If there is no existing UoW, then we should start one and
101                // terminate it once processing is completed for the exchange.
102                final UnitOfWork uow = createUnitOfWork(exchange);
103                exchange.setUnitOfWork(uow);
104                try {
105                    uow.start();
106                } catch (Exception e) {
107                    callback.done(true);
108                    exchange.setException(e);
109                    return true;
110                }
111    
112                Object synchronous = exchange.removeProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC);
113                if (synchronous != null) {
114                    // the exchange signalled to process synchronously
115                    return processSync(exchange, callback, uow);
116                } else {
117                    return processAsync(exchange, callback, uow);
118                }
119            } else {
120                // There was an existing UoW, so we should just pass through..
121                // so that the guy the initiated the UoW can terminate it.
122                return processor.process(exchange, callback);
123            }
124        }
125    
126        protected boolean processSync(final Exchange exchange, final AsyncCallback callback, final UnitOfWork uow) {
127            LOG.trace("Exchange marked UnitOfWork to be processed synchronously: {}", exchange);
128    
129            // process the exchange synchronously
130            try {
131                AsyncProcessorHelper.process(processor, exchange);
132            } catch (Throwable e) {
133                exchange.setException(e);
134            }
135    
136            try {
137                callback.done(true);
138            } finally {
139                doneUow(uow, exchange);
140            }
141    
142            return true;
143        }
144    
145        protected boolean processAsync(final Exchange exchange, final AsyncCallback callback, final UnitOfWork uow) {
146            LOG.trace("Processing exchange asynchronously: {}", exchange);
147    
148            // process the exchange asynchronously
149            try {
150                return processor.process(exchange, new AsyncCallback() {
151                    public void done(boolean doneSync) {
152                        // Order here matters. We need to complete the callbacks
153                        // since they will likely update the exchange with some final results.
154                        try {
155                            callback.done(doneSync);
156                        } finally {
157                            doneUow(uow, exchange);
158                        }
159                    }
160                });
161            } catch (Throwable e) {
162                LOG.warn("Caught unhandled exception while processing ExchangeId: " + exchange.getExchangeId(), e);
163    
164                // fallback and catch any exceptions the process may not have caught
165                // we must ensure to done the UoW in all cases and issue done on the callback
166                exchange.setException(e);
167    
168                // Order here matters. We need to complete the callbacks
169                // since they will likely update the exchange with some final results.
170                try {
171                    callback.done(true);
172                } finally {
173                    doneUow(uow, exchange);
174                }
175                return true;
176            }
177        }
178    
179        /**
180         * Strategy to create the unit of work for the given exchange.
181         *
182         * @param exchange the exchange
183         * @return the created unit of work
184         */
185        protected UnitOfWork createUnitOfWork(Exchange exchange) {
186            UnitOfWork answer;
187            if (exchange.getContext().isUseMDCLogging()) {
188                answer = new MDCUnitOfWork(exchange);
189            } else {
190                answer = new DefaultUnitOfWork(exchange);
191            }
192            return answer;
193        }
194    
195        private void doneUow(UnitOfWork uow, Exchange exchange) {
196            // unit of work is done
197            try {
198                if (uow != null) {
199                    uow.done(exchange);
200                }
201            } catch (Throwable e) {
202                LOG.warn("Exception occurred during done UnitOfWork for Exchange: " + exchange
203                        + ". This exception will be ignored.", e);
204            }
205            try {
206                if (uow != null) {
207                    uow.stop();
208                }
209            } catch (Throwable e) {
210                LOG.warn("Exception occurred during stopping UnitOfWork for Exchange: " + exchange
211                        + ". This exception will be ignored.", e);
212            }
213    
214            // remove uow from exchange as its done
215            exchange.setUnitOfWork(null);
216        }
217    
218    }