Commit 803e97b6 authored by Stanislav Bohm's avatar Stanislav Bohm

ENH: Task mode

parent ca761af1
......@@ -18,12 +18,38 @@ _sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='loomplan.proto',
package='loomplan',
serialized_pb=_b('\n\x0eloomplan.proto\x12\x08loomplan\"<\n\x04Task\x12\x11\n\ttask_type\x18\x01 \x02(\x05\x12\x0e\n\x06\x63onfig\x18\x02 \x02(\x0c\x12\x11\n\tinput_ids\x18\x03 \x03(\x05\"9\n\x04Plan\x12\x1d\n\x05tasks\x18\x02 \x03(\x0b\x32\x0e.loomplan.Task\x12\x12\n\nresult_ids\x18\x03 \x03(\x05\x42\x02H\x03')
serialized_pb=_b('\n\x0eloomplan.proto\x12\x08loomplan\"\xae\x01\n\x04Task\x12\x11\n\ttask_type\x18\x01 \x02(\x05\x12\x0e\n\x06\x63onfig\x18\x02 \x02(\x0c\x12\x11\n\tinput_ids\x18\x03 \x03(\x05\x12\x30\n\x04mode\x18\x04 \x01(\x0e\x32\x13.loomplan.Task.Mode:\rMODE_STANDARD\">\n\x04Mode\x12\x11\n\rMODE_STANDARD\x10\x01\x12\x0f\n\x0bMODE_SIMPLE\x10\x02\x12\x12\n\x0eMODE_SCHEDULER\x10\x03\"9\n\x04Plan\x12\x1d\n\x05tasks\x18\x02 \x03(\x0b\x32\x0e.loomplan.Task\x12\x12\n\nresult_ids\x18\x03 \x03(\x05\x42\x02H\x03')
)
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
_TASK_MODE = _descriptor.EnumDescriptor(
name='Mode',
full_name='loomplan.Task.Mode',
filename=None,
file=DESCRIPTOR,
values=[
_descriptor.EnumValueDescriptor(
name='MODE_STANDARD', index=0, number=1,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='MODE_SIMPLE', index=1, number=2,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='MODE_SCHEDULER', index=2, number=3,
options=None,
type=None),
],
containing_type=None,
options=None,
serialized_start=141,
serialized_end=203,
)
_sym_db.RegisterEnumDescriptor(_TASK_MODE)
_TASK = _descriptor.Descriptor(
name='Task',
......@@ -53,19 +79,27 @@ _TASK = _descriptor.Descriptor(
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='mode', full_name='loomplan.Task.mode', index=3,
number=4, type=14, cpp_type=8, label=1,
has_default_value=True, default_value=1,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
_TASK_MODE,
],
options=None,
is_extendable=False,
extension_ranges=[],
oneofs=[
],
serialized_start=28,
serialized_end=88,
serialized_start=29,
serialized_end=203,
)
......@@ -101,10 +135,12 @@ _PLAN = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=90,
serialized_end=147,
serialized_start=205,
serialized_end=262,
)
_TASK.fields_by_name['mode'].enum_type = _TASK_MODE
_TASK_MODE.containing_type = _TASK
_PLAN.fields_by_name['tasks'].message_type = _TASK
DESCRIPTOR.message_types_by_name['Task'] = _TASK
DESCRIPTOR.message_types_by_name['Plan'] = _PLAN
......
......@@ -6,11 +6,17 @@ import gv
import struct
MODE_STANDARD = loomplan_pb2.Task.MODE_STANDARD
MODE_SIMPLE = loomplan_pb2.Task.MODE_SIMPLE
MODE_SCHEDULER = loomplan_pb2.Task.MODE_SCHEDULER
class Task(object):
inputs = ()
id = None
config = ""
mode = MODE_STANDARD
def set_message(self, msg, symbols):
msg.config = self.config
......@@ -51,6 +57,7 @@ class Plan(object):
def task_dslice(self, input):
task = Task()
task.task_type = self.TASK_SCHEDULER_DSLICE
task.mode = MODE_SCHEDULER
task.inputs = (input,)
return self.add(task)
......@@ -58,24 +65,28 @@ class Plan(object):
task = Task()
task.task_type = self.TASK_DATA_CONST
task.config = data
task.mode = MODE_SIMPLE
return self.add(task)
def task_merge(self, inputs):
task = Task()
task.task_type = self.TASK_DATA_MERGE
task.inputs = inputs
task.mode = MODE_SIMPLE
return self.add(task)
def task_open(self, filename):
task = Task()
task.task_type = self.TASK_DATA_OPEN
task.config = filename
task.mode = MODE_SIMPLE
return self.add(task)
def task_split(self, input, char=None):
task = Task()
task.task_type = self.TASK_DATA_SPLIT
task.inputs = (input,)
task.mode = MODE_SIMPLE
return self.add(task)
def task_run(self, args, inputs=(), outputs=(None,), stdin=None):
......@@ -103,6 +114,7 @@ class Plan(object):
task = Task()
task.task_type = self.TASK_ARRAY_MAKE
task.inputs = inputs
task.mode = MODE_SIMPLE
return self.add(task)
def task_get(self, input, index):
......@@ -110,6 +122,7 @@ class Plan(object):
task.task_type = self.TASK_BASE_GET
task.inputs = (input,)
task.config = self.u64.pack(index)
task.mode = MODE_SIMPLE
return self.add(task)
def task_slice(self, input, start, end):
......@@ -117,6 +130,7 @@ class Plan(object):
task.task_type = self.TASK_BASE_SLICE
task.inputs = (input,)
task.config = self.u64u64.pack(start, end)
task.mode = MODE_SIMPLE
return self.add(task)
def create_message(self, symbols):
......
......@@ -56,10 +56,30 @@ struct StaticDescriptorInitializer_loomplan_2eproto {
// ===================================================================
bool Task_Mode_IsValid(int value) {
switch(value) {
case 1:
case 2:
case 3:
return true;
default:
return false;
}
}
#ifndef _MSC_VER
const Task_Mode Task::MODE_STANDARD;
const Task_Mode Task::MODE_SIMPLE;
const Task_Mode Task::MODE_SCHEDULER;
const Task_Mode Task::Mode_MIN;
const Task_Mode Task::Mode_MAX;
const int Task::Mode_ARRAYSIZE;
#endif // _MSC_VER
#ifndef _MSC_VER
const int Task::kTaskTypeFieldNumber;
const int Task::kConfigFieldNumber;
const int Task::kInputIdsFieldNumber;
const int Task::kModeFieldNumber;
#endif // !_MSC_VER
Task::Task()
......@@ -83,6 +103,7 @@ void Task::SharedCtor() {
_cached_size_ = 0;
task_type_ = 0;
config_ = const_cast< ::std::string*>(&::google::protobuf::internal::GetEmptyStringAlreadyInited());
mode_ = 1;
::memset(_has_bits_, 0, sizeof(_has_bits_));
}
......@@ -124,13 +145,14 @@ Task* Task::New() const {
}
void Task::Clear() {
if (_has_bits_[0 / 32] & 3) {
if (_has_bits_[0 / 32] & 11) {
task_type_ = 0;
if (has_config()) {
if (config_ != &::google::protobuf::internal::GetEmptyStringAlreadyInited()) {
config_->clear();
}
}
mode_ = 1;
}
input_ids_.Clear();
::memset(_has_bits_, 0, sizeof(_has_bits_));
......@@ -193,6 +215,27 @@ bool Task::MergePartialFromCodedStream(
goto handle_unusual;
}
if (input->ExpectTag(24)) goto parse_input_ids;
if (input->ExpectTag(32)) goto parse_mode;
break;
}
// optional .loomplan.Task.Mode mode = 4 [default = MODE_STANDARD];
case 4: {
if (tag == 32) {
parse_mode:
int value;
DO_((::google::protobuf::internal::WireFormatLite::ReadPrimitive<
int, ::google::protobuf::internal::WireFormatLite::TYPE_ENUM>(
input, &value)));
if (::loomplan::Task_Mode_IsValid(value)) {
set_mode(static_cast< ::loomplan::Task_Mode >(value));
} else {
unknown_fields_stream.WriteVarint32(tag);
unknown_fields_stream.WriteVarint32(value);
}
} else {
goto handle_unusual;
}
if (input->ExpectAtEnd()) goto success;
break;
}
......@@ -239,6 +282,12 @@ void Task::SerializeWithCachedSizes(
3, this->input_ids(i), output);
}
// optional .loomplan.Task.Mode mode = 4 [default = MODE_STANDARD];
if (has_mode()) {
::google::protobuf::internal::WireFormatLite::WriteEnum(
4, this->mode(), output);
}
output->WriteRaw(unknown_fields().data(),
unknown_fields().size());
// @@protoc_insertion_point(serialize_end:loomplan.Task)
......@@ -262,6 +311,12 @@ int Task::ByteSize() const {
this->config());
}
// optional .loomplan.Task.Mode mode = 4 [default = MODE_STANDARD];
if (has_mode()) {
total_size += 1 +
::google::protobuf::internal::WireFormatLite::EnumSize(this->mode());
}
}
// repeated int32 input_ids = 3;
{
......@@ -296,6 +351,9 @@ void Task::MergeFrom(const Task& from) {
if (from.has_config()) {
set_config(from.config());
}
if (from.has_mode()) {
set_mode(from.mode());
}
}
mutable_unknown_fields()->append(from.unknown_fields());
}
......@@ -317,6 +375,7 @@ void Task::Swap(Task* other) {
std::swap(task_type_, other->task_type_);
std::swap(config_, other->config_);
input_ids_.Swap(&other->input_ids_);
std::swap(mode_, other->mode_);
std::swap(_has_bits_[0], other->_has_bits_[0]);
_unknown_fields_.swap(other->_unknown_fields_);
std::swap(_cached_size_, other->_cached_size_);
......
......@@ -35,6 +35,16 @@ void protobuf_ShutdownFile_loomplan_2eproto();
class Task;
class Plan;
enum Task_Mode {
Task_Mode_MODE_STANDARD = 1,
Task_Mode_MODE_SIMPLE = 2,
Task_Mode_MODE_SCHEDULER = 3
};
bool Task_Mode_IsValid(int value);
const Task_Mode Task_Mode_Mode_MIN = Task_Mode_MODE_STANDARD;
const Task_Mode Task_Mode_Mode_MAX = Task_Mode_MODE_SCHEDULER;
const int Task_Mode_Mode_ARRAYSIZE = Task_Mode_Mode_MAX + 1;
// ===================================================================
class Task : public ::google::protobuf::MessageLite {
......@@ -96,6 +106,20 @@ class Task : public ::google::protobuf::MessageLite {
// nested types ----------------------------------------------------
typedef Task_Mode Mode;
static const Mode MODE_STANDARD = Task_Mode_MODE_STANDARD;
static const Mode MODE_SIMPLE = Task_Mode_MODE_SIMPLE;
static const Mode MODE_SCHEDULER = Task_Mode_MODE_SCHEDULER;
static inline bool Mode_IsValid(int value) {
return Task_Mode_IsValid(value);
}
static const Mode Mode_MIN =
Task_Mode_Mode_MIN;
static const Mode Mode_MAX =
Task_Mode_Mode_MAX;
static const int Mode_ARRAYSIZE =
Task_Mode_Mode_ARRAYSIZE;
// accessors -------------------------------------------------------
// required int32 task_type = 1;
......@@ -129,20 +153,30 @@ class Task : public ::google::protobuf::MessageLite {
inline ::google::protobuf::RepeatedField< ::google::protobuf::int32 >*
mutable_input_ids();
// optional .loomplan.Task.Mode mode = 4 [default = MODE_STANDARD];
inline bool has_mode() const;
inline void clear_mode();
static const int kModeFieldNumber = 4;
inline ::loomplan::Task_Mode mode() const;
inline void set_mode(::loomplan::Task_Mode value);
// @@protoc_insertion_point(class_scope:loomplan.Task)
private:
inline void set_has_task_type();
inline void clear_has_task_type();
inline void set_has_config();
inline void clear_has_config();
inline void set_has_mode();
inline void clear_has_mode();
::std::string _unknown_fields_;
::google::protobuf::uint32 _has_bits_[1];
mutable int _cached_size_;
::std::string* config_;
::google::protobuf::RepeatedField< ::google::protobuf::int32 > input_ids_;
::google::protobuf::int32 task_type_;
int mode_;
::google::protobuf::RepeatedField< ::google::protobuf::int32 > input_ids_;
#ifdef GOOGLE_PROTOBUF_NO_STATIC_INITIALIZER
friend void protobuf_AddDesc_loomplan_2eproto_impl();
#else
......@@ -398,6 +432,31 @@ Task::mutable_input_ids() {
return &input_ids_;
}
// optional .loomplan.Task.Mode mode = 4 [default = MODE_STANDARD];
inline bool Task::has_mode() const {
return (_has_bits_[0] & 0x00000008u) != 0;
}
inline void Task::set_has_mode() {
_has_bits_[0] |= 0x00000008u;
}
inline void Task::clear_has_mode() {
_has_bits_[0] &= ~0x00000008u;
}
inline void Task::clear_mode() {
mode_ = 1;
clear_has_mode();
}
inline ::loomplan::Task_Mode Task::mode() const {
// @@protoc_insertion_point(field_get:loomplan.Task.mode)
return static_cast< ::loomplan::Task_Mode >(mode_);
}
inline void Task::set_mode(::loomplan::Task_Mode value) {
assert(::loomplan::Task_Mode_IsValid(value));
set_has_mode();
mode_ = value;
// @@protoc_insertion_point(field_set:loomplan.Task.mode)
}
// -------------------------------------------------------------------
// Plan
......
......@@ -3,9 +3,16 @@ package loomplan;
option optimize_for = LITE_RUNTIME;
message Task {
enum Mode {
MODE_STANDARD = 1;
MODE_SIMPLE = 2;
MODE_SCHEDULER = 3;
}
required int32 task_type = 1;
required bytes config = 2;
repeated int32 input_ids = 3;
optional Mode mode = 4 [default = MODE_STANDARD];
}
message Plan {
......
......@@ -18,6 +18,20 @@ TaskManager::TaskManager(Server &server)
slice_task_id = dictionary.find_or_create("loom/base/slice");
}
static TaskNode::TaskMode read_task_mode(loomplan::Task_Mode mode) {
switch(mode) {
case loomplan::Task_Mode_MODE_STANDARD:
return TaskNode::MODE_STANDARD;
case loomplan::Task_Mode_MODE_SIMPLE:
return TaskNode::MODE_SIMPLE;
case loomplan::Task_Mode_MODE_SCHEDULER:
return TaskNode::MODE_SCHEDULER;
default:
llog->critical("Invalid task mode");
exit(1);
}
}
void TaskManager::add_plan(const loomplan::Plan &plan, bool distribute)
{
auto task_size = plan.tasks_size();
......@@ -26,7 +40,7 @@ void TaskManager::add_plan(const loomplan::Plan &plan, bool distribute)
const auto& pt = plan.tasks(i);
auto id = i + id_base;
tasks[id] = std::make_unique<TaskNode>(
id, i, pt.task_type(), pt.config());
id, i, read_task_mode(pt.mode()), pt.task_type(), pt.config());
}
std::vector<TaskNode*> ready_tasks;
......@@ -173,12 +187,12 @@ void TaskManager::expand_dslice(TaskNode *node, TaskNode::Vector &tasks)
i = indices[1];
std::string config(reinterpret_cast<char*>(&indices), sizeof(size_t) * 2);
auto new_slice = std::make_unique<TaskNode>(new_id, -1, slice_task_id, config);
auto new_slice = std::make_unique<TaskNode>(new_id, -1, TaskNode::MODE_SIMPLE, slice_task_id, config);
new_slice->set_inputs(node->get_inputs());
tasks.push_back(new_slice.get());
auto new_task = std::make_unique<TaskNode>(
new_id + 1, -1, next->get_task_type(), next->get_config());
new_id + 1, -1, next->get_mode(), next->get_task_type(), next->get_config());
new_task->add_input(new_slice.get());
new_slice->inc_ref_counter();
new_tasks.push_back(new_slice.get());
......
......@@ -23,8 +23,14 @@ public:
FINISHED
};
TaskNode(loom::Id id, loom::Id client_id, int task_type, const std::string &config)
: state(WAITING), id(id), ref_count(0), task_type(task_type),
enum TaskMode {
MODE_STANDARD,
MODE_SIMPLE,
MODE_SCHEDULER
};
TaskNode(loom::Id id, loom::Id client_id, TaskMode mode, int task_type, const std::string &config)
: state(WAITING), id(id), mode(mode), ref_count(0), task_type(task_type),
config(config),
size(0), length(0), client_id(client_id)
{}
......@@ -72,6 +78,10 @@ public:
return ref_count;
}
TaskMode get_mode() const {
return mode;
}
void set_state(State state) {
this->state = state;
}
......@@ -136,6 +146,7 @@ public:
private:
State state;
loom::Id id;
TaskMode mode;
int ref_count;
loom::TaskId task_type;
Vector inputs;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment