diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java index 64a16889bd262e6916f0b643ba5bf6d86d1365ef..1c4bd03428ad11d3a65be3c4924bd6d3a923b3a1 100644 --- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java @@ -12,6 +12,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; @@ -140,11 +141,14 @@ public class Job { } } - public void startDownload(Predicate<String> predicate) throws IOException { - setProperty(JOB_NEEDS_DOWNLOAD, true); + public CompletableFuture<?> startDownload(Predicate<String> predicate) throws IOException { Collection<String> files = getHaaSClient().getChangedFiles(jobId).stream().filter(predicate) .collect(Collectors.toList()); - synchronization.startDownload(files); + if(files.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + setProperty(JOB_NEEDS_DOWNLOAD, true); + return synchronization.startDownload(files); } public void stopDownloadData() { diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/PersistentSynchronizationProcess.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/PersistentSynchronizationProcess.java index 1811451cef1cbc4d292853c87c0d33bdc4ce4655..c7ed76b38aea3207daad0adb3bb79bd1cb0f6a94 100644 --- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/PersistentSynchronizationProcess.java +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/PersistentSynchronizationProcess.java @@ -7,6 +7,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.Queue; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -58,7 +59,7 @@ public abstract class PersistentSynchronizationProcess<T> { this.index = new PersistentIndex<>(indexFile, convertor); } - public synchronized void start() throws IOException { + public synchronized CompletableFuture<?> start() throws IOException { startFinished = false; index.clear(); try { @@ -66,7 +67,7 @@ public abstract class PersistentSynchronizationProcess<T> { index.insert(item); toProcessQueue.add(item); } - runner.runIfNotRunning(this::doProcess); + return runner.runIfNotRunning(this::doProcess); } finally { startFinished = true; index.storeToWorkingFile(); @@ -115,7 +116,13 @@ public abstract class PersistentSynchronizationProcess<T> { } this.notifier.itemDone(INIT_TRANSFER_ITEM); this.notifier.done(); - while (!interrupted && !toProcessQueue.isEmpty()) { + do { + synchronized (reRun) { + if(interrupted || toProcessQueue.isEmpty()) { + reRun.set(false); + break; + } + } T p = toProcessQueue.poll(); String item = p.toString(); notifier.addItem(item); @@ -127,8 +134,7 @@ public abstract class PersistentSynchronizationProcess<T> { interrupted = true; } notifier.itemDone(item); - reRun.set(false); - } + } while(true); } finally { runningTransferThreads.remove(Thread.currentThread()); synchronized (this) { diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/SimpleThreadRunner.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/SimpleThreadRunner.java index 3b1dea3f3acaecf0552551f1d4722d178b08d2ef..5a5ea2684913207a096bf6d7cc3a740a70ac75a6 100644 --- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/SimpleThreadRunner.java +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/SimpleThreadRunner.java @@ -1,5 +1,6 @@ package cz.it4i.fiji.haas.data_transfer; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @@ -7,22 +8,23 @@ import java.util.function.Consumer; public class SimpleThreadRunner { private final ExecutorService service; private final AtomicBoolean reRun = new AtomicBoolean(false); + private CompletableFuture<?> lastRun; public SimpleThreadRunner(ExecutorService service) { this.service = service; } - public void runIfNotRunning(Consumer<AtomicBoolean> r) { - synchronized (this) { + synchronized public CompletableFuture<?> runIfNotRunning(Consumer<AtomicBoolean> r) { + synchronized (reRun) { if (reRun.get()) { - return; + return lastRun; } reRun.set(true); } - service.execute(() -> { + return lastRun = CompletableFuture.runAsync(() -> { do { r.accept(reRun); } while (reRun.get()); - }); + }, service); } } diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java index f2f2181efb8e24e8db9ebd754ab58384fd78b9bc..cac8b78db0c9000f0fb8a24fa5758f8171acf2f1 100644 --- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java @@ -10,6 +10,7 @@ import java.nio.file.Paths; import java.util.Collection; import java.util.Collections; import java.util.LinkedList; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Predicate; @@ -86,9 +87,9 @@ public class Synchronization implements Closeable { uploadProcess.resume(); } - public synchronized void startDownload(Collection<String> files) throws IOException { + public synchronized CompletableFuture<?> startDownload(Collection<String> files) throws IOException { this.downloadProcess.setItems(files); - this.downloadProcess.start(); + return this.downloadProcess.start(); } public synchronized void stopDownload() throws IOException { @@ -117,7 +118,7 @@ public class Synchronization implements Closeable { workingDirectory.resolve(FILE_INDEX_TO_UPLOAD_FILENAME), name -> inputDirectory.resolve(name)) { @Override - protected Iterable<Path> getItems() throws IOException { + protected Collection<Path> getItems() throws IOException { try (DirectoryStream<Path> ds = Files.newDirectoryStream(inputDirectory, Synchronization.this::canUpload)) { return StreamSupport.stream(ds.spliterator(), false).collect(Collectors.toList()); @@ -166,7 +167,7 @@ public class Synchronization implements Closeable { } @Override - protected synchronized Iterable<String> getItems() throws IOException { + protected synchronized Collection<String> getItems() throws IOException { return items; } diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java index 2265e5122febbcee85cf79ffdb7a2b9627c4f937..6baaf057d5bce57d2aa916cc8d1ec772a5b4fa21 100644 --- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java +++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java @@ -13,6 +13,7 @@ import java.io.Closeable; import java.io.FileNotFoundException; import java.io.FileWriter; import java.io.IOException; +import java.io.InputStream; import java.nio.file.Files; import java.nio.file.InvalidPathException; import java.nio.file.Path; @@ -26,6 +27,7 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Scanner; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.function.Function; @@ -33,8 +35,20 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; +import javax.xml.xpath.XPath; +import javax.xml.xpath.XPathConstants; +import javax.xml.xpath.XPathExpressionException; +import javax.xml.xpath.XPathFactory; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.w3c.dom.Document; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; +import org.xml.sax.SAXException; import cz.it4i.fiji.haas.HaaSOutputHolder; import cz.it4i.fiji.haas.HaaSOutputHolderImpl; @@ -69,6 +83,7 @@ public class BenchmarkJobManager implements Closeable { private JobState verifiedState; private boolean verifiedStateProcessed; private CompletableFuture<JobState> running; + private ProgressNotifier downloadNotifier; public BenchmarkJob(Job job) { this.job = job; @@ -78,7 +93,7 @@ public class BenchmarkJobManager implements Closeable { } public void setDownloadNotifier(Progress progress) { - job.setDownloadNotifier(convertTo(progress)); + job.setDownloadNotifier(downloadNotifier = convertTo(progress)); } public void setUploadNotifier(Progress progress) { @@ -86,7 +101,7 @@ public class BenchmarkJobManager implements Closeable { } public synchronized void startJob(Progress progress) throws IOException { - job.uploadFile(Constants.CONFIG_YAML, new P_ProgressNotifierAdapter(progress)); + job.uploadFile(Constants.CONFIG_YAML, new ProgressNotifierAdapter(progress)); LoadedYAML yaml = new LoadedYAML(job.openLocalFile(Constants.CONFIG_YAML)); verifiedState = null; @@ -120,12 +135,15 @@ public class BenchmarkJobManager implements Closeable { return result; } - public void startDownload() throws IOException { + public CompletableFuture<?> startDownload() throws IOException { if (job.getState() == JobState.Finished) { - String filePattern = job.getProperty(SPIM_OUTPUT_FILENAME_PATTERN); - job.startDownload(downloadFinishedData(filePattern)); + CompletableFuture<?> result = new CompletableFuture<Void>(); + startDownloadResults(result); + return result; } else if (job.getState() == JobState.Failed || job.getState() == JobState.Canceled) { - job.startDownload(downloadFailedData()); + return job.startDownload(downloadFailedData()); + } else { + return CompletableFuture.completedFuture(null); } } @@ -134,7 +152,7 @@ public class BenchmarkJobManager implements Closeable { } public void downloadStatistics(Progress progress) throws IOException { - job.download(BenchmarkJobManager.downloadStatistics(), new P_ProgressNotifierAdapter(progress)); + job.download(BenchmarkJobManager.downloadStatistics(), new ProgressNotifierAdapter(progress)); Path resultFile = job.getDirectory().resolve(BENCHMARK_RESULT_FILE); if (resultFile != null) BenchmarkJobManager.formatResultFile(resultFile); @@ -275,7 +293,7 @@ public class BenchmarkJobManager implements Closeable { } private ProgressNotifier convertTo(Progress progress) { - return progress == null ? null : new P_ProgressNotifierAdapter(progress); + return progress == null ? null : new ProgressNotifierAdapter(progress); } private synchronized CompletableFuture<JobState> doGetStateAsync(Executor executor) { @@ -396,6 +414,54 @@ public class BenchmarkJobManager implements Closeable { } } + private void startDownloadResults(CompletableFuture<?> result) throws IOException { + String mainFile = job.getProperty(SPIM_OUTPUT_FILENAME_PATTERN) + ".xml"; + final ProgressNotifierTemporarySwitchOff notifierSwitch = new ProgressNotifierTemporarySwitchOff(downloadNotifier, job); + + job.startDownload(downloadFileNameExtractDecorator(fileName->fileName.equals(mainFile))) + .whenComplete((X,e1)-> { + notifierSwitch.switchOn(); + if(e1 == null) { + Set<String> otherFiles = extractNames(getOutputDirectory().resolve(mainFile)); + try { + job.startDownload(downloadFileNameExtractDecorator(name -> otherFiles.contains(name))) + .whenComplete((X2,e2) -> { + result.complete(null); + if(e2 != null) { + log.error(e2.getMessage(), e2); + } + }); + } catch (IOException e) { + e1 = e; + } + } + if(e1 != null){ + log.error(e1.getMessage(), e1); + result.complete(null); + } + }); + } + + private Set<String> extractNames(Path resolve) { + Set<String> result = new HashSet<>(); + try(InputStream fileIS = Files.newInputStream(resolve)) { + DocumentBuilderFactory builderFactory = DocumentBuilderFactory.newInstance(); + DocumentBuilder builder = builderFactory.newDocumentBuilder(); + Document xmlDocument = builder.parse(fileIS); + XPath xPath = XPathFactory.newInstance().newXPath(); + Node imageLoader = ((NodeList) xPath.evaluate("/SpimData/SequenceDescription/ImageLoader", xmlDocument, XPathConstants.NODESET)).item(0); + Node hdf5 = ((NodeList) xPath.evaluate("hdf5", imageLoader, XPathConstants.NODESET)).item(0); + result.add(hdf5.getTextContent()); + NodeList nl = (NodeList) xPath.evaluate("partition/path", imageLoader, XPathConstants.NODESET); + for(int i = 0; i < nl.getLength(); i++) { + result.add(nl.item(i).getTextContent()); + } + } catch (IOException | ParserConfigurationException | SAXException | XPathExpressionException e) { + log.error(e.getMessage(), e); + } + return result; + } + private void processOutput() { final String OUTPUT_PARSING_RULE = "rule "; @@ -599,17 +665,14 @@ public class BenchmarkJobManager implements Closeable { return new BenchmarkJob(job); } - - - private static Predicate<String> downloadFinishedData(String filePattern) { + private static Predicate<String> downloadFileNameExtractDecorator(Predicate<String> decorated) { return name -> { Path path = getPathSafely(name); if (path == null) return false; String fileName = path.getFileName().toString(); - return fileName.startsWith(filePattern) && fileName.endsWith("h5") || fileName.equals(filePattern + ".xml") - || fileName.equals(Constants.BENCHMARK_RESULT_FILE); + return decorated.test(fileName); }; } @@ -698,43 +761,4 @@ public class BenchmarkJobManager implements Closeable { }; } - private class P_ProgressNotifierAdapter implements ProgressNotifier { - private final Progress progress; - - public P_ProgressNotifierAdapter(Progress progress) { - this.progress = progress; - } - - @Override - public void setTitle(String title) { - progress.setTitle(title); - } - - @Override - public void setCount(int count, int total) { - progress.setCount(count, total); - } - - @Override - public void addItem(Object item) { - progress.addItem(item); - } - - @Override - public void setItemCount(int count, int total) { - progress.setItemCount(count, total); - } - - @Override - public void itemDone(Object item) { - progress.itemDone(item); - } - - @Override - public void done() { - progress.done(); - } - - } - } diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/ProgressNotifierAdapter.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/ProgressNotifierAdapter.java new file mode 100644 index 0000000000000000000000000000000000000000..ceeba8b9d5a9a72bbfcecdc93b69ae122cc95e06 --- /dev/null +++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/ProgressNotifierAdapter.java @@ -0,0 +1,43 @@ +package cz.it4i.fiji.haas_spim_benchmark.core; + +import cz.it4i.fiji.haas_java_client.ProgressNotifier; +import net.imagej.updater.util.Progress; + +class ProgressNotifierAdapter implements ProgressNotifier { + private final Progress progress; + + public ProgressNotifierAdapter(Progress progress) { + this.progress = progress; + } + + @Override + public void setTitle(String title) { + progress.setTitle(title); + } + + @Override + public void setCount(int count, int total) { + progress.setCount(count, total); + } + + @Override + public void addItem(Object item) { + progress.addItem(item); + } + + @Override + public void setItemCount(int count, int total) { + progress.setItemCount(count, total); + } + + @Override + public void itemDone(Object item) { + progress.itemDone(item); + } + + @Override + public void done() { + progress.done(); + } + +} \ No newline at end of file diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/ProgressNotifierTemporarySwitchOff.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/ProgressNotifierTemporarySwitchOff.java new file mode 100644 index 0000000000000000000000000000000000000000..a1a1d275064bb4a169fc7906490a8387acd7e18f --- /dev/null +++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/ProgressNotifierTemporarySwitchOff.java @@ -0,0 +1,26 @@ +package cz.it4i.fiji.haas_spim_benchmark.core; + +import cz.it4i.fiji.haas.Job; +import cz.it4i.fiji.haas.ui.DummyProgress; +import cz.it4i.fiji.haas_java_client.ProgressNotifier; + +public class ProgressNotifierTemporarySwitchOff { + + private final ProgressNotifier notifier; + private final Job job; + + public ProgressNotifierTemporarySwitchOff(ProgressNotifier downloadNotifier, Job job) { + this.notifier = downloadNotifier; + this.job = job; + if(this.notifier != null) { + job.setDownloadNotifier(new ProgressNotifierAdapter(new DummyProgress())); + } + } + + public void switchOn() { + if(notifier != null) { + this.job.setDownloadNotifier(notifier); + } + } + +}