#ifndef MATADOR_MESSAGE_BUS_HPP #define MATADOR_MESSAGE_BUS_HPP #include #include #include #include #include #include #include #include #include #include #include namespace matador::utils { class message_bus; // forward class message { public: message(); // Owning: copy into an owned shared_ptr (may allocate) template explicit message(const T& value) { auto strong = std::make_shared(value); // allocate owner_ = std::static_pointer_cast(strong); // share ownership as void* ptr_ = static_cast(strong.get()); type_ = std::type_index(typeid(T)); } // Non-owning: zero-copy reference to an existing object (no allocation) // Caller must guarantee lifetime of `ref` for the usage duration. template static message from_ref(const T& ref) { message m; m.ptr_ = static_cast(std::addressof(ref)); m.type_ = std::type_index(typeid(T)); // owner_ remains null => non-owning return m; } // Owning from shared_ptr (no extra copy; shares ownership) template static message from_shared(std::shared_ptr sp) { message m; m.owner_ = std::static_pointer_cast(sp); m.ptr_ = static_cast(sp.get()); m.type_ = std::type_index(typeid(T)); return m; } [[nodiscard]] bool empty() const; [[nodiscard]] std::type_index type() const; template [[nodiscard]] bool is() const { return type_ == std::type_index(typeid(T)); } // Access as typed reference. Works for both owning and non-owning. // Throws std::bad_cast if type mismatch. template const T& get() const { if (!is()) throw std::bad_cast(); const void* p = msg_ptr(); if (!p) throw std::runtime_error("AnyMessage: empty pointer"); return *static_cast(p); } // Return the raw const void* pointer to the stored object (may be non-owning) [[nodiscard]] const void* msg_ptr() const; private: const void* ptr_; // non-owning raw pointer (if set) std::shared_ptr owner_; // owning pointer if we made a copy or got shared_ptr std::type_index type_{typeid(void)}; }; // Forward declare Subscription so MessageBus can return it. class subscription { public: subscription() = default; subscription(message_bus* bus, std::type_index type, uint64_t id); // Move-only (non-copyable) subscription(const subscription&) = delete; subscription& operator=(const subscription&) = delete; subscription(subscription&& other) noexcept; subscription& operator=(subscription&& other) noexcept; ~subscription(); void unsubscribe(); [[nodiscard]] bool valid() const; private: message_bus* bus_ = nullptr; std::type_index type_{typeid(void)}; uint64_t id_ = 0; }; // === MessageBus: runtime-registered, thread-safe === class message_bus { public: using HandlerId = uint64_t; message_bus() : next_id_{1} {} // Subscribe with a free function or lambda template subscription subscribe(std::function handler, std::function filter = nullptr) { auto id = next_id_.fetch_add(1, std::memory_order_relaxed); std::unique_lock writeLock(mutex_); auto &vec = handlers_[std::type_index(typeid(T))]; Entry e; e.id = id; e.handler = [h = std::move(handler)](const void* p) { h(*static_cast(p)); }; if (filter) { e.filter = [f = std::move(filter)](const void* p) -> bool { return f(*static_cast(p)); }; } else { e.filter = nullptr; } vec.emplace_back(std::move(e)); return {this, std::type_index(typeid(T)), id}; } // Subscribe raw pointer instance; caller ensures lifetime template subscription subscribe(C* instance, void (C::*memberFn)(const T&), std::function filter = nullptr) { auto fn = [instance, memberFn](const T& m) { (instance->*memberFn)(m); }; return subscribe(std::function(fn), std::move(filter)); } // Subscribe shared_ptr instance (safe) template subscription subscribe(std::shared_ptr instance, void (C::*memberFn)(const T&), std::function filter = nullptr) { std::weak_ptr w = instance; auto handler = [w, memberFn](const T& m) { if (auto s = w.lock()) { (s.get()->*memberFn)(m); } }; std::function local_filter = nullptr; if (filter) { local_filter = [w, filter = std::move(filter)](const T& m) -> bool { if (w.expired()) { return false; } return filter(m); }; } return subscribe(std::move(handler), std::move(local_filter)); } // Unsubscribe by type-specific id (RAII calls this) template void unsubscribe(const HandlerId id) { unsubscribe(std::type_index(typeid(T)), id); } // Unsubscribe by explicit type_index and id void unsubscribe(std::type_index type, HandlerId id); // Unsubscribe by id regardless of type (O(#types) scan) void unsubscribe(HandlerId id); // Publish a typed message (fast) template void publish(const T& msg) const { std::vector snapshot; { std::shared_lock readLock(mutex_); const auto it = handlers_.find(std::type_index(typeid(T))); if (it == handlers_.end()) { return; } snapshot = it->second; // copy list to avoid holding lock during callbacks } for (const auto &e : snapshot) { if (!e.filter || e.filter(&msg)) e.handler(&msg); } } // Publish using AnyMessage (zero-copy if AnyMessage is non-owning) void publish(const message& msg) const; private: friend class subscription; struct Entry { HandlerId id; std::function handler; std::function filter; }; mutable std::shared_mutex mutex_; std::unordered_map> handlers_; std::atomic next_id_; }; // RAII unsubscribe implementation inline void subscription::unsubscribe() { if (bus_) { bus_->unsubscribe(type_, id_); bus_ = nullptr; } } } #endif //MATADOR_MESSAGE_BUS_HPP