Skip to content
Snippets Groups Projects
Commit e07fe64d authored by Jan Kožusznik's avatar Jan Kožusznik
Browse files

refactoring

parent 1d953b39
No related branches found
No related tags found
No related merge requests found
......@@ -20,6 +20,7 @@ import cz.it4i.fiji.haas.JobManager.JobManager4Job;
import cz.it4i.fiji.haas.JobManager.JobSynchronizableFile;
import cz.it4i.fiji.haas_java_client.HaaSClient;
import cz.it4i.fiji.haas_java_client.HaaSClient.UploadingFile;
import cz.it4i.fiji.haas_java_client.HaaSFileTransfer;
import cz.it4i.fiji.haas_java_client.JobInfo;
import cz.it4i.fiji.haas_java_client.JobState;
import cz.it4i.fiji.haas_java_client.ProgressNotifier;
......@@ -75,8 +76,9 @@ public class Job {
public void uploadFiles(Iterable<UploadingFile> files, Progress notifier) {
HaaSClient client = this.haasClientSupplier.get();
client.uploadFiles(jobId, files, new P_ProgressNotifierAdapter(notifier));
try(HaaSFileTransfer transfer = client.startFileTransfer(getId(), new P_ProgressNotifierAdapter(notifier))){
transfer.upload(files);
}
}
public void uploadFilesByName(Iterable<String> files, Progress notifier) {
......@@ -117,7 +119,9 @@ public class Job {
}
synchronized public void download(Predicate<String> predicate, Progress notifier) {
haasClientSupplier.get().download(getId(), jobDir, predicate, new P_ProgressNotifierAdapter(notifier));
try (HaaSFileTransfer fileTransfer = haasClientSupplier.get().startFileTransfer(jobId, new P_ProgressNotifierAdapter(notifier))) {
fileTransfer.download(fileTransfer.getChangedFiles().stream().filter(predicate).collect(Collectors.toList()), jobDir);
}
}
public JobState getState() {
......
......@@ -7,7 +7,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
......@@ -15,7 +15,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
......@@ -48,7 +47,60 @@ import cz.it4i.fiji.scpclient.ScpClient;
public class HaaSClient {
private static Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas_java_client.HaaSClient.class);
public static List<Long> getSizes(List<String> asList, ScpClient scpClient, ProgressNotifier notifier)
throws JSchException, IOException {
List<Long> result = new LinkedList<>();
String item;
notifier.addItem(item = "Checking sizes");
for (String lfile : asList) {
result.add(scpClient.size(lfile));
notifier.setItemCount(result.size(), asList.size());
}
notifier.itemDone(item);
return result;
}
public static UploadingFile getUploadingFile(Path file) {
return new UploadingFile() {
@Override
public InputStream getInputStream() {
try {
return Files.newInputStream(file);
} catch (IOException e) {
log.error(e.getMessage(), e);
throw new RuntimeException(e);
}
}
@Override
public String getName() {
return file.getFileName().toString();
}
@Override
public long getLength() {
try {
return Files.size(file);
} catch (IOException e) {
log.error(e.getMessage(), e);
throw new RuntimeException(e);
}
}
@Override
public long getLastTime() {
try {
return Files.getLastModifiedTime(file).toMillis();
} catch (IOException e) {
log.error(e.getMessage(), e);
throw new RuntimeException(e);
}
}
};
}
public interface UploadingFile {
InputStream getInputStream();
......@@ -93,6 +145,8 @@ public class HaaSClient {
}
}
private static Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas_java_client.HaaSClient.class);
private String sessionID;
private UserAndLimitationManagementWsSoap userAndLimitationManagement;
......@@ -109,7 +163,7 @@ public class HaaSClient {
private String projectId;
private ProgressNotifier dummyNotifier = new ProgressNotifier() {
public static ProgressNotifier DUMMY_NOTIFIER = new ProgressNotifier() {
@Override
public void setTitle(String title) {
......@@ -164,7 +218,7 @@ public class HaaSClient {
public long start(Iterable<Path> files, String name, Collection<Entry<String, String>> templateParameters) {
Iterable<UploadingFile> uploadingFiles = StreamSupport.stream(files.spliterator(), false)
.map(HaaSClient::getUploadingFile).collect(Collectors.toList());
return start(uploadingFiles, name, templateParameters, dummyNotifier);
return start(uploadingFiles, name, templateParameters, DUMMY_NOTIFIER);
}
public long start(Iterable<UploadingFile> files, String name, Collection<Entry<String, String>> templateParameters,
......@@ -172,11 +226,13 @@ public class HaaSClient {
notifier.setTitle("Starting job");
try {
long jobId = doCreateJob(name, templateParameters);
doUploadFiles(jobId, files, notifier);
try (HaaSFileTransfer transfer = startFileTransfer(jobId, notifier)) {
transfer.upload(files);
}
doSubmitJob(jobId);
return jobId;
} catch (ServiceException | JSchException | IOException e) {
throw new RuntimeException(e);
} catch (ServiceException | IOException e) {
throw new HaaSClientException(e);
}
}
......@@ -189,19 +245,21 @@ public class HaaSClient {
}
}
public void uploadFiles(long jobId, Iterable<UploadingFile> files, ProgressNotifier notifier) {
public HaaSFileTransfer startFileTransfer(long jobId, ProgressNotifier notifier) {
try {
doUploadFiles(jobId, files, notifier);
} catch (ServiceException | JSchException | IOException e) {
throw new RuntimeException(e);
FileTransferMethodExt ft = getFileTransfer().getFileTransferMethod(jobId, getSessionID());
return new HaaSFileTransferImp(ft, getSessionID(), jobId, getFileTransfer(), getScpClient(ft), notifier);
} catch (RemoteException | ServiceException | UnsupportedEncodingException | JSchException e) {
throw new HaaSClientException(e);
}
}
public void submitJob(long jobId) {
try {
doSubmitJob(jobId);
} catch (RemoteException | ServiceException e) {
throw new RuntimeException(e);
throw new HaaSClientException(e);
}
}
......@@ -247,126 +305,11 @@ public class HaaSClient {
throw new HaaSClientException(e);
}
}
public void download(long jobId, Path workDir) {
download(jobId, workDir, dummyNotifier);
}
public void download(long jobId, Path workDirectory, final ProgressNotifier notifier) {
download(jobId, workDirectory, val -> true, notifier);
}
public void download(long jobId, Path workDirectory, Predicate<String> function, final ProgressNotifier notifier) {
try {
notifier.setTitle("Downloading");
FileTransferMethodExt ft = getFileTransfer().getFileTransferMethod(jobId, getSessionID());
try (ScpClient scpClient = getScpClient(ft)) {
String[] filesArray = getFileTransfer().listChangedFilesForJob(jobId, getSessionID());
Collection<String> files = Arrays.asList(filesArray).stream().filter(function)
.collect(Collectors.toList());
List<Long> fileSizes = getSizes(
files.stream().map(filename -> "'" + ft.getSharedBasepath() + "/" + filename + "'").collect(
Collectors.toList()),
scpClient, new P_ProgressNotifierDecorator4Size(notifier));
final long totalFileSize = fileSizes.stream().mapToLong(i -> i.longValue()).sum();
TransferFileProgressForHaaSClient progress = new TransferFileProgressForHaaSClient(totalFileSize,
notifier);
int idx = 0;
for (String fileName : files) {
fileName = fileName.replaceFirst("/", "");
Path rFile = workDirectory.resolve(fileName);
if (!Files.exists(rFile.getParent())) {
Files.createDirectories(rFile.getParent());
}
String fileToDownload = "'" + ft.getSharedBasepath() + "/" + fileName + "'";
String item;
progress.addItem(item = fileName);
progress.startNewFile(fileSizes.get(idx));
scpClient.download(fileToDownload, rFile, progress);
progress.itemDone(item);
idx++;
}
}
getFileTransfer().endFileTransfer(jobId, ft, getSessionID());
notifier.done();
} catch (IOException | JSchException | ServiceException e) {
throw new HaaSClientException(e);
}
}
public static UploadingFile getUploadingFile(Path file) {
return new UploadingFile() {
@Override
public InputStream getInputStream() {
try {
return Files.newInputStream(file);
} catch (IOException e) {
log.error(e.getMessage(), e);
throw new RuntimeException(e);
}
}
@Override
public String getName() {
return file.getFileName().toString();
}
@Override
public long getLength() {
try {
return Files.size(file);
} catch (IOException e) {
log.error(e.getMessage(), e);
throw new RuntimeException(e);
}
}
@Override
public long getLastTime() {
try {
return Files.getLastModifiedTime(file).toMillis();
} catch (IOException e) {
log.error(e.getMessage(), e);
throw new RuntimeException(e);
}
}
};
}
private void doSubmitJob(long jobId) throws RemoteException, ServiceException {
getJobManagement().submitJob(jobId, getSessionID());
}
private void doUploadFiles(long jobId, Iterable<UploadingFile> files, ProgressNotifier notifier)
throws RemoteException, ServiceException, JSchException, IOException, UnsupportedEncodingException {
FileTransferMethodExt fileTransfer = getFileTransfer().getFileTransferMethod(jobId, getSessionID());
List<Long> totalSizes = StreamSupport.stream(files.spliterator(), false).map(f -> f.getLength())
.collect(Collectors.toList());
long totalSize = totalSizes.stream().mapToLong(l -> l.longValue()).sum();
TransferFileProgressForHaaSClient progress = new TransferFileProgressForHaaSClient(totalSize, notifier);
try (ScpClient scpClient = getScpClient(fileTransfer)) {
int index = 0;
for (UploadingFile file : files) {
String item;
progress.startNewFile(totalSizes.get(index));
notifier.addItem(item = "Uploading file: " + file.getName());
String destFile = "'" + fileTransfer.getSharedBasepath() + "/" + file.getName() + "'";
try (InputStream is = file.getInputStream()) {
boolean result = scpClient.upload(is, destFile, file.getLength(), file.getLastTime(), progress);
notifier.itemDone(item);
if (!result) {
throw new HaaSClientException("Uploading of " + file + " to " + destFile + " failed");
}
}
index++;
}
}
getFileTransfer().endFileTransfer(jobId, fileTransfer, getSessionID());
}
private long doCreateJob(String name, Collection<Entry<String, String>> templateParameters)
throws RemoteException, ServiceException {
TaskSpecificationExt taskSpec = createTaskSpecification(name, templateId, templateParameters);
......@@ -375,20 +318,6 @@ public class HaaSClient {
return job.getId();
}
private List<Long> getSizes(List<String> asList, ScpClient scpClient, ProgressNotifier notifier)
throws JSchException, IOException {
List<Long> result = new LinkedList<>();
String item;
notifier.addItem(item = "Checking sizes");
for (String lfile : asList) {
result.add(scpClient.size(lfile));
notifier.setItemCount(result.size(), asList.size());
}
notifier.itemDone(item);
return result;
}
private ScpClient getScpClient(FileTransferMethodExt fileTransfer)
throws UnsupportedEncodingException, JSchException {
byte[] pvtKey = fileTransfer.getCredentials().getPrivateKey().getBytes("UTF-8");
......@@ -476,7 +405,7 @@ public class HaaSClient {
return sessionID;
}
private class P_ProgressNotifierDecorator4Size extends P_ProgressNotifierDecorator {
public static class P_ProgressNotifierDecorator4Size extends P_ProgressNotifierDecorator {
private static final int SIZE_RATIO = 20;
......@@ -492,7 +421,7 @@ public class HaaSClient {
}
}
private class P_ProgressNotifierDecorator implements ProgressNotifier {
public static class P_ProgressNotifierDecorator implements ProgressNotifier {
private ProgressNotifier notifier;
public P_ProgressNotifierDecorator(ProgressNotifier notifier) {
......
package cz.it4i.fiji.haas_java_client;
import java.io.Closeable;
import java.nio.file.Path;
import java.util.Collection;
import cz.it4i.fiji.haas_java_client.HaaSClient.UploadingFile;
public interface HaaSFileTransfer extends Closeable {
Collection<String> getChangedFiles();
@Override
void close();
void upload(Iterable<UploadingFile> files);
void download(Iterable<String> files, Path workDIrectory);
}
package cz.it4i.fiji.haas_java_client;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.rmi.RemoteException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import com.jcraft.jsch.JSchException;
import cz.it4i.fiji.haas_java_client.HaaSClient.P_ProgressNotifierDecorator4Size;
import cz.it4i.fiji.haas_java_client.HaaSClient.UploadingFile;
import cz.it4i.fiji.haas_java_client.proxy.FileTransferMethodExt;
import cz.it4i.fiji.haas_java_client.proxy.FileTransferWsSoap;
import cz.it4i.fiji.scpclient.ScpClient;
class HaaSFileTransferImp implements HaaSFileTransfer {
private FileTransferMethodExt ft;
private ScpClient scpClient;
private FileTransferWsSoap fileTransfer;
private String sessionId;
private long jobId;
private ProgressNotifier notifier;
public HaaSFileTransferImp(FileTransferMethodExt ft, String sessionId, long jobId, FileTransferWsSoap fileTransfer,
ScpClient scpClient, ProgressNotifier notifier) {
super();
this.ft = ft;
this.scpClient = scpClient;
this.fileTransfer = fileTransfer;
this.sessionId = sessionId;
this.jobId = jobId;
this.notifier = notifier;
}
@Override
public Collection<String> getChangedFiles() {
try {
return Arrays.asList(fileTransfer.listChangedFilesForJob(jobId, sessionId));
} catch (RemoteException e) {
throw new HaaSClientException(e);
}
}
@Override
public void close() {
scpClient.close();
try {
fileTransfer.endFileTransfer(jobId, ft, sessionId);
} catch (RemoteException e) {
throw new HaaSClientException(e);
}
}
@Override
public void upload(Iterable<UploadingFile> files) {
List<Long> totalSizes = StreamSupport.stream(files.spliterator(), false).map(f -> f.getLength())
.collect(Collectors.toList());
long totalSize = totalSizes.stream().mapToLong(l -> l.longValue()).sum();
TransferFileProgressForHaaSClient progress = new TransferFileProgressForHaaSClient(totalSize, notifier);
int index = 0;
for (UploadingFile file : files) {
String item;
progress.startNewFile(totalSizes.get(index));
notifier.addItem(item = "Uploading file: " + file.getName());
String destFile = "'" + ft.getSharedBasepath() + "/" + file.getName() + "'";
try (InputStream is = file.getInputStream()) {
boolean result = scpClient.upload(is, destFile, file.getLength(), file.getLastTime(), progress);
notifier.itemDone(item);
if (!result) {
throw new HaaSClientException("Uploading of " + file + " to " + destFile + " failed");
}
} catch (JSchException | IOException e) {
throw new HaaSClientException();
}
index++;
}
}
@Override
public void download(Iterable<String> files, Path workDirectory) {
List<Long> fileSizes;
try {
fileSizes = HaaSClient.getSizes(StreamSupport.stream(files.spliterator(), false)
.map(filename -> "'" + ft.getSharedBasepath() + "/" + filename + "'").collect(Collectors.toList()),
scpClient, new P_ProgressNotifierDecorator4Size(notifier));
final long totalFileSize = fileSizes.stream().mapToLong(i -> i.longValue()).sum();
TransferFileProgressForHaaSClient progress = new TransferFileProgressForHaaSClient(totalFileSize, notifier);
int idx = 0;
for (String fileName : files) {
fileName = fileName.replaceFirst("/", "");
Path rFile = workDirectory.resolve(fileName);
String fileToDownload = "'" + ft.getSharedBasepath() + "/" + fileName + "'";
String item;
progress.addItem(item = fileName);
progress.startNewFile(fileSizes.get(idx));
scpClient.download(fileToDownload, rFile, progress);
progress.itemDone(item);
idx++;
}
} catch (JSchException | IOException e) {
throw new HaaSClientException(e);
}
}
}
......@@ -16,17 +16,17 @@ import org.slf4j.LoggerFactory;
import cz.it4i.fiji.haas_java_client.HaaSClient.SynchronizableFiles;
import cz.it4i.fiji.haas_java_client.proxy.JobFileContentExt;
public class TestHaaSJavaClient {
private static Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas_java_client.TestHaaSJavaClient.class);
public static void main(String[] args) throws ServiceException, IOException {
Map<String, String> params = new HashMap<>();
params.put("inputParam", "someStringParam");
Path baseDir = Paths.get("/home/koz01/aaa");
HaaSClient client = new HaaSClient(TestingConstants.getSettings(1l, 600,7l, "DD-17-31"));
long jobId = client.start(Arrays.asList(Paths.get("/home/koz01/aaa/vecmath.jar")), "TestOutRedirect", params.entrySet());
HaaSClient client = new HaaSClient(TestingConstants.getSettings(1l, 600, 7l, "DD-17-31"));
long jobId = client.start(Arrays.asList(Paths.get("/home/koz01/aaa/vecmath.jar")), "TestOutRedirect",
params.entrySet());
Path workDir = baseDir.resolve("" + jobId);
if (!Files.isDirectory(workDir)) {
Files.createDirectories(workDir);
......@@ -45,7 +45,9 @@ public class TestHaaSJavaClient {
}
client.downloadPartsOfJobFiles(jobId, taskFileOffset).forEach(jfc -> showJFC(jfc));
if (info.getState() == JobState.Finished) {
client.download(jobId,workDir);
try (HaaSFileTransfer fileTransfer = client.startFileTransfer(jobId, HaaSClient.DUMMY_NOTIFIER)) {
fileTransfer.download(fileTransfer.getChangedFiles(), workDir);
}
}
log.info("JobId :" + jobId + ", state" + info.getState());
} while (info.getState() != JobState.Canceled && info.getState() != JobState.Failed
......
......@@ -45,7 +45,10 @@ public class TestHaaSJavaClientWithSPIM {
}
client.downloadPartsOfJobFiles(jobId, taskFileOffset).forEach(jfc -> showJFC(jfc));
if (info.getState() == JobState.Finished) {
client.download(jobId, workDir);
try (HaaSFileTransfer fileTransfer = client.startFileTransfer(jobId, HaaSClient.DUMMY_NOTIFIER)) {
fileTransfer.download(fileTransfer.getChangedFiles(), workDir);
}
}
log.info("JobId :" + jobId + ", state" + info.getState());
firstIteration = false;
......
......@@ -67,6 +67,9 @@ public class ScpClient implements Closeable {
}
public boolean download(String lfile, Path rfile, TransferFileProgress progress) throws JSchException, IOException {
if (!Files.exists(rfile.getParent())) {
Files.createDirectories(rfile.getParent());
}
try (OutputStream os = Files.newOutputStream(rfile)) {
return download(lfile, os, progress);
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment