query/source/core/net/stream_handler.cpp

154 lines
4.0 KiB
C++

#include "matador/net/stream_handler.hpp"
#include "matador/net/handler_creator.hpp"
#include "matador/net/reactor.hpp"
#include "matador/utils/buffer_view.hpp"
#include "matador/logger/log_manager.hpp"
#include <cerrno>
#include <chrono>
#include <iostream>
namespace matador {
stream_handler::stream_handler(tcp::socket sock, tcp::peer endpoint, handler_creator *creator, t_init_handler init_handler)
: log_(create_logger("StreamHandler"))
, stream_(std::move(sock))
, endpoint_(std::move(endpoint))
, name_(endpoint.to_string() + " (fd: " + std::to_string(stream_.id()) + ")")
, creator_(creator)
, init_handler_(std::move(init_handler))
{
log_.debug("%s: created stream handler", name_.c_str());
}
void stream_handler::open()
{
init_handler_(endpoint_, *this);
}
socket_type stream_handler::handle() const
{
return stream_.id();
}
void stream_handler::on_input()
{
auto len = stream_.receive(read_buffer_);
log_.trace("%s: read %d bytes", name().c_str(), len);
if (len == 0) {
on_close();
} else if (len < 0 && errno != EWOULDBLOCK) {
char error_buffer[1024];
log_.error("%s: error on read: %s", name().c_str(), os::strerror(errno, error_buffer, 1024));
is_ready_to_read_ = false;
on_read_(static_cast<long>(len), static_cast<long>(len));
on_close();
} else {
log_.debug("%s: received %d bytes (data: %s)", name().c_str(), len, read_buffer_.data());
read_buffer_.bump(len);
is_ready_to_read_ = false;
on_read_(0, static_cast<long>(len));
}
}
void stream_handler::on_output()
{
ssize_t bytes_total = 0;
auto start = std::chrono::high_resolution_clock::now();
while (!write_buffers_.empty()) {
buffer_view &bv = write_buffers_.front();
auto len = stream_.send(bv);
log_.trace("%s: sent %d bytes", name().c_str(), len);
if (len == 0) {
on_close();
} else if (len < 0 && errno != EWOULDBLOCK) {
char error_buffer[1024];
log_.error("%s: error on write: %s", name().c_str(), os::strerror(errno, error_buffer, 1024));
on_close();
is_ready_to_write_ = false;
on_write_(static_cast<long>(len), static_cast<long>(len));
} else if (len < 0 && errno == EWOULDBLOCK) {
log_.debug("%s: sent %d bytes (blocked)", name().c_str(), bytes_total);
} else {
bytes_total += len;
bv.bump(len);
if (bv.full()) {
write_buffers_.pop_front();
}
}
}
auto end = std::chrono::high_resolution_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
log_.debug("%s: sent %d bytes (%dms)", name().c_str(), bytes_total, elapsed);
is_ready_to_write_ = false;
on_write_(0, static_cast<long>(bytes_total));
}
void stream_handler::on_close()
{
log_.debug("%s: closing connection", name().c_str(), handle());
stream_.close();
creator_->notify_close( this );
auto self = shared_from_this();
get_reactor()->mark_handler_for_delete(self);
get_reactor()->unregister_handler(self, event_type::READ_WRITE_MASK);
}
void stream_handler::close()
{
if (!stream_.is_open()) {
return;
}
log_.debug("%s: closing connection", name().c_str(), handle());
stream_.close();
creator_->notify_close( this );
}
bool stream_handler::is_ready_write() const
{
return is_ready_to_write_ && !write_buffers_.empty();
}
bool stream_handler::is_ready_read() const
{
return is_ready_to_read_ && !read_buffer_.full();
}
void stream_handler::read(buffer_view buf, t_read_handler read_handler)
{
on_read_ = std::move(read_handler);
read_buffer_ = std::move(buf);
is_ready_to_read_ = true;
get_reactor()->interrupt();
}
void stream_handler::write(std::list<buffer_view> buffers, io_stream::t_write_handler write_handler)
{
on_write_ = std::move(write_handler);
write_buffers_ = std::move(buffers);
is_ready_to_write_ = true;
get_reactor()->interrupt();
}
void stream_handler::close_stream()
{
on_close();
}
tcp::socket &stream_handler::stream()
{
return stream_;
}
std::string stream_handler::name() const
{
return name_;
}
}