Commit d43381d0 authored by Stanislav Bohm's avatar Stanislav Bohm

ENH: Merge task has optional "delimiter" argument

parent 2d7d5103
......@@ -62,11 +62,13 @@ class PlanBuilder(object):
task.policy = POLICY_SIMPLE
return self.plan.add(task)
# Diff view of PlanBuilder.task_merge: the first `def` line is the previous
# signature, the second one is its replacement that adds `delimiter`.
# Creates a TASK_DATA_MERGE task over `inputs` with POLICY_SIMPLE, adds it to
# the plan and returns the result of `self.plan.add`.
def task_merge(self, inputs):
def task_merge(self, inputs, delimiter=""):
task = Task()
task.task_type = self.TASK_DATA_MERGE
task.inputs = inputs
task.policy = POLICY_SIMPLE
# Only a non-empty delimiter is stored; otherwise task.config is left untouched.
if delimiter:
task.config = delimiter
return self.plan.add(task)
def task_open(self, filename):
......
......@@ -18,17 +18,19 @@ void ConstTask::start(DataVector &inputs)
finish(std::static_pointer_cast<Data>(output));
}
/** If there are more than 50 inputs or the size is bigger than 20000,
/** If there are more than 50 inputs or the total size is bigger than 200 KiB,
* then the merge task is run in a thread */
bool MergeTask::run_in_thread(DataVector &input_data)
{
const size_t SIZE_LIMIT = 200 * 1024;
if (input_data.size() > 50) {
return true;
}
size_t size = 0;
for (auto& data : inputs) {
size += data->get_size();
if (size > 20000) {
if (size > SIZE_LIMIT) {
return true;
}
}
......@@ -40,17 +42,42 @@ std::shared_ptr<Data> MergeTask::run() {
for (auto& data : inputs) {
size += data->get_size();
}
const std::string &config = task->get_config();
if (inputs.size() > 1) {
size += (inputs.size() - 1) * config.size();
}
std::shared_ptr<Data> output = std::make_shared<RawData>();
RawData &data = static_cast<RawData&>(*output);
data.init_empty(worker, size);
char *dst = output->get_raw_data(worker);
for (auto& data : inputs) {
char *mem = data->get_raw_data(worker);
size_t size = data->get_size();
assert(mem || size == 0);
memcpy(dst, mem, size);
dst += size;
if (config.empty()) {
for (auto& data : inputs) {
char *mem = data->get_raw_data(worker);
size_t size = data->get_size();
assert(mem || size == 0);
memcpy(dst, mem, size);
dst += size;
}
} else {
bool first = true;
for (auto& data : inputs) {
if (first) {
first = false;
} else {
memcpy(dst, config.c_str(), config.size());
dst += config.size();
}
char *mem = data->get_raw_data(worker);
size_t size = data->get_size();
assert(mem || size == 0);
memcpy(dst, mem, size);
dst += size;
}
}
return output;
}
......
......@@ -60,6 +60,22 @@ def test_merge_w3(loom_env):
assert "ABCDE123" == loom_env.submit(p, c)
def test_merge_delimiter(loom_env):
    """Merging ten single-digit constants with the "abc" delimiter yields the joined string."""
    loom_env.start(1)
    builder = loom_env.plan_builder()
    digits = [str(i) for i in xrange(10)]
    # One constant task per digit, merged with "abc" placed between neighbours.
    parts = [builder.task_const(digit) for digit in digits]
    merged = builder.task_merge(parts, "abc")
    assert "abc".join(digits) == loom_env.submit(builder, merged)
def test_merge_empty_with_delimiter(loom_env):
    """A merge without any inputs gives an empty result even when a delimiter is set."""
    loom_env.start(1)
    builder = loom_env.plan_builder()
    no_inputs = ()
    merged = builder.task_merge(no_inputs, "abc")
    result = loom_env.submit(builder, merged)
    assert "" == result
def test_run_separated_1_cpu(loom_env):
loom_env.start(1)
p = loom_env.plan_builder()
......@@ -198,7 +214,7 @@ def test_open_and_splitlines(loom_env):
c1 = p.task_slice(lines, 2, 6)
c2 = p.task_slice(lines, 0, 6)
c3 = p.task_slice(lines, 3, 60)
result1, result2, result3 = loom_env.submit(p, [c1,c2,c3])
result1, result2, result3 = loom_env.submit(p, [c1, c2, c3])
expect1 = "\n".join("Line {}".format(i) for i in xrange(3, 7)) + "\n"
assert result1 == expect1
......@@ -243,4 +259,5 @@ def test_size_and_length(loom_env):
c2 = p.task_length(a2)
u64 = struct.Struct("<Q")
[25, 0, 50, 2] == map(lambda x: u64.unpack(x)[0], loom_env.submit(p, (b1, c1, b2, c2)))
[25, 0, 50, 2] == map(lambda x: u64.unpack(x)[0],
loom_env.submit(p, (b1, c1, b2, c2)))
from loomenv import loom_env, LOOM_TESTPROG, LOOM_TEST_DATA_DIR # noqa
import os
loom_env # silence flake8
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment