001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.management.mbean;
018
019import java.io.ByteArrayInputStream;
020import java.io.InputStream;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.Comparator;
024import java.util.HashMap;
025import java.util.List;
026import java.util.Map;
027import java.util.Set;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.concurrent.ConcurrentSkipListMap;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicBoolean;
032import javax.management.AttributeValueExp;
033import javax.management.MBeanServer;
034import javax.management.ObjectName;
035import javax.management.Query;
036import javax.management.QueryExp;
037import javax.management.StringValueExp;
038
039import org.w3c.dom.Document;
040
041import org.apache.camel.CamelContext;
042import org.apache.camel.Exchange;
043import org.apache.camel.ManagementStatisticsLevel;
044import org.apache.camel.Route;
045import org.apache.camel.ServiceStatus;
046import org.apache.camel.TimerListener;
047import org.apache.camel.api.management.ManagedResource;
048import org.apache.camel.api.management.mbean.ManagedProcessorMBean;
049import org.apache.camel.api.management.mbean.ManagedRouteMBean;
050import org.apache.camel.model.ModelCamelContext;
051import org.apache.camel.model.ModelHelper;
052import org.apache.camel.model.RouteDefinition;
053import org.apache.camel.spi.ManagementStrategy;
054import org.apache.camel.spi.RoutePolicy;
055import org.apache.camel.util.ObjectHelper;
056import org.apache.camel.util.XmlLineNumberParser;
057import org.slf4j.Logger;
058import org.slf4j.LoggerFactory;
059
060@ManagedResource(description = "Managed Route")
061public class ManagedRoute extends ManagedPerformanceCounter implements TimerListener, ManagedRouteMBean {
062
063    public static final String VALUE_UNKNOWN = "Unknown";
064
065    private static final Logger LOG = LoggerFactory.getLogger(ManagedRoute.class);
066
067    protected final Route route;
068    protected final String description;
069    protected final ModelCamelContext context;
070    private final LoadTriplet load = new LoadTriplet();
071    private final ConcurrentSkipListMap<InFlightKey, Long> exchangesInFlightStartTimestamps = new ConcurrentSkipListMap<InFlightKey, Long>();
072    private final ConcurrentHashMap<String, InFlightKey> exchangesInFlightKeys = new ConcurrentHashMap<String, InFlightKey>();
073    private final String jmxDomain;
074
075    public ManagedRoute(ModelCamelContext context, Route route) {
076        this.route = route;
077        this.context = context;
078        this.description = route.getDescription();
079        this.jmxDomain = context.getManagementStrategy().getManagementAgent().getMBeanObjectDomainName();
080    }
081
082    @Override
083    public void init(ManagementStrategy strategy) {
084        super.init(strategy);
085        boolean enabled = context.getManagementStrategy().getManagementAgent().getStatisticsLevel() != ManagementStatisticsLevel.Off;
086        setStatisticsEnabled(enabled);
087
088        exchangesInFlightKeys.clear();
089        exchangesInFlightStartTimestamps.clear();
090    }
091
092    public Route getRoute() {
093        return route;
094    }
095
096    public CamelContext getContext() {
097        return context;
098    }
099
100    public String getRouteId() {
101        String id = route.getId();
102        if (id == null) {
103            id = VALUE_UNKNOWN;
104        }
105        return id;
106    }
107
108    public String getDescription() {
109        return description;
110    }
111
112    @Override
113    public String getEndpointUri() {
114        if (route.getEndpoint() != null) {
115            return route.getEndpoint().getEndpointUri();
116        }
117        return VALUE_UNKNOWN;
118    }
119
120    public String getState() {
121        // must use String type to be sure remote JMX can read the attribute without requiring Camel classes.
122        ServiceStatus status = context.getRouteStatus(route.getId());
123        // if no status exists then its stopped
124        if (status == null) {
125            status = ServiceStatus.Stopped;
126        }
127        return status.name();
128    }
129
130    public String getUptime() {
131        return route.getUptime();
132    }
133
134    public long getUptimeMillis() {
135        return route.getUptimeMillis();
136    }
137
138    public Integer getInflightExchanges() {
139        return (int) super.getExchangesInflight();
140    }
141
142    public String getCamelId() {
143        return context.getName();
144    }
145
146    public String getCamelManagementName() {
147        return context.getManagementName();
148    }
149
150    public Boolean getTracing() {
151        return route.getRouteContext().isTracing();
152    }
153
154    public void setTracing(Boolean tracing) {
155        route.getRouteContext().setTracing(tracing);
156    }
157
158    public Boolean getMessageHistory() {
159        return route.getRouteContext().isMessageHistory();
160    }
161
162    public String getRoutePolicyList() {
163        List<RoutePolicy> policyList = route.getRouteContext().getRoutePolicyList();
164
165        if (policyList == null || policyList.isEmpty()) {
166            // return an empty string to have it displayed nicely in JMX consoles
167            return "";
168        }
169
170        StringBuilder sb = new StringBuilder();
171        for (int i = 0; i < policyList.size(); i++) {
172            RoutePolicy policy = policyList.get(i);
173            sb.append(policy.getClass().getSimpleName());
174            sb.append("(").append(ObjectHelper.getIdentityHashCode(policy)).append(")");
175            if (i < policyList.size() - 1) {
176                sb.append(", ");
177            }
178        }
179        return sb.toString();
180    }
181
182    public String getLoad01() {
183        double load1 = load.getLoad1();
184        if (Double.isNaN(load1)) {
185            // empty string if load statistics is disabled
186            return "";
187        } else {
188            return String.format("%.2f", load1);
189        }
190    }
191
192    public String getLoad05() {
193        double load5 = load.getLoad5();
194        if (Double.isNaN(load5)) {
195            // empty string if load statistics is disabled
196            return "";
197        } else {
198            return String.format("%.2f", load5);
199        }
200    }
201
202    public String getLoad15() {
203        double load15 = load.getLoad15();
204        if (Double.isNaN(load15)) {
205            // empty string if load statistics is disabled
206            return "";
207        } else {
208            return String.format("%.2f", load15);
209        }
210    }
211
212    @Override
213    public void onTimer() {
214        load.update(getInflightExchanges());
215    }
216
217    public void start() throws Exception {
218        if (!context.getStatus().isStarted()) {
219            throw new IllegalArgumentException("CamelContext is not started");
220        }
221        context.startRoute(getRouteId());
222    }
223
224    public void stop() throws Exception {
225        if (!context.getStatus().isStarted()) {
226            throw new IllegalArgumentException("CamelContext is not started");
227        }
228        context.stopRoute(getRouteId());
229    }
230
231    public void stop(long timeout) throws Exception {
232        if (!context.getStatus().isStarted()) {
233            throw new IllegalArgumentException("CamelContext is not started");
234        }
235        context.stopRoute(getRouteId(), timeout, TimeUnit.SECONDS);
236    }
237
238    public boolean stop(Long timeout, Boolean abortAfterTimeout) throws Exception {
239        if (!context.getStatus().isStarted()) {
240            throw new IllegalArgumentException("CamelContext is not started");
241        }
242        return context.stopRoute(getRouteId(), timeout, TimeUnit.SECONDS, abortAfterTimeout);
243    }
244
245    public void shutdown() throws Exception {
246        if (!context.getStatus().isStarted()) {
247            throw new IllegalArgumentException("CamelContext is not started");
248        }
249        String routeId = getRouteId();
250        context.stopRoute(routeId);
251        context.removeRoute(routeId);
252    }
253
254    public void shutdown(long timeout) throws Exception {
255        if (!context.getStatus().isStarted()) {
256            throw new IllegalArgumentException("CamelContext is not started");
257        }
258        String routeId = getRouteId();
259        context.stopRoute(routeId, timeout, TimeUnit.SECONDS);
260        context.removeRoute(routeId);
261    }
262
263    public boolean remove() throws Exception {
264        if (!context.getStatus().isStarted()) {
265            throw new IllegalArgumentException("CamelContext is not started");
266        }
267        return context.removeRoute(getRouteId());
268    }
269
270    public String dumpRouteAsXml() throws Exception {
271        return dumpRouteAsXml(false);
272    }
273
274    @Override
275    public String dumpRouteAsXml(boolean resolvePlaceholders) throws Exception {
276        String id = route.getId();
277        RouteDefinition def = context.getRouteDefinition(id);
278        if (def != null) {
279            String xml = ModelHelper.dumpModelAsXml(context, def);
280
281            // if resolving placeholders we parse the xml, and resolve the property placeholders during parsing
282            if (resolvePlaceholders) {
283                final AtomicBoolean changed = new AtomicBoolean();
284                InputStream is = new ByteArrayInputStream(xml.getBytes());
285                Document dom = XmlLineNumberParser.parseXml(is, new XmlLineNumberParser.XmlTextTransformer() {
286                    @Override
287                    public String transform(String text) {
288                        try {
289                            String after = getContext().resolvePropertyPlaceholders(text);
290                            if (!changed.get()) {
291                                changed.set(!text.equals(after));
292                            }
293                            return after;
294                        } catch (Exception e) {
295                            // ignore
296                            return text;
297                        }
298                    }
299                });
300                // okay there were some property placeholder replaced so re-create the model
301                if (changed.get()) {
302                    xml = context.getTypeConverter().mandatoryConvertTo(String.class, dom);
303                    RouteDefinition copy = ModelHelper.createModelFromXml(context, xml, RouteDefinition.class);
304                    xml = ModelHelper.dumpModelAsXml(context, copy);
305                }
306            }
307            return xml;
308        }
309        return null;
310    }
311
312    public void updateRouteFromXml(String xml) throws Exception {
313        // convert to model from xml
314        RouteDefinition def = ModelHelper.createModelFromXml(context, xml, RouteDefinition.class);
315        if (def == null) {
316            return;
317        }
318
319        // if the xml does not contain the route-id then we fix this by adding the actual route id
320        // this may be needed if the route-id was auto-generated, as the intend is to update this route
321        // and not add a new route, adding a new route, use the MBean operation on ManagedCamelContext instead.
322        if (ObjectHelper.isEmpty(def.getId())) {
323            def.setId(getRouteId());
324        } else if (!def.getId().equals(getRouteId())) {
325            throw new IllegalArgumentException("Cannot update route from XML as routeIds does not match. routeId: "
326                    + getRouteId() + ", routeId from XML: " + def.getId());
327        }
328
329        LOG.debug("Updating route: {} from xml: {}", def.getId(), xml);
330
331        try {
332            // add will remove existing route first
333            context.addRouteDefinition(def);
334        } catch (Exception e) {
335            // log the error as warn as the management api may be invoked remotely over JMX which does not propagate such exception
336            String msg = "Error updating route: " + def.getId() + " from xml: " + xml + " due: " + e.getMessage();
337            LOG.warn(msg, e);
338            throw e;
339        }
340    }
341
342    public String dumpRouteStatsAsXml(boolean fullStats, boolean includeProcessors) throws Exception {
343        // in this logic we need to calculate the accumulated processing time for the processor in the route
344        // and hence why the logic is a bit more complicated to do this, as we need to calculate that from
345        // the bottom -> top of the route but this information is valuable for profiling routes
346        StringBuilder sb = new StringBuilder();
347
348        // need to calculate this value first, as we need that value for the route stat
349        Long processorAccumulatedTime = 0L;
350
351        // gather all the processors for this route, which requires JMX
352        if (includeProcessors) {
353            sb.append("  <processorStats>\n");
354            MBeanServer server = getContext().getManagementStrategy().getManagementAgent().getMBeanServer();
355            if (server != null) {
356                // get all the processor mbeans and sort them accordingly to their index
357                String prefix = getContext().getManagementStrategy().getManagementAgent().getIncludeHostName() ? "*/" : "";
358                ObjectName query = ObjectName.getInstance(jmxDomain + ":context=" + prefix + getContext().getManagementName() + ",type=processors,*");
359                Set<ObjectName> names = server.queryNames(query, null);
360                List<ManagedProcessorMBean> mps = new ArrayList<ManagedProcessorMBean>();
361                for (ObjectName on : names) {
362                    ManagedProcessorMBean processor = context.getManagementStrategy().getManagementAgent().newProxyClient(on, ManagedProcessorMBean.class);
363
364                    // the processor must belong to this route
365                    if (getRouteId().equals(processor.getRouteId())) {
366                        mps.add(processor);
367                    }
368                }
369                Collections.sort(mps, new OrderProcessorMBeans());
370
371                // walk the processors in reverse order, and calculate the accumulated total time
372                Map<String, Long> accumulatedTimes = new HashMap<String, Long>();
373                Collections.reverse(mps);
374                for (ManagedProcessorMBean processor : mps) {
375                    processorAccumulatedTime += processor.getTotalProcessingTime();
376                    accumulatedTimes.put(processor.getProcessorId(), processorAccumulatedTime);
377                }
378                // and reverse back again
379                Collections.reverse(mps);
380
381                // and now add the sorted list of processors to the xml output
382                for (ManagedProcessorMBean processor : mps) {
383                    sb.append("    <processorStat").append(String.format(" id=\"%s\" index=\"%s\" state=\"%s\"", processor.getProcessorId(), processor.getIndex(), processor.getState()));
384                    // do we have an accumulated time then append that
385                    Long accTime = accumulatedTimes.get(processor.getProcessorId());
386                    if (accTime != null) {
387                        sb.append(" accumulatedProcessingTime=\"").append(accTime).append("\"");
388                    }
389                    // use substring as we only want the attributes
390                    sb.append(" ").append(processor.dumpStatsAsXml(fullStats).substring(7)).append("\n");
391                }
392            }
393            sb.append("  </processorStats>\n");
394        }
395
396        // route self time is route total - processor accumulated total)
397        long routeSelfTime = getTotalProcessingTime() - processorAccumulatedTime;
398        if (routeSelfTime < 0) {
399            // ensure we don't calculate that as negative
400            routeSelfTime = 0;
401        }
402
403        StringBuilder answer = new StringBuilder();
404        answer.append("<routeStat").append(String.format(" id=\"%s\"", route.getId())).append(String.format(" state=\"%s\"", getState()));
405        // use substring as we only want the attributes
406        String stat = dumpStatsAsXml(fullStats);
407        answer.append(" exchangesInflight=\"").append(getInflightExchanges()).append("\"");
408        answer.append(" selfProcessingTime=\"").append(routeSelfTime).append("\"");
409        InFlightKey oldestInflightEntry = getOldestInflightEntry();
410        if (oldestInflightEntry == null) {
411            answer.append(" oldestInflightExchangeId=\"\"");
412            answer.append(" oldestInflightDuration=\"\"");
413        } else {
414            answer.append(" oldestInflightExchangeId=\"").append(oldestInflightEntry.exchangeId).append("\"");
415            answer.append(" oldestInflightDuration=\"").append(System.currentTimeMillis() - oldestInflightEntry.timeStamp).append("\"");
416        }
417        answer.append(" ").append(stat.substring(7, stat.length() - 2)).append(">\n");
418
419        if (includeProcessors) {
420            answer.append(sb);
421        }
422
423        answer.append("</routeStat>");
424        return answer.toString();
425    }
426
427    public void reset(boolean includeProcessors) throws Exception {
428        reset();
429
430        // and now reset all processors for this route
431        if (includeProcessors) {
432            MBeanServer server = getContext().getManagementStrategy().getManagementAgent().getMBeanServer();
433            if (server != null) {
434                // get all the processor mbeans and sort them accordingly to their index
435                String prefix = getContext().getManagementStrategy().getManagementAgent().getIncludeHostName() ? "*/" : "";
436                ObjectName query = ObjectName.getInstance(jmxDomain + ":context=" + prefix + getContext().getManagementName() + ",type=processors,*");
437                QueryExp queryExp = Query.match(new AttributeValueExp("RouteId"), new StringValueExp(getRouteId()));
438                Set<ObjectName> names = server.queryNames(query, queryExp);
439                for (ObjectName name : names) {
440                    server.invoke(name, "reset", null, null);
441                }
442            }
443        }
444    }
445
446    public String createRouteStaticEndpointJson() {
447        return getContext().createRouteStaticEndpointJson(getRouteId());
448    }
449
450    @Override
451    public String createRouteStaticEndpointJson(boolean includeDynamic) {
452        return getContext().createRouteStaticEndpointJson(getRouteId(), includeDynamic);
453    }
454
455    @Override
456    public boolean equals(Object o) {
457        return this == o || (o != null && getClass() == o.getClass() && route.equals(((ManagedRoute) o).route));
458    }
459
460    @Override
461    public int hashCode() {
462        return route.hashCode();
463    }
464
465    private InFlightKey getOldestInflightEntry() {
466        Map.Entry<InFlightKey, Long> entry = exchangesInFlightStartTimestamps.firstEntry();
467        if (entry != null) {
468            return entry.getKey();
469        }
470        return null;
471    }
472
473    public Long getOldestInflightDuration() {
474        InFlightKey oldest = getOldestInflightEntry();
475        if (oldest == null) {
476            return null;
477        }
478        return System.currentTimeMillis() - oldest.timeStamp;
479    }
480
481    public String getOldestInflightExchangeId() {
482        InFlightKey oldest = getOldestInflightEntry();
483        if (oldest == null) {
484            return null;
485        }
486        return oldest.exchangeId;
487    }
488
489    @Override
490    public synchronized void processExchange(Exchange exchange) {
491        InFlightKey key = new InFlightKey(System.currentTimeMillis(), exchange.getExchangeId());
492        InFlightKey oldKey = exchangesInFlightKeys.putIfAbsent(exchange.getExchangeId(), key);
493        // we may already have the exchange being processed so only add to timestamp if its a new exchange
494        // for example when people call the same routes recursive
495        if (oldKey == null) {
496            exchangesInFlightStartTimestamps.put(key, key.timeStamp);
497        }
498        super.processExchange(exchange);
499    }
500
501    @Override
502    public synchronized void completedExchange(Exchange exchange, long time) {
503        InFlightKey key = exchangesInFlightKeys.remove(exchange.getExchangeId());
504        if (key != null) {
505            exchangesInFlightStartTimestamps.remove(key);
506        }
507        super.completedExchange(exchange, time);
508    }
509
510    @Override
511    public synchronized void failedExchange(Exchange exchange) {
512        InFlightKey key = exchangesInFlightKeys.remove(exchange.getExchangeId());
513        if (key != null) {
514            exchangesInFlightStartTimestamps.remove(key);
515        }
516        super.failedExchange(exchange);
517    }
518
519    private static class InFlightKey implements Comparable<InFlightKey> {
520
521        private final Long timeStamp;
522        private final String exchangeId;
523
524        InFlightKey(Long timeStamp, String exchangeId) {
525            this.timeStamp = timeStamp;
526            this.exchangeId = exchangeId;
527        }
528
529        @Override
530        public int compareTo(InFlightKey o) {
531            int compare = Long.compare(timeStamp, o.timeStamp);
532            if (compare == 0) {
533                return exchangeId.compareTo(o.exchangeId);
534            }
535            return compare;
536        }
537
538        @Override
539        public boolean equals(Object o) {
540            if (this == o) {
541                return true;
542            }
543            if (o == null || getClass() != o.getClass()) {
544                return false;
545            }
546
547            InFlightKey that = (InFlightKey) o;
548
549            if (!exchangeId.equals(that.exchangeId)) {
550                return false;
551            }
552            if (!timeStamp.equals(that.timeStamp)) {
553                return false;
554            }
555
556            return true;
557        }
558
559        @Override
560        public int hashCode() {
561            int result = timeStamp.hashCode();
562            result = 31 * result + exchangeId.hashCode();
563            return result;
564        }
565
566        @Override
567        public String toString() {
568            return exchangeId;
569        }
570    }
571
572    /**
573     * Used for sorting the processor mbeans accordingly to their index.
574     */
575    private static final class OrderProcessorMBeans implements Comparator<ManagedProcessorMBean> {
576
577        @Override
578        public int compare(ManagedProcessorMBean o1, ManagedProcessorMBean o2) {
579            return o1.getIndex().compareTo(o2.getIndex());
580        }
581    }
582}