#include "matador/net/reactor.hpp" #include "matador/net/handler.hpp" #include "matador/logger/log_manager.hpp" #include namespace matador::net { reactor::reactor() : log_(logger::create_logger("Reactor")) , thread_pool_(4, [this]() { handle_events(); }) { } reactor::~reactor() { if (is_running()) { try { shutdown(); } catch (const std::exception& e) { log_.error("Error during shutdown: %s", e.what()); } } } void reactor::register_handler(const handler_ptr& h, event_type type) { if (!h) { throw std::invalid_argument("Null handler"); } std::unique_lock lock(handlers_mutex_); auto* raw_ptr = h.get(); if (handlers_.find(raw_ptr) != handlers_.end()) { throw std::runtime_error("Handler already registered"); } safe_handler_operation(h, "register", [&] { h->register_reactor(this); h->open(); handlers_.emplace(raw_ptr, std::make_unique(h, type)); }); interrupter_.interrupt(); } void reactor::unregister_handler(const handler_ptr& h, event_type type) { if (!h) return; std::unique_lock lock(handlers_mutex_); if (find_handler_entry(h)) { safe_handler_operation(h, "unregister", [&] { h->close(); remove_handler_entry(h); }); } interrupter_.interrupt(); } void reactor::schedule_timer(const handler_ptr& h, time_t offset, time_t interval) { if (!h) return; std::unique_lock lock(handlers_mutex_); auto* entry = find_handler_entry(h); if (!entry) { entry = handlers_.emplace(h.get(), std::make_unique(h, event_type::TIMEOUT_MASK)).first->second.get(); } std::lock_guard timer_lock(timers_mutex_); time_t now = std::time(nullptr); entry->next_timeout = now + offset; entry->interval = interval; timers_.push(TimerEvent{entry->next_timeout, h}); interrupter_.interrupt(); } void reactor::cancel_timer(const handler_ptr& h) { if (!h) return; std::unique_lock lock(handlers_mutex_); if (auto* entry = find_handler_entry(h)) { entry->next_timeout = 0; entry->interval = 0; } } void reactor::run() { running_ = true; thread_pool_.start(); { std::unique_lock lock(handlers_mutex_); shutdown_cv_.wait(lock, [this] { return shutdown_requested_.load(); }); cleanup_deleted_handlers(); } thread_pool_.stop(); running_ = false; } void reactor::handle_events() { time_t timeout; select_fdsets fd_sets; prepare_select_bits(timeout, fd_sets); if (!fd_sets.maxp1() && timeout == (std::numeric_limits::max)()) { return; } struct timeval select_timeout{}; struct timeval* timeout_ptr = nullptr; if (timeout < (std::numeric_limits::max)()) { select_timeout.tv_sec = timeout; select_timeout.tv_usec = 0; timeout_ptr = &select_timeout; } try { if (int result = perform_select(timeout_ptr, fd_sets); result < 0) { if (errno != EINTR) { throw std::system_error(errno, std::system_category(), "select failed"); } return; } time_t now = std::time(nullptr); if (check_interruption(fd_sets)) { return; } if (process_events(fd_sets, now)) { thread_pool_.promote_new_leader(); } process_timers(now); if (now - last_cleanup_ >= CLEANUP_INTERVAL.count()) { cleanup_deleted_handlers(); last_cleanup_ = now; } } catch (const std::exception& e) { log_.error("Error in handle_events: %s", e.what()); stats_.errors_++; } } void reactor::shutdown() { if (!running_) return; log_.info("Initiating reactor shutdown"); shutdown_requested_ = true; interrupter_.interrupt(); shutdown_cv_.notify_all(); } void reactor::mark_handler_for_delete(const handler_ptr& h) { if (!h) return; std::unique_lock lock(handlers_mutex_); if (auto* entry = find_handler_entry(h)) { entry->marked_for_deletion = true; } } void reactor::safe_handler_operation(const handler_ptr& h, const std::string& op_name, const std::function& operation) { try { operation(); } catch (const std::exception& e) { log_.error("Error during %s operation: %s", op_name.c_str(), e.what()); ++stats_.errors_; throw; } } reactor::HandlerEntry* reactor::find_handler_entry(const handler_ptr& h) { const auto it = handlers_.find(h.get()); return it != handlers_.end() ? it->second.get() : nullptr; } void reactor::remove_handler_entry(const handler_ptr& h) { handlers_.erase(h.get()); } void reactor::update_handler_events(HandlerEntry* entry, const event_type ev, const bool activate) { if (!entry) return; if (activate) { entry->events |= ev; } else { entry->events &= ~ev; } } void reactor::prepare_select_bits(time_t& timeout, select_fdsets& fd_sets) const { std::lock_guard l(handlers_mutex_); fd_sets.reset(); const time_t now = ::time(nullptr); timeout = (std::numeric_limits::max)(); // set interrupter fd fd_sets.read_set().set(interrupter_.socket_id()); for (const auto & [hndlr, entry] : handlers_) { if (hndlr == nullptr) { continue; } if (hndlr->is_ready_read() && is_event_type_set(entry->events, event_type::READ_MASK)) { fd_sets.read_set().set(hndlr->handle()); } if (hndlr->is_ready_write() && is_event_type_set(entry->events, event_type::WRITE_MASK)) { fd_sets.write_set().set(hndlr->handle()); } if (hndlr->next_timeout() > 0 && is_event_type_set(entry->events, event_type::TIMEOUT_MASK)) { timeout = (std::min)(timeout, hndlr->next_timeout() <= now ? 0 : (hndlr->next_timeout() - now)); } } } reactor::t_handler_type reactor::resolve_next_handler(time_t now, select_fdsets& fd_sets) { std::lock_guard l(handlers_mutex_); for (auto & [hndlr, entry] : handlers_) { if (hndlr->handle() > 0 && fd_sets.write_set().is_set(hndlr->handle())) { return std::make_pair(hndlr, event_type::WRITE_MASK); } if (hndlr->handle() > 0 && fd_sets.read_set().is_set(hndlr->handle())) { return std::make_pair(hndlr, event_type::READ_MASK); } if (hndlr->next_timeout() > 0 && hndlr->next_timeout() <= now) { return std::make_pair(hndlr, event_type::TIMEOUT_MASK); } } return std::make_pair(nullptr, event_type::NONE_MASK); } select_fdsets reactor::fd_sets() const { time_t timeout; select_fdsets fd_sets; prepare_select_bits(timeout, fd_sets); return fd_sets; } void reactor::activate_handler(const reactor::handler_ptr &h, event_type ev) { std::lock_guard l(handlers_mutex_); const auto it = find_handler_entry(h); if (!it) { return; } it->events |= ev; } void reactor::deactivate_handler(const reactor::handler_ptr &h, event_type ev) { std::lock_guard l(handlers_mutex_); const auto it = find_handler_entry(h); if (!it) { return; } it->events &= ~ev; } } // namespace matador