? .classpath ? .project ? etc ? lib/core/extLog4j_1-2-6.jar ? lib/core/extXerces_2-4-0.jar ? lib/core/extXmlApis_1-0-0.jar ? src/core/java/com/opensymphony/oscache/base/AbstractCacheAdministrator.class ? src/core/java/com/opensymphony/oscache/base/Cache.class ? src/core/java/com/opensymphony/oscache/base/CacheEntry.class ? src/core/java/com/opensymphony/oscache/base/Config.class ? src/core/java/com/opensymphony/oscache/base/DummyAlwayRefreshEntryPolicy.class ? src/core/java/com/opensymphony/oscache/base/EntryRefreshPolicy.class ? src/core/java/com/opensymphony/oscache/base/EntryUpdateState.class ? src/core/java/com/opensymphony/oscache/base/FinalizationException.class ? src/core/java/com/opensymphony/oscache/base/InitializationException.class ? src/core/java/com/opensymphony/oscache/base/LifecycleAware.class ? src/core/java/com/opensymphony/oscache/base/NeedsRefreshException.class ? src/core/java/com/opensymphony/oscache/base/TestAbstractCacheAdministrator.class ? src/core/java/com/opensymphony/oscache/base/TestCache.class ? src/core/java/com/opensymphony/oscache/base/TestCacheEntry.class ? src/core/java/com/opensymphony/oscache/base/TestCompleteBase.class ? src/core/java/com/opensymphony/oscache/base/TestConcurrency$GetEntry.class ? src/core/java/com/opensymphony/oscache/base/TestConcurrency$GetStaleEntryAndCancelUpdate.class ? src/core/java/com/opensymphony/oscache/base/TestConcurrency$OSGeneralTest.class ? src/core/java/com/opensymphony/oscache/base/TestConcurrency.class ? src/core/java/com/opensymphony/oscache/base/algorithm/AbstractConcurrentReadCache$1.class ? src/core/java/com/opensymphony/oscache/base/algorithm/AbstractConcurrentReadCache$2.class ? src/core/java/com/opensymphony/oscache/base/algorithm/AbstractConcurrentReadCache$3.class ? src/core/java/com/opensymphony/oscache/base/algorithm/AbstractConcurrentReadCache$Entry.class ? src/core/java/com/opensymphony/oscache/base/algorithm/AbstractConcurrentReadCache$HashIterator.class ? src/core/java/com/opensymphony/oscache/base/algorithm/AbstractConcurrentReadCache$KeyIterator.class ? src/core/java/com/opensymphony/oscache/base/algorithm/AbstractConcurrentReadCache$ValueIterator.class ? src/core/java/com/opensymphony/oscache/base/algorithm/AbstractConcurrentReadCache.class ? src/core/java/com/opensymphony/oscache/base/algorithm/FIFOCache.class ? src/core/java/com/opensymphony/oscache/base/algorithm/LRUCache.class ? src/core/java/com/opensymphony/oscache/base/algorithm/TestAbstractCache.class ? src/core/java/com/opensymphony/oscache/base/algorithm/TestCompleteAlgorithm.class ? src/core/java/com/opensymphony/oscache/base/algorithm/TestFIFOCache.class ? src/core/java/com/opensymphony/oscache/base/algorithm/TestLRUCache.class ? src/core/java/com/opensymphony/oscache/base/algorithm/TestQueueCache.class ? src/core/java/com/opensymphony/oscache/base/algorithm/TestUnlimitedCache.class ? src/core/java/com/opensymphony/oscache/base/algorithm/UnlimitedCache.class ? src/core/java/com/opensymphony/oscache/base/events/CacheEntryEvent.class ? src/core/java/com/opensymphony/oscache/base/events/CacheEntryEventListener.class ? src/core/java/com/opensymphony/oscache/base/events/CacheEntryEventType.class ? src/core/java/com/opensymphony/oscache/base/events/CacheEvent.class ? src/core/java/com/opensymphony/oscache/base/events/CacheEventListener.class ? src/core/java/com/opensymphony/oscache/base/events/CacheGroupEvent.class ? src/core/java/com/opensymphony/oscache/base/events/CacheMapAccessEvent.class ? src/core/java/com/opensymphony/oscache/base/events/CacheMapAccessEventListener.class ? src/core/java/com/opensymphony/oscache/base/events/CacheMapAccessEventType.class ? src/core/java/com/opensymphony/oscache/base/events/CachePatternEvent.class ? src/core/java/com/opensymphony/oscache/base/events/CachewideEvent.class ? src/core/java/com/opensymphony/oscache/base/events/CachewideEventType.class ? src/core/java/com/opensymphony/oscache/base/events/ScopeEvent.class ? src/core/java/com/opensymphony/oscache/base/events/ScopeEventListener.class ? src/core/java/com/opensymphony/oscache/base/events/ScopeEventType.class ? src/core/java/com/opensymphony/oscache/base/events/TestCacheEntryEvent.class ? src/core/java/com/opensymphony/oscache/base/events/TestCacheMapAccessEvent.class ? src/core/java/com/opensymphony/oscache/base/events/TestCompleteEvents.class ? src/core/java/com/opensymphony/oscache/base/events/TestScopeEvent.class ? src/core/java/com/opensymphony/oscache/base/persistence/CachePersistenceException.class ? src/core/java/com/opensymphony/oscache/base/persistence/PersistenceListener.class ? src/core/java/com/opensymphony/oscache/extra/CacheEntryEventListenerImpl.class ? src/core/java/com/opensymphony/oscache/extra/CacheMapAccessEventListenerImpl.class ? src/core/java/com/opensymphony/oscache/extra/ScopeEventListenerImpl.class ? src/core/java/com/opensymphony/oscache/extra/TestCacheEntryEventListenerImpl.class ? src/core/java/com/opensymphony/oscache/extra/TestCacheMapAccessEventListenerImpl.class ? src/core/java/com/opensymphony/oscache/extra/TestCompleteExtra.class ? src/core/java/com/opensymphony/oscache/extra/TestScopeEventListenerImpl.class ? src/core/java/com/opensymphony/oscache/general/GeneralCacheAdministrator.class ? src/core/java/com/opensymphony/oscache/general/TestCompleteGeneral.class ? src/core/java/com/opensymphony/oscache/general/TestGeneralCacheAdministrator.class ? src/core/java/com/opensymphony/oscache/util/ClassLoaderUtil.class ? src/core/java/com/opensymphony/oscache/util/FastCronParser.class ? src/core/java/com/opensymphony/oscache/util/StringUtil.class ? src/core/java/com/opensymphony/oscache/util/TestFastCronParser.class ? src/core/java/com/opensymphony/oscache/util/ValueSet.class ? src/core/java/com/opensymphony/oscache/web/CacheContextListener.class ? src/core/java/com/opensymphony/oscache/web/ServletCache.class ? src/core/java/com/opensymphony/oscache/web/ServletCacheAdministrator.class ? src/core/java/com/opensymphony/oscache/web/WebEntryRefreshPolicy.class ? src/core/java/com/opensymphony/oscache/web/filter/CacheFilter.class ? src/core/java/com/opensymphony/oscache/web/filter/CacheHttpServletResponseWrapper.class ? src/core/java/com/opensymphony/oscache/web/filter/ResponseContent.class ? src/core/java/com/opensymphony/oscache/web/filter/SplitServletOutputStream.class ? src/core/java/com/opensymphony/oscache/web/tag/CacheTag.class ? src/core/java/com/opensymphony/oscache/web/tag/FlushTag.class ? src/core/java/com/opensymphony/oscache/web/tag/GroupTag.class ? src/core/java/com/opensymphony/oscache/web/tag/UseCachedTag.class Index: build.properties =================================================================== RCS file: /cvs/oscache/build.properties,v retrieving revision 1.2 diff -r1.2 build.properties 3c3 < version=2.0.1 --- > version=2.0.3 13c13 < #test.cluster= \ No newline at end of file --- > #test.cluster= Index: build.xml =================================================================== RCS file: /cvs/oscache/build.xml,v retrieving revision 1.10 diff -r1.10 build.xml 91c91,97 < --- > compiler="javac1.3" > bootclasspath="/usr/java/jdk1.3.1/jre/lib/rt.jar;/usr/java/jdk1.3.1/lib/dt.jar;" > source="1.3" > target="1.3" > > /> 117c123 < --- > 146c152,156 < --- > > > > > 152c162 < --- > 154c164 < --- > 156,157c166,171 < < --- > > > > > > 158a173,174 > > 222c238 < --- > 227,228c243 < < --- > 230,233d244 < < < < 240,255d250 < < < < < < < < < < < < < < < < Index: src/core/java/com/opensymphony/oscache/base/Cache.java =================================================================== RCS file: /cvs/oscache/src/core/java/com/opensymphony/oscache/base/Cache.java,v retrieving revision 1.6 diff -r1.6 Cache.java 60,61c60,64 < * A set that holds keys of cache entries that are currently being built. < * The cache checks against this map when a stale entry is requested. --- > * A map that holds keys of cache entries that are currently being built, and EntryUpdateState instance as values. This is used to coordinate threads > * that modify/access a same key in concurrence. > * > * The cache checks against this map when a stale entry is requested, or a cache miss is observed. > * 66,68c69,75 < * We need to isolate these here since the actual CacheEntry < * objects may not normally be held in memory at all (eg, if no < * memory cache is configured). --- > * To avoid data races, values in this map should remain present during the whole time distinct threads deal with the > * same key. We implement this using explicit reference counting in the EntryUpdateState instance, to be able to clean up > * the map once all threads have declared they are done accessing/updating a given key. > * > * It is not possible to locate this into the CacheEntry because this would require to have a CacheEntry instance for all cache misses, and > * may therefore generate a memory leak. More over, the CacheEntry instance may not be hold in memory in the case no > * memory cache is configured. 238d244 < EntryUpdateState updateState = getUpdateState(key); 240,287c246,293 < synchronized (updateState) { < if (updateState.isAwaitingUpdate() || updateState.isCancelled()) { < // No one else is currently updating this entry - grab ownership < updateState.startUpdate(); < < if (cacheEntry.isNew()) { < accessEventType = CacheMapAccessEventType.MISS; < } else { < accessEventType = CacheMapAccessEventType.STALE_HIT; < } < } else if (updateState.isUpdating()) { < // Another thread is already updating the cache. We block if this < // is a new entry, or blocking mode is enabled. Either putInCache() < // or cancelUpdate() can cause this thread to resume. < if (cacheEntry.isNew() || blocking) { < do { < try { < updateState.wait(); < } catch (InterruptedException e) { < } < } while (updateState.isUpdating()); < < if (updateState.isCancelled()) { < // The updating thread cancelled the update, let this one have a go. < updateState.startUpdate(); < < // We put the updateState object back into the updateStates map so < // any remaining threads waiting on this cache entry will be notified < // once this thread has done its thing (either updated the cache or < // cancelled the update). Without this code they'll get left hanging... < synchronized (updateStates) { < updateStates.put(key, updateState); < } < < if (cacheEntry.isNew()) { < accessEventType = CacheMapAccessEventType.MISS; < } else { < accessEventType = CacheMapAccessEventType.STALE_HIT; < } < } else if (updateState.isComplete()) { < reload = true; < } else { < log.error("Invalid update state for cache entry " + key); < } < } < } else { < reload = true; < } --- > //Get access to the EntryUpdateState instance and increment the usage count during the potential sleep > EntryUpdateState updateState = getUpdateState(key); > try { > synchronized (updateState) { > if (updateState.isAwaitingUpdate() || updateState.isCancelled()) { > // No one else is currently updating this entry - grab ownership > updateState.startUpdate(); > > if (cacheEntry.isNew()) { > accessEventType = CacheMapAccessEventType.MISS; > } else { > accessEventType = CacheMapAccessEventType.STALE_HIT; > } > } else if (updateState.isUpdating()) { > // Another thread is already updating the cache. We block if this > // is a new entry, or blocking mode is enabled. Either putInCache() > // or cancelUpdate() can cause this thread to resume. > if (cacheEntry.isNew() || blocking) { > do { > try { > updateState.wait(); > } catch (InterruptedException e) { > } > } while (updateState.isUpdating()); > > if (updateState.isCancelled()) { > // The updating thread cancelled the update, let this one have a go. This increments the usage count for this EntryUpdateState instance > updateState.startUpdate(); > > if (cacheEntry.isNew()) { > accessEventType = CacheMapAccessEventType.MISS; > } else { > accessEventType = CacheMapAccessEventType.STALE_HIT; > } > } else if (updateState.isComplete()) { > reload = true; > } else { > log.error("Invalid update state for cache entry " + key); > } > } > } else { > reload = true; > } > } > } finally { > //Make sure we release the usage count for this EntryUpdateState since we don't use it anymore. If the current thread started the update, then the counter was > //increased by one in startUpdate() > releaseUpdateState(updateState, key); 364c370 < state = (EntryUpdateState) updateStates.remove(key); --- > state = (EntryUpdateState) updateStates.get(key); 368c374 < state.cancelUpdate(); --- > int usageCounter = state.cancelUpdate(); 369a376,377 > > checkEntryStateUpdateUsage(key, state, usageCounter); 370a379,382 > } else { > if (log.isErrorEnabled()) { > log.error("internal error: expected to get a state from key [" + key + "]"); > } 376a389,405 > * Utility method to check if the specified usage count is zero, and if so remove the corresponding EntryUpdateState from the updateStates. This is designed to factor common code. > * > * Warning: This method should always be called while holding both the updateStates field and the state parameter > */ > private void checkEntryStateUpdateUsage(String key, EntryUpdateState state, int usageCounter) { > //Clean up the updateStates map to avoid a memory leak once no thread is using this EntryUpdateState instance anymore. > if (usageCounter ==0) { > EntryUpdateState removedState = (EntryUpdateState) updateStates.remove(key); > if (state != removedState) { > if (log.isErrorEnabled()) { > log.error("internal error: removed state [" + removedState + "] from key [" + key + "] whereas we expected [" + state + "]"); > } > } > } > } > > /** 654a684,686 > } else { > //Otherwise indicate that we start using it to prevent its removal until all threads are done with it. > updateState.incrementUsageCounter(); 661a694,705 > * releases the usage that was made of the specified EntryUpdateState. When this reaches zero, the entry is removed from the map. > * @param state the state to release the usage of > * @param key the associated key. > */ > protected void releaseUpdateState(EntryUpdateState state, String key) { > synchronized (updateStates) { > int usageCounter = state.decrementUsageCounter(); > checkEntryStateUpdateUsage(key, state, usageCounter); > } > } > > /** 671c715,716 < * by the {@link #putInCache} method. --- > * by the {@link #putInCache} method, so it is possible that no EntryUpdateState was hold > * when this method is called. 679c724 < state = (EntryUpdateState) updateStates.remove(key); --- > state = (EntryUpdateState) updateStates.get(key); 683c728 < state.completeUpdate(); --- > int usageCounter = state.completeUpdate(); 684a730,732 > > checkEntryStateUpdateUsage(key, state, usageCounter); > 686c734,736 < } --- > } else { > //If putInCache() was called directly (i.e. not as a result of a NeedRefreshException) then no EntryUpdateState would be found. > } 859a910,928 > > /** > * Test support only: return the number of EntryUpdateState instances within the updateStates map. > */ > protected int getNbUpdateState() { > synchronized(updateStates) { > return updateStates.size(); > } > } > > > /** > * Test support only: return the number of entries currently in the cache map > */ > public int getNbEntries() { > synchronized(cacheMap) { > return cacheMap.size(); > } > } Index: src/core/java/com/opensymphony/oscache/base/EntryUpdateState.java =================================================================== RCS file: /cvs/oscache/src/core/java/com/opensymphony/oscache/base/EntryUpdateState.java,v retrieving revision 1.3 diff -r1.3 EntryUpdateState.java 41a42,48 > > /** > * A counter of the number of threads that are coordinated through this instance. When this counter gets to zero, then the reference to this > * instance may be released from the Cache instance. > * This is counter is protected by the EntryStateUpdate instance monitor. > */ > private int nbConcurrentUses = 1; 79a87 > * @return the counter value after the operation completed 81c89 < public void cancelUpdate() { --- > public int cancelUpdate() { 86a95 > return decrementUsageCounter(); 91a101 > * @return the counter value after the operation completed 93c103 < public void completeUpdate() { --- > public int completeUpdate() { 98a109 > return decrementUsageCounter(); 103a115 > * @return the counter value after the operation completed 105c117 < public void startUpdate() { --- > public int startUpdate() { 110a123 > return incrementUsageCounter(); 111a125,156 > > /** > * Increments the usage counter by one > * @return the counter value after the increment > */ > public synchronized int incrementUsageCounter() { > nbConcurrentUses++; > return nbConcurrentUses; > } > > /** > * Gets the current usage counter value > * @return a positive number. > */ > public synchronized int getUsageCounter() { > return nbConcurrentUses; > } > > > /** > * Decrements the usage counter by one. This method may only be called when the usage number is greater than zero > * @return the counter value after the decrement > */ > public synchronized int decrementUsageCounter() { > if (nbConcurrentUses <=0) { > throw new IllegalStateException("Cannot decrement usage counter, it is already equals to [" + nbConcurrentUses + "]"); > } > nbConcurrentUses--; > return nbConcurrentUses; > } > > Index: src/core/java/com/opensymphony/oscache/base/algorithm/FIFOCache.java =================================================================== RCS file: /cvs/oscache/src/core/java/com/opensymphony/oscache/base/algorithm/FIFOCache.java,v retrieving revision 1.1 diff -r1.1 FIFOCache.java 8a9,10 > import com.opensymphony.oscache.util.ClassLoaderUtil; > 41,42c43,44 < Class.forName("java.util.LinkedHashSet"); < list = new LinkedHashSet(); --- > Class linkedHashSetClass = ClassLoaderUtil.loadClass("java.util.LinkedHashSet", this.getClass()); > list = (Collection) linkedHashSetClass.newInstance(); 44c46,51 < } catch (ClassNotFoundException e) { --- > } catch (Exception e) { > isSet = false; > } catch(LinkageError e) { > isSet = false; > } > if (!isSet) { 46c53 < } --- > } Index: src/core/java/com/opensymphony/oscache/base/algorithm/LRUCache.java =================================================================== RCS file: /cvs/oscache/src/core/java/com/opensymphony/oscache/base/algorithm/LRUCache.java,v retrieving revision 1.2 diff -r1.2 LRUCache.java 83,84c83,84 < ClassLoaderUtil.loadClass("java.util.LinkedHashSet", this.getClass()); < list = new LinkedHashSet(); --- > Class linkedHashSetClass = ClassLoaderUtil.loadClass("java.util.LinkedHashSet", this.getClass()); > list = (Collection) linkedHashSetClass.newInstance(); 86c86,91 < } catch (ClassNotFoundException e) { --- > } catch (Exception e) { > isSet = false; > } catch(LinkageError e) { > isSet = false; > } > if (!isSet) { 93c98,103 < } catch (ClassNotFoundException e1) { --- > } catch (Exception e1) { > isMap = false; > } catch(LinkageError e) { > isMap = false; > } > if (!isMap) { Index: src/core/test/com/opensymphony/oscache/base/TestCache.java =================================================================== RCS file: /cvs/oscache/src/core/test/com/opensymphony/oscache/base/TestCache.java,v retrieving revision 1.1 diff -r1.1 TestCache.java 6a7,8 > import java.util.Properties; > 8a11 > import junit.framework.Assert; 79a83,197 > /** > * Tests that with a very large amount of keys that added and trigger cache overflows, there is no memory leak > * @throws Exception > */ > public void testBug174CacheOverflow() throws Exception { > GeneralCacheAdministrator admin = new GeneralCacheAdministrator(); > > int cacheCapacity = 100; > int maxAddedCacheEntries = cacheCapacity*10; > String baseCacheKey= "baseKey"; > String cacheValue ="same_value"; > > admin.setCacheCapacity(cacheCapacity); > > Cache cache = admin.getCache(); > > //Add lots of different keys to trigger cache overflow > for (int keyIndex=0; keyIndex String key = baseCacheKey + keyIndex; > admin.putInCache(key, cacheValue); > } > > Assert.assertEquals("expected cache to be at its full capacity", cacheCapacity , cache.getNbEntries()); > Assert.assertTrue("expected cache overflows to have cleaned UpdateState instances. got [" + cache.getNbUpdateState() + "] updates while max is [" + cacheCapacity + "]", cache.getNbUpdateState() <= cacheCapacity); > } > > /** > * Tests that with a very large amount of keys that added and trigger cache overflows, there is no memory leak > * @throws Exception > */ > public void testBug174CacheOverflowAndUpdate() throws Exception { > GeneralCacheAdministrator admin = new GeneralCacheAdministrator(); > > int cacheCapacity = 100; > int maxAddedCacheEntries = cacheCapacity*10; > String baseCacheKey= "baseKey"; > String cacheValue ="same_value"; > > admin.setCacheCapacity(cacheCapacity); > > Cache cache = admin.getCache(); > > > //Add lots of different keys to trigger cache overflow, mixed with updates > //FIXME: we may need different threads to enter branches recovering from current update. > for (int keyIndex=0; keyIndex String key = baseCacheKey + keyIndex; > admin.putInCache(key, cacheValue); > > try { > admin.getFromCache(key, 0); > fail("expected element [" + key + "] not to be present"); > } catch (NeedsRefreshException e) { > admin.putInCache(key, cacheValue); > } > } > > Assert.assertEquals("expected cache to be at its full capacity", cacheCapacity , cache.getNbEntries()); > Assert.assertTrue("expected cache overflows to have cleaned UpdateState instances. Nb states is:" + cache.getNbUpdateState() + " expected max="+ cacheCapacity, cache.getNbUpdateState() <= cacheCapacity); > } > > > /** > * Tests that with a very large amount of keys accessed and cancelled, there is no memory leak > * @throws Exception > */ > public void testBug174CacheMissesNonBlocking() throws Exception { > testBug174CacheMisses(false); > } > > /** > * Tests that with a very large amount of keys accessed and cancelled, there is no memory leak > * @throws Exception > */ > public void testBug174CacheMissesBlocking() throws Exception { > testBug174CacheMisses(true); > } > > /** > * Tests that with a very large amount of keys accessed and cancelled, there is no memory leak > * @throws Exception > */ > public void testBug174CacheMisses(boolean block) throws Exception { > GeneralCacheAdministrator admin; > Properties p = new Properties(); > if (block) { > p.setProperty(AbstractCacheAdministrator.CACHE_BLOCKING_KEY, "true"); > } > admin = new GeneralCacheAdministrator(p); > > int cacheCapacity = 100; > int maxAddedCacheEntries = cacheCapacity*10; > String baseCacheKey= "baseKey"; > String cacheValue ="same_value"; > > admin.setCacheCapacity(cacheCapacity); > > Cache cache = admin.getCache(); > > //Access lots of different keys to trigger cache overflow > for (int keyIndex=0; keyIndex String key = baseCacheKey + keyIndex; > try { > admin.getFromCache(key); > fail("expected element [" + key + "] not to be present"); > } catch (NeedsRefreshException e) { > admin.cancelUpdate(key); > } > } > > Assert.assertTrue("expected cache accesses to not leak past cache capacity. Nb states is:" + cache.getNbUpdateState() + " expected max="+ cacheCapacity, cache.getNbUpdateState() < cacheCapacity); > } > > > Index: src/core/test/com/opensymphony/oscache/base/TestCompleteBase.java =================================================================== RCS file: /cvs/oscache/src/core/test/com/opensymphony/oscache/base/TestCompleteBase.java,v retrieving revision 1.1 diff -r1.1 TestCompleteBase.java 53c53,54 < suite.addTest(TestFastCronParser.suite()); --- > //does not work in 1.3 > // suite.addTest(TestFastCronParser.suite()); Index: src/core/test/com/opensymphony/oscache/base/TestConcurrency.java =================================================================== RCS file: /cvs/oscache/src/core/test/com/opensymphony/oscache/base/TestConcurrency.java,v retrieving revision 1.4 diff -r1.4 TestConcurrency.java 12a13 > import java.util.BitSet; 14a16,18 > import org.apache.commons.logging.Log; > import org.apache.commons.logging.LogFactory; > 22a27,28 > private static transient final Log log = LogFactory.getLog(GeneralCacheAdministrator.class); //TestConcurrency.class > 63a70,164 > * Checks whether the cache handles simultaneous attempts to access a > * stable cache entry correctly when the blocking mode is enabled. > * > * Basically N threads are concurrently trying to access a same stale cache entry and each is cancelling its update. Each thread repeat this operation M times. > * The test is sucessfull if after some time, all threads are properly released > */ > public void testConcurrentStaleGets() { > GeneralCacheAdministrator staticAdmin = admin; > admin = new GeneralCacheAdministrator(); //avoid poluting other test cases > > try { > // A test for the case where oscache.blocking = true > //admin.destroy(); > > Properties p = new Properties(); > p.setProperty(AbstractCacheAdministrator.CACHE_BLOCKING_KEY, "true"); > admin = new GeneralCacheAdministrator(p); > > assertTrue("The cache should be in blocking mode for this test.", admin.isBlocking()); > > int nbThreads = 10; > int retryByThreads = 10000; > > String key = "new"; > > //First put a value > admin.putInCache(key, VALUE); > > try { > //Then test without concurrency that it is reported as stale when time-to-live is zero > admin.getFromCache(key, 0); > fail("NeedsRefreshException should have been thrown"); > } catch (NeedsRefreshException nre) { > //Ok this is was is excpected, we can release the update > admin.cancelUpdate(key); > } > > //Then ask N threads to concurrently try to access this stale resource and each should receive a NeedsRefreshException, and cancel the update > Thread spawnedThreads [] = new Thread[nbThreads]; > BitSet successfullThreadTerminations = new BitSet(nbThreads); //Track which thread successfully terminated > for(int threadIndex=0; threadIndex GetStaleEntryAndCancelUpdate getEntry = new GetStaleEntryAndCancelUpdate(key, 0, retryByThreads, threadIndex, successfullThreadTerminations); > Thread thread = new Thread(getEntry); > spawnedThreads[threadIndex] = thread; > thread.start(); > } > > // OK, those threads should now repeatidely be blocked waiting for the new cache > // entry to appear. Wait for all of them to terminate > int maxWaitingSeconds = 100; > int maxWaitForEachThread= 5; > long waitStartTime = System.currentTimeMillis(); > > boolean atLeastOneThreadRunning = false; > > while (System.currentTimeMillis() - waitStartTime < maxWaitingSeconds *1000) { > atLeastOneThreadRunning = false; > > //Wait a bit between each step to avoid consumming all CPU and preventing other threads from running. > try { > Thread.sleep(500); > } catch (InterruptedException ie) { > } > > //check whether all threads are done. > for(int threadIndex=0; threadIndex Thread inspectedThread = spawnedThreads[threadIndex]; > try { > inspectedThread.join(maxWaitForEachThread * 1000); > } catch (InterruptedException e) { > fail("Thread #" + threadIndex + " was interrupted"); > } > if (inspectedThread.isAlive()) { > atLeastOneThreadRunning = true; > log.error("Thread #" + threadIndex + " did not complete within [" + (System.currentTimeMillis() - waitStartTime ) /1000 + "] s "); > } > } > if (! atLeastOneThreadRunning) { > break; //while loop, test success. > } > > } > > assertTrue("at least one thread did not complete within [" + (System.currentTimeMillis() - waitStartTime ) /1000 + "] s ", ! atLeastOneThreadRunning); > > for(int threadIndex=0; threadIndex assertTrue("thread [" + threadIndex + "] did not successfully complete. ", successfullThreadTerminations.get(threadIndex)); > } > } finally { > admin = staticAdmin; > //Avoid po > } > } > > /** 284a386 > assertFalse("expected NeedsRefreshException to be thrown!", expectNRE); 296a399,443 > /** > * Basically requests a stale entry, expects to receive a NeedsRefreshException, and always cancels the update. > */ > private class GetStaleEntryAndCancelUpdate implements Runnable { > String key; > int time; > int retries; > private final int threadIndex; > private final BitSet successfullThreadTerminations; > > GetStaleEntryAndCancelUpdate(String key, int time, int retries, int threadIndex, BitSet successfullThreadTerminations) { > this.key = key; > this.time = time; > this.retries = retries; > this.threadIndex = threadIndex; > this.successfullThreadTerminations = successfullThreadTerminations; > } > > public void run() { > for (int retryIndex=0; retryIndex try { > // Get from the cache > Object fromCache = admin.getFromCache(key, time); > assertNull("Thread index [" + retryIndex + "] expected stale request [" + retryIndex + "] to be received, got [" + fromCache+ "]", fromCache); > } catch (NeedsRefreshException nre) { > try { > admin.cancelUpdate(key); > } catch(Throwable t) { > log.error("Thread index [" + retryIndex + "]: Unexpectedly caught exception [" + t + "]", t); > fail("Thread index [" + retryIndex + "] : Unexpectedly caught exception [" + t + "]"); > } > } catch(Throwable t) { > log.error("Thread index [" + retryIndex + "] : Unexpectedly caught exception [" + t + "]", t); > fail("Thread index [" + retryIndex + "] : Unexpectedly caught exception [" + t + "]"); > } > } > > //Once we successfully terminate, we update the corresponding bit to let the Junit know we succeeded. > synchronized(successfullThreadTerminations) { > successfullThreadTerminations.set(threadIndex); > } > } > } > >