diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/BenchmarkJobManager.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/BenchmarkJobManager.java index 09b4536ca1e706c9418a775aa40de5d46e986bc4..cb70898a78e5d965df1d802c10cc67fe5a410fc1 100644 --- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/BenchmarkJobManager.java +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/BenchmarkJobManager.java @@ -3,11 +3,13 @@ package cz.it4i.fiji.haas; import java.io.IOException; import java.io.InputStream; import java.nio.file.Path; +import java.nio.file.Paths; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Predicate; import java.util.stream.Collectors; import org.yaml.snakeyaml.Yaml; @@ -19,13 +21,15 @@ import cz.it4i.fiji.haas_java_client.JobState; import net.imagej.updater.util.Progress; public class BenchmarkJobManager { + private static final String SPIM_OUTPUT_FILENAME_PATTERN = "spim.outputFilenamePattern"; + private static final int HAAS_TEMPLATE_ID = 4; private static final String CONFIG_YAML = "config.yaml"; private JobManager jobManager; private Progress progress; - private Map<Long,JobInfo> jobs = new HashMap<>(); + private Map<Long, JobInfo> jobs = new HashMap<>(); public BenchmarkJobManager(Path workDirectory, Progress progress) throws IOException { - jobManager = new JobManager(workDirectory, TestingConstants.getSettings(2, 6)); + jobManager = new JobManager(workDirectory, TestingConstants.getSettings(HAAS_TEMPLATE_ID, 6)); this.progress = progress; } @@ -40,28 +44,34 @@ public class BenchmarkJobManager { jobInfo.uploadFilesByName(() -> Arrays.asList(CONFIG_YAML).stream()); String outputName = getOutputName(jobInfo.openLocalFile(CONFIG_YAML)); jobInfo.submit(); - jobInfo.setProperty("spim.outputFilenamePattern", outputName); + jobInfo.setProperty(SPIM_OUTPUT_FILENAME_PATTERN, outputName); } - public Collection<Long> getJobs() throws IOException { return jobManager.getJobs(progress).stream().map(this::indexJob).collect(Collectors.toList()); } - + public JobState getState(long jobId) { return jobs.get(jobId).getState(); } - public void downloadData(long jobId) { - // TODO Auto-generated method stub - - - + public void downloadData(long jobId) throws IOException { + JobInfo ji = jobs.get(jobId); + if (ji.needsDownload()) { + if (ji.getState() == JobState.Finished) { + String filePattern = ji.getProperty(SPIM_OUTPUT_FILENAME_PATTERN); + ji.downloadData(downloadFinishedData(filePattern), progress); + } else if (ji.getState() == JobState.Failed) { + ji.downloadData(downloadFailedData(), progress); + } + } + } + public Iterable<String> getOutput(long jobId, List<JobSynchronizableFile> files) { return jobs.get(jobId).getOutput(files); } - + private HaaSClient.UploadingFile getUploadingFile() { return new UploadingFileFromResource("", CONFIG_YAML); } @@ -73,23 +83,40 @@ public class BenchmarkJobManager { @SuppressWarnings("rawtypes") private String getOutputName(InputStream openLocalFile) throws IOException { - try(InputStream is = openLocalFile){ + try (InputStream is = openLocalFile) { Yaml yaml = new Yaml(); - + Map map = yaml.load(is); - String result = (String) ((Map)map.get("common")).get("hdf5_xml_filename"); - if(result == null) { + String result = (String) ((Map) map.get("common")).get("hdf5_xml_filename"); + if (result == null) { throw new IllegalArgumentException("hdf5_xml_filename not found"); } + if (result.charAt(0) == '"' || result.charAt(0) == '\'') { + if (result.charAt(result.length() - 1) != result.charAt(0)) { + throw new IllegalArgumentException(result); + } + result = result.substring(1, result.length() - 1); + } + return result; } - - } - - - + } - + private Predicate<String> downloadFinishedData(String filePattern) { + return name -> { + Path p = Paths.get(name); + String fileName = p.getFileName().toString(); + return fileName.startsWith(filePattern) && fileName.endsWith("h5") || fileName.equals(filePattern + ".xml") + || fileName.equals("benchmark_result.csv"); + }; + } + private Predicate<String> downloadFailedData() { + return name -> { + Path p = Paths.get(name); + return p.getFileName().toString().startsWith("snakejob.") + || p.getParent().getFileName().toString().equals("logs"); + }; + } } diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java index b00c1e70ffcbc6e8609b40f6cad25e8be12f84d2..9399ccb5b5b34ef0582538b3550d6732b6f45dd5 100644 --- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java @@ -10,6 +10,7 @@ import java.util.Calendar; import java.util.Collections; import java.util.EnumSet; import java.util.Properties; +import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -94,6 +95,7 @@ public class Job { this(haasClientSupplier, progress); jobDir = p; loadJobInfo(); + updateState(); } public void uploadFiles(Supplier<Stream<UploadingFile>> files) { @@ -110,7 +112,7 @@ public class Job { client.submitJob(jobId, notifier); } - private Job(Supplier<HaaSClient> haasClientSupplier, Progress progress) { + private Job(Supplier<HaaSClient> haasClientSupplier, Progress progress) throws IOException { notifier = new P_ProgressNotifierAdapter(progress); this.haasClientSupplier = haasClientSupplier; } @@ -135,7 +137,7 @@ public class Job { } public void download() { - download(dummy); + download(x->true, dummy); } public Path storeDataInWorkdirectory(UploadingFile uploadingFile) throws IOException { @@ -146,11 +148,11 @@ public class Job { return result; } - synchronized public void download(Progress progress) { + synchronized public void download(Predicate<String> predicate, Progress progress) { if (!needsDownload()) { throw new IllegalStateException("Job: " + getJobId() + " doesn't need download"); } - haasClientSupplier.get().download(getJobId(), jobDir, new P_ProgressNotifierAdapter(progress)); + haasClientSupplier.get().download(getJobId(), jobDir, predicate, new P_ProgressNotifierAdapter(progress)); needsDownload = false; try { saveJobinfo(); @@ -190,9 +192,13 @@ public class Job { public void setProperty(String name, String value) throws IOException { Properties prop = loadPropertiesIfExists(); prop.setProperty(name, value); - + storeProperties(prop); } + public String getProperty(String name) throws IOException { + return loadPropertiesIfExists().getProperty(name); + } + private synchronized void saveJobinfo() throws IOException { Properties prop = loadPropertiesIfExists(); if (needsDownload != null) { @@ -213,8 +219,8 @@ public class Job { Properties prop = loadPropertiesIfExists(); if (prop.containsKey(JOB_HAS_DATA_TO_DOWNLOAD_PROPERTY)) { needsDownload = Boolean.parseBoolean(prop.getProperty(JOB_HAS_DATA_TO_DOWNLOAD_PROPERTY)); - name = prop.getProperty(JOB_NAME); } + name = prop.getProperty(JOB_NAME); } private Properties loadPropertiesIfExists() throws IOException { @@ -286,4 +292,6 @@ public class Job { } + + } diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/JobManager.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/JobManager.java index 0853d60e9c784f2fb3700e2f5ccd50ce72f55850..e206c5ef27fe1c56b07511eed2f3d9f51a8a7035 100644 --- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/JobManager.java +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/JobManager.java @@ -8,6 +8,7 @@ import java.util.Calendar; import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; +import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -42,6 +43,9 @@ public class JobManager { public JobInfo createJob(Progress progress) throws IOException { Job job; + if(jobs == null) { + jobs = new LinkedList<>(); + } jobs.add(job = new Job(settings.getJobName(), workDirectory, this::getHaasClient, progress)); return new JobInfo(job) { @Override @@ -160,8 +164,13 @@ public class JobManager { } public void downloadData(Progress progress) { - job.download(progress); + downloadData(x->true, progress); + } + + public void downloadData(Predicate<String> predicate, Progress progress) { + job.download(predicate,progress); fireValueChangedEvent(); + } public void waitForStart() { @@ -199,6 +208,13 @@ public class JobManager { } + public String getProperty(String name) throws IOException { + return job.getProperty(name); + } + + + + } } diff --git a/haas-imagej-client/src/test/java/cz/it4i/fiji/haas/RunBenchmark.java b/haas-imagej-client/src/test/java/cz/it4i/fiji/haas/RunBenchmark.java index eb50cc28d4e1de6ba81117339ae827d2c6cb319b..01b62934608655ad5192487aa68f525524c3edc1 100644 --- a/haas-imagej-client/src/test/java/cz/it4i/fiji/haas/RunBenchmark.java +++ b/haas-imagej-client/src/test/java/cz/it4i/fiji/haas/RunBenchmark.java @@ -45,7 +45,7 @@ public class RunBenchmark { } else if (state == JobState.Running) { log.info(benchmarkJobManager.getOutput(jobId,Arrays.asList( new JobManager.JobSynchronizableFile(SynchronizableFileType.StandardErrorFile, 0))).iterator().next()); - } + } } } } diff --git a/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSClient.java b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSClient.java index f8bd00fe77ea5b8d9560425bcc6a482be6064b75..22d31f12019e03ec80e107ec3ef385a899e96a32 100644 --- a/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSClient.java +++ b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSClient.java @@ -254,25 +254,25 @@ public class HaaSClient { } public void download(long jobId, Path workDirectory, final ProgressNotifier notifier) { - download(jobId, workDirectory, notifier, val -> true); + download(jobId, workDirectory, val -> true, notifier); } - public void download(long jobId, Path workDirectory, final ProgressNotifier notifier, Predicate<String> function) { + public void download(long jobId, Path workDirectory, Predicate<String> function, final ProgressNotifier notifier) { try { notifier.setTitle("Downloading"); FileTransferMethodExt ft = getFileTransfer().getFileTransferMethod(jobId, getSessionID()); try (ScpClient scpClient = getScpClient(ft)) { String[] filesArray = getFileTransfer().listChangedFilesForJob(jobId, getSessionID()); - Stream<String> files = Arrays.asList(filesArray).stream().filter(function); + Collection<String> files = Arrays.asList(filesArray).stream().filter(function).collect(Collectors.toList()); List<Long> fileSizes = getSizes( - files.map(filename -> "'" + ft.getSharedBasepath() + "/" + filename + "'").collect( + files.stream().map(filename -> "'" + ft.getSharedBasepath() + "/" + filename + "'").collect( Collectors.toList()), scpClient, new P_ProgressNotifierDecorator4Size(notifier)); final long totalFileSize = fileSizes.stream().mapToLong(i -> i.longValue()).sum(); TransferFileProgressForHaaSClient progress = new TransferFileProgressForHaaSClient(totalFileSize, notifier); int idx = 0; - for (String fileName : (Iterable<String>) files::iterator) { + for (String fileName : files) { fileName = fileName.replaceFirst("/", ""); Path rFile = workDirectory.resolve(fileName); if (!Files.exists(rFile.getParent())) {