Skip to content
Snippets Groups Projects
Commit 1bab16c7 authored by Jan Kožusznik's avatar Jan Kožusznik
Browse files

Merge branch 'iss1007-differentWorkers'

parents 8444af16 1510560b
No related branches found
No related tags found
No related merge requests found
......@@ -54,8 +54,9 @@ public class Job {
long id = client.createJob(name, Collections.emptyList());
jobDir = basePath.resolve("" + id);
propertyHolder = new PropertyHolder(jobDir.resolve(JOB_INFO_FILE));
setName(name);
Files.createDirectory(jobDir);
setName(name);
}
public void setName(String name) {
......
package cz.it4i.fiji.haas_spim_benchmark.core;
import java.util.concurrent.Executor;
import cz.it4i.fiji.haas.ui.FXFrame;
public class FXFrameExecutorService implements Executor{
@Override
public void execute(Runnable command) {
FXFrame.runOnFxThread(() -> {
command.run();
});
}
}
......@@ -11,8 +11,11 @@ import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
......@@ -32,6 +35,7 @@ import cz.it4i.fiji.haas_java_client.JobState;
import cz.it4i.fiji.haas_spim_benchmark.core.BenchmarkJobManager;
import cz.it4i.fiji.haas_spim_benchmark.core.BenchmarkJobManager.Job;
import cz.it4i.fiji.haas_spim_benchmark.core.Constants;
import cz.it4i.fiji.haas_spim_benchmark.core.FXFrameExecutorService;
import javafx.fxml.FXML;
import javafx.scene.control.TableColumn;
import javafx.scene.control.TableView;
......@@ -54,7 +58,9 @@ public class BenchmarkSPIMController implements FXFrame.Controller {
private Window root;
private ExecutorService executorService;
private ExecutorService executorServiceUI;
private ExecutorService executorServiceWS;
private Executor executorServiceFX = new FXFrameExecutorService();
private Timer timer;
......@@ -63,7 +69,8 @@ public class BenchmarkSPIMController implements FXFrame.Controller {
@Override
public void init(Window frame) {
executorService = Executors.newSingleThreadExecutor();
executorServiceWS = Executors.newSingleThreadExecutor();
executorServiceUI = Executors.newSingleThreadExecutor();
timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
@Override
......@@ -91,89 +98,108 @@ public class BenchmarkSPIMController implements FXFrame.Controller {
private void initMenu() {
TableViewContextMenu<Job> menu = new TableViewContextMenu<>(jobs);
menu.addItem("Create job", x -> executeJobActionAsync("Creating job", false, p -> manager.createJob()), j -> true);
menu.addItem("Start job", job -> executeJobActionAsync("Starting job", p -> job.startJob(p)),
job -> notNullValue(job, j -> j.getState() == JobState.Configuring || j.getState() == JobState.Finished));
menu.addItem("Download result", job -> executeJobActionAsync("Downloading data", p -> job.downloadData(p)),
menu.addItem("Create job", x -> executeWSCallAsync("Creating job", p -> manager.createJob()),
j -> true);
menu.addItem("Start job", job -> executeWSCallAsync("Starting job", p -> job.startJob(p)),
job -> notNullValue(job,
j -> j.getState() == JobState.Configuring || j.getState() == JobState.Finished));
menu.addItem("Download result", job -> executeWSCallAsync("Downloading data", p -> job.downloadData(p)),
job -> notNullValue(job,
j -> EnumSet.of(JobState.Failed, JobState.Finished).contains(j.getState()) && !j.downloaded()));
menu.addItem("Download statistics",
job -> executeJobActionAsync("Downloading data", p -> job.downloadStatistics(p)),
job -> notNullValue(job, j -> j.getState() == JobState.Finished));
menu.addItem("Show output", j -> new JobOutputView(root, executorService, j, Constants.HAAS_UPDATE_TIMEOUT),
job -> executeWSCallAsync("Downloading data", p -> job.downloadStatistics(p)),
job -> notNullValue(job, j -> j.getState() == JobState.Finished));
menu.addItem("Show output", j -> new JobOutputView(root, executorServiceUI, j, Constants.HAAS_UPDATE_TIMEOUT),
job -> notNullValue(job,
j -> EnumSet.of(JobState.Failed, JobState.Finished, JobState.Running).contains(j.getState())));
menu.addItem("Open", j->open(j), x->true);
menu.addItem("Open", j -> open(j), x -> true);
menu.addItem("Update table", job -> updateJobs(), j -> true);
}
private void open(Job j) {
executorService.execute(() -> {
Desktop desktop = Desktop.getDesktop();
try {
desktop.open(j.getDirectory().toFile());
} catch (IOException e) {
log.error(e.getMessage(), e);
}});
}
private void executeJobActionAsync(String title, P_JobAction action) {
executeJobActionAsync(title, true, action);
executorServiceUI.execute(() -> {
Desktop desktop = Desktop.getDesktop();
try {
desktop.open(j.getDirectory().toFile());
} catch (IOException e) {
log.error(e.getMessage(), e);
}
});
}
private void executeJobActionAsync(String title, boolean update, P_JobAction action) {
executorService.execute(() -> {
private <V> void executeAsync(Executor executor, Callable<V> action, Consumer<V> postAction) {
executor.execute(() -> {
V result;
try {
ProgressDialog dialog = ModalDialogs.doModal(new ProgressDialog(root, title),
WindowConstants.DO_NOTHING_ON_CLOSE);
action.doAction(dialog);
dialog.done();
if(update) {
updateJobs();
}
} catch (IOException e) {
result = action.call();
postAction.accept(result);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
});
}
private void executeWSCallAsync(String title, P_JobAction action) {
executeWSCallAsync(title, true, action);
}
private void executeWSCallAsync(String title, boolean update, P_JobAction action) {
executeAsync(executorServiceWS, (Callable<Void>) ()->{
ProgressDialog dialog = ModalDialogs.doModal(new ProgressDialog(root, title),
WindowConstants.DO_NOTHING_ON_CLOSE);
action.doAction(dialog);
dialog.done();
return null;
}, x-> {if(update) updateJobs(); });
}
private void updateJobs() {
updateJobs(true);
}
private void updateJobs(boolean showProgress) {
executorService.execute(() -> {
executorServiceUI.execute(() -> {
Progress progress = showProgress
? ModalDialogs.doModal(new ProgressDialog(root, "Updating jobs"),
WindowConstants.DO_NOTHING_ON_CLOSE)
: new DummyProgress();
Set<Job> old = new HashSet<Job>(jobs.getItems());
Map<Job, Job> actual;
try {
actual = manager.getJobs().stream().map(job -> job.update())
.collect(Collectors.toMap(job -> job, job -> job));
} catch (IOException e) {
throw new RuntimeException(e);
manager.getJobs().forEach(job -> job.update());
} catch (IOException e1) {
throw new RuntimeException(e1);
}
for (Job job : old) {
if (!actual.containsKey(job)) {
jobs.getItems().remove(job);
} else {
job.update(actual.get(job));
executorServiceUI.execute(() -> {
Set<Job> old = new HashSet<Job>(jobs.getItems());
Map<Job, Job> actual;
try {
actual = manager.getJobs().stream().
collect(Collectors.toMap(job -> job, job -> job));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
progress.done();
FXFrame.runOnFxThread(() -> {
for (Job job : actual.keySet()) {
if (!old.contains(job)) {
jobs.getItems().add(job);
for (Job job : old) {
if (!actual.containsKey(job)) {
jobs.getItems().remove(job);
} else {
job.update(actual.get(job));
}
}
progress.done();
executorServiceFX.execute(() -> {
for (Job job : actual.keySet()) {
if (!old.contains(job)) {
jobs.getItems().add(job);
}
}
});
});
});
}
private void initTable() {
......@@ -196,7 +222,8 @@ public class BenchmarkSPIMController implements FXFrame.Controller {
}
private void dispose() {
executorService.shutdown();
executorServiceUI.shutdown();
executorServiceWS.shutdown();
timer.cancel();
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment