added a simple message bus

This commit is contained in:
Sascha Kühl 2025-08-09 15:39:13 +02:00
parent 35fad9f47c
commit 60cef7e938
5 changed files with 483 additions and 4 deletions

View File

@ -0,0 +1,222 @@
#ifndef MATADOR_MESSAGE_BUS_HPP
#define MATADOR_MESSAGE_BUS_HPP
#include <atomic>
#include <functional>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <typeindex>
#include <typeinfo>
#include <unordered_map>
#include <vector>
#include <algorithm>
#include <stdexcept>
namespace matador::utils {
class message_bus; // forward
class message {
public:
message();
// Owning: copy into an owned shared_ptr (may allocate)
template<typename T>
explicit message(const T& value) {
auto strong = std::make_shared<T>(value); // allocate
owner_ = std::static_pointer_cast<void>(strong); // share ownership as void*
ptr_ = static_cast<const void*>(strong.get());
type_ = std::type_index(typeid(T));
}
// Non-owning: zero-copy reference to an existing object (no allocation)
// Caller must guarantee lifetime of `ref` for the usage duration.
template<typename T>
static message from_ref(const T& ref) {
message m;
m.ptr_ = static_cast<const void*>(std::addressof(ref));
m.type_ = std::type_index(typeid(T));
// owner_ remains null => non-owning
return m;
}
// Owning from shared_ptr (no extra copy; shares ownership)
template<typename T>
static message from_shared(std::shared_ptr<T> sp) {
message m;
m.owner_ = std::static_pointer_cast<void>(sp);
m.ptr_ = static_cast<const void*>(sp.get());
m.type_ = std::type_index(typeid(T));
return m;
}
[[nodiscard]] bool empty() const;
[[nodiscard]] std::type_index type() const;
template<typename T>
[[nodiscard]] bool is() const { return type_ == std::type_index(typeid(T)); }
// Access as typed reference. Works for both owning and non-owning.
// Throws std::bad_cast if type mismatch.
template<typename T>
const T& get() const {
if (!is<T>()) throw std::bad_cast();
const void* p = msg_ptr();
if (!p) throw std::runtime_error("AnyMessage: empty pointer");
return *static_cast<const T*>(p);
}
// Return the raw const void* pointer to the stored object (may be non-owning)
[[nodiscard]] const void* msg_ptr() const;
private:
const void* ptr_; // non-owning raw pointer (if set)
std::shared_ptr<void> owner_; // owning pointer if we made a copy or got shared_ptr
std::type_index type_{typeid(void)};
};
// Forward declare Subscription so MessageBus can return it.
class subscription {
public:
subscription() = default;
subscription(message_bus* bus, std::type_index type, uint64_t id);
// Move-only (non-copyable)
subscription(const subscription&) = delete;
subscription& operator=(const subscription&) = delete;
subscription(subscription&& other) noexcept;
subscription& operator=(subscription&& other) noexcept;
~subscription();
void unsubscribe();
[[nodiscard]] bool valid() const;
private:
message_bus* bus_ = nullptr;
std::type_index type_{typeid(void)};
uint64_t id_ = 0;
};
// === MessageBus: runtime-registered, thread-safe ===
class message_bus {
public:
using HandlerId = uint64_t;
message_bus() : next_id_{1} {}
// Subscribe with a free function or lambda
template<typename T>
subscription subscribe(std::function<void(const T&)> handler,
std::function<bool(const T&)> filter = nullptr)
{
auto id = next_id_.fetch_add(1, std::memory_order_relaxed);
std::unique_lock writeLock(mutex_);
auto &vec = handlers_[std::type_index(typeid(T))];
Entry e;
e.id = id;
e.handler = [h = std::move(handler)](const void* p) {
h(*static_cast<const T*>(p));
};
if (filter) {
e.filter = [f = std::move(filter)](const void* p) -> bool {
return f(*static_cast<const T*>(p));
};
} else {
e.filter = nullptr;
}
vec.emplace_back(std::move(e));
return {this, std::type_index(typeid(T)), id};
}
// Subscribe raw pointer instance; caller ensures lifetime
template<typename T, typename C>
subscription subscribe(C* instance, void (C::*memberFn)(const T&),
std::function<bool(const T&)> filter = nullptr)
{
auto fn = [instance, memberFn](const T& m) { (instance->*memberFn)(m); };
return subscribe<T>(std::function<void(const T&)>(fn), std::move(filter));
}
// Subscribe shared_ptr instance (safe)
template<typename T, typename C>
subscription subscribe(std::shared_ptr<C> instance, void (C::*memberFn)(const T&),
std::function<bool(const T&)> filter = nullptr)
{
std::weak_ptr<C> w = instance;
auto handler = [w, memberFn](const T& m) {
if (auto s = w.lock()) {
(s.get()->*memberFn)(m);
}
};
std::function<bool(const T&)> filt = nullptr;
if (filter) {
filt = [w, filter = std::move(filter)](const T& m) -> bool {
if (w.expired()) {
return false;
}
return filter(m);
};
}
return subscribe<T>(std::move(handler), std::move(filt));
}
// Unsubscribe by type-specific id (RAII calls this)
template<typename T>
void unsubscribe(const HandlerId id) {
unsubscribe(std::type_index(typeid(T)), id);
}
// Unsubscribe by explicit type_index and id
void unsubscribe(std::type_index type, HandlerId id);
// Unsubscribe by id regardless of type (O(#types) scan)
void unsubscribe(HandlerId id);
// Publish a typed message (fast)
template<typename T>
void publish(const T& msg) const {
std::vector<Entry> snapshot;
{
std::shared_lock readLock(mutex_);
const auto it = handlers_.find(std::type_index(typeid(T)));
if (it == handlers_.end()) {
return;
}
snapshot = it->second; // copy list to avoid holding lock during callbacks
}
for (const auto &e : snapshot) {
if (!e.filter || e.filter(&msg)) e.handler(&msg);
}
}
// Publish using AnyMessage (zero-copy if AnyMessage is non-owning)
void publish(const message& msg) const;
private:
friend class subscription;
struct Entry {
HandlerId id;
std::function<void(const void*)> handler;
std::function<bool(const void*)> filter;
};
mutable std::shared_mutex mutex_;
std::unordered_map<std::type_index, std::vector<Entry>> handlers_;
std::atomic<HandlerId> next_id_;
};
// RAII unsubscribe implementation
inline void subscription::unsubscribe() {
if (bus_) {
bus_->unsubscribe(type_, id_);
bus_ = nullptr;
}
}
}
#endif //MATADOR_MESSAGE_BUS_HPP

View File

@ -13,8 +13,8 @@ add_library(matador-core STATIC
../../include/matador/net/reactor.hpp
../../include/matador/net/select_fd_sets.hpp
../../include/matador/net/socket_interrupter.hpp
../../include/matador/object/attribute_definition_generator.hpp
../../include/matador/object/attribute_definition.hpp
../../include/matador/object/attribute_definition_generator.hpp
../../include/matador/object/basic_object_info.hpp
../../include/matador/object/error_code.hpp
../../include/matador/object/foreign_node_completer.hpp
@ -30,6 +30,7 @@ add_library(matador-core STATIC
../../include/matador/object/schema.hpp
../../include/matador/object/schema_node.hpp
../../include/matador/object/schema_node_iterator.hpp
../../include/matador/sql/statement_cache.hpp
../../include/matador/utils/access.hpp
../../include/matador/utils/attribute_reader.hpp
../../include/matador/utils/attribute_writer.hpp
@ -47,13 +48,14 @@ add_library(matador-core STATIC
../../include/matador/utils/errors.hpp
../../include/matador/utils/export.hpp
../../include/matador/utils/fetch_type.hpp
../../include/matador/utils/file.hpp
../../include/matador/utils/field_attributes.hpp
../../include/matador/utils/file.hpp
../../include/matador/utils/foreign_attributes.hpp
../../include/matador/utils/identifier.hpp
../../include/matador/utils/leader_follower_thread_pool.hpp
../../include/matador/utils/library.hpp
../../include/matador/utils/macro_map.hpp
../../include/matador/utils/message_bus.hpp
../../include/matador/utils/os.hpp
../../include/matador/utils/placeholder.hpp
../../include/matador/utils/primary_key_attribute.hpp
@ -73,8 +75,8 @@ add_library(matador-core STATIC
logger/log_manager.cpp
logger/logger.cpp
logger/rotating_file_sink.cpp
object/attribute_definition_generator.cpp
object/attribute_definition.cpp
object/attribute_definition_generator.cpp
object/basic_object_info.cpp
object/error_code.cpp
object/foreign_node_completer.cpp
@ -93,6 +95,7 @@ add_library(matador-core STATIC
utils/identifier.cpp
utils/leader_follower_thread_pool.cpp
utils/library.cpp
utils/message_bus.cpp
utils/os.cpp
utils/primary_key_attribute.cpp
utils/string.cpp
@ -101,7 +104,6 @@ add_library(matador-core STATIC
utils/uuid.cpp
utils/value.cpp
utils/version.cpp
../../include/matador/sql/statement_cache.hpp
)
target_link_libraries(matador-core ${CMAKE_DL_LIBS})

View File

@ -0,0 +1,97 @@
#include "matador/utils/message_bus.hpp"
namespace matador::utils {
message::message()
: ptr_(nullptr)
, owner_(nullptr)
, type_(typeid(void)) {}
bool message::empty() const {
return ptr_ == nullptr && !owner_;
}
std::type_index message::type() const {
return type_;
}
const void * message::msg_ptr() const {
return owner_ ? owner_.get() : ptr_;
}
subscription::subscription(message_bus *bus, const std::type_index type, const uint64_t id)
: bus_(bus)
, type_(type)
, id_(id) {}
subscription::subscription(subscription &&other) noexcept
: bus_(other.bus_)
, type_(other.type_)
, id_(other.id_) {
other.bus_ = nullptr;
}
subscription& subscription::operator=(subscription &&other) noexcept {
if (this != &other) {
unsubscribe();
bus_ = other.bus_;
type_ = other.type_;
id_ = other.id_;
other.bus_ = nullptr;
}
return *this;
}
subscription::~subscription() {
unsubscribe();
}
bool subscription::valid() const {
return bus_ != nullptr;
}
void message_bus::unsubscribe(const std::type_index type, HandlerId id) {
std::unique_lock writeLock(mutex_);
const auto it = handlers_.find(type);
if (it == handlers_.end()) {
return;
}
auto &handler_vector = it->second;
handler_vector.erase(std::remove_if(handler_vector.begin(), handler_vector.end(),
[id](const Entry &e) { return e.id == id; }),
handler_vector.end());
if (handler_vector.empty()) {
handlers_.erase(it);
}
}
void message_bus::unsubscribe(HandlerId id) {
std::unique_lock writeLock(mutex_);
for (auto it = handlers_.begin(); it != handlers_.end(); ) {
auto &handler_vector = it->second;
handler_vector.erase(std::remove_if(handler_vector.begin(), handler_vector.end(),
[id](const Entry &e) { return e.id == id; }),
handler_vector.end());
if (handler_vector.empty()) it = handlers_.erase(it);
else ++it;
}
}
void message_bus::publish(const message &msg) const {
if (msg.empty()) {
return;
}
std::vector<Entry> snapshot;
{
std::shared_lock readLock(mutex_);
const auto it = handlers_.find(msg.type());
if (it == handlers_.end()) {
return;
}
snapshot = it->second;
}
const void* p = msg.msg_ptr();
for (const auto &e : snapshot) {
if (!e.filter || e.filter(p)) e.handler(p);
}
}
}

View File

@ -17,6 +17,7 @@ add_executable(CoreTests
object/AttributeDefinitionGeneratorTest.cpp
object/PrimaryKeyResolverTest.cpp
object/SchemaTest.cpp
utils/MessageBusTest.cpp
)
target_link_libraries(CoreTests matador-core Catch2::Catch2WithMain)

View File

@ -0,0 +1,157 @@
#include <catch2/catch_test_macros.hpp>
#include "matador/utils/message_bus.hpp"
#include <thread>
struct TestEvent { int value; };
struct FilterEvent { int value; };
struct ClassEvent { std::string msg; };
class Receiver {
public:
void on_event(const ClassEvent& e) {
received.push_back(e.msg);
}
std::vector<std::string> received;
};
using namespace matador::utils;
TEST_CASE("Basic publish/subscribe works", "[MessageBus]") {
message_bus bus;
int counter = 0;
auto sub = bus.subscribe<TestEvent>([&](const TestEvent& e) {
counter += e.value;
});
bus.publish(TestEvent{1});
bus.publish(TestEvent{2});
REQUIRE(counter == 3);
}
TEST_CASE("Filtering works", "[MessageBus]") {
message_bus bus;
int counter = 0;
auto sub = bus.subscribe<FilterEvent>(
[&](const FilterEvent& e) { counter += e.value; },
[](const FilterEvent& e) { return e.value % 2 == 0; } // only even
);
bus.publish(FilterEvent{1}); // should be filtered out
bus.publish(FilterEvent{2});
bus.publish(FilterEvent{3}); // filtered
bus.publish(FilterEvent{4});
REQUIRE(counter == 6); // only 2 + 4
}
TEST_CASE("Member function subscription works", "[MessageBus]") {
message_bus bus;
Receiver r;
auto sub = bus.subscribe<ClassEvent>(&r, &Receiver::on_event);
bus.publish(ClassEvent{"hello"});
bus.publish(ClassEvent{"world"});
REQUIRE(r.received.size() == 2);
REQUIRE(r.received[0] == "hello");
REQUIRE(r.received[1] == "world");
}
TEST_CASE("Shared_ptr instance subscription works", "[MessageBus]") {
message_bus bus;
const auto r = std::make_shared<Receiver>();
auto sub = bus.subscribe<ClassEvent>(r, &Receiver::on_event);
bus.publish(ClassEvent{"foo"});
REQUIRE(r->received.size() == 1);
REQUIRE(r->received[0] == "foo");
}
TEST_CASE("RAII unsubscription works", "[MessageBus]") {
message_bus bus;
int counter = 0;
{
auto sub = bus.subscribe<TestEvent>([&](const TestEvent& e) {
counter += e.value;
});
bus.publish(TestEvent{5});
REQUIRE(counter == 5);
}
// sub goes out of scope, unsubscribed
bus.publish(TestEvent{5});
REQUIRE(counter == 5);
}
TEST_CASE("Type-erased AnyMessage publish works", "[MessageBus]") {
message_bus bus;
int counter = 0;
auto sub = bus.subscribe<TestEvent>([&](const TestEvent& e) {
counter += e.value;
});
const message msg(TestEvent{10});
bus.publish(msg);
REQUIRE(counter == 10);
}
TEST_CASE("Multiple subscribers all receive messages", "[MessageBus]") {
message_bus bus;
int a = 0, b = 0;
auto sub1 = bus.subscribe<TestEvent>([&](const TestEvent& e) { a += e.value; });
auto sub2 = bus.subscribe<TestEvent>([&](const TestEvent& e) { b += e.value; });
bus.publish(TestEvent{3});
REQUIRE(a == 3);
REQUIRE(b == 3);
}
TEST_CASE("Stress test with multi-threaded publish and subscribe", "[MessageBus][stress]") {
message_bus bus;
std::atomic<int> counter{0};
constexpr int numThreads = 8;
constexpr int eventsPerThread = 5000;
// Subscribe before launching publishers
auto sub = bus.subscribe<TestEvent>([&](const TestEvent& e) {
counter.fetch_add(e.value, std::memory_order_relaxed);
});
std::vector<std::thread> threads;
threads.reserve(numThreads);
// Publisher threads
for (int t = 0; t < numThreads; ++t) {
threads.emplace_back([&] {
for (int i = 0; i < eventsPerThread; ++i) {
bus.publish(TestEvent{1});
}
});
}
// Concurrent subscriber thread
threads.emplace_back([&] {
for (int i = 0; i < 1000; ++i) {
auto tempSub = bus.subscribe<TestEvent>([&](const TestEvent& e) {
counter.fetch_add(e.value, std::memory_order_relaxed);
});
std::this_thread::yield(); // simulate churn
}
});
for (auto& th : threads) {
th.join();
}
REQUIRE(counter.load(std::memory_order_relaxed) >= numThreads * eventsPerThread);
}