From 353e10fc25a28206220f1fe594ddbffef2f4d911 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Ko=C5=BEusznik?= <jan@kozusznik.cz> Date: Thu, 26 Apr 2018 17:22:33 +0200 Subject: [PATCH] feat: limit download after job submit --- .../src/main/java/cz/it4i/fiji/haas/Job.java | 49 +++++++++++++++++-- .../haas/data_transfer/Synchronization.java | 6 +-- .../core/BenchmarkJobManager.java | 36 +++++--------- .../ui/BenchmarkSPIMController.java | 4 +- .../java/cz/it4i/fiji/haas/RunBenchmark.java | 2 +- 5 files changed, 61 insertions(+), 36 deletions(-) diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java index 1135b367..99f09004 100644 --- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java @@ -29,7 +29,11 @@ import cz.it4i.fiji.haas_java_client.JobState; import cz.it4i.fiji.haas_java_client.ProgressNotifier; import cz.it4i.fiji.haas_java_client.TransferFileProgressForHaaSClient; import net.imagej.updater.util.Progress; - +/*** + * TASK - napojit na UI + * @author koz01 + * + */ public class Job { private static final String JOB_NAME = "job.name"; @@ -38,6 +42,10 @@ public class Job { private static final String JOB_INFO_FILENAME = ".jobinfo"; + private static final String JOB_NEEDS_DOWNLOAD = "job.needs_download"; + + private static final String JOB_CAN_BE_DOWNLOADED = "job.needs_download"; + public static boolean isJobPath(Path p) { return isValidPath(p); } @@ -73,8 +81,10 @@ public class Job { setJobDirectory(jobDirectory); propertyHolder = new PropertyHolder(jobDir.resolve(JOB_INFO_FILENAME)); resumeUpload(); + resumeDownload(); } + private Job(JobManager4Job jobManager, Supplier<HaaSClient> haasClientSupplier) { this.haasClientSupplier = haasClientSupplier; this.jobManager = jobManager; @@ -101,15 +111,26 @@ public class Job { } public void startDownload(Predicate<String> predicate, Progress notifier) throws IOException { + setProperty(JOB_NEEDS_DOWNLOAD, true); Collection<String> files = getHaaSClient().getChangedFiles(jobId).stream().filter(predicate) .collect(Collectors.toList()); synchronization.startDownload(files); } - - public boolean isUploading() { - return Boolean.parseBoolean(getProperty(JOB_NEEDS_UPLOAD)); + + public void stopDownloadData() { + setProperty(JOB_NEEDS_DOWNLOAD, false); + try { + this.synchronization.stopUpload(); + } catch (IOException e) { + log.error(e.getMessage(), e); + throw new RuntimeException(e); + } } - + + public boolean canBeDownload() { + return Boolean.parseBoolean(getProperty(JOB_CAN_BE_DOWNLOADED)); + } + public void uploadFile(String file, ProgressNotifier notifier) { uploadFiles(Arrays.asList(file), notifier); } @@ -144,8 +165,12 @@ public class Job { public void submit() { HaaSClient client = getHaaSClient(); client.submitJob(jobId); + stopDownloadData(); + setCanBeDownloaded(true); } + + synchronized public long getId() { if (jobId == null) { jobId = getJobId(jobDir); @@ -275,6 +300,9 @@ public class Job { () -> haasClientSupplier.get().startFileTransfer(getId(), HaaSClient.DUMMY_TRANSFER_FILE_PROGRESS), jobDir, Executors.newFixedThreadPool(2), () -> { setProperty(JOB_NEEDS_UPLOAD, false); + }, () -> { + setProperty(JOB_NEEDS_DOWNLOAD, false); + setCanBeDownloaded(false); }); } catch (IOException e) { log.error(e.getMessage(), e); @@ -287,6 +315,13 @@ public class Job { synchronization.resumeUpload(); } } + + private synchronized void resumeDownload() { + if (Boolean.parseBoolean(getProperty(JOB_NEEDS_DOWNLOAD))) { + synchronization.resumeDownload(); + } + } + private void setName(String name) { setProperty(JOB_NAME, name); @@ -320,5 +355,9 @@ public class Job { private static long getJobId(Path path) { return Long.parseLong(path.getFileName().toString()); } + + private void setCanBeDownloaded(boolean b) { + setProperty(JOB_CAN_BE_DOWNLOADED, b); + } } diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java index dd0de6ac..7f9ae99d 100644 --- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java @@ -43,13 +43,13 @@ public class Synchronization { public Synchronization(Supplier<HaaSFileTransfer> fileTransferSupplier, Path workingDirectory, - ExecutorService service, Runnable uploadFinishedNotifier) throws IOException { + ExecutorService service, Runnable uploadFinishedNotifier, Runnable downloadFinishedNotifier) throws IOException { this.workingDirectory = workingDirectory; this.filesDownloaded = new PersistentIndex<>(workingDirectory.resolve(FILE_INDEX_DOWNLOADED_FILENAME), pathResolver); this.uploadProcess = createUploadProcess(fileTransferSupplier, service, uploadFinishedNotifier); - this.downloadProcess = createDownloadProcess(fileTransferSupplier, service, uploadFinishedNotifier); + this.downloadProcess = createDownloadProcess(fileTransferSupplier, service, downloadFinishedNotifier); } public synchronized void startUpload() throws IOException { @@ -130,13 +130,13 @@ public class Synchronization { @Override protected void processItem(HaaSFileTransfer tr, String file) { - tr.download(file, workingDirectory); filesDownloaded.insert(workingDirectory.resolve(file)); try { filesDownloaded.storeToFile(); } catch (IOException e) { log.error(e.getMessage(), e); } + tr.download(file, workingDirectory); } } diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java index 2dd3c31e..2741aa41 100644 --- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java +++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java @@ -49,8 +49,7 @@ import net.imagej.updater.util.Progress; public class BenchmarkJobManager { - private static final String JOB_HAS_DATA_TO_DOWNLOAD_PROPERTY = "job.needDownload"; - + private static Logger log = LoggerFactory .getLogger(cz.it4i.fiji.haas_spim_benchmark.core.BenchmarkJobManager.class); @@ -82,7 +81,7 @@ public class BenchmarkJobManager { running = null; job.submit(); job.setProperty(SPIM_OUTPUT_FILENAME_PATTERN, outputName); - setDownloaded(false); + } public JobState getState() { @@ -97,10 +96,7 @@ public class BenchmarkJobManager { job.stopUploadData(); } - public boolean isUploading() { - return job.isUploading(); - } - + public synchronized CompletableFuture<JobState> getStateAsync(Executor executor) { if (running != null) { return running; @@ -112,15 +108,17 @@ public class BenchmarkJobManager { return result; } - public void downloadData(Progress progress) throws IOException { + public void startDownload(Progress progress) throws IOException { if (job.getState() == JobState.Finished) { String filePattern = job.getProperty(SPIM_OUTPUT_FILENAME_PATTERN); - job.download(downloadFinishedData(filePattern), new P_ProgressNotifierAdapter(progress)); + job.startDownload(downloadFinishedData(filePattern) , progress); } else if (job.getState() == JobState.Failed || job.getState() == JobState.Canceled) { - job.download(downloadFailedData(), new P_ProgressNotifierAdapter(progress)); + job.startDownload(downloadFailedData(), progress); } - - setDownloaded(true); + } + + public boolean canBeDownloaded() { + return job.canBeDownload(); } public void downloadStatistics(Progress progress) throws IOException { @@ -159,10 +157,6 @@ public class BenchmarkJobManager { return false; } - public boolean downloaded() { - return getDownloaded(); - } - public void update() { job.updateInfo(); } @@ -407,15 +401,7 @@ public class BenchmarkJobManager { return Stream.concat(nonTaskSpecificErrors.stream(), taskSpecificErrors).collect(Collectors.toList()); } - private void setDownloaded(boolean b) { - job.setProperty(JOB_HAS_DATA_TO_DOWNLOAD_PROPERTY, b + ""); - } - - private boolean getDownloaded() { - String downloadedStr = job.getProperty(JOB_HAS_DATA_TO_DOWNLOAD_PROPERTY); - return downloadedStr != null && Boolean.parseBoolean(downloadedStr); - } - + } public BenchmarkJobManager(BenchmarkSPIMParameters params) throws IOException { diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/BenchmarkSPIMController.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/BenchmarkSPIMController.java index a8dc9929..0423b261 100644 --- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/BenchmarkSPIMController.java +++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/BenchmarkSPIMController.java @@ -112,10 +112,10 @@ public class BenchmarkSPIMController extends BorderPane implements CloseableCont job -> JavaFXRoutines.notNullValue(job, j -> !EnumSet.of(JobState.Running).contains(j.getState()))); menu.addItem("Download result", - job -> executeWSCallAsync("Downloading data", p -> job.getValue().downloadData(p)), + job -> executeWSCallAsync("Downloading data", p -> job.getValue().startDownload(p)), job -> JavaFXRoutines.notNullValue(job, j -> EnumSet.of(JobState.Failed, JobState.Finished, JobState.Canceled).contains(j.getState()) - && !j.downloaded())); + && !j.canBeDownloaded())); menu.addItem("Download statistics", job -> executeWSCallAsync("Downloading data", p -> job.getValue().downloadStatistics(p)), job -> JavaFXRoutines.notNullValue(job, j -> j.getState() == JobState.Finished)); diff --git a/haas-spim-benchmark/src/test/java/cz/it4i/fiji/haas/RunBenchmark.java b/haas-spim-benchmark/src/test/java/cz/it4i/fiji/haas/RunBenchmark.java index 9e938f94..1e6f8cfa 100644 --- a/haas-spim-benchmark/src/test/java/cz/it4i/fiji/haas/RunBenchmark.java +++ b/haas-spim-benchmark/src/test/java/cz/it4i/fiji/haas/RunBenchmark.java @@ -34,7 +34,7 @@ public class RunBenchmark { if (state == JobState.Configuring) { job.startJob(new DummyProgress()); } else if (state != JobState.Running && state != JobState.Queued) { - job.downloadData(new DummyProgress()); + job.startDownload(new DummyProgress()); } else if (state == JobState.Running) { log.info(job.getSnakemakeOutput()); } -- GitLab