Compare commits
No commits in common. "01d7179604e233630c001de0fff60851ced3f354" and "2e2fcd01b69d904225798d0d89837b79a5e479e0" have entirely different histories.
01d7179604
...
2e2fcd01b6
|
|
@ -3,7 +3,7 @@
|
|||
|
||||
#include "matador/utils/attribute_writer.hpp"
|
||||
|
||||
namespace matador::sql {
|
||||
namespace matador::sql::interface {
|
||||
|
||||
using parameter_binder = utils::attribute_writer;
|
||||
|
||||
|
|
|
|||
|
|
@ -22,23 +22,23 @@ protected:
|
|||
public:
|
||||
virtual ~statement_impl() = default;
|
||||
|
||||
virtual utils::result<size_t, utils::error> execute(const parameter_binder& bindings) = 0;
|
||||
virtual utils::result<std::unique_ptr<query_result_impl>, utils::error> fetch(const parameter_binder& bindings) = 0;
|
||||
virtual utils::result<size_t, utils::error> execute(const interface::parameter_binder& bindings) = 0;
|
||||
virtual utils::result<std::unique_ptr<query_result_impl>, utils::error> fetch(const interface::parameter_binder& bindings) = 0;
|
||||
|
||||
template < class Type >
|
||||
void bind_object(Type &obj, parameter_binder& bindings) {
|
||||
void bind_object(Type &obj, interface::parameter_binder& bindings) {
|
||||
object_parameter_binder object_binder_;
|
||||
object_binder_.reset(start_index());
|
||||
object_binder_.bind(obj, bindings);
|
||||
}
|
||||
|
||||
template < class Type >
|
||||
void bind(const size_t pos, Type &val, parameter_binder& bindings) {
|
||||
void bind(const size_t pos, Type &val, interface::parameter_binder& bindings) {
|
||||
utils::data_type_traits<Type>::bind_value(bindings, adjust_index(pos), val);
|
||||
}
|
||||
|
||||
void bind(size_t pos, const char *value, size_t size, parameter_binder& bindings) const;
|
||||
void bind(size_t pos, std::string &val, size_t size, parameter_binder& bindings) const;
|
||||
void bind(size_t pos, const char *value, size_t size, interface::parameter_binder& bindings) const;
|
||||
void bind(size_t pos, std::string &val, size_t size, interface::parameter_binder& bindings) const;
|
||||
|
||||
virtual void reset() = 0;
|
||||
|
||||
|
|
|
|||
|
|
@ -12,19 +12,19 @@ protected:
|
|||
public:
|
||||
virtual ~statement_proxy() = default;
|
||||
|
||||
virtual utils::result<size_t, utils::error> execute(parameter_binder& bindings) = 0;
|
||||
virtual utils::result<std::unique_ptr<query_result_impl>, utils::error> fetch(parameter_binder& bindings) = 0;
|
||||
virtual utils::result<size_t, utils::error> execute(interface::parameter_binder& bindings) = 0;
|
||||
virtual utils::result<std::unique_ptr<query_result_impl>, utils::error> fetch(interface::parameter_binder& bindings) = 0;
|
||||
|
||||
template<class Type>
|
||||
void bind(const Type &obj, parameter_binder& bindings) {
|
||||
void bind(const Type &obj, interface::parameter_binder& bindings) {
|
||||
statement_->bind_object(obj, bindings);
|
||||
}
|
||||
template<typename Type>
|
||||
void bind(size_t pos, Type &value, parameter_binder& bindings) {
|
||||
void bind(size_t pos, Type &value, interface::parameter_binder& bindings) {
|
||||
statement_->bind(pos, value, bindings);
|
||||
}
|
||||
void bind(size_t pos, const char *value, size_t size, parameter_binder& bindings) const;
|
||||
void bind(size_t pos, std::string &val, size_t size, parameter_binder& bindings) const;
|
||||
void bind(size_t pos, const char *value, size_t size, interface::parameter_binder& bindings) const;
|
||||
void bind(size_t pos, std::string &val, size_t size, interface::parameter_binder& bindings) const;
|
||||
|
||||
void reset() const;
|
||||
|
||||
|
|
|
|||
|
|
@ -23,19 +23,6 @@ struct statement_accessed_event : statement_event {};
|
|||
struct statement_added_event : statement_event {};
|
||||
struct statement_evicted_event : statement_event {};
|
||||
|
||||
struct statement_lock_failed_event : statement_event {
|
||||
std::chrono::steady_clock::time_point lock_attempt_start;
|
||||
};
|
||||
|
||||
struct statement_lock_acquired_event : statement_event {
|
||||
std::chrono::steady_clock::time_point lock_attempt_start;
|
||||
std::chrono::steady_clock::time_point lock_attempt_end;
|
||||
};
|
||||
|
||||
struct statement_execution_event : statement_event {
|
||||
std::chrono::steady_clock::time_point execution_start;
|
||||
std::chrono::steady_clock::time_point execution_end;
|
||||
};
|
||||
|
||||
struct statement_cache_config {
|
||||
size_t max_size;
|
||||
|
|
|
|||
|
|
@ -14,109 +14,60 @@
|
|||
#include <stdexcept>
|
||||
|
||||
namespace matador::utils {
|
||||
class message_bus;
|
||||
class message_bus; // forward
|
||||
|
||||
/**
|
||||
* @brief Represents a generic, type-safe message object used for communication.
|
||||
*/
|
||||
class message {
|
||||
public:
|
||||
/**
|
||||
* @brief Default constructor for an empty message.
|
||||
*/
|
||||
message();
|
||||
|
||||
/**
|
||||
* @brief Constructs an owning message from a value.
|
||||
*
|
||||
* @tparam MessageType The type of the message.
|
||||
* @param msg The message to be stored. A copy is made.
|
||||
*/
|
||||
template<typename MessageType>
|
||||
explicit message(const MessageType& msg) {
|
||||
auto strong = std::make_shared<MessageType>(msg); // allocate
|
||||
// Owning: copy into an owned shared_ptr (may allocate)
|
||||
template<typename T>
|
||||
explicit message(const T& value) {
|
||||
auto strong = std::make_shared<T>(value); // allocate
|
||||
owner_ = std::static_pointer_cast<void>(strong); // share ownership as void*
|
||||
ptr_ = static_cast<const void*>(strong.get());
|
||||
type_ = std::type_index(typeid(MessageType));
|
||||
type_ = std::type_index(typeid(T));
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Constructs a non-owning message from a reference.
|
||||
*
|
||||
* @tparam MessageType The type of the reference.
|
||||
* @param msg A reference to the message data. The caller must ensure its lifetime during use.
|
||||
* @return A non-owning message wrapping the reference.
|
||||
*/
|
||||
template<typename MessageType>
|
||||
static message from_ref(const MessageType& msg) {
|
||||
// Non-owning: zero-copy reference to an existing object (no allocation)
|
||||
// Caller must guarantee lifetime of `ref` for the usage duration.
|
||||
template<typename T>
|
||||
static message from_ref(const T& ref) {
|
||||
message m;
|
||||
m.ptr_ = static_cast<const void*>(std::addressof(msg));
|
||||
m.type_ = std::type_index(typeid(MessageType));
|
||||
m.ptr_ = static_cast<const void*>(std::addressof(ref));
|
||||
m.type_ = std::type_index(typeid(T));
|
||||
// owner_ remains null => non-owning
|
||||
return m;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Constructs an owning message from a shared pointer.
|
||||
*
|
||||
* @tparam MessageType The type of the object managed by the shared pointer.
|
||||
* @param msg A shared pointer to the message.
|
||||
* @return A message that shares ownership of the object.
|
||||
*/
|
||||
template<typename MessageType>
|
||||
static message from_shared(std::shared_ptr<MessageType> msg) {
|
||||
// Owning from shared_ptr (no extra copy; shares ownership)
|
||||
template<typename T>
|
||||
static message from_shared(std::shared_ptr<T> sp) {
|
||||
message m;
|
||||
m.owner_ = std::static_pointer_cast<void>(msg);
|
||||
m.ptr_ = static_cast<const void*>(msg.get());
|
||||
m.type_ = std::type_index(typeid(MessageType));
|
||||
m.owner_ = std::static_pointer_cast<void>(sp);
|
||||
m.ptr_ = static_cast<const void*>(sp.get());
|
||||
m.type_ = std::type_index(typeid(T));
|
||||
return m;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Checks if the message is empty.
|
||||
*
|
||||
* @return True if the message is empty, false otherwise.
|
||||
*/
|
||||
[[nodiscard]] bool empty() const;
|
||||
|
||||
/**
|
||||
* @brief Gets the type of the object stored in the message.
|
||||
*
|
||||
* @return The type of the stored object as a std::type_index.
|
||||
*/
|
||||
[[nodiscard]] std::type_index type() const;
|
||||
|
||||
/**
|
||||
* @brief Checks if the stored object is of the specified type.
|
||||
*
|
||||
* @tparam MessageType The type to check.
|
||||
* @return True if the object matches the type, false otherwise.
|
||||
*/
|
||||
template<typename MessageType>
|
||||
[[nodiscard]] bool is() const { return type_ == std::type_index(typeid(MessageType)); }
|
||||
template<typename T>
|
||||
[[nodiscard]] bool is() const { return type_ == std::type_index(typeid(T)); }
|
||||
|
||||
/**
|
||||
* @brief Accesses the stored object as a typed reference.
|
||||
*
|
||||
* @tparam MessageType The expected type of the object.
|
||||
* @return A constant reference to the stored object.
|
||||
* @throws std::bad_cast if the type does not match.
|
||||
* @throws std::runtime_error if the message is empty.
|
||||
*/
|
||||
template<typename MessageType>
|
||||
const MessageType& get() const {
|
||||
if (!is<MessageType>()) throw std::bad_cast();
|
||||
const void* p = raw_ptr();
|
||||
// Access as typed reference. Works for both owning and non-owning.
|
||||
// Throws std::bad_cast if type mismatch.
|
||||
template<typename T>
|
||||
const T& get() const {
|
||||
if (!is<T>()) throw std::bad_cast();
|
||||
const void* p = msg_ptr();
|
||||
if (!p) throw std::runtime_error("AnyMessage: empty pointer");
|
||||
return *static_cast<const MessageType*>(p);
|
||||
return *static_cast<const T*>(p);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Retrieves the raw pointer to the stored object.
|
||||
*
|
||||
* @return A const void* pointing to the stored object (may be non-owning).
|
||||
*/
|
||||
[[nodiscard]] const void* raw_ptr() const;
|
||||
// Return the raw const void* pointer to the stored object (may be non-owning)
|
||||
[[nodiscard]] const void* msg_ptr() const;
|
||||
|
||||
private:
|
||||
const void* ptr_; // non-owning raw pointer (if set)
|
||||
|
|
@ -124,46 +75,22 @@ private:
|
|||
std::type_index type_{typeid(void)};
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Represents a subscription to a message_bus, providing RAII-style unsubscription.
|
||||
*/
|
||||
// Forward declare Subscription so MessageBus can return it.
|
||||
class subscription {
|
||||
public:
|
||||
/**
|
||||
* @brief Default constructor for an empty subscription.
|
||||
*/
|
||||
subscription() = default;
|
||||
/**
|
||||
* @brief Constructs a subscription for a specific handler and message type.
|
||||
*
|
||||
* @param bus Pointer to the message bus managing the subscription.
|
||||
* @param type The type_index of the message type.
|
||||
* @param id The unique ID of the handler.
|
||||
*/
|
||||
subscription(message_bus* bus, std::type_index type, uint64_t id);
|
||||
|
||||
// Prohibit copying of subscription objects.
|
||||
// Move-only (non-copyable)
|
||||
subscription(const subscription&) = delete;
|
||||
subscription& operator=(const subscription&) = delete;
|
||||
// Allow move semantics for subscription objects.
|
||||
subscription(subscription&& other) noexcept;
|
||||
subscription& operator=(subscription&& other) noexcept;
|
||||
|
||||
/**
|
||||
* @brief Destructor that ensures cleanup by unsubscribing from the bus.
|
||||
*/
|
||||
~subscription();
|
||||
|
||||
/**
|
||||
* @brief Unsubscribes the handler from the message bus.
|
||||
*/
|
||||
void unsubscribe();
|
||||
|
||||
/**
|
||||
* @brief Checks if the subscription is still valid.
|
||||
*
|
||||
* @return True if the subscription is valid, false otherwise.
|
||||
*/
|
||||
[[nodiscard]] bool valid() const;
|
||||
|
||||
private:
|
||||
|
|
@ -172,139 +99,90 @@ private:
|
|||
uint64_t id_ = 0;
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief A thread-safe message bus for runtime message publishing and subscription handling.
|
||||
*/
|
||||
// === MessageBus: runtime-registered, thread-safe ===
|
||||
class message_bus {
|
||||
public:
|
||||
using HandlerId = uint64_t; /**< Type alias for handler IDs. */
|
||||
using HandlerId = uint64_t;
|
||||
|
||||
/**
|
||||
* @brief Constructs a new message_bus object.
|
||||
*/
|
||||
message_bus();
|
||||
message_bus() : next_id_{1} {}
|
||||
|
||||
/**
|
||||
* @brief Subscribes to messages of a specific type using a free function or lambda.
|
||||
*
|
||||
* @tparam MessageType The type of the message to subscribe to.
|
||||
* @param handler The function to execute when a message of type T is published.
|
||||
* @param filter An optional filter function to determine if the handler should be invoked.
|
||||
* @return A subscription object to manage the handler's registration.
|
||||
*/
|
||||
template<typename MessageType>
|
||||
subscription subscribe(std::function<void(const MessageType&)> handler,
|
||||
std::function<bool(const MessageType&)> filter = nullptr)
|
||||
// Subscribe with a free function or lambda
|
||||
template<typename T>
|
||||
subscription subscribe(std::function<void(const T&)> handler,
|
||||
std::function<bool(const T&)> filter = nullptr)
|
||||
{
|
||||
auto id = next_id_.fetch_add(1, std::memory_order_relaxed);
|
||||
std::unique_lock writeLock(mutex_);
|
||||
|
||||
auto &vec = handlers_[std::type_index(typeid(MessageType))];
|
||||
auto &vec = handlers_[std::type_index(typeid(T))];
|
||||
Entry e;
|
||||
e.id = id;
|
||||
e.handler = [h = std::move(handler)](const void* p) {
|
||||
h(*static_cast<const MessageType*>(p));
|
||||
h(*static_cast<const T*>(p));
|
||||
};
|
||||
if (filter) {
|
||||
e.filter = [f = std::move(filter)](const void* p) -> bool {
|
||||
return f(*static_cast<const MessageType*>(p));
|
||||
return f(*static_cast<const T*>(p));
|
||||
};
|
||||
} else {
|
||||
e.filter = nullptr;
|
||||
}
|
||||
vec.emplace_back(std::move(e));
|
||||
return {this, std::type_index(typeid(MessageType)), id};
|
||||
return {this, std::type_index(typeid(T)), id};
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Subscribes to messages using a class member function bound to an instance.
|
||||
*
|
||||
* @tparam MessageType The type of the message to subscribe to.
|
||||
* @tparam CallerClass The class of the instance.
|
||||
* @param instance A pointer to the instance.
|
||||
* @param memberFn A pointer to the member function to execute.
|
||||
* @param filter An optional filter function.
|
||||
* @return A subscription object to manage the handler's registration.
|
||||
*/
|
||||
template<typename MessageType, typename CallerClass>
|
||||
subscription subscribe(CallerClass* instance, void (CallerClass::*memberFn)(const MessageType&),
|
||||
std::function<bool(const MessageType&)> filter = nullptr)
|
||||
// Subscribe raw pointer instance; caller ensures lifetime
|
||||
template<typename T, typename C>
|
||||
subscription subscribe(C* instance, void (C::*memberFn)(const T&),
|
||||
std::function<bool(const T&)> filter = nullptr)
|
||||
{
|
||||
auto fn = [instance, memberFn](const MessageType& m) { (instance->*memberFn)(m); };
|
||||
return subscribe<MessageType>(std::function<void(const MessageType&)>(fn), std::move(filter));
|
||||
auto fn = [instance, memberFn](const T& m) { (instance->*memberFn)(m); };
|
||||
return subscribe<T>(std::function<void(const T&)>(fn), std::move(filter));
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Subscribes to messages using a member function bound to a shared_ptr instance.
|
||||
*
|
||||
* @tparam MessageType The type of the message to subscribe to.
|
||||
* @tparam CallerClass The class of the shared_ptr instance.
|
||||
* @param instance A shared pointer to the instance.
|
||||
* @param memberFn A pointer to the member function to execute.
|
||||
* @param filter An optional filter function.
|
||||
* @return A subscription object to manage the handler's registration.
|
||||
*/
|
||||
template<typename MessageType, typename CallerClass>
|
||||
subscription subscribe(std::shared_ptr<CallerClass> instance, void (CallerClass::*memberFn)(const MessageType&),
|
||||
std::function<bool(const MessageType&)> filter = nullptr)
|
||||
// Subscribe shared_ptr instance (safe)
|
||||
template<typename T, typename C>
|
||||
subscription subscribe(std::shared_ptr<C> instance, void (C::*memberFn)(const T&),
|
||||
std::function<bool(const T&)> filter = nullptr)
|
||||
{
|
||||
std::weak_ptr<CallerClass> w = instance;
|
||||
auto handler = [w, memberFn](const MessageType& m) {
|
||||
std::weak_ptr<C> w = instance;
|
||||
auto handler = [w, memberFn](const T& m) {
|
||||
if (auto s = w.lock()) {
|
||||
(s.get()->*memberFn)(m);
|
||||
}
|
||||
};
|
||||
|
||||
std::function<bool(const MessageType&)> local_filter = nullptr;
|
||||
std::function<bool(const T&)> local_filter = nullptr;
|
||||
if (filter) {
|
||||
local_filter = [w, filter = std::move(filter)](const MessageType& m) -> bool {
|
||||
local_filter = [w, filter = std::move(filter)](const T& m) -> bool {
|
||||
if (w.expired()) {
|
||||
return false;
|
||||
}
|
||||
return filter(m);
|
||||
};
|
||||
}
|
||||
return subscribe<MessageType>(std::move(handler), std::move(local_filter));
|
||||
return subscribe<T>(std::move(handler), std::move(local_filter));
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Unsubscribes a handler from the bus using its type and ID.
|
||||
*
|
||||
* @tparam MessageType The type of the message.
|
||||
* @param id The unique handler ID to unsubscribe.
|
||||
*/
|
||||
template<typename MessageType>
|
||||
// Unsubscribe by type-specific id (RAII calls this)
|
||||
template<typename T>
|
||||
void unsubscribe(const HandlerId id) {
|
||||
unsubscribe(std::type_index(typeid(MessageType)), id);
|
||||
unsubscribe(std::type_index(typeid(T)), id);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Unsubscribes a handler by its type index and ID.
|
||||
*
|
||||
* @param type The type_index of the message type.
|
||||
* @param id The unique handler ID to unsubscribe.
|
||||
*/
|
||||
// Unsubscribe by explicit type_index and id
|
||||
void unsubscribe(std::type_index type, HandlerId id);
|
||||
|
||||
/**
|
||||
* @brief Unsubscribes a handler by its unique ID, regardless of type.
|
||||
*
|
||||
* @param id The unique handler ID to unsubscribe.
|
||||
*/
|
||||
// Unsubscribe by id regardless of type (O(#types) scan)
|
||||
void unsubscribe(HandlerId id);
|
||||
|
||||
/**
|
||||
* @brief Publishes a message of a specific type.
|
||||
*
|
||||
* @tparam MessageType The type of the message to publish.
|
||||
* @param msg The message to publish.
|
||||
*/
|
||||
template<typename MessageType>
|
||||
void publish(const MessageType& msg) const {
|
||||
// Publish a typed message (fast)
|
||||
template<typename T>
|
||||
void publish(const T& msg) const {
|
||||
std::vector<Entry> snapshot;
|
||||
{
|
||||
std::shared_lock readLock(mutex_);
|
||||
const auto it = handlers_.find(std::type_index(typeid(MessageType)));
|
||||
const auto it = handlers_.find(std::type_index(typeid(T)));
|
||||
if (it == handlers_.end()) {
|
||||
return;
|
||||
}
|
||||
|
|
@ -315,11 +193,7 @@ public:
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Publishes a generic message object.
|
||||
*
|
||||
* @param msg The message to publish.
|
||||
*/
|
||||
// Publish using AnyMessage (zero-copy if AnyMessage is non-owning)
|
||||
void publish(const message& msg) const;
|
||||
|
||||
private:
|
||||
|
|
@ -336,6 +210,7 @@ private:
|
|||
std::atomic<HandlerId> next_id_;
|
||||
};
|
||||
|
||||
// RAII unsubscribe implementation
|
||||
inline void subscription::unsubscribe() {
|
||||
if (bus_) {
|
||||
bus_->unsubscribe(type_, id_);
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ std::type_index message::type() const {
|
|||
return type_;
|
||||
}
|
||||
|
||||
const void * message::raw_ptr() const {
|
||||
const void * message::msg_ptr() const {
|
||||
return owner_ ? owner_.get() : ptr_;
|
||||
}
|
||||
|
||||
|
|
@ -49,9 +49,6 @@ bool subscription::valid() const {
|
|||
return bus_ != nullptr;
|
||||
}
|
||||
|
||||
message_bus::message_bus()
|
||||
: next_id_{1} {}
|
||||
|
||||
void message_bus::unsubscribe(const std::type_index type, HandlerId id) {
|
||||
std::unique_lock writeLock(mutex_);
|
||||
const auto it = handlers_.find(type);
|
||||
|
|
@ -92,7 +89,7 @@ void message_bus::publish(const message &msg) const {
|
|||
}
|
||||
snapshot = it->second;
|
||||
}
|
||||
const void* p = msg.raw_ptr();
|
||||
const void* p = msg.msg_ptr();
|
||||
for (const auto &e : snapshot) {
|
||||
if (!e.filter || e.filter(p)) e.handler(p);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,10 +19,10 @@ public:
|
|||
explicit connection_statement_proxy(std::unique_ptr<statement_impl>&& stmt)
|
||||
: statement_proxy(std::move(stmt)) {}
|
||||
|
||||
utils::result<size_t, utils::error> execute(parameter_binder& bindings) override {
|
||||
utils::result<size_t, utils::error> execute(interface::parameter_binder& bindings) override {
|
||||
return statement_->execute(bindings);
|
||||
}
|
||||
utils::result<std::unique_ptr<query_result_impl>, utils::error> fetch(parameter_binder& bindings) override {
|
||||
utils::result<std::unique_ptr<query_result_impl>, utils::error> fetch(interface::parameter_binder& bindings) override {
|
||||
return statement_->fetch(bindings);
|
||||
}
|
||||
};
|
||||
|
|
|
|||
|
|
@ -6,11 +6,11 @@ statement_impl::statement_impl(query_context query)
|
|||
: query_(std::move(query))
|
||||
{}
|
||||
|
||||
void statement_impl::bind(const size_t pos, const char *value, const size_t size, parameter_binder& bindings) const {
|
||||
void statement_impl::bind(const size_t pos, const char *value, const size_t size, interface::parameter_binder& bindings) const {
|
||||
utils::data_type_traits<const char*>::bind_value(bindings, adjust_index(pos), value, size);
|
||||
}
|
||||
|
||||
void statement_impl::bind(const size_t pos, std::string &val, const size_t size, parameter_binder& bindings) const {
|
||||
void statement_impl::bind(const size_t pos, std::string &val, const size_t size, interface::parameter_binder& bindings) const {
|
||||
utils::data_type_traits<std::string>::bind_value(bindings, adjust_index(pos), val, size);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -4,10 +4,10 @@ namespace matador::sql {
|
|||
statement_proxy::statement_proxy(std::unique_ptr<statement_impl>&& stmt)
|
||||
: statement_(std::move(stmt)){}
|
||||
|
||||
void statement_proxy::bind(const size_t pos, const char* value, const size_t size, parameter_binder& bindings) const {
|
||||
void statement_proxy::bind(const size_t pos, const char* value, const size_t size, interface::parameter_binder& bindings) const {
|
||||
statement_->bind(pos, value, size, bindings);
|
||||
}
|
||||
void statement_proxy::bind(const size_t pos, std::string& val, const size_t size, parameter_binder& bindings) const {
|
||||
void statement_proxy::bind(const size_t pos, std::string& val, const size_t size, interface::parameter_binder& bindings) const {
|
||||
statement_->bind(pos, val, size, bindings);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -24,26 +24,24 @@ public:
|
|||
size_t lock_attempts{0};
|
||||
};
|
||||
|
||||
statement_cache_proxy(utils::message_bus &bus, std::unique_ptr<statement_impl>&& stmt, connection_pool &pool, const size_t connection_id)
|
||||
explicit statement_cache_proxy(std::unique_ptr<statement_impl>&& stmt, connection_pool &pool, const size_t connection_id)
|
||||
: statement_proxy(std::move(stmt))
|
||||
, pool_(pool)
|
||||
, connection_id_(connection_id)
|
||||
, bus_(bus) {}
|
||||
, connection_id_(connection_id) {}
|
||||
|
||||
utils::result<size_t, utils::error> execute(parameter_binder& bindings) override {
|
||||
utils::result<size_t, utils::error> execute(interface::parameter_binder& bindings) override {
|
||||
execution_metrics metrics{std::chrono::steady_clock::now()};
|
||||
|
||||
auto result = try_with_retry([this, &bindings, &metrics]() -> utils::result<size_t, utils::error> {
|
||||
if (!try_lock()) {
|
||||
++metrics.lock_attempts;
|
||||
bus_.publish<statement_lock_failed_event>({sql(), std::chrono::steady_clock::now(), metrics.lock_attempt_start});
|
||||
return utils::failure(utils::error{
|
||||
error_code::STATEMENT_LOCKED,
|
||||
"Failed to execute statement because it is already in use"
|
||||
});
|
||||
}
|
||||
metrics.lock_acquired = std::chrono::steady_clock::now();
|
||||
bus_.publish<statement_lock_acquired_event>({sql(), std::chrono::steady_clock::now(), metrics.lock_attempt_start, metrics.lock_acquired});
|
||||
// publish
|
||||
|
||||
auto guard = statement_guard(*this);
|
||||
if (const auto conn = pool_.acquire(connection_id_); !conn.valid()) {
|
||||
|
|
@ -57,15 +55,14 @@ public:
|
|||
auto execution_result = statement_->execute(bindings);
|
||||
metrics.execution_end = std::chrono::steady_clock::now();
|
||||
|
||||
bus_.publish<statement_execution_event>({sql(), std::chrono::steady_clock::now(), metrics.execution_start, metrics.execution_end});
|
||||
|
||||
// publish
|
||||
return execution_result;
|
||||
});
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
utils::result<std::unique_ptr<query_result_impl>, utils::error> fetch(parameter_binder& bindings) override {
|
||||
utils::result<std::unique_ptr<query_result_impl>, utils::error> fetch(interface::parameter_binder& bindings) override {
|
||||
execution_metrics metrics{std::chrono::steady_clock::now()};
|
||||
|
||||
auto result = try_with_retry([this, &bindings, &metrics]() -> utils::result<std::unique_ptr<query_result_impl>, utils::error> {
|
||||
|
|
@ -107,7 +104,7 @@ protected:
|
|||
|
||||
if (attempt + 1 < config_.max_attempts) {
|
||||
std::this_thread::sleep_for(current_wait);
|
||||
current_wait = (std::min)(current_wait * 2, config_.max_wait);
|
||||
current_wait = std::min(current_wait * 2, config_.max_wait);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -135,7 +132,6 @@ private:
|
|||
connection_pool &pool_;
|
||||
size_t connection_id_{};
|
||||
retry_config config_{};
|
||||
utils::message_bus &bus_;
|
||||
};
|
||||
|
||||
}
|
||||
|
|
@ -182,7 +178,7 @@ utils::result<statement, utils::error> statement_cache::acquire(const query_cont
|
|||
const auto it = cache_map_.insert({
|
||||
key,
|
||||
{statement{
|
||||
std::make_shared<internal::statement_cache_proxy>(bus_, std::move(stmt), pool_, id)},
|
||||
std::make_shared<internal::statement_cache_proxy>(std::move(stmt), pool_, id)},
|
||||
std::chrono::steady_clock::now(),
|
||||
usage_list_.begin()}
|
||||
}).first;
|
||||
|
|
|
|||
|
|
@ -2,7 +2,6 @@
|
|||
#include "test_result_reader.hpp"
|
||||
#include "test_parameter_binder.hpp"
|
||||
|
||||
#include <chrono>
|
||||
#include <random>
|
||||
#include <thread>
|
||||
|
||||
|
|
@ -10,7 +9,7 @@ namespace matador::test::orm {
|
|||
test_statement::test_statement(const sql::query_context &query)
|
||||
: statement_impl(query) {}
|
||||
|
||||
utils::result<size_t, utils::error> test_statement::execute(const sql::parameter_binder &/*bindings*/) {
|
||||
utils::result<size_t, utils::error> test_statement::execute(const sql::interface::parameter_binder &/*bindings*/) {
|
||||
using namespace std::chrono_literals;
|
||||
std::mt19937 rng(query_.sql.size());
|
||||
std::uniform_int_distribution dist(10, 40);
|
||||
|
|
@ -18,7 +17,7 @@ utils::result<size_t, utils::error> test_statement::execute(const sql::parameter
|
|||
return utils::ok(static_cast<size_t>(8));
|
||||
}
|
||||
|
||||
utils::result<std::unique_ptr<sql::query_result_impl>, utils::error> test_statement::fetch(const sql::parameter_binder &/*bindings*/) {
|
||||
utils::result<std::unique_ptr<sql::query_result_impl>, utils::error> test_statement::fetch(const sql::interface::parameter_binder &/*bindings*/) {
|
||||
return utils::ok(std::make_unique<sql::query_result_impl>(std::make_unique<test_result_reader>(), query_.prototype, query_.prototype.size()));
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -8,8 +8,8 @@ namespace matador::test::orm {
|
|||
class test_statement final : public sql::statement_impl {
|
||||
public:
|
||||
explicit test_statement(const sql::query_context &query);
|
||||
utils::result<size_t, utils::error> execute(const sql::parameter_binder &bindings) override;
|
||||
utils::result<std::unique_ptr<sql::query_result_impl>, utils::error> fetch(const sql::parameter_binder &bindings) override;
|
||||
utils::result<size_t, utils::error> execute(const sql::interface::parameter_binder &bindings) override;
|
||||
utils::result<std::unique_ptr<sql::query_result_impl>, utils::error> fetch(const sql::interface::parameter_binder &bindings) override;
|
||||
void reset() override;
|
||||
|
||||
protected:
|
||||
|
|
|
|||
|
|
@ -22,57 +22,54 @@ using namespace matador::sql;
|
|||
using namespace matador::query;
|
||||
using namespace matador::utils;
|
||||
|
||||
class MetricsObserver {
|
||||
public:
|
||||
MetricsObserver(message_bus &bus) {
|
||||
|
||||
}
|
||||
void on_event(const statement_cache_event& evt) override {
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
|
||||
switch (evt.type) {
|
||||
case statement_cache_event::Type::LockFailed:
|
||||
lock_failure_count_++;
|
||||
if (evt.duration) {
|
||||
total_lock_wait_time_ += evt.duration.value();
|
||||
}
|
||||
break;
|
||||
|
||||
case statement_cache_event::Type::ExecutionEnded:
|
||||
execution_count_++;
|
||||
if (evt.duration) {
|
||||
total_execution_time_ += evt.duration.value();
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Metrik-Zugriffsmethoden
|
||||
double get_average_lock_wait_time() const {
|
||||
std::lock_guard lock(mutex_);
|
||||
if (lock_failure_count_ == 0) return 0.0;
|
||||
return std::chrono::duration<double>(total_lock_wait_time_).count() / lock_failure_count_;
|
||||
}
|
||||
|
||||
double get_average_execution_time() const {
|
||||
std::lock_guard lock(mutex_);
|
||||
if (execution_count_ == 0) return 0.0;
|
||||
return std::chrono::duration<double>(total_execution_time_).count() / execution_count_;
|
||||
}
|
||||
|
||||
size_t get_lock_failure_count() const {
|
||||
std::lock_guard lock(mutex_);
|
||||
return lock_failure_count_;
|
||||
}
|
||||
|
||||
private:
|
||||
mutable std::mutex mutex_;
|
||||
size_t lock_failure_count_{0};
|
||||
size_t execution_count_{0};
|
||||
std::chrono::nanoseconds total_lock_wait_time_{0};
|
||||
std::chrono::nanoseconds total_execution_time_{0};
|
||||
};
|
||||
|
||||
// Beispiel für einen Observer, der die Metriken auswertet
|
||||
// class MetricsObserver : public statement_cache_observer_interface {
|
||||
// public:
|
||||
// void on_event(const statement_cache_event& evt) override {
|
||||
// std::lock_guard<std::mutex> lock(mutex_);
|
||||
//
|
||||
// switch (evt.type) {
|
||||
// case statement_cache_event::Type::LockFailed:
|
||||
// lock_failure_count_++;
|
||||
// if (evt.duration) {
|
||||
// total_lock_wait_time_ += evt.duration.value();
|
||||
// }
|
||||
// break;
|
||||
//
|
||||
// case statement_cache_event::Type::ExecutionEnded:
|
||||
// execution_count_++;
|
||||
// if (evt.duration) {
|
||||
// total_execution_time_ += evt.duration.value();
|
||||
// }
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// // Metrik-Zugriffsmethoden
|
||||
// double get_average_lock_wait_time() const {
|
||||
// std::lock_guard<std::mutex> lock(mutex_);
|
||||
// if (lock_failure_count_ == 0) return 0.0;
|
||||
// return std::chrono::duration<double>(total_lock_wait_time_).count() / lock_failure_count_;
|
||||
// }
|
||||
//
|
||||
// double get_average_execution_time() const {
|
||||
// std::lock_guard<std::mutex> lock(mutex_);
|
||||
// if (execution_count_ == 0) return 0.0;
|
||||
// return std::chrono::duration<double>(total_execution_time_).count() / execution_count_;
|
||||
// }
|
||||
//
|
||||
// size_t get_lock_failure_count() const {
|
||||
// std::lock_guard<std::mutex> lock(mutex_);
|
||||
// return lock_failure_count_;
|
||||
// }
|
||||
//
|
||||
// private:
|
||||
// mutable std::mutex mutex_;
|
||||
// size_t lock_failure_count_{0};
|
||||
// size_t execution_count_{0};
|
||||
// std::chrono::nanoseconds total_lock_wait_time_{0};
|
||||
// std::chrono::nanoseconds total_execution_time_{0};
|
||||
// };
|
||||
// void example_with_metrics() {
|
||||
// connection_pool pool("noop://noop.db", 4);
|
||||
// statement_cache cache(pool, 5);
|
||||
|
|
@ -307,10 +304,9 @@ TEST_CASE("Race condition simulation with mixed access", "[statement_cache][race
|
|||
|
||||
auto task = [&](int /*id*/) {
|
||||
for (int i = 0; i < operations; ++i) {
|
||||
const auto sql = "SELECT " + std::to_string(i % 10);
|
||||
if (const auto result = cache.acquire({sql}); !result) {
|
||||
FAIL("Statement should not be available");
|
||||
}
|
||||
auto sql = "SELECT " + std::to_string(i % 10);
|
||||
auto result = cache.acquire({sql});
|
||||
REQUIRE(result);
|
||||
|
||||
// if (i % 50 == 0) {
|
||||
// cache.cleanup_expired_connections();
|
||||
|
|
|
|||
Loading…
Reference in New Issue