Commit f6225913 authored by Stanislav Bohm's avatar Stanislav Bohm

ENH: stats reporting

parent 5fce50d4
......@@ -94,21 +94,35 @@ message Error {
required string error_msg = 3;
}
message ClientMessage {
// Basic server statistics, carried in the `stats` field of a STATS response.
message Stats {
// Number of workers; the server fills this from the size of its connection list.
optional int32 n_workers = 1;
// Number of data objects as reported by the server's task manager.
optional int32 n_data_objects = 2;
}
// Message sent from the server to a client; `type` says which kind of
// response this is.
message ClientResponse {
enum Type {
DATA = 1;
EVENT = 2;
ERROR = 3;
DICTIONARY = 4;
STATS = 5;
}
required Type type = 1;
// Filled in for DATA responses.
optional DataHeader data = 2;
// Filled in for EVENT responses.
optional Event event = 3;
// Filled in for ERROR responses.
optional Error error = 4;
// Read by the client from DICTIONARY responses; the client maps each symbol
// to its position in this list.
repeated string symbols = 5;
// Filled in for STATS responses.
optional Stats stats = 6;
}
message ClientSubmit {
required loomplan.Plan plan = 1;
required bool report = 2;
// Request sent from a client to the server; `type` selects the operation.
message ClientRequest {
enum Type {
PLAN = 1;
STATS = 2;
}
required Type type = 1;
// PLAN: the plan to submit and whether reporting is requested for it.
// A STATS request sets neither of these fields.
optional loomplan.Plan plan = 2;
optional bool report = 3 [default=false];
}
......@@ -3,13 +3,14 @@ from .connection import Connection
from .task import Task
from .plan import Plan
from ..pb.loomcomm_pb2 import Register, DataHeader, ClientMessage, ClientSubmit
from ..pb.loomcomm_pb2 import Register, DataHeader
from ..pb.loomcomm_pb2 import ClientRequest, ClientResponse
from ..pb.loomreport_pb2 import Report
import socket
import struct
LOOM_PROTOCOL_VERSION = 1
LOOM_PROTOCOL_VERSION = 2
class LoomException(Exception):
......@@ -58,6 +59,20 @@ class Client(object):
while self.symbols is None:
self._read_symbols()
"""Ask server for basic statistic informations"""
def get_stats(self):
    """Request basic statistics from the server.

    Sends a ``ClientRequest`` of type STATS, then reads one message from
    the connection and parses it as a ``ClientResponse``.

    Returns:
        dict: ``{"n_workers": ..., "n_data_objects": ...}`` copied from
        the ``stats`` field of the response.

    Raises:
        AssertionError: if the received response is not of type STATS.
    """
    request = ClientRequest()
    request.type = ClientRequest.STATS
    self._send_message(request)

    raw = self.connection.receive_message()
    response = ClientResponse()
    response.ParseFromString(raw)
    assert response.type == ClientResponse.STATS

    stats = response.stats
    return {
        "n_workers": stats.n_workers,
        "n_data_objects": stats.n_data_objects,
    }
def submit(self, tasks, report=None):
"""Submits task(s) to the server and waits for results
......@@ -89,7 +104,8 @@ class Client(object):
for task in task_set:
plan.add(task)
msg = ClientSubmit()
msg = ClientRequest()
msg.type = ClientRequest.PLAN
msg.report = bool(report)
msg.plan.result_ids.extend(plan.tasks[t] for t in task_set)
......@@ -104,13 +120,13 @@ class Client(object):
data = {}
while expected != len(data):
msg = self.connection.receive_message()
cmsg = ClientMessage()
cmsg = ClientResponse()
cmsg.ParseFromString(msg)
if cmsg.type == ClientMessage.DATA:
if cmsg.type == ClientResponse.DATA:
data[cmsg.data.id] = self._receive_data(cmsg.data.type_id)
elif cmsg.type == ClientMessage.EVENT:
elif cmsg.type == ClientResponse.EVENT:
self.process_event(cmsg.event, report_data)
elif cmsg.type == ClientMessage.ERROR:
elif cmsg.type == ClientResponse.ERROR:
self.process_error(cmsg)
else:
assert 0
......@@ -137,9 +153,9 @@ class Client(object):
def _read_symbols(self):
msg = self.connection.receive_message()
cmsg = ClientMessage()
cmsg = ClientResponse()
cmsg.ParseFromString(msg)
assert cmsg.type == ClientMessage.DICTIONARY
assert cmsg.type == ClientResponse.DICTIONARY
self.symbols = {}
for i, s in enumerate(cmsg.symbols):
self.symbols[s] = i
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
......@@ -4,7 +4,7 @@
namespace loom {
namespace base {
const int PROTOCOL_VERSION = 1;
const int PROTOCOL_VERSION = 2;
typedef int Id;
}
......
......@@ -28,8 +28,8 @@ ClientConnection::ClientConnection(Server &server,
logger->info("Client {} connected", this->socket->get_peername());
// Send dictionary
loomcomm::ClientMessage cmsg;
cmsg.set_type(loomcomm::ClientMessage_Type_DICTIONARY);
loomcomm::ClientResponse cmsg;
cmsg.set_type(loomcomm::ClientResponse_Type_DICTIONARY);
std::vector<std::string> symbols = server.get_dictionary().get_all_symbols();
for (std::string &symbol : symbols) {
......@@ -42,13 +42,27 @@ ClientConnection::ClientConnection(Server &server,
// Handles one message received from the client. The buffer is parsed as a
// loomcomm::ClientRequest and dispatched on its type:
//   PLAN  - the plan is handed to the task manager,
//   STATS - a ClientResponse of type STATS is sent back to the client.
// NOTE(review): this view also shows statements that use `loomcomm::ClientSubmit submit`;
// they look like the pre-change implementation displayed next to its replacement.
// Confirm which lines are live before relying on this body.
void ClientConnection::on_message(const char *buffer, size_t size)
{
logger->debug("Plan received");
loomcomm::ClientSubmit submit;
submit.ParseFromArray(buffer, size);
auto& task_manager = server.get_task_manager();
loomcomm::ClientRequest request;
// NOTE(review): the result of ParseFromArray is not checked here.
request.ParseFromArray(buffer, size);
const loomplan::Plan &plan = submit.plan();
loom::base::Id id_base = server.new_id(plan.tasks_size());
task_manager.add_plan(Plan(plan, id_base, server.get_dictionary()), submit.report());
logger->info("Plan submitted tasks={} report={}", plan.tasks_size(), submit.report());
if (request.type() == loomcomm::ClientRequest_Type_PLAN) {
logger->debug("Plan received");
const loomplan::Plan &plan = request.plan();
// Ask the server for ids for the plan; the task count is passed to new_id().
loom::base::Id id_base = server.new_id(plan.tasks_size());
bool report = request.report();
task_manager.add_plan(Plan(plan, id_base, server.get_dictionary()), report);
logger->info("Plan submitted tasks={} report={}", plan.tasks_size(), report);
} else if (request.type() == loomcomm::ClientRequest_Type_STATS) {
logger->debug("Stats request");
loomcomm::ClientResponse cmsg;
cmsg.set_type(loomcomm::ClientResponse_Type_STATS);
loomcomm::Stats *stats = cmsg.mutable_stats();
// n_workers is the size of the server's connection list.
stats->set_n_workers(server.get_connections().size());
stats->set_n_data_objects(server.get_task_manager().get_n_of_data_objects());
send_message(cmsg);
} else {
// NOTE(review): an unrecognised request type terminates the whole process.
logger->critical("Invalid request type");
exit(1);
}
}
......@@ -91,6 +91,17 @@ bool ComputationState::is_finished() const
return states.empty();
}
/** Returns how many entries of `states` report an owner via has_owner(). */
int ComputationState::get_n_data_objects() const
{
    int n_objects = 0;
    for (auto it = states.begin(); it != states.end(); ++it) {
        n_objects += it->second.has_owner() ? 1 : 0;
    }
    return n_objects;
}
void ComputationState::add_pending_task(loom::base::Id id)
{
logger->debug("Add pending task and creating state id={}", id);
......
......@@ -71,6 +71,8 @@ public:
return workers;
}
int get_n_data_objects() const;
private:
std::unordered_map<loom::base::Id, TaskState> states;
std::unordered_map<WorkerConnection*, WorkerInfo> workers;
......
......@@ -101,8 +101,8 @@ void DWConnection::on_message(const char *buffer, size_t size)
msg.set_id(client_id);
logger->debug("DummyWorker: Capturing data for client data_id={} (messages={})", data_id, remaining_messages);
loomcomm::ClientMessage cmsg;
cmsg.set_type(loomcomm::ClientMessage_Type_DATA);
loomcomm::ClientResponse cmsg;
cmsg.set_type(loomcomm::ClientResponse_Type_DATA);
*cmsg.mutable_data() = msg;
send_buffer->add(base::message_to_item(cmsg));
}
......@@ -92,18 +92,13 @@ void Server::on_data_transfered(loom::base::Id id, WorkerConnection *wc)
task_manager.on_data_transfered(id, wc);
}
void Server::inform_about_error(std::string &error_msg)
{
}
void Server::inform_about_task_error(Id id, WorkerConnection &wconn, const std::string &error_msg)
{
logger->error("Task id={} failed on worker {}: {}",
id, wconn.get_address(), error_msg);
loomcomm::ClientMessage msg;
msg.set_type(loomcomm::ClientMessage_Type_ERROR);
loomcomm::ClientResponse msg;
msg.set_type(loomcomm::ClientResponse_Type_ERROR);
loomcomm::Error *error = msg.mutable_error();
error->set_id(id);
......@@ -150,8 +145,8 @@ void Server::report_event(std::unique_ptr<loomcomm::Event> event)
return;
}
loomcomm::ClientMessage cmsg;
cmsg.set_type(loomcomm::ClientMessage_Type_EVENT);
loomcomm::ClientResponse cmsg;
cmsg.set_type(loomcomm::ClientResponse_Type_EVENT);
cmsg.set_allocated_event(event.release());
client_connection->send_message(cmsg);
......
......@@ -67,7 +67,6 @@ public:
return dictionary;
}
void inform_about_error(std::string &error_msg);
void inform_about_task_error(loom::base::Id id, WorkerConnection &wconn, const std::string &error_msg);
loom::base::Id new_id(int count = 1) {
......
......@@ -44,6 +44,10 @@ public:
return cstate.is_finished();
}
/** Forwards to the computation state's count of data objects. */
int get_n_of_data_objects() const {
    const int n_objects = cstate.get_n_data_objects();
    return n_objects;
}
void run_task_distribution();
private:
......
......@@ -74,7 +74,13 @@ public:
return i->second;
}
WorkerConnection *get_first_owner() {
/**
 * Tells whether some worker is recorded as an owner of this task's data.
 *
 * @return true when get_first_owner() finds a worker with OWNER status,
 *         false when it yields a null pointer (presumably its "no owner"
 *         result - confirm against the full get_first_owner() body).
 */
bool has_owner() const {
    // An owner exists exactly when the lookup returns a non-null worker;
    // comparing with `== nullptr` would report the opposite.
    return get_first_owner() != nullptr;
}
WorkerConnection *get_first_owner() const {
for (auto &p : workers) {
if (p.second == WStatus::OWNER) {
return p.first;
......
......@@ -50,6 +50,7 @@ class LoomEnv(Env):
_client = None
def start(self, workers_count, cpus=1):
self.workers_count = workers_count
if self.processes:
self._client = None
self.kill_all()
......@@ -86,16 +87,24 @@ class LoomEnv(Env):
assert not server.poll()
assert not any(w.poll() for w in workers)
def check_stats(self):
    """Check the server statistics against the expected idle state.

    Queries the already created client (``self._client``) for statistics
    and asserts that the reported worker count equals
    ``self.workers_count`` and that no data objects are reported.

    Raises:
        AssertionError: if either value differs from the expectation.
    """
    reported = self._client.get_stats()
    n_workers = reported["n_workers"]
    assert n_workers == self.workers_count
    n_objects = reported["n_data_objects"]
    assert n_objects == 0
# Lazily created client: the Client is constructed on first access, connecting
# to "localhost" on self.PORT, and the cached instance is returned afterwards.
@property
def client(self):
if self._client is None:
self._client = client.Client("localhost", self.PORT)
# NOTE(review): indentation is not visible here; presumably check_stats() runs
# only right after a new client has been created - confirm.
self.check_stats()
return self._client
def submit(self, results, report=None):
if report:
report = os.path.join(LOOM_TEST_BUILD_DIR, report)
return self.client.submit(results, report)
result = self.client.submit(results, report)
self.check_stats()
return result
def make_dry_report(self, tasks, filename):
filename = os.path.join(LOOM_TEST_BUILD_DIR, filename)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment