Commit c510093c authored by Stanislav Bohm's avatar Stanislav Bohm

ENH: ClientMessage

parent 818fc0f6
from loomcomm_pb2 import Register, Data, DataPrologue
from loomcomm_pb2 import Register, Data, ClientMessage
import socket
from connection import Connection
......@@ -40,8 +40,10 @@ class Client(object):
data = {}
while expected != len(data):
msg = self.connection.receive_message()
prologue = DataPrologue()
prologue.ParseFromString(msg)
cmsg = ClientMessage()
cmsg.ParseFromString(msg)
assert cmsg.type == ClientMessage.DATA
prologue = cmsg.data
data[prologue.id] = self._receive_data()
if single_result:
return data[results.id]
......
......@@ -18,7 +18,7 @@ _sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='loomcomm.proto',
package='loomcomm',
serialized_pb=_b('\n\x0eloomcomm.proto\x12\x08loomcomm\"\x9f\x01\n\x08Register\x12\x18\n\x10protocol_version\x18\x01 \x02(\x05\x12%\n\x04type\x18\x02 \x02(\x0e\x32\x17.loomcomm.Register.Type\x12\x0c\n\x04port\x18\x03 \x01(\x05\x12\x12\n\ntask_types\x18\x04 \x03(\t\"0\n\x04Type\x12\x13\n\x0fREGISTER_WORKER\x10\x01\x12\x13\n\x0fREGISTER_CLIENT\x10\x02\"&\n\rServerMessage\"\x15\n\x04Type\x12\r\n\tSTART_JOB\x10\x01\"\xc4\x01\n\rWorkerCommand\x12*\n\x04type\x18\x01 \x02(\x0e\x32\x1c.loomcomm.WorkerCommand.Type\x12\n\n\x02id\x18\x02 \x01(\x05\x12\x11\n\ttask_type\x18\x03 \x01(\x05\x12\x13\n\x0btask_config\x18\x04 \x01(\t\x12\x13\n\x0btask_inputs\x18\x05 \x03(\x05\x12\x0f\n\x07\x61\x64\x64ress\x18\n \x01(\t\x12\x11\n\twith_size\x18\x0b \x01(\x08\"\x1a\n\x04Type\x12\x08\n\x04TASK\x10\x01\x12\x08\n\x04SEND\x10\x02\"\x1c\n\x0eWorkerResponse\x12\n\n\x02id\x18\x02 \x01(\x05\"\x18\n\x08\x41nnounce\x12\x0c\n\x04port\x18\x01 \x02(\x05\"-\n\x0c\x44\x61taPrologue\x12\n\n\x02id\x18\x01 \x02(\x05\x12\x11\n\tdata_size\x18\x03 \x01(\x04\"%\n\x04\x44\x61ta\x12\x0f\n\x07type_id\x18\x01 \x02(\x05\x12\x0c\n\x04size\x18\x02 \x01(\x04\x42\x02H\x03')
serialized_pb=_b('\n\x0eloomcomm.proto\x12\x08loomcomm\"\x9f\x01\n\x08Register\x12\x18\n\x10protocol_version\x18\x01 \x02(\x05\x12%\n\x04type\x18\x02 \x02(\x0e\x32\x17.loomcomm.Register.Type\x12\x0c\n\x04port\x18\x03 \x01(\x05\x12\x12\n\ntask_types\x18\x04 \x03(\t\"0\n\x04Type\x12\x13\n\x0fREGISTER_WORKER\x10\x01\x12\x13\n\x0fREGISTER_CLIENT\x10\x02\"&\n\rServerMessage\"\x15\n\x04Type\x12\r\n\tSTART_JOB\x10\x01\"\xc4\x01\n\rWorkerCommand\x12*\n\x04type\x18\x01 \x02(\x0e\x32\x1c.loomcomm.WorkerCommand.Type\x12\n\n\x02id\x18\x02 \x01(\x05\x12\x11\n\ttask_type\x18\x03 \x01(\x05\x12\x13\n\x0btask_config\x18\x04 \x01(\t\x12\x13\n\x0btask_inputs\x18\x05 \x03(\x05\x12\x0f\n\x07\x61\x64\x64ress\x18\n \x01(\t\x12\x11\n\twith_size\x18\x0b \x01(\x08\"\x1a\n\x04Type\x12\x08\n\x04TASK\x10\x01\x12\x08\n\x04SEND\x10\x02\"\x1c\n\x0eWorkerResponse\x12\n\n\x02id\x18\x02 \x01(\x05\"\x18\n\x08\x41nnounce\x12\x0c\n\x04port\x18\x01 \x02(\x05\"-\n\x0c\x44\x61taPrologue\x12\n\n\x02id\x18\x01 \x02(\x05\x12\x11\n\tdata_size\x18\x03 \x01(\x04\"%\n\x04\x44\x61ta\x12\x0f\n\x07type_id\x18\x01 \x02(\x05\x12\x0c\n\x04size\x18\x02 \x01(\x04\"&\n\x08\x46\x65\x65\x64\x62\x61\x63k\x12\n\n\x02id\x18\x01 \x02(\x05\x12\x0e\n\x06worker\x18\x02 \x02(\t\"\xa7\x01\n\rClientMessage\x12*\n\x04type\x18\x01 \x02(\x0e\x32\x1c.loomcomm.ClientMessage.Type\x12$\n\x04\x64\x61ta\x18\x02 \x01(\x0b\x32\x16.loomcomm.DataPrologue\x12$\n\x08\x66\x65\x65\x64\x62\x61\x63k\x18\x03 \x01(\x0b\x32\x12.loomcomm.Feedback\"\x1e\n\x04Type\x12\x08\n\x04\x44\x41TA\x10\x01\x12\x0c\n\x08\x46\x45\x45\x44\x42\x41\x43K\x10\x02\x42\x02H\x03')
)
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
......@@ -86,6 +86,28 @@ _WORKERCOMMAND_TYPE = _descriptor.EnumDescriptor(
)
_sym_db.RegisterEnumDescriptor(_WORKERCOMMAND_TYPE)
_CLIENTMESSAGE_TYPE = _descriptor.EnumDescriptor(
name='Type',
full_name='loomcomm.ClientMessage.Type',
filename=None,
file=DESCRIPTOR,
values=[
_descriptor.EnumValueDescriptor(
name='DATA', index=0, number=1,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='FEEDBACK', index=1, number=2,
options=None,
type=None),
],
containing_type=None,
options=None,
serialized_start=749,
serialized_end=779,
)
_sym_db.RegisterEnumDescriptor(_CLIENTMESSAGE_TYPE)
_REGISTER = _descriptor.Descriptor(
name='Register',
......@@ -369,11 +391,97 @@ _DATA = _descriptor.Descriptor(
serialized_end=569,
)
_FEEDBACK = _descriptor.Descriptor(
name='Feedback',
full_name='loomcomm.Feedback',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='id', full_name='loomcomm.Feedback.id', index=0,
number=1, type=5, cpp_type=1, label=2,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='worker', full_name='loomcomm.Feedback.worker', index=1,
number=2, type=9, cpp_type=9, label=2,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
extension_ranges=[],
oneofs=[
],
serialized_start=571,
serialized_end=609,
)
_CLIENTMESSAGE = _descriptor.Descriptor(
name='ClientMessage',
full_name='loomcomm.ClientMessage',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='type', full_name='loomcomm.ClientMessage.type', index=0,
number=1, type=14, cpp_type=8, label=2,
has_default_value=False, default_value=1,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='data', full_name='loomcomm.ClientMessage.data', index=1,
number=2, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='feedback', full_name='loomcomm.ClientMessage.feedback', index=2,
number=3, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
_CLIENTMESSAGE_TYPE,
],
options=None,
is_extendable=False,
extension_ranges=[],
oneofs=[
],
serialized_start=612,
serialized_end=779,
)
_REGISTER.fields_by_name['type'].enum_type = _REGISTER_TYPE
_REGISTER_TYPE.containing_type = _REGISTER
_SERVERMESSAGE_TYPE.containing_type = _SERVERMESSAGE
_WORKERCOMMAND.fields_by_name['type'].enum_type = _WORKERCOMMAND_TYPE
_WORKERCOMMAND_TYPE.containing_type = _WORKERCOMMAND
_CLIENTMESSAGE.fields_by_name['type'].enum_type = _CLIENTMESSAGE_TYPE
_CLIENTMESSAGE.fields_by_name['data'].message_type = _DATAPROLOGUE
_CLIENTMESSAGE.fields_by_name['feedback'].message_type = _FEEDBACK
_CLIENTMESSAGE_TYPE.containing_type = _CLIENTMESSAGE
DESCRIPTOR.message_types_by_name['Register'] = _REGISTER
DESCRIPTOR.message_types_by_name['ServerMessage'] = _SERVERMESSAGE
DESCRIPTOR.message_types_by_name['WorkerCommand'] = _WORKERCOMMAND
......@@ -381,6 +489,8 @@ DESCRIPTOR.message_types_by_name['WorkerResponse'] = _WORKERRESPONSE
DESCRIPTOR.message_types_by_name['Announce'] = _ANNOUNCE
DESCRIPTOR.message_types_by_name['DataPrologue'] = _DATAPROLOGUE
DESCRIPTOR.message_types_by_name['Data'] = _DATA
DESCRIPTOR.message_types_by_name['Feedback'] = _FEEDBACK
DESCRIPTOR.message_types_by_name['ClientMessage'] = _CLIENTMESSAGE
Register = _reflection.GeneratedProtocolMessageType('Register', (_message.Message,), dict(
DESCRIPTOR = _REGISTER,
......@@ -431,6 +541,20 @@ Data = _reflection.GeneratedProtocolMessageType('Data', (_message.Message,), dic
))
_sym_db.RegisterMessage(Data)
Feedback = _reflection.GeneratedProtocolMessageType('Feedback', (_message.Message,), dict(
DESCRIPTOR = _FEEDBACK,
__module__ = 'loomcomm_pb2'
# @@protoc_insertion_point(class_scope:loomcomm.Feedback)
))
_sym_db.RegisterMessage(Feedback)
ClientMessage = _reflection.GeneratedProtocolMessageType('ClientMessage', (_message.Message,), dict(
DESCRIPTOR = _CLIENTMESSAGE,
__module__ = 'loomcomm_pb2'
# @@protoc_insertion_point(class_scope:loomcomm.ClientMessage)
))
_sym_db.RegisterMessage(ClientMessage)
DESCRIPTOR.has_options = True
DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('H\003'))
......
This diff is collapsed.
This diff is collapsed.
......@@ -56,3 +56,18 @@ message Data
required int32 type_id = 1;
optional uint64 size = 2;
}
message Feedback {
required int32 id = 1;
required string worker = 2;
}
message ClientMessage {
enum Type {
DATA = 1;
FEEDBACK = 2;
}
required Type type = 1;
optional DataPrologue data = 2;
optional Feedback feedback = 3;
}
......@@ -76,7 +76,11 @@ void DWConnection::on_message(const char *buffer, size_t size)
loomcomm::DataPrologue msg;
msg.ParseFromArray(buffer, size);
send_buffer->add(msg);
loomcomm::ClientMessage cmsg;
cmsg.set_type(loomcomm::ClientMessage_Type_DATA);
*cmsg.mutable_data() = msg;
send_buffer->add(cmsg);
assert(msg.has_data_size());
size_t data_size = msg.data_size();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment