Skip to content
Snippets Groups Projects
Commit aef81b57 authored by Jan Kožusznik's avatar Jan Kožusznik
Browse files

feat: support of upload in UI

parent 5b488416
Branches
Tags
No related merge requests found
......@@ -35,13 +35,15 @@ public class Job {
private static final String JOB_NAME = "job.name";
private static final String JOB_NEEDS_UPLOAD = "job.needs_upload";
private static final String JOB_INFO_FILENAME = ".jobinfo";
public static boolean isJobPath(Path p) {
return isValidPath(p);
}
private static String JOB_INFO_FILE = ".jobinfo";
private static Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas.Job.class);
private Path jobDir;
......@@ -61,8 +63,8 @@ public class Job {
this(jobManager, haasClientSupplier);
HaaSClient client = getHaaSClient();
long id = client.createJob(name, Collections.emptyList());
jobDir = basePath.resolve("" + id);
propertyHolder = new PropertyHolder(jobDir.resolve(JOB_INFO_FILE));
setJobDirectory(basePath.resolve("" + id));
propertyHolder = new PropertyHolder(jobDir.resolve(JOB_INFO_FILENAME));
Files.createDirectory(jobDir);
setName(name);
......@@ -70,26 +72,20 @@ public class Job {
public Job(JobManager4Job jobManager, Path jobDirectory, Supplier<HaaSClient> haasClientSupplier) {
this(jobManager, haasClientSupplier);
jobDir = jobDirectory;
propertyHolder = new PropertyHolder(jobDir.resolve(JOB_INFO_FILE));
resumeSynchronization();
setJobDirectory(jobDirectory);
propertyHolder = new PropertyHolder(jobDir.resolve(JOB_INFO_FILENAME));
resumeUpload();
}
private Job(JobManager4Job jobManager, Supplier<HaaSClient> haasClientSupplier) {
this.haasClientSupplier = haasClientSupplier;
this.jobManager = jobManager;
try {
this.synchronization = new Synchronization(()->haasClientSupplier.get().startFileTransfer(getId(), new DummyProgressNotifier()), jobDir, Executors.newFixedThreadPool(2), ()-> {
setProperty(JOB_NEEDS_UPLOAD, false);
});
} catch (IOException e) {
log.error(e.getMessage(), e);
throw new RuntimeException(e);
}
}
public void startUploadData() {
setProperty(JOB_INFO_FILE, true);
setProperty(JOB_NEEDS_UPLOAD, true);
try {
this.synchronization.startUpload();
} catch (IOException e) {
......@@ -99,7 +95,7 @@ public class Job {
}
public void stopUploadData() {
setProperty(JOB_INFO_FILE, false);
setProperty(JOB_NEEDS_UPLOAD, false);
try {
this.synchronization.stopUpload();
} catch (IOException e) {
......@@ -108,6 +104,10 @@ public class Job {
}
}
public boolean isUploading() {
return Boolean.parseBoolean(getProperty(JOB_NEEDS_UPLOAD));
}
public void uploadFile(String file, Progress notifier) {
Iterable<UploadingFile> uploadingFiles = Arrays.asList(file).stream()
.map((String name) -> HaaSClient.getUploadingFile(jobDir.resolve(name))).collect(Collectors.toList());
......@@ -212,7 +212,19 @@ public class Job {
return result;
}
private synchronized void resumeSynchronization() {
private void setJobDirectory(Path jobDirectory) {
this.jobDir = jobDirectory;
try {
this.synchronization = new Synchronization(()->haasClientSupplier.get().startFileTransfer(getId(), new DummyProgressNotifier()), jobDir, Executors.newFixedThreadPool(2), ()-> {
setProperty(JOB_NEEDS_UPLOAD, false);
});
} catch (IOException e) {
log.error(e.getMessage(), e);
throw new RuntimeException(e);
}
}
private synchronized void resumeUpload() {
if(Boolean.parseBoolean(getProperty(JOB_NEEDS_UPLOAD))) {
synchronization.resumeUpload();
}
......@@ -251,7 +263,7 @@ public class Job {
} catch (NumberFormatException e) {
return false;
}
return Files.isRegularFile(path.resolve(JOB_INFO_FILE));
return Files.isRegularFile(path.resolve(JOB_INFO_FILENAME));
}
private static long getJobId(Path path) {
......
......@@ -10,8 +10,13 @@ import java.util.LinkedHashSet;
import java.util.Queue;
import java.util.Set;
public class FileIndex {
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FileIndex {
public static final Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas.data_transfer.FileIndex.class);
private Path workingFile;
private Set<Path> files = new LinkedHashSet<>();
......@@ -29,25 +34,26 @@ public class FileIndex {
}
}
public synchronized boolean needsDownload(Path file) {
public synchronized boolean insert(Path file) {
return files.add(file);
}
public synchronized void uploaded(Path p) {
files.remove(p);
}
public synchronized void fileUploadQueue(Queue<Path> toUpload) {
public synchronized void fillQueue(Queue<Path> toUpload) {
toUpload.addAll(files);
}
private void loadFromFile() throws IOException {
files.clear();
try (BufferedReader br = Files.newBufferedReader(workingFile)) {
String line;
while (null != (line = br.readLine())) {
processLine(line);
if(Files.exists(workingFile)) {
try (BufferedReader br = Files.newBufferedReader(workingFile)) {
String line;
while (null != (line = br.readLine())) {
processLine(line);
}
}
}
}
......
......@@ -21,6 +21,8 @@ public class Synchronization {
public static final Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas.data_transfer.Synchronization.class);
private static final String FILE_INDEX_FILENAME = ".toUploadFiles";
private Supplier<HaaSFileTransfer> fileTransferSupplier;
private Path workingDirectory;
......@@ -39,7 +41,7 @@ public class Synchronization {
ExecutorService service, Runnable uploadFinishedNotifier ) throws IOException {
this.fileTransferSupplier = fileTransferSupplier;
this.workingDirectory = workingDirectory;
this.fileRepository = new FileIndex(workingDirectory);
this.fileRepository = new FileIndex(workingDirectory.resolve(FILE_INDEX_FILENAME));
this.runnerForUpload = new SimpleThreadRunner(service);
this.uploadFinishedNotifier = uploadFinishedNotifier;
}
......@@ -49,7 +51,7 @@ public class Synchronization {
fileRepository.clear();
try(DirectoryStream<Path> ds = Files.newDirectoryStream(workingDirectory,this::isNotHidden)) {
for (Path file : ds) {
fileRepository.needsDownload(file);
fileRepository.insert(file);
toUpload.add(file);
runnerForUpload.runIfNotRunning(this::doUpload);
}
......@@ -58,20 +60,23 @@ public class Synchronization {
fileRepository.storeToFile();
}
}
public void stopUpload() throws IOException {
toUpload.clear();
fileRepository.clear();
}
public void resumeUpload() {
fileRepository.fileUploadQueue(toUpload);
fileRepository.fillQueue(toUpload);
if(!toUpload.isEmpty()) {
runnerForUpload.runIfNotRunning(this::doUpload);
}
}
public void startDownload() {
}
private boolean isNotHidden(Path file) {
......@@ -83,26 +88,28 @@ public class Synchronization {
while (!toUpload.isEmpty()) {
Path p = toUpload.poll();
UploadingFile uf = createUploadingFile(p);
log.info("upload: " + p);
tr.upload(Arrays.asList(uf));
fileUploaded(p);
log.info("uploaded: " + p);
reRun.set(false);
}
} finally {
try {
fileRepository.storeToFile();
synchronized(this) {
if(startUploadFinished) {
uploadFinishedNotifier.run();
}
synchronized (this) {
if (startUploadFinished) {
uploadFinishedNotifier.run();
}
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}
}
private void fileUploaded(Path p) {
fileRepository.uploaded(p);
try {
fileRepository.uploaded(p);
fileRepository.storeToFile();
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}
private UploadingFile createUploadingFile(Path p) {
......
......@@ -87,6 +87,18 @@ public class BenchmarkJobManager {
public JobState getState() {
return getStateAsync(r -> r.run()).getNow(JobState.Unknown);
}
public void startUpload() {
job.startUploadData();
}
public void stopUpload() {
job.stopUploadData();
}
public boolean isUploading() {
return job.isUploading();
}
public synchronized CompletableFuture<JobState> getStateAsync(Executor executor) {
if (running != null) {
......@@ -99,42 +111,6 @@ public class BenchmarkJobManager {
return result;
}
private synchronized CompletableFuture<JobState> doGetStateAsync(Executor executor) {
JobState state = job.getState();
if (state != JobState.Finished) {
return CompletableFuture.completedFuture(state);
} else if (verifiedState != null) {
return CompletableFuture.completedFuture(verifiedState);
}
verifiedStateProcessed = true;
return CompletableFuture.supplyAsync(() -> {
try {
verifiedState = Stream.concat(Arrays.asList(state).stream(),
getTasks().stream().filter(task -> !task.getDescription().equals(Constants.DONE_TASK))
.flatMap(task -> task.getComputations().stream()).map(tc -> tc.getState()))
.max(new JobStateComparator()).get();
if (verifiedState != JobState.Finished && verifiedState != JobState.Canceled) {
verifiedState = JobState.Failed;
}
synchronized (BenchmarkJob.this) {
// test whether job was restarted - it sets running to null
if (!verifiedStateProcessed) {
verifiedState = null;
return doGetStateAsync(r -> r.run()).getNow(null);
}
running = null;
return verifiedState;
}
} finally {
synchronized (BenchmarkJob.this) {
if (running != null) {
running = null;
}
}
}
}, executor);
}
public void downloadData(Progress progress) throws IOException {
if (job.getState() == JobState.Finished) {
String filePattern = job.getProperty(SPIM_OUTPUT_FILENAME_PATTERN);
......@@ -242,6 +218,42 @@ public class BenchmarkJobManager {
return "" + getId();
}
private synchronized CompletableFuture<JobState> doGetStateAsync(Executor executor) {
JobState state = job.getState();
if (state != JobState.Finished) {
return CompletableFuture.completedFuture(state);
} else if (verifiedState != null) {
return CompletableFuture.completedFuture(verifiedState);
}
verifiedStateProcessed = true;
return CompletableFuture.supplyAsync(() -> {
try {
verifiedState = Stream.concat(Arrays.asList(state).stream(),
getTasks().stream().filter(task -> !task.getDescription().equals(Constants.DONE_TASK))
.flatMap(task -> task.getComputations().stream()).map(tc -> tc.getState()))
.max(new JobStateComparator()).get();
if (verifiedState != JobState.Finished && verifiedState != JobState.Canceled) {
verifiedState = JobState.Failed;
}
synchronized (BenchmarkJob.this) {
// test whether job was restarted - it sets running to null
if (!verifiedStateProcessed) {
verifiedState = null;
return doGetStateAsync(r -> r.run()).getNow(null);
}
running = null;
return verifiedState;
}
} finally {
synchronized (BenchmarkJob.this) {
if (running != null) {
running = null;
}
}
}
}, executor);
}
private String getStringFromTimeSafely(Calendar time) {
return time != null ? time.getTime().toString() : "N/A";
}
......
<?xml version="1.0" encoding="UTF-8"?>
<?import javafx.geometry.Insets?>
<?import javafx.scene.control.TableColumn?>
<?import javafx.scene.control.TableView?>
<?import javafx.scene.layout.BorderPane?>
<?import javafx.scene.layout.HBox?>
<?import javafx.geometry.Insets?>
<fx:root type="BorderPane" xmlns="http://javafx.com/javafx/8.0.65"
xmlns:fx="http://javafx.com/fxml/1" fx:controller="cz.it4i.fiji.haas_spim_benchmark.ui.BenchmarkSPIMController">
<fx:root type="BorderPane" xmlns="http://javafx.com/javafx/8.0.65" xmlns:fx="http://javafx.com/fxml/1" fx:controller="cz.it4i.fiji.haas_spim_benchmark.ui.BenchmarkSPIMController">
<center>
<HBox>
<children>
<TableView fx:id="jobs" HBox.hgrow="ALWAYS">
<TableView fx:id="jobs" prefHeight="400.0" prefWidth="1065.0" HBox.hgrow="ALWAYS">
<columns>
<TableColumn prefWidth="75.0" text="Job Id" />
<TableColumn prefWidth="149.0" text="Status" />
<TableColumn prefWidth="230.0" text="Creation time" />
<TableColumn prefWidth="230.0" text="Start time" />
<TableColumn prefWidth="230.0" text="End Time" />
<TableColumn prefWidth="150.0" text="Upload" />
</columns>
</TableView>
</children>
......
......@@ -108,6 +108,9 @@ public class BenchmarkSPIMController extends BorderPane implements CloseableCont
j -> j.getState() == JobState.Running || j.getState() == JobState.Finished
|| j.getState() == JobState.Failed || j.getState() == JobState.Canceled));
menu.addItem("Upload data", job -> executeWSCallAsync("Uploading data", p -> job.getValue().startUpload()),
job -> JavaFXRoutines.notNullValue(job, j -> !EnumSet.of(JobState.Running).contains(j.getState())));
menu.addItem("Download result",
job -> executeWSCallAsync("Downloading data", p -> job.getValue().downloadData(p)),
job -> JavaFXRoutines.notNullValue(job,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment