diff --git a/haas-imagej-client/pom.xml b/haas-imagej-client/pom.xml index efd6dd583d61904dc0cd099b0dd12019ff899d94..abf38a003a8ac07b16a50e7723fa442d01a74d78 100644 --- a/haas-imagej-client/pom.xml +++ b/haas-imagej-client/pom.xml @@ -85,7 +85,16 @@ <artifactId>haas-java-client</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency> - <!-- https://mvnrepository.com/artifact/net.imagej/imagej-launcher --> + <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-jdk14 --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + </dependency> </dependencies> </project> diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/BenchmarkJobManager.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/BenchmarkJobManager.java new file mode 100644 index 0000000000000000000000000000000000000000..e4620e640b62ab4225db4a491261813b5dca8d4b --- /dev/null +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/BenchmarkJobManager.java @@ -0,0 +1,49 @@ +package cz.it4i.fiji.haas; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Path; +import java.time.Instant; +import java.util.Collections; + +import cz.it4i.fiji.haas.JobManager.JobInfo; +import cz.it4i.fiji.haas_java_client.JobState; +import net.imagej.updater.util.Progress; + +public class BenchmarkJobManager { + private JobManager jobManager; + private Progress progress; + private Path workDirectory; + private static final String CONFIG_FOR_MODIFICATION = "not-set-config.yaml"; + private static final String CONFIG_MODIFIED = "config.yaml"; + + public BenchmarkJobManager(Path workDirectory, Progress progress) throws IOException { + this.workDirectory = workDirectory; + jobManager = new JobManager(workDirectory, TestingConstants.getSettings(3, 6)); + this.progress = progress; + } + + public JobInfo startJob() throws IOException { + JobInfo jobInfo = jobManager.startJob( Collections.emptyList(), null); + jobInfo.waitForStart(); + if(jobInfo.getState() != JobState.Running) { + throw new IllegalStateException("start of job: " + jobInfo + " failed"); + } + ByteArrayOutputStream os = new ByteArrayOutputStream(); + jobInfo.downloadFileData(CONFIG_FOR_MODIFICATION,os); + byte[]data = updateConfigFile(os.toByteArray()); + + jobInfo.uploadFile (new ByteArrayInputStream(data),CONFIG_MODIFIED, data.length ,Instant.now().getEpochSecond()); + return jobInfo; + } + + private byte[] updateConfigFile(byte[] data) throws IOException { + return data; + + } + + +} diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java index 5bc7650ea9047f08d800d63d71f6a795d1191f67..a280b9c9646b8e4a588c627665c454fdc521c5af 100644 --- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java @@ -24,7 +24,6 @@ import net.imagej.updater.util.Progress; public class Job { - private static final String JOB_HAS_DATA_TO_DOWNLOAD_PROPERTY = "job.needDownload"; public static boolean isJobPath(Path p) { @@ -46,37 +45,39 @@ public class Job { private Long jobId; final private Progress dummy = new Progress() { - + @Override public void setTitle(String title) { } - + @Override public void setItemCount(int count, int total) { } - + @Override public void setCount(int count, int total) { } - + @Override public void itemDone(Object item) { } - + @Override public void done() { } - + @Override public void addItem(Object item) { } }; - - public Job(Path path, Collection<Path> files, Supplier<HaaSClient> haasClientSupplier, Progress progress) throws IOException { + + public Job(Path basePath, Collection<Path> files, Supplier<HaaSClient> haasClientSupplier, Progress progress) + throws IOException { this(haasClientSupplier); HaaSClient client = this.haasClientSupplier.get(); - long id = client.start(files, "TestOutRedirect", Collections.emptyList(), new P_ProgressNotifierAdapter(progress)); - jobDir = path.resolve("" + id); + long id = client.start(files, "TestOutRedirect", Collections.emptyList(), + new P_ProgressNotifierAdapter(progress)); + jobDir = basePath.resolve("" + id); Files.createDirectory(jobDir); updateState(); } @@ -132,6 +133,11 @@ public class Job { } } + public void downloadFileData(String fileName, OutputStream bos) { + haasClientSupplier.get().downloadFileData(jobId,fileName, bos); + } + + public JobState getState() { return state; } @@ -182,8 +188,6 @@ public class Job { private static long getJobId(Path path) { return Long.parseLong(path.getFileName().toString()); } - - private class P_ProgressNotifierAdapter implements ProgressNotifier { private Progress progress; @@ -216,7 +220,9 @@ public class Job { public void done() { progress.done(); } - - + } + + + } diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/JobManager.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/JobManager.java index 9d10969960757e162b245388e50ebf0f142c5a20..ba976fa2216c5a263969d20939e4bfdc67b56db1 100644 --- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/JobManager.java +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/JobManager.java @@ -1,6 +1,8 @@ package cz.it4i.fiji.haas; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.Path; import java.util.Calendar; @@ -9,6 +11,9 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import cz.it4i.fiji.haas_java_client.HaaSClient; import cz.it4i.fiji.haas_java_client.JobState; import cz.it4i.fiji.haas_java_client.Settings; @@ -17,6 +22,8 @@ import net.imagej.updater.util.Progress; public class JobManager { + private static Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas.JobManager.class); + private Path workDirectory; private Collection<Job> jobs = new LinkedList<>(); @@ -25,7 +32,6 @@ public class JobManager { private Settings settings; - public JobManager(Path workDirectory, Settings settings) throws IOException { this.workDirectory = workDirectory; this.settings = settings; @@ -39,8 +45,20 @@ public class JobManager { } - public void startJob(Path path, Collection<Path> files, Progress progress) throws IOException { - jobs.add(new Job(path, files, this::getHaasClient, progress)); + public JobInfo startJob(Collection<Path> files, Progress progress) throws IOException { + Job job; + jobs.add(job = new Job(workDirectory, files, this::getHaasClient, progress)); + return new JobInfo(job) { + @Override + public JobState getState() { + try { + job.updateState(); + } catch (IOException e) { + log.error(e.getMessage(), e); + } + return super.getState(); + } + }; } public Iterable<JobInfo> getJobsNeedingDownload() { @@ -58,6 +76,12 @@ public class JobManager { } + public JobState getState(long id) { + return getHaasClient().obtainJobInfo(id).getState(); + } + + + private HaaSClient getHaasClient() { if (haasClient == null) { haasClient = new HaaSClient(settings); @@ -65,7 +89,6 @@ public class JobManager { return haasClient; } - public static class JobInfo extends ObservableValueBase<JobInfo> { private Job job; @@ -102,7 +125,13 @@ public class JobManager { job.download(progress); fireValueChangedEvent(); } + + public void waitForStart() { + // TODO Auto-generated method stub + + } + public void updateInfo() throws IOException { job.updateState(); } @@ -111,10 +140,25 @@ public class JobManager { public JobInfo getValue() { return this; } + + public void downloadFileData(String fileName, OutputStream bos) { + job.downloadFileData(fileName, bos); + } + + public void uploadFile(ByteArrayInputStream byteArrayInputStream, String configModified, int length, + long epochSecond) { + + //TODO + } private String getStringFromTimeSafely(Calendar time) { return time != null ? time.getTime().toString() : "N/A"; } + + + + } + } diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/RunWithHaaS.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/RunWithHaaS.java index cce86a42bc28b6ce4e0b19152bbd590666eaddda..13199d040f44358ac5617d64b2ebe036db469880 100644 --- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/RunWithHaaS.java +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/RunWithHaaS.java @@ -49,9 +49,8 @@ public class RunWithHaaS implements Command { @Override public void run() { try { - jobManager = new JobManager(getWorkingDirectoryPath(),TestingConstants.getSettings()); - jobManager.startJob(getWorkingDirectoryPath(), getContent(dataDirectory), - ModalDialogs.doModal(new ProgressDialog(getFrame()))); + jobManager = new JobManager(getWorkingDirectoryPath(), TestingConstants.getSettings()); + jobManager.startJob(getContent(dataDirectory), ModalDialogs.doModal(new ProgressDialog(getFrame()))); } catch (IOException e) { log.error(e); } diff --git a/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSClient.java b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSClient.java index 7be909bb246f78bf2b36c3d6766829015917be1e..8a171f8f1b571397077f527438b4b7cc021e637c 100644 --- a/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSClient.java +++ b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSClient.java @@ -1,6 +1,7 @@ package cz.it4i.fiji.haas_java_client; import java.io.IOException; +import java.io.OutputStream; import java.io.UnsupportedEncodingException; import java.nio.file.Files; import java.nio.file.Path; @@ -14,7 +15,9 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.function.Predicate; import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.xml.rpc.ServiceException; @@ -210,7 +213,7 @@ public class HaaSClient { public java.util.Calendar getEndTime() { return info.getEndTime(); }; - + public Calendar getCreationTime() { return info.getCreationTime(); }; @@ -235,20 +238,25 @@ public class HaaSClient { } public void download(long jobId, Path workDirectory, final ProgressNotifier notifier) { + download(jobId, workDirectory, notifier, val -> true); + } + + public void download(long jobId, Path workDirectory, final ProgressNotifier notifier, Predicate<String> function) { try { notifier.setTitle("Downloading"); FileTransferMethodExt ft = getFileTransfer().getFileTransferMethod(jobId, getSessionID()); try (ScpClient scpClient = getScpClient(ft)) { - String[] files = getFileTransfer().listChangedFilesForJob(jobId, getSessionID()); + String[] filesArray = getFileTransfer().listChangedFilesForJob(jobId, getSessionID()); + Stream<String> files = Arrays.asList(filesArray).stream().filter(function); List<Long> fileSizes = getSizes( - Arrays.asList(files).stream() - .map(filename -> "'" + ft.getSharedBasepath() + "/" + filename + "'") - .collect(Collectors.toList()), + files.map(filename -> "'" + ft.getSharedBasepath() + "/" + filename + "'").collect( + Collectors.toList()), scpClient, new P_ProgressNotifierDecorator4Size(notifier)); final long totalFileSize = fileSizes.stream().mapToLong(i -> i.longValue()).sum(); - TransferFileProgressForHaaSClient progress =new TransferFileProgressForHaaSClient(totalFileSize, notifier); + TransferFileProgressForHaaSClient progress = new TransferFileProgressForHaaSClient(totalFileSize, + notifier); int idx = 0; - for (String fileName : files) { + for (String fileName : (Iterable<String>) files::iterator) { fileName = fileName.replaceFirst("/", ""); Path rFile = workDirectory.resolve(fileName); if (!Files.exists(rFile.getParent())) { @@ -271,6 +279,17 @@ public class HaaSClient { } } + public void downloadFileData(long jobId, String fileName, OutputStream os) { + try { + FileTransferMethodExt ft = getFileTransfer().getFileTransferMethod(jobId, getSessionID()); + try (ScpClient scpClient = getScpClient(ft)) { + scpClient.download(fileName, os, new TransferFileProgressForHaaSClient(0, dummyNotifier)); + } + } catch (IOException | JSchException | ServiceException e) { + throw new HaaSClientException(e); + } + } + private List<Long> getSizes(List<String> asList, ScpClient scpClient, ProgressNotifier notifier) throws JSchException, IOException { List<Long> result = new LinkedList<>(); @@ -428,4 +447,5 @@ public class HaaSClient { notifier.done(); } } + } diff --git a/java-scpclient/src/main/java/cz/it4i/fiji/scpclient/ScpClient.java b/java-scpclient/src/main/java/cz/it4i/fiji/scpclient/ScpClient.java index 85b5b25f0ce82e6c4949d40c4cd88c8cb5cc5eaa..fecafb4e2d63daf2e22a464959a125729073cbb2 100644 --- a/java-scpclient/src/main/java/cz/it4i/fiji/scpclient/ScpClient.java +++ b/java-scpclient/src/main/java/cz/it4i/fiji/scpclient/ScpClient.java @@ -66,6 +66,13 @@ public class ScpClient implements Closeable { } public boolean download(String lfile, Path rfile, TransferFileProgress progress) throws JSchException, IOException { + try(OutputStream os = Files.newOutputStream(rfile)) { + return download(lfile, os, progress); + } + } + + public boolean download(String lfile, OutputStream os, TransferFileProgress progress) + throws JSchException, IOException { Session session = connectionSession(); // exec 'scp -f rfile' remotely @@ -125,25 +132,24 @@ public class ScpClient implements Closeable { out.flush(); // read a content of lfile - try (OutputStream fos = Files.newOutputStream(rfile)) { - int foo; - while (true) { - if (buf.length < filesize) - foo = buf.length; - else - foo = (int) filesize; - foo = in.read(buf, 0, foo); - if (foo < 0) { - // error - break; - } - fos.write(buf, 0, foo); - progress.dataTransfered(foo); - filesize -= foo; - if (filesize == 0L) - break; + int foo; + while (true) { + if (buf.length < filesize) + foo = buf.length; + else + foo = (int) filesize; + foo = in.read(buf, 0, foo); + if (foo < 0) { + // error + break; } + os.write(buf, 0, foo); + progress.dataTransfered(foo); + filesize -= foo; + if (filesize == 0L) + break; } + if (checkAck(in) != 0) { return false; } @@ -167,11 +173,16 @@ public class ScpClient implements Closeable { } public boolean upload(Path file, String rfile, TransferFileProgress progress) throws JSchException, IOException { + try (InputStream is = Files.newInputStream(file)) { + return upload(is, file.getFileName().toString(), file.toFile().length(), file.toFile().lastModified(), + rfile, progress); + } + } + public boolean upload(InputStream is, String fileName, long length, long lastModified, String rfile, + TransferFileProgress progress) throws JSchException, IOException { Session session = connectionSession(); - boolean ptimestamp = true; - // exec 'scp -t rfile' remotely String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + rfile; Channel channel = session.openChannel("exec"); @@ -184,10 +195,10 @@ public class ScpClient implements Closeable { } if (ptimestamp) { - command = "T " + (file.toFile().lastModified() / 1000) + " 0"; + command = "T " + (lastModified / 1000) + " 0"; // The access time should be sent here, // but it is not accessible with JavaAPI ;-< - command += (" " + (file.toFile().lastModified() / 1000) + " 0\n"); + command += (" " + (lastModified / 1000) + " 0\n"); out.write(command.getBytes()); out.flush(); if (checkAck(in) != 0) { @@ -196,9 +207,9 @@ public class ScpClient implements Closeable { } // send "C0644 filesize filename", where filename should not include '/' - long filesize = file.toFile().length(); + long filesize = length; command = "C0644 " + filesize + " "; - command += file.getFileName().toString(); + command += fileName; command += "\n"; out.write(command.getBytes()); out.flush(); @@ -207,14 +218,12 @@ public class ScpClient implements Closeable { } byte[] buf = new byte[getBufferSize()]; // send a content of lfile - try (InputStream fis = Files.newInputStream(file)) { - while (true) { - int len = fis.read(buf, 0, buf.length); - if (len <= 0) - break; - out.write(buf, 0, len); // out.flush(); - progress.dataTransfered(len); - } + while (true) { + int len = is.read(buf, 0, buf.length); + if (len <= 0) + break; + out.write(buf, 0, len); // out.flush(); + progress.dataTransfered(len); } // send '\0' buf[0] = 0; @@ -233,35 +242,35 @@ public class ScpClient implements Closeable { public long size(String lfile) throws JSchException, IOException { Session session = connectionSession(); - + // exec 'scp -f rfile' remotely String command = "scp -f " + lfile; Channel channel = session.openChannel("exec"); - + try { ((ChannelExec) channel).setCommand(command); - + // get I/O streams for remote scp try (OutputStream out = channel.getOutputStream(); InputStream in = channel.getInputStream()) { - + channel.connect(); - + byte[] buf = new byte[getBufferSize()]; - + // send '\0' buf[0] = 0; out.write(buf, 0, 1); out.flush(); - + while (true) { int c = checkAck(in); if (c != 'C') { break; } - + // read '0644 ' in.read(buf, 0, 5); - + long filesize = 0L; while (true) { if (in.read(buf, 0, 1) < 0) { @@ -273,10 +282,10 @@ public class ScpClient implements Closeable { filesize = filesize * 10L + (long) (buf[0] - '0'); } return filesize; - + } } - + } finally { channel.disconnect(); }