Skip to content
Snippets Groups Projects
Commit 6f5022ab authored by Jan Kožusznik's avatar Jan Kožusznik
Browse files

support for asynchronous get of JobState

parent 342b2a72
No related branches found
No related tags found
No related merge requests found
......@@ -19,7 +19,9 @@ import javafx.scene.control.TableColumn;
import javafx.scene.control.TableView;
public interface JavaFXRoutines {
Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas.ui.CloseableControl.class);
static public class TableCellAdapter<S, T> extends TableCell<S, T> {
public interface TableCellUpdater<A, B> {
void accept(TableCell<?, ?> cell, B value, boolean empty);
......@@ -40,23 +42,43 @@ public interface JavaFXRoutines {
static public class FutureValueUpdater<S, T, U extends CompletableFuture<T>> implements TableCellUpdater<S, U> {
private TableCellUpdater<S, T> inner;
public FutureValueUpdater(TableCellUpdater<S, T> inner) {
private Executor executor;
public FutureValueUpdater(TableCellUpdater<S, T> inner, Executor exec) {
this.inner = inner;
this.executor = exec;
}
@Override
public void accept(TableCell<?, ?> cell, U value, boolean empty) {
value.thenAccept(val->inner.accept(cell, val, empty));
if (value != null) {
if (!value.isDone()) {
inner.accept(cell, null, empty);
}
value.thenAcceptAsync(val -> {
inner.accept(cell, val, empty);
}, executor);
} else {
inner.accept(cell, null, empty);
}
}
}
static public class StringValueUpdater<S> implements TableCellUpdater<S, String> {
@Override
public void accept(TableCell<?, ?> cell, String value, boolean empty) {
if (value != null) {
cell.setText(value);
} else if (!empty) {
cell.setText("N/A");
}
}
}
static void initRootAndController(String string, Object parent) {
initRootAndController(string, parent, false);
}
static void initRootAndController(String string, Object parent, boolean setController) {
FXMLLoader fxmlLoader = null;
fxmlLoader = new FXMLLoader(parent.getClass().getResource(string));
......@@ -72,7 +94,7 @@ public interface JavaFXRoutines {
}
});
fxmlLoader.setRoot(parent);
if(setController) {
if (setController) {
fxmlLoader.setController(parent);
}
try {
......@@ -115,6 +137,4 @@ public interface JavaFXRoutines {
});
}
public static Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas.ui.CloseableControl.class);
}
package cz.it4i.fiji.haas.ui;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import cz.it4i.fiji.haas.ui.UpdatableObservableValue.UpdateStatus;
import javafx.beans.value.ObservableValue;
......@@ -38,6 +40,10 @@ public class ObservableValueRegistry<T> {
return map.get(value);
}
public Collection<ObservableValue<T>> items() {
return map.values().stream().map(val->(ObservableValue<T>)val).collect(Collectors.toList());
}
protected ObservableValue<T> remove(T value) {
return map.remove(value);
}
......
......@@ -69,6 +69,8 @@ public class BenchmarkJobManager {
private JobState realFinishedState;
private CompletableFuture<JobState> running;
public BenchmarkJob(Job job) {
this.job = job;
tasks = new LinkedList<Task>();
......@@ -95,22 +97,34 @@ public class BenchmarkJobManager {
}
public synchronized CompletableFuture<JobState> getStateAsync(Executor executor) {
if(executor == null) {
return CompletableFuture.completedFuture(null);
}
if(running != null) {
return running;
}
JobState state = job.getState();
if (state != JobState.Finished) {
return CompletableFuture.completedFuture(state);
} else if (realFinishedState != null) {
return CompletableFuture.completedFuture(realFinishedState);
}
return CompletableFuture.supplyAsync(()->{
realFinishedState = Stream
.concat(Arrays.asList(state).stream(), getTasks().stream()
.flatMap(task -> task.getComputations().stream()).map(tc -> tc.getState()))
.max(new JobStateComparator()).get();
if (realFinishedState != JobState.Finished && realFinishedState != JobState.Canceled) {
realFinishedState = JobState.Failed;
return running = CompletableFuture.supplyAsync(()->{
try {
realFinishedState = Stream
.concat(Arrays.asList(state).stream(), getTasks().stream()
.flatMap(task -> task.getComputations().stream()).map(tc -> tc.getState()))
.max(new JobStateComparator()).get();
if (realFinishedState != JobState.Finished && realFinishedState != JobState.Canceled) {
realFinishedState = JobState.Failed;
}
return realFinishedState;
} finally {
synchronized(BenchmarkJob.this) {
running = null;
}
}
return realFinishedState;
}, executor);
}
......@@ -215,6 +229,11 @@ public class BenchmarkJobManager {
public List<String> getActualOutput(List<SynchronizableFileType> content) {
return computationAccessor.getActualOutput(content);
}
@Override
public String toString() {
return "" + getId();
}
private String getStringFromTimeSafely(Calendar time) {
return time != null ? time.getTime().toString() : "N/A";
......
......@@ -10,6 +10,7 @@ import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
......@@ -36,11 +37,12 @@ import cz.it4i.fiji.haas_spim_benchmark.core.Constants;
import cz.it4i.fiji.haas_spim_benchmark.core.FXFrameExecutorService;
import javafx.beans.value.ObservableValue;
import javafx.fxml.FXML;
import javafx.scene.control.TableColumn;
import javafx.scene.control.TableView;
import javafx.scene.layout.BorderPane;
import net.imagej.updater.util.Progress;
public class BenchmarkSPIMController extends BorderPane implements CloseableControl,InitiableControl{
public class BenchmarkSPIMController extends BorderPane implements CloseableControl, InitiableControl {
private static boolean notNullValue(ObservableValue<BenchmarkJob> j, Predicate<BenchmarkJob> pred) {
if (j == null || j.getValue() == null) {
......@@ -59,6 +61,7 @@ public class BenchmarkSPIMController extends BorderPane implements CloseableCont
private ExecutorService executorServiceUI;
private ExecutorService executorServiceWS;
private ExecutorService executorServiceJobState = Executors.newWorkStealingPool();
private Executor executorServiceFX = new FXFrameExecutorService();
private Timer timer;
......@@ -70,9 +73,9 @@ public class BenchmarkSPIMController extends BorderPane implements CloseableCont
public BenchmarkSPIMController(BenchmarkJobManager manager) {
this.manager = manager;
JavaFXRoutines.initRootAndController("BenchmarkSPIM.fxml", this);
}
public void init(Window root) {
this.root = root;
executorServiceWS = Executors.newSingleThreadExecutor();
......@@ -87,11 +90,9 @@ public class BenchmarkSPIMController extends BorderPane implements CloseableCont
initTable();
initMenu();
updateJobs();
}
private void initMenu() {
TableViewContextMenu<ObservableValue<BenchmarkJob>> menu = new TableViewContextMenu<>(jobs);
menu.addItem("Create job", x -> executeWSCallAsync("Creating job", p -> manager.createJob()), j -> true);
......@@ -128,14 +129,13 @@ public class BenchmarkSPIMController extends BorderPane implements CloseableCont
menu.addItem("Explore errors", job -> job.getValue().exploreErrors(),
job -> notNullValue(job, j -> j.getState().equals(JobState.Failed)));
menu.addItem("Show output",
j -> {
new JobOutputView(root, executorServiceUI, j.getValue(),SynchronizableFileType.StandardErrorFile, job->job.getSnakemakeOutput(), Constants.HAAS_UPDATE_TIMEOUT);
new JobOutputView(root, executorServiceUI, j.getValue(),SynchronizableFileType.StandardOutputFile, job->job.getAnotherOutput(), Constants.HAAS_UPDATE_TIMEOUT);
},
job -> notNullValue(job,
j -> EnumSet.of(JobState.Failed, JobState.Finished, JobState.Running, JobState.Canceled)
.contains(j.getState())));
menu.addItem("Show output", j -> {
new JobOutputView(root, executorServiceUI, j.getValue(), SynchronizableFileType.StandardErrorFile,
job -> job.getSnakemakeOutput(), Constants.HAAS_UPDATE_TIMEOUT);
new JobOutputView(root, executorServiceUI, j.getValue(), SynchronizableFileType.StandardOutputFile,
job -> job.getAnotherOutput(), Constants.HAAS_UPDATE_TIMEOUT);
}, job -> notNullValue(job, j -> EnumSet
.of(JobState.Failed, JobState.Finished, JobState.Running, JobState.Canceled).contains(j.getState())));
menu.addItem("Open working directory", j -> open(j.getValue()), x -> notNullValue(x, j -> true));
menu.addItem("Update table", job -> updateJobs(), j -> true);
......@@ -177,7 +177,7 @@ public class BenchmarkSPIMController extends BorderPane implements CloseableCont
}
private void updateJobs(boolean showProgress) {
if(manager == null) {
if (manager == null) {
return;
}
executorServiceWS.execute(() -> {
......@@ -186,21 +186,22 @@ public class BenchmarkSPIMController extends BorderPane implements CloseableCont
WindowConstants.DO_NOTHING_ON_CLOSE)
: new DummyProgress();
registry.update();
try {
Collection<BenchmarkJob> jobs = manager.getJobs();
Set<ObservableValue<BenchmarkJob>> actual = new HashSet<>(this.jobs.getItems());
for (BenchmarkJob bj : jobs) {
ObservableValue<BenchmarkJob> value = registry.addIfAbsent(bj);
executorServiceFX.execute(() -> {
//jobs.forEach(bj->bj.getStateAsync(executorServiceJobState));
Set<ObservableValue<BenchmarkJob>> actual = new HashSet<>(this.jobs.getItems());
for (BenchmarkJob bj : jobs) {
registry.addIfAbsent(bj);
}
registry.update();
executorServiceFX.execute(() -> {
for (ObservableValue<BenchmarkJob> value : registry.items()) {
if (!actual.contains(value)) {
this.jobs.getItems().add(value);
}
});
}
}
});
progress.done();
} catch (IOException e) {
log.error(e.getMessage(), e);
}
......@@ -209,9 +210,10 @@ public class BenchmarkSPIMController extends BorderPane implements CloseableCont
}
private void initTable() {
registry = new ObservableBenchmarkJobRegistry(bj -> remove(bj));
registry = new ObservableBenchmarkJobRegistry(bj -> remove(bj),executorServiceJobState);
setCellValueFactory(0, j -> j.getId() + "");
setCellValueFactory(1, j -> j.getState().toString());
setCellValueFactoryComplatable(1,
j -> j.getStateAsync(executorServiceJobState).thenApply(state -> "" + state));
setCellValueFactory(2, j -> j.getCreationTime().toString());
setCellValueFactory(3, j -> j.getStartTime().toString());
setCellValueFactory(4, j -> j.getEndTime().toString());
......@@ -227,6 +229,17 @@ public class BenchmarkSPIMController extends BorderPane implements CloseableCont
JavaFXRoutines.setCellValueFactory(jobs, index, mapper);
}
@SuppressWarnings("unchecked")
private void setCellValueFactoryComplatable(int index, Function<BenchmarkJob, CompletableFuture<String>> mapper) {
JavaFXRoutines.setCellValueFactory(jobs, index, mapper);
((TableColumn<ObservableValue<BenchmarkJob>, CompletableFuture<String>>) jobs.getColumns().get(index))
.setCellFactory(
column -> new JavaFXRoutines.TableCellAdapter<ObservableValue<BenchmarkJob>, CompletableFuture<String>>(
new JavaFXRoutines.FutureValueUpdater<ObservableValue<BenchmarkJob>, String, CompletableFuture<String>>(
new JavaFXRoutines.StringValueUpdater<ObservableValue<BenchmarkJob>>(),
executorServiceFX)));
}
private interface P_JobAction {
public void doAction(Progress p) throws IOException;
}
......@@ -234,6 +247,7 @@ public class BenchmarkSPIMController extends BorderPane implements CloseableCont
public void close() {
executorServiceUI.shutdown();
executorServiceWS.shutdown();
executorServiceJobState.shutdown();
timer.cancel();
}
}
package cz.it4i.fiji.haas_spim_benchmark.ui;
import java.nio.file.Files;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cz.it4i.fiji.haas.ui.ObservableValueRegistry;
import cz.it4i.fiji.haas.ui.UpdatableObservableValue.UpdateStatus;
import cz.it4i.fiji.haas_java_client.JobState;
......@@ -10,19 +14,32 @@ import cz.it4i.fiji.haas_spim_benchmark.core.BenchmarkJobManager.BenchmarkJob;
public class ObservableBenchmarkJobRegistry extends ObservableValueRegistry<BenchmarkJob> {
public ObservableBenchmarkJobRegistry(Consumer<BenchmarkJob> removeConsumer) {
super(t -> update(t), t -> t.getState(), removeConsumer);
@SuppressWarnings("unused")
private static Logger log = LoggerFactory
.getLogger(cz.it4i.fiji.haas_spim_benchmark.ui.ObservableBenchmarkJobRegistry.class);
public ObservableBenchmarkJobRegistry(Consumer<BenchmarkJob> removeConsumer, Executor exec) {
super(t -> update(t,exec), t -> {
return t.getStateAsync(exec).getNow(null);
}, removeConsumer);
}
private static UpdateStatus update(BenchmarkJob t) {
JobState oldState = t.getState();
t.update();
private static UpdateStatus update(BenchmarkJob t, Executor executor) {
if (!Files.isDirectory(t.getDirectory())) {
return UpdateStatus.Deleted;
}
UpdateStatus result = oldState != t.getState() ? UpdateStatus.Updated : UpdateStatus.NotUpdated;
JobState oldState = t.getStateAsync(executor).getNow(null);
t.update();
JobState newState = t.getStateAsync(executor).getNow(null);
if (newState == null) {
return UpdateStatus.Updated;
}
UpdateStatus result;
result = oldState != newState ? UpdateStatus.Updated : UpdateStatus.NotUpdated;
return result;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment