started reactor refactoring

Sascha Kühl 2025-07-18 15:33:04 +02:00
parent 3781bd0b66
commit f32594d9fd
11 changed files with 544 additions and 434 deletions

View File

@@ -16,24 +16,24 @@ namespace matador::net {
  */
 class tcp {
 public:
-  typedef peer_base<tcp> peer;            /**< Shortcut to a tcp based peer */
-  typedef socket_stream<tcp> socket;      /**< Shortcut to a tcp based socket */
-  typedef socket_acceptor<tcp> acceptor;  /**< Shortcut to a tcp based acceptor */
-  typedef address_resolver<tcp> resolver; /**< Shortcut to a tcp based address resolver */
+  typedef peer_base<tcp> peer;            /**< Shortcut to a tcp-based peer */
+  typedef socket_stream<tcp> socket;      /**< Shortcut to a tcp-based socket */
+  typedef socket_acceptor<tcp> acceptor;  /**< Shortcut to a tcp-based acceptor */
+  typedef address_resolver<tcp> resolver; /**< Shortcut to a tcp-based address resolver */
   /**
    * Returns the type of the socket
    *
    * @return Type of the socket
    */
-  int type() const { return SOCK_STREAM; }
+  static int type() { return SOCK_STREAM; }
   /**
    * Returns the socket protocol
    *
    * @return Socket protocol
    */
-  int protocol() const { return IPPROTO_TCP; }
+  static int protocol() { return IPPROTO_TCP; }
   /**
    * Returns the socket family
@@ -59,7 +59,7 @@ public:
   static tcp v6() { return tcp(PF_INET6); }
 private:
-  explicit tcp(int family)
+  explicit tcp(const int family)
     : family_(family)
   {}
@@ -74,24 +74,24 @@ private:
 class OOS_NET_API udp
 {
 public:
-  typedef peer_base<udp> peer;            /**< Shortcut to a udp based peer */
-  typedef socket_stream<udp> socket;      /**< Shortcut to a udp based socket */
-  typedef socket_acceptor<udp> acceptor;  /**< Shortcut to a udp based acceptor */
-  typedef address_resolver<udp> resolver; /**< Shortcut to a udp based address resolver */
+  typedef peer_base<udp> peer;            /**< Shortcut to an udp-based peer */
+  typedef socket_stream<udp> socket;      /**< Shortcut to an udp-based socket */
+  typedef socket_acceptor<udp> acceptor;  /**< Shortcut to an udp-based acceptor */
+  typedef address_resolver<udp> resolver; /**< Shortcut to an udp-based address resolver */
   /**
    * Returns the type of the socket
    *
    * @return Type of the socket
    */
-  int type() const { return SOCK_DGRAM; }
+  static int type() { return SOCK_DGRAM; }
   /**
    * Returns the socket protocol
    *
    * @return Socket protocol
    */
-  int protocol() const { return IPPROTO_UDP; }
+  static int protocol() { return IPPROTO_UDP; }
   /**
    * Returns the socket family
@@ -117,7 +117,7 @@ public:
   static udp v6() { return udp(PF_INET6); }
 private:
-  explicit udp(int family)
+  explicit udp(const int family)
     : family_(family)
   {}
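Making type() and protocol() static means generic code can query the protocol traits without holding an instance. A minimal sketch of that consumption pattern follows; it is illustrative only, and the instance accessor family() is an assumption (the excerpt shows the family_ member and the v4()/v6() factories, but not the accessor itself).

#include <sys/socket.h>
#include <netinet/in.h>

// Sketch: open a raw socket from the protocol traits above.
// Protocol::type() and Protocol::protocol() no longer need an object.
template <class Protocol>
int open_socket(const Protocol& proto)
{
  return ::socket(proto.family(), Protocol::type(), Protocol::protocol());
}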

View File

@@ -11,7 +11,7 @@
 #include <cstring>
-namespace matador {
+namespace matador::net {
 /**
  * The peer_base class acts like the holder
@@ -25,7 +25,7 @@ template < class P >
 class peer_base
 {
 public:
-  typedef P protocol_type; /**< Short to protocol type */
+  typedef P protocol_type; /**< Short to a protocol-type */
   /**
    * Default constructor
@@ -48,9 +48,8 @@ public:
    * @param addr Address to create the peer from
    * @param port Port of the endpoint
    */
-  peer_base(address addr, unsigned short port)
-    : addr_(std::move(addr))
-  {
+  peer_base(address addr, const unsigned short port)
+    : addr_(std::move(addr)) {
     addr_.port(port);
   }
@@ -60,8 +59,7 @@ public:
    * @param x Peer to copy from
    */
   peer_base(const peer_base &x)
-    : addr_(x.addr_)
-  {}
+    : addr_(x.addr_) {}
   /**
    * Move creates a peer from a given peer
@@ -69,8 +67,7 @@ public:
    * @param x Peer to move from
    */
   peer_base(peer_base &&x) noexcept
-    : addr_(std::move(x.addr_))
-  {}
+    : addr_(std::move(x.addr_)) {}
   /**
    * Copy assigns a given peer to this peer
@@ -78,8 +75,7 @@ public:
    * @param x Peer to assign
    * @return The assigned peer
    */
-  peer_base& operator=(const peer_base &x)
-  {
+  peer_base& operator=(const peer_base &x) {
     addr_ = x.addr_;
     return *this;
   }
@@ -90,8 +86,7 @@ public:
    * @param x The peer to move assign
    * @return The moved peer
    */
-  peer_base& operator=(peer_base &&x) noexcept
-  {
+  peer_base& operator=(peer_base &&x) noexcept {
     addr_ = std::move(x.addr_);
     return *this;
   }
@@ -114,13 +109,8 @@ public:
    *
    * @return The current IP protocol
    */
-  protocol_type protocol() const
-  {
-    if (addr_.is_v4()) {
-      return protocol_type::v4();
-    } else {
-      return protocol_type::v6();
-    }
+  protocol_type protocol() const {
+    return addr_.is_v4() ? protocol_type::v4() : protocol_type::v6();
   }
   /**
@@ -128,8 +118,7 @@ public:
    *
    * @return The raw pointer to the sockaddr structure
    */
-  sockaddr* data()
-  {
+  sockaddr* data() {
     return addr_.addr();
   }
@@ -138,8 +127,7 @@ public:
    *
    * @return The raw pointer to the sockaddr structure
    */
-  const sockaddr* data() const
-  {
+  const sockaddr* data() const {
     return addr_.addr();
   }
@@ -148,8 +136,7 @@ public:
    *
    * @return The size of the underlying sockaddr structure
    */
-  size_t size() const
-  {
+  size_t size() const {
     return addr_.size();
   }
@@ -158,8 +145,7 @@ public:
    *
    * @return A reference to the address
    */
-  address& addr()
-  {
+  address& addr() {
     return addr_;
   }
@@ -168,8 +154,7 @@ public:
    *
    * @return A reference to the address
    */
-  const address& addr() const
-  {
+  const address& addr() const {
     return addr_;
   }
@@ -179,20 +164,19 @@ public:
    *
    * @return Returns a string representation of the peer
    */
-  std::string to_string() const
-  {
-    char addstr[INET6_ADDRSTRLEN + 8];
+  std::string to_string() const {
+    char address_str[INET6_ADDRSTRLEN + 8];
     const char *name;
     if (addr().is_v4()) {
-      name = os::inet_ntop(addr_.addr()->sa_family, &addr_.addr_v4()->sin_addr, addstr, INET6_ADDRSTRLEN);
+      name = os::inet_ntop(addr_.addr()->sa_family, &addr_.addr_v4()->sin_addr, address_str, INET6_ADDRSTRLEN);
     } else {
-      name = os::inet_ntop(addr_.addr()->sa_family, &addr_.addr_v6()->sin6_addr, addstr, INET6_ADDRSTRLEN);
+      name = os::inet_ntop(addr_.addr()->sa_family, &addr_.addr_v6()->sin6_addr, address_str, INET6_ADDRSTRLEN);
     }
-    size_t pos = strlen(name);
-    snprintf(addstr+pos, INET6_ADDRSTRLEN+8-pos, ":%d", addr_.port());
-    return addstr;
+    const size_t pos = strlen(name);
+    snprintf(address_str + pos, INET6_ADDRSTRLEN+8-pos, ":%d", addr_.port());
+    return address_str;
   }
 private:
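The buffer in to_string() is sized INET6_ADDRSTRLEN plus 8 bytes of headroom for the ":" separator and a port of up to five digits. A minimal standalone sketch of the same formatting, using plain POSIX inet_ntop directly (os::inet_ntop is assumed to be a thin wrapper around it; the example is not the peer_base method itself):

#include <arpa/inet.h>
#include <cstdio>
#include <cstring>
#include <string>

// Format an IPv4 address plus port as "a.b.c.d:port".
std::string endpoint_to_string(const in_addr& v4, unsigned short port)
{
  char buf[INET6_ADDRSTRLEN + 8];  // address text, ':' and up to 5 port digits
  const char* name = ::inet_ntop(AF_INET, &v4, buf, INET6_ADDRSTRLEN);
  if (name == nullptr) {
    return {};
  }
  const size_t pos = std::strlen(name);
  std::snprintf(buf + pos, sizeof(buf) - pos, ":%d", port);
  return buf;
}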

View File

@@ -14,6 +14,7 @@
 #include <chrono>
 #include <queue>
 #include <list>
+#include <shared_mutex>
 namespace matador::net {
@@ -56,8 +57,8 @@ public:
   void run();
   void handle_events();
   void shutdown();
-  bool is_running() const { return running_; }
-  select_fdsets fdsets() const;
+  [[nodiscard]] bool is_running() const { return running_; }
+  select_fdsets fd_sets() const;
   void mark_handler_for_delete(const handler_ptr& h);
   void activate_handler(const handler_ptr& h, event_type ev);
   void deactivate_handler(const handler_ptr& h, event_type ev);
@@ -65,7 +66,7 @@ public:
 private:
   struct TimerEvent {
-    time_t timeout;
+    time_t timeout{};
     handler_weak_ptr handler;
     bool operator>(const TimerEvent& other) const {
@@ -89,7 +90,7 @@ private:
   HandlerEntry* find_handler_entry(const handler_ptr& h);
   void remove_handler_entry(const handler_ptr& h);
-  void update_handler_events(HandlerEntry* entry, event_type ev, bool activate);
+  static void update_handler_events(HandlerEntry* entry, event_type ev, bool activate);
 private:
   handler_map handlers_;
@@ -100,12 +101,13 @@ private:
   mutable std::shared_mutex handlers_mutex_;
   std::mutex timers_mutex_;
-  std::condition_variable shutdown_cv_;
-  logger log_;
+  std::condition_variable_any shutdown_cv_;
+  // std::condition_variable shutdown_cv_;
+  logger::logger log_;
   Statistics stats_;
-  leader_follower_thread_pool thread_pool_;
+  utils::leader_follower_thread_pool thread_pool_;
   socket_interrupter interrupter_;
   static constexpr std::chrono::seconds CLEANUP_INTERVAL{60};
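The switch from std::condition_variable to std::condition_variable_any fits the shared_mutex member: a plain condition_variable can only wait on std::unique_lock<std::mutex>, while condition_variable_any waits on any lockable, including a unique_lock over a shared_mutex. A small sketch of that combination under assumed names (not the reactor's own members):

#include <atomic>
#include <condition_variable>
#include <shared_mutex>

// Sketch: waiting for a shutdown flag under a shared_mutex requires
// condition_variable_any; condition_variable would not compile here.
struct shutdown_gate {
  std::shared_mutex mutex;
  std::condition_variable_any cv;
  std::atomic<bool> requested{false};

  void wait() {
    std::unique_lock lock(mutex);  // exclusive lock on the shared_mutex
    cv.wait(lock, [this] { return requested.load(); });
  }

  void request() {
    requested = true;
    cv.notify_all();
  }
};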

View File

@@ -3,7 +3,7 @@
 #include "matador/net/fdset.hpp"
-namespace matador {
+namespace matador::net {
 /**
  * This class represents three fd sets

View File

@@ -25,8 +25,7 @@ namespace matador {
  * @tparam P Protocol type
  */
 template < class P >
-class socket_base
-{
+class socket_base {
 public:
   typedef P protocol_type;            /**< Shortcut to the protocol type */
   typedef typename P::peer peer_type; /**< Shortcut to the peer type */
@@ -42,7 +41,7 @@ public:
   /**
    * Creates a socket with the given peer
    *
-   * @param peer Peer used to initialize the socket
+   * @param peer Peer used to initialise the socket
    */
   explicit socket_base(const peer_type &peer);

View File

@@ -207,7 +207,7 @@ socket_type connect(socket_stream<P> &stream, const char* hostname, unsigned sho
   do {
     conn_fd = ::socket(res->ai_family, res->ai_socktype, res->ai_protocol);
     if (!is_valid_socket(conn_fd)) {
-      // error, try next one
+      // error, try the next one
       continue;
     }
@@ -220,7 +220,7 @@ socket_type connect(socket_stream<P> &stream, const char* hostname, unsigned sho
       // throw_logic_error("couldn't execute: " << strerror(errno));
     }
-    // bind error, close and try next one
+    // bind error, close and try next one
     os::shutdown(conn_fd, os::shutdown_type::READ_WRITE);
   } while ( (res = res->ai_next) != nullptr);
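The loop above walks the addrinfo list produced by address resolution and tries one candidate after another until a socket can be created and connected. A generic POSIX illustration of that idiom, kept separate from matador's connect<P>() implementation:

#include <netdb.h>
#include <sys/socket.h>
#include <unistd.h>

// Resolve host/service and connect to the first addrinfo entry that works.
// Returns the connected fd, or -1 on failure.
int connect_first_match(const char* host, const char* service)
{
  addrinfo hints{};
  hints.ai_family = AF_UNSPEC;     // IPv4 or IPv6
  hints.ai_socktype = SOCK_STREAM;

  addrinfo* results = nullptr;
  if (::getaddrinfo(host, service, &hints, &results) != 0) {
    return -1;
  }

  int fd = -1;
  for (addrinfo* res = results; res != nullptr; res = res->ai_next) {
    fd = ::socket(res->ai_family, res->ai_socktype, res->ai_protocol);
    if (fd < 0) {
      continue;                    // error, try the next one
    }
    if (::connect(fd, res->ai_addr, res->ai_addrlen) == 0) {
      break;                       // connected
    }
    ::close(fd);                   // connect error, close and try the next one
    fd = -1;
  }

  ::freeaddrinfo(results);
  return fd;
}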

View File

@@ -3,7 +3,7 @@
 #include "matador/net/ip.hpp"
-// #include "matador/logger/logger.hpp"
+#include "matador/logger/logger.hpp"
 #include <array>
@@ -23,10 +23,10 @@ public:
   bool reset();
 private:
-  matador::tcp::socket server_;
-  matador::tcp::socket client_;
-  // matador::logger log_;
+  tcp::socket server_;
+  tcp::socket client_;
+  logger::logger log_;
   std::array<char, 1> indicator_ = { { 0 } };
   std::array<char, 1> consumer_ = {};
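The interrupter is the reactor's wake-up mechanism: its read end sits in the select() read set, interrupt() writes a single byte (indicator_) to make select() return, and reset() consumes that byte (consumer_) to re-arm it. A sketch of the same trick with a plain pipe; the class itself uses a loopback TCP pair, presumably so the same approach also works where pipes cannot be used with select(), such as on Windows:

#include <sys/select.h>
#include <unistd.h>

// Sketch of the self-wake-up pattern behind socket_interrupter.
struct pipe_interrupter {
  int fds[2] = {-1, -1};

  bool open()      { return ::pipe(fds) == 0; }
  int  read_fd() const { return fds[0]; }                  // goes into the read fd_set
  void interrupt() { char one = 1; (void)::write(fds[1], &one, 1); }
  bool reset()     { char consumed; return ::read(fds[0], &consumed, 1) == 1; }
};

// Usage inside a select() loop: add read_fd() to the read set; when select()
// reports it readable, call reset() and re-check shutdown/registration state.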

View File

@@ -14,9 +14,6 @@ void init()
 {
 #ifdef _WIN32
   WSADATA wsaData;  // if this doesn't work
-  //WSAData wsaData; // then try this instead
-  // MAKEWORD(1,1) for Winsock 1.1, MAKEWORD(2,0) for Winsock 2.0:
   if (WSAStartup(MAKEWORD(1,1), &wsaData) != 0) {
     fprintf(stderr, "WSAStartup failed.\n");

View File

@@ -1,342 +1,248 @@
 #include "matador/net/reactor.hpp"
 #include "matador/net/handler.hpp"
 #include "matador/logger/log_manager.hpp"
-#include <algorithm>
-#include <limits>
-#include <cerrno>
-#include <ctime>
-#include <iostream>
+#include <system_error>
-namespace matador {
+namespace matador::net {

-reactor::reactor()
-  : sentinel_(std::shared_ptr<handler>(nullptr))
-  , log_(create_logger("Reactor"))
-  , thread_pool_(4, [this]() { handle_events(); })
-{
-}
-
-reactor::~reactor()
-{
-  log_.debug("destroying reactor");
-  thread_pool_.shutdown();
-}
-
-void reactor::register_handler(const handler_ptr& h, event_type et)
-{
-  h->register_reactor(this);
-  h->open();
-
-  std::lock_guard<std::mutex> l(mutex_);
-  auto it = find_handler_type(h);
-  if (it == handlers_.end()) {
-    handlers_.emplace_back(h, et);
-  } else if (it->first != h) {
-    throw std::logic_error("given handler isn't expected handler");
-  } else {
-    it->second = it->second | et;
-  }
-  interrupt_without_lock();
-}
+reactor::reactor()
+  : log_(logger::create_logger("Reactor"))
+  , thread_pool_(4, [this]() { handle_events(); }) {
+}
+
+reactor::~reactor() {
+  if (is_running()) {
+    try {
+      shutdown();
+    } catch (const std::exception& e) {
+      log_.error("Error during shutdown: %s", e.what());
+    }
+  }
+}
+
+void reactor::register_handler(const handler_ptr& h, event_type type) {
+  if (!h) {
+    throw std::invalid_argument("Null handler");
+  }
+
+  std::unique_lock lock(handlers_mutex_);
+
+  auto* raw_ptr = h.get();
+  if (handlers_.find(raw_ptr) != handlers_.end()) {
+    throw std::runtime_error("Handler already registered");
+  }
+
+  safe_handler_operation(h, "register", [&] {
+    h->register_reactor(this);
+    h->open();
+    handlers_.emplace(raw_ptr, std::make_unique<HandlerEntry>(h, type));
+  });
+
+  interrupter_.interrupt();
+}

-void reactor::unregister_handler(const handler_ptr& h, event_type)
-{
-  std::lock_guard<std::mutex> l(mutex_);
-  auto it = find_handler_type(h);
-  if (it != handlers_.end()) {
-    (*it).first->close();
-    handlers_.erase(it);
-  }
-  interrupt_without_lock();
-}
+void reactor::unregister_handler(const handler_ptr& h, event_type type) {
+  if (!h) return;
+
+  std::unique_lock lock(handlers_mutex_);
+
+  if (find_handler_entry(h)) {
+    safe_handler_operation(h, "unregister", [&] {
+      h->close();
+      remove_handler_entry(h);
+    });
+  }
+
+  interrupter_.interrupt();
+}

-void reactor::schedule_timer(const std::shared_ptr<handler>& h, time_t offset, time_t interval)
-{
-  h->register_reactor(this);
-  std::lock_guard<std::mutex> l(mutex_);
-  auto it = find_handler_type(h);
-  if (it == handlers_.end()) {
-    handlers_.emplace_back(h, event_type::TIMEOUT_MASK);
-  }
-  h->schedule(offset, interval);
-}
+void reactor::schedule_timer(const handler_ptr& h, time_t offset, time_t interval) {
+  if (!h) return;
+
+  std::unique_lock lock(handlers_mutex_);
+
+  auto* entry = find_handler_entry(h);
+  if (!entry) {
+    entry = handlers_.emplace(h.get(),
+      std::make_unique<HandlerEntry>(h, event_type::TIMEOUT_MASK)).first->second.get();
+  }
+
+  std::lock_guard<std::mutex> timer_lock(timers_mutex_);
+  time_t now = std::time(nullptr);
+  entry->next_timeout = now + offset;
+  entry->interval = interval;
+  timers_.push(TimerEvent{entry->next_timeout, h});
+}

-void reactor::cancel_timer(const std::shared_ptr<handler>& h)
-{
-  std::lock_guard<std::mutex> l(mutex_);
-  auto it = find_handler_type(h);
-  if (it != handlers_.end()) {
-    handlers_.erase(it);
-    interrupter_.interrupt();
-  }
-  h->cancel_timer();
-}
+void reactor::cancel_timer(const handler_ptr& h) {
+  if (!h) return;
+
+  std::unique_lock lock(handlers_mutex_);
+
+  if (auto* entry = find_handler_entry(h)) {
+    entry->next_timeout = 0;
+    entry->interval = 0;
+  }
+}

-void reactor::run()
-{
-  // log_.info("start dispatching all clients");
-  thread_pool_.start();
-  {
-    // log_.info("waiting for reactor shutdown");
-    std::unique_lock<std::mutex> l(mutex_);
-    shutdown_.wait(l, [this]() { return shutdown_requested_.load(); });
-    cleanup();
-  }
-  // log_.info("all clients dispatched; shutting down");
-  thread_pool_.stop();
-}
+void reactor::run() {
+  running_ = true;
+  thread_pool_.start();
+  {
+    std::unique_lock lock(handlers_mutex_);
+    shutdown_cv_.wait(lock, [this] { return shutdown_requested_.load(); });
+    cleanup_deleted_handlers();
+  }
+  thread_pool_.stop();
+  running_ = false;
+}
-void reactor::handle_events()
-{
-  // std::cout << this << " start handle events\n" << std::flush;
-  // log_.info("handle events");
-  running_ = true;
-  time_t timeout;
-  select_fdsets fd_sets;
-  prepare_select_bits(timeout, fd_sets);
-  // std::cout << this << " fd sets r: " << fd_sets.read_set().count() << ", w: " << fd_sets.write_set().count() << ", e: " << fd_sets.except_set().count() << ", max: " << fd_sets.maxp1() << "\n" << std::flush;
-  // log_.debug("fds [r: %d, w: %d, e: %d]",
-  //            fdsets_.read_set().count(),
-  //            fdsets_.write_set().count(),
-  //            fdsets_.except_set().count());
-  // if (timeout != (std::numeric_limits<time_t>::max)()) {
-  //   log_.debug("next timeout in %d sec", timeout);
-  // }
-  struct timeval tselect{};
-  struct timeval* p = nullptr;
-  if (timeout < (std::numeric_limits<time_t>::max)()) {
-    tselect.tv_sec = timeout;
-    tselect.tv_usec = 0;
-    p = &tselect;
-    // std::cout << this << " next timeout in " << p->tv_sec << " seconds\n" << std::flush;
-  }
-  if (!has_clients_to_handle(timeout, fd_sets)) {
-    // std::cout << this << " no clients to handle; returning\n" << std::flush;
-    // log_.info("no clients to handle, exiting");
-    return;
-  }
-  int ret;
-  while ((ret = select(p, fd_sets)) < 0) {
-    // std::cout << this << " select returned with error " << ret << "\n" << std::flush;
-    if (errno != EINTR) {
-      char error_buffer[1024];
-      log_.warn("select failed: %s", os::strerror(errno, error_buffer, 1024));
-      shutdown();
-    } else {
-      return;
-    }
-  }
-  // std::cout << this << " select returned with active requests " << ret << "\n" << std::flush;
-  bool interrupted = is_interrupted(fd_sets);
-  if (interrupted) {
-    if (!shutdown_requested_) {
-      // std::cout << this << " reactor was interrupted - promote new leader\n" << std::flush;
-      // log_.info("reactor was interrupted");
-      thread_pool_.promote_new_leader();
-      return;
-    } else {
-      // std::cout << this << " reactor was interrupted for shutdown\n" << std::flush;
-      // log_.info("shutting down");
-      // cleanup();
-      shutdown_.notify_one();
-      return;
-    }
-  }
-  time_t now = ::time(nullptr);
-  t_handler_type handler_type = resolve_next_handler(now, fd_sets);
-  if (handler_type.first) {
-    deactivate_handler(handler_type.first, handler_type.second);
-    // std::cout << this << " handling client " << handler_type.first->name() << " - promoting new leader\n" << std::flush;
-    thread_pool_.promote_new_leader();
-    // log_.info("start handling event");
-    // handle event
-    if (handler_type.second == event_type::WRITE_MASK) {
-      on_write_mask(handler_type.first);
-    } else if (handler_type.second == event_type::READ_MASK) {
-      on_read_mask(handler_type.first);
-    } else if (handler_type.second == event_type::TIMEOUT_MASK) {
-      on_timeout(handler_type.first, now);
-    } else {
-      // log_.info("unknown event type");
-    }
-    activate_handler(handler_type.first, handler_type.second);
-    remove_deleted();
-  } else {
-    // no handler found
-    // log_.info("no handler found");
-    thread_pool_.promote_new_leader();
-  }
-}
+void reactor::handle_events() {
+  time_t timeout;
+  select_fdsets fd_sets;
+  prepare_select_bits(timeout, fd_sets);
+
+  if (!fd_sets.maxp1() && timeout == (std::numeric_limits<time_t>::max)()) {
+    return;
+  }
+
+  struct timeval select_timeout{};
+  struct timeval* timeout_ptr = nullptr;
+
+  if (timeout < (std::numeric_limits<time_t>::max)()) {
+    select_timeout.tv_sec = timeout;
+    select_timeout.tv_usec = 0;
+    timeout_ptr = &select_timeout;
+  }
+
+  try {
+    if (int result = perform_select(timeout_ptr, fd_sets); result < 0) {
+      if (errno != EINTR) {
+        throw std::system_error(errno, std::system_category(), "select failed");
+      }
+      return;
+    }
+
+    time_t now = std::time(nullptr);
+
+    if (check_interruption(fd_sets)) {
+      return;
+    }
+
+    if (process_events(fd_sets, now)) {
+      thread_pool_.promote_new_leader();
+    }
+
+    process_timers(now);
+
+    if (now - last_cleanup_ >= CLEANUP_INTERVAL.count()) {
+      cleanup_deleted_handlers();
+      last_cleanup_ = now;
+    }
+  } catch (const std::exception& e) {
+    log_.error("Error in handle_events: %s", e.what());
+    stats_.errors_++;
+  }
+}

-void reactor::shutdown()
-{
-  if (!is_running()) {
-    return;
-  }
-  // shutdown the reactor properly
-  log_.info("shutting down reactor");
-  shutdown_requested_ = true;
-  interrupt();
-}
-
-bool reactor::is_running() const
-{
-  return running_;
-}
+void reactor::shutdown() {
+  if (!running_) return;
+
+  log_.info("Initiating reactor shutdown");
+  shutdown_requested_ = true;
+  interrupter_.interrupt();
+  shutdown_cv_.notify_all();
+}
+
+void reactor::mark_handler_for_delete(const handler_ptr& h) {
+  if (!h) return;
+
+  std::unique_lock lock(handlers_mutex_);
+  if (auto* entry = find_handler_entry(h)) {
+    entry->marked_for_deletion = true;
+  }
+}
+
+void reactor::safe_handler_operation(const handler_ptr& h, const std::string& op_name, const std::function<void()>& operation) {
+  try {
+    operation();
+  } catch (const std::exception& e) {
+    log_.error("Error during %s operation: %s", op_name.c_str(), e.what());
+    ++stats_.errors_;
+    throw;
+  }
+}
+
+reactor::HandlerEntry* reactor::find_handler_entry(const handler_ptr& h) {
+  const auto it = handlers_.find(h.get());
+  return it != handlers_.end() ? it->second.get() : nullptr;
+}
+
+void reactor::remove_handler_entry(const handler_ptr& h) {
+  handlers_.erase(h.get());
+}
+
+void reactor::update_handler_events(HandlerEntry* entry, const event_type ev, const bool activate) {
+  if (!entry) return;
+
+  if (activate) {
+    entry->events |= ev;
+  } else {
+    entry->events &= ~ev;
+  }
+}
-void reactor::prepare_select_bits(time_t& timeout, select_fdsets& fd_sets) const
-{
-  std::lock_guard<std::mutex> l(mutex_);
-  fd_sets.reset();
-  time_t now = ::time(nullptr);
-  timeout = (std::numeric_limits<time_t>::max)();
-  // set interrupter fd
-  fd_sets.read_set().set(interrupter_.socket_id());
-  for (const auto &h : handlers_) {
-    if (h.first == nullptr) {
-      continue;
-    }
-    if (h.first->is_ready_read() && is_event_type_set(h.second, event_type::READ_MASK)) {
-      fd_sets.read_set().set(h.first->handle());
-    }
-    if (h.first->is_ready_write() && is_event_type_set(h.second, event_type::WRITE_MASK)) {
-      fd_sets.write_set().set(h.first->handle());
-    }
-    if (h.first->next_timeout() > 0 && is_event_type_set(h.second, event_type::TIMEOUT_MASK)) {
-      timeout = (std::min)(timeout, h.first->next_timeout() <= now ? 0 : (h.first->next_timeout() - now));
-    }
-  }
-}
-
-void reactor::remove_deleted()
-{
-  while (!handlers_to_delete_.empty()) {
-    auto h = handlers_to_delete_.front();
-    handlers_to_delete_.pop_front();
-    auto fi = std::find_if(handlers_.begin(), handlers_.end(), [&h](const t_handler_type &ht) {
-      return ht.first.get() == h.get();
-    });
-    if (fi != handlers_.end()) {
-      log_.debug("removing handler %d", fi->first->handle());
-      handlers_.erase(fi);
-    }
-  }
-}
-
-void reactor::cleanup()
-{
-  while (!handlers_.empty()) {
-    auto hndlr = handlers_.front();
-    handlers_.pop_front();
-    hndlr.first->close();
-  }
-}
-
-int reactor::select(struct timeval *timeout, select_fdsets& fd_sets)
-{
-  log_.debug("calling select; waiting for io events");
-  return ::select(
-    static_cast<int>(fd_sets.maxp1()) + 1,
-    fd_sets.read_set().get(),
-    fd_sets.write_set().get(),
-    fd_sets.except_set().get(),
-    timeout
-  );
-}
-
-//void reactor::process_handler(int /*ret*/)
-//{
-//  handlers_.emplace_back(sentinel_, event_type::NONE_MASK);
-//  time_t now = ::time(nullptr);
-//  while (handlers_.front().first != nullptr) {
-//    auto h = handlers_.front();
-//    handlers_.pop_front();
-//    handlers_.push_back(h);
-//    // check for read/accept
-//    if (h.first->handle() > 0 && fdsets_.write_set().is_set(h.first->handle())) {
-//      on_write_mask(h.first);
-//    }
-//    if (h.first->handle() > 0 && fdsets_.read_set().is_set(h.first->handle())) {
-//      on_read_mask(h.first);
-//    }
-//    if (h.first->next_timeout() > 0 && h.first->next_timeout() <= now) {
-//      on_timeout(h.first, now);
-//    }
-//  }
-//  handlers_.pop_front();
-//}
-
-reactor::t_handler_type reactor::resolve_next_handler(time_t now, select_fdsets& fd_sets)
-{
-  std::lock_guard<std::mutex> l(mutex_);
-  for (auto &h : handlers_) {
-    if (h.first->handle() > 0 && fd_sets.write_set().is_set(h.first->handle())) {
-      return std::make_pair(h.first, event_type::WRITE_MASK);
-    }
-    if (h.first->handle() > 0 && fd_sets.read_set().is_set(h.first->handle())) {
-      return std::make_pair(h.first, event_type::READ_MASK);
-    }
-    if (h.first->next_timeout() > 0 && h.first->next_timeout() <= now) {
-      return std::make_pair(h.first, event_type::TIMEOUT_MASK);
-    }
-  }
-  return std::make_pair(nullptr, event_type::NONE_MASK);
-}
-
-void reactor::on_read_mask(const handler_ptr& handler)
-{
-  // log_.debug("read bit for handler %d is set; handle input", h->handle());
-  // std::cout << this << " (handler " << h.get() << "): handle read\n" << std::flush;
-  handler->on_input();
-}
-
-void reactor::on_write_mask(const handler_ptr& handler)
-{
-  // log_.debug("write bit for handler %d is set; handle output", h->handle());
-  // std::cout << this << " (handler " << h.get() << "): handle write\n" << std::flush;
-  handler->on_output();
-}
-
-void reactor::on_except_mask(const handler_ptr& /*handler*/)
-{
-  // std::cout << this << " (handler " << h.get() << "): handle exception\n" << std::flush;
-}
-
-void reactor::on_timeout(const handler_ptr &h, time_t now)
-{
-  // log_.debug("timeout expired for handler %d; handle timeout", h->handle());
-  // std::cout << this << " (handler " << h.get() << "): handle timeout\n" << std::flush;
-  h->calculate_next_timeout(now);
-  h->on_timeout();
-}
-
-select_fdsets reactor::fdsets() const
+void reactor::prepare_select_bits(time_t& timeout, select_fdsets& fd_sets) const
+{
+  std::lock_guard l(handlers_mutex_);
+  fd_sets.reset();
+  const time_t now = ::time(nullptr);
+  timeout = (std::numeric_limits<time_t>::max)();
+  // set interrupter fd
+  fd_sets.read_set().set(interrupter_.socket_id());
+  for (const auto & [hndlr, entry] : handlers_) {
+    if (hndlr == nullptr) {
+      continue;
+    }
+    if (hndlr->is_ready_read() && is_event_type_set(entry->events, event_type::READ_MASK)) {
+      fd_sets.read_set().set(hndlr->handle());
+    }
+    if (hndlr->is_ready_write() && is_event_type_set(entry->events, event_type::WRITE_MASK)) {
+      fd_sets.write_set().set(hndlr->handle());
+    }
+    if (hndlr->next_timeout() > 0 && is_event_type_set(entry->events, event_type::TIMEOUT_MASK)) {
+      timeout = (std::min)(timeout, hndlr->next_timeout() <= now ? 0 : (hndlr->next_timeout() - now));
+    }
+  }
+}
+
+reactor::t_handler_type reactor::resolve_next_handler(time_t now, select_fdsets& fd_sets)
+{
+  std::lock_guard l(handlers_mutex_);
+  for (auto & [hndlr, entry] : handlers_) {
+    if (hndlr->handle() > 0 && fd_sets.write_set().is_set(hndlr->handle())) {
+      return std::make_pair(hndlr, event_type::WRITE_MASK);
+    }
+    if (hndlr->handle() > 0 && fd_sets.read_set().is_set(hndlr->handle())) {
+      return std::make_pair(hndlr, event_type::READ_MASK);
+    }
+    if (hndlr->next_timeout() > 0 && hndlr->next_timeout() <= now) {
+      return std::make_pair(hndlr, event_type::TIMEOUT_MASK);
+    }
+  }
+  return std::make_pair(nullptr, event_type::NONE_MASK);
+}
+
+select_fdsets reactor::fd_sets() const
 {
   time_t timeout;
   select_fdsets fd_sets;
@@ -344,68 +250,24 @@ select_fdsets reactor::fdsets() const
   return fd_sets;
 }
-
-void reactor::mark_handler_for_delete(const handler_ptr& h)
-{
-  std::lock_guard<std::mutex> l(mutex_);
-  handlers_to_delete_.push_back(h);
-}
-
-bool reactor::is_interrupted(select_fdsets& fd_sets)
-{
-  std::lock_guard<std::mutex> l(mutex_);
-  if (fd_sets.read_set().is_set(interrupter_.socket_id())) {
-    log_.debug("interrupt byte received; resetting interrupter");
-    if (shutdown_requested_) {
-      running_ = false;
-    }
-    return interrupter_.reset();
-  }
-  return false;
-}
-
-bool reactor::has_clients_to_handle(time_t timeout, select_fdsets& fd_sets) const {
-  std::lock_guard<std::mutex> lock(mutex_);
-  return fd_sets.maxp1() > 0 || timeout != (std::numeric_limits<time_t>::max)();
-}
-
-std::list<reactor::t_handler_type>::iterator reactor::find_handler_type(const reactor::handler_ptr &h)
-{
-  return std::find_if(handlers_.begin(), handlers_.end(), [&h](const t_handler_type &ht) {
-    return ht.first.get() == h.get();
-  });
-}
-
 void reactor::activate_handler(const reactor::handler_ptr &h, event_type ev)
 {
-  std::lock_guard<std::mutex> l(mutex_);
-  auto it = find_handler_type(h);
-  if (it == handlers_.end()) {
+  std::lock_guard l(handlers_mutex_);
+  const auto it = find_handler_entry(h);
+  if (!it) {
     return;
   }
-  it->second |= ev;
+  it->events |= ev;
 }
 
 void reactor::deactivate_handler(const reactor::handler_ptr &h, event_type ev)
 {
-  std::lock_guard<std::mutex> l(mutex_);
-  auto it = find_handler_type(h);
-  if (it == handlers_.end()) {
+  std::lock_guard l(handlers_mutex_);
+  const auto it = find_handler_entry(h);
+  if (!it) {
    return;
   }
-  it->second &= ~ev;
+  it->events &= ~ev;
 }
 
-void reactor::interrupt()
-{
-  log_.trace("interrupting reactor");
-  std::lock_guard<std::mutex> l(mutex_);
-  interrupter_.interrupt();
-}
-
-void reactor::interrupt_without_lock()
-{
-  log_.trace("interrupting reactor");
-  interrupter_.interrupt();
-}
-
-}
+} // namespace matador
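The refactored schedule_timer() pushes TimerEvent values into timers_, and the header gives TimerEvent an operator>, which points at a min-heap: a priority_queue ordered with std::greater<> keeps the earliest timeout on top. The declaration of timers_ and the body of process_timers() are not part of this diff, so the following is only a sketch of that assumed shape:

#include <ctime>
#include <memory>
#include <queue>
#include <vector>

// Sketch of a min-heap timer queue keyed by absolute timeout.
struct timer_event {
  time_t timeout{};
  std::weak_ptr<void> handler;   // stand-in for handler_weak_ptr

  bool operator>(const timer_event& other) const {
    return timeout > other.timeout;
  }
};

using timer_queue = std::priority_queue<timer_event,
                                        std::vector<timer_event>,
                                        std::greater<>>;

// Roughly what a process_timers(now) would do: pop everything that is due.
inline void pop_expired(timer_queue& timers, time_t now)
{
  while (!timers.empty() && timers.top().timeout <= now) {
    timers.pop();                // fire / reschedule the associated handler here
  }
}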

View File

@@ -8,11 +8,11 @@
 #include <netinet/tcp.h>
 #endif
-namespace matador {
+namespace matador::net {
 socket_interrupter::socket_interrupter()
   : client_(tcp::v4())
-  , log_(create_logger("SocketInterrupter"))
+  , log_(logger::create_logger("SocketInterrupter"))
 {
   /*
    * setup acceptor

View File

@@ -0,0 +1,266 @@
#include <catch2/catch_test_macros.hpp>
#include <catch2/catch_approx.hpp>
#include <catch2/matchers/catch_matchers_all.hpp>
#include "matador/net/reactor.hpp"
#include "matador/net/handler.hpp"
#include <chrono>
#include <thread>
#include <atomic>
namespace {
class MockHandler : public matador::net::handler {
public:
explicit MockHandler(int fd = 1) : fd_(fd) {
reset_counters();
}
void reset_counters() {
read_count = 0;
write_count = 0;
timeout_count = 0;
close_count = 0;
}
socket_type handle() const override { return fd_; }
bool is_ready_read() const override { return ready_read_; }
bool is_ready_write() const override { return ready_write_; }
void on_input() override { ++read_count; }
void on_output() override { ++write_count; }
void on_timeout() override { ++timeout_count; }
void close() override { ++close_count; }
void set_ready_states(bool read, bool write) {
ready_read_ = read;
ready_write_ = write;
}
static std::atomic<int> read_count;
static std::atomic<int> write_count;
static std::atomic<int> timeout_count;
static std::atomic<int> close_count;
private:
int fd_;
bool ready_read_{false};
bool ready_write_{false};
};
std::atomic<int> MockHandler::read_count{0};
std::atomic<int> MockHandler::write_count{0};
std::atomic<int> MockHandler::timeout_count{0};
std::atomic<int> MockHandler::close_count{0};
class ReactorTestFixture {
protected:
void SetUp() {
MockHandler::read_count = 0;
MockHandler::write_count = 0;
MockHandler::timeout_count = 0;
MockHandler::close_count = 0;
}
matador::net::reactor reactor_;
};
} // namespace
SCENARIO_METHOD(ReactorTestFixture, "Reactor Handler Registration", "[reactor]") {
GIVEN("A reactor and a handler") {
auto handler = std::make_shared<MockHandler>();
WHEN("Registering handler for read events") {
reactor_.register_handler(handler, matador::net::event_type::READ_MASK);
THEN("Handler should be registered") {
auto fdsets = reactor_.fd_sets();
REQUIRE(fdsets.read_set().is_set(handler->handle()));
REQUIRE_FALSE(fdsets.write_set().is_set(handler->handle()));
}
}
WHEN("Registering handler for write events") {
handler->set_ready_states(false, true);
reactor_.register_handler(handler, matador::net::event_type::WRITE_MASK);
THEN("Handler should be registered for write") {
auto fdsets = reactor_.fd_sets();
REQUIRE(fdsets.write_set().is_set(handler->handle()));
REQUIRE_FALSE(fdsets.read_set().is_set(handler->handle()));
}
}
WHEN("Registering null handler") {
THEN("Should throw exception") {
REQUIRE_THROWS_AS(
reactor_.register_handler(nullptr, matador::net::event_type::READ_MASK),
std::invalid_argument
);
}
}
}
}
SCENARIO_METHOD(ReactorTestFixture, "Reactor Handler Unregistration", "[reactor]") {
GIVEN("A reactor with registered handler") {
auto handler = std::make_shared<MockHandler>();
reactor_.register_handler(handler, matador::net::event_type::READ_MASK);
WHEN("Unregistering the handler") {
reactor_.unregister_handler(handler, matador::net::event_type::READ_MASK);
THEN("Handler should be unregistered") {
auto fdsets = reactor_.fd_sets();
REQUIRE_FALSE(fdsets.read_set().is_set(handler->handle()));
REQUIRE(MockHandler::close_count == 1);
}
}
}
}
SCENARIO_METHOD(ReactorTestFixture, "Reactor Timer Operations", "[reactor]") {
GIVEN("A reactor and a handler") {
auto handler = std::make_shared<MockHandler>();
WHEN("Scheduling a timer") {
reactor_.schedule_timer(handler, 1, 0);
THEN("Timer should be scheduled") {
std::this_thread::sleep_for(std::chrono::milliseconds(1500));
reactor_.handle_events();
REQUIRE(MockHandler::timeout_count == 1);
}
}
WHEN("Scheduling a repeating timer") {
reactor_.schedule_timer(handler, 1, 1);
THEN("Timer should fire multiple times") {
std::this_thread::sleep_for(std::chrono::milliseconds(3500));
reactor_.handle_events();
REQUIRE(MockHandler::timeout_count >= 2);
}
}
WHEN("Cancelling a timer") {
reactor_.schedule_timer(handler, 2, 0);
reactor_.cancel_timer(handler);
THEN("Timer should not fire") {
std::this_thread::sleep_for(std::chrono::milliseconds(2500));
reactor_.handle_events();
REQUIRE(MockHandler::timeout_count == 0);
}
}
}
}
SCENARIO_METHOD(ReactorTestFixture, "Reactor Event Handling", "[reactor]") {
GIVEN("A reactor with registered handlers") {
auto read_handler = std::make_shared<MockHandler>(1);
auto write_handler = std::make_shared<MockHandler>(2);
read_handler->set_ready_states(true, false);
write_handler->set_ready_states(false, true);
reactor_.register_handler(read_handler, matador::net::event_type::READ_MASK);
reactor_.register_handler(write_handler, matador::net::event_type::WRITE_MASK);
WHEN("Handling events") {
std::thread reactor_thread([this]() {
reactor_.run();
});
std::this_thread::sleep_for(std::chrono::milliseconds(100));
THEN("Events should be processed") {
REQUIRE(MockHandler::read_count > 0);
REQUIRE(MockHandler::write_count > 0);
}
reactor_.shutdown();
reactor_thread.join();
}
}
}
SCENARIO_METHOD(ReactorTestFixture, "Reactor Shutdown Behavior", "[reactor]") {
GIVEN("A running reactor") {
auto handler = std::make_shared<MockHandler>();
reactor_.register_handler(handler, matador::net::event_type::READ_MASK);
std::thread reactor_thread([this]() {
reactor_.run();
});
WHEN("Shutting down the reactor") {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
reactor_.shutdown();
reactor_thread.join();
THEN("Reactor should stop cleanly") {
REQUIRE_FALSE(reactor_.is_running());
REQUIRE(MockHandler::close_count == 1);
}
}
}
}
SCENARIO_METHOD(ReactorTestFixture, "Reactor Stress Test", "[reactor][stress]") {
GIVEN("A reactor with multiple handlers") {
std::vector<std::shared_ptr<MockHandler>> handlers;
const int NUM_HANDLERS = 100;
for (int i = 0; i < NUM_HANDLERS; ++i) {
auto handler = std::make_shared<MockHandler>(i + 1);
handler->set_ready_states(true, true);
handlers.push_back(handler);
reactor_.register_handler(handler,
matador::net::event_type::READ_MASK | matador::net::event_type::WRITE_MASK);
}
WHEN("Running under load") {
std::thread reactor_thread([this]() {
reactor_.run();
});
std::this_thread::sleep_for(std::chrono::seconds(2));
THEN("Should handle events without issues") {
REQUIRE(MockHandler::read_count > 0);
REQUIRE(MockHandler::write_count > 0);
REQUIRE(reactor_.is_running());
}
reactor_.shutdown();
reactor_thread.join();
THEN("Should clean up properly") {
REQUIRE(MockHandler::close_count == NUM_HANDLERS);
}
}
}
}
SCENARIO_METHOD(ReactorTestFixture, "Reactor Error Handling", "[reactor]") {
GIVEN("A reactor with faulty handler") {
class FaultyHandler : public MockHandler {
void on_input() override { throw std::runtime_error("Simulated error"); }
};
auto handler = std::make_shared<FaultyHandler>();
handler->set_ready_states(true, false);
WHEN("Handling events with error") {
reactor_.register_handler(handler, matador::net::event_type::READ_MASK);
THEN("Should handle error gracefully") {
reactor_.handle_events();
REQUIRE(reactor_.get_statistics().errors_ > 0);
}
}
}
}
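The includes at the top of this test file are Catch2 v3 headers, so the file is normally linked against Catch2's provided main (for example the Catch2::Catch2WithMain CMake target) or given a small runner like the one below. How this project wires that up is not part of the diff, so the runner is only an assumption:

#include <catch2/catch_session.hpp>

// Minimal Catch2 v3 runner for the reactor test suite above.
int main(int argc, char* argv[])
{
  return Catch::Session().run(argc, argv);
}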