Commit 52e79ea2 authored by Stanislav Bohm's avatar Stanislav Bohm

ENH: Task labels & rview improved

parent abe98999
......@@ -18,7 +18,7 @@ _sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='loomplan.proto',
package='loomplan',
serialized_pb=_b('\n\x0eloomplan.proto\x12\x08loomplan\"\xe0\x01\n\x04Task\x12\x11\n\ttask_type\x18\x01 \x02(\x05\x12\x0e\n\x06\x63onfig\x18\x02 \x02(\x0c\x12\x11\n\tinput_ids\x18\x03 \x03(\x05\x12\x36\n\x06policy\x18\x04 \x01(\x0e\x32\x15.loomplan.Task.Policy:\x0fPOLICY_STANDARD\x12\"\n\x16resource_request_index\x18\x05 \x01(\x05:\x02-1\"F\n\x06Policy\x12\x13\n\x0fPOLICY_STANDARD\x10\x01\x12\x11\n\rPOLICY_SIMPLE\x10\x02\x12\x14\n\x10POLICY_SCHEDULER\x10\x03\"0\n\x08Resource\x12\x15\n\rresource_type\x18\x01 \x02(\x05\x12\r\n\x05value\x18\x02 \x02(\x05\"8\n\x0fResourceRequest\x12%\n\tresources\x18\x01 \x03(\x0b\x32\x12.loomplan.Resource\"o\n\x04Plan\x12\x34\n\x11resource_requests\x18\x01 \x03(\x0b\x32\x19.loomplan.ResourceRequest\x12\x1d\n\x05tasks\x18\x02 \x03(\x0b\x32\x0e.loomplan.Task\x12\x12\n\nresult_ids\x18\x03 \x03(\x05\x42\x02H\x03')
serialized_pb=_b('\n\x0eloomplan.proto\x12\x08loomplan\"\xef\x01\n\x04Task\x12\x11\n\ttask_type\x18\x01 \x02(\x05\x12\x0e\n\x06\x63onfig\x18\x02 \x02(\x0c\x12\x11\n\tinput_ids\x18\x03 \x03(\x05\x12\x36\n\x06policy\x18\x04 \x01(\x0e\x32\x15.loomplan.Task.Policy:\x0fPOLICY_STANDARD\x12\"\n\x16resource_request_index\x18\x05 \x01(\x05:\x02-1\x12\r\n\x05label\x18\x0c \x01(\t\"F\n\x06Policy\x12\x13\n\x0fPOLICY_STANDARD\x10\x01\x12\x11\n\rPOLICY_SIMPLE\x10\x02\x12\x14\n\x10POLICY_SCHEDULER\x10\x03\"0\n\x08Resource\x12\x15\n\rresource_type\x18\x01 \x02(\x05\x12\r\n\x05value\x18\x02 \x02(\x05\"8\n\x0fResourceRequest\x12%\n\tresources\x18\x01 \x03(\x0b\x32\x12.loomplan.Resource\"o\n\x04Plan\x12\x34\n\x11resource_requests\x18\x01 \x03(\x0b\x32\x19.loomplan.ResourceRequest\x12\x1d\n\x05tasks\x18\x02 \x03(\x0b\x32\x0e.loomplan.Task\x12\x12\n\nresult_ids\x18\x03 \x03(\x05\x42\x02H\x03')
)
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
......@@ -45,8 +45,8 @@ _TASK_POLICY = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
serialized_start=183,
serialized_end=253,
serialized_start=198,
serialized_end=268,
)
_sym_db.RegisterEnumDescriptor(_TASK_POLICY)
......@@ -93,6 +93,13 @@ _TASK = _descriptor.Descriptor(
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='label', full_name='loomplan.Task.label', index=5,
number=12, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
......@@ -106,7 +113,7 @@ _TASK = _descriptor.Descriptor(
oneofs=[
],
serialized_start=29,
serialized_end=253,
serialized_end=268,
)
......@@ -142,8 +149,8 @@ _RESOURCE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=255,
serialized_end=303,
serialized_start=270,
serialized_end=318,
)
......@@ -172,8 +179,8 @@ _RESOURCEREQUEST = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=305,
serialized_end=361,
serialized_start=320,
serialized_end=376,
)
......@@ -216,8 +223,8 @@ _PLAN = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=363,
serialized_end=474,
serialized_start=378,
serialized_end=489,
)
_TASK.fields_by_name['policy'].enum_type = _TASK_POLICY
......
......@@ -15,6 +15,7 @@ class Task(object):
config = ""
policy = POLICY_STANDARD
resource_request = None
label = None
def set_message(self, msg, symbols, requests):
msg.config = self.config
......@@ -23,6 +24,8 @@ class Task(object):
msg.policy = self.policy
if self.resource_request:
msg.resource_request_index = requests.index(self.resource_request)
if self.label:
msg.label = self.label
class ResourceRequest(object):
......
......@@ -87,6 +87,7 @@ const int Task::kConfigFieldNumber;
const int Task::kInputIdsFieldNumber;
const int Task::kPolicyFieldNumber;
const int Task::kResourceRequestIndexFieldNumber;
const int Task::kLabelFieldNumber;
#endif // !_MSC_VER
Task::Task()
......@@ -112,6 +113,7 @@ void Task::SharedCtor() {
config_ = const_cast< ::std::string*>(&::google::protobuf::internal::GetEmptyStringAlreadyInited());
policy_ = 1;
resource_request_index_ = -1;
label_ = const_cast< ::std::string*>(&::google::protobuf::internal::GetEmptyStringAlreadyInited());
::memset(_has_bits_, 0, sizeof(_has_bits_));
}
......@@ -124,6 +126,9 @@ void Task::SharedDtor() {
if (config_ != &::google::protobuf::internal::GetEmptyStringAlreadyInited()) {
delete config_;
}
if (label_ != &::google::protobuf::internal::GetEmptyStringAlreadyInited()) {
delete label_;
}
#ifdef GOOGLE_PROTOBUF_NO_STATIC_INITIALIZER
if (this != &default_instance()) {
#else
......@@ -153,7 +158,7 @@ Task* Task::New() const {
}
void Task::Clear() {
if (_has_bits_[0 / 32] & 27) {
if (_has_bits_[0 / 32] & 59) {
task_type_ = 0;
if (has_config()) {
if (config_ != &::google::protobuf::internal::GetEmptyStringAlreadyInited()) {
......@@ -162,6 +167,11 @@ void Task::Clear() {
}
policy_ = 1;
resource_request_index_ = -1;
if (has_label()) {
if (label_ != &::google::protobuf::internal::GetEmptyStringAlreadyInited()) {
label_->clear();
}
}
}
input_ids_.Clear();
::memset(_has_bits_, 0, sizeof(_has_bits_));
......@@ -260,6 +270,19 @@ bool Task::MergePartialFromCodedStream(
} else {
goto handle_unusual;
}
if (input->ExpectTag(98)) goto parse_label;
break;
}
// optional string label = 12;
case 12: {
if (tag == 98) {
parse_label:
DO_(::google::protobuf::internal::WireFormatLite::ReadString(
input, this->mutable_label()));
} else {
goto handle_unusual;
}
if (input->ExpectAtEnd()) goto success;
break;
}
......@@ -317,6 +340,12 @@ void Task::SerializeWithCachedSizes(
::google::protobuf::internal::WireFormatLite::WriteInt32(5, this->resource_request_index(), output);
}
// optional string label = 12;
if (has_label()) {
::google::protobuf::internal::WireFormatLite::WriteStringMaybeAliased(
12, this->label(), output);
}
output->WriteRaw(unknown_fields().data(),
unknown_fields().size());
// @@protoc_insertion_point(serialize_end:loomplan.Task)
......@@ -353,6 +382,13 @@ int Task::ByteSize() const {
this->resource_request_index());
}
// optional string label = 12;
if (has_label()) {
total_size += 1 +
::google::protobuf::internal::WireFormatLite::StringSize(
this->label());
}
}
// repeated int32 input_ids = 3;
{
......@@ -393,6 +429,9 @@ void Task::MergeFrom(const Task& from) {
if (from.has_resource_request_index()) {
set_resource_request_index(from.resource_request_index());
}
if (from.has_label()) {
set_label(from.label());
}
}
mutable_unknown_fields()->append(from.unknown_fields());
}
......@@ -416,6 +455,7 @@ void Task::Swap(Task* other) {
input_ids_.Swap(&other->input_ids_);
std::swap(policy_, other->policy_);
std::swap(resource_request_index_, other->resource_request_index_);
std::swap(label_, other->label_);
std::swap(_has_bits_[0], other->_has_bits_[0]);
_unknown_fields_.swap(other->_unknown_fields_);
std::swap(_cached_size_, other->_cached_size_);
......
......@@ -169,6 +169,18 @@ class Task : public ::google::protobuf::MessageLite {
inline ::google::protobuf::int32 resource_request_index() const;
inline void set_resource_request_index(::google::protobuf::int32 value);
// optional string label = 12;
inline bool has_label() const;
inline void clear_label();
static const int kLabelFieldNumber = 12;
inline const ::std::string& label() const;
inline void set_label(const ::std::string& value);
inline void set_label(const char* value);
inline void set_label(const char* value, size_t size);
inline ::std::string* mutable_label();
inline ::std::string* release_label();
inline void set_allocated_label(::std::string* label);
// @@protoc_insertion_point(class_scope:loomplan.Task)
private:
inline void set_has_task_type();
......@@ -179,6 +191,8 @@ class Task : public ::google::protobuf::MessageLite {
inline void clear_has_policy();
inline void set_has_resource_request_index();
inline void clear_has_resource_request_index();
inline void set_has_label();
inline void clear_has_label();
::std::string _unknown_fields_;
......@@ -188,6 +202,7 @@ class Task : public ::google::protobuf::MessageLite {
::google::protobuf::int32 task_type_;
int policy_;
::google::protobuf::RepeatedField< ::google::protobuf::int32 > input_ids_;
::std::string* label_;
::google::protobuf::int32 resource_request_index_;
#ifdef GOOGLE_PROTOBUF_NO_STATIC_INITIALIZER
friend void protobuf_AddDesc_loomplan_2eproto_impl();
......@@ -701,6 +716,82 @@ inline void Task::set_resource_request_index(::google::protobuf::int32 value) {
// @@protoc_insertion_point(field_set:loomplan.Task.resource_request_index)
}
// optional string label = 12;
inline bool Task::has_label() const {
return (_has_bits_[0] & 0x00000020u) != 0;
}
inline void Task::set_has_label() {
_has_bits_[0] |= 0x00000020u;
}
inline void Task::clear_has_label() {
_has_bits_[0] &= ~0x00000020u;
}
inline void Task::clear_label() {
if (label_ != &::google::protobuf::internal::GetEmptyStringAlreadyInited()) {
label_->clear();
}
clear_has_label();
}
inline const ::std::string& Task::label() const {
// @@protoc_insertion_point(field_get:loomplan.Task.label)
return *label_;
}
inline void Task::set_label(const ::std::string& value) {
set_has_label();
if (label_ == &::google::protobuf::internal::GetEmptyStringAlreadyInited()) {
label_ = new ::std::string;
}
label_->assign(value);
// @@protoc_insertion_point(field_set:loomplan.Task.label)
}
inline void Task::set_label(const char* value) {
set_has_label();
if (label_ == &::google::protobuf::internal::GetEmptyStringAlreadyInited()) {
label_ = new ::std::string;
}
label_->assign(value);
// @@protoc_insertion_point(field_set_char:loomplan.Task.label)
}
inline void Task::set_label(const char* value, size_t size) {
set_has_label();
if (label_ == &::google::protobuf::internal::GetEmptyStringAlreadyInited()) {
label_ = new ::std::string;
}
label_->assign(reinterpret_cast<const char*>(value), size);
// @@protoc_insertion_point(field_set_pointer:loomplan.Task.label)
}
inline ::std::string* Task::mutable_label() {
set_has_label();
if (label_ == &::google::protobuf::internal::GetEmptyStringAlreadyInited()) {
label_ = new ::std::string;
}
// @@protoc_insertion_point(field_mutable:loomplan.Task.label)
return label_;
}
inline ::std::string* Task::release_label() {
clear_has_label();
if (label_ == &::google::protobuf::internal::GetEmptyStringAlreadyInited()) {
return NULL;
} else {
::std::string* temp = label_;
label_ = const_cast< ::std::string*>(&::google::protobuf::internal::GetEmptyStringAlreadyInited());
return temp;
}
}
inline void Task::set_allocated_label(::std::string* label) {
if (label_ != &::google::protobuf::internal::GetEmptyStringAlreadyInited()) {
delete label_;
}
if (label) {
set_has_label();
label_ = label;
} else {
clear_has_label();
label_ = const_cast< ::std::string*>(&::google::protobuf::internal::GetEmptyStringAlreadyInited());
}
// @@protoc_insertion_point(field_set_allocated:loomplan.Task.label)
}
// -------------------------------------------------------------------
// Resource
......
......@@ -14,6 +14,8 @@ message Task {
repeated int32 input_ids = 3;
optional Policy policy = 4 [default = POLICY_STANDARD];
optional int32 resource_request_index = 5 [default = -1];
optional string label = 12;
}
message Resource {
......
......@@ -11,6 +11,25 @@ sys.path.insert(0,
import loomreport_pb2 # noqa
import loomcomm_pb2 # noqa
import gv # noqas
import matplotlib.pyplot as plt # noqa
def generate_colors(count):
def linspace(start, stop, n):
if n == 1:
yield stop
return
h = float(stop - start) / (n - 1)
for i in range(n):
yield start + h * i
return plt.cm.Set2(list(linspace(0, 1, 12)))
def dot_color(color):
r = int(color[0] * 256)
g = int(color[1] * 256)
b = int(color[2] * 256)
return "#" + hex((r << 16) + (g << 8) + b)[2:]
class Report:
......@@ -25,22 +44,42 @@ class Report:
self.symbols = [s.replace("loom", "L")
for s in self.report_msg.symbols]
def collect_labels(self):
labels = set()
tasks = self.report_msg.plan.tasks
symbols = self.symbols
for task in tasks:
if task.label:
labels.add(task.label)
else:
labels.add(symbols[task.task_type])
return sorted(labels)
def create_graph(self):
TASK_START = loomcomm_pb2.Event.TASK_START
graph = gv.Graph()
symbols = self.symbols
colors = ["red", "green", "blue", "yellow", "orange", "pink"]
task_workers = {}
for event in self.report_msg.events:
if event.type == TASK_START:
task_workers[event.id] = event.worker_id
labels = self.collect_labels()
colors = [dot_color(c) for c in generate_colors(len(labels))]
for i, task in enumerate(self.report_msg.plan.tasks):
node = graph.node(i)
node.label = symbols[task.task_type]
if task.label:
label = task.label
else:
label = symbols[task.task_type]
node.label = label
if task_workers:
node.color = colors[task_workers[i]]
node.label += "\nw={}".format(task_workers[i])
node.fillcolor = colors[labels.index(label)]
node.color = colors[labels.index(label)]
for j in task.input_ids:
graph.node(j).add_arc(node)
return graph
......@@ -50,13 +89,8 @@ class Report:
TASK_END = loomcomm_pb2.Event.TASK_END
workers = {}
symbol_ids = set()
tasks = self.report_msg.plan.tasks
for task in tasks:
symbol_ids.add(task.task_type)
symbols = list(symbol_ids)
symbols.sort(key=lambda x: self.symbols[x])
symbols = self.symbols
labels = self.collect_labels()
for event in self.report_msg.events:
lst = workers.get(event.worker_id)
......@@ -82,21 +116,26 @@ class Report:
xmin = []
xmax = []
colors = []
labels = []
y_labels = []
color_list = ["red", "green", "blue", "pink", "orange"]
color_list = generate_colors(len(labels))
tasks = self.report_msg.plan.tasks
index = 0
for w_index, (worker, lst) in enumerate(sorted(workers.items())):
labels.append("Worker {}".format(w_index))
labels.extend([""] * len(lst))
y_labels.append("Worker {}".format(w_index))
y_labels.extend([""] * len(lst))
for lst2 in lst:
for i in xrange(0, len(lst2), 2):
y.append(index)
xmin.append(lst2[i].time)
xmax.append(lst2[i + 1].time)
task = tasks[lst2[i].id]
colors.append(color_list[symbols.index(task.task_type)])
if task.label:
label = task.label
else:
label = symbols[task.task_type]
colors.append(color_list[labels.index(label)])
index += 1
index += 1
......@@ -104,6 +143,6 @@ class Report:
xmin,
xmax,
colors,
labels,
[(self.symbols[symbol_id], color_list[symbols.index(symbol_id)])
for symbol_id in symbols])
y_labels,
[(l, color_list[i])
for i, l in enumerate(labels)])
......@@ -30,12 +30,14 @@ def test_cv_iris(loom_env):
for ts in trainsets:
model = p.task_run("svm-train data",
[(ts, "data")], ["data.model"])
model.label = "svm-train"
models.append(model)
predict = []
for chunk, model in zip(chunks, models):
task = p.task_run("svm-predict testdata model out",
[(chunk, "testdata"), (model, "model")])
task.label = "svm-predict"
predict.append(task)
loom_env.make_dry_report(p.plan, "dry.report")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment