Commit a18f93e2 authored by Stanislav Bohm's avatar Stanislav Bohm

ENH: rview and report system introduced

parent 72ff1701
......@@ -12,6 +12,7 @@ _build
*.so.*
*_pch.h.cpp
*_resource.rc
.*
# qtcreator generated files
*.pro.user*
......
from loomcomm_pb2 import Register, Data, ClientMessage
from loomcomm_pb2 import Register, Data, ClientMessage, ClientSubmit
from loomreport_pb2 import Report
import socket
from connection import Connection
......@@ -23,7 +24,7 @@ class TaskFailed(LoomException):
class Client(object):
def __init__(self, address, port, info=False):
def __init__(self, address, port):
self.server_address = address
self.server_port = port
......@@ -36,32 +37,27 @@ class Client(object):
s.connect((address, port))
self.connection = Connection(s)
if info:
self.info = []
else:
self.info = None
msg = Register()
msg.type = Register.REGISTER_CLIENT
msg.protocol_version = LOOM_PROTOCOL_VERSION
msg.info = info
self._send_message(msg)
while self.symbols is None:
self._read_symbols()
def submit(self, plan, results):
msg = plan.create_message(self.symbols)
def submit(self, plan, results, report=None):
msg = ClientSubmit()
msg.report = bool(report)
plan.set_message(msg.plan, self.symbols)
if isinstance(results, Task):
single_result = True
msg.result_ids.extend((results.id,))
msg.plan.result_ids.extend((results.id,))
expected = 1
else:
single_result = False
r = set(results)
msg.result_ids.extend(r.id for r in r)
msg.plan.result_ids.extend(r.id for r in r)
expected = len(r)
self._send_message(msg)
......@@ -74,18 +70,35 @@ class Client(object):
if cmsg.type == ClientMessage.DATA:
prologue = cmsg.data
data[prologue.id] = self._receive_data()
elif cmsg.type == ClientMessage.INFO:
self.add_info(cmsg.info)
elif cmsg.type == ClientMessage.EVENT:
self.process_event(cmsg.event)
elif cmsg.type == ClientMessage.ERROR:
self.process_error(cmsg)
else:
assert 0
if report:
self._write_report(report, plan)
if single_result:
return data[results.id]
else:
return [data[task.id] for task in results]
def _symbol_list(self):
symbols = [None] * len(self.symbols)
for name, index in self.symbols.items():
symbols[index] = name
return symbols
def _write_report(self, report_filename, plan):
    """Write a serialized ``Report`` message to ``<report_filename>.report``.

    The message carries the symbol names (ordered by index) and the
    plan, filled in through ``plan.set_message``.  No events are added
    to the message here.

    report_filename -- path of the report without the ".report" suffix
    plan            -- plan object providing ``set_message(msg, symbols)``
    """
    report_msg = Report()
    report_msg.symbols.extend(self._symbol_list())
    plan.set_message(report_msg.plan, self.symbols)
    # SerializeToString() produces raw bytes, so the file has to be opened
    # in binary mode; text mode may translate newline bytes on some
    # platforms and corrupt the message.
    with open(report_filename + ".report", "wb") as f:
        f.write(report_msg.SerializeToString())
def _read_symbols(self):
msg = self.connection.receive_message()
cmsg = ClientMessage()
......@@ -102,8 +115,8 @@ class Client(object):
error = cmsg.error
raise TaskFailed(error.id, error.worker, error.error_msg)
def add_info(self, info):
self.info.append((info.id, info.worker))
# Placeholder handler for ClientMessage.EVENT messages: any call fails
# on the unconditional assertion below.
def process_event(self, event):
assert 0
def _receive_data(self):
msg_data = Data()
......
This diff is collapsed.
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: loomreport.proto
import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
from google.protobuf import descriptor_pb2
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
import loomplan_pb2
import loomcomm_pb2
DESCRIPTOR = _descriptor.FileDescriptor(
name='loomreport.proto',
package='loomcomm',
serialized_pb=_b('\n\x10loomreport.proto\x12\x08loomcomm\x1a\x0eloomplan.proto\x1a\x0eloomcomm.proto\"X\n\x06Report\x12\x0f\n\x07symbols\x18\x01 \x03(\t\x12\x1c\n\x04plan\x18\x02 \x02(\x0b\x32\x0e.loomplan.Plan\x12\x1f\n\x06\x65vents\x18\x03 \x03(\x0b\x32\x0f.loomcomm.EventB\x02H\x03')
,
dependencies=[loomplan_pb2.DESCRIPTOR,loomcomm_pb2.DESCRIPTOR,])
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
_REPORT = _descriptor.Descriptor(
name='Report',
full_name='loomcomm.Report',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='symbols', full_name='loomcomm.Report.symbols', index=0,
number=1, type=9, cpp_type=9, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='plan', full_name='loomcomm.Report.plan', index=1,
number=2, type=11, cpp_type=10, label=2,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='events', full_name='loomcomm.Report.events', index=2,
number=3, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
extension_ranges=[],
oneofs=[
],
serialized_start=62,
serialized_end=150,
)
_REPORT.fields_by_name['plan'].message_type = loomplan_pb2._PLAN
_REPORT.fields_by_name['events'].message_type = loomcomm_pb2._EVENT
DESCRIPTOR.message_types_by_name['Report'] = _REPORT
Report = _reflection.GeneratedProtocolMessageType('Report', (_message.Message,), dict(
DESCRIPTOR = _REPORT,
__module__ = 'loomreport_pb2'
# @@protoc_insertion_point(class_scope:loomcomm.Report)
))
_sym_db.RegisterMessage(Report)
DESCRIPTOR.has_options = True
DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('H\003'))
# @@protoc_insertion_point(module_scope)
import loomplan_pb2
import loomrun_pb2
import gv
import struct
......@@ -204,9 +203,7 @@ class Plan(object):
task.policy = POLICY_SIMPLE
return self.add(task)
def create_message(self, symbols):
msg = loomplan_pb2.Plan()
def set_message(self, msg, symbols):
requests = set()
for task in self.tasks:
if task.resource_request:
......@@ -221,23 +218,3 @@ class Plan(object):
t = msg.tasks.add()
task.set_message(t, symbols, requests)
return msg
def write_dot(self, filename, info=None):
    """Write the task graph of this plan to *filename* using ``gv.Graph``.

    Every task becomes a node labelled with its id and task type, and an
    arc is added from each input task to the task consuming it.

    filename -- output path handed to ``graph.write``
    info     -- optional re-iterable collection of ``(task_id, worker)``
                pairs; when given and non-empty, the node of each listed
                task is colored according to its worker.  Tasks that do
                not appear in *info* are left uncolored.
    """
    colors = ["red", "green", "blue", "orange", "violet"]
    if info:
        # Rank the distinct workers once (sorted order) instead of calling
        # list.index() for every entry.
        ordered_workers = sorted(set(worker for _, worker in info))
        worker_rank = {worker: rank
                       for rank, worker in enumerate(ordered_workers)}
        workers = {task_id: worker_rank[worker]
                   for task_id, worker in info}
    else:
        workers = None

    graph = gv.Graph()
    for task in self.tasks:
        node = graph.node(task.id)
        rank = workers.get(task.id) if workers else None
        if rank is not None:
            # More workers than colors: colors are reused cyclically.
            node.color = colors[rank % len(colors)]
        node.label = "{}\n{}".format(str(task.id), task.task_type)
        for inp in task.inputs:
            graph.node(inp.id).add_arc(node)
    graph.write(filename)
This diff is collapsed.
This diff is collapsed.
package loomcomm;
option optimize_for = LITE_RUNTIME;
import "loomplan.proto";
message Register {
enum Type {
......@@ -15,9 +16,6 @@ message Register {
repeated string task_types = 4;
repeated string data_types = 5;
optional int32 cpus = 6;
// Client
optional bool info = 10;
}
message ServerMessage {
......@@ -85,9 +83,17 @@ message Data
optional uint64 arg1_u64 = 9;
}
message Info {
required int32 id = 1;
required string worker = 2;
message Event {
enum Type {
TASK_START = 1;
TASK_END = 2;
SEND_START = 3;
SEND_END = 4;
}
required uint64 time = 1;
required Type type = 2;
required int32 id = 3;
optional int32 worker_index = 4;
}
message Error {
......@@ -99,13 +105,18 @@ message Error {
message ClientMessage {
enum Type {
DATA = 1;
INFO = 2;
EVENT = 2;
ERROR = 3;
DICTIONARY = 4;
}
required Type type = 1;
optional DataPrologue data = 2;
optional Info info = 3;
optional Event event = 3;
optional Error error = 4;
repeated string symbols = 5;
}
// Message a client sends to submit work to the server.
message ClientSubmit {
// The plan (tasks and requested result ids) to be executed.
required loomplan.Plan plan = 1;
// Whether the submitting client asked for a report.
required bool report = 2;
}
package loomcomm;
option optimize_for = LITE_RUNTIME;
import "loomplan.proto";
import "loomcomm.proto";
// Content of a report file.
message Report {
// Symbol names; the position of a name in the list is its symbol index.
repeated string symbols = 1;
// The plan the report belongs to.
required loomplan.Plan plan = 2;
// Events (see loomcomm.Event) stored with the report.
repeated loomcomm.Event events = 3;
}
......@@ -15,3 +15,4 @@ protoc loomrun.proto --cpp_out=${LIBLOOM_DIR}/tasks
protoc loomcomm.proto --python_out=${CLIENT_DIR}
protoc loomplan.proto --python_out=${CLIENT_DIR}
protoc loomrun.proto --python_out=${CLIENT_DIR}
protoc loomreport.proto --python_out=${CLIENT_DIR}
# HACK! We need to fix this
# Puts the "../client" directory (relative to this file) at the front of
# sys.path so that the two imports below can be resolved from there --
# presumably that is where loomreport_pb2 and gv live; confirm.
import sys
import os
sys.path.insert(0,
os.path.join(
os.path.dirname(__file__),
"..",
"client"))
# noqa: these imports intentionally come after the sys.path change above.
import loomreport_pb2 # noqa
import gv # noqa
class Report:
    """A Loom report loaded from a file.

    Attributes set by the constructor:
      report_msg -- the parsed ``loomreport_pb2.Report`` message
      symbols    -- symbol names from the message, with every occurrence
                    of "loom" replaced by "L"
    """

    def __init__(self, filename):
        """Read and parse the serialized report stored in *filename*."""
        # A serialized protobuf message is binary data: read it in binary
        # mode so that no newline translation or text decoding is applied.
        with open(filename, "rb") as f:
            raw_data = f.read()
        self.report_msg = loomreport_pb2.Report()
        self.report_msg.ParseFromString(raw_data)
        self.symbols = [s.replace("loom", "L")
                        for s in self.report_msg.symbols]

    def create_graph(self):
        """Build and return a ``gv.Graph`` of the plan stored in the report.

        Each task becomes a node identified by its position in the plan
        and labelled with the symbol of its task type; an arc leads from
        every input task to the task that consumes it.
        """
        graph = gv.Graph()
        symbols = self.symbols
        for i, task in enumerate(self.report_msg.plan.tasks):
            node = graph.node(i)
            node.label = symbols[task.task_type]
            for j in task.input_ids:
                graph.node(j).add_arc(node)
        return graph
from report import Report
import argparse
import subprocess
def parse_args():
    """Parse the rview command line and return the argparse namespace.

    The namespace has three attributes: ``report`` (path to the report
    file), ``show_symbols`` and ``show_graph`` (both booleans, False
    unless the corresponding flag is given).
    """
    parser = argparse.ArgumentParser(
        description="rview -- Loom report inspector")
    parser.add_argument("report",
                        metavar="REPORT",
                        type=str,
                        help="Path to report")
    parser.add_argument("--show-symbols",
                        action="store_true")
    parser.add_argument("--show-graph",
                        action="store_true")
    return parser.parse_args()
def run_program(args, stdin=None):
    """Run the command *args* and wait until it terminates.

    The child's standard input is a pipe; *stdin*, when given, is the
    data written to it before the pipe is closed.
    """
    child = subprocess.Popen(args, stdin=subprocess.PIPE)
    child.communicate(input=stdin)
def show_symbols(report):
    """Print every symbol of *report* as ``<index>: <name>``, one per line."""
    for i, symbol in enumerate(report.symbols):
        # Parenthesised single-argument form: prints the same text under
        # Python 2 and is also valid Python 3 syntax.
        print("{}: {}".format(i, symbol))
def show_graph(report):
    """Display the plan graph of *report* in ``xdot``.

    The graph is rendered with ``make_dot("Plan")`` and the result is
    fed to ``xdot -`` on its standard input.
    """
    graph = report.create_graph()
    run_program(("xdot", "-"), graph.make_dot("Plan"))
def main():
    """Entry point: load the report and run the views selected by flags."""
    options = parse_args()
    loaded = Report(options.report)
    # Views run in this fixed order: symbols first, then the graph.
    views = (
        (options.show_symbols, show_symbols),
        (options.show_graph, show_graph),
    )
    for enabled, view in views:
        if enabled:
            view(loaded)
if __name__ == "__main__":
main()
......@@ -7,8 +7,9 @@
using namespace loom;
ClientConnection::ClientConnection(Server &server, std::unique_ptr<loom::Connection> connection, bool info_flag)
: server(server), connection(std::move(connection)), info_flag(info_flag)
ClientConnection::ClientConnection(Server &server,
std::unique_ptr<loom::Connection> connection)
: server(server), connection(std::move(connection))
{
this->connection->set_callback(this);
llog->info("Client {} connected", this->connection->get_peername());
......@@ -36,13 +37,14 @@ ClientConnection::~ClientConnection()
void ClientConnection::on_message(const char *buffer, size_t size)
{
llog->debug("Plan received");
loomplan::Plan plan;
plan.ParseFromArray(buffer, size);
loomcomm::ClientSubmit submit;
submit.ParseFromArray(buffer, size);
auto& task_manager = server.get_task_manager();
const loomplan::Plan &plan = submit.plan();
loom::Id id_base = server.new_id(plan.tasks_size());
task_manager.add_plan(Plan(plan, id_base, server.get_dictionary()));
llog->info("Plan submitted tasks={}", plan.tasks_size());
llog->info("Plan submitted tasks={} report={}", plan.tasks_size(), submit.report());
}
void ClientConnection::on_close()
......
......@@ -13,7 +13,7 @@ class Server;
class ClientConnection : public loom::ConnectionCallback {
public:
ClientConnection(Server &server,
std::unique_ptr<loom::Connection> connection, bool info_flag);
std::unique_ptr<loom::Connection> connection);
~ClientConnection();
void on_message(const char *buffer, size_t size);
void on_close();
......@@ -22,14 +22,9 @@ public:
connection->send_buffer(buffer);
}
bool has_info_flag() const {
return info_flag;
}
protected:
Server &server;
std::unique_ptr<loom::Connection> connection;
bool info_flag;
};
......
......@@ -16,6 +16,8 @@ ComputationState::ComputationState(Server &server) : server(server)
get_task_id = dictionary.find_or_create("loom/base/get");
dslice_task_id = dictionary.find_or_create("loom/scheduler/dslice");
dget_task_id = dictionary.find_or_create("loom/scheduler/dget");
base_time = uv_now(server.get_loop());
}
void ComputationState::set_plan(Plan &&plan)
......
......@@ -62,12 +62,15 @@ private:
Plan plan;
Server &server;
uint64_t base_time;
loom::Id dslice_task_id;
loom::Id dget_task_id;
loom::Id slice_task_id;
loom::Id get_task_id;
WorkerConnection *get_best_holder_of_deps(PlanNode *task);
WorkerConnection *find_best_worker_for_node(PlanNode *task);
......
......@@ -61,10 +61,8 @@ void FreshConnection::on_message(const char *buffer, size_t size)
return;
}
if (msg.type() == loomcomm::Register_Type_REGISTER_CLIENT) {
bool info_flag = msg.has_info() && msg.info();
auto cconn = std::make_unique<ClientConnection>(server,
std::move(connection),
info_flag);
std::move(connection));
server.add_client_connection(std::move(cconn));
assert(connection.get() == nullptr);
server.remove_freshconnection(*this);
......
......@@ -81,9 +81,10 @@ void Server::remove_freshconnection(FreshConnection &conn)
void Server::on_task_finished(loom::Id id, size_t size, size_t length, WorkerConnection *wc)
{
assert(client_connection);
if (client_connection->has_info_flag()) {
/*
if (client_connection->is_report_enabled()) {
assert(0);
/*loomcomm::ClientMessage cmsg;
loomcomm::ClientMessage cmsg;
cmsg.set_type(loomcomm::ClientMessage_Type_INFO);
loomcomm::Info *info = cmsg.mutable_info();
info->set_id(task.get_id());
......@@ -95,8 +96,8 @@ void Server::on_task_finished(loom::Id id, size_t size, size_t length, WorkerCon
SendBuffer *buffer = new SendBuffer;
buffer->add(cmsg);
client_connection->send_buffer(buffer);*/
}
client_connection->send_buffer(buffer);
}*/
task_manager.on_task_finished(id, size, length, wc);
}
......
......@@ -37,7 +37,7 @@ def test_cv_iris(loom_env):
[(chunk, "testdata"), (model, "model")])
predict.append(task)
results = loom_env.submit(p, predict)
results = loom_env.submit(p, predict, report="cv")
assert len(results) == CHUNKS
for line in results:
......
......@@ -47,7 +47,6 @@ class Env():
class LoomEnv(Env):
PORT = 19010
info = False
_client = None
def start(self, workers_count, cpus=1):
......@@ -88,11 +87,13 @@ class LoomEnv(Env):
@property
def client(self):
if self._client is None:
self._client = client.Client("localhost", self.PORT, self.info)
self._client = client.Client("localhost", self.PORT)
return self._client
def submit(self, plan, results):
return self.client.submit(plan, results)
def submit(self, plan, results, report=None):
    """Submit *plan* through the test client and return its results.

    A truthy *report* name is joined onto LOOM_TEST_BUILD_DIR before it
    is passed on; a falsy value is forwarded unchanged.
    """
    report_path = (os.path.join(LOOM_TEST_BUILD_DIR, report)
                   if report else report)
    return self.client.submit(plan, results, report_path)
@pytest.yield_fixture(autouse=True, scope="function")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment