diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java index 7a04defdd4bfa744f0675d447baf1adc8eaae751..271dbda60d89720793b23378c8a4ac2419ec5a32 100644 --- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/Job.java @@ -20,6 +20,7 @@ import cz.it4i.fiji.haas.JobManager.JobManager4Job; import cz.it4i.fiji.haas.JobManager.JobSynchronizableFile; import cz.it4i.fiji.haas_java_client.HaaSClient; import cz.it4i.fiji.haas_java_client.HaaSClient.UploadingFile; +import cz.it4i.fiji.haas_java_client.HaaSFileTransfer; import cz.it4i.fiji.haas_java_client.JobInfo; import cz.it4i.fiji.haas_java_client.JobState; import cz.it4i.fiji.haas_java_client.ProgressNotifier; @@ -75,8 +76,9 @@ public class Job { public void uploadFiles(Iterable<UploadingFile> files, Progress notifier) { HaaSClient client = this.haasClientSupplier.get(); - - client.uploadFiles(jobId, files, new P_ProgressNotifierAdapter(notifier)); + try(HaaSFileTransfer transfer = client.startFileTransfer(getId(), new P_ProgressNotifierAdapter(notifier))){ + transfer.upload(files); + } } public void uploadFilesByName(Iterable<String> files, Progress notifier) { @@ -117,7 +119,9 @@ public class Job { } synchronized public void download(Predicate<String> predicate, Progress notifier) { - haasClientSupplier.get().download(getId(), jobDir, predicate, new P_ProgressNotifierAdapter(notifier)); + try (HaaSFileTransfer fileTransfer = haasClientSupplier.get().startFileTransfer(jobId, new P_ProgressNotifierAdapter(notifier))) { + fileTransfer.download(fileTransfer.getChangedFiles().stream().filter(predicate).collect(Collectors.toList()), jobDir); + } } public JobState getState() { diff --git a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/JobManager.java b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/JobManager.java index 0319efc9237dc0ff9eaca2f6c6f7132771baf7db..118fc52193e8a90bbec09c7cb0b955802672969d 100644 --- a/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/JobManager.java +++ b/haas-imagej-client/src/main/java/cz/it4i/fiji/haas/JobManager.java @@ -7,7 +7,6 @@ import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; -import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSClient.java b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSClient.java index a08e3a1626fbed22b99a2ba35378354ddbb5a3e9..d9441379187153ea6b8ecfc165fe2625922d212b 100644 --- a/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSClient.java +++ b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSClient.java @@ -15,7 +15,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -48,7 +47,60 @@ import cz.it4i.fiji.scpclient.ScpClient; public class HaaSClient { - private static Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas_java_client.HaaSClient.class); + public static List<Long> getSizes(List<String> asList, ScpClient scpClient, ProgressNotifier notifier) + throws JSchException, IOException { + List<Long> result = new LinkedList<>(); + + String item; + notifier.addItem(item = "Checking sizes"); + for (String lfile : asList) { + result.add(scpClient.size(lfile)); + notifier.setItemCount(result.size(), asList.size()); + } + notifier.itemDone(item); + return result; + } + + public static UploadingFile getUploadingFile(Path file) { + return new UploadingFile() { + + @Override + public InputStream getInputStream() { + try { + return Files.newInputStream(file); + } catch (IOException e) { + log.error(e.getMessage(), e); + throw new RuntimeException(e); + } + } + + @Override + public String getName() { + return file.getFileName().toString(); + } + + @Override + public long getLength() { + try { + return Files.size(file); + } catch (IOException e) { + log.error(e.getMessage(), e); + throw new RuntimeException(e); + } + } + + @Override + public long getLastTime() { + try { + return Files.getLastModifiedTime(file).toMillis(); + } catch (IOException e) { + log.error(e.getMessage(), e); + throw new RuntimeException(e); + } + } + + }; + } public interface UploadingFile { InputStream getInputStream(); @@ -93,6 +145,8 @@ public class HaaSClient { } } + private static Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas_java_client.HaaSClient.class); + private String sessionID; private UserAndLimitationManagementWsSoap userAndLimitationManagement; @@ -109,7 +163,7 @@ public class HaaSClient { private String projectId; - private ProgressNotifier dummyNotifier = new ProgressNotifier() { + public static ProgressNotifier DUMMY_NOTIFIER = new ProgressNotifier() { @Override public void setTitle(String title) { @@ -164,7 +218,7 @@ public class HaaSClient { public long start(Iterable<Path> files, String name, Collection<Entry<String, String>> templateParameters) { Iterable<UploadingFile> uploadingFiles = StreamSupport.stream(files.spliterator(), false) .map(HaaSClient::getUploadingFile).collect(Collectors.toList()); - return start(uploadingFiles, name, templateParameters, dummyNotifier); + return start(uploadingFiles, name, templateParameters, DUMMY_NOTIFIER); } public long start(Iterable<UploadingFile> files, String name, Collection<Entry<String, String>> templateParameters, @@ -172,11 +226,13 @@ public class HaaSClient { notifier.setTitle("Starting job"); try { long jobId = doCreateJob(name, templateParameters); - doUploadFiles(jobId, files, notifier); + try (HaaSFileTransfer transfer = startFileTransfer(jobId, notifier)) { + transfer.upload(files); + } doSubmitJob(jobId); return jobId; - } catch (ServiceException | JSchException | IOException e) { - throw new RuntimeException(e); + } catch (ServiceException | IOException e) { + throw new HaaSClientException(e); } } @@ -189,19 +245,21 @@ public class HaaSClient { } } - public void uploadFiles(long jobId, Iterable<UploadingFile> files, ProgressNotifier notifier) { + public HaaSFileTransfer startFileTransfer(long jobId, ProgressNotifier notifier) { try { - doUploadFiles(jobId, files, notifier); - } catch (ServiceException | JSchException | IOException e) { - throw new RuntimeException(e); + FileTransferMethodExt ft = getFileTransfer().getFileTransferMethod(jobId, getSessionID()); + return new HaaSFileTransferImp(ft, getSessionID(), jobId, getFileTransfer(), getScpClient(ft), notifier); + } catch (RemoteException | ServiceException | UnsupportedEncodingException | JSchException e) { + throw new HaaSClientException(e); } + } public void submitJob(long jobId) { try { doSubmitJob(jobId); } catch (RemoteException | ServiceException e) { - throw new RuntimeException(e); + throw new HaaSClientException(e); } } @@ -247,126 +305,11 @@ public class HaaSClient { throw new HaaSClientException(e); } } - - public void download(long jobId, Path workDir) { - download(jobId, workDir, dummyNotifier); - } - - public void download(long jobId, Path workDirectory, final ProgressNotifier notifier) { - download(jobId, workDirectory, val -> true, notifier); - } - - public void download(long jobId, Path workDirectory, Predicate<String> function, final ProgressNotifier notifier) { - try { - notifier.setTitle("Downloading"); - FileTransferMethodExt ft = getFileTransfer().getFileTransferMethod(jobId, getSessionID()); - try (ScpClient scpClient = getScpClient(ft)) { - String[] filesArray = getFileTransfer().listChangedFilesForJob(jobId, getSessionID()); - Collection<String> files = Arrays.asList(filesArray).stream().filter(function) - .collect(Collectors.toList()); - List<Long> fileSizes = getSizes( - files.stream().map(filename -> "'" + ft.getSharedBasepath() + "/" + filename + "'").collect( - Collectors.toList()), - scpClient, new P_ProgressNotifierDecorator4Size(notifier)); - final long totalFileSize = fileSizes.stream().mapToLong(i -> i.longValue()).sum(); - TransferFileProgressForHaaSClient progress = new TransferFileProgressForHaaSClient(totalFileSize, - notifier); - int idx = 0; - for (String fileName : files) { - fileName = fileName.replaceFirst("/", ""); - Path rFile = workDirectory.resolve(fileName); - if (!Files.exists(rFile.getParent())) { - Files.createDirectories(rFile.getParent()); - } - String fileToDownload = "'" + ft.getSharedBasepath() + "/" + fileName + "'"; - String item; - progress.addItem(item = fileName); - progress.startNewFile(fileSizes.get(idx)); - scpClient.download(fileToDownload, rFile, progress); - progress.itemDone(item); - idx++; - } - } - getFileTransfer().endFileTransfer(jobId, ft, getSessionID()); - notifier.done(); - - } catch (IOException | JSchException | ServiceException e) { - throw new HaaSClientException(e); - } - } - - public static UploadingFile getUploadingFile(Path file) { - return new UploadingFile() { - - @Override - public InputStream getInputStream() { - try { - return Files.newInputStream(file); - } catch (IOException e) { - log.error(e.getMessage(), e); - throw new RuntimeException(e); - } - } - - @Override - public String getName() { - return file.getFileName().toString(); - } - - @Override - public long getLength() { - try { - return Files.size(file); - } catch (IOException e) { - log.error(e.getMessage(), e); - throw new RuntimeException(e); - } - } - - @Override - public long getLastTime() { - try { - return Files.getLastModifiedTime(file).toMillis(); - } catch (IOException e) { - log.error(e.getMessage(), e); - throw new RuntimeException(e); - } - } - - }; - } - + private void doSubmitJob(long jobId) throws RemoteException, ServiceException { getJobManagement().submitJob(jobId, getSessionID()); } - private void doUploadFiles(long jobId, Iterable<UploadingFile> files, ProgressNotifier notifier) - throws RemoteException, ServiceException, JSchException, IOException, UnsupportedEncodingException { - FileTransferMethodExt fileTransfer = getFileTransfer().getFileTransferMethod(jobId, getSessionID()); - List<Long> totalSizes = StreamSupport.stream(files.spliterator(), false).map(f -> f.getLength()) - .collect(Collectors.toList()); - long totalSize = totalSizes.stream().mapToLong(l -> l.longValue()).sum(); - TransferFileProgressForHaaSClient progress = new TransferFileProgressForHaaSClient(totalSize, notifier); - try (ScpClient scpClient = getScpClient(fileTransfer)) { - int index = 0; - for (UploadingFile file : files) { - String item; - progress.startNewFile(totalSizes.get(index)); - notifier.addItem(item = "Uploading file: " + file.getName()); - String destFile = "'" + fileTransfer.getSharedBasepath() + "/" + file.getName() + "'"; - try (InputStream is = file.getInputStream()) { - boolean result = scpClient.upload(is, destFile, file.getLength(), file.getLastTime(), progress); - notifier.itemDone(item); - if (!result) { - throw new HaaSClientException("Uploading of " + file + " to " + destFile + " failed"); - } - } - index++; - } - } - getFileTransfer().endFileTransfer(jobId, fileTransfer, getSessionID()); - } - private long doCreateJob(String name, Collection<Entry<String, String>> templateParameters) throws RemoteException, ServiceException { TaskSpecificationExt taskSpec = createTaskSpecification(name, templateId, templateParameters); @@ -375,20 +318,6 @@ public class HaaSClient { return job.getId(); } - private List<Long> getSizes(List<String> asList, ScpClient scpClient, ProgressNotifier notifier) - throws JSchException, IOException { - List<Long> result = new LinkedList<>(); - - String item; - notifier.addItem(item = "Checking sizes"); - for (String lfile : asList) { - result.add(scpClient.size(lfile)); - notifier.setItemCount(result.size(), asList.size()); - } - notifier.itemDone(item); - return result; - } - private ScpClient getScpClient(FileTransferMethodExt fileTransfer) throws UnsupportedEncodingException, JSchException { byte[] pvtKey = fileTransfer.getCredentials().getPrivateKey().getBytes("UTF-8"); @@ -476,7 +405,7 @@ public class HaaSClient { return sessionID; } - private class P_ProgressNotifierDecorator4Size extends P_ProgressNotifierDecorator { + public static class P_ProgressNotifierDecorator4Size extends P_ProgressNotifierDecorator { private static final int SIZE_RATIO = 20; @@ -492,7 +421,7 @@ public class HaaSClient { } } - private class P_ProgressNotifierDecorator implements ProgressNotifier { + public static class P_ProgressNotifierDecorator implements ProgressNotifier { private ProgressNotifier notifier; public P_ProgressNotifierDecorator(ProgressNotifier notifier) { diff --git a/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSFileTransfer.java b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSFileTransfer.java new file mode 100644 index 0000000000000000000000000000000000000000..b1162d38548ae486fba514a33474a325824591f0 --- /dev/null +++ b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSFileTransfer.java @@ -0,0 +1,21 @@ +package cz.it4i.fiji.haas_java_client; + +import java.io.Closeable; +import java.nio.file.Path; +import java.util.Collection; + +import cz.it4i.fiji.haas_java_client.HaaSClient.UploadingFile; + +public interface HaaSFileTransfer extends Closeable { + + Collection<String> getChangedFiles(); + + @Override + void close(); + + void upload(Iterable<UploadingFile> files); + + void download(Iterable<String> files, Path workDIrectory); + + +} diff --git a/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSFileTransferImp.java b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSFileTransferImp.java new file mode 100644 index 0000000000000000000000000000000000000000..3416d7b2ca995f82fed5a4bd21e8cc7ff300ac6f --- /dev/null +++ b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSFileTransferImp.java @@ -0,0 +1,115 @@ +package cz.it4i.fiji.haas_java_client; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Path; +import java.rmi.RemoteException; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import com.jcraft.jsch.JSchException; + +import cz.it4i.fiji.haas_java_client.HaaSClient.P_ProgressNotifierDecorator4Size; +import cz.it4i.fiji.haas_java_client.HaaSClient.UploadingFile; +import cz.it4i.fiji.haas_java_client.proxy.FileTransferMethodExt; +import cz.it4i.fiji.haas_java_client.proxy.FileTransferWsSoap; +import cz.it4i.fiji.scpclient.ScpClient; + +class HaaSFileTransferImp implements HaaSFileTransfer { + + private FileTransferMethodExt ft; + private ScpClient scpClient; + private FileTransferWsSoap fileTransfer; + private String sessionId; + private long jobId; + private ProgressNotifier notifier; + + + public HaaSFileTransferImp(FileTransferMethodExt ft, String sessionId, long jobId, FileTransferWsSoap fileTransfer, + ScpClient scpClient, ProgressNotifier notifier) { + super(); + this.ft = ft; + this.scpClient = scpClient; + this.fileTransfer = fileTransfer; + this.sessionId = sessionId; + this.jobId = jobId; + this.notifier = notifier; + } + + @Override + public Collection<String> getChangedFiles() { + try { + return Arrays.asList(fileTransfer.listChangedFilesForJob(jobId, sessionId)); + } catch (RemoteException e) { + throw new HaaSClientException(e); + } + } + + @Override + public void close() { + scpClient.close(); + try { + fileTransfer.endFileTransfer(jobId, ft, sessionId); + } catch (RemoteException e) { + throw new HaaSClientException(e); + } + + } + + @Override + public void upload(Iterable<UploadingFile> files) { + List<Long> totalSizes = StreamSupport.stream(files.spliterator(), false).map(f -> f.getLength()) + .collect(Collectors.toList()); + long totalSize = totalSizes.stream().mapToLong(l -> l.longValue()).sum(); + TransferFileProgressForHaaSClient progress = new TransferFileProgressForHaaSClient(totalSize, notifier); + int index = 0; + for (UploadingFile file : files) { + String item; + progress.startNewFile(totalSizes.get(index)); + notifier.addItem(item = "Uploading file: " + file.getName()); + String destFile = "'" + ft.getSharedBasepath() + "/" + file.getName() + "'"; + try (InputStream is = file.getInputStream()) { + boolean result = scpClient.upload(is, destFile, file.getLength(), file.getLastTime(), progress); + notifier.itemDone(item); + if (!result) { + throw new HaaSClientException("Uploading of " + file + " to " + destFile + " failed"); + } + } catch (JSchException | IOException e) { + throw new HaaSClientException(); + } + index++; + } + + } + + @Override + public void download(Iterable<String> files, Path workDirectory) { + List<Long> fileSizes; + try { + fileSizes = HaaSClient.getSizes(StreamSupport.stream(files.spliterator(), false) + .map(filename -> "'" + ft.getSharedBasepath() + "/" + filename + "'").collect(Collectors.toList()), + scpClient, new P_ProgressNotifierDecorator4Size(notifier)); + + final long totalFileSize = fileSizes.stream().mapToLong(i -> i.longValue()).sum(); + TransferFileProgressForHaaSClient progress = new TransferFileProgressForHaaSClient(totalFileSize, notifier); + int idx = 0; + for (String fileName : files) { + fileName = fileName.replaceFirst("/", ""); + Path rFile = workDirectory.resolve(fileName); + String fileToDownload = "'" + ft.getSharedBasepath() + "/" + fileName + "'"; + String item; + progress.addItem(item = fileName); + progress.startNewFile(fileSizes.get(idx)); + scpClient.download(fileToDownload, rFile, progress); + progress.itemDone(item); + idx++; + } + } catch (JSchException | IOException e) { + throw new HaaSClientException(e); + } + } + +} diff --git a/haas-java-client/src/test/java/cz/it4i/fiji/haas_java_client/TestHaaSJavaClient.java b/haas-java-client/src/test/java/cz/it4i/fiji/haas_java_client/TestHaaSJavaClient.java index af80f57de44850b944bc151a5a83836c3da2c140..3ccdb08030c7b12404653728cd3b2c370511ab26 100644 --- a/haas-java-client/src/test/java/cz/it4i/fiji/haas_java_client/TestHaaSJavaClient.java +++ b/haas-java-client/src/test/java/cz/it4i/fiji/haas_java_client/TestHaaSJavaClient.java @@ -16,17 +16,17 @@ import org.slf4j.LoggerFactory; import cz.it4i.fiji.haas_java_client.HaaSClient.SynchronizableFiles; import cz.it4i.fiji.haas_java_client.proxy.JobFileContentExt; - public class TestHaaSJavaClient { private static Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas_java_client.TestHaaSJavaClient.class); - + public static void main(String[] args) throws ServiceException, IOException { Map<String, String> params = new HashMap<>(); params.put("inputParam", "someStringParam"); Path baseDir = Paths.get("/home/koz01/aaa"); - HaaSClient client = new HaaSClient(TestingConstants.getSettings(1l, 600,7l, "DD-17-31")); - long jobId = client.start(Arrays.asList(Paths.get("/home/koz01/aaa/vecmath.jar")), "TestOutRedirect", params.entrySet()); + HaaSClient client = new HaaSClient(TestingConstants.getSettings(1l, 600, 7l, "DD-17-31")); + long jobId = client.start(Arrays.asList(Paths.get("/home/koz01/aaa/vecmath.jar")), "TestOutRedirect", + params.entrySet()); Path workDir = baseDir.resolve("" + jobId); if (!Files.isDirectory(workDir)) { Files.createDirectories(workDir); @@ -45,7 +45,9 @@ public class TestHaaSJavaClient { } client.downloadPartsOfJobFiles(jobId, taskFileOffset).forEach(jfc -> showJFC(jfc)); if (info.getState() == JobState.Finished) { - client.download(jobId,workDir); + try (HaaSFileTransfer fileTransfer = client.startFileTransfer(jobId, HaaSClient.DUMMY_NOTIFIER)) { + fileTransfer.download(fileTransfer.getChangedFiles(), workDir); + } } log.info("JobId :" + jobId + ", state" + info.getState()); } while (info.getState() != JobState.Canceled && info.getState() != JobState.Failed diff --git a/haas-java-client/src/test/java/cz/it4i/fiji/haas_java_client/TestHaaSJavaClientWithSPIM.java b/haas-java-client/src/test/java/cz/it4i/fiji/haas_java_client/TestHaaSJavaClientWithSPIM.java index 97be3daae31237f5deaf0eb2726c0ac4ef9ac6a3..08bbda0a57493c9c6cf56b3bf96fadaf34b38027 100644 --- a/haas-java-client/src/test/java/cz/it4i/fiji/haas_java_client/TestHaaSJavaClientWithSPIM.java +++ b/haas-java-client/src/test/java/cz/it4i/fiji/haas_java_client/TestHaaSJavaClientWithSPIM.java @@ -45,7 +45,10 @@ public class TestHaaSJavaClientWithSPIM { } client.downloadPartsOfJobFiles(jobId, taskFileOffset).forEach(jfc -> showJFC(jfc)); if (info.getState() == JobState.Finished) { - client.download(jobId, workDir); + try (HaaSFileTransfer fileTransfer = client.startFileTransfer(jobId, HaaSClient.DUMMY_NOTIFIER)) { + fileTransfer.download(fileTransfer.getChangedFiles(), workDir); + } + } log.info("JobId :" + jobId + ", state" + info.getState()); firstIteration = false; diff --git a/java-scpclient/src/main/java/cz/it4i/fiji/scpclient/ScpClient.java b/java-scpclient/src/main/java/cz/it4i/fiji/scpclient/ScpClient.java index 007cece06730c6faffb1eb74eb431ab2c4934370..092a4a48a7a148c82d81d46b5acfc817cbb52431 100644 --- a/java-scpclient/src/main/java/cz/it4i/fiji/scpclient/ScpClient.java +++ b/java-scpclient/src/main/java/cz/it4i/fiji/scpclient/ScpClient.java @@ -67,6 +67,9 @@ public class ScpClient implements Closeable { } public boolean download(String lfile, Path rfile, TransferFileProgress progress) throws JSchException, IOException { + if (!Files.exists(rfile.getParent())) { + Files.createDirectories(rfile.getParent()); + } try (OutputStream os = Files.newOutputStream(rfile)) { return download(lfile, os, progress); }