Commit 3becffcd authored by Stanislav Bohm's avatar Stanislav Bohm

RF: separating libloom and libloomw

parent ff1f1012
......@@ -3,12 +3,13 @@
BASE_DIR=`dirname $0`/..
LIBLOOM_DIR=${BASE_DIR}/src/libloom
LIBLOOMW_DIR=${BASE_DIR}/src/libloomw
PYTHON_DIR=${BASE_DIR}/python/loom/pb
# LIBLOOM
protoc loomcomm.proto --cpp_out=${LIBLOOM_DIR}
protoc loomplan.proto --cpp_out=${LIBLOOM_DIR}
protoc loomrun.proto --cpp_out=${LIBLOOM_DIR}/tasks
protoc loomrun.proto --cpp_out=${LIBLOOMW_DIR}/tasks
# CLIENT (Python)
protoc loomcomm.proto --python_out=${PYTHON_DIR}
......
......@@ -11,6 +11,13 @@ add_library(libloom
log.h
pbutils.h
pbutils.cpp
dictionary.cpp
dictionary.h
types.h
loomcomm.pb.h
loomcomm.pb.cc
loomplan.pb.h
loomplan.pb.cc
)
target_include_directories(libloom PUBLIC ${PROJECT_SOURCE_DIR}/src)
......
......@@ -5,8 +5,8 @@
In the current version it provides std::make_unique
*/
#ifndef LIBLOOMNET_COMPAT_H
#define LIBLOOMNET_COMPAT_H
#ifndef LIBLOOM_COMPAT_H
#define LIBLOOM_COMPAT_H
#include <cstddef>
#include <memory>
......
......@@ -8,6 +8,7 @@
#include <vector>
namespace loom {
namespace base {
/** Container for symbols */
class Dictionary {
......@@ -15,17 +16,18 @@ class Dictionary {
public:
Dictionary();
loom::Id find_symbol_or_fail(const std::string &symbol) const;
loom::Id find_symbol(const std::string &symbol) const;
loom::Id find_or_create(const std::string &symbol);
const std::string& translate(loom::Id id);
loom::base::Id find_symbol_or_fail(const std::string &symbol) const;
loom::base::Id find_symbol(const std::string &symbol) const;
loom::base::Id find_or_create(const std::string &symbol);
const std::string& translate(loom::base::Id id);
std::vector<std::string> get_all_symbols() const;
private:
std::unordered_map<std::string, loom::Id> symbol_to_id;
std::unordered_map<std::string, loom::base::Id> symbol_to_id;
};
}
}
#endif // LIBLOOM_DICTIONARY_H
#ifndef LIBLOOMNET_LISTENER_H
#define LIBLOOMNET_LISTENER_H
#ifndef LIBLOOM_LISTENER_H
#define LIBLOOM_LISTENER_H
#include "socket.h"
......@@ -31,4 +31,4 @@ protected:
}
}
#endif // LIBLOOMNET_LISTENER_H
#endif // LIBLOOM_LISTENER_H
#ifndef LIBLOOMNET_LOG_H
#define LIBLOOMNET_LOG_H
#ifndef LIBLOOM_LOG_H
#define LIBLOOM_LOG_H
#include <sys/types.h>
#include "spdlog/spdlog.h"
......
#include "pbutils.h"
#include "libloomw/loomcomm.pb.h"
#include "loomcomm.pb.h"
#include "compat.h"
......
#ifndef LOOM_SERVER_UTILS_H
#define LOOM_SERVER_UTILS_H
#ifndef LIBLOOM_UTILS_H
#define LIBLOOM_UTILS_H
#include "sendbuffer.h"
#include "socket.h"
......@@ -21,4 +21,4 @@ void send_message(loom::base::Socket &socket, ::google::protobuf::MessageLite &m
}}
#endif // LOOM_SERVER_UTILS_H
#endif // LIBLOOM_UTILS_H
#ifndef LIBLOOMNET_SENDBUFFER_H
#define LIBLOOMNET_SENDBUFFER_H
#ifndef LIBLOOM_SENDBUFFER_H
#define LIBLOOM_SENDBUFFER_H
#include <uv.h>
#include <memory>
......@@ -97,4 +97,4 @@ protected:
}}
#endif // LIBLOOMNET_SENDBUFFER_H
#endif // LIBLOOM_SENDBUFFER_H
#ifndef LIBLOOMNET_SOCKET_H
#define LIBLOOMNET_SOCKET_H
#ifndef LIBLOOM_SOCKET_H
#define LIBLOOM_SOCKET_H
#include "sendbuffer.h"
......@@ -90,4 +90,4 @@ private:
}}
#endif // LIBLOOMNET_CONNECTION_H
#endif // LIBLOOM_CONNECTION_H
......@@ -2,13 +2,12 @@
#define LIBLOOM_TYPES_H
namespace loom {
namespace base {
const int PROTOCOL_VERSION = 1;
typedef int Id;
typedef int TaskId;
typedef int DataTypeId;
}
}
#endif // LIBLOOM_TYPES_H
......@@ -30,8 +30,6 @@ add_library(libloomw
threadjob.h
ttinstance.h
taskfactory.h
dictionary.cpp
dictionary.h
data.cpp
data.h
unpacking.cpp
......@@ -41,11 +39,6 @@ add_library(libloomw
task.cpp
task.h
taskdesc.h
loomcomm.pb.h
loomcomm.pb.cc
loomplan.pb.h
loomplan.pb.cc
types.h
config.cpp
config.h
utils.h
......
#ifndef LIBLOOM_INIT_H
#define LIBLOOM_INIT_H
#ifndef LIBLOOMW_INIT_H
#define LIBLOOMW_INIT_H
#include <string>
#include <argp.h>
......@@ -48,6 +48,3 @@ private:
#endif
......@@ -37,7 +37,7 @@ bool Data::has_raw_data() const
return false;
}
Id Data::get_type_id(Worker &worker) const
base::Id Data::get_type_id(Worker &worker) const
{
return worker.get_dictionary().find_symbol(get_type_name());
}
......
#ifndef LOOM_DATA_H
#define LOOM_DATA_H
#ifndef LIBLOOMW_DATA_H
#define LIBLOOMW_DATA_H
#include "types.h"
#include "loomcomm.pb.h"
#include "libloom/loomcomm.pb.h"
#include "libloom/types.h"
#include "libloom/sendbuffer.h"
#include <uv.h>
......@@ -51,7 +51,7 @@ public:
virtual bool has_raw_data() const;
loom::Id get_type_id(Worker &worker) const;
loom::base::Id get_type_id(Worker &worker) const;
protected:
};
......@@ -72,4 +72,4 @@ typedef std::vector<std::shared_ptr<Data>> DataVector;
}
#endif // LOOM_DATA_H
#endif // LIBLOOMW_DATA_H
......@@ -77,8 +77,8 @@ std::shared_ptr<Data> Array::get_at_index(size_t index)
size_t Array::serialize(Worker &worker, loom::base::SendBuffer &buffer, std::shared_ptr<Data> &data_ptr)
{
auto types = std::make_unique<base::MemItemWithSz>(sizeof(loom::Id) * length);
loom::Id *ts = reinterpret_cast<loom::Id*>(types->get_ptr());
auto types = std::make_unique<base::MemItemWithSz>(sizeof(loom::base::Id) * length);
loom::base::Id *ts = reinterpret_cast<loom::base::Id*>(types->get_ptr());
for (size_t i = 0; i < length; i++) {
ts[i] = items[i]->get_type_id(worker);
}
......
#ifndef LIBLOOM_DATA_ARRAY_H
#define LIBLOOM_DATA_ARRAY_H
#ifndef LIBLOOMW_DATA_ARRAY_H
#define LIBLOOMW_DATA_ARRAY_H
#include "../data.h"
#include "../unpacking.h"
......@@ -47,7 +47,7 @@ public:
private:
std::unique_ptr<DataUnpacker> unpacker;
std::vector<Id> types;
std::vector<loom::base::Id> types;
std::unique_ptr<std::shared_ptr<Data>[]> items;
size_t index;
Worker &worker;
......@@ -55,4 +55,4 @@ private:
}
#endif // LIBLOOM_DATA_ARRAY_H
#endif // LIBLOOMW_DATA_ARRAY_H
#ifndef LIBLOOM_EXTERNFILE_H
#define LIBLOOM_EXTERNFILE_H
#ifndef LIBLOOMW_EXTERNFILE_H
#define LIBLOOMW_EXTERNFILE_H
#include "../data.h"
......@@ -46,4 +46,4 @@ protected:
}
#endif // LIBLOOM_EXTERNFILE_H
#endif // LIBLOOMW_EXTERNFILE_H
#ifndef LIBLOOM_INDEX_H
#define LIBLOOM_INDEX_H
#ifndef LIBLOOMW_INDEX_H
#define LIBLOOMW_INDEX_H
#include "../data.h"
#include "../unpacking.h"
......@@ -50,4 +50,4 @@ public:
};
}
#endif // LIBLOOM_INDEX_H
#endif // LIBLOOMW_INDEX_H
#ifndef LIBLOOM_RAWDATA_H
#define LIBLOOM_RAWDATA_H
#ifndef LIBLOOMW_RAWDATA_H
#define LIBLOOMW_RAWDATA_H
#include "../data.h"
#include "../unpacking.h"
......@@ -71,4 +71,4 @@ private:
}
#endif // LIBLOOM_RAWDATA_H
#endif // LIBLOOMW_RAWDATA_H
#include "interconnect.h"
#include "worker.h"
#include "loomcomm.pb.h"
#include "libloom/loomcomm.pb.h"
#include "libloom/log.h"
#include "libloom/pbutils.h"
#include "libloom/sendbuffer.h"
......
#ifndef LOOM_INTERCONNECT_H
#define LOOM_INTERCONNECT_H
#ifndef LIBLOOMW_INTERCONNECT_H
#define LIBLOOMW_INTERCONNECT_H
#include "libloom/socket.h"
#include "libloom/listener.h"
......@@ -22,7 +22,7 @@ public:
InterConnection(Worker &worker);
~InterConnection();
void send(Id id, std::shared_ptr<Data> &data);
void send(base::Id id, std::shared_ptr<Data> &data);
std::string get_peername() {
return socket.get_peername();
......@@ -57,7 +57,7 @@ protected:
std::string address;
std::unique_ptr<DataUnpacker> unpacker;
Id unpacking_data_id;
base::Id unpacking_data_id;
static std::string make_address(const std::string &host, int port);
......@@ -66,4 +66,4 @@ protected:
}
#endif // LOOM_INTERCONNECT_H
#endif // LIBLOOMW_INTERCONNECT_H
......@@ -5,7 +5,7 @@ using namespace loom;
bool loom::Task::is_ready(const Worker &worker) const
{
for (Id id : inputs) {
for (auto id : inputs) {
if (!worker.has_data(id)) {
return false;
}
......
#ifndef LIBLOOM_TASK_H
#define LIBLOOM_TASK_H
#ifndef LIBLOOMW_TASK_H
#define LIBLOOMW_TASK_H
#include "types.h"
#include "libloom/types.h"
#include <vector>
#include <string>
......@@ -13,17 +13,17 @@ class Worker;
class Task {
public:
Task(Id id, int task_type, const std::string &config)
Task(base::Id id, int task_type, const std::string &config)
: id(id), task_type(task_type), config(config) {}
Task(Id id, int task_type, std::string &&config)
Task(base::Id id, int task_type, std::string &&config)
: id(id), task_type(task_type), config(std::move(config)) {}
Id get_id() const {
base::Id get_id() const {
return id;
}
Id get_task_type() const {
base::Id get_task_type() const {
return task_type;
}
......@@ -33,21 +33,21 @@ public:
bool is_ready(const Worker &worker) const;
void add_input(Id id) {
void add_input(base::Id id) {
inputs.push_back(id);
}
const std::vector<Id>& get_inputs() const {
const std::vector<base::Id>& get_inputs() const {
return inputs;
}
protected:
Id id;
Id task_type;
std::vector<Id> inputs;
base::Id id;
base::Id task_type;
std::vector<base::Id> inputs;
std::string config;
};
}
#endif // LIBLOOM_TASK_H
#endif // LIBLOOMW_TASK_H
#ifndef LIBLOOM_TASKREDIRECT_H
#define LIBLOOM_TASKREDIRECT_H
#ifndef LIBLOOMW_TASKREDIRECT_H
#define LIBLOOMW_TASKREDIRECT_H
#include "data.h"
......@@ -16,4 +16,4 @@ struct TaskDescription
}
#endif // LIBLOOM_TASKREDIRECT_H
#endif // LIBLOOMW_TASKREDIRECT_H
#ifndef LOOM_TASK_FACTORY_H
#define LOOM_TASK_FACTORY_H
#ifndef LIBLOOMW_TASK_FACTORY_H
#define LIBLOOMW_TASK_FACTORY_H
#include "data.h"
#include "task.h"
......@@ -45,4 +45,4 @@ public:
}
#endif // LOOM_TASK_FACTORY_H
#endif // LIBLOOMW_TASK_FACTORY_H
#ifndef LOOM_TASKINSTANCE_H
#define LOOM_TASKINSTANCE_H
#ifndef LIBLOOMW_TASKINSTANCE_H
#define LIBLOOMW_TASKINSTANCE_H
#include "data.h"
#include "task.h"
......@@ -30,11 +30,11 @@ public:
virtual ~TaskInstance();
int get_id() const {
base::Id get_id() const {
return task->get_id();
}
const std::vector<Id>& get_inputs() {
const std::vector<base::Id>& get_inputs() {
return task->get_inputs();
}
......@@ -56,4 +56,4 @@ protected:
}
#endif // LOOM_TASKINSTANCE_H
#endif // LIBLOOMW_TASKINSTANCE_H
#ifndef LIBLOOM_THREADJOB_H
#define LIBLOOM_THREADJOB_H
#ifndef LIBLOOMW_THREADJOB_H
#define LIBLOOMW_THREADJOB_H
#include "data.h"
#include "task.h"
......@@ -55,4 +55,4 @@ protected:
}
#endif // LIBLOOM_THREADJOB_H
#endif // LIBLOOMW_THREADJOB_H
#ifndef LOOM_TTINSTANCE_H
#define LOOM_TTINSTANCE_H
#ifndef LIBLOOMW_TTINSTANCE_H
#define LIBLOOMW_TTINSTANCE_H
#include "data.h"
#include "taskinstance.h"
......@@ -69,4 +69,4 @@ protected:
}
#endif // LOOM_TASKINSTANCE_H
#endif // LIBLOOMW_TASKINSTANCE_H
#ifndef LIBLOOM_UNPACKING_H
#define LIBLOOM_UNPACKING_H
#ifndef LIBLOOMW_UNPACKING_H
#define LIBLOOMW_UNPACKING_H
#include "data.h"
......@@ -29,4 +29,4 @@ using UnpackFactoryFn = std::function<std::unique_ptr<DataUnpacker>()>;
}
#endif // LIBLOOM_UNPACKING_H
#endif // LIBLOOMW_UNPACKING_H
#ifndef LOOM_UTILS_H
#define LOOM_UTILS_H
#ifndef LIBLOOMW_UTILS_H
#define LIBLOOMW_UTILS_H
#include <sys/types.h>
#include <libloom/log.h>
......@@ -14,4 +14,4 @@ size_t file_size(const char *path);
}
#endif // LOOM_UTILS_H
#endif // LIBLOOMW_UTILS_H
#include "worker.h"
#include "loomcomm.pb.h"
#include "utils.h"
#include "types.h"
#include "config.h"
#include "data/rawdata.h"
......@@ -15,6 +13,8 @@
#include "tasks/runtask.h"
#include "tasks/python.h"
#include "libloom/loomcomm.pb.h"
#include "libloom/types.h"
#include "libloom/log.h"
#include "libloom/sendbuffer.h"
#include "libloom/pbutils.h"
......@@ -338,7 +338,7 @@ void Worker::add_unpacker(const std::string &symbol, const UnpackFactoryFn &unpa
unregistered_unpack_ffs[symbol] = unpacker;
}
std::unique_ptr<DataUnpacker> Worker::get_unpacker(DataTypeId id)
std::unique_ptr<DataUnpacker> Worker::get_unpacker(base::Id id)
{
auto i = unpack_ffs.find(id);
assert(i != unpack_ffs.end());
......@@ -348,14 +348,14 @@ std::unique_ptr<DataUnpacker> Worker::get_unpacker(DataTypeId id)
void Worker::on_dictionary_updated()
{
for (auto &f : unregistered_task_factories) {
loom::Id id = dictionary.find_symbol_or_fail(f->get_name());
loom::base::Id id = dictionary.find_symbol_or_fail(f->get_name());
logger->debug("Registering task_factory: {} = {}", f->get_name(), id);
task_factories[id] = std::move(f);
}
unregistered_task_factories.clear();
for (auto &pair : unregistered_unpack_ffs) {
loom::Id id = dictionary.find_symbol_or_fail(pair.first);
loom::base::Id id = dictionary.find_symbol_or_fail(pair.first);
logger->debug("Registering unpack_factory: {} = {}", pair.first, id);
unpack_ffs[id] = pair.second;
}
......@@ -411,7 +411,7 @@ void Worker::task_failed(TaskInstance &task, const std::string &error_msg)
void Worker::task_redirect(TaskInstance &task,
std::unique_ptr<TaskDescription> new_task_desc)
{
loom::Id id = task.get_id();
loom::base::Id id = task.get_id();
logger->debug("Redirecting task id={} task_type={} n_inputs={}",
id, new_task_desc->task_type, new_task_desc->inputs.size());
remove_task(task, false);
......
#ifndef LOOM_WORKER_H
#define LOOM_WORKER_H
#ifndef LIBLOOMW_WORKER_H
#define LIBLOOMW_WORKER_H
#include "interconnect.h"
#include "taskinstance.h"
#include "unpacking.h"
#include "taskfactory.h"
#include "dictionary.h"
#include "libloom/dictionary.h"
#include "libloom/listener.h"
#include <uv.h>
......@@ -36,8 +36,8 @@ public:
void register_basic_tasks();
void new_task(std::unique_ptr<Task> task);
void send_data(const std::string &address, Id id, std::shared_ptr<Data> &data);
bool send_data(const std::string &address, Id id) {
void send_data(const std::string &address, base::Id id, std::shared_ptr<Data> &data);
bool send_data(const std::string &address, base::Id id) {
auto& data = public_data[id];
if (data.get() == nullptr) {
return false;
......@@ -49,15 +49,15 @@ public:
void task_finished(TaskInstance &task_instance, Data &data);
void task_failed(TaskInstance &task_instance, const std::string &error_msg);
void task_redirect(TaskInstance &task, std::unique_ptr<TaskDescription> new_task_desc);
void publish_data(Id id, const std::shared_ptr<Data> &data);
void remove_data(Id id);
void publish_data(base::Id id, const std::shared_ptr<Data> &data);
void remove_data(base::Id id);
bool has_data(Id id) const
bool has_data(base::Id id) const
{
return public_data.find(id) != public_data.end();
}
std::shared_ptr<Data>& get_data(Id id)
std::shared_ptr<Data>& get_data(base::Id id)
{
auto it = public_data.find(id);
assert(it != public_data.end());
......@@ -99,7 +99,7 @@ public:
return work_dir;
}
std::string get_run_dir(Id id);
std::string get_run_dir(base::Id id);
void check_waiting_tasks();
void check_ready_tasks();
......@@ -108,9 +108,9 @@ public:
void add_unpacker(const std::string &symbol, const UnpackFactoryFn &unpacker);
std::unique_ptr<DataUnpacker> get_unpacker(DataTypeId id);
std::unique_ptr<DataUnpacker> get_unpacker(base::Id id);
Dictionary& get_dictionary() {
base::Dictionary& get_dictionary() {
return dictionary;
}
......@@ -131,18 +131,18 @@ private:
std::vector<std::unique_ptr<TaskInstance>> active_tasks;
std::vector<std::unique_ptr<Task>> ready_tasks;
std::vector<std::unique_ptr<Task>> waiting_tasks;
std::unordered_map<Id, std::unique_ptr<TaskFactory>> task_factories;
std::unordered_map<base::Id, std::unique_ptr<TaskFactory>> task_factories;
std::unordered_map<int, std::shared_ptr<Data>> public_data;
std::string work_dir;
std::unordered_map<DataTypeId, UnpackFactoryFn> unpack_ffs;
std::unordered_map<base::Id, UnpackFactoryFn> unpack_ffs;
base::Socket server_conn;
std::unordered_map<std::string, std::unique_ptr<InterConnection>> connections;
std::vector<std::unique_ptr<InterConnection>> nonregistered_connections;
Dictionary dictionary;
base::Dictionary dictionary;
std::string server_address;
int server_port;
......@@ -157,4 +157,4 @@ private:
}
#endif // LOOM_WORKER_H
#endif // LIBLOOMW_WORKER_H
......@@ -29,7 +29,7 @@ add_executable(loom-server
$<TARGET_OBJECTS:loom-server-lib>
main.cpp)
target_link_libraries(loom-server libloom libloomw ${LIBUV_LIBRARY} pthread)
target_link_libraries(loom-server libloom ${LIBUV_LIBRARY} pthread)
target_link_libraries(loom-server ${PROTOBUF_LIBRARIES})
install (TARGETS loom-server DESTINATION bin)
......@@ -3,8 +3,8 @@
#include "libloomw/loomplan.pb.h"
#include "libloomw/loomcomm.pb.h"
#include "libloom/loomplan.pb.h"
#include "libloom/loomcomm.pb.h"
#include "libloom/log.h"
#include "libloom/compat.h"
#include "libloom/pbutils.h"
......@@ -48,7 +48,7 @@ void ClientConnection::on_message(const char *buffer, size_t size)
auto& task_manager = server.get_task_manager();
const loomplan::Plan &plan = submit.plan();
loom::Id id_base = server.new_id(plan.tasks_size());
loom::base::Id id_base = server.new_id(plan.tasks_size());
task_manager.add_plan(Plan(plan, id_base, server.get_dictionary()), submit.report());
logger->info("Plan submitted tasks={} report={}", plan.tasks_size(), submit.report());
}
......@@ -11,7 +11,7 @@ using namespace loom::base;
ComputationState::ComputationState(Server &server) : server(server)
{
loom::Dictionary &dictionary = server.get_dictionary();
Dictionary &dictionary = server.get_dictionary();
slice_task_id = dictionary.find_or_create("loom/base/slice");
get_task_id = dictionary.find_or_create("loom/base/get");
dslice_task_id = dictionary.find_or_create("loom/scheduler/dslice");
......@@ -58,7 +58,7 @@ void ComputationState::set_task_finished(const PlanNode&node, size_t size, size_
workers[wc].free_cpus += node.get_n_cpus();
}
void ComputationState::remove_state(loom::Id id)
void ComputationState::remove_state(loom::base::Id id)
{
auto it = states.find(id);
assert(it != states.end());
......@@ -67,7 +67,7 @@ void ComputationState::remove_state(loom::Id id)
void ComputationState::add_ready_nexts(const PlanNode &node)
{
for (loom::Id id : node.get_nexts()) {
for (loom::base::Id id : node.get_nexts()) {
const PlanNode &node = get_node(id);
if (is_ready(node)) {
if (node.get_policy() == PlanNode::POLICY_SCHEDULER) {
......@@ -84,7 +84,7 @@ bool ComputationState::is_finished() const
return states.empty();
}