From f32594d9fdd5b426da56b92d5b920d04473f959f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sascha=20K=C3=BChl?= Date: Fri, 18 Jul 2025 15:33:04 +0200 Subject: [PATCH] started reactor refactoring --- include/matador/net/ip.hpp | 28 +- include/matador/net/peer.hpp | 60 +-- include/matador/net/reactor.hpp | 16 +- include/matador/net/select_fd_sets.hpp | 2 +- include/matador/net/socket.hpp | 5 +- include/matador/net/socket.tpp | 4 +- include/matador/net/socket_interrupter.hpp | 8 +- source/core/net/os.cpp | 3 - source/core/net/reactor.cpp | 582 ++++++++------------- source/core/net/socket_interrupter.cpp | 4 +- test/core/net/ReactorTest.cpp | 266 ++++++++++ 11 files changed, 544 insertions(+), 434 deletions(-) create mode 100644 test/core/net/ReactorTest.cpp diff --git a/include/matador/net/ip.hpp b/include/matador/net/ip.hpp index cab483b..4fea5f2 100644 --- a/include/matador/net/ip.hpp +++ b/include/matador/net/ip.hpp @@ -16,24 +16,24 @@ namespace matador::net { */ class tcp { public: - typedef peer_base peer; /**< Shortcut to a tcp based peer */ - typedef socket_stream socket; /**< Shortcut to a tcp based socket */ - typedef socket_acceptor acceptor; /**< Shortcut to a tcp based acceptor */ - typedef address_resolver resolver; /**< Shortcut to a tcp based address resolver */ + typedef peer_base peer; /**< Shortcut to a tcp-based peer */ + typedef socket_stream socket; /**< Shortcut to a tcp-based socket */ + typedef socket_acceptor acceptor; /**< Shortcut to a tcp-based acceptor */ + typedef address_resolver resolver; /**< Shortcut to a tcp-based address resolver */ /** * Returns the type of the socket * * @return Type of the socket */ - int type() const { return SOCK_STREAM; } + static int type() { return SOCK_STREAM; } /** * Returns the socket protocol * * @return Socket protocol */ - int protocol() const { return IPPROTO_TCP; } + static int protocol() { return IPPROTO_TCP; } /** * Returns the socket family @@ -59,7 +59,7 @@ public: static tcp v6() { return 
tcp(PF_INET6); } private: - explicit tcp(int family) + explicit tcp(const int family) : family_(family) {} @@ -74,24 +74,24 @@ private: class OOS_NET_API udp { public: - typedef peer_base peer; /**< Shortcut to a udp based peer */ - typedef socket_stream socket; /**< Shortcut to a udp based socket */ - typedef socket_acceptor acceptor; /**< Shortcut to a udp based acceptor */ - typedef address_resolver resolver; /**< Shortcut to a udp based address resolver */ + typedef peer_base peer; /**< Shortcut to an udp-based peer */ + typedef socket_stream socket; /**< Shortcut to an udp-based socket */ + typedef socket_acceptor acceptor; /**< Shortcut to an udp-based acceptor */ + typedef address_resolver resolver; /**< Shortcut to an udp-based address resolver */ /** * Returns the type of the socket * * @return Type of the socket */ - int type() const { return SOCK_DGRAM; } + static int type() { return SOCK_DGRAM; } /** * Returns the socket protocol * * @return Socket protocol */ - int protocol() const { return IPPROTO_UDP; } + static int protocol() { return IPPROTO_UDP; } /** * Returns the socket family @@ -117,7 +117,7 @@ public: static udp v6() { return udp(PF_INET6); } private: - explicit udp(int family) + explicit udp(const int family) : family_(family) {} diff --git a/include/matador/net/peer.hpp b/include/matador/net/peer.hpp index aa86a2d..130c37f 100644 --- a/include/matador/net/peer.hpp +++ b/include/matador/net/peer.hpp @@ -11,7 +11,7 @@ #include -namespace matador { +namespace matador::net { /** * The peer_base class acts like the holder @@ -25,7 +25,7 @@ template < class P > class peer_base { public: - typedef P protocol_type; /**< Short to protocol type */ + typedef P protocol_type; /**< Short to a protocol-type */ /** * Default constructor @@ -48,9 +48,8 @@ public: * @param addr Address to create the peer from * @param port Port of the endpoint */ - peer_base(address addr, unsigned short port) - : addr_(std::move(addr)) - { + peer_base(address addr, const 
unsigned short port) + : addr_(std::move(addr)) { addr_.port(port); } @@ -60,8 +59,7 @@ public: * @param x Peer to copy from */ peer_base(const peer_base &x) - : addr_(x.addr_) - {} + : addr_(x.addr_) {} /** * Move creates a peer from a given peer @@ -69,8 +67,7 @@ public: * @param x Peer to move from */ peer_base(peer_base &&x) noexcept - : addr_(std::move(x.addr_)) - {} + : addr_(std::move(x.addr_)) {} /** * Copy assigns a given peer to this peer @@ -78,8 +75,7 @@ public: * @param x Peer to assign * @return The assigned peer */ - peer_base& operator=(const peer_base &x) - { + peer_base& operator=(const peer_base &x) { addr_ = x.addr_; return *this; } @@ -90,8 +86,7 @@ public: * @param x The peer to move assign * @return The moved peer */ - peer_base& operator=(peer_base &&x) noexcept - { + peer_base& operator=(peer_base &&x) noexcept { addr_ = std::move(x.addr_); return *this; } @@ -114,13 +109,8 @@ public: * * @return The current IP protocol */ - protocol_type protocol() const - { - if (addr_.is_v4()) { - return protocol_type::v4(); - } else { - return protocol_type::v6(); - } + protocol_type protocol() const { + return addr_.is_v4() ? 
protocol_type::v4() : protocol_type::v6(); } /** @@ -128,8 +118,7 @@ public: * * @return The raw pointer to the sockaddr structure */ - sockaddr* data() - { + sockaddr* data() { return addr_.addr(); } @@ -138,8 +127,7 @@ public: * * @return The raw pointer to the sockaddr structure */ - const sockaddr* data() const - { + const sockaddr* data() const { return addr_.addr(); } @@ -148,8 +136,7 @@ public: * * @return The size of the underlying sockaddr structure */ - size_t size() const - { + size_t size() const { return addr_.size(); } @@ -158,8 +145,7 @@ public: * * @return A reference to the address */ - address& addr() - { + address& addr() { return addr_; } @@ -168,8 +154,7 @@ public: * * @return A reference to the address */ - const address& addr() const - { + const address& addr() const { return addr_; } @@ -179,20 +164,19 @@ public: * * @return Returns a string representation of the peer */ - std::string to_string() const - { - char addstr[INET6_ADDRSTRLEN + 8]; + std::string to_string() const { + char address_str[INET6_ADDRSTRLEN + 8]; const char *name; if (addr().is_v4()) { - name = os::inet_ntop(addr_.addr()->sa_family, &addr_.addr_v4()->sin_addr, addstr, INET6_ADDRSTRLEN); + name = os::inet_ntop(addr_.addr()->sa_family, &addr_.addr_v4()->sin_addr, address_str, INET6_ADDRSTRLEN); } else { - name = os::inet_ntop(addr_.addr()->sa_family, &addr_.addr_v6()->sin6_addr, addstr, INET6_ADDRSTRLEN); + name = os::inet_ntop(addr_.addr()->sa_family, &addr_.addr_v6()->sin6_addr, address_str, INET6_ADDRSTRLEN); } - size_t pos = strlen(name); + const size_t pos = strlen(name); - snprintf(addstr+pos, INET6_ADDRSTRLEN+8-pos, ":%d", addr_.port()); - return addstr; + snprintf(address_str + pos, INET6_ADDRSTRLEN+8-pos, ":%d", addr_.port()); + return address_str; } private: diff --git a/include/matador/net/reactor.hpp b/include/matador/net/reactor.hpp index 03a8a7b..e54bbfb 100644 --- a/include/matador/net/reactor.hpp +++ b/include/matador/net/reactor.hpp @@ -14,6 +14,7 @@ 
#include #include #include +#include namespace matador::net { @@ -56,8 +57,8 @@ public: void run(); void handle_events(); void shutdown(); - bool is_running() const { return running_; } - select_fdsets fdsets() const; + [[nodiscard]] bool is_running() const { return running_; } + select_fdsets fd_sets() const; void mark_handler_for_delete(const handler_ptr& h); void activate_handler(const handler_ptr& h, event_type ev); void deactivate_handler(const handler_ptr& h, event_type ev); @@ -65,7 +66,7 @@ public: private: struct TimerEvent { - time_t timeout; + time_t timeout{}; handler_weak_ptr handler; bool operator>(const TimerEvent& other) const { @@ -89,7 +90,7 @@ private: HandlerEntry* find_handler_entry(const handler_ptr& h); void remove_handler_entry(const handler_ptr& h); - void update_handler_events(HandlerEntry* entry, event_type ev, bool activate); + static void update_handler_events(HandlerEntry* entry, event_type ev, bool activate); private: handler_map handlers_; @@ -100,12 +101,13 @@ private: mutable std::shared_mutex handlers_mutex_; std::mutex timers_mutex_; - std::condition_variable shutdown_cv_; + std::condition_variable_any shutdown_cv_; + // std::condition_variable shutdown_cv_; - logger log_; + logger::logger log_; Statistics stats_; - leader_follower_thread_pool thread_pool_; + utils::leader_follower_thread_pool thread_pool_; socket_interrupter interrupter_; static constexpr std::chrono::seconds CLEANUP_INTERVAL{60}; diff --git a/include/matador/net/select_fd_sets.hpp b/include/matador/net/select_fd_sets.hpp index 5586149..0fe76ab 100644 --- a/include/matador/net/select_fd_sets.hpp +++ b/include/matador/net/select_fd_sets.hpp @@ -3,7 +3,7 @@ #include "matador/net/fdset.hpp" -namespace matador { +namespace matador::net { /** * This class represents three fd sets diff --git a/include/matador/net/socket.hpp b/include/matador/net/socket.hpp index 721b2e5..df5bdf3 100644 --- a/include/matador/net/socket.hpp +++ b/include/matador/net/socket.hpp @@ -25,8 
+25,7 @@ namespace matador { * @tparam P Protocol type */ template < class P > -class socket_base -{ +class socket_base { public: typedef P protocol_type; /**< Shortcut to the protocol type */ typedef typename P::peer peer_type; /**< Shortcut to the peer type */ @@ -42,7 +41,7 @@ public: /** * Creates a socket with the given peer * - * @param peer Peer used to initialize the socket + * @param peer Peer used to initialise the socket */ explicit socket_base(const peer_type &peer); diff --git a/include/matador/net/socket.tpp b/include/matador/net/socket.tpp index 037f86a..3c6bd28 100644 --- a/include/matador/net/socket.tpp +++ b/include/matador/net/socket.tpp @@ -207,7 +207,7 @@ socket_type connect(socket_stream

&stream, const char* hostname, unsigned sho do { conn_fd = ::socket(res->ai_family, res->ai_socktype, res->ai_protocol); if (!is_valid_socket(conn_fd)) { - // error, try next one + // error, try the next one continue; } @@ -220,7 +220,7 @@ socket_type connect(socket_stream

&stream, const char* hostname, unsigned sho // throw_logic_error("couldn't execute: " << strerror(errno)); } - // bind error, close and try next one + // bind error, close and try the next one os::shutdown(conn_fd, os::shutdown_type::READ_WRITE); } while ( (res = res->ai_next) != nullptr); diff --git a/include/matador/net/socket_interrupter.hpp b/include/matador/net/socket_interrupter.hpp index ce0dac1..b7bfe95 100644 --- a/include/matador/net/socket_interrupter.hpp +++ b/include/matador/net/socket_interrupter.hpp @@ -3,7 +3,7 @@ #include "matador/net/ip.hpp" -// #include "matador/logger/logger.hpp" +#include "matador/logger/logger.hpp" #include @@ -23,10 +23,10 @@ public: bool reset(); private: - matador::tcp::socket server_; - matador::tcp::socket client_; + tcp::socket server_; + tcp::socket client_; - // matador::logger log_; + logger::logger log_; std::array indicator_ = { { 0 } }; std::array consumer_ = {}; diff --git a/source/core/net/os.cpp b/source/core/net/os.cpp index 48ce4fb..99937a1 100644 --- a/source/core/net/os.cpp +++ b/source/core/net/os.cpp @@ -14,9 +14,6 @@ void init() { #ifdef _WIN32 WSADATA wsaData; // if this doesn't work - //WSAData wsaData; // then try this instead - - // MAKEWORD(1,1) for Winsock 1.1, MAKEWORD(2,0) for Winsock 2.0: if (WSAStartup(MAKEWORD(1,1), &wsaData) != 0) { fprintf(stderr, "WSAStartup failed.\n"); diff --git a/source/core/net/reactor.cpp b/source/core/net/reactor.cpp index 957adc7..ca89470 100644 --- a/source/core/net/reactor.cpp +++ b/source/core/net/reactor.cpp @@ -1,342 +1,248 @@ #include "matador/net/reactor.hpp" #include "matador/net/handler.hpp" - #include "matador/logger/log_manager.hpp" -#include -#include -#include -#include -#include +#include -namespace matador { +namespace matador::net { reactor::reactor() - : sentinel_(std::shared_ptr(nullptr)) - , log_(create_logger("Reactor")) - , thread_pool_(4, [this]() { handle_events(); }) -{ -} -reactor::~reactor() -{ - log_.debug("destroying reactor"); - 
thread_pool_.shutdown(); +: log_(logger::create_logger("Reactor")) +, thread_pool_(4, [this]() { handle_events(); }) { } -void reactor::register_handler(const handler_ptr& h, event_type et) -{ - h->register_reactor(this); - h->open(); - - std::lock_guard l(mutex_); - auto it = find_handler_type(h); - - if (it == handlers_.end()) { - handlers_.emplace_back(h, et); - } else if (it->first != h) { - throw std::logic_error("given handler isn't expected handler"); - } else { - it->second = it->second | et; - } - interrupt_without_lock(); -} - -void reactor::unregister_handler(const handler_ptr& h, event_type) -{ - std::lock_guard l(mutex_); - auto it = find_handler_type(h); - - if (it != handlers_.end()) { - (*it).first->close(); - handlers_.erase(it); - } - interrupt_without_lock(); -} - -void reactor::schedule_timer(const std::shared_ptr& h, time_t offset, time_t interval) -{ - h->register_reactor(this); - - std::lock_guard l(mutex_); - auto it = find_handler_type(h); - - if (it == handlers_.end()) { - handlers_.emplace_back(h, event_type::TIMEOUT_MASK); - } - - h->schedule(offset, interval); -} - -void reactor::cancel_timer(const std::shared_ptr& h) -{ - std::lock_guard l(mutex_); - auto it = find_handler_type(h); - - if (it != handlers_.end()) { - handlers_.erase(it); - } - - h->cancel_timer(); -} - -void reactor::run() -{ -// log_.info("start dispatching all clients"); - thread_pool_.start(); - - { -// log_.info("waiting for reactor shutdown"); - std::unique_lock l(mutex_); - shutdown_.wait(l, [this]() { return shutdown_requested_.load(); }); - cleanup(); - } -// log_.info("all clients dispatched; shutting down"); - thread_pool_.stop(); -} - -void reactor::handle_events() -{ -// std::cout << this << " start handle events\n" << std::flush; - -// log_.info("handle events"); - - running_ = true; - time_t timeout; - select_fdsets fd_sets; - prepare_select_bits(timeout, fd_sets); - -// std::cout << this << " fd sets r: " << fd_sets.read_set().count() << ", w: " << 
fd_sets.write_set().count() << ", e: " << fd_sets.except_set().count() << ", max: " << fd_sets.maxp1() << "\n" << std::flush; - -// log_.debug("fds [r: %d, w: %d, e: %d]", -// fdsets_.read_set().count(), -// fdsets_.write_set().count(), -// fdsets_.except_set().count()); - -// if (timeout != (std::numeric_limits::max)()) { -// log_.debug("next timeout in %d sec", timeout); -// } - struct timeval tselect{}; - struct timeval* p = nullptr; - if (timeout < (std::numeric_limits::max)()) { - tselect.tv_sec = timeout; - tselect.tv_usec = 0; - p = &tselect; -// std::cout << this << " next timeout in " << p->tv_sec << " seconds\n" << std::flush; - } - - if (!has_clients_to_handle(timeout, fd_sets)) { -// std::cout << this << " no clients to handle; returning\n" << std::flush; -// log_.info("no clients to handle, exiting"); - return; - } - - int ret; - while ((ret = select(p, fd_sets)) < 0) { -// std::cout << this << " select returned with error " << ret << "\n" << std::flush; - if(errno != EINTR) { - char error_buffer[1024]; - log_.warn("select failed: %s", os::strerror(errno, error_buffer, 1024)); - shutdown(); - } else { - return; +reactor::~reactor() { + if (is_running()) { + try { + shutdown(); + } catch (const std::exception& e) { + log_.error("Error during shutdown: %s", e.what()); + } } - } - -// std::cout << this << " select returned with active requests " << ret << "\n" << std::flush; - - bool interrupted = is_interrupted(fd_sets); - - if (interrupted) { - if (!shutdown_requested_) { -// std::cout << this << " reactor was interrupted - promote new leader\n" << std::flush; -// log_.info("reactor was interrupted"); - thread_pool_.promote_new_leader(); - return; - } else { -// std::cout << this << " reactor was interrupted for shutdown\n" << std::flush; -// log_.info("shutting down"); -// cleanup(); - shutdown_.notify_one(); - return; - } - } - - time_t now = ::time(nullptr); - t_handler_type handler_type = resolve_next_handler(now, fd_sets); - - if 
(handler_type.first) { - deactivate_handler(handler_type.first, handler_type.second); -// std::cout << this << " handling client " << handler_type.first->name() << " - promoting new leader\n" << std::flush; - - thread_pool_.promote_new_leader(); -// log_.info("start handling event"); - // handle event - if (handler_type.second == event_type::WRITE_MASK) { - on_write_mask(handler_type.first); - } else if (handler_type.second == event_type::READ_MASK) { - on_read_mask(handler_type.first); - } else if (handler_type.second == event_type::TIMEOUT_MASK) { - on_timeout(handler_type.first, now); - } else { -// log_.info("unknown event type"); - } - activate_handler(handler_type.first, handler_type.second); - remove_deleted(); - } else { - // no handler found -// log_.info("no handler found"); - thread_pool_.promote_new_leader(); - } } -void reactor::shutdown() -{ - if (!is_running()) { - return; - } - // shutdown the reactor properly - log_.info("shutting down reactor"); - shutdown_requested_ = true; - interrupt(); +void reactor::register_handler(const handler_ptr& h, event_type type) { + if (!h) { + throw std::invalid_argument("Null handler"); + } + + std::unique_lock lock(handlers_mutex_); + + auto* raw_ptr = h.get(); + if (handlers_.find(raw_ptr) != handlers_.end()) { + throw std::runtime_error("Handler already registered"); + } + + safe_handler_operation(h, "register", [&] { + h->register_reactor(this); + h->open(); + handlers_.emplace(raw_ptr, std::make_unique(h, type)); + }); + + interrupter_.interrupt(); } -bool reactor::is_running() const -{ - return running_; +void reactor::unregister_handler(const handler_ptr& h, event_type type) { + if (!h) return; + + std::unique_lock lock(handlers_mutex_); + + if (find_handler_entry(h)) { + safe_handler_operation(h, "unregister", [&] { + h->close(); + remove_handler_entry(h); + }); + } + + interrupter_.interrupt(); +} + +void reactor::schedule_timer(const handler_ptr& h, time_t offset, time_t interval) { + if (!h) return; + + 
std::unique_lock lock(handlers_mutex_); + auto* entry = find_handler_entry(h); + + if (!entry) { + entry = handlers_.emplace(h.get(), + std::make_unique(h, event_type::TIMEOUT_MASK)).first->second.get(); + } + + std::lock_guard timer_lock(timers_mutex_); + time_t now = std::time(nullptr); + entry->next_timeout = now + offset; + entry->interval = interval; + + timers_.push(TimerEvent{entry->next_timeout, h}); + + interrupter_.interrupt(); +} + +void reactor::cancel_timer(const handler_ptr& h) { + if (!h) return; + + std::unique_lock lock(handlers_mutex_); + if (auto* entry = find_handler_entry(h)) { + entry->next_timeout = 0; + entry->interval = 0; + } +} + +void reactor::run() { + running_ = true; + thread_pool_.start(); + + { + std::unique_lock lock(handlers_mutex_); + shutdown_cv_.wait(lock, [this] { return shutdown_requested_.load(); }); + cleanup_deleted_handlers(); + } + + thread_pool_.stop(); + running_ = false; +} + +void reactor::handle_events() { + time_t timeout; + select_fdsets fd_sets; + + prepare_select_bits(timeout, fd_sets); + + if (!fd_sets.maxp1() && timeout == (std::numeric_limits::max)()) { + return; + } + + struct timeval select_timeout{}; + struct timeval* timeout_ptr = nullptr; + + if (timeout < (std::numeric_limits::max)()) { + select_timeout.tv_sec = timeout; + select_timeout.tv_usec = 0; + timeout_ptr = &select_timeout; + } + + try { + if (int result = perform_select(timeout_ptr, fd_sets); result < 0) { + if (errno != EINTR) { + throw std::system_error(errno, std::system_category(), "select failed"); + } + return; + } + + time_t now = std::time(nullptr); + + if (check_interruption(fd_sets)) { + return; + } + + if (process_events(fd_sets, now)) { + thread_pool_.promote_new_leader(); + } + + process_timers(now); + + if (now - last_cleanup_ >= CLEANUP_INTERVAL.count()) { + cleanup_deleted_handlers(); + last_cleanup_ = now; + } + + } catch (const std::exception& e) { + log_.error("Error in handle_events: %s", e.what()); + stats_.errors_++; + } 
+} + +void reactor::shutdown() { + if (!running_) return; + + log_.info("Initiating reactor shutdown"); + shutdown_requested_ = true; + interrupter_.interrupt(); + shutdown_cv_.notify_all(); +} + +void reactor::mark_handler_for_delete(const handler_ptr& h) { + if (!h) return; + + std::unique_lock lock(handlers_mutex_); + if (auto* entry = find_handler_entry(h)) { + entry->marked_for_deletion = true; + } +} + +void reactor::safe_handler_operation(const handler_ptr& h, const std::string& op_name, const std::function& operation) { + try { + operation(); + } catch (const std::exception& e) { + log_.error("Error during %s operation: %s", op_name.c_str(), e.what()); + ++stats_.errors_; + throw; + } +} + +reactor::HandlerEntry* reactor::find_handler_entry(const handler_ptr& h) { + const auto it = handlers_.find(h.get()); + return it != handlers_.end() ? it->second.get() : nullptr; +} + +void reactor::remove_handler_entry(const handler_ptr& h) { + handlers_.erase(h.get()); +} + +void reactor::update_handler_events(HandlerEntry* entry, const event_type ev, const bool activate) { + if (!entry) return; + + if (activate) { + entry->events |= ev; + } else { + entry->events &= ~ev; + } } void reactor::prepare_select_bits(time_t& timeout, select_fdsets& fd_sets) const { - std::lock_guard l(mutex_); - fd_sets.reset(); - time_t now = ::time(nullptr); - timeout = (std::numeric_limits::max)(); + std::lock_guard l(handlers_mutex_); + fd_sets.reset(); + const time_t now = ::time(nullptr); + timeout = (std::numeric_limits::max)(); - // set interrupter fd - fd_sets.read_set().set(interrupter_.socket_id()); + // set interrupter fd + fd_sets.read_set().set(interrupter_.socket_id()); - for (const auto &h : handlers_) { - if (h.first == nullptr) { - continue; + for (const auto & [hndlr, entry] : handlers_) { + if (hndlr == nullptr) { + continue; + } + if (hndlr->is_ready_read() && is_event_type_set(entry->events, event_type::READ_MASK)) { + fd_sets.read_set().set(hndlr->handle()); + } + if 
(hndlr->is_ready_write() && is_event_type_set(entry->events, event_type::WRITE_MASK)) { + fd_sets.write_set().set(hndlr->handle()); + } + if (hndlr->next_timeout() > 0 && is_event_type_set(entry->events, event_type::TIMEOUT_MASK)) { + timeout = (std::min)(timeout, hndlr->next_timeout() <= now ? 0 : (hndlr->next_timeout() - now)); + } } - if (h.first->is_ready_read() && is_event_type_set(h.second, event_type::READ_MASK)) { - fd_sets.read_set().set(h.first->handle()); - } - if (h.first->is_ready_write() && is_event_type_set(h.second, event_type::WRITE_MASK)) { - fd_sets.write_set().set(h.first->handle()); - } - if (h.first->next_timeout() > 0 && is_event_type_set(h.second, event_type::TIMEOUT_MASK)) { - timeout = (std::min)(timeout, h.first->next_timeout() <= now ? 0 : (h.first->next_timeout() - now)); - } - } } -void reactor::remove_deleted() -{ - while (!handlers_to_delete_.empty()) { - auto h = handlers_to_delete_.front(); - handlers_to_delete_.pop_front(); - auto fi = std::find_if(handlers_.begin(), handlers_.end(), [&h](const t_handler_type &ht) { - return ht.first.get() == h.get(); - }); - - if (fi != handlers_.end()) { - log_.debug("removing handler %d", fi->first->handle()); - handlers_.erase(fi); - } - } -} - -void reactor::cleanup() -{ - while (!handlers_.empty()) { - auto hndlr = handlers_.front(); - handlers_.pop_front(); - hndlr.first->close(); - } -} - -int reactor::select(struct timeval *timeout, select_fdsets& fd_sets) -{ - log_.debug("calling select; waiting for io events"); - return ::select( - static_cast(fd_sets.maxp1()) + 1, - fd_sets.read_set().get(), - fd_sets.write_set().get(), - fd_sets.except_set().get(), - timeout - ); -} - - -//void reactor::process_handler(int /*ret*/) -//{ -// handlers_.emplace_back(sentinel_, event_type::NONE_MASK); -// time_t now = ::time(nullptr); -// while (handlers_.front().first != nullptr) { -// auto h = handlers_.front(); -// handlers_.pop_front(); -// handlers_.push_back(h); -// // check for read/accept -// if 
(h.first->handle() > 0 && fdsets_.write_set().is_set(h.first->handle())) { -// on_write_mask(h.first); -// } -// if (h.first->handle() > 0 && fdsets_.read_set().is_set(h.first->handle())) { -// on_read_mask(h.first); -// } -// if (h.first->next_timeout() > 0 && h.first->next_timeout() <= now) { -// on_timeout(h.first, now); -// } -// } -// handlers_.pop_front(); -//} - reactor::t_handler_type reactor::resolve_next_handler(time_t now, select_fdsets& fd_sets) { - std::lock_guard l(mutex_); - for (auto &h : handlers_) { - if (h.first->handle() > 0 && fd_sets.write_set().is_set(h.first->handle())) { - return std::make_pair(h.first, event_type::WRITE_MASK); + std::lock_guard l(handlers_mutex_); + for (auto & [hndlr, entry] : handlers_) { + if (hndlr->handle() > 0 && fd_sets.write_set().is_set(hndlr->handle())) { + return std::make_pair(hndlr, event_type::WRITE_MASK); } - if (h.first->handle() > 0 && fd_sets.read_set().is_set(h.first->handle())) { - return std::make_pair(h.first, event_type::READ_MASK); + if (hndlr->handle() > 0 && fd_sets.read_set().is_set(hndlr->handle())) { + return std::make_pair(hndlr, event_type::READ_MASK); } - if (h.first->next_timeout() > 0 && h.first->next_timeout() <= now) { - return std::make_pair(h.first, event_type::TIMEOUT_MASK); + if (hndlr->next_timeout() > 0 && hndlr->next_timeout() <= now) { + return std::make_pair(hndlr, event_type::TIMEOUT_MASK); } } return std::make_pair(nullptr, event_type::NONE_MASK); } -void reactor::on_read_mask(const handler_ptr& handler) -{ -// log_.debug("read bit for handler %d is set; handle input", h->handle()); -// std::cout << this << " (handler " << h.get() << "): handle read\n" << std::flush; - handler->on_input(); -} - -void reactor::on_write_mask(const handler_ptr& handler) -{ -// log_.debug("write bit for handler %d is set; handle output", h->handle()); -// std::cout << this << " (handler " << h.get() << "): handle write\n" << std::flush; - handler->on_output(); -} - -void 
reactor::on_except_mask(const handler_ptr& /*handler*/) -{ -// std::cout << this << " (handler " << h.get() << "): handle exception\n" << std::flush; - -} - -void reactor::on_timeout(const handler_ptr &h, time_t now) -{ -// log_.debug("timeout expired for handler %d; handle timeout", h->handle()); -// std::cout << this << " (handler " << h.get() << "): handle timeout\n" << std::flush; - h->calculate_next_timeout(now); - h->on_timeout(); -} - -select_fdsets reactor::fdsets() const +select_fdsets reactor::fd_sets() const { time_t timeout; select_fdsets fd_sets; @@ -344,68 +250,24 @@ select_fdsets reactor::fdsets() const return fd_sets; } -void reactor::mark_handler_for_delete(const handler_ptr& h) -{ - std::lock_guard l(mutex_); - handlers_to_delete_.push_back(h); -} - -bool reactor::is_interrupted(select_fdsets& fd_sets) -{ - std::lock_guard l(mutex_); - if (fd_sets.read_set().is_set(interrupter_.socket_id())) { - log_.debug("interrupt byte received; resetting interrupter"); - if (shutdown_requested_) { - running_ = false; - } - return interrupter_.reset(); - } - return false; -} - -bool reactor::has_clients_to_handle(time_t timeout, select_fdsets& fd_sets) const { - std::lock_guard lock(mutex_); - return fd_sets.maxp1() > 0 || timeout != (std::numeric_limits::max)(); -} - -std::list::iterator reactor::find_handler_type(const reactor::handler_ptr &h) -{ - return std::find_if(handlers_.begin(), handlers_.end(), [&h](const t_handler_type &ht) { - return ht.first.get() == h.get(); - }); -} - void reactor::activate_handler(const reactor::handler_ptr &h, event_type ev) { - std::lock_guard l(mutex_); - auto it = find_handler_type(h); - if (it == handlers_.end()) { + std::lock_guard l(handlers_mutex_); + const auto it = find_handler_entry(h); + if (!it) { return; } - it->second |= ev; + it->events |= ev; } void reactor::deactivate_handler(const reactor::handler_ptr &h, event_type ev) { - std::lock_guard l(mutex_); - auto it = find_handler_type(h); - if (it == 
handlers_.end()) { + std::lock_guard l(handlers_mutex_); + const auto it = find_handler_entry(h); + if (!it) { return; } - it->second &= ~ev; + it->events &= ~ev; } -void reactor::interrupt() -{ - log_.trace("interrupting reactor"); - std::lock_guard l(mutex_); - interrupter_.interrupt(); -} - -void reactor::interrupt_without_lock() -{ - log_.trace("interrupting reactor"); - interrupter_.interrupt(); -} - -} +} // namespace matador \ No newline at end of file diff --git a/source/core/net/socket_interrupter.cpp b/source/core/net/socket_interrupter.cpp index c241abf..408352d 100644 --- a/source/core/net/socket_interrupter.cpp +++ b/source/core/net/socket_interrupter.cpp @@ -8,11 +8,11 @@ #include #endif -namespace matador { +namespace matador::net { socket_interrupter::socket_interrupter() : client_(tcp::v4()) - , log_(create_logger("SocketInterrupter")) + , log_(logger::create_logger("SocketInterrupter")) { /* * setup acceptor diff --git a/test/core/net/ReactorTest.cpp b/test/core/net/ReactorTest.cpp new file mode 100644 index 0000000..ea79bb6 --- /dev/null +++ b/test/core/net/ReactorTest.cpp @@ -0,0 +1,266 @@ +#include +#include +#include + +#include "matador/net/reactor.hpp" +#include "matador/net/handler.hpp" + +#include +#include +#include + +namespace { + +class MockHandler : public matador::net::handler { +public: + explicit MockHandler(int fd = 1) : fd_(fd) { + reset_counters(); + } + + void reset_counters() { + read_count = 0; + write_count = 0; + timeout_count = 0; + close_count = 0; + } + + socket_type handle() const override { return fd_; } + bool is_ready_read() const override { return ready_read_; } + bool is_ready_write() const override { return ready_write_; } + + void on_input() override { ++read_count; } + void on_output() override { ++write_count; } + void on_timeout() override { ++timeout_count; } + void close() override { ++close_count; } + + void set_ready_states(bool read, bool write) { + ready_read_ = read; + ready_write_ = write; + } + + 
static std::atomic<int> read_count;
+  static std::atomic<int> write_count;
+  static std::atomic<int> timeout_count;
+  static std::atomic<int> close_count;
+
+private:
+  int fd_;
+  bool ready_read_{false};
+  bool ready_write_{false};
+};
+
+std::atomic<int> MockHandler::read_count{0};
+std::atomic<int> MockHandler::write_count{0};
+std::atomic<int> MockHandler::timeout_count{0};
+std::atomic<int> MockHandler::close_count{0};
+
+class ReactorTestFixture {
+protected:
+  void SetUp() {
+    MockHandler::read_count = 0;
+    MockHandler::write_count = 0;
+    MockHandler::timeout_count = 0;
+    MockHandler::close_count = 0;
+  }
+
+  matador::net::reactor reactor_;
+};
+
+} // namespace
+
+SCENARIO_METHOD(ReactorTestFixture, "Reactor Handler Registration", "[reactor]") {
+  GIVEN("A reactor and a handler") {
+    auto handler = std::make_shared<MockHandler>();
+
+    WHEN("Registering handler for read events") {
+      reactor_.register_handler(handler, matador::net::event_type::READ_MASK);
+
+      THEN("Handler should be registered") {
+        auto fdsets = reactor_.fd_sets();
+        REQUIRE(fdsets.read_set().is_set(handler->handle()));
+        REQUIRE_FALSE(fdsets.write_set().is_set(handler->handle()));
+      }
+    }
+
+    WHEN("Registering handler for write events") {
+      handler->set_ready_states(false, true);
+      reactor_.register_handler(handler, matador::net::event_type::WRITE_MASK);
+
+      THEN("Handler should be registered for write") {
+        auto fdsets = reactor_.fd_sets();
+        REQUIRE(fdsets.write_set().is_set(handler->handle()));
+        REQUIRE_FALSE(fdsets.read_set().is_set(handler->handle()));
+      }
+    }
+
+    WHEN("Registering null handler") {
+      THEN("Should throw exception") {
+        REQUIRE_THROWS_AS(
+          reactor_.register_handler(nullptr, matador::net::event_type::READ_MASK),
+          std::invalid_argument
+        );
+      }
+    }
+  }
+}
+
+SCENARIO_METHOD(ReactorTestFixture, "Reactor Handler Unregistration", "[reactor]") {
+  GIVEN("A reactor with registered handler") {
+    auto handler = std::make_shared<MockHandler>();
+    reactor_.register_handler(handler, matador::net::event_type::READ_MASK);
+
+    WHEN("Unregistering the 
handler") {
+      reactor_.unregister_handler(handler, matador::net::event_type::READ_MASK);
+
+      THEN("Handler should be unregistered") {
+        auto fdsets = reactor_.fd_sets();
+        REQUIRE_FALSE(fdsets.read_set().is_set(handler->handle()));
+        REQUIRE(MockHandler::close_count == 1);
+      }
+    }
+  }
+}
+
+SCENARIO_METHOD(ReactorTestFixture, "Reactor Timer Operations", "[reactor]") {
+  GIVEN("A reactor and a handler") {
+    auto handler = std::make_shared<MockHandler>();
+
+    WHEN("Scheduling a timer") {
+      reactor_.schedule_timer(handler, 1, 0);
+
+      THEN("Timer should be scheduled") {
+        std::this_thread::sleep_for(std::chrono::milliseconds(1500));
+        reactor_.handle_events();
+        REQUIRE(MockHandler::timeout_count == 1);
+      }
+    }
+
+    WHEN("Scheduling a repeating timer") {
+      reactor_.schedule_timer(handler, 1, 1);
+
+      THEN("Timer should fire multiple times") {
+        std::this_thread::sleep_for(std::chrono::milliseconds(3500));
+        reactor_.handle_events();
+        REQUIRE(MockHandler::timeout_count >= 2);
+      }
+    }
+
+    WHEN("Cancelling a timer") {
+      reactor_.schedule_timer(handler, 2, 0);
+      reactor_.cancel_timer(handler);
+
+      THEN("Timer should not fire") {
+        std::this_thread::sleep_for(std::chrono::milliseconds(2500));
+        reactor_.handle_events();
+        REQUIRE(MockHandler::timeout_count == 0);
+      }
+    }
+  }
+}
+
+SCENARIO_METHOD(ReactorTestFixture, "Reactor Event Handling", "[reactor]") {
+  GIVEN("A reactor with registered handlers") {
+    auto read_handler = std::make_shared<MockHandler>(1);
+    auto write_handler = std::make_shared<MockHandler>(2);
+
+    read_handler->set_ready_states(true, false);
+    write_handler->set_ready_states(false, true);
+
+    reactor_.register_handler(read_handler, matador::net::event_type::READ_MASK);
+    reactor_.register_handler(write_handler, matador::net::event_type::WRITE_MASK);
+
+    WHEN("Handling events") {
+      std::thread reactor_thread([this]() {
+        reactor_.run();
+      });
+
+      std::this_thread::sleep_for(std::chrono::milliseconds(100));
+
+      THEN("Events should be processed") {
+        REQUIRE(MockHandler::read_count > 0);
+        
REQUIRE(MockHandler::write_count > 0);
+      }
+
+      reactor_.shutdown();
+      reactor_thread.join();
+    }
+  }
+}
+
+SCENARIO_METHOD(ReactorTestFixture, "Reactor Shutdown Behavior", "[reactor]") {
+  GIVEN("A running reactor") {
+    auto handler = std::make_shared<MockHandler>();
+    reactor_.register_handler(handler, matador::net::event_type::READ_MASK);
+
+    std::thread reactor_thread([this]() {
+      reactor_.run();
+    });
+
+    WHEN("Shutting down the reactor") {
+      std::this_thread::sleep_for(std::chrono::milliseconds(100));
+      reactor_.shutdown();
+      reactor_thread.join();
+
+      THEN("Reactor should stop cleanly") {
+        REQUIRE_FALSE(reactor_.is_running());
+        REQUIRE(MockHandler::close_count == 1);
+      }
+    }
+  }
+}
+
+SCENARIO_METHOD(ReactorTestFixture, "Reactor Stress Test", "[reactor][stress]") {
+  GIVEN("A reactor with multiple handlers") {
+    std::vector<std::shared_ptr<MockHandler>> handlers;
+    const int NUM_HANDLERS = 100;
+
+    for (int i = 0; i < NUM_HANDLERS; ++i) {
+      auto handler = std::make_shared<MockHandler>(i + 1);
+      handler->set_ready_states(true, true);
+      handlers.push_back(handler);
+      reactor_.register_handler(handler,
+        matador::net::event_type::READ_MASK | matador::net::event_type::WRITE_MASK);
+    }
+
+    WHEN("Running under load") {
+      std::thread reactor_thread([this]() {
+        reactor_.run();
+      });
+
+      std::this_thread::sleep_for(std::chrono::seconds(2));
+
+      THEN("Should handle events without issues") {
+        REQUIRE(MockHandler::read_count > 0);
+        REQUIRE(MockHandler::write_count > 0);
+        REQUIRE(reactor_.is_running());
+      }
+
+      reactor_.shutdown();
+      reactor_thread.join();
+
+      THEN("Should clean up properly") {
+        REQUIRE(MockHandler::close_count == NUM_HANDLERS);
+      }
+    }
+  }
+}
+
+SCENARIO_METHOD(ReactorTestFixture, "Reactor Error Handling", "[reactor]") {
+  GIVEN("A reactor with faulty handler") {
+    class FaultyHandler : public MockHandler {
+      void on_input() override { throw std::runtime_error("Simulated error"); }
+    };
+
+    auto handler = std::make_shared<FaultyHandler>();
+    handler->set_ready_states(true, false);
+
+    WHEN("Handling events with error") {
+      
reactor_.register_handler(handler, matador::net::event_type::READ_MASK);
+
+      THEN("Should handle error gracefully") {
+        reactor_.handle_events();
+        REQUIRE(reactor_.get_statistics().errors_ > 0);
+      }
+    }
+  }
+} \ No newline at end of file