Commit d18fe288 authored by Stanislav Bohm's avatar Stanislav Bohm

ENH: Basic removing of data for finished tasks

parent 95315818
......@@ -18,7 +18,7 @@ _sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='loomcomm.proto',
package='loomcomm',
serialized_pb=_b('\n\x0eloomcomm.proto\x12\x08loomcomm\"\xbb\x01\n\x08Register\x12\x18\n\x10protocol_version\x18\x01 \x02(\x05\x12%\n\x04type\x18\x02 \x02(\x0e\x32\x17.loomcomm.Register.Type\x12\x0c\n\x04port\x18\x03 \x01(\x05\x12\x12\n\ntask_types\x18\x04 \x03(\t\x12\x0c\n\x04\x63pus\x18\x05 \x01(\x05\x12\x0c\n\x04info\x18\n \x01(\x08\"0\n\x04Type\x12\x13\n\x0fREGISTER_WORKER\x10\x01\x12\x13\n\x0fREGISTER_CLIENT\x10\x02\"&\n\rServerMessage\"\x15\n\x04Type\x12\r\n\tSTART_JOB\x10\x01\"\xc4\x01\n\rWorkerCommand\x12*\n\x04type\x18\x01 \x02(\x0e\x32\x1c.loomcomm.WorkerCommand.Type\x12\n\n\x02id\x18\x02 \x01(\x05\x12\x11\n\ttask_type\x18\x03 \x01(\x05\x12\x13\n\x0btask_config\x18\x04 \x01(\t\x12\x13\n\x0btask_inputs\x18\x05 \x03(\x05\x12\x0f\n\x07\x61\x64\x64ress\x18\n \x01(\t\x12\x11\n\twith_size\x18\x0b \x01(\x08\"\x1a\n\x04Type\x12\x08\n\x04TASK\x10\x01\x12\x08\n\x04SEND\x10\x02\"\x1c\n\x0eWorkerResponse\x12\n\n\x02id\x18\x02 \x01(\x05\"\x18\n\x08\x41nnounce\x12\x0c\n\x04port\x18\x01 \x02(\x05\"-\n\x0c\x44\x61taPrologue\x12\n\n\x02id\x18\x01 \x02(\x05\x12\x11\n\tdata_size\x18\x03 \x01(\x04\"%\n\x04\x44\x61ta\x12\x0f\n\x07type_id\x18\x01 \x02(\x05\x12\x0c\n\x04size\x18\x02 \x01(\x04\"\"\n\x04Info\x12\n\n\x02id\x18\x01 \x02(\x05\x12\x0e\n\x06worker\x18\x02 \x02(\t\"\x9b\x01\n\rClientMessage\x12*\n\x04type\x18\x01 \x02(\x0e\x32\x1c.loomcomm.ClientMessage.Type\x12$\n\x04\x64\x61ta\x18\x02 \x01(\x0b\x32\x16.loomcomm.DataPrologue\x12\x1c\n\x04info\x18\x03 \x01(\x0b\x32\x0e.loomcomm.Info\"\x1a\n\x04Type\x12\x08\n\x04\x44\x41TA\x10\x01\x12\x08\n\x04INFO\x10\x02\x42\x02H\x03')
serialized_pb=_b('\n\x0eloomcomm.proto\x12\x08loomcomm\"\xbb\x01\n\x08Register\x12\x18\n\x10protocol_version\x18\x01 \x02(\x05\x12%\n\x04type\x18\x02 \x02(\x0e\x32\x17.loomcomm.Register.Type\x12\x0c\n\x04port\x18\x03 \x01(\x05\x12\x12\n\ntask_types\x18\x04 \x03(\t\x12\x0c\n\x04\x63pus\x18\x05 \x01(\x05\x12\x0c\n\x04info\x18\n \x01(\x08\"0\n\x04Type\x12\x13\n\x0fREGISTER_WORKER\x10\x01\x12\x13\n\x0fREGISTER_CLIENT\x10\x02\"&\n\rServerMessage\"\x15\n\x04Type\x12\r\n\tSTART_JOB\x10\x01\"\xd0\x01\n\rWorkerCommand\x12*\n\x04type\x18\x01 \x02(\x0e\x32\x1c.loomcomm.WorkerCommand.Type\x12\n\n\x02id\x18\x02 \x01(\x05\x12\x11\n\ttask_type\x18\x03 \x01(\x05\x12\x13\n\x0btask_config\x18\x04 \x01(\t\x12\x13\n\x0btask_inputs\x18\x05 \x03(\x05\x12\x0f\n\x07\x61\x64\x64ress\x18\n \x01(\t\x12\x11\n\twith_size\x18\x0b \x01(\x08\"&\n\x04Type\x12\x08\n\x04TASK\x10\x01\x12\x08\n\x04SEND\x10\x02\x12\n\n\x06REMOVE\x10\x03\"\x1c\n\x0eWorkerResponse\x12\n\n\x02id\x18\x02 \x01(\x05\"\x18\n\x08\x41nnounce\x12\x0c\n\x04port\x18\x01 \x02(\x05\"-\n\x0c\x44\x61taPrologue\x12\n\n\x02id\x18\x01 \x02(\x05\x12\x11\n\tdata_size\x18\x03 \x01(\x04\"%\n\x04\x44\x61ta\x12\x0f\n\x07type_id\x18\x01 \x02(\x05\x12\x0c\n\x04size\x18\x02 \x01(\x04\"\"\n\x04Info\x12\n\n\x02id\x18\x01 \x02(\x05\x12\x0e\n\x06worker\x18\x02 \x02(\t\"\x9b\x01\n\rClientMessage\x12*\n\x04type\x18\x01 \x02(\x0e\x32\x1c.loomcomm.ClientMessage.Type\x12$\n\x04\x64\x61ta\x18\x02 \x01(\x0b\x32\x16.loomcomm.DataPrologue\x12\x1c\n\x04info\x18\x03 \x01(\x0b\x32\x0e.loomcomm.Info\"\x1a\n\x04Type\x12\x08\n\x04\x44\x41TA\x10\x01\x12\x08\n\x04INFO\x10\x02\x42\x02H\x03')
)
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
......@@ -78,11 +78,15 @@ _WORKERCOMMAND_TYPE = _descriptor.EnumDescriptor(
name='SEND', index=1, number=2,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='REMOVE', index=2, number=3,
options=None,
type=None),
],
containing_type=None,
options=None,
serialized_start=429,
serialized_end=455,
serialized_end=467,
)
_sym_db.RegisterEnumDescriptor(_WORKERCOMMAND_TYPE)
......@@ -103,8 +107,8 @@ _CLIENTMESSAGE_TYPE = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
serialized_start=765,
serialized_end=791,
serialized_start=777,
serialized_end=803,
)
_sym_db.RegisterEnumDescriptor(_CLIENTMESSAGE_TYPE)
......@@ -268,7 +272,7 @@ _WORKERCOMMAND = _descriptor.Descriptor(
oneofs=[
],
serialized_start=259,
serialized_end=455,
serialized_end=467,
)
......@@ -297,8 +301,8 @@ _WORKERRESPONSE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=457,
serialized_end=485,
serialized_start=469,
serialized_end=497,
)
......@@ -327,8 +331,8 @@ _ANNOUNCE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=487,
serialized_end=511,
serialized_start=499,
serialized_end=523,
)
......@@ -364,8 +368,8 @@ _DATAPROLOGUE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=513,
serialized_end=558,
serialized_start=525,
serialized_end=570,
)
......@@ -401,8 +405,8 @@ _DATA = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=560,
serialized_end=597,
serialized_start=572,
serialized_end=609,
)
......@@ -438,8 +442,8 @@ _INFO = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=599,
serialized_end=633,
serialized_start=611,
serialized_end=645,
)
......@@ -483,8 +487,8 @@ _CLIENTMESSAGE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=636,
serialized_end=791,
serialized_start=648,
serialized_end=803,
)
_REGISTER.fields_by_name['type'].enum_type = _REGISTER_TYPE
......
......@@ -28,7 +28,7 @@ char *Data::get_raw_data(Worker &worker)
return nullptr;
}
std::string Data::get_filename(Worker &worker) const
std::string Data::get_filename() const
{
return "";
}
......
......@@ -30,7 +30,7 @@ public:
virtual void serialize_data(Worker &worker, SendBuffer &buffer, std::shared_ptr<Data> &data_ptr) = 0;
virtual char *get_raw_data(Worker &worker);
virtual std::string get_filename(Worker &worker) const;
virtual std::string get_filename() const;
};
class DataUnpacker
......
......@@ -34,7 +34,7 @@ void ExternFile::serialize_data(Worker &worker, SendBuffer &buffer, std::shared_
assert(0); // TODO
}
std::string ExternFile::get_filename(Worker &worker) const
std::string ExternFile::get_filename() const
{
return filename;
}
......
......@@ -33,7 +33,7 @@ public:
std::string get_info();
void serialize_data(Worker &worker, SendBuffer &buffer, std::shared_ptr<Data> &data_ptr);
std::string get_filename(Worker &worker) const;
std::string get_filename() const;
private:
......
......@@ -17,46 +17,50 @@ using namespace loom;
size_t RawData::file_id_counter = 1;
RawData::RawData()
: data(nullptr), size(0), file_id(0)
: data(nullptr), size(0)
{
}
RawData::~RawData()
{
if (data != nullptr) {
if (file_id) {
munmap(data, size);
} else {
delete [] data;
llog->debug("Disposing raw data filename={}", filename);
if (filename.empty()) {
assert(data == nullptr);
} else {
if (munmap(data, size)) {
log_errno_abort("munmap");
}
if (unlink(filename.c_str())) {
log_errno_abort("unlink");
}
}
}
char* RawData::init_memonly(size_t size)
/*char* RawData::init_memonly(size_t size)
{
assert(data == nullptr);
assert(file_id == 0);
this->size = size;
data = new char[size];
return data;
}
}*/
char* RawData::init_empty_file(Worker &worker, size_t size)
{
assert(data == nullptr);
if (file_id == 0) {
assign_file_id();
if (filename.empty()) {
assign_filename(worker);
}
this->size = size;
int fd = ::open(get_filename(worker).c_str(), O_CREAT | O_RDWR, S_IRUSR | S_IWUSR);
int fd = ::open(filename.c_str(), O_CREAT | O_RDWR, S_IRUSR | S_IWUSR);
if (fd < 0) {
llog->critical("Cannot open data {} for writing", get_filename(worker));
llog->critical("Cannot open data {} for writing", filename);
log_errno_abort("open");
}
......@@ -74,36 +78,36 @@ char* RawData::init_empty_file(Worker &worker, size_t size)
return data;
}
void RawData::assign_file_id()
void RawData::assign_filename(Worker &worker)
{
assert(file_id == 0);
file_id = file_id_counter++;
assert(filename.empty());
int file_id = file_id_counter++;
std::stringstream s;
s << worker.get_work_dir() << "data/" << file_id;
filename = s.str();
}
void RawData::init_from_file(Worker &worker)
{
assert(data == nullptr);
if (file_id == 0) {
assign_file_id();
if (filename.empty()) {
assign_filename(worker);
}
size = file_size(get_filename(worker).c_str());
size = file_size(filename.c_str());
}
std::string RawData::get_filename(Worker &worker) const
std::string RawData::get_filename() const
{
assert(file_id);
std::stringstream s;
s << worker.get_work_dir() << "data/" << file_id;
return s.str();
return filename;
}
void RawData::open(Worker &worker)
{
assert(file_id);
assert(!filename.empty());
int fd = ::open(get_filename(worker).c_str(), O_RDONLY, S_IRUSR | S_IWUSR);
int fd = ::open(filename.c_str(), O_RDONLY, S_IRUSR | S_IWUSR);
if (fd < 0) {
llog->critical("Cannot open data {}", get_filename(worker));
llog->critical("Cannot open data {}", filename);
log_errno_abort("open");
}
map(fd, false);
......@@ -114,7 +118,7 @@ void RawData::open(Worker &worker)
void RawData::map(int fd, bool write)
{
assert(data == nullptr);
assert(file_id);
assert(!filename.empty());
assert(fd >= 0);
int flags = PROT_READ;
......@@ -123,7 +127,7 @@ void RawData::map(int fd, bool write)
}
data = (char*) mmap(0, size, flags, MAP_SHARED, fd, 0);
if (data == MAP_FAILED) {
llog->critical("Cannot mmap data file_id={}", file_id);
llog->critical("Cannot mmap data filename={}", filename);
log_errno_abort("mmap");
}
}
......
......@@ -31,12 +31,12 @@ public:
std::string get_info();
void serialize_data(Worker &worker, SendBuffer &buffer, std::shared_ptr<Data> &data_ptr);
char* init_memonly(size_t size);
//char* init_memonly(size_t size);
char* init_empty_file(Worker &worker, size_t size);
void assign_file_id();
void assign_filename(Worker &worker);
void init_from_file(Worker &worker);
std::string get_filename(Worker &worker) const;
std::string get_filename() const;
private:
......@@ -45,7 +45,7 @@ private:
char *data;
size_t size;
size_t file_id;
std::string filename;
static size_t file_id_counter;
......
......@@ -639,6 +639,7 @@ bool WorkerCommand_Type_IsValid(int value) {
switch(value) {
case 1:
case 2:
case 3:
return true;
default:
return false;
......@@ -648,6 +649,7 @@ bool WorkerCommand_Type_IsValid(int value) {
#ifndef _MSC_VER
const WorkerCommand_Type WorkerCommand::TASK;
const WorkerCommand_Type WorkerCommand::SEND;
const WorkerCommand_Type WorkerCommand::REMOVE;
const WorkerCommand_Type WorkerCommand::Type_MIN;
const WorkerCommand_Type WorkerCommand::Type_MAX;
const int WorkerCommand::Type_ARRAYSIZE;
......
......@@ -61,11 +61,12 @@ const int ServerMessage_Type_Type_ARRAYSIZE = ServerMessage_Type_Type_MAX + 1;
enum WorkerCommand_Type {
WorkerCommand_Type_TASK = 1,
WorkerCommand_Type_SEND = 2
WorkerCommand_Type_SEND = 2,
WorkerCommand_Type_REMOVE = 3
};
bool WorkerCommand_Type_IsValid(int value);
const WorkerCommand_Type WorkerCommand_Type_Type_MIN = WorkerCommand_Type_TASK;
const WorkerCommand_Type WorkerCommand_Type_Type_MAX = WorkerCommand_Type_SEND;
const WorkerCommand_Type WorkerCommand_Type_Type_MAX = WorkerCommand_Type_REMOVE;
const int WorkerCommand_Type_Type_ARRAYSIZE = WorkerCommand_Type_Type_MAX + 1;
enum ClientMessage_Type {
......@@ -395,6 +396,7 @@ class WorkerCommand : public ::google::protobuf::MessageLite {
typedef WorkerCommand_Type Type;
static const Type TASK = WorkerCommand_Type_TASK;
static const Type SEND = WorkerCommand_Type_SEND;
static const Type REMOVE = WorkerCommand_Type_REMOVE;
static inline bool Type_IsValid(int value) {
return WorkerCommand_Type_IsValid(value);
}
......
......@@ -186,6 +186,14 @@ void Worker::publish_data(Id id, std::unique_ptr<Data> data)
check_waiting_tasks();
}
void Worker::remove_data(Id id)
{
llog->debug("Removing data id={}", id);
auto i = public_data.find(id);
assert(i != public_data.end());
public_data.erase(i);
}
InterConnection& Worker::get_connection(const std::string &address)
{
auto &connection = connections[address];
......@@ -317,7 +325,6 @@ void Worker::check_waiting_tasks()
}
}
void Worker::remove_task(TaskInstance &task)
{
for (auto i = active_tasks.begin(); i != active_tasks.end(); i++) {
......@@ -390,6 +397,10 @@ void ServerConnection::on_message(const char *data, size_t size)
worker.new_task(std::move(task));
break;
}
case loomcomm::WorkerCommand_Type_REMOVE: {
worker.remove_data(msg.id());
break;
}
case loomcomm::WorkerCommand_Type_SEND: {
auto& address = msg.address();
/* "!" means address to server, so we replace the sign to proper address */
......
......@@ -63,6 +63,7 @@ public:
void task_finished(TaskInstance &task_instance);
void publish_data(Id id, std::unique_ptr<Data> data);
void remove_data(loom::Id id);
bool has_data(Id id) const
{
......@@ -119,7 +120,7 @@ private:
void register_worker();
void start_listen();
void remove_task(TaskInstance &task);
void remove_task(TaskInstance &task);
void start_task(std::unique_ptr<Task> task);
//int get_listen_port();
......
......@@ -29,9 +29,12 @@ message WorkerCommand {
enum Type {
TASK = 1;
SEND = 2;
REMOVE = 3;
}
required Type type = 1;
// ALL
optional int32 id = 2;
// TASK
......
......@@ -85,7 +85,8 @@ void DWConnection::on_message(const char *buffer, size_t size)
assert(msg.has_data_size());
size_t data_size = msg.data_size();
llog->debug("Fetching data id={} data_size={}", msg.id(), data_size);
auto data_id = msg.id();
llog->debug("Fetching data id={} data_size={}", data_id, data_size);
auto mem = std::make_unique<char[]>(data_size);
pointer = mem.get();
......
......@@ -7,6 +7,7 @@
#include <vector>
#include <libloom/connection.h>
#include <libloom/types.h>
class Server;
class DWConnection;
......@@ -59,7 +60,6 @@ protected:
DummyWorker &worker;
std::unique_ptr<loom::SendBuffer> send_buffer;
char *pointer;
size_t size;
bool registered;
};
......
......@@ -52,9 +52,16 @@ void TaskManager::add_plan(const loomplan::Plan &plan, bool distribute)
}
for (int i = 0; i < plan.result_ids_size(); i++)
{
results.insert(plan.result_ids(i));
{
auto id = plan.result_ids(i);
tasks[id]->inc_ref_counter();
results.insert(id);
}
for (auto &t : tasks) {
assert(t.second->get_ref_counter() > 0);
}
if (distribute) {
distribute_work(ready_tasks);
}
......@@ -68,9 +75,26 @@ void TaskManager::on_task_finished(TaskNode &task)
const auto &owners = task.get_owners();
assert(owners.size());
owners[0]->send_data(id, server.get_dummy_worker().get_address(), true);
if (task.dec_ref_counter()) {
assert(task.get_ref_counter() == 0);
for (WorkerConnection *owner : task.get_owners()) {
owner->remove_data(task.get_id());
}
}
} else {
llog->debug("Job id={} finished", id);
}
for (TaskNode *input : task.get_inputs()) {
if (input->dec_ref_counter()) {
assert(input->get_ref_counter() == 0);
for (WorkerConnection *owner : input->get_owners()) {
owner->remove_data(input->get_id());
}
}
}
std::vector<TaskNode*> ready;
task.collect_ready_nexts(ready);
distribute_work(ready);
......
......@@ -22,7 +22,7 @@ public:
};
TaskNode(loom::Id id, int task_type, const std::string &config)
: state(WAITING), id(id), task_type(task_type), config(config) {}
: state(WAITING), id(id), ref_count(0), task_type(task_type), config(config) {}
bool is_ready() {
assert(state == WAITING);
......@@ -49,6 +49,19 @@ public:
void add_next(TaskNode *task) {
nexts.push_back(task);
ref_count += 1;
}
void inc_ref_counter() {
ref_count += 1;
}
bool dec_ref_counter() {
return --ref_count <= 0;
}
int get_ref_counter() const {
return ref_count;
}
void set_state(State state) {
......@@ -95,6 +108,7 @@ public:
private:
State state;
loom::Id id;
int ref_count;
loom::TaskId task_type;
Vector inputs;
Vector nexts;
......
......@@ -90,3 +90,11 @@ void WorkerConnection::send_data(Id id, const std::string &address, bool with_si
connection->send_message(msg);
}
void WorkerConnection::remove_data(Id id)
{
loomcomm::WorkerCommand msg;
msg.set_type(loomcomm::WorkerCommand_Type_REMOVE);
msg.set_id(id);
connection->send_message(msg);
}
......@@ -23,6 +23,7 @@ public:
void send_task(TaskNode *task);
void send_data(loom::Id id, const std::string &address, bool with_size);
void remove_data(loom::Id id);
std::unordered_map<loom::Id, TaskNode*>& get_tasks() {
return tasks;
......
......@@ -49,7 +49,7 @@ void RunTask::start(DataVector &inputs)
std::string work_dir = worker.get_run_dir(get_id());
options.cwd = work_dir.c_str();
if (llog->level() == spdlog::level::debug) {
if (llog->level() <= spdlog::level::debug) {
std::stringstream s;
s << msg.args(0);
for (int i = 1; i < args_size; i++) {
......@@ -74,7 +74,7 @@ void RunTask::start(DataVector &inputs)
assert(map.input_index() >= 0 && map.input_index() < input_size);
auto& input = *inputs[map.input_index()];
std::string path = get_path(map.filename());
std::string filename = input->get_filename(worker);
std::string filename = input->get_filename();
assert(!filename.empty());
llog->debug("Creating symlink of '{}'", map.filename());
if (symlink(filename.c_str(), path.c_str())) {
......@@ -156,10 +156,10 @@ void RunTask::_on_close(uv_handle_t *handle)
continue;
}
auto data = std::make_unique<RawData>();
data->assign_file_id();
data->assign_filename(task->worker);
std::string path = task->get_path(map.filename());
std::string data_path = data->get_filename(task->worker);
std::string data_path = data->get_filename();
llog->debug("Storing file '{}'' as index={}", map.filename(), i);
//data->create(task->worker, 10);
if (unlikely(rename(path.c_str(),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment