Newer
Older
# Policy constants aliased from the loomplan_pb2.Task protobuf message so
# they can be referenced from this module without going through loomplan_pb2.
POLICY_STANDARD = loomplan_pb2.Task.POLICY_STANDARD
POLICY_SIMPLE = loomplan_pb2.Task.POLICY_SIMPLE
POLICY_SCHEDULER = loomplan_pb2.Task.POLICY_SCHEDULER
# A single task of a plan. The class-level attributes below act as defaults
# that individual instances override by assignment.
class Task(object):
# Inputs of the task; empty tuple by default.
inputs = ()
# Stays None until an id is assigned to the task.
id = None
# Task configuration payload; empty string by default.
config = ""
# Look up this task's type in `symbols` and store the result on the message.
msg.task_type = symbols[self.task_type]
# String identifiers of task types, grouped by their "loom/<group>/<name>"
# prefix: base operations, data operations and scheduler operations.
TASK_BASE_GET = "loom/base/get"
TASK_BASE_SLICE = "loom/base/slice"
TASK_BASE_SIZE = "loom/base/size"
TASK_BASE_LENGTH = "loom/base/length"
TASK_DATA_CONST = "loom/data/const"
TASK_DATA_MERGE = "loom/data/merge"
TASK_DATA_OPEN = "loom/data/open"
TASK_DATA_SPLIT = "loom/data/split"
TASK_SCHEDULER_DSLICE = "loom/scheduler/dslice"
def __init__(self):
    """Start with no registered tasks and no known task types."""
    # Task types seen so far; filled in as tasks are added.
    self.task_types = set()
    # Registered tasks, in insertion order.
    self.tasks = []
def add(self, task):
    """Register *task* and return it.

    The task receives the next free id (its position in ``self.tasks``)
    and its task type is recorded in ``self.task_types``.  A task that
    already carries an id must not be added again.
    """
    assert task.id is None
    next_id = len(self.tasks)
    task.id = next_id
    self.tasks += [task]
    self.task_types |= {task.task_type}
    return task
# Create a task of type TASK_SCHEDULER_DSLICE for the given input.
def task_dslice(self, input):
task = Task()
task.task_type = self.TASK_SCHEDULER_DSLICE
# Create a task of type TASK_SCHEDULER_DGET for the given input.
def task_dget(self, input):
task = Task()
task.task_type = self.TASK_SCHEDULER_DGET
# Constant-data task: the payload `data` is stored directly as the config.
task = Task()
task.task_type = self.TASK_DATA_CONST
task.config = data
# Merge task: takes `inputs` as its inputs and is registered through add().
task = Task()
task.task_type = self.TASK_DATA_MERGE
task.inputs = inputs
return self.add(task)
# Create a TASK_DATA_OPEN task; the file name is carried in the task config.
def task_open(self, filename):
task = Task()
task.task_type = self.TASK_DATA_OPEN
task.config = filename
def task_run(self, args, inputs=(), outputs=(None,), stdin=None):
    """Create and register a task of type ``TASK_RUN``.

    Args:
        args: Command arguments, either a sequence of strings or a single
            string, which is split on whitespace.
        inputs: Iterable of ``(input, filename)`` pairs.  The first items
            become the task inputs; a falsy filename is mapped to
            ``"+in"``.  Any iterable is accepted, including one-shot
            iterators.
        outputs: Iterable of output file names; a falsy name is mapped to
            ``"+out"``.
        stdin: Optional input that is placed in front of ``inputs`` with
            no file name (so it is mapped to ``"+in"``).

    Returns:
        The value returned by ``self.add`` for the new task.
    """
    if isinstance(args, str):
        args = args.split()

    # `inputs` is walked twice below (once for the task inputs, once for
    # the file-name mapping).  Materialize it first so that a generator
    # argument is not exhausted after the first pass.
    inputs = tuple(inputs)
    if stdin is not None:
        inputs = ((stdin, None),) + inputs

    task = Task()
    task.task_type = self.TASK_RUN
    task.inputs = tuple(source for source, _ in inputs)

    # The run description is serialized into the task config.
    msg = loomrun_pb2.Run()
    msg.args.extend(args)
    msg.map_inputs.extend(fname if fname else "+in"
                          for _, fname in inputs)
    msg.map_outputs.extend(fname if fname else "+out"
                           for fname in outputs)
    task.config = msg.SerializeToString()
    return self.add(task)
# Create a TASK_ARRAY_MAKE task that uses `inputs` as its inputs.
def task_array_make(self, inputs):
task = Task()
task.task_type = self.TASK_ARRAY_MAKE
task.inputs = inputs
def task_size(self, input):
    """Create and register a ``TASK_BASE_SIZE`` task over a single input.

    Returns whatever ``self.add`` returns for the new task.
    """
    size_task = Task()
    size_task.inputs = (input,)
    size_task.task_type = self.TASK_BASE_SIZE
    return self.add(size_task)
def task_length(self, input):
    """Create and register a ``TASK_BASE_LENGTH`` task over a single input.

    Returns whatever ``self.add`` returns for the new task.
    """
    length_task = Task()
    length_task.inputs = (input,)
    length_task.task_type = self.TASK_BASE_LENGTH
    return self.add(length_task)
# Single input; `index` is packed via self.u64 and stored as the config.
# NOTE(review): self.u64 is presumably a struct.Struct for one unsigned
# 64-bit integer - confirm against its definition.
task.inputs = (input,)
task.config = self.u64.pack(index)
# Create a TASK_BASE_SLICE task over a single input; `start` and `end` are
# packed together via self.u64u64 and stored as the task config.
def task_slice(self, input, start, end):
task = Task()
task.task_type = self.TASK_BASE_SLICE
task.inputs = (input,)
task.config = self.u64u64.pack(start, end)
# Build a loomplan_pb2.Plan message with one `tasks` entry per registered task.
msg = loomplan_pb2.Plan()
for task in self.tasks:
t = msg.tasks.add()
def write_dot(self, filename, info=None):
colors = ["red", "green", "blue", "orange", "violet"]
if info:
w = sorted(set(worker for id, worker in info))
workers = {}
for id, worker in info:
workers[id] = w.index(worker)
del w
else:
workers = None
graph = gv.Graph()
for task in self.tasks:
node = graph.node(task.id)
if workers:
node.color = colors[workers[task.id] % len(colors)]