Commit b4bec0df authored by Stanislav Bohm's avatar Stanislav Bohm

ENH+RF: Refactoring of network layer + introduction of libloomnet

parent ab49d5b1
......@@ -43,7 +43,6 @@ message WorkerCommand {
// SEND
optional string address = 10;
optional bool with_size = 11;
// DICTIONARY
repeated string symbols = 100;
......@@ -69,18 +68,10 @@ message Announce {
required int32 port = 1;
}
message DataPrologue {
message DataHeader {
required int32 id = 1;
optional uint64 data_size = 3;
}
message Data
{
required int32 type_id = 1;
required uint64 size = 2;
optional uint64 length = 3;
optional uint64 arg0_u64 = 8;
optional uint64 arg1_u64 = 9;
required int32 type_id = 3;
required int64 n_messages = 2;
}
message Event {
......@@ -110,7 +101,7 @@ message ClientMessage {
DICTIONARY = 4;
}
required Type type = 1;
optional DataPrologue data = 2;
optional DataHeader data = 2;
optional Event event = 3;
optional Error error = 4;
repeated string symbols = 5;
......
......@@ -15,3 +15,11 @@ protoc loomcomm.proto --python_out=${PYTHON_DIR}
protoc loomplan.proto --python_out=${PYTHON_DIR}
protoc loomrun.proto --python_out=${PYTHON_DIR}
protoc loomreport.proto --python_out=${PYTHON_DIR}
# Fix python
for f in ${PYTHON_DIR}/*.py
do
sed -i -e 's/import loomplan_pb2/from . import loomplan_pb2/g' ${f}
sed -i -e 's/import loomcomm_pb2/from . import loomcomm_pb2/g' ${f}
done
......@@ -3,10 +3,11 @@ from .connection import Connection
from .task import Task
from .plan import Plan
from ..pb.loomcomm_pb2 import Register, Data, ClientMessage, ClientSubmit
from ..pb.loomcomm_pb2 import Register, DataHeader, ClientMessage, ClientSubmit
from ..pb.loomreport_pb2 import Report
import socket
import struct
LOOM_PROTOCOL_VERSION = 1
......@@ -106,8 +107,7 @@ class Client(object):
cmsg = ClientMessage()
cmsg.ParseFromString(msg)
if cmsg.type == ClientMessage.DATA:
prologue = cmsg.data
data[prologue.id] = self._receive_data()
data[cmsg.data.id] = self._receive_data(cmsg.data.type_id)
elif cmsg.type == ClientMessage.EVENT:
self.process_event(cmsg.event, report_data)
elif cmsg.type == ClientMessage.ERROR:
......@@ -155,14 +155,17 @@ class Client(object):
new_event = report_data.events.add()
new_event.CopyFrom(event)
def _receive_data(self):
msg_data = Data()
msg_data.ParseFromString(self.connection.receive_message())
type_id = msg_data.type_id
def _receive_data(self, type_id):
if type_id == self.rawdata_id:
return self.connection.read_data(msg_data.size)
return self.connection.receive_message()
if type_id == self.array_id:
return [self._receive_data() for i in range(msg_data.length)]
types = self.connection.receive_message()
assert len(types) % 4 == 0
result = []
for i in range(0, len(types), 4):
type_id = struct.unpack_from("I", types, i)[0]
result.append(self._receive_data(type_id))
return result
print(type_id, self.array_id, self.rawdata_id)
assert 0
......
import struct
u32 = struct.Struct("<I")
u64 = struct.Struct("<Q")
class Connection(object):
......@@ -13,11 +13,11 @@ class Connection(object):
def receive_message(self):
while True:
size = len(self.data)
if size > 4:
msg_size = u32.unpack(self.data[:4])[0]
msg_size += 4
if size >= 8:
msg_size = u64.unpack(self.data[:8])[0]
msg_size += 8
if size >= msg_size:
message = self.data[4:msg_size]
message = self.data[8:msg_size]
self.data = self.data[msg_size:]
return message
new_data = self.socket.recv(65536)
......@@ -37,5 +37,5 @@ class Connection(object):
self.data = self.socket.recv(65536)
def send_message(self, data):
data = u32.pack(len(data)) + data
data = u64.pack(len(data)) + data
self.socket.sendall(data)
This diff is collapsed.
add_subdirectory(libloomnet)
add_subdirectory(libloom)
add_subdirectory(worker)
add_subdirectory(server)
......@@ -22,8 +22,6 @@ add_library(libloom
tasks/python.h
tasks/loomrun.pb.cc
tasks/loomrun.pb.h
connection.cpp
connection.h
worker.cpp
worker.h
taskinstance.cpp
......@@ -43,13 +41,10 @@ add_library(libloom
task.cpp
task.h
taskdesc.h
sendbuffer.h
sendbuffer.cpp
loomcomm.pb.h
loomcomm.pb.cc
loomplan.pb.h
loomplan.pb.cc
compat.h
log.h
log.cpp
types.h
......
......@@ -18,7 +18,7 @@ public:
}
const std::string& get_work_dir() const {
return server_address;
return work_dir;
}
int get_port() const {
......
#include "connection.h"
#include "utils.h"
#include <string.h>
#include <libloom/compat.h>
using namespace loom;
Connection::Connection(ConnectionCallback *callback, uv_loop_t *loop)
: callback(callback),
state(ConnectionNew),
data_size(0),
data_ptr(nullptr),
remaining_raw_data(0)
{
uv_tcp_init(loop, &socket);
uv_tcp_nodelay(&socket, 1);
socket.data = this;
}
Connection::~Connection()
{
assert(state == ConnectionClosed);
}
std::string Connection::get_peername()
{
sockaddr_in addr;
int len = sizeof(addr);
UV_CHECK(uv_tcp_getpeername(&socket, (struct sockaddr*) &addr, &len));
char tmp[60];
UV_CHECK(uv_ip4_name(&addr, tmp, 60));
return tmp;
}
void Connection::close()
{
if (state != ConnectionClosed && state != ConnectionClosing) {
state = ConnectionClosing;
uv_close((uv_handle_t*) &socket, _on_close);
}
}
void Connection::close_and_discard_remaining_data()
{
received_buffer.reset();
data_size = 0;
uv_read_stop((uv_stream_t*) &socket);
close();
}
void Connection::accept(uv_tcp_t *listen_socket)
{
UV_CHECK(uv_accept((uv_stream_t*) listen_socket, (uv_stream_t*) &socket));
uv_read_start((uv_stream_t *)&socket, _buf_alloc, _on_read);
state = ConnectionOpen;
}
void Connection::start_read()
{
uv_read_start((uv_stream_t *)&socket, _buf_alloc, _on_read);
}
void Connection::set_raw_read(size_t size)
{
assert(remaining_raw_data == 0);
if (size == 0) {
callback->on_data_finish();
return;
}
remaining_raw_data = size;
if (data_size == 0) {
return;
}
if (data_size <= size) {
callback->on_data_chunk(data_ptr, data_size);
remaining_raw_data -= data_size;
data_size = 0;
received_buffer.reset();
if (remaining_raw_data == 0) {
callback->on_data_finish();
}
return;
}
callback->on_data_chunk(data_ptr, size);
remaining_raw_data = 0;
data_size -= size;
data_ptr += size;
callback->on_data_finish();
}
void Connection::_on_connection(uv_connect_t *connect, int status)
{
Connection *connection = static_cast<Connection *>(connect->data);
if (status) {
connection->callback->on_error(status);
} else {
connection->state = ConnectionOpen;
connection->callback->on_connection();
}
delete connect;
}
void Connection::connect(std::string host, int port)
{
assert(state == ConnectionNew);
state = ConnectionConnecting;
struct sockaddr_in dest;
UV_CHECK(uv_ip4_addr(host.c_str(), port, &dest));
uv_connect_t *connect = new uv_connect_t;
connect->data = this;
UV_CHECK(uv_tcp_connect(connect, &socket, (const struct sockaddr *)&dest, _on_connection));
}
void Connection::_on_write(uv_write_t *write_req, int status)
{
UV_CHECK(status);
SendBuffer *buffer = static_cast<SendBuffer *>(write_req->data);
buffer->on_finish(status);
}
void Connection::send_message(google::protobuf::MessageLite &message)
{
auto buffer = std::make_unique<SendBuffer>();
buffer->add(message);
send_buffer(std::move(buffer));
}
void Connection::send_buffer(std::unique_ptr<SendBuffer> buffer)
{
uv_buf_t *bufs = buffer->get_uv_bufs();
size_t count = buffer->get_uv_bufs_count();
SendBuffer *b = buffer.release();
// It will be released in _on_write callback
// It is stored in b->request.data
UV_CHECK(uv_write(&b->request, (uv_stream_t *) &socket, bufs, count, _on_write));
}
void Connection::_on_read(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
{
Connection *connection = static_cast<Connection *>(stream->data);
if (nread == UV_EOF) {
if (buf->base) {
delete[] buf->base;
}
connection->close();
return;
}
if (nread < 0) {
connection->callback->on_error(nread);
return;
}
if (nread == 0) {
return;
}
/* This needs some better buffer handling,
* but for now, we just stick with super simple solution */
auto &data_size = connection->data_size;
auto &data_ptr = connection->data_ptr;
auto &received_buffer = connection->received_buffer;
auto &remaining_raw_data = connection->remaining_raw_data;
auto size = nread;
auto data = buf->base;
if (data_size) {
char *new_data = new char[size + data_size];
memcpy(new_data, data_ptr, data_size);
memcpy(new_data + data_size, data, size);
received_buffer.reset(new_data);
data_ptr = new_data;
delete[] data;
data_size += size;
} else {
data_size = size;
received_buffer.reset(data);
data_ptr = data;
}
for (;;) {
if (remaining_raw_data) {
if (data_size == 0) {
return;
}
if (data_size <= remaining_raw_data) {
connection->callback->on_data_chunk(data_ptr, data_size);
remaining_raw_data -= data_size;
data_size = 0;
received_buffer.reset();
if (remaining_raw_data == 0) {
connection->callback->on_data_finish();
}
return;
}
connection->callback->on_data_chunk(data_ptr, remaining_raw_data);
data_ptr += remaining_raw_data;
data_size -= remaining_raw_data;
remaining_raw_data = 0;
connection->callback->on_data_finish();
continue;
}
if (data_size < (ssize_t) sizeof(uint32_t)) {
return;
}
uint32_t sz = *(reinterpret_cast<uint32_t *>(data_ptr));
uint32_t sz2 = sz + sizeof(uint32_t);
if (data_size < sz2) {
return;
}
char *msg_data = data_ptr + sizeof(uint32_t);
data_ptr += sz2;
data_size -= sz2;
connection->callback->on_message(msg_data, sz);
if (data_size == 0) {
received_buffer.reset();
return;
}
}
}
void Connection::_buf_alloc(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf)
{
buf->base = new char[suggested_size];
buf->len = suggested_size;
}
void Connection::_on_close(uv_handle_t *handle)
{
Connection *connection = static_cast<Connection *>(handle->data);
connection->state = ConnectionClosed;
connection->callback->on_close();
}
ConnectionCallback::~ConnectionCallback()
{
}
void ConnectionCallback::on_error(int error_code)
{
assert(error_code != 0);
UV_CHECK(error_code);
}
SimpleConnectionCallback::~SimpleConnectionCallback()
{
}
#ifndef LOOM_CONNECTION_H
#define LOOM_CONNECTION_H
#include "utils.h"
#include "sendbuffer.h"
#include <uv.h>
#include <string>
#include <functional>
#include <memory>
#include <assert.h>
namespace loom {
class Connection;
/** Base class for handling events emited by Connection */
class ConnectionCallback
{
friend class Connection;
public:
virtual ~ConnectionCallback();
protected:
/** New connection is established. (Not used when connection created by 'accept'). */
virtual void on_connection() { assert(0); }
/** An error has occured, error_code is from libuv. */
virtual void on_error(int error_code);
/** The connection is closed. */
virtual void on_close() = 0;
/** Raw data received */
virtual void on_data_chunk(const char *buffer, size_t size) { assert(0); }
/** Raw data finished */
virtual void on_data_finish() { assert(0); }
/** Message received */
virtual void on_message(const char *buffer, size_t size) = 0;
};
/** Class representing TCP/IP connection */
class Connection
{
public:
enum State {
ConnectionNew,
ConnectionConnecting,
ConnectionOpen,
ConnectionClosing,
ConnectionClosed,
};
Connection(ConnectionCallback *callback, uv_loop_t *loop);
~Connection();
State get_state() const {
return state;
}
/** Set callback instance; the old one is forgotten. */
void set_callback(ConnectionCallback *callback) {
this->callback = callback;
}
void send(uv_write_t *request, uv_buf_t bufs[], unsigned int nbufs, uv_write_cb cb)
{
UV_CHECK(uv_write(request, (uv_stream_t*) &socket, bufs, nbufs, cb));
}
std::string get_peername();
void connect(std::string host, int port);
void send_message(::google::protobuf::MessageLite &message);
void send_buffer(std::unique_ptr<SendBuffer> buffer);
void close();
void close_and_discard_remaining_data();
void accept(uv_tcp_t *listen_socket);
void start_read();
void set_raw_read(size_t size);
protected:
ConnectionCallback* callback;
State state;
uv_tcp_t socket;
size_t data_size;
char *data_ptr;
std::unique_ptr<char[]> received_buffer;
size_t remaining_raw_data;
private:
static void _on_connection(uv_connect_t *connect, int status);
static void _on_write(uv_write_t *write_req, int status);
static void _on_read(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf);
static void _on_close(uv_handle_t *handle);
static void _buf_alloc(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf);
};
class SimpleConnectionCallback : public ConnectionCallback
{
public:
SimpleConnectionCallback(uv_loop_t *loop) : connection(this, loop) {}
virtual ~SimpleConnectionCallback();
void connect(const std::string &address, int port) {
connection.connect(address, port);
}
void send_message(::google::protobuf::MessageLite &message) {
connection.send_message(message);
}
void close()
{
connection.close();
}
protected:
Connection connection;
};
}
#endif // LOOM_CLIENT_CONNECTION_H
......@@ -23,25 +23,6 @@ std::shared_ptr<Data> Data::get_slice(size_t from, size_t to)
assert(0);
}
void Data::serialize(Worker &worker, SendBuffer &buffer, std::shared_ptr<Data> &data_ptr)
{
loomcomm::Data msg;
msg.set_type_id(worker.get_dictionary().find_symbol_or_fail(get_type_name()));
msg.set_size(get_size());
auto length = get_length();
if (length) {
msg.set_length(length);
}
init_message(worker, msg);
buffer.add(msg);
serialize_data(worker, buffer, data_ptr);
}
void Data::init_message(Worker &worker, loomcomm::Data &msg) const
{
}
const char * Data::get_raw_data() const
{
return nullptr;
......@@ -54,25 +35,24 @@ std::string Data::get_filename() const
bool Data::has_raw_data() const
{
return false;
return false;
}
DataUnpacker::~DataUnpacker()
Id Data::get_type_id(Worker &worker) const
{
return worker.get_dictionary().find_symbol(get_type_name());
}
bool DataUnpacker::on_message(Connection &connection, const char *data, size_t size)
DataBufferItem::DataBufferItem(std::shared_ptr<Data> &data, const char *mem, size_t size)
: mem(mem), size(size), data(data)
{
assert(0);
}
void DataUnpacker::on_data_chunk(const char *data, size_t size)
{
assert(0);
}
bool DataUnpacker::on_data_finish(Connection &connection)
uv_buf_t DataBufferItem::get_buf()
{
assert(0);
uv_buf_t buf;
buf.base = const_cast<char*>(mem);
buf.len = size;
return buf;
}
......@@ -5,6 +5,8 @@
#include "loomcomm.pb.h"
#include "libloomnet/sendbuffer.h"
#include <uv.h>
#include <string>
#include <memory>
......@@ -12,7 +14,7 @@
namespace loom {
class Worker;
class SendBuffer;
class Connection;
/** Base class for data objects */
......@@ -26,12 +28,12 @@ public:
/** Get size of data */
virtual size_t get_size() = 0;
/** Get debugging info string */
virtual std::string get_info() = 0;
/** Get length of data (when object is not indexable then returns 0) */
virtual size_t get_length();
/** Get debugging info string */
virtual std::string get_info() = 0;
/** Get subobject at given index (0 ... get_length()) */
virtual std::shared_ptr<Data> get_at_index(size_t index);
......@@ -39,7 +41,7 @@ public:
virtual std::shared_ptr<Data> get_slice(size_t from, size_t to);
/** Serialize object into send buffer */
void serialize(Worker &worker, SendBuffer &buffer, std::shared_ptr<Data> &data_ptr);
virtual size_t serialize(Worker &worker, loom::net::SendBuffer &buffer, std::shared_ptr<Data> &data_ptr) = 0;
/** Get pointer to raw data, returns nullptr when it is not possible */
virtual const char *get_raw_data() const;
......@@ -49,31 +51,21 @@ public:
virtual bool has_raw_data() const;
protected:
/** Init serialization message */
virtual void init_message(Worker &worker, loomcomm::Data &msg) const;
loom::Id get_type_id(Worker &worker) const;
/** Serialize content */
virtual void serialize_data(Worker &worker, SendBuffer &buffer, std::shared_ptr<Data> &data_ptr) = 0;
protected:
};
/** Base class for deserialization */
class DataUnpacker
{
public:
virtual ~DataUnpacker();
virtual bool init(Worker &worker, Connection &connection, const loomcomm::Data &msg) = 0;
virtual bool on_message(Connection &connection, const char *data, size_t size);
virtual void on_data_chunk(const char *data, size_t size);
virtual bool on_data_finish(Connection &connection);
std::shared_ptr<Data>& get_data() {
return data;
}
class DataBufferItem : public loom::net::SendBufferItem {
public:
DataBufferItem(std::shared_ptr<Data> &data, const char *mem, size_t size);
uv_buf_t get_buf();
protected:
std::shared_ptr<Data> data;
const char *mem;
size_t size;
std::shared_ptr<Data> data;
};
typedef std::vector<std::shared_ptr<Data>> DataVector;
......
#include "array.h"
#include "../compat.h"
#include "libloomnet/compat.h"
#include "../worker.h"
#include "../log.h"
......@@ -63,22 +63,35 @@ std::shared_ptr<Data> &Array::get_ref_at_index(size_t index)
return items[index];
}
std::string Array::get_type_name() const
{
return "loom/array";
}