001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import java.util.ArrayList;
020import java.util.Collection;
021import java.util.Collections;
022import java.util.Comparator;
023import java.util.Date;
024import java.util.LinkedList;
025import java.util.List;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.ConcurrentMap;
028import java.util.concurrent.atomic.AtomicInteger;
029
030import org.apache.camel.Endpoint;
031import org.apache.camel.Exchange;
032import org.apache.camel.MessageHistory;
033import org.apache.camel.spi.InflightRepository;
034import org.apache.camel.support.ServiceSupport;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 * Default {@link org.apache.camel.spi.InflightRepository}.
040 *
041 * @version 
042 */
043public class DefaultInflightRepository extends ServiceSupport implements InflightRepository {
044
045    private static final Logger LOG = LoggerFactory.getLogger(DefaultInflightRepository.class);
046    private final ConcurrentMap<String, Exchange> inflight = new ConcurrentHashMap<String, Exchange>();
047    private final ConcurrentMap<String, AtomicInteger> routeCount = new ConcurrentHashMap<String, AtomicInteger>();
048
049    public void add(Exchange exchange) {
050        inflight.put(exchange.getExchangeId(), exchange);
051    }
052
053    public void remove(Exchange exchange) {
054        inflight.remove(exchange.getExchangeId());
055    }
056
057    public void add(Exchange exchange, String routeId) {
058        AtomicInteger existing = routeCount.putIfAbsent(routeId, new AtomicInteger(1));
059        if (existing != null) {
060            existing.incrementAndGet();
061        }
062    }
063
064    public void remove(Exchange exchange, String routeId) {
065        AtomicInteger existing = routeCount.get(routeId);
066        if (existing != null) {
067            existing.decrementAndGet();
068        }
069    }
070
071    public int size() {
072        return inflight.size();
073    }
074
075    @Deprecated
076    public int size(Endpoint endpoint) {
077        return 0;
078    }
079
080    @Override
081    public void removeRoute(String routeId) {
082        routeCount.remove(routeId);
083    }
084
085    @Override
086    public int size(String routeId) {
087        AtomicInteger existing = routeCount.get(routeId);
088        return existing != null ? existing.get() : 0;
089    }
090
091    @Override
092    public Collection<InflightExchange> browse() {
093        return browse(null, -1, false);
094    }
095
096    @Override
097    public Collection<InflightExchange> browse(String fromRouteId) {
098        return browse(fromRouteId, -1, false);
099    }
100
101    @Override
102    public Collection<InflightExchange> browse(int limit, boolean sortByLongestDuration) {
103        return browse(null, limit, sortByLongestDuration);
104    }
105
106    @Override
107    public Collection<InflightExchange> browse(String fromRouteId, int limit, boolean sortByLongestDuration) {
108        List<InflightExchange> answer = new ArrayList<InflightExchange>();
109
110        List<Exchange> values;
111        if (fromRouteId == null) {
112            // all values
113            values = new ArrayList<Exchange>(inflight.values());
114        } else {
115            // only if route match
116            values = new ArrayList<Exchange>();
117            for (Exchange exchange : inflight.values()) {
118                String exchangeRouteId = exchange.getFromRouteId();
119                if (fromRouteId.equals(exchangeRouteId)) {
120                    values.add(exchange);
121                }
122            }
123        }
124
125        if (sortByLongestDuration) {
126            values.sort(new Comparator<Exchange>() {
127                @Override
128                public int compare(Exchange e1, Exchange e2) {
129                    long d1 = getExchangeDuration(e1);
130                    long d2 = getExchangeDuration(e2);
131                    return Long.compare(d1, d2);
132                }
133            });
134        } else {
135            // else sort by exchange id
136            values.sort(new Comparator<Exchange>() {
137                @Override
138                public int compare(Exchange e1, Exchange e2) {
139                    return e1.getExchangeId().compareTo(e2.getExchangeId());
140                }
141            });
142        }
143
144        for (Exchange exchange : values) {
145            answer.add(new InflightExchangeEntry(exchange));
146            if (limit > 0 && answer.size() >= limit) {
147                break;
148            }
149        }
150        return Collections.unmodifiableCollection(answer);
151    }
152
153    @Override
154    protected void doStart() throws Exception {
155    }
156
157    @Override
158    protected void doStop() throws Exception {
159        int count = size();
160        if (count > 0) {
161            LOG.warn("Shutting down while there are still " + count + " inflight exchanges.");
162        } else {
163            LOG.debug("Shutting down with no inflight exchanges.");
164        }
165        routeCount.clear();
166    }
167
168    private static long getExchangeDuration(Exchange exchange) {
169        long duration = 0;
170        Date created = exchange.getProperty(Exchange.CREATED_TIMESTAMP, Date.class);
171        if (created != null) {
172            duration = System.currentTimeMillis() - created.getTime();
173        }
174        return duration;
175    }
176
177    private static final class InflightExchangeEntry implements InflightExchange {
178
179        private final Exchange exchange;
180
181        private InflightExchangeEntry(Exchange exchange) {
182            this.exchange = exchange;
183        }
184
185        @Override
186        public Exchange getExchange() {
187            return exchange;
188        }
189
190        @Override
191        public long getDuration() {
192            return DefaultInflightRepository.getExchangeDuration(exchange);
193        }
194
195        @Override
196        @SuppressWarnings("unchecked")
197        public long getElapsed() {
198            LinkedList<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, LinkedList.class);
199            if (list == null || list.isEmpty()) {
200                return 0;
201            }
202
203            // get latest entry
204            MessageHistory history = list.getLast();
205            if (history != null) {
206                return history.getElapsed();
207            } else {
208                return 0;
209            }
210        }
211
212        @Override
213        @SuppressWarnings("unchecked")
214        public String getNodeId() {
215            LinkedList<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, LinkedList.class);
216            if (list == null || list.isEmpty()) {
217                return null;
218            }
219
220            // get latest entry
221            MessageHistory history = list.getLast();
222            if (history != null) {
223                return history.getNode().getId();
224            } else {
225                return null;
226            }
227        }
228
229        @Override
230        public String getFromRouteId() {
231            return exchange.getFromRouteId();
232        }
233
234        @Override
235        public String getRouteId() {
236            return getAtRouteId();
237        }
238
239        @Override
240        @SuppressWarnings("unchecked")
241        public String getAtRouteId() {
242            LinkedList<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, LinkedList.class);
243            if (list == null || list.isEmpty()) {
244                return null;
245            }
246
247            // get latest entry
248            MessageHistory history = list.getLast();
249            if (history != null) {
250                return history.getRouteId();
251            } else {
252                return null;
253            }
254        }
255
256        @Override
257        public String toString() {
258            return "InflightExchangeEntry[exchangeId=" + exchange.getExchangeId() + "]";
259        }
260    }
261
262}