Commit 644b0f71 authored by Stanislav Bohm's avatar Stanislav Bohm

ENH: dget implemented

parent c2012daa
......@@ -22,6 +22,7 @@ class Task(object):
msg.config = self.config
msg.task_type = symbols[self.task_type]
msg.input_ids.extend(t.id for t in self.inputs)
msg.mode = self.mode
class Plan(object):
......@@ -39,6 +40,7 @@ class Plan(object):
TASK_RUN = "loom/run/run"
TASK_SCHEDULER_DSLICE = "loom/scheduler/dslice"
TASK_SCHEDULER_DGET = "loom/scheduler/dget"
u64 = struct.Struct("<Q")
u64u64 = struct.Struct("<QQ")
......@@ -61,6 +63,13 @@ class Plan(object):
task.inputs = (input,)
return self.add(task)
def task_dget(self, input):
task = Task()
task.task_type = self.TASK_SCHEDULER_DGET
task.mode = MODE_SCHEDULER
task.inputs = (input,)
return self.add(task)
def task_const(self, data):
task = Task()
task.task_type = self.TASK_DATA_CONST
......
......@@ -123,6 +123,15 @@ void Server::send_dictionary(Connection &connection)
connection.send_buffer(send_buffer);
}
int Server::get_worker_ncpus()
{
int count = 0;
for (auto &w : connections) {
count += w->get_resource_cpus();
}
return count;
}
void Server::start_listen()
{
struct sockaddr_in addr;
......
......@@ -67,6 +67,8 @@ public:
void send_dictionary(loom::Connection &connection);
int get_worker_ncpus();
private:
void start_listen();
......
......@@ -14,8 +14,11 @@ TaskManager::TaskManager(Server &server)
: server(server)
{
auto &dictionary = server.get_dictionary();
dslice_task_id = dictionary.find_or_create("loom/scheduler/dslice");
slice_task_id = dictionary.find_or_create("loom/base/slice");
get_task_id = dictionary.find_or_create("loom/base/get");
dslice_task_id = dictionary.find_or_create("loom/scheduler/dslice");
dget_task_id = dictionary.find_or_create("loom/scheduler/dget");
}
static TaskNode::TaskMode read_task_mode(loomplan::Task_Mode mode) {
......@@ -123,19 +126,19 @@ void TaskManager::on_task_finished(TaskNode &task)
std::vector<TaskNode*> ready;
task.collect_ready_nexts(ready);
bool dslice = false;
bool scheduler_mode = false;
for (TaskNode *node : ready) {
if (node->get_task_type() == dslice_task_id) {
dslice = true;
if (node->get_mode() == TaskNode::MODE_SCHEDULER) {
scheduler_mode = true;
break;
}
}
if (dslice) {
if (scheduler_mode) {
std::vector<TaskNode*> new_ready;
for (TaskNode *node : ready) {
if (node->get_task_type() == dslice_task_id) {
expand_dslice(node, new_ready);
for (TaskNode *node : ready) {
if (node->get_mode() == TaskNode::MODE_SCHEDULER) {
expand_scheduler_mode_task(node, new_ready);
} else {
new_ready.push_back(node);
}
......@@ -146,38 +149,76 @@ void TaskManager::on_task_finished(TaskNode &task)
distribute_work(ready);
}
void TaskManager::expand_dslice(TaskNode *node, TaskNode::Vector &tasks)
void TaskManager::expand_scheduler_mode_task(TaskNode *node, TaskNode::Vector &tasks)
{
size_t slice_count = 0;
for (auto &w : server.get_connections()) {
slice_count += w->get_resource_cpus();
}
slice_count *= 4;
assert (slice_count > 0);
assert (node->get_inputs().size() == 1); // Todo generalize
TaskNode *input = node->get_inputs()[0];
size_t length = input->get_length();
assert (node->get_nexts().size() == 1); // Todo generalize
TaskNode *next = node->get_nexts()[0];
size_t slice_size = round(static_cast<double>(length) / slice_count);
if (slice_size == 0) {
slice_size = 1;
if (node->get_task_type() == dslice_task_id) {
expand_dslice(node, tasks, length, next);
return;
}
if (node->get_task_type() == dget_task_id) {
expand_dget(node, tasks, length, next);
return;
}
llog->critical("Invalid scheduler task");
exit(1);
}
size_t i = 0;
slice_count = 0;
void TaskManager::expand_dget(TaskNode *node, TaskNode::Vector &tasks, size_t length, TaskNode *next)
{
if (llog->level() >= spdlog::level::debug) {
llog->debug("Expanding dget id={}; follow id={} length={}",
node->get_id(), next->get_type_name(server), length);
}
assert (node->get_nexts().size() == 1); // Todo generalize
TaskNode *next = node->get_nexts()[0];
auto& inputs = node->get_inputs();
std::vector<TaskNode*> new_tasks;
for (size_t i = 0; i < length; i++) {
std::string config(reinterpret_cast<char*>(&i), sizeof(size_t));
dynamic_expand_helper(inputs, next, get_task_id, config, tasks, new_tasks);
}
for (TaskNode *n : next->get_nexts()) {
n->replace_input(next, new_tasks);
}
for (TaskNode *n : node->get_inputs()) {
n->inc_ref_counter(length - 1);
}
}
void TaskManager::expand_dslice(TaskNode *node, TaskNode::Vector &tasks, size_t length, TaskNode *next)
{
if (llog->level() >= spdlog::level::debug) {
llog->debug("Expanding dslice dslice id={}; follow id={}",
node->get_id(), next->get_type_name(server));
}
auto& inputs = node->get_inputs();
size_t slice_count = server.get_worker_ncpus();
slice_count *= 4;
assert (slice_count > 0);
size_t slice_size = round(static_cast<double>(length) / slice_count);
if (slice_size == 0) {
slice_size = 1;
}
size_t i = 0;
slice_count = 0;
std::vector<TaskNode*> new_tasks;
while (i < length) {
loom::Id new_id = server.new_id(2);
size_t indices[2];
indices[0] = i;
indices[1] = i + slice_size;
......@@ -186,17 +227,7 @@ void TaskManager::expand_dslice(TaskNode *node, TaskNode::Vector &tasks)
}
i = indices[1];
std::string config(reinterpret_cast<char*>(&indices), sizeof(size_t) * 2);
auto new_slice = std::make_unique<TaskNode>(new_id, -1, TaskNode::MODE_SIMPLE, slice_task_id, config);
new_slice->set_inputs(node->get_inputs());
tasks.push_back(new_slice.get());
auto new_task = std::make_unique<TaskNode>(
new_id + 1, -1, next->get_mode(), next->get_task_type(), next->get_config());
new_task->add_input(new_slice.get());
new_slice->inc_ref_counter();
new_tasks.push_back(new_slice.get());
this->tasks[new_id] = std::move(new_slice);
dynamic_expand_helper(inputs, next, slice_task_id, config, tasks, new_tasks);
slice_count += 1;
}
......@@ -211,6 +242,28 @@ void TaskManager::expand_dslice(TaskNode *node, TaskNode::Vector &tasks)
}
}
void TaskManager::dynamic_expand_helper(
const TaskNode::Vector &inputs,
TaskNode *next,
Id task_type_id,
std::string &config,
TaskNode::Vector &tasks1,
TaskNode::Vector &tasks2)
{
Id new_id = server.new_id(2);
auto new_slice = std::make_unique<TaskNode>(new_id, -1, TaskNode::MODE_SIMPLE, task_type_id, config);
new_slice->set_inputs(inputs);
tasks1.push_back(new_slice.get());
auto new_task = std::make_unique<TaskNode>(
new_id + 1, -1, next->get_mode(), next->get_task_type(), next->get_config());
new_task->add_input(new_slice.get());
new_slice->inc_ref_counter();
tasks2.push_back(new_slice.get());
this->tasks[new_id] = std::move(new_slice);
}
struct _TaskInfo {
int priority;
TaskNode::Vector new_tasks;
......
......@@ -50,10 +50,24 @@ private:
std::vector<std::string> task_types;
loom::Id dslice_task_id;
loom::Id dget_task_id;
loom::Id slice_task_id;
loom::Id get_task_id;
void distribute_work(TaskNode::Vector &tasks);
void expand_dslice(TaskNode *node, TaskNode::Vector &tasks);
void expand_scheduler_mode_task(TaskNode *node, TaskNode::Vector &tasks);
void expand_dslice(TaskNode *node, TaskNode::Vector &tasks, size_t length, TaskNode *next);
void expand_dget(TaskNode *node, TaskNode::Vector &tasks, size_t length, TaskNode *next);
void dynamic_expand_helper(
const TaskNode::Vector &inputs,
TaskNode *next,
loom::Id task_type_id,
std::string &config,
TaskNode::Vector &tasks1,
TaskNode::Vector &tasks2);
};
......
......@@ -20,3 +20,18 @@ def test_dslice(loom_env):
assert len(result) >= 2
assert sum(result, []) == ["data{}".format(i) for i in xrange(16)]
def test_dget(loom_env):
loom_env.start(2)
p = loom_env.plan()
consts = []
for i in xrange(16):
consts.append(p.task_const("data{}".format(i)))
a = p.task_array_make(consts)
ds = p.task_dget(a)
f = p.task_run("/bin/cat", stdin=ds)
r = p.task_array_make((f,))
result = loom_env.submit(p, r)
assert result == ["data{}".format(i) for i in xrange(16)]
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment