From cd61ac7f007f5bc05a1e373217cff035cf0fbd9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sascha=20K=C3=BChl?= Date: Fri, 27 Jun 2025 15:19:25 +0200 Subject: [PATCH] added net module --- include/matador/net/acceptor.hpp | 174 +++++++ include/matador/net/address.hpp | 453 ++++++++++++++++++ include/matador/net/address_resolver.hpp | 115 +++++ include/matador/net/connector.hpp | 146 ++++++ include/matador/net/event_type.hpp | 127 +++++ include/matador/net/fdset.hpp | 111 +++++ include/matador/net/handler.hpp | 138 ++++++ include/matador/net/handler_creator.hpp | 23 + include/matador/net/io_service.hpp | 128 +++++ include/matador/net/io_stream.hpp | 67 +++ include/matador/net/ip.hpp | 130 +++++ include/matador/net/os.hpp | 45 ++ include/matador/net/peer.hpp | 203 ++++++++ include/matador/net/reactor.hpp | 117 +++++ include/matador/net/select_fd_sets.hpp | 132 +++++ include/matador/net/socket.hpp | 168 +++++++ include/matador/net/socket.tpp | 257 ++++++++++ include/matador/net/socket_acceptor.hpp | 357 ++++++++++++++ include/matador/net/socket_interrupter.hpp | 38 ++ include/matador/net/socket_stream.hpp | 95 ++++ include/matador/net/stream_handler.hpp | 96 ++++ include/matador/object/collection.hpp | 1 + .../utils/leader_follower_thread_pool.hpp | 21 +- include/matador/utils/uuid.hpp | 2 +- source/core/CMakeLists.txt | 6 + source/core/net/acceptor.cpp | 119 +++++ source/core/net/address.cpp | 154 ++++++ source/core/net/address_resolver.cpp | 19 + source/core/net/connector.cpp | 84 ++++ source/core/net/error.cpp | 38 ++ source/core/net/fdset.cpp | 66 +++ source/core/net/handler.cpp | 48 ++ source/core/net/io_service.cpp | 31 ++ .../core/net/leader_follower_thread_pool.cpp | 106 ++++ source/core/net/os.cpp | 84 ++++ source/core/net/reactor.cpp | 411 ++++++++++++++++ source/core/net/select_fdsets.cpp | 74 +++ source/core/net/socket_interrupter.cpp | 74 +++ source/core/net/stream_handler.cpp | 153 ++++++ 39 files changed, 4599 insertions(+), 12 deletions(-) create mode 100644 include/matador/net/acceptor.hpp create mode 100644 include/matador/net/address.hpp create mode 100644 include/matador/net/address_resolver.hpp create mode 100644 include/matador/net/connector.hpp create mode 100644 include/matador/net/event_type.hpp create mode 100644 include/matador/net/fdset.hpp create mode 100644 include/matador/net/handler.hpp create mode 100644 include/matador/net/handler_creator.hpp create mode 100644 include/matador/net/io_service.hpp create mode 100644 include/matador/net/io_stream.hpp create mode 100644 include/matador/net/ip.hpp create mode 100644 include/matador/net/os.hpp create mode 100644 include/matador/net/peer.hpp create mode 100644 include/matador/net/reactor.hpp create mode 100644 include/matador/net/select_fd_sets.hpp create mode 100644 include/matador/net/socket.hpp create mode 100644 include/matador/net/socket.tpp create mode 100644 include/matador/net/socket_acceptor.hpp create mode 100644 include/matador/net/socket_interrupter.hpp create mode 100644 include/matador/net/socket_stream.hpp create mode 100644 include/matador/net/stream_handler.hpp create mode 100644 source/core/net/acceptor.cpp create mode 100644 source/core/net/address.cpp create mode 100644 source/core/net/address_resolver.cpp create mode 100644 source/core/net/connector.cpp create mode 100644 source/core/net/error.cpp create mode 100644 source/core/net/fdset.cpp create mode 100644 source/core/net/handler.cpp create mode 100644 source/core/net/io_service.cpp create mode 100644 source/core/net/leader_follower_thread_pool.cpp create mode 100644 source/core/net/os.cpp create mode 100644 source/core/net/reactor.cpp create mode 100644 source/core/net/select_fdsets.cpp create mode 100644 source/core/net/socket_interrupter.cpp create mode 100644 source/core/net/stream_handler.cpp diff --git a/include/matador/net/acceptor.hpp b/include/matador/net/acceptor.hpp new file mode 100644 index 0000000..1137466 --- /dev/null +++ b/include/matador/net/acceptor.hpp @@ -0,0 +1,174 @@ +#ifndef MATADOR_ACCEPTOR_HPP +#define MATADOR_ACCEPTOR_HPP + +#include "matador/net/export.hpp" +#include "matador/net/handler.hpp" +#include "matador/net/handler_creator.hpp" +#include "matador/net/ip.hpp" + +#include "matador/logger/logger.hpp" + +#include + +namespace matador { + +/** + * The acceptor class is used to accept new connection + * within the reactor dispatcher. + * + * Once a new connection was accepted by the acceptor a + * new handler is created and registered within the reactor + * to handle the established connection + */ +class OOS_NET_API acceptor : public handler, public handler_creator +{ +public: + typedef std::function(tcp::socket sock, tcp::peer endpoint, acceptor *accptr)> t_accept_handler; /**< Shortcut to a function creating a handler on successfully accepted a new connection */ + + /** + * Default constructor + */ + acceptor(); + + /** + * Creates an acceptor with the given endpoint. The endpoint + * represents the address on which the acceptor listens for new + * connections + * + * @param endpoint Endpoint to listen for new connections + */ + explicit acceptor(tcp::peer endpoint); + + /** + * Creates an acceptor with the given endpoint. The endpoint + * represents the address on which the acceptor listens for new + * connections. The given function is called when a new + * connection was accepted and returns a new handler for + * the new connection. + * + * @param endpoint Endpoint to listen for new connections + * @param on_new_connection Function creating a new handler for each accepted new connection + */ + acceptor(tcp::peer endpoint, t_accept_handler on_new_connection); + + /** + * Destructor + */ + ~acceptor() override; + + /** + * When a new connection is accepted the given function + * is called to create a new handler for the connection + * + * @param on_new_connection Function creating a new handler for the new connection + */ + void accecpt(t_accept_handler on_new_connection); + + /** + * Accepts new connection at the given endpoint. + * When a new connection is accepted the given function + * is called to create a new handler for the connection. + * + * @param endpoint Endpoint to listen for new connections + * @param on_new_connection Function creating a new handler for the new connection + */ + void accecpt(const tcp::peer& endpoint, t_accept_handler on_new_connection); + + /** + * Opens the acceptor means the socket address of the + * endpoint is bound to the created listing socket + * Then the socket is used for listening for new + * connections. + */ + void open() override; + + /** + * Returns the current listening socket fd + * + * @return Listening socket fd + */ + socket_type handle() const override; + + /** + * Is called when a new connection wants to + * execute to the endpoint. Once the connection was accepted + * a new connection handler is created and the socket is + * passed to the handler. The handler is then registered + * to the reactor to disptach its read and write events. + */ + void on_input() override; + + /** + * Does actually nothing + */ + void on_output() override {} + + /** + * Does actually nothing + */ + void on_except() override {} + + /** + * Does actually nothing + */ + void on_timeout() override {} + + /** + * Does actually nothing + */ + void on_close() override {} + + /** + * Closes the listen fd of the acceptor + */ + void close() override; + + /** + * Returns always false because new connections + * are indicated as read events. + * + * @return Always false + */ + bool is_ready_write() const override; + + /** + * Returns true if the acceptor was opened + * and a listen fd was created. + * + * @return True If a listen socket was created + */ + bool is_ready_read() const override; + + /** + * Notifies the acceptor that + * this handler was closed. + * + * @param hndlr Closed handler. + */ + void notify_close(handler *hndlr) override; + + std::string name() const override; + +public: + /** + * Returns the current endpoint accepting new connection. + * + * @return Current listening endpoint + */ + const tcp::peer& endpoint() const; + +private: + tcp::acceptor acceptor_; + tcp::peer endpoint_; + + std::string name_ { "acceptor" }; + + t_accept_handler accept_handler_; + + logger log_; + + tcp::peer create_client_endpoint() const; +}; + +} +#endif //MATADOR_ACCEPTOR_HPP diff --git a/include/matador/net/address.hpp b/include/matador/net/address.hpp new file mode 100644 index 0000000..0f1d6dc --- /dev/null +++ b/include/matador/net/address.hpp @@ -0,0 +1,453 @@ +#ifndef MATADOR_ADDRESS_HPP +#define MATADOR_ADDRESS_HPP + +#include "matador/utils/os.hpp" + +#include "matador/net/export.hpp" +#include "matador/net/os.hpp" +#include "matador/net/error.hpp" + +#include +#include +#include + +#ifdef _WIN32 +#include +#else +#include +#include +#include +#include +#endif + +namespace matador { + +/** + * Enum representing the protocol + * family IPv4 and IPv6 + */ +enum protocol_family { + V4, /**< IPv4 enum value */ + V6 /**< IPv6 enum value */ +}; + +/// @cond MATADOR_DEV +template < protocol_family PF > +class address_router; +/// @endcond + +/** + * The address class represents a IPv4 or + * IPv6 address. + */ +class OOS_NET_API address +{ +public: + /** + * Default constructor + */ + address() = default; + + /** + * Constructs an address from the given + * addr representing a IPv4 socket address + * structure + * + * @param addr Initial IPv4 Socket address + */ + explicit address(const sockaddr_in &addr); + + /** + * Constructs an address from the given + * addr representing a IPv6 socket address + * structure + * + * @param addr Initial IPv6 Socket address + */ + explicit address(const sockaddr_in6 &addr); + + /** + * Copy constructs an address from the + * given address x + * + * @param x Address to copy from + */ + address(const address &x) = default; + + /** + * Copy assign an address from the + * given address x + * + * @param x Address to assign from + * @return The assigned address + */ + address& operator=(const address &x); + + /** + * Move copy constructs an address from the + * given address x + * + * @param x Address to move copy from + */ + address(address &&x) noexcept; + + /** + * Move assign an address from the + * given address x + * + * @param x Address to move assign from + * @return The moved address + */ + address& operator=(address &&x) noexcept; + + /** + * Destructs the address + */ + ~address(); + + /** + * Returns the address as unsigned long value + * + * @return The address as unsigned long value + */ + unsigned int to_ulong() const; + + /** + * Return the address in string format either + * in IPv4 dotted format or in IPv6 colon based + * format + * + * @return The address as string + */ + std::string to_string() const; + + /** + * Sets the port number + * + * @param pn Port number to set + */ + void port(unsigned short pn); + + /** + * Returns the current port number of the + * address. + * + * @return The current port number + */ + unsigned short port() const; + + /** + * Returns true if address is IPv4 + * + * @return True if IPv4 address + */ + bool is_v4() const; + + /** + * Returns true if address is IPv4 + * + * @return True if IPv4 address + */ + bool is_v6() const; + + /** + * Returns the raw sockaddr structure + * + * @return The raw sockaddr structure + */ + sockaddr* addr(); + + /** + * Returns the raw sockaddr structure + * + * @return The raw sockaddr structure + */ + const sockaddr* addr() const; + + /** + * Returns the IPv4 sockaddr_in structure + * + * @return The IPv4 sockaddr_in structure + */ + sockaddr_in* addr_v4(); + + /** + * Returns the IPv4 sockaddr_in structure + * + * @return The IPv4 sockaddr_in structure + */ + const sockaddr_in* addr_v4() const; + + /** + * Returns the IPv6 sockaddr_in6 structure + * + * @return The IPv6 sockaddr_in6 structure + */ + sockaddr_in6* addr_v6(); + + /** + * Returns the IPv6 sockaddr_in6 structure + * + * @return The IPv6 sockaddr_in6 structure + */ + const sockaddr_in6* addr_v6() const; + + /** + * Returns the size of the underlying + * address structure. + * + * @return Size of the underlying address structure + */ + socklen_t size() const; + + typedef address_router v4; /**< Shortcut to the internal IPv4 address router */ + typedef address_router v6; /**< Shortcut to the internal IPv6 address router */ + +private: + void clear(); + +private: + template < protocol_family PF > + friend class address_router; + + union socket_address { + sockaddr sa_raw; + sockaddr_in sa_in; + sockaddr_in6 sa_in6; + }; + + socket_address socket_address_ {}; + + socklen_t size_ = 0; +}; + +/// @cond MATADOR_DEV +template <> +class address_router +{ +public: + address_router() = delete; + address_router& operator=(const address_router&) = delete; + address_router(const address_router&) = delete; + address_router& operator=(address_router&&) = delete; + address_router(address_router&&) = delete; + + static address empty() { return mk_address(INADDR_ANY); }; + static address any() { return mk_address(INADDR_ANY); } + static address loopback() { return mk_address(INADDR_LOOPBACK); } + static address broadcast() {return mk_address(INADDR_BROADCAST); } + static address from_ip(const std::string &str) { return from_ip(str.c_str()); } + static address from_ip(const char *str) + { + // now fill in the address info struct + // create and fill the hints struct + if (str == nullptr) { + return address(); + } + // get address from string + + sockaddr_in addr{}; + int ret = os::inet_pton(PF_INET, str, &(addr.sin_addr)); + if (ret == 1) { + addr.sin_family = PF_INET; + return address(addr); + } else if (ret == 0) { + detail::throw_logic_error("invalid ip address"); + } else { + detail::throw_logic_error_with_errno("invalid ip address: %s", errno); + } + return address(); + } + + static address from_hostname(const std::string &str) { return from_hostname(str.c_str()); } + static address from_hostname(const char *str) + { + // now fill in the address info struct + // create and fill the hints struct + if (str == nullptr) { + return address(); + } + // get address from string + sockaddr_in addr{}; + + int ret; + + if ((ret = os::inet_pton(AF_INET, str, &addr.sin_addr)) == 1) { + addr.sin_family = PF_INET; + } else if (ret == -1) { + detail::throw_logic_error_with_errno("invalid address: %s", errno); + } else { /* 0 == try name */ + struct addrinfo hints{}; + struct addrinfo *result = nullptr; + + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = AF_INET; /* Allow IPv4 or IPv6 */ + hints.ai_socktype = SOCK_STREAM; /* Stream socket */ + hints.ai_flags = AI_PASSIVE; /* Numeric or net network hostname */ + hints.ai_protocol = 0; /* Any protocol */ + hints.ai_canonname = nullptr; + hints.ai_addr = nullptr; + hints.ai_next = nullptr; + + int s = getaddrinfo(str, nullptr, &hints, &result); + if (s != 0) { + detail::throw_logic_error_with_gai_errno("invalid ip address (getaddrinfo): %s", s); + } + + /* getaddrinfo() returns a list of address structures. + Try each address until we successfully bind(2). + If socket(2) (or bind(2)) fails, we (close the socket + and) try the next address. */ + + // take first address + memcpy(&addr, result->ai_addr, sizeof(&result->ai_addr)); + + freeaddrinfo(result); /* No longer needed */ + } + addr.sin_family = PF_INET; + memset(addr.sin_zero, '\0', sizeof(addr.sin_zero)); + return address(addr); + } + +private: + static address mk_address(unsigned int inaddr) + { + sockaddr_in addr{}; + memset(&addr, 0, sizeof(addr)); + addr.sin_family = PF_INET; + addr.sin_addr.s_addr = htonl(inaddr); + return address(addr); + } +}; + +template <> +class address_router +{ +public: + address_router() = delete; + address_router& operator=(const address_router&) = delete; + address_router(const address_router&) = delete; + address_router& operator=(address_router&&) = delete; + address_router(address_router&&) = delete; + + static address empty() { return mk_address(in6addr_any); } + static address any() { return mk_address(in6addr_any); } + static address loopback() { return mk_address(in6addr_loopback); } + static address broadcast() {return mk_multicast_address(); } + + static address from_ip(const std::string &str) + { + return from_ip(str.c_str()); + } + + static address from_ip(const char *str) + { + // now fill in the address info struct + // create and fill the hints struct + if (str == nullptr) { + return address(); + } + // get address from string + + sockaddr_in6 addr{}; + int ret = os::inet_pton(PF_INET6, str, &(addr.sin6_addr)); + if (ret == 1) { + addr.sin6_family = PF_INET6; + return address(addr); + } else if (ret == 0) { + char message_buffer[1024]; + os::sprintf(message_buffer, 1024, "INET_PTON (ip): invalid ip address [%s]", str); + detail::throw_logic_error(message_buffer); + } else { + char message_buffer[1024]; + int err = errno; + os::sprintf(message_buffer, 1024, "INET_PTON (ip): invalid ip address [%s]: %%s", str); + detail::throw_logic_error_with_errno(message_buffer, err); + } + return address(); + } + + static address from_hostname(const std::string &str) + { + return from_hostname(str.c_str()); + } + + static address from_hostname(const char *str) + { + // now fill in the address info struct + // create and fill the hints struct + if (str == nullptr) { + return address(); + } + // get address from string + sockaddr_in6 addr{}; + + int ret = os::inet_pton(PF_INET6, str, &addr.sin6_addr); + + if (ret == 1) { + addr.sin6_family = PF_INET6; + } else if (ret == -1) { + char message_buffer[1024]; + int err = errno; + os::sprintf(message_buffer, 1024, "INET_PTON (host): invalid ip address [%s] (errno: %d): %%s", str, err); + detail::throw_logic_error_with_errno(message_buffer, err); + } else { /* 0 == try name */ + struct addrinfo hints{}; + struct addrinfo *result = nullptr; + + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = AF_INET6; /* Allow IPv4 or IPv6 */ + hints.ai_socktype = SOCK_STREAM; /* Stream socket */ + hints.ai_flags = AI_PASSIVE; /* Numeric or net network hostname */ + hints.ai_protocol = 0; /* Any protocol */ + hints.ai_canonname = nullptr; + hints.ai_addr = nullptr; + hints.ai_next = nullptr; + + int s = getaddrinfo(str, nullptr, &hints, &result); + if (s != 0) { + char message_buffer[1024]; + os::sprintf(message_buffer, 1024, "GETADDRINFO (host): invalid ip address [%s] (errno: %d): %%s", str, s); + detail::throw_logic_error_with_errno(message_buffer, s); + } + + /* getaddrinfo() returns a list of address structures. + Try each address until we successfully bind(2). + If socket(2) (or bind(2)) fails, we (close the socket + and) try the next address. */ + + // take first address + memcpy(&addr, result->ai_addr, result->ai_addrlen); + + freeaddrinfo(result); /* No longer needed */ + } + addr.sin6_family = PF_INET6; + return address(addr); + } + +private: + static OOS_NET_API const char *IP6ADDR_MULTICAST_ALLNODES; + + static address mk_address(in6_addr in6addr) + { + sockaddr_in6 addr{}; + memset(&addr, 0, sizeof(addr)); + addr.sin6_family = PF_INET6; + addr.sin6_addr = in6addr; + return address(addr); + } + + static address mk_multicast_address() + { + sockaddr_in6 addr{}; + memset(&addr, 0, sizeof(addr)); + addr.sin6_family = PF_INET6; + os::inet_pton(AF_INET6, IP6ADDR_MULTICAST_ALLNODES, &addr.sin6_addr); + return address(addr); + } +}; + +/// @endcond + +} +#endif //MATADOR_ADDRESS_HPP diff --git a/include/matador/net/address_resolver.hpp b/include/matador/net/address_resolver.hpp new file mode 100644 index 0000000..4c3928b --- /dev/null +++ b/include/matador/net/address_resolver.hpp @@ -0,0 +1,115 @@ +#ifndef MATADOR_ADDRESS_RESOLVER_HPP +#define MATADOR_ADDRESS_RESOLVER_HPP + +#include "matador/net/export.hpp" +#include "matador/net/peer.hpp" +#include "matador/net/error.hpp" + +#include + +namespace matador { +/// @cond MATADOR_DEV +class tcp; +class udp; + +namespace detail { + +template < class P > +int determine_socktype(); + +template <> +OOS_NET_API int determine_socktype(); + +template <> +OOS_NET_API int determine_socktype(); + +template < class P > +void initialize_hints(struct addrinfo &hints, int flags) { + memset(&hints,0,sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = determine_socktype

(); + hints.ai_protocol = 0; + hints.ai_flags = flags; +} +/// @endcond +} + +/** + * The address resolver resolves a given host and port + * to a peer object representing the given address + * + * @tparam P Type of protocol + */ +template < class P > +class address_resolver +{ +public: + typedef typename P::peer peer; /**< Shortcut to peer type */ + + /** + * Default constructor + */ + address_resolver() = default; + + /** + * Resolves the given host and port to a list + * of valid peers representing the ip addresses + * of the host either in IPv4 or IPv6 format + * + * @param hostname Hostname to resolve + * @param port Port to resolve + * @return A list of peers representing the host and port + */ + std::vector resolve(const std::string &hostname, const std::string &port); + + /** + * Resolves the given host and port to a list + * of valid peers representing the ip addresses + * of the host either in IPv4 or IPv6 format + * + * @param hostname Hostname to resolve + * @param port Port to resolve + * @return A list of peers representing the host and port + */ + std::vector resolve(const char *hostname, const char *port); + +}; + +/// @cond MATADOR_DEV +template +std::vector::peer> address_resolver

::resolve(const std::string &hostname, const std::string &port) +{ + return resolve(hostname.c_str(), port.c_str()); +} + +template < class P > +std::vector::peer> address_resolver

::resolve(const char *hostname, const char *port) +{ + struct addrinfo hints = {}; + detail::initialize_hints

(hints, AI_PASSIVE); + struct addrinfo* res = nullptr; + struct addrinfo* head = nullptr; + int err = getaddrinfo(hostname, port, &hints, &res); + + if (err != 0) { + detail::throw_logic_error_with_gai_errno("error on getaddrinfo: %s", err); + } + + head = res; + std::vector peers; + do { + + if (res->ai_family == PF_INET) { + peers.push_back(peer(address(*(struct sockaddr_in*)res->ai_addr))); + } else if (res->ai_family == PF_INET6) { + peers.push_back(peer(address(*(struct sockaddr_in6*)res->ai_addr))); + } // else -> not supported + } while ( (res = res->ai_next) != nullptr); + + freeaddrinfo(head); + + return peers; +} +/// @endcond +} +#endif //MATADOR_ADDRESS_RESOLVER_HPP diff --git a/include/matador/net/connector.hpp b/include/matador/net/connector.hpp new file mode 100644 index 0000000..f75bab0 --- /dev/null +++ b/include/matador/net/connector.hpp @@ -0,0 +1,146 @@ +#ifndef MATADOR_CONNECTOR_HPP +#define MATADOR_CONNECTOR_HPP + +#include "matador/net/export.hpp" +#include "matador/net/handler.hpp" +#include "matador/net/handler_creator.hpp" +#include "matador/net/ip.hpp" + +#include "matador/logger/logger.hpp" + +#include + +namespace matador { + +/** + * Connector which initiates a reactor based connection + * to a remote network host identified by a list + * of endpoints. + * + * Once a connection is established a handler is created + * with the given create handler function. The socket + * is passed to the created handler. + */ +class OOS_NET_API connector : public handler, public handler_creator +{ +public: + typedef std::function(tcp::socket sock, tcp::peer endpoint, connector *cnnctr)> t_connect_handler; /**< Shortcut to a function creating a handler on successfully execute to a host */ + + /** + * Default constructor + */ + connector(); + + /** + * Creates a new connector with the given + * create handler function + * + * @param on_new_connection Function which creates a handler on new connection + */ + explicit connector(t_connect_handler on_new_connection); + + + /** + * Initiates a execute to one of the given endpoints within the + * given reactor. Once a connection is established a new handler + * for this connection is created. The new connection is dispatched + * by the reactor. + * + * @param r Reactor to handle the connector and the created connection + * @param endpoints List of endpoints to + */ + void connect(reactor &r, const std::vector &endpoints); + /** + * Initiates a execute to one of the given endpoints within the + * given reactor. Once a connection is established a new handler + * for this connection is created with the given function. The new + * connection is dispatched by the reactor. + * + * @param r Reactor to handle the connector and the created connection + * @param endpoints List of endpoints to + * @param on_new_connection Function creating a new handler on new connection + */ + void connect(reactor &r, const std::vector &endpoints, t_connect_handler on_new_connection); + + /** + * Opens the connector. Actually this does + * nothing at all. + */ + void open() override {} + + /** + * Returns the handle of the connector. The result will + * always be zero, because the connector doesn't has a socket + * + * @return Always zero + */ + socket_type handle() const override; + + /** + * Does nothing + */ + void on_input() override {} + + /** + * Does nothing + */ + void on_output() override {} + + /** + * Does nothing + */ + void on_except() override {} + + /** + * The timeout call is used to establish the connection. + * Once the connection could be established within this call + * the timeout is canceled otherwise it will be tried again + * after three seconds + */ + void on_timeout() override; + + /** + * Does nothing + */ + void on_close() override {} + + /** + * Does nothing + */ + void close() override {} + + /** + * Always false because connection is established via timeout + * + * @return Always false + */ + bool is_ready_write() const override; + + /** + * Always false because connection is established via timeout + * + * @return Always false + */ + bool is_ready_read() const override; + + /** + * Notifies the connector that + * this handler was closed. + * + * @param hndlr Closed handler. + */ + void notify_close(handler *hndlr) override; + + std::string name() const override; + +private: + t_connect_handler connect_handler_; + + logger log_; + + std::vector endpoints_; +}; + +} + +#endif //MATADOR_CONNECTOR_HPP diff --git a/include/matador/net/event_type.hpp b/include/matador/net/event_type.hpp new file mode 100644 index 0000000..88cc148 --- /dev/null +++ b/include/matador/net/event_type.hpp @@ -0,0 +1,127 @@ +#ifndef MATADOR_EVENT_TYPE_HPP +#define MATADOR_EVENT_TYPE_HPP + +namespace matador::net { + +/** + * Enum representing an event processing mask + * for the reactor io dispatcher. The mask values + * can be bitwise combined and are used to + * check if a handler (@see handler.hpp) is able + * to read, write or accept. + */ +enum class event_type : unsigned { + NONE_MASK = 0, /**< Enum value for no event */ + READ_MASK = 1 << 0, /**< Enum value for read mask */ + WRITE_MASK = 1 << 1, /**< Enum value for write mask */ + EXCEPT_MASK = 1 << 2, /**< Enum value for except mask */ + TIMEOUT_MASK = 1 << 3, /**< Enum value for timeout mask */ + ACCEPT_MASK = READ_MASK, /**< Enum value for accept mask */ + READ_WRITE_MASK = READ_MASK | WRITE_MASK, /**< Enum value for read write mask */ + ALL_MASK = READ_MASK | WRITE_MASK | EXCEPT_MASK | ACCEPT_MASK /**< Enum value for all events mask */ +}; + +/** + * event_type operator to concat to event types + * with or operator + * + * @param a Left event_type + * @param b Right event_type + * @return The combined result + */ +inline event_type operator|(event_type a, event_type b) +{ + return static_cast(static_cast(a) | static_cast(b)); +} + +/** + * event_type operator to concat to event types + * with and operator + * + * @param a Left event_type + * @param b Right event_type + * @return The combined result + */ +inline event_type operator&(event_type a, event_type b) +{ + return static_cast(static_cast(a) & static_cast(b)); +} + +/** + * event_type not operator + * + * @param a Left event_type + * @return The combined result + */ +inline event_type operator~ (event_type a) +{ + return (event_type)~(int)a; +} + +/** + * event_type operator to concat to event types + * with xor operator + * + * @param a Left event_type + * @param b Right event_type + * @return The combined result + */ +inline event_type operator^ (event_type a, event_type b) +{ + return (event_type)((int)a ^ (int)b); +} + +/** + * event_type operator to assign event type + * with or operator + * + * @param a Left event_type + * @param b Right event_type + * @return The combined result + */ +inline event_type& operator|= (event_type& a, event_type b) +{ + return (event_type&)((int&)a |= (int)b); +} + +/** + * event_type operator to assign event type + * with and operator + * + * @param a Left event_type + * @param b Right event_type + * @return The combined result + */ +inline event_type& operator&= (event_type& a, event_type b) +{ + return (event_type&)((int&)a &= (int)b); +} + +/** + * event_type operator to assign event type + * with xor operator + * + * @param a Left event_type + * @param b Right event_type + * @return The combined result + */ +inline event_type& operator^= (event_type& a, event_type b) +{ + return (event_type&)((int&)a ^= (int)b); +} +/** + * Checks if a specific event_type is set in + * a given event type mask. If type is set + * true is returned. + * + * @param source Event type mask to check. + * @param needle Requested event type + * @return True if event type is set + */ +inline bool is_event_type_set(event_type source, event_type needle) +{ + return static_cast(source & needle) > 0; +} + +} +#endif //MATADOR_EVENT_TYPE_HPP diff --git a/include/matador/net/fdset.hpp b/include/matador/net/fdset.hpp new file mode 100644 index 0000000..49ac106 --- /dev/null +++ b/include/matador/net/fdset.hpp @@ -0,0 +1,111 @@ +#ifndef MATADOR_FDSET_HPP +#define MATADOR_FDSET_HPP + +#include "matador/net/export.hpp" +#include "matador/net/os.hpp" + +#include +#include +#include + +#ifdef _WIN32 +#include +#else +#include +#endif + +namespace matador { + +/** + * This class represents a fd set + * used by the reactor class. There it + * is used in combination with a call + * to select. + */ +class OOS_NET_API fdset +{ +public: + fdset(const fdset&) = delete; + fdset& operator=(const fdset&) = delete; + + /** + * Default constructor creates + * an empty fd set + */ + fdset(); + + /** + * Destroys the fd set + */ + ~fdset() = default; + + fdset(fdset&& x) noexcept; + fdset& operator=(fdset&& x) noexcept; + + /** + * Reset all bits to zero + */ + void reset(); + + /** + * Checks if the given fd is set + * in the fd set. + * + * @param fd Requested fd + * @return True if fd is set + */ + bool is_set(socket_type fd) const; + + /** + * Clears the giveb fd from the set. + * + * @param fd fd to clear + */ + void clear(socket_type fd); + + /** + * Sets the given fd in the fd set. + * + * @param fd fd to set + */ + void set(socket_type fd); + + /** + * Returns the highest fd plus one. + * This is needed for the call to select + * + * @return Highest fd plus one + */ + socket_type maxp1() const; + + /** + * Returns the current number of fd in the set + * + * @return Number of fd in set + */ + size_t count() const; + + /** + * Checks if the set is empty + * + * @return True if the fd set is empty + */ + bool empty() const; + + /** + * Returns a pointer to the underlying fd_set structure + * + * @return Pointer to the underlying fd_set structure + */ + fd_set* get(); + +private: + typedef std::set > int_set_t; + int_set_t max_fd_set_; + + fd_set fd_set_ = {}; +}; + +} + +#endif //MATADOR_FDSET_HPP diff --git a/include/matador/net/handler.hpp b/include/matador/net/handler.hpp new file mode 100644 index 0000000..a6ac7f3 --- /dev/null +++ b/include/matador/net/handler.hpp @@ -0,0 +1,138 @@ +#ifndef MATADOR_HANDLER_HPP +#define MATADOR_HANDLER_HPP + +#include "matador/net/os.hpp" + +#include +#include + +namespace matador::net { + +class reactor; + +/** + * Base class for all handlers used + * with the reactor. The handler must implement its + * interface to handle input, output, exceptional and + * timeout data. + */ +class handler : public std::enable_shared_from_this +{ +public: + /** + * Virtual destructor + */ + virtual ~handler() = default; + + /** + * Interface to open a handler. This + * is called when a handler is registered + * within the reactor + */ + virtual void open() = 0; + + /** + * Interface to returns the socket handle + * of the concrete handler implementation. + * + * @return The socket fd + */ + virtual socket_type handle() const = 0; + + /** + * Interface handling incoming data + */ + virtual void on_input() = 0; + + /** + * Interface handling outgoing data + */ + virtual void on_output() = 0; + + /** + * Interface handling exceptional data + */ + virtual void on_except() = 0; + + /** + * Interface handling timout data + */ + virtual void on_timeout() = 0; + + /** + * Interface called when the handler is closed + */ + virtual void on_close() = 0; + + /** + * Interface implementation should close + * the handle gracefully + */ + virtual void close() = 0; + + /** + * Interface should return true if there + * is outgoing data + * + * @return True if there is outgoing data + */ + virtual bool is_ready_write() const = 0; + + /** + * Interface should return true if there + * is incoming data + * + * @return True if there is incoming data + */ + virtual bool is_ready_read() const = 0; + + /** + * Returns the next timeout scheduled + * in the reactor + * + * @return Next timeout + */ + time_t next_timeout() const; + + /** + * Returns the timeout interval in + * seconds + * + * @return Timeout interval + */ + time_t interval() const; + + /** + * Get the name of the handler. + * The name don't need to be unique. + * It has only an informational purpose. + * + * @return The name of the handler + */ + virtual std::string name() const = 0; + +protected: + + /** + * Gets the underlying reactor + * + * @return The underlying reactor + */ + reactor* get_reactor() const; + +private: + friend class reactor; + + void register_reactor(reactor *r); + void schedule(time_t offset, time_t interval); + void cancel_timer(); + void calculate_next_timeout(time_t now); + + reactor *reactor_ = nullptr; + + time_t next_timeout_ = 0; + time_t interval_ = 0; +}; + +} +#endif //MATADOR_HANDLER_HPP diff --git a/include/matador/net/handler_creator.hpp b/include/matador/net/handler_creator.hpp new file mode 100644 index 0000000..39b19e6 --- /dev/null +++ b/include/matador/net/handler_creator.hpp @@ -0,0 +1,23 @@ +#ifndef MATADOR_HANDLER_CREATOR_HPP +#define MATADOR_HANDLER_CREATOR_HPP + +#include "matador/net/export.hpp" + +namespace matador { + +/// @cond MATADOR_DEV + +class handler; + +class OOS_NET_API handler_creator +{ +public: + virtual ~handler_creator() = default; + + virtual void notify_close(handler *hndlr) = 0; +}; + +/// @endcond + +} +#endif //MATADOR_HANDLER_CREATOR_HPP diff --git a/include/matador/net/io_service.hpp b/include/matador/net/io_service.hpp new file mode 100644 index 0000000..ae5bcce --- /dev/null +++ b/include/matador/net/io_service.hpp @@ -0,0 +1,128 @@ +#ifndef MATADOR_IO_SERVICE_HPP +#define MATADOR_IO_SERVICE_HPP + +#include "matador/utils/buffer.hpp" + +#include "matador/net/export.hpp" +#include "matador/net/reactor.hpp" +#include "matador/net/acceptor.hpp" +#include "matador/net/connector.hpp" +#include "matador/net/stream_handler.hpp" + +namespace matador { + +/** + * IO Service is used to encapsulate the an instance + * of the reactor class. + */ +class OOS_NET_API io_service +{ +public: + /** + * Creates a io_service + */ + io_service(); + + ~io_service(); + /** + * Starts the io_service with the underlying reactor + */ + void run(); + + /** + * Returns true if the io service is + * running + * + * @return True if service is running + */ + bool is_running() const; + + /** + * Shuts down a running service + */ + void shutdown(); + + /** + * Adds the given acceptor for the + * given peer endpoint and callback. + * + * The callback is called, when a new connection + * was accepted. + * + * @tparam AcceptCallback Type of callback + * @param ac Acceptor used to accept connections + * @param ep Endpoint on which the acceptor will listen + * @param accept_callback Callback when connection was accepted + */ + template < typename AcceptCallback > + void accept(const std::shared_ptr& ac, const tcp::peer &ep, AcceptCallback accept_callback); + + /** + * Adds the given acceptor for the + * given callback. + * + * The callback is called, when a new connection + * was accepted. + * + * @tparam AcceptCallback Type of callback + * @param ac Acceptor used to accept connections + * @param accept_callback Callback when connection was accepted + */ + template < typename AcceptCallback > + void accept(const std::shared_ptr& ac, AcceptCallback accept_callback); + + /** + * Add the given connector for the given port + * and execute callback. + * + * Once a connection is established the callback + * is called. + * + * @tparam ConnectCallback Type of the callback + * @param co Connector Used to establish the connection + * @param port Port to execute to + * @param connect_callback Callback when connection was established + */ + template < typename ConnectCallback > + void connect(const std::shared_ptr& co, const std::string &port, ConnectCallback connect_callback); + +private: + logger log_; + reactor reactor_; +}; + +template +void io_service::accept(const std::shared_ptr& ac, const tcp::peer &ep, AcceptCallback accept_callback) +{ + log_.info("registering acceptor for %s", ep.to_string().c_str()); + ac->accecpt(ep, [accept_callback, this](tcp::socket sock, tcp::peer p, acceptor *accptr) { + return std::make_shared(sock, p, accptr, accept_callback); + }); + + reactor_.register_handler(ac, event_type::ACCEPT_MASK); +} + +template +void io_service::accept(const std::shared_ptr& ac, AcceptCallback accept_callback) +{ + log_.info("registering acceptor for %s", ac->endpoint().to_string().c_str()); + ac->accecpt([accept_callback](tcp::socket sock, tcp::peer p, acceptor *accptr) { + return std::make_shared(sock, p, accptr, accept_callback); + }); + + reactor_.register_handler(ac, event_type::ACCEPT_MASK); +} + +template +void io_service::connect(const std::shared_ptr &co, const std::string &port, ConnectCallback connect_callback) +{ + log_.info("registering connector for localhost:%s", port.c_str()); + tcp::resolver resolver; + auto endpoints = resolver.resolve("localhost", port); + co->connect(reactor_, endpoints, [connect_callback](const tcp::socket& sock, const tcp::peer &p, connector *cnnctr) { + return std::make_shared(sock, p, cnnctr, connect_callback); + }); +} + +} +#endif //MATADOR_IO_SERVICE_HPP diff --git a/include/matador/net/io_stream.hpp b/include/matador/net/io_stream.hpp new file mode 100644 index 0000000..ce37a00 --- /dev/null +++ b/include/matador/net/io_stream.hpp @@ -0,0 +1,67 @@ +#ifndef MATADOR_IO_STREAM_HPP +#define MATADOR_IO_STREAM_HPP + +#include +#include + +#include "matador/net/export.hpp" +#include "matador/net/ip.hpp" + +namespace matador { + +class buffer; +class buffer_view; + +/** + * The io stream class is proposed + * to be used with the io_service class + * and provides therefore an interface + * which is used by the io_service + */ +class OOS_NET_API io_stream +{ +public: + typedef std::function t_read_handler; /**< Short for function to process read data */ + typedef std::function t_write_handler; /**< Short for function to prepare data to write */ + + /** + * This interface is called when data should + * be read from a socket. Once the date was read + * the given read handler is called. + * + * @param buf Buffer to read the data in + * @param read_handler Handler to be called when data was read + */ + virtual void read(buffer_view buf, t_read_handler read_handler) = 0; + + /** + * This interface is called when data should be written + * to a socket. Once the data was written the given + * write handler is called. + * + * @param buffers List of buffers containing the data to write + * @param write_handler Handler to be called when the data was written + */ + virtual void write(std::list buffers, t_write_handler write_handler) = 0; + + /** + * Closes the stream + */ + virtual void close_stream() = 0; + + /** + * Returns the underlying stream socket + * @return + */ + virtual tcp::socket& stream() = 0; + + /** + * Returns a name for the io stream. + * + * @return Name of the io stream + */ + virtual std::string name() const = 0; +}; + +} +#endif //MATADOR_IO_STREAM_HPP diff --git a/include/matador/net/ip.hpp b/include/matador/net/ip.hpp new file mode 100644 index 0000000..cab483b --- /dev/null +++ b/include/matador/net/ip.hpp @@ -0,0 +1,130 @@ +#ifndef MATADOR_IP_HPP +#define MATADOR_IP_HPP + +#include "matador/net/peer.hpp" +#include "matador/net/address_resolver.hpp" +#include "matador/net/socket.hpp" +#include "matador/net/socket_stream.hpp" +#include "matador/net/socket_acceptor.hpp" + +namespace matador::net { + +/** + * The tcp class represents all + * settings to handle tcp socket + * connections + */ +class tcp { +public: + typedef peer_base peer; /**< Shortcut to a tcp based peer */ + typedef socket_stream socket; /**< Shortcut to a tcp based socket */ + typedef socket_acceptor acceptor; /**< Shortcut to a tcp based acceptor */ + typedef address_resolver resolver; /**< Shortcut to a tcp based address resolver */ + + /** + * Returns the type of the socket + * + * @return Type of the socket + */ + int type() const { return SOCK_STREAM; } + + /** + * Returns the socket protocol + * + * @return Socket protocol + */ + int protocol() const { return IPPROTO_TCP; } + + /** + * Returns the socket family + * + * @return Socket family + */ + int family() const { return family_; } + + /** + * Creates an instance of tcp representing + * an IPv4 socket + * + * @return IPv4 tcp object + */ + static tcp v4() { return tcp(PF_INET); } + + /** + * Creates an instance of tcp representing + * an IPv6 socket + * + * @return IPv6 tcp object + */ + static tcp v6() { return tcp(PF_INET6); } + +private: + explicit tcp(int family) + : family_(family) + {} + + int family_; +}; + +/** + * The udp class represents all + * settings to handle udp socket + * connections + */ +class OOS_NET_API udp +{ +public: + typedef peer_base peer; /**< Shortcut to a udp based peer */ + typedef socket_stream socket; /**< Shortcut to a udp based socket */ + typedef socket_acceptor acceptor; /**< Shortcut to a udp based acceptor */ + typedef address_resolver resolver; /**< Shortcut to a udp based address resolver */ + + /** + * Returns the type of the socket + * + * @return Type of the socket + */ + int type() const { return SOCK_DGRAM; } + + /** + * Returns the socket protocol + * + * @return Socket protocol + */ + int protocol() const { return IPPROTO_UDP; } + + /** + * Returns the socket family + * + * @return Socket family + */ + int family() const { return family_; } + + /** + * Creates an instance of udp representing + * an IPv4 socket + * + * @return IPv4 udp object + */ + static udp v4() { return udp(PF_INET); } + + /** + * Creates an instance of udp representing + * an IPv6 socket + * + * @return IPv6 udp object + */ + static udp v6() { return udp(PF_INET6); } + +private: + explicit udp(int family) + : family_(family) + {} + + int family_; +}; + + +} + +#endif //MATADOR_IP_HPP diff --git a/include/matador/net/os.hpp b/include/matador/net/os.hpp new file mode 100644 index 0000000..d7a6acd --- /dev/null +++ b/include/matador/net/os.hpp @@ -0,0 +1,45 @@ +#ifndef MATADOR_NET_OS_HPP +#define MATADOR_NET_OS_HPP + +#include "matador/net/export.hpp" + +#include + +#if _WIN32 +#include +#endif + +#ifdef _WIN32 + using socket_type = SOCKET; +#else + using socket_type = int; +#endif + +namespace matador { +namespace net { + +OOS_NET_API void init(); +OOS_NET_API void cleanup(); + +} + +bool is_valid_socket(socket_type fd); + +namespace os { + +OOS_NET_API int inet_pton(int af, const char *src, void *dst); +OOS_NET_API const char* inet_ntop(int af, const void* src, char* dst, size_t size); + +enum class shutdown_type +{ + READ = 0, + WRITE = 1, + READ_WRITE = 2 +}; + +OOS_NET_API int shutdown(socket_type fd, shutdown_type type); +OOS_NET_API int close(socket_type fd); + +} +} +#endif //MATADOR_NET_OS_HPP diff --git a/include/matador/net/peer.hpp b/include/matador/net/peer.hpp new file mode 100644 index 0000000..aa86a2d --- /dev/null +++ b/include/matador/net/peer.hpp @@ -0,0 +1,203 @@ +#ifndef MATADOR_PEER_HPP +#define MATADOR_PEER_HPP + +#include "matador/net/address.hpp" + +#ifdef _WIN32 +//#include +#else +#include +#endif + +#include + +namespace matador { + +/** + * The peer_base class acts like the holder + * of network endpoint information like socket + * address and port. The template argument + * sets the protocol type either TCP or UDP. + * + * @tparam P Protocol type + */ +template < class P > +class peer_base +{ +public: + typedef P protocol_type; /**< Short to protocol type */ + + /** + * Default constructor + */ + peer_base() = default; + + /** + * Creates a peer from a given address. Port is set + * to zero. + * + * @param addr Address to create the peer from + */ + explicit peer_base(address addr) + : addr_(std::move(addr)) + {} + + /** + * Creates a peer from a given address and port + * + * @param addr Address to create the peer from + * @param port Port of the endpoint + */ + peer_base(address addr, unsigned short port) + : addr_(std::move(addr)) + { + addr_.port(port); + } + + /** + * Copy creates a peer from the given peer + * + * @param x Peer to copy from + */ + peer_base(const peer_base &x) + : addr_(x.addr_) + {} + + /** + * Move creates a peer from a given peer + * + * @param x Peer to move from + */ + peer_base(peer_base &&x) noexcept + : addr_(std::move(x.addr_)) + {} + + /** + * Copy assigns a given peer to this peer + * + * @param x Peer to assign + * @return The assigned peer + */ + peer_base& operator=(const peer_base &x) + { + addr_ = x.addr_; + return *this; + } + + /** + * Assign moves the given peer to this peer + * + * @param x The peer to move assign + * @return The moved peer + */ + peer_base& operator=(peer_base &&x) noexcept + { + addr_ = std::move(x.addr_); + return *this; + } + + /** + * Destructor + */ + ~peer_base() = default; + + /** + * Returns the current port of the peer. + * + * @return The current port + */ + int port() const { return addr_.port(); } + + /** + * Returns the current IP protocol of the peer + * address which is either IPv4 or IPv6 + * + * @return The current IP protocol + */ + protocol_type protocol() const + { + if (addr_.is_v4()) { + return protocol_type::v4(); + } else { + return protocol_type::v6(); + } + } + + /** + * Returns the raw pointer to the sockaddr structure + * + * @return The raw pointer to the sockaddr structure + */ + sockaddr* data() + { + return addr_.addr(); + } + + /** + * Returns the raw pointer to the sockaddr structure + * + * @return The raw pointer to the sockaddr structure + */ + const sockaddr* data() const + { + return addr_.addr(); + } + + /** + * Returns the size of the underlying sockaddr structure + * + * @return The size of the underlying sockaddr structure + */ + size_t size() const + { + return addr_.size(); + } + + /** + * Returns a reference to the address + * + * @return A reference to the address + */ + address& addr() + { + return addr_; + } + + /** + * Returns a reference to the address + * + * @return A reference to the address + */ + const address& addr() const + { + return addr_; + } + + /** + * Converts the peer endpoint to a string in + * the format [ip]:[port] + * + * @return Returns a string representation of the peer + */ + std::string to_string() const + { + char addstr[INET6_ADDRSTRLEN + 8]; + const char *name; + if (addr().is_v4()) { + name = os::inet_ntop(addr_.addr()->sa_family, &addr_.addr_v4()->sin_addr, addstr, INET6_ADDRSTRLEN); + } else { + name = os::inet_ntop(addr_.addr()->sa_family, &addr_.addr_v6()->sin6_addr, addstr, INET6_ADDRSTRLEN); + } + + size_t pos = strlen(name); + + snprintf(addstr+pos, INET6_ADDRSTRLEN+8-pos, ":%d", addr_.port()); + return addstr; + } + +private: + address addr_; +}; + +} +#endif //MATADOR_PEER_HPP diff --git a/include/matador/net/reactor.hpp b/include/matador/net/reactor.hpp new file mode 100644 index 0000000..03a8a7b --- /dev/null +++ b/include/matador/net/reactor.hpp @@ -0,0 +1,117 @@ +#ifndef REACTOR_HPP +#define REACTOR_HPP + +#include "matador/net/event_type.hpp" +#include "matador/net/select_fd_sets.hpp" +#include "matador/net/socket_interrupter.hpp" + +#include "matador/utils/leader_follower_thread_pool.hpp" + +#include +#include +#include +#include +#include +#include +#include + +namespace matador::net { + +class handler; + +class reactor { +public: + using handler_ptr = std::shared_ptr; + using handler_weak_ptr = std::weak_ptr; + + struct Statistics { + std::atomic total_events_handled_{0}; + std::atomic read_events_{0}; + std::atomic write_events_{0}; + std::atomic timer_events_{0}; + std::atomic errors_{0}; + }; + + struct HandlerEntry { + explicit HandlerEntry(handler_ptr h, event_type et) + : handler(std::move(h)), events(et) {} + + handler_ptr handler; + event_type events; + time_t next_timeout{0}; + time_t interval{0}; + bool marked_for_deletion{false}; + }; + + reactor(); + ~reactor(); + + reactor(const reactor&) = delete; + reactor& operator=(const reactor&) = delete; + + void register_handler(const handler_ptr& h, event_type type); + void unregister_handler(const handler_ptr& h, event_type type); + void schedule_timer(const handler_ptr& h, time_t offset, time_t interval); + void cancel_timer(const handler_ptr& h); + void run(); + void handle_events(); + void shutdown(); + bool is_running() const { return running_; } + select_fdsets fdsets() const; + void mark_handler_for_delete(const handler_ptr& h); + void activate_handler(const handler_ptr& h, event_type ev); + void deactivate_handler(const handler_ptr& h, event_type ev); + const Statistics& get_statistics() const { return stats_; } + +private: + struct TimerEvent { + time_t timeout; + handler_weak_ptr handler; + + bool operator>(const TimerEvent& other) const { + return timeout > other.timeout; + } + }; + + using handler_map = std::unordered_map>; + using timer_queue = std::priority_queue, std::greater<>>; + + void prepare_select_bits(time_t& timeout, select_fdsets& fd_sets) const; + bool process_events(const select_fdsets& fd_sets, time_t now); + void process_timers(time_t now); + void cleanup_deleted_handlers(); + void handle_error(const std::string& operation, const handler_ptr& h); + int perform_select(struct timeval* timeout, select_fdsets& fd_sets); + bool check_interruption(const select_fdsets& fd_sets); + + void safe_handler_operation(const handler_ptr& h, const std::string& op_name, + const std::function& operation); + + HandlerEntry* find_handler_entry(const handler_ptr& h); + void remove_handler_entry(const handler_ptr& h); + void update_handler_events(HandlerEntry* entry, event_type ev, bool activate); + +private: + handler_map handlers_; + timer_queue timers_; + + std::atomic running_{false}; + std::atomic shutdown_requested_{false}; + + mutable std::shared_mutex handlers_mutex_; + std::mutex timers_mutex_; + std::condition_variable shutdown_cv_; + + logger log_; + Statistics stats_; + + leader_follower_thread_pool thread_pool_; + socket_interrupter interrupter_; + + static constexpr std::chrono::seconds CLEANUP_INTERVAL{60}; + time_t last_cleanup_{0}; +}; + +} // namespace matador + +#endif //REACTOR_HPP diff --git a/include/matador/net/select_fd_sets.hpp b/include/matador/net/select_fd_sets.hpp new file mode 100644 index 0000000..5586149 --- /dev/null +++ b/include/matador/net/select_fd_sets.hpp @@ -0,0 +1,132 @@ +#ifndef MATADOR_SELECT_FDSETS_HPP +#define MATADOR_SELECT_FDSETS_HPP + +#include "matador/net/fdset.hpp" + +namespace matador { + +/** + * This class represents three fd sets + * needed for the system call to select. + */ +class select_fdsets +{ +public: + /** + * These enum values are used + * to identify and access the three + * different fd sets + */ + typedef enum { + read_type = 0, /**< Enum value for the read fd set */ + write_type = 1, /**< Enum value for the write fd set */ + except_type /**< Enum value for the exceptional fd set */ + } fdset_type; + + /** + * Returns the highest fd value over + * all three fd sets plus one. This is used + * by the system call to select. + * + * @return Highest fd value over all sets + */ + socket_type maxp1() const; + + /** + * Returns the fdset identified by the given type + * + * @param type Requested fdset type + * @return The requested fd set + */ + fdset& fd_set(fdset_type type); + + /** + * Returns the read fd set. + * + * @return The read fd set. + */ + fdset& read_set(); + + /** + * Returns the read fd set. + * + * @return The read fd set. + */ + const fdset& read_set() const; + + /** + * Returns the write fd set. + * + * @return The write fd set. + */ + fdset& write_set(); + + /** + * Returns the write fd set. + * + * @return The write fd set. + */ + const fdset& write_set() const; + + /** + * Returns the exceptional fd set. + * + * @return The exceptional fd set. + */ + fdset& except_set(); + + /** + * Returns the exceptional fd set. + * + * @return The exceptional fd set. + */ + const fdset& except_set() const; + + /** + * Reset all bits in all three sets to zero + */ + void reset(); + + /** + * Resets the fd set identified by the + * given type + * + * @param type Type of set to reset + */ + void reset(fdset_type type); + + /** + * Checks if the given fd is set in the fdset + * identified by the given type. If fd + * is set true returned. + * + * @param type Requested fd set type + * @param fd fd to check + * @return True if fd is set in fdset + */ + bool is_set(fdset_type type, int fd) const; + + /** + * Clears the given fd is set in the fdset + * identified by the given type. + * + * @param type Requested fd set type + * @param fd fd to clear + */ + void clear(fdset_type type, int fd); + + /** + * Sets the bit for the given fd is set + * in the fdset identified by the given type. + * + * @param type Requested fd set type + * @param fd fd to set + */ + void set(fdset_type type, int fd); + +private: + fdset fdsets_[3]; +}; + +} +#endif //MATADOR_SELECT_FDSETS_HPP diff --git a/include/matador/net/socket.hpp b/include/matador/net/socket.hpp new file mode 100644 index 0000000..8bbbdda --- /dev/null +++ b/include/matador/net/socket.hpp @@ -0,0 +1,168 @@ +#ifndef MATADOR_SOCKET_HPP +#define MATADOR_SOCKET_HPP + +#include +#include + +#ifdef _WIN32 +#else +#include +#include +#include +#include +#endif + +#include + +namespace matador { + +/** + * Base class for several kind of socket + * classes (acceptor, stream) representing a + * socket. The protocol is selected via the + * template parameter (/sa tcp and udp classes) + * + * @tparam P Protocol type + */ +template < class P > +class socket_base +{ +public: + typedef P protocol_type; /**< Shortcut to the protocol type */ + typedef typename P::peer peer_type; /**< Shortcut to the peer type */ + + /** + * Creates a socket for a specific given + * protocol + * + * @param protocol Initial protocol + */ + explicit socket_base(const protocol_type &protocol); + + /** + * Creates a socket with the given peer + * + * @param peer Peer used to initialize the socket + */ + explicit socket_base(const peer_type &peer); + + /** + * Opens a socket. On success a positive socket id (fd) + * is returned. In case of error -1 is returned and + * errno is set. + * + * @param protocol Protocol for which a socket is created + * @return The socket fd or -1 on error + */ + socket_type open(const protocol_type &protocol); + + /** + * Closes the open socket + */ + void close(); + + /** + * Returns true if the socket is open (created) + * + * @return True on open socket + */ + bool is_open() const; + + /** + * Releases the socket fd and sets + * the internal socket to zero (0). + * + * After the socket is released the user + * is in charge to take of the socket + * + * @return The released socket fd + */ + socket_type release(); + + /** + * Connect to the given peer. If the connection + * could be established true is returned, + * otherwise false is returned and errno is set. + * + * @param p Peer to execute to + * @return True on successful connection + */ + bool connect(const typename protocol_type::peer &p); + + /** + * Sets the socket either blocking (false) or + * non blocking (true). + * + * @param nb True sets the socket non blocking false blocking + */ + void non_blocking(bool nb); + + /** + * Returns true if the socket is non blocking + * otherwise returns false + * @return True if socket is non blocking + */ + bool non_blocking() const; + + /** + * Set or unset the cose on exec flag + * for the socket + * + * @param nb Flag to set or unset cloexec option + */ + void cloexec(bool nb); + + /** + * Returns true if close on exec option is set + * + * @return True on set cloexec option + */ + bool cloexec() const; + + /** + * Sets a socket option represented by name. If option + * was successfully set true is returned. Otherwise false + * and errno ist set. + * + * @param name Option name + * @param value Flag to set or unset the option + * @return True on success + */ + bool options(int name, bool value); + + /** + * Returns the underlying socket fd + * + * @return Underlying socket fd + */ + socket_type id() const; + + /** + * Assigns the given socket fd to this + * socket. If the socket is already opened + * an exception is thrown. + * + * @param sock The socket fd to assign + */ + void assign(socket_type sock); + +protected: +/// @cond MATADOR_DEV + socket_base() = default; + ~socket_base() = default; + + socket_type open(int family, int type, int protocol); + + socket_type sock_ = 0; + std::string name_; +#ifdef _WIN32 + bool is_nonblocking_ = false; +#endif +/// @endcond +}; + +} + +#include "matador/net/socket.tpp" + +#endif //MATADOR_SOCKET_HPP diff --git a/include/matador/net/socket.tpp b/include/matador/net/socket.tpp new file mode 100644 index 0000000..037f86a --- /dev/null +++ b/include/matador/net/socket.tpp @@ -0,0 +1,257 @@ +#include "matador/net/socket.hpp" +#include "matador/net/socket_stream.hpp" +#include "matador/net/os.hpp" +#include "matador/net/error.hpp" +#include "matador/net/ip.hpp" + +#include + +namespace matador { + +#define throw_logic_error(msg) \ + do { \ + std::stringstream str; \ + str << msg; \ + throw std::logic_error(str.str()); \ + } while(false); + +template < class P > +socket_base

::socket_base(const protocol_type &protocol) +{ + open(protocol); +} + +template < class P > +socket_base

::socket_base(const peer_type &peer) +{ + open(peer.protocol()); +} + +template < class P > +socket_type socket_base

::open(const protocol_type &protocol) +{ + return open(protocol.family(), protocol.type(), protocol.protocol()); +} + +template < class P > +void socket_base

::close() +{ + if (sock_ <= 0) { + return; + } + //os::shutdown(sock_, os::shutdown_type::READ_WRITE); + os::close(sock_); + sock_ = 0; +} + +template < class P > +bool socket_base

::is_open() const +{ + return sock_ > 0; +} + +template < class P > +socket_type socket_base

::release() +{ + auto tmp_fd = sock_; + sock_ = 0; + return tmp_fd; +} + +template < class P > +bool socket_base

::connect(const typename protocol_type::peer &p) +{ + return ::connect(sock_, p.data(), static_cast(p.size())) == 0; +} + +template < class P > +void socket_base

::non_blocking(bool nb) +{ +#ifdef WIN32 + unsigned long nonblock = nb ? 0 : 1; + // fcntl doesn't do the right thing, but the similar ioctl does + // warning: is that still true? and does it the right thing for + // set blocking as well? + if (ioctlsocket(sock_, FIONBIO, &nonblock) != 0) { + throw std::logic_error("ioctlsocket: couldn't set flags"); + } + is_nonblocking_ = nb; +#else + int val = fcntl(sock_, F_GETFL, 0); + if (val < 0) { + throw std::logic_error("fcntl: couldn't get flags"); + } + + int flag = (nb ? O_NONBLOCK : 0); + if (fcntl(sock_, F_SETFL, val | flag) < 0) { + std::string err(strerror(errno)); + throw std::logic_error("fcntl: couldn't set flags (" + err + ")"); + } +#endif +} + +template < class P > +bool socket_base

::non_blocking() const +{ +#ifdef _WIN32 + return is_nonblocking_; +#else + int val = fcntl(sock_, F_GETFL, 0); + if (val < 0) { + std::string err(strerror(errno)); + throw std::logic_error("fcntl: couldn't get flags (" + err + ")"); + } + return (val & O_NONBLOCK) > 0; +#endif +} + +template < class P > +void socket_base

::cloexec(bool nb) +{ +#ifdef WIN32 + unsigned long cloexec = 1; + // fcntl doesn't do the right thing, but the simular ioctl does + // warning: is that still true? and does it the right thing for + // set blocking as well? + if (ioctlsocket(sock_, FIONBIO, &cloexec) != 0) { + throw std::logic_error("ioctlsocket: couldn't set flags"); + } +#else + int val = fcntl(sock_, F_GETFL, 0); + if (val < 0) { + throw std::logic_error("fcntl: couldn't get flags"); + } + + int flag = (nb ? FD_CLOEXEC : 0); + if (fcntl(sock_, F_SETFL, val | flag) < 0) { + std::string err(strerror(errno)); + throw std::logic_error("fcntl: couldn't set flags (" + err + ")"); + } +#endif +} + +template < class P > +bool socket_base

::cloexec() const +{ + int val = fcntl(sock_, F_GETFL, 0); + if (val < 0) { + std::string err(strerror(errno)); + throw std::logic_error("fcntl: couldn't get flags (" + err + ")"); + } + return (val & FD_CLOEXEC) > 0; +} + +template < class P > +bool socket_base

::options(int name, bool value) +{ + const char flag = static_cast(value ? 1 : 0); + return setsockopt(sock_, IPPROTO_TCP, name, &flag, sizeof(flag)) == 0; +} + +template < class P > +socket_type socket_base

::id() const +{ + return sock_; +} + +template < class P > +void socket_base

::assign(socket_type sock) +{ + if (is_open()) { + throw std::logic_error("couldn't assign: socket already opened"); + } + + struct sockaddr_in addr{}; + socklen_t addr_size = sizeof(struct sockaddr_in); + if (getpeername(sock, (struct sockaddr *)&addr, &addr_size) == 0) { + //char *clientip = new char[20]; + char s[INET6_ADDRSTRLEN]; + os::inet_ntop(addr.sin_family, &addr.sin_addr, s, sizeof s); +#ifdef _WIN32 +// strcpy_s(clientip, 20, s); +#else +// strcpy(clientip, s); +#endif + } + sock_ = sock; +} + +template < class P > +socket_type socket_base

::open(int family, int type, int protocol) +{ + sock_ = ::socket(family, type, protocol); + return sock_; +} + +template < class P > +socket_type connect(socket_stream

&stream, const char* hostname, unsigned short port) +{ + char portstr[6]; + sprintf(portstr, "%d", port); +// const char* portname = "daytime"; + struct addrinfo hints = {}; + memset(&hints,0,sizeof(hints)); + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + struct addrinfo* res = nullptr; + struct addrinfo* head = nullptr; + int err = getaddrinfo(hostname, portstr, &hints, &res); + if (err != 0) { + detail::throw_logic_error_with_gai_errno("failed to resolve local socket address: %s", err); + } + + head = res; + + socket_type conn_fd = 0; + socket_type ret = 0; + do { + conn_fd = ::socket(res->ai_family, res->ai_socktype, res->ai_protocol); + if (!is_valid_socket(conn_fd)) { + // error, try next one + continue; + } + + ret = ::connect(conn_fd, res->ai_addr, res->ai_addrlen); + if (ret == 0) { + // success + stream.assign(conn_fd); + break; +// } else { +// throw_logic_error("couldn't execute: " << strerror(errno)); + } + + // bind error, close and try next one + os::shutdown(conn_fd, os::shutdown_type::READ_WRITE); + } while ( (res = res->ai_next) != nullptr); + + if (res == nullptr) { + throw_logic_error("couldn't execute to " << hostname << ":" << port); + } + + freeaddrinfo(head); + + return ret; +} + +template < class P > +int connect(socket_stream

&stream, peer_base

endpoint) +{ + auto pt = endpoint.protocol(); + + auto fd = ::socket(pt.family(), pt.type(), pt.protocol()); + + if (!is_valid_socket(fd)) { + return static_cast(fd); + } + + auto ret = ::connect(fd, endpoint.data(), static_cast(endpoint.size())); + if (ret == 0) { + stream.assign(fd); + } else { + os::shutdown(fd, os::shutdown_type::READ_WRITE); + } + + return ret; +} + +} \ No newline at end of file diff --git a/include/matador/net/socket_acceptor.hpp b/include/matador/net/socket_acceptor.hpp new file mode 100644 index 0000000..d98c2cf --- /dev/null +++ b/include/matador/net/socket_acceptor.hpp @@ -0,0 +1,357 @@ +#ifndef MATADOR_SOCKET_ACCEPTOR_HPP +#define MATADOR_SOCKET_ACCEPTOR_HPP + +#include "matador/net/socket.hpp" + +namespace matador { + +/** + * The socket acceptor class provides the + * base functionality of the socket base class + * plus acceptor specific functions like + * listen, bind or accept + * + * @tparam P Socket protocol type + */ +template < class P > +class socket_acceptor : public socket_base

+{ +public: + typedef socket_base

base; /**< Shortcut to base socket type */ + typedef socket_stream

stream_type; /**< Shortcut to socket stream type */ + typedef typename base::protocol_type protocol_type; /**< Shortcut to protocol type */ + typedef typename base::peer_type peer_type; /**< Shortcut to peer type */ + + /** + * Default constructor + */ + socket_acceptor() = default; + + /** + * Constructs a socket acceptor for + * the given peer + * + * @param peer Peer to construct an acceptor from + */ + explicit socket_acceptor(peer_type &peer); + + /** + * Constructs an acceptor for given hostname and port + * + * @param hostname Hostname of the accepting endpoint + * @param port Portnumber of the accepting endpoint + */ + socket_acceptor(const char* hostname, unsigned short port); + + /** + * Creates a listening socket and binds + * the given hostname and port to it + * + * Returns zero (0) on success and -1 on error + * with errno set + * + * @param hostname Hostname to bind + * @param port Portnumber to bind + * @return Returns zero (0) on success. + */ + int bind(const char* hostname, unsigned short port); + + /** + * Creates a listening socket and binds + * the given peer endpoint to it + * + * Returns zero (0) on success and -1 on error + * with errno set + * + * @param peer Binds the given peer endpoint to the socket + * @return Returns zero (0) on success. + */ + int bind(peer_type &peer); + + /** + * Start listening to the bound endpoint using + * the internally created socket. + * + * Returns zero (0) on success and -1 on error + * with errno set + * + * @param backlog Number of backlog + * @return Returns zero (0) on success. + */ + int listen(int backlog); + + /** + * Returns a pointer to the underlying + * concrete internet address of the given + * socket address structure + * + * @param sa Socket address + * @return Pointer to the internet address + */ + void* get_in_addr(struct sockaddr *sa); + + /** + * Returns the port number of the given + * socket address structure + * + * @param sa Socket address + * @return The port number + */ + unsigned short get_port(struct sockaddr *sa); + + /** + * Get the remote address and port as string + * representation. + * + * @param remote_addr Remote socket address structure + * @return String representation of the remote address + */ + std::string get_remote_address(struct sockaddr_storage &remote_addr); + + /** + * Accept a connection and assign the socket descriptor + * to the given socket stream. + * + * Once the descriptor is assigned to the stream it + * can be used to read and write data to it. + * + * @param stream Stream to use after connection was accepted + * @return The fd of the new connection + */ + int accept(stream_type &stream); + + /** + * Accept a connection and assign the socket descriptor + * to the given socket stream. + * + * Once the descriptor is assigned to the stream it + * can be used to read and write data to it. + * + * The given peer endpoint is filled with the + * address information of the remote endpoint. + * + * @param stream Stream to use after connection was accepted + * @param endpoint Will be filled with the remote endpoint information + * @return The fd of the new connection + */ + int accept(stream_type &stream, peer_type &endpoint); + + /** + * Sets or clears the reuse address flag. If the + * given value is true the flag is set otherwise the + * flag is cleared. + * + * @param reuse Indicates if the reuse address flag should be set + * @return 0 if setting was successful, -1 on error + */ + int reuse_address(bool reuse); + + /** + * Returns true if the flag is set otherwise + * false is returned. + * + * @return True if flag is set + */ + bool reuse_address() const; +}; + +/// @cond MATADOR_DEV +template < class P > +socket_acceptor

::socket_acceptor(peer_type &peer) +: socket_base

(peer) +{ + bind(peer); +} + +template < class P > +socket_acceptor

::socket_acceptor(const char* hostname, unsigned short port) +{ + bind(hostname, port); +} + +template < class P > +int socket_acceptor

::bind(const char* hostname, unsigned short port) +{ + char portstr[6]; + sprintf(portstr, "%d", port); +// const char* portname = "daytime"; + struct addrinfo hints = {}; + memset(&hints,0,sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = 0; + hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG; + struct addrinfo* res = nullptr; + struct addrinfo* head = nullptr; + int err = getaddrinfo(hostname, portstr, &hints, &res); + if (err != 0) { + throw_logic_error("failed to resolve local socket address (error: " << err << ")"); + } + + head = res; + + int listenfd = 0; + int ret = 0; + const int on = 1; + do { + listenfd = ::socket(res->ai_family, res->ai_socktype, res->ai_protocol); + if (listenfd < 0) { + // error, try next one + continue; + } + + if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) { + throw std::logic_error(strerror(errno)); + } + + ret = ::bind(listenfd, res->ai_addr, res->ai_addrlen); + if (ret == 0) { + // success + this->assign(listenfd); + break; + } else { + throw_logic_error("couldn't bind to " << hostname << ":" << port << ": " << strerror(errno)); + } + + // bind error, close and try next one + this->close(); + } while ( (res = res->ai_next) != nullptr); + + if (res == nullptr) { + throw_logic_error("couldn't bind to " << hostname << ":" << port); + } + + freeaddrinfo(head); + + return ret; +} + +template < class P > +int socket_acceptor

::bind(peer_type &peer) +{ + socket_type listen_fd = ::socket(peer.protocol().family(), peer.protocol().type(), peer.protocol().protocol()); + if (!is_valid_socket(listen_fd)) { + // error, try next one + return static_cast(listen_fd); + } + +#ifdef _WIN32 + const char on = 1; +#else + const int on = 1; +#endif + if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) { + detail::throw_logic_error_with_errno("setsockopt error: %s", errno); + } + + int ret = ::bind(listen_fd, peer.data(), static_cast(peer.size())); + if (ret == 0) { + // success + this->assign(listen_fd); + } else { + detail::throw_logic_error_with_errno("couldn't bind fd: %s", errno); + } + size_t s = peer.size(); + ret = getsockname(this->id(), peer.data(), (socklen_t*)&s); + return ret; +} + +template < class P > +int socket_acceptor

::listen(int backlog) +{ + return ::listen(this->id(), backlog); +} + +template < class P > +void* socket_acceptor

::get_in_addr(struct sockaddr *sa) +{ + if (sa->sa_family == AF_INET) { + return &(((struct sockaddr_in*)sa)->sin_addr); + } else { + return &(((struct sockaddr_in6*)sa)->sin6_addr); + } +} + +template < class P > +unsigned short socket_acceptor

::get_port(struct sockaddr *sa) +{ + if (sa->sa_family == AF_INET) { + return ntohs(((struct sockaddr_in*)sa)->sin_port); + } else { + return ntohs(((struct sockaddr_in6*)sa)->sin6_port); + } +} + +template < class P > +std::string socket_acceptor

::get_remote_address(struct sockaddr_storage &remote_addr) +{ + char s[INET6_ADDRSTRLEN]; + + os::inet_ntop(remote_addr.ss_family, + get_in_addr((struct sockaddr *)&remote_addr), + s, sizeof s); + + std::stringstream ra; + + ra << s << ":" << get_port((struct sockaddr *)&remote_addr); + return ra.str(); +} + +template < class P > +int socket_acceptor

::accept(stream_type &stream) +{ + struct sockaddr_storage remote_addr = {}; +// address_type remote_addr; + socklen_t addrlen = sizeof(remote_addr); + auto fd = ::accept(this->id(), (struct sockaddr *)&remote_addr, &addrlen); + + if (is_valid_socket(fd)) { + stream.assign(fd); + stream.non_blocking(true); + stream.cloexec(true); +// } else { +// detail::throw_logic_error_with_errno("accept failed: %s", errno); + } + + return static_cast(fd); +} + +template +int socket_acceptor

::accept(stream_type &stream, peer_type &endpoint) +{ + auto addr_len = static_cast(endpoint.size()); + auto fd = ::accept(this->id(), endpoint.data(), &addr_len); + + if (is_valid_socket(fd)) { + stream.assign(fd); + stream.non_blocking(true); + stream.cloexec(true); +// } else { +// detail::throw_logic_error_with_errno("accept failed: %s", errno); + } + + return static_cast(fd); +} + +template < class P > +int socket_acceptor

::reuse_address(bool reuse) +{ + const int option(reuse ? 1 : 0); + return setsockopt(this->id(), SOL_SOCKET, SO_REUSEADDR, (char*)&option, sizeof(option)); +} + +template < class P > +bool socket_acceptor

::reuse_address() const +{ + size_t option {}; + socklen_t i; + i = sizeof(option); + getsockopt(this->id(), SOL_SOCKET, SO_REUSEADDR, (char*)&option, &i); + return option > 0; +} + +/// @endcond + +} + +#endif //MATADOR_SOCKET_ACCEPTOR_HPP diff --git a/include/matador/net/socket_interrupter.hpp b/include/matador/net/socket_interrupter.hpp new file mode 100644 index 0000000..ce0dac1 --- /dev/null +++ b/include/matador/net/socket_interrupter.hpp @@ -0,0 +1,38 @@ +#ifndef MATADOR_SOCKET_INTERRUPTER_HPP +#define MATADOR_SOCKET_INTERRUPTER_HPP + +#include "matador/net/ip.hpp" + +// #include "matador/logger/logger.hpp" + +#include + +namespace matador::net { + +/// @cond MATADOR_DEV + +class OOS_NET_API socket_interrupter +{ +public: + socket_interrupter(); + ~socket_interrupter(); + + socket_type socket_id() const; + + void interrupt(); + bool reset(); + +private: + matador::tcp::socket server_; + matador::tcp::socket client_; + + // matador::logger log_; + + std::array indicator_ = { { 0 } }; + std::array consumer_ = {}; +}; +/// @endcond + +} + +#endif //MATADOR_SOCKET_INTERRUPTER_HPP diff --git a/include/matador/net/socket_stream.hpp b/include/matador/net/socket_stream.hpp new file mode 100644 index 0000000..0c4ad0c --- /dev/null +++ b/include/matador/net/socket_stream.hpp @@ -0,0 +1,95 @@ +#ifndef MATADOR_SOCKET_STREAM_HPP +#define MATADOR_SOCKET_STREAM_HPP + +#ifdef _WIN64 +#define ssize_t __int64 +#elif _WIN32 +#define ssize_t long +#endif + +#include "matador/net/socket.hpp" + +namespace matador { + +/** + * The class represents a read write socket. It is + * independent to the protocol type (UDP or TCP). + * + * It provides to methods receive and send to read + * or write data to an open socket. + * + * @tparam P Protocol type of the socket + */ +template < class P > +class socket_stream : public socket_base

+{ +public: + typedef socket_base

base; /**< Shortcut to socket base class */ + typedef typename base::protocol_type protocol_type; /**< Shortcut to protocol type */ + typedef typename base::peer_type peer_type; /**< Shortcut to the peer type */ + + /** + * Creates an uninitialized socket stream + */ + socket_stream() = default; + + /** + * Creates a socket stream for the given + * protocol type. + * + * @param protocol Type of the protocol + */ + explicit socket_stream(const protocol_type &protocol); + + /** + * Receives data from the underlying open socket. + * All received data is stored in the given buffer. + * The limit of the data to receive is the size of + * the buffer + * + * Number of bytes is returned. If returned value is zero (0) + * the socket was closed. If the return value is -1 an error + * occurred or the socket was blocked (in case of nonblocking) + * The concrete error code and message can be retrieved with + * errno. + * + * @tparam Buffer type of the buffer object. + * @param buffer The buffer object. + * @return The number of bytes received, -1 on error or 0 on close + */ + template < class Buffer > + ssize_t receive(Buffer &buffer); + + /** + * + * @tparam Buffer type of the buffer object. + * @param buffer The buffer object. + * @return The number of bytes sent, -1 on error or 0 on close + */ + template < class Buffer > + ssize_t send(const Buffer &buffer); +}; + +/// @cond MATADOR_DEV +template < class P > +socket_stream

::socket_stream(const protocol_type &protocol) +: base(protocol) +{} + +template < class P > +template < class Buffer > +ssize_t socket_stream

::receive(Buffer &buffer) +{ + return ::recv(this->id(), buffer.data(), static_cast(buffer.capacity()), 0); +} + +template < class P > +template < class Buffer > +ssize_t socket_stream

::send(const Buffer &buffer) +{ + return ::send(this->id(), buffer.data(), static_cast(buffer.size()), 0); +} +/// @endcond +} + +#endif //MATADOR_SOCKET_STREAM_HPP diff --git a/include/matador/net/stream_handler.hpp b/include/matador/net/stream_handler.hpp new file mode 100644 index 0000000..dfa55b9 --- /dev/null +++ b/include/matador/net/stream_handler.hpp @@ -0,0 +1,96 @@ +#ifndef MATADOR_STREAM_HANDLER_HPP +#define MATADOR_STREAM_HANDLER_HPP + +#include "matador/utils/buffer_view.hpp" + +#include "matador/logger/logger.hpp" + +#include "matador/net/export.hpp" +#include "matador/net/handler.hpp" +#include "matador/net/ip.hpp" +#include "matador/net/io_stream.hpp" + +#include +#include + +namespace matador { + +class handler_creator; + +/** + * The stream_handler class implements the + * handler and io_stream interface and is + * used with the io_service to handle socket + * connections more easily in comparison + * with the plain reactor boilerplate. + * + * Instances of the class are used internally + * within the io_service hiding the read and write + * wiring. The user just use the interface + * provided by the io_service to setup + * a server. + */ +class OOS_NET_API stream_handler : public handler, public io_stream +{ +public: + typedef std::function t_init_handler; /**< Shortcut to the initialize function */ + +public: + /** + * Creates a new stream_handler for the given socket. + * The acceptor is the link to the creation source where + * the given init function is called when the handler + * is initialized + * + * @param sock Socket to read and write on + * @param endpoint Endpoint of the connection + * @param creator Pointer to the creating handler class + * @param init_handler Initialize function + */ + stream_handler(tcp::socket sock, tcp::peer endpoint, handler_creator *creator, t_init_handler init_handler); + + void open() override; + socket_type handle() const override; + void on_input() override; + void on_output() override; + void on_except() override {} + void on_timeout() override {} + void on_close() override; + void close() override; + bool is_ready_write() const override; + bool is_ready_read() const override; + + void read(buffer_view buf, t_read_handler read_handler) override; + void write(std::list buffers, t_write_handler write_handler) override; + + void close_stream() override; + + tcp::socket &stream() override; + + std::string name() const override; + +private: + logger log_; + tcp::socket stream_; + tcp::peer endpoint_; + + std::string name_; + + buffer_view read_buffer_; + + std::list write_buffers_; + + handler_creator *creator_ = nullptr; + + t_init_handler init_handler_; + t_read_handler on_read_; + t_write_handler on_write_; + + std::atomic_bool is_ready_to_read_ { false }; + std::atomic_bool is_ready_to_write_ { false }; + + mutable std::mutex mutex_; +}; + +} +#endif //MATADOR_STREAM_HANDLER_HPP diff --git a/include/matador/object/collection.hpp b/include/matador/object/collection.hpp index 01e49dd..402c6dd 100644 --- a/include/matador/object/collection.hpp +++ b/include/matador/object/collection.hpp @@ -10,6 +10,7 @@ class collection { public: private: + std::vector data_; }; diff --git a/include/matador/utils/leader_follower_thread_pool.hpp b/include/matador/utils/leader_follower_thread_pool.hpp index 748b13d..3569ec6 100644 --- a/include/matador/utils/leader_follower_thread_pool.hpp +++ b/include/matador/utils/leader_follower_thread_pool.hpp @@ -16,14 +16,13 @@ namespace matador::utils { * This thread pool class implements the * leader follower pattern. */ -class leader_follower_thread_pool -{ -private: - leader_follower_thread_pool(const leader_follower_thread_pool &); - - leader_follower_thread_pool &operator=(const leader_follower_thread_pool &); - +class leader_follower_thread_pool { public: + leader_follower_thread_pool(const leader_follower_thread_pool&) = delete; + leader_follower_thread_pool& operator=(const leader_follower_thread_pool&) = delete; + leader_follower_thread_pool(leader_follower_thread_pool&&) = delete; + leader_follower_thread_pool& operator=(leader_follower_thread_pool&&) = delete; + /** * Creates a new leader follower thread pool instance * with the given thread pool size and given join @@ -34,7 +33,7 @@ public: * @param join_func Join function. */ template - leader_follower_thread_pool(std::size_t size, F join_func) + leader_follower_thread_pool(const std::size_t size, F join_func) : num_threads_(size), join_(join_func) , follower_(size) {} @@ -60,7 +59,7 @@ public: * * @return Number of threads. */ - std::size_t size() const; + [[nodiscard]] std::size_t size() const; /** * Shuts the thread pool down. @@ -81,14 +80,14 @@ public: * * @return Number of follower threads. */ - std::size_t num_follower() const; + [[nodiscard]] std::size_t num_follower() const; /** * Returns true if the thread pool is running. * * @return True if thread pool is running. */ - bool is_running() const; + [[nodiscard]] bool is_running() const; private: /* diff --git a/include/matador/utils/uuid.hpp b/include/matador/utils/uuid.hpp index 3624bc7..7c25e5b 100644 --- a/include/matador/utils/uuid.hpp +++ b/include/matador/utils/uuid.hpp @@ -41,7 +41,7 @@ template <> struct std::hash { std::size_t operator()(const matador::utils::uuid& u) const noexcept { std::size_t h = 0; - for (uint32_t val : u.data()) { + for (const uint32_t val : u.data()) { h ^= std::hash{}(val) + 0x9e3779b9 + (h << 6) + (h >> 2); } return h; diff --git a/source/core/CMakeLists.txt b/source/core/CMakeLists.txt index 2eaf6bb..6481a65 100644 --- a/source/core/CMakeLists.txt +++ b/source/core/CMakeLists.txt @@ -1,4 +1,10 @@ add_library(matador-core STATIC + ../../include/matador/net/event_type.hpp + ../../include/matador/net/handler.hpp + ../../include/matador/net/os.hpp + ../../include/matador/net/reactor.hpp + ../../include/matador/net/select_fd_sets.hpp + ../../include/matador/net/socket_interrupter.hpp ../../include/matador/object/attribute_definition_generator.hpp ../../include/matador/object/attribute_definition.hpp ../../include/matador/object/basic_object_info.hpp diff --git a/source/core/net/acceptor.cpp b/source/core/net/acceptor.cpp new file mode 100644 index 0000000..3b4a85c --- /dev/null +++ b/source/core/net/acceptor.cpp @@ -0,0 +1,119 @@ +#include +#include "matador/net/acceptor.hpp" +#include "matador/net/reactor.hpp" + +#include "matador/logger/log_manager.hpp" + +namespace matador { + +acceptor::acceptor() + : log_(matador::create_logger("Acceptor")) +{} + +acceptor::acceptor(tcp::peer endpoint) +: endpoint_(std::move(endpoint)) +, log_(matador::create_logger("Acceptor")) +{} + +acceptor::acceptor(tcp::peer endpoint, t_accept_handler make_handler) + : endpoint_(std::move(endpoint)) + , accept_handler_(std::move(make_handler)) + , log_(matador::create_logger("Acceptor")) +{} + +acceptor::~acceptor() +{ + acceptor_.close(); +} + +void acceptor::accecpt(acceptor::t_accept_handler on_new_connection) +{ + accept_handler_ = std::move(on_new_connection); +} + +void acceptor::accecpt(const tcp::peer &endpoint, acceptor::t_accept_handler on_new_connection) +{ + endpoint_ = endpoint; + accecpt(std::move(on_new_connection)); +} + +void acceptor::open() +{ + acceptor_.bind(endpoint_); + acceptor_.listen(10); + name_ += " (fd: " + std::to_string(acceptor_.id()) + ")"; + log_.debug("fd %d: accepting connections", handle()); +} + +socket_type acceptor::handle() const +{ + return acceptor_.id(); +} + +void acceptor::on_input() +{ + tcp::socket sock; + + tcp::peer endpoint = create_client_endpoint(); + log_.debug("fd %d: accepting connection ...", handle()); + int ret = acceptor_.accept(sock, endpoint); + + if (ret < 0) { + char error_buffer[1024]; + os::strerror(errno, error_buffer, 1024); + log_.error("accept failed: %s", error_buffer); + } else { + // create new client handler + log_.debug("accepted connection from %s", endpoint.to_string().c_str()); + + auto h = accept_handler_(sock, endpoint, this); + + get_reactor()->register_handler(h, event_type::READ_WRITE_MASK); + + log_.debug("fd %d: accepted socket id %d", handle(), sock.id()); + } +} + +void acceptor::close() +{ + log_.debug("closing acceptor %d", acceptor_.id()); + acceptor_.close(); + // Todo: unregister from reactor (maybe observer pattern?) + // notify() +} + +bool acceptor::is_ready_write() const +{ + return false; +} + +bool acceptor::is_ready_read() const +{ + return handle() > 0; +} + +const tcp::peer &acceptor::endpoint() const +{ + return endpoint_; +} + +tcp::peer acceptor::create_client_endpoint() const +{ + if (endpoint_.addr().is_v4()) { + return matador::tcp::peer(address::v4::empty()); + } else { + return matador::tcp::peer(address::v6::empty()); + } +} + +void acceptor::notify_close(handler *) +{ + +} + +std::string acceptor::name() const +{ + return name_; +} + +} \ No newline at end of file diff --git a/source/core/net/address.cpp b/source/core/net/address.cpp new file mode 100644 index 0000000..c1fcfe4 --- /dev/null +++ b/source/core/net/address.cpp @@ -0,0 +1,154 @@ +#include "matador/net/address.hpp" + +#ifdef _WIN32 +#else +#include +#include +#endif + +#include + +namespace matador { + +const char* address_router::IP6ADDR_MULTICAST_ALLNODES = "FF02::1"; + +address::address(const sockaddr_in &addr) + : size_(sizeof(sockaddr_in)) +{ + socket_address_.sa_in = addr; +} + +address::address(const sockaddr_in6 &addr) + : size_(sizeof(sockaddr_in6)) +{ + socket_address_.sa_in6 = addr; +} + +address& address::operator=(const address &x) +{ + if (this == &x) { + return *this; + } + clear(); + size_ = x.size_; + socket_address_ = x.socket_address_; + return *this; +} + +address::address(address &&x) noexcept + : socket_address_(x.socket_address_) + , size_(x.size_) +{ + x.size_ = 0; +} + +address& address::operator=(address &&x) noexcept +{ + if (this == &x) { + return *this; + } + clear(); + socket_address_ = x.socket_address_; + size_ = x.size_; + x.size_ = 0; + return *this; +} + +address::~address() +{ + clear(); +} + +unsigned int address::to_ulong() const +{ + if (socket_address_.sa_raw.sa_family == PF_INET) { + return socket_address_.sa_in.sin_addr.s_addr; + } else { + return 0; + //return reinterpret_cast(addr_)->sin6_addr; + } +} + +std::string address::to_string() const +{ + char addstr[INET6_ADDRSTRLEN]; + if (is_v4()) { + os::inet_ntop(socket_address_.sa_raw.sa_family, &socket_address_.sa_in.sin_addr, addstr, INET6_ADDRSTRLEN); + } else { + os::inet_ntop(socket_address_.sa_raw.sa_family, &socket_address_.sa_in6.sin6_addr, addstr, INET6_ADDRSTRLEN); + } + return std::string(addstr); +} + +void address::port(unsigned short pn) +{ + if (is_v4()) { + socket_address_.sa_in.sin_port = htons(pn); + } else { + socket_address_.sa_in6.sin6_port = htons(pn); + } +} + +unsigned short address::port() const +{ + if (is_v4()) { + return ntohs(socket_address_.sa_in.sin_port); + } else { + return ntohs(socket_address_.sa_in6.sin6_port); + } +} + +bool address::is_v4() const +{ + return socket_address_.sa_raw.sa_family == PF_INET; +} + +bool address::is_v6() const +{ + return socket_address_.sa_raw.sa_family == PF_INET6; +} + +sockaddr *address::addr() +{ + return &socket_address_.sa_raw; +} + +const sockaddr *address::addr() const +{ + return &socket_address_.sa_raw; +} + +sockaddr_in *address::addr_v4() +{ + return &socket_address_.sa_in; +} + +const sockaddr_in *address::addr_v4() const +{ + return &socket_address_.sa_in; +} + +sockaddr_in6 *address::addr_v6() +{ + return &socket_address_.sa_in6; +} + +const sockaddr_in6 *address::addr_v6() const +{ + return &socket_address_.sa_in6; +} + +socklen_t address::size() const +{ + return size_; +} + +void address::clear() +{ + if (is_v4()) { + memset(&socket_address_.sa_in, 0, sizeof(socket_address_.sa_in)); + } else { + memset(&socket_address_.sa_in6, 0, sizeof(socket_address_.sa_in6)); + } +} +} diff --git a/source/core/net/address_resolver.cpp b/source/core/net/address_resolver.cpp new file mode 100644 index 0000000..c6bd45c --- /dev/null +++ b/source/core/net/address_resolver.cpp @@ -0,0 +1,19 @@ +#include "matador/net/address_resolver.hpp" + +namespace matador { +namespace detail { + +template<> +int determine_socktype() +{ + return SOCK_STREAM; +} + +template<> +int determine_socktype() +{ + return SOCK_DGRAM; +} + +} +} \ No newline at end of file diff --git a/source/core/net/connector.cpp b/source/core/net/connector.cpp new file mode 100644 index 0000000..8f4db6e --- /dev/null +++ b/source/core/net/connector.cpp @@ -0,0 +1,84 @@ +#include "matador/logger/log_manager.hpp" + +#include "matador/net/connector.hpp" +#include "matador/net/reactor.hpp" + +#include "matador/utils/os.hpp" + +#include + +namespace matador { + +connector::connector() + : log_(matador::create_logger("Connector")) +{} + +connector::connector(t_connect_handler on_new_connection) + : connect_handler_(std::move(on_new_connection)) + , log_(matador::create_logger("Connector")) +{} + +void connector::connect(reactor &r, const std::vector &endpoints) +{ + endpoints_ = endpoints; + r.schedule_timer(shared_from_this(), 0, 3); +} + +void connector::connect(reactor &r, const std::vector &endpoints, t_connect_handler on_new_connection) +{ + connect_handler_ = std::move(on_new_connection); + connect(r, endpoints); +} + +socket_type connector::handle() const +{ + return 0; +} + +void connector::on_timeout() +{ + tcp::socket stream; + for (const auto &ep : endpoints_) { + if (!ep.addr().is_v4()) { + continue; + } + auto ret = matador::connect(stream, ep); + if (ret != 0) { + char error_buffer[1024]; + log_.error("couldn't establish connection to: %s", ep.to_string().c_str(), os::strerror(errno, error_buffer, 1024)); + continue; + } else { + log_.info("connection established to %s (fd: %d)", ep.to_string().c_str(), stream.id()); + } + stream.non_blocking(true); + auto h = connect_handler_(stream, ep, this); + + get_reactor()->register_handler(h, event_type::READ_WRITE_MASK); + get_reactor()->cancel_timer(shared_from_this()); + + break; + } + endpoints_.clear(); +} + +bool connector::is_ready_write() const +{ + return false; +} + +bool connector::is_ready_read() const +{ + return false; +} + +void connector::notify_close(handler *) +{ + +} + +std::string connector::name() const +{ + return "connector"; +} + +} \ No newline at end of file diff --git a/source/core/net/error.cpp b/source/core/net/error.cpp new file mode 100644 index 0000000..127e235 --- /dev/null +++ b/source/core/net/error.cpp @@ -0,0 +1,38 @@ +#include "matador/net/error.hpp" +#include "matador/utils/os.hpp" + +#include + +#ifdef _WIN32 +#include +#else +#include +#endif + +namespace matador { +namespace detail { + +void throw_logic_error(const char* msg) +{ + throw std::logic_error(msg); +} + +void throw_logic_error_with_errno(const char* msg, int err) +{ + char error_buffer[1024]; + os::strerror(err, error_buffer, 1024); + + char message_buffer[1024]; + os::sprintf(message_buffer, 1024, msg, error_buffer); + throw std::logic_error(message_buffer); +} + +void throw_logic_error_with_gai_errno(const char* msg, int err) +{ + char message_buffer[1024]; + os::sprintf(message_buffer, 1024, msg, gai_strerror(err)); + throw std::logic_error(message_buffer); +} + +} +} \ No newline at end of file diff --git a/source/core/net/fdset.cpp b/source/core/net/fdset.cpp new file mode 100644 index 0000000..917777b --- /dev/null +++ b/source/core/net/fdset.cpp @@ -0,0 +1,66 @@ +#include "matador/net/fdset.hpp" + +namespace matador { + +fdset::fdset() +{ + reset(); +} + +fdset::fdset(fdset&& x) noexcept +: max_fd_set_(std::move(x.max_fd_set_)) +, fd_set_(x.fd_set_) {} + +fdset& fdset::operator=(fdset&& x) noexcept +{ + max_fd_set_ = std::move(x.max_fd_set_); + fd_set_ = x.fd_set_; + return *this; +} + +// set all bits to zero +void fdset::reset() +{ + FD_ZERO(&fd_set_); + max_fd_set_.clear(); + max_fd_set_.insert(0); +} + +bool fdset::is_set(socket_type fd) const +{ + return FD_ISSET(fd, &fd_set_) > 0; +} + +void fdset::clear(socket_type fd) +{ + FD_CLR(fd, &fd_set_); + max_fd_set_.erase(fd); +} + +void fdset::set(socket_type fd) +{ + FD_SET(fd, &fd_set_); + max_fd_set_.insert(fd); +} + +socket_type fdset::maxp1() const +{ + return *max_fd_set_.begin(); +} + +size_t fdset::count() const +{ + return max_fd_set_.size() - 1; +} + +bool fdset::empty() const +{ + return count() == 0; +} + +fd_set* fdset::get() +{ + return &fd_set_; +} + +} diff --git a/source/core/net/handler.cpp b/source/core/net/handler.cpp new file mode 100644 index 0000000..5dba2c9 --- /dev/null +++ b/source/core/net/handler.cpp @@ -0,0 +1,48 @@ +#include "matador/net/handler.hpp" + +#include + +namespace matador { + +time_t handler::next_timeout() const +{ + return next_timeout_; +} + +time_t handler::interval() const +{ + return interval_; +} + +reactor *handler::get_reactor() const +{ + return reactor_; +} + +void handler::register_reactor(reactor *r) +{ + reactor_ = r; +} + +void handler::schedule(time_t offset, time_t interval) +{ + next_timeout_ = ::time(nullptr) + offset; + interval_ = interval; +} + +void handler::cancel_timer() +{ + next_timeout_ = 0; + interval_ = 0; +} + +void handler::calculate_next_timeout(time_t now) +{ + if (interval_ > 0) { + next_timeout_ = now + interval_; + } else { + next_timeout_ = 0; + } +} + +} diff --git a/source/core/net/io_service.cpp b/source/core/net/io_service.cpp new file mode 100644 index 0000000..905a180 --- /dev/null +++ b/source/core/net/io_service.cpp @@ -0,0 +1,31 @@ +#include "matador/net/io_service.hpp" + +#include "matador/logger/log_manager.hpp" + +namespace matador { + +io_service::io_service() + : log_(matador::create_logger("IOService")) +{} + +io_service::~io_service() +{ + reactor_.shutdown(); +} + +void io_service::run() +{ + reactor_.run(); +} + +bool io_service::is_running() const +{ + return reactor_.is_running(); +} + +void io_service::shutdown() +{ + reactor_.shutdown(); +} + +} \ No newline at end of file diff --git a/source/core/net/leader_follower_thread_pool.cpp b/source/core/net/leader_follower_thread_pool.cpp new file mode 100644 index 0000000..042fec4 --- /dev/null +++ b/source/core/net/leader_follower_thread_pool.cpp @@ -0,0 +1,106 @@ +#include "matador/net/leader_follower_thread_pool.hpp" + +#include "matador/utils/thread_helper.hpp" + +#include + +namespace matador { + +leader_follower_thread_pool::~leader_follower_thread_pool() +{ + shutdown(); +} + +void leader_follower_thread_pool::start() +{ + is_running_ = true; + for (std::size_t i = 0; i < num_threads_; ++i) { + threads_.emplace_back([this] { execute(); }); + } + log_.info("thread pool started with %d threads", num_threads_); +} + +void leader_follower_thread_pool::stop() +{ + is_running_ = false; +} + +void leader_follower_thread_pool::promote_new_leader() +{ + std::lock_guard l(mutex_); + + if (leader_ != std::this_thread::get_id()) { + return; + } + + leader_ = null_id; + + signal_ready_ = true; + condition_synchronizer_.notify_one(); +} + +std::size_t leader_follower_thread_pool::size() const +{ + return threads_.size(); +} + +void leader_follower_thread_pool::shutdown() +{ + { + const std::lock_guard l(mutex_); +// if (!is_running_) { +// return; +// } + stop(); + log_.info("shutting down; notifying all tasks"); + signal_shutdown_ = true; + condition_synchronizer_.notify_all(); + } + + std::for_each(threads_.begin(), threads_.end(), [](thread_vector_t::reference item) { + if (item.joinable()) { + item.join(); + } + }); +} + +std::thread::id leader_follower_thread_pool::leader() { + const std::lock_guard l(mutex_); + return leader_; +} + +std::size_t leader_follower_thread_pool::num_follower() const { + return follower_; +} + +bool leader_follower_thread_pool::is_running() const +{ + return is_running_; +} + +void leader_follower_thread_pool::execute() { + std::unique_lock l(mutex_); + while (is_running_) { + while (leader_ != null_id) { +// log_.info("thread <%d> waiting for synchronizer (leader %d)", +// acquire_thread_index(std::this_thread::get_id()), +// acquire_thread_index(leader_)); + condition_synchronizer_.wait(l, [this]() { return signal_ready_ || signal_shutdown_; }); + signal_ready_ = false; + if (!is_running_) { + return; + } + } + +// log_.info("new leader <%d> (thread <%d> is now follower)",acquire_thread_index(std::this_thread::get_id()) , acquire_thread_index(leader_)); + leader_ = std::this_thread::get_id(); + + l.unlock(); + + join_(); + +// log_.info("thread <%d> finished work", acquire_thread_index(std::this_thread::get_id())); + l.lock(); + } +} +} \ No newline at end of file diff --git a/source/core/net/os.cpp b/source/core/net/os.cpp new file mode 100644 index 0000000..48ce4fb --- /dev/null +++ b/source/core/net/os.cpp @@ -0,0 +1,84 @@ +#include "matador/net/os.hpp" + +#ifdef _WIN32 +#include +#else +#include +#include +#endif + +namespace matador { +namespace net { + +void init() +{ +#ifdef _WIN32 + WSADATA wsaData; // if this doesn't work + //WSAData wsaData; // then try this instead + + // MAKEWORD(1,1) for Winsock 1.1, MAKEWORD(2,0) for Winsock 2.0: + + if (WSAStartup(MAKEWORD(1,1), &wsaData) != 0) { + fprintf(stderr, "WSAStartup failed.\n"); + exit(1); + } +#endif +} + +void cleanup() +{ +#ifdef _WIN32 + WSACleanup(); +#endif +} + +} + +bool is_valid_socket(socket_type fd) { +#ifdef _WIN32 + return fd != INVALID_SOCKET; +#else + return fd >= 0; +#endif +} + +namespace os { + +int inet_pton(int af, const char *src, void *dst) +{ +#ifdef _WIN32 + return ::InetPton(af, src, dst); +#else + return ::inet_pton(af, src, dst); +#endif +} + +const char* inet_ntop(int af, const void* src, char* dst, size_t size) +{ +#ifdef _WIN32 + return ::InetNtop(af, const_cast(src), dst, size); +#else + return ::inet_ntop(af, src, dst, size); +#endif +} + +int close(socket_type fd) +{ +#ifdef _WIN32 + return ::closesocket(fd); +#else + return ::close(fd); +#endif +} + +int shutdown(socket_type fd, shutdown_type type) +{ +#ifdef _WIN32 + return ::shutdown(fd, static_cast(type)); +#else + return ::shutdown(fd, static_cast(type)); +#endif +} + +} +} diff --git a/source/core/net/reactor.cpp b/source/core/net/reactor.cpp new file mode 100644 index 0000000..957adc7 --- /dev/null +++ b/source/core/net/reactor.cpp @@ -0,0 +1,411 @@ +#include "matador/net/reactor.hpp" +#include "matador/net/handler.hpp" + +#include "matador/logger/log_manager.hpp" + +#include +#include +#include +#include +#include + +namespace matador { + +reactor::reactor() + : sentinel_(std::shared_ptr(nullptr)) + , log_(create_logger("Reactor")) + , thread_pool_(4, [this]() { handle_events(); }) +{ +} +reactor::~reactor() +{ + log_.debug("destroying reactor"); + thread_pool_.shutdown(); +} + +void reactor::register_handler(const handler_ptr& h, event_type et) +{ + h->register_reactor(this); + h->open(); + + std::lock_guard l(mutex_); + auto it = find_handler_type(h); + + if (it == handlers_.end()) { + handlers_.emplace_back(h, et); + } else if (it->first != h) { + throw std::logic_error("given handler isn't expected handler"); + } else { + it->second = it->second | et; + } + interrupt_without_lock(); +} + +void reactor::unregister_handler(const handler_ptr& h, event_type) +{ + std::lock_guard l(mutex_); + auto it = find_handler_type(h); + + if (it != handlers_.end()) { + (*it).first->close(); + handlers_.erase(it); + } + interrupt_without_lock(); +} + +void reactor::schedule_timer(const std::shared_ptr& h, time_t offset, time_t interval) +{ + h->register_reactor(this); + + std::lock_guard l(mutex_); + auto it = find_handler_type(h); + + if (it == handlers_.end()) { + handlers_.emplace_back(h, event_type::TIMEOUT_MASK); + } + + h->schedule(offset, interval); +} + +void reactor::cancel_timer(const std::shared_ptr& h) +{ + std::lock_guard l(mutex_); + auto it = find_handler_type(h); + + if (it != handlers_.end()) { + handlers_.erase(it); + } + + h->cancel_timer(); +} + +void reactor::run() +{ +// log_.info("start dispatching all clients"); + thread_pool_.start(); + + { +// log_.info("waiting for reactor shutdown"); + std::unique_lock l(mutex_); + shutdown_.wait(l, [this]() { return shutdown_requested_.load(); }); + cleanup(); + } +// log_.info("all clients dispatched; shutting down"); + thread_pool_.stop(); +} + +void reactor::handle_events() +{ +// std::cout << this << " start handle events\n" << std::flush; + +// log_.info("handle events"); + + running_ = true; + time_t timeout; + select_fdsets fd_sets; + prepare_select_bits(timeout, fd_sets); + +// std::cout << this << " fd sets r: " << fd_sets.read_set().count() << ", w: " << fd_sets.write_set().count() << ", e: " << fd_sets.except_set().count() << ", max: " << fd_sets.maxp1() << "\n" << std::flush; + +// log_.debug("fds [r: %d, w: %d, e: %d]", +// fdsets_.read_set().count(), +// fdsets_.write_set().count(), +// fdsets_.except_set().count()); + +// if (timeout != (std::numeric_limits::max)()) { +// log_.debug("next timeout in %d sec", timeout); +// } + struct timeval tselect{}; + struct timeval* p = nullptr; + if (timeout < (std::numeric_limits::max)()) { + tselect.tv_sec = timeout; + tselect.tv_usec = 0; + p = &tselect; +// std::cout << this << " next timeout in " << p->tv_sec << " seconds\n" << std::flush; + } + + if (!has_clients_to_handle(timeout, fd_sets)) { +// std::cout << this << " no clients to handle; returning\n" << std::flush; +// log_.info("no clients to handle, exiting"); + return; + } + + int ret; + while ((ret = select(p, fd_sets)) < 0) { +// std::cout << this << " select returned with error " << ret << "\n" << std::flush; + if(errno != EINTR) { + char error_buffer[1024]; + log_.warn("select failed: %s", os::strerror(errno, error_buffer, 1024)); + shutdown(); + } else { + return; + } + } + +// std::cout << this << " select returned with active requests " << ret << "\n" << std::flush; + + bool interrupted = is_interrupted(fd_sets); + + if (interrupted) { + if (!shutdown_requested_) { +// std::cout << this << " reactor was interrupted - promote new leader\n" << std::flush; +// log_.info("reactor was interrupted"); + thread_pool_.promote_new_leader(); + return; + } else { +// std::cout << this << " reactor was interrupted for shutdown\n" << std::flush; +// log_.info("shutting down"); +// cleanup(); + shutdown_.notify_one(); + return; + } + } + + time_t now = ::time(nullptr); + t_handler_type handler_type = resolve_next_handler(now, fd_sets); + + if (handler_type.first) { + deactivate_handler(handler_type.first, handler_type.second); +// std::cout << this << " handling client " << handler_type.first->name() << " - promoting new leader\n" << std::flush; + + thread_pool_.promote_new_leader(); +// log_.info("start handling event"); + // handle event + if (handler_type.second == event_type::WRITE_MASK) { + on_write_mask(handler_type.first); + } else if (handler_type.second == event_type::READ_MASK) { + on_read_mask(handler_type.first); + } else if (handler_type.second == event_type::TIMEOUT_MASK) { + on_timeout(handler_type.first, now); + } else { +// log_.info("unknown event type"); + } + activate_handler(handler_type.first, handler_type.second); + remove_deleted(); + } else { + // no handler found +// log_.info("no handler found"); + thread_pool_.promote_new_leader(); + } +} + +void reactor::shutdown() +{ + if (!is_running()) { + return; + } + // shutdown the reactor properly + log_.info("shutting down reactor"); + shutdown_requested_ = true; + interrupt(); +} + +bool reactor::is_running() const +{ + return running_; +} + +void reactor::prepare_select_bits(time_t& timeout, select_fdsets& fd_sets) const +{ + std::lock_guard l(mutex_); + fd_sets.reset(); + time_t now = ::time(nullptr); + timeout = (std::numeric_limits::max)(); + + // set interrupter fd + fd_sets.read_set().set(interrupter_.socket_id()); + + for (const auto &h : handlers_) { + if (h.first == nullptr) { + continue; + } + if (h.first->is_ready_read() && is_event_type_set(h.second, event_type::READ_MASK)) { + fd_sets.read_set().set(h.first->handle()); + } + if (h.first->is_ready_write() && is_event_type_set(h.second, event_type::WRITE_MASK)) { + fd_sets.write_set().set(h.first->handle()); + } + if (h.first->next_timeout() > 0 && is_event_type_set(h.second, event_type::TIMEOUT_MASK)) { + timeout = (std::min)(timeout, h.first->next_timeout() <= now ? 0 : (h.first->next_timeout() - now)); + } + } +} + +void reactor::remove_deleted() +{ + while (!handlers_to_delete_.empty()) { + auto h = handlers_to_delete_.front(); + handlers_to_delete_.pop_front(); + auto fi = std::find_if(handlers_.begin(), handlers_.end(), [&h](const t_handler_type &ht) { + return ht.first.get() == h.get(); + }); + + if (fi != handlers_.end()) { + log_.debug("removing handler %d", fi->first->handle()); + handlers_.erase(fi); + } + } +} + +void reactor::cleanup() +{ + while (!handlers_.empty()) { + auto hndlr = handlers_.front(); + handlers_.pop_front(); + hndlr.first->close(); + } +} + +int reactor::select(struct timeval *timeout, select_fdsets& fd_sets) +{ + log_.debug("calling select; waiting for io events"); + return ::select( + static_cast(fd_sets.maxp1()) + 1, + fd_sets.read_set().get(), + fd_sets.write_set().get(), + fd_sets.except_set().get(), + timeout + ); +} + + +//void reactor::process_handler(int /*ret*/) +//{ +// handlers_.emplace_back(sentinel_, event_type::NONE_MASK); +// time_t now = ::time(nullptr); +// while (handlers_.front().first != nullptr) { +// auto h = handlers_.front(); +// handlers_.pop_front(); +// handlers_.push_back(h); +// // check for read/accept +// if (h.first->handle() > 0 && fdsets_.write_set().is_set(h.first->handle())) { +// on_write_mask(h.first); +// } +// if (h.first->handle() > 0 && fdsets_.read_set().is_set(h.first->handle())) { +// on_read_mask(h.first); +// } +// if (h.first->next_timeout() > 0 && h.first->next_timeout() <= now) { +// on_timeout(h.first, now); +// } +// } +// handlers_.pop_front(); +//} + +reactor::t_handler_type reactor::resolve_next_handler(time_t now, select_fdsets& fd_sets) +{ + std::lock_guard l(mutex_); + for (auto &h : handlers_) { + if (h.first->handle() > 0 && fd_sets.write_set().is_set(h.first->handle())) { + return std::make_pair(h.first, event_type::WRITE_MASK); + } + if (h.first->handle() > 0 && fd_sets.read_set().is_set(h.first->handle())) { + return std::make_pair(h.first, event_type::READ_MASK); + } + if (h.first->next_timeout() > 0 && h.first->next_timeout() <= now) { + return std::make_pair(h.first, event_type::TIMEOUT_MASK); + } + } + return std::make_pair(nullptr, event_type::NONE_MASK); +} + +void reactor::on_read_mask(const handler_ptr& handler) +{ +// log_.debug("read bit for handler %d is set; handle input", h->handle()); +// std::cout << this << " (handler " << h.get() << "): handle read\n" << std::flush; + handler->on_input(); +} + +void reactor::on_write_mask(const handler_ptr& handler) +{ +// log_.debug("write bit for handler %d is set; handle output", h->handle()); +// std::cout << this << " (handler " << h.get() << "): handle write\n" << std::flush; + handler->on_output(); +} + +void reactor::on_except_mask(const handler_ptr& /*handler*/) +{ +// std::cout << this << " (handler " << h.get() << "): handle exception\n" << std::flush; + +} + +void reactor::on_timeout(const handler_ptr &h, time_t now) +{ +// log_.debug("timeout expired for handler %d; handle timeout", h->handle()); +// std::cout << this << " (handler " << h.get() << "): handle timeout\n" << std::flush; + h->calculate_next_timeout(now); + h->on_timeout(); +} + +select_fdsets reactor::fdsets() const +{ + time_t timeout; + select_fdsets fd_sets; + prepare_select_bits(timeout, fd_sets); + return fd_sets; +} + +void reactor::mark_handler_for_delete(const handler_ptr& h) +{ + std::lock_guard l(mutex_); + handlers_to_delete_.push_back(h); +} + +bool reactor::is_interrupted(select_fdsets& fd_sets) +{ + std::lock_guard l(mutex_); + if (fd_sets.read_set().is_set(interrupter_.socket_id())) { + log_.debug("interrupt byte received; resetting interrupter"); + if (shutdown_requested_) { + running_ = false; + } + return interrupter_.reset(); + } + return false; +} + +bool reactor::has_clients_to_handle(time_t timeout, select_fdsets& fd_sets) const { + std::lock_guard lock(mutex_); + return fd_sets.maxp1() > 0 || timeout != (std::numeric_limits::max)(); +} + +std::list::iterator reactor::find_handler_type(const reactor::handler_ptr &h) +{ + return std::find_if(handlers_.begin(), handlers_.end(), [&h](const t_handler_type &ht) { + return ht.first.get() == h.get(); + }); +} + +void reactor::activate_handler(const reactor::handler_ptr &h, event_type ev) +{ + std::lock_guard l(mutex_); + auto it = find_handler_type(h); + if (it == handlers_.end()) { + return; + } + it->second |= ev; +} + +void reactor::deactivate_handler(const reactor::handler_ptr &h, event_type ev) +{ + std::lock_guard l(mutex_); + auto it = find_handler_type(h); + if (it == handlers_.end()) { + return; + } + it->second &= ~ev; +} + +void reactor::interrupt() +{ + log_.trace("interrupting reactor"); + std::lock_guard l(mutex_); + interrupter_.interrupt(); +} + +void reactor::interrupt_without_lock() +{ + log_.trace("interrupting reactor"); + interrupter_.interrupt(); +} + +} diff --git a/source/core/net/select_fdsets.cpp b/source/core/net/select_fdsets.cpp new file mode 100644 index 0000000..9770eea --- /dev/null +++ b/source/core/net/select_fdsets.cpp @@ -0,0 +1,74 @@ +#include "matador/net/select_fdsets.hpp" + +#include + +namespace matador { + +socket_type select_fdsets::maxp1() const +{ + return (std::max)(fdsets_[0].maxp1(), (std::max)(fdsets_[1].maxp1(), fdsets_[2].maxp1())); +} + +fdset& select_fdsets::fd_set(fdset_type type) +{ + return fdsets_[type]; +} + +fdset& select_fdsets::read_set() +{ + return fdsets_[0]; +} + +const fdset& select_fdsets::read_set() const +{ + return fdsets_[0]; +} + +fdset& select_fdsets::write_set() +{ + return fdsets_[1]; +} + +const fdset& select_fdsets::write_set() const +{ + return fdsets_[1]; +} + +fdset& select_fdsets::except_set() +{ + return fdsets_[2]; +} + +const fdset& select_fdsets::except_set() const +{ + return fdsets_[2]; +} + +void select_fdsets::reset() +{ + for (auto &fdset : fdsets_) { + fdset.reset(); + } +} + +void select_fdsets::reset(fdset_type type) +{ + fdsets_[type].reset(); +} + +bool select_fdsets::is_set(fdset_type type, int fd) const +{ + return fdsets_[type].is_set(fd); +} + +void select_fdsets::clear(fdset_type type, int fd) +{ + fdsets_[type].clear(fd); +} + +void select_fdsets::set(fdset_type type, int fd) +{ + fdsets_[type].set(fd); +} + +} diff --git a/source/core/net/socket_interrupter.cpp b/source/core/net/socket_interrupter.cpp new file mode 100644 index 0000000..c241abf --- /dev/null +++ b/source/core/net/socket_interrupter.cpp @@ -0,0 +1,74 @@ +#include "matador/net/socket_interrupter.hpp" + +#include "matador/utils/buffer_view.hpp" + +#include "matador/logger/log_manager.hpp" + +#ifndef _WIN32 +#include +#endif + +namespace matador { + +socket_interrupter::socket_interrupter() + : client_(tcp::v4()) + , log_(create_logger("SocketInterrupter")) +{ + /* + * setup acceptor + * - create socket + * - set reuse address option + * - bind to localhost:0 (to get random port) + * - get address + * - listen + */ + tcp::acceptor acceptor; + acceptor.reuse_address(true); + tcp::peer local(address::v4::loopback()); + acceptor.bind(local); + acceptor.listen(SOMAXCONN); + + log_.debug("listening for interruptions at %d", acceptor.id()); + /* + * setup connection + * - connect to server + * - accept client + * - prepare server + */ + client_.connect(tcp::peer(address::v4::loopback(), local.port())); + acceptor.accept(server_); + client_.non_blocking(true); + client_.options(TCP_NODELAY, true); +} + +socket_interrupter::~socket_interrupter() +{ + client_.close(); + server_.close(); +} + +socket_type socket_interrupter::socket_id() const +{ + return server_.id(); +} + +void socket_interrupter::interrupt() +{ + buffer_view buf(indicator_); + log_.debug("fd %d: sending interrupt to fd %d", client_.id(), server_.id()); + client_.send(buf); +} + +bool socket_interrupter::reset() +{ + buffer_view buf(consumer_); + log_.debug("reading interrupt byte"); + auto nread = server_.receive(buf); + bool interrupted = nread > 0; + while (nread == static_cast(buf.capacity())) { + nread = server_.receive(buf); + } + return interrupted; +} + +} diff --git a/source/core/net/stream_handler.cpp b/source/core/net/stream_handler.cpp new file mode 100644 index 0000000..c9d65cf --- /dev/null +++ b/source/core/net/stream_handler.cpp @@ -0,0 +1,153 @@ +#include "matador/net/stream_handler.hpp" +#include "matador/net/handler_creator.hpp" +#include "matador/net/reactor.hpp" + +#include "matador/utils/buffer_view.hpp" + +#include "matador/logger/log_manager.hpp" + +#include +#include +#include + +namespace matador { + +stream_handler::stream_handler(tcp::socket sock, tcp::peer endpoint, handler_creator *creator, t_init_handler init_handler) + : log_(create_logger("StreamHandler")) + , stream_(std::move(sock)) + , endpoint_(std::move(endpoint)) + , name_(endpoint.to_string() + " (fd: " + std::to_string(stream_.id()) + ")") + , creator_(creator) + , init_handler_(std::move(init_handler)) +{ + + log_.debug("%s: created stream handler", name_.c_str()); +} + +void stream_handler::open() +{ + init_handler_(endpoint_, *this); +} + +socket_type stream_handler::handle() const +{ + return stream_.id(); +} + +void stream_handler::on_input() +{ + auto len = stream_.receive(read_buffer_); + log_.trace("%s: read %d bytes", name().c_str(), len); + if (len == 0) { + on_close(); + } else if (len < 0 && errno != EWOULDBLOCK) { + char error_buffer[1024]; + log_.error("%s: error on read: %s", name().c_str(), os::strerror(errno, error_buffer, 1024)); + is_ready_to_read_ = false; + on_read_(static_cast(len), static_cast(len)); + on_close(); + } else { + log_.debug("%s: received %d bytes (data: %s)", name().c_str(), len, read_buffer_.data()); + read_buffer_.bump(len); + is_ready_to_read_ = false; + on_read_(0, static_cast(len)); + } +} + +void stream_handler::on_output() +{ + ssize_t bytes_total = 0; + auto start = std::chrono::high_resolution_clock::now(); + while (!write_buffers_.empty()) { + buffer_view &bv = write_buffers_.front(); + + auto len = stream_.send(bv); + log_.trace("%s: sent %d bytes", name().c_str(), len); + + if (len == 0) { + on_close(); + } else if (len < 0 && errno != EWOULDBLOCK) { + char error_buffer[1024]; + log_.error("%s: error on write: %s", name().c_str(), os::strerror(errno, error_buffer, 1024)); + on_close(); + is_ready_to_write_ = false; + on_write_(static_cast(len), static_cast(len)); + } else if (len < 0 && errno == EWOULDBLOCK) { + log_.debug("%s: sent %d bytes (blocked)", name().c_str(), bytes_total); + } else { + bytes_total += len; + bv.bump(len); + if (bv.full()) { + write_buffers_.pop_front(); + } + } + } + auto end = std::chrono::high_resolution_clock::now(); + auto elapsed = std::chrono::duration_cast(end - start); + log_.debug("%s: sent %d bytes (%dms)", name().c_str(), bytes_total, elapsed); + is_ready_to_write_ = false; + on_write_(0, static_cast(bytes_total)); +} + +void stream_handler::on_close() +{ + log_.debug("%s: closing connection", name().c_str(), handle()); + stream_.close(); + creator_->notify_close( this ); + auto self = shared_from_this(); + get_reactor()->mark_handler_for_delete(self); + get_reactor()->unregister_handler(self, event_type::READ_WRITE_MASK); +} + +void stream_handler::close() +{ + if (!stream_.is_open()) { + return; + } + log_.debug("%s: closing connection", name().c_str(), handle()); + stream_.close(); + creator_->notify_close( this ); +} + +bool stream_handler::is_ready_write() const +{ + return is_ready_to_write_ && !write_buffers_.empty(); +} + +bool stream_handler::is_ready_read() const +{ + return is_ready_to_read_ && !read_buffer_.full(); +} + +void stream_handler::read(buffer_view buf, t_read_handler read_handler) +{ + on_read_ = std::move(read_handler); + read_buffer_ = std::move(buf); + is_ready_to_read_ = true; + get_reactor()->interrupt(); +} + +void stream_handler::write(std::list buffers, io_stream::t_write_handler write_handler) +{ + on_write_ = std::move(write_handler); + write_buffers_ = std::move(buffers); + is_ready_to_write_ = true; + get_reactor()->interrupt(); +} + +void stream_handler::close_stream() +{ + on_close(); +} + +tcp::socket &stream_handler::stream() +{ + return stream_; +} + +std::string stream_handler::name() const +{ + return name_; +} + +}