Commit ff1f1012 authored by Stanislav Bohm's avatar Stanislav Bohm

RF: Logging moved from libloomw to libloom

parent 82a58ff1
......@@ -10,23 +10,23 @@ using namespace loom::base;
namespace loom {
namespace base {
std::shared_ptr<spdlog::logger> log = spdlog::stdout_logger_mt("net", true);
std::shared_ptr<spdlog::logger> logger = spdlog::stdout_logger_mt("net", true);
}}
void loom::base::report_uv_error(int error_code, int line_number, const char *filename)
{
log->critical("libuv fail: {} ({}:{})", uv_strerror(error_code), filename, line_number);
logger->critical("libuv fail: {} ({}:{})", uv_strerror(error_code), filename, line_number);
exit(1);
}
void loom::base::log_errno_abort(const char *tmp)
{
log->critical("{}: {}", tmp, strerror(errno));
logger->critical("{}: {}", tmp, strerror(errno));
abort();
}
void loom::base::log_errno_abort(const char *tmp, const char *tmp2)
{
log->critical("{}: {} ({})", tmp, strerror(errno), tmp2);
logger->critical("{}: {} ({})", tmp, strerror(errno), tmp2);
abort();
}
......@@ -21,7 +21,7 @@ void report_uv_error(int error_code, int line_number, const char *filename) __at
void log_errno_abort(const char *tmp) __attribute__ ((noreturn));
void log_errno_abort(const char *tmp, const char *tmp2) __attribute__ ((noreturn));
extern std::shared_ptr<spdlog::logger> log;
extern std::shared_ptr<spdlog::logger> logger;
}
}
......
......@@ -45,8 +45,6 @@ add_library(libloomw
loomcomm.pb.cc
loomplan.pb.h
loomplan.pb.cc
log.h
log.cpp
types.h
config.cpp
config.h
......
#include "data.h"
#include "worker.h"
#include "log.h"
using namespace loom;
......
#include "array.h"
#include "libloom/compat.h"
#include "libloom/log.h"
#include "../worker.h"
#include "../log.h"
using namespace loom;
using namespace loom::base;
Array::Array(size_t length, std::unique_ptr<std::shared_ptr<Data>[]> items)
: length(length), items(std::move(items))
......@@ -14,7 +15,7 @@ Array::Array(size_t length, std::unique_ptr<std::shared_ptr<Data>[]> items)
Array::~Array()
{
llog->debug("Disposing array");
logger->debug("Disposing array");
}
size_t Array::get_size()
......
#include "externfile.h"
#include "../utils.h"
#include "../log.h"
#include "libloom/log.h"
#include <assert.h>
#include <sys/types.h>
......@@ -10,6 +10,7 @@
using namespace loom;
using namespace loom::base;
std::string ExternFile::get_type_name() const
{
......@@ -46,7 +47,7 @@ std::string ExternFile::get_filename() const
void ExternFile::open()
{
llog->debug("Opening extern file {}", filename);
logger->debug("Opening extern file {}", filename);
int fd = ::open(filename.c_str(), O_RDONLY, S_IRUSR | S_IWUSR);
if (fd < 0) {
base::log_errno_abort("open");
......@@ -74,7 +75,7 @@ void ExternFile::map(int fd, bool write)
}
data = (char*) mmap(0, size, flags, MAP_SHARED, fd, 0);
if (data == MAP_FAILED) {
llog->critical("Cannot mmap '{}' size={}", filename, size);
logger->critical("Cannot mmap '{}' size={}", filename, size);
base::log_errno_abort("mmap");
}
}
......@@ -2,10 +2,11 @@
#include "index.h"
#include "rawdata.h"
#include "../log.h"
#include "libloom/log.h"
#include "../worker.h"
using namespace loom;
using namespace loom::base;
Index::Index(Worker &worker, std::shared_ptr<Data> &data, size_t length, std::unique_ptr<size_t[]> indices)
: worker(worker), data(data), length(length), indices(std::move(indices))
......@@ -15,7 +16,7 @@ Index::Index(Worker &worker, std::shared_ptr<Data> &data, size_t length, std::un
Index::~Index()
{
llog->debug("Disposing index");
logger->debug("Disposing index");
}
std::string Index::get_type_name() const
......@@ -82,7 +83,7 @@ std::shared_ptr<Data> Index::get_slice(size_t from, size_t to)
size_t Index::serialize(Worker &worker, loom::base::SendBuffer &buffer, std::shared_ptr<Data> &data_ptr)
{
llog->critical("Index::serialize_data");
logger->critical("Index::serialize_data");
abort();
}
......
#include "rawdata.h"
#include "../log.h"
#include "libloom/log.h"
#include "libloom/sendbuffer.h"
#include "../utils.h"
#include "../worker.h"
#include "libloom/sendbuffer.h"
#include <sstream>
#include <assert.h>
......@@ -14,6 +15,7 @@
#include <unistd.h>
using namespace loom;
using namespace loom::base;
size_t RawData::file_id_counter = 1;
......@@ -24,7 +26,7 @@ RawData::RawData()
RawData::~RawData()
{
llog->debug("Disposing raw data filename={} size={}", filename, size);
logger->debug("Disposing raw data filename={} size={}", filename, size);
if (filename.empty()) {
assert(data == nullptr);
......@@ -55,7 +57,7 @@ char* RawData::init_empty(const std::string &work_dir, size_t size)
int fd = ::open(filename.c_str(), O_CREAT | O_RDWR, S_IRUSR | S_IWUSR);
if (fd < 0) {
llog->critical("Cannot open data {} for writing", filename);
logger->critical("Cannot open data {} for writing", filename);
log_errno_abort("open");
}
......@@ -103,7 +105,7 @@ void RawData::open() const
assert(!filename.empty());
int fd = ::open(filename.c_str(), O_RDONLY, S_IRUSR | S_IWUSR);
if (fd < 0) {
llog->critical("Cannot open data {}", filename);
logger->critical("Cannot open data {}", filename);
log_errno_abort("open");
}
map(fd, false);
......@@ -128,7 +130,7 @@ void RawData::map(int fd, bool write) const
}
data = (char*) mmap(0, size, flags, MAP_SHARED, fd, 0);
if (data == MAP_FAILED) {
llog->critical("Cannot mmap data filename={}", filename);
logger->critical("Cannot mmap data filename={}", filename);
log_errno_abort("mmap");
}
}
......
#include "dictionary.h"
#include "log.h"
#include "libloom/log.h"
#include <assert.h>
using namespace loom;
using namespace loom::base;
Dictionary::Dictionary()
{
......@@ -15,7 +16,7 @@ Id Dictionary::find_symbol_or_fail(const std::string &symbol) const
{
auto i = symbol_to_id.find(symbol);
if(i == symbol_to_id.end()) {
llog->critical("Unknown symbol '{}'", symbol);
logger->critical("Unknown symbol '{}'", symbol);
exit(1);
}
assert(i->second != -1);
......
#include "interconnect.h"
#include "worker.h"
#include "loomcomm.pb.h"
#include "log.h"
#include "libloom/log.h"
#include "libloom/pbutils.h"
#include "libloom/sendbuffer.h"
#include <sstream>
using namespace loom;
using namespace loom::base;
InterConnection::InterConnection(Worker &worker)
: socket(worker.get_loop()), worker(worker), unpacking_data_id(-1)
{
socket.set_on_close([this]() {
llog->debug("Interconnection closed");
logger->debug("Interconnection closed");
this->worker.unregister_connection(*this);
});
......@@ -37,7 +38,7 @@ InterConnection::~InterConnection()
void InterConnection::on_connect()
{
llog->info("Connected to {}", get_address());
logger->info("Connected to {}", get_address());
loomcomm::Announce msg;
msg.set_port(worker.get_listen_port());
......@@ -72,7 +73,7 @@ void InterConnection::on_message(const char *buffer, size_t size)
loomcomm::DataHeader msg;
assert(msg.ParseFromArray(buffer, size));
unpacking_data_id = msg.id();
llog->debug("Interconnect: Receving data_id={}", unpacking_data_id);
logger->debug("Interconnect: Receving data_id={}", unpacking_data_id);
unpacker = worker.get_unpacker(msg.type_id());
switch(unpacker->get_initial_mode()) {
case DataUnpacker::MESSAGE:
......@@ -91,7 +92,7 @@ void InterConnection::on_message(const char *buffer, size_t size)
assert(msg.ParseFromArray(buffer, size));
std::stringstream s;
address = make_address(get_peername(), msg.port());
llog->debug("Interconnection from worker {} accepted", address);
logger->debug("Interconnection from worker {} accepted", address);
worker.register_connection(*this);
}
}
......
#include "log.h"
namespace loom {
std::shared_ptr<spdlog::logger> llog = spdlog::stdout_logger_mt("loom", true);
}
#ifndef LIBLOOM_LOG
#define LIBLOOM_LOG
#include "spdlog/spdlog.h"
namespace loom {
/** Main loom log */
extern std::shared_ptr<spdlog::logger> llog;
}
#endif
#include "taskinstance.h"
#include "worker.h"
#include "utils.h"
#include "log.h"
#include "libloom/log.h"
#include <sstream>
using namespace loom;
using namespace loom::base;
TaskInstance::~TaskInstance()
{
......@@ -23,7 +24,7 @@ const std::string TaskInstance::get_task_dir()
}
if (mkdir(name.c_str(), S_IRWXU)) {
llog->critical("Cannot create directory {}", name);
logger->critical("Cannot create directory {}", name);
log_errno_abort("mkdir");
}
......
#include "python.h"
#include "../data/rawdata.h"
#include "../log.h"
#include "libloom/log.h"
#include "libloom/compat.h"
#include "../worker.h"
#include "../data/rawdata.h"
#include "python_wrapper.h"
#include <Python.h>
using namespace loom;
using namespace loom::base;
/** Ensures that python is initialized,
* if already initialized, then does nothing */
......@@ -187,7 +190,7 @@ std::shared_ptr<Data> PyCallJob::run()
void PyCallJob::set_python_error()
{
loom::llog->error("Python error in task id={}", task.get_id());
logger->error("Python error in task id={}", task.get_id());
PyObject *excType, *excValue, *excTraceback;
PyErr_Fetch(&excType, &excValue, &excTraceback);
assert(excType);
......@@ -210,7 +213,7 @@ void PyCallJob::set_python_error()
assert(str);
std::string error_msg(str, size);
loom::llog->error("Python exception: {}", error_msg);
logger->error("Python exception: {}", error_msg);
set_error(std::string("Python exception:\n") + error_msg);
......
......@@ -4,7 +4,7 @@
#include "libloomw/data/rawdata.h"
#include "libloomw/data/index.h"
#include "libloomw/data/externfile.h"
#include "libloomw/log.h"
#include "libloom/log.h"
#include "libloomw/worker.h"
#include <string.h>
......
......@@ -4,7 +4,7 @@
#include "libloomw/worker.h"
#include "libloomw/data/rawdata.h"
#include "libloomw/data/array.h"
#include "libloomw/log.h"
#include "libloom/log.h"
#include "loomrun.pb.h"
#include "libloomw/utils.h"
......@@ -12,6 +12,7 @@
#include <fstream>
using namespace loom;
using namespace loom::base;
RunTask::RunTask(Worker &worker, std::unique_ptr<Task> task)
: TaskInstance(worker, std::move(task)), exit_status(0)
......@@ -26,7 +27,7 @@ void RunTask::start(DataVector &inputs)
{
std::string run_dir = worker.get_run_dir(get_id());
if (make_path(run_dir.c_str(), S_IRWXU)) {
llog->critical("Cannot make {}", run_dir);
logger->critical("Cannot make {}", run_dir);
log_errno_abort("make_path");
}
......@@ -69,10 +70,10 @@ void RunTask::start(DataVector &inputs)
std::string filename = inputs[i]->get_filename();
assert(!filename.empty());
if (!task->get_inputs().empty()) {
llog->debug("Creating symlink of '{}' for input id={} filename={}",
logger->debug("Creating symlink of '{}' for input id={} filename={}",
msg.map_inputs(i), task->get_inputs()[i], filename);
} else {
llog->debug("Creating symlink of '{}' for filename={}",
logger->debug("Creating symlink of '{}' for filename={}",
msg.map_inputs(i), filename);
}
if (symlink(filename.c_str(), path.c_str())) {
......@@ -126,13 +127,13 @@ void RunTask::start(DataVector &inputs)
std::string work_dir = worker.get_run_dir(get_id());
options.cwd = work_dir.c_str();
if (llog->level() == spdlog::level::debug) {
if (logger->level() == spdlog::level::debug) {
std::stringstream s;
s << msg.args(0);
for (int i = 1; i < args_size; i++) {
s << ' ' << msg.args(i);
}
llog->debug("Running command {}", s.str());
logger->debug("Running command {}", s.str());
}
......@@ -179,7 +180,7 @@ void RunTask::_on_exit(uv_process_t *process, int64_t exit_status, int term_sign
void RunTask::_on_close(uv_handle_t *handle)
{
RunTask *task = static_cast<RunTask*>(handle->data);
llog->debug("Process id={} finished (exit_status={})", task->get_id(), task->exit_status);
logger->debug("Process id={} finished (exit_status={})", task->get_id(), task->exit_status);
if (task->exit_status) {
std::stringstream s;
......@@ -216,10 +217,10 @@ void RunTask::_on_close(uv_handle_t *handle)
std::string path = task->get_path(msg.map_outputs(0));
std::string data_path = data.get_filename();
llog->debug("Returning file '{}'' as result", msg.map_outputs(0));
logger->debug("Returning file '{}'' as result", msg.map_outputs(0));
if (unlikely(rename(path.c_str(),
data_path.c_str()))) {
llog->critical("Cannot move {} to {}",
logger->critical("Cannot move {} to {}",
path, data_path);
log_errno_abort("rename");
}
......@@ -238,11 +239,11 @@ void RunTask::_on_close(uv_handle_t *handle)
data.assign_filename(task->worker.get_work_dir());
std::string path = task->get_path(msg.map_outputs(i));
std::string data_path = data.get_filename();
llog->debug("Storing file '{}'' as index={}", msg.map_outputs(i), i);
logger->debug("Storing file '{}'' as index={}", msg.map_outputs(i), i);
//data->create(task->worker, 10);
if (unlikely(rename(path.c_str(),
data_path.c_str()))) {
llog->critical("Cannot move {} to {}",
logger->critical("Cannot move {} to {}",
path, data_path);
log_errno_abort("rename");
}
......@@ -265,7 +266,7 @@ void RunTask::_on_read(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
return;
}
if (nread < 0) {
llog->critical("Invalid read in task id={}", task->get_id());
logger->critical("Invalid read in task id={}", task->get_id());
exit(1);
}
......
#include "utils.h"
#include "log.h"
#include "libloom/log.h"
#include "uv.h"
#include <errno.h>
......@@ -8,13 +8,7 @@
#include <limits.h>
using namespace loom;
void loom::report_uv_error(int error_code, int line_number, const char *filename)
{
llog->critical("libuv fail: {} ({}:{})", uv_strerror(error_code), filename, line_number);
exit(1);
}
using namespace loom::base;
static bool is_directory(const char *path)
{
......@@ -68,15 +62,3 @@ int loom::make_path(const char *path, mode_t mode)
}
return 0;
}
void loom::log_errno_abort(const char *tmp)
{
llog->critical("{}: {}", tmp, strerror(errno));
abort();
}
void loom::log_errno_abort(const char *tmp, const char *tmp2)
{
llog->critical("{}: {} ({})", tmp, strerror(errno), tmp2);
abort();
}
......@@ -9,14 +9,6 @@
namespace loom {
#define likely(x) __builtin_expect((x),1)
#define unlikely(x) __builtin_expect((x),0)
void report_uv_error(int error_code, int line_number, const char *filename) __attribute__ ((noreturn));
void log_errno_abort(const char *tmp) __attribute__ ((noreturn));
void log_errno_abort(const char *tmp, const char *tmp2) __attribute__ ((noreturn));
int make_path(const char *path, mode_t mode);
size_t file_size(const char *path);
......
This diff is collapsed.
......@@ -5,12 +5,13 @@
#include "libloomw/loomplan.pb.h"
#include "libloomw/loomcomm.pb.h"
#include "libloomw/log.h"
#include "libloom/log.h"
#include "libloom/compat.h"
#include "libloom/pbutils.h"
using namespace loom;
using namespace loom::base;
ClientConnection::ClientConnection(Server &server,
std::unique_ptr<loom::base::Socket> socket)
......@@ -20,11 +21,11 @@ ClientConnection::ClientConnection(Server &server,
on_message(buffer, size);
});
this->socket->set_on_close([this](){
llog->info("Client disconnected");
logger->info("Client disconnected");
this->server.remove_client_connection(*this);
});
llog->info("Client {} connected", this->socket->get_peername());
logger->info("Client {} connected", this->socket->get_peername());
// Send dictionary
loomcomm::ClientMessage cmsg;
......@@ -41,7 +42,7 @@ ClientConnection::ClientConnection(Server &server,
void ClientConnection::on_message(const char *buffer, size_t size)
{
llog->debug("Plan received");
logger->debug("Plan received");
loomcomm::ClientSubmit submit;
submit.ParseFromArray(buffer, size);
auto& task_manager = server.get_task_manager();
......@@ -49,5 +50,5 @@ void ClientConnection::on_message(const char *buffer, size_t size)
const loomplan::Plan &plan = submit.plan();
loom::Id id_base = server.new_id(plan.tasks_size());
task_manager.add_plan(Plan(plan, id_base, server.get_dictionary()), submit.report());
llog->info("Plan submitted tasks={} report={}", plan.tasks_size(), submit.report());
logger->info("Plan submitted tasks={} report={}", plan.tasks_size(), submit.report());
}
......@@ -3,10 +3,11 @@
#include "workerconn.h"
#include "server.h"
#include "libloomw/log.h"
#include "libloom/log.h"
constexpr static double TRANSFER_COST_COEF = 1.0 / (1024 * 1024); // 1MB = 1cost
using namespace loom::base;
ComputationState::ComputationState(Server &server) : server(server)
{
......@@ -85,7 +86,7 @@ bool ComputationState::is_finished() const
void ComputationState::add_pending_task(loom::Id id)
{
loom::llog->debug("Add pending task and creating state id={}", id);
logger->debug("Add pending task and creating state id={}", id);
auto pair = states.emplace(std::make_pair(id, TaskState(get_node(id))));
assert(pair.second);
pending_tasks.insert(id);
......@@ -100,7 +101,7 @@ void ComputationState::expand_node(const PlanNode &node)
} else if (id == dget_task_id) {
expand_dget(node);
} else {
loom::llog->critical("Unknown scheduler task: {}", id);
logger->critical("Unknown scheduler task: {}", id);
exit(1);
}
}
......@@ -145,8 +146,8 @@ void ComputationState::expand_dslice(const PlanNode &node)
loom::Id id_base1 = server.new_id(configs.size());
loom::Id id_base2 = server.new_id(configs.size());
loom::llog->debug("Expanding 'dslice' id={} length={} pieces={} new_id_base={}",
node1.get_id(), length, configs.size(), id_base1);
logger->debug("Expanding 'dslice' id={} length={} pieces={} new_id_base={}",
node1.get_id(), length, configs.size(), id_base1);
PlanNode new_node(node1.get_id(),-1, PlanNode::POLICY_SIMPLE, -1, false,
slice_task_id, "", node1.get_inputs());
......@@ -173,8 +174,8 @@ void ComputationState::expand_dget(const PlanNode &node)
loom::Id id_base1 = server.new_id(configs.size());
loom::Id id_base2 = server.new_id(configs.size());
loom::llog->debug("Expanding 'dget' id={} length={} new_id_base={}",
node1.get_id(), length, id_base1);
logger->debug("Expanding 'dget' id={} length={} new_id_base={}",
node1.get_id(), length, id_base1);
PlanNode new_node(node1.get_id(),-1, PlanNode::POLICY_SIMPLE, -1, false,
get_task_id, "", node1.get_inputs());
......@@ -261,7 +262,7 @@ TaskState &ComputationState::get_state(loom::Id id)
{
auto it = states.find(id);
if (it == states.end()) {
loom::llog->critical("Cannot find state for id={}", id);
logger->critical("Cannot find state for id={}", id);
abort();
}
return it->second;
......@@ -272,7 +273,7 @@ TaskState &ComputationState::get_state(loom::Id id)
{
auto it = states.find(id);
if (it == states.end()) {
loom::llog->debug("Creating state id={}", id);
loom::logger->debug("Creating state id={}", id);
auto p = states.emplace(std::make_pair(id, TaskState(get_node(id))));
it = p.first;
}
......
......@@ -5,8 +5,7 @@
#include <libloom/compat.h>
#include <libloom/pbutils.h>
#include <libloomw/utils.h>
#include <libloomw/log.h>
#include <libloom/log.h>
#include <libloomw/loomcomm.pb.h>
......@@ -14,6 +13,7 @@
#include <assert.h>
using namespace loom;
using namespace loom::base;
DummyWorker::DummyWorker(Server &server)
: server(server)
......@@ -30,7 +30,7 @@ void DummyWorker::start_listen()
listener.start(loop, 0, [this]() {
auto connection = std::make_unique<DWConnection>(*this);
connection->accept(listener);
llog->debug("Worker data connection from {}", connection->get_peername());
logger->debug("Worker data connection from {}", connection->get_peername());
connections.push_back(std::move(connection));
});
}
......@@ -46,7 +46,7 @@ DWConnection::DWConnection(DummyWorker &worker)
: worker(worker), socket(worker.get_server().get_loop()), remaining_messages(0), registered(false)
{
this->socket.set_on_close([this]() {
llog->critical("Worker closing data connection from {}", this->socket.get_peername());
logger->critical("Worker closing data connection from {}", this->socket.get_peername());
assert(0);
});
......@@ -80,7 +80,7 @@ void DWConnection::on_message(const char *buffer, size_t size)
remaining_messages--;
if (remaining_messages == 0) {
llog->debug("DummyWorker: Resending data to client");
logger->debug("DummyWorker: Resending data to client");
auto& server = worker.get_server();
assert(server.has_client_connection());
server.get_client_connection().send(std::move(send_buffer));
......@@ -99,7 +99,7 @@ void DWConnection::on_message(const char *buffer, size_t size)
auto data_id = msg.id();
auto client_id = worker.server.translate_to_client_id(data_id);
msg.set_id(client_id);
llog->debug("DummyWorker: Capturing data for client data_id={} (messages={})", data_id, remaining_messages);
logger->debug("DummyWorker: Capturing data for client data_id={} (messages={})", data_id, remaining_messages);
loomcomm::ClientMessage cmsg;
cmsg.set_type(loomcomm::ClientMessage_Type_DATA);
......
......@@ -4,19 +4,20 @@
#include "server.h"
#include "libloom/compat.h"
#include "libloomw/log.h"
#include "libloom/log.h"
#include "libloomw/loomcomm.pb.h"
#include <sstream>
using namespace loom;
using namespace loom::base;
FreshConnection::FreshConnection(Server &server) :
server(server),
socket(std::make_unique<base::Socket>(server.get_loop()))
{
socket->set_on_close([this](){
llog->error("Connection closed without registration");
logger->error("Connection closed without registration");
this->server.remove_freshconnection(*this);
});
......@@ -27,7 +28,7 @@ FreshConnection::FreshConnection(Server &server) :
void FreshConnection::accept(loom::base::Listener &listener) {
listener.accept(*socket);
llog->debug("New connection to server ({})", socket->get_peername());
logger->debug("New connection to server ({})", socket->get_peername());
}
void FreshConnection::on_message(const char *buffer, size_t size)
......@@ -35,14 +36,14 @@ void FreshConnection::on_message(const char *buffer, size_t size)
loomcomm::Register msg;
bool r = msg.ParseFromArray(buffer, size);
if (!r) {
llog->error("Invalid registration message from {}",
logger->error("Invalid registration message from {}",
socket->get_peername());
socket->close_and_discard_remaining_data();
return;
}
if (msg.protocol_version() != PROTOCOL_VERSION) {
llog->error("Connection from {} registered with invalid protocol version",
logger->error("Connection from {} registered with invalid protocol version",
socket->get_peername());
socket->close_and_discard_remaining_data();
return;
......@@ -83,6 +84,6 @@ void FreshConnection::on_message(const char *buffer, size_t size)
server.remove_freshconnection(*this);
return;
}
llog->error("Invalid registration from {}", socket->get_peername());
logger->error("Invalid registration from {}", socket->get_peername());
socket->close_and_discard_remaining_data();
}
......@@ -3,7 +3,7 @@
#include <algorithm>
#include "libloomw/loomplan.pb.h"
#include "libloomw/log.h"
#include "libloom/log.h"