From 41cb1af32ef78c95e684dd7356b3b9bc077d3553 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Ko=C5=BEusznik?= <jan@kozusznik.cz> Date: Wed, 11 Jul 2018 20:45:16 +0200 Subject: [PATCH] code: clean up and add debug loging --- .../fiji/haas_java_client/HaaSClient.java | 377 +++++++++++------- 1 file changed, 236 insertions(+), 141 deletions(-) diff --git a/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSClient.java b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSClient.java index 18c1dd2d..ee0f3e47 100644 --- a/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSClient.java +++ b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSClient.java @@ -1,3 +1,4 @@ + package cz.it4i.fiji.haas_java_client; import com.jcraft.jsch.JSchException; @@ -57,50 +58,49 @@ import cz.it4i.fiji.scpclient.TransferFileProgress; public class HaaSClient { - public static final TransferFileProgress DUMMY_TRANSFER_FILE_PROGRESS = new TransferFileProgress() { + public static final TransferFileProgress DUMMY_TRANSFER_FILE_PROGRESS = + new TransferFileProgress() + { - @Override - public void dataTransfered(long bytesTransfered) { - // TODO Auto-generated method stub + @Override + public void dataTransfered(final long bytesTransfered) { + // TODO Auto-generated method stub - } - }; + } + }; - public static ProgressNotifier DUMMY_PROGRESS_NOTIFIER = new ProgressNotifier() { + public static ProgressNotifier DUMMY_PROGRESS_NOTIFIER = + new ProgressNotifier() + { - @Override - public void setTitle(String title) { - } + @Override + public void setTitle(final String title) {} - @Override - public void setItemCount(int count, int total) { - } + @Override + public void setItemCount(final int count, final int total) {} - @Override - public void setCount(int count, int total) { - } + @Override + public void setCount(final int count, final int total) {} - @Override - public void itemDone(Object item) { - } + @Override + public void itemDone(final Object item) {} - @Override - public void done() { - } + @Override + public void done() {} - @Override - public void addItem(Object item) { - } - }; + @Override + public void addItem(final Object item) {} + }; - public static UploadingFile getUploadingFile(Path file) { + public static UploadingFile getUploadingFile(final Path file) { return new UploadingFile() { @Override public InputStream getInputStream() { try { return Files.newInputStream(file); - } catch (IOException e) { + } + catch (final IOException e) { log.error(e.getMessage(), e); throw new RuntimeException(e); } @@ -115,7 +115,8 @@ public class HaaSClient { public long getLength() { try { return Files.size(file); - } catch (IOException e) { + } + catch (final IOException e) { log.error(e.getMessage(), e); throw new RuntimeException(e); } @@ -125,7 +126,8 @@ public class HaaSClient { public long getLastTime() { try { return Files.getLastModifiedTime(file).toMillis(); - } catch (IOException e) { + } + catch (final IOException e) { log.error(e.getMessage(), e); throw new RuntimeException(e); } @@ -138,8 +140,10 @@ public class HaaSClient { private final Collection<TaskFileOffsetExt> files = new LinkedList<>(); - public void addFile(long taskId, SynchronizableFileType type, long offset) { - TaskFileOffsetExt off = new TaskFileOffsetExt(); + public void addFile(final long taskId, final SynchronizableFileType type, + final long offset) + { + final TaskFileOffsetExt off = new TaskFileOffsetExt(); off.setFileType(getType(type)); off.setSubmittedTaskInfoId(taskId); off.setOffset(offset); @@ -150,29 +154,30 @@ public class HaaSClient { return files; } - private SynchronizableFilesExt getType(SynchronizableFileType type) { + private SynchronizableFilesExt getType(final SynchronizableFileType type) { switch (type) { - case LogFile: - return SynchronizableFilesExt.LOG_FILE; - case ProgressFile: - return SynchronizableFilesExt.PROGRESS_FILE; - case StandardErrorFile: - return SynchronizableFilesExt.STANDARD_ERROR_FILE; - case StandardOutputFile: - return SynchronizableFilesExt.STANDARD_OUTPUT_FILE; - default: - throw new UnsupportedOperationException("Unsupported type: " + type); + case LogFile: + return SynchronizableFilesExt.LOG_FILE; + case ProgressFile: + return SynchronizableFilesExt.PROGRESS_FILE; + case StandardErrorFile: + return SynchronizableFilesExt.STANDARD_ERROR_FILE; + case StandardOutputFile: + return SynchronizableFilesExt.STANDARD_OUTPUT_FILE; + default: + throw new UnsupportedOperationException("Unsupported type: " + type); } } } - private static Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas_java_client.HaaSClient.class); + private static Logger log = LoggerFactory.getLogger( + cz.it4i.fiji.haas_java_client.HaaSClient.class); final static private Map<JobStateExt, JobState> WS_STATE2STATE; static { - Map<JobStateExt, JobState> map = new HashMap<>(); + final Map<JobStateExt, JobState> map = new HashMap<>(); map.put(JobStateExt.CANCELED, JobState.Canceled); map.put(JobStateExt.CONFIGURING, JobState.Configuring); map.put(JobStateExt.FAILED, JobState.Failed); @@ -193,41 +198,51 @@ public class HaaSClient { private final String projectId; - private final Map<Long, P_FileTransferPool> filetransferPoolMap = new HashMap<>(); + private final Map<Long, P_FileTransferPool> filetransferPoolMap = + new HashMap<>(); private final HaaSClientSettings settings; private DataTransferWsSoap dataTransferWs; - - - public HaaSClient(HaaSClientSettings settings) { + public HaaSClient(final HaaSClientSettings settings) { this.settings = settings; this.projectId = settings.getProjectId(); } - public long createJob(JobSettings jobSettings, Collection<Entry<String, String>> templateParameters) { + public long createJob(final JobSettings jobSettings, + final Collection<Entry<String, String>> templateParameters) + { return doCreateJob(jobSettings, templateParameters); } - public HaaSFileTransfer startFileTransfer(long jobId, TransferFileProgress notifier) { + public HaaSFileTransfer startFileTransfer(final long jobId, + final TransferFileProgress notifier) + { try { return createFileTransfer(jobId, notifier); - } catch (RemoteException | ServiceException | UnsupportedEncodingException | JSchException e) { + } + catch (RemoteException | ServiceException | UnsupportedEncodingException + | JSchException e) + { throw new HaaSClientException(e); } } - public HaaSFileTransfer startFileTransfer(long jobId) { + public HaaSFileTransfer startFileTransfer(final long jobId) { return startFileTransfer(jobId, DUMMY_TRANSFER_FILE_PROGRESS); } - public TunnelToNode openTunnel(long jobId, String nodeIP, int localPort, int remotePort) { + public TunnelToNode openTunnel(final long jobId, final String nodeIP, + final int localPort, final int remotePort) + { MiddlewareTunnel tunnel; try { - tunnel = new MiddlewareTunnel(Executors.newCachedThreadPool(), jobId, nodeIP, getSessionID()); + tunnel = new MiddlewareTunnel(Executors.newCachedThreadPool(), jobId, + nodeIP, getSessionID()); tunnel.open(localPort, remotePort); return new TunnelToNode() { + @Override public void close() throws IOException { tunnel.close(); @@ -243,22 +258,25 @@ public class HaaSClient { return tunnel.getLocalHost(); } }; - } catch (IOException e) { + } + catch (final IOException e) { log.error(e.getMessage(), e); throw new HaaSClientException(e); } } - public void submitJob(long jobId) { + public void submitJob(final long jobId) { doSubmitJob(jobId); } - public JobInfo obtainJobInfo(long jobId) { - final SubmittedJobInfoExt info = getJobManagement().getCurrentInfoForJob(jobId, getSessionID()); + public JobInfo obtainJobInfo(final long jobId) { + final SubmittedJobInfoExt info = getJobManagement().getCurrentInfoForJob( + jobId, getSessionID()); - final Collection<Long> tasksId = info.getTasks().getSubmittedTaskInfoExt().stream().map(ti -> ti.getId()) - .collect(Collectors.toList()); + final Collection<Long> tasksId = info.getTasks().getSubmittedTaskInfoExt() + .stream().map(ti -> ti.getId()).collect(Collectors.toList()); return new JobInfo() { + private List<String> ips; @Override @@ -298,26 +316,32 @@ public class HaaSClient { }; } - public Collection<JobFileContentExt> downloadPartsOfJobFiles(Long jobId, HaaSClient.SynchronizableFiles files) { - ArrayOfTaskFileOffsetExt fileOffsetExt = new ArrayOfTaskFileOffsetExt(); + public Collection<JobFileContentExt> downloadPartsOfJobFiles(final Long jobId, + final HaaSClient.SynchronizableFiles files) + { + final ArrayOfTaskFileOffsetExt fileOffsetExt = + new ArrayOfTaskFileOffsetExt(); fileOffsetExt.getTaskFileOffsetExt().addAll(files.getFiles()); - return getFileTransfer().downloadPartsOfJobFilesFromCluster(jobId, fileOffsetExt, getSessionID()) - .getJobFileContentExt(); + return getFileTransfer().downloadPartsOfJobFilesFromCluster(jobId, + fileOffsetExt, getSessionID()).getJobFileContentExt(); } - public Collection<String> getChangedFiles(long jobId) { - return getFileTransfer().listChangedFilesForJob(jobId, getSessionID()).getString(); + public Collection<String> getChangedFiles(final long jobId) { + return getFileTransfer().listChangedFilesForJob(jobId, getSessionID()) + .getString(); } - public void cancelJob(Long jobId) { + public void cancelJob(final Long jobId) { getJobManagement().cancelJob(jobId, getSessionID()); } - public void deleteJob(long id) { + public void deleteJob(final long id) { getJobManagement().deleteJob(id, getSessionID()); } - public HaaSDataTransfer startDataTransfer(long jobId, int nodeNumber, int port) { + public HaaSDataTransfer startDataTransfer(final long jobId, + final int nodeNumber, final int port) + { return createDataTransfer(jobId, nodeNumber, port); } @@ -328,81 +352,127 @@ public class HaaSClient { return sessionID; } - private HaaSFileTransferImp createFileTransfer(long jobId, TransferFileProgress progress) - throws RemoteException, UnsupportedEncodingException, ServiceException, JSchException { - P_FileTransferPool pool = filetransferPoolMap.computeIfAbsent(jobId, id -> new P_FileTransferPool(id)); - FileTransferMethodExt ft = pool.obtain(); + private HaaSFileTransferImp createFileTransfer(final long jobId, + final TransferFileProgress progress) throws RemoteException, + UnsupportedEncodingException, ServiceException, JSchException + { + final P_FileTransferPool pool = filetransferPoolMap.computeIfAbsent(jobId, + id -> new P_FileTransferPool(id)); + final FileTransferMethodExt ft = pool.obtain(); try { return new HaaSFileTransferImp(ft, getScpClient(ft), progress) { + @Override public void close() { super.close(); try { pool.release(); - } catch (RemoteException | ServiceException e) { + } + catch (RemoteException | ServiceException e) { throw new HaaSClientException(e); } } }; - } catch (UnsupportedEncodingException | JSchException e) { + } + catch (UnsupportedEncodingException | JSchException e) { pool.release(); throw e; } } - private HaaSDataTransfer createDataTransfer(long jobId, int nodeNumber, int port) { - String host = getJobManagement().getAllocatedNodesIPs(jobId, getSessionID()).getString().get(nodeNumber); - DataTransferWsSoap ws = getDataTransfer(); - DataTransferMethodExt dataTransferMethodExt = ws.getDataTransferMethod(host, port, jobId, - getSessionID()); - String sessionId = getSessionID(); + private HaaSDataTransfer createDataTransfer(final long jobId, + final int nodeNumber, final int port) + { + final String host = getJobManagement().getAllocatedNodesIPs(jobId, + getSessionID()).getString().get(nodeNumber); + final DataTransferWsSoap ws = getDataTransfer(); + final DataTransferMethodExt dataTransferMethodExt = ws + .getDataTransferMethod(host, port, jobId, getSessionID()); + final String sessionId = getSessionID(); return new HaaSDataTransfer() { - + @Override public void close() throws IOException { + if (log.isDebugEnabled()) { + log.debug("close"); + } ws.endDataTransfer(dataTransferMethodExt, sessionId); + if (log.isDebugEnabled()) { + log.debug("close - DONE"); + } } - + @Override - public void write(byte[] buffer) { + public void write(final byte[] buffer) { + if (log.isDebugEnabled()) { + log.debug("write: {}", new String(buffer)); + } ws.writeDataToJobNode(buffer, jobId, host, sessionId, false); + if (log.isDebugEnabled()) { + log.debug("write - DONE"); + } } - + @Override public byte[] read() { - return ws.readDataFromJobNode(jobId, host, sessionId); + if (log.isDebugEnabled()) { + log.debug("read: "); + } + final byte[] result = ws.readDataFromJobNode(jobId, host, sessionId); + if (log.isDebugEnabled()) { + log.debug("read - DONE: {}", result != null ? new String(result) + : "EOF"); + } + return result; } - + @Override public void closeConnection() { + if (log.isDebugEnabled()) { + log.debug("closeConnection"); + } ws.writeDataToJobNode(null, jobId, host, sessionId, true); - }}; + if (log.isDebugEnabled()) { + log.debug("closeConnection - DONE"); + } + } + }; } - private void doSubmitJob(long jobId) { + private void doSubmitJob(final long jobId) { getJobManagement().submitJob(jobId, getSessionID()); } - private long doCreateJob(JobSettings jobSettings, Collection<Entry<String, String>> templateParameters) { - Collection<TaskSpecificationExt> taskSpec = Arrays - .asList(createTaskSpecification(jobSettings, templateParameters)); - JobSpecificationExt jobSpecification = createJobSpecification(jobSettings, taskSpec); - SubmittedJobInfoExt job = getJobManagement().createJob(jobSpecification, getSessionID()); + private long doCreateJob(final JobSettings jobSettings, + final Collection<Entry<String, String>> templateParameters) + { + final Collection<TaskSpecificationExt> taskSpec = Arrays.asList( + createTaskSpecification(jobSettings, templateParameters)); + final JobSpecificationExt jobSpecification = createJobSpecification( + jobSettings, taskSpec); + final SubmittedJobInfoExt job = getJobManagement().createJob( + jobSpecification, getSessionID()); return job.getId(); } - private ScpClient getScpClient(FileTransferMethodExt fileTransfer) - throws UnsupportedEncodingException, JSchException { - byte[] pvtKey = fileTransfer.getCredentials().getPrivateKey().getBytes("UTF-8"); - return new ScpClient(fileTransfer.getServerHostname(), fileTransfer.getCredentials().getUsername(), pvtKey); + private ScpClient getScpClient(final FileTransferMethodExt fileTransfer) + throws UnsupportedEncodingException, JSchException + { + final byte[] pvtKey = fileTransfer.getCredentials().getPrivateKey() + .getBytes("UTF-8"); + return new ScpClient(fileTransfer.getServerHostname(), fileTransfer + .getCredentials().getUsername(), pvtKey); } - private JobSpecificationExt createJobSpecification(JobSettings jobSettings, - Collection<TaskSpecificationExt> tasks) { - JobSpecificationExt testJob = new JobSpecificationExt(); + private JobSpecificationExt createJobSpecification( + final JobSettings jobSettings, final Collection<TaskSpecificationExt> tasks) + { + final JobSpecificationExt testJob = new JobSpecificationExt(); testJob.setName(jobSettings.getJobName()); - testJob.setMinCores(jobSettings.getNumberOfCoresPerNode() * jobSettings.getNumberOfNodes()); - testJob.setMaxCores(jobSettings.getNumberOfCoresPerNode() * jobSettings.getNumberOfNodes()); + testJob.setMinCores(jobSettings.getNumberOfCoresPerNode() * jobSettings + .getNumberOfNodes()); + testJob.setMaxCores(jobSettings.getNumberOfCoresPerNode() * jobSettings + .getNumberOfNodes()); testJob.setPriority(JobPriorityExt.AVERAGE); testJob.setProject(projectId); testJob.setWaitingLimit(null); @@ -414,17 +484,22 @@ public class HaaSClient { testJob.setNotifyOnStart(false); testJob.setClusterNodeTypeId(jobSettings.getClusterNodeType()); testJob.setEnvironmentVariables(new ArrayOfEnvironmentVariableExt()); - testJob.setTasks(getAndFill(new ArrayOfTaskSpecificationExt(), a -> a.getTaskSpecificationExt().addAll(tasks))); + testJob.setTasks(getAndFill(new ArrayOfTaskSpecificationExt(), a -> a + .getTaskSpecificationExt().addAll(tasks))); return testJob; } - private TaskSpecificationExt createTaskSpecification(JobSettings jobSettings, - Collection<Entry<String, String>> templateParameters) { + private TaskSpecificationExt createTaskSpecification( + final JobSettings jobSettings, + final Collection<Entry<String, String>> templateParameters) + { - TaskSpecificationExt testTask = new TaskSpecificationExt(); + final TaskSpecificationExt testTask = new TaskSpecificationExt(); testTask.setName(jobSettings.getJobName() + "-task"); - testTask.setMinCores(jobSettings.getNumberOfCoresPerNode() * jobSettings.getNumberOfNodes()); - testTask.setMaxCores(jobSettings.getNumberOfCoresPerNode() * jobSettings.getNumberOfNodes()); + testTask.setMinCores(jobSettings.getNumberOfCoresPerNode() * jobSettings + .getNumberOfNodes()); + testTask.setMaxCores(jobSettings.getNumberOfCoresPerNode() * jobSettings + .getNumberOfNodes()); testTask.setWalltimeLimit(jobSettings.getWalltimeLimit()); testTask.setRequiredNodes(null); testTask.setIsExclusive(false); @@ -438,29 +513,33 @@ public class HaaSClient { testTask.setCommandTemplateId(jobSettings.getTemplateId()); testTask.setEnvironmentVariables(new ArrayOfEnvironmentVariableExt()); testTask.setDependsOn(null); - testTask.setTemplateParameterValues(getAndFill(new ArrayOfCommandTemplateParameterValueExt(), - t -> t.getCommandTemplateParameterValueExt() - .addAll(templateParameters.stream() - .map(pair -> createCommandTemplateParameterValueExt(pair.getKey(), pair.getValue())) - .collect(Collectors.toList())))); + testTask.setTemplateParameterValues(getAndFill( + new ArrayOfCommandTemplateParameterValueExt(), t -> t + .getCommandTemplateParameterValueExt().addAll(templateParameters + .stream().map(pair -> createCommandTemplateParameterValueExt(pair + .getKey(), pair.getValue())).collect(Collectors.toList())))); return testTask; } private String authenticate() { - return getUserAndLimitationManagement() - .authenticateUserPassword(createPasswordCredentialsExt(settings.getUserName(), settings.getPassword())); + return getUserAndLimitationManagement().authenticateUserPassword( + createPasswordCredentialsExt(settings.getUserName(), settings + .getPassword())); } synchronized private DataTransferWsSoap getDataTransfer() { - if(dataTransferWs == null) { + if (dataTransferWs == null) { dataTransferWs = new DataTransferWs().getDataTransferWsSoap12(); } return dataTransferWs; } - synchronized private UserAndLimitationManagementWsSoap getUserAndLimitationManagement() { + synchronized private UserAndLimitationManagementWsSoap + getUserAndLimitationManagement() + { if (userAndLimitationManagement == null) { - userAndLimitationManagement = new UserAndLimitationManagementWs().getUserAndLimitationManagementWsSoap12(); + userAndLimitationManagement = new UserAndLimitationManagementWs() + .getUserAndLimitationManagementWsSoap12(); } return userAndLimitationManagement; } @@ -479,51 +558,54 @@ public class HaaSClient { return fileTransferWS; } - public static class P_ProgressNotifierDecorator4Size extends P_ProgressNotifierDecorator { + public static class P_ProgressNotifierDecorator4Size extends + P_ProgressNotifierDecorator + { private static final int SIZE_RATIO = 20; - public P_ProgressNotifierDecorator4Size(ProgressNotifier notifier) { + public P_ProgressNotifierDecorator4Size(final ProgressNotifier notifier) { super(notifier); } @Override - public void setItemCount(int count, int total) { + public void setItemCount(final int count, final int total) { super.setItemCount(count, total); setCount(count, total * SIZE_RATIO); } } public static class P_ProgressNotifierDecorator implements ProgressNotifier { + private final ProgressNotifier notifier; - public P_ProgressNotifierDecorator(ProgressNotifier notifier) { + public P_ProgressNotifierDecorator(final ProgressNotifier notifier) { this.notifier = notifier; } @Override - public void setTitle(String title) { + public void setTitle(final String title) { notifier.setTitle(title); } @Override - public void setCount(int count, int total) { + public void setCount(final int count, final int total) { notifier.setCount(count, total); } @Override - public void addItem(Object item) { + public void addItem(final Object item) { notifier.addItem(item); } @Override - public void setItemCount(int count, int total) { + public void setItemCount(final int count, final int total) { notifier.setItemCount(count, total); } @Override - public void itemDone(Object item) { + public void itemDone(final Object item) { notifier.itemDone(item); } @@ -544,17 +626,22 @@ public class HaaSClient { } private class P_FileTransferPool { + private FileTransferMethodExt holded; private int counter; private final P_Supplier<FileTransferMethodExt> factory; private final P_Consumer<FileTransferMethodExt> destroyer; - public P_FileTransferPool(long jobId) { - this.factory = () -> getFileTransfer().getFileTransferMethod(jobId, getSessionID()); - this.destroyer = val -> getFileTransfer().endFileTransfer(jobId, val, sessionID); + public P_FileTransferPool(final long jobId) { + this.factory = () -> getFileTransfer().getFileTransferMethod(jobId, + getSessionID()); + this.destroyer = val -> getFileTransfer().endFileTransfer(jobId, val, + sessionID); } - public synchronized FileTransferMethodExt obtain() throws RemoteException, ServiceException { + public synchronized FileTransferMethodExt obtain() throws RemoteException, + ServiceException + { if (holded == null) { holded = factory.get(); } @@ -562,7 +649,9 @@ public class HaaSClient { return holded; } - public synchronized void release() throws RemoteException, ServiceException { + public synchronized void release() throws RemoteException, + ServiceException + { if (--counter == 0) { destroyer.accept(holded); holded = null; @@ -571,24 +660,30 @@ public class HaaSClient { } - private static <T> T getAndFill(T value, Consumer<T> filler) { + private static <T> T getAndFill(final T value, final Consumer<T> filler) { filler.accept(value); return value; } - private static Calendar toGregorian(XMLGregorianCalendar time) { - return Optional.ofNullable(time).map(t -> t.toGregorianCalendar()).orElse(null); + private static Calendar toGregorian(final XMLGregorianCalendar time) { + return Optional.ofNullable(time).map(t -> t.toGregorianCalendar()).orElse( + null); } - private static CommandTemplateParameterValueExt createCommandTemplateParameterValueExt(String key, String value) { - CommandTemplateParameterValueExt result = new CommandTemplateParameterValueExt(); + private static CommandTemplateParameterValueExt + createCommandTemplateParameterValueExt(final String key, final String value) + { + final CommandTemplateParameterValueExt result = + new CommandTemplateParameterValueExt(); result.setCommandParameterIdentifier(key); result.setParameterValue(value); return result; } - private static PasswordCredentialsExt createPasswordCredentialsExt(String userName, String password) { - PasswordCredentialsExt result = new PasswordCredentialsExt(); + private static PasswordCredentialsExt createPasswordCredentialsExt( + final String userName, final String password) + { + final PasswordCredentialsExt result = new PasswordCredentialsExt(); result.setUsername(userName); result.setPassword(password); return result; -- GitLab