001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.support;
018    
019    import java.util.ArrayList;
020    import java.util.Collections;
021    import java.util.Comparator;
022    import java.util.List;
023    import java.util.Map;
024    import java.util.Set;
025    import java.util.concurrent.ConcurrentHashMap;
026    import java.util.concurrent.ConcurrentMap;
027    import java.util.concurrent.ScheduledExecutorService;
028    import java.util.concurrent.TimeUnit;
029    import java.util.concurrent.locks.Lock;
030    import java.util.concurrent.locks.ReentrantLock;
031    
032    import org.apache.camel.TimeoutMap;
033    import org.apache.camel.util.ObjectHelper;
034    import org.slf4j.Logger;
035    import org.slf4j.LoggerFactory;
036    
037    /**
038     * Default implementation of the {@link TimeoutMap}.
039     * <p/>
040     * This implementation supports thread safe and non thread safe, in the manner you can enable locking or not.
041     * By default locking is enabled and thus we are thread safe.
042     * <p/>
043     * You must provide a {@link java.util.concurrent.ScheduledExecutorService} in the constructor which is used
044     * to schedule a background task which check for old entries to purge. This implementation will shutdown the scheduler
045     * if its being stopped.
046     *
047     * @version 
048     */
049    public class DefaultTimeoutMap<K, V> extends ServiceSupport implements TimeoutMap<K, V>, Runnable {
050    
051        protected final transient Logger log = LoggerFactory.getLogger(getClass());
052    
053        private final ConcurrentMap<K, TimeoutMapEntry<K, V>> map = new ConcurrentHashMap<K, TimeoutMapEntry<K, V>>();
054        private final ScheduledExecutorService executor;
055        private final long purgePollTime;
056        private final Lock lock = new ReentrantLock();
057        private boolean useLock = true;
058    
059        public DefaultTimeoutMap(ScheduledExecutorService executor) {
060            this(executor, 1000);
061        }
062    
063        public DefaultTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) {
064            this(executor, requestMapPollTimeMillis, true);
065        }
066    
067        public DefaultTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis, boolean useLock) {
068            ObjectHelper.notNull(executor, "ScheduledExecutorService");
069            this.executor = executor;
070            this.purgePollTime = requestMapPollTimeMillis;
071            this.useLock = useLock;
072            schedulePoll();
073        }
074    
075        public V get(K key) {
076            TimeoutMapEntry<K, V> entry;
077            if (useLock) {
078                lock.lock();
079            }
080            try {
081                entry = map.get(key);
082                if (entry == null) {
083                    return null;
084                }
085                updateExpireTime(entry);
086            } finally {
087                if (useLock) {
088                    lock.unlock();
089                }
090            }
091            return entry.getValue();
092        }
093    
094        public void put(K key, V value, long timeoutMillis) {
095            TimeoutMapEntry<K, V> entry = new TimeoutMapEntry<K, V>(key, value, timeoutMillis);
096            if (useLock) {
097                lock.lock();
098            }
099            try {
100                map.put(key, entry);
101                updateExpireTime(entry);
102            } finally {
103                if (useLock) {
104                    lock.unlock();
105                }
106            }
107        }
108    
109        public V remove(K key) {
110            TimeoutMapEntry<K, V> entry;
111    
112            if (useLock) {
113                lock.lock();
114            }
115            try {
116                entry = map.remove(key);
117            } finally {
118                if (useLock) {
119                    lock.unlock();
120                }
121            }
122    
123            return entry != null ? entry.getValue() : null;
124        }
125    
126        public Object[] getKeys() {
127            Object[] keys;
128            if (useLock) {
129                lock.lock();
130            }
131            try {
132                Set<K> keySet = map.keySet();
133                keys = new Object[keySet.size()];
134                keySet.toArray(keys);
135            } finally {
136                if (useLock) {
137                    lock.unlock();
138                }
139            }
140            return keys;
141        }
142        
143        public int size() {
144            return map.size();
145        }
146    
147        /**
148         * The timer task which purges old requests and schedules another poll
149         */
150        public void run() {
151            // only run if allowed
152            if (!isRunAllowed()) {
153                log.trace("Purge task not allowed to run");
154                return;
155            }
156    
157            log.trace("Running purge task to see if any entries has been timed out");
158            try {
159                purge();
160            } catch (Throwable t) {
161                // must catch and log exception otherwise the executor will now schedule next run
162                log.warn("Exception occurred during purge task. This exception will be ignored.", t);
163            }
164        }
165    
166        public void purge() {
167            log.trace("There are {} in the timeout map", map.size());
168            if (map.isEmpty()) {
169                return;
170            }
171            
172            long now = currentTime();
173    
174            List<TimeoutMapEntry<K, V>> expired = new ArrayList<TimeoutMapEntry<K, V>>();
175    
176            if (useLock) {
177                lock.lock();
178            }
179            try {
180                // need to find the expired entries and add to the expired list
181                for (Map.Entry<K, TimeoutMapEntry<K, V>> entry : map.entrySet()) {
182                    if (entry.getValue().getExpireTime() < now) {
183                        if (isValidForEviction(entry.getValue())) {
184                            log.debug("Evicting inactive entry ID: {}", entry.getValue());
185                            expired.add(entry.getValue());
186                        }
187                    }
188                }
189    
190                // if we found any expired then we need to sort, onEviction and remove
191                if (!expired.isEmpty()) {
192                    // sort according to the expired time so we got the first expired first
193                    Collections.sort(expired, new Comparator<TimeoutMapEntry<K, V>>() {
194                        public int compare(TimeoutMapEntry<K, V> a, TimeoutMapEntry<K, V> b) {
195                            long diff = a.getExpireTime() - b.getExpireTime();
196                            if (diff == 0) {
197                                return 0;
198                            }
199                            return diff > 0 ? 1 : -1;
200                        }
201                    });
202    
203                    List<K> evicts = new ArrayList<K>(expired.size());
204                    try {
205                        // now fire eviction notification
206                        for (TimeoutMapEntry<K, V> entry : expired) {
207                            boolean evict = false;
208                            try {
209                                evict = onEviction(entry.getKey(), entry.getValue());
210                            } catch (Throwable t) {
211                                log.warn("Exception happened during eviction of entry ID {}, won't evict and will continue trying: {}", 
212                                        entry.getValue(), t);
213                            }
214                            if (evict) {
215                                // okay this entry should be evicted
216                                evicts.add(entry.getKey());
217                            }
218                        }
219                    } finally {
220                        // and must remove from list after we have fired the notifications
221                        for (K key : evicts) {
222                            map.remove(key);
223                        }
224                    }
225                }
226            } finally {
227                if (useLock) {
228                    lock.unlock();
229                }
230            }
231        }
232    
233        // Properties
234        // -------------------------------------------------------------------------
235        
236        public long getPurgePollTime() {
237            return purgePollTime;
238        }
239    
240        public ScheduledExecutorService getExecutor() {
241            return executor;
242        }
243    
244        // Implementation methods
245        // -------------------------------------------------------------------------
246    
247        /**
248         * lets schedule each time to allow folks to change the time at runtime
249         */
250        protected void schedulePoll() {
251            executor.scheduleWithFixedDelay(this, 0, purgePollTime, TimeUnit.MILLISECONDS);
252        }
253    
254        /**
255         * A hook to allow derivations to avoid evicting the current entry
256         */
257        protected boolean isValidForEviction(TimeoutMapEntry<K, V> entry) {
258            return true;
259        }
260    
261        public boolean onEviction(K key, V value) {
262            return true;
263        }
264    
265        protected void updateExpireTime(TimeoutMapEntry<K, V> entry) {
266            long now = currentTime();
267            entry.setExpireTime(entry.getTimeout() + now);
268        }
269    
270        protected long currentTime() {
271            return System.currentTimeMillis();
272        }
273    
274        @Override
275        protected void doStart() throws Exception {
276            if (executor.isShutdown()) {
277                throw new IllegalStateException("The ScheduledExecutorService is shutdown");
278            }
279        }
280    
281        @Override
282        protected void doStop() throws Exception {
283            // clear map if we stop
284            map.clear();
285        }
286    
287    }