Skip to content
Snippets Groups Projects
Commit 7d306eff authored by Jan Kožusznik's avatar Jan Kožusznik
Browse files

feature: download result files base on content of xml file

parent 7dbf4432
No related branches found
No related tags found
No related merge requests found
...@@ -12,6 +12,7 @@ import java.util.Collection; ...@@ -12,6 +12,7 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.function.Supplier; import java.util.function.Supplier;
...@@ -140,11 +141,14 @@ public class Job { ...@@ -140,11 +141,14 @@ public class Job {
} }
} }
public void startDownload(Predicate<String> predicate) throws IOException { public CompletableFuture<?> startDownload(Predicate<String> predicate) throws IOException {
setProperty(JOB_NEEDS_DOWNLOAD, true);
Collection<String> files = getHaaSClient().getChangedFiles(jobId).stream().filter(predicate) Collection<String> files = getHaaSClient().getChangedFiles(jobId).stream().filter(predicate)
.collect(Collectors.toList()); .collect(Collectors.toList());
synchronization.startDownload(files); if(files.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
setProperty(JOB_NEEDS_DOWNLOAD, true);
return synchronization.startDownload(files);
} }
public void stopDownloadData() { public void stopDownloadData() {
......
...@@ -7,6 +7,7 @@ import java.util.Collections; ...@@ -7,6 +7,7 @@ import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Queue; import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
...@@ -58,7 +59,7 @@ public abstract class PersistentSynchronizationProcess<T> { ...@@ -58,7 +59,7 @@ public abstract class PersistentSynchronizationProcess<T> {
this.index = new PersistentIndex<>(indexFile, convertor); this.index = new PersistentIndex<>(indexFile, convertor);
} }
public synchronized void start() throws IOException { public synchronized CompletableFuture<?> start() throws IOException {
startFinished = false; startFinished = false;
index.clear(); index.clear();
try { try {
...@@ -66,7 +67,7 @@ public abstract class PersistentSynchronizationProcess<T> { ...@@ -66,7 +67,7 @@ public abstract class PersistentSynchronizationProcess<T> {
index.insert(item); index.insert(item);
toProcessQueue.add(item); toProcessQueue.add(item);
} }
runner.runIfNotRunning(this::doProcess); return runner.runIfNotRunning(this::doProcess);
} finally { } finally {
startFinished = true; startFinished = true;
index.storeToWorkingFile(); index.storeToWorkingFile();
...@@ -115,7 +116,13 @@ public abstract class PersistentSynchronizationProcess<T> { ...@@ -115,7 +116,13 @@ public abstract class PersistentSynchronizationProcess<T> {
} }
this.notifier.itemDone(INIT_TRANSFER_ITEM); this.notifier.itemDone(INIT_TRANSFER_ITEM);
this.notifier.done(); this.notifier.done();
while (!interrupted && !toProcessQueue.isEmpty()) { do {
synchronized (reRun) {
if(interrupted || toProcessQueue.isEmpty()) {
reRun.set(false);
break;
}
}
T p = toProcessQueue.poll(); T p = toProcessQueue.poll();
String item = p.toString(); String item = p.toString();
notifier.addItem(item); notifier.addItem(item);
...@@ -127,8 +134,7 @@ public abstract class PersistentSynchronizationProcess<T> { ...@@ -127,8 +134,7 @@ public abstract class PersistentSynchronizationProcess<T> {
interrupted = true; interrupted = true;
} }
notifier.itemDone(item); notifier.itemDone(item);
reRun.set(false); } while(true);
}
} finally { } finally {
runningTransferThreads.remove(Thread.currentThread()); runningTransferThreads.remove(Thread.currentThread());
synchronized (this) { synchronized (this) {
......
package cz.it4i.fiji.haas.data_transfer; package cz.it4i.fiji.haas.data_transfer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer; import java.util.function.Consumer;
...@@ -7,22 +8,23 @@ import java.util.function.Consumer; ...@@ -7,22 +8,23 @@ import java.util.function.Consumer;
public class SimpleThreadRunner { public class SimpleThreadRunner {
private final ExecutorService service; private final ExecutorService service;
private final AtomicBoolean reRun = new AtomicBoolean(false); private final AtomicBoolean reRun = new AtomicBoolean(false);
private CompletableFuture<?> lastRun;
public SimpleThreadRunner(ExecutorService service) { public SimpleThreadRunner(ExecutorService service) {
this.service = service; this.service = service;
} }
public void runIfNotRunning(Consumer<AtomicBoolean> r) { synchronized public CompletableFuture<?> runIfNotRunning(Consumer<AtomicBoolean> r) {
synchronized (this) { synchronized (reRun) {
if (reRun.get()) { if (reRun.get()) {
return; return lastRun;
} }
reRun.set(true); reRun.set(true);
} }
service.execute(() -> { return lastRun = CompletableFuture.runAsync(() -> {
do { do {
r.accept(reRun); r.accept(reRun);
} while (reRun.get()); } while (reRun.get());
}); }, service);
} }
} }
...@@ -10,6 +10,7 @@ import java.nio.file.Paths; ...@@ -10,6 +10,7 @@ import java.nio.file.Paths;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.function.Predicate; import java.util.function.Predicate;
...@@ -86,9 +87,9 @@ public class Synchronization implements Closeable { ...@@ -86,9 +87,9 @@ public class Synchronization implements Closeable {
uploadProcess.resume(); uploadProcess.resume();
} }
public synchronized void startDownload(Collection<String> files) throws IOException { public synchronized CompletableFuture<?> startDownload(Collection<String> files) throws IOException {
this.downloadProcess.setItems(files); this.downloadProcess.setItems(files);
this.downloadProcess.start(); return this.downloadProcess.start();
} }
public synchronized void stopDownload() throws IOException { public synchronized void stopDownload() throws IOException {
...@@ -117,7 +118,7 @@ public class Synchronization implements Closeable { ...@@ -117,7 +118,7 @@ public class Synchronization implements Closeable {
workingDirectory.resolve(FILE_INDEX_TO_UPLOAD_FILENAME), name -> inputDirectory.resolve(name)) { workingDirectory.resolve(FILE_INDEX_TO_UPLOAD_FILENAME), name -> inputDirectory.resolve(name)) {
@Override @Override
protected Iterable<Path> getItems() throws IOException { protected Collection<Path> getItems() throws IOException {
try (DirectoryStream<Path> ds = Files.newDirectoryStream(inputDirectory, try (DirectoryStream<Path> ds = Files.newDirectoryStream(inputDirectory,
Synchronization.this::canUpload)) { Synchronization.this::canUpload)) {
return StreamSupport.stream(ds.spliterator(), false).collect(Collectors.toList()); return StreamSupport.stream(ds.spliterator(), false).collect(Collectors.toList());
...@@ -166,7 +167,7 @@ public class Synchronization implements Closeable { ...@@ -166,7 +167,7 @@ public class Synchronization implements Closeable {
} }
@Override @Override
protected synchronized Iterable<String> getItems() throws IOException { protected synchronized Collection<String> getItems() throws IOException {
return items; return items;
} }
......
...@@ -13,6 +13,7 @@ import java.io.Closeable; ...@@ -13,6 +13,7 @@ import java.io.Closeable;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.FileWriter; import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.InvalidPathException; import java.nio.file.InvalidPathException;
import java.nio.file.Path; import java.nio.file.Path;
...@@ -26,6 +27,7 @@ import java.util.HashSet; ...@@ -26,6 +27,7 @@ import java.util.HashSet;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Scanner; import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.function.Function; import java.util.function.Function;
...@@ -33,8 +35,20 @@ import java.util.function.Predicate; ...@@ -33,8 +35,20 @@ import java.util.function.Predicate;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;
import cz.it4i.fiji.haas.HaaSOutputHolder; import cz.it4i.fiji.haas.HaaSOutputHolder;
import cz.it4i.fiji.haas.HaaSOutputHolderImpl; import cz.it4i.fiji.haas.HaaSOutputHolderImpl;
...@@ -69,6 +83,7 @@ public class BenchmarkJobManager implements Closeable { ...@@ -69,6 +83,7 @@ public class BenchmarkJobManager implements Closeable {
private JobState verifiedState; private JobState verifiedState;
private boolean verifiedStateProcessed; private boolean verifiedStateProcessed;
private CompletableFuture<JobState> running; private CompletableFuture<JobState> running;
private ProgressNotifier downloadNotifier;
public BenchmarkJob(Job job) { public BenchmarkJob(Job job) {
this.job = job; this.job = job;
...@@ -78,7 +93,7 @@ public class BenchmarkJobManager implements Closeable { ...@@ -78,7 +93,7 @@ public class BenchmarkJobManager implements Closeable {
} }
public void setDownloadNotifier(Progress progress) { public void setDownloadNotifier(Progress progress) {
job.setDownloadNotifier(convertTo(progress)); job.setDownloadNotifier(downloadNotifier = convertTo(progress));
} }
public void setUploadNotifier(Progress progress) { public void setUploadNotifier(Progress progress) {
...@@ -86,7 +101,7 @@ public class BenchmarkJobManager implements Closeable { ...@@ -86,7 +101,7 @@ public class BenchmarkJobManager implements Closeable {
} }
public synchronized void startJob(Progress progress) throws IOException { public synchronized void startJob(Progress progress) throws IOException {
job.uploadFile(Constants.CONFIG_YAML, new P_ProgressNotifierAdapter(progress)); job.uploadFile(Constants.CONFIG_YAML, new ProgressNotifierAdapter(progress));
LoadedYAML yaml = new LoadedYAML(job.openLocalFile(Constants.CONFIG_YAML)); LoadedYAML yaml = new LoadedYAML(job.openLocalFile(Constants.CONFIG_YAML));
verifiedState = null; verifiedState = null;
...@@ -120,12 +135,15 @@ public class BenchmarkJobManager implements Closeable { ...@@ -120,12 +135,15 @@ public class BenchmarkJobManager implements Closeable {
return result; return result;
} }
public void startDownload() throws IOException { public CompletableFuture<?> startDownload() throws IOException {
if (job.getState() == JobState.Finished) { if (job.getState() == JobState.Finished) {
String filePattern = job.getProperty(SPIM_OUTPUT_FILENAME_PATTERN); CompletableFuture<?> result = new CompletableFuture<Void>();
job.startDownload(downloadFinishedData(filePattern)); startDownloadResults(result);
return result;
} else if (job.getState() == JobState.Failed || job.getState() == JobState.Canceled) { } else if (job.getState() == JobState.Failed || job.getState() == JobState.Canceled) {
job.startDownload(downloadFailedData()); return job.startDownload(downloadFailedData());
} else {
return CompletableFuture.completedFuture(null);
} }
} }
...@@ -134,7 +152,7 @@ public class BenchmarkJobManager implements Closeable { ...@@ -134,7 +152,7 @@ public class BenchmarkJobManager implements Closeable {
} }
public void downloadStatistics(Progress progress) throws IOException { public void downloadStatistics(Progress progress) throws IOException {
job.download(BenchmarkJobManager.downloadStatistics(), new P_ProgressNotifierAdapter(progress)); job.download(BenchmarkJobManager.downloadStatistics(), new ProgressNotifierAdapter(progress));
Path resultFile = job.getDirectory().resolve(BENCHMARK_RESULT_FILE); Path resultFile = job.getDirectory().resolve(BENCHMARK_RESULT_FILE);
if (resultFile != null) if (resultFile != null)
BenchmarkJobManager.formatResultFile(resultFile); BenchmarkJobManager.formatResultFile(resultFile);
...@@ -275,7 +293,7 @@ public class BenchmarkJobManager implements Closeable { ...@@ -275,7 +293,7 @@ public class BenchmarkJobManager implements Closeable {
} }
private ProgressNotifier convertTo(Progress progress) { private ProgressNotifier convertTo(Progress progress) {
return progress == null ? null : new P_ProgressNotifierAdapter(progress); return progress == null ? null : new ProgressNotifierAdapter(progress);
} }
private synchronized CompletableFuture<JobState> doGetStateAsync(Executor executor) { private synchronized CompletableFuture<JobState> doGetStateAsync(Executor executor) {
...@@ -396,6 +414,54 @@ public class BenchmarkJobManager implements Closeable { ...@@ -396,6 +414,54 @@ public class BenchmarkJobManager implements Closeable {
} }
} }
private void startDownloadResults(CompletableFuture<?> result) throws IOException {
String mainFile = job.getProperty(SPIM_OUTPUT_FILENAME_PATTERN) + ".xml";
final ProgressNotifierTemporarySwitchOff notifierSwitch = new ProgressNotifierTemporarySwitchOff(downloadNotifier, job);
job.startDownload(downloadFileNameExtractDecorator(fileName->fileName.equals(mainFile)))
.whenComplete((X,e1)-> {
notifierSwitch.switchOn();
if(e1 == null) {
Set<String> otherFiles = extractNames(getOutputDirectory().resolve(mainFile));
try {
job.startDownload(downloadFileNameExtractDecorator(name -> otherFiles.contains(name)))
.whenComplete((X2,e2) -> {
result.complete(null);
if(e2 != null) {
log.error(e2.getMessage(), e2);
}
});
} catch (IOException e) {
e1 = e;
}
}
if(e1 != null){
log.error(e1.getMessage(), e1);
result.complete(null);
}
});
}
private Set<String> extractNames(Path resolve) {
Set<String> result = new HashSet<>();
try(InputStream fileIS = Files.newInputStream(resolve)) {
DocumentBuilderFactory builderFactory = DocumentBuilderFactory.newInstance();
DocumentBuilder builder = builderFactory.newDocumentBuilder();
Document xmlDocument = builder.parse(fileIS);
XPath xPath = XPathFactory.newInstance().newXPath();
Node imageLoader = ((NodeList) xPath.evaluate("/SpimData/SequenceDescription/ImageLoader", xmlDocument, XPathConstants.NODESET)).item(0);
Node hdf5 = ((NodeList) xPath.evaluate("hdf5", imageLoader, XPathConstants.NODESET)).item(0);
result.add(hdf5.getTextContent());
NodeList nl = (NodeList) xPath.evaluate("partition/path", imageLoader, XPathConstants.NODESET);
for(int i = 0; i < nl.getLength(); i++) {
result.add(nl.item(i).getTextContent());
}
} catch (IOException | ParserConfigurationException | SAXException | XPathExpressionException e) {
log.error(e.getMessage(), e);
}
return result;
}
private void processOutput() { private void processOutput() {
final String OUTPUT_PARSING_RULE = "rule "; final String OUTPUT_PARSING_RULE = "rule ";
...@@ -599,17 +665,14 @@ public class BenchmarkJobManager implements Closeable { ...@@ -599,17 +665,14 @@ public class BenchmarkJobManager implements Closeable {
return new BenchmarkJob(job); return new BenchmarkJob(job);
} }
private static Predicate<String> downloadFileNameExtractDecorator(Predicate<String> decorated) {
private static Predicate<String> downloadFinishedData(String filePattern) {
return name -> { return name -> {
Path path = getPathSafely(name); Path path = getPathSafely(name);
if (path == null) if (path == null)
return false; return false;
String fileName = path.getFileName().toString(); String fileName = path.getFileName().toString();
return fileName.startsWith(filePattern) && fileName.endsWith("h5") || fileName.equals(filePattern + ".xml") return decorated.test(fileName);
|| fileName.equals(Constants.BENCHMARK_RESULT_FILE);
}; };
} }
...@@ -698,43 +761,4 @@ public class BenchmarkJobManager implements Closeable { ...@@ -698,43 +761,4 @@ public class BenchmarkJobManager implements Closeable {
}; };
} }
private class P_ProgressNotifierAdapter implements ProgressNotifier {
private final Progress progress;
public P_ProgressNotifierAdapter(Progress progress) {
this.progress = progress;
}
@Override
public void setTitle(String title) {
progress.setTitle(title);
}
@Override
public void setCount(int count, int total) {
progress.setCount(count, total);
}
@Override
public void addItem(Object item) {
progress.addItem(item);
}
@Override
public void setItemCount(int count, int total) {
progress.setItemCount(count, total);
}
@Override
public void itemDone(Object item) {
progress.itemDone(item);
}
@Override
public void done() {
progress.done();
}
}
} }
package cz.it4i.fiji.haas_spim_benchmark.core;
import cz.it4i.fiji.haas_java_client.ProgressNotifier;
import net.imagej.updater.util.Progress;
class ProgressNotifierAdapter implements ProgressNotifier {
private final Progress progress;
public ProgressNotifierAdapter(Progress progress) {
this.progress = progress;
}
@Override
public void setTitle(String title) {
progress.setTitle(title);
}
@Override
public void setCount(int count, int total) {
progress.setCount(count, total);
}
@Override
public void addItem(Object item) {
progress.addItem(item);
}
@Override
public void setItemCount(int count, int total) {
progress.setItemCount(count, total);
}
@Override
public void itemDone(Object item) {
progress.itemDone(item);
}
@Override
public void done() {
progress.done();
}
}
\ No newline at end of file
package cz.it4i.fiji.haas_spim_benchmark.core;
import cz.it4i.fiji.haas.Job;
import cz.it4i.fiji.haas.ui.DummyProgress;
import cz.it4i.fiji.haas_java_client.ProgressNotifier;
public class ProgressNotifierTemporarySwitchOff {
private final ProgressNotifier notifier;
private final Job job;
public ProgressNotifierTemporarySwitchOff(ProgressNotifier downloadNotifier, Job job) {
this.notifier = downloadNotifier;
this.job = job;
if(this.notifier != null) {
job.setDownloadNotifier(new ProgressNotifierAdapter(new DummyProgress()));
}
}
public void switchOn() {
if(notifier != null) {
this.job.setDownloadNotifier(notifier);
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment