Commit b4a41743 authored by Stanislav Bohm's avatar Stanislav Bohm

ENH: Embedded python in worker

parent cf70ccf3
......@@ -4,6 +4,7 @@ from .task import POLICY_SCHEDULER, POLICY_SIMPLE
import struct
from ..pb import loomrun_pb2 as loomrun
import cloudpickle
def cpus(value):
......@@ -31,6 +32,9 @@ RUN = "loom/run/run"
SCHEDULER_DSLICE = "loom/scheduler/dslice"
SCHEDULER_DGET = "loom/scheduler/dget"
PY_CALL = "loom/py/call"
u64 = struct.Struct("<Q")
u64u64 = struct.Struct("<QQ")
......@@ -154,3 +158,11 @@ def slice(input, start, end):
task.config = u64u64.pack(start, end)
task.policy = POLICY_SIMPLE
return task
def py_call(obj, inputs=(), request=cpu1):
task = Task()
task.task_type = PY_CALL
task.inputs = inputs
task.config = cloudpickle.dumps(obj)
return task
import cloudpickle
def unpack_and_execute(data, inputs):
obj = cloudpickle.loads(data)
return obj(*inputs)
......@@ -16,6 +16,10 @@ add_library(libloom
tasks/arraytasks.h
tasks/runtask.cpp
tasks/runtask.h
tasks/python_wrapper.cpp
tasks/python_wrapper.h
tasks/python.cpp
tasks/python.h
tasks/loomrun.pb.cc
tasks/loomrun.pb.h
connection.cpp
......
......@@ -42,7 +42,7 @@ void Data::init_message(Worker &worker, loomcomm::Data &msg) const
}
char *Data::get_raw_data(Worker &worker)
const char * Data::get_raw_data() const
{
return nullptr;
}
......
......@@ -42,7 +42,7 @@ public:
void serialize(Worker &worker, SendBuffer &buffer, std::shared_ptr<Data> &data_ptr);
/** Get pointer to raw data, returns nullptr when it is not possible */
virtual char *get_raw_data(Worker &worker);
virtual const char *get_raw_data() const;
/** Returns a filename if data obeject is mapped from file, empty string otherwise */
virtual std::string get_filename() const;
......
......@@ -19,7 +19,7 @@ std::string ExternFile::get_type_name() const
ExternFile::ExternFile(const std::string &filename)
: data(nullptr), filename(filename)
{
size = file_size(filename.c_str());
open();
}
ExternFile::~ExternFile()
......@@ -51,6 +51,14 @@ void ExternFile::open()
if (fd < 0) {
log_errno_abort("open");
}
size = lseek(fd, (size_t)0, SEEK_END);
if (size == (size_t) -1) {
log_errno_abort("lseek");
}
lseek(fd, 0, SEEK_SET);
map(fd, false);
::close(fd);
}
......
......@@ -17,11 +17,8 @@ public:
return size;
}
char *get_raw_data(Worker &worker)
const char *get_raw_data() const
{
if (data == nullptr) {
open();
}
return data;
}
......
......@@ -43,13 +43,12 @@ std::shared_ptr<Data> Index::get_at_index(size_t index)
{
assert (index < length);
size_t addr = indices[index];
char *p1 = this->data->get_raw_data(worker);
const char *p1 = this->data->get_raw_data();
p1 += addr;
size_t size = indices[index + 1] - addr;
auto data = std::make_shared<RawData>();
data->init_empty(worker, size);
char *p2 = data->get_raw_data(worker);
char *p2 = data->init_empty(worker, size);
memcpy(p2, p1, size);
return data;
}
......@@ -71,14 +70,13 @@ std::shared_ptr<Data> Index::get_slice(size_t from, size_t to)
size_t from_addr = indices[from];
size_t to_addr = indices[to];
char *p1 = this->data->get_raw_data(worker);
const char *p1 = this->data->get_raw_data();
p1 += from_addr;
size_t size = to_addr - from_addr;
auto data = std::make_shared<RawData>();
data->init_empty(worker, size);
char *p2 = data->get_raw_data(worker);
char *p2 = data->init_empty(worker, size);
memcpy(p2, p1, size);
return data;
}
......
......@@ -94,7 +94,7 @@ std::string RawData::get_filename() const
return filename;
}
void RawData::open(Worker &worker)
void RawData::open() const
{
if (size == 0) {
return;
......@@ -107,10 +107,11 @@ void RawData::open(Worker &worker)
}
map(fd, false);
::close(fd);
assert(data);
}
void RawData::map(int fd, bool write)
void RawData::map(int fd, bool write) const
{
assert(data == nullptr);
assert(!filename.empty());
......@@ -143,9 +144,15 @@ void RawData::init_from_string(Worker &worker, const std::string &str)
memcpy(mem, str.c_str(), size);
}
void RawData::init_from_mem(Worker &worker, const void *ptr, size_t size)
{
char *mem = init_empty(worker, size);
memcpy(mem, ptr, size);
}
void RawData::serialize_data(Worker &worker, SendBuffer &buffer, std::shared_ptr<Data> &data_ptr)
{
buffer.add(data_ptr, get_raw_data(worker), size);
buffer.add(data_ptr, get_raw_data(), size);
}
RawDataUnpacker::~RawDataUnpacker()
......
......@@ -17,10 +17,10 @@ public:
return size;
}
char *get_raw_data(Worker &worker)
const char *get_raw_data() const
{
if (data == nullptr) {
open(worker);
open();
}
return data;
}
......@@ -29,6 +29,7 @@ public:
char* init_empty(Worker &worker, size_t size);
void init_from_string(Worker &worker, const std::string &str);
void init_from_mem(Worker &worker, const void *ptr, size_t size);
void assign_filename(Worker &worker);
void init_from_file(Worker &worker);
......@@ -37,15 +38,14 @@ public:
protected:
void serialize_data(Worker &worker, SendBuffer &buffer, std::shared_ptr<Data> &data_ptr);
void open(Worker &worker);
void map(int fd, bool write);
void open() const;
void map(int fd, bool write) const;
char *data;
mutable char *data;
size_t size;
std::string filename;
static size_t file_id_counter;
};
......
......@@ -45,8 +45,8 @@ public:
}
void add(std::shared_ptr<Data> &data, char *data_ptr, size_t size) {
bufs.emplace_back(uv_buf_t {data_ptr, size});
void add(std::shared_ptr<Data> &data, const char *data_ptr, size_t size) {
bufs.emplace_back(uv_buf_t {const_cast<char*>(data_ptr), size});
data_vector.push_back(data);
}
......
#include "python.h"
#include "../data/rawdata.h"
#include "../log.h"
#include "python_wrapper.h"
#include <Python.h>
using namespace loom;
/** Ensures that python is initialized,
* if already initialized, then does nothing */
static void ensure_py_init() {
static bool python_inited = false;
if (python_inited) {
return;
}
python_inited = true;
Py_Initialize();
PyEval_InitThreads();
data_wrapper_init();
PyEval_ReleaseLock();
}
void loom::PyCallTask::start(loom::DataVector &inputs)
{
ensure_py_init();
ThreadTaskInstance::start(inputs);
}
static PyObject* vector_of_data_to_list(const DataVector &data)
{
PyObject *list = PyTuple_New(data.size());
assert(list);
size_t i = 0;
for (auto& item : data) {
PyTuple_SET_ITEM(list, i, (PyObject*) data_wrapper_create(item));
i++;
}
return list;
}
std::shared_ptr<Data> PyCallTask::run()
{
// Obtain GIL
PyGILState_STATE gstate;
gstate = PyGILState_Ensure();
// Get loom.wep
PyObject *loom_wep_call = PyImport_ImportModule("loom.wep.call");
if(!loom_wep_call) {
set_python_error();
PyGILState_Release(gstate);
return nullptr;
}
PyObject *call_fn = PyObject_GetAttrString(loom_wep_call, "unpack_and_execute");
Py_DECREF(loom_wep_call);
if(!call_fn) {
set_python_error();
PyGILState_Release(gstate);
return nullptr;
}
assert(PyCallable_Check(call_fn));
PyObject *config_data = PyBytes_FromStringAndSize(
task->get_config().c_str(),
task->get_config().size());
assert(config_data);
PyObject *py_inputs = vector_of_data_to_list(inputs);
assert(py_inputs);
// call "call"
PyObject *result = PyObject_CallFunctionObjArgs(call_fn, config_data, py_inputs, NULL);
Py_DECREF(call_fn);
Py_DECREF(config_data);
if(!result) {
set_python_error();
PyGILState_Release(gstate);
return nullptr;
}
if (!PyUnicode_Check(result)) {
Py_DECREF(result);
set_error("Invalid result from python code");
PyGILState_Release(gstate);
return nullptr;
}
Py_ssize_t size;
char *str = PyUnicode_AsUTF8AndSize(result, &size);
assert(str);
Py_DECREF(result);
auto output = std::make_shared<RawData>();
output->init_from_mem(worker, str, size);
// Release GIL
PyGILState_Release(gstate);
return output;
}
void PyCallTask::set_python_error()
{
loom::llog->error("Python error in task id={}", task->get_id());
PyObject *excType, *excValue, *excTraceback;
PyErr_Fetch(&excType, &excValue, &excTraceback);
assert(excType);
PyErr_NormalizeException(&excType, &excValue, &excTraceback);
if (!excTraceback) {
excTraceback = Py_None;
Py_INCREF(excTraceback);
}
PyException_SetTraceback(excValue, excTraceback);
PyObject *str_obj = PyObject_Str(excValue);
assert(str_obj);
PyObject_Print(excValue, stdout, 0);
Py_ssize_t size;
char *str = PyUnicode_AsUTF8AndSize(str_obj, &size);
assert(str);
std::string error_msg(str, size);
loom::llog->error("Python exception: {}", error_msg);
set_error(std::string("Python exception:\n") + error_msg);
Py_DECREF(str_obj);
PyErr_Clear();
}
#ifndef LIBLOOM_TASKS_PYTHON_H
#define LIBLOOM_TASKS_PYTHON_H
#include "libloom/ttinstance.h"
namespace loom {
class PyCallTask : public loom::ThreadTaskInstance
{
public:
using ThreadTaskInstance::ThreadTaskInstance;
void start(loom::DataVector &inputs);
std::shared_ptr<loom::Data> run();
private:
void set_python_error();
};
}
#endif // LIBLOOM_TASKS_PYTHON_H
#include "python_wrapper.h"
static void
data_wrapper_dealloc(DataWrapper* self)
{
self->data.~__shared_ptr();
Py_TYPE(self)->tp_free((PyObject*)self);
}
static PyObject *
data_wrapper_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
{
DataWrapper *self;
self = (DataWrapper *)type->tp_alloc(type, 0);
if (self != NULL) {
new (&self->data) std::shared_ptr<loom::Data>();
}
return (PyObject *)self;
}
static PyObject *
data_wrapper_size(DataWrapper* self)
{
assert(self->data);
return PyLong_FromUnsignedLong(self->data->get_size());
}
static PyObject *
data_wrapper_read(DataWrapper* self)
{
assert(self->data);
size_t size = self->data->get_size();
const char *ptr = self->data->get_raw_data();
return PyBytes_FromStringAndSize(ptr, size);
}
static PyMethodDef data_wrapper_methods[] = {
{"size", (PyCFunction)data_wrapper_size, METH_NOARGS,
"Return the size of the data object"
},
{"read", (PyCFunction)data_wrapper_read, METH_NOARGS,
"Return byte representation of data object"
},
{NULL} /* Sentinel */
};
static PyTypeObject data_wrapper_type = {
PyVarObject_HEAD_INIT(NULL, 0)
"Data", /* tp_name */
sizeof(DataWrapper), /* tp_basicsize */
0, /* tp_itemsize */
(destructor) data_wrapper_dealloc, /* tp_dealloc */
0, /* tp_print */
0, /* tp_getattr */
0, /* tp_setattr */
0, /* tp_reserved */
0, /* tp_repr */
0, /* tp_as_number */
0, /* tp_as_sequence */
0, /* tp_as_mapping */
0, /* tp_hash */
0, /* tp_call */
0, /* tp_str */
0, /* tp_getattro */
0, /* tp_setattro */
0, /* tp_as_buffer */
Py_TPFLAGS_DEFAULT, /* tp_flags */
"DataObject", /* tp_doc */
0, /* tp_traverse */
0, /* tp_clear */
0, /* tp_richcompare */
0, /* tp_weaklistoffset */
0, /* tp_iter */
0, /* tp_iternext */
data_wrapper_methods, /* tp_methods */
0,
0, /* tp_getset */
0, /* tp_base */
0, /* tp_dict */
0, /* tp_descr_get */
0, /* tp_descr_set */
0, /* tp_dictoffset */
0, /* tp_init */
0, /* tp_alloc */
data_wrapper_new, /* tp_new */
};
void data_wrapper_init()
{
assert(!PyType_Ready(&data_wrapper_type));
}
DataWrapper *data_wrapper_create(const std::shared_ptr<loom::Data> &data)
{
DataWrapper *self = (DataWrapper*) PyObject_CallFunctionObjArgs((PyObject*) &data_wrapper_type, NULL);
assert(self);
self->data = data;
return self;
}
#ifndef LIBLOOM_TASKS_PYTHON_WRAPPER_H
#define LIBLOOM_TASKS_PYTHON_WRAPPER_H
#include "../data/rawdata.h"
#include <Python.h>
typedef struct {
PyObject_HEAD
std::shared_ptr<loom::Data> data;
} DataWrapper;
void data_wrapper_init();
DataWrapper *data_wrapper_create(const std::shared_ptr<loom::Data> &data);
#endif // LIBLOOM_TASKS_PYTHON_WRAPPER_H
......@@ -50,12 +50,11 @@ std::shared_ptr<Data> MergeTask::run() {
std::shared_ptr<Data> output = std::make_shared<RawData>();
RawData &data = static_cast<RawData&>(*output);
data.init_empty(worker, size);
char *dst = output->get_raw_data(worker);
char *dst = data.init_empty(worker, size);
if (config.empty()) {
for (auto& data : inputs) {
char *mem = data->get_raw_data(worker);
const char *mem = data->get_raw_data();
size_t size = data->get_size();
assert(mem || size == 0);
memcpy(dst, mem, size);
......@@ -72,7 +71,7 @@ std::shared_ptr<Data> MergeTask::run() {
dst += config.size();
}
char *mem = data->get_raw_data(worker);
const char *mem = data->get_raw_data();
size_t size = data->get_size();
assert(mem || size == 0);
memcpy(dst, mem, size);
......@@ -95,7 +94,7 @@ void SplitTask::start(DataVector &inputs)
std::vector<size_t> indices;
auto& input = inputs[0];
char *ptr = input->get_raw_data(worker);
const char *ptr = input->get_raw_data();
size_t size = input->get_size();
indices.push_back(0);
......
......@@ -58,7 +58,7 @@ void RunTask::start(DataVector &inputs)
const std::string &name = msg.map_inputs(i);
if (!name.empty() && name[0] == '$') {
const char *raw_data = inputs[i]->get_raw_data(worker);
const char *raw_data = inputs[i]->get_raw_data();
assert(raw_data);
variables[name] = std::string(raw_data, inputs[i]->get_size());
continue;
......
......@@ -20,7 +20,7 @@ bool ThreadTaskInstance::run_in_thread(DataVector &input_data)
return true;
}
void ThreadTaskInstance::set_error(std::string &error_message)
void ThreadTaskInstance::set_error(const std::string &error_message)
{
this->error_message = error_message;
}
......
......@@ -40,7 +40,7 @@ protected:
* In case of error, call set_error and return nullptr
*/
virtual std::shared_ptr<Data> run() = 0;
void set_error(std::string &error_message);
void set_error(const std::string &error_message);
DataVector inputs;
uv_work_t work;
......
......@@ -12,6 +12,7 @@
#include "tasks/rawdatatasks.h"
#include "tasks/arraytasks.h"
#include "tasks/runtask.h"
#include "tasks/python.h"
#include <stdlib.h>
#include <sstream>
......@@ -98,6 +99,9 @@ void Worker::register_basic_tasks()
// Run
add_task_factory<RunTask>("loom/run/run");
// Python
add_task_factory<PyCallTask>("loom/py/call");
}
......
......@@ -15,7 +15,7 @@ class Scheduler
std::vector<size_t> inputs;
std::vector<size_t> next_inputs;
std::vector<loom::Id> nexts;
std::vector<loom::Id> ids;
std::vector<loom::Id> ids;
};
struct Worker {
......
......@@ -4,4 +4,5 @@ add_executable(loom-worker
target_include_directories(loom-worker PUBLIC ${PROJECT_SOURCE_DIR})
target_link_libraries(loom-worker libloom ${LIBUV_LIBRARY} pthread)
target_link_libraries(loom-worker ${PROTOBUF_LIBRARIES})
target_link_libraries(loom-worker ${PYTHON_LIBRARIES})
install (TARGETS loom-worker DESTINATION bin)
......@@ -31,11 +31,11 @@ class Env():
def __init__(self):
self.processes = []
def start_process(self, name, args):
def start_process(self, name, args, env=None):
fname = os.path.join(LOOM_TEST_BUILD_DIR, name)
with open(fname + ".out", "w") as out:
with open(fname + ".err", "w") as err:
p = subprocess.Popen(args, stdout=out, stderr=err)
p = subprocess.Popen(args, stdout=out, stderr=err, env=env)
self.processes.append((name, p))
return p
......@@ -72,8 +72,13 @@ class LoomEnv(Env):
if VALGRIND:
time.sleep(2)
worker_args = valgrind_args + worker_args
env = os.environ.copy()
if "PYTHONPATH" in env:
env["PYTHONPATH"] = LOOM_PYTHON + ":" + env["PYTHONPATH"]
else:
env["PYTHONPATH"] = LOOM_PYTHON
for i in range(workers_count):
w = self.start_process("worker{}".format(i), worker_args)
w = self.start_process("worker{}".format(i), worker_args, env)
workers.append(w)
time.sleep(0.1)
if VALGRIND:
......
from loomenv import loom_env, LOOM_TESTPROG, LOOM_TEST_DATA_DIR # noqa
import loom.client.tasks as tasks # noqa
from loom.client import TaskFailed
import pytest
loom_env # silence flake8
def test_py_call(loom_env):
def f(a, b):
return "{}, {}, {}, {}".format(
str(a.read(), encoding="ascii"), a.size(),
str(b.read(), encoding="ascii"), b.size())
def g():
return "Test"
loom_env.start(1)
c = tasks.const("ABC")
d = tasks.const("12345")
p = tasks.py_call(f, (c, d))
q = tasks.py_call(g)
result1, result2 = loom_env.submit((p, q))
assert result1 == b"ABC, 3, 12345, 5"
assert result2 == b"Test"
def test_py_fail_too_many_args(loom_env):
def g():
return "Test"
loom_env.start(1)
c = tasks.const("ABC")
a = tasks.py_call(g, (c,))
with pytest.raises(TaskFailed):
loom_env.submit(a)
def test_py_fail_too_few_args(loom_env):
def f(a):
return "ABC"
loom_env.start(1)
a = tasks.py_call(f, ())
with pytest.raises(TaskFailed):
loom_env.submit(a)
def test_py_fail_invalid_result(loom_env):
def f():
return 42.0
loom_env.start(1)
a = tasks.py_call(f, ())
with pytest.raises(TaskFailed):
loom_env.submit(a)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment