001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.impl; 018 019import java.util.ArrayList; 020import java.util.Collections; 021import java.util.EventObject; 022import java.util.HashMap; 023import java.util.HashSet; 024import java.util.List; 025import java.util.Map; 026import java.util.Set; 027 028import org.apache.camel.CamelContext; 029import org.apache.camel.CamelContextAware; 030import org.apache.camel.Endpoint; 031import org.apache.camel.Exchange; 032import org.apache.camel.management.event.ExchangeCreatedEvent; 033import org.apache.camel.management.event.ExchangeSendingEvent; 034import org.apache.camel.management.event.RouteAddedEvent; 035import org.apache.camel.management.event.RouteRemovedEvent; 036import org.apache.camel.spi.EndpointUtilizationStatistics; 037import org.apache.camel.spi.RouteContext; 038import org.apache.camel.spi.RuntimeEndpointRegistry; 039import org.apache.camel.spi.UnitOfWork; 040import org.apache.camel.support.EventNotifierSupport; 041import org.apache.camel.util.LRUCache; 042import org.apache.camel.util.ObjectHelper; 043import org.apache.camel.util.ServiceHelper; 044 045public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport implements CamelContextAware, RuntimeEndpointRegistry { 046 047 private CamelContext camelContext; 048 049 // route id -> endpoint urls 050 private Map<String, Set<String>> inputs; 051 private Map<String, Map<String, String>> outputs; 052 private int limit = 1000; 053 private boolean enabled = true; 054 private volatile boolean extended; 055 private EndpointUtilizationStatistics inputUtilization; 056 private EndpointUtilizationStatistics outputUtilization; 057 058 public CamelContext getCamelContext() { 059 return camelContext; 060 } 061 062 public void setCamelContext(CamelContext camelContext) { 063 this.camelContext = camelContext; 064 } 065 066 public boolean isEnabled() { 067 return enabled; 068 } 069 070 public void setEnabled(boolean enabled) { 071 this.enabled = enabled; 072 } 073 074 @Override 075 public List<String> getAllEndpoints(boolean includeInputs) { 076 List<String> answer = new ArrayList<String>(); 077 if (includeInputs) { 078 for (Map.Entry<String, Set<String>> entry : inputs.entrySet()) { 079 answer.addAll(entry.getValue()); 080 } 081 } 082 for (Map.Entry<String, Map<String, String>> entry : outputs.entrySet()) { 083 answer.addAll(entry.getValue().keySet()); 084 } 085 return Collections.unmodifiableList(answer); 086 } 087 088 @Override 089 public List<String> getEndpointsPerRoute(String routeId, boolean includeInputs) { 090 List<String> answer = new ArrayList<String>(); 091 if (includeInputs) { 092 Set<String> uris = inputs.get(routeId); 093 if (uris != null) { 094 answer.addAll(uris); 095 } 096 } 097 Map<String, String> uris = outputs.get(routeId); 098 if (uris != null) { 099 answer.addAll(uris.keySet()); 100 } 101 return Collections.unmodifiableList(answer); 102 } 103 104 @Override 105 public List<Statistic> getEndpointStatistics() { 106 List<Statistic> answer = new ArrayList<Statistic>(); 107 108 // inputs 109 for (Map.Entry<String, Set<String>> entry : inputs.entrySet()) { 110 String routeId = entry.getKey(); 111 for (String uri : entry.getValue()) { 112 Long hits = 0L; 113 if (extended) { 114 String key = asUtilizationKey(routeId, uri); 115 if (key != null) { 116 hits = inputUtilization.getStatistics().get(key); 117 if (hits == null) { 118 hits = 0L; 119 } 120 } 121 } 122 answer.add(new EndpointRuntimeStatistics(uri, routeId, "in", hits)); 123 } 124 } 125 126 // outputs 127 for (Map.Entry<String, Map<String, String>> entry : outputs.entrySet()) { 128 String routeId = entry.getKey(); 129 for (String uri : entry.getValue().keySet()) { 130 Long hits = 0L; 131 if (extended) { 132 String key = asUtilizationKey(routeId, uri); 133 if (key != null) { 134 hits = outputUtilization.getStatistics().get(key); 135 if (hits == null) { 136 hits = 0L; 137 } 138 } 139 } 140 answer.add(new EndpointRuntimeStatistics(uri, routeId, "out", hits)); 141 } 142 } 143 144 return answer; 145 } 146 147 @Override 148 public int getLimit() { 149 return limit; 150 } 151 152 @Override 153 public void setLimit(int limit) { 154 this.limit = limit; 155 } 156 157 @Override 158 public void clear() { 159 inputs.clear(); 160 outputs.clear(); 161 reset(); 162 } 163 164 @Override 165 public void reset() { 166 // its safe to call clear as reset 167 if (inputUtilization != null) { 168 inputUtilization.clear(); 169 } 170 if (outputUtilization != null) { 171 outputUtilization.clear(); 172 } 173 } 174 175 @Override 176 public int size() { 177 int total = inputs.values().size(); 178 total += outputs.values().size(); 179 return total; 180 } 181 182 @Override 183 protected void doStart() throws Exception { 184 ObjectHelper.notNull(camelContext, "camelContext", this); 185 186 if (inputs == null) { 187 inputs = new HashMap<String, Set<String>>(); 188 } 189 if (outputs == null) { 190 outputs = new HashMap<String, Map<String, String>>(); 191 } 192 if (getCamelContext().getManagementStrategy().getManagementAgent() != null) { 193 Boolean isEnabled = getCamelContext().getManagementStrategy().getManagementAgent().getEndpointRuntimeStatisticsEnabled(); 194 boolean isExtended = getCamelContext().getManagementStrategy().getManagementAgent().getStatisticsLevel().isExtended(); 195 // extended mode is either if we use Extended statistics level or the option is explicit enabled 196 extended = isExtended || isEnabled != null && isEnabled; 197 } 198 if (extended) { 199 inputUtilization = new DefaultEndpointUtilizationStatistics(limit); 200 outputUtilization = new DefaultEndpointUtilizationStatistics(limit); 201 } 202 if (extended) { 203 log.info("Runtime endpoint registry is in extended mode gathering usage statistics of all incoming and outgoing endpoints (cache limit: {})", limit); 204 } 205 ServiceHelper.startServices(inputUtilization, outputUtilization); 206 } 207 208 @Override 209 protected void doStop() throws Exception { 210 clear(); 211 ServiceHelper.stopServices(inputUtilization, outputUtilization); 212 } 213 214 @Override 215 public void notify(EventObject event) throws Exception { 216 if (event instanceof RouteAddedEvent) { 217 RouteAddedEvent rse = (RouteAddedEvent) event; 218 Endpoint endpoint = rse.getRoute().getEndpoint(); 219 String routeId = rse.getRoute().getId(); 220 221 // a HashSet is fine for inputs as we only have a limited number of those 222 Set<String> uris = new HashSet<String>(); 223 uris.add(endpoint.getEndpointUri()); 224 inputs.put(routeId, uris); 225 // use a LRUCache for outputs as we could potential have unlimited uris if dynamic routing is in use 226 // and therefore need to have the limit in use 227 outputs.put(routeId, new LRUCache<String, String>(limit)); 228 } else if (event instanceof RouteRemovedEvent) { 229 RouteRemovedEvent rse = (RouteRemovedEvent) event; 230 String routeId = rse.getRoute().getId(); 231 inputs.remove(routeId); 232 outputs.remove(routeId); 233 if (extended) { 234 String uri = rse.getRoute().getEndpoint().getEndpointUri(); 235 String key = asUtilizationKey(routeId, uri); 236 if (key != null) { 237 inputUtilization.remove(key); 238 } 239 } 240 } else if (extended && event instanceof ExchangeCreatedEvent) { 241 // we only capture details in extended mode 242 ExchangeCreatedEvent ece = (ExchangeCreatedEvent) event; 243 Endpoint endpoint = ece.getExchange().getFromEndpoint(); 244 if (endpoint != null) { 245 String routeId = ece.getExchange().getFromRouteId(); 246 String uri = endpoint.getEndpointUri(); 247 String key = asUtilizationKey(routeId, uri); 248 if (key != null) { 249 inputUtilization.onHit(key); 250 } 251 } 252 } else if (event instanceof ExchangeSendingEvent) { 253 ExchangeSendingEvent ese = (ExchangeSendingEvent) event; 254 Endpoint endpoint = ese.getEndpoint(); 255 String routeId = getRouteId(ese.getExchange()); 256 String uri = endpoint.getEndpointUri(); 257 258 Map<String, String> uris = outputs.get(routeId); 259 if (uris != null && !uris.containsKey(uri)) { 260 uris.put(uri, uri); 261 } 262 if (extended) { 263 String key = asUtilizationKey(routeId, uri); 264 if (key != null) { 265 outputUtilization.onHit(key); 266 } 267 } 268 } 269 } 270 271 private String getRouteId(Exchange exchange) { 272 String answer = null; 273 UnitOfWork uow = exchange.getUnitOfWork(); 274 RouteContext rc = uow != null ? uow.getRouteContext() : null; 275 if (rc != null) { 276 answer = rc.getRoute().getId(); 277 } 278 if (answer == null) { 279 // fallback and get from route id on the exchange 280 answer = exchange.getFromRouteId(); 281 } 282 return answer; 283 } 284 285 @Override 286 public boolean isEnabled(EventObject event) { 287 return enabled && event instanceof ExchangeCreatedEvent 288 || event instanceof ExchangeSendingEvent 289 || event instanceof RouteAddedEvent 290 || event instanceof RouteRemovedEvent; 291 } 292 293 private static String asUtilizationKey(String routeId, String uri) { 294 if (routeId == null || uri == null) { 295 return null; 296 } else { 297 return routeId + "|" + uri; 298 } 299 } 300 301 private static final class EndpointRuntimeStatistics implements Statistic { 302 303 private final String uri; 304 private final String routeId; 305 private final String direction; 306 private final long hits; 307 308 private EndpointRuntimeStatistics(String uri, String routeId, String direction, long hits) { 309 this.uri = uri; 310 this.routeId = routeId; 311 this.direction = direction; 312 this.hits = hits; 313 } 314 315 public String getUri() { 316 return uri; 317 } 318 319 public String getRouteId() { 320 return routeId; 321 } 322 323 public String getDirection() { 324 return direction; 325 } 326 327 public long getHits() { 328 return hits; 329 } 330 } 331}