// query/include/matador/utils/message_bus.hpp
//
// 347 lines
// 11 KiB
// C++

#ifndef MATADOR_MESSAGE_BUS_HPP
#define MATADOR_MESSAGE_BUS_HPP
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <stdexcept>
#include <typeindex>
#include <typeinfo>
#include <unordered_map>
#include <utility>
#include <vector>
namespace matador::utils {
class message_bus;
/**
 * @brief Type-erased message wrapper that records the static type of its payload.
 *
 * A message either shares ownership of its payload (value constructor,
 * from_shared()) or merely points at an object owned by the caller
 * (from_ref()). The payload is only ever exposed as const.
 */
class message {
public:
  /**
   * @brief Default constructor for an empty message.
   */
  message();

  /**
   * @brief Constructs an owning message from a value.
   *
   * The value is copied into a freshly allocated shared object, so the
   * message stays valid independently of @p msg.
   *
   * @tparam MessageType The type of the message.
   * @param msg The message to be stored. A copy is made.
   */
  template<typename MessageType>
  explicit message(const MessageType& msg) {
    auto strong = std::make_shared<MessageType>(msg);
    ptr_ = static_cast<const void*>(strong.get());
    type_ = std::type_index(typeid(MessageType));
    // Hand the only reference over instead of copying it (no extra refcount traffic).
    owner_ = std::move(strong);
  }

  /**
   * @brief Constructs a non-owning message from a reference.
   *
   * @tparam MessageType The type of the reference.
   * @param msg A reference to the message data. The caller must keep it alive
   *            for as long as the returned message is used.
   * @return A non-owning message wrapping the reference.
   */
  template<typename MessageType>
  static message from_ref(const MessageType& msg) {
    message m;
    // std::addressof is used so that an overloaded operator& cannot interfere.
    m.ptr_ = static_cast<const void*>(std::addressof(msg));
    m.type_ = std::type_index(typeid(MessageType));
    // owner_ is deliberately not assigned here => non-owning
    return m;
  }

  /**
   * @brief Constructs an owning message from a shared pointer.
   *
   * @tparam MessageType The type of the object managed by the shared pointer.
   * @param msg A shared pointer to the message. If it is null, the resulting
   *            message carries the type but a null payload pointer.
   * @return A message that shares ownership of the object.
   */
  template<typename MessageType>
  static message from_shared(std::shared_ptr<MessageType> msg) {
    message m;
    // Read the raw pointer first; msg is moved from on the last line.
    m.ptr_ = static_cast<const void*>(msg.get());
    m.type_ = std::type_index(typeid(MessageType));
    // The by-value parameter already holds a reference; transfer it.
    m.owner_ = std::move(msg);
    return m;
  }

  /**
   * @brief Checks if the message is empty.
   *
   * @return True if the message is empty, false otherwise.
   */
  [[nodiscard]] bool empty() const;

  /**
   * @brief Gets the type of the object stored in the message.
   *
   * @return The type of the stored object as a std::type_index.
   */
  [[nodiscard]] std::type_index type() const;

  /**
   * @brief Checks if the stored object is of the specified type.
   *
   * The comparison is an exact std::type_index match; base/derived
   * relationships are not considered.
   *
   * @tparam MessageType The type to check.
   * @return True if the object matches the type, false otherwise.
   */
  template<typename MessageType>
  [[nodiscard]] bool is() const { return type_ == std::type_index(typeid(MessageType)); }

  /**
   * @brief Accesses the stored object as a typed reference.
   *
   * The type check is performed before the null check, so a type mismatch is
   * reported as std::bad_cast even when no payload is present.
   *
   * @tparam MessageType The expected type of the object.
   * @return A constant reference to the stored object.
   * @throws std::bad_cast if the type does not match.
   * @throws std::runtime_error if the type matches but the payload pointer is null.
   */
  template<typename MessageType>
  const MessageType& get() const {
    if (!is<MessageType>()) {
      throw std::bad_cast();
    }
    const void* p = raw_ptr();
    if (!p) {
      throw std::runtime_error("AnyMessage: empty pointer");
    }
    return *static_cast<const MessageType*>(p);
  }

  /**
   * @brief Retrieves the raw pointer to the stored object.
   *
   * @return A const void* pointing to the stored object (may be non-owning).
   */
  [[nodiscard]] const void* raw_ptr() const;

private:
  // Raw payload pointer. Initialised in-class so that it is never
  // indeterminate, matching the default given to type_ below.
  const void* ptr_{nullptr};
  // Keeps the payload alive when the message owns or shares it; null for from_ref().
  std::shared_ptr<void> owner_;
  // typeid(void) is the in-class default until a payload type is recorded.
  std::type_index type_{typeid(void)};
};
/**
 * @brief Move-only handle for a single handler registration on a message_bus.
 *
 * The handle stores the bus, the message type and the handler id it was
 * created with. It cannot be copied, only moved.
 */
class subscription {
public:
  /**
   * @brief Creates a handle that refers to no bus.
   */
  subscription() = default;

  /**
   * @brief Creates a handle for one registered handler.
   *
   * @param bus Pointer to the message bus managing the subscription.
   * @param type The type_index of the message type.
   * @param id The unique ID of the handler.
   */
  subscription(message_bus* bus, std::type_index type, uint64_t id);

  // Copying is disabled: a registration has exactly one handle.
  subscription(const subscription&) = delete;
  subscription& operator=(const subscription&) = delete;

  // Moving hands the registration over to another handle.
  subscription(subscription&& other) noexcept;
  subscription& operator=(subscription&& other) noexcept;

  /**
   * @brief Destructor that ensures cleanup by unsubscribing from the bus.
   */
  ~subscription();

  /**
   * @brief Removes the handler from the message bus.
   */
  void unsubscribe();

  /**
   * @brief Reports whether this handle still refers to a registration.
   *
   * @return True if the subscription is valid, false otherwise.
   */
  [[nodiscard]] bool valid() const;

private:
  message_bus* bus_{nullptr};          // bus the handler is registered on; null when detached
  std::type_index type_{typeid(void)}; // message type of the registration
  uint64_t id_{0};                     // handler id handed out by the bus
};
/**
 * @brief A message bus for runtime message publishing and subscription handling.
 *
 * Handlers are stored per message type (keyed by std::type_index). The
 * inline subscribe() and publish() members guard the handler table with a
 * std::shared_mutex: subscribing takes it exclusively, publishing takes it
 * shared and only for as long as it needs to copy the handler list.
 * Callbacks are therefore invoked without the lock held, which means a
 * handler may still be called once after it was unsubscribed concurrently.
 */
class message_bus {
public:
  using HandlerId = uint64_t; /**< Type alias for handler IDs. */

  /**
   * @brief Constructs a new message_bus object.
   */
  message_bus();

  /**
   * @brief Subscribes to messages of a specific type using a free function or lambda.
   *
   * The returned subscription is the only handle to the registration; it is
   * marked nodiscard because dropping it immediately would, per the
   * subscription destructor contract, unsubscribe the handler again.
   *
   * @tparam MessageType The type of the message to subscribe to.
   * @param handler The function to execute when a message of type MessageType is published.
   * @param filter An optional filter function to determine if the handler should be invoked.
   * @return A subscription object to manage the handler's registration.
   */
  template<typename MessageType>
  [[nodiscard]] subscription subscribe(std::function<void(const MessageType&)> handler,
                                       std::function<bool(const MessageType&)> filter = nullptr)
  {
    const std::type_index key{typeid(MessageType)};
    const auto id = next_id_.fetch_add(1, std::memory_order_relaxed);

    // Build the type-erased entry before locking: wrapping the callables may
    // allocate, and none of it touches shared state.
    Entry e;
    e.id = id;
    e.handler = [h = std::move(handler)](const void* p) {
      h(*static_cast<const MessageType*>(p));
    };
    if (filter) {
      e.filter = [f = std::move(filter)](const void* p) -> bool {
        return f(*static_cast<const MessageType*>(p));
      };
    }
    // Without a filter e.filter stays empty, which publish() treats as "always deliver".

    {
      std::unique_lock writeLock(mutex_);
      handlers_[key].emplace_back(std::move(e));
    }
    return {this, key, id};
  }

  /**
   * @brief Subscribes to messages using a class member function bound to an instance.
   *
   * The instance is captured as a raw pointer; the caller must keep it alive
   * for as long as the subscription exists.
   *
   * @tparam MessageType The type of the message to subscribe to.
   * @tparam CallerClass The class of the instance.
   * @param instance A pointer to the instance.
   * @param memberFn A pointer to the member function to execute.
   * @param filter An optional filter function.
   * @return A subscription object to manage the handler's registration.
   */
  template<typename MessageType, typename CallerClass>
  [[nodiscard]] subscription subscribe(CallerClass* instance, void (CallerClass::*memberFn)(const MessageType&),
                                       std::function<bool(const MessageType&)> filter = nullptr)
  {
    auto fn = [instance, memberFn](const MessageType& m) { (instance->*memberFn)(m); };
    return subscribe<MessageType>(std::function<void(const MessageType&)>(std::move(fn)), std::move(filter));
  }

  /**
   * @brief Subscribes to messages using a member function bound to a shared_ptr instance.
   *
   * Only a weak reference to the instance is stored, so the subscription does
   * not keep it alive. Once the instance is gone the handler becomes a no-op
   * and a supplied filter rejects every message.
   *
   * @tparam MessageType The type of the message to subscribe to.
   * @tparam CallerClass The class of the shared_ptr instance.
   * @param instance A shared pointer to the instance.
   * @param memberFn A pointer to the member function to execute.
   * @param filter An optional filter function.
   * @return A subscription object to manage the handler's registration.
   */
  template<typename MessageType, typename CallerClass>
  [[nodiscard]] subscription subscribe(std::shared_ptr<CallerClass> instance, void (CallerClass::*memberFn)(const MessageType&),
                                       std::function<bool(const MessageType&)> filter = nullptr)
  {
    std::weak_ptr<CallerClass> w = instance;
    auto handler = [w, memberFn](const MessageType& m) {
      // Pin the instance for the duration of the call, if it still exists.
      if (auto s = w.lock()) {
        (s.get()->*memberFn)(m);
      }
    };
    std::function<bool(const MessageType&)> local_filter = nullptr;
    if (filter) {
      local_filter = [w, filter = std::move(filter)](const MessageType& m) -> bool {
        // Do not run the user filter for an instance that no longer exists.
        if (w.expired()) {
          return false;
        }
        return filter(m);
      };
    }
    return subscribe<MessageType>(std::move(handler), std::move(local_filter));
  }

  /**
   * @brief Unsubscribes a handler from the bus using its type and ID.
   *
   * @tparam MessageType The type of the message.
   * @param id The unique handler ID to unsubscribe.
   */
  template<typename MessageType>
  void unsubscribe(const HandlerId id) {
    unsubscribe(std::type_index(typeid(MessageType)), id);
  }

  /**
   * @brief Unsubscribes a handler by its type index and ID.
   *
   * @param type The type_index of the message type.
   * @param id The unique handler ID to unsubscribe.
   */
  void unsubscribe(std::type_index type, HandlerId id);

  /**
   * @brief Unsubscribes a handler by its unique ID, regardless of type.
   *
   * @param id The unique handler ID to unsubscribe.
   */
  void unsubscribe(HandlerId id);

  /**
   * @brief Publishes a message of a specific type.
   *
   * Handlers are looked up by the static type MessageType. Each handler whose
   * filter is absent or returns true is invoked synchronously on the calling
   * thread, in registration order.
   *
   * @tparam MessageType The type of the message to publish.
   * @param msg The message to publish.
   */
  template<typename MessageType>
  void publish(const MessageType& msg) const {
    std::vector<Entry> snapshot;
    {
      std::shared_lock readLock(mutex_);
      const auto it = handlers_.find(std::type_index(typeid(MessageType)));
      if (it == handlers_.end()) {
        return;
      }
      snapshot = it->second; // copy list to avoid holding lock during callbacks
    }
    // std::addressof mirrors message::from_ref and ignores an overloaded operator&.
    const void* payload = std::addressof(msg);
    for (const auto& e : snapshot) {
      if (!e.filter || e.filter(payload)) {
        e.handler(payload);
      }
    }
  }

  /**
   * @brief Publishes a generic message object.
   *
   * @param msg The message to publish.
   */
  void publish(const message& msg) const;

private:
  friend class subscription;

  // One registered handler in type-erased form.
  struct Entry {
    HandlerId id;
    std::function<void(const void*)> handler; // invoked with a pointer to the payload
    std::function<bool(const void*)> filter;  // empty => handler is always invoked
  };

  mutable std::shared_mutex mutex_; // guards handlers_; mutable so const publish() can lock
  std::unordered_map<std::type_index, std::vector<Entry>> handlers_;
  // Source of handler ids. NOTE(review): no in-class initialiser; presumably
  // set by the out-of-line constructor - confirm.
  std::atomic<HandlerId> next_id_;
};
/**
 * @brief Removes this subscription's handler from its bus.
 *
 * Does nothing when the subscription is not attached to a bus. Otherwise the
 * handler is removed via message_bus::unsubscribe(type, id) and the bus
 * pointer is cleared, so a repeated call has no further effect.
 */
inline void subscription::unsubscribe() {
  if (bus_ == nullptr) {
    return;
  }
  bus_->unsubscribe(type_, id_);
  bus_ = nullptr;
}
}
#endif //MATADOR_MESSAGE_BUS_HPP