001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.impl; 018 019 import org.apache.camel.AsyncCallback; 020 import org.apache.camel.Exchange; 021 import org.apache.camel.Processor; 022 import org.apache.camel.spi.RouteContext; 023 import org.apache.camel.spi.UnitOfWork; 024 import org.slf4j.Logger; 025 import org.slf4j.LoggerFactory; 026 import org.slf4j.MDC; 027 028 /** 029 * This unit of work supports <a href="http://www.slf4j.org/api/org/slf4j/MDC.html">MDC</a>. 030 * 031 * @version 032 */ 033 public class MDCUnitOfWork extends DefaultUnitOfWork { 034 035 public static final String MDC_BREADCRUMB_ID = "camel.breadcrumbId"; 036 public static final String MDC_EXCHANGE_ID = "camel.exchangeId"; 037 public static final String MDC_MESSAGE_ID = "camel.messageId"; 038 public static final String MDC_CORRELATION_ID = "camel.correlationId"; 039 public static final String MDC_ROUTE_ID = "camel.routeId"; 040 public static final String MDC_CAMEL_CONTEXT_ID = "camel.contextId"; 041 public static final String MDC_TRANSACTION_KEY = "camel.transactionKey"; 042 043 private static final Logger LOG = LoggerFactory.getLogger(MDCUnitOfWork.class); 044 045 private final String originalBreadcrumbId; 046 private final String originalExchangeId; 047 private final String originalMessageId; 048 private final String originalCorrelationId; 049 private final String originalRouteId; 050 private final String originalCamelContextId; 051 private final String originalTransactionKey; 052 053 public MDCUnitOfWork(Exchange exchange) { 054 super(exchange, LOG); 055 056 // remember existing values 057 this.originalExchangeId = MDC.get(MDC_EXCHANGE_ID); 058 this.originalMessageId = MDC.get(MDC_MESSAGE_ID); 059 this.originalBreadcrumbId = MDC.get(MDC_BREADCRUMB_ID); 060 this.originalCorrelationId = MDC.get(MDC_CORRELATION_ID); 061 this.originalRouteId = MDC.get(MDC_ROUTE_ID); 062 this.originalCamelContextId = MDC.get(MDC_CAMEL_CONTEXT_ID); 063 this.originalTransactionKey = MDC.get(MDC_TRANSACTION_KEY); 064 065 // must add exchange and message id in constructor 066 MDC.put(MDC_EXCHANGE_ID, exchange.getExchangeId()); 067 String msgId = exchange.hasOut() ? exchange.getOut().getMessageId() : exchange.getIn().getMessageId(); 068 MDC.put(MDC_MESSAGE_ID, msgId); 069 // the camel context id is from exchange 070 MDC.put(MDC_CAMEL_CONTEXT_ID, exchange.getContext().getName()); 071 // and add optional correlation id 072 String corrId = exchange.getProperty(Exchange.CORRELATION_ID, String.class); 073 if (corrId != null) { 074 MDC.put(MDC_CORRELATION_ID, corrId); 075 } 076 // and add optional breadcrumb id 077 String breadcrumbId = exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class); 078 if (breadcrumbId != null) { 079 MDC.put(MDC_BREADCRUMB_ID, breadcrumbId); 080 } 081 } 082 083 @Override 084 public UnitOfWork newInstance(Exchange exchange) { 085 return new MDCUnitOfWork(exchange); 086 } 087 088 @Override 089 public void stop() throws Exception { 090 super.stop(); 091 // and remove when stopping 092 clear(); 093 } 094 095 @Override 096 public void pushRouteContext(RouteContext routeContext) { 097 MDC.put(MDC_ROUTE_ID, routeContext.getRoute().getId()); 098 super.pushRouteContext(routeContext); 099 } 100 101 @Override 102 public RouteContext popRouteContext() { 103 MDC.remove(MDC_ROUTE_ID); 104 return super.popRouteContext(); 105 } 106 107 @Override 108 public void beginTransactedBy(Object key) { 109 MDC.put(MDC_TRANSACTION_KEY, key.toString()); 110 super.beginTransactedBy(key); 111 } 112 113 @Override 114 public void endTransactedBy(Object key) { 115 MDC.remove(MDC_TRANSACTION_KEY); 116 super.endTransactedBy(key); 117 } 118 119 @Override 120 public AsyncCallback beforeProcess(Processor processor, Exchange exchange, AsyncCallback callback) { 121 return new MDCCallback(callback); 122 } 123 124 @Override 125 public void afterProcess(Processor processor, Exchange exchange, AsyncCallback callback, boolean doneSync) { 126 if (!doneSync) { 127 // must clear MDC on current thread as the exchange is being processed asynchronously 128 // by another thread 129 clear(); 130 } 131 super.afterProcess(processor, exchange, callback, doneSync); 132 } 133 134 /** 135 * Clears information put on the MDC by this {@link MDCUnitOfWork} 136 */ 137 public void clear() { 138 if (this.originalBreadcrumbId != null) { 139 MDC.put(MDC_BREADCRUMB_ID, originalBreadcrumbId); 140 } else { 141 MDC.remove(MDC_BREADCRUMB_ID); 142 } 143 if (this.originalExchangeId != null) { 144 MDC.put(MDC_EXCHANGE_ID, originalExchangeId); 145 } else { 146 MDC.remove(MDC_EXCHANGE_ID); 147 } 148 if (this.originalMessageId != null) { 149 MDC.put(MDC_MESSAGE_ID, originalMessageId); 150 } else { 151 MDC.remove(MDC_MESSAGE_ID); 152 } 153 if (this.originalCorrelationId != null) { 154 MDC.put(MDC_CORRELATION_ID, originalCorrelationId); 155 } else { 156 MDC.remove(MDC_CORRELATION_ID); 157 } 158 if (this.originalRouteId != null) { 159 MDC.put(MDC_ROUTE_ID, originalRouteId); 160 } else { 161 MDC.remove(MDC_ROUTE_ID); 162 } 163 if (this.originalCamelContextId != null) { 164 MDC.put(MDC_CAMEL_CONTEXT_ID, originalCamelContextId); 165 } else { 166 MDC.remove(MDC_CAMEL_CONTEXT_ID); 167 } 168 if (this.originalTransactionKey != null) { 169 MDC.put(MDC_TRANSACTION_KEY, originalTransactionKey); 170 } else { 171 MDC.remove(MDC_TRANSACTION_KEY); 172 } 173 } 174 175 @Override 176 public String toString() { 177 return "MDCUnitOfWork"; 178 } 179 180 /** 181 * {@link AsyncCallback} which preserves {@link org.slf4j.MDC} when 182 * the asynchronous routing engine is being used. 183 */ 184 private static final class MDCCallback implements AsyncCallback { 185 186 private final AsyncCallback delegate; 187 private final String breadcrumbId; 188 private final String exchangeId; 189 private final String messageId; 190 private final String correlationId; 191 private final String routeId; 192 private final String camelContextId; 193 194 private MDCCallback(AsyncCallback delegate) { 195 this.delegate = delegate; 196 this.exchangeId = MDC.get(MDC_EXCHANGE_ID); 197 this.messageId = MDC.get(MDC_MESSAGE_ID); 198 this.breadcrumbId = MDC.get(MDC_BREADCRUMB_ID); 199 this.correlationId = MDC.get(MDC_CORRELATION_ID); 200 this.camelContextId = MDC.get(MDC_CAMEL_CONTEXT_ID); 201 202 String routeId = MDC.get(MDC_ROUTE_ID); 203 if (routeId != null) { 204 // intern route id as this reduces memory allocations 205 this.routeId = routeId.intern(); 206 } else { 207 this.routeId = null; 208 } 209 } 210 211 public void done(boolean doneSync) { 212 try { 213 if (!doneSync) { 214 // when done asynchronously then restore information from previous thread 215 if (breadcrumbId != null) { 216 MDC.put(MDC_BREADCRUMB_ID, breadcrumbId); 217 } 218 if (exchangeId != null) { 219 MDC.put(MDC_EXCHANGE_ID, exchangeId); 220 } 221 if (messageId != null) { 222 MDC.put(MDC_MESSAGE_ID, messageId); 223 } 224 if (correlationId != null) { 225 MDC.put(MDC_CORRELATION_ID, correlationId); 226 } 227 if (routeId != null) { 228 MDC.put(MDC_ROUTE_ID, routeId); 229 } 230 if (camelContextId != null) { 231 MDC.put(MDC_CAMEL_CONTEXT_ID, camelContextId); 232 } 233 } 234 } finally { 235 // muse ensure delegate is invoked 236 delegate.done(doneSync); 237 } 238 } 239 240 @Override 241 public String toString() { 242 return delegate.toString(); 243 } 244 } 245 246 }