001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import java.util.Collection;
020import java.util.Collections;
021import java.util.List;
022import java.util.Map;
023import java.util.concurrent.ConcurrentHashMap;
024import java.util.concurrent.CountDownLatch;
025import java.util.concurrent.RejectedExecutionException;
026import java.util.concurrent.atomic.AtomicLong;
027
028import org.apache.camel.Exchange;
029import org.apache.camel.MessageHistory;
030import org.apache.camel.processor.DefaultExchangeFormatter;
031import org.apache.camel.spi.AsyncProcessorAwaitManager;
032import org.apache.camel.spi.ExchangeFormatter;
033import org.apache.camel.support.ServiceSupport;
034import org.apache.camel.util.MessageHelper;
035import org.apache.camel.util.ObjectHelper;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements AsyncProcessorAwaitManager {
040
041    private static final Logger LOG = LoggerFactory.getLogger(DefaultAsyncProcessorAwaitManager.class);
042
043    private final AsyncProcessorAwaitManager.Statistics statistics = new UtilizationStatistics();
044    private final AtomicLong blockedCounter = new AtomicLong();
045    private final AtomicLong interruptedCounter = new AtomicLong();
046    private final AtomicLong totalDuration = new AtomicLong();
047    private final AtomicLong minDuration = new AtomicLong();
048    private final AtomicLong maxDuration = new AtomicLong();
049    private final AtomicLong meanDuration = new AtomicLong();
050
051    private final Map<Exchange, AwaitThread> inflight = new ConcurrentHashMap<Exchange, AwaitThread>();
052    private final ExchangeFormatter exchangeFormatter;
053    private boolean interruptThreadsWhileStopping = true;
054
055    public DefaultAsyncProcessorAwaitManager() {
056        // setup exchange formatter to be used for message history dump
057        DefaultExchangeFormatter formatter = new DefaultExchangeFormatter();
058        formatter.setShowExchangeId(true);
059        formatter.setMultiline(true);
060        formatter.setShowHeaders(true);
061        formatter.setStyle(DefaultExchangeFormatter.OutputStyle.Fixed);
062        this.exchangeFormatter = formatter;
063    }
064
065    @Override
066    public void await(Exchange exchange, CountDownLatch latch) {
067        LOG.trace("Waiting for asynchronous callback before continuing for exchangeId: {} -> {}",
068                exchange.getExchangeId(), exchange);
069        try {
070            if (statistics.isStatisticsEnabled()) {
071                blockedCounter.incrementAndGet();
072            }
073            inflight.put(exchange, new AwaitThreadEntry(Thread.currentThread(), exchange, latch));
074            latch.await();
075            LOG.trace("Asynchronous callback received, will continue routing exchangeId: {} -> {}",
076                    exchange.getExchangeId(), exchange);
077
078        } catch (InterruptedException e) {
079            LOG.trace("Interrupted while waiting for callback, will continue routing exchangeId: {} -> {}",
080                    exchange.getExchangeId(), exchange);
081            exchange.setException(e);
082        } finally {
083            AwaitThread thread = inflight.remove(exchange);
084
085            if (statistics.isStatisticsEnabled() && thread != null) {
086                long time = thread.getWaitDuration();
087                long total = totalDuration.get() + time;
088                totalDuration.set(total);
089
090                if (time < minDuration.get()) {
091                    minDuration.set(time);
092                } else if (time > maxDuration.get()) {
093                    maxDuration.set(time);
094                }
095
096                // update mean
097                long count = blockedCounter.get();
098                long mean = count > 0 ? total / count : 0;
099                meanDuration.set(mean);
100            }
101        }
102    }
103
104    @Override
105    public void countDown(Exchange exchange, CountDownLatch latch) {
106        LOG.trace("Asynchronous callback received for exchangeId: {}", exchange.getExchangeId());
107        latch.countDown();
108    }
109
110    @Override
111    public int size() {
112        return inflight.size();
113    }
114
115    @Override
116    public Collection<AwaitThread> browse() {
117        return Collections.unmodifiableCollection(inflight.values());
118    }
119
120    @Override
121    public void interrupt(String exchangeId) {
122        // need to find the exchange with the given exchange id
123        Exchange found = null;
124        for (AsyncProcessorAwaitManager.AwaitThread entry : browse()) {
125            Exchange exchange = entry.getExchange();
126            if (exchangeId.equals(exchange.getExchangeId())) {
127                found = exchange;
128                break;
129            }
130        }
131
132        if (found != null) {
133            interrupt(found);
134        }
135    }
136
137    @Override
138    public void interrupt(Exchange exchange) {
139        AwaitThreadEntry entry = (AwaitThreadEntry) inflight.get(exchange);
140        if (entry != null) {
141            try {
142                StringBuilder sb = new StringBuilder();
143                sb.append("Interrupted while waiting for asynchronous callback, will release the following blocked thread which was waiting for exchange to finish processing with exchangeId: ");
144                sb.append(exchange.getExchangeId());
145                sb.append("\n");
146
147                sb.append(dumpBlockedThread(entry));
148
149                // dump a route stack trace of the exchange
150                String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, exchangeFormatter, false);
151                if (routeStackTrace != null) {
152                    sb.append(routeStackTrace);
153                }
154                LOG.warn(sb.toString());
155
156            } catch (Exception e) {
157                throw ObjectHelper.wrapRuntimeCamelException(e);
158            } finally {
159                if (statistics.isStatisticsEnabled()) {
160                    interruptedCounter.incrementAndGet();
161                }
162                exchange.setException(new RejectedExecutionException("Interrupted while waiting for asynchronous callback for exchangeId: " + exchange.getExchangeId()));
163                entry.getLatch().countDown();
164            }
165        }
166    }
167
168    public boolean isInterruptThreadsWhileStopping() {
169        return interruptThreadsWhileStopping;
170    }
171
172    public void setInterruptThreadsWhileStopping(boolean interruptThreadsWhileStopping) {
173        this.interruptThreadsWhileStopping = interruptThreadsWhileStopping;
174    }
175
176    public Statistics getStatistics() {
177        return statistics;
178    }
179
180    @Override
181    protected void doStart() throws Exception {
182        // noop
183    }
184
185    @Override
186    protected void doStop() throws Exception {
187        Collection<AwaitThread> threads = browse();
188        int count = threads.size();
189        if (count > 0) {
190            LOG.warn("Shutting down while there are still " + count + " inflight threads currently blocked.");
191
192            StringBuilder sb = new StringBuilder();
193            for (AwaitThread entry : threads) {
194                sb.append(dumpBlockedThread(entry));
195            }
196
197            if (isInterruptThreadsWhileStopping()) {
198                LOG.warn("The following threads are blocked and will be interrupted so the threads are released:\n" + sb.toString());
199                for (AwaitThread entry : threads) {
200                    try {
201                        interrupt(entry.getExchange());
202                    } catch (Throwable e) {
203                        LOG.warn("Error while interrupting thread: " + entry.getBlockedThread().getName() + ". This exception is ignored.", e);
204                    }
205                }
206            } else {
207                LOG.warn("The following threads are blocked, and may reside in the JVM:\n" + sb.toString());
208            }
209        } else {
210            LOG.debug("Shutting down with no inflight threads.");
211        }
212
213        inflight.clear();
214    }
215
216    private static String dumpBlockedThread(AwaitThread entry) {
217        StringBuilder sb = new StringBuilder();
218        sb.append("\n");
219        sb.append("Blocked Thread\n");
220        sb.append("---------------------------------------------------------------------------------------------------------------------------------------\n");
221
222        sb.append(style("Id:")).append(entry.getBlockedThread().getId()).append("\n");
223        sb.append(style("Name:")).append(entry.getBlockedThread().getName()).append("\n");
224        sb.append(style("RouteId:")).append(safeNull(entry.getRouteId())).append("\n");
225        sb.append(style("NodeId:")).append(safeNull(entry.getNodeId())).append("\n");
226        sb.append(style("Duration:")).append(entry.getWaitDuration()).append(" msec.\n");
227        return sb.toString();
228    }
229
230    private static String style(String label) {
231        return String.format("\t%-20s", label);
232    }
233
234    private static String safeNull(Object value) {
235        return value != null ? value.toString() : "";
236    }
237
238    private static final class AwaitThreadEntry implements AwaitThread {
239        private final Thread thread;
240        private final Exchange exchange;
241        private final CountDownLatch latch;
242        private final long start;
243        private String routeId;
244        private String nodeId;
245
246        private AwaitThreadEntry(Thread thread, Exchange exchange, CountDownLatch latch) {
247            this.thread = thread;
248            this.exchange = exchange;
249            this.latch = latch;
250            this.start = System.currentTimeMillis();
251
252            // capture details from message history if enabled
253            List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class);
254            if (list != null && !list.isEmpty()) {
255                // grab last part
256                MessageHistory history = list.get(list.size() - 1);
257                routeId = history.getRouteId();
258                nodeId = history.getNode() != null ? history.getNode().getId() : null;
259            }
260        }
261
262        @Override
263        public Thread getBlockedThread() {
264            return thread;
265        }
266
267        @Override
268        public Exchange getExchange() {
269            return exchange;
270        }
271
272        @Override
273        public long getWaitDuration() {
274            return System.currentTimeMillis() - start;
275        }
276
277        @Override
278        public String getRouteId() {
279            return routeId;
280        }
281
282        @Override
283        public String getNodeId() {
284            return nodeId;
285        }
286
287        public CountDownLatch getLatch() {
288            return latch;
289        }
290
291        @Override
292        public String toString() {
293            return "AwaitThreadEntry[name=" + thread.getName() + ", exchangeId=" + exchange.getExchangeId() + "]";
294        }
295    }
296
297    /**
298     * Represents utilization statistics
299     */
300    private final class UtilizationStatistics implements AsyncProcessorAwaitManager.Statistics {
301
302        private boolean statisticsEnabled;
303
304        @Override
305        public long getThreadsBlocked() {
306            return blockedCounter.get();
307        }
308
309        @Override
310        public long getThreadsInterrupted() {
311            return interruptedCounter.get();
312        }
313
314        @Override
315        public long getTotalDuration() {
316            return totalDuration.get();
317        }
318
319        @Override
320        public long getMinDuration() {
321            return minDuration.get();
322        }
323
324        @Override
325        public long getMaxDuration() {
326            return maxDuration.get();
327        }
328
329        @Override
330        public long getMeanDuration() {
331            return meanDuration.get();
332        }
333
334        @Override
335        public void reset() {
336            blockedCounter.set(0);
337            interruptedCounter.set(0);
338            totalDuration.set(0);
339            minDuration.set(0);
340            maxDuration.set(0);
341            meanDuration.set(0);
342        }
343
344        @Override
345        public boolean isStatisticsEnabled() {
346            return statisticsEnabled;
347        }
348
349        @Override
350        public void setStatisticsEnabled(boolean statisticsEnabled) {
351            this.statisticsEnabled = statisticsEnabled;
352        }
353
354        @Override
355        public String toString() {
356            return String.format("AsyncProcessAwaitManager utilization[blocked=%s, interrupted=%s, total=%s min=%s, max=%s, mean=%s]",
357                    getThreadsBlocked(), getThreadsInterrupted(), getTotalDuration(), getMinDuration(), getMaxDuration(), getMeanDuration());
358        }
359    }
360
361}