Skip to content
Snippets Groups Projects
Commit 8d7f3847 authored by Jan Kožusznik's avatar Jan Kožusznik
Browse files

feat: set input/output directory

propagate directories to Synchronization
parent 1e49c657
No related branches found
No related tags found
No related merge requests found
......@@ -5,12 +5,14 @@ import java.io.InputStream;
import java.io.InterruptedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
......@@ -29,8 +31,10 @@ import cz.it4i.fiji.haas_java_client.ProgressNotifier;
import cz.it4i.fiji.haas_java_client.TransferFileProgressForHaaSClient;
import cz.it4i.fiji.haas_java_client.UploadingFile;
import cz.it4i.fiji.scpclient.TransferFileProgress;
/***
* TASK - napojit na UI
* TASK - napojit na UI
*
* @author koz01
*
*/
......@@ -43,13 +47,19 @@ public class Job {
private static final String JOB_INFO_FILENAME = ".jobinfo";
private static final String JOB_NEEDS_DOWNLOAD = "job.needs_download";
private static final String JOB_CAN_BE_DOWNLOADED = "job.can_be_downloaded";
private static final String JOB_IS_DOWNLOADED = "job.downloaded";
private static final String JOB_IS_UPLOADED = "job.uploaded";
private static final String JOB_OUTPUT_DIRECTORY_PATH = "job.output_directory_path";
private static final String JOB_INPUT_DIRECTORY_PATH = "job.input_directory_path";
private static final String JOB_USE_DEMO_DATA = "job.use_demo_data";
public static boolean isValidJobPath(Path path) {
try {
getJobId(path);
......@@ -66,41 +76,50 @@ public class Job {
private final Supplier<HaaSClient> haasClientSupplier;
private JobInfo jobInfo;
private Long jobId;
private PropertyHolder propertyHolder;
private final JobManager4Job jobManager;
private Synchronization synchronization;
private Path inputDirectory;
private Path outputDirectory;
private boolean useDemoData;
public Job(JobManager4Job jobManager, String name, Path basePath, Supplier<HaaSClient> haasClientSupplier)
public Job(JobManager4Job jobManager, String name, Path basePath, Supplier<HaaSClient> haasClientSupplier,
Function<Path, Path> inputDirectoryProvider, Function<Path, Path> outputDirectoryProvider)
throws IOException {
this(jobManager, haasClientSupplier);
HaaSClient client = getHaaSClient();
long id = client.createJob(name, Collections.emptyList());
setJobDirectory(basePath.resolve("" + id));
setJobDirectory(basePath.resolve("" + id), inputDirectoryProvider, outputDirectoryProvider);
propertyHolder = new PropertyHolder(jobDir.resolve(JOB_INFO_FILENAME));
Files.createDirectory(jobDir);
Files.createDirectory(this.jobDir);
storeInputOutputDirectory();
setName(name);
}
public Job(JobManager4Job jobManager, Path jobDirectory, Supplier<HaaSClient> haasClientSupplier) {
this(jobManager, haasClientSupplier);
setJobDirectory(jobDirectory);
propertyHolder = new PropertyHolder(jobDir.resolve(JOB_INFO_FILENAME));
propertyHolder = new PropertyHolder(jobDirectory.resolve(JOB_INFO_FILENAME));
useDemoData = getSafeBoolean(JOB_USE_DEMO_DATA);
setJobDirectory(jobDirectory, jd -> useDemoData ? null : getDataDirectory(JOB_INPUT_DIRECTORY_PATH, jd),
jd -> getDataDirectory(JOB_OUTPUT_DIRECTORY_PATH, jd));
}
private Job(JobManager4Job jobManager, Supplier<HaaSClient> haasClientSupplier) {
this.haasClientSupplier = haasClientSupplier;
this.jobManager = jobManager;
}
public void startUploadData() {
setProperty(JOB_NEEDS_UPLOAD, true);
try {
......@@ -127,7 +146,7 @@ public class Job {
.collect(Collectors.toList());
synchronization.startDownload(files);
}
public void stopDownloadData() {
setProperty(JOB_NEEDS_DOWNLOAD, false);
try {
......@@ -137,14 +156,12 @@ public class Job {
throw new RuntimeException(e);
}
}
public synchronized void resumeUpload() {
if (needsUpload()) {
synchronization.resumeUpload();
}
}
public synchronized void resumeDownload() {
if (needsDownload()) {
......@@ -152,11 +169,10 @@ public class Job {
}
}
public boolean canBeDownloaded() {
return Boolean.parseBoolean(getProperty(JOB_CAN_BE_DOWNLOADED));
}
public void setUploaded(boolean b) {
setProperty(JOB_IS_UPLOADED, b);
}
......@@ -164,23 +180,23 @@ public class Job {
public void setDownloaded(boolean b) {
setProperty(JOB_IS_DOWNLOADED, b);
}
public boolean isUploaded() {
return getSafeBoolean(getProperty(JOB_IS_UPLOADED));
return getSafeBoolean(getProperty(JOB_IS_UPLOADED));
}
public boolean isDownloaded() {
return getSafeBoolean(getProperty(JOB_IS_DOWNLOADED));
}
public boolean needsDownload() {
return Boolean.parseBoolean(getProperty(JOB_NEEDS_DOWNLOAD));
}
public boolean needsUpload() {
return Boolean.parseBoolean(getProperty(JOB_NEEDS_UPLOAD));
}
public void uploadFile(String file, ProgressNotifier notifier) {
uploadFiles(Arrays.asList(file), notifier);
}
......@@ -224,14 +240,16 @@ public class Job {
setCanBeDownloaded(true);
}
synchronized public long getId() {
if (jobId == null) {
jobId = getJobId(jobDir);
}
return jobId;
}
public boolean isUseDemoData() {
return useDemoData;
}
public Path storeDataInWorkdirectory(UploadingFile uploadingFile) throws IOException {
Path result;
......@@ -242,8 +260,10 @@ public class Job {
}
synchronized public void download(Predicate<String> predicate, ProgressNotifier notifier) {
List<String> files = getHaaSClient().getChangedFiles(jobId).stream().filter(predicate).collect(Collectors.toList());
try (HaaSFileTransfer transfer = haasClientSupplier.get().startFileTransfer(getId(), HaaSClient.DUMMY_TRANSFER_FILE_PROGRESS)) {
List<String> files = getHaaSClient().getChangedFiles(jobId).stream().filter(predicate)
.collect(Collectors.toList());
try (HaaSFileTransfer transfer = haasClientSupplier.get().startFileTransfer(getId(),
HaaSClient.DUMMY_TRANSFER_FILE_PROGRESS)) {
List<Long> fileSizes;
try {
fileSizes = transfer.obtainSize(files);
......@@ -369,20 +389,44 @@ public class Job {
public void setUploadNotifier(ProgressNotifier notifier) {
synchronization.setUploadNotifier(notifier);
}
public void close() {
synchronization.close();
}
private void storeInputOutputDirectory() {
if (inputDirectory == null) {
useDemoData = true;
propertyHolder.setValue(JOB_USE_DEMO_DATA, "" + useDemoData);
} else {
storeDataDirectory(JOB_INPUT_DIRECTORY_PATH, inputDirectory);
}
storeDataDirectory(JOB_OUTPUT_DIRECTORY_PATH, outputDirectory);
}
private void storeDataDirectory(String directoryPropertyName, Path directory) {
if (!jobDir.equals(directory)) {
propertyHolder.setValue(directoryPropertyName, directory.toString());
}
}
private Path getDataDirectory(String typeOfDirectory, Path jobDirectory) {
String directory = propertyHolder.getValue(typeOfDirectory);
return directory != null ? Paths.get(directory) : jobDirectory;
}
private boolean getSafeBoolean(String value) {
return value != null ? Boolean.parseBoolean(value) : false;
}
private void setJobDirectory(Path jobDirectory) {
private void setJobDirectory(Path jobDirectory, Function<Path, Path> inputDirectoryProvider,
Function<Path, Path> outputDirectoryProvider) {
this.jobDir = jobDirectory;
try {
this.synchronization = new Synchronization(() -> startFileTransfer(HaaSClient.DUMMY_TRANSFER_FILE_PROGRESS),
jobDir, () -> {
jobDir, this.inputDirectory = inputDirectoryProvider.apply(jobDir),
this.outputDirectory = outputDirectoryProvider.apply(jobDir), () -> {
setProperty(JOB_NEEDS_UPLOAD, false);
setUploaded(true);
}, () -> {
......@@ -396,14 +440,10 @@ public class Job {
}
}
private HaaSFileTransfer startFileTransfer( TransferFileProgress progress) {
private HaaSFileTransfer startFileTransfer(TransferFileProgress progress) {
return haasClientSupplier.get().startFileTransfer(getId(), progress);
}
private void setName(String name) {
setProperty(JOB_NAME, name);
}
......@@ -426,7 +466,7 @@ public class Job {
private static long getJobId(Path path) {
return Long.parseLong(path.getFileName().toString());
}
private void setCanBeDownloaded(boolean b) {
setProperty(JOB_CAN_BE_DOWNLOADED, b);
}
......
......@@ -7,6 +7,7 @@ import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -44,10 +45,12 @@ public class JobManager implements Closeable {
this.settings = settings;
}
public Job createJob() throws IOException {
public Job createJob(Function<Path, Path> inputDirectoryProvider, Function<Path, Path> outputDirectoryProvider)
throws IOException {
Job result;
initJobsIfNecessary();
jobs.add(result = new Job(remover, settings.getJobName(), workDirectory, this::getHaasClient));
jobs.add(result = new Job(remover, settings.getJobName(), workDirectory, this::getHaasClient,
inputDirectoryProvider, outputDirectoryProvider));
return result;
}
......
......@@ -48,11 +48,11 @@ public class Synchronization implements Closeable {
private final ExecutorService service;
public Synchronization(Supplier<HaaSFileTransfer> fileTransferSupplier, Path workingDirectory,
Runnable uploadFinishedNotifier, Runnable downloadFinishedNotifier) throws IOException {
this(fileTransferSupplier, workingDirectory, workingDirectory, workingDirectory, uploadFinishedNotifier,
downloadFinishedNotifier);
}
// public Synchronization(Supplier<HaaSFileTransfer> fileTransferSupplier, Path workingDirectory,
// Runnable uploadFinishedNotifier, Runnable downloadFinishedNotifier) throws IOException {
// this(fileTransferSupplier, workingDirectory, workingDirectory, workingDirectory, uploadFinishedNotifier,
// downloadFinishedNotifier);
// }
public Synchronization(Supplier<HaaSFileTransfer> fileTransferSupplier, Path workingDirectory, Path inputDirectory,
Path outputDirectory, Runnable uploadFinishedNotifier, Runnable downloadFinishedNotifier)
......
......@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
......@@ -60,6 +61,10 @@ public class BenchmarkJobManager implements Closeable{
private final Job job;
public boolean isUseDemoData() {
return job.isUseDemoData();
}
private final List<Task> tasks;
private final List<BenchmarkError> nonTaskSpecificErrors;
private final SPIMComputationAccessor computationAccessor;
......@@ -454,8 +459,8 @@ public class BenchmarkJobManager implements Closeable{
jobManager = new JobManager(params.workingDirectory(), constructSettingsFromParams(params));
}
public BenchmarkJob createJob() throws IOException {
Job job = jobManager.createJob();
public BenchmarkJob createJob(Function<Path, Path> inputDirectoryProvider,Function<Path, Path> outputDirectoryProvider) throws IOException {
Job job = jobManager.createJob(inputDirectoryProvider, outputDirectoryProvider);
job.storeDataInWorkdirectory(getUploadingFile());
return convertJob(job);
}
......@@ -569,7 +574,7 @@ public class BenchmarkJobManager implements Closeable{
public void close() {
jobManager.close();
}
private UploadingFile getUploadingFile() {
return new UploadingFileFromResource("", Constants.CONFIG_YAML);
}
......
......@@ -2,7 +2,6 @@ package cz.it4i.fiji.haas_spim_benchmark.ui;
import java.awt.Desktop;
import java.awt.Window;
import java.awt.event.WindowAdapter;
import java.io.IOException;
import java.util.EnumSet;
import java.util.HashSet;
......@@ -128,7 +127,8 @@ public class BenchmarkSPIMController extends BorderPane implements CloseableCont
menu.addItem("Upload data", job -> executeWSCallAsync("Uploading data", p -> job.getValue().startUpload()),
job -> executeWSCallAsync("Stop uploading data", p -> job.getValue().stopUpload()),
job -> JavaFXRoutines.notNullValue(job, j -> !EnumSet.of(JobState.Running).contains(j.getState())),
job -> JavaFXRoutines.notNullValue(job,
j -> !j.isUseDemoData() && !EnumSet.of(JobState.Running).contains(j.getState())),
job -> job.getUploadProgress().isWorking());
menu.addItem("Download result",
......@@ -153,7 +153,8 @@ public class BenchmarkSPIMController extends BorderPane implements CloseableCont
private void createJob() {
NewJobWindow newJobWindow = new NewJobWindow(null);
ModalDialogs.doModal(newJobWindow, WindowConstants.DISPOSE_ON_CLOSE);
newJobWindow.setCreatePressedNotifier((Runnable) () -> executeWSCallAsync("Creating job", p -> manager.createJob()));
newJobWindow.setCreatePressedNotifier(() -> executeWSCallAsync("Creating job", p -> manager
.createJob(wd -> newJobWindow.getInputDirectory(wd), wd -> newJobWindow.getOutputDirectory(wd))));
}
......
......@@ -20,7 +20,7 @@ public class RunBenchmark {
public static class CreateJob {
public static void main(String[] args) throws IOException {
BenchmarkJobManager benchmarkJobManager = new BenchmarkJobManager(getBenchmarkSPIMParameters());
BenchmarkJob ji = benchmarkJobManager.createJob();
BenchmarkJob ji = benchmarkJobManager.createJob(jd -> jd, jd -> jd);
log.info("job: " + ji.getId() + " created.");
}
}
......@@ -42,8 +42,6 @@ public class RunBenchmark {
}
}
private static BenchmarkSPIMParameters getBenchmarkSPIMParameters() throws IOException {
Path p = Paths.get("/tmp/benchmark");
if (!Files.exists(p)) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment