#ifndef MATADOR_MESSAGE_BUS_HPP #define MATADOR_MESSAGE_BUS_HPP #include #include #include #include #include #include #include #include #include #include #include namespace matador::utils { class message_bus; /** * @brief Represents a generic, type-safe message object used for communication. */ class message { public: /** * @brief Default constructor for an empty message. */ message(); /** * @brief Constructs an owning message from a value. * * @tparam MessageType The type of the message. * @param msg The message to be stored. A copy is made. */ template explicit message(const MessageType& msg) { auto strong = std::make_shared(msg); // allocate owner_ = std::static_pointer_cast(strong); // share ownership as void* ptr_ = static_cast(strong.get()); type_ = std::type_index(typeid(MessageType)); } /** * @brief Constructs a non-owning message from a reference. * * @tparam MessageType The type of the reference. * @param msg A reference to the message data. The caller must ensure its lifetime during use. * @return A non-owning message wrapping the reference. */ template static message from_ref(const MessageType& msg) { message m; m.ptr_ = static_cast(std::addressof(msg)); m.type_ = std::type_index(typeid(MessageType)); // owner_ remains null => non-owning return m; } /** * @brief Constructs an owning message from a shared pointer. * * @tparam MessageType The type of the object managed by the shared pointer. * @param msg A shared pointer to the message. * @return A message that shares ownership of the object. */ template static message from_shared(std::shared_ptr msg) { message m; m.owner_ = std::static_pointer_cast(msg); m.ptr_ = static_cast(msg.get()); m.type_ = std::type_index(typeid(MessageType)); return m; } /** * @brief Checks if the message is empty. * * @return True if the message is empty, false otherwise. */ [[nodiscard]] bool empty() const; /** * @brief Gets the type of the object stored in the message. * * @return The type of the stored object as a std::type_index. */ [[nodiscard]] std::type_index type() const; /** * @brief Checks if the stored object is of the specified type. * * @tparam MessageType The type to check. * @return True if the object matches the type, false otherwise. */ template [[nodiscard]] bool is() const { return type_ == std::type_index(typeid(MessageType)); } /** * @brief Accesses the stored object as a typed reference. * * @tparam MessageType The expected type of the object. * @return A constant reference to the stored object. * @throws std::bad_cast if the type does not match. * @throws std::runtime_error if the message is empty. */ template const MessageType& get() const { if (!is()) throw std::bad_cast(); const void* p = raw_ptr(); if (!p) throw std::runtime_error("AnyMessage: empty pointer"); return *static_cast(p); } /** * @brief Retrieves the raw pointer to the stored object. * * @return A const void* pointing to the stored object (may be non-owning). */ [[nodiscard]] const void* raw_ptr() const; private: const void* ptr_; // non-owning raw pointer (if set) std::shared_ptr owner_; // owning pointer if we made a copy or got shared_ptr std::type_index type_{typeid(void)}; }; /** * @brief Represents a subscription to a message_bus, providing RAII-style unsubscription. */ class subscription { public: /** * @brief Default constructor for an empty subscription. */ subscription() = default; /** * @brief Constructs a subscription for a specific handler and message type. * * @param bus Pointer to the message bus managing the subscription. * @param type The type_index of the message type. * @param id The unique ID of the handler. */ subscription(message_bus* bus, std::type_index type, uint64_t id); // Prohibit copying of subscription objects. subscription(const subscription&) = delete; subscription& operator=(const subscription&) = delete; // Allow move semantics for subscription objects. subscription(subscription&& other) noexcept; subscription& operator=(subscription&& other) noexcept; /** * @brief Destructor that ensures cleanup by unsubscribing from the bus. */ ~subscription(); /** * @brief Unsubscribes the handler from the message bus. */ void unsubscribe(); /** * @brief Checks if the subscription is still valid. * * @return True if the subscription is valid, false otherwise. */ [[nodiscard]] bool valid() const; private: message_bus* bus_ = nullptr; std::type_index type_{typeid(void)}; uint64_t id_ = 0; }; /** * @brief A thread-safe message bus for runtime message publishing and subscription handling. */ class message_bus { public: using HandlerId = uint64_t; /**< Type alias for handler IDs. */ /** * @brief Constructs a new message_bus object. */ message_bus(); /** * @brief Subscribes to messages of a specific type using a free function or lambda. * * @tparam MessageType The type of the message to subscribe to. * @param handler The function to execute when a message of type T is published. * @param filter An optional filter function to determine if the handler should be invoked. * @return A subscription object to manage the handler's registration. */ template subscription subscribe(std::function handler, std::function filter = nullptr) { auto id = next_id_.fetch_add(1, std::memory_order_relaxed); std::unique_lock writeLock(mutex_); auto &vec = handlers_[std::type_index(typeid(MessageType))]; Entry e; e.id = id; e.handler = [h = std::move(handler)](const void* p) { h(*static_cast(p)); }; if (filter) { e.filter = [f = std::move(filter)](const void* p) -> bool { return f(*static_cast(p)); }; } else { e.filter = nullptr; } vec.emplace_back(std::move(e)); return {this, std::type_index(typeid(MessageType)), id}; } /** * @brief Subscribes to messages using a class member function bound to an instance. * * @tparam MessageType The type of the message to subscribe to. * @tparam CallerClass The class of the instance. * @param instance A pointer to the instance. * @param memberFn A pointer to the member function to execute. * @param filter An optional filter function. * @return A subscription object to manage the handler's registration. */ template subscription subscribe(CallerClass* instance, void (CallerClass::*memberFn)(const MessageType&), std::function filter = nullptr) { auto fn = [instance, memberFn](const MessageType& m) { (instance->*memberFn)(m); }; return subscribe(std::function(fn), std::move(filter)); } /** * @brief Subscribes to messages using a member function bound to a shared_ptr instance. * * @tparam MessageType The type of the message to subscribe to. * @tparam CallerClass The class of the shared_ptr instance. * @param instance A shared pointer to the instance. * @param memberFn A pointer to the member function to execute. * @param filter An optional filter function. * @return A subscription object to manage the handler's registration. */ template subscription subscribe(std::shared_ptr instance, void (CallerClass::*memberFn)(const MessageType&), std::function filter = nullptr) { std::weak_ptr w = instance; auto handler = [w, memberFn](const MessageType& m) { if (auto s = w.lock()) { (s.get()->*memberFn)(m); } }; std::function local_filter = nullptr; if (filter) { local_filter = [w, filter = std::move(filter)](const MessageType& m) -> bool { if (w.expired()) { return false; } return filter(m); }; } return subscribe(std::move(handler), std::move(local_filter)); } /** * @brief Unsubscribes a handler from the bus using its type and ID. * * @tparam MessageType The type of the message. * @param id The unique handler ID to unsubscribe. */ template void unsubscribe(const HandlerId id) { unsubscribe(std::type_index(typeid(MessageType)), id); } /** * @brief Unsubscribes a handler by its type index and ID. * * @param type The type_index of the message type. * @param id The unique handler ID to unsubscribe. */ void unsubscribe(std::type_index type, HandlerId id); /** * @brief Unsubscribes a handler by its unique ID, regardless of type. * * @param id The unique handler ID to unsubscribe. */ void unsubscribe(HandlerId id); /** * @brief Publishes a message of a specific type. * * @tparam MessageType The type of the message to publish. * @param msg The message to publish. */ template void publish(const MessageType& msg) const { std::vector snapshot; { std::shared_lock readLock(mutex_); const auto it = handlers_.find(std::type_index(typeid(MessageType))); if (it == handlers_.end()) { return; } snapshot = it->second; // copy list to avoid holding lock during callbacks } for (const auto &e : snapshot) { if (!e.filter || e.filter(&msg)) e.handler(&msg); } } /** * @brief Publishes a generic message object. * * @param msg The message to publish. */ void publish(const message& msg) const; private: friend class subscription; struct Entry { HandlerId id; std::function handler; std::function filter; }; mutable std::shared_mutex mutex_; std::unordered_map> handlers_; std::atomic next_id_; }; inline void subscription::unsubscribe() { if (bus_) { bus_->unsubscribe(type_, id_); bus_ = nullptr; } } } #endif //MATADOR_MESSAGE_BUS_HPP