Skip to content
Snippets Groups Projects
Commit 92aff223 authored by Jan Kožusznik's avatar Jan Kožusznik
Browse files

allow startJob in phases

parent 29dfeff8
No related branches found
No related tags found
No related merge requests found
package cz.it4i.fiji.haas;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Instant;
import java.util.Arrays;
import cz.it4i.fiji.haas.JobManager.JobInfo;
......@@ -17,9 +14,7 @@ public class BenchmarkJobManager {
private Progress progress;
private Path workDirectory;
private static final String CONFIG_FOR_MODIFICATION = "not-set-config.yaml";
private static final String CONFIG_MODIFIED = "config.yaml";
public BenchmarkJobManager(Path workDirectory, Progress progress) throws IOException {
this.workDirectory = workDirectory;
jobManager = new JobManager(workDirectory, TestingConstants.getSettings(3, 6));
......@@ -33,12 +28,6 @@ public class BenchmarkJobManager {
if (jobInfo.getState() != JobState.Running) {
throw new IllegalStateException("start of job: " + jobInfo + " failed");
}
ByteArrayOutputStream os = new ByteArrayOutputStream();
jobInfo.downloadFileData(CONFIG_FOR_MODIFICATION, os);
byte[] data = updateConfigFile(os.toByteArray());
jobInfo.uploadFile(new ByteArrayInputStream(data), CONFIG_MODIFIED, data.length,
Instant.now().getEpochSecond());
return jobInfo;
}
......
......@@ -113,10 +113,6 @@ public class Job {
saveJobinfo();
}
private JobInfo updateJobInfo() {
return jobInfo = haasClientSupplier.get().obtainJobInfo(getJobId());
}
public void download() {
download(dummy);
}
......@@ -133,15 +129,7 @@ public class Job {
log.error(e);
}
}
public void downloadFileData(String fileName, OutputStream bos) {
haasClientSupplier.get().downloadFileData(jobId, fileName, bos);
}
public void uploadFileData(InputStream inputStream, String fileName, int length, long lastModification) {
haasClientSupplier.get().uploadFileData(jobId, inputStream, fileName, length, lastModification);
}
public JobState getState() {
return state;
}
......@@ -179,6 +167,10 @@ public class Job {
}
}
private JobInfo updateJobInfo() {
return jobInfo = haasClientSupplier.get().obtainJobInfo(getJobId());
}
private static boolean isValidPath(Path path) {
try {
......
package cz.it4i.fiji.haas;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Calendar;
......@@ -81,8 +79,6 @@ public class JobManager {
public JobState getState(long id) {
return getHaasClient().obtainJobInfo(id).getState();
}
private HaaSClient getHaasClient() {
if (haasClient == null) {
......@@ -127,13 +123,12 @@ public class JobManager {
job.download(progress);
fireValueChangedEvent();
}
public void waitForStart() {
// TODO Auto-generated method stub
}
public void updateInfo() throws IOException {
job.updateState();
}
......@@ -142,25 +137,11 @@ public class JobManager {
public JobInfo getValue() {
return this;
}
public void downloadFileData(String fileName, OutputStream bos) {
job.downloadFileData(fileName, bos);
}
public void uploadFile(InputStream inputStream, String name, int length,
long lastModification) {
job.uploadFileData(inputStream, name, length, lastModification);
}
private String getStringFromTimeSafely(Calendar time) {
return time != null ? time.getTime().toString() : "N/A";
}
}
}
......@@ -2,7 +2,6 @@ package cz.it4i.fiji.haas_java_client;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.nio.file.Files;
import java.nio.file.Path;
......@@ -167,55 +166,43 @@ public class HaaSClient {
return start(files, name, templateParameters, dummyNotifier);
}
private long start(Iterable<Path> files, String name, Collection<Entry<String, String>> templateParameters,
public long start(Stream<UploadingFile> files, String name, Collection<Entry<String, String>> templateParameters,
ProgressNotifier notifier) {
Stream<UploadingFile> fileStream = StreamSupport.stream(files.spliterator(), false)
.map(file -> getUploadingFile(file));
return start(fileStream, name, templateParameters, notifier);
notifier.setTitle("Starting job");
try {
long jobId = doCreateJob(name, templateParameters, notifier);
doUploadFiles(jobId, files, notifier);
doSubmitJob(jobId, notifier);
return jobId;
} catch (ServiceException | JSchException | IOException e) {
throw new RuntimeException(e);
}
}
public long start(Stream<UploadingFile> files, String name, Collection<Entry<String, String>> templateParameters,
public long createJob(String name, Collection<Entry<String, String>> templateParameters,
ProgressNotifier notifier) {
notifier.setTitle("Starting job");
try {
TaskSpecificationExt taskSpec = createTaskSpecification(name, templateId, templateParameters);
JobSpecificationExt jobSpecification = createJobSpecification(name, Arrays.asList(taskSpec));
String jobItem;
SubmittedJobInfoExt job = getJobManagement().createJob(jobSpecification, getSessionID());
notifier.addItem(jobItem = String.format("Created job: %d\n", job.getId()));
FileTransferMethodExt fileTransfer = getFileTransfer().getFileTransferMethod(job.getId(), getSessionID());
List<Long> totalSizes = StreamSupport.stream(files.spliterator(), false).map(f -> f.getLength())
.collect(Collectors.toList());
long totalSize = totalSizes.stream().mapToLong(l -> l.longValue()).sum();
TransferFileProgressForHaaSClient progress = new TransferFileProgressForHaaSClient(totalSize, notifier);
try (ScpClient scpClient = getScpClient(fileTransfer)) {
int index = 0;
for (UploadingFile file : (Iterable<UploadingFile>) files::iterator) {
String item;
progress.startNewFile(totalSizes.get(index));
notifier.addItem(item = "Uploading file: " + file.getName());
String destFile = "'" + fileTransfer.getSharedBasepath() + "/" + file.getName() + "'";
try (InputStream is = file.getInputStream()) {
boolean result = scpClient.upload(is, destFile, file.getLength(), file.getLastTime(), progress);
notifier.itemDone(item);
if (!result) {
throw new HaaSClientException("Uploading of " + file + " to " + destFile + " failed");
}
}
index++;
}
}
getFileTransfer().endFileTransfer(job.getId(), fileTransfer, getSessionID());
// submit job
job = getJobManagement().submitJob((long) job.getId(), getSessionID());
notifier.itemDone(jobItem);
notifier.done();
return job.getId();
return doCreateJob(name, templateParameters, notifier);
} catch (RemoteException | ServiceException e) {
throw new RuntimeException(e);
}
}
public void uploadFiles(long jobId, Stream<UploadingFile> files, ProgressNotifier notifier) {
try {
doUploadFiles(jobId, files, notifier);
} catch (ServiceException | JSchException | IOException e) {
throw new RuntimeException(e);
}
}
public void submitJob(long jobId, ProgressNotifier notifier) {
try {
doSubmitJob(jobId, notifier);
} catch (RemoteException | ServiceException e) {
throw new RuntimeException(e);
}
}
public JobInfo obtainJobInfo(long jobId) {
......@@ -307,32 +294,6 @@ public class HaaSClient {
}
}
public void downloadFileData(long jobId, String fileName, OutputStream os) {
try {
FileTransferMethodExt ft = getFileTransfer().getFileTransferMethod(jobId, getSessionID());
try (ScpClient scpClient = getScpClient(ft)) {
scpClient.download(fileName, os, new TransferFileProgressForHaaSClient(0, dummyNotifier));
getFileTransfer().endFileTransfer(jobId, ft, getSessionID());
}
} catch (IOException | JSchException | ServiceException e) {
throw new HaaSClientException(e);
}
}
public void uploadFileData(Long jobId, InputStream inputStream, String fileName, long length,
long lastModification) {
try {
FileTransferMethodExt ft = getFileTransfer().getFileTransferMethod(jobId, getSessionID());
try (ScpClient scpClient = getScpClient(ft)) {
scpClient.upload(inputStream, fileName, length, lastModification,
new TransferFileProgressForHaaSClient(0, dummyNotifier));
getFileTransfer().endFileTransfer(jobId, ft, getSessionID());
}
} catch (IOException | JSchException | ServiceException e) {
throw new HaaSClientException(e);
}
}
public static UploadingFile getUploadingFile(Path file) {
return new UploadingFile() {
......@@ -374,6 +335,59 @@ public class HaaSClient {
};
}
private void doSubmitJob(long jobId, ProgressNotifier notifier) throws RemoteException, ServiceException {
getJobManagement().submitJob(jobId, getSessionID());
notifier.itemDone(getNotificationItem4JobId(jobId));
notifier.done();
}
private void doUploadFiles(long jobId, Stream<UploadingFile> files, ProgressNotifier notifier)
throws RemoteException, ServiceException, JSchException, IOException, UnsupportedEncodingException {
FileTransferMethodExt fileTransfer = getFileTransfer().getFileTransferMethod(jobId, getSessionID());
List<Long> totalSizes = StreamSupport.stream(files.spliterator(), false).map(f -> f.getLength())
.collect(Collectors.toList());
long totalSize = totalSizes.stream().mapToLong(l -> l.longValue()).sum();
TransferFileProgressForHaaSClient progress = new TransferFileProgressForHaaSClient(totalSize, notifier);
try (ScpClient scpClient = getScpClient(fileTransfer)) {
int index = 0;
for (UploadingFile file : (Iterable<UploadingFile>) files::iterator) {
String item;
progress.startNewFile(totalSizes.get(index));
notifier.addItem(item = "Uploading file: " + file.getName());
String destFile = "'" + fileTransfer.getSharedBasepath() + "/" + file.getName() + "'";
try (InputStream is = file.getInputStream()) {
boolean result = scpClient.upload(is, destFile, file.getLength(), file.getLastTime(), progress);
notifier.itemDone(item);
if (!result) {
throw new HaaSClientException("Uploading of " + file + " to " + destFile + " failed");
}
}
index++;
}
}
getFileTransfer().endFileTransfer(jobId, fileTransfer, getSessionID());
}
private long doCreateJob(String name, Collection<Entry<String, String>> templateParameters,
ProgressNotifier notifier) throws RemoteException, ServiceException {
TaskSpecificationExt taskSpec = createTaskSpecification(name, templateId, templateParameters);
JobSpecificationExt jobSpecification = createJobSpecification(name, Arrays.asList(taskSpec));
SubmittedJobInfoExt job = getJobManagement().createJob(jobSpecification, getSessionID());
notifier.addItem(getNotificationItem4JobId(job.getId()));
return job.getId();
}
private String getNotificationItem4JobId(long jobId) {
return String.format("Created job: %d\n", jobId);
}
private long start(Iterable<Path> files, String name, Collection<Entry<String, String>> templateParameters,
ProgressNotifier notifier) {
Stream<UploadingFile> fileStream = StreamSupport.stream(files.spliterator(), false)
.map(file -> getUploadingFile(file));
return start(fileStream, name, templateParameters, notifier);
}
private List<Long> getSizes(List<String> asList, ScpClient scpClient, ProgressNotifier notifier)
throws JSchException, IOException {
List<Long> result = new LinkedList<>();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment