Commit 5e5b0325 authored by Stanislav Bohm's avatar Stanislav Bohm

FIX: Fixed several problems in scheduler

parent b09a8549
......@@ -40,7 +40,7 @@ void ComputationState::add_worker(WorkerConnection* wconn)
void ComputationState::set_running_task(WorkerConnection *wc, loom::Id id)
{
TaskState &state = get_state_or_create(id);
TaskState &state = get_state(id);
auto it = pending_tasks.find(id);
assert(it != pending_tasks.end());
pending_tasks.erase(it);
......@@ -75,12 +75,20 @@ void ComputationState::add_ready_nexts(const PlanNode &node)
if (node.get_policy() == PlanNode::POLICY_SCHEDULER) {
expand_node(node);
} else {
pending_tasks.insert(id);
add_pending_task(id);
}
}
}
}
void ComputationState::add_pending_task(loom::Id id)
{
loom::llog->debug("Add pending task and creating state id={}", id);
auto pair = states.emplace(std::make_pair(id, TaskState(get_node(id))));
assert(pair.second);
pending_tasks.insert(id);
}
void ComputationState::expand_node(const PlanNode &node)
{
loom::Id id = node.get_task_type();
......@@ -197,7 +205,7 @@ void ComputationState::make_expansion(std::vector<std::string> &configs,
t1.set_nexts(std::vector<loom::Id>{id_base2});
plan.add_node(std::move(t1));
pending_tasks.insert(id_base1);
add_pending_task(id_base1);
PlanNode t2(id_base2, -1,
node2.get_policy(), false,
......@@ -238,6 +246,8 @@ bool ComputationState::is_ready(const PlanNode &node)
TaskDistribution ComputationState::compute_distribution()
{
loom::llog->debug("Computation for distribution of {} task(s)", pending_tasks.size());
TaskDistribution result;
if (pending_tasks.empty()) {
return result;
......@@ -390,7 +400,7 @@ TaskState &ComputationState::get_state(loom::Id id)
}
TaskState &ComputationState::get_state_or_create(loom::Id id)
/*TaskState &ComputationState::get_state_or_create(loom::Id id)
{
auto it = states.find(id);
if (it == states.end()) {
......@@ -399,12 +409,12 @@ TaskState &ComputationState::get_state_or_create(loom::Id id)
it = p.first;
}
return it->second;
}
}*/
void ComputationState::add_ready_nodes(const std::vector<loom::Id> &ids)
{
for (loom::Id id : ids) {
pending_tasks.insert(id);
add_pending_task(id);
}
}
......
......@@ -36,7 +36,7 @@ public:
}
}
TaskState& get_state_or_create(loom::Id id);
//TaskState& get_state_or_create(loom::Id id);
const PlanNode& get_node(loom::Id id) {
return plan.get_node(id);
......@@ -79,6 +79,7 @@ private:
const PlanNode &node,
std::unordered_set<loom::Id> &nonlocals);
size_t task_transfer_cost(const PlanNode &node);
void add_pending_task(loom::Id id);
};
......
......@@ -47,7 +47,7 @@ void TaskManager::start_task(WorkerConnection *wc, Id task_id)
{
const PlanNode &node = cstate.get_node(task_id);
for (loom::Id id : node.get_inputs()) {
TaskState state = cstate.get_state_or_create(id);
TaskState &state = cstate.get_state(id);
TaskState::WStatus st = state.get_worker_status(wc);
if (st == TaskState::S_NONE) {
WorkerConnection *owner = state.get_first_owner();
......
#include "taskstate.h"
#include "plannode.h"
#include "workerconn.h"
#include <sstream>
TaskState::TaskState(const PlanNode &node)
: id(node.get_id()), ref_count(node.get_nexts().size())
......@@ -8,3 +11,15 @@ TaskState::TaskState(const PlanNode &node)
inc_ref_counter();
}
}
std::string TaskState::get_info() const
{
std::stringstream s;
s << "[TS id=" << id << " size=" << size;
for (auto &pair : workers) {
s << " " << pair.first->get_address() << ":";
s << pair.second;
}
s << "]";
return s.str();
}
......@@ -87,6 +87,8 @@ public:
}
}
std::string get_info() const;
private:
loom::Id id;
WorkerMap<WStatus> workers;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment