/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.impl;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.camel.CamelContext;
import org.apache.camel.Component;
import org.apache.camel.LoggingLevel;
import org.apache.camel.PollingConsumer;
import org.apache.camel.ResolveEndpointFailedException;
import org.apache.camel.spi.PollingConsumerPollStrategy;
import org.apache.camel.spi.ScheduledPollConsumerScheduler;
import org.apache.camel.spi.UriParam;
import org.apache.camel.util.CamelContextHelper;
import org.apache.camel.util.EndpointHelper;
import org.apache.camel.util.IntrospectionSupport;

/**
 * A base class for {@link org.apache.camel.Endpoint} which creates a {@link ScheduledPollConsumer}.
 * <p/>
 * The endpoint holds the scheduling related options (delays, time unit, scheduler, backoff, etc.)
 * and, when started, copies any option that was not explicitly configured into the consumer
 * properties map (see {@link #initConsumerProperties()}).
 *
 * @version
 */
public abstract class ScheduledPollEndpoint extends DefaultEndpoint {

    private static final String SPRING_SCHEDULER = "org.apache.camel.spring.pollingconsumer.SpringScheduledPollConsumerScheduler";
    private static final String QUARTZ_2_SCHEDULER = "org.apache.camel.pollconsumer.quartz2.QuartzScheduledPollConsumerScheduler";

    // if adding more options then align with org.apache.camel.impl.ScheduledPollConsumer
    @UriParam(optionalPrefix = "consumer.", defaultValue = "true", label = "consumer,scheduler",
            description = "Whether the scheduler should be auto started.")
    private boolean startScheduler = true;
    @UriParam(optionalPrefix = "consumer.", defaultValue = "1000", label = "consumer,scheduler",
            description = "Milliseconds before the first poll starts.")
    private long initialDelay = 1000;
    @UriParam(optionalPrefix = "consumer.", defaultValue = "500", label = "consumer,scheduler",
            description = "Milliseconds before the next poll.")
    private long delay = 500;
    @UriParam(optionalPrefix = "consumer.", defaultValue = "MILLISECONDS", label = "consumer,scheduler",
            description = "Time unit for initialDelay and delay options.")
    private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
    @UriParam(optionalPrefix = "consumer.", defaultValue = "true", label = "consumer,scheduler",
            description = "Controls if fixed delay or fixed rate is used. See ScheduledExecutorService in JDK for details.")
    private boolean useFixedDelay = true;
    @UriParam(optionalPrefix = "consumer.", label = "consumer,advanced",
            description = "A pluggable org.apache.camel.PollingConsumerPollingStrategy allowing you to provide your custom implementation"
                    + " to control error handling usually occurred during the poll operation before an Exchange have been created and being routed in Camel.")
    private PollingConsumerPollStrategy pollStrategy = new DefaultPollingConsumerPollStrategy();
    @UriParam(optionalPrefix = "consumer.", defaultValue = "TRACE", label = "consumer,scheduler",
            description = "The consumer logs a start/complete log line when it polls. This option allows you to configure the logging level for that.")
    private LoggingLevel runLoggingLevel = LoggingLevel.TRACE;
    @UriParam(optionalPrefix = "consumer.", label = "consumer",
            description = "If the polling consumer did not poll any files, you can enable this option to send an empty message (no body) instead.")
    private boolean sendEmptyMessageWhenIdle;
    @UriParam(optionalPrefix = "consumer.", label = "consumer,scheduler",
            description = "If greedy is enabled, then the ScheduledPollConsumer will run immediately again, if the previous run polled 1 or more messages.")
    private boolean greedy;
    @UriParam(optionalPrefix = "consumer.", enums = "none,spring,quartz2",
            defaultValue = "none", label = "consumer,scheduler", description = "To use a cron scheduler from either camel-spring or camel-quartz2 component")
    private ScheduledPollConsumerScheduler scheduler;
    private String schedulerName = "none"; // used when configuring scheduler using a string value
    @UriParam(prefix = "scheduler.", multiValue = true, label = "consumer,scheduler",
            description = "To configure additional properties when using a custom scheduler or any of the Quartz2, Spring based scheduler.")
    private Map<String, Object> schedulerProperties;
    @UriParam(optionalPrefix = "consumer.", label = "consumer,scheduler",
            description = "Allows for configuring a custom/shared thread pool to use for the consumer. By default each consumer has its own single threaded thread pool.")
    private ScheduledExecutorService scheduledExecutorService;
    @UriParam(optionalPrefix = "consumer.", label = "consumer,scheduler",
            description = "To let the scheduled polling consumer backoff if there has been a number of subsequent idles/errors in a row."
                    + " The multiplier is then the number of polls that will be skipped before the next actual attempt is happening again."
                    + " When this option is in use then backoffIdleThreshold and/or backoffErrorThreshold must also be configured.")
    private int backoffMultiplier;
    @UriParam(optionalPrefix = "consumer.", label = "consumer,scheduler",
            description = "The number of subsequent idle polls that should happen before the backoffMultipler should kick-in.")
    private int backoffIdleThreshold;
    @UriParam(optionalPrefix = "consumer.", label = "consumer,scheduler",
            description = "The number of subsequent error polls (failed due some error) that should happen before the backoffMultipler should kick-in.")
    private int backoffErrorThreshold;

    /**
     * Creates the endpoint for the given uri, owned by the given component.
     *
     * @param endpointUri the endpoint uri
     * @param component   the component that created this endpoint
     */
    protected ScheduledPollEndpoint(String endpointUri, Component component) {
        super(endpointUri, component);
    }

    /**
     * Creates the endpoint for the given uri and camel context.
     *
     * @param endpointUri the endpoint uri
     * @param context     the camel context
     */
    @Deprecated
    protected ScheduledPollEndpoint(String endpointUri, CamelContext context) {
        super(endpointUri, context);
    }

    /**
     * Creates the endpoint for the given uri.
     *
     * @param endpointUri the endpoint uri
     */
    @Deprecated
    protected ScheduledPollEndpoint(String endpointUri) {
        super(endpointUri);
    }

    /**
     * Creates the endpoint without any uri, component or context.
     */
    protected ScheduledPollEndpoint() {
    }

    /**
     * Configures the endpoint from the given options, and afterwards applies the
     * scheduled poll consumer specific handling (scheduler properties and scheduler lookup).
     *
     * @param options the options to configure
     */
    public void configureProperties(Map<String, Object> options) {
        super.configureProperties(options);
        configureScheduledPollConsumerProperties(options, getConsumerProperties());
    }

    /**
     * Extracts the <tt>scheduler.</tt> prefixed options and resolves the scheduler to use
     * when it has been configured by name.
     * <p/>
     * The scheduler is only resolved when no scheduler instance has been set yet and a scheduler
     * name is present. The name <tt>none</tt> means no scheduler, <tt>spring</tt> and <tt>quartz2</tt>
     * refer to the built-in schedulers which are loaded from the classpath, and any other name is
     * looked up as a mandatory {@link ScheduledPollConsumerScheduler} via the camel context.
     *
     * @param options            the options, from which the <tt>scheduler.</tt> prefixed entries are extracted
     * @param consumerProperties the consumer properties; not used by this implementation
     * @throws IllegalArgumentException if a built-in scheduler class cannot be loaded from the classpath
     */
    protected void configureScheduledPollConsumerProperties(Map<String, Object> options, Map<String, Object> consumerProperties) {
        // special for scheduled poll consumers as we want to allow end users to configure its options
        // from the URI parameters without the consumer. prefix
        Map<String, Object> schedulerProperties = IntrospectionSupport.extractProperties(options, "scheduler.");
        if (schedulerProperties != null && !schedulerProperties.isEmpty()) {
            setSchedulerProperties(schedulerProperties);
        }

        if (scheduler == null && schedulerName != null) {
            if ("none".equals(schedulerName)) {
                // no cron scheduler in use, and the scheduler is already null at this point
            } else if ("spring".equals(schedulerName)) {
                // special for scheduler if its "spring" or "quartz2"
                setScheduler(createBuiltInScheduler(SPRING_SCHEDULER, "camel-spring.jar"));
            } else if ("quartz2".equals(schedulerName)) {
                // special for scheduler if its "spring" or "quartz2"
                setScheduler(createBuiltInScheduler(QUARTZ_2_SCHEDULER, "camel-quartz2.jar"));
            } else {
                // must refer to a custom scheduler by the given name
                setScheduler(CamelContextHelper.mandatoryLookup(getCamelContext(), schedulerName, ScheduledPollConsumerScheduler.class));
            }
        }
    }

    /**
     * Loads one of the built-in schedulers from the classpath and creates a new instance of it
     * using the injector of the camel context.
     *
     * @param className the fully qualified class name of the scheduler
     * @param jarName   the name of the JAR expected to provide the class, used in the error message
     * @return the newly created scheduler
     * @throws IllegalArgumentException if the class cannot be found on the classpath
     */
    private ScheduledPollConsumerScheduler createBuiltInScheduler(String className, String jarName) {
        try {
            Class<? extends ScheduledPollConsumerScheduler> clazz = getCamelContext().getClassResolver().resolveMandatoryClass(className, ScheduledPollConsumerScheduler.class);
            return getCamelContext().getInjector().newInstance(clazz);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Cannot load " + className + " from classpath. Make sure " + jarName + " is on the classpath.", e);
        }
    }

    /**
     * Configures the given polling consumer from a copy of the consumer properties.
     * <p/>
     * The copy is first passed through
     * {@link #configureScheduledPollConsumerProperties(java.util.Map, java.util.Map)} and the remaining
     * entries are then set on the consumer. Unless the endpoint is lenient, any entry that is still
     * left in the copy afterwards is reported as an unknown parameter.
     *
     * @param consumer the polling consumer to configure
     * @throws ResolveEndpointFailedException if not lenient and there are parameters left which could not be set
     * @throws Exception can be thrown when setting the properties
     */
    @Override
    protected void configurePollingConsumer(PollingConsumer consumer) throws Exception {
        Map<String, Object> copy = new HashMap<String, Object>(getConsumerProperties());
        Map<String, Object> throwaway = new HashMap<String, Object>();

        // filter out unwanted options which is intended for the scheduled poll consumer
        // as these options are not supported on the polling consumer
        // NOTE(review): this also runs the scheduler setup of that method against this endpoint - confirm that is intended
        configureScheduledPollConsumerProperties(copy, throwaway);

        // set reference properties first as they use # syntax that fools the regular properties setter
        EndpointHelper.setReferenceProperties(getCamelContext(), consumer, copy);
        EndpointHelper.setProperties(getCamelContext(), consumer, copy);

        if (!isLenientProperties() && !copy.isEmpty()) {
            throw new ResolveEndpointFailedException(this.getEndpointUri(), "There are " + copy.size()
                    + " parameters that couldn't be set on the endpoint polling consumer."
                    + " Check the uri if the parameters are spelt correctly and that they are properties of the endpoint."
                    + " Unknown consumer parameters=[" + copy + "]");
        }
    }

    /**
     * Adds the current value of each scheduled poll option of this endpoint to the consumer
     * properties, but only for the options which are not already present in the consumer properties.
     * <p/>
     * Notice that options whose current value is <tt>null</tt> (such as a scheduler which has not
     * been configured) are added with a <tt>null</tt> value.
     */
    protected void initConsumerProperties() {
        // must setup consumer properties before we are ready to start
        Map<String, Object> options = getConsumerProperties();
        if (!options.containsKey("startScheduler")) {
            options.put("startScheduler", isStartScheduler());
        }
        if (!options.containsKey("initialDelay")) {
            options.put("initialDelay", getInitialDelay());
        }
        if (!options.containsKey("delay")) {
            options.put("delay", getDelay());
        }
        if (!options.containsKey("timeUnit")) {
            options.put("timeUnit", getTimeUnit());
        }
        if (!options.containsKey("useFixedDelay")) {
            options.put("useFixedDelay", isUseFixedDelay());
        }
        if (!options.containsKey("pollStrategy")) {
            options.put("pollStrategy", getPollStrategy());
        }
        if (!options.containsKey("runLoggingLevel")) {
            options.put("runLoggingLevel", getRunLoggingLevel());
        }
        if (!options.containsKey("sendEmptyMessageWhenIdle")) {
            options.put("sendEmptyMessageWhenIdle", isSendEmptyMessageWhenIdle());
        }
        if (!options.containsKey("greedy")) {
            options.put("greedy", isGreedy());
        }
        if (!options.containsKey("scheduler")) {
            options.put("scheduler", getScheduler());
        }
        if (!options.containsKey("schedulerProperties")) {
            options.put("schedulerProperties", getSchedulerProperties());
        }
        if (!options.containsKey("scheduledExecutorService")) {
            options.put("scheduledExecutorService", getScheduledExecutorService());
        }
        if (!options.containsKey("backoffMultiplier")) {
            options.put("backoffMultiplier", getBackoffMultiplier());
        }
        if (!options.containsKey("backoffIdleThreshold")) {
            options.put("backoffIdleThreshold", getBackoffIdleThreshold());
        }
        if (!options.containsKey("backoffErrorThreshold")) {
            options.put("backoffErrorThreshold", getBackoffErrorThreshold());
        }
    }

    /**
     * Initializes the consumer properties before delegating to the super class to start.
     */
    @Override
    protected void doStart() throws Exception {
        initConsumerProperties();
        super.doStart();
    }

    /**
     * Delegates to the super class to stop; there is nothing additional to stop here.
     */
    @Override
    protected void doStop() throws Exception {
        super.doStop();
        // noop
    }

    public boolean isStartScheduler() {
        return startScheduler;
    }

    /**
     * Whether the scheduler should be auto started.
     */
    public void setStartScheduler(boolean startScheduler) {
        this.startScheduler = startScheduler;
    }

    public long getInitialDelay() {
        return initialDelay;
    }

    /**
     * Milliseconds before the first poll starts.
     */
    public void setInitialDelay(long initialDelay) {
        this.initialDelay = initialDelay;
    }

    public long getDelay() {
        return delay;
    }

    /**
     * Milliseconds before the next poll.
     */
    public void setDelay(long delay) {
        this.delay = delay;
    }

    public TimeUnit getTimeUnit() {
        return timeUnit;
    }

    /**
     * Time unit for initialDelay and delay options.
     */
    public void setTimeUnit(TimeUnit timeUnit) {
        this.timeUnit = timeUnit;
    }

    public boolean isUseFixedDelay() {
        return useFixedDelay;
    }

    /**
     * Controls if fixed delay or fixed rate is used. See ScheduledExecutorService in JDK for details.
     */
    public void setUseFixedDelay(boolean useFixedDelay) {
        this.useFixedDelay = useFixedDelay;
    }

    public PollingConsumerPollStrategy getPollStrategy() {
        return pollStrategy;
    }

    /**
     * A pluggable org.apache.camel.spi.PollingConsumerPollStrategy allowing you to provide your custom implementation
     * to control error handling usually occurred during the poll operation before an Exchange have been created
     * and being routed in Camel. In other words the error occurred while the polling was gathering information,
     * for instance access to a file network failed so Camel cannot access it to scan for files.
     * The default implementation will log the caused exception at WARN level and ignore it.
     */
    public void setPollStrategy(PollingConsumerPollStrategy pollStrategy) {
        this.pollStrategy = pollStrategy;
        // we are allowed to change poll strategy
    }

    public LoggingLevel getRunLoggingLevel() {
        return runLoggingLevel;
    }

    /**
     * The consumer logs a start/complete log line when it polls. This option allows you to configure the logging level for that.
     */
    public void setRunLoggingLevel(LoggingLevel runLoggingLevel) {
        this.runLoggingLevel = runLoggingLevel;
    }

    public boolean isSendEmptyMessageWhenIdle() {
        return sendEmptyMessageWhenIdle;
    }

    /**
     * If the polling consumer did not poll any files, you can enable this option to send an empty message (no body) instead.
     */
    public void setSendEmptyMessageWhenIdle(boolean sendEmptyMessageWhenIdle) {
        this.sendEmptyMessageWhenIdle = sendEmptyMessageWhenIdle;
    }

    public boolean isGreedy() {
        return greedy;
    }

    /**
     * If greedy is enabled, then the ScheduledPollConsumer will run immediately again, if the previous run polled 1 or more messages.
     */
    public void setGreedy(boolean greedy) {
        this.greedy = greedy;
    }

    public ScheduledPollConsumerScheduler getScheduler() {
        return scheduler;
    }

    /**
     * Allow to plugin a custom org.apache.camel.spi.ScheduledPollConsumerScheduler to use as the scheduler for
     * firing when the polling consumer runs. The default implementation uses the ScheduledExecutorService and
     * there is a Quartz2, and Spring based which supports CRON expressions.
     *
     * Notice: If using a custom scheduler then the options for initialDelay, useFixedDelay, timeUnit,
     * and scheduledExecutorService may not be in use. Use the text quartz2 to refer to use the Quartz2 scheduler;
     * and use the text spring to use the Spring based; and use the text #myScheduler to refer to a custom scheduler
     * by its id in the Registry. See Quartz2 page for an example.
     */
    public void setScheduler(ScheduledPollConsumerScheduler scheduler) {
        this.scheduler = scheduler;
    }

    /**
     * Allow to plugin a custom org.apache.camel.spi.ScheduledPollConsumerScheduler to use as the scheduler for
     * firing when the polling consumer runs. This option is used for referring to one of the built-in schedulers
     * either <tt>spring</tt>, or <tt>quartz2</tt>. Using <tt>none</tt> refers to no scheduler to be used.
     * <p/>
     * This setter only stores the name; the scheduler itself is resolved when the endpoint properties are configured.
     */
    public void setScheduler(String schedulerName) {
        this.schedulerName = schedulerName;
    }

    public Map<String, Object> getSchedulerProperties() {
        return schedulerProperties;
    }

    /**
     * To configure additional properties when using a custom scheduler or any of the Quartz2, Spring based scheduler.
     */
    public void setSchedulerProperties(Map<String, Object> schedulerProperties) {
        this.schedulerProperties = schedulerProperties;
    }

    public ScheduledExecutorService getScheduledExecutorService() {
        return scheduledExecutorService;
    }

    /**
     * Allows for configuring a custom/shared thread pool to use for the consumer.
     * By default each consumer has its own single threaded thread pool.
     * This option allows you to share a thread pool among multiple consumers.
     */
    public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
        this.scheduledExecutorService = scheduledExecutorService;
    }

    public int getBackoffMultiplier() {
        return backoffMultiplier;
    }

    /**
     * To let the scheduled polling consumer backoff if there has been a number of subsequent idles/errors in a row.
     * The multiplier is then the number of polls that will be skipped before the next actual attempt is happening again.
     * When this option is in use then backoffIdleThreshold and/or backoffErrorThreshold must also be configured.
     */
    public void setBackoffMultiplier(int backoffMultiplier) {
        this.backoffMultiplier = backoffMultiplier;
    }

    public int getBackoffIdleThreshold() {
        return backoffIdleThreshold;
    }

    /**
     * The number of subsequent idle polls that should happen before the backoffMultiplier should kick-in.
     */
    public void setBackoffIdleThreshold(int backoffIdleThreshold) {
        this.backoffIdleThreshold = backoffIdleThreshold;
    }

    public int getBackoffErrorThreshold() {
        return backoffErrorThreshold;
    }

    /**
     * The number of subsequent error polls (failed due some error) that should happen before the backoffMultiplier should kick-in.
     */
    public void setBackoffErrorThreshold(int backoffErrorThreshold) {
        this.backoffErrorThreshold = backoffErrorThreshold;
    }

}