Skip to content
Snippets Groups Projects
Commit c1a9282c authored by Petr Bainar's avatar Petr Bainar
Browse files

Transfer pane

parent bf0fb53c
No related branches found
No related tags found
No related merge requests found
Showing
with 323 additions and 72 deletions
package cz.it4i.fiji.haas;
import java.io.IOException;
......@@ -24,6 +25,7 @@ import org.slf4j.LoggerFactory;
import cz.it4i.fiji.haas.JobManager.JobManager4Job;
import cz.it4i.fiji.haas.JobManager.JobSynchronizableFile;
import cz.it4i.fiji.haas.data_transfer.Synchronization;
import cz.it4i.fiji.haas_java_client.FileTransferInfo;
import cz.it4i.fiji.haas_java_client.HaaSClient;
import cz.it4i.fiji.haas_java_client.HaaSFileTransfer;
import cz.it4i.fiji.haas_java_client.JobInfo;
......@@ -408,57 +410,72 @@ public class Job {
return outputDirectory;
}
public List<FileTransferInfo> getFileTransferInfo() {
return synchronization.getFileTransferInfo();
}
private void storeInputOutputDirectory() {
if (inputDirectory == null) {
useDemoData = true;
propertyHolder.setValue(JOB_USE_DEMO_DATA, "" + useDemoData);
} else {
}
else {
storeDataDirectory(JOB_INPUT_DIRECTORY_PATH, inputDirectory);
}
storeDataDirectory(JOB_OUTPUT_DIRECTORY_PATH, outputDirectory);
}
private void storeDataDirectory(String directoryPropertyName, Path directory) {
private void storeDataDirectory(final String directoryPropertyName,
final Path directory)
{
if (!jobDir.equals(directory)) {
propertyHolder.setValue(directoryPropertyName, directory.toString());
}
}
private Path getDataDirectory(String typeOfDirectory, Path jobDirectory) {
String directory = propertyHolder.getValue(typeOfDirectory);
private Path getDataDirectory(final String typeOfDirectory,
final Path jobDirectory)
{
final String directory = propertyHolder.getValue(typeOfDirectory);
return directory != null ? Paths.get(directory) : jobDirectory;
}
private boolean getSafeBoolean(String value) {
private boolean getSafeBoolean(final String value) {
return value != null ? Boolean.parseBoolean(value) : false;
}
private void setJobDirectory(Path jobDirectory, Function<Path, Path> inputDirectoryProvider,
Function<Path, Path> outputDirectoryProvider) {
private void setJobDirectory(final Path jobDirectory,
final Function<Path, Path> inputDirectoryProvider,
final Function<Path, Path> outputDirectoryProvider)
{
this.jobDir = jobDirectory;
try {
this.synchronization = new Synchronization(() -> startFileTransfer(HaaSClient.DUMMY_TRANSFER_FILE_PROGRESS),
jobDir, this.inputDirectory = inputDirectoryProvider.apply(jobDir),
this.outputDirectory = outputDirectoryProvider.apply(jobDir), () -> {
setProperty(JOB_NEEDS_UPLOAD, false);
setUploaded(true);
}, () -> {
setDownloaded(true);
setProperty(JOB_NEEDS_DOWNLOAD, false);
setCanBeDownloaded(false);
}, p -> jobManager.canUpload(Job.this, p));
} catch (IOException e) {
this.synchronization = new Synchronization(() -> startFileTransfer(
HaaSClient.DUMMY_TRANSFER_FILE_PROGRESS), jobDir, this.inputDirectory =
inputDirectoryProvider.apply(jobDir), this.outputDirectory =
outputDirectoryProvider.apply(jobDir), () -> {
setProperty(JOB_NEEDS_UPLOAD, false);
setUploaded(true);
}, () -> {
setDownloaded(true);
setProperty(JOB_NEEDS_DOWNLOAD, false);
setCanBeDownloaded(false);
}, p -> jobManager.canUpload(Job.this, p));
}
catch (final IOException e) {
log.error(e.getMessage(), e);
throw new RuntimeException(e);
}
}
private HaaSFileTransfer startFileTransfer(TransferFileProgress progress) {
private HaaSFileTransfer startFileTransfer(
final TransferFileProgress progress)
{
return haasClientSupplier.get().startFileTransfer(getId(), progress);
}
private void setName(String name) {
private void setName(final String name) {
setProperty(JOB_NAME, name);
}
......
package cz.it4i.fiji.haas.data_transfer;
import java.io.BufferedReader;
......@@ -5,8 +6,8 @@ import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Queue;
import java.util.Set;
import java.util.function.Function;
......@@ -45,10 +46,10 @@ public class PersistentIndex<T> {
indexedFiles.remove(p);
}
public synchronized void fillQueue(Queue<T> toUpload) {
toUpload.addAll(indexedFiles);
public synchronized Set<T> getIndexedItems() {
return Collections.unmodifiableSet(indexedFiles);
}
public synchronized void clear() throws IOException {
indexedFiles.clear();
storeToWorkingFile();
......@@ -60,7 +61,7 @@ public class PersistentIndex<T> {
private void loadFromWorkingFile() throws IOException {
indexedFiles.clear();
if(Files.exists(workingFile)) {
if (Files.exists(workingFile)) {
try (BufferedReader br = Files.newBufferedReader(workingFile)) {
String line;
while (null != (line = br.readLine())) {
......@@ -74,5 +75,4 @@ public class PersistentIndex<T> {
indexedFiles.add(fromStringConvertor.apply(line));
}
}
......@@ -88,10 +88,14 @@ public abstract class PersistentSynchronizationProcess<T> {
}
public void resume() {
index.fillQueue(toProcessQueue);
toProcessQueue.addAll(index.getIndexedItems());
runner.runIfNotRunning(this::doProcess);
}
public Set<T> getIndexedItems() {
return index.getIndexedItems();
}
public void setNotifier(ProgressNotifier notifier) {
this.notifier = notifier;
}
......
package cz.it4i.fiji.haas.data_transfer;
import java.io.Closeable;
......@@ -10,6 +11,7 @@ import java.nio.file.Paths;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
......@@ -21,6 +23,8 @@ import java.util.stream.StreamSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cz.it4i.fiji.haas_java_client.FileTransferInfo;
import cz.it4i.fiji.haas_java_client.FileTransferState;
import cz.it4i.fiji.haas_java_client.HaaSFileTransfer;
import cz.it4i.fiji.haas_java_client.ProgressNotifier;
import cz.it4i.fiji.haas_java_client.UploadingFile;
......@@ -31,9 +35,9 @@ public class Synchronization implements Closeable {
public static final Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas.data_transfer.Synchronization.class);
private static final String FILE_INDEX_TO_UPLOAD_FILENAME = ".toUploadFiles";
private static final String FILE_INDEX_TO_DOWNLOAD_FILENAME = ".toDownloadFiles";
private static final String FILE_INDEX_UPLOADED_FILENAME = ".uploaded";
private static final String FILE_INDEX_TO_DOWNLOAD_FILENAME =
".toDownloadFiles";
private static final String FILE_INDEX_DOWNLOADED_FILENAME = ".downloaded";
private final Path workingDirectory;
......@@ -44,6 +48,8 @@ public class Synchronization implements Closeable {
private final PersistentIndex<Path> filesDownloaded;
private final PersistentIndex<Path> filesUploaded;
private final PersistentSynchronizationProcess<Path> uploadProcess;
private final P_PersistentDownloadProcess downloadProcess;
......@@ -59,10 +65,14 @@ public class Synchronization implements Closeable {
this.inputDirectory = inputDirectory;
this.outputDirectory = outputDirectory;
this.service = Executors.newFixedThreadPool(2);
this.filesDownloaded = new PersistentIndex<>(workingDirectory.resolve(FILE_INDEX_DOWNLOADED_FILENAME),
name -> Paths.get(name));
this.uploadProcess = createUploadProcess(fileTransferSupplier, service, uploadFinishedNotifier);
this.downloadProcess = createDownloadProcess(fileTransferSupplier, service, downloadFinishedNotifier);
this.filesDownloaded = new PersistentIndex<>(workingDirectory.resolve(
FILE_INDEX_DOWNLOADED_FILENAME), name -> Paths.get(name));
this.filesUploaded = new PersistentIndex<>(workingDirectory.resolve(
FILE_INDEX_UPLOADED_FILENAME), name -> Paths.get(name));
this.uploadProcess = createUploadProcess(fileTransferSupplier, service,
uploadFinishedNotifier);
this.downloadProcess = createDownloadProcess(fileTransferSupplier, service,
downloadFinishedNotifier);
this.uploadFilter = uploadFilter;
}
......@@ -100,6 +110,17 @@ public class Synchronization implements Closeable {
this.downloadProcess.resume();
}
public List<FileTransferInfo> getFileTransferInfo() {
final List<FileTransferInfo> list = new LinkedList<>();
filesUploaded.getIndexedItems().forEach(ii -> {
list.add(new FileTransferInfo(ii, FileTransferState.Finished));
});
uploadProcess.getIndexedItems().forEach(ii -> {
list.add(new FileTransferInfo(ii, FileTransferState.Queuing));
});
return list;
}
@Override
public void close() {
service.shutdown();
......@@ -127,9 +148,18 @@ public class Synchronization implements Closeable {
}
@Override
protected void processItem(HaaSFileTransfer tr, Path p) throws InterruptedIOException {
UploadingFile uf = new UploadingFileImpl(p);
protected void processItem(final HaaSFileTransfer tr, final Path p)
throws InterruptedIOException
{
final UploadingFile uf = new UploadingFileImpl(p);
tr.upload(uf);
filesUploaded.insert(inputDirectory.resolve(p.toString()));
try {
filesUploaded.storeToWorkingFile();
}
catch (final IOException e) {
log.error(e.getMessage(), e);
}
}
@Override
......@@ -172,14 +202,17 @@ public class Synchronization implements Closeable {
}
@Override
protected void processItem(HaaSFileTransfer tr, String file) throws InterruptedIOException {
protected void processItem(final HaaSFileTransfer tr, final String file)
throws InterruptedIOException
{
tr.download(file, outputDirectory);
filesDownloaded.insert(outputDirectory.resolve(file));
try {
filesDownloaded.storeToWorkingFile();
} catch (IOException e) {
}
catch (final IOException e) {
log.error(e.getMessage(), e);
}
tr.download(file, outputDirectory);
}
@Override
......
package cz.it4i.fiji.haas_java_client;
import java.nio.file.Path;
public class FileTransferInfo {
private final Path path;
private final FileTransferState state;
public FileTransferInfo(final Path path, final FileTransferState state) {
this.path = path;
this.state = state;
}
public String getPathAsString() {
return path.toString();
}
public FileTransferState getState() {
return state;
}
}
package cz.it4i.fiji.haas_java_client;
public enum FileTransferState {
Unknown, Queuing, InProcess, Finished;
}
\ No newline at end of file
package cz.it4i.fiji.haas_java_client;
import com.jcraft.jsch.JSchException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
......@@ -12,8 +15,6 @@ import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.jcraft.jsch.JSchException;
import cz.it4i.fiji.haas_java_client.proxy.FileTransferMethodExt;
import cz.it4i.fiji.scpclient.ScpClient;
import cz.it4i.fiji.scpclient.TransferFileProgress;
......@@ -39,34 +40,47 @@ class HaaSFileTransferImp implements HaaSFileTransfer {
}
@Override
public void upload(UploadingFile file) throws InterruptedIOException{
String destFile = "'" + ft.getSharedBasepath() + "/" + file.getName() + "'";
public void upload(final UploadingFile file) throws InterruptedIOException {
final String destFile = "'" + ft.getSharedBasepath() + "/" + file
.getName() + "'";
try (InputStream is = file.getInputStream()) {
boolean result = scpClient.upload(is, destFile, file.getLength(), file.getLastTime(), progress);
if (!result) {
throw new HaaSClientException("Uploading of " + file + " to " + destFile + " failed");
if (!scpClient.upload(is, destFile, file.getLength(), file.getLastTime(),
progress))
{
throw new HaaSClientException("Uploading of " + file + " to " +
destFile + " failed");
}
} catch(InterruptedIOException e) {
}
catch (final InterruptedIOException e) {
throw e;
} catch (JSchException | IOException e) {
}
catch (JSchException | IOException e) {
throw new HaaSClientException(e);
}
}
@Override
public void download(String fileName, Path workDirectory) throws InterruptedIOException{
public void download(String fileName, final Path workDirectory)
throws InterruptedIOException
{
try {
fileName = fileName.replaceFirst("/", "");
Path rFile = workDirectory.resolve(fileName);
String fileToDownload = "'" + ft.getSharedBasepath() + "/" + fileName + "'";
scpClient.download(fileToDownload, rFile, progress);
} catch(InterruptedIOException e) {
final Path rFile = workDirectory.resolve(fileName);
final String fileToDownload = "'" + ft.getSharedBasepath() + "/" +
fileName + "'";
if (!scpClient.download(fileToDownload, rFile, progress)) {
throw new HaaSClientException("Downloading of " + fileName + " to " +
workDirectory + " failed");
}
}
catch (final InterruptedIOException e) {
throw e;
} catch (JSchException | IOException e) {
}
catch (JSchException | IOException e) {
throw new HaaSClientException(e);
}
}
@Override
public void setProgress(TransferFileProgress progress) {
this.progress = progress;
......
......@@ -54,6 +54,7 @@ import cz.it4i.fiji.haas.HaaSOutputHolder;
import cz.it4i.fiji.haas.HaaSOutputHolderImpl;
import cz.it4i.fiji.haas.Job;
import cz.it4i.fiji.haas.JobManager;
import cz.it4i.fiji.haas_java_client.FileTransferInfo;
import cz.it4i.fiji.haas_java_client.HaaSClientSettings;
import cz.it4i.fiji.haas_java_client.JobSettings;
import cz.it4i.fiji.haas_java_client.JobSettingsBuilder;
......@@ -282,11 +283,15 @@ public class BenchmarkJobManager implements Closeable {
public Path getOutputDirectory() {
return job.getOutputDirectory();
}
public Path getResultXML() {
return Paths.get(job.getProperty(SPIM_OUTPUT_FILENAME_PATTERN) + ".xml");
}
public List<FileTransferInfo> getFileTransferInfo() {
return job.getFileTransferInfo();
}
private ProgressNotifier convertTo(Progress progress) {
return progress == null ? null : new ProgressNotifierAdapter(progress);
}
......
package cz.it4i.fiji.haas_spim_benchmark.ui;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cz.it4i.fiji.haas.ui.CloseableControl;
import cz.it4i.fiji.haas.ui.JavaFXRoutines;
import cz.it4i.fiji.haas.ui.UpdatableObservableValue;
import cz.it4i.fiji.haas.ui.UpdatableObservableValue.UpdateStatus;
import cz.it4i.fiji.haas_java_client.FileTransferInfo;
import cz.it4i.fiji.haas_spim_benchmark.core.BenchmarkJobManager.BenchmarkJob;
import javafx.beans.value.ObservableValue;
import javafx.fxml.FXML;
import javafx.scene.control.TableView;
import javafx.scene.layout.BorderPane;
public class DataTransferController extends BorderPane implements
CloseableControl
{
private static final String FXML_FILE_NAME = "DataTransfer.fxml";
@SuppressWarnings("unused")
private static Logger log = LoggerFactory.getLogger(
cz.it4i.fiji.haas_spim_benchmark.ui.DataTransferController.class);
@FXML
private TableView<ObservableValue<FileTransferInfo>> filesToUpload;
private BenchmarkJob job;
public DataTransferController() {
JavaFXRoutines.initRootAndController(FXML_FILE_NAME, this);
initTable();
}
public void setJob(final BenchmarkJob job) {
this.job = job;
fillTable();
}
// -- CloseableControl methods --
@Override
public void close() {
// DO NOTHING
}
// -- Helper methods --
private void initTable() {
setCellValueFactory(0, f -> f.getPathAsString());
setCellValueFactory(1, f -> f.getState().toString());
}
private void setCellValueFactory(final int columnIndex,
final Function<FileTransferInfo, String> mapper)
{
JavaFXRoutines.setCellValueFactory(filesToUpload, columnIndex, mapper);
}
private void fillTable() {
final List<ObservableValue<FileTransferInfo>> tempList = new LinkedList<>();
job.getFileTransferInfo().forEach(i -> {
tempList.add(new UpdatableObservableValue<>(i,
x -> UpdateStatus.NotUpdated, x -> x));
});
filesToUpload.getItems().addAll(tempList);
}
}
......@@ -21,7 +21,6 @@ public class JobDetailControl extends TabPane implements CloseableControl,
InitiableControl
{
@SuppressWarnings("unused")
private static Logger log = LoggerFactory.getLogger(
cz.it4i.fiji.haas_spim_benchmark.ui.JobDetailControl.class);
......@@ -40,11 +39,17 @@ public class JobDetailControl extends TabPane implements CloseableControl,
@FXML
private Tab jobPropertiesTab;
@FXML
private DataTransferController dataUpload;
@FXML
private Tab dataUploadTab;
private final HaaSOutputObservableValueRegistry observableValueRegistry;
private final BenchmarkJob job;
public JobDetailControl(BenchmarkJob job) {
public JobDetailControl(final BenchmarkJob job) {
JavaFXRoutines.initRootAndController("JobDetail.fxml", this);
progressView.setJob(job);
observableValueRegistry = new HaaSOutputObservableValueRegistry(job,
......@@ -55,34 +60,65 @@ public class JobDetailControl extends TabPane implements CloseableControl,
standardOutput.setObservable(observableValueRegistry.createObservable(
SynchronizableFileType.StandardOutputFile));
jobProperties.setJob(job);
dataUpload.setJob(job);
observableValueRegistry.start();
this.job = job;
}
// -- InitiableControl methods --
@Override
public void init(Window parameter) {
if (!isExecutionDetailsAvailable(job)) {
enableOnlySpecificTab(jobPropertiesTab);
public void init(final Window parameter) {
if (job.getState() == JobState.Disposed) {
// TODO: Handle this?
log.debug("Job " + job.getId() + " state has been resolved as Disposed.");
}
}
private void enableOnlySpecificTab(Tab tabToLeaveEnabled) {
getTabs().stream().filter(node -> node != tabToLeaveEnabled).forEach(
node -> node.setDisable(true));
getSelectionModel().select(jobPropertiesTab);
}
disableNonPermanentTabs();
private boolean isExecutionDetailsAvailable(BenchmarkJob inspectedJob) {
return inspectedJob.getState() == JobState.Running || inspectedJob
.getState() == JobState.Finished || inspectedJob
.getState() == JobState.Failed || inspectedJob
.getState() == JobState.Canceled;
if (areExecutionDetailsAvailable()) {
enableAllTabs();
}
}
// -- CloseableControl methods --
@Override
public void close() {
observableValueRegistry.close();
// Close controllers
progressView.close();
jobProperties.close();
dataUpload.close();
}
// -- Helper methods --
/*
* Checks whether execution details are available
*/
private boolean areExecutionDetailsAvailable() {
return job.getState() == JobState.Running || job
.getState() == JobState.Finished || job.getState() == JobState.Failed ||
job.getState() == JobState.Canceled;
}
/*
* Disables all tabs except those which shall be always enabled, such as job properties tab
*/
private void disableNonPermanentTabs() {
getTabs().stream().filter(t -> t != jobPropertiesTab && t != dataUploadTab)
.forEach(t -> t.setDisable(true));
getSelectionModel().select(jobPropertiesTab);
}
/*
* Enables all tabs
*/
private void enableAllTabs() {
getTabs().stream().forEach(t -> t.setDisable(false));
}
}
<?xml version="1.0" encoding="UTF-8"?>
<?import javafx.geometry.Insets?>
<?import javafx.scene.control.TableColumn?>
<?import javafx.scene.control.TableView?>
<?import javafx.scene.layout.BorderPane?>
<?import javafx.scene.layout.HBox?>
<fx:root type="BorderPane" xmlns="http://javafx.com/javafx/8.0.65" xmlns:fx="http://javafx.com/fxml/1" fx:controller="cz.it4i.fiji.haas_spim_benchmark.ui.DataTransferController">
<center>
<HBox>
<children>
<TableView fx:id="filesToUpload" prefHeight="400.0" prefWidth="500.0" HBox.hgrow="ALWAYS">
<columns>
<TableColumn prefWidth="400.0" text="File" />
<TableColumn prefWidth="100.0" text="Status" />
</columns>
</TableView>
</children>
<padding>
<Insets bottom="1.0" left="1.0" right="1.0" top="1.0" />
</padding>
</HBox>
</center>
</fx:root>
<?xml version="1.0" encoding="UTF-8"?>
<?import cz.it4i.fiji.haas_spim_benchmark.ui.DataTransferController?>
<?import cz.it4i.fiji.haas_spim_benchmark.ui.LogViewControl?>
<?import cz.it4i.fiji.haas_spim_benchmark.ui.SPIMPipelineProgressViewController?>
<?import javafx.scene.control.Tab?>
......@@ -45,5 +46,12 @@
</content>
</Tab>
<Tab closable="false" text="Data upload" fx:id="dataUploadTab">
<content>
<HBox>
<DataTransferController fx:id="dataUpload" HBox.hgrow="ALWAYS" />
</HBox>
</content>
</Tab>
</tabs>
</fx:root>
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment