Commit d14e2226 authored by Jan Kožusznik

Merge branch 'iss1011' into 'master'

Iss1011

See merge request fiji/haas-java-client!5
parents 9e96dd52 48a79a0b
Showing with 298 additions and 14 deletions
package cz.it4i.fiji.haas;

public interface HaaSOutputHolder {
    String getActualOutput();
}
\ No newline at end of file
package cz.it4i.fiji.haas;

import java.util.Arrays;

import cz.it4i.fiji.haas.JobManager.JobSynchronizableFile;
import cz.it4i.fiji.haas_java_client.SynchronizableFileType;

public class HaaSOutputHolderImpl implements HaaSOutputHolder {

    private StringBuilder result = new StringBuilder();
    private HaaSOutputSource source;
    private SynchronizableFileType type;

    public HaaSOutputHolderImpl(HaaSOutputSource source, SynchronizableFileType typeForHold) {
        super();
        this.source = source;
        this.type = typeForHold;
    }

    /* (non-Javadoc)
     * @see cz.it4i.fiji.haas.HaaSOutputHolder#getActualOutput()
     */
    @Override
    public String getActualOutput() {
        updateData();
        return result.toString();
    }

    private void updateData() {
        JobSynchronizableFile file = new JobSynchronizableFile(type, result.length());
        result.append(source.getOutput(Arrays.asList(file)).get(0));
    }
}
package cz.it4i.fiji.haas;

import java.util.List;

import cz.it4i.fiji.haas.JobManager.JobSynchronizableFile;

public interface HaaSOutputSource {
    public List<String> getOutput(List<JobSynchronizableFile> files);
}
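For orientation, a minimal usage sketch of the two types above, not part of the commit: HaaSOutputHolderImpl caches everything it has seen in a StringBuilder and, on each getActualOutput() call, asks its HaaSOutputSource only for data past the cached length. The stub source, the sketch class name and the sample string below are invented for illustration.

package cz.it4i.fiji.haas;

import java.util.Arrays;
import java.util.List;

import cz.it4i.fiji.haas.JobManager.JobSynchronizableFile;
import cz.it4i.fiji.haas_java_client.SynchronizableFileType;

public class HaaSOutputHolderSketch {

    public static void main(String[] args) {
        // Hypothetical stub source; a real source would honor the offset carried by the
        // JobSynchronizableFile and return only the data appended past that offset.
        HaaSOutputSource stubSource = (List<JobSynchronizableFile> files) ->
                Arrays.asList("snakemake output fragment\n");

        HaaSOutputHolder holder =
                new HaaSOutputHolderImpl(stubSource, SynchronizableFileType.StandardErrorFile);
        System.out.println(holder.getActualOutput()); // first call requests data from offset 0
        System.out.println(holder.getActualOutput()); // now requests data past the cached length
                                                      // (the stub ignores the offset)
    }
}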
package cz.it4i.fiji.haas_java_client;

public enum JobState {
    Unknown,
    Configuring,
    Submitted,
    Queued,
    Running,
    Finished,
    Failed,
    Canceled;
}
package cz.it4i.fiji.haas_spim_benchmark.core;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
...@@ -9,12 +10,14 @@ import java.nio.file.Files;
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.function.Predicate;
import java.util.stream.Collectors;
...@@ -22,6 +25,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;

import cz.it4i.fiji.haas.HaaSOutputHolder;
import cz.it4i.fiji.haas.HaaSOutputHolderImpl;
import cz.it4i.fiji.haas.HaaSOutputSource;
import cz.it4i.fiji.haas.Job;
import cz.it4i.fiji.haas.JobManager;
import cz.it4i.fiji.haas.JobManager.JobSynchronizableFile;
...@@ -29,6 +35,7 @@ import cz.it4i.fiji.haas.UploadingFileFromResource;
import cz.it4i.fiji.haas_java_client.HaaSClient;
import cz.it4i.fiji.haas_java_client.JobState;
import cz.it4i.fiji.haas_java_client.Settings;
import cz.it4i.fiji.haas_java_client.SynchronizableFileType;

import javafx.beans.value.ObservableValueBase;
import net.imagej.updater.util.Progress;
...@@ -39,10 +46,33 @@ public class BenchmarkJobManager {

    private static Logger log = LoggerFactory
            .getLogger(cz.it4i.fiji.haas_spim_benchmark.core.BenchmarkJobManager.class);
    private JobManager jobManager;

    public final class BenchmarkJob extends ObservableValueBase<BenchmarkJob> implements HaaSOutputSource {

        private Job job;
        private JobState oldState;
        private List<Task> tasks;

        private SPIMComputationAccessor computationAccessor = new SPIMComputationAccessor() {

            private HaaSOutputHolder outputOfSnakemake =
                    new HaaSOutputHolderImpl(getValue(), SynchronizableFileType.StandardErrorFile);

            @Override
            public String getActualOutput() {
                return outputOfSnakemake.getActualOutput();
            }

            @Override
            public boolean fileExists(Path filePath) {
                File f = new File(filePath.toString());
                return f.exists() && !f.isDirectory();
            }
        };
        public BenchmarkJob(Job job) {
            super();
...@@ -73,7 +103,7 @@ public class BenchmarkJobManager {
            setDownloaded(true);
        }

        public void downloadStatistics(Progress progress) throws IOException {
            job.download(BenchmarkJobManager.downloadStatistics(), progress);
            fireValueChangedEvent();
            Path resultFile = job.getDirectory().resolve(Constants.BENCHMARK_RESULT_FILE);
...@@ -84,7 +114,7 @@ public class BenchmarkJobManager {
        public List<String> getOutput(List<JobSynchronizableFile> files) {
            return job.getOutput(files);
        }

        public long getId() {
            return job.getId();
        }
...@@ -118,8 +148,7 @@ public class BenchmarkJobManager {
        @Override
        public boolean equals(Object obj) {
            if (obj instanceof BenchmarkJob) {
                return ((BenchmarkJob) obj).getId() == getId();
            }
            return false;
        }
...@@ -145,6 +174,40 @@ public class BenchmarkJobManager {
            return job.getDirectory();
        }
        public List<Task> getTasks() {
            if (tasks == null) {
                fillTasks();
            }
            return tasks;
        }

        private void fillTasks() {
            final String OUTPUT_PARSING_JOB_COUNTS = "Job counts:";
            final String OUTPUT_PARSING_TAB_DELIMITER = "\\t";
            final int OUTPUT_PARSING_EXPECTED_NUMBER_OF_WORDS_PER_LINE = 2;

            tasks = new ArrayList<>();
            Scanner scanner = new Scanner(computationAccessor.getActualOutput());
            while (scanner.hasNextLine()) {
                if (!scanner.nextLine().equals(OUTPUT_PARSING_JOB_COUNTS)) {
                    continue;
                }
                scanner.nextLine();
                while (true) {
                    List<String> lineWords = Arrays.stream(scanner.nextLine().split(OUTPUT_PARSING_TAB_DELIMITER))
                            .filter(word -> word.length() > 0).collect(Collectors.toList());
                    if (lineWords.size() != OUTPUT_PARSING_EXPECTED_NUMBER_OF_WORDS_PER_LINE) {
                        break;
                    }
                    tasks.add(new Task(computationAccessor, lineWords.get(1), Integer.parseInt(lineWords.get(0))));
                }
                break;
            }
            scanner.close();
        }
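For context, a sketch of the kind of Snakemake scheduler summary that fillTasks() above scans for, kept as a string constant so it compiles on its own; the task names and counts are invented sample data, not output captured from a real run. The parser skips the "count jobs" header line after the "Job counts:" marker and keeps reading tab-separated count/name pairs until a line no longer contains exactly two words.

package cz.it4i.fiji.haas_spim_benchmark.core;

public class JobCountsSample {

    // Hypothetical "Job counts:" section as printed by Snakemake; only the shape matters here.
    public static final String SAMPLE_OUTPUT =
            "Job counts:\n" +
            "\tcount\tjobs\n" +
            "\t2\tdefine_xml_tif\n" +
            "\t2\tregistration\n" +
            "\t1\tdone\n" +
            "\n";
}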
        private void setDownloaded(boolean b) {
            job.setProperty(JOB_HAS_DATA_TO_DOWNLOAD_PROPERTY, b + "");
        }
...@@ -155,8 +218,6 @@ public class BenchmarkJobManager {
        }
    }

    public BenchmarkJobManager(BenchmarkSPIMParameters params) throws IOException {
        jobManager = new JobManager(params.workingDirectory(), constructSettingsFromParams(params));
    }
...@@ -331,7 +392,6 @@ public class BenchmarkJobManager {
    }

    private static Settings constructSettingsFromParams(BenchmarkSPIMParameters params) {
        return new Settings() {

            @Override
...
package cz.it4i.fiji.haas_spim_benchmark.core;

import java.util.Collection;

abstract public class PipelineBase<T extends PipelineBase<?, ?>, S> {

    private Collection<T> successors;

    private S id;

    public PipelineBase(S id) {
        super();
        this.id = id;
    }

    public Collection<T> getSuccessors() {
        if (successors == null) {
            successors = fillSuccesors();
        }
        return successors;
    }

    public S getId() {
        return id;
    }

    abstract protected Collection<T> fillSuccesors();
}
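PipelineBase is new in this commit and has no subclasses here yet; below is a hypothetical sketch of how a concrete pipeline node could use the lazy getSuccessors()/fillSuccesors() contract. The class name and the "-next" id scheme are invented for illustration.

package cz.it4i.fiji.haas_spim_benchmark.core;

import java.util.Collection;
import java.util.Collections;

public class SampleStage extends PipelineBase<SampleStage, String> {

    public SampleStage(String id) {
        super(id);
    }

    @Override
    protected Collection<SampleStage> fillSuccesors() {
        // A real implementation would derive successors from the pipeline definition;
        // this sketch returns a single invented follow-up stage. It is only created when
        // getSuccessors() is first called.
        return Collections.singletonList(new SampleStage(getId() + "-next"));
    }
}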
package cz.it4i.fiji.haas_spim_benchmark.core;

import java.nio.file.Path;

import cz.it4i.fiji.haas.HaaSOutputHolder;

public interface SPIMComputationAccessor extends HaaSOutputHolder {
    boolean fileExists(Path fileName);
}
package cz.it4i.fiji.haas_spim_benchmark.core;

import java.util.Collection;
import java.util.LinkedList;

public class Task {

    private final String description;
    private final Collection<TaskComputation> computations;

    public Task(SPIMComputationAccessor outputHolder, String description, int numComputations) {
        this.description = description;
        this.computations = new LinkedList<>();
        for (int i = 0; i < numComputations; i++) {
            computations.add(new TaskComputation(outputHolder, this, i + 1));
        }
    }

    public String getDescription() {
        return description;
    }

    public Collection<TaskComputation> getComputations() {
        return computations;
    }
}
package cz.it4i.fiji.haas_spim_benchmark.core;

import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Scanner;

import cz.it4i.fiji.haas_java_client.JobState;

public class TaskComputation {

    // A single-purpose class dedicated to helping with the initialization of TaskComputation members
    private class ParsedTaskComputationValues {

        private final Collection<String> logs;
        private final Collection<String> inputs;
        private final Collection<String> outputs;
        private final Long id;

        public ParsedTaskComputationValues(Collection<String> inputs, Collection<String> outputs,
                Collection<String> logs, Long id) {
            this.inputs = inputs;
            this.outputs = outputs;
            this.logs = logs;
            this.id = id;
        }
    }

    private final Task task;
    private final int timepoint;
    private final Collection<String> inputs;
    private final Collection<String> outputs;
    private final Collection<String> logs;
    private final Long id;

    // TASK 1011: decide which states will be defined and how they will be determined
    private JobState state = JobState.Unknown;

    public TaskComputation(SPIMComputationAccessor outputHolder, Task task, int timepoint) {
        this.task = task;
        this.timepoint = timepoint;
        ParsedTaskComputationValues parsedValues = parseStuff(outputHolder);
        this.inputs = parsedValues.inputs;
        this.outputs = parsedValues.outputs;
        this.logs = parsedValues.logs;
        this.id = parsedValues.id;
    }

    public JobState getState() {
        updateState(); // TASK 1011: it is not a good idea to update every time the state is requested
        return state;
    }

    private void updateState() {
        // TASK 1011: this should never happen; add some error handling to resolveId()
        if (id == null) {
            return;
        }
        //String snakeOutput = outputHolder.getActualOutput();
        // TASK 1011:
        // resolve whether the job is queued (id is defined), started (the log file exists),
        // finished ("Finished job 10." appears in the log) or failed (the log contains an error)
    }

    private Long getId() {
        return id;
    }

    private ParsedTaskComputationValues parseStuff(SPIMComputationAccessor outputHolder) {
        final String OUTPUT_PARSING_RULE = "rule ";
        final String OUTPUT_PARSING_COLON = ":";
        final String OUTPUT_PARSING_COMMA_SPACE = ", ";
        final String desiredPattern = OUTPUT_PARSING_RULE + task.getDescription() + OUTPUT_PARSING_COLON;
        final String OUTPUT_PARSING_INPUTS = "input: ";
        final String OUTPUT_PARSING_OUTPUTS = "output: ";
        final String OUTPUT_PARSING_LOGS = "log: ";
        final String OUTPUT_PARSING_JOB_ID = "jobid: ";

        Scanner scanner = new Scanner(outputHolder.getActualOutput());
        int jobsToSkip = timepoint;
        while (scanner.hasNextLine() && jobsToSkip > 0) {
            if (scanner.nextLine().equals(desiredPattern)) {
                jobsToSkip--;
            }
        }

        String currentLine;
        Collection<String> resolvedInputs = new LinkedList<>();
        Collection<String> resolvedOutputs = new LinkedList<>();
        Collection<String> resolvedLogs = new LinkedList<>();
        Long resolvedId = null;
        while (scanner.hasNextLine()) {
            currentLine = scanner.nextLine();
            if (currentLine.contains(OUTPUT_PARSING_INPUTS)) {
                resolvedInputs = Arrays.asList(currentLine.split(OUTPUT_PARSING_INPUTS)[1].split(OUTPUT_PARSING_COMMA_SPACE));
            } else if (currentLine.contains(OUTPUT_PARSING_OUTPUTS)) {
                resolvedOutputs = Arrays.asList(currentLine.split(OUTPUT_PARSING_OUTPUTS)[1].split(OUTPUT_PARSING_COMMA_SPACE));
            } else if (currentLine.contains(OUTPUT_PARSING_LOGS)) {
                resolvedLogs = Arrays.asList(currentLine.split(OUTPUT_PARSING_LOGS)[1].split(OUTPUT_PARSING_COMMA_SPACE));
            } else if (currentLine.contains(OUTPUT_PARSING_JOB_ID)) {
                resolvedId = Long.parseLong(currentLine.split(OUTPUT_PARSING_JOB_ID)[1]);
            } else if (currentLine.trim().isEmpty()) {
                break;
            }
        }
        scanner.close();

        return new ParsedTaskComputationValues(resolvedInputs, resolvedOutputs, resolvedLogs, resolvedId);
    }
}
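Finally, a self-contained sketch (not part of the commit) showing the kind of per-rule block that parseStuff() expects to find in the Snakemake output, and how Task and TaskComputation consume it. The stub accessor, the sketch class name, the rule name, file names and job id are all invented sample data; with the state resolution still marked TASK 1011, getState() simply reports JobState.Unknown here.

package cz.it4i.fiji.haas_spim_benchmark.core;

import java.nio.file.Path;

public class TaskComputationSketch {

    public static void main(String[] args) {
        SPIMComputationAccessor stubAccessor = new SPIMComputationAccessor() {

            @Override
            public String getActualOutput() {
                // Hypothetical fragment of Snakemake output for a single rule invocation.
                return "rule registration:\n"
                        + "    input: spim_TL01_Angle1.tif, hdf5_Stock.xml\n"
                        + "    output: spim_TL01.registered\n"
                        + "    log: logs/registration-01.log\n"
                        + "    jobid: 4\n"
                        + "\n";
            }

            @Override
            public boolean fileExists(Path fileName) {
                return false; // the sketch never touches the file system
            }
        };

        // One task named "registration" with a single computation (timepoint 1); its inputs,
        // outputs, logs and job id are parsed from the stubbed output above.
        Task task = new Task(stubAccessor, "registration", 1);
        task.getComputations().forEach(computation ->
                System.out.println(computation.getState()));
    }
}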