From 2c69fa284baa540a65e487da0c439b6a6b3adce0 Mon Sep 17 00:00:00 2001
From: Vojtech Moravec <vojtech.moravec.st@vsb.cz>
Date: Tue, 4 Feb 2020 09:40:20 +0100
Subject: [PATCH] Parallelized quantizeIntoIndices for VQ compression.

---
 .../compression/VQImageCompressor.java        |  2 +-
 .../quantization/vector/VectorQuantizer.java  | 42 +++++++++++++++++--
 2 files changed, 39 insertions(+), 5 deletions(-)

diff --git a/src/main/java/azgracompress/compression/VQImageCompressor.java b/src/main/java/azgracompress/compression/VQImageCompressor.java
index de00719..33cadff 100644
--- a/src/main/java/azgracompress/compression/VQImageCompressor.java
+++ b/src/main/java/azgracompress/compression/VQImageCompressor.java
@@ -143,7 +143,7 @@ public class VQImageCompressor extends CompressorDecompressorBase implements IIm
             assert (quantizer != null);
 
             Log("Compression plane...");
-            final int[] indices = quantizer.quantizeIntoIndices(planeVectors);
+            final int[] indices = quantizer.quantizeIntoIndices(planeVectors, options.getWorkerCount());
 
             try (OutBitStream outBitStream = new OutBitStream(compressStream, options.getBitsPerPixel(), 2048)) {
                 outBitStream.write(indices);
diff --git a/src/main/java/azgracompress/quantization/vector/VectorQuantizer.java b/src/main/java/azgracompress/quantization/vector/VectorQuantizer.java
index 72b4f1e..adb9b52 100644
--- a/src/main/java/azgracompress/quantization/vector/VectorQuantizer.java
+++ b/src/main/java/azgracompress/quantization/vector/VectorQuantizer.java
@@ -31,13 +31,47 @@ public class VectorQuantizer {
     }
 
     public int[] quantizeIntoIndices(final int[][] dataVectors) {
+        return quantizeIntoIndices(dataVectors, 1);
+    }
+
+
+    public int[] quantizeIntoIndices(final int[][] dataVectors, final int maxWorkerCount) {
+
         assert (dataVectors.length > 0 && dataVectors[0].length % vectorSize == 0) : "Wrong vector size";
         int[] indices = new int[dataVectors.length];
 
-        // Speedup
-        for (int vectorIndex = 0; vectorIndex < dataVectors.length; vectorIndex++) {
-            indices[vectorIndex] = findClosestCodebookEntryIndex(dataVectors[vectorIndex],
-                                                                 VectorDistanceMetric.Euclidean);
+        if (maxWorkerCount == 1) {
+            for (int vectorIndex = 0; vectorIndex < dataVectors.length; vectorIndex++) {
+                indices[vectorIndex] = findClosestCodebookEntryIndex(dataVectors[vectorIndex],
+                                                                     VectorDistanceMetric.Euclidean);
+            }
+        } else {
+            // Cap the worker count on 8
+            final int workerCount = Math.min(maxWorkerCount, 8);
+            Thread[] workers = new Thread[workerCount];
+            final int workSize = dataVectors.length / workerCount;
+
+            for (int wId = 0; wId < workerCount; wId++) {
+                final int fromIndex = wId * workSize;
+                final int toIndex = (wId == workerCount - 1) ? dataVectors.length : (workSize + (wId * workSize));
+
+                workers[wId] = new Thread(() -> {
+                    for (int vectorIndex = fromIndex; vectorIndex < toIndex; vectorIndex++) {
+                        indices[vectorIndex] = findClosestCodebookEntryIndex(dataVectors[vectorIndex],
+                                                                             VectorDistanceMetric.Euclidean);
+                    }
+                });
+
+                workers[wId].start();
+            }
+            try {
+                for (int wId = 0; wId < workerCount; wId++) {
+                    workers[wId].join();
+                }
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+
         }
         return indices;
     }
-- 
GitLab