Commit c2012daa authored by Stanislav Bohm's avatar Stanislav Bohm

ENH: ThreadTaskInstance added

parent 803e97b6
......@@ -24,6 +24,8 @@ add_library(libloom
worker.h
taskinstance.cpp
taskinstance.h
ttinstance.cpp
ttinstance.h
taskfactory.h
dictionary.cpp
dictionary.h
......
......@@ -8,6 +8,11 @@
using namespace loom;
TaskInstance::~TaskInstance()
{
}
const std::string TaskInstance::get_task_dir()
{
std::stringstream s;
......
......@@ -29,7 +29,7 @@ public:
}
virtual ~TaskInstance() {}
virtual ~TaskInstance();
int get_id() const {
return task->get_id();
......
......@@ -20,8 +20,24 @@ void ConstTask::start(DataVector &inputs)
finish(output);
}
/** If there are more then 50 input or size is bigger then 20000,
* then merge task is run in thread */
bool MergeTask::run_in_thread(DataVector &input_data)
{
if (input_data.size() > 50) {
return true;
}
size_t size = 0;
for (auto& data : inputs) {
size += (*data)->get_size();
if (size > 20000) {
return true;
}
}
return false;
}
void MergeTask::start(DataVector &inputs) {
std::shared_ptr<Data> MergeTask::run() {
size_t size = 0;
for (auto& data : inputs) {
size += (*data)->get_size();
......@@ -38,7 +54,7 @@ void MergeTask::start(DataVector &inputs) {
memcpy(dst, mem, size);
dst += size;
}
finish(output);
return output;
}
void OpenTask::start(DataVector &inputs)
......
#ifndef LIBLOOM_TASKS_RAWDATATASKS_H
#define LIBLOOM_TASKS_RAWDATATASKS_H
#include "libloom/taskinstance.h"
#include "libloom/ttinstance.h"
class ConstTask : public loom::TaskInstance
{
......@@ -11,11 +11,14 @@ public:
};
class MergeTask : public loom::TaskInstance
class MergeTask : public loom::ThreadTaskInstance
{
public:
using TaskInstance::TaskInstance;
void start(loom::DataVector &inputs);
using ThreadTaskInstance::ThreadTaskInstance;
bool run_in_thread(loom::DataVector &input_data);
protected:
std::shared_ptr<loom::Data> run();
};
......
#include "ttinstance.h"
#include "worker.h"
using namespace loom;
void ThreadTaskInstance::start(DataVector &input_data)
{
this->inputs = input_data;
if (run_in_thread(input_data)) {
UV_CHECK(uv_queue_work(worker.get_loop(), &work, _work_cb, _after_work_cb));
} else {
_work_cb(&work);
_after_work_cb(&work, 0);
}
}
bool ThreadTaskInstance::run_in_thread(DataVector &input_data)
{
return true;
}
void ThreadTaskInstance::set_error(std::string &error_message)
{
this->error_message = error_message;
}
void ThreadTaskInstance::_work_cb(uv_work_t *req)
{
ThreadTaskInstance *ttinstance = static_cast<ThreadTaskInstance*>(req->data);
ttinstance->result = ttinstance->run();
}
void ThreadTaskInstance::_after_work_cb(uv_work_t *req, int status)
{
UV_CHECK(status);
ThreadTaskInstance *ttinstance = static_cast<ThreadTaskInstance*>(req->data);
if (ttinstance->error_message.empty()) {
if (ttinstance->result) {
ttinstance->finish(ttinstance->result);
} else {
ttinstance->fail("ThreadTaskInstace::run has returned nullptr");
}
} else {
ttinstance->fail(ttinstance->error_message);
}
}
ThreadTaskInstance::~ThreadTaskInstance()
{
}
#ifndef LOOM_TTINSTANCE_H
#define LOOM_TTINSTANCE_H
#include "data.h"
#include "taskinstance.h"
#include<uv.h>
#include<string>
#include<memory>
#include<vector>
namespace loom {
/** Base class for task instance with thread support */
class ThreadTaskInstance : public TaskInstance
{
public:
ThreadTaskInstance(Worker &worker, std::unique_ptr<Task> task)
: TaskInstance(worker, std::move(task))
{
work.data = this;
}
virtual ~ThreadTaskInstance();
virtual void start(DataVector &input_data);
/** Method to decide if input is sufficiently big to use thread
* default implementation just returns true
*/
virtual bool run_in_thread(DataVector &input_data);
protected:
/** This method is called outside of main thread if run_in_thread has returned true
* IMPORTANT: It can read only member variable "inputs" and calls method "set_error"
* All other things are not thread-safe!
* In case of error, call set_error and return nullptr
*/
virtual std::shared_ptr<Data> run() = 0;
void set_error(std::string &error_message);
DataVector inputs;
uv_work_t work;
std::shared_ptr<Data> result;
std::string error_message;
static void _work_cb(uv_work_t *req);
static void _after_work_cb(uv_work_t *req, int status);
};
}
#endif // LOOM_TASKINSTANCE_H
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment