001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.impl; 018 019import java.util.HashMap; 020import java.util.Map; 021import java.util.concurrent.ScheduledExecutorService; 022import java.util.concurrent.TimeUnit; 023 024import org.apache.camel.Endpoint; 025import org.apache.camel.Exchange; 026import org.apache.camel.FailedToCreateConsumerException; 027import org.apache.camel.LoggingLevel; 028import org.apache.camel.PollingConsumerPollingStrategy; 029import org.apache.camel.Processor; 030import org.apache.camel.Suspendable; 031import org.apache.camel.SuspendableService; 032import org.apache.camel.spi.PollingConsumerPollStrategy; 033import org.apache.camel.spi.ScheduledPollConsumerScheduler; 034import org.apache.camel.util.IntrospectionSupport; 035import org.apache.camel.util.ObjectHelper; 036import org.apache.camel.util.ServiceHelper; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040/** 041 * A useful base class for any consumer which is polling based 042 */ 043public abstract class ScheduledPollConsumer extends DefaultConsumer implements Runnable, Suspendable, PollingConsumerPollingStrategy { 044 private static final Logger LOG = LoggerFactory.getLogger(ScheduledPollConsumer.class); 045 046 private ScheduledPollConsumerScheduler scheduler; 047 private ScheduledExecutorService scheduledExecutorService; 048 049 // if adding more options then align with org.apache.camel.impl.ScheduledPollEndpoint 050 private boolean startScheduler = true; 051 private long initialDelay = 1000; 052 private long delay = 500; 053 private TimeUnit timeUnit = TimeUnit.MILLISECONDS; 054 private boolean useFixedDelay = true; 055 private PollingConsumerPollStrategy pollStrategy = new DefaultPollingConsumerPollStrategy(); 056 private LoggingLevel runLoggingLevel = LoggingLevel.TRACE; 057 private boolean sendEmptyMessageWhenIdle; 058 private boolean greedy; 059 private int backoffMultiplier; 060 private int backoffIdleThreshold; 061 private int backoffErrorThreshold; 062 private Map<String, Object> schedulerProperties; 063 064 // state during running 065 private volatile boolean polling; 066 private volatile int backoffCounter; 067 private volatile long idleCounter; 068 private volatile long errorCounter; 069 070 public ScheduledPollConsumer(Endpoint endpoint, Processor processor) { 071 super(endpoint, processor); 072 } 073 074 public ScheduledPollConsumer(Endpoint endpoint, Processor processor, ScheduledExecutorService scheduledExecutorService) { 075 super(endpoint, processor); 076 // we have been given an existing thread pool, so we should not manage its lifecycle 077 // so we should keep shutdownExecutor as false 078 this.scheduledExecutorService = scheduledExecutorService; 079 ObjectHelper.notNull(scheduledExecutorService, "scheduledExecutorService"); 080 } 081 082 /** 083 * Invoked whenever we should be polled 084 */ 085 public void run() { 086 // avoid this thread to throw exceptions because the thread pool wont re-schedule a new thread 087 try { 088 // log starting 089 if (LoggingLevel.ERROR == runLoggingLevel) { 090 LOG.error("Scheduled task started on: {}", this.getEndpoint()); 091 } else if (LoggingLevel.WARN == runLoggingLevel) { 092 LOG.warn("Scheduled task started on: {}", this.getEndpoint()); 093 } else if (LoggingLevel.INFO == runLoggingLevel) { 094 LOG.info("Scheduled task started on: {}", this.getEndpoint()); 095 } else if (LoggingLevel.DEBUG == runLoggingLevel) { 096 LOG.debug("Scheduled task started on: {}", this.getEndpoint()); 097 } else { 098 LOG.trace("Scheduled task started on: {}", this.getEndpoint()); 099 } 100 101 // execute scheduled task 102 doRun(); 103 104 // log completed 105 if (LoggingLevel.ERROR == runLoggingLevel) { 106 LOG.error("Scheduled task completed on: {}", this.getEndpoint()); 107 } else if (LoggingLevel.WARN == runLoggingLevel) { 108 LOG.warn("Scheduled task completed on: {}", this.getEndpoint()); 109 } else if (LoggingLevel.INFO == runLoggingLevel) { 110 LOG.info("Scheduled task completed on: {}", this.getEndpoint()); 111 } else if (LoggingLevel.DEBUG == runLoggingLevel) { 112 LOG.debug("Scheduled task completed on: {}", this.getEndpoint()); 113 } else { 114 LOG.trace("Scheduled task completed on: {}", this.getEndpoint()); 115 } 116 117 } catch (Error e) { 118 // must catch Error, to ensure the task is re-scheduled 119 LOG.error("Error occurred during running scheduled task on: " + this.getEndpoint() + ", due: " + e.getMessage(), e); 120 } 121 } 122 123 private void doRun() { 124 if (isSuspended()) { 125 LOG.trace("Cannot start to poll: {} as its suspended", this.getEndpoint()); 126 return; 127 } 128 129 // should we backoff if its enabled, and either the idle or error counter is > the threshold 130 if (backoffMultiplier > 0 131 // either idle or error threshold could be not in use, so check for that and use MAX_VALUE if not in use 132 && (idleCounter >= (backoffIdleThreshold > 0 ? backoffIdleThreshold : Integer.MAX_VALUE)) 133 || errorCounter >= (backoffErrorThreshold > 0 ? backoffErrorThreshold : Integer.MAX_VALUE)) { 134 if (backoffCounter++ < backoffMultiplier) { 135 // yes we should backoff 136 if (idleCounter > 0) { 137 LOG.debug("doRun() backoff due subsequent {} idles (backoff at {}/{})", new Object[]{idleCounter, backoffCounter, backoffMultiplier}); 138 } else { 139 LOG.debug("doRun() backoff due subsequent {} errors (backoff at {}/{})", new Object[]{errorCounter, backoffCounter, backoffMultiplier}); 140 } 141 return; 142 } else { 143 // we are finished with backoff so reset counters 144 idleCounter = 0; 145 errorCounter = 0; 146 backoffCounter = 0; 147 LOG.trace("doRun() backoff finished, resetting counters."); 148 } 149 } 150 151 int retryCounter = -1; 152 boolean done = false; 153 Throwable cause = null; 154 int polledMessages = 0; 155 156 while (!done) { 157 try { 158 cause = null; 159 // eager assume we are done 160 done = true; 161 if (isPollAllowed()) { 162 163 if (retryCounter == -1) { 164 LOG.trace("Starting to poll: {}", this.getEndpoint()); 165 } else { 166 LOG.debug("Retrying attempt {} to poll: {}", retryCounter, this.getEndpoint()); 167 } 168 169 // mark we are polling which should also include the begin/poll/commit 170 polling = true; 171 try { 172 boolean begin = pollStrategy.begin(this, getEndpoint()); 173 if (begin) { 174 retryCounter++; 175 polledMessages = poll(); 176 LOG.trace("Polled {} messages", polledMessages); 177 178 if (polledMessages == 0 && isSendEmptyMessageWhenIdle()) { 179 // send an "empty" exchange 180 processEmptyMessage(); 181 } 182 183 pollStrategy.commit(this, getEndpoint(), polledMessages); 184 185 if (polledMessages > 0 && isGreedy()) { 186 done = false; 187 retryCounter = -1; 188 LOG.trace("Greedy polling after processing {} messages", polledMessages); 189 } 190 } else { 191 LOG.debug("Cannot begin polling as pollStrategy returned false: {}", pollStrategy); 192 } 193 } finally { 194 polling = false; 195 } 196 } 197 198 LOG.trace("Finished polling: {}", this.getEndpoint()); 199 } catch (Exception e) { 200 try { 201 boolean retry = pollStrategy.rollback(this, getEndpoint(), retryCounter, e); 202 if (retry) { 203 // do not set cause as we retry 204 done = false; 205 } else { 206 cause = e; 207 done = true; 208 } 209 } catch (Throwable t) { 210 cause = t; 211 done = true; 212 } 213 } catch (Throwable t) { 214 cause = t; 215 done = true; 216 } 217 218 if (cause != null && isRunAllowed()) { 219 // let exception handler deal with the caused exception 220 // but suppress this during shutdown as the logs may get flooded with exceptions during shutdown/forced shutdown 221 try { 222 getExceptionHandler().handleException("Consumer " + this + " failed polling endpoint: " + getEndpoint() 223 + ". Will try again at next poll", cause); 224 } catch (Throwable e) { 225 LOG.warn("Error handling exception. This exception will be ignored.", e); 226 } 227 } 228 } 229 230 if (cause != null) { 231 idleCounter = 0; 232 errorCounter++; 233 } else { 234 idleCounter = polledMessages == 0 ? ++idleCounter : 0; 235 errorCounter = 0; 236 } 237 LOG.trace("doRun() done with idleCounter={}, errorCounter={}", idleCounter, errorCounter); 238 239 // avoid this thread to throw exceptions because the thread pool wont re-schedule a new thread 240 } 241 242 /** 243 * No messages to poll so send an empty message instead. 244 * 245 * @throws Exception is thrown if error processing the empty message. 246 */ 247 protected void processEmptyMessage() throws Exception { 248 Exchange exchange = getEndpoint().createExchange(); 249 log.debug("Sending empty message as there were no messages from polling: {}", this.getEndpoint()); 250 getProcessor().process(exchange); 251 } 252 253 // Properties 254 // ------------------------------------------------------------------------- 255 256 protected boolean isPollAllowed() { 257 return isRunAllowed() && !isSuspended(); 258 } 259 260 /** 261 * Whether polling is currently in progress 262 */ 263 protected boolean isPolling() { 264 return polling; 265 } 266 267 public ScheduledPollConsumerScheduler getScheduler() { 268 return scheduler; 269 } 270 271 public void setScheduler(ScheduledPollConsumerScheduler scheduler) { 272 this.scheduler = scheduler; 273 } 274 275 public Map<String, Object> getSchedulerProperties() { 276 return schedulerProperties; 277 } 278 279 public void setSchedulerProperties(Map<String, Object> schedulerProperties) { 280 this.schedulerProperties = schedulerProperties; 281 } 282 283 public long getInitialDelay() { 284 return initialDelay; 285 } 286 287 public void setInitialDelay(long initialDelay) { 288 this.initialDelay = initialDelay; 289 } 290 291 public long getDelay() { 292 return delay; 293 } 294 295 public void setDelay(long delay) { 296 this.delay = delay; 297 } 298 299 public TimeUnit getTimeUnit() { 300 return timeUnit; 301 } 302 303 public void setTimeUnit(TimeUnit timeUnit) { 304 this.timeUnit = timeUnit; 305 } 306 307 public boolean isUseFixedDelay() { 308 return useFixedDelay; 309 } 310 311 public void setUseFixedDelay(boolean useFixedDelay) { 312 this.useFixedDelay = useFixedDelay; 313 } 314 315 public LoggingLevel getRunLoggingLevel() { 316 return runLoggingLevel; 317 } 318 319 public void setRunLoggingLevel(LoggingLevel runLoggingLevel) { 320 this.runLoggingLevel = runLoggingLevel; 321 } 322 323 public PollingConsumerPollStrategy getPollStrategy() { 324 return pollStrategy; 325 } 326 327 public void setPollStrategy(PollingConsumerPollStrategy pollStrategy) { 328 this.pollStrategy = pollStrategy; 329 } 330 331 public boolean isStartScheduler() { 332 return startScheduler; 333 } 334 335 public void setStartScheduler(boolean startScheduler) { 336 this.startScheduler = startScheduler; 337 } 338 339 public void setSendEmptyMessageWhenIdle(boolean sendEmptyMessageWhenIdle) { 340 this.sendEmptyMessageWhenIdle = sendEmptyMessageWhenIdle; 341 } 342 343 public boolean isSendEmptyMessageWhenIdle() { 344 return sendEmptyMessageWhenIdle; 345 } 346 347 public boolean isGreedy() { 348 return greedy; 349 } 350 351 public void setGreedy(boolean greedy) { 352 this.greedy = greedy; 353 } 354 355 public int getBackoffCounter() { 356 return backoffCounter; 357 } 358 359 public int getBackoffMultiplier() { 360 return backoffMultiplier; 361 } 362 363 public void setBackoffMultiplier(int backoffMultiplier) { 364 this.backoffMultiplier = backoffMultiplier; 365 } 366 367 public int getBackoffIdleThreshold() { 368 return backoffIdleThreshold; 369 } 370 371 public void setBackoffIdleThreshold(int backoffIdleThreshold) { 372 this.backoffIdleThreshold = backoffIdleThreshold; 373 } 374 375 public int getBackoffErrorThreshold() { 376 return backoffErrorThreshold; 377 } 378 379 public void setBackoffErrorThreshold(int backoffErrorThreshold) { 380 this.backoffErrorThreshold = backoffErrorThreshold; 381 } 382 383 public ScheduledExecutorService getScheduledExecutorService() { 384 return scheduledExecutorService; 385 } 386 387 public boolean isSchedulerStarted() { 388 return scheduler.isSchedulerStarted(); 389 } 390 391 public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) { 392 this.scheduledExecutorService = scheduledExecutorService; 393 } 394 395 // Implementation methods 396 // ------------------------------------------------------------------------- 397 398 /** 399 * The polling method which is invoked periodically to poll this consumer 400 * 401 * @return number of messages polled, will be <tt>0</tt> if no message was polled at all. 402 * @throws Exception can be thrown if an exception occurred during polling 403 */ 404 protected abstract int poll() throws Exception; 405 406 @Override 407 protected void doStart() throws Exception { 408 super.doStart(); 409 410 // validate that if backoff multiplier is in use, the threshold values is set correctly 411 if (backoffMultiplier > 0) { 412 if (backoffIdleThreshold <= 0 && backoffErrorThreshold <= 0) { 413 throw new IllegalArgumentException("backoffIdleThreshold and/or backoffErrorThreshold must be configured to a positive value when using backoffMultiplier"); 414 } 415 LOG.debug("Using backoff[multiplier={}, idleThreshold={}, errorThreshold={}] on {}", new Object[]{backoffMultiplier, backoffIdleThreshold, backoffErrorThreshold, getEndpoint()}); 416 } 417 418 if (scheduler == null) { 419 scheduler = new DefaultScheduledPollConsumerScheduler(scheduledExecutorService); 420 } 421 scheduler.setCamelContext(getEndpoint().getCamelContext()); 422 scheduler.onInit(this); 423 scheduler.scheduleTask(this); 424 425 // configure scheduler with options from this consumer 426 Map<String, Object> properties = new HashMap<String, Object>(); 427 IntrospectionSupport.getProperties(this, properties, null); 428 IntrospectionSupport.setProperties(getEndpoint().getCamelContext().getTypeConverter(), scheduler, properties); 429 if (schedulerProperties != null && !schedulerProperties.isEmpty()) { 430 // need to use a copy in case the consumer is restarted so we keep the properties 431 Map<String, Object> copy = new HashMap<String, Object>(schedulerProperties); 432 IntrospectionSupport.setProperties(getEndpoint().getCamelContext().getTypeConverter(), scheduler, copy); 433 if (copy.size() > 0) { 434 throw new FailedToCreateConsumerException(getEndpoint(), "There are " + copy.size() 435 + " scheduler parameters that couldn't be set on the endpoint." 436 + " Check the uri if the parameters are spelt correctly and that they are properties of the endpoint." 437 + " Unknown parameters=[" + copy + "]"); 438 } 439 } 440 441 ObjectHelper.notNull(scheduler, "scheduler", this); 442 ObjectHelper.notNull(pollStrategy, "pollStrategy", this); 443 444 ServiceHelper.startService(scheduler); 445 446 if (isStartScheduler()) { 447 startScheduler(); 448 } 449 } 450 451 /** 452 * Starts the scheduler. 453 * <p/> 454 * If the scheduler is already started, then this is a noop method call. 455 */ 456 public void startScheduler() { 457 scheduler.startScheduler(); 458 } 459 460 @Override 461 protected void doStop() throws Exception { 462 if (scheduler != null) { 463 scheduler.unscheduleTask(); 464 ServiceHelper.stopAndShutdownServices(scheduler); 465 } 466 467 // clear counters 468 backoffCounter = 0; 469 idleCounter = 0; 470 errorCounter = 0; 471 472 super.doStop(); 473 } 474 475 @Override 476 protected void doShutdown() throws Exception { 477 ServiceHelper.stopAndShutdownServices(scheduler); 478 super.doShutdown(); 479 } 480 481 @Override 482 protected void doSuspend() throws Exception { 483 // dont stop/cancel the future task since we just check in the run method 484 } 485 486 @Override 487 public void onInit() throws Exception { 488 // make sure the scheduler is starter 489 startScheduler = true; 490 } 491 492 @Override 493 public long beforePoll(long timeout) throws Exception { 494 LOG.trace("Before poll {}", getEndpoint()); 495 // resume or start our self 496 if (!ServiceHelper.resumeService(this)) { 497 ServiceHelper.startService(this); 498 } 499 500 // ensure at least timeout is as long as one poll delay 501 return Math.max(timeout, getDelay()); 502 } 503 504 @Override 505 public void afterPoll() throws Exception { 506 LOG.trace("After poll {}", getEndpoint()); 507 // suspend or stop our self 508 if (!ServiceHelper.suspendService(this)) { 509 ServiceHelper.stopService(this); 510 } 511 } 512 513}