001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.support;
018    
019    import java.util.LinkedHashSet;
020    import java.util.Set;
021    import java.util.concurrent.ScheduledExecutorService;
022    import java.util.concurrent.ScheduledFuture;
023    import java.util.concurrent.TimeUnit;
024    
025    import org.apache.camel.CamelContext;
026    import org.apache.camel.CamelContextAware;
027    import org.apache.camel.TimerListener;
028    import org.apache.camel.util.ObjectHelper;
029    import org.slf4j.Logger;
030    import org.slf4j.LoggerFactory;
031    
032    /**
033     * A {@link TimerListener} manager which triggers the
034     * {@link org.apache.camel.TimerListener} listeners once every second.
035     * <p/>
036     * Also ensure when adding and remove listeners, that they are correctly removed to avoid
037     * leaking memory.
038     *
039     * @see TimerListener
040     */
041    public class TimerListenerManager extends ServiceSupport implements Runnable, CamelContextAware {
042    
043        private static final Logger LOG = LoggerFactory.getLogger(TimerListenerManager.class);
044        private final Set<TimerListener> listeners = new LinkedHashSet<TimerListener>();
045        private CamelContext camelContext;
046        private ScheduledExecutorService executorService;
047        private volatile ScheduledFuture<?> task;
048        private long interval = 1000L;
049    
050        public TimerListenerManager() {
051        }
052    
053        @Override
054        public void setCamelContext(CamelContext camelContext) {
055            this.camelContext = camelContext;
056        }
057    
058        @Override
059        public CamelContext getCamelContext() {
060            return camelContext;
061        }
062    
063        /**
064         * Gets the interval in millis.
065         * <p/>
066         * The default interval is 1000 millis.
067         *
068         * @return interval in millis.
069         */
070        public long getInterval() {
071            return interval;
072        }
073    
074        /**
075         * Sets the interval in millis.
076         *
077         * @param interval interval in millis.
078         */
079        public void setInterval(long interval) {
080            this.interval = interval;
081        }
082    
083        @Override
084        public void run() {
085            LOG.trace("Running scheduled TimerListener task");
086    
087            if (!isRunAllowed()) {
088                LOG.debug("TimerListener task cannot run as its not allowed");
089                return;
090            }
091    
092            for (TimerListener listener : listeners) {
093                try {
094                    LOG.trace("Invoking onTimer on {}", listener);
095                    listener.onTimer();
096                } catch (Throwable e) {
097                    // ignore
098                    LOG.debug("Error occurred during onTimer for TimerListener: " + listener + ". This exception will be ignored.", e);
099                }
100            }
101        }
102    
103        /**
104         * Adds the listener.
105         * <p/>
106         * It may be important to implement {@link #equals(Object)} and {@link #hashCode()} for the listener
107         * to ensure that we can remove the same listener again, when invoking remove.
108         * 
109         * @param listener listener
110         */
111        public void addTimerListener(TimerListener listener) {
112            listeners.add(listener);
113            LOG.debug("Added TimerListener: {}", listener);
114        }
115    
116        /**
117         * Removes the listener.
118         * <p/>
119         * It may be important to implement {@link #equals(Object)} and {@link #hashCode()} for the listener
120         * to ensure that we can remove the same listener again, when invoking remove.
121         *
122         * @param listener listener.
123         */
124        public void removeTimerListener(TimerListener listener) {
125            listeners.remove(listener);
126            LOG.debug("Removed TimerListener: {}", listener);
127        }
128    
129        @Override
130        protected void doStart() throws Exception {
131            ObjectHelper.notNull(camelContext, "camelContext", this);
132    
133            // create scheduled thread pool to trigger the task to run every interval
134            executorService = camelContext.getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "ManagementLoadTask");
135            task = executorService.scheduleAtFixedRate(this, interval, interval, TimeUnit.MILLISECONDS);
136            LOG.debug("Started scheduled TimerListener task to run with interval {} ms", interval);
137        }
138    
139        @Override
140        protected void doStop() throws Exception {
141            // executor service will be shutdown by CamelContext
142            if (task != null) {
143                task.cancel(true);
144                task = null;
145            }
146        }
147    
148        @Override
149        protected void doShutdown() throws Exception {
150            super.doShutdown();
151            // shutdown thread pool when we are shutting down
152            camelContext.getExecutorServiceManager().shutdown(executorService);
153            executorService = null;
154            listeners.clear();
155        }
156    }
157