Commit 7caba4cf authored by Stanislav Bohm's avatar Stanislav Bohm

FIX: Fixed task distribution

parent d89ecb91
......@@ -16,6 +16,10 @@ using namespace loom;
DummyWorker::DummyWorker(Server &server)
: server(server), listen_port(-1)
{
uv_loop_t *loop = server.get_loop();
if (loop == nullptr) {
return;
}
UV_CHECK(uv_tcp_init(server.get_loop(), &listen_socket));
listen_socket.data = this;
}
......
......@@ -45,9 +45,20 @@ void TaskManager::add_plan(const loomplan::Plan &plan, bool distribute)
}
for (int j = 0; j < inputs_size; j++) {
auto &task = tasks[pt.input_ids(j)];
auto id = pt.input_ids(j);
auto &task = tasks[id];
t->add_input(task.get());
task->add_next(t.get());
// Put next if there is not such
int j2;
for (j2 = 0; j2 < j; j2++) {
if (id == pt.input_ids(j2)) {
break;
}
}
if (j2 == j) {
task->add_next(t.get());
}
}
}
......@@ -161,8 +172,10 @@ void TaskManager::distribute_work(TaskNode::Vector &tasks)
assert(input->get_owners().size() >= 1);
WorkerConnection *owner = input->get_owners()[0];
owner->send_data(input->get_id(), load.connection.get_address(), false);
input->add_owner(&load.connection);
}
}
llog->alert("X = {}", task->get_id());
load.connection.send_task(task);
}
}
......
......@@ -51,5 +51,6 @@ def test_array_same_value(loom_env):
p = loom_env.plan()
a = p.task_const("ABC")
b = p.task_array_make((a, a, a, a))
p.write_dot("test.dot")
loom_env.start(1)
assert ["ABC"] * 4 == loom_env.submit(p, b)
......@@ -10,68 +10,106 @@
typedef std::unordered_set<TaskNode*> TaskSet;
typedef
std::unordered_map<WorkerConnection*, TaskSet>
DistMap;
std::unordered_map<WorkerConnection*, TaskSet>
DistMap;
static DistMap
to_distmap(TaskManager::WorkDistribution dist)
{
DistMap map;
for (auto &load : dist) {
assert (map.find(&load.connection) == map.end());
auto &set = map[&load.connection];
for (auto tn : load.tasks) {
set.insert(tn);
}
}
return map;
DistMap map;
for (auto &load : dist) {
assert (map.find(&load.connection) == map.end());
auto &set = map[&load.connection];
for (auto tn : load.tasks) {
set.insert(tn);
}
}
return map;
}
static std::unique_ptr<WorkerConnection>
simple_worker(Server &server, const std::string &name, int cpus=1)
{
std::vector<std::string> tt;
return std::make_unique<WorkerConnection>(server, nullptr, name, tt, cpus);
std::vector<std::string> tt;
return std::make_unique<WorkerConnection>(server, nullptr, name, tt, cpus);
}
TEST_CASE( "Server scheduling - separate tasks", "[scheduling]" ) {
Server server(NULL, 0);
TaskManager &manager = server.get_task_manager();
Server server(NULL, 0);
TaskManager &manager = server.get_task_manager();
loomplan::Plan plan;
loomplan::Task *t;
plan.add_tasks();
plan.add_tasks();
manager.add_plan(plan, false);
loomplan::Plan plan;
loomplan::Task *t;
plan.add_tasks();
plan.add_tasks();
manager.add_plan(plan, false);
TaskNode::Vector v;
v.push_back(&manager.get_task(0));
v.push_back(&manager.get_task(1));
TaskNode::Vector v;
v.push_back(&manager.get_task(0));
v.push_back(&manager.get_task(1));
auto d = to_distmap(manager.compute_distribution(v));
CHECK(d.size() == 0);
auto d = to_distmap(manager.compute_distribution(v));
CHECK(d.size() == 0);
std::vector<std::string> tt;
std::vector<std::string> tt;
auto wconn = simple_worker(server, "w1");
WorkerConnection *w1 = wconn.get();
server.add_worker_connection(std::move(wconn));
auto wconn = simple_worker(server, "w1");
WorkerConnection *w1 = wconn.get();
server.add_worker_connection(std::move(wconn));
auto d2 = to_distmap(manager.compute_distribution(v));
CHECK(d2.size() == 1);
auto d2 = to_distmap(manager.compute_distribution(v));
CHECK(d2.size() == 1);
wconn = simple_worker(server, "w1");
WorkerConnection *w2 = wconn.get();
server.add_worker_connection(std::move(wconn));
CHECK(server.get_connections().size() == 2);
wconn = simple_worker(server, "w2");
WorkerConnection *w2 = wconn.get();
server.add_worker_connection(std::move(wconn));
CHECK(server.get_connections().size() == 2);
auto d3 = to_distmap(manager.compute_distribution(v));
CHECK(d3.size() == 2);
auto d3 = to_distmap(manager.compute_distribution(v));
CHECK(d3.size() == 2);
bool first = d3[w1] == TaskSet{v[0]} && d3[w2] == TaskSet{v[1]};
bool second = d3[w1] == TaskSet{v[1]} && d3[w2] == TaskSet{v[0]};
bool result = first | second;
CHECK(result);
bool first = d3[w1] == TaskSet{v[0]} && d3[w2] == TaskSet{v[1]};
bool second = d3[w1] == TaskSet{v[1]} && d3[w2] == TaskSet{v[0]};
bool result = first | second;
CHECK(result);
}
TEST_CASE( "Server scheduling - basic tasks", "[scheduling]" ) {
Server server(NULL, 0);
TaskManager &manager = server.get_task_manager();
auto wconn = simple_worker(server, "w1");
WorkerConnection *w1 = wconn.get();
server.add_worker_connection(std::move(wconn));
loomplan::Plan plan;
loomplan::Task *t;
plan.add_tasks();
plan.add_tasks();
t = plan.add_tasks();
t->add_input_ids(0);
t->add_input_ids(0);
t->add_input_ids(0);
t->add_input_ids(1);
t->add_input_ids(1);
manager.add_plan(plan, false);
SECTION("First task") {
TaskNode::Vector v;
v.push_back(&manager.get_task(0));
auto d2 = to_distmap(manager.compute_distribution(v));
CHECK(d2.size() == 1);
CHECK(d2[w1].size() == 1);
}
SECTION("Last task") {
TaskNode::Vector v;
v.push_back(&manager.get_task(2));
auto d2 = to_distmap(manager.compute_distribution(v));
CHECK(d2.size() == 1);
CHECK(d2[w1].size() == 1);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment