diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/FileIndex.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/PersistentIndex.java similarity index 61% rename from haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/FileIndex.java rename to haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/PersistentIndex.java index b3a8753b9dd77de2b03099d5348a02f5b3ebddd0..9a66b392b7c6956a42080565fa038e46eb96ddc4 100644 --- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/FileIndex.java +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/PersistentIndex.java @@ -5,46 +5,58 @@ import java.io.BufferedWriter; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.Paths; import java.util.LinkedHashSet; import java.util.Queue; import java.util.Set; +import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class FileIndex { +public class PersistentIndex<T> { - public static final Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas.data_transfer.FileIndex.class); + public static final Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas.data_transfer.PersistentIndex.class); private Path workingFile; - private Set<Path> files = new LinkedHashSet<>(); + private Set<T> files = new LinkedHashSet<>(); - public FileIndex(Path workingFile) throws IOException { + private Function<String, T> fromString; + + public PersistentIndex(Path workingFile,Function<String, T> fromString) throws IOException { this.workingFile = workingFile; + this.fromString = fromString; loadFromFile(); } public synchronized void storeToFile() throws IOException { try (BufferedWriter bw = Files.newBufferedWriter(workingFile)) { - for (Path file : files) { + for (T file : files) { bw.write(file.toString() + "\n"); } } } - public synchronized boolean insert(Path file) { + public synchronized boolean insert(T file) { return files.add(file); } - public synchronized void uploaded(Path p) { + public synchronized void remove(T p) { files.remove(p); } - public synchronized void fillQueue(Queue<Path> toUpload) { + public synchronized void fillQueue(Queue<T> toUpload) { toUpload.addAll(files); } + + public synchronized void clear() throws IOException { + files.clear(); + storeToFile(); + } + + public synchronized boolean contains(Path file) { + return files.contains(file); + } private void loadFromFile() throws IOException { files.clear(); @@ -59,11 +71,8 @@ public class FileIndex { } private void processLine(String line) { - files.add(Paths.get(line)); + files.add(fromString.apply(line)); } - public void clear() throws IOException { - files.clear(); - storeToFile(); - } + } diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/PersitentSynchronizationProcess.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/PersitentSynchronizationProcess.java new file mode 100644 index 0000000000000000000000000000000000000000..6d9336153578471ce3cc3933e6665421f6ce6508 --- /dev/null +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/PersitentSynchronizationProcess.java @@ -0,0 +1,105 @@ +package cz.it4i.fiji.haas.data_transfer; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Queue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import cz.it4i.fiji.haas_java_client.HaaSFileTransfer; + +public abstract class PersitentSynchronizationProcess<T> { + + private boolean startFinished = true; + + public static final Logger log = LoggerFactory + .getLogger(cz.it4i.fiji.haas.data_transfer.PersitentSynchronizationProcess.class); + + private PersistentIndex<T> index; + + private Queue<T> toProcessQueue = new LinkedBlockingQueue<T>(); + + private SimpleThreadRunner runner; + + private Supplier<HaaSFileTransfer> fileTransferSupplier; + + private Runnable processFinishedNotifier; + + private String name; + + public PersitentSynchronizationProcess(String name,ExecutorService service, Supplier<HaaSFileTransfer> fileTransferSupplier, Runnable processFinishedNotifier, Path indexFile,Function<String,T> convertor) throws IOException { + runner = new SimpleThreadRunner(service); + this.name = name; + this.fileTransferSupplier = fileTransferSupplier; + this.processFinishedNotifier = processFinishedNotifier; + this.index = new PersistentIndex<>(indexFile, convertor); + } + + public synchronized void start() throws IOException { + startFinished = false; + index.clear(); + try{ + for (T item : getItems()) { + index.insert(item); + toProcessQueue.add(item); + } + runner.runIfNotRunning(this::doProcess); + } finally { + startFinished = true; + index.storeToFile(); + } + } + + + + public void stop() throws IOException { + toProcessQueue.clear(); + index.clear(); + } + + public void resume() { + index.fillQueue(toProcessQueue); + runner.runIfNotRunning(this::doProcess); + } + + abstract protected Iterable<T> getItems() throws IOException; + + abstract protected void processItem(HaaSFileTransfer tr, T p); + + private void doProcess(AtomicBoolean reRun) { + try(HaaSFileTransfer tr = fileTransferSupplier.get()) { + while (!toProcessQueue.isEmpty()) { + T p = toProcessQueue.poll(); + + log.info(name + "ing: " + p); + processItem(tr, p); + fileUploaded(p); + log.info(name + "ed: " + p); + reRun.set(false); + } + } finally { + synchronized (this) { + if (startFinished) { + processFinishedNotifier.run(); + } + } + } + } + + private void fileUploaded(T p) { + try { + index.remove(p); + index.storeToFile(); + } catch (IOException e) { + log.error(e.getMessage(), e); + } + } + + +} diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java index 30680c25424af2f615099275cc56488dbd1e7faf..dd0de6ac200c55cdfc646b067ee650f69e416736 100644 --- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java @@ -4,13 +4,14 @@ import java.io.IOException; import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; -import java.util.Arrays; import java.util.Collection; -import java.util.Queue; +import java.util.Collections; +import java.util.LinkedList; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,104 +30,114 @@ public class Synchronization { private static final String FILE_INDEX_DOWNLOADED_FILENAME = ".downloaded"; - - private Supplier<HaaSFileTransfer> fileTransferSupplier; - private Path workingDirectory; - private Queue<Path> toUpload = new LinkedBlockingQueue<>(); - - private FileIndex filesToUpload; + private Function<String,Path> pathResolver = name -> workingDirectory.resolve(name); - private FileIndex filesToDownload; - - private FileIndex filesDownloaded; + private PersistentIndex<Path> filesDownloaded; + + private PersitentSynchronizationProcess<Path> uploadProcess; + + private P_PersistentDownloadProcess downloadProcess; - private SimpleThreadRunner runnerForUpload; - private boolean startUploadFinished; - - private Runnable uploadFinishedNotifier; public Synchronization(Supplier<HaaSFileTransfer> fileTransferSupplier, Path workingDirectory, - ExecutorService service, Runnable uploadFinishedNotifier ) throws IOException { - this.fileTransferSupplier = fileTransferSupplier; + ExecutorService service, Runnable uploadFinishedNotifier) throws IOException { + this.workingDirectory = workingDirectory; - this.filesToUpload = new FileIndex(workingDirectory.resolve(FILE_INDEX_TO_UPLOAD_FILENAME)); - this.filesToDownload = new FileIndex(workingDirectory.resolve(FILE_INDEX_TO_DOWNLOAD_FILENAME)); - this.filesDownloaded = new FileIndex(workingDirectory.resolve(FILE_INDEX_DOWNLOADED_FILENAME)); - this.runnerForUpload = new SimpleThreadRunner(service); - this.uploadFinishedNotifier = uploadFinishedNotifier; + this.filesDownloaded = new PersistentIndex<>(workingDirectory.resolve(FILE_INDEX_DOWNLOADED_FILENAME), + pathResolver); + this.uploadProcess = createUploadProcess(fileTransferSupplier, service, uploadFinishedNotifier); + this.downloadProcess = createDownloadProcess(fileTransferSupplier, service, uploadFinishedNotifier); } public synchronized void startUpload() throws IOException { - startUploadFinished = false; - filesToUpload.clear(); - try(DirectoryStream<Path> ds = Files.newDirectoryStream(workingDirectory,this::isNotHidden)) { - for (Path file : ds) { - filesToUpload.insert(file); - toUpload.add(file); - runnerForUpload.runIfNotRunning(this::doUpload); - } - } finally { - startUploadFinished = true; - filesToUpload.storeToFile(); - - } + uploadProcess.start(); } public void stopUpload() throws IOException { - toUpload.clear(); - filesToUpload.clear(); + uploadProcess.stop(); } public void resumeUpload() { - filesToUpload.fillQueue(toUpload); - if(!toUpload.isEmpty()) { - runnerForUpload.runIfNotRunning(this::doUpload); - } + uploadProcess.resume(); } public synchronized void startDownload(Collection<String> files) throws IOException { - filesToDownload.clear(); - + this.downloadProcess.setItems(files); + this.downloadProcess.start(); + } + + public synchronized void stopDownload() throws IOException { + this.downloadProcess.stop(); + } + + public synchronized void resumeDownload() { + this.downloadProcess.resume(); } - private boolean isNotHidden(Path file) { + private boolean canUpload(Path file) { - return !file.getFileName().toString().matches("[.][^.]+"); + return !file.getFileName().toString().matches("[.][^.]+") && !filesDownloaded.contains(file); } + + private PersitentSynchronizationProcess<Path> createUploadProcess(Supplier<HaaSFileTransfer> fileTransferSupplier, + ExecutorService service, Runnable uploadFinishedNotifier) throws IOException { + return new PersitentSynchronizationProcess<Path>("upload", service, fileTransferSupplier, uploadFinishedNotifier, + workingDirectory.resolve(FILE_INDEX_TO_UPLOAD_FILENAME), pathResolver) { + + @Override + protected Iterable<Path> getItems() throws IOException { + try(DirectoryStream<Path> ds = Files.newDirectoryStream(workingDirectory,Synchronization.this::canUpload)) { + return StreamSupport.stream(ds.spliterator(), false).collect(Collectors.toList()); + } + } - private void doUpload(AtomicBoolean reRun) { - try(HaaSFileTransfer tr = fileTransferSupplier.get()) { - while (!toUpload.isEmpty()) { - Path p = toUpload.poll(); - UploadingFile uf = createUploadingFile(p); - log.info("upload: " + p); + @Override + protected void processItem(HaaSFileTransfer tr, Path p) { + UploadingFile uf = new UploadingFileImpl(p); tr.upload(uf); - fileUploaded(p); - log.info("uploaded: " + p); - reRun.set(false); } - } finally { - synchronized (this) { - if (startUploadFinished) { - uploadFinishedNotifier.run(); - } - } - } + }; } - private void fileUploaded(Path p) { - try { - filesToUpload.uploaded(p); - filesToUpload.storeToFile(); - } catch (IOException e) { - log.error(e.getMessage(), e); - } + private P_PersistentDownloadProcess createDownloadProcess( + Supplier<HaaSFileTransfer> fileTransferSupplier, ExecutorService service, + Runnable uploadFinishedNotifier) throws IOException { + + return new P_PersistentDownloadProcess(service, fileTransferSupplier, uploadFinishedNotifier); } + + private class P_PersistentDownloadProcess extends PersitentSynchronizationProcess<String>{ + + private Collection<String> items = Collections.emptyList(); + + public P_PersistentDownloadProcess(ExecutorService service, Supplier<HaaSFileTransfer> fileTransferSupplier, + Runnable processFinishedNotifier) throws IOException { + super("download",service, fileTransferSupplier, processFinishedNotifier, + workingDirectory.resolve(FILE_INDEX_TO_DOWNLOAD_FILENAME), name -> name); + } - private UploadingFile createUploadingFile(Path p) { - return new UploadingFileImpl(p); + private synchronized void setItems(Collection<String> items) { + this.items = new LinkedList<>(items); + } + + @Override + protected synchronized Iterable<String> getItems() throws IOException { + return items; + } + + @Override + protected void processItem(HaaSFileTransfer tr, String file) { + tr.download(file, workingDirectory); + filesDownloaded.insert(workingDirectory.resolve(file)); + try { + filesDownloaded.storeToFile(); + } catch (IOException e) { + log.error(e.getMessage(), e); + } + } + } }