/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.support;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.camel.TimeoutMap;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Default implementation of the {@link TimeoutMap}.
 * <p/>
 * This implementation can run with or without an internal lock around its operations.
 * By default locking is enabled.
 * <p/>
 * You must provide a {@link java.util.concurrent.ScheduledExecutorService} in the constructor which is used
 * to schedule a background task which checks for old entries to purge. Stopping this map cancels that
 * background task and clears the map; the executor itself is <b>not</b> shut down by this class.
 * You must also invoke {@link #start()} to startup the timeout map, before its ready to be used.
 * And you must invoke {@link #stop()} to stop the map when no longer in use.
 *
 * @version
 */
public class DefaultTimeoutMap<K, V> extends ServiceSupport implements TimeoutMap<K, V>, Runnable {

    protected final Logger log = LoggerFactory.getLogger(getClass());

    private final ConcurrentMap<K, TimeoutMapEntry<K, V>> map = new ConcurrentHashMap<K, TimeoutMapEntry<K, V>>();
    private final ScheduledExecutorService executor;
    private volatile ScheduledFuture<?> future;
    private final long purgePollTime;
    private final Lock lock = new ReentrantLock();
    // assigned once in the constructor; final so it is safely published to the purge thread
    private final boolean useLock;

    /**
     * Creates a timeout map which polls for expired entries every 1000 millis, with locking enabled.
     *
     * @param executor the executor used to schedule the purge task, must not be <tt>null</tt>
     */
    public DefaultTimeoutMap(ScheduledExecutorService executor) {
        this(executor, 1000);
    }

    /**
     * Creates a timeout map with locking enabled.
     *
     * @param executor                 the executor used to schedule the purge task, must not be <tt>null</tt>
     * @param requestMapPollTimeMillis delay in millis between two runs of the purge task
     */
    public DefaultTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) {
        this(executor, requestMapPollTimeMillis, true);
    }

    /**
     * Creates a timeout map.
     *
     * @param executor                 the executor used to schedule the purge task, must not be <tt>null</tt>
     * @param requestMapPollTimeMillis delay in millis between two runs of the purge task
     * @param useLock                  whether operations on this map should be guarded by the internal lock
     */
    public DefaultTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis, boolean useLock) {
        ObjectHelper.notNull(executor, "ScheduledExecutorService");
        this.executor = executor;
        this.purgePollTime = requestMapPollTimeMillis;
        this.useLock = useLock;
    }

    /**
     * Looks up the value for the given key and, if present, renews its expire time.
     *
     * @param key the key
     * @return the value, or <tt>null</tt> if there is no entry for the key
     */
    public V get(K key) {
        TimeoutMapEntry<K, V> entry;
        if (useLock) {
            lock.lock();
        }
        try {
            entry = map.get(key);
            if (entry == null) {
                return null;
            }
            // reading an entry counts as activity, so push its expire time forward
            updateExpireTime(entry);
        } finally {
            if (useLock) {
                lock.unlock();
            }
        }
        return entry.getValue();
    }

    /**
     * Adds the key/value pair with the given timeout, replacing any existing entry for the key.
     *
     * @param key           the key
     * @param value         the value
     * @param timeoutMillis timeout in millis
     * @return the value of the replaced entry, or <tt>null</tt> if there was none
     */
    public V put(K key, V value, long timeoutMillis) {
        TimeoutMapEntry<K, V> entry = new TimeoutMapEntry<K, V>(key, value, timeoutMillis);
        if (useLock) {
            lock.lock();
        }
        try {
            updateExpireTime(entry);
            TimeoutMapEntry<K, V> result = map.put(key, entry);
            return result != null ? result.getValue() : null;
        } finally {
            if (useLock) {
                lock.unlock();
            }
        }
    }

    /**
     * Adds the key/value pair with the given timeout, unless an entry for the key already exists.
     *
     * @param key           the key
     * @param value         the value
     * @param timeoutMillis timeout in millis
     * @return the value of the existing entry, or <tt>null</tt> if the new entry was added
     */
    public V putIfAbsent(K key, V value, long timeoutMillis) {
        TimeoutMapEntry<K, V> entry = new TimeoutMapEntry<K, V>(key, value, timeoutMillis);
        if (useLock) {
            lock.lock();
        }
        try {
            updateExpireTime(entry);
            // just make sure we don't override the old entry
            TimeoutMapEntry<K, V> result = map.putIfAbsent(key, entry);
            return result != null ? result.getValue() : null;
        } finally {
            if (useLock) {
                lock.unlock();
            }
        }
    }

    /**
     * Removes the entry for the given key.
     *
     * @param key the key
     * @return the value of the removed entry, or <tt>null</tt> if there was none
     */
    public V remove(K key) {
        TimeoutMapEntry<K, V> entry;

        if (useLock) {
            lock.lock();
        }
        try {
            entry = map.remove(key);
        } finally {
            if (useLock) {
                lock.unlock();
            }
        }

        return entry != null ? entry.getValue() : null;
    }

    /**
     * Returns a snapshot of the keys currently in the map.
     *
     * @return a new array holding the keys
     */
    public Object[] getKeys() {
        if (useLock) {
            lock.lock();
        }
        try {
            Set<K> keySet = map.keySet();
            // let the key set size the array itself; presizing from size() and ignoring the
            // return value of toArray(T[]) yields null slots if the map changes in between
            return keySet.toArray();
        } finally {
            if (useLock) {
                lock.unlock();
            }
        }
    }

    /**
     * Returns the number of entries currently in the map (read without taking the lock).
     */
    public int size() {
        return map.size();
    }

    /**
     * The timer task which purges old requests
     */
    public void run() {
        // only run if allowed
        if (!isRunAllowed()) {
            log.trace("Purge task not allowed to run");
            return;
        }

        log.trace("Running purge task to see if any entries has been timed out");
        try {
            purge();
        } catch (Throwable t) {
            // must catch and log exception otherwise the executor will not schedule next run
            log.warn("Exception occurred during purge task. This exception will be ignored.", t);
        }
    }

    /**
     * Finds all expired entries which are valid for eviction, fires {@link #onEviction(Object, Object)}
     * for each of them ordered by expire time (oldest first), and removes those entries for which
     * the callback returned <tt>true</tt>.
     */
    public void purge() {
        log.trace("There are {} in the timeout map", map.size());
        if (map.isEmpty()) {
            return;
        }

        long now = currentTime();

        List<TimeoutMapEntry<K, V>> expired = new ArrayList<TimeoutMapEntry<K, V>>();

        if (useLock) {
            lock.lock();
        }
        try {
            // need to find the expired entries and add to the expired list
            for (Map.Entry<K, TimeoutMapEntry<K, V>> entry : map.entrySet()) {
                if (entry.getValue().getExpireTime() < now) {
                    if (isValidForEviction(entry.getValue())) {
                        log.debug("Evicting inactive entry ID: {}", entry.getValue());
                        expired.add(entry.getValue());
                    }
                }
            }

            // if we found any expired then we need to sort, onEviction and remove
            if (!expired.isEmpty()) {
                // sort according to the expired time so we got the first expired first
                Collections.sort(expired, new Comparator<TimeoutMapEntry<K, V>>() {
                    public int compare(TimeoutMapEntry<K, V> a, TimeoutMapEntry<K, V> b) {
                        // compare directly instead of subtracting, as the difference of two
                        // longs can overflow and flip the sign
                        long first = a.getExpireTime();
                        long second = b.getExpireTime();
                        if (first == second) {
                            return 0;
                        }
                        return first > second ? 1 : -1;
                    }
                });

                List<K> evicts = new ArrayList<K>(expired.size());
                try {
                    // now fire eviction notification
                    for (TimeoutMapEntry<K, V> entry : expired) {
                        boolean evict = false;
                        try {
                            evict = onEviction(entry.getKey(), entry.getValue());
                        } catch (Throwable t) {
                            log.warn("Exception happened during eviction of entry ID {}, won't evict and will continue trying: {}",
                                    entry.getValue(), t);
                        }
                        if (evict) {
                            // okay this entry should be evicted
                            evicts.add(entry.getKey());
                        }
                    }
                } finally {
                    // and must remove from list after we have fired the notifications
                    for (K key : evicts) {
                        map.remove(key);
                    }
                }
            }
        } finally {
            if (useLock) {
                lock.unlock();
            }
        }
    }

    // Properties
    // -------------------------------------------------------------------------

    /**
     * Returns the delay in millis between two runs of the purge task.
     */
    public long getPurgePollTime() {
        return purgePollTime;
    }

    /**
     * Returns the executor given in the constructor.
     */
    public ScheduledExecutorService getExecutor() {
        return executor;
    }

    // Implementation methods
    // -------------------------------------------------------------------------

    /**
     * Schedules this map as a recurring purge task on the executor, starting without initial delay
     * and repeating with a fixed delay of {@link #getPurgePollTime()} millis.
     */
    protected void schedulePoll() {
        future = executor.scheduleWithFixedDelay(this, 0, purgePollTime, TimeUnit.MILLISECONDS);
    }

    /**
     * A hook to allow derivations to avoid evicting the current entry
     *
     * @param entry the expired entry
     * @return <tt>true</tt> (the default) if the entry may be evicted
     */
    protected boolean isValidForEviction(TimeoutMapEntry<K, V> entry) {
        return true;
    }

    /**
     * Callback invoked by {@link #purge()} for each expired entry.
     *
     * @param key   the key of the expired entry
     * @param value the value of the expired entry
     * @return <tt>true</tt> (the default) to remove the entry, <tt>false</tt> to keep it in the map
     */
    public boolean onEviction(K key, V value) {
        return true;
    }

    /**
     * Sets the expire time of the entry to the current time plus the timeout of the entry.
     * <p/>
     * If the sum would overflow (for example a timeout of {@link Long#MAX_VALUE}), the expire time
     * is set to {@link Long#MAX_VALUE} instead of wrapping around to a time in the past.
     *
     * @param entry the entry to update
     */
    protected void updateExpireTime(TimeoutMapEntry<K, V> entry) {
        long now = currentTime();
        long timeout = entry.getTimeout();
        long expireTime;
        if (timeout > 0 && now > Long.MAX_VALUE - timeout) {
            // the addition would overflow and make the entry look already expired
            expireTime = Long.MAX_VALUE;
        } else {
            expireTime = timeout + now;
        }
        entry.setExpireTime(expireTime);
    }

    /**
     * Returns the current time in millis; can be overridden to use another time source.
     */
    protected long currentTime() {
        return System.currentTimeMillis();
    }

    @Override
    protected void doStart() throws Exception {
        if (executor.isShutdown()) {
            throw new IllegalStateException("The ScheduledExecutorService is shutdown");
        }
        schedulePoll();
    }

    @Override
    protected void doStop() throws Exception {
        // read the volatile field once so the null check and the cancel see the same reference
        ScheduledFuture<?> task = future;
        if (task != null) {
            task.cancel(false);
            future = null;
        }
        // clear map if we stop
        map.clear();
    }

}