001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.impl; 018 019import java.util.Collection; 020import java.util.Collections; 021import java.util.List; 022import java.util.Map; 023import java.util.concurrent.ConcurrentHashMap; 024import java.util.concurrent.CountDownLatch; 025import java.util.concurrent.RejectedExecutionException; 026import java.util.concurrent.atomic.AtomicLong; 027 028import org.apache.camel.Exchange; 029import org.apache.camel.MessageHistory; 030import org.apache.camel.processor.DefaultExchangeFormatter; 031import org.apache.camel.spi.AsyncProcessorAwaitManager; 032import org.apache.camel.spi.ExchangeFormatter; 033import org.apache.camel.support.ServiceSupport; 034import org.apache.camel.util.MessageHelper; 035import org.apache.camel.util.ObjectHelper; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements AsyncProcessorAwaitManager { 040 041 private static final Logger LOG = LoggerFactory.getLogger(DefaultAsyncProcessorAwaitManager.class); 042 043 private final AsyncProcessorAwaitManager.Statistics statistics = new UtilizationStatistics(); 044 private final AtomicLong blockedCounter = new AtomicLong(); 045 private final AtomicLong interruptedCounter = new AtomicLong(); 046 private final AtomicLong totalDuration = new AtomicLong(); 047 private final AtomicLong minDuration = new AtomicLong(); 048 private final AtomicLong maxDuration = new AtomicLong(); 049 private final AtomicLong meanDuration = new AtomicLong(); 050 051 private final Map<Exchange, AwaitThread> inflight = new ConcurrentHashMap<Exchange, AwaitThread>(); 052 private final ExchangeFormatter exchangeFormatter; 053 private boolean interruptThreadsWhileStopping = true; 054 055 public DefaultAsyncProcessorAwaitManager() { 056 // setup exchange formatter to be used for message history dump 057 DefaultExchangeFormatter formatter = new DefaultExchangeFormatter(); 058 formatter.setShowExchangeId(true); 059 formatter.setMultiline(true); 060 formatter.setShowHeaders(true); 061 formatter.setStyle(DefaultExchangeFormatter.OutputStyle.Fixed); 062 this.exchangeFormatter = formatter; 063 } 064 065 @Override 066 public void await(Exchange exchange, CountDownLatch latch) { 067 LOG.trace("Waiting for asynchronous callback before continuing for exchangeId: {} -> {}", 068 exchange.getExchangeId(), exchange); 069 try { 070 if (statistics.isStatisticsEnabled()) { 071 blockedCounter.incrementAndGet(); 072 } 073 inflight.put(exchange, new AwaitThreadEntry(Thread.currentThread(), exchange, latch)); 074 latch.await(); 075 LOG.trace("Asynchronous callback received, will continue routing exchangeId: {} -> {}", 076 exchange.getExchangeId(), exchange); 077 078 } catch (InterruptedException e) { 079 LOG.trace("Interrupted while waiting for callback, will continue routing exchangeId: {} -> {}", 080 exchange.getExchangeId(), exchange); 081 exchange.setException(e); 082 } finally { 083 AwaitThread thread = inflight.remove(exchange); 084 085 if (statistics.isStatisticsEnabled() && thread != null) { 086 long time = thread.getWaitDuration(); 087 long total = totalDuration.get() + time; 088 totalDuration.set(total); 089 090 if (time < minDuration.get()) { 091 minDuration.set(time); 092 } else if (time > maxDuration.get()) { 093 maxDuration.set(time); 094 } 095 096 // update mean 097 long count = blockedCounter.get(); 098 long mean = count > 0 ? total / count : 0; 099 meanDuration.set(mean); 100 } 101 } 102 } 103 104 @Override 105 public void countDown(Exchange exchange, CountDownLatch latch) { 106 LOG.trace("Asynchronous callback received for exchangeId: {}", exchange.getExchangeId()); 107 latch.countDown(); 108 } 109 110 @Override 111 public int size() { 112 return inflight.size(); 113 } 114 115 @Override 116 public Collection<AwaitThread> browse() { 117 return Collections.unmodifiableCollection(inflight.values()); 118 } 119 120 @Override 121 public void interrupt(String exchangeId) { 122 // need to find the exchange with the given exchange id 123 Exchange found = null; 124 for (AsyncProcessorAwaitManager.AwaitThread entry : browse()) { 125 Exchange exchange = entry.getExchange(); 126 if (exchangeId.equals(exchange.getExchangeId())) { 127 found = exchange; 128 break; 129 } 130 } 131 132 if (found != null) { 133 interrupt(found); 134 } 135 } 136 137 @Override 138 public void interrupt(Exchange exchange) { 139 AwaitThreadEntry entry = (AwaitThreadEntry) inflight.get(exchange); 140 if (entry != null) { 141 try { 142 StringBuilder sb = new StringBuilder(); 143 sb.append("Interrupted while waiting for asynchronous callback, will release the following blocked thread which was waiting for exchange to finish processing with exchangeId: "); 144 sb.append(exchange.getExchangeId()); 145 sb.append("\n"); 146 147 sb.append(dumpBlockedThread(entry)); 148 149 // dump a route stack trace of the exchange 150 String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, exchangeFormatter, false); 151 if (routeStackTrace != null) { 152 sb.append(routeStackTrace); 153 } 154 LOG.warn(sb.toString()); 155 156 } catch (Exception e) { 157 throw ObjectHelper.wrapRuntimeCamelException(e); 158 } finally { 159 if (statistics.isStatisticsEnabled()) { 160 interruptedCounter.incrementAndGet(); 161 } 162 exchange.setException(new RejectedExecutionException("Interrupted while waiting for asynchronous callback for exchangeId: " + exchange.getExchangeId())); 163 entry.getLatch().countDown(); 164 } 165 } 166 } 167 168 public boolean isInterruptThreadsWhileStopping() { 169 return interruptThreadsWhileStopping; 170 } 171 172 public void setInterruptThreadsWhileStopping(boolean interruptThreadsWhileStopping) { 173 this.interruptThreadsWhileStopping = interruptThreadsWhileStopping; 174 } 175 176 public Statistics getStatistics() { 177 return statistics; 178 } 179 180 @Override 181 protected void doStart() throws Exception { 182 // noop 183 } 184 185 @Override 186 protected void doStop() throws Exception { 187 Collection<AwaitThread> threads = browse(); 188 int count = threads.size(); 189 if (count > 0) { 190 LOG.warn("Shutting down while there are still " + count + " inflight threads currently blocked."); 191 192 StringBuilder sb = new StringBuilder(); 193 for (AwaitThread entry : threads) { 194 sb.append(dumpBlockedThread(entry)); 195 } 196 197 if (isInterruptThreadsWhileStopping()) { 198 LOG.warn("The following threads are blocked and will be interrupted so the threads are released:\n" + sb.toString()); 199 for (AwaitThread entry : threads) { 200 try { 201 interrupt(entry.getExchange()); 202 } catch (Throwable e) { 203 LOG.warn("Error while interrupting thread: " + entry.getBlockedThread().getName() + ". This exception is ignored.", e); 204 } 205 } 206 } else { 207 LOG.warn("The following threads are blocked, and may reside in the JVM:\n" + sb.toString()); 208 } 209 } else { 210 LOG.debug("Shutting down with no inflight threads."); 211 } 212 213 inflight.clear(); 214 } 215 216 private static String dumpBlockedThread(AwaitThread entry) { 217 StringBuilder sb = new StringBuilder(); 218 sb.append("\n"); 219 sb.append("Blocked Thread\n"); 220 sb.append("---------------------------------------------------------------------------------------------------------------------------------------\n"); 221 222 sb.append(style("Id:")).append(entry.getBlockedThread().getId()).append("\n"); 223 sb.append(style("Name:")).append(entry.getBlockedThread().getName()).append("\n"); 224 sb.append(style("RouteId:")).append(safeNull(entry.getRouteId())).append("\n"); 225 sb.append(style("NodeId:")).append(safeNull(entry.getNodeId())).append("\n"); 226 sb.append(style("Duration:")).append(entry.getWaitDuration()).append(" msec.\n"); 227 return sb.toString(); 228 } 229 230 private static String style(String label) { 231 return String.format("\t%-20s", label); 232 } 233 234 private static String safeNull(Object value) { 235 return value != null ? value.toString() : ""; 236 } 237 238 private static final class AwaitThreadEntry implements AwaitThread { 239 private final Thread thread; 240 private final Exchange exchange; 241 private final CountDownLatch latch; 242 private final long start; 243 private String routeId; 244 private String nodeId; 245 246 private AwaitThreadEntry(Thread thread, Exchange exchange, CountDownLatch latch) { 247 this.thread = thread; 248 this.exchange = exchange; 249 this.latch = latch; 250 this.start = System.currentTimeMillis(); 251 252 // capture details from message history if enabled 253 List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class); 254 if (list != null && !list.isEmpty()) { 255 // grab last part 256 MessageHistory history = list.get(list.size() - 1); 257 routeId = history.getRouteId(); 258 nodeId = history.getNode() != null ? history.getNode().getId() : null; 259 } 260 } 261 262 @Override 263 public Thread getBlockedThread() { 264 return thread; 265 } 266 267 @Override 268 public Exchange getExchange() { 269 return exchange; 270 } 271 272 @Override 273 public long getWaitDuration() { 274 return System.currentTimeMillis() - start; 275 } 276 277 @Override 278 public String getRouteId() { 279 return routeId; 280 } 281 282 @Override 283 public String getNodeId() { 284 return nodeId; 285 } 286 287 public CountDownLatch getLatch() { 288 return latch; 289 } 290 291 @Override 292 public String toString() { 293 return "AwaitThreadEntry[name=" + thread.getName() + ", exchangeId=" + exchange.getExchangeId() + "]"; 294 } 295 } 296 297 /** 298 * Represents utilization statistics 299 */ 300 private final class UtilizationStatistics implements AsyncProcessorAwaitManager.Statistics { 301 302 private boolean statisticsEnabled; 303 304 @Override 305 public long getThreadsBlocked() { 306 return blockedCounter.get(); 307 } 308 309 @Override 310 public long getThreadsInterrupted() { 311 return interruptedCounter.get(); 312 } 313 314 @Override 315 public long getTotalDuration() { 316 return totalDuration.get(); 317 } 318 319 @Override 320 public long getMinDuration() { 321 return minDuration.get(); 322 } 323 324 @Override 325 public long getMaxDuration() { 326 return maxDuration.get(); 327 } 328 329 @Override 330 public long getMeanDuration() { 331 return meanDuration.get(); 332 } 333 334 @Override 335 public void reset() { 336 blockedCounter.set(0); 337 interruptedCounter.set(0); 338 totalDuration.set(0); 339 minDuration.set(0); 340 maxDuration.set(0); 341 meanDuration.set(0); 342 } 343 344 @Override 345 public boolean isStatisticsEnabled() { 346 return statisticsEnabled; 347 } 348 349 @Override 350 public void setStatisticsEnabled(boolean statisticsEnabled) { 351 this.statisticsEnabled = statisticsEnabled; 352 } 353 354 @Override 355 public String toString() { 356 return String.format("AsyncProcessAwaitManager utilization[blocked=%s, interrupted=%s, total=%s min=%s, max=%s, mean=%s]", 357 getThreadsBlocked(), getThreadsInterrupted(), getTotalDuration(), getMinDuration(), getMaxDuration(), getMeanDuration()); 358 } 359 } 360 361}