query/include/matador/net/reactor.hpp

120 lines
3.6 KiB
C++

#ifndef REACTOR_HPP
#define REACTOR_HPP
#include "matador/net/event_type.hpp"
#include "matador/net/select_fd_sets.hpp"
#include "matador/net/socket_interrupter.hpp"
#include "matador/utils/leader_follower_thread_pool.hpp"

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <ctime>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
namespace matador::net {
class handler;
/**
 * Reactor declaration: owns a table of registered handlers and a queue of
 * timer events, and exposes the entry points used to run, drive and shut
 * down event handling.
 *
 * Only declarations are visible in this header. The member summaries below
 * are derived from the signatures; the exact semantics live in the
 * implementation file and should be confirmed there.
 *
 * The class is non-copyable (copy constructor and copy assignment are deleted).
 */
class reactor {
public:
  using handler_ptr = std::shared_ptr<handler>;
  using handler_weak_ptr = std::weak_ptr<handler>;

  /**
   * Event counters. Every field is a std::atomic, so each counter can be
   * read or updated individually without additional locking. Because of the
   * atomics the struct itself is not copyable; it is handed out by const
   * reference through get_statistics().
   */
  struct Statistics {
    std::atomic<size_t> total_events_handled_{0};
    std::atomic<size_t> read_events_{0};
    std::atomic<size_t> write_events_{0};
    std::atomic<size_t> timer_events_{0};
    std::atomic<size_t> errors_{0};
  };

  /**
   * Bookkeeping record stored per registered handler.
   */
  struct HandlerEntry {
    /**
     * @param h  Handler to store; taken by value and moved into the entry.
     * @param et Event type(s) initially associated with the handler.
     */
    explicit HandlerEntry(handler_ptr h, event_type et)
      : handler(std::move(h)), events(et) {}

    handler_ptr handler;              ///< Shared ownership of the handler.
    event_type events;                ///< Event type(s) recorded for the handler.
    time_t next_timeout{0};           ///< Timer data; unit not visible here (presumably seconds) - TODO confirm.
    time_t interval{0};               ///< Timer repeat interval; presumably 0 means "no repeat" - TODO confirm.
    bool marked_for_deletion{false};  ///< Deletion flag; see mark_handler_for_delete().
  };

  reactor();
  ~reactor();

  // Non-copyable.
  reactor(const reactor&) = delete;
  reactor& operator=(const reactor&) = delete;

  /// Registers handler @p h for event type @p type.
  void register_handler(const handler_ptr& h, event_type type);
  /// Counterpart of register_handler() for handler @p h and event type @p type.
  void unregister_handler(const handler_ptr& h, event_type type);

  /// Schedules a timer for @p h using @p offset and @p interval (units presumably seconds - TODO confirm).
  void schedule_timer(const handler_ptr& h, time_t offset, time_t interval);
  /// Cancels the timer of handler @p h.
  void cancel_timer(const handler_ptr& h);

  /// Starts the reactor; relation to handle_events() is defined in the implementation.
  void run();
  /// Event handling entry point; presumably one dispatch pass or the dispatch loop - TODO confirm.
  void handle_events();
  /// Requests the reactor to stop.
  void shutdown();

  /// @return Current value of the atomic running flag.
  [[nodiscard]] bool is_running() const { return running_; }

  /// @return A select_fdsets value (returned by value); how it is populated is defined in the implementation.
  [[nodiscard]] select_fdsets fd_sets() const;

  /// Marks handler @p h for deletion.
  void mark_handler_for_delete(const handler_ptr& h);
  /// Activates event type @p ev for handler @p h.
  void activate_handler(const handler_ptr& h, event_type ev);
  /// Deactivates event type @p ev for handler @p h.
  void deactivate_handler(const handler_ptr& h, event_type ev);

  /// @return Reference to the reactor's counters; valid for the lifetime of the reactor.
  [[nodiscard]] const Statistics& get_statistics() const { return stats_; }

private:
  /**
   * Entry of the timer queue. Holds only a weak reference to the handler,
   * so a queued timer does not keep its handler alive.
   */
  struct TimerEvent {
    time_t timeout{};
    handler_weak_ptr handler;

    /// Ordering used by timer_queue (via std::greater<>): compares timeouts only.
    bool operator>(const TimerEvent& other) const {
      return timeout > other.timeout;
    }
  };

  /// Handler table keyed by the raw handler address; each entry is uniquely owned by the map.
  using handler_map = std::unordered_map<handler*, std::unique_ptr<HandlerEntry>>;
  /// std::greater<> turns the priority queue into a min-heap: top() is the smallest timeout.
  using timer_queue = std::priority_queue<TimerEvent, std::vector<TimerEvent>, std::greater<>>;

  // Internal helpers; their behaviour is defined in the implementation file.
  void prepare_select_bits(time_t& timeout, select_fdsets& fd_sets) const;
  bool process_events(const select_fdsets& fd_sets, time_t now);
  void process_timers(time_t now);
  void cleanup_deleted_handlers();
  void handle_error(const std::string& operation, const handler_ptr& h);
  int perform_select(struct timeval* timeout, select_fdsets& fd_sets);
  bool check_interruption(const select_fdsets& fd_sets);
  void safe_handler_operation(const handler_ptr& h, const std::string& op_name,
                              const std::function<void()>& operation);
  HandlerEntry* find_handler_entry(const handler_ptr& h);
  void remove_handler_entry(const handler_ptr& h);
  static void update_handler_events(HandlerEntry* entry, event_type ev, bool activate);

  // State. Keep the declaration order: members are initialised in this order
  // and destroyed in the reverse order.
  handler_map handlers_;
  timer_queue timers_;
  std::atomic<bool> running_{false};
  std::atomic<bool> shutdown_requested_{false};
  mutable std::shared_mutex handlers_mutex_;  // mutable so const members can lock it
  std::mutex timers_mutex_;
  // The _any variant can wait on any lock type, whereas std::condition_variable
  // only works with std::unique_lock<std::mutex>.
  std::condition_variable_any shutdown_cv_;
  logger::logger log_;
  Statistics stats_;
  utils::leader_follower_thread_pool thread_pool_;
  socket_interrupter interrupter_;
  static constexpr std::chrono::seconds CLEANUP_INTERVAL{60};
  time_t last_cleanup_{0};  // presumably the time of the last cleanup pass - TODO confirm
};
} // namespace matador::net
#endif //REACTOR_HPP