diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java index 4e5e3058abddad5ce2fbded71a5e6ecfab92435b..66304142238aa3251742d394aabe8311c30f5b0b 100644 --- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java +++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java @@ -1,8 +1,18 @@ package cz.it4i.fiji.haas_spim_benchmark.core; +import static cz.it4i.fiji.haas.data_transfer.PersistentSynchronizationProcess.FAILED_ITEM; +import static cz.it4i.fiji.haas_java_client.JobState.Canceled; +import static cz.it4i.fiji.haas_java_client.JobState.Failed; +import static cz.it4i.fiji.haas_java_client.JobState.Finished; +import static cz.it4i.fiji.haas_spim_benchmark.core.Configuration.getHaasClusterNodeType; +import static cz.it4i.fiji.haas_spim_benchmark.core.Configuration.getHaasTemplateID; +import static cz.it4i.fiji.haas_spim_benchmark.core.Configuration.getWalltime; import static cz.it4i.fiji.haas_spim_benchmark.core.Constants.BENCHMARK_TASK_NAME_MAP; +import static cz.it4i.fiji.haas_spim_benchmark.core.Constants.CORES_PER_NODE; +import static cz.it4i.fiji.haas_spim_benchmark.core.Constants.DONE_TASK; import static cz.it4i.fiji.haas_spim_benchmark.core.Constants.FUSION_SWITCH; +import static cz.it4i.fiji.haas_spim_benchmark.core.Constants.HAAS_JOB_NAME; import static cz.it4i.fiji.haas_spim_benchmark.core.Constants.HDF5_XML_FILENAME; import static cz.it4i.fiji.haas_spim_benchmark.core.Constants.SPIM_OUTPUT_FILENAME_PATTERN; import static cz.it4i.fiji.haas_spim_benchmark.core.Constants.VERIFIED_STATE_OF_FINISHED_JOB; @@ -58,7 +68,6 @@ import org.xml.sax.SAXException; import cz.it4i.fiji.commons.WebRoutines; import cz.it4i.fiji.haas.Job; import cz.it4i.fiji.haas.JobManager; -import cz.it4i.fiji.haas.data_transfer.PersistentSynchronizationProcess; import cz.it4i.fiji.haas_java_client.FileTransferInfo; import cz.it4i.fiji.haas_java_client.HaaSClientException; import cz.it4i.fiji.haas_java_client.HaaSClientSettings; @@ -72,12 +81,14 @@ import cz.it4i.fiji.haas_java_client.UploadingFile; public class BenchmarkJobManager implements Closeable { public interface DownloadingStatusProvider { + boolean isDownloaded(); + boolean needsDownload(); } - private static Logger log = LoggerFactory - .getLogger(cz.it4i.fiji.haas_spim_benchmark.core.BenchmarkJobManager.class); + private static Logger log = LoggerFactory.getLogger( + cz.it4i.fiji.haas_spim_benchmark.core.BenchmarkJobManager.class); private final JobManager jobManager; @@ -89,18 +100,20 @@ public class BenchmarkJobManager implements Closeable { private boolean verifiedStateProcessed; private CompletableFuture<JobState> running; private ProgressNotifier downloadNotifier; - private DownloadingStatusProvider downloadingStatus = new DownloadingStatusProvider() { - - @Override - public boolean needsDownload() { - return job.needsDownload(); - } - - @Override - public boolean isDownloaded() { - return job.isDownloaded(); - } - }; + private DownloadingStatusProvider downloadingStatus = + new DownloadingStatusProvider() + { + + @Override + public boolean needsDownload() { + return job.needsDownload(); + } + + @Override + public boolean isDownloaded() { + return job.isDownloaded(); + } + }; private boolean visibleInBDV; @@ -110,17 +123,19 @@ public class BenchmarkJobManager implements Closeable { } public synchronized void startJob(Progress progress) throws IOException { - job.uploadFile(Constants.CONFIG_YAML, new ProgressNotifierAdapter(progress)); - LoadedYAML yaml = new LoadedYAML(job.openLocalFile(Constants.CONFIG_YAML)); + job.uploadFile(Constants.CONFIG_YAML, new ProgressNotifierAdapter( + progress)); + LoadedYAML yaml = new LoadedYAML(job.openLocalFile( + Constants.CONFIG_YAML)); verifiedStateProcessed = false; running = null; - String message = "Submitting job id #"+getId(); + String message = "Submitting job id #" + getId(); progress.addItem(message); job.updateInfo(); JobState oldState = updateAndGetState(); job.submit(); setVerifiedState(null); - while(oldState == updateAndGetState()){ + while (oldState == updateAndGetState()) { try { Thread.sleep(Constants.WAIT_FOR_SUBMISSION_TIMEOUT); } @@ -129,8 +144,8 @@ public class BenchmarkJobManager implements Closeable { } } progress.itemDone(message); - job.setProperty(SPIM_OUTPUT_FILENAME_PATTERN, - yaml.getCommonProperty(FUSION_SWITCH) + "_" + yaml.getCommonProperty(HDF5_XML_FILENAME)); + job.setProperty(SPIM_OUTPUT_FILENAME_PATTERN, yaml.getCommonProperty( + FUSION_SWITCH) + "_" + yaml.getCommonProperty(HDF5_XML_FILENAME)); } public boolean delete() { @@ -145,13 +160,16 @@ public class BenchmarkJobManager implements Closeable { return getStateAsync(r -> r.run()).getNow(JobState.Unknown); } - - public synchronized CompletableFuture<JobState> getStateAsync(Executor executor) { + public synchronized CompletableFuture<JobState> getStateAsync( + Executor executor) + { if (running != null) { return running; } CompletableFuture<JobState> result = doGetStateAsync(executor); - if (!result.isCancelled() && !result.isCompletedExceptionally() && !result.isDone()) { + if (!result.isCancelled() && !result.isCompletedExceptionally() && !result + .isDone()) + { running = result; } return result; @@ -183,10 +201,11 @@ public class BenchmarkJobManager implements Closeable { digest = MessageDigest.getInstance("SHA-1"); digest.reset(); digest.update(changed.getBytes("utf8")); - String sha1 = String.format("%040x", new BigInteger(1, digest.digest())); - String result = Constants.BDS_ADDRESS + sha1 + "/"; + String sha1 = String.format("%040x", new BigInteger(1, digest + .digest())); + String result = Configuration.getBDSAddress() + sha1 + "/"; if (log.isDebugEnabled()) { - log.debug("getBDSPathForData changed={} path={}",result, result); + log.debug("getBDSPathForData changed={} path={}", result, result); } return result; } @@ -194,11 +213,11 @@ public class BenchmarkJobManager implements Closeable { throw new RuntimeException(exc); } } - + public boolean isVisibleInBDV() { return visibleInBDV; } - + public void startUpload() { job.startUploadData(); } @@ -224,13 +243,15 @@ public class BenchmarkJobManager implements Closeable { } public CompletableFuture<?> startDownload() throws IOException { - if (job.getState() == JobState.Finished) { + if (job.getState() == Finished) { CompletableFuture<?> result = new CompletableFuture<Void>(); startDownloadResults(result); return result; - } else if (job.getState() == JobState.Failed || job.getState() == JobState.Canceled) { + } + else if (job.getState() == Failed || job.getState() == Canceled) { return job.startDownload(downloadFailedData()); - } else { + } + else { return CompletableFuture.completedFuture(null); } } @@ -310,7 +331,9 @@ public class BenchmarkJobManager implements Closeable { return snakemakeOutputHelper.getActualOutput(types); } - public void storeDataInWorkdirectory(UploadingFile file) throws IOException { + public void storeDataInWorkdirectory(UploadingFile file) + throws IOException + { job.storeDataInWorkdirectory(file); } @@ -352,27 +375,29 @@ public class BenchmarkJobManager implements Closeable { return progress == null ? null : new ProgressNotifierAdapter(progress); } - private synchronized CompletableFuture<JobState> doGetStateAsync(Executor executor) { + private synchronized CompletableFuture<JobState> doGetStateAsync( + Executor executor) + { JobState state = job.getState(); - if (state != JobState.Finished) { + if (state != Finished) { setVerifiedState(null); return CompletableFuture.completedFuture(state); } if (getVerifiedState() != null) { return CompletableFuture.completedFuture(getVerifiedState()); } - + verifiedStateProcessed = true; return CompletableFuture.supplyAsync(() -> { try { - JobState workVerifiedState = Stream.concat(Arrays.asList(state).stream(), - getTasks().stream().filter(task -> !task.getDescription().equals( - Constants.DONE_TASK)).flatMap(task -> task.getComputations() + JobState workVerifiedState = Stream.concat(Arrays.asList(state) + .stream(), getTasks().stream().filter(task -> !task.getDescription() + .equals(DONE_TASK)).flatMap(task -> task.getComputations() .stream()).map(tc -> tc.getState())).max( new JobStateComparator()).get(); - - if (workVerifiedState != JobState.Finished && workVerifiedState != JobState.Canceled) { - workVerifiedState = JobState.Failed; + + if (workVerifiedState != Finished && workVerifiedState != Canceled) { + workVerifiedState = Failed; } synchronized (BenchmarkJob.this) { // test whether job was restarted - it sets running to null @@ -384,7 +409,8 @@ public class BenchmarkJobManager implements Closeable { setVerifiedState(workVerifiedState); return workVerifiedState; } - } finally { + } + finally { synchronized (BenchmarkJob.this) { if (running != null) { running = null; @@ -421,7 +447,7 @@ public class BenchmarkJobManager implements Closeable { } catch (IOException e) { throw new RuntimeException(e); - } + } finally { progressNotifierTemporarySwitchOff.switchOn(); stillRunningTemporarySwitch.switchBack(); @@ -429,8 +455,7 @@ public class BenchmarkJobManager implements Closeable { }).whenComplete((X, e) -> { if (e != null) { log.error(e.getMessage(), e); - downloadNotifier.addItem( - PersistentSynchronizationProcess.FAILED_ITEM); + downloadNotifier.addItem(FAILED_ITEM); } result.complete(null); }); @@ -439,19 +464,26 @@ public class BenchmarkJobManager implements Closeable { private Set<String> extractNames(Path pathToXML) { Set<String> result = new HashSet<>(); try (InputStream fileIS = Files.newInputStream(pathToXML)) { - DocumentBuilderFactory builderFactory = DocumentBuilderFactory.newInstance(); + DocumentBuilderFactory builderFactory = DocumentBuilderFactory + .newInstance(); DocumentBuilder builder = builderFactory.newDocumentBuilder(); Document xmlDocument = builder.parse(fileIS); XPath xPath = XPathFactory.newInstance().newXPath(); - Node imageLoader = ((NodeList) xPath.evaluate("/SpimData/SequenceDescription/ImageLoader", xmlDocument, - XPathConstants.NODESET)).item(0); - Node hdf5 = ((NodeList) xPath.evaluate("hdf5", imageLoader, XPathConstants.NODESET)).item(0); + Node imageLoader = ((NodeList) xPath.evaluate( + "/SpimData/SequenceDescription/ImageLoader", xmlDocument, + XPathConstants.NODESET)).item(0); + Node hdf5 = ((NodeList) xPath.evaluate("hdf5", imageLoader, + XPathConstants.NODESET)).item(0); result.add(hdf5.getTextContent()); - NodeList nl = (NodeList) xPath.evaluate("partition/path", imageLoader, XPathConstants.NODESET); + NodeList nl = (NodeList) xPath.evaluate("partition/path", imageLoader, + XPathConstants.NODESET); for (int i = 0; i < nl.getLength(); i++) { result.add(nl.item(i).getTextContent()); } - } catch (IOException | ParserConfigurationException | SAXException | XPathExpressionException e) { + } + catch (IOException | ParserConfigurationException | SAXException + | XPathExpressionException e) + { throw new HaaSClientException("Extract names from " + pathToXML, e); } return result; @@ -469,17 +501,17 @@ public class BenchmarkJobManager implements Closeable { job.setProperty(VERIFIED_STATE_OF_FINISHED_JOB, value != null ? value .toString() : null); } - + private JobState getVerifiedState() { - if(verifiedState == null) { - String storedVerifiedState = job.getProperty(VERIFIED_STATE_OF_FINISHED_JOB); - if(storedVerifiedState != null) { + if (verifiedState == null) { + String storedVerifiedState = job.getProperty( + VERIFIED_STATE_OF_FINISHED_JOB); + if (storedVerifiedState != null) { verifiedState = JobState.valueOf(storedVerifiedState); } } return verifiedState; } - private JobState updateAndGetState() { job.updateInfo(); @@ -488,27 +520,31 @@ public class BenchmarkJobManager implements Closeable { } public BenchmarkJobManager(BenchmarkSPIMParameters params) { - jobManager = new JobManager(params.workingDirectory(), constructSettingsFromParams(params)); + jobManager = new JobManager(params.workingDirectory(), + constructSettingsFromParams(params)); jobManager.setUploadFilter(this::canUpload); } public BenchmarkJob createJob(Function<Path, Path> inputDirectoryProvider, - Function<Path, Path> outputDirectoryProvider) throws IOException { - Job job = jobManager.createJob( getJobSettings(),inputDirectoryProvider, outputDirectoryProvider); - if(job.getInputDirectory() == null) { + Function<Path, Path> outputDirectoryProvider) throws IOException + { + Job job = jobManager.createJob(getJobSettings(), inputDirectoryProvider, + outputDirectoryProvider); + if (job.getInputDirectory() == null) { job.createEmptyFile(Constants.DEMO_DATA_SIGNAL_FILE_NAME); } return convertJob(job); } public Collection<BenchmarkJob> getJobs() { - return jobManager.getJobs().stream().map(this::convertJob).collect(Collectors.toList()); + return jobManager.getJobs().stream().map(this::convertJob).collect( + Collectors.toList()); } public void checkConnection() { jobManager.checkConnection(); } - + public static void formatResultFile(Path filename) { List<ResultFileTask> identifiedTasks = new LinkedList<>(); @@ -531,7 +567,8 @@ public class BenchmarkJobManager implements Closeable { if (columns[0].equals(Constants.STATISTICS_TASK_NAME)) { - // If there is a task being processed, add all cached jobs to it and wrap it up + // If there is a task being processed, add all cached jobs to it and + // wrap it up if (null != processedTask) { processedTask.setJobs(jobs); identifiedTasks.add(processedTask); @@ -541,14 +578,16 @@ public class BenchmarkJobManager implements Closeable { processedTask = new ResultFileTask(columns[1]); jobs.clear(); - } else if (columns[0].equals(Constants.STATISTICS_JOB_IDS)) { + } + else if (columns[0].equals(Constants.STATISTICS_JOB_IDS)) { // Cache all found jobs for (int i = 1; i < columns.length; i++) { jobs.add(new ResultFileJob()); } - } else if (!columns[0].equals(Constants.STATISTICS_JOB_COUNT)) { + } + else if (!columns[0].equals(Constants.STATISTICS_JOB_COUNT)) { // Save values of a given property to cached jobs for (int i = 1; i < columns.length; i++) { @@ -558,55 +597,68 @@ public class BenchmarkJobManager implements Closeable { } } - // If there is a task being processed, add all cached jobs to it and wrap it up + // If there is a task being processed, add all cached jobs to it and wrap + // it up if (null != processedTask) { processedTask.setJobs(jobs); identifiedTasks.add(processedTask); } - } catch (IOException e) { + } + catch (IOException e) { log.error(e.getMessage(), e); return; } // Order tasks chronologically - List<String> chronologicList = BENCHMARK_TASK_NAME_MAP.keySet().stream().collect(Collectors.toList()); - Collections.sort(identifiedTasks, Comparator.comparingInt(t -> chronologicList.indexOf(t.getName()))); + List<String> chronologicList = BENCHMARK_TASK_NAME_MAP.keySet().stream() + .collect(Collectors.toList()); + Collections.sort(identifiedTasks, Comparator.comparingInt( + t -> chronologicList.indexOf(t.getName()))); FileWriter fileWriter = null; try { - fileWriter = new FileWriter( - filename.getParent().toString() + Constants.FORWARD_SLASH + Constants.STATISTICS_SUMMARY_FILENAME); - fileWriter.append(Constants.SUMMARY_FILE_HEADER).append(Constants.NEW_LINE_SEPARATOR); + fileWriter = new FileWriter(filename.getParent().toString() + + Constants.FORWARD_SLASH + Constants.STATISTICS_SUMMARY_FILENAME); + fileWriter.append(Constants.SUMMARY_FILE_HEADER).append( + Constants.NEW_LINE_SEPARATOR); for (ResultFileTask task : identifiedTasks) { - fileWriter.append(Constants.BENCHMARK_TASK_NAME_MAP.get(task.getName())).append(Constants.DELIMITER); - fileWriter.append(Double.toString(task.getAverageMemoryUsage())).append(Constants.DELIMITER); - fileWriter.append(Double.toString(task.getAverageWallTime())).append(Constants.DELIMITER); - fileWriter.append(Double.toString(task.getMaximumWallTime())).append(Constants.DELIMITER); - fileWriter.append(Double.toString(task.getTotalTime())).append(Constants.DELIMITER); + fileWriter.append(Constants.BENCHMARK_TASK_NAME_MAP.get(task.getName())) + .append(Constants.DELIMITER); + fileWriter.append(Double.toString(task.getAverageMemoryUsage())).append( + Constants.DELIMITER); + fileWriter.append(Double.toString(task.getAverageWallTime())).append( + Constants.DELIMITER); + fileWriter.append(Double.toString(task.getMaximumWallTime())).append( + Constants.DELIMITER); + fileWriter.append(Double.toString(task.getTotalTime())).append( + Constants.DELIMITER); fileWriter.append(Integer.toString(task.getJobCount())); fileWriter.append(Constants.NEW_LINE_SEPARATOR); } Double pipelineStart = identifiedTasks.stream() // - .mapToDouble(t -> t.getEarliestStartInSeconds()).min().getAsDouble(); + .mapToDouble(t -> t.getEarliestStartInSeconds()).min().getAsDouble(); Double pipelineEnd = identifiedTasks.stream() // - .mapToDouble(t -> t.getLatestEndInSeconds()).max().getAsDouble(); + .mapToDouble(t -> t.getLatestEndInSeconds()).max().getAsDouble(); fileWriter.append(Constants.NEW_LINE_SEPARATOR); fileWriter.append("Pipeline duration: " + (pipelineEnd - pipelineStart)); - } catch (Exception e) { + } + catch (Exception e) { log.error(e.getMessage(), e); - } finally { + } + finally { try { if (fileWriter != null) { fileWriter.flush(); fileWriter.close(); } - } catch (Exception e) { + } + catch (Exception e) { log.error(e.getMessage(), e); } } @@ -618,7 +670,8 @@ public class BenchmarkJobManager implements Closeable { } private boolean canUpload(Job job, Path p) { - return job.getInputDirectory() == null || !p.equals(job.getInputDirectory().resolve(Constants.CONFIG_YAML)); + return job.getInputDirectory() == null || !p.equals(job.getInputDirectory() + .resolve(Constants.CONFIG_YAML)); } private BenchmarkJob convertJob(Job job) { @@ -626,52 +679,57 @@ public class BenchmarkJobManager implements Closeable { } private static JobSettings getJobSettings() { - return new JobSettingsBuilder().jobName(Constants.HAAS_JOB_NAME) - .clusterNodeType(Constants.HAAS_CLUSTER_NODE_TYPE).templateId(Constants.HAAS_TEMPLATE_ID) - .walltimeLimit(Constants.HAAS_TIMEOUT).numberOfCoresPerNode(Constants.CORES_PER_NODE).build(); + return new JobSettingsBuilder().jobName(HAAS_JOB_NAME).clusterNodeType( + getHaasClusterNodeType()).templateId(getHaasTemplateID()).walltimeLimit( + getWalltime()).numberOfCoresPerNode(CORES_PER_NODE).build(); } - static private Predicate<String> downloadFileNameExtractDecorator(Predicate<String> decorated) { + static private Predicate<String> downloadFileNameExtractDecorator( + Predicate<String> decorated) + { return name -> { Path path = getPathSafely(name); - if (path == null) - return false; + if (path == null) return false; String fileName = path.getFileName().toString(); return decorated.test(fileName); }; } - - static private Predicate<String> downloadCSVDecorator(Predicate<String> decorated) { + + static private Predicate<String> downloadCSVDecorator( + Predicate<String> decorated) + { return name -> { - if(name.toLowerCase().endsWith(".csv")) { + if (name.toLowerCase().endsWith(".csv")) { return true; } return decorated.test(name); }; - + } static private Predicate<String> downloadFailedData() { return name -> { Path path = getPathSafely(name); - if (path == null) - return false; - return path.getFileName().toString().startsWith("snakejob.") - || path.getParent() != null && path.getParent().getFileName() != null - && path.getParent().getFileName().toString().equals("logs"); + if (path == null) return false; + return path.getFileName().toString().startsWith("snakejob.") || path + .getParent() != null && path.getParent().getFileName() != null && path + .getParent().getFileName().toString().equals("logs"); }; } private static Path getPathSafely(String name) { try { return Paths.get(name); - } catch (InvalidPathException ex) { + } + catch (InvalidPathException ex) { return null; } } - private static HaaSClientSettings constructSettingsFromParams(BenchmarkSPIMParameters params) { + private static HaaSClientSettings constructSettingsFromParams( + BenchmarkSPIMParameters params) + { return new HaaSClientSettings() { @Override @@ -681,7 +739,7 @@ public class BenchmarkJobManager implements Closeable { @Override public String getProjectId() { - return Constants.HAAS_PROJECT_ID; + return Configuration.getHaasProjectID(); } @Override diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/Configuration.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/Configuration.java new file mode 100644 index 0000000000000000000000000000000000000000..512af97948fe96183db83328665d4eaec4ae9e29 --- /dev/null +++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/Configuration.java @@ -0,0 +1,82 @@ + +package cz.it4i.fiji.haas_spim_benchmark.core; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Optional; +import java.util.Properties; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Configuration { + + private final static Logger log = LoggerFactory.getLogger( + Configuration.class); + + private static Properties properties; + + private static int HAAS_UPDATE_TIMEOUT = 30000; + private static int HAAS_CLUSTER_NODE_TYPE = 7; + private static int HAAS_TEMPLATE_ID = 4; + private static String HAAS_PROJECT_ID = "DD-18-42"; + private static int WALLTIME = 3600; // Walltime in seconds + private static String BDS_ADDRESS = "http://julius2.it4i.cz/"; + + private static Properties getProperties() throws IOException { + if (properties == null) { + properties = new Properties(); + try (InputStream is = Configuration.class.getClassLoader() + .getResourceAsStream("hpc_wm_configuration.propertie")) + { + properties.load(is); + } + } + return properties; + } + + public static int getHaasUpdateTimeout() { + return getOrDefault("haas_update_timeout", HAAS_UPDATE_TIMEOUT); + } + + public static int getHaasClusterNodeType() { + return getOrDefault("haas_cluster_node_type", HAAS_CLUSTER_NODE_TYPE); + } + + public static int getHaasTemplateID() { + return getOrDefault("haas_template_ID", HAAS_TEMPLATE_ID); + } + + public static int getWalltime() { + return getOrDefault("walltime", WALLTIME); + } + + public static String getHaasProjectID() { + return getOrDefault("haas_project_ID", HAAS_PROJECT_ID); + } + + public static String getBDSAddress() { + return getOrDefault("bds_address", BDS_ADDRESS); + } + + private static String getOrDefault(String name, String def) { + try { + return Optional.ofNullable(getProperties().getProperty(name)).orElse(def); + } + catch (IOException e) { + log.error(e.getMessage(), e); + return def; + } + } + + private static int getOrDefault(String name, int def) { + try { + return Optional.ofNullable(getProperties().getProperty(name)).map( + val -> Integer.parseInt(val)).orElse(def); + } + catch (IOException e) { + log.error(e.getMessage(), e); + return def; + } + } +} diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/Constants.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/Constants.java index dd5df44db5aa870aad65cf94e2cb15ae41a1378f..5241863580d862d14873c4d7ec92cda04ea551ae 100644 --- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/Constants.java +++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/Constants.java @@ -1,24 +1,20 @@ + package cz.it4i.fiji.haas_spim_benchmark.core; import java.util.LinkedHashMap; import java.util.Map; public interface Constants { - + String MENU_ITEM_NAME = "Multiview Reconstruction"; String SUBMENU_ITEM_NAME = "SPIM Workflow Manager for HPC"; - + String PHONE = "123456789"; - int HAAS_UPDATE_TIMEOUT = 30000; + short UI_TO_HAAS_FREQUENCY_UPDATE_RATIO = 10; String HAAS_JOB_NAME = "HaaSSPIMBenchmark"; - int HAAS_CLUSTER_NODE_TYPE = 7; - int HAAS_TEMPLATE_ID = 4; - String HAAS_PROJECT_ID = "DD-18-42"; - int HAAS_TIMEOUT = 3600; //Walltime in seconds long WAIT_FOR_SUBMISSION_TIMEOUT = 100; - String BDS_ADDRESS = "http://julius2.it4i.cz/"; - + final String NEW_LINE_SEPARATOR = "\n"; final String DELIMITER = ";"; final String FORWARD_SLASH = "/"; @@ -28,28 +24,31 @@ public interface Constants { String CONFIG_YAML = "config.yaml"; String HDF5_XML_FILENAME = "hdf5_xml_filename"; String FUSION_SWITCH = "fusion_switch"; - - + // This map is considered as ground truth for chronologic task sorting - Map<String, String> BENCHMARK_TASK_NAME_MAP = new LinkedHashMap<String, String>() { - private static final long serialVersionUID = 1L; + Map<String, String> BENCHMARK_TASK_NAME_MAP = + new LinkedHashMap<String, String>() { - put("define_xml_czi", "Define dataset"); - put("define_xml_tif", "Define dataset"); - put("hdf5_xml", "Define hdf5 dataset"); - put("resave_hdf5", "Resave to hdf5"); - put("registration", "Detection and registration"); - put("xml_merge", "Merge xml"); - put("timelapse", "Time lapse registration"); - put("fusion", "Average fusion"); - put("external_transform", "External transformation"); - put("deconvolution", "Deconvolution GPU"); - put("define_output", "Define output"); - put("hdf5_xml_output", "Define hdf5 output"); - put("resave_hdf5_output", "Resave output to hdf5"); - put(DONE_TASK, "Done"); - }}; - + + private static final long serialVersionUID = 1L; + { + put("define_xml_czi", "Define dataset"); + put("define_xml_tif", "Define dataset"); + put("hdf5_xml", "Define hdf5 dataset"); + put("resave_hdf5", "Resave to hdf5"); + put("registration", "Detection and registration"); + put("xml_merge", "Merge xml"); + put("timelapse", "Time lapse registration"); + put("fusion", "Average fusion"); + put("external_transform", "External transformation"); + put("deconvolution", "Deconvolution GPU"); + put("define_output", "Define output"); + put("hdf5_xml_output", "Define hdf5 output"); + put("resave_hdf5_output", "Resave output to hdf5"); + put(DONE_TASK, "Done"); + } + }; + String STATISTICS_TASK_NAME = "Task name"; String STATISTICS_JOB_IDS = "job ids"; String STATISTICS_JOB_COUNT = "jobs #"; @@ -57,13 +56,13 @@ public interface Constants { String STATISTICS_RESOURCES_WALL_TIME = "resources_used.walltime"; String STATISTICS_RESOURCES_CPU_PERCENTAGE = "resources_used.cpupercent"; String STATISTICS_RESOURCES_START_TIME = "stime"; - + String BENCHMARK_RESULT_FILE = "benchmark_result.csv"; String STATISTICS_SUMMARY_FILENAME = "summary.csv"; - String SUMMARY_FILE_HEADER = "Task;AvgMemoryUsage;AvgWallTime;MaxWallTime;TotalTime;JobCount"; + String SUMMARY_FILE_HEADER = + "Task;AvgMemoryUsage;AvgWallTime;MaxWallTime;TotalTime;JobCount"; String DONE_TASK = "done"; int CORES_PER_NODE = 24; String DEMO_DATA_SIGNAL_FILE_NAME = "demodata"; - - + } diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/HaasOutputObservableValueRegistry.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/HaasOutputObservableValueRegistry.java index 5bd21b6fb5bd92ad32eab74fb2e9123362a4e2fc..6ec4749657e9b43124e638161564cb04eec44e01 100644 --- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/HaasOutputObservableValueRegistry.java +++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/HaasOutputObservableValueRegistry.java @@ -1,6 +1,11 @@ package cz.it4i.fiji.haas_spim_benchmark.core; +import static cz.it4i.fiji.haas_java_client.SynchronizableFileType.StandardErrorFile; +import static cz.it4i.fiji.haas_java_client.SynchronizableFileType.StandardOutputFile; +import static cz.it4i.fiji.haas_spim_benchmark.core.Configuration.getHaasUpdateTimeout; +import static cz.it4i.fiji.haas_spim_benchmark.core.Constants.UI_TO_HAAS_FREQUENCY_UPDATE_RATIO; + import com.google.common.collect.Streams; import java.io.Closeable; @@ -26,10 +31,8 @@ class HaasOutputObservableValueRegistry implements Closeable { public HaasOutputObservableValueRegistry(final BenchmarkJob job) { this.job = job; - this.observableValues.put(SynchronizableFileType.StandardOutputFile, - createObservableValue()); - this.observableValues.put(SynchronizableFileType.StandardErrorFile, - createObservableValue()); + this.observableValues.put(StandardOutputFile, createObservableValue()); + this.observableValues.put(StandardErrorFile, createObservableValue()); } @Override @@ -83,8 +86,7 @@ class HaasOutputObservableValueRegistry implements Closeable { (type, value) -> (Runnable) (() -> observableValues.get(type) .update(value))).forEach(r -> r.run()); } - }, 0, Constants.HAAS_UPDATE_TIMEOUT / - Constants.UI_TO_HAAS_FREQUENCY_UPDATE_RATIO); + }, 0, getHaasUpdateTimeout() / UI_TO_HAAS_FREQUENCY_UPDATE_RATIO); isRunning = true; } else if (isRunning && !anyListeners) { diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/SnakemakeOutputHelper.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/SnakemakeOutputHelper.java index 95bfbe29834f589d6b987bd78fa122ff75e3a96d..303b9735f0e97c3d0aff09955e4edbac365ad476 100644 --- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/SnakemakeOutputHelper.java +++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/SnakemakeOutputHelper.java @@ -1,8 +1,10 @@ package cz.it4i.fiji.haas_spim_benchmark.core; +import static cz.it4i.fiji.haas_java_client.SynchronizableFileType.StandardErrorFile; +import static cz.it4i.fiji.haas_java_client.SynchronizableFileType.StandardOutputFile; +import static cz.it4i.fiji.haas_spim_benchmark.core.Configuration.getHaasUpdateTimeout; import static cz.it4i.fiji.haas_spim_benchmark.core.Constants.BENCHMARK_TASK_NAME_MAP; -import static cz.it4i.fiji.haas_spim_benchmark.core.Constants.HAAS_UPDATE_TIMEOUT; import static cz.it4i.fiji.haas_spim_benchmark.core.Constants.UI_TO_HAAS_FREQUENCY_UPDATE_RATIO; import java.util.ArrayList; @@ -30,8 +32,7 @@ class SnakemakeOutputHelper implements HaaSOutputHolder { private final List<BenchmarkError> nonTaskSpecificErrors; private int processedOutputLength; - public SnakemakeOutputHelper(final Job job) - { + public SnakemakeOutputHelper(final Job job) { this.job = job; this.computationAccessor = createComputationAccessor(); this.tasks = new ArrayList<>(); @@ -98,9 +99,8 @@ class SnakemakeOutputHelper implements HaaSOutputHolder { }; result = new SPIMComputationAccessorDecoratorWithTimeout(result, - new HashSet<>(Arrays.asList(SynchronizableFileType.StandardOutputFile, - SynchronizableFileType.StandardErrorFile)), HAAS_UPDATE_TIMEOUT / - UI_TO_HAAS_FREQUENCY_UPDATE_RATIO); + new HashSet<>(Arrays.asList(StandardOutputFile, StandardErrorFile)), + getHaasUpdateTimeout() / UI_TO_HAAS_FREQUENCY_UPDATE_RATIO); return result; } diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/TaskObservableValueRegistry.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/TaskObservableValueRegistry.java index 31fe631a51ce61f3e519d842ab0caa773b3f10ea..1d65c994d163067d4ee3f1cec190de144cdbcd70 100644 --- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/TaskObservableValueRegistry.java +++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/TaskObservableValueRegistry.java @@ -1,6 +1,9 @@ package cz.it4i.fiji.haas_spim_benchmark.core; +import static cz.it4i.fiji.haas_spim_benchmark.core.Configuration.getHaasUpdateTimeout; +import static cz.it4i.fiji.haas_spim_benchmark.core.Constants.UI_TO_HAAS_FREQUENCY_UPDATE_RATIO; + import java.io.Closeable; import java.util.ArrayList; import java.util.List; @@ -49,8 +52,8 @@ class TaskObservableValueRegistry implements Closeable { synchronized (TaskObservableValueRegistry.this) { if (timer != null) { - timer.schedule(new L_TimerTask(), Constants.HAAS_UPDATE_TIMEOUT / - Constants.UI_TO_HAAS_FREQUENCY_UPDATE_RATIO); + timer.schedule(new L_TimerTask(), getHaasUpdateTimeout() / + UI_TO_HAAS_FREQUENCY_UPDATE_RATIO); } } } diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/BenchmarkSPIMControl.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/BenchmarkSPIMControl.java index b94cdf5d3b4f42f5b9e56aab8cc50bdeaa21ebba..4f07c494a484d4dccbd79370f8247f2ac268d270 100644 --- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/BenchmarkSPIMControl.java +++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/BenchmarkSPIMControl.java @@ -1,6 +1,7 @@ package cz.it4i.fiji.haas_spim_benchmark.ui; +import static cz.it4i.fiji.haas_spim_benchmark.core.Configuration.getHaasUpdateTimeout; import static cz.it4i.fiji.haas_spim_benchmark.core.Constants.CONFIG_YAML; import java.awt.Window; @@ -94,14 +95,15 @@ public class BenchmarkSPIMControl extends BorderPane implements private final JobStateNameProvider provider = new JobStateNameProvider(); private boolean closed; - + private static Logger log = LoggerFactory.getLogger( cz.it4i.fiji.haas_spim_benchmark.ui.BenchmarkSPIMControl.class); public BenchmarkSPIMControl(BenchmarkJobManager manager) { this.manager = manager; JavaFXRoutines.initRootAndController("BenchmarkSPIM.fxml", this); - jobs.setPlaceholder(new Label("No content in table. Right click to create new one.")); + jobs.setPlaceholder(new Label( + "No content in table. Right click to create new one.")); } @Override @@ -114,13 +116,14 @@ public class BenchmarkSPIMControl extends BorderPane implements initMenu(); boolean result = checkConnection(); synchronized (this) { - if(result && !closed) { + if (result && !closed) { timer.scheduleAtFixedRate(new TimerTask() { + @Override public void run() { updateJobs(false); } - }, Constants.HAAS_UPDATE_TIMEOUT, Constants.HAAS_UPDATE_TIMEOUT); + }, getHaasUpdateTimeout(), getHaasUpdateTimeout()); updateJobs(true); } } @@ -300,15 +303,16 @@ public class BenchmarkSPIMControl extends BorderPane implements } private boolean checkConnection() { - boolean [] result = {false}; - Progress progress = ModalDialogs.doModal(new ProgressDialog( - root, "Connecting to HPC"), WindowConstants.DO_NOTHING_ON_CLOSE); + boolean[] result = { false }; + Progress progress = ModalDialogs.doModal(new ProgressDialog(root, + "Connecting to HPC"), WindowConstants.DO_NOTHING_ON_CLOSE); final CountDownLatch latch = new CountDownLatch(1); executorServiceWS.execute(() -> { try { manager.checkConnection(); result[0] = true; - } finally { + } + finally { progress.done(); latch.countDown(); } @@ -321,7 +325,7 @@ public class BenchmarkSPIMControl extends BorderPane implements } return result[0]; } - + private void updateJobs(boolean showProgress) { Progress progress = showProgress ? ModalDialogs.doModal(new ProgressDialog( root, "Updating jobs"), WindowConstants.DO_NOTHING_ON_CLOSE) diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/TaskComputationAdapter.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/TaskComputationAdapter.java index cb3258eb9b9d16a47f4215dbed4caa5096f7de29..a0d178194a6f2924f9cf78280a47d6d0880187f3 100644 --- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/TaskComputationAdapter.java +++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/TaskComputationAdapter.java @@ -1,5 +1,8 @@ + package cz.it4i.fiji.haas_spim_benchmark.ui; +import static cz.it4i.fiji.haas_spim_benchmark.core.Configuration.getHaasUpdateTimeout; + import java.io.Closeable; import java.util.LinkedList; import java.util.List; @@ -11,7 +14,6 @@ import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import cz.it4i.fiji.haas_spim_benchmark.core.Constants; import cz.it4i.fiji.haas_spim_benchmark.core.SimpleObservableValue; import cz.it4i.fiji.haas_spim_benchmark.core.TaskComputation; import cz.it4i.fiji.haas_spim_benchmark.core.TaskComputation.Log; @@ -20,12 +22,13 @@ import javafx.beans.value.ObservableValueBase; public class TaskComputationAdapter implements Closeable { - public final static Logger log = LoggerFactory - .getLogger(cz.it4i.fiji.haas_spim_benchmark.ui.TaskComputationAdapter.class); + public final static Logger log = LoggerFactory.getLogger( + cz.it4i.fiji.haas_spim_benchmark.ui.TaskComputationAdapter.class); private final TaskComputation computation; - private final List<ObservableValue<RemoteFileInfo>> outputs = new LinkedList<>(); + private final List<ObservableValue<RemoteFileInfo>> outputs = + new LinkedList<>(); private final List<ObservableLog> logs = new LinkedList<>(); @@ -38,14 +41,17 @@ public class TaskComputationAdapter implements Closeable { public void init() { Map<String, Long> sizes = computation.getOutFileSizes(); - computation.getOutputs().forEach(outputFile -> addOutputFile(outputFile, sizes.get(outputFile))); - computation.getLogs().forEach(decoratedLog -> logs.add(new ObservableLog(decoratedLog))); + computation.getOutputs().forEach(outputFile -> addOutputFile(outputFile, + sizes.get(outputFile))); + computation.getLogs().forEach(decoratedLog -> logs.add(new ObservableLog( + decoratedLog))); synchronized (this) { - if(timer != null) { - timer.schedule(new P_TimerTask(), Constants.HAAS_TIMEOUT, Constants.HAAS_TIMEOUT); + if (timer != null) { + timer.schedule(new P_TimerTask(), getHaasUpdateTimeout(), + getHaasUpdateTimeout()); } } - + } @Override @@ -57,7 +63,7 @@ public class TaskComputationAdapter implements Closeable { public List<ObservableValue<RemoteFileInfo>> getOutputs() { return outputs; } - + public List<ObservableLog> getLogs() { return logs; } @@ -66,29 +72,29 @@ public class TaskComputationAdapter implements Closeable { outputs.add(new ObservableOutputFile(outputFile, size)); } - public static class ObservableLog { - + public static class ObservableLog { + private final String name; - + private final SimpleObservableValue<String> value; public ObservableLog(final Log content) { this.value = new SimpleObservableValue<>(content.getContent()); this.name = content.getName(); } - + public String getName() { return name; } - + public ObservableValue<String> getContent() { return value; } - + public void setContentValue(Log log) { if (!getName().equals(log.getName())) { - throw new IllegalArgumentException( - "this.name=" + getName() + ", log.name=" + log.getName()); + throw new IllegalArgumentException("this.name=" + getName() + + ", log.name=" + log.getName()); } value.update(log.getContent()); } @@ -129,7 +135,9 @@ public class TaskComputationAdapter implements Closeable { public void setSize(Long newValue) { Long oldValue = size; size = newValue; - if (oldValue != newValue && oldValue != null && !oldValue.equals(newValue)) { + if (oldValue != newValue && oldValue != null && !oldValue.equals( + newValue)) + { fireValueChangedEvent(); } } @@ -140,11 +148,13 @@ public class TaskComputationAdapter implements Closeable { @Override public void run() { Map<String, Long> sizes = computation.getOutFileSizes(); - Map<String, Log> computationLogs = computation.getLogs().stream() - .collect(Collectors.<Log, String, Log>toMap((Log l) -> l.getName(), (Log l) -> l)); - TaskComputationAdapter.this.logs - .forEach(processedLog -> processedLog.setContentValue(computationLogs.get(processedLog.getName()))); - outputs.forEach(value -> ((ObservableOutputFile) value).setSize(sizes.get(value.getValue().getName()))); + Map<String, Log> computationLogs = computation.getLogs().stream().collect( + Collectors.<Log, String, Log> toMap((Log l) -> l.getName(), ( + Log l) -> l)); + TaskComputationAdapter.this.logs.forEach(processedLog -> processedLog + .setContentValue(computationLogs.get(processedLog.getName()))); + outputs.forEach(value -> ((ObservableOutputFile) value).setSize(sizes.get( + value.getValue().getName()))); } } diff --git a/haas-spim-benchmark/src/main/resources/hpc_wm_configuration.propertie b/haas-spim-benchmark/src/main/resources/hpc_wm_configuration.propertie new file mode 100644 index 0000000000000000000000000000000000000000..739f0a75c141d3f6d0eb1e2c66e5f2c1d66c62cb --- /dev/null +++ b/haas-spim-benchmark/src/main/resources/hpc_wm_configuration.propertie @@ -0,0 +1,13 @@ +#miliseconds +haas_update_timeout=30000 + +haas_project_ID=open-15-12 + +haas_cluster_node_type=7 + +haas_template_ID=4 + +#seconds +walltime=3600 + +bds_address=http://julius2.it4i.cz/