001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import java.util.Collection;
020import java.util.Collections;
021import java.util.LinkedList;
022import java.util.Map;
023import java.util.concurrent.ConcurrentHashMap;
024import java.util.concurrent.CountDownLatch;
025import java.util.concurrent.RejectedExecutionException;
026import java.util.concurrent.atomic.AtomicLong;
027
028import org.apache.camel.Exchange;
029import org.apache.camel.MessageHistory;
030import org.apache.camel.NamedNode;
031import org.apache.camel.processor.DefaultExchangeFormatter;
032import org.apache.camel.spi.AsyncProcessorAwaitManager;
033import org.apache.camel.spi.ExchangeFormatter;
034import org.apache.camel.support.ServiceSupport;
035import org.apache.camel.util.MessageHelper;
036import org.apache.camel.util.ObjectHelper;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements AsyncProcessorAwaitManager {
041
042    private static final Logger LOG = LoggerFactory.getLogger(DefaultAsyncProcessorAwaitManager.class);
043
044    private final AsyncProcessorAwaitManager.Statistics statistics = new UtilizationStatistics();
045    private final AtomicLong blockedCounter = new AtomicLong();
046    private final AtomicLong interruptedCounter = new AtomicLong();
047    private final AtomicLong totalDuration = new AtomicLong();
048    private final AtomicLong minDuration = new AtomicLong();
049    private final AtomicLong maxDuration = new AtomicLong();
050    private final AtomicLong meanDuration = new AtomicLong();
051
052    private final Map<Exchange, AwaitThread> inflight = new ConcurrentHashMap<Exchange, AwaitThread>();
053    private final ExchangeFormatter exchangeFormatter;
054    private boolean interruptThreadsWhileStopping = true;
055
056    public DefaultAsyncProcessorAwaitManager() {
057        // setup exchange formatter to be used for message history dump
058        DefaultExchangeFormatter formatter = new DefaultExchangeFormatter();
059        formatter.setShowExchangeId(true);
060        formatter.setMultiline(true);
061        formatter.setShowHeaders(true);
062        formatter.setStyle(DefaultExchangeFormatter.OutputStyle.Fixed);
063        this.exchangeFormatter = formatter;
064    }
065
066    @Override
067    public void await(Exchange exchange, CountDownLatch latch) {
068        LOG.trace("Waiting for asynchronous callback before continuing for exchangeId: {} -> {}",
069                exchange.getExchangeId(), exchange);
070        try {
071            if (statistics.isStatisticsEnabled()) {
072                blockedCounter.incrementAndGet();
073            }
074            inflight.put(exchange, new AwaitThreadEntry(Thread.currentThread(), exchange, latch));
075            latch.await();
076            LOG.trace("Asynchronous callback received, will continue routing exchangeId: {} -> {}",
077                    exchange.getExchangeId(), exchange);
078
079        } catch (InterruptedException e) {
080            LOG.trace("Interrupted while waiting for callback, will continue routing exchangeId: {} -> {}",
081                    exchange.getExchangeId(), exchange);
082            exchange.setException(e);
083        } finally {
084            AwaitThread thread = inflight.remove(exchange);
085
086            if (statistics.isStatisticsEnabled() && thread != null) {
087                long time = thread.getWaitDuration();
088                long total = totalDuration.get() + time;
089                totalDuration.set(total);
090
091                if (time < minDuration.get()) {
092                    minDuration.set(time);
093                } else if (time > maxDuration.get()) {
094                    maxDuration.set(time);
095                }
096
097                // update mean
098                long count = blockedCounter.get();
099                long mean = count > 0 ? total / count : 0;
100                meanDuration.set(mean);
101            }
102        }
103    }
104
105    @Override
106    public void countDown(Exchange exchange, CountDownLatch latch) {
107        LOG.trace("Asynchronous callback received for exchangeId: {}", exchange.getExchangeId());
108        latch.countDown();
109    }
110
111    @Override
112    public int size() {
113        return inflight.size();
114    }
115
116    @Override
117    public Collection<AwaitThread> browse() {
118        return Collections.unmodifiableCollection(inflight.values());
119    }
120
121    @Override
122    public void interrupt(String exchangeId) {
123        // need to find the exchange with the given exchange id
124        Exchange found = null;
125        for (AsyncProcessorAwaitManager.AwaitThread entry : browse()) {
126            Exchange exchange = entry.getExchange();
127            if (exchangeId.equals(exchange.getExchangeId())) {
128                found = exchange;
129                break;
130            }
131        }
132
133        if (found != null) {
134            interrupt(found);
135        }
136    }
137
138    @Override
139    public void interrupt(Exchange exchange) {
140        AwaitThreadEntry entry = (AwaitThreadEntry) inflight.get(exchange);
141        if (entry != null) {
142            try {
143                StringBuilder sb = new StringBuilder();
144                sb.append("Interrupted while waiting for asynchronous callback, will release the following blocked thread which was waiting for exchange to finish processing with exchangeId: ");
145                sb.append(exchange.getExchangeId());
146                sb.append("\n");
147
148                sb.append(dumpBlockedThread(entry));
149
150                // dump a route stack trace of the exchange
151                String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, exchangeFormatter, false);
152                if (routeStackTrace != null) {
153                    sb.append(routeStackTrace);
154                }
155                LOG.warn(sb.toString());
156
157            } catch (Exception e) {
158                throw ObjectHelper.wrapRuntimeCamelException(e);
159            } finally {
160                if (statistics.isStatisticsEnabled()) {
161                    interruptedCounter.incrementAndGet();
162                }
163                exchange.setException(new RejectedExecutionException("Interrupted while waiting for asynchronous callback for exchangeId: " + exchange.getExchangeId()));
164                entry.getLatch().countDown();
165            }
166        }
167    }
168
169    public boolean isInterruptThreadsWhileStopping() {
170        return interruptThreadsWhileStopping;
171    }
172
173    public void setInterruptThreadsWhileStopping(boolean interruptThreadsWhileStopping) {
174        this.interruptThreadsWhileStopping = interruptThreadsWhileStopping;
175    }
176
177    public Statistics getStatistics() {
178        return statistics;
179    }
180
181    @Override
182    protected void doStart() throws Exception {
183        // noop
184    }
185
186    @Override
187    protected void doStop() throws Exception {
188        Collection<AwaitThread> threads = browse();
189        int count = threads.size();
190        if (count > 0) {
191            LOG.warn("Shutting down while there are still {} inflight threads currently blocked.", count);
192
193            StringBuilder sb = new StringBuilder();
194            for (AwaitThread entry : threads) {
195                sb.append(dumpBlockedThread(entry));
196            }
197
198            if (isInterruptThreadsWhileStopping()) {
199                LOG.warn("The following threads are blocked and will be interrupted so the threads are released:\n" + sb.toString());
200                for (AwaitThread entry : threads) {
201                    try {
202                        interrupt(entry.getExchange());
203                    } catch (Throwable e) {
204                        LOG.warn("Error while interrupting thread: " + entry.getBlockedThread().getName() + ". This exception is ignored.", e);
205                    }
206                }
207            } else {
208                LOG.warn("The following threads are blocked, and may reside in the JVM:\n" + sb.toString());
209            }
210        } else {
211            LOG.debug("Shutting down with no inflight threads.");
212        }
213
214        inflight.clear();
215    }
216
217    private static String dumpBlockedThread(AwaitThread entry) {
218        StringBuilder sb = new StringBuilder();
219        sb.append("\n");
220        sb.append("Blocked Thread\n");
221        sb.append("---------------------------------------------------------------------------------------------------------------------------------------\n");
222
223        sb.append(style("Id:")).append(entry.getBlockedThread().getId()).append("\n");
224        sb.append(style("Name:")).append(entry.getBlockedThread().getName()).append("\n");
225        sb.append(style("RouteId:")).append(safeNull(entry.getRouteId())).append("\n");
226        sb.append(style("NodeId:")).append(safeNull(entry.getNodeId())).append("\n");
227        sb.append(style("Duration:")).append(entry.getWaitDuration()).append(" msec.\n");
228        return sb.toString();
229    }
230
231    private static String style(String label) {
232        return String.format("\t%-20s", label);
233    }
234
235    private static String safeNull(Object value) {
236        return value != null ? value.toString() : "";
237    }
238
239    private static final class AwaitThreadEntry implements AwaitThread {
240        private final Thread thread;
241        private final Exchange exchange;
242        private final CountDownLatch latch;
243        private final long start;
244
245        private AwaitThreadEntry(Thread thread, Exchange exchange, CountDownLatch latch) {
246            this.thread = thread;
247            this.exchange = exchange;
248            this.latch = latch;
249            this.start = System.currentTimeMillis();
250        }
251
252        @Override
253        public Thread getBlockedThread() {
254            return thread;
255        }
256
257        @Override
258        public Exchange getExchange() {
259            return exchange;
260        }
261
262        @Override
263        public long getWaitDuration() {
264            return System.currentTimeMillis() - start;
265        }
266
267        @Override
268        public String getRouteId() {
269            MessageHistory lastMessageHistory = getLastMessageHistory();
270            if (lastMessageHistory == null) {
271                return null;
272            }
273            return lastMessageHistory.getRouteId();
274        }
275
276        @Override
277        public String getNodeId() {
278            NamedNode node = getNode();
279            if (node == null) {
280                return null;
281            }
282            return node.getId();
283        }
284
285        public CountDownLatch getLatch() {
286            return latch;
287        }
288
289        private NamedNode getNode() {
290            MessageHistory lastMessageHistory = getLastMessageHistory();
291            if (lastMessageHistory == null) {
292                return null;
293            }
294            return lastMessageHistory.getNode();
295        }
296
297        private MessageHistory getLastMessageHistory() {
298            LinkedList<MessageHistory> list = getMessageHistories();
299            if (list == null || list.isEmpty()) {
300                return null;
301            }
302            return list.getLast();
303        }
304
305        private LinkedList<MessageHistory> getMessageHistories() {
306            return exchange.getProperty(Exchange.MESSAGE_HISTORY, LinkedList.class);
307        }
308
309        @Override
310        public String toString() {
311            return "AwaitThreadEntry[name=" + thread.getName() + ", exchangeId=" + exchange.getExchangeId() + "]";
312        }
313    }
314
315    /**
316     * Represents utilization statistics
317     */
318    private final class UtilizationStatistics implements AsyncProcessorAwaitManager.Statistics {
319
320        private boolean statisticsEnabled;
321
322        @Override
323        public long getThreadsBlocked() {
324            return blockedCounter.get();
325        }
326
327        @Override
328        public long getThreadsInterrupted() {
329            return interruptedCounter.get();
330        }
331
332        @Override
333        public long getTotalDuration() {
334            return totalDuration.get();
335        }
336
337        @Override
338        public long getMinDuration() {
339            return minDuration.get();
340        }
341
342        @Override
343        public long getMaxDuration() {
344            return maxDuration.get();
345        }
346
347        @Override
348        public long getMeanDuration() {
349            return meanDuration.get();
350        }
351
352        @Override
353        public void reset() {
354            blockedCounter.set(0);
355            interruptedCounter.set(0);
356            totalDuration.set(0);
357            minDuration.set(0);
358            maxDuration.set(0);
359            meanDuration.set(0);
360        }
361
362        @Override
363        public boolean isStatisticsEnabled() {
364            return statisticsEnabled;
365        }
366
367        @Override
368        public void setStatisticsEnabled(boolean statisticsEnabled) {
369            this.statisticsEnabled = statisticsEnabled;
370        }
371
372        @Override
373        public String toString() {
374            return String.format("AsyncProcessAwaitManager utilization[blocked=%s, interrupted=%s, total=%s min=%s, max=%s, mean=%s]",
375                    getThreadsBlocked(), getThreadsInterrupted(), getTotalDuration(), getMinDuration(), getMaxDuration(), getMeanDuration());
376        }
377    }
378
379}