Commit 473e3067 authored by Stanislav Bohm's avatar Stanislav Bohm

ENH: length and data size in messages

parent 09978eea
......@@ -88,9 +88,9 @@ class Client(object):
msg_data.ParseFromString(self.connection.receive_message())
type_id = msg_data.type_id
if type_id == 300: # Data
return self.connection.read_data(msg_data.arg0_u64)
return self.connection.read_data(msg_data.size)
if type_id == 400: # Array
return [self._receive_data() for i in xrange(msg_data.arg0_u64)]
return [self._receive_data() for i in xrange(msg_data.length)]
assert 0
def _send_message(self, message):
......
......@@ -18,7 +18,7 @@ _sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='loomcomm.proto',
package='loomcomm',
serialized_pb=_b('\n\x0eloomcomm.proto\x12\x08loomcomm\"\xbb\x01\n\x08Register\x12\x18\n\x10protocol_version\x18\x01 \x02(\x05\x12%\n\x04type\x18\x02 \x02(\x0e\x32\x17.loomcomm.Register.Type\x12\x0c\n\x04port\x18\x03 \x01(\x05\x12\x12\n\ntask_types\x18\x04 \x03(\t\x12\x0c\n\x04\x63pus\x18\x05 \x01(\x05\x12\x0c\n\x04info\x18\n \x01(\x08\"0\n\x04Type\x12\x13\n\x0fREGISTER_WORKER\x10\x01\x12\x13\n\x0fREGISTER_CLIENT\x10\x02\"&\n\rServerMessage\"\x15\n\x04Type\x12\r\n\tSTART_JOB\x10\x01\"\xf1\x01\n\rWorkerCommand\x12*\n\x04type\x18\x01 \x02(\x0e\x32\x1c.loomcomm.WorkerCommand.Type\x12\n\n\x02id\x18\x02 \x01(\x05\x12\x11\n\ttask_type\x18\x03 \x01(\x05\x12\x13\n\x0btask_config\x18\x04 \x01(\t\x12\x13\n\x0btask_inputs\x18\x05 \x03(\x05\x12\x0f\n\x07\x61\x64\x64ress\x18\n \x01(\t\x12\x11\n\twith_size\x18\x0b \x01(\x08\x12\x0f\n\x07symbols\x18\x64 \x03(\t\"6\n\x04Type\x12\x08\n\x04TASK\x10\x01\x12\x08\n\x04SEND\x10\x02\x12\n\n\x06REMOVE\x10\x03\x12\x0e\n\nDICTIONARY\x10\x04\"|\n\x0eWorkerResponse\x12+\n\x04type\x18\x01 \x02(\x0e\x32\x1d.loomcomm.WorkerResponse.Type\x12\n\n\x02id\x18\x02 \x02(\x05\x12\x11\n\terror_msg\x18\x03 \x01(\t\"\x1e\n\x04Type\x12\n\n\x06\x46INISH\x10\x01\x12\n\n\x06\x46\x41ILED\x10\x02\"\x18\n\x08\x41nnounce\x12\x0c\n\x04port\x18\x01 \x02(\x05\"-\n\x0c\x44\x61taPrologue\x12\n\n\x02id\x18\x01 \x02(\x05\x12\x11\n\tdata_size\x18\x03 \x01(\x04\";\n\x04\x44\x61ta\x12\x0f\n\x07type_id\x18\x01 \x02(\x05\x12\x10\n\x08\x61rg0_u64\x18\x02 \x01(\x04\x12\x10\n\x08\x61rg1_u64\x18\x03 \x01(\x04\"\"\n\x04Info\x12\n\n\x02id\x18\x01 \x02(\x05\x12\x0e\n\x06worker\x18\x02 \x02(\t\"6\n\x05\x45rror\x12\n\n\x02id\x18\x01 \x02(\x05\x12\x0e\n\x06worker\x18\x02 \x02(\t\x12\x11\n\terror_msg\x18\x03 \x02(\t\"\xc6\x01\n\rClientMessage\x12*\n\x04type\x18\x01 \x02(\x0e\x32\x1c.loomcomm.ClientMessage.Type\x12$\n\x04\x64\x61ta\x18\x02 \x01(\x0b\x32\x16.loomcomm.DataPrologue\x12\x1c\n\x04info\x18\x03 \x01(\x0b\x32\x0e.loomcomm.Info\x12\x1e\n\x05\x65rror\x18\x04 \x01(\x0b\x32\x0f.loomcomm.Error\"%\n\x04Type\x12\x08\n\x04\x44\x41TA\x10\x01\x12\x08\n\x04INFO\x10\x02\x12\t\n\x05\x45RROR\x10\x03\x42\x02H\x03')
serialized_pb=_b('\n\x0eloomcomm.proto\x12\x08loomcomm\"\xbb\x01\n\x08Register\x12\x18\n\x10protocol_version\x18\x01 \x02(\x05\x12%\n\x04type\x18\x02 \x02(\x0e\x32\x17.loomcomm.Register.Type\x12\x0c\n\x04port\x18\x03 \x01(\x05\x12\x12\n\ntask_types\x18\x04 \x03(\t\x12\x0c\n\x04\x63pus\x18\x05 \x01(\x05\x12\x0c\n\x04info\x18\n \x01(\x08\"0\n\x04Type\x12\x13\n\x0fREGISTER_WORKER\x10\x01\x12\x13\n\x0fREGISTER_CLIENT\x10\x02\"&\n\rServerMessage\"\x15\n\x04Type\x12\r\n\tSTART_JOB\x10\x01\"\xf1\x01\n\rWorkerCommand\x12*\n\x04type\x18\x01 \x02(\x0e\x32\x1c.loomcomm.WorkerCommand.Type\x12\n\n\x02id\x18\x02 \x01(\x05\x12\x11\n\ttask_type\x18\x03 \x01(\x05\x12\x13\n\x0btask_config\x18\x04 \x01(\t\x12\x13\n\x0btask_inputs\x18\x05 \x03(\x05\x12\x0f\n\x07\x61\x64\x64ress\x18\n \x01(\t\x12\x11\n\twith_size\x18\x0b \x01(\x08\x12\x0f\n\x07symbols\x18\x64 \x03(\t\"6\n\x04Type\x12\x08\n\x04TASK\x10\x01\x12\x08\n\x04SEND\x10\x02\x12\n\n\x06REMOVE\x10\x03\x12\x0e\n\nDICTIONARY\x10\x04\"\x9a\x01\n\x0eWorkerResponse\x12+\n\x04type\x18\x01 \x02(\x0e\x32\x1d.loomcomm.WorkerResponse.Type\x12\n\n\x02id\x18\x02 \x02(\x05\x12\x0c\n\x04size\x18\x03 \x01(\x04\x12\x0e\n\x06length\x18\x04 \x01(\x04\x12\x11\n\terror_msg\x18\x64 \x01(\t\"\x1e\n\x04Type\x12\n\n\x06\x46INISH\x10\x01\x12\n\n\x06\x46\x41ILED\x10\x02\"\x18\n\x08\x41nnounce\x12\x0c\n\x04port\x18\x01 \x02(\x05\"-\n\x0c\x44\x61taPrologue\x12\n\n\x02id\x18\x01 \x02(\x05\x12\x11\n\tdata_size\x18\x03 \x01(\x04\"Y\n\x04\x44\x61ta\x12\x0f\n\x07type_id\x18\x01 \x02(\x05\x12\x0c\n\x04size\x18\x02 \x02(\x04\x12\x0e\n\x06length\x18\x03 \x01(\x04\x12\x10\n\x08\x61rg0_u64\x18\x08 \x01(\x04\x12\x10\n\x08\x61rg1_u64\x18\t \x01(\x04\"\"\n\x04Info\x12\n\n\x02id\x18\x01 \x02(\x05\x12\x0e\n\x06worker\x18\x02 \x02(\t\"6\n\x05\x45rror\x12\n\n\x02id\x18\x01 \x02(\x05\x12\x0e\n\x06worker\x18\x02 \x02(\t\x12\x11\n\terror_msg\x18\x03 \x02(\t\"\xc6\x01\n\rClientMessage\x12*\n\x04type\x18\x01 \x02(\x0e\x32\x1c.loomcomm.ClientMessage.Type\x12$\n\x04\x64\x61ta\x18\x02 \x01(\x0b\x32\x16.loomcomm.DataPrologue\x12\x1c\n\x04info\x18\x03 \x01(\x0b\x32\x0e.loomcomm.Info\x12\x1e\n\x05\x65rror\x18\x04 \x01(\x0b\x32\x0f.loomcomm.Error\"%\n\x04Type\x12\x08\n\x04\x44\x41TA\x10\x01\x12\x08\n\x04INFO\x10\x02\x12\t\n\x05\x45RROR\x10\x03\x42\x02H\x03')
)
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
......@@ -111,8 +111,8 @@ _WORKERRESPONSE_TYPE = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
serialized_start=596,
serialized_end=626,
serialized_start=627,
serialized_end=657,
)
_sym_db.RegisterEnumDescriptor(_WORKERRESPONSE_TYPE)
......@@ -137,8 +137,8 @@ _CLIENTMESSAGE_TYPE = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
serialized_start=1016,
serialized_end=1053,
serialized_start=1077,
serialized_end=1114,
)
_sym_db.RegisterEnumDescriptor(_CLIENTMESSAGE_TYPE)
......@@ -335,8 +335,22 @@ _WORKERRESPONSE = _descriptor.Descriptor(
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='error_msg', full_name='loomcomm.WorkerResponse.error_msg', index=2,
number=3, type=9, cpp_type=9, label=1,
name='size', full_name='loomcomm.WorkerResponse.size', index=2,
number=3, type=4, cpp_type=4, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='length', full_name='loomcomm.WorkerResponse.length', index=3,
number=4, type=4, cpp_type=4, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='error_msg', full_name='loomcomm.WorkerResponse.error_msg', index=4,
number=100, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
......@@ -353,8 +367,8 @@ _WORKERRESPONSE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=502,
serialized_end=626,
serialized_start=503,
serialized_end=657,
)
......@@ -383,8 +397,8 @@ _ANNOUNCE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=628,
serialized_end=652,
serialized_start=659,
serialized_end=683,
)
......@@ -420,8 +434,8 @@ _DATAPROLOGUE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=654,
serialized_end=699,
serialized_start=685,
serialized_end=730,
)
......@@ -440,19 +454,33 @@ _DATA = _descriptor.Descriptor(
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='arg0_u64', full_name='loomcomm.Data.arg0_u64', index=1,
number=2, type=4, cpp_type=4, label=1,
name='size', full_name='loomcomm.Data.size', index=1,
number=2, type=4, cpp_type=4, label=2,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='arg1_u64', full_name='loomcomm.Data.arg1_u64', index=2,
name='length', full_name='loomcomm.Data.length', index=2,
number=3, type=4, cpp_type=4, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='arg0_u64', full_name='loomcomm.Data.arg0_u64', index=3,
number=8, type=4, cpp_type=4, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='arg1_u64', full_name='loomcomm.Data.arg1_u64', index=4,
number=9, type=4, cpp_type=4, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
......@@ -464,8 +492,8 @@ _DATA = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=701,
serialized_end=760,
serialized_start=732,
serialized_end=821,
)
......@@ -501,8 +529,8 @@ _INFO = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=762,
serialized_end=796,
serialized_start=823,
serialized_end=857,
)
......@@ -545,8 +573,8 @@ _ERROR = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=798,
serialized_end=852,
serialized_start=859,
serialized_end=913,
)
......@@ -597,8 +625,8 @@ _CLIENTMESSAGE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=855,
serialized_end=1053,
serialized_start=916,
serialized_end=1114,
)
_REGISTER.fields_by_name['type'].enum_type = _REGISTER_TYPE
......
......@@ -8,11 +8,20 @@ Data::~Data() {
}
size_t Data::get_length() const
{
return 0;
}
void Data::serialize(Worker &worker, SendBuffer &buffer, std::shared_ptr<Data> &data_ptr)
{
loomcomm::Data msg;
msg.set_type_id(get_type_id());
//msg.set_size(get_size());
msg.set_size(get_size());
auto length = get_length();
if (length) {
msg.set_length(length);
}
init_message(worker, msg);
buffer.add(msg);
serialize_data(worker, buffer, data_ptr);
......
......@@ -21,9 +21,9 @@ public:
virtual ~Data();
virtual int get_type_id() = 0;
virtual size_t get_size() = 0;
virtual std::string get_info() = 0;
virtual size_t get_length() const;
void serialize(Worker &worker, SendBuffer &buffer, std::shared_ptr<Data> &data_ptr);
virtual void init_message(Worker &worker, loomcomm::Data &msg) const;
......
......@@ -42,11 +42,6 @@ void loom::Array::serialize_data(loom::Worker &worker, loom::SendBuffer &buffer,
}
}
void loom::Array::init_message(loom::Worker &worker, loomcomm::Data &msg) const
{
msg.set_arg0_u64(length);
}
loom::ArrayUnpacker::~ArrayUnpacker()
{
......@@ -54,9 +49,8 @@ loom::ArrayUnpacker::~ArrayUnpacker()
bool loom::ArrayUnpacker::init(loom::Worker &worker, loom::Connection &connection, const loomcomm::Data &msg)
{
assert(msg.has_arg0_u64());
length = msg.length();
index = 0;
length = msg.arg0_u64();
items = std::make_unique<std::shared_ptr<Data>[]>(length);
if (length == 0) {
finish();
......
......@@ -25,7 +25,6 @@ public:
std::shared_ptr<Data>& get_at_index(size_t index);
void serialize_data(Worker &worker, SendBuffer &buffer, std::shared_ptr<Data> &data_ptr);
void init_message(Worker &worker, loomcomm::Data &msg) const;
private:
size_t length;
......
......@@ -138,11 +138,6 @@ std::string RawData::get_info()
return "RawData";
}
void RawData::init_message(Worker &worker, loomcomm::Data &msg) const
{
msg.set_arg0_u64(size);
}
void RawData::serialize_data(Worker &worker, SendBuffer &buffer, std::shared_ptr<Data> &data_ptr)
{
buffer.add(data_ptr, get_raw_data(worker), size);
......@@ -157,8 +152,7 @@ bool RawDataUnpacker::init(Worker &worker, Connection &connection, const loomcom
{
this->data = std::make_shared<RawData>();
RawData &data = static_cast<RawData&>(*this->data);
assert(msg.has_arg0_u64());
auto size = msg.arg0_u64();
size_t size = msg.size();
pointer = data.init_empty_file(worker, size);
if (size == 0) {
return true;
......
......@@ -28,8 +28,7 @@ public:
return data;
}
std::string get_info();
void init_message(Worker &worker, loomcomm::Data &msg) const;
std::string get_info();
void serialize_data(Worker &worker, SendBuffer &buffer, std::shared_ptr<Data> &data_ptr);
//char* init_memonly(size_t size);
......
......@@ -19,7 +19,7 @@ public:
InterConnection(Worker &worker);
~InterConnection();
void send(Id id, std::shared_ptr<Data> &data, bool with_size);
void send(Id id, std::shared_ptr<Data> &data, bool with_size);
void accept(uv_tcp_t *listen_socket) {
connection.accept(listen_socket);
}
......
This diff is collapsed.
......@@ -630,10 +630,24 @@ class WorkerResponse : public ::google::protobuf::MessageLite {
inline ::google::protobuf::int32 id() const;
inline void set_id(::google::protobuf::int32 value);
// optional string error_msg = 3;
// optional uint64 size = 3;
inline bool has_size() const;
inline void clear_size();
static const int kSizeFieldNumber = 3;
inline ::google::protobuf::uint64 size() const;
inline void set_size(::google::protobuf::uint64 value);
// optional uint64 length = 4;
inline bool has_length() const;
inline void clear_length();
static const int kLengthFieldNumber = 4;
inline ::google::protobuf::uint64 length() const;
inline void set_length(::google::protobuf::uint64 value);
// optional string error_msg = 100;
inline bool has_error_msg() const;
inline void clear_error_msg();
static const int kErrorMsgFieldNumber = 3;
static const int kErrorMsgFieldNumber = 100;
inline const ::std::string& error_msg() const;
inline void set_error_msg(const ::std::string& value);
inline void set_error_msg(const char* value);
......@@ -648,6 +662,10 @@ class WorkerResponse : public ::google::protobuf::MessageLite {
inline void clear_has_type();
inline void set_has_id();
inline void clear_has_id();
inline void set_has_size();
inline void clear_has_size();
inline void set_has_length();
inline void clear_has_length();
inline void set_has_error_msg();
inline void clear_has_error_msg();
......@@ -657,6 +675,8 @@ class WorkerResponse : public ::google::protobuf::MessageLite {
mutable int _cached_size_;
int type_;
::google::protobuf::int32 id_;
::google::protobuf::uint64 size_;
::google::protobuf::uint64 length_;
::std::string* error_msg_;
#ifdef GOOGLE_PROTOBUF_NO_STATIC_INITIALIZER
friend void protobuf_AddDesc_loomcomm_2eproto_impl();
......@@ -931,17 +951,31 @@ class Data : public ::google::protobuf::MessageLite {
inline ::google::protobuf::int32 type_id() const;
inline void set_type_id(::google::protobuf::int32 value);
// optional uint64 arg0_u64 = 2;
// required uint64 size = 2;
inline bool has_size() const;
inline void clear_size();
static const int kSizeFieldNumber = 2;
inline ::google::protobuf::uint64 size() const;
inline void set_size(::google::protobuf::uint64 value);
// optional uint64 length = 3;
inline bool has_length() const;
inline void clear_length();
static const int kLengthFieldNumber = 3;
inline ::google::protobuf::uint64 length() const;
inline void set_length(::google::protobuf::uint64 value);
// optional uint64 arg0_u64 = 8;
inline bool has_arg0_u64() const;
inline void clear_arg0_u64();
static const int kArg0U64FieldNumber = 2;
static const int kArg0U64FieldNumber = 8;
inline ::google::protobuf::uint64 arg0_u64() const;
inline void set_arg0_u64(::google::protobuf::uint64 value);
// optional uint64 arg1_u64 = 3;
// optional uint64 arg1_u64 = 9;
inline bool has_arg1_u64() const;
inline void clear_arg1_u64();
static const int kArg1U64FieldNumber = 3;
static const int kArg1U64FieldNumber = 9;
inline ::google::protobuf::uint64 arg1_u64() const;
inline void set_arg1_u64(::google::protobuf::uint64 value);
......@@ -949,6 +983,10 @@ class Data : public ::google::protobuf::MessageLite {
private:
inline void set_has_type_id();
inline void clear_has_type_id();
inline void set_has_size();
inline void clear_has_size();
inline void set_has_length();
inline void clear_has_length();
inline void set_has_arg0_u64();
inline void clear_has_arg0_u64();
inline void set_has_arg1_u64();
......@@ -958,6 +996,8 @@ class Data : public ::google::protobuf::MessageLite {
::google::protobuf::uint32 _has_bits_[1];
mutable int _cached_size_;
::google::protobuf::uint64 size_;
::google::protobuf::uint64 length_;
::google::protobuf::uint64 arg0_u64_;
::google::protobuf::uint64 arg1_u64_;
::google::protobuf::int32 type_id_;
......@@ -1916,16 +1956,64 @@ inline void WorkerResponse::set_id(::google::protobuf::int32 value) {
// @@protoc_insertion_point(field_set:loomcomm.WorkerResponse.id)
}
// optional string error_msg = 3;
inline bool WorkerResponse::has_error_msg() const {
// optional uint64 size = 3;
inline bool WorkerResponse::has_size() const {
return (_has_bits_[0] & 0x00000004u) != 0;
}
inline void WorkerResponse::set_has_error_msg() {
inline void WorkerResponse::set_has_size() {
_has_bits_[0] |= 0x00000004u;
}
inline void WorkerResponse::clear_has_error_msg() {
inline void WorkerResponse::clear_has_size() {
_has_bits_[0] &= ~0x00000004u;
}
inline void WorkerResponse::clear_size() {
size_ = GOOGLE_ULONGLONG(0);
clear_has_size();
}
inline ::google::protobuf::uint64 WorkerResponse::size() const {
// @@protoc_insertion_point(field_get:loomcomm.WorkerResponse.size)
return size_;
}
inline void WorkerResponse::set_size(::google::protobuf::uint64 value) {
set_has_size();
size_ = value;
// @@protoc_insertion_point(field_set:loomcomm.WorkerResponse.size)
}
// optional uint64 length = 4;
inline bool WorkerResponse::has_length() const {
return (_has_bits_[0] & 0x00000008u) != 0;
}
inline void WorkerResponse::set_has_length() {
_has_bits_[0] |= 0x00000008u;
}
inline void WorkerResponse::clear_has_length() {
_has_bits_[0] &= ~0x00000008u;
}
inline void WorkerResponse::clear_length() {
length_ = GOOGLE_ULONGLONG(0);
clear_has_length();
}
inline ::google::protobuf::uint64 WorkerResponse::length() const {
// @@protoc_insertion_point(field_get:loomcomm.WorkerResponse.length)
return length_;
}
inline void WorkerResponse::set_length(::google::protobuf::uint64 value) {
set_has_length();
length_ = value;
// @@protoc_insertion_point(field_set:loomcomm.WorkerResponse.length)
}
// optional string error_msg = 100;
inline bool WorkerResponse::has_error_msg() const {
return (_has_bits_[0] & 0x00000010u) != 0;
}
inline void WorkerResponse::set_has_error_msg() {
_has_bits_[0] |= 0x00000010u;
}
inline void WorkerResponse::clear_has_error_msg() {
_has_bits_[0] &= ~0x00000010u;
}
inline void WorkerResponse::clear_error_msg() {
if (error_msg_ != &::google::protobuf::internal::GetEmptyStringAlreadyInited()) {
error_msg_->clear();
......@@ -2100,16 +2188,64 @@ inline void Data::set_type_id(::google::protobuf::int32 value) {
// @@protoc_insertion_point(field_set:loomcomm.Data.type_id)
}
// optional uint64 arg0_u64 = 2;
inline bool Data::has_arg0_u64() const {
// required uint64 size = 2;
inline bool Data::has_size() const {
return (_has_bits_[0] & 0x00000002u) != 0;
}
inline void Data::set_has_arg0_u64() {
inline void Data::set_has_size() {
_has_bits_[0] |= 0x00000002u;
}
inline void Data::clear_has_arg0_u64() {
inline void Data::clear_has_size() {
_has_bits_[0] &= ~0x00000002u;
}
inline void Data::clear_size() {
size_ = GOOGLE_ULONGLONG(0);
clear_has_size();
}
inline ::google::protobuf::uint64 Data::size() const {
// @@protoc_insertion_point(field_get:loomcomm.Data.size)
return size_;
}
inline void Data::set_size(::google::protobuf::uint64 value) {
set_has_size();
size_ = value;
// @@protoc_insertion_point(field_set:loomcomm.Data.size)
}
// optional uint64 length = 3;
inline bool Data::has_length() const {
return (_has_bits_[0] & 0x00000004u) != 0;
}
inline void Data::set_has_length() {
_has_bits_[0] |= 0x00000004u;
}
inline void Data::clear_has_length() {
_has_bits_[0] &= ~0x00000004u;
}
inline void Data::clear_length() {
length_ = GOOGLE_ULONGLONG(0);
clear_has_length();
}
inline ::google::protobuf::uint64 Data::length() const {
// @@protoc_insertion_point(field_get:loomcomm.Data.length)
return length_;
}
inline void Data::set_length(::google::protobuf::uint64 value) {
set_has_length();
length_ = value;
// @@protoc_insertion_point(field_set:loomcomm.Data.length)
}
// optional uint64 arg0_u64 = 8;
inline bool Data::has_arg0_u64() const {
return (_has_bits_[0] & 0x00000008u) != 0;
}
inline void Data::set_has_arg0_u64() {
_has_bits_[0] |= 0x00000008u;
}
inline void Data::clear_has_arg0_u64() {
_has_bits_[0] &= ~0x00000008u;
}
inline void Data::clear_arg0_u64() {
arg0_u64_ = GOOGLE_ULONGLONG(0);
clear_has_arg0_u64();
......@@ -2124,15 +2260,15 @@ inline void Data::set_arg0_u64(::google::protobuf::uint64 value) {
// @@protoc_insertion_point(field_set:loomcomm.Data.arg0_u64)
}
// optional uint64 arg1_u64 = 3;
// optional uint64 arg1_u64 = 9;
inline bool Data::has_arg1_u64() const {
return (_has_bits_[0] & 0x00000004u) != 0;
return (_has_bits_[0] & 0x00000010u) != 0;
}
inline void Data::set_has_arg1_u64() {
_has_bits_[0] |= 0x00000004u;
_has_bits_[0] |= 0x00000010u;
}
inline void Data::clear_has_arg1_u64() {
_has_bits_[0] &= ~0x00000004u;
_has_bits_[0] &= ~0x00000010u;
}
inline void Data::clear_arg1_u64() {
arg1_u64_ = GOOGLE_ULONGLONG(0);
......
......@@ -40,11 +40,8 @@ void TaskInstance::fail_libuv(const std::string &error_msg, int error_code)
void TaskInstance::finish(std::shared_ptr<Data> &output)
{
assert(output);
worker.publish_data(get_id(), output);
worker.task_finished(*this);
}
void TaskInstance::finish_without_data()
{
worker.task_finished(*this);
assert(output);
worker.task_finished(*this, *output);
}
......@@ -46,7 +46,6 @@ protected:
void fail(const std::string &error_msg);
void fail_libuv(const std::string &error_msg, int error_code);
void finish(std::shared_ptr<Data> &output);
void finish_without_data();
Worker &worker;
std::unique_ptr<Task> task;
......
......@@ -190,7 +190,7 @@ void Worker::start_task(std::unique_ptr<Task> task)
void Worker::publish_data(Id id, std::shared_ptr<Data> &data)
{
llog->debug("Publishing data id={} size={}", id, data->get_size());
public_data[id] = std::move(data);
public_data[id] = data;
check_waiting_tasks();
}
......@@ -309,7 +309,7 @@ void Worker::add_unpacker(DataTypeId type_id, std::unique_ptr<UnpackFactory> fac
std::unique_ptr<DataUnpacker> Worker::unpack(DataTypeId id)
{
auto i = unpack_factories.find(id);
auto i = unpack_factories.find(id);
assert(i != unpack_factories.end());
return i->second->make_unpacker();
}
......@@ -366,12 +366,14 @@ void Worker::task_failed(TaskInstance &task, const std::string &error_msg)
remove_task(task);
}
void Worker::task_finished(TaskInstance &task)
void Worker::task_finished(TaskInstance &task, Data &data)
{
if (server_conn.is_connected()) {
loomcomm::WorkerResponse msg;
msg.set_type(loomcomm::WorkerResponse_Type_FINISH);
msg.set_id(task.get_id());
msg.set_size(data.get_size());
msg.set_length(data.get_length());
server_conn.send_message(msg);
}
resource_cpus += 1;
......
......@@ -62,7 +62,7 @@ public:
return true;
}
void task_finished(TaskInstance &task_instance);
void task_finished(TaskInstance &task_instance, Data &data);
void task_failed(TaskInstance &task_instance, const std::string &error_msg);
void publish_data(Id id, std::shared_ptr<Data> &data);
void remove_data(Id id);
......@@ -115,7 +115,7 @@ public:
void check_ready_tasks();
void set_cpus(int value);
void add_unpacker(DataTypeId type_id, std::unique_ptr<UnpackFactory> factory);
void add_unpacker(DataTypeId type_id, std::unique_ptr<UnpackFactory> factory);
std::unique_ptr<DataUnpacker> unpack(DataTypeId id);
Dictionary& get_dictionary() {
......@@ -128,7 +128,7 @@ private:
void register_worker();
void start_listen();
void remove_task(TaskInstance &task);
void remove_task(TaskInstance &task);
void start_task(std::unique_ptr<Task> task);
//int get_listen_port();
......
......@@ -58,8 +58,12 @@ message WorkerResponse {
required Type type = 1;
required int32 id = 2;
// FINISHED
optional uint64 size = 3;
optional uint64 length = 4;
// FAILED
optional string error_msg = 3;
optional string error_msg = 100;
}
message Announce {
......@@ -74,8 +78,10 @@ message DataPrologue {
message Data
{
required int32 type_id = 1;
optional uint64 arg0_u64 = 2;
optional uint64 arg1_u64 = 3;
required uint64 size = 2;
optional uint64 length = 3;
optional uint64 arg0_u64 = 8;
optional uint64 arg1_u64 = 9;
}
message Info {
......
......@@ -22,7 +22,10 @@ public:
};
TaskNode(loom::Id id, int task_type, const std::string &config)
: state(WAITING), id(id), ref_count(0), task_type(task_type), config(config) {}
: state(WAITING), id(id), ref_count(0), task_type(task_type),
config(config),
size(0), length(0)
{}
bool is_ready() {
assert(state == WAITING);
......@@ -95,9 +98,11 @@ public:
return std::find(owners.begin(), owners.end(), &wconn) != owners.end();
}
void set_finished() {
void set_finished(size_t size, size_t length) {
assert(state == RUNNING);
state = FINISHED;
this->size = size;
this->length = length;
}
const Vector& get_inputs() {
......@@ -114,6 +119,9 @@ private:
std::string config;
std::vector<WorkerConnection*> owners;
size_t size;
size_t length;
};
......
......@@ -57,7 +57,7 @@ void WorkerConnection::on_message(const char *buffer, size_t size)
if (msg.type() == loomcomm::WorkerResponse_Type_FINISH) {
task->add_owner(this);
task->set_finished();
task->set_finished(msg.size(), msg.length());
server.on_task_finished(*task);
return;
}
......
......@@ -4,15 +4,18 @@ loom_env # silence flake8