Commit 1f14b3b7 authored by Stanislav Bohm's avatar Stanislav Bohm

ENH: ThreadJob introduced

parent a127abdf
......@@ -28,7 +28,8 @@ add_library(libloom
worker.h
taskinstance.cpp
taskinstance.h
ttinstance.cpp
threadjob.cpp
threadjob.h
ttinstance.h
taskfactory.h
dictionary.cpp
......
......@@ -27,10 +27,10 @@ static void ensure_py_init() {
}
void loom::PyCallTask::start(loom::DataVector &inputs)
PyCallJob::PyCallJob(Worker &worker, Task &task)
: ThreadJob(worker, task)
{
ensure_py_init();
ThreadTaskInstance::start(inputs);
ensure_py_init();
}
static PyObject* data_vector_to_list(const DataVector &data)
......@@ -93,7 +93,7 @@ static std::string get_attr_string(PyObject *obj, const char *name)
return result;
}
std::shared_ptr<Data> PyCallTask::run()
std::shared_ptr<Data> PyCallJob::run()
{
// Obtain GIL
PyGILState_STATE gstate;
......@@ -118,8 +118,8 @@ std::shared_ptr<Data> PyCallTask::run()
assert(PyCallable_Check(call_fn));
PyObject *config_data = PyBytes_FromStringAndSize(
task->get_config().c_str(),
task->get_config().size());
task.get_config().c_str(),
task.get_config().size());
assert(config_data);
PyObject *py_inputs = data_vector_to_list(inputs);
......@@ -144,7 +144,7 @@ std::shared_ptr<Data> PyCallTask::run()
assert(ptr);
auto output = std::make_shared<RawData>();
output->init_from_mem(worker.get_work_dir(), ptr, size);
output->init_from_mem(work_dir, ptr, size);
Py_DECREF(result);
PyGILState_Release(gstate);
......@@ -156,7 +156,7 @@ std::shared_ptr<Data> PyCallTask::run()
assert(ptr);
auto output = std::make_shared<RawData>();
output->init_from_mem(worker.get_work_dir(), ptr, size);
output->init_from_mem(work_dir, ptr, size);
Py_DECREF(result);
PyGILState_Release(gstate);
......@@ -185,9 +185,9 @@ std::shared_ptr<Data> PyCallTask::run()
}
}
void PyCallTask::set_python_error()
void PyCallJob::set_python_error()
{
loom::llog->error("Python error in task id={}", task->get_id());
loom::llog->error("Python error in task id={}", task.get_id());
PyObject *excType, *excValue, *excTraceback;
PyErr_Fetch(&excType, &excValue, &excTraceback);
assert(excType);
......
#ifndef LIBLOOM_TASKS_PYTHON_H
#define LIBLOOM_TASKS_PYTHON_H
#include "libloom/ttinstance.h"
#include "libloom/threadjob.h"
namespace loom {
class PyCallTask : public loom::ThreadTaskInstance
class PyCallJob : public loom::ThreadJob
{
public:
using ThreadTaskInstance::ThreadTaskInstance;
PyCallJob(Worker &worker, Task &task);
void start(loom::DataVector &inputs);
std::shared_ptr<loom::Data> run();
private:
......
......@@ -21,11 +21,11 @@ void ConstTask::start(DataVector &inputs)
/** If there are more then 50 input or size is bigger then 200kB,
* then merge task is run in thread */
bool MergeTask::run_in_thread(DataVector &input_data)
bool MergeJob::check_run_in_thread()
{
const size_t SIZE_LIMIT = 200 * 1024;
if (input_data.size() > 50) {
if (inputs.size() > 50) {
return true;
}
size_t size = 0;
......@@ -38,20 +38,21 @@ bool MergeTask::run_in_thread(DataVector &input_data)
return false;
}
std::shared_ptr<Data> MergeTask::run() {
std::shared_ptr<Data> MergeJob::run() {
size_t size = 0;
for (auto& data : inputs) {
size += data->get_size();
}
const std::string &config = task->get_config();
const std::string &config = task.get_config();
if (inputs.size() > 1) {
size += (inputs.size() - 1) * config.size();
}
std::shared_ptr<Data> output = std::make_shared<RawData>();
RawData &data = static_cast<RawData&>(*output);
char *dst = data.init_empty(worker.get_work_dir(), size);
char *dst = data.init_empty(work_dir, size);
if (config.empty()) {
for (auto& data : inputs) {
......
......@@ -2,6 +2,7 @@
#define LIBLOOM_TASKS_RAWDATATASKS_H
#include "libloom/ttinstance.h"
#include "libloom/threadjob.h"
namespace loom {
......@@ -13,13 +14,12 @@ public:
};
class MergeTask : public loom::ThreadTaskInstance
class MergeJob : public loom::ThreadJob
{
public:
using ThreadTaskInstance::ThreadTaskInstance;
using ThreadJob::ThreadJob;
bool run_in_thread(loom::DataVector &input_data);
protected:
bool check_run_in_thread();
std::shared_ptr<loom::Data> run();
};
......
#include "threadjob.h"
#include "worker.h"
using namespace loom;
ThreadJob::ThreadJob(Worker &worker, Task &task)
: task(task), work_dir(worker.get_work_dir())
{
}
ThreadJob::~ThreadJob()
{
}
void ThreadJob::set_error(const std::string &error_message)
{
this->error_message = error_message;
}
void ThreadJob::set_redirect(std::unique_ptr<TaskDescription> tredirect)
{
task_redirect = std::move(tredirect);
}
#ifndef LIBLOOM_THREADJOB_H
#define LIBLOOM_THREADJOB_H
#include "data.h"
#include "task.h"
#include "taskdesc.h"
namespace loom {
/** Base class for task instance with thread support */
class ThreadJob
{
public:
ThreadJob(Worker &worker, Task &task);
virtual ~ThreadJob();
/** Method to decide if input is sufficiently big to use thread
* default implementation just returns true
*/
virtual bool check_run_in_thread() {
return true;
}
virtual std::shared_ptr<Data> run() = 0;
void set_inputs(DataVector &input_data) {
this->inputs = input_data;
}
const std::string& get_error_message() {
return error_message;
}
std::unique_ptr<TaskDescription>& get_task_redirect() {
return task_redirect;
}
protected:
void set_error(const std::string &error_message);
void set_redirect(std::unique_ptr<TaskDescription> tredirect);
Task &task;
DataVector inputs;
std::string error_message;
std::unique_ptr<TaskDescription> task_redirect;
const std::string &work_dir;
static void _work_cb(uv_work_t *req);
static void _after_work_cb(uv_work_t *req, int status);
};
}
#endif // LIBLOOM_THREADJOB_H
#include "ttinstance.h"
#include "worker.h"
#include <sstream>
using namespace loom;
void ThreadTaskInstance::start(DataVector &input_data)
{
this->inputs = input_data;
if (run_in_thread(input_data)) {
UV_CHECK(uv_queue_work(worker.get_loop(), &work, _work_cb, _after_work_cb));
} else {
_work_cb(&work);
_after_work_cb(&work, 0);
}
}
bool ThreadTaskInstance::run_in_thread(DataVector &input_data)
{
return true;
}
void ThreadTaskInstance::set_error(const std::string &error_message)
{
this->error_message = error_message;
}
void ThreadTaskInstance::set_redirect(std::unique_ptr<TaskDescription> tredirect)
{
task_redirect = std::move(tredirect);
}
void ThreadTaskInstance::_work_cb(uv_work_t *req)
{
ThreadTaskInstance *ttinstance = static_cast<ThreadTaskInstance*>(req->data);
ttinstance->result = ttinstance->run();
}
void ThreadTaskInstance::_after_work_cb(uv_work_t *req, int status)
{
UV_CHECK(status);
ThreadTaskInstance *ttinstance = static_cast<ThreadTaskInstance*>(req->data);
if (ttinstance->error_message.empty()) {
if (ttinstance->result && !ttinstance->task_redirect) {
ttinstance->finish(ttinstance->result);
} else if (!ttinstance->result && ttinstance->task_redirect) {
ttinstance->redirect(std::move(ttinstance->task_redirect));
} else {
ttinstance->fail("ThreadTaskInstace::run returned nullptr or incosistent returned values");
}
} else {
ttinstance->fail(ttinstance->error_message);
}
}
ThreadTaskInstance::~ThreadTaskInstance()
{
}
......@@ -3,6 +3,7 @@
#include "data.h"
#include "taskinstance.h"
#include "worker.h"
#include<uv.h>
......@@ -13,45 +14,56 @@
namespace loom {
/** Base class for task instance with thread support */
class ThreadTaskInstance : public TaskInstance
template<typename T> class ThreadTaskInstance : public TaskInstance
{
public:
ThreadTaskInstance(Worker &worker, std::unique_ptr<Task> task)
: TaskInstance(worker, std::move(task))
: TaskInstance(worker, std::move(task)), job(worker, *this->task)
{
work.data = this;
}
virtual ~ThreadTaskInstance();
virtual void start(DataVector &input_data);
/** Method to decide if input is sufficiently big to use thread
* default implementation just returns true
*/
virtual bool run_in_thread(DataVector &input_data);
protected:
/** This method is called outside of main thread if run_in_thread has returned true
* IMPORTANT: It can read only member variable "inputs" and calls methods
* "set_error" or "set_redirect"
* All other things are not thread-safe!
* In case of error, call set_error and return nullptr
*/
virtual std::shared_ptr<Data> run() = 0;
void set_error(const std::string &error_message);
void set_redirect(std::unique_ptr<TaskDescription> tredirect);
virtual void start(DataVector &input_data) {
job.set_inputs(input_data);
if (job.check_run_in_thread()) {
UV_CHECK(uv_queue_work(worker.get_loop(), &work, _work_cb, _after_work_cb));
} else {
_work_cb(&work);
_after_work_cb(&work, 0);
}
}
DataVector inputs;
protected:
uv_work_t work;
T job;
std::shared_ptr<Data> result;
std::string error_message;
std::unique_ptr<TaskDescription> task_redirect;
static void _work_cb(uv_work_t *req);
static void _after_work_cb(uv_work_t *req, int status);
static void _work_cb(uv_work_t *req)
{
ThreadTaskInstance *ttinstance = static_cast<ThreadTaskInstance*>(req->data);
ttinstance->result = ttinstance->job.run();
}
static void _after_work_cb(uv_work_t *req, int status)
{
UV_CHECK(status);
ThreadTaskInstance *ttinstance = static_cast<ThreadTaskInstance*>(req->data);
bool redirect = ttinstance->job.get_task_redirect();
const std::string &err = ttinstance->job.get_error_message();
if (err.empty()) {
if (ttinstance->result && !redirect) {
ttinstance->finish(ttinstance->result);
} else if (!ttinstance->result && redirect) {
ttinstance->redirect(std::move(ttinstance->job.get_task_redirect()));
} else {
ttinstance->fail("ThreadTaskInstace::run returned nullptr or incosistent returned values");
}
} else {
ttinstance->fail(err);
}
}
};
}
......
......@@ -90,7 +90,7 @@ void Worker::register_basic_tasks()
// RawData
add_task_factory<ConstTask>("loom/data/const");
add_task_factory<MergeTask>("loom/data/merge");
add_task_factory<ThreadTaskInstance<MergeJob>>("loom/data/merge");
add_task_factory<OpenTask>("loom/data/open");
add_task_factory<SplitTask>("loom/data/split");
......@@ -101,7 +101,7 @@ void Worker::register_basic_tasks()
add_task_factory<RunTask>("loom/run/run");
// Python
add_task_factory<PyCallTask>("loom/py/call");
add_task_factory<ThreadTaskInstance<PyCallJob>>("loom/py/call");
}
......
......@@ -82,7 +82,7 @@ class LoomEnv(Env):
workers.append(w)
time.sleep(0.1)
if VALGRIND:
time.sleep(2)
time.sleep(4)
assert not server.poll()
assert not any(w.poll() for w in workers)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment