001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.support; 018 019 import java.util.ArrayList; 020 import java.util.Collections; 021 import java.util.Comparator; 022 import java.util.List; 023 import java.util.Map; 024 import java.util.Set; 025 import java.util.concurrent.ConcurrentHashMap; 026 import java.util.concurrent.ConcurrentMap; 027 import java.util.concurrent.ScheduledExecutorService; 028 import java.util.concurrent.TimeUnit; 029 import java.util.concurrent.locks.Lock; 030 import java.util.concurrent.locks.ReentrantLock; 031 032 import org.apache.camel.TimeoutMap; 033 import org.apache.camel.util.ObjectHelper; 034 import org.slf4j.Logger; 035 import org.slf4j.LoggerFactory; 036 037 /** 038 * Default implementation of the {@link TimeoutMap}. 039 * <p/> 040 * This implementation supports thread safe and non thread safe, in the manner you can enable locking or not. 041 * By default locking is enabled and thus we are thread safe. 042 * <p/> 043 * You must provide a {@link java.util.concurrent.ScheduledExecutorService} in the constructor which is used 044 * to schedule a background task which check for old entries to purge. This implementation will shutdown the scheduler 045 * if its being stopped. 046 * 047 * @version 048 */ 049 public class DefaultTimeoutMap<K, V> extends ServiceSupport implements TimeoutMap<K, V>, Runnable { 050 051 protected final transient Logger log = LoggerFactory.getLogger(getClass()); 052 053 private final ConcurrentMap<K, TimeoutMapEntry<K, V>> map = new ConcurrentHashMap<K, TimeoutMapEntry<K, V>>(); 054 private final ScheduledExecutorService executor; 055 private final long purgePollTime; 056 private final Lock lock = new ReentrantLock(); 057 private boolean useLock = true; 058 059 public DefaultTimeoutMap(ScheduledExecutorService executor) { 060 this(executor, 1000); 061 } 062 063 public DefaultTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) { 064 this(executor, requestMapPollTimeMillis, true); 065 } 066 067 public DefaultTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis, boolean useLock) { 068 ObjectHelper.notNull(executor, "ScheduledExecutorService"); 069 this.executor = executor; 070 this.purgePollTime = requestMapPollTimeMillis; 071 this.useLock = useLock; 072 schedulePoll(); 073 } 074 075 public V get(K key) { 076 TimeoutMapEntry<K, V> entry; 077 if (useLock) { 078 lock.lock(); 079 } 080 try { 081 entry = map.get(key); 082 if (entry == null) { 083 return null; 084 } 085 updateExpireTime(entry); 086 } finally { 087 if (useLock) { 088 lock.unlock(); 089 } 090 } 091 return entry.getValue(); 092 } 093 094 public void put(K key, V value, long timeoutMillis) { 095 TimeoutMapEntry<K, V> entry = new TimeoutMapEntry<K, V>(key, value, timeoutMillis); 096 if (useLock) { 097 lock.lock(); 098 } 099 try { 100 map.put(key, entry); 101 updateExpireTime(entry); 102 } finally { 103 if (useLock) { 104 lock.unlock(); 105 } 106 } 107 } 108 109 public V remove(K key) { 110 TimeoutMapEntry<K, V> entry; 111 112 if (useLock) { 113 lock.lock(); 114 } 115 try { 116 entry = map.remove(key); 117 } finally { 118 if (useLock) { 119 lock.unlock(); 120 } 121 } 122 123 return entry != null ? entry.getValue() : null; 124 } 125 126 public Object[] getKeys() { 127 Object[] keys; 128 if (useLock) { 129 lock.lock(); 130 } 131 try { 132 Set<K> keySet = map.keySet(); 133 keys = new Object[keySet.size()]; 134 keySet.toArray(keys); 135 } finally { 136 if (useLock) { 137 lock.unlock(); 138 } 139 } 140 return keys; 141 } 142 143 public int size() { 144 return map.size(); 145 } 146 147 /** 148 * The timer task which purges old requests and schedules another poll 149 */ 150 public void run() { 151 // only run if allowed 152 if (!isRunAllowed()) { 153 log.trace("Purge task not allowed to run"); 154 return; 155 } 156 157 log.trace("Running purge task to see if any entries has been timed out"); 158 try { 159 purge(); 160 } catch (Throwable t) { 161 // must catch and log exception otherwise the executor will now schedule next run 162 log.warn("Exception occurred during purge task. This exception will be ignored.", t); 163 } 164 } 165 166 public void purge() { 167 log.trace("There are {} in the timeout map", map.size()); 168 if (map.isEmpty()) { 169 return; 170 } 171 172 long now = currentTime(); 173 174 List<TimeoutMapEntry<K, V>> expired = new ArrayList<TimeoutMapEntry<K, V>>(); 175 176 if (useLock) { 177 lock.lock(); 178 } 179 try { 180 // need to find the expired entries and add to the expired list 181 for (Map.Entry<K, TimeoutMapEntry<K, V>> entry : map.entrySet()) { 182 if (entry.getValue().getExpireTime() < now) { 183 if (isValidForEviction(entry.getValue())) { 184 log.debug("Evicting inactive entry ID: {}", entry.getValue()); 185 expired.add(entry.getValue()); 186 } 187 } 188 } 189 190 // if we found any expired then we need to sort, onEviction and remove 191 if (!expired.isEmpty()) { 192 // sort according to the expired time so we got the first expired first 193 Collections.sort(expired, new Comparator<TimeoutMapEntry<K, V>>() { 194 public int compare(TimeoutMapEntry<K, V> a, TimeoutMapEntry<K, V> b) { 195 long diff = a.getExpireTime() - b.getExpireTime(); 196 if (diff == 0) { 197 return 0; 198 } 199 return diff > 0 ? 1 : -1; 200 } 201 }); 202 203 List<K> evicts = new ArrayList<K>(expired.size()); 204 try { 205 // now fire eviction notification 206 for (TimeoutMapEntry<K, V> entry : expired) { 207 boolean evict = false; 208 try { 209 evict = onEviction(entry.getKey(), entry.getValue()); 210 } catch (Throwable t) { 211 log.warn("Exception happened during eviction of entry ID {}, won't evict and will continue trying: {}", 212 entry.getValue(), t); 213 } 214 if (evict) { 215 // okay this entry should be evicted 216 evicts.add(entry.getKey()); 217 } 218 } 219 } finally { 220 // and must remove from list after we have fired the notifications 221 for (K key : evicts) { 222 map.remove(key); 223 } 224 } 225 } 226 } finally { 227 if (useLock) { 228 lock.unlock(); 229 } 230 } 231 } 232 233 // Properties 234 // ------------------------------------------------------------------------- 235 236 public long getPurgePollTime() { 237 return purgePollTime; 238 } 239 240 public ScheduledExecutorService getExecutor() { 241 return executor; 242 } 243 244 // Implementation methods 245 // ------------------------------------------------------------------------- 246 247 /** 248 * lets schedule each time to allow folks to change the time at runtime 249 */ 250 protected void schedulePoll() { 251 executor.scheduleWithFixedDelay(this, 0, purgePollTime, TimeUnit.MILLISECONDS); 252 } 253 254 /** 255 * A hook to allow derivations to avoid evicting the current entry 256 */ 257 protected boolean isValidForEviction(TimeoutMapEntry<K, V> entry) { 258 return true; 259 } 260 261 public boolean onEviction(K key, V value) { 262 return true; 263 } 264 265 protected void updateExpireTime(TimeoutMapEntry<K, V> entry) { 266 long now = currentTime(); 267 entry.setExpireTime(entry.getTimeout() + now); 268 } 269 270 protected long currentTime() { 271 return System.currentTimeMillis(); 272 } 273 274 @Override 275 protected void doStart() throws Exception { 276 if (executor.isShutdown()) { 277 throw new IllegalStateException("The ScheduledExecutorService is shutdown"); 278 } 279 } 280 281 @Override 282 protected void doStop() throws Exception { 283 // clear map if we stop 284 map.clear(); 285 } 286 287 }