Commit 8cf4ab31 authored by Stanislav Bohm's avatar Stanislav Bohm

ENH: Task split_lines

parent 88b95e1e
import loomplan_pb2 import loomplan_pb2
import loomrun_pb2 import loomrun_pb2
import struct
class Task(object): class Task(object):
...@@ -47,6 +48,16 @@ class OpenTask(Task): ...@@ -47,6 +48,16 @@ class OpenTask(Task):
self.config = filename self.config = filename
class SplitLinesTask(Task):
task_type = "split_lines"
struct = u32 = struct.Struct("<QQ")
def __init__(self, input, start, end):
self.config = self.struct.pack(start, end)
self.inputs = (input,)
class RunTask(Task): class RunTask(Task):
task_type = "run" task_type = "run"
...@@ -142,6 +153,9 @@ class Plan(object): ...@@ -142,6 +153,9 @@ class Plan(object):
def task_merge(self, inputs): def task_merge(self, inputs):
return self.add(MergeTask(inputs)) return self.add(MergeTask(inputs))
def task_split_lines(self, input, start, end):
return self.add(SplitLinesTask(input, start, end))
def task_run(self, args, stdin=None, stdout=None, variable=None): def task_run(self, args, stdin=None, stdout=None, variable=None):
return self.add( return self.add(
RunTask(args, stdin=stdin, stdout=stdout, variable=variable)) RunTask(args, stdin=stdin, stdout=stdout, variable=variable))
......
...@@ -8,11 +8,6 @@ ...@@ -8,11 +8,6 @@
using namespace loom; using namespace loom;
ConstTask::ConstTask(Worker &worker, std::unique_ptr<Task> task)
: TaskInstance(worker, std::move(task))
{
}
void ConstTask::start(DataVector &inputs) void ConstTask::start(DataVector &inputs)
{ {
...@@ -22,11 +17,6 @@ void ConstTask::start(DataVector &inputs) ...@@ -22,11 +17,6 @@ void ConstTask::start(DataVector &inputs)
finish(std::move(output)); finish(std::move(output));
} }
MergeTask::MergeTask(Worker &worker, std::unique_ptr<Task> task)
: TaskInstance(worker, std::move(task))
{
}
void MergeTask::start(DataVector &inputs) { void MergeTask::start(DataVector &inputs) {
size_t size = 0; size_t size = 0;
...@@ -47,13 +37,56 @@ void MergeTask::start(DataVector &inputs) { ...@@ -47,13 +37,56 @@ void MergeTask::start(DataVector &inputs) {
finish(std::move(output)); finish(std::move(output));
} }
OpenTask::OpenTask(Worker &worker, std::unique_ptr<Task> task) void OpenTask::start(DataVector &inputs)
: TaskInstance(worker, std::move(task))
{ {
finish(std::make_unique<ExternFile>(task->get_config()));
} }
void OpenTask::start(DataVector &inputs) void LineSplitTask::start(DataVector &inputs)
{ {
finish(std::make_unique<ExternFile>(task->get_config())); assert(inputs.size() == 1);
assert(sizeof(uint64_t) * 2 == task->get_config().size());
const uint64_t *indices = reinterpret_cast<const uint64_t*>(task->get_config().c_str());
uint64_t start = indices[0];
uint64_t count = indices[1] - start;
auto input = *inputs[0];
char *start_ptr = input->get_raw_data(worker);
char *end_ptr = start_ptr + input->get_size();
if (start) {
while (start_ptr != end_ptr) {
if (*start_ptr == '\n') {
start--;
if (start == 0) {
start_ptr++;
break;
}
}
start_ptr++;
}
}
char *data_end = start_ptr;
if (count) {
while (data_end != end_ptr) {
if (*data_end == '\n') {
count--;
if (count == 0) {
data_end++;
break;
}
}
data_end++;
}
}
size_t data_size = data_end - start_ptr;
auto output = std::make_unique<RawData>();
output->init_empty_file(worker, data_size);
char *dst = output->get_raw_data(worker);
memcpy(dst, start_ptr, data_size);
finish(std::move(output));
} }
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
class ConstTask : public loom::TaskInstance class ConstTask : public loom::TaskInstance
{ {
public: public:
ConstTask(loom::Worker &worker, std::unique_ptr<loom::Task> task); using TaskInstance::TaskInstance;
void start(loom::DataVector &inputs); void start(loom::DataVector &inputs);
}; };
...@@ -14,14 +14,23 @@ public: ...@@ -14,14 +14,23 @@ public:
class MergeTask : public loom::TaskInstance class MergeTask : public loom::TaskInstance
{ {
public: public:
MergeTask(loom::Worker &worker, std::unique_ptr<loom::Task> task); using TaskInstance::TaskInstance;
void start(loom::DataVector &inputs); void start(loom::DataVector &inputs);
}; };
class OpenTask : public loom::TaskInstance class OpenTask : public loom::TaskInstance
{ {
public: public:
OpenTask(loom::Worker &worker, std::unique_ptr<loom::Task> task); using TaskInstance::TaskInstance;
void start(loom::DataVector &inputs);
};
class LineSplitTask : public loom::TaskInstance
{
public:
using TaskInstance::TaskInstance;
void start(loom::DataVector &inputs); void start(loom::DataVector &inputs);
}; };
......
...@@ -118,6 +118,8 @@ int main(int argc, char **argv) ...@@ -118,6 +118,8 @@ int main(int argc, char **argv)
std::make_unique<SimpleTaskFactory<MergeTask>>("merge")); std::make_unique<SimpleTaskFactory<MergeTask>>("merge"));
worker.add_task_factory( worker.add_task_factory(
std::make_unique<SimpleTaskFactory<OpenTask>>("open")); std::make_unique<SimpleTaskFactory<OpenTask>>("open"));
worker.add_task_factory(
std::make_unique<SimpleTaskFactory<LineSplitTask>>("split_lines"));
worker.set_cpus(config.cpus); worker.set_cpus(config.cpus);
//worker.add_task_factory<MergeTask>("merge"); //worker.add_task_factory<MergeTask>("merge");
uv_run(&loop, UV_RUN_DEFAULT); uv_run(&loop, UV_RUN_DEFAULT);
......
...@@ -148,7 +148,7 @@ def test_run_files(loom_env): ...@@ -148,7 +148,7 @@ def test_run_files(loom_env):
assert result == "cdef" * 100 assert result == "cdef" * 100
def test_open(loom_env): def test_open_and_merge(loom_env):
p = loom_env.plan() p = loom_env.plan()
a = p.task_open(FILE1) a = p.task_open(FILE1)
b = p.task_open(FILE2) b = p.task_open(FILE2)
...@@ -159,3 +159,28 @@ def test_open(loom_env): ...@@ -159,3 +159,28 @@ def test_open(loom_env):
"\n".join("Line {}".format(i) for i in xrange(1, 13)) + "\n".join("Line {}".format(i) for i in xrange(1, 13)) +
"\n") "\n")
assert result == expect assert result == expect
def test_open_and_splitlines(loom_env):
loom_env.start(1)
p = loom_env.plan()
a = p.task_open(FILE2)
c = p.task_split_lines(a, 2, 6)
result = loom_env.submit(p, c)
expect = "\n".join("Line {}".format(i) for i in xrange(3, 7)) + "\n"
assert result == expect
p = loom_env.plan()
a = p.task_open(FILE2)
c = p.task_split_lines(a, 0, 6)
result = loom_env.submit(p, c)
expect = "\n".join("Line {}".format(i) for i in xrange(1, 7)) + "\n"
assert result == expect
p = loom_env.plan()
a = p.task_open(FILE2)
c = p.task_split_lines(a, 3, 60)
result = loom_env.submit(p, c)
expect = "\n".join("Line {}".format(i) for i in xrange(4, 13)) + "\n"
assert result == expect
...@@ -3,6 +3,7 @@ import time ...@@ -3,6 +3,7 @@ import time
import sys import sys
from datetime import datetime from datetime import datetime
def main(): def main():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description='Simple program called in test') description='Simple program called in test')
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment