001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.impl; 018 019import java.util.Collection; 020import java.util.Collections; 021import java.util.LinkedList; 022import java.util.Map; 023import java.util.concurrent.ConcurrentHashMap; 024import java.util.concurrent.CountDownLatch; 025import java.util.concurrent.RejectedExecutionException; 026import java.util.concurrent.atomic.AtomicLong; 027 028import org.apache.camel.Exchange; 029import org.apache.camel.MessageHistory; 030import org.apache.camel.NamedNode; 031import org.apache.camel.processor.DefaultExchangeFormatter; 032import org.apache.camel.spi.AsyncProcessorAwaitManager; 033import org.apache.camel.spi.ExchangeFormatter; 034import org.apache.camel.support.ServiceSupport; 035import org.apache.camel.util.MessageHelper; 036import org.apache.camel.util.ObjectHelper; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements AsyncProcessorAwaitManager { 041 042 private static final Logger LOG = LoggerFactory.getLogger(DefaultAsyncProcessorAwaitManager.class); 043 044 private final AsyncProcessorAwaitManager.Statistics statistics = new UtilizationStatistics(); 045 private final AtomicLong blockedCounter = new AtomicLong(); 046 private final AtomicLong interruptedCounter = new AtomicLong(); 047 private final AtomicLong totalDuration = new AtomicLong(); 048 private final AtomicLong minDuration = new AtomicLong(); 049 private final AtomicLong maxDuration = new AtomicLong(); 050 private final AtomicLong meanDuration = new AtomicLong(); 051 052 private final Map<Exchange, AwaitThread> inflight = new ConcurrentHashMap<Exchange, AwaitThread>(); 053 private final ExchangeFormatter exchangeFormatter; 054 private boolean interruptThreadsWhileStopping = true; 055 056 public DefaultAsyncProcessorAwaitManager() { 057 // setup exchange formatter to be used for message history dump 058 DefaultExchangeFormatter formatter = new DefaultExchangeFormatter(); 059 formatter.setShowExchangeId(true); 060 formatter.setMultiline(true); 061 formatter.setShowHeaders(true); 062 formatter.setStyle(DefaultExchangeFormatter.OutputStyle.Fixed); 063 this.exchangeFormatter = formatter; 064 } 065 066 @Override 067 public void await(Exchange exchange, CountDownLatch latch) { 068 LOG.trace("Waiting for asynchronous callback before continuing for exchangeId: {} -> {}", 069 exchange.getExchangeId(), exchange); 070 try { 071 if (statistics.isStatisticsEnabled()) { 072 blockedCounter.incrementAndGet(); 073 } 074 inflight.put(exchange, new AwaitThreadEntry(Thread.currentThread(), exchange, latch)); 075 latch.await(); 076 LOG.trace("Asynchronous callback received, will continue routing exchangeId: {} -> {}", 077 exchange.getExchangeId(), exchange); 078 079 } catch (InterruptedException e) { 080 LOG.trace("Interrupted while waiting for callback, will continue routing exchangeId: {} -> {}", 081 exchange.getExchangeId(), exchange); 082 exchange.setException(e); 083 } finally { 084 AwaitThread thread = inflight.remove(exchange); 085 086 if (statistics.isStatisticsEnabled() && thread != null) { 087 long time = thread.getWaitDuration(); 088 long total = totalDuration.get() + time; 089 totalDuration.set(total); 090 091 if (time < minDuration.get()) { 092 minDuration.set(time); 093 } else if (time > maxDuration.get()) { 094 maxDuration.set(time); 095 } 096 097 // update mean 098 long count = blockedCounter.get(); 099 long mean = count > 0 ? total / count : 0; 100 meanDuration.set(mean); 101 } 102 } 103 } 104 105 @Override 106 public void countDown(Exchange exchange, CountDownLatch latch) { 107 LOG.trace("Asynchronous callback received for exchangeId: {}", exchange.getExchangeId()); 108 latch.countDown(); 109 } 110 111 @Override 112 public int size() { 113 return inflight.size(); 114 } 115 116 @Override 117 public Collection<AwaitThread> browse() { 118 return Collections.unmodifiableCollection(inflight.values()); 119 } 120 121 @Override 122 public void interrupt(String exchangeId) { 123 // need to find the exchange with the given exchange id 124 Exchange found = null; 125 for (AsyncProcessorAwaitManager.AwaitThread entry : browse()) { 126 Exchange exchange = entry.getExchange(); 127 if (exchangeId.equals(exchange.getExchangeId())) { 128 found = exchange; 129 break; 130 } 131 } 132 133 if (found != null) { 134 interrupt(found); 135 } 136 } 137 138 @Override 139 public void interrupt(Exchange exchange) { 140 AwaitThreadEntry entry = (AwaitThreadEntry) inflight.get(exchange); 141 if (entry != null) { 142 try { 143 StringBuilder sb = new StringBuilder(); 144 sb.append("Interrupted while waiting for asynchronous callback, will release the following blocked thread which was waiting for exchange to finish processing with exchangeId: "); 145 sb.append(exchange.getExchangeId()); 146 sb.append("\n"); 147 148 sb.append(dumpBlockedThread(entry)); 149 150 // dump a route stack trace of the exchange 151 String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, exchangeFormatter, false); 152 if (routeStackTrace != null) { 153 sb.append(routeStackTrace); 154 } 155 LOG.warn(sb.toString()); 156 157 } catch (Exception e) { 158 throw ObjectHelper.wrapRuntimeCamelException(e); 159 } finally { 160 if (statistics.isStatisticsEnabled()) { 161 interruptedCounter.incrementAndGet(); 162 } 163 exchange.setException(new RejectedExecutionException("Interrupted while waiting for asynchronous callback for exchangeId: " + exchange.getExchangeId())); 164 entry.getLatch().countDown(); 165 } 166 } 167 } 168 169 public boolean isInterruptThreadsWhileStopping() { 170 return interruptThreadsWhileStopping; 171 } 172 173 public void setInterruptThreadsWhileStopping(boolean interruptThreadsWhileStopping) { 174 this.interruptThreadsWhileStopping = interruptThreadsWhileStopping; 175 } 176 177 public Statistics getStatistics() { 178 return statistics; 179 } 180 181 @Override 182 protected void doStart() throws Exception { 183 // noop 184 } 185 186 @Override 187 protected void doStop() throws Exception { 188 Collection<AwaitThread> threads = browse(); 189 int count = threads.size(); 190 if (count > 0) { 191 LOG.warn("Shutting down while there are still " + count + " inflight threads currently blocked."); 192 193 StringBuilder sb = new StringBuilder(); 194 for (AwaitThread entry : threads) { 195 sb.append(dumpBlockedThread(entry)); 196 } 197 198 if (isInterruptThreadsWhileStopping()) { 199 LOG.warn("The following threads are blocked and will be interrupted so the threads are released:\n" + sb.toString()); 200 for (AwaitThread entry : threads) { 201 try { 202 interrupt(entry.getExchange()); 203 } catch (Throwable e) { 204 LOG.warn("Error while interrupting thread: " + entry.getBlockedThread().getName() + ". This exception is ignored.", e); 205 } 206 } 207 } else { 208 LOG.warn("The following threads are blocked, and may reside in the JVM:\n" + sb.toString()); 209 } 210 } else { 211 LOG.debug("Shutting down with no inflight threads."); 212 } 213 214 inflight.clear(); 215 } 216 217 private static String dumpBlockedThread(AwaitThread entry) { 218 StringBuilder sb = new StringBuilder(); 219 sb.append("\n"); 220 sb.append("Blocked Thread\n"); 221 sb.append("---------------------------------------------------------------------------------------------------------------------------------------\n"); 222 223 sb.append(style("Id:")).append(entry.getBlockedThread().getId()).append("\n"); 224 sb.append(style("Name:")).append(entry.getBlockedThread().getName()).append("\n"); 225 sb.append(style("RouteId:")).append(safeNull(entry.getRouteId())).append("\n"); 226 sb.append(style("NodeId:")).append(safeNull(entry.getNodeId())).append("\n"); 227 sb.append(style("Duration:")).append(entry.getWaitDuration()).append(" msec.\n"); 228 return sb.toString(); 229 } 230 231 private static String style(String label) { 232 return String.format("\t%-20s", label); 233 } 234 235 private static String safeNull(Object value) { 236 return value != null ? value.toString() : ""; 237 } 238 239 private static final class AwaitThreadEntry implements AwaitThread { 240 private final Thread thread; 241 private final Exchange exchange; 242 private final CountDownLatch latch; 243 private final long start; 244 245 private AwaitThreadEntry(Thread thread, Exchange exchange, CountDownLatch latch) { 246 this.thread = thread; 247 this.exchange = exchange; 248 this.latch = latch; 249 this.start = System.currentTimeMillis(); 250 } 251 252 @Override 253 public Thread getBlockedThread() { 254 return thread; 255 } 256 257 @Override 258 public Exchange getExchange() { 259 return exchange; 260 } 261 262 @Override 263 public long getWaitDuration() { 264 return System.currentTimeMillis() - start; 265 } 266 267 @Override 268 public String getRouteId() { 269 MessageHistory lastMessageHistory = getLastMessageHistory(); 270 if (lastMessageHistory == null) { 271 return null; 272 } 273 return lastMessageHistory.getRouteId(); 274 } 275 276 @Override 277 public String getNodeId() { 278 NamedNode node = getNode(); 279 if (node == null) { 280 return null; 281 } 282 return node.getId(); 283 } 284 285 public CountDownLatch getLatch() { 286 return latch; 287 } 288 289 private NamedNode getNode() { 290 MessageHistory lastMessageHistory = getLastMessageHistory(); 291 if (lastMessageHistory == null) { 292 return null; 293 } 294 return lastMessageHistory.getNode(); 295 } 296 297 private MessageHistory getLastMessageHistory() { 298 LinkedList<MessageHistory> list = getMessageHistories(); 299 if (list == null || list.isEmpty()) { 300 return null; 301 } 302 return list.getLast(); 303 } 304 305 private LinkedList<MessageHistory> getMessageHistories() { 306 return exchange.getProperty(Exchange.MESSAGE_HISTORY, LinkedList.class); 307 } 308 309 @Override 310 public String toString() { 311 return "AwaitThreadEntry[name=" + thread.getName() + ", exchangeId=" + exchange.getExchangeId() + "]"; 312 } 313 } 314 315 /** 316 * Represents utilization statistics 317 */ 318 private final class UtilizationStatistics implements AsyncProcessorAwaitManager.Statistics { 319 320 private boolean statisticsEnabled; 321 322 @Override 323 public long getThreadsBlocked() { 324 return blockedCounter.get(); 325 } 326 327 @Override 328 public long getThreadsInterrupted() { 329 return interruptedCounter.get(); 330 } 331 332 @Override 333 public long getTotalDuration() { 334 return totalDuration.get(); 335 } 336 337 @Override 338 public long getMinDuration() { 339 return minDuration.get(); 340 } 341 342 @Override 343 public long getMaxDuration() { 344 return maxDuration.get(); 345 } 346 347 @Override 348 public long getMeanDuration() { 349 return meanDuration.get(); 350 } 351 352 @Override 353 public void reset() { 354 blockedCounter.set(0); 355 interruptedCounter.set(0); 356 totalDuration.set(0); 357 minDuration.set(0); 358 maxDuration.set(0); 359 meanDuration.set(0); 360 } 361 362 @Override 363 public boolean isStatisticsEnabled() { 364 return statisticsEnabled; 365 } 366 367 @Override 368 public void setStatisticsEnabled(boolean statisticsEnabled) { 369 this.statisticsEnabled = statisticsEnabled; 370 } 371 372 @Override 373 public String toString() { 374 return String.format("AsyncProcessAwaitManager utilization[blocked=%s, interrupted=%s, total=%s min=%s, max=%s, mean=%s]", 375 getThreadsBlocked(), getThreadsInterrupted(), getTotalDuration(), getMinDuration(), getMaxDuration(), getMeanDuration()); 376 } 377 } 378 379}