Skip to content
Snippets Groups Projects
Commit 41cb1af3 authored by Jan Kožusznik's avatar Jan Kožusznik
Browse files

code: clean up and add debug loging

parent 2d459203
No related branches found
No related tags found
No related merge requests found
package cz.it4i.fiji.haas_java_client;
import com.jcraft.jsch.JSchException;
......@@ -57,50 +58,49 @@ import cz.it4i.fiji.scpclient.TransferFileProgress;
public class HaaSClient {
public static final TransferFileProgress DUMMY_TRANSFER_FILE_PROGRESS = new TransferFileProgress() {
public static final TransferFileProgress DUMMY_TRANSFER_FILE_PROGRESS =
new TransferFileProgress()
{
@Override
public void dataTransfered(long bytesTransfered) {
public void dataTransfered(final long bytesTransfered) {
// TODO Auto-generated method stub
}
};
public static ProgressNotifier DUMMY_PROGRESS_NOTIFIER = new ProgressNotifier() {
public static ProgressNotifier DUMMY_PROGRESS_NOTIFIER =
new ProgressNotifier()
{
@Override
public void setTitle(String title) {
}
public void setTitle(final String title) {}
@Override
public void setItemCount(int count, int total) {
}
public void setItemCount(final int count, final int total) {}
@Override
public void setCount(int count, int total) {
}
public void setCount(final int count, final int total) {}
@Override
public void itemDone(Object item) {
}
public void itemDone(final Object item) {}
@Override
public void done() {
}
public void done() {}
@Override
public void addItem(Object item) {
}
public void addItem(final Object item) {}
};
public static UploadingFile getUploadingFile(Path file) {
public static UploadingFile getUploadingFile(final Path file) {
return new UploadingFile() {
@Override
public InputStream getInputStream() {
try {
return Files.newInputStream(file);
} catch (IOException e) {
}
catch (final IOException e) {
log.error(e.getMessage(), e);
throw new RuntimeException(e);
}
......@@ -115,7 +115,8 @@ public class HaaSClient {
public long getLength() {
try {
return Files.size(file);
} catch (IOException e) {
}
catch (final IOException e) {
log.error(e.getMessage(), e);
throw new RuntimeException(e);
}
......@@ -125,7 +126,8 @@ public class HaaSClient {
public long getLastTime() {
try {
return Files.getLastModifiedTime(file).toMillis();
} catch (IOException e) {
}
catch (final IOException e) {
log.error(e.getMessage(), e);
throw new RuntimeException(e);
}
......@@ -138,8 +140,10 @@ public class HaaSClient {
private final Collection<TaskFileOffsetExt> files = new LinkedList<>();
public void addFile(long taskId, SynchronizableFileType type, long offset) {
TaskFileOffsetExt off = new TaskFileOffsetExt();
public void addFile(final long taskId, final SynchronizableFileType type,
final long offset)
{
final TaskFileOffsetExt off = new TaskFileOffsetExt();
off.setFileType(getType(type));
off.setSubmittedTaskInfoId(taskId);
off.setOffset(offset);
......@@ -150,7 +154,7 @@ public class HaaSClient {
return files;
}
private SynchronizableFilesExt getType(SynchronizableFileType type) {
private SynchronizableFilesExt getType(final SynchronizableFileType type) {
switch (type) {
case LogFile:
return SynchronizableFilesExt.LOG_FILE;
......@@ -167,12 +171,13 @@ public class HaaSClient {
}
}
private static Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas_java_client.HaaSClient.class);
private static Logger log = LoggerFactory.getLogger(
cz.it4i.fiji.haas_java_client.HaaSClient.class);
final static private Map<JobStateExt, JobState> WS_STATE2STATE;
static {
Map<JobStateExt, JobState> map = new HashMap<>();
final Map<JobStateExt, JobState> map = new HashMap<>();
map.put(JobStateExt.CANCELED, JobState.Canceled);
map.put(JobStateExt.CONFIGURING, JobState.Configuring);
map.put(JobStateExt.FAILED, JobState.Failed);
......@@ -193,41 +198,51 @@ public class HaaSClient {
private final String projectId;
private final Map<Long, P_FileTransferPool> filetransferPoolMap = new HashMap<>();
private final Map<Long, P_FileTransferPool> filetransferPoolMap =
new HashMap<>();
private final HaaSClientSettings settings;
private DataTransferWsSoap dataTransferWs;
public HaaSClient(HaaSClientSettings settings) {
public HaaSClient(final HaaSClientSettings settings) {
this.settings = settings;
this.projectId = settings.getProjectId();
}
public long createJob(JobSettings jobSettings, Collection<Entry<String, String>> templateParameters) {
public long createJob(final JobSettings jobSettings,
final Collection<Entry<String, String>> templateParameters)
{
return doCreateJob(jobSettings, templateParameters);
}
public HaaSFileTransfer startFileTransfer(long jobId, TransferFileProgress notifier) {
public HaaSFileTransfer startFileTransfer(final long jobId,
final TransferFileProgress notifier)
{
try {
return createFileTransfer(jobId, notifier);
} catch (RemoteException | ServiceException | UnsupportedEncodingException | JSchException e) {
}
catch (RemoteException | ServiceException | UnsupportedEncodingException
| JSchException e)
{
throw new HaaSClientException(e);
}
}
public HaaSFileTransfer startFileTransfer(long jobId) {
public HaaSFileTransfer startFileTransfer(final long jobId) {
return startFileTransfer(jobId, DUMMY_TRANSFER_FILE_PROGRESS);
}
public TunnelToNode openTunnel(long jobId, String nodeIP, int localPort, int remotePort) {
public TunnelToNode openTunnel(final long jobId, final String nodeIP,
final int localPort, final int remotePort)
{
MiddlewareTunnel tunnel;
try {
tunnel = new MiddlewareTunnel(Executors.newCachedThreadPool(), jobId, nodeIP, getSessionID());
tunnel = new MiddlewareTunnel(Executors.newCachedThreadPool(), jobId,
nodeIP, getSessionID());
tunnel.open(localPort, remotePort);
return new TunnelToNode() {
@Override
public void close() throws IOException {
tunnel.close();
......@@ -243,22 +258,25 @@ public class HaaSClient {
return tunnel.getLocalHost();
}
};
} catch (IOException e) {
}
catch (final IOException e) {
log.error(e.getMessage(), e);
throw new HaaSClientException(e);
}
}
public void submitJob(long jobId) {
public void submitJob(final long jobId) {
doSubmitJob(jobId);
}
public JobInfo obtainJobInfo(long jobId) {
final SubmittedJobInfoExt info = getJobManagement().getCurrentInfoForJob(jobId, getSessionID());
public JobInfo obtainJobInfo(final long jobId) {
final SubmittedJobInfoExt info = getJobManagement().getCurrentInfoForJob(
jobId, getSessionID());
final Collection<Long> tasksId = info.getTasks().getSubmittedTaskInfoExt().stream().map(ti -> ti.getId())
.collect(Collectors.toList());
final Collection<Long> tasksId = info.getTasks().getSubmittedTaskInfoExt()
.stream().map(ti -> ti.getId()).collect(Collectors.toList());
return new JobInfo() {
private List<String> ips;
@Override
......@@ -298,26 +316,32 @@ public class HaaSClient {
};
}
public Collection<JobFileContentExt> downloadPartsOfJobFiles(Long jobId, HaaSClient.SynchronizableFiles files) {
ArrayOfTaskFileOffsetExt fileOffsetExt = new ArrayOfTaskFileOffsetExt();
public Collection<JobFileContentExt> downloadPartsOfJobFiles(final Long jobId,
final HaaSClient.SynchronizableFiles files)
{
final ArrayOfTaskFileOffsetExt fileOffsetExt =
new ArrayOfTaskFileOffsetExt();
fileOffsetExt.getTaskFileOffsetExt().addAll(files.getFiles());
return getFileTransfer().downloadPartsOfJobFilesFromCluster(jobId, fileOffsetExt, getSessionID())
.getJobFileContentExt();
return getFileTransfer().downloadPartsOfJobFilesFromCluster(jobId,
fileOffsetExt, getSessionID()).getJobFileContentExt();
}
public Collection<String> getChangedFiles(long jobId) {
return getFileTransfer().listChangedFilesForJob(jobId, getSessionID()).getString();
public Collection<String> getChangedFiles(final long jobId) {
return getFileTransfer().listChangedFilesForJob(jobId, getSessionID())
.getString();
}
public void cancelJob(Long jobId) {
public void cancelJob(final Long jobId) {
getJobManagement().cancelJob(jobId, getSessionID());
}
public void deleteJob(long id) {
public void deleteJob(final long id) {
getJobManagement().deleteJob(id, getSessionID());
}
public HaaSDataTransfer startDataTransfer(long jobId, int nodeNumber, int port) {
public HaaSDataTransfer startDataTransfer(final long jobId,
final int nodeNumber, final int port)
{
return createDataTransfer(jobId, nodeNumber, port);
}
......@@ -328,81 +352,127 @@ public class HaaSClient {
return sessionID;
}
private HaaSFileTransferImp createFileTransfer(long jobId, TransferFileProgress progress)
throws RemoteException, UnsupportedEncodingException, ServiceException, JSchException {
P_FileTransferPool pool = filetransferPoolMap.computeIfAbsent(jobId, id -> new P_FileTransferPool(id));
FileTransferMethodExt ft = pool.obtain();
private HaaSFileTransferImp createFileTransfer(final long jobId,
final TransferFileProgress progress) throws RemoteException,
UnsupportedEncodingException, ServiceException, JSchException
{
final P_FileTransferPool pool = filetransferPoolMap.computeIfAbsent(jobId,
id -> new P_FileTransferPool(id));
final FileTransferMethodExt ft = pool.obtain();
try {
return new HaaSFileTransferImp(ft, getScpClient(ft), progress) {
@Override
public void close() {
super.close();
try {
pool.release();
} catch (RemoteException | ServiceException e) {
}
catch (RemoteException | ServiceException e) {
throw new HaaSClientException(e);
}
}
};
} catch (UnsupportedEncodingException | JSchException e) {
}
catch (UnsupportedEncodingException | JSchException e) {
pool.release();
throw e;
}
}
private HaaSDataTransfer createDataTransfer(long jobId, int nodeNumber, int port) {
String host = getJobManagement().getAllocatedNodesIPs(jobId, getSessionID()).getString().get(nodeNumber);
DataTransferWsSoap ws = getDataTransfer();
DataTransferMethodExt dataTransferMethodExt = ws.getDataTransferMethod(host, port, jobId,
getSessionID());
String sessionId = getSessionID();
private HaaSDataTransfer createDataTransfer(final long jobId,
final int nodeNumber, final int port)
{
final String host = getJobManagement().getAllocatedNodesIPs(jobId,
getSessionID()).getString().get(nodeNumber);
final DataTransferWsSoap ws = getDataTransfer();
final DataTransferMethodExt dataTransferMethodExt = ws
.getDataTransferMethod(host, port, jobId, getSessionID());
final String sessionId = getSessionID();
return new HaaSDataTransfer() {
@Override
public void close() throws IOException {
if (log.isDebugEnabled()) {
log.debug("close");
}
ws.endDataTransfer(dataTransferMethodExt, sessionId);
if (log.isDebugEnabled()) {
log.debug("close - DONE");
}
}
@Override
public void write(byte[] buffer) {
public void write(final byte[] buffer) {
if (log.isDebugEnabled()) {
log.debug("write: {}", new String(buffer));
}
ws.writeDataToJobNode(buffer, jobId, host, sessionId, false);
if (log.isDebugEnabled()) {
log.debug("write - DONE");
}
}
@Override
public byte[] read() {
return ws.readDataFromJobNode(jobId, host, sessionId);
if (log.isDebugEnabled()) {
log.debug("read: ");
}
final byte[] result = ws.readDataFromJobNode(jobId, host, sessionId);
if (log.isDebugEnabled()) {
log.debug("read - DONE: {}", result != null ? new String(result)
: "EOF");
}
return result;
}
@Override
public void closeConnection() {
if (log.isDebugEnabled()) {
log.debug("closeConnection");
}
ws.writeDataToJobNode(null, jobId, host, sessionId, true);
}};
if (log.isDebugEnabled()) {
log.debug("closeConnection - DONE");
}
}
};
}
private void doSubmitJob(long jobId) {
private void doSubmitJob(final long jobId) {
getJobManagement().submitJob(jobId, getSessionID());
}
private long doCreateJob(JobSettings jobSettings, Collection<Entry<String, String>> templateParameters) {
Collection<TaskSpecificationExt> taskSpec = Arrays
.asList(createTaskSpecification(jobSettings, templateParameters));
JobSpecificationExt jobSpecification = createJobSpecification(jobSettings, taskSpec);
SubmittedJobInfoExt job = getJobManagement().createJob(jobSpecification, getSessionID());
private long doCreateJob(final JobSettings jobSettings,
final Collection<Entry<String, String>> templateParameters)
{
final Collection<TaskSpecificationExt> taskSpec = Arrays.asList(
createTaskSpecification(jobSettings, templateParameters));
final JobSpecificationExt jobSpecification = createJobSpecification(
jobSettings, taskSpec);
final SubmittedJobInfoExt job = getJobManagement().createJob(
jobSpecification, getSessionID());
return job.getId();
}
private ScpClient getScpClient(FileTransferMethodExt fileTransfer)
throws UnsupportedEncodingException, JSchException {
byte[] pvtKey = fileTransfer.getCredentials().getPrivateKey().getBytes("UTF-8");
return new ScpClient(fileTransfer.getServerHostname(), fileTransfer.getCredentials().getUsername(), pvtKey);
private ScpClient getScpClient(final FileTransferMethodExt fileTransfer)
throws UnsupportedEncodingException, JSchException
{
final byte[] pvtKey = fileTransfer.getCredentials().getPrivateKey()
.getBytes("UTF-8");
return new ScpClient(fileTransfer.getServerHostname(), fileTransfer
.getCredentials().getUsername(), pvtKey);
}
private JobSpecificationExt createJobSpecification(JobSettings jobSettings,
Collection<TaskSpecificationExt> tasks) {
JobSpecificationExt testJob = new JobSpecificationExt();
private JobSpecificationExt createJobSpecification(
final JobSettings jobSettings, final Collection<TaskSpecificationExt> tasks)
{
final JobSpecificationExt testJob = new JobSpecificationExt();
testJob.setName(jobSettings.getJobName());
testJob.setMinCores(jobSettings.getNumberOfCoresPerNode() * jobSettings.getNumberOfNodes());
testJob.setMaxCores(jobSettings.getNumberOfCoresPerNode() * jobSettings.getNumberOfNodes());
testJob.setMinCores(jobSettings.getNumberOfCoresPerNode() * jobSettings
.getNumberOfNodes());
testJob.setMaxCores(jobSettings.getNumberOfCoresPerNode() * jobSettings
.getNumberOfNodes());
testJob.setPriority(JobPriorityExt.AVERAGE);
testJob.setProject(projectId);
testJob.setWaitingLimit(null);
......@@ -414,17 +484,22 @@ public class HaaSClient {
testJob.setNotifyOnStart(false);
testJob.setClusterNodeTypeId(jobSettings.getClusterNodeType());
testJob.setEnvironmentVariables(new ArrayOfEnvironmentVariableExt());
testJob.setTasks(getAndFill(new ArrayOfTaskSpecificationExt(), a -> a.getTaskSpecificationExt().addAll(tasks)));
testJob.setTasks(getAndFill(new ArrayOfTaskSpecificationExt(), a -> a
.getTaskSpecificationExt().addAll(tasks)));
return testJob;
}
private TaskSpecificationExt createTaskSpecification(JobSettings jobSettings,
Collection<Entry<String, String>> templateParameters) {
private TaskSpecificationExt createTaskSpecification(
final JobSettings jobSettings,
final Collection<Entry<String, String>> templateParameters)
{
TaskSpecificationExt testTask = new TaskSpecificationExt();
final TaskSpecificationExt testTask = new TaskSpecificationExt();
testTask.setName(jobSettings.getJobName() + "-task");
testTask.setMinCores(jobSettings.getNumberOfCoresPerNode() * jobSettings.getNumberOfNodes());
testTask.setMaxCores(jobSettings.getNumberOfCoresPerNode() * jobSettings.getNumberOfNodes());
testTask.setMinCores(jobSettings.getNumberOfCoresPerNode() * jobSettings
.getNumberOfNodes());
testTask.setMaxCores(jobSettings.getNumberOfCoresPerNode() * jobSettings
.getNumberOfNodes());
testTask.setWalltimeLimit(jobSettings.getWalltimeLimit());
testTask.setRequiredNodes(null);
testTask.setIsExclusive(false);
......@@ -438,17 +513,18 @@ public class HaaSClient {
testTask.setCommandTemplateId(jobSettings.getTemplateId());
testTask.setEnvironmentVariables(new ArrayOfEnvironmentVariableExt());
testTask.setDependsOn(null);
testTask.setTemplateParameterValues(getAndFill(new ArrayOfCommandTemplateParameterValueExt(),
t -> t.getCommandTemplateParameterValueExt()
.addAll(templateParameters.stream()
.map(pair -> createCommandTemplateParameterValueExt(pair.getKey(), pair.getValue()))
.collect(Collectors.toList()))));
testTask.setTemplateParameterValues(getAndFill(
new ArrayOfCommandTemplateParameterValueExt(), t -> t
.getCommandTemplateParameterValueExt().addAll(templateParameters
.stream().map(pair -> createCommandTemplateParameterValueExt(pair
.getKey(), pair.getValue())).collect(Collectors.toList()))));
return testTask;
}
private String authenticate() {
return getUserAndLimitationManagement()
.authenticateUserPassword(createPasswordCredentialsExt(settings.getUserName(), settings.getPassword()));
return getUserAndLimitationManagement().authenticateUserPassword(
createPasswordCredentialsExt(settings.getUserName(), settings
.getPassword()));
}
synchronized private DataTransferWsSoap getDataTransfer() {
......@@ -458,9 +534,12 @@ public class HaaSClient {
return dataTransferWs;
}
synchronized private UserAndLimitationManagementWsSoap getUserAndLimitationManagement() {
synchronized private UserAndLimitationManagementWsSoap
getUserAndLimitationManagement()
{
if (userAndLimitationManagement == null) {
userAndLimitationManagement = new UserAndLimitationManagementWs().getUserAndLimitationManagementWsSoap12();
userAndLimitationManagement = new UserAndLimitationManagementWs()
.getUserAndLimitationManagementWsSoap12();
}
return userAndLimitationManagement;
}
......@@ -479,51 +558,54 @@ public class HaaSClient {
return fileTransferWS;
}
public static class P_ProgressNotifierDecorator4Size extends P_ProgressNotifierDecorator {
public static class P_ProgressNotifierDecorator4Size extends
P_ProgressNotifierDecorator
{
private static final int SIZE_RATIO = 20;
public P_ProgressNotifierDecorator4Size(ProgressNotifier notifier) {
public P_ProgressNotifierDecorator4Size(final ProgressNotifier notifier) {
super(notifier);
}
@Override
public void setItemCount(int count, int total) {
public void setItemCount(final int count, final int total) {
super.setItemCount(count, total);
setCount(count, total * SIZE_RATIO);
}
}
public static class P_ProgressNotifierDecorator implements ProgressNotifier {
private final ProgressNotifier notifier;
public P_ProgressNotifierDecorator(ProgressNotifier notifier) {
public P_ProgressNotifierDecorator(final ProgressNotifier notifier) {
this.notifier = notifier;
}
@Override
public void setTitle(String title) {
public void setTitle(final String title) {
notifier.setTitle(title);
}
@Override
public void setCount(int count, int total) {
public void setCount(final int count, final int total) {
notifier.setCount(count, total);
}
@Override
public void addItem(Object item) {
public void addItem(final Object item) {
notifier.addItem(item);
}
@Override
public void setItemCount(int count, int total) {
public void setItemCount(final int count, final int total) {
notifier.setItemCount(count, total);
}
@Override
public void itemDone(Object item) {
public void itemDone(final Object item) {
notifier.itemDone(item);
}
......@@ -544,17 +626,22 @@ public class HaaSClient {
}
private class P_FileTransferPool {
private FileTransferMethodExt holded;
private int counter;
private final P_Supplier<FileTransferMethodExt> factory;
private final P_Consumer<FileTransferMethodExt> destroyer;
public P_FileTransferPool(long jobId) {
this.factory = () -> getFileTransfer().getFileTransferMethod(jobId, getSessionID());
this.destroyer = val -> getFileTransfer().endFileTransfer(jobId, val, sessionID);
public P_FileTransferPool(final long jobId) {
this.factory = () -> getFileTransfer().getFileTransferMethod(jobId,
getSessionID());
this.destroyer = val -> getFileTransfer().endFileTransfer(jobId, val,
sessionID);
}
public synchronized FileTransferMethodExt obtain() throws RemoteException, ServiceException {
public synchronized FileTransferMethodExt obtain() throws RemoteException,
ServiceException
{
if (holded == null) {
holded = factory.get();
}
......@@ -562,7 +649,9 @@ public class HaaSClient {
return holded;
}
public synchronized void release() throws RemoteException, ServiceException {
public synchronized void release() throws RemoteException,
ServiceException
{
if (--counter == 0) {
destroyer.accept(holded);
holded = null;
......@@ -571,24 +660,30 @@ public class HaaSClient {
}
private static <T> T getAndFill(T value, Consumer<T> filler) {
private static <T> T getAndFill(final T value, final Consumer<T> filler) {
filler.accept(value);
return value;
}
private static Calendar toGregorian(XMLGregorianCalendar time) {
return Optional.ofNullable(time).map(t -> t.toGregorianCalendar()).orElse(null);
private static Calendar toGregorian(final XMLGregorianCalendar time) {
return Optional.ofNullable(time).map(t -> t.toGregorianCalendar()).orElse(
null);
}
private static CommandTemplateParameterValueExt createCommandTemplateParameterValueExt(String key, String value) {
CommandTemplateParameterValueExt result = new CommandTemplateParameterValueExt();
private static CommandTemplateParameterValueExt
createCommandTemplateParameterValueExt(final String key, final String value)
{
final CommandTemplateParameterValueExt result =
new CommandTemplateParameterValueExt();
result.setCommandParameterIdentifier(key);
result.setParameterValue(value);
return result;
}
private static PasswordCredentialsExt createPasswordCredentialsExt(String userName, String password) {
PasswordCredentialsExt result = new PasswordCredentialsExt();
private static PasswordCredentialsExt createPasswordCredentialsExt(
final String userName, final String password)
{
final PasswordCredentialsExt result = new PasswordCredentialsExt();
result.setUsername(userName);
result.setPassword(password);
return result;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment