diff --git a/src/client/client.py b/src/client/client.py index 40f4750131cb2294f85b808de1f7640799530a18..f34373017321b54eb072c9b896862fd28e87ba97 100644 --- a/src/client/client.py +++ b/src/client/client.py @@ -1,4 +1,4 @@ -from loomcomm_pb2 import Register, Data +from loomcomm_pb2 import Register, Data, DataPrologue import socket from connection import Connection @@ -40,14 +40,20 @@ class Client(object): data = {} while expected != len(data): msg = self.connection.receive_message() - msg_data = Data() - msg_data.ParseFromString(msg) - data[msg_data.id] = self.connection.read_data(msg_data.size) + prologue = DataPrologue() + prologue.ParseFromString(msg) + data[prologue.id] = self._receive_data() if single_result: return data[results.id] else: return [data[task.id] for task in results] + def _receive_data(self): + msg_data = Data() + msg_data.ParseFromString(self.connection.receive_message()) + assert msg_data.type_id == 300 + return self.connection.read_data(msg_data.size) + def _send_message(self, message): data = message.SerializeToString() self.connection.send_message(data) diff --git a/src/client/loomcomm_pb2.py b/src/client/loomcomm_pb2.py index cabc963519d757d3add67794d3527bdd0e66ce8e..fc35bf7e0be6a7d87a6dddec23dff9f23a64fd90 100644 --- a/src/client/loomcomm_pb2.py +++ b/src/client/loomcomm_pb2.py @@ -18,7 +18,7 @@ _sym_db = _symbol_database.Default() DESCRIPTOR = _descriptor.FileDescriptor( name='loomcomm.proto', package='loomcomm', - serialized_pb=_b('\n\x0eloomcomm.proto\x12\x08loomcomm\"\x9f\x01\n\x08Register\x12\x18\n\x10protocol_version\x18\x01 \x02(\x05\x12%\n\x04type\x18\x02 \x02(\x0e\x32\x17.loomcomm.Register.Type\x12\x0c\n\x04port\x18\x03 \x01(\x05\x12\x12\n\ntask_types\x18\x04 \x03(\t\"0\n\x04Type\x12\x13\n\x0fREGISTER_WORKER\x10\x01\x12\x13\n\x0fREGISTER_CLIENT\x10\x02\"&\n\rServerMessage\"\x15\n\x04Type\x12\r\n\tSTART_JOB\x10\x01\"\xb1\x01\n\rWorkerCommand\x12*\n\x04type\x18\x01 \x02(\x0e\x32\x1c.loomcomm.WorkerCommand.Type\x12\n\n\x02id\x18\x02 \x01(\x05\x12\x11\n\ttask_type\x18\x03 \x01(\x05\x12\x13\n\x0btask_config\x18\x04 \x01(\t\x12\x13\n\x0btask_inputs\x18\x05 \x03(\x05\x12\x0f\n\x07\x61\x64\x64ress\x18\n \x01(\t\"\x1a\n\x04Type\x12\x08\n\x04TASK\x10\x01\x12\x08\n\x04SEND\x10\x02\"\x1c\n\x0eWorkerResponse\x12\n\n\x02id\x18\x02 \x01(\x05\"\x18\n\x08\x41nnounce\x12\x0c\n\x04port\x18\x01 \x02(\x05\" \n\x04\x44\x61ta\x12\n\n\x02id\x18\x01 \x02(\x05\x12\x0c\n\x04size\x18\x02 \x02(\x04\x42\x02H\x03') + serialized_pb=_b('\n\x0eloomcomm.proto\x12\x08loomcomm\"\x9f\x01\n\x08Register\x12\x18\n\x10protocol_version\x18\x01 \x02(\x05\x12%\n\x04type\x18\x02 \x02(\x0e\x32\x17.loomcomm.Register.Type\x12\x0c\n\x04port\x18\x03 \x01(\x05\x12\x12\n\ntask_types\x18\x04 \x03(\t\"0\n\x04Type\x12\x13\n\x0fREGISTER_WORKER\x10\x01\x12\x13\n\x0fREGISTER_CLIENT\x10\x02\"&\n\rServerMessage\"\x15\n\x04Type\x12\r\n\tSTART_JOB\x10\x01\"\xc4\x01\n\rWorkerCommand\x12*\n\x04type\x18\x01 \x02(\x0e\x32\x1c.loomcomm.WorkerCommand.Type\x12\n\n\x02id\x18\x02 \x01(\x05\x12\x11\n\ttask_type\x18\x03 \x01(\x05\x12\x13\n\x0btask_config\x18\x04 \x01(\t\x12\x13\n\x0btask_inputs\x18\x05 \x03(\x05\x12\x0f\n\x07\x61\x64\x64ress\x18\n \x01(\t\x12\x11\n\twith_size\x18\x0b \x01(\x08\"\x1a\n\x04Type\x12\x08\n\x04TASK\x10\x01\x12\x08\n\x04SEND\x10\x02\"\x1c\n\x0eWorkerResponse\x12\n\n\x02id\x18\x02 \x01(\x05\"\x18\n\x08\x41nnounce\x12\x0c\n\x04port\x18\x01 \x02(\x05\"-\n\x0c\x44\x61taPrologue\x12\n\n\x02id\x18\x01 \x02(\x05\x12\x11\n\tdata_size\x18\x03 \x01(\x04\"%\n\x04\x44\x61ta\x12\x0f\n\x07type_id\x18\x01 \x02(\x05\x12\x0c\n\x04size\x18\x02 \x01(\x04\x42\x02H\x03') ) _sym_db.RegisterFileDescriptor(DESCRIPTOR) @@ -81,8 +81,8 @@ _WORKERCOMMAND_TYPE = _descriptor.EnumDescriptor( ], containing_type=None, options=None, - serialized_start=382, - serialized_end=408, + serialized_start=401, + serialized_end=427, ) _sym_db.RegisterEnumDescriptor(_WORKERCOMMAND_TYPE) @@ -212,6 +212,13 @@ _WORKERCOMMAND = _descriptor.Descriptor( message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None), + _descriptor.FieldDescriptor( + name='with_size', full_name='loomcomm.WorkerCommand.with_size', index=6, + number=11, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), ], extensions=[ ], @@ -225,7 +232,7 @@ _WORKERCOMMAND = _descriptor.Descriptor( oneofs=[ ], serialized_start=231, - serialized_end=408, + serialized_end=427, ) @@ -254,8 +261,8 @@ _WORKERRESPONSE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=410, - serialized_end=438, + serialized_start=429, + serialized_end=457, ) @@ -284,8 +291,45 @@ _ANNOUNCE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=440, - serialized_end=464, + serialized_start=459, + serialized_end=483, +) + + +_DATAPROLOGUE = _descriptor.Descriptor( + name='DataPrologue', + full_name='loomcomm.DataPrologue', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='id', full_name='loomcomm.DataPrologue.id', index=0, + number=1, type=5, cpp_type=1, label=2, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='data_size', full_name='loomcomm.DataPrologue.data_size', index=1, + number=3, type=4, cpp_type=4, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[ + ], + serialized_start=485, + serialized_end=530, ) @@ -297,7 +341,7 @@ _DATA = _descriptor.Descriptor( containing_type=None, fields=[ _descriptor.FieldDescriptor( - name='id', full_name='loomcomm.Data.id', index=0, + name='type_id', full_name='loomcomm.Data.type_id', index=0, number=1, type=5, cpp_type=1, label=2, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, @@ -305,7 +349,7 @@ _DATA = _descriptor.Descriptor( options=None), _descriptor.FieldDescriptor( name='size', full_name='loomcomm.Data.size', index=1, - number=2, type=4, cpp_type=4, label=2, + number=2, type=4, cpp_type=4, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, @@ -321,8 +365,8 @@ _DATA = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=466, - serialized_end=498, + serialized_start=532, + serialized_end=569, ) _REGISTER.fields_by_name['type'].enum_type = _REGISTER_TYPE @@ -335,6 +379,7 @@ DESCRIPTOR.message_types_by_name['ServerMessage'] = _SERVERMESSAGE DESCRIPTOR.message_types_by_name['WorkerCommand'] = _WORKERCOMMAND DESCRIPTOR.message_types_by_name['WorkerResponse'] = _WORKERRESPONSE DESCRIPTOR.message_types_by_name['Announce'] = _ANNOUNCE +DESCRIPTOR.message_types_by_name['DataPrologue'] = _DATAPROLOGUE DESCRIPTOR.message_types_by_name['Data'] = _DATA Register = _reflection.GeneratedProtocolMessageType('Register', (_message.Message,), dict( @@ -372,6 +417,13 @@ Announce = _reflection.GeneratedProtocolMessageType('Announce', (_message.Messag )) _sym_db.RegisterMessage(Announce) +DataPrologue = _reflection.GeneratedProtocolMessageType('DataPrologue', (_message.Message,), dict( + DESCRIPTOR = _DATAPROLOGUE, + __module__ = 'loomcomm_pb2' + # @@protoc_insertion_point(class_scope:loomcomm.DataPrologue) + )) +_sym_db.RegisterMessage(DataPrologue) + Data = _reflection.GeneratedProtocolMessageType('Data', (_message.Message,), dict( DESCRIPTOR = _DATA, __module__ = 'loomcomm_pb2' diff --git a/src/libloom/CMakeLists.txt b/src/libloom/CMakeLists.txt index 53ce074341c21b60db61225605d82c491f0796e6..0f5e29e00c9d9e38e5fc29c63c941a2a61ee5abf 100644 --- a/src/libloom/CMakeLists.txt +++ b/src/libloom/CMakeLists.txt @@ -11,6 +11,10 @@ add_library(libloom databuilder.h data.cpp data.h + unpacking.cpp + unpacking.h + rawdata.h + rawdata.cpp interconnect.h interconnect.cpp task.cpp diff --git a/src/libloom/data.cpp b/src/libloom/data.cpp index 218f07329d5229718c2e1262d9fc01f8f99f0d1e..f97559c0345ab47016410e3c9d461d1eebd48439 100644 --- a/src/libloom/data.cpp +++ b/src/libloom/data.cpp @@ -2,124 +2,53 @@ #include "worker.h" #include "log.h" -#include <sstream> -#include <assert.h> -#include <sys/types.h> -#include <sys/stat.h> -#include <sys/mman.h> -#include <fcntl.h> -#include <unistd.h> - - using namespace loom; -Data::Data(int id) - : id(id), data(nullptr), size(0), in_file(false) -{ +Data::~Data() { } -Data::~Data() +void Data::serialize(Worker &worker, SendBuffer &buffer, std::shared_ptr<Data> &data_ptr) { - if (data != nullptr) { - if (in_file) { - munmap(data, size); - } else { - delete [] data; - } - } + loomcomm::Data msg; + msg.set_type_id(get_type_id()); + msg.set_size(get_size()); + //init_message(worker, msg); + buffer.add(msg); + serialize_data(worker, buffer, data_ptr); } -char* Data::init_memonly(size_t size) +/*void init_message(Worker &worker, loomcomm::Data &msg) { - assert(data == nullptr); - this->size = size; - in_file = false; - data = new char[size]; - return data; -} - -char* Data::init_empty_file(Worker &worker, size_t size) -{ - assert(data == nullptr); - this->size = size; - in_file = true; - - int fd = ::open(get_filename(worker).c_str(), O_CREAT | O_RDWR, S_IRUSR | S_IWUSR); - if (fd < 0) { - llog->critical("Cannot open data {} for writing", get_filename(worker)); - log_errno_abort("open"); - } - if (size > 0) { - if (!lseek(fd, size - 1, SEEK_SET)) { - log_errno_abort("lseek"); - } - if (write(fd, "", 1) != 1) { - log_errno_abort("write"); - } - } - map(fd, true); - ::close(fd); +}*/ - return data; -} - -void Data::init_from_file(Worker &worker) +char *Data::get_raw_data(Worker &worker) { - assert(data == nullptr); - in_file = true; - size = file_size(get_filename(worker).c_str()); + return nullptr; } std::string Data::get_filename(Worker &worker) const { - std::stringstream s; - s << worker.get_work_dir() << "data/" << id; - return s.str(); + return ""; } -void Data::make_symlink(Worker &worker, const std::string &path) const +DataUnpacker::~DataUnpacker() { - assert(in_file); - if (symlink(get_filename(worker).c_str(), path.c_str())) { - log_errno_abort("symlink"); - } + } -void Data::open(Worker &worker) +bool DataUnpacker::on_message(Connection &connection, const char *data, size_t size) { - assert(in_file); - - int fd = ::open(get_filename(worker).c_str(), O_RDONLY, S_IRUSR | S_IWUSR); - if (fd < 0) { - llog->critical("Cannot open data {}", get_filename(worker)); - log_errno_abort("open"); - } - struct stat finfo; - memset(&finfo, 0, sizeof(finfo)); - if (fstat(fd, &finfo) == -1) - { - log_errno_abort("fstat"); - } - size = finfo.st_size; - map(fd, false); - ::close(fd); + assert(0); } - -void Data::map(int fd, bool write) +void DataUnpacker::on_data_chunk(const char *data, size_t size) { - assert(data == nullptr); - assert(in_file); - assert(fd >= 0); + assert(0); +} - int flags = PROT_READ; - if (write) { - flags |= PROT_WRITE; - } - data = (char*) mmap(0, size, flags, MAP_SHARED, fd, 0); - if (data == MAP_FAILED) { - log_errno_abort("mmap"); - } +bool DataUnpacker::on_data_finish(Connection &connection) +{ + assert(0); } diff --git a/src/libloom/data.h b/src/libloom/data.h index 2143a86e14c83ceb7c386735ba4ddac9aacc2b8c..8971ba14ea96680a4dfca57c4f2f1b675d5c1e8e 100644 --- a/src/libloom/data.h +++ b/src/libloom/data.h @@ -3,80 +3,51 @@ #include "types.h" -#include <uv.h> +#include "loomcomm.pb.h" -#include <stdlib.h> -#include <vector> +#include <uv.h> #include <string> +#include <memory> namespace loom { class Worker; +class SendBuffer; +class Connection; class Data { public: + virtual ~Data(); - Data(Id id); - ~Data(); - - char* init_memonly(size_t size); - char* init_empty_file(Worker &worker, size_t size); - void init_from_file(Worker &worker); - - int get_id() const { - return id; - } - - size_t get_size() const { - return size; - } + virtual int get_type_id() = 0; - char* get_data(Worker &worker) { - if (data == NULL) { - open(worker); - } - return data; - } + virtual size_t get_size() = 0; + virtual std::string get_info() = 0; - uv_buf_t get_uv_buf(Worker &worker) { - uv_buf_t buf; - buf.base = get_data(worker); - buf.len = size; - return buf; - } + void serialize(Worker &worker, SendBuffer &buffer, std::shared_ptr<Data> &data_ptr); + //virtual void init_message(Worker &worker, loomcomm::Data &msg); + virtual void serialize_data(Worker &worker, SendBuffer &buffer, std::shared_ptr<Data> &data_ptr) = 0; - std::string get_filename(Worker &worker) const; - int get_fd(Worker &worker) const; - void make_symlink(Worker &worker, const std::string &path) const; - - /*void add(const char *new_data, size_t size) { - data.insert(data.end(), new_data, new_data + size); - } - - void add(const Data &other) { - data.insert(data.end(), other.data.begin(), other.data.end()); - } + virtual char *get_raw_data(Worker &worker); + virtual std::string get_filename(Worker &worker) const; +}; - void add(const std::string& new_data) { - add(new_data.data(), new_data.size()); +class DataUnpacker +{ +public: + virtual ~DataUnpacker(); + virtual bool init(Worker &worker, Connection &connection, const loomcomm::Data &msg) = 0; + virtual bool on_message(Connection &connection, const char *data, size_t size); + virtual void on_data_chunk(const char *data, size_t size); + virtual bool on_data_finish(Connection &connection); + + std::unique_ptr<Data> release_data() { + return std::move(data); } - - - std::string get_data_as_string() const { - return std::string(data.begin(), data.end()); - }*/ - -private: - - void open(Worker &worker); - void map(int fd, bool write); - - Id id; - char *data; - size_t size; - bool in_file; +protected: + std::unique_ptr<Data> data; }; } diff --git a/src/libloom/databuilder.h b/src/libloom/databuilder.h index c91e94720b0480ada4c96d4e4815a794934fa738..73cd5f364ed760fca9e95615a9bbf39a23692ec2 100644 --- a/src/libloom/databuilder.h +++ b/src/libloom/databuilder.h @@ -1,4 +1,4 @@ -#ifndef LIBLOOM_DATABUILDER_H +/*#ifndef LIBLOOM_DATABUILDER_H #define LIBLOOM_DATABUILDER_H #include "data.h" @@ -44,3 +44,4 @@ public: } #endif // LIBLOOM_DATABUILDER_H +*/ diff --git a/src/libloom/interconnect.cpp b/src/libloom/interconnect.cpp index d9c54b221d1ce8f10adeef59fea5e9ee14f9dd31..0a6bd694951eb0e5b3f4390969a333c81e8e80f4 100644 --- a/src/libloom/interconnect.cpp +++ b/src/libloom/interconnect.cpp @@ -10,7 +10,7 @@ using namespace loom; InterConnection::InterConnection(Worker &worker) - : SimpleConnectionCallback(worker.get_loop()), worker(worker) + : SimpleConnectionCallback(worker.get_loop()), worker(worker), data_id(-1) { } @@ -40,19 +40,42 @@ void InterConnection::on_close() worker.unregister_connection(*this); } +void InterConnection::finish_data() +{ + llog->debug("Data {} sucessfully received", data_id); + worker.publish_data(data_id, + data_unpacker->release_data()); + data_unpacker.reset(); + data_id = -1; +} + void InterConnection::on_message(const char *buffer, size_t size) { - if (address.size()) { + if (data_unpacker.get()) { + data_unpacker->on_message(connection, buffer, size);; + return; + } + if (data_id > 0) { + assert(data_unpacker.get() == nullptr); loomcomm::Data msg; msg.ParseFromArray(buffer, size); + data_unpacker = worker.unpack(msg.type_id()); + if (data_unpacker->init(worker, connection, msg)) { + finish_data(); + } + return; + } else if (address.size()) { + loomcomm::DataPrologue msg; + msg.ParseFromArray(buffer, size); auto id = msg.id(); - auto size = msg.size(); - llog->debug("Receiving data id={} size={}", id, size); - assert(data_builder.get() == nullptr); - bool map_file = !worker.get_work_dir().empty(); - data_builder = std::make_unique<DataBuilder>(worker, id, size, map_file); - connection.set_raw_read(size); + data_id = id; + if (msg.has_data_size()) { + llog->debug("Receiving data id={} (data_size={})", id, msg.data_size()); + } else { + llog->debug("Receiving data id={}", id); + } } else { + // First message loomcomm::Announce msg; msg.ParseFromArray(buffer, size); std::stringstream s; @@ -64,30 +87,34 @@ void InterConnection::on_message(const char *buffer, size_t size) void InterConnection::on_data_chunk(const char *buffer, size_t size) { - assert(data_builder.get()); - data_builder->add(buffer, size); + assert(data_unpacker.get()); + data_unpacker->on_data_chunk(buffer, size); } void InterConnection::on_data_finish() { - std::unique_ptr<Data> data = data_builder->release_data(); - data_builder.reset(); - llog->debug("Data {} sucessfully received", data->get_id()); - assert(data.get()); - worker.publish_data(std::move(data)); + assert(data_unpacker.get()); + if (data_unpacker->on_data_finish(connection)) { + finish_data(); + } } -void InterConnection::send(std::shared_ptr<Data> &data) +void InterConnection::send(Id id, std::shared_ptr<Data> &data, bool with_size) { - loomcomm::Data msg; + SendBuffer *buffer = new SendBuffer(); + loomcomm::DataPrologue msg; + msg.set_id(id); - msg.set_id(data->get_id()); - msg.set_size(data->get_size()); + if (!with_size) { + buffer->add(msg); + } + data->serialize(worker, *buffer, data); - SendBuffer *buffer = new SendBuffer(); - buffer->add(msg); - buffer->add(data, data->get_data(worker), data->get_size()); + if (with_size) { + msg.set_data_size(buffer->get_size()); + buffer->insert(0, msg); + } Connection::State state = connection.get_state(); assert(state == Connection::ConnectionOpen || diff --git a/src/libloom/interconnect.h b/src/libloom/interconnect.h index 342224d624c7356a2ba218de4dba966f033d26a3..5fec9acd1f8dd4bad2adb183ce93bdc42bd4f2a6 100644 --- a/src/libloom/interconnect.h +++ b/src/libloom/interconnect.h @@ -19,7 +19,7 @@ public: InterConnection(Worker &worker); ~InterConnection(); - void send(std::shared_ptr<Data> &data); + void send(Id id, std::shared_ptr<Data> &data, bool with_size); void send(std::unique_ptr<SendBuffer> buffer); void accept(uv_tcp_t *listen_socket) { connection.accept(listen_socket); @@ -46,10 +46,13 @@ protected: void on_connection(); void on_close(); + void finish_data(); + Worker &worker; std::string address; - std::unique_ptr<DataBuilder> data_builder; + std::unique_ptr<DataUnpacker> data_unpacker; + Id data_id; static std::string make_address(const std::string &host, int port); diff --git a/src/libloom/loomcomm.pb.cc b/src/libloom/loomcomm.pb.cc index 8919c01f55223e8394ff0d366cd3b313c4c3ce64..44ef3fc0276f7c6ecd07199ef24dc25fe3705390 100644 --- a/src/libloom/loomcomm.pb.cc +++ b/src/libloom/loomcomm.pb.cc @@ -21,6 +21,7 @@ void protobuf_ShutdownFile_loomcomm_2eproto() { delete WorkerCommand::default_instance_; delete WorkerResponse::default_instance_; delete Announce::default_instance_; + delete DataPrologue::default_instance_; delete Data::default_instance_; } @@ -41,12 +42,14 @@ void protobuf_AddDesc_loomcomm_2eproto() { WorkerCommand::default_instance_ = new WorkerCommand(); WorkerResponse::default_instance_ = new WorkerResponse(); Announce::default_instance_ = new Announce(); + DataPrologue::default_instance_ = new DataPrologue(); Data::default_instance_ = new Data(); Register::default_instance_->InitAsDefaultInstance(); ServerMessage::default_instance_->InitAsDefaultInstance(); WorkerCommand::default_instance_->InitAsDefaultInstance(); WorkerResponse::default_instance_->InitAsDefaultInstance(); Announce::default_instance_->InitAsDefaultInstance(); + DataPrologue::default_instance_->InitAsDefaultInstance(); Data::default_instance_->InitAsDefaultInstance(); ::google::protobuf::internal::OnShutdown(&protobuf_ShutdownFile_loomcomm_2eproto); } @@ -572,6 +575,7 @@ const int WorkerCommand::kTaskTypeFieldNumber; const int WorkerCommand::kTaskConfigFieldNumber; const int WorkerCommand::kTaskInputsFieldNumber; const int WorkerCommand::kAddressFieldNumber; +const int WorkerCommand::kWithSizeFieldNumber; #endif // !_MSC_VER WorkerCommand::WorkerCommand() @@ -598,6 +602,7 @@ void WorkerCommand::SharedCtor() { task_type_ = 0; task_config_ = const_cast< ::std::string*>(&::google::protobuf::internal::GetEmptyStringAlreadyInited()); address_ = const_cast< ::std::string*>(&::google::protobuf::internal::GetEmptyStringAlreadyInited()); + with_size_ = false; ::memset(_has_bits_, 0, sizeof(_has_bits_)); } @@ -642,10 +647,20 @@ WorkerCommand* WorkerCommand::New() const { } void WorkerCommand::Clear() { - if (_has_bits_[0 / 32] & 47) { +#define OFFSET_OF_FIELD_(f) (reinterpret_cast<char*>( \ + &reinterpret_cast<WorkerCommand*>(16)->f) - \ + reinterpret_cast<char*>(16)) + +#define ZR_(first, last) do { \ + size_t f = OFFSET_OF_FIELD_(first); \ + size_t n = OFFSET_OF_FIELD_(last) - f + sizeof(last); \ + ::memset(&first, 0, n); \ + } while (0) + + if (_has_bits_[0 / 32] & 111) { + ZR_(task_type_, with_size_); type_ = 1; id_ = 0; - task_type_ = 0; if (has_task_config()) { if (task_config_ != &::google::protobuf::internal::GetEmptyStringAlreadyInited()) { task_config_->clear(); @@ -657,6 +672,10 @@ void WorkerCommand::Clear() { } } } + +#undef OFFSET_OF_FIELD_ +#undef ZR_ + task_inputs_.Clear(); ::memset(_has_bits_, 0, sizeof(_has_bits_)); mutable_unknown_fields()->clear(); @@ -767,6 +786,21 @@ bool WorkerCommand::MergePartialFromCodedStream( } else { goto handle_unusual; } + if (input->ExpectTag(88)) goto parse_with_size; + break; + } + + // optional bool with_size = 11; + case 11: { + if (tag == 88) { + parse_with_size: + DO_((::google::protobuf::internal::WireFormatLite::ReadPrimitive< + bool, ::google::protobuf::internal::WireFormatLite::TYPE_BOOL>( + input, &with_size_))); + set_has_with_size(); + } else { + goto handle_unusual; + } if (input->ExpectAtEnd()) goto success; break; } @@ -830,6 +864,11 @@ void WorkerCommand::SerializeWithCachedSizes( 10, this->address(), output); } + // optional bool with_size = 11; + if (has_with_size()) { + ::google::protobuf::internal::WireFormatLite::WriteBool(11, this->with_size(), output); + } + output->WriteRaw(unknown_fields().data(), unknown_fields().size()); // @@protoc_insertion_point(serialize_end:loomcomm.WorkerCommand) @@ -873,6 +912,11 @@ int WorkerCommand::ByteSize() const { this->address()); } + // optional bool with_size = 11; + if (has_with_size()) { + total_size += 1 + 1; + } + } // repeated int32 task_inputs = 5; { @@ -916,6 +960,9 @@ void WorkerCommand::MergeFrom(const WorkerCommand& from) { if (from.has_address()) { set_address(from.address()); } + if (from.has_with_size()) { + set_with_size(from.with_size()); + } } mutable_unknown_fields()->append(from.unknown_fields()); } @@ -940,6 +987,7 @@ void WorkerCommand::Swap(WorkerCommand* other) { std::swap(task_config_, other->task_config_); task_inputs_.Swap(&other->task_inputs_); std::swap(address_, other->address_); + std::swap(with_size_, other->with_size_); std::swap(_has_bits_[0], other->_has_bits_[0]); _unknown_fields_.swap(other->_unknown_fields_); std::swap(_cached_size_, other->_cached_size_); @@ -1337,7 +1385,246 @@ void Announce::Swap(Announce* other) { // =================================================================== #ifndef _MSC_VER -const int Data::kIdFieldNumber; +const int DataPrologue::kIdFieldNumber; +const int DataPrologue::kDataSizeFieldNumber; +#endif // !_MSC_VER + +DataPrologue::DataPrologue() + : ::google::protobuf::MessageLite() { + SharedCtor(); + // @@protoc_insertion_point(constructor:loomcomm.DataPrologue) +} + +void DataPrologue::InitAsDefaultInstance() { +} + +DataPrologue::DataPrologue(const DataPrologue& from) + : ::google::protobuf::MessageLite() { + SharedCtor(); + MergeFrom(from); + // @@protoc_insertion_point(copy_constructor:loomcomm.DataPrologue) +} + +void DataPrologue::SharedCtor() { + _cached_size_ = 0; + id_ = 0; + data_size_ = GOOGLE_ULONGLONG(0); + ::memset(_has_bits_, 0, sizeof(_has_bits_)); +} + +DataPrologue::~DataPrologue() { + // @@protoc_insertion_point(destructor:loomcomm.DataPrologue) + SharedDtor(); +} + +void DataPrologue::SharedDtor() { + #ifdef GOOGLE_PROTOBUF_NO_STATIC_INITIALIZER + if (this != &default_instance()) { + #else + if (this != default_instance_) { + #endif + } +} + +void DataPrologue::SetCachedSize(int size) const { + GOOGLE_SAFE_CONCURRENT_WRITES_BEGIN(); + _cached_size_ = size; + GOOGLE_SAFE_CONCURRENT_WRITES_END(); +} +const DataPrologue& DataPrologue::default_instance() { +#ifdef GOOGLE_PROTOBUF_NO_STATIC_INITIALIZER + protobuf_AddDesc_loomcomm_2eproto(); +#else + if (default_instance_ == NULL) protobuf_AddDesc_loomcomm_2eproto(); +#endif + return *default_instance_; +} + +DataPrologue* DataPrologue::default_instance_ = NULL; + +DataPrologue* DataPrologue::New() const { + return new DataPrologue; +} + +void DataPrologue::Clear() { +#define OFFSET_OF_FIELD_(f) (reinterpret_cast<char*>( \ + &reinterpret_cast<DataPrologue*>(16)->f) - \ + reinterpret_cast<char*>(16)) + +#define ZR_(first, last) do { \ + size_t f = OFFSET_OF_FIELD_(first); \ + size_t n = OFFSET_OF_FIELD_(last) - f + sizeof(last); \ + ::memset(&first, 0, n); \ + } while (0) + + ZR_(data_size_, id_); + +#undef OFFSET_OF_FIELD_ +#undef ZR_ + + ::memset(_has_bits_, 0, sizeof(_has_bits_)); + mutable_unknown_fields()->clear(); +} + +bool DataPrologue::MergePartialFromCodedStream( + ::google::protobuf::io::CodedInputStream* input) { +#define DO_(EXPRESSION) if (!(EXPRESSION)) goto failure + ::google::protobuf::uint32 tag; + ::google::protobuf::io::StringOutputStream unknown_fields_string( + mutable_unknown_fields()); + ::google::protobuf::io::CodedOutputStream unknown_fields_stream( + &unknown_fields_string); + // @@protoc_insertion_point(parse_start:loomcomm.DataPrologue) + for (;;) { + ::std::pair< ::google::protobuf::uint32, bool> p = input->ReadTagWithCutoff(127); + tag = p.first; + if (!p.second) goto handle_unusual; + switch (::google::protobuf::internal::WireFormatLite::GetTagFieldNumber(tag)) { + // required int32 id = 1; + case 1: { + if (tag == 8) { + DO_((::google::protobuf::internal::WireFormatLite::ReadPrimitive< + ::google::protobuf::int32, ::google::protobuf::internal::WireFormatLite::TYPE_INT32>( + input, &id_))); + set_has_id(); + } else { + goto handle_unusual; + } + if (input->ExpectTag(24)) goto parse_data_size; + break; + } + + // optional uint64 data_size = 3; + case 3: { + if (tag == 24) { + parse_data_size: + DO_((::google::protobuf::internal::WireFormatLite::ReadPrimitive< + ::google::protobuf::uint64, ::google::protobuf::internal::WireFormatLite::TYPE_UINT64>( + input, &data_size_))); + set_has_data_size(); + } else { + goto handle_unusual; + } + if (input->ExpectAtEnd()) goto success; + break; + } + + default: { + handle_unusual: + if (tag == 0 || + ::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) == + ::google::protobuf::internal::WireFormatLite::WIRETYPE_END_GROUP) { + goto success; + } + DO_(::google::protobuf::internal::WireFormatLite::SkipField( + input, tag, &unknown_fields_stream)); + break; + } + } + } +success: + // @@protoc_insertion_point(parse_success:loomcomm.DataPrologue) + return true; +failure: + // @@protoc_insertion_point(parse_failure:loomcomm.DataPrologue) + return false; +#undef DO_ +} + +void DataPrologue::SerializeWithCachedSizes( + ::google::protobuf::io::CodedOutputStream* output) const { + // @@protoc_insertion_point(serialize_start:loomcomm.DataPrologue) + // required int32 id = 1; + if (has_id()) { + ::google::protobuf::internal::WireFormatLite::WriteInt32(1, this->id(), output); + } + + // optional uint64 data_size = 3; + if (has_data_size()) { + ::google::protobuf::internal::WireFormatLite::WriteUInt64(3, this->data_size(), output); + } + + output->WriteRaw(unknown_fields().data(), + unknown_fields().size()); + // @@protoc_insertion_point(serialize_end:loomcomm.DataPrologue) +} + +int DataPrologue::ByteSize() const { + int total_size = 0; + + if (_has_bits_[0 / 32] & (0xffu << (0 % 32))) { + // required int32 id = 1; + if (has_id()) { + total_size += 1 + + ::google::protobuf::internal::WireFormatLite::Int32Size( + this->id()); + } + + // optional uint64 data_size = 3; + if (has_data_size()) { + total_size += 1 + + ::google::protobuf::internal::WireFormatLite::UInt64Size( + this->data_size()); + } + + } + total_size += unknown_fields().size(); + + GOOGLE_SAFE_CONCURRENT_WRITES_BEGIN(); + _cached_size_ = total_size; + GOOGLE_SAFE_CONCURRENT_WRITES_END(); + return total_size; +} + +void DataPrologue::CheckTypeAndMergeFrom( + const ::google::protobuf::MessageLite& from) { + MergeFrom(*::google::protobuf::down_cast<const DataPrologue*>(&from)); +} + +void DataPrologue::MergeFrom(const DataPrologue& from) { + GOOGLE_CHECK_NE(&from, this); + if (from._has_bits_[0 / 32] & (0xffu << (0 % 32))) { + if (from.has_id()) { + set_id(from.id()); + } + if (from.has_data_size()) { + set_data_size(from.data_size()); + } + } + mutable_unknown_fields()->append(from.unknown_fields()); +} + +void DataPrologue::CopyFrom(const DataPrologue& from) { + if (&from == this) return; + Clear(); + MergeFrom(from); +} + +bool DataPrologue::IsInitialized() const { + if ((_has_bits_[0] & 0x00000001) != 0x00000001) return false; + + return true; +} + +void DataPrologue::Swap(DataPrologue* other) { + if (other != this) { + std::swap(id_, other->id_); + std::swap(data_size_, other->data_size_); + std::swap(_has_bits_[0], other->_has_bits_[0]); + _unknown_fields_.swap(other->_unknown_fields_); + std::swap(_cached_size_, other->_cached_size_); + } +} + +::std::string DataPrologue::GetTypeName() const { + return "loomcomm.DataPrologue"; +} + + +// =================================================================== + +#ifndef _MSC_VER +const int Data::kTypeIdFieldNumber; const int Data::kSizeFieldNumber; #endif // !_MSC_VER @@ -1359,7 +1646,7 @@ Data::Data(const Data& from) void Data::SharedCtor() { _cached_size_ = 0; - id_ = 0; + type_id_ = 0; size_ = GOOGLE_ULONGLONG(0); ::memset(_has_bits_, 0, sizeof(_has_bits_)); } @@ -1409,7 +1696,7 @@ void Data::Clear() { ::memset(&first, 0, n); \ } while (0) - ZR_(size_, id_); + ZR_(size_, type_id_); #undef OFFSET_OF_FIELD_ #undef ZR_ @@ -1432,13 +1719,13 @@ bool Data::MergePartialFromCodedStream( tag = p.first; if (!p.second) goto handle_unusual; switch (::google::protobuf::internal::WireFormatLite::GetTagFieldNumber(tag)) { - // required int32 id = 1; + // required int32 type_id = 1; case 1: { if (tag == 8) { DO_((::google::protobuf::internal::WireFormatLite::ReadPrimitive< ::google::protobuf::int32, ::google::protobuf::internal::WireFormatLite::TYPE_INT32>( - input, &id_))); - set_has_id(); + input, &type_id_))); + set_has_type_id(); } else { goto handle_unusual; } @@ -1446,7 +1733,7 @@ bool Data::MergePartialFromCodedStream( break; } - // required uint64 size = 2; + // optional uint64 size = 2; case 2: { if (tag == 16) { parse_size: @@ -1486,12 +1773,12 @@ failure: void Data::SerializeWithCachedSizes( ::google::protobuf::io::CodedOutputStream* output) const { // @@protoc_insertion_point(serialize_start:loomcomm.Data) - // required int32 id = 1; - if (has_id()) { - ::google::protobuf::internal::WireFormatLite::WriteInt32(1, this->id(), output); + // required int32 type_id = 1; + if (has_type_id()) { + ::google::protobuf::internal::WireFormatLite::WriteInt32(1, this->type_id(), output); } - // required uint64 size = 2; + // optional uint64 size = 2; if (has_size()) { ::google::protobuf::internal::WireFormatLite::WriteUInt64(2, this->size(), output); } @@ -1505,14 +1792,14 @@ int Data::ByteSize() const { int total_size = 0; if (_has_bits_[0 / 32] & (0xffu << (0 % 32))) { - // required int32 id = 1; - if (has_id()) { + // required int32 type_id = 1; + if (has_type_id()) { total_size += 1 + ::google::protobuf::internal::WireFormatLite::Int32Size( - this->id()); + this->type_id()); } - // required uint64 size = 2; + // optional uint64 size = 2; if (has_size()) { total_size += 1 + ::google::protobuf::internal::WireFormatLite::UInt64Size( @@ -1536,8 +1823,8 @@ void Data::CheckTypeAndMergeFrom( void Data::MergeFrom(const Data& from) { GOOGLE_CHECK_NE(&from, this); if (from._has_bits_[0 / 32] & (0xffu << (0 % 32))) { - if (from.has_id()) { - set_id(from.id()); + if (from.has_type_id()) { + set_type_id(from.type_id()); } if (from.has_size()) { set_size(from.size()); @@ -1553,14 +1840,14 @@ void Data::CopyFrom(const Data& from) { } bool Data::IsInitialized() const { - if ((_has_bits_[0] & 0x00000003) != 0x00000003) return false; + if ((_has_bits_[0] & 0x00000001) != 0x00000001) return false; return true; } void Data::Swap(Data* other) { if (other != this) { - std::swap(id_, other->id_); + std::swap(type_id_, other->type_id_); std::swap(size_, other->size_); std::swap(_has_bits_[0], other->_has_bits_[0]); _unknown_fields_.swap(other->_unknown_fields_); diff --git a/src/libloom/loomcomm.pb.h b/src/libloom/loomcomm.pb.h index 87f749b299713e9dc86e8c9dcddddee0ecb550f1..893a2bd934a6239f08cc07e033aa871659481025 100644 --- a/src/libloom/loomcomm.pb.h +++ b/src/libloom/loomcomm.pb.h @@ -37,6 +37,7 @@ class ServerMessage; class WorkerCommand; class WorkerResponse; class Announce; +class DataPrologue; class Data; enum Register_Type { @@ -432,6 +433,13 @@ class WorkerCommand : public ::google::protobuf::MessageLite { inline ::std::string* release_address(); inline void set_allocated_address(::std::string* address); + // optional bool with_size = 11; + inline bool has_with_size() const; + inline void clear_with_size(); + static const int kWithSizeFieldNumber = 11; + inline bool with_size() const; + inline void set_with_size(bool value); + // @@protoc_insertion_point(class_scope:loomcomm.WorkerCommand) private: inline void set_has_type(); @@ -444,6 +452,8 @@ class WorkerCommand : public ::google::protobuf::MessageLite { inline void clear_has_task_config(); inline void set_has_address(); inline void clear_has_address(); + inline void set_has_with_size(); + inline void clear_has_with_size(); ::std::string _unknown_fields_; @@ -453,8 +463,9 @@ class WorkerCommand : public ::google::protobuf::MessageLite { ::google::protobuf::int32 id_; ::std::string* task_config_; ::google::protobuf::RepeatedField< ::google::protobuf::int32 > task_inputs_; - ::std::string* address_; ::google::protobuf::int32 task_type_; + bool with_size_; + ::std::string* address_; #ifdef GOOGLE_PROTOBUF_NO_STATIC_INITIALIZER friend void protobuf_AddDesc_loomcomm_2eproto_impl(); #else @@ -650,6 +661,107 @@ class Announce : public ::google::protobuf::MessageLite { }; // ------------------------------------------------------------------- +class DataPrologue : public ::google::protobuf::MessageLite { + public: + DataPrologue(); + virtual ~DataPrologue(); + + DataPrologue(const DataPrologue& from); + + inline DataPrologue& operator=(const DataPrologue& from) { + CopyFrom(from); + return *this; + } + + inline const ::std::string& unknown_fields() const { + return _unknown_fields_; + } + + inline ::std::string* mutable_unknown_fields() { + return &_unknown_fields_; + } + + static const DataPrologue& default_instance(); + + #ifdef GOOGLE_PROTOBUF_NO_STATIC_INITIALIZER + // Returns the internal default instance pointer. This function can + // return NULL thus should not be used by the user. This is intended + // for Protobuf internal code. Please use default_instance() declared + // above instead. + static inline const DataPrologue* internal_default_instance() { + return default_instance_; + } + #endif + + void Swap(DataPrologue* other); + + // implements Message ---------------------------------------------- + + DataPrologue* New() const; + void CheckTypeAndMergeFrom(const ::google::protobuf::MessageLite& from); + void CopyFrom(const DataPrologue& from); + void MergeFrom(const DataPrologue& from); + void Clear(); + bool IsInitialized() const; + + int ByteSize() const; + bool MergePartialFromCodedStream( + ::google::protobuf::io::CodedInputStream* input); + void SerializeWithCachedSizes( + ::google::protobuf::io::CodedOutputStream* output) const; + void DiscardUnknownFields(); + int GetCachedSize() const { return _cached_size_; } + private: + void SharedCtor(); + void SharedDtor(); + void SetCachedSize(int size) const; + public: + ::std::string GetTypeName() const; + + // nested types ---------------------------------------------------- + + // accessors ------------------------------------------------------- + + // required int32 id = 1; + inline bool has_id() const; + inline void clear_id(); + static const int kIdFieldNumber = 1; + inline ::google::protobuf::int32 id() const; + inline void set_id(::google::protobuf::int32 value); + + // optional uint64 data_size = 3; + inline bool has_data_size() const; + inline void clear_data_size(); + static const int kDataSizeFieldNumber = 3; + inline ::google::protobuf::uint64 data_size() const; + inline void set_data_size(::google::protobuf::uint64 value); + + // @@protoc_insertion_point(class_scope:loomcomm.DataPrologue) + private: + inline void set_has_id(); + inline void clear_has_id(); + inline void set_has_data_size(); + inline void clear_has_data_size(); + + ::std::string _unknown_fields_; + + ::google::protobuf::uint32 _has_bits_[1]; + mutable int _cached_size_; + ::google::protobuf::uint64 data_size_; + ::google::protobuf::int32 id_; + #ifdef GOOGLE_PROTOBUF_NO_STATIC_INITIALIZER + friend void protobuf_AddDesc_loomcomm_2eproto_impl(); + #else + friend void protobuf_AddDesc_loomcomm_2eproto(); + #endif + friend void protobuf_AssignDesc_loomcomm_2eproto(); + friend void protobuf_ShutdownFile_loomcomm_2eproto(); + + void InitAsDefaultInstance(); + static DataPrologue* default_instance_; +}; +// ------------------------------------------------------------------- + class Data : public ::google::protobuf::MessageLite { public: Data(); @@ -711,14 +823,14 @@ class Data : public ::google::protobuf::MessageLite { // accessors ------------------------------------------------------- - // required int32 id = 1; - inline bool has_id() const; - inline void clear_id(); - static const int kIdFieldNumber = 1; - inline ::google::protobuf::int32 id() const; - inline void set_id(::google::protobuf::int32 value); + // required int32 type_id = 1; + inline bool has_type_id() const; + inline void clear_type_id(); + static const int kTypeIdFieldNumber = 1; + inline ::google::protobuf::int32 type_id() const; + inline void set_type_id(::google::protobuf::int32 value); - // required uint64 size = 2; + // optional uint64 size = 2; inline bool has_size() const; inline void clear_size(); static const int kSizeFieldNumber = 2; @@ -727,8 +839,8 @@ class Data : public ::google::protobuf::MessageLite { // @@protoc_insertion_point(class_scope:loomcomm.Data) private: - inline void set_has_id(); - inline void clear_has_id(); + inline void set_has_type_id(); + inline void clear_has_type_id(); inline void set_has_size(); inline void clear_has_size(); @@ -737,7 +849,7 @@ class Data : public ::google::protobuf::MessageLite { ::google::protobuf::uint32 _has_bits_[1]; mutable int _cached_size_; ::google::protobuf::uint64 size_; - ::google::protobuf::int32 id_; + ::google::protobuf::int32 type_id_; #ifdef GOOGLE_PROTOBUF_NO_STATIC_INITIALIZER friend void protobuf_AddDesc_loomcomm_2eproto_impl(); #else @@ -1146,6 +1258,30 @@ inline void WorkerCommand::set_allocated_address(::std::string* address) { // @@protoc_insertion_point(field_set_allocated:loomcomm.WorkerCommand.address) } +// optional bool with_size = 11; +inline bool WorkerCommand::has_with_size() const { + return (_has_bits_[0] & 0x00000040u) != 0; +} +inline void WorkerCommand::set_has_with_size() { + _has_bits_[0] |= 0x00000040u; +} +inline void WorkerCommand::clear_has_with_size() { + _has_bits_[0] &= ~0x00000040u; +} +inline void WorkerCommand::clear_with_size() { + with_size_ = false; + clear_has_with_size(); +} +inline bool WorkerCommand::with_size() const { + // @@protoc_insertion_point(field_get:loomcomm.WorkerCommand.with_size) + return with_size_; +} +inline void WorkerCommand::set_with_size(bool value) { + set_has_with_size(); + with_size_ = value; + // @@protoc_insertion_point(field_set:loomcomm.WorkerCommand.with_size) +} + // ------------------------------------------------------------------- // WorkerResponse @@ -1204,33 +1340,85 @@ inline void Announce::set_port(::google::protobuf::int32 value) { // ------------------------------------------------------------------- -// Data +// DataPrologue // required int32 id = 1; -inline bool Data::has_id() const { +inline bool DataPrologue::has_id() const { return (_has_bits_[0] & 0x00000001u) != 0; } -inline void Data::set_has_id() { +inline void DataPrologue::set_has_id() { _has_bits_[0] |= 0x00000001u; } -inline void Data::clear_has_id() { +inline void DataPrologue::clear_has_id() { _has_bits_[0] &= ~0x00000001u; } -inline void Data::clear_id() { +inline void DataPrologue::clear_id() { id_ = 0; clear_has_id(); } -inline ::google::protobuf::int32 Data::id() const { - // @@protoc_insertion_point(field_get:loomcomm.Data.id) +inline ::google::protobuf::int32 DataPrologue::id() const { + // @@protoc_insertion_point(field_get:loomcomm.DataPrologue.id) return id_; } -inline void Data::set_id(::google::protobuf::int32 value) { +inline void DataPrologue::set_id(::google::protobuf::int32 value) { set_has_id(); id_ = value; - // @@protoc_insertion_point(field_set:loomcomm.Data.id) + // @@protoc_insertion_point(field_set:loomcomm.DataPrologue.id) +} + +// optional uint64 data_size = 3; +inline bool DataPrologue::has_data_size() const { + return (_has_bits_[0] & 0x00000002u) != 0; +} +inline void DataPrologue::set_has_data_size() { + _has_bits_[0] |= 0x00000002u; +} +inline void DataPrologue::clear_has_data_size() { + _has_bits_[0] &= ~0x00000002u; +} +inline void DataPrologue::clear_data_size() { + data_size_ = GOOGLE_ULONGLONG(0); + clear_has_data_size(); +} +inline ::google::protobuf::uint64 DataPrologue::data_size() const { + // @@protoc_insertion_point(field_get:loomcomm.DataPrologue.data_size) + return data_size_; +} +inline void DataPrologue::set_data_size(::google::protobuf::uint64 value) { + set_has_data_size(); + data_size_ = value; + // @@protoc_insertion_point(field_set:loomcomm.DataPrologue.data_size) +} + +// ------------------------------------------------------------------- + +// Data + +// required int32 type_id = 1; +inline bool Data::has_type_id() const { + return (_has_bits_[0] & 0x00000001u) != 0; +} +inline void Data::set_has_type_id() { + _has_bits_[0] |= 0x00000001u; +} +inline void Data::clear_has_type_id() { + _has_bits_[0] &= ~0x00000001u; +} +inline void Data::clear_type_id() { + type_id_ = 0; + clear_has_type_id(); +} +inline ::google::protobuf::int32 Data::type_id() const { + // @@protoc_insertion_point(field_get:loomcomm.Data.type_id) + return type_id_; +} +inline void Data::set_type_id(::google::protobuf::int32 value) { + set_has_type_id(); + type_id_ = value; + // @@protoc_insertion_point(field_set:loomcomm.Data.type_id) } -// required uint64 size = 2; +// optional uint64 size = 2; inline bool Data::has_size() const { return (_has_bits_[0] & 0x00000002u) != 0; } diff --git a/src/libloom/rawdata.cpp b/src/libloom/rawdata.cpp new file mode 100644 index 0000000000000000000000000000000000000000..986f3f82ebccacf10201db5eb30219f3a2b304d3 --- /dev/null +++ b/src/libloom/rawdata.cpp @@ -0,0 +1,180 @@ +#include "rawdata.h" + +#include "log.h" +#include "utils.h" +#include "worker.h" + +#include <sstream> +#include <assert.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <sys/mman.h> +#include <fcntl.h> +#include <unistd.h> + +using namespace loom; + +size_t RawData::file_id_counter = 1; + +RawData::RawData() + : data(nullptr), size(0), file_id(0) +{ + +} + +RawData::~RawData() +{ + if (data != nullptr) { + if (file_id) { + munmap(data, size); + } else { + delete [] data; + } + } +} + + + +char* RawData::init_memonly(size_t size) +{ + assert(data == nullptr); + assert(file_id == 0); + this->size = size; + data = new char[size]; + return data; +} + +char* RawData::init_empty_file(Worker &worker, size_t size) +{ + assert(data == nullptr); + + if (file_id == 0) { + assign_file_id(); + } + + this->size = size; + + int fd = ::open(get_filename(worker).c_str(), O_CREAT | O_RDWR, S_IRUSR | S_IWUSR); + if (fd < 0) { + llog->critical("Cannot open data {} for writing", get_filename(worker)); + log_errno_abort("open"); + } + + if (size > 0) { + if (!lseek(fd, size - 1, SEEK_SET)) { + log_errno_abort("lseek"); + } + if (write(fd, "", 1) != 1) { + log_errno_abort("write"); + } + } + map(fd, true); + ::close(fd); + + return data; +} + +void RawData::assign_file_id() +{ + assert(file_id == 0); + file_id = file_id_counter++; +} + +void RawData::init_from_file(Worker &worker) +{ + assert(data == nullptr); + if (file_id == 0) { + assign_file_id(); + } + size = file_size(get_filename(worker).c_str()); +} + +std::string RawData::get_filename(Worker &worker) const +{ + assert(file_id); + std::stringstream s; + s << worker.get_work_dir() << "data/" << file_id; + return s.str(); +} + +void RawData::open(Worker &worker) +{ + assert(file_id); + + int fd = ::open(get_filename(worker).c_str(), O_RDONLY, S_IRUSR | S_IWUSR); + if (fd < 0) { + llog->critical("Cannot open data {}", get_filename(worker)); + log_errno_abort("open"); + } + struct stat finfo; + memset(&finfo, 0, sizeof(finfo)); + if (fstat(fd, &finfo) == -1) + { + log_errno_abort("fstat"); + } + size = finfo.st_size; + map(fd, false); + ::close(fd); +} + + +void RawData::map(int fd, bool write) +{ + assert(data == nullptr); + assert(file_id); + assert(fd >= 0); + + int flags = PROT_READ; + if (write) { + flags |= PROT_WRITE; + } + data = (char*) mmap(0, size, flags, MAP_SHARED, fd, 0); + if (data == MAP_FAILED) { + log_errno_abort("mmap"); + } +} + +std::string RawData::get_info() +{ + return "RawData"; +} + +/*void RawData::init_message(Worker &worker, loomcomm::Data &msg) +{ + +}*/ + +void RawData::serialize_data(Worker &worker, SendBuffer &buffer, std::shared_ptr<Data> &data_ptr) +{ + buffer.add(data_ptr, get_raw_data(worker), size); +} + +RawDataUnpacker::~RawDataUnpacker() +{ + +} + +bool RawDataUnpacker::init(Worker &worker, Connection &connection, const loomcomm::Data &msg) +{ + auto data = std::make_unique<RawData>(); + assert(msg.has_size()); + auto size = msg.size(); + pointer = data->init_empty_file(worker, size); + this->data = std::move(data); + if (size == 0) { + return true; + } + connection.set_raw_read(size); + return false; +} + +void RawDataUnpacker::on_data_chunk(const char *data, size_t size) +{ + memcpy(pointer, data, size); + pointer += size; +} + +bool RawDataUnpacker::on_data_finish(Connection &connection) +{ + return true; +} diff --git a/src/libloom/rawdata.h b/src/libloom/rawdata.h new file mode 100644 index 0000000000000000000000000000000000000000..cdee622463442ce9dcbbce4fee666398762efa65 --- /dev/null +++ b/src/libloom/rawdata.h @@ -0,0 +1,72 @@ +#ifndef LIBLOOM_RAWDATA_H +#define LIBLOOM_RAWDATA_H + +#include "data.h" + +namespace loom { + +class RawData : public Data { +public: + static const int TYPE_ID = 300; + + RawData(); + ~RawData(); + + int get_type_id() { + return TYPE_ID; + } + + size_t get_size() { + return size; + } + + char *get_raw_data(Worker &worker) + { + if (data == nullptr) { + open(worker); + } + return data; + } + + std::string get_info(); + void serialize_data(Worker &worker, SendBuffer &buffer, std::shared_ptr<Data> &data_ptr); + + char* init_memonly(size_t size); + char* init_empty_file(Worker &worker, size_t size); + void assign_file_id(); + void init_from_file(Worker &worker); + + std::string get_filename(Worker &worker) const; + int get_fd(Worker &worker) const; + +private: + + void open(Worker &worker); + void map(int fd, bool write); + + char *data; + size_t size; + size_t file_id; + + static size_t file_id_counter; + +}; + +class RawDataUnpacker : public DataUnpacker +{ +public: + ~RawDataUnpacker(); + RawData& get_data() { + return *(static_cast<RawData*>(data.get())); + } + + bool init(Worker &worker, Connection &connection, const loomcomm::Data &msg); + void on_data_chunk(const char *data, size_t size); + bool on_data_finish(Connection &connection); +protected: + char *pointer = nullptr; +}; + +} + +#endif // LIBLOOM_RAWDATA_H diff --git a/src/libloom/sendbuffer.cpp b/src/libloom/sendbuffer.cpp index 5726f2378989c5765a414e8070484002ddb954d8..48c138f6ae9f20b6e30292ca9b47e3cdd5f8db56 100644 --- a/src/libloom/sendbuffer.cpp +++ b/src/libloom/sendbuffer.cpp @@ -29,3 +29,23 @@ void SendBuffer::add(google::protobuf::MessageLite &message) message.SerializeToArray(data.get() + sizeof(size), size); add(std::move(data), size + sizeof(size)); } + +void SendBuffer::insert(int index, google::protobuf::MessageLite &message) +{ + uint32_t size = message.ByteSize(); + auto data = std::make_unique<char[]>(size + sizeof(size)); + + uint32_t *size_ptr = reinterpret_cast<uint32_t *>(data.get()); + *size_ptr = size; + message.SerializeToArray(data.get() + sizeof(size), size); + insert(index, std::move(data), size + sizeof(size)); +} + +size_t SendBuffer::get_size() const +{ + size_t size = 0; + for (auto &buf : bufs) { + size += buf.len; + } + return size; +} diff --git a/src/libloom/sendbuffer.h b/src/libloom/sendbuffer.h index 6160392cb1321424ada3be1471502cf9cb1820e7..f68b303994662822f654da9b7c5ac281193c5ed7 100644 --- a/src/libloom/sendbuffer.h +++ b/src/libloom/sendbuffer.h @@ -38,12 +38,21 @@ public: raw_memory.push_back(std::move(data)); } + void insert(int index, std::unique_ptr<char[]> data, size_t size) { + bufs.emplace(bufs.begin() + index, uv_buf_t {data.get(), size}); + raw_memory.push_back(std::move(data)); + } + + void add(std::shared_ptr<Data> &data, char *data_ptr, size_t size) { bufs.emplace_back(uv_buf_t {data_ptr, size}); data_vector.push_back(data); } void add(::google::protobuf::MessageLite &message); + void insert(int index, ::google::protobuf::MessageLite &message); + + size_t get_size() const; uv_write_t request; diff --git a/src/libloom/taskinstance.cpp b/src/libloom/taskinstance.cpp index d7c8b6c8c63b4585babf180925ccc26e64c56fea..4be650638544119109593e15d95a84a0d865866a 100644 --- a/src/libloom/taskinstance.cpp +++ b/src/libloom/taskinstance.cpp @@ -28,8 +28,7 @@ const std::string TaskInstance::get_task_dir() void TaskInstance::finish(std::unique_ptr<Data> output) { - assert (output->get_id() == get_id()); - worker.publish_data(std::move(output)); + worker.publish_data(get_id(), std::move(output)); worker.task_finished(*this); } diff --git a/src/libloom/types.h b/src/libloom/types.h index 4e88658ad93659ab0070d5f7d08b6c0cbfe91fac..b505c359f7f9d37a0eb37fd8880fd4c6cd666608 100644 --- a/src/libloom/types.h +++ b/src/libloom/types.h @@ -7,6 +7,7 @@ const int PROTOCOL_VERSION = 1; typedef int Id; typedef int TaskId; +typedef int DataTypeId; } diff --git a/src/libloom/unpacking.cpp b/src/libloom/unpacking.cpp new file mode 100644 index 0000000000000000000000000000000000000000..ae62f05804d4fe371a159221ebe06e1679172522 --- /dev/null +++ b/src/libloom/unpacking.cpp @@ -0,0 +1,7 @@ + +#include "unpacking.h" + +loom::UnpackFactory::~UnpackFactory() +{ + +} diff --git a/src/libloom/unpacking.h b/src/libloom/unpacking.h new file mode 100644 index 0000000000000000000000000000000000000000..40ac06d0dc95faafc2b341a0e2c658ee96263eb5 --- /dev/null +++ b/src/libloom/unpacking.h @@ -0,0 +1,27 @@ +#ifndef LIBLOOM_UNPACKING_H +#define LIBLOOM_UNPACKING_H + +#include "data.h" + +#include <memory> + +namespace loom { + +class UnpackFactory +{ +public: + virtual ~UnpackFactory(); + virtual std::unique_ptr<DataUnpacker> make_unpacker() = 0; +}; + +template<typename T> class SimpleUnpackFactory : public UnpackFactory +{ +public: + std::unique_ptr<DataUnpacker> make_unpacker() { + return std::make_unique<T>(); + } +}; + +} + +#endif // LIBLOOM_UNPACKING_H diff --git a/src/libloom/utils.h b/src/libloom/utils.h index 4b90d651e6e4bcf6d1b5b24171b75ef82d386237..52615188ec83658f2e366992af04cebcf2a99916 100644 --- a/src/libloom/utils.h +++ b/src/libloom/utils.h @@ -11,7 +11,7 @@ namespace loom { #define UV_CHECK(call) \ { int _uv_r = (call); \ if (unlikely(_uv_r)) { \ - report_uv_error(_uv_r, __LINE__, __FILE__); \ + loom::report_uv_error(_uv_r, __LINE__, __FILE__); \ } \ } diff --git a/src/libloom/worker.cpp b/src/libloom/worker.cpp index 30675f28d9a88f2ffaac48a04ef033f246c8e11a..44dd72bd5dd6b064cfd8de01cc6747fc87ac0490 100644 --- a/src/libloom/worker.cpp +++ b/src/libloom/worker.cpp @@ -3,6 +3,7 @@ #include "utils.h" #include "log.h" #include "types.h" +#include "rawdata.h" #include <stdlib.h> #include <sstream> @@ -63,6 +64,9 @@ Worker::Worker(uv_loop_t *loop, llog->info("Using '{}' as working directory", work_dir); } + add_unpacker(RawData::TYPE_ID, + std::make_unique<SimpleUnpackFactory<RawDataUnpacker>>()); + resource_cpus = 1; } @@ -174,10 +178,10 @@ void Worker::start_task(std::unique_ptr<Task> task) t->start(); }*/ -void Worker::publish_data(std::unique_ptr<Data> data) +void Worker::publish_data(Id id, std::unique_ptr<Data> data) { - llog->debug("Publishing data id={} size={}", data->get_id(), data->get_size()); - public_data[data->get_id()] = std::move(data); + llog->debug("Publishing data id={} size={}", id, data->get_size()); + public_data[id] = std::move(data); check_waiting_tasks(); } @@ -279,6 +283,20 @@ void Worker::set_cpus(int value) llog->info("Number of CPUs for worker: {}", value); } +void Worker::add_unpacker(DataTypeId type_id, std::unique_ptr<UnpackFactory> factory) +{ + auto &f = unpack_factories[type_id]; + assert(f.get() == nullptr); + f = std::move(factory); +} + +std::unique_ptr<DataUnpacker> Worker::unpack(DataTypeId id) +{ + auto i = unpack_factories.find(id); + assert(i != unpack_factories.end()); + return i->second->make_unpacker(); +} + void Worker::check_waiting_tasks() { bool something_new = false; @@ -322,10 +340,10 @@ void Worker::task_finished(TaskInstance &task) check_ready_tasks(); } -void Worker::send_data(const std::string &address, std::shared_ptr<Data> &data) +void Worker::send_data(const std::string &address, Id id, std::shared_ptr<Data> &data, bool with_size) { auto &connection = get_connection(address);; - connection.send(data); + connection.send(id, data, with_size); } ServerConnection::ServerConnection(Worker &worker) @@ -378,7 +396,8 @@ void ServerConnection::on_message(const char *data, size_t size) msg.set_address(worker.get_server_address() + ":" + address.substr(2, std::string::npos)); } llog->debug("Sending data {} to {}", msg.id(), msg.address()); - assert(worker.send_data(msg.address(), msg.id())); + bool with_size = msg.has_with_size() && msg.with_size(); + assert(worker.send_data(msg.address(), msg.id(), with_size)); break; } default: diff --git a/src/libloom/worker.h b/src/libloom/worker.h index cbfeeff453cc02e559d707e1e1933fc5bb06586a..dc87c915a8bfb3beeb3845282f2bd7cce5b2542e 100644 --- a/src/libloom/worker.h +++ b/src/libloom/worker.h @@ -3,7 +3,7 @@ #include "interconnect.h" #include "taskinstance.h" -#include "data.h" +#include "unpacking.h" #include "taskfactory.h" #include <uv.h> @@ -16,6 +16,7 @@ namespace loom { class Worker; +class DataUnpacker; class ServerConnection : public SimpleConnectionCallback { @@ -50,18 +51,18 @@ public: } void new_task(std::unique_ptr<Task> task); - void send_data(const std::string &address, std::shared_ptr<Data> &data); - bool send_data(const std::string &address, Id id) { + void send_data(const std::string &address, Id id, std::shared_ptr<Data> &data, bool with_size); + bool send_data(const std::string &address, Id id, bool with_size) { auto& data = public_data[id]; if (data.get() == nullptr) { return false; } - send_data(address, data); + send_data(address, id, data, with_size); return true; } void task_finished(TaskInstance &task_instance); - void publish_data(std::unique_ptr<Data> data); + void publish_data(Id id, std::unique_ptr<Data> data); bool has_data(Id id) const { @@ -111,6 +112,8 @@ public: void check_ready_tasks(); void set_cpus(int value); + void add_unpacker(DataTypeId type_id, std::unique_ptr<UnpackFactory> factory); + std::unique_ptr<DataUnpacker> unpack(DataTypeId id); private: void register_worker(); @@ -127,9 +130,12 @@ private: std::vector<std::unique_ptr<TaskInstance>> active_tasks; std::vector<std::unique_ptr<Task>> ready_tasks; std::vector<std::unique_ptr<Task>> waiting_tasks; + std::vector<std::unique_ptr<TaskFactory>> task_factories; + std::unordered_map<int, std::shared_ptr<Data>> public_data; + std::string work_dir; - std::vector<std::unique_ptr<TaskFactory>> task_factories; + std::unordered_map<DataTypeId, std::unique_ptr<UnpackFactory>> unpack_factories; ServerConnection server_conn; std::unordered_map<std::string, std::unique_ptr<InterConnection>> connections; @@ -141,9 +147,6 @@ private: uv_tcp_t listen_socket; int listen_port; - std::string work_dir; - - static void _on_new_connection(uv_stream_t *stream, int status); static void _on_getaddrinfo(uv_getaddrinfo_t* handle, int status, struct addrinfo* response); }; diff --git a/src/proto/loomcomm.proto b/src/proto/loomcomm.proto index 47c5849b237e678625e3426e80d524398f1267a2..5ad238eb80930b3db93f6362253d5568a39a19b4 100644 --- a/src/proto/loomcomm.proto +++ b/src/proto/loomcomm.proto @@ -35,6 +35,7 @@ message WorkerCommand { // SEND optional string address = 10; + optional bool with_size = 11; } message WorkerResponse { @@ -45,7 +46,13 @@ message Announce { required int32 port = 1; } -message Data { +message DataPrologue { required int32 id = 1; - required uint64 size = 2; + optional uint64 data_size = 3; +} + +message Data +{ + required int32 type_id = 1; + optional uint64 size = 2; } diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index ee92058ab23b3e1e420de33bfad941e6caa7026a..57a4f35531abffc1e0ce1dc56a3b66d9ef51226b 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -2,6 +2,8 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14 -g -Wall") add_library(loom-server-lib OBJECT server.cpp server.h + dummyworker.cpp + dummyworker.h taskmanager.cpp taskmanager.h tasknode.cpp diff --git a/src/server/resendtask.cpp b/src/server/resendtask.cpp index d14c6e63ab7da70b3b4e5a61b82edabdf531f439..81c372c73d63ceb7ff2ed6843ee648be69a7c467 100644 --- a/src/server/resendtask.cpp +++ b/src/server/resendtask.cpp @@ -1,4 +1,4 @@ -#include "resendtask.h" +/*#include "resendtask.h" #include "server.h" #include "libloom/loomcomm.pb.h" @@ -17,14 +17,20 @@ void ResendTask::start(DataVector &input_data) { assert(input_data.size() == 1); auto data = *input_data[0]; - llog->debug("Resending data id={} to client", data->get_id()); - - loomcomm::Data msg; - msg.set_id(data->get_id()); - msg.set_size(data->get_size()); + Id id = -get_id(); + llog->debug("Resending data id={} to client", id); + loomcomm::DataPrologue msg; + msg.set_id(id); buffer.add(msg); - buffer.add(data, data->get_data(worker), data->get_size()); + data->serialize(worker, buffer, data); + +// loomcomm::Data msg; +// msg.set_id(id); +// msg.set_size(data->get_size()); +// +// buffer.add(msg); +// buffer.add(data, data->get_data(worker), data->get_size()); auto &connection = server.get_client_connection(); connection.send_buffer(&buffer); @@ -52,3 +58,4 @@ void ResendTask::_SendBuffer::on_finish(int status) llog->debug("Resend task id={} finished", task.get_id()); task.finish_without_data(); } +*/ diff --git a/src/server/resendtask.h b/src/server/resendtask.h index 8108b95244d1193d2eaf6480f8943e7e527a7f95..1caa4833bf41be95387d4c8a0ffe96a93f6c6e28 100644 --- a/src/server/resendtask.h +++ b/src/server/resendtask.h @@ -1,4 +1,4 @@ -#ifndef LOOM_SERVER_RESENDJOB_H +/*#ifndef LOOM_SERVER_RESENDJOB_H #define LOOM_SERVER_RESENDJOB_H #include "libloom/taskinstance.h" @@ -45,3 +45,4 @@ private: #endif // LOOM_SERVER_RESENDJOB_H +*/ diff --git a/src/server/server.cpp b/src/server/server.cpp index 144d0938b096dba039a4a40e175aaf7ec24174ba..aedbcd1bd637c9dac7100c4a3aa4ad61d75ebf71 100644 --- a/src/server/server.cpp +++ b/src/server/server.cpp @@ -11,7 +11,9 @@ using namespace loom; Server::Server(uv_loop_t *loop, int port) : loop(loop), listen_port(port), - task_manager(*this) + task_manager(*this), + dummy_worker(*this) + { if (loop != NULL) { UV_CHECK(uv_tcp_init(loop, &listen_socket)); @@ -20,9 +22,9 @@ Server::Server(uv_loop_t *loop, int port) llog->info("Starting server on {}", port); - dummy_worker = std::make_unique<Worker>(loop, "", 0, ""); - dummy_worker->add_task_factory(std::make_unique<ResendTaskFactory>(*this)); - loom::llog->debug("Dummy worker started at {}", dummy_worker->get_listen_port()); + + dummy_worker.start_listen(); + loom::llog->debug("Dummy worker started at {}", dummy_worker.get_listen_port()); } } @@ -50,13 +52,6 @@ void Server::remove_client_connection(ClientConnection &conn) client_connection.reset(); } -std::string Server::get_dummy_worker_address() const -{ - std::stringstream s; - s << "!:" << dummy_worker->get_listen_port(); - return s.str(); -} - void Server::remove_freshconnection(FreshConnection &conn) { auto i = std::find_if( @@ -67,13 +62,6 @@ void Server::remove_freshconnection(FreshConnection &conn) fresh_connections.erase(i); } -void Server::add_resend_task(Id id) -{ - std::unique_ptr<Task> task = std::make_unique<Task>(-1, 0, ""); - task->add_input(id); - dummy_worker->new_task(std::move(task)); -} - void Server::start_listen() { struct sockaddr_in addr; diff --git a/src/server/server.h b/src/server/server.h index 120d353ab256f7b50215f3ef131c781e4d2b1357..6b5e9b9572ed15374a15e11c850dfb7b52d7c438 100644 --- a/src/server/server.h +++ b/src/server/server.h @@ -5,11 +5,11 @@ #include "clientconn.h" #include "freshconn.h" #include "taskmanager.h" - -#include "libloom/worker.h" +#include "dummyworker.h" #include <vector> + class Server { public: @@ -27,12 +27,10 @@ public: void add_client_connection(std::unique_ptr<ClientConnection> conn); void remove_client_connection(ClientConnection &conn); - loom::Worker& get_dummy_worker() const { - return *dummy_worker; + DummyWorker& get_dummy_worker() { + return dummy_worker; } - std::string get_dummy_worker_address() const; - void remove_freshconnection(FreshConnection &conn); TaskManager& get_task_manager() { @@ -54,7 +52,6 @@ private: void start_listen(); uv_loop_t *loop; - std::unique_ptr<loom::Worker> dummy_worker; std::vector<std::unique_ptr<WorkerConnection>> connections; @@ -66,6 +63,7 @@ private: int listen_port; TaskManager task_manager; + DummyWorker dummy_worker; static void _on_new_connection(uv_stream_t *stream, int status); }; diff --git a/src/server/taskmanager.cpp b/src/server/taskmanager.cpp index 974eb51639375dc82b9daa7f0f0ce6d87bf8a91e..7041c0f3e2ab78a8b2bf5cd2d209c479f8f71a68 100644 --- a/src/server/taskmanager.cpp +++ b/src/server/taskmanager.cpp @@ -65,8 +65,7 @@ void TaskManager::on_task_finished(TaskNode &task) llog->debug("Job id={} [RESULT] finished", id); const auto &owners = task.get_owners(); assert(owners.size()); - owners[0]->send_data(id, server.get_dummy_worker_address()); - server.add_resend_task(id); + owners[0]->send_data(id, server.get_dummy_worker().get_address(), true); } else { llog->debug("Job id={} finished", id); } @@ -135,7 +134,7 @@ void TaskManager::distribute_work(TaskNode::Vector &tasks) if (!input->is_owner(load.connection)) { assert(input->get_owners().size() >= 1); WorkerConnection *owner = input->get_owners()[0]; - owner->send_data(input->get_id(), load.connection.get_address()); + owner->send_data(input->get_id(), load.connection.get_address(), false); } } load.connection.send_task(task); diff --git a/src/server/workerconn.cpp b/src/server/workerconn.cpp index c33ea277e033e2909ccba29c1cca70269cda1dbf..72e7fdf1f3bb255f591cf8f7559b8533cda2ac01 100644 --- a/src/server/workerconn.cpp +++ b/src/server/workerconn.cpp @@ -73,7 +73,7 @@ void WorkerConnection::send_task(TaskNode *task) connection->send_message(msg); } -void WorkerConnection::send_data(Id id, const std::string &address) +void WorkerConnection::send_data(Id id, const std::string &address, bool with_size) { llog->debug("Command for {}: SEND id={} address={}", this->address, id, address); @@ -81,6 +81,9 @@ void WorkerConnection::send_data(Id id, const std::string &address) msg.set_type(loomcomm::WorkerCommand_Type_SEND); msg.set_id(id); msg.set_address(address); + if (with_size) { + msg.set_with_size(with_size); + } connection->send_message(msg); } diff --git a/src/server/workerconn.h b/src/server/workerconn.h index d2dcbbbe80a6bf7aa5a1c69c6fc94cd4f854b4a9..c3ad35ef0c6c4b062badc77f9c142f68763cdeee 100644 --- a/src/server/workerconn.h +++ b/src/server/workerconn.h @@ -21,7 +21,7 @@ public: void send_task(TaskNode *task); - void send_data(loom::Id id, const std::string &address); + void send_data(loom::Id id, const std::string &address, bool with_size); auto& get_tasks() { return tasks; diff --git a/src/worker/basictasks.cpp b/src/worker/basictasks.cpp index 2c2e44b9152308efde2da7b4acfcf63cc5f456cd..29bce7c13f40c3eb6732a17460058e947c20613f 100644 --- a/src/worker/basictasks.cpp +++ b/src/worker/basictasks.cpp @@ -1,6 +1,7 @@ #include "basictasks.h" #include "libloom/databuilder.h" +#include "libloom/rawdata.h" #include <string.h> @@ -12,9 +13,10 @@ ConstTask::ConstTask(Worker &worker, std::unique_ptr<Task> task) } -void ConstTask::start(DataVector &inputs) { +void ConstTask::start(DataVector &inputs) +{ auto& config = task->get_config(); - auto output = std::make_unique<Data>(get_id()); + auto output = std::make_unique<RawData>(); memcpy(output->init_empty_file(worker, config.size()), config.c_str(), config.size()); finish(std::move(output)); } @@ -29,10 +31,17 @@ void MergeTask::start(DataVector &inputs) { size_t size = 0; for (auto& data : inputs) { size += (*data)->get_size(); - } - DataBuilder builder(worker, get_id(), size, true); + } + auto output = std::make_unique<RawData>(); + output->init_empty_file(worker, size); + char *dst = output->get_raw_data(worker); + for (auto& data : inputs) { - builder.add((*data)->get_data(worker), (*data)->get_size()); + char *mem = (*data)->get_raw_data(worker); + assert(mem); + size_t size = (*data)->get_size(); + memcpy(dst, mem, size); + dst += size; } - finish(builder.release_data()); + finish(std::move(output)); } diff --git a/src/worker/runtask.cpp b/src/worker/runtask.cpp index 146c2cb29d4d4ae19afefce296ea5e964ac26305..f1f3faff09003e14c99ae0f5548987ecca6d3b58 100644 --- a/src/worker/runtask.cpp +++ b/src/worker/runtask.cpp @@ -2,6 +2,7 @@ #include "runtask.h" #include "libloom/worker.h" +#include "libloom/rawdata.h" #include "libloom/log.h" #include "loomrun.pb.h" @@ -73,8 +74,13 @@ void RunTask::start(DataVector &inputs) assert(map.input_index() >= 0 && map.input_index() < input_size); auto& input = *inputs[map.input_index()]; std::string path = get_path(map.filename()); - llog->debug("Creating symlink of data id={} to '{}'", input->get_id(), map.filename()); - input->make_symlink(worker, path); + std::string filename = input->get_filename(worker); + llog->alert("FILENAME = {} {}", filename, (unsigned long) &input); + assert(!filename.empty()); + llog->debug("Creating symlink of '{}'", map.filename()); + if (symlink(filename.c_str(), path.c_str())) { + log_errno_abort("symlink"); + } /* stdin */ if (map.filename() == "+in") { @@ -150,18 +156,12 @@ void RunTask::_on_close(uv_handle_t *handle) if (index == -2) { continue; } - Id id; - if (index == -1) { - id = task->get_id(); - } else { - // TODO - assert(0); - } - auto data = std::make_unique<Data>(id); + auto data = std::make_unique<RawData>(); + data->assign_file_id(); std::string path = task->get_path(map.filename()); std::string data_path = data->get_filename(task->worker); - llog->debug("Storing file '{}'' as data id={}", map.filename(), data->get_id()); + llog->debug("Storing file '{}'' as index={}", map.filename(), i); //data->create(task->worker, 10); if (unlikely(rename(path.c_str(), data_path.c_str()))) {