/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017package org.apache.camel.support;
018
019import java.util.ArrayList;
020import java.util.Collections;
021import java.util.Comparator;
022import java.util.List;
023import java.util.Map;
024import java.util.Set;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentMap;
027import java.util.concurrent.ScheduledExecutorService;
028import java.util.concurrent.ScheduledFuture;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.locks.Lock;
031import java.util.concurrent.locks.ReentrantLock;
032
033import org.apache.camel.TimeoutMap;
034import org.apache.camel.util.ObjectHelper;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 * Default implementation of the {@link TimeoutMap}.
040 * <p/>
041 * This implementation supports thread safe and non thread safe, in the manner you can enable locking or not.
042 * By default locking is enabled and thus we are thread safe.
043 * <p/>
044 * You must provide a {@link java.util.concurrent.ScheduledExecutorService} in the constructor which is used
045 * to schedule a background task which check for old entries to purge. This implementation will shutdown the scheduler
046 * if its being stopped.
047 * You must also invoke {@link #start()} to startup the timeout map, before its ready to be used.
048 * And you must invoke {@link #stop()} to stop the map when no longer in use.
049 *
050 * @version 
051 */
052public class DefaultTimeoutMap<K, V> extends ServiceSupport implements TimeoutMap<K, V>, Runnable {
053
054    protected final Logger log = LoggerFactory.getLogger(getClass());
055
056    private final ConcurrentMap<K, TimeoutMapEntry<K, V>> map = new ConcurrentHashMap<K, TimeoutMapEntry<K, V>>();
057    private final ScheduledExecutorService executor;
058    private volatile ScheduledFuture<?> future;
059    private final long purgePollTime;
060    private final Lock lock = new ReentrantLock();
061    private boolean useLock = true;
062
063    public DefaultTimeoutMap(ScheduledExecutorService executor) {
064        this(executor, 1000);
065    }
066
067    public DefaultTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) {
068        this(executor, requestMapPollTimeMillis, true);
069    }
070
071    public DefaultTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis, boolean useLock) {
072        ObjectHelper.notNull(executor, "ScheduledExecutorService");
073        this.executor = executor;
074        this.purgePollTime = requestMapPollTimeMillis;
075        this.useLock = useLock;
076    }
077
078    public V get(K key) {
079        TimeoutMapEntry<K, V> entry;
080        if (useLock) {
081            lock.lock();
082        }
083        try {
084            entry = map.get(key);
085            if (entry == null) {
086                return null;
087            }
088            updateExpireTime(entry);
089        } finally {
090            if (useLock) {
091                lock.unlock();
092            }
093        }
094        return entry.getValue();
095    }
096    
097    public V put(K key, V value, long timeoutMillis) {
098        TimeoutMapEntry<K, V> entry = new TimeoutMapEntry<K, V>(key, value, timeoutMillis);
099        if (useLock) {
100            lock.lock();
101        }
102        try {
103            updateExpireTime(entry);
104            TimeoutMapEntry<K, V> result = map.put(key, entry);
105            return result != null ? result.getValue() : null;
106        } finally {
107            if (useLock) {
108                lock.unlock();
109            }
110        }
111    }
112    
113    public V putIfAbsent(K key, V value, long timeoutMillis) {
114        TimeoutMapEntry<K, V> entry = new TimeoutMapEntry<K, V>(key, value, timeoutMillis);
115        if (useLock) {
116            lock.lock();
117        }
118        try {
119            updateExpireTime(entry);
120            //Just make sure we don't override the old entry
121            TimeoutMapEntry<K, V> result = map.putIfAbsent(key, entry);
122            return result != null ? result.getValue() : null;
123        } finally {
124            if (useLock) {
125                lock.unlock();
126            }
127        }
128    }
129
130    public V remove(K key) {
131        TimeoutMapEntry<K, V> entry;
132
133        if (useLock) {
134            lock.lock();
135        }
136        try {
137            entry = map.remove(key);
138        } finally {
139            if (useLock) {
140                lock.unlock();
141            }
142        }
143
144        return entry != null ? entry.getValue() : null;
145    }
146
147    public Object[] getKeys() {
148        Object[] keys;
149        if (useLock) {
150            lock.lock();
151        }
152        try {
153            Set<K> keySet = map.keySet();
154            keys = new Object[keySet.size()];
155            keySet.toArray(keys);
156        } finally {
157            if (useLock) {
158                lock.unlock();
159            }
160        }
161        return keys;
162    }
163    
164    public int size() {
165        return map.size();
166    }
167
168    /**
169     * The timer task which purges old requests and schedules another poll
170     */
171    public void run() {
172        // only run if allowed
173        if (!isRunAllowed()) {
174            log.trace("Purge task not allowed to run");
175            return;
176        }
177
178        log.trace("Running purge task to see if any entries has been timed out");
179        try {
180            purge();
181        } catch (Throwable t) {
182            // must catch and log exception otherwise the executor will now schedule next run
183            log.warn("Exception occurred during purge task. This exception will be ignored.", t);
184        }
185    }
186
187    public void purge() {
188        log.trace("There are {} in the timeout map", map.size());
189        if (map.isEmpty()) {
190            return;
191        }
192        
193        long now = currentTime();
194
195        List<TimeoutMapEntry<K, V>> expired = new ArrayList<TimeoutMapEntry<K, V>>();
196
197        if (useLock) {
198            lock.lock();
199        }
200        try {
201            // need to find the expired entries and add to the expired list
202            for (Map.Entry<K, TimeoutMapEntry<K, V>> entry : map.entrySet()) {
203                if (entry.getValue().getExpireTime() < now) {
204                    if (isValidForEviction(entry.getValue())) {
205                        log.debug("Evicting inactive entry ID: {}", entry.getValue());
206                        expired.add(entry.getValue());
207                    }
208                }
209            }
210
211            // if we found any expired then we need to sort, onEviction and remove
212            if (!expired.isEmpty()) {
213                // sort according to the expired time so we got the first expired first
214                Collections.sort(expired, new Comparator<TimeoutMapEntry<K, V>>() {
215                    public int compare(TimeoutMapEntry<K, V> a, TimeoutMapEntry<K, V> b) {
216                        long diff = a.getExpireTime() - b.getExpireTime();
217                        if (diff == 0) {
218                            return 0;
219                        }
220                        return diff > 0 ? 1 : -1;
221                    }
222                });
223
224                List<K> evicts = new ArrayList<K>(expired.size());
225                try {
226                    // now fire eviction notification
227                    for (TimeoutMapEntry<K, V> entry : expired) {
228                        boolean evict = false;
229                        try {
230                            evict = onEviction(entry.getKey(), entry.getValue());
231                        } catch (Throwable t) {
232                            log.warn("Exception happened during eviction of entry ID {}, won't evict and will continue trying: {}", 
233                                    entry.getValue(), t);
234                        }
235                        if (evict) {
236                            // okay this entry should be evicted
237                            evicts.add(entry.getKey());
238                        }
239                    }
240                } finally {
241                    // and must remove from list after we have fired the notifications
242                    for (K key : evicts) {
243                        map.remove(key);
244                    }
245                }
246            }
247        } finally {
248            if (useLock) {
249                lock.unlock();
250            }
251        }
252    }
253
254    // Properties
255    // -------------------------------------------------------------------------
256    
257    public long getPurgePollTime() {
258        return purgePollTime;
259    }
260
261    public ScheduledExecutorService getExecutor() {
262        return executor;
263    }
264
265    // Implementation methods
266    // -------------------------------------------------------------------------
267
268    /**
269     * lets schedule each time to allow folks to change the time at runtime
270     */
271    protected void schedulePoll() {
272        future = executor.scheduleWithFixedDelay(this, 0, purgePollTime, TimeUnit.MILLISECONDS);
273    }
274
275    /**
276     * A hook to allow derivations to avoid evicting the current entry
277     */
278    protected boolean isValidForEviction(TimeoutMapEntry<K, V> entry) {
279        return true;
280    }
281
282    public boolean onEviction(K key, V value) {
283        return true;
284    }
285
286    protected void updateExpireTime(TimeoutMapEntry<K, V> entry) {
287        long now = currentTime();
288        entry.setExpireTime(entry.getTimeout() + now);
289    }
290
291    protected long currentTime() {
292        return System.currentTimeMillis();
293    }
294
295    @Override
296    protected void doStart() throws Exception {
297        if (executor.isShutdown()) {
298            throw new IllegalStateException("The ScheduledExecutorService is shutdown");
299        }
300        schedulePoll();
301    }
302
303    @Override
304    protected void doStop() throws Exception {
305        if (future != null) {
306            future.cancel(false);
307            future = null;
308        }
309        // clear map if we stop
310        map.clear();
311    }
312
313}