Commit 29e89444 authored by Stanislav Bohm's avatar Stanislav Bohm

RF: Small API refactoring

parent 1239c6ca
......@@ -48,7 +48,7 @@ std::shared_ptr<Data> Index::get_at_index(size_t index)
size_t size = indices[index + 1] - addr;
auto data = std::make_shared<RawData>();
data->init_empty_file(worker, size);
data->init_empty(worker, size);
char *p2 = data->get_raw_data(worker);
memcpy(p2, p1, size);
return data;
......@@ -77,7 +77,7 @@ std::shared_ptr<Data> Index::get_slice(size_t from, size_t to)
size_t size = to_addr - from_addr;
auto data = std::make_shared<RawData>();
data->init_empty_file(worker, size);
data->init_empty(worker, size);
char *p2 = data->get_raw_data(worker);
memcpy(p2, p1, size);
return data;
......
......@@ -51,7 +51,7 @@ std::string RawData::get_type_name() const
return data;
}*/
char* RawData::init_empty_file(Worker &worker, size_t size)
char* RawData::init_empty(Worker &worker, size_t size)
{
assert(data == nullptr);
......@@ -141,6 +141,13 @@ std::string RawData::get_info()
return "RawData";
}
void RawData::init_from_string(Worker &worker, const std::string &str)
{
auto size = str.size();
char *mem = init_empty(worker, size);
memcpy(mem, str.c_str(), size);
}
void RawData::serialize_data(Worker &worker, SendBuffer &buffer, std::shared_ptr<Data> &data_ptr)
{
buffer.add(data_ptr, get_raw_data(worker), size);
......@@ -156,7 +163,7 @@ bool RawDataUnpacker::init(Worker &worker, Connection &connection, const loomcom
this->data = std::make_shared<RawData>();
RawData &data = static_cast<RawData&>(*this->data);
size_t size = msg.size();
pointer = data.init_empty_file(worker, size);
pointer = data.init_empty(worker, size);
if (size == 0) {
return true;
}
......
......@@ -27,7 +27,8 @@ public:
std::string get_info();
char* init_empty_file(Worker &worker, size_t size);
char* init_empty(Worker &worker, size_t size);
void init_from_string(Worker &worker, const std::string &str);
void assign_filename(Worker &worker);
void init_from_file(Worker &worker);
......
......@@ -43,7 +43,7 @@ void TaskInstance::fail_libuv(const std::string &error_msg, int error_code)
fail(s.str());
}
void TaskInstance::finish(std::shared_ptr<Data> &output)
void TaskInstance::finish(const std::shared_ptr<Data> &output)
{
assert(output);
worker.publish_data(get_id(), output);
......
......@@ -46,7 +46,8 @@ public:
protected:
void fail(const std::string &error_msg);
void fail_libuv(const std::string &error_msg, int error_code);
void finish(std::shared_ptr<Data> &output);
void finish(const std::shared_ptr<Data> &output);
Worker &worker;
std::unique_ptr<Task> task;
......
......@@ -34,7 +34,7 @@ void SizeTask::start(DataVector &inputs)
}
std::shared_ptr<Data> output = std::make_shared<RawData>();
RawData &data = static_cast<RawData&>(*output);
memcpy(data.init_empty_file(worker, sizeof(size_t)), &size, sizeof(size_t));
memcpy(data.init_empty(worker, sizeof(size_t)), &size, sizeof(size_t));
finish(output);
}
......@@ -46,6 +46,6 @@ void LengthTask::start(DataVector &inputs)
}
std::shared_ptr<Data> output = std::make_shared<RawData>();
RawData &data = static_cast<RawData&>(*output);
memcpy(data.init_empty_file(worker, sizeof(size_t)), &length, sizeof(size_t));
memcpy(data.init_empty(worker, sizeof(size_t)), &length, sizeof(size_t));
finish(output);
}
......@@ -13,11 +13,9 @@ using namespace loom;
void ConstTask::start(DataVector &inputs)
{
auto& config = task->get_config();
std::shared_ptr<Data> output = std::make_shared<RawData>();
RawData &data = static_cast<RawData&>(*output);
memcpy(data.init_empty_file(worker, config.size()), config.c_str(), config.size());
finish(output);
auto output = std::make_shared<RawData>();
output->init_from_string(worker, task->get_config());
finish(std::static_pointer_cast<Data>(output));
}
/** If there are more then 50 input or size is bigger then 20000,
......@@ -44,7 +42,7 @@ std::shared_ptr<Data> MergeTask::run() {
}
std::shared_ptr<Data> output = std::make_shared<RawData>();
RawData &data = static_cast<RawData&>(*output);
data.init_empty_file(worker, size);
data.init_empty(worker, size);
char *dst = output->get_raw_data(worker);
for (auto& data : inputs) {
......
......@@ -216,7 +216,7 @@ void Worker::start_task(std::unique_ptr<Task> task)
t->start();
}*/
void Worker::publish_data(Id id, std::shared_ptr<Data> &data)
void Worker::publish_data(Id id, const std::shared_ptr<Data> &data)
{
llog->debug("Publishing data id={} size={}", id, data->get_size());
public_data[id] = data;
......
......@@ -67,7 +67,7 @@ public:
void task_finished(TaskInstance &task_instance, Data &data);
void task_failed(TaskInstance &task_instance, const std::string &error_msg);
void publish_data(Id id, std::shared_ptr<Data> &data);
void publish_data(Id id, const std::shared_ptr<Data> &data);
void remove_data(Id id);
bool has_data(Id id) const
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment