/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.support;

import java.util.concurrent.TimeUnit;

import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Route;
import org.apache.camel.Suspendable;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.RoutePolicy;
import org.apache.camel.util.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A base class for developing custom {@link RoutePolicy} implementations.
 * <p/>
 * Every {@link RoutePolicy} callback is implemented as a no-op, except {@link #onInit(Route)}
 * which installs a default {@link ExceptionHandler} when none has been configured.
 * The class also offers helper methods for starting, stopping, suspending and resuming
 * consumers and routes, which subclasses can call from their own callbacks.
 */
public abstract class RoutePolicySupport extends ServiceSupport implements RoutePolicy {

    protected final Logger log = LoggerFactory.getLogger(getClass());
    private ExceptionHandler exceptionHandler;

    /**
     * Installs a default {@link LoggingExceptionHandler} if no exception handler has been set.
     * <p/>
     * Subclasses overriding this method should call <tt>super.onInit(route)</tt> (or set their own
     * handler), otherwise {@link #handleException(Throwable)} can only fall back to logging.
     *
     * @param route the route being initialized
     */
    @Override
    public void onInit(Route route) {
        if (exceptionHandler == null) {
            exceptionHandler = new LoggingExceptionHandler(route.getRouteContext().getCamelContext(), getClass());
        }
    }

    @Override
    public void onRemove(Route route) {
        // noop
    }

    @Override
    public void onStart(Route route) {
        // noop
    }

    @Override
    public void onStop(Route route) {
        // noop
    }

    @Override
    public void onSuspend(Route route) {
        // noop
    }

    @Override
    public void onResume(Route route) {
        // noop
    }

    @Override
    public void onExchangeBegin(Route route, Exchange exchange) {
        // noop
    }

    @Override
    public void onExchangeDone(Route route, Exchange exchange) {
        // noop
    }

    /**
     * Starts the consumer.
     *
     * @param consumer the consumer to start
     * @return the returned value is always <tt>true</tt> and should not be used.
     * @throws Exception is thrown if starting the consumer failed
     * @see #resumeOrStartConsumer(Consumer)
     */
    public boolean startConsumer(Consumer consumer) throws Exception {
        // TODO: change to void in Camel 3.0
        ServiceHelper.startService(consumer);
        log.debug("Started consumer {}", consumer);
        return true;
    }

    /**
     * Stops the consumer.
     *
     * @param consumer the consumer to stop and shut down
     * @return the returned value is always <tt>true</tt> and should not be used.
     * @throws Exception is thrown if stopping the consumer failed
     * @see #suspendOrStopConsumer(Consumer)
     */
    public boolean stopConsumer(Consumer consumer) throws Exception {
        // TODO: change to void in Camel 3.0
        // stop and shutdown
        ServiceHelper.stopAndShutdownServices(consumer);
        log.debug("Stopped consumer {}", consumer);
        return true;
    }

    /**
     * Suspends or stops the consumer.
     * <p/>
     * If the consumer is {@link org.apache.camel.Suspendable} then the consumer is suspended,
     * otherwise the consumer is stopped.
     *
     * @param consumer the consumer to suspend or stop
     * @return <tt>true</tt> if the consumer was suspended or stopped,
     *         <tt>false</tt> if the consumer was already suspended or stopped
     * @throws Exception is thrown if suspending or stopping the consumer failed
     * @see #stopConsumer(Consumer)
     */
    public boolean suspendOrStopConsumer(Consumer consumer) throws Exception {
        if (consumer instanceof Suspendable) {
            boolean suspended = ServiceHelper.suspendService(consumer);
            if (suspended) {
                log.debug("Suspended consumer {}", consumer);
            } else {
                log.trace("Consumer already suspended {}", consumer);
            }
            return suspended;
        }
        if (!ServiceHelper.isStopped(consumer)) {
            ServiceHelper.stopService(consumer);
            log.debug("Stopped consumer {}", consumer);
            return true;
        }
        return false;
    }

    /**
     * Resumes or starts the consumer.
     * <p/>
     * If the consumer is {@link org.apache.camel.Suspendable} then the consumer is resumed,
     * otherwise the consumer is started.
     *
     * @param consumer the consumer to resume or start
     * @return <tt>true</tt> if the consumer was resumed or started,
     *         <tt>false</tt> if the consumer was already resumed or started
     * @throws Exception is thrown if resuming or starting the consumer failed
     * @see #startConsumer(Consumer)
     */
    public boolean resumeOrStartConsumer(Consumer consumer) throws Exception {
        if (consumer instanceof Suspendable) {
            boolean resumed = ServiceHelper.resumeService(consumer);
            if (resumed) {
                log.debug("Resumed consumer {}", consumer);
            } else {
                log.trace("Consumer already resumed {}", consumer);
            }
            return resumed;
        }
        if (!ServiceHelper.isStarted(consumer)) {
            ServiceHelper.startService(consumer);
            log.debug("Started consumer {}", consumer);
            return true;
        }
        return false;
    }

    /**
     * Starts the given route, by its id, on the route's CamelContext.
     *
     * @param route the route to start
     * @throws Exception is thrown if starting the route failed
     */
    public void startRoute(Route route) throws Exception {
        route.getRouteContext().getCamelContext().startRoute(route.getId());
    }

    /**
     * Resumes the given route, by its id, on the route's CamelContext.
     *
     * @param route the route to resume
     * @throws Exception is thrown if resuming the route failed
     */
    public void resumeRoute(Route route) throws Exception {
        route.getRouteContext().getCamelContext().resumeRoute(route.getId());
    }

    /**
     * Suspends the given route, by its id, on the route's CamelContext.
     *
     * @param route the route to suspend
     * @throws Exception is thrown if suspending the route failed
     */
    public void suspendRoute(Route route) throws Exception {
        route.getRouteContext().getCamelContext().suspendRoute(route.getId());
    }

    /**
     * Suspends the given route, by its id, on the route's CamelContext using the given timeout.
     *
     * @param route    the route to suspend
     * @param timeout  the timeout value passed on to the CamelContext
     * @param timeUnit the unit of the timeout
     * @throws Exception is thrown if suspending the route failed
     */
    public void suspendRoute(Route route, long timeout, TimeUnit timeUnit) throws Exception {
        route.getRouteContext().getCamelContext().suspendRoute(route.getId(), timeout, timeUnit);
    }

    /**
     * Stops the given route, by its id, on the route's CamelContext.
     *
     * @param route the route to stop
     * @throws Exception is thrown if stopping the route failed
     * @see #stopRouteAsync(Route)
     */
    public void stopRoute(Route route) throws Exception {
        route.getRouteContext().getCamelContext().stopRoute(route.getId());
    }

    /**
     * Stops the given route, by its id, on the route's CamelContext using the given timeout.
     *
     * @param route    the route to stop
     * @param timeout  the timeout value passed on to the CamelContext
     * @param timeUnit the unit of the timeout
     * @throws Exception is thrown if stopping the route failed
     * @see #stopRouteAsync(Route)
     */
    public void stopRoute(Route route, long timeout, TimeUnit timeUnit) throws Exception {
        route.getRouteContext().getCamelContext().stopRoute(route.getId(), timeout, timeUnit);
    }

    /**
     * Allows to stop a route asynchronously using a separate background thread which can allow any current
     * in-flight exchange to complete while the route is being shutdown.
     * <p/>
     * You may attempt to stop a route while processing an exchange, which is then in-flight, and therefore
     * stopping the route would be deferred because there is an in-flight exchange in progress. Stopping the
     * route independently from a separate thread ensures the exchange can continue to be processed and
     * complete, and the route can be stopped.
     * <p/>
     * Any exception thrown while stopping the route is passed to {@link #handleException(Throwable)}.
     *
     * @param route the route to stop
     */
    public void stopRouteAsync(final Route route) {
        String threadId = route.getRouteContext().getCamelContext().getExecutorServiceManager().resolveThreadName("StopRouteAsync");
        Runnable task = () -> {
            try {
                route.getRouteContext().getCamelContext().stopRoute(route.getId());
            } catch (Exception e) {
                handleException(e);
            }
        };
        new Thread(task, threadId).start();
    }

    /**
     * Handles the given exception using the {@link #getExceptionHandler()}.
     * <p/>
     * If no exception handler has been configured, the exception is logged at <tt>WARN</tt> level
     * instead of being silently discarded.
     *
     * @param t the exception to handle
     */
    protected void handleException(Throwable t) {
        // read the field once so the null check and the call use the same handler instance
        ExceptionHandler handler = exceptionHandler;
        if (handler != null) {
            handler.handleException(t);
        } else {
            // no handler available (e.g. onInit was never invoked on this policy); do not lose the failure
            log.warn("Exception occurred but no ExceptionHandler is configured on route policy " + getClass().getName(), t);
        }
    }

    @Override
    protected void doStart() throws Exception {
        // noop
    }

    @Override
    protected void doStop() throws Exception {
        // noop
    }

    /**
     * Gets the exception handler used by {@link #handleException(Throwable)}.
     *
     * @return the exception handler, or <tt>null</tt> if none has been set and
     *         {@link #onInit(Route)} has not installed the default one
     */
    public ExceptionHandler getExceptionHandler() {
        return exceptionHandler;
    }

    /**
     * Sets a custom exception handler to use in {@link #handleException(Throwable)}.
     *
     * @param exceptionHandler the exception handler
     */
    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
        this.exceptionHandler = exceptionHandler;
    }

}