001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.support;
018
019import java.util.concurrent.TimeUnit;
020
021import org.apache.camel.Consumer;
022import org.apache.camel.Exchange;
023import org.apache.camel.Route;
024import org.apache.camel.Suspendable;
025import org.apache.camel.spi.ExceptionHandler;
026import org.apache.camel.spi.RoutePolicy;
027import org.apache.camel.util.ServiceHelper;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031/**
032 * A base class for developing custom {@link RoutePolicy} implementations.
033 *
034 * @version 
035 */
036public abstract class RoutePolicySupport extends ServiceSupport implements RoutePolicy {
037
038    protected final Logger log = LoggerFactory.getLogger(getClass());
039    private ExceptionHandler exceptionHandler;
040
041    public void onInit(Route route) {
042        if (exceptionHandler == null) {
043            exceptionHandler = new LoggingExceptionHandler(route.getRouteContext().getCamelContext(), getClass());
044        }
045    }
046
047    public void onRemove(Route route) {
048        // noop
049    }
050
051    @Override
052    public void onStart(Route route) {
053        // noop
054    }
055
056    @Override
057    public void onStop(Route route) {
058        // noop
059    }
060
061    @Override
062    public void onSuspend(Route route) {
063        // noop
064    }
065
066    @Override
067    public void onResume(Route route) {
068        // noop
069    }
070
071    public void onExchangeBegin(Route route, Exchange exchange) {
072        // noop
073    }
074
075    public void onExchangeDone(Route route, Exchange exchange) {
076        // noop
077    }
078
079    /**
080     * Starts the consumer.
081     *
082     * @return the returned value is always <tt>true</tt> and should not be used.
083     * @see #resumeOrStartConsumer(Consumer)
084     */
085    public boolean startConsumer(Consumer consumer) throws Exception {
086        // TODO: change to void in Camel 3.0
087        ServiceHelper.startService(consumer);
088        log.debug("Started consumer {}", consumer);
089        return true;
090    }
091
092    /**
093     * Stops the consumer.
094     *
095     * @return the returned value is always <tt>true</tt> and should not be used.
096     * @see #suspendOrStopConsumer(Consumer)
097     */
098    public boolean stopConsumer(Consumer consumer) throws Exception {
099        // TODO: change to void in Camel 3.0
100        // stop and shutdown
101        ServiceHelper.stopAndShutdownServices(consumer);
102        log.debug("Stopped consumer {}", consumer);
103        return true;
104    }
105
106    /**
107     * Suspends or stops the consumer.
108     *
109     * If the consumer is {@link org.apache.camel.Suspendable} then the consumer is suspended,
110     * otherwise the consumer is stopped.
111     *
112     * @see #stopConsumer(Consumer)
113     * @return <tt>true</tt> if the consumer was suspended or stopped, <tt>false</tt> if the consumer was already suspend or stopped
114     */
115    public boolean suspendOrStopConsumer(Consumer consumer) throws Exception {
116        if (consumer instanceof Suspendable) {
117            boolean suspended = ServiceHelper.suspendService(consumer);
118            if (suspended) {
119                log.debug("Suspended consumer {}", consumer);
120            } else {
121                log.trace("Consumer already suspended {}", consumer);
122            }
123            return suspended;
124        }
125        if (!ServiceHelper.isStopped(consumer)) {
126            ServiceHelper.stopService(consumer);
127            log.debug("Stopped consumer {}", consumer);
128            return true;
129        }
130        return false;
131    }
132
133    /**
134     * Resumes or starts the consumer.
135     *
136     * If the consumer is {@link org.apache.camel.Suspendable} then the consumer is resumed,
137     * otherwise the consumer is started.
138     *
139     * @see #startConsumer(Consumer)
140     * @return <tt>true</tt> if the consumer was resumed or started, <tt>false</tt> if the consumer was already resumed or started
141     */
142    public boolean resumeOrStartConsumer(Consumer consumer) throws Exception {
143        if (consumer instanceof Suspendable) {
144            boolean resumed = ServiceHelper.resumeService(consumer);
145            if (resumed) {
146                log.debug("Resumed consumer {}", consumer);
147            } else {
148                log.trace("Consumer already resumed {}", consumer);
149            }
150            return resumed;
151        }
152        if (!ServiceHelper.isStarted(consumer)) {
153            ServiceHelper.startService(consumer);
154            log.debug("Started consumer {}", consumer);
155            return true;
156        }
157        return false;
158    }
159
160    public void startRoute(Route route) throws Exception {
161        route.getRouteContext().getCamelContext().startRoute(route.getId());
162    }
163
164    public void resumeRoute(Route route) throws Exception {
165        route.getRouteContext().getCamelContext().resumeRoute(route.getId());
166    }
167
168    public void suspendRoute(Route route) throws Exception {
169        route.getRouteContext().getCamelContext().suspendRoute(route.getId());
170    }
171
172    public void suspendRoute(Route route, long timeout, TimeUnit timeUnit) throws Exception {
173        route.getRouteContext().getCamelContext().suspendRoute(route.getId(), timeout, timeUnit);
174    }
175
176    /**
177     * @see #stopRouteAsync(Route)
178     */
179    public void stopRoute(Route route) throws Exception {
180        route.getRouteContext().getCamelContext().stopRoute(route.getId());
181    }
182
183    /**
184     * @see #stopRouteAsync(Route)
185     */
186    public void stopRoute(Route route, long timeout, TimeUnit timeUnit) throws Exception {
187        route.getRouteContext().getCamelContext().stopRoute(route.getId(), timeout, timeUnit);
188    }
189
190    /**
191     * Allows to stop a route asynchronously using a separate background thread which can allow any current in-flight exchange
192     * to complete while the route is being shutdown.
193     * You may attempt to stop a route from processing an exchange which would be in-flight and therefore attempting to stop
194     * the route will defer due there is an inflight exchange in-progress. By stopping the route independently using a separate
195     * thread ensures the exchange can continue process and complete and the route can be stopped.
196     */
197    public void stopRouteAsync(final Route route) {
198        String threadId = route.getRouteContext().getCamelContext().getExecutorServiceManager().resolveThreadName("StopRouteAsync");
199        Runnable task = () -> {
200            try {
201                route.getRouteContext().getCamelContext().stopRoute(route.getId());
202            } catch (Exception e) {
203                handleException(e);
204            }
205        };
206        new Thread(task, threadId).start();
207    }
208
209    /**
210     * Handles the given exception using the {@link #getExceptionHandler()}
211     *
212     * @param t the exception to handle
213     */
214    protected void handleException(Throwable t) {
215        if (exceptionHandler != null) {
216            exceptionHandler.handleException(t);
217        }
218    }
219
220    @Override
221    protected void doStart() throws Exception {
222        // noop
223    }
224
225    @Override
226    protected void doStop() throws Exception {
227        // noop
228    }
229
230    public ExceptionHandler getExceptionHandler() {
231        return exceptionHandler;
232    }
233
234    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
235        this.exceptionHandler = exceptionHandler;
236    }
237
238}