Commit ca29bd3d authored by Petr Bainar's avatar Petr Bainar

Merge branch 'miscStuffExtended' into 'master'

Misc stuff extended

See merge request !31
parents 889d9978 9b7dcafd
......@@ -21,6 +21,7 @@ import javafx.scene.control.TableColumn;
import javafx.scene.control.TableRow;
import javafx.scene.control.TableView;
import javafx.scene.paint.Color;
import javafx.util.Callback;
public interface JavaFXRoutines {
......@@ -62,7 +63,15 @@ public interface JavaFXRoutines {
Function<U, V> mapper) {
((TableColumn<T, V>) tableView.getColumns().get(index))
.setCellValueFactory(f -> new ObservableValueAdapter<>(f.getValue(), mapper));
}
@SuppressWarnings("unchecked")
static public <T, V> void setCellValueFactoryForList(TableView<T> tableView,
int index,
Callback<TableColumn.CellDataFeatures<T, V>, ObservableValue<V>> callback)
{
((TableColumn<T, V>) tableView.getColumns().get(index)).setCellValueFactory(
callback);
}
static public RunnableFuture<Void> runOnFxThread(Runnable runnable) {
......
......@@ -87,8 +87,7 @@ public class BenchmarkJobManager implements Closeable {
public BenchmarkJob(Job job) {
this.job = job;
snakemakeOutputHelper = new SnakemakeOutputHelper(job,
new LinkedList<Task>(), new LinkedList<BenchmarkError>());
snakemakeOutputHelper = new SnakemakeOutputHelper(job);
}
public synchronized void startJob(Progress progress) throws IOException {
......
......@@ -13,44 +13,33 @@ import java.util.TimerTask;
import cz.it4i.fiji.haas_java_client.SynchronizableFileType;
import cz.it4i.fiji.haas_spim_benchmark.core.BenchmarkJobManager.BenchmarkJob;
import javafx.beans.value.ObservableValue;
class HaasOutputObservableValueRegistry implements Closeable {
private final BenchmarkJob job;
private final Map<SynchronizableFileType, SimpleObservableValue<String>> observableValues =
new HashMap<>();
private final Timer timer;
private final TimerTask updateTask;
private Timer timer;
private boolean isRunning = false;
private int numberOfListeners = 0;
private boolean closed = false;
public HaasOutputObservableValueRegistry(final BenchmarkJob job) {
this.job = job;
this.observableValues.put(SynchronizableFileType.StandardOutputFile,
createObservableValue());
this.observableValues.put(SynchronizableFileType.StandardErrorFile,
createObservableValue());
this.timer = new Timer();
this.updateTask = new TimerTask() {
@Override
public void run() {
final List<SynchronizableFileType> types = new LinkedList<>(
observableValues.keySet());
Streams.zip(types.stream(), job.getComputationOutput(types).stream(), (
type, value) -> (Runnable) (() -> observableValues.get(type).update(
value))).forEach(r -> r.run());
}
};
}
@Override
public synchronized void close() {
timer.cancel();
stopTimer();
numberOfListeners = 0;
closed = true;
}
public ObservableValue<String> getObservableOutput(
public SimpleObservableValue<String> getObservableOutput(
final SynchronizableFileType type)
{
return observableValues.get(type);
......@@ -62,13 +51,17 @@ class HaasOutputObservableValueRegistry implements Closeable {
}
private synchronized void increaseNumberOfObservers() {
numberOfListeners++;
evaluateTimer();
if (!closed) {
numberOfListeners++;
evaluateTimer();
}
}
private synchronized void decreaseNumberOfObservers() {
numberOfListeners--;
evaluateTimer();
if (!closed) {
numberOfListeners--;
evaluateTimer();
}
}
private void evaluateTimer() {
......@@ -76,15 +69,35 @@ class HaasOutputObservableValueRegistry implements Closeable {
final boolean anyListeners = numberOfListeners > 0;
if (!isRunning && anyListeners) {
timer.schedule(updateTask, 0, Constants.HAAS_UPDATE_TIMEOUT /
timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
final List<SynchronizableFileType> types = new LinkedList<>(
observableValues.keySet());
Streams.zip(types.stream(), job.getComputationOutput(types).stream(),
(type, value) -> (Runnable) (() -> observableValues.get(type)
.update(value))).forEach(r -> r.run());
}
}, 0, Constants.HAAS_UPDATE_TIMEOUT /
Constants.UI_TO_HAAS_FREQUENCY_UPDATE_RATIO);
isRunning = true;
}
else if (isRunning && !anyListeners) {
stopTimer();
}
}
private void stopTimer() {
if (timer != null) {
timer.cancel();
timer = null;
isRunning = false;
}
}
}
......@@ -17,7 +17,6 @@ import org.slf4j.LoggerFactory;
import cz.it4i.fiji.haas.ui.UpdatableObservableValue;
import cz.it4i.fiji.haas_java_client.SynchronizableFileType;
import cz.it4i.fiji.haas_spim_benchmark.core.BenchmarkJobManager.BenchmarkJob;
import javafx.beans.value.ObservableValue;
public class ObservableBenchmarkJob extends
UpdatableObservableValue<BenchmarkJob> implements Closeable
......@@ -36,7 +35,9 @@ public class ObservableBenchmarkJob extends
private final P_Observable fileTransferObservable = new P_Observable();
private final HaasOutputObservableValueRegistry observableValueRegistry;
private final HaasOutputObservableValueRegistry haasOutputRegistry;
private final TaskObservableValueRegistry taskRegistry;
public interface TransferProgress {
......@@ -59,7 +60,8 @@ public class ObservableBenchmarkJob extends
wrapped.setUploadNotifier(uploadProgress);
wrapped.resumeTransfer();
observableValueRegistry = new HaasOutputObservableValueRegistry(getValue());
haasOutputRegistry = new HaasOutputObservableValueRegistry(getValue());
taskRegistry = new TaskObservableValueRegistry(getValue());
}
public TransferProgress getDownloadProgress() {
......@@ -83,15 +85,20 @@ public class ObservableBenchmarkJob extends
fileTransferObservable.deleteObserver(observer);
}
public ObservableValue<String> getObservableSnakemakeOutput(
public SimpleObservableValue<String> getObservableSnakemakeOutput(
SynchronizableFileType type)
{
return observableValueRegistry.getObservableOutput(type);
return haasOutputRegistry.getObservableOutput(type);
}
public SimpleObservableList<Task> getObservableTaskList() {
return taskRegistry.getTaskList();
}
@Override
public void close() {
observableValueRegistry.close();
haasOutputRegistry.close();
taskRegistry.close();
}
@Override
......
package cz.it4i.fiji.haas_spim_benchmark.core;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import cz.it4i.fiji.haas_java_client.SynchronizableFileType;
public class SPIMComputationAccessorAdapter implements SPIMComputationAccessor {
@Override
public List<String> getActualOutput(List<SynchronizableFileType> content) {
return mapCollect(content, c -> "");
}
@Override
public List<Long> getFileSizes(List<String> names) {
return mapCollect(names, s -> 0l);
}
@Override
public List<String> getFileContents(List<String> logs) {
return mapCollect(logs, s -> "");
}
@Override
public Collection<String> getChangedFiles() {
return Collections.emptyList();
}
private <U, V> List<V> mapCollect(List<U> input, Function<U, V> map) {
return input.stream().map(map).collect(Collectors.toList());
}
}
package cz.it4i.fiji.haas_spim_benchmark.core;
import java.util.List;
import javafx.collections.ListChangeListener;
import javafx.collections.ModifiableObservableListBase;
public class SimpleObservableList<T> extends ModifiableObservableListBase<T> {
private final List<T> innerList;
private final Runnable numberOfListenersChangedCallback;
private int numberOfSubscribedListeners = 0;
public SimpleObservableList(final List<T> list) {
this(list, null);
}
public SimpleObservableList(final List<T> list,
Runnable numberOfListenersCallback)
{
this.innerList = list;
this.numberOfListenersChangedCallback = numberOfListenersCallback;
}
@Override
public T get(int index) {
return innerList.get(index);
}
@Override
public int size() {
return innerList.size();
}
@Override
protected void doAdd(int index, T element) {
innerList.add(index, element);
}
@Override
protected T doSet(int index, T element) {
return innerList.set(index, element);
}
@Override
protected T doRemove(int index) {
return innerList.remove(index);
}
synchronized public boolean hasAnyListeners() {
assert numberOfSubscribedListeners >= 0;
return numberOfSubscribedListeners > 0;
}
synchronized public void subscribe(ListChangeListener<? super T> listener) {
super.addListener(listener);
numberOfSubscribedListeners++;
if (numberOfListenersChangedCallback != null) {
numberOfListenersChangedCallback.run();
}
}
synchronized public void unsubscribe(ListChangeListener<? super T> listener) {
super.removeListener(listener);
numberOfSubscribedListeners--;
if (numberOfListenersChangedCallback != null) {
numberOfListenersChangedCallback.run();
}
}
}
......@@ -5,6 +5,7 @@ import static cz.it4i.fiji.haas_spim_benchmark.core.Constants.BENCHMARK_TASK_NAM
import static cz.it4i.fiji.haas_spim_benchmark.core.Constants.HAAS_UPDATE_TIMEOUT;
import static cz.it4i.fiji.haas_spim_benchmark.core.Constants.UI_TO_HAAS_FREQUENCY_UPDATE_RATIO;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
......@@ -29,13 +30,12 @@ class SnakemakeOutputHelper implements HaaSOutputHolder {
private final List<BenchmarkError> nonTaskSpecificErrors;
private int processedOutputLength;
public SnakemakeOutputHelper(final Job job, final List<Task> tasks,
final List<BenchmarkError> nonTaskSpecificErrors)
public SnakemakeOutputHelper(final Job job)
{
this.job = job;
this.computationAccessor = createComputationAccessor();
this.tasks = tasks;
this.nonTaskSpecificErrors = nonTaskSpecificErrors;
this.tasks = new ArrayList<>();
this.nonTaskSpecificErrors = new ArrayList<>();
}
@Override
......@@ -45,7 +45,7 @@ class SnakemakeOutputHelper implements HaaSOutputHolder {
return computationAccessor.getActualOutput(content);
}
List<Task> getTasks() {
synchronized List<Task> getTasks() {
// If no tasks have been identified, try to search through the output
if (tasks.isEmpty()) {
......
package cz.it4i.fiji.haas_spim_benchmark.core;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import cz.it4i.fiji.haas.ui.JavaFXRoutines;
import cz.it4i.fiji.haas_spim_benchmark.core.BenchmarkJobManager.BenchmarkJob;
class TaskObservableValueRegistry implements Closeable {
private final static Task EMPTY_TASK = new Task(
new SPIMComputationAccessorAdapter(), "", 0);
private final BenchmarkJob job;
private final SimpleObservableList<Task> observableTaskList;
private Timer timer;
private boolean isRunning = false;
private boolean closed = false;
public TaskObservableValueRegistry(final BenchmarkJob job) {
this.job = job;
this.observableTaskList = new SimpleObservableList<>(new ArrayList<Task>(),
this::evaluateTimer);
this.observableTaskList.add(EMPTY_TASK);
}
//TODO close neverCalled
@Override
public synchronized void close() {
stopTimer();
closed = true;
}
public SimpleObservableList<Task> getTaskList() {
return observableTaskList;
}
private synchronized void evaluateTimer() {
class L_TimerTask extends TimerTask {
@Override
public void run() {
List<Task> tasks = job.getTasks();
JavaFXRoutines.runOnFxThread(() -> observableTaskList.setAll(tasks));
synchronized(TaskObservableValueRegistry.this) {
if (timer != null) {
timer.schedule(new L_TimerTask(), Constants.HAAS_UPDATE_TIMEOUT /
Constants.UI_TO_HAAS_FREQUENCY_UPDATE_RATIO);
}
}
}
}
if (closed) {
return;
}
final boolean anyListeners = observableTaskList.hasAnyListeners();
if (!isRunning && anyListeners) {
timer = new Timer();
timer.schedule(new L_TimerTask(), 0);
isRunning = true;
}
else if (isRunning && !anyListeners) {
stopTimer();
}
}
private void stopTimer() {
if (timer != null) {
timer.cancel();
timer = null;
isRunning = false;
}
}
}
......@@ -2,6 +2,12 @@
package cz.it4i.fiji.haas_spim_benchmark.ui;
import java.awt.Window;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.swing.WindowConstants;
import net.imagej.updater.util.Progress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -9,9 +15,17 @@ import org.slf4j.LoggerFactory;
import cz.it4i.fiji.haas.ui.CloseableControl;
import cz.it4i.fiji.haas.ui.InitiableControl;
import cz.it4i.fiji.haas.ui.JavaFXRoutines;
import cz.it4i.fiji.haas.ui.ModalDialogs;
import cz.it4i.fiji.haas.ui.ProgressDialog;
import cz.it4i.fiji.haas_java_client.JobState;
import cz.it4i.fiji.haas_java_client.SynchronizableFileType;
import cz.it4i.fiji.haas_spim_benchmark.core.ObservableBenchmarkJob;
import cz.it4i.fiji.haas_spim_benchmark.core.SimpleObservableList;
import cz.it4i.fiji.haas_spim_benchmark.core.SimpleObservableValue;
import cz.it4i.fiji.haas_spim_benchmark.core.Task;
import javafx.beans.value.ChangeListener;
import javafx.beans.value.ObservableValue;
import javafx.collections.ListChangeListener;
import javafx.fxml.FXML;
import javafx.scene.control.Tab;
import javafx.scene.control.TabPane;
......@@ -24,13 +38,22 @@ public class JobDetailControl extends TabPane implements CloseableControl,
cz.it4i.fiji.haas_spim_benchmark.ui.JobDetailControl.class);
@FXML
private SPIMPipelineProgressViewController progressView;
private SPIMPipelineProgressViewController progressControl;
@FXML
private Tab progressTab;
@FXML
private LogViewControl snakemakeOutputControl;
@FXML
private LogViewControl errorOutput;
private Tab snakemakeOutputTab;
@FXML
private LogViewControl standardOutput;
private LogViewControl otherOutputControl;
@FXML
private Tab otherOutputTab;
@FXML
private JobPropertiesControl jobProperties;
......@@ -44,9 +67,59 @@ public class JobDetailControl extends TabPane implements CloseableControl,
@FXML
private Tab dataUploadTab;
private final ExecutorService executorServiceWS;
private final ObservableBenchmarkJob job;
private SimpleObservableList<Task> taskList;
private final ListChangeListener<Task> taskListListener =
new ListChangeListener<Task>()
{
@Override
public void onChanged(Change<? extends Task> c) {
setTabAvailability(progressTab, taskList == null || taskList.isEmpty());
}
};
private SimpleObservableValue<String> errorOutput;
private final ChangeListener<String> errorOutputListener =
new ChangeListener<String>()
{
@Override
public void changed(ObservableValue<? extends String> observable,
String oldValue, String newValue)
{
if (newValue != null) {
setTabAvailability(snakemakeOutputTab, newValue.isEmpty());
}
}
};
private SimpleObservableValue<String> standardOutput;
private final ChangeListener<String> standardOutputListener =
new ChangeListener<String>()
{
@Override
public void changed(ObservableValue<? extends String> observable,
String oldValue, String newValue)
{
if (newValue != null) {
setTabAvailability(otherOutputTab, newValue.isEmpty());
}
}
};
public JobDetailControl(final ObservableBenchmarkJob job) {
executorServiceWS = Executors.newSingleThreadExecutor();
JavaFXRoutines.initRootAndController("JobDetail.fxml", this);
this.job = job;
}
......@@ -55,31 +128,54 @@ public class JobDetailControl extends TabPane implements CloseableControl,
@Override
public void init(final Window parameter) {
progressView.init(parameter);
progressView.setJob(job);
errorOutput.setObservable(job.getObservableSnakemakeOutput(
SynchronizableFileType.StandardErrorFile));
standardOutput.setObservable(job.getObservableSnakemakeOutput(
SynchronizableFileType.StandardOutputFile));
jobProperties.setJob(job);
dataUpload.setJob(job);
if (job.getValue().getState() == JobState.Disposed) {
// TODO: Handle this?
if (log.isInfoEnabled()) {
log.info("Job " + job.getValue().getId() +
" state has been resolved as Disposed.");
}
}
if (areExecutionDetailsAvailable()) {
enableAllTabs();
}
else {
disableNonPermanentTabs();
}
Progress progress = ModalDialogs.doModal(new ProgressDialog(parameter,
"Downloading tasks"), WindowConstants.DO_NOTHING_ON_CLOSE);
executorServiceWS.execute(() -> {
try {
progressControl.init(parameter);
taskList = job.getObservableTaskList();
taskList.subscribe(taskListListener);
progressControl.setObservable(taskList);
errorOutput = job.getObservableSnakemakeOutput(
SynchronizableFileType.StandardErrorFile);
errorOutput.addListener(errorOutputListener);
snakemakeOutputControl.setObservable(errorOutput);
standardOutput = job.getObservableSnakemakeOutput(
SynchronizableFileType.StandardOutputFile);
standardOutput.addListener(standardOutputListener);
otherOutputControl.setObservable(standardOutput);
jobProperties.setJob(job);
dataUpload.setJob(job);
if (job.getValue().getState() == JobState.Disposed) {
// TODO: Handle this?
if (log.isInfoEnabled()) {
log.info("Job " + job.getValue().getId() +
" state has been resolved as Disposed.");
}
}
setActiveFirstVisibleTab(true);
}
finally {
final ListChangeListener<Task> localListener = new ListChangeListener<Task>() {
@Override
public void onChanged(Change<? extends Task> c) {
taskList.unsubscribe(this);
progress.done();
}
};
taskList.subscribe(localListener);
}
});
setActiveFirstVisibleTab();
}
// -- CloseableControl methods --
......@@ -87,45 +183,38 @@ public class JobDetailControl extends TabPane implements CloseableControl,
@Override
public void close() {
executorServiceWS.shutdown();
// Close controllers
progressView.close();
taskList.unsubscribe(taskListListener);
progressControl.close();
errorOutput.removeListener(errorOutputListener);
snakemakeOutputControl.close();
standardOutput.removeListener(standardOutputListener);
otherOutputControl.close();
jobProperties.close();
dataUpload.close();
}
// -- Helper methods --
/*
* Checks whether execution details are available
*/
private boolean areExecutionDetailsAvailable() {
return job.getValue().getState() == JobState.Running || job.getValue()
.getState() == JobState.Finished || job.getValue()
.getState() == JobState.Failed || job.getValue()
.getState() == JobState.Canceled;
private void setTabAvailability(final Tab tab, final boolean disabled) {
tab.setDisable(disabled);
setActiveFirstVisibleTab(false);
}
/*
* Disables all tabs except those which shall be always enabled, such as job properties tab
*/
private void disableNonPermanentTabs() {
getTabs().stream().filter(t -> t != jobPropertiesTab && t != dataUploadTab)
.forEach(t -> t.setDisable(true));
}