From 60cef7e938a0307c7deaea6d30fe85bcb16759c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sascha=20K=C3=BChl?= Date: Sat, 9 Aug 2025 15:39:13 +0200 Subject: [PATCH] added a simple message bus --- include/matador/utils/message_bus.hpp | 222 ++++++++++++++++++++++++++ source/core/CMakeLists.txt | 10 +- source/core/utils/message_bus.cpp | 97 +++++++++++ test/core/CMakeLists.txt | 1 + test/core/utils/MessageBusTest.cpp | 157 ++++++++++++++++++ 5 files changed, 483 insertions(+), 4 deletions(-) create mode 100644 include/matador/utils/message_bus.hpp create mode 100644 source/core/utils/message_bus.cpp create mode 100644 test/core/utils/MessageBusTest.cpp diff --git a/include/matador/utils/message_bus.hpp b/include/matador/utils/message_bus.hpp new file mode 100644 index 0000000..e99d74d --- /dev/null +++ b/include/matador/utils/message_bus.hpp @@ -0,0 +1,222 @@ +#ifndef MATADOR_MESSAGE_BUS_HPP +#define MATADOR_MESSAGE_BUS_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace matador::utils { +class message_bus; // forward + +class message { +public: + message(); + + // Owning: copy into an owned shared_ptr (may allocate) + template + explicit message(const T& value) { + auto strong = std::make_shared(value); // allocate + owner_ = std::static_pointer_cast(strong); // share ownership as void* + ptr_ = static_cast(strong.get()); + type_ = std::type_index(typeid(T)); + } + + // Non-owning: zero-copy reference to an existing object (no allocation) + // Caller must guarantee lifetime of `ref` for the usage duration. + template + static message from_ref(const T& ref) { + message m; + m.ptr_ = static_cast(std::addressof(ref)); + m.type_ = std::type_index(typeid(T)); + // owner_ remains null => non-owning + return m; + } + + // Owning from shared_ptr (no extra copy; shares ownership) + template + static message from_shared(std::shared_ptr sp) { + message m; + m.owner_ = std::static_pointer_cast(sp); + m.ptr_ = static_cast(sp.get()); + m.type_ = std::type_index(typeid(T)); + return m; + } + + [[nodiscard]] bool empty() const; + [[nodiscard]] std::type_index type() const; + + template + [[nodiscard]] bool is() const { return type_ == std::type_index(typeid(T)); } + + // Access as typed reference. Works for both owning and non-owning. + // Throws std::bad_cast if type mismatch. + template + const T& get() const { + if (!is()) throw std::bad_cast(); + const void* p = msg_ptr(); + if (!p) throw std::runtime_error("AnyMessage: empty pointer"); + return *static_cast(p); + } + + // Return the raw const void* pointer to the stored object (may be non-owning) + [[nodiscard]] const void* msg_ptr() const; + +private: + const void* ptr_; // non-owning raw pointer (if set) + std::shared_ptr owner_; // owning pointer if we made a copy or got shared_ptr + std::type_index type_{typeid(void)}; +}; + +// Forward declare Subscription so MessageBus can return it. +class subscription { +public: + subscription() = default; + subscription(message_bus* bus, std::type_index type, uint64_t id); + + // Move-only (non-copyable) + subscription(const subscription&) = delete; + subscription& operator=(const subscription&) = delete; + subscription(subscription&& other) noexcept; + subscription& operator=(subscription&& other) noexcept; + + ~subscription(); + + void unsubscribe(); + + [[nodiscard]] bool valid() const; + +private: + message_bus* bus_ = nullptr; + std::type_index type_{typeid(void)}; + uint64_t id_ = 0; +}; + +// === MessageBus: runtime-registered, thread-safe === +class message_bus { +public: + using HandlerId = uint64_t; + + message_bus() : next_id_{1} {} + + // Subscribe with a free function or lambda + template + subscription subscribe(std::function handler, + std::function filter = nullptr) + { + auto id = next_id_.fetch_add(1, std::memory_order_relaxed); + std::unique_lock writeLock(mutex_); + + auto &vec = handlers_[std::type_index(typeid(T))]; + Entry e; + e.id = id; + e.handler = [h = std::move(handler)](const void* p) { + h(*static_cast(p)); + }; + if (filter) { + e.filter = [f = std::move(filter)](const void* p) -> bool { + return f(*static_cast(p)); + }; + } else { + e.filter = nullptr; + } + vec.emplace_back(std::move(e)); + return {this, std::type_index(typeid(T)), id}; + } + + // Subscribe raw pointer instance; caller ensures lifetime + template + subscription subscribe(C* instance, void (C::*memberFn)(const T&), + std::function filter = nullptr) + { + auto fn = [instance, memberFn](const T& m) { (instance->*memberFn)(m); }; + return subscribe(std::function(fn), std::move(filter)); + } + + // Subscribe shared_ptr instance (safe) + template + subscription subscribe(std::shared_ptr instance, void (C::*memberFn)(const T&), + std::function filter = nullptr) + { + std::weak_ptr w = instance; + auto handler = [w, memberFn](const T& m) { + if (auto s = w.lock()) { + (s.get()->*memberFn)(m); + } + }; + + std::function filt = nullptr; + if (filter) { + filt = [w, filter = std::move(filter)](const T& m) -> bool { + if (w.expired()) { + return false; + } + return filter(m); + }; + } + return subscribe(std::move(handler), std::move(filt)); + } + + // Unsubscribe by type-specific id (RAII calls this) + template + void unsubscribe(const HandlerId id) { + unsubscribe(std::type_index(typeid(T)), id); + } + + // Unsubscribe by explicit type_index and id + void unsubscribe(std::type_index type, HandlerId id); + + // Unsubscribe by id regardless of type (O(#types) scan) + void unsubscribe(HandlerId id); + + // Publish a typed message (fast) + template + void publish(const T& msg) const { + std::vector snapshot; + { + std::shared_lock readLock(mutex_); + const auto it = handlers_.find(std::type_index(typeid(T))); + if (it == handlers_.end()) { + return; + } + snapshot = it->second; // copy list to avoid holding lock during callbacks + } + for (const auto &e : snapshot) { + if (!e.filter || e.filter(&msg)) e.handler(&msg); + } + } + + // Publish using AnyMessage (zero-copy if AnyMessage is non-owning) + void publish(const message& msg) const; + +private: + friend class subscription; + + struct Entry { + HandlerId id; + std::function handler; + std::function filter; + }; + + mutable std::shared_mutex mutex_; + std::unordered_map> handlers_; + std::atomic next_id_; +}; + +// RAII unsubscribe implementation +inline void subscription::unsubscribe() { + if (bus_) { + bus_->unsubscribe(type_, id_); + bus_ = nullptr; + } +} +} + +#endif //MATADOR_MESSAGE_BUS_HPP \ No newline at end of file diff --git a/source/core/CMakeLists.txt b/source/core/CMakeLists.txt index c22204c..5bebb12 100644 --- a/source/core/CMakeLists.txt +++ b/source/core/CMakeLists.txt @@ -13,8 +13,8 @@ add_library(matador-core STATIC ../../include/matador/net/reactor.hpp ../../include/matador/net/select_fd_sets.hpp ../../include/matador/net/socket_interrupter.hpp - ../../include/matador/object/attribute_definition_generator.hpp ../../include/matador/object/attribute_definition.hpp + ../../include/matador/object/attribute_definition_generator.hpp ../../include/matador/object/basic_object_info.hpp ../../include/matador/object/error_code.hpp ../../include/matador/object/foreign_node_completer.hpp @@ -30,6 +30,7 @@ add_library(matador-core STATIC ../../include/matador/object/schema.hpp ../../include/matador/object/schema_node.hpp ../../include/matador/object/schema_node_iterator.hpp + ../../include/matador/sql/statement_cache.hpp ../../include/matador/utils/access.hpp ../../include/matador/utils/attribute_reader.hpp ../../include/matador/utils/attribute_writer.hpp @@ -47,13 +48,14 @@ add_library(matador-core STATIC ../../include/matador/utils/errors.hpp ../../include/matador/utils/export.hpp ../../include/matador/utils/fetch_type.hpp - ../../include/matador/utils/file.hpp ../../include/matador/utils/field_attributes.hpp + ../../include/matador/utils/file.hpp ../../include/matador/utils/foreign_attributes.hpp ../../include/matador/utils/identifier.hpp ../../include/matador/utils/leader_follower_thread_pool.hpp ../../include/matador/utils/library.hpp ../../include/matador/utils/macro_map.hpp + ../../include/matador/utils/message_bus.hpp ../../include/matador/utils/os.hpp ../../include/matador/utils/placeholder.hpp ../../include/matador/utils/primary_key_attribute.hpp @@ -73,8 +75,8 @@ add_library(matador-core STATIC logger/log_manager.cpp logger/logger.cpp logger/rotating_file_sink.cpp - object/attribute_definition_generator.cpp object/attribute_definition.cpp + object/attribute_definition_generator.cpp object/basic_object_info.cpp object/error_code.cpp object/foreign_node_completer.cpp @@ -93,6 +95,7 @@ add_library(matador-core STATIC utils/identifier.cpp utils/leader_follower_thread_pool.cpp utils/library.cpp + utils/message_bus.cpp utils/os.cpp utils/primary_key_attribute.cpp utils/string.cpp @@ -101,7 +104,6 @@ add_library(matador-core STATIC utils/uuid.cpp utils/value.cpp utils/version.cpp - ../../include/matador/sql/statement_cache.hpp ) target_link_libraries(matador-core ${CMAKE_DL_LIBS}) diff --git a/source/core/utils/message_bus.cpp b/source/core/utils/message_bus.cpp new file mode 100644 index 0000000..e01351c --- /dev/null +++ b/source/core/utils/message_bus.cpp @@ -0,0 +1,97 @@ +#include "matador/utils/message_bus.hpp" + +namespace matador::utils { +message::message() +: ptr_(nullptr) +, owner_(nullptr) +, type_(typeid(void)) {} + +bool message::empty() const { + return ptr_ == nullptr && !owner_; +} + +std::type_index message::type() const { + return type_; +} + +const void * message::msg_ptr() const { + return owner_ ? owner_.get() : ptr_; +} + +subscription::subscription(message_bus *bus, const std::type_index type, const uint64_t id) +: bus_(bus) +, type_(type) +, id_(id) {} + +subscription::subscription(subscription &&other) noexcept +: bus_(other.bus_) +, type_(other.type_) +, id_(other.id_) { + other.bus_ = nullptr; +} + +subscription& subscription::operator=(subscription &&other) noexcept { + if (this != &other) { + unsubscribe(); + bus_ = other.bus_; + type_ = other.type_; + id_ = other.id_; + other.bus_ = nullptr; + } + return *this; +} + +subscription::~subscription() { + unsubscribe(); +} + +bool subscription::valid() const { + return bus_ != nullptr; +} + +void message_bus::unsubscribe(const std::type_index type, HandlerId id) { + std::unique_lock writeLock(mutex_); + const auto it = handlers_.find(type); + if (it == handlers_.end()) { + return; + } + auto &handler_vector = it->second; + handler_vector.erase(std::remove_if(handler_vector.begin(), handler_vector.end(), + [id](const Entry &e) { return e.id == id; }), + handler_vector.end()); + if (handler_vector.empty()) { + handlers_.erase(it); + } +} + +void message_bus::unsubscribe(HandlerId id) { + std::unique_lock writeLock(mutex_); + for (auto it = handlers_.begin(); it != handlers_.end(); ) { + auto &handler_vector = it->second; + handler_vector.erase(std::remove_if(handler_vector.begin(), handler_vector.end(), + [id](const Entry &e) { return e.id == id; }), + handler_vector.end()); + if (handler_vector.empty()) it = handlers_.erase(it); + else ++it; + } +} + +void message_bus::publish(const message &msg) const { + if (msg.empty()) { + return; + } + std::vector snapshot; + { + std::shared_lock readLock(mutex_); + const auto it = handlers_.find(msg.type()); + if (it == handlers_.end()) { + return; + } + snapshot = it->second; + } + const void* p = msg.msg_ptr(); + for (const auto &e : snapshot) { + if (!e.filter || e.filter(p)) e.handler(p); + } +} +} diff --git a/test/core/CMakeLists.txt b/test/core/CMakeLists.txt index f4e0205..6c02879 100644 --- a/test/core/CMakeLists.txt +++ b/test/core/CMakeLists.txt @@ -17,6 +17,7 @@ add_executable(CoreTests object/AttributeDefinitionGeneratorTest.cpp object/PrimaryKeyResolverTest.cpp object/SchemaTest.cpp + utils/MessageBusTest.cpp ) target_link_libraries(CoreTests matador-core Catch2::Catch2WithMain) diff --git a/test/core/utils/MessageBusTest.cpp b/test/core/utils/MessageBusTest.cpp new file mode 100644 index 0000000..9282a83 --- /dev/null +++ b/test/core/utils/MessageBusTest.cpp @@ -0,0 +1,157 @@ +#include + +#include "matador/utils/message_bus.hpp" + +#include + +struct TestEvent { int value; }; +struct FilterEvent { int value; }; +struct ClassEvent { std::string msg; }; + +class Receiver { +public: + void on_event(const ClassEvent& e) { + received.push_back(e.msg); + } + std::vector received; +}; + +using namespace matador::utils; + +TEST_CASE("Basic publish/subscribe works", "[MessageBus]") { + message_bus bus; + int counter = 0; + + auto sub = bus.subscribe([&](const TestEvent& e) { + counter += e.value; + }); + + bus.publish(TestEvent{1}); + bus.publish(TestEvent{2}); + + REQUIRE(counter == 3); +} + +TEST_CASE("Filtering works", "[MessageBus]") { + message_bus bus; + int counter = 0; + + auto sub = bus.subscribe( + [&](const FilterEvent& e) { counter += e.value; }, + [](const FilterEvent& e) { return e.value % 2 == 0; } // only even + ); + + bus.publish(FilterEvent{1}); // should be filtered out + bus.publish(FilterEvent{2}); + bus.publish(FilterEvent{3}); // filtered + bus.publish(FilterEvent{4}); + + REQUIRE(counter == 6); // only 2 + 4 +} + +TEST_CASE("Member function subscription works", "[MessageBus]") { + message_bus bus; + Receiver r; + + auto sub = bus.subscribe(&r, &Receiver::on_event); + bus.publish(ClassEvent{"hello"}); + bus.publish(ClassEvent{"world"}); + + REQUIRE(r.received.size() == 2); + REQUIRE(r.received[0] == "hello"); + REQUIRE(r.received[1] == "world"); +} + +TEST_CASE("Shared_ptr instance subscription works", "[MessageBus]") { + message_bus bus; + const auto r = std::make_shared(); + + auto sub = bus.subscribe(r, &Receiver::on_event); + bus.publish(ClassEvent{"foo"}); + + REQUIRE(r->received.size() == 1); + REQUIRE(r->received[0] == "foo"); +} + +TEST_CASE("RAII unsubscription works", "[MessageBus]") { + message_bus bus; + int counter = 0; + + { + auto sub = bus.subscribe([&](const TestEvent& e) { + counter += e.value; + }); + bus.publish(TestEvent{5}); + REQUIRE(counter == 5); + } + + // sub goes out of scope, unsubscribed + bus.publish(TestEvent{5}); + REQUIRE(counter == 5); +} + +TEST_CASE("Type-erased AnyMessage publish works", "[MessageBus]") { + message_bus bus; + int counter = 0; + + auto sub = bus.subscribe([&](const TestEvent& e) { + counter += e.value; + }); + + const message msg(TestEvent{10}); + bus.publish(msg); + + REQUIRE(counter == 10); +} + +TEST_CASE("Multiple subscribers all receive messages", "[MessageBus]") { + message_bus bus; + int a = 0, b = 0; + + auto sub1 = bus.subscribe([&](const TestEvent& e) { a += e.value; }); + auto sub2 = bus.subscribe([&](const TestEvent& e) { b += e.value; }); + + bus.publish(TestEvent{3}); + REQUIRE(a == 3); + REQUIRE(b == 3); +} + +TEST_CASE("Stress test with multi-threaded publish and subscribe", "[MessageBus][stress]") { + message_bus bus; + std::atomic counter{0}; + constexpr int numThreads = 8; + constexpr int eventsPerThread = 5000; + + // Subscribe before launching publishers + auto sub = bus.subscribe([&](const TestEvent& e) { + counter.fetch_add(e.value, std::memory_order_relaxed); + }); + + std::vector threads; + threads.reserve(numThreads); + + // Publisher threads + for (int t = 0; t < numThreads; ++t) { + threads.emplace_back([&] { + for (int i = 0; i < eventsPerThread; ++i) { + bus.publish(TestEvent{1}); + } + }); + } + + // Concurrent subscriber thread + threads.emplace_back([&] { + for (int i = 0; i < 1000; ++i) { + auto tempSub = bus.subscribe([&](const TestEvent& e) { + counter.fetch_add(e.value, std::memory_order_relaxed); + }); + std::this_thread::yield(); // simulate churn + } + }); + + for (auto& th : threads) { + th.join(); + } + + REQUIRE(counter.load(std::memory_order_relaxed) >= numThreads * eventsPerThread); +}