Skip to content
Snippets Groups Projects
Commit 5b488416 authored by Jan Kožusznik's avatar Jan Kožusznik
Browse files

feature: services for synchronization

parent 4012f516
No related branches found
No related tags found
No related merge requests found
......@@ -4,21 +4,23 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cz.it4i.fiji.haas.JobManager.JobManager4Job;
import cz.it4i.fiji.haas.JobManager.JobSynchronizableFile;
import cz.it4i.fiji.haas.data_transfer.Synchronization;
import cz.it4i.fiji.haas_java_client.DummyProgressNotifier;
import cz.it4i.fiji.haas_java_client.HaaSClient;
import cz.it4i.fiji.haas_java_client.HaaSClient.UploadingFile;
......@@ -31,6 +33,8 @@ import net.imagej.updater.util.Progress;
public class Job {
private static final String JOB_NAME = "job.name";
private static final String JOB_NEEDS_UPLOAD = "job.needs_upload";
public static boolean isJobPath(Path p) {
return isValidPath(p);
......@@ -45,45 +49,67 @@ public class Job {
private Supplier<HaaSClient> haasClientSupplier;
// private JobState state;
//private Boolean needsDownload;
// private Boolean needsDownload;
private JobInfo jobInfo;
private Long jobId;
private PropertyHolder propertyHolder;
private JobManager4Job jobManager;
private Synchronization synchronization;
public Job(JobManager4Job jobManager, String name, Path basePath, Supplier<HaaSClient> haasClientSupplier) throws IOException {
this(jobManager,haasClientSupplier);
public Job(JobManager4Job jobManager, String name, Path basePath, Supplier<HaaSClient> haasClientSupplier)
throws IOException {
this(jobManager, haasClientSupplier);
HaaSClient client = getHaaSClient();
long id = client.createJob(name, Collections.emptyList());
jobDir = basePath.resolve("" + id);
propertyHolder = new PropertyHolder(jobDir.resolve(JOB_INFO_FILE));
Files.createDirectory(jobDir);
setName(name);
}
public void setName(String name) {
setProperty(JOB_NAME, name);
}
public Job(JobManager4Job jobManager,Path p, Supplier<HaaSClient> haasClientSupplier) {
public Job(JobManager4Job jobManager, Path jobDirectory, Supplier<HaaSClient> haasClientSupplier) {
this(jobManager, haasClientSupplier);
jobDir = p;
jobDir = jobDirectory;
propertyHolder = new PropertyHolder(jobDir.resolve(JOB_INFO_FILE));
resumeSynchronization();
}
public void uploadFiles(Iterable<UploadingFile> files, Progress notifier) {
HaaSClient client = getHaaSClient();
try(HaaSFileTransfer transfer = client.startFileTransfer(getId(), new P_ProgressNotifierAdapter(notifier))){
transfer.upload(files);
private Job(JobManager4Job jobManager, Supplier<HaaSClient> haasClientSupplier) {
this.haasClientSupplier = haasClientSupplier;
this.jobManager = jobManager;
try {
this.synchronization = new Synchronization(()->haasClientSupplier.get().startFileTransfer(getId(), new DummyProgressNotifier()), jobDir, Executors.newFixedThreadPool(2), ()-> {
setProperty(JOB_NEEDS_UPLOAD, false);
});
} catch (IOException e) {
log.error(e.getMessage(), e);
throw new RuntimeException(e);
}
}
public void uploadFilesByName(Iterable<String> files, Progress notifier) {
Iterable<UploadingFile> uploadingFiles = StreamSupport.stream(files.spliterator(), false)
public void startUploadData() {
setProperty(JOB_INFO_FILE, true);
try {
this.synchronization.startUpload();
} catch (IOException e) {
log.error(e.getMessage(), e);
throw new RuntimeException(e);
}
}
public void stopUploadData() {
setProperty(JOB_INFO_FILE, false);
try {
this.synchronization.stopUpload();
} catch (IOException e) {
log.error(e.getMessage(), e);
throw new RuntimeException(e);
}
}
public void uploadFile(String file, Progress notifier) {
Iterable<UploadingFile> uploadingFiles = Arrays.asList(file).stream()
.map((String name) -> HaaSClient.getUploadingFile(jobDir.resolve(name))).collect(Collectors.toList());
uploadFiles(uploadingFiles, notifier);
}
......@@ -93,13 +119,6 @@ public class Job {
client.submitJob(jobId);
}
private Job(JobManager4Job jobManager, Supplier<HaaSClient> haasClientSupplier) {
this.haasClientSupplier = haasClientSupplier;
this.jobManager = jobManager;
}
synchronized public long getId() {
if (jobId == null) {
jobId = getJobId(jobDir);
......@@ -120,8 +139,11 @@ public class Job {
}
synchronized public void download(Predicate<String> predicate, Progress notifier) {
try (HaaSFileTransfer fileTransfer = getHaaSClient().startFileTransfer(jobId, new P_ProgressNotifierAdapter(notifier))) {
fileTransfer.download(getHaaSClient().getChangedFiles(jobId).stream().filter(predicate).collect(Collectors.toList()), jobDir);
try (HaaSFileTransfer fileTransfer = getHaaSClient().startFileTransfer(jobId,
new P_ProgressNotifierAdapter(notifier))) {
fileTransfer.download(
getHaaSClient().getChangedFiles(jobId).stream().filter(predicate).collect(Collectors.toList()),
jobDir);
}
}
......@@ -153,10 +175,14 @@ public class Job {
return Files.newInputStream(jobDir.resolve(name));
}
public void setProperty(String name, String value) {
public synchronized void setProperty(String name, String value) {
propertyHolder.setValue(name, value);
}
public void setProperty(String jobNeedsUpload, boolean b) {
propertyHolder.setValue(jobNeedsUpload, "" + b);
}
public String getProperty(String name) {
return propertyHolder.getValue(name);
}
......@@ -168,24 +194,41 @@ public class Job {
public Path getDirectory() {
return jobDir;
}
public boolean remove() {
boolean result;
if((result = jobManager.remove(this)) && Files.isDirectory(jobDir) ) {
if ((result = jobManager.remove(this)) && Files.isDirectory(jobDir)) {
List<Path> pathsToDelete;
try {
pathsToDelete = Files.walk(jobDir).sorted(Comparator.reverseOrder()).collect(Collectors.toList());
for(Path path : pathsToDelete) {
Files.deleteIfExists(path);
for (Path path : pathsToDelete) {
Files.deleteIfExists(path);
}
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}
return result;
}
private synchronized void resumeSynchronization() {
if(Boolean.parseBoolean(getProperty(JOB_NEEDS_UPLOAD))) {
synchronization.resumeUpload();
}
}
private void uploadFiles(Iterable<UploadingFile> files, Progress notifier) {
HaaSClient client = getHaaSClient();
try (HaaSFileTransfer transfer = client.startFileTransfer(getId(), new P_ProgressNotifierAdapter(notifier))) {
transfer.upload(files);
}
}
private void setName(String name) {
setProperty(JOB_NAME, name);
}
private HaaSClient getHaaSClient() {
return this.haasClientSupplier.get();
}
......@@ -257,7 +300,7 @@ public class Job {
}
public List<Long> getFileSizes(List<String> names) {
try (HaaSFileTransfer transfer = getHaaSClient().startFileTransfer(getId(), new DummyProgressNotifier())) {
return transfer.obtainSize(names);
}
......@@ -269,8 +312,4 @@ public class Job {
}
}
}
......@@ -12,7 +12,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cz.it4i.fiji.haas_java_client.HaaSClient;
import cz.it4i.fiji.haas_java_client.HaaSClient.UploadingFile;
import cz.it4i.fiji.haas_java_client.JobState;
import cz.it4i.fiji.haas_java_client.Settings;
import cz.it4i.fiji.haas_java_client.SynchronizableFileType;
......@@ -57,12 +56,6 @@ public class JobManager {
return job;
}
public Job startJob(Iterable<UploadingFile> files, Progress notifier) throws IOException {
Job result = createJob();
result.uploadFiles(files, notifier);
result.submit();
return result;
}
public Collection<Job> getJobs() {
if (jobs == null) {
......
......@@ -7,51 +7,57 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedHashSet;
import java.util.Queue;
import java.util.Set;
public class FileRepository {
public class FileIndex {
private Path workingFile;
private Set<Path> files = new LinkedHashSet<>();
public FileRepository(Path workingFile) throws IOException {
public FileIndex(Path workingFile) throws IOException {
this.workingFile = workingFile;
loadFromFile();
}
public synchronized void storeToFile() throws IOException {
try(BufferedWriter bw = Files.newBufferedWriter(workingFile)) {
for(Path file: files) {
try (BufferedWriter bw = Files.newBufferedWriter(workingFile)) {
for (Path file : files) {
bw.write(file.toString() + "\n");
}
}
}
public synchronized boolean needsDownload(Path file) {
return files.contains(file);
return files.add(file);
}
public synchronized void uploaded(Path p) {
files.add(p);
files.remove(p);
}
public synchronized void fileUploadQueue(Queue<Path> toUpload) {
toUpload.addAll(files);
}
private void loadFromFile() throws IOException {
files.clear();
try(BufferedReader br = Files.newBufferedReader(workingFile)) {
try (BufferedReader br = Files.newBufferedReader(workingFile)) {
String line;
while(null != (line = br.readLine())) {
while (null != (line = br.readLine())) {
processLine(line);
}
}
}
private void processLine(String line) {
files.add(Paths.get(line));
}
public void clear() throws IOException {
files.clear();
storeToFile();
}
}
package cz.it4i.fiji.haas.data_transfer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
public class SimpleThreadRunner {
private ExecutorService service;
private AtomicBoolean reRun = new AtomicBoolean(false);
public SimpleThreadRunner(ExecutorService service) {
this.service = service;
}
public void runIfNotRunning(Consumer<AtomicBoolean> r) {
synchronized (this) {
if (reRun.get()) {
return;
}
reRun.set(true);
}
service.execute(() -> {
do {
r.accept(reRun);
} while (reRun.get());
});
}
}
......@@ -8,6 +8,7 @@ import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.slf4j.Logger;
......@@ -26,24 +27,50 @@ public class Synchronization {
private Queue<Path> toUpload = new LinkedBlockingQueue<>();
private ExecutorService service;
private FileIndex fileRepository;
private FileRepository fileRepository;
private SimpleThreadRunner runnerForUpload;
private boolean startUploadFinished;
private Runnable uploadFinishedNotifier;
void upload() throws IOException {
public Synchronization(Supplier<HaaSFileTransfer> fileTransferSupplier, Path workingDirectory,
ExecutorService service, Runnable uploadFinishedNotifier ) throws IOException {
this.fileTransferSupplier = fileTransferSupplier;
this.workingDirectory = workingDirectory;
this.fileRepository = new FileIndex(workingDirectory);
this.runnerForUpload = new SimpleThreadRunner(service);
this.uploadFinishedNotifier = uploadFinishedNotifier;
}
public synchronized void startUpload() throws IOException {
startUploadFinished = false;
fileRepository.clear();
try(DirectoryStream<Path> ds = Files.newDirectoryStream(workingDirectory,this::isNotHidden)) {
for (Path file : ds) {
if(needsUpload(file)) {
toUpload.add(file);
service.execute(this::doRun);
}
fileRepository.needsDownload(file);
toUpload.add(file);
runnerForUpload.runIfNotRunning(this::doUpload);
}
} finally {
startUploadFinished = true;
fileRepository.storeToFile();
}
}
private boolean needsUpload(Path file) {
return fileRepository.needsDownload(file);
public void stopUpload() throws IOException {
toUpload.clear();
fileRepository.clear();
}
public void resumeUpload() {
fileRepository.fileUploadQueue(toUpload);
if(!toUpload.isEmpty()) {
runnerForUpload.runIfNotRunning(this::doUpload);
}
}
private boolean isNotHidden(Path file) {
......@@ -51,17 +78,23 @@ public class Synchronization {
return !file.getFileName().toString().matches("[.][^.]+");
}
private void doRun() {
private void doUpload(AtomicBoolean reRun) {
try(HaaSFileTransfer tr = fileTransferSupplier.get()) {
while (!toUpload.isEmpty()) {
Path p = toUpload.poll();
UploadingFile uf = createUploadingFile(p);
tr.upload(Arrays.asList(uf));
fileUploaded(p);
reRun.set(false);
}
} finally {
try {
fileRepository.storeToFile();
synchronized(this) {
if(startUploadFinished) {
uploadFinishedNotifier.run();
}
}
} catch (IOException e) {
log.error(e.getMessage(), e);
}
......@@ -73,7 +106,6 @@ public class Synchronization {
}
private UploadingFile createUploadingFile(Path p) {
return null;
return new UploadingFileImpl(p);
}
}
......@@ -5,12 +5,17 @@ import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cz.it4i.fiji.haas_java_client.HaaSClient.UploadingFile;
public class UploadingFileImpl implements UploadingFile{
public class UploadingFileImpl implements UploadingFile {
public static final Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas.data_transfer.UploadingFileImpl.class);
private final Path path;
public UploadingFileImpl(Path path) {
this.path = path;
}
......@@ -32,8 +37,12 @@ public class UploadingFileImpl implements UploadingFile{
@Override
public long getLastTime() {
// TODO Auto-generated method stub
return 0;
try {
return Files.getLastModifiedTime(path).toMillis();
} catch (IOException e) {
log.error(e.getMessage(), e);
return 0;
}
}
}
......@@ -74,7 +74,7 @@ public class BenchmarkJobManager {
}
public synchronized void startJob(Progress progress) throws IOException {
job.uploadFilesByName(Arrays.asList(Constants.CONFIG_YAML), progress);
job.uploadFile(Constants.CONFIG_YAML, progress);
String outputName = getOutputName(job.openLocalFile(Constants.CONFIG_YAML));
verifiedState = null;
verifiedStateProcessed = false;
......@@ -109,10 +109,10 @@ public class BenchmarkJobManager {
verifiedStateProcessed = true;
return CompletableFuture.supplyAsync(() -> {
try {
verifiedState =
Stream.concat(Arrays.asList(state).stream(), getTasks().stream().filter(task->!task.getDescription().equals(Constants.DONE_TASK))
verifiedState = Stream.concat(Arrays.asList(state).stream(),
getTasks().stream().filter(task -> !task.getDescription().equals(Constants.DONE_TASK))
.flatMap(task -> task.getComputations().stream()).map(tc -> tc.getState()))
.max(new JobStateComparator()).get();
.max(new JobStateComparator()).get();
if (verifiedState != JobState.Finished && verifiedState != JobState.Canceled) {
verifiedState = JobState.Failed;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment