From aef81b57fba4b2826778666477c1853afc9465a6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20Ko=C5=BEusznik?= <jan@kozusznik.cz>
Date: Wed, 25 Apr 2018 15:45:31 +0200
Subject: [PATCH] feat: support of upload in UI

---
 .../src/main/java/cz/it4i/fiji/haas/Job.java  | 50 ++++++-----
 .../fiji/haas/data_transfer/FileIndex.java    | 22 +++--
 .../haas/data_transfer/Synchronization.java   | 35 ++++----
 .../core/BenchmarkJobManager.java             | 84 +++++++++++--------
 .../haas_spim_benchmark/ui/BenchmarkSPIM.fxml |  8 +-
 .../ui/BenchmarkSPIMController.java           |  3 +
 6 files changed, 121 insertions(+), 81 deletions(-)

diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java
index 162a63a8..d192e5cf 100644
--- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java
+++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java
@@ -35,13 +35,15 @@ public class Job {
 	private static final String JOB_NAME = "job.name";
 	
 	private static final String JOB_NEEDS_UPLOAD = "job.needs_upload";
+	
+	private static final String JOB_INFO_FILENAME = ".jobinfo";
+
 
 	public static boolean isJobPath(Path p) {
 		return isValidPath(p);
 	}
 
-	private static String JOB_INFO_FILE = ".jobinfo";
-
+	
 	private static Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas.Job.class);
 
 	private Path jobDir;
@@ -61,8 +63,8 @@ public class Job {
 		this(jobManager, haasClientSupplier);
 		HaaSClient client = getHaaSClient();
 		long id = client.createJob(name, Collections.emptyList());
-		jobDir = basePath.resolve("" + id);
-		propertyHolder = new PropertyHolder(jobDir.resolve(JOB_INFO_FILE));
+		setJobDirectory(basePath.resolve("" + id));
+		propertyHolder = new PropertyHolder(jobDir.resolve(JOB_INFO_FILENAME));
 		Files.createDirectory(jobDir);
 		setName(name);
 
@@ -70,26 +72,20 @@ public class Job {
 
 	public Job(JobManager4Job jobManager, Path jobDirectory, Supplier<HaaSClient> haasClientSupplier) {
 		this(jobManager, haasClientSupplier);
-		jobDir = jobDirectory;
-		propertyHolder = new PropertyHolder(jobDir.resolve(JOB_INFO_FILE));
-		resumeSynchronization();
+		setJobDirectory(jobDirectory);
+		propertyHolder = new PropertyHolder(jobDir.resolve(JOB_INFO_FILENAME));
+		resumeUpload();
 	}
 
+	
+
 	private Job(JobManager4Job jobManager, Supplier<HaaSClient> haasClientSupplier) {
 		this.haasClientSupplier = haasClientSupplier;
 		this.jobManager = jobManager;
-		try {
-			this.synchronization = new Synchronization(()->haasClientSupplier.get().startFileTransfer(getId(), new DummyProgressNotifier()), jobDir, Executors.newFixedThreadPool(2), ()->  {
-				setProperty(JOB_NEEDS_UPLOAD, false);
-			});
-		} catch (IOException e) {
-			log.error(e.getMessage(), e);
-			throw new RuntimeException(e);
-		}
 	}
 
 	public void startUploadData()  {
-		setProperty(JOB_INFO_FILE, true);
+		setProperty(JOB_NEEDS_UPLOAD, true);
 		try {
 			this.synchronization.startUpload();
 		} catch (IOException e) {
@@ -99,7 +95,7 @@ public class Job {
 	}
 	
 	public void stopUploadData()  {
-		setProperty(JOB_INFO_FILE, false);
+		setProperty(JOB_NEEDS_UPLOAD, false);
 		try {
 			this.synchronization.stopUpload();
 		} catch (IOException e) {
@@ -108,6 +104,10 @@ public class Job {
 		}
 	}
 	
+	public boolean isUploading() {
+		return Boolean.parseBoolean(getProperty(JOB_NEEDS_UPLOAD));
+	}
+
 	public void uploadFile(String file, Progress notifier) {
 		Iterable<UploadingFile> uploadingFiles = Arrays.asList(file).stream()
 				.map((String name) -> HaaSClient.getUploadingFile(jobDir.resolve(name))).collect(Collectors.toList());
@@ -212,7 +212,19 @@ public class Job {
 		return result;
 	}
 
-	private synchronized void resumeSynchronization() {
+	private void setJobDirectory(Path jobDirectory) {
+		this.jobDir = jobDirectory;
+		try {
+			this.synchronization = new Synchronization(()->haasClientSupplier.get().startFileTransfer(getId(), new DummyProgressNotifier()), jobDir, Executors.newFixedThreadPool(2), ()->  {
+				setProperty(JOB_NEEDS_UPLOAD, false);
+			});
+		} catch (IOException e) {
+			log.error(e.getMessage(), e);
+			throw new RuntimeException(e);
+		}
+	}
+	
+	private synchronized void resumeUpload() {
 		if(Boolean.parseBoolean(getProperty(JOB_NEEDS_UPLOAD))) {
 			synchronization.resumeUpload();
 		}
@@ -251,7 +263,7 @@ public class Job {
 		} catch (NumberFormatException e) {
 			return false;
 		}
-		return Files.isRegularFile(path.resolve(JOB_INFO_FILE));
+		return Files.isRegularFile(path.resolve(JOB_INFO_FILENAME));
 	}
 
 	private static long getJobId(Path path) {
diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/FileIndex.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/FileIndex.java
index 40a1bc4a..b3a8753b 100644
--- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/FileIndex.java
+++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/FileIndex.java
@@ -10,8 +10,13 @@ import java.util.LinkedHashSet;
 import java.util.Queue;
 import java.util.Set;
 
-public class FileIndex {
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+public class FileIndex {
+	
+	public static final Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas.data_transfer.FileIndex.class);
+	
 	private Path workingFile;
 
 	private Set<Path> files = new LinkedHashSet<>();
@@ -29,25 +34,26 @@ public class FileIndex {
 		}
 	}
 
-	public synchronized boolean needsDownload(Path file) {
+	public synchronized boolean insert(Path file) {
 		return files.add(file);
 	}
 
 	public synchronized void uploaded(Path p) {
 		files.remove(p);
-
 	}
 
-	public synchronized void fileUploadQueue(Queue<Path> toUpload) {
+	public synchronized void fillQueue(Queue<Path> toUpload) {
 		toUpload.addAll(files);
 	}
 
 	private void loadFromFile() throws IOException {
 		files.clear();
-		try (BufferedReader br = Files.newBufferedReader(workingFile)) {
-			String line;
-			while (null != (line = br.readLine())) {
-				processLine(line);
+		if(Files.exists(workingFile)) {
+			try (BufferedReader br = Files.newBufferedReader(workingFile)) {
+				String line;
+				while (null != (line = br.readLine())) {
+					processLine(line);
+				}
 			}
 		}
 	}
diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java
index 5a0464ff..1b09f99f 100644
--- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java
+++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java
@@ -21,6 +21,8 @@ public class Synchronization {
 
 	public static final Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas.data_transfer.Synchronization.class);
 	
+	private static final String FILE_INDEX_FILENAME = ".toUploadFiles";
+	
 	private Supplier<HaaSFileTransfer> fileTransferSupplier;
 	
 	private Path workingDirectory;
@@ -39,7 +41,7 @@ public class Synchronization {
 			ExecutorService service, Runnable uploadFinishedNotifier ) throws IOException {
 		this.fileTransferSupplier = fileTransferSupplier;
 		this.workingDirectory = workingDirectory;
-		this.fileRepository = new FileIndex(workingDirectory);
+		this.fileRepository = new FileIndex(workingDirectory.resolve(FILE_INDEX_FILENAME));
 		this.runnerForUpload = new SimpleThreadRunner(service);
 		this.uploadFinishedNotifier = uploadFinishedNotifier;
 	}
@@ -49,7 +51,7 @@ public class Synchronization {
 		fileRepository.clear();
 		try(DirectoryStream<Path> ds = Files.newDirectoryStream(workingDirectory,this::isNotHidden)) {
 			for (Path file : ds) {
-				fileRepository.needsDownload(file);
+				fileRepository.insert(file);
 				toUpload.add(file);
 				runnerForUpload.runIfNotRunning(this::doUpload);
 			}
@@ -58,20 +60,23 @@ public class Synchronization {
 			fileRepository.storeToFile();
 			
 		}
-	
 	}
-
+	
 	public void stopUpload() throws IOException {
 		toUpload.clear();
 		fileRepository.clear();
 	}
 
 	public void resumeUpload() {
-		fileRepository.fileUploadQueue(toUpload);
+		fileRepository.fillQueue(toUpload);
 		if(!toUpload.isEmpty()) {
 			runnerForUpload.runIfNotRunning(this::doUpload);
 		}
 	}
+	
+	public void startDownload() {
+		
+	}
 
 	private boolean isNotHidden(Path file) {
 		
@@ -83,26 +88,28 @@ public class Synchronization {
 			while (!toUpload.isEmpty()) {
 				Path p = toUpload.poll();
 				UploadingFile uf = createUploadingFile(p);
+				log.info("upload: " + p);
 				tr.upload(Arrays.asList(uf));
 				fileUploaded(p);
+				log.info("uploaded: " + p);
 				reRun.set(false);
 			}
 		} finally {
-			try {
-				fileRepository.storeToFile();
-				synchronized(this) {
-					if(startUploadFinished) {
-						uploadFinishedNotifier.run();
-					}
+			synchronized (this) {
+				if (startUploadFinished) {
+					uploadFinishedNotifier.run();
 				}
-			} catch (IOException e) {
-				log.error(e.getMessage(), e);
 			}
 		}
 	}
 
 	private void fileUploaded(Path p) {
-		fileRepository.uploaded(p);
+		try {
+			fileRepository.uploaded(p);
+			fileRepository.storeToFile();
+		} catch (IOException e) {
+			log.error(e.getMessage(), e);
+		}
 	}
 
 	private UploadingFile createUploadingFile(Path p) {
diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java
index e6f4dcfe..1fcdc569 100644
--- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java
+++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java
@@ -87,6 +87,18 @@ public class BenchmarkJobManager {
 		public JobState getState() {
 			return getStateAsync(r -> r.run()).getNow(JobState.Unknown);
 		}
+		
+		public void startUpload() {
+			job.startUploadData();
+		}
+		
+		public void stopUpload() {
+			job.stopUploadData();
+		}
+		
+		public boolean isUploading() {
+			return job.isUploading();
+		}
 
 		public synchronized CompletableFuture<JobState> getStateAsync(Executor executor) {
 			if (running != null) {
@@ -99,42 +111,6 @@ public class BenchmarkJobManager {
 			return result;
 		}
 
-		private synchronized CompletableFuture<JobState> doGetStateAsync(Executor executor) {
-			JobState state = job.getState();
-			if (state != JobState.Finished) {
-				return CompletableFuture.completedFuture(state);
-			} else if (verifiedState != null) {
-				return CompletableFuture.completedFuture(verifiedState);
-			}
-			verifiedStateProcessed = true;
-			return CompletableFuture.supplyAsync(() -> {
-				try {
-					verifiedState = Stream.concat(Arrays.asList(state).stream(),
-							getTasks().stream().filter(task -> !task.getDescription().equals(Constants.DONE_TASK))
-									.flatMap(task -> task.getComputations().stream()).map(tc -> tc.getState()))
-							.max(new JobStateComparator()).get();
-					if (verifiedState != JobState.Finished && verifiedState != JobState.Canceled) {
-						verifiedState = JobState.Failed;
-					}
-					synchronized (BenchmarkJob.this) {
-						// test whether job was restarted - it sets running to null
-						if (!verifiedStateProcessed) {
-							verifiedState = null;
-							return doGetStateAsync(r -> r.run()).getNow(null);
-						}
-						running = null;
-						return verifiedState;
-					}
-				} finally {
-					synchronized (BenchmarkJob.this) {
-						if (running != null) {
-							running = null;
-						}
-					}
-				}
-			}, executor);
-		}
-
 		public void downloadData(Progress progress) throws IOException {
 			if (job.getState() == JobState.Finished) {
 				String filePattern = job.getProperty(SPIM_OUTPUT_FILENAME_PATTERN);
@@ -242,6 +218,42 @@ public class BenchmarkJobManager {
 			return "" + getId();
 		}
 
+		private synchronized CompletableFuture<JobState> doGetStateAsync(Executor executor) {
+			JobState state = job.getState();
+			if (state != JobState.Finished) {
+				return CompletableFuture.completedFuture(state);
+			} else if (verifiedState != null) {
+				return CompletableFuture.completedFuture(verifiedState);
+			}
+			verifiedStateProcessed = true;
+			return CompletableFuture.supplyAsync(() -> {
+				try {
+					verifiedState = Stream.concat(Arrays.asList(state).stream(),
+							getTasks().stream().filter(task -> !task.getDescription().equals(Constants.DONE_TASK))
+									.flatMap(task -> task.getComputations().stream()).map(tc -> tc.getState()))
+							.max(new JobStateComparator()).get();
+					if (verifiedState != JobState.Finished && verifiedState != JobState.Canceled) {
+						verifiedState = JobState.Failed;
+					}
+					synchronized (BenchmarkJob.this) {
+						// test whether job was restarted - it sets running to null
+						if (!verifiedStateProcessed) {
+							verifiedState = null;
+							return doGetStateAsync(r -> r.run()).getNow(null);
+						}
+						running = null;
+						return verifiedState;
+					}
+				} finally {
+					synchronized (BenchmarkJob.this) {
+						if (running != null) {
+							running = null;
+						}
+					}
+				}
+			}, executor);
+		}
+
 		private String getStringFromTimeSafely(Calendar time) {
 			return time != null ? time.getTime().toString() : "N/A";
 		}
diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/BenchmarkSPIM.fxml b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/BenchmarkSPIM.fxml
index 19a0dada..25ad67bc 100644
--- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/BenchmarkSPIM.fxml
+++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/BenchmarkSPIM.fxml
@@ -1,25 +1,25 @@
 <?xml version="1.0" encoding="UTF-8"?>
 
+<?import javafx.geometry.Insets?>
 <?import javafx.scene.control.TableColumn?>
 <?import javafx.scene.control.TableView?>
 <?import javafx.scene.layout.BorderPane?>
 <?import javafx.scene.layout.HBox?>
-<?import javafx.geometry.Insets?>
 
-<fx:root type="BorderPane" xmlns="http://javafx.com/javafx/8.0.65"
-	xmlns:fx="http://javafx.com/fxml/1" fx:controller="cz.it4i.fiji.haas_spim_benchmark.ui.BenchmarkSPIMController">
+<fx:root type="BorderPane" xmlns="http://javafx.com/javafx/8.0.65" xmlns:fx="http://javafx.com/fxml/1" fx:controller="cz.it4i.fiji.haas_spim_benchmark.ui.BenchmarkSPIMController">
 	<center>
 
 		<HBox>
 			<children>
 
-				<TableView fx:id="jobs" HBox.hgrow="ALWAYS">
+				<TableView fx:id="jobs" prefHeight="400.0" prefWidth="1065.0" HBox.hgrow="ALWAYS">
 					<columns>
 						<TableColumn prefWidth="75.0" text="Job Id" />
 						<TableColumn prefWidth="149.0" text="Status" />
 						<TableColumn prefWidth="230.0" text="Creation time" />
 						<TableColumn prefWidth="230.0" text="Start time" />
 						<TableColumn prefWidth="230.0" text="End Time" />
+                  <TableColumn prefWidth="150.0" text="Upload" />
 					</columns>
 				</TableView>
 			</children>
diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/BenchmarkSPIMController.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/BenchmarkSPIMController.java
index 2dd990df..a8dc9929 100644
--- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/BenchmarkSPIMController.java
+++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/BenchmarkSPIMController.java
@@ -108,6 +108,9 @@ public class BenchmarkSPIMController extends BorderPane implements CloseableCont
 				j -> j.getState() == JobState.Running || j.getState() == JobState.Finished
 						|| j.getState() == JobState.Failed || j.getState() == JobState.Canceled));
 
+		menu.addItem("Upload data", job -> executeWSCallAsync("Uploading data", p -> job.getValue().startUpload()),
+				job -> JavaFXRoutines.notNullValue(job, j -> !EnumSet.of(JobState.Running).contains(j.getState())));
+		
 		menu.addItem("Download result",
 				job -> executeWSCallAsync("Downloading data", p -> job.getValue().downloadData(p)),
 				job -> JavaFXRoutines.notNullValue(job,
-- 
GitLab