001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.component.scheduler;
018
019import java.util.concurrent.ScheduledExecutorService;
020
021import org.apache.camel.Consumer;
022import org.apache.camel.Processor;
023import org.apache.camel.Producer;
024import org.apache.camel.impl.ScheduledPollEndpoint;
025import org.apache.camel.spi.Metadata;
026import org.apache.camel.spi.UriEndpoint;
027import org.apache.camel.spi.UriParam;
028import org.apache.camel.spi.UriPath;
029
030/**
031 * The scheduler component is used for generating message exchanges when a scheduler fires.
032 *
033 * This component is similar to the timer component, but it offers more functionality in terms of scheduling.
034 * Also this component uses JDK ScheduledExecutorService. Where as the timer uses a JDK Timer.
035 */
036@UriEndpoint(scheme = "scheduler", title = "Scheduler", syntax = "scheduler:name", consumerOnly = true, consumerClass = SchedulerConsumer.class, label = "core,scheduling")
037public class SchedulerEndpoint extends ScheduledPollEndpoint {
038
039    @UriPath @Metadata(required = "true")
040    private String name;
041    @UriParam(defaultValue = "1", label = "scheduler")
042    private int concurrentTasks = 1;
043
044    public SchedulerEndpoint(String uri, SchedulerComponent component, String remaining) {
045        super(uri, component);
046        this.name = remaining;
047    }
048
049    @Override
050    public SchedulerComponent getComponent() {
051        return (SchedulerComponent) super.getComponent();
052    }
053
054    @Override
055    public Producer createProducer() throws Exception {
056        throw new UnsupportedOperationException("Scheduler cannot be used as a producer");
057    }
058
059    @Override
060    public Consumer createConsumer(Processor processor) throws Exception {
061        SchedulerConsumer consumer = new SchedulerConsumer(this, processor);
062        configureConsumer(consumer);
063        return consumer;
064    }
065
066    @Override
067    public boolean isSingleton() {
068        return true;
069    }
070
071    public String getName() {
072        return name;
073    }
074
075    /**
076     * The name of the scheduler
077     */
078    public void setName(String name) {
079        this.name = name;
080    }
081
082    public int getConcurrentTasks() {
083        return concurrentTasks;
084    }
085
086    /**
087     * Number of threads used by the scheduling thread pool.
088     * <p/>
089     * Is by default using a single thread
090     */
091    public void setConcurrentTasks(int concurrentTasks) {
092        this.concurrentTasks = concurrentTasks;
093    }
094
095    public void onConsumerStart(SchedulerConsumer consumer) {
096        // if using default scheduler then obtain thread pool from component which manages their lifecycle
097        if (consumer.getScheduler() == null && consumer.getScheduledExecutorService() == null) {
098            ScheduledExecutorService scheduler = getComponent().addConsumer(consumer);
099            consumer.setScheduledExecutorService(scheduler);
100        }
101    }
102
103    public void onConsumerStop(SchedulerConsumer consumer) {
104        getComponent().removeConsumer(consumer);
105    }
106}