Commit 493af0b2 authored by Stanislav Bohm

ENH: Basic infrastructure for different data types

parent 7e2afba4
from loomcomm_pb2 import Register, Data
from loomcomm_pb2 import Register, Data, DataPrologue
import socket
from connection import Connection
......@@ -40,14 +40,20 @@ class Client(object):
data = {}
while expected != len(data):
msg = self.connection.receive_message()
msg_data = Data()
msg_data.ParseFromString(msg)
data[msg_data.id] = self.connection.read_data(msg_data.size)
prologue = DataPrologue()
prologue.ParseFromString(msg)
data[prologue.id] = self._receive_data()
if single_result:
return data[results.id]
else:
return [data[task.id] for task in results]
def _receive_data(self):
    """Read one ``Data`` header message and return the payload that follows.

    The header is parsed from the next message on the connection; its
    ``type_id`` must equal 300 (the only value accepted here), and
    ``size`` bytes are then read from the connection and returned.
    """
    header = Data()
    header.ParseFromString(self.connection.receive_message())
    assert header.type_id == 300
    payload = self.connection.read_data(header.size)
    return payload
def _send_message(self, message):
    """Serialize ``message`` and send the resulting bytes over the connection."""
    self.connection.send_message(message.SerializeToString())
......@@ -18,7 +18,7 @@ _sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='loomcomm.proto',
package='loomcomm',
serialized_pb=_b('\n\x0eloomcomm.proto\x12\x08loomcomm\"\x9f\x01\n\x08Register\x12\x18\n\x10protocol_version\x18\x01 \x02(\x05\x12%\n\x04type\x18\x02 \x02(\x0e\x32\x17.loomcomm.Register.Type\x12\x0c\n\x04port\x18\x03 \x01(\x05\x12\x12\n\ntask_types\x18\x04 \x03(\t\"0\n\x04Type\x12\x13\n\x0fREGISTER_WORKER\x10\x01\x12\x13\n\x0fREGISTER_CLIENT\x10\x02\"&\n\rServerMessage\"\x15\n\x04Type\x12\r\n\tSTART_JOB\x10\x01\"\xb1\x01\n\rWorkerCommand\x12*\n\x04type\x18\x01 \x02(\x0e\x32\x1c.loomcomm.WorkerCommand.Type\x12\n\n\x02id\x18\x02 \x01(\x05\x12\x11\n\ttask_type\x18\x03 \x01(\x05\x12\x13\n\x0btask_config\x18\x04 \x01(\t\x12\x13\n\x0btask_inputs\x18\x05 \x03(\x05\x12\x0f\n\x07\x61\x64\x64ress\x18\n \x01(\t\"\x1a\n\x04Type\x12\x08\n\x04TASK\x10\x01\x12\x08\n\x04SEND\x10\x02\"\x1c\n\x0eWorkerResponse\x12\n\n\x02id\x18\x02 \x01(\x05\"\x18\n\x08\x41nnounce\x12\x0c\n\x04port\x18\x01 \x02(\x05\" \n\x04\x44\x61ta\x12\n\n\x02id\x18\x01 \x02(\x05\x12\x0c\n\x04size\x18\x02 \x02(\x04\x42\x02H\x03')
serialized_pb=_b('\n\x0eloomcomm.proto\x12\x08loomcomm\"\x9f\x01\n\x08Register\x12\x18\n\x10protocol_version\x18\x01 \x02(\x05\x12%\n\x04type\x18\x02 \x02(\x0e\x32\x17.loomcomm.Register.Type\x12\x0c\n\x04port\x18\x03 \x01(\x05\x12\x12\n\ntask_types\x18\x04 \x03(\t\"0\n\x04Type\x12\x13\n\x0fREGISTER_WORKER\x10\x01\x12\x13\n\x0fREGISTER_CLIENT\x10\x02\"&\n\rServerMessage\"\x15\n\x04Type\x12\r\n\tSTART_JOB\x10\x01\"\xc4\x01\n\rWorkerCommand\x12*\n\x04type\x18\x01 \x02(\x0e\x32\x1c.loomcomm.WorkerCommand.Type\x12\n\n\x02id\x18\x02 \x01(\x05\x12\x11\n\ttask_type\x18\x03 \x01(\x05\x12\x13\n\x0btask_config\x18\x04 \x01(\t\x12\x13\n\x0btask_inputs\x18\x05 \x03(\x05\x12\x0f\n\x07\x61\x64\x64ress\x18\n \x01(\t\x12\x11\n\twith_size\x18\x0b \x01(\x08\"\x1a\n\x04Type\x12\x08\n\x04TASK\x10\x01\x12\x08\n\x04SEND\x10\x02\"\x1c\n\x0eWorkerResponse\x12\n\n\x02id\x18\x02 \x01(\x05\"\x18\n\x08\x41nnounce\x12\x0c\n\x04port\x18\x01 \x02(\x05\"-\n\x0c\x44\x61taPrologue\x12\n\n\x02id\x18\x01 \x02(\x05\x12\x11\n\tdata_size\x18\x03 \x01(\x04\"%\n\x04\x44\x61ta\x12\x0f\n\x07type_id\x18\x01 \x02(\x05\x12\x0c\n\x04size\x18\x02 \x01(\x04\x42\x02H\x03')
)
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
......@@ -81,8 +81,8 @@ _WORKERCOMMAND_TYPE = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
serialized_start=382,
serialized_end=408,
serialized_start=401,
serialized_end=427,
)
_sym_db.RegisterEnumDescriptor(_WORKERCOMMAND_TYPE)
......@@ -212,6 +212,13 @@ _WORKERCOMMAND = _descriptor.Descriptor(
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='with_size', full_name='loomcomm.WorkerCommand.with_size', index=6,
number=11, type=8, cpp_type=7, label=1,
has_default_value=False, default_value=False,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
......@@ -225,7 +232,7 @@ _WORKERCOMMAND = _descriptor.Descriptor(
oneofs=[
],
serialized_start=231,
serialized_end=408,
serialized_end=427,
)
......@@ -254,8 +261,8 @@ _WORKERRESPONSE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=410,
serialized_end=438,
serialized_start=429,
serialized_end=457,
)
......@@ -284,8 +291,45 @@ _ANNOUNCE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=440,
serialized_end=464,
serialized_start=459,
serialized_end=483,
)
_DATAPROLOGUE = _descriptor.Descriptor(
name='DataPrologue',
full_name='loomcomm.DataPrologue',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='id', full_name='loomcomm.DataPrologue.id', index=0,
number=1, type=5, cpp_type=1, label=2,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='data_size', full_name='loomcomm.DataPrologue.data_size', index=1,
number=3, type=4, cpp_type=4, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
extension_ranges=[],
oneofs=[
],
serialized_start=485,
serialized_end=530,
)
......@@ -297,7 +341,7 @@ _DATA = _descriptor.Descriptor(
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='id', full_name='loomcomm.Data.id', index=0,
name='type_id', full_name='loomcomm.Data.type_id', index=0,
number=1, type=5, cpp_type=1, label=2,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
......@@ -305,7 +349,7 @@ _DATA = _descriptor.Descriptor(
options=None),
_descriptor.FieldDescriptor(
name='size', full_name='loomcomm.Data.size', index=1,
number=2, type=4, cpp_type=4, label=2,
number=2, type=4, cpp_type=4, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
......@@ -321,8 +365,8 @@ _DATA = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=466,
serialized_end=498,
serialized_start=532,
serialized_end=569,
)
_REGISTER.fields_by_name['type'].enum_type = _REGISTER_TYPE
......@@ -335,6 +379,7 @@ DESCRIPTOR.message_types_by_name['ServerMessage'] = _SERVERMESSAGE
DESCRIPTOR.message_types_by_name['WorkerCommand'] = _WORKERCOMMAND
DESCRIPTOR.message_types_by_name['WorkerResponse'] = _WORKERRESPONSE
DESCRIPTOR.message_types_by_name['Announce'] = _ANNOUNCE
DESCRIPTOR.message_types_by_name['DataPrologue'] = _DATAPROLOGUE
DESCRIPTOR.message_types_by_name['Data'] = _DATA
Register = _reflection.GeneratedProtocolMessageType('Register', (_message.Message,), dict(
......@@ -372,6 +417,13 @@ Announce = _reflection.GeneratedProtocolMessageType('Announce', (_message.Messag
))
_sym_db.RegisterMessage(Announce)
DataPrologue = _reflection.GeneratedProtocolMessageType('DataPrologue', (_message.Message,), dict(
DESCRIPTOR = _DATAPROLOGUE,
__module__ = 'loomcomm_pb2'
# @@protoc_insertion_point(class_scope:loomcomm.DataPrologue)
))
_sym_db.RegisterMessage(DataPrologue)
Data = _reflection.GeneratedProtocolMessageType('Data', (_message.Message,), dict(
DESCRIPTOR = _DATA,
__module__ = 'loomcomm_pb2'
......
......@@ -11,6 +11,10 @@ add_library(libloom
databuilder.h
data.cpp
data.h
unpacking.cpp
unpacking.h
rawdata.h
rawdata.cpp
interconnect.h
interconnect.cpp
task.cpp
......
......@@ -2,124 +2,53 @@
#include "worker.h"
#include "log.h"
#include <sstream>
#include <assert.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <unistd.h>
using namespace loom;
Data::Data(int id)
: id(id), data(nullptr), size(0), in_file(false)
{
Data::~Data() {
}
Data::~Data()
void Data::serialize(Worker &worker, SendBuffer &buffer, std::shared_ptr<Data> &data_ptr)
{
if (data != nullptr) {
if (in_file) {
munmap(data, size);
} else {
delete [] data;
}
}
loomcomm::Data msg;
msg.set_type_id(get_type_id());
msg.set_size(get_size());
//init_message(worker, msg);
buffer.add(msg);
serialize_data(worker, buffer, data_ptr);
}
char* Data::init_memonly(size_t size)
/*void init_message(Worker &worker, loomcomm::Data &msg)
{
assert(data == nullptr);
this->size = size;
in_file = false;
data = new char[size];
return data;
}
char* Data::init_empty_file(Worker &worker, size_t size)
{
assert(data == nullptr);
this->size = size;
in_file = true;
int fd = ::open(get_filename(worker).c_str(), O_CREAT | O_RDWR, S_IRUSR | S_IWUSR);
if (fd < 0) {
llog->critical("Cannot open data {} for writing", get_filename(worker));
log_errno_abort("open");
}
if (size > 0) {
if (!lseek(fd, size - 1, SEEK_SET)) {
log_errno_abort("lseek");
}
if (write(fd, "", 1) != 1) {
log_errno_abort("write");
}
}
map(fd, true);
::close(fd);
}*/
return data;
}
void Data::init_from_file(Worker &worker)
char *Data::get_raw_data(Worker &worker)
{
assert(data == nullptr);
in_file = true;
size = file_size(get_filename(worker).c_str());
return nullptr;
}
std::string Data::get_filename(Worker &worker) const
{
std::stringstream s;
s << worker.get_work_dir() << "data/" << id;
return s.str();
return "";
}
void Data::make_symlink(Worker &worker, const std::string &path) const
DataUnpacker::~DataUnpacker()
{
assert(in_file);
if (symlink(get_filename(worker).c_str(), path.c_str())) {
log_errno_abort("symlink");
}
}
void Data::open(Worker &worker)
bool DataUnpacker::on_message(Connection &connection, const char *data, size_t size)
{
assert(in_file);
int fd = ::open(get_filename(worker).c_str(), O_RDONLY, S_IRUSR | S_IWUSR);
if (fd < 0) {
llog->critical("Cannot open data {}", get_filename(worker));
log_errno_abort("open");
}
struct stat finfo;
memset(&finfo, 0, sizeof(finfo));
if (fstat(fd, &finfo) == -1)
{
log_errno_abort("fstat");
}
size = finfo.st_size;
map(fd, false);
::close(fd);
assert(0);
}
void Data::map(int fd, bool write)
void DataUnpacker::on_data_chunk(const char *data, size_t size)
{
assert(data == nullptr);
assert(in_file);
assert(fd >= 0);
assert(0);
}
int flags = PROT_READ;
if (write) {
flags |= PROT_WRITE;
}
data = (char*) mmap(0, size, flags, MAP_SHARED, fd, 0);
if (data == MAP_FAILED) {
log_errno_abort("mmap");
}
bool DataUnpacker::on_data_finish(Connection &connection)
{
assert(0);
}
......@@ -3,80 +3,51 @@
#include "types.h"
#include <uv.h>
#include "loomcomm.pb.h"
#include <stdlib.h>
#include <vector>
#include <uv.h>
#include <string>
#include <memory>
namespace loom {
class Worker;
class SendBuffer;
class Connection;
class Data
{
public:
virtual ~Data();
Data(Id id);
~Data();
char* init_memonly(size_t size);
char* init_empty_file(Worker &worker, size_t size);
void init_from_file(Worker &worker);
int get_id() const {
return id;
}
size_t get_size() const {
return size;
}
virtual int get_type_id() = 0;
char* get_data(Worker &worker) {
if (data == NULL) {
open(worker);
}
return data;
}
virtual size_t get_size() = 0;
virtual std::string get_info() = 0;
uv_buf_t get_uv_buf(Worker &worker) {
uv_buf_t buf;
buf.base = get_data(worker);
buf.len = size;
return buf;
}
void serialize(Worker &worker, SendBuffer &buffer, std::shared_ptr<Data> &data_ptr);
//virtual void init_message(Worker &worker, loomcomm::Data &msg);
virtual void serialize_data(Worker &worker, SendBuffer &buffer, std::shared_ptr<Data> &data_ptr) = 0;
std::string get_filename(Worker &worker) const;
int get_fd(Worker &worker) const;
void make_symlink(Worker &worker, const std::string &path) const;
/*void add(const char *new_data, size_t size) {
data.insert(data.end(), new_data, new_data + size);
}
void add(const Data &other) {
data.insert(data.end(), other.data.begin(), other.data.end());
}
virtual char *get_raw_data(Worker &worker);
virtual std::string get_filename(Worker &worker) const;
};
void add(const std::string& new_data) {
add(new_data.data(), new_data.size());
class DataUnpacker
{
public:
virtual ~DataUnpacker();
virtual bool init(Worker &worker, Connection &connection, const loomcomm::Data &msg) = 0;
virtual bool on_message(Connection &connection, const char *data, size_t size);
virtual void on_data_chunk(const char *data, size_t size);
virtual bool on_data_finish(Connection &connection);
std::unique_ptr<Data> release_data() {
return std::move(data);
}
std::string get_data_as_string() const {
return std::string(data.begin(), data.end());
}*/
private:
void open(Worker &worker);
void map(int fd, bool write);
Id id;
char *data;
size_t size;
bool in_file;
protected:
std::unique_ptr<Data> data;
};
}
......
#ifndef LIBLOOM_DATABUILDER_H
/*#ifndef LIBLOOM_DATABUILDER_H
#define LIBLOOM_DATABUILDER_H
#include "data.h"
......@@ -44,3 +44,4 @@ public:
}
#endif // LIBLOOM_DATABUILDER_H
*/
......@@ -10,7 +10,7 @@
using namespace loom;
InterConnection::InterConnection(Worker &worker)
: SimpleConnectionCallback(worker.get_loop()), worker(worker)
: SimpleConnectionCallback(worker.get_loop()), worker(worker), data_id(-1)
{
}
......@@ -40,19 +40,42 @@ void InterConnection::on_close()
worker.unregister_connection(*this);
}
void InterConnection::finish_data()
{
llog->debug("Data {} sucessfully received", data_id);
worker.publish_data(data_id,
data_unpacker->release_data());
data_unpacker.reset();
data_id = -1;
}
void InterConnection::on_message(const char *buffer, size_t size)
{
if (address.size()) {
if (data_unpacker.get()) {
data_unpacker->on_message(connection, buffer, size);;
return;
}
if (data_id > 0) {
assert(data_unpacker.get() == nullptr);
loomcomm::Data msg;
msg.ParseFromArray(buffer, size);
data_unpacker = worker.unpack(msg.type_id());
if (data_unpacker->init(worker, connection, msg)) {
finish_data();
}
return;
} else if (address.size()) {
loomcomm::DataPrologue msg;
msg.ParseFromArray(buffer, size);
auto id = msg.id();
auto size = msg.size();
llog->debug("Receiving data id={} size={}", id, size);
assert(data_builder.get() == nullptr);
bool map_file = !worker.get_work_dir().empty();
data_builder = std::make_unique<DataBuilder>(worker, id, size, map_file);
connection.set_raw_read(size);
data_id = id;
if (msg.has_data_size()) {
llog->debug("Receiving data id={} (data_size={})", id, msg.data_size());
} else {
llog->debug("Receiving data id={}", id);
}
} else {
// First message
loomcomm::Announce msg;
msg.ParseFromArray(buffer, size);
std::stringstream s;
......@@ -64,30 +87,34 @@ void InterConnection::on_message(const char *buffer, size_t size)
void InterConnection::on_data_chunk(const char *buffer, size_t size)
{
assert(data_builder.get());
data_builder->add(buffer, size);
assert(data_unpacker.get());
data_unpacker->on_data_chunk(buffer, size);
}
void InterConnection::on_data_finish()
{
std::unique_ptr<Data> data = data_builder->release_data();
data_builder.reset();
llog->debug("Data {} sucessfully received", data->get_id());
assert(data.get());
worker.publish_data(std::move(data));
assert(data_unpacker.get());
if (data_unpacker->on_data_finish(connection)) {
finish_data();
}
}
void InterConnection::send(std::shared_ptr<Data> &data)
void InterConnection::send(Id id, std::shared_ptr<Data> &data, bool with_size)
{
loomcomm::Data msg;
SendBuffer *buffer = new SendBuffer();
loomcomm::DataPrologue msg;
msg.set_id(id);
msg.set_id(data->get_id());
msg.set_size(data->get_size());
if (!with_size) {
buffer->add(msg);
}
data->serialize(worker, *buffer, data);
SendBuffer *buffer = new SendBuffer();
buffer->add(msg);
buffer->add(data, data->get_data(worker), data->get_size());
if (with_size) {
msg.set_data_size(buffer->get_size());
buffer->insert(0, msg);
}
Connection::State state = connection.get_state();
assert(state == Connection::ConnectionOpen ||
......
......@@ -19,7 +19,7 @@ public:
InterConnection(Worker &worker);
~InterConnection();
void send(std::shared_ptr<Data> &data);
void send(Id id, std::shared_ptr<Data> &data, bool with_size);
void send(std::unique_ptr<SendBuffer> buffer);
void accept(uv_tcp_t *listen_socket) {
connection.accept(listen_socket);
......@@ -46,10 +46,13 @@ protected:
void on_connection();
void on_close();
void finish_data();
Worker &worker;
std::string address;
std::unique_ptr<DataBuilder> data_builder;
std::unique_ptr<DataUnpacker> data_unpacker;
Id data_id;
static std::string make_address(const std::string &host, int port);
......
This diff is collapsed.
This diff is collapsed.
#include "rawdata.h"
#include "log.h"
#include "utils.h"
#include "worker.h"
#include <sstream>
#include <assert.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <unistd.h>
using namespace loom;
// Next id handed out by assign_file_id(). Starts at 1 because a file_id of 0
// is used throughout this file to mean "no file id assigned".
size_t RawData::file_id_counter = 1;
RawData::RawData()
: data(nullptr), size(0), file_id(0)
{
}
/**
 * Release the buffer, if any: it is unmapped when a file id is assigned and
 * freed with delete[] otherwise.
 */
RawData::~RawData()
{
    if (data == nullptr) {
        return;
    }
    if (file_id != 0) {
        munmap(data, size);
        return;
    }
    delete [] data;
}
/**
 * Allocate a heap buffer of `size` bytes for an object that has neither a
 * buffer nor a file id yet.
 *
 * @param size  number of bytes to allocate
 * @return the newly allocated buffer (also stored in `data`)
 */
char* RawData::init_memonly(size_t size)
{
    assert(data == nullptr);
    assert(file_id == 0);
    this->size = size;
    char *buffer = new char[size];
    data = buffer;
    return buffer;
}
/**
 * Create the backing file for this object, grow it to `size` bytes and map it
 * writable through map().
 *
 * A file id is assigned first if the object does not have one yet. Failures of
 * open/lseek/write are reported through log_errno_abort().
 *
 * @param worker  used to resolve the name of the backing file
 * @param size    requested size of the data in bytes
 * @return the `data` member after map() has run
 *         (NOTE(review): map() is not visible in this chunk; presumably it
 *         sets `data` - confirm)
 */
char* RawData::init_empty_file(Worker &worker, size_t size)
{
    assert(data == nullptr);

    if (file_id == 0) {
        assign_file_id();
    }
    this->size = size;

    const std::string filename = get_filename(worker);
    int fd = ::open(filename.c_str(), O_CREAT | O_RDWR, S_IRUSR | S_IWUSR);
    if (fd < 0) {
        llog->critical("Cannot open data {} for writing", filename);
        log_errno_abort("open");
    }

    if (size > 0) {
        // Grow the file to `size` bytes by writing a single byte at the last
        // offset. lseek() signals failure with -1; a result of 0 is a valid
        // offset (size == 1) and must not be treated as an error.
        if (lseek(fd, static_cast<off_t>(size - 1), SEEK_SET) == static_cast<off_t>(-1)) {
            log_errno_abort("lseek");
        }
        if (write(fd, "", 1) != 1) {
            log_errno_abort("write");
        }
    }
    // NOTE(review): for size == 0 the file stays empty; verify that map()
    // copes with a zero-length mapping.
    map(fd, true);
    ::close(fd);

    return data;
}
/** Give this object the next value of the shared counter; it must not have a file id yet. */
void RawData::assign_file_id()
{
    assert(file_id == 0);
    file_id = file_id_counter;
    ++file_id_counter;
}
/**
 * Take the size of this object from its file without loading the content.
 *
 * A file id is assigned first if none is set; `size` is then set to the
 * result of file_size() for the file name resolved through `worker`.
 */
void RawData::init_from_file(Worker &worker)
{
    assert(data == nullptr);

    if (file_id == 0) {
        assign_file_id();
    }

    const std::string path = get_filename(worker);
    size = file_size(path.c_str());
}
/**
 * Build the file name of this object: the worker's work directory followed by
 * "data/" and the file id. Requires a file id to be assigned.
 */
std::string RawData::get_filename(Worker &worker) const
{
    assert(file_id);

    std::ostringstream path;
    path << worker.get_work_dir();
    path << "data/";
    path << file_id;
    return path.str();
}
/**
 * Open the file of this object read-only, set `size` from fstat() and pass the
 * descriptor to map() for a non-writable mapping. The descriptor is closed
 * again before returning. Requires a file id to be assigned.
 */
void RawData::open(Worker &worker)
{
    assert(file_id);

    const std::string path = get_filename(worker);
    const int fd = ::open(path.c_str(), O_RDONLY, S_IRUSR | S_IWUSR);
    if (fd < 0) {
        llog->critical("Cannot open data {}", path);
        log_errno_abort("open");
    }

    struct stat file_info;
    memset(&file_info, 0, sizeof(file_info));
    if (fstat(fd, &file_info) == -1) {
        log_errno_abort("fstat");
    }

    size = file_info.st_size;
    map(fd, false);
    ::close(fd);
}