Skip to content
Snippets Groups Projects
Select Git revision
  • 94e37e8f67df3d864f669b3409784b388c91cca8
  • master default
  • fixadd
  • dfs
  • exp
  • workercrash
  • s-dev
  • dev
  • v0.10
  • v0.9
  • v0.8
  • v0.7
  • v0.6
  • v0.4
  • v0.3
  • v0.2
16 results

server.cpp

Blame
  • user avatar
    Stanislav Bohm authored
    94e37e8f
    History
    server.cpp 5.90 KiB
    #include "server.h"
    
    #include "libloom/pbutils.h"
    #include "libloom/compat.h"
    #include "libloom/log.h"
    #include "libloom/fsutils.h"
    #include "pb/comm.pb.h"
    
    #include "loom_define.h"
    
    #include <sstream>
    
    using namespace loom;
    using namespace loom::base;
    
    /**
     * Builds the server and, when an event loop is supplied, starts listening.
     *
     * @param loop libuv event loop to run on; may be null, in which case no
     *             listener, dummy worker or idle handle is set up
     * @param port port handed to the listener when a loop is available
     */
    Server::Server(uv_loop_t *loop, int port)
        : loop(loop),
          task_manager(*this),
          dummy_worker(*this),
          id_counter(0),
          task_distribution_active(false)
    {
        logger->info("Starting loom server; version={}", LOOM_VERSION);
        /* The server does not fully implement resource management yet, so we
         * force the symbol of the only schedulable resource:
         * loom/resource/cpus */
        dictionary.find_or_create("loom/resource/cpus");
        const bool has_loop = (loop != nullptr);
        if (has_loop) {
            logger->info("Starting server on {}", port);
            // Every incoming connection is routed to on_new_connection()
            listener.start(loop, port, [this]() {
                on_new_connection();
            });
            dummy_worker.start_listen();
            logger->debug("Dummy worker started at {}", dummy_worker.get_listen_port());
            // Idle handle that later drives _distribution_callback
            UV_CHECK(uv_idle_init(loop, &distribution_idle));
        }
        // The idle callback recovers the Server instance through this pointer
        distribution_idle.data = this;
    }
    
    /**
     * Takes ownership of a worker connection and appends it to `connections`.
     *
     * @param conn connection to store; it is moved from
     */
    void Server::add_worker_connection(std::unique_ptr<WorkerConnection> &&conn)
    {
        connections.push_back(std::move(conn));
    }
    
    /**
     * Notifies the task manager (worker_fail) about the given worker and then
     * drops the connection from `connections`.
     *
     * @param conn connection to remove; expected to be stored in `connections`
     */
    void Server::remove_worker_connection(WorkerConnection &conn)
    {
        task_manager.worker_fail(conn);
        auto it = std::find_if(
                    connections.begin(),
                    connections.end(),
                    [&conn](const std::unique_ptr<WorkerConnection> &p) { return p.get() == &conn; });
        assert(it != connections.end());
        if (it == connections.end()) {
            // With NDEBUG the assert above compiles away; erasing end() would
            // be undefined behaviour, so report the problem and bail out.
            logger->error("Attempt to remove an unregistered worker connection");
            return;
        }
        connections.erase(it);
    }
    
    /**
     * Stores the given client connection as the server's client.
     *
     * Both preconditions are enforced only by assert: no client may be
     * registered yet and `conn` must not be null.
     *
     * @param conn connection to take ownership of
     */
    void Server::add_client_connection(std::unique_ptr<ClientConnection> conn)
    {
        // We now allow only one client
        assert(client_connection.get() == nullptr);
        client_connection = std::move(conn);
        assert(client_connection.get() != nullptr);
    }
    
    /**
     * Releases the current client connection and asks the task manager to
     * trash all tasks.
     *
     * @param conn must be the connection currently held in
     *             `client_connection` (checked by assert only)
     */
    void Server::remove_client_connection(ClientConnection &conn)
    {
        assert(&conn == client_connection.get());
        // Resetting the unique_ptr destroys the connection; `conn` must not be
        // used past this point.
        client_connection.reset();
        task_manager.trash_all_tasks();
    }
    
    /**
     * Drops the given connection from `fresh_connections`.
     *
     * @param conn connection to remove; expected to be stored in
     *             `fresh_connections`
     */
    void Server::remove_freshconnection(FreshConnection &conn)
    {
        auto it = std::find_if(
                    fresh_connections.begin(),
                    fresh_connections.end(),
                    [&conn](const std::unique_ptr<FreshConnection> &p) { return p.get() == &conn; });
        assert(it != fresh_connections.end());
        if (it == fresh_connections.end()) {
            // With NDEBUG the assert above compiles away; erasing end() would
            // be undefined behaviour, so report the problem and bail out.
            logger->error("Attempt to remove an unregistered fresh connection");
            return;
        }
        fresh_connections.erase(it);
    }
    
    /**
     * Forwards a "checkpoint write finished" notification to the task manager.
     *
     * @param id identifier passed through unchanged
     * @param wc worker connection passed through unchanged
     */
    void Server::on_checkpoint_write_finished(loom::base::Id id, WorkerConnection *wc)
    {
        task_manager.on_checkpoint_write_finished(id, wc);
    }
    
    /**
     * Forwards a "checkpoint write failed" notification to the task manager.
     *
     * @param id        identifier passed through unchanged
     * @param wc        worker connection passed through unchanged
     * @param error_msg error description passed through unchanged
     */
    void Server::on_checkpoint_write_failed(loom::base::Id id, WorkerConnection *wc, const std::string &error_msg)
    {
        task_manager.on_checkpoint_write_failed(id, wc, error_msg);
    }
    
    /**
     * Forwards a "checkpoint load finished" notification to the task manager.
     *
     * @param id     identifier passed through unchanged
     * @param wc     worker connection passed through unchanged
     * @param size   passed through unchanged
     * @param length passed through unchanged
     */
    void Server::on_checkpoint_load_finished(loom::base::Id id, WorkerConnection *wc, size_t size, size_t length)
    {
        task_manager.on_checkpoint_load_finished(id, wc, size, length);
    }
    
    /**
     * Forwards a "checkpoint load failed" notification to the task manager.
     *
     * @param id        identifier passed through unchanged
     * @param wc        worker connection passed through unchanged
     * @param error_msg error description passed through unchanged
     */
    void Server::on_checkpoint_load_failed(loom::base::Id id, WorkerConnection *wc, const std::string &error_msg)
    {
        task_manager.on_checkpoint_load_failed(id, wc, error_msg);
    }
    
    /**
     * Forwards a "task finished" notification to the task manager.
     *
     * @param id            identifier passed through unchanged
     * @param size          passed through unchanged
     * @param length        passed through unchanged
     * @param wc            worker connection passed through unchanged
     * @param checkpointing flag passed through unchanged
     */
    void Server::on_task_finished(loom::base::Id id, size_t size, size_t length, WorkerConnection *wc, bool checkpointing)
    {
        task_manager.on_task_finished(id, size, length, wc, checkpointing);
    }
    
    /**
     * Forwards a "data transferred" notification to the task manager.
     *
     * @param id identifier passed through unchanged
     * @param wc worker connection passed through unchanged
     */
    void Server::on_data_transferred(loom::base::Id id, WorkerConnection *wc)
    {
        task_manager.on_data_transferred(id, wc);
    }
    
    /**
     * Forwards a "task failed" notification to the task manager.
     *
     * @param id        identifier passed through unchanged
     * @param wc        worker connection passed through unchanged
     * @param error_msg error description passed through unchanged
     */
    void Server::on_task_failed(Id id, WorkerConnection *wc, const std::string &error_msg)
    {
        task_manager.on_task_failed(id, wc, error_msg);
    }
    
    /**
     * Sends all dictionary symbols over the socket as a single DICTIONARY
     * worker command.
     *
     * @param socket destination of the message
     */
    void Server::send_dictionary(loom::base::Socket &socket)
    {
        using namespace loom::pb::comm;
        WorkerCommand command;
        command.set_type(WorkerCommand_Type_DICTIONARY);
        // One repeated "symbols" entry per symbol, in get_all_symbols() order
        for (const std::string &name : dictionary.get_all_symbols()) {
            *command.add_symbols() = name;
        }
        loom::base::send_message(socket, command);
    }
    
    /**
     * Sums get_resource_cpus() over all entries of `connections`.
     *
     * @return the total; 0 when there are no worker connections
     */
    int Server::get_worker_ncpus()
    {
        int total_cpus = 0;
        for (const auto &worker : connections) {
            total_cpus += worker->get_resource_cpus();
        }
        return total_cpus;
    }
    
    void Server::on_new_connection()
    {
        auto connection = std::make_unique<FreshConnection>(*this);
        connection->accept(listener);
        fresh_connections.push_back(std::move(connection));
    }
    
    /**
     * Requests a task-distribution run by starting the idle handle.
     *
     * Calls made while a request is already pending are no-ops; the flag is
     * cleared again in _distribution_callback.
     */
    void Server::need_task_distribution()
    {
        if (!task_distribution_active) {
            task_distribution_active = true;
            UV_CHECK(uv_idle_start(&distribution_idle, _distribution_callback));
        }
    }
    
    /**
     * Enables tracing into the given directory.
     *
     * Opens "<trace_path>/server.ltrace"; on failure an error is logged, the
     * trace object is dropped and nothing else happens. On success the
     * directory is remembered in `trace_dir`, every worker connection is asked
     * to create its own trace, and the header entries, all dictionary symbols
     * and all workers are recorded in the server trace.
     *
     * @param trace_path directory that receives the trace files
     */
    void Server::create_trace(const std::string &trace_path)
    {
        logger->info("Trace directory: {}", trace_path);
        loom::base::make_path(trace_path.c_str(), S_IRWXU);
        // Update the loop's notion of "now" before the trace object is built
        // (presumably so the trace starts from a current timestamp).
        uv_update_time(loop);
        trace = std::make_unique<ServerTrace>(loop);
        if (!trace->open(trace_path + "/server.ltrace")) {
            logger->error("Server trace file could not be created in {}", trace_path);
            trace.reset();
            return;
        }
        trace_dir = trace_path;
        // Per-worker traces go to the same directory
        for (auto &worker : connections) {
            worker->create_trace(trace_path);
        }
        // Header of the server trace
        trace->entry("TRACE", "server");
        trace->entry("VERSION", LOOM_VERSION);
        std::vector<std::string> symbols = dictionary.get_all_symbols();
        for (auto &name : symbols) {
            trace->entry("SYMBOL", name);
        }
        for (auto &worker : connections) {
            trace->trace_worker(worker.get());
        }
    }
    
    /**
     * Stops the server process: flushes the trace when one is active and then
     * leaves the process through exit(0). This function does not return.
     */
    void Server::terminate()
    {
        // NOTE(review): the graceful shutdown below (terminating workers and
        // stopping the loop) is disabled; the process is ended directly.
        /*for (auto &wc : connections) {
            wc->terminate();
        }
        uv_stop(loop);*/
        if (trace) {
            // Flush explicitly before exit(0) ends the process
            trace->flush();
        }
        exit(0);
    }
    
    void Server::create_file_in_trace_dir(const std::string &filename, const char *data, size_t size)
    {
        std::ofstream f(trace_dir + '/' + filename, std::ofstream::binary);
        if (!f.is_open()) {
            logger->error("Cannot open trace file for writing: {}", filename);
            return;
        }
        f.write(data, size);
    }
    
    /**
     * Idle callback armed by need_task_distribution(): stops the idle handle,
     * clears the pending flag and runs the task distribution.
     *
     * @param idle idle handle whose `data` field points to the owning Server
     */
    void Server::_distribution_callback(uv_idle_t *idle)
    {
        auto *self = static_cast<Server*>(idle->data);
        // One-shot behaviour: the handle is re-armed by need_task_distribution()
        UV_CHECK(uv_idle_stop(idle));
        // Clear the flag first so that requests raised during the run re-arm
        // the handle.
        self->task_distribution_active = false;
        self->task_manager.run_task_distribution();
    }