Commit 6d35b004 authored by Stanislav Bohm's avatar Stanislav Bohm

ENH: Index datatype

parent 4957a5ae
......@@ -34,7 +34,7 @@ class Plan(object):
TASK_DATA_CONST = "data/const"
TASK_DATA_MERGE = "data/merge"
TASK_DATA_OPEN = "data/open"
TASK_DATA_SPLIT_LINES = "data/split_lines"
TASK_DATA_SPLIT = "data/split"
TASK_ARRAY_MAKE = "array/make"
......@@ -80,10 +80,9 @@ class Plan(object):
task.config = filename
return self.add(task)
def task_split_lines(self, input, start, end):
def task_split(self, input, char=None):
task = Task()
task.task_type = self.TASK_DATA_SPLIT_LINES
task.config = self.u64u64.pack(start, end)
task.task_type = self.TASK_DATA_SPLIT
task.inputs = (input,)
return self.add(task)
......
......@@ -6,6 +6,8 @@ add_library(libloom
data/rawdata.cpp
data/array.h
data/array.cpp
data/index.h
data/index.cpp
connection.cpp
connection.h
worker.cpp
......
......@@ -8,7 +8,7 @@ Data::~Data() {
}
size_t Data::get_length() const
size_t Data::get_length()
{
return 0;
}
......
......@@ -23,7 +23,7 @@ public:
virtual int get_type_id() = 0;
virtual size_t get_size() = 0;
virtual std::string get_info() = 0;
virtual size_t get_length() const;
virtual size_t get_length();
virtual std::shared_ptr<Data> get_at_index(size_t index);
virtual std::shared_ptr<Data> get_slice(size_t from, size_t to);
......
......@@ -105,7 +105,7 @@ bool ArrayUnpacker::on_message(Connection &connection, const char *data, size_t
return false;
}
loomcomm::Data msg;
msg.ParseFromArray(data, size);
assert(msg.ParseFromArray(data, size));
unpacker = worker->unpack(msg.type_id());
if (unpacker->init(*worker, connection, msg)) {
return finish_data();
......
......@@ -16,8 +16,8 @@ public:
return TYPE_ID;
}
size_t get_length() const {
return length;
size_t get_length() {
return length;
}
size_t get_size();
......
#include "index.h"
#include "rawdata.h"
#include "../log.h"
#include "../sendbuffer.h"
#include "../worker.h"
using namespace loom;
Index::Index(Worker &worker, std::shared_ptr<Data> &data, size_t length, std::unique_ptr<size_t[]> indices)
: worker(worker), data(data), length(length), indices(std::move(indices))
{
}
Index::~Index()
{
llog->debug("Disposing index");
}
size_t Index::get_length()
{
return length;
}
size_t Index::get_size()
{
return data->get_size() + sizeof(size_t) * (length + 1);
}
std::string Index::get_info()
{
return "Index";
}
std::shared_ptr<Data> Index::get_at_index(size_t index)
{
assert (index < length);
size_t addr = indices[index];
char *p1 = this->data->get_raw_data(worker);
p1 += addr;
size_t size = indices[index + 1] - addr;
auto data = std::make_shared<RawData>();
data->init_empty_file(worker, size);
char *p2 = data->get_raw_data(worker);
memcpy(p2, p1, size);
return data;
}
std::shared_ptr<Data> Index::get_slice(size_t from, size_t to)
{
if (from > length) {
from = length;
}
if (to > length) {
to = length;
}
if (from > to) {
from = to;
}
size_t from_addr = indices[from];
size_t to_addr = indices[to];
char *p1 = this->data->get_raw_data(worker);
p1 += from_addr;
size_t size = to_addr - from_addr;
auto data = std::make_shared<RawData>();
data->init_empty_file(worker, size);
char *p2 = data->get_raw_data(worker);
memcpy(p2, p1, size);
return data;
}
void Index::serialize_data(Worker &worker, SendBuffer &buffer, std::shared_ptr<Data> &data_ptr)
{
buffer.add(data_ptr, (char*) &indices[0], sizeof(size_t) * (length + 1));
data->serialize(worker, buffer, data);
}
IndexUnpacker::~IndexUnpacker()
{
}
bool IndexUnpacker::init(Worker &worker, Connection &connection, const loomcomm::Data &msg)
{
this->worker = &worker;
length = msg.length();
indices = std::make_unique<size_t[]>(length + 1);
indices_ptr = (char*) &indices[0];
connection.set_raw_read((length + 1) * sizeof(size_t));
return false;
}
bool IndexUnpacker::on_message(Connection &connection, const char *data, size_t size)
{
if (unpacker) {
if (unpacker->on_message(connection, data, size)) {
finish_data();
return true;
}
return false;
}
loomcomm::Data msg;
assert(msg.ParseFromArray(data, size));
unpacker = worker->unpack(msg.type_id());
if (unpacker->init(*worker, connection, msg)) {
finish_data();
return true;
} else {
return false;
}
}
void IndexUnpacker::on_data_chunk(const char *data, size_t size)
{
if (unpacker) {
unpacker->on_data_chunk(data, size);
return;
}
memcpy(indices_ptr, data, size);
indices_ptr += size;
}
bool IndexUnpacker::on_data_finish(Connection &connection)
{
if (unpacker) {
if (unpacker->on_data_finish(connection)) {
finish_data();
return true;
}
}
return false;
}
void IndexUnpacker::finish_data()
{
std::shared_ptr<Data> &inner_data = unpacker->get_data();
data = std::make_shared<Index>(*worker, inner_data, length, std::move(indices));
}
#ifndef LIBLOOM_INDEX_H
#define LIBLOOM_INDEX_H
#include "../data.h"
#include <memory>
namespace loom {
class Worker;
class Index : public Data {
public:
static const int TYPE_ID = 500;
Index(Worker &worker,
std::shared_ptr<Data> &data,
size_t length,
std::unique_ptr<size_t[]> indices);
~Index();
int get_type_id() {
return TYPE_ID;
}
size_t get_length();
size_t get_size();
std::string get_info();
std::shared_ptr<Data> get_at_index(size_t index);
std::shared_ptr<Data> get_slice(size_t from, size_t to);
void serialize_data(Worker &worker, SendBuffer &buffer, std::shared_ptr<Data> &data_ptr);
private:
Worker &worker;
std::shared_ptr<Data> data;
size_t length;
std::unique_ptr<size_t[]> indices;
};
class IndexUnpacker : public DataUnpacker
{
public:
~IndexUnpacker();
bool init(Worker &worker, Connection &connection, const loomcomm::Data &msg);
bool on_message(Connection &connection, const char *data, size_t size);
void on_data_chunk(const char *data, size_t size);
bool on_data_finish(Connection &connection);
protected:
void finish_data();
std::unique_ptr<size_t[]> indices;
char *indices_ptr;
Worker *worker = nullptr;
size_t length;
std::unique_ptr<DataUnpacker> unpacker;
};
}
#endif // LIBLOOM_INDEX_H
......@@ -58,7 +58,7 @@ void InterConnection::on_message(const char *buffer, size_t size)
if (data_id > 0) {
assert(data_unpacker.get() == nullptr);
loomcomm::Data msg;
msg.ParseFromArray(buffer, size);
assert(msg.ParseFromArray(buffer, size));
data_unpacker = worker.unpack(msg.type_id());
if (data_unpacker->init(worker, connection, msg)) {
finish_data();
......@@ -66,7 +66,7 @@ void InterConnection::on_message(const char *buffer, size_t size)
return;
} else if (address.size()) {
loomcomm::DataPrologue msg;
msg.ParseFromArray(buffer, size);
assert(msg.ParseFromArray(buffer, size));
auto id = msg.id();
data_id = id;
if (msg.has_data_size()) {
......@@ -77,7 +77,7 @@ void InterConnection::on_message(const char *buffer, size_t size)
} else {
// First message
loomcomm::Announce msg;
msg.ParseFromArray(buffer, size);
assert(msg.ParseFromArray(buffer, size));
std::stringstream s;
address = make_address(connection.get_peername(), msg.port());
llog->debug("Interconnection from worker {} accepted", address);
......
......@@ -5,6 +5,7 @@
#include "types.h"
#include "data/rawdata.h"
#include "data/array.h"
#include "data/index.h"
#include <stdlib.h>
#include <sstream>
......@@ -71,6 +72,8 @@ Worker::Worker(uv_loop_t *loop,
add_unpacker(Array::TYPE_ID,
std::make_unique<SimpleUnpackFactory<ArrayUnpacker>>());
add_unpacker(Index::TYPE_ID,
std::make_unique<SimpleUnpackFactory<IndexUnpacker>>());
resource_cpus = 1;
}
......
......@@ -127,7 +127,7 @@ int main(int argc, char **argv)
worker.add_task_factory(
std::make_unique<SimpleTaskFactory<OpenTask>>("data/open"));
worker.add_task_factory(
std::make_unique<SimpleTaskFactory<LineSplitTask>>("data/split_lines"));
std::make_unique<SimpleTaskFactory<SplitTask>>("data/split"));
// Arrays
worker.add_task_factory(
......
#include "rawdatatasks.h"
#include "libloom/compat.h"
#include "libloom/databuilder.h"
#include "libloom/data/rawdata.h"
#include "libloom/data/index.h"
#include "libloom/data/externfile.h"
#include "libloom/log.h"
#include <string.h>
using namespace loom;
void ConstTask::start(DataVector &inputs)
{
auto& config = task->get_config();
std::shared_ptr<Data> output = std::make_shared<RawData>();
RawData &data = static_cast<RawData&>(*output);
memcpy(data.init_empty_file(worker, config.size()), config.c_str(), config.size());
finish(output);
}
void MergeTask::start(DataVector &inputs) {
size_t size = 0;
for (auto& data : inputs) {
size += (*data)->get_size();
}
std::shared_ptr<Data> output = std::make_shared<RawData>();
RawData &data = static_cast<RawData&>(*output);
data.init_empty_file(worker, size);
char *dst = output->get_raw_data(worker);
for (auto& data : inputs) {
char *mem = (*data)->get_raw_data(worker);
size_t size = (*data)->get_size();
assert(mem || size == 0);
memcpy(dst, mem, size);
dst += size;
}
finish(output);
}
void OpenTask::start(DataVector &inputs)
{
std::shared_ptr<Data> data = std::make_shared<ExternFile>(task->get_config());
finish(data);
}
void SplitTask::start(DataVector &inputs)
{
assert(inputs.size() == 1);
char split_char = '\n';
std::vector<size_t> indices;
auto input = *inputs[0];
char *ptr = input->get_raw_data(worker);
size_t size = input->get_size();
indices.push_back(0);
for (size_t i = 0; i < size - 1; i++) {
if (ptr[i] == split_char) {
indices.push_back(i + 1);
}
}
indices.push_back(size);
auto indices_data = std::make_unique<size_t[]>(indices.size());
memcpy(&indices_data[0], &indices[0], sizeof(size_t) * indices.size());
std::shared_ptr<Data> result = std::make_shared<Index>(worker, input, indices.size() - 1, std::move(indices_data));
finish(result);
}
#ifndef LOOM_WORKER_RAWDATATASKS_H
#define LOOM_WORKER_RAWDATATASKS_H
#include "libloom/taskinstance.h"
class ConstTask : public loom::TaskInstance
{
public:
using TaskInstance::TaskInstance;
void start(loom::DataVector &inputs);
};
class MergeTask : public loom::TaskInstance
{
public:
using TaskInstance::TaskInstance;
void start(loom::DataVector &inputs);
};
class OpenTask : public loom::TaskInstance
{
public:
using TaskInstance::TaskInstance;
void start(loom::DataVector &inputs);
};
class SplitTask : public loom::TaskInstance
{
public:
using TaskInstance::TaskInstance;
void start(loom::DataVector &inputs);
};
#endif // LOOM_WORKER_RAWDATATASKS_H
......@@ -15,11 +15,11 @@ def test_cv_iris(loom_env):
loom_env.info = True
p = loom_env.plan()
a = p.task_open(IRIS_DATA)
data = p.task_open(IRIS_DATA)
data = p.task_run(("sort", "--random-sort", "-"), [(data, None)])
lines = p.task_split(data)
b = p.task_run(("sort", "--random-sort", "-"), [(a, None)])
chunks = [p.task_split_lines(b, i * CHUNK_SIZE, (i + 1) * CHUNK_SIZE)
chunks = [p.task_slice(lines, i * CHUNK_SIZE, (i + 1) * CHUNK_SIZE)
for i in xrange(CHUNKS)]
trainsets = [p.task_merge(chunks[:i] + chunks[i + 1:])
......
......@@ -168,9 +168,10 @@ def test_open_and_splitlines(loom_env):
p = loom_env.plan()
a = p.task_open(FILE2)
c1 = p.task_split_lines(a, 2, 6)
c2 = p.task_split_lines(a, 0, 6)
c3 = p.task_split_lines(a, 3, 60)
lines = p.task_split(a)
c1 = p.task_slice(lines, 2, 6)
c2 = p.task_slice(lines, 0, 6)
c3 = p.task_slice(lines, 3, 60)
result1, result2, result3 = loom_env.submit(p, [c1,c2,c3])
expect1 = "\n".join("Line {}".format(i) for i in xrange(3, 7)) + "\n"
assert result1 == expect1
......@@ -180,3 +181,23 @@ def test_open_and_splitlines(loom_env):
expect3 = "\n".join("Line {}".format(i) for i in xrange(4, 13)) + "\n"
assert result3 == expect3
def test_split(loom_env):
loom_env.start(1)
text = "Line1\nLine2\nLine3\nLine4"
p = loom_env.plan()
a = p.task_const(text)
b = p.task_split(a)
c = p.task_get(b, 1)
d = p.task_get(b, 3)
e = p.task_slice(b, 0, 2)
f = p.task_slice(b, 10, 20)
r1, r2, r3, r4 = loom_env.submit(p, (c, d, e, f))
assert r1 == "Line2\n"
assert r2 == "Line4"
assert r3 == "Line1\nLine2\n"
assert r4 == ""
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment