194 lines
5.8 KiB
C++
194 lines
5.8 KiB
C++
#include "matador/sql/statement_cache.hpp"
|
|
#include "matador/sql/backend_provider.hpp"
|
|
#include "matador/sql/error_code.hpp"
|
|
#include "matador/sql/connection_pool.hpp"
|
|
|
|
#include <atomic>
|
|
#include <thread>
|
|
|
|
namespace matador::sql {
|
|
namespace internal {
|
|
class statement_cache_proxy final : public statement_proxy {
|
|
public:
|
|
struct retry_config {
|
|
size_t max_attempts{10};
|
|
std::chrono::milliseconds initial_wait{10};
|
|
std::chrono::milliseconds max_wait{250};
|
|
};
|
|
|
|
explicit statement_cache_proxy(std::unique_ptr<statement_impl>&& stmt, connection_pool &pool, const size_t connection_id)
|
|
: statement_proxy(std::move(stmt))
|
|
, pool_(pool)
|
|
, connection_id_(connection_id) {}
|
|
|
|
utils::result<size_t, utils::error> execute(interface::parameter_binder& bindings) override {
|
|
auto result = try_with_retry([this, &bindings]() -> utils::result<size_t, utils::error> {
|
|
if (!try_lock()) {
|
|
return utils::failure(utils::error{
|
|
error_code::STATEMENT_LOCKED,
|
|
"Failed to execute statement because it is already in use"
|
|
});
|
|
}
|
|
|
|
auto guard = statement_guard(*this);
|
|
if (auto conn = pool_.acquire(connection_id_); !conn.valid()) {
|
|
return utils::failure(utils::error{
|
|
error_code::EXECUTE_FAILED,
|
|
"Failed to execute statement because couldn't lock connection"
|
|
});
|
|
}
|
|
|
|
return statement_->execute(bindings);
|
|
});
|
|
|
|
return result;
|
|
}
|
|
|
|
utils::result<std::unique_ptr<query_result_impl>, utils::error> fetch(interface::parameter_binder& bindings) override {
|
|
auto result = try_with_retry([this, &bindings]() -> utils::result<std::unique_ptr<query_result_impl>, utils::error> {
|
|
if (!try_lock()) {
|
|
return utils::failure(utils::error{
|
|
error_code::STATEMENT_LOCKED,
|
|
"Failed to execute statement because it is already in use"
|
|
});
|
|
}
|
|
auto guard = statement_guard(*this);
|
|
if (const auto conn = pool_.acquire(connection_id_); !conn.valid()) {
|
|
return utils::failure(utils::error{
|
|
error_code::EXECUTE_FAILED,
|
|
"Failed to execute statement because couldn't lock connection"
|
|
});
|
|
}
|
|
return statement_->fetch(bindings);
|
|
});
|
|
|
|
return result;
|
|
}
|
|
|
|
protected:
|
|
[[nodiscard]] bool try_lock() {
|
|
bool expected = false;
|
|
return locked_.compare_exchange_strong(expected, true);
|
|
}
|
|
|
|
template<typename Func>
|
|
[[nodiscard]] auto try_with_retry(Func &&func) -> decltype(func()) {
|
|
auto current_wait = config_.initial_wait;
|
|
|
|
for (size_t attempt = 0; attempt < config_.max_attempts; ++attempt) {
|
|
if (auto result = func(); result.is_ok() ||
|
|
result.err().ec() != error_code::STATEMENT_LOCKED) {
|
|
return result;
|
|
}
|
|
|
|
if (attempt + 1 < config_.max_attempts) {
|
|
std::this_thread::sleep_for(current_wait);
|
|
current_wait = std::min(current_wait * 2, config_.max_wait);
|
|
}
|
|
}
|
|
|
|
return utils::failure(utils::error{
|
|
error_code::STATEMENT_LOCKED,
|
|
"Failed to execute statement because it is already in use"
|
|
});
|
|
}
|
|
|
|
void unlock() {
|
|
locked_.store(false);
|
|
}
|
|
|
|
private:
|
|
struct statement_guard {
|
|
explicit statement_guard(statement_cache_proxy &statement_proxy)
|
|
: proxy(statement_proxy) {}
|
|
~statement_guard() { proxy.unlock(); }
|
|
|
|
statement_cache_proxy &proxy;
|
|
};
|
|
|
|
private:
|
|
std::atomic_bool locked_{false};
|
|
connection_pool &pool_;
|
|
size_t connection_id_{};
|
|
retry_config config_{};
|
|
};
|
|
|
|
}
|
|
/**
 * Creates a statement cache on top of the given connection pool.
 *
 * @param pool      Connection pool used to prepare and run statements.
 * @param max_size  Value reported by capacity(); acquire() evicts once the
 *                  number of cached entries reaches it.
 */
statement_cache::statement_cache(connection_pool &pool, const size_t max_size)
: max_size_(max_size)
, pool_(pool)
// Read the connection type through the constructor parameter rather than
// through pool_, so this initializer does not depend on the order in which
// the members are declared in the class.
, dialect_(backend_provider::instance().connection_dialect(pool.info().type)) {}
|
|
|
|
/**
 * Returns the cached statement for ctx.sql, preparing and caching a new one
 * on a miss.
 *
 * The whole call runs under mutex_. On a hit the entry is moved to the front
 * of the LRU list and an Accessed event is pushed. On a miss the statement is
 * prepared on a pool connection; when the cache has reached max_size_ the
 * least recently used entry is dropped (Evicted event) before the new entry
 * is inserted (Added event).
 *
 * @param ctx  Query context; ctx.sql is hashed to form the cache key.
 * @return The statement, or a PREPARE_FAILED failure when no valid connection
 *         could be acquired or preparing the statement failed.
 */
utils::result<statement, utils::error> statement_cache::acquire(const query_context& ctx) {
  std::unique_lock lock(mutex_);

  // The cache key is the hash of the SQL text only.
  // NOTE(review): two different SQL strings with equal hashes would share one
  // entry - confirm this is acceptable.
  const auto key = std::hash<std::string>{}(ctx.sql);

  // Found in cache: move it to the front of the LRU list.
  if (const auto it = cache_map_.find(key); it != cache_map_.end()) {
    usage_list_.splice(usage_list_.begin(), usage_list_, it->second.position);
    it->second.last_access = std::chrono::steady_clock::now();
    push(statement_cache_event::Type::Accessed, ctx.sql);
    return utils::ok(it->second.stmt);
  }

  // Not cached: prepare a new statement on a pool connection. The connection
  // handle is limited to the inner scope so it is gone before the cache
  // bookkeeping below.
  size_t id{};
  std::unique_ptr<statement_impl> stmt;
  {
    const auto conn = pool_.acquire();
    // Do not dereference a handle that holds no connection.
    if (!conn.valid()) {
      return utils::failure(utils::error{
        error_code::PREPARE_FAILED,
        std::string("Failed to prepare because couldn't acquire connection")
      });
    }

    auto result = conn->perform_prepare(ctx);
    if (!result) {
      return utils::failure(utils::error{error_code::PREPARE_FAILED, std::string("Failed to prepare")});
    }
    id = conn.id().value();
    stmt = result.release();
  }

  // If the cache is full, drop the least recently used entry. The emptiness
  // check keeps back()/pop_back() off an empty list (e.g. max_size_ == 0).
  if (cache_map_.size() >= max_size_ && !usage_list_.empty()) {
    const auto key_to_remove = usage_list_.back();
    cache_map_.erase(key_to_remove);
    usage_list_.pop_back();
    // NOTE(review): the event carries the SQL of the statement being added,
    // not of the evicted one; only the hash of the evicted SQL is known here.
    push(statement_cache_event::Type::Evicted, ctx.sql);
  }

  // Register the new entry as most recently used.
  usage_list_.push_front(key);
  const auto it = cache_map_.insert({
    key,
    {statement{
      std::make_shared<internal::statement_cache_proxy>(std::move(stmt), pool_, id)},
      std::chrono::steady_clock::now(),
      usage_list_.begin()}
  }).first;
  push(statement_cache_event::Type::Added, ctx.sql);

  return utils::ok(it->second.stmt);
}
|
|
|
|
size_t statement_cache::size() const {
|
|
return cache_map_.size();
|
|
}
|
|
size_t statement_cache::capacity() const {
|
|
return max_size_;
|
|
}
|
|
|
|
bool statement_cache::empty() const {
|
|
return cache_map_.empty();
|
|
}
|
|
|
|
/**
 * Registers an observer that receives the events emitted by push().
 *
 * Only the address of the observer is stored.
 *
 * @param observer  Observer to notify.
 */
void statement_cache::subscribe(statement_cache_observer_interface &observer) {
  auto *const listener = &observer;
  observers_.push_back(listener);
}
|
|
|
|
/**
 * Builds a cache event stamped with the current steady-clock time and hands
 * it to every non-null subscribed observer, in subscription order.
 *
 * @param type  Kind of event.
 * @param sql   SQL text attached to the event.
 */
void statement_cache::push(statement_cache_event::Type type, const std::string &sql) const {
  const auto timestamp = std::chrono::steady_clock::now();
  const statement_cache_event event{type, sql, timestamp};

  for (const auto &listener : observers_) {
    // Skip null entries.
    if (!listener) {
      continue;
    }
    listener->on_event(event);
  }
}
|
|
}
|