Commit 09978eea authored by Stanislav Bohm's avatar Stanislav Bohm

RF: Task refactored

parent e98ad40e
......@@ -26,122 +26,19 @@ class Task(object):
msg.input_ids.extend(t.id for t in self.inputs)
class ConstTask(Task):
task_type = "const"
def __init__(self, data):
self.config = data
class MergeTask(Task):
task_type = "merge"
def __init__(self, inputs):
self.inputs = inputs
class OpenTask(Task):
task_type = "open"
def __init__(self, filename):
self.config = filename
class SplitLinesTask(Task):
task_type = "split_lines"
struct = struct.Struct("<QQ")
def __init__(self, input, start, end):
self.config = self.struct.pack(start, end)
self.inputs = (input,)
class RunTask(Task):
task_type = "run"
def __init__(self, args, stdin=None, stdout=None, variable=None):
if isinstance(args, str):
args = args.split()
else:
args = map(str, args)
self.args = args
self.variable = variable
self.stdout = None
self.stdin = stdin
self.stdout = stdout
self.file_ins = []
self.file_outs = []
def map_file_in(self, task, filename):
self.file_ins.append((task, filename))
return self
def map_file_out(self, filename):
self.file_outs.append(filename)
return self
@property
def inputs(self):
if self.stdin is not None:
inputs = [self.stdin]
else:
inputs = []
for task, filename in self.file_ins:
if task not in inputs:
inputs.append(task)
return inputs
@property
def config(self):
msg = loomrun_pb2.Run()
msg.args.extend(self.args)
# stdin
if self.stdin is not None:
mf = msg.maps.add()
mf.filename = "+in"
mf.input_index = 0
mf.output_index = -2
output_mapped = False
for filename in self.file_outs:
mf = msg.maps.add()
mf.filename = filename
mf.input_index = -2
mf.output_index = -1
output_mapped = True
if not output_mapped:
# stdout
mf = msg.maps.add()
mf.filename = "+out"
mf.input_index = -2
mf.output_index = -1
inputs = self.inputs
for task, filename in self.file_ins:
mf = msg.maps.add()
mf.filename = filename
mf.input_index = inputs.index(task)
mf.output_index = -2
return msg.SerializeToString()
class Plan(object):
TASK_DATA_CONST = "data/const"
TASK_DATA_MERGE = "data/merge"
TASK_DATA_OPEN = "data/open"
TASK_DATA_SPLIT_LINES = "data/split_lines"
TASK_ARRAY_MAKE = "array/make"
TASK_ARRAY_GET = "array/get"
TASK_RUN = "run"
TASK_RUN = "run/run"
u64 = struct.Struct("<Q")
u64u64 = struct.Struct("<QQ")
def __init__(self):
self.tasks = []
......@@ -155,16 +52,29 @@ class Plan(object):
return task
def task_const(self, data):
return self.add(ConstTask(data))
def task_open(self, filename):
return self.add(OpenTask(filename))
task = Task()
task.task_type = self.TASK_DATA_CONST
task.config = data
return self.add(task)
def task_merge(self, inputs):
return self.add(MergeTask(inputs))
task = Task()
task.task_type = self.TASK_DATA_MERGE
task.inputs = inputs
return self.add(task)
def task_open(self, filename):
task = Task()
task.task_type = self.TASK_DATA_OPEN
task.config = filename
return self.add(task)
def task_split_lines(self, input, start, end):
return self.add(SplitLinesTask(input, start, end))
task = Task()
task.task_type = self.TASK_DATA_SPLIT_LINES
task.config = self.u64u64.pack(start, end)
task.inputs = (input,)
return self.add(task)
def task_run(self, args, inputs=(), outputs=(None,), stdin=None):
if isinstance(args, str):
......
......@@ -114,15 +114,15 @@ int main(int argc, char **argv)
// Basic
worker.add_task_factory(
std::make_unique<SimpleTaskFactory<RunTask>>("run"));
std::make_unique<SimpleTaskFactory<RunTask>>("run/run"));
worker.add_task_factory(
std::make_unique<SimpleTaskFactory<ConstTask>>("const"));
std::make_unique<SimpleTaskFactory<ConstTask>>("data/const"));
worker.add_task_factory(
std::make_unique<SimpleTaskFactory<MergeTask>>("merge"));
std::make_unique<SimpleTaskFactory<MergeTask>>("data/merge"));
worker.add_task_factory(
std::make_unique<SimpleTaskFactory<OpenTask>>("open"));
std::make_unique<SimpleTaskFactory<OpenTask>>("data/open"));
worker.add_task_factory(
std::make_unique<SimpleTaskFactory<LineSplitTask>>("split_lines"));
std::make_unique<SimpleTaskFactory<LineSplitTask>>("data/split_lines"));
// Arrays
worker.add_task_factory(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment