Commit b76e74a6 authored by Stanislav Bohm's avatar Stanislav Bohm

RF: Separating internal ids and client ids

parent 473e3067
......@@ -81,6 +81,11 @@ void DWConnection::on_message(const char *buffer, size_t size)
loomcomm::DataPrologue msg;
msg.ParseFromArray(buffer, size);
auto data_id = msg.id();
TaskNode &node = worker.server.get_task_manager().get_task(data_id);
auto client_id = node.get_client_id();
msg.set_id(client_id);
loomcomm::ClientMessage cmsg;
cmsg.set_type(loomcomm::ClientMessage_Type_DATA);
*cmsg.mutable_data() = msg;
......@@ -89,8 +94,7 @@ void DWConnection::on_message(const char *buffer, size_t size)
assert(msg.has_data_size());
size_t data_size = msg.data_size();
auto data_id = msg.id();
llog->debug("Fetching data id={} data_size={}", data_id, data_size);
llog->debug("Fetching data id={} data_size={} client_id={}", data_id, data_size, client_id);
auto mem = std::make_unique<char[]>(data_size);
pointer = mem.get();
......
......@@ -14,8 +14,8 @@ Server::Server(uv_loop_t *loop, int port)
: loop(loop),
listen_port(port),
task_manager(*this),
dummy_worker(*this)
dummy_worker(*this),
id_counter(1)
{
if (loop != NULL) {
UV_CHECK(uv_tcp_init(loop, &listen_socket));
......
......@@ -59,6 +59,12 @@ public:
void inform_about_error(std::string &error_msg);
void inform_about_task_error(loom::Id id, WorkerConnection &wconn, const std::string &error_msg);
loom::Id new_id(int count = 1) {
auto id = id_counter;
id_counter += count;
return id;
}
private:
void start_listen();
......@@ -76,6 +82,7 @@ private:
TaskManager task_manager;
DummyWorker dummy_worker;
loom::Id id_counter;
loom::Dictionary dictionary;
static void _on_new_connection(uv_stream_t *stream, int status);
......
......@@ -27,17 +27,19 @@ void TaskManager::add_plan(const loomplan::Plan &plan, bool distribute)
}
auto task_size = plan.tasks_size();
int id_base = server.new_id(task_size);
for (int i = 0; i < task_size; i++) {
const auto& pt = plan.tasks(i);
tasks[i] = std::make_unique<TaskNode>(
i, type_task_translation[pt.task_type()], pt.config());
auto id = i + id_base;
tasks[id] = std::make_unique<TaskNode>(
id, i, type_task_translation[pt.task_type()], pt.config());
}
std::vector<TaskNode*> ready_tasks;
for (int i = 0; i < task_size; i++) {
const auto& pt = plan.tasks(i);
auto& t = tasks[i];
auto& t = tasks[id_base + i];
auto inputs_size = pt.input_ids_size();
if (inputs_size == 0) {
......@@ -47,7 +49,7 @@ void TaskManager::add_plan(const loomplan::Plan &plan, bool distribute)
for (int j = 0; j < inputs_size; j++) {
auto id = pt.input_ids(j);
auto &task = tasks[id];
auto &task = tasks[id_base + id];
t->add_input(task.get());
task->inc_ref_counter();
......@@ -67,6 +69,8 @@ void TaskManager::add_plan(const loomplan::Plan &plan, bool distribute)
for (int i = 0; i < plan.result_ids_size(); i++)
{
auto id = plan.result_ids(i);
assert (0 <= id && id < task_size);
id = id_base + id;
tasks[id]->inc_ref_counter();
results.insert(id);
}
......
......@@ -21,10 +21,10 @@ public:
FINISHED
};
TaskNode(loom::Id id, int task_type, const std::string &config)
TaskNode(loom::Id id, loom::Id client_id, int task_type, const std::string &config)
: state(WAITING), id(id), ref_count(0), task_type(task_type),
config(config),
size(0), length(0)
size(0), length(0), client_id(client_id)
{}
bool is_ready() {
......@@ -78,6 +78,10 @@ public:
return id;
}
loom::Id get_client_id() const {
return client_id;
}
loom::TaskId get_task_type() const {
return task_type;
}
......@@ -122,6 +126,8 @@ private:
size_t size;
size_t length;
loom::Id client_id;
};
......
......@@ -43,4 +43,4 @@ def test_cv_iris(loom_env):
for line in results:
assert line.startswith("Accuracy = ")
p.write_dot("test.dot", loom_env.client.info)
#p.write_dot("test.dot", loom_env.client.info)
......@@ -168,21 +168,15 @@ def test_open_and_splitlines(loom_env):
p = loom_env.plan()
a = p.task_open(FILE2)
c = p.task_split_lines(a, 2, 6)
result = loom_env.submit(p, c)
expect = "\n".join("Line {}".format(i) for i in xrange(3, 7)) + "\n"
assert result == expect
p = loom_env.plan()
a = p.task_open(FILE2)
c = p.task_split_lines(a, 0, 6)
result = loom_env.submit(p, c)
expect = "\n".join("Line {}".format(i) for i in xrange(1, 7)) + "\n"
assert result == expect
p = loom_env.plan()
a = p.task_open(FILE2)
c = p.task_split_lines(a, 3, 60)
result = loom_env.submit(p, c)
expect = "\n".join("Line {}".format(i) for i in xrange(4, 13)) + "\n"
assert result == expect
c1 = p.task_split_lines(a, 2, 6)
c2 = p.task_split_lines(a, 0, 6)
c3 = p.task_split_lines(a, 3, 60)
result1, result2, result3 = loom_env.submit(p, [c1,c2,c3])
expect1 = "\n".join("Line {}".format(i) for i in xrange(3, 7)) + "\n"
assert result1 == expect1
expect2 = "\n".join("Line {}".format(i) for i in xrange(1, 7)) + "\n"
assert result2 == expect2
expect3 = "\n".join("Line {}".format(i) for i in xrange(4, 13)) + "\n"
assert result3 == expect3
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment