001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.impl; 018 019import java.util.ArrayList; 020import java.util.Collection; 021import java.util.Collections; 022import java.util.Comparator; 023import java.util.Date; 024import java.util.List; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.ConcurrentMap; 027import java.util.concurrent.atomic.AtomicInteger; 028 029import org.apache.camel.Endpoint; 030import org.apache.camel.Exchange; 031import org.apache.camel.MessageHistory; 032import org.apache.camel.spi.InflightRepository; 033import org.apache.camel.support.ServiceSupport; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037/** 038 * Default {@link org.apache.camel.spi.InflightRepository}. 039 * 040 * @version 041 */ 042public class DefaultInflightRepository extends ServiceSupport implements InflightRepository { 043 044 private static final Logger LOG = LoggerFactory.getLogger(DefaultInflightRepository.class); 045 private final ConcurrentMap<String, Exchange> inflight = new ConcurrentHashMap<String, Exchange>(); 046 private final ConcurrentMap<String, AtomicInteger> routeCount = new ConcurrentHashMap<String, AtomicInteger>(); 047 048 public void add(Exchange exchange) { 049 inflight.put(exchange.getExchangeId(), exchange); 050 } 051 052 public void remove(Exchange exchange) { 053 inflight.remove(exchange.getExchangeId()); 054 } 055 056 public void add(Exchange exchange, String routeId) { 057 AtomicInteger existing = routeCount.putIfAbsent(routeId, new AtomicInteger(1)); 058 if (existing != null) { 059 existing.incrementAndGet(); 060 } 061 } 062 063 public void remove(Exchange exchange, String routeId) { 064 AtomicInteger existing = routeCount.get(routeId); 065 if (existing != null) { 066 existing.decrementAndGet(); 067 } 068 } 069 070 public int size() { 071 return inflight.size(); 072 } 073 074 @Deprecated 075 public int size(Endpoint endpoint) { 076 return 0; 077 } 078 079 @Override 080 public void removeRoute(String routeId) { 081 routeCount.remove(routeId); 082 } 083 084 @Override 085 public int size(String routeId) { 086 AtomicInteger existing = routeCount.get(routeId); 087 return existing != null ? existing.get() : 0; 088 } 089 090 @Override 091 public Collection<InflightExchange> browse() { 092 return browse(-1, false); 093 } 094 095 @Override 096 public Collection<InflightExchange> browse(int limit, boolean sortByLongestDuration) { 097 List<InflightExchange> answer = new ArrayList<InflightExchange>(); 098 099 List<Exchange> values = new ArrayList<Exchange>(inflight.values()); 100 if (sortByLongestDuration) { 101 Collections.sort(values, new Comparator<Exchange>() { 102 @Override 103 public int compare(Exchange e1, Exchange e2) { 104 long d1 = getExchangeDuration(e1); 105 long d2 = getExchangeDuration(e2); 106 return Long.compare(d1, d2); 107 } 108 }); 109 } else { 110 // else sort by exchange id 111 Collections.sort(values, new Comparator<Exchange>() { 112 @Override 113 public int compare(Exchange e1, Exchange e2) { 114 return e1.getExchangeId().compareTo(e2.getExchangeId()); 115 } 116 }); 117 } 118 119 for (Exchange exchange : values) { 120 answer.add(new InflightExchangeEntry(exchange)); 121 if (limit > 0 && answer.size() >= limit) { 122 break; 123 } 124 } 125 return Collections.unmodifiableCollection(answer); 126 } 127 128 @Override 129 protected void doStart() throws Exception { 130 } 131 132 @Override 133 protected void doStop() throws Exception { 134 int count = size(); 135 if (count > 0) { 136 LOG.warn("Shutting down while there are still " + count + " inflight exchanges."); 137 } else { 138 LOG.debug("Shutting down with no inflight exchanges."); 139 } 140 routeCount.clear(); 141 } 142 143 private static long getExchangeDuration(Exchange exchange) { 144 long duration = 0; 145 Date created = exchange.getProperty(Exchange.CREATED_TIMESTAMP, Date.class); 146 if (created != null) { 147 duration = System.currentTimeMillis() - created.getTime(); 148 } 149 return duration; 150 } 151 152 private static final class InflightExchangeEntry implements InflightExchange { 153 154 private final Exchange exchange; 155 156 private InflightExchangeEntry(Exchange exchange) { 157 this.exchange = exchange; 158 } 159 160 @Override 161 public Exchange getExchange() { 162 return exchange; 163 } 164 165 @Override 166 public long getDuration() { 167 return DefaultInflightRepository.getExchangeDuration(exchange); 168 } 169 170 @Override 171 @SuppressWarnings("unchecked") 172 public long getElapsed() { 173 List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class); 174 if (list == null || list.isEmpty()) { 175 return 0; 176 } 177 178 // get latest entry 179 MessageHistory history = list.get(list.size() - 1); 180 if (history != null) { 181 return history.getElapsed(); 182 } else { 183 return 0; 184 } 185 } 186 187 @Override 188 @SuppressWarnings("unchecked") 189 public String getNodeId() { 190 List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class); 191 if (list == null || list.isEmpty()) { 192 return null; 193 } 194 195 // get latest entry 196 MessageHistory history = list.get(list.size() - 1); 197 if (history != null) { 198 return history.getNode().getId(); 199 } else { 200 return null; 201 } 202 } 203 204 @Override 205 @SuppressWarnings("unchecked") 206 public String getRouteId() { 207 List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class); 208 if (list == null || list.isEmpty()) { 209 return null; 210 } 211 212 // get latest entry 213 MessageHistory history = list.get(list.size() - 1); 214 if (history != null) { 215 return history.getRouteId(); 216 } else { 217 return null; 218 } 219 } 220 221 @Override 222 public String toString() { 223 return "InflightExchangeEntry[exchangeId=" + exchange.getExchangeId() + "]"; 224 } 225 } 226 227}