Commit 9817de25 authored by Stanislav Bohm

ENH: Workers inform about finished transfers

parent 6df2c722
......@@ -50,8 +50,9 @@ message WorkerCommand {
message WorkerResponse {
enum Type {
FINISH = 1;
FAILED = 2;
FINISHED = 1;
TRANSFERED = 2;
FAILED = 3;
}
required Type type = 1;
required int32 id = 2;
......
......@@ -19,7 +19,7 @@ from . import loomplan_pb2
DESCRIPTOR = _descriptor.FileDescriptor(
name='loomcomm.proto',
package='loomcomm',
serialized_pb=_b('\n\x0eloomcomm.proto\x12\x08loomcomm\x1a\x0eloomplan.proto\"\xc1\x01\n\x08Register\x12\x18\n\x10protocol_version\x18\x01 \x02(\x05\x12%\n\x04type\x18\x02 \x02(\x0e\x32\x17.loomcomm.Register.Type\x12\x0c\n\x04port\x18\x03 \x01(\x05\x12\x12\n\ntask_types\x18\x04 \x03(\t\x12\x12\n\ndata_types\x18\x05 \x03(\t\x12\x0c\n\x04\x63pus\x18\x06 \x01(\x05\"0\n\x04Type\x12\x13\n\x0fREGISTER_WORKER\x10\x01\x12\x13\n\x0fREGISTER_CLIENT\x10\x02\"&\n\rServerMessage\"\x15\n\x04Type\x12\r\n\tSTART_JOB\x10\x01\"\xde\x01\n\rWorkerCommand\x12*\n\x04type\x18\x01 \x02(\x0e\x32\x1c.loomcomm.WorkerCommand.Type\x12\n\n\x02id\x18\x02 \x01(\x05\x12\x11\n\ttask_type\x18\x03 \x01(\x05\x12\x13\n\x0btask_config\x18\x04 \x01(\t\x12\x13\n\x0btask_inputs\x18\x05 \x03(\x05\x12\x0f\n\x07\x61\x64\x64ress\x18\n \x01(\t\x12\x0f\n\x07symbols\x18\x64 \x03(\t\"6\n\x04Type\x12\x08\n\x04TASK\x10\x01\x12\x08\n\x04SEND\x10\x02\x12\n\n\x06REMOVE\x10\x03\x12\x0e\n\nDICTIONARY\x10\x04\"\x9a\x01\n\x0eWorkerResponse\x12+\n\x04type\x18\x01 \x02(\x0e\x32\x1d.loomcomm.WorkerResponse.Type\x12\n\n\x02id\x18\x02 \x02(\x05\x12\x0c\n\x04size\x18\x03 \x01(\x04\x12\x0e\n\x06length\x18\x04 \x01(\x04\x12\x11\n\terror_msg\x18\x64 \x01(\t\"\x1e\n\x04Type\x12\n\n\x06\x46INISH\x10\x01\x12\n\n\x06\x46\x41ILED\x10\x02\"\x18\n\x08\x41nnounce\x12\x0c\n\x04port\x18\x01 \x02(\x05\"=\n\nDataHeader\x12\n\n\x02id\x18\x01 \x02(\x05\x12\x0f\n\x07type_id\x18\x03 \x02(\x05\x12\x12\n\nn_messages\x18\x02 \x02(\x03\"\x9c\x01\n\x05\x45vent\x12\x0c\n\x04time\x18\x01 \x02(\x04\x12\"\n\x04type\x18\x02 \x02(\x0e\x32\x14.loomcomm.Event.Type\x12\n\n\x02id\x18\x03 \x02(\x05\x12\x11\n\tworker_id\x18\x04 \x01(\x05\"B\n\x04Type\x12\x0e\n\nTASK_START\x10\x01\x12\x0c\n\x08TASK_END\x10\x02\x12\x0e\n\nSEND_START\x10\x03\x12\x0c\n\x08SEND_END\x10\x04\"6\n\x05\x45rror\x12\n\n\x02id\x18\x01 \x02(\x05\x12\x0e\n\x06worker\x18\x02 \x02(\t\x12\x11\n\terror_msg\x18\x03 \x02(\t\"\xe8\x01\n\rClientMessage\x12*\n\x04type\x18\x01 
\x02(\x0e\x32\x1c.loomcomm.ClientMessage.Type\x12\"\n\x04\x64\x61ta\x18\x02 \x01(\x0b\x32\x14.loomcomm.DataHeader\x12\x1e\n\x05\x65vent\x18\x03 \x01(\x0b\x32\x0f.loomcomm.Event\x12\x1e\n\x05\x65rror\x18\x04 \x01(\x0b\x32\x0f.loomcomm.Error\x12\x0f\n\x07symbols\x18\x05 \x03(\t\"6\n\x04Type\x12\x08\n\x04\x44\x41TA\x10\x01\x12\t\n\x05\x45VENT\x10\x02\x12\t\n\x05\x45RROR\x10\x03\x12\x0e\n\nDICTIONARY\x10\x04\"<\n\x0c\x43lientSubmit\x12\x1c\n\x04plan\x18\x01 \x02(\x0b\x32\x0e.loomplan.Plan\x12\x0e\n\x06report\x18\x02 \x02(\x08\x42\x02H\x03')
serialized_pb=_b('\n\x0eloomcomm.proto\x12\x08loomcomm\x1a\x0eloomplan.proto\"\xc1\x01\n\x08Register\x12\x18\n\x10protocol_version\x18\x01 \x02(\x05\x12%\n\x04type\x18\x02 \x02(\x0e\x32\x17.loomcomm.Register.Type\x12\x0c\n\x04port\x18\x03 \x01(\x05\x12\x12\n\ntask_types\x18\x04 \x03(\t\x12\x12\n\ndata_types\x18\x05 \x03(\t\x12\x0c\n\x04\x63pus\x18\x06 \x01(\x05\"0\n\x04Type\x12\x13\n\x0fREGISTER_WORKER\x10\x01\x12\x13\n\x0fREGISTER_CLIENT\x10\x02\"&\n\rServerMessage\"\x15\n\x04Type\x12\r\n\tSTART_JOB\x10\x01\"\xde\x01\n\rWorkerCommand\x12*\n\x04type\x18\x01 \x02(\x0e\x32\x1c.loomcomm.WorkerCommand.Type\x12\n\n\x02id\x18\x02 \x01(\x05\x12\x11\n\ttask_type\x18\x03 \x01(\x05\x12\x13\n\x0btask_config\x18\x04 \x01(\t\x12\x13\n\x0btask_inputs\x18\x05 \x03(\x05\x12\x0f\n\x07\x61\x64\x64ress\x18\n \x01(\t\x12\x0f\n\x07symbols\x18\x64 \x03(\t\"6\n\x04Type\x12\x08\n\x04TASK\x10\x01\x12\x08\n\x04SEND\x10\x02\x12\n\n\x06REMOVE\x10\x03\x12\x0e\n\nDICTIONARY\x10\x04\"\xac\x01\n\x0eWorkerResponse\x12+\n\x04type\x18\x01 \x02(\x0e\x32\x1d.loomcomm.WorkerResponse.Type\x12\n\n\x02id\x18\x02 \x02(\x05\x12\x0c\n\x04size\x18\x03 \x01(\x04\x12\x0e\n\x06length\x18\x04 \x01(\x04\x12\x11\n\terror_msg\x18\x64 \x01(\t\"0\n\x04Type\x12\x0c\n\x08\x46INISHED\x10\x01\x12\x0e\n\nTRANSFERED\x10\x02\x12\n\n\x06\x46\x41ILED\x10\x03\"\x18\n\x08\x41nnounce\x12\x0c\n\x04port\x18\x01 \x02(\x05\"=\n\nDataHeader\x12\n\n\x02id\x18\x01 \x02(\x05\x12\x0f\n\x07type_id\x18\x03 \x02(\x05\x12\x12\n\nn_messages\x18\x02 \x02(\x03\"\x9c\x01\n\x05\x45vent\x12\x0c\n\x04time\x18\x01 \x02(\x04\x12\"\n\x04type\x18\x02 \x02(\x0e\x32\x14.loomcomm.Event.Type\x12\n\n\x02id\x18\x03 \x02(\x05\x12\x11\n\tworker_id\x18\x04 \x01(\x05\"B\n\x04Type\x12\x0e\n\nTASK_START\x10\x01\x12\x0c\n\x08TASK_END\x10\x02\x12\x0e\n\nSEND_START\x10\x03\x12\x0c\n\x08SEND_END\x10\x04\"6\n\x05\x45rror\x12\n\n\x02id\x18\x01 \x02(\x05\x12\x0e\n\x06worker\x18\x02 \x02(\t\x12\x11\n\terror_msg\x18\x03 
\x02(\t\"\xe8\x01\n\rClientMessage\x12*\n\x04type\x18\x01 \x02(\x0e\x32\x1c.loomcomm.ClientMessage.Type\x12\"\n\x04\x64\x61ta\x18\x02 \x01(\x0b\x32\x14.loomcomm.DataHeader\x12\x1e\n\x05\x65vent\x18\x03 \x01(\x0b\x32\x0f.loomcomm.Event\x12\x1e\n\x05\x65rror\x18\x04 \x01(\x0b\x32\x0f.loomcomm.Error\x12\x0f\n\x07symbols\x18\x05 \x03(\t\"6\n\x04Type\x12\x08\n\x04\x44\x41TA\x10\x01\x12\t\n\x05\x45VENT\x10\x02\x12\t\n\x05\x45RROR\x10\x03\x12\x0e\n\nDICTIONARY\x10\x04\"<\n\x0c\x43lientSubmit\x12\x1c\n\x04plan\x18\x01 \x02(\x0b\x32\x0e.loomplan.Plan\x12\x0e\n\x06report\x18\x02 \x02(\x08\x42\x02H\x03')
,
dependencies=[loomplan_pb2.DESCRIPTOR,])
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
......@@ -103,18 +103,22 @@ _WORKERRESPONSE_TYPE = _descriptor.EnumDescriptor(
file=DESCRIPTOR,
values=[
_descriptor.EnumValueDescriptor(
name='FINISH', index=0, number=1,
name='FINISHED', index=0, number=1,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='FAILED', index=1, number=2,
name='TRANSFERED', index=1, number=2,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='FAILED', index=2, number=3,
options=None,
type=None),
],
containing_type=None,
options=None,
serialized_start=630,
serialized_end=660,
serialized_end=678,
)
_sym_db.RegisterEnumDescriptor(_WORKERRESPONSE_TYPE)
......@@ -143,8 +147,8 @@ _EVENT_TYPE = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
serialized_start=842,
serialized_end=908,
serialized_start=860,
serialized_end=926,
)
_sym_db.RegisterEnumDescriptor(_EVENT_TYPE)
......@@ -173,8 +177,8 @@ _CLIENTMESSAGE_TYPE = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
serialized_start=1145,
serialized_end=1199,
serialized_start=1163,
serialized_end=1217,
)
_sym_db.RegisterEnumDescriptor(_CLIENTMESSAGE_TYPE)
......@@ -397,7 +401,7 @@ _WORKERRESPONSE = _descriptor.Descriptor(
oneofs=[
],
serialized_start=506,
serialized_end=660,
serialized_end=678,
)
......@@ -426,8 +430,8 @@ _ANNOUNCE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=662,
serialized_end=686,
serialized_start=680,
serialized_end=704,
)
......@@ -470,8 +474,8 @@ _DATAHEADER = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=688,
serialized_end=749,
serialized_start=706,
serialized_end=767,
)
......@@ -522,8 +526,8 @@ _EVENT = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=752,
serialized_end=908,
serialized_start=770,
serialized_end=926,
)
......@@ -566,8 +570,8 @@ _ERROR = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=910,
serialized_end=964,
serialized_start=928,
serialized_end=982,
)
......@@ -625,8 +629,8 @@ _CLIENTMESSAGE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=967,
serialized_end=1199,
serialized_start=985,
serialized_end=1217,
)
......@@ -662,8 +666,8 @@ _CLIENTSUBMIT = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=1201,
serialized_end=1261,
serialized_start=1219,
serialized_end=1279,
)
_REGISTER.fields_by_name['type'].enum_type = _REGISTER_TYPE
......
......@@ -1083,6 +1083,7 @@ bool WorkerResponse_Type_IsValid(int value) {
switch(value) {
case 1:
case 2:
case 3:
return true;
default:
return false;
......@@ -1090,7 +1091,8 @@ bool WorkerResponse_Type_IsValid(int value) {
}
#ifndef _MSC_VER
const WorkerResponse_Type WorkerResponse::FINISH;
const WorkerResponse_Type WorkerResponse::FINISHED;
const WorkerResponse_Type WorkerResponse::TRANSFERED;
const WorkerResponse_Type WorkerResponse::FAILED;
const WorkerResponse_Type WorkerResponse::Type_MIN;
const WorkerResponse_Type WorkerResponse::Type_MAX;
......
......@@ -73,11 +73,12 @@ const WorkerCommand_Type WorkerCommand_Type_Type_MAX = WorkerCommand_Type_DICTIO
const int WorkerCommand_Type_Type_ARRAYSIZE = WorkerCommand_Type_Type_MAX + 1;
enum WorkerResponse_Type {
WorkerResponse_Type_FINISH = 1,
WorkerResponse_Type_FAILED = 2
WorkerResponse_Type_FINISHED = 1,
WorkerResponse_Type_TRANSFERED = 2,
WorkerResponse_Type_FAILED = 3
};
bool WorkerResponse_Type_IsValid(int value);
const WorkerResponse_Type WorkerResponse_Type_Type_MIN = WorkerResponse_Type_FINISH;
const WorkerResponse_Type WorkerResponse_Type_Type_MIN = WorkerResponse_Type_FINISHED;
const WorkerResponse_Type WorkerResponse_Type_Type_MAX = WorkerResponse_Type_FAILED;
const int WorkerResponse_Type_Type_ARRAYSIZE = WorkerResponse_Type_Type_MAX + 1;
......@@ -612,7 +613,8 @@ class WorkerResponse : public ::google::protobuf::MessageLite {
// nested types ----------------------------------------------------
typedef WorkerResponse_Type Type;
static const Type FINISH = WorkerResponse_Type_FINISH;
static const Type FINISHED = WorkerResponse_Type_FINISHED;
static const Type TRANSFERED = WorkerResponse_Type_TRANSFERED;
static const Type FAILED = WorkerResponse_Type_FAILED;
static inline bool Type_IsValid(int value) {
return WorkerResponse_Type_IsValid(value);
......
......@@ -50,6 +50,15 @@ void InterConnection::on_connect()
early_sends.clear();
}
// Completes reception of the data object that is currently being unpacked.
// Called from on_message() and on_stream_data() once the unpacker reports
// DataUnpacker::FINISHED.
void InterConnection::finish_receive()
{
logger->debug("Interconnect: Data id={} received", unpacking_data_id);
// Report the finished transfer through the worker, then hand over the
// unpacked data under the same id.
worker.data_transfered(unpacking_data_id);
worker.publish_data(unpacking_data_id, unpacker->finish());
// Clear the per-transfer state: the id is set to -1 and the unpacker is reset.
unpacking_data_id = -1;
unpacker.reset();
}
void InterConnection::on_message(const char *buffer, size_t size)
{
if (!address.empty()) {
......@@ -58,9 +67,7 @@ void InterConnection::on_message(const char *buffer, size_t size)
auto result = unpacker->on_message(buffer, size);
switch(result) {
case DataUnpacker::FINISHED:
worker.publish_data(unpacking_data_id, unpacker->finish());
unpacking_data_id = -1;
unpacker.reset();
finish_receive();
return;
case DataUnpacker::MESSAGE:
return;
......@@ -103,10 +110,8 @@ void InterConnection::on_stream_data(const char *buffer, size_t size, size_t rem
auto result = unpacker->on_stream_data(buffer, size, remaining);
switch(result) {
case DataUnpacker::FINISHED:
worker.publish_data(unpacking_data_id, unpacker->finish());
unpacking_data_id = -1;
unpacker.reset();
socket.set_stream_mode(false);
finish_receive();
return;
case DataUnpacker::MESSAGE:
socket.set_stream_mode(false);
......
......@@ -52,6 +52,8 @@ protected:
void on_stream_data(const char *buffer, size_t size, size_t remaining);
void on_connect();
void finish_receive();
base::Socket socket;
Worker &worker;
std::string address;
......
......@@ -160,7 +160,7 @@ void RunTask::start(DataVector &inputs)
close(stderr_fd);
if (r) {
fail_libuv("uv_spawn", r);
fail_libuv(std::string("spawning '") + options.file + "'", r);
return;
}
}
......
......@@ -435,7 +435,7 @@ void Worker::task_finished(TaskInstance &task, const DataPtr &data)
{
if (server_conn.is_connected()) {
loomcomm::WorkerResponse msg;
msg.set_type(loomcomm::WorkerResponse_Type_FINISH);
msg.set_type(loomcomm::WorkerResponse_Type_FINISHED);
msg.set_id(task.get_id());
msg.set_size(data->get_size());
msg.set_length(data->get_length());
......@@ -445,6 +445,16 @@ void Worker::task_finished(TaskInstance &task, const DataPtr &data)
check_ready_tasks();
}
// Sends a WorkerResponse of type TRANSFERED carrying `task_id` over the
// server connection. Nothing is sent when that connection is not established.
void Worker::data_transfered(base::Id task_id)
{
    if (!server_conn.is_connected()) {
        return;
    }

    loomcomm::WorkerResponse response;
    response.set_type(loomcomm::WorkerResponse_Type_TRANSFERED);
    response.set_id(task_id);
    send_message(server_conn, response);
}
void Worker::send_data(const std::string &address, Id id, DataPtr &data)
{
auto &connection = get_connection(address);;
......
......@@ -48,6 +48,8 @@ public:
void task_finished(TaskInstance &task_instance, const DataPtr &data);
void task_failed(TaskInstance &task_instance, const std::string &error_msg);
void data_transfered(base::Id task_id);
void task_redirect(TaskInstance &task, std::unique_ptr<TaskDescription> new_task_desc);
void publish_data(base::Id id, const DataPtr &data);
void remove_data(base::Id id);
......
......@@ -58,6 +58,13 @@ void ComputationState::set_task_finished(const PlanNode&node, size_t size, size_
workers[wc].free_cpus += node.get_n_cpus();
}
// Records that worker `wc` has finished receiving the data of task `id`:
// its status in the task's state is switched from TRANSFER to OWNER.
void ComputationState::set_data_transfered(loom::base::Id id, WorkerConnection *wc)
{
    TaskState &task_state = get_state(id);
    // The worker is expected to be marked as a transfer target beforehand.
    assert(task_state.get_worker_status(wc) == WStatus::TRANSFER);
    task_state.set_worker_status(wc, WStatus::OWNER);
}
void ComputationState::remove_state(loom::base::Id id)
{
auto it = states.find(id);
......
......@@ -42,6 +42,7 @@ public:
void add_ready_nodes(const std::vector<loom::base::Id> &ids);
void set_task_finished(const PlanNode& node, size_t size, size_t length, WorkerConnection *wc);
void set_data_transfered(loom::base::Id id, WorkerConnection *wc);
bool has_pending_tasks() const {
return !pending_tasks.empty();
......
......@@ -87,6 +87,11 @@ void Server::on_task_finished(loom::base::Id id, size_t size, size_t length, Wor
task_manager.on_task_finished(id, size, length, wc);
}
// Forwards the notification that data `id` was transferred to worker `wc`
// to the task manager.
void Server::on_data_transfered(loom::base::Id id, WorkerConnection *wc)
{
task_manager.on_data_transfered(id, wc);
}
void Server::inform_about_error(std::string &error_msg)
{
......
......@@ -61,6 +61,7 @@ public:
void add_resend_task(loom::base::Id id);
void on_task_finished(loom::base::Id id, size_t size, size_t length, WorkerConnection *wc);
void on_data_transfered(loom::base::Id id, WorkerConnection *wc);
loom::base::Dictionary& get_dictionary() {
return dictionary;
......
......@@ -55,7 +55,7 @@ void TaskManager::start_task(WorkerConnection *wc, Id task_id)
WorkerConnection *owner = state.get_first_owner();
assert(owner);
owner->send_data(id, wc->get_address());
state.set_worker_status(wc, WStatus::OWNER);
state.set_worker_status(wc, WStatus::TRANSFER);
}
}
......@@ -92,7 +92,9 @@ void TaskManager::remove_state(TaskState &state)
logger->debug("Removing state id={}", state.get_id());
assert(state.get_ref_counter() == 0);
loom::base::Id id = state.get_id();
state.foreach_owner([id](WorkerConnection *wc) {
state.foreach_worker([id](WorkerConnection *wc, WStatus status) {
assert(status == WStatus::OWNER);
wc->remove_data(id);
});
cstate.remove_state(id);
......@@ -144,6 +146,12 @@ void TaskManager::on_task_finished(loom::base::Id id, size_t size, size_t length
}
}
// Handles a finished transfer of data `id` to worker `wc`: logs the event
// together with the worker's address and records it in the computation state.
void TaskManager::on_data_transfered(Id id, WorkerConnection *wc)
{
logger->debug("Data id={} transfered to {}", id, wc->get_address());
cstate.set_data_transfered(id, wc);
}
void TaskManager::register_worker(WorkerConnection *wc)
{
cstate.add_worker(wc);
......
......@@ -37,6 +37,7 @@ public:
}
void on_task_finished(loom::base::Id id, size_t size, size_t length, WorkerConnection *wc);
void on_data_transfered(loom::base::Id id, WorkerConnection *wc);
void register_worker(WorkerConnection *wc);
bool is_plan_finished() const {
......
......@@ -96,7 +96,7 @@ public:
return false;
}
template<typename F> void foreach_planned_owner(F f) const {
template<typename F> void foreach_planned_owner(const F &f) const {
for(auto &pair : workers) {
if (is_planned_owner(pair.second)) {
f(pair.first);
......@@ -104,7 +104,7 @@ public:
}
}
template<typename F> void foreach_owner(F f) const {
template<typename F> void foreach_owner(const F &f) const {
for(auto &pair : workers) {
if (pair.second == WStatus::OWNER) {
f(pair.first);
......@@ -112,6 +112,12 @@ public:
}
}
// Calls f(worker, status) for every entry in `workers`, regardless of status.
template<typename F> void foreach_worker(const F &f) const {
    for (const auto &entry : workers) {
        f(entry.first, entry.second);
    }
}
std::string get_info() const;
private:
......
......@@ -45,11 +45,16 @@ void WorkerConnection::on_message(const char *buffer, size_t size)
loomcomm::WorkerResponse msg;
msg.ParseFromArray(buffer, size);
if (msg.type() == loomcomm::WorkerResponse_Type_FINISH) {
if (msg.type() == loomcomm::WorkerResponse_Type_FINISHED) {
server.on_task_finished(msg.id(), msg.size(), msg.length(), this);
return;
}
if (msg.type() == loomcomm::WorkerResponse_Type_TRANSFERED) {
server.on_data_transfered(msg.id(), this);
return;
}
if (msg.type() == loomcomm::WorkerResponse_Type_FAILED) {
assert(msg.has_error_msg());
server.inform_about_task_error(msg.id(), *this, msg.error_msg());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment