diff --git a/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/MiddlewareTunnel.java b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/MiddlewareTunnel.java index a4a8fa1f01aad0a258b91ee2c59293ea97492c00..ae5add90329131dd8275e4ecdf55a435e977276d 100644 --- a/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/MiddlewareTunnel.java +++ b/haas-java-client/src/main/java/cz/it4i/fiji/haas_java_client/MiddlewareTunnel.java @@ -203,8 +203,13 @@ class MiddlewareTunnel implements Closeable { if (len == 0) { continue; } - if (!sendToMiddleware(buffer, len)) { - break; + try { + if (!sendToMiddleware(buffer, len)) { + break; + } + } + finally { + connection.dataSentNotify(); } if (log.isDebugEnabled()) { log.debug("send " + len + " bytes to middleware"); @@ -264,6 +269,7 @@ class MiddlewareTunnel implements Closeable { } final int reallySend = dataTransfer.writeDataToJobNode(sending, jobId, ipAddress, sessionCode, false); + if (reallySend == -1) { return false; } @@ -301,6 +307,12 @@ class MiddlewareTunnel implements Closeable { } private void readFromMiddleware(final P_Connection connection) { + try { + connection.waitForFirstDataSent(); + } + catch (InterruptedException exc1) { + return; + } if (log.isDebugEnabled()) { log.debug("START: readFromMiddleware"); } @@ -350,12 +362,12 @@ class MiddlewareTunnel implements Closeable { } } catch (final InterruptedIOException e) { - //ignore this + // ignore this } - catch(SocketException e) { - if(!e.getMessage().equals("Broken pipe (Write failed)")) { + catch (SocketException e) { + if (!e.getMessage().equals("Broken pipe (Write failed)")) { log.error(e.getMessage(), e); - } + } } catch (final IOException e) { log.error(e.getMessage(), e); @@ -384,6 +396,8 @@ class MiddlewareTunnel implements Closeable { private final CountDownLatch latchOfBothDirections = new CountDownLatch(2); + private final CountDownLatch dataSentFlag = new CountDownLatch(1); + public P_Connection(final Socket soc) { this.socket = soc; } @@ -436,6 +450,16 @@ class MiddlewareTunnel implements Closeable { stop(latchOfBothDirections); } + public void dataSentNotify() { + if (dataSentFlag.getCount() > 0) { + dataSentFlag.countDown(); + } + } + + public void waitForFirstDataSent() throws InterruptedException { + dataSentFlag.await(); + } + private void stop(final CountDownLatch localLatch) { for (final Thread thread : threads) { if (thread != null) {