Skip to content
Snippets Groups Projects
Commit b42caff5 authored by Jan Kožusznik's avatar Jan Kožusznik
Browse files

Merge branch 'master' into iss1007-provideTaskInfos

parents 56f5c2b6 da74aec5
No related branches found
No related tags found
No related merge requests found
......@@ -63,6 +63,8 @@ public class BenchmarkJobManager {
private SPIMComputationAccessor computationAccessor;
private int processedOutputLength = 0;
public BenchmarkJob(Job job) {
this.job = job;
......@@ -168,6 +170,8 @@ public class BenchmarkJobManager {
if (tasks == null) {
fillTasks();
}
// Carry on in output processing
processOutput();
return tasks;
}
......@@ -200,6 +204,38 @@ public class BenchmarkJobManager {
// Order tasks chronologically
List<String> chronologicList = STATISTICS_TASK_NAME_MAP.keySet().stream().collect(Collectors.toList());
Collections.sort(tasks, Comparator.comparingInt(task -> chronologicList.indexOf(task.getDescription())));
}
private void processOutput() {
final String OUTPUT_PARSING_RULE = "rule ";
final String OUTPUT_PARSING_COLON = ":";
String output = computationAccessor.getActualOutput().substring(processedOutputLength);
int outputLengthToBeProcessed = output.length();
int ruleRelativeIndex = -1;
int colonRelativeIndex = -1;
while (true) {
ruleRelativeIndex = output.indexOf(OUTPUT_PARSING_RULE, colonRelativeIndex);
colonRelativeIndex = output.indexOf(OUTPUT_PARSING_COLON, ruleRelativeIndex);
if (ruleRelativeIndex == -1 || colonRelativeIndex == -1) {
break;
}
String taskDescription = output.substring(ruleRelativeIndex + OUTPUT_PARSING_RULE.length(), colonRelativeIndex);
List<Task> task = tasks.stream().filter(t -> t.getDescription().equals(taskDescription)).collect(Collectors.toList());
if (1 == task.size()) {
// TODO: Consider throwing an exception
task.get(0).populateTaskComputationParameters(processedOutputLength + ruleRelativeIndex);
}
}
processedOutputLength = processedOutputLength + outputLengthToBeProcessed;
}
private void setDownloaded(boolean b) {
......
......@@ -5,8 +5,11 @@ import java.util.Collection;
import cz.it4i.fiji.haas.HaaSOutputHolder;
public interface SPIMComputationAccessor extends HaaSOutputHolder {
final String FILE_SEPARATOR_UNIX = "/";
default boolean fileExists(String fileName) {
return getChangedFiles().contains(fileName);
return getChangedFiles().contains(FILE_SEPARATOR_UNIX + fileName);
}
Collection<String> getChangedFiles();
......
package cz.it4i.fiji.haas_spim_benchmark.core;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import cz.it4i.fiji.haas_java_client.JobState;
public class Task {
private final String description;
private final List<TaskComputation> computations;
public Task(SPIMComputationAccessor outputHolder, String description, int numComputations) {
public Task(SPIMComputationAccessor computationAccessor, String description, int numOfExpectedComputations) {
this.description = description;
this.computations = new LinkedList<>();
for (int i = 0; i < numComputations; i++) {
computations.add(new TaskComputation(outputHolder, this, i + 1));
this.computations = new LinkedList<>();
for (int i = 0; i < numOfExpectedComputations; i++) {
computations.add(new TaskComputation(computationAccessor, i + 1));
}
}
/**
* Looks up next task computation (i.e. with lowest timepoint) of the current task and populates its parameters
* @param positionInOutput: Index of the output position to search from
* @return success flag
*/
public boolean populateTaskComputationParameters(int positionInOutput) {
TaskComputation tc = computations.stream()
.filter(c -> c.getState().equals(JobState.Unknown))
.min(Comparator.comparingInt(c -> c.getTimepoint()))
.get();
if (null == tc) {
return false;
}
return tc.populateParameters(positionInOutput);
}
/**
* @return task description
*/
public String getDescription() {
return description;
}
/**
* @return list of task computations
*/
public List<TaskComputation> getComputations() {
return computations;
}
// TODO: Method stub
public void update() {
// TODO Auto-generated method stub
}
}
......@@ -2,146 +2,149 @@ package cz.it4i.fiji.haas_spim_benchmark.core;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Scanner;
import cz.it4i.fiji.haas_java_client.JobState;
public class TaskComputation {
// A single-purpose class dedicated to help with TaskComputation members initialization
private class ParsedTaskComputationValues {
private final Collection<String> logs;
private final Collection<String> inputs;
private final Collection<String> outputs;
private final Long id;
public ParsedTaskComputationValues(Collection<String> inputs, Collection<String> outputs, Collection<String> logs, Long id) {
this.inputs = inputs;
this.outputs = outputs;
this.logs = logs;
this.id = id;
}
}
private final SPIMComputationAccessor computationAccessor;
private final Task task;
private final int timepoint;
private final Collection<String> inputs;
private final Collection<String> outputs;
private final Collection<String> logs;
private final Long id;
//TASK 1011 what states will be defined and how it will be defined
private JobState state = JobState.Unknown;
public TaskComputation(SPIMComputationAccessor computationAccessor, Task task, int timepoint) {
private JobState state;
private int positionInOutput;
private Collection<String> inputs;
@SuppressWarnings("unused")
private Collection<String> outputs;
private Collection<String> logs;
private Long id;
/**
* Creates a TaskComputation object Note: At the time of creation, the job
* parameters are not populated
*/
public TaskComputation(SPIMComputationAccessor computationAccessor, int timepoint) {
this.computationAccessor = computationAccessor;
this.task = task;
this.timepoint = timepoint;
ParsedTaskComputationValues parsedValues = parseStuff(computationAccessor);
this.inputs = parsedValues.inputs;
this.outputs = parsedValues.outputs;
this.logs = parsedValues.logs;
this.id = parsedValues.id;
this.state = JobState.Unknown;
updateState();
}
/**
* @return current job state
*/
public JobState getState() {
updateState();//TASK 1011 it is not good idea update every time when state is requested
updateState();
return state;
}
public void update() {
}
/**
* @return job timepoint
*/
public int getTimepoint() {
return timepoint;
}
private void updateState() {
//TASK 1011 This should never happen, add some error handling to resolveId()
if (id == null) {
state = JobState.Unknown;
return;
/**
* @return job id
*/
public Long getId() {
return id;
}
// TODO: Method stub
public void update() {
}
/**
* Populates parameters of the current object by searching the output
*
* @param positionInOutput:
* Index of the output position to search from
* @return success flag
*/
public boolean populateParameters(int positionInOutput) {
// Should the state be different than unknown, there is no need to populate
// parameters
if (state != JobState.Unknown) {
return false;
}
this.positionInOutput = positionInOutput;
if (!resolveJobParameters()) {
return false;
}
state = JobState.Queued;
// Check whether a log file exists
if (!logs.stream().anyMatch(logFile -> computationAccessor.fileExists(logFile))) {
return;
updateState();
return true;
}
private void updateState() {
// Should the state be queued, try to find out whether a log file exists
if (state == JobState.Queued) {
if (null != logs && !logs.stream().anyMatch(logFile -> computationAccessor.fileExists(logFile))) {
return; // No log file exists yet
}
state = JobState.Running;
}
state = JobState.Running;
// Check whether the corresponding job has finished
final String OUTPUT_PARSING_FINISHED_JOB = "Finished job ";
final String desiredPatternFinishedJob = OUTPUT_PARSING_FINISHED_JOB + id.toString();
final String OUTPUT_PARSING_ERRONEOUS_JOB = "Error job ";
final String desiredPatternErroneousJob = OUTPUT_PARSING_ERRONEOUS_JOB + id.toString();
String currentLine;
Scanner scanner = new Scanner(computationAccessor.getActualOutput());
while (scanner.hasNextLine()) {
currentLine = scanner.nextLine();
if (currentLine.contains(desiredPatternErroneousJob)) {
state = JobState.Failed;
break;
} else if (currentLine.contains(desiredPatternFinishedJob)) {
state = JobState.Finished;
break;
// Finally, look up any traces that the job has failed or finished
if (state == JobState.Running) {
final String OUTPUT_PARSING_FINISHED_JOB = "Finished job ";
final String desiredPatternFinishedJob = OUTPUT_PARSING_FINISHED_JOB + id.toString();
final String OUTPUT_PARSING_ERRONEOUS_JOB = "Error job ";
final String desiredPatternErroneousJob = OUTPUT_PARSING_ERRONEOUS_JOB + id.toString();
Scanner scanner = new Scanner(computationAccessor.getActualOutput().substring(positionInOutput));
String currentLine;
while (scanner.hasNextLine()) {
currentLine = scanner.nextLine();
if (currentLine.contains(desiredPatternErroneousJob)) {
state = JobState.Failed;
break;
} else if (currentLine.contains(desiredPatternFinishedJob)) {
state = JobState.Finished;
break;
}
}
scanner.close();
}
scanner.close();
return;
}
private Long getId() {
return id;
}
private boolean resolveJobParameters() {
private ParsedTaskComputationValues parseStuff(SPIMComputationAccessor outputHolder) {
final String OUTPUT_PARSING_RULE = "rule ";
final String OUTPUT_PARSING_COLON = ":";
final String OUTPUT_PARSING_COMMA_SPACE = ", ";
final String desiredPattern = OUTPUT_PARSING_RULE + task.getDescription() + OUTPUT_PARSING_COLON;
final String OUTPUT_PARSING_INPUTS = "input: ";
final String OUTPUT_PARSING_OUTPUTS = "output: ";
final String OUTPUT_PARSING_LOGS = "log: ";
final String OUTPUT_PARSING_JOB_ID = "jobid: ";
Scanner scanner = new Scanner(outputHolder.getActualOutput());
int jobsToSkip = timepoint;
while (scanner.hasNextLine() && jobsToSkip > 0) {
if (scanner.nextLine().equals(desiredPattern)) {
jobsToSkip--;
}
}
Scanner scanner = new Scanner(computationAccessor.getActualOutput().substring(positionInOutput));
String currentLine;
Collection<String> resolvedInputs = new LinkedList<>();
Collection<String> resolvedOutputs = new LinkedList<>();
Collection<String> resolvedLogs = new LinkedList<>();
Long resolvedId = null;
while (scanner.hasNextLine()) {
currentLine = scanner.nextLine();
if (currentLine.contains(OUTPUT_PARSING_INPUTS)) {
resolvedInputs = Arrays.asList(currentLine.split(OUTPUT_PARSING_INPUTS)[1].split(OUTPUT_PARSING_COMMA_SPACE));
inputs = Arrays.asList(currentLine.split(OUTPUT_PARSING_INPUTS)[1].split(OUTPUT_PARSING_COMMA_SPACE));
} else if (currentLine.contains(OUTPUT_PARSING_OUTPUTS)) {
resolvedOutputs = Arrays.asList(currentLine.split(OUTPUT_PARSING_OUTPUTS)[1].split(OUTPUT_PARSING_COMMA_SPACE));
outputs = Arrays.asList(currentLine.split(OUTPUT_PARSING_OUTPUTS)[1].split(OUTPUT_PARSING_COMMA_SPACE));
} else if (currentLine.contains(OUTPUT_PARSING_LOGS)) {
resolvedLogs = Arrays.asList(currentLine.split(OUTPUT_PARSING_LOGS)[1].split(OUTPUT_PARSING_COMMA_SPACE));
logs = Arrays.asList(currentLine.split(OUTPUT_PARSING_LOGS)[1].split(OUTPUT_PARSING_COMMA_SPACE));
} else if (currentLine.contains(OUTPUT_PARSING_JOB_ID)) {
resolvedId = Long.parseLong(currentLine.split(OUTPUT_PARSING_JOB_ID)[1]);
id = Long.parseLong(currentLine.split(OUTPUT_PARSING_JOB_ID)[1]);
} else if (currentLine.trim().isEmpty()) {
break;
break;
}
}
scanner.close();
return new ParsedTaskComputationValues(resolvedInputs, resolvedOutputs, resolvedLogs, resolvedId);
}
return !(inputs == null || id == null);
}
}
......@@ -91,8 +91,13 @@ public class SPIMPipelineProgressViewController implements FXFrame.Controller {
}
}, Constants.HAAS_UPDATE_TIMEOUT / Constants.UI_TO_HAAS_FREQUENCY_UPDATE_RATIO);
} else {
List<TaskComputation> computations = tasks.stream().map(task -> task.getComputations())
.collect(Collectors.<List<TaskComputation>>maxBy((a, b) -> a.size() - b.size())).get();
Optional<List<TaskComputation>> optional = tasks.stream().map(task -> task.getComputations())
.collect(Collectors.<List<TaskComputation>>maxBy((a, b) -> a.size() - b.size()));
if(!optional.isPresent()) {
return;
}
List<TaskComputation> computations = optional.get();
int i = 0;
FXFrame.Controller.setCellValueFactory(this.tasks, i++, (Function<Task, String>) v -> v.getDescription());
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment