Skip to content
Snippets Groups Projects
Commit 9a2dec80 authored by Jan Kožusznik's avatar Jan Kožusznik
Browse files

feature: write to middleware before read

parent f94a49ec
Branches
Tags
No related merge requests found
...@@ -203,8 +203,13 @@ class MiddlewareTunnel implements Closeable { ...@@ -203,8 +203,13 @@ class MiddlewareTunnel implements Closeable {
if (len == 0) { if (len == 0) {
continue; continue;
} }
if (!sendToMiddleware(buffer, len)) { try {
break; if (!sendToMiddleware(buffer, len)) {
break;
}
}
finally {
connection.dataSentNotify();
} }
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("send " + len + " bytes to middleware"); log.debug("send " + len + " bytes to middleware");
...@@ -264,6 +269,7 @@ class MiddlewareTunnel implements Closeable { ...@@ -264,6 +269,7 @@ class MiddlewareTunnel implements Closeable {
} }
final int reallySend = dataTransfer.writeDataToJobNode(sending, jobId, final int reallySend = dataTransfer.writeDataToJobNode(sending, jobId,
ipAddress, sessionCode, false); ipAddress, sessionCode, false);
if (reallySend == -1) { if (reallySend == -1) {
return false; return false;
} }
...@@ -301,6 +307,12 @@ class MiddlewareTunnel implements Closeable { ...@@ -301,6 +307,12 @@ class MiddlewareTunnel implements Closeable {
} }
private void readFromMiddleware(final P_Connection connection) { private void readFromMiddleware(final P_Connection connection) {
try {
connection.waitForFirstDataSent();
}
catch (InterruptedException exc1) {
return;
}
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("START: readFromMiddleware"); log.debug("START: readFromMiddleware");
} }
...@@ -350,12 +362,12 @@ class MiddlewareTunnel implements Closeable { ...@@ -350,12 +362,12 @@ class MiddlewareTunnel implements Closeable {
} }
} }
catch (final InterruptedIOException e) { catch (final InterruptedIOException e) {
//ignore this // ignore this
} }
catch(SocketException e) { catch (SocketException e) {
if(!e.getMessage().equals("Broken pipe (Write failed)")) { if (!e.getMessage().equals("Broken pipe (Write failed)")) {
log.error(e.getMessage(), e); log.error(e.getMessage(), e);
} }
} }
catch (final IOException e) { catch (final IOException e) {
log.error(e.getMessage(), e); log.error(e.getMessage(), e);
...@@ -384,6 +396,8 @@ class MiddlewareTunnel implements Closeable { ...@@ -384,6 +396,8 @@ class MiddlewareTunnel implements Closeable {
private final CountDownLatch latchOfBothDirections = new CountDownLatch(2); private final CountDownLatch latchOfBothDirections = new CountDownLatch(2);
private final CountDownLatch dataSentFlag = new CountDownLatch(1);
public P_Connection(final Socket soc) { public P_Connection(final Socket soc) {
this.socket = soc; this.socket = soc;
} }
...@@ -436,6 +450,16 @@ class MiddlewareTunnel implements Closeable { ...@@ -436,6 +450,16 @@ class MiddlewareTunnel implements Closeable {
stop(latchOfBothDirections); stop(latchOfBothDirections);
} }
public void dataSentNotify() {
if (dataSentFlag.getCount() > 0) {
dataSentFlag.countDown();
}
}
public void waitForFirstDataSent() throws InterruptedException {
dataSentFlag.await();
}
private void stop(final CountDownLatch localLatch) { private void stop(final CountDownLatch localLatch) {
for (final Thread thread : threads) { for (final Thread thread : threads) {
if (thread != null) { if (thread != null) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment