Commit 8a1d192c authored by Stanislav Bohm's avatar Stanislav Bohm

ENH: Indication when the plan is finished

parent 679e2977
......@@ -18,7 +18,7 @@ Connection::Connection(ConnectionCallback *callback, uv_loop_t *loop)
}
Connection::~Connection()
{
{
assert(state == ConnectionClosed);
}
......@@ -49,7 +49,7 @@ void Connection::close_and_discard_remaining_data()
}
void Connection::accept(uv_tcp_t *listen_socket)
{
{
UV_CHECK(uv_accept((uv_stream_t*) listen_socket, (uv_stream_t*) &socket));
uv_read_start((uv_stream_t *)&socket, _buf_alloc, _on_read);
state = ConnectionOpen;
......@@ -137,7 +137,7 @@ void Connection::send_buffer(SendBuffer *buffer)
}
void Connection::_on_read(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
{
{
Connection *connection = static_cast<Connection *>(stream->data);
if (nread == UV_EOF) {
if (buf->base) {
......
......@@ -81,6 +81,11 @@ void ComputationState::add_ready_nexts(const PlanNode &node)
}
}
bool ComputationState::is_finished() const
{
return states.empty();
}
void ComputationState::add_pending_task(loom::Id id)
{
loom::llog->debug("Add pending task and creating state id={}", id);
......@@ -361,8 +366,6 @@ TaskDistribution ComputationState::compute_distribution()
task_id += n_workers;
}
/* Linit for each worker
* Set [t_0,A] + [t_1,A] ... + [t_L,A] <= free_cpus(A) */
indices.resize(n_tasks);
for (auto &pair : workers) {
WorkerConnection *wc = pair.first;
......
......@@ -53,6 +53,7 @@ public:
bool is_ready(const PlanNode &node);
void add_ready_nexts(const PlanNode &node);
bool is_finished() const;
private:
std::unordered_map<loom::Id, TaskState> states;
......
......@@ -112,6 +112,9 @@ void DWConnection::on_data_finish()
{
llog->debug("Resending data to client");
worker.server.get_client_connection().send_buffer(send_buffer.release());
if (worker.server.get_task_manager().is_plan_finished()) {
loom::llog->info("Plan is finished");
}
}
void DWConnection::on_close()
......
......@@ -86,6 +86,9 @@ void TaskManager::on_task_finished(loom::Id id, size_t size, size_t length, Work
if (state.dec_ref_counter()) {
remove_state(state);
}
if (cstate.is_finished()) {
loom::llog->debug("Plan is finished");
}
} else {
llog->debug("Job id={} finished (size={}, length={})", id, size, length);
}
......@@ -103,6 +106,7 @@ void TaskManager::on_task_finished(loom::Id id, size_t size, size_t length, Work
}
cstate.add_ready_nexts(node);
TaskDistribution distribution(cstate.compute_distribution());
distribute_work(distribution);
}
......
......@@ -38,6 +38,10 @@ public:
void on_task_finished(loom::Id id, size_t size, size_t length, WorkerConnection *wc);
void register_worker(WorkerConnection *wc);
bool is_plan_finished() const {
return cstate.is_finished();
}
private:
Server &server;
ComputationState cstate;
......
......@@ -79,6 +79,16 @@ public:
workers[wc] = ws;
}
bool is_running() const {
for(auto &pair : workers) {
if (pair.second == S_RUNNING) {
return true;
}
}
return false;
}
template<typename F> void foreach_owner(F f) {
for(auto &pair : workers) {
if (pair.second == S_OWNER) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment