273 lines
7.1 KiB
C++
273 lines
7.1 KiB
C++
#include "matador/net/reactor.hpp"
|
|
#include "matador/net/handler.hpp"
|
|
#include "matador/logger/log_manager.hpp"
|
|
|
|
#include <system_error>
|
|
|
|
namespace matador::net {
|
|
|
|
reactor::reactor()
|
|
: log_(logger::create_logger("Reactor"))
|
|
, thread_pool_(4, [this]() { handle_events(); }) {
|
|
}
|
|
|
|
reactor::~reactor() {
|
|
if (is_running()) {
|
|
try {
|
|
shutdown();
|
|
} catch (const std::exception& e) {
|
|
log_.error("Error during shutdown: %s", e.what());
|
|
}
|
|
}
|
|
}
|
|
|
|
void reactor::register_handler(const handler_ptr& h, event_type type) {
|
|
if (!h) {
|
|
throw std::invalid_argument("Null handler");
|
|
}
|
|
|
|
std::unique_lock lock(handlers_mutex_);
|
|
|
|
auto* raw_ptr = h.get();
|
|
if (handlers_.find(raw_ptr) != handlers_.end()) {
|
|
throw std::runtime_error("Handler already registered");
|
|
}
|
|
|
|
safe_handler_operation(h, "register", [&] {
|
|
h->register_reactor(this);
|
|
h->open();
|
|
handlers_.emplace(raw_ptr, std::make_unique<HandlerEntry>(h, type));
|
|
});
|
|
|
|
interrupter_.interrupt();
|
|
}
|
|
|
|
void reactor::unregister_handler(const handler_ptr& h, event_type type) {
|
|
if (!h) return;
|
|
|
|
std::unique_lock lock(handlers_mutex_);
|
|
|
|
if (find_handler_entry(h)) {
|
|
safe_handler_operation(h, "unregister", [&] {
|
|
h->close();
|
|
remove_handler_entry(h);
|
|
});
|
|
}
|
|
|
|
interrupter_.interrupt();
|
|
}
|
|
|
|
void reactor::schedule_timer(const handler_ptr& h, time_t offset, time_t interval) {
|
|
if (!h) return;
|
|
|
|
std::unique_lock lock(handlers_mutex_);
|
|
auto* entry = find_handler_entry(h);
|
|
|
|
if (!entry) {
|
|
entry = handlers_.emplace(h.get(),
|
|
std::make_unique<HandlerEntry>(h, event_type::TIMEOUT_MASK)).first->second.get();
|
|
}
|
|
|
|
std::lock_guard<std::mutex> timer_lock(timers_mutex_);
|
|
time_t now = std::time(nullptr);
|
|
entry->next_timeout = now + offset;
|
|
entry->interval = interval;
|
|
|
|
timers_.push(TimerEvent{entry->next_timeout, h});
|
|
|
|
interrupter_.interrupt();
|
|
}
|
|
|
|
void reactor::cancel_timer(const handler_ptr& h) {
|
|
if (!h) return;
|
|
|
|
std::unique_lock lock(handlers_mutex_);
|
|
if (auto* entry = find_handler_entry(h)) {
|
|
entry->next_timeout = 0;
|
|
entry->interval = 0;
|
|
}
|
|
}
|
|
|
|
void reactor::run() {
|
|
running_ = true;
|
|
thread_pool_.start();
|
|
|
|
{
|
|
std::unique_lock lock(handlers_mutex_);
|
|
shutdown_cv_.wait(lock, [this] { return shutdown_requested_.load(); });
|
|
cleanup_deleted_handlers();
|
|
}
|
|
|
|
thread_pool_.stop();
|
|
running_ = false;
|
|
}
|
|
|
|
void reactor::handle_events() {
|
|
time_t timeout;
|
|
select_fdsets fd_sets;
|
|
|
|
prepare_select_bits(timeout, fd_sets);
|
|
|
|
if (!fd_sets.maxp1() && timeout == (std::numeric_limits<time_t>::max)()) {
|
|
return;
|
|
}
|
|
|
|
struct timeval select_timeout{};
|
|
struct timeval* timeout_ptr = nullptr;
|
|
|
|
if (timeout < (std::numeric_limits<time_t>::max)()) {
|
|
select_timeout.tv_sec = timeout;
|
|
select_timeout.tv_usec = 0;
|
|
timeout_ptr = &select_timeout;
|
|
}
|
|
|
|
try {
|
|
if (int result = perform_select(timeout_ptr, fd_sets); result < 0) {
|
|
if (errno != EINTR) {
|
|
throw std::system_error(errno, std::system_category(), "select failed");
|
|
}
|
|
return;
|
|
}
|
|
|
|
time_t now = std::time(nullptr);
|
|
|
|
if (check_interruption(fd_sets)) {
|
|
return;
|
|
}
|
|
|
|
if (process_events(fd_sets, now)) {
|
|
thread_pool_.promote_new_leader();
|
|
}
|
|
|
|
process_timers(now);
|
|
|
|
if (now - last_cleanup_ >= CLEANUP_INTERVAL.count()) {
|
|
cleanup_deleted_handlers();
|
|
last_cleanup_ = now;
|
|
}
|
|
|
|
} catch (const std::exception& e) {
|
|
log_.error("Error in handle_events: %s", e.what());
|
|
stats_.errors_++;
|
|
}
|
|
}
|
|
|
|
void reactor::shutdown() {
|
|
if (!running_) return;
|
|
|
|
log_.info("Initiating reactor shutdown");
|
|
shutdown_requested_ = true;
|
|
interrupter_.interrupt();
|
|
shutdown_cv_.notify_all();
|
|
}
|
|
|
|
void reactor::mark_handler_for_delete(const handler_ptr& h) {
|
|
if (!h) return;
|
|
|
|
std::unique_lock lock(handlers_mutex_);
|
|
if (auto* entry = find_handler_entry(h)) {
|
|
entry->marked_for_deletion = true;
|
|
}
|
|
}
|
|
|
|
void reactor::safe_handler_operation(const handler_ptr& h, const std::string& op_name, const std::function<void()>& operation) {
|
|
try {
|
|
operation();
|
|
} catch (const std::exception& e) {
|
|
log_.error("Error during %s operation: %s", op_name.c_str(), e.what());
|
|
++stats_.errors_;
|
|
throw;
|
|
}
|
|
}
|
|
|
|
reactor::HandlerEntry* reactor::find_handler_entry(const handler_ptr& h) {
|
|
const auto it = handlers_.find(h.get());
|
|
return it != handlers_.end() ? it->second.get() : nullptr;
|
|
}
|
|
|
|
void reactor::remove_handler_entry(const handler_ptr& h) {
|
|
handlers_.erase(h.get());
|
|
}
|
|
|
|
void reactor::update_handler_events(HandlerEntry* entry, const event_type ev, const bool activate) {
|
|
if (!entry) return;
|
|
|
|
if (activate) {
|
|
entry->events |= ev;
|
|
} else {
|
|
entry->events &= ~ev;
|
|
}
|
|
}
|
|
|
|
void reactor::prepare_select_bits(time_t& timeout, select_fdsets& fd_sets) const
|
|
{
|
|
std::lock_guard l(handlers_mutex_);
|
|
fd_sets.reset();
|
|
const time_t now = ::time(nullptr);
|
|
timeout = (std::numeric_limits<time_t>::max)();
|
|
|
|
// set interrupter fd
|
|
fd_sets.read_set().set(interrupter_.socket_id());
|
|
|
|
for (const auto & [hndlr, entry] : handlers_) {
|
|
if (hndlr == nullptr) {
|
|
continue;
|
|
}
|
|
if (hndlr->is_ready_read() && is_event_type_set(entry->events, event_type::READ_MASK)) {
|
|
fd_sets.read_set().set(hndlr->handle());
|
|
}
|
|
if (hndlr->is_ready_write() && is_event_type_set(entry->events, event_type::WRITE_MASK)) {
|
|
fd_sets.write_set().set(hndlr->handle());
|
|
}
|
|
if (hndlr->next_timeout() > 0 && is_event_type_set(entry->events, event_type::TIMEOUT_MASK)) {
|
|
timeout = (std::min)(timeout, hndlr->next_timeout() <= now ? 0 : (hndlr->next_timeout() - now));
|
|
}
|
|
}
|
|
}
|
|
|
|
reactor::t_handler_type reactor::resolve_next_handler(time_t now, select_fdsets& fd_sets)
|
|
{
|
|
std::lock_guard l(handlers_mutex_);
|
|
for (auto & [hndlr, entry] : handlers_) {
|
|
if (hndlr->handle() > 0 && fd_sets.write_set().is_set(hndlr->handle())) {
|
|
return std::make_pair(hndlr, event_type::WRITE_MASK);
|
|
}
|
|
if (hndlr->handle() > 0 && fd_sets.read_set().is_set(hndlr->handle())) {
|
|
return std::make_pair(hndlr, event_type::READ_MASK);
|
|
}
|
|
if (hndlr->next_timeout() > 0 && hndlr->next_timeout() <= now) {
|
|
return std::make_pair(hndlr, event_type::TIMEOUT_MASK);
|
|
}
|
|
}
|
|
return std::make_pair(nullptr, event_type::NONE_MASK);
|
|
}
|
|
|
|
select_fdsets reactor::fd_sets() const
|
|
{
|
|
time_t timeout;
|
|
select_fdsets fd_sets;
|
|
prepare_select_bits(timeout, fd_sets);
|
|
return fd_sets;
|
|
}
|
|
|
|
void reactor::activate_handler(const reactor::handler_ptr &h, event_type ev)
|
|
{
|
|
std::lock_guard l(handlers_mutex_);
|
|
const auto it = find_handler_entry(h);
|
|
if (!it) {
|
|
return;
|
|
}
|
|
it->events |= ev;
|
|
}
|
|
|
|
void reactor::deactivate_handler(const reactor::handler_ptr &h, event_type ev)
|
|
{
|
|
std::lock_guard l(handlers_mutex_);
|
|
const auto it = find_handler_entry(h);
|
|
if (!it) {
|
|
return;
|
|
}
|
|
it->events &= ~ev;
|
|
}
|
|
|
|
} // namespace matador
|