diff --git a/src/client/client.py b/src/client/client.py index 779f688e648f0a6c7d55e0e35b559d372d3f9816..af2960060a3953091c231213ef2b844072effa8f 100644 --- a/src/client/client.py +++ b/src/client/client.py @@ -9,16 +9,22 @@ LOOM_PROTOCOL_VERSION = 1 class Client(object): - def __init__(self, address, port): + def __init__(self, address, port, info=False): self.server_address = address self.server_port = port s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((address, port)) self.connection = Connection(s) + if info: + self.info = [] + else: + self.info = None + msg = Register() msg.type = Register.REGISTER_CLIENT msg.protocol_version = LOOM_PROTOCOL_VERSION + msg.info = info self._send_message(msg) def submit(self, plan, results): @@ -42,14 +48,21 @@ class Client(object): msg = self.connection.receive_message() cmsg = ClientMessage() cmsg.ParseFromString(msg) - assert cmsg.type == ClientMessage.DATA - prologue = cmsg.data - data[prologue.id] = self._receive_data() + if cmsg.type == ClientMessage.DATA: + prologue = cmsg.data + data[prologue.id] = self._receive_data() + else: + assert cmsg.type == ClientMessage.INFO + self.add_info(cmsg.info) + if single_result: return data[results.id] else: return [data[task.id] for task in results] + def add_info(self, info): + self.info.append((info.id, info.worker)) + def _receive_data(self): msg_data = Data() msg_data.ParseFromString(self.connection.receive_message()) diff --git a/src/client/loomcomm_pb2.py b/src/client/loomcomm_pb2.py index a16de84b590b2ecb71b3d082f13714ed67aa5ace..dacde2265c35e6100ae6e2f589847014275e732d 100644 --- a/src/client/loomcomm_pb2.py +++ b/src/client/loomcomm_pb2.py @@ -18,7 +18,7 @@ _sym_db = _symbol_database.Default() DESCRIPTOR = _descriptor.FileDescriptor( name='loomcomm.proto', package='loomcomm', - serialized_pb=_b('\n\x0eloomcomm.proto\x12\x08loomcomm\"\x9f\x01\n\x08Register\x12\x18\n\x10protocol_version\x18\x01 \x02(\x05\x12%\n\x04type\x18\x02 \x02(\x0e\x32\x17.loomcomm.Register.Type\x12\x0c\n\x04port\x18\x03 \x01(\x05\x12\x12\n\ntask_types\x18\x04 \x03(\t\"0\n\x04Type\x12\x13\n\x0fREGISTER_WORKER\x10\x01\x12\x13\n\x0fREGISTER_CLIENT\x10\x02\"&\n\rServerMessage\"\x15\n\x04Type\x12\r\n\tSTART_JOB\x10\x01\"\xc4\x01\n\rWorkerCommand\x12*\n\x04type\x18\x01 \x02(\x0e\x32\x1c.loomcomm.WorkerCommand.Type\x12\n\n\x02id\x18\x02 \x01(\x05\x12\x11\n\ttask_type\x18\x03 \x01(\x05\x12\x13\n\x0btask_config\x18\x04 \x01(\t\x12\x13\n\x0btask_inputs\x18\x05 \x03(\x05\x12\x0f\n\x07\x61\x64\x64ress\x18\n \x01(\t\x12\x11\n\twith_size\x18\x0b \x01(\x08\"\x1a\n\x04Type\x12\x08\n\x04TASK\x10\x01\x12\x08\n\x04SEND\x10\x02\"\x1c\n\x0eWorkerResponse\x12\n\n\x02id\x18\x02 \x01(\x05\"\x18\n\x08\x41nnounce\x12\x0c\n\x04port\x18\x01 \x02(\x05\"-\n\x0c\x44\x61taPrologue\x12\n\n\x02id\x18\x01 \x02(\x05\x12\x11\n\tdata_size\x18\x03 \x01(\x04\"%\n\x04\x44\x61ta\x12\x0f\n\x07type_id\x18\x01 \x02(\x05\x12\x0c\n\x04size\x18\x02 \x01(\x04\"&\n\x08\x46\x65\x65\x64\x62\x61\x63k\x12\n\n\x02id\x18\x01 \x02(\x05\x12\x0e\n\x06worker\x18\x02 \x02(\t\"\xa7\x01\n\rClientMessage\x12*\n\x04type\x18\x01 \x02(\x0e\x32\x1c.loomcomm.ClientMessage.Type\x12$\n\x04\x64\x61ta\x18\x02 \x01(\x0b\x32\x16.loomcomm.DataPrologue\x12$\n\x08\x66\x65\x65\x64\x62\x61\x63k\x18\x03 \x01(\x0b\x32\x12.loomcomm.Feedback\"\x1e\n\x04Type\x12\x08\n\x04\x44\x41TA\x10\x01\x12\x0c\n\x08\x46\x45\x45\x44\x42\x41\x43K\x10\x02\x42\x02H\x03') + serialized_pb=_b('\n\x0eloomcomm.proto\x12\x08loomcomm\"\xbb\x01\n\x08Register\x12\x18\n\x10protocol_version\x18\x01 \x02(\x05\x12%\n\x04type\x18\x02 \x02(\x0e\x32\x17.loomcomm.Register.Type\x12\x0c\n\x04port\x18\x03 \x01(\x05\x12\x12\n\ntask_types\x18\x04 \x03(\t\x12\x0c\n\x04\x63pus\x18\x05 \x01(\x05\x12\x0c\n\x04info\x18\n \x01(\x08\"0\n\x04Type\x12\x13\n\x0fREGISTER_WORKER\x10\x01\x12\x13\n\x0fREGISTER_CLIENT\x10\x02\"&\n\rServerMessage\"\x15\n\x04Type\x12\r\n\tSTART_JOB\x10\x01\"\xc4\x01\n\rWorkerCommand\x12*\n\x04type\x18\x01 \x02(\x0e\x32\x1c.loomcomm.WorkerCommand.Type\x12\n\n\x02id\x18\x02 \x01(\x05\x12\x11\n\ttask_type\x18\x03 \x01(\x05\x12\x13\n\x0btask_config\x18\x04 \x01(\t\x12\x13\n\x0btask_inputs\x18\x05 \x03(\x05\x12\x0f\n\x07\x61\x64\x64ress\x18\n \x01(\t\x12\x11\n\twith_size\x18\x0b \x01(\x08\"\x1a\n\x04Type\x12\x08\n\x04TASK\x10\x01\x12\x08\n\x04SEND\x10\x02\"\x1c\n\x0eWorkerResponse\x12\n\n\x02id\x18\x02 \x01(\x05\"\x18\n\x08\x41nnounce\x12\x0c\n\x04port\x18\x01 \x02(\x05\"-\n\x0c\x44\x61taPrologue\x12\n\n\x02id\x18\x01 \x02(\x05\x12\x11\n\tdata_size\x18\x03 \x01(\x04\"%\n\x04\x44\x61ta\x12\x0f\n\x07type_id\x18\x01 \x02(\x05\x12\x0c\n\x04size\x18\x02 \x01(\x04\"\"\n\x04Info\x12\n\n\x02id\x18\x01 \x02(\x05\x12\x0e\n\x06worker\x18\x02 \x02(\t\"\x9b\x01\n\rClientMessage\x12*\n\x04type\x18\x01 \x02(\x0e\x32\x1c.loomcomm.ClientMessage.Type\x12$\n\x04\x64\x61ta\x18\x02 \x01(\x0b\x32\x16.loomcomm.DataPrologue\x12\x1c\n\x04info\x18\x03 \x01(\x0b\x32\x0e.loomcomm.Info\"\x1a\n\x04Type\x12\x08\n\x04\x44\x41TA\x10\x01\x12\x08\n\x04INFO\x10\x02\x42\x02H\x03') ) _sym_db.RegisterFileDescriptor(DESCRIPTOR) @@ -41,8 +41,8 @@ _REGISTER_TYPE = _descriptor.EnumDescriptor( ], containing_type=None, options=None, - serialized_start=140, - serialized_end=188, + serialized_start=168, + serialized_end=216, ) _sym_db.RegisterEnumDescriptor(_REGISTER_TYPE) @@ -59,8 +59,8 @@ _SERVERMESSAGE_TYPE = _descriptor.EnumDescriptor( ], containing_type=None, options=None, - serialized_start=207, - serialized_end=228, + serialized_start=235, + serialized_end=256, ) _sym_db.RegisterEnumDescriptor(_SERVERMESSAGE_TYPE) @@ -81,8 +81,8 @@ _WORKERCOMMAND_TYPE = _descriptor.EnumDescriptor( ], containing_type=None, options=None, - serialized_start=401, - serialized_end=427, + serialized_start=429, + serialized_end=455, ) _sym_db.RegisterEnumDescriptor(_WORKERCOMMAND_TYPE) @@ -97,14 +97,14 @@ _CLIENTMESSAGE_TYPE = _descriptor.EnumDescriptor( options=None, type=None), _descriptor.EnumValueDescriptor( - name='FEEDBACK', index=1, number=2, + name='INFO', index=1, number=2, options=None, type=None), ], containing_type=None, options=None, - serialized_start=749, - serialized_end=779, + serialized_start=765, + serialized_end=791, ) _sym_db.RegisterEnumDescriptor(_CLIENTMESSAGE_TYPE) @@ -144,6 +144,20 @@ _REGISTER = _descriptor.Descriptor( message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None), + _descriptor.FieldDescriptor( + name='cpus', full_name='loomcomm.Register.cpus', index=4, + number=5, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='info', full_name='loomcomm.Register.info', index=5, + number=10, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), ], extensions=[ ], @@ -157,7 +171,7 @@ _REGISTER = _descriptor.Descriptor( oneofs=[ ], serialized_start=29, - serialized_end=188, + serialized_end=216, ) @@ -180,8 +194,8 @@ _SERVERMESSAGE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=190, - serialized_end=228, + serialized_start=218, + serialized_end=256, ) @@ -253,8 +267,8 @@ _WORKERCOMMAND = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=231, - serialized_end=427, + serialized_start=259, + serialized_end=455, ) @@ -283,8 +297,8 @@ _WORKERRESPONSE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=429, - serialized_end=457, + serialized_start=457, + serialized_end=485, ) @@ -313,8 +327,8 @@ _ANNOUNCE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=459, - serialized_end=483, + serialized_start=487, + serialized_end=511, ) @@ -350,8 +364,8 @@ _DATAPROLOGUE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=485, - serialized_end=530, + serialized_start=513, + serialized_end=558, ) @@ -387,27 +401,27 @@ _DATA = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=532, - serialized_end=569, + serialized_start=560, + serialized_end=597, ) -_FEEDBACK = _descriptor.Descriptor( - name='Feedback', - full_name='loomcomm.Feedback', +_INFO = _descriptor.Descriptor( + name='Info', + full_name='loomcomm.Info', filename=None, file=DESCRIPTOR, containing_type=None, fields=[ _descriptor.FieldDescriptor( - name='id', full_name='loomcomm.Feedback.id', index=0, + name='id', full_name='loomcomm.Info.id', index=0, number=1, type=5, cpp_type=1, label=2, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None), _descriptor.FieldDescriptor( - name='worker', full_name='loomcomm.Feedback.worker', index=1, + name='worker', full_name='loomcomm.Info.worker', index=1, number=2, type=9, cpp_type=9, label=2, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, @@ -424,8 +438,8 @@ _FEEDBACK = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=571, - serialized_end=609, + serialized_start=599, + serialized_end=633, ) @@ -451,7 +465,7 @@ _CLIENTMESSAGE = _descriptor.Descriptor( is_extension=False, extension_scope=None, options=None), _descriptor.FieldDescriptor( - name='feedback', full_name='loomcomm.ClientMessage.feedback', index=2, + name='info', full_name='loomcomm.ClientMessage.info', index=2, number=3, type=11, cpp_type=10, label=1, has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, @@ -469,8 +483,8 @@ _CLIENTMESSAGE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=612, - serialized_end=779, + serialized_start=636, + serialized_end=791, ) _REGISTER.fields_by_name['type'].enum_type = _REGISTER_TYPE @@ -480,7 +494,7 @@ _WORKERCOMMAND.fields_by_name['type'].enum_type = _WORKERCOMMAND_TYPE _WORKERCOMMAND_TYPE.containing_type = _WORKERCOMMAND _CLIENTMESSAGE.fields_by_name['type'].enum_type = _CLIENTMESSAGE_TYPE _CLIENTMESSAGE.fields_by_name['data'].message_type = _DATAPROLOGUE -_CLIENTMESSAGE.fields_by_name['feedback'].message_type = _FEEDBACK +_CLIENTMESSAGE.fields_by_name['info'].message_type = _INFO _CLIENTMESSAGE_TYPE.containing_type = _CLIENTMESSAGE DESCRIPTOR.message_types_by_name['Register'] = _REGISTER DESCRIPTOR.message_types_by_name['ServerMessage'] = _SERVERMESSAGE @@ -489,7 +503,7 @@ DESCRIPTOR.message_types_by_name['WorkerResponse'] = _WORKERRESPONSE DESCRIPTOR.message_types_by_name['Announce'] = _ANNOUNCE DESCRIPTOR.message_types_by_name['DataPrologue'] = _DATAPROLOGUE DESCRIPTOR.message_types_by_name['Data'] = _DATA -DESCRIPTOR.message_types_by_name['Feedback'] = _FEEDBACK +DESCRIPTOR.message_types_by_name['Info'] = _INFO DESCRIPTOR.message_types_by_name['ClientMessage'] = _CLIENTMESSAGE Register = _reflection.GeneratedProtocolMessageType('Register', (_message.Message,), dict( @@ -541,12 +555,12 @@ Data = _reflection.GeneratedProtocolMessageType('Data', (_message.Message,), dic )) _sym_db.RegisterMessage(Data) -Feedback = _reflection.GeneratedProtocolMessageType('Feedback', (_message.Message,), dict( - DESCRIPTOR = _FEEDBACK, +Info = _reflection.GeneratedProtocolMessageType('Info', (_message.Message,), dict( + DESCRIPTOR = _INFO, __module__ = 'loomcomm_pb2' - # @@protoc_insertion_point(class_scope:loomcomm.Feedback) + # @@protoc_insertion_point(class_scope:loomcomm.Info) )) -_sym_db.RegisterMessage(Feedback) +_sym_db.RegisterMessage(Info) ClientMessage = _reflection.GeneratedProtocolMessageType('ClientMessage', (_message.Message,), dict( DESCRIPTOR = _CLIENTMESSAGE, diff --git a/src/client/plan.py b/src/client/plan.py index a2b74aa072d64080b2ae8c3e86094971207867fd..f356d1619c32e6a0caacb36be82455f98ddec8c0 100644 --- a/src/client/plan.py +++ b/src/client/plan.py @@ -176,10 +176,21 @@ class Plan(object): task.set_message(t, task_types) return msg - def write_dot(self, filename): + def write_dot(self, filename, info=None): + colors = ["red", "green", "blue", "orange", "violet"] + if info: + w = sorted(set(worker for id, worker in info)) + workers = {} + for id, worker in info: + workers[id] = w.index(worker) + del w + else: + workers = None graph = gv.Graph() for task in self.tasks: node = graph.node(task.id) + if workers: + node.color = colors[workers[task.id] % len(colors)] node.label = "{}\n{}".format(str(task.id), task.task_type) for inp in task.inputs: graph.node(inp.id).add_arc(node) diff --git a/src/libloom/connection.cpp b/src/libloom/connection.cpp index 03d737c3108cca7a1a3e9499f30773279595efc9..1fa583f5f9a6f886d68edf1cd41fe1e328122de2 100644 --- a/src/libloom/connection.cpp +++ b/src/libloom/connection.cpp @@ -51,9 +51,10 @@ void Connection::close_and_discard_remaining_data() } void Connection::accept(uv_tcp_t *listen_socket) -{ +{ UV_CHECK(uv_accept((uv_stream_t*) listen_socket, (uv_stream_t*) &socket)); uv_read_start((uv_stream_t *)&socket, _buf_alloc, _on_read); + state = ConnectionOpen; } void Connection::start_read() diff --git a/src/libloom/interconnect.cpp b/src/libloom/interconnect.cpp index 0a6bd694951eb0e5b3f4390969a333c81e8e80f4..e5fa6dbc95439ab5de8652395be0f16fb230104c 100644 --- a/src/libloom/interconnect.cpp +++ b/src/libloom/interconnect.cpp @@ -126,11 +126,6 @@ void InterConnection::send(Id id, std::shared_ptr<Data> &data, bool with_size) } } -void InterConnection::send(std::unique_ptr<SendBuffer> buffer) -{ - -} - std::string InterConnection::make_address(const std::string &host, int port) { std::stringstream s; diff --git a/src/libloom/interconnect.h b/src/libloom/interconnect.h index 5fec9acd1f8dd4bad2adb183ce93bdc42bd4f2a6..4060bc139f483e9f0441cee358d2a11d5cd129b7 100644 --- a/src/libloom/interconnect.h +++ b/src/libloom/interconnect.h @@ -19,8 +19,7 @@ public: InterConnection(Worker &worker); ~InterConnection(); - void send(Id id, std::shared_ptr<Data> &data, bool with_size); - void send(std::unique_ptr<SendBuffer> buffer); + void send(Id id, std::shared_ptr<Data> &data, bool with_size); void accept(uv_tcp_t *listen_socket) { connection.accept(listen_socket); } diff --git a/src/libloom/loomcomm.pb.cc b/src/libloom/loomcomm.pb.cc index 24016c3958f6e36406c2ee497e39d1c9f4c92d84..e33345633d3844cd1b765b3f0d3cdf3a1c4fd483 100644 --- a/src/libloom/loomcomm.pb.cc +++ b/src/libloom/loomcomm.pb.cc @@ -23,7 +23,7 @@ void protobuf_ShutdownFile_loomcomm_2eproto() { delete Announce::default_instance_; delete DataPrologue::default_instance_; delete Data::default_instance_; - delete Feedback::default_instance_; + delete Info::default_instance_; delete ClientMessage::default_instance_; } @@ -46,7 +46,7 @@ void protobuf_AddDesc_loomcomm_2eproto() { Announce::default_instance_ = new Announce(); DataPrologue::default_instance_ = new DataPrologue(); Data::default_instance_ = new Data(); - Feedback::default_instance_ = new Feedback(); + Info::default_instance_ = new Info(); ClientMessage::default_instance_ = new ClientMessage(); Register::default_instance_->InitAsDefaultInstance(); ServerMessage::default_instance_->InitAsDefaultInstance(); @@ -55,7 +55,7 @@ void protobuf_AddDesc_loomcomm_2eproto() { Announce::default_instance_->InitAsDefaultInstance(); DataPrologue::default_instance_->InitAsDefaultInstance(); Data::default_instance_->InitAsDefaultInstance(); - Feedback::default_instance_->InitAsDefaultInstance(); + Info::default_instance_->InitAsDefaultInstance(); ClientMessage::default_instance_->InitAsDefaultInstance(); ::google::protobuf::internal::OnShutdown(&protobuf_ShutdownFile_loomcomm_2eproto); } @@ -99,6 +99,8 @@ const int Register::kProtocolVersionFieldNumber; const int Register::kTypeFieldNumber; const int Register::kPortFieldNumber; const int Register::kTaskTypesFieldNumber; +const int Register::kCpusFieldNumber; +const int Register::kInfoFieldNumber; #endif // !_MSC_VER Register::Register() @@ -123,6 +125,8 @@ void Register::SharedCtor() { protocol_version_ = 0; type_ = 1; port_ = 0; + cpus_ = 0; + info_ = false; ::memset(_has_bits_, 0, sizeof(_has_bits_)); } @@ -161,11 +165,25 @@ Register* Register::New() const { } void Register::Clear() { - if (_has_bits_[0 / 32] & 7) { +#define OFFSET_OF_FIELD_(f) (reinterpret_cast<char*>( \ + &reinterpret_cast<Register*>(16)->f) - \ + reinterpret_cast<char*>(16)) + +#define ZR_(first, last) do { \ + size_t f = OFFSET_OF_FIELD_(first); \ + size_t n = OFFSET_OF_FIELD_(last) - f + sizeof(last); \ + ::memset(&first, 0, n); \ + } while (0) + + if (_has_bits_[0 / 32] & 55) { + ZR_(port_, info_); protocol_version_ = 0; type_ = 1; - port_ = 0; } + +#undef OFFSET_OF_FIELD_ +#undef ZR_ + task_types_.Clear(); ::memset(_has_bits_, 0, sizeof(_has_bits_)); mutable_unknown_fields()->clear(); @@ -245,6 +263,36 @@ bool Register::MergePartialFromCodedStream( goto handle_unusual; } if (input->ExpectTag(34)) goto parse_task_types; + if (input->ExpectTag(40)) goto parse_cpus; + break; + } + + // optional int32 cpus = 5; + case 5: { + if (tag == 40) { + parse_cpus: + DO_((::google::protobuf::internal::WireFormatLite::ReadPrimitive< + ::google::protobuf::int32, ::google::protobuf::internal::WireFormatLite::TYPE_INT32>( + input, &cpus_))); + set_has_cpus(); + } else { + goto handle_unusual; + } + if (input->ExpectTag(80)) goto parse_info; + break; + } + + // optional bool info = 10; + case 10: { + if (tag == 80) { + parse_info: + DO_((::google::protobuf::internal::WireFormatLite::ReadPrimitive< + bool, ::google::protobuf::internal::WireFormatLite::TYPE_BOOL>( + input, &info_))); + set_has_info(); + } else { + goto handle_unusual; + } if (input->ExpectAtEnd()) goto success; break; } @@ -296,6 +344,16 @@ void Register::SerializeWithCachedSizes( 4, this->task_types(i), output); } + // optional int32 cpus = 5; + if (has_cpus()) { + ::google::protobuf::internal::WireFormatLite::WriteInt32(5, this->cpus(), output); + } + + // optional bool info = 10; + if (has_info()) { + ::google::protobuf::internal::WireFormatLite::WriteBool(10, this->info(), output); + } + output->WriteRaw(unknown_fields().data(), unknown_fields().size()); // @@protoc_insertion_point(serialize_end:loomcomm.Register) @@ -325,6 +383,18 @@ int Register::ByteSize() const { this->port()); } + // optional int32 cpus = 5; + if (has_cpus()) { + total_size += 1 + + ::google::protobuf::internal::WireFormatLite::Int32Size( + this->cpus()); + } + + // optional bool info = 10; + if (has_info()) { + total_size += 1 + 1; + } + } // repeated string task_types = 4; total_size += 1 * this->task_types_size(); @@ -359,6 +429,12 @@ void Register::MergeFrom(const Register& from) { if (from.has_port()) { set_port(from.port()); } + if (from.has_cpus()) { + set_cpus(from.cpus()); + } + if (from.has_info()) { + set_info(from.info()); + } } mutable_unknown_fields()->append(from.unknown_fields()); } @@ -381,6 +457,8 @@ void Register::Swap(Register* other) { std::swap(type_, other->type_); std::swap(port_, other->port_); task_types_.Swap(&other->task_types_); + std::swap(cpus_, other->cpus_); + std::swap(info_, other->info_); std::swap(_has_bits_[0], other->_has_bits_[0]); _unknown_fields_.swap(other->_unknown_fields_); std::swap(_cached_size_, other->_cached_size_); @@ -1869,27 +1947,27 @@ void Data::Swap(Data* other) { // =================================================================== #ifndef _MSC_VER -const int Feedback::kIdFieldNumber; -const int Feedback::kWorkerFieldNumber; +const int Info::kIdFieldNumber; +const int Info::kWorkerFieldNumber; #endif // !_MSC_VER -Feedback::Feedback() +Info::Info() : ::google::protobuf::MessageLite() { SharedCtor(); - // @@protoc_insertion_point(constructor:loomcomm.Feedback) + // @@protoc_insertion_point(constructor:loomcomm.Info) } -void Feedback::InitAsDefaultInstance() { +void Info::InitAsDefaultInstance() { } -Feedback::Feedback(const Feedback& from) +Info::Info(const Info& from) : ::google::protobuf::MessageLite() { SharedCtor(); MergeFrom(from); - // @@protoc_insertion_point(copy_constructor:loomcomm.Feedback) + // @@protoc_insertion_point(copy_constructor:loomcomm.Info) } -void Feedback::SharedCtor() { +void Info::SharedCtor() { ::google::protobuf::internal::GetEmptyString(); _cached_size_ = 0; id_ = 0; @@ -1897,12 +1975,12 @@ void Feedback::SharedCtor() { ::memset(_has_bits_, 0, sizeof(_has_bits_)); } -Feedback::~Feedback() { - // @@protoc_insertion_point(destructor:loomcomm.Feedback) +Info::~Info() { + // @@protoc_insertion_point(destructor:loomcomm.Info) SharedDtor(); } -void Feedback::SharedDtor() { +void Info::SharedDtor() { if (worker_ != &::google::protobuf::internal::GetEmptyStringAlreadyInited()) { delete worker_; } @@ -1914,12 +1992,12 @@ void Feedback::SharedDtor() { } } -void Feedback::SetCachedSize(int size) const { +void Info::SetCachedSize(int size) const { GOOGLE_SAFE_CONCURRENT_WRITES_BEGIN(); _cached_size_ = size; GOOGLE_SAFE_CONCURRENT_WRITES_END(); } -const Feedback& Feedback::default_instance() { +const Info& Info::default_instance() { #ifdef GOOGLE_PROTOBUF_NO_STATIC_INITIALIZER protobuf_AddDesc_loomcomm_2eproto(); #else @@ -1928,13 +2006,13 @@ const Feedback& Feedback::default_instance() { return *default_instance_; } -Feedback* Feedback::default_instance_ = NULL; +Info* Info::default_instance_ = NULL; -Feedback* Feedback::New() const { - return new Feedback; +Info* Info::New() const { + return new Info; } -void Feedback::Clear() { +void Info::Clear() { if (_has_bits_[0 / 32] & 3) { id_ = 0; if (has_worker()) { @@ -1947,7 +2025,7 @@ void Feedback::Clear() { mutable_unknown_fields()->clear(); } -bool Feedback::MergePartialFromCodedStream( +bool Info::MergePartialFromCodedStream( ::google::protobuf::io::CodedInputStream* input) { #define DO_(EXPRESSION) if (!(EXPRESSION)) goto failure ::google::protobuf::uint32 tag; @@ -1955,7 +2033,7 @@ bool Feedback::MergePartialFromCodedStream( mutable_unknown_fields()); ::google::protobuf::io::CodedOutputStream unknown_fields_stream( &unknown_fields_string); - // @@protoc_insertion_point(parse_start:loomcomm.Feedback) + // @@protoc_insertion_point(parse_start:loomcomm.Info) for (;;) { ::std::pair< ::google::protobuf::uint32, bool> p = input->ReadTagWithCutoff(127); tag = p.first; @@ -2002,17 +2080,17 @@ bool Feedback::MergePartialFromCodedStream( } } success: - // @@protoc_insertion_point(parse_success:loomcomm.Feedback) + // @@protoc_insertion_point(parse_success:loomcomm.Info) return true; failure: - // @@protoc_insertion_point(parse_failure:loomcomm.Feedback) + // @@protoc_insertion_point(parse_failure:loomcomm.Info) return false; #undef DO_ } -void Feedback::SerializeWithCachedSizes( +void Info::SerializeWithCachedSizes( ::google::protobuf::io::CodedOutputStream* output) const { - // @@protoc_insertion_point(serialize_start:loomcomm.Feedback) + // @@protoc_insertion_point(serialize_start:loomcomm.Info) // required int32 id = 1; if (has_id()) { ::google::protobuf::internal::WireFormatLite::WriteInt32(1, this->id(), output); @@ -2026,10 +2104,10 @@ void Feedback::SerializeWithCachedSizes( output->WriteRaw(unknown_fields().data(), unknown_fields().size()); - // @@protoc_insertion_point(serialize_end:loomcomm.Feedback) + // @@protoc_insertion_point(serialize_end:loomcomm.Info) } -int Feedback::ByteSize() const { +int Info::ByteSize() const { int total_size = 0; if (_has_bits_[0 / 32] & (0xffu << (0 % 32))) { @@ -2056,12 +2134,12 @@ int Feedback::ByteSize() const { return total_size; } -void Feedback::CheckTypeAndMergeFrom( +void Info::CheckTypeAndMergeFrom( const ::google::protobuf::MessageLite& from) { - MergeFrom(*::google::protobuf::down_cast<const Feedback*>(&from)); + MergeFrom(*::google::protobuf::down_cast<const Info*>(&from)); } -void Feedback::MergeFrom(const Feedback& from) { +void Info::MergeFrom(const Info& from) { GOOGLE_CHECK_NE(&from, this); if (from._has_bits_[0 / 32] & (0xffu << (0 % 32))) { if (from.has_id()) { @@ -2074,19 +2152,19 @@ void Feedback::MergeFrom(const Feedback& from) { mutable_unknown_fields()->append(from.unknown_fields()); } -void Feedback::CopyFrom(const Feedback& from) { +void Info::CopyFrom(const Info& from) { if (&from == this) return; Clear(); MergeFrom(from); } -bool Feedback::IsInitialized() const { +bool Info::IsInitialized() const { if ((_has_bits_[0] & 0x00000003) != 0x00000003) return false; return true; } -void Feedback::Swap(Feedback* other) { +void Info::Swap(Info* other) { if (other != this) { std::swap(id_, other->id_); std::swap(worker_, other->worker_); @@ -2096,8 +2174,8 @@ void Feedback::Swap(Feedback* other) { } } -::std::string Feedback::GetTypeName() const { - return "loomcomm.Feedback"; +::std::string Info::GetTypeName() const { + return "loomcomm.Info"; } @@ -2115,7 +2193,7 @@ bool ClientMessage_Type_IsValid(int value) { #ifndef _MSC_VER const ClientMessage_Type ClientMessage::DATA; -const ClientMessage_Type ClientMessage::FEEDBACK; +const ClientMessage_Type ClientMessage::INFO; const ClientMessage_Type ClientMessage::Type_MIN; const ClientMessage_Type ClientMessage::Type_MAX; const int ClientMessage::Type_ARRAYSIZE; @@ -2123,7 +2201,7 @@ const int ClientMessage::Type_ARRAYSIZE; #ifndef _MSC_VER const int ClientMessage::kTypeFieldNumber; const int ClientMessage::kDataFieldNumber; -const int ClientMessage::kFeedbackFieldNumber; +const int ClientMessage::kInfoFieldNumber; #endif // !_MSC_VER ClientMessage::ClientMessage() @@ -2140,10 +2218,10 @@ void ClientMessage::InitAsDefaultInstance() { data_ = const_cast< ::loomcomm::DataPrologue*>(&::loomcomm::DataPrologue::default_instance()); #endif #ifdef GOOGLE_PROTOBUF_NO_STATIC_INITIALIZER - feedback_ = const_cast< ::loomcomm::Feedback*>( - ::loomcomm::Feedback::internal_default_instance()); + info_ = const_cast< ::loomcomm::Info*>( + ::loomcomm::Info::internal_default_instance()); #else - feedback_ = const_cast< ::loomcomm::Feedback*>(&::loomcomm::Feedback::default_instance()); + info_ = const_cast< ::loomcomm::Info*>(&::loomcomm::Info::default_instance()); #endif } @@ -2158,7 +2236,7 @@ void ClientMessage::SharedCtor() { _cached_size_ = 0; type_ = 1; data_ = NULL; - feedback_ = NULL; + info_ = NULL; ::memset(_has_bits_, 0, sizeof(_has_bits_)); } @@ -2174,7 +2252,7 @@ void ClientMessage::SharedDtor() { if (this != default_instance_) { #endif delete data_; - delete feedback_; + delete info_; } } @@ -2204,8 +2282,8 @@ void ClientMessage::Clear() { if (has_data()) { if (data_ != NULL) data_->::loomcomm::DataPrologue::Clear(); } - if (has_feedback()) { - if (feedback_ != NULL) feedback_->::loomcomm::Feedback::Clear(); + if (has_info()) { + if (info_ != NULL) info_->::loomcomm::Info::Clear(); } } ::memset(_has_bits_, 0, sizeof(_has_bits_)); @@ -2255,16 +2333,16 @@ bool ClientMessage::MergePartialFromCodedStream( } else { goto handle_unusual; } - if (input->ExpectTag(26)) goto parse_feedback; + if (input->ExpectTag(26)) goto parse_info; break; } - // optional .loomcomm.Feedback feedback = 3; + // optional .loomcomm.Info info = 3; case 3: { if (tag == 26) { - parse_feedback: + parse_info: DO_(::google::protobuf::internal::WireFormatLite::ReadMessageNoVirtual( - input, mutable_feedback())); + input, mutable_info())); } else { goto handle_unusual; } @@ -2309,10 +2387,10 @@ void ClientMessage::SerializeWithCachedSizes( 2, this->data(), output); } - // optional .loomcomm.Feedback feedback = 3; - if (has_feedback()) { + // optional .loomcomm.Info info = 3; + if (has_info()) { ::google::protobuf::internal::WireFormatLite::WriteMessage( - 3, this->feedback(), output); + 3, this->info(), output); } output->WriteRaw(unknown_fields().data(), @@ -2337,11 +2415,11 @@ int ClientMessage::ByteSize() const { this->data()); } - // optional .loomcomm.Feedback feedback = 3; - if (has_feedback()) { + // optional .loomcomm.Info info = 3; + if (has_info()) { total_size += 1 + ::google::protobuf::internal::WireFormatLite::MessageSizeNoVirtual( - this->feedback()); + this->info()); } } @@ -2367,8 +2445,8 @@ void ClientMessage::MergeFrom(const ClientMessage& from) { if (from.has_data()) { mutable_data()->::loomcomm::DataPrologue::MergeFrom(from.data()); } - if (from.has_feedback()) { - mutable_feedback()->::loomcomm::Feedback::MergeFrom(from.feedback()); + if (from.has_info()) { + mutable_info()->::loomcomm::Info::MergeFrom(from.info()); } } mutable_unknown_fields()->append(from.unknown_fields()); @@ -2386,8 +2464,8 @@ bool ClientMessage::IsInitialized() const { if (has_data()) { if (!this->data().IsInitialized()) return false; } - if (has_feedback()) { - if (!this->feedback().IsInitialized()) return false; + if (has_info()) { + if (!this->info().IsInitialized()) return false; } return true; } @@ -2396,7 +2474,7 @@ void ClientMessage::Swap(ClientMessage* other) { if (other != this) { std::swap(type_, other->type_); std::swap(data_, other->data_); - std::swap(feedback_, other->feedback_); + std::swap(info_, other->info_); std::swap(_has_bits_[0], other->_has_bits_[0]); _unknown_fields_.swap(other->_unknown_fields_); std::swap(_cached_size_, other->_cached_size_); diff --git a/src/libloom/loomcomm.pb.h b/src/libloom/loomcomm.pb.h index 0f42bb72120c25bd6956a08089fdb10ea8725cb6..8e44f77916c21aff5d777c8e086a9b16f2c4abeb 100644 --- a/src/libloom/loomcomm.pb.h +++ b/src/libloom/loomcomm.pb.h @@ -39,7 +39,7 @@ class WorkerResponse; class Announce; class DataPrologue; class Data; -class Feedback; +class Info; class ClientMessage; enum Register_Type { @@ -70,11 +70,11 @@ const int WorkerCommand_Type_Type_ARRAYSIZE = WorkerCommand_Type_Type_MAX + 1; enum ClientMessage_Type { ClientMessage_Type_DATA = 1, - ClientMessage_Type_FEEDBACK = 2 + ClientMessage_Type_INFO = 2 }; bool ClientMessage_Type_IsValid(int value); const ClientMessage_Type ClientMessage_Type_Type_MIN = ClientMessage_Type_DATA; -const ClientMessage_Type ClientMessage_Type_Type_MAX = ClientMessage_Type_FEEDBACK; +const ClientMessage_Type ClientMessage_Type_Type_MAX = ClientMessage_Type_INFO; const int ClientMessage_Type_Type_ARRAYSIZE = ClientMessage_Type_Type_MAX + 1; // =================================================================== @@ -190,6 +190,20 @@ class Register : public ::google::protobuf::MessageLite { inline const ::google::protobuf::RepeatedPtrField< ::std::string>& task_types() const; inline ::google::protobuf::RepeatedPtrField< ::std::string>* mutable_task_types(); + // optional int32 cpus = 5; + inline bool has_cpus() const; + inline void clear_cpus(); + static const int kCpusFieldNumber = 5; + inline ::google::protobuf::int32 cpus() const; + inline void set_cpus(::google::protobuf::int32 value); + + // optional bool info = 10; + inline bool has_info() const; + inline void clear_info(); + static const int kInfoFieldNumber = 10; + inline bool info() const; + inline void set_info(bool value); + // @@protoc_insertion_point(class_scope:loomcomm.Register) private: inline void set_has_protocol_version(); @@ -198,6 +212,10 @@ class Register : public ::google::protobuf::MessageLite { inline void clear_has_type(); inline void set_has_port(); inline void clear_has_port(); + inline void set_has_cpus(); + inline void clear_has_cpus(); + inline void set_has_info(); + inline void clear_has_info(); ::std::string _unknown_fields_; @@ -207,6 +225,8 @@ class Register : public ::google::protobuf::MessageLite { int type_; ::google::protobuf::RepeatedPtrField< ::std::string> task_types_; ::google::protobuf::int32 port_; + ::google::protobuf::int32 cpus_; + bool info_; #ifdef GOOGLE_PROTOBUF_NO_STATIC_INITIALIZER friend void protobuf_AddDesc_loomcomm_2eproto_impl(); #else @@ -874,14 +894,14 @@ class Data : public ::google::protobuf::MessageLite { }; // ------------------------------------------------------------------- -class Feedback : public ::google::protobuf::MessageLite { +class Info : public ::google::protobuf::MessageLite { public: - Feedback(); - virtual ~Feedback(); + Info(); + virtual ~Info(); - Feedback(const Feedback& from); + Info(const Info& from); - inline Feedback& operator=(const Feedback& from) { + inline Info& operator=(const Info& from) { CopyFrom(from); return *this; } @@ -894,26 +914,26 @@ class Feedback : public ::google::protobuf::MessageLite { return &_unknown_fields_; } - static const Feedback& default_instance(); + static const Info& default_instance(); #ifdef GOOGLE_PROTOBUF_NO_STATIC_INITIALIZER // Returns the internal default instance pointer. This function can // return NULL thus should not be used by the user. This is intended // for Protobuf internal code. Please use default_instance() declared // above instead. - static inline const Feedback* internal_default_instance() { + static inline const Info* internal_default_instance() { return default_instance_; } #endif - void Swap(Feedback* other); + void Swap(Info* other); // implements Message ---------------------------------------------- - Feedback* New() const; + Info* New() const; void CheckTypeAndMergeFrom(const ::google::protobuf::MessageLite& from); - void CopyFrom(const Feedback& from); - void MergeFrom(const Feedback& from); + void CopyFrom(const Info& from); + void MergeFrom(const Info& from); void Clear(); bool IsInitialized() const; @@ -954,7 +974,7 @@ class Feedback : public ::google::protobuf::MessageLite { inline ::std::string* release_worker(); inline void set_allocated_worker(::std::string* worker); - // @@protoc_insertion_point(class_scope:loomcomm.Feedback) + // @@protoc_insertion_point(class_scope:loomcomm.Info) private: inline void set_has_id(); inline void clear_has_id(); @@ -976,7 +996,7 @@ class Feedback : public ::google::protobuf::MessageLite { friend void protobuf_ShutdownFile_loomcomm_2eproto(); void InitAsDefaultInstance(); - static Feedback* default_instance_; + static Info* default_instance_; }; // ------------------------------------------------------------------- @@ -1041,7 +1061,7 @@ class ClientMessage : public ::google::protobuf::MessageLite { typedef ClientMessage_Type Type; static const Type DATA = ClientMessage_Type_DATA; - static const Type FEEDBACK = ClientMessage_Type_FEEDBACK; + static const Type INFO = ClientMessage_Type_INFO; static inline bool Type_IsValid(int value) { return ClientMessage_Type_IsValid(value); } @@ -1070,14 +1090,14 @@ class ClientMessage : public ::google::protobuf::MessageLite { inline ::loomcomm::DataPrologue* release_data(); inline void set_allocated_data(::loomcomm::DataPrologue* data); - // optional .loomcomm.Feedback feedback = 3; - inline bool has_feedback() const; - inline void clear_feedback(); - static const int kFeedbackFieldNumber = 3; - inline const ::loomcomm::Feedback& feedback() const; - inline ::loomcomm::Feedback* mutable_feedback(); - inline ::loomcomm::Feedback* release_feedback(); - inline void set_allocated_feedback(::loomcomm::Feedback* feedback); + // optional .loomcomm.Info info = 3; + inline bool has_info() const; + inline void clear_info(); + static const int kInfoFieldNumber = 3; + inline const ::loomcomm::Info& info() const; + inline ::loomcomm::Info* mutable_info(); + inline ::loomcomm::Info* release_info(); + inline void set_allocated_info(::loomcomm::Info* info); // @@protoc_insertion_point(class_scope:loomcomm.ClientMessage) private: @@ -1085,15 +1105,15 @@ class ClientMessage : public ::google::protobuf::MessageLite { inline void clear_has_type(); inline void set_has_data(); inline void clear_has_data(); - inline void set_has_feedback(); - inline void clear_has_feedback(); + inline void set_has_info(); + inline void clear_has_info(); ::std::string _unknown_fields_; ::google::protobuf::uint32 _has_bits_[1]; mutable int _cached_size_; ::loomcomm::DataPrologue* data_; - ::loomcomm::Feedback* feedback_; + ::loomcomm::Info* info_; int type_; #ifdef GOOGLE_PROTOBUF_NO_STATIC_INITIALIZER friend void protobuf_AddDesc_loomcomm_2eproto_impl(); @@ -1240,6 +1260,54 @@ Register::mutable_task_types() { return &task_types_; } +// optional int32 cpus = 5; +inline bool Register::has_cpus() const { + return (_has_bits_[0] & 0x00000010u) != 0; +} +inline void Register::set_has_cpus() { + _has_bits_[0] |= 0x00000010u; +} +inline void Register::clear_has_cpus() { + _has_bits_[0] &= ~0x00000010u; +} +inline void Register::clear_cpus() { + cpus_ = 0; + clear_has_cpus(); +} +inline ::google::protobuf::int32 Register::cpus() const { + // @@protoc_insertion_point(field_get:loomcomm.Register.cpus) + return cpus_; +} +inline void Register::set_cpus(::google::protobuf::int32 value) { + set_has_cpus(); + cpus_ = value; + // @@protoc_insertion_point(field_set:loomcomm.Register.cpus) +} + +// optional bool info = 10; +inline bool Register::has_info() const { + return (_has_bits_[0] & 0x00000020u) != 0; +} +inline void Register::set_has_info() { + _has_bits_[0] |= 0x00000020u; +} +inline void Register::clear_has_info() { + _has_bits_[0] &= ~0x00000020u; +} +inline void Register::clear_info() { + info_ = false; + clear_has_info(); +} +inline bool Register::info() const { + // @@protoc_insertion_point(field_get:loomcomm.Register.info) + return info_; +} +inline void Register::set_info(bool value) { + set_has_info(); + info_ = value; + // @@protoc_insertion_point(field_set:loomcomm.Register.info) +} + // ------------------------------------------------------------------- // ServerMessage @@ -1689,85 +1757,85 @@ inline void Data::set_size(::google::protobuf::uint64 value) { // ------------------------------------------------------------------- -// Feedback +// Info // required int32 id = 1; -inline bool Feedback::has_id() const { +inline bool Info::has_id() const { return (_has_bits_[0] & 0x00000001u) != 0; } -inline void Feedback::set_has_id() { +inline void Info::set_has_id() { _has_bits_[0] |= 0x00000001u; } -inline void Feedback::clear_has_id() { +inline void Info::clear_has_id() { _has_bits_[0] &= ~0x00000001u; } -inline void Feedback::clear_id() { +inline void Info::clear_id() { id_ = 0; clear_has_id(); } -inline ::google::protobuf::int32 Feedback::id() const { - // @@protoc_insertion_point(field_get:loomcomm.Feedback.id) +inline ::google::protobuf::int32 Info::id() const { + // @@protoc_insertion_point(field_get:loomcomm.Info.id) return id_; } -inline void Feedback::set_id(::google::protobuf::int32 value) { +inline void Info::set_id(::google::protobuf::int32 value) { set_has_id(); id_ = value; - // @@protoc_insertion_point(field_set:loomcomm.Feedback.id) + // @@protoc_insertion_point(field_set:loomcomm.Info.id) } // required string worker = 2; -inline bool Feedback::has_worker() const { +inline bool Info::has_worker() const { return (_has_bits_[0] & 0x00000002u) != 0; } -inline void Feedback::set_has_worker() { +inline void Info::set_has_worker() { _has_bits_[0] |= 0x00000002u; } -inline void Feedback::clear_has_worker() { +inline void Info::clear_has_worker() { _has_bits_[0] &= ~0x00000002u; } -inline void Feedback::clear_worker() { +inline void Info::clear_worker() { if (worker_ != &::google::protobuf::internal::GetEmptyStringAlreadyInited()) { worker_->clear(); } clear_has_worker(); } -inline const ::std::string& Feedback::worker() const { - // @@protoc_insertion_point(field_get:loomcomm.Feedback.worker) +inline const ::std::string& Info::worker() const { + // @@protoc_insertion_point(field_get:loomcomm.Info.worker) return *worker_; } -inline void Feedback::set_worker(const ::std::string& value) { +inline void Info::set_worker(const ::std::string& value) { set_has_worker(); if (worker_ == &::google::protobuf::internal::GetEmptyStringAlreadyInited()) { worker_ = new ::std::string; } worker_->assign(value); - // @@protoc_insertion_point(field_set:loomcomm.Feedback.worker) + // @@protoc_insertion_point(field_set:loomcomm.Info.worker) } -inline void Feedback::set_worker(const char* value) { +inline void Info::set_worker(const char* value) { set_has_worker(); if (worker_ == &::google::protobuf::internal::GetEmptyStringAlreadyInited()) { worker_ = new ::std::string; } worker_->assign(value); - // @@protoc_insertion_point(field_set_char:loomcomm.Feedback.worker) + // @@protoc_insertion_point(field_set_char:loomcomm.Info.worker) } -inline void Feedback::set_worker(const char* value, size_t size) { +inline void Info::set_worker(const char* value, size_t size) { set_has_worker(); if (worker_ == &::google::protobuf::internal::GetEmptyStringAlreadyInited()) { worker_ = new ::std::string; } worker_->assign(reinterpret_cast<const char*>(value), size); - // @@protoc_insertion_point(field_set_pointer:loomcomm.Feedback.worker) + // @@protoc_insertion_point(field_set_pointer:loomcomm.Info.worker) } -inline ::std::string* Feedback::mutable_worker() { +inline ::std::string* Info::mutable_worker() { set_has_worker(); if (worker_ == &::google::protobuf::internal::GetEmptyStringAlreadyInited()) { worker_ = new ::std::string; } - // @@protoc_insertion_point(field_mutable:loomcomm.Feedback.worker) + // @@protoc_insertion_point(field_mutable:loomcomm.Info.worker) return worker_; } -inline ::std::string* Feedback::release_worker() { +inline ::std::string* Info::release_worker() { clear_has_worker(); if (worker_ == &::google::protobuf::internal::GetEmptyStringAlreadyInited()) { return NULL; @@ -1777,7 +1845,7 @@ inline ::std::string* Feedback::release_worker() { return temp; } } -inline void Feedback::set_allocated_worker(::std::string* worker) { +inline void Info::set_allocated_worker(::std::string* worker) { if (worker_ != &::google::protobuf::internal::GetEmptyStringAlreadyInited()) { delete worker_; } @@ -1788,7 +1856,7 @@ inline void Feedback::set_allocated_worker(::std::string* worker) { clear_has_worker(); worker_ = const_cast< ::std::string*>(&::google::protobuf::internal::GetEmptyStringAlreadyInited()); } - // @@protoc_insertion_point(field_set_allocated:loomcomm.Feedback.worker) + // @@protoc_insertion_point(field_set_allocated:loomcomm.Info.worker) } // ------------------------------------------------------------------- @@ -1865,49 +1933,49 @@ inline void ClientMessage::set_allocated_data(::loomcomm::DataPrologue* data) { // @@protoc_insertion_point(field_set_allocated:loomcomm.ClientMessage.data) } -// optional .loomcomm.Feedback feedback = 3; -inline bool ClientMessage::has_feedback() const { +// optional .loomcomm.Info info = 3; +inline bool ClientMessage::has_info() const { return (_has_bits_[0] & 0x00000004u) != 0; } -inline void ClientMessage::set_has_feedback() { +inline void ClientMessage::set_has_info() { _has_bits_[0] |= 0x00000004u; } -inline void ClientMessage::clear_has_feedback() { +inline void ClientMessage::clear_has_info() { _has_bits_[0] &= ~0x00000004u; } -inline void ClientMessage::clear_feedback() { - if (feedback_ != NULL) feedback_->::loomcomm::Feedback::Clear(); - clear_has_feedback(); +inline void ClientMessage::clear_info() { + if (info_ != NULL) info_->::loomcomm::Info::Clear(); + clear_has_info(); } -inline const ::loomcomm::Feedback& ClientMessage::feedback() const { - // @@protoc_insertion_point(field_get:loomcomm.ClientMessage.feedback) +inline const ::loomcomm::Info& ClientMessage::info() const { + // @@protoc_insertion_point(field_get:loomcomm.ClientMessage.info) #ifdef GOOGLE_PROTOBUF_NO_STATIC_INITIALIZER - return feedback_ != NULL ? *feedback_ : *default_instance().feedback_; + return info_ != NULL ? *info_ : *default_instance().info_; #else - return feedback_ != NULL ? *feedback_ : *default_instance_->feedback_; + return info_ != NULL ? *info_ : *default_instance_->info_; #endif } -inline ::loomcomm::Feedback* ClientMessage::mutable_feedback() { - set_has_feedback(); - if (feedback_ == NULL) feedback_ = new ::loomcomm::Feedback; - // @@protoc_insertion_point(field_mutable:loomcomm.ClientMessage.feedback) - return feedback_; +inline ::loomcomm::Info* ClientMessage::mutable_info() { + set_has_info(); + if (info_ == NULL) info_ = new ::loomcomm::Info; + // @@protoc_insertion_point(field_mutable:loomcomm.ClientMessage.info) + return info_; } -inline ::loomcomm::Feedback* ClientMessage::release_feedback() { - clear_has_feedback(); - ::loomcomm::Feedback* temp = feedback_; - feedback_ = NULL; +inline ::loomcomm::Info* ClientMessage::release_info() { + clear_has_info(); + ::loomcomm::Info* temp = info_; + info_ = NULL; return temp; } -inline void ClientMessage::set_allocated_feedback(::loomcomm::Feedback* feedback) { - delete feedback_; - feedback_ = feedback; - if (feedback) { - set_has_feedback(); +inline void ClientMessage::set_allocated_info(::loomcomm::Info* info) { + delete info_; + info_ = info; + if (info) { + set_has_info(); } else { - clear_has_feedback(); + clear_has_info(); } - // @@protoc_insertion_point(field_set_allocated:loomcomm.ClientMessage.feedback) + // @@protoc_insertion_point(field_set_allocated:loomcomm.ClientMessage.info) } diff --git a/src/libloom/worker.cpp b/src/libloom/worker.cpp index a24f2c7deb71c278e96d77cf1e7b37d8e22a6bb8..02630f5cf85b1783587979b02d286df3330f2911 100644 --- a/src/libloom/worker.cpp +++ b/src/libloom/worker.cpp @@ -132,6 +132,7 @@ void Worker::register_worker() msg.set_protocol_version(PROTOCOL_VERSION); msg.set_port(get_listen_port()); + msg.set_cpus(resource_cpus); for (auto& factory : task_factories) { msg.add_task_types(factory->get_name()); diff --git a/src/proto/loomcomm.proto b/src/proto/loomcomm.proto index d286021ad22d748bcb09887abf69ec6b5059aaef..976f6d14ed3a1e70ee5ce7f8e4e4238834f47fa0 100644 --- a/src/proto/loomcomm.proto +++ b/src/proto/loomcomm.proto @@ -9,8 +9,14 @@ message Register { } required int32 protocol_version = 1; required Type type = 2; + + // Worker optional int32 port = 3; repeated string task_types = 4; + optional int32 cpus = 5; + + // Client + optional bool info = 10; } message ServerMessage { @@ -57,7 +63,7 @@ message Data optional uint64 size = 2; } -message Feedback { +message Info { required int32 id = 1; required string worker = 2; } @@ -65,9 +71,9 @@ message Feedback { message ClientMessage { enum Type { DATA = 1; - FEEDBACK = 2; + INFO = 2; } required Type type = 1; optional DataPrologue data = 2; - optional Feedback feedback = 3; + optional Info info = 3; } diff --git a/src/server/clientconn.cpp b/src/server/clientconn.cpp index bb18a8bf4badc07d760814ab9b7cb23bfa9ba300..9c6a3222d79e40961ae95deb01bcd709115d3ffc 100644 --- a/src/server/clientconn.cpp +++ b/src/server/clientconn.cpp @@ -6,8 +6,8 @@ using namespace loom; -ClientConnection::ClientConnection(Server &server, std::unique_ptr<loom::Connection> connection) - : server(server), connection(std::move(connection)) +ClientConnection::ClientConnection(Server &server, std::unique_ptr<loom::Connection> connection, bool info_flag) + : server(server), connection(std::move(connection)), info_flag(info_flag) { this->connection->set_callback(this); llog->info("Client {} connected", this->connection->get_peername()); diff --git a/src/server/clientconn.h b/src/server/clientconn.h index b83d3707ae9a9cb31f6526a6e28e691330a2b7dd..aa9b035a600c659e6ecf4500d5bebf4d608035cb 100644 --- a/src/server/clientconn.h +++ b/src/server/clientconn.h @@ -12,7 +12,7 @@ class Server; class ClientConnection : public loom::ConnectionCallback { public: ClientConnection(Server &server, - std::unique_ptr<loom::Connection> connection); + std::unique_ptr<loom::Connection> connection, bool info_flag); ~ClientConnection(); void on_message(const char *buffer, size_t size); void on_close(); @@ -25,9 +25,14 @@ public: connection->send_buffer(buffer); } + bool has_info_flag() const { + return info_flag; + } + protected: Server &server; std::unique_ptr<loom::Connection> connection; + bool info_flag; }; diff --git a/src/server/freshconn.cpp b/src/server/freshconn.cpp index 18d4f95883de2bcb48f938934db0b8a1c6fe2bb8..4de479e2c1afc976bbc68e0f8f586c89035f7157 100644 --- a/src/server/freshconn.cpp +++ b/src/server/freshconn.cpp @@ -46,15 +46,18 @@ void FreshConnection::on_message(const char *buffer, size_t size) auto wconn = std::make_unique<WorkerConnection>(server, std::move(connection), address.str(), - task_types); + task_types, + msg.cpus()); server.add_worker_connection(std::move(wconn)); server.remove_freshconnection(*this); return; } if (msg.type() == loomcomm::Register_Type_REGISTER_CLIENT) { + bool info_flag = msg.has_info() && msg.info(); auto cconn = std::make_unique<ClientConnection>(server, - std::move(connection)); + std::move(connection), + info_flag); server.add_client_connection(std::move(cconn)); assert(connection.get() == nullptr); server.remove_freshconnection(*this); diff --git a/src/server/server.cpp b/src/server/server.cpp index aedbcd1bd637c9dac7100c4a3aa4ad61d75ebf71..1ead2e0a71415ef1c0525162202b981a51bc4bd6 100644 --- a/src/server/server.cpp +++ b/src/server/server.cpp @@ -3,6 +3,7 @@ #include "libloom/utils.h" #include "libloom/log.h" +#include "libloom/loomcomm.pb.h" #include <sstream> @@ -62,6 +63,26 @@ void Server::remove_freshconnection(FreshConnection &conn) fresh_connections.erase(i); } +void Server::on_task_finished(TaskNode &task) +{ + assert(client_connection); + if (client_connection->has_info_flag()) { + loomcomm::ClientMessage cmsg; + cmsg.set_type(loomcomm::ClientMessage_Type_INFO); + loomcomm::Info *info = cmsg.mutable_info(); + info->set_id(task.get_id()); + const auto& owners = task.get_owners(); + assert(owners.size()); + info->set_worker(owners.back()->get_address()); + + SendBuffer *buffer = new SendBuffer; + buffer->add(cmsg); + + client_connection->send_buffer(buffer); + } + task_manager.on_task_finished(task); +} + void Server::start_listen() { struct sockaddr_in addr; diff --git a/src/server/server.h b/src/server/server.h index 6b5e9b9572ed15374a15e11c850dfb7b52d7c438..5e2f3a9efcdc1cafdfe6c46cae4cee8e387c8201 100644 --- a/src/server/server.h +++ b/src/server/server.h @@ -48,6 +48,8 @@ public: void add_resend_task(loom::Id id); + void on_task_finished(TaskNode &task); + private: void start_listen(); diff --git a/src/server/taskmanager.cpp b/src/server/taskmanager.cpp index 7041c0f3e2ab78a8b2bf5cd2d209c479f8f71a68..6c797bc0c6641fac898fca38a88508ae9a1923c4 100644 --- a/src/server/taskmanager.cpp +++ b/src/server/taskmanager.cpp @@ -5,6 +5,7 @@ #include "libloom/log.h" #include <algorithm> +#include <limits.h> using namespace loom; @@ -74,6 +75,11 @@ void TaskManager::on_task_finished(TaskNode &task) distribute_work(ready); } +struct _TaskInfo { + int priority; + TaskNode::Vector new_tasks; +}; + TaskManager::WorkDistribution TaskManager::compute_distribution(TaskNode::Vector &tasks) { WorkDistribution distribution; @@ -83,27 +89,44 @@ TaskManager::WorkDistribution TaskManager::compute_distribution(TaskNode::Vector return distribution; } - std::unordered_map<WorkerConnection*, TaskNode::Vector> map; + std::unordered_map<WorkerConnection*, _TaskInfo> map; + + for (auto &connection : connections) { + int size = connection->get_tasks().size(); + int cpus = connection->get_resource_cpus(); + map[connection.get()].priority = size - cpus; + } - int c = 0; - size_t size = connections.size(); for (TaskNode* task : tasks) { auto &inputs = task->get_inputs(); - bool done = false; + WorkerConnection *found = nullptr; + int best_priority = INT_MAX; + for (TaskNode *inp : inputs) { + auto &owners = inp->get_owners(); - if (owners.size()) { - map[owners[0]].push_back(task); - done = true; - break; + for (WorkerConnection *owner : owners) { + auto &info = map[owner]; + if (info.priority < best_priority) { + best_priority = info.priority; + found = owner; + } } } - if (done) { - continue; + + if (best_priority >= 0) { + for (auto &i : map) { + _TaskInfo &info = i.second; + if (info.priority < best_priority) { + best_priority = info.priority; + found = i.first; + } + } } - map[connections[c].get()].push_back(task); - c += 1; - c %= size; + + auto &info = map[found]; + info.new_tasks.push_back(task); + info.priority += 1; } /*std::sort(connections.begin(), connections.end(), @@ -113,9 +136,11 @@ TaskManager::WorkDistribution TaskManager::compute_distribution(TaskNode::Vector });*/ - distribution.reserve(map.size()); + //distribution.reserve(map.size()); for (auto& pair : map) { - distribution.emplace_back(WorkerLoad{*pair.first, std::move(pair.second)}); + if (!pair.second.new_tasks.empty()) { + distribution.emplace_back(WorkerLoad{*pair.first, std::move(pair.second.new_tasks)}); + } } return distribution; } diff --git a/src/server/workerconn.cpp b/src/server/workerconn.cpp index 72e7fdf1f3bb255f591cf8f7559b8533cda2ac01..4692e9f9c7ae9ccd07d8dab4f9d651afd87bbb85 100644 --- a/src/server/workerconn.cpp +++ b/src/server/workerconn.cpp @@ -11,13 +11,15 @@ using namespace loom; WorkerConnection::WorkerConnection(Server &server, std::unique_ptr<Connection> connection, const std::string& address, - const std::vector<std::string> &task_types) + const std::vector<std::string> &task_types, + int resource_cpus) : server(server), connection(std::move(connection)), - address(address) + resource_cpus(resource_cpus), + address(address) { - llog->info("Worker {} connected", address); + llog->info("Worker {} connected (cpus={})", address, resource_cpus); if (this->connection.get()) { this->connection->set_callback(this); } @@ -34,13 +36,14 @@ void WorkerConnection::on_message(const char *buffer, size_t size) loomcomm::WorkerResponse msg; msg.ParseFromArray(buffer, size); - auto it = tasks.find(msg.id()); - assert(it != tasks.end()); - + const auto it = tasks.find(msg.id()); + assert(it != tasks.end()); TaskNode *task = it->second; + tasks.erase(it); + task->add_owner(this); task->set_finished(); - server.get_task_manager().on_task_finished(*task); + server.on_task_finished(*task); } void WorkerConnection::on_close() diff --git a/src/server/workerconn.h b/src/server/workerconn.h index c3ad35ef0c6c4b062badc77f9c142f68763cdeee..13e60c3cfd79b00f8e791e61bb8a35997485a097 100644 --- a/src/server/workerconn.h +++ b/src/server/workerconn.h @@ -15,7 +15,8 @@ public: WorkerConnection(Server &server, std::unique_ptr<loom::Connection> connection, const std::string& address, - const std::vector<std::string> &task_types); + const std::vector<std::string> &task_types, + int resource_cpus); void on_message(const char *buffer, size_t size); void on_close(); @@ -41,10 +42,15 @@ public: assert(0); } + int get_resource_cpus() const { + return resource_cpus; + } + private: Server &server; std::unique_ptr<loom::Connection> connection; std::unordered_map<loom::Id, TaskNode*> tasks; + int resource_cpus; std::string address; std::vector<int> task_type_translates; diff --git a/tests/client/cv_test.py b/tests/client/cv_test.py index 4f7f1a47a2bd579a0edaace7edc1179f10e50f00..d6c74bb5e4bce085d8eeec9a19c44facc474a0f3 100644 --- a/tests/client/cv_test.py +++ b/tests/client/cv_test.py @@ -12,6 +12,7 @@ def test_cv_iris(loom_env): CHUNK_SIZE = 150 / CHUNKS # There are 150 irises loom_env.start(2) + loom_env.info = True p = loom_env.plan() a = p.task_open(IRIS_DATA) @@ -38,8 +39,10 @@ def test_cv_iris(loom_env): task.map_file_in(model, "model") predict.append(task) - # p.write_dot("test.dot") results = loom_env.submit(p, predict) + assert len(results) == CHUNKS for line in results: assert line.startswith("Accuracy = ") + + p.write_dot("test.dot", loom_env.client.info) diff --git a/tests/client/loomenv.py b/tests/client/loomenv.py index 47e056716c5cc48898306d499ad5c54680e0b53b..ca5676beb456de9750015ebf0dcaa1b0a7876dde 100644 --- a/tests/client/loomenv.py +++ b/tests/client/loomenv.py @@ -47,9 +47,14 @@ class Env(): class LoomEnv(Env): PORT = 19010 + info = False + _client = None def start(self, workers_count, cpus=1): - self.kill_all() + if self.processes: + self._client = None + self.kill_all() + time.sleep(0.2) server_args = (LOOM_SERVER_BIN, "--debug", "--port=" + str(self.PORT)) @@ -79,11 +84,14 @@ class LoomEnv(Env): def plan(self): return client.Plan() + @property def client(self): - return client.Client("localhost", self.PORT) + if self._client is None: + self._client = client.Client("localhost", self.PORT, self.info) + return self._client def submit(self, plan, results): - return self.client().submit(plan, results) + return self.client.submit(plan, results) @pytest.yield_fixture(autouse=True, scope="function") diff --git a/tests/server/scheduler.cpp b/tests/server/scheduler.cpp index 503e1a9ce303bee15d4f4839d839cfd10315f5b8..9b44aef77e850b7f2d17547f022d44c9ede2285e 100644 --- a/tests/server/scheduler.cpp +++ b/tests/server/scheduler.cpp @@ -9,7 +9,8 @@ typedef std::unordered_map<WorkerConnection*, TaskSet> DistMap; -DistMap to_distmap(TaskManager::WorkDistribution dist) +static DistMap +to_distmap(TaskManager::WorkDistribution dist) { DistMap map; for (auto &load : dist) { @@ -22,6 +23,14 @@ DistMap to_distmap(TaskManager::WorkDistribution dist) return map; } +static std::unique_ptr<WorkerConnection> +simple_worker(Server &server, const std::string &name, int cpus=1) +{ + std::vector<std::string> tt; + return std::make_unique<WorkerConnection>(server, nullptr, name, tt, cpus); +} + + TEST_CASE( "Server scheduling - separate tasks", "[scheduling]" ) { Server server(NULL, 0); TaskManager &manager = server.get_task_manager(); @@ -42,14 +51,14 @@ TEST_CASE( "Server scheduling - separate tasks", "[scheduling]" ) { std::vector<std::string> tt; - auto wconn = std::make_unique<WorkerConnection>(server, nullptr, "w1", tt); + auto wconn = simple_worker(server, "w1"); WorkerConnection *w1 = wconn.get(); server.add_worker_connection(std::move(wconn)); auto d2 = to_distmap(manager.compute_distribution(v)); CHECK(d2.size() == 1); - wconn = std::make_unique<WorkerConnection>(server, nullptr, "w2", tt); + wconn = simple_worker(server, "w1"); WorkerConnection *w2 = wconn.get(); server.add_worker_connection(std::move(wconn)); CHECK(server.get_connections().size() == 2);