added net module

This commit is contained in:
Sascha Kühl 2025-06-27 15:19:25 +02:00
parent acd30afe6f
commit cd61ac7f00
39 changed files with 4599 additions and 12 deletions

View File

@ -0,0 +1,174 @@
#ifndef MATADOR_ACCEPTOR_HPP
#define MATADOR_ACCEPTOR_HPP
#include "matador/net/export.hpp"
#include "matador/net/handler.hpp"
#include "matador/net/handler_creator.hpp"
#include "matador/net/ip.hpp"
#include "matador/logger/logger.hpp"
#include <functional>
namespace matador {

/**
 * The acceptor class is used to accept new connections
 * within the reactor dispatcher.
 *
 * Once a new connection was accepted by the acceptor a
 * new handler is created and registered within the reactor
 * to handle the established connection.
 */
class OOS_NET_API acceptor : public handler, public handler_creator
{
public:
typedef std::function<std::shared_ptr<handler>(tcp::socket sock, tcp::peer endpoint, acceptor *accptr)> t_accept_handler; /**< Shortcut to a function creating a handler once a new connection was successfully accepted */

/**
 * Default constructor
 */
acceptor();

/**
 * Creates an acceptor with the given endpoint. The endpoint
 * represents the address on which the acceptor listens for new
 * connections.
 *
 * @param endpoint Endpoint to listen on for new connections
 */
explicit acceptor(tcp::peer endpoint);

/**
 * Creates an acceptor with the given endpoint. The endpoint
 * represents the address on which the acceptor listens for new
 * connections. The given function is called when a new
 * connection was accepted and returns a new handler for
 * the new connection.
 *
 * @param endpoint Endpoint to listen on for new connections
 * @param on_new_connection Function creating a new handler for each accepted connection
 */
acceptor(tcp::peer endpoint, t_accept_handler on_new_connection);

/**
 * Destructor
 */
~acceptor() override;

/**
 * When a new connection is accepted the given function
 * is called to create a new handler for the connection.
 *
 * NOTE(review): "accecpt" is a misspelling of "accept"; it is kept
 * because callers (e.g. io_service) already use this name.
 *
 * @param on_new_connection Function creating a new handler for the new connection
 */
void accecpt(t_accept_handler on_new_connection);

/**
 * Accepts new connections at the given endpoint.
 * When a new connection is accepted the given function
 * is called to create a new handler for the connection.
 *
 * NOTE(review): misspelled name kept for API compatibility (see above).
 *
 * @param endpoint Endpoint to listen on for new connections
 * @param on_new_connection Function creating a new handler for the new connection
 */
void accecpt(const tcp::peer& endpoint, t_accept_handler on_new_connection);

/**
 * Opens the acceptor: the socket address of the
 * endpoint is bound to the created listening socket.
 * Then the socket is used to listen for new
 * connections.
 */
void open() override;

/**
 * Returns the current listening socket fd.
 *
 * @return Listening socket fd
 */
socket_type handle() const override;

/**
 * Called when a new connection wants to
 * connect to the endpoint. Once the connection was accepted
 * a new connection handler is created and the socket is
 * passed to the handler. The handler is then registered
 * with the reactor to dispatch its read and write events.
 */
void on_input() override;

/// Does nothing (acceptor never writes)
void on_output() override {}
/// Does nothing
void on_except() override {}
/// Does nothing (no timeout handling)
void on_timeout() override {}
/// Does nothing
void on_close() override {}

/**
 * Closes the listen fd of the acceptor.
 */
void close() override;

/**
 * Returns always false because new connections
 * are indicated as read events.
 *
 * @return Always false
 */
bool is_ready_write() const override;

/**
 * Returns true if the acceptor was opened
 * and a listen fd was created.
 *
 * @return True if a listen socket was created
 */
bool is_ready_read() const override;

/**
 * Notifies the acceptor that
 * the given handler was closed.
 *
 * @param hndlr Closed handler.
 */
void notify_close(handler *hndlr) override;

/// Returns the informational name of this handler ("acceptor")
std::string name() const override;
public:
/**
 * Returns the current endpoint accepting new connections.
 *
 * @return Current listening endpoint
 */
const tcp::peer& endpoint() const;
private:
tcp::acceptor acceptor_;          // listening socket wrapper
tcp::peer endpoint_;              // endpoint the acceptor is bound to
std::string name_ { "acceptor" }; // informational handler name
t_accept_handler accept_handler_; // factory creating a handler per accepted connection
logger log_;
// builds the peer object describing the remote side of an accepted connection
tcp::peer create_client_endpoint() const;
};

}
#endif //MATADOR_ACCEPTOR_HPP

View File

@ -0,0 +1,453 @@
#ifndef MATADOR_ADDRESS_HPP
#define MATADOR_ADDRESS_HPP
#include "matador/utils/os.hpp"
#include "matador/net/export.hpp"
#include "matador/net/os.hpp"
#include "matador/net/error.hpp"
#include <string>
#include <cstring>
#include <stdexcept>
#ifdef _WIN32
#include <ws2tcpip.h>
#else
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#endif
namespace matador {
/**
 * Enum representing the protocol
 * family: IPv4 and IPv6.
 */
enum protocol_family {
V4, /**< IPv4 enum value */
V6 /**< IPv6 enum value */
};
/// @cond MATADOR_DEV
// Specialized below for V4 and V6; provides the static factory
// functions (any/loopback/from_ip/from_hostname) per family.
template < protocol_family PF >
class address_router;
/// @endcond
/**
* The address class represents a IPv4 or
* IPv6 address.
*/
class OOS_NET_API address
{
public:
/**
 * Default constructor; creates an empty address
 * (size() == 0).
 */
address() = default;

/**
 * Constructs an address from the given
 * addr representing an IPv4 socket address
 * structure.
 *
 * @param addr Initial IPv4 socket address
 */
explicit address(const sockaddr_in &addr);

/**
 * Constructs an address from the given
 * addr representing an IPv6 socket address
 * structure.
 *
 * @param addr Initial IPv6 socket address
 */
explicit address(const sockaddr_in6 &addr);

/**
 * Copy constructs an address from the
 * given address x.
 *
 * @param x Address to copy from
 */
address(const address &x) = default;

/**
 * Copy assigns an address from the
 * given address x.
 *
 * @param x Address to assign from
 * @return The assigned address
 */
address& operator=(const address &x);

/**
 * Move constructs an address from the
 * given address x.
 *
 * @param x Address to move from
 */
address(address &&x) noexcept;

/**
 * Move assigns an address from the
 * given address x.
 *
 * @param x Address to move assign from
 * @return The moved address
 */
address& operator=(address &&x) noexcept;

/**
 * Destroys the address.
 */
~address();

/**
 * Returns the address as an unsigned integer value.
 *
 * @return The address as an unsigned integer value
 */
unsigned int to_ulong() const;

/**
 * Returns the address in string format: either
 * IPv4 dotted format or IPv6 colon based
 * format.
 *
 * @return The address as string
 */
std::string to_string() const;

/**
 * Sets the port number.
 *
 * @param pn Port number to set
 */
void port(unsigned short pn);

/**
 * Returns the current port number of the
 * address.
 *
 * @return The current port number
 */
unsigned short port() const;

/**
 * Returns true if the address is IPv4.
 *
 * @return True if IPv4 address
 */
bool is_v4() const;

/**
 * Returns true if the address is IPv6.
 *
 * @return True if IPv6 address
 */
bool is_v6() const;

/**
 * Returns the raw sockaddr structure.
 *
 * @return The raw sockaddr structure
 */
sockaddr* addr();

/**
 * Returns the raw sockaddr structure.
 *
 * @return The raw sockaddr structure
 */
const sockaddr* addr() const;

/**
 * Returns the IPv4 sockaddr_in structure.
 *
 * @return The IPv4 sockaddr_in structure
 */
sockaddr_in* addr_v4();

/**
 * Returns the IPv4 sockaddr_in structure.
 *
 * @return The IPv4 sockaddr_in structure
 */
const sockaddr_in* addr_v4() const;

/**
 * Returns the IPv6 sockaddr_in6 structure.
 *
 * @return The IPv6 sockaddr_in6 structure
 */
sockaddr_in6* addr_v6();

/**
 * Returns the IPv6 sockaddr_in6 structure.
 *
 * @return The IPv6 sockaddr_in6 structure
 */
const sockaddr_in6* addr_v6() const;

/**
 * Returns the size of the underlying
 * address structure (sockaddr_in or sockaddr_in6).
 *
 * @return Size of the underlying address structure
 */
socklen_t size() const;

typedef address_router<V4> v4; /**< Shortcut to the internal IPv4 address router */
typedef address_router<V6> v6; /**< Shortcut to the internal IPv6 address router */
private:
void clear();
private:
template < protocol_family PF >
friend class address_router;
// Storage for either family; the active member is determined by
// the sa_family field set at construction.
union socket_address {
sockaddr sa_raw;
sockaddr_in sa_in;
sockaddr_in6 sa_in6;
};
socket_address socket_address_ {};
socklen_t size_ = 0; // byte size of the active union member, 0 if empty
};
/// @cond MATADOR_DEV
template <>
class address_router<V4>
{
public:
  address_router() = delete;
  address_router& operator=(const address_router&) = delete;
  address_router(const address_router&) = delete;
  address_router& operator=(address_router&&) = delete;
  address_router(address_router&&) = delete;

  /// Returns an empty (wildcard) IPv4 address
  static address empty() { return mk_address(INADDR_ANY); }
  /// Returns the IPv4 wildcard address (0.0.0.0)
  static address any() { return mk_address(INADDR_ANY); }
  /// Returns the IPv4 loopback address (127.0.0.1)
  static address loopback() { return mk_address(INADDR_LOOPBACK); }
  /// Returns the IPv4 limited broadcast address (255.255.255.255)
  static address broadcast() { return mk_address(INADDR_BROADCAST); }

  /// Parses a dotted-decimal IPv4 string into an address
  static address from_ip(const std::string &str) { return from_ip(str.c_str()); }

  /**
   * Parses a dotted-decimal IPv4 string into an address.
   * A nullptr yields a default constructed address; an invalid
   * string raises a logic error.
   *
   * @param str Dotted-decimal IPv4 string
   * @return The parsed address
   */
  static address from_ip(const char *str)
  {
    if (str == nullptr) {
      return address();
    }
    sockaddr_in addr{};
    int ret = os::inet_pton(PF_INET, str, &(addr.sin_addr));
    if (ret == 1) {
      addr.sin_family = PF_INET;
      return address(addr);
    } else if (ret == 0) {
      // inet_pton returns 0 when the string is not a valid dotted-decimal address
      detail::throw_logic_error("invalid ip address");
    } else {
      detail::throw_logic_error_with_errno("invalid ip address: %s", errno);
    }
    return address();
  }

  /// Resolves a hostname (or numeric IPv4 string) into an address
  static address from_hostname(const std::string &str) { return from_hostname(str.c_str()); }

  /**
   * Resolves a hostname (or numeric IPv4 string) into an address.
   * A nullptr yields a default constructed address; resolution
   * failures raise a logic error. If the host resolves to several
   * addresses the first IPv4 result is used.
   *
   * @param str Hostname or numeric IPv4 string
   * @return The resolved address
   */
  static address from_hostname(const char *str)
  {
    if (str == nullptr) {
      return address();
    }
    sockaddr_in addr{};
    int ret;
    if ((ret = os::inet_pton(AF_INET, str, &addr.sin_addr)) == 1) {
      addr.sin_family = PF_INET;
    } else if (ret == -1) {
      detail::throw_logic_error_with_errno("invalid address: %s", errno);
    } else { /* 0 == not numeric, try a name lookup */
      struct addrinfo hints{};        // value-init zeroes all fields
      struct addrinfo *result = nullptr;
      hints.ai_family = AF_INET;      /* IPv4 only */
      hints.ai_socktype = SOCK_STREAM;/* Stream socket */
      hints.ai_flags = AI_PASSIVE;    /* Numeric or network hostname */
      hints.ai_protocol = 0;          /* Any protocol */
      int s = getaddrinfo(str, nullptr, &hints, &result);
      if (s != 0) {
        detail::throw_logic_error_with_gai_errno("invalid ip address (getaddrinfo): %s", s);
      }
      /* getaddrinfo() returns a list of address structures;
         take the first one. */
      // BUG FIX: copy the full address length reported by getaddrinfo.
      // The previous code used sizeof(&result->ai_addr) — the size of a
      // pointer — which truncated the copied socket address.
      memcpy(&addr, result->ai_addr, result->ai_addrlen);
      freeaddrinfo(result); /* No longer needed */
    }
    addr.sin_family = PF_INET;
    memset(addr.sin_zero, '\0', sizeof(addr.sin_zero));
    return address(addr);
  }

private:
  /// Builds an address from a host-byte-order IPv4 value
  static address mk_address(unsigned int inaddr)
  {
    sockaddr_in addr{};
    addr.sin_family = PF_INET;
    addr.sin_addr.s_addr = htonl(inaddr);
    return address(addr);
  }
};
template <>
class address_router<V6>
{
public:
  address_router() = delete;
  address_router& operator=(const address_router&) = delete;
  address_router(const address_router&) = delete;
  address_router& operator=(address_router&&) = delete;
  address_router(address_router&&) = delete;

  /// Returns an empty (wildcard) IPv6 address
  static address empty() { return mk_address(in6addr_any); }
  /// Returns the IPv6 wildcard address (::)
  static address any() { return mk_address(in6addr_any); }
  /// Returns the IPv6 loopback address (::1)
  static address loopback() { return mk_address(in6addr_loopback); }
  /// Returns the IPv6 all-nodes multicast address (IPv6 has no broadcast)
  static address broadcast() { return mk_multicast_address(); }

  /// Parses a colon-separated IPv6 string into an address
  static address from_ip(const std::string &str)
  {
    return from_ip(str.c_str());
  }

  /**
   * Parses a colon-separated IPv6 string into an address.
   * A nullptr yields a default constructed address; an invalid
   * string raises a logic error.
   *
   * @param str IPv6 string
   * @return The parsed address
   */
  static address from_ip(const char *str)
  {
    if (str == nullptr) {
      return address();
    }
    sockaddr_in6 addr{};
    int ret = os::inet_pton(PF_INET6, str, &(addr.sin6_addr));
    if (ret == 1) {
      addr.sin6_family = PF_INET6;
      return address(addr);
    } else if (ret == 0) {
      // inet_pton returns 0 for a syntactically invalid address
      char message_buffer[1024];
      os::sprintf(message_buffer, 1024, "INET_PTON (ip): invalid ip address [%s]", str);
      detail::throw_logic_error(message_buffer);
    } else {
      char message_buffer[1024];
      int err = errno;
      os::sprintf(message_buffer, 1024, "INET_PTON (ip): invalid ip address [%s]: %%s", str);
      detail::throw_logic_error_with_errno(message_buffer, err);
    }
    return address();
  }

  /// Resolves a hostname (or numeric IPv6 string) into an address
  static address from_hostname(const std::string &str)
  {
    return from_hostname(str.c_str());
  }

  /**
   * Resolves a hostname (or numeric IPv6 string) into an address.
   * A nullptr yields a default constructed address; resolution
   * failures raise a logic error. If the host resolves to several
   * addresses the first IPv6 result is used.
   *
   * @param str Hostname or numeric IPv6 string
   * @return The resolved address
   */
  static address from_hostname(const char *str)
  {
    if (str == nullptr) {
      return address();
    }
    sockaddr_in6 addr{};
    int ret = os::inet_pton(PF_INET6, str, &addr.sin6_addr);
    if (ret == 1) {
      addr.sin6_family = PF_INET6;
    } else if (ret == -1) {
      char message_buffer[1024];
      int err = errno;
      os::sprintf(message_buffer, 1024, "INET_PTON (host): invalid ip address [%s] (errno: %d): %%s", str, err);
      detail::throw_logic_error_with_errno(message_buffer, err);
    } else { /* 0 == not numeric, try a name lookup */
      struct addrinfo hints{};        // value-init zeroes all fields
      struct addrinfo *result = nullptr;
      hints.ai_family = AF_INET6;     /* IPv6 only */
      hints.ai_socktype = SOCK_STREAM;/* Stream socket */
      hints.ai_flags = AI_PASSIVE;    /* Numeric or network hostname */
      hints.ai_protocol = 0;          /* Any protocol */
      int s = getaddrinfo(str, nullptr, &hints, &result);
      if (s != 0) {
        char message_buffer[1024];
        os::sprintf(message_buffer, 1024, "GETADDRINFO (host): invalid ip address [%s] (errno: %d): %%s", str, s);
        // BUG FIX: s is a getaddrinfo error code, not an errno value; use the
        // gai variant (as the V4 router does) so the message is resolved with
        // gai_strerror instead of strerror.
        detail::throw_logic_error_with_gai_errno(message_buffer, s);
      }
      /* getaddrinfo() returns a list of address structures;
         take the first one. */
      memcpy(&addr, result->ai_addr, result->ai_addrlen);
      freeaddrinfo(result); /* No longer needed */
    }
    addr.sin6_family = PF_INET6;
    return address(addr);
  }

private:
  static OOS_NET_API const char *IP6ADDR_MULTICAST_ALLNODES;

  /// Builds an address from a raw in6_addr value
  static address mk_address(in6_addr in6addr)
  {
    sockaddr_in6 addr{};
    addr.sin6_family = PF_INET6;
    addr.sin6_addr = in6addr;
    return address(addr);
  }

  /// Builds the all-nodes multicast address used as broadcast substitute
  static address mk_multicast_address()
  {
    sockaddr_in6 addr{};
    addr.sin6_family = PF_INET6;
    os::inet_pton(AF_INET6, IP6ADDR_MULTICAST_ALLNODES, &addr.sin6_addr);
    return address(addr);
  }
};
/// @endcond
}
#endif //MATADOR_ADDRESS_HPP

View File

@ -0,0 +1,115 @@
#ifndef MATADOR_ADDRESS_RESOLVER_HPP
#define MATADOR_ADDRESS_RESOLVER_HPP
#include "matador/net/export.hpp"
#include "matador/net/peer.hpp"
#include "matador/net/error.hpp"
#include <vector>
namespace matador {
/// @cond MATADOR_DEV
class tcp;
class udp;

namespace detail {

/// Maps a protocol type (tcp/udp) to the matching SOCK_* value.
template < class P >
int determine_socktype();

template <>
OOS_NET_API int determine_socktype<tcp>();
template <>
OOS_NET_API int determine_socktype<udp>();

/**
 * Prepares an addrinfo hints structure for a getaddrinfo()
 * lookup: any address family (AF_UNSPEC), the socket type
 * matching protocol P, any protocol and the given flags.
 */
template < class P >
void initialize_hints(struct addrinfo &hints, int flags) {
  hints = addrinfo{};
  hints.ai_flags = flags;
  hints.ai_family = AF_UNSPEC;
  hints.ai_protocol = 0;
  hints.ai_socktype = determine_socktype<P>();
}
/// @endcond
}
/**
* The address resolver resolves a given host and port
* to a peer object representing the given address
*
* @tparam P Type of protocol
*/
template < class P >
class address_resolver
{
public:
typedef typename P::peer peer; /**< Shortcut to the protocol's peer type */

/**
 * Default constructor
 */
address_resolver() = default;

/**
 * Resolves the given host and port to a list
 * of valid peers representing the ip addresses
 * of the host, either in IPv4 or IPv6 format.
 * Throws a logic error if resolution fails.
 *
 * @param hostname Hostname to resolve
 * @param port Port (service name or number) to resolve
 * @return A list of peers representing the host and port
 */
std::vector<peer> resolve(const std::string &hostname, const std::string &port);

/**
 * Resolves the given host and port to a list
 * of valid peers representing the ip addresses
 * of the host, either in IPv4 or IPv6 format.
 * Throws a logic error if resolution fails.
 *
 * @param hostname Hostname to resolve
 * @param port Port (service name or number) to resolve
 * @return A list of peers representing the host and port
 */
std::vector<peer> resolve(const char *hostname, const char *port);
};
/// @cond MATADOR_DEV
// Convenience overload: forwards std::string arguments to the
// C-string implementation below.
template<class P>
std::vector<typename address_resolver<P>::peer> address_resolver<P>::resolve(const std::string &hostname, const std::string &port)
{
return resolve(hostname.c_str(), port.c_str());
}
// Resolves hostname/port via getaddrinfo and converts every IPv4 or
// IPv6 result into a peer; other address families are ignored.
// Throws a logic error when getaddrinfo fails.
template < class P >
std::vector<typename address_resolver<P>::peer> address_resolver<P>::resolve(const char *hostname, const char *port)
{
  struct addrinfo hints = {};
  detail::initialize_hints<P>(hints, AI_PASSIVE);

  struct addrinfo *results = nullptr;
  const int err = getaddrinfo(hostname, port, &hints, &results);
  if (err != 0) {
    detail::throw_logic_error_with_gai_errno("error on getaddrinfo: %s", err);
  }

  std::vector<peer> peers;
  for (struct addrinfo *entry = results; entry != nullptr; entry = entry->ai_next) {
    if (entry->ai_family == PF_INET) {
      peers.emplace_back(address(*reinterpret_cast<struct sockaddr_in*>(entry->ai_addr)));
    } else if (entry->ai_family == PF_INET6) {
      peers.emplace_back(address(*reinterpret_cast<struct sockaddr_in6*>(entry->ai_addr)));
    } // other families are not supported and skipped
  }
  freeaddrinfo(results); /* list no longer needed */
  return peers;
}
/// @endcond
}
#endif //MATADOR_ADDRESS_RESOLVER_HPP

View File

@ -0,0 +1,146 @@
#ifndef MATADOR_CONNECTOR_HPP
#define MATADOR_CONNECTOR_HPP
#include "matador/net/export.hpp"
#include "matador/net/handler.hpp"
#include "matador/net/handler_creator.hpp"
#include "matador/net/ip.hpp"
#include "matador/logger/logger.hpp"
#include <functional>
namespace matador {

/**
 * Connector which initiates a reactor based connection
 * to a remote network host identified by a list
 * of endpoints.
 *
 * Once a connection is established a handler is created
 * with the given create handler function. The socket
 * is passed to the created handler.
 */
class OOS_NET_API connector : public handler, public handler_creator
{
public:
typedef std::function<std::shared_ptr<handler>(tcp::socket sock, tcp::peer endpoint, connector *cnnctr)> t_connect_handler; /**< Shortcut to a function creating a handler once a connection to a host was established */

/**
 * Default constructor
 */
connector();

/**
 * Creates a new connector with the given
 * create handler function.
 *
 * @param on_new_connection Function which creates a handler on new connection
 */
explicit connector(t_connect_handler on_new_connection);

/**
 * Initiates a connection to one of the given endpoints within the
 * given reactor. Once a connection is established a new handler
 * for this connection is created. The new connection is dispatched
 * by the reactor.
 *
 * @param r Reactor to handle the connector and the created connection
 * @param endpoints List of endpoints to connect to
 */
void connect(reactor &r, const std::vector<tcp::peer> &endpoints);

/**
 * Initiates a connection to one of the given endpoints within the
 * given reactor. Once a connection is established a new handler
 * for this connection is created with the given function. The new
 * connection is dispatched by the reactor.
 *
 * @param r Reactor to handle the connector and the created connection
 * @param endpoints List of endpoints to connect to
 * @param on_new_connection Function creating a new handler on new connection
 */
void connect(reactor &r, const std::vector<tcp::peer> &endpoints, t_connect_handler on_new_connection);

/**
 * Opens the connector. Actually this does
 * nothing at all.
 */
void open() override {}

/**
 * Returns the handle of the connector. The result will
 * always be zero, because the connector doesn't have a socket.
 *
 * @return Always zero
 */
socket_type handle() const override;

/// Does nothing
void on_input() override {}
/// Does nothing
void on_output() override {}
/// Does nothing
void on_except() override {}

/**
 * The timeout call is used to establish the connection.
 * Once the connection could be established within this call
 * the timeout is canceled, otherwise it will be tried again
 * after three seconds.
 */
void on_timeout() override;

/// Does nothing
void on_close() override {}
/// Does nothing
void close() override {}

/**
 * Always false because the connection is established via timeout.
 *
 * @return Always false
 */
bool is_ready_write() const override;

/**
 * Always false because the connection is established via timeout.
 *
 * @return Always false
 */
bool is_ready_read() const override;

/**
 * Notifies the connector that
 * the given handler was closed.
 *
 * @param hndlr Closed handler.
 */
void notify_close(handler *hndlr) override;

/// Returns the informational name of this handler
std::string name() const override;
private:
t_connect_handler connect_handler_;  // factory creating a handler per established connection
logger log_;
std::vector<tcp::peer> endpoints_;   // candidate endpoints tried on timeout
};

}
#endif //MATADOR_CONNECTOR_HPP

View File

@ -0,0 +1,127 @@
#ifndef MATADOR_EVENT_TYPE_HPP
#define MATADOR_EVENT_TYPE_HPP
namespace matador::net {

/**
 * Enum representing an event processing mask
 * for the reactor io dispatcher. The mask values
 * can be bitwise combined and are used to
 * check if a handler (@see handler.hpp) is able
 * to read, write or accept.
 */
enum class event_type : unsigned {
  NONE_MASK = 0,                            /**< Enum value for no event */
  READ_MASK = 1 << 0,                       /**< Enum value for read mask */
  WRITE_MASK = 1 << 1,                      /**< Enum value for write mask */
  EXCEPT_MASK = 1 << 2,                     /**< Enum value for except mask */
  TIMEOUT_MASK = 1 << 3,                    /**< Enum value for timeout mask */
  ACCEPT_MASK = READ_MASK,                  /**< Enum value for accept mask (alias of READ_MASK) */
  READ_WRITE_MASK = READ_MASK | WRITE_MASK, /**< Enum value for read write mask */
  // NOTE(review): TIMEOUT_MASK is deliberately(?) not part of ALL_MASK — confirm.
  ALL_MASK = READ_MASK | WRITE_MASK | EXCEPT_MASK | ACCEPT_MASK /**< Enum value for all io events mask */
};

/**
 * Combines two event types with bitwise or.
 *
 * @param a Left event_type
 * @param b Right event_type
 * @return The combined result
 */
inline event_type operator|(event_type a, event_type b)
{
  return static_cast<event_type>(static_cast<unsigned>(a) | static_cast<unsigned>(b));
}

/**
 * Combines two event types with bitwise and.
 *
 * @param a Left event_type
 * @param b Right event_type
 * @return The combined result
 */
inline event_type operator&(event_type a, event_type b)
{
  return static_cast<event_type>(static_cast<unsigned>(a) & static_cast<unsigned>(b));
}

/**
 * Bitwise complement of an event type.
 * Works on the unsigned underlying type; the previous C-style
 * cast through (signed) int produced a negative intermediate.
 *
 * @param a Event type to complement
 * @return The complemented result
 */
inline event_type operator~ (event_type a)
{
  return static_cast<event_type>(~static_cast<unsigned>(a));
}

/**
 * Combines two event types with bitwise xor.
 *
 * @param a Left event_type
 * @param b Right event_type
 * @return The combined result
 */
inline event_type operator^ (event_type a, event_type b)
{
  return static_cast<event_type>(static_cast<unsigned>(a) ^ static_cast<unsigned>(b));
}

/**
 * Or-assigns b into a.
 * Implemented via assignment instead of punning the enum reference
 * to int& (the previous (int&) cast was undefined behavior).
 *
 * @param a Left event_type (modified in place)
 * @param b Right event_type
 * @return Reference to the updated a
 */
inline event_type& operator|= (event_type& a, event_type b)
{
  a = a | b;
  return a;
}

/**
 * And-assigns b into a.
 *
 * @param a Left event_type (modified in place)
 * @param b Right event_type
 * @return Reference to the updated a
 */
inline event_type& operator&= (event_type& a, event_type b)
{
  a = a & b;
  return a;
}

/**
 * Xor-assigns b into a.
 *
 * @param a Left event_type (modified in place)
 * @param b Right event_type
 * @return Reference to the updated a
 */
inline event_type& operator^= (event_type& a, event_type b)
{
  a = a ^ b;
  return a;
}

/**
 * Checks if a specific event_type is set in
 * a given event type mask. If the type is set
 * true is returned.
 *
 * @param source Event type mask to check.
 * @param needle Requested event type
 * @return True if the event type is set
 */
inline bool is_event_type_set(event_type source, event_type needle)
{
  return static_cast<unsigned>(source & needle) != 0u;
}

}
#endif //MATADOR_EVENT_TYPE_HPP

View File

@ -0,0 +1,111 @@
#ifndef MATADOR_FDSET_HPP
#define MATADOR_FDSET_HPP
#include "matador/net/export.hpp"
#include "matador/net/os.hpp"
#include <set>
#include <functional>
#include <cstddef>
#ifdef _WIN32
#include <winsock2.h>
#else
#include <sys/select.h>
#endif
namespace matador {

/**
 * This class represents an fd set
 * used by the reactor class. There it
 * is used in combination with a call
 * to select.
 */
class OOS_NET_API fdset
{
public:
fdset(const fdset&) = delete;
fdset& operator=(const fdset&) = delete;

/**
 * Default constructor creates
 * an empty fd set.
 */
fdset();

/**
 * Destroys the fd set.
 */
~fdset() = default;

/// Move constructor
fdset(fdset&& x) noexcept;
/// Move assignment
fdset& operator=(fdset&& x) noexcept;

/**
 * Resets all bits to zero.
 */
void reset();

/**
 * Checks if the given fd is set
 * in the fd set.
 *
 * @param fd Requested fd
 * @return True if fd is set
 */
bool is_set(socket_type fd) const;

/**
 * Clears the given fd from the set.
 *
 * @param fd fd to clear
 */
void clear(socket_type fd);

/**
 * Sets the given fd in the fd set.
 *
 * @param fd fd to set
 */
void set(socket_type fd);

/**
 * Returns the highest fd plus one.
 * This is needed for the call to select.
 *
 * @return Highest fd plus one
 */
socket_type maxp1() const;

/**
 * Returns the current number of fds in the set.
 *
 * @return Number of fds in the set
 */
size_t count() const;

/**
 * Checks if the set is empty.
 *
 * @return True if the fd set is empty
 */
bool empty() const;

/**
 * Returns a pointer to the underlying fd_set structure.
 *
 * @return Pointer to the underlying fd_set structure
 */
fd_set* get();
private:
// descending-ordered set so the maximum fd is the first element
typedef std::set<socket_type, std::greater<> > int_set_t;
int_set_t max_fd_set_;
fd_set fd_set_ = {};
};

}
#endif //MATADOR_FDSET_HPP

View File

@ -0,0 +1,138 @@
#ifndef MATADOR_HANDLER_HPP
#define MATADOR_HANDLER_HPP
#include "matador/net/os.hpp"
#include <memory>
#include <string>
namespace matador::net {

class reactor;

/**
 * Base class for all handlers used
 * with the reactor. A handler must implement this
 * interface to handle input, output, exceptional and
 * timeout events.
 *
 * NOTE(review): this header declares handler in namespace
 * matador::net while e.g. acceptor.hpp derives from handler
 * inside namespace matador — confirm the namespaces are consistent.
 */
class handler : public std::enable_shared_from_this<handler>
{
public:
/**
 * Virtual destructor
 */
virtual ~handler() = default;

/**
 * Interface to open a handler. This
 * is called when a handler is registered
 * within the reactor.
 */
virtual void open() = 0;

/**
 * Interface returning the socket handle
 * of the concrete handler implementation.
 *
 * @return The socket fd
 */
virtual socket_type handle() const = 0;

/**
 * Interface handling incoming data
 */
virtual void on_input() = 0;

/**
 * Interface handling outgoing data
 */
virtual void on_output() = 0;

/**
 * Interface handling exceptional data
 */
virtual void on_except() = 0;

/**
 * Interface handling timeout events
 */
virtual void on_timeout() = 0;

/**
 * Interface called when the handler is closed
 */
virtual void on_close() = 0;

/**
 * Interface implementation should close
 * the handle gracefully.
 */
virtual void close() = 0;

/**
 * Interface should return true if there
 * is outgoing data.
 *
 * @return True if there is outgoing data
 */
virtual bool is_ready_write() const = 0;

/**
 * Interface should return true if there
 * is incoming data.
 *
 * @return True if there is incoming data
 */
virtual bool is_ready_read() const = 0;

/**
 * Returns the next timeout scheduled
 * in the reactor.
 *
 * @return Next timeout
 */
time_t next_timeout() const;

/**
 * Returns the timeout interval in
 * seconds.
 *
 * @return Timeout interval
 */
time_t interval() const;

/**
 * Get the name of the handler.
 * The name doesn't need to be unique.
 * It has only an informational purpose.
 *
 * @return The name of the handler
 */
virtual std::string name() const = 0;
protected:
/**
 * Gets the underlying reactor.
 *
 * @return The underlying reactor
 */
reactor* get_reactor() const;
private:
friend class reactor;
// called by the reactor when the handler is registered
void register_reactor(reactor *r);
// schedules a recurring timeout: first fire after offset, then every interval seconds
void schedule(time_t offset, time_t interval);
// cancels a scheduled timeout
void cancel_timer();
// recomputes next_timeout_ based on the current time
void calculate_next_timeout(time_t now);
reactor *reactor_ = nullptr; // owning reactor, set on registration
time_t next_timeout_ = 0;    // absolute time of the next timeout (0 == none)
time_t interval_ = 0;        // recurring timeout interval in seconds (0 == one shot)
};

}
#endif //MATADOR_HANDLER_HPP

View File

@ -0,0 +1,23 @@
#ifndef MATADOR_HANDLER_CREATOR_HPP
#define MATADOR_HANDLER_CREATOR_HPP
#include "matador/net/export.hpp"
namespace matador {
/// @cond MATADOR_DEV
class handler;

/**
 * Interface for classes creating handler instances
 * (e.g. acceptor and connector). A created handler calls
 * notify_close() on its creator when it is closed.
 */
class OOS_NET_API handler_creator
{
public:
virtual ~handler_creator() = default;

/**
 * Called by a handler to inform its creator
 * that the handler was closed.
 *
 * @param hndlr The closed handler
 */
virtual void notify_close(handler *hndlr) = 0;
};
/// @endcond
}
#endif //MATADOR_HANDLER_CREATOR_HPP

View File

@ -0,0 +1,128 @@
#ifndef MATADOR_IO_SERVICE_HPP
#define MATADOR_IO_SERVICE_HPP
#include "matador/utils/buffer.hpp"
#include "matador/net/export.hpp"
#include "matador/net/reactor.hpp"
#include "matador/net/acceptor.hpp"
#include "matador/net/connector.hpp"
#include "matador/net/stream_handler.hpp"
namespace matador {

/**
 * IO Service is used to encapsulate an instance
 * of the reactor class.
 */
class OOS_NET_API io_service
{
public:
/**
 * Creates an io_service
 */
io_service();
~io_service();

/**
 * Starts the io_service with the underlying reactor.
 */
void run();

/**
 * Returns true if the io service is
 * running.
 *
 * @return True if service is running
 */
bool is_running() const;

/**
 * Shuts down a running service.
 */
void shutdown();

/**
 * Adds the given acceptor for the
 * given peer endpoint and callback.
 *
 * The callback is called when a new connection
 * was accepted.
 *
 * @tparam AcceptCallback Type of callback
 * @param ac Acceptor used to accept connections
 * @param ep Endpoint on which the acceptor will listen
 * @param accept_callback Callback when connection was accepted
 */
template < typename AcceptCallback >
void accept(const std::shared_ptr<acceptor>& ac, const tcp::peer &ep, AcceptCallback accept_callback);

/**
 * Adds the given acceptor for the
 * given callback. The acceptor's already configured
 * endpoint is used.
 *
 * The callback is called when a new connection
 * was accepted.
 *
 * @tparam AcceptCallback Type of callback
 * @param ac Acceptor used to accept connections
 * @param accept_callback Callback when connection was accepted
 */
template < typename AcceptCallback >
void accept(const std::shared_ptr<acceptor>& ac, AcceptCallback accept_callback);

/**
 * Adds the given connector for the given port
 * and connect callback. The host is always "localhost".
 *
 * Once a connection is established the callback
 * is called.
 *
 * @tparam ConnectCallback Type of the callback
 * @param co Connector used to establish the connection
 * @param port Port to connect to
 * @param connect_callback Callback when connection was established
 */
template < typename ConnectCallback >
void connect(const std::shared_ptr<connector>& co, const std::string &port, ConnectCallback connect_callback);
private:
logger log_;
reactor reactor_;
};

// Registers the acceptor at the given endpoint; every accepted
// connection is wrapped in a stream_handler driving accept_callback.
// (accecpt is the acceptor API's spelling.)
template<typename AcceptCallback>
void io_service::accept(const std::shared_ptr<acceptor>& ac, const tcp::peer &ep, AcceptCallback accept_callback)
{
log_.info("registering acceptor for %s", ep.to_string().c_str());
ac->accecpt(ep, [accept_callback, this](tcp::socket sock, tcp::peer p, acceptor *accptr) {
return std::make_shared<stream_handler>(sock, p, accptr, accept_callback);
});
reactor_.register_handler(ac, event_type::ACCEPT_MASK);
}

// Same as above but uses the endpoint already configured on the acceptor.
template<typename AcceptCallback>
void io_service::accept(const std::shared_ptr<acceptor>& ac, AcceptCallback accept_callback)
{
log_.info("registering acceptor for %s", ac->endpoint().to_string().c_str());
ac->accecpt([accept_callback](tcp::socket sock, tcp::peer p, acceptor *accptr) {
return std::make_shared<stream_handler>(sock, p, accptr, accept_callback);
});
reactor_.register_handler(ac, event_type::ACCEPT_MASK);
}

// Resolves localhost:port and lets the connector try the resulting
// endpoints; an established connection is wrapped in a stream_handler.
template<typename ConnectCallback>
void io_service::connect(const std::shared_ptr<connector> &co, const std::string &port, ConnectCallback connect_callback)
{
log_.info("registering connector for localhost:%s", port.c_str());
tcp::resolver resolver;
auto endpoints = resolver.resolve("localhost", port);
co->connect(reactor_, endpoints, [connect_callback](const tcp::socket& sock, const tcp::peer &p, connector *cnnctr) {
return std::make_shared<stream_handler>(sock, p, cnnctr, connect_callback);
});
}

}
#endif //MATADOR_IO_SERVICE_HPP

View File

@ -0,0 +1,67 @@
#ifndef MATADOR_IO_STREAM_HPP
#define MATADOR_IO_STREAM_HPP
#include <functional>
#include <list>
#include "matador/net/export.hpp"
#include "matador/net/ip.hpp"
namespace matador {

class buffer;
class buffer_view;

/**
 * The io stream class is proposed
 * to be used with the io_service class
 * and provides therefore an interface
 * which is used by the io_service.
 */
class OOS_NET_API io_stream
{
public:
typedef std::function<void(int ec, long nread)> t_read_handler; /**< Callback invoked with error code and number of bytes read */
typedef std::function<void(int ec, long nwrite)> t_write_handler; /**< Callback invoked with error code and number of bytes written */

/**
 * This interface is called when data should
 * be read from a socket. Once the data was read
 * the given read handler is called.
 *
 * @param buf Buffer to read the data into
 * @param read_handler Handler to be called when data was read
 */
virtual void read(buffer_view buf, t_read_handler read_handler) = 0;

/**
 * This interface is called when data should be written
 * to a socket. Once the data was written the given
 * write handler is called.
 *
 * @param buffers List of buffers containing the data to write
 * @param write_handler Handler to be called when the data was written
 */
virtual void write(std::list<buffer_view> buffers, t_write_handler write_handler) = 0;

/**
 * Closes the stream.
 */
virtual void close_stream() = 0;

/**
 * Returns the underlying stream socket.
 *
 * @return The underlying stream socket
 */
virtual tcp::socket& stream() = 0;

/**
 * Returns a name for the io stream.
 *
 * @return Name of the io stream
 */
virtual std::string name() const = 0;
};

}
#endif //MATADOR_IO_STREAM_HPP

130
include/matador/net/ip.hpp Normal file
View File

@ -0,0 +1,130 @@
#ifndef MATADOR_IP_HPP
#define MATADOR_IP_HPP
#include "matador/net/peer.hpp"
#include "matador/net/address_resolver.hpp"
#include "matador/net/socket.hpp"
#include "matador/net/socket_stream.hpp"
#include "matador/net/socket_acceptor.hpp"
namespace matador::net {
/**
* The tcp class represents all
* settings to handle tcp socket
* connections
*/
class tcp {
public:
typedef peer_base<tcp> peer; /**< Shortcut to a tcp based peer */
typedef socket_stream<tcp> socket; /**< Shortcut to a tcp based socket */
typedef socket_acceptor<tcp> acceptor; /**< Shortcut to a tcp based acceptor */
typedef address_resolver<tcp> resolver; /**< Shortcut to a tcp based address resolver */
/**
* Returns the type of the socket
*
* @return Type of the socket
*/
int type() const { return SOCK_STREAM; }
/**
* Returns the socket protocol
*
* @return Socket protocol
*/
int protocol() const { return IPPROTO_TCP; }
/**
* Returns the socket family
*
* @return Socket family
*/
int family() const { return family_; }
/**
* Creates an instance of tcp representing
* an IPv4 socket
*
* @return IPv4 tcp object
*/
static tcp v4() { return tcp(PF_INET); }
/**
* Creates an instance of tcp representing
* an IPv6 socket
*
* @return IPv6 tcp object
*/
static tcp v6() { return tcp(PF_INET6); }
private:
explicit tcp(int family)
: family_(family)
{}
int family_;
};
/**
* The udp class represents all
* settings to handle udp socket
* connections
*/
class OOS_NET_API udp
{
public:
typedef peer_base<udp> peer; /**< Shortcut to a udp based peer */
typedef socket_stream<udp> socket; /**< Shortcut to a udp based socket */
typedef socket_acceptor<udp> acceptor; /**< Shortcut to a udp based acceptor */
typedef address_resolver<udp> resolver; /**< Shortcut to a udp based address resolver */
/**
* Returns the type of the socket
*
* @return Type of the socket
*/
int type() const { return SOCK_DGRAM; }
/**
* Returns the socket protocol
*
* @return Socket protocol
*/
int protocol() const { return IPPROTO_UDP; }
/**
* Returns the socket family
*
* @return Socket family
*/
int family() const { return family_; }
/**
* Creates an instance of udp representing
* an IPv4 socket
*
* @return IPv4 udp object
*/
static udp v4() { return udp(PF_INET); }
/**
* Creates an instance of udp representing
* an IPv6 socket
*
* @return IPv6 udp object
*/
static udp v6() { return udp(PF_INET6); }
private:
explicit udp(int family)
: family_(family)
{}
int family_;
};
}
#endif //MATADOR_IP_HPP

View File

@ -0,0 +1,45 @@
#ifndef MATADOR_NET_OS_HPP
#define MATADOR_NET_OS_HPP
#include "matador/net/export.hpp"
#include <cstdio>
#if _WIN32
#include <WinSock2.h>
#endif
#ifdef _WIN32
using socket_type = SOCKET;
#else
using socket_type = int;
#endif
namespace matador {

namespace net {

/// Initializes the platform network stack; call once before using any
/// socket functionality (presumably WSAStartup on Windows — see impl)
OOS_NET_API void init();
/// Releases resources acquired by init()
OOS_NET_API void cleanup();

}

/// Returns true if fd denotes a valid (open) socket descriptor.
/// Exported like its siblings so it is usable across module boundaries.
OOS_NET_API bool is_valid_socket(socket_type fd);

namespace os {

/// Portable wrapper around inet_pton(3)
OOS_NET_API int inet_pton(int af, const char *src, void *dst);
/// Portable wrapper around inet_ntop(3)
OOS_NET_API const char* inet_ntop(int af, const void* src, char* dst, size_t size);

/// Which direction(s) of a connection to shut down
enum class shutdown_type
{
  READ = 0,
  WRITE = 1,
  READ_WRITE = 2
};

/// Portable wrapper around shutdown(2)
OOS_NET_API int shutdown(socket_type fd, shutdown_type type);
/// Portable wrapper around close(2) / closesocket
OOS_NET_API int close(socket_type fd);

}
}
#endif //MATADOR_NET_OS_HPP

View File

@ -0,0 +1,203 @@
#ifndef MATADOR_PEER_HPP
#define MATADOR_PEER_HPP
#include "matador/net/address.hpp"
#ifdef _WIN32
//#include <ws2tcpip.h>
#else
#include <netinet/in.h>
#endif
#include <cstring>
namespace matador {
/**
* The peer_base class acts like the holder
* of network endpoint information like socket
* address and port. The template argument
* sets the protocol type either TCP or UDP.
*
* @tparam P Protocol type
*/
template < class P >
class peer_base
{
public:
  typedef P protocol_type; /**< Shortcut to the protocol type */

  /**
   * Default constructor
   */
  peer_base() = default;

  /**
   * Creates a peer from a given address. Port is set to zero.
   *
   * @param addr Address to create the peer from
   */
  explicit peer_base(address addr)
    : addr_(std::move(addr))
  {}

  /**
   * Creates a peer from a given address and port
   *
   * @param addr Address to create the peer from
   * @param port Port of the endpoint
   */
  peer_base(address addr, unsigned short port)
    : addr_(std::move(addr))
  {
    addr_.port(port);
  }

  /**
   * Copy creates a peer from the given peer
   *
   * @param x Peer to copy from
   */
  peer_base(const peer_base &x)
    : addr_(x.addr_)
  {}

  /**
   * Move creates a peer from a given peer
   *
   * @param x Peer to move from
   */
  peer_base(peer_base &&x) noexcept
    : addr_(std::move(x.addr_))
  {}

  /**
   * Copy assigns a given peer to this peer
   *
   * @param x Peer to assign
   * @return The assigned peer
   */
  peer_base& operator=(const peer_base &x)
  {
    addr_ = x.addr_;
    return *this;
  }

  /**
   * Assign moves the given peer to this peer
   *
   * @param x The peer to move assign
   * @return The moved peer
   */
  peer_base& operator=(peer_base &&x) noexcept
  {
    addr_ = std::move(x.addr_);
    return *this;
  }

  /**
   * Destructor
   */
  ~peer_base() = default;

  /**
   * Returns the current port of the peer.
   *
   * @return The current port
   */
  int port() const { return addr_.port(); }

  /**
   * Returns the current IP protocol of the peer address
   * which is either IPv4 or IPv6
   *
   * @return The current IP protocol
   */
  protocol_type protocol() const
  {
    if (addr_.is_v4()) {
      return protocol_type::v4();
    } else {
      return protocol_type::v6();
    }
  }

  /**
   * Returns the raw pointer to the sockaddr structure
   *
   * @return The raw pointer to the sockaddr structure
   */
  sockaddr* data()
  {
    return addr_.addr();
  }

  /**
   * Returns the raw pointer to the sockaddr structure
   *
   * @return The raw pointer to the sockaddr structure
   */
  const sockaddr* data() const
  {
    return addr_.addr();
  }

  /**
   * Returns the size of the underlying sockaddr structure
   *
   * @return The size of the underlying sockaddr structure
   */
  size_t size() const
  {
    return addr_.size();
  }

  /**
   * Returns a reference to the address
   *
   * @return A reference to the address
   */
  address& addr()
  {
    return addr_;
  }

  /**
   * Returns a reference to the address
   *
   * @return A reference to the address
   */
  const address& addr() const
  {
    return addr_;
  }

  /**
   * Converts the peer endpoint to a string in
   * the format [ip]:[port].
   *
   * Returns an empty string if the address cannot be converted
   * (os::inet_ntop failed) instead of invoking undefined behavior.
   *
   * @return Returns a string representation of the peer
   */
  std::string to_string() const
  {
    char addstr[INET6_ADDRSTRLEN + 8];
    const char *name;
    if (addr().is_v4()) {
      name = os::inet_ntop(addr_.addr()->sa_family, &addr_.addr_v4()->sin_addr, addstr, INET6_ADDRSTRLEN);
    } else {
      name = os::inet_ntop(addr_.addr()->sa_family, &addr_.addr_v6()->sin6_addr, addstr, INET6_ADDRSTRLEN);
    }
    if (name == nullptr) {
      // conversion failed (errno is set) — previously strlen(nullptr) was UB
      return std::string();
    }
    size_t pos = strlen(name);
    snprintf(addstr + pos, INET6_ADDRSTRLEN + 8 - pos, ":%d", addr_.port());
    return addstr;
  }

private:
  address addr_;
};
}
#endif //MATADOR_PEER_HPP

View File

@ -0,0 +1,117 @@
#ifndef REACTOR_HPP
#define REACTOR_HPP
#include "matador/net/event_type.hpp"
#include "matador/net/select_fd_sets.hpp"
#include "matador/net/socket_interrupter.hpp"

#include "matador/logger/logger.hpp"

#include "matador/utils/leader_follower_thread_pool.hpp"

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <vector>
namespace matador::net {
class handler;
// Reactor-style event dispatcher: multiplexes registered handlers over
// select(), dispatches timer events and executes work on a
// leader/follower thread pool. handlers_ is guarded by handlers_mutex_,
// timers_ by timers_mutex_; counters are atomics.
class reactor {
public:
  using handler_ptr = std::shared_ptr<handler>;
  using handler_weak_ptr = std::weak_ptr<handler>;

  // Monotonic event counters, safe to bump from multiple threads
  struct Statistics {
    std::atomic<size_t> total_events_handled_{0};
    std::atomic<size_t> read_events_{0};
    std::atomic<size_t> write_events_{0};
    std::atomic<size_t> timer_events_{0};
    std::atomic<size_t> errors_{0};
  };

  // Bookkeeping for one registered handler: its event mask, timer state
  // and a deferred-deletion flag (removal happens during cleanup)
  struct HandlerEntry {
    explicit HandlerEntry(handler_ptr h, event_type et)
      : handler(std::move(h)), events(et) {}

    handler_ptr handler;
    event_type events;
    time_t next_timeout{0};          // absolute time of the next expiry
    time_t interval{0};              // re-arm interval (presumably 0 = one-shot — confirm in impl)
    bool marked_for_deletion{false};
  };

  reactor();
  ~reactor();

  // non-copyable: owns handler registry, mutexes and thread pool
  reactor(const reactor&) = delete;
  reactor& operator=(const reactor&) = delete;

  // Register/unregister a handler for the events in type
  void register_handler(const handler_ptr& h, event_type type);
  void unregister_handler(const handler_ptr& h, event_type type);

  // Schedule (offset = first expiry, interval = re-arm) / cancel a timer
  void schedule_timer(const handler_ptr& h, time_t offset, time_t interval);
  void cancel_timer(const handler_ptr& h);

  // Event loop entry points
  void run();
  void handle_events();
  void shutdown();

  bool is_running() const { return running_; }

  select_fdsets fdsets() const;

  void mark_handler_for_delete(const handler_ptr& h);

  // Add/remove single event bits for an already registered handler
  void activate_handler(const handler_ptr& h, event_type ev);
  void deactivate_handler(const handler_ptr& h, event_type ev);

  const Statistics& get_statistics() const { return stats_; }

private:
  // Priority-queue entry; operator> plus std::greater<> below yields a
  // min-heap, i.e. the earliest timeout is on top
  struct TimerEvent {
    time_t timeout;
    handler_weak_ptr handler;

    bool operator>(const TimerEvent& other) const {
      return timeout > other.timeout;
    }
  };

  using handler_map = std::unordered_map<handler*, std::unique_ptr<HandlerEntry>>;
  using timer_queue = std::priority_queue<TimerEvent, std::vector<TimerEvent>, std::greater<>>;

  void prepare_select_bits(time_t& timeout, select_fdsets& fd_sets) const;
  bool process_events(const select_fdsets& fd_sets, time_t now);
  void process_timers(time_t now);
  void cleanup_deleted_handlers();
  void handle_error(const std::string& operation, const handler_ptr& h);
  int perform_select(struct timeval* timeout, select_fdsets& fd_sets);
  bool check_interruption(const select_fdsets& fd_sets);
  void safe_handler_operation(const handler_ptr& h, const std::string& op_name,
                              const std::function<void()>& operation);

  HandlerEntry* find_handler_entry(const handler_ptr& h);
  void remove_handler_entry(const handler_ptr& h);
  void update_handler_events(HandlerEntry* entry, event_type ev, bool activate);

private:
  handler_map handlers_;
  timer_queue timers_;

  std::atomic<bool> running_{false};
  std::atomic<bool> shutdown_requested_{false};

  mutable std::shared_mutex handlers_mutex_;  // guards handlers_
  std::mutex timers_mutex_;                   // guards timers_
  std::condition_variable shutdown_cv_;

  logger log_;
  Statistics stats_;

  leader_follower_thread_pool thread_pool_;
  socket_interrupter interrupter_;  // wakes up a blocking select()

  static constexpr std::chrono::seconds CLEANUP_INTERVAL{60};
  time_t last_cleanup_{0};
};
} // namespace matador::net
#endif //REACTOR_HPP

View File

@ -0,0 +1,132 @@
#ifndef MATADOR_SELECT_FDSETS_HPP
#define MATADOR_SELECT_FDSETS_HPP
#include "matador/net/fdset.hpp"
namespace matador {
/**
* This class represents three fd sets
* needed for the system call to select.
*/
/**
 * Bundles the three fd sets (read, write, except) that the
 * select(2) system call operates on.
 */
class select_fdsets
{
public:
  /// Identifies one of the three fd sets
  typedef enum {
    read_type = 0,   /**< Enum value for the read fd set */
    write_type = 1,  /**< Enum value for the write fd set */
    except_type      /**< Enum value for the exceptional fd set */
  } fdset_type;

  /// Highest descriptor over all three sets plus one — the nfds
  /// argument expected by select(2)
  socket_type maxp1() const;

  /// Returns the fd set identified by the given type
  fdset& fd_set(fdset_type type);

  /// Returns the read fd set
  fdset& read_set();
  /// Returns the read fd set (const)
  const fdset& read_set() const;

  /// Returns the write fd set
  fdset& write_set();
  /// Returns the write fd set (const)
  const fdset& write_set() const;

  /// Returns the exceptional fd set
  fdset& except_set();
  /// Returns the exceptional fd set (const)
  const fdset& except_set() const;

  /// Resets all bits in all three sets to zero
  void reset();

  /// Resets all bits of the fd set identified by the given type
  void reset(fdset_type type);

  /// Returns true if fd is set in the fd set identified by type.
  /// NOTE(review): fd is int while descriptors use socket_type —
  /// confirm this is intended on Windows where SOCKET is wider.
  bool is_set(fdset_type type, int fd) const;

  /// Clears fd in the fd set identified by type
  void clear(fdset_type type, int fd);

  /// Sets fd in the fd set identified by type
  void set(fdset_type type, int fd);

private:
  // one fdset per fdset_type value, indexed by the enum
  fdset fdsets_[3];
};
}
#endif //MATADOR_SELECT_FDSETS_HPP

View File

@ -0,0 +1,168 @@
#ifndef MATADOR_SOCKET_HPP
#define MATADOR_SOCKET_HPP
#include <stdexcept>
#include <sstream>
#ifdef _WIN32
#else
#include <sys/socket.h>
#include <fcntl.h>
#include <netdb.h>
#include <arpa/inet.h>
#endif
#include <cstring>
namespace matador {
/**
* Base class for several kind of socket
* classes (acceptor, stream) representing a
* socket. The protocol is selected via the
* template parameter (/sa tcp and udp classes)
*
* @tparam P Protocol type
*/
/**
 * Base class for the concrete socket kinds (acceptor, stream).
 * The protocol behaviour is injected via the template parameter
 * (see the tcp and udp trait classes).
 *
 * @tparam P Protocol traits type
 */
template < class P >
class socket_base
{
public:
  using protocol_type = P;             /**< Shortcut to the protocol type */
  using peer_type = typename P::peer;  /**< Shortcut to the peer type */

  /// Creates and opens a socket for the given protocol
  explicit socket_base(const protocol_type &protocol);

  /// Creates and opens a socket matching the given peer's protocol
  explicit socket_base(const peer_type &peer);

  /// Opens a socket for the given protocol; returns the new
  /// descriptor or -1 on error (errno is set)
  socket_type open(const protocol_type &protocol);

  /// Closes the socket if it is open
  void close();

  /// Returns true while the socket holds a valid descriptor
  bool is_open() const;

  /// Releases ownership of the descriptor to the caller and resets the
  /// internal descriptor to zero; the caller must close it eventually
  socket_type release();

  /// Connects to the given peer; returns true on success, otherwise
  /// false with errno set
  bool connect(const typename protocol_type::peer &p);

  /// Switches the socket to non-blocking (true) or blocking (false) mode
  void non_blocking(bool nb);

  /// Returns true if the socket is in non-blocking mode
  bool non_blocking() const;

  /// Sets or clears the close-on-exec flag
  void cloexec(bool nb);

  /// Returns true if the close-on-exec flag is set
  bool cloexec() const;

  /// Sets a boolean socket option identified by name; returns true
  /// on success, otherwise false with errno set
  bool options(int name, bool value);

  /// Returns the underlying socket descriptor
  socket_type id() const;

  /// Adopts the given descriptor; throws if a socket is already open
  void assign(socket_type sock);

protected:
  /// @cond MATADOR_DEV
  socket_base() = default;
  ~socket_base() = default;

  socket_type open(int family, int type, int protocol);

  socket_type sock_ = 0;
  std::string name_;
#ifdef _WIN32
  bool is_nonblocking_ = false;
#endif
  /// @endcond
};
}
#include "matador/net/socket.tpp"
#endif //MATADOR_SOCKET_HPP

View File

@ -0,0 +1,257 @@
#include "matador/net/socket.hpp"
#include "matador/net/socket_stream.hpp"
#include "matador/net/os.hpp"
#include "matador/net/error.hpp"
#include "matador/net/ip.hpp"
#include <cstring>
namespace matador {
// Streams msg into a stringstream and throws std::logic_error with the
// result. The do { } while(false) wrapper makes the macro behave like a
// single statement; the trailing semicolon was removed so that call
// sites ending in ';' don't expand to ';;' (which breaks un-braced
// if/else chains).
#define throw_logic_error(msg) \
do { \
  std::stringstream str; \
  str << msg; \
  throw std::logic_error(str.str()); \
} while(false)
/// Creates and immediately opens a socket for the given protocol (see open())
template < class P >
socket_base<P>::socket_base(const protocol_type &protocol)
{
  open(protocol);
}
/// Creates and immediately opens a socket matching the protocol of the given peer
template < class P >
socket_base<P>::socket_base(const peer_type &peer)
{
  open(peer.protocol());
}
/// Opens a socket using the protocol's family/type/protocol triple;
/// returns the new descriptor or -1 on error (errno is set)
template < class P >
socket_type socket_base<P>::open(const protocol_type &protocol)
{
  return open(protocol.family(), protocol.type(), protocol.protocol());
}
/// Closes the socket if it is open and resets the descriptor to zero;
/// closing an already closed socket is a no-op.
template < class P >
void socket_base<P>::close()
{
  if (sock_ > 0) {
    os::close(sock_);
    sock_ = 0;
  }
}
/// A socket counts as open while it holds a positive descriptor
template < class P >
bool socket_base<P>::is_open() const
{
  return sock_ > 0;
}
/// Hands ownership of the descriptor to the caller; this object no
/// longer closes it. Returns the released descriptor.
template < class P >
socket_type socket_base<P>::release()
{
  const socket_type released = sock_;
  sock_ = 0;
  return released;
}
/// Issues connect(2) to the given peer; returns true on success,
/// otherwise false (errno holds the reason)
template < class P >
bool socket_base<P>::connect(const typename protocol_type::peer &p)
{
  return ::connect(sock_, p.data(), static_cast<int>(p.size())) == 0;
}
/**
 * Switches the socket to non-blocking (nb == true) or blocking mode.
 *
 * Windows: FIONBIO expects a non-zero value to ENABLE non-blocking
 * mode — the previous `nb ? 0 : 1` was inverted; the guard also used
 * WIN32 instead of the compiler-defined _WIN32 used elsewhere.
 * POSIX: O_NONBLOCK is now cleared as well when nb is false (the old
 * code only ever OR-ed the flag in, so blocking mode couldn't be
 * restored).
 *
 * @throws std::logic_error when fcntl/ioctlsocket fails
 */
template < class P >
void socket_base<P>::non_blocking(bool nb)
{
#ifdef _WIN32
  unsigned long nonblock = nb ? 1 : 0;
  if (ioctlsocket(sock_, FIONBIO, &nonblock) != 0) {
    throw std::logic_error("ioctlsocket: couldn't set flags");
  }
  is_nonblocking_ = nb;
#else
  int val = fcntl(sock_, F_GETFL, 0);
  if (val < 0) {
    throw std::logic_error("fcntl: couldn't get flags");
  }
  int flags = nb ? (val | O_NONBLOCK) : (val & ~O_NONBLOCK);
  if (fcntl(sock_, F_SETFL, flags) < 0) {
    std::string err(strerror(errno));
    throw std::logic_error("fcntl: couldn't set flags (" + err + ")");
  }
#endif
}
/// Queries the non-blocking state; Windows returns the cached flag
/// (no portable query exists), POSIX reads O_NONBLOCK via fcntl.
/// @throws std::logic_error when fcntl fails
template < class P >
bool socket_base<P>::non_blocking() const
{
#ifdef _WIN32
  return is_nonblocking_;
#else
  int val = fcntl(sock_, F_GETFL, 0);
  if (val < 0) {
    std::string err(strerror(errno));
    throw std::logic_error("fcntl: couldn't get flags (" + err + ")");
  }
  return (val & O_NONBLOCK) > 0;
#endif
}
/**
 * Sets or clears the close-on-exec flag of the socket descriptor.
 *
 * POSIX: FD_CLOEXEC is a file-DESCRIPTOR flag and must be manipulated
 * via F_GETFD/F_SETFD — the previous code used the file-status calls
 * F_GETFL/F_SETFL, which silently did nothing; it also never cleared
 * the flag when nb was false.
 *
 * @throws std::logic_error when fcntl fails
 */
template < class P >
void socket_base<P>::cloexec(bool nb)
{
#ifdef _WIN32
  // Windows sockets have no FD_CLOEXEC equivalent here; handle
  // inheritance would be controlled via SetHandleInformation. The
  // previous code toggled FIONBIO (blocking mode) by accident, so
  // this is intentionally a no-op now.
  (void)nb;
#else
  int val = fcntl(sock_, F_GETFD, 0);
  if (val < 0) {
    throw std::logic_error("fcntl: couldn't get flags");
  }
  int flags = nb ? (val | FD_CLOEXEC) : (val & ~FD_CLOEXEC);
  if (fcntl(sock_, F_SETFD, flags) < 0) {
    std::string err(strerror(errno));
    throw std::logic_error("fcntl: couldn't set flags (" + err + ")");
  }
#endif
}
/**
 * Returns true if the close-on-exec flag is set on the descriptor.
 *
 * Uses F_GETFD (descriptor flags) instead of the previous F_GETFL
 * (status flags), and is now guarded for Windows where fcntl does
 * not exist — there it always reports false, matching the setter.
 *
 * @throws std::logic_error when fcntl fails
 */
template < class P >
bool socket_base<P>::cloexec() const
{
#ifdef _WIN32
  return false;
#else
  int val = fcntl(sock_, F_GETFD, 0);
  if (val < 0) {
    std::string err(strerror(errno));
    throw std::logic_error("fcntl: couldn't get flags (" + err + ")");
  }
  return (val & FD_CLOEXEC) > 0;
#endif
}
/// Sets a boolean socket option; returns true on success (errno set otherwise).
/// NOTE(review): the level is hard-coded to IPPROTO_TCP, so only TCP-level
/// options (e.g. TCP_NODELAY) can be set here — SOL_SOCKET options will
/// fail. Confirm this restriction is intended.
template < class P >
bool socket_base<P>::options(int name, bool value)
{
  const char flag = static_cast<char>(value ? 1 : 0);
  return setsockopt(sock_, IPPROTO_TCP, name, &flag, sizeof(flag)) == 0;
}
/// Returns the raw socket descriptor (0 when no socket is open)
template < class P >
socket_type socket_base<P>::id() const
{
  return sock_;
}
/**
 * Adopts an externally created socket descriptor.
 *
 * The previous implementation additionally called getpeername() and
 * converted the remote address, but the result was discarded (the
 * copies were commented out) — that dead code was removed.
 *
 * @param sock The descriptor to adopt
 * @throws std::logic_error when a socket is already open
 */
template < class P >
void socket_base<P>::assign(socket_type sock)
{
  if (is_open()) {
    throw std::logic_error("couldn't assign: socket already opened");
  }
  sock_ = sock;
}
/// Thin wrapper around socket(2): stores and returns the new descriptor
/// (-1 / invalid on failure, errno set)
template < class P >
socket_type socket_base<P>::open(int family, int type, int protocol)
{
  sock_ = ::socket(family, type, protocol);
  return sock_;
}
/**
 * Resolves hostname:port (IPv4, stream) and tries to connect to each
 * resolved address in turn. On success the connected descriptor is
 * assigned to the given stream and 0 is returned.
 *
 * Fixes over the previous version: a failed connect now fully closes
 * the descriptor (shutdown alone leaked it), the addrinfo list is
 * freed on the failure path as well, and the bounded snprintf replaces
 * sprintf.
 *
 * @throws std::logic_error when resolution fails or no address accepts
 */
template < class P >
socket_type connect(socket_stream<P> &stream, const char* hostname, unsigned short port)
{
  char portstr[6];
  snprintf(portstr, sizeof(portstr), "%d", port);

  struct addrinfo hints = {};
  hints.ai_family = AF_INET;
  hints.ai_socktype = SOCK_STREAM;

  struct addrinfo* res = nullptr;
  int err = getaddrinfo(hostname, portstr, &hints, &res);
  if (err != 0) {
    detail::throw_logic_error_with_gai_errno("failed to resolve local socket address: %s", err);
  }
  struct addrinfo* head = res;

  socket_type conn_fd = 0;
  socket_type ret = 0;
  do {
    conn_fd = ::socket(res->ai_family, res->ai_socktype, res->ai_protocol);
    if (!is_valid_socket(conn_fd)) {
      // couldn't create a socket for this address; try the next one
      continue;
    }
    ret = ::connect(conn_fd, res->ai_addr, res->ai_addrlen);
    if (ret == 0) {
      // success
      stream.assign(conn_fd);
      break;
    }
    // connect failed: fully release the descriptor before the next try
    os::shutdown(conn_fd, os::shutdown_type::READ_WRITE);
    os::close(conn_fd);
  } while ( (res = res->ai_next) != nullptr);

  if (res == nullptr) {
    freeaddrinfo(head);  // was leaked before when every address failed
    throw_logic_error("couldn't connect to " << hostname << ":" << port);
  }

  freeaddrinfo(head);
  return ret;
}
/**
 * Connects the given stream to the given endpoint.
 *
 * Returns 0 on success (the descriptor is assigned to the stream);
 * otherwise the result of connect(2) is returned and the descriptor
 * is now closed as well — previously it was only shut down, leaking
 * the file descriptor.
 */
template < class P >
int connect(socket_stream<P> &stream, peer_base<P> endpoint)
{
  auto pt = endpoint.protocol();
  auto fd = ::socket(pt.family(), pt.type(), pt.protocol());
  if (!is_valid_socket(fd)) {
    return static_cast<int>(fd);
  }
  auto ret = ::connect(fd, endpoint.data(), static_cast<int>(endpoint.size()));
  if (ret == 0) {
    stream.assign(fd);
  } else {
    os::shutdown(fd, os::shutdown_type::READ_WRITE);
    os::close(fd);  // shutdown alone does not release the descriptor
  }
  return ret;
}
}

View File

@ -0,0 +1,357 @@
#ifndef MATADOR_SOCKET_ACCEPTOR_HPP
#define MATADOR_SOCKET_ACCEPTOR_HPP
#include "matador/net/socket.hpp"
namespace matador {
/**
* The socket acceptor class provides the
* base functionality of the socket base class
* plus acceptor specific functions like
* listen, bind or accept
*
* @tparam P Socket protocol type
*/
/**
 * Extends the socket base class with acceptor specific
 * functionality: listen, bind and accept.
 *
 * @tparam P Socket protocol traits type
 */
template < class P >
class socket_acceptor : public socket_base<P>
{
public:
  using base = socket_base<P>;                         /**< Shortcut to base socket type */
  using stream_type = socket_stream<P>;                /**< Shortcut to socket stream type */
  using protocol_type = typename base::protocol_type;  /**< Shortcut to protocol type */
  using peer_type = typename base::peer_type;          /**< Shortcut to peer type */

  /// Creates an unbound acceptor
  socket_acceptor() = default;

  /// Creates an acceptor bound to the given peer endpoint
  explicit socket_acceptor(peer_type &peer);

  /// Creates an acceptor bound to hostname:port
  socket_acceptor(const char* hostname, unsigned short port);

  /// Creates a listening socket and binds hostname:port to it.
  /// Returns 0 on success, -1 on error with errno set
  int bind(const char* hostname, unsigned short port);

  /// Creates a listening socket and binds the given peer endpoint to it.
  /// Returns 0 on success, -1 on error with errno set
  int bind(peer_type &peer);

  /// Starts listening on the bound socket with the given backlog.
  /// Returns 0 on success, -1 on error with errno set
  int listen(int backlog);

  /// Returns a pointer to the concrete IPv4/IPv6 internet address
  /// inside the given socket address structure
  void* get_in_addr(struct sockaddr *sa);

  /// Returns the port number stored in the given socket address
  unsigned short get_port(struct sockaddr *sa);

  /// Renders the remote endpoint as a "<ip>:<port>" string
  std::string get_remote_address(struct sockaddr_storage &remote_addr);

  /// Accepts a connection and assigns the new descriptor to the given
  /// stream, which can then be used to read and write data.
  /// Returns the fd of the new connection
  int accept(stream_type &stream);

  /// Accepts a connection, assigns the new descriptor to the given
  /// stream and fills endpoint with the remote address information.
  /// Returns the fd of the new connection
  int accept(stream_type &stream, peer_type &endpoint);

  /// Sets (true) or clears (false) the SO_REUSEADDR flag.
  /// Returns 0 on success, -1 on error
  int reuse_address(bool reuse);

  /// Returns true if the SO_REUSEADDR flag is set
  bool reuse_address() const;
};
/// @cond MATADOR_DEV
/// Constructs an acceptor and binds it to the given peer endpoint.
/// NOTE(review): the base constructor already opens a socket and
/// bind() opens and assigns a second one — assign() throws when a
/// socket is already open, so verify this constructor is usable.
template < class P >
socket_acceptor<P>::socket_acceptor(peer_type &peer)
  : socket_base<P>(peer)
{
  bind(peer);
}
/// Constructs an acceptor bound to hostname:port (see bind())
template < class P >
socket_acceptor<P>::socket_acceptor(const char* hostname, unsigned short port)
{
  bind(hostname, port);
}
/**
 * Creates a listening socket bound to hostname:port.
 *
 * Each address returned by getaddrinfo() is tried in turn. The
 * previous version threw on the first failed bind (leaking the
 * descriptor and the address list) instead of trying the next
 * address — the loop continuation after the throw was unreachable.
 *
 * @return 0 on success
 * @throws std::logic_error when resolution fails or no address binds
 */
template < class P >
int socket_acceptor<P>::bind(const char* hostname, unsigned short port)
{
  char portstr[6];
  snprintf(portstr, sizeof(portstr), "%d", port);

  struct addrinfo hints = {};
  hints.ai_family = AF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_protocol = 0;
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;

  struct addrinfo* res = nullptr;
  int err = getaddrinfo(hostname, portstr, &hints, &res);
  if (err != 0) {
    throw_logic_error("failed to resolve local socket address (error: " << err << ")");
  }
  struct addrinfo* head = res;

  int listenfd = 0;
  int ret = 0;
  const int on = 1;
  do {
    listenfd = ::socket(res->ai_family, res->ai_socktype, res->ai_protocol);
    if (listenfd < 0) {
      // couldn't create a socket for this address; try the next one
      continue;
    }
    if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) {
      os::close(listenfd);
      freeaddrinfo(head);
      throw std::logic_error(strerror(errno));
    }
    ret = ::bind(listenfd, res->ai_addr, res->ai_addrlen);
    if (ret == 0) {
      // success
      this->assign(listenfd);
      break;
    }
    // bind failed: release this descriptor and try the next address
    os::close(listenfd);
  } while ( (res = res->ai_next) != nullptr);

  if (res == nullptr) {
    freeaddrinfo(head);
    throw_logic_error("couldn't bind to " << hostname << ":" << port);
  }

  freeaddrinfo(head);
  return ret;
}
/**
 * Creates a listening socket bound to the given peer endpoint; on
 * success the actual local address is written back into peer via
 * getsockname().
 *
 * Fixes: the getsockname() length argument is a real socklen_t now —
 * the old code cast a size_t's address to socklen_t*, which is wrong
 * on platforms where the two types differ in size; the descriptor is
 * also closed before the error throws instead of being leaked.
 *
 * @return 0 on success
 */
template < class P >
int socket_acceptor<P>::bind(peer_type &peer)
{
  socket_type listen_fd = ::socket(peer.protocol().family(), peer.protocol().type(), peer.protocol().protocol());
  if (!is_valid_socket(listen_fd)) {
    return static_cast<int>(listen_fd);
  }
#ifdef _WIN32
  const char on = 1;
#else
  const int on = 1;
#endif
  if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) {
    os::close(listen_fd);  // don't leak the descriptor on failure
    detail::throw_logic_error_with_errno("setsockopt error: %s", errno);
  }
  int ret = ::bind(listen_fd, peer.data(), static_cast<int>(peer.size()));
  if (ret == 0) {
    // success
    this->assign(listen_fd);
  } else {
    os::close(listen_fd);  // don't leak the descriptor on failure
    detail::throw_logic_error_with_errno("couldn't bind fd: %s", errno);
  }
  socklen_t addr_len = static_cast<socklen_t>(peer.size());
  ret = getsockname(this->id(), peer.data(), &addr_len);
  return ret;
}
/// Starts listening on the bound socket; backlog caps the pending
/// connection queue. Returns 0 on success, -1 on error (errno set)
template < class P >
int socket_acceptor<P>::listen(int backlog)
{
  return ::listen(this->id(), backlog);
}
/// Returns a pointer to the IPv4 or IPv6 address portion of the
/// given socket address structure, depending on its family.
template < class P >
void* socket_acceptor<P>::get_in_addr(struct sockaddr *sa)
{
  if (sa->sa_family == AF_INET) {
    return &reinterpret_cast<struct sockaddr_in*>(sa)->sin_addr;
  }
  return &reinterpret_cast<struct sockaddr_in6*>(sa)->sin6_addr;
}
/// Extracts the port number (host byte order) from an IPv4 or IPv6
/// socket address structure.
template < class P >
unsigned short socket_acceptor<P>::get_port(struct sockaddr *sa)
{
  const bool is_v4 = (sa->sa_family == AF_INET);
  return is_v4
    ? ntohs(reinterpret_cast<struct sockaddr_in*>(sa)->sin_port)
    : ntohs(reinterpret_cast<struct sockaddr_in6*>(sa)->sin6_port);
}
/// Renders the remote endpoint stored in remote_addr as "<ip>:<port>".
template < class P >
std::string socket_acceptor<P>::get_remote_address(struct sockaddr_storage &remote_addr)
{
  char ip_buf[INET6_ADDRSTRLEN];
  auto *sa = reinterpret_cast<struct sockaddr *>(&remote_addr);
  os::inet_ntop(remote_addr.ss_family, get_in_addr(sa), ip_buf, sizeof ip_buf);

  std::stringstream result;
  result << ip_buf << ":" << get_port(sa);
  return result.str();
}
/// Accepts a pending connection; on success the new descriptor is
/// assigned to the given stream and switched to non-blocking and
/// close-on-exec mode. Returns the new fd (invalid descriptor value
/// on failure, errno set — failure is intentionally not thrown here).
template < class P >
int socket_acceptor<P>::accept(stream_type &stream)
{
  struct sockaddr_storage remote_addr = {};
  socklen_t addrlen = sizeof(remote_addr);
  auto fd = ::accept(this->id(), (struct sockaddr *)&remote_addr, &addrlen);
  if (is_valid_socket(fd)) {
    stream.assign(fd);
    stream.non_blocking(true);
    stream.cloexec(true);
  }
  return static_cast<int>(fd);
}
/// Accepts a pending connection; on success the new descriptor is
/// assigned to the given stream (non-blocking, close-on-exec) and
/// endpoint is filled with the remote address by ::accept.
/// Returns the new fd (invalid descriptor value on failure, errno set).
template<class P>
int socket_acceptor<P>::accept(stream_type &stream, peer_type &endpoint)
{
  auto addr_len = static_cast<socklen_t>(endpoint.size());
  auto fd = ::accept(this->id(), endpoint.data(), &addr_len);
  if (is_valid_socket(fd)) {
    stream.assign(fd);
    stream.non_blocking(true);
    stream.cloexec(true);
  }
  return static_cast<int>(fd);
}
/// Sets (reuse == true) or clears the SO_REUSEADDR flag.
/// Returns 0 on success, -1 on error (errno set)
template < class P >
int socket_acceptor<P>::reuse_address(bool reuse)
{
  const int option(reuse ? 1 : 0);
  return setsockopt(this->id(), SOL_SOCKET, SO_REUSEADDR, (char*)&option, sizeof(option));
}
/**
 * Returns true if SO_REUSEADDR is set on the socket.
 *
 * SO_REUSEADDR is an int-sized option; the previous code read it into
 * a size_t with a size_t-sized length, which only worked by accident
 * on little-endian platforms.
 */
template < class P >
bool socket_acceptor<P>::reuse_address() const
{
  int option = 0;
  socklen_t optlen = sizeof(option);
  getsockopt(this->id(), SOL_SOCKET, SO_REUSEADDR, (char*)&option, &optlen);
  return option > 0;
}
/// @endcond
}
#endif //MATADOR_SOCKET_ACCEPTOR_HPP

View File

@ -0,0 +1,38 @@
#ifndef MATADOR_SOCKET_INTERRUPTER_HPP
#define MATADOR_SOCKET_INTERRUPTER_HPP
#include "matador/net/ip.hpp"
// #include "matador/logger/logger.hpp"
#include <array>
namespace matador::net {
/// @cond MATADOR_DEV
// Self-connected socket pair used to wake up a reactor blocked in
// select(): interrupt() presumably sends the indicator byte through
// client_ so server_ becomes readable, reset() drains it again —
// confirm against the implementation file.
class OOS_NET_API socket_interrupter
{
public:
  socket_interrupter();
  ~socket_interrupter();

  // Descriptor the reactor adds to its read set
  socket_type socket_id() const;

  // Makes socket_id() readable, forcing a blocking select() to return
  void interrupt();
  // Consumes the pending interrupt; returns whether it was consumed
  bool reset();

private:
  matador::tcp::socket server_;
  matador::tcp::socket client_;
  // matador::logger log_;

  std::array<char, 1> indicator_ = { { 0 } };  // byte written on interrupt()
  std::array<char, 1> consumer_ = {};          // scratch buffer for reset()
};
/// @endcond
}
#endif //MATADOR_SOCKET_INTERRUPTER_HPP

View File

@ -0,0 +1,95 @@
#ifndef MATADOR_SOCKET_STREAM_HPP
#define MATADOR_SOCKET_STREAM_HPP
#ifdef _WIN64
#define ssize_t __int64
#elif _WIN32
#define ssize_t long
#endif
#include "matador/net/socket.hpp"
namespace matador {
/**
* The class represents a read write socket. It is
* independent to the protocol type (UDP or TCP).
*
* It provides to methods receive and send to read
* or write data to an open socket.
*
* @tparam P Protocol type of the socket
*/
template < class P >
class socket_stream : public socket_base<P>
{
public:
  typedef socket_base<P> base; /**< Shortcut to socket base class */
  typedef typename base::protocol_type protocol_type; /**< Shortcut to protocol type */
  typedef typename base::peer_type peer_type; /**< Shortcut to the peer type */
  /**
   * Creates an uninitialized socket stream
   */
  socket_stream() = default;
  /**
   * Creates a socket stream for the given
   * protocol type.
   *
   * @param protocol Type of the protocol
   */
  explicit socket_stream(const protocol_type &protocol);
  /**
   * Receives data from the underlying open socket.
   * All received data is stored in the given buffer.
   * The limit of the data to receive is the size of
   * the buffer
   *
   * Number of bytes is returned. If returned value is zero (0)
   * the socket was closed. If the return value is -1 an error
   * occurred or the socket was blocked (in case of nonblocking)
   * The concrete error code and message can be retrieved with
   * errno.
   *
   * @tparam Buffer type of the buffer object.
   * @param buffer The buffer object.
   * @return The number of bytes received, -1 on error or 0 on close
   */
  template < class Buffer >
  ssize_t receive(Buffer &buffer);
  /**
   * Sends the content of the given buffer over the underlying
   * open socket. At most buffer.size() bytes are written; the
   * return value follows the same convention as receive().
   *
   * @tparam Buffer type of the buffer object.
   * @param buffer The buffer object.
   * @return The number of bytes sent, -1 on error or 0 on close
   */
  template < class Buffer >
  ssize_t send(const Buffer &buffer);
};
/// @cond MATADOR_DEV
template < class P >
socket_stream<P>::socket_stream(const protocol_type &protocol)
: base(protocol)
{}
// Thin wrapper around ::recv; reads up to buffer.capacity() bytes.
template < class P >
template < class Buffer >
ssize_t socket_stream<P>::receive(Buffer &buffer)
{
  return ::recv(this->id(), buffer.data(), static_cast<int>(buffer.capacity()), 0);
}
// Thin wrapper around ::send; writes buffer.size() bytes. No partial-write
// loop here — callers must handle short writes themselves.
template < class P >
template < class Buffer >
ssize_t socket_stream<P>::send(const Buffer &buffer)
{
  return ::send(this->id(), buffer.data(), static_cast<int>(buffer.size()), 0);
}
/// @endcond
}
#endif //MATADOR_SOCKET_STREAM_HPP

View File

@ -0,0 +1,96 @@
#ifndef MATADOR_STREAM_HANDLER_HPP
#define MATADOR_STREAM_HANDLER_HPP
#include "matador/utils/buffer_view.hpp"
#include "matador/logger/logger.hpp"
#include "matador/net/export.hpp"
#include "matador/net/handler.hpp"
#include "matador/net/ip.hpp"
#include "matador/net/io_stream.hpp"
#include <atomic>
#include <mutex>
namespace matador {
class handler_creator;
/**
* The stream_handler class implements the
* handler and io_stream interface and is
* used with the io_service to handle socket
* connections more easily in comparison
* with the plain reactor boilerplate.
*
* Instances of the class are used internally
* within the io_service hiding the read and write
* wiring. The user just use the interface
* provided by the io_service to setup
* a server.
*/
class OOS_NET_API stream_handler : public handler, public io_stream
{
public:
  typedef std::function<void(tcp::peer, io_stream &)> t_init_handler; /**< Shortcut to the initialize function */
public:
  /**
   * Creates a new stream_handler for the given socket.
   * The acceptor is the link to the creation source where
   * the given init function is called when the handler
   * is initialized
   *
   * @param sock Socket to read and write on
   * @param endpoint Endpoint of the connection
   * @param creator Pointer to the creating handler class
   * @param init_handler Initialize function
   */
  stream_handler(tcp::socket sock, tcp::peer endpoint, handler_creator *creator, t_init_handler init_handler);
  // handler interface ------------------------------------------------------
  void open() override;
  socket_type handle() const override;
  void on_input() override;
  void on_output() override;
  void on_except() override {}   // exceptions on the socket are ignored
  void on_timeout() override {}  // no timer behavior for stream handlers
  void on_close() override;
  void close() override;
  bool is_ready_write() const override;
  bool is_ready_read() const override;
  // io_stream interface ----------------------------------------------------
  void read(buffer_view buf, t_read_handler read_handler) override;
  void write(std::list<buffer_view> buffers, t_write_handler write_handler) override;
  void close_stream() override;
  tcp::socket &stream() override;
  std::string name() const override;
private:
  logger log_;
  tcp::socket stream_;          // the open connection
  tcp::peer endpoint_;          // remote endpoint of the connection
  std::string name_;
  buffer_view read_buffer_;     // destination of the pending read()
  std::list<buffer_view> write_buffers_;  // queued outgoing buffers
  handler_creator *creator_ = nullptr;    // notified on close (non-owning)
  t_init_handler init_handler_;
  t_read_handler on_read_;      // completion callback of the pending read
  t_write_handler on_write_;    // completion callback of the pending write
  std::atomic_bool is_ready_to_read_ { false };
  std::atomic_bool is_ready_to_write_ { false };
  mutable std::mutex mutex_;
};
}
#endif //MATADOR_STREAM_HANDLER_HPP

View File

@ -10,6 +10,7 @@ class collection {
public: public:
private: private:
std::vector<Type> data_;
}; };

View File

@ -16,14 +16,13 @@ namespace matador::utils {
* This thread pool class implements the * This thread pool class implements the
* leader follower pattern. * leader follower pattern.
*/ */
class leader_follower_thread_pool class leader_follower_thread_pool {
{
private:
leader_follower_thread_pool(const leader_follower_thread_pool &);
leader_follower_thread_pool &operator=(const leader_follower_thread_pool &);
public: public:
leader_follower_thread_pool(const leader_follower_thread_pool&) = delete;
leader_follower_thread_pool& operator=(const leader_follower_thread_pool&) = delete;
leader_follower_thread_pool(leader_follower_thread_pool&&) = delete;
leader_follower_thread_pool& operator=(leader_follower_thread_pool&&) = delete;
/** /**
* Creates a new leader follower thread pool instance * Creates a new leader follower thread pool instance
* with the given thread pool size and given join * with the given thread pool size and given join
@ -34,7 +33,7 @@ public:
* @param join_func Join function. * @param join_func Join function.
*/ */
template<typename F> template<typename F>
leader_follower_thread_pool(std::size_t size, F join_func) leader_follower_thread_pool(const std::size_t size, F join_func)
: num_threads_(size), join_(join_func) : num_threads_(size), join_(join_func)
, follower_(size) {} , follower_(size) {}
@ -60,7 +59,7 @@ public:
* *
* @return Number of threads. * @return Number of threads.
*/ */
std::size_t size() const; [[nodiscard]] std::size_t size() const;
/** /**
* Shuts the thread pool down. * Shuts the thread pool down.
@ -81,14 +80,14 @@ public:
* *
* @return Number of follower threads. * @return Number of follower threads.
*/ */
std::size_t num_follower() const; [[nodiscard]] std::size_t num_follower() const;
/** /**
* Returns true if the thread pool is running. * Returns true if the thread pool is running.
* *
* @return True if thread pool is running. * @return True if thread pool is running.
*/ */
bool is_running() const; [[nodiscard]] bool is_running() const;
private: private:
/* /*

View File

@ -41,7 +41,7 @@ template <>
struct std::hash<matador::utils::uuid> { struct std::hash<matador::utils::uuid> {
std::size_t operator()(const matador::utils::uuid& u) const noexcept { std::size_t operator()(const matador::utils::uuid& u) const noexcept {
std::size_t h = 0; std::size_t h = 0;
for (uint32_t val : u.data()) { for (const uint32_t val : u.data()) {
h ^= std::hash<uint32_t>{}(val) + 0x9e3779b9 + (h << 6) + (h >> 2); h ^= std::hash<uint32_t>{}(val) + 0x9e3779b9 + (h << 6) + (h >> 2);
} }
return h; return h;

View File

@ -1,4 +1,10 @@
add_library(matador-core STATIC add_library(matador-core STATIC
../../include/matador/net/event_type.hpp
../../include/matador/net/handler.hpp
../../include/matador/net/os.hpp
../../include/matador/net/reactor.hpp
../../include/matador/net/select_fd_sets.hpp
../../include/matador/net/socket_interrupter.hpp
../../include/matador/object/attribute_definition_generator.hpp ../../include/matador/object/attribute_definition_generator.hpp
../../include/matador/object/attribute_definition.hpp ../../include/matador/object/attribute_definition.hpp
../../include/matador/object/basic_object_info.hpp ../../include/matador/object/basic_object_info.hpp

View File

@ -0,0 +1,119 @@
#include <utility>
#include "matador/net/acceptor.hpp"
#include "matador/net/reactor.hpp"
#include "matador/logger/log_manager.hpp"
namespace matador {
// Default acceptor: endpoint and accept handler must be supplied later
// via accecpt().
acceptor::acceptor()
: log_(matador::create_logger("Acceptor"))
{}
// Acceptor listening on the given endpoint; handler supplied later.
acceptor::acceptor(tcp::peer endpoint)
: endpoint_(std::move(endpoint))
, log_(matador::create_logger("Acceptor"))
{}
// Fully configured acceptor: endpoint plus factory creating a handler for
// every accepted connection.
acceptor::acceptor(tcp::peer endpoint, t_accept_handler make_handler)
: endpoint_(std::move(endpoint))
, accept_handler_(std::move(make_handler))
, log_(matador::create_logger("Acceptor"))
{}
// Closes the listening socket on destruction.
acceptor::~acceptor()
{
  acceptor_.close();
}
// Install the factory called for every newly accepted connection.
// NOTE(review): the name "accecpt" is a typo for "accept"; it is part of the
// public interface declared in acceptor.hpp, so renaming requires a
// coordinated header/caller change.
void acceptor::accecpt(acceptor::t_accept_handler on_new_connection)
{
  accept_handler_ = std::move(on_new_connection);
}
// Same as above, additionally (re)setting the endpoint to listen on.
void acceptor::accecpt(const tcp::peer &endpoint, acceptor::t_accept_handler on_new_connection)
{
  endpoint_ = endpoint;
  accecpt(std::move(on_new_connection));
}
// Called by the reactor on registration: bind the configured endpoint and
// start listening (backlog 10).
void acceptor::open()
{
  acceptor_.bind(endpoint_);
  acceptor_.listen(10);
  // name_ starts empty here, so the visible name is just " (fd: N)" unless
  // it was set elsewhere beforehand.
  name_ += " (fd: " + std::to_string(acceptor_.id()) + ")";
  log_.debug("fd %d: accepting connections", handle());
}
// The fd the reactor watches is the listening socket itself.
socket_type acceptor::handle() const
{
  return acceptor_.id();
}
// Reactor read event on the listening fd: accept one connection, create a
// handler for it via the user factory, and register that handler for
// read/write events.
void acceptor::on_input()
{
  tcp::socket sock;
  tcp::peer endpoint = create_client_endpoint();
  log_.debug("fd %d: accepting connection ...", handle());
  int ret = acceptor_.accept(sock, endpoint);
  if (ret < 0) {
    char error_buffer[1024];
    os::strerror(errno, error_buffer, 1024);
    log_.error("accept failed: %s", error_buffer);
  } else {
    // create new client handler
    log_.debug("accepted connection from %s", endpoint.to_string().c_str());
    // accept_handler_ is a std::function; if no factory was ever installed
    // this call throws std::bad_function_call.
    auto h = accept_handler_(sock, endpoint, this);
    get_reactor()->register_handler(h, event_type::READ_WRITE_MASK);
    log_.debug("fd %d: accepted socket id %d", handle(), sock.id());
  }
}
// Close the listening socket. The handler stays registered with the reactor.
void acceptor::close()
{
  log_.debug("closing acceptor %d", acceptor_.id());
  acceptor_.close();
  // Todo: unregister from reactor (maybe observer pattern?)
  // notify()
}
// An acceptor never produces outgoing data.
bool acceptor::is_ready_write() const
{
  return false;
}
// Readable as long as the listening fd is valid (open() succeeded).
bool acceptor::is_ready_read() const
{
  return handle() > 0;
}
const tcp::peer &acceptor::endpoint() const
{
  return endpoint_;
}
// Pre-sized empty peer matching the listener's address family, used as the
// out-parameter of accept().
tcp::peer acceptor::create_client_endpoint() const
{
  if (endpoint_.addr().is_v4()) {
    return matador::tcp::peer(address::v4::empty());
  } else {
    return matador::tcp::peer(address::v6::empty());
  }
}
// Child handler closed: nothing to clean up on the acceptor side.
void acceptor::notify_close(handler *)
{
}
std::string acceptor::name() const
{
  return name_;
}
}

154
source/core/net/address.cpp Normal file
View File

@ -0,0 +1,154 @@
#include "matador/net/address.hpp"
#ifdef _WIN32
#else
#include <netinet/in.h>
#include <arpa/inet.h>
#endif
#include <stdexcept>
namespace matador {
// "All nodes" IPv6 link-local multicast group.
const char* address_router<V6>::IP6ADDR_MULTICAST_ALLNODES = "FF02::1";
// Wrap an existing IPv4 socket address.
address::address(const sockaddr_in &addr)
: size_(sizeof(sockaddr_in))
{
  socket_address_.sa_in = addr;
}
// Wrap an existing IPv6 socket address.
address::address(const sockaddr_in6 &addr)
: size_(sizeof(sockaddr_in6))
{
  socket_address_.sa_in6 = addr;
}
// Copy assignment: clear current storage, then copy the whole union.
address& address::operator=(const address &x)
{
  if (this == &x) {
    return *this;
  }
  clear();
  size_ = x.size_;
  socket_address_ = x.socket_address_;
  return *this;
}
// Move construction: copy the union, mark the source empty via size_ = 0.
address::address(address &&x) noexcept
: socket_address_(x.socket_address_)
, size_(x.size_)
{
  x.size_ = 0;
}
address& address::operator=(address &&x) noexcept
{
  if (this == &x) {
    return *this;
  }
  clear();
  socket_address_ = x.socket_address_;
  size_ = x.size_;
  x.size_ = 0;
  return *this;
}
// Destructor wipes the stored address bytes.
address::~address()
{
  clear();
}
// IPv4 address as a 32-bit integer, in network byte order (raw s_addr).
// For non-IPv4 families this returns 0 — an IPv6 address does not fit.
unsigned int address::to_ulong() const
{
  if (socket_address_.sa_raw.sa_family == PF_INET) {
    return socket_address_.sa_in.sin_addr.s_addr;
  } else {
    return 0;
    //return reinterpret_cast<sockaddr_in6 *>(addr_)->sin6_addr;
  }
}
// Render the address in presentation form ("a.b.c.d" or IPv6 text).
// The buffer is zero-initialized so that a failing os::inet_ntop (which
// returns NULL and leaves the buffer untouched) yields an empty string
// instead of constructing a std::string from uninitialized memory (UB).
std::string address::to_string() const
{
  char addstr[INET6_ADDRSTRLEN] = {};
  if (is_v4()) {
    os::inet_ntop(socket_address_.sa_raw.sa_family, &socket_address_.sa_in.sin_addr, addstr, INET6_ADDRSTRLEN);
  } else {
    os::inet_ntop(socket_address_.sa_raw.sa_family, &socket_address_.sa_in6.sin6_addr, addstr, INET6_ADDRSTRLEN);
  }
  return std::string(addstr);
}
// Store the port (host order in, network order stored) in the member that
// matches the active address family.
void address::port(unsigned short pn)
{
  if (is_v4()) {
    socket_address_.sa_in.sin_port = htons(pn);
  } else {
    socket_address_.sa_in6.sin6_port = htons(pn);
  }
}
// Return the port converted back to host byte order.
unsigned short address::port() const
{
  if (is_v4()) {
    return ntohs(socket_address_.sa_in.sin_port);
  } else {
    return ntohs(socket_address_.sa_in6.sin6_port);
  }
}
// Family predicates based on the raw sockaddr family field.
bool address::is_v4() const
{
  return socket_address_.sa_raw.sa_family == PF_INET;
}
bool address::is_v6() const
{
  return socket_address_.sa_raw.sa_family == PF_INET6;
}
// Raw accessors into the internal sockaddr union. The vN accessors do not
// verify the family — callers are expected to check is_v4()/is_v6() first.
sockaddr *address::addr()
{
  return &socket_address_.sa_raw;
}
const sockaddr *address::addr() const
{
  return &socket_address_.sa_raw;
}
sockaddr_in *address::addr_v4()
{
  return &socket_address_.sa_in;
}
const sockaddr_in *address::addr_v4() const
{
  return &socket_address_.sa_in;
}
sockaddr_in6 *address::addr_v6()
{
  return &socket_address_.sa_in6;
}
const sockaddr_in6 *address::addr_v6() const
{
  return &socket_address_.sa_in6;
}
// Byte size of the stored sockaddr (set by the constructors/assignment).
socklen_t address::size() const
{
  return size_;
}
// Zero the union member matching the current family.
void address::clear()
{
  if (is_v4()) {
    memset(&socket_address_.sa_in, 0, sizeof(socket_address_.sa_in));
  } else {
    memset(&socket_address_.sa_in6, 0, sizeof(socket_address_.sa_in6));
  }
}
}

View File

@ -0,0 +1,19 @@
#include "matador/net/address_resolver.hpp"
namespace matador {
namespace detail {
// Map the protocol tag type to the matching socket type constant used by
// getaddrinfo()/socket(): TCP -> stream, UDP -> datagram.
template<>
int determine_socktype<tcp>()
{
  return SOCK_STREAM;
}
template<>
int determine_socktype<udp>()
{
  return SOCK_DGRAM;
}
}
}

View File

@ -0,0 +1,84 @@
#include "matador/logger/log_manager.hpp"
#include "matador/net/connector.hpp"
#include "matador/net/reactor.hpp"
#include "matador/utils/os.hpp"
#include <utility>
namespace matador {
// Default connector: the handler factory must be passed to connect() later.
connector::connector()
: log_(matador::create_logger("Connector"))
{}
// Connector with the factory creating a handler once a connection succeeds.
connector::connector(t_connect_handler on_new_connection)
: connect_handler_(std::move(on_new_connection))
, log_(matador::create_logger("Connector"))
{}
// Store the candidate endpoints and schedule this connector as a timer in
// the reactor (first shot immediately, retry every 3 seconds); the actual
// connect attempt happens in on_timeout().
void connector::connect(reactor &r, const std::vector<tcp::peer> &endpoints)
{
  endpoints_ = endpoints;
  r.schedule_timer(shared_from_this(), 0, 3);
}
void connector::connect(reactor &r, const std::vector<tcp::peer> &endpoints, t_connect_handler on_new_connection)
{
  connect_handler_ = std::move(on_new_connection);
  connect(r, endpoints);
}
// The connector owns no fd of its own; it participates via timer events only.
socket_type connector::handle() const
{
  return 0;
}
void connector::on_timeout()
{
tcp::socket stream;
for (const auto &ep : endpoints_) {
if (!ep.addr().is_v4()) {
continue;
}
auto ret = matador::connect(stream, ep);
if (ret != 0) {
char error_buffer[1024];
log_.error("couldn't establish connection to: %s", ep.to_string().c_str(), os::strerror(errno, error_buffer, 1024));
continue;
} else {
log_.info("connection established to %s (fd: %d)", ep.to_string().c_str(), stream.id());
}
stream.non_blocking(true);
auto h = connect_handler_(stream, ep, this);
get_reactor()->register_handler(h, event_type::READ_WRITE_MASK);
get_reactor()->cancel_timer(shared_from_this());
break;
}
endpoints_.clear();
}
// The connector never takes part in the reactor's read/write fd sets;
// it is driven purely by its timer.
bool connector::is_ready_write() const
{
  return false;
}
bool connector::is_ready_read() const
{
  return false;
}
// Created handlers closing does not concern the connector.
void connector::notify_close(handler *)
{
}
std::string connector::name() const
{
  return "connector";
}
}

38
source/core/net/error.cpp Normal file
View File

@ -0,0 +1,38 @@
#include "matador/net/error.hpp"
#include "matador/utils/os.hpp"
#include <stdexcept>
#ifdef _WIN32
#include <WS2tcpip.h>
#else
#include <netdb.h>
#endif
namespace matador {
namespace detail {
// Plain logic_error with a fixed message.
void throw_logic_error(const char* msg)
{
  throw std::logic_error(msg);
}
// Format the errno text into msg (msg must contain a %s placeholder) and
// throw the result as logic_error.
void throw_logic_error_with_errno(const char* msg, int err)
{
  char error_buffer[1024];
  os::strerror(err, error_buffer, 1024);
  char message_buffer[1024];
  os::sprintf(message_buffer, 1024, msg, error_buffer);
  throw std::logic_error(message_buffer);
}
// Same, but for getaddrinfo() error codes (gai_strerror).
void throw_logic_error_with_gai_errno(const char* msg, int err)
{
  char message_buffer[1024];
  os::sprintf(message_buffer, 1024, msg, gai_strerror(err));
  throw std::logic_error(message_buffer);
}
}
}

66
source/core/net/fdset.cpp Normal file
View File

@ -0,0 +1,66 @@
#include "matador/net/fdset.hpp"
namespace matador {
// Start with an empty fd set; reset() also installs the 0 sentinel that
// maxp1()/count() rely on.
fdset::fdset()
{
  reset();
}
// Move construction/assignment. The moved-from object is re-armed via
// reset(): without it, its bookkeeping set would be empty, and count()
// (which returns size() - 1) would underflow while maxp1() would
// dereference begin() of an empty set.
fdset::fdset(fdset&& x) noexcept
: max_fd_set_(std::move(x.max_fd_set_))
, fd_set_(x.fd_set_)
{
  x.reset();
}
fdset& fdset::operator=(fdset&& x) noexcept
{
  max_fd_set_ = std::move(x.max_fd_set_);
  fd_set_ = x.fd_set_;
  x.reset();
  return *this;
}
// set all bits to zero
void fdset::reset()
{
  FD_ZERO(&fd_set_);
  max_fd_set_.clear();
  // Sentinel 0 keeps maxp1() well-defined on an "empty" set and makes
  // count() == size() - 1 correct.
  max_fd_set_.insert(0);
}
bool fdset::is_set(socket_type fd) const
{
  return FD_ISSET(fd, &fd_set_) > 0;
}
void fdset::clear(socket_type fd)
{
  FD_CLR(fd, &fd_set_);
  max_fd_set_.erase(fd);
}
void fdset::set(socket_type fd)
{
  FD_SET(fd, &fd_set_);
  max_fd_set_.insert(fd);
}
// Highest fd currently set (the caller adds +1 for select()'s nfds).
// NOTE(review): this returns *begin(), which is the maximum only if
// max_fd_set_ is ordered descending (std::greater comparator) — confirm
// against the declaration in fdset.hpp.
socket_type fdset::maxp1() const
{
  return *max_fd_set_.begin();
}
// Number of real fds, excluding the 0 sentinel.
size_t fdset::count() const
{
  return max_fd_set_.size() - 1;
}
bool fdset::empty() const
{
  return count() == 0;
}
fd_set* fdset::get()
{
  return &fd_set_;
}

View File

@ -0,0 +1,48 @@
#include "matador/net/handler.hpp"
#include <ctime>
namespace matador {
// Absolute time (epoch seconds) of the next timer expiry; 0 = no timer.
time_t handler::next_timeout() const
{
  return next_timeout_;
}
// Repeat interval in seconds; 0 = one-shot.
time_t handler::interval() const
{
  return interval_;
}
reactor *handler::get_reactor() const
{
  return reactor_;
}
// Called by the reactor on registration (non-owning back pointer).
void handler::register_reactor(reactor *r)
{
  reactor_ = r;
}
// Arm the timer: first expiry now+offset, then every `interval` seconds.
void handler::schedule(time_t offset, time_t interval)
{
  next_timeout_ = ::time(nullptr) + offset;
  interval_ = interval;
}
void handler::cancel_timer()
{
  next_timeout_ = 0;
  interval_ = 0;
}
// After an expiry: re-arm repeating timers, disarm one-shot timers.
void handler::calculate_next_timeout(time_t now)
{
  if (interval_ > 0) {
    next_timeout_ = now + interval_;
  } else {
    next_timeout_ = 0;
  }
}
}

View File

@ -0,0 +1,31 @@
#include "matador/net/io_service.hpp"
#include "matador/logger/log_manager.hpp"
namespace matador {
io_service::io_service()
: log_(matador::create_logger("IOService"))
{}
// Ensure the underlying reactor is stopped before members are destroyed.
io_service::~io_service()
{
  reactor_.shutdown();
}
// Blocks running the reactor's event loop until shutdown() is called.
void io_service::run()
{
  reactor_.run();
}
bool io_service::is_running() const
{
  return reactor_.is_running();
}
void io_service::shutdown()
{
  reactor_.shutdown();
}
}

View File

@ -0,0 +1,106 @@
#include "matador/net/leader_follower_thread_pool.hpp"
#include "matador/utils/thread_helper.hpp"
#include <algorithm>
namespace matador {
leader_follower_thread_pool::~leader_follower_thread_pool()
{
  shutdown();
}
// Spawn num_threads_ workers, each running the execute() leader/follower loop.
void leader_follower_thread_pool::start()
{
  is_running_ = true;
  for (std::size_t i = 0; i < num_threads_; ++i) {
    threads_.emplace_back([this] { execute(); });
  }
  log_.info("thread pool started with %d threads", num_threads_);
}
// Flag-only stop; threads are joined in shutdown().
void leader_follower_thread_pool::stop()
{
  is_running_ = false;
}
// Called by the current leader once it starts processing an event: give up
// leadership and wake one follower to become the next leader. A thread that
// is not the leader must not promote (guard at the top).
void leader_follower_thread_pool::promote_new_leader()
{
  std::lock_guard<std::mutex> l(mutex_);
  if (leader_ != std::this_thread::get_id()) {
    return;
  }
  leader_ = null_id;
  signal_ready_ = true;
  condition_synchronizer_.notify_one();
}
std::size_t leader_follower_thread_pool::size() const
{
  return threads_.size();
}
// Stop the pool and join all worker threads. The shutdown signal is raised
// under the lock so waiting followers observe it; joining happens after the
// lock is released (workers need the mutex to exit their wait).
void leader_follower_thread_pool::shutdown()
{
  {
    const std::lock_guard<std::mutex> l(mutex_);
    // if (!is_running_) {
    // return;
    // }
    stop();
    log_.info("shutting down; notifying all tasks");
    signal_shutdown_ = true;
    condition_synchronizer_.notify_all();
  }
  std::for_each(threads_.begin(), threads_.end(), [](thread_vector_t::reference item) {
    if (item.joinable()) {
      item.join();
    }
  });
}
std::thread::id leader_follower_thread_pool::leader() {
  const std::lock_guard<std::mutex> l(mutex_);
  return leader_;
}
std::size_t leader_follower_thread_pool::num_follower() const {
  return follower_;
}
bool leader_follower_thread_pool::is_running() const
{
  return is_running_;
}
// Worker loop (leader/follower pattern): wait as a follower until leadership
// is free, claim it, then run the user's join_() callback outside the lock.
// join_() is expected to call promote_new_leader() when it starts real work.
void leader_follower_thread_pool::execute() {
  std::unique_lock<std::mutex> l(mutex_);
  while (is_running_) {
    while (leader_ != null_id) {
      // log_.info("thread <%d> waiting for synchronizer (leader %d)",
      // acquire_thread_index(std::this_thread::get_id()),
      // acquire_thread_index(leader_));
      condition_synchronizer_.wait(l, [this]() { return signal_ready_ || signal_shutdown_; });
      signal_ready_ = false;
      if (!is_running_) {
        return;  // shutdown observed while waiting
      }
    }
    // log_.info("new leader <%d> (thread <%d> is now follower)",acquire_thread_index(std::this_thread::get_id()) , acquire_thread_index(leader_));
    leader_ = std::this_thread::get_id();
    l.unlock();
    join_();
    // log_.info("thread <%d> finished work", acquire_thread_index(std::this_thread::get_id()));
    l.lock();
  }
}
}

84
source/core/net/os.cpp Normal file
View File

@ -0,0 +1,84 @@
#include "matador/net/os.hpp"
#ifdef _WIN32
#include <Ws2tcpip.h>
#else
#include <arpa/inet.h>
#include <unistd.h>
#endif
namespace matador {
namespace net {
// One-time process-wide network setup. Only Windows needs work (Winsock
// startup); on POSIX this is a no-op. Exits the process if WSAStartup fails.
void init()
{
#ifdef _WIN32
  WSADATA wsaData; // if this doesn't work
  //WSAData wsaData; // then try this instead
  // MAKEWORD(1,1) for Winsock 1.1, MAKEWORD(2,0) for Winsock 2.0:
  if (WSAStartup(MAKEWORD(1,1), &wsaData) != 0) {
    fprintf(stderr, "WSAStartup failed.\n");
    exit(1);
  }
#endif
}
// Counterpart of init(): tear down Winsock on Windows, no-op elsewhere.
void cleanup()
{
#ifdef _WIN32
  WSACleanup();
#endif
}
}
// Platform-neutral validity check: Windows sockets are unsigned handles
// compared against INVALID_SOCKET, POSIX fds are non-negative ints.
bool is_valid_socket(socket_type fd) {
#ifdef _WIN32
  return fd != INVALID_SOCKET;
#else
  return fd >= 0;
#endif
}
namespace os {
// Thin portability wrappers: Windows spells these InetPton/InetNtop/
// closesocket; POSIX uses the lowercase global functions.
int inet_pton(int af, const char *src, void *dst)
{
#ifdef _WIN32
  return ::InetPton(af, src, dst);
#else
  return ::inet_pton(af, src, dst);
#endif
}
const char* inet_ntop(int af, const void* src, char* dst, size_t size)
{
#ifdef _WIN32
  // InetNtop takes a non-const source pointer; it does not modify it.
  return ::InetNtop(af, const_cast<void*>(src), dst, size);
#else
  return ::inet_ntop(af, src, dst, size);
#endif
}
int close(socket_type fd)
{
#ifdef _WIN32
  return ::closesocket(fd);
#else
  return ::close(fd);
#endif
}
// Shut down one or both directions of a socket.
// Windows and POSIX expose ::shutdown with the same call shape, so no
// platform split is needed (the previous #ifdef had two identical branches).
int shutdown(socket_type fd, shutdown_type type)
{
  return ::shutdown(fd, static_cast<int>(type));
}
}
}

411
source/core/net/reactor.cpp Normal file
View File

@ -0,0 +1,411 @@
#include "matador/net/reactor.hpp"
#include "matador/net/handler.hpp"
#include "matador/logger/log_manager.hpp"
#include <algorithm>
#include <limits>
#include <cerrno>
#include <ctime>
#include <iostream>
namespace matador {
// The reactor dispatches events with a 4-thread leader/follower pool; each
// pool iteration runs one pass of handle_events().
reactor::reactor()
: sentinel_(std::shared_ptr<handler>(nullptr))
, log_(create_logger("Reactor"))
, thread_pool_(4, [this]() { handle_events(); })
{
}
reactor::~reactor()
{
  log_.debug("destroying reactor");
  thread_pool_.shutdown();
}
// Register a handler for the given event mask; re-registering an already
// known handler ORs the new mask in. The interrupter wakes a blocked
// select() so the new fd is picked up immediately.
void reactor::register_handler(const handler_ptr& h, event_type et)
{
  h->register_reactor(this);
  h->open();
  std::lock_guard<std::mutex> l(mutex_);
  auto it = find_handler_type(h);
  if (it == handlers_.end()) {
    handlers_.emplace_back(h, et);
  } else if (it->first != h) {
    throw std::logic_error("given handler isn't expected handler");
  } else {
    it->second = it->second | et;
  }
  interrupt_without_lock();
}
// Close and drop a handler regardless of the event mask argument.
void reactor::unregister_handler(const handler_ptr& h, event_type)
{
  std::lock_guard<std::mutex> l(mutex_);
  auto it = find_handler_type(h);
  if (it != handlers_.end()) {
    (*it).first->close();
    handlers_.erase(it);
  }
  interrupt_without_lock();
}
// Register (if needed) a handler for timer events and arm its timer.
void reactor::schedule_timer(const std::shared_ptr<handler>& h, time_t offset, time_t interval)
{
  h->register_reactor(this);
  std::lock_guard<std::mutex> l(mutex_);
  auto it = find_handler_type(h);
  if (it == handlers_.end()) {
    handlers_.emplace_back(h, event_type::TIMEOUT_MASK);
  }
  h->schedule(offset, interval);
}
// Remove a timer handler and disarm its timer.
void reactor::cancel_timer(const std::shared_ptr<handler>& h)
{
  std::lock_guard<std::mutex> l(mutex_);
  auto it = find_handler_type(h);
  if (it != handlers_.end()) {
    handlers_.erase(it);
  }
  h->cancel_timer();
}
// Start the worker pool and block until a shutdown is requested; the worker
// that observes the shutdown interrupt signals shutdown_ (see
// handle_events), after which remaining handlers are closed.
void reactor::run()
{
  // log_.info("start dispatching all clients");
  thread_pool_.start();
  {
    // log_.info("waiting for reactor shutdown");
    std::unique_lock<std::mutex> l(mutex_);
    shutdown_.wait(l, [this]() { return shutdown_requested_.load(); });
    cleanup();
  }
  // log_.info("all clients dispatched; shutting down");
  thread_pool_.stop();
}
// One leader iteration: build the fd sets, block in select(), then either
// handle an interrupt, dispatch exactly one ready handler (promoting a new
// leader first so dispatch runs concurrently), or promote and return.
void reactor::handle_events()
{
  // std::cout << this << " start handle events\n" << std::flush;
  // log_.info("handle events");
  running_ = true;
  time_t timeout;
  select_fdsets fd_sets;
  prepare_select_bits(timeout, fd_sets);
  // std::cout << this << " fd sets r: " << fd_sets.read_set().count() << ", w: " << fd_sets.write_set().count() << ", e: " << fd_sets.except_set().count() << ", max: " << fd_sets.maxp1() << "\n" << std::flush;
  // log_.debug("fds [r: %d, w: %d, e: %d]",
  // fdsets_.read_set().count(),
  // fdsets_.write_set().count(),
  // fdsets_.except_set().count());
  // if (timeout != (std::numeric_limits<time_t>::max)()) {
  // log_.debug("next timeout in %d sec", timeout);
  // }
  // A null timeval pointer means "block forever"; only pass a timeout when
  // some handler actually has a timer armed.
  struct timeval tselect{};
  struct timeval* p = nullptr;
  if (timeout < (std::numeric_limits<time_t>::max)()) {
    tselect.tv_sec = timeout;
    tselect.tv_usec = 0;
    p = &tselect;
    // std::cout << this << " next timeout in " << p->tv_sec << " seconds\n" << std::flush;
  }
  if (!has_clients_to_handle(timeout, fd_sets)) {
    // std::cout << this << " no clients to handle; returning\n" << std::flush;
    // log_.info("no clients to handle, exiting");
    return;
  }
  int ret;
  while ((ret = select(p, fd_sets)) < 0) {
    // std::cout << this << " select returned with error " << ret << "\n" << std::flush;
    // NOTE(review): on a hard select error this shuts down and retries the
    // select, while EINTR returns immediately — confirm this asymmetry is
    // intended (the common pattern retries on EINTR instead).
    if(errno != EINTR) {
      char error_buffer[1024];
      log_.warn("select failed: %s", os::strerror(errno, error_buffer, 1024));
      shutdown();
    } else {
      return;
    }
  }
  // std::cout << this << " select returned with active requests " << ret << "\n" << std::flush;
  // Interrupter byte: either a registration change (promote and retry) or
  // the shutdown signal for run().
  bool interrupted = is_interrupted(fd_sets);
  if (interrupted) {
    if (!shutdown_requested_) {
      // std::cout << this << " reactor was interrupted - promote new leader\n" << std::flush;
      // log_.info("reactor was interrupted");
      thread_pool_.promote_new_leader();
      return;
    } else {
      // std::cout << this << " reactor was interrupted for shutdown\n" << std::flush;
      // log_.info("shutting down");
      // cleanup();
      shutdown_.notify_one();
      return;
    }
  }
  time_t now = ::time(nullptr);
  t_handler_type handler_type = resolve_next_handler(now, fd_sets);
  if (handler_type.first) {
    // Temporarily remove the event bit so no other leader dispatches the
    // same handler concurrently; restored after the callback.
    deactivate_handler(handler_type.first, handler_type.second);
    // std::cout << this << " handling client " << handler_type.first->name() << " - promoting new leader\n" << std::flush;
    thread_pool_.promote_new_leader();
    // log_.info("start handling event");
    // handle event
    if (handler_type.second == event_type::WRITE_MASK) {
      on_write_mask(handler_type.first);
    } else if (handler_type.second == event_type::READ_MASK) {
      on_read_mask(handler_type.first);
    } else if (handler_type.second == event_type::TIMEOUT_MASK) {
      on_timeout(handler_type.first, now);
    } else {
      // log_.info("unknown event type");
    }
    activate_handler(handler_type.first, handler_type.second);
    remove_deleted();
  } else {
    // no handler found
    // log_.info("no handler found");
    thread_pool_.promote_new_leader();
  }
}
// Request shutdown: set the flag and wake the select() leader via the
// interrupter; the leader then notifies run().
void reactor::shutdown()
{
  if (!is_running()) {
    return;
  }
  // shutdown the reactor properly
  log_.info("shutting down reactor");
  shutdown_requested_ = true;
  interrupt();
}
bool reactor::is_running() const
{
  return running_;
}
// Build the read/write fd sets from all registered handlers and compute the
// nearest timer expiry (max time_t = "no timer armed"). The interrupter fd
// is always watched for reads.
void reactor::prepare_select_bits(time_t& timeout, select_fdsets& fd_sets) const
{
  std::lock_guard<std::mutex> l(mutex_);
  fd_sets.reset();
  time_t now = ::time(nullptr);
  timeout = (std::numeric_limits<time_t>::max)();
  // set interrupter fd
  fd_sets.read_set().set(interrupter_.socket_id());
  for (const auto &h : handlers_) {
    if (h.first == nullptr) {
      continue;
    }
    if (h.first->is_ready_read() && is_event_type_set(h.second, event_type::READ_MASK)) {
      fd_sets.read_set().set(h.first->handle());
    }
    if (h.first->is_ready_write() && is_event_type_set(h.second, event_type::WRITE_MASK)) {
      fd_sets.write_set().set(h.first->handle());
    }
    if (h.first->next_timeout() > 0 && is_event_type_set(h.second, event_type::TIMEOUT_MASK)) {
      timeout = (std::min)(timeout, h.first->next_timeout() <= now ? 0 : (h.first->next_timeout() - now));
    }
  }
}
// Drain the deferred-delete queue (filled via mark_handler_for_delete) and
// erase the corresponding entries from the handler list.
void reactor::remove_deleted()
{
  while (!handlers_to_delete_.empty()) {
    auto h = handlers_to_delete_.front();
    handlers_to_delete_.pop_front();
    auto fi = std::find_if(handlers_.begin(), handlers_.end(), [&h](const t_handler_type &ht) {
      return ht.first.get() == h.get();
    });
    if (fi != handlers_.end()) {
      log_.debug("removing handler %d", fi->first->handle());
      handlers_.erase(fi);
    }
  }
}
// Close every remaining handler; called from run() once shutdown completes.
void reactor::cleanup()
{
  while (!handlers_.empty()) {
    auto hndlr = handlers_.front();
    handlers_.pop_front();
    hndlr.first->close();
  }
}
// Thin wrapper around ::select using the prepared fd sets; nfds is max+1.
int reactor::select(struct timeval *timeout, select_fdsets& fd_sets)
{
  log_.debug("calling select; waiting for io events");
  return ::select(
    static_cast<int>(fd_sets.maxp1()) + 1,
    fd_sets.read_set().get(),
    fd_sets.write_set().get(),
    fd_sets.except_set().get(),
    timeout
  );
}
//void reactor::process_handler(int /*ret*/)
//{
// handlers_.emplace_back(sentinel_, event_type::NONE_MASK);
// time_t now = ::time(nullptr);
// while (handlers_.front().first != nullptr) {
// auto h = handlers_.front();
// handlers_.pop_front();
// handlers_.push_back(h);
// // check for read/accept
// if (h.first->handle() > 0 && fdsets_.write_set().is_set(h.first->handle())) {
// on_write_mask(h.first);
// }
// if (h.first->handle() > 0 && fdsets_.read_set().is_set(h.first->handle())) {
// on_read_mask(h.first);
// }
// if (h.first->next_timeout() > 0 && h.first->next_timeout() <= now) {
// on_timeout(h.first, now);
// }
// }
// handlers_.pop_front();
//}
// Pick the first handler with a pending event, preferring write over read
// over timer. Returns {nullptr, NONE_MASK} if nothing is ready.
reactor::t_handler_type reactor::resolve_next_handler(time_t now, select_fdsets& fd_sets)
{
  std::lock_guard<std::mutex> l(mutex_);
  for (auto &h : handlers_) {
    if (h.first->handle() > 0 && fd_sets.write_set().is_set(h.first->handle())) {
      return std::make_pair(h.first, event_type::WRITE_MASK);
    }
    if (h.first->handle() > 0 && fd_sets.read_set().is_set(h.first->handle())) {
      return std::make_pair(h.first, event_type::READ_MASK);
    }
    if (h.first->next_timeout() > 0 && h.first->next_timeout() <= now) {
      return std::make_pair(h.first, event_type::TIMEOUT_MASK);
    }
  }
  return std::make_pair(nullptr, event_type::NONE_MASK);
}
// Event dispatch helpers — called outside the reactor lock.
void reactor::on_read_mask(const handler_ptr& handler)
{
  // log_.debug("read bit for handler %d is set; handle input", h->handle());
  // std::cout << this << " (handler " << h.get() << "): handle read\n" << std::flush;
  handler->on_input();
}
void reactor::on_write_mask(const handler_ptr& handler)
{
  // log_.debug("write bit for handler %d is set; handle output", h->handle());
  // std::cout << this << " (handler " << h.get() << "): handle write\n" << std::flush;
  handler->on_output();
}
// Exception events are currently ignored.
void reactor::on_except_mask(const handler_ptr& /*handler*/)
{
  // std::cout << this << " (handler " << h.get() << "): handle exception\n" << std::flush;
}
// Re-arm (or disarm) the handler's timer before invoking its callback.
void reactor::on_timeout(const handler_ptr &h, time_t now)
{
  // log_.debug("timeout expired for handler %d; handle timeout", h->handle());
  // std::cout << this << " (handler " << h.get() << "): handle timeout\n" << std::flush;
  h->calculate_next_timeout(now);
  h->on_timeout();
}
// Snapshot of the currently prepared fd sets (mainly for tests/inspection).
select_fdsets reactor::fdsets() const
{
  time_t timeout;
  select_fdsets fd_sets;
  prepare_select_bits(timeout, fd_sets);
  return fd_sets;
}
// Queue a handler for removal; actual erase happens in remove_deleted()
// after the current dispatch finishes.
void reactor::mark_handler_for_delete(const handler_ptr& h)
{
  std::lock_guard<std::mutex> l(mutex_);
  handlers_to_delete_.push_back(h);
}
// True if select() woke up because of the interrupter socket; drains the
// interrupt byte and, on shutdown, flips running_ off.
bool reactor::is_interrupted(select_fdsets& fd_sets)
{
  std::lock_guard<std::mutex> l(mutex_);
  if (fd_sets.read_set().is_set(interrupter_.socket_id())) {
    log_.debug("interrupt byte received; resetting interrupter");
    if (shutdown_requested_) {
      running_ = false;
    }
    return interrupter_.reset();
  }
  return false;
}
// Anything to wait for? Either fds are set or some timer is armed.
bool reactor::has_clients_to_handle(time_t timeout, select_fdsets& fd_sets) const {
  std::lock_guard<std::mutex> lock(mutex_);
  return fd_sets.maxp1() > 0 || timeout != (std::numeric_limits<time_t>::max)();
}
// Linear lookup of a handler's entry; caller must hold mutex_.
std::list<reactor::t_handler_type>::iterator reactor::find_handler_type(const reactor::handler_ptr &h)
{
  return std::find_if(handlers_.begin(), handlers_.end(), [&h](const t_handler_type &ht) {
    return ht.first.get() == h.get();
  });
}
void reactor::activate_handler(const reactor::handler_ptr &h, event_type ev)
{
std::lock_guard<std::mutex> l(mutex_);
auto it = find_handler_type(h);
if (it == handlers_.end()) {
return;
}
it->second |= ev;
}
void reactor::deactivate_handler(const reactor::handler_ptr &h, event_type ev)
{
std::lock_guard<std::mutex> l(mutex_);
auto it = find_handler_type(h);
if (it == handlers_.end()) {
return;
}
it->second &= ~ev;
}
void reactor::interrupt()
{
log_.trace("interrupting reactor");
std::lock_guard<std::mutex> l(mutex_);
interrupter_.interrupt();
}
// Same as interrupt(), but without acquiring mutex_.
// NOTE(review): presumably the caller must already hold mutex_ — not
// enforced here; confirm all call sites lock first.
void reactor::interrupt_without_lock()
{
  log_.trace("interrupting reactor");
  interrupter_.interrupt();
}
}

View File

@ -0,0 +1,74 @@
#include "matador/net/select_fdsets.hpp"
#include <algorithm>
namespace matador {
// Highest max-plus-one value across the read, write and except sets —
// i.e. the nfds argument expected by select().
socket_type select_fdsets::maxp1() const
{
  return (std::max)({fdsets_[0].maxp1(), fdsets_[1].maxp1(), fdsets_[2].maxp1()});
}
// Access to the set selected by type; fdset_type values index fdsets_.
fdset& select_fdsets::fd_set(fdset_type type)
{
  return fdsets_[type];
}

// fdsets_[0]: descriptors watched for readability.
fdset& select_fdsets::read_set()
{
  return fdsets_[0];
}

const fdset& select_fdsets::read_set() const
{
  return fdsets_[0];
}

// fdsets_[1]: descriptors watched for writability.
fdset& select_fdsets::write_set()
{
  return fdsets_[1];
}

const fdset& select_fdsets::write_set() const
{
  return fdsets_[1];
}

// fdsets_[2]: descriptors watched for exceptional conditions.
fdset& select_fdsets::except_set()
{
  return fdsets_[2];
}

const fdset& select_fdsets::except_set() const
{
  return fdsets_[2];
}
void select_fdsets::reset()
{
for (auto &fdset : fdsets_) {
fdset.reset();
}
}
// Clears all descriptors from the given set.
void select_fdsets::reset(fdset_type type)
{
  fdsets_[type].reset();
}

// True if fd is a member of the given set.
bool select_fdsets::is_set(fdset_type type, int fd) const
{
  return fdsets_[type].is_set(fd);
}

// Removes fd from the given set.
void select_fdsets::clear(fdset_type type, int fd)
{
  fdsets_[type].clear(fd);
}

// Adds fd to the given set.
void select_fdsets::set(fdset_type type, int fd)
{
  fdsets_[type].set(fd);
}
}

View File

@ -0,0 +1,74 @@
#include "matador/net/socket_interrupter.hpp"
#include "matador/utils/buffer_view.hpp"
#include "matador/logger/log_manager.hpp"
#ifndef _WIN32
#include <netinet/tcp.h>
#endif
namespace matador {
// Builds the self-connected socket pair used to wake up a reactor that is
// blocked in select(): client_ (the writing end) connects to a local
// listener, whose accepted connection becomes server_ (the watched end).
socket_interrupter::socket_interrupter()
  : client_(tcp::v4())
  , log_(create_logger("SocketInterrupter"))
{
  // Loopback listener on an ephemeral port.
  // NOTE(review): the chosen port is read back via loopback_endpoint.port()
  // below — presumably bind() fills in the assigned port; confirm against
  // the acceptor API.
  tcp::acceptor listener;
  listener.reuse_address(true);
  tcp::peer loopback_endpoint(address::v4::loopback());
  listener.bind(loopback_endpoint);
  listener.listen(SOMAXCONN);
  log_.debug("listening for interruptions at %d", listener.id());

  // Self-connect: client_ dials the listener, server_ is the accepted end.
  client_.connect(tcp::peer(address::v4::loopback(), loopback_endpoint.port()));
  listener.accept(server_);

  // The writing side must never block the reactor thread, and the single
  // interrupt byte should be flushed immediately (no Nagle delay).
  client_.non_blocking(true);
  client_.options(TCP_NODELAY, true);
}
// Tears down both ends of the self-connection.
socket_interrupter::~socket_interrupter()
{
  client_.close();
  server_.close();
}
// Descriptor of the watched (server) end; the reactor puts this id into
// its read set to detect interrupts.
socket_type socket_interrupter::socket_id() const
{
  return server_.id();
}
void socket_interrupter::interrupt()
{
buffer_view buf(indicator_);
log_.debug("fd %d: sending interrupt to fd %d", client_.id(), server_.id());
client_.send(buf);
}
// Consumes pending interrupt byte(s) from the server end. Returns true if
// at least one byte was read on the first receive.
bool socket_interrupter::reset()
{
  buffer_view buf(consumer_);
  log_.debug("reading interrupt byte");
  auto nread = server_.receive(buf);
  bool interrupted = nread > 0;
  // Drain any additional queued interrupts: keep reading while the last
  // receive filled the buffer completely.
  // NOTE(review): the same buffer_view is reused without an explicit reset
  // between reads — confirm receive() rewinds/overwrites rather than
  // appending past the first read.
  while (nread == static_cast<ssize_t>(buf.capacity())) {
    nread = server_.receive(buf);
  }
  return interrupted;
}
}

View File

@ -0,0 +1,153 @@
#include "matador/net/stream_handler.hpp"
#include "matador/net/handler_creator.hpp"
#include "matador/net/reactor.hpp"
#include "matador/utils/buffer_view.hpp"
#include "matador/logger/log_manager.hpp"
#include <cerrno>
#include <chrono>
#include <iostream>
namespace matador {
// Wraps an accepted connection. The socket and endpoint are taken by value
// and moved into the members; init_handler is invoked later from open().
stream_handler::stream_handler(tcp::socket sock, tcp::peer endpoint, handler_creator *creator, t_init_handler init_handler)
  : log_(create_logger("StreamHandler"))
  , stream_(std::move(sock))
  , endpoint_(std::move(endpoint))
  // Build the name from the members: the `endpoint` parameter was just
  // moved-from above, so reading it here would be a use-after-move.
  , name_(endpoint_.to_string() + " (fd: " + std::to_string(stream_.id()) + ")")
  , creator_(creator)
  , init_handler_(std::move(init_handler))
{
  log_.debug("%s: created stream handler", name_.c_str());
}
// Called once the handler is registered; runs the creator-supplied
// callback so it can arm the initial read/write on this stream.
void stream_handler::open()
{
  init_handler_(endpoint_, *this);
}
// Native socket id the reactor uses for select().
socket_type stream_handler::handle() const
{
  return stream_.id();
}
// Reactor callback: the socket is readable. Receives into the armed
// read_buffer_ and reports the result through on_read_.
void stream_handler::on_input()
{
  auto len = stream_.receive(read_buffer_);
  log_.trace("%s: read %d bytes", name().c_str(), len);
  if (len == 0) {
    // orderly shutdown by the peer
    on_close();
  } else if (len < 0 && errno == EWOULDBLOCK) {
    // Spurious readiness: nothing available yet. Previously this fell into
    // the success branch and bumped the buffer by a NEGATIVE length; now we
    // simply stay armed and wait for the next readable event.
    return;
  } else if (len < 0) {
    char error_buffer[1024];
    log_.error("%s: error on read: %s", name().c_str(), os::strerror(errno, error_buffer, 1024));
    is_ready_to_read_ = false;
    on_read_(static_cast<long>(len), static_cast<long>(len));
    on_close();
  } else {
    // NOTE(review): read_buffer_.data() may not be NUL-terminated; %s here
    // can over-read — confirm or log a length-bounded view instead.
    log_.debug("%s: received %d bytes (data: %s)", name().c_str(), len, read_buffer_.data());
    read_buffer_.bump(len);
    is_ready_to_read_ = false;
    on_read_(0, static_cast<long>(len));
  }
}
// Reactor callback: the socket is writable. Drains the queued write
// buffers and reports completion (or error) through on_write_.
void stream_handler::on_output()
{
  ssize_t bytes_total = 0;
  auto start = std::chrono::high_resolution_clock::now();
  while (!write_buffers_.empty()) {
    buffer_view &bv = write_buffers_.front();
    auto len = stream_.send(bv);
    log_.trace("%s: sent %d bytes", name().c_str(), len);
    if (len == 0) {
      // peer closed the connection; stop writing — the handler is torn down
      on_close();
      return;
    } else if (len < 0 && errno == EWOULDBLOCK) {
      // Kernel send buffer full. Return (previously this kept looping and
      // busy-spun on EWOULDBLOCK); remaining buffers stay queued and
      // is_ready_to_write_ stays set, so the reactor retries when writable.
      log_.debug("%s: sent %d bytes (blocked)", name().c_str(), bytes_total);
      return;
    } else if (len < 0) {
      char error_buffer[1024];
      log_.error("%s: error on write: %s", name().c_str(), os::strerror(errno, error_buffer, 1024));
      is_ready_to_write_ = false;
      on_write_(static_cast<long>(len), static_cast<long>(len));
      // close last (matches on_input's order); then stop — previously the
      // loop kept sending on the closed socket
      on_close();
      return;
    } else {
      bytes_total += len;
      bv.bump(len);
      if (bv.full()) {
        write_buffers_.pop_front();
      }
    }
  }
  auto end = std::chrono::high_resolution_clock::now();
  auto elapsed = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
  // Pass a plain integer to the varargs logger (passing the duration object
  // itself is undefined behavior) and label the correct unit.
  log_.debug("%s: sent %d bytes (%lldus)", name().c_str(), bytes_total,
             static_cast<long long>(elapsed.count()));
  is_ready_to_write_ = false;
  on_write_(0, static_cast<long>(bytes_total));
}
// Closes the connection and removes this handler from the reactor: the
// creator is notified, the handler is queued for deferred deletion and
// unregistered from read/write events.
void stream_handler::on_close()
{
  // format string previously had one %s but two arguments; include the fd
  log_.debug("%s: closing connection (fd: %d)", name().c_str(), handle());
  stream_.close();
  creator_->notify_close( this );
  auto self = shared_from_this();
  get_reactor()->mark_handler_for_delete(self);
  get_reactor()->unregister_handler(self, event_type::READ_WRITE_MASK);
}
// Closes the underlying socket (if still open) and notifies the creator.
// Unlike on_close(), this does NOT unregister the handler from the reactor.
void stream_handler::close()
{
  if (!stream_.is_open()) {
    return;
  }
  // format string previously had one %s but two arguments; include the fd
  log_.debug("%s: closing connection (fd: %d)", name().c_str(), handle());
  stream_.close();
  creator_->notify_close( this );
}
// Writable-interest predicate: a write was armed and data is still queued.
bool stream_handler::is_ready_write() const
{
  const bool has_pending_data = !write_buffers_.empty();
  return has_pending_data && is_ready_to_write_;
}
// Readable-interest predicate: a read was armed and the buffer has room.
bool stream_handler::is_ready_read() const
{
  const bool has_room = !read_buffer_.full();
  return has_room && is_ready_to_read_;
}
// Arms the handler for the next input event: stores the destination
// buffer and completion callback, then wakes the reactor so it re-adds
// this handler to the read set.
void stream_handler::read(buffer_view buf, t_read_handler read_handler)
{
  read_buffer_ = std::move(buf);
  on_read_ = std::move(read_handler);
  is_ready_to_read_ = true;
  get_reactor()->interrupt();
}
// Queues outgoing buffers and the completion callback, then wakes the
// reactor so it re-adds this handler to the write set.
void stream_handler::write(std::list<buffer_view> buffers, io_stream::t_write_handler write_handler)
{
  write_buffers_ = std::move(buffers);
  on_write_ = std::move(write_handler);
  is_ready_to_write_ = true;
  get_reactor()->interrupt();
}
// io_stream interface: delegates to on_close(), which also unregisters
// this handler from the reactor.
void stream_handler::close_stream()
{
  on_close();
}
// Mutable access to the underlying TCP socket.
tcp::socket &stream_handler::stream()
{
  return stream_;
}
// Human-readable identifier "<endpoint> (fd: <id>)", fixed at construction.
std::string stream_handler::name() const
{
  return name_;
}
}