Commit 451319ed authored by Stanislav Bohm

ENH: New Python API for client

parent ddf5a42d
from .client import Client, LoomException, TaskFailed, make_dry_report # noqa
from .plan import Plan # noqa
from .planbuilder import PlanBuilder, cpus, cpu1 # noqa
from client import Client, LoomException, TaskFailed, make_dry_report # noqa
import tasks # noqa
......@@ -3,7 +3,8 @@ from ..pb.loomreport_pb2 import Report
import socket
from connection import Connection
from plan import Task
from task import Task
from plan import Plan
LOOM_PROTOCOL_VERSION = 1
......@@ -45,20 +46,25 @@ class Client(object):
while self.symbols is None:
self._read_symbols()
def submit(self, plan, results, report=None):
msg = ClientSubmit()
msg.report = bool(report)
plan.set_message(msg.plan, self.symbols)
if isinstance(results, Task):
def submit(self, tasks, report=None):
if isinstance(tasks, Task):
single_result = True
msg.plan.result_ids.extend((results.id,))
expected = 1
tasks = (tasks,)
else:
single_result = False
r = set(results)
msg.plan.result_ids.extend(r.id for r in r)
expected = len(r)
task_set = set(tasks)
plan = Plan()
for task in task_set:
plan.add(task)
msg = ClientSubmit()
msg.report = bool(report)
msg.plan.result_ids.extend(plan.tasks[t] for t in task_set)
expected = len(task_set)
plan.set_message(msg.plan, self.symbols)
self._send_message(msg)
......@@ -84,9 +90,9 @@ class Client(object):
write_report(report_data, report)
if single_result:
return data[results.id]
return data[plan.tasks[tasks[0]]]
else:
return [data[task.id] for task in results]
return [data[plan.tasks[t]] for t in tasks]
def _symbol_list(self):
symbols = [None] * len(self.symbols)
......@@ -136,7 +142,14 @@ class Client(object):
self.connection.send_message(data)
def make_dry_report(plan, report_filename):
def make_dry_report(tasks, report_filename):
if isinstance(tasks, Task):
tasks = (tasks,)
plan = Plan()
for task in tasks:
plan.add(task)
# Create symbols
symbols = sorted(plan.collect_symbols())
symbol_table = {}
......
from ..pb import loomplan_pb2 as loomplan
# Task policy values, aliased from loomplan.Task so that other client modules
# can import them from here instead of reaching into the pb package.
POLICY_STANDARD = loomplan.Task.POLICY_STANDARD
POLICY_SIMPLE = loomplan.Task.POLICY_SIMPLE
POLICY_SCHEDULER = loomplan.Task.POLICY_SCHEDULER
class Task(object):
    """One node of a plan.

    The class-level attributes are defaults; code that builds a task
    overwrites the ones it needs on the instance. ``id`` is ``None`` until
    the task is added to a plan.
    """

    task_type = None
    inputs = ()
    id = None
    config = ""
    policy = POLICY_STANDARD
    resource_request = None
    label = None

    def set_message(self, msg, symbols, requests):
        """Copy this task into the message ``msg``.

        ``symbols`` is indexed by the task type name to obtain the value
        stored in ``msg.task_type``. ``requests`` is a sequence in which the
        position of ``self.resource_request`` is looked up; it is consulted
        only when the task has a resource request.
        """
        msg.config = self.config
        msg.task_type = symbols[self.task_type]
        # Inputs are referenced by the ids of the tasks that produce them.
        input_ids = [source.id for source in self.inputs]
        msg.input_ids.extend(input_ids)
        msg.policy = self.policy
        if self.resource_request:
            request_index = requests.index(self.resource_request)
            msg.resource_request_index = request_index
        if self.label:
            msg.label = self.label
class ResourceRequest(object):
    """A set of named resource amounts requested by a task.

    Instances compare and hash by the content of ``resources``, so equal
    requests collapse into a single entry when they are collected in a set.
    Because the hash depends on ``resources``, do not call ``add_resource``
    on an instance that is already stored in a set or used as a dict key.
    """

    def __init__(self):
        self.resources = {}

    def add_resource(self, name, value):
        """Set the requested amount of resource ``name`` to ``value``."""
        self.resources[name] = value

    @property
    def names(self):
        """Names of all requested resources."""
        return self.resources.keys()

    def set_message(self, msg, symbols):
        """Add one entry to ``msg.resources`` for every requested resource.

        ``symbols`` is indexed by the resource name to obtain the value
        stored in the entry's ``resource_type``.
        """
        for name, value in self.resources.items():
            r = msg.resources.add()
            r.resource_type = symbols[name]
            r.value = value

    def __eq__(self, other):
        if not isinstance(other, ResourceRequest):
            # Defer to the other operand instead of claiming inequality;
            # ``request == foreign`` still evaluates to False in the end.
            return NotImplemented
        return self.resources == other.resources

    def __ne__(self, other):
        # Defined explicitly because Python 2 does not derive it from __eq__.
        equal = self.__eq__(other)
        if equal is NotImplemented:
            return equal
        return not equal

    def __hash__(self):
        # Sorting makes the hash independent of dict iteration order.
        return hash(tuple(sorted(self.resources.items())))
class Plan(object):
def __init__(self):
self.tasks = []
self.tasks = {}
def add(self, task):
assert task.id is None
task.id = len(self.tasks)
self.tasks.append(task)
return task
def give_id(task):
if task not in self.tasks:
for t in task.inputs:
give_id(t)
self.tasks[task] = len(self.tasks)
give_id(task)
def collect_symbols(self):
symbols = set()
......@@ -79,16 +22,34 @@ class Plan(object):
def set_message(self, msg, symbols):
requests = set()
# Linearize tasks
tasks = [None] * len(self.tasks)
for task, index in self.tasks.items():
tasks[index] = task
# Gather requests
requests = set()
for task in self.tasks:
if task.resource_request:
requests.add(task.resource_request)
requests = list(requests)
# Build requests
for request in requests:
r = msg.resource_requests.add()
request.set_message(r, symbols)
for task in self.tasks:
t = msg.tasks.add()
task.set_message(t, symbols, requests)
# Build tasks
for task in tasks:
msg_t = msg.tasks.add()
msg_t.config = task.config
msg_t.task_type = symbols[task.task_type]
msg_t.input_ids.extend(self.tasks[t] for t in task.inputs)
msg_t.policy = task.policy
if task.resource_request:
msg_t.resource_request_index = \
requests.index(task.resource_request)
if task.label:
msg_t.label = task.label
return msg
from .plan import Plan, Task, ResourceRequest
from .plan import POLICY_SCHEDULER, POLICY_SIMPLE
import struct
from ..pb import loomrun_pb2 as loomrun
def cpus(value):
    """Return a ResourceRequest asking for ``value`` of "loom/resource/cpus"."""
    request = ResourceRequest()
    request.add_resource("loom/resource/cpus", value)
    return request


# Shared one-CPU request; used as the default ``request`` of ``task_run``.
cpu1 = cpus(1)
class PlanBuilder(object):
    """Creates tasks of the known task types and adds them to a plan.

    Every ``task_*`` method builds one ``Task``, registers it through
    ``self.plan.add`` and returns whatever ``add`` returns.
    """

    TASK_BASE_GET = "loom/base/get"
    TASK_BASE_SLICE = "loom/base/slice"
    TASK_BASE_SIZE = "loom/base/size"
    TASK_BASE_LENGTH = "loom/base/length"

    TASK_DATA_CONST = "loom/data/const"
    TASK_DATA_MERGE = "loom/data/merge"
    TASK_DATA_OPEN = "loom/data/open"
    TASK_DATA_SPLIT = "loom/data/split"

    TASK_ARRAY_MAKE = "loom/array/make"

    TASK_RUN = "loom/run/run"

    TASK_SCHEDULER_DSLICE = "loom/scheduler/dslice"
    TASK_SCHEDULER_DGET = "loom/scheduler/dget"

    # Packers for one / two little-endian unsigned 64-bit integers; they
    # encode the config of the get and slice tasks.
    u64 = struct.Struct("<Q")
    u64u64 = struct.Struct("<QQ")

    def __init__(self, plan=None):
        """Build into ``plan``, or into a fresh ``Plan`` when none is given."""
        if plan is None:
            plan = Plan()
        self.plan = plan

    def _add_task(self, task_type, **attributes):
        """Create a task of ``task_type``, set ``attributes`` on it, add it.

        Only the attributes passed in are set on the instance; everything
        else keeps the ``Task`` class default.
        """
        task = Task()
        task.task_type = task_type
        for name, value in attributes.items():
            setattr(task, name, value)
        return self.plan.add(task)

    def task_dslice(self, input):
        """Add a scheduler-policy dslice task with the single input ``input``."""
        return self._add_task(self.TASK_SCHEDULER_DSLICE,
                              policy=POLICY_SCHEDULER,
                              inputs=(input,))

    def task_dget(self, input):
        """Add a scheduler-policy dget task with the single input ``input``."""
        return self._add_task(self.TASK_SCHEDULER_DGET,
                              policy=POLICY_SCHEDULER,
                              inputs=(input,))

    def task_const(self, data):
        """Add a const task whose config is ``data``."""
        return self._add_task(self.TASK_DATA_CONST,
                              config=data,
                              policy=POLICY_SIMPLE)

    def task_merge(self, inputs, delimiter=""):
        """Add a merge task over ``inputs``.

        A non-empty ``delimiter`` is stored as the task config; otherwise
        the config keeps its default.
        """
        attributes = {"inputs": inputs, "policy": POLICY_SIMPLE}
        if delimiter:
            attributes["config"] = delimiter
        return self._add_task(self.TASK_DATA_MERGE, **attributes)

    def task_open(self, filename):
        """Add an open task whose config is ``filename``."""
        return self._add_task(self.TASK_DATA_OPEN,
                              config=filename,
                              policy=POLICY_SIMPLE)

    def task_split(self, input, char=None):
        """Add a split task with the single input ``input``.

        NOTE(review): ``char`` is accepted but not used by this method.
        """
        return self._add_task(self.TASK_DATA_SPLIT,
                              inputs=(input,),
                              policy=POLICY_SIMPLE)

    def task_run(self,
                 args,
                 inputs=(),
                 outputs=(None,),
                 stdin=None,
                 request=cpu1):
        """Add a run task.

        ``args`` is a sequence of arguments, or a string that is split on
        whitespace. ``inputs`` is an iterable of ``(task, filename)`` pairs
        and ``outputs`` an iterable of filenames; a falsy filename is mapped
        to ``"+in"`` / ``"+out"`` respectively. When ``stdin`` is given it is
        prepended to the inputs as ``(stdin, None)``. ``request`` becomes
        the task's resource request.
        """
        if isinstance(args, str):
            args = args.split()

        # Materialize once: the pairs are walked twice below (tasks and
        # filenames), which would silently drop the filenames of a one-shot
        # iterable such as a generator.
        inputs = tuple(inputs)
        if stdin is not None:
            inputs = ((stdin, None),) + inputs

        msg = loomrun.Run()
        msg.args.extend(args)
        msg.map_inputs.extend(fname if fname else "+in"
                              for i, fname in inputs)
        msg.map_outputs.extend(fname if fname else "+out"
                               for fname in outputs)

        return self._add_task(self.TASK_RUN,
                              inputs=tuple(i for i, fname in inputs),
                              config=msg.SerializeToString(),
                              resource_request=request)

    def task_array_make(self, inputs):
        """Add an array-make task over ``inputs``."""
        return self._add_task(self.TASK_ARRAY_MAKE,
                              inputs=inputs,
                              policy=POLICY_SIMPLE)

    def task_size(self, input):
        """Add a size task with the single input ``input``."""
        return self._add_task(self.TASK_BASE_SIZE,
                              inputs=(input,),
                              policy=POLICY_SIMPLE)

    def task_length(self, input):
        """Add a length task with the single input ``input``."""
        return self._add_task(self.TASK_BASE_LENGTH,
                              inputs=(input,),
                              policy=POLICY_SIMPLE)

    def task_get(self, input, index):
        """Add a get task on ``input``; ``index`` is packed as one u64."""
        return self._add_task(self.TASK_BASE_GET,
                              inputs=(input,),
                              config=self.u64.pack(index),
                              policy=POLICY_SIMPLE)

    def task_slice(self, input, start, end):
        """Add a slice task on ``input``; ``start``/``end`` are two u64s."""
        return self._add_task(self.TASK_BASE_SLICE,
                              inputs=(input,),
                              config=self.u64u64.pack(start, end),
                              policy=POLICY_SIMPLE)
from ..pb import loomplan_pb2 as loomplan
# Task policy values, aliased from loomplan.Task so that other client modules
# can import them from here instead of reaching into the pb package.
POLICY_STANDARD = loomplan.Task.POLICY_STANDARD
POLICY_SIMPLE = loomplan.Task.POLICY_SIMPLE
POLICY_SCHEDULER = loomplan.Task.POLICY_SCHEDULER
class Task(object):
    """Description of one task.

    The class-level attributes are defaults; code that builds a task
    overwrites the ones it needs on the instance.
    """

    task_type = None
    inputs = ()
    config = ""
    policy = POLICY_STANDARD
    resource_request = None
    label = None

    def __repr__(self):
        # Prefer the label; fall back to the task type when it is unset
        # or empty.
        shown = self.label or self.task_type
        return "<Task '{}'>".format(shown)
class ResourceRequest(object):
    """A set of named resource amounts requested by a task.

    Instances compare and hash by the content of ``resources``, so equal
    requests collapse into a single entry when they are collected in a set.
    Because the hash depends on ``resources``, do not call ``add_resource``
    on an instance that is already stored in a set or used as a dict key.
    """

    def __init__(self):
        self.resources = {}

    def add_resource(self, name, value):
        """Set the requested amount of resource ``name`` to ``value``."""
        self.resources[name] = value

    @property
    def names(self):
        """Names of all requested resources."""
        return self.resources.keys()

    def set_message(self, msg, symbols):
        """Add one entry to ``msg.resources`` for every requested resource.

        ``symbols`` is indexed by the resource name to obtain the value
        stored in the entry's ``resource_type``.
        """
        for name, value in self.resources.items():
            r = msg.resources.add()
            r.resource_type = symbols[name]
            r.value = value

    def __eq__(self, other):
        if not isinstance(other, ResourceRequest):
            # Defer to the other operand instead of claiming inequality;
            # ``request == foreign`` still evaluates to False in the end.
            return NotImplemented
        return self.resources == other.resources

    def __ne__(self, other):
        # Defined explicitly because Python 2 does not derive it from __eq__.
        equal = self.__eq__(other)
        if equal is NotImplemented:
            return equal
        return not equal

    def __hash__(self):
        # Sorting makes the hash independent of dict iteration order.
        return hash(tuple(sorted(self.resources.items())))
from .task import Task, ResourceRequest
from .task import POLICY_SCHEDULER, POLICY_SIMPLE
import struct
from ..pb import loomrun_pb2 as loomrun
def cpus(value):
    """Return a ResourceRequest asking for ``value`` of "loom/resource/cpus"."""
    request = ResourceRequest()
    request.add_resource("loom/resource/cpus", value)
    return request


# Shared one-CPU request; used as the default ``request`` of ``run``.
cpu1 = cpus(1)
# Task type names assigned to ``Task.task_type`` by the factory functions
# below.
BASE_GET = "loom/base/get"
BASE_SLICE = "loom/base/slice"
BASE_SIZE = "loom/base/size"
BASE_LENGTH = "loom/base/length"
DATA_CONST = "loom/data/const"
DATA_MERGE = "loom/data/merge"
DATA_OPEN = "loom/data/open"
DATA_SPLIT = "loom/data/split"
ARRAY_MAKE = "loom/array/make"
RUN = "loom/run/run"
SCHEDULER_DSLICE = "loom/scheduler/dslice"
SCHEDULER_DGET = "loom/scheduler/dget"
# Packers for one / two little-endian unsigned 64-bit integers; ``get`` and
# ``slice`` use them to encode their task config.
u64 = struct.Struct("<Q")
u64u64 = struct.Struct("<QQ")
def dslice(input):
    """Return a scheduler-policy dslice task with the single input ``input``."""
    node = Task()
    node.inputs = (input,)
    node.policy = POLICY_SCHEDULER
    node.task_type = SCHEDULER_DSLICE
    return node
def dget(input):
    """Return a scheduler-policy dget task with the single input ``input``."""
    node = Task()
    node.inputs = (input,)
    node.policy = POLICY_SCHEDULER
    node.task_type = SCHEDULER_DGET
    return node
def const(data):
    """Return a const task whose config is ``data``."""
    node = Task()
    node.policy = POLICY_SIMPLE
    node.task_type = DATA_CONST
    node.config = data
    return node
def merge(inputs, delimiter=""):
    """Return a merge task over ``inputs``.

    ``inputs`` may be any iterable of tasks. ``delimiter`` must be a ``str``;
    when non-empty it is stored as the task config, otherwise the config
    keeps its default.
    """
    assert isinstance(delimiter, str)
    task = Task()
    task.task_type = DATA_MERGE
    # Snapshot the inputs: they are iterated more than once while a plan is
    # built, so a one-shot iterable (e.g. a generator) would be exhausted
    # before the task is serialized, and a caller's list must not be able to
    # change the task afterwards.
    task.inputs = tuple(inputs)
    task.policy = POLICY_SIMPLE
    if delimiter:
        task.config = delimiter
    return task
def open(filename):
    """Return an open task whose config is ``filename``."""
    node = Task()
    node.policy = POLICY_SIMPLE
    node.task_type = DATA_OPEN
    node.config = filename
    return node
def split(input, char=None):
    """Return a split task with the single input ``input``.

    NOTE(review): ``char`` is accepted but not used by this function.
    """
    node = Task()
    node.policy = POLICY_SIMPLE
    node.inputs = (input,)
    node.task_type = DATA_SPLIT
    return node
def run(args,
        inputs=(),
        outputs=(None,),
        stdin=None,
        request=cpu1):
    """Return a run task.

    ``args`` is a sequence of arguments, or a string that is split on
    whitespace. ``inputs`` is an iterable of ``(task, filename)`` pairs and
    ``outputs`` an iterable of filenames; a falsy filename is mapped to
    ``"+in"`` / ``"+out"`` respectively. When ``stdin`` is given it is
    prepended to the inputs as ``(stdin, None)``. ``request`` becomes the
    task's resource request.
    """
    if isinstance(args, str):
        args = args.split()

    # Materialize once: the pairs are walked twice below (tasks and
    # filenames), which would silently drop the filenames of a one-shot
    # iterable such as a generator.
    inputs = tuple(inputs)
    if stdin is not None:
        inputs = ((stdin, None),) + inputs

    task = Task()
    task.task_type = RUN
    task.inputs = tuple(i for i, fname in inputs)

    msg = loomrun.Run()
    msg.args.extend(args)
    msg.map_inputs.extend(fname if fname else "+in"
                          for i, fname in inputs)
    msg.map_outputs.extend(fname if fname else "+out"
                           for fname in outputs)
    task.config = msg.SerializeToString()
    task.resource_request = request
    return task
def array_make(inputs):
    """Return an array-make task over ``inputs``.

    ``inputs`` may be any iterable of tasks.
    """
    task = Task()
    task.task_type = ARRAY_MAKE
    # Snapshot the inputs: they are iterated more than once while a plan is
    # built, so a one-shot iterable (e.g. a generator) would be exhausted
    # before the task is serialized, and a caller's list must not be able to
    # change the task afterwards.
    task.inputs = tuple(inputs)
    task.policy = POLICY_SIMPLE
    return task
def size(input):
    """Return a size task with the single input ``input``."""
    node = Task()
    node.policy = POLICY_SIMPLE
    node.inputs = (input,)
    node.task_type = BASE_SIZE
    return node
def length(input):
    """Return a length task with the single input ``input``."""
    node = Task()
    node.policy = POLICY_SIMPLE
    node.inputs = (input,)
    node.task_type = BASE_LENGTH
    return node
def get(input, index):
    """Return a get task on ``input``; ``index`` is packed as one u64."""
    packed_index = u64.pack(index)
    node = Task()
    node.policy = POLICY_SIMPLE
    node.inputs = (input,)
    node.task_type = BASE_GET
    node.config = packed_index
    return node
def slice(input, start, end):
    """Return a slice task on ``input``; ``start``/``end`` are two u64s."""
    packed_bounds = u64u64.pack(start, end)
    node = Task()
    node.policy = POLICY_SIMPLE
    node.inputs = (input,)
    node.task_type = BASE_SLICE
    node.config = packed_bounds
    return node
from loomenv import loom_env, LOOM_TESTPROG, LOOM_TEST_DATA_DIR # noqa
import loom.client.tasks as tasks # noqa
loom_env # silence flake8
......@@ -6,18 +7,17 @@ loom_env # silence flake8
def test_make_array(loom_env):
loom_env.start(1)
p = loom_env.plan_builder()
a = p.task_const("ABC")
b = p.task_const("123456")
c = p.task_const("")
d = p.task_array_make((a, b, c))
a = tasks.const("ABC")
b = tasks.const("123456")
c = tasks.const("")
d = tasks.array_make((a, b, c))
e0 = p.task_get(d, 0)
e1 = p.task_get(d, 1)
e2 = p.task_get(d, 2)
e0 = tasks.get(d, 0)
e1 = tasks.get(d, 1)
e2 = tasks.get(d, 2)
result_d, result_e0, result_e1, result_e2 = \
loom_env.submit(p, (d, e0, e1, e2))
loom_env.submit((d, e0, e1, e2))
assert result_d == ["ABC", "123456", ""]
assert result_e0 == "ABC"
assert result_e1 == "123456"
......@@ -27,18 +27,17 @@ def test_make_array(loom_env):
def test_slice_array(loom_env):
loom_env.start(1)
p = loom_env.plan_builder()
items = [p.task_const(str(i)) for i in xrange(20)]
a = p.task_array_make(items)
items = [tasks.const(str(i)) for i in xrange(20)]
a = tasks.array_make(items)
e0 = p.task_slice(a, 0, 100)
e1 = p.task_slice(a, 50, 100)
e2 = p.task_slice(a, 2, 4)
e3 = p.task_slice(a, 1, 0)
e4 = p.task_slice(a, 4, 100)
e0 = tasks.slice(a, 0, 100)
e1 = tasks.slice(a, 50, 100)
e2 = tasks.slice(a, 2, 4)
e3 = tasks.slice(a, 1, 0)
e4 = tasks.slice(a, 4, 100)
r0, r1, r2, r3, r4 = \
loom_env.submit(p, (e0, e1, e2, e3, e4))
loom_env.submit((e0, e1, e2, e3, e4))
assert r0 == list(map(str, range(20)))
assert r1 == []
assert r2 == ['2', '3']
......@@ -47,34 +46,30 @@ def test_slice_array(loom_env):
def test_make_empty_array(loom_env):
p = loom_env.plan_builder()
a = p.task_array_make(())
a = tasks.array_make(())
loom_env.start(1)
result_a = loom_env.submit(p, a)
result_a = loom_env.submit(a)
assert result_a == []
def test_array_of_array(loom_env):
p = loom_env.plan_builder()
a = tasks.const("ABC")
b = tasks.const("123")
c = tasks.const("+++")
a = p.task_const("ABC")
b = p.task_const("123")
c = p.task_const("+++")
d = p.task_array_make((a, b))
e = p.task_array_make((d, c))
f = p.task_get(e, 0)
d = tasks.array_make((a, b))
e = tasks.array_make((d, c))
f = tasks.get(e, 0)
loom_env.start(1)
result_d, result_e, result_f = loom_env.submit(p, (d, e, f))
result_d, result_e, result_f = loom_env.submit((d, e, f))
assert result_d == ["ABC", "123"]
assert result_e == [["ABC", "123"], "+++"]
assert result_f == ["ABC", "123"]
def test_array_same_value(loom_env):
p = loom_env.plan_builder()
a = p.task_const("ABC")
b = p.task_array_make((a, a, a, a))
a = tasks.const("ABC")
b = tasks.array_make((a, a, a, a))
loom_env.start(1)
assert ["ABC"] * 4 == loom_env.submit(p, b)
assert ["ABC"] * 4 == loom_env.submit(b)
from loomenv import loom_env, LOOM_TESTPROG, LOOM_TEST_DATA_DIR # noqa
import loom.client.tasks as tasks # noqa
import os
......@@ -14,34 +15,33 @@ def test_cv_iris(loom_env):
loom_env.start(4, 4)
loom_env.info = False
p = loom_env.plan_builder()
data = p.task_open(IRIS_DATA)
data = p.task_run(("sort", "--random-sort", "-"), [(data, None)])
lines = p.task_split(data)
data = tasks.open(IRIS_DATA)
data = tasks.run(("sort", "--random-sort", "-"), [(data, None)])
lines = tasks.split(data)
chunks = [p.task_slice(lines, i * CHUNK_SIZE, (i + 1) * CHUNK_SIZE)
chunks = [tasks.slice(lines, i * CHUNK_SIZE, (i + 1) * CHUNK_SIZE)
for i in xrange(CHUNKS)]
trainsets = [p.task_merge(chunks[:i] + chunks[i + 1:])
trainsets = [tasks.merge(chunks[:i] + chunks[i + 1:])
for i in xrange(CHUNKS)]
models = []
for ts in trainsets:
model = p.task_run("svm-train data",
model = tasks.run("svm-train data",
[(ts, "data")], ["data.model"])
model.label = "svm-train"
models.append(model)
predict = []
for chunk, model in zip(chunks, models):
task = p.task_run("svm-predict testdata model out",
task = tasks.run("svm-predict testdata model out",
[(chunk, "testdata"), (model, "model")])
task.label = "svm-predict"
predict.append(task)
loom_env.make_dry_report(p.plan, "dry.report")
loom_env.make_dry_report(predict, "dry.report")
results = loom_env.submit(p, predict, report="cv.report")
results = loom_env.submit(predict, report="cv.report")
assert len(results) == CHUNKS
for line in results:
......