Commit 701cb31a authored by Stanislav Bohm's avatar Stanislav Bohm

RF: Simplified client<->server protocol

parent f424553a
......@@ -27,8 +27,7 @@ class Client(object):
self.server_address = address
self.server_port = port
self.dictionary_symbols = None
self.dictionary_map = None
self.symbols = None
self.array_id = None
self.rawdata_id = None
......@@ -48,9 +47,12 @@ class Client(object):
msg.info = info
self._send_message(msg)
while self.symbols is None:
self._read_symbols()
def submit(self, plan, results):
msg = plan.create_message()
msg = plan.create_message(self.symbols)
if isinstance(results, Task):
single_result = True
......@@ -76,19 +78,25 @@ class Client(object):
self.add_info(cmsg.info)
elif cmsg.type == ClientMessage.ERROR:
self.process_error(cmsg)
elif cmsg.type == ClientMessage.DICTIONARY:
self.dictionary_symbols = cmsg.symbols
self.dictionary_map = {}
for i, s in enumerate(self.dictionary_symbols):
self.dictionary_map[s] = i
self.array_id = self.dictionary_map["loom/array"]
self.rawdata_id = self.dictionary_map["loom/data"]
else:
assert 0
if single_result:
return data[results.id]
else:
return [data[task.id] for task in results]
def _read_symbols(self):
msg = self.connection.receive_message()
cmsg = ClientMessage()
cmsg.ParseFromString(msg)
assert cmsg.type == ClientMessage.DICTIONARY
self.symbols = {}
for i, s in enumerate(cmsg.symbols):
self.symbols[s] = i
self.array_id = self.symbols["loom/array"]
self.rawdata_id = self.symbols["loom/data"]
def process_error(self, cmsg):
assert cmsg.HasField("error")
error = cmsg.error
......@@ -105,6 +113,7 @@ class Client(object):
return self.connection.read_data(msg_data.size)
if type_id == self.array_id:
return [self._receive_data() for i in xrange(msg_data.length)]
print type_id, self.array_id, self.rawdata_id
assert 0
def _send_message(self, message):
......
......@@ -18,7 +18,7 @@ _sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='loomplan.proto',
package='loomplan',
serialized_pb=_b('\n\x0eloomplan.proto\x12\x08loomplan\"<\n\x04Task\x12\x11\n\ttask_type\x18\x01 \x02(\x05\x12\x0e\n\x06\x63onfig\x18\x02 \x02(\x0c\x12\x11\n\tinput_ids\x18\x03 \x03(\x05\"M\n\x04Plan\x12\x12\n\ntask_types\x18\x01 \x03(\t\x12\x1d\n\x05tasks\x18\x02 \x03(\x0b\x32\x0e.loomplan.Task\x12\x12\n\nresult_ids\x18\x03 \x03(\x05\x42\x02H\x03')
serialized_pb=_b('\n\x0eloomplan.proto\x12\x08loomplan\"<\n\x04Task\x12\x11\n\ttask_type\x18\x01 \x02(\x05\x12\x0e\n\x06\x63onfig\x18\x02 \x02(\x0c\x12\x11\n\tinput_ids\x18\x03 \x03(\x05\"9\n\x04Plan\x12\x1d\n\x05tasks\x18\x02 \x03(\x0b\x32\x0e.loomplan.Task\x12\x12\n\nresult_ids\x18\x03 \x03(\x05\x42\x02H\x03')
)
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
......@@ -77,21 +77,14 @@ _PLAN = _descriptor.Descriptor(
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='task_types', full_name='loomplan.Plan.task_types', index=0,
number=1, type=9, cpp_type=9, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='tasks', full_name='loomplan.Plan.tasks', index=1,
name='tasks', full_name='loomplan.Plan.tasks', index=0,
number=2, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='result_ids', full_name='loomplan.Plan.result_ids', index=2,
name='result_ids', full_name='loomplan.Plan.result_ids', index=1,
number=3, type=5, cpp_type=1, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
......@@ -109,7 +102,7 @@ _PLAN = _descriptor.Descriptor(
oneofs=[
],
serialized_start=90,
serialized_end=167,
serialized_end=147,
)
_PLAN.fields_by_name['tasks'].message_type = _TASK
......
......@@ -20,9 +20,9 @@ class Task(object):
self.inputs = inputs
"""
def set_message(self, msg, task_types):
def set_message(self, msg, symbols):
msg.config = self.config
msg.task_type = task_types.index(self.task_type)
msg.task_type = symbols[self.task_type]
msg.input_ids.extend(t.id for t in self.inputs)
......@@ -127,16 +127,11 @@ class Plan(object):
task.config = self.u64u64.pack(start, end)
return self.add(task)
def create_message(self):
task_types = list(self.task_types)
task_types.sort()
def create_message(self, symbols):
msg = loomplan_pb2.Plan()
msg.task_types.extend(task_types)
for task in self.tasks:
t = msg.tasks.add()
task.set_message(t, task_types)
task.set_message(t, symbols)
return msg
def write_dot(self, filename, info=None):
......
......@@ -331,7 +331,6 @@ void Task::Swap(Task* other) {
// ===================================================================
#ifndef _MSC_VER
const int Plan::kTaskTypesFieldNumber;
const int Plan::kTasksFieldNumber;
const int Plan::kResultIdsFieldNumber;
#endif // !_MSC_VER
......@@ -353,7 +352,6 @@ Plan::Plan(const Plan& from)
}
void Plan::SharedCtor() {
::google::protobuf::internal::GetEmptyString();
_cached_size_ = 0;
::memset(_has_bits_, 0, sizeof(_has_bits_));
}
......@@ -393,7 +391,6 @@ Plan* Plan::New() const {
}
void Plan::Clear() {
task_types_.Clear();
tasks_.Clear();
result_ids_.Clear();
::memset(_has_bits_, 0, sizeof(_has_bits_));
......@@ -414,20 +411,6 @@ bool Plan::MergePartialFromCodedStream(
tag = p.first;
if (!p.second) goto handle_unusual;
switch (::google::protobuf::internal::WireFormatLite::GetTagFieldNumber(tag)) {
// repeated string task_types = 1;
case 1: {
if (tag == 10) {
parse_task_types:
DO_(::google::protobuf::internal::WireFormatLite::ReadString(
input, this->add_task_types()));
} else {
goto handle_unusual;
}
if (input->ExpectTag(10)) goto parse_task_types;
if (input->ExpectTag(18)) goto parse_tasks;
break;
}
// repeated .loomplan.Task tasks = 2;
case 2: {
if (tag == 18) {
......@@ -486,12 +469,6 @@ failure:
void Plan::SerializeWithCachedSizes(
::google::protobuf::io::CodedOutputStream* output) const {
// @@protoc_insertion_point(serialize_start:loomplan.Plan)
// repeated string task_types = 1;
for (int i = 0; i < this->task_types_size(); i++) {
::google::protobuf::internal::WireFormatLite::WriteString(
1, this->task_types(i), output);
}
// repeated .loomplan.Task tasks = 2;
for (int i = 0; i < this->tasks_size(); i++) {
::google::protobuf::internal::WireFormatLite::WriteMessage(
......@@ -512,13 +489,6 @@ void Plan::SerializeWithCachedSizes(
int Plan::ByteSize() const {
int total_size = 0;
// repeated string task_types = 1;
total_size += 1 * this->task_types_size();
for (int i = 0; i < this->task_types_size(); i++) {
total_size += ::google::protobuf::internal::WireFormatLite::StringSize(
this->task_types(i));
}
// repeated .loomplan.Task tasks = 2;
total_size += 1 * this->tasks_size();
for (int i = 0; i < this->tasks_size(); i++) {
......@@ -552,7 +522,6 @@ void Plan::CheckTypeAndMergeFrom(
void Plan::MergeFrom(const Plan& from) {
GOOGLE_CHECK_NE(&from, this);
task_types_.MergeFrom(from.task_types_);
tasks_.MergeFrom(from.tasks_);
result_ids_.MergeFrom(from.result_ids_);
mutable_unknown_fields()->append(from.unknown_fields());
......@@ -572,7 +541,6 @@ bool Plan::IsInitialized() const {
void Plan::Swap(Plan* other) {
if (other != this) {
task_types_.Swap(&other->task_types_);
tasks_.Swap(&other->tasks_);
result_ids_.Swap(&other->result_ids_);
std::swap(_has_bits_[0], other->_has_bits_[0]);
......
......@@ -217,22 +217,6 @@ class Plan : public ::google::protobuf::MessageLite {
// accessors -------------------------------------------------------
// repeated string task_types = 1;
inline int task_types_size() const;
inline void clear_task_types();
static const int kTaskTypesFieldNumber = 1;
inline const ::std::string& task_types(int index) const;
inline ::std::string* mutable_task_types(int index);
inline void set_task_types(int index, const ::std::string& value);
inline void set_task_types(int index, const char* value);
inline void set_task_types(int index, const char* value, size_t size);
inline ::std::string* add_task_types();
inline void add_task_types(const ::std::string& value);
inline void add_task_types(const char* value);
inline void add_task_types(const char* value, size_t size);
inline const ::google::protobuf::RepeatedPtrField< ::std::string>& task_types() const;
inline ::google::protobuf::RepeatedPtrField< ::std::string>* mutable_task_types();
// repeated .loomplan.Task tasks = 2;
inline int tasks_size() const;
inline void clear_tasks();
......@@ -264,7 +248,6 @@ class Plan : public ::google::protobuf::MessageLite {
::google::protobuf::uint32 _has_bits_[1];
mutable int _cached_size_;
::google::protobuf::RepeatedPtrField< ::std::string> task_types_;
::google::protobuf::RepeatedPtrField< ::loomplan::Task > tasks_;
::google::protobuf::RepeatedField< ::google::protobuf::int32 > result_ids_;
#ifdef GOOGLE_PROTOBUF_NO_STATIC_INITIALIZER
......@@ -419,60 +402,6 @@ Task::mutable_input_ids() {
// Plan
// repeated string task_types = 1;
inline int Plan::task_types_size() const {
return task_types_.size();
}
inline void Plan::clear_task_types() {
task_types_.Clear();
}
inline const ::std::string& Plan::task_types(int index) const {
// @@protoc_insertion_point(field_get:loomplan.Plan.task_types)
return task_types_.Get(index);
}
inline ::std::string* Plan::mutable_task_types(int index) {
// @@protoc_insertion_point(field_mutable:loomplan.Plan.task_types)
return task_types_.Mutable(index);
}
inline void Plan::set_task_types(int index, const ::std::string& value) {
// @@protoc_insertion_point(field_set:loomplan.Plan.task_types)
task_types_.Mutable(index)->assign(value);
}
inline void Plan::set_task_types(int index, const char* value) {
task_types_.Mutable(index)->assign(value);
// @@protoc_insertion_point(field_set_char:loomplan.Plan.task_types)
}
inline void Plan::set_task_types(int index, const char* value, size_t size) {
task_types_.Mutable(index)->assign(
reinterpret_cast<const char*>(value), size);
// @@protoc_insertion_point(field_set_pointer:loomplan.Plan.task_types)
}
inline ::std::string* Plan::add_task_types() {
return task_types_.Add();
}
inline void Plan::add_task_types(const ::std::string& value) {
task_types_.Add()->assign(value);
// @@protoc_insertion_point(field_add:loomplan.Plan.task_types)
}
inline void Plan::add_task_types(const char* value) {
task_types_.Add()->assign(value);
// @@protoc_insertion_point(field_add_char:loomplan.Plan.task_types)
}
inline void Plan::add_task_types(const char* value, size_t size) {
task_types_.Add()->assign(reinterpret_cast<const char*>(value), size);
// @@protoc_insertion_point(field_add_pointer:loomplan.Plan.task_types)
}
inline const ::google::protobuf::RepeatedPtrField< ::std::string>&
Plan::task_types() const {
// @@protoc_insertion_point(field_list:loomplan.Plan.task_types)
return task_types_;
}
inline ::google::protobuf::RepeatedPtrField< ::std::string>*
Plan::mutable_task_types() {
// @@protoc_insertion_point(field_mutable_list:loomplan.Plan.task_types)
return &task_types_;
}
// repeated .loomplan.Task tasks = 2;
inline int Plan::tasks_size() const {
return tasks_.size();
......
......@@ -9,7 +9,6 @@ message Task {
}
message Plan {
repeated string task_types = 1;
repeated Task tasks = 2;
repeated int32 result_ids = 3;
}
......@@ -19,21 +19,13 @@ TaskManager::TaskManager(Server &server)
void TaskManager::add_plan(const loomplan::Plan &plan, bool distribute)
{
int tt_size = plan.task_types_size();
int type_task_translation[tt_size];
Dictionary &dictionary = server.get_dictionary();
for (int i = 0; i < tt_size; i++) {
type_task_translation[i] = dictionary.find_or_create(plan.task_types(i));
}
auto task_size = plan.tasks_size();
int id_base = server.new_id(task_size);
for (int i = 0; i < task_size; i++) {
const auto& pt = plan.tasks(i);
auto id = i + id_base;
tasks[id] = std::make_unique<TaskNode>(
id, i, type_task_translation[pt.task_type()], pt.config());
id, i, pt.task_type(), pt.config());
}
std::vector<TaskNode*> ready_tasks;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment