diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java index 03c54732dfef3cb6a87132b66270fe7f6f3d9086..c065cc4fa8b7713c1eaaead0fa6fcaaa844d5a29 100644 --- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java @@ -81,8 +81,6 @@ public class Job { this(jobManager, haasClientSupplier); setJobDirectory(jobDirectory); propertyHolder = new PropertyHolder(jobDir.resolve(JOB_INFO_FILENAME)); - resumeUpload(); - resumeDownload(); } @@ -129,6 +127,19 @@ public class Job { } } + public synchronized void resumeUpload() { + if (Boolean.parseBoolean(getProperty(JOB_NEEDS_UPLOAD))) { + synchronization.resumeUpload(); + } + } + + public synchronized void resumeDownload() { + if (Boolean.parseBoolean(getProperty(JOB_NEEDS_DOWNLOAD))) { + synchronization.resumeDownload(); + } + } + + public boolean canBeDownload() { return Boolean.parseBoolean(getProperty(JOB_CAN_BE_DOWNLOADED)); } @@ -295,6 +306,14 @@ public class Job { } } + public void setDownloadNotifier(ProgressNotifier notifier) { + synchronization.setDownloadNotifier(notifier); + } + + public void setUploadNotifier(ProgressNotifier notifier) { + synchronization.setUploadNotifier(notifier); + } + private void setJobDirectory(Path jobDirectory) { this.jobDir = jobDirectory; try { @@ -316,18 +335,7 @@ public class Job { return haasClientSupplier.get().startFileTransfer(getId(), progress); } - private synchronized void resumeUpload() { - if (Boolean.parseBoolean(getProperty(JOB_NEEDS_UPLOAD))) { - synchronization.resumeUpload(); - } - } - private synchronized void resumeDownload() { - if (Boolean.parseBoolean(getProperty(JOB_NEEDS_DOWNLOAD))) { - synchronization.resumeDownload(); - } - } - private void setName(String name) { setProperty(JOB_NAME, name); @@ -366,12 +374,4 @@ public class Job { setProperty(JOB_CAN_BE_DOWNLOADED, b); } - public void setDownloadNotifier(ProgressNotifier notifier) { - synchronization.setDownloadNotifier(notifier); - } - - public void setUploadNotifier(ProgressNotifier notifier) { - synchronization.setUploadNotifier(notifier); - } - } diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/PersitentSynchronizationProcess.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/PersitentSynchronizationProcess.java index bfa3b2c451b44814a85883533b080430b81a9769..ae3733923e7add9b26b2316934b2eb8fd6411cd8 100644 --- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/PersitentSynchronizationProcess.java +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/PersitentSynchronizationProcess.java @@ -12,89 +12,86 @@ import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import cz.it4i.fiji.haas_java_client.HaaSClient; import cz.it4i.fiji.haas_java_client.HaaSFileTransfer; import cz.it4i.fiji.haas_java_client.ProgressNotifier; import cz.it4i.fiji.haas_java_client.TransferFileProgressForHaaSClient; -import cz.it4i.fiji.scpclient.TransferFileProgress; public abstract class PersitentSynchronizationProcess<T> { private boolean startFinished = true; - + public static final Logger log = LoggerFactory .getLogger(cz.it4i.fiji.haas.data_transfer.PersitentSynchronizationProcess.class); - private static final TransferFileProgress DUMMY_FILE_PROGRESS = new TransferFileProgress() { - @Override - public void dataTransfered(long bytesTransfered) { - } - }; - + private static final TransferFileProgressForHaaSClient DUMMY_FILE_PROGRESS = new TransferFileProgressForHaaSClient( + 0, HaaSClient.DUMMY_PROGRESS_NOTIFIER); + private PersistentIndex<T> index; - + private Queue<T> toProcessQueue = new LinkedBlockingQueue<T>(); - + private SimpleThreadRunner runner; private Supplier<HaaSFileTransfer> fileTransferSupplier; private Runnable processFinishedNotifier; - private String name; - private ProgressNotifier notifier; - - public PersitentSynchronizationProcess(String name,ExecutorService service, Supplier<HaaSFileTransfer> fileTransferSupplier, Runnable processFinishedNotifier, Path indexFile,Function<String,T> convertor) throws IOException { + + public PersitentSynchronizationProcess(ExecutorService service, + Supplier<HaaSFileTransfer> fileTransferSupplier, Runnable processFinishedNotifier, Path indexFile, + Function<String, T> convertor) throws IOException { runner = new SimpleThreadRunner(service); - this.name = name; this.fileTransferSupplier = fileTransferSupplier; this.processFinishedNotifier = processFinishedNotifier; this.index = new PersistentIndex<>(indexFile, convertor); } - + public synchronized void start() throws IOException { startFinished = false; index.clear(); - try{ + try { for (T item : getItems()) { index.insert(item); toProcessQueue.add(item); } - runner.runIfNotRunning(this::doProcess); + runner.runIfNotRunning(this::doProcess); } finally { startFinished = true; index.storeToFile(); } } - - public void stop() throws IOException { toProcessQueue.clear(); index.clear(); + notifier.setCount(-1, -1); } public void resume() { index.fillQueue(toProcessQueue); runner.runIfNotRunning(this::doProcess); } - + abstract protected Iterable<T> getItems() throws IOException; - + abstract protected void processItem(HaaSFileTransfer tr, T p); - + private void doProcess(AtomicBoolean reRun) { - try(HaaSFileTransfer tr = fileTransferSupplier.get()) { - tr.setProgress(getTransferFileProgress(tr)); + try (HaaSFileTransfer tr = fileTransferSupplier.get()) { + TransferFileProgressForHaaSClient notifier; + tr.setProgress(notifier = getTransferFileProgress(tr)); while (!toProcessQueue.isEmpty()) { T p = toProcessQueue.poll(); - - log.info(name + "ing: " + p); + String item = p.toString(); + notifier.addItem(item); processItem(tr, p); fileUploaded(p); - log.info(name + "ed: " + p); + notifier.itemDone(item); reRun.set(false); } + notifier.done(); } finally { synchronized (this) { if (startFinished) { @@ -119,12 +116,11 @@ public abstract class PersitentSynchronizationProcess<T> { this.notifier = notifier; } - private TransferFileProgress getTransferFileProgress(HaaSFileTransfer tr) { - if(notifier == null) { + private TransferFileProgressForHaaSClient getTransferFileProgress(HaaSFileTransfer tr) { + if (notifier == null) { return DUMMY_FILE_PROGRESS; } return new TransferFileProgressForHaaSClient(getTotalSize(toProcessQueue, tr), notifier); } - } diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java index fbb9bcebcc5ddcec3c11ad5a536bc7cf51c051e6..3a0011fafec5a618d3ed252f311d6f8bae43908b 100644 --- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java @@ -58,7 +58,7 @@ public class Synchronization { } public void setDownloadNotifier(ProgressNotifier notifier) { - // TODO Auto-generated method stub + downloadProcess.setNotifier(notifier); } @@ -94,7 +94,7 @@ public class Synchronization { private PersitentSynchronizationProcess<Path> createUploadProcess(Supplier<HaaSFileTransfer> fileTransferSupplier, ExecutorService service, Runnable uploadFinishedNotifier) throws IOException { - return new PersitentSynchronizationProcess<Path>("upload", service, fileTransferSupplier, uploadFinishedNotifier, + return new PersitentSynchronizationProcess<Path>(service, fileTransferSupplier, uploadFinishedNotifier, workingDirectory.resolve(FILE_INDEX_TO_UPLOAD_FILENAME), pathResolver) { @Override @@ -138,7 +138,7 @@ public class Synchronization { public P_PersistentDownloadProcess(ExecutorService service, Supplier<HaaSFileTransfer> fileTransferSupplier, Runnable processFinishedNotifier) throws IOException { - super("download",service, fileTransferSupplier, processFinishedNotifier, + super(service, fileTransferSupplier, processFinishedNotifier, workingDirectory.resolve(FILE_INDEX_TO_DOWNLOAD_FILENAME), name -> name); } diff --git a/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/TransferFileProgressForHaaSClient.java b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/TransferFileProgressForHaaSClient.java index d1b250c95c93a5bdd7e200d45000ad953abe380f..e6242cacef3055732764dd09b0495695dc711882 100644 --- a/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/TransferFileProgressForHaaSClient.java +++ b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/TransferFileProgressForHaaSClient.java @@ -57,4 +57,8 @@ public class TransferFileProgressForHaaSClient implements TransferFileProgress { } return result; } + + public void done() { + notifier.done(); + } } diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java index 67cc49a8ae221c53e90253f0ebf6f1a0fa2aebd7..854a70af1130a048e5d34aa9cd76d7d9b9299491 100644 --- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java +++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java @@ -73,6 +73,14 @@ public class BenchmarkJobManager { computationAccessor = getComputationAccessor(); } + public void setDownloadNotifier(Progress progress) { + job.setDownloadNotifier(convertTo(progress)); + } + + public void setUploadNotifier(Progress progress) { + job.setUploadNotifier(convertTo(progress)); + } + public synchronized void startJob(Progress progress) throws IOException { job.uploadFile(Constants.CONFIG_YAML, new P_ProgressNotifierAdapter(progress)); String outputName = getOutputName(job.openLocalFile(Constants.CONFIG_YAML)); @@ -208,11 +216,20 @@ public class BenchmarkJobManager { return computationAccessor.getActualOutput(content); } + public void resumeTransfer() { + job.resumeDownload(); + job.resumeUpload(); + } + @Override public String toString() { return "" + getId(); } + private ProgressNotifier convertTo(Progress progress) { + return progress == null ? null : new P_ProgressNotifierAdapter(progress); + } + private synchronized CompletableFuture<JobState> doGetStateAsync(Executor executor) { JobState state = job.getState(); if (state != JobState.Finished) { @@ -400,23 +417,6 @@ public class BenchmarkJobManager { Stream<BenchmarkError> taskSpecificErrors = tasks.stream().flatMap(s -> s.getErrors().stream()); return Stream.concat(nonTaskSpecificErrors.stream(), taskSpecificErrors).collect(Collectors.toList()); } - - public void setDownloadNotifier(Progress progress) { - if(progress == null) { - job.setDownloadNotifier(null); - } else { - job.setDownloadNotifier(new P_ProgressNotifierAdapter(progress)); - } - } - - - public void setUploadNotifier(Progress downloadProgress) { - if(downloadProgress == null) { - job.setUploadNotifier(null); - } else { - job.setUploadNotifier(new P_ProgressNotifierAdapter(downloadProgress)); - } - } } public BenchmarkJobManager(BenchmarkSPIMParameters params) throws IOException { diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/UpdatableBenchmarkJob.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/UpdatableBenchmarkJob.java index 510134fb54b5baa1f61ef7e29d95b3488686c541..2e4a1aa1f0900d8b6fc4b261582336816a74e5ef 100644 --- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/UpdatableBenchmarkJob.java +++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/UpdatableBenchmarkJob.java @@ -14,13 +14,25 @@ public class UpdatableBenchmarkJob extends UpdatableObservableValue<BenchmarkJob public static final Logger log = LoggerFactory .getLogger(cz.it4i.fiji.haas_spim_benchmark.core.UpdatableBenchmarkJob.class); - private Progress downloadProgress = new P_DownloadProgress(); + private P_DownloadProgress downloadProgress = new P_DownloadProgress(); + + public interface TransferProgress { + + public Long getRemainingSeconds(); + + public boolean isDownloaded(); + + public boolean isDonwloadind(); + + public Float getRemainingPercents(); + } public UpdatableBenchmarkJob(BenchmarkJob wrapped, Function<BenchmarkJob, UpdateStatus> updateFunction, Function<BenchmarkJob, Object> stateProvider) { super(wrapped, updateFunction, stateProvider); wrapped.setDownloadNotifier(downloadProgress); + wrapped.resumeTransfer(); } public void removed() { @@ -28,47 +40,83 @@ public class UpdatableBenchmarkJob extends UpdatableObservableValue<BenchmarkJob } - private class P_DownloadProgress implements Progress { + + private class P_DownloadProgress implements Progress, TransferProgress { + private boolean downloading; + private boolean downloaded; + private long start; + private Long remainingSeconds; + private Float remainingPercents; + @Override public void setTitle(String title) { - // TODO Auto-generated method stub - } @Override - public void setCount(int count, int total) { - log.info("setCount count=" + count + ", total:" + total); + public synchronized void setCount(int count, int total) { + + if(total < -1) { + downloading = false; + remainingSeconds = null; + remainingPercents = null; + } else { + long delta = System.currentTimeMillis() - start; + remainingSeconds = (long) ((double) delta / count * (total - count)) / 1000; + remainingPercents = (((float)total - count) / total * 100); + } + fireValueChangedEvent(); } @Override public void addItem(Object item) { - // TODO Auto-generated method stub - + if (!downloading) { + downloaded = false; + downloading = true; + start = System.currentTimeMillis(); + } + fireValueChangedEvent(); + } + + @Override + public void done() { + if (downloading) { + downloaded = true; + } + downloading = false; + remainingSeconds = 0l; + remainingPercents = 0.f; + fireValueChangedEvent(); } @Override - public void setItemCount(int count, int total) { - // TODO Auto-generated method stub - + public boolean isDownloaded() { + return downloaded; } @Override - public void itemDone(Object item) { - // TODO Auto-generated method stub - + public boolean isDonwloadind() { + return downloading; } @Override - public void done() { - // TODO Auto-generated method stub - + public Long getRemainingSeconds() { + return remainingSeconds; } - - } + @Override + public Float getRemainingPercents() { + return remainingPercents; + } + + @Override + public void setItemCount(int count, int total) { + } + @Override + public void itemDone(Object item) { + } - + } }