From 5b48841641de5de36b1187c41cb4174700c3e2ab Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20Ko=C5=BEusznik?= <jan@kozusznik.cz>
Date: Wed, 25 Apr 2018 13:14:27 +0200
Subject: [PATCH] feature: services for synchronization

---
 .../src/main/java/cz/it4i/fiji/haas/Job.java  | 119 ++++++++++++------
 .../java/cz/it4i/fiji/haas/JobManager.java    |   7 --
 .../{FileRepository.java => FileIndex.java}   |  36 +++---
 .../data_transfer/SimpleThreadRunner.java     |  28 +++++
 .../haas/data_transfer/Synchronization.java   |  56 +++++++--
 .../haas/data_transfer/UploadingFileImpl.java |  17 ++-
 .../core/BenchmarkJobManager.java             |   8 +-
 7 files changed, 189 insertions(+), 82 deletions(-)
 rename haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/{FileRepository.java => FileIndex.java} (60%)
 create mode 100644 haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/SimpleThreadRunner.java

diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java
index 41c3b33f..162a63a8 100644
--- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java
+++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java
@@ -4,21 +4,23 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.concurrent.Executors;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import cz.it4i.fiji.haas.JobManager.JobManager4Job;
 import cz.it4i.fiji.haas.JobManager.JobSynchronizableFile;
+import cz.it4i.fiji.haas.data_transfer.Synchronization;
 import cz.it4i.fiji.haas_java_client.DummyProgressNotifier;
 import cz.it4i.fiji.haas_java_client.HaaSClient;
 import cz.it4i.fiji.haas_java_client.HaaSClient.UploadingFile;
@@ -31,6 +33,8 @@ import net.imagej.updater.util.Progress;
 public class Job {
 
 	private static final String JOB_NAME = "job.name";
+	
+	private static final String JOB_NEEDS_UPLOAD = "job.needs_upload";
 
 	public static boolean isJobPath(Path p) {
 		return isValidPath(p);
@@ -45,45 +49,67 @@ public class Job {
 	private Supplier<HaaSClient> haasClientSupplier;
 
 	// private JobState state;
-	//private Boolean needsDownload;
+	// private Boolean needsDownload;
 	private JobInfo jobInfo;
 	private Long jobId;
-
-	
 	private PropertyHolder propertyHolder;
-
 	private JobManager4Job jobManager;
+	private Synchronization synchronization;
 
-	public Job(JobManager4Job jobManager, String name, Path basePath, Supplier<HaaSClient> haasClientSupplier) throws IOException {
-		this(jobManager,haasClientSupplier);
+	public Job(JobManager4Job jobManager, String name, Path basePath, Supplier<HaaSClient> haasClientSupplier)
+			throws IOException {
+		this(jobManager, haasClientSupplier);
 		HaaSClient client = getHaaSClient();
 		long id = client.createJob(name, Collections.emptyList());
 		jobDir = basePath.resolve("" + id);
 		propertyHolder = new PropertyHolder(jobDir.resolve(JOB_INFO_FILE));
 		Files.createDirectory(jobDir);
 		setName(name);
-		
-	}
 
-	public void setName(String name) {
-		setProperty(JOB_NAME, name);
 	}
 
-	public Job(JobManager4Job jobManager,Path p, Supplier<HaaSClient> haasClientSupplier) {
+	public Job(JobManager4Job jobManager, Path jobDirectory, Supplier<HaaSClient> haasClientSupplier) {
 		this(jobManager, haasClientSupplier);
-		jobDir = p;
+		jobDir = jobDirectory;
 		propertyHolder = new PropertyHolder(jobDir.resolve(JOB_INFO_FILE));
+		resumeSynchronization();
 	}
 
-	public void uploadFiles(Iterable<UploadingFile> files, Progress notifier) {
-		HaaSClient client = getHaaSClient();
-		try(HaaSFileTransfer transfer = client.startFileTransfer(getId(), new P_ProgressNotifierAdapter(notifier))){
-			transfer.upload(files);
+	private Job(JobManager4Job jobManager, Supplier<HaaSClient> haasClientSupplier) {
+		this.haasClientSupplier = haasClientSupplier;
+		this.jobManager = jobManager;
+		try {
+			this.synchronization = new Synchronization(()->haasClientSupplier.get().startFileTransfer(getId(), new DummyProgressNotifier()), jobDir, Executors.newFixedThreadPool(2), ()->  {
+				setProperty(JOB_NEEDS_UPLOAD, false);
+			});
+		} catch (IOException e) {
+			log.error(e.getMessage(), e);
+			throw new RuntimeException(e);
 		}
 	}
 
-	public void uploadFilesByName(Iterable<String> files, Progress notifier) {
-		Iterable<UploadingFile> uploadingFiles = StreamSupport.stream(files.spliterator(), false)
+	public void startUploadData()  {
+		setProperty(JOB_INFO_FILE, true);
+		try {
+			this.synchronization.startUpload();
+		} catch (IOException e) {
+			log.error(e.getMessage(), e);
+			throw new RuntimeException(e);
+		}
+	}
+	
+	public void stopUploadData()  {
+		setProperty(JOB_INFO_FILE, false);
+		try {
+			this.synchronization.stopUpload();
+		} catch (IOException e) {
+			log.error(e.getMessage(), e);
+			throw new RuntimeException(e);
+		}
+	}
+	
+	public void uploadFile(String file, Progress notifier) {
+		Iterable<UploadingFile> uploadingFiles = Arrays.asList(file).stream()
 				.map((String name) -> HaaSClient.getUploadingFile(jobDir.resolve(name))).collect(Collectors.toList());
 		uploadFiles(uploadingFiles, notifier);
 	}
@@ -93,13 +119,6 @@ public class Job {
 		client.submitJob(jobId);
 	}
 
-	private Job(JobManager4Job jobManager, Supplier<HaaSClient> haasClientSupplier) {
-		this.haasClientSupplier = haasClientSupplier;
-		this.jobManager = jobManager;
-	}
-
-	
-
 	synchronized public long getId() {
 		if (jobId == null) {
 			jobId = getJobId(jobDir);
@@ -120,8 +139,11 @@ public class Job {
 	}
 
 	synchronized public void download(Predicate<String> predicate, Progress notifier) {
-		try (HaaSFileTransfer fileTransfer = getHaaSClient().startFileTransfer(jobId, new P_ProgressNotifierAdapter(notifier))) {
-			fileTransfer.download(getHaaSClient().getChangedFiles(jobId).stream().filter(predicate).collect(Collectors.toList()), jobDir);
+		try (HaaSFileTransfer fileTransfer = getHaaSClient().startFileTransfer(jobId,
+				new P_ProgressNotifierAdapter(notifier))) {
+			fileTransfer.download(
+					getHaaSClient().getChangedFiles(jobId).stream().filter(predicate).collect(Collectors.toList()),
+					jobDir);
 		}
 	}
 
@@ -153,10 +175,14 @@ public class Job {
 		return Files.newInputStream(jobDir.resolve(name));
 	}
 
-	public void setProperty(String name, String value) {
+	public synchronized void setProperty(String name, String value) {
 		propertyHolder.setValue(name, value);
 	}
 
+	public void setProperty(String jobNeedsUpload, boolean b) {
+		propertyHolder.setValue(jobNeedsUpload, "" + b);
+	}
+
 	public String getProperty(String name) {
 		return propertyHolder.getValue(name);
 	}
@@ -168,24 +194,41 @@ public class Job {
 	public Path getDirectory() {
 		return jobDir;
 	}
-	
+
 	public boolean remove() {
 		boolean result;
-		if((result = jobManager.remove(this)) && Files.isDirectory(jobDir) ) {
+		if ((result = jobManager.remove(this)) && Files.isDirectory(jobDir)) {
 			List<Path> pathsToDelete;
 			try {
 				pathsToDelete = Files.walk(jobDir).sorted(Comparator.reverseOrder()).collect(Collectors.toList());
-				for(Path path : pathsToDelete) {
-				    Files.deleteIfExists(path);
+				for (Path path : pathsToDelete) {
+					Files.deleteIfExists(path);
 				}
 			} catch (IOException e) {
 				log.error(e.getMessage(), e);
 			}
-			
+
 		}
 		return result;
 	}
-	
+
+	private synchronized void resumeSynchronization() {
+		if(Boolean.parseBoolean(getProperty(JOB_NEEDS_UPLOAD))) {
+			synchronization.resumeUpload();
+		}
+	}
+
+	private void uploadFiles(Iterable<UploadingFile> files, Progress notifier) {
+		HaaSClient client = getHaaSClient();
+		try (HaaSFileTransfer transfer = client.startFileTransfer(getId(), new P_ProgressNotifierAdapter(notifier))) {
+			transfer.upload(files);
+		}
+	}
+
+	private void setName(String name) {
+		setProperty(JOB_NAME, name);
+	}
+
 	private HaaSClient getHaaSClient() {
 		return this.haasClientSupplier.get();
 	}
@@ -257,7 +300,7 @@ public class Job {
 	}
 
 	public List<Long> getFileSizes(List<String> names) {
-		
+
 		try (HaaSFileTransfer transfer = getHaaSClient().startFileTransfer(getId(), new DummyProgressNotifier())) {
 			return transfer.obtainSize(names);
 		}
@@ -269,8 +312,4 @@ public class Job {
 		}
 	}
 
-	
-
-	
-
 }
diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/JobManager.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/JobManager.java
index 0c627041..77116f51 100644
--- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/JobManager.java
+++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/JobManager.java
@@ -12,7 +12,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import cz.it4i.fiji.haas_java_client.HaaSClient;
-import cz.it4i.fiji.haas_java_client.HaaSClient.UploadingFile;
 import cz.it4i.fiji.haas_java_client.JobState;
 import cz.it4i.fiji.haas_java_client.Settings;
 import cz.it4i.fiji.haas_java_client.SynchronizableFileType;
@@ -57,12 +56,6 @@ public class JobManager {
 		return job;
 	}
 
-	public Job startJob(Iterable<UploadingFile> files, Progress notifier) throws IOException {
-		Job result = createJob();
-		result.uploadFiles(files, notifier);
-		result.submit();
-		return result;
-	}
 
 	public Collection<Job> getJobs() {
 		if (jobs == null) {
diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/FileRepository.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/FileIndex.java
similarity index 60%
rename from haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/FileRepository.java
rename to haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/FileIndex.java
index 59ed59d8..40a1bc4a 100644
--- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/FileRepository.java
+++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/FileIndex.java
@@ -7,51 +7,57 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.LinkedHashSet;
+import java.util.Queue;
 import java.util.Set;
 
-public class FileRepository {
+public class FileIndex {
 
 	private Path workingFile;
-	
+
 	private Set<Path> files = new LinkedHashSet<>();
 
-	public FileRepository(Path workingFile) throws IOException {
+	public FileIndex(Path workingFile) throws IOException {
 		this.workingFile = workingFile;
 		loadFromFile();
 	}
-	
+
 	public synchronized void storeToFile() throws IOException {
-		try(BufferedWriter bw = Files.newBufferedWriter(workingFile)) {
-			for(Path file: files) {
+		try (BufferedWriter bw = Files.newBufferedWriter(workingFile)) {
+			for (Path file : files) {
 				bw.write(file.toString() + "\n");
 			}
 		}
 	}
-	
 
 	public synchronized boolean needsDownload(Path file) {
-		return files.contains(file);
+		return files.add(file);
 	}
 
 	public synchronized void uploaded(Path p) {
-		files.add(p);
-		
+		files.remove(p);
+
+	}
+
+	public synchronized void fileUploadQueue(Queue<Path> toUpload) {
+		toUpload.addAll(files);
 	}
 
-	
 	private void loadFromFile() throws IOException {
 		files.clear();
-		try(BufferedReader br = Files.newBufferedReader(workingFile)) {
+		try (BufferedReader br = Files.newBufferedReader(workingFile)) {
 			String line;
-			while(null != (line = br.readLine())) {
+			while (null != (line = br.readLine())) {
 				processLine(line);
 			}
 		}
 	}
 
-
-
 	private void processLine(String line) {
 		files.add(Paths.get(line));
 	}
+
+	public void clear() throws IOException {
+		files.clear();
+		storeToFile();
+	}
 }
diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/SimpleThreadRunner.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/SimpleThreadRunner.java
new file mode 100644
index 00000000..4a8237ba
--- /dev/null
+++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/SimpleThreadRunner.java
@@ -0,0 +1,28 @@
+package cz.it4i.fiji.haas.data_transfer;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+public class SimpleThreadRunner {
+	private ExecutorService service;
+	private AtomicBoolean reRun = new AtomicBoolean(false);
+
+	public SimpleThreadRunner(ExecutorService service) {
+		this.service = service;
+	}
+
+	public void runIfNotRunning(Consumer<AtomicBoolean> r) {
+		synchronized (this) {
+			if (reRun.get()) {
+				return;
+			}
+			reRun.set(true);
+		}
+		service.execute(() -> {
+			do {
+				r.accept(reRun);
+			} while (reRun.get());
+		});
+	}
+}
diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java
index bc4c971a..5a0464ff 100644
--- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java
+++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java
@@ -8,6 +8,7 @@ import java.util.Arrays;
 import java.util.Queue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 
 import org.slf4j.Logger;
@@ -26,24 +27,50 @@ public class Synchronization {
 	
 	private Queue<Path> toUpload = new LinkedBlockingQueue<>();
 	
-	private ExecutorService service;
+	private FileIndex fileRepository;
 	
-	private FileRepository fileRepository;
+	private SimpleThreadRunner runnerForUpload;
 	
+	private boolean startUploadFinished;
+
+	private Runnable uploadFinishedNotifier;
 	
-	void upload() throws IOException {
+	public Synchronization(Supplier<HaaSFileTransfer> fileTransferSupplier, Path workingDirectory,
+			ExecutorService service, Runnable uploadFinishedNotifier ) throws IOException {
+		this.fileTransferSupplier = fileTransferSupplier;
+		this.workingDirectory = workingDirectory;
+		this.fileRepository = new FileIndex(workingDirectory);
+		this.runnerForUpload = new SimpleThreadRunner(service);
+		this.uploadFinishedNotifier = uploadFinishedNotifier;
+	}
+
+	public synchronized void startUpload() throws IOException {
+		startUploadFinished = false;
+		fileRepository.clear();
 		try(DirectoryStream<Path> ds = Files.newDirectoryStream(workingDirectory,this::isNotHidden)) {
 			for (Path file : ds) {
-				if(needsUpload(file)) {
-					toUpload.add(file);
-					service.execute(this::doRun);
-				}
+				fileRepository.needsDownload(file);
+				toUpload.add(file);
+				runnerForUpload.runIfNotRunning(this::doUpload);
 			}
+		} finally {
+			startUploadFinished = true;
+			fileRepository.storeToFile();
+			
 		}
+	
 	}
 
-	private boolean needsUpload(Path file) {
-		return fileRepository.needsDownload(file);
+	public void stopUpload() throws IOException {
+		toUpload.clear();
+		fileRepository.clear();
+	}
+
+	public void resumeUpload() {
+		fileRepository.fileUploadQueue(toUpload);
+		if(!toUpload.isEmpty()) {
+			runnerForUpload.runIfNotRunning(this::doUpload);
+		}
 	}
 
 	private boolean isNotHidden(Path file) {
@@ -51,17 +78,23 @@ public class Synchronization {
 		return !file.getFileName().toString().matches("[.][^.]+");
 	}
 	
-	private void doRun() {
+	private void doUpload(AtomicBoolean reRun) {
 		try(HaaSFileTransfer tr = fileTransferSupplier.get()) {
 			while (!toUpload.isEmpty()) {
 				Path p = toUpload.poll();
 				UploadingFile uf = createUploadingFile(p);
 				tr.upload(Arrays.asList(uf));
 				fileUploaded(p);
+				reRun.set(false);
 			}
 		} finally {
 			try {
 				fileRepository.storeToFile();
+				synchronized(this) {
+					if(startUploadFinished) {
+						uploadFinishedNotifier.run();
+					}
+				}
 			} catch (IOException e) {
 				log.error(e.getMessage(), e);
 			}
@@ -73,7 +106,6 @@ public class Synchronization {
 	}
 
 	private UploadingFile createUploadingFile(Path p) {
-		
-		return null;
+		return new UploadingFileImpl(p);
 	}
 }
diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/UploadingFileImpl.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/UploadingFileImpl.java
index 0cd50289..ac3a403a 100644
--- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/UploadingFileImpl.java
+++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/UploadingFileImpl.java
@@ -5,12 +5,17 @@ import java.io.InputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import cz.it4i.fiji.haas_java_client.HaaSClient.UploadingFile;
 
-public class UploadingFileImpl implements UploadingFile{
+public class UploadingFileImpl implements UploadingFile {
+
+	public static final Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas.data_transfer.UploadingFileImpl.class);
 
 	private final Path path;
-	
+
 	public UploadingFileImpl(Path path) {
 		this.path = path;
 	}
@@ -32,8 +37,12 @@ public class UploadingFileImpl implements UploadingFile{
 
 	@Override
 	public long getLastTime() {
-		// TODO Auto-generated method stub
-		return 0;
+		try {
+			return Files.getLastModifiedTime(path).toMillis();
+		} catch (IOException e) {
+			log.error(e.getMessage(), e);
+			return 0;
+		}
 	}
 
 }
diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java
index 1a90371e..e6f4dcfe 100644
--- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java
+++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java
@@ -74,7 +74,7 @@ public class BenchmarkJobManager {
 		}
 
 		public synchronized void startJob(Progress progress) throws IOException {
-			job.uploadFilesByName(Arrays.asList(Constants.CONFIG_YAML), progress);
+			job.uploadFile(Constants.CONFIG_YAML, progress);
 			String outputName = getOutputName(job.openLocalFile(Constants.CONFIG_YAML));
 			verifiedState = null;
 			verifiedStateProcessed = false;
@@ -109,10 +109,10 @@ public class BenchmarkJobManager {
 			verifiedStateProcessed = true;
 			return CompletableFuture.supplyAsync(() -> {
 				try {
-					verifiedState = 
-							Stream.concat(Arrays.asList(state).stream(), getTasks().stream().filter(task->!task.getDescription().equals(Constants.DONE_TASK))
+					verifiedState = Stream.concat(Arrays.asList(state).stream(),
+							getTasks().stream().filter(task -> !task.getDescription().equals(Constants.DONE_TASK))
 									.flatMap(task -> task.getComputations().stream()).map(tc -> tc.getState()))
-									.max(new JobStateComparator()).get();
+							.max(new JobStateComparator()).get();
 					if (verifiedState != JobState.Finished && verifiedState != JobState.Canceled) {
 						verifiedState = JobState.Failed;
 					}
-- 
GitLab