Commit 7e2afba4 authored by Stanislav Bohm's avatar Stanislav Bohm

ENH: Introduced SendBuffer class

parent a3b5327b
......@@ -15,8 +15,8 @@ add_library(libloom
interconnect.cpp
task.cpp
task.h
fileservice.cpp
fileservice.h
sendbuffer.h
sendbuffer.cpp
loomcomm.pb.h
loomcomm.pb.cc
loomplan.pb.h
......
#include "connection.h"
#include "utils.h"
#include "log.h" // DEBUG
#include <string.h>
using namespace loom;
struct SendBuffer {
uv_write_t write_req;
std::unique_ptr<char[]> data;
};
Connection::Connection(ConnectionCallback *callback, uv_loop_t *loop)
: callback(callback),
state(ConnectionNew),
......@@ -121,28 +116,25 @@ void Connection::connect(std::string host, int port)
UV_CHECK(uv_tcp_connect(connect, &socket, (const struct sockaddr *)&dest, _on_connection));
}
void Connection::_on_fbb_write(uv_write_t *write_req, int status)
void Connection::_on_write(uv_write_t *write_req, int status)
{
UV_CHECK(status);
SendBuffer *buffer = static_cast<SendBuffer *>(write_req->data);
delete buffer;
buffer->on_finish(status);
}
void Connection::send_message(google::protobuf::MessageLite &message)
{
SendBuffer *buffer = new SendBuffer;
buffer->write_req.data = buffer;
uint32_t size = message.ByteSize();
buffer->data = std::make_unique<char[]>(size + sizeof(size));
uv_buf_t buf;
buf.base = buffer->data.get();
buf.len = size + sizeof(size);
uint32_t *size_ptr = reinterpret_cast<uint32_t *>(buf.base);
*size_ptr = size;
message.SerializeToArray(buf.base + sizeof(size), size);
UV_CHECK(uv_write(&buffer->write_req, (uv_stream_t *) &socket, &buf, 1, _on_fbb_write));
SendBuffer *buffer = new SendBuffer();
buffer->add(message);
send_buffer(buffer);
}
void Connection::send_buffer(SendBuffer *buffer)
{
uv_buf_t *bufs = buffer->get_uv_bufs();
size_t count = buffer->get_uv_bufs_count();
UV_CHECK(uv_write(&buffer->request, (uv_stream_t *) &socket, bufs, count, _on_write));
}
void Connection::_on_read(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
......
#ifndef LOOM_CONNECTION_H
#define LOOM_CONNECTION_H
#include "loomcomm.pb.h"
#include "utils.h"
#include "sendbuffer.h"
#include <uv.h>
#include <string>
#include <functional>
#include <memory>
#include <assert.h>
namespace loom {
......@@ -63,6 +64,7 @@ public:
void connect(std::string host, int port);
void send_message(::google::protobuf::MessageLite &message);
void send_buffer(SendBuffer *buffer);
void close();
void close_and_discard_remaining_data();
......@@ -83,7 +85,7 @@ protected:
private:
static void _on_connection(uv_connect_t *connect, int status);
static void _on_fbb_write(uv_write_t *write_req, int status);
static void _on_write(uv_write_t *write_req, int status);
static void _on_read(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf);
static void _on_close(uv_handle_t *handle);
static void _buf_alloc(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf);
......
//#include "fileservice.h"
//#include "worker.h"
//#include "utils.h"
//#include "log.h"
//#include <sys/stat.h>
//using namespace loom;
//FileService::FileService(FileServiceCallback &callback,
// const std::string &filename)
// : callback(callback), filename(filename)
//{
// request1.data = this;
// request2.data = this;
//}
//void FileService::start()
//{
// llog->debug("Opening file service: {}", filename);
// uv_loop_t *loop = callback.get_worker().get_loop();
// UV_CHECK(uv_fs_open(
// loop, &request1, filename.c_str(), O_WRONLY, 0, _on_open));
//}
//void FileService::_on_open(uv_fs_t *request)
//{
// FileService *service = static_cast<FileService*>(request->data);
// uv_buf_t buf = service->input_data->get_uv_buf(worker);
// uv_loop_t *loop = service->callback.get_worker().get_loop();
// UV_CHECK(uv_fs_write(loop,
// &service->request1,
// request->file,
// &buf, 1, 0, _on_write));
//}
//void FileService::_on_write(uv_fs_t *request)
//{
// FileService *service = static_cast<FileService*>(request->data);
// service->input_data.reset();
// service->callback.on_finish();
//}
//#ifndef LIBLOOM_FILESERVICE_H
//#define LIBLOOM_FILESERVICE_H
//#include <uv.h>
//#include <string>
//#include <memory>
//namespace loom {
//class Worker;
//class Data;
//class FileServiceCallback
//{
//public:
// virtual ~FileServiceCallback() {}
// virtual void on_finish() = 0;
// virtual Worker& get_worker() = 0;
//};
//class FileService
//{
//public:
// FileService(FileServiceCallback &callback, const std::string &filename);
// void set_input_data(std::shared_ptr<Data> &data) {
// input_data = data;
// }
// void start();
//protected:
// FileServiceCallback &callback;
// std::string filename;
// std::shared_ptr<Data> input_data;
// uv_fs_t request1;
// uv_fs_t request2;
// static void _on_open(uv_fs_t *request);
// static void _on_write(uv_fs_t *request);
//};
//}
//#endif
......@@ -17,7 +17,7 @@ InterConnection::InterConnection(Worker &worker)
InterConnection::~InterConnection()
{
assert(send_records.size() == 0);
}
void InterConnection::on_connection()
......@@ -28,9 +28,10 @@ void InterConnection::on_connection()
msg.set_port(worker.get_listen_port());
send_message(msg);
for (auto& record : send_records) {
_send(*record);
for (auto& buffer : early_sends) {
connection.send_buffer(buffer.release());
}
early_sends.clear();
}
void InterConnection::on_close()
......@@ -78,52 +79,29 @@ void InterConnection::on_data_finish()
void InterConnection::send(std::shared_ptr<Data> &data)
{
loomcomm::Data msg;
msg.set_id(data->get_id());
msg.set_size(data->get_size());
size_t msg_size = msg.ByteSize();
//std::cout << "MSG =" << msg_size << std::endl;
auto msg_data = std::make_unique<char[]>(msg_size + sizeof(uint32_t));
uint32_t *size_ptr = reinterpret_cast<uint32_t *>(msg_data.get());
*size_ptr = msg_size;
msg.SerializeToArray(msg_data.get() + sizeof(uint32_t), msg_size);
auto record = std::make_unique<SendRecord>();
record->request.data = this;
record->raw_message = std::move(msg_data);
record->raw_message_size = msg_size + sizeof(uint32_t);
record->data = data;
send_records.push_back(std::move(record));
SendBuffer *buffer = new SendBuffer();
buffer->add(msg);
buffer->add(data, data->get_data(worker), data->get_size());
Connection::State state = connection.get_state();
assert(state == Connection::ConnectionOpen ||
state == Connection::ConnectionConnecting);
if (state == Connection::ConnectionOpen) {
_send(*send_records[send_records.size() - 1].get());
connection.send_buffer(buffer);
} else {
early_sends.push_back(std::unique_ptr<SendBuffer>(buffer));
}
}
void InterConnection::_send(InterConnection::SendRecord &record)
void InterConnection::send(std::unique_ptr<SendBuffer> buffer)
{
llog->debug("Sending data id={} size={}",
record.data->get_id(), record.data->get_size());
uv_buf_t bufs[2];
bufs[0].base = record.raw_message.get();
bufs[0].len = record.raw_message_size;
bufs[1] = record.data->get_uv_buf(worker);
connection.send(&record.request, bufs, 2, _on_write);
}
void InterConnection::_on_write(uv_write_t *write_req, int status)
{
UV_CHECK(status);
InterConnection *connection = static_cast<InterConnection*>(write_req->data);
assert(connection->send_records.size());
connection->send_records.erase(connection->send_records.begin());
}
std::string InterConnection::make_address(const std::string &host, int port)
......
......@@ -11,6 +11,7 @@ namespace loom {
class Worker;
class DataBuilder;
class SendBuffer;
class InterConnection : public SimpleConnectionCallback
{
......@@ -19,6 +20,7 @@ public:
~InterConnection();
void send(std::shared_ptr<Data> &data);
void send(std::unique_ptr<SendBuffer> buffer);
void accept(uv_tcp_t *listen_socket) {
connection.accept(listen_socket);
}
......@@ -36,14 +38,7 @@ public:
protected:
struct SendRecord {
uv_write_t request;
std::unique_ptr<char[]> raw_message;
size_t raw_message_size;
std::shared_ptr<Data> data;
};
void _send(SendRecord &record);
void _send(SendBuffer &buffer);
void on_message(const char *buffer, size_t size);
void on_data_chunk(const char *buffer, size_t size);
......@@ -54,12 +49,11 @@ protected:
Worker &worker;
std::string address;
std::vector<std::unique_ptr<SendRecord>> send_records;
std::unique_ptr<DataBuilder> data_builder;
static void _on_write(uv_write_t *write_req, int status);
static std::string make_address(const std::string &host, int port);
std::vector<std::unique_ptr<SendBuffer>> early_sends;
};
}
......
#include "sendbuffer.h"
#include "utils.h"
#include "loomcomm.pb.h"
using namespace loom;
SendBuffer::~SendBuffer()
{
}
void SendBuffer::on_finish(int status)
{
UV_CHECK(status);
delete this;
}
void SendBuffer::add(google::protobuf::MessageLite &message)
{
uint32_t size = message.ByteSize();
auto data = std::make_unique<char[]>(size + sizeof(size));
uint32_t *size_ptr = reinterpret_cast<uint32_t *>(data.get());
*size_ptr = size;
message.SerializeToArray(data.get() + sizeof(size), size);
add(std::move(data), size + sizeof(size));
}
#ifndef LIBLOOM_SENDBUFFER_H
#define LIBLOOM_SENDBUFFER_H
#include <uv.h>
#include <memory>
#include <vector>
namespace google {
namespace protobuf {
class MessageLite;
}
}
namespace loom {
class Data;
class SendBuffer {
public:
SendBuffer() {
request.data = this;
}
virtual ~SendBuffer();
virtual void on_finish(int status);
uv_buf_t *get_uv_bufs() {
return &bufs[0];
}
size_t get_uv_bufs_count() {
return bufs.size();
}
void add(std::unique_ptr<char[]> data, size_t size) {
bufs.emplace_back(uv_buf_t {data.get(), size});
raw_memory.push_back(std::move(data));
}
void add(std::shared_ptr<Data> &data, char *data_ptr, size_t size) {
bufs.emplace_back(uv_buf_t {data_ptr, size});
data_vector.push_back(data);
}
void add(::google::protobuf::MessageLite &message);
uv_write_t request;
protected:
std::vector<uv_buf_t> bufs;
std::vector<std::unique_ptr<char[]>> raw_memory;
std::vector<std::shared_ptr<Data>> data_vector;
};
}
#endif // LIBLOOM_SENDBUFFER_H
......@@ -3,6 +3,10 @@
#include "libloom/connection.h"
namespace loom {
class SendBuffer;
}
class Server;
class ClientConnection : public loom::ConnectionCallback {
......@@ -13,8 +17,12 @@ public:
void on_message(const char *buffer, size_t size);
void on_close();
void send(uv_write_t *request, uv_buf_t bufs[], unsigned int nbufs, uv_write_cb cb) {
/*void send(uv_write_t *request, uv_buf_t bufs[], unsigned int nbufs, uv_write_cb cb) {
connection->send(request, bufs, nbufs, cb);
}*/
void send_buffer(loom::SendBuffer *buffer) {
connection->send_buffer(buffer);
}
protected:
......
#include "resendtask.h"
#include "server.h"
#include "libloom/loomcomm.pb.h"
#include "libloom/log.h"
using namespace loom;
ResendTask::ResendTask(Server &server, std::unique_ptr<Task> task)
: TaskInstance(server.get_dummy_worker(), std::move(task)), server(server)
: TaskInstance(server.get_dummy_worker(),std::move(task)),
server(server), buffer(*this)
{
request.data = this;
}
void ResendTask::start(DataVector &input_data)
{
assert(input_data.size() == 1);
data = *input_data[0];
auto data = *input_data[0];
llog->debug("Resending data id={} to client", data->get_id());
loomcomm::Data msg;
msg.set_id(data->get_id());
msg.set_size(data->get_size());
llog->debug("Resending data id={} to client", data->get_id());
buffer.add(msg);
buffer.add(data, data->get_data(worker), data->get_size());
auto size = msg.ByteSize();
message_data = std::make_unique<char[]>(size + sizeof(uint32_t));
uint32_t *size_ptr = (uint32_t*) message_data.get();
*size_ptr = size;
msg.SerializePartialToArray(message_data.get() + sizeof(uint32_t), size);
message_size = size;
uv_buf_t bufs[2];
bufs[0].base = message_data.get();
bufs[0].len = message_size + sizeof(uint32_t);
bufs[1] = data->get_uv_buf(worker);
auto &connection = server.get_client_connection();
connection.send(&request, bufs, 2, _on_write);
}
void ResendTask::_on_write(uv_write_t *write_req, int status)
{
UV_CHECK(status);
ResendTask *task = static_cast<ResendTask*>(write_req->data);
llog->debug("Resend task id={} finished", task->get_id());
task->finish_without_data();
connection.send_buffer(&buffer);
}
ResendTaskFactory::ResendTaskFactory(Server &server)
......@@ -58,3 +40,15 @@ std::unique_ptr<TaskInstance> ResendTaskFactory::make_instance(Worker &worker, s
{
return std::make_unique<ResendTask>(server, std::move(task));
}
ResendTask::_SendBuffer::_SendBuffer(ResendTask &task) : task(task)
{
}
void ResendTask::_SendBuffer::on_finish(int status)
{
UV_CHECK(status);
llog->debug("Resend task id={} finished", task.get_id());
task.finish_without_data();
}
......@@ -3,6 +3,7 @@
#include "libloom/taskinstance.h"
#include "libloom/taskfactory.h"
#include "libloom/sendbuffer.h"
namespace loom {
class Data;
......@@ -13,16 +14,21 @@ class Server;
class ResendTask : public loom::TaskInstance
{
public:
class _SendBuffer : public loom::SendBuffer {
public:
_SendBuffer(ResendTask &task);
~_SendBuffer() {}
void on_finish(int status);
protected:
ResendTask &task;
};
ResendTask(Server &server, std::unique_ptr<loom::Task> task);
void start(loom::DataVector &input_data);
protected:
Server &server;
std::shared_ptr<loom::Data> data;
uv_write_t request;
std::unique_ptr<char[]> message_data;
size_t message_size;
Server &server;
_SendBuffer buffer;
static void _on_write(uv_write_t *write_req, int status);
};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment