From 4f017ac1de72f032761f415a61b94a622411fca9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sascha=20K=C3=BChl?= Date: Sun, 10 Aug 2025 22:12:13 +0200 Subject: [PATCH] extended statement_cache with message bus events --- include/matador/sql/statement_cache.hpp | 32 +++++++----- include/matador/utils/message_bus.hpp | 6 +-- source/orm/sql/statement_cache.cpp | 43 ++++++++-------- test/orm/sql/StatementCacheTest.cpp | 68 ++++++++++++++++--------- 4 files changed, 87 insertions(+), 62 deletions(-) diff --git a/include/matador/sql/statement_cache.hpp b/include/matador/sql/statement_cache.hpp index 7921254..3f7da79 100644 --- a/include/matador/sql/statement_cache.hpp +++ b/include/matador/sql/statement_cache.hpp @@ -4,6 +4,8 @@ #include "matador/sql/executor.hpp" #include "matador/sql/statement.hpp" +#include "matador/utils/message_bus.hpp" + #include #include #include @@ -12,17 +14,18 @@ namespace matador::sql { class connection_pool; -struct statement_cache_event { - enum class Type { Accessed, Added, Evicted }; - Type type; +struct statement_event { std::string sql; std::chrono::steady_clock::time_point timestamp; }; -class statement_cache_observer_interface { -public: - virtual void on_event(const statement_cache_event&) = 0; - virtual ~statement_cache_observer_interface() = default; +struct statement_accessed_event : statement_event {}; +struct statement_added_event : statement_event {}; +struct statement_evicted_event : statement_event {}; + + +struct statement_cache_config { + size_t max_size; }; class statement_cache final { @@ -36,7 +39,7 @@ private: }; public: - explicit statement_cache(connection_pool &pool, size_t max_size = 50); + statement_cache(utils::message_bus &bus, connection_pool &pool, size_t max_size = 50); statement_cache(const statement_cache &) = delete; statement_cache &operator=(const statement_cache &) = delete; statement_cache(statement_cache &&) = delete; @@ -49,13 +52,13 @@ public: [[nodiscard]] size_t capacity() const; [[nodiscard]] bool empty() const; - void subscribe(statement_cache_observer_interface &observer); + template + utils::subscription subscribe(std::function handler) { + return bus_.subscribe(handler); + } + private: - void push(statement_cache_event::Type type, const std::string& sql) const; - -private: - size_t max_size_{}; std::list usage_list_; // LRU: front = most recent, back = least recent std::unordered_map cache_map_; @@ -64,7 +67,8 @@ private: connection_pool &pool_; const sql::dialect &dialect_; - std::vector observers_; + utils::message_bus &bus_; }; + } #endif //STATEMENT_CACHE_HPP diff --git a/include/matador/utils/message_bus.hpp b/include/matador/utils/message_bus.hpp index e99d74d..a972e8a 100644 --- a/include/matador/utils/message_bus.hpp +++ b/include/matador/utils/message_bus.hpp @@ -152,16 +152,16 @@ public: } }; - std::function filt = nullptr; + std::function local_filter = nullptr; if (filter) { - filt = [w, filter = std::move(filter)](const T& m) -> bool { + local_filter = [w, filter = std::move(filter)](const T& m) -> bool { if (w.expired()) { return false; } return filter(m); }; } - return subscribe(std::move(handler), std::move(filt)); + return subscribe(std::move(handler), std::move(local_filter)); } // Unsubscribe by type-specific id (RAII calls this) diff --git a/source/orm/sql/statement_cache.cpp b/source/orm/sql/statement_cache.cpp index 4efe14d..17fc66b 100644 --- a/source/orm/sql/statement_cache.cpp +++ b/source/orm/sql/statement_cache.cpp @@ -16,14 +16,25 @@ public: std::chrono::milliseconds max_wait{250}; }; + struct execution_metrics { + std::chrono::steady_clock::time_point lock_attempt_start; + std::chrono::steady_clock::time_point lock_acquired{}; + std::chrono::steady_clock::time_point execution_start{}; + std::chrono::steady_clock::time_point execution_end{}; + size_t lock_attempts{0}; + }; + explicit statement_cache_proxy(std::unique_ptr&& stmt, connection_pool &pool, const size_t connection_id) : statement_proxy(std::move(stmt)) , pool_(pool) , connection_id_(connection_id) {} utils::result execute(interface::parameter_binder& bindings) override { - auto result = try_with_retry([this, &bindings]() -> utils::result { + execution_metrics metrics{std::chrono::steady_clock::now()}; + + auto result = try_with_retry([this, &bindings, &metrics]() -> utils::result { if (!try_lock()) { + ++metrics.lock_attempts; return utils::failure(utils::error{ error_code::STATEMENT_LOCKED, "Failed to execute statement because it is already in use" @@ -45,8 +56,11 @@ public: } utils::result, utils::error> fetch(interface::parameter_binder& bindings) override { - auto result = try_with_retry([this, &bindings]() -> utils::result, utils::error> { + execution_metrics metrics{std::chrono::steady_clock::now()}; + + auto result = try_with_retry([this, &bindings, &metrics]() -> utils::result, utils::error> { if (!try_lock()) { + ++metrics.lock_attempts; return utils::failure(utils::error{ error_code::STATEMENT_LOCKED, "Failed to execute statement because it is already in use" @@ -114,10 +128,11 @@ private: }; } -statement_cache::statement_cache(connection_pool &pool, const size_t max_size) +statement_cache::statement_cache(utils::message_bus &bus, connection_pool &pool, const size_t max_size) : max_size_(max_size) , pool_(pool) -, dialect_(backend_provider::instance().connection_dialect(pool_.info().type)) {} +, dialect_(backend_provider::instance().connection_dialect(pool_.info().type)) +, bus_(bus) {} utils::result statement_cache::acquire(const query_context& ctx) { std::unique_lock lock(mutex_); @@ -128,7 +143,7 @@ utils::result statement_cache::acquire(const query_cont if (const auto it = cache_map_.find(key); it != cache_map_.end()) { usage_list_.splice(usage_list_.begin(), usage_list_, it->second.position); it->second.last_access = now; - push(statement_cache_event::Type::Accessed, ctx.sql); + bus_.publish({ctx.sql, std::chrono::steady_clock::now()}); return utils::ok(it->second.stmt); } // Prepare a new statement @@ -149,7 +164,7 @@ utils::result statement_cache::acquire(const query_cont const auto& key_to_remove = usage_list_.back(); cache_map_.erase(key_to_remove); usage_list_.pop_back(); - push(statement_cache_event::Type::Evicted, ctx.sql); + bus_.publish({ctx.sql, std::chrono::steady_clock::now()}); } usage_list_.push_front(key); @@ -160,7 +175,7 @@ utils::result statement_cache::acquire(const query_cont std::chrono::steady_clock::now(), usage_list_.begin()} }).first; - push(statement_cache_event::Type::Added, ctx.sql); + bus_.publish({ctx.sql, std::chrono::steady_clock::now()}); return utils::ok(it->second.stmt); } @@ -176,18 +191,4 @@ bool statement_cache::empty() const { return cache_map_.empty(); } -void statement_cache::subscribe(statement_cache_observer_interface &observer) { - observers_.push_back(&observer); -} - -void statement_cache::push(statement_cache_event::Type type, const std::string &sql) const { - using Clock = std::chrono::steady_clock; - const auto ts = Clock::now(); - const statement_cache_event evt{type, sql, ts}; - for (auto& obs : observers_) { - if (obs) { - obs->on_event(evt); - } - } -} } diff --git a/test/orm/sql/StatementCacheTest.cpp b/test/orm/sql/StatementCacheTest.cpp index e6cbdac..c77df4a 100644 --- a/test/orm/sql/StatementCacheTest.cpp +++ b/test/orm/sql/StatementCacheTest.cpp @@ -7,6 +7,8 @@ #include "matador/sql/error_code.hpp" #include "matador/sql/statement_cache.hpp" +#include "matador/utils/message_bus.hpp" + #include "../backend/test_backend_service.hpp" #include "ConnectionPoolFixture.hpp" @@ -18,32 +20,47 @@ using namespace matador::test; using namespace matador::sql; using namespace matador::query; +using namespace matador::utils; -class RecordingObserver final : public statement_cache_observer_interface { +class RecordingObserver final { public: - void on_event(const statement_cache_event& evt) override { - std::lock_guard lock(mutex); - events.push(evt); + explicit RecordingObserver(message_bus &bus) { + subscriptions.push_back(bus.subscribe([this](const statement_accessed_event &ev) { + std::lock_guard lock(mutex); + events.push(message::from_ref(ev)); + })); + subscriptions.push_back(bus.subscribe([this](const statement_added_event &ev) { + std::lock_guard lock(mutex); + events.push(message::from_ref(ev)); + })); + subscriptions.push_back(bus.subscribe([this](const statement_evicted_event &ev) { + std::lock_guard lock(mutex); + events.push(message::from_ref(ev)); + })); } - std::optional poll() { + std::optional poll() { std::lock_guard lock(mutex); - if (events.empty()) return std::nullopt; + if (events.empty()) { + return std::nullopt; + } auto evt = events.front(); events.pop(); return evt; } private: + std::vector subscriptions; std::mutex mutex; - std::queue events; + std::queue events; }; TEST_CASE("Test statement cache", "[statement][cache]") { backend_provider::instance().register_backend("noop", std::make_unique()); + matador::utils::message_bus bus; connection_pool pool("noop://noop.db", 4); - statement_cache cache(pool, 2); + statement_cache cache(bus, pool, 2); query_context ctx; ctx.sql = "SELECT * FROM person"; @@ -83,9 +100,9 @@ TEST_CASE("Test LRU cache evicts oldest entries", "[statement][cache][evict]") { backend_provider::instance().register_backend("noop", std::make_unique()); connection_pool pool("noop://noop.db", 4); - statement_cache cache(pool, 2); - RecordingObserver observer; - cache.subscribe(observer); + message_bus bus; + statement_cache cache(bus, pool, 2); + RecordingObserver observer(bus); REQUIRE(cache.capacity() == 2); REQUIRE(cache.empty()); @@ -116,8 +133,12 @@ TEST_CASE("Test LRU cache evicts oldest entries", "[statement][cache][evict]") { int added = 0, evicted = 0; while (auto e = observer.poll()) { - if (e->type == statement_cache_event::Type::Added) added++; - if (e->type == statement_cache_event::Type::Evicted) evicted++; + if (e->is()) { + added++; + } + if (e->is()) { + evicted++; + } } REQUIRE(added >= 3); REQUIRE(evicted >= 1); @@ -127,9 +148,9 @@ TEST_CASE("Test statement reuse avoids reprepare", "[statement][cache][prepare]" backend_provider::instance().register_backend("noop", std::make_unique()); connection_pool pool("noop://noop.db", 4); - statement_cache cache(pool, 2); - RecordingObserver observer; - cache.subscribe(observer); + message_bus bus; + statement_cache cache(bus, pool, 2); + RecordingObserver observer(bus); REQUIRE(cache.capacity() == 2); REQUIRE(cache.empty()); @@ -140,8 +161,6 @@ TEST_CASE("Test statement reuse avoids reprepare", "[statement][cache][prepare]" result = cache.acquire({"SELECT * FROM person"}); REQUIRE(result); auto stmt2 = result.value(); - - } TEST_CASE("Multithreaded stress test", "[statement][cache][stress]") { @@ -157,9 +176,9 @@ TEST_CASE("Multithreaded stress test", "[statement][cache][stress]") { } connection_pool pool("noop://noop.db", 4); - statement_cache cache(pool, 5); - RecordingObserver observer; - cache.subscribe(observer); + message_bus bus; + statement_cache cache(bus, pool, 5); + RecordingObserver observer(bus); auto start_time = std::chrono::steady_clock::now(); @@ -203,7 +222,7 @@ TEST_CASE("Multithreaded stress test", "[statement][cache][stress]") { // Some events should be generated int accessed = 0; while (auto e = observer.poll()) { - if (e->type == statement_cache_event::Type::Accessed) accessed++; + if (e->is()) accessed++; } REQUIRE(accessed > 0); } @@ -212,12 +231,13 @@ TEST_CASE("Race condition simulation with mixed access", "[statement_cache][race backend_provider::instance().register_backend("noop", std::make_unique()); connection_pool pool("noop://noop.db", 4); - statement_cache cache(pool, 5); + message_bus bus; + statement_cache cache(bus, pool, 5); constexpr int threads = 8; constexpr int operations = 500; - auto task = [&](int id) { + auto task = [&](int /*id*/) { for (int i = 0; i < operations; ++i) { auto sql = "SELECT " + std::to_string(i % 10); auto result = cache.acquire({sql});