Commit beacc8a4 authored by Jan Kožusznik

Test HaaS with SPIM

parent 1dc258d7
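This commit moves the HaaS template id, walltime, cluster node type and project id out of the start() call and into the HaaSClient constructor, and adds a SPIM test client. A minimal usage sketch of the reworked API, assuming the constructor and start() signatures shown in the diff below; the class name, work directory path and thrown-exception handling are illustrative placeholders, while the argument values mirror the SPIM test in this commit:

    import java.nio.file.Paths;
    import java.util.Collections;
    import cz.it4i.fiji.haas_java_client.HaaSClient;
    import cz.it4i.fiji.haas_java_client.JobInfo;

    public class HaaSClientUsageSketch {
        public static void main(String[] args) throws Exception {
            // Work directory is a placeholder; templateId, walltime, node type and project
            // id are taken from the SPIM test below.
            HaaSClient client = new HaaSClient(Paths.get("/path/to/workdir"), 2l, 9600, 6l, "DD-17-31");
            long jobId = client.start(Collections.emptyList(), "TestOutRedirect", Collections.emptyList());
            JobInfo info = client.obtainJobInfo(jobId);
            System.out.println("JobId: " + jobId + ", state: " + info.getState());
        }
    }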
@@ -38,11 +38,11 @@ import cz.it4i.fiji.haas_java_client.proxy.UserAndLimitationManagementWsSoap;
import cz.it4i.fiji.scpclient.ScpClient;
public class HaaSClient {
static public class SynchronizableFiles {
private Collection<TaskFileOffsetExt> files = new LinkedList<>();
public void addFile(long taskId, SynchronizableFileType type, long offset) {
TaskFileOffsetExt off = new TaskFileOffsetExt();
off.setFileType(getType(type));
@@ -50,26 +50,26 @@ public class HaaSClient {
off.setOffset(offset);
files.add(off);
}
private Collection<TaskFileOffsetExt> getFiles() {
return files;
}
private SynchronizableFilesExt getType(SynchronizableFileType type) {
switch(type) {
case LogFile:
return SynchronizableFilesExt.LogFile;
case ProgressFile:
return SynchronizableFilesExt.ProgressFile;
case StandardErrorFile:
return SynchronizableFilesExt.StandardErrorFile;
case StandardOutputFile:
return SynchronizableFilesExt.StandardOutputFile;
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
switch (type) {
case LogFile:
return SynchronizableFilesExt.LogFile;
case ProgressFile:
return SynchronizableFilesExt.ProgressFile;
case StandardErrorFile:
return SynchronizableFilesExt.StandardErrorFile;
case StandardOutputFile:
return SynchronizableFilesExt.StandardOutputFile;
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}
}
}
}
private interface Constants {
@@ -89,10 +89,18 @@ public class HaaSClient {
private FileTransferWsSoap fileTransfer;
private Integer timeOut;
private Long templateId;
private Long clusterNodeType;
private String projectId;
final static private Map<JobStateExt, JobState> WS_STATE2STATE;
static {
Map<JobStateExt, JobState> map= new HashMap<JobStateExt, JobState>();
Map<JobStateExt, JobState> map = new HashMap<JobStateExt, JobState>();
map.put(JobStateExt.Canceled, JobState.Canceled);
map.put(JobStateExt.Configuring, JobState.Configuring);
map.put(JobStateExt.Failed, JobState.Failed);
@@ -103,13 +111,16 @@ public class HaaSClient {
WS_STATE2STATE = Collections.unmodifiableMap(map);
}
public HaaSClient(Path workDirectory) {
public HaaSClient(Path workDirectory, Long templateId, Integer timeOut, Long clusterNodeType, String projectId) {
super();
this.workDirectory = workDirectory;
this.templateId = templateId;
this.timeOut = timeOut;
this.clusterNodeType = clusterNodeType;
this.projectId = projectId;
}
public long start(Iterable<Path> files, String name, long templateId,
Collection<Entry<String, String>> templateParameters) {
public long start(Iterable<Path> files, String name, Collection<Entry<String, String>> templateParameters) {
TaskSpecificationExt taskSpec = createTaskSpecification(name, templateId, templateParameters);
JobSpecificationExt jobSpecification = createJobSpecification(name, Arrays.asList(taskSpec));
@@ -176,7 +187,7 @@ public class HaaSClient {
try (ScpClient scpClient = getScpClient(ft)) {
for (String fileName : getFileTransfer().listChangedFilesForJob(jobId, getSessionID())) {
fileName=fileName.replaceAll("/", "");
fileName = fileName.replaceAll("/", "");
Path rFile = workDirectory.resolve(fileName);
scpClient.download(ft.getSharedBasepath() + "//" + fileName, rFile);
}
@@ -199,15 +210,15 @@ public class HaaSClient {
testJob.setMinCores(1);
testJob.setMaxCores(1);
testJob.setPriority(JobPriorityExt.Average);
testJob.setProject("ExpTests");
testJob.setWaitingLimit(600);
testJob.setWalltimeLimit(600);
testJob.setProject(projectId);
testJob.setWaitingLimit(null);
testJob.setWalltimeLimit(timeOut);
testJob.setNotificationEmail(Constants.EMAIL);
testJob.setPhoneNumber(Constants.PHONE);
testJob.setNotifyOnAbort(false);
testJob.setNotifyOnFinish(false);
testJob.setNotifyOnStart(false);
testJob.setClusterNodeTypeId(7l);
testJob.setClusterNodeTypeId(clusterNodeType);
testJob.setEnvironmentVariables(new EnvironmentVariableExt[0]);
testJob.setTasks(tasks.stream().toArray(TaskSpecificationExt[]::new));
return testJob;
@@ -220,7 +231,7 @@ public class HaaSClient {
testTask.setName(name);
testTask.setMinCores(1);
testTask.setMaxCores(1);
testTask.setWalltimeLimit(600);
testTask.setWalltimeLimit(timeOut);
testTask.setRequiredNodes(null);
testTask.setIsExclusive(false);
testTask.setIsRerunnable(false);
......
@@ -17,8 +17,8 @@ public class TestHaaSJavaClient {
public static void main(String[] args) throws RemoteException, ServiceException {
Map<String, String> params = new HashMap<>();
params.put("inputParam", "someStringParam");
HaaSClient client = new HaaSClient(Paths.get("/home/koz01/aaa"));
long jobId = client.start(Arrays.asList(Paths.get("/home/koz01/aaa/vecmath.jar")), "TestOutRedirect", 1, params.entrySet());
HaaSClient client = new HaaSClient(Paths.get("/home/koz01/aaa"), 1l, 600, 7l, "DD-17-31");
long jobId = client.start(Arrays.asList(Paths.get("/home/koz01/aaa/vecmath.jar")), "TestOutRedirect", params.entrySet());
JobInfo info;
do {
try {
......
package cz.it4i.fiji.haas_java_client;
import java.nio.file.Paths;
import java.rmi.RemoteException;
import java.util.Collections;
import javax.xml.rpc.ServiceException;
import cz.it4i.fiji.haas_java_client.HaaSClient.SynchronizableFiles;
import cz.it4i.fiji.haas_java_client.proxy.JobFileContentExt;
public class TestHaaSJavaClientWithSPIM {
public static void main(String[] args) throws RemoteException, ServiceException {
HaaSClient client = new HaaSClient(Paths.get("/home/koz01/Work/vyzkumnik/fiji/work/aaa"), 2l, 9600, 6l,
"DD-17-31");
long jobId = 36;// client.start(Collections.emptyList(), "TestOutRedirect",
// Collections.emptyList());
JobInfo info;
boolean firstIteration = true;
do {
if (!firstIteration) {
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
info = client.obtainJobInfo(jobId);
HaaSClient.SynchronizableFiles taskFileOffset = new HaaSClient.SynchronizableFiles();
for (Long id : info.getTasks()) {
addOffsetFilesForTask(id, taskFileOffset);
}
client.downloadPartsOfJobFiles(jobId, taskFileOffset).forEach(jfc -> showJFC(jfc));
if (info.getState() == JobState.Finished) {
client.download(jobId);
}
System.out.println("JobId :" + jobId + ", state" + info.getState());
firstIteration = false;
} while (info.getState() != JobState.Canceled && info.getState() != JobState.Failed
&& info.getState() != JobState.Finished);
}
private static void addOffsetFilesForTask(Long taskId, SynchronizableFiles files) {
files.addFile(taskId, SynchronizableFileType.ProgressFile, 0);
files.addFile(taskId, SynchronizableFileType.StandardErrorFile, 0);
files.addFile(taskId, SynchronizableFileType.StandardOutputFile, 0);
files.addFile(taskId, SynchronizableFileType.LogFile, 0);
}
private static void showJFC(JobFileContentExt file) {
System.out.println("File: " + file.getFileType() + ", " + file.getRelativePath());
System.out.println("TaskInfoId: " + file.getSubmittedTaskInfoId());
System.out.println("Offset: " + file.getOffset());
System.out.println("Content: " + file.getContent());
}
}
@@ -74,7 +74,7 @@ public class ScpClient implements Closeable {
channel.connect();
byte[] buf = new byte[1024];
byte[] buf = new byte[getBufferSize()];
// send '\0'
buf[0] = 0;
@@ -194,7 +194,7 @@ public class ScpClient implements Closeable {
if (checkAck(in) != 0) {
return false;
}
byte[] buf = new byte[1024];
byte[] buf = new byte[getBufferSize()];
// send a content of lfile
try (InputStream fis = Files.newInputStream(file)) {
while (true) {
@@ -219,6 +219,10 @@ public class ScpClient implements Closeable {
return true;
}
private int getBufferSize() {
// 1 MiB transfer buffer (previously a fixed 1024 bytes)
return 1024 * 1024;
}
private Session connectionSession() throws JSchException {
if (session == null) {
session = jsch.getSession(username, hostName);
......
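The ScpClient change above swaps the fixed 1024-byte transfer buffer for getBufferSize(), i.e. 1 MiB. A minimal sketch of the buffered stream-copy pattern such a buffer feeds, assuming a plain java.io copy loop; the class and method names are illustrative and this is not the actual ScpClient download/upload code:

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    final class CopySketch {
        // Copies in to out using a 1 MiB buffer, matching getBufferSize() in this commit.
        static void copy(InputStream in, OutputStream out) throws IOException {
            byte[] buf = new byte[1024 * 1024];
            int read;
            while ((read = in.read(buf)) != -1) {
                out.write(buf, 0, read);
            }
            out.flush();
        }
    }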