Skip to content
Snippets Groups Projects
Commit 353e10fc authored by Jan Kožusznik's avatar Jan Kožusznik
Browse files

feat: limit download after job submit

parent c9094e1f
Branches
No related tags found
No related merge requests found
...@@ -29,7 +29,11 @@ import cz.it4i.fiji.haas_java_client.JobState; ...@@ -29,7 +29,11 @@ import cz.it4i.fiji.haas_java_client.JobState;
import cz.it4i.fiji.haas_java_client.ProgressNotifier; import cz.it4i.fiji.haas_java_client.ProgressNotifier;
import cz.it4i.fiji.haas_java_client.TransferFileProgressForHaaSClient; import cz.it4i.fiji.haas_java_client.TransferFileProgressForHaaSClient;
import net.imagej.updater.util.Progress; import net.imagej.updater.util.Progress;
/***
* TASK - napojit na UI
* @author koz01
*
*/
public class Job { public class Job {
private static final String JOB_NAME = "job.name"; private static final String JOB_NAME = "job.name";
...@@ -38,6 +42,10 @@ public class Job { ...@@ -38,6 +42,10 @@ public class Job {
private static final String JOB_INFO_FILENAME = ".jobinfo"; private static final String JOB_INFO_FILENAME = ".jobinfo";
private static final String JOB_NEEDS_DOWNLOAD = "job.needs_download";
private static final String JOB_CAN_BE_DOWNLOADED = "job.needs_download";
public static boolean isJobPath(Path p) { public static boolean isJobPath(Path p) {
return isValidPath(p); return isValidPath(p);
} }
...@@ -73,8 +81,10 @@ public class Job { ...@@ -73,8 +81,10 @@ public class Job {
setJobDirectory(jobDirectory); setJobDirectory(jobDirectory);
propertyHolder = new PropertyHolder(jobDir.resolve(JOB_INFO_FILENAME)); propertyHolder = new PropertyHolder(jobDir.resolve(JOB_INFO_FILENAME));
resumeUpload(); resumeUpload();
resumeDownload();
} }
private Job(JobManager4Job jobManager, Supplier<HaaSClient> haasClientSupplier) { private Job(JobManager4Job jobManager, Supplier<HaaSClient> haasClientSupplier) {
this.haasClientSupplier = haasClientSupplier; this.haasClientSupplier = haasClientSupplier;
this.jobManager = jobManager; this.jobManager = jobManager;
...@@ -101,15 +111,26 @@ public class Job { ...@@ -101,15 +111,26 @@ public class Job {
} }
public void startDownload(Predicate<String> predicate, Progress notifier) throws IOException { public void startDownload(Predicate<String> predicate, Progress notifier) throws IOException {
setProperty(JOB_NEEDS_DOWNLOAD, true);
Collection<String> files = getHaaSClient().getChangedFiles(jobId).stream().filter(predicate) Collection<String> files = getHaaSClient().getChangedFiles(jobId).stream().filter(predicate)
.collect(Collectors.toList()); .collect(Collectors.toList());
synchronization.startDownload(files); synchronization.startDownload(files);
} }
public boolean isUploading() { public void stopDownloadData() {
return Boolean.parseBoolean(getProperty(JOB_NEEDS_UPLOAD)); setProperty(JOB_NEEDS_DOWNLOAD, false);
try {
this.synchronization.stopUpload();
} catch (IOException e) {
log.error(e.getMessage(), e);
throw new RuntimeException(e);
}
} }
public boolean canBeDownload() {
return Boolean.parseBoolean(getProperty(JOB_CAN_BE_DOWNLOADED));
}
public void uploadFile(String file, ProgressNotifier notifier) { public void uploadFile(String file, ProgressNotifier notifier) {
uploadFiles(Arrays.asList(file), notifier); uploadFiles(Arrays.asList(file), notifier);
} }
...@@ -144,8 +165,12 @@ public class Job { ...@@ -144,8 +165,12 @@ public class Job {
public void submit() { public void submit() {
HaaSClient client = getHaaSClient(); HaaSClient client = getHaaSClient();
client.submitJob(jobId); client.submitJob(jobId);
stopDownloadData();
setCanBeDownloaded(true);
} }
synchronized public long getId() { synchronized public long getId() {
if (jobId == null) { if (jobId == null) {
jobId = getJobId(jobDir); jobId = getJobId(jobDir);
...@@ -275,6 +300,9 @@ public class Job { ...@@ -275,6 +300,9 @@ public class Job {
() -> haasClientSupplier.get().startFileTransfer(getId(), HaaSClient.DUMMY_TRANSFER_FILE_PROGRESS), () -> haasClientSupplier.get().startFileTransfer(getId(), HaaSClient.DUMMY_TRANSFER_FILE_PROGRESS),
jobDir, Executors.newFixedThreadPool(2), () -> { jobDir, Executors.newFixedThreadPool(2), () -> {
setProperty(JOB_NEEDS_UPLOAD, false); setProperty(JOB_NEEDS_UPLOAD, false);
}, () -> {
setProperty(JOB_NEEDS_DOWNLOAD, false);
setCanBeDownloaded(false);
}); });
} catch (IOException e) { } catch (IOException e) {
log.error(e.getMessage(), e); log.error(e.getMessage(), e);
...@@ -287,6 +315,13 @@ public class Job { ...@@ -287,6 +315,13 @@ public class Job {
synchronization.resumeUpload(); synchronization.resumeUpload();
} }
} }
private synchronized void resumeDownload() {
if (Boolean.parseBoolean(getProperty(JOB_NEEDS_DOWNLOAD))) {
synchronization.resumeDownload();
}
}
private void setName(String name) { private void setName(String name) {
setProperty(JOB_NAME, name); setProperty(JOB_NAME, name);
...@@ -320,5 +355,9 @@ public class Job { ...@@ -320,5 +355,9 @@ public class Job {
private static long getJobId(Path path) { private static long getJobId(Path path) {
return Long.parseLong(path.getFileName().toString()); return Long.parseLong(path.getFileName().toString());
} }
private void setCanBeDownloaded(boolean b) {
setProperty(JOB_CAN_BE_DOWNLOADED, b);
}
} }
...@@ -43,13 +43,13 @@ public class Synchronization { ...@@ -43,13 +43,13 @@ public class Synchronization {
public Synchronization(Supplier<HaaSFileTransfer> fileTransferSupplier, Path workingDirectory, public Synchronization(Supplier<HaaSFileTransfer> fileTransferSupplier, Path workingDirectory,
ExecutorService service, Runnable uploadFinishedNotifier) throws IOException { ExecutorService service, Runnable uploadFinishedNotifier, Runnable downloadFinishedNotifier) throws IOException {
this.workingDirectory = workingDirectory; this.workingDirectory = workingDirectory;
this.filesDownloaded = new PersistentIndex<>(workingDirectory.resolve(FILE_INDEX_DOWNLOADED_FILENAME), this.filesDownloaded = new PersistentIndex<>(workingDirectory.resolve(FILE_INDEX_DOWNLOADED_FILENAME),
pathResolver); pathResolver);
this.uploadProcess = createUploadProcess(fileTransferSupplier, service, uploadFinishedNotifier); this.uploadProcess = createUploadProcess(fileTransferSupplier, service, uploadFinishedNotifier);
this.downloadProcess = createDownloadProcess(fileTransferSupplier, service, uploadFinishedNotifier); this.downloadProcess = createDownloadProcess(fileTransferSupplier, service, downloadFinishedNotifier);
} }
public synchronized void startUpload() throws IOException { public synchronized void startUpload() throws IOException {
...@@ -130,13 +130,13 @@ public class Synchronization { ...@@ -130,13 +130,13 @@ public class Synchronization {
@Override @Override
protected void processItem(HaaSFileTransfer tr, String file) { protected void processItem(HaaSFileTransfer tr, String file) {
tr.download(file, workingDirectory);
filesDownloaded.insert(workingDirectory.resolve(file)); filesDownloaded.insert(workingDirectory.resolve(file));
try { try {
filesDownloaded.storeToFile(); filesDownloaded.storeToFile();
} catch (IOException e) { } catch (IOException e) {
log.error(e.getMessage(), e); log.error(e.getMessage(), e);
} }
tr.download(file, workingDirectory);
} }
} }
......
...@@ -49,8 +49,7 @@ import net.imagej.updater.util.Progress; ...@@ -49,8 +49,7 @@ import net.imagej.updater.util.Progress;
public class BenchmarkJobManager { public class BenchmarkJobManager {
private static final String JOB_HAS_DATA_TO_DOWNLOAD_PROPERTY = "job.needDownload";
private static Logger log = LoggerFactory private static Logger log = LoggerFactory
.getLogger(cz.it4i.fiji.haas_spim_benchmark.core.BenchmarkJobManager.class); .getLogger(cz.it4i.fiji.haas_spim_benchmark.core.BenchmarkJobManager.class);
...@@ -82,7 +81,7 @@ public class BenchmarkJobManager { ...@@ -82,7 +81,7 @@ public class BenchmarkJobManager {
running = null; running = null;
job.submit(); job.submit();
job.setProperty(SPIM_OUTPUT_FILENAME_PATTERN, outputName); job.setProperty(SPIM_OUTPUT_FILENAME_PATTERN, outputName);
setDownloaded(false);
} }
public JobState getState() { public JobState getState() {
...@@ -97,10 +96,7 @@ public class BenchmarkJobManager { ...@@ -97,10 +96,7 @@ public class BenchmarkJobManager {
job.stopUploadData(); job.stopUploadData();
} }
public boolean isUploading() {
return job.isUploading();
}
public synchronized CompletableFuture<JobState> getStateAsync(Executor executor) { public synchronized CompletableFuture<JobState> getStateAsync(Executor executor) {
if (running != null) { if (running != null) {
return running; return running;
...@@ -112,15 +108,17 @@ public class BenchmarkJobManager { ...@@ -112,15 +108,17 @@ public class BenchmarkJobManager {
return result; return result;
} }
public void downloadData(Progress progress) throws IOException { public void startDownload(Progress progress) throws IOException {
if (job.getState() == JobState.Finished) { if (job.getState() == JobState.Finished) {
String filePattern = job.getProperty(SPIM_OUTPUT_FILENAME_PATTERN); String filePattern = job.getProperty(SPIM_OUTPUT_FILENAME_PATTERN);
job.download(downloadFinishedData(filePattern), new P_ProgressNotifierAdapter(progress)); job.startDownload(downloadFinishedData(filePattern) , progress);
} else if (job.getState() == JobState.Failed || job.getState() == JobState.Canceled) { } else if (job.getState() == JobState.Failed || job.getState() == JobState.Canceled) {
job.download(downloadFailedData(), new P_ProgressNotifierAdapter(progress)); job.startDownload(downloadFailedData(), progress);
} }
}
setDownloaded(true);
public boolean canBeDownloaded() {
return job.canBeDownload();
} }
public void downloadStatistics(Progress progress) throws IOException { public void downloadStatistics(Progress progress) throws IOException {
...@@ -159,10 +157,6 @@ public class BenchmarkJobManager { ...@@ -159,10 +157,6 @@ public class BenchmarkJobManager {
return false; return false;
} }
public boolean downloaded() {
return getDownloaded();
}
public void update() { public void update() {
job.updateInfo(); job.updateInfo();
} }
...@@ -407,15 +401,7 @@ public class BenchmarkJobManager { ...@@ -407,15 +401,7 @@ public class BenchmarkJobManager {
return Stream.concat(nonTaskSpecificErrors.stream(), taskSpecificErrors).collect(Collectors.toList()); return Stream.concat(nonTaskSpecificErrors.stream(), taskSpecificErrors).collect(Collectors.toList());
} }
private void setDownloaded(boolean b) {
job.setProperty(JOB_HAS_DATA_TO_DOWNLOAD_PROPERTY, b + "");
}
private boolean getDownloaded() {
String downloadedStr = job.getProperty(JOB_HAS_DATA_TO_DOWNLOAD_PROPERTY);
return downloadedStr != null && Boolean.parseBoolean(downloadedStr);
}
} }
public BenchmarkJobManager(BenchmarkSPIMParameters params) throws IOException { public BenchmarkJobManager(BenchmarkSPIMParameters params) throws IOException {
......
...@@ -112,10 +112,10 @@ public class BenchmarkSPIMController extends BorderPane implements CloseableCont ...@@ -112,10 +112,10 @@ public class BenchmarkSPIMController extends BorderPane implements CloseableCont
job -> JavaFXRoutines.notNullValue(job, j -> !EnumSet.of(JobState.Running).contains(j.getState()))); job -> JavaFXRoutines.notNullValue(job, j -> !EnumSet.of(JobState.Running).contains(j.getState())));
menu.addItem("Download result", menu.addItem("Download result",
job -> executeWSCallAsync("Downloading data", p -> job.getValue().downloadData(p)), job -> executeWSCallAsync("Downloading data", p -> job.getValue().startDownload(p)),
job -> JavaFXRoutines.notNullValue(job, job -> JavaFXRoutines.notNullValue(job,
j -> EnumSet.of(JobState.Failed, JobState.Finished, JobState.Canceled).contains(j.getState()) j -> EnumSet.of(JobState.Failed, JobState.Finished, JobState.Canceled).contains(j.getState())
&& !j.downloaded())); && !j.canBeDownloaded()));
menu.addItem("Download statistics", menu.addItem("Download statistics",
job -> executeWSCallAsync("Downloading data", p -> job.getValue().downloadStatistics(p)), job -> executeWSCallAsync("Downloading data", p -> job.getValue().downloadStatistics(p)),
job -> JavaFXRoutines.notNullValue(job, j -> j.getState() == JobState.Finished)); job -> JavaFXRoutines.notNullValue(job, j -> j.getState() == JobState.Finished));
......
...@@ -34,7 +34,7 @@ public class RunBenchmark { ...@@ -34,7 +34,7 @@ public class RunBenchmark {
if (state == JobState.Configuring) { if (state == JobState.Configuring) {
job.startJob(new DummyProgress()); job.startJob(new DummyProgress());
} else if (state != JobState.Running && state != JobState.Queued) { } else if (state != JobState.Running && state != JobState.Queued) {
job.downloadData(new DummyProgress()); job.startDownload(new DummyProgress());
} else if (state == JobState.Running) { } else if (state == JobState.Running) {
log.info(job.getSnakemakeOutput()); log.info(job.getSnakemakeOutput());
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment