Skip to content
Snippets Groups Projects
Commit 59553677 authored by Tobias Pietzsch's avatar Tobias Pietzsch
Browse files

FetcherThreads with a queue of runnable Loaders.

parent 0477bdc0
No related branches found
No related tags found
No related merge requests found
......@@ -33,7 +33,7 @@ import bdv.cache.CacheIoTiming.IoStatistics;
import bdv.cache.CacheIoTiming.IoTimeBudget;
import bdv.cache.util.BlockingFetchQueues;
import bdv.cache.util.FetcherThreads;
import bdv.cache.util.FetcherThreads.Loader;
import bdv.cache.util.Loader;
import bdv.img.cache.VolatileGlobalCellCache;
......@@ -75,9 +75,9 @@ public final class LoadingVolatileCache< K, V extends VolatileCacheValue > imple
private final int numPriorityLevels;
private final BlockingFetchQueues< K > queue;
private final BlockingFetchQueues< Loader > queue;
private final FetcherThreads< K > fetchers;
private final FetcherThreads fetchers;
private volatile long currentQueueFrame = 0;
......@@ -96,7 +96,7 @@ public final class LoadingVolatileCache< K, V extends VolatileCacheValue > imple
{
this.numPriorityLevels = numPriorityLevels;
queue = new BlockingFetchQueues<>( numPriorityLevels );
fetchers = new FetcherThreads<>( queue, new EntryLoader(), numFetcherThreads );
fetchers = new FetcherThreads( queue, numFetcherThreads );
}
/**
......@@ -253,7 +253,7 @@ public final class LoadingVolatileCache< K, V extends VolatileCacheValue > imple
prepareNextFrame();
}
public FetcherThreads< K > getFetcherThreads()
public FetcherThreads getFetcherThreads()
{
return fetchers;
}
......@@ -299,7 +299,7 @@ public final class LoadingVolatileCache< K, V extends VolatileCacheValue > imple
if ( entry.getEnqueueFrame() < currentQueueFrame )
{
entry.setEnqueueFrame( currentQueueFrame );
queue.put( entry.getKey(), priority, enqueuToFront );
queue.put( new EntryLoader( entry.getKey() ), priority, enqueuToFront );
}
}
......@@ -343,8 +343,15 @@ public final class LoadingVolatileCache< K, V extends VolatileCacheValue > imple
* corresponding {@link Entry} from the {@link WeakSoftCache}, and and
* forwarding to {@link Entry#loadIfNotValid()}.
*/
final class EntryLoader implements Loader< K >
final class EntryLoader implements Loader
{
private final K key;
public EntryLoader( final K key )
{
this.key = key;
}
/**
* If this key's data is not yet valid, then load it. After the method
* returns, the data is guaranteed to be valid.
......@@ -353,7 +360,7 @@ public final class LoadingVolatileCache< K, V extends VolatileCacheValue > imple
* if the loading operation was interrupted.
*/
@Override
public void load( final K key ) throws InterruptedException
public void load() throws InterruptedException
{
final Entry entry = cache.get( key );
if ( entry != null )
......
......@@ -35,6 +35,8 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* TODO revise javadoc
*
* Queueing structure (intended for cache entries to be fetched). There is an
* array of {@link ArrayDeque}s, ordered by priority. Elements are
* {@link #put(Object, int, boolean)} with a priority and added to one of the
......
......@@ -36,6 +36,8 @@ import bdv.cache.LoadingVolatileCache;
import bdv.cache.VolatileCacheValue;
/**
* TODO revise javadoc
*
* A set of threads that load data. Each thread does the following in a loop:
* <ol>
* <li>Take the next {@code key} from a queue.</li>
......@@ -50,32 +52,9 @@ import bdv.cache.VolatileCacheValue;
*
* @author Tobias Pietzsch &lt;tobias.pietzsch@gmail.com&gt;
*/
public class FetcherThreads< K >
public class FetcherThreads
{
/**
* Loads data associated with a key.
*
* @param <K>
* the key type.
*/
public interface Loader< K >
{
/**
* If this key's data is not yet valid, then load it. After the method
* returns, the data is guaranteed to be valid.
* <p>
* This must be implemented in a thread-safe manner. Multiple threads
* are allowed to call this method at the same time with the same key.
* The expected behaviour is that the data is loaded only once and the
* result is made visible on all threads.
*
* @throws InterruptedException
* if the loading operation was interrupted.
*/
public void load( K key ) throws InterruptedException;
}
private final ArrayList< Fetcher< K > > fetchers;
private final ArrayList< Fetcher > fetchers;
/**
* Create (and start) a set of fetcher threads.
......@@ -87,11 +66,10 @@ public class FetcherThreads< K >
* @param numFetcherThreads how many parallel fetcher threads to start.
*/
public FetcherThreads(
final BlockingFetchQueues< K > queue,
final Loader< K > loader,
final BlockingFetchQueues< Loader > queue,
final int numFetcherThreads )
{
this( queue, loader, numFetcherThreads, i -> String.format( "Fetcher-%d", i ) );
this( queue, numFetcherThreads, i -> String.format( "Fetcher-%d", i ) );
}
/**
......@@ -103,15 +81,14 @@ public class FetcherThreads< K >
* @param threadIndexToName a function for naming fetcher threads (takes an index and returns a name).
*/
public FetcherThreads(
final BlockingFetchQueues< K > queue,
final Loader< K > loader,
final BlockingFetchQueues< Loader > queue,
final int numFetcherThreads,
final IntFunction< String > threadIndexToName )
{
fetchers = new ArrayList<>( numFetcherThreads );
for ( int i = 0; i < numFetcherThreads; ++i )
{
final Fetcher< K > f = new Fetcher<>( queue, loader );
final Fetcher f = new Fetcher( queue );
f.setDaemon( true );
f.setName( threadIndexToName.apply( i ) );
fetchers.add( f );
......@@ -133,7 +110,7 @@ public class FetcherThreads< K >
*/
public void pauseFetcherThreadsUntil( final long timeMillis )
{
for ( final Fetcher< K > f : fetchers )
for ( final Fetcher f : fetchers )
f.pauseUntil( timeMillis );
}
......@@ -144,38 +121,33 @@ public class FetcherThreads< K >
*/
public void wakeFetcherThreads()
{
for ( final Fetcher< K > f : fetchers )
for ( final Fetcher f : fetchers )
f.wakeUp();
}
static final class Fetcher< K > extends Thread
static final class Fetcher extends Thread
{
private final BlockingFetchQueues< K > queue;
private final Loader< K > loader;
private final BlockingFetchQueues< Loader > queue;
private final Object lock = new Object();
private volatile long pauseUntilTimeMillis = 0;
public Fetcher(
final BlockingFetchQueues< K > queue,
final Loader< K > loader )
public Fetcher( final BlockingFetchQueues< Loader > queue )
{
this.queue = queue;
this.loader = loader;
}
@Override
public final void run()
{
K key = null;
Loader loader = null;
while ( true )
{
while ( key == null )
while ( loader == null )
try
{
key = queue.take();
loader = queue.take();
}
catch ( final InterruptedException e )
{}
......@@ -195,8 +167,8 @@ public class FetcherThreads< K >
}
try
{
loader.load( key );
key = null;
loader.load();
loader = null;
}
catch ( final InterruptedException e )
{}
......
package bdv.cache.util;
/**
* Loads something.
*
* @author Tobias Pietzsch &lt;tobias.pietzsch@gmail.com&gt;
*/
public interface Loader
{
/**
* Load something.
*
* @throws InterruptedException
* if the loading operation was interrupted.
*/
public void load() throws InterruptedException;
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment