Commit 727fab1f authored by Vladimír Ulman's avatar Vladimír Ulman
Browse files

S WIP 2/3: ADD aux/NetMessagesProcessor with code DEL from cmdFromNetwork,

           CHG in cmdFromNetwork and fromFlightRecorder to acknoledge the new
           code, also COS in reporting and exceptions handling
parent a01a7e8a
......@@ -5,31 +5,49 @@ import java.nio.file.Paths;
import java.io.IOException;
import java.util.ListIterator;
import de.mpicbg.ulman.simviewer.aux.NetMessagesProcessor;
/**
* Operates on a given file that consists of messages that could be normally
* transferred over the network but were instead saved into this file.
* Owing to this class, the messages can be extracted from the file, and
* can be injected to the NetworkScene as if the messages would come over
* the network. This way, a simulation can be replayed (even backwards!)
* be processed with the NetMessagesProcessor to update the SimViewer's
* displayed content. This way, a simulation can be replayed (even backwards!)
* provided the file's content shows a history of some simulation -- in
* which case the individual time points are separated with the "tick"
* messages.
*
* The class's API is synchronized on this object so multiple callers may
* operate (request to replay previous or next time point) on this object.
*
* Most of the API here may throw InterruptedException which is because all
* the code here boils down to calling NetMessagesProcessor's processMsg(),
* which may want to wait a little while under some circumstances and may
* get interrupted while doing so -- this gets propagated upstream.
*
* This file was created and is being developed by Vladimir Ulman, 2019.
*/
public class FlightRecorder
public class CommandFromFlightRecorder
{
/** constructor to create connection to a displayed window */
public FlightRecorder(final NetworkScene ns)
/** constructor to link to a shared NetMessagesProcessor
(that connects this 'commander' to the displayed window */
public
CommandFromFlightRecorder(final NetMessagesProcessor nmp)
{
//keep the reference
netCommandsProcessor = ns;
netMsgProcessor = nmp;
}
/** reference on the messages processor */
private
final NetMessagesProcessor netMsgProcessor;
//--------------------------------------------
/** setup the reading of the FRfilename (FR = Flight Recording) */
public synchronized
void open(final String FRfilename)
throws IOException
throws IOException, InterruptedException
{
nextMsgs = null;
//NB: stays null if the consequent operations should fail
......@@ -38,10 +56,6 @@ public class FlightRecorder
sendNextTimepointMessages();
}
/** reference on the processor of the network (display) commands */
private
final NetworkScene netCommandsProcessor;
/** should always point just before the first message of a time point,
this time point is termed as the "next time point"; that often
means that this iterator should point just after some "tick" message */
......@@ -51,10 +65,11 @@ public class FlightRecorder
//--------------------------------------------
/** extracts the messages from the current timepoint up to the next one,
and sends the messages to the associated NetworkScene; returns true
and sends the messages to the associated NetMessagesProcessor; returns true
if the operation was successful */
public synchronized
boolean sendNextTimepointMessages()
throws InterruptedException
{
if (nextMsgs == null) return false; //stop if the no file is opened
......@@ -63,7 +78,7 @@ public class FlightRecorder
while (nextMsgs.hasNext() && readNextMsg)
{
final String msg = nextMsgs.next();
netCommandsProcessor.processMsg(msg);
netMsgProcessor.processMsg(msg);
if (msg.startsWith("v1 tick")) readNextMsg = false;
}
......@@ -73,9 +88,10 @@ public class FlightRecorder
/** extracts the messages from the previous timepoint (that is the time point
that is just before this one, which was just replayed), and sends the messages
to the associated NetworkScene; returns true if the operation was successful */
to the associated NetMessagesProcessor; returns true if the operation was successful */
public synchronized
boolean sendPrevTimepointMessages()
throws InterruptedException
{
if (nextMsgs == null) return false; //stop if the no file is opened
......@@ -98,6 +114,7 @@ public class FlightRecorder
returns true if the operation was successful */
public synchronized
boolean rewindAndSendFirstTimepoint()
throws InterruptedException
{
if (nextMsgs == null) return false; //stop if the no file is opened
......@@ -111,6 +128,7 @@ public class FlightRecorder
returns true if the operation was successful */
public synchronized
boolean rewindAndSendLastTimepoint()
throws InterruptedException
{
if (nextMsgs == null) return false; //stop if the no file is opened
......
package de.mpicbg.ulman.simviewer;
import java.util.Locale;
import java.util.Scanner;
import org.zeromq.SocketType;
import org.zeromq.ZMQ;
import org.zeromq.ZMQException;
import de.mpicbg.ulman.simviewer.aux.Point;
import de.mpicbg.ulman.simviewer.aux.Line;
import de.mpicbg.ulman.simviewer.aux.Vector;
import de.mpicbg.ulman.simviewer.aux.NetMessagesProcessor;
/**
* Adapted from TexturedCubeJavaExample.java from the scenery project,
* originally created by kharrington on 7/6/16.
* Operates on a network socket and listens for incoming messages.
* Owing to this class, the messages can be extracted from the network, and
* be processed with the NetMessagesProcessor to update the SimViewer's
* displayed content. This way, an online view on the current state of
* a running simulation can be observed provided the simulator is "broadcasting"
* the relevant messages (including the "tick" messages to denote between
* individual simulated time points).
*
* This file was created and is being developed by Vladimir Ulman, 2018.
*/
public class NetworkScene implements Runnable
public class CommandFromNetwork implements Runnable
{
final int listenOnPort;
/** constructor to create connection (listening at the 8765 port) to a displayed window */
public NetworkScene(final DisplayScene _scene)
/** constructor to create connection (listening at the 8765 port),
and link it to a shared NetMessagesProcessor (that connects this
'commander' to the displayed window */
public CommandFromNetwork(final NetMessagesProcessor nmp)
{
scene = _scene;
netMsgProcessor = nmp;
listenOnPort = 8765;
}
/** constructor to create connection (listening at the given port) to a displayed window */
public NetworkScene(final DisplayScene _scene, final int _port)
/** constructor to create connection (listening at the given port),
and link it to a shared NetMessagesProcessor (that connects this
'commander' to the displayed window */
public CommandFromNetwork(final NetMessagesProcessor nmp, final int _port)
{
scene = _scene;
netMsgProcessor = nmp;
listenOnPort = _port;
}
/** reference on the controlled rendering display */
private final DisplayScene scene;
/** reference on the messages processor */
private
final NetMessagesProcessor netMsgProcessor;
/** the port to listen at */
private
final int listenOnPort;
/** reads the console and dispatches the commands */
//--------------------------------------------
/** listens on the network and dispatches the commands */
public void run()
{
//start receiver in an infinite loop
......@@ -49,7 +57,6 @@ public class NetworkScene implements Runnable
final ZMQ.Context zmqContext = ZMQ.context(1);
ZMQ.Socket socket = null;
try {
//socket = zmqContext.socket(ZMQ.PAIR); //NB: CLIENT/SERVER from v4.2 is not available yet
socket = zmqContext.socket(SocketType.PAIR);
if (socket == null)
throw new Exception("Network listener: Cannot obtain local socket.");
......@@ -59,13 +66,13 @@ public class NetworkScene implements Runnable
socket.bind("tcp://*:"+listenOnPort);
//the incoming data buffer
String msg = null;
String msg;
while (true)
{
msg = socket.recvStr(ZMQ.NOBLOCK);
if (msg != null)
processMsg(msg);
netMsgProcessor.processMsg(msg);
else
Thread.sleep(1000);
}
......@@ -74,7 +81,7 @@ public class NetworkScene implements Runnable
System.out.println("Network listener: Crashed with ZeroMQ error: " + e.getMessage());
}
catch (InterruptedException e) {
//System.out.println("Network listener: Stopped.");
System.out.println("Network listener: Interrupted.");
}
catch (Exception e) {
System.out.println("Network listener: Error: " + e.getMessage());
......@@ -92,224 +99,4 @@ public class NetworkScene implements Runnable
System.out.println("Network listener: Stopped.");
}
}
//not private, so anyone around can use it (esp. the FlightRecorder)
void processMsg(final String msg)
{
try {
if (msg.startsWith("v1 points")) processPoints(msg);
else
if (msg.startsWith("v1 lines")) processLines(msg);
else
if (msg.startsWith("v1 vectors")) processVectors(msg);
else
if (msg.startsWith("v1 triangles")) processTriangles(msg);
else
if (msg.startsWith("v1 tick")) processTickMessage(msg.substring(8));
else
System.out.println("Don't understand this msg: "+msg);
}
catch (java.util.InputMismatchException e) {
System.out.println("Parsing error: " + e.getMessage());
}
}
private
void processPoints(final String msg)
{
Scanner s = new Scanner(msg).useLocale(Locale.ENGLISH);
//System.out.println("processing point msg: "+msg);
//this skips the "v1 points" - the two tokens
s.next();
s.next();
final int N = s.nextInt();
if (N > 10) scene.suspendNodesUpdating();
//is the next token 'dim'?
if (s.next("dim").startsWith("dim") == false)
{
System.out.println("Don't understand this msg: "+msg);
s.close();
return;
}
//so the next token is dimensionality of the points
final int D = s.nextInt();
//now, point by point is reported
final Point p = new Point();
int ID = 0;
for (int n=0; n < N; ++n)
{
//extract the point ID
ID = s.nextInt();
//now read and save coordinates
int d=0;
for (; d < D && d < 3; ++d) p.centre.set(d, s.nextFloat());
//read possibly remaining coordinates (for which we have no room to store them)
for (; d < D; ++d) s.nextFloat();
//NB: all points in the same message (in this function call) are of the same dimensionality
p.radius.set(0, s.nextFloat());
p.radius.set(1, p.radius.x());
p.radius.set(2, p.radius.x());
p.color = s.nextInt();
scene.addUpdateOrRemovePoint(ID,p);
}
s.close();
if (N > 10) scene.resumeNodesUpdating();
}
private
void processLines(final String msg)
{
Scanner s = new Scanner(msg).useLocale(Locale.ENGLISH);
//System.out.println("processing point msg: "+msg);
//this skips the "v1 lines" - the two tokens
s.next();
s.next();
final int N = s.nextInt();
if (N > 10) scene.suspendNodesUpdating();
//is the next token 'dim'?
if (s.next("dim").startsWith("dim") == false)
{
System.out.println("Don't understand this msg: "+msg);
s.close();
return;
}
//so the next token is dimensionality of the points
final int D = s.nextInt();
//now, point pair by pair is reported
final Line l = new Line();
int ID = 0;
for (int n=0; n < N; ++n)
{
//extract the point ID
ID = s.nextInt();
//now read the first in the pair and save coordinates
int d=0;
for (; d < D && d < 3; ++d) l.posA.set(d, s.nextFloat());
//read possibly remaining coordinates (for which we have no room to store them)
for (; d < D; ++d) s.nextFloat();
//now read the second in the pair and save sizes
d=0;
for (; d < D && d < 3; ++d) l.posB.set(d, s.nextFloat());
//read possibly remaining coordinates (for which we have no room to store them)
for (; d < D; ++d) s.nextFloat();
l.color = s.nextInt();
scene.addUpdateOrRemoveLine(ID,l);
}
s.close();
if (N > 10) scene.resumeNodesUpdating();
}
private
void processVectors(final String msg)
{
Scanner s = new Scanner(msg).useLocale(Locale.ENGLISH);
//System.out.println("processing point msg: "+msg);
//this skips the "v1 vectors" - the two tokens
s.next();
s.next();
final int N = s.nextInt();
if (N > 10) scene.suspendNodesUpdating();
//is the next token 'dim'?
if (s.next("dim").startsWith("dim") == false)
{
System.out.println("Don't understand this msg: "+msg);
s.close();
return;
}
//so the next token is dimensionality of the points
final int D = s.nextInt();
//now, point pair by pair is reported
final Vector v = new Vector();
int ID = 0;
for (int n=0; n < N; ++n)
{
//extract the point ID
ID = s.nextInt();
//now read the first in the pair and save coordinates
int d=0;
for (; d < D && d < 3; ++d) v.base.set(d, s.nextFloat());
//read possibly remaining coordinates (for which we have no room to store them)
for (; d < D; ++d) s.nextFloat();
//now read the second in the pair and save sizes
d=0;
for (; d < D && d < 3; ++d) v.vector.set(d, s.nextFloat());
//read possibly remaining coordinates (for which we have no room to store them)
for (; d < D; ++d) s.nextFloat();
v.color = s.nextInt();
scene.addUpdateOrRemoveVector(ID,v);
}
s.close();
if (N > 10) scene.resumeNodesUpdating();
}
private
void processTriangles(final String msg)
{
System.out.println("not implemented yet: "+msg);
}
/** this is a general (free format) message, which is assumed
to be sent typically after one simulation round is over */
private
void processTickMessage(final String msg)
{
System.out.println("Got tick message: "+msg);
//check if we should save the screen
if (scene.savingScreenshots)
{
//give scenery some grace time to redraw everything
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("But continuing with the processing....");
}
scene.saveNextScreenshot();
}
if (scene.garbageCollecting) scene.garbageCollect();
scene.increaseTickCounter();
}
}
package de.mpicbg.ulman.simviewer.aux;
import java.util.Locale;
import java.util.Scanner;
import de.mpicbg.ulman.simviewer.DisplayScene;
/**
* A class to parse the messages according the "network-protocol" and
* to command the SimViewer consequently. The "network-protocol" consists
* of commands to draw, update, or delete primitive graphics such as points
* or lines. The protocol is the best defined in the EmbryoGen simulator's
* code, see the file DisplayUnits/SceneryDisplayUnit.cpp in there. The protocol
* is utilized, e.g., in the CommandFromNetwork and CommandFromFlightRecorder
* classes.
*
* Since the DisplayScene's API for updating the displayed graphics was not
* designed for concurrent access, the callers have to synchronize explicitly
* among themselves. This is why, the methods of this class are synchronized
* on this object.
*
* This file was created and is being developed by Vladimir Ulman, 2019.
*/
public class NetMessagesProcessor
{
/** constructor to store the connection to a displayed window
that shall be commanded from the incoming messages */
public NetMessagesProcessor(final DisplayScene _scene)
{
scene = _scene;
}
/** the entry function to process the incoming message; since the "tick" message
may trigger a short waiting before a screen shot of the commanding window is
requested (see the code of the processTickMessage()) and the waiting can be
interrupted, this method may throw an InterruptedException */
public synchronized
void processMsg(final String msg)
throws InterruptedException
{
try {
if (msg.startsWith("v1 points")) processPoints(msg);
else
if (msg.startsWith("v1 lines")) processLines(msg);
else
if (msg.startsWith("v1 vectors")) processVectors(msg);
else
if (msg.startsWith("v1 triangles")) processTriangles(msg);
else
if (msg.startsWith("v1 tick")) processTickMessage(msg.substring(8));
else
System.out.println("NetMessagesProcessor: Don't understand this msg: "+msg);
}
catch (java.util.InputMismatchException e) {
System.out.println("NetMessagesProcessor: Parsing error: " + e.getMessage());
}
}
//----------------------------------------------------------------------------
/** reference on the controlled rendering display */
private final DisplayScene scene;
private
void processPoints(final String msg)
{
Scanner s = new Scanner(msg).useLocale(Locale.ENGLISH);
//System.out.println("processing point msg: "+msg);
//this skips the "v1 points" - the two tokens
s.next();
s.next();
final int N = s.nextInt();
if (N > 10) scene.suspendNodesUpdating();
//is the next token 'dim'?
if (s.next("dim").startsWith("dim") == false)
{
System.out.println("NetMessagesProcessor: Don't understand this msg: "+msg);
s.close();
return;
}
//so the next token is dimensionality of the points
final int D = s.nextInt();
//now, point by point is reported
final Point p = new Point();
int ID = 0;
for (int n=0; n < N; ++n)
{
//extract the point ID
ID = s.nextInt();
//now read and save coordinates
int d=0;
for (; d < D && d < 3; ++d) p.centre.set(d, s.nextFloat());
//read possibly remaining coordinates (for which we have no room to store them)
for (; d < D; ++d) s.nextFloat();
//NB: all points in the same message (in this function call) are of the same dimensionality
p.radius.set(0, s.nextFloat());
p.radius.set(1, p.radius.x());
p.radius.set(2, p.radius.x());
p.color = s.nextInt();
scene.addUpdateOrRemovePoint(ID,p);
}
s.close();
if (N > 10) scene.resumeNodesUpdating();
}
private
void processLines(final String msg)
{
Scanner s = new Scanner(msg).useLocale(Locale.ENGLISH);
//System.out.println("processing point msg: "+msg);
//this skips the "v1 lines" - the two tokens
s.next();
s.next();
final int N = s.nextInt();
if (N > 10) scene.suspendNodesUpdating();
//is the next token 'dim'?
if (s.next("dim").startsWith("dim") == false)
{
System.out.println("NetMessagesProcessor: Don't understand this msg: "+msg);
s.close();
return;
}
//so the next token is dimensionality of the points
final int D = s.nextInt();