Commit e544f769 authored by Stanislav Bohm's avatar Stanislav Bohm

WIP

parent c7b9d15a
......@@ -12,6 +12,7 @@ message Task {
repeated int32 input_ids = 3;
optional int32 resource_request_index = 5 [default = -1];
optional bool result = 6;
optional string checkpoint_path = 7;
optional string label = 12;
optional bytes metadata = 13;
......
......@@ -297,6 +297,7 @@ class Client(object):
futures = self.futures
results = []
for task in tasks:
task.validate()
if not isinstance(task, Task):
raise Exception("{} is not a task".format(task))
plan.add(task)
......
import os.path
from .client import LoomError
class Task(object):
......@@ -7,6 +10,11 @@ class Task(object):
resource_request = None
label = None
metadata = None
checkpoint_path = None
def validate(self):
if self.checkpoint is not None and not os.path.isabs(self.checkpoint):
raise LoomError("Checkpoint has to be absolute path")
def __repr__(self):
if self.label:
......
......@@ -113,6 +113,7 @@ void TaskManager::on_task_finished(loom::base::Id id, size_t size, size_t length
remove_node(*input_node);
}
}
if (!node.get_nexts().empty()) {
for (TaskNode *nn : node.get_nexts()) {
if (nn->input_is_ready(&node)) {
......
......@@ -29,6 +29,7 @@ struct TaskDef
loom::base::Id task_type;
std::string config;
std::bitset<1> flags;
std::string checkpoint_path;
};
enum class TaskStatus {
......
from loomenv import loom_env, LOOM_TESTPROG, LOOM_TEST_DATA_DIR # noqa
import loom.client.tasks as tasks # noqa
from loom import client
import pytest
loom_env # silence flake8
def test_checkpoint_basic(loom_env):
t1 = tasks.const("abcd")
t2 = tasks.const("XYZ")
t3 = tasks.merge((t1, t2))
t3.checkpoint_path = "abc"
loom_env.submit_and_gather(t3)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment