extended statement_cache with message bus events
This commit is contained in:
parent
60cef7e938
commit
4f017ac1de
|
|
@ -4,6 +4,8 @@
|
|||
#include "matador/sql/executor.hpp"
|
||||
#include "matador/sql/statement.hpp"
|
||||
|
||||
#include "matador/utils/message_bus.hpp"
|
||||
|
||||
#include <list>
|
||||
#include <mutex>
|
||||
#include <unordered_map>
|
||||
|
|
@ -12,17 +14,18 @@ namespace matador::sql {
|
|||
|
||||
class connection_pool;
|
||||
|
||||
struct statement_cache_event {
|
||||
enum class Type { Accessed, Added, Evicted };
|
||||
Type type;
|
||||
struct statement_event {
|
||||
std::string sql;
|
||||
std::chrono::steady_clock::time_point timestamp;
|
||||
};
|
||||
|
||||
class statement_cache_observer_interface {
|
||||
public:
|
||||
virtual void on_event(const statement_cache_event&) = 0;
|
||||
virtual ~statement_cache_observer_interface() = default;
|
||||
struct statement_accessed_event : statement_event {};
|
||||
struct statement_added_event : statement_event {};
|
||||
struct statement_evicted_event : statement_event {};
|
||||
|
||||
|
||||
struct statement_cache_config {
|
||||
size_t max_size;
|
||||
};
|
||||
|
||||
class statement_cache final {
|
||||
|
|
@ -36,7 +39,7 @@ private:
|
|||
};
|
||||
|
||||
public:
|
||||
explicit statement_cache(connection_pool &pool, size_t max_size = 50);
|
||||
statement_cache(utils::message_bus &bus, connection_pool &pool, size_t max_size = 50);
|
||||
statement_cache(const statement_cache &) = delete;
|
||||
statement_cache &operator=(const statement_cache &) = delete;
|
||||
statement_cache(statement_cache &&) = delete;
|
||||
|
|
@ -49,13 +52,13 @@ public:
|
|||
[[nodiscard]] size_t capacity() const;
|
||||
[[nodiscard]] bool empty() const;
|
||||
|
||||
void subscribe(statement_cache_observer_interface &observer);
|
||||
template<class EventType>
|
||||
utils::subscription subscribe(std::function<void(const EventType&)> handler) {
|
||||
return bus_.subscribe<EventType>(handler);
|
||||
}
|
||||
|
||||
|
||||
private:
|
||||
void push(statement_cache_event::Type type, const std::string& sql) const;
|
||||
|
||||
private:
|
||||
|
||||
size_t max_size_{};
|
||||
std::list<size_t> usage_list_; // LRU: front = most recent, back = least recent
|
||||
std::unordered_map<size_t, cache_entry> cache_map_;
|
||||
|
|
@ -64,7 +67,8 @@ private:
|
|||
connection_pool &pool_;
|
||||
const sql::dialect &dialect_;
|
||||
|
||||
std::vector<statement_cache_observer_interface*> observers_;
|
||||
utils::message_bus &bus_;
|
||||
};
|
||||
|
||||
}
|
||||
#endif //STATEMENT_CACHE_HPP
|
||||
|
|
|
|||
|
|
@ -152,16 +152,16 @@ public:
|
|||
}
|
||||
};
|
||||
|
||||
std::function<bool(const T&)> filt = nullptr;
|
||||
std::function<bool(const T&)> local_filter = nullptr;
|
||||
if (filter) {
|
||||
filt = [w, filter = std::move(filter)](const T& m) -> bool {
|
||||
local_filter = [w, filter = std::move(filter)](const T& m) -> bool {
|
||||
if (w.expired()) {
|
||||
return false;
|
||||
}
|
||||
return filter(m);
|
||||
};
|
||||
}
|
||||
return subscribe<T>(std::move(handler), std::move(filt));
|
||||
return subscribe<T>(std::move(handler), std::move(local_filter));
|
||||
}
|
||||
|
||||
// Unsubscribe by type-specific id (RAII calls this)
|
||||
|
|
|
|||
|
|
@ -16,14 +16,25 @@ public:
|
|||
std::chrono::milliseconds max_wait{250};
|
||||
};
|
||||
|
||||
struct execution_metrics {
|
||||
std::chrono::steady_clock::time_point lock_attempt_start;
|
||||
std::chrono::steady_clock::time_point lock_acquired{};
|
||||
std::chrono::steady_clock::time_point execution_start{};
|
||||
std::chrono::steady_clock::time_point execution_end{};
|
||||
size_t lock_attempts{0};
|
||||
};
|
||||
|
||||
explicit statement_cache_proxy(std::unique_ptr<statement_impl>&& stmt, connection_pool &pool, const size_t connection_id)
|
||||
: statement_proxy(std::move(stmt))
|
||||
, pool_(pool)
|
||||
, connection_id_(connection_id) {}
|
||||
|
||||
utils::result<size_t, utils::error> execute(interface::parameter_binder& bindings) override {
|
||||
auto result = try_with_retry([this, &bindings]() -> utils::result<size_t, utils::error> {
|
||||
execution_metrics metrics{std::chrono::steady_clock::now()};
|
||||
|
||||
auto result = try_with_retry([this, &bindings, &metrics]() -> utils::result<size_t, utils::error> {
|
||||
if (!try_lock()) {
|
||||
++metrics.lock_attempts;
|
||||
return utils::failure(utils::error{
|
||||
error_code::STATEMENT_LOCKED,
|
||||
"Failed to execute statement because it is already in use"
|
||||
|
|
@ -45,8 +56,11 @@ public:
|
|||
}
|
||||
|
||||
utils::result<std::unique_ptr<query_result_impl>, utils::error> fetch(interface::parameter_binder& bindings) override {
|
||||
auto result = try_with_retry([this, &bindings]() -> utils::result<std::unique_ptr<query_result_impl>, utils::error> {
|
||||
execution_metrics metrics{std::chrono::steady_clock::now()};
|
||||
|
||||
auto result = try_with_retry([this, &bindings, &metrics]() -> utils::result<std::unique_ptr<query_result_impl>, utils::error> {
|
||||
if (!try_lock()) {
|
||||
++metrics.lock_attempts;
|
||||
return utils::failure(utils::error{
|
||||
error_code::STATEMENT_LOCKED,
|
||||
"Failed to execute statement because it is already in use"
|
||||
|
|
@ -114,10 +128,11 @@ private:
|
|||
};
|
||||
|
||||
}
|
||||
statement_cache::statement_cache(connection_pool &pool, const size_t max_size)
|
||||
statement_cache::statement_cache(utils::message_bus &bus, connection_pool &pool, const size_t max_size)
|
||||
: max_size_(max_size)
|
||||
, pool_(pool)
|
||||
, dialect_(backend_provider::instance().connection_dialect(pool_.info().type)) {}
|
||||
, dialect_(backend_provider::instance().connection_dialect(pool_.info().type))
|
||||
, bus_(bus) {}
|
||||
|
||||
utils::result<statement, utils::error> statement_cache::acquire(const query_context& ctx) {
|
||||
std::unique_lock lock(mutex_);
|
||||
|
|
@ -128,7 +143,7 @@ utils::result<statement, utils::error> statement_cache::acquire(const query_cont
|
|||
if (const auto it = cache_map_.find(key); it != cache_map_.end()) {
|
||||
usage_list_.splice(usage_list_.begin(), usage_list_, it->second.position);
|
||||
it->second.last_access = now;
|
||||
push(statement_cache_event::Type::Accessed, ctx.sql);
|
||||
bus_.publish<statement_accessed_event>({ctx.sql, std::chrono::steady_clock::now()});
|
||||
return utils::ok(it->second.stmt);
|
||||
}
|
||||
// Prepare a new statement
|
||||
|
|
@ -149,7 +164,7 @@ utils::result<statement, utils::error> statement_cache::acquire(const query_cont
|
|||
const auto& key_to_remove = usage_list_.back();
|
||||
cache_map_.erase(key_to_remove);
|
||||
usage_list_.pop_back();
|
||||
push(statement_cache_event::Type::Evicted, ctx.sql);
|
||||
bus_.publish<statement_evicted_event>({ctx.sql, std::chrono::steady_clock::now()});
|
||||
}
|
||||
|
||||
usage_list_.push_front(key);
|
||||
|
|
@ -160,7 +175,7 @@ utils::result<statement, utils::error> statement_cache::acquire(const query_cont
|
|||
std::chrono::steady_clock::now(),
|
||||
usage_list_.begin()}
|
||||
}).first;
|
||||
push(statement_cache_event::Type::Added, ctx.sql);
|
||||
bus_.publish<statement_added_event>({ctx.sql, std::chrono::steady_clock::now()});
|
||||
|
||||
return utils::ok(it->second.stmt);
|
||||
}
|
||||
|
|
@ -176,18 +191,4 @@ bool statement_cache::empty() const {
|
|||
return cache_map_.empty();
|
||||
}
|
||||
|
||||
void statement_cache::subscribe(statement_cache_observer_interface &observer) {
|
||||
observers_.push_back(&observer);
|
||||
}
|
||||
|
||||
void statement_cache::push(statement_cache_event::Type type, const std::string &sql) const {
|
||||
using Clock = std::chrono::steady_clock;
|
||||
const auto ts = Clock::now();
|
||||
const statement_cache_event evt{type, sql, ts};
|
||||
for (auto& obs : observers_) {
|
||||
if (obs) {
|
||||
obs->on_event(evt);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,6 +7,8 @@
|
|||
#include "matador/sql/error_code.hpp"
|
||||
#include "matador/sql/statement_cache.hpp"
|
||||
|
||||
#include "matador/utils/message_bus.hpp"
|
||||
|
||||
#include "../backend/test_backend_service.hpp"
|
||||
|
||||
#include "ConnectionPoolFixture.hpp"
|
||||
|
|
@ -18,32 +20,47 @@
|
|||
using namespace matador::test;
|
||||
using namespace matador::sql;
|
||||
using namespace matador::query;
|
||||
using namespace matador::utils;
|
||||
|
||||
class RecordingObserver final : public statement_cache_observer_interface {
|
||||
class RecordingObserver final {
|
||||
public:
|
||||
void on_event(const statement_cache_event& evt) override {
|
||||
explicit RecordingObserver(message_bus &bus) {
|
||||
subscriptions.push_back(bus.subscribe<statement_accessed_event>([this](const statement_accessed_event &ev) {
|
||||
std::lock_guard lock(mutex);
|
||||
events.push(evt);
|
||||
events.push(message::from_ref(ev));
|
||||
}));
|
||||
subscriptions.push_back(bus.subscribe<statement_added_event>([this](const statement_added_event &ev) {
|
||||
std::lock_guard lock(mutex);
|
||||
events.push(message::from_ref(ev));
|
||||
}));
|
||||
subscriptions.push_back(bus.subscribe<statement_evicted_event>([this](const statement_evicted_event &ev) {
|
||||
std::lock_guard lock(mutex);
|
||||
events.push(message::from_ref(ev));
|
||||
}));
|
||||
}
|
||||
|
||||
std::optional<statement_cache_event> poll() {
|
||||
std::optional<message> poll() {
|
||||
std::lock_guard lock(mutex);
|
||||
if (events.empty()) return std::nullopt;
|
||||
if (events.empty()) {
|
||||
return std::nullopt;
|
||||
}
|
||||
auto evt = events.front();
|
||||
events.pop();
|
||||
return evt;
|
||||
}
|
||||
|
||||
private:
|
||||
std::vector<subscription> subscriptions;
|
||||
std::mutex mutex;
|
||||
std::queue<statement_cache_event> events;
|
||||
std::queue<message> events;
|
||||
};
|
||||
|
||||
TEST_CASE("Test statement cache", "[statement][cache]") {
|
||||
backend_provider::instance().register_backend("noop", std::make_unique<orm::test_backend_service>());
|
||||
|
||||
matador::utils::message_bus bus;
|
||||
connection_pool pool("noop://noop.db", 4);
|
||||
statement_cache cache(pool, 2);
|
||||
statement_cache cache(bus, pool, 2);
|
||||
|
||||
query_context ctx;
|
||||
ctx.sql = "SELECT * FROM person";
|
||||
|
|
@ -83,9 +100,9 @@ TEST_CASE("Test LRU cache evicts oldest entries", "[statement][cache][evict]") {
|
|||
backend_provider::instance().register_backend("noop", std::make_unique<orm::test_backend_service>());
|
||||
|
||||
connection_pool pool("noop://noop.db", 4);
|
||||
statement_cache cache(pool, 2);
|
||||
RecordingObserver observer;
|
||||
cache.subscribe(observer);
|
||||
message_bus bus;
|
||||
statement_cache cache(bus, pool, 2);
|
||||
RecordingObserver observer(bus);
|
||||
|
||||
REQUIRE(cache.capacity() == 2);
|
||||
REQUIRE(cache.empty());
|
||||
|
|
@ -116,8 +133,12 @@ TEST_CASE("Test LRU cache evicts oldest entries", "[statement][cache][evict]") {
|
|||
|
||||
int added = 0, evicted = 0;
|
||||
while (auto e = observer.poll()) {
|
||||
if (e->type == statement_cache_event::Type::Added) added++;
|
||||
if (e->type == statement_cache_event::Type::Evicted) evicted++;
|
||||
if (e->is<statement_added_event>()) {
|
||||
added++;
|
||||
}
|
||||
if (e->is<statement_evicted_event>()) {
|
||||
evicted++;
|
||||
}
|
||||
}
|
||||
REQUIRE(added >= 3);
|
||||
REQUIRE(evicted >= 1);
|
||||
|
|
@ -127,9 +148,9 @@ TEST_CASE("Test statement reuse avoids reprepare", "[statement][cache][prepare]"
|
|||
backend_provider::instance().register_backend("noop", std::make_unique<orm::test_backend_service>());
|
||||
|
||||
connection_pool pool("noop://noop.db", 4);
|
||||
statement_cache cache(pool, 2);
|
||||
RecordingObserver observer;
|
||||
cache.subscribe(observer);
|
||||
message_bus bus;
|
||||
statement_cache cache(bus, pool, 2);
|
||||
RecordingObserver observer(bus);
|
||||
|
||||
REQUIRE(cache.capacity() == 2);
|
||||
REQUIRE(cache.empty());
|
||||
|
|
@ -140,8 +161,6 @@ TEST_CASE("Test statement reuse avoids reprepare", "[statement][cache][prepare]"
|
|||
result = cache.acquire({"SELECT * FROM person"});
|
||||
REQUIRE(result);
|
||||
auto stmt2 = result.value();
|
||||
|
||||
|
||||
}
|
||||
|
||||
TEST_CASE("Multithreaded stress test", "[statement][cache][stress]") {
|
||||
|
|
@ -157,9 +176,9 @@ TEST_CASE("Multithreaded stress test", "[statement][cache][stress]") {
|
|||
}
|
||||
|
||||
connection_pool pool("noop://noop.db", 4);
|
||||
statement_cache cache(pool, 5);
|
||||
RecordingObserver observer;
|
||||
cache.subscribe(observer);
|
||||
message_bus bus;
|
||||
statement_cache cache(bus, pool, 5);
|
||||
RecordingObserver observer(bus);
|
||||
|
||||
auto start_time = std::chrono::steady_clock::now();
|
||||
|
||||
|
|
@ -203,7 +222,7 @@ TEST_CASE("Multithreaded stress test", "[statement][cache][stress]") {
|
|||
// Some events should be generated
|
||||
int accessed = 0;
|
||||
while (auto e = observer.poll()) {
|
||||
if (e->type == statement_cache_event::Type::Accessed) accessed++;
|
||||
if (e->is<statement_accessed_event>()) accessed++;
|
||||
}
|
||||
REQUIRE(accessed > 0);
|
||||
}
|
||||
|
|
@ -212,12 +231,13 @@ TEST_CASE("Race condition simulation with mixed access", "[statement_cache][race
|
|||
backend_provider::instance().register_backend("noop", std::make_unique<orm::test_backend_service>());
|
||||
|
||||
connection_pool pool("noop://noop.db", 4);
|
||||
statement_cache cache(pool, 5);
|
||||
message_bus bus;
|
||||
statement_cache cache(bus, pool, 5);
|
||||
|
||||
constexpr int threads = 8;
|
||||
constexpr int operations = 500;
|
||||
|
||||
auto task = [&](int id) {
|
||||
auto task = [&](int /*id*/) {
|
||||
for (int i = 0; i < operations; ++i) {
|
||||
auto sql = "SELECT " + std::to_string(i % 10);
|
||||
auto result = cache.acquire({sql});
|
||||
|
|
|
|||
Loading…
Reference in New Issue