Skip to content
Snippets Groups Projects
plan.py 5.2 KiB
Newer Older
  • Learn to ignore specific revisions
  • Ada Böhm's avatar
    Ada Böhm committed
    
    import loomplan_pb2
    import loomrun_pb2
    
    Ada Böhm's avatar
    Ada Böhm committed
    import gv
    
    
    Ada Böhm's avatar
    Ada Böhm committed
    import struct
    
    Ada Böhm's avatar
    Ada Böhm committed
    
    
    
    POLICY_STANDARD = loomplan_pb2.Task.POLICY_STANDARD
    POLICY_SIMPLE = loomplan_pb2.Task.POLICY_SIMPLE
    POLICY_SCHEDULER = loomplan_pb2.Task.POLICY_SCHEDULER
    
    Ada Böhm's avatar
    Ada Böhm committed
    
    
    
    Ada Böhm's avatar
    Ada Böhm committed
    class Task(object):
    
        inputs = ()
        id = None
        config = ""
    
        policy = POLICY_STANDARD
    
    Ada Böhm's avatar
    Ada Böhm committed
    
    
        def set_message(self, msg, symbols):
    
    Ada Böhm's avatar
    Ada Böhm committed
            msg.config = self.config
    
            msg.task_type = symbols[self.task_type]
    
    Ada Böhm's avatar
    Ada Böhm committed
            msg.input_ids.extend(t.id for t in self.inputs)
    
            msg.policy = self.policy
    
    Ada Böhm's avatar
    Ada Böhm committed
    
    
    class Plan(object):
    
    
        TASK_BASE_GET = "loom/base/get"
        TASK_BASE_SLICE = "loom/base/slice"
    
        TASK_BASE_SIZE = "loom/base/size"
        TASK_BASE_LENGTH = "loom/base/length"
    
    Ada Böhm's avatar
    Ada Böhm committed
    
    
        TASK_DATA_CONST = "loom/data/const"
        TASK_DATA_MERGE = "loom/data/merge"
        TASK_DATA_OPEN = "loom/data/open"
        TASK_DATA_SPLIT = "loom/data/split"
    
    Ada Böhm's avatar
    Ada Böhm committed
    
    
        TASK_ARRAY_MAKE = "loom/array/make"
    
    Ada Böhm's avatar
    Ada Böhm committed
    
    
        TASK_RUN = "loom/run/run"
    
        TASK_SCHEDULER_DSLICE = "loom/scheduler/dslice"
    
    Ada Böhm's avatar
    Ada Böhm committed
        TASK_SCHEDULER_DGET = "loom/scheduler/dget"
    
    Ada Böhm's avatar
    Ada Böhm committed
    
    
    Ada Böhm's avatar
    Ada Böhm committed
        u64 = struct.Struct("<Q")
    
    Ada Böhm's avatar
    Ada Böhm committed
        u64u64 = struct.Struct("<QQ")
    
    Ada Böhm's avatar
    Ada Böhm committed
    
    
    Ada Böhm's avatar
    Ada Böhm committed
        def __init__(self):
            self.tasks = []
            self.task_types = set()
    
        def add(self, task):
            assert task.id is None
            task.id = len(self.tasks)
            self.tasks.append(task)
            self.task_types.add(task.task_type)
            return task
    
    
    Ada Böhm's avatar
    Ada Böhm committed
        def task_dslice(self, input):
            task = Task()
            task.task_type = self.TASK_SCHEDULER_DSLICE
    
            task.policy = POLICY_SCHEDULER
    
    Ada Böhm's avatar
    Ada Böhm committed
            task.inputs = (input,)
            return self.add(task)
    
    
    Ada Böhm's avatar
    Ada Böhm committed
        def task_dget(self, input):
            task = Task()
            task.task_type = self.TASK_SCHEDULER_DGET
    
            task.policy = POLICY_SCHEDULER
    
    Ada Böhm's avatar
    Ada Böhm committed
            task.inputs = (input,)
            return self.add(task)
    
    
    Ada Böhm's avatar
    Ada Böhm committed
        def task_const(self, data):
    
    Ada Böhm's avatar
    Ada Böhm committed
            task = Task()
            task.task_type = self.TASK_DATA_CONST
            task.config = data
    
            task.policy = POLICY_SIMPLE
    
    Ada Böhm's avatar
    Ada Böhm committed
            return self.add(task)
    
    Ada Böhm's avatar
    Ada Böhm committed
        def task_merge(self, inputs):
    
    Ada Böhm's avatar
    Ada Böhm committed
            task = Task()
            task.task_type = self.TASK_DATA_MERGE
            task.inputs = inputs
    
            task.policy = POLICY_SIMPLE
    
    Ada Böhm's avatar
    Ada Böhm committed
            return self.add(task)
    
        def task_open(self, filename):
            task = Task()
            task.task_type = self.TASK_DATA_OPEN
            task.config = filename
    
            task.policy = POLICY_SIMPLE
    
    Ada Böhm's avatar
    Ada Böhm committed
            return self.add(task)
    
    Ada Böhm's avatar
    Ada Böhm committed
    
    
    Ada Böhm's avatar
    Ada Böhm committed
        def task_split(self, input, char=None):
    
    Ada Böhm's avatar
    Ada Böhm committed
            task = Task()
    
    Ada Böhm's avatar
    Ada Böhm committed
            task.task_type = self.TASK_DATA_SPLIT
    
    Ada Böhm's avatar
    Ada Böhm committed
            task.inputs = (input,)
    
            task.policy = POLICY_SIMPLE
    
    Ada Böhm's avatar
    Ada Böhm committed
            return self.add(task)
    
    Ada Böhm's avatar
    Ada Böhm committed
    
    
    Ada Böhm's avatar
    Ada Böhm committed
        def task_run(self, args, inputs=(), outputs=(None,), stdin=None):
    
            if isinstance(args, str):
                args = args.split()
    
    
    Ada Böhm's avatar
    Ada Böhm committed
            if stdin is not None:
                inputs = ((stdin, None),) + tuple(inputs)
    
    
            task = Task()
            task.task_type = self.TASK_RUN
            task.inputs = tuple(i for i, fname in inputs)
    
            msg = loomrun_pb2.Run()
            msg.args.extend(args)
    
            msg.map_inputs.extend(fname if fname else "+in"
                                  for i, fname in inputs)
            msg.map_outputs.extend(fname if fname else "+out"
                                   for fname in outputs)
            task.config = msg.SerializeToString()
            return self.add(task)
    
    Ada Böhm's avatar
    Ada Böhm committed
    
    
    Ada Böhm's avatar
    Ada Böhm committed
        def task_array_make(self, inputs):
            task = Task()
            task.task_type = self.TASK_ARRAY_MAKE
            task.inputs = inputs
    
            task.policy = POLICY_SIMPLE
    
    Ada Böhm's avatar
    Ada Böhm committed
            return self.add(task)
    
    
        def task_size(self, input):
            task = Task()
            task.task_type = self.TASK_BASE_SIZE
            task.inputs = (input,)
    
            task.policy = POLICY_SIMPLE
    
            return self.add(task)
    
        def task_length(self, input):
            task = Task()
            task.task_type = self.TASK_BASE_LENGTH
            task.inputs = (input,)
    
            task.policy = POLICY_SIMPLE
    
    Ada Böhm's avatar
    Ada Böhm committed
        def task_get(self, input, index):
    
    Ada Böhm's avatar
    Ada Böhm committed
            task = Task()
    
    Ada Böhm's avatar
    Ada Böhm committed
            task.task_type = self.TASK_BASE_GET
    
    Ada Böhm's avatar
    Ada Böhm committed
            task.inputs = (input,)
            task.config = self.u64.pack(index)
    
            task.policy = POLICY_SIMPLE
    
    Ada Böhm's avatar
    Ada Böhm committed
            return self.add(task)
    
    Ada Böhm's avatar
    Ada Böhm committed
    
    
    Ada Böhm's avatar
    Ada Böhm committed
        def task_slice(self, input, start, end):
            task = Task()
            task.task_type = self.TASK_BASE_SLICE
            task.inputs = (input,)
            task.config = self.u64u64.pack(start, end)
    
            task.policy = POLICY_SIMPLE
    
    Ada Böhm's avatar
    Ada Böhm committed
            return self.add(task)
    
    
        def create_message(self, symbols):
    
    Ada Böhm's avatar
    Ada Böhm committed
            msg = loomplan_pb2.Plan()
            for task in self.tasks:
                t = msg.tasks.add()
    
                task.set_message(t, symbols)
    
    Ada Böhm's avatar
    Ada Böhm committed
            return msg
    
    Ada Böhm's avatar
    Ada Böhm committed
        def write_dot(self, filename, info=None):
            colors = ["red", "green", "blue", "orange", "violet"]
            if info:
                w = sorted(set(worker for id, worker in info))
                workers = {}
                for id, worker in info:
                    workers[id] = w.index(worker)
                del w
            else:
                workers = None
    
    Ada Böhm's avatar
    Ada Böhm committed
            graph = gv.Graph()
            for task in self.tasks:
                node = graph.node(task.id)
    
    Ada Böhm's avatar
    Ada Böhm committed
                if workers:
                    node.color = colors[workers[task.id] % len(colors)]
    
    Ada Böhm's avatar
    Ada Böhm committed
                node.label = "{}\n{}".format(str(task.id), task.task_type)
                for inp in task.inputs:
                    graph.node(inp.id).add_arc(node)
            graph.write(filename)