Commit 88b95e1e authored by Stanislav Bohm

New standard task: "open"

parent 493af0b2
......@@ -39,6 +39,14 @@ class MergeTask(Task):
self.inputs = inputs
# Plan task of type "open": refers to a file by name instead of carrying data.
class OpenTask(Task):
task_type = "open"
def __init__(self, filename):
# The file name is stored as the task's config.
self.config = filename
class RunTask(Task):
task_type = "run"
......@@ -128,6 +136,9 @@ class Plan(object):
def task_const(self, data):
return self.add(ConstTask(data))
# Adds an OpenTask for `filename` to this plan and returns whatever self.add() returns.
def task_open(self, filename):
return self.add(OpenTask(filename))
def task_merge(self, inputs):
return self.add(MergeTask(inputs))
......
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14 -g -Wall")
add_library(libloom
data/externfile.cpp
data/externfile.h
data/rawdata.h
data/rawdata.cpp
connection.cpp
connection.h
worker.cpp
......@@ -13,8 +17,6 @@ add_library(libloom
data.h
unpacking.cpp
unpacking.h
rawdata.h
rawdata.cpp
interconnect.h
interconnect.cpp
task.cpp
......
#include "externfile.h"
#include "../utils.h"
#include "../log.h"
#include <assert.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
using namespace loom;
/// Creates a handle for `filename`. The file is not mapped here; only its
/// size is queried (via file_size) so that get_size() works immediately.
ExternFile::ExternFile(const std::string &filename)
    : data(nullptr),
      size(file_size(filename.c_str())),
      filename(filename)
{
}
/// Releases the memory mapping, if the file was ever mapped.
ExternFile::~ExternFile()
{
    if (data == nullptr) {
        // Never opened: nothing to unmap.
        return;
    }
    munmap(data, size);
}
std::string loom::ExternFile::get_info()
{
return "<ExternFile '" + filename + "'>";
}
// Serialization of an extern file is not implemented yet: the body is only a
// failing assertion, and none of the parameters is used.
void ExternFile::serialize_data(Worker &worker, SendBuffer &buffer, std::shared_ptr<Data> &data_ptr)
{
assert(0); // TODO
}
// Returns the file name given at construction. The `worker` argument is not
// used by this implementation.
std::string ExternFile::get_filename(Worker &worker) const
{
return filename;
}
/// Opens the file read-only and maps it into memory.
/// Failure to open is reported through log_errno_abort("open").
void ExternFile::open()
{
    llog->debug("Opening extern file {}", filename);

    const int descriptor = ::open(filename.c_str(), O_RDONLY, S_IRUSR | S_IWUSR);
    if (descriptor < 0) {
        log_errno_abort("open");
    }

    // The descriptor is closed right after the mapping has been created.
    map(descriptor, false);
    ::close(descriptor);
}
/// Maps `size` bytes of the file behind `fd` as a shared mapping and stores
/// the address in `data`.
///
/// @param fd    open file descriptor (asserted to be non-negative)
/// @param write when true the mapping is writable as well as readable
void ExternFile::map(int fd, bool write)
{
    assert(data == nullptr);
    assert(fd >= 0);

    const int protection = write ? (PROT_READ | PROT_WRITE) : PROT_READ;
    void *region = mmap(0, size, protection, MAP_SHARED, fd, 0);
    data = static_cast<char*>(region);

    if (region == MAP_FAILED) {
        llog->critical("Cannot mmap '{}' size={}", filename, size);
        log_errno_abort("mmap");
    }
}
#ifndef LIBLOOM_EXTERNFILE_H
#define LIBLOOM_EXTERNFILE_H
#include "../data.h"
namespace loom {
/// Data object backed by an existing file on disk.
///
/// The file is memory-mapped lazily on the first call to get_raw_data(); the
/// mapping is released in the destructor. Because the object owns that
/// mapping, it must not be copied: a copy would unmap the same region twice.
class ExternFile : public Data {
public:
    static const int TYPE_ID = 301;

    /// Remembers `filename` and queries the file size; does not map the file.
    ExternFile(const std::string &filename);
    ~ExternFile();

    // The mapping is owned by exactly one object (see the destructor).
    ExternFile(const ExternFile &) = delete;
    ExternFile &operator=(const ExternFile &) = delete;

    int get_type_id() {
        return TYPE_ID;
    }

    /// Size of the file in bytes as determined at construction.
    size_t get_size() {
        return size;
    }

    /// Returns the mapped file content, mapping the file on first use.
    /// The `worker` argument is not used by this implementation.
    char *get_raw_data(Worker &worker)
    {
        if (data == nullptr) {
            open();
        }
        return data;
    }

    std::string get_info();
    void serialize_data(Worker &worker, SendBuffer &buffer, std::shared_ptr<Data> &data_ptr);
    std::string get_filename(Worker &worker) const;

private:
    void open();
    void map(int fd, bool write);

    char *data;            // start of the mapping, nullptr until open() runs
    size_t size;           // file size in bytes
    std::string filename;  // file name given at construction
    static size_t file_id_counter;
};
}
#endif // LIBLOOM_EXTERNFILE_H
#include "rawdata.h"
#include "log.h"
#include "utils.h"
#include "worker.h"
#include "../log.h"
#include "../utils.h"
#include "../worker.h"
#include <sstream>
#include <assert.h>
......@@ -106,13 +106,6 @@ void RawData::open(Worker &worker)
llog->critical("Cannot open data {}", get_filename(worker));
log_errno_abort("open");
}
struct stat finfo;
memset(&finfo, 0, sizeof(finfo));
if (fstat(fd, &finfo) == -1)
{
log_errno_abort("fstat");
}
size = finfo.st_size;
map(fd, false);
::close(fd);
}
......@@ -130,6 +123,7 @@ void RawData::map(int fd, bool write)
}
data = (char*) mmap(0, size, flags, MAP_SHARED, fd, 0);
if (data == MAP_FAILED) {
llog->critical("Cannot mmap data file_id={}", file_id);
log_errno_abort("mmap");
}
}
......
#ifndef LIBLOOM_RAWDATA_H
#define LIBLOOM_RAWDATA_H
#include "data.h"
#include "../data.h"
namespace loom {
......@@ -37,7 +37,6 @@ public:
void init_from_file(Worker &worker);
std::string get_filename(Worker &worker) const;
int get_fd(Worker &worker) const;
private:
......@@ -52,6 +51,7 @@ private:
};
class RawDataUnpacker : public DataUnpacker
{
public:
......
......@@ -3,7 +3,7 @@
#include "utils.h"
#include "log.h"
#include "types.h"
#include "rawdata.h"
#include "data/rawdata.h"
#include <stdlib.h>
#include <sstream>
......
#include "dummyworker.h"
#include "server.h"
#include <libloom/utils.h>
#include <libloom/log.h>
#include <libloom/loomcomm.pb.h>
#include <libloom/sendbuffer.h>
#include <sstream>
#include <assert.h>
using namespace loom;
// Initialises the listening TCP handle on the server's event loop.
// listen_port stays -1 until start_listen() assigns the real port.
DummyWorker::DummyWorker(Server &server)
: server(server), listen_port(-1)
{
UV_CHECK(uv_tcp_init(server.get_loop(), &listen_socket));
// Lets the static callback _on_new_connection recover this object from stream->data.
listen_socket.data = this;
}
void DummyWorker::start_listen()
{
struct sockaddr_in addr;
UV_CHECK(uv_ip4_addr("0.0.0.0", 0, &addr));
UV_CHECK(uv_tcp_bind(&listen_socket, (const struct sockaddr *) &addr, 0));
UV_CHECK(uv_listen((uv_stream_t *) &listen_socket, 10, _on_new_connection));
struct sockaddr_in sockname;
int namelen = sizeof(sockname);
uv_tcp_getsockname(&listen_socket, (sockaddr*) &sockname, &namelen);
listen_port = ntohs(sockname.sin_port);
}
std::string DummyWorker::get_address() const
{
std::stringstream s;
s << "!:" << get_listen_port();
return s.str();
}
/// libuv callback for an incoming connection on the listening socket.
/// Creates a DWConnection, accepts the connection on it and hands ownership
/// to the worker's connection list.
void DummyWorker::_on_new_connection(uv_stream_t *stream, int status)
{
    UV_CHECK(status);

    // The constructor stored the DummyWorker in the handle's data pointer.
    DummyWorker &self = *static_cast<DummyWorker*>(stream->data);

    auto incoming = std::make_unique<DWConnection>(self);
    incoming->accept(&self.listen_socket);
    llog->debug("Worker data connection from {}", incoming->get_peername());
    self.connections.push_back(std::move(incoming));
}
// Creates a connection on the server's event loop that belongs to `worker`.
// `registered` starts as false, so the first incoming message is treated as
// the announce message (see on_message).
DWConnection::DWConnection(DummyWorker &worker)
: SimpleConnectionCallback(worker.server.get_loop()), worker(worker), registered(false)
{
}
// Nothing to release explicitly; members clean up after themselves.
DWConnection::~DWConnection()
{
}
/// Handles a framed message from the worker.
///
/// The first message on a connection is the announce message and is dropped.
/// Each later message is parsed as a DataPrologue: a fresh SendBuffer is
/// created, the prologue is queued in it, a buffer of `data_size` bytes is
/// allocated for the payload, and the connection is switched to raw reading
/// for exactly that many bytes.
void DWConnection::on_message(const char *buffer, size_t size)
{
    if (registered == false) {
        // This is first message: Announce, we do not care, so we drop it
        registered = true;
        return;
    }

    // A previous transfer must have been handed over already.
    assert(this->send_buffer.get() == nullptr);
    send_buffer = std::make_unique<SendBuffer>();

    loomcomm::DataPrologue msg;
    msg.ParseFromArray(buffer, size);
    send_buffer->add(msg);

    assert(msg.has_data_size());
    const size_t payload_size = msg.data_size();
    llog->debug("Fetching data id={} data_size={}", msg.id(), payload_size);

    // `pointer` tracks the write position inside the payload buffer, which is
    // owned by the send buffer from here on.
    auto payload = std::make_unique<char[]>(payload_size);
    pointer = payload.get();
    send_buffer->add(std::move(payload), payload_size);

    connection.set_raw_read(payload_size);
}
/// Appends a chunk of raw payload at the current write position and advances
/// the position by the chunk size.
void DWConnection::on_data_chunk(const char *buffer, size_t size)
{
    char *destination = pointer;
    pointer = destination + size;
    memcpy(destination, buffer, size);
}
// Called when the whole raw payload has been received: forwards the assembled
// send buffer to the server's client connection.
void DWConnection::on_data_finish()
{
llog->debug("Resending data to client");
// release() leaves send_buffer empty, which on_message asserts before starting
// the next transfer. Presumably send_buffer() takes ownership of the raw
// pointer - confirm against Connection::send_buffer.
worker.server.get_client_connection().send_buffer(send_buffer.release());
}
/// Called when the data connection is closed: removes this connection from
/// the owning DummyWorker's connection list.
///
/// The list holds std::unique_ptr<DWConnection>, so erasing the matching
/// entry destroys this object. The function therefore returns immediately
/// after the erase and touches neither the iterator nor any member afterwards.
/// NOTE(review): confirm that the caller does not use this object after
/// on_close() returns.
void DWConnection::on_close()
{
    llog->debug("Worker closing data connection from {}", connection.get_peername());
    auto &connections = worker.connections;
    // Scan the whole list (the end iterator is the bound, not begin()).
    for (auto i = connections.begin(); i != connections.end(); ++i) {
        if (i->get() == this) {
            connections.erase(i);
            return; // *this is gone now
        }
    }
    // Every connection is added to the list in _on_new_connection, so not
    // finding ourselves indicates a logic error.
    assert(0);
}
#ifndef LIBLOOM_DUMMYWORKER_H
#define LIBLOOM_DUMMYWORKER_H
#include <string>
#include <uv.h>
#include <memory>
#include <vector>
#include <libloom/connection.h>
class Server;
class DWConnection;
// Listener owned by the server that accepts data connections from workers.
// Each accepted connection is represented by a DWConnection, which forwards
// the data it receives to the server's client connection.
class DummyWorker
{
// DWConnection accesses `server` and `connections` directly.
friend class DWConnection;
public:
DummyWorker(Server &server);
// Binds and starts listening; afterwards get_listen_port() returns the real port.
void start_listen();
// Returns "!:<port>".
std::string get_address() const;
// -1 until start_listen() has been called.
int get_listen_port() const {
return listen_port;
}
protected:
Server &server;
// Open connections; each removes itself in DWConnection::on_close().
std::vector<std::unique_ptr<DWConnection>> connections;
uv_tcp_t listen_socket;
int listen_port;
// libuv callback; recovers the DummyWorker from stream->data.
static void _on_new_connection(uv_stream_t *stream, int status);
};
/// One data connection from a worker to the DummyWorker.
///
/// The first message is the announce message and is ignored. Every following
/// message is a DataPrologue followed by raw payload, which is collected into
/// a SendBuffer and forwarded to the server's client connection.
class DWConnection : public loom::SimpleConnectionCallback
{
public:
    DWConnection(DummyWorker &worker);
    ~DWConnection();

    std::string get_peername() {
        return connection.get_peername();
    }

    /// Accepts the pending connection on `listen_socket`.
    void accept(uv_tcp_t *listen_socket) {
        connection.accept(listen_socket);
    }

protected:
    void on_message(const char *buffer, size_t size);
    void on_data_chunk(const char *buffer, size_t size);
    void on_data_finish();
    void on_close();

    DummyWorker &worker;
    /// Buffer being assembled for the current transfer; null between transfers.
    std::unique_ptr<loom::SendBuffer> send_buffer;
    /// Write position inside the payload buffer of the current transfer.
    /// Initialised so it never holds an indeterminate value before the first
    /// prologue arrives.
    char *pointer = nullptr;
    /// Not set by the constructor; given a defined value here.
    size_t size = 0;
    /// False until the announce message has been seen.
    bool registered;
};
#endif // LIBLOOM_DUMMYWORKER_H
#include "basictasks.h"
#include "libloom/databuilder.h"
#include "libloom/rawdata.h"
#include "libloom/data/rawdata.h"
#include "libloom/data/externfile.h"
#include <string.h>
......@@ -45,3 +46,14 @@ void MergeTask::start(DataVector &inputs) {
}
finish(std::move(output));
}
// Forwards the worker and the task description to the TaskInstance base.
OpenTask::OpenTask(Worker &worker, std::unique_ptr<Task> task)
: TaskInstance(worker, std::move(task))
{
}
/// Finishes immediately with an ExternFile whose file name is the task's
/// config. `inputs` is not used.
void OpenTask::start(DataVector &inputs)
{
    auto extern_file = std::make_unique<ExternFile>(task->get_config());
    finish(std::move(extern_file));
}
......@@ -18,5 +18,12 @@ public:
void start(loom::DataVector &inputs);
};
// Task instance for the "open" task type: produces an ExternFile for the file
// named in the task's config.
class OpenTask : public loom::TaskInstance
{
public:
OpenTask(loom::Worker &worker, std::unique_ptr<loom::Task> task);
// Finishes the task right away; the inputs are not used.
void start(loom::DataVector &inputs);
};
#endif // LOOM_WORKER_BASICTASKS_H
......@@ -116,6 +116,8 @@ int main(int argc, char **argv)
std::make_unique<SimpleTaskFactory<ConstTask>>("const"));
worker.add_task_factory(
std::make_unique<SimpleTaskFactory<MergeTask>>("merge"));
worker.add_task_factory(
std::make_unique<SimpleTaskFactory<OpenTask>>("open"));
worker.set_cpus(config.cpus);
//worker.add_task_factory<MergeTask>("merge");
uv_run(&loop, UV_RUN_DEFAULT);
......
......@@ -2,7 +2,7 @@
#include "runtask.h"
#include "libloom/worker.h"
#include "libloom/rawdata.h"
#include "libloom/data/rawdata.h"
#include "libloom/log.h"
#include "loomrun.pb.h"
......
from loomenv import loom_env, LOOM_TESTPROG # noqa
from loomenv import loom_env, LOOM_TESTPROG, LOOM_TEST_DATA_DIR # noqa
from datetime import datetime
import os
FILE1 = os.path.join(LOOM_TEST_DATA_DIR, "file1")
FILE2 = os.path.join(LOOM_TEST_DATA_DIR, "file2")
dir(loom_env) # silence flake8
......@@ -142,3 +146,16 @@ def test_run_files(loom_env):
result = loom_env.submit(p, c1)
assert result == "cdef" * 100
# Opens FILE1 and FILE2 through "open" tasks, merges them and checks that the
# merged result is the content of both files concatenated.
def test_open(loom_env):
p = loom_env.plan()
a = p.task_open(FILE1)
b = p.task_open(FILE2)
c = p.task_merge((a, b))
# presumably starts the environment with a single worker - confirm in loomenv
loom_env.start(1)
result = loom_env.submit(p, c)
# Expected: the "This is file 1" line followed by "Line 1" .. "Line 12",
# every line terminated by a newline.
expect = ("This is file 1\n" +
"\n".join("Line {}".format(i) for i in xrange(1, 13)) +
"\n")
assert result == expect
......@@ -17,6 +17,7 @@ LOOM_CLIENT = os.path.join(LOOM_ROOT, "src")
LOOM_TEST_BUILD_DIR = os.path.join(LOOM_TESTDIR, "build")
LOOM_TESTPROG = os.path.join(LOOM_TESTDIR, "testprog.py")
LOOM_TEST_DATA_DIR = os.path.join(LOOM_TESTDIR, "testdata")
sys.path.insert(0, LOOM_CLIENT)
......
Line 1
Line 2
Line 3
Line 4
Line 5
Line 6
Line 7
Line 8
Line 9
Line 10
Line 11
Line 12
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment