001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import java.util.concurrent.ScheduledExecutorService;
020import java.util.concurrent.ScheduledFuture;
021import java.util.concurrent.TimeUnit;
022import java.util.concurrent.atomic.AtomicBoolean;
023
024import org.apache.camel.CamelContext;
025import org.apache.camel.CamelContextAware;
026import org.apache.camel.Exchange;
027import org.apache.camel.Route;
028import org.apache.camel.util.ObjectHelper;
029
030/**
031 * {@link org.apache.camel.spi.RoutePolicy} which executes for a duration and then triggers an action.
032 * <p/>
033 * This can be used to stop the route after it has processed a number of messages, or has been running for N seconds.
034 */
035public class DurationRoutePolicy extends org.apache.camel.support.RoutePolicySupport implements CamelContextAware {
036
037    enum Action {
038        STOP_CAMEL_CONTEXT, STOP_ROUTE, SUSPEND_ROUTE, SUSPEND_ALL_ROUTES
039    }
040
041    private CamelContext camelContext;
042    private String routeId;
043    private ScheduledExecutorService executorService;
044    private volatile ScheduledFuture task;
045    private volatile int doneMessages;
046    private AtomicBoolean actionDone = new AtomicBoolean();
047
048    private Action action = Action.STOP_ROUTE;
049    private int maxMessages;
050    private int maxSeconds;
051
052    public DurationRoutePolicy() {
053    }
054
055    public DurationRoutePolicy(CamelContext camelContext, String routeId) {
056        this.camelContext = camelContext;
057        this.routeId = routeId;
058    }
059
060    @Override
061    public CamelContext getCamelContext() {
062        return camelContext;
063    }
064
065    @Override
066    public void setCamelContext(CamelContext camelContext) {
067        this.camelContext = camelContext;
068    }
069
070    public int getMaxMessages() {
071        return maxMessages;
072    }
073
074    /**
075     * Maximum number of messages to process before the action is triggered
076     */
077    public void setMaxMessages(int maxMessages) {
078        this.maxMessages = maxMessages;
079    }
080
081    public int getMaxSeconds() {
082        return maxSeconds;
083    }
084
085    /**
086     * Maximum seconds Camel is running before the action is triggered
087     */
088    public void setMaxSeconds(int maxSeconds) {
089        this.maxSeconds = maxSeconds;
090    }
091
092    public Action getAction() {
093        return action;
094    }
095
096    /**
097     * What action to perform when maximum is triggered.
098     */
099    public void setAction(Action action) {
100        this.action = action;
101    }
102
103    @Override
104    public void onInit(Route route) {
105        super.onInit(route);
106
107        ObjectHelper.notNull(camelContext, "camelContext", this);
108
109        if (maxMessages == 0 && maxSeconds == 0) {
110            throw new IllegalArgumentException("The options maxMessages or maxSeconds must be configured");
111        }
112
113        if (routeId == null) {
114            this.routeId = route.getId();
115        }
116
117        if (executorService == null) {
118            executorService = camelContext.getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "DurationRoutePolicy[" + routeId + "]");
119        }
120
121        if (maxSeconds > 0) {
122            task = performMaxDurationAction();
123        }
124    }
125
126    @Override
127    public void onExchangeDone(Route route, Exchange exchange) {
128        doneMessages++;
129
130        if (maxMessages > 0 && doneMessages >= maxMessages) {
131            if (actionDone.compareAndSet(false, true)) {
132                performMaxMessagesAction();
133                if (task != null && !task.isDone()) {
134                    task.cancel(false);
135                }
136            }
137        }
138    }
139
140    @Override
141    protected void doStop() throws Exception {
142        if (task != null && !task.isDone()) {
143            task.cancel(false);
144        }
145
146        if (executorService != null) {
147            getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
148            executorService = null;
149        }
150    }
151
152    protected void performMaxMessagesAction() {
153        executorService.submit(createTask(true));
154    }
155
156    protected ScheduledFuture performMaxDurationAction() {
157        return executorService.schedule(createTask(false), maxSeconds, TimeUnit.SECONDS);
158    }
159
160    private Runnable createTask(boolean maxMessagesHit) {
161        return () -> {
162            try {
163                String tail;
164                if (maxMessagesHit) {
165                    tail = " due max messages " + getMaxMessages() + " processed";
166                } else {
167                    tail = " due max seconds " + getMaxSeconds();
168                }
169
170                if (action == Action.STOP_CAMEL_CONTEXT) {
171                    log.info("Stopping CamelContext {}", tail);
172                    camelContext.stop();
173                } else if (action == Action.STOP_ROUTE) {
174                    log.info("Stopping route: {}{}", routeId, tail);
175                    camelContext.stopRoute(routeId);
176                } else if (action == Action.SUSPEND_ROUTE) {
177                    log.info("Suspending route: {}{}", routeId, tail);
178                    camelContext.suspendRoute(routeId);
179                } else if (action == Action.SUSPEND_ALL_ROUTES) {
180                    log.info("Suspending all routes {}", tail);
181                    camelContext.suspend();
182                }
183            } catch (Throwable e) {
184                log.warn("Error performing action: " + action, e);
185            }
186        };
187    }
188}