Commit b847643e authored by Stanislav Bohm's avatar Stanislav Bohm

ENH: A brand new scheduler

parent a7e484b9
......@@ -2,6 +2,8 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -g -Wall")
add_library(loom-server-lib OBJECT
server.cpp
server.h
scheduler.cpp
scheduler.h
dummyworker.cpp
dummyworker.h
taskmanager.cpp
......@@ -19,9 +21,7 @@ add_library(loom-server-lib OBJECT
plannode.cpp
plannode.h
plan.h
plan.cpp
solver.h
solver.cpp)
plan.cpp)
target_include_directories(loom-server-lib PUBLIC ${PROJECT_SOURCE_DIR}/src)
......
......@@ -2,7 +2,6 @@
#include "compstate.h"
#include "workerconn.h"
#include "server.h"
#include "solver.h"
#include "libloom/log.h"
......@@ -17,27 +16,22 @@ ComputationState::ComputationState(Server &server) : server(server)
dslice_task_id = dictionary.find_or_create("loom/scheduler/dslice");
dget_task_id = dictionary.find_or_create("loom/scheduler/dget");
base_time = uv_now(server.get_loop());
if (server.get_loop()) {
base_time = uv_now(server.get_loop());
}
}
void ComputationState::set_plan(Plan &&plan)
{
this->plan = std::move(plan);
}
TaskDistribution ComputationState::compute_initial_distribution()
{
auto task_ids = plan.get_init_tasks();
add_ready_nodes(task_ids);
return compute_distribution();
}
void ComputationState::add_worker(WorkerConnection* wconn)
{
int index = workers.size();
auto &w = workers[wconn];
w.index = index;
w.n_tasks = 0;
w.free_cpus = wconn->get_resource_cpus();
}
void ComputationState::set_running_task(const PlanNode &node, WorkerConnection *wc)
......@@ -49,7 +43,7 @@ void ComputationState::set_running_task(const PlanNode &node, WorkerConnection *
assert(state.get_worker_status(wc) == TaskState::S_NONE);
state.set_worker_status(wc, TaskState::S_RUNNING);
workers[wc].n_tasks += node.get_n_cpus();
workers[wc].free_cpus -= node.get_n_cpus();
}
void ComputationState::set_task_finished(const PlanNode&node, size_t size, size_t length, WorkerConnection *wc)
......@@ -59,7 +53,7 @@ void ComputationState::set_task_finished(const PlanNode&node, size_t size, size_
state.set_worker_status(wc, TaskState::S_OWNER);
state.set_size(size);
state.set_length(length);
workers[wc].n_tasks -= node.get_n_cpus();
workers[wc].free_cpus += node.get_n_cpus();
}
void ComputationState::remove_state(loom::Id id)
......@@ -262,165 +256,6 @@ int ComputationState::get_max_cpus()
return max_cpus;
}
TaskDistribution ComputationState::compute_distribution()
{
loom::llog->debug("Computation of distribution: {} task(s)", pending_tasks.size());
TaskDistribution result;
if (pending_tasks.empty()) {
return result;
}
size_t n_workers = workers.size();
if (n_workers == 0) {
return result;
}
int max_cpus = get_max_cpus();
if (max_cpus == 0) {
max_cpus = 1;
}
size_t n_tasks = pending_tasks.size();
size_t t_variables = n_workers * n_tasks;
// + 1 because lp solve indexes from 1 :(
std::vector<double> costs(t_variables + 1, 0.0);
std::vector<loom::Id> tasks(pending_tasks.begin(), pending_tasks.end());
/* [t_0,A] [t_0,B] ... [t_1,A] [t_1,B] ... [m_0,A] [m_0,B] ... [m_2,A] [m_2,B]
* pending tasks t_X,A - X=task_index, A=worker_index
* movable tasks m_X,A - X=task_index, A=worker_index
*/
/* Gather all inputs
and estimate the max transfer cost */
std::unordered_map<loom::Id, int> inputs;
std::vector<double> n_cpus; // we will later use it for coefs, we store it directly as double
n_cpus.reserve(tasks.size());
size_t total_size = 0;
for (loom::Id id : tasks) {
const PlanNode &node = get_node(id);
n_cpus.push_back(node.get_n_cpus());
for (loom::Id id2 : node.get_inputs()) {
auto it = inputs.find(id2);
if (it == inputs.end()) {
inputs[id2] = costs.size();
TaskState &state = get_state(id2);
total_size += state.get_size();
double cost = static_cast<double>(state.get_size()) * TRANSFER_COST_COEF;
for (size_t i = 0; i < n_workers; i++) {
costs.push_back(cost);
}
}
}
}
/* Setup coeficients for exexuting a task */
double task_cost = (total_size + 1) * 2 * TRANSFER_COST_COEF;
for (size_t i = 0; i < n_tasks; i++) {
double cpus = n_cpus[i];
double coef;
if (cpus < 1) {
coef = task_cost;
} else {
coef = task_cost * (cpus + (cpus * cpus) / (max_cpus * max_cpus));
}
for (size_t j = 0; j < n_workers; j++) {
// + 1 because we are counting from 1 ...
costs[i * n_workers + j + 1] = -coef;
}
}
/* Initialize solver and helper structures */
size_t variables = costs.size() - 1;
Solver solver(variables);
std::vector<int> indices;
indices.reserve(n_workers * n_tasks);
std::vector<double> ones(variables, 1.0);
size_t task_id = 1;
/* Task constraints */
for (loom::Id id : tasks) {
indices.clear();
for (size_t i = 0; i < n_workers; i++) {
indices.push_back(task_id + i);
}
/* Task can be executed only once */
solver.add_constraint_lq(indices, ones, 1);
const PlanNode &node = get_node(id);
std::unordered_set<loom::Id> nonlocals;
for (auto pair : workers) {
WorkerConnection *wc = pair.first;
size_t index = pair.second.index;
nonlocals.clear();
collect_requirements_for_node(wc, node, nonlocals);
for (loom::Id id2 : nonlocals) {
/* What dataobjects have to be transfered? */
solver.add_constraint_lq(task_id + index, inputs[id2] + index);
}
}
task_id += n_workers;
}
indices.resize(n_tasks);
for (auto &pair : workers) {
WorkerConnection *wc = pair.first;
size_t index = pair.second.index;
size_t free_cpus = wc->get_resource_cpus() - pair.second.n_tasks;
for (size_t j = 0; j < n_tasks; j++) {
indices[j] = j * n_workers + index + 1;
}
/* Capacity limit for each worker */
solver.add_constraint_lq(indices, n_cpus, free_cpus);
}
solver.set_objective_fn(costs);
std::vector<double> solution = solver.solve();
assert(solution.size() == variables);
std::vector<WorkerConnection*> wconns(n_workers, nullptr);
for (auto &pair : workers) {
wconns[pair.second.index] = pair.first;
}
for (size_t i = 0; i < t_variables; i++) {
if (solution[i]) {
size_t worker_index = i % n_workers;
size_t task_index = i / n_workers;
assert(task_index < n_tasks);
result[wconns[worker_index]].push_back(tasks[task_index]);
}
}
return result;
/*
std::vector<WorkerConnection*> wc(n_workers, NULL);
int i = 0;
for (auto &pair : workers) {
wc[i] = pair.first;
i++;
}
for (loom::Id id : pending_tasks) {
const PlanNode &node = get_node(id);
int index;
if (node.get_inputs().size() == 1) {
index = node.get_inputs()[0];
} else {
index = id;
}
result[wc[index % n_workers]].push_back(id);
}
*/
}
TaskState &ComputationState::get_state(loom::Id id)
{
auto it = states.find(id);
......
......@@ -6,15 +6,13 @@
class Server;
using TaskDistribution = std::unordered_map<WorkerConnection*, std::vector<loom::Id>>;
class ComputationState {
public:
struct WorkerInfo {
WorkerInfo() : n_tasks(0) {}
size_t index;
int n_tasks;
int free_cpus;
};
public:
ComputationState(Server &server);
void set_plan(Plan &&plan);
......@@ -22,8 +20,8 @@ public:
void set_running_task(const PlanNode &node, WorkerConnection *wc);
TaskDistribution compute_initial_distribution();
TaskDistribution compute_distribution();
/*TaskDistribution compute_initial_distribution();
TaskDistribution compute_distribution();*/
TaskState& get_state(loom::Id id);
......@@ -49,6 +47,10 @@ public:
return !pending_tasks.empty();
}
const std::unordered_set<loom::Id>& get_pending_tasks() const {
return pending_tasks;
}
const Plan& get_plan() const {
return plan;
}
......@@ -63,6 +65,11 @@ public:
return base_time;
}
const std::unordered_map<WorkerConnection*, WorkerInfo>&
get_worker_info() const {
return workers;
}
private:
std::unordered_map<loom::Id, TaskState> states;
std::unordered_map<WorkerConnection*, WorkerInfo> workers;
......
This diff is collapsed.
#ifndef LOOM_SERVER_SCHEDULER_H
#define LOOM_SERVER_SCHEDULER_H
#include "compstate.h"
using TaskDistribution = std::unordered_map<WorkerConnection*, std::vector<loom::Id>>;
class Scheduler
{
struct SUnit { // Scheduling Unit
int n_cpus;
int64_t bonus_score;
int64_t expected_size;
std::vector<size_t> inputs;
std::vector<size_t> next_inputs;
std::vector<loom::Id> nexts;
std::vector<loom::Id> ids;
};
struct Worker {
int free_cpus;
int max_cpus;
WorkerConnection *wc;
};
struct UW {
size_t unit_index;
size_t worker_index;
};
struct DataObj {
size_t size;
std::vector<size_t> owners;
};
public:
Scheduler(ComputationState &cstate);
TaskDistribution schedule();
private:
std::vector<Worker> workers;
std::vector<SUnit> s_units;
std::vector<DataObj> data;
bool find_best(UW &uw);
void apply_uw(const UW &uw);
void create_derived_units();
static Scheduler::SUnit merge_units(const SUnit &u1, const SUnit &u2);
};
#endif
......@@ -32,7 +32,11 @@ Server::Server(uv_loop_t *loop, int port)
dummy_worker.start_listen();
loom::llog->debug("Dummy worker started at {}", dummy_worker.get_listen_port());
}
UV_CHECK(uv_idle_init(loop, &distribution_idle));
if (loop) {
UV_CHECK(uv_idle_init(loop, &distribution_idle));
}
distribution_idle.data = this;
}
......
#include "solver.h"
#include <libloom/log.h>
#include <libloom/utils.h>
#include <lpsolve/lp_lib.h>
#define LP_CHECK(call) \
{ \
if (unlikely((!call))) { \
report_lp_error(__LINE__, __FILE__); \
} \
}
static void report_lp_error(int line, const char* filename)
{
loom::llog->critical("lp_solve error at {}:{}", filename, line);
exit(1);
}
Solver::Solver(size_t variables)
: lp_solver(nullptr)
{
assert(variables > 0);
assert(sizeof(REAL) == sizeof(double));
lprec *lp;
lp = make_lp(0, variables);
LP_CHECK(lp);
for (size_t i = 1; i <= variables; i++) {
set_binary(lp, i, TRUE);
}
set_add_rowmode(lp, TRUE);
lp_solver = lp;
this->variables = variables;
}
Solver::~Solver()
{
lprec *lp = static_cast<lprec*>(lp_solver);
if (lp) {
delete_lp(lp);
}
}
void Solver::add_constraint_lq(int index1, int index2)
{
if (index1 == index2) {
return;
}
double v[2] = { 1.0, -1.0 };
int i[2];
i[0] = index1;
i[1] = index2;
lprec *lp = static_cast<lprec*>(lp_solver);
LP_CHECK(add_constraintex(lp, 2, v, i, LE, 0.0));
}
void Solver::add_constraint_eq(const std::vector<int> &indices, const std::vector<double> &values, double rhs)
{
lprec *lp = static_cast<lprec*>(lp_solver);
size_t size = indices.size();
assert(values.size() >= size);
LP_CHECK(add_constraintex(
lp, size,
const_cast<double*>(&values[0]),
const_cast<int*>(&indices[0]),
EQ, rhs));
}
void Solver::add_constraint_lq(const std::vector<int> &indices, const std::vector<double> &values, double rhs)
{
lprec *lp = static_cast<lprec*>(lp_solver);
size_t size = indices.size();
assert(values.size() >= size);
LP_CHECK(add_constraintex(lp, size,
const_cast<double*>(&values[0]),
const_cast<int*>(&indices[0]),
LE, rhs));
}
void Solver::set_objective_fn(const std::vector<double> &values)
{
assert(values.size() == variables + 1);
lprec *lp = static_cast<lprec*>(lp_solver);
set_obj_fn(lp, const_cast<double*>(&values[0]));
}
std::vector<double> Solver::solve()
{
constexpr bool debug = false;
lprec *lp = static_cast<lprec*>(lp_solver);
set_add_rowmode(lp, FALSE);
if (debug) {
write_LP(lp, stdout);
}
set_verbose(lp, IMPORTANT);
set_presolve(lp, /* PRESOLVE_ROWS | */ PRESOLVE_REDUCEMIP + PRESOLVE_KNAPSACK + PRESOLVE_COLS + PRESOLVE_LINDEP, get_presolveloops(lp));
int ret = ::solve(lp);
assert(ret == OPTIMAL || ret == SUBOPTIMAL);
std::vector<double> result(variables);
get_variables(lp, &result[0]);
if (debug) {
for (size_t i = 0; i < variables; i++) {
loom::llog->alert("{}: {}", i + 1, result[i]);
}
}
return result;
}
#ifndef LOOM_SERVER_SOLVER_H
#define LOOM_SERVER_SOLVER_H
#include <vector>
class Solver
{
public:
Solver(std::size_t variables);
~Solver();
void add_constraint_lq(int index1, int index2);
void add_constraint_eq(const std::vector<int> &indices, const std::vector<double> &values, double rhs);
void add_constraint_lq(const std::vector<int> &indices, const std::vector<double> &values, double rhs);
void set_objective_fn(const std::vector<double> &values);
std::vector<double> solve();
protected:
/* lp_solve heavily pollutes namespace with macros,
* so we do not want to include the library publicaly
* lp_solver is in fact lprec internaly casted to the
* the right type */
void *lp_solver;
std::size_t variables;
};
#endif // LOOM_SERVER_SOLVER_H
......@@ -19,12 +19,11 @@ TaskManager::TaskManager(Server &server)
void TaskManager::add_plan(Plan &&plan, bool report)
{
this->report = report;
cstate.set_plan(std::move(plan));
TaskDistribution distribution(cstate.compute_initial_distribution());
distribute_work(distribution);
cstate.set_plan(std::move(plan));
distribute_work(Scheduler(cstate).schedule());
}
void TaskManager::distribute_work(TaskDistribution &distribution)
void TaskManager::distribute_work(const TaskDistribution &distribution)
{
if (distribution.size() == 0) {
return;
......@@ -151,6 +150,5 @@ void TaskManager::register_worker(WorkerConnection *wc)
void TaskManager::run_task_distribution()
{
TaskDistribution d = cstate.compute_distribution();
distribute_work(d);
distribute_work(Scheduler(cstate).schedule());
}
......@@ -2,6 +2,7 @@
#define LOOM_SERVER_TASKMANAGER_H
#include "compstate.h"
#include "scheduler.h"
#include <vector>
#include <unordered_map>
......@@ -48,7 +49,7 @@ private:
Server &server;
ComputationState cstate;
void distribute_work(TaskDistribution &distribution);
void distribute_work(const TaskDistribution &distribution);
void start_task(WorkerConnection *wc, loom::Id task_id);
void remove_state(TaskState &state);
......
......@@ -89,7 +89,15 @@ public:
}
template<typename F> void foreach_owner(F f) {
template<typename F> void foreach_source(F f) const {
for(auto &pair : workers) {
if (pair.second == S_OWNER || pair.second == S_RUNNING) {
f(pair.first);
}
}
}
template<typename F> void foreach_owner(F f) const {
for(auto &pair : workers) {
if (pair.second == S_OWNER) {
f(pair.first);
......
......@@ -5,7 +5,7 @@ target_include_directories(Catch INTERFACE ${PROJECT_SOURCE_DIR}/tests)
add_executable(server-test
$<TARGET_OBJECTS:loom-server-lib>
scheduler.cpp
test_scheduler.cpp
main.cpp)
target_link_libraries(server-test Catch libloom ${LIBUV_LIBRARY} pthread)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment