diff --git a/src/libloom/CMakeLists.txt b/src/libloom/CMakeLists.txt index 49694a49a4318532aaadea524ff94bb628eaec59..be925e2a912e0b4565df704cba59bde19c3a7103 100644 --- a/src/libloom/CMakeLists.txt +++ b/src/libloom/CMakeLists.txt @@ -28,7 +28,8 @@ add_library(libloom worker.h taskinstance.cpp taskinstance.h - ttinstance.cpp + threadjob.cpp + threadjob.h ttinstance.h taskfactory.h dictionary.cpp diff --git a/src/libloom/tasks/python.cpp b/src/libloom/tasks/python.cpp index 87d9613cb285a1b7818be0195f35558609735cf8..478bb52528017a50b619859333eabb0a2e3ea265 100644 --- a/src/libloom/tasks/python.cpp +++ b/src/libloom/tasks/python.cpp @@ -27,10 +27,10 @@ static void ensure_py_init() { } -void loom::PyCallTask::start(loom::DataVector &inputs) +PyCallJob::PyCallJob(Worker &worker, Task &task) + : ThreadJob(worker, task) { - ensure_py_init(); - ThreadTaskInstance::start(inputs); + ensure_py_init(); } static PyObject* data_vector_to_list(const DataVector &data) @@ -93,7 +93,7 @@ static std::string get_attr_string(PyObject *obj, const char *name) return result; } -std::shared_ptr<Data> PyCallTask::run() +std::shared_ptr<Data> PyCallJob::run() { // Obtain GIL PyGILState_STATE gstate; @@ -118,8 +118,8 @@ std::shared_ptr<Data> PyCallTask::run() assert(PyCallable_Check(call_fn)); PyObject *config_data = PyBytes_FromStringAndSize( - task->get_config().c_str(), - task->get_config().size()); + task.get_config().c_str(), + task.get_config().size()); assert(config_data); PyObject *py_inputs = data_vector_to_list(inputs); @@ -144,7 +144,7 @@ std::shared_ptr<Data> PyCallTask::run() assert(ptr); auto output = std::make_shared<RawData>(); - output->init_from_mem(worker.get_work_dir(), ptr, size); + output->init_from_mem(work_dir, ptr, size); Py_DECREF(result); PyGILState_Release(gstate); @@ -156,7 +156,7 @@ std::shared_ptr<Data> PyCallTask::run() assert(ptr); auto output = std::make_shared<RawData>(); - output->init_from_mem(worker.get_work_dir(), ptr, size); + output->init_from_mem(work_dir, ptr, size); Py_DECREF(result); PyGILState_Release(gstate); @@ -185,9 +185,9 @@ std::shared_ptr<Data> PyCallTask::run() } } -void PyCallTask::set_python_error() +void PyCallJob::set_python_error() { - loom::llog->error("Python error in task id={}", task->get_id()); + loom::llog->error("Python error in task id={}", task.get_id()); PyObject *excType, *excValue, *excTraceback; PyErr_Fetch(&excType, &excValue, &excTraceback); assert(excType); diff --git a/src/libloom/tasks/python.h b/src/libloom/tasks/python.h index 0246d403b0790ca72e0f4d26097817e78e359de1..f46589b636dded484bf61273caa2f3efe1d9e709 100644 --- a/src/libloom/tasks/python.h +++ b/src/libloom/tasks/python.h @@ -1,14 +1,15 @@ #ifndef LIBLOOM_TASKS_PYTHON_H #define LIBLOOM_TASKS_PYTHON_H -#include "libloom/ttinstance.h" +#include "libloom/threadjob.h" namespace loom { -class PyCallTask : public loom::ThreadTaskInstance +class PyCallJob : public loom::ThreadJob { public: - using ThreadTaskInstance::ThreadTaskInstance; + PyCallJob(Worker &worker, Task &task); + void start(loom::DataVector &inputs); std::shared_ptr<loom::Data> run(); private: diff --git a/src/libloom/tasks/rawdatatasks.cpp b/src/libloom/tasks/rawdatatasks.cpp index a198ca920fee015cfa14605dd7ca8210853170ff..5b3d02719eddd27ef23192ca8ae78ef2376a6cc3 100644 --- a/src/libloom/tasks/rawdatatasks.cpp +++ b/src/libloom/tasks/rawdatatasks.cpp @@ -21,11 +21,11 @@ void ConstTask::start(DataVector &inputs) /** If there are more then 50 input or size is bigger then 200kB, * then merge task is run in thread */ -bool MergeTask::run_in_thread(DataVector &input_data) +bool MergeJob::check_run_in_thread() { const size_t SIZE_LIMIT = 200 * 1024; - if (input_data.size() > 50) { + if (inputs.size() > 50) { return true; } size_t size = 0; @@ -38,20 +38,21 @@ bool MergeTask::run_in_thread(DataVector &input_data) return false; } -std::shared_ptr<Data> MergeTask::run() { +std::shared_ptr<Data> MergeJob::run() { size_t size = 0; for (auto& data : inputs) { size += data->get_size(); } - const std::string &config = task->get_config(); + const std::string &config = task.get_config(); + if (inputs.size() > 1) { size += (inputs.size() - 1) * config.size(); } std::shared_ptr<Data> output = std::make_shared<RawData>(); RawData &data = static_cast<RawData&>(*output); - char *dst = data.init_empty(worker.get_work_dir(), size); + char *dst = data.init_empty(work_dir, size); if (config.empty()) { for (auto& data : inputs) { diff --git a/src/libloom/tasks/rawdatatasks.h b/src/libloom/tasks/rawdatatasks.h index 41f4176f52dfed66bb974fbef2ddfc6eaca6b63f..994e6fd9c547a74b760d957b7bcdb84c623894e0 100644 --- a/src/libloom/tasks/rawdatatasks.h +++ b/src/libloom/tasks/rawdatatasks.h @@ -2,6 +2,7 @@ #define LIBLOOM_TASKS_RAWDATATASKS_H #include "libloom/ttinstance.h" +#include "libloom/threadjob.h" namespace loom { @@ -13,13 +14,12 @@ public: }; -class MergeTask : public loom::ThreadTaskInstance +class MergeJob : public loom::ThreadJob { public: - using ThreadTaskInstance::ThreadTaskInstance; + using ThreadJob::ThreadJob; - bool run_in_thread(loom::DataVector &input_data); -protected: + bool check_run_in_thread(); std::shared_ptr<loom::Data> run(); }; diff --git a/src/libloom/threadjob.cpp b/src/libloom/threadjob.cpp new file mode 100644 index 0000000000000000000000000000000000000000..2dba8886d4b74b14b9133f97f3c113655cfb4fc6 --- /dev/null +++ b/src/libloom/threadjob.cpp @@ -0,0 +1,25 @@ + +#include "threadjob.h" +#include "worker.h" + +using namespace loom; + +ThreadJob::ThreadJob(Worker &worker, Task &task) + : task(task), work_dir(worker.get_work_dir()) +{ +} + +ThreadJob::~ThreadJob() +{ + +} + +void ThreadJob::set_error(const std::string &error_message) +{ + this->error_message = error_message; +} + +void ThreadJob::set_redirect(std::unique_ptr<TaskDescription> tredirect) +{ + task_redirect = std::move(tredirect); +} diff --git a/src/libloom/threadjob.h b/src/libloom/threadjob.h new file mode 100644 index 0000000000000000000000000000000000000000..149a6a570890f76b9b737b8a466a12e1228ad130 --- /dev/null +++ b/src/libloom/threadjob.h @@ -0,0 +1,58 @@ +#ifndef LIBLOOM_THREADJOB_H +#define LIBLOOM_THREADJOB_H + +#include "data.h" +#include "task.h" +#include "taskdesc.h" + +namespace loom { + +/** Base class for task instance with thread support */ +class ThreadJob +{ +public: + + ThreadJob(Worker &worker, Task &task); + virtual ~ThreadJob(); + + /** Method to decide if input is sufficiently big to use thread + * default implementation just returns true + */ + virtual bool check_run_in_thread() { + return true; + } + + virtual std::shared_ptr<Data> run() = 0; + + void set_inputs(DataVector &input_data) { + this->inputs = input_data; + } + + const std::string& get_error_message() { + return error_message; + } + + std::unique_ptr<TaskDescription>& get_task_redirect() { + return task_redirect; + } + + +protected: + + + void set_error(const std::string &error_message); + void set_redirect(std::unique_ptr<TaskDescription> tredirect); + + Task &task; + DataVector inputs; + std::string error_message; + std::unique_ptr<TaskDescription> task_redirect; + const std::string &work_dir; + + static void _work_cb(uv_work_t *req); + static void _after_work_cb(uv_work_t *req, int status); +}; + +} + +#endif // LIBLOOM_THREADJOB_H diff --git a/src/libloom/ttinstance.cpp b/src/libloom/ttinstance.cpp deleted file mode 100644 index 1ee7d4d9bfb8b7aba1f69b83ed503629fa0e7b1d..0000000000000000000000000000000000000000 --- a/src/libloom/ttinstance.cpp +++ /dev/null @@ -1,60 +0,0 @@ -#include "ttinstance.h" - -#include "worker.h" - -#include <sstream> - -using namespace loom; - -void ThreadTaskInstance::start(DataVector &input_data) -{ - this->inputs = input_data; - if (run_in_thread(input_data)) { - UV_CHECK(uv_queue_work(worker.get_loop(), &work, _work_cb, _after_work_cb)); - } else { - _work_cb(&work); - _after_work_cb(&work, 0); - } -} - -bool ThreadTaskInstance::run_in_thread(DataVector &input_data) -{ - return true; -} - -void ThreadTaskInstance::set_error(const std::string &error_message) -{ - this->error_message = error_message; -} - -void ThreadTaskInstance::set_redirect(std::unique_ptr<TaskDescription> tredirect) -{ - task_redirect = std::move(tredirect); -} - -void ThreadTaskInstance::_work_cb(uv_work_t *req) -{ - ThreadTaskInstance *ttinstance = static_cast<ThreadTaskInstance*>(req->data); - ttinstance->result = ttinstance->run(); -} - -void ThreadTaskInstance::_after_work_cb(uv_work_t *req, int status) -{ - UV_CHECK(status); - ThreadTaskInstance *ttinstance = static_cast<ThreadTaskInstance*>(req->data); - if (ttinstance->error_message.empty()) { - if (ttinstance->result && !ttinstance->task_redirect) { - ttinstance->finish(ttinstance->result); - } else if (!ttinstance->result && ttinstance->task_redirect) { - ttinstance->redirect(std::move(ttinstance->task_redirect)); - } else { - ttinstance->fail("ThreadTaskInstace::run returned nullptr or incosistent returned values"); - } - } else { - ttinstance->fail(ttinstance->error_message); - } -} - -ThreadTaskInstance::~ThreadTaskInstance() -{ -} diff --git a/src/libloom/ttinstance.h b/src/libloom/ttinstance.h index e842f635338a32e9db92a0614a7954237cdbd94b..2c94e3ea9e8120ae0f1c0d07f6ea82d6ab3c09ee 100644 --- a/src/libloom/ttinstance.h +++ b/src/libloom/ttinstance.h @@ -3,6 +3,7 @@ #include "data.h" #include "taskinstance.h" +#include "worker.h" #include<uv.h> @@ -13,45 +14,56 @@ namespace loom { /** Base class for task instance with thread support */ -class ThreadTaskInstance : public TaskInstance +template<typename T> class ThreadTaskInstance : public TaskInstance { public: ThreadTaskInstance(Worker &worker, std::unique_ptr<Task> task) - : TaskInstance(worker, std::move(task)) + : TaskInstance(worker, std::move(task)), job(worker, *this->task) { work.data = this; } - virtual ~ThreadTaskInstance(); - - virtual void start(DataVector &input_data); - - /** Method to decide if input is sufficiently big to use thread - * default implementation just returns true - */ - virtual bool run_in_thread(DataVector &input_data); - -protected: - - /** This method is called outside of main thread if run_in_thread has returned true - * IMPORTANT: It can read only member variable "inputs" and calls methods - * "set_error" or "set_redirect" - * All other things are not thread-safe! - * In case of error, call set_error and return nullptr - */ - virtual std::shared_ptr<Data> run() = 0; - void set_error(const std::string &error_message); - void set_redirect(std::unique_ptr<TaskDescription> tredirect); + virtual void start(DataVector &input_data) { + job.set_inputs(input_data); + if (job.check_run_in_thread()) { + UV_CHECK(uv_queue_work(worker.get_loop(), &work, _work_cb, _after_work_cb)); + } else { + _work_cb(&work); + _after_work_cb(&work, 0); + } + } - DataVector inputs; +protected: uv_work_t work; + T job; std::shared_ptr<Data> result; - std::string error_message; - std::unique_ptr<TaskDescription> task_redirect; - static void _work_cb(uv_work_t *req); - static void _after_work_cb(uv_work_t *req, int status); + static void _work_cb(uv_work_t *req) + { + ThreadTaskInstance *ttinstance = static_cast<ThreadTaskInstance*>(req->data); + ttinstance->result = ttinstance->job.run(); + } + + static void _after_work_cb(uv_work_t *req, int status) + { + UV_CHECK(status); + ThreadTaskInstance *ttinstance = static_cast<ThreadTaskInstance*>(req->data); + bool redirect = ttinstance->job.get_task_redirect(); + const std::string &err = ttinstance->job.get_error_message(); + if (err.empty()) { + if (ttinstance->result && !redirect) { + ttinstance->finish(ttinstance->result); + } else if (!ttinstance->result && redirect) { + ttinstance->redirect(std::move(ttinstance->job.get_task_redirect())); + } else { + ttinstance->fail("ThreadTaskInstace::run returned nullptr or incosistent returned values"); + } + } else { + ttinstance->fail(err); + } + } + }; } diff --git a/src/libloom/worker.cpp b/src/libloom/worker.cpp index 143822f10c088a989438086b51b151114ca646bd..551b5a2efed3bf4ab612f7ea41633e28a65bbb20 100644 --- a/src/libloom/worker.cpp +++ b/src/libloom/worker.cpp @@ -90,7 +90,7 @@ void Worker::register_basic_tasks() // RawData add_task_factory<ConstTask>("loom/data/const"); - add_task_factory<MergeTask>("loom/data/merge"); + add_task_factory<ThreadTaskInstance<MergeJob>>("loom/data/merge"); add_task_factory<OpenTask>("loom/data/open"); add_task_factory<SplitTask>("loom/data/split"); @@ -101,7 +101,7 @@ void Worker::register_basic_tasks() add_task_factory<RunTask>("loom/run/run"); // Python - add_task_factory<PyCallTask>("loom/py/call"); + add_task_factory<ThreadTaskInstance<PyCallJob>>("loom/py/call"); } diff --git a/tests/client/loomenv.py b/tests/client/loomenv.py index d0b36f26278d30148e57732e8844d6abc7bcca26..a69148cbc4e3d237c8da0ea171b5429e553565c8 100644 --- a/tests/client/loomenv.py +++ b/tests/client/loomenv.py @@ -82,7 +82,7 @@ class LoomEnv(Env): workers.append(w) time.sleep(0.1) if VALGRIND: - time.sleep(2) + time.sleep(4) assert not server.poll() assert not any(w.poll() for w in workers)