001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import java.util.ArrayList;
020import java.util.Collections;
021import java.util.EventObject;
022import java.util.HashMap;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027
028import org.apache.camel.CamelContext;
029import org.apache.camel.CamelContextAware;
030import org.apache.camel.Endpoint;
031import org.apache.camel.Exchange;
032import org.apache.camel.management.event.ExchangeCreatedEvent;
033import org.apache.camel.management.event.ExchangeSendingEvent;
034import org.apache.camel.management.event.RouteAddedEvent;
035import org.apache.camel.management.event.RouteRemovedEvent;
036import org.apache.camel.spi.EndpointUtilizationStatistics;
037import org.apache.camel.spi.RouteContext;
038import org.apache.camel.spi.RuntimeEndpointRegistry;
039import org.apache.camel.spi.UnitOfWork;
040import org.apache.camel.support.EventNotifierSupport;
041import org.apache.camel.util.LRUCacheFactory;
042import org.apache.camel.util.ObjectHelper;
043import org.apache.camel.util.ServiceHelper;
044
045public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport implements CamelContextAware, RuntimeEndpointRegistry {
046
047    private CamelContext camelContext;
048
049    // route id -> endpoint urls
050    private Map<String, Set<String>> inputs;
051    private Map<String, Map<String, String>> outputs;
052    private int limit = 1000;
053    private boolean enabled = true;
054    private volatile boolean extended;
055    private EndpointUtilizationStatistics inputUtilization;
056    private EndpointUtilizationStatistics outputUtilization;
057
058    public CamelContext getCamelContext() {
059        return camelContext;
060    }
061
062    public void setCamelContext(CamelContext camelContext) {
063        this.camelContext = camelContext;
064    }
065
066    public boolean isEnabled() {
067        return enabled;
068    }
069
070    public void setEnabled(boolean enabled) {
071        this.enabled = enabled;
072    }
073
074    @Override
075    public List<String> getAllEndpoints(boolean includeInputs) {
076        List<String> answer = new ArrayList<String>();
077        if (includeInputs) {
078            for (Map.Entry<String, Set<String>> entry : inputs.entrySet()) {
079                answer.addAll(entry.getValue());
080            }
081        }
082        for (Map.Entry<String, Map<String, String>> entry : outputs.entrySet()) {
083            answer.addAll(entry.getValue().keySet());
084        }
085        return Collections.unmodifiableList(answer);
086    }
087
088    @Override
089    public List<String> getEndpointsPerRoute(String routeId, boolean includeInputs) {
090        List<String> answer = new ArrayList<String>();
091        if (includeInputs) {
092            Set<String> uris = inputs.get(routeId);
093            if (uris != null) {
094                answer.addAll(uris);
095            }
096        }
097        Map<String, String> uris = outputs.get(routeId);
098        if (uris != null) {
099            answer.addAll(uris.keySet());
100        }
101        return Collections.unmodifiableList(answer);
102    }
103
104    @Override
105    public List<Statistic> getEndpointStatistics() {
106        List<Statistic> answer = new ArrayList<Statistic>();
107
108        // inputs
109        for (Map.Entry<String, Set<String>> entry : inputs.entrySet()) {
110            String routeId = entry.getKey();
111            for (String uri : entry.getValue()) {
112                Long hits = 0L;
113                if (extended) {
114                    String key = asUtilizationKey(routeId, uri);
115                    if (key != null) {
116                        hits = inputUtilization.getStatistics().get(key);
117                        if (hits == null) {
118                            hits = 0L;
119                        }
120                    }
121                }
122                answer.add(new EndpointRuntimeStatistics(uri, routeId, "in", hits));
123            }
124        }
125
126        // outputs
127        for (Map.Entry<String, Map<String, String>> entry : outputs.entrySet()) {
128            String routeId = entry.getKey();
129            for (String uri : entry.getValue().keySet()) {
130                Long hits = 0L;
131                if (extended) {
132                    String key = asUtilizationKey(routeId, uri);
133                    if (key != null) {
134                        hits = outputUtilization.getStatistics().get(key);
135                        if (hits == null) {
136                            hits = 0L;
137                        }
138                    }
139                }
140                answer.add(new EndpointRuntimeStatistics(uri, routeId, "out", hits));
141            }
142        }
143
144        return answer;
145    }
146
147    @Override
148    public int getLimit() {
149        return limit;
150    }
151
152    @Override
153    public void setLimit(int limit) {
154        this.limit = limit;
155    }
156
157    @Override
158    public void clear() {
159        inputs.clear();
160        outputs.clear();
161        reset();
162    }
163
164    @Override
165    public void reset() {
166        // its safe to call clear as reset
167        if (inputUtilization != null) {
168            inputUtilization.clear();
169        }
170        if (outputUtilization != null) {
171            outputUtilization.clear();
172        }
173    }
174
175    @Override
176    public int size() {
177        int total = inputs.values().size();
178        total += outputs.values().size();
179        return total;
180    }
181
182    @Override
183    protected void doStart() throws Exception {
184        ObjectHelper.notNull(camelContext, "camelContext", this);
185
186        if (inputs == null) {
187            inputs = new HashMap<String, Set<String>>();
188        }
189        if (outputs == null) {
190            outputs = new HashMap<String, Map<String, String>>();
191        }
192        if (getCamelContext().getManagementStrategy().getManagementAgent() != null) {
193            extended = getCamelContext().getManagementStrategy().getManagementAgent().getStatisticsLevel().isExtended();
194        }
195        if (extended) {
196            inputUtilization = new DefaultEndpointUtilizationStatistics(limit);
197            outputUtilization = new DefaultEndpointUtilizationStatistics(limit);
198        }
199        if (extended) {
200            log.info("Runtime endpoint registry is in extended mode gathering usage statistics of all incoming and outgoing endpoints (cache limit: {})", limit);
201        } else {
202            log.info("Runtime endpoint registry is in normal mode gathering information of all incoming and outgoing endpoints (cache limit: {})", limit);
203        }
204        ServiceHelper.startServices(inputUtilization, outputUtilization);
205    }
206
207    @Override
208    protected void doStop() throws Exception {
209        clear();
210        ServiceHelper.stopServices(inputUtilization, outputUtilization);
211    }
212
213    @Override
214    @SuppressWarnings("unchecked")
215    public void notify(EventObject event) throws Exception {
216        if (event instanceof RouteAddedEvent) {
217            RouteAddedEvent rse = (RouteAddedEvent) event;
218            Endpoint endpoint = rse.getRoute().getEndpoint();
219            String routeId = rse.getRoute().getId();
220
221            // a HashSet is fine for inputs as we only have a limited number of those
222            Set<String> uris = new HashSet<String>();
223            uris.add(endpoint.getEndpointUri());
224            inputs.put(routeId, uris);
225            // use a LRUCache for outputs as we could potential have unlimited uris if dynamic routing is in use
226            // and therefore need to have the limit in use
227            outputs.put(routeId, LRUCacheFactory.newLRUCache(limit));
228        } else if (event instanceof RouteRemovedEvent) {
229            RouteRemovedEvent rse = (RouteRemovedEvent) event;
230            String routeId = rse.getRoute().getId();
231            inputs.remove(routeId);
232            outputs.remove(routeId);
233            if (extended) {
234                String uri = rse.getRoute().getEndpoint().getEndpointUri();
235                String key = asUtilizationKey(routeId, uri);
236                if (key != null) {
237                    inputUtilization.remove(key);
238                }
239            }
240        } else if (extended && event instanceof ExchangeCreatedEvent) {
241            // we only capture details in extended mode
242            ExchangeCreatedEvent ece = (ExchangeCreatedEvent) event;
243            Endpoint endpoint = ece.getExchange().getFromEndpoint();
244            if (endpoint != null) {
245                String routeId = ece.getExchange().getFromRouteId();
246                String uri = endpoint.getEndpointUri();
247                String key = asUtilizationKey(routeId, uri);
248                if (key != null) {
249                    inputUtilization.onHit(key);
250                }
251            }
252        } else if (event instanceof ExchangeSendingEvent) {
253            ExchangeSendingEvent ese = (ExchangeSendingEvent) event;
254            Endpoint endpoint = ese.getEndpoint();
255            String routeId = getRouteId(ese.getExchange());
256            String uri = endpoint.getEndpointUri();
257
258            Map<String, String> uris = outputs.get(routeId);
259            if (uris != null && !uris.containsKey(uri)) {
260                uris.put(uri, uri);
261            }
262            if (extended) {
263                String key = asUtilizationKey(routeId, uri);
264                if (key != null) {
265                    outputUtilization.onHit(key);
266                }
267            }
268        }
269    }
270
271    private String getRouteId(Exchange exchange) {
272        String answer = null;
273        UnitOfWork uow = exchange.getUnitOfWork();
274        RouteContext rc = uow != null ? uow.getRouteContext() : null;
275        if (rc != null) {
276            answer = rc.getRoute().getId();
277        }
278        if (answer == null) {
279            // fallback and get from route id on the exchange
280            answer = exchange.getFromRouteId();
281        }
282        return answer;
283    }
284
285    @Override
286    public boolean isDisabled() {
287        return !enabled;
288    }
289
290    @Override
291    public boolean isEnabled(EventObject event) {
292        return enabled && event instanceof ExchangeCreatedEvent
293                || event instanceof ExchangeSendingEvent
294                || event instanceof RouteAddedEvent
295                || event instanceof RouteRemovedEvent;
296    }
297
298    private static String asUtilizationKey(String routeId, String uri) {
299        if (routeId == null || uri == null) {
300            return null;
301        } else {
302            return routeId + "|" + uri;
303        }
304    }
305
306    private static final class EndpointRuntimeStatistics implements Statistic {
307
308        private final String uri;
309        private final String routeId;
310        private final String direction;
311        private final long hits;
312
313        private EndpointRuntimeStatistics(String uri, String routeId, String direction, long hits) {
314            this.uri = uri;
315            this.routeId = routeId;
316            this.direction = direction;
317            this.hits = hits;
318        }
319
320        public String getUri() {
321            return uri;
322        }
323
324        public String getRouteId() {
325            return routeId;
326        }
327
328        public String getDirection() {
329            return direction;
330        }
331
332        public long getHits() {
333            return hits;
334        }
335    }
336}