diff --git a/demo/work.cpp b/demo/work.cpp index bf13acf..7c23005 100644 --- a/demo/work.cpp +++ b/demo/work.cpp @@ -75,8 +75,8 @@ int main() { const object::schema schema("Administration"); - sql::connection_pool pool("postgres://news:news@127.0.0.1:15432/matador", 4); - // sql::connection_pool pool("postgres://test:test123!@127.0.0.1:5432/matador", 4); + // sql::connection_pool pool("postgres://news:news@127.0.0.1:15432/matador", 4); + sql::connection_pool pool("postgres://test:test123!@127.0.0.1:5432/matador", 4); orm::session ses(pool); @@ -121,9 +121,4 @@ int main() { // } return 0; -} - -// registering table 'collection_center' (pk field: 'id') -// registering relation table 'collection_center_users' (first fk field: 'collection_center_id', second fk field: 'user_id') -// registering table 'internal_user_directory' (pk field: 'id') -// \ No newline at end of file +} \ No newline at end of file diff --git a/include/matador/sql/interface/statement_impl.hpp b/include/matador/sql/interface/statement_impl.hpp index fdc3d30..6cea4d9 100644 --- a/include/matador/sql/interface/statement_impl.hpp +++ b/include/matador/sql/interface/statement_impl.hpp @@ -26,14 +26,14 @@ public: virtual utils::result, utils::error> fetch(const interface::parameter_binder& bindings) = 0; template < class Type > - void bind_object(Type &obj, const interface::parameter_binder& bindings) { + void bind_object(Type &obj, interface::parameter_binder& bindings) { object_parameter_binder object_binder_; object_binder_.reset(start_index()); object_binder_.bind(obj, bindings); } template < class Type > - void bind(const size_t pos, Type &val, const interface::parameter_binder& bindings) { + void bind(const size_t pos, Type &val, interface::parameter_binder& bindings) { utils::data_type_traits::bind_value(bindings, adjust_index(pos), val); } diff --git a/include/matador/sql/interface/statement_proxy.hpp b/include/matador/sql/interface/statement_proxy.hpp index b4ac207..d9fe77f 100644 --- a/include/matador/sql/interface/statement_proxy.hpp +++ b/include/matador/sql/interface/statement_proxy.hpp @@ -16,11 +16,11 @@ public: virtual utils::result, utils::error> fetch(interface::parameter_binder& bindings) = 0; template - void bind(const Type &obj, const interface::parameter_binder& bindings) { + void bind(const Type &obj, interface::parameter_binder& bindings) { statement_->bind_object(obj, bindings); } template - void bind(size_t pos, Type &value, const interface::parameter_binder& bindings) { + void bind(size_t pos, Type &value, interface::parameter_binder& bindings) { statement_->bind(pos, value, bindings); } void bind(size_t pos, const char *value, size_t size, interface::parameter_binder& bindings) const; @@ -28,6 +28,8 @@ public: void reset() const; + [[nodiscard]] std::string sql() const; + [[nodiscard]] std::unique_ptr create_binder() const; protected: diff --git a/include/matador/sql/query_context.hpp b/include/matador/sql/query_context.hpp index b76aa23..614bfcd 100644 --- a/include/matador/sql/query_context.hpp +++ b/include/matador/sql/query_context.hpp @@ -31,17 +31,17 @@ struct sql_command_info { struct query_context { std::string sql; sql_command command{}; - std::string command_name; + std::string command_name{}; sql::table table{""}; - std::vector prototype; - std::vector result_vars; - std::vector bind_vars; - std::vector bind_types; + std::vector prototype{}; + std::vector result_vars{}; + std::vector bind_vars{}; + std::vector bind_types{}; - std::unordered_map column_aliases; - std::unordered_map table_aliases; + std::unordered_map column_aliases{}; + std::unordered_map table_aliases{}; - std::vector additional_commands; + std::vector additional_commands{}; }; } diff --git a/include/matador/sql/statement.hpp b/include/matador/sql/statement.hpp index 1148c01..153b29a 100644 --- a/include/matador/sql/statement.hpp +++ b/include/matador/sql/statement.hpp @@ -5,7 +5,6 @@ #include "matador/sql/query_result.hpp" #include "matador/sql/interface/statement_proxy.hpp" -#include "matador/sql/interface/parameter_binder.hpp" #include "matador/utils/error.hpp" #include "matador/utils/result.hpp" @@ -122,6 +121,8 @@ public: */ void reset() const; + [[nodiscard]] std::string sql() const; + private: template friend class detail::identifier_binder; @@ -140,7 +141,7 @@ statement &statement::bind(size_t pos, Type &value) { template statement &statement::bind(const Type &obj) { - statement_proxy_->bind(obj, bindings_); + statement_proxy_->bind(obj, *bindings_); return *this; } diff --git a/include/matador/sql/statement_cache.hpp b/include/matador/sql/statement_cache.hpp index 887ef15..7921254 100644 --- a/include/matador/sql/statement_cache.hpp +++ b/include/matador/sql/statement_cache.hpp @@ -2,8 +2,7 @@ #define STATEMENT_CACHE_HPP #include "matador/sql/executor.hpp" - -#include "matador/sql/interface/statement_proxy.hpp" +#include "matador/sql/statement.hpp" #include #include @@ -13,9 +12,36 @@ namespace matador::sql { class connection_pool; +struct statement_cache_event { + enum class Type { Accessed, Added, Evicted }; + Type type; + std::string sql; + std::chrono::steady_clock::time_point timestamp; +}; + +class statement_cache_observer_interface { +public: + virtual void on_event(const statement_cache_event&) = 0; + virtual ~statement_cache_observer_interface() = default; +}; + class statement_cache final { +private: + using list_iterator = std::list::iterator; + + struct cache_entry { + statement stmt; + std::chrono::steady_clock::time_point last_access; + list_iterator position; + }; + public: explicit statement_cache(connection_pool &pool, size_t max_size = 50); + statement_cache(const statement_cache &) = delete; + statement_cache &operator=(const statement_cache &) = delete; + statement_cache(statement_cache &&) = delete; + statement_cache &operator=(statement_cache &&) = delete; + ~statement_cache() = default; [[nodiscard]] utils::result acquire(const query_context &ctx); @@ -23,16 +49,22 @@ public: [[nodiscard]] size_t capacity() const; [[nodiscard]] bool empty() const; + void subscribe(statement_cache_observer_interface &observer); + +private: + void push(statement_cache_event::Type type, const std::string& sql) const; + private: - using list_iterator = std::list::iterator; size_t max_size_{}; std::list usage_list_; // LRU: front = most recent, back = least recent - std::unordered_map> cache_map_; + std::unordered_map cache_map_; std::mutex mutex_; connection_pool &pool_; const sql::dialect &dialect_; + + std::vector observers_; }; } #endif //STATEMENT_CACHE_HPP diff --git a/source/orm/sql/connection_pool.cpp b/source/orm/sql/connection_pool.cpp index 352304b..c991d22 100644 --- a/source/orm/sql/connection_pool.cpp +++ b/source/orm/sql/connection_pool.cpp @@ -71,9 +71,14 @@ connection_pool::connection_pool(const std::string& dns, size_t count) connection_ptr connection_pool::acquire() { std::unique_lock lock(mutex_); - while (idle_connections_.empty()) { - cv.wait(lock); + if (!cv.wait_for(lock, + std::chrono::seconds(30), + [this] { return !idle_connections_.empty(); })) { + return {nullptr, this}; } + // while (idle_connections_.empty()) { + // cv.wait(lock); + // } return get_next_connection(); } @@ -89,23 +94,40 @@ connection_ptr connection_pool::try_acquire() { connection_ptr connection_pool::acquire(const size_t id) { using namespace std::chrono_literals; - pointer next_connection{nullptr}; - auto try_count{0}; std::unique_lock lock(mutex_); - do { - if (auto it = idle_connections_.find(id); it != idle_connections_.end()) { - next_connection = it->second; - auto node = idle_connections_.extract(it); - inuse_connections_.insert(std::move(node)); - } else { - lock.unlock(); - std::this_thread::sleep_for(100ms); - lock.lock(); - } - } while(try_count++ < 5); + if (!cv.wait_for(lock, + 5s, + [this, id] { + return idle_connections_.find(id) != idle_connections_.end(); + })) { + return {nullptr, this}; + } + auto it = idle_connections_.find(id); + auto next_connection = it->second; + auto node = idle_connections_.extract(it); + inuse_connections_.insert(std::move(node)); return {next_connection, this}; + + // using namespace std::chrono_literals; + // pointer next_connection{nullptr}; + // auto try_count{0}; + // std::unique_lock lock(mutex_); + // + // do { + // if (auto it = idle_connections_.find(id); it != idle_connections_.end()) { + // next_connection = it->second; + // auto node = idle_connections_.extract(it); + // inuse_connections_.insert(std::move(node)); + // } else { + // lock.unlock(); + // std::this_thread::sleep_for(100ms); + // lock.lock(); + // } + // } while(try_count++ < 5); + // + // return {next_connection, this}; } void connection_pool::release(identifiable_connection* c) { @@ -116,6 +138,7 @@ void connection_pool::release(identifiable_connection* c) { if (const auto it = inuse_connections_.find(c->id); it != inuse_connections_.end()) { auto node = inuse_connections_.extract(it); idle_connections_.insert(std::move(node)); + cv.notify_one(); } } diff --git a/source/orm/sql/interface/statement_proxy.cpp b/source/orm/sql/interface/statement_proxy.cpp index 0c5654e..ce49980 100644 --- a/source/orm/sql/interface/statement_proxy.cpp +++ b/source/orm/sql/interface/statement_proxy.cpp @@ -15,6 +15,10 @@ void statement_proxy::reset() const { statement_->reset(); } +std::string statement_proxy::sql() const { + return statement_->query_.sql; +} + std::unique_ptr statement_proxy::create_binder() const { return statement_->create_binder(); } diff --git a/source/orm/sql/statement.cpp b/source/orm/sql/statement.cpp index b9b8b15..13595e2 100644 --- a/source/orm/sql/statement.cpp +++ b/source/orm/sql/statement.cpp @@ -95,4 +95,7 @@ void statement::reset() const statement_proxy_->reset(); } -} \ No newline at end of file +std::string statement::sql() const { + return statement_proxy_->sql(); +} +} diff --git a/source/orm/sql/statement_cache.cpp b/source/orm/sql/statement_cache.cpp index 3490491..4efe14d 100644 --- a/source/orm/sql/statement_cache.cpp +++ b/source/orm/sql/statement_cache.cpp @@ -4,35 +4,65 @@ #include "matador/sql/connection_pool.hpp" #include +#include namespace matador::sql { namespace internal { class statement_cache_proxy final : public statement_proxy { public: - explicit statement_cache_proxy(std::unique_ptr&& stmt) - : statement_proxy(std::move(stmt)) {} + struct retry_config { + size_t max_attempts{10}; + std::chrono::milliseconds initial_wait{10}; + std::chrono::milliseconds max_wait{250}; + }; + + explicit statement_cache_proxy(std::unique_ptr&& stmt, connection_pool &pool, const size_t connection_id) + : statement_proxy(std::move(stmt)) + , pool_(pool) + , connection_id_(connection_id) {} utils::result execute(interface::parameter_binder& bindings) override { - if (!try_lock()) { - return utils::failure(utils::error{ - error_code::STATEMENT_LOCKED, - "Failed to execute statement because it is already in use" - }); - } + auto result = try_with_retry([this, &bindings]() -> utils::result { + if (!try_lock()) { + return utils::failure(utils::error{ + error_code::STATEMENT_LOCKED, + "Failed to execute statement because it is already in use" + }); + } - auto guard = statement_guard(*this); - return statement_->execute(bindings); + auto guard = statement_guard(*this); + if (auto conn = pool_.acquire(connection_id_); !conn.valid()) { + return utils::failure(utils::error{ + error_code::EXECUTE_FAILED, + "Failed to execute statement because couldn't lock connection" + }); + } + + return statement_->execute(bindings); + }); + + return result; } - utils::result, utils::error> fetch(interface::parameter_binder& bindings) override { - if (!try_lock()) { - return utils::failure(utils::error{ - error_code::STATEMENT_LOCKED, - "Failed to execute statement because it is already in use" - }); - } - auto guard = statement_guard(*this); - return statement_->fetch(bindings); + utils::result, utils::error> fetch(interface::parameter_binder& bindings) override { + auto result = try_with_retry([this, &bindings]() -> utils::result, utils::error> { + if (!try_lock()) { + return utils::failure(utils::error{ + error_code::STATEMENT_LOCKED, + "Failed to execute statement because it is already in use" + }); + } + auto guard = statement_guard(*this); + if (const auto conn = pool_.acquire(connection_id_); !conn.valid()) { + return utils::failure(utils::error{ + error_code::EXECUTE_FAILED, + "Failed to execute statement because couldn't lock connection" + }); + } + return statement_->fetch(bindings); + }); + + return result; } protected: @@ -41,6 +71,28 @@ protected: return locked_.compare_exchange_strong(expected, true); } + template + [[nodiscard]] auto try_with_retry(Func &&func) -> decltype(func()) { + auto current_wait = config_.initial_wait; + + for (size_t attempt = 0; attempt < config_.max_attempts; ++attempt) { + if (auto result = func(); result.is_ok() || + result.err().ec() != error_code::STATEMENT_LOCKED) { + return result; + } + + if (attempt + 1 < config_.max_attempts) { + std::this_thread::sleep_for(current_wait); + current_wait = std::min(current_wait * 2, config_.max_wait); + } + } + + return utils::failure(utils::error{ + error_code::STATEMENT_LOCKED, + "Failed to execute statement because it is already in use" + }); + } + void unlock() { locked_.store(false); } @@ -56,6 +108,9 @@ private: private: std::atomic_bool locked_{false}; + connection_pool &pool_; + size_t connection_id_{}; + retry_config config_{}; }; } @@ -68,35 +123,46 @@ utils::result statement_cache::acquire(const query_cont std::unique_lock lock(mutex_); // hash statement const auto key = std::hash{}(ctx.sql); + const auto now = std::chrono::steady_clock::now(); // Found in cache. Move it to of the LRU list if (const auto it = cache_map_.find(key); it != cache_map_.end()) { - usage_list_.splice(usage_list_.begin(), usage_list_, it->second.second); - return utils::ok(it->second.first); + usage_list_.splice(usage_list_.begin(), usage_list_, it->second.position); + it->second.last_access = now; + push(statement_cache_event::Type::Accessed, ctx.sql); + return utils::ok(it->second.stmt); } // Prepare a new statement // acquire pool connection - const auto conn = pool_.acquire(); - auto result = conn->perform_prepare(ctx); - if (!result) { - return utils::failure(utils::error{error_code::PREPARE_FAILED, std::string("Failed to prepare")}); + size_t id{}; + std::unique_ptr stmt; + { + const auto conn = pool_.acquire(); + auto result = conn->perform_prepare(ctx); + if (!result) { + return utils::failure(utils::error{error_code::PREPARE_FAILED, std::string("Failed to prepare")}); + } + id = conn.id().value(); + stmt = result.release(); } - // If cache max size reached ensure space if (cache_map_.size() >= max_size_) { const auto& key_to_remove = usage_list_.back(); cache_map_.erase(key_to_remove); usage_list_.pop_back(); + push(statement_cache_event::Type::Evicted, ctx.sql); } usage_list_.push_front(key); const auto it = cache_map_.insert({ key, - std::make_pair(statement{ - std::make_shared(result.release())}, - usage_list_.begin()) + {statement{ + std::make_shared(std::move(stmt), pool_, id)}, + std::chrono::steady_clock::now(), + usage_list_.begin()} }).first; + push(statement_cache_event::Type::Added, ctx.sql); - return utils::ok(it->second.first); + return utils::ok(it->second.stmt); } size_t statement_cache::size() const { @@ -109,4 +175,19 @@ size_t statement_cache::capacity() const { bool statement_cache::empty() const { return cache_map_.empty(); } + +void statement_cache::subscribe(statement_cache_observer_interface &observer) { + observers_.push_back(&observer); +} + +void statement_cache::push(statement_cache_event::Type type, const std::string &sql) const { + using Clock = std::chrono::steady_clock; + const auto ts = Clock::now(); + const statement_cache_event evt{type, sql, ts}; + for (auto& obs : observers_) { + if (obs) { + obs->on_event(evt); + } + } +} } diff --git a/test/core/logger/LoggerTest.cpp b/test/core/logger/LoggerTest.cpp index 0f6bcb8..a1d9f4d 100644 --- a/test/core/logger/LoggerTest.cpp +++ b/test/core/logger/LoggerTest.cpp @@ -66,7 +66,7 @@ TEST_CASE("Test log file sink", "[logger][log][file_sink]") { strerror_s(buf, 1024, errno); FAIL(buf); #else - UNIT_FAIL(strerror(errno)); + FAIL(strerror(errno)); #endif } @@ -89,7 +89,7 @@ TEST_CASE("Test log file sink", "[logger][log][file_sink]") { strerror_s(buf, 1024, errno); FAIL(buf); #else - UNIT_FAIL(strerror(errno)); + FAIL(strerror(errno)); #endif } diff --git a/test/orm/backend/test_statement.cpp b/test/orm/backend/test_statement.cpp index d13f4d3..645a3f9 100644 --- a/test/orm/backend/test_statement.cpp +++ b/test/orm/backend/test_statement.cpp @@ -2,11 +2,18 @@ #include "test_result_reader.hpp" #include "test_parameter_binder.hpp" +#include +#include + namespace matador::test::orm { test_statement::test_statement(const sql::query_context &query) : statement_impl(query) {} utils::result test_statement::execute(const sql::interface::parameter_binder &/*bindings*/) { + using namespace std::chrono_literals; + std::mt19937 rng(query_.sql.size()); + std::uniform_int_distribution dist(10, 40); + std::this_thread::sleep_for(std::chrono::milliseconds(dist(rng))); return utils::ok(static_cast(8)); } diff --git a/test/orm/sql/StatementCacheTest.cpp b/test/orm/sql/StatementCacheTest.cpp index 1470791..e6cbdac 100644 --- a/test/orm/sql/StatementCacheTest.cpp +++ b/test/orm/sql/StatementCacheTest.cpp @@ -1,18 +1,44 @@ +#include #include #include #include "matador/sql/connection_pool.hpp" +#include "matador/sql/error_code.hpp" #include "matador/sql/statement_cache.hpp" #include "../backend/test_backend_service.hpp" #include "ConnectionPoolFixture.hpp" +#include +#include +#include + using namespace matador::test; using namespace matador::sql; using namespace matador::query; +class RecordingObserver final : public statement_cache_observer_interface { +public: + void on_event(const statement_cache_event& evt) override { + std::lock_guard lock(mutex); + events.push(evt); + } + + std::optional poll() { + std::lock_guard lock(mutex); + if (events.empty()) return std::nullopt; + auto evt = events.front(); + events.pop(); + return evt; + } + +private: + std::mutex mutex; + std::queue events; +}; + TEST_CASE("Test statement cache", "[statement][cache]") { backend_provider::instance().register_backend("noop", std::make_unique()); @@ -51,4 +77,166 @@ TEST_CASE("Test statement cache", "[statement][cache]") { REQUIRE(cache.size() == 2); REQUIRE(!cache.empty()); REQUIRE(cache.capacity() == 2); +} + +TEST_CASE("Test LRU cache evicts oldest entries", "[statement][cache][evict]") { + backend_provider::instance().register_backend("noop", std::make_unique()); + + connection_pool pool("noop://noop.db", 4); + statement_cache cache(pool, 2); + RecordingObserver observer; + cache.subscribe(observer); + + REQUIRE(cache.capacity() == 2); + REQUIRE(cache.empty()); + + auto result = cache.acquire({"SELECT * FROM person"}); + REQUIRE(result); + auto stmt1 = result.value(); + result = cache.acquire({"SELECT title FROM book"}); + REQUIRE(result); + auto stmt2 = result.value(); + result = cache.acquire({"SELECT name FROM author"}); // Should evict first statement + REQUIRE(result); + auto stmt3 = result.value(); + + // Trigger re-prepare of evicted statement + result = cache.acquire({"SELECT 1"}); + REQUIRE(result); + auto stmt4 = result.value(); + + REQUIRE(stmt1.sql() == "SELECT * FROM person"); + REQUIRE(stmt2.sql() == "SELECT title FROM book"); + REQUIRE(stmt3.sql() == "SELECT name FROM author"); + REQUIRE(stmt4.sql() == "SELECT 1"); + + REQUIRE(cache.size() == 2); + REQUIRE(!cache.empty()); + REQUIRE(cache.capacity() == 2); + + int added = 0, evicted = 0; + while (auto e = observer.poll()) { + if (e->type == statement_cache_event::Type::Added) added++; + if (e->type == statement_cache_event::Type::Evicted) evicted++; + } + REQUIRE(added >= 3); + REQUIRE(evicted >= 1); +} + +TEST_CASE("Test statement reuse avoids reprepare", "[statement][cache][prepare]") { + backend_provider::instance().register_backend("noop", std::make_unique()); + + connection_pool pool("noop://noop.db", 4); + statement_cache cache(pool, 2); + RecordingObserver observer; + cache.subscribe(observer); + + REQUIRE(cache.capacity() == 2); + REQUIRE(cache.empty()); + + auto result = cache.acquire({"SELECT * FROM person"}); + REQUIRE(result); + auto stmt1 = result.value(); + result = cache.acquire({"SELECT * FROM person"}); + REQUIRE(result); + auto stmt2 = result.value(); + + +} + +TEST_CASE("Multithreaded stress test", "[statement][cache][stress]") { + backend_provider::instance().register_backend("noop", std::make_unique()); + + constexpr int thread_count = 16; + constexpr int iterations = 1000; + constexpr int sql_pool_size = 10; + + std::vector sqls; + for (int i = 0; i < sql_pool_size; ++i) { + sqls.push_back("SELECT " + std::to_string(i)); + } + + connection_pool pool("noop://noop.db", 4); + statement_cache cache(pool, 5); + RecordingObserver observer; + cache.subscribe(observer); + + auto start_time = std::chrono::steady_clock::now(); + + std::atomic_int lock_failed_count{0}; + std::atomic_int exec_failed_count{0}; + + auto worker = [&](const int tid) { + std::mt19937 rng(tid); + std::uniform_int_distribution dist(0, sql_pool_size - 1); + + for (int i = 0; i < iterations; ++i) { + const auto& sql = sqls[dist(rng)]; + if (const auto result = cache.acquire({sql}); !result) { + FAIL("Failed to acquire statement"); + } else { + if (const auto exec_result = result->execute(); !exec_result) { + if (exec_result.err().ec() == error_code::STATEMENT_LOCKED) { + ++lock_failed_count; + } else { + ++exec_failed_count; + } + } + } + } + }; + + std::vector threads; + for (int i = 0; i < thread_count; ++i) { + threads.emplace_back(worker, i); + } + + for (auto& t : threads) { + t.join(); + } + + auto end_time = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration_cast(end_time - start_time); + + std::cout << "[Performance] Executed " << (thread_count * iterations) << " statements in " << duration.count() << " ms (lock failed: " << lock_failed_count << ", execute failed: " << exec_failed_count << ")\n"; + + // Some events should be generated + int accessed = 0; + while (auto e = observer.poll()) { + if (e->type == statement_cache_event::Type::Accessed) accessed++; + } + REQUIRE(accessed > 0); +} + +TEST_CASE("Race condition simulation with mixed access", "[statement_cache][race]") { + backend_provider::instance().register_backend("noop", std::make_unique()); + + connection_pool pool("noop://noop.db", 4); + statement_cache cache(pool, 5); + + constexpr int threads = 8; + constexpr int operations = 500; + + auto task = [&](int id) { + for (int i = 0; i < operations; ++i) { + auto sql = "SELECT " + std::to_string(i % 10); + auto result = cache.acquire({sql}); + REQUIRE(result); + + // if (i % 50 == 0) { + // cache.cleanup_expired_connections(); + // } + } + }; + + std::vector jobs; + for (int i = 0; i < threads; ++i) { + jobs.emplace_back(task, i); + } + + for (auto& t : jobs) { + t.join(); + } + + SUCCEED("Race simulation completed successfully without crash"); } \ No newline at end of file