package bdv.server;
import bdv.BigDataViewer;
import bdv.img.cache.VolatileGlobalCellCache;
import bdv.img.hdf5.Hdf5ImageLoader;
import bdv.img.hdf5.Hdf5VolatileShortArrayLoader;
import bdv.img.remote.AffineTransform3DJsonSerializer;
import bdv.img.remote.RemoteImageLoader;
import bdv.img.remote.RemoteImageLoaderMetaData;
import bdv.spimdata.SequenceDescriptionMinimal;
import bdv.spimdata.SpimDataMinimal;
import bdv.spimdata.XmlIoSpimDataMinimal;
import bdv.util.ThumbnailGenerator;
import cz.it4i.qcmp.cache.ICacheFile;
import cz.it4i.qcmp.cache.QuantizationCacheManager;
import cz.it4i.qcmp.compression.CompressionOptions;
import cz.it4i.qcmp.compression.ImageCompressor;
import cz.it4i.qcmp.data.V3i;
import com.google.gson.GsonBuilder;
import cz.it4i.qcmp.io.*;
import cz.it4i.qcmp.utilities.Stopwatch;
import mpicbg.spim.data.SpimDataException;
import mpicbg.spim.data.generic.sequence.ImgLoaderHints;
import net.imglib2.cache.CacheLoader;
import net.imglib2.cache.LoaderCache;
import net.imglib2.cache.ref.SoftRefLoaderCache;
import net.imglib2.img.array.ArrayImg;
import net.imglib2.img.basictypeaccess.volatiles.array.VolatileShortArray;
import net.imglib2.img.cell.Cell;
import net.imglib2.realtransform.AffineTransform3D;
import net.imglib2.type.numeric.integer.UnsignedShortType;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.log.Log;
import org.jdom2.Document;
import org.jdom2.JDOMException;
import org.jdom2.input.SAXBuilder;
import org.jdom2.output.Format;
import org.jdom2.output.XMLOutputter;
import javax.imageio.ImageIO;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.awt.image.BufferedImage;
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Stack;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class CellHandler extends ContextHandler {
private static final org.eclipse.jetty.util.log.Logger LOG = Log.getLogger(CellHandler.class);
/**
* Key for a cell identified by timepoint, setup, level, and index
* (flattened spatial coordinate).
*/
public static class Key {
private final int timepoint;
private final int setup;
private final int level;
private final long index;
private final String[] parts;
private final int hashcode;
/**
* Create a Key for the specified cell. Note that {@code parts} is not used
* for {@code hashCode()/equals()}.
*
* @param timepoint timepoint coordinate of the cell
* @param setup setup coordinate of the cell
* @param level level coordinate of the cell
* @param index index of the cell (flattened spatial coordinate of the
* cell)
* @param parts split request path, carrying the cell dimensions and minimum
* used by the loader
*/
public Key(final int timepoint, final int setup, final int level, final long index, final String[] parts) {
this.timepoint = timepoint;
this.setup = setup;
this.level = level;
this.index = index;
this.parts = parts;
int value = Long.hashCode(index);
value = 31 * value + level;
value = 31 * value + setup;
value = 31 * value + timepoint;
hashcode = value;
}
@Override
public boolean equals(final Object other) {
if (this == other)
return true;
if (!(other instanceof Key))
return false;
final Key that = (Key) other;
return (this.index == that.index) && (this.timepoint == that.timepoint) && (this.setup == that.setup) && (this.level == that.level);
}
@Override
public int hashCode() {
return hashcode;
}
}
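/**
* Loader that reads a requested cell from the HDF5 file; loaded cells are
* held in {@link #cache} via soft references.
*/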
private final CacheLoader<Key, Cell<?>> loader;
private final LoaderCache<Key, Cell<?>> cache;
/**
* Full path of the dataset xml file this {@link CellHandler} is serving.
*/
private final String xmlFilename;
/**
* Full path of the dataset xml file this {@link CellHandler} is serving,
* without the ".xml" suffix.
*/
private final String baseFilename;
private final String dataSetURL;
/**
* Cached dataset XML to be sent to and opened by {@link BigDataViewer}
* clients.
*/
private final String datasetXmlString;
/**
* Cached JSON representation of the {@link RemoteImageLoaderMetaData} to be
* sent to clients.
*/
private final String metadataJson;
/**
* Cached dataset.settings XML to be sent to clients. May be null if no
* settings file exists for the dataset.
*/
private final String settingsXmlString;
/**
* Full path to thumbnail png.
*/
private final String thumbnailFilename;
/**
* Compression-related configuration and state.
*/
private final BigDataServer.ExtendedCompressionOptions compressionParams;
private boolean compressionSupport = false;
private ArrayList<ICacheFile> cachedCodebooks = null;
private HashMap<Integer, ImageCompressor> compressors = null;
private ImageCompressor lowestResCompressor = null;
private Stack<MemoryOutputStream> cachedBuffers = null;
private static final int INITIAL_BUFFER_SIZE = 2048;
private final AtomicInteger compressedAccumulation = new AtomicInteger(0);
private final AtomicInteger uncompressedAccumulation = new AtomicInteger(0);
private final AtomicInteger qcmpCellResponseCount = new AtomicInteger(0);
private final AtomicLong compressionTimeAccumulation = new AtomicLong(0);
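/**
* Load the dataset XML, set up the cell loader and cache, build the cached
* remote-dataset XML, metadata JSON, settings XML and thumbnail, and
* initialize QCMP compression if compression options were supplied.
*/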
public CellHandler(final String baseUrl,
final String xmlFilename,
final String datasetName,
final String thumbnailsDirectory,
final BigDataServer.ExtendedCompressionOptions compressionOps) throws SpimDataException, IOException {
final XmlIoSpimDataMinimal io = new XmlIoSpimDataMinimal();
final SpimDataMinimal spimData = io.load(xmlFilename);
final SequenceDescriptionMinimal seq = spimData.getSequenceDescription();
final Hdf5ImageLoader imgLoader = (Hdf5ImageLoader) seq.getImgLoader();
this.compressionParams = compressionOps;
final Hdf5VolatileShortArrayLoader cacheArrayLoader = imgLoader.getShortArrayLoader();
loader = key -> {
final int[] cellDims = new int[]{
Integer.parseInt(key.parts[5]),
Integer.parseInt(key.parts[6]),
Integer.parseInt(key.parts[7])};
final long[] cellMin = new long[]{
Long.parseLong(key.parts[8]),
Long.parseLong(key.parts[9]),
Long.parseLong(key.parts[10])};
return new Cell<>(cellDims, cellMin, cacheArrayLoader.loadArray(key.timepoint, key.setup, key.level, cellDims, cellMin));
};
cache = new SoftRefLoaderCache<>();
// The dataSetURL property is used when providing the XML file, by replacing
// SequenceDescription > ImageLoader > baseUrl.
this.xmlFilename = xmlFilename;
baseFilename = xmlFilename.endsWith(".xml") ? xmlFilename.substring(0, xmlFilename.length() - ".xml".length()) : xmlFilename;
dataSetURL = baseUrl;
datasetXmlString = buildRemoteDatasetXML(io, spimData, baseUrl);
metadataJson = buildMetadataJsonString(imgLoader, seq);
settingsXmlString = buildSettingsXML(baseFilename);
thumbnailFilename = createThumbnail(spimData, baseFilename, datasetName, thumbnailsDirectory);
final int numberOfMipmapLevels = imgLoader.getSetupImgLoader(0).numMipmapLevels();
initializeCompression(numberOfMipmapLevels, imgLoader);
}
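/**
* Return the compressor for the given mipmap level, falling back to the
* lowest-resolution compressor if no dedicated one exists.
*/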
private ImageCompressor getCompressorForMipmapLevel(final int mipmapLevel) {
assert (compressors != null && !compressors.isEmpty());
if (compressors.containsKey(mipmapLevel)) {
return compressors.get(mipmapLevel);
}
return lowestResCompressor;
}
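/**
* Load cached codebooks (training them first if enabled and none exist) and
* create one {@link ImageCompressor} per served mipmap level, plus a pool of
* reusable compression buffers.
*/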
private void initializeCompression(final int numberOfMipmapLevels, final Hdf5ImageLoader hdf5ImageLoader) {
if (compressionParams == null)
return;
this.compressionParams.setInputDataInfo(new FileInputData(this.baseFilename));
final QuantizationCacheManager qcm = new QuantizationCacheManager(compressionParams.getCodebookCacheFolder());
cachedCodebooks = qcm.loadAvailableCacheFiles(compressionParams);
if (cachedCodebooks.isEmpty()) {
if (compressionParams.isCodebookTrainingEnabled()) {
// NOTE(Moravec): Train all possible codebooks, for codebook sizes |L| from 2 to 256.
if (!trainCompressionCodebooks(compressionParams, hdf5ImageLoader)) {
LOG.warn("Failed to train compression codebooks.");
return;
}
cachedCodebooks = qcm.loadAvailableCacheFiles(compressionParams);
if (cachedCodebooks.isEmpty()) {
LOG.warn("Failed to train codebooks. Look above for errors.");
assert false; // For debug purposes.
return;
}
} else {
LOG.warn("Didn't find any cached codebooks for " + this.baseFilename + " and codebook training is not enabled. " +
"Enable it with 'tcb' after the xml file path.");
return;
}
}
LOG.info(String.format("Found %d codebooks for %s.", cachedCodebooks.size(), this.baseFilename));
final int numberOfCompressors = Math.min((numberOfMipmapLevels - compressionParams.getCompressFromMipmapLevel()),
cachedCodebooks.size());
cachedCodebooks.sort(Comparator.comparingInt(obj -> obj.getHeader().getBitsPerCodebookIndex()));
compressors = new HashMap<>(numberOfCompressors);
for (int compressorIndex = 0; compressorIndex < numberOfCompressors; compressorIndex++) {
final ICacheFile levelCacheFile = cachedCodebooks.get((cachedCodebooks.size() - 1) - compressorIndex);
final int bitsPerCodebookIndex = levelCacheFile.getHeader().getBitsPerCodebookIndex();
final CompressionOptions compressorOptions = compressionParams.createClone();
assert (compressorOptions != compressionParams);
compressorOptions.setBitsPerCodebookIndex(bitsPerCodebookIndex);
final ImageCompressor compressor = new ImageCompressor(compressorOptions, levelCacheFile);
// compressor.allowKdTreeVectorLookup();
final int actualKey = compressorIndex + compressionParams.getCompressFromMipmapLevel();
compressors.put(actualKey, compressor);
LOG.info(String.format(" Loaded codebook of size %d for mipmap level %d. '%s'",
levelCacheFile.getHeader().getCodebookSize(),
actualKey,
levelCacheFile.klass()));
lowestResCompressor = compressor;
}
final int initialCompressionCacheSize = 10;
cachedBuffers = new Stack<>();
for (int i = 0; i < initialCompressionCacheSize; i++) {
cachedBuffers.push(new MemoryOutputStream(INITIAL_BUFFER_SIZE));
}
compressionSupport = true;
}
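/**
* @return true if QCMP codebooks were loaded and cell_qcmp requests can be served.
*/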
public boolean isCompressionSupported() {
return compressionSupport;
}
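/**
* Train QCMP codebooks on the full-resolution image of the first setup and
* timepoint and save them to the codebook cache folder.
*/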
private boolean trainCompressionCodebooks(final BigDataServer.ExtendedCompressionOptions compressionOptions,
final Hdf5ImageLoader hdf5ImageLoader) {
final ArrayImg<?, ?> arrImg = (ArrayImg<?, ?>) hdf5ImageLoader.getSetupImgLoader(0).getImage(0, 0, ImgLoaderHints.LOAD_COMPLETELY);
assert (arrImg.numDimensions() == 3) : "arrImg.numDimensions() != 3";
assert (compressionOptions.getInputDataInfo().getCacheFileName().equals(baseFilename));
final CallbackInputData cid = new CallbackInputData((x, y, z) ->
{
return ((UnsignedShortType) arrImg.getAt(x, y, z)).getInteger();
},
compressionOptions.getInputDataInfo().getDimensions(),
baseFilename);
cid.setDimension(new V3i((int) arrImg.dimension(0), (int) arrImg.dimension(1), (int) arrImg.dimension(2)));
final InputData originalInputData = compressionOptions.getInputDataInfo();
final boolean originalVerbose = compressionOptions.isVerbose();
compressionOptions.setVerbose(true);
compressionOptions.setInputDataInfo(cid);
final ImageCompressor trainingCompressor = new ImageCompressor(compressionOptions);
final boolean result = trainingCompressor.trainAndSaveAllCodebooks();
compressionOptions.setInputDataInfo(originalInputData);
compressionOptions.setVerbose(originalVerbose);
return result;
}
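/**
* Take a reusable compression buffer from the pool, or allocate a new one if
* the pool is empty.
*/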
private synchronized MemoryOutputStream getCachedCompressionBuffer() {
if (!cachedBuffers.empty()) {
return cachedBuffers.pop();
} else {
return new MemoryOutputStream(INITIAL_BUFFER_SIZE);
}
}
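/**
* Reset a compression buffer and return it to the pool.
*/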
private synchronized void returnBufferForReuse(final MemoryOutputStream buffer) {
buffer.reset();
cachedBuffers.push(buffer);
}
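/**
* Wrap raw cell data in a {@link FlatBufferInputData} object for the compressor.
*/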
private FlatBufferInputData createInputDataObject(final short[] data, final int[] cellDims) {
return new FlatBufferInputData(data, new V3i(cellDims[0], cellDims[1], cellDims[2]), InputData.PixelType.Gray16, this.baseFilename);
}
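/**
* Write the cell data as big-endian 16-bit values directly to the response stream.
*/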
private void respondWithShortArray(final HttpServletResponse response, final short[] data) throws IOException {
final OutputStream responseStream = response.getOutputStream();
final byte[] buf = new byte[2 * data.length];
for (int i = 0, j = 0; i < data.length; i++) {
final short s = data[i];
buf[j++] = (byte) ((s >> 8) & 0xff);
buf[j++] = (byte) (s & 0xff);
}
response.setContentLength(buf.length);
responseStream.write(buf);
responseStream.close();
}
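/**
* Look up (or load and cache) the cell addressed by the request parts and
* return its backing short array. Returns an empty array if loading fails.
*/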
private short[] getCachedVolatileCellData(final String[] parts, final int level) {
final int index = Integer.parseInt(parts[1]);
final int timepoint = Integer.parseInt(parts[2]);
final int setup = Integer.parseInt(parts[3]);
final Key key = new Key(timepoint, setup, level, index, parts);
short[] data;
try {
final Cell<?> cell = cache.get(key, loader);
data = ((VolatileShortArray) cell.getData()).getCurrentStorageArray();
} catch (final ExecutionException e) {
data = new short[0];
}
return data;
}
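/**
* Handle a request for this dataset. Supported targets are "/settings" and
* "/png"; otherwise the "p" query parameter selects the dataset XML (absent),
* a raw cell ("cell/..."), a QCMP-compressed cell ("cell_qcmp/..."), loader
* metadata ("init"), the cached codebooks ("init_qcmp") or compression
* statistics ("qcmp_summary").
*/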
@Override
public void doHandle(final String target,
final Request baseRequest,
final HttpServletRequest request,
final HttpServletResponse response) throws IOException {
if (target.equals("/settings")) {
if (settingsXmlString != null)
respondWithString(baseRequest, response, "application/xml", settingsXmlString);
return;
}
if (target.equals("/png")) {
provideThumbnail(baseRequest, response);
return;
}
final String cellString = request.getParameter("p");
if (cellString == null) {
respondWithString(baseRequest, response, "application/xml", datasetXmlString);
return;
}
final String[] parts = cellString.split("/");
if (parts[0].equals("cell")) {
final int level = Integer.parseInt(parts[4]);
final short[] data = getCachedVolatileCellData(parts, level);
response.setContentType("application/octet-stream");
response.setStatus(HttpServletResponse.SC_OK);
respondWithShortArray(response, data);
baseRequest.setHandled(true);
} else if (parts[0].equals("cell_qcmp")) {
final int mipmapLevel = Integer.parseInt(parts[4]);
final int[] cellDims = new int[]{Integer.parseInt(parts[5]), Integer.parseInt(parts[6]), Integer.parseInt(parts[7])};
final short[] data = getCachedVolatileCellData(parts, mipmapLevel);
assert (compressors != null && !compressors.isEmpty());
final FlatBufferInputData inputData = createInputDataObject(data, cellDims);
final MemoryOutputStream cellCompressionStream = getCachedCompressionBuffer();
final Stopwatch compressionStopwatch = Stopwatch.startNew();
final int compressedContentLength = getCompressorForMipmapLevel(mipmapLevel).streamCompressChunk(cellCompressionStream,
inputData);
compressionStopwatch.stop();
compressionTimeAccumulation.addAndGet(compressionStopwatch.getElapsedInUnit(TimeUnit.NANOSECONDS));
qcmpCellResponseCount.incrementAndGet();
response.setContentLength(compressedContentLength);
try (final OutputStream responseStream = response.getOutputStream()) {
responseStream.write(cellCompressionStream.getBuffer(), 0, cellCompressionStream.getCurrentBufferLength());
assert (cellCompressionStream.getCurrentBufferLength() == compressedContentLength) :
"compressor.streamCompressChunk() is not equal to cachedCompressionStream.getCurrentBufferLength()";
if (cellCompressionStream.getCurrentBufferLength() != compressedContentLength) {
System.err.printf("stream size\t%d\nreported size\t%d\n\n",
cellCompressionStream.getCurrentBufferLength(),
compressedContentLength);
}
response.setContentType("application/octet-stream");
response.setStatus(HttpServletResponse.SC_OK);
baseRequest.setHandled(true);
returnBufferForReuse(cellCompressionStream);
final long currentlySent = compressedAccumulation.addAndGet(compressedContentLength);
final long uncompressedWouldSent = uncompressedAccumulation.addAndGet(data.length * 2);
LOG.info(String.format("Sending %dB instead of %dB. Currently sent %dB instead of %dB.",
compressedContentLength,
(data.length * 2),
currentlySent,
} else if (parts[0].equals("init")) {
respondWithString(baseRequest, response, "application/json", metadataJson);
} else if (parts[0].equals("init_qcmp")) {
respondWithCompressionInfo(baseRequest, response);
} else if (parts[0].equals("qcmp_summary")) {
respondWithCompressionSummary(baseRequest, response);
}
}
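/**
* Respond with a plain-text summary of bytes sent (compressed vs.
* uncompressed) and compression timing for this dataset.
*/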
private void respondWithCompressionSummary(final Request baseRequest, final HttpServletResponse response) throws IOException {
final long currentlySent = compressedAccumulation.get();
final long uncompressedWouldSent = uncompressedAccumulation.get();
final int qcmpRequestCount = qcmpCellResponseCount.get();
final long accumulatedNs = compressionTimeAccumulation.get();
final long totalRequestTimeMs = TimeUnit.MILLISECONDS.convert(accumulatedNs, TimeUnit.NANOSECONDS);
final long averageTimePerRequestMs = (qcmpRequestCount > 0)
? (long) ((double) totalRequestTimeMs / (double) qcmpRequestCount)
: 0;
final double sentKB = ((double) currentlySent / 1000.0);
final double sentMB = ((double) currentlySent / 1000.0) / 1000.0;
final double wouldSentKB = ((double) uncompressedWouldSent / 1000.0);
final double wouldSentMB = ((double) uncompressedWouldSent / 1000.0) / 1000.0;
final double percentage = (double) currentlySent / (double) uncompressedWouldSent;
final String msg = String.format("Currently sent %d B (%.1f KB, %.1f MB) instead of %d B (%.1f KB, %.1f MB).\nPercentage: %.3f\n" +
"Total compression time: %d ms, Average time per request: %d ms",
currentlySent, sentKB, sentMB, uncompressedWouldSent, wouldSentKB, wouldSentMB, percentage,
totalRequestTimeMs, averageTimePerRequestMs);
respondWithString(baseRequest, response, "text/plain", msg, HttpServletResponse.SC_OK);
}
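/**
* Respond with the dataset thumbnail PNG, if it exists.
*/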
private void provideThumbnail(final Request baseRequest, final HttpServletResponse response) throws IOException {
final Path path = Paths.get(thumbnailFilename);
if (Files.exists(path)) {
final byte[] imageData = Files.readAllBytes(path);
if (imageData != null) {
response.setContentType("image/png");
response.setContentLength(imageData.length);
response.setStatus(HttpServletResponse.SC_OK);
baseRequest.setHandled(true);
final OutputStream os = response.getOutputStream();
os.write(imageData);
os.close();
}
}
}
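/**
* Respond with the QCMP initialization payload: the mipmap level from which
* compression starts, the number of cached codebooks, and the codebooks
* themselves; or an error if compression is not enabled.
*/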
private void respondWithCompressionInfo(final Request baseRequest, final HttpServletResponse response) throws IOException {
if (cachedCodebooks == null || cachedCodebooks.isEmpty()) {
LOG.info("QCMP initialization request was refused, QCMP compression is not enabled.");
respondWithString(baseRequest, response,
"text/plain", "QCMP Compression wasn't enabled on BigDataViewer server.",
HttpServletResponse.SC_BAD_REQUEST);
return;
}
try (final DataOutputStream dos = new DataOutputStream(response.getOutputStream())) {
dos.writeByte(compressionParams.getCompressFromMipmapLevel());
dos.writeByte(cachedCodebooks.size());
for (final ICacheFile cacheFile : cachedCodebooks) {
cacheFile.writeToStream(dos);
}
LOG.info(String.format("init_qcmp responding with %d bytes", dos.size()));
} // Stream gets closed here.
response.setContentType("application/octet-stream");
response.setStatus(HttpServletResponse.SC_OK);
baseRequest.setHandled(true);
}
public String getXmlFile() {
return xmlFilename;
}
public String getDataSetURL() {
return dataSetURL;
}
public String getThumbnailUrl() {
return dataSetURL + "png";
}
public String getDescription() {
throw new UnsupportedOperationException();
}
/**
* Create a JSON representation of the {@link RemoteImageLoaderMetaData}
* (image sizes and resolutions) provided by the given
* {@link Hdf5ImageLoader}.
*/
private static String buildMetadataJsonString(final Hdf5ImageLoader imgLoader, final SequenceDescriptionMinimal seq) {
final RemoteImageLoaderMetaData metadata = new RemoteImageLoaderMetaData(imgLoader, seq);
final GsonBuilder gsonBuilder = new GsonBuilder();
gsonBuilder.registerTypeAdapter(AffineTransform3D.class, new AffineTransform3DJsonSerializer());
gsonBuilder.enableComplexMapKeySerialization();
return gsonBuilder.create().toJson(metadata);
}
/**
* Create a modified dataset XML by replacing the ImageLoader with a
* {@link RemoteImageLoader} pointing to the data we are serving.
*/
private static String buildRemoteDatasetXML(final XmlIoSpimDataMinimal io,
final SpimDataMinimal spimData,
final String baseUrl) throws IOException, SpimDataException {
final SpimDataMinimal s = new SpimDataMinimal(spimData, new RemoteImageLoader(baseUrl, false));
final Document doc = new Document(io.toXml(s, s.getBasePath()));
final XMLOutputter xout = new XMLOutputter(Format.getPrettyFormat());
final StringWriter sw = new StringWriter();
xout.output(doc, sw);
return sw.toString();
}
/**
* Read {@code baseFilename.settings.xml} into a string if it exists.
*
* @return contents of {@code baseFilename.settings.xml} or {@code null} if
* that file couldn't be read.
*/
private static String buildSettingsXML(final String baseFilename) {
final String settings = baseFilename + ".settings.xml";
if (new File(settings).exists()) {
try {
final SAXBuilder sax = new SAXBuilder();
final Document doc = sax.build(settings);
final XMLOutputter xout = new XMLOutputter(Format.getPrettyFormat());
final StringWriter sw = new StringWriter();
xout.output(doc, sw);
return sw.toString();
} catch (final JDOMException | IOException e) {
LOG.warn("Could not read settings file \"" + settings + "\"");
LOG.warn(e.getMessage());
}
}
return null;
}
/**
* Create a PNG thumbnail file named "{@code <datasetName>.png}" in the
* thumbnails directory, unless it already exists.
*/
private static String createThumbnail(final SpimDataMinimal spimData,
final String baseFilename,
final String datasetName,
final String thumbnailsDirectory) {
final String thumbnailFileName = thumbnailsDirectory + "/" + datasetName + ".png";
final File thumbnailFile = new File(thumbnailFileName);
if (!thumbnailFile.isFile()) // do not recreate thumbnail if it already exists
{
final BufferedImage bi = ThumbnailGenerator.makeThumbnail(spimData,
baseFilename,
Constants.THUMBNAIL_WIDTH,
Constants.THUMBNAIL_HEIGHT);
try {
ImageIO.write(bi, "png", thumbnailFile);
} catch (final IOException e) {
LOG.warn("Could not create thumbnail png for dataset \"" + baseFilename + "\"");
LOG.warn(e.getMessage());
}
}
return thumbnailFileName;
}
/**
* Handle request by sending a UTF-8 string.
*/
private static void respondWithString(final Request baseRequest,
final HttpServletResponse response,
final String contentType,
final String string) throws IOException {
respondWithString(baseRequest, response, contentType, string, HttpServletResponse.SC_OK);
}
/**
* Handle request by sending a UTF-8 string.
*/
private static void respondWithString(final Request baseRequest,
final HttpServletResponse response,
final String contentType,
final String string,
final int httpStatus) throws IOException {
response.setContentType(contentType);
response.setCharacterEncoding("UTF-8");
baseRequest.setHandled(true);
final PrintWriter ow = response.getWriter();
ow.write(string);
ow.close();
}
}