001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import org.apache.camel.AsyncCallback;
020    import org.apache.camel.Exchange;
021    import org.apache.camel.Processor;
022    import org.apache.camel.spi.RouteContext;
023    import org.apache.camel.spi.UnitOfWork;
024    import org.slf4j.Logger;
025    import org.slf4j.LoggerFactory;
026    import org.slf4j.MDC;
027    
028    /**
029     * This unit of work supports <a href="http://www.slf4j.org/api/org/slf4j/MDC.html">MDC</a>.
030     *
031     * @version 
032     */
033    public class MDCUnitOfWork extends DefaultUnitOfWork {
034    
035        public static final String MDC_BREADCRUMB_ID = "camel.breadcrumbId";
036        public static final String MDC_EXCHANGE_ID = "camel.exchangeId";
037        public static final String MDC_MESSAGE_ID = "camel.messageId";
038        public static final String MDC_CORRELATION_ID = "camel.correlationId";
039        public static final String MDC_ROUTE_ID = "camel.routeId";
040        public static final String MDC_CAMEL_CONTEXT_ID = "camel.contextId";
041        public static final String MDC_TRANSACTION_KEY = "camel.transactionKey";
042    
043        private static final Logger LOG = LoggerFactory.getLogger(MDCUnitOfWork.class);
044    
045        private final String originalBreadcrumbId;
046        private final String originalExchangeId;
047        private final String originalMessageId;
048        private final String originalCorrelationId;
049        private final String originalRouteId;
050        private final String originalCamelContextId;
051        private final String originalTransactionKey;
052    
053        public MDCUnitOfWork(Exchange exchange) {
054            super(exchange, LOG);
055    
056            // remember existing values
057            this.originalExchangeId = MDC.get(MDC_EXCHANGE_ID);
058            this.originalMessageId = MDC.get(MDC_MESSAGE_ID);
059            this.originalBreadcrumbId = MDC.get(MDC_BREADCRUMB_ID);
060            this.originalCorrelationId = MDC.get(MDC_CORRELATION_ID);
061            this.originalRouteId = MDC.get(MDC_ROUTE_ID);
062            this.originalCamelContextId = MDC.get(MDC_CAMEL_CONTEXT_ID);
063            this.originalTransactionKey = MDC.get(MDC_TRANSACTION_KEY);
064    
065            // must add exchange and message id in constructor
066            MDC.put(MDC_EXCHANGE_ID, exchange.getExchangeId());
067            String msgId = exchange.hasOut() ? exchange.getOut().getMessageId() : exchange.getIn().getMessageId();
068            MDC.put(MDC_MESSAGE_ID, msgId);
069            // the camel context id is from exchange
070            MDC.put(MDC_CAMEL_CONTEXT_ID, exchange.getContext().getName());
071            // and add optional correlation id
072            String corrId = exchange.getProperty(Exchange.CORRELATION_ID, String.class);
073            if (corrId != null) {
074                MDC.put(MDC_CORRELATION_ID, corrId);
075            }
076            // and add optional breadcrumb id
077            String breadcrumbId = exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
078            if (breadcrumbId != null) {
079                MDC.put(MDC_BREADCRUMB_ID, breadcrumbId);
080            }
081        }
082    
083        @Override
084        public UnitOfWork newInstance(Exchange exchange) {
085            return new MDCUnitOfWork(exchange);
086        }
087    
088        @Override
089        public void stop() throws Exception {
090            super.stop();
091            // and remove when stopping
092            clear();
093        }
094    
095        @Override
096        public void pushRouteContext(RouteContext routeContext) {
097            MDC.put(MDC_ROUTE_ID, routeContext.getRoute().getId());
098            super.pushRouteContext(routeContext);
099        }
100    
101        @Override
102        public RouteContext popRouteContext() {
103            MDC.remove(MDC_ROUTE_ID);
104            return super.popRouteContext();
105        }
106    
107        @Override
108        public void beginTransactedBy(Object key) {
109            MDC.put(MDC_TRANSACTION_KEY, key.toString());
110            super.beginTransactedBy(key);
111        }
112    
113        @Override
114        public void endTransactedBy(Object key) {
115            MDC.remove(MDC_TRANSACTION_KEY);
116            super.endTransactedBy(key);
117        }
118    
119        @Override
120        public AsyncCallback beforeProcess(Processor processor, Exchange exchange, AsyncCallback callback) {
121            return new MDCCallback(callback);
122        }
123    
124        @Override
125        public void afterProcess(Processor processor, Exchange exchange, AsyncCallback callback, boolean doneSync) {
126            if (!doneSync) {
127                // must clear MDC on current thread as the exchange is being processed asynchronously
128                // by another thread
129                clear();
130            }
131            super.afterProcess(processor, exchange, callback, doneSync);
132        }
133    
134        /**
135         * Clears information put on the MDC by this {@link MDCUnitOfWork}
136         */
137        public void clear() {
138            if (this.originalBreadcrumbId != null) {
139                MDC.put(MDC_BREADCRUMB_ID, originalBreadcrumbId);
140            } else {
141                MDC.remove(MDC_BREADCRUMB_ID);
142            }
143            if (this.originalExchangeId != null) {
144                MDC.put(MDC_EXCHANGE_ID, originalExchangeId);
145            } else {
146                MDC.remove(MDC_EXCHANGE_ID);
147            }
148            if (this.originalMessageId != null) {
149                MDC.put(MDC_MESSAGE_ID, originalMessageId);
150            } else {
151                MDC.remove(MDC_MESSAGE_ID);
152            }
153            if (this.originalCorrelationId != null) {
154                MDC.put(MDC_CORRELATION_ID, originalCorrelationId);
155            } else {
156                MDC.remove(MDC_CORRELATION_ID);
157            }
158            if (this.originalRouteId != null) {
159                MDC.put(MDC_ROUTE_ID, originalRouteId);
160            } else {
161                MDC.remove(MDC_ROUTE_ID);
162            }
163            if (this.originalCamelContextId != null) {
164                MDC.put(MDC_CAMEL_CONTEXT_ID, originalCamelContextId);
165            } else {
166                MDC.remove(MDC_CAMEL_CONTEXT_ID);
167            }
168            if (this.originalTransactionKey != null) {
169                MDC.put(MDC_TRANSACTION_KEY, originalTransactionKey);
170            } else {
171                MDC.remove(MDC_TRANSACTION_KEY);
172            }
173        }
174    
175        @Override
176        public String toString() {
177            return "MDCUnitOfWork";
178        }
179    
180        /**
181         * {@link AsyncCallback} which preserves {@link org.slf4j.MDC} when
182         * the asynchronous routing engine is being used.
183         */
184        private static final class MDCCallback implements AsyncCallback {
185    
186            private final AsyncCallback delegate;
187            private final String breadcrumbId;
188            private final String exchangeId;
189            private final String messageId;
190            private final String correlationId;
191            private final String routeId;
192            private final String camelContextId;
193    
194            private MDCCallback(AsyncCallback delegate) {
195                this.delegate = delegate;
196                this.exchangeId = MDC.get(MDC_EXCHANGE_ID);
197                this.messageId = MDC.get(MDC_MESSAGE_ID);
198                this.breadcrumbId = MDC.get(MDC_BREADCRUMB_ID);
199                this.correlationId = MDC.get(MDC_CORRELATION_ID);
200                this.camelContextId = MDC.get(MDC_CAMEL_CONTEXT_ID);
201    
202                String routeId = MDC.get(MDC_ROUTE_ID);
203                if (routeId != null) {
204                    // intern route id as this reduces memory allocations
205                    this.routeId = routeId.intern();
206                } else {
207                    this.routeId = null;
208                }
209            }
210    
211            public void done(boolean doneSync) {
212                try {
213                    if (!doneSync) {
214                        // when done asynchronously then restore information from previous thread
215                        if (breadcrumbId != null) {
216                            MDC.put(MDC_BREADCRUMB_ID, breadcrumbId);
217                        }
218                        if (exchangeId != null) {
219                            MDC.put(MDC_EXCHANGE_ID, exchangeId);
220                        }
221                        if (messageId != null) {
222                            MDC.put(MDC_MESSAGE_ID, messageId);
223                        }
224                        if (correlationId != null) {
225                            MDC.put(MDC_CORRELATION_ID, correlationId);
226                        }
227                        if (routeId != null) {
228                            MDC.put(MDC_ROUTE_ID, routeId);
229                        }
230                        if (camelContextId != null) {
231                            MDC.put(MDC_CAMEL_CONTEXT_ID, camelContextId);
232                        }
233                    }
234                } finally {
235                    // muse ensure delegate is invoked
236                    delegate.done(doneSync);
237                }
238            }
239    
240            @Override
241            public String toString() {
242                return delegate.toString();
243            }
244        }
245    
246    }