Commit 38f193fc authored by Stanislav Bohm's avatar Stanislav Bohm

ENH: Scheduler respects cpu resources of tasks

parent 5e5b0325
from .client import Client, LoomException, TaskFailed # noqa
from .plan import Plan # noqa
from .plan import Plan, cpus, cpu1 # noqa
......@@ -18,7 +18,7 @@ _sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='loomplan.proto',
package='loomplan',
serialized_pb=_b('\n\x0eloomplan.proto\x12\x08loomplan\"\xbc\x01\n\x04Task\x12\x11\n\ttask_type\x18\x01 \x02(\x05\x12\x0e\n\x06\x63onfig\x18\x02 \x02(\x0c\x12\x11\n\tinput_ids\x18\x03 \x03(\x05\x12\x36\n\x06policy\x18\x04 \x01(\x0e\x32\x15.loomplan.Task.Policy:\x0fPOLICY_STANDARD\"F\n\x06Policy\x12\x13\n\x0fPOLICY_STANDARD\x10\x01\x12\x11\n\rPOLICY_SIMPLE\x10\x02\x12\x14\n\x10POLICY_SCHEDULER\x10\x03\"9\n\x04Plan\x12\x1d\n\x05tasks\x18\x02 \x03(\x0b\x32\x0e.loomplan.Task\x12\x12\n\nresult_ids\x18\x03 \x03(\x05\x42\x02H\x03')
serialized_pb=_b('\n\x0eloomplan.proto\x12\x08loomplan\"\xe0\x01\n\x04Task\x12\x11\n\ttask_type\x18\x01 \x02(\x05\x12\x0e\n\x06\x63onfig\x18\x02 \x02(\x0c\x12\x11\n\tinput_ids\x18\x03 \x03(\x05\x12\x36\n\x06policy\x18\x04 \x01(\x0e\x32\x15.loomplan.Task.Policy:\x0fPOLICY_STANDARD\x12\"\n\x16resource_request_index\x18\x05 \x01(\x05:\x02-1\"F\n\x06Policy\x12\x13\n\x0fPOLICY_STANDARD\x10\x01\x12\x11\n\rPOLICY_SIMPLE\x10\x02\x12\x14\n\x10POLICY_SCHEDULER\x10\x03\"0\n\x08Resource\x12\x15\n\rresource_type\x18\x01 \x02(\x05\x12\r\n\x05value\x18\x02 \x02(\x05\"8\n\x0fResourceRequest\x12%\n\tresources\x18\x01 \x03(\x0b\x32\x12.loomplan.Resource\"o\n\x04Plan\x12\x34\n\x11resource_requests\x18\x01 \x03(\x0b\x32\x19.loomplan.ResourceRequest\x12\x1d\n\x05tasks\x18\x02 \x03(\x0b\x32\x0e.loomplan.Task\x12\x12\n\nresult_ids\x18\x03 \x03(\x05\x42\x02H\x03')
)
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
......@@ -45,8 +45,8 @@ _TASK_POLICY = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
serialized_start=147,
serialized_end=217,
serialized_start=183,
serialized_end=253,
)
_sym_db.RegisterEnumDescriptor(_TASK_POLICY)
......@@ -86,6 +86,13 @@ _TASK = _descriptor.Descriptor(
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='resource_request_index', full_name='loomplan.Task.resource_request_index', index=4,
number=5, type=5, cpp_type=1, label=1,
has_default_value=True, default_value=-1,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
......@@ -99,7 +106,74 @@ _TASK = _descriptor.Descriptor(
oneofs=[
],
serialized_start=29,
serialized_end=217,
serialized_end=253,
)
_RESOURCE = _descriptor.Descriptor(
name='Resource',
full_name='loomplan.Resource',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='resource_type', full_name='loomplan.Resource.resource_type', index=0,
number=1, type=5, cpp_type=1, label=2,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='value', full_name='loomplan.Resource.value', index=1,
number=2, type=5, cpp_type=1, label=2,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
extension_ranges=[],
oneofs=[
],
serialized_start=255,
serialized_end=303,
)
_RESOURCEREQUEST = _descriptor.Descriptor(
name='ResourceRequest',
full_name='loomplan.ResourceRequest',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='resources', full_name='loomplan.ResourceRequest.resources', index=0,
number=1, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
extension_ranges=[],
oneofs=[
],
serialized_start=305,
serialized_end=361,
)
......@@ -111,14 +185,21 @@ _PLAN = _descriptor.Descriptor(
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='tasks', full_name='loomplan.Plan.tasks', index=0,
name='resource_requests', full_name='loomplan.Plan.resource_requests', index=0,
number=1, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='tasks', full_name='loomplan.Plan.tasks', index=1,
number=2, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='result_ids', full_name='loomplan.Plan.result_ids', index=1,
name='result_ids', full_name='loomplan.Plan.result_ids', index=2,
number=3, type=5, cpp_type=1, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
......@@ -135,14 +216,18 @@ _PLAN = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=219,
serialized_end=276,
serialized_start=363,
serialized_end=474,
)
_TASK.fields_by_name['policy'].enum_type = _TASK_POLICY
_TASK_POLICY.containing_type = _TASK
_RESOURCEREQUEST.fields_by_name['resources'].message_type = _RESOURCE
_PLAN.fields_by_name['resource_requests'].message_type = _RESOURCEREQUEST
_PLAN.fields_by_name['tasks'].message_type = _TASK
DESCRIPTOR.message_types_by_name['Task'] = _TASK
DESCRIPTOR.message_types_by_name['Resource'] = _RESOURCE
DESCRIPTOR.message_types_by_name['ResourceRequest'] = _RESOURCEREQUEST
DESCRIPTOR.message_types_by_name['Plan'] = _PLAN
Task = _reflection.GeneratedProtocolMessageType('Task', (_message.Message,), dict(
......@@ -152,6 +237,20 @@ Task = _reflection.GeneratedProtocolMessageType('Task', (_message.Message,), dic
))
_sym_db.RegisterMessage(Task)
Resource = _reflection.GeneratedProtocolMessageType('Resource', (_message.Message,), dict(
DESCRIPTOR = _RESOURCE,
__module__ = 'loomplan_pb2'
# @@protoc_insertion_point(class_scope:loomplan.Resource)
))
_sym_db.RegisterMessage(Resource)
ResourceRequest = _reflection.GeneratedProtocolMessageType('ResourceRequest', (_message.Message,), dict(
DESCRIPTOR = _RESOURCEREQUEST,
__module__ = 'loomplan_pb2'
# @@protoc_insertion_point(class_scope:loomplan.ResourceRequest)
))
_sym_db.RegisterMessage(ResourceRequest)
Plan = _reflection.GeneratedProtocolMessageType('Plan', (_message.Message,), dict(
DESCRIPTOR = _PLAN,
__module__ = 'loomplan_pb2'
......
......@@ -17,12 +17,53 @@ class Task(object):
id = None
config = ""
policy = POLICY_STANDARD
resource_request = None
def set_message(self, msg, symbols, requests):
    """Fill a loomplan_pb2.Task protobuf message from this task.

    Args:
        msg: loomplan_pb2.Task message to populate.
        symbols: mapping from task-type name to its interned integer id.
        requests: ordered list of ResourceRequest objects already written
            into the plan message; this task's request is encoded as an
            index into that list.
    """
    msg.config = self.config
    msg.task_type = symbols[self.task_type]
    msg.input_ids.extend(t.id for t in self.inputs)
    msg.policy = self.policy
    if self.resource_request:
        # The caller deduplicates equal requests, so index() finds the
        # single shared entry for this task's request.
        msg.resource_request_index = requests.index(self.resource_request)
class ResourceRequest(object):
    """A set of resource demands (name -> amount) attached to a task.

    Instances compare equal when they request exactly the same resources
    and are hashable, so a plan can deduplicate shared requests.
    """

    def __init__(self):
        # Maps resource name (e.g. "loom/resource/cpus") to requested amount.
        self.resources = {}

    def add_resource(self, name, value):
        """Request *value* units of the resource called *name*."""
        self.resources[name] = value

    @property
    def names(self):
        """Names of all requested resources."""
        return self.resources.keys()

    def set_message(self, msg, symbols):
        """Serialize into a loomplan ResourceRequest protobuf message."""
        for resource_name, amount in self.resources.items():
            entry = msg.resources.add()
            entry.resource_type = symbols[resource_name]
            entry.value = amount

    def __eq__(self, other):
        return (isinstance(other, ResourceRequest)
                and self.resources == other.resources)

    def __ne__(self, other):
        return not (self == other)

    def __hash__(self):
        # Sort items so the hash is independent of dict insertion order.
        return hash(tuple(sorted(self.resources.items())))
def cpus(value):
    """Build a ResourceRequest asking for *value* cpu cores."""
    request = ResourceRequest()
    request.add_resource("loom/resource/cpus", value)
    return request


# Shared request for the common single-cpu case.
cpu1 = cpus(1)
class Plan(object):
......@@ -49,13 +90,11 @@ class Plan(object):
def __init__(self):
    # Distinct task-type names used by this plan.
    self.task_types = set()
    # Tasks in submission order; a task's id is its index in this list.
    self.tasks = []
def add(self, task):
    """Register *task* in the plan, assign its id, and return it."""
    assert task.id is None
    new_id = len(self.tasks)
    task.id = new_id
    self.tasks.append(task)
    self.task_types.add(task.task_type)
    return task
def task_dslice(self, input):
......@@ -100,7 +139,13 @@ class Plan(object):
task.policy = POLICY_SIMPLE
return self.add(task)
def task_run(self, args, inputs=(), outputs=(None,), stdin=None):
def task_run(self,
args,
inputs=(),
outputs=(None,),
stdin=None,
request=cpu1):
if isinstance(args, str):
args = args.split()
......@@ -119,6 +164,7 @@ class Plan(object):
msg.map_outputs.extend(fname if fname else "+out"
for fname in outputs)
task.config = msg.SerializeToString()
task.resource_request = request
return self.add(task)
def task_array_make(self, inputs):
......@@ -160,9 +206,20 @@ class Plan(object):
def create_message(self, symbols):
    """Serialize the plan into a loomplan_pb2.Plan protobuf message.

    Args:
        symbols: mapping from symbol name to interned integer id.

    Returns:
        A populated loomplan_pb2.Plan message.
    """
    msg = loomplan_pb2.Plan()

    # Deduplicate resource requests (ResourceRequest is hashable and
    # compares by content) so equal requests share one entry.
    requests = set()
    for task in self.tasks:
        if task.resource_request:
            requests.add(task.resource_request)
    requests = list(requests)

    for request in requests:
        r = msg.resource_requests.add()
        request.set_message(r, symbols)

    for task in self.tasks:
        t = msg.tasks.add()
        # Each task encodes its request as an index into requests.
        task.set_message(t, symbols, requests)
    return msg
def write_dot(self, filename, info=None):
......
This diff is collapsed.
This diff is collapsed.
......@@ -13,9 +13,20 @@ message Task {
required bytes config = 2;
repeated int32 input_ids = 3;
optional Policy policy = 4 [default = POLICY_STANDARD];
optional int32 resource_request_index = 5 [default = -1];
}
// One resource demand: a dictionary-interned resource type and its amount.
message Resource {
required int32 resource_type = 1;
required int32 value = 2;
}
// A bundle of resource demands; tasks reference a request by its index
// in Plan.resource_requests.
message ResourceRequest {
repeated Resource resources = 1;
}
// A whole submitted plan; each Task's resource_request_index points into
// resource_requests (-1 = no request).
message Plan {
repeated ResourceRequest resource_requests = 1;
repeated Task tasks = 2;
repeated int32 result_ids = 3;
}
......@@ -41,7 +41,7 @@ void ClientConnection::on_message(const char *buffer, size_t size)
auto& task_manager = server.get_task_manager();
loom::Id id_base = server.new_id(plan.tasks_size());
task_manager.add_plan(Plan(plan, id_base));
task_manager.add_plan(Plan(plan, id_base, server.get_dictionary()));
llog->info("Plan submitted tasks={}", plan.tasks_size());
}
......
......@@ -146,7 +146,7 @@ void ComputationState::expand_dslice(const PlanNode &node)
loom::llog->debug("Expanding 'dslice' id={} length={} pieces={} new_id_base={}",
node1.get_id(), length, configs.size(), id_base1);
PlanNode new_node(node1.get_id(),-1, PlanNode::POLICY_SIMPLE, false,
PlanNode new_node(node1.get_id(),-1, PlanNode::POLICY_SIMPLE, -1, false,
slice_task_id, "", node1.get_inputs());
make_expansion(configs, new_node, node2, id_base1, id_base2);
}
......@@ -174,7 +174,7 @@ void ComputationState::expand_dget(const PlanNode &node)
loom::llog->debug("Expanding 'dget' id={} length={} new_id_base={}",
node1.get_id(), length, id_base1);
PlanNode new_node(node1.get_id(),-1, PlanNode::POLICY_SIMPLE, false,
PlanNode new_node(node1.get_id(),-1, PlanNode::POLICY_SIMPLE, -1, false,
get_task_id, "", node1.get_inputs());
make_expansion(configs, new_node, node2, id_base1, id_base2);
}
......@@ -200,7 +200,7 @@ void ComputationState::make_expansion(std::vector<std::string> &configs,
for (std::string &config1 : configs) {
PlanNode t1(id_base1, -1,
node1.get_policy(), false,
node1.get_policy(), node1.get_n_cpus(), false,
node1.get_task_type(), config1, node1.get_inputs());
t1.set_nexts(std::vector<loom::Id>{id_base2});
plan.add_node(std::move(t1));
......@@ -208,7 +208,7 @@ void ComputationState::make_expansion(std::vector<std::string> &configs,
add_pending_task(id_base1);
PlanNode t2(id_base2, -1,
node2.get_policy(), false,
node2.get_policy(), node1.get_n_cpus(), false,
node2.get_task_type(), node2.get_config(),
std::vector<int>{id_base1});
t2.set_nexts(node2.get_nexts());
......@@ -244,6 +244,18 @@ bool ComputationState::is_ready(const PlanNode &node)
return true;
}
/* Return the largest cpu count offered by any connected worker.
 * Returns 0 when no workers are registered; the caller in
 * compute_distribution() clamps that case to 1.
 * NOTE(review): assumes `workers` is a map keyed by a worker-connection
 * pointer exposing get_resource_cpus() — confirm against the header. */
size_t ComputationState::get_max_cpus()
{
size_t max_cpus = 0;
for (auto &pair : workers) {
if (max_cpus < pair.first->get_resource_cpus()) {
max_cpus = pair.first->get_resource_cpus();
}
}
return max_cpus;
}
TaskDistribution ComputationState::compute_distribution()
{
loom::llog->debug("Computation for distribution of {} task(s)", pending_tasks.size());
......@@ -257,6 +269,11 @@ TaskDistribution ComputationState::compute_distribution()
return result;
}
size_t max_cpus = get_max_cpus();
if (max_cpus == 0) {
max_cpus = 1;
}
size_t n_tasks = pending_tasks.size();
size_t t_variables = n_workers * n_tasks;
......@@ -271,9 +288,12 @@ TaskDistribution ComputationState::compute_distribution()
/* Gather all inputs */
std::unordered_map<loom::Id, int> inputs;
std::vector<double> n_cpus; // we will later use it for coefs, we store it directly as double
n_cpus.reserve(pending_tasks.size());
size_t total_size = 0;
for (loom::Id id : pending_tasks) {
const PlanNode &node = get_node(id);
n_cpus.push_back(node.get_n_cpus());
for (loom::Id id2 : node.get_inputs()) {
auto it = inputs.find(id2);
if (it == inputs.end()) {
......@@ -289,8 +309,18 @@ TaskDistribution ComputationState::compute_distribution()
}
double task_cost = (total_size + 1) * 2 * TRANSFER_COST_COEF;
for (size_t i = 1; i <= t_variables; i++) {
costs[i] = -task_cost;
for (size_t i = 0; i < n_tasks; i++) {
double cpus = n_cpus[i];
double coef;
if (cpus < 1) {
coef = task_cost;
} else {
coef = task_cost * (cpus + (cpus * cpus) / (max_cpus * max_cpus));
}
for (size_t j = 0; j < n_workers; j++) {
// + 1 because we are counting from 1 ...
costs[i * n_workers + j + 1] = -coef;
}
}
size_t variables = costs.size() - 1;
......@@ -306,8 +336,6 @@ TaskDistribution ComputationState::compute_distribution()
std::vector<loom::Id> tasks(pending_tasks.begin(), pending_tasks.end());
for (loom::Id id : tasks) {
/* Task must be mapped or not-mapped
* Set [t_X,A] + [t_X,B] ... + [t_X,n_workers] = 1 */
indices.clear();
for (size_t i = 0; i < n_workers; i++) {
indices.push_back(task_id + i);
......@@ -343,7 +371,7 @@ TaskDistribution ComputationState::compute_distribution()
for (size_t j = 0; j < n_tasks; j++) {
indices[j] = j * n_workers + index + 1;
}
solver.add_constraint_lq(indices, ones, free_cpus);
solver.add_constraint_lq(indices, n_cpus, free_cpus);
}
solver.set_objective_fn(costs);
......
......@@ -80,6 +80,7 @@ private:
std::unordered_set<loom::Id> &nonlocals);
size_t task_transfer_cost(const PlanNode &node);
void add_pending_task(loom::Id id);
size_t get_max_cpus();
};
......
......@@ -24,13 +24,21 @@ Plan::Plan()
}
Plan::Plan(const loomplan::Plan &plan, loom::Id id_base)
Plan::Plan(const loomplan::Plan &plan, loom::Id id_base, loom::Dictionary &dictionary)
{
/*
auto task_size = plan.tasks_size();
int id_base = server.new_id(task_size);
*/
std::vector<int> resources;
loom::Id resource_ncpus = dictionary.find_or_create("loom/resource/cpus");
auto rr_size = plan.resource_requests_size();
for (int i = 0; i < rr_size; i++) {
auto &rr = plan.resource_requests(i);
assert(rr.resources_size() == 1);
assert(rr.resources(0).resource_type() == resource_ncpus);
resources.push_back(rr.resources(0).value());
}
auto task_size = plan.tasks_size();
tasks.reserve(task_size);
for (int i = 0; i < task_size; i++) {
const auto& pt = plan.tasks(i);
auto id = i + id_base;
......@@ -41,7 +49,13 @@ Plan::Plan(const loomplan::Plan &plan, loom::Id id_base)
inputs.push_back(id_base + pt.input_ids(j));
}
PlanNode pnode(id, i, read_task_policy(pt.policy()), false,
int n_cpus = 0;
if (pt.resource_request_index() != -1) {
assert(pt.resource_request_index() >= 0);
assert(pt.resource_request_index() < (int) resources.size());
n_cpus = resources[pt.resource_request_index()];
}
PlanNode pnode(id, i, read_task_policy(pt.policy()), n_cpus, false,
pt.task_type(), pt.config(), std::move(inputs));
tasks.emplace(std::make_pair(id, std::move(pnode)));
}
......
......@@ -2,6 +2,9 @@
#define LOOM_SERVER_PLAN_H
#include "plannode.h"
#include <libloom/dictionary.h>
#include <unordered_map>
#include <unordered_set>
#include <memory.h>
......@@ -14,7 +17,7 @@ class Plan {
public:
Plan();
Plan(const loomplan::Plan &plan, loom::Id id_base);
Plan(const loomplan::Plan &plan, loom::Id id_base, loom::Dictionary &dictionary);
template<typename F> void foreach_task(F f) const {
for (auto &pair : tasks) {
......
......@@ -27,11 +27,12 @@ public:
/* Construct a plan node, taking ownership of the input id vector.
 * n_cpus is the task's cpu requirement (0 = unspecified). */
PlanNode(loom::Id id,
         loom::Id client_id,
         Policy policy,
         int n_cpus,
         bool result_flag,
         int task_type,
         const std::string &config,
         std::vector<loom::Id> &&inputs)
    : id(id), policy(policy), n_cpus(n_cpus), result_flag(result_flag),
      task_type(task_type), inputs(std::move(inputs)), config(config),
      client_id(client_id)
{}
......@@ -39,11 +40,12 @@ public:
/* Construct a plan node, copying the input id vector.
 * n_cpus is the task's cpu requirement (0 = unspecified). */
PlanNode(loom::Id id,
         loom::Id client_id,
         Policy policy,
         int n_cpus,
         bool result_flag,
         int task_type,
         const std::string &config,
         const std::vector<loom::Id> &inputs)
    : id(id), policy(policy), n_cpus(n_cpus), result_flag(result_flag),
      task_type(task_type), inputs(inputs), config(config),
      client_id(client_id)
{}
......@@ -98,10 +100,14 @@ public:
result_flag = true;
}
/* Cpu requirement recorded for this task (0 = unspecified). */
int get_n_cpus() const {
return n_cpus;
}
private:
loom::Id id;
Policy policy;
int n_cpus; // TODO: Replace by resource index
bool result_flag;
loom::TaskId task_type;
std::vector<loom::Id> inputs;
......
......@@ -16,6 +16,10 @@ Server::Server(uv_loop_t *loop, int port)
dummy_worker(*this),
id_counter(1)
{
/* Since the server does not yet implement full resource management, we force
 * the symbol for the only schedulable resource: loom/resource/cpus */
dictionary.find_or_create("loom/resource/cpus");
if (loop != NULL) {
UV_CHECK(uv_tcp_init(loop, &listen_socket));
listen_socket.data = this;
......
......@@ -92,9 +92,13 @@ void Solver::set_objective_fn(const std::vector<double> &values)
std::vector<double> Solver::solve()
{
constexpr bool debug = false;
lprec *lp = static_cast<lprec*>(lp_solver);
set_add_rowmode(lp, FALSE);
//write_LP(lp, stdout);
if (debug) {
write_LP(lp, stdout);
}
set_verbose(lp, IMPORTANT);
set_presolve(lp, /* PRESOLVE_ROWS | */ PRESOLVE_REDUCEMIP + PRESOLVE_KNAPSACK + PRESOLVE_COLS + PRESOLVE_LINDEP, get_presolveloops(lp));
......@@ -105,8 +109,10 @@ std::vector<double> Solver::solve()
std::vector<double> result(variables);
get_variables(lp, &result[0]);
/*for (size_t i = 0; i < variables; i++) {
loom::llog->alert("{}: {}", i + 1, result[i]);
}*/
if (debug) {
for (size_t i = 0; i < variables; i++) {
loom::llog->alert("{}: {}", i + 1, result[i]);
}
}
return result;
}
......@@ -3,6 +3,7 @@ from loomenv import loom_env, LOOM_TESTPROG, LOOM_TEST_DATA_DIR # noqa
import struct
from datetime import datetime
import os
import client
FILE1 = os.path.join(LOOM_TEST_DATA_DIR, "file1")
FILE2 = os.path.join(LOOM_TEST_DATA_DIR, "file2")
......@@ -107,6 +108,30 @@ def test_run_separated_4_cpu(loom_env):
assert c.total_seconds() < 0.06
def test_run_separated_4cpu_tasks_4_cpu(loom_env):
    """Four tasks that each request 4 cpus on a 4-cpu worker run serially.

    Each task sleeps 0.3s and stamps its start time, so consecutive start
    times must differ by roughly the sleep duration.
    """
    loom_env.start(1, cpus=4)
    p = loom_env.plan()
    args = pytestprog(0.3, stamp=True)
    tasks = [p.task_run(args, request=client.cpus(4)) for _ in range(4)]
    results = loom_env.submit(p, tasks)

    starts = []
    for result in results:
        line1, line2 = result.strip().split("\n")
        starts.append(str2datetime(line1))

    for i in range(len(starts) - 1):
        # timedelta supports abs(); replaces the manual a > b branching.
        gap = abs(starts[i + 1] - starts[i])
        assert 0.3 < gap.total_seconds() < 0.45
def test_run_double_lines(loom_env):
p = loom_env.plan()
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment