/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.MessageHistory;
import org.apache.camel.spi.InflightRepository;
import org.apache.camel.support.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Default {@link org.apache.camel.spi.InflightRepository}.
 *
 * Inflight exchanges are kept in a map keyed by exchange id. A separate map of
 * counters, keyed by route id, is maintained by the route-scoped
 * {@link #add(Exchange, String)} and {@link #remove(Exchange, String)} calls.
 *
 * @version
 */
public class DefaultInflightRepository extends ServiceSupport implements InflightRepository {

    private static final Logger LOG = LoggerFactory.getLogger(DefaultInflightRepository.class);

    // inflight exchanges, keyed by exchange id
    private final ConcurrentMap<String, Exchange> inflight = new ConcurrentHashMap<String, Exchange>();
    // inflight counter per route, keyed by route id
    private final ConcurrentMap<String, AtomicInteger> routeCount = new ConcurrentHashMap<String, AtomicInteger>();

    /**
     * Registers the exchange as inflight, keyed by its exchange id.
     *
     * @param exchange the exchange to register
     */
    public void add(Exchange exchange) {
        inflight.put(exchange.getExchangeId(), exchange);
    }

    /**
     * Removes the exchange (looked up by its exchange id) from the inflight map.
     *
     * @param exchange the exchange to remove
     */
    public void remove(Exchange exchange) {
        inflight.remove(exchange.getExchangeId());
    }

    /**
     * Increments the inflight counter of the given route, creating the counter
     * (starting at 1) when the route has none yet.
     *
     * @param exchange the exchange (not used by this implementation)
     * @param routeId  the id of the route whose counter is incremented
     */
    public void add(Exchange exchange, String routeId) {
        // putIfAbsent returns the previous counter, or null if ours (already at 1) was installed
        AtomicInteger existing = routeCount.putIfAbsent(routeId, new AtomicInteger(1));
        if (existing != null) {
            existing.incrementAndGet();
        }
    }

    /**
     * Decrements the inflight counter of the given route; does nothing when the
     * route has no counter.
     *
     * @param exchange the exchange (not used by this implementation)
     * @param routeId  the id of the route whose counter is decremented
     */
    public void remove(Exchange exchange, String routeId) {
        AtomicInteger existing = routeCount.get(routeId);
        if (existing != null) {
            existing.decrementAndGet();
        }
    }

    /**
     * @return the number of exchanges currently held in the inflight map
     */
    public int size() {
        return inflight.size();
    }

    /**
     * Not supported by this implementation.
     *
     * @param endpoint ignored
     * @return always <tt>0</tt>
     */
    @Deprecated
    public int size(Endpoint endpoint) {
        return 0;
    }

    /**
     * Drops the inflight counter of the given route.
     *
     * @param routeId the route id
     */
    @Override
    public void removeRoute(String routeId) {
        routeCount.remove(routeId);
    }

    /**
     * @param routeId the route id
     * @return the current value of the route's inflight counter, or <tt>0</tt> if the route has no counter
     */
    @Override
    public int size(String routeId) {
        AtomicInteger existing = routeCount.get(routeId);
        return existing != null ? existing.get() : 0;
    }

    /**
     * Browses all inflight exchanges, sorted by exchange id, without limit.
     */
    @Override
    public Collection<InflightExchange> browse() {
        return browse(null, -1, false);
    }

    /**
     * Browses the inflight exchanges that started from the given route, sorted by
     * exchange id, without limit.
     */
    @Override
    public Collection<InflightExchange> browse(String fromRouteId) {
        return browse(fromRouteId, -1, false);
    }

    /**
     * Browses inflight exchanges from all routes.
     */
    @Override
    public Collection<InflightExchange> browse(int limit, boolean sortByLongestDuration) {
        return browse(null, limit, sortByLongestDuration);
    }

    /**
     * Returns a read-only snapshot of the inflight exchanges.
     *
     * @param fromRouteId           only include exchanges whose from-route id equals this value,
     *                              or <tt>null</tt> to include all exchanges
     * @param limit                 maximum number of entries to return; a value of <tt>0</tt> or
     *                              less means no limit
     * @param sortByLongestDuration <tt>true</tt> to order by duration with the longest running
     *                              exchange first, <tt>false</tt> to order by exchange id
     * @return an unmodifiable collection of the selected entries
     */
    @Override
    public Collection<InflightExchange> browse(String fromRouteId, int limit, boolean sortByLongestDuration) {
        // copy into a private list so sorting never touches the live map
        List<Exchange> values;
        if (fromRouteId == null) {
            // all values
            values = new ArrayList<Exchange>(inflight.values());
        } else {
            // only if route match
            values = new ArrayList<Exchange>();
            for (Exchange exchange : inflight.values()) {
                if (fromRouteId.equals(exchange.getFromRouteId())) {
                    values.add(exchange);
                }
            }
        }

        if (sortByLongestDuration) {
            // take a single clock reading for the whole sort: reading the clock per comparison
            // lets two exchanges with equal creation time compare as "less than" in both
            // directions, which violates the Comparator contract
            final long now = System.currentTimeMillis();
            // operands are swapped so the biggest duration comes first
            values.sort((e1, e2) -> Long.compare(getExchangeDuration(e2, now), getExchangeDuration(e1, now)));
        } else {
            // else sort by exchange id
            values.sort(Comparator.comparing(Exchange::getExchangeId));
        }

        List<InflightExchange> answer = new ArrayList<InflightExchange>();
        for (Exchange exchange : values) {
            answer.add(new InflightExchangeEntry(exchange));
            if (limit > 0 && answer.size() >= limit) {
                break;
            }
        }
        return Collections.unmodifiableCollection(answer);
    }

    @Override
    protected void doStart() throws Exception {
        // noop
    }

    /**
     * Logs whether any exchanges are still inflight and clears the per-route counters.
     * The inflight map itself is left untouched.
     */
    @Override
    protected void doStop() throws Exception {
        int count = size();
        if (count > 0) {
            LOG.warn("Shutting down while there are still {} inflight exchanges.", count);
        } else {
            LOG.debug("Shutting down with no inflight exchanges.");
        }
        routeCount.clear();
    }

    /**
     * Computes how long the exchange has existed, measured against the current system time.
     *
     * @return the duration in millis, or <tt>0</tt> if the exchange has no created timestamp
     */
    private static long getExchangeDuration(Exchange exchange) {
        return getExchangeDuration(exchange, System.currentTimeMillis());
    }

    /**
     * Computes how long the exchange has existed, measured against the given point in time.
     *
     * @param exchange the exchange
     * @param now      the reference time in millis since the epoch
     * @return the duration in millis, or <tt>0</tt> if the exchange has no created timestamp
     */
    private static long getExchangeDuration(Exchange exchange, long now) {
        Date created = exchange.getProperty(Exchange.CREATED_TIMESTAMP, Date.class);
        return created != null ? now - created.getTime() : 0;
    }

    /**
     * {@link InflightExchange} view over a single exchange. All values are read from the
     * wrapped exchange at the time of the call.
     */
    private static final class InflightExchangeEntry implements InflightExchange {

        private final Exchange exchange;

        private InflightExchangeEntry(Exchange exchange) {
            this.exchange = exchange;
        }

        @Override
        public Exchange getExchange() {
            return exchange;
        }

        @Override
        public long getDuration() {
            return DefaultInflightRepository.getExchangeDuration(exchange);
        }

        /**
         * @return the elapsed time of the latest message history entry, or <tt>0</tt> if there is none
         */
        @Override
        public long getElapsed() {
            MessageHistory history = latestHistory();
            return history != null ? history.getElapsed() : 0;
        }

        /**
         * @return the id of the node of the latest message history entry, or <tt>null</tt> if there is none
         */
        @Override
        public String getNodeId() {
            MessageHistory history = latestHistory();
            return history != null ? history.getNode().getId() : null;
        }

        @Override
        public String getFromRouteId() {
            return exchange.getFromRouteId();
        }

        @Override
        public String getRouteId() {
            return getAtRouteId();
        }

        /**
         * @return the route id of the latest message history entry, or <tt>null</tt> if there is none
         */
        @Override
        public String getAtRouteId() {
            MessageHistory history = latestHistory();
            return history != null ? history.getRouteId() : null;
        }

        @Override
        public String toString() {
            return "InflightExchangeEntry[exchangeId=" + exchange.getExchangeId() + "]";
        }

        /**
         * Looks up the most recent entry of the message history stored on the exchange.
         *
         * @return the latest entry, or <tt>null</tt> if the exchange has no history property
         *         or the history is empty
         */
        @SuppressWarnings("unchecked")
        private MessageHistory latestHistory() {
            // the property is read as a raw LinkedList, hence the unchecked conversion
            LinkedList<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, LinkedList.class);
            // peekLast yields null for an empty list, so no separate emptiness check is needed
            return list != null ? list.peekLast() : null;
        }
    }

}