From 79c2786b77e5c1654f870e0070cdae85189f36e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Ko=C5=BEusznik?= <jan@kozusznik.cz> Date: Thu, 17 May 2018 13:36:54 +0200 Subject: [PATCH] feat: split input and output directory --- .../haas/data_transfer/Synchronization.java | 93 +++++++++++-------- 1 file changed, 52 insertions(+), 41 deletions(-) diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java index 5e17ff9d..d22bfda4 100644 --- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java @@ -6,12 +6,12 @@ import java.io.InterruptedIOException; import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.Paths; import java.util.Collection; import java.util.Collections; import java.util.LinkedList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -24,20 +24,22 @@ import cz.it4i.fiji.haas_java_client.ProgressNotifier; import cz.it4i.fiji.haas_java_client.UploadingFile; import cz.it4i.fiji.haas_java_client.UploadingFileImpl; -public class Synchronization implements Closeable{ +public class Synchronization implements Closeable { public static final Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas.data_transfer.Synchronization.class); - + private static final String FILE_INDEX_TO_UPLOAD_FILENAME = ".toUploadFiles"; - + private static final String FILE_INDEX_TO_DOWNLOAD_FILENAME = ".toDownloadFiles"; - + private static final String FILE_INDEX_DOWNLOADED_FILENAME = ".downloaded"; - + private final Path workingDirectory; - - private final Function<String,Path> pathResolver; - + + private final Path inputDirectory; + + private final Path outputDirectory; + private final PersistentIndex<Path> filesDownloaded; private final PersistentSynchronizationProcess<Path> uploadProcess; @@ -45,14 +47,22 @@ public class Synchronization implements Closeable{ private final P_PersistentDownloadProcess downloadProcess; private final ExecutorService service; - + public Synchronization(Supplier<HaaSFileTransfer> fileTransferSupplier, Path workingDirectory, - Runnable uploadFinishedNotifier, Runnable downloadFinishedNotifier) throws IOException { - this.service = Executors.newFixedThreadPool(2); + Runnable uploadFinishedNotifier, Runnable downloadFinishedNotifier) throws IOException { + this(fileTransferSupplier, workingDirectory, workingDirectory, workingDirectory, uploadFinishedNotifier, + downloadFinishedNotifier); + } + + public Synchronization(Supplier<HaaSFileTransfer> fileTransferSupplier, Path workingDirectory, Path inputDirectory, + Path outputDirectory, Runnable uploadFinishedNotifier, Runnable downloadFinishedNotifier) + throws IOException { this.workingDirectory = workingDirectory; - this.pathResolver = name -> workingDirectory.resolve(name); + this.inputDirectory = inputDirectory; + this.outputDirectory = outputDirectory; + this.service = Executors.newFixedThreadPool(2); this.filesDownloaded = new PersistentIndex<>(workingDirectory.resolve(FILE_INDEX_DOWNLOADED_FILENAME), - pathResolver); + name -> Paths.get(name)); this.uploadProcess = createUploadProcess(fileTransferSupplier, service, uploadFinishedNotifier); this.downloadProcess = createDownloadProcess(fileTransferSupplier, service, downloadFinishedNotifier); } @@ -60,16 +70,16 @@ public class Synchronization implements Closeable{ public synchronized void setUploadNotifier(ProgressNotifier notifier) { uploadProcess.setNotifier(notifier); } - + public void setDownloadNotifier(ProgressNotifier notifier) { downloadProcess.setNotifier(notifier); - + } public synchronized void startUpload() throws IOException { uploadProcess.start(); } - + public void stopUpload() throws IOException { uploadProcess.stop(); } @@ -77,16 +87,16 @@ public class Synchronization implements Closeable{ public void resumeUpload() { uploadProcess.resume(); } - + public synchronized void startDownload(Collection<String> files) throws IOException { this.downloadProcess.setItems(files); this.downloadProcess.start(); } - + public synchronized void stopDownload() throws IOException { this.downloadProcess.stop(); } - + public synchronized void resumeDownload() { this.downloadProcess.resume(); } @@ -99,23 +109,24 @@ public class Synchronization implements Closeable{ } private boolean canUpload(Path file) { - + return !file.getFileName().toString().matches("[.][^.]+") && !filesDownloaded.contains(file); } private PersistentSynchronizationProcess<Path> createUploadProcess(Supplier<HaaSFileTransfer> fileTransferSupplier, ExecutorService service, Runnable uploadFinishedNotifier) throws IOException { return new PersistentSynchronizationProcess<Path>(service, fileTransferSupplier, uploadFinishedNotifier, - workingDirectory.resolve(FILE_INDEX_TO_UPLOAD_FILENAME), pathResolver) { + workingDirectory.resolve(FILE_INDEX_TO_UPLOAD_FILENAME), name -> inputDirectory.resolve(name)) { @Override protected Iterable<Path> getItems() throws IOException { - try(DirectoryStream<Path> ds = Files.newDirectoryStream(workingDirectory,Synchronization.this::canUpload)) { - return StreamSupport.stream(ds.spliterator(), false).collect(Collectors.toList()); + try (DirectoryStream<Path> ds = Files.newDirectoryStream(inputDirectory, + Synchronization.this::canUpload)) { + return StreamSupport.stream(ds.spliterator(), false).collect(Collectors.toList()); } - + } - + @Override protected void processItem(HaaSFileTransfer tr, Path p) throws InterruptedIOException { UploadingFile uf = new UploadingFileImpl(p); @@ -124,29 +135,28 @@ public class Synchronization implements Closeable{ @Override protected long getTotalSize(Iterable<Path> items, HaaSFileTransfer tr) { - return StreamSupport.stream(items.spliterator(), false).map(p->{ + return StreamSupport.stream(items.spliterator(), false).map(p -> { try { return Files.size(p); } catch (IOException e) { log.error(e.getMessage(), e); - return 0; + return 0; } - }).collect(Collectors.summingLong(val->val.longValue())); + }).collect(Collectors.summingLong(val -> val.longValue())); } }; } - private P_PersistentDownloadProcess createDownloadProcess( - Supplier<HaaSFileTransfer> fileTransferSupplier, ExecutorService service, - Runnable uploadFinishedNotifier) throws IOException { - + private P_PersistentDownloadProcess createDownloadProcess(Supplier<HaaSFileTransfer> fileTransferSupplier, + ExecutorService service, Runnable uploadFinishedNotifier) throws IOException { + return new P_PersistentDownloadProcess(service, fileTransferSupplier, uploadFinishedNotifier); } - - private class P_PersistentDownloadProcess extends PersistentSynchronizationProcess<String>{ + + private class P_PersistentDownloadProcess extends PersistentSynchronizationProcess<String> { private Collection<String> items = Collections.emptyList(); - + public P_PersistentDownloadProcess(ExecutorService service, Supplier<HaaSFileTransfer> fileTransferSupplier, Runnable processFinishedNotifier) throws IOException { super(service, fileTransferSupplier, processFinishedNotifier, @@ -156,7 +166,7 @@ public class Synchronization implements Closeable{ private synchronized void setItems(Collection<String> items) { this.items = new LinkedList<>(items); } - + @Override protected synchronized Iterable<String> getItems() throws IOException { return items; @@ -164,19 +174,20 @@ public class Synchronization implements Closeable{ @Override protected void processItem(HaaSFileTransfer tr, String file) throws InterruptedIOException { - filesDownloaded.insert(workingDirectory.resolve(file)); + filesDownloaded.insert(outputDirectory.resolve(file)); try { filesDownloaded.storeToWorkingFile(); } catch (IOException e) { log.error(e.getMessage(), e); } - tr.download(file, workingDirectory); + tr.download(file, outputDirectory); } @Override protected long getTotalSize(Iterable<String> items, HaaSFileTransfer tr) throws InterruptedIOException { - return tr.obtainSize( StreamSupport.stream(items.spliterator(), false).collect(Collectors.toList())).stream().collect(Collectors.summingLong(val->val)); + return tr.obtainSize(StreamSupport.stream(items.spliterator(), false).collect(Collectors.toList())).stream() + .collect(Collectors.summingLong(val -> val)); } - + } } -- GitLab