From 6f2f14a9d47e8d36cafe9da319292781f6e9f5bf Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20Ko=C5=BEusznik?= <jan@kozusznik.cz>
Date: Mon, 30 Apr 2018 13:57:53 +0200
Subject: [PATCH] feat: add notifier for download

---
 .../src/main/java/cz/it4i/fiji/haas/Job.java  | 24 ++++--
 .../PersitentSynchronizationProcess.java      | 27 ++++++-
 .../haas/data_transfer/Synchronization.java   | 29 ++++++++
 .../fiji/haas/ui/ObservableValueRegistry.java |  6 +-
 .../core/BenchmarkJobManager.java             | 23 +++++-
 .../core/UpdatableBenchmarkJob.java           | 74 +++++++++++++++++++
 .../ui/BenchmarkSPIMController.java           |  2 +-
 .../ui/ObservableBenchmarkJobRegistry.java    | 29 ++++++++
 .../java/cz/it4i/fiji/haas/RunBenchmark.java  |  2 +-
 9 files changed, 203 insertions(+), 13 deletions(-)
 create mode 100644 haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/UpdatableBenchmarkJob.java

diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java
index 99f09004..03c54732 100644
--- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java
+++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java
@@ -28,7 +28,7 @@ import cz.it4i.fiji.haas_java_client.JobInfo;
 import cz.it4i.fiji.haas_java_client.JobState;
 import cz.it4i.fiji.haas_java_client.ProgressNotifier;
 import cz.it4i.fiji.haas_java_client.TransferFileProgressForHaaSClient;
-import net.imagej.updater.util.Progress;
+import cz.it4i.fiji.scpclient.TransferFileProgress;
 /***
  * TASK - napojit na UI 
  * @author koz01
@@ -63,7 +63,8 @@ public class Job {
 	private PropertyHolder propertyHolder;
 	private JobManager4Job jobManager;
 	private Synchronization synchronization;
-
+	
+	
 	public Job(JobManager4Job jobManager, String name, Path basePath, Supplier<HaaSClient> haasClientSupplier)
 			throws IOException {
 		this(jobManager, haasClientSupplier);
@@ -73,7 +74,7 @@ public class Job {
 		propertyHolder = new PropertyHolder(jobDir.resolve(JOB_INFO_FILENAME));
 		Files.createDirectory(jobDir);
 		setName(name);
-
+		
 	}
 
 	public Job(JobManager4Job jobManager, Path jobDirectory, Supplier<HaaSClient> haasClientSupplier) {
@@ -90,6 +91,7 @@ public class Job {
 		this.jobManager = jobManager;
 	}
 
+
 	public void startUploadData() {
 		setProperty(JOB_NEEDS_UPLOAD, true);
 		try {
@@ -110,7 +112,7 @@ public class Job {
 		}
 	}
 
-	public void startDownload(Predicate<String> predicate, Progress notifier) throws IOException {
+	public void startDownload(Predicate<String> predicate) throws IOException {
 		setProperty(JOB_NEEDS_DOWNLOAD, true);
 		Collection<String> files = getHaaSClient().getChangedFiles(jobId).stream().filter(predicate)
 				.collect(Collectors.toList());
@@ -297,7 +299,7 @@ public class Job {
 		this.jobDir = jobDirectory;
 		try {
 			this.synchronization = new Synchronization(
-					() -> haasClientSupplier.get().startFileTransfer(getId(), HaaSClient.DUMMY_TRANSFER_FILE_PROGRESS),
+					()->startFileTransfer(HaaSClient.DUMMY_TRANSFER_FILE_PROGRESS),
 					jobDir, Executors.newFixedThreadPool(2), () -> {
 						setProperty(JOB_NEEDS_UPLOAD, false);
 					}, () -> {
@@ -310,6 +312,10 @@ public class Job {
 		}
 	}
 
+	private HaaSFileTransfer startFileTransfer( TransferFileProgress progress) {
+		return haasClientSupplier.get().startFileTransfer(getId(), progress);
+	}
+
 	private synchronized void resumeUpload() {
 		if (Boolean.parseBoolean(getProperty(JOB_NEEDS_UPLOAD))) {
 			synchronization.resumeUpload();
@@ -360,4 +366,12 @@ public class Job {
 		setProperty(JOB_CAN_BE_DOWNLOADED, b);
 	}
 
+	public void setDownloadNotifier(ProgressNotifier notifier) {
+		synchronization.setDownloadNotifier(notifier);
+	}
+	
+	public void setUploadNotifier(ProgressNotifier notifier) {
+		synchronization.setUploadNotifier(notifier);
+	}
+
 }
diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/PersitentSynchronizationProcess.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/PersitentSynchronizationProcess.java
index 6d933615..bfa3b2c4 100644
--- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/PersitentSynchronizationProcess.java
+++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/PersitentSynchronizationProcess.java
@@ -13,6 +13,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import cz.it4i.fiji.haas_java_client.HaaSFileTransfer;
+import cz.it4i.fiji.haas_java_client.ProgressNotifier;
+import cz.it4i.fiji.haas_java_client.TransferFileProgressForHaaSClient;
+import cz.it4i.fiji.scpclient.TransferFileProgress;
 
 public abstract class PersitentSynchronizationProcess<T> {
 
@@ -20,6 +23,12 @@ public abstract class PersitentSynchronizationProcess<T> {
 	
 	public static final Logger log = LoggerFactory
 			.getLogger(cz.it4i.fiji.haas.data_transfer.PersitentSynchronizationProcess.class);
+
+	private static final TransferFileProgress DUMMY_FILE_PROGRESS =  new TransferFileProgress() {
+		@Override
+		public void dataTransfered(long bytesTransfered) {
+		}
+	};
 	
 	private PersistentIndex<T> index;
 	
@@ -32,6 +41,8 @@ public abstract class PersitentSynchronizationProcess<T> {
 	private Runnable processFinishedNotifier;
 
 	private String name;
+
+	private ProgressNotifier notifier;
 	
 	public PersitentSynchronizationProcess(String name,ExecutorService service, Supplier<HaaSFileTransfer> fileTransferSupplier, Runnable processFinishedNotifier, Path indexFile,Function<String,T> convertor) throws IOException {
 		runner = new SimpleThreadRunner(service);
@@ -74,6 +85,7 @@ public abstract class PersitentSynchronizationProcess<T> {
 	
 	private void doProcess(AtomicBoolean reRun) {
 		try(HaaSFileTransfer tr = fileTransferSupplier.get()) {
+			tr.setProgress(getTransferFileProgress(tr));
 			while (!toProcessQueue.isEmpty()) {
 				T p = toProcessQueue.poll();
 				
@@ -91,7 +103,9 @@ public abstract class PersitentSynchronizationProcess<T> {
 			}
 		}
 	}
-	
+
+	abstract protected long getTotalSize(Iterable<T> items, HaaSFileTransfer tr);
+
 	private void fileUploaded(T p) {
 		try {
 			index.remove(p);
@@ -101,5 +115,16 @@ public abstract class PersitentSynchronizationProcess<T> {
 		}
 	}
 
+	public void setNotifier(ProgressNotifier notifier) {
+		this.notifier = notifier;
+	}
+
+	private TransferFileProgress getTransferFileProgress(HaaSFileTransfer tr) {
+		if(notifier == null) {
+			return DUMMY_FILE_PROGRESS;
+		}
+		return new TransferFileProgressForHaaSClient(getTotalSize(toProcessQueue, tr), notifier);
+	}
+
 	
 }
diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java
index 7f9ae99d..fbb9bceb 100644
--- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java
+++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java
@@ -18,6 +18,7 @@ import org.slf4j.LoggerFactory;
 
 import cz.it4i.fiji.haas_java_client.HaaSClient.UploadingFile;
 import cz.it4i.fiji.haas_java_client.HaaSFileTransfer;
+import cz.it4i.fiji.haas_java_client.ProgressNotifier;
 import cz.it4i.fiji.haas_java_client.UploadingFileImpl;
 
 public class Synchronization {
@@ -52,6 +53,15 @@ public class Synchronization {
 		this.downloadProcess = createDownloadProcess(fileTransferSupplier, service, downloadFinishedNotifier);
 	}
 
+	public synchronized void setUploadNotifier(ProgressNotifier notifier) {
+		uploadProcess.setNotifier(notifier);
+	}
+	
+	public void setDownloadNotifier(ProgressNotifier notifier) {
+		// TODO Auto-generated method stub
+		
+	}
+
 	public synchronized void startUpload() throws IOException {
 		uploadProcess.start();
 	}
@@ -92,6 +102,7 @@ public class Synchronization {
 				try(DirectoryStream<Path> ds = Files.newDirectoryStream(workingDirectory,Synchronization.this::canUpload)) {
 					return StreamSupport.stream(ds.spliterator(), false).collect(Collectors.toList()); 
 				}
+				
 			}
 	
 			@Override
@@ -99,6 +110,18 @@ public class Synchronization {
 				UploadingFile uf = new UploadingFileImpl(p);
 				tr.upload(uf);
 			}
+
+			@Override
+			protected long getTotalSize(Iterable<Path> items, HaaSFileTransfer tr) {
+				return StreamSupport.stream(items.spliterator(), false).map(p->{
+					try {
+						return Files.size(p);
+					} catch (IOException e) {
+						log.error(e.getMessage(), e);
+						return 0; 
+					}
+				}).collect(Collectors.summingLong(val->val.longValue()));
+			}
 		};
 	}
 
@@ -138,6 +161,12 @@ public class Synchronization {
 			}
 			tr.download(file, workingDirectory);
 		}
+
+		@Override
+		protected long getTotalSize(Iterable<String> items, HaaSFileTransfer tr) {
+			
+			return tr.obtainSize( StreamSupport.stream(items.spliterator(), false).collect(Collectors.toList())).stream().collect(Collectors.summingLong(val->val));
+		}
 		
 	}
 }
diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/ui/ObservableValueRegistry.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/ui/ObservableValueRegistry.java
index b47f0739..c976fb35 100644
--- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/ui/ObservableValueRegistry.java
+++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/ui/ObservableValueRegistry.java
@@ -32,9 +32,13 @@ public class ObservableValueRegistry<T> {
 	private Map<T,UpdatableObservableValue<T>> map = new LinkedHashMap<>(); 
 	
 	public  ObservableValue<T> addIfAbsent(T value) {
-		UpdatableObservableValue<T> uov = map.computeIfAbsent(value, v-> new UpdatableObservableValue<T>(v, updateFunction, stateProvider));
+		UpdatableObservableValue<T> uov = map.computeIfAbsent(value, v-> constructObservableValue(v, updateFunction, stateProvider));
 		return uov;
 	}
+
+	protected UpdatableObservableValue<T> constructObservableValue(T v, Function<T, UpdateStatus> updateFunction, Function<T, Object> stateProvider) {
+		return new UpdatableObservableValue<T>(v, updateFunction, stateProvider);
+	}
 	
 	public UpdatableObservableValue<T> get(T value) {
 		return map.get(value);
diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java
index 2741aa41..67cc49a8 100644
--- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java
+++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java
@@ -108,12 +108,12 @@ public class BenchmarkJobManager {
 			return result;
 		}
 
-		public void startDownload(Progress progress) throws IOException {
+		public void startDownload() throws IOException {
 			if (job.getState() == JobState.Finished) {
 				String filePattern = job.getProperty(SPIM_OUTPUT_FILENAME_PATTERN);
-				job.startDownload(downloadFinishedData(filePattern) , progress);
+				job.startDownload(downloadFinishedData(filePattern) );
 			} else if (job.getState() == JobState.Failed || job.getState() == JobState.Canceled) {
-				job.startDownload(downloadFailedData(), progress);
+				job.startDownload(downloadFailedData());
 			}
 		}
 		
@@ -401,7 +401,22 @@ public class BenchmarkJobManager {
 			return Stream.concat(nonTaskSpecificErrors.stream(), taskSpecificErrors).collect(Collectors.toList());
 		}
 
-		
+		public void setDownloadNotifier(Progress progress) {
+			if(progress == null) {
+				job.setDownloadNotifier(null);
+			} else {
+				job.setDownloadNotifier(new P_ProgressNotifierAdapter(progress));
+			}
+		}
+
+
+		public void setUploadNotifier(Progress downloadProgress) {
+			if(downloadProgress == null) {
+				job.setUploadNotifier(null);
+			} else {
+				job.setUploadNotifier(new P_ProgressNotifierAdapter(downloadProgress));
+			}
+		}
 	}
 
 	public BenchmarkJobManager(BenchmarkSPIMParameters params) throws IOException {
diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/UpdatableBenchmarkJob.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/UpdatableBenchmarkJob.java
new file mode 100644
index 00000000..510134fb
--- /dev/null
+++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/UpdatableBenchmarkJob.java
@@ -0,0 +1,74 @@
+package cz.it4i.fiji.haas_spim_benchmark.core;
+
+import java.util.function.Function;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import cz.it4i.fiji.haas.ui.UpdatableObservableValue;
+import cz.it4i.fiji.haas_spim_benchmark.core.BenchmarkJobManager.BenchmarkJob;
+import net.imagej.updater.util.Progress;
+
+public class UpdatableBenchmarkJob extends UpdatableObservableValue<BenchmarkJob>{
+
+	public static final Logger log = LoggerFactory
+			.getLogger(cz.it4i.fiji.haas_spim_benchmark.core.UpdatableBenchmarkJob.class);
+	
+	private Progress downloadProgress = new P_DownloadProgress();
+	
+	public UpdatableBenchmarkJob(BenchmarkJob wrapped, Function<BenchmarkJob, UpdateStatus> updateFunction,
+			Function<BenchmarkJob, Object> stateProvider) {
+		super(wrapped, updateFunction, stateProvider);
+		
+		wrapped.setDownloadNotifier(downloadProgress);
+	}
+	
+	public void removed() {
+		getValue().setDownloadNotifier(null);
+	}
+	
+	
+	private class P_DownloadProgress implements Progress {
+
+		@Override
+		public void setTitle(String title) {
+			// TODO Auto-generated method stub
+			
+		}
+
+		@Override
+		public void setCount(int count, int total) {
+			log.info("setCount count=" + count + ", total:" + total);
+		}
+
+		@Override
+		public void addItem(Object item) {
+			// TODO Auto-generated method stub
+			
+		}
+
+		@Override
+		public void setItemCount(int count, int total) {
+			// TODO Auto-generated method stub
+			
+		}
+
+		@Override
+		public void itemDone(Object item) {
+			// TODO Auto-generated method stub
+			
+		}
+
+		@Override
+		public void done() {
+			// TODO Auto-generated method stub
+			
+		}
+		
+	}
+
+
+
+	
+
+}
diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/BenchmarkSPIMController.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/BenchmarkSPIMController.java
index 0423b261..a2938a0c 100644
--- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/BenchmarkSPIMController.java
+++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/BenchmarkSPIMController.java
@@ -112,7 +112,7 @@ public class BenchmarkSPIMController extends BorderPane implements CloseableCont
 				job -> JavaFXRoutines.notNullValue(job, j -> !EnumSet.of(JobState.Running).contains(j.getState())));
 		
 		menu.addItem("Download result",
-				job -> executeWSCallAsync("Downloading data", p -> job.getValue().startDownload(p)),
+				job -> executeWSCallAsync("Downloading data", p -> job.getValue().startDownload()),
 				job -> JavaFXRoutines.notNullValue(job,
 						j -> EnumSet.of(JobState.Failed, JobState.Finished, JobState.Canceled).contains(j.getState())
 								&& !j.canBeDownloaded()));
diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/ObservableBenchmarkJobRegistry.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/ObservableBenchmarkJobRegistry.java
index ed93b4a9..560b3e05 100644
--- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/ObservableBenchmarkJobRegistry.java
+++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/ObservableBenchmarkJobRegistry.java
@@ -3,14 +3,18 @@ package cz.it4i.fiji.haas_spim_benchmark.ui;
 import java.nio.file.Files;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import cz.it4i.fiji.haas.ui.ObservableValueRegistry;
+import cz.it4i.fiji.haas.ui.UpdatableObservableValue;
 import cz.it4i.fiji.haas.ui.UpdatableObservableValue.UpdateStatus;
 import cz.it4i.fiji.haas_java_client.JobState;
 import cz.it4i.fiji.haas_spim_benchmark.core.BenchmarkJobManager.BenchmarkJob;
+import cz.it4i.fiji.haas_spim_benchmark.core.UpdatableBenchmarkJob;
+import javafx.beans.value.ObservableValue;
 
 public class ObservableBenchmarkJobRegistry extends ObservableValueRegistry<BenchmarkJob> {
 
@@ -23,6 +27,31 @@ public class ObservableBenchmarkJobRegistry extends ObservableValueRegistry<Benc
 			return t.getStateAsync(exec).getNow(null);
 		}, removeConsumer);
 	}
+	
+	@Override
+	public UpdatableBenchmarkJob addIfAbsent(BenchmarkJob value) {
+		return (UpdatableBenchmarkJob) super.addIfAbsent(value);
+	}
+	
+	@Override
+	public UpdatableBenchmarkJob get(BenchmarkJob value) {
+		return (UpdatableBenchmarkJob) super.get(value);
+	}
+	
+	@Override
+	protected ObservableValue<BenchmarkJob> remove(BenchmarkJob value) {
+		UpdatableBenchmarkJob result = (UpdatableBenchmarkJob) super.remove(value);
+		result.removed();
+		return result;
+	}
+	
+	@Override
+	protected UpdatableObservableValue<BenchmarkJob> constructObservableValue(BenchmarkJob v,
+			Function<BenchmarkJob, UpdateStatus> updateFunction, Function<BenchmarkJob, Object> stateProvider) {
+		return new UpdatableBenchmarkJob(v, updateFunction, stateProvider);
+	}
+	
+	
 
 	private static UpdateStatus update(BenchmarkJob t, Executor executor) {
 		if (!Files.isDirectory(t.getDirectory())) {
diff --git a/haas-spim-benchmark/src/test/java/cz/it4i/fiji/haas/RunBenchmark.java b/haas-spim-benchmark/src/test/java/cz/it4i/fiji/haas/RunBenchmark.java
index 1e6f8cfa..6a5fbd4b 100644
--- a/haas-spim-benchmark/src/test/java/cz/it4i/fiji/haas/RunBenchmark.java
+++ b/haas-spim-benchmark/src/test/java/cz/it4i/fiji/haas/RunBenchmark.java
@@ -34,7 +34,7 @@ public class RunBenchmark {
 				if (state == JobState.Configuring) {
 					job.startJob(new DummyProgress());
 				} else if (state != JobState.Running && state != JobState.Queued) {
-					job.startDownload(new DummyProgress());
+					job.startDownload();
 				} else if (state == JobState.Running) {
 					log.info(job.getSnakemakeOutput());
 				}
-- 
GitLab