Commit a3b5327b authored by Stanislav Bohm's avatar Stanislav Bohm

Initial commit

parents
_build
*~
*.autosave
*.a
*.core
*.moc
*.o
*.obj
*.orig
*.rej
*.so
*.so.*
*_pch.h.cpp
*_resource.rc
# qtcreator generated files
*.pro.user*
# xemacs temporary files
*.flc
# Vim temporary files
.*.swp
# Python byte code
*.pyc
tests/client/build
project(loom-rt)
cmake_minimum_required(VERSION 2.8)
enable_testing()
add_subdirectory(src)
add_subdirectory(tests)
add_custom_target(check COMMAND ${CMAKE_CTEST_COMMAND} --verbose)
add_subdirectory(libloom)
add_subdirectory(worker)
add_subdirectory(server)
from .client import Client # noqa
from .plan import Plan # noqa
from loomcomm_pb2 import Register, Data
import socket
from connection import Connection
from plan import Task
LOOM_PROTOCOL_VERSION = 1
class Client(object):
def __init__(self, address, port):
self.server_address = address
self.server_port = port
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((address, port))
self.connection = Connection(s)
msg = Register()
msg.type = Register.REGISTER_CLIENT
msg.protocol_version = LOOM_PROTOCOL_VERSION
self._send_message(msg)
def submit(self, plan, results):
msg = plan.create_message()
if isinstance(results, Task):
single_result = True
msg.result_ids.extend((results.id,))
expected = 1
else:
single_result = False
r = set(results)
msg.result_ids.extend(r.id for r in r)
expected = len(r)
self._send_message(msg)
data = {}
while expected != len(data):
msg = self.connection.receive_message()
msg_data = Data()
msg_data.ParseFromString(msg)
data[msg_data.id] = self.connection.read_data(msg_data.size)
if single_result:
return data[results.id]
else:
return [data[task.id] for task in results]
def _send_message(self, message):
data = message.SerializeToString()
self.connection.send_message(data)
import struct
u32 = struct.Struct("<I")
class Connection(object):
def __init__(self, socket):
self.socket = socket
self.data = ""
def receive_message(self):
while True:
size = len(self.data)
if size > 4:
msg_size = u32.unpack(self.data[:4])[0]
msg_size += 4
if size >= msg_size:
message = self.data[4:msg_size]
self.data = self.data[msg_size:]
return message
new_data = self.socket.recv(65536)
if not new_data:
raise Exception("Connection to server lost")
self.data += new_data
def read_data(self, data_size):
result = ""
while True:
change = min(data_size, len(self.data))
result += self.data[:change]
self.data = self.data[change:]
data_size -= change
if data_size == 0:
return result
self.data = self.socket.recv(65536)
def send_message(self, data):
data = u32.pack(len(data)) + data
self.socket.sendall(data)
This diff is collapsed.
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: loomplan.proto
import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
from google.protobuf import descriptor_pb2
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='loomplan.proto',
package='loomplan',
serialized_pb=_b('\n\x0eloomplan.proto\x12\x08loomplan\"<\n\x04Task\x12\x11\n\ttask_type\x18\x01 \x02(\x05\x12\x0e\n\x06\x63onfig\x18\x02 \x02(\x0c\x12\x11\n\tinput_ids\x18\x03 \x03(\x05\"M\n\x04Plan\x12\x12\n\ntask_types\x18\x01 \x03(\t\x12\x1d\n\x05tasks\x18\x02 \x03(\x0b\x32\x0e.loomplan.Task\x12\x12\n\nresult_ids\x18\x03 \x03(\x05\x42\x02H\x03')
)
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
_TASK = _descriptor.Descriptor(
name='Task',
full_name='loomplan.Task',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='task_type', full_name='loomplan.Task.task_type', index=0,
number=1, type=5, cpp_type=1, label=2,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='config', full_name='loomplan.Task.config', index=1,
number=2, type=12, cpp_type=9, label=2,
has_default_value=False, default_value=_b(""),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='input_ids', full_name='loomplan.Task.input_ids', index=2,
number=3, type=5, cpp_type=1, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
extension_ranges=[],
oneofs=[
],
serialized_start=28,
serialized_end=88,
)
_PLAN = _descriptor.Descriptor(
name='Plan',
full_name='loomplan.Plan',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='task_types', full_name='loomplan.Plan.task_types', index=0,
number=1, type=9, cpp_type=9, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='tasks', full_name='loomplan.Plan.tasks', index=1,
number=2, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='result_ids', full_name='loomplan.Plan.result_ids', index=2,
number=3, type=5, cpp_type=1, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
extension_ranges=[],
oneofs=[
],
serialized_start=90,
serialized_end=167,
)
_PLAN.fields_by_name['tasks'].message_type = _TASK
DESCRIPTOR.message_types_by_name['Task'] = _TASK
DESCRIPTOR.message_types_by_name['Plan'] = _PLAN
Task = _reflection.GeneratedProtocolMessageType('Task', (_message.Message,), dict(
DESCRIPTOR = _TASK,
__module__ = 'loomplan_pb2'
# @@protoc_insertion_point(class_scope:loomplan.Task)
))
_sym_db.RegisterMessage(Task)
Plan = _reflection.GeneratedProtocolMessageType('Plan', (_message.Message,), dict(
DESCRIPTOR = _PLAN,
__module__ = 'loomplan_pb2'
# @@protoc_insertion_point(class_scope:loomplan.Plan)
))
_sym_db.RegisterMessage(Plan)
DESCRIPTOR.has_options = True
DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('H\003'))
# @@protoc_insertion_point(module_scope)
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: loomrun.proto
import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
from google.protobuf import descriptor_pb2
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='loomrun.proto',
package='loomrun',
serialized_pb=_b('\n\rloomrun.proto\x12\x07loomrun\"X\n\x07MapFile\x12\x10\n\x08\x66ilename\x18\x01 \x02(\t\x12\x13\n\x0binput_index\x18\x02 \x02(\x05\x12\x14\n\x0coutput_index\x18\x03 \x02(\x05\x12\x10\n\x08variable\x18\x04 \x01(\t\"3\n\x03Run\x12\x0c\n\x04\x61rgs\x18\x01 \x03(\t\x12\x1e\n\x04maps\x18\x02 \x03(\x0b\x32\x10.loomrun.MapFileB\x02H\x03')
)
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
_MAPFILE = _descriptor.Descriptor(
name='MapFile',
full_name='loomrun.MapFile',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='filename', full_name='loomrun.MapFile.filename', index=0,
number=1, type=9, cpp_type=9, label=2,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='input_index', full_name='loomrun.MapFile.input_index', index=1,
number=2, type=5, cpp_type=1, label=2,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='output_index', full_name='loomrun.MapFile.output_index', index=2,
number=3, type=5, cpp_type=1, label=2,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='variable', full_name='loomrun.MapFile.variable', index=3,
number=4, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
extension_ranges=[],
oneofs=[
],
serialized_start=26,
serialized_end=114,
)
_RUN = _descriptor.Descriptor(
name='Run',
full_name='loomrun.Run',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='args', full_name='loomrun.Run.args', index=0,
number=1, type=9, cpp_type=9, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='maps', full_name='loomrun.Run.maps', index=1,
number=2, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
extension_ranges=[],
oneofs=[
],
serialized_start=116,
serialized_end=167,
)
_RUN.fields_by_name['maps'].message_type = _MAPFILE
DESCRIPTOR.message_types_by_name['MapFile'] = _MAPFILE
DESCRIPTOR.message_types_by_name['Run'] = _RUN
MapFile = _reflection.GeneratedProtocolMessageType('MapFile', (_message.Message,), dict(
DESCRIPTOR = _MAPFILE,
__module__ = 'loomrun_pb2'
# @@protoc_insertion_point(class_scope:loomrun.MapFile)
))
_sym_db.RegisterMessage(MapFile)
Run = _reflection.GeneratedProtocolMessageType('Run', (_message.Message,), dict(
DESCRIPTOR = _RUN,
__module__ = 'loomrun_pb2'
# @@protoc_insertion_point(class_scope:loomrun.Run)
))
_sym_db.RegisterMessage(Run)
DESCRIPTOR.has_options = True
DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('H\003'))
# @@protoc_insertion_point(module_scope)
import loomplan_pb2
import loomrun_pb2
class Task(object):
inputs = ()
id = None
config = ""
"""
def __init__(self, config, inputs=()):
self.id = None
self.task_type = task_type
self.config = config
self.inputs = inputs
"""
def set_message(self, msg, task_types):
msg.config = self.config
msg.task_type = task_types.index(self.task_type)
msg.input_ids.extend(t.id for t in self.inputs)
class ConstTask(Task):
task_type = "const"
def __init__(self, data):
self.config = data
class MergeTask(Task):
task_type = "merge"
def __init__(self, inputs):
self.inputs = inputs
class RunTask(Task):
task_type = "run"
def __init__(self, args, stdin=None, stdout=None, variable=None):
if isinstance(args, str):
args = args.split()
else:
args = map(str, args)
self.args = args
self.variable = variable
self.stdout = None
self.stdin = stdin
self.stdout = stdout
self.file_ins = []
self.file_outs = []
def map_file_in(self, task, filename):
self.file_ins.append((task, filename))
def map_file_out(self, filename):
self.file_outs.append(filename)
@property
def inputs(self):
if self.stdin is not None:
inputs = [self.stdin]
else:
inputs = []
for task, filename in self.file_ins:
if task not in inputs:
inputs.append(task)
return inputs
@property
def config(self):
msg = loomrun_pb2.Run()
msg.args.extend(self.args)
# stdin
if self.stdin is not None:
mf = msg.maps.add()
mf.filename = "+in"
mf.input_index = 0
mf.output_index = -2
output_mapped = False
for filename in self.file_outs:
mf = msg.maps.add()
mf.filename = filename
mf.input_index = -2
mf.output_index = -1
output_mapped = True
if not output_mapped:
# stdout
mf = msg.maps.add()
mf.filename = "+out"
mf.input_index = -2
mf.output_index = -1
inputs = self.inputs
for task, filename in self.file_ins:
mf = msg.maps.add()
mf.filename = filename
mf.input_index = inputs.index(task)
mf.output_index = -2
return msg.SerializeToString()
class Plan(object):
def __init__(self):
self.tasks = []
self.task_types = set()
def add(self, task):
assert task.id is None
task.id = len(self.tasks)
self.tasks.append(task)
self.task_types.add(task.task_type)
return task
def task_const(self, data):
return self.add(ConstTask(data))
def task_merge(self, inputs):
return self.add(MergeTask(inputs))
def task_run(self, args, stdin=None, stdout=None, variable=None):
return self.add(
RunTask(args, stdin=stdin, stdout=stdout, variable=variable))
def create_message(self):
task_types = list(self.task_types)
task_types.sort()
msg = loomplan_pb2.Plan()
msg.task_types.extend(task_types)
for task in self.tasks:
t = msg.tasks.add()
task.set_message(t, task_types)
return msg
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14 -g -Wall")
add_library(libloom
connection.cpp
connection.h
worker.cpp
worker.h
taskinstance.cpp
taskinstance.h
taskfactory.h
databuilder.cpp
databuilder.h
data.cpp
data.h
interconnect.h
interconnect.cpp
task.cpp
task.h
fileservice.cpp
fileservice.h
loomcomm.pb.h
loomcomm.pb.cc
loomplan.pb.h
loomplan.pb.cc
log.h
log.cpp
types.h
utils.h
utils.cpp)
target_include_directories(libloom PUBLIC ${PROJECT_SOURCE_DIR}/src)
#include "connection.h"
#include "utils.h"
#include "log.h" // DEBUG
using namespace loom;
struct SendBuffer {
uv_write_t write_req;
std::unique_ptr<char[]> data;
};
Connection::Connection(ConnectionCallback *callback, uv_loop_t *loop)
: callback(callback),
state(ConnectionNew),
data_size(0),
data_ptr(nullptr),
remaining_raw_data(0)
{
uv_tcp_init(loop, &socket);
uv_tcp_nodelay(&socket, 1);
socket.data = this;
}
Connection::~Connection()
{
assert(state == ConnectionClosed);
}
std::string Connection::get_peername()
{
sockaddr_in addr;
int len = sizeof(addr);
UV_CHECK(uv_tcp_getpeername(&socket, (struct sockaddr*) &addr, &len));
char tmp[60];
UV_CHECK(uv_ip4_name(&addr, tmp, 60));
return tmp;
}
void Connection::close()
{
if (state != ConnectionClosed && state != ConnectionClosing) {
state = ConnectionClosing;
uv_close((uv_handle_t*) &socket, _on_close);
}
}
void Connection::close_and_discard_remaining_data()
{
received_buffer.reset();
data_size = 0;
uv_read_stop((uv_stream_t*) &socket);
close();
}
void Connection::accept(uv_tcp_t *listen_socket)
{
UV_CHECK(uv_accept((uv_stream_t*) listen_socket, (uv_stream_t*) &socket));
uv_read_start((uv_stream_t *)&socket, _buf_alloc, _on_read);
}
void Connection::start_read()
{
uv_read_start((uv_stream_t *)&socket, _buf_alloc, _on_read);
}
void Connection::set_raw_read(size_t size)
{
assert(remaining_raw_data == 0);
if (size == 0) {
callback->on_data_finish();
return;
}
remaining_raw_data = size;
if (data_size == 0) {
return;
}
if (data_size <= size) {
callback->on_data_chunk(data_ptr, data_size);
remaining_raw_data -= data_size;
data_size = 0;
received_buffer.reset();
if (remaining_raw_data == 0) {
callback->on_data_finish();