Skip to content
Snippets Groups Projects
Commit c4d6d78b authored by Jan Kožusznik's avatar Jan Kožusznik
Browse files

Testing HaaS API for benchamrk

parent bb34acc8
No related branches found
No related tags found
No related merge requests found
......@@ -17,7 +17,7 @@ public class BenchmarkJobManager {
private Map<JobInfo, Path> uploadedFiles = new HashMap<>();
public BenchmarkJobManager(Path workDirectory, Progress progress) throws IOException {
jobManager = new JobManager(workDirectory, TestingConstants.getSettings(3, 6));
jobManager = new JobManager(workDirectory, TestingConstants.getSettings(2, 6));
this.progress = progress;
}
......@@ -29,7 +29,7 @@ public class BenchmarkJobManager {
}
public void startJob(JobInfo jobInfo) {
jobInfo.uploadFiles(Arrays.asList(HaaSClient.getUploadingFile(uploadedFiles.get(jobInfo))).stream());
jobInfo.uploadFiles(() -> Arrays.asList(HaaSClient.getUploadingFile(uploadedFiles.get(jobInfo))).stream());
jobInfo.submit();
}
......
......@@ -94,7 +94,7 @@ public class Job {
loadJobInfo();
}
public void uploadFiles(Stream<UploadingFile> files) {
public void uploadFiles(Supplier<Stream<UploadingFile>> files) {
HaaSClient client = this.haasClientSupplier.get();
client.uploadFiles(jobId, files, notifier);
}
......
......@@ -7,6 +7,7 @@ import java.util.Calendar;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
......@@ -47,7 +48,7 @@ public class JobManager {
public JobInfo createJob(Progress progress) throws IOException {
Job job;
jobs.add(job = new Job(settings.getJobName(),workDirectory, this::getHaasClient, progress));
jobs.add(job = new Job(settings.getJobName(), workDirectory, this::getHaasClient, progress));
return new JobInfo(job) {
@Override
public JobState getState() {
......@@ -60,14 +61,13 @@ public class JobManager {
}
};
}
public JobInfo startJob(Stream<UploadingFile> files, Progress progress) throws IOException {
public JobInfo startJob(Supplier<Stream<UploadingFile>> files, Progress progress) throws IOException {
JobInfo result = createJob(progress);
result.uploadFiles(files);
result.submit();
return result;
}
public Iterable<JobInfo> getJobsNeedingDownload() {
return () -> jobs.stream().filter(j -> j.needsDownload()).map(j -> new JobInfo(j)).iterator();
......@@ -103,16 +103,14 @@ public class JobManager {
this.job = job;
}
public void uploadFiles(Stream<UploadingFile> files) {
public void uploadFiles(Supplier<Stream<UploadingFile>> files) {
job.uploadFiles(files);
}
public void submit() {
job.submit();
}
public Long getId() {
return job.getJobId();
}
......@@ -160,13 +158,10 @@ public class JobManager {
return time != null ? time.getTime().toString() : "N/A";
}
public Path storeDataInWorkdirectory(UploadingFile uploadingFile) throws IOException {
return job.storeDataInWorkdirectory(uploadingFile);
}
}
}
......@@ -51,7 +51,13 @@ public class RunWithHaaS implements Command {
public void run() {
try {
jobManager = new JobManager(getWorkingDirectoryPath(), TestingConstants.getSettings());
jobManager.startJob(getContent(dataDirectory).stream().map(HaaSClient::getUploadingFile), ModalDialogs.doModal(new ProgressDialog(getFrame())));
jobManager.startJob(() -> {
try {
return getContent(dataDirectory).stream().map(HaaSClient::getUploadingFile);
} catch (IOException e) {
throw new RuntimeException(e);
}
}, ModalDialogs.doModal(new ProgressDialog(getFrame())));
} catch (IOException e) {
log.error(e);
}
......
......@@ -19,7 +19,7 @@ public class RunBenchmark {
if(!Files.exists(p)) {
Files.createDirectory(p);
}
BenchmarkJobManager benchmarkJobManager = new BenchmarkJobManager(Paths.get("/tmp"), new P_Progress());
BenchmarkJobManager benchmarkJobManager = new BenchmarkJobManager(p, new P_Progress());
JobInfo ji = benchmarkJobManager.createJob();
log.info("job: " + ji + " created.");
}
......
......@@ -16,6 +16,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
......@@ -166,8 +167,8 @@ public class HaaSClient {
return start(files, name, templateParameters, dummyNotifier);
}
public long start(Stream<UploadingFile> files, String name, Collection<Entry<String, String>> templateParameters,
ProgressNotifier notifier) {
public long start(Supplier<Stream<UploadingFile>> files, String name,
Collection<Entry<String, String>> templateParameters, ProgressNotifier notifier) {
notifier.setTitle("Starting job");
try {
long jobId = doCreateJob(name, templateParameters, notifier);
......@@ -189,7 +190,7 @@ public class HaaSClient {
}
}
public void uploadFiles(long jobId, Stream<UploadingFile> files, ProgressNotifier notifier) {
public void uploadFiles(long jobId, Supplier<Stream<UploadingFile>> files, ProgressNotifier notifier) {
try {
doUploadFiles(jobId, files, notifier);
} catch (ServiceException | JSchException | IOException e) {
......@@ -341,16 +342,16 @@ public class HaaSClient {
notifier.done();
}
private void doUploadFiles(long jobId, Stream<UploadingFile> files, ProgressNotifier notifier)
private void doUploadFiles(long jobId, Supplier<Stream<UploadingFile>> files, ProgressNotifier notifier)
throws RemoteException, ServiceException, JSchException, IOException, UnsupportedEncodingException {
FileTransferMethodExt fileTransfer = getFileTransfer().getFileTransferMethod(jobId, getSessionID());
List<Long> totalSizes = StreamSupport.stream(files.spliterator(), false).map(f -> f.getLength())
List<Long> totalSizes = StreamSupport.stream(files.get().spliterator(), false).map(f -> f.getLength())
.collect(Collectors.toList());
long totalSize = totalSizes.stream().mapToLong(l -> l.longValue()).sum();
TransferFileProgressForHaaSClient progress = new TransferFileProgressForHaaSClient(totalSize, notifier);
try (ScpClient scpClient = getScpClient(fileTransfer)) {
int index = 0;
for (UploadingFile file : (Iterable<UploadingFile>) files::iterator) {
for (UploadingFile file : (Iterable<UploadingFile>) files.get()::iterator) {
String item;
progress.startNewFile(totalSizes.get(index));
notifier.addItem(item = "Uploading file: " + file.getName());
......@@ -383,7 +384,7 @@ public class HaaSClient {
private long start(Iterable<Path> files, String name, Collection<Entry<String, String>> templateParameters,
ProgressNotifier notifier) {
Stream<UploadingFile> fileStream = StreamSupport.stream(files.spliterator(), false)
Supplier<Stream<UploadingFile>> fileStream = () -> StreamSupport.stream(files.spliterator(), false)
.map(file -> getUploadingFile(file));
return start(fileStream, name, templateParameters, notifier);
}
......
......@@ -21,7 +21,7 @@ public class TestHaaSJavaClient {
params.put("inputParam", "someStringParam");
Path baseDir = Paths.get("/home/koz01/aaa");
HaaSClient client = new HaaSClient(TestingConstants.getSettings(1l, 600,7l, "DD-17-31"));
long jobId = client.start(Arrays.asList(Paths.get("/home/koz01/.inputrc")), "TestOutRedirect", params.entrySet());
long jobId = client.start(Arrays.asList(Paths.get("/home/koz01/aaa/vecmath.jar")), "TestOutRedirect", params.entrySet());
Path workDir = baseDir.resolve("" + jobId);
if (!Files.isDirectory(workDir)) {
Files.createDirectories(workDir);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment