From a80dbff22caba712a75ec2665a58bb0ebe0af391 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Ko=C5=BEusznik?= <jan@kozusznik.cz> Date: Wed, 20 Jun 2018 10:39:31 +0200 Subject: [PATCH] feature: integrate tunnel with HaaSClient --- haas-java-client/pom.xml | 2 +- .../fiji/haas_java_client/HaaSClient.java | 35 ++++++++++++++++--- .../it4i/fiji/haas_java_client/JobInfo.java | 3 ++ .../haas_java_client/MidlewareTunnel.java | 6 ++-- .../fiji/haas_java_client/TunnelToNode.java | 7 ++++ .../TestCommunicationWithNodes.java | 25 ++++--------- 6 files changed, 50 insertions(+), 28 deletions(-) create mode 100644 haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/TunnelToNode.java diff --git a/haas-java-client/pom.xml b/haas-java-client/pom.xml index 77a91b03..97bce298 100644 --- a/haas-java-client/pom.xml +++ b/haas-java-client/pom.xml @@ -4,7 +4,7 @@ <modelVersion>4.0.0</modelVersion> <groupId>cz.it4i.fiji</groupId> <artifactId>haas-java-client</artifactId> - <version>0.0.3-SNAPSHOT</version> + <version>0.0.4-SNAPSHOT</version> <name>HaaS library for Java</name> <packaging>jar</packaging> <properties> diff --git a/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSClient.java b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSClient.java index 3d6d45a6..68664b19 100644 --- a/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSClient.java +++ b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/HaaSClient.java @@ -16,6 +16,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; +import java.util.concurrent.Executors; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -233,6 +234,21 @@ public class HaaSClient { public HaaSFileTransfer startFileTransfer(long jobId) { return startFileTransfer(jobId, DUMMY_TRANSFER_FILE_PROGRESS); } + + public TunnelToNode openTunnel(long jobId, String nodeIP, int localPort, int remotePort) throws ServiceException, IOException { + MidlewareTunnel tunnel = new MidlewareTunnel(Executors.newCachedThreadPool(), jobId, nodeIP, getSessionID()); + tunnel.open(localPort, remotePort); + return new TunnelToNode() { + @Override + public void close() throws IOException { + tunnel.close(); + } + @Override + public int getLocalPort() { + return tunnel.getLocalPort(); + } + }; + } public void submitJob(long jobId) { try { @@ -245,9 +261,11 @@ public class HaaSClient { public JobInfo obtainJobInfo(long jobId) { try { final SubmittedJobInfoExt info = getJobManagement().getCurrentInfoForJob(jobId, getSessionID()); + final Collection<Long> tasksId = info.getTasks().getSubmittedTaskInfoExt().stream().map(ti -> ti.getId()) .collect(Collectors.toList()); return new JobInfo() { + private List<String> ips; @Override public Collection<Long> getTasks() { return tasksId; @@ -272,6 +290,18 @@ public class HaaSClient { public Calendar getCreationTime() { return toGregorian(info.getCreationTime()); }; + + @Override + public List<String> getNodesIPs() { + if(ips == null) { + try { + ips = getJobManagement().getAllocatedNodesIPs(jobId, getSessionID()).getString().stream().collect(Collectors.toList()); + } catch (RemoteException | ServiceException e) { + log.error(e.getMessage(), e); + } + } + return ips; + } }; } catch (RemoteException | ServiceException e) { throw new HaaSClientException(e); @@ -552,9 +582,4 @@ public class HaaSClient { return result; } - public List<String> getNodesIps(long id) throws RemoteException, ServiceException { - - return getJobManagement().getAllocatedNodesIPs(id, getSessionID()).getString().stream().collect(Collectors.toList()); - } - } diff --git a/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/JobInfo.java b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/JobInfo.java index b0868f76..07b7517e 100644 --- a/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/JobInfo.java +++ b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/JobInfo.java @@ -2,6 +2,7 @@ package cz.it4i.fiji.haas_java_client; import java.util.Calendar; import java.util.Collection; +import java.util.List; public interface JobInfo { @@ -15,4 +16,6 @@ public interface JobInfo { Calendar getEndTime(); Calendar getCreationTime(); + + List<String> getNodesIPs(); } diff --git a/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/MidlewareTunnel.java b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/MidlewareTunnel.java index 5d5890eb..a9eff0c5 100644 --- a/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/MidlewareTunnel.java +++ b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/MidlewareTunnel.java @@ -26,7 +26,7 @@ import cz.it4i.fiji.haas_java_client.proxy.DataTransferMethodExt; import cz.it4i.fiji.haas_java_client.proxy.DataTransferWs; import cz.it4i.fiji.haas_java_client.proxy.DataTransferWsSoap; -public class MidlewareTunnel implements Closeable { +class MidlewareTunnel implements Closeable { public static final Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas_java_client.MidlewareTunnel.class); @@ -64,7 +64,7 @@ public class MidlewareTunnel implements Closeable { open(0, port); } - public void open(int localport, int port) throws UnknownHostException, IOException { + public void open(int localport, int port) throws IOException { open(localport, port, DEFAULT_BACKLOG); } @@ -72,7 +72,7 @@ public class MidlewareTunnel implements Closeable { if (ss != null) { throw new IllegalStateException(); } - ss = new ServerSocket(localport, backlog, InetAddress.getByName("localhost")); + ss = new ServerSocket(localport, backlog, InetAddress.getLocalHost()); ss.setSoTimeout(TIMEOUT); mainLatch = new CountDownLatch(1); mainFuture = executorService.submit(() -> { diff --git a/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/TunnelToNode.java b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/TunnelToNode.java new file mode 100644 index 00000000..b9b34c67 --- /dev/null +++ b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/TunnelToNode.java @@ -0,0 +1,7 @@ +package cz.it4i.fiji.haas_java_client; + +import java.io.Closeable; + +public interface TunnelToNode extends Closeable{ + int getLocalPort(); +} diff --git a/haas-java-client/src/test/java/cz/it4i/fiji/haas_java_client/TestCommunicationWithNodes.java b/haas-java-client/src/test/java/cz/it4i/fiji/haas_java_client/TestCommunicationWithNodes.java index 6008a4a3..b66b2058 100644 --- a/haas-java-client/src/test/java/cz/it4i/fiji/haas_java_client/TestCommunicationWithNodes.java +++ b/haas-java-client/src/test/java/cz/it4i/fiji/haas_java_client/TestCommunicationWithNodes.java @@ -1,10 +1,7 @@ package cz.it4i.fiji.haas_java_client; import java.io.IOException; -import java.util.Collections; import java.util.Scanner; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import javax.xml.rpc.ServiceException; @@ -17,6 +14,7 @@ public class TestCommunicationWithNodes { private static String[] predefined = new String[2]; + @SuppressWarnings("resource") public static void main(String[] args) throws ServiceException, IOException, InterruptedException { predefined[0] = "POST /modules/'command:net.imagej.ops.math.PrimitiveMath$IntegerAdd'?process=false HTTP/1.1\r\n" + "Content-Type: application/json\r\n" + @@ -46,26 +44,15 @@ public class TestCommunicationWithNodes { log.info("" + client.obtainJobInfo(id).getState()); Thread.sleep(5000); } - - log.info("adresess " + client.getNodesIps(id)); - ExecutorService service = Executors.newCachedThreadPool(); - try(MidlewareTunnel tunnel = new MidlewareTunnel(service, id, client.getNodesIps(id).get(0), sessionID)) { - tunnel.open(8080, 8080); + String ip; + log.info("adresess " + (ip = client.obtainJobInfo(id).getNodesIPs().get(0))); + try(TunnelToNode tunnel = client.openTunnel( id, ip, 8080, 8080)) { log.info("localhost:" + tunnel.getLocalPort()); - Scanner sc = new Scanner(System.in); System.out.println("Press enter"); - sc.nextLine(); + new Scanner(System.in).nextLine(); } } - private static void logData(String direction, byte[] received) { - log.info(direction + " - " + new String(received)); - } - - private static long startJob(HaaSClient client) { - long id = client.createJob("Proof", 1, Collections.emptyList()); - client.submitJob(id); - return id; - } + } -- GitLab