tasks.py 3.13 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156

from .task import Task, ResourceRequest
from .task import POLICY_SCHEDULER, POLICY_SIMPLE

import struct
from ..pb import loomrun_pb2 as loomrun


def cpus(value):
    """Build a ResourceRequest asking for ``value`` of "loom/resource/cpus".

    Returns the newly created ResourceRequest.
    """
    request = ResourceRequest()
    request.add_resource("loom/resource/cpus", value)
    return request

# Shared request for one CPU; used as the default ``request`` of run().
cpu1 = cpus(1)


# Task type identifiers. The factory functions in this module assign one of
# these strings to ``Task.task_type``.

# "base" task types (see get(), slice(), size(), length()).
BASE_GET = "loom/base/get"
BASE_SLICE = "loom/base/slice"
BASE_SIZE = "loom/base/size"
BASE_LENGTH = "loom/base/length"

# "data" task types (see const(), merge(), open(), split()).
DATA_CONST = "loom/data/const"
DATA_MERGE = "loom/data/merge"
DATA_OPEN = "loom/data/open"
DATA_SPLIT = "loom/data/split"

# "array" task type (see array_make()).
ARRAY_MAKE = "loom/array/make"

# Task type used by run().
RUN = "loom/run/run"

# "scheduler" task types (see dslice(), dget()); these tasks use
# POLICY_SCHEDULER instead of POLICY_SIMPLE.
SCHEDULER_DSLICE = "loom/scheduler/dslice"
SCHEDULER_DGET = "loom/scheduler/dget"

# Pre-compiled binary layouts for task configs: one, respectively two,
# little-endian unsigned 64-bit integers (used by get() and slice()).
u64 = struct.Struct("<Q")
u64u64 = struct.Struct("<QQ")


def dslice(input):
    """Create a SCHEDULER_DSLICE task over a single input.

    The task uses POLICY_SCHEDULER and has ``input`` as its only input.
    """
    result = Task()
    result.inputs = (input,)
    result.policy = POLICY_SCHEDULER
    result.task_type = SCHEDULER_DSLICE
    return result


def dget(input):
    """Create a SCHEDULER_DGET task over a single input.

    The task uses POLICY_SCHEDULER and has ``input`` as its only input.
    """
    result = Task()
    result.inputs = (input,)
    result.policy = POLICY_SCHEDULER
    result.task_type = SCHEDULER_DGET
    return result


def const(data):
    """Create a DATA_CONST task whose config is ``data``.

    The task has POLICY_SIMPLE and no inputs are assigned here.
    """
    result = Task()
    result.policy = POLICY_SIMPLE
    result.task_type = DATA_CONST
    result.config = data
    return result


def merge(inputs, delimiter=""):
    """Create a DATA_MERGE task over ``inputs``.

    ``delimiter`` must be a ``str`` (checked with an assert). A non-empty
    delimiter is stored as the task config; for an empty one the config is
    left as Task() initialised it.
    """
    assert isinstance(delimiter, str)
    merged = Task()
    merged.policy = POLICY_SIMPLE
    merged.task_type = DATA_MERGE
    merged.inputs = inputs
    if not delimiter:
        return merged
    merged.config = delimiter
    return merged


def open(filename):
    """Create a DATA_OPEN task whose config is ``filename``.

    The task has POLICY_SIMPLE. Note that this name shadows the builtin
    ``open`` inside this module.
    """
    result = Task()
    result.policy = POLICY_SIMPLE
    result.task_type = DATA_OPEN
    result.config = filename
    return result


def split(input, char=None):
    """Create a DATA_SPLIT task over a single input.

    The task has POLICY_SIMPLE and ``input`` as its only input.

    NOTE(review): ``char`` is accepted but currently not used; no config is
    set on the task -- confirm whether it should be forwarded.
    """
    result = Task()
    result.policy = POLICY_SIMPLE
    result.inputs = (input,)
    result.task_type = DATA_SPLIT
    return result


def run(args,
        inputs=(),
        outputs=(None,),
        stdin=None,
        request=cpu1):
    """Create a RUN task described by a serialized ``loomrun.Run`` message.

    Parameters:
        args: command arguments; a ``str`` is split on whitespace, any other
            iterable of strings is used as given.
        inputs: iterable of ``(task, filename)`` pairs. The first items
            become the task's inputs, the filenames go to ``map_inputs``;
            a falsy filename is mapped to ``"+in"``.
        outputs: iterable of filenames stored in ``map_outputs``; a falsy
            filename is mapped to ``"+out"``.
        stdin: if not None, prepended to ``inputs`` as ``(stdin, None)``,
            i.e. mapped to ``"+in"``.
        request: value assigned to the task's ``resource_request``.

    Returns the new Task whose config is the serialized message.
    """

    if isinstance(args, str):
        args = args.split()

    # `inputs` is traversed twice below (task inputs and file mapping).
    # Materialize it once so one-shot iterables such as generators are not
    # exhausted by the first pass, which would leave `map_inputs` empty.
    inputs = tuple(inputs)
    if stdin is not None:
        inputs = ((stdin, None),) + inputs

    task = Task()
    task.task_type = RUN
    task.inputs = tuple(i for i, fname in inputs)

    msg = loomrun.Run()
    msg.args.extend(args)

    msg.map_inputs.extend(fname or "+in" for i, fname in inputs)
    msg.map_outputs.extend(fname or "+out" for fname in outputs)
    task.config = msg.SerializeToString()
    task.resource_request = request
    return task


def array_make(inputs):
    """Create an ARRAY_MAKE task with ``inputs`` as its inputs.

    ``inputs`` is stored as given; the task has POLICY_SIMPLE.
    """
    result = Task()
    result.policy = POLICY_SIMPLE
    result.task_type = ARRAY_MAKE
    result.inputs = inputs
    return result


def size(input):
    """Create a BASE_SIZE task over a single input.

    The task has POLICY_SIMPLE and ``input`` as its only input.
    """
    result = Task()
    result.policy = POLICY_SIMPLE
    result.inputs = (input,)
    result.task_type = BASE_SIZE
    return result


def length(input):
    """Create a BASE_LENGTH task over a single input.

    The task has POLICY_SIMPLE and ``input`` as its only input.
    """
    result = Task()
    result.policy = POLICY_SIMPLE
    result.inputs = (input,)
    result.task_type = BASE_LENGTH
    return result


def get(input, index):
    """Create a BASE_GET task over a single input.

    ``index`` is packed with the module-level ``u64`` struct (one
    little-endian unsigned 64-bit integer) and stored as the task config.
    The task has POLICY_SIMPLE.
    """
    result = Task()
    result.task_type = BASE_GET
    result.inputs = (input,)
    packed_index = u64.pack(index)
    result.config = packed_index
    result.policy = POLICY_SIMPLE
    return result


def slice(input, start, end):
    """Create a BASE_SLICE task over a single input.

    ``start`` and ``end`` are packed with the module-level ``u64u64``
    struct (two little-endian unsigned 64-bit integers) and stored as the
    task config. The task has POLICY_SIMPLE. Note that this name shadows
    the builtin ``slice`` inside this module.
    """
    result = Task()
    result.task_type = BASE_SLICE
    result.inputs = (input,)
    packed_bounds = u64u64.pack(start, end)
    result.config = packed_bounds
    result.policy = POLICY_SIMPLE
    return result