added benchmark test suite
This commit is contained in:
parent
3d4a61749b
commit
5e6ba3666e
|
|
@ -1,4 +1,5 @@
|
||||||
enable_testing()
|
enable_testing()
|
||||||
|
|
||||||
add_subdirectory(core)
|
add_subdirectory(core)
|
||||||
add_subdirectory(orm)
|
add_subdirectory(orm)
|
||||||
|
add_subdirectory(benchmark)
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,35 @@
|
||||||
|
CPMAddPackage("gh:catchorg/Catch2@3.14.0")
|
||||||
|
|
||||||
|
list(APPEND CMAKE_MODULE_PATH ${catch2_SOURCE_DIR}/extras)
|
||||||
|
|
||||||
|
add_executable(Benchmarks
|
||||||
|
sql/StatementCacheBenchmark.cpp
|
||||||
|
../orm/backend/test_backend_service.cpp
|
||||||
|
../orm/backend/test_backend_service.hpp
|
||||||
|
../orm/backend/test_connection.cpp
|
||||||
|
../orm/backend/test_connection.hpp
|
||||||
|
../orm/backend/test_parameter_binder.cpp
|
||||||
|
../orm/backend/test_parameter_binder.hpp
|
||||||
|
../orm/backend/test_result_reader.cpp
|
||||||
|
../orm/backend/test_result_reader.hpp
|
||||||
|
../orm/backend/test_statement.cpp
|
||||||
|
../orm/backend/test_statement.hpp
|
||||||
|
../utils/MetricsObserver.hpp
|
||||||
|
../utils/MetricsObserver.cpp
|
||||||
|
../utils/RecordingObserver.hpp
|
||||||
|
../utils/RecordingObserver.cpp
|
||||||
|
)
|
||||||
|
|
||||||
|
target_link_libraries(Benchmarks matador-orm matador-core Catch2::Catch2WithMain)
|
||||||
|
|
||||||
|
target_include_directories(Benchmarks
|
||||||
|
PUBLIC
|
||||||
|
PRIVATE $<BUILD_INTERFACE:${PROJECT_SOURCE_DIR}>/test
|
||||||
|
)
|
||||||
|
|
||||||
|
#if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
|
||||||
|
# target_compile_options(CoreTests PRIVATE -coverage)
|
||||||
|
# target_link_options(CoreTests PRIVATE -coverage)
|
||||||
|
#endif ()
|
||||||
|
|
||||||
|
add_test(NAME Benchmarks COMMAND Benchmarks)
|
||||||
|
|
@ -0,0 +1,84 @@
|
||||||
|
#include <catch2/catch_test_macros.hpp>
|
||||||
|
|
||||||
|
#include "matador/sql/connection_pool.hpp"
|
||||||
|
#include "matador/sql/interface/connection_impl.hpp"
|
||||||
|
|
||||||
|
#include "../test/orm/backend/test_backend_service.hpp"
|
||||||
|
|
||||||
|
#include "utils/MetricsObserver.hpp"
|
||||||
|
#include "utils/RecordingObserver.hpp"
|
||||||
|
|
||||||
|
#include <random>
|
||||||
|
|
||||||
|
using namespace matador::test;
|
||||||
|
using namespace matador::sql;
|
||||||
|
using namespace matador::utils;
|
||||||
|
|
||||||
|
TEST_CASE("Multithreaded stress test", "[statement][cache][stress]") {
|
||||||
|
backend_provider::instance().register_backend("noop", std::make_unique<orm::test_backend_service>());
|
||||||
|
|
||||||
|
constexpr int thread_count = 16;
|
||||||
|
constexpr int iterations = 1000;
|
||||||
|
constexpr int sql_pool_size = 10;
|
||||||
|
|
||||||
|
std::vector<query_context> contexts;
|
||||||
|
for (int i = 0; i < sql_pool_size; ++i) {
|
||||||
|
const auto sql = "SELECT " + std::to_string(i);
|
||||||
|
contexts.push_back({sql, std::hash<std::string>{}(sql)});
|
||||||
|
}
|
||||||
|
|
||||||
|
connection_pool pool("noop://noop.db", 4);
|
||||||
|
message_bus bus;
|
||||||
|
statement_cache cache(bus, pool, 5);
|
||||||
|
RecordingObserver observer(bus);
|
||||||
|
MetricsObserver metrics(bus);
|
||||||
|
|
||||||
|
auto start_time = std::chrono::steady_clock::now();
|
||||||
|
|
||||||
|
std::atomic_int lock_failed_count{0};
|
||||||
|
std::atomic_int exec_failed_count{0};
|
||||||
|
|
||||||
|
auto worker = [&](const int tid) {
|
||||||
|
std::mt19937 rng(tid);
|
||||||
|
std::uniform_int_distribution dist(0, sql_pool_size - 1);
|
||||||
|
|
||||||
|
for (int i = 0; i < iterations; ++i) {
|
||||||
|
const auto& ctx = contexts[dist(rng)];
|
||||||
|
if (const auto result = cache.acquire(ctx); !result) {
|
||||||
|
FAIL("Failed to acquire statement");
|
||||||
|
} else {
|
||||||
|
if (const auto exec_result = result->execute(); !exec_result) {
|
||||||
|
if (exec_result.err().ec() == error_code::StatementLocked) {
|
||||||
|
++lock_failed_count;
|
||||||
|
} else {
|
||||||
|
++exec_failed_count;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
std::vector<std::thread> threads;
|
||||||
|
for (int i = 0; i < thread_count; ++i) {
|
||||||
|
threads.emplace_back(worker, i);
|
||||||
|
}
|
||||||
|
for (auto& t : threads) {
|
||||||
|
t.join();
|
||||||
|
}
|
||||||
|
const auto end_time = std::chrono::steady_clock::now();
|
||||||
|
const auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
|
||||||
|
|
||||||
|
std::cout << "Executed " << (thread_count * iterations) << " statements in " << duration.count() << " ms (lock failed: " << lock_failed_count << ", execute failed: " << exec_failed_count << ")\n";
|
||||||
|
std::cout << "Average lock wait time: " << metrics.get_average_lock_wait_time().count() << "ms\n";
|
||||||
|
std::cout << "Total lock wait time: " << metrics.get_total_lock_wait_time().count() << "ms\n";
|
||||||
|
std::cout << "Average execution time: " << metrics.get_average_execution_time().count() << "ms\n";
|
||||||
|
std::cout << "Total execution time: " << metrics.get_total_execution_time().count() << "ms\n";
|
||||||
|
std::cout << "Number of lock failures: " << metrics.get_lock_failure_count() << "\n";
|
||||||
|
|
||||||
|
// Some events should be generated
|
||||||
|
int accessed = 0;
|
||||||
|
while (auto e = observer.poll()) {
|
||||||
|
if (e->is<statement_accessed_event>()) accessed++;
|
||||||
|
}
|
||||||
|
REQUIRE(accessed > 0);
|
||||||
|
}
|
||||||
|
|
@ -32,10 +32,17 @@ add_executable(OrmTests
|
||||||
sql/StatementCacheTest.cpp
|
sql/StatementCacheTest.cpp
|
||||||
utils/auto_reset_event.cpp
|
utils/auto_reset_event.cpp
|
||||||
utils/auto_reset_event.hpp
|
utils/auto_reset_event.hpp
|
||||||
|
../utils/RecordingObserver.hpp
|
||||||
|
../utils/RecordingObserver.cpp
|
||||||
)
|
)
|
||||||
|
|
||||||
target_link_libraries(OrmTests matador-orm matador-core Catch2::Catch2WithMain)
|
target_link_libraries(OrmTests matador-orm matador-core Catch2::Catch2WithMain)
|
||||||
|
|
||||||
|
target_include_directories(OrmTests
|
||||||
|
PRIVATE $<BUILD_INTERFACE:${PROJECT_SOURCE_DIR}>/test
|
||||||
|
PRIVATE ${CMAKE_CURRENT_BINARY_DIR}
|
||||||
|
)
|
||||||
|
|
||||||
#target_compile_options(OrmTests PRIVATE -coverage)
|
#target_compile_options(OrmTests PRIVATE -coverage)
|
||||||
#target_link_options(OrmTests PRIVATE -coverage)
|
#target_link_options(OrmTests PRIVATE -coverage)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,116 +1,17 @@
|
||||||
#include <atomic>
|
|
||||||
#include <catch2/catch_test_macros.hpp>
|
#include <catch2/catch_test_macros.hpp>
|
||||||
|
|
||||||
#include "matador/sql/connection_pool.hpp"
|
#include "matador/sql/connection_pool.hpp"
|
||||||
#include "matador/sql/error_code.hpp"
|
#include "matador/sql/error_code.hpp"
|
||||||
#include "matador/sql/statement_cache.hpp"
|
#include "matador/sql/interface/connection_impl.hpp"
|
||||||
|
|
||||||
#include "matador/utils/message_bus.hpp"
|
|
||||||
|
|
||||||
#include "../backend/test_backend_service.hpp"
|
#include "../backend/test_backend_service.hpp"
|
||||||
|
|
||||||
#include "ConnectionPoolFixture.hpp"
|
#include "utils/RecordingObserver.hpp"
|
||||||
|
|
||||||
#include <queue>
|
|
||||||
#include <random>
|
|
||||||
#include <thread>
|
|
||||||
|
|
||||||
using namespace matador::test;
|
using namespace matador::test;
|
||||||
using namespace matador::sql;
|
using namespace matador::sql;
|
||||||
using namespace matador::utils;
|
using namespace matador::utils;
|
||||||
|
|
||||||
class MetricsObserver {
|
|
||||||
public:
|
|
||||||
explicit MetricsObserver(message_bus &bus) {
|
|
||||||
subscriptions.push_back(bus.subscribe<statement_lock_failed_event>([this](const statement_lock_failed_event &ev) {
|
|
||||||
std::lock_guard lock(mutex_);
|
|
||||||
lock_failure_count_++;
|
|
||||||
total_lock_wait_time_ += ev.duration;
|
|
||||||
}));
|
|
||||||
subscriptions.push_back(bus.subscribe<statement_execution_event>([this](const statement_execution_event &ev) {
|
|
||||||
std::lock_guard lock(mutex_);
|
|
||||||
execution_count_++;
|
|
||||||
total_execution_time_ += ev.duration;
|
|
||||||
}));
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
std::chrono::milliseconds get_average_lock_wait_time() const {
|
|
||||||
std::lock_guard lock(mutex_);
|
|
||||||
if (lock_failure_count_ == 0) {
|
|
||||||
return std::chrono::milliseconds{0};
|
|
||||||
}
|
|
||||||
const auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(total_lock_wait_time_);
|
|
||||||
return std::chrono::milliseconds(millis.count() / lock_failure_count_);
|
|
||||||
}
|
|
||||||
|
|
||||||
std::chrono::milliseconds get_average_execution_time() const {
|
|
||||||
std::lock_guard lock(mutex_);
|
|
||||||
if (execution_count_ == 0) {
|
|
||||||
return std::chrono::milliseconds{0};
|
|
||||||
}
|
|
||||||
|
|
||||||
const auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(total_execution_time_);
|
|
||||||
return std::chrono::milliseconds(millis.count() / execution_count_);
|
|
||||||
}
|
|
||||||
|
|
||||||
std::chrono::milliseconds get_total_lock_wait_time() const {
|
|
||||||
std::lock_guard lock(mutex_);
|
|
||||||
return std::chrono::duration_cast<std::chrono::milliseconds>(total_lock_wait_time_);
|
|
||||||
}
|
|
||||||
|
|
||||||
std::chrono::milliseconds get_total_execution_time() const {
|
|
||||||
std::lock_guard lock(mutex_);
|
|
||||||
return std::chrono::duration_cast<std::chrono::milliseconds>(total_execution_time_);
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t get_lock_failure_count() const {
|
|
||||||
std::lock_guard lock(mutex_);
|
|
||||||
return lock_failure_count_;
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
std::vector<subscription> subscriptions;
|
|
||||||
mutable std::mutex mutex_;
|
|
||||||
size_t lock_failure_count_{0};
|
|
||||||
size_t execution_count_{0};
|
|
||||||
std::chrono::nanoseconds total_lock_wait_time_{0};
|
|
||||||
std::chrono::nanoseconds total_execution_time_{0};
|
|
||||||
};
|
|
||||||
|
|
||||||
class RecordingObserver final {
|
|
||||||
public:
|
|
||||||
explicit RecordingObserver(message_bus &bus) {
|
|
||||||
subscriptions.push_back(bus.subscribe<statement_accessed_event>([this](const statement_accessed_event &ev) {
|
|
||||||
std::lock_guard lock(mutex);
|
|
||||||
events.push(message::from_ref(ev));
|
|
||||||
}));
|
|
||||||
subscriptions.push_back(bus.subscribe<statement_added_event>([this](const statement_added_event &ev) {
|
|
||||||
std::lock_guard lock(mutex);
|
|
||||||
events.push(message::from_ref(ev));
|
|
||||||
}));
|
|
||||||
subscriptions.push_back(bus.subscribe<statement_evicted_event>([this](const statement_evicted_event &ev) {
|
|
||||||
std::lock_guard lock(mutex);
|
|
||||||
events.push(message::from_ref(ev));
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
|
|
||||||
std::optional<message> poll() {
|
|
||||||
std::lock_guard lock(mutex);
|
|
||||||
if (events.empty()) {
|
|
||||||
return std::nullopt;
|
|
||||||
}
|
|
||||||
auto evt = events.front();
|
|
||||||
events.pop();
|
|
||||||
return evt;
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
std::vector<subscription> subscriptions;
|
|
||||||
std::mutex mutex;
|
|
||||||
std::queue<message> events;
|
|
||||||
};
|
|
||||||
|
|
||||||
TEST_CASE("Test statement cache", "[statement][cache]") {
|
TEST_CASE("Test statement cache", "[statement][cache]") {
|
||||||
backend_provider::instance().register_backend("noop", std::make_unique<orm::test_backend_service>());
|
backend_provider::instance().register_backend("noop", std::make_unique<orm::test_backend_service>());
|
||||||
|
|
||||||
|
|
@ -222,77 +123,6 @@ TEST_CASE("Test statement reuse avoids reprepare", "[statement][cache][prepare]"
|
||||||
auto stmt2 = result.value();
|
auto stmt2 = result.value();
|
||||||
}
|
}
|
||||||
|
|
||||||
// TEST_CASE("Multithreaded stress test", "[statement][cache][stress]") {
|
|
||||||
// backend_provider::instance().register_backend("noop", std::make_unique<orm::test_backend_service>());
|
|
||||||
//
|
|
||||||
// constexpr int thread_count = 16;
|
|
||||||
// constexpr int iterations = 1000;
|
|
||||||
// constexpr int sql_pool_size = 10;
|
|
||||||
//
|
|
||||||
// std::vector<std::string> sqls;
|
|
||||||
// for (int i = 0; i < sql_pool_size; ++i) {
|
|
||||||
// sqls.push_back("SELECT " + std::to_string(i));
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// connection_pool pool("noop://noop.db", 4);
|
|
||||||
// message_bus bus;
|
|
||||||
// statement_cache cache(bus, pool, 5);
|
|
||||||
// RecordingObserver observer(bus);
|
|
||||||
// MetricsObserver metrics(bus);
|
|
||||||
//
|
|
||||||
// auto start_time = std::chrono::steady_clock::now();
|
|
||||||
//
|
|
||||||
// std::atomic_int lock_failed_count{0};
|
|
||||||
// std::atomic_int exec_failed_count{0};
|
|
||||||
//
|
|
||||||
// auto worker = [&](const int tid) {
|
|
||||||
// std::mt19937 rng(tid);
|
|
||||||
// std::uniform_int_distribution dist(0, sql_pool_size - 1);
|
|
||||||
//
|
|
||||||
// for (int i = 0; i < iterations; ++i) {
|
|
||||||
// const auto& sql = sqls[dist(rng)];
|
|
||||||
// if (const auto result = cache.acquire({sql}); !result) {
|
|
||||||
// FAIL("Failed to acquire statement");
|
|
||||||
// } else {
|
|
||||||
// if (const auto exec_result = result->execute(); !exec_result) {
|
|
||||||
// if (exec_result.err().ec() == error_code::STATEMENT_LOCKED) {
|
|
||||||
// ++lock_failed_count;
|
|
||||||
// } else {
|
|
||||||
// ++exec_failed_count;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// };
|
|
||||||
//
|
|
||||||
// std::vector<std::thread> threads;
|
|
||||||
// for (int i = 0; i < thread_count; ++i) {
|
|
||||||
// threads.emplace_back(worker, i);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// for (auto& t : threads) {
|
|
||||||
// t.join();
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// auto end_time = std::chrono::steady_clock::now();
|
|
||||||
// auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
|
|
||||||
//
|
|
||||||
// std::cout << "[Performance] Executed " << (thread_count * iterations) << " statements in " << duration.count() << " ms (lock failed: " << lock_failed_count << ", execute failed: " << exec_failed_count << ")\n";
|
|
||||||
//
|
|
||||||
// std::cout << "Average lock wait time: " << metrics.get_average_lock_wait_time().count() << "ms\n";
|
|
||||||
// std::cout << "Total lock wait time: " << metrics.get_total_lock_wait_time().count() << "ms\n";
|
|
||||||
// std::cout << "Average execution time: " << metrics.get_average_execution_time().count() << "ms\n";
|
|
||||||
// std::cout << "Total execution time: " << metrics.get_total_execution_time().count() << "ms\n";
|
|
||||||
// std::cout << "Number of lock failures: " << metrics.get_lock_failure_count() << "\n";
|
|
||||||
//
|
|
||||||
// // Some events should be generated
|
|
||||||
// int accessed = 0;
|
|
||||||
// while (auto e = observer.poll()) {
|
|
||||||
// if (e->is<statement_accessed_event>()) accessed++;
|
|
||||||
// }
|
|
||||||
// REQUIRE(accessed > 0);
|
|
||||||
// }
|
|
||||||
|
|
||||||
TEST_CASE("Race condition simulation with mixed access", "[statement_cache][race]") {
|
TEST_CASE("Race condition simulation with mixed access", "[statement_cache][race]") {
|
||||||
backend_provider::instance().register_backend("noop", std::make_unique<orm::test_backend_service>());
|
backend_provider::instance().register_backend("noop", std::make_unique<orm::test_backend_service>());
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,51 @@
|
||||||
|
#include "MetricsObserver.hpp"
|
||||||
|
|
||||||
|
namespace matador::test {
|
||||||
|
MetricsObserver::MetricsObserver(utils::message_bus& bus) {
|
||||||
|
subscriptions.push_back(bus.subscribe<sql::statement_lock_failed_event>([this](const sql::statement_lock_failed_event &ev) {
|
||||||
|
std::lock_guard lock(mutex_);
|
||||||
|
lock_failure_count_++;
|
||||||
|
total_lock_wait_time_ += ev.duration;
|
||||||
|
}));
|
||||||
|
subscriptions.push_back(bus.subscribe<sql::statement_execution_event>([this](const sql::statement_execution_event &ev) {
|
||||||
|
std::lock_guard lock(mutex_);
|
||||||
|
execution_count_++;
|
||||||
|
total_execution_time_ += ev.duration;
|
||||||
|
}));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
std::chrono::milliseconds MetricsObserver::get_average_lock_wait_time() const {
|
||||||
|
std::lock_guard lock(mutex_);
|
||||||
|
if (lock_failure_count_ == 0) {
|
||||||
|
return std::chrono::milliseconds{0};
|
||||||
|
}
|
||||||
|
const auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(total_lock_wait_time_);
|
||||||
|
return std::chrono::milliseconds(millis.count() / lock_failure_count_);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::chrono::milliseconds MetricsObserver::get_average_execution_time() const {
|
||||||
|
std::lock_guard lock(mutex_);
|
||||||
|
if (execution_count_ == 0) {
|
||||||
|
return std::chrono::milliseconds{0};
|
||||||
|
}
|
||||||
|
|
||||||
|
const auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(total_execution_time_);
|
||||||
|
return std::chrono::milliseconds(millis.count() / execution_count_);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::chrono::milliseconds MetricsObserver::get_total_lock_wait_time() const {
|
||||||
|
std::lock_guard lock(mutex_);
|
||||||
|
return std::chrono::duration_cast<std::chrono::milliseconds>(total_lock_wait_time_);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::chrono::milliseconds MetricsObserver::get_total_execution_time() const {
|
||||||
|
std::lock_guard lock(mutex_);
|
||||||
|
return std::chrono::duration_cast<std::chrono::milliseconds>(total_execution_time_);
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t MetricsObserver::get_lock_failure_count() const {
|
||||||
|
std::lock_guard lock(mutex_);
|
||||||
|
return lock_failure_count_;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,28 @@
|
||||||
|
#ifndef MATADOR_METRICS_OBSERVER_HPP
|
||||||
|
#define MATADOR_METRICS_OBSERVER_HPP
|
||||||
|
|
||||||
|
#include "matador/utils/message_bus.hpp"
|
||||||
|
|
||||||
|
#include "matador/sql/statement_cache.hpp"
|
||||||
|
|
||||||
|
namespace matador::test {
|
||||||
|
class MetricsObserver {
|
||||||
|
public:
|
||||||
|
explicit MetricsObserver(utils::message_bus &bus);
|
||||||
|
|
||||||
|
std::chrono::milliseconds get_average_lock_wait_time() const;
|
||||||
|
std::chrono::milliseconds get_average_execution_time() const;
|
||||||
|
std::chrono::milliseconds get_total_lock_wait_time() const;
|
||||||
|
std::chrono::milliseconds get_total_execution_time() const;
|
||||||
|
size_t get_lock_failure_count() const;
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::vector<utils::subscription> subscriptions;
|
||||||
|
mutable std::mutex mutex_;
|
||||||
|
size_t lock_failure_count_{0};
|
||||||
|
size_t execution_count_{0};
|
||||||
|
std::chrono::nanoseconds total_lock_wait_time_{0};
|
||||||
|
std::chrono::nanoseconds total_execution_time_{0};
|
||||||
|
};
|
||||||
|
}
|
||||||
|
#endif //MATADOR_METRICS_OBSERVER_HPP
|
||||||
|
|
@ -0,0 +1,28 @@
|
||||||
|
#include "RecordingObserver.hpp"
|
||||||
|
|
||||||
|
namespace matador::test {
|
||||||
|
RecordingObserver::RecordingObserver(utils::message_bus& bus) {
|
||||||
|
subscriptions.push_back(bus.subscribe<sql::statement_accessed_event>([this](const sql::statement_accessed_event &ev) {
|
||||||
|
std::lock_guard lock(mutex);
|
||||||
|
events.push(utils::message::from_ref(ev));
|
||||||
|
}));
|
||||||
|
subscriptions.push_back(bus.subscribe<sql::statement_added_event>([this](const sql::statement_added_event &ev) {
|
||||||
|
std::lock_guard lock(mutex);
|
||||||
|
events.push(utils::message::from_ref(ev));
|
||||||
|
}));
|
||||||
|
subscriptions.push_back(bus.subscribe<sql::statement_evicted_event>([this](const sql::statement_evicted_event &ev) {
|
||||||
|
std::lock_guard lock(mutex);
|
||||||
|
events.push(utils::message::from_ref(ev));
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
std::optional<utils::message> RecordingObserver::poll() {
|
||||||
|
std::lock_guard lock(mutex);
|
||||||
|
if (events.empty()) {
|
||||||
|
return std::nullopt;
|
||||||
|
}
|
||||||
|
auto evt = events.front();
|
||||||
|
events.pop();
|
||||||
|
return evt;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,24 @@
|
||||||
|
#ifndef MATADOR_RECORDING_OBSERVER_HPP
|
||||||
|
#define MATADOR_RECORDING_OBSERVER_HPP
|
||||||
|
|
||||||
|
#include "matador/utils/message_bus.hpp"
|
||||||
|
|
||||||
|
#include "matador/sql/statement_cache.hpp"
|
||||||
|
|
||||||
|
#include <queue>
|
||||||
|
|
||||||
|
namespace matador::test {
|
||||||
|
class RecordingObserver final {
|
||||||
|
public:
|
||||||
|
explicit RecordingObserver(utils::message_bus &bus);
|
||||||
|
|
||||||
|
std::optional<utils::message> poll();
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::vector<utils::subscription> subscriptions;
|
||||||
|
std::mutex mutex;
|
||||||
|
std::queue<utils::message> events;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif //MATADOR_RECORDING_OBSERVER_HPP
|
||||||
Loading…
Reference in New Issue