Skip to content
Snippets Groups Projects
Commit 0ceff863 authored by Vojtech Moravec's avatar Vojtech Moravec
Browse files

Measure compression statistics per remote client addr.

Compression statistics, like number of sent bytes are now grouped in RemoteCompressionClient class. This class is saved in a HashMap where key is the remote client address.

The RemoteCompressionClient is created in init_qcmp and is removed after inactivity longer than 60 minutes for now.

Hopefully `baseRequest.getRemoteAddr();` is good enaugh key for now.
parent f4579182
Branches master
No related tags found
No related merge requests found
...@@ -10,6 +10,7 @@ import bdv.img.remote.RemoteImageLoaderMetaData; ...@@ -10,6 +10,7 @@ import bdv.img.remote.RemoteImageLoaderMetaData;
import bdv.spimdata.SequenceDescriptionMinimal; import bdv.spimdata.SequenceDescriptionMinimal;
import bdv.spimdata.SpimDataMinimal; import bdv.spimdata.SpimDataMinimal;
import bdv.spimdata.XmlIoSpimDataMinimal; import bdv.spimdata.XmlIoSpimDataMinimal;
import bdv.util.RemoteCompressionClient;
import bdv.util.ThumbnailGenerator; import bdv.util.ThumbnailGenerator;
import com.google.gson.GsonBuilder; import com.google.gson.GsonBuilder;
import cz.it4i.qcmp.cache.ICacheFile; import cz.it4i.qcmp.cache.ICacheFile;
...@@ -52,8 +53,6 @@ import java.util.HashMap; ...@@ -52,8 +53,6 @@ import java.util.HashMap;
import java.util.Stack; import java.util.Stack;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class CellHandler extends ContextHandler { public class CellHandler extends ContextHandler {
private static final org.eclipse.jetty.util.log.Logger LOG = Log.getLogger(CellHandler.class); private static final org.eclipse.jetty.util.log.Logger LOG = Log.getLogger(CellHandler.class);
...@@ -168,11 +167,7 @@ public class CellHandler extends ContextHandler { ...@@ -168,11 +167,7 @@ public class CellHandler extends ContextHandler {
private Stack<MemoryOutputStream> cachedBuffers = null; private Stack<MemoryOutputStream> cachedBuffers = null;
private final int INITIAL_BUFFER_SIZE = 2048; private final int INITIAL_BUFFER_SIZE = 2048;
private final AtomicInteger compressedAccumulation = new AtomicInteger(0); private final HashMap<String, RemoteCompressionClient> remoteCompressionClients = new HashMap<>();
private final AtomicInteger uncompressedAccumulation = new AtomicInteger(0);
private final AtomicInteger qcmpCellResponseCount = new AtomicInteger(0);
private final AtomicLong compressionTimeAccumulation = new AtomicLong(0);
public CellHandler(final String baseUrl, public CellHandler(final String baseUrl,
final String xmlFilename, final String xmlFilename,
...@@ -375,6 +370,7 @@ public class CellHandler extends ContextHandler { ...@@ -375,6 +370,7 @@ public class CellHandler extends ContextHandler {
final Request baseRequest, final Request baseRequest,
final HttpServletRequest request, final HttpServletRequest request,
final HttpServletResponse response) throws IOException { final HttpServletResponse response) throws IOException {
if (target.equals("/settings")) { if (target.equals("/settings")) {
if (settingsXmlString != null) if (settingsXmlString != null)
respondWithString(baseRequest, response, "application/xml", settingsXmlString); respondWithString(baseRequest, response, "application/xml", settingsXmlString);
...@@ -392,14 +388,18 @@ public class CellHandler extends ContextHandler { ...@@ -392,14 +388,18 @@ public class CellHandler extends ContextHandler {
respondWithString(baseRequest, response, "application/xml", datasetXmlString); respondWithString(baseRequest, response, "application/xml", datasetXmlString);
return; return;
} }
final RemoteCompressionClient compressionClient = getRemoteCompressionClient(baseRequest);
final String[] parts = cellString.split("/"); final String[] parts = cellString.split("/");
if (parts[0].equals("cell")) { if (parts[0].equals("cell")) {
final int level = Integer.parseInt(parts[4]); final int level = Integer.parseInt(parts[4]);
final short[] data = getCachedVolatileCellData(parts, level); final short[] data = getCachedVolatileCellData(parts, level);
compressedAccumulation.addAndGet(data.length * 2);
uncompressedAccumulation.addAndGet(data.length * 2); if (compressionClient != null) {
compressionClient.getCompressedAccumulation().addAndGet(data.length * 2);
compressionClient.getUncompressedAccumulation().addAndGet(data.length * 2);
}
respondWithShortArray(response, data); respondWithShortArray(response, data);
response.setContentType("application/octet-stream"); response.setContentType("application/octet-stream");
...@@ -420,8 +420,11 @@ public class CellHandler extends ContextHandler { ...@@ -420,8 +420,11 @@ public class CellHandler extends ContextHandler {
final int compressedContentLength = getCompressorForMipmapLevel(mipmapLevel).streamCompressChunk(cellCompressionStream, final int compressedContentLength = getCompressorForMipmapLevel(mipmapLevel).streamCompressChunk(cellCompressionStream,
inputData); inputData);
compressionStopwatch.stop(); compressionStopwatch.stop();
compressionTimeAccumulation.addAndGet(compressionStopwatch.getElapsedInUnit(TimeUnit.NANOSECONDS));
qcmpCellResponseCount.incrementAndGet(); assert (compressionClient != null) : "Handling cell_qcmp and compression client is null.";
compressionClient.getCompressionTimeAccumulation().addAndGet(compressionStopwatch.getElapsedInUnit(TimeUnit.NANOSECONDS));
compressionClient.getHandledRequestCount().incrementAndGet();
response.setContentLength(compressedContentLength); response.setContentLength(compressedContentLength);
try (final OutputStream responseStream = response.getOutputStream()) { try (final OutputStream responseStream = response.getOutputStream()) {
...@@ -443,9 +446,8 @@ public class CellHandler extends ContextHandler { ...@@ -443,9 +446,8 @@ public class CellHandler extends ContextHandler {
baseRequest.setHandled(true); baseRequest.setHandled(true);
returnBufferForReuse(cellCompressionStream); returnBufferForReuse(cellCompressionStream);
final long currentlySent = compressedAccumulation.addAndGet(compressedContentLength); final long currentlySent = compressionClient.getCompressedAccumulation().addAndGet(compressedContentLength);
final long uncompressedWouldSent = uncompressedAccumulation.addAndGet(data.length * 2); final long uncompressedWouldSent = compressionClient.getUncompressedAccumulation().addAndGet(data.length * 2);
if (compressionParams.isVerbose()) { if (compressionParams.isVerbose()) {
LOG.info(String.format("Sending %dB instead of %dB. Currently sent %dB instead of %dB.", LOG.info(String.format("Sending %dB instead of %dB. Currently sent %dB instead of %dB.",
...@@ -460,15 +462,17 @@ public class CellHandler extends ContextHandler { ...@@ -460,15 +462,17 @@ public class CellHandler extends ContextHandler {
} else if (parts[0].equals("init_qcmp")) { } else if (parts[0].equals("init_qcmp")) {
respondWithCompressionInfo(baseRequest, response); respondWithCompressionInfo(baseRequest, response);
} else if (parts[0].equals("qcmp_summary")) { } else if (parts[0].equals("qcmp_summary")) {
respondWithCompressionSummary(baseRequest, response); respondWithCompressionSummary(baseRequest, response, compressionClient);
} }
} }
private void respondWithCompressionSummary(final Request baseRequest, final HttpServletResponse response) throws IOException { private void respondWithCompressionSummary(final Request baseRequest,
final long currentlySent = compressedAccumulation.get(); final HttpServletResponse response,
final long uncompressedWouldSent = uncompressedAccumulation.get(); final RemoteCompressionClient compressionClient) throws IOException {
final int qcmpRequestCount = qcmpCellResponseCount.get(); final long currentlySent = compressionClient.getCompressedAccumulation().get();
final long accumulatedNs = compressionTimeAccumulation.get(); final long uncompressedWouldSent = compressionClient.getUncompressedAccumulation().get();
final int qcmpRequestCount = compressionClient.getHandledRequestCount().get();
final long accumulatedNs = compressionClient.getCompressionTimeAccumulation().get();
final long totalRequestTimeMs = TimeUnit.MILLISECONDS.convert(accumulatedNs, TimeUnit.NANOSECONDS); final long totalRequestTimeMs = TimeUnit.MILLISECONDS.convert(accumulatedNs, TimeUnit.NANOSECONDS);
final long averageTimePerRequestMs = (long) ((double) totalRequestTimeMs / (double) qcmpRequestCount); final long averageTimePerRequestMs = (long) ((double) totalRequestTimeMs / (double) qcmpRequestCount);
...@@ -507,6 +511,33 @@ public class CellHandler extends ContextHandler { ...@@ -507,6 +511,33 @@ public class CellHandler extends ContextHandler {
} }
} }
/**
* Get remote compression client by the base request from the client and update its last access time.
*
* @param baseRequest Base request of the client.
* @return RemoteCompressionClient if exists or null.
*/
private RemoteCompressionClient getRemoteCompressionClient(final Request baseRequest) {
final RemoteCompressionClient compressionClient = remoteCompressionClients.get(baseRequest.getRemoteAddr());
if (compressionClient != null)
compressionClient.updateLastAccessTime();
return compressionClient;
}
private void removeInactiveCompressionClients() {
final int maxInactiveTimeInMinutes = 60;
final ArrayList<String> keysToRemove = new ArrayList<>(10);
remoteCompressionClients.forEach((key, value) -> {
if (value.getInactiveTimeInMinutes() > maxInactiveTimeInMinutes)
keysToRemove.add(key);
});
LOG.info(String.format("Removing %d inactive compression clients.", keysToRemove.size()));
for (final String keyToRemove : keysToRemove) {
remoteCompressionClients.remove(keyToRemove);
}
}
private void respondWithCompressionInfo(final Request baseRequest, final HttpServletResponse response) throws IOException { private void respondWithCompressionInfo(final Request baseRequest, final HttpServletResponse response) throws IOException {
if (cachedCodebooks == null || cachedCodebooks.isEmpty()) { if (cachedCodebooks == null || cachedCodebooks.isEmpty()) {
LOG.info("QCMP initialization request was refused, QCMP compression is not enabled."); LOG.info("QCMP initialization request was refused, QCMP compression is not enabled.");
...@@ -516,6 +547,21 @@ public class CellHandler extends ContextHandler { ...@@ -516,6 +547,21 @@ public class CellHandler extends ContextHandler {
return; return;
} }
// Save remote compression client.
final String key = baseRequest.getRemoteAddr();
{
final RemoteCompressionClient existingClient = remoteCompressionClients.get(key);
if (existingClient != null) {
LOG.info("Resetting remote compression client for remote addr=" + key);
existingClient.reset();
} else {
remoteCompressionClients.put(key, new RemoteCompressionClient());
}
if (remoteCompressionClients.size() > 10)
removeInactiveCompressionClients();
}
try (final DataOutputStream dos = new DataOutputStream(response.getOutputStream())) { try (final DataOutputStream dos = new DataOutputStream(response.getOutputStream())) {
dos.writeByte(compressionParams.getCompressFromMipmapLevel()); dos.writeByte(compressionParams.getCompressFromMipmapLevel());
dos.writeByte(cachedCodebooks.size()); dos.writeByte(cachedCodebooks.size());
...@@ -530,6 +576,7 @@ public class CellHandler extends ContextHandler { ...@@ -530,6 +576,7 @@ public class CellHandler extends ContextHandler {
baseRequest.setHandled(true); baseRequest.setHandled(true);
} }
public String getXmlFile() { public String getXmlFile() {
return xmlFilename; return xmlFilename;
} }
......
package bdv.util;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class RemoteCompressionClient {
private final AtomicInteger compressedAccumulation = new AtomicInteger(0);
private final AtomicInteger uncompressedAccumulation = new AtomicInteger(0);
private final AtomicInteger handledRequestCount = new AtomicInteger(0);
private final AtomicLong compressionTimeAccumulation = new AtomicLong(0);
private Instant lastAccessTime;
public RemoteCompressionClient() {
updateLastAccessTime();
}
public AtomicInteger getCompressedAccumulation() {
return compressedAccumulation;
}
public AtomicInteger getUncompressedAccumulation() {
return uncompressedAccumulation;
}
public AtomicInteger getHandledRequestCount() {
return handledRequestCount;
}
public AtomicLong getCompressionTimeAccumulation() {
return compressionTimeAccumulation;
}
public void updateLastAccessTime() {
lastAccessTime = Instant.now();
}
public long getInactiveTimeInMinutes() {
return Duration.between(lastAccessTime, Instant.now()).toMinutes();
}
/**
* Reset the compression client and update lastAccessTime.
*/
public void reset() {
compressedAccumulation.set(0);
uncompressedAccumulation.set(0);
handledRequestCount.set(0);
compressionTimeAccumulation.set(0);
updateLastAccessTime();
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment