#ifndef REACTOR_HPP #define REACTOR_HPP #include "matador/net/event_type.hpp" #include "matador/net/select_fd_sets.hpp" #include "matador/net/socket_interrupter.hpp" #include "matador/utils/leader_follower_thread_pool.hpp" #include #include #include #include #include #include #include namespace matador::net { class handler; class reactor { public: using handler_ptr = std::shared_ptr; using handler_weak_ptr = std::weak_ptr; struct Statistics { std::atomic total_events_handled_{0}; std::atomic read_events_{0}; std::atomic write_events_{0}; std::atomic timer_events_{0}; std::atomic errors_{0}; }; struct HandlerEntry { explicit HandlerEntry(handler_ptr h, event_type et) : handler(std::move(h)), events(et) {} handler_ptr handler; event_type events; time_t next_timeout{0}; time_t interval{0}; bool marked_for_deletion{false}; }; reactor(); ~reactor(); reactor(const reactor&) = delete; reactor& operator=(const reactor&) = delete; void register_handler(const handler_ptr& h, event_type type); void unregister_handler(const handler_ptr& h, event_type type); void schedule_timer(const handler_ptr& h, time_t offset, time_t interval); void cancel_timer(const handler_ptr& h); void run(); void handle_events(); void shutdown(); bool is_running() const { return running_; } select_fdsets fdsets() const; void mark_handler_for_delete(const handler_ptr& h); void activate_handler(const handler_ptr& h, event_type ev); void deactivate_handler(const handler_ptr& h, event_type ev); const Statistics& get_statistics() const { return stats_; } private: struct TimerEvent { time_t timeout; handler_weak_ptr handler; bool operator>(const TimerEvent& other) const { return timeout > other.timeout; } }; using handler_map = std::unordered_map>; using timer_queue = std::priority_queue, std::greater<>>; void prepare_select_bits(time_t& timeout, select_fdsets& fd_sets) const; bool process_events(const select_fdsets& fd_sets, time_t now); void process_timers(time_t now); void cleanup_deleted_handlers(); void handle_error(const std::string& operation, const handler_ptr& h); int perform_select(struct timeval* timeout, select_fdsets& fd_sets); bool check_interruption(const select_fdsets& fd_sets); void safe_handler_operation(const handler_ptr& h, const std::string& op_name, const std::function& operation); HandlerEntry* find_handler_entry(const handler_ptr& h); void remove_handler_entry(const handler_ptr& h); void update_handler_events(HandlerEntry* entry, event_type ev, bool activate); private: handler_map handlers_; timer_queue timers_; std::atomic running_{false}; std::atomic shutdown_requested_{false}; mutable std::shared_mutex handlers_mutex_; std::mutex timers_mutex_; std::condition_variable shutdown_cv_; logger log_; Statistics stats_; leader_follower_thread_pool thread_pool_; socket_interrupter interrupter_; static constexpr std::chrono::seconds CLEANUP_INTERVAL{60}; time_t last_cleanup_{0}; }; } // namespace matador #endif //REACTOR_HPP