Commit 2d7d5103 authored by Stanislav Bohm's avatar Stanislav Bohm

ENH: Plan splitted to Plan and PlanBuilder

parent 492bb781
from .client import Client, LoomException, TaskFailed # noqa
from .plan import Plan, cpus, cpu1 # noqa
from .plan import Plan # noqa
from .planbuilder import PlanBuilder, cpus, cpu1 # noqa
import loomplan_pb2
import loomrun_pb2
import struct
POLICY_STANDARD = loomplan_pb2.Task.POLICY_STANDARD
......@@ -57,36 +54,8 @@ class ResourceRequest(object):
return hash(tuple(sorted(self.resources.items())))
def cpus(value):
r = ResourceRequest()
r.add_resource("loom/resource/cpus", value)
return r
cpu1 = cpus(1)
class Plan(object):
TASK_BASE_GET = "loom/base/get"
TASK_BASE_SLICE = "loom/base/slice"
TASK_BASE_SIZE = "loom/base/size"
TASK_BASE_LENGTH = "loom/base/length"
TASK_DATA_CONST = "loom/data/const"
TASK_DATA_MERGE = "loom/data/merge"
TASK_DATA_OPEN = "loom/data/open"
TASK_DATA_SPLIT = "loom/data/split"
TASK_ARRAY_MAKE = "loom/array/make"
TASK_RUN = "loom/run/run"
TASK_SCHEDULER_DSLICE = "loom/scheduler/dslice"
TASK_SCHEDULER_DGET = "loom/scheduler/dget"
u64 = struct.Struct("<Q")
u64u64 = struct.Struct("<QQ")
def __init__(self):
self.tasks = []
......@@ -96,113 +65,6 @@ class Plan(object):
self.tasks.append(task)
return task
def task_dslice(self, input):
task = Task()
task.task_type = self.TASK_SCHEDULER_DSLICE
task.policy = POLICY_SCHEDULER
task.inputs = (input,)
return self.add(task)
def task_dget(self, input):
task = Task()
task.task_type = self.TASK_SCHEDULER_DGET
task.policy = POLICY_SCHEDULER
task.inputs = (input,)
return self.add(task)
def task_const(self, data):
task = Task()
task.task_type = self.TASK_DATA_CONST
task.config = data
task.policy = POLICY_SIMPLE
return self.add(task)
def task_merge(self, inputs):
task = Task()
task.task_type = self.TASK_DATA_MERGE
task.inputs = inputs
task.policy = POLICY_SIMPLE
return self.add(task)
def task_open(self, filename):
task = Task()
task.task_type = self.TASK_DATA_OPEN
task.config = filename
task.policy = POLICY_SIMPLE
return self.add(task)
def task_split(self, input, char=None):
task = Task()
task.task_type = self.TASK_DATA_SPLIT
task.inputs = (input,)
task.policy = POLICY_SIMPLE
return self.add(task)
def task_run(self,
args,
inputs=(),
outputs=(None,),
stdin=None,
request=cpu1):
if isinstance(args, str):
args = args.split()
if stdin is not None:
inputs = ((stdin, None),) + tuple(inputs)
task = Task()
task.task_type = self.TASK_RUN
task.inputs = tuple(i for i, fname in inputs)
msg = loomrun_pb2.Run()
msg.args.extend(args)
msg.map_inputs.extend(fname if fname else "+in"
for i, fname in inputs)
msg.map_outputs.extend(fname if fname else "+out"
for fname in outputs)
task.config = msg.SerializeToString()
task.resource_request = request
return self.add(task)
def task_array_make(self, inputs):
task = Task()
task.task_type = self.TASK_ARRAY_MAKE
task.inputs = inputs
task.policy = POLICY_SIMPLE
return self.add(task)
def task_size(self, input):
task = Task()
task.task_type = self.TASK_BASE_SIZE
task.inputs = (input,)
task.policy = POLICY_SIMPLE
return self.add(task)
def task_length(self, input):
task = Task()
task.task_type = self.TASK_BASE_LENGTH
task.inputs = (input,)
task.policy = POLICY_SIMPLE
return self.add(task)
def task_get(self, input, index):
task = Task()
task.task_type = self.TASK_BASE_GET
task.inputs = (input,)
task.config = self.u64.pack(index)
task.policy = POLICY_SIMPLE
return self.add(task)
def task_slice(self, input, start, end):
task = Task()
task.task_type = self.TASK_BASE_SLICE
task.inputs = (input,)
task.config = self.u64u64.pack(start, end)
task.policy = POLICY_SIMPLE
return self.add(task)
def set_message(self, msg, symbols):
requests = set()
for task in self.tasks:
......
from .plan import Plan, Task, ResourceRequest
from .plan import POLICY_SCHEDULER, POLICY_SIMPLE
import struct
import loomrun_pb2
def cpus(value):
r = ResourceRequest()
r.add_resource("loom/resource/cpus", value)
return r
cpu1 = cpus(1)
class PlanBuilder(object):
TASK_BASE_GET = "loom/base/get"
TASK_BASE_SLICE = "loom/base/slice"
TASK_BASE_SIZE = "loom/base/size"
TASK_BASE_LENGTH = "loom/base/length"
TASK_DATA_CONST = "loom/data/const"
TASK_DATA_MERGE = "loom/data/merge"
TASK_DATA_OPEN = "loom/data/open"
TASK_DATA_SPLIT = "loom/data/split"
TASK_ARRAY_MAKE = "loom/array/make"
TASK_RUN = "loom/run/run"
TASK_SCHEDULER_DSLICE = "loom/scheduler/dslice"
TASK_SCHEDULER_DGET = "loom/scheduler/dget"
u64 = struct.Struct("<Q")
u64u64 = struct.Struct("<QQ")
def __init__(self, plan=None):
if plan is None:
plan = Plan()
self.plan = plan
def task_dslice(self, input):
task = Task()
task.task_type = self.TASK_SCHEDULER_DSLICE
task.policy = POLICY_SCHEDULER
task.inputs = (input,)
return self.plan.add(task)
def task_dget(self, input):
task = Task()
task.task_type = self.TASK_SCHEDULER_DGET
task.policy = POLICY_SCHEDULER
task.inputs = (input,)
return self.plan.add(task)
def task_const(self, data):
task = Task()
task.task_type = self.TASK_DATA_CONST
task.config = data
task.policy = POLICY_SIMPLE
return self.plan.add(task)
def task_merge(self, inputs):
task = Task()
task.task_type = self.TASK_DATA_MERGE
task.inputs = inputs
task.policy = POLICY_SIMPLE
return self.plan.add(task)
def task_open(self, filename):
task = Task()
task.task_type = self.TASK_DATA_OPEN
task.config = filename
task.policy = POLICY_SIMPLE
return self.plan.add(task)
def task_split(self, input, char=None):
task = Task()
task.task_type = self.TASK_DATA_SPLIT
task.inputs = (input,)
task.policy = POLICY_SIMPLE
return self.plan.add(task)
def task_run(self,
args,
inputs=(),
outputs=(None,),
stdin=None,
request=cpu1):
if isinstance(args, str):
args = args.split()
if stdin is not None:
inputs = ((stdin, None),) + tuple(inputs)
task = Task()
task.task_type = self.TASK_RUN
task.inputs = tuple(i for i, fname in inputs)
msg = loomrun_pb2.Run()
msg.args.extend(args)
msg.map_inputs.extend(fname if fname else "+in"
for i, fname in inputs)
msg.map_outputs.extend(fname if fname else "+out"
for fname in outputs)
task.config = msg.SerializeToString()
task.resource_request = request
return self.plan.add(task)
def task_array_make(self, inputs):
task = Task()
task.task_type = self.TASK_ARRAY_MAKE
task.inputs = inputs
task.policy = POLICY_SIMPLE
return self.plan.add(task)
def task_size(self, input):
task = Task()
task.task_type = self.TASK_BASE_SIZE
task.inputs = (input,)
task.policy = POLICY_SIMPLE
return self.plan.add(task)
def task_length(self, input):
task = Task()
task.task_type = self.TASK_BASE_LENGTH
task.inputs = (input,)
task.policy = POLICY_SIMPLE
return self.plan.add(task)
def task_get(self, input, index):
task = Task()
task.task_type = self.TASK_BASE_GET
task.inputs = (input,)
task.config = self.u64.pack(index)
task.policy = POLICY_SIMPLE
return self.plan.add(task)
def task_slice(self, input, start, end):
task = Task()
task.task_type = self.TASK_BASE_SLICE
task.inputs = (input,)
task.config = self.u64u64.pack(start, end)
task.policy = POLICY_SIMPLE
return self.plan.add(task)
......@@ -6,7 +6,7 @@ loom_env # silence flake8
def test_make_array(loom_env):
loom_env.start(1)
p = loom_env.plan()
p = loom_env.plan_builder()
a = p.task_const("ABC")
b = p.task_const("123456")
c = p.task_const("")
......@@ -27,7 +27,7 @@ def test_make_array(loom_env):
def test_slice_array(loom_env):
loom_env.start(1)
p = loom_env.plan()
p = loom_env.plan_builder()
items = [p.task_const(str(i)) for i in xrange(20)]
a = p.task_array_make(items)
......@@ -47,7 +47,7 @@ def test_slice_array(loom_env):
def test_make_empty_array(loom_env):
p = loom_env.plan()
p = loom_env.plan_builder()
a = p.task_array_make(())
loom_env.start(1)
result_a = loom_env.submit(p, a)
......@@ -55,7 +55,7 @@ def test_make_empty_array(loom_env):
def test_array_of_array(loom_env):
p = loom_env.plan()
p = loom_env.plan_builder()
a = p.task_const("ABC")
b = p.task_const("123")
......@@ -73,7 +73,7 @@ def test_array_of_array(loom_env):
def test_array_same_value(loom_env):
p = loom_env.plan()
p = loom_env.plan_builder()
a = p.task_const("ABC")
b = p.task_array_make((a, a, a, a))
loom_env.start(1)
......
......@@ -14,7 +14,7 @@ def test_cv_iris(loom_env):
loom_env.start(4, 4)
loom_env.info = False
p = loom_env.plan()
p = loom_env.plan_builder()
data = p.task_open(IRIS_DATA)
data = p.task_run(("sort", "--random-sort", "-"), [(data, None)])
lines = p.task_split(data)
......
......@@ -32,7 +32,7 @@ def str2datetime(s):
def test_single_result(loom_env):
loom_env.start(1)
p = loom_env.plan()
p = loom_env.plan_builder()
a = p.task_const("ABCDE")
b = p.task_const("123")
......@@ -42,7 +42,7 @@ def test_single_result(loom_env):
def test_more_results(loom_env):
loom_env.start(1)
p = loom_env.plan()
p = loom_env.plan_builder()
a = p.task_const("ABCDE")
b = p.task_const("123")
......@@ -52,7 +52,7 @@ def test_more_results(loom_env):
def test_merge_w3(loom_env):
loom_env.start(3)
p = loom_env.plan()
p = loom_env.plan_builder()
a = p.task_const("ABCDE")
b = p.task_const("123")
......@@ -62,7 +62,7 @@ def test_merge_w3(loom_env):
def test_run_separated_1_cpu(loom_env):
loom_env.start(1)
p = loom_env.plan()
p = loom_env.plan_builder()
args = pytestprog(0.3, stamp=True)
tasks = [p.task_run(args) for i in range(4)]
......@@ -86,7 +86,7 @@ def test_run_separated_1_cpu(loom_env):
def test_run_separated_4_cpu(loom_env):
loom_env.start(1, cpus=4)
p = loom_env.plan()
p = loom_env.plan_builder()
args = pytestprog(0.3, stamp=True)
tasks = [p.task_run(args) for i in range(4)]
......@@ -110,7 +110,7 @@ def test_run_separated_4_cpu(loom_env):
def test_run_separated_4cpu_tasks_4_cpu(loom_env):
loom_env.start(1, cpus=4)
p = loom_env.plan()
p = loom_env.plan_builder()
args = pytestprog(0.3, stamp=True)
tasks = [p.task_run(args, request=client.cpus(4)) for i in range(4)]
......@@ -133,7 +133,7 @@ def test_run_separated_4cpu_tasks_4_cpu(loom_env):
def test_run_double_lines(loom_env):
p = loom_env.plan()
p = loom_env.plan_builder()
COUNT = 20000
......@@ -157,7 +157,7 @@ def test_run_double_lines(loom_env):
def test_run_files(loom_env):
p = loom_env.plan()
p = loom_env.plan_builder()
a1 = p.task_const("abcd" * 100)
b1 = p.task_run(
......@@ -177,7 +177,7 @@ def test_run_files(loom_env):
def test_open_and_merge(loom_env):
p = loom_env.plan()
p = loom_env.plan_builder()
a = p.task_open(FILE1)
b = p.task_open(FILE2)
c = p.task_merge((a, b))
......@@ -192,7 +192,7 @@ def test_open_and_merge(loom_env):
def test_open_and_splitlines(loom_env):
loom_env.start(1)
p = loom_env.plan()
p = loom_env.plan_builder()
a = p.task_open(FILE2)
lines = p.task_split(a)
c1 = p.task_slice(lines, 2, 6)
......@@ -212,7 +212,7 @@ def test_open_and_splitlines(loom_env):
def test_split(loom_env):
loom_env.start(1)
text = "Line1\nLine2\nLine3\nLine4"
p = loom_env.plan()
p = loom_env.plan_builder()
a = p.task_const(text)
b = p.task_split(a)
......@@ -232,7 +232,7 @@ def test_split(loom_env):
def test_size_and_length(loom_env):
loom_env.start(1)
p = loom_env.plan()
p = loom_env.plan_builder()
text = "12345" * 5
a1 = p.task_const(text)
......
......@@ -7,7 +7,7 @@ loom_env # silence flake8
def test_dslice(loom_env):
loom_env.start(2)
p = loom_env.plan()
p = loom_env.plan_builder()
consts = []
for i in xrange(16):
......@@ -24,7 +24,7 @@ def test_dslice(loom_env):
def test_dget(loom_env):
loom_env.start(2)
p = loom_env.plan()
p = loom_env.plan_builder()
consts = []
for i in xrange(16):
......
......@@ -8,7 +8,7 @@ loom_env # silence flake8
def test_invalid_program(loom_env):
loom_env.start(1)
p = loom_env.plan()
p = loom_env.plan_builder()
a = p.task_run("/usr/bin/non-existing-program")
with pytest.raises(client.TaskFailed):
......@@ -17,7 +17,7 @@ def test_invalid_program(loom_env):
def test_program_failed(loom_env):
loom_env.start(1)
p = loom_env.plan()
p = loom_env.plan_builder()
a = p.task_run("ls /non-existing-dictionary")
with pytest.raises(client.TaskFailed):
......
......@@ -81,8 +81,8 @@ class LoomEnv(Env):
assert not server.poll()
assert not any(w.poll() for w in workers)
def plan(self):
return client.Plan()
def plan_builder(self):
return client.PlanBuilder()
@property
def client(self):
......@@ -91,6 +91,8 @@ class LoomEnv(Env):
return self._client
def submit(self, plan, results, report=None):
if isinstance(plan, client.PlanBuilder):
plan = plan.plan
if report:
report = os.path.join(LOOM_TEST_BUILD_DIR, report)
return self.client.submit(plan, results, report)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment