Commit 1b5408ed authored by Stanislav Bohm

ENH: Task tracing

parent a18f93e2
......@@ -62,6 +62,9 @@ class Client(object):
self._send_message(msg)
if report:
report_data = self._create_report(plan)
data = {}
while expected != len(data):
msg = self.connection.receive_message()
......@@ -71,14 +74,14 @@ class Client(object):
prologue = cmsg.data
data[prologue.id] = self._receive_data()
elif cmsg.type == ClientMessage.EVENT:
self.process_event(cmsg.event)
self.process_event(cmsg.event, report_data)
elif cmsg.type == ClientMessage.ERROR:
self.process_error(cmsg)
else:
assert 0
if report:
self._write_report(report, plan)
self._write_report(report_data, report)
if single_result:
return data[results.id]
......@@ -91,13 +94,15 @@ class Client(object):
symbols[index] = name
return symbols
def _write_report(self, report_filename, plan):
def _create_report(self, plan):
report_msg = Report()
report_msg.symbols.extend(self._symbol_list())
plan.set_message(report_msg.plan, self.symbols)
return report_msg
def _write_report(self, report_data, report_filename):
with open(report_filename + ".report", "w") as f:
f.write(report_msg.SerializeToString())
f.write(report_data.SerializeToString())
def _read_symbols(self):
msg = self.connection.receive_message()
......@@ -115,8 +120,9 @@ class Client(object):
error = cmsg.error
raise TaskFailed(error.id, error.worker, error.error_msg)
def process_event(self, event):
assert 0
def process_event(self, event, report_data):
new_event = report_data.events.add()
new_event.CopyFrom(event)
def _receive_data(self):
msg_data = Data()
......
......@@ -19,7 +19,7 @@ import loomplan_pb2
DESCRIPTOR = _descriptor.FileDescriptor(
name='loomcomm.proto',
package='loomcomm',
serialized_pb=_b('\n\x0eloomcomm.proto\x12\x08loomcomm\x1a\x0eloomplan.proto\"\xc1\x01\n\x08Register\x12\x18\n\x10protocol_version\x18\x01 \x02(\x05\x12%\n\x04type\x18\x02 \x02(\x0e\x32\x17.loomcomm.Register.Type\x12\x0c\n\x04port\x18\x03 \x01(\x05\x12\x12\n\ntask_types\x18\x04 \x03(\t\x12\x12\n\ndata_types\x18\x05 \x03(\t\x12\x0c\n\x04\x63pus\x18\x06 \x01(\x05\"0\n\x04Type\x12\x13\n\x0fREGISTER_WORKER\x10\x01\x12\x13\n\x0fREGISTER_CLIENT\x10\x02\"&\n\rServerMessage\"\x15\n\x04Type\x12\r\n\tSTART_JOB\x10\x01\"\xf1\x01\n\rWorkerCommand\x12*\n\x04type\x18\x01 \x02(\x0e\x32\x1c.loomcomm.WorkerCommand.Type\x12\n\n\x02id\x18\x02 \x01(\x05\x12\x11\n\ttask_type\x18\x03 \x01(\x05\x12\x13\n\x0btask_config\x18\x04 \x01(\t\x12\x13\n\x0btask_inputs\x18\x05 \x03(\x05\x12\x0f\n\x07\x61\x64\x64ress\x18\n \x01(\t\x12\x11\n\twith_size\x18\x0b \x01(\x08\x12\x0f\n\x07symbols\x18\x64 \x03(\t\"6\n\x04Type\x12\x08\n\x04TASK\x10\x01\x12\x08\n\x04SEND\x10\x02\x12\n\n\x06REMOVE\x10\x03\x12\x0e\n\nDICTIONARY\x10\x04\"\x9a\x01\n\x0eWorkerResponse\x12+\n\x04type\x18\x01 \x02(\x0e\x32\x1d.loomcomm.WorkerResponse.Type\x12\n\n\x02id\x18\x02 \x02(\x05\x12\x0c\n\x04size\x18\x03 \x01(\x04\x12\x0e\n\x06length\x18\x04 \x01(\x04\x12\x11\n\terror_msg\x18\x64 \x01(\t\"\x1e\n\x04Type\x12\n\n\x06\x46INISH\x10\x01\x12\n\n\x06\x46\x41ILED\x10\x02\"\x18\n\x08\x41nnounce\x12\x0c\n\x04port\x18\x01 \x02(\x05\"-\n\x0c\x44\x61taPrologue\x12\n\n\x02id\x18\x01 \x02(\x05\x12\x11\n\tdata_size\x18\x03 \x01(\x04\"Y\n\x04\x44\x61ta\x12\x0f\n\x07type_id\x18\x01 \x02(\x05\x12\x0c\n\x04size\x18\x02 \x02(\x04\x12\x0e\n\x06length\x18\x03 \x01(\x04\x12\x10\n\x08\x61rg0_u64\x18\x08 \x01(\x04\x12\x10\n\x08\x61rg1_u64\x18\t \x01(\x04\"\x9f\x01\n\x05\x45vent\x12\x0c\n\x04time\x18\x01 \x02(\x04\x12\"\n\x04type\x18\x02 \x02(\x0e\x32\x14.loomcomm.Event.Type\x12\n\n\x02id\x18\x03 \x02(\x05\x12\x14\n\x0cworker_index\x18\x04 
\x01(\x05\"B\n\x04Type\x12\x0e\n\nTASK_START\x10\x01\x12\x0c\n\x08TASK_END\x10\x02\x12\x0e\n\nSEND_START\x10\x03\x12\x0c\n\x08SEND_END\x10\x04\"6\n\x05\x45rror\x12\n\n\x02id\x18\x01 \x02(\x05\x12\x0e\n\x06worker\x18\x02 \x02(\t\x12\x11\n\terror_msg\x18\x03 \x02(\t\"\xea\x01\n\rClientMessage\x12*\n\x04type\x18\x01 \x02(\x0e\x32\x1c.loomcomm.ClientMessage.Type\x12$\n\x04\x64\x61ta\x18\x02 \x01(\x0b\x32\x16.loomcomm.DataPrologue\x12\x1e\n\x05\x65vent\x18\x03 \x01(\x0b\x32\x0f.loomcomm.Event\x12\x1e\n\x05\x65rror\x18\x04 \x01(\x0b\x32\x0f.loomcomm.Error\x12\x0f\n\x07symbols\x18\x05 \x03(\t\"6\n\x04Type\x12\x08\n\x04\x44\x41TA\x10\x01\x12\t\n\x05\x45VENT\x10\x02\x12\t\n\x05\x45RROR\x10\x03\x12\x0e\n\nDICTIONARY\x10\x04\"<\n\x0c\x43lientSubmit\x12\x1c\n\x04plan\x18\x01 \x02(\x0b\x32\x0e.loomplan.Plan\x12\x0e\n\x06report\x18\x02 \x02(\x08\x42\x02H\x03')
serialized_pb=_b('\n\x0eloomcomm.proto\x12\x08loomcomm\x1a\x0eloomplan.proto\"\xc1\x01\n\x08Register\x12\x18\n\x10protocol_version\x18\x01 \x02(\x05\x12%\n\x04type\x18\x02 \x02(\x0e\x32\x17.loomcomm.Register.Type\x12\x0c\n\x04port\x18\x03 \x01(\x05\x12\x12\n\ntask_types\x18\x04 \x03(\t\x12\x12\n\ndata_types\x18\x05 \x03(\t\x12\x0c\n\x04\x63pus\x18\x06 \x01(\x05\"0\n\x04Type\x12\x13\n\x0fREGISTER_WORKER\x10\x01\x12\x13\n\x0fREGISTER_CLIENT\x10\x02\"&\n\rServerMessage\"\x15\n\x04Type\x12\r\n\tSTART_JOB\x10\x01\"\xf1\x01\n\rWorkerCommand\x12*\n\x04type\x18\x01 \x02(\x0e\x32\x1c.loomcomm.WorkerCommand.Type\x12\n\n\x02id\x18\x02 \x01(\x05\x12\x11\n\ttask_type\x18\x03 \x01(\x05\x12\x13\n\x0btask_config\x18\x04 \x01(\t\x12\x13\n\x0btask_inputs\x18\x05 \x03(\x05\x12\x0f\n\x07\x61\x64\x64ress\x18\n \x01(\t\x12\x11\n\twith_size\x18\x0b \x01(\x08\x12\x0f\n\x07symbols\x18\x64 \x03(\t\"6\n\x04Type\x12\x08\n\x04TASK\x10\x01\x12\x08\n\x04SEND\x10\x02\x12\n\n\x06REMOVE\x10\x03\x12\x0e\n\nDICTIONARY\x10\x04\"\x9a\x01\n\x0eWorkerResponse\x12+\n\x04type\x18\x01 \x02(\x0e\x32\x1d.loomcomm.WorkerResponse.Type\x12\n\n\x02id\x18\x02 \x02(\x05\x12\x0c\n\x04size\x18\x03 \x01(\x04\x12\x0e\n\x06length\x18\x04 \x01(\x04\x12\x11\n\terror_msg\x18\x64 \x01(\t\"\x1e\n\x04Type\x12\n\n\x06\x46INISH\x10\x01\x12\n\n\x06\x46\x41ILED\x10\x02\"\x18\n\x08\x41nnounce\x12\x0c\n\x04port\x18\x01 \x02(\x05\"-\n\x0c\x44\x61taPrologue\x12\n\n\x02id\x18\x01 \x02(\x05\x12\x11\n\tdata_size\x18\x03 \x01(\x04\"Y\n\x04\x44\x61ta\x12\x0f\n\x07type_id\x18\x01 \x02(\x05\x12\x0c\n\x04size\x18\x02 \x02(\x04\x12\x0e\n\x06length\x18\x03 \x01(\x04\x12\x10\n\x08\x61rg0_u64\x18\x08 \x01(\x04\x12\x10\n\x08\x61rg1_u64\x18\t \x01(\x04\"\x9c\x01\n\x05\x45vent\x12\x0c\n\x04time\x18\x01 \x02(\x04\x12\"\n\x04type\x18\x02 \x02(\x0e\x32\x14.loomcomm.Event.Type\x12\n\n\x02id\x18\x03 \x02(\x05\x12\x11\n\tworker_id\x18\x04 
\x01(\x05\"B\n\x04Type\x12\x0e\n\nTASK_START\x10\x01\x12\x0c\n\x08TASK_END\x10\x02\x12\x0e\n\nSEND_START\x10\x03\x12\x0c\n\x08SEND_END\x10\x04\"6\n\x05\x45rror\x12\n\n\x02id\x18\x01 \x02(\x05\x12\x0e\n\x06worker\x18\x02 \x02(\t\x12\x11\n\terror_msg\x18\x03 \x02(\t\"\xea\x01\n\rClientMessage\x12*\n\x04type\x18\x01 \x02(\x0e\x32\x1c.loomcomm.ClientMessage.Type\x12$\n\x04\x64\x61ta\x18\x02 \x01(\x0b\x32\x16.loomcomm.DataPrologue\x12\x1e\n\x05\x65vent\x18\x03 \x01(\x0b\x32\x0f.loomcomm.Event\x12\x1e\n\x05\x65rror\x18\x04 \x01(\x0b\x32\x0f.loomcomm.Error\x12\x0f\n\x07symbols\x18\x05 \x03(\t\"6\n\x04Type\x12\x08\n\x04\x44\x41TA\x10\x01\x12\t\n\x05\x45VENT\x10\x02\x12\t\n\x05\x45RROR\x10\x03\x12\x0e\n\nDICTIONARY\x10\x04\"<\n\x0c\x43lientSubmit\x12\x1c\n\x04plan\x18\x01 \x02(\x0b\x32\x0e.loomplan.Plan\x12\x0e\n\x06report\x18\x02 \x02(\x08\x42\x02H\x03')
,
dependencies=[loomplan_pb2.DESCRIPTOR,])
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
......@@ -143,8 +143,8 @@ _EVENT_TYPE = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
serialized_start=939,
serialized_end=1005,
serialized_start=936,
serialized_end=1002,
)
_sym_db.RegisterEnumDescriptor(_EVENT_TYPE)
......@@ -173,8 +173,8 @@ _CLIENTMESSAGE_TYPE = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
serialized_start=1244,
serialized_end=1298,
serialized_start=1241,
serialized_end=1295,
)
_sym_db.RegisterEnumDescriptor(_CLIENTMESSAGE_TYPE)
......@@ -562,7 +562,7 @@ _EVENT = _descriptor.Descriptor(
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='worker_index', full_name='loomcomm.Event.worker_index', index=3,
name='worker_id', full_name='loomcomm.Event.worker_id', index=3,
number=4, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
......@@ -581,7 +581,7 @@ _EVENT = _descriptor.Descriptor(
oneofs=[
],
serialized_start=846,
serialized_end=1005,
serialized_end=1002,
)
......@@ -624,8 +624,8 @@ _ERROR = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=1007,
serialized_end=1061,
serialized_start=1004,
serialized_end=1058,
)
......@@ -683,8 +683,8 @@ _CLIENTMESSAGE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=1064,
serialized_end=1298,
serialized_start=1061,
serialized_end=1295,
)
......@@ -720,8 +720,8 @@ _CLIENTSUBMIT = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=1300,
serialized_end=1360,
serialized_start=1297,
serialized_end=1357,
)
_REGISTER.fields_by_name['type'].enum_type = _REGISTER_TYPE
......
......@@ -2,6 +2,7 @@
#include "utils.h"
#include <string.h>
#include <libloom/compat.h>
using namespace loom;
......@@ -124,16 +125,19 @@ void Connection::_on_write(uv_write_t *write_req, int status)
void Connection::send_message(google::protobuf::MessageLite &message)
{
SendBuffer *buffer = new SendBuffer();
auto buffer = std::make_unique<SendBuffer>();
buffer->add(message);
send_buffer(buffer);
send_buffer(std::move(buffer));
}
void Connection::send_buffer(SendBuffer *buffer)
void Connection::send_buffer(std::unique_ptr<SendBuffer> buffer)
{
uv_buf_t *bufs = buffer->get_uv_bufs();
size_t count = buffer->get_uv_bufs_count();
UV_CHECK(uv_write(&buffer->request, (uv_stream_t *) &socket, bufs, count, _on_write));
SendBuffer *b = buffer.release();
// It will be released in _on_write callback
// It is stored in b->request.data
UV_CHECK(uv_write(&b->request, (uv_stream_t *) &socket, bufs, count, _on_write));
}
void Connection::_on_read(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
......
......@@ -72,7 +72,7 @@ public:
void connect(std::string host, int port);
void send_message(::google::protobuf::MessageLite &message);
void send_buffer(SendBuffer *buffer);
void send_buffer(std::unique_ptr<SendBuffer> buffer);
void close();
void close_and_discard_remaining_data();
......
......@@ -28,7 +28,7 @@ void InterConnection::on_connection()
send_message(msg);
for (auto& buffer : early_sends) {
connection.send_buffer(buffer.release());
connection.send_buffer(std::move(buffer));
}
early_sends.clear();
}
......@@ -100,7 +100,7 @@ void InterConnection::on_data_finish()
void InterConnection::send(Id id, std::shared_ptr<Data> &data, bool with_size)
{
SendBuffer *buffer = new SendBuffer();
auto buffer = std::make_unique<SendBuffer>();
loomcomm::DataPrologue msg;
msg.set_id(id);
......@@ -119,9 +119,9 @@ void InterConnection::send(Id id, std::shared_ptr<Data> &data, bool with_size)
assert(state == Connection::ConnectionOpen ||
state == Connection::ConnectionConnecting);
if (state == Connection::ConnectionOpen) {
connection.send_buffer(buffer);
connection.send_buffer(std::move(buffer));
} else {
early_sends.push_back(std::unique_ptr<SendBuffer>(buffer));
early_sends.push_back(std::move(buffer));
}
}
......
......@@ -2295,7 +2295,7 @@ const int Event::Type_ARRAYSIZE;
const int Event::kTimeFieldNumber;
const int Event::kTypeFieldNumber;
const int Event::kIdFieldNumber;
const int Event::kWorkerIndexFieldNumber;
const int Event::kWorkerIdFieldNumber;
#endif // !_MSC_VER
Event::Event()
......@@ -2319,7 +2319,7 @@ void Event::SharedCtor() {
time_ = GOOGLE_ULONGLONG(0);
type_ = 1;
id_ = 0;
worker_index_ = 0;
worker_id_ = 0;
::memset(_has_bits_, 0, sizeof(_has_bits_));
}
......@@ -2369,7 +2369,7 @@ void Event::Clear() {
} while (0)
if (_has_bits_[0 / 32] & 15) {
ZR_(id_, worker_index_);
ZR_(id_, worker_id_);
time_ = GOOGLE_ULONGLONG(0);
type_ = 1;
}
......@@ -2441,18 +2441,18 @@ bool Event::MergePartialFromCodedStream(
} else {
goto handle_unusual;
}
if (input->ExpectTag(32)) goto parse_worker_index;
if (input->ExpectTag(32)) goto parse_worker_id;
break;
}
// optional int32 worker_index = 4;
// optional int32 worker_id = 4;
case 4: {
if (tag == 32) {
parse_worker_index:
parse_worker_id:
DO_((::google::protobuf::internal::WireFormatLite::ReadPrimitive<
::google::protobuf::int32, ::google::protobuf::internal::WireFormatLite::TYPE_INT32>(
input, &worker_index_)));
set_has_worker_index();
input, &worker_id_)));
set_has_worker_id();
} else {
goto handle_unusual;
}
......@@ -2501,9 +2501,9 @@ void Event::SerializeWithCachedSizes(
::google::protobuf::internal::WireFormatLite::WriteInt32(3, this->id(), output);
}
// optional int32 worker_index = 4;
if (has_worker_index()) {
::google::protobuf::internal::WireFormatLite::WriteInt32(4, this->worker_index(), output);
// optional int32 worker_id = 4;
if (has_worker_id()) {
::google::protobuf::internal::WireFormatLite::WriteInt32(4, this->worker_id(), output);
}
output->WriteRaw(unknown_fields().data(),
......@@ -2535,11 +2535,11 @@ int Event::ByteSize() const {
this->id());
}
// optional int32 worker_index = 4;
if (has_worker_index()) {
// optional int32 worker_id = 4;
if (has_worker_id()) {
total_size += 1 +
::google::protobuf::internal::WireFormatLite::Int32Size(
this->worker_index());
this->worker_id());
}
}
......@@ -2568,8 +2568,8 @@ void Event::MergeFrom(const Event& from) {
if (from.has_id()) {
set_id(from.id());
}
if (from.has_worker_index()) {
set_worker_index(from.worker_index());
if (from.has_worker_id()) {
set_worker_id(from.worker_id());
}
}
mutable_unknown_fields()->append(from.unknown_fields());
......@@ -2592,7 +2592,7 @@ void Event::Swap(Event* other) {
std::swap(time_, other->time_);
std::swap(type_, other->type_);
std::swap(id_, other->id_);
std::swap(worker_index_, other->worker_index_);
std::swap(worker_id_, other->worker_id_);
std::swap(_has_bits_[0], other->_has_bits_[0]);
_unknown_fields_.swap(other->_unknown_fields_);
std::swap(_cached_size_, other->_cached_size_);
......
......@@ -1132,12 +1132,12 @@ class Event : public ::google::protobuf::MessageLite {
inline ::google::protobuf::int32 id() const;
inline void set_id(::google::protobuf::int32 value);
// optional int32 worker_index = 4;
inline bool has_worker_index() const;
inline void clear_worker_index();
static const int kWorkerIndexFieldNumber = 4;
inline ::google::protobuf::int32 worker_index() const;
inline void set_worker_index(::google::protobuf::int32 value);
// optional int32 worker_id = 4;
inline bool has_worker_id() const;
inline void clear_worker_id();
static const int kWorkerIdFieldNumber = 4;
inline ::google::protobuf::int32 worker_id() const;
inline void set_worker_id(::google::protobuf::int32 value);
// @@protoc_insertion_point(class_scope:loomcomm.Event)
private:
......@@ -1147,8 +1147,8 @@ class Event : public ::google::protobuf::MessageLite {
inline void clear_has_type();
inline void set_has_id();
inline void clear_has_id();
inline void set_has_worker_index();
inline void clear_has_worker_index();
inline void set_has_worker_id();
inline void clear_has_worker_id();
::std::string _unknown_fields_;
......@@ -1157,7 +1157,7 @@ class Event : public ::google::protobuf::MessageLite {
::google::protobuf::uint64 time_;
int type_;
::google::protobuf::int32 id_;
::google::protobuf::int32 worker_index_;
::google::protobuf::int32 worker_id_;
#ifdef GOOGLE_PROTOBUF_NO_STATIC_INITIALIZER
friend void protobuf_AddDesc_loomcomm_2eproto_impl();
#else
......@@ -2563,28 +2563,28 @@ inline void Event::set_id(::google::protobuf::int32 value) {
// @@protoc_insertion_point(field_set:loomcomm.Event.id)
}
// optional int32 worker_index = 4;
inline bool Event::has_worker_index() const {
// optional int32 worker_id = 4;
inline bool Event::has_worker_id() const {
return (_has_bits_[0] & 0x00000008u) != 0;
}
inline void Event::set_has_worker_index() {
inline void Event::set_has_worker_id() {
_has_bits_[0] |= 0x00000008u;
}
inline void Event::clear_has_worker_index() {
inline void Event::clear_has_worker_id() {
_has_bits_[0] &= ~0x00000008u;
}
inline void Event::clear_worker_index() {
worker_index_ = 0;
clear_has_worker_index();
inline void Event::clear_worker_id() {
worker_id_ = 0;
clear_has_worker_id();
}
inline ::google::protobuf::int32 Event::worker_index() const {
// @@protoc_insertion_point(field_get:loomcomm.Event.worker_index)
return worker_index_;
inline ::google::protobuf::int32 Event::worker_id() const {
// @@protoc_insertion_point(field_get:loomcomm.Event.worker_id)
return worker_id_;
}
inline void Event::set_worker_index(::google::protobuf::int32 value) {
set_has_worker_index();
worker_index_ = value;
// @@protoc_insertion_point(field_set:loomcomm.Event.worker_index)
inline void Event::set_worker_id(::google::protobuf::int32 value) {
set_has_worker_id();
worker_id_ = value;
// @@protoc_insertion_point(field_set:loomcomm.Event.worker_id)
}
// -------------------------------------------------------------------
......
......@@ -93,7 +93,7 @@ message Event {
required uint64 time = 1;
required Type type = 2;
required int32 id = 3;
optional int32 worker_index = 4;
optional int32 worker_id = 4;
}
message Error {
......
......@@ -9,6 +9,7 @@ sys.path.insert(0,
"client"))
import loomreport_pb2 # noqa
import loomcomm_pb2 # noqa
import gv # noqas
......@@ -25,11 +26,84 @@ class Report:
for s in self.report_msg.symbols]
def create_graph(self):
    """Build a graph of the plan, one node per task.

    Each node is labelled with the symbol of its task type and receives
    an arc from every task listed in its ``input_ids``. A task that has
    a recorded TASK_START event is colored by the worker that started it.

    Returns:
        The populated ``gv.Graph``.
    """
    TASK_START = loomcomm_pb2.Event.TASK_START
    graph = gv.Graph()
    symbols = self.symbols
    colors = ["red", "green", "blue", "yellow", "orange", "pink"]

    # Map task id -> id of the worker that started it.
    task_workers = {}
    for event in self.report_msg.events:
        if event.type == TASK_START:
            task_workers[event.id] = event.worker_id

    # NOTE(review): the lookup below assumes event.id equals the task's
    # position in plan.tasks, as the original code did -- confirm.
    for i, task in enumerate(self.report_msg.plan.tasks):
        node = graph.node(i)
        node.label = symbols[task.task_type]
        worker_id = task_workers.get(i)
        if worker_id is not None:
            # Worker ids are not bounded by the palette size, so cycle
            # through the palette instead of indexing it directly.
            node.color = colors[worker_id % len(colors)]
        for j in task.input_ids:
            graph.node(j).add_arc(node)
    return graph
def get_events_hline_data(self):
    """Turn the recorded events into data for a horizontal-line trace plot.

    Events are grouped per worker. Within a worker they are packed into
    "lanes": a lane is a list of events alternating start, end, start,
    end, ... so that every lane can be drawn on a single row.

    Returns:
        A tuple ``(y, xmin, xmax, colors, labels, legend)`` where
        ``y``, ``xmin``, ``xmax`` and ``colors`` are parallel lists with
        one entry per drawn segment (row index, start time, end time,
        color), ``labels`` holds one tick label per row, and ``legend``
        is a list of ``(symbol name, color)`` pairs for the task types
        used in the plan.
    """
    TASK_START = loomcomm_pb2.Event.TASK_START
    TASK_END = loomcomm_pb2.Event.TASK_END
    color_list = ["red", "green", "blue", "pink", "orange"]

    tasks = self.report_msg.plan.tasks

    # Task types used by the plan, ordered by their symbol name.
    symbol_ids = set()
    for task in tasks:
        symbol_ids.add(task.task_type)
    symbols = sorted(symbol_ids, key=lambda x: self.symbols[x])

    # One color per task type; the palette is cycled so that plans with
    # more task types than colors do not fail with IndexError.
    symbol_colors = dict(
        (symbol_id, color_list[position % len(color_list)])
        for position, symbol_id in enumerate(symbols))

    # worker id -> list of lanes
    workers = {}
    for event in self.report_msg.events:
        lanes = workers.setdefault(event.worker_id, [])
        if event.type == TASK_END:
            # Close the lane whose last event is the start of this task.
            for lane in lanes:
                last = lane[-1]
                if last.type == TASK_START and last.id == event.id:
                    lane.append(event)
                    break
            else:
                assert 0, "TASK_END event without a matching TASK_START"
        else:
            # NOTE(review): every event type other than TASK_END is
            # treated as the start of a segment here -- confirm that
            # SEND_START/SEND_END events never reach this method.
            # Reuse the first lane that is currently closed, otherwise
            # open a new lane for this worker.
            for lane in lanes:
                if lane[-1].type == TASK_END:
                    lane.append(event)
                    break
            else:
                lanes.append([event])

    y = []
    xmin = []
    xmax = []
    colors = []
    labels = []
    index = 0
    ordered_workers = sorted(workers.items(), key=lambda item: item[0])
    for w_index, (_worker, lanes) in enumerate(ordered_workers):
        # One labelled row, then blank labels for the remaining lanes
        # and for the spacer row that separates workers.
        labels.append("Worker {}".format(w_index))
        labels.extend([""] * len(lanes))
        for lane in lanes:
            # Step over (start, end) pairs; a trailing start without an
            # end has no end time and is not drawn.
            for i in range(0, len(lane) - 1, 2):
                start = lane[i]
                end = lane[i + 1]
                y.append(index)
                xmin.append(start.time)
                xmax.append(end.time)
                # NOTE(review): assumes the event id is the task's index
                # in plan.tasks -- confirm.
                colors.append(symbol_colors[tasks[start.id].task_type])
            index += 1
        index += 1

    legend = [(self.symbols[symbol_id], symbol_colors[symbol_id])
              for symbol_id in symbols]
    return (y, xmin, xmax, colors, labels, legend)
......@@ -2,6 +2,9 @@
from report import Report
import argparse
import subprocess
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import sys
def parse_args():
......@@ -19,6 +22,9 @@ def parse_args():
parser.add_argument("--show-graph",
action="store_true")
parser.add_argument("--show-trace",
action="store_true")
return parser.parse_args()
......@@ -37,16 +43,38 @@ def show_graph(report):
run_program(("xdot", "-"), dot)
def show_trace(report):
    """Plot the task trace of ``report`` and block until the window closes.

    Every segment returned by ``report.get_events_hline_data()`` is drawn
    as a horizontal line with a '|' marker at both ends. The y axis is
    inverted and a legend maps colors to symbol names.
    """
    plt.ion()
    plt.gca().invert_yaxis()

    (rows, starts, ends,
     row_colors, tick_labels, legend_items) = report.get_events_hline_data()

    plt.hlines(rows, starts, ends, row_colors, linewidth=2)
    # Mark the start and then the end of every segment.
    for xs in (starts, ends):
        plt.scatter(xs, rows, marker='|', s=100, c=row_colors)
    plt.yticks(range(len(tick_labels)), tick_labels)

    patches = []
    for name, color in legend_items:
        patches.append(mpatches.Patch(color=color, label=name))
    plt.legend(handles=patches)

    plt.show(block=True)
def main():
    """Load the report named on the command line and run the requested views.

    Views run in a fixed order: symbols, graph, trace. If no view was
    requested, a note is written to stderr.
    """
    args = parse_args()
    report = Report(args.report)

    # (requested?, handler) pairs, in the order they should run.
    operations = (
        (args.show_symbols, show_symbols),
        (args.show_graph, show_graph),
        (args.show_trace, show_trace),
    )

    ran_any = False
    for requested, handler in operations:
        if requested:
            ran_any = True
            handler(report)

    if not ran_any:
        sys.stderr.write("No operation specified\n")
if __name__ == "__main__":
main()
#include "clientconn.h"
#include "server.h"
#include "libloom/loomplan.pb.h"
#include "libloom/loomcomm.pb.h"
#include "libloom/compat.h"
#include "libloom/log.h"
using namespace loom;
......@@ -23,9 +25,9 @@ ClientConnection::ClientConnection(Server &server,
std::string *s = cmsg.add_symbols();
*s = symbol;
}
SendBuffer *send_buffer = new SendBuffer();
auto send_buffer = std::make_unique<SendBuffer>();
send_buffer->add(cmsg);
this->connection->send_buffer(send_buffer);
this->connection->send_buffer(std::move(send_buffer));
// End of send dictionary
}
......@@ -43,7 +45,7 @@ void ClientConnection::on_message(const char *buffer, size_t size)
const loomplan::Plan &plan = submit.plan();
loom::Id id_base = server.new_id(plan.tasks_size());
task_manager.add_plan(Plan(plan, id_base, server.get_dictionary()));
task_manager.add_plan(Plan(plan, id_base, server.get_dictionary()), submit.report());
llog->info("Plan submitted tasks={} report={}", plan.tasks_size(), submit.report());
}
......
......@@ -18,8 +18,8 @@ public:
void on_message(const char *buffer, size_t size);
void on_close();
void send_buffer(loom::SendBuffer *buffer) {
connection->send_buffer(buffer);
void send_buffer(std::unique_ptr<loom::SendBuffer> buffer) {
connection->send_buffer(std::move(buffer));
}
protected:
......
......@@ -40,26 +40,26 @@ void ComputationState::add_worker(WorkerConnection* wconn)
w.n_tasks = 0;
}
void ComputationState::set_running_task(WorkerConnection *wc, loom::Id id)
void ComputationState::set_running_task(const PlanNode &node, WorkerConnection *wc)
{
TaskState &state = get_state(id);
auto it = pending_tasks.find(id);
TaskState &state = get_state(node.get_id());
auto it = pending_tasks.find(node.get_id());
assert(it != pending_tasks.end());
pending_tasks.erase(it);
assert(state.get_worker_status(wc) == TaskState::S_NONE);
state.set_worker_status(wc, TaskState::S_RUNNING);
workers[wc].n_tasks++;
workers[wc].n_tasks += node.get_n_cpus();
}
void ComputationState::set_task_finished(loom::Id id, size_t size, size_t length, WorkerConnection *wc)
void ComputationState::set_task_finished(const PlanNode&node, size_t size, size_t length, WorkerConnection *wc)
{
TaskState &state = get_state(id);
TaskState &state = get_state(node.get_id());
assert(state.get_worker_status(wc) == TaskState::S_RUNNING);
state.set_worker_status(wc, TaskState::S_OWNER);
state.set_size(size);
state.set_length(length);
workers[wc].n_tasks--;
workers[wc].n_tasks -= node.get_n_cpus();
}
void ComputationState::remove_state(loom::Id id)
......@@ -265,7 +265,7 @@ int ComputationState::get_max_cpus()
TaskDistribution ComputationState::compute_distribution()
{
loom::llog->debug("Computation for distribution of {} task(s)", pending_tasks.size());
loom::llog->debug("Computation of distribution: {} task(s)", pending_tasks.size());
TaskDistribution result;
if (pending_tasks.empty()) {
......
......@@ -20,7 +20,7 @@ public:
void set_plan(Plan &&plan);
void add_worker(WorkerConnection* wc);
void set_running_task(WorkerConnection *wc, loom::Id id);
void set_running_task(const PlanNode &node, WorkerConnection *wc);
TaskDistribution compute_initial_distribution();
TaskDistribution compute_distribution();
......@@ -43,7 +43,7 @@ public:
}
void add_ready_nodes(const std::vector<loom::Id> &ids);
void set_task_finished(loom::Id id, size_t size, size_t length, WorkerConnection *wc);
void set_task_finished(const PlanNode& node, size_t size, size_t length, WorkerConnection *wc);
const Plan& get_plan() const {
return plan;
......@@ -55,6 +55,10 @@ public:
void add_ready_nexts(const PlanNode &node);
bool is_finished() const;
// Returns the value of the base_time member.
// NOTE(review): presumably the reference timestamp that trace event times
// are measured against -- confirm against where base_time is assigned.
uint64_t get_base_time() const {
return base_time;
}
private:
std::unordered_map<loom::Id, TaskState> states;
std::unordered_map<WorkerConnection*, WorkerInfo> workers;
......
......@@ -111,7 +111,7 @@ void DWConnection::on_data_chunk(const char *buffer, size_t size)
void DWConnection::on_data_finish()
{
llog->debug("Resending data to client");
worker.server.get_client_connection().send_buffer(send_buffer.release());
worker.server.get_client_connection().send_buffer(std::move(send_buffer));
if (worker.server.get_task_manager().is_plan_finished()) {
loom::llog->info("Plan is finished");
}
......
......@@ -12,7 +12,8 @@
using namespace loom;
FreshConnection::FreshConnection(Server &server) :
server(server), connection(std::make_unique<Connection>(this, server.get_loop()))
server(server),
connection(std::make_unique<Connection>(this, server.get_loop()))
{
}
......@@ -54,7 +55,8 @@ void FreshConnection::on_message(const char *buffer, size_t size)
address.str(),
task_types,
data_types,
msg.cpus());
msg.cpus(),
server.new_id());
server.add_worker_connection(std::move(wconn));
server.remove_freshconnection(*this);
......
......@@ -80,24 +80,6 @@ void Server::remove_freshconnection(FreshConnection &conn)
void Server::on_task_finished(loom::Id id, size_t size, size_t length, WorkerConnection *wc)
{
assert(client_connection);
/*
if (client_connection->is_report_enabled()) {
assert(0);
loomcomm::ClientMessage cmsg;
cmsg.set_type(loomcomm::ClientMessage_Type_INFO);
loomcomm::Info *info = cmsg.mutable_info();
info->set_id(task.get_id());