Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
B
BigDataViewer_Core_Extension
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package registry
Container registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
GitLab community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
BioinformaticDataCompression
BigDataViewer_Core_Extension
Commits
97331faf
Commit
97331faf
authored
Feb 4, 2017
by
Tobias Pietzsch
Browse files
Options
Downloads
Patches
Plain Diff
revised WeakRefVolatileCache, not exactly with the verified code, but...
parent
5748b2e3
Branches
Branches containing commit
No related tags found
No related merge requests found
Changes
1
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
src/main/java/bdv/cache/revised/WeakRefVolatileCache.java
+230
-159
230 additions, 159 deletions
src/main/java/bdv/cache/revised/WeakRefVolatileCache.java
with
230 additions
and
159 deletions
src/main/java/bdv/cache/revised/WeakRefVolatileCache.java
+
230
−
159
View file @
97331faf
...
...
@@ -8,7 +8,6 @@ import java.util.concurrent.ConcurrentHashMap;
import
java.util.concurrent.ExecutionException
;
import
bdv.cache.CacheHints
;
import
bdv.cache.LoadingStrategy
;
import
bdv.cache.VolatileCacheValue
;
import
bdv.cache.iotiming.CacheIoTiming
;
import
bdv.cache.iotiming.IoStatistics
;
...
...
@@ -45,40 +44,33 @@ public class WeakRefVolatileCache< K, V extends VolatileCacheValue > implements
public
void
clean
()
{
map
.
remove
(
entry
.
getKey
(),
entry
);
entry
.
clean
(
this
);
}
}
static
final
int
NOTLOADED
=
0
;
static
final
int
INVALID
=
1
;
static
final
int
VALID
=
2
;
final
class
Entry
{
private
final
K
key
;
private
WeakReference
<
V
>
ref
;
private
long
enqueueFrame
;
final
K
key
;
private
VolatileLoader
<
?
extends
V
>
loader
;
int
loaded
;
public
Entry
(
final
K
key
,
final
V
value
)
{
assert
(
value
.
isValid
()
);
this
.
key
=
key
;
this
.
loader
=
null
;
ref
=
new
CacheWeakReference
(
value
,
this
);
enqueueFrame
=
-
1
;
}
long
enqueueFrame
;
VolatileLoader
<
?
extends
V
>
loader
;
public
Entry
(
final
K
key
,
final
VolatileLoader
<
?
extends
V
>
loader
)
{
this
.
key
=
key
;
this
.
loader
=
loader
;
ref
=
new
CacheWeakReference
(
loader
.
createInvalid
(),
this
);
enqueueFrame
=
-
1
;
}
public
K
getKey
()
{
return
key
;
this
.
ref
=
new
WeakReference
<>(
null
);
this
.
loaded
=
NOTLOADED
;
this
.
enqueueFrame
=
-
1
;
}
public
V
getValue
()
...
...
@@ -86,100 +78,91 @@ public class WeakRefVolatileCache< K, V extends VolatileCacheValue > implements
return
ref
.
get
();
}
public
void
load
()
throws
ExecutionException
public
void
setInvalid
(
final
V
value
)
{
/*
* TODO: the assumption for following synchronization pattern is
* that value.isValid() will never go from true to false. When
* invalidation API is added, that might change.
*/
final
V
oldValue
=
ref
.
get
();
if
(
oldValue
!=
null
&&
!
oldValue
.
isValid
()
)
{
synchronized
(
this
)
{
V
value
=
ref
.
get
();
if
(
value
==
oldValue
)
{
value
=
backingCache
.
get
(
key
,
loader
);
loaded
=
INVALID
;
ref
=
new
CacheWeakReference
(
value
,
this
);
loader
=
null
;
enqueueFrame
=
Long
.
MAX_VALUE
;
notifyAll
();
}
}
}
}
public
long
getEnqueueFrame
(
)
public
void
setValid
(
final
V
value
)
{
return
enqueueFrame
;
loaded
=
VALID
;
ref
=
new
CacheWeakReference
(
value
,
this
);
loader
=
null
;
enqueueFrame
=
Long
.
MAX_VALUE
;
}
public
void
setEnqueueFrame
(
final
long
f
)
public
void
clean
(
final
CacheWeakReference
re
f
)
{
enqueueFrame
=
f
;
if
(
ref
==
this
.
ref
)
map
.
remove
(
key
,
this
);
}
}
@Override
public
V
getIfPresent
(
final
Object
key
,
final
CacheHints
cacheH
ints
)
throws
ExecutionException
public
V
getIfPresent
(
final
Object
key
,
final
CacheHints
h
ints
)
throws
ExecutionException
{
final
Entry
entry
=
map
.
get
(
key
);
if
(
entry
==
null
)
return
null
;
final
V
value
=
entry
.
getValue
();
if
(
value
==
null
)
return
null
;
if
(
!
value
.
isValid
()
)
loadEntryWithCacheHints
(
entry
,
cacheHints
);
final
V
v
=
entry
.
getValue
();
if
(
v
!=
null
&&
v
.
isValid
()
)
return
v
;
return
entry
.
getValue
();
switch
(
hints
.
getLoadingStrategy
()
)
{
case
BLOCKING:
return
getBlocking
(
entry
);
case
BUDGETED:
final
int
priority
=
hints
.
getQueuePriority
();
final
IoStatistics
stats
=
CacheIoTiming
.
getIoStatistics
();
final
IoTimeBudget
budget
=
stats
.
getIoTimeBudget
();
final
long
timeLeft
=
budget
.
timeLeft
(
priority
);
if
(
timeLeft
>
0
)
return
getBudgeted
(
entry
,
hints
);
case
VOLATILE:
enqueue
(
entry
,
hints
);
case
DONTLOAD:
default
:
return
v
;
}
}
@Override
public
V
get
(
final
K
key
,
final
VolatileLoader
<
?
extends
V
>
loader
,
final
CacheHints
cacheH
ints
)
throws
ExecutionException
public
V
get
(
final
K
key
,
final
VolatileLoader
<
?
extends
V
>
loader
,
final
CacheHints
h
ints
)
throws
ExecutionException
{
/*
* Get existing entry for key or create it with an empty (invalid)
* value.
* Get existing entry for key or create it.
*/
final
Entry
entry
=
map
.
computeIfAbsent
(
key
,
(
k
)
->
{
final
V
value
=
backingCache
.
getIfPresent
(
k
);
if
(
value
!=
null
)
return
new
Entry
(
k
,
value
);
else
return
new
Entry
(
k
,
loader
);
}
);
final
Entry
entry
=
map
.
computeIfAbsent
(
key
,
k
->
new
Entry
(
key
,
loader
)
);
V
value
=
entry
.
getValue
();
if
(
value
==
null
)
{
/*
* The value has been garbage collected. We need to create a new
* entry.
*/
map
.
remove
(
key
,
entry
);
return
get
(
key
,
loader
,
cacheHints
);
}
V
v
=
entry
.
getValue
();
if
(
v
!=
null
&&
v
.
isValid
()
)
return
v
;
/*
* Entry and value exist. If the value is invalid, try to load it. While
* we do that, the value cannot be garbage collected because we hold a
* strong reference and consequently the entry cannot be removed from the
* map.
*/
if
(
!
value
.
isValid
()
)
switch
(
hints
.
getLoadingStrategy
()
)
{
loadEntryWithCacheHints
(
entry
,
cacheHints
);
value
=
entry
.
getValue
();
cleanUp
(
50
);
case
BLOCKING:
v
=
getBlocking
(
entry
);
break
;
case
BUDGETED:
v
=
getBudgeted
(
entry
,
hints
);
break
;
case
VOLATILE:
v
=
getVolatile
(
entry
,
hints
);
break
;
case
DONTLOAD:
v
=
getDontLoad
(
entry
);
break
;
}
return
value
;
cleanUp
(
10
);
if
(
v
==
null
)
return
get
(
key
,
loader
,
hints
);
else
return
v
;
}
/**
...
...
@@ -211,63 +194,129 @@ public class WeakRefVolatileCache< K, V extends VolatileCacheValue > implements
throw
new
UnsupportedOperationException
(
"not implemented yet"
);
}
/**
* {@link Callable} to put into the fetch queue. Loads data for a specific key.
*/
final
class
FetchEntry
implements
Callable
<
Void
>
{
private
final
K
key
;
public
FetchEntry
(
final
K
key
)
{
this
.
key
=
key
;
}
/**
* If this key's data is not yet valid, then load it. After the method
* returns, the data is guaranteed to be valid.
*
* @throws InterruptedException
* if the loading operation was interrupted.
*/
@Override
public
Void
call
()
throws
Exception
{
load
(
key
);
return
null
;
}
}
void
load
(
final
K
key
)
throws
ExecutionException
{
final
Entry
entry
=
map
.
get
(
key
);
if
(
entry
!=
null
)
getBlocking
(
entry
);
}
// ================ private methods =====================
/**
* Load or enqueue the specified {@link Entry}, depending on the
* {@link LoadingStrategy} given in {@code cacheHints}.
*
* @param entry
* @param currentValue
* @param cacheHints
* @throws ExecutionException
*/
private
void
loadEntryWithCacheHints
(
final
Entry
entry
,
final
CacheHints
cacheHints
)
throws
ExecutionException
private
V
getDontLoad
(
final
Entry
entry
)
{
s
witch
(
cacheHints
.
getLoadingStrategy
()
)
s
ynchronized
(
entry
)
{
case
VOLATILE:
default
:
enqueueEntry
(
entry
,
cacheHints
);
break
;
case
BLOCKING:
entry
.
load
();
break
;
case
BUDGETED:
loadOrEnqueue
(
entry
,
cacheHints
);
break
;
case
DONTLOAD:
break
;
V
v
=
entry
.
getValue
();
if
(
v
!=
null
)
return
v
;
if
(
entry
.
loaded
!=
NOTLOADED
)
{
map
.
remove
(
entry
.
key
,
entry
);
return
null
;
}
final
V
vl
=
backingCache
.
getIfPresent
(
entry
.
key
);
if
(
vl
!=
null
)
{
entry
.
setValid
(
vl
);
return
vl
;
}
/**
* Load the data for the {@link Entry} if it is not yet loaded (valid) and
* there is enough {@link IoTimeBudget} left. Otherwise, enqueue the
* {@link Entry} if it hasn't been enqueued for this frame already.
*/
private
void
loadOrEnqueue
(
final
Entry
entry
,
final
CacheHints
cacheHints
)
v
=
entry
.
loader
.
createInvalid
();
entry
.
setInvalid
(
v
);
return
v
;
}
}
private
V
getVolatile
(
final
Entry
entry
,
final
CacheHints
hints
)
{
synchronized
(
entry
)
{
V
v
=
entry
.
getValue
();
if
(
v
==
null
&&
entry
.
loaded
!=
NOTLOADED
)
{
map
.
remove
(
entry
.
key
,
entry
);
return
null
;
}
if
(
entry
.
loaded
==
VALID
)
// v.isValid()
return
v
;
final
V
vl
=
backingCache
.
getIfPresent
(
entry
.
key
);
if
(
vl
!=
null
)
{
final
int
priority
=
cacheHints
.
getQueuePriority
();
entry
.
setValid
(
vl
);
return
vl
;
}
if
(
entry
.
loaded
==
NOTLOADED
)
{
v
=
entry
.
loader
.
createInvalid
();
entry
.
setInvalid
(
v
);
}
enqueue
(
entry
,
hints
);
return
v
;
}
}
private
V
getBudgeted
(
final
Entry
entry
,
final
CacheHints
hints
)
{
synchronized
(
entry
)
{
V
v
=
entry
.
getValue
();
if
(
v
==
null
&&
entry
.
loaded
!=
NOTLOADED
)
{
map
.
remove
(
entry
.
key
,
entry
);
return
null
;
}
if
(
entry
.
loaded
==
VALID
)
// v.isValid()
return
v
;
enqueue
(
entry
,
hints
);
final
int
priority
=
hints
.
getQueuePriority
();
final
IoStatistics
stats
=
CacheIoTiming
.
getIoStatistics
();
final
IoTimeBudget
budget
=
stats
.
getIoTimeBudget
();
final
long
timeLeft
=
budget
.
timeLeft
(
priority
);
if
(
timeLeft
>
0
)
{
synchronized
(
entry
)
{
if
(
entry
.
getValue
().
isValid
()
)
return
;
enqueueEntry
(
entry
,
cacheHints
);
final
long
t0
=
stats
.
getIoNanoTime
();
stats
.
start
();
try
{
entry
.
wait
(
timeLeft
/
1000000
l
,
1
);
// releases and re-acquires entry lock
}
catch
(
final
InterruptedException
e
)
{}
...
...
@@ -275,51 +324,73 @@ public class WeakRefVolatileCache< K, V extends VolatileCacheValue > implements
final
long
t
=
stats
.
getIoNanoTime
()
-
t0
;
budget
.
use
(
t
,
priority
);
}
v
=
entry
.
getValue
();
if
(
v
==
null
)
{
if
(
entry
.
loaded
==
NOTLOADED
)
{
v
=
entry
.
loader
.
createInvalid
();
entry
.
setInvalid
(
v
);
return
v
;
}
else
enqueueEntry
(
entry
,
cacheHints
);
{
map
.
remove
(
entry
.
key
,
entry
);
return
null
;
}
}
return
v
;
}
}
/**
* Enqueue the {@link Entry} if it hasn't been enqueued for this frame
* already.
*/
private
void
enqueueEntry
(
final
Entry
entry
,
final
CacheHints
cacheHints
)
private
V
getBlocking
(
final
Entry
entry
)
throws
ExecutionException
{
final
long
currentQueueFrame
=
fetchQueue
.
getCurrentFrame
()
;
if
(
entry
.
getEnqueueFrame
()
<
currentQueueFrame
)
VolatileLoader
<
?
extends
V
>
loader
;
synchronized
(
entry
)
{
entry
.
setEnqueueFrame
(
currentQueueFrame
);
fetchQueue
.
put
(
new
FetchEntry
(
entry
.
getKey
()
),
cacheHints
.
getQueuePriority
(),
cacheHints
.
isEnqueuToFront
()
);
}
final
V
v
=
entry
.
getValue
();
if
(
v
==
null
&&
entry
.
loaded
!=
NOTLOADED
)
{
map
.
remove
(
entry
.
key
,
entry
);
return
null
;
}
/**
* {@link Callable} to put into the fetch queue. Loads data for a specific key.
*/
final
class
FetchEntry
implements
Callable
<
Void
>
{
private
final
K
key
;
if
(
entry
.
loaded
==
VALID
)
// v.isValid()
return
v
;
public
FetchEntry
(
final
K
key
)
loader
=
entry
.
loader
;
}
final
V
vl
=
backingCache
.
get
(
entry
.
key
,
loader
);
synchronized
(
entry
)
{
this
.
key
=
key
;
final
V
v
=
entry
.
getValue
();
if
(
v
==
null
&&
entry
.
loaded
!=
NOTLOADED
)
{
map
.
remove
(
entry
.
key
,
entry
);
return
null
;
}
if
(
entry
.
loaded
==
VALID
)
// v.isValid()
return
v
;
// entry.loaded == INVALID
entry
.
setValid
(
vl
);
return
vl
;
}
}
/**
* If this key's data is not yet valid, then load it. After the method
* returns, the data is guaranteed to be valid.
*
* @throws InterruptedException
* if the loading operation was interrupted.
* Enqueue the {@link Entry} if it hasn't been enqueued for this frame
* already.
*/
@Override
public
Void
call
()
throws
Exception
private
void
enqueue
(
final
Entry
entry
,
final
CacheHints
hints
)
{
final
Entry
entry
=
map
.
get
(
key
);
if
(
entry
!=
null
)
entry
.
load
();
return
null
;
final
long
currentQueueFrame
=
fetchQueue
.
getCurrentFrame
();
if
(
entry
.
enqueueFrame
<
currentQueueFrame
)
{
entry
.
enqueueFrame
=
currentQueueFrame
;
fetchQueue
.
put
(
new
FetchEntry
(
entry
.
key
),
hints
.
getQueuePriority
(),
hints
.
isEnqueuToFront
()
);
}
}
}
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment