Compare commits

...

2 Commits

Author SHA1 Message Date
sascha 5e6ba3666e added benchmark test suite 2026-05-07 14:29:35 +02:00
sascha 3d4a61749b renamed template parameters 2026-05-07 14:28:22 +02:00
10 changed files with 281 additions and 195 deletions

View File

@ -25,10 +25,10 @@ struct cache_entry_base {
[[nodiscard]] virtual bool is_dead() const noexcept = 0;
};
template<typename T>
template<typename Type>
struct cache_entry : cache_entry_base {
std::weak_ptr<object_proxy<T> > proxy;
std::weak_ptr<T> entity;
std::weak_ptr<object_proxy<Type> > proxy;
std::weak_ptr<Type> entity;
[[nodiscard]] bool is_dead() const noexcept override {
return proxy.expired() && entity.expired();
@ -63,8 +63,8 @@ public:
* Andernfalls wird ein neuer Proxy erzeugt, im Cache abgelegt und zurückgegeben.
* Falls für (T, id) bereits eine Entity vorhanden ist, wird sie an den Proxy gebunden.
*
* @tparam T Entity-Typ.
* @tparam ResolverPtr Zeiger-Typ auf einen Resolver, typischerweise
* @tparam Type Entity-Typ.
* @tparam ResolverPointerType Zeiger-Typ auf einen Resolver, typischerweise
* @c std::shared_ptr<object_resolver<T>> oder @c std::weak_ptr<object_resolver<T>>.
* @param id Identifier/Primärschlüssel der Entity.
* @param resolver_ptr Resolver (shared/weak), der zur Lazy-Auflösung durch den Proxy genutzt wird.
@ -72,23 +72,23 @@ public:
*
* @note Diese Methode ist threadsafe.
*/
template<typename T, typename ResolverPtr>
std::shared_ptr<object_proxy<T>> acquire_proxy(utils::identifier id, ResolverPtr &&resolver_ptr) {
const auto k = make_key<T>(id);
template<typename Type, typename ResolverPointerType>
std::shared_ptr<object_proxy<Type>> acquire_proxy(utils::identifier id, ResolverPointerType &&resolver_ptr) {
const auto k = make_key<Type>(id);
std::unique_lock lock(mutex_);
auto it = map_.find(k);
if (it != map_.end()) {
// found entry, return std::shared_ptr of proxy
auto *entry = entry_cast_<T>(it->second.get());
auto *entry = entry_cast_<Type>(it->second.get());
if (auto proxy_ptr = entry->proxy.lock()) {
return proxy_ptr;
}
// if the proxy is dead, but the entity is alive, create a new proxy and attach entity
auto weak_resolver = to_weak<T>(std::forward<ResolverPtr>(resolver_ptr));
auto proxy_ptr = std::make_shared<object_proxy<T>>(weak_resolver, id);
auto weak_resolver = to_weak<Type>(std::forward<ResolverPointerType>(resolver_ptr));
auto proxy_ptr = std::make_shared<object_proxy<Type>>(weak_resolver, id);
if (auto obj = entry->entity.lock()) {
proxy_ptr->attach(std::move(obj));
@ -101,12 +101,12 @@ public:
}
// entry couldn't be found, create a new entry
auto entry_ptr = std::make_unique<cache_entry<T>>();
auto entry_ptr = std::make_unique<cache_entry<Type>>();
auto *entry = entry_ptr.get();
// create a weak resolver and shared proxy
auto weak_resolver = to_weak<T>(std::forward<ResolverPtr>(resolver_ptr));
auto proxy_ptr = std::make_shared<object_proxy<T>>(weak_resolver, id);
auto weak_resolver = to_weak<Type>(std::forward<ResolverPointerType>(resolver_ptr));
auto proxy_ptr = std::make_shared<object_proxy<Type>>(weak_resolver, id);
// lock entity and attach to proxy
if (auto obj = entry->entity.lock()) {
@ -286,12 +286,10 @@ private:
}
template<typename T, typename U>
using enable_if_resolver_shared_ptr_t =
std::enable_if_t<std::is_base_of_v<object_resolver<T>, U>, int>;
using enable_if_resolver_shared_ptr_t = std::enable_if_t<std::is_base_of_v<object_resolver<T>, U>, int>;
template<typename T, typename U>
using enable_if_resolver_weak_ptr_t =
std::enable_if_t<std::is_base_of_v<object_resolver<T>, U>, int>;
using enable_if_resolver_weak_ptr_t = std::enable_if_t<std::is_base_of_v<object_resolver<T>, U>, int>;
// shared_ptr<Derived> -> weak_ptr<Base>
template<typename T, typename U, enable_if_resolver_shared_ptr_t<T, U> = 0>
@ -306,8 +304,8 @@ private:
}
// (2) Opportunistischer Cleanup: entferne Entry, wenn beide weak_ptr abgelaufen sind.
template<typename It>
bool prune_if_dead(It &it) {
template<typename IteratorType>
bool prune_if_dead(IteratorType &it) {
if (it == map_.end()) {
return false;
}
@ -319,9 +317,9 @@ private:
return false;
}
template<typename T>
template<typename Type>
void prune_if_dead_typed(typename std::unordered_map<key, std::unique_ptr<cache_entry_base>, key_hash>::iterator &it) {
(void)T{}; // T bleibt im Interface, damit bestehende Call-Sites unverändert bleiben können.
(void)Type{}; // T bleibt im Interface, damit bestehende Call-Sites unverändert bleiben können.
prune_if_dead(it);
}

View File

@ -1,4 +1,5 @@
enable_testing()
add_subdirectory(core)
add_subdirectory(orm)
add_subdirectory(orm)
add_subdirectory(benchmark)

View File

@ -0,0 +1,35 @@
CPMAddPackage("gh:catchorg/Catch2@3.14.0")
list(APPEND CMAKE_MODULE_PATH ${catch2_SOURCE_DIR}/extras)
add_executable(Benchmarks
sql/StatementCacheBenchmark.cpp
../orm/backend/test_backend_service.cpp
../orm/backend/test_backend_service.hpp
../orm/backend/test_connection.cpp
../orm/backend/test_connection.hpp
../orm/backend/test_parameter_binder.cpp
../orm/backend/test_parameter_binder.hpp
../orm/backend/test_result_reader.cpp
../orm/backend/test_result_reader.hpp
../orm/backend/test_statement.cpp
../orm/backend/test_statement.hpp
../utils/MetricsObserver.hpp
../utils/MetricsObserver.cpp
../utils/RecordingObserver.hpp
../utils/RecordingObserver.cpp
)
target_link_libraries(Benchmarks matador-orm matador-core Catch2::Catch2WithMain)
target_include_directories(Benchmarks
PUBLIC
PRIVATE $<BUILD_INTERFACE:${PROJECT_SOURCE_DIR}>/test
)
#if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
# target_compile_options(CoreTests PRIVATE -coverage)
# target_link_options(CoreTests PRIVATE -coverage)
#endif ()
add_test(NAME Benchmarks COMMAND Benchmarks)

View File

@ -0,0 +1,84 @@
#include <catch2/catch_test_macros.hpp>
#include "matador/sql/connection_pool.hpp"
#include "matador/sql/interface/connection_impl.hpp"
#include "../test/orm/backend/test_backend_service.hpp"
#include "utils/MetricsObserver.hpp"
#include "utils/RecordingObserver.hpp"
#include <random>
using namespace matador::test;
using namespace matador::sql;
using namespace matador::utils;
TEST_CASE("Multithreaded stress test", "[statement][cache][stress]") {
backend_provider::instance().register_backend("noop", std::make_unique<orm::test_backend_service>());
constexpr int thread_count = 16;
constexpr int iterations = 1000;
constexpr int sql_pool_size = 10;
std::vector<query_context> contexts;
for (int i = 0; i < sql_pool_size; ++i) {
const auto sql = "SELECT " + std::to_string(i);
contexts.push_back({sql, std::hash<std::string>{}(sql)});
}
connection_pool pool("noop://noop.db", 4);
message_bus bus;
statement_cache cache(bus, pool, 5);
RecordingObserver observer(bus);
MetricsObserver metrics(bus);
auto start_time = std::chrono::steady_clock::now();
std::atomic_int lock_failed_count{0};
std::atomic_int exec_failed_count{0};
auto worker = [&](const int tid) {
std::mt19937 rng(tid);
std::uniform_int_distribution dist(0, sql_pool_size - 1);
for (int i = 0; i < iterations; ++i) {
const auto& ctx = contexts[dist(rng)];
if (const auto result = cache.acquire(ctx); !result) {
FAIL("Failed to acquire statement");
} else {
if (const auto exec_result = result->execute(); !exec_result) {
if (exec_result.err().ec() == error_code::StatementLocked) {
++lock_failed_count;
} else {
++exec_failed_count;
}
}
}
}
};
std::vector<std::thread> threads;
for (int i = 0; i < thread_count; ++i) {
threads.emplace_back(worker, i);
}
for (auto& t : threads) {
t.join();
}
const auto end_time = std::chrono::steady_clock::now();
const auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
std::cout << "Executed " << (thread_count * iterations) << " statements in " << duration.count() << " ms (lock failed: " << lock_failed_count << ", execute failed: " << exec_failed_count << ")\n";
std::cout << "Average lock wait time: " << metrics.get_average_lock_wait_time().count() << "ms\n";
std::cout << "Total lock wait time: " << metrics.get_total_lock_wait_time().count() << "ms\n";
std::cout << "Average execution time: " << metrics.get_average_execution_time().count() << "ms\n";
std::cout << "Total execution time: " << metrics.get_total_execution_time().count() << "ms\n";
std::cout << "Number of lock failures: " << metrics.get_lock_failure_count() << "\n";
// Some events should be generated
int accessed = 0;
while (auto e = observer.poll()) {
if (e->is<statement_accessed_event>()) accessed++;
}
REQUIRE(accessed > 0);
}

View File

@ -32,10 +32,17 @@ add_executable(OrmTests
sql/StatementCacheTest.cpp
utils/auto_reset_event.cpp
utils/auto_reset_event.hpp
../utils/RecordingObserver.hpp
../utils/RecordingObserver.cpp
)
target_link_libraries(OrmTests matador-orm matador-core Catch2::Catch2WithMain)
target_include_directories(OrmTests
PRIVATE $<BUILD_INTERFACE:${PROJECT_SOURCE_DIR}>/test
PRIVATE ${CMAKE_CURRENT_BINARY_DIR}
)
#target_compile_options(OrmTests PRIVATE -coverage)
#target_link_options(OrmTests PRIVATE -coverage)

View File

@ -1,116 +1,17 @@
#include <atomic>
#include <catch2/catch_test_macros.hpp>
#include "matador/sql/connection_pool.hpp"
#include "matador/sql/error_code.hpp"
#include "matador/sql/statement_cache.hpp"
#include "matador/utils/message_bus.hpp"
#include "matador/sql/interface/connection_impl.hpp"
#include "../backend/test_backend_service.hpp"
#include "ConnectionPoolFixture.hpp"
#include <queue>
#include <random>
#include <thread>
#include "utils/RecordingObserver.hpp"
using namespace matador::test;
using namespace matador::sql;
using namespace matador::utils;
class MetricsObserver {
public:
explicit MetricsObserver(message_bus &bus) {
subscriptions.push_back(bus.subscribe<statement_lock_failed_event>([this](const statement_lock_failed_event &ev) {
std::lock_guard lock(mutex_);
lock_failure_count_++;
total_lock_wait_time_ += ev.duration;
}));
subscriptions.push_back(bus.subscribe<statement_execution_event>([this](const statement_execution_event &ev) {
std::lock_guard lock(mutex_);
execution_count_++;
total_execution_time_ += ev.duration;
}));
}
std::chrono::milliseconds get_average_lock_wait_time() const {
std::lock_guard lock(mutex_);
if (lock_failure_count_ == 0) {
return std::chrono::milliseconds{0};
}
const auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(total_lock_wait_time_);
return std::chrono::milliseconds(millis.count() / lock_failure_count_);
}
std::chrono::milliseconds get_average_execution_time() const {
std::lock_guard lock(mutex_);
if (execution_count_ == 0) {
return std::chrono::milliseconds{0};
}
const auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(total_execution_time_);
return std::chrono::milliseconds(millis.count() / execution_count_);
}
std::chrono::milliseconds get_total_lock_wait_time() const {
std::lock_guard lock(mutex_);
return std::chrono::duration_cast<std::chrono::milliseconds>(total_lock_wait_time_);
}
std::chrono::milliseconds get_total_execution_time() const {
std::lock_guard lock(mutex_);
return std::chrono::duration_cast<std::chrono::milliseconds>(total_execution_time_);
}
size_t get_lock_failure_count() const {
std::lock_guard lock(mutex_);
return lock_failure_count_;
}
private:
std::vector<subscription> subscriptions;
mutable std::mutex mutex_;
size_t lock_failure_count_{0};
size_t execution_count_{0};
std::chrono::nanoseconds total_lock_wait_time_{0};
std::chrono::nanoseconds total_execution_time_{0};
};
class RecordingObserver final {
public:
explicit RecordingObserver(message_bus &bus) {
subscriptions.push_back(bus.subscribe<statement_accessed_event>([this](const statement_accessed_event &ev) {
std::lock_guard lock(mutex);
events.push(message::from_ref(ev));
}));
subscriptions.push_back(bus.subscribe<statement_added_event>([this](const statement_added_event &ev) {
std::lock_guard lock(mutex);
events.push(message::from_ref(ev));
}));
subscriptions.push_back(bus.subscribe<statement_evicted_event>([this](const statement_evicted_event &ev) {
std::lock_guard lock(mutex);
events.push(message::from_ref(ev));
}));
}
std::optional<message> poll() {
std::lock_guard lock(mutex);
if (events.empty()) {
return std::nullopt;
}
auto evt = events.front();
events.pop();
return evt;
}
private:
std::vector<subscription> subscriptions;
std::mutex mutex;
std::queue<message> events;
};
TEST_CASE("Test statement cache", "[statement][cache]") {
backend_provider::instance().register_backend("noop", std::make_unique<orm::test_backend_service>());
@ -222,77 +123,6 @@ TEST_CASE("Test statement reuse avoids reprepare", "[statement][cache][prepare]"
auto stmt2 = result.value();
}
// TEST_CASE("Multithreaded stress test", "[statement][cache][stress]") {
// backend_provider::instance().register_backend("noop", std::make_unique<orm::test_backend_service>());
//
// constexpr int thread_count = 16;
// constexpr int iterations = 1000;
// constexpr int sql_pool_size = 10;
//
// std::vector<std::string> sqls;
// for (int i = 0; i < sql_pool_size; ++i) {
// sqls.push_back("SELECT " + std::to_string(i));
// }
//
// connection_pool pool("noop://noop.db", 4);
// message_bus bus;
// statement_cache cache(bus, pool, 5);
// RecordingObserver observer(bus);
// MetricsObserver metrics(bus);
//
// auto start_time = std::chrono::steady_clock::now();
//
// std::atomic_int lock_failed_count{0};
// std::atomic_int exec_failed_count{0};
//
// auto worker = [&](const int tid) {
// std::mt19937 rng(tid);
// std::uniform_int_distribution dist(0, sql_pool_size - 1);
//
// for (int i = 0; i < iterations; ++i) {
// const auto& sql = sqls[dist(rng)];
// if (const auto result = cache.acquire({sql}); !result) {
// FAIL("Failed to acquire statement");
// } else {
// if (const auto exec_result = result->execute(); !exec_result) {
// if (exec_result.err().ec() == error_code::STATEMENT_LOCKED) {
// ++lock_failed_count;
// } else {
// ++exec_failed_count;
// }
// }
// }
// }
// };
//
// std::vector<std::thread> threads;
// for (int i = 0; i < thread_count; ++i) {
// threads.emplace_back(worker, i);
// }
//
// for (auto& t : threads) {
// t.join();
// }
//
// auto end_time = std::chrono::steady_clock::now();
// auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
//
// std::cout << "[Performance] Executed " << (thread_count * iterations) << " statements in " << duration.count() << " ms (lock failed: " << lock_failed_count << ", execute failed: " << exec_failed_count << ")\n";
//
// std::cout << "Average lock wait time: " << metrics.get_average_lock_wait_time().count() << "ms\n";
// std::cout << "Total lock wait time: " << metrics.get_total_lock_wait_time().count() << "ms\n";
// std::cout << "Average execution time: " << metrics.get_average_execution_time().count() << "ms\n";
// std::cout << "Total execution time: " << metrics.get_total_execution_time().count() << "ms\n";
// std::cout << "Number of lock failures: " << metrics.get_lock_failure_count() << "\n";
//
// // Some events should be generated
// int accessed = 0;
// while (auto e = observer.poll()) {
// if (e->is<statement_accessed_event>()) accessed++;
// }
// REQUIRE(accessed > 0);
// }
TEST_CASE("Race condition simulation with mixed access", "[statement_cache][race]") {
backend_provider::instance().register_backend("noop", std::make_unique<orm::test_backend_service>());

View File

@ -0,0 +1,51 @@
#include "MetricsObserver.hpp"
namespace matador::test {
MetricsObserver::MetricsObserver(utils::message_bus& bus) {
subscriptions.push_back(bus.subscribe<sql::statement_lock_failed_event>([this](const sql::statement_lock_failed_event &ev) {
std::lock_guard lock(mutex_);
lock_failure_count_++;
total_lock_wait_time_ += ev.duration;
}));
subscriptions.push_back(bus.subscribe<sql::statement_execution_event>([this](const sql::statement_execution_event &ev) {
std::lock_guard lock(mutex_);
execution_count_++;
total_execution_time_ += ev.duration;
}));
}
std::chrono::milliseconds MetricsObserver::get_average_lock_wait_time() const {
std::lock_guard lock(mutex_);
if (lock_failure_count_ == 0) {
return std::chrono::milliseconds{0};
}
const auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(total_lock_wait_time_);
return std::chrono::milliseconds(millis.count() / lock_failure_count_);
}
std::chrono::milliseconds MetricsObserver::get_average_execution_time() const {
std::lock_guard lock(mutex_);
if (execution_count_ == 0) {
return std::chrono::milliseconds{0};
}
const auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(total_execution_time_);
return std::chrono::milliseconds(millis.count() / execution_count_);
}
std::chrono::milliseconds MetricsObserver::get_total_lock_wait_time() const {
std::lock_guard lock(mutex_);
return std::chrono::duration_cast<std::chrono::milliseconds>(total_lock_wait_time_);
}
std::chrono::milliseconds MetricsObserver::get_total_execution_time() const {
std::lock_guard lock(mutex_);
return std::chrono::duration_cast<std::chrono::milliseconds>(total_execution_time_);
}
size_t MetricsObserver::get_lock_failure_count() const {
std::lock_guard lock(mutex_);
return lock_failure_count_;
}
}

View File

@ -0,0 +1,28 @@
#ifndef MATADOR_METRICS_OBSERVER_HPP
#define MATADOR_METRICS_OBSERVER_HPP
#include "matador/utils/message_bus.hpp"
#include "matador/sql/statement_cache.hpp"
namespace matador::test {
class MetricsObserver {
public:
explicit MetricsObserver(utils::message_bus &bus);
std::chrono::milliseconds get_average_lock_wait_time() const;
std::chrono::milliseconds get_average_execution_time() const;
std::chrono::milliseconds get_total_lock_wait_time() const;
std::chrono::milliseconds get_total_execution_time() const;
size_t get_lock_failure_count() const;
private:
std::vector<utils::subscription> subscriptions;
mutable std::mutex mutex_;
size_t lock_failure_count_{0};
size_t execution_count_{0};
std::chrono::nanoseconds total_lock_wait_time_{0};
std::chrono::nanoseconds total_execution_time_{0};
};
}
#endif //MATADOR_METRICS_OBSERVER_HPP

View File

@ -0,0 +1,28 @@
#include "RecordingObserver.hpp"
namespace matador::test {
RecordingObserver::RecordingObserver(utils::message_bus& bus) {
subscriptions.push_back(bus.subscribe<sql::statement_accessed_event>([this](const sql::statement_accessed_event &ev) {
std::lock_guard lock(mutex);
events.push(utils::message::from_ref(ev));
}));
subscriptions.push_back(bus.subscribe<sql::statement_added_event>([this](const sql::statement_added_event &ev) {
std::lock_guard lock(mutex);
events.push(utils::message::from_ref(ev));
}));
subscriptions.push_back(bus.subscribe<sql::statement_evicted_event>([this](const sql::statement_evicted_event &ev) {
std::lock_guard lock(mutex);
events.push(utils::message::from_ref(ev));
}));
}
std::optional<utils::message> RecordingObserver::poll() {
std::lock_guard lock(mutex);
if (events.empty()) {
return std::nullopt;
}
auto evt = events.front();
events.pop();
return evt;
}
}

View File

@ -0,0 +1,24 @@
#ifndef MATADOR_RECORDING_OBSERVER_HPP
#define MATADOR_RECORDING_OBSERVER_HPP
#include "matador/utils/message_bus.hpp"
#include "matador/sql/statement_cache.hpp"
#include <queue>
namespace matador::test {
class RecordingObserver final {
public:
explicit RecordingObserver(utils::message_bus &bus);
std::optional<utils::message> poll();
private:
std::vector<utils::subscription> subscriptions;
std::mutex mutex;
std::queue<utils::message> events;
};
}
#endif //MATADOR_RECORDING_OBSERVER_HPP