diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java index dadc29e2bf1132f0b81a2c78d38dbdd811612dfc..2c9e4317ea1352d4374240d7d0816769ecc86065 100644 --- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java +++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/BenchmarkJobManager.java @@ -63,6 +63,8 @@ public class BenchmarkJobManager { private SPIMComputationAccessor computationAccessor; + private int processedOutputLength = 0; + public BenchmarkJob(Job job) { this.job = job; @@ -168,6 +170,8 @@ public class BenchmarkJobManager { if (tasks == null) { fillTasks(); } + // Carry on in output processing + processOutput(); return tasks; } @@ -200,6 +204,38 @@ public class BenchmarkJobManager { // Order tasks chronologically List<String> chronologicList = STATISTICS_TASK_NAME_MAP.keySet().stream().collect(Collectors.toList()); Collections.sort(tasks, Comparator.comparingInt(task -> chronologicList.indexOf(task.getDescription()))); + + + } + + private void processOutput() { + + final String OUTPUT_PARSING_RULE = "rule "; + final String OUTPUT_PARSING_COLON = ":"; + + String output = computationAccessor.getActualOutput().substring(processedOutputLength); + int outputLengthToBeProcessed = output.length(); + + int ruleRelativeIndex = -1; + int colonRelativeIndex = -1; + while (true) { + + ruleRelativeIndex = output.indexOf(OUTPUT_PARSING_RULE, colonRelativeIndex); + colonRelativeIndex = output.indexOf(OUTPUT_PARSING_COLON, ruleRelativeIndex); + + if (ruleRelativeIndex == -1 || colonRelativeIndex == -1) { + break; + } + + String taskDescription = output.substring(ruleRelativeIndex + OUTPUT_PARSING_RULE.length(), colonRelativeIndex); + List<Task> task = tasks.stream().filter(t -> t.getDescription().equals(taskDescription)).collect(Collectors.toList()); + if (1 == task.size()) { + // TODO: Consider throwing an exception + task.get(0).populateTaskComputationParameters(processedOutputLength + ruleRelativeIndex); + } + } + + processedOutputLength = processedOutputLength + outputLengthToBeProcessed; } private void setDownloaded(boolean b) { diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/SPIMComputationAccessor.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/SPIMComputationAccessor.java index b1dc0c1610248f7a639bc8106958db13622d74e7..d0283db80c1ae69815923886723074ecf57bf31b 100644 --- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/SPIMComputationAccessor.java +++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/SPIMComputationAccessor.java @@ -5,8 +5,11 @@ import java.util.Collection; import cz.it4i.fiji.haas.HaaSOutputHolder; public interface SPIMComputationAccessor extends HaaSOutputHolder { + + final String FILE_SEPARATOR_UNIX = "/"; + default boolean fileExists(String fileName) { - return getChangedFiles().contains(fileName); + return getChangedFiles().contains(FILE_SEPARATOR_UNIX + fileName); } Collection<String> getChangedFiles(); diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/Task.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/Task.java index 38f42ad8a8f74d6a6172399c814fd2852e1445b4..0f9079e0af8763bd59475b383c72685e422d1c89 100644 --- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/Task.java +++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/Task.java @@ -1,31 +1,57 @@ package cz.it4i.fiji.haas_spim_benchmark.core; +import java.util.Comparator; import java.util.LinkedList; import java.util.List; +import cz.it4i.fiji.haas_java_client.JobState; + public class Task { + private final String description; private final List<TaskComputation> computations; - public Task(SPIMComputationAccessor outputHolder, String description, int numComputations) { + public Task(SPIMComputationAccessor computationAccessor, String description, int numOfExpectedComputations) { this.description = description; - this.computations = new LinkedList<>(); - - for (int i = 0; i < numComputations; i++) { - computations.add(new TaskComputation(outputHolder, this, i + 1)); + this.computations = new LinkedList<>(); + for (int i = 0; i < numOfExpectedComputations; i++) { + computations.add(new TaskComputation(computationAccessor, i + 1)); + } + } + + /** + * Looks up next task computation (i.e. with lowest timepoint) of the current task and populates its parameters + * @param positionInOutput: Index of the output position to search from + * @return success flag + */ + public boolean populateTaskComputationParameters(int positionInOutput) { + TaskComputation tc = computations.stream() + .filter(c -> c.getState().equals(JobState.Unknown)) + .min(Comparator.comparingInt(c -> c.getTimepoint())) + .get(); + if (null == tc) { + return false; } + return tc.populateParameters(positionInOutput); } + /** + * @return task description + */ public String getDescription() { return description; } + /** + * @return list of task computations + */ public List<TaskComputation> getComputations() { return computations; } - + + // TODO: Method stub public void update() { - // TODO Auto-generated method stub - + } + } diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/TaskComputation.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/TaskComputation.java index 82fd4bce4a05f7096db730aa6a5fd5c00d154ba4..c609a02b77507804b5949b2ac6a3be717a8d2f37 100644 --- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/TaskComputation.java +++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/core/TaskComputation.java @@ -2,146 +2,149 @@ package cz.it4i.fiji.haas_spim_benchmark.core; import java.util.Arrays; import java.util.Collection; -import java.util.LinkedList; import java.util.Scanner; import cz.it4i.fiji.haas_java_client.JobState; public class TaskComputation { - - // A single-purpose class dedicated to help with TaskComputation members initialization - private class ParsedTaskComputationValues { - private final Collection<String> logs; - private final Collection<String> inputs; - private final Collection<String> outputs; - private final Long id; - - public ParsedTaskComputationValues(Collection<String> inputs, Collection<String> outputs, Collection<String> logs, Long id) { - this.inputs = inputs; - this.outputs = outputs; - this.logs = logs; - this.id = id; - } - } private final SPIMComputationAccessor computationAccessor; - private final Task task; private final int timepoint; - private final Collection<String> inputs; - private final Collection<String> outputs; - private final Collection<String> logs; - private final Long id; - - //TASK 1011 what states will be defined and how it will be defined - private JobState state = JobState.Unknown; - - public TaskComputation(SPIMComputationAccessor computationAccessor, Task task, int timepoint) { + + private JobState state; + private int positionInOutput; + + private Collection<String> inputs; + @SuppressWarnings("unused") + private Collection<String> outputs; + private Collection<String> logs; + private Long id; + + /** + * Creates a TaskComputation object Note: At the time of creation, the job + * parameters are not populated + */ + public TaskComputation(SPIMComputationAccessor computationAccessor, int timepoint) { this.computationAccessor = computationAccessor; - this.task = task; this.timepoint = timepoint; - ParsedTaskComputationValues parsedValues = parseStuff(computationAccessor); - this.inputs = parsedValues.inputs; - this.outputs = parsedValues.outputs; - this.logs = parsedValues.logs; - this.id = parsedValues.id; + this.state = JobState.Unknown; updateState(); } + /** + * @return current job state + */ public JobState getState() { - updateState();//TASK 1011 it is not good idea update every time when state is requested + updateState(); return state; } - - public void update() { - } + /** + * @return job timepoint + */ public int getTimepoint() { return timepoint; } - private void updateState() { - //TASK 1011 This should never happen, add some error handling to resolveId() - if (id == null) { - state = JobState.Unknown; - return; + /** + * @return job id + */ + public Long getId() { + return id; + } + + // TODO: Method stub + public void update() { + + } + + /** + * Populates parameters of the current object by searching the output + * + * @param positionInOutput: + * Index of the output position to search from + * @return success flag + */ + public boolean populateParameters(int positionInOutput) { + + // Should the state be different than unknown, there is no need to populate + // parameters + if (state != JobState.Unknown) { + return false; } - + + this.positionInOutput = positionInOutput; + if (!resolveJobParameters()) { + return false; + } + state = JobState.Queued; - - // Check whether a log file exists - if (!logs.stream().anyMatch(logFile -> computationAccessor.fileExists(logFile))) { - return; + updateState(); + + return true; + } + + private void updateState() { + + // Should the state be queued, try to find out whether a log file exists + if (state == JobState.Queued) { + if (null != logs && !logs.stream().anyMatch(logFile -> computationAccessor.fileExists(logFile))) { + return; // No log file exists yet + } + state = JobState.Running; } - - state = JobState.Running; - - // Check whether the corresponding job has finished - final String OUTPUT_PARSING_FINISHED_JOB = "Finished job "; - final String desiredPatternFinishedJob = OUTPUT_PARSING_FINISHED_JOB + id.toString(); - final String OUTPUT_PARSING_ERRONEOUS_JOB = "Error job "; - final String desiredPatternErroneousJob = OUTPUT_PARSING_ERRONEOUS_JOB + id.toString(); - String currentLine; - Scanner scanner = new Scanner(computationAccessor.getActualOutput()); - while (scanner.hasNextLine()) { - currentLine = scanner.nextLine(); - if (currentLine.contains(desiredPatternErroneousJob)) { - state = JobState.Failed; - break; - } else if (currentLine.contains(desiredPatternFinishedJob)) { - state = JobState.Finished; - break; + + // Finally, look up any traces that the job has failed or finished + if (state == JobState.Running) { + + final String OUTPUT_PARSING_FINISHED_JOB = "Finished job "; + final String desiredPatternFinishedJob = OUTPUT_PARSING_FINISHED_JOB + id.toString(); + final String OUTPUT_PARSING_ERRONEOUS_JOB = "Error job "; + final String desiredPatternErroneousJob = OUTPUT_PARSING_ERRONEOUS_JOB + id.toString(); + + Scanner scanner = new Scanner(computationAccessor.getActualOutput().substring(positionInOutput)); + String currentLine; + while (scanner.hasNextLine()) { + currentLine = scanner.nextLine(); + if (currentLine.contains(desiredPatternErroneousJob)) { + state = JobState.Failed; + break; + } else if (currentLine.contains(desiredPatternFinishedJob)) { + state = JobState.Finished; + break; + } } + scanner.close(); } - scanner.close(); - - return; } - private Long getId() { - return id; - } + private boolean resolveJobParameters() { - private ParsedTaskComputationValues parseStuff(SPIMComputationAccessor outputHolder) { - - final String OUTPUT_PARSING_RULE = "rule "; - final String OUTPUT_PARSING_COLON = ":"; final String OUTPUT_PARSING_COMMA_SPACE = ", "; - final String desiredPattern = OUTPUT_PARSING_RULE + task.getDescription() + OUTPUT_PARSING_COLON; - final String OUTPUT_PARSING_INPUTS = "input: "; final String OUTPUT_PARSING_OUTPUTS = "output: "; final String OUTPUT_PARSING_LOGS = "log: "; final String OUTPUT_PARSING_JOB_ID = "jobid: "; - - Scanner scanner = new Scanner(outputHolder.getActualOutput()); - int jobsToSkip = timepoint; - while (scanner.hasNextLine() && jobsToSkip > 0) { - if (scanner.nextLine().equals(desiredPattern)) { - jobsToSkip--; - } - } - + + Scanner scanner = new Scanner(computationAccessor.getActualOutput().substring(positionInOutput)); String currentLine; - Collection<String> resolvedInputs = new LinkedList<>(); - Collection<String> resolvedOutputs = new LinkedList<>(); - Collection<String> resolvedLogs = new LinkedList<>(); - Long resolvedId = null; while (scanner.hasNextLine()) { currentLine = scanner.nextLine(); if (currentLine.contains(OUTPUT_PARSING_INPUTS)) { - resolvedInputs = Arrays.asList(currentLine.split(OUTPUT_PARSING_INPUTS)[1].split(OUTPUT_PARSING_COMMA_SPACE)); + inputs = Arrays.asList(currentLine.split(OUTPUT_PARSING_INPUTS)[1].split(OUTPUT_PARSING_COMMA_SPACE)); } else if (currentLine.contains(OUTPUT_PARSING_OUTPUTS)) { - resolvedOutputs = Arrays.asList(currentLine.split(OUTPUT_PARSING_OUTPUTS)[1].split(OUTPUT_PARSING_COMMA_SPACE)); + outputs = Arrays.asList(currentLine.split(OUTPUT_PARSING_OUTPUTS)[1].split(OUTPUT_PARSING_COMMA_SPACE)); } else if (currentLine.contains(OUTPUT_PARSING_LOGS)) { - resolvedLogs = Arrays.asList(currentLine.split(OUTPUT_PARSING_LOGS)[1].split(OUTPUT_PARSING_COMMA_SPACE)); + logs = Arrays.asList(currentLine.split(OUTPUT_PARSING_LOGS)[1].split(OUTPUT_PARSING_COMMA_SPACE)); } else if (currentLine.contains(OUTPUT_PARSING_JOB_ID)) { - resolvedId = Long.parseLong(currentLine.split(OUTPUT_PARSING_JOB_ID)[1]); + id = Long.parseLong(currentLine.split(OUTPUT_PARSING_JOB_ID)[1]); } else if (currentLine.trim().isEmpty()) { - break; + break; } } scanner.close(); - - return new ParsedTaskComputationValues(resolvedInputs, resolvedOutputs, resolvedLogs, resolvedId); - } + + return !(inputs == null || id == null); + } + } diff --git a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/SPIMPipelineProgressViewController.java b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/SPIMPipelineProgressViewController.java index ea01b94fc6e387837bdd9165a2038372016d6d2d..c9fed7101d07c9a4dca9da4cfb52432cababbe19 100644 --- a/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/SPIMPipelineProgressViewController.java +++ b/haas-spim-benchmark/src/main/java/cz/it4i/fiji/haas_spim_benchmark/ui/SPIMPipelineProgressViewController.java @@ -91,8 +91,13 @@ public class SPIMPipelineProgressViewController implements FXFrame.Controller { } }, Constants.HAAS_UPDATE_TIMEOUT / Constants.UI_TO_HAAS_FREQUENCY_UPDATE_RATIO); } else { - List<TaskComputation> computations = tasks.stream().map(task -> task.getComputations()) - .collect(Collectors.<List<TaskComputation>>maxBy((a, b) -> a.size() - b.size())).get(); + + Optional<List<TaskComputation>> optional = tasks.stream().map(task -> task.getComputations()) + .collect(Collectors.<List<TaskComputation>>maxBy((a, b) -> a.size() - b.size())); + if(!optional.isPresent()) { + return; + } + List<TaskComputation> computations = optional.get(); int i = 0; FXFrame.Controller.setCellValueFactory(this.tasks, i++, (Function<Task, String>) v -> v.getDescription());