Skip to content
Snippets Groups Projects
Commit 00d3b6d0 authored by Unknown's avatar Unknown
Browse files

Support for benchmark

parent 8d8c6b82
No related branches found
No related tags found
No related merge requests found
...@@ -85,7 +85,16 @@ ...@@ -85,7 +85,16 @@
<artifactId>haas-java-client</artifactId> <artifactId>haas-java-client</artifactId>
<version>0.0.1-SNAPSHOT</version> <version>0.0.1-SNAPSHOT</version>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/net.imagej/imagej-launcher --> <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-jdk14 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
</dependency>
</dependencies> </dependencies>
</project> </project>
package cz.it4i.fiji.haas;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Path;
import java.time.Instant;
import java.util.Collections;
import cz.it4i.fiji.haas.JobManager.JobInfo;
import cz.it4i.fiji.haas_java_client.JobState;
import net.imagej.updater.util.Progress;
public class BenchmarkJobManager {
private JobManager jobManager;
private Progress progress;
private Path workDirectory;
private static final String CONFIG_FOR_MODIFICATION = "not-set-config.yaml";
private static final String CONFIG_MODIFIED = "config.yaml";
public BenchmarkJobManager(Path workDirectory, Progress progress) throws IOException {
this.workDirectory = workDirectory;
jobManager = new JobManager(workDirectory, TestingConstants.getSettings(3, 6));
this.progress = progress;
}
public JobInfo startJob() throws IOException {
JobInfo jobInfo = jobManager.startJob( Collections.emptyList(), null);
jobInfo.waitForStart();
if(jobInfo.getState() != JobState.Running) {
throw new IllegalStateException("start of job: " + jobInfo + " failed");
}
ByteArrayOutputStream os = new ByteArrayOutputStream();
jobInfo.downloadFileData(CONFIG_FOR_MODIFICATION,os);
byte[]data = updateConfigFile(os.toByteArray());
jobInfo.uploadFile (new ByteArrayInputStream(data),CONFIG_MODIFIED, data.length ,Instant.now().getEpochSecond());
return jobInfo;
}
private byte[] updateConfigFile(byte[] data) throws IOException {
return data;
}
}
...@@ -24,7 +24,6 @@ import net.imagej.updater.util.Progress; ...@@ -24,7 +24,6 @@ import net.imagej.updater.util.Progress;
public class Job { public class Job {
private static final String JOB_HAS_DATA_TO_DOWNLOAD_PROPERTY = "job.needDownload"; private static final String JOB_HAS_DATA_TO_DOWNLOAD_PROPERTY = "job.needDownload";
public static boolean isJobPath(Path p) { public static boolean isJobPath(Path p) {
...@@ -46,37 +45,39 @@ public class Job { ...@@ -46,37 +45,39 @@ public class Job {
private Long jobId; private Long jobId;
final private Progress dummy = new Progress() { final private Progress dummy = new Progress() {
@Override @Override
public void setTitle(String title) { public void setTitle(String title) {
} }
@Override @Override
public void setItemCount(int count, int total) { public void setItemCount(int count, int total) {
} }
@Override @Override
public void setCount(int count, int total) { public void setCount(int count, int total) {
} }
@Override @Override
public void itemDone(Object item) { public void itemDone(Object item) {
} }
@Override @Override
public void done() { public void done() {
} }
@Override @Override
public void addItem(Object item) { public void addItem(Object item) {
} }
}; };
public Job(Path path, Collection<Path> files, Supplier<HaaSClient> haasClientSupplier, Progress progress) throws IOException { public Job(Path basePath, Collection<Path> files, Supplier<HaaSClient> haasClientSupplier, Progress progress)
throws IOException {
this(haasClientSupplier); this(haasClientSupplier);
HaaSClient client = this.haasClientSupplier.get(); HaaSClient client = this.haasClientSupplier.get();
long id = client.start(files, "TestOutRedirect", Collections.emptyList(), new P_ProgressNotifierAdapter(progress)); long id = client.start(files, "TestOutRedirect", Collections.emptyList(),
jobDir = path.resolve("" + id); new P_ProgressNotifierAdapter(progress));
jobDir = basePath.resolve("" + id);
Files.createDirectory(jobDir); Files.createDirectory(jobDir);
updateState(); updateState();
} }
...@@ -132,6 +133,11 @@ public class Job { ...@@ -132,6 +133,11 @@ public class Job {
} }
} }
public void downloadFileData(String fileName, OutputStream bos) {
haasClientSupplier.get().downloadFileData(jobId,fileName, bos);
}
public JobState getState() { public JobState getState() {
return state; return state;
} }
...@@ -182,8 +188,6 @@ public class Job { ...@@ -182,8 +188,6 @@ public class Job {
private static long getJobId(Path path) { private static long getJobId(Path path) {
return Long.parseLong(path.getFileName().toString()); return Long.parseLong(path.getFileName().toString());
} }
private class P_ProgressNotifierAdapter implements ProgressNotifier { private class P_ProgressNotifierAdapter implements ProgressNotifier {
private Progress progress; private Progress progress;
...@@ -216,7 +220,9 @@ public class Job { ...@@ -216,7 +220,9 @@ public class Job {
public void done() { public void done() {
progress.done(); progress.done();
} }
} }
} }
package cz.it4i.fiji.haas; package cz.it4i.fiji.haas;
import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.Calendar; import java.util.Calendar;
...@@ -9,6 +11,9 @@ import java.util.Iterator; ...@@ -9,6 +11,9 @@ import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cz.it4i.fiji.haas_java_client.HaaSClient; import cz.it4i.fiji.haas_java_client.HaaSClient;
import cz.it4i.fiji.haas_java_client.JobState; import cz.it4i.fiji.haas_java_client.JobState;
import cz.it4i.fiji.haas_java_client.Settings; import cz.it4i.fiji.haas_java_client.Settings;
...@@ -17,6 +22,8 @@ import net.imagej.updater.util.Progress; ...@@ -17,6 +22,8 @@ import net.imagej.updater.util.Progress;
public class JobManager { public class JobManager {
private static Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas.JobManager.class);
private Path workDirectory; private Path workDirectory;
private Collection<Job> jobs = new LinkedList<>(); private Collection<Job> jobs = new LinkedList<>();
...@@ -25,7 +32,6 @@ public class JobManager { ...@@ -25,7 +32,6 @@ public class JobManager {
private Settings settings; private Settings settings;
public JobManager(Path workDirectory, Settings settings) throws IOException { public JobManager(Path workDirectory, Settings settings) throws IOException {
this.workDirectory = workDirectory; this.workDirectory = workDirectory;
this.settings = settings; this.settings = settings;
...@@ -39,8 +45,20 @@ public class JobManager { ...@@ -39,8 +45,20 @@ public class JobManager {
} }
public void startJob(Path path, Collection<Path> files, Progress progress) throws IOException { public JobInfo startJob(Collection<Path> files, Progress progress) throws IOException {
jobs.add(new Job(path, files, this::getHaasClient, progress)); Job job;
jobs.add(job = new Job(workDirectory, files, this::getHaasClient, progress));
return new JobInfo(job) {
@Override
public JobState getState() {
try {
job.updateState();
} catch (IOException e) {
log.error(e.getMessage(), e);
}
return super.getState();
}
};
} }
public Iterable<JobInfo> getJobsNeedingDownload() { public Iterable<JobInfo> getJobsNeedingDownload() {
...@@ -58,6 +76,12 @@ public class JobManager { ...@@ -58,6 +76,12 @@ public class JobManager {
} }
public JobState getState(long id) {
return getHaasClient().obtainJobInfo(id).getState();
}
private HaaSClient getHaasClient() { private HaaSClient getHaasClient() {
if (haasClient == null) { if (haasClient == null) {
haasClient = new HaaSClient(settings); haasClient = new HaaSClient(settings);
...@@ -65,7 +89,6 @@ public class JobManager { ...@@ -65,7 +89,6 @@ public class JobManager {
return haasClient; return haasClient;
} }
public static class JobInfo extends ObservableValueBase<JobInfo> { public static class JobInfo extends ObservableValueBase<JobInfo> {
private Job job; private Job job;
...@@ -102,7 +125,13 @@ public class JobManager { ...@@ -102,7 +125,13 @@ public class JobManager {
job.download(progress); job.download(progress);
fireValueChangedEvent(); fireValueChangedEvent();
} }
public void waitForStart() {
// TODO Auto-generated method stub
}
public void updateInfo() throws IOException { public void updateInfo() throws IOException {
job.updateState(); job.updateState();
} }
...@@ -111,10 +140,25 @@ public class JobManager { ...@@ -111,10 +140,25 @@ public class JobManager {
public JobInfo getValue() { public JobInfo getValue() {
return this; return this;
} }
public void downloadFileData(String fileName, OutputStream bos) {
job.downloadFileData(fileName, bos);
}
public void uploadFile(ByteArrayInputStream byteArrayInputStream, String configModified, int length,
long epochSecond) {
//TODO
}
private String getStringFromTimeSafely(Calendar time) { private String getStringFromTimeSafely(Calendar time) {
return time != null ? time.getTime().toString() : "N/A"; return time != null ? time.getTime().toString() : "N/A";
} }
} }
} }
...@@ -49,9 +49,8 @@ public class RunWithHaaS implements Command { ...@@ -49,9 +49,8 @@ public class RunWithHaaS implements Command {
@Override @Override
public void run() { public void run() {
try { try {
jobManager = new JobManager(getWorkingDirectoryPath(),TestingConstants.getSettings()); jobManager = new JobManager(getWorkingDirectoryPath(), TestingConstants.getSettings());
jobManager.startJob(getWorkingDirectoryPath(), getContent(dataDirectory), jobManager.startJob(getContent(dataDirectory), ModalDialogs.doModal(new ProgressDialog(getFrame())));
ModalDialogs.doModal(new ProgressDialog(getFrame())));
} catch (IOException e) { } catch (IOException e) {
log.error(e); log.error(e);
} }
......
package cz.it4i.fiji.haas_java_client; package cz.it4i.fiji.haas_java_client;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
...@@ -14,7 +15,9 @@ import java.util.LinkedList; ...@@ -14,7 +15,9 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.function.Predicate;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.xml.rpc.ServiceException; import javax.xml.rpc.ServiceException;
...@@ -210,7 +213,7 @@ public class HaaSClient { ...@@ -210,7 +213,7 @@ public class HaaSClient {
public java.util.Calendar getEndTime() { public java.util.Calendar getEndTime() {
return info.getEndTime(); return info.getEndTime();
}; };
public Calendar getCreationTime() { public Calendar getCreationTime() {
return info.getCreationTime(); return info.getCreationTime();
}; };
...@@ -235,20 +238,25 @@ public class HaaSClient { ...@@ -235,20 +238,25 @@ public class HaaSClient {
} }
public void download(long jobId, Path workDirectory, final ProgressNotifier notifier) { public void download(long jobId, Path workDirectory, final ProgressNotifier notifier) {
download(jobId, workDirectory, notifier, val -> true);
}
public void download(long jobId, Path workDirectory, final ProgressNotifier notifier, Predicate<String> function) {
try { try {
notifier.setTitle("Downloading"); notifier.setTitle("Downloading");
FileTransferMethodExt ft = getFileTransfer().getFileTransferMethod(jobId, getSessionID()); FileTransferMethodExt ft = getFileTransfer().getFileTransferMethod(jobId, getSessionID());
try (ScpClient scpClient = getScpClient(ft)) { try (ScpClient scpClient = getScpClient(ft)) {
String[] files = getFileTransfer().listChangedFilesForJob(jobId, getSessionID()); String[] filesArray = getFileTransfer().listChangedFilesForJob(jobId, getSessionID());
Stream<String> files = Arrays.asList(filesArray).stream().filter(function);
List<Long> fileSizes = getSizes( List<Long> fileSizes = getSizes(
Arrays.asList(files).stream() files.map(filename -> "'" + ft.getSharedBasepath() + "/" + filename + "'").collect(
.map(filename -> "'" + ft.getSharedBasepath() + "/" + filename + "'") Collectors.toList()),
.collect(Collectors.toList()),
scpClient, new P_ProgressNotifierDecorator4Size(notifier)); scpClient, new P_ProgressNotifierDecorator4Size(notifier));
final long totalFileSize = fileSizes.stream().mapToLong(i -> i.longValue()).sum(); final long totalFileSize = fileSizes.stream().mapToLong(i -> i.longValue()).sum();
TransferFileProgressForHaaSClient progress =new TransferFileProgressForHaaSClient(totalFileSize, notifier); TransferFileProgressForHaaSClient progress = new TransferFileProgressForHaaSClient(totalFileSize,
notifier);
int idx = 0; int idx = 0;
for (String fileName : files) { for (String fileName : (Iterable<String>) files::iterator) {
fileName = fileName.replaceFirst("/", ""); fileName = fileName.replaceFirst("/", "");
Path rFile = workDirectory.resolve(fileName); Path rFile = workDirectory.resolve(fileName);
if (!Files.exists(rFile.getParent())) { if (!Files.exists(rFile.getParent())) {
...@@ -271,6 +279,17 @@ public class HaaSClient { ...@@ -271,6 +279,17 @@ public class HaaSClient {
} }
} }
public void downloadFileData(long jobId, String fileName, OutputStream os) {
try {
FileTransferMethodExt ft = getFileTransfer().getFileTransferMethod(jobId, getSessionID());
try (ScpClient scpClient = getScpClient(ft)) {
scpClient.download(fileName, os, new TransferFileProgressForHaaSClient(0, dummyNotifier));
}
} catch (IOException | JSchException | ServiceException e) {
throw new HaaSClientException(e);
}
}
private List<Long> getSizes(List<String> asList, ScpClient scpClient, ProgressNotifier notifier) private List<Long> getSizes(List<String> asList, ScpClient scpClient, ProgressNotifier notifier)
throws JSchException, IOException { throws JSchException, IOException {
List<Long> result = new LinkedList<>(); List<Long> result = new LinkedList<>();
...@@ -428,4 +447,5 @@ public class HaaSClient { ...@@ -428,4 +447,5 @@ public class HaaSClient {
notifier.done(); notifier.done();
} }
} }
} }
...@@ -66,6 +66,13 @@ public class ScpClient implements Closeable { ...@@ -66,6 +66,13 @@ public class ScpClient implements Closeable {
} }
public boolean download(String lfile, Path rfile, TransferFileProgress progress) throws JSchException, IOException { public boolean download(String lfile, Path rfile, TransferFileProgress progress) throws JSchException, IOException {
try(OutputStream os = Files.newOutputStream(rfile)) {
return download(lfile, os, progress);
}
}
public boolean download(String lfile, OutputStream os, TransferFileProgress progress)
throws JSchException, IOException {
Session session = connectionSession(); Session session = connectionSession();
// exec 'scp -f rfile' remotely // exec 'scp -f rfile' remotely
...@@ -125,25 +132,24 @@ public class ScpClient implements Closeable { ...@@ -125,25 +132,24 @@ public class ScpClient implements Closeable {
out.flush(); out.flush();
// read a content of lfile // read a content of lfile
try (OutputStream fos = Files.newOutputStream(rfile)) { int foo;
int foo; while (true) {
while (true) { if (buf.length < filesize)
if (buf.length < filesize) foo = buf.length;
foo = buf.length; else
else foo = (int) filesize;
foo = (int) filesize; foo = in.read(buf, 0, foo);
foo = in.read(buf, 0, foo); if (foo < 0) {
if (foo < 0) { // error
// error break;
break;
}
fos.write(buf, 0, foo);
progress.dataTransfered(foo);
filesize -= foo;
if (filesize == 0L)
break;
} }
os.write(buf, 0, foo);
progress.dataTransfered(foo);
filesize -= foo;
if (filesize == 0L)
break;
} }
if (checkAck(in) != 0) { if (checkAck(in) != 0) {
return false; return false;
} }
...@@ -167,11 +173,16 @@ public class ScpClient implements Closeable { ...@@ -167,11 +173,16 @@ public class ScpClient implements Closeable {
} }
public boolean upload(Path file, String rfile, TransferFileProgress progress) throws JSchException, IOException { public boolean upload(Path file, String rfile, TransferFileProgress progress) throws JSchException, IOException {
try (InputStream is = Files.newInputStream(file)) {
return upload(is, file.getFileName().toString(), file.toFile().length(), file.toFile().lastModified(),
rfile, progress);
}
}
public boolean upload(InputStream is, String fileName, long length, long lastModified, String rfile,
TransferFileProgress progress) throws JSchException, IOException {
Session session = connectionSession(); Session session = connectionSession();
boolean ptimestamp = true; boolean ptimestamp = true;
// exec 'scp -t rfile' remotely // exec 'scp -t rfile' remotely
String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + rfile; String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + rfile;
Channel channel = session.openChannel("exec"); Channel channel = session.openChannel("exec");
...@@ -184,10 +195,10 @@ public class ScpClient implements Closeable { ...@@ -184,10 +195,10 @@ public class ScpClient implements Closeable {
} }
if (ptimestamp) { if (ptimestamp) {
command = "T " + (file.toFile().lastModified() / 1000) + " 0"; command = "T " + (lastModified / 1000) + " 0";
// The access time should be sent here, // The access time should be sent here,
// but it is not accessible with JavaAPI ;-< // but it is not accessible with JavaAPI ;-<
command += (" " + (file.toFile().lastModified() / 1000) + " 0\n"); command += (" " + (lastModified / 1000) + " 0\n");
out.write(command.getBytes()); out.write(command.getBytes());
out.flush(); out.flush();
if (checkAck(in) != 0) { if (checkAck(in) != 0) {
...@@ -196,9 +207,9 @@ public class ScpClient implements Closeable { ...@@ -196,9 +207,9 @@ public class ScpClient implements Closeable {
} }
// send "C0644 filesize filename", where filename should not include '/' // send "C0644 filesize filename", where filename should not include '/'
long filesize = file.toFile().length(); long filesize = length;
command = "C0644 " + filesize + " "; command = "C0644 " + filesize + " ";
command += file.getFileName().toString(); command += fileName;
command += "\n"; command += "\n";
out.write(command.getBytes()); out.write(command.getBytes());
out.flush(); out.flush();
...@@ -207,14 +218,12 @@ public class ScpClient implements Closeable { ...@@ -207,14 +218,12 @@ public class ScpClient implements Closeable {
} }
byte[] buf = new byte[getBufferSize()]; byte[] buf = new byte[getBufferSize()];
// send a content of lfile // send a content of lfile
try (InputStream fis = Files.newInputStream(file)) { while (true) {
while (true) { int len = is.read(buf, 0, buf.length);
int len = fis.read(buf, 0, buf.length); if (len <= 0)
if (len <= 0) break;
break; out.write(buf, 0, len); // out.flush();
out.write(buf, 0, len); // out.flush(); progress.dataTransfered(len);
progress.dataTransfered(len);
}
} }
// send '\0' // send '\0'
buf[0] = 0; buf[0] = 0;
...@@ -233,35 +242,35 @@ public class ScpClient implements Closeable { ...@@ -233,35 +242,35 @@ public class ScpClient implements Closeable {
public long size(String lfile) throws JSchException, IOException { public long size(String lfile) throws JSchException, IOException {
Session session = connectionSession(); Session session = connectionSession();
// exec 'scp -f rfile' remotely // exec 'scp -f rfile' remotely
String command = "scp -f " + lfile; String command = "scp -f " + lfile;
Channel channel = session.openChannel("exec"); Channel channel = session.openChannel("exec");
try { try {
((ChannelExec) channel).setCommand(command); ((ChannelExec) channel).setCommand(command);
// get I/O streams for remote scp // get I/O streams for remote scp
try (OutputStream out = channel.getOutputStream(); InputStream in = channel.getInputStream()) { try (OutputStream out = channel.getOutputStream(); InputStream in = channel.getInputStream()) {
channel.connect(); channel.connect();
byte[] buf = new byte[getBufferSize()]; byte[] buf = new byte[getBufferSize()];
// send '\0' // send '\0'
buf[0] = 0; buf[0] = 0;
out.write(buf, 0, 1); out.write(buf, 0, 1);
out.flush(); out.flush();
while (true) { while (true) {
int c = checkAck(in); int c = checkAck(in);
if (c != 'C') { if (c != 'C') {
break; break;
} }
// read '0644 ' // read '0644 '
in.read(buf, 0, 5); in.read(buf, 0, 5);
long filesize = 0L; long filesize = 0L;
while (true) { while (true) {
if (in.read(buf, 0, 1) < 0) { if (in.read(buf, 0, 1) < 0) {
...@@ -273,10 +282,10 @@ public class ScpClient implements Closeable { ...@@ -273,10 +282,10 @@ public class ScpClient implements Closeable {
filesize = filesize * 10L + (long) (buf[0] - '0'); filesize = filesize * 10L + (long) (buf[0] - '0');
} }
return filesize; return filesize;
} }
} }
} finally { } finally {
channel.disconnect(); channel.disconnect();
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment