Compare commits

...

2 Commits

Author SHA1 Message Date
Sascha Kühl 01d7179604 added message event handling 2025-08-11 16:59:48 +02:00
Sascha Kühl 789be1174b removed namespace interface 2025-08-11 16:59:16 +02:00
13 changed files with 302 additions and 152 deletions

View File

@ -3,7 +3,7 @@
#include "matador/utils/attribute_writer.hpp"
namespace matador::sql::interface {
namespace matador::sql {
using parameter_binder = utils::attribute_writer;

View File

@ -22,23 +22,23 @@ protected:
public:
virtual ~statement_impl() = default;
virtual utils::result<size_t, utils::error> execute(const interface::parameter_binder& bindings) = 0;
virtual utils::result<std::unique_ptr<query_result_impl>, utils::error> fetch(const interface::parameter_binder& bindings) = 0;
virtual utils::result<size_t, utils::error> execute(const parameter_binder& bindings) = 0;
virtual utils::result<std::unique_ptr<query_result_impl>, utils::error> fetch(const parameter_binder& bindings) = 0;
template < class Type >
void bind_object(Type &obj, interface::parameter_binder& bindings) {
void bind_object(Type &obj, parameter_binder& bindings) {
object_parameter_binder object_binder_;
object_binder_.reset(start_index());
object_binder_.bind(obj, bindings);
}
template < class Type >
void bind(const size_t pos, Type &val, interface::parameter_binder& bindings) {
void bind(const size_t pos, Type &val, parameter_binder& bindings) {
utils::data_type_traits<Type>::bind_value(bindings, adjust_index(pos), val);
}
void bind(size_t pos, const char *value, size_t size, interface::parameter_binder& bindings) const;
void bind(size_t pos, std::string &val, size_t size, interface::parameter_binder& bindings) const;
void bind(size_t pos, const char *value, size_t size, parameter_binder& bindings) const;
void bind(size_t pos, std::string &val, size_t size, parameter_binder& bindings) const;
virtual void reset() = 0;

View File

@ -12,19 +12,19 @@ protected:
public:
virtual ~statement_proxy() = default;
virtual utils::result<size_t, utils::error> execute(interface::parameter_binder& bindings) = 0;
virtual utils::result<std::unique_ptr<query_result_impl>, utils::error> fetch(interface::parameter_binder& bindings) = 0;
virtual utils::result<size_t, utils::error> execute(parameter_binder& bindings) = 0;
virtual utils::result<std::unique_ptr<query_result_impl>, utils::error> fetch(parameter_binder& bindings) = 0;
template<class Type>
void bind(const Type &obj, interface::parameter_binder& bindings) {
void bind(const Type &obj, parameter_binder& bindings) {
statement_->bind_object(obj, bindings);
}
template<typename Type>
void bind(size_t pos, Type &value, interface::parameter_binder& bindings) {
void bind(size_t pos, Type &value, parameter_binder& bindings) {
statement_->bind(pos, value, bindings);
}
void bind(size_t pos, const char *value, size_t size, interface::parameter_binder& bindings) const;
void bind(size_t pos, std::string &val, size_t size, interface::parameter_binder& bindings) const;
void bind(size_t pos, const char *value, size_t size, parameter_binder& bindings) const;
void bind(size_t pos, std::string &val, size_t size, parameter_binder& bindings) const;
void reset() const;

View File

@ -23,6 +23,19 @@ struct statement_accessed_event : statement_event {};
struct statement_added_event : statement_event {};
struct statement_evicted_event : statement_event {};
struct statement_lock_failed_event : statement_event {
std::chrono::steady_clock::time_point lock_attempt_start;
};
struct statement_lock_acquired_event : statement_event {
std::chrono::steady_clock::time_point lock_attempt_start;
std::chrono::steady_clock::time_point lock_attempt_end;
};
struct statement_execution_event : statement_event {
std::chrono::steady_clock::time_point execution_start;
std::chrono::steady_clock::time_point execution_end;
};
struct statement_cache_config {
size_t max_size;

View File

@ -14,60 +14,109 @@
#include <stdexcept>
namespace matador::utils {
class message_bus; // forward
class message_bus;
/**
* @brief Represents a generic, type-safe message object used for communication.
*/
class message {
public:
/**
* @brief Default constructor for an empty message.
*/
message();
// Owning: copy into an owned shared_ptr (may allocate)
template<typename T>
explicit message(const T& value) {
auto strong = std::make_shared<T>(value); // allocate
/**
* @brief Constructs an owning message from a value.
*
* @tparam MessageType The type of the message.
* @param msg The message to be stored. A copy is made.
*/
template<typename MessageType>
explicit message(const MessageType& msg) {
auto strong = std::make_shared<MessageType>(msg); // allocate
owner_ = std::static_pointer_cast<void>(strong); // share ownership as void*
ptr_ = static_cast<const void*>(strong.get());
type_ = std::type_index(typeid(T));
type_ = std::type_index(typeid(MessageType));
}
// Non-owning: zero-copy reference to an existing object (no allocation)
// Caller must guarantee lifetime of `ref` for the usage duration.
template<typename T>
static message from_ref(const T& ref) {
/**
* @brief Constructs a non-owning message from a reference.
*
* @tparam MessageType The type of the reference.
* @param msg A reference to the message data. The caller must ensure its lifetime during use.
* @return A non-owning message wrapping the reference.
*/
template<typename MessageType>
static message from_ref(const MessageType& msg) {
message m;
m.ptr_ = static_cast<const void*>(std::addressof(ref));
m.type_ = std::type_index(typeid(T));
m.ptr_ = static_cast<const void*>(std::addressof(msg));
m.type_ = std::type_index(typeid(MessageType));
// owner_ remains null => non-owning
return m;
}
// Owning from shared_ptr (no extra copy; shares ownership)
template<typename T>
static message from_shared(std::shared_ptr<T> sp) {
/**
* @brief Constructs an owning message from a shared pointer.
*
* @tparam MessageType The type of the object managed by the shared pointer.
* @param msg A shared pointer to the message.
* @return A message that shares ownership of the object.
*/
template<typename MessageType>
static message from_shared(std::shared_ptr<MessageType> msg) {
message m;
m.owner_ = std::static_pointer_cast<void>(sp);
m.ptr_ = static_cast<const void*>(sp.get());
m.type_ = std::type_index(typeid(T));
m.owner_ = std::static_pointer_cast<void>(msg);
m.ptr_ = static_cast<const void*>(msg.get());
m.type_ = std::type_index(typeid(MessageType));
return m;
}
/**
* @brief Checks if the message is empty.
*
* @return True if the message is empty, false otherwise.
*/
[[nodiscard]] bool empty() const;
/**
* @brief Gets the type of the object stored in the message.
*
* @return The type of the stored object as a std::type_index.
*/
[[nodiscard]] std::type_index type() const;
template<typename T>
[[nodiscard]] bool is() const { return type_ == std::type_index(typeid(T)); }
/**
* @brief Checks if the stored object is of the specified type.
*
* @tparam MessageType The type to check.
* @return True if the object matches the type, false otherwise.
*/
template<typename MessageType>
[[nodiscard]] bool is() const { return type_ == std::type_index(typeid(MessageType)); }
// Access as typed reference. Works for both owning and non-owning.
// Throws std::bad_cast if type mismatch.
template<typename T>
const T& get() const {
if (!is<T>()) throw std::bad_cast();
const void* p = msg_ptr();
/**
* @brief Accesses the stored object as a typed reference.
*
* @tparam MessageType The expected type of the object.
* @return A constant reference to the stored object.
* @throws std::bad_cast if the type does not match.
* @throws std::runtime_error if the message is empty.
*/
template<typename MessageType>
const MessageType& get() const {
if (!is<MessageType>()) throw std::bad_cast();
const void* p = raw_ptr();
if (!p) throw std::runtime_error("AnyMessage: empty pointer");
return *static_cast<const T*>(p);
return *static_cast<const MessageType*>(p);
}
// Return the raw const void* pointer to the stored object (may be non-owning)
[[nodiscard]] const void* msg_ptr() const;
/**
* @brief Retrieves the raw pointer to the stored object.
*
* @return A const void* pointing to the stored object (may be non-owning).
*/
[[nodiscard]] const void* raw_ptr() const;
private:
const void* ptr_; // non-owning raw pointer (if set)
@ -75,22 +124,46 @@ private:
std::type_index type_{typeid(void)};
};
// Forward declare Subscription so MessageBus can return it.
/**
* @brief Represents a subscription to a message_bus, providing RAII-style unsubscription.
*/
class subscription {
public:
/**
* @brief Default constructor for an empty subscription.
*/
subscription() = default;
/**
* @brief Constructs a subscription for a specific handler and message type.
*
* @param bus Pointer to the message bus managing the subscription.
* @param type The type_index of the message type.
* @param id The unique ID of the handler.
*/
subscription(message_bus* bus, std::type_index type, uint64_t id);
// Move-only (non-copyable)
// Prohibit copying of subscription objects.
subscription(const subscription&) = delete;
subscription& operator=(const subscription&) = delete;
// Allow move semantics for subscription objects.
subscription(subscription&& other) noexcept;
subscription& operator=(subscription&& other) noexcept;
/**
* @brief Destructor that ensures cleanup by unsubscribing from the bus.
*/
~subscription();
/**
* @brief Unsubscribes the handler from the message bus.
*/
void unsubscribe();
/**
* @brief Checks if the subscription is still valid.
*
* @return True if the subscription is valid, false otherwise.
*/
[[nodiscard]] bool valid() const;
private:
@ -99,90 +172,139 @@ private:
uint64_t id_ = 0;
};
// === MessageBus: runtime-registered, thread-safe ===
/**
* @brief A thread-safe message bus for runtime message publishing and subscription handling.
*/
class message_bus {
public:
using HandlerId = uint64_t;
using HandlerId = uint64_t; /**< Type alias for handler IDs. */
message_bus() : next_id_{1} {}
/**
* @brief Constructs a new message_bus object.
*/
message_bus();
// Subscribe with a free function or lambda
template<typename T>
subscription subscribe(std::function<void(const T&)> handler,
std::function<bool(const T&)> filter = nullptr)
/**
* @brief Subscribes to messages of a specific type using a free function or lambda.
*
* @tparam MessageType The type of the message to subscribe to.
* @param handler The function to execute when a message of type T is published.
* @param filter An optional filter function to determine if the handler should be invoked.
* @return A subscription object to manage the handler's registration.
*/
template<typename MessageType>
subscription subscribe(std::function<void(const MessageType&)> handler,
std::function<bool(const MessageType&)> filter = nullptr)
{
auto id = next_id_.fetch_add(1, std::memory_order_relaxed);
std::unique_lock writeLock(mutex_);
auto &vec = handlers_[std::type_index(typeid(T))];
auto &vec = handlers_[std::type_index(typeid(MessageType))];
Entry e;
e.id = id;
e.handler = [h = std::move(handler)](const void* p) {
h(*static_cast<const T*>(p));
h(*static_cast<const MessageType*>(p));
};
if (filter) {
e.filter = [f = std::move(filter)](const void* p) -> bool {
return f(*static_cast<const T*>(p));
return f(*static_cast<const MessageType*>(p));
};
} else {
e.filter = nullptr;
}
vec.emplace_back(std::move(e));
return {this, std::type_index(typeid(T)), id};
return {this, std::type_index(typeid(MessageType)), id};
}
// Subscribe raw pointer instance; caller ensures lifetime
template<typename T, typename C>
subscription subscribe(C* instance, void (C::*memberFn)(const T&),
std::function<bool(const T&)> filter = nullptr)
/**
* @brief Subscribes to messages using a class member function bound to an instance.
*
* @tparam MessageType The type of the message to subscribe to.
* @tparam CallerClass The class of the instance.
* @param instance A pointer to the instance.
* @param memberFn A pointer to the member function to execute.
* @param filter An optional filter function.
* @return A subscription object to manage the handler's registration.
*/
template<typename MessageType, typename CallerClass>
subscription subscribe(CallerClass* instance, void (CallerClass::*memberFn)(const MessageType&),
std::function<bool(const MessageType&)> filter = nullptr)
{
auto fn = [instance, memberFn](const T& m) { (instance->*memberFn)(m); };
return subscribe<T>(std::function<void(const T&)>(fn), std::move(filter));
auto fn = [instance, memberFn](const MessageType& m) { (instance->*memberFn)(m); };
return subscribe<MessageType>(std::function<void(const MessageType&)>(fn), std::move(filter));
}
// Subscribe shared_ptr instance (safe)
template<typename T, typename C>
subscription subscribe(std::shared_ptr<C> instance, void (C::*memberFn)(const T&),
std::function<bool(const T&)> filter = nullptr)
/**
* @brief Subscribes to messages using a member function bound to a shared_ptr instance.
*
* @tparam MessageType The type of the message to subscribe to.
* @tparam CallerClass The class of the shared_ptr instance.
* @param instance A shared pointer to the instance.
* @param memberFn A pointer to the member function to execute.
* @param filter An optional filter function.
* @return A subscription object to manage the handler's registration.
*/
template<typename MessageType, typename CallerClass>
subscription subscribe(std::shared_ptr<CallerClass> instance, void (CallerClass::*memberFn)(const MessageType&),
std::function<bool(const MessageType&)> filter = nullptr)
{
std::weak_ptr<C> w = instance;
auto handler = [w, memberFn](const T& m) {
std::weak_ptr<CallerClass> w = instance;
auto handler = [w, memberFn](const MessageType& m) {
if (auto s = w.lock()) {
(s.get()->*memberFn)(m);
}
};
std::function<bool(const T&)> local_filter = nullptr;
std::function<bool(const MessageType&)> local_filter = nullptr;
if (filter) {
local_filter = [w, filter = std::move(filter)](const T& m) -> bool {
local_filter = [w, filter = std::move(filter)](const MessageType& m) -> bool {
if (w.expired()) {
return false;
}
return filter(m);
};
}
return subscribe<T>(std::move(handler), std::move(local_filter));
return subscribe<MessageType>(std::move(handler), std::move(local_filter));
}
// Unsubscribe by type-specific id (RAII calls this)
template<typename T>
/**
* @brief Unsubscribes a handler from the bus using its type and ID.
*
* @tparam MessageType The type of the message.
* @param id The unique handler ID to unsubscribe.
*/
template<typename MessageType>
void unsubscribe(const HandlerId id) {
unsubscribe(std::type_index(typeid(T)), id);
unsubscribe(std::type_index(typeid(MessageType)), id);
}
// Unsubscribe by explicit type_index and id
/**
* @brief Unsubscribes a handler by its type index and ID.
*
* @param type The type_index of the message type.
* @param id The unique handler ID to unsubscribe.
*/
void unsubscribe(std::type_index type, HandlerId id);
// Unsubscribe by id regardless of type (O(#types) scan)
/**
* @brief Unsubscribes a handler by its unique ID, regardless of type.
*
* @param id The unique handler ID to unsubscribe.
*/
void unsubscribe(HandlerId id);
// Publish a typed message (fast)
template<typename T>
void publish(const T& msg) const {
/**
* @brief Publishes a message of a specific type.
*
* @tparam MessageType The type of the message to publish.
* @param msg The message to publish.
*/
template<typename MessageType>
void publish(const MessageType& msg) const {
std::vector<Entry> snapshot;
{
std::shared_lock readLock(mutex_);
const auto it = handlers_.find(std::type_index(typeid(T)));
const auto it = handlers_.find(std::type_index(typeid(MessageType)));
if (it == handlers_.end()) {
return;
}
@ -193,7 +315,11 @@ public:
}
}
// Publish using AnyMessage (zero-copy if AnyMessage is non-owning)
/**
* @brief Publishes a generic message object.
*
* @param msg The message to publish.
*/
void publish(const message& msg) const;
private:
@ -210,7 +336,6 @@ private:
std::atomic<HandlerId> next_id_;
};
// RAII unsubscribe implementation
inline void subscription::unsubscribe() {
if (bus_) {
bus_->unsubscribe(type_, id_);

View File

@ -14,7 +14,7 @@ std::type_index message::type() const {
return type_;
}
const void * message::msg_ptr() const {
const void * message::raw_ptr() const {
return owner_ ? owner_.get() : ptr_;
}
@ -49,6 +49,9 @@ bool subscription::valid() const {
return bus_ != nullptr;
}
message_bus::message_bus()
: next_id_{1} {}
void message_bus::unsubscribe(const std::type_index type, HandlerId id) {
std::unique_lock writeLock(mutex_);
const auto it = handlers_.find(type);
@ -89,7 +92,7 @@ void message_bus::publish(const message &msg) const {
}
snapshot = it->second;
}
const void* p = msg.msg_ptr();
const void* p = msg.raw_ptr();
for (const auto &e : snapshot) {
if (!e.filter || e.filter(p)) e.handler(p);
}

View File

@ -19,10 +19,10 @@ public:
explicit connection_statement_proxy(std::unique_ptr<statement_impl>&& stmt)
: statement_proxy(std::move(stmt)) {}
utils::result<size_t, utils::error> execute(interface::parameter_binder& bindings) override {
utils::result<size_t, utils::error> execute(parameter_binder& bindings) override {
return statement_->execute(bindings);
}
utils::result<std::unique_ptr<query_result_impl>, utils::error> fetch(interface::parameter_binder& bindings) override {
utils::result<std::unique_ptr<query_result_impl>, utils::error> fetch(parameter_binder& bindings) override {
return statement_->fetch(bindings);
}
};

View File

@ -6,11 +6,11 @@ statement_impl::statement_impl(query_context query)
: query_(std::move(query))
{}
void statement_impl::bind(const size_t pos, const char *value, const size_t size, interface::parameter_binder& bindings) const {
void statement_impl::bind(const size_t pos, const char *value, const size_t size, parameter_binder& bindings) const {
utils::data_type_traits<const char*>::bind_value(bindings, adjust_index(pos), value, size);
}
void statement_impl::bind(const size_t pos, std::string &val, const size_t size, interface::parameter_binder& bindings) const {
void statement_impl::bind(const size_t pos, std::string &val, const size_t size, parameter_binder& bindings) const {
utils::data_type_traits<std::string>::bind_value(bindings, adjust_index(pos), val, size);
}

View File

@ -4,10 +4,10 @@ namespace matador::sql {
statement_proxy::statement_proxy(std::unique_ptr<statement_impl>&& stmt)
: statement_(std::move(stmt)){}
void statement_proxy::bind(const size_t pos, const char* value, const size_t size, interface::parameter_binder& bindings) const {
void statement_proxy::bind(const size_t pos, const char* value, const size_t size, parameter_binder& bindings) const {
statement_->bind(pos, value, size, bindings);
}
void statement_proxy::bind(const size_t pos, std::string& val, const size_t size, interface::parameter_binder& bindings) const {
void statement_proxy::bind(const size_t pos, std::string& val, const size_t size, parameter_binder& bindings) const {
statement_->bind(pos, val, size, bindings);
}

View File

@ -24,24 +24,26 @@ public:
size_t lock_attempts{0};
};
explicit statement_cache_proxy(std::unique_ptr<statement_impl>&& stmt, connection_pool &pool, const size_t connection_id)
statement_cache_proxy(utils::message_bus &bus, std::unique_ptr<statement_impl>&& stmt, connection_pool &pool, const size_t connection_id)
: statement_proxy(std::move(stmt))
, pool_(pool)
, connection_id_(connection_id) {}
, connection_id_(connection_id)
, bus_(bus) {}
utils::result<size_t, utils::error> execute(interface::parameter_binder& bindings) override {
utils::result<size_t, utils::error> execute(parameter_binder& bindings) override {
execution_metrics metrics{std::chrono::steady_clock::now()};
auto result = try_with_retry([this, &bindings, &metrics]() -> utils::result<size_t, utils::error> {
if (!try_lock()) {
++metrics.lock_attempts;
bus_.publish<statement_lock_failed_event>({sql(), std::chrono::steady_clock::now(), metrics.lock_attempt_start});
return utils::failure(utils::error{
error_code::STATEMENT_LOCKED,
"Failed to execute statement because it is already in use"
});
}
metrics.lock_acquired = std::chrono::steady_clock::now();
// publish
bus_.publish<statement_lock_acquired_event>({sql(), std::chrono::steady_clock::now(), metrics.lock_attempt_start, metrics.lock_acquired});
auto guard = statement_guard(*this);
if (const auto conn = pool_.acquire(connection_id_); !conn.valid()) {
@ -55,14 +57,15 @@ public:
auto execution_result = statement_->execute(bindings);
metrics.execution_end = std::chrono::steady_clock::now();
// publish
bus_.publish<statement_execution_event>({sql(), std::chrono::steady_clock::now(), metrics.execution_start, metrics.execution_end});
return execution_result;
});
return result;
}
utils::result<std::unique_ptr<query_result_impl>, utils::error> fetch(interface::parameter_binder& bindings) override {
utils::result<std::unique_ptr<query_result_impl>, utils::error> fetch(parameter_binder& bindings) override {
execution_metrics metrics{std::chrono::steady_clock::now()};
auto result = try_with_retry([this, &bindings, &metrics]() -> utils::result<std::unique_ptr<query_result_impl>, utils::error> {
@ -104,7 +107,7 @@ protected:
if (attempt + 1 < config_.max_attempts) {
std::this_thread::sleep_for(current_wait);
current_wait = std::min(current_wait * 2, config_.max_wait);
current_wait = (std::min)(current_wait * 2, config_.max_wait);
}
}
@ -132,6 +135,7 @@ private:
connection_pool &pool_;
size_t connection_id_{};
retry_config config_{};
utils::message_bus &bus_;
};
}
@ -178,7 +182,7 @@ utils::result<statement, utils::error> statement_cache::acquire(const query_cont
const auto it = cache_map_.insert({
key,
{statement{
std::make_shared<internal::statement_cache_proxy>(std::move(stmt), pool_, id)},
std::make_shared<internal::statement_cache_proxy>(bus_, std::move(stmt), pool_, id)},
std::chrono::steady_clock::now(),
usage_list_.begin()}
}).first;

View File

@ -2,6 +2,7 @@
#include "test_result_reader.hpp"
#include "test_parameter_binder.hpp"
#include <chrono>
#include <random>
#include <thread>
@ -9,7 +10,7 @@ namespace matador::test::orm {
test_statement::test_statement(const sql::query_context &query)
: statement_impl(query) {}
utils::result<size_t, utils::error> test_statement::execute(const sql::interface::parameter_binder &/*bindings*/) {
utils::result<size_t, utils::error> test_statement::execute(const sql::parameter_binder &/*bindings*/) {
using namespace std::chrono_literals;
std::mt19937 rng(query_.sql.size());
std::uniform_int_distribution dist(10, 40);
@ -17,7 +18,7 @@ utils::result<size_t, utils::error> test_statement::execute(const sql::interface
return utils::ok(static_cast<size_t>(8));
}
utils::result<std::unique_ptr<sql::query_result_impl>, utils::error> test_statement::fetch(const sql::interface::parameter_binder &/*bindings*/) {
utils::result<std::unique_ptr<sql::query_result_impl>, utils::error> test_statement::fetch(const sql::parameter_binder &/*bindings*/) {
return utils::ok(std::make_unique<sql::query_result_impl>(std::make_unique<test_result_reader>(), query_.prototype, query_.prototype.size()));
}

View File

@ -8,8 +8,8 @@ namespace matador::test::orm {
class test_statement final : public sql::statement_impl {
public:
explicit test_statement(const sql::query_context &query);
utils::result<size_t, utils::error> execute(const sql::interface::parameter_binder &bindings) override;
utils::result<std::unique_ptr<sql::query_result_impl>, utils::error> fetch(const sql::interface::parameter_binder &bindings) override;
utils::result<size_t, utils::error> execute(const sql::parameter_binder &bindings) override;
utils::result<std::unique_ptr<sql::query_result_impl>, utils::error> fetch(const sql::parameter_binder &bindings) override;
void reset() override;
protected:

View File

@ -22,54 +22,57 @@ using namespace matador::sql;
using namespace matador::query;
using namespace matador::utils;
// Beispiel für einen Observer, der die Metriken auswertet
// class MetricsObserver : public statement_cache_observer_interface {
// public:
// void on_event(const statement_cache_event& evt) override {
// std::lock_guard<std::mutex> lock(mutex_);
//
// switch (evt.type) {
// case statement_cache_event::Type::LockFailed:
// lock_failure_count_++;
// if (evt.duration) {
// total_lock_wait_time_ += evt.duration.value();
// }
// break;
//
// case statement_cache_event::Type::ExecutionEnded:
// execution_count_++;
// if (evt.duration) {
// total_execution_time_ += evt.duration.value();
// }
// break;
// }
// }
//
// // Metrik-Zugriffsmethoden
// double get_average_lock_wait_time() const {
// std::lock_guard<std::mutex> lock(mutex_);
// if (lock_failure_count_ == 0) return 0.0;
// return std::chrono::duration<double>(total_lock_wait_time_).count() / lock_failure_count_;
// }
//
// double get_average_execution_time() const {
// std::lock_guard<std::mutex> lock(mutex_);
// if (execution_count_ == 0) return 0.0;
// return std::chrono::duration<double>(total_execution_time_).count() / execution_count_;
// }
//
// size_t get_lock_failure_count() const {
// std::lock_guard<std::mutex> lock(mutex_);
// return lock_failure_count_;
// }
//
// private:
// mutable std::mutex mutex_;
// size_t lock_failure_count_{0};
// size_t execution_count_{0};
// std::chrono::nanoseconds total_lock_wait_time_{0};
// std::chrono::nanoseconds total_execution_time_{0};
// };
class MetricsObserver {
public:
MetricsObserver(message_bus &bus) {
}
void on_event(const statement_cache_event& evt) override {
std::lock_guard<std::mutex> lock(mutex_);
switch (evt.type) {
case statement_cache_event::Type::LockFailed:
lock_failure_count_++;
if (evt.duration) {
total_lock_wait_time_ += evt.duration.value();
}
break;
case statement_cache_event::Type::ExecutionEnded:
execution_count_++;
if (evt.duration) {
total_execution_time_ += evt.duration.value();
}
break;
}
}
// Metrik-Zugriffsmethoden
double get_average_lock_wait_time() const {
std::lock_guard lock(mutex_);
if (lock_failure_count_ == 0) return 0.0;
return std::chrono::duration<double>(total_lock_wait_time_).count() / lock_failure_count_;
}
double get_average_execution_time() const {
std::lock_guard lock(mutex_);
if (execution_count_ == 0) return 0.0;
return std::chrono::duration<double>(total_execution_time_).count() / execution_count_;
}
size_t get_lock_failure_count() const {
std::lock_guard lock(mutex_);
return lock_failure_count_;
}
private:
mutable std::mutex mutex_;
size_t lock_failure_count_{0};
size_t execution_count_{0};
std::chrono::nanoseconds total_lock_wait_time_{0};
std::chrono::nanoseconds total_execution_time_{0};
};
// void example_with_metrics() {
// connection_pool pool("noop://noop.db", 4);
// statement_cache cache(pool, 5);
@ -304,9 +307,10 @@ TEST_CASE("Race condition simulation with mixed access", "[statement_cache][race
auto task = [&](int /*id*/) {
for (int i = 0; i < operations; ++i) {
auto sql = "SELECT " + std::to_string(i % 10);
auto result = cache.acquire({sql});
REQUIRE(result);
const auto sql = "SELECT " + std::to_string(i % 10);
if (const auto result = cache.acquire({sql}); !result) {
FAIL("Statement should not be available");
}
// if (i % 50 == 0) {
// cache.cleanup_expired_connections();