#include #include "matador/utils/message_bus.hpp" #include struct TestEvent { int value; }; struct FilterEvent { int value; }; struct ClassEvent { std::string msg; }; class Receiver { public: void on_event(const ClassEvent& e) { received.push_back(e.msg); } std::vector received; }; using namespace matador::utils; TEST_CASE("Basic publish/subscribe works", "[MessageBus]") { message_bus bus; int counter = 0; auto sub = bus.subscribe([&](const TestEvent& e) { counter += e.value; }); bus.publish(TestEvent{1}); bus.publish(TestEvent{2}); REQUIRE(counter == 3); } TEST_CASE("Filtering works", "[MessageBus]") { message_bus bus; int counter = 0; auto sub = bus.subscribe( [&](const FilterEvent& e) { counter += e.value; }, [](const FilterEvent& e) { return e.value % 2 == 0; } // only even ); bus.publish(FilterEvent{1}); // should be filtered out bus.publish(FilterEvent{2}); bus.publish(FilterEvent{3}); // filtered bus.publish(FilterEvent{4}); REQUIRE(counter == 6); // only 2 + 4 } TEST_CASE("Member function subscription works", "[MessageBus]") { message_bus bus; Receiver r; auto sub = bus.subscribe(&r, &Receiver::on_event); bus.publish(ClassEvent{"hello"}); bus.publish(ClassEvent{"world"}); REQUIRE(r.received.size() == 2); REQUIRE(r.received[0] == "hello"); REQUIRE(r.received[1] == "world"); } TEST_CASE("Shared_ptr instance subscription works", "[MessageBus]") { message_bus bus; const auto r = std::make_shared(); auto sub = bus.subscribe(r, &Receiver::on_event); bus.publish(ClassEvent{"foo"}); REQUIRE(r->received.size() == 1); REQUIRE(r->received[0] == "foo"); } TEST_CASE("RAII unsubscription works", "[MessageBus]") { message_bus bus; int counter = 0; { auto sub = bus.subscribe([&](const TestEvent& e) { counter += e.value; }); bus.publish(TestEvent{5}); REQUIRE(counter == 5); } // sub goes out of scope, unsubscribed bus.publish(TestEvent{5}); REQUIRE(counter == 5); } TEST_CASE("Type-erased AnyMessage publish works", "[MessageBus]") { message_bus bus; int counter = 0; auto sub = bus.subscribe([&](const TestEvent& e) { counter += e.value; }); const message msg(TestEvent{10}); bus.publish(msg); REQUIRE(counter == 10); } TEST_CASE("Multiple subscribers all receive messages", "[MessageBus]") { message_bus bus; int a = 0, b = 0; auto sub1 = bus.subscribe([&](const TestEvent& e) { a += e.value; }); auto sub2 = bus.subscribe([&](const TestEvent& e) { b += e.value; }); bus.publish(TestEvent{3}); REQUIRE(a == 3); REQUIRE(b == 3); } TEST_CASE("Stress test with multi-threaded publish and subscribe", "[MessageBus][stress]") { message_bus bus; std::atomic counter{0}; constexpr int numThreads = 8; constexpr int eventsPerThread = 5000; // Subscribe before launching publishers auto sub = bus.subscribe([&](const TestEvent& e) { counter.fetch_add(e.value, std::memory_order_relaxed); }); std::vector threads; threads.reserve(numThreads); // Publisher threads for (int t = 0; t < numThreads; ++t) { threads.emplace_back([&] { for (int i = 0; i < eventsPerThread; ++i) { bus.publish(TestEvent{1}); } }); } // Concurrent subscriber thread threads.emplace_back([&] { for (int i = 0; i < 1000; ++i) { auto tempSub = bus.subscribe([&](const TestEvent& e) { counter.fetch_add(e.value, std::memory_order_relaxed); }); std::this_thread::yield(); // simulate churn } }); for (auto& th : threads) { th.join(); } REQUIRE(counter.load(std::memory_order_relaxed) >= numThreads * eventsPerThread); }