Skip to content
Snippets Groups Projects
Commit c6bc5861 authored by Jan Kožusznik's avatar Jan Kožusznik
Browse files

Refactoring for benchmark running

parent 92aff223
No related branches found
No related tags found
No related merge requests found
...@@ -3,6 +3,8 @@ package cz.it4i.fiji.haas; ...@@ -3,6 +3,8 @@ package cz.it4i.fiji.haas;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import cz.it4i.fiji.haas.JobManager.JobInfo; import cz.it4i.fiji.haas.JobManager.JobInfo;
import cz.it4i.fiji.haas_java_client.HaaSClient; import cz.it4i.fiji.haas_java_client.HaaSClient;
...@@ -12,37 +14,31 @@ import net.imagej.updater.util.Progress; ...@@ -12,37 +14,31 @@ import net.imagej.updater.util.Progress;
public class BenchmarkJobManager { public class BenchmarkJobManager {
private JobManager jobManager; private JobManager jobManager;
private Progress progress; private Progress progress;
private Path workDirectory; private Map<JobInfo, Path> uploadedFiles = new HashMap<>();
public BenchmarkJobManager(Path workDirectory, Progress progress) throws IOException { public BenchmarkJobManager(Path workDirectory, Progress progress) throws IOException {
this.workDirectory = workDirectory;
jobManager = new JobManager(workDirectory, TestingConstants.getSettings(3, 6)); jobManager = new JobManager(workDirectory, TestingConstants.getSettings(3, 6));
this.progress = progress; this.progress = progress;
} }
public JobInfo startJob() throws IOException { public JobInfo createJob() throws IOException {
JobInfo jobInfo = jobManager.createJob(progress);
JobInfo jobInfo = jobManager.startJob(Arrays.asList(getUploadingFile()).stream(), progress); Path file = jobInfo.storeDataInWorkdirectory(getUploadingFile());
jobInfo.waitForStart(); uploadedFiles.put(jobInfo, file);
if (jobInfo.getState() != JobState.Running) {
throw new IllegalStateException("start of job: " + jobInfo + " failed");
}
return jobInfo; return jobInfo;
} }
private HaaSClient.UploadingFile getUploadingFile() { public void startJob(JobInfo jobInfo) {
return new UploadingFileFromResource("", "config.yaml"); jobInfo.uploadFiles(Arrays.asList(HaaSClient.getUploadingFile(uploadedFiles.get(jobInfo))).stream());
jobInfo.submit();
} }
public JobState getState(long jobId) { public JobState getState(long jobId) {
return jobManager.getState(jobId); return jobManager.getState(jobId);
} }
private byte[] updateConfigFile(byte[] data) throws IOException {
return data;
private HaaSClient.UploadingFile getUploadingFile() {
return new UploadingFileFromResource("", "config.yaml");
} }
} }
...@@ -44,6 +44,7 @@ public class Job { ...@@ -44,6 +44,7 @@ public class Job {
private Boolean needsDownload; private Boolean needsDownload;
private JobInfo jobInfo; private JobInfo jobInfo;
private Long jobId; private Long jobId;
private ProgressNotifier notifier;
final private Progress dummy = new Progress() { final private Progress dummy = new Progress() {
...@@ -72,12 +73,11 @@ public class Job { ...@@ -72,12 +73,11 @@ public class Job {
} }
}; };
public Job(Path basePath, Stream<UploadingFile> files, Supplier<HaaSClient> haasClientSupplier, Progress progress) public Job(Path basePath, Supplier<HaaSClient> haasClientSupplier, Progress progress) throws IOException {
throws IOException {
this(haasClientSupplier); this(haasClientSupplier);
HaaSClient client = this.haasClientSupplier.get(); HaaSClient client = this.haasClientSupplier.get();
long id = client.start(files, "TestOutRedirect", Collections.emptyList(), long id = client.createJob("TestOutRedirect", Collections.emptyList(),
new P_ProgressNotifierAdapter(progress)); notifier = new P_ProgressNotifierAdapter(progress));
jobDir = basePath.resolve("" + id); jobDir = basePath.resolve("" + id);
Files.createDirectory(jobDir); Files.createDirectory(jobDir);
updateState(); updateState();
...@@ -89,6 +89,16 @@ public class Job { ...@@ -89,6 +89,16 @@ public class Job {
loadJobInfo(); loadJobInfo();
} }
public void uploadFiles(Stream<UploadingFile> files) {
HaaSClient client = this.haasClientSupplier.get();
client.uploadFiles(jobId, files, notifier);
}
public void submit() {
HaaSClient client = this.haasClientSupplier.get();
client.submitJob(jobId, notifier);
}
private Job(Supplier<HaaSClient> haasClientSupplier) { private Job(Supplier<HaaSClient> haasClientSupplier) {
this.haasClientSupplier = haasClientSupplier; this.haasClientSupplier = haasClientSupplier;
} }
...@@ -116,6 +126,14 @@ public class Job { ...@@ -116,6 +126,14 @@ public class Job {
public void download() { public void download() {
download(dummy); download(dummy);
} }
public Path storeDataInWorkdirectory(UploadingFile uploadingFile) throws IOException {
Path result;
try(InputStream is = uploadingFile.getInputStream()) {
Files.copy(is, result = jobDir.resolve(uploadingFile.getName()));
}
return result;
}
synchronized public void download(Progress progress) { synchronized public void download(Progress progress) {
if (!needsDownload()) { if (!needsDownload()) {
...@@ -129,7 +147,7 @@ public class Job { ...@@ -129,7 +147,7 @@ public class Job {
log.error(e); log.error(e);
} }
} }
public JobState getState() { public JobState getState() {
return state; return state;
} }
...@@ -218,5 +236,4 @@ public class Job { ...@@ -218,5 +236,4 @@ public class Job {
} }
} }
} }
...@@ -45,9 +45,9 @@ public class JobManager { ...@@ -45,9 +45,9 @@ public class JobManager {
} }
public JobInfo startJob(Stream<UploadingFile> files, Progress progress) throws IOException { public JobInfo createJob(Progress progress) throws IOException {
Job job; Job job;
jobs.add(job = new Job(workDirectory, files, this::getHaasClient, progress)); jobs.add(job = new Job(workDirectory, this::getHaasClient, progress));
return new JobInfo(job) { return new JobInfo(job) {
@Override @Override
public JobState getState() { public JobState getState() {
...@@ -60,6 +60,14 @@ public class JobManager { ...@@ -60,6 +60,14 @@ public class JobManager {
} }
}; };
} }
public JobInfo startJob(Stream<UploadingFile> files, Progress progress) throws IOException {
JobInfo result = createJob(progress);
result.uploadFiles(files);
result.submit();
return result;
}
public Iterable<JobInfo> getJobsNeedingDownload() { public Iterable<JobInfo> getJobsNeedingDownload() {
return () -> jobs.stream().filter(j -> j.needsDownload()).map(j -> new JobInfo(j)).iterator(); return () -> jobs.stream().filter(j -> j.needsDownload()).map(j -> new JobInfo(j)).iterator();
...@@ -95,6 +103,16 @@ public class JobManager { ...@@ -95,6 +103,16 @@ public class JobManager {
this.job = job; this.job = job;
} }
public void uploadFiles(Stream<UploadingFile> files) {
job.uploadFiles(files);
}
public void submit() {
job.submit();
}
public Long getId() { public Long getId() {
return job.getJobId(); return job.getJobId();
} }
...@@ -142,6 +160,13 @@ public class JobManager { ...@@ -142,6 +160,13 @@ public class JobManager {
return time != null ? time.getTime().toString() : "N/A"; return time != null ? time.getTime().toString() : "N/A";
} }
public Path storeDataInWorkdirectory(UploadingFile uploadingFile) throws IOException {
return job.storeDataInWorkdirectory(uploadingFile);
}
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment