Commit f7a48e01 authored by Stanislav Bohm's avatar Stanislav Bohm
Browse files

ENH: new mapping files for "run" task

parent 7caba4cf
......@@ -18,64 +18,13 @@ _sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='loomrun.proto',
package='loomrun',
serialized_pb=_b('\n\rloomrun.proto\x12\x07loomrun\"X\n\x07MapFile\x12\x10\n\x08\x66ilename\x18\x01 \x02(\t\x12\x13\n\x0binput_index\x18\x02 \x02(\x05\x12\x14\n\x0coutput_index\x18\x03 \x02(\x05\x12\x10\n\x08variable\x18\x04 \x01(\t\"3\n\x03Run\x12\x0c\n\x04\x61rgs\x18\x01 \x03(\t\x12\x1e\n\x04maps\x18\x02 \x03(\x0b\x32\x10.loomrun.MapFileB\x02H\x03')
serialized_pb=_b('\n\rloomrun.proto\x12\x07loomrun\"<\n\x03Run\x12\x0c\n\x04\x61rgs\x18\x01 \x03(\t\x12\x12\n\nmap_inputs\x18\x02 \x03(\t\x12\x13\n\x0bmap_outputs\x18\x03 \x03(\tB\x02H\x03')
)
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
_MAPFILE = _descriptor.Descriptor(
name='MapFile',
full_name='loomrun.MapFile',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='filename', full_name='loomrun.MapFile.filename', index=0,
number=1, type=9, cpp_type=9, label=2,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='input_index', full_name='loomrun.MapFile.input_index', index=1,
number=2, type=5, cpp_type=1, label=2,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='output_index', full_name='loomrun.MapFile.output_index', index=2,
number=3, type=5, cpp_type=1, label=2,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='variable', full_name='loomrun.MapFile.variable', index=3,
number=4, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
extension_ranges=[],
oneofs=[
],
serialized_start=26,
serialized_end=114,
)
_RUN = _descriptor.Descriptor(
name='Run',
full_name='loomrun.Run',
......@@ -91,8 +40,15 @@ _RUN = _descriptor.Descriptor(
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='maps', full_name='loomrun.Run.maps', index=1,
number=2, type=11, cpp_type=10, label=3,
name='map_inputs', full_name='loomrun.Run.map_inputs', index=1,
number=2, type=9, cpp_type=9, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='map_outputs', full_name='loomrun.Run.map_outputs', index=2,
number=3, type=9, cpp_type=9, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
......@@ -108,21 +64,12 @@ _RUN = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=116,
serialized_end=167,
serialized_start=26,
serialized_end=86,
)
_RUN.fields_by_name['maps'].message_type = _MAPFILE
DESCRIPTOR.message_types_by_name['MapFile'] = _MAPFILE
DESCRIPTOR.message_types_by_name['Run'] = _RUN
MapFile = _reflection.GeneratedProtocolMessageType('MapFile', (_message.Message,), dict(
DESCRIPTOR = _MAPFILE,
__module__ = 'loomrun_pb2'
# @@protoc_insertion_point(class_scope:loomrun.MapFile)
))
_sym_db.RegisterMessage(MapFile)
Run = _reflection.GeneratedProtocolMessageType('Run', (_message.Message,), dict(
DESCRIPTOR = _RUN,
__module__ = 'loomrun_pb2'
......
......@@ -139,6 +139,8 @@ class Plan(object):
TASK_ARRAY_MAKE = "array/make"
TASK_ARRAY_GET = "array/get"
TASK_RUN = "run"
u64 = struct.Struct("<Q")
def __init__(self):
......@@ -164,9 +166,23 @@ class Plan(object):
def task_split_lines(self, input, start, end):
return self.add(SplitLinesTask(input, start, end))
def task_run(self, args, stdin=None, stdout=None, variable=None):
return self.add(
RunTask(args, stdin=stdin, stdout=stdout, variable=variable))
def task_run(self, args, inputs=(), outputs=(None,)):
if isinstance(args, str):
args = args.split()
task = Task()
task.task_type = self.TASK_RUN
task.inputs = tuple(i for i, fname in inputs)
msg = loomrun_pb2.Run()
msg.args.extend(args)
msg.map_inputs.extend(fname if fname else "+in"
for i, fname in inputs)
msg.map_outputs.extend(fname if fname else "+out"
for fname in outputs)
task.config = msg.SerializeToString()
return self.add(task)
def task_array_make(self, inputs):
task = Task()
......
......@@ -4,6 +4,7 @@
#include "log.h"
#include "types.h"
#include "data/rawdata.h"
#include "data/array.h"
#include <stdlib.h>
#include <sstream>
......@@ -67,6 +68,10 @@ Worker::Worker(uv_loop_t *loop,
add_unpacker(RawData::TYPE_ID,
std::make_unique<SimpleUnpackFactory<RawDataUnpacker>>());
add_unpacker(Array::TYPE_ID,
std::make_unique<SimpleUnpackFactory<ArrayUnpacker>>());
resource_cpus = 1;
}
......
......@@ -2,14 +2,8 @@ package loomrun;
option optimize_for = LITE_RUNTIME;
message MapFile {
required string filename = 1;
required int32 input_index = 2; // -2 = no mapping
required int32 output_index = 3; // -2 = no mapping, -1 = task_id
optional string variable = 4;
}
message Run {
repeated string args = 1;
repeated MapFile maps = 2;
repeated string map_inputs = 2;
repeated string map_outputs = 3;
}
......@@ -175,7 +175,6 @@ void TaskManager::distribute_work(TaskNode::Vector &tasks)
input->add_owner(&load.connection);
}
}
llog->alert("X = {}", task->get_id());
load.connection.send_task(task);
}
}
......
......@@ -32,8 +32,8 @@ void MergeTask::start(DataVector &inputs) {
for (auto& data : inputs) {
char *mem = (*data)->get_raw_data(worker);
assert(mem);
size_t size = (*data)->get_size();
assert(mem || size == 0);
memcpy(dst, mem, size);
dst += size;
}
......
......@@ -16,7 +16,6 @@
namespace loomrun {
void protobuf_ShutdownFile_loomrun_2eproto() {
delete MapFile::default_instance_;
delete Run::default_instance_;
}
......@@ -32,9 +31,7 @@ void protobuf_AddDesc_loomrun_2eproto() {
GOOGLE_PROTOBUF_VERIFY_VERSION;
#endif
MapFile::default_instance_ = new MapFile();
Run::default_instance_ = new Run();
MapFile::default_instance_->InitAsDefaultInstance();
Run::default_instance_->InitAsDefaultInstance();
::google::protobuf::internal::OnShutdown(&protobuf_ShutdownFile_loomrun_2eproto);
}
......@@ -54,333 +51,12 @@ struct StaticDescriptorInitializer_loomrun_2eproto {
} static_descriptor_initializer_loomrun_2eproto_;
#endif
// ===================================================================
#ifndef _MSC_VER
const int MapFile::kFilenameFieldNumber;
const int MapFile::kInputIndexFieldNumber;
const int MapFile::kOutputIndexFieldNumber;
const int MapFile::kVariableFieldNumber;
#endif // !_MSC_VER
MapFile::MapFile()
: ::google::protobuf::MessageLite() {
SharedCtor();
// @@protoc_insertion_point(constructor:loomrun.MapFile)
}
void MapFile::InitAsDefaultInstance() {
}
MapFile::MapFile(const MapFile& from)
: ::google::protobuf::MessageLite() {
SharedCtor();
MergeFrom(from);
// @@protoc_insertion_point(copy_constructor:loomrun.MapFile)
}
void MapFile::SharedCtor() {
::google::protobuf::internal::GetEmptyString();
_cached_size_ = 0;
filename_ = const_cast< ::std::string*>(&::google::protobuf::internal::GetEmptyStringAlreadyInited());
input_index_ = 0;
output_index_ = 0;
variable_ = const_cast< ::std::string*>(&::google::protobuf::internal::GetEmptyStringAlreadyInited());
::memset(_has_bits_, 0, sizeof(_has_bits_));
}
MapFile::~MapFile() {
// @@protoc_insertion_point(destructor:loomrun.MapFile)
SharedDtor();
}
void MapFile::SharedDtor() {
if (filename_ != &::google::protobuf::internal::GetEmptyStringAlreadyInited()) {
delete filename_;
}
if (variable_ != &::google::protobuf::internal::GetEmptyStringAlreadyInited()) {
delete variable_;
}
#ifdef GOOGLE_PROTOBUF_NO_STATIC_INITIALIZER
if (this != &default_instance()) {
#else
if (this != default_instance_) {
#endif
}
}
void MapFile::SetCachedSize(int size) const {
GOOGLE_SAFE_CONCURRENT_WRITES_BEGIN();
_cached_size_ = size;
GOOGLE_SAFE_CONCURRENT_WRITES_END();
}
const MapFile& MapFile::default_instance() {
#ifdef GOOGLE_PROTOBUF_NO_STATIC_INITIALIZER
protobuf_AddDesc_loomrun_2eproto();
#else
if (default_instance_ == NULL) protobuf_AddDesc_loomrun_2eproto();
#endif
return *default_instance_;
}
MapFile* MapFile::default_instance_ = NULL;
MapFile* MapFile::New() const {
return new MapFile;
}
void MapFile::Clear() {
#define OFFSET_OF_FIELD_(f) (reinterpret_cast<char*>( \
&reinterpret_cast<MapFile*>(16)->f) - \
reinterpret_cast<char*>(16))
#define ZR_(first, last) do { \
size_t f = OFFSET_OF_FIELD_(first); \
size_t n = OFFSET_OF_FIELD_(last) - f + sizeof(last); \
::memset(&first, 0, n); \
} while (0)
if (_has_bits_[0 / 32] & 15) {
ZR_(input_index_, output_index_);
if (has_filename()) {
if (filename_ != &::google::protobuf::internal::GetEmptyStringAlreadyInited()) {
filename_->clear();
}
}
if (has_variable()) {
if (variable_ != &::google::protobuf::internal::GetEmptyStringAlreadyInited()) {
variable_->clear();
}
}
}
#undef OFFSET_OF_FIELD_
#undef ZR_
::memset(_has_bits_, 0, sizeof(_has_bits_));
mutable_unknown_fields()->clear();
}
bool MapFile::MergePartialFromCodedStream(
::google::protobuf::io::CodedInputStream* input) {
#define DO_(EXPRESSION) if (!(EXPRESSION)) goto failure
::google::protobuf::uint32 tag;
::google::protobuf::io::StringOutputStream unknown_fields_string(
mutable_unknown_fields());
::google::protobuf::io::CodedOutputStream unknown_fields_stream(
&unknown_fields_string);
// @@protoc_insertion_point(parse_start:loomrun.MapFile)
for (;;) {
::std::pair< ::google::protobuf::uint32, bool> p = input->ReadTagWithCutoff(127);
tag = p.first;
if (!p.second) goto handle_unusual;
switch (::google::protobuf::internal::WireFormatLite::GetTagFieldNumber(tag)) {
// required string filename = 1;
case 1: {
if (tag == 10) {
DO_(::google::protobuf::internal::WireFormatLite::ReadString(
input, this->mutable_filename()));
} else {
goto handle_unusual;
}
if (input->ExpectTag(16)) goto parse_input_index;
break;
}
// required int32 input_index = 2;
case 2: {
if (tag == 16) {
parse_input_index:
DO_((::google::protobuf::internal::WireFormatLite::ReadPrimitive<
::google::protobuf::int32, ::google::protobuf::internal::WireFormatLite::TYPE_INT32>(
input, &input_index_)));
set_has_input_index();
} else {
goto handle_unusual;
}
if (input->ExpectTag(24)) goto parse_output_index;
break;
}
// required int32 output_index = 3;
case 3: {
if (tag == 24) {
parse_output_index:
DO_((::google::protobuf::internal::WireFormatLite::ReadPrimitive<
::google::protobuf::int32, ::google::protobuf::internal::WireFormatLite::TYPE_INT32>(
input, &output_index_)));
set_has_output_index();
} else {
goto handle_unusual;
}
if (input->ExpectTag(34)) goto parse_variable;
break;
}
// optional string variable = 4;
case 4: {
if (tag == 34) {
parse_variable:
DO_(::google::protobuf::internal::WireFormatLite::ReadString(
input, this->mutable_variable()));
} else {
goto handle_unusual;
}
if (input->ExpectAtEnd()) goto success;
break;
}
default: {
handle_unusual:
if (tag == 0 ||
::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) ==
::google::protobuf::internal::WireFormatLite::WIRETYPE_END_GROUP) {
goto success;
}
DO_(::google::protobuf::internal::WireFormatLite::SkipField(
input, tag, &unknown_fields_stream));
break;
}
}
}
success:
// @@protoc_insertion_point(parse_success:loomrun.MapFile)
return true;
failure:
// @@protoc_insertion_point(parse_failure:loomrun.MapFile)
return false;
#undef DO_
}
void MapFile::SerializeWithCachedSizes(
::google::protobuf::io::CodedOutputStream* output) const {
// @@protoc_insertion_point(serialize_start:loomrun.MapFile)
// required string filename = 1;
if (has_filename()) {
::google::protobuf::internal::WireFormatLite::WriteStringMaybeAliased(
1, this->filename(), output);
}
// required int32 input_index = 2;
if (has_input_index()) {
::google::protobuf::internal::WireFormatLite::WriteInt32(2, this->input_index(), output);
}
// required int32 output_index = 3;
if (has_output_index()) {
::google::protobuf::internal::WireFormatLite::WriteInt32(3, this->output_index(), output);
}
// optional string variable = 4;
if (has_variable()) {
::google::protobuf::internal::WireFormatLite::WriteStringMaybeAliased(
4, this->variable(), output);
}
output->WriteRaw(unknown_fields().data(),
unknown_fields().size());
// @@protoc_insertion_point(serialize_end:loomrun.MapFile)
}
int MapFile::ByteSize() const {
int total_size = 0;
if (_has_bits_[0 / 32] & (0xffu << (0 % 32))) {
// required string filename = 1;
if (has_filename()) {
total_size += 1 +
::google::protobuf::internal::WireFormatLite::StringSize(
this->filename());
}
// required int32 input_index = 2;
if (has_input_index()) {
total_size += 1 +
::google::protobuf::internal::WireFormatLite::Int32Size(
this->input_index());
}
// required int32 output_index = 3;
if (has_output_index()) {
total_size += 1 +
::google::protobuf::internal::WireFormatLite::Int32Size(
this->output_index());
}
// optional string variable = 4;
if (has_variable()) {
total_size += 1 +
::google::protobuf::internal::WireFormatLite::StringSize(
this->variable());
}
}
total_size += unknown_fields().size();
GOOGLE_SAFE_CONCURRENT_WRITES_BEGIN();
_cached_size_ = total_size;
GOOGLE_SAFE_CONCURRENT_WRITES_END();
return total_size;
}
void MapFile::CheckTypeAndMergeFrom(
const ::google::protobuf::MessageLite& from) {
MergeFrom(*::google::protobuf::down_cast<const MapFile*>(&from));
}
void MapFile::MergeFrom(const MapFile& from) {
GOOGLE_CHECK_NE(&from, this);
if (from._has_bits_[0 / 32] & (0xffu << (0 % 32))) {
if (from.has_filename()) {
set_filename(from.filename());
}
if (from.has_input_index()) {
set_input_index(from.input_index());
}
if (from.has_output_index()) {
set_output_index(from.output_index());
}
if (from.has_variable()) {
set_variable(from.variable());
}
}
mutable_unknown_fields()->append(from.unknown_fields());
}
void MapFile::CopyFrom(const MapFile& from) {
if (&from == this) return;
Clear();
MergeFrom(from);
}
bool MapFile::IsInitialized() const {
if ((_has_bits_[0] & 0x00000007) != 0x00000007) return false;
return true;
}
void MapFile::Swap(MapFile* other) {
if (other != this) {
std::swap(filename_, other->filename_);
std::swap(input_index_, other->input_index_);
std::swap(output_index_, other->output_index_);
std::swap(variable_, other->variable_);
std::swap(_has_bits_[0], other->_has_bits_[0]);
_unknown_fields_.swap(other->_unknown_fields_);
std::swap(_cached_size_, other->_cached_size_);
}
}
::std::string MapFile::GetTypeName() const {
return "loomrun.MapFile";
}
// ===================================================================
#ifndef _MSC_VER
const int Run::kArgsFieldNumber;
const int Run::kMapsFieldNumber;
const int Run::kMapInputsFieldNumber;
const int Run::kMapOutputsFieldNumber;
#endif // !_MSC_VER
Run::Run()
......@@ -441,7 +117,8 @@ Run* Run::New() const {
void Run::Clear() {
args_.Clear();
maps_.Clear();
map_inputs_.Clear();
map_outputs_.Clear();
::memset(_has_bits_, 0, sizeof(_has_bits_));
mutable_unknown_fields()->clear();
}
......@@ -470,20 +147,34 @@ bool Run::MergePartialFromCodedStream(
goto handle_unusual;
}
if (input->ExpectTag(10)) goto parse_args;
if (input->ExpectTag(18)) goto parse_maps;
if (input->ExpectTag(18)) goto parse_map_inputs;
break;
}
// repeated .loomrun.MapFile maps = 2;
// repeated string map_inputs = 2;
case 2: {
if (tag == 18) {
parse_maps:
DO_(::google::protobuf::internal::WireFormatLite::ReadMessageNoVirtual(
input, add_maps()));
parse_map_inputs:
DO_(::google::protobuf::internal::WireFormatLite::ReadString(
input, this->add_map_inputs()));
} else {
goto handle_unusual;
}
if (input->ExpectTag(18)) goto parse_maps;
if (input->ExpectTag(18)) goto parse_map_inputs;
if (input->ExpectTag(26)) goto parse_map_outputs;
break;
}
// repeated string map_outputs = 3;
case 3: {
if (tag == 26) {
parse_map_outputs:
DO_(::google::protobuf::internal::WireFormatLite::ReadString(
input, this->add_map_outputs()));
} else {
goto handle_unusual;
}
if (input->ExpectTag(26)) goto parse_map_outputs;
if (input->ExpectAtEnd()) goto success;
break;
}
......@@ -519,10 +210,16 @@ void Run::SerializeWithCachedSizes(
1, this->args(i), output);
}
// repeated .loomrun.MapFile maps = 2;
for (int i = 0; i < this->maps_size(); i++) {
::google::protobuf::internal::WireFormatLite::WriteMessage(
2, this->maps(i), output);
// repeated string map_inputs = 2;
for (int i = 0; i < this->map_inputs_size(); i++) {
::google::protobuf::internal::WireFormatLite::WriteString(