diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java index 41c3b33f1f3677d357a08acef27d6f5cc847696a..162a63a8e97c4b1338395280fec18f3b026efd74 100644 --- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java @@ -4,21 +4,23 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Arrays; import java.util.Calendar; import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.concurrent.Executors; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; -import java.util.stream.StreamSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import cz.it4i.fiji.haas.JobManager.JobManager4Job; import cz.it4i.fiji.haas.JobManager.JobSynchronizableFile; +import cz.it4i.fiji.haas.data_transfer.Synchronization; import cz.it4i.fiji.haas_java_client.DummyProgressNotifier; import cz.it4i.fiji.haas_java_client.HaaSClient; import cz.it4i.fiji.haas_java_client.HaaSClient.UploadingFile; @@ -31,6 +33,8 @@ import net.imagej.updater.util.Progress; public class Job { private static final String JOB_NAME = "job.name"; + + private static final String JOB_NEEDS_UPLOAD = "job.needs_upload"; public static boolean isJobPath(Path p) { return isValidPath(p); @@ -45,45 +49,67 @@ public class Job { private Supplier<HaaSClient> haasClientSupplier; // private JobState state; - //private Boolean needsDownload; + // private Boolean needsDownload; private JobInfo jobInfo; private Long jobId; - - private PropertyHolder propertyHolder; - private JobManager4Job jobManager; + private Synchronization synchronization; - public Job(JobManager4Job jobManager, String name, Path basePath, Supplier<HaaSClient> haasClientSupplier) throws IOException { - this(jobManager,haasClientSupplier); + public Job(JobManager4Job jobManager, String name, Path basePath, Supplier<HaaSClient> haasClientSupplier) + throws IOException { + this(jobManager, haasClientSupplier); HaaSClient client = getHaaSClient(); long id = client.createJob(name, Collections.emptyList()); jobDir = basePath.resolve("" + id); propertyHolder = new PropertyHolder(jobDir.resolve(JOB_INFO_FILE)); Files.createDirectory(jobDir); setName(name); - - } - public void setName(String name) { - setProperty(JOB_NAME, name); } - public Job(JobManager4Job jobManager,Path p, Supplier<HaaSClient> haasClientSupplier) { + public Job(JobManager4Job jobManager, Path jobDirectory, Supplier<HaaSClient> haasClientSupplier) { this(jobManager, haasClientSupplier); - jobDir = p; + jobDir = jobDirectory; propertyHolder = new PropertyHolder(jobDir.resolve(JOB_INFO_FILE)); + resumeSynchronization(); } - public void uploadFiles(Iterable<UploadingFile> files, Progress notifier) { - HaaSClient client = getHaaSClient(); - try(HaaSFileTransfer transfer = client.startFileTransfer(getId(), new P_ProgressNotifierAdapter(notifier))){ - transfer.upload(files); + private Job(JobManager4Job jobManager, Supplier<HaaSClient> haasClientSupplier) { + this.haasClientSupplier = haasClientSupplier; + this.jobManager = jobManager; + try { + this.synchronization = new Synchronization(()->haasClientSupplier.get().startFileTransfer(getId(), new DummyProgressNotifier()), jobDir, Executors.newFixedThreadPool(2), ()-> { + setProperty(JOB_NEEDS_UPLOAD, false); + }); + } catch (IOException e) { + log.error(e.getMessage(), e); + throw new RuntimeException(e); } } - public void uploadFilesByName(Iterable<String> files, Progress notifier) { - Iterable<UploadingFile> uploadingFiles = StreamSupport.stream(files.spliterator(), false) + public void startUploadData() { + setProperty(JOB_INFO_FILE, true); + try { + this.synchronization.startUpload(); + } catch (IOException e) { + log.error(e.getMessage(), e); + throw new RuntimeException(e); + } + } + + public void stopUploadData() { + setProperty(JOB_INFO_FILE, false); + try { + this.synchronization.stopUpload(); + } catch (IOException e) { + log.error(e.getMessage(), e); + throw new RuntimeException(e); + } + } + + public void uploadFile(String file, Progress notifier) { + Iterable<UploadingFile> uploadingFiles = Arrays.asList(file).stream() .map((String name) -> HaaSClient.getUploadingFile(jobDir.resolve(name))).collect(Collectors.toList()); uploadFiles(uploadingFiles, notifier); } @@ -93,13 +119,6 @@ public class Job { client.submitJob(jobId); } - private Job(JobManager4Job jobManager, Supplier<HaaSClient> haasClientSupplier) { - this.haasClientSupplier = haasClientSupplier; - this.jobManager = jobManager; - } - - - synchronized public long getId() { if (jobId == null) { jobId = getJobId(jobDir); @@ -120,8 +139,11 @@ public class Job { } synchronized public void download(Predicate<String> predicate, Progress notifier) { - try (HaaSFileTransfer fileTransfer = getHaaSClient().startFileTransfer(jobId, new P_ProgressNotifierAdapter(notifier))) { - fileTransfer.download(getHaaSClient().getChangedFiles(jobId).stream().filter(predicate).collect(Collectors.toList()), jobDir); + try (HaaSFileTransfer fileTransfer = getHaaSClient().startFileTransfer(jobId, + new P_ProgressNotifierAdapter(notifier))) { + fileTransfer.download( + getHaaSClient().getChangedFiles(jobId).stream().filter(predicate).collect(Collectors.toList()), + jobDir); } } @@ -153,10 +175,14 @@ public class Job { return Files.newInputStream(jobDir.resolve(name)); } - public void setProperty(String name, String value) { + public synchronized void setProperty(String name, String value) { propertyHolder.setValue(name, value); } + public void setProperty(String jobNeedsUpload, boolean b) { + propertyHolder.setValue(jobNeedsUpload, "" + b); + } + public String getProperty(String name) { return propertyHolder.getValue(name); } @@ -168,24 +194,41 @@ public class Job { public Path getDirectory() { return jobDir; } - + public boolean remove() { boolean result; - if((result = jobManager.remove(this)) && Files.isDirectory(jobDir) ) { + if ((result = jobManager.remove(this)) && Files.isDirectory(jobDir)) { List<Path> pathsToDelete; try { pathsToDelete = Files.walk(jobDir).sorted(Comparator.reverseOrder()).collect(Collectors.toList()); - for(Path path : pathsToDelete) { - Files.deleteIfExists(path); + for (Path path : pathsToDelete) { + Files.deleteIfExists(path); } } catch (IOException e) { log.error(e.getMessage(), e); } - + } return result; } - + + private synchronized void resumeSynchronization() { + if(Boolean.parseBoolean(getProperty(JOB_NEEDS_UPLOAD))) { + synchronization.resumeUpload(); + } + } + + private void uploadFiles(Iterable<UploadingFile> files, Progress notifier) { + HaaSClient client = getHaaSClient(); + try (HaaSFileTransfer transfer = client.startFileTransfer(getId(), new P_ProgressNotifierAdapter(notifier))) { + transfer.upload(files); + } + } + + private void setName(String name) { + setProperty(JOB_NAME, name); + } + private HaaSClient getHaaSClient() { return this.haasClientSupplier.get(); } @@ -257,7 +300,7 @@ public class Job { } public List<Long> getFileSizes(List<String> names) { - + try (HaaSFileTransfer transfer = getHaaSClient().startFileTransfer(getId(), new DummyProgressNotifier())) { return transfer.obtainSize(names); } @@ -269,8 +312,4 @@ public class Job { } } - - - - } diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/JobManager.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/JobManager.java index 0c627041ded87e945909f82191b9dc3aaf8eeeeb..77116f511670dace631b3456f271921bf16b6c09 100644 --- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/JobManager.java +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/JobManager.java @@ -12,7 +12,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import cz.it4i.fiji.haas_java_client.HaaSClient; -import cz.it4i.fiji.haas_java_client.HaaSClient.UploadingFile; import cz.it4i.fiji.haas_java_client.JobState; import cz.it4i.fiji.haas_java_client.Settings; import cz.it4i.fiji.haas_java_client.SynchronizableFileType; @@ -57,12 +56,6 @@ public class JobManager { return job; } - public Job startJob(Iterable<UploadingFile> files, Progress notifier) throws IOException { - Job result = createJob(); - result.uploadFiles(files, notifier); - result.submit(); - return result; - } public Collection<Job> getJobs() { if (jobs == null) { diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/FileRepository.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/FileIndex.java similarity index 60% rename from haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/FileRepository.java rename to haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/FileIndex.java index 59ed59d8c255b624326f12e1068318d03ae22afe..40a1bc4a025e588a1e18a5d7fdbba8e43d6bace0 100644 --- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/FileRepository.java +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/FileIndex.java @@ -7,51 +7,57 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.LinkedHashSet; +import java.util.Queue; import java.util.Set; -public class FileRepository { +public class FileIndex { private Path workingFile; - + private Set<Path> files = new LinkedHashSet<>(); - public FileRepository(Path workingFile) throws IOException { + public FileIndex(Path workingFile) throws IOException { this.workingFile = workingFile; loadFromFile(); } - + public synchronized void storeToFile() throws IOException { - try(BufferedWriter bw = Files.newBufferedWriter(workingFile)) { - for(Path file: files) { + try (BufferedWriter bw = Files.newBufferedWriter(workingFile)) { + for (Path file : files) { bw.write(file.toString() + "\n"); } } } - public synchronized boolean needsDownload(Path file) { - return files.contains(file); + return files.add(file); } public synchronized void uploaded(Path p) { - files.add(p); - + files.remove(p); + + } + + public synchronized void fileUploadQueue(Queue<Path> toUpload) { + toUpload.addAll(files); } - private void loadFromFile() throws IOException { files.clear(); - try(BufferedReader br = Files.newBufferedReader(workingFile)) { + try (BufferedReader br = Files.newBufferedReader(workingFile)) { String line; - while(null != (line = br.readLine())) { + while (null != (line = br.readLine())) { processLine(line); } } } - - private void processLine(String line) { files.add(Paths.get(line)); } + + public void clear() throws IOException { + files.clear(); + storeToFile(); + } } diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/SimpleThreadRunner.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/SimpleThreadRunner.java new file mode 100644 index 0000000000000000000000000000000000000000..4a8237bac110f78b02120388162e2a202ea0b25d --- /dev/null +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/SimpleThreadRunner.java @@ -0,0 +1,28 @@ +package cz.it4i.fiji.haas.data_transfer; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; + +public class SimpleThreadRunner { + private ExecutorService service; + private AtomicBoolean reRun = new AtomicBoolean(false); + + public SimpleThreadRunner(ExecutorService service) { + this.service = service; + } + + public void runIfNotRunning(Consumer<AtomicBoolean> r) { + synchronized (this) { + if (reRun.get()) { + return; + } + reRun.set(true); + } + service.execute(() -> { + do { + r.accept(reRun); + } while (reRun.get()); + }); + } +} diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java index bc4c971ac05fccd2c14a2b4a37b6a3c6e4d169fd..5a0464fff16b7c120acb279593f3a14cf2819dc4 100644 --- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/Synchronization.java @@ -8,6 +8,7 @@ import java.util.Arrays; import java.util.Queue; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import org.slf4j.Logger; @@ -26,24 +27,50 @@ public class Synchronization { private Queue<Path> toUpload = new LinkedBlockingQueue<>(); - private ExecutorService service; + private FileIndex fileRepository; - private FileRepository fileRepository; + private SimpleThreadRunner runnerForUpload; + private boolean startUploadFinished; + + private Runnable uploadFinishedNotifier; - void upload() throws IOException { + public Synchronization(Supplier<HaaSFileTransfer> fileTransferSupplier, Path workingDirectory, + ExecutorService service, Runnable uploadFinishedNotifier ) throws IOException { + this.fileTransferSupplier = fileTransferSupplier; + this.workingDirectory = workingDirectory; + this.fileRepository = new FileIndex(workingDirectory); + this.runnerForUpload = new SimpleThreadRunner(service); + this.uploadFinishedNotifier = uploadFinishedNotifier; + } + + public synchronized void startUpload() throws IOException { + startUploadFinished = false; + fileRepository.clear(); try(DirectoryStream<Path> ds = Files.newDirectoryStream(workingDirectory,this::isNotHidden)) { for (Path file : ds) { - if(needsUpload(file)) { - toUpload.add(file); - service.execute(this::doRun); - } + fileRepository.needsDownload(file); + toUpload.add(file); + runnerForUpload.runIfNotRunning(this::doUpload); } + } finally { + startUploadFinished = true; + fileRepository.storeToFile(); + } + } - private boolean needsUpload(Path file) { - return fileRepository.needsDownload(file); + public void stopUpload() throws IOException { + toUpload.clear(); + fileRepository.clear(); + } + + public void resumeUpload() { + fileRepository.fileUploadQueue(toUpload); + if(!toUpload.isEmpty()) { + runnerForUpload.runIfNotRunning(this::doUpload); + } } private boolean isNotHidden(Path file) { @@ -51,17 +78,23 @@ public class Synchronization { return !file.getFileName().toString().matches("[.][^.]+"); } - private void doRun() { + private void doUpload(AtomicBoolean reRun) { try(HaaSFileTransfer tr = fileTransferSupplier.get()) { while (!toUpload.isEmpty()) { Path p = toUpload.poll(); UploadingFile uf = createUploadingFile(p); tr.upload(Arrays.asList(uf)); fileUploaded(p); + reRun.set(false); } } finally { try { fileRepository.storeToFile(); + synchronized(this) { + if(startUploadFinished) { + uploadFinishedNotifier.run(); + } + } } catch (IOException e) { log.error(e.getMessage(), e); } @@ -73,7 +106,6 @@ public class Synchronization { } private UploadingFile createUploadingFile(Path p) { - - return null; + return new UploadingFileImpl(p); } } diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/UploadingFileImpl.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/UploadingFileImpl.java index 0cd502892c413af060844af2e7a52918073658db..ac3a403a66992187c64d1e6b2ff7b14e2518adc1 100644 --- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/UploadingFileImpl.java +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/data_transfer/UploadingFileImpl.java @@ -5,12 +5,17 @@ import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import cz.it4i.fiji.haas_java_client.HaaSClient.UploadingFile; -public class UploadingFileImpl implements UploadingFile{ +public class UploadingFileImpl implements UploadingFile { + + public static final Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas.data_transfer.UploadingFileImpl.class); private final Path path; - + public UploadingFileImpl(Path path) { this.path = path; } @@ -32,8 +37,12 @@ public class UploadingFileImpl implements UploadingFile{ @Override public long getLastTime() { - // TODO Auto-generated method stub - return 0; + try { + return Files.getLastModifiedTime(path).toMillis(); + } catch (IOException e) { + log.error(e.getMessage(), e); + return 0; + } } } diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java index 1a90371e33e179e9c2f474d733d5c9cd94d3adcb..e6f4dcfe4031bf3456d5c2160ba237d9b898e796 100644 --- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java +++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java @@ -74,7 +74,7 @@ public class BenchmarkJobManager { } public synchronized void startJob(Progress progress) throws IOException { - job.uploadFilesByName(Arrays.asList(Constants.CONFIG_YAML), progress); + job.uploadFile(Constants.CONFIG_YAML, progress); String outputName = getOutputName(job.openLocalFile(Constants.CONFIG_YAML)); verifiedState = null; verifiedStateProcessed = false; @@ -109,10 +109,10 @@ public class BenchmarkJobManager { verifiedStateProcessed = true; return CompletableFuture.supplyAsync(() -> { try { - verifiedState = - Stream.concat(Arrays.asList(state).stream(), getTasks().stream().filter(task->!task.getDescription().equals(Constants.DONE_TASK)) + verifiedState = Stream.concat(Arrays.asList(state).stream(), + getTasks().stream().filter(task -> !task.getDescription().equals(Constants.DONE_TASK)) .flatMap(task -> task.getComputations().stream()).map(tc -> tc.getState())) - .max(new JobStateComparator()).get(); + .max(new JobStateComparator()).get(); if (verifiedState != JobState.Finished && verifiedState != JobState.Canceled) { verifiedState = JobState.Failed; }