Compare commits

...

2 Commits

Author SHA1 Message Date
Sascha Kühl 01d7179604 added message event handling 2025-08-11 16:59:48 +02:00
Sascha Kühl 789be1174b removed namespace interface 2025-08-11 16:59:16 +02:00
13 changed files with 302 additions and 152 deletions

View File

@ -3,7 +3,7 @@
#include "matador/utils/attribute_writer.hpp" #include "matador/utils/attribute_writer.hpp"
namespace matador::sql::interface { namespace matador::sql {
using parameter_binder = utils::attribute_writer; using parameter_binder = utils::attribute_writer;

View File

@ -22,23 +22,23 @@ protected:
public: public:
virtual ~statement_impl() = default; virtual ~statement_impl() = default;
virtual utils::result<size_t, utils::error> execute(const interface::parameter_binder& bindings) = 0; virtual utils::result<size_t, utils::error> execute(const parameter_binder& bindings) = 0;
virtual utils::result<std::unique_ptr<query_result_impl>, utils::error> fetch(const interface::parameter_binder& bindings) = 0; virtual utils::result<std::unique_ptr<query_result_impl>, utils::error> fetch(const parameter_binder& bindings) = 0;
template < class Type > template < class Type >
void bind_object(Type &obj, interface::parameter_binder& bindings) { void bind_object(Type &obj, parameter_binder& bindings) {
object_parameter_binder object_binder_; object_parameter_binder object_binder_;
object_binder_.reset(start_index()); object_binder_.reset(start_index());
object_binder_.bind(obj, bindings); object_binder_.bind(obj, bindings);
} }
template < class Type > template < class Type >
void bind(const size_t pos, Type &val, interface::parameter_binder& bindings) { void bind(const size_t pos, Type &val, parameter_binder& bindings) {
utils::data_type_traits<Type>::bind_value(bindings, adjust_index(pos), val); utils::data_type_traits<Type>::bind_value(bindings, adjust_index(pos), val);
} }
void bind(size_t pos, const char *value, size_t size, interface::parameter_binder& bindings) const; void bind(size_t pos, const char *value, size_t size, parameter_binder& bindings) const;
void bind(size_t pos, std::string &val, size_t size, interface::parameter_binder& bindings) const; void bind(size_t pos, std::string &val, size_t size, parameter_binder& bindings) const;
virtual void reset() = 0; virtual void reset() = 0;

View File

@ -12,19 +12,19 @@ protected:
public: public:
virtual ~statement_proxy() = default; virtual ~statement_proxy() = default;
virtual utils::result<size_t, utils::error> execute(interface::parameter_binder& bindings) = 0; virtual utils::result<size_t, utils::error> execute(parameter_binder& bindings) = 0;
virtual utils::result<std::unique_ptr<query_result_impl>, utils::error> fetch(interface::parameter_binder& bindings) = 0; virtual utils::result<std::unique_ptr<query_result_impl>, utils::error> fetch(parameter_binder& bindings) = 0;
template<class Type> template<class Type>
void bind(const Type &obj, interface::parameter_binder& bindings) { void bind(const Type &obj, parameter_binder& bindings) {
statement_->bind_object(obj, bindings); statement_->bind_object(obj, bindings);
} }
template<typename Type> template<typename Type>
void bind(size_t pos, Type &value, interface::parameter_binder& bindings) { void bind(size_t pos, Type &value, parameter_binder& bindings) {
statement_->bind(pos, value, bindings); statement_->bind(pos, value, bindings);
} }
void bind(size_t pos, const char *value, size_t size, interface::parameter_binder& bindings) const; void bind(size_t pos, const char *value, size_t size, parameter_binder& bindings) const;
void bind(size_t pos, std::string &val, size_t size, interface::parameter_binder& bindings) const; void bind(size_t pos, std::string &val, size_t size, parameter_binder& bindings) const;
void reset() const; void reset() const;

View File

@ -23,6 +23,19 @@ struct statement_accessed_event : statement_event {};
struct statement_added_event : statement_event {}; struct statement_added_event : statement_event {};
struct statement_evicted_event : statement_event {}; struct statement_evicted_event : statement_event {};
struct statement_lock_failed_event : statement_event {
std::chrono::steady_clock::time_point lock_attempt_start;
};
struct statement_lock_acquired_event : statement_event {
std::chrono::steady_clock::time_point lock_attempt_start;
std::chrono::steady_clock::time_point lock_attempt_end;
};
struct statement_execution_event : statement_event {
std::chrono::steady_clock::time_point execution_start;
std::chrono::steady_clock::time_point execution_end;
};
struct statement_cache_config { struct statement_cache_config {
size_t max_size; size_t max_size;

View File

@ -14,60 +14,109 @@
#include <stdexcept> #include <stdexcept>
namespace matador::utils { namespace matador::utils {
class message_bus; // forward class message_bus;
/**
* @brief Represents a generic, type-safe message object used for communication.
*/
class message { class message {
public: public:
/**
* @brief Default constructor for an empty message.
*/
message(); message();
// Owning: copy into an owned shared_ptr (may allocate) /**
template<typename T> * @brief Constructs an owning message from a value.
explicit message(const T& value) { *
auto strong = std::make_shared<T>(value); // allocate * @tparam MessageType The type of the message.
* @param msg The message to be stored. A copy is made.
*/
template<typename MessageType>
explicit message(const MessageType& msg) {
auto strong = std::make_shared<MessageType>(msg); // allocate
owner_ = std::static_pointer_cast<void>(strong); // share ownership as void* owner_ = std::static_pointer_cast<void>(strong); // share ownership as void*
ptr_ = static_cast<const void*>(strong.get()); ptr_ = static_cast<const void*>(strong.get());
type_ = std::type_index(typeid(T)); type_ = std::type_index(typeid(MessageType));
} }
// Non-owning: zero-copy reference to an existing object (no allocation) /**
// Caller must guarantee lifetime of `ref` for the usage duration. * @brief Constructs a non-owning message from a reference.
template<typename T> *
static message from_ref(const T& ref) { * @tparam MessageType The type of the reference.
* @param msg A reference to the message data. The caller must ensure its lifetime during use.
* @return A non-owning message wrapping the reference.
*/
template<typename MessageType>
static message from_ref(const MessageType& msg) {
message m; message m;
m.ptr_ = static_cast<const void*>(std::addressof(ref)); m.ptr_ = static_cast<const void*>(std::addressof(msg));
m.type_ = std::type_index(typeid(T)); m.type_ = std::type_index(typeid(MessageType));
// owner_ remains null => non-owning // owner_ remains null => non-owning
return m; return m;
} }
// Owning from shared_ptr (no extra copy; shares ownership) /**
template<typename T> * @brief Constructs an owning message from a shared pointer.
static message from_shared(std::shared_ptr<T> sp) { *
* @tparam MessageType The type of the object managed by the shared pointer.
* @param msg A shared pointer to the message.
* @return A message that shares ownership of the object.
*/
template<typename MessageType>
static message from_shared(std::shared_ptr<MessageType> msg) {
message m; message m;
m.owner_ = std::static_pointer_cast<void>(sp); m.owner_ = std::static_pointer_cast<void>(msg);
m.ptr_ = static_cast<const void*>(sp.get()); m.ptr_ = static_cast<const void*>(msg.get());
m.type_ = std::type_index(typeid(T)); m.type_ = std::type_index(typeid(MessageType));
return m; return m;
} }
/**
* @brief Checks if the message is empty.
*
* @return True if the message is empty, false otherwise.
*/
[[nodiscard]] bool empty() const; [[nodiscard]] bool empty() const;
/**
* @brief Gets the type of the object stored in the message.
*
* @return The type of the stored object as a std::type_index.
*/
[[nodiscard]] std::type_index type() const; [[nodiscard]] std::type_index type() const;
template<typename T> /**
[[nodiscard]] bool is() const { return type_ == std::type_index(typeid(T)); } * @brief Checks if the stored object is of the specified type.
*
* @tparam MessageType The type to check.
* @return True if the object matches the type, false otherwise.
*/
template<typename MessageType>
[[nodiscard]] bool is() const { return type_ == std::type_index(typeid(MessageType)); }
// Access as typed reference. Works for both owning and non-owning. /**
// Throws std::bad_cast if type mismatch. * @brief Accesses the stored object as a typed reference.
template<typename T> *
const T& get() const { * @tparam MessageType The expected type of the object.
if (!is<T>()) throw std::bad_cast(); * @return A constant reference to the stored object.
const void* p = msg_ptr(); * @throws std::bad_cast if the type does not match.
* @throws std::runtime_error if the message is empty.
*/
template<typename MessageType>
const MessageType& get() const {
if (!is<MessageType>()) throw std::bad_cast();
const void* p = raw_ptr();
if (!p) throw std::runtime_error("AnyMessage: empty pointer"); if (!p) throw std::runtime_error("AnyMessage: empty pointer");
return *static_cast<const T*>(p); return *static_cast<const MessageType*>(p);
} }
// Return the raw const void* pointer to the stored object (may be non-owning) /**
[[nodiscard]] const void* msg_ptr() const; * @brief Retrieves the raw pointer to the stored object.
*
* @return A const void* pointing to the stored object (may be non-owning).
*/
[[nodiscard]] const void* raw_ptr() const;
private: private:
const void* ptr_; // non-owning raw pointer (if set) const void* ptr_; // non-owning raw pointer (if set)
@ -75,22 +124,46 @@ private:
std::type_index type_{typeid(void)}; std::type_index type_{typeid(void)};
}; };
// Forward declare Subscription so MessageBus can return it. /**
* @brief Represents a subscription to a message_bus, providing RAII-style unsubscription.
*/
class subscription { class subscription {
public: public:
/**
* @brief Default constructor for an empty subscription.
*/
subscription() = default; subscription() = default;
/**
* @brief Constructs a subscription for a specific handler and message type.
*
* @param bus Pointer to the message bus managing the subscription.
* @param type The type_index of the message type.
* @param id The unique ID of the handler.
*/
subscription(message_bus* bus, std::type_index type, uint64_t id); subscription(message_bus* bus, std::type_index type, uint64_t id);
// Move-only (non-copyable) // Prohibit copying of subscription objects.
subscription(const subscription&) = delete; subscription(const subscription&) = delete;
subscription& operator=(const subscription&) = delete; subscription& operator=(const subscription&) = delete;
// Allow move semantics for subscription objects.
subscription(subscription&& other) noexcept; subscription(subscription&& other) noexcept;
subscription& operator=(subscription&& other) noexcept; subscription& operator=(subscription&& other) noexcept;
/**
* @brief Destructor that ensures cleanup by unsubscribing from the bus.
*/
~subscription(); ~subscription();
/**
* @brief Unsubscribes the handler from the message bus.
*/
void unsubscribe(); void unsubscribe();
/**
* @brief Checks if the subscription is still valid.
*
* @return True if the subscription is valid, false otherwise.
*/
[[nodiscard]] bool valid() const; [[nodiscard]] bool valid() const;
private: private:
@ -99,90 +172,139 @@ private:
uint64_t id_ = 0; uint64_t id_ = 0;
}; };
// === MessageBus: runtime-registered, thread-safe === /**
* @brief A thread-safe message bus for runtime message publishing and subscription handling.
*/
class message_bus { class message_bus {
public: public:
using HandlerId = uint64_t; using HandlerId = uint64_t; /**< Type alias for handler IDs. */
message_bus() : next_id_{1} {} /**
* @brief Constructs a new message_bus object.
*/
message_bus();
// Subscribe with a free function or lambda /**
template<typename T> * @brief Subscribes to messages of a specific type using a free function or lambda.
subscription subscribe(std::function<void(const T&)> handler, *
std::function<bool(const T&)> filter = nullptr) * @tparam MessageType The type of the message to subscribe to.
* @param handler The function to execute when a message of type T is published.
* @param filter An optional filter function to determine if the handler should be invoked.
* @return A subscription object to manage the handler's registration.
*/
template<typename MessageType>
subscription subscribe(std::function<void(const MessageType&)> handler,
std::function<bool(const MessageType&)> filter = nullptr)
{ {
auto id = next_id_.fetch_add(1, std::memory_order_relaxed); auto id = next_id_.fetch_add(1, std::memory_order_relaxed);
std::unique_lock writeLock(mutex_); std::unique_lock writeLock(mutex_);
auto &vec = handlers_[std::type_index(typeid(T))]; auto &vec = handlers_[std::type_index(typeid(MessageType))];
Entry e; Entry e;
e.id = id; e.id = id;
e.handler = [h = std::move(handler)](const void* p) { e.handler = [h = std::move(handler)](const void* p) {
h(*static_cast<const T*>(p)); h(*static_cast<const MessageType*>(p));
}; };
if (filter) { if (filter) {
e.filter = [f = std::move(filter)](const void* p) -> bool { e.filter = [f = std::move(filter)](const void* p) -> bool {
return f(*static_cast<const T*>(p)); return f(*static_cast<const MessageType*>(p));
}; };
} else { } else {
e.filter = nullptr; e.filter = nullptr;
} }
vec.emplace_back(std::move(e)); vec.emplace_back(std::move(e));
return {this, std::type_index(typeid(T)), id}; return {this, std::type_index(typeid(MessageType)), id};
} }
// Subscribe raw pointer instance; caller ensures lifetime /**
template<typename T, typename C> * @brief Subscribes to messages using a class member function bound to an instance.
subscription subscribe(C* instance, void (C::*memberFn)(const T&), *
std::function<bool(const T&)> filter = nullptr) * @tparam MessageType The type of the message to subscribe to.
* @tparam CallerClass The class of the instance.
* @param instance A pointer to the instance.
* @param memberFn A pointer to the member function to execute.
* @param filter An optional filter function.
* @return A subscription object to manage the handler's registration.
*/
template<typename MessageType, typename CallerClass>
subscription subscribe(CallerClass* instance, void (CallerClass::*memberFn)(const MessageType&),
std::function<bool(const MessageType&)> filter = nullptr)
{ {
auto fn = [instance, memberFn](const T& m) { (instance->*memberFn)(m); }; auto fn = [instance, memberFn](const MessageType& m) { (instance->*memberFn)(m); };
return subscribe<T>(std::function<void(const T&)>(fn), std::move(filter)); return subscribe<MessageType>(std::function<void(const MessageType&)>(fn), std::move(filter));
} }
// Subscribe shared_ptr instance (safe) /**
template<typename T, typename C> * @brief Subscribes to messages using a member function bound to a shared_ptr instance.
subscription subscribe(std::shared_ptr<C> instance, void (C::*memberFn)(const T&), *
std::function<bool(const T&)> filter = nullptr) * @tparam MessageType The type of the message to subscribe to.
* @tparam CallerClass The class of the shared_ptr instance.
* @param instance A shared pointer to the instance.
* @param memberFn A pointer to the member function to execute.
* @param filter An optional filter function.
* @return A subscription object to manage the handler's registration.
*/
template<typename MessageType, typename CallerClass>
subscription subscribe(std::shared_ptr<CallerClass> instance, void (CallerClass::*memberFn)(const MessageType&),
std::function<bool(const MessageType&)> filter = nullptr)
{ {
std::weak_ptr<C> w = instance; std::weak_ptr<CallerClass> w = instance;
auto handler = [w, memberFn](const T& m) { auto handler = [w, memberFn](const MessageType& m) {
if (auto s = w.lock()) { if (auto s = w.lock()) {
(s.get()->*memberFn)(m); (s.get()->*memberFn)(m);
} }
}; };
std::function<bool(const T&)> local_filter = nullptr; std::function<bool(const MessageType&)> local_filter = nullptr;
if (filter) { if (filter) {
local_filter = [w, filter = std::move(filter)](const T& m) -> bool { local_filter = [w, filter = std::move(filter)](const MessageType& m) -> bool {
if (w.expired()) { if (w.expired()) {
return false; return false;
} }
return filter(m); return filter(m);
}; };
} }
return subscribe<T>(std::move(handler), std::move(local_filter)); return subscribe<MessageType>(std::move(handler), std::move(local_filter));
} }
// Unsubscribe by type-specific id (RAII calls this) /**
template<typename T> * @brief Unsubscribes a handler from the bus using its type and ID.
*
* @tparam MessageType The type of the message.
* @param id The unique handler ID to unsubscribe.
*/
template<typename MessageType>
void unsubscribe(const HandlerId id) { void unsubscribe(const HandlerId id) {
unsubscribe(std::type_index(typeid(T)), id); unsubscribe(std::type_index(typeid(MessageType)), id);
} }
// Unsubscribe by explicit type_index and id /**
* @brief Unsubscribes a handler by its type index and ID.
*
* @param type The type_index of the message type.
* @param id The unique handler ID to unsubscribe.
*/
void unsubscribe(std::type_index type, HandlerId id); void unsubscribe(std::type_index type, HandlerId id);
// Unsubscribe by id regardless of type (O(#types) scan) /**
* @brief Unsubscribes a handler by its unique ID, regardless of type.
*
* @param id The unique handler ID to unsubscribe.
*/
void unsubscribe(HandlerId id); void unsubscribe(HandlerId id);
// Publish a typed message (fast) /**
template<typename T> * @brief Publishes a message of a specific type.
void publish(const T& msg) const { *
* @tparam MessageType The type of the message to publish.
* @param msg The message to publish.
*/
template<typename MessageType>
void publish(const MessageType& msg) const {
std::vector<Entry> snapshot; std::vector<Entry> snapshot;
{ {
std::shared_lock readLock(mutex_); std::shared_lock readLock(mutex_);
const auto it = handlers_.find(std::type_index(typeid(T))); const auto it = handlers_.find(std::type_index(typeid(MessageType)));
if (it == handlers_.end()) { if (it == handlers_.end()) {
return; return;
} }
@ -193,7 +315,11 @@ public:
} }
} }
// Publish using AnyMessage (zero-copy if AnyMessage is non-owning) /**
* @brief Publishes a generic message object.
*
* @param msg The message to publish.
*/
void publish(const message& msg) const; void publish(const message& msg) const;
private: private:
@ -210,7 +336,6 @@ private:
std::atomic<HandlerId> next_id_; std::atomic<HandlerId> next_id_;
}; };
// RAII unsubscribe implementation
inline void subscription::unsubscribe() { inline void subscription::unsubscribe() {
if (bus_) { if (bus_) {
bus_->unsubscribe(type_, id_); bus_->unsubscribe(type_, id_);

View File

@ -14,7 +14,7 @@ std::type_index message::type() const {
return type_; return type_;
} }
const void * message::msg_ptr() const { const void * message::raw_ptr() const {
return owner_ ? owner_.get() : ptr_; return owner_ ? owner_.get() : ptr_;
} }
@ -49,6 +49,9 @@ bool subscription::valid() const {
return bus_ != nullptr; return bus_ != nullptr;
} }
message_bus::message_bus()
: next_id_{1} {}
void message_bus::unsubscribe(const std::type_index type, HandlerId id) { void message_bus::unsubscribe(const std::type_index type, HandlerId id) {
std::unique_lock writeLock(mutex_); std::unique_lock writeLock(mutex_);
const auto it = handlers_.find(type); const auto it = handlers_.find(type);
@ -89,7 +92,7 @@ void message_bus::publish(const message &msg) const {
} }
snapshot = it->second; snapshot = it->second;
} }
const void* p = msg.msg_ptr(); const void* p = msg.raw_ptr();
for (const auto &e : snapshot) { for (const auto &e : snapshot) {
if (!e.filter || e.filter(p)) e.handler(p); if (!e.filter || e.filter(p)) e.handler(p);
} }

View File

@ -19,10 +19,10 @@ public:
explicit connection_statement_proxy(std::unique_ptr<statement_impl>&& stmt) explicit connection_statement_proxy(std::unique_ptr<statement_impl>&& stmt)
: statement_proxy(std::move(stmt)) {} : statement_proxy(std::move(stmt)) {}
utils::result<size_t, utils::error> execute(interface::parameter_binder& bindings) override { utils::result<size_t, utils::error> execute(parameter_binder& bindings) override {
return statement_->execute(bindings); return statement_->execute(bindings);
} }
utils::result<std::unique_ptr<query_result_impl>, utils::error> fetch(interface::parameter_binder& bindings) override { utils::result<std::unique_ptr<query_result_impl>, utils::error> fetch(parameter_binder& bindings) override {
return statement_->fetch(bindings); return statement_->fetch(bindings);
} }
}; };

View File

@ -6,11 +6,11 @@ statement_impl::statement_impl(query_context query)
: query_(std::move(query)) : query_(std::move(query))
{} {}
void statement_impl::bind(const size_t pos, const char *value, const size_t size, interface::parameter_binder& bindings) const { void statement_impl::bind(const size_t pos, const char *value, const size_t size, parameter_binder& bindings) const {
utils::data_type_traits<const char*>::bind_value(bindings, adjust_index(pos), value, size); utils::data_type_traits<const char*>::bind_value(bindings, adjust_index(pos), value, size);
} }
void statement_impl::bind(const size_t pos, std::string &val, const size_t size, interface::parameter_binder& bindings) const { void statement_impl::bind(const size_t pos, std::string &val, const size_t size, parameter_binder& bindings) const {
utils::data_type_traits<std::string>::bind_value(bindings, adjust_index(pos), val, size); utils::data_type_traits<std::string>::bind_value(bindings, adjust_index(pos), val, size);
} }

View File

@ -4,10 +4,10 @@ namespace matador::sql {
statement_proxy::statement_proxy(std::unique_ptr<statement_impl>&& stmt) statement_proxy::statement_proxy(std::unique_ptr<statement_impl>&& stmt)
: statement_(std::move(stmt)){} : statement_(std::move(stmt)){}
void statement_proxy::bind(const size_t pos, const char* value, const size_t size, interface::parameter_binder& bindings) const { void statement_proxy::bind(const size_t pos, const char* value, const size_t size, parameter_binder& bindings) const {
statement_->bind(pos, value, size, bindings); statement_->bind(pos, value, size, bindings);
} }
void statement_proxy::bind(const size_t pos, std::string& val, const size_t size, interface::parameter_binder& bindings) const { void statement_proxy::bind(const size_t pos, std::string& val, const size_t size, parameter_binder& bindings) const {
statement_->bind(pos, val, size, bindings); statement_->bind(pos, val, size, bindings);
} }

View File

@ -24,24 +24,26 @@ public:
size_t lock_attempts{0}; size_t lock_attempts{0};
}; };
explicit statement_cache_proxy(std::unique_ptr<statement_impl>&& stmt, connection_pool &pool, const size_t connection_id) statement_cache_proxy(utils::message_bus &bus, std::unique_ptr<statement_impl>&& stmt, connection_pool &pool, const size_t connection_id)
: statement_proxy(std::move(stmt)) : statement_proxy(std::move(stmt))
, pool_(pool) , pool_(pool)
, connection_id_(connection_id) {} , connection_id_(connection_id)
, bus_(bus) {}
utils::result<size_t, utils::error> execute(interface::parameter_binder& bindings) override { utils::result<size_t, utils::error> execute(parameter_binder& bindings) override {
execution_metrics metrics{std::chrono::steady_clock::now()}; execution_metrics metrics{std::chrono::steady_clock::now()};
auto result = try_with_retry([this, &bindings, &metrics]() -> utils::result<size_t, utils::error> { auto result = try_with_retry([this, &bindings, &metrics]() -> utils::result<size_t, utils::error> {
if (!try_lock()) { if (!try_lock()) {
++metrics.lock_attempts; ++metrics.lock_attempts;
bus_.publish<statement_lock_failed_event>({sql(), std::chrono::steady_clock::now(), metrics.lock_attempt_start});
return utils::failure(utils::error{ return utils::failure(utils::error{
error_code::STATEMENT_LOCKED, error_code::STATEMENT_LOCKED,
"Failed to execute statement because it is already in use" "Failed to execute statement because it is already in use"
}); });
} }
metrics.lock_acquired = std::chrono::steady_clock::now(); metrics.lock_acquired = std::chrono::steady_clock::now();
// publish bus_.publish<statement_lock_acquired_event>({sql(), std::chrono::steady_clock::now(), metrics.lock_attempt_start, metrics.lock_acquired});
auto guard = statement_guard(*this); auto guard = statement_guard(*this);
if (const auto conn = pool_.acquire(connection_id_); !conn.valid()) { if (const auto conn = pool_.acquire(connection_id_); !conn.valid()) {
@ -55,14 +57,15 @@ public:
auto execution_result = statement_->execute(bindings); auto execution_result = statement_->execute(bindings);
metrics.execution_end = std::chrono::steady_clock::now(); metrics.execution_end = std::chrono::steady_clock::now();
// publish bus_.publish<statement_execution_event>({sql(), std::chrono::steady_clock::now(), metrics.execution_start, metrics.execution_end});
return execution_result; return execution_result;
}); });
return result; return result;
} }
utils::result<std::unique_ptr<query_result_impl>, utils::error> fetch(interface::parameter_binder& bindings) override { utils::result<std::unique_ptr<query_result_impl>, utils::error> fetch(parameter_binder& bindings) override {
execution_metrics metrics{std::chrono::steady_clock::now()}; execution_metrics metrics{std::chrono::steady_clock::now()};
auto result = try_with_retry([this, &bindings, &metrics]() -> utils::result<std::unique_ptr<query_result_impl>, utils::error> { auto result = try_with_retry([this, &bindings, &metrics]() -> utils::result<std::unique_ptr<query_result_impl>, utils::error> {
@ -104,7 +107,7 @@ protected:
if (attempt + 1 < config_.max_attempts) { if (attempt + 1 < config_.max_attempts) {
std::this_thread::sleep_for(current_wait); std::this_thread::sleep_for(current_wait);
current_wait = std::min(current_wait * 2, config_.max_wait); current_wait = (std::min)(current_wait * 2, config_.max_wait);
} }
} }
@ -132,6 +135,7 @@ private:
connection_pool &pool_; connection_pool &pool_;
size_t connection_id_{}; size_t connection_id_{};
retry_config config_{}; retry_config config_{};
utils::message_bus &bus_;
}; };
} }
@ -178,7 +182,7 @@ utils::result<statement, utils::error> statement_cache::acquire(const query_cont
const auto it = cache_map_.insert({ const auto it = cache_map_.insert({
key, key,
{statement{ {statement{
std::make_shared<internal::statement_cache_proxy>(std::move(stmt), pool_, id)}, std::make_shared<internal::statement_cache_proxy>(bus_, std::move(stmt), pool_, id)},
std::chrono::steady_clock::now(), std::chrono::steady_clock::now(),
usage_list_.begin()} usage_list_.begin()}
}).first; }).first;

View File

@ -2,6 +2,7 @@
#include "test_result_reader.hpp" #include "test_result_reader.hpp"
#include "test_parameter_binder.hpp" #include "test_parameter_binder.hpp"
#include <chrono>
#include <random> #include <random>
#include <thread> #include <thread>
@ -9,7 +10,7 @@ namespace matador::test::orm {
test_statement::test_statement(const sql::query_context &query) test_statement::test_statement(const sql::query_context &query)
: statement_impl(query) {} : statement_impl(query) {}
utils::result<size_t, utils::error> test_statement::execute(const sql::interface::parameter_binder &/*bindings*/) { utils::result<size_t, utils::error> test_statement::execute(const sql::parameter_binder &/*bindings*/) {
using namespace std::chrono_literals; using namespace std::chrono_literals;
std::mt19937 rng(query_.sql.size()); std::mt19937 rng(query_.sql.size());
std::uniform_int_distribution dist(10, 40); std::uniform_int_distribution dist(10, 40);
@ -17,7 +18,7 @@ utils::result<size_t, utils::error> test_statement::execute(const sql::interface
return utils::ok(static_cast<size_t>(8)); return utils::ok(static_cast<size_t>(8));
} }
utils::result<std::unique_ptr<sql::query_result_impl>, utils::error> test_statement::fetch(const sql::interface::parameter_binder &/*bindings*/) { utils::result<std::unique_ptr<sql::query_result_impl>, utils::error> test_statement::fetch(const sql::parameter_binder &/*bindings*/) {
return utils::ok(std::make_unique<sql::query_result_impl>(std::make_unique<test_result_reader>(), query_.prototype, query_.prototype.size())); return utils::ok(std::make_unique<sql::query_result_impl>(std::make_unique<test_result_reader>(), query_.prototype, query_.prototype.size()));
} }

View File

@ -8,8 +8,8 @@ namespace matador::test::orm {
class test_statement final : public sql::statement_impl { class test_statement final : public sql::statement_impl {
public: public:
explicit test_statement(const sql::query_context &query); explicit test_statement(const sql::query_context &query);
utils::result<size_t, utils::error> execute(const sql::interface::parameter_binder &bindings) override; utils::result<size_t, utils::error> execute(const sql::parameter_binder &bindings) override;
utils::result<std::unique_ptr<sql::query_result_impl>, utils::error> fetch(const sql::interface::parameter_binder &bindings) override; utils::result<std::unique_ptr<sql::query_result_impl>, utils::error> fetch(const sql::parameter_binder &bindings) override;
void reset() override; void reset() override;
protected: protected:

View File

@ -22,54 +22,57 @@ using namespace matador::sql;
using namespace matador::query; using namespace matador::query;
using namespace matador::utils; using namespace matador::utils;
// Beispiel für einen Observer, der die Metriken auswertet class MetricsObserver {
// class MetricsObserver : public statement_cache_observer_interface { public:
// public: MetricsObserver(message_bus &bus) {
// void on_event(const statement_cache_event& evt) override {
// std::lock_guard<std::mutex> lock(mutex_); }
// void on_event(const statement_cache_event& evt) override {
// switch (evt.type) { std::lock_guard<std::mutex> lock(mutex_);
// case statement_cache_event::Type::LockFailed:
// lock_failure_count_++; switch (evt.type) {
// if (evt.duration) { case statement_cache_event::Type::LockFailed:
// total_lock_wait_time_ += evt.duration.value(); lock_failure_count_++;
// } if (evt.duration) {
// break; total_lock_wait_time_ += evt.duration.value();
// }
// case statement_cache_event::Type::ExecutionEnded: break;
// execution_count_++;
// if (evt.duration) { case statement_cache_event::Type::ExecutionEnded:
// total_execution_time_ += evt.duration.value(); execution_count_++;
// } if (evt.duration) {
// break; total_execution_time_ += evt.duration.value();
// } }
// } break;
// }
// // Metrik-Zugriffsmethoden }
// double get_average_lock_wait_time() const {
// std::lock_guard<std::mutex> lock(mutex_); // Metrik-Zugriffsmethoden
// if (lock_failure_count_ == 0) return 0.0; double get_average_lock_wait_time() const {
// return std::chrono::duration<double>(total_lock_wait_time_).count() / lock_failure_count_; std::lock_guard lock(mutex_);
// } if (lock_failure_count_ == 0) return 0.0;
// return std::chrono::duration<double>(total_lock_wait_time_).count() / lock_failure_count_;
// double get_average_execution_time() const { }
// std::lock_guard<std::mutex> lock(mutex_);
// if (execution_count_ == 0) return 0.0; double get_average_execution_time() const {
// return std::chrono::duration<double>(total_execution_time_).count() / execution_count_; std::lock_guard lock(mutex_);
// } if (execution_count_ == 0) return 0.0;
// return std::chrono::duration<double>(total_execution_time_).count() / execution_count_;
// size_t get_lock_failure_count() const { }
// std::lock_guard<std::mutex> lock(mutex_);
// return lock_failure_count_; size_t get_lock_failure_count() const {
// } std::lock_guard lock(mutex_);
// return lock_failure_count_;
// private: }
// mutable std::mutex mutex_;
// size_t lock_failure_count_{0}; private:
// size_t execution_count_{0}; mutable std::mutex mutex_;
// std::chrono::nanoseconds total_lock_wait_time_{0}; size_t lock_failure_count_{0};
// std::chrono::nanoseconds total_execution_time_{0}; size_t execution_count_{0};
// }; std::chrono::nanoseconds total_lock_wait_time_{0};
std::chrono::nanoseconds total_execution_time_{0};
};
// void example_with_metrics() { // void example_with_metrics() {
// connection_pool pool("noop://noop.db", 4); // connection_pool pool("noop://noop.db", 4);
// statement_cache cache(pool, 5); // statement_cache cache(pool, 5);
@ -304,9 +307,10 @@ TEST_CASE("Race condition simulation with mixed access", "[statement_cache][race
auto task = [&](int /*id*/) { auto task = [&](int /*id*/) {
for (int i = 0; i < operations; ++i) { for (int i = 0; i < operations; ++i) {
auto sql = "SELECT " + std::to_string(i % 10); const auto sql = "SELECT " + std::to_string(i % 10);
auto result = cache.acquire({sql}); if (const auto result = cache.acquire({sql}); !result) {
REQUIRE(result); FAIL("Statement should not be available");
}
// if (i % 50 == 0) { // if (i % 50 == 0) {
// cache.cleanup_expired_connections(); // cache.cleanup_expired_connections();