diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java index a94f98f64f827ba2c90b74c2cb4e83b4771fa2df..9d9921d29377110c6c0bff1f2f5964c0558a6e95 100644 --- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java @@ -2,6 +2,7 @@ package cz.it4i.fiji.haas; import java.io.IOException; import java.io.InputStream; +import java.io.InterruptedIOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; @@ -124,7 +125,7 @@ public class Job { public void stopDownloadData() { setProperty(JOB_NEEDS_DOWNLOAD, false); try { - this.synchronization.stopUpload(); + this.synchronization.stopDownload(); } catch (IOException e) { log.error(e.getMessage(), e); throw new RuntimeException(e); @@ -132,13 +133,15 @@ public class Job { } public synchronized void resumeUpload() { - if (Boolean.parseBoolean(getProperty(JOB_NEEDS_UPLOAD))) { + if (needsUpload()) { synchronization.resumeUpload(); } } + + public synchronized void resumeDownload() { - if (Boolean.parseBoolean(getProperty(JOB_NEEDS_DOWNLOAD))) { + if (needsDownload()) { synchronization.resumeDownload(); } } @@ -164,6 +167,14 @@ public class Job { return getSafeBoolean(getProperty(JOB_IS_DOWNLOADED)); } + public boolean needsDownload() { + return Boolean.parseBoolean(getProperty(JOB_NEEDS_DOWNLOAD)); + } + + public boolean needsUpload() { + return Boolean.parseBoolean(getProperty(JOB_NEEDS_UPLOAD)); + } + public void uploadFile(String file, ProgressNotifier notifier) { uploadFiles(Arrays.asList(file), notifier); } @@ -188,7 +199,12 @@ public class Job { String item; progress.startNewFile(totalSizes.get(index)); notifier.addItem(item = "Uploading file: " + file.getName()); - transfer.upload(file); + try { + transfer.upload(file); + } catch (InterruptedIOException e) { + notifier.itemDone(item); + return; + } notifier.itemDone(item); index++; } @@ -231,7 +247,12 @@ public class Job { String item; progress.addItem(item = fileName); progress.startNewFile(fileSizes.get(idx)); - transfer.download(fileName, jobDir); + try { + transfer.download(fileName, jobDir); + } catch (InterruptedIOException e) { + progress.itemDone(item); + return; + } progress.itemDone(item); idx++; } diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/PersitentSynchronizationProcess.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/PersistentSynchronizationProcess.java similarity index 77% rename from haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/PersitentSynchronizationProcess.java rename to haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/PersistentSynchronizationProcess.java index ae3733923e7add9b26b2316934b2eb8fd6411cd8..fee14f5e84ad20678610900b50afd6c773bb9750 100644 --- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/PersitentSynchronizationProcess.java +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/PersistentSynchronizationProcess.java @@ -1,8 +1,12 @@ package cz.it4i.fiji.haas.data_transfer; import java.io.IOException; +import java.io.InterruptedIOException; import java.nio.file.Path; +import java.util.Collections; +import java.util.HashSet; import java.util.Queue; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -17,12 +21,12 @@ import cz.it4i.fiji.haas_java_client.HaaSFileTransfer; import cz.it4i.fiji.haas_java_client.ProgressNotifier; import cz.it4i.fiji.haas_java_client.TransferFileProgressForHaaSClient; -public abstract class PersitentSynchronizationProcess<T> { +public abstract class PersistentSynchronizationProcess<T> { private boolean startFinished = true; public static final Logger log = LoggerFactory - .getLogger(cz.it4i.fiji.haas.data_transfer.PersitentSynchronizationProcess.class); + .getLogger(cz.it4i.fiji.haas.data_transfer.PersistentSynchronizationProcess.class); private static final TransferFileProgressForHaaSClient DUMMY_FILE_PROGRESS = new TransferFileProgressForHaaSClient( 0, HaaSClient.DUMMY_PROGRESS_NOTIFIER); @@ -30,6 +34,8 @@ public abstract class PersitentSynchronizationProcess<T> { private PersistentIndex<T> index; private Queue<T> toProcessQueue = new LinkedBlockingQueue<T>(); + + private Set<Thread> runningTransferThreads = Collections.synchronizedSet(new HashSet<>()); private SimpleThreadRunner runner; @@ -39,7 +45,7 @@ public abstract class PersitentSynchronizationProcess<T> { private ProgressNotifier notifier; - public PersitentSynchronizationProcess(ExecutorService service, + public PersistentSynchronizationProcess(ExecutorService service, Supplier<HaaSFileTransfer> fileTransferSupplier, Runnable processFinishedNotifier, Path indexFile, Function<String, T> convertor) throws IOException { runner = new SimpleThreadRunner(service); @@ -67,6 +73,7 @@ public abstract class PersitentSynchronizationProcess<T> { toProcessQueue.clear(); index.clear(); notifier.setCount(-1, -1); + runningTransferThreads.forEach(t -> t.interrupt()); } public void resume() { @@ -76,25 +83,33 @@ public abstract class PersitentSynchronizationProcess<T> { abstract protected Iterable<T> getItems() throws IOException; - abstract protected void processItem(HaaSFileTransfer tr, T p); + abstract protected void processItem(HaaSFileTransfer tr, T p) throws InterruptedIOException; private void doProcess(AtomicBoolean reRun) { + boolean interrupted = false; try (HaaSFileTransfer tr = fileTransferSupplier.get()) { TransferFileProgressForHaaSClient notifier; tr.setProgress(notifier = getTransferFileProgress(tr)); + runningTransferThreads.add(Thread.currentThread()); while (!toProcessQueue.isEmpty()) { T p = toProcessQueue.poll(); String item = p.toString(); notifier.addItem(item); - processItem(tr, p); - fileUploaded(p); + try { + processItem(tr, p); + fileUploaded(p); + } catch (InterruptedIOException e) { + toProcessQueue.clear(); + interrupted = true; + } notifier.itemDone(item); reRun.set(false); } + runningTransferThreads.remove(Thread.currentThread()); notifier.done(); } finally { synchronized (this) { - if (startFinished) { + if (startFinished && !interrupted && !Thread.interrupted()) { processFinishedNotifier.run(); } } diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java index 3a0011fafec5a618d3ed252f311d6f8bae43908b..268e5a46c3097ce8646ed2eaaab9ee84c92cc951 100644 --- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java @@ -1,6 +1,7 @@ package cz.it4i.fiji.haas.data_transfer; import java.io.IOException; +import java.io.InterruptedIOException; import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; @@ -37,7 +38,7 @@ public class Synchronization { private PersistentIndex<Path> filesDownloaded; - private PersitentSynchronizationProcess<Path> uploadProcess; + private PersistentSynchronizationProcess<Path> uploadProcess; private P_PersistentDownloadProcess downloadProcess; @@ -92,9 +93,9 @@ public class Synchronization { return !file.getFileName().toString().matches("[.][^.]+") && !filesDownloaded.contains(file); } - private PersitentSynchronizationProcess<Path> createUploadProcess(Supplier<HaaSFileTransfer> fileTransferSupplier, + private PersistentSynchronizationProcess<Path> createUploadProcess(Supplier<HaaSFileTransfer> fileTransferSupplier, ExecutorService service, Runnable uploadFinishedNotifier) throws IOException { - return new PersitentSynchronizationProcess<Path>(service, fileTransferSupplier, uploadFinishedNotifier, + return new PersistentSynchronizationProcess<Path>(service, fileTransferSupplier, uploadFinishedNotifier, workingDirectory.resolve(FILE_INDEX_TO_UPLOAD_FILENAME), pathResolver) { @Override @@ -106,7 +107,7 @@ public class Synchronization { } @Override - protected void processItem(HaaSFileTransfer tr, Path p) { + protected void processItem(HaaSFileTransfer tr, Path p) throws InterruptedIOException { UploadingFile uf = new UploadingFileImpl(p); tr.upload(uf); } @@ -132,7 +133,7 @@ public class Synchronization { return new P_PersistentDownloadProcess(service, fileTransferSupplier, uploadFinishedNotifier); } - private class P_PersistentDownloadProcess extends PersitentSynchronizationProcess<String>{ + private class P_PersistentDownloadProcess extends PersistentSynchronizationProcess<String>{ private Collection<String> items = Collections.emptyList(); @@ -152,7 +153,7 @@ public class Synchronization { } @Override - protected void processItem(HaaSFileTransfer tr, String file) { + protected void processItem(HaaSFileTransfer tr, String file) throws InterruptedIOException { filesDownloaded.insert(workingDirectory.resolve(file)); try { filesDownloaded.storeToFile(); diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/ui/TableViewContextMenu.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/ui/TableViewContextMenu.java index 0789e95c66f6a7124e5398cecb10e6a107a67923..2ea1c0d8d1a907e74bd43dc65c0630291dbb3552 100644 --- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/ui/TableViewContextMenu.java +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/ui/TableViewContextMenu.java @@ -5,12 +5,14 @@ import java.util.LinkedList; import java.util.function.BiConsumer; import java.util.function.BiPredicate; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.Predicate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javafx.event.EventHandler; +import javafx.scene.control.CheckMenuItem; import javafx.scene.control.ContextMenu; import javafx.scene.control.MenuItem; import javafx.scene.control.TableColumn; @@ -21,7 +23,7 @@ public class TableViewContextMenu<T> { public final static Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas.ui.TableViewContextMenu.class); - private final Collection<P_MenuItem> items = new LinkedList<>(); + private final Collection<P_Updetable<?>> items = new LinkedList<>(); private final Collection<P_MenuItemWithColumnIndex> itemsWithColumnIndex = new LinkedList<>(); private TableView<T> tableView; @@ -39,6 +41,11 @@ public class TableViewContextMenu<T> { public void addItem(String text, BiConsumer<T, Integer> eventHandler, BiPredicate<T, Integer> enableHandler) { itemsWithColumnIndex.add(new P_MenuItemWithColumnIndex(text, eventHandler, enableHandler)); } + + public void addItem(String text, Consumer<T> eventHandlerOn, Consumer<T> eventHandlerOff, + Predicate<T> enableHandler, Function<T, Boolean> property) { + items.add(new P_CheckMenuItem(text, eventHandlerOff, eventHandlerOn, enableHandler, property)); + } private T getRequestedItem() { return tableView.getFocusModel().getFocusedItem(); @@ -82,21 +89,35 @@ public class TableViewContextMenu<T> { return cm; } - private class P_MenuItem { - - private MenuItem item; - private Predicate<T> enableHandler; - - public P_MenuItem(String text, Consumer<T> eventHandler, Predicate<T> enableHandler) { + private class P_Updetable<I extends MenuItem> { + + private I item; + private Predicate<T> enableHandler; + + + public P_Updetable(I item, Predicate<T> enableHandler) { + this.item = item; this.enableHandler = enableHandler; - item = new MenuItem(text); - item.setOnAction(e -> eventHandler.accept(getRequestedItem())); - getOrCreateContextMenu().getItems().add(item); + getOrCreateContextMenu().getItems().add(getItem()); } public void updateEnable(T selected) { item.setDisable(!enableHandler.test(selected)); } + + protected I getItem() { + return item; + } + + } + + private class P_MenuItem extends P_Updetable<MenuItem>{ + public P_MenuItem(String text, Consumer<T> eventHandler, Predicate<T> enableHandler) { + super(new MenuItem(text), enableHandler); + + getItem().setOnAction(e -> eventHandler.accept(getRequestedItem())); + + } } @@ -119,4 +140,30 @@ public class TableViewContextMenu<T> { } + private class P_CheckMenuItem extends P_Updetable<CheckMenuItem>{ + + private Function<T, Boolean> property; + + public P_CheckMenuItem(String text, Consumer<T> eventHandlerOff, Consumer<T> eventHandlerOn, + Predicate<T> enableHandler, Function<T,Boolean> property) { + super(new CheckMenuItem(text), enableHandler); + this.property = property; + getItem().setOnAction(e -> { + boolean selected = getItem().isSelected(); + if(selected) { + eventHandlerOn.accept(getRequestedItem()); + } else { + eventHandlerOff.accept(getRequestedItem()); + } + }); + } + + @Override + public void updateEnable(T selected) { + super.updateEnable(selected); + getItem().setSelected(property.apply(selected)); + } + + } + } diff --git a/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSFileTransfer.java b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSFileTransfer.java index e3c751a3c89bbf44a9135d91b70f775dac3133db..59d482ece293f89140bab989a57ca47bc74aa12c 100644 --- a/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSFileTransfer.java +++ b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSFileTransfer.java @@ -1,6 +1,7 @@ package cz.it4i.fiji.haas_java_client; import java.io.Closeable; +import java.io.InterruptedIOException; import java.nio.file.Path; import java.util.List; @@ -13,9 +14,9 @@ public interface HaaSFileTransfer extends Closeable { @Override void close(); - void upload(UploadingFile files); + void upload(UploadingFile files) throws InterruptedIOException; - void download(String files, Path workDirectory); + void download(String files, Path workDirectory) throws InterruptedIOException; List<Long> obtainSize(List<String> files); diff --git a/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSFileTransferImp.java b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSFileTransferImp.java index ae970984461097d6c39fb106a9084dc04d508a80..90e117b58c83f970a0fb909f4cfe10ac73dc6b83 100644 --- a/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSFileTransferImp.java +++ b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSFileTransferImp.java @@ -3,6 +3,7 @@ package cz.it4i.fiji.haas_java_client; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.InterruptedIOException; import java.nio.file.Path; import java.util.LinkedList; import java.util.List; @@ -39,25 +40,30 @@ class HaaSFileTransferImp implements HaaSFileTransfer { } @Override - public void upload(UploadingFile file) { + public void upload(UploadingFile file) throws InterruptedIOException{ String destFile = "'" + ft.getSharedBasepath() + "/" + file.getName() + "'"; try (InputStream is = file.getInputStream()) { boolean result = scpClient.upload(is, destFile, file.getLength(), file.getLastTime(), progress); if (!result) { throw new HaaSClientException("Uploading of " + file + " to " + destFile + " failed"); } + } catch(InterruptedIOException e) { + throw e; } catch (JSchException | IOException e) { - throw new HaaSClientException(); + throw new HaaSClientException(e); } } @Override - public void download(String fileName, Path workDirectory) { + public void download(String fileName, Path workDirectory) throws InterruptedIOException{ try { fileName = fileName.replaceFirst("/", ""); Path rFile = workDirectory.resolve(fileName); String fileToDownload = "'" + ft.getSharedBasepath() + "/" + fileName + "'"; scpClient.download(fileToDownload, rFile, progress); + } catch(InterruptedIOException e) { + log.info("upload interrupted flag: " + Thread.currentThread().isInterrupted()); + throw e; } catch (JSchException | IOException e) { throw new HaaSClientException(e); } @@ -68,36 +74,6 @@ class HaaSFileTransferImp implements HaaSFileTransfer { this.progress = progress; } - /* - @Override - public void download(Iterable<String> files, Path workDirectory) { - List<Long> fileSizes; - try { - fileSizes = HaaSClient.getSizes(StreamSupport.stream(files.spliterator(), false) - .map(filename -> "'" + ft.getSharedBasepath() + "/" + filename + "'").collect(Collectors.toList()), - scpClient); - - final long totalFileSize = fileSizes.stream().mapToLong(i -> i.longValue()).sum(); - TransferFileProgressForHaaSClient progress = new TransferFileProgressForHaaSClient(totalFileSize, - HaaSClient.DUMMY_PROGRESS_NOTIFIER); - int idx = 0; - for (String fileName : files) { - fileName = fileName.replaceFirst("/", ""); - Path rFile = workDirectory.resolve(fileName); - String fileToDownload = "'" + ft.getSharedBasepath() + "/" + fileName + "'"; - String item; - progress.addItem(item = fileName); - progress.startNewFile(fileSizes.get(idx)); - scpClient.download(fileToDownload, rFile, progress); - progress.itemDone(item); - idx++; - } - } catch (JSchException | IOException e) { - throw new HaaSClientException(e); - } - } - */ - @Override public List<Long> obtainSize(List<String> files) { try { @@ -128,7 +104,7 @@ class HaaSFileTransferImp implements HaaSFileTransfer { } return result; } - + private String replaceIfFirstFirst(String fileName, String string, String string2) { if (fileName.length() < 0 && fileName.charAt(0) == '/') { fileName = fileName.substring(1); diff --git a/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/LambdaExceptionHandlerWrapper.java b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/LambdaExceptionHandlerWrapper.java new file mode 100644 index 0000000000000000000000000000000000000000..2abb7fc87cdd2c14791620d4f1b77355a7fec8dd --- /dev/null +++ b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/LambdaExceptionHandlerWrapper.java @@ -0,0 +1,16 @@ +package cz.it4i.fiji.haas_java_client; + +public class LambdaExceptionHandlerWrapper { + + public interface Runnable { + void run() throws Exception; + } + + public static void wrap(Runnable r) { + try { + r.run(); + } catch (Exception e) { + //ignore + } + } +} diff --git a/haas-java-client/src/test/java/cz/it4i/fiji/haas_java_client/TestHaaSJavaClient.java b/haas-java-client/src/test/java/cz/it4i/fiji/haas_java_client/TestHaaSJavaClient.java index 886a34da546abb8359aee67c0289610c62b82291..b8b693824565f80dc1ff91300cd5db7ba8074353 100644 --- a/haas-java-client/src/test/java/cz/it4i/fiji/haas_java_client/TestHaaSJavaClient.java +++ b/haas-java-client/src/test/java/cz/it4i/fiji/haas_java_client/TestHaaSJavaClient.java @@ -1,5 +1,7 @@ package cz.it4i.fiji.haas_java_client; +import static cz.it4i.fiji.haas_java_client.LambdaExceptionHandlerWrapper.wrap; + import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; @@ -49,7 +51,7 @@ public class TestHaaSJavaClient { if (info.getState() == JobState.Finished) { try (HaaSFileTransfer fileTransfer = client.startFileTransfer(jobId, HaaSClient.DUMMY_TRANSFER_FILE_PROGRESS)) { - client.getChangedFiles(jobId).forEach(file -> fileTransfer.download(file, workDir)); + client.getChangedFiles(jobId).forEach(file -> wrap(() -> fileTransfer.download(file, workDir))); } } log.info("JobId :" + jobId + ", state" + info.getState()); diff --git a/haas-java-client/src/test/java/cz/it4i/fiji/haas_java_client/TestHaaSJavaClientWithSPIM.java b/haas-java-client/src/test/java/cz/it4i/fiji/haas_java_client/TestHaaSJavaClientWithSPIM.java index c4afe21c9197219255e1885608c99eb5fff9266c..1feeda28fab960db550d1500923058b288ffc4dc 100644 --- a/haas-java-client/src/test/java/cz/it4i/fiji/haas_java_client/TestHaaSJavaClientWithSPIM.java +++ b/haas-java-client/src/test/java/cz/it4i/fiji/haas_java_client/TestHaaSJavaClientWithSPIM.java @@ -14,24 +14,24 @@ import org.slf4j.LoggerFactory; import cz.it4i.fiji.haas_java_client.HaaSClient.SynchronizableFiles; import cz.it4i.fiji.haas_java_client.proxy.JobFileContentExt; +import static cz.it4i.fiji.haas_java_client.LambdaExceptionHandlerWrapper.wrap; public class TestHaaSJavaClientWithSPIM { private static Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas_java_client.TestHaaSJavaClientWithSPIM.class); - + public static void main(String[] args) throws ServiceException, IOException { HaaSClient client = new HaaSClient(TestingConstants.getSettings(2, 9600, 6l, "DD-17-31")); Path baseDir = Paths.get("/home/koz01/Work/vyzkumnik/fiji/work/aaa"); - long jobId = client.createJob( "TestOutRedirect", - Collections.emptyList()); - - try(HaaSFileTransfer tr = client.startFileTransfer(jobId, HaaSClient.DUMMY_TRANSFER_FILE_PROGRESS)) { + long jobId = client.createJob("TestOutRedirect", Collections.emptyList()); + + try (HaaSFileTransfer tr = client.startFileTransfer(jobId, HaaSClient.DUMMY_TRANSFER_FILE_PROGRESS)) { StreamSupport.stream(getAllFiles(baseDir.resolve("spim-data")).spliterator(), false) - .map(UploadingFileImpl::new).forEach(f -> tr.upload(f)); + .map(UploadingFileImpl::new).forEach(f -> wrap(() -> tr.upload(f))); } client.submitJob(jobId); - + Path workDir = baseDir.resolve("" + jobId); if (!Files.isDirectory(workDir)) { Files.createDirectories(workDir); @@ -53,8 +53,9 @@ public class TestHaaSJavaClientWithSPIM { } client.downloadPartsOfJobFiles(jobId, taskFileOffset).forEach(jfc -> showJFC(jfc)); if (info.getState() == JobState.Finished) { - try (HaaSFileTransfer fileTransfer = client.startFileTransfer(jobId, HaaSClient.DUMMY_TRANSFER_FILE_PROGRESS)) { - client.getChangedFiles(jobId).forEach(file -> fileTransfer.download(file, workDir)); + try (HaaSFileTransfer fileTransfer = client.startFileTransfer(jobId, + HaaSClient.DUMMY_TRANSFER_FILE_PROGRESS)) { + client.getChangedFiles(jobId).forEach(file -> wrap(() -> fileTransfer.download(file, workDir))); } } diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java index a19610bbce1a5b9e512796b8399b82c325d80925..924bf2b766fc296468c196b0d18198b00e1d16b3 100644 --- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java +++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java @@ -434,6 +434,18 @@ public class BenchmarkJobManager { public boolean isUploaded() { return job.isUploaded(); } + + public void stopDownload() { + job.stopDownloadData(); + } + + public boolean needsDownload() { + return job.needsDownload(); + } + + public boolean needsUpload() { + return job.needsUpload(); + } } public BenchmarkJobManager(BenchmarkSPIMParameters params) throws IOException { diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/UpdatableBenchmarkJob.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/UpdatableBenchmarkJob.java index 10fcbd2243d54f4a9ce0fac16bb1a93f6bfce57d..5de718cc70e19ca8af4b125d1c26df85e463c076 100644 --- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/UpdatableBenchmarkJob.java +++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/UpdatableBenchmarkJob.java @@ -18,9 +18,9 @@ public class UpdatableBenchmarkJob extends UpdatableObservableValue<BenchmarkJob .getLogger(cz.it4i.fiji.haas_spim_benchmark.core.UpdatableBenchmarkJob.class); private P_TransferProgress downloadProgress = new P_TransferProgress(val -> getValue().setDownloaded(val), - () -> getValue().isDownloaded()); + () -> getValue().isDownloaded(), () -> getValue().needsDownload()); private P_TransferProgress uploadProgress = new P_TransferProgress(val -> getValue().setUploaded(val), - () -> getValue().isUploaded()); + () -> getValue().isUploaded(), () -> getValue().needsUpload()); private Executor executor; public interface TransferProgress { @@ -58,28 +58,29 @@ public class UpdatableBenchmarkJob extends UpdatableObservableValue<BenchmarkJob @Override protected void fireValueChangedEvent() { - executor.execute(() -> super.fireValueChangedEvent()); + executor.execute(() -> { + super.fireValueChangedEvent(); + }); } private class P_TransferProgress implements Progress, TransferProgress { - private boolean working; - // private boolean done; private long start; private Long remainingMiliseconds; private Float remainingPercents; private Supplier<Boolean> doneStatusSupplier; private Consumer<Boolean> doneStatusConsumer; + private Supplier<Boolean> workingSupplier; - public P_TransferProgress(Consumer<Boolean> doneStatusConsumer, Supplier<Boolean> doneStatusSupplier) { + public P_TransferProgress(Consumer<Boolean> doneStatusConsumer, Supplier<Boolean> doneStatusSupplier, Supplier<Boolean> workingSupplier) { this.doneStatusConsumer = doneStatusConsumer; this.doneStatusSupplier = doneStatusSupplier; + this.workingSupplier = workingSupplier; } @Override public synchronized void setCount(int count, int total) { if (total < -1) { - working = false; remainingMiliseconds = null; remainingPercents = null; } else { @@ -92,20 +93,14 @@ public class UpdatableBenchmarkJob extends UpdatableObservableValue<BenchmarkJob @Override public synchronized void addItem(Object item) { - if (!working) { - setDone(false); - working = true; - start = System.currentTimeMillis(); - } + setDone(false); + start = System.currentTimeMillis(); + fireValueChangedEvent(); } @Override public synchronized void done() { - if (working) { - setDone(true); - } - working = false; remainingMiliseconds = 0l; remainingPercents = 0.f; fireValueChangedEvent(); @@ -113,7 +108,7 @@ public class UpdatableBenchmarkJob extends UpdatableObservableValue<BenchmarkJob @Override public synchronized boolean isWorking() { - return working; + return workingSupplier.get(); } @Override diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/BenchmarkSPIMController.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/BenchmarkSPIMController.java index a8e6ef50583ba256f1816eac02f2e28ce58978bb..a4a6e994fad856689fea06494a2d28ceb8a4fc4f 100644 --- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/BenchmarkSPIMController.java +++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/BenchmarkSPIMController.java @@ -110,13 +110,19 @@ public class BenchmarkSPIMController extends BorderPane implements CloseableCont || j.getState() == JobState.Failed || j.getState() == JobState.Canceled)); menu.addItem("Upload data", job -> executeWSCallAsync("Uploading data", p -> job.getValue().startUpload()), - job -> JavaFXRoutines.notNullValue(job, j -> !EnumSet.of(JobState.Running).contains(j.getState()))); + job -> executeWSCallAsync("Stop uploading data", p -> job.getValue().stopUpload()), + job -> JavaFXRoutines.notNullValue(job, j -> !EnumSet.of(JobState.Running).contains(j.getState())), + job -> registry.get(job.getValue()).getUploadProgress().isWorking()); + menu.addItem("Download result", job -> executeWSCallAsync("Downloading data", p -> job.getValue().startDownload()), + job -> executeWSCallAsync("Stop downloading data", p -> job.getValue().stopDownload()), job -> JavaFXRoutines.notNullValue(job, j -> EnumSet.of(JobState.Failed, JobState.Finished, JobState.Canceled).contains(j.getState()) - && !j.canBeDownloaded())); + && j.canBeDownloaded()), + job -> registry.get(job.getValue()).getDownloadProgress().isWorking()); + menu.addItem("Download statistics", job -> executeWSCallAsync("Downloading data", p -> job.getValue().downloadStatistics(p)), job -> JavaFXRoutines.notNullValue(job, j -> j.getState() == JobState.Finished)); diff --git a/java-scpclient/pom.xml b/java-scpclient/pom.xml index 13a71e9d10be0a2da911d26aa08cf9b6bfbb48a5..a0dc939bccf7616df58f1cdaa78d9849c0aea4e8 100644 --- a/java-scpclient/pom.xml +++ b/java-scpclient/pom.xml @@ -32,5 +32,18 @@ <artifactId>jsch</artifactId> <version>0.1.54</version> </dependency> + <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>1.7.25</version> + </dependency> + <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-jdk14 --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + <version>1.7.25</version> + <optional>true</optional> + </dependency> </dependencies> </project> diff --git a/java-scpclient/src/main/java/cz/it4i/fiji/scpclient/ScpClient.java b/java-scpclient/src/main/java/cz/it4i/fiji/scpclient/ScpClient.java index a25697caf0d88cfb3c9daa69c7429fc79710fb06..f3e79881244969f20ed0b5d772f73381599b5e06 100644 --- a/java-scpclient/src/main/java/cz/it4i/fiji/scpclient/ScpClient.java +++ b/java-scpclient/src/main/java/cz/it4i/fiji/scpclient/ScpClient.java @@ -3,14 +3,19 @@ package cz.it4i.fiji.scpclient; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; +import java.io.InterruptedIOException; import java.io.OutputStream; import java.io.UnsupportedEncodingException; +import java.nio.channels.ClosedByInterruptException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.List; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.jcraft.jsch.Channel; import com.jcraft.jsch.ChannelExec; import com.jcraft.jsch.ChannelSftp; @@ -24,6 +29,9 @@ import com.jcraft.jsch.UserInfo; public class ScpClient implements Closeable { + public static final Logger log = LoggerFactory.getLogger(cz.it4i.fiji.scpclient.ScpClient.class); + + private String hostName; private String username; private JSch jsch = new JSch(); @@ -237,6 +245,9 @@ public class ScpClient implements Closeable { } out.close(); + } catch (ClosedByInterruptException e) { + Thread.interrupted(); + throw new InterruptedIOException(); } finally { channel.disconnect(); }