Commit d231acc2 authored by Stanislav Bohm's avatar Stanislav Bohm

ENH: New scheduling infrastructure

parent ef148a8d
#include "dictionary.h"
#include "log.h"
#include <assert.h>
......@@ -13,7 +14,10 @@ Dictionary::Dictionary()
Id Dictionary::lookup_symbol(const std::string &symbol)
{
auto i = symbol_to_id.find(symbol);
assert(i != symbol_to_id.end());
if(i == symbol_to_id.end()) {
llog->critical("Unknown symbol '{}'", symbol);
exit(1);
}
assert(i->second != -1);
return i->second;
}
......
......@@ -13,7 +13,11 @@ add_library(loom-server-lib OBJECT
freshconn.cpp
freshconn.h
clientconn.cpp
clientconn.h)
clientconn.h
compstate.h
compstate.cpp
plan.h
plan.cpp)
target_include_directories(loom-server-lib PUBLIC ${PROJECT_SOURCE_DIR}/src)
......
......@@ -39,7 +39,9 @@ void ClientConnection::on_message(const char *buffer, size_t size)
loomplan::Plan plan;
plan.ParseFromArray(buffer, size);
auto& task_manager = server.get_task_manager();
task_manager.add_plan(plan);
loom::Id id_base = server.new_id(plan.tasks_size());
task_manager.add_plan(Plan(plan, id_base));
llog->info("Plan submitted tasks={}", plan.tasks_size());
}
......
#include "compstate.h"
#include "workerconn.h"
#include "server.h"
#include "libloom/log.h"
#include <limits.h>
ComputationState::ComputationState(Server &server) : server(server)
{
loom::Dictionary &dictionary = server.get_dictionary();
slice_task_id = dictionary.find_or_create("loom/base/slice");
get_task_id = dictionary.find_or_create("loom/base/get");
dslice_task_id = dictionary.find_or_create("loom/scheduler/dslice");
dget_task_id = dictionary.find_or_create("loom/scheduler/dget");
}
void ComputationState::set_plan(Plan &&plan)
{
this->plan = std::move(plan);
}
TaskDistribution ComputationState::compute_initial_distribution()
{
auto task_ids = plan.get_init_tasks();
add_ready_nodes(task_ids);
return compute_distribution();
}
void ComputationState::add_worker(WorkerConnection* wconn)
{
workers[wconn] = WorkerInfo();
}
void ComputationState::set_running_task(WorkerConnection *wc, loom::Id id)
{
TaskState &state = get_state_or_create(id);
auto it = pending_tasks.find(id);
assert(it != pending_tasks.end());
pending_tasks.erase(it);
assert(state.get_worker_status(wc) == TaskState::NONE);
state.set_worker_status(wc, TaskState::RUNNING);
workers[wc].n_tasks++;
}
void ComputationState::set_task_finished(loom::Id id, size_t size, size_t length, WorkerConnection *wc)
{
TaskState &state = get_state(id);
assert(state.get_worker_status(wc) == TaskState::RUNNING);
state.set_worker_status(wc, TaskState::OWNER);
state.set_size(size);
state.set_length(length);
}
void ComputationState::remove_state(loom::Id id)
{
auto it = states.find(id);
assert(it != states.end());
states.erase(it);
}
void ComputationState::add_ready_nexts(const TaskNode &node)
{
for (loom::Id id : node.get_nexts()) {
const TaskNode &node = get_node(id);
if (is_ready(node)) {
if (node.get_policy() == TaskNode::POLICY_SCHEDULER) {
expand_node(node);
} else {
pending_tasks.insert(id);
}
}
}
}
void ComputationState::expand_node(const TaskNode &node)
{
loom::Id id = node.get_task_type();
if (id == dslice_task_id) {
expand_dslice(node);
} else if (id == dget_task_id) {
expand_dget(node);
} else {
loom::llog->critical("Unknown scheduler task: {}", id);
exit(1);
}
}
void ComputationState::expand_dslice(const TaskNode &node)
{
size_t n_cpus = 0;
for (auto& pair : workers) {
n_cpus += pair.first->get_resource_cpus();
}
assert(n_cpus);
const TaskNode &node1 = node;
assert(node1.get_nexts().size() == 1);
// Do a copy again
const TaskNode &node2 = get_node(node.get_nexts()[0]);
std::vector<loom::Id> inputs = node.get_inputs();
assert(inputs.size() == 1);
TaskState &input = get_state(inputs[0]);
size_t length = input.get_length();
size_t slice_size = round(static_cast<double>(length) / (n_cpus * 4));
// * 4 is just an heuristic, we probably need a better adjustment
if (slice_size == 0) {
slice_size = 1;
}
size_t i = 0;
std::vector<std::string> configs;
while (i < length) {
size_t indices[2];
indices[0] = i;
indices[1] = i + slice_size;
if (indices[1] > length) {
indices[1] = length;
}
i = indices[1];
configs.push_back(std::string(reinterpret_cast<char*>(&indices), sizeof(size_t) * 2));
}
loom::Id id_base1 = server.new_id(configs.size());
loom::Id id_base2 = server.new_id(configs.size());
loom::llog->debug("Expanding 'dslice' id={} length={} pieces={} new_id_base={}",
node1.get_id(), length, configs.size(), id_base1);
TaskNode new_node(node1.get_id(),-1, TaskNode::POLICY_SIMPLE, false,
slice_task_id, "", node1.get_inputs());
make_expansion(configs, new_node, node2, id_base1, id_base2);
}
void ComputationState::expand_dget(const TaskNode &node)
{
const TaskNode &node1 = node;
assert(node1.get_nexts().size() == 1);
// Do a copy again
const TaskNode &node2 = get_node(node.get_nexts()[0]);
std::vector<loom::Id> inputs = node.get_inputs();
assert(inputs.size() == 1);
TaskState &input = get_state(inputs[0]);
size_t length = input.get_length();
std::vector<std::string> configs;
for (size_t i = 0; i < length; i++) {
configs.push_back(std::string(reinterpret_cast<char*>(&i), sizeof(size_t)));
}
loom::Id id_base1 = server.new_id(configs.size());
loom::Id id_base2 = server.new_id(configs.size());
loom::llog->debug("Expanding 'dget' id={} length={} new_id_base={}",
node1.get_id(), length, id_base1);
TaskNode new_node(node1.get_id(),-1, TaskNode::POLICY_SIMPLE, false,
get_task_id, "", node1.get_inputs());
make_expansion(configs, new_node, node2, id_base1, id_base2);
}
void ComputationState::make_expansion(std::vector<std::string> &configs,
const TaskNode &n1,
const TaskNode &n2,
loom::Id id_base1,
loom::Id id_base2)
{
TaskNode node1 = n1; // Make copy
TaskNode node2 = n2; // Make copy
plan.remove_node(node1.get_id());
plan.remove_node(node2.get_id());
size_t size = configs.size();
std::vector<int> ids1;
ids1.reserve(size);
std::vector<int> ids2;
ids2.reserve(size);
for (std::string &config1 : configs) {
TaskNode t1(id_base1, -1,
node1.get_policy(), false,
node1.get_task_type(), config1, node1.get_inputs());
t1.set_nexts(std::vector<loom::Id>{id_base2});
plan.add_node(std::move(t1));
pending_tasks.insert(id_base1);
TaskNode t2(id_base2, -1,
node2.get_policy(), false,
node2.get_task_type(), node2.get_config(),
std::vector<int>{id_base1});
t2.set_nexts(node2.get_nexts());
plan.add_node(std::move(t2));
ids1.push_back(id_base1);
id_base1++;
ids2.push_back(id_base2);
id_base2++;
}
for (loom::Id id : node1.get_inputs()) {
plan.get_node(id).replace_next(node1.get_id(), ids1);
get_state(id).inc_ref_counter(size - 1);
}
for (loom::Id id : node2.get_nexts()) {
plan.get_node(id).replace_input(node2.get_id(), ids2);
}
}
bool ComputationState::is_ready(const TaskNode &node)
{
for (loom::Id id : node.get_inputs()) {
if (states.find(id) == states.end()) {
return false;
}
if (get_state(id).get_first_owner() == nullptr) {
return false;
}
}
return true;
}
TaskDistribution ComputationState::compute_distribution()
{
TaskDistribution result;
if (pending_tasks.empty()) {
return result;
}
size_t n_workers = workers.size();
if (n_workers == 0) {
return result;
}
std::vector<WorkerConnection*> wc(n_workers, NULL);
int i = 0;
for (auto &pair : workers) {
wc[i] = pair.first;
i++;
}
for (loom::Id id : pending_tasks) {
const TaskNode &node = get_node(id);
int index;
if (node.get_inputs().size() == 1) {
index = node.get_inputs()[0];
} else {
index = id;
}
result[wc[index % n_workers]].push_back(id);
}
return result;
}
void ComputationState::add_ready_nodes(std::vector<loom::Id> &ids)
{
for (loom::Id id : ids) {
pending_tasks.insert(id);
}
}
TaskState::TaskState(const TaskNode &node)
: id(node.get_id()), ref_count(node.get_nexts().size())
{
if (node.is_result()) {
inc_ref_counter();
}
}
#ifndef LOOM_SERVER_COMPSTATE_H
#define LOOM_SERVER_COMPSTATE_H
#include "plan.h"
#include <unordered_map>
class WorkerConnection;
template<typename T> using WorkerMap = std::unordered_map<WorkerConnection*, T>;
class ComputationState;
class Server;
class TaskState {
public:
enum WStatus {
NONE,
READY,
RUNNING,
OWNER,
};
TaskState(const TaskNode &node);
loom::Id get_id() const {
return id;
}
void inc_ref_counter(int count = 1) {
ref_count += count;
}
bool dec_ref_counter() {
return --ref_count <= 0;
}
int get_ref_counter() const {
return ref_count;
}
WorkerMap<WStatus>& get_workers() {
return workers;
}
size_t get_size() const {
return size;
}
void set_size(size_t size) {
this->size = size;
}
size_t get_length() const {
return length;
}
void set_length(size_t length) {
this->length = length;
}
WStatus get_worker_status(WorkerConnection *wc) {
auto i = workers.find(wc);
if (i == workers.end()) {
return NONE;
}
return i->second;
}
WorkerConnection *get_first_owner() {
for (auto &p : workers) {
if (p.second == OWNER) {
return p.first;
}
}
return nullptr;
}
void set_worker_status(WorkerConnection *wc, WStatus ws) {
workers[wc] = ws;
}
template<typename F> void foreach_owner(F f) {
for(auto &pair : workers) {
if (pair.second == OWNER) {
f(pair.first);
}
}
}
private:
loom::Id id;
WorkerMap<WStatus> workers;
int ref_count;
size_t size;
size_t length;
};
struct WorkerInfo {
WorkerInfo() : n_tasks(0) {}
int n_tasks;
};
using TaskDistribution = std::unordered_map<WorkerConnection*, std::vector<loom::Id>>;
class ComputationState {
public:
ComputationState(Server &server);
void set_plan(Plan &&plan);
void add_worker(WorkerConnection* wc);
void set_running_task(WorkerConnection *wc, loom::Id id);
TaskDistribution compute_initial_distribution();
TaskDistribution compute_distribution();
TaskState& get_state(loom::Id id) {
auto it = states.find(id);
assert(it != states.end());
return it->second;
}
TaskState* get_state_ptr(loom::Id id) {
auto it = states.find(id);
if (it != states.end()) {
return &it->second;
} else {
return nullptr;
}
}
TaskState& get_state_or_create(loom::Id id) {
auto it = states.find(id);
if (it == states.end()) {
auto p = states.emplace(std::make_pair(id, TaskState(get_node(id))));
it = p.first;
}
return it->second;
}
const TaskNode& get_node(loom::Id id) {
return plan.get_node(id);
}
void add_ready_nodes(std::vector<loom::Id> &ids);
void set_task_finished(loom::Id id, size_t size, size_t length, WorkerConnection *wc);
const Plan& get_plan() const {
return plan;
}
void remove_state(loom::Id id);
bool is_ready(const TaskNode &node);
void add_ready_nexts(const TaskNode &node);
private:
std::unordered_map<loom::Id, TaskState> states;
std::unordered_map<WorkerConnection*, WorkerInfo> workers;
std::unordered_set<loom::Id> pending_tasks;
Plan plan;
Server &server;
loom::Id dslice_task_id;
loom::Id dget_task_id;
loom::Id slice_task_id;
loom::Id get_task_id;
WorkerConnection *get_best_holder_of_deps(TaskNode *task);
WorkerConnection *find_best_worker_for_node(TaskNode *task);
void expand_node(const TaskNode &node);
void expand_dslice(const TaskNode &node);
void expand_dget(const TaskNode &node);
void make_expansion(std::vector<std::string> &configs, const TaskNode &node1,
const TaskNode &node2, loom::Id id_base1, loom::Id id_base2);
};
#endif // LOOM_SERVER_COMPSTATE_H
......@@ -82,8 +82,7 @@ void DWConnection::on_message(const char *buffer, size_t size)
msg.ParseFromArray(buffer, size);
auto data_id = msg.id();
TaskNode &node = worker.server.get_task_manager().get_task(data_id);
auto client_id = node.get_client_id();
auto client_id = worker.server.translate_to_client_id(data_id);
msg.set_id(client_id);
loomcomm::ClientMessage cmsg;
......
#include "plan.h"
#include <algorithm>
#include "libloom/loomplan.pb.h"
#include "libloom/log.h"
static TaskNode::Policy read_task_policy(loomplan::Task_Policy policy) {
switch(policy) {
case loomplan::Task_Policy_POLICY_STANDARD:
return TaskNode::POLICY_STANDARD;
case loomplan::Task_Policy_POLICY_SIMPLE:
return TaskNode::POLICY_SIMPLE;
case loomplan::Task_Policy_POLICY_SCHEDULER:
return TaskNode::POLICY_SCHEDULER;
default:
loom::llog->critical("Invalid task policy");
exit(1);
}
}
Plan::Plan()
{
}
Plan::Plan(const loomplan::Plan &plan, loom::Id id_base)
{
/*
auto task_size = plan.tasks_size();
int id_base = server.new_id(task_size);
*/
auto task_size = plan.tasks_size();
for (int i = 0; i < task_size; i++) {
const auto& pt = plan.tasks(i);
auto id = i + id_base;
std::vector<int> inputs;
auto inputs_size = pt.input_ids_size();
for (int j = 0; j < inputs_size; j++) {
inputs.push_back(id_base + pt.input_ids(j));
}
TaskNode tnode(id, i, read_task_policy(pt.policy()), false,
pt.task_type(), pt.config(), std::move(inputs));
tasks.emplace(std::make_pair(id, std::move(tnode)));
}
for (auto& pair : tasks) {
auto& task = pair.second;
loom::Id my_id = task.id;
std::vector<loom::Id> inps(task.get_inputs());
std::sort(inps.begin(), inps.end());
loom::Id prev = -1;
for (loom::Id id : inps) {
if (prev != id) {
auto it = tasks.find(id);
assert(it != tasks.end());
it->second.nexts.push_back(my_id);
prev = id;
}
}
}
for (int i = 0; i < plan.result_ids_size(); i++)
{
auto it = tasks.find(plan.result_ids(i) + id_base);
assert(it != tasks.end());
it->second.set_as_result();
}
}
std::vector<loom::Id> Plan::get_init_tasks() const
{
std::vector<loom::Id> result;
foreach_task([&result](const TaskNode &node){
if (node.get_inputs().empty()) {
result.push_back(node.id);
}
});
return result;
}
#ifndef LOOM_SERVER_PLAN_H
#define LOOM_SERVER_PLAN_H
#include "tasknode.h"
#include <unordered_map>
#include <unordered_set>
#include <memory.h>
namespace loomplan {
class Plan;
}
class Plan {
public:
Plan();
Plan(const loomplan::Plan &plan, loom::Id id_base);
template<typename F> void foreach_task(F f) const {
for (auto &pair : tasks) {
f(pair.second);
}
}
const TaskNode& get_node(loom::Id id) const {
auto it = tasks.find(id);
assert(it != tasks.end());
return it->second;
}
TaskNode& get_node(loom::Id id) {
auto it = tasks.find(id);
assert(it != tasks.end());
return it->second;
}
size_t get_size() const {
return tasks.size();
}
std::vector<loom::Id> get_init_tasks() const;
template<typename T>
void add_node(T&& task) {
tasks.emplace(std::make_pair(task.get_id(), std::forward<T>(task)));
}
void remove_node(loom::Id id) {
auto it = tasks.find(id);
assert(it != tasks.end());
tasks.erase(it);
}
private:
std::unordered_map<loom::Id, TaskNode> tasks;
};
#endif // LOOM_SERVER_PLAN_H
......@@ -29,6 +29,12 @@ Server::Server(uv_loop_t *loop, int port)
}
}
void Server::add_worker_connection(std::unique_ptr<WorkerConnection> conn)
{
task_manager.register_worker(conn.get());
connections.push_back(std::move(conn));
}
void Server::remove_worker_connection(WorkerConnection &conn)
{
auto i = std::find_if(
......@@ -53,6 +59,11 @@ void Server::remove_client_connection(ClientConnection &conn)
client_connection.reset();
}
loom::Id Server::translate_to_client_id(loom::Id id) const
{
return task_manager.get_plan().get_node(id).get_client_id();
}
void Server::remove_freshconnection(FreshConnection &conn)
{
auto i = std::find_if(
......@@ -63,14 +74,16 @@ void Server::remove_freshconnection(FreshConnection &conn)
fresh_connections.erase(i);
}
void Server::on_task_finished(TaskNode &task)
void Server::on_task_finished(loom::Id id, size_t size, size_t length, WorkerConnection *wc)
{
assert(client_connection);
if (client_connection->has_info_flag()) {
loomcomm::ClientMessage cmsg;
assert(0);
/*loomcomm::ClientMessage cmsg;
cmsg.set_type(loomcomm::ClientMessage_Type_INFO);
loomcomm::Info *info = cmsg.mutable_info();
info->set_id(task.get_id());
const auto& owners = task.get_owners();
assert(owners.size());
info->set_worker(owners.back()->get_address());
......@@ -78,9 +91,9 @@ void Server::on_task_finished(TaskNode &task)
SendBuffer *buffer = new SendBuffer;
buffer->add(cmsg);
client_connection->send_buffer(buffer);
client_connection->send_buffer(buffer);*/
}
task_manager.on_task_finished(task);
task_manager.on_task_finished(id, size, length, wc);
}
void Server::inform_about_error(std::string &error_msg)
......
......@@ -21,14 +21,14 @@ public:
return loop;
}
void add_worker_connection(std::unique_ptr<WorkerConnection> conn) {
connections.push_back(std::move(conn));
}
void add_worker_connection(std::unique_ptr<WorkerConnection> conn);
void remove_worker_connection(WorkerConnection &conn);
void add_client_connection(std::unique_ptr<ClientConnection> conn);
void remove_client_connection(ClientConnection &conn);
loom::Id translate_to_client_id(loom::Id id) const;