Skip to content
Snippets Groups Projects
Commit 76dc5c57 authored by Jan Kožusznik's avatar Jan Kožusznik
Browse files

fix: send EOF to server, increase buffer size

parent f03c9131
Branches
No related tags found
No related merge requests found
......@@ -38,6 +38,8 @@ class MidlewareTunnel implements Closeable {
private static final long ZERO_COUNT_PAUSE = 500;
private static final int DEFAULT_BUFFER_SIZE = 1024 * 1024 * 32;
private final long jobId;
private ServerSocket ss;
......@@ -54,6 +56,10 @@ class MidlewareTunnel implements Closeable {
private final ExecutorService executorService;
private final int sendBufferData = DEFAULT_BUFFER_SIZE;
private P_Connection lastConnection;
public MidlewareTunnel(ExecutorService executorService, long jobId, String hostIp, String sessionCode) {
this.jobId = jobId;
this.dataTransfer = new DataTransferWs().getDataTransferWsSoap12();
......@@ -83,7 +89,16 @@ class MidlewareTunnel implements Closeable {
try {
while (!Thread.interrupted() && !ss.isClosed()) {
try (Socket soc = ss.accept()) {
DataTransferMethodExt transfer = dataTransfer.getDataTransferMethod(ipAddress, port, jobId,
sessionCode);
doTransfer(soc, port);
if (log.isDebugEnabled()) {
log.debug("endDataTransfer");
}
dataTransfer.endDataTransfer(transfer, sessionCode);
if (log.isDebugEnabled()) {
log.debug("endDataTransfer - DONE");
}
} catch (SocketTimeoutException e) {
// ignore and check interruption
} catch (IOException e) {
......@@ -95,6 +110,7 @@ class MidlewareTunnel implements Closeable {
if (log.isDebugEnabled()) {
log.debug("MiddlewareTunnel - interrupted");
}
mainLatch.countDown();
}
});
......@@ -126,20 +142,15 @@ class MidlewareTunnel implements Closeable {
private void doTransfer(Socket soc, int port) {
log.debug("START: doTransfer");
DataTransferMethodExt transfer = dataTransfer.getDataTransferMethod(ipAddress, port, jobId, sessionCode);
P_Connection connection = new P_Connection(soc);
connection.setClientHandler(c -> sendToMiddleware(c));
connection.setServerHandler(c -> readFromMiddleware(c));
connection.establish();
if (log.isDebugEnabled()) {
log.debug("END: doTransfer");
}
if (log.isDebugEnabled()) {
log.debug("endDataTransfer");
if(lastConnection != null) {
lastConnection.finishIfNeeded();
}
dataTransfer.endDataTransfer(transfer, sessionCode);
lastConnection = new P_Connection(soc);
lastConnection.setClientHandler(c -> sendToMiddleware(c));
lastConnection.setServerHandler(c -> readFromMiddleware(c));
lastConnection.establish();
if (log.isDebugEnabled()) {
log.debug("endDataTransfer - DONE");
log.debug("END: doTransfer");
}
}
......@@ -150,15 +161,13 @@ class MidlewareTunnel implements Closeable {
try {
InputStream is = Channels.newInputStream(Channels.newChannel(connection.getSocket().getInputStream()));
int len;
byte[] buffer = new byte[4096];
while (!connection.isServerClosed() && -1 != (len = is.read(buffer))) {
if (connection.isServerClosed()) {
break;
}
byte[] buffer = new byte[sendBufferData];
while (-1 != (len = is.read(buffer))) {
if (len == 0) {
continue;
}
if (!sendToMiddleware(buffer, len, connection)) {
if (!sendToMiddleware(buffer, len)) {
break;
}
if (log.isDebugEnabled()) {
......@@ -167,8 +176,7 @@ class MidlewareTunnel implements Closeable {
}
}
connection.clientClosed();
sendEOF2Middleware();
} catch (InterruptedIOException e) {
return;
} catch (SocketException e) {
......@@ -184,12 +192,16 @@ class MidlewareTunnel implements Closeable {
}
}
private boolean sendToMiddleware(byte[] buffer, int len, P_Connection connection) {
private void sendEOF2Middleware() {
dataTransfer.sendDataToJobNode(null, jobId, ipAddress, sessionCode);
}
private boolean sendToMiddleware(byte[] buffer, int len) {
byte[] sending;
int toSend = len;
int offset = 0;
int zeroCounter = 0;
do {
while (toSend != 0) {
if (toSend != buffer.length || offset != 0) {
sending = new byte[toSend];
System.arraycopy(buffer, offset, sending, 0, toSend);
......@@ -219,7 +231,8 @@ class MidlewareTunnel implements Closeable {
} else {
zeroCounter = 0;
}
} while (toSend != 0 && !connection.isServerClosed());
}
;
return true;
}
......@@ -227,15 +240,11 @@ class MidlewareTunnel implements Closeable {
if (log.isDebugEnabled()) {
log.debug("START: readFromMiddleware");
}
try {
OutputStream os = connection.getSocket().getOutputStream();
try (OutputStream os = connection.getSocket().getOutputStream()) {
byte[] received = null;
int zeroCounter = 0;
while (!connection.isClientClosed()
&& null != (received = dataTransfer.readDataFromJobNode(jobId, ipAddress, sessionCode))) {
if (connection.isClientClosed()) {
break;
}
while (null != (received = dataTransfer.readDataFromJobNode(jobId, ipAddress, sessionCode))) {
if (received.length > 0) {
os.write(received);
os.flush();
......@@ -262,8 +271,6 @@ class MidlewareTunnel implements Closeable {
}
}
}
os.flush();
connection.serverClosed();
} catch (InterruptedIOException e) {
return;
} catch (IOException e) {
......@@ -284,6 +291,10 @@ class MidlewareTunnel implements Closeable {
private final Future<?>[] futures = new Future[2];
private final CountDownLatch latchFromClient = new CountDownLatch(1);
private final CountDownLatch latchOfBothDirections = new CountDownLatch(2);
public P_Connection(Socket soc) {
this.socket = soc;
}
......@@ -300,41 +311,32 @@ class MidlewareTunnel implements Closeable {
return socket;
}
public void clientClosed() {
setClosed(FROM_CLIENT);
}
public void serverClosed() {
setClosed(FROM_SERVER);
}
public boolean isClientClosed() {
return isClosed(FROM_CLIENT);
}
public boolean isServerClosed() {
return isClosed(FROM_SERVER);
}
public void establish() {
CountDownLatch localLatch = new CountDownLatch(2);
for (int i = 0; i < runnable.length; i++) {
int final_i = i;
futures[i] = executorService.submit(() -> {
runnable[final_i].run();
localLatch.countDown();
if(final_i == FROM_CLIENT) {
latchFromClient.countDown();
}
latchOfBothDirections.countDown();
return null;
});
}
try {
localLatch.await();
latchFromClient.await();
} catch (InterruptedException e) {
stop(localLatch);
stop(latchOfBothDirections);
Thread.currentThread().interrupt();
}
}
public void finishIfNeeded() {
stop(latchOfBothDirections);
}
private void stop(CountDownLatch localLatch) {
for (Future<?> thread : futures) {
thread.cancel(true);
......@@ -346,12 +348,6 @@ class MidlewareTunnel implements Closeable {
}
}
private boolean isClosed(int type) {
return Thread.interrupted();
}
private void setClosed(int type) {
futures[(type + 1) % 2].cancel(true);
}
}
}
\ No newline at end of file
......@@ -33,7 +33,7 @@ public class TestCommunicationWithNodes {
Settings settings = SettingsProvider.getSettings(6l, 3600, 7l, "OPEN-12-20",
TestingConstants.CONFIGURATION_FILE_NAME);
HaaSClient client = new HaaSClient(settings);
long id = 362;//startJob(client);
long id = 376;//client.createJob("New job", Collections.emptyList());
String sessionID = client.getSessionID();
log.info(id + " - " + client.obtainJobInfo(id).getState() + " - " + sessionID);
if(client.obtainJobInfo(id).getState() != JobState.Running && client.obtainJobInfo(id).getState() != JobState.Queued) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment