#include "matador/net/reactor.hpp" #include "matador/net/handler.hpp" #include "matador/logger/log_manager.hpp" #include #include #include #include #include namespace matador { reactor::reactor() : sentinel_(std::shared_ptr(nullptr)) , log_(create_logger("Reactor")) , thread_pool_(4, [this]() { handle_events(); }) { } reactor::~reactor() { log_.debug("destroying reactor"); thread_pool_.shutdown(); } void reactor::register_handler(const handler_ptr& h, event_type et) { h->register_reactor(this); h->open(); std::lock_guard l(mutex_); auto it = find_handler_type(h); if (it == handlers_.end()) { handlers_.emplace_back(h, et); } else if (it->first != h) { throw std::logic_error("given handler isn't expected handler"); } else { it->second = it->second | et; } interrupt_without_lock(); } void reactor::unregister_handler(const handler_ptr& h, event_type) { std::lock_guard l(mutex_); auto it = find_handler_type(h); if (it != handlers_.end()) { (*it).first->close(); handlers_.erase(it); } interrupt_without_lock(); } void reactor::schedule_timer(const std::shared_ptr& h, time_t offset, time_t interval) { h->register_reactor(this); std::lock_guard l(mutex_); auto it = find_handler_type(h); if (it == handlers_.end()) { handlers_.emplace_back(h, event_type::TIMEOUT_MASK); } h->schedule(offset, interval); } void reactor::cancel_timer(const std::shared_ptr& h) { std::lock_guard l(mutex_); auto it = find_handler_type(h); if (it != handlers_.end()) { handlers_.erase(it); } h->cancel_timer(); } void reactor::run() { // log_.info("start dispatching all clients"); thread_pool_.start(); { // log_.info("waiting for reactor shutdown"); std::unique_lock l(mutex_); shutdown_.wait(l, [this]() { return shutdown_requested_.load(); }); cleanup(); } // log_.info("all clients dispatched; shutting down"); thread_pool_.stop(); } void reactor::handle_events() { // std::cout << this << " start handle events\n" << std::flush; // log_.info("handle events"); running_ = true; time_t timeout; select_fdsets fd_sets; prepare_select_bits(timeout, fd_sets); // std::cout << this << " fd sets r: " << fd_sets.read_set().count() << ", w: " << fd_sets.write_set().count() << ", e: " << fd_sets.except_set().count() << ", max: " << fd_sets.maxp1() << "\n" << std::flush; // log_.debug("fds [r: %d, w: %d, e: %d]", // fdsets_.read_set().count(), // fdsets_.write_set().count(), // fdsets_.except_set().count()); // if (timeout != (std::numeric_limits::max)()) { // log_.debug("next timeout in %d sec", timeout); // } struct timeval tselect{}; struct timeval* p = nullptr; if (timeout < (std::numeric_limits::max)()) { tselect.tv_sec = timeout; tselect.tv_usec = 0; p = &tselect; // std::cout << this << " next timeout in " << p->tv_sec << " seconds\n" << std::flush; } if (!has_clients_to_handle(timeout, fd_sets)) { // std::cout << this << " no clients to handle; returning\n" << std::flush; // log_.info("no clients to handle, exiting"); return; } int ret; while ((ret = select(p, fd_sets)) < 0) { // std::cout << this << " select returned with error " << ret << "\n" << std::flush; if(errno != EINTR) { char error_buffer[1024]; log_.warn("select failed: %s", os::strerror(errno, error_buffer, 1024)); shutdown(); } else { return; } } // std::cout << this << " select returned with active requests " << ret << "\n" << std::flush; bool interrupted = is_interrupted(fd_sets); if (interrupted) { if (!shutdown_requested_) { // std::cout << this << " reactor was interrupted - promote new leader\n" << std::flush; // log_.info("reactor was interrupted"); thread_pool_.promote_new_leader(); return; } else { // std::cout << this << " reactor was interrupted for shutdown\n" << std::flush; // log_.info("shutting down"); // cleanup(); shutdown_.notify_one(); return; } } time_t now = ::time(nullptr); t_handler_type handler_type = resolve_next_handler(now, fd_sets); if (handler_type.first) { deactivate_handler(handler_type.first, handler_type.second); // std::cout << this << " handling client " << handler_type.first->name() << " - promoting new leader\n" << std::flush; thread_pool_.promote_new_leader(); // log_.info("start handling event"); // handle event if (handler_type.second == event_type::WRITE_MASK) { on_write_mask(handler_type.first); } else if (handler_type.second == event_type::READ_MASK) { on_read_mask(handler_type.first); } else if (handler_type.second == event_type::TIMEOUT_MASK) { on_timeout(handler_type.first, now); } else { // log_.info("unknown event type"); } activate_handler(handler_type.first, handler_type.second); remove_deleted(); } else { // no handler found // log_.info("no handler found"); thread_pool_.promote_new_leader(); } } void reactor::shutdown() { if (!is_running()) { return; } // shutdown the reactor properly log_.info("shutting down reactor"); shutdown_requested_ = true; interrupt(); } bool reactor::is_running() const { return running_; } void reactor::prepare_select_bits(time_t& timeout, select_fdsets& fd_sets) const { std::lock_guard l(mutex_); fd_sets.reset(); time_t now = ::time(nullptr); timeout = (std::numeric_limits::max)(); // set interrupter fd fd_sets.read_set().set(interrupter_.socket_id()); for (const auto &h : handlers_) { if (h.first == nullptr) { continue; } if (h.first->is_ready_read() && is_event_type_set(h.second, event_type::READ_MASK)) { fd_sets.read_set().set(h.first->handle()); } if (h.first->is_ready_write() && is_event_type_set(h.second, event_type::WRITE_MASK)) { fd_sets.write_set().set(h.first->handle()); } if (h.first->next_timeout() > 0 && is_event_type_set(h.second, event_type::TIMEOUT_MASK)) { timeout = (std::min)(timeout, h.first->next_timeout() <= now ? 0 : (h.first->next_timeout() - now)); } } } void reactor::remove_deleted() { while (!handlers_to_delete_.empty()) { auto h = handlers_to_delete_.front(); handlers_to_delete_.pop_front(); auto fi = std::find_if(handlers_.begin(), handlers_.end(), [&h](const t_handler_type &ht) { return ht.first.get() == h.get(); }); if (fi != handlers_.end()) { log_.debug("removing handler %d", fi->first->handle()); handlers_.erase(fi); } } } void reactor::cleanup() { while (!handlers_.empty()) { auto hndlr = handlers_.front(); handlers_.pop_front(); hndlr.first->close(); } } int reactor::select(struct timeval *timeout, select_fdsets& fd_sets) { log_.debug("calling select; waiting for io events"); return ::select( static_cast(fd_sets.maxp1()) + 1, fd_sets.read_set().get(), fd_sets.write_set().get(), fd_sets.except_set().get(), timeout ); } //void reactor::process_handler(int /*ret*/) //{ // handlers_.emplace_back(sentinel_, event_type::NONE_MASK); // time_t now = ::time(nullptr); // while (handlers_.front().first != nullptr) { // auto h = handlers_.front(); // handlers_.pop_front(); // handlers_.push_back(h); // // check for read/accept // if (h.first->handle() > 0 && fdsets_.write_set().is_set(h.first->handle())) { // on_write_mask(h.first); // } // if (h.first->handle() > 0 && fdsets_.read_set().is_set(h.first->handle())) { // on_read_mask(h.first); // } // if (h.first->next_timeout() > 0 && h.first->next_timeout() <= now) { // on_timeout(h.first, now); // } // } // handlers_.pop_front(); //} reactor::t_handler_type reactor::resolve_next_handler(time_t now, select_fdsets& fd_sets) { std::lock_guard l(mutex_); for (auto &h : handlers_) { if (h.first->handle() > 0 && fd_sets.write_set().is_set(h.first->handle())) { return std::make_pair(h.first, event_type::WRITE_MASK); } if (h.first->handle() > 0 && fd_sets.read_set().is_set(h.first->handle())) { return std::make_pair(h.first, event_type::READ_MASK); } if (h.first->next_timeout() > 0 && h.first->next_timeout() <= now) { return std::make_pair(h.first, event_type::TIMEOUT_MASK); } } return std::make_pair(nullptr, event_type::NONE_MASK); } void reactor::on_read_mask(const handler_ptr& handler) { // log_.debug("read bit for handler %d is set; handle input", h->handle()); // std::cout << this << " (handler " << h.get() << "): handle read\n" << std::flush; handler->on_input(); } void reactor::on_write_mask(const handler_ptr& handler) { // log_.debug("write bit for handler %d is set; handle output", h->handle()); // std::cout << this << " (handler " << h.get() << "): handle write\n" << std::flush; handler->on_output(); } void reactor::on_except_mask(const handler_ptr& /*handler*/) { // std::cout << this << " (handler " << h.get() << "): handle exception\n" << std::flush; } void reactor::on_timeout(const handler_ptr &h, time_t now) { // log_.debug("timeout expired for handler %d; handle timeout", h->handle()); // std::cout << this << " (handler " << h.get() << "): handle timeout\n" << std::flush; h->calculate_next_timeout(now); h->on_timeout(); } select_fdsets reactor::fdsets() const { time_t timeout; select_fdsets fd_sets; prepare_select_bits(timeout, fd_sets); return fd_sets; } void reactor::mark_handler_for_delete(const handler_ptr& h) { std::lock_guard l(mutex_); handlers_to_delete_.push_back(h); } bool reactor::is_interrupted(select_fdsets& fd_sets) { std::lock_guard l(mutex_); if (fd_sets.read_set().is_set(interrupter_.socket_id())) { log_.debug("interrupt byte received; resetting interrupter"); if (shutdown_requested_) { running_ = false; } return interrupter_.reset(); } return false; } bool reactor::has_clients_to_handle(time_t timeout, select_fdsets& fd_sets) const { std::lock_guard lock(mutex_); return fd_sets.maxp1() > 0 || timeout != (std::numeric_limits::max)(); } std::list::iterator reactor::find_handler_type(const reactor::handler_ptr &h) { return std::find_if(handlers_.begin(), handlers_.end(), [&h](const t_handler_type &ht) { return ht.first.get() == h.get(); }); } void reactor::activate_handler(const reactor::handler_ptr &h, event_type ev) { std::lock_guard l(mutex_); auto it = find_handler_type(h); if (it == handlers_.end()) { return; } it->second |= ev; } void reactor::deactivate_handler(const reactor::handler_ptr &h, event_type ev) { std::lock_guard l(mutex_); auto it = find_handler_type(h); if (it == handlers_.end()) { return; } it->second &= ~ev; } void reactor::interrupt() { log_.trace("interrupting reactor"); std::lock_guard l(mutex_); interrupter_.interrupt(); } void reactor::interrupt_without_lock() { log_.trace("interrupting reactor"); interrupter_.interrupt(); } }