Commit a127abdf authored by Stanislav Bohm's avatar Stanislav Bohm

RF: RawData takes only work_dir for initiazation (now Worker)

parent 053dc1eb
......@@ -48,7 +48,7 @@ std::shared_ptr<Data> Index::get_at_index(size_t index)
size_t size = indices[index + 1] - addr;
auto data = std::make_shared<RawData>();
char *p2 = data->init_empty(worker, size);
char *p2 = data->init_empty(worker.get_work_dir(), size);
memcpy(p2, p1, size);
return data;
}
......@@ -76,7 +76,7 @@ std::shared_ptr<Data> Index::get_slice(size_t from, size_t to)
size_t size = to_addr - from_addr;
auto data = std::make_shared<RawData>();
char *p2 = data->init_empty(worker, size);
char *p2 = data->init_empty(worker.get_work_dir(), size);
memcpy(p2, p1, size);
return data;
}
......
......@@ -42,12 +42,12 @@ std::string RawData::get_type_name() const
return RawDataUnpacker::get_type_name();
}
char* RawData::init_empty(Worker &worker, size_t size)
char* RawData::init_empty(const std::string &work_dir, size_t size)
{
assert(data == nullptr);
if (filename.empty()) {
assign_filename(worker);
assign_filename(work_dir);
}
this->size = size;
......@@ -71,20 +71,20 @@ char* RawData::init_empty(Worker &worker, size_t size)
return data;
}
void RawData::assign_filename(Worker &worker)
void RawData::assign_filename(const std::string &work_dir)
{
assert(filename.empty());
int file_id = file_id_counter++;
std::stringstream s;
s << worker.get_work_dir() << "data/" << file_id;
s << work_dir << "data/" << file_id;
filename = s.str();
}
void RawData::init_from_file(Worker &worker)
void RawData::init_from_file(const std::string &work_dir)
{
assert(data == nullptr);
if (filename.empty()) {
assign_filename(worker);
assign_filename(work_dir);
}
size = file_size(filename.c_str());
}
......@@ -137,16 +137,16 @@ std::string RawData::get_info()
return "RawData file=" + filename;
}
void RawData::init_from_string(Worker &worker, const std::string &str)
void RawData::init_from_string(const std::string &work_dir, const std::string &str)
{
auto size = str.size();
char *mem = init_empty(worker, size);
char *mem = init_empty(work_dir, size);
memcpy(mem, str.c_str(), size);
}
void RawData::init_from_mem(Worker &worker, const void *ptr, size_t size)
void RawData::init_from_mem(const std::string &work_dir, const void *ptr, size_t size)
{
char *mem = init_empty(worker, size);
char *mem = init_empty(work_dir, size);
memcpy(mem, ptr, size);
}
......@@ -165,7 +165,7 @@ bool RawDataUnpacker::init(Worker &worker, Connection &connection, const loomcom
this->data = std::make_shared<RawData>();
RawData &data = static_cast<RawData&>(*this->data);
size_t size = msg.size();
pointer = data.init_empty(worker, size);
pointer = data.init_empty(worker.get_work_dir(), size);
if (size == 0) {
return true;
}
......
......@@ -27,11 +27,11 @@ public:
std::string get_info();
char* init_empty(Worker &worker, size_t size);
void init_from_string(Worker &worker, const std::string &str);
void init_from_mem(Worker &worker, const void *ptr, size_t size);
void assign_filename(Worker &worker);
void init_from_file(Worker &worker);
char* init_empty(const std::string &work_dir, size_t size);
void init_from_string(const std::string &work_dir, const std::string &str);
void init_from_mem(const std::string &work_dir, const void *ptr, size_t size);
void init_from_file(const std::string &work_dir);
void assign_filename(const std::string &work_dir);
std::string get_filename() const;
......
#include "basetasks.h"
#include "../data/rawdata.h"
#include "../worker.h"
//#include "libloom/log.h"
......@@ -34,7 +35,7 @@ void SizeTask::start(DataVector &inputs)
}
std::shared_ptr<Data> output = std::make_shared<RawData>();
RawData &data = static_cast<RawData&>(*output);
memcpy(data.init_empty(worker, sizeof(size_t)), &size, sizeof(size_t));
memcpy(data.init_empty(worker.get_work_dir(), sizeof(size_t)), &size, sizeof(size_t));
finish(output);
}
......@@ -46,6 +47,6 @@ void LengthTask::start(DataVector &inputs)
}
std::shared_ptr<Data> output = std::make_shared<RawData>();
RawData &data = static_cast<RawData&>(*output);
memcpy(data.init_empty(worker, sizeof(size_t)), &length, sizeof(size_t));
memcpy(data.init_empty(worker.get_work_dir(), sizeof(size_t)), &length, sizeof(size_t));
finish(output);
}
......@@ -3,6 +3,7 @@
#include "../data/rawdata.h"
#include "../log.h"
#include "../compat.h"
#include "../worker.h"
#include "python_wrapper.h"
#include <Python.h>
......@@ -143,7 +144,7 @@ std::shared_ptr<Data> PyCallTask::run()
assert(ptr);
auto output = std::make_shared<RawData>();
output->init_from_mem(worker, ptr, size);
output->init_from_mem(worker.get_work_dir(), ptr, size);
Py_DECREF(result);
PyGILState_Release(gstate);
......@@ -155,7 +156,7 @@ std::shared_ptr<Data> PyCallTask::run()
assert(ptr);
auto output = std::make_shared<RawData>();
output->init_from_mem(worker, ptr, size);
output->init_from_mem(worker.get_work_dir(), ptr, size);
Py_DECREF(result);
PyGILState_Release(gstate);
......
......@@ -5,6 +5,7 @@
#include "libloom/data/index.h"
#include "libloom/data/externfile.h"
#include "libloom/log.h"
#include "libloom/worker.h"
#include <string.h>
......@@ -14,7 +15,7 @@ using namespace loom;
void ConstTask::start(DataVector &inputs)
{
auto output = std::make_shared<RawData>();
output->init_from_string(worker, task->get_config());
output->init_from_string(worker.get_work_dir(), task->get_config());
finish(std::static_pointer_cast<Data>(output));
}
......@@ -50,7 +51,7 @@ std::shared_ptr<Data> MergeTask::run() {
std::shared_ptr<Data> output = std::make_shared<RawData>();
RawData &data = static_cast<RawData&>(*output);
char *dst = data.init_empty(worker, size);
char *dst = data.init_empty(worker.get_work_dir(), size);
if (config.empty()) {
for (auto& data : inputs) {
......
......@@ -211,7 +211,7 @@ void RunTask::_on_close(uv_handle_t *handle)
if (output_size == 1) {
std::shared_ptr<Data> data_ptr = std::make_shared<RawData>();
RawData& data = static_cast<RawData&>(*data_ptr);
data.assign_filename(task->worker);
data.assign_filename(task->worker.get_work_dir());
std::string path = task->get_path(msg.map_outputs(0));
std::string data_path = data.get_filename();
......@@ -222,7 +222,7 @@ void RunTask::_on_close(uv_handle_t *handle)
path, data_path);
log_errno_abort("rename");
}
data.init_from_file(task->worker);
data.init_from_file(task->worker.get_work_dir());
task->finish(data_ptr);
return;
}
......@@ -234,7 +234,7 @@ void RunTask::_on_close(uv_handle_t *handle)
items[i] = std::make_shared<RawData>();
RawData& data = static_cast<RawData&>(*items[i]);
data.assign_filename(task->worker);
data.assign_filename(task->worker.get_work_dir());
std::string path = task->get_path(msg.map_outputs(i));
std::string data_path = data.get_filename();
llog->debug("Storing file '{}'' as index={}", msg.map_outputs(i), i);
......@@ -245,7 +245,7 @@ void RunTask::_on_close(uv_handle_t *handle)
path, data_path);
log_errno_abort("rename");
}
data.init_from_file(task->worker);
data.init_from_file(task->worker.get_work_dir());
}
std::shared_ptr<Data> output = std::make_shared<Array>(output_size, std::move(items));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment