Skip to content
Snippets Groups Projects
Commit a80dbff2 authored by Jan Kožusznik's avatar Jan Kožusznik
Browse files

feature: integrate tunnel with HaaSClient

parent 26883aa3
Branches
Tags
No related merge requests found
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>cz.it4i.fiji</groupId> <groupId>cz.it4i.fiji</groupId>
<artifactId>haas-java-client</artifactId> <artifactId>haas-java-client</artifactId>
<version>0.0.3-SNAPSHOT</version> <version>0.0.4-SNAPSHOT</version>
<name>HaaS library for Java</name> <name>HaaS library for Java</name>
<packaging>jar</packaging> <packaging>jar</packaging>
<properties> <properties>
......
...@@ -16,6 +16,7 @@ import java.util.List; ...@@ -16,6 +16,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;
...@@ -233,6 +234,21 @@ public class HaaSClient { ...@@ -233,6 +234,21 @@ public class HaaSClient {
public HaaSFileTransfer startFileTransfer(long jobId) { public HaaSFileTransfer startFileTransfer(long jobId) {
return startFileTransfer(jobId, DUMMY_TRANSFER_FILE_PROGRESS); return startFileTransfer(jobId, DUMMY_TRANSFER_FILE_PROGRESS);
} }
public TunnelToNode openTunnel(long jobId, String nodeIP, int localPort, int remotePort) throws ServiceException, IOException {
MidlewareTunnel tunnel = new MidlewareTunnel(Executors.newCachedThreadPool(), jobId, nodeIP, getSessionID());
tunnel.open(localPort, remotePort);
return new TunnelToNode() {
@Override
public void close() throws IOException {
tunnel.close();
}
@Override
public int getLocalPort() {
return tunnel.getLocalPort();
}
};
}
public void submitJob(long jobId) { public void submitJob(long jobId) {
try { try {
...@@ -245,9 +261,11 @@ public class HaaSClient { ...@@ -245,9 +261,11 @@ public class HaaSClient {
public JobInfo obtainJobInfo(long jobId) { public JobInfo obtainJobInfo(long jobId) {
try { try {
final SubmittedJobInfoExt info = getJobManagement().getCurrentInfoForJob(jobId, getSessionID()); final SubmittedJobInfoExt info = getJobManagement().getCurrentInfoForJob(jobId, getSessionID());
final Collection<Long> tasksId = info.getTasks().getSubmittedTaskInfoExt().stream().map(ti -> ti.getId()) final Collection<Long> tasksId = info.getTasks().getSubmittedTaskInfoExt().stream().map(ti -> ti.getId())
.collect(Collectors.toList()); .collect(Collectors.toList());
return new JobInfo() { return new JobInfo() {
private List<String> ips;
@Override @Override
public Collection<Long> getTasks() { public Collection<Long> getTasks() {
return tasksId; return tasksId;
...@@ -272,6 +290,18 @@ public class HaaSClient { ...@@ -272,6 +290,18 @@ public class HaaSClient {
public Calendar getCreationTime() { public Calendar getCreationTime() {
return toGregorian(info.getCreationTime()); return toGregorian(info.getCreationTime());
}; };
@Override
public List<String> getNodesIPs() {
if(ips == null) {
try {
ips = getJobManagement().getAllocatedNodesIPs(jobId, getSessionID()).getString().stream().collect(Collectors.toList());
} catch (RemoteException | ServiceException e) {
log.error(e.getMessage(), e);
}
}
return ips;
}
}; };
} catch (RemoteException | ServiceException e) { } catch (RemoteException | ServiceException e) {
throw new HaaSClientException(e); throw new HaaSClientException(e);
...@@ -552,9 +582,4 @@ public class HaaSClient { ...@@ -552,9 +582,4 @@ public class HaaSClient {
return result; return result;
} }
public List<String> getNodesIps(long id) throws RemoteException, ServiceException {
return getJobManagement().getAllocatedNodesIPs(id, getSessionID()).getString().stream().collect(Collectors.toList());
}
} }
...@@ -2,6 +2,7 @@ package cz.it4i.fiji.haas_java_client; ...@@ -2,6 +2,7 @@ package cz.it4i.fiji.haas_java_client;
import java.util.Calendar; import java.util.Calendar;
import java.util.Collection; import java.util.Collection;
import java.util.List;
public interface JobInfo { public interface JobInfo {
...@@ -15,4 +16,6 @@ public interface JobInfo { ...@@ -15,4 +16,6 @@ public interface JobInfo {
Calendar getEndTime(); Calendar getEndTime();
Calendar getCreationTime(); Calendar getCreationTime();
List<String> getNodesIPs();
} }
...@@ -26,7 +26,7 @@ import cz.it4i.fiji.haas_java_client.proxy.DataTransferMethodExt; ...@@ -26,7 +26,7 @@ import cz.it4i.fiji.haas_java_client.proxy.DataTransferMethodExt;
import cz.it4i.fiji.haas_java_client.proxy.DataTransferWs; import cz.it4i.fiji.haas_java_client.proxy.DataTransferWs;
import cz.it4i.fiji.haas_java_client.proxy.DataTransferWsSoap; import cz.it4i.fiji.haas_java_client.proxy.DataTransferWsSoap;
public class MidlewareTunnel implements Closeable { class MidlewareTunnel implements Closeable {
public static final Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas_java_client.MidlewareTunnel.class); public static final Logger log = LoggerFactory.getLogger(cz.it4i.fiji.haas_java_client.MidlewareTunnel.class);
...@@ -64,7 +64,7 @@ public class MidlewareTunnel implements Closeable { ...@@ -64,7 +64,7 @@ public class MidlewareTunnel implements Closeable {
open(0, port); open(0, port);
} }
public void open(int localport, int port) throws UnknownHostException, IOException { public void open(int localport, int port) throws IOException {
open(localport, port, DEFAULT_BACKLOG); open(localport, port, DEFAULT_BACKLOG);
} }
...@@ -72,7 +72,7 @@ public class MidlewareTunnel implements Closeable { ...@@ -72,7 +72,7 @@ public class MidlewareTunnel implements Closeable {
if (ss != null) { if (ss != null) {
throw new IllegalStateException(); throw new IllegalStateException();
} }
ss = new ServerSocket(localport, backlog, InetAddress.getByName("localhost")); ss = new ServerSocket(localport, backlog, InetAddress.getLocalHost());
ss.setSoTimeout(TIMEOUT); ss.setSoTimeout(TIMEOUT);
mainLatch = new CountDownLatch(1); mainLatch = new CountDownLatch(1);
mainFuture = executorService.submit(() -> { mainFuture = executorService.submit(() -> {
......
package cz.it4i.fiji.haas_java_client;
import java.io.Closeable;
public interface TunnelToNode extends Closeable{
int getLocalPort();
}
package cz.it4i.fiji.haas_java_client; package cz.it4i.fiji.haas_java_client;
import java.io.IOException; import java.io.IOException;
import java.util.Collections;
import java.util.Scanner; import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.xml.rpc.ServiceException; import javax.xml.rpc.ServiceException;
...@@ -17,6 +14,7 @@ public class TestCommunicationWithNodes { ...@@ -17,6 +14,7 @@ public class TestCommunicationWithNodes {
private static String[] predefined = new String[2]; private static String[] predefined = new String[2];
@SuppressWarnings("resource")
public static void main(String[] args) throws ServiceException, IOException, InterruptedException { public static void main(String[] args) throws ServiceException, IOException, InterruptedException {
predefined[0] = "POST /modules/'command:net.imagej.ops.math.PrimitiveMath$IntegerAdd'?process=false HTTP/1.1\r\n" + predefined[0] = "POST /modules/'command:net.imagej.ops.math.PrimitiveMath$IntegerAdd'?process=false HTTP/1.1\r\n" +
"Content-Type: application/json\r\n" + "Content-Type: application/json\r\n" +
...@@ -46,26 +44,15 @@ public class TestCommunicationWithNodes { ...@@ -46,26 +44,15 @@ public class TestCommunicationWithNodes {
log.info("" + client.obtainJobInfo(id).getState()); log.info("" + client.obtainJobInfo(id).getState());
Thread.sleep(5000); Thread.sleep(5000);
} }
String ip;
log.info("adresess " + client.getNodesIps(id)); log.info("adresess " + (ip = client.obtainJobInfo(id).getNodesIPs().get(0)));
ExecutorService service = Executors.newCachedThreadPool(); try(TunnelToNode tunnel = client.openTunnel( id, ip, 8080, 8080)) {
try(MidlewareTunnel tunnel = new MidlewareTunnel(service, id, client.getNodesIps(id).get(0), sessionID)) {
tunnel.open(8080, 8080);
log.info("localhost:" + tunnel.getLocalPort()); log.info("localhost:" + tunnel.getLocalPort());
Scanner sc = new Scanner(System.in);
System.out.println("Press enter"); System.out.println("Press enter");
sc.nextLine(); new Scanner(System.in).nextLine();
} }
} }
private static void logData(String direction, byte[] received) {
log.info(direction + " - " + new String(received));
}
private static long startJob(HaaSClient client) {
long id = client.createJob("Proof", 1, Collections.emptyList());
client.submitJob(id);
return id;
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment