/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.impl;

import java.util.Locale;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.PollingConsumerPollingStrategy;
import org.apache.camel.Processor;
import org.apache.camel.StatefulService;
import org.apache.camel.SuspendableService;
import org.apache.camel.spi.PollingConsumerPollStrategy;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A useful base class for any consumer which is polling based.
 * <p/>
 * The consumer schedules itself (it is a {@link Runnable}) on a {@link ScheduledExecutorService}
 * and on every run delegates to {@link #poll()}, surrounded by the begin/commit/rollback
 * callbacks of the configured {@link PollingConsumerPollStrategy}.
 */
public abstract class ScheduledPollConsumer extends DefaultConsumer implements Runnable, SuspendableService, PollingConsumerPollingStrategy {
    private static final Logger LOG = LoggerFactory.getLogger(ScheduledPollConsumer.class);

    private ScheduledExecutorService scheduledExecutorService;
    // true only when this consumer created the thread pool itself (see doStart)
    private boolean shutdownExecutor;
    private ScheduledFuture<?> future;

    // if adding more options then align with ScheduledPollEndpoint#configureScheduledPollConsumerProperties
    private boolean startScheduler = true;
    private long initialDelay = 1000;
    private long delay = 500;
    private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
    private boolean useFixedDelay = true;
    private PollingConsumerPollStrategy pollStrategy = new DefaultPollingConsumerPollStrategy();
    private LoggingLevel runLoggingLevel = LoggingLevel.TRACE;
    private boolean sendEmptyMessageWhenIdle;
    private volatile boolean polling;

    /**
     * Creates a consumer without a thread pool; one is created in {@link #doStart()}.
     *
     * @param endpoint  the endpoint this consumer belongs to
     * @param processor the processor passed on to the super class
     */
    public ScheduledPollConsumer(Endpoint endpoint, Processor processor) {
        super(endpoint, processor);
    }

    /**
     * Creates a consumer which schedules its polls on the given existing thread pool.
     *
     * @param endpoint                 the endpoint this consumer belongs to
     * @param processor                the processor passed on to the super class
     * @param scheduledExecutorService the existing thread pool to use, checked with
     *                                 {@link ObjectHelper#notNull(Object, String)}
     */
    public ScheduledPollConsumer(Endpoint endpoint, Processor processor, ScheduledExecutorService scheduledExecutorService) {
        super(endpoint, processor);
        // we have been given an existing thread pool, so we should not manage its lifecycle
        // so we should keep shutdownExecutor as false
        this.scheduledExecutorService = scheduledExecutorService;
        ObjectHelper.notNull(scheduledExecutorService, "scheduledExecutorService");
    }

    /**
     * Invoked whenever we should be polled.
     * <p/>
     * Logs the start and completion of the task at the configured run logging level and
     * never lets a failure propagate to the caller.
     */
    @Override
    public void run() {
        try {
            logAtRunLoggingLevel("Scheduled task started on: {}");

            // execute scheduled task
            doRun();

            logAtRunLoggingLevel("Scheduled task completed on: {}");
        } catch (Throwable e) {
            // must catch everything (not only Error): a periodic task whose execution throws has its
            // subsequent executions suppressed by the ScheduledExecutorService, so the poll would
            // silently never run again
            LOG.error("Error occurred during running scheduled task on: " + this.getEndpoint() + ", due: " + e.getMessage(), e);
        }
    }

    /**
     * Logs the given message, with the endpoint as its single argument, at the configured run logging level.
     * <p/>
     * Any level other than ERROR, WARN, INFO or DEBUG (including <tt>null</tt>) is logged at TRACE.
     *
     * @param message the message pattern containing one <tt>{}</tt> placeholder for the endpoint
     */
    private void logAtRunLoggingLevel(String message) {
        LoggingLevel level = runLoggingLevel;
        Endpoint endpoint = this.getEndpoint();
        if (LoggingLevel.ERROR == level) {
            LOG.error(message, endpoint);
        } else if (LoggingLevel.WARN == level) {
            LOG.warn(message, endpoint);
        } else if (LoggingLevel.INFO == level) {
            LOG.info(message, endpoint);
        } else if (LoggingLevel.DEBUG == level) {
            LOG.debug(message, endpoint);
        } else {
            LOG.trace(message, endpoint);
        }
    }

    /**
     * Performs one scheduled poll, repeating the attempt for as long as the poll strategy
     * asks for a retry from its rollback callback.
     * <p/>
     * Does nothing if the consumer is suspended. A failure which is not retried is handed to the
     * exception handler, but only while {@link #isRunAllowed()} is true.
     */
    private void doRun() {
        if (isSuspended()) {
            LOG.trace("Cannot start to poll: {} as its suspended", this.getEndpoint());
            return;
        }

        // -1 means no attempt has begun yet; incremented each time the strategy lets a poll begin
        int retryCounter = -1;
        boolean done = false;
        Throwable cause = null;

        while (!done) {
            try {
                cause = null;
                // eager assume we are done
                done = true;
                if (isPollAllowed()) {

                    if (retryCounter == -1) {
                        LOG.trace("Starting to poll: {}", this.getEndpoint());
                    } else {
                        LOG.debug("Retrying attempt {} to poll: {}", retryCounter, this.getEndpoint());
                    }

                    // mark we are polling which should also include the begin/poll/commit
                    polling = true;
                    try {
                        boolean begin = pollStrategy.begin(this, getEndpoint());
                        if (begin) {
                            retryCounter++;
                            int polledMessages = poll();

                            if (polledMessages == 0 && isSendEmptyMessageWhenIdle()) {
                                // send an "empty" exchange
                                processEmptyMessage();
                            }

                            pollStrategy.commit(this, getEndpoint(), polledMessages);
                        } else {
                            LOG.debug("Cannot begin polling as pollStrategy returned false: {}", pollStrategy);
                        }
                    } finally {
                        polling = false;
                    }
                }

                LOG.trace("Finished polling: {}", this.getEndpoint());
            } catch (Exception e) {
                try {
                    // the strategy decides whether this failed attempt should be retried
                    boolean retry = pollStrategy.rollback(this, getEndpoint(), retryCounter, e);
                    if (retry) {
                        // do not set cause as we retry
                        done = false;
                    } else {
                        cause = e;
                        done = true;
                    }
                } catch (Throwable t) {
                    // the rollback itself failed, report that failure and give up
                    cause = t;
                    done = true;
                }
            } catch (Throwable t) {
                // anything which is not an Exception is never retried
                cause = t;
                done = true;
            }

            if (cause != null && isRunAllowed()) {
                // let exception handler deal with the caused exception
                // but suppress this during shutdown as the logs may get flooded with exceptions during shutdown/forced shutdown
                try {
                    getExceptionHandler().handleException("Consumer " + this + " failed polling endpoint: " + getEndpoint()
                            + ". Will try again at next poll", cause);
                } catch (Throwable e) {
                    LOG.warn("Error handling exception. This exception will be ignored.", e);
                }
                cause = null;
            }
        }
    }

    /**
     * No messages to poll so send an empty message instead.
     *
     * @throws Exception is thrown if error processing the empty message.
     */
    protected void processEmptyMessage() throws Exception {
        Exchange exchange = getEndpoint().createExchange();
        // NOTE(review): "log" is not declared in this class, presumably it is inherited from a super class
        log.debug("Sending empty message as there were no messages from polling: {}", this.getEndpoint());
        getProcessor().process(exchange);
    }

    // Properties
    // -------------------------------------------------------------------------

    /**
     * Whether a poll may be performed, which requires that running is allowed and the consumer is not suspended.
     */
    protected boolean isPollAllowed() {
        return isRunAllowed() && !isSuspended();
    }

    /**
     * Whether polling is currently in progress
     */
    protected boolean isPolling() {
        return polling;
    }

    public long getInitialDelay() {
        return initialDelay;
    }

    public void setInitialDelay(long initialDelay) {
        this.initialDelay = initialDelay;
    }

    public long getDelay() {
        return delay;
    }

    public void setDelay(long delay) {
        this.delay = delay;
    }

    public TimeUnit getTimeUnit() {
        return timeUnit;
    }

    /**
     * Sets the time unit to use.
     * <p/>
     * Notice that both {@link #getDelay()} and {@link #getInitialDelay()} are using
     * the same time unit. So if you change this value, then take into account that the
     * default value of {@link #getInitialDelay()} is 1000. So you may want to adjust this value accordingly.
     *
     * @param timeUnit the time unit.
     */
    public void setTimeUnit(TimeUnit timeUnit) {
        this.timeUnit = timeUnit;
    }

    public boolean isUseFixedDelay() {
        return useFixedDelay;
    }

    public void setUseFixedDelay(boolean useFixedDelay) {
        this.useFixedDelay = useFixedDelay;
    }

    public LoggingLevel getRunLoggingLevel() {
        return runLoggingLevel;
    }

    public void setRunLoggingLevel(LoggingLevel runLoggingLevel) {
        this.runLoggingLevel = runLoggingLevel;
    }

    public PollingConsumerPollStrategy getPollStrategy() {
        return pollStrategy;
    }

    public void setPollStrategy(PollingConsumerPollStrategy pollStrategy) {
        this.pollStrategy = pollStrategy;
    }

    public boolean isStartScheduler() {
        return startScheduler;
    }

    /**
     * Sets whether the scheduler should be started when this consumer starts.
     * <p/>
     * This option is default true.
     *
     * @param startScheduler whether to start scheduler
     */
    public void setStartScheduler(boolean startScheduler) {
        this.startScheduler = startScheduler;
    }

    public void setSendEmptyMessageWhenIdle(boolean sendEmptyMessageWhenIdle) {
        this.sendEmptyMessageWhenIdle = sendEmptyMessageWhenIdle;
    }

    public boolean isSendEmptyMessageWhenIdle() {
        return sendEmptyMessageWhenIdle;
    }

    public ScheduledExecutorService getScheduledExecutorService() {
        return scheduledExecutorService;
    }

    /**
     * Sets a custom shared {@link ScheduledExecutorService} to use as thread pool
     * <p/>
     * <b>Notice: </b> When using a custom thread pool, then the lifecycle of this thread
     * pool is not controlled by this consumer (eg this consumer will not start/stop the thread pool
     * when the consumer is started/stopped etc.)
     *
     * @param scheduledExecutorService the custom thread pool to use
     */
    public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
        this.scheduledExecutorService = scheduledExecutorService;
    }

    // Implementation methods
    // -------------------------------------------------------------------------

    /**
     * The polling method which is invoked periodically to poll this consumer
     *
     * @return number of messages polled, will be <tt>0</tt> if no message was polled at all.
     * @throws Exception can be thrown if an exception occurred during polling
     */
    protected abstract int poll() throws Exception;

    /**
     * Starts this consumer, creating a single threaded scheduled thread pool if none has been
     * provided, and then schedules the poll task if {@link #isStartScheduler()} is true.
     */
    @Override
    protected void doStart() throws Exception {
        super.doStart();

        // if no existing executor provided, then create a new thread pool ourselves
        if (scheduledExecutorService == null) {
            // we only need one thread in the pool to schedule this task
            this.scheduledExecutorService = getEndpoint().getCamelContext().getExecutorServiceManager()
                    .newScheduledThreadPool(this, getEndpoint().getEndpointUri(), 1);
            // and we should shutdown the thread pool when no longer needed
            this.shutdownExecutor = true;
        }

        ObjectHelper.notNull(scheduledExecutorService, "scheduledExecutorService", this);
        ObjectHelper.notNull(pollStrategy, "pollStrategy", this);

        if (isStartScheduler()) {
            startScheduler();
        }
    }

    /**
     * Schedules this consumer on the thread pool, using either fixed delay or fixed rate
     * depending on {@link #isUseFixedDelay()}, and remembers the returned future.
     */
    protected void startScheduler() {
        if (isUseFixedDelay()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Scheduling poll (fixed delay) with initialDelay: {}, delay: {} ({}) for: {}",
                        new Object[]{getInitialDelay(), getDelay(), getTimeUnit().name().toLowerCase(Locale.ENGLISH), getEndpoint()});
            }
            future = scheduledExecutorService.scheduleWithFixedDelay(this, getInitialDelay(), getDelay(), getTimeUnit());
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Scheduling poll (fixed rate) with initialDelay: {}, delay: {} ({}) for: {}",
                        new Object[]{getInitialDelay(), getDelay(), getTimeUnit().name().toLowerCase(Locale.ENGLISH), getEndpoint()});
            }
            future = scheduledExecutorService.scheduleAtFixedRate(this, getInitialDelay(), getDelay(), getTimeUnit());
        }
    }

    /**
     * Cancels the scheduled task (without interrupting a run in progress) and stops the super class.
     */
    @Override
    protected void doStop() throws Exception {
        if (future != null) {
            LOG.debug("This consumer is stopping, so cancelling scheduled task: {}", future);
            future.cancel(false);
        }
        super.doStop();
    }

    /**
     * Shuts down the thread pool, but only if it was created by this consumer, and then
     * shuts down the super class.
     */
    @Override
    protected void doShutdown() throws Exception {
        if (shutdownExecutor && scheduledExecutorService != null) {
            getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(scheduledExecutorService);
            scheduledExecutorService = null;
            future = null;
        }
        super.doShutdown();
    }

    @Override
    protected void doSuspend() throws Exception {
        // dont stop/cancel the future task since we just check in the run method
    }

    @Override
    public void onInit() throws Exception {
        // noop
    }

    /**
     * Resumes this consumer, or starts it if it could not be resumed, before a poll.
     *
     * @param timeout the timeout requested by the caller
     * @return the larger of the given timeout and {@link #getDelay()}
     */
    @Override
    public long beforePoll(long timeout) throws Exception {
        LOG.trace("Before poll {}", getEndpoint());
        // resume or start our self
        if (!ServiceHelper.resumeService(this)) {
            ServiceHelper.startService(this);
        }

        // ensure at least timeout is as long as one poll delay
        // NOTE(review): getDelay() is expressed in getTimeUnit(), this comparison assumes timeout
        // uses the same unit - confirm against the PollingConsumerPollingStrategy contract
        return Math.max(timeout, getDelay());
    }

    /**
     * Suspends this consumer, or stops it if it could not be suspended, after a poll.
     */
    @Override
    public void afterPoll() throws Exception {
        LOG.trace("After poll {}", getEndpoint());
        // suspend or stop our self
        if (!ServiceHelper.suspendService(this)) {
            ServiceHelper.stopService(this);
        }
    }

}