Commit bb6a3621 authored by Stanislav Bohm's avatar Stanislav Bohm

RF: Scheduler refactoring

parent d231acc2
......@@ -6,8 +6,6 @@ add_library(loom-server-lib OBJECT
dummyworker.h
taskmanager.cpp
taskmanager.h
tasknode.cpp
tasknode.h
workerconn.cpp
workerconn.h
freshconn.cpp
......@@ -16,6 +14,10 @@ add_library(loom-server-lib OBJECT
clientconn.h
compstate.h
compstate.cpp
taskstate.h
taskstate.cpp
plannode.cpp
plannode.h
plan.h
plan.cpp)
......
......@@ -61,12 +61,12 @@ void ComputationState::remove_state(loom::Id id)
states.erase(it);
}
void ComputationState::add_ready_nexts(const TaskNode &node)
void ComputationState::add_ready_nexts(const PlanNode &node)
{
for (loom::Id id : node.get_nexts()) {
const TaskNode &node = get_node(id);
const PlanNode &node = get_node(id);
if (is_ready(node)) {
if (node.get_policy() == TaskNode::POLICY_SCHEDULER) {
if (node.get_policy() == PlanNode::POLICY_SCHEDULER) {
expand_node(node);
} else {
pending_tasks.insert(id);
......@@ -75,7 +75,7 @@ void ComputationState::add_ready_nexts(const TaskNode &node)
}
}
void ComputationState::expand_node(const TaskNode &node)
void ComputationState::expand_node(const PlanNode &node)
{
loom::Id id = node.get_task_type();
......@@ -89,7 +89,7 @@ void ComputationState::expand_node(const TaskNode &node)
}
}
void ComputationState::expand_dslice(const TaskNode &node)
void ComputationState::expand_dslice(const PlanNode &node)
{
size_t n_cpus = 0;
for (auto& pair : workers) {
......@@ -97,10 +97,10 @@ void ComputationState::expand_dslice(const TaskNode &node)
}
assert(n_cpus);
const TaskNode &node1 = node;
const PlanNode &node1 = node;
assert(node1.get_nexts().size() == 1);
// Do a copy again
const TaskNode &node2 = get_node(node.get_nexts()[0]);
const PlanNode &node2 = get_node(node.get_nexts()[0]);
std::vector<loom::Id> inputs = node.get_inputs();
assert(inputs.size() == 1);
......@@ -132,17 +132,17 @@ void ComputationState::expand_dslice(const TaskNode &node)
loom::llog->debug("Expanding 'dslice' id={} length={} pieces={} new_id_base={}",
node1.get_id(), length, configs.size(), id_base1);
TaskNode new_node(node1.get_id(),-1, TaskNode::POLICY_SIMPLE, false,
PlanNode new_node(node1.get_id(),-1, PlanNode::POLICY_SIMPLE, false,
slice_task_id, "", node1.get_inputs());
make_expansion(configs, new_node, node2, id_base1, id_base2);
}
void ComputationState::expand_dget(const TaskNode &node)
void ComputationState::expand_dget(const PlanNode &node)
{
const TaskNode &node1 = node;
const PlanNode &node1 = node;
assert(node1.get_nexts().size() == 1);
// Do a copy again
const TaskNode &node2 = get_node(node.get_nexts()[0]);
const PlanNode &node2 = get_node(node.get_nexts()[0]);
std::vector<loom::Id> inputs = node.get_inputs();
assert(inputs.size() == 1);
......@@ -160,20 +160,20 @@ void ComputationState::expand_dget(const TaskNode &node)
loom::llog->debug("Expanding 'dget' id={} length={} new_id_base={}",
node1.get_id(), length, id_base1);
TaskNode new_node(node1.get_id(),-1, TaskNode::POLICY_SIMPLE, false,
PlanNode new_node(node1.get_id(),-1, PlanNode::POLICY_SIMPLE, false,
get_task_id, "", node1.get_inputs());
make_expansion(configs, new_node, node2, id_base1, id_base2);
}
void ComputationState::make_expansion(std::vector<std::string> &configs,
const TaskNode &n1,
const TaskNode &n2,
const PlanNode &n1,
const PlanNode &n2,
loom::Id id_base1,
loom::Id id_base2)
{
TaskNode node1 = n1; // Make copy
TaskNode node2 = n2; // Make copy
PlanNode node1 = n1; // Make copy
PlanNode node2 = n2; // Make copy
plan.remove_node(node1.get_id());
plan.remove_node(node2.get_id());
......@@ -185,7 +185,7 @@ void ComputationState::make_expansion(std::vector<std::string> &configs,
ids2.reserve(size);
for (std::string &config1 : configs) {
TaskNode t1(id_base1, -1,
PlanNode t1(id_base1, -1,
node1.get_policy(), false,
node1.get_task_type(), config1, node1.get_inputs());
t1.set_nexts(std::vector<loom::Id>{id_base2});
......@@ -193,7 +193,7 @@ void ComputationState::make_expansion(std::vector<std::string> &configs,
pending_tasks.insert(id_base1);
TaskNode t2(id_base2, -1,
PlanNode t2(id_base2, -1,
node2.get_policy(), false,
node2.get_task_type(), node2.get_config(),
std::vector<int>{id_base1});
......@@ -217,7 +217,7 @@ void ComputationState::make_expansion(std::vector<std::string> &configs,
}
bool ComputationState::is_ready(const TaskNode &node)
bool ComputationState::is_ready(const PlanNode &node)
{
for (loom::Id id : node.get_inputs()) {
if (states.find(id) == states.end()) {
......@@ -252,7 +252,7 @@ TaskDistribution ComputationState::compute_distribution()
}
for (loom::Id id : pending_tasks) {
const TaskNode &node = get_node(id);
const PlanNode &node = get_node(id);
int index;
if (node.get_inputs().size() == 1) {
index = node.get_inputs()[0];
......@@ -270,11 +270,3 @@ void ComputationState::add_ready_nodes(std::vector<loom::Id> &ids)
pending_tasks.insert(id);
}
}
TaskState::TaskState(const TaskNode &node)
: id(node.get_id()), ref_count(node.get_nexts().size())
{
if (node.is_result()) {
inc_ref_counter();
}
}
......@@ -2,104 +2,10 @@
#define LOOM_SERVER_COMPSTATE_H
#include "plan.h"
#include "taskstate.h"
#include <unordered_map>
class WorkerConnection;
template<typename T> using WorkerMap = std::unordered_map<WorkerConnection*, T>;
class ComputationState;
class Server;
class TaskState {
public:
enum WStatus {
NONE,
READY,
RUNNING,
OWNER,
};
TaskState(const TaskNode &node);
loom::Id get_id() const {
return id;
}
void inc_ref_counter(int count = 1) {
ref_count += count;
}
bool dec_ref_counter() {
return --ref_count <= 0;
}
int get_ref_counter() const {
return ref_count;
}
WorkerMap<WStatus>& get_workers() {
return workers;
}
size_t get_size() const {
return size;
}
void set_size(size_t size) {
this->size = size;
}
size_t get_length() const {
return length;
}
void set_length(size_t length) {
this->length = length;
}
WStatus get_worker_status(WorkerConnection *wc) {
auto i = workers.find(wc);
if (i == workers.end()) {
return NONE;
}
return i->second;
}
WorkerConnection *get_first_owner() {
for (auto &p : workers) {
if (p.second == OWNER) {
return p.first;
}
}
return nullptr;
}
void set_worker_status(WorkerConnection *wc, WStatus ws) {
workers[wc] = ws;
}
template<typename F> void foreach_owner(F f) {
for(auto &pair : workers) {
if (pair.second == OWNER) {
f(pair.first);
}
}
}
private:
loom::Id id;
WorkerMap<WStatus> workers;
int ref_count;
size_t size;
size_t length;
};
struct WorkerInfo {
WorkerInfo() : n_tasks(0) {}
int n_tasks;
};
using TaskDistribution = std::unordered_map<WorkerConnection*, std::vector<loom::Id>>;
class ComputationState {
......@@ -138,7 +44,7 @@ public:
return it->second;
}
const TaskNode& get_node(loom::Id id) {
const PlanNode& get_node(loom::Id id) {
return plan.get_node(id);
}
......@@ -151,8 +57,8 @@ public:
void remove_state(loom::Id id);
bool is_ready(const TaskNode &node);
void add_ready_nexts(const TaskNode &node);
bool is_ready(const PlanNode &node);
void add_ready_nexts(const PlanNode &node);
private:
std::unordered_map<loom::Id, TaskState> states;
......@@ -167,14 +73,14 @@ private:
loom::Id slice_task_id;
loom::Id get_task_id;
WorkerConnection *get_best_holder_of_deps(TaskNode *task);
WorkerConnection *find_best_worker_for_node(TaskNode *task);
WorkerConnection *get_best_holder_of_deps(PlanNode *task);
WorkerConnection *find_best_worker_for_node(PlanNode *task);
void expand_node(const TaskNode &node);
void expand_dslice(const TaskNode &node);
void expand_dget(const TaskNode &node);
void make_expansion(std::vector<std::string> &configs, const TaskNode &node1,
const TaskNode &node2, loom::Id id_base1, loom::Id id_base2);
void expand_node(const PlanNode &node);
void expand_dslice(const PlanNode &node);
void expand_dget(const PlanNode &node);
void make_expansion(std::vector<std::string> &configs, const PlanNode &node1,
const PlanNode &node2, loom::Id id_base1, loom::Id id_base2);
};
......
......@@ -5,14 +5,14 @@
#include "libloom/loomplan.pb.h"
#include "libloom/log.h"
static TaskNode::Policy read_task_policy(loomplan::Task_Policy policy) {
static PlanNode::Policy read_task_policy(loomplan::Task_Policy policy) {
switch(policy) {
case loomplan::Task_Policy_POLICY_STANDARD:
return TaskNode::POLICY_STANDARD;
return PlanNode::POLICY_STANDARD;
case loomplan::Task_Policy_POLICY_SIMPLE:
return TaskNode::POLICY_SIMPLE;
return PlanNode::POLICY_SIMPLE;
case loomplan::Task_Policy_POLICY_SCHEDULER:
return TaskNode::POLICY_SCHEDULER;
return PlanNode::POLICY_SCHEDULER;
default:
loom::llog->critical("Invalid task policy");
exit(1);
......@@ -41,9 +41,9 @@ Plan::Plan(const loomplan::Plan &plan, loom::Id id_base)
inputs.push_back(id_base + pt.input_ids(j));
}
TaskNode tnode(id, i, read_task_policy(pt.policy()), false,
PlanNode pnode(id, i, read_task_policy(pt.policy()), false,
pt.task_type(), pt.config(), std::move(inputs));
tasks.emplace(std::make_pair(id, std::move(tnode)));
tasks.emplace(std::make_pair(id, std::move(pnode)));
}
for (auto& pair : tasks) {
......@@ -73,7 +73,7 @@ Plan::Plan(const loomplan::Plan &plan, loom::Id id_base)
std::vector<loom::Id> Plan::get_init_tasks() const
{
std::vector<loom::Id> result;
foreach_task([&result](const TaskNode &node){
foreach_task([&result](const PlanNode &node){
if (node.get_inputs().empty()) {
result.push_back(node.id);
}
......
#ifndef LOOM_SERVER_PLAN_H
#define LOOM_SERVER_PLAN_H
#include "tasknode.h"
#include "plannode.h"
#include <unordered_map>
#include <unordered_set>
#include <memory.h>
......@@ -22,13 +22,13 @@ public:
}
}
const TaskNode& get_node(loom::Id id) const {
const PlanNode& get_node(loom::Id id) const {
auto it = tasks.find(id);
assert(it != tasks.end());
return it->second;
}
TaskNode& get_node(loom::Id id) {
PlanNode& get_node(loom::Id id) {
auto it = tasks.find(id);
assert(it != tasks.end());
return it->second;
......@@ -52,7 +52,7 @@ public:
}
private:
std::unordered_map<loom::Id, TaskNode> tasks;
std::unordered_map<loom::Id, PlanNode> tasks;
};
#endif // LOOM_SERVER_PLAN_H
#include "tasknode.h"
#include "plannode.h"
#include "server.h"
#include <sstream>
#include <iomanip>
void TaskNode::replace_input(loom::Id old_input, const std::vector<loom::Id> &new_inputs)
void PlanNode::replace_input(loom::Id old_input, const std::vector<loom::Id> &new_inputs)
{
auto i = std::find(inputs.begin(), inputs.end(), old_input);
assert (i != inputs.end());
......@@ -13,7 +13,7 @@ void TaskNode::replace_input(loom::Id old_input, const std::vector<loom::Id> &ne
inputs.insert(i2, new_inputs.begin(), new_inputs.end());
}
void TaskNode::replace_next(loom::Id old_next, const std::vector<loom::Id> &new_nexts)
void PlanNode::replace_next(loom::Id old_next, const std::vector<loom::Id> &new_nexts)
{
auto i = std::find(nexts.begin(), nexts.end(), old_next);
assert (i != nexts.end());
......@@ -21,12 +21,12 @@ void TaskNode::replace_next(loom::Id old_next, const std::vector<loom::Id> &new_
nexts.insert(nexts.end(), new_nexts.begin(), new_nexts.end());
}
std::string TaskNode::get_type_name(Server &server)
std::string PlanNode::get_type_name(Server &server)
{
return server.get_dictionary().translate(task_type);
}
std::string TaskNode::get_info(Server &server)
std::string PlanNode::get_info(Server &server)
{
std::stringstream s;
s << "T[" << id << "/" << client_id;
......
......@@ -13,10 +13,10 @@ class Server;
class WorkerConnection;
/** A task in the task graph in the server */
class TaskNode {
class PlanNode {
public:
friend class Plan;
typedef std::vector<TaskNode*> Vector;
typedef std::vector<PlanNode*> Vector;
enum Policy {
POLICY_STANDARD,
......@@ -24,7 +24,7 @@ public:
POLICY_SCHEDULER
};
TaskNode(loom::Id id,
PlanNode(loom::Id id,
loom::Id client_id,
Policy policy,
bool result_flag,
......@@ -36,7 +36,7 @@ public:
client_id(client_id)
{}
TaskNode(loom::Id id,
PlanNode(loom::Id id,
loom::Id client_id,
Policy policy,
bool result_flag,
......
......@@ -45,7 +45,7 @@ void TaskManager::distribute_work(TaskDistribution &distribution)
void TaskManager::start_task(WorkerConnection *wc, Id task_id)
{
const TaskNode &node = cstate.get_node(task_id);
const PlanNode &node = cstate.get_node(task_id);
for (loom::Id id : node.get_inputs()) {
TaskState state = cstate.get_state_or_create(id);
TaskState::WStatus st = state.get_worker_status(wc);
......@@ -73,7 +73,7 @@ void TaskManager::remove_state(TaskState &state)
void TaskManager::on_task_finished(loom::Id id, size_t size, size_t length, WorkerConnection *wc)
{
cstate.set_task_finished(id, size, length, wc);
const TaskNode &node = cstate.get_node(id);
const PlanNode &node = cstate.get_node(id);
if (node.is_result()) {
llog->debug("Job id={} [RESULT] finished", id);
......
......@@ -19,11 +19,11 @@ class TaskManager
public:
struct WorkerLoad {
WorkerLoad(WorkerConnection &connection, TaskNode::Vector &&tasks)
WorkerLoad(WorkerConnection &connection, PlanNode::Vector &&tasks)
: connection(connection), tasks(tasks) {}
WorkerConnection &connection;
TaskNode::Vector tasks;
PlanNode::Vector tasks;
};
typedef std::vector<WorkerLoad> WorkDistribution;
......
#include "taskstate.h"
#include "plannode.h"
TaskState::TaskState(const PlanNode &node)
: id(node.get_id()), ref_count(node.get_nexts().size())
{
if (node.is_result()) {
inc_ref_counter();
}
}
#ifndef LOOM_SERVER_TASKSTATE_H
#define LOOM_SERVER_TASKSTATE_H
#include <unordered_map>
#include "libloom/types.h"
class WorkerConnection;
template<typename T> using WorkerMap = std::unordered_map<WorkerConnection*, T>;
class ComputationState;
class PlanNode;
class TaskState {
public:
enum WStatus {
NONE,
READY,
RUNNING,
OWNER,
};
TaskState(const PlanNode &node);
loom::Id get_id() const {
return id;
}
void inc_ref_counter(int count = 1) {
ref_count += count;
}
bool dec_ref_counter() {
return --ref_count <= 0;
}
int get_ref_counter() const {
return ref_count;
}
WorkerMap<WStatus>& get_workers() {
return workers;
}
size_t get_size() const {
return size;
}
void set_size(size_t size) {
this->size = size;
}
size_t get_length() const {
return length;
}
void set_length(size_t length) {
this->length = length;
}
WStatus get_worker_status(WorkerConnection *wc) {
auto i = workers.find(wc);
if (i == workers.end()) {
return NONE;
}
return i->second;
}
WorkerConnection *get_first_owner() {
for (auto &p : workers) {
if (p.second == OWNER) {
return p.first;
}
}
return nullptr;
}
void set_worker_status(WorkerConnection *wc, WStatus ws) {
workers[wc] = ws;
}
template<typename F> void foreach_owner(F f) {
for(auto &pair : workers) {
if (pair.second == OWNER) {
f(pair.first);
}
}
}
private:
loom::Id id;
WorkerMap<WStatus> workers;
int ref_count;
size_t size;
size_t length;
};
struct WorkerInfo {
WorkerInfo() : n_tasks(0) {}
int n_tasks;
};
#endif // LOOM_SERVER_TASKSTATE_H
......@@ -54,7 +54,7 @@ void WorkerConnection::on_close()
server.remove_worker_connection(*this);
}
void WorkerConnection::send_task(const TaskNode &task)
void WorkerConnection::send_task(const PlanNode &task)
{
auto id = task.get_id();
llog->debug("Assigning task id={} to address={}", id, address);
......
......@@ -5,7 +5,7 @@
#include "libloom/types.h"
class Server;
class TaskNode;
class PlanNode;
/** Connection to worker */
......@@ -22,7 +22,7 @@ public:
void on_close();
void send_task(const TaskNode &task);
void send_task(const PlanNode &task);
void send_data(loom::Id id, const std::string &address, bool with_size);
void remove_data(loom::Id id);
......
......@@ -13,7 +13,7 @@ template<typename T> std::set<T> v_to_s(std::vector<T> &v) {
return std::set<T>(v.begin(), v.end());
}
typedef std::unordered_set<TaskNode*> TaskSet;
typedef std::unordered_set<PlanNode*> TaskSet;
typedef
std::unordered_map<WorkerConnection*, TaskSet>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment