Commit 08cb3cc7 authored by Stanislav Bohm's avatar Stanislav Bohm

ENH: dslice

parent 6d35b004
......@@ -30,6 +30,16 @@ Id Dictionary::find_or_create(const std::string &symbol)
}
}
const std::string& Dictionary::translate(Id id)
{
for (auto &i : symbol_to_id) {
if (i.second == id) {
return i.first;
}
}
assert(0);
}
std::vector<std::string> Dictionary::get_all_symbols() const
{
std::vector<std::string> symbols;
......
......@@ -16,6 +16,7 @@ public:
loom::Id lookup_symbol(const std::string &symbol);
loom::Id find_or_create(const std::string &symbol);
const std::string& translate(loom::Id id);
std::vector<std::string> get_all_symbols() const;
......
......@@ -68,8 +68,8 @@ public:
private:
void start_listen();
loom::Dictionary dictionary;
uv_loop_t *loop;
std::vector<std::unique_ptr<WorkerConnection>> connections;
std::vector<std::unique_ptr<FreshConnection>> fresh_connections;
......@@ -83,8 +83,6 @@ private:
DummyWorker dummy_worker;
loom::Id id_counter;
loom::Dictionary dictionary;
static void _on_new_connection(uv_stream_t *stream, int status);
};
......
......@@ -13,7 +13,8 @@ using namespace loom;
TaskManager::TaskManager(Server &server)
: server(server)
{
dslice_task_id = server.get_dictionary().find_or_create("scheduler/dslice");
slice_task_id = server.get_dictionary().find_or_create("base/slice");
}
void TaskManager::add_plan(const loomplan::Plan &plan, bool distribute)
......@@ -67,7 +68,7 @@ void TaskManager::add_plan(const loomplan::Plan &plan, bool distribute)
}
for (int i = 0; i < plan.result_ids_size(); i++)
{
{
auto id = plan.result_ids(i);
assert (0 <= id && id < task_size);
id = id_base + id;
......@@ -100,7 +101,7 @@ void TaskManager::on_task_finished(TaskNode &task)
}
}
} else {
llog->debug("Job id={} finished", id);
llog->debug("Job id={} finished (length={})", id, task.get_length());
}
for (TaskNode *input : task.get_inputs()) {
......@@ -114,9 +115,95 @@ void TaskManager::on_task_finished(TaskNode &task)
std::vector<TaskNode*> ready;
task.collect_ready_nexts(ready);
bool dslice = false;
for (TaskNode *node : ready) {
if (node->get_task_type() == dslice_task_id) {
dslice = true;
break;
}
}
if (dslice) {
std::vector<TaskNode*> new_ready;
for (TaskNode *node : ready) {
if (node->get_task_type() == dslice_task_id) {
expand_dslice(node, new_ready);
} else {
new_ready.push_back(node);
}
}
ready = std::move(new_ready);
}
distribute_work(ready);
}
void TaskManager::expand_dslice(TaskNode *node, TaskNode::Vector &tasks)
{
size_t slice_count = 0;
for (auto &w : server.get_connections()) {
slice_count += w->get_resource_cpus();
}
slice_count *= 4;
assert (slice_count > 0);
assert (node->get_inputs().size() == 1); // Todo generalize
TaskNode *input = node->get_inputs()[0];
size_t length = input->get_length();
size_t slice_size = round(static_cast<double>(length) / slice_count);
if (slice_size == 0) {
slice_size = 1;
}
size_t i = 0;
slice_count = 0;
assert (node->get_nexts().size() == 1); // Todo generalize
TaskNode *next = node->get_nexts()[0];
if (llog->level() >= spdlog::level::debug) {
llog->debug("Expanding dslice dslice id={}; follow id={}",
node->get_id(), next->get_type_name(server));
}
std::vector<TaskNode*> new_tasks;
while (i < length) {
loom::Id new_id = server.new_id(2);
size_t indices[2];
indices[0] = i;
indices[1] = i + slice_size;
if (indices[1] > length) {
indices[1] = length;
}
i = indices[1];
std::string config(reinterpret_cast<char*>(&indices), sizeof(size_t) * 2);
auto new_slice = std::make_unique<TaskNode>(new_id, -1, slice_task_id, config);
new_slice->set_inputs(node->get_inputs());
tasks.push_back(new_slice.get());
auto new_task = std::make_unique<TaskNode>(
new_id + 1, -1, next->get_task_type(), next->get_config());
new_task->add_input(new_slice.get());
new_slice->inc_ref_counter();
new_tasks.push_back(new_slice.get());
this->tasks[new_id] = std::move(new_slice);
slice_count += 1;
}
llog->debug("length = {}; slice_count={}; slice_size={}", length, slice_count, slice_size);
for (TaskNode *n : next->get_nexts()) {
n->replace_input(next, new_tasks);
}
for (TaskNode *n : node->get_inputs()) {
n->inc_ref_counter(slice_count - 1);
}
}
struct _TaskInfo {
int priority;
TaskNode::Vector new_tasks;
......@@ -188,7 +275,7 @@ TaskManager::WorkDistribution TaskManager::compute_distribution(TaskNode::Vector
}
void TaskManager::distribute_work(TaskNode::Vector &tasks)
{
{
if (tasks.size() == 0) {
return;
}
......
......@@ -41,13 +41,18 @@ public:
return *tasks[id];
}
private:
Server &server;
std::unordered_map<loom::Id, std::unique_ptr<TaskNode>> tasks;
std::unordered_set<loom::Id> results;
std::vector<std::string> task_types;
loom::Id dslice_task_id;
loom::Id slice_task_id;
void distribute_work(TaskNode::Vector &tasks);
void expand_dslice(TaskNode *node, TaskNode::Vector &tasks);
};
......
#include "tasknode.h"
#include "server.h"
#include <sstream>
#include <iomanip>
void TaskNode::replace_input(TaskNode *old_input, const std::vector<TaskNode *> &new_inputs)
{
auto i = std::find(inputs.begin(), inputs.end(), old_input);
assert (i != inputs.end());
auto i2 = inputs.erase(i);
inputs.insert(i2, new_inputs.begin(), new_inputs.end());
// Update next in new_inputs
for (TaskNode *n : new_inputs) {
n->add_next(this);
}
// Update next in old_input
auto i3 = std::find(old_input->nexts.begin(), old_input->nexts.end(), this);
assert (i3 != inputs.end());
old_input->nexts.erase(i3);
}
std::string TaskNode::get_type_name(Server &server)
{
return server.get_dictionary().translate(task_type);
}
std::string TaskNode::get_info(Server &server)
{
std::stringstream s;
s << "T[" << id << "/" << client_id << " refs=" << ref_count;
s << " task=" << get_type_name(server);
s << " config(" << config.size() << ")=";
size_t sz = config.size();
const size_t limit = sz > 8 ? 8 : sz;
for (size_t i = 0; i < limit; i++) {
s << std::hex << std::setfill('0') << std::setw(2) << (int) config[i];
}
if (sz > limit) {
s << "...";
}
s << "]";
return s.str();
}
......@@ -9,6 +9,7 @@
#include <assert.h>
class WorkerConnection;
class Server;
class TaskNode {
public:
......@@ -46,6 +47,10 @@ public:
}
}
void set_inputs(const TaskNode::Vector &inputs) {
this->inputs = inputs;
}
void add_input(TaskNode *task) {
inputs.push_back(task);
}
......@@ -54,8 +59,8 @@ public:
nexts.push_back(task);
}
void inc_ref_counter() {
ref_count += 1;
void inc_ref_counter(int count = 1) {
ref_count += count;
}
bool dec_ref_counter() {
......@@ -109,10 +114,24 @@ public:
this->length = length;
}
const Vector& get_inputs() {
const Vector& get_inputs() const {
return inputs;
}
const Vector& get_nexts() const {
return nexts;
}
size_t get_length() const {
return length;
}
void replace_input(TaskNode *old_input, const std::vector<TaskNode*> &new_inputs);
std::string get_type_name(Server &server);
std::string get_info(Server &server);
private:
State state;
loom::Id id;
......
......@@ -71,3 +71,4 @@ void SplitTask::start(DataVector &inputs)
std::shared_ptr<Data> result = std::make_shared<Index>(worker, input, indices.size() - 1, std::move(indices_data));
finish(result);
}
......@@ -6,7 +6,7 @@ loom_env # silence flake8
def test_dslice(loom_env):
loom_env.start(1)
loom_env.start(2)
p = loom_env.plan()
consts = []
......@@ -14,3 +14,9 @@ def test_dslice(loom_env):
consts.append(p.task_const("data{}".format(i)))
a = p.task_array_make(consts)
ds = p.task_dslice(a)
f = p.task_get(ds, 0)
r = p.task_array_make((f,))
result = loom_env.submit(p, r)
assert len(result) >= 2
assert sum(result, []) == ["data{}".format(i) for i in xrange(16)]
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment