Skip to content
Snippets Groups Projects
Commit 29dfeff8 authored by Jan Kožusznik's avatar Jan Kožusznik
Browse files

Convert HaaSClient interface to Stream<UploadingData>

parent ec47d176
Branches
Tags
No related merge requests found
......@@ -5,9 +5,10 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Instant;
import java.util.Collections;
import java.util.Arrays;
import cz.it4i.fiji.haas.JobManager.JobInfo;
import cz.it4i.fiji.haas_java_client.HaaSClient;
import cz.it4i.fiji.haas_java_client.JobState;
import net.imagej.updater.util.Progress;
......@@ -26,7 +27,8 @@ public class BenchmarkJobManager {
}
public JobInfo startJob() throws IOException {
JobInfo jobInfo = jobManager.startJob(Collections.emptyList(), null);
JobInfo jobInfo = jobManager.startJob(Arrays.asList(getUploadingFile()).stream(), progress);
jobInfo.waitForStart();
if (jobInfo.getState() != JobState.Running) {
throw new IllegalStateException("start of job: " + jobInfo + " failed");
......@@ -40,6 +42,10 @@ public class BenchmarkJobManager {
return jobInfo;
}
private HaaSClient.UploadingFile getUploadingFile() {
return new UploadingFileFromResource("", "config.yaml");
}
public JobState getState(long jobId) {
return jobManager.getState(jobId);
}
......
......@@ -7,16 +7,17 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Properties;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.scijava.log.LogService;
import org.scijava.plugin.Parameter;
import cz.it4i.fiji.haas_java_client.HaaSClient;
import cz.it4i.fiji.haas_java_client.HaaSClient.UploadingFile;
import cz.it4i.fiji.haas_java_client.JobInfo;
import cz.it4i.fiji.haas_java_client.JobState;
import cz.it4i.fiji.haas_java_client.ProgressNotifier;
......@@ -71,7 +72,7 @@ public class Job {
}
};
public Job(Path basePath, Collection<Path> files, Supplier<HaaSClient> haasClientSupplier, Progress progress)
public Job(Path basePath, Stream<UploadingFile> files, Supplier<HaaSClient> haasClientSupplier, Progress progress)
throws IOException {
this(haasClientSupplier);
HaaSClient client = this.haasClientSupplier.get();
......
......@@ -10,11 +10,13 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cz.it4i.fiji.haas_java_client.HaaSClient;
import cz.it4i.fiji.haas_java_client.HaaSClient.UploadingFile;
import cz.it4i.fiji.haas_java_client.JobState;
import cz.it4i.fiji.haas_java_client.Settings;
import javafx.beans.value.ObservableValueBase;
......@@ -45,7 +47,7 @@ public class JobManager {
}
public JobInfo startJob(Collection<Path> files, Progress progress) throws IOException {
public JobInfo startJob(Stream<UploadingFile> files, Progress progress) throws IOException {
Job job;
jobs.add(job = new Job(workDirectory, files, this::getHaasClient, progress));
return new JobInfo(job) {
......
......@@ -19,6 +19,7 @@ import org.scijava.ui.UIService;
import org.scijava.widget.UIComponent;
import cz.it4i.fiji.haas.ui.ProgressDialog;
import cz.it4i.fiji.haas_java_client.HaaSClient;
import net.imagej.ImageJ;
/**
......@@ -50,7 +51,7 @@ public class RunWithHaaS implements Command {
public void run() {
try {
jobManager = new JobManager(getWorkingDirectoryPath(), TestingConstants.getSettings());
jobManager.startJob(getContent(dataDirectory), ModalDialogs.doModal(new ProgressDialog(getFrame())));
jobManager.startJob(getContent(dataDirectory).stream().map(HaaSClient::getUploadingFile), ModalDialogs.doModal(new ProgressDialog(getFrame())));
} catch (IOException e) {
log.error(e);
}
......
package cz.it4i.fiji.haas;
import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import cz.it4i.fiji.haas_java_client.HaaSClient.UploadingFile;
public class UploadingFileFromResource implements UploadingFile {
private String fileName;
private String base;
private Long length;
private long lastTime;
public UploadingFileFromResource(String base, String fileName) {
this.base = base;
this.fileName = fileName;
this.lastTime = Instant.now().getEpochSecond()*1000;
}
@Override
public InputStream getInputStream() {
return this.getClass().getResourceAsStream(base + "/" + fileName);
}
@Override
public String getName() {
return fileName;
}
@Override
public long getLength() {
if(length == null) {
length = computeLenght();
}
return length;
}
private Long computeLenght() {
try(InputStream is = getInputStream()) {
long result = 0;
int available;
while(0 != (available = is.available())) {
result += is.skip(available);
}
return result;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public long getLastTime() {
return lastTime;
}
}
This diff is collapsed.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cz.it4i.fiji.haas.UploadingFileFromResource;
import cz.it4i.fiji.haas_java_client.HaaSClient.UploadingFile;
public class TestUploadingData {
private static Logger log = LoggerFactory.getLogger(TestUploadingData.class);
public static void main(String[] args) {
UploadingFile uf = new UploadingFileFromResource("", "config.yaml");
log.info("size: " + uf.getLength());
}
}
......@@ -84,7 +84,18 @@
<artifactId>commons-logging</artifactId>
<version>1.1.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-jdk14 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
<version>1.7.25</version>
</dependency>
</dependencies>
</project>
......@@ -19,9 +19,13 @@ import java.util.Map.Entry;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.xml.rpc.ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.jcraft.jsch.JSchException;
import cz.it4i.fiji.haas_java_client.proxy.CommandTemplateParameterValueExt;
......@@ -45,13 +49,19 @@ import cz.it4i.fiji.haas_java_client.proxy.UserAndLimitationManagementWsSoap;
import cz.it4i.fiji.scpclient.ScpClient;
public class HaaSClient {
interface UploadingFile {
private static Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas_java_client.HaaSClient.class);
public interface UploadingFile {
InputStream getInputStream();
String getName();
long getLength();
long getLastTime();
}
static public class SynchronizableFiles {
private Collection<TaskFileOffsetExt> files = new LinkedList<>();
......@@ -157,30 +167,41 @@ public class HaaSClient {
return start(files, name, templateParameters, dummyNotifier);
}
public long start(Iterable<Path> files, String name, Collection<Entry<String, String>> templateParameters,
private long start(Iterable<Path> files, String name, Collection<Entry<String, String>> templateParameters,
ProgressNotifier notifier) {
Stream<UploadingFile> fileStream = StreamSupport.stream(files.spliterator(), false)
.map(file -> getUploadingFile(file));
return start(fileStream, name, templateParameters, notifier);
}
public long start(Stream<UploadingFile> files, String name, Collection<Entry<String, String>> templateParameters,
ProgressNotifier notifier) {
notifier.setTitle("Starting job");
TaskSpecificationExt taskSpec = createTaskSpecification(name, templateId, templateParameters);
JobSpecificationExt jobSpecification = createJobSpecification(name, Arrays.asList(taskSpec));
try {
String item;
TaskSpecificationExt taskSpec = createTaskSpecification(name, templateId, templateParameters);
JobSpecificationExt jobSpecification = createJobSpecification(name, Arrays.asList(taskSpec));
String jobItem;
SubmittedJobInfoExt job = getJobManagement().createJob(jobSpecification, getSessionID());
notifier.addItem(jobItem = String.format("Created job: %d\n", job.getId()));
FileTransferMethodExt fileTransfer = getFileTransfer().getFileTransferMethod(job.getId(), getSessionID());
List<Long> totalSizes = getSizes(files);
List<Long> totalSizes = StreamSupport.stream(files.spliterator(), false).map(f -> f.getLength())
.collect(Collectors.toList());
long totalSize = totalSizes.stream().mapToLong(l -> l.longValue()).sum();
TransferFileProgressForHaaSClient progress = new TransferFileProgressForHaaSClient(totalSize, notifier);
try (ScpClient scpClient = getScpClient(fileTransfer)) {
int index = 0;
for (Path file : files) {
for (UploadingFile file : (Iterable<UploadingFile>) files::iterator) {
String item;
progress.startNewFile(totalSizes.get(index));
notifier.addItem(item = "Uploading file: " + file.getFileName());
String destFile = "'" + fileTransfer.getSharedBasepath() + "/" + file.getFileName() + "'";
boolean result = scpClient.upload(file, destFile, progress);
notifier.itemDone(item);
if (!result) {
throw new HaaSClientException("Uploading of " + file + " to " + destFile + " failed");
notifier.addItem(item = "Uploading file: " + file.getName());
String destFile = "'" + fileTransfer.getSharedBasepath() + "/" + file.getName() + "'";
try (InputStream is = file.getInputStream()) {
boolean result = scpClient.upload(is, destFile, file.getLength(), file.getLastTime(), progress);
notifier.itemDone(item);
if (!result) {
throw new HaaSClientException("Uploading of " + file + " to " + destFile + " failed");
}
}
index++;
}
......@@ -297,13 +318,14 @@ public class HaaSClient {
throw new HaaSClientException(e);
}
}
public void uploadFileData(Long jobId, InputStream inputStream, String fileName, long length,
long lastModification) {
try {
FileTransferMethodExt ft = getFileTransfer().getFileTransferMethod(jobId, getSessionID());
try (ScpClient scpClient = getScpClient(ft)) {
scpClient.upload(inputStream,fileName,length,lastModification, new TransferFileProgressForHaaSClient(0, dummyNotifier));
scpClient.upload(inputStream, fileName, length, lastModification,
new TransferFileProgressForHaaSClient(0, dummyNotifier));
getFileTransfer().endFileTransfer(jobId, ft, getSessionID());
}
} catch (IOException | JSchException | ServiceException e) {
......@@ -311,6 +333,47 @@ public class HaaSClient {
}
}
public static UploadingFile getUploadingFile(Path file) {
return new UploadingFile() {
@Override
public InputStream getInputStream() {
try {
return Files.newInputStream(file);
} catch (IOException e) {
log.error(e.getMessage(), e);
throw new RuntimeException(e);
}
}
@Override
public String getName() {
return file.getFileName().toString();
}
@Override
public long getLength() {
try {
return Files.size(file);
} catch (IOException e) {
log.error(e.getMessage(), e);
throw new RuntimeException(e);
}
}
@Override
public long getLastTime() {
try {
return Files.getLastModifiedTime(file).toMillis();
} catch (IOException e) {
log.error(e.getMessage(), e);
throw new RuntimeException(e);
}
}
};
}
private List<Long> getSizes(List<String> asList, ScpClient scpClient, ProgressNotifier notifier)
throws JSchException, IOException {
List<Long> result = new LinkedList<>();
......@@ -405,14 +468,6 @@ public class HaaSClient {
return fileTransfer;
}
private List<Long> getSizes(Iterable<Path> files) throws IOException {
List<Long> result = new LinkedList<>();
for (Path path : files) {
result.add(Files.size(path));
}
return result;
}
private String getSessionID() throws RemoteException, ServiceException {
if (sessionID == null) {
sessionID = authenticate();
......@@ -469,6 +524,4 @@ public class HaaSClient {
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment