diff --git a/src/main/java/bdv/server/CellHandler.java b/src/main/java/bdv/server/CellHandler.java index 47d6193e60b8dbd8cb25502cebe471691cdb23cc..d5ed31381007b9626c43d3b8237446ee05a44051 100644 --- a/src/main/java/bdv/server/CellHandler.java +++ b/src/main/java/bdv/server/CellHandler.java @@ -10,6 +10,7 @@ import bdv.img.remote.RemoteImageLoaderMetaData; import bdv.spimdata.SequenceDescriptionMinimal; import bdv.spimdata.SpimDataMinimal; import bdv.spimdata.XmlIoSpimDataMinimal; +import bdv.util.RemoteCompressionClient; import bdv.util.ThumbnailGenerator; import com.google.gson.GsonBuilder; import cz.it4i.qcmp.cache.ICacheFile; @@ -52,8 +53,6 @@ import java.util.HashMap; import java.util.Stack; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; public class CellHandler extends ContextHandler { private static final org.eclipse.jetty.util.log.Logger LOG = Log.getLogger(CellHandler.class); @@ -168,11 +167,7 @@ public class CellHandler extends ContextHandler { private Stack<MemoryOutputStream> cachedBuffers = null; private final int INITIAL_BUFFER_SIZE = 2048; - private final AtomicInteger compressedAccumulation = new AtomicInteger(0); - private final AtomicInteger uncompressedAccumulation = new AtomicInteger(0); - private final AtomicInteger qcmpCellResponseCount = new AtomicInteger(0); - private final AtomicLong compressionTimeAccumulation = new AtomicLong(0); - + private final HashMap<String, RemoteCompressionClient> remoteCompressionClients = new HashMap<>(); public CellHandler(final String baseUrl, final String xmlFilename, @@ -375,6 +370,7 @@ public class CellHandler extends ContextHandler { final Request baseRequest, final HttpServletRequest request, final HttpServletResponse response) throws IOException { + if (target.equals("/settings")) { if (settingsXmlString != null) respondWithString(baseRequest, response, "application/xml", settingsXmlString); @@ -392,14 +388,18 @@ public class CellHandler extends ContextHandler { respondWithString(baseRequest, response, "application/xml", datasetXmlString); return; } - + final RemoteCompressionClient compressionClient = getRemoteCompressionClient(baseRequest); final String[] parts = cellString.split("/"); if (parts[0].equals("cell")) { final int level = Integer.parseInt(parts[4]); final short[] data = getCachedVolatileCellData(parts, level); - compressedAccumulation.addAndGet(data.length * 2); - uncompressedAccumulation.addAndGet(data.length * 2); + + if (compressionClient != null) { + compressionClient.getCompressedAccumulation().addAndGet(data.length * 2); + compressionClient.getUncompressedAccumulation().addAndGet(data.length * 2); + } + respondWithShortArray(response, data); response.setContentType("application/octet-stream"); @@ -420,8 +420,11 @@ public class CellHandler extends ContextHandler { final int compressedContentLength = getCompressorForMipmapLevel(mipmapLevel).streamCompressChunk(cellCompressionStream, inputData); compressionStopwatch.stop(); - compressionTimeAccumulation.addAndGet(compressionStopwatch.getElapsedInUnit(TimeUnit.NANOSECONDS)); - qcmpCellResponseCount.incrementAndGet(); + + assert (compressionClient != null) : "Handling cell_qcmp and compression client is null."; + + compressionClient.getCompressionTimeAccumulation().addAndGet(compressionStopwatch.getElapsedInUnit(TimeUnit.NANOSECONDS)); + compressionClient.getHandledRequestCount().incrementAndGet(); response.setContentLength(compressedContentLength); try (final OutputStream responseStream = response.getOutputStream()) { @@ -443,9 +446,8 @@ public class CellHandler extends ContextHandler { baseRequest.setHandled(true); returnBufferForReuse(cellCompressionStream); - final long currentlySent = compressedAccumulation.addAndGet(compressedContentLength); - final long uncompressedWouldSent = uncompressedAccumulation.addAndGet(data.length * 2); - + final long currentlySent = compressionClient.getCompressedAccumulation().addAndGet(compressedContentLength); + final long uncompressedWouldSent = compressionClient.getUncompressedAccumulation().addAndGet(data.length * 2); if (compressionParams.isVerbose()) { LOG.info(String.format("Sending %dB instead of %dB. Currently sent %dB instead of %dB.", @@ -460,15 +462,17 @@ public class CellHandler extends ContextHandler { } else if (parts[0].equals("init_qcmp")) { respondWithCompressionInfo(baseRequest, response); } else if (parts[0].equals("qcmp_summary")) { - respondWithCompressionSummary(baseRequest, response); + respondWithCompressionSummary(baseRequest, response, compressionClient); } } - private void respondWithCompressionSummary(final Request baseRequest, final HttpServletResponse response) throws IOException { - final long currentlySent = compressedAccumulation.get(); - final long uncompressedWouldSent = uncompressedAccumulation.get(); - final int qcmpRequestCount = qcmpCellResponseCount.get(); - final long accumulatedNs = compressionTimeAccumulation.get(); + private void respondWithCompressionSummary(final Request baseRequest, + final HttpServletResponse response, + final RemoteCompressionClient compressionClient) throws IOException { + final long currentlySent = compressionClient.getCompressedAccumulation().get(); + final long uncompressedWouldSent = compressionClient.getUncompressedAccumulation().get(); + final int qcmpRequestCount = compressionClient.getHandledRequestCount().get(); + final long accumulatedNs = compressionClient.getCompressionTimeAccumulation().get(); final long totalRequestTimeMs = TimeUnit.MILLISECONDS.convert(accumulatedNs, TimeUnit.NANOSECONDS); final long averageTimePerRequestMs = (long) ((double) totalRequestTimeMs / (double) qcmpRequestCount); @@ -507,6 +511,33 @@ public class CellHandler extends ContextHandler { } } + /** + * Get remote compression client by the base request from the client and update its last access time. + * + * @param baseRequest Base request of the client. + * @return RemoteCompressionClient if exists or null. + */ + private RemoteCompressionClient getRemoteCompressionClient(final Request baseRequest) { + final RemoteCompressionClient compressionClient = remoteCompressionClients.get(baseRequest.getRemoteAddr()); + if (compressionClient != null) + compressionClient.updateLastAccessTime(); + return compressionClient; + } + + private void removeInactiveCompressionClients() { + final int maxInactiveTimeInMinutes = 60; + final ArrayList<String> keysToRemove = new ArrayList<>(10); + remoteCompressionClients.forEach((key, value) -> { + if (value.getInactiveTimeInMinutes() > maxInactiveTimeInMinutes) + keysToRemove.add(key); + }); + + LOG.info(String.format("Removing %d inactive compression clients.", keysToRemove.size())); + for (final String keyToRemove : keysToRemove) { + remoteCompressionClients.remove(keyToRemove); + } + } + private void respondWithCompressionInfo(final Request baseRequest, final HttpServletResponse response) throws IOException { if (cachedCodebooks == null || cachedCodebooks.isEmpty()) { LOG.info("QCMP initialization request was refused, QCMP compression is not enabled."); @@ -516,6 +547,21 @@ public class CellHandler extends ContextHandler { return; } + // Save remote compression client. + final String key = baseRequest.getRemoteAddr(); + { + final RemoteCompressionClient existingClient = remoteCompressionClients.get(key); + if (existingClient != null) { + LOG.info("Resetting remote compression client for remote addr=" + key); + existingClient.reset(); + } else { + remoteCompressionClients.put(key, new RemoteCompressionClient()); + } + if (remoteCompressionClients.size() > 10) + removeInactiveCompressionClients(); + } + + try (final DataOutputStream dos = new DataOutputStream(response.getOutputStream())) { dos.writeByte(compressionParams.getCompressFromMipmapLevel()); dos.writeByte(cachedCodebooks.size()); @@ -530,6 +576,7 @@ public class CellHandler extends ContextHandler { baseRequest.setHandled(true); } + public String getXmlFile() { return xmlFilename; } diff --git a/src/main/java/bdv/util/RemoteCompressionClient.java b/src/main/java/bdv/util/RemoteCompressionClient.java new file mode 100644 index 0000000000000000000000000000000000000000..7b28e9f346501c561bc7a90b2a5e3b02f968bd99 --- /dev/null +++ b/src/main/java/bdv/util/RemoteCompressionClient.java @@ -0,0 +1,54 @@ +package bdv.util; + +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +public class RemoteCompressionClient { + private final AtomicInteger compressedAccumulation = new AtomicInteger(0); + private final AtomicInteger uncompressedAccumulation = new AtomicInteger(0); + private final AtomicInteger handledRequestCount = new AtomicInteger(0); + private final AtomicLong compressionTimeAccumulation = new AtomicLong(0); + + private Instant lastAccessTime; + + public RemoteCompressionClient() { + updateLastAccessTime(); + } + + public AtomicInteger getCompressedAccumulation() { + return compressedAccumulation; + } + + public AtomicInteger getUncompressedAccumulation() { + return uncompressedAccumulation; + } + + public AtomicInteger getHandledRequestCount() { + return handledRequestCount; + } + + public AtomicLong getCompressionTimeAccumulation() { + return compressionTimeAccumulation; + } + + public void updateLastAccessTime() { + lastAccessTime = Instant.now(); + } + + public long getInactiveTimeInMinutes() { + return Duration.between(lastAccessTime, Instant.now()).toMinutes(); + } + + /** + * Reset the compression client and update lastAccessTime. + */ + public void reset() { + compressedAccumulation.set(0); + uncompressedAccumulation.set(0); + handledRequestCount.set(0); + compressionTimeAccumulation.set(0); + updateLastAccessTime(); + } +}