001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import java.util.HashMap;
020import java.util.Map;
021import java.util.concurrent.ScheduledExecutorService;
022import java.util.concurrent.TimeUnit;
023
024import org.apache.camel.CamelContext;
025import org.apache.camel.Component;
026import org.apache.camel.LoggingLevel;
027import org.apache.camel.PollingConsumer;
028import org.apache.camel.ResolveEndpointFailedException;
029import org.apache.camel.spi.PollingConsumerPollStrategy;
030import org.apache.camel.spi.ScheduledPollConsumerScheduler;
031import org.apache.camel.spi.UriParam;
032import org.apache.camel.util.CamelContextHelper;
033import org.apache.camel.util.EndpointHelper;
034import org.apache.camel.util.IntrospectionSupport;
035
036/**
037 * A base class for {@link org.apache.camel.Endpoint} which creates a {@link ScheduledPollConsumer}
038 *
039 * @version
040 */
041public abstract class ScheduledPollEndpoint extends DefaultEndpoint {
042
043    private static final String SPRING_SCHEDULER = "org.apache.camel.spring.pollingconsumer.SpringScheduledPollConsumerScheduler";
044    private static final String QUARTZ_2_SCHEDULER = "org.apache.camel.pollconsumer.quartz2.QuartzScheduledPollConsumerScheduler";
045
046    // if adding more options then align with org.apache.camel.impl.ScheduledPollConsumer
047    @UriParam(optionalPrefix = "consumer.", defaultValue = "true", label = "consumer,scheduler",
048            description = "Whether the scheduler should be auto started.")
049    private boolean startScheduler = true;
050    @UriParam(optionalPrefix = "consumer.", defaultValue = "1000", label = "consumer,scheduler",
051            description = "Milliseconds before the first poll starts.")
052    private long initialDelay = 1000;
053    @UriParam(optionalPrefix = "consumer.", defaultValue = "500", label = "consumer,scheduler",
054            description = "Milliseconds before the next poll.")
055    private long delay = 500;
056    @UriParam(optionalPrefix = "consumer.", defaultValue = "MILLISECONDS", label = "consumer,scheduler",
057            description = "Time unit for initialDelay and delay options.")
058    private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
059    @UriParam(optionalPrefix = "consumer.", defaultValue = "true", label = "consumer,scheduler",
060            description = "Controls if fixed delay or fixed rate is used. See ScheduledExecutorService in JDK for details.")
061    private boolean useFixedDelay = true;
062    @UriParam(optionalPrefix = "consumer.", label = "consumer,advanced",
063            description = "A pluggable org.apache.camel.PollingConsumerPollingStrategy allowing you to provide your custom implementation"
064                    + " to control error handling usually occurred during the poll operation before an Exchange have been created and being routed in Camel.")
065    private PollingConsumerPollStrategy pollStrategy = new DefaultPollingConsumerPollStrategy();
066    @UriParam(optionalPrefix = "consumer.", defaultValue = "TRACE", label = "consumer,scheduler",
067            description = "The consumer logs a start/complete log line when it polls. This option allows you to configure the logging level for that.")
068    private LoggingLevel runLoggingLevel = LoggingLevel.TRACE;
069    @UriParam(optionalPrefix = "consumer.", label = "consumer",
070            description = "If the polling consumer did not poll any files, you can enable this option to send an empty message (no body) instead.")
071    private boolean sendEmptyMessageWhenIdle;
072    @UriParam(optionalPrefix = "consumer.", label = "consumer,scheduler",
073            description = "If greedy is enabled, then the ScheduledPollConsumer will run immediately again, if the previous run polled 1 or more messages.")
074    private boolean greedy;
075    @UriParam(optionalPrefix = "consumer.", enums = "none,spring,quartz2",
076            defaultValue = "none", label = "consumer,scheduler", description = "To use a cron scheduler from either camel-spring or camel-quartz2 component")
077    private ScheduledPollConsumerScheduler scheduler;
078    private String schedulerName = "none"; // used when configuring scheduler using a string value
079    @UriParam(prefix = "scheduler.", multiValue = true, label = "consumer,scheduler",
080            description = "To configure additional properties when using a custom scheduler or any of the Quartz2, Spring based scheduler.")
081    private Map<String, Object> schedulerProperties;
082    @UriParam(optionalPrefix = "consumer.", label = "consumer,scheduler",
083            description = "Allows for configuring a custom/shared thread pool to use for the consumer. By default each consumer has its own single threaded thread pool.")
084    private ScheduledExecutorService scheduledExecutorService;
085    @UriParam(optionalPrefix = "consumer.", label = "consumer,scheduler",
086            description = "To let the scheduled polling consumer backoff if there has been a number of subsequent idles/errors in a row."
087                    + " The multiplier is then the number of polls that will be skipped before the next actual attempt is happening again."
088                    + " When this option is in use then backoffIdleThreshold and/or backoffErrorThreshold must also be configured.")
089    private int backoffMultiplier;
090    @UriParam(optionalPrefix = "consumer.", label = "consumer,scheduler",
091            description = "The number of subsequent idle polls that should happen before the backoffMultipler should kick-in.")
092    private int backoffIdleThreshold;
093    @UriParam(optionalPrefix = "consumer.", label = "consumer,scheduler",
094            description = "The number of subsequent error polls (failed due some error) that should happen before the backoffMultipler should kick-in.")
095    private int backoffErrorThreshold;
096
097    protected ScheduledPollEndpoint(String endpointUri, Component component) {
098        super(endpointUri, component);
099    }
100
101    @Deprecated
102    protected ScheduledPollEndpoint(String endpointUri, CamelContext context) {
103        super(endpointUri, context);
104    }
105
106    @Deprecated
107    protected ScheduledPollEndpoint(String endpointUri) {
108        super(endpointUri);
109    }
110
111    protected ScheduledPollEndpoint() {
112    }
113
114    public void configureProperties(Map<String, Object> options) {
115        super.configureProperties(options);
116        configureScheduledPollConsumerProperties(options, getConsumerProperties());
117    }
118
119    protected void configureScheduledPollConsumerProperties(Map<String, Object> options, Map<String, Object> consumerProperties) {
120        // special for scheduled poll consumers as we want to allow end users to configure its options
121        // from the URI parameters without the consumer. prefix
122        Map<String, Object> schedulerProperties = IntrospectionSupport.extractProperties(options, "scheduler.");
123        if (schedulerProperties != null && !schedulerProperties.isEmpty()) {
124            setSchedulerProperties(schedulerProperties);
125        }
126
127        if (scheduler == null && schedulerName != null) {
128            if ("none".equals(schedulerName)) {
129                // no cron scheduler in use
130                scheduler = null;
131            } else if ("spring".equals(schedulerName)) {
132                // special for scheduler if its "spring" or "quartz2"
133                try {
134                    Class<? extends ScheduledPollConsumerScheduler> clazz = getCamelContext().getClassResolver().resolveMandatoryClass(SPRING_SCHEDULER, ScheduledPollConsumerScheduler.class);
135                    setScheduler(getCamelContext().getInjector().newInstance(clazz));
136                } catch (ClassNotFoundException e) {
137                    throw new IllegalArgumentException("Cannot load " + SPRING_SCHEDULER + " from classpath. Make sure camel-spring.jar is on the classpath.", e);
138                }
139            } else if ("quartz2".equals(schedulerName)) {
140                // special for scheduler if its "spring" or "quartz2"
141                try {
142                    Class<? extends ScheduledPollConsumerScheduler> clazz = getCamelContext().getClassResolver().resolveMandatoryClass(QUARTZ_2_SCHEDULER, ScheduledPollConsumerScheduler.class);
143                    setScheduler(getCamelContext().getInjector().newInstance(clazz));
144                } catch (ClassNotFoundException e) {
145                    throw new IllegalArgumentException("Cannot load " + QUARTZ_2_SCHEDULER + " from classpath. Make sure camel-quarz2.jar is on the classpath.", e);
146                }
147            } else {
148                // must refer to a custom scheduler by the given name
149                setScheduler(CamelContextHelper.mandatoryLookup(getCamelContext(), schedulerName, ScheduledPollConsumerScheduler.class));
150            }
151        }
152    }
153
154    @Override
155    protected void configurePollingConsumer(PollingConsumer consumer) throws Exception {
156        Map<String, Object> copy = new HashMap<String, Object>(getConsumerProperties());
157        Map<String, Object> throwaway = new HashMap<String, Object>();
158
159        // filter out unwanted options which is intended for the scheduled poll consumer
160        // as these options are not supported on the polling consumer
161        configureScheduledPollConsumerProperties(copy, throwaway);
162
163        // set reference properties first as they use # syntax that fools the regular properties setter
164        EndpointHelper.setReferenceProperties(getCamelContext(), consumer, copy);
165        EndpointHelper.setProperties(getCamelContext(), consumer, copy);
166
167        if (!isLenientProperties() && copy.size() > 0) {
168            throw new ResolveEndpointFailedException(this.getEndpointUri(), "There are " + copy.size()
169                    + " parameters that couldn't be set on the endpoint polling consumer."
170                    + " Check the uri if the parameters are spelt correctly and that they are properties of the endpoint."
171                    + " Unknown consumer parameters=[" + copy + "]");
172        }
173    }
174
175    protected void initConsumerProperties() {
176        // must setup consumer properties before we are ready to start
177        Map<String, Object> options = getConsumerProperties();
178        if (!options.containsKey("startScheduler")) {
179            options.put("startScheduler", isStartScheduler());
180        }
181        if (!options.containsKey("initialDelay")) {
182            options.put("initialDelay", getInitialDelay());
183        }
184        if (!options.containsKey("delay")) {
185            options.put("delay", getDelay());
186        }
187        if (!options.containsKey("timeUnit")) {
188            options.put("timeUnit", getTimeUnit());
189        }
190        if (!options.containsKey("useFixedDelay")) {
191            options.put("useFixedDelay", isUseFixedDelay());
192        }
193        if (!options.containsKey("pollStrategy")) {
194            options.put("pollStrategy", getPollStrategy());
195        }
196        if (!options.containsKey("runLoggingLevel")) {
197            options.put("runLoggingLevel", getRunLoggingLevel());
198        }
199        if (!options.containsKey("sendEmptyMessageWhenIdle")) {
200            options.put("sendEmptyMessageWhenIdle", isSendEmptyMessageWhenIdle());
201        }
202        if (!options.containsKey("greedy")) {
203            options.put("greedy", isGreedy());
204        }
205        if (!options.containsKey("scheduler")) {
206            options.put("scheduler", getScheduler());
207        }
208        if (!options.containsKey("schedulerProperties")) {
209            options.put("schedulerProperties", getSchedulerProperties());
210        }
211        if (!options.containsKey("scheduledExecutorService")) {
212            options.put("scheduledExecutorService", getScheduledExecutorService());
213        }
214        if (!options.containsKey("backoffMultiplier")) {
215            options.put("backoffMultiplier", getBackoffMultiplier());
216        }
217        if (!options.containsKey("backoffIdleThreshold")) {
218            options.put("backoffIdleThreshold", getBackoffIdleThreshold());
219        }
220        if (!options.containsKey("backoffErrorThreshold")) {
221            options.put("backoffErrorThreshold", getBackoffErrorThreshold());
222        }
223    }
224
225    @Override
226    protected void doStart() throws Exception {
227        initConsumerProperties();
228        super.doStart();
229    }
230
231    @Override
232    protected void doStop() throws Exception {
233        super.doStop();
234        // noop
235    }
236
237    public boolean isStartScheduler() {
238        return startScheduler;
239    }
240
241    /**
242     * Whether the scheduler should be auto started.
243     */
244    public void setStartScheduler(boolean startScheduler) {
245        this.startScheduler = startScheduler;
246    }
247
248    public long getInitialDelay() {
249        return initialDelay;
250    }
251
252    /**
253     * Milliseconds before the first poll starts.
254     */
255    public void setInitialDelay(long initialDelay) {
256        this.initialDelay = initialDelay;
257    }
258
259    public long getDelay() {
260        return delay;
261    }
262
263    /**
264     * Milliseconds before the next poll.
265     */
266    public void setDelay(long delay) {
267        this.delay = delay;
268    }
269
270    public TimeUnit getTimeUnit() {
271        return timeUnit;
272    }
273
274    /**
275     * Time unit for initialDelay and delay options.
276     */
277    public void setTimeUnit(TimeUnit timeUnit) {
278        this.timeUnit = timeUnit;
279    }
280
281    public boolean isUseFixedDelay() {
282        return useFixedDelay;
283    }
284
285    /**
286     * Controls if fixed delay or fixed rate is used. See ScheduledExecutorService in JDK for details.
287     */
288    public void setUseFixedDelay(boolean useFixedDelay) {
289        this.useFixedDelay = useFixedDelay;
290    }
291
292    public PollingConsumerPollStrategy getPollStrategy() {
293        return pollStrategy;
294    }
295
296    /**
297     * A pluggable org.apache.camel.PollingConsumerPollingStrategy allowing you to provide your custom implementation
298     * to control error handling usually occurred during the poll operation before an Exchange have been created
299     * and being routed in Camel. In other words the error occurred while the polling was gathering information,
300     * for instance access to a file network failed so Camel cannot access it to scan for files.
301     * The default implementation will log the caused exception at WARN level and ignore it.
302     */
303    public void setPollStrategy(PollingConsumerPollStrategy pollStrategy) {
304        this.pollStrategy = pollStrategy;
305        // we are allowed to change poll strategy
306    }
307
308    public LoggingLevel getRunLoggingLevel() {
309        return runLoggingLevel;
310    }
311
312    /**
313     * The consumer logs a start/complete log line when it polls. This option allows you to configure the logging level for that.
314     */
315    public void setRunLoggingLevel(LoggingLevel runLoggingLevel) {
316        this.runLoggingLevel = runLoggingLevel;
317    }
318
319    public boolean isSendEmptyMessageWhenIdle() {
320        return sendEmptyMessageWhenIdle;
321    }
322
323    /**
324     * If the polling consumer did not poll any files, you can enable this option to send an empty message (no body) instead.
325     */
326    public void setSendEmptyMessageWhenIdle(boolean sendEmptyMessageWhenIdle) {
327        this.sendEmptyMessageWhenIdle = sendEmptyMessageWhenIdle;
328    }
329
330    public boolean isGreedy() {
331        return greedy;
332    }
333
334    /**
335     * If greedy is enabled, then the ScheduledPollConsumer will run immediately again, if the previous run polled 1 or more messages.
336     */
337    public void setGreedy(boolean greedy) {
338        this.greedy = greedy;
339    }
340
341    public ScheduledPollConsumerScheduler getScheduler() {
342        return scheduler;
343    }
344
345    /**
346     * Allow to plugin a custom org.apache.camel.spi.ScheduledPollConsumerScheduler to use as the scheduler for
347     * firing when the polling consumer runs. The default implementation uses the ScheduledExecutorService and
348     * there is a Quartz2, and Spring based which supports CRON expressions.
349     *
350     * Notice: If using a custom scheduler then the options for initialDelay, useFixedDelay, timeUnit,
351     * and scheduledExecutorService may not be in use. Use the text quartz2 to refer to use the Quartz2 scheduler;
352     * and use the text spring to use the Spring based; and use the text #myScheduler to refer to a custom scheduler
353     * by its id in the Registry. See Quartz2 page for an example.
354     */
355    public void setScheduler(ScheduledPollConsumerScheduler scheduler) {
356        this.scheduler = scheduler;
357    }
358
359    /**
360     * Allow to plugin a custom org.apache.camel.spi.ScheduledPollConsumerScheduler to use as the scheduler for
361     * firing when the polling consumer runs. This option is used for referring to one of the built-in schedulers
362     * either <tt>spring</tt>, or <tt>quartz2</tt>. Using <tt>none</tt> refers to no scheduler to be used.
363     */
364    public void setScheduler(String schedulerName) {
365        this.schedulerName = schedulerName;
366    }
367
368    public Map<String, Object> getSchedulerProperties() {
369        return schedulerProperties;
370    }
371
372    /**
373     * To configure additional properties when using a custom scheduler or any of the Quartz2, Spring based scheduler.
374     */
375    public void setSchedulerProperties(Map<String, Object> schedulerProperties) {
376        this.schedulerProperties = schedulerProperties;
377    }
378
379    public ScheduledExecutorService getScheduledExecutorService() {
380        return scheduledExecutorService;
381    }
382
383    /**
384     * Allows for configuring a custom/shared thread pool to use for the consumer.
385     * By default each consumer has its own single threaded thread pool.
386     * This option allows you to share a thread pool among multiple consumers.
387     */
388    public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
389        this.scheduledExecutorService = scheduledExecutorService;
390    }
391
392    public int getBackoffMultiplier() {
393        return backoffMultiplier;
394    }
395
396    /**
397     * To let the scheduled polling consumer backoff if there has been a number of subsequent idles/errors in a row.
398     * The multiplier is then the number of polls that will be skipped before the next actual attempt is happening again.
399     * When this option is in use then backoffIdleThreshold and/or backoffErrorThreshold must also be configured.
400     */
401    public void setBackoffMultiplier(int backoffMultiplier) {
402        this.backoffMultiplier = backoffMultiplier;
403    }
404
405    public int getBackoffIdleThreshold() {
406        return backoffIdleThreshold;
407    }
408
409    /**
410     * The number of subsequent idle polls that should happen before the backoffMultipler should kick-in.
411     */
412    public void setBackoffIdleThreshold(int backoffIdleThreshold) {
413        this.backoffIdleThreshold = backoffIdleThreshold;
414    }
415
416    public int getBackoffErrorThreshold() {
417        return backoffErrorThreshold;
418    }
419
420    /**
421     * The number of subsequent error polls (failed due some error) that should happen before the backoffMultipler should kick-in.
422     */
423    public void setBackoffErrorThreshold(int backoffErrorThreshold) {
424        this.backoffErrorThreshold = backoffErrorThreshold;
425    }
426
427}