...
 
Commits (2)
import cloudpickle
import loom_c
import threading
class Context:
......@@ -41,3 +41,11 @@ def execute(fn_obj, data, inputs, task_id):
return fn_obj(context, *inputs)
else:
return fn_obj(*inputs)
unpickle_lock = threading.Lock()
def unpickle(data):
with unpickle_lock:
return cloudpickle.loads(data)
\ No newline at end of file
......@@ -38,12 +38,12 @@ void loom::ensure_py_init() {
PyObject* loom::deserialize_pyobject(const void *mem, size_t size)
{
// Get cloudpickle
PyObject *cloudpickle = PyImport_ImportModule("cloudpickle");
PyObject *cloudpickle = PyImport_ImportModule("loom.wside.core");
if(!cloudpickle) {
return nullptr;
}
PyObject *loads = PyObject_GetAttrString(cloudpickle, "loads");
PyObject *loads = PyObject_GetAttrString(cloudpickle, "unpickle");
Py_DECREF(cloudpickle);
if(!loads) {
......
......@@ -44,8 +44,8 @@ void ComputationState::plan_node(TaskNode &node, bool load_checkpoints, std::vec
plan_node(*input_node, load_checkpoints, to_load);
if (!input_node->is_computed()) {
remaining_inputs += 1;
input_node->add_next(&node);
}
input_node->add_next(&node);
}
node.set_remaining_inputs(remaining_inputs);
if (remaining_inputs == 0) {
......