plan.py 5.14 KB
Newer Older
Stanislav Bohm's avatar
Stanislav Bohm committed
1 2 3

import loomplan_pb2
import loomrun_pb2
Stanislav Bohm's avatar
Stanislav Bohm committed
4 5
import gv

Stanislav Bohm's avatar
Stanislav Bohm committed
6
import struct
Stanislav Bohm's avatar
Stanislav Bohm committed
7 8


Stanislav Bohm's avatar
Stanislav Bohm committed
9 10 11 12 13
MODE_STANDARD = loomplan_pb2.Task.MODE_STANDARD
MODE_SIMPLE = loomplan_pb2.Task.MODE_SIMPLE
MODE_SCHEDULER = loomplan_pb2.Task.MODE_SCHEDULER


Stanislav Bohm's avatar
Stanislav Bohm committed
14 15 16 17 18
class Task(object):

    inputs = ()
    id = None
    config = ""
Stanislav Bohm's avatar
Stanislav Bohm committed
19
    mode = MODE_STANDARD
Stanislav Bohm's avatar
Stanislav Bohm committed
20

21
    def set_message(self, msg, symbols):
Stanislav Bohm's avatar
Stanislav Bohm committed
22
        msg.config = self.config
23
        msg.task_type = symbols[self.task_type]
Stanislav Bohm's avatar
Stanislav Bohm committed
24
        msg.input_ids.extend(t.id for t in self.inputs)
Stanislav Bohm's avatar
Stanislav Bohm committed
25
        msg.mode = self.mode
Stanislav Bohm's avatar
Stanislav Bohm committed
26 27 28 29


class Plan(object):

30 31
    TASK_BASE_GET = "loom/base/get"
    TASK_BASE_SLICE = "loom/base/slice"
32 33
    TASK_BASE_SIZE = "loom/base/size"
    TASK_BASE_LENGTH = "loom/base/length"
Stanislav Bohm's avatar
Stanislav Bohm committed
34

35 36 37 38
    TASK_DATA_CONST = "loom/data/const"
    TASK_DATA_MERGE = "loom/data/merge"
    TASK_DATA_OPEN = "loom/data/open"
    TASK_DATA_SPLIT = "loom/data/split"
Stanislav Bohm's avatar
Stanislav Bohm committed
39

40
    TASK_ARRAY_MAKE = "loom/array/make"
Stanislav Bohm's avatar
Stanislav Bohm committed
41

42
    TASK_RUN = "loom/run/run"
43

44
    TASK_SCHEDULER_DSLICE = "loom/scheduler/dslice"
Stanislav Bohm's avatar
Stanislav Bohm committed
45
    TASK_SCHEDULER_DGET = "loom/scheduler/dget"
Stanislav Bohm's avatar
Stanislav Bohm committed
46

Stanislav Bohm's avatar
Stanislav Bohm committed
47
    u64 = struct.Struct("<Q")
Stanislav Bohm's avatar
Stanislav Bohm committed
48
    u64u64 = struct.Struct("<QQ")
Stanislav Bohm's avatar
Stanislav Bohm committed
49

Stanislav Bohm's avatar
Stanislav Bohm committed
50 51 52 53 54 55 56 57 58 59 60
    def __init__(self):
        self.tasks = []
        self.task_types = set()

    def add(self, task):
        assert task.id is None
        task.id = len(self.tasks)
        self.tasks.append(task)
        self.task_types.add(task.task_type)
        return task

Stanislav Bohm's avatar
Stanislav Bohm committed
61 62 63
    def task_dslice(self, input):
        task = Task()
        task.task_type = self.TASK_SCHEDULER_DSLICE
Stanislav Bohm's avatar
Stanislav Bohm committed
64
        task.mode = MODE_SCHEDULER
Stanislav Bohm's avatar
Stanislav Bohm committed
65 66 67
        task.inputs = (input,)
        return self.add(task)

Stanislav Bohm's avatar
Stanislav Bohm committed
68 69 70 71 72 73 74
    def task_dget(self, input):
        task = Task()
        task.task_type = self.TASK_SCHEDULER_DGET
        task.mode = MODE_SCHEDULER
        task.inputs = (input,)
        return self.add(task)

Stanislav Bohm's avatar
Stanislav Bohm committed
75
    def task_const(self, data):
Stanislav Bohm's avatar
Stanislav Bohm committed
76 77 78
        task = Task()
        task.task_type = self.TASK_DATA_CONST
        task.config = data
Stanislav Bohm's avatar
Stanislav Bohm committed
79
        task.mode = MODE_SIMPLE
Stanislav Bohm's avatar
Stanislav Bohm committed
80
        return self.add(task)
Stanislav Bohm's avatar
Stanislav Bohm committed
81

Stanislav Bohm's avatar
Stanislav Bohm committed
82
    def task_merge(self, inputs):
Stanislav Bohm's avatar
Stanislav Bohm committed
83 84 85
        task = Task()
        task.task_type = self.TASK_DATA_MERGE
        task.inputs = inputs
Stanislav Bohm's avatar
Stanislav Bohm committed
86
        task.mode = MODE_SIMPLE
Stanislav Bohm's avatar
Stanislav Bohm committed
87 88 89 90 91 92
        return self.add(task)

    def task_open(self, filename):
        task = Task()
        task.task_type = self.TASK_DATA_OPEN
        task.config = filename
Stanislav Bohm's avatar
Stanislav Bohm committed
93
        task.mode = MODE_SIMPLE
Stanislav Bohm's avatar
Stanislav Bohm committed
94
        return self.add(task)
Stanislav Bohm's avatar
Stanislav Bohm committed
95

Stanislav Bohm's avatar
Stanislav Bohm committed
96
    def task_split(self, input, char=None):
Stanislav Bohm's avatar
Stanislav Bohm committed
97
        task = Task()
Stanislav Bohm's avatar
Stanislav Bohm committed
98
        task.task_type = self.TASK_DATA_SPLIT
Stanislav Bohm's avatar
Stanislav Bohm committed
99
        task.inputs = (input,)
Stanislav Bohm's avatar
Stanislav Bohm committed
100
        task.mode = MODE_SIMPLE
Stanislav Bohm's avatar
Stanislav Bohm committed
101
        return self.add(task)
Stanislav Bohm's avatar
Stanislav Bohm committed
102

Stanislav Bohm's avatar
Stanislav Bohm committed
103
    def task_run(self, args, inputs=(), outputs=(None,), stdin=None):
104 105 106
        if isinstance(args, str):
            args = args.split()

Stanislav Bohm's avatar
Stanislav Bohm committed
107 108 109
        if stdin is not None:
            inputs = ((stdin, None),) + tuple(inputs)

110 111 112 113 114 115 116 117 118 119 120 121 122
        task = Task()
        task.task_type = self.TASK_RUN
        task.inputs = tuple(i for i, fname in inputs)

        msg = loomrun_pb2.Run()
        msg.args.extend(args)

        msg.map_inputs.extend(fname if fname else "+in"
                              for i, fname in inputs)
        msg.map_outputs.extend(fname if fname else "+out"
                               for fname in outputs)
        task.config = msg.SerializeToString()
        return self.add(task)
Stanislav Bohm's avatar
Stanislav Bohm committed
123

Stanislav Bohm's avatar
Stanislav Bohm committed
124 125 126 127
    def task_array_make(self, inputs):
        task = Task()
        task.task_type = self.TASK_ARRAY_MAKE
        task.inputs = inputs
Stanislav Bohm's avatar
Stanislav Bohm committed
128
        task.mode = MODE_SIMPLE
Stanislav Bohm's avatar
Stanislav Bohm committed
129 130
        return self.add(task)

131 132 133 134 135 136 137 138 139 140 141 142 143 144
    def task_size(self, input):
        task = Task()
        task.task_type = self.TASK_BASE_SIZE
        task.inputs = (input,)
        task.mode = MODE_SIMPLE
        return self.add(task)

    def task_length(self, input):
        task = Task()
        task.task_type = self.TASK_BASE_LENGTH
        task.inputs = (input,)
        task.mode = MODE_SIMPLE
        return self.add(task)

Stanislav Bohm's avatar
Stanislav Bohm committed
145
    def task_get(self, input, index):
Stanislav Bohm's avatar
Stanislav Bohm committed
146
        task = Task()
Stanislav Bohm's avatar
Stanislav Bohm committed
147
        task.task_type = self.TASK_BASE_GET
Stanislav Bohm's avatar
Stanislav Bohm committed
148 149
        task.inputs = (input,)
        task.config = self.u64.pack(index)
Stanislav Bohm's avatar
Stanislav Bohm committed
150
        task.mode = MODE_SIMPLE
Stanislav Bohm's avatar
Stanislav Bohm committed
151
        return self.add(task)
Stanislav Bohm's avatar
Stanislav Bohm committed
152

Stanislav Bohm's avatar
Stanislav Bohm committed
153 154 155 156 157
    def task_slice(self, input, start, end):
        task = Task()
        task.task_type = self.TASK_BASE_SLICE
        task.inputs = (input,)
        task.config = self.u64u64.pack(start, end)
Stanislav Bohm's avatar
Stanislav Bohm committed
158
        task.mode = MODE_SIMPLE
Stanislav Bohm's avatar
Stanislav Bohm committed
159 160
        return self.add(task)

161
    def create_message(self, symbols):
Stanislav Bohm's avatar
Stanislav Bohm committed
162 163 164
        msg = loomplan_pb2.Plan()
        for task in self.tasks:
            t = msg.tasks.add()
165
            task.set_message(t, symbols)
Stanislav Bohm's avatar
Stanislav Bohm committed
166
        return msg
Stanislav Bohm's avatar
Stanislav Bohm committed
167

Stanislav Bohm's avatar
Stanislav Bohm committed
168 169 170 171 172 173 174 175 176 177
    def write_dot(self, filename, info=None):
        colors = ["red", "green", "blue", "orange", "violet"]
        if info:
            w = sorted(set(worker for id, worker in info))
            workers = {}
            for id, worker in info:
                workers[id] = w.index(worker)
            del w
        else:
            workers = None
Stanislav Bohm's avatar
Stanislav Bohm committed
178 179 180
        graph = gv.Graph()
        for task in self.tasks:
            node = graph.node(task.id)
Stanislav Bohm's avatar
Stanislav Bohm committed
181 182
            if workers:
                node.color = colors[workers[task.id] % len(colors)]
Stanislav Bohm's avatar
Stanislav Bohm committed
183 184 185 186
            node.label = "{}\n{}".format(str(task.id), task.task_type)
            for inp in task.inputs:
                graph.node(inp.id).add_arc(node)
        graph.write(filename)