diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java index 99f09004698a6aa93af4aff40c66c8262f1fd998..03c54732dfef3cb6a87132b66270fe7f6f3d9086 100644 --- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java @@ -28,7 +28,7 @@ import cz.it4i.fiji.haas_java_client.JobInfo; import cz.it4i.fiji.haas_java_client.JobState; import cz.it4i.fiji.haas_java_client.ProgressNotifier; import cz.it4i.fiji.haas_java_client.TransferFileProgressForHaaSClient; -import net.imagej.updater.util.Progress; +import cz.it4i.fiji.scpclient.TransferFileProgress; /*** * TASK - napojit na UI * @author koz01 @@ -63,7 +63,8 @@ public class Job { private PropertyHolder propertyHolder; private JobManager4Job jobManager; private Synchronization synchronization; - + + public Job(JobManager4Job jobManager, String name, Path basePath, Supplier<HaaSClient> haasClientSupplier) throws IOException { this(jobManager, haasClientSupplier); @@ -73,7 +74,7 @@ public class Job { propertyHolder = new PropertyHolder(jobDir.resolve(JOB_INFO_FILENAME)); Files.createDirectory(jobDir); setName(name); - + } public Job(JobManager4Job jobManager, Path jobDirectory, Supplier<HaaSClient> haasClientSupplier) { @@ -90,6 +91,7 @@ public class Job { this.jobManager = jobManager; } + public void startUploadData() { setProperty(JOB_NEEDS_UPLOAD, true); try { @@ -110,7 +112,7 @@ public class Job { } } - public void startDownload(Predicate<String> predicate, Progress notifier) throws IOException { + public void startDownload(Predicate<String> predicate) throws IOException { setProperty(JOB_NEEDS_DOWNLOAD, true); Collection<String> files = getHaaSClient().getChangedFiles(jobId).stream().filter(predicate) .collect(Collectors.toList()); @@ -297,7 +299,7 @@ public class Job { this.jobDir = jobDirectory; try { this.synchronization = new Synchronization( - () -> haasClientSupplier.get().startFileTransfer(getId(), HaaSClient.DUMMY_TRANSFER_FILE_PROGRESS), + ()->startFileTransfer(HaaSClient.DUMMY_TRANSFER_FILE_PROGRESS), jobDir, Executors.newFixedThreadPool(2), () -> { setProperty(JOB_NEEDS_UPLOAD, false); }, () -> { @@ -310,6 +312,10 @@ public class Job { } } + private HaaSFileTransfer startFileTransfer( TransferFileProgress progress) { + return haasClientSupplier.get().startFileTransfer(getId(), progress); + } + private synchronized void resumeUpload() { if (Boolean.parseBoolean(getProperty(JOB_NEEDS_UPLOAD))) { synchronization.resumeUpload(); @@ -360,4 +366,12 @@ public class Job { setProperty(JOB_CAN_BE_DOWNLOADED, b); } + public void setDownloadNotifier(ProgressNotifier notifier) { + synchronization.setDownloadNotifier(notifier); + } + + public void setUploadNotifier(ProgressNotifier notifier) { + synchronization.setUploadNotifier(notifier); + } + } diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/PersitentSynchronizationProcess.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/PersitentSynchronizationProcess.java index 6d9336153578471ce3cc3933e6665421f6ce6508..bfa3b2c451b44814a85883533b080430b81a9769 100644 --- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/PersitentSynchronizationProcess.java +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/PersitentSynchronizationProcess.java @@ -13,6 +13,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import cz.it4i.fiji.haas_java_client.HaaSFileTransfer; +import cz.it4i.fiji.haas_java_client.ProgressNotifier; +import cz.it4i.fiji.haas_java_client.TransferFileProgressForHaaSClient; +import cz.it4i.fiji.scpclient.TransferFileProgress; public abstract class PersitentSynchronizationProcess<T> { @@ -20,6 +23,12 @@ public abstract class PersitentSynchronizationProcess<T> { public static final Logger log = LoggerFactory .getLogger(cz.it4i.fiji.haas.data_transfer.PersitentSynchronizationProcess.class); + + private static final TransferFileProgress DUMMY_FILE_PROGRESS = new TransferFileProgress() { + @Override + public void dataTransfered(long bytesTransfered) { + } + }; private PersistentIndex<T> index; @@ -32,6 +41,8 @@ public abstract class PersitentSynchronizationProcess<T> { private Runnable processFinishedNotifier; private String name; + + private ProgressNotifier notifier; public PersitentSynchronizationProcess(String name,ExecutorService service, Supplier<HaaSFileTransfer> fileTransferSupplier, Runnable processFinishedNotifier, Path indexFile,Function<String,T> convertor) throws IOException { runner = new SimpleThreadRunner(service); @@ -74,6 +85,7 @@ public abstract class PersitentSynchronizationProcess<T> { private void doProcess(AtomicBoolean reRun) { try(HaaSFileTransfer tr = fileTransferSupplier.get()) { + tr.setProgress(getTransferFileProgress(tr)); while (!toProcessQueue.isEmpty()) { T p = toProcessQueue.poll(); @@ -91,7 +103,9 @@ public abstract class PersitentSynchronizationProcess<T> { } } } - + + abstract protected long getTotalSize(Iterable<T> items, HaaSFileTransfer tr); + private void fileUploaded(T p) { try { index.remove(p); @@ -101,5 +115,16 @@ public abstract class PersitentSynchronizationProcess<T> { } } + public void setNotifier(ProgressNotifier notifier) { + this.notifier = notifier; + } + + private TransferFileProgress getTransferFileProgress(HaaSFileTransfer tr) { + if(notifier == null) { + return DUMMY_FILE_PROGRESS; + } + return new TransferFileProgressForHaaSClient(getTotalSize(toProcessQueue, tr), notifier); + } + } diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java index 7f9ae99d66cd80e019c164a6aaee701d6f33fc15..fbb9bcebcc5ddcec3c11ad5a536bc7cf51c051e6 100644 --- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java @@ -18,6 +18,7 @@ import org.slf4j.LoggerFactory; import cz.it4i.fiji.haas_java_client.HaaSClient.UploadingFile; import cz.it4i.fiji.haas_java_client.HaaSFileTransfer; +import cz.it4i.fiji.haas_java_client.ProgressNotifier; import cz.it4i.fiji.haas_java_client.UploadingFileImpl; public class Synchronization { @@ -52,6 +53,15 @@ public class Synchronization { this.downloadProcess = createDownloadProcess(fileTransferSupplier, service, downloadFinishedNotifier); } + public synchronized void setUploadNotifier(ProgressNotifier notifier) { + uploadProcess.setNotifier(notifier); + } + + public void setDownloadNotifier(ProgressNotifier notifier) { + // TODO Auto-generated method stub + + } + public synchronized void startUpload() throws IOException { uploadProcess.start(); } @@ -92,6 +102,7 @@ public class Synchronization { try(DirectoryStream<Path> ds = Files.newDirectoryStream(workingDirectory,Synchronization.this::canUpload)) { return StreamSupport.stream(ds.spliterator(), false).collect(Collectors.toList()); } + } @Override @@ -99,6 +110,18 @@ public class Synchronization { UploadingFile uf = new UploadingFileImpl(p); tr.upload(uf); } + + @Override + protected long getTotalSize(Iterable<Path> items, HaaSFileTransfer tr) { + return StreamSupport.stream(items.spliterator(), false).map(p->{ + try { + return Files.size(p); + } catch (IOException e) { + log.error(e.getMessage(), e); + return 0; + } + }).collect(Collectors.summingLong(val->val.longValue())); + } }; } @@ -138,6 +161,12 @@ public class Synchronization { } tr.download(file, workingDirectory); } + + @Override + protected long getTotalSize(Iterable<String> items, HaaSFileTransfer tr) { + + return tr.obtainSize( StreamSupport.stream(items.spliterator(), false).collect(Collectors.toList())).stream().collect(Collectors.summingLong(val->val)); + } } } diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/ui/ObservableValueRegistry.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/ui/ObservableValueRegistry.java index b47f0739cd6a60e14d2649f977a18334101d68a2..c976fb3549093222db043e3cc86c295acee6e517 100644 --- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/ui/ObservableValueRegistry.java +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/ui/ObservableValueRegistry.java @@ -32,9 +32,13 @@ public class ObservableValueRegistry<T> { private Map<T,UpdatableObservableValue<T>> map = new LinkedHashMap<>(); public ObservableValue<T> addIfAbsent(T value) { - UpdatableObservableValue<T> uov = map.computeIfAbsent(value, v-> new UpdatableObservableValue<T>(v, updateFunction, stateProvider)); + UpdatableObservableValue<T> uov = map.computeIfAbsent(value, v-> constructObservableValue(v, updateFunction, stateProvider)); return uov; } + + protected UpdatableObservableValue<T> constructObservableValue(T v, Function<T, UpdateStatus> updateFunction, Function<T, Object> stateProvider) { + return new UpdatableObservableValue<T>(v, updateFunction, stateProvider); + } public UpdatableObservableValue<T> get(T value) { return map.get(value); diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java index 2741aa412f624e5fb4571f733e060b4368e63588..67cc49a8ae221c53e90253f0ebf6f1a0fa2aebd7 100644 --- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java +++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java @@ -108,12 +108,12 @@ public class BenchmarkJobManager { return result; } - public void startDownload(Progress progress) throws IOException { + public void startDownload() throws IOException { if (job.getState() == JobState.Finished) { String filePattern = job.getProperty(SPIM_OUTPUT_FILENAME_PATTERN); - job.startDownload(downloadFinishedData(filePattern) , progress); + job.startDownload(downloadFinishedData(filePattern) ); } else if (job.getState() == JobState.Failed || job.getState() == JobState.Canceled) { - job.startDownload(downloadFailedData(), progress); + job.startDownload(downloadFailedData()); } } @@ -401,7 +401,22 @@ public class BenchmarkJobManager { return Stream.concat(nonTaskSpecificErrors.stream(), taskSpecificErrors).collect(Collectors.toList()); } - + public void setDownloadNotifier(Progress progress) { + if(progress == null) { + job.setDownloadNotifier(null); + } else { + job.setDownloadNotifier(new P_ProgressNotifierAdapter(progress)); + } + } + + + public void setUploadNotifier(Progress downloadProgress) { + if(downloadProgress == null) { + job.setUploadNotifier(null); + } else { + job.setUploadNotifier(new P_ProgressNotifierAdapter(downloadProgress)); + } + } } public BenchmarkJobManager(BenchmarkSPIMParameters params) throws IOException { diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/UpdatableBenchmarkJob.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/UpdatableBenchmarkJob.java new file mode 100644 index 0000000000000000000000000000000000000000..510134fb54b5baa1f61ef7e29d95b3488686c541 --- /dev/null +++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/UpdatableBenchmarkJob.java @@ -0,0 +1,74 @@ +package cz.it4i.fiji.haas_spim_benchmark.core; + +import java.util.function.Function; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import cz.it4i.fiji.haas.ui.UpdatableObservableValue; +import cz.it4i.fiji.haas_spim_benchmark.core.BenchmarkJobManager.BenchmarkJob; +import net.imagej.updater.util.Progress; + +public class UpdatableBenchmarkJob extends UpdatableObservableValue<BenchmarkJob>{ + + public static final Logger log = LoggerFactory + .getLogger(cz.it4i.fiji.haas_spim_benchmark.core.UpdatableBenchmarkJob.class); + + private Progress downloadProgress = new P_DownloadProgress(); + + public UpdatableBenchmarkJob(BenchmarkJob wrapped, Function<BenchmarkJob, UpdateStatus> updateFunction, + Function<BenchmarkJob, Object> stateProvider) { + super(wrapped, updateFunction, stateProvider); + + wrapped.setDownloadNotifier(downloadProgress); + } + + public void removed() { + getValue().setDownloadNotifier(null); + } + + + private class P_DownloadProgress implements Progress { + + @Override + public void setTitle(String title) { + // TODO Auto-generated method stub + + } + + @Override + public void setCount(int count, int total) { + log.info("setCount count=" + count + ", total:" + total); + } + + @Override + public void addItem(Object item) { + // TODO Auto-generated method stub + + } + + @Override + public void setItemCount(int count, int total) { + // TODO Auto-generated method stub + + } + + @Override + public void itemDone(Object item) { + // TODO Auto-generated method stub + + } + + @Override + public void done() { + // TODO Auto-generated method stub + + } + + } + + + + + +} diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/BenchmarkSPIMController.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/BenchmarkSPIMController.java index 0423b2610d363d3378ff38d8d06a589d4164e9c7..a2938a0c4cd549aa43c8e6d843c3705a63fad351 100644 --- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/BenchmarkSPIMController.java +++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/BenchmarkSPIMController.java @@ -112,7 +112,7 @@ public class BenchmarkSPIMController extends BorderPane implements CloseableCont job -> JavaFXRoutines.notNullValue(job, j -> !EnumSet.of(JobState.Running).contains(j.getState()))); menu.addItem("Download result", - job -> executeWSCallAsync("Downloading data", p -> job.getValue().startDownload(p)), + job -> executeWSCallAsync("Downloading data", p -> job.getValue().startDownload()), job -> JavaFXRoutines.notNullValue(job, j -> EnumSet.of(JobState.Failed, JobState.Finished, JobState.Canceled).contains(j.getState()) && !j.canBeDownloaded())); diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/ObservableBenchmarkJobRegistry.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/ObservableBenchmarkJobRegistry.java index ed93b4a9805ffc6de619d69878c0e6e4fdf23527..560b3e056fe52abc7f3ea76c8f0c67875a60f325 100644 --- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/ObservableBenchmarkJobRegistry.java +++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/ObservableBenchmarkJobRegistry.java @@ -3,14 +3,18 @@ package cz.it4i.fiji.haas_spim_benchmark.ui; import java.nio.file.Files; import java.util.concurrent.Executor; import java.util.function.Consumer; +import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import cz.it4i.fiji.haas.ui.ObservableValueRegistry; +import cz.it4i.fiji.haas.ui.UpdatableObservableValue; import cz.it4i.fiji.haas.ui.UpdatableObservableValue.UpdateStatus; import cz.it4i.fiji.haas_java_client.JobState; import cz.it4i.fiji.haas_spim_benchmark.core.BenchmarkJobManager.BenchmarkJob; +import cz.it4i.fiji.haas_spim_benchmark.core.UpdatableBenchmarkJob; +import javafx.beans.value.ObservableValue; public class ObservableBenchmarkJobRegistry extends ObservableValueRegistry<BenchmarkJob> { @@ -23,6 +27,31 @@ public class ObservableBenchmarkJobRegistry extends ObservableValueRegistry<Benc return t.getStateAsync(exec).getNow(null); }, removeConsumer); } + + @Override + public UpdatableBenchmarkJob addIfAbsent(BenchmarkJob value) { + return (UpdatableBenchmarkJob) super.addIfAbsent(value); + } + + @Override + public UpdatableBenchmarkJob get(BenchmarkJob value) { + return (UpdatableBenchmarkJob) super.get(value); + } + + @Override + protected ObservableValue<BenchmarkJob> remove(BenchmarkJob value) { + UpdatableBenchmarkJob result = (UpdatableBenchmarkJob) super.remove(value); + result.removed(); + return result; + } + + @Override + protected UpdatableObservableValue<BenchmarkJob> constructObservableValue(BenchmarkJob v, + Function<BenchmarkJob, UpdateStatus> updateFunction, Function<BenchmarkJob, Object> stateProvider) { + return new UpdatableBenchmarkJob(v, updateFunction, stateProvider); + } + + private static UpdateStatus update(BenchmarkJob t, Executor executor) { if (!Files.isDirectory(t.getDirectory())) { diff --git a/haas-spim-benchmark/src/test/java/cz/it4i/fiji/haas/RunBenchmark.java b/haas-spim-benchmark/src/test/java/cz/it4i/fiji/haas/RunBenchmark.java index 1e6f8cfa0ea2609ce3ae55768cd7ebad3435ed9f..6a5fbd4be91416c77fad569cb74753ad4a8ded8d 100644 --- a/haas-spim-benchmark/src/test/java/cz/it4i/fiji/haas/RunBenchmark.java +++ b/haas-spim-benchmark/src/test/java/cz/it4i/fiji/haas/RunBenchmark.java @@ -34,7 +34,7 @@ public class RunBenchmark { if (state == JobState.Configuring) { job.startJob(new DummyProgress()); } else if (state != JobState.Running && state != JobState.Queued) { - job.startDownload(new DummyProgress()); + job.startDownload(); } else if (state == JobState.Running) { log.info(job.getSnakemakeOutput()); }