412 lines
11 KiB
C++
412 lines
11 KiB
C++
#include "matador/net/reactor.hpp"
|
|
#include "matador/net/handler.hpp"
|
|
|
|
#include "matador/logger/log_manager.hpp"
|
|
|
|
#include <algorithm>
|
|
#include <limits>
|
|
#include <cerrno>
|
|
#include <ctime>
|
|
#include <iostream>
|
|
|
|
namespace matador {
|
|
|
|
// Builds a reactor with an empty sentinel handler, a logger created
// under the name "Reactor" and a thread pool that is constructed with
// the value 4 and a callable forwarding to handle_events().
reactor::reactor()
  : sentinel_(std::shared_ptr<handler>())
  , log_(create_logger("Reactor"))
  , thread_pool_(4, [this] { handle_events(); })
{}
|
|
// Logs the destruction at debug level and then asks the thread pool
// to shut down.
reactor::~reactor()
{
  log_.debug("destroying reactor");

  thread_pool_.shutdown();
}
|
|
|
|
void reactor::register_handler(const handler_ptr& h, event_type et)
|
|
{
|
|
h->register_reactor(this);
|
|
h->open();
|
|
|
|
std::lock_guard<std::mutex> l(mutex_);
|
|
auto it = find_handler_type(h);
|
|
|
|
if (it == handlers_.end()) {
|
|
handlers_.emplace_back(h, et);
|
|
} else if (it->first != h) {
|
|
throw std::logic_error("given handler isn't expected handler");
|
|
} else {
|
|
it->second = it->second | et;
|
|
}
|
|
interrupt_without_lock();
|
|
}
|
|
|
|
void reactor::unregister_handler(const handler_ptr& h, event_type)
|
|
{
|
|
std::lock_guard<std::mutex> l(mutex_);
|
|
auto it = find_handler_type(h);
|
|
|
|
if (it != handlers_.end()) {
|
|
(*it).first->close();
|
|
handlers_.erase(it);
|
|
}
|
|
interrupt_without_lock();
|
|
}
|
|
|
|
void reactor::schedule_timer(const std::shared_ptr<handler>& h, time_t offset, time_t interval)
|
|
{
|
|
h->register_reactor(this);
|
|
|
|
std::lock_guard<std::mutex> l(mutex_);
|
|
auto it = find_handler_type(h);
|
|
|
|
if (it == handlers_.end()) {
|
|
handlers_.emplace_back(h, event_type::TIMEOUT_MASK);
|
|
}
|
|
|
|
h->schedule(offset, interval);
|
|
}
|
|
|
|
void reactor::cancel_timer(const std::shared_ptr<handler>& h)
|
|
{
|
|
std::lock_guard<std::mutex> l(mutex_);
|
|
auto it = find_handler_type(h);
|
|
|
|
if (it != handlers_.end()) {
|
|
handlers_.erase(it);
|
|
}
|
|
|
|
h->cancel_timer();
|
|
}
|
|
|
|
void reactor::run()
|
|
{
|
|
// log_.info("start dispatching all clients");
|
|
thread_pool_.start();
|
|
|
|
{
|
|
// log_.info("waiting for reactor shutdown");
|
|
std::unique_lock<std::mutex> l(mutex_);
|
|
shutdown_.wait(l, [this]() { return shutdown_requested_.load(); });
|
|
cleanup();
|
|
}
|
|
// log_.info("all clients dispatched; shutting down");
|
|
thread_pool_.stop();
|
|
}
|
|
|
|
void reactor::handle_events()
|
|
{
|
|
// std::cout << this << " start handle events\n" << std::flush;
|
|
|
|
// log_.info("handle events");
|
|
|
|
running_ = true;
|
|
time_t timeout;
|
|
select_fdsets fd_sets;
|
|
prepare_select_bits(timeout, fd_sets);
|
|
|
|
// std::cout << this << " fd sets r: " << fd_sets.read_set().count() << ", w: " << fd_sets.write_set().count() << ", e: " << fd_sets.except_set().count() << ", max: " << fd_sets.maxp1() << "\n" << std::flush;
|
|
|
|
// log_.debug("fds [r: %d, w: %d, e: %d]",
|
|
// fdsets_.read_set().count(),
|
|
// fdsets_.write_set().count(),
|
|
// fdsets_.except_set().count());
|
|
|
|
// if (timeout != (std::numeric_limits<time_t>::max)()) {
|
|
// log_.debug("next timeout in %d sec", timeout);
|
|
// }
|
|
struct timeval tselect{};
|
|
struct timeval* p = nullptr;
|
|
if (timeout < (std::numeric_limits<time_t>::max)()) {
|
|
tselect.tv_sec = timeout;
|
|
tselect.tv_usec = 0;
|
|
p = &tselect;
|
|
// std::cout << this << " next timeout in " << p->tv_sec << " seconds\n" << std::flush;
|
|
}
|
|
|
|
if (!has_clients_to_handle(timeout, fd_sets)) {
|
|
// std::cout << this << " no clients to handle; returning\n" << std::flush;
|
|
// log_.info("no clients to handle, exiting");
|
|
return;
|
|
}
|
|
|
|
int ret;
|
|
while ((ret = select(p, fd_sets)) < 0) {
|
|
// std::cout << this << " select returned with error " << ret << "\n" << std::flush;
|
|
if(errno != EINTR) {
|
|
char error_buffer[1024];
|
|
log_.warn("select failed: %s", os::strerror(errno, error_buffer, 1024));
|
|
shutdown();
|
|
} else {
|
|
return;
|
|
}
|
|
}
|
|
|
|
// std::cout << this << " select returned with active requests " << ret << "\n" << std::flush;
|
|
|
|
bool interrupted = is_interrupted(fd_sets);
|
|
|
|
if (interrupted) {
|
|
if (!shutdown_requested_) {
|
|
// std::cout << this << " reactor was interrupted - promote new leader\n" << std::flush;
|
|
// log_.info("reactor was interrupted");
|
|
thread_pool_.promote_new_leader();
|
|
return;
|
|
} else {
|
|
// std::cout << this << " reactor was interrupted for shutdown\n" << std::flush;
|
|
// log_.info("shutting down");
|
|
// cleanup();
|
|
shutdown_.notify_one();
|
|
return;
|
|
}
|
|
}
|
|
|
|
time_t now = ::time(nullptr);
|
|
t_handler_type handler_type = resolve_next_handler(now, fd_sets);
|
|
|
|
if (handler_type.first) {
|
|
deactivate_handler(handler_type.first, handler_type.second);
|
|
// std::cout << this << " handling client " << handler_type.first->name() << " - promoting new leader\n" << std::flush;
|
|
|
|
thread_pool_.promote_new_leader();
|
|
// log_.info("start handling event");
|
|
// handle event
|
|
if (handler_type.second == event_type::WRITE_MASK) {
|
|
on_write_mask(handler_type.first);
|
|
} else if (handler_type.second == event_type::READ_MASK) {
|
|
on_read_mask(handler_type.first);
|
|
} else if (handler_type.second == event_type::TIMEOUT_MASK) {
|
|
on_timeout(handler_type.first, now);
|
|
} else {
|
|
// log_.info("unknown event type");
|
|
}
|
|
activate_handler(handler_type.first, handler_type.second);
|
|
remove_deleted();
|
|
} else {
|
|
// no handler found
|
|
// log_.info("no handler found");
|
|
thread_pool_.promote_new_leader();
|
|
}
|
|
}
|
|
|
|
void reactor::shutdown()
|
|
{
|
|
if (!is_running()) {
|
|
return;
|
|
}
|
|
// shutdown the reactor properly
|
|
log_.info("shutting down reactor");
|
|
shutdown_requested_ = true;
|
|
interrupt();
|
|
}
|
|
|
|
bool reactor::is_running() const
|
|
{
|
|
return running_;
|
|
}
|
|
|
|
void reactor::prepare_select_bits(time_t& timeout, select_fdsets& fd_sets) const
|
|
{
|
|
std::lock_guard<std::mutex> l(mutex_);
|
|
fd_sets.reset();
|
|
time_t now = ::time(nullptr);
|
|
timeout = (std::numeric_limits<time_t>::max)();
|
|
|
|
// set interrupter fd
|
|
fd_sets.read_set().set(interrupter_.socket_id());
|
|
|
|
for (const auto &h : handlers_) {
|
|
if (h.first == nullptr) {
|
|
continue;
|
|
}
|
|
if (h.first->is_ready_read() && is_event_type_set(h.second, event_type::READ_MASK)) {
|
|
fd_sets.read_set().set(h.first->handle());
|
|
}
|
|
if (h.first->is_ready_write() && is_event_type_set(h.second, event_type::WRITE_MASK)) {
|
|
fd_sets.write_set().set(h.first->handle());
|
|
}
|
|
if (h.first->next_timeout() > 0 && is_event_type_set(h.second, event_type::TIMEOUT_MASK)) {
|
|
timeout = (std::min)(timeout, h.first->next_timeout() <= now ? 0 : (h.first->next_timeout() - now));
|
|
}
|
|
}
|
|
}
|
|
|
|
void reactor::remove_deleted()
|
|
{
|
|
while (!handlers_to_delete_.empty()) {
|
|
auto h = handlers_to_delete_.front();
|
|
handlers_to_delete_.pop_front();
|
|
auto fi = std::find_if(handlers_.begin(), handlers_.end(), [&h](const t_handler_type &ht) {
|
|
return ht.first.get() == h.get();
|
|
});
|
|
|
|
if (fi != handlers_.end()) {
|
|
log_.debug("removing handler %d", fi->first->handle());
|
|
handlers_.erase(fi);
|
|
}
|
|
}
|
|
}
|
|
|
|
void reactor::cleanup()
|
|
{
|
|
while (!handlers_.empty()) {
|
|
auto hndlr = handlers_.front();
|
|
handlers_.pop_front();
|
|
hndlr.first->close();
|
|
}
|
|
}
|
|
|
|
int reactor::select(struct timeval *timeout, select_fdsets& fd_sets)
|
|
{
|
|
log_.debug("calling select; waiting for io events");
|
|
return ::select(
|
|
static_cast<int>(fd_sets.maxp1()) + 1,
|
|
fd_sets.read_set().get(),
|
|
fd_sets.write_set().get(),
|
|
fd_sets.except_set().get(),
|
|
timeout
|
|
);
|
|
}
|
|
|
|
|
|
//void reactor::process_handler(int /*ret*/)
|
|
//{
|
|
// handlers_.emplace_back(sentinel_, event_type::NONE_MASK);
|
|
// time_t now = ::time(nullptr);
|
|
// while (handlers_.front().first != nullptr) {
|
|
// auto h = handlers_.front();
|
|
// handlers_.pop_front();
|
|
// handlers_.push_back(h);
|
|
// // check for read/accept
|
|
// if (h.first->handle() > 0 && fdsets_.write_set().is_set(h.first->handle())) {
|
|
// on_write_mask(h.first);
|
|
// }
|
|
// if (h.first->handle() > 0 && fdsets_.read_set().is_set(h.first->handle())) {
|
|
// on_read_mask(h.first);
|
|
// }
|
|
// if (h.first->next_timeout() > 0 && h.first->next_timeout() <= now) {
|
|
// on_timeout(h.first, now);
|
|
// }
|
|
// }
|
|
// handlers_.pop_front();
|
|
//}
|
|
|
|
// Determines the next handler to dispatch together with the event that
// triggered it (under mutex_).
//
// handlers_ is scanned in order; for each handler the checks are made in
// the order write, read, timeout and the first hit is returned:
//  - write:   handle is > 0 and set in the write set
//  - read:    handle is > 0 and set in the read set
//  - timeout: next timeout is > 0 and not later than now
// If nothing matches a pair of a null handler and NONE_MASK is returned.
reactor::t_handler_type reactor::resolve_next_handler(time_t now, select_fdsets& fd_sets)
{
  std::lock_guard<std::mutex> guard(mutex_);

  for (auto &entry : handlers_) {
    const handler_ptr &candidate = entry.first;

    if (candidate->handle() > 0 && fd_sets.write_set().is_set(candidate->handle())) {
      return std::make_pair(candidate, event_type::WRITE_MASK);
    }

    if (candidate->handle() > 0 && fd_sets.read_set().is_set(candidate->handle())) {
      return std::make_pair(candidate, event_type::READ_MASK);
    }

    if (candidate->next_timeout() > 0 && candidate->next_timeout() <= now) {
      return std::make_pair(candidate, event_type::TIMEOUT_MASK);
    }
  }

  return std::make_pair(nullptr, event_type::NONE_MASK);
}
|
|
|
|
void reactor::on_read_mask(const handler_ptr& handler)
|
|
{
|
|
// log_.debug("read bit for handler %d is set; handle input", h->handle());
|
|
// std::cout << this << " (handler " << h.get() << "): handle read\n" << std::flush;
|
|
handler->on_input();
|
|
}
|
|
|
|
void reactor::on_write_mask(const handler_ptr& handler)
|
|
{
|
|
// log_.debug("write bit for handler %d is set; handle output", h->handle());
|
|
// std::cout << this << " (handler " << h.get() << "): handle write\n" << std::flush;
|
|
handler->on_output();
|
|
}
|
|
|
|
// Hook for exceptional conditions on a handler; intentionally a no-op,
// the handler argument is not used.
void reactor::on_except_mask(const handler_ptr& /*handler*/)
{
}
|
|
|
|
void reactor::on_timeout(const handler_ptr &h, time_t now)
|
|
{
|
|
// log_.debug("timeout expired for handler %d; handle timeout", h->handle());
|
|
// std::cout << this << " (handler " << h.get() << "): handle timeout\n" << std::flush;
|
|
h->calculate_next_timeout(now);
|
|
h->on_timeout();
|
|
}
|
|
|
|
// Returns freshly computed fd sets as prepare_select_bits() produces them
// for the current handlers. The timeout that is computed alongside is
// discarded.
select_fdsets reactor::fdsets() const
{
  select_fdsets snapshot;
  time_t ignored_timeout;

  prepare_select_bits(ignored_timeout, snapshot);

  return snapshot;
}
|
|
|
|
void reactor::mark_handler_for_delete(const handler_ptr& h)
|
|
{
|
|
std::lock_guard<std::mutex> l(mutex_);
|
|
handlers_to_delete_.push_back(h);
|
|
}
|
|
|
|
// Checks (under mutex_) whether the interrupter's socket is set in the
// read set of fd_sets.
//
// If it is not set, false is returned. Otherwise the event is logged,
// running_ is cleared when a shutdown has been requested and the result
// of interrupter_.reset() is returned.
bool reactor::is_interrupted(select_fdsets& fd_sets)
{
  std::lock_guard<std::mutex> guard(mutex_);

  if (!fd_sets.read_set().is_set(interrupter_.socket_id())) {
    return false;
  }

  log_.debug("interrupt byte received; resetting interrupter");

  if (shutdown_requested_) {
    running_ = false;
  }

  return interrupter_.reset();
}
|
|
|
|
bool reactor::has_clients_to_handle(time_t timeout, select_fdsets& fd_sets) const {
|
|
std::lock_guard<std::mutex> lock(mutex_);
|
|
return fd_sets.maxp1() > 0 || timeout != (std::numeric_limits<time_t>::max)();
|
|
}
|
|
|
|
// Looks up the entry of handlers_ whose handler is the same object as h
// (raw pointers are compared). Returns an iterator to the first such
// entry or handlers_.end() if there is none. The function does not lock;
// the callers in this file hold mutex_ when calling it.
std::list<reactor::t_handler_type>::iterator reactor::find_handler_type(const reactor::handler_ptr &h)
{
  auto pos = handlers_.begin();

  while (pos != handlers_.end() && pos->first.get() != h.get()) {
    ++pos;
  }

  return pos;
}
|
|
|
|
// Enables the event bits ev in the mask stored for handler h (under
// mutex_). Nothing happens if the handler is not part of handlers_.
void reactor::activate_handler(const reactor::handler_ptr &h, event_type ev)
{
  std::lock_guard<std::mutex> guard(mutex_);

  const auto pos = find_handler_type(h);
  if (pos != handlers_.end()) {
    pos->second |= ev;
  }
}
|
|
|
|
// Clears the event bits ev in the mask stored for handler h (under
// mutex_). Nothing happens if the handler is not part of handlers_.
void reactor::deactivate_handler(const reactor::handler_ptr &h, event_type ev)
{
  std::lock_guard<std::mutex> guard(mutex_);

  const auto pos = find_handler_type(h);
  if (pos != handlers_.end()) {
    pos->second &= ~ev;
  }
}
|
|
|
|
void reactor::interrupt()
|
|
{
|
|
log_.trace("interrupting reactor");
|
|
std::lock_guard<std::mutex> l(mutex_);
|
|
interrupter_.interrupt();
|
|
}
|
|
|
|
void reactor::interrupt_without_lock()
|
|
{
|
|
log_.trace("interrupting reactor");
|
|
interrupter_.interrupt();
|
|
}
|
|
|
|
}
|