001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import org.apache.camel.AsyncCallback;
020import org.apache.camel.Exchange;
021import org.apache.camel.Processor;
022import org.apache.camel.spi.RouteContext;
023import org.apache.camel.spi.UnitOfWork;
024import org.slf4j.Logger;
025import org.slf4j.LoggerFactory;
026import org.slf4j.MDC;
027
028/**
029 * This unit of work supports <a href="http://www.slf4j.org/api/org/slf4j/MDC.html">MDC</a>.
030 *
031 * @version 
032 */
033public class MDCUnitOfWork extends DefaultUnitOfWork {
034
035    public static final String MDC_BREADCRUMB_ID = "camel.breadcrumbId";
036    public static final String MDC_EXCHANGE_ID = "camel.exchangeId";
037    public static final String MDC_MESSAGE_ID = "camel.messageId";
038    public static final String MDC_CORRELATION_ID = "camel.correlationId";
039    public static final String MDC_ROUTE_ID = "camel.routeId";
040    public static final String MDC_CAMEL_CONTEXT_ID = "camel.contextId";
041    public static final String MDC_TRANSACTION_KEY = "camel.transactionKey";
042
043    private static final Logger LOG = LoggerFactory.getLogger(MDCUnitOfWork.class);
044
045    private final String originalBreadcrumbId;
046    private final String originalExchangeId;
047    private final String originalMessageId;
048    private final String originalCorrelationId;
049    private final String originalRouteId;
050    private final String originalCamelContextId;
051    private final String originalTransactionKey;
052
053    public MDCUnitOfWork(Exchange exchange) {
054        super(exchange, LOG);
055
056        // remember existing values
057        this.originalExchangeId = MDC.get(MDC_EXCHANGE_ID);
058        this.originalMessageId = MDC.get(MDC_MESSAGE_ID);
059        this.originalBreadcrumbId = MDC.get(MDC_BREADCRUMB_ID);
060        this.originalCorrelationId = MDC.get(MDC_CORRELATION_ID);
061        this.originalRouteId = MDC.get(MDC_ROUTE_ID);
062        this.originalCamelContextId = MDC.get(MDC_CAMEL_CONTEXT_ID);
063        this.originalTransactionKey = MDC.get(MDC_TRANSACTION_KEY);
064
065        // must add exchange and message id in constructor
066        MDC.put(MDC_EXCHANGE_ID, exchange.getExchangeId());
067        String msgId = exchange.hasOut() ? exchange.getOut().getMessageId() : exchange.getIn().getMessageId();
068        MDC.put(MDC_MESSAGE_ID, msgId);
069        // the camel context id is from exchange
070        MDC.put(MDC_CAMEL_CONTEXT_ID, exchange.getContext().getName());
071        // and add optional correlation id
072        String corrId = exchange.getProperty(Exchange.CORRELATION_ID, String.class);
073        if (corrId != null) {
074            MDC.put(MDC_CORRELATION_ID, corrId);
075        }
076        // and add optional breadcrumb id
077        String breadcrumbId = exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
078        if (breadcrumbId != null) {
079            MDC.put(MDC_BREADCRUMB_ID, breadcrumbId);
080        }
081    }
082
083    @Override
084    public UnitOfWork newInstance(Exchange exchange) {
085        return new MDCUnitOfWork(exchange);
086    }
087
088    @Override
089    public void stop() throws Exception {
090        super.stop();
091        // and remove when stopping
092        clear();
093    }
094
095    @Override
096    public void pushRouteContext(RouteContext routeContext) {
097        MDC.put(MDC_ROUTE_ID, routeContext.getRoute().getId());
098        super.pushRouteContext(routeContext);
099    }
100
101    @Override
102    public RouteContext popRouteContext() {
103        MDC.remove(MDC_ROUTE_ID);
104        return super.popRouteContext();
105    }
106
107    @Override
108    public void beginTransactedBy(Object key) {
109        MDC.put(MDC_TRANSACTION_KEY, key.toString());
110        super.beginTransactedBy(key);
111    }
112
113    @Override
114    public void endTransactedBy(Object key) {
115        MDC.remove(MDC_TRANSACTION_KEY);
116        super.endTransactedBy(key);
117    }
118
119    @Override
120    public AsyncCallback beforeProcess(Processor processor, Exchange exchange, AsyncCallback callback) {
121        return new MDCCallback(callback);
122    }
123
124    @Override
125    public void afterProcess(Processor processor, Exchange exchange, AsyncCallback callback, boolean doneSync) {
126        if (!doneSync) {
127            // must clear MDC on current thread as the exchange is being processed asynchronously
128            // by another thread
129            clear();
130        }
131        super.afterProcess(processor, exchange, callback, doneSync);
132    }
133
134    /**
135     * Clears information put on the MDC by this {@link MDCUnitOfWork}
136     */
137    public void clear() {
138        if (this.originalBreadcrumbId != null) {
139            MDC.put(MDC_BREADCRUMB_ID, originalBreadcrumbId);
140        } else {
141            MDC.remove(MDC_BREADCRUMB_ID);
142        }
143        if (this.originalExchangeId != null) {
144            MDC.put(MDC_EXCHANGE_ID, originalExchangeId);
145        } else {
146            MDC.remove(MDC_EXCHANGE_ID);
147        }
148        if (this.originalMessageId != null) {
149            MDC.put(MDC_MESSAGE_ID, originalMessageId);
150        } else {
151            MDC.remove(MDC_MESSAGE_ID);
152        }
153        if (this.originalCorrelationId != null) {
154            MDC.put(MDC_CORRELATION_ID, originalCorrelationId);
155        } else {
156            MDC.remove(MDC_CORRELATION_ID);
157        }
158        if (this.originalRouteId != null) {
159            MDC.put(MDC_ROUTE_ID, originalRouteId);
160        } else {
161            MDC.remove(MDC_ROUTE_ID);
162        }
163        if (this.originalCamelContextId != null) {
164            MDC.put(MDC_CAMEL_CONTEXT_ID, originalCamelContextId);
165        } else {
166            MDC.remove(MDC_CAMEL_CONTEXT_ID);
167        }
168        if (this.originalTransactionKey != null) {
169            MDC.put(MDC_TRANSACTION_KEY, originalTransactionKey);
170        } else {
171            MDC.remove(MDC_TRANSACTION_KEY);
172        }
173    }
174
175    @Override
176    public String toString() {
177        return "MDCUnitOfWork";
178    }
179
180    /**
181     * {@link AsyncCallback} which preserves {@link org.slf4j.MDC} when
182     * the asynchronous routing engine is being used.
183     */
184    private static final class MDCCallback implements AsyncCallback {
185
186        private final AsyncCallback delegate;
187        private final String breadcrumbId;
188        private final String exchangeId;
189        private final String messageId;
190        private final String correlationId;
191        private final String routeId;
192        private final String camelContextId;
193
194        private MDCCallback(AsyncCallback delegate) {
195            this.delegate = delegate;
196            this.exchangeId = MDC.get(MDC_EXCHANGE_ID);
197            this.messageId = MDC.get(MDC_MESSAGE_ID);
198            this.breadcrumbId = MDC.get(MDC_BREADCRUMB_ID);
199            this.correlationId = MDC.get(MDC_CORRELATION_ID);
200            this.camelContextId = MDC.get(MDC_CAMEL_CONTEXT_ID);
201            this.routeId = MDC.get(MDC_ROUTE_ID);
202        }
203
204        public void done(boolean doneSync) {
205            try {
206                if (!doneSync) {
207                    // when done asynchronously then restore information from previous thread
208                    if (breadcrumbId != null) {
209                        MDC.put(MDC_BREADCRUMB_ID, breadcrumbId);
210                    }
211                    if (exchangeId != null) {
212                        MDC.put(MDC_EXCHANGE_ID, exchangeId);
213                    }
214                    if (messageId != null) {
215                        MDC.put(MDC_MESSAGE_ID, messageId);
216                    }
217                    if (correlationId != null) {
218                        MDC.put(MDC_CORRELATION_ID, correlationId);
219                    }
220                    if (camelContextId != null) {
221                        MDC.put(MDC_CAMEL_CONTEXT_ID, camelContextId);
222                    }
223                }
224                // need to setup the routeId finally
225                if (routeId != null) {
226                    MDC.put(MDC_ROUTE_ID, routeId);
227                }
228                
229            } finally {
230                // muse ensure delegate is invoked
231                delegate.done(doneSync);
232            }
233        }
234
235        @Override
236        public String toString() {
237            return delegate.toString();
238        }
239    }
240
241}