plan.py 5.14 KB
Newer Older
Stanislav Bohm's avatar
Stanislav Bohm committed
1 2 3

import loomplan_pb2
import loomrun_pb2
Stanislav Bohm's avatar
Stanislav Bohm committed
4 5
import gv

Stanislav Bohm's avatar
Stanislav Bohm committed
6
import struct
Stanislav Bohm's avatar
Stanislav Bohm committed
7 8


Stanislav Bohm's avatar
Stanislav Bohm committed
9 10 11 12 13
MODE_STANDARD = loomplan_pb2.Task.MODE_STANDARD
MODE_SIMPLE = loomplan_pb2.Task.MODE_SIMPLE
MODE_SCHEDULER = loomplan_pb2.Task.MODE_SCHEDULER


Stanislav Bohm's avatar
Stanislav Bohm committed
14 15 16 17 18
class Task(object):

    inputs = ()
    id = None
    config = ""
Stanislav Bohm's avatar
Stanislav Bohm committed
19
    mode = MODE_STANDARD
Stanislav Bohm's avatar
Stanislav Bohm committed
20

21
    def set_message(self, msg, symbols):
Stanislav Bohm's avatar
Stanislav Bohm committed
22
        msg.config = self.config
23
        msg.task_type = symbols[self.task_type]
Stanislav Bohm's avatar
Stanislav Bohm committed
24
        msg.input_ids.extend(t.id for t in self.inputs)
Stanislav Bohm's avatar
Stanislav Bohm committed
25
        msg.mode = self.mode
Stanislav Bohm's avatar
Stanislav Bohm committed
26 27 28 29


class Plan(object):

30 31
    TASK_BASE_GET = "loom/base/get"
    TASK_BASE_SLICE = "loom/base/slice"
32 33
    TASK_BASE_SIZE = "loom/base/size"
    TASK_BASE_LENGTH = "loom/base/length"
Stanislav Bohm's avatar
Stanislav Bohm committed
34

35 36 37 38
    TASK_DATA_CONST = "loom/data/const"
    TASK_DATA_MERGE = "loom/data/merge"
    TASK_DATA_OPEN = "loom/data/open"
    TASK_DATA_SPLIT = "loom/data/split"
Stanislav Bohm's avatar
Stanislav Bohm committed
39

40
    TASK_ARRAY_MAKE = "loom/array/make"
Stanislav Bohm's avatar
Stanislav Bohm committed
41

42
    TASK_RUN = "loom/run/run"
43

44
    TASK_SCHEDULER_DSLICE = "loom/scheduler/dslice"
Stanislav Bohm's avatar
Stanislav Bohm committed
45
    TASK_SCHEDULER_DGET = "loom/scheduler/dget"
Stanislav Bohm's avatar
Stanislav Bohm committed
46

Stanislav Bohm's avatar
Stanislav Bohm committed
47
    u64 = struct.Struct("<Q")
Stanislav Bohm's avatar
Stanislav Bohm committed
48
    u64u64 = struct.Struct("<QQ")
Stanislav Bohm's avatar
Stanislav Bohm committed
49

Stanislav Bohm's avatar
Stanislav Bohm committed
50 51 52 53 54 55 56 57 58 59 60
    def __init__(self):
        self.tasks = []
        self.task_types = set()

    def add(self, task):
        assert task.id is None
        task.id = len(self.tasks)
        self.tasks.append(task)
        self.task_types.add(task.task_type)
        return task

Stanislav Bohm's avatar
Stanislav Bohm committed
61 62 63
    def task_dslice(self, input):
        task = Task()
        task.task_type = self.TASK_SCHEDULER_DSLICE
Stanislav Bohm's avatar
Stanislav Bohm committed
64
        task.mode = MODE_SCHEDULER
Stanislav Bohm's avatar
Stanislav Bohm committed
65 66 67
        task.inputs = (input,)
        return self.add(task)

Stanislav Bohm's avatar
Stanislav Bohm committed
68 69 70 71 72 73 74
    def task_dget(self, input):
        task = Task()
        task.task_type = self.TASK_SCHEDULER_DGET
        task.mode = MODE_SCHEDULER
        task.inputs = (input,)
        return self.add(task)

Stanislav Bohm's avatar
Stanislav Bohm committed
75
    def task_const(self, data):
Stanislav Bohm's avatar
Stanislav Bohm committed
76 77 78
        task = Task()
        task.task_type = self.TASK_DATA_CONST
        task.config = data
Stanislav Bohm's avatar
Stanislav Bohm committed
79
        task.mode = MODE_SIMPLE
Stanislav Bohm's avatar
Stanislav Bohm committed
80
        return self.add(task)
81

Stanislav Bohm's avatar
Stanislav Bohm committed
82
    def task_merge(self, inputs):
Stanislav Bohm's avatar
Stanislav Bohm committed
83 84 85
        task = Task()
        task.task_type = self.TASK_DATA_MERGE
        task.inputs = inputs
Stanislav Bohm's avatar
Stanislav Bohm committed
86
        task.mode = MODE_SIMPLE
Stanislav Bohm's avatar
Stanislav Bohm committed
87 88 89 90 91 92
        return self.add(task)

    def task_open(self, filename):
        task = Task()
        task.task_type = self.TASK_DATA_OPEN
        task.config = filename
Stanislav Bohm's avatar
Stanislav Bohm committed
93
        task.mode = MODE_SIMPLE
Stanislav Bohm's avatar
Stanislav Bohm committed
94
        return self.add(task)
Stanislav Bohm's avatar
Stanislav Bohm committed
95

Stanislav Bohm's avatar
Stanislav Bohm committed
96
    def task_split(self, input, char=None):
Stanislav Bohm's avatar
Stanislav Bohm committed
97
        task = Task()
Stanislav Bohm's avatar
Stanislav Bohm committed
98
        task.task_type = self.TASK_DATA_SPLIT
Stanislav Bohm's avatar
Stanislav Bohm committed
99
        task.inputs = (input,)
Stanislav Bohm's avatar
Stanislav Bohm committed
100
        task.mode = MODE_SIMPLE
Stanislav Bohm's avatar
Stanislav Bohm committed
101
        return self.add(task)
Stanislav Bohm's avatar
Stanislav Bohm committed
102

Stanislav Bohm's avatar
Stanislav Bohm committed
103
    def task_run(self, args, inputs=(), outputs=(None,), stdin=None):
104 105 106
        if isinstance(args, str):
            args = args.split()

Stanislav Bohm's avatar
Stanislav Bohm committed
107 108 109
        if stdin is not None:
            inputs = ((stdin, None),) + tuple(inputs)

110 111 112 113 114 115 116 117 118 119 120 121 122
        task = Task()
        task.task_type = self.TASK_RUN
        task.inputs = tuple(i for i, fname in inputs)

        msg = loomrun_pb2.Run()
        msg.args.extend(args)

        msg.map_inputs.extend(fname if fname else "+in"
                              for i, fname in inputs)
        msg.map_outputs.extend(fname if fname else "+out"
                               for fname in outputs)
        task.config = msg.SerializeToString()
        return self.add(task)
Stanislav Bohm's avatar
Stanislav Bohm committed
123

Stanislav Bohm's avatar
Stanislav Bohm committed
124 125 126 127
    def task_array_make(self, inputs):
        task = Task()
        task.task_type = self.TASK_ARRAY_MAKE
        task.inputs = inputs
Stanislav Bohm's avatar
Stanislav Bohm committed
128
        task.mode = MODE_SIMPLE
Stanislav Bohm's avatar
Stanislav Bohm committed
129 130
        return self.add(task)

131 132 133 134 135 136 137 138 139 140 141 142 143 144
    def task_size(self, input):
        task = Task()
        task.task_type = self.TASK_BASE_SIZE
        task.inputs = (input,)
        task.mode = MODE_SIMPLE
        return self.add(task)

    def task_length(self, input):
        task = Task()
        task.task_type = self.TASK_BASE_LENGTH
        task.inputs = (input,)
        task.mode = MODE_SIMPLE
        return self.add(task)

Stanislav Bohm's avatar
Stanislav Bohm committed
145
    def task_get(self, input, index):
Stanislav Bohm's avatar
Stanislav Bohm committed
146
        task = Task()
Stanislav Bohm's avatar
Stanislav Bohm committed
147
        task.task_type = self.TASK_BASE_GET
Stanislav Bohm's avatar
Stanislav Bohm committed
148 149
        task.inputs = (input,)
        task.config = self.u64.pack(index)
Stanislav Bohm's avatar
Stanislav Bohm committed
150
        task.mode = MODE_SIMPLE
Stanislav Bohm's avatar
Stanislav Bohm committed
151
        return self.add(task)
Stanislav Bohm's avatar
Stanislav Bohm committed
152

Stanislav Bohm's avatar
Stanislav Bohm committed
153 154 155 156 157
    def task_slice(self, input, start, end):
        task = Task()
        task.task_type = self.TASK_BASE_SLICE
        task.inputs = (input,)
        task.config = self.u64u64.pack(start, end)
Stanislav Bohm's avatar
Stanislav Bohm committed
158
        task.mode = MODE_SIMPLE
Stanislav Bohm's avatar
Stanislav Bohm committed
159 160
        return self.add(task)

161
    def create_message(self, symbols):
Stanislav Bohm's avatar
Stanislav Bohm committed
162 163 164
        msg = loomplan_pb2.Plan()
        for task in self.tasks:
            t = msg.tasks.add()
165
            task.set_message(t, symbols)
Stanislav Bohm's avatar
Stanislav Bohm committed
166
        return msg
Stanislav Bohm's avatar
Stanislav Bohm committed
167

168 169 170 171 172 173 174 175 176 177
    def write_dot(self, filename, info=None):
        colors = ["red", "green", "blue", "orange", "violet"]
        if info:
            w = sorted(set(worker for id, worker in info))
            workers = {}
            for id, worker in info:
                workers[id] = w.index(worker)
            del w
        else:
            workers = None
Stanislav Bohm's avatar
Stanislav Bohm committed
178 179 180
        graph = gv.Graph()
        for task in self.tasks:
            node = graph.node(task.id)
181 182
            if workers:
                node.color = colors[workers[task.id] % len(colors)]
Stanislav Bohm's avatar
Stanislav Bohm committed
183 184 185 186
            node.label = "{}\n{}".format(str(task.id), task.task_type)
            for inp in task.inputs:
                graph.node(inp.id).add_arc(node)
        graph.write(filename)