Commit 492bb781 authored by Stanislav Bohm's avatar Stanislav Bohm

ENH: Scheduling moved to idle callback

parent 1b5408ed
......@@ -45,6 +45,10 @@ public:
void add_ready_nodes(const std::vector<loom::Id> &ids);
void set_task_finished(const PlanNode& node, size_t size, size_t length, WorkerConnection *wc);
bool has_pending_tasks() const {
return !pending_tasks.empty();
}
const Plan& get_plan() const {
return plan;
}
......
......@@ -14,7 +14,8 @@ Server::Server(uv_loop_t *loop, int port)
listen_port(port),
task_manager(*this),
dummy_worker(*this),
id_counter(1)
id_counter(1),
task_distribution_active(false)
{
/* Since the server do not implement fully resource management, we forces
* symbol for the only schedulable resouce: loom/resource/cpus */
......@@ -31,6 +32,8 @@ Server::Server(uv_loop_t *loop, int port)
dummy_worker.start_listen();
loom::llog->debug("Dummy worker started at {}", dummy_worker.get_listen_port());
}
UV_CHECK(uv_idle_init(loop, &distribution_idle));
distribution_idle.data = this;
}
void Server::add_worker_connection(std::unique_ptr<WorkerConnection> conn)
......@@ -164,3 +167,20 @@ void Server::report_event(std::unique_ptr<loomcomm::Event> event)
buffer->add(cmsg);
client_connection->send_buffer(std::move(buffer));
}
void Server::need_task_distribution()
{
if (task_distribution_active) {
return;
}
task_distribution_active = true;
UV_CHECK(uv_idle_start(&distribution_idle, _distribution_callback));
}
void Server::_distribution_callback(uv_idle_t *idle)
{
UV_CHECK(uv_idle_stop(idle));
Server *server = static_cast<Server*>(idle->data);
server->task_distribution_active = false;
server->task_manager.run_task_distribution();
}
......@@ -77,6 +77,8 @@ public:
int get_worker_ncpus();
void report_event(std::unique_ptr<loomcomm::Event> event);
void need_task_distribution();
private:
void start_listen();
......@@ -96,6 +98,10 @@ private:
loom::Id id_counter;
static void _on_new_connection(uv_stream_t *stream, int status);
bool task_distribution_active;
uv_idle_t distribution_idle;
static void _distribution_callback(uv_idle_t *idle);
};
#endif // LOOM_SERVER_SERVER_H
......@@ -139,11 +139,18 @@ void TaskManager::on_task_finished(loom::Id id, size_t size, size_t length, Work
cstate.add_ready_nexts(node);
TaskDistribution distribution(cstate.compute_distribution());
distribute_work(distribution);
if (cstate.has_pending_tasks()) {
server.need_task_distribution();
}
}
void TaskManager::register_worker(WorkerConnection *wc)
{
cstate.add_worker(wc);
}
void TaskManager::run_task_distribution()
{
TaskDistribution d = cstate.compute_distribution();
distribute_work(d);
}
......@@ -42,6 +42,8 @@ public:
return cstate.is_finished();
}
void run_task_distribution();
private:
Server &server;
ComputationState cstate;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment