Skip to content
Snippets Groups Projects
Commit c9094e1f authored by Jan Kožusznik's avatar Jan Kožusznik
Browse files

feat: download

parent 48532e0c
No related branches found
No related tags found
No related merge requests found
......@@ -5,46 +5,58 @@ import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedHashSet;
import java.util.Queue;
import java.util.Set;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FileIndex {
public class PersistentIndex<T> {
public static final Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas.data_transfer.FileIndex.class);
public static final Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas.data_transfer.PersistentIndex.class);
private Path workingFile;
private Set<Path> files = new LinkedHashSet<>();
private Set<T> files = new LinkedHashSet<>();
public FileIndex(Path workingFile) throws IOException {
private Function<String, T> fromString;
public PersistentIndex(Path workingFile,Function<String, T> fromString) throws IOException {
this.workingFile = workingFile;
this.fromString = fromString;
loadFromFile();
}
public synchronized void storeToFile() throws IOException {
try (BufferedWriter bw = Files.newBufferedWriter(workingFile)) {
for (Path file : files) {
for (T file : files) {
bw.write(file.toString() + "\n");
}
}
}
public synchronized boolean insert(Path file) {
public synchronized boolean insert(T file) {
return files.add(file);
}
public synchronized void uploaded(Path p) {
public synchronized void remove(T p) {
files.remove(p);
}
public synchronized void fillQueue(Queue<Path> toUpload) {
public synchronized void fillQueue(Queue<T> toUpload) {
toUpload.addAll(files);
}
public synchronized void clear() throws IOException {
files.clear();
storeToFile();
}
public synchronized boolean contains(Path file) {
return files.contains(file);
}
private void loadFromFile() throws IOException {
files.clear();
......@@ -59,11 +71,8 @@ public class FileIndex {
}
private void processLine(String line) {
files.add(Paths.get(line));
files.add(fromString.apply(line));
}
public void clear() throws IOException {
files.clear();
storeToFile();
}
}
package cz.it4i.fiji.haas.data_transfer;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cz.it4i.fiji.haas_java_client.HaaSFileTransfer;
public abstract class PersitentSynchronizationProcess<T> {
private boolean startFinished = true;
public static final Logger log = LoggerFactory
.getLogger(cz.it4i.fiji.haas.data_transfer.PersitentSynchronizationProcess.class);
private PersistentIndex<T> index;
private Queue<T> toProcessQueue = new LinkedBlockingQueue<T>();
private SimpleThreadRunner runner;
private Supplier<HaaSFileTransfer> fileTransferSupplier;
private Runnable processFinishedNotifier;
private String name;
public PersitentSynchronizationProcess(String name,ExecutorService service, Supplier<HaaSFileTransfer> fileTransferSupplier, Runnable processFinishedNotifier, Path indexFile,Function<String,T> convertor) throws IOException {
runner = new SimpleThreadRunner(service);
this.name = name;
this.fileTransferSupplier = fileTransferSupplier;
this.processFinishedNotifier = processFinishedNotifier;
this.index = new PersistentIndex<>(indexFile, convertor);
}
public synchronized void start() throws IOException {
startFinished = false;
index.clear();
try{
for (T item : getItems()) {
index.insert(item);
toProcessQueue.add(item);
}
runner.runIfNotRunning(this::doProcess);
} finally {
startFinished = true;
index.storeToFile();
}
}
public void stop() throws IOException {
toProcessQueue.clear();
index.clear();
}
public void resume() {
index.fillQueue(toProcessQueue);
runner.runIfNotRunning(this::doProcess);
}
abstract protected Iterable<T> getItems() throws IOException;
abstract protected void processItem(HaaSFileTransfer tr, T p);
private void doProcess(AtomicBoolean reRun) {
try(HaaSFileTransfer tr = fileTransferSupplier.get()) {
while (!toProcessQueue.isEmpty()) {
T p = toProcessQueue.poll();
log.info(name + "ing: " + p);
processItem(tr, p);
fileUploaded(p);
log.info(name + "ed: " + p);
reRun.set(false);
}
} finally {
synchronized (this) {
if (startFinished) {
processFinishedNotifier.run();
}
}
}
}
private void fileUploaded(T p) {
try {
index.remove(p);
index.storeToFile();
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}
}
......@@ -4,13 +4,14 @@ import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.Queue;
import java.util.Collections;
import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -29,104 +30,114 @@ public class Synchronization {
private static final String FILE_INDEX_DOWNLOADED_FILENAME = ".downloaded";
private Supplier<HaaSFileTransfer> fileTransferSupplier;
private Path workingDirectory;
private Queue<Path> toUpload = new LinkedBlockingQueue<>();
private FileIndex filesToUpload;
private Function<String,Path> pathResolver = name -> workingDirectory.resolve(name);
private FileIndex filesToDownload;
private FileIndex filesDownloaded;
private PersistentIndex<Path> filesDownloaded;
private PersitentSynchronizationProcess<Path> uploadProcess;
private P_PersistentDownloadProcess downloadProcess;
private SimpleThreadRunner runnerForUpload;
private boolean startUploadFinished;
private Runnable uploadFinishedNotifier;
public Synchronization(Supplier<HaaSFileTransfer> fileTransferSupplier, Path workingDirectory,
ExecutorService service, Runnable uploadFinishedNotifier ) throws IOException {
this.fileTransferSupplier = fileTransferSupplier;
ExecutorService service, Runnable uploadFinishedNotifier) throws IOException {
this.workingDirectory = workingDirectory;
this.filesToUpload = new FileIndex(workingDirectory.resolve(FILE_INDEX_TO_UPLOAD_FILENAME));
this.filesToDownload = new FileIndex(workingDirectory.resolve(FILE_INDEX_TO_DOWNLOAD_FILENAME));
this.filesDownloaded = new FileIndex(workingDirectory.resolve(FILE_INDEX_DOWNLOADED_FILENAME));
this.runnerForUpload = new SimpleThreadRunner(service);
this.uploadFinishedNotifier = uploadFinishedNotifier;
this.filesDownloaded = new PersistentIndex<>(workingDirectory.resolve(FILE_INDEX_DOWNLOADED_FILENAME),
pathResolver);
this.uploadProcess = createUploadProcess(fileTransferSupplier, service, uploadFinishedNotifier);
this.downloadProcess = createDownloadProcess(fileTransferSupplier, service, uploadFinishedNotifier);
}
public synchronized void startUpload() throws IOException {
startUploadFinished = false;
filesToUpload.clear();
try(DirectoryStream<Path> ds = Files.newDirectoryStream(workingDirectory,this::isNotHidden)) {
for (Path file : ds) {
filesToUpload.insert(file);
toUpload.add(file);
runnerForUpload.runIfNotRunning(this::doUpload);
}
} finally {
startUploadFinished = true;
filesToUpload.storeToFile();
}
uploadProcess.start();
}
public void stopUpload() throws IOException {
toUpload.clear();
filesToUpload.clear();
uploadProcess.stop();
}
public void resumeUpload() {
filesToUpload.fillQueue(toUpload);
if(!toUpload.isEmpty()) {
runnerForUpload.runIfNotRunning(this::doUpload);
}
uploadProcess.resume();
}
public synchronized void startDownload(Collection<String> files) throws IOException {
filesToDownload.clear();
this.downloadProcess.setItems(files);
this.downloadProcess.start();
}
public synchronized void stopDownload() throws IOException {
this.downloadProcess.stop();
}
public synchronized void resumeDownload() {
this.downloadProcess.resume();
}
private boolean isNotHidden(Path file) {
private boolean canUpload(Path file) {
return !file.getFileName().toString().matches("[.][^.]+");
return !file.getFileName().toString().matches("[.][^.]+") && !filesDownloaded.contains(file);
}
private PersitentSynchronizationProcess<Path> createUploadProcess(Supplier<HaaSFileTransfer> fileTransferSupplier,
ExecutorService service, Runnable uploadFinishedNotifier) throws IOException {
return new PersitentSynchronizationProcess<Path>("upload", service, fileTransferSupplier, uploadFinishedNotifier,
workingDirectory.resolve(FILE_INDEX_TO_UPLOAD_FILENAME), pathResolver) {
@Override
protected Iterable<Path> getItems() throws IOException {
try(DirectoryStream<Path> ds = Files.newDirectoryStream(workingDirectory,Synchronization.this::canUpload)) {
return StreamSupport.stream(ds.spliterator(), false).collect(Collectors.toList());
}
}
private void doUpload(AtomicBoolean reRun) {
try(HaaSFileTransfer tr = fileTransferSupplier.get()) {
while (!toUpload.isEmpty()) {
Path p = toUpload.poll();
UploadingFile uf = createUploadingFile(p);
log.info("upload: " + p);
@Override
protected void processItem(HaaSFileTransfer tr, Path p) {
UploadingFile uf = new UploadingFileImpl(p);
tr.upload(uf);
fileUploaded(p);
log.info("uploaded: " + p);
reRun.set(false);
}
} finally {
synchronized (this) {
if (startUploadFinished) {
uploadFinishedNotifier.run();
}
}
}
};
}
private void fileUploaded(Path p) {
try {
filesToUpload.uploaded(p);
filesToUpload.storeToFile();
} catch (IOException e) {
log.error(e.getMessage(), e);
}
private P_PersistentDownloadProcess createDownloadProcess(
Supplier<HaaSFileTransfer> fileTransferSupplier, ExecutorService service,
Runnable uploadFinishedNotifier) throws IOException {
return new P_PersistentDownloadProcess(service, fileTransferSupplier, uploadFinishedNotifier);
}
private class P_PersistentDownloadProcess extends PersitentSynchronizationProcess<String>{
private Collection<String> items = Collections.emptyList();
public P_PersistentDownloadProcess(ExecutorService service, Supplier<HaaSFileTransfer> fileTransferSupplier,
Runnable processFinishedNotifier) throws IOException {
super("download",service, fileTransferSupplier, processFinishedNotifier,
workingDirectory.resolve(FILE_INDEX_TO_DOWNLOAD_FILENAME), name -> name);
}
private UploadingFile createUploadingFile(Path p) {
return new UploadingFileImpl(p);
private synchronized void setItems(Collection<String> items) {
this.items = new LinkedList<>(items);
}
@Override
protected synchronized Iterable<String> getItems() throws IOException {
return items;
}
@Override
protected void processItem(HaaSFileTransfer tr, String file) {
tr.download(file, workingDirectory);
filesDownloaded.insert(workingDirectory.resolve(file));
try {
filesDownloaded.storeToFile();
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment