query/source/core/net/reactor.cpp

273 lines
7.1 KiB
C++

#include "matador/net/reactor.hpp"
#include "matador/net/handler.hpp"
#include "matador/logger/log_manager.hpp"
#include <system_error>
namespace matador::net {
// Sets up the reactor's logger (named "Reactor") and constructs the thread
// pool with the arguments 4 and a callable that runs handle_events() on this
// reactor (presumably the worker count and the per-worker task -- confirm
// against the thread pool's constructor).
reactor::reactor()
  : log_(logger::create_logger("Reactor"))
  , thread_pool_(4, [this] { handle_events(); })
{
}
// Requests a shutdown if the reactor still reports itself as running.
// A std::exception thrown by shutdown() is logged and swallowed here so that
// it does not propagate out of the destructor.
reactor::~reactor() {
  if (!is_running()) {
    return;
  }

  try {
    shutdown();
  } catch (const std::exception& ex) {
    log_.error("Error during shutdown: %s", ex.what());
  }
}
// Adds a handler to the reactor for the given event type.
//
// Throws std::invalid_argument for a null handler and std::runtime_error if
// the handler is already present in handlers_. Otherwise the handler is told
// about this reactor, opened, and stored; any std::exception raised while
// doing so is logged by safe_handler_operation() and rethrown to the caller.
// handlers_mutex_ is held for the whole call, including the handler's open().
void reactor::register_handler(const handler_ptr& h, event_type type) {
  if (!h) {
    throw std::invalid_argument("Null handler");
  }

  std::unique_lock guard(handlers_mutex_);

  auto* const key = h.get();
  const bool already_known = handlers_.find(key) != handlers_.end();
  if (already_known) {
    throw std::runtime_error("Handler already registered");
  }

  const auto attach = [&] {
    h->register_reactor(this);
    h->open();
    handlers_.emplace(key, std::make_unique<HandlerEntry>(h, type));
  };
  safe_handler_operation(h, "register", attach);

  // Signal the interrupter after the handler set changed.
  interrupter_.interrupt();
}
// Closes a handler and drops its entry from handlers_.
//
// A null handler is ignored. The `type` argument is not consulted: the whole
// entry is removed regardless of which events it was registered for. The
// interrupter is signalled even when the handler was not registered.
// A std::exception thrown by the handler's close() is logged and rethrown by
// safe_handler_operation(); in that case the interrupter is not signalled.
void reactor::unregister_handler(const handler_ptr& h, event_type type) {
  if (!h) {
    return;
  }

  std::unique_lock guard(handlers_mutex_);

  const bool registered = find_handler_entry(h) != nullptr;
  if (registered) {
    const auto detach = [&] {
      h->close();
      remove_handler_entry(h);
    };
    safe_handler_operation(h, "unregister", detach);
  }

  interrupter_.interrupt();
}
// Schedules (or reschedules) a timeout for the given handler.
//
// @param h        handler to time; a null handler is ignored
// @param offset   seconds from now until the first timeout
// @param interval value stored in the entry's `interval` field
//                 (presumably the repeat period in seconds -- confirm against
//                 the timer processing code)
//
// If the handler has no entry yet, one is created with TIMEOUT_MASK only; note
// that this path neither opens the handler nor hands it a reactor pointer,
// unlike register_handler(). If the handler is already registered, TIMEOUT_MASK
// is added to its active events: prepare_select_bits() only takes a handler's
// timeout into account when that bit is set, so without it a timer scheduled
// on e.g. a read-only handler would be ignored.
//
// Lock order: handlers_mutex_ first, then timers_mutex_.
void reactor::schedule_timer(const handler_ptr& h, time_t offset, time_t interval) {
  if (!h) return;

  std::unique_lock lock(handlers_mutex_);
  auto* entry = find_handler_entry(h);
  if (!entry) {
    entry = handlers_.emplace(h.get(),
      std::make_unique<HandlerEntry>(h, event_type::TIMEOUT_MASK)).first->second.get();
  } else {
    // Make sure timeouts are enabled for handlers registered for other events.
    entry->events |= event_type::TIMEOUT_MASK;
  }

  std::lock_guard<std::mutex> timer_lock(timers_mutex_);
  time_t now = std::time(nullptr);
  entry->next_timeout = now + offset;
  entry->interval = interval;
  timers_.push(TimerEvent{entry->next_timeout, h});

  // Signal the interrupter now that a new deadline exists.
  interrupter_.interrupt();
}
// Clears the timeout bookkeeping of a handler's entry by zeroing both
// next_timeout and interval. Null or unregistered handlers are ignored.
// Nothing is removed from timers_ here.
void reactor::cancel_timer(const handler_ptr& h) {
  if (!h) {
    return;
  }

  std::unique_lock guard(handlers_mutex_);
  auto* const entry = find_handler_entry(h);
  if (entry == nullptr) {
    return;
  }

  entry->interval = 0;
  entry->next_timeout = 0;
}
// Runs the reactor until a shutdown is requested.
//
// Marks the reactor as running, starts the thread pool, then blocks the
// calling thread on shutdown_cv_ until shutdown_requested_ becomes true.
// Afterwards it calls cleanup_deleted_handlers() (still holding
// handlers_mutex_), stops the thread pool and clears the running flag.
void reactor::run() {
  running_ = true;
  thread_pool_.start();

  {
    std::unique_lock guard(handlers_mutex_);
    // Re-check the flag after every wake-up to tolerate spurious wake-ups.
    while (!shutdown_requested_.load()) {
      shutdown_cv_.wait(guard);
    }
    cleanup_deleted_handlers();
  }

  thread_pool_.stop();
  running_ = false;
}
void reactor::handle_events() {
time_t timeout;
select_fdsets fd_sets;
prepare_select_bits(timeout, fd_sets);
if (!fd_sets.maxp1() && timeout == (std::numeric_limits<time_t>::max)()) {
return;
}
struct timeval select_timeout{};
struct timeval* timeout_ptr = nullptr;
if (timeout < (std::numeric_limits<time_t>::max)()) {
select_timeout.tv_sec = timeout;
select_timeout.tv_usec = 0;
timeout_ptr = &select_timeout;
}
try {
if (int result = perform_select(timeout_ptr, fd_sets); result < 0) {
if (errno != EINTR) {
throw std::system_error(errno, std::system_category(), "select failed");
}
return;
}
time_t now = std::time(nullptr);
if (check_interruption(fd_sets)) {
return;
}
if (process_events(fd_sets, now)) {
thread_pool_.promote_new_leader();
}
process_timers(now);
if (now - last_cleanup_ >= CLEANUP_INTERVAL.count()) {
cleanup_deleted_handlers();
last_cleanup_ = now;
}
} catch (const std::exception& e) {
log_.error("Error in handle_events: %s", e.what());
stats_.errors_++;
}
}
void reactor::shutdown() {
if (!running_) return;
log_.info("Initiating reactor shutdown");
shutdown_requested_ = true;
interrupter_.interrupt();
shutdown_cv_.notify_all();
}
// Sets the marked_for_deletion flag on the handler's entry.
// Null or unregistered handlers are ignored; nothing is removed here.
void reactor::mark_handler_for_delete(const handler_ptr& h) {
  if (!h) {
    return;
  }

  std::unique_lock guard(handlers_mutex_);
  auto* const entry = find_handler_entry(h);
  if (entry != nullptr) {
    entry->marked_for_deletion = true;
  }
}
void reactor::safe_handler_operation(const handler_ptr& h, const std::string& op_name, const std::function<void()>& operation) {
try {
operation();
} catch (const std::exception& e) {
log_.error("Error during %s operation: %s", op_name.c_str(), e.what());
++stats_.errors_;
throw;
}
}
// Looks up the entry stored for the handler's raw pointer.
// Returns a non-owning pointer to the entry, or nullptr when there is none.
// Takes no lock itself.
reactor::HandlerEntry* reactor::find_handler_entry(const handler_ptr& h) {
  const auto pos = handlers_.find(h.get());
  if (pos == handlers_.end()) {
    return nullptr;
  }
  return pos->second.get();
}
// Erases the entry keyed by the handler's raw pointer, if any.
// Takes no lock itself.
void reactor::remove_handler_entry(const handler_ptr& h) {
  auto* const key = h.get();
  handlers_.erase(key);
}
// Switches the event bits `ev` on (activate == true) or off (activate ==
// false) in the entry's event mask. A null entry is ignored.
void reactor::update_handler_events(HandlerEntry* entry, const event_type ev, const bool activate) {
  if (entry == nullptr) {
    return;
  }

  if (!activate) {
    entry->events &= ~ev;
    return;
  }

  entry->events |= ev;
}
// Fills `fd_sets` and `timeout` for the next select call.
//
// `fd_sets` is reset, the interrupter's socket is added to the read set, and
// each handler's descriptor is added to the read/write set when the handler
// reports itself ready and the matching event bit is active in its entry.
// `timeout` receives the smallest number of seconds until any handler's
// next_timeout() (0 if already due), or numeric_limits<time_t>::max() when no
// handler with an active TIMEOUT_MASK has a positive next_timeout().
// Runs under handlers_mutex_.
void reactor::prepare_select_bits(time_t& timeout, select_fdsets& fd_sets) const
{
  std::lock_guard guard(handlers_mutex_);

  fd_sets.reset();
  timeout = (std::numeric_limits<time_t>::max)();
  const time_t now = ::time(nullptr);

  // set interrupter fd
  fd_sets.read_set().set(interrupter_.socket_id());

  for (const auto& [key, entry] : handlers_) {
    if (key == nullptr) {
      continue;
    }

    const bool wants_read =
      key->is_ready_read() && is_event_type_set(entry->events, event_type::READ_MASK);
    if (wants_read) {
      fd_sets.read_set().set(key->handle());
    }

    const bool wants_write =
      key->is_ready_write() && is_event_type_set(entry->events, event_type::WRITE_MASK);
    if (wants_write) {
      fd_sets.write_set().set(key->handle());
    }

    if (key->next_timeout() > 0 && is_event_type_set(entry->events, event_type::TIMEOUT_MASK)) {
      const time_t remaining = key->next_timeout() <= now ? 0 : (key->next_timeout() - now);
      timeout = (std::min)(timeout, remaining);
    }
  }
}
// Picks the first handler that has something to do and the event to dispatch.
//
// Handlers are visited in the iteration order of handlers_. For each one the
// checks are, in order: descriptor set in the write set, descriptor set in the
// read set, timeout due (next_timeout() positive and not after `now`). The
// descriptor checks only apply when handle() is greater than zero.
//
// @param now      current time used for the timeout check
// @param fd_sets  descriptor sets to test the handlers against
// @return the handler together with WRITE_MASK, READ_MASK or TIMEOUT_MASK, or
//         a null handler with NONE_MASK when nothing is pending
//
// Null keys are skipped, matching the guard prepare_select_bits() applies to
// the same map, so a null key can never be dereferenced here.
// Runs under handlers_mutex_.
reactor::t_handler_type reactor::resolve_next_handler(time_t now, select_fdsets& fd_sets)
{
  std::lock_guard l(handlers_mutex_);
  for (auto & [hndlr, entry] : handlers_) {
    if (hndlr == nullptr) {
      continue;
    }
    if (hndlr->handle() > 0 && fd_sets.write_set().is_set(hndlr->handle())) {
      return std::make_pair(hndlr, event_type::WRITE_MASK);
    }
    if (hndlr->handle() > 0 && fd_sets.read_set().is_set(hndlr->handle())) {
      return std::make_pair(hndlr, event_type::READ_MASK);
    }
    if (hndlr->next_timeout() > 0 && hndlr->next_timeout() <= now) {
      return std::make_pair(hndlr, event_type::TIMEOUT_MASK);
    }
  }
  return std::make_pair(nullptr, event_type::NONE_MASK);
}
// Returns the descriptor sets as prepare_select_bits() would currently build
// them; the timeout computed alongside is discarded.
select_fdsets reactor::fd_sets() const
{
  select_fdsets snapshot;
  time_t unused_timeout;
  prepare_select_bits(unused_timeout, snapshot);
  return snapshot;
}
// Turns the event bits `ev` on for a registered handler, under
// handlers_mutex_. Handlers without an entry are ignored.
void reactor::activate_handler(const reactor::handler_ptr &h, event_type ev)
{
  std::lock_guard guard(handlers_mutex_);
  // update_handler_events() ignores a null entry, which covers the
  // "handler not registered" case.
  update_handler_events(find_handler_entry(h), ev, true);
}
// Turns the event bits `ev` off for a registered handler, under
// handlers_mutex_. Handlers without an entry are ignored.
void reactor::deactivate_handler(const reactor::handler_ptr &h, event_type ev)
{
  std::lock_guard guard(handlers_mutex_);
  // update_handler_events() ignores a null entry, which covers the
  // "handler not registered" case.
  update_handler_events(find_handler_entry(h), ev, false);
}
} // namespace matador::net