Commit 4957a5ae authored by Stanislav Bohm's avatar Stanislav Bohm

ENH: New basic tasks

parent b76e74a6
......@@ -28,15 +28,20 @@ class Task(object):
class Plan(object):
TASK_BASE_GET = "base/get"
TASK_BASE_SLICE = "base/slice"
TASK_DATA_CONST = "data/const"
TASK_DATA_MERGE = "data/merge"
TASK_DATA_OPEN = "data/open"
TASK_DATA_SPLIT_LINES = "data/split_lines"
TASK_ARRAY_MAKE = "array/make"
TASK_ARRAY_GET = "array/get"
TASK_RUN = "run/run"
TASK_SCHEDULER_DSLICE = "scheduler/dslice"
u64 = struct.Struct("<Q")
u64u64 = struct.Struct("<QQ")
......@@ -51,6 +56,12 @@ class Plan(object):
self.task_types.add(task.task_type)
return task
def task_dslice(self, input):
task = Task()
task.task_type = self.TASK_SCHEDULER_DSLICE
task.inputs = (input,)
return self.add(task)
def task_const(self, data):
task = Task()
task.task_type = self.TASK_DATA_CONST
......@@ -103,13 +114,20 @@ class Plan(object):
task.inputs = inputs
return self.add(task)
def task_array_get(self, input, index):
def task_get(self, input, index):
task = Task()
task.task_type = self.TASK_ARRAY_GET
task.task_type = self.TASK_BASE_GET
task.inputs = (input,)
task.config = self.u64.pack(index)
return self.add(task)
def task_slice(self, input, start, end):
task = Task()
task.task_type = self.TASK_BASE_SLICE
task.inputs = (input,)
task.config = self.u64u64.pack(start, end)
return self.add(task)
def create_message(self):
task_types = list(self.task_types)
task_types.sort()
......
......@@ -13,6 +13,16 @@ size_t Data::get_length() const
return 0;
}
std::shared_ptr<Data> Data::get_at_index(size_t index)
{
assert(0);
}
std::shared_ptr<Data> Data::get_slice(size_t from, size_t to)
{
assert(0);
}
void Data::serialize(Worker &worker, SendBuffer &buffer, std::shared_ptr<Data> &data_ptr)
{
loomcomm::Data msg;
......
......@@ -24,6 +24,8 @@ public:
virtual size_t get_size() = 0;
virtual std::string get_info() = 0;
virtual size_t get_length() const;
virtual std::shared_ptr<Data> get_at_index(size_t index);
virtual std::shared_ptr<Data> get_slice(size_t from, size_t to);
void serialize(Worker &worker, SendBuffer &buffer, std::shared_ptr<Data> &data_ptr);
virtual void init_message(Worker &worker, loomcomm::Data &msg) const;
......
......@@ -4,18 +4,20 @@
#include "../worker.h"
#include "../log.h"
loom::Array::Array(size_t length, std::unique_ptr<std::shared_ptr<Data>[]> items)
using namespace loom;
Array::Array(size_t length, std::unique_ptr<std::shared_ptr<Data>[]> items)
: length(length), items(std::move(items))
{
}
loom::Array::~Array()
Array::~Array()
{
llog->debug("Disposing array");
}
size_t loom::Array::get_size()
size_t Array::get_size()
{
size_t size = 0;
for (size_t i = 0; i < length; i++) {
......@@ -24,30 +26,62 @@ size_t loom::Array::get_size()
return size;
}
std::string loom::Array::get_info()
std::string Array::get_info()
{
return "Array";
}
std::shared_ptr<Data> Array::get_slice(size_t from, size_t to)
{
if (from > length) {
from = length;
}
if (to > length) {
to = length;
}
size_t size;
if (from >= to) {
size = 0;
} else {
size = to - from;
}
auto items = std::make_unique<std::shared_ptr<Data>[]>(size);
size_t j = 0;
for (size_t i = from; i < to; i++, j++) {
items[j] = this->items[i];
}
return std::make_shared<Array>(size, std::move(items));
}
std::shared_ptr<Data> &Array::get_ref_at_index(size_t index)
{
return "Array";
assert(index < length);
return items[index];
}
std::shared_ptr<loom::Data>& loom::Array::get_at_index(size_t index)
std::shared_ptr<Data> Array::get_at_index(size_t index)
{
assert(index < length);
return items[index];
}
void loom::Array::serialize_data(loom::Worker &worker, loom::SendBuffer &buffer, std::shared_ptr<loom::Data> &data_ptr)
void Array::serialize_data(Worker &worker, SendBuffer &buffer, std::shared_ptr<Data> &data_ptr)
{
for (size_t i = 0; i < length; i++) {
items[i]->serialize(worker, buffer, items[i]);
}
}
loom::ArrayUnpacker::~ArrayUnpacker()
ArrayUnpacker::~ArrayUnpacker()
{
}
bool loom::ArrayUnpacker::init(loom::Worker &worker, loom::Connection &connection, const loomcomm::Data &msg)
bool ArrayUnpacker::init(Worker &worker, Connection &connection, const loomcomm::Data &msg)
{
length = msg.length();
index = 0;
......@@ -61,7 +95,7 @@ bool loom::ArrayUnpacker::init(loom::Worker &worker, loom::Connection &connectio
}
}
bool loom::ArrayUnpacker::on_message(loom::Connection &connection, const char *data, size_t size)
bool ArrayUnpacker::on_message(Connection &connection, const char *data, size_t size)
{
if (unpacker) {
bool r = unpacker->on_message(connection, data, size);
......@@ -80,12 +114,12 @@ bool loom::ArrayUnpacker::on_message(loom::Connection &connection, const char *d
}
}
void loom::ArrayUnpacker::on_data_chunk(const char *data, size_t size)
void ArrayUnpacker::on_data_chunk(const char *data, size_t size)
{
unpacker->on_data_chunk(data, size);;
}
bool loom::ArrayUnpacker::on_data_finish(loom::Connection &connection)
bool ArrayUnpacker::on_data_finish(Connection &connection)
{
bool r = unpacker->on_data_finish(connection);
if (r) {
......@@ -94,12 +128,12 @@ bool loom::ArrayUnpacker::on_data_finish(loom::Connection &connection)
return false;
}
void loom::ArrayUnpacker::finish()
void ArrayUnpacker::finish()
{
data = std::make_shared<Array>(length, std::move(items));
}
bool loom::ArrayUnpacker::finish_data()
bool ArrayUnpacker::finish_data()
{
items[index] = unpacker->get_data();
unpacker.reset();
......
......@@ -22,7 +22,10 @@ public:
size_t get_size();
std::string get_info();
std::shared_ptr<Data>& get_at_index(size_t index);
std::shared_ptr<Data> get_at_index(size_t index);
std::shared_ptr<Data> get_slice(size_t from, size_t to);
std::shared_ptr<Data>& get_ref_at_index(size_t index);
void serialize_data(Worker &worker, SendBuffer &buffer, std::shared_ptr<Data> &data_ptr);
......
......@@ -6,6 +6,8 @@ add_executable(loom-worker
runtask.h
basictasks.cpp
basictasks.h
rawdatatasks.cpp
rawdatatasks.h
arraytasks.cpp
arraytasks.h
main.cpp)
......
......@@ -14,14 +14,3 @@ void ArrayMakeTask::start(loom::DataVector &inputs)
std::shared_ptr<Data> output = std::make_shared<Array>(size, std::move(items));
finish(output);
}
void ArrayGetTask::start(loom::DataVector &inputs)
{
assert(inputs.size() == 1);
Array *array = dynamic_cast<Array*>((*inputs[0]).get());
assert(array);
assert(task->get_config().size() == sizeof(size_t));
const size_t *index = reinterpret_cast<const size_t*>(task->get_config().data());
finish(array->get_at_index(*index));
}
......@@ -10,13 +10,4 @@ public:
void start(loom::DataVector &inputs);
};
class ArrayGetTask : public loom::TaskInstance
{
public:
using TaskInstance::TaskInstance;
void start(loom::DataVector &inputs);
};
#endif // LOOM_WORKER_ARRAYTASKS_H
#include "basictasks.h"
#include "libloom/compat.h"
#include "libloom/databuilder.h"
#include "libloom/data/rawdata.h"
#include "libloom/data/externfile.h"
#include "basictasks.h"
#include <string.h>
//#include "libloom/log.h"
using namespace loom;
void ConstTask::start(DataVector &inputs)
{
auto& config = task->get_config();
std::shared_ptr<Data> output = std::make_shared<RawData>();
RawData &data = static_cast<RawData&>(*output);
memcpy(data.init_empty_file(worker, config.size()), config.c_str(), config.size());
finish(output);
}
void MergeTask::start(DataVector &inputs) {
size_t size = 0;
for (auto& data : inputs) {
size += (*data)->get_size();
}
std::shared_ptr<Data> output = std::make_shared<RawData>();
RawData &data = static_cast<RawData&>(*output);
data.init_empty_file(worker, size);
char *dst = output->get_raw_data(worker);
for (auto& data : inputs) {
char *mem = (*data)->get_raw_data(worker);
size_t size = (*data)->get_size();
assert(mem || size == 0);
memcpy(dst, mem, size);
dst += size;
}
finish(output);
}
void OpenTask::start(DataVector &inputs)
void GetTask::start(loom::DataVector &inputs)
{
std::shared_ptr<Data> data = std::make_shared<ExternFile>(task->get_config());
finish(data);
assert(inputs.size() == 1);
assert(task->get_config().size() == sizeof(size_t));
const size_t *index = reinterpret_cast<const size_t*>(task->get_config().data());
std::shared_ptr<Data> &input = *(inputs[0]);
auto result = input->get_at_index(*index);
finish(result);
}
void LineSplitTask::start(DataVector &inputs)
void SliceTask::start(DataVector &inputs)
{
assert(inputs.size() == 1);
assert(sizeof(uint64_t) * 2 == task->get_config().size());
const uint64_t *indices = reinterpret_cast<const uint64_t*>(task->get_config().c_str());
uint64_t start = indices[0];
uint64_t count = indices[1] - start;
auto input = *inputs[0];
char *start_ptr = input->get_raw_data(worker);
char *end_ptr = start_ptr + input->get_size();
if (start) {
while (start_ptr != end_ptr) {
if (*start_ptr == '\n') {
start--;
if (start == 0) {
start_ptr++;
break;
}
}
start_ptr++;
}
}
char *data_end = start_ptr;
if (count) {
while (data_end != end_ptr) {
if (*data_end == '\n') {
count--;
if (count == 0) {
data_end++;
break;
}
}
data_end++;
}
}
size_t data_size = data_end - start_ptr;
std::shared_ptr<Data> output = std::make_shared<RawData>();
RawData &data = static_cast<RawData&>(*output);
data.init_empty_file(worker, data_size);
char *dst = output->get_raw_data(worker);
memcpy(dst, start_ptr, data_size);
finish(output);
assert(task->get_config().size() == sizeof(size_t) * 2);
const size_t *index = reinterpret_cast<const size_t*>(task->get_config().data());
std::shared_ptr<Data> &input = *(inputs[0]);
auto result = input->get_slice(index[0], index[1]);
finish(result);
}
......@@ -3,7 +3,7 @@
#include "libloom/taskinstance.h"
class ConstTask : public loom::TaskInstance
class GetTask : public loom::TaskInstance
{
public:
using TaskInstance::TaskInstance;
......@@ -11,28 +11,11 @@ public:
};
class MergeTask : public loom::TaskInstance
class SliceTask : public loom::TaskInstance
{
public:
using TaskInstance::TaskInstance;
void start(loom::DataVector &inputs);
};
class OpenTask : public loom::TaskInstance
{
public:
using TaskInstance::TaskInstance;
void start(loom::DataVector &inputs);
};
class LineSplitTask : public loom::TaskInstance
{
public:
using TaskInstance::TaskInstance;
void start(loom::DataVector &inputs);
};
#endif // LOOM_WORKER_BASICTASKS_H
#include "runtask.h"
#include "basictasks.h"
#include "rawdatatasks.h"
#include "arraytasks.h"
#include "libloom/worker.h"
......@@ -112,9 +113,13 @@ int main(int argc, char **argv)
config.work_dir);
loom::llog->info("Worker started; listening on port {}", worker.get_listen_port());
// Basic
// Base
worker.add_task_factory(
std::make_unique<SimpleTaskFactory<RunTask>>("run/run"));
std::make_unique<SimpleTaskFactory<GetTask>>("base/get"));
worker.add_task_factory(
std::make_unique<SimpleTaskFactory<SliceTask>>("base/slice"));
// RawData
worker.add_task_factory(
std::make_unique<SimpleTaskFactory<ConstTask>>("data/const"));
worker.add_task_factory(
......@@ -127,8 +132,10 @@ int main(int argc, char **argv)
// Arrays
worker.add_task_factory(
std::make_unique<SimpleTaskFactory<ArrayMakeTask>>("array/make"));
// Run
worker.add_task_factory(
std::make_unique<SimpleTaskFactory<ArrayGetTask>>("array/get"));
std::make_unique<SimpleTaskFactory<RunTask>>("run/run"));
worker.set_cpus(config.cpus);
uv_run(&loop, UV_RUN_DEFAULT);
......
......@@ -12,10 +12,10 @@ def test_make_array(loom_env):
c = p.task_const("")
d = p.task_array_make((a, b, c))
e0 = p.task_array_get(d, 0)
e1 = p.task_array_get(d, 1)
e2 = p.task_array_get(d, 2)
e0 = p.task_get(d, 0)
e1 = p.task_get(d, 1)
e2 = p.task_get(d, 2)
result_d, result_e0, result_e1, result_e2 = \
loom_env.submit(p, (d, e0, e1, e2))
assert result_d == ["ABC", "123456", ""]
......@@ -24,6 +24,28 @@ def test_make_array(loom_env):
assert result_e2 == ""
def test_slice_array(loom_env):
loom_env.start(1)
p = loom_env.plan()
items = [p.task_const(str(i)) for i in xrange(20)]
a = p.task_array_make(items)
e0 = p.task_slice(a, 0, 100)
e1 = p.task_slice(a, 50, 100)
e2 = p.task_slice(a, 2, 4)
e3 = p.task_slice(a, 1, 0)
e4 = p.task_slice(a, 4, 100)
r0, r1, r2, r3, r4 = \
loom_env.submit(p, (e0, e1, e2, e3, e4))
assert r0 == list(map(str, range(20)))
assert r1 == []
assert r2 == ['2', '3']
assert r3 == []
assert r4 == list(map(str, range(4, 20)))
def test_make_empty_array(loom_env):
p = loom_env.plan()
a = p.task_array_make(())
......@@ -41,7 +63,7 @@ def test_array_of_array(loom_env):
d = p.task_array_make((a, b))
e = p.task_array_make((d, c))
f = p.task_array_get(e, 0)
f = p.task_get(e, 0)
loom_env.start(1)
result_d, result_e, result_f = loom_env.submit(p, (d, e, f))
......
from loomenv import loom_env, LOOM_TESTPROG, LOOM_TEST_DATA_DIR # noqa
import os
loom_env # silence flake8
def test_dslice(loom_env):
loom_env.start(1)
p = loom_env.plan()
consts = []
for i in xrange(16):
consts.append(p.task_const("data{}".format(i)))
a = p.task_array_make(consts)
ds = p.task_dslice(a)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment