Compare commits
2 Commits
35fad9f47c
...
4f017ac1de
| Author | SHA1 | Date |
|---|---|---|
|
|
4f017ac1de | |
|
|
60cef7e938 |
|
|
@ -4,6 +4,8 @@
|
||||||
#include "matador/sql/executor.hpp"
|
#include "matador/sql/executor.hpp"
|
||||||
#include "matador/sql/statement.hpp"
|
#include "matador/sql/statement.hpp"
|
||||||
|
|
||||||
|
#include "matador/utils/message_bus.hpp"
|
||||||
|
|
||||||
#include <list>
|
#include <list>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
|
|
@ -12,17 +14,18 @@ namespace matador::sql {
|
||||||
|
|
||||||
class connection_pool;
|
class connection_pool;
|
||||||
|
|
||||||
struct statement_cache_event {
|
struct statement_event {
|
||||||
enum class Type { Accessed, Added, Evicted };
|
|
||||||
Type type;
|
|
||||||
std::string sql;
|
std::string sql;
|
||||||
std::chrono::steady_clock::time_point timestamp;
|
std::chrono::steady_clock::time_point timestamp;
|
||||||
};
|
};
|
||||||
|
|
||||||
class statement_cache_observer_interface {
|
struct statement_accessed_event : statement_event {};
|
||||||
public:
|
struct statement_added_event : statement_event {};
|
||||||
virtual void on_event(const statement_cache_event&) = 0;
|
struct statement_evicted_event : statement_event {};
|
||||||
virtual ~statement_cache_observer_interface() = default;
|
|
||||||
|
|
||||||
|
struct statement_cache_config {
|
||||||
|
size_t max_size;
|
||||||
};
|
};
|
||||||
|
|
||||||
class statement_cache final {
|
class statement_cache final {
|
||||||
|
|
@ -36,7 +39,7 @@ private:
|
||||||
};
|
};
|
||||||
|
|
||||||
public:
|
public:
|
||||||
explicit statement_cache(connection_pool &pool, size_t max_size = 50);
|
statement_cache(utils::message_bus &bus, connection_pool &pool, size_t max_size = 50);
|
||||||
statement_cache(const statement_cache &) = delete;
|
statement_cache(const statement_cache &) = delete;
|
||||||
statement_cache &operator=(const statement_cache &) = delete;
|
statement_cache &operator=(const statement_cache &) = delete;
|
||||||
statement_cache(statement_cache &&) = delete;
|
statement_cache(statement_cache &&) = delete;
|
||||||
|
|
@ -49,13 +52,13 @@ public:
|
||||||
[[nodiscard]] size_t capacity() const;
|
[[nodiscard]] size_t capacity() const;
|
||||||
[[nodiscard]] bool empty() const;
|
[[nodiscard]] bool empty() const;
|
||||||
|
|
||||||
void subscribe(statement_cache_observer_interface &observer);
|
template<class EventType>
|
||||||
|
utils::subscription subscribe(std::function<void(const EventType&)> handler) {
|
||||||
|
return bus_.subscribe<EventType>(handler);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void push(statement_cache_event::Type type, const std::string& sql) const;
|
|
||||||
|
|
||||||
private:
|
|
||||||
|
|
||||||
size_t max_size_{};
|
size_t max_size_{};
|
||||||
std::list<size_t> usage_list_; // LRU: front = most recent, back = least recent
|
std::list<size_t> usage_list_; // LRU: front = most recent, back = least recent
|
||||||
std::unordered_map<size_t, cache_entry> cache_map_;
|
std::unordered_map<size_t, cache_entry> cache_map_;
|
||||||
|
|
@ -64,7 +67,8 @@ private:
|
||||||
connection_pool &pool_;
|
connection_pool &pool_;
|
||||||
const sql::dialect &dialect_;
|
const sql::dialect &dialect_;
|
||||||
|
|
||||||
std::vector<statement_cache_observer_interface*> observers_;
|
utils::message_bus &bus_;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
#endif //STATEMENT_CACHE_HPP
|
#endif //STATEMENT_CACHE_HPP
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,222 @@
|
||||||
|
#ifndef MATADOR_MESSAGE_BUS_HPP
|
||||||
|
#define MATADOR_MESSAGE_BUS_HPP
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
|
#include <functional>
|
||||||
|
#include <memory>
|
||||||
|
#include <mutex>
|
||||||
|
#include <shared_mutex>
|
||||||
|
#include <typeindex>
|
||||||
|
#include <typeinfo>
|
||||||
|
#include <unordered_map>
|
||||||
|
#include <vector>
|
||||||
|
#include <algorithm>
|
||||||
|
#include <stdexcept>
|
||||||
|
|
||||||
|
namespace matador::utils {
|
||||||
|
class message_bus; // forward
|
||||||
|
|
||||||
|
class message {
|
||||||
|
public:
|
||||||
|
message();
|
||||||
|
|
||||||
|
// Owning: copy into an owned shared_ptr (may allocate)
|
||||||
|
template<typename T>
|
||||||
|
explicit message(const T& value) {
|
||||||
|
auto strong = std::make_shared<T>(value); // allocate
|
||||||
|
owner_ = std::static_pointer_cast<void>(strong); // share ownership as void*
|
||||||
|
ptr_ = static_cast<const void*>(strong.get());
|
||||||
|
type_ = std::type_index(typeid(T));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Non-owning: zero-copy reference to an existing object (no allocation)
|
||||||
|
// Caller must guarantee lifetime of `ref` for the usage duration.
|
||||||
|
template<typename T>
|
||||||
|
static message from_ref(const T& ref) {
|
||||||
|
message m;
|
||||||
|
m.ptr_ = static_cast<const void*>(std::addressof(ref));
|
||||||
|
m.type_ = std::type_index(typeid(T));
|
||||||
|
// owner_ remains null => non-owning
|
||||||
|
return m;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Owning from shared_ptr (no extra copy; shares ownership)
|
||||||
|
template<typename T>
|
||||||
|
static message from_shared(std::shared_ptr<T> sp) {
|
||||||
|
message m;
|
||||||
|
m.owner_ = std::static_pointer_cast<void>(sp);
|
||||||
|
m.ptr_ = static_cast<const void*>(sp.get());
|
||||||
|
m.type_ = std::type_index(typeid(T));
|
||||||
|
return m;
|
||||||
|
}
|
||||||
|
|
||||||
|
[[nodiscard]] bool empty() const;
|
||||||
|
[[nodiscard]] std::type_index type() const;
|
||||||
|
|
||||||
|
template<typename T>
|
||||||
|
[[nodiscard]] bool is() const { return type_ == std::type_index(typeid(T)); }
|
||||||
|
|
||||||
|
// Access as typed reference. Works for both owning and non-owning.
|
||||||
|
// Throws std::bad_cast if type mismatch.
|
||||||
|
template<typename T>
|
||||||
|
const T& get() const {
|
||||||
|
if (!is<T>()) throw std::bad_cast();
|
||||||
|
const void* p = msg_ptr();
|
||||||
|
if (!p) throw std::runtime_error("AnyMessage: empty pointer");
|
||||||
|
return *static_cast<const T*>(p);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return the raw const void* pointer to the stored object (may be non-owning)
|
||||||
|
[[nodiscard]] const void* msg_ptr() const;
|
||||||
|
|
||||||
|
private:
|
||||||
|
const void* ptr_; // non-owning raw pointer (if set)
|
||||||
|
std::shared_ptr<void> owner_; // owning pointer if we made a copy or got shared_ptr
|
||||||
|
std::type_index type_{typeid(void)};
|
||||||
|
};
|
||||||
|
|
||||||
|
// Forward declare Subscription so MessageBus can return it.
|
||||||
|
class subscription {
|
||||||
|
public:
|
||||||
|
subscription() = default;
|
||||||
|
subscription(message_bus* bus, std::type_index type, uint64_t id);
|
||||||
|
|
||||||
|
// Move-only (non-copyable)
|
||||||
|
subscription(const subscription&) = delete;
|
||||||
|
subscription& operator=(const subscription&) = delete;
|
||||||
|
subscription(subscription&& other) noexcept;
|
||||||
|
subscription& operator=(subscription&& other) noexcept;
|
||||||
|
|
||||||
|
~subscription();
|
||||||
|
|
||||||
|
void unsubscribe();
|
||||||
|
|
||||||
|
[[nodiscard]] bool valid() const;
|
||||||
|
|
||||||
|
private:
|
||||||
|
message_bus* bus_ = nullptr;
|
||||||
|
std::type_index type_{typeid(void)};
|
||||||
|
uint64_t id_ = 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
// === MessageBus: runtime-registered, thread-safe ===
|
||||||
|
class message_bus {
|
||||||
|
public:
|
||||||
|
using HandlerId = uint64_t;
|
||||||
|
|
||||||
|
message_bus() : next_id_{1} {}
|
||||||
|
|
||||||
|
// Subscribe with a free function or lambda
|
||||||
|
template<typename T>
|
||||||
|
subscription subscribe(std::function<void(const T&)> handler,
|
||||||
|
std::function<bool(const T&)> filter = nullptr)
|
||||||
|
{
|
||||||
|
auto id = next_id_.fetch_add(1, std::memory_order_relaxed);
|
||||||
|
std::unique_lock writeLock(mutex_);
|
||||||
|
|
||||||
|
auto &vec = handlers_[std::type_index(typeid(T))];
|
||||||
|
Entry e;
|
||||||
|
e.id = id;
|
||||||
|
e.handler = [h = std::move(handler)](const void* p) {
|
||||||
|
h(*static_cast<const T*>(p));
|
||||||
|
};
|
||||||
|
if (filter) {
|
||||||
|
e.filter = [f = std::move(filter)](const void* p) -> bool {
|
||||||
|
return f(*static_cast<const T*>(p));
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
e.filter = nullptr;
|
||||||
|
}
|
||||||
|
vec.emplace_back(std::move(e));
|
||||||
|
return {this, std::type_index(typeid(T)), id};
|
||||||
|
}
|
||||||
|
|
||||||
|
// Subscribe raw pointer instance; caller ensures lifetime
|
||||||
|
template<typename T, typename C>
|
||||||
|
subscription subscribe(C* instance, void (C::*memberFn)(const T&),
|
||||||
|
std::function<bool(const T&)> filter = nullptr)
|
||||||
|
{
|
||||||
|
auto fn = [instance, memberFn](const T& m) { (instance->*memberFn)(m); };
|
||||||
|
return subscribe<T>(std::function<void(const T&)>(fn), std::move(filter));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Subscribe shared_ptr instance (safe)
|
||||||
|
template<typename T, typename C>
|
||||||
|
subscription subscribe(std::shared_ptr<C> instance, void (C::*memberFn)(const T&),
|
||||||
|
std::function<bool(const T&)> filter = nullptr)
|
||||||
|
{
|
||||||
|
std::weak_ptr<C> w = instance;
|
||||||
|
auto handler = [w, memberFn](const T& m) {
|
||||||
|
if (auto s = w.lock()) {
|
||||||
|
(s.get()->*memberFn)(m);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
std::function<bool(const T&)> local_filter = nullptr;
|
||||||
|
if (filter) {
|
||||||
|
local_filter = [w, filter = std::move(filter)](const T& m) -> bool {
|
||||||
|
if (w.expired()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return filter(m);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
return subscribe<T>(std::move(handler), std::move(local_filter));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unsubscribe by type-specific id (RAII calls this)
|
||||||
|
template<typename T>
|
||||||
|
void unsubscribe(const HandlerId id) {
|
||||||
|
unsubscribe(std::type_index(typeid(T)), id);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unsubscribe by explicit type_index and id
|
||||||
|
void unsubscribe(std::type_index type, HandlerId id);
|
||||||
|
|
||||||
|
// Unsubscribe by id regardless of type (O(#types) scan)
|
||||||
|
void unsubscribe(HandlerId id);
|
||||||
|
|
||||||
|
// Publish a typed message (fast)
|
||||||
|
template<typename T>
|
||||||
|
void publish(const T& msg) const {
|
||||||
|
std::vector<Entry> snapshot;
|
||||||
|
{
|
||||||
|
std::shared_lock readLock(mutex_);
|
||||||
|
const auto it = handlers_.find(std::type_index(typeid(T)));
|
||||||
|
if (it == handlers_.end()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
snapshot = it->second; // copy list to avoid holding lock during callbacks
|
||||||
|
}
|
||||||
|
for (const auto &e : snapshot) {
|
||||||
|
if (!e.filter || e.filter(&msg)) e.handler(&msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Publish using AnyMessage (zero-copy if AnyMessage is non-owning)
|
||||||
|
void publish(const message& msg) const;
|
||||||
|
|
||||||
|
private:
|
||||||
|
friend class subscription;
|
||||||
|
|
||||||
|
struct Entry {
|
||||||
|
HandlerId id;
|
||||||
|
std::function<void(const void*)> handler;
|
||||||
|
std::function<bool(const void*)> filter;
|
||||||
|
};
|
||||||
|
|
||||||
|
mutable std::shared_mutex mutex_;
|
||||||
|
std::unordered_map<std::type_index, std::vector<Entry>> handlers_;
|
||||||
|
std::atomic<HandlerId> next_id_;
|
||||||
|
};
|
||||||
|
|
||||||
|
// RAII unsubscribe implementation
|
||||||
|
inline void subscription::unsubscribe() {
|
||||||
|
if (bus_) {
|
||||||
|
bus_->unsubscribe(type_, id_);
|
||||||
|
bus_ = nullptr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif //MATADOR_MESSAGE_BUS_HPP
|
||||||
|
|
@ -13,8 +13,8 @@ add_library(matador-core STATIC
|
||||||
../../include/matador/net/reactor.hpp
|
../../include/matador/net/reactor.hpp
|
||||||
../../include/matador/net/select_fd_sets.hpp
|
../../include/matador/net/select_fd_sets.hpp
|
||||||
../../include/matador/net/socket_interrupter.hpp
|
../../include/matador/net/socket_interrupter.hpp
|
||||||
../../include/matador/object/attribute_definition_generator.hpp
|
|
||||||
../../include/matador/object/attribute_definition.hpp
|
../../include/matador/object/attribute_definition.hpp
|
||||||
|
../../include/matador/object/attribute_definition_generator.hpp
|
||||||
../../include/matador/object/basic_object_info.hpp
|
../../include/matador/object/basic_object_info.hpp
|
||||||
../../include/matador/object/error_code.hpp
|
../../include/matador/object/error_code.hpp
|
||||||
../../include/matador/object/foreign_node_completer.hpp
|
../../include/matador/object/foreign_node_completer.hpp
|
||||||
|
|
@ -30,6 +30,7 @@ add_library(matador-core STATIC
|
||||||
../../include/matador/object/schema.hpp
|
../../include/matador/object/schema.hpp
|
||||||
../../include/matador/object/schema_node.hpp
|
../../include/matador/object/schema_node.hpp
|
||||||
../../include/matador/object/schema_node_iterator.hpp
|
../../include/matador/object/schema_node_iterator.hpp
|
||||||
|
../../include/matador/sql/statement_cache.hpp
|
||||||
../../include/matador/utils/access.hpp
|
../../include/matador/utils/access.hpp
|
||||||
../../include/matador/utils/attribute_reader.hpp
|
../../include/matador/utils/attribute_reader.hpp
|
||||||
../../include/matador/utils/attribute_writer.hpp
|
../../include/matador/utils/attribute_writer.hpp
|
||||||
|
|
@ -47,13 +48,14 @@ add_library(matador-core STATIC
|
||||||
../../include/matador/utils/errors.hpp
|
../../include/matador/utils/errors.hpp
|
||||||
../../include/matador/utils/export.hpp
|
../../include/matador/utils/export.hpp
|
||||||
../../include/matador/utils/fetch_type.hpp
|
../../include/matador/utils/fetch_type.hpp
|
||||||
../../include/matador/utils/file.hpp
|
|
||||||
../../include/matador/utils/field_attributes.hpp
|
../../include/matador/utils/field_attributes.hpp
|
||||||
|
../../include/matador/utils/file.hpp
|
||||||
../../include/matador/utils/foreign_attributes.hpp
|
../../include/matador/utils/foreign_attributes.hpp
|
||||||
../../include/matador/utils/identifier.hpp
|
../../include/matador/utils/identifier.hpp
|
||||||
../../include/matador/utils/leader_follower_thread_pool.hpp
|
../../include/matador/utils/leader_follower_thread_pool.hpp
|
||||||
../../include/matador/utils/library.hpp
|
../../include/matador/utils/library.hpp
|
||||||
../../include/matador/utils/macro_map.hpp
|
../../include/matador/utils/macro_map.hpp
|
||||||
|
../../include/matador/utils/message_bus.hpp
|
||||||
../../include/matador/utils/os.hpp
|
../../include/matador/utils/os.hpp
|
||||||
../../include/matador/utils/placeholder.hpp
|
../../include/matador/utils/placeholder.hpp
|
||||||
../../include/matador/utils/primary_key_attribute.hpp
|
../../include/matador/utils/primary_key_attribute.hpp
|
||||||
|
|
@ -73,8 +75,8 @@ add_library(matador-core STATIC
|
||||||
logger/log_manager.cpp
|
logger/log_manager.cpp
|
||||||
logger/logger.cpp
|
logger/logger.cpp
|
||||||
logger/rotating_file_sink.cpp
|
logger/rotating_file_sink.cpp
|
||||||
object/attribute_definition_generator.cpp
|
|
||||||
object/attribute_definition.cpp
|
object/attribute_definition.cpp
|
||||||
|
object/attribute_definition_generator.cpp
|
||||||
object/basic_object_info.cpp
|
object/basic_object_info.cpp
|
||||||
object/error_code.cpp
|
object/error_code.cpp
|
||||||
object/foreign_node_completer.cpp
|
object/foreign_node_completer.cpp
|
||||||
|
|
@ -93,6 +95,7 @@ add_library(matador-core STATIC
|
||||||
utils/identifier.cpp
|
utils/identifier.cpp
|
||||||
utils/leader_follower_thread_pool.cpp
|
utils/leader_follower_thread_pool.cpp
|
||||||
utils/library.cpp
|
utils/library.cpp
|
||||||
|
utils/message_bus.cpp
|
||||||
utils/os.cpp
|
utils/os.cpp
|
||||||
utils/primary_key_attribute.cpp
|
utils/primary_key_attribute.cpp
|
||||||
utils/string.cpp
|
utils/string.cpp
|
||||||
|
|
@ -101,7 +104,6 @@ add_library(matador-core STATIC
|
||||||
utils/uuid.cpp
|
utils/uuid.cpp
|
||||||
utils/value.cpp
|
utils/value.cpp
|
||||||
utils/version.cpp
|
utils/version.cpp
|
||||||
../../include/matador/sql/statement_cache.hpp
|
|
||||||
)
|
)
|
||||||
|
|
||||||
target_link_libraries(matador-core ${CMAKE_DL_LIBS})
|
target_link_libraries(matador-core ${CMAKE_DL_LIBS})
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,97 @@
|
||||||
|
#include "matador/utils/message_bus.hpp"
|
||||||
|
|
||||||
|
namespace matador::utils {
|
||||||
|
message::message()
|
||||||
|
: ptr_(nullptr)
|
||||||
|
, owner_(nullptr)
|
||||||
|
, type_(typeid(void)) {}
|
||||||
|
|
||||||
|
bool message::empty() const {
|
||||||
|
return ptr_ == nullptr && !owner_;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::type_index message::type() const {
|
||||||
|
return type_;
|
||||||
|
}
|
||||||
|
|
||||||
|
const void * message::msg_ptr() const {
|
||||||
|
return owner_ ? owner_.get() : ptr_;
|
||||||
|
}
|
||||||
|
|
||||||
|
subscription::subscription(message_bus *bus, const std::type_index type, const uint64_t id)
|
||||||
|
: bus_(bus)
|
||||||
|
, type_(type)
|
||||||
|
, id_(id) {}
|
||||||
|
|
||||||
|
subscription::subscription(subscription &&other) noexcept
|
||||||
|
: bus_(other.bus_)
|
||||||
|
, type_(other.type_)
|
||||||
|
, id_(other.id_) {
|
||||||
|
other.bus_ = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
subscription& subscription::operator=(subscription &&other) noexcept {
|
||||||
|
if (this != &other) {
|
||||||
|
unsubscribe();
|
||||||
|
bus_ = other.bus_;
|
||||||
|
type_ = other.type_;
|
||||||
|
id_ = other.id_;
|
||||||
|
other.bus_ = nullptr;
|
||||||
|
}
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
subscription::~subscription() {
|
||||||
|
unsubscribe();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool subscription::valid() const {
|
||||||
|
return bus_ != nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
void message_bus::unsubscribe(const std::type_index type, HandlerId id) {
|
||||||
|
std::unique_lock writeLock(mutex_);
|
||||||
|
const auto it = handlers_.find(type);
|
||||||
|
if (it == handlers_.end()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
auto &handler_vector = it->second;
|
||||||
|
handler_vector.erase(std::remove_if(handler_vector.begin(), handler_vector.end(),
|
||||||
|
[id](const Entry &e) { return e.id == id; }),
|
||||||
|
handler_vector.end());
|
||||||
|
if (handler_vector.empty()) {
|
||||||
|
handlers_.erase(it);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void message_bus::unsubscribe(HandlerId id) {
|
||||||
|
std::unique_lock writeLock(mutex_);
|
||||||
|
for (auto it = handlers_.begin(); it != handlers_.end(); ) {
|
||||||
|
auto &handler_vector = it->second;
|
||||||
|
handler_vector.erase(std::remove_if(handler_vector.begin(), handler_vector.end(),
|
||||||
|
[id](const Entry &e) { return e.id == id; }),
|
||||||
|
handler_vector.end());
|
||||||
|
if (handler_vector.empty()) it = handlers_.erase(it);
|
||||||
|
else ++it;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void message_bus::publish(const message &msg) const {
|
||||||
|
if (msg.empty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
std::vector<Entry> snapshot;
|
||||||
|
{
|
||||||
|
std::shared_lock readLock(mutex_);
|
||||||
|
const auto it = handlers_.find(msg.type());
|
||||||
|
if (it == handlers_.end()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
snapshot = it->second;
|
||||||
|
}
|
||||||
|
const void* p = msg.msg_ptr();
|
||||||
|
for (const auto &e : snapshot) {
|
||||||
|
if (!e.filter || e.filter(p)) e.handler(p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -16,14 +16,25 @@ public:
|
||||||
std::chrono::milliseconds max_wait{250};
|
std::chrono::milliseconds max_wait{250};
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct execution_metrics {
|
||||||
|
std::chrono::steady_clock::time_point lock_attempt_start;
|
||||||
|
std::chrono::steady_clock::time_point lock_acquired{};
|
||||||
|
std::chrono::steady_clock::time_point execution_start{};
|
||||||
|
std::chrono::steady_clock::time_point execution_end{};
|
||||||
|
size_t lock_attempts{0};
|
||||||
|
};
|
||||||
|
|
||||||
explicit statement_cache_proxy(std::unique_ptr<statement_impl>&& stmt, connection_pool &pool, const size_t connection_id)
|
explicit statement_cache_proxy(std::unique_ptr<statement_impl>&& stmt, connection_pool &pool, const size_t connection_id)
|
||||||
: statement_proxy(std::move(stmt))
|
: statement_proxy(std::move(stmt))
|
||||||
, pool_(pool)
|
, pool_(pool)
|
||||||
, connection_id_(connection_id) {}
|
, connection_id_(connection_id) {}
|
||||||
|
|
||||||
utils::result<size_t, utils::error> execute(interface::parameter_binder& bindings) override {
|
utils::result<size_t, utils::error> execute(interface::parameter_binder& bindings) override {
|
||||||
auto result = try_with_retry([this, &bindings]() -> utils::result<size_t, utils::error> {
|
execution_metrics metrics{std::chrono::steady_clock::now()};
|
||||||
|
|
||||||
|
auto result = try_with_retry([this, &bindings, &metrics]() -> utils::result<size_t, utils::error> {
|
||||||
if (!try_lock()) {
|
if (!try_lock()) {
|
||||||
|
++metrics.lock_attempts;
|
||||||
return utils::failure(utils::error{
|
return utils::failure(utils::error{
|
||||||
error_code::STATEMENT_LOCKED,
|
error_code::STATEMENT_LOCKED,
|
||||||
"Failed to execute statement because it is already in use"
|
"Failed to execute statement because it is already in use"
|
||||||
|
|
@ -45,8 +56,11 @@ public:
|
||||||
}
|
}
|
||||||
|
|
||||||
utils::result<std::unique_ptr<query_result_impl>, utils::error> fetch(interface::parameter_binder& bindings) override {
|
utils::result<std::unique_ptr<query_result_impl>, utils::error> fetch(interface::parameter_binder& bindings) override {
|
||||||
auto result = try_with_retry([this, &bindings]() -> utils::result<std::unique_ptr<query_result_impl>, utils::error> {
|
execution_metrics metrics{std::chrono::steady_clock::now()};
|
||||||
|
|
||||||
|
auto result = try_with_retry([this, &bindings, &metrics]() -> utils::result<std::unique_ptr<query_result_impl>, utils::error> {
|
||||||
if (!try_lock()) {
|
if (!try_lock()) {
|
||||||
|
++metrics.lock_attempts;
|
||||||
return utils::failure(utils::error{
|
return utils::failure(utils::error{
|
||||||
error_code::STATEMENT_LOCKED,
|
error_code::STATEMENT_LOCKED,
|
||||||
"Failed to execute statement because it is already in use"
|
"Failed to execute statement because it is already in use"
|
||||||
|
|
@ -114,10 +128,11 @@ private:
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
statement_cache::statement_cache(connection_pool &pool, const size_t max_size)
|
statement_cache::statement_cache(utils::message_bus &bus, connection_pool &pool, const size_t max_size)
|
||||||
: max_size_(max_size)
|
: max_size_(max_size)
|
||||||
, pool_(pool)
|
, pool_(pool)
|
||||||
, dialect_(backend_provider::instance().connection_dialect(pool_.info().type)) {}
|
, dialect_(backend_provider::instance().connection_dialect(pool_.info().type))
|
||||||
|
, bus_(bus) {}
|
||||||
|
|
||||||
utils::result<statement, utils::error> statement_cache::acquire(const query_context& ctx) {
|
utils::result<statement, utils::error> statement_cache::acquire(const query_context& ctx) {
|
||||||
std::unique_lock lock(mutex_);
|
std::unique_lock lock(mutex_);
|
||||||
|
|
@ -128,7 +143,7 @@ utils::result<statement, utils::error> statement_cache::acquire(const query_cont
|
||||||
if (const auto it = cache_map_.find(key); it != cache_map_.end()) {
|
if (const auto it = cache_map_.find(key); it != cache_map_.end()) {
|
||||||
usage_list_.splice(usage_list_.begin(), usage_list_, it->second.position);
|
usage_list_.splice(usage_list_.begin(), usage_list_, it->second.position);
|
||||||
it->second.last_access = now;
|
it->second.last_access = now;
|
||||||
push(statement_cache_event::Type::Accessed, ctx.sql);
|
bus_.publish<statement_accessed_event>({ctx.sql, std::chrono::steady_clock::now()});
|
||||||
return utils::ok(it->second.stmt);
|
return utils::ok(it->second.stmt);
|
||||||
}
|
}
|
||||||
// Prepare a new statement
|
// Prepare a new statement
|
||||||
|
|
@ -149,7 +164,7 @@ utils::result<statement, utils::error> statement_cache::acquire(const query_cont
|
||||||
const auto& key_to_remove = usage_list_.back();
|
const auto& key_to_remove = usage_list_.back();
|
||||||
cache_map_.erase(key_to_remove);
|
cache_map_.erase(key_to_remove);
|
||||||
usage_list_.pop_back();
|
usage_list_.pop_back();
|
||||||
push(statement_cache_event::Type::Evicted, ctx.sql);
|
bus_.publish<statement_evicted_event>({ctx.sql, std::chrono::steady_clock::now()});
|
||||||
}
|
}
|
||||||
|
|
||||||
usage_list_.push_front(key);
|
usage_list_.push_front(key);
|
||||||
|
|
@ -160,7 +175,7 @@ utils::result<statement, utils::error> statement_cache::acquire(const query_cont
|
||||||
std::chrono::steady_clock::now(),
|
std::chrono::steady_clock::now(),
|
||||||
usage_list_.begin()}
|
usage_list_.begin()}
|
||||||
}).first;
|
}).first;
|
||||||
push(statement_cache_event::Type::Added, ctx.sql);
|
bus_.publish<statement_added_event>({ctx.sql, std::chrono::steady_clock::now()});
|
||||||
|
|
||||||
return utils::ok(it->second.stmt);
|
return utils::ok(it->second.stmt);
|
||||||
}
|
}
|
||||||
|
|
@ -176,18 +191,4 @@ bool statement_cache::empty() const {
|
||||||
return cache_map_.empty();
|
return cache_map_.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
void statement_cache::subscribe(statement_cache_observer_interface &observer) {
|
|
||||||
observers_.push_back(&observer);
|
|
||||||
}
|
|
||||||
|
|
||||||
void statement_cache::push(statement_cache_event::Type type, const std::string &sql) const {
|
|
||||||
using Clock = std::chrono::steady_clock;
|
|
||||||
const auto ts = Clock::now();
|
|
||||||
const statement_cache_event evt{type, sql, ts};
|
|
||||||
for (auto& obs : observers_) {
|
|
||||||
if (obs) {
|
|
||||||
obs->on_event(evt);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,7 @@ add_executable(CoreTests
|
||||||
object/AttributeDefinitionGeneratorTest.cpp
|
object/AttributeDefinitionGeneratorTest.cpp
|
||||||
object/PrimaryKeyResolverTest.cpp
|
object/PrimaryKeyResolverTest.cpp
|
||||||
object/SchemaTest.cpp
|
object/SchemaTest.cpp
|
||||||
|
utils/MessageBusTest.cpp
|
||||||
)
|
)
|
||||||
|
|
||||||
target_link_libraries(CoreTests matador-core Catch2::Catch2WithMain)
|
target_link_libraries(CoreTests matador-core Catch2::Catch2WithMain)
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,157 @@
|
||||||
|
#include <catch2/catch_test_macros.hpp>
|
||||||
|
|
||||||
|
#include "matador/utils/message_bus.hpp"
|
||||||
|
|
||||||
|
#include <thread>
|
||||||
|
|
||||||
|
struct TestEvent { int value; };
|
||||||
|
struct FilterEvent { int value; };
|
||||||
|
struct ClassEvent { std::string msg; };
|
||||||
|
|
||||||
|
class Receiver {
|
||||||
|
public:
|
||||||
|
void on_event(const ClassEvent& e) {
|
||||||
|
received.push_back(e.msg);
|
||||||
|
}
|
||||||
|
std::vector<std::string> received;
|
||||||
|
};
|
||||||
|
|
||||||
|
using namespace matador::utils;
|
||||||
|
|
||||||
|
TEST_CASE("Basic publish/subscribe works", "[MessageBus]") {
|
||||||
|
message_bus bus;
|
||||||
|
int counter = 0;
|
||||||
|
|
||||||
|
auto sub = bus.subscribe<TestEvent>([&](const TestEvent& e) {
|
||||||
|
counter += e.value;
|
||||||
|
});
|
||||||
|
|
||||||
|
bus.publish(TestEvent{1});
|
||||||
|
bus.publish(TestEvent{2});
|
||||||
|
|
||||||
|
REQUIRE(counter == 3);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_CASE("Filtering works", "[MessageBus]") {
|
||||||
|
message_bus bus;
|
||||||
|
int counter = 0;
|
||||||
|
|
||||||
|
auto sub = bus.subscribe<FilterEvent>(
|
||||||
|
[&](const FilterEvent& e) { counter += e.value; },
|
||||||
|
[](const FilterEvent& e) { return e.value % 2 == 0; } // only even
|
||||||
|
);
|
||||||
|
|
||||||
|
bus.publish(FilterEvent{1}); // should be filtered out
|
||||||
|
bus.publish(FilterEvent{2});
|
||||||
|
bus.publish(FilterEvent{3}); // filtered
|
||||||
|
bus.publish(FilterEvent{4});
|
||||||
|
|
||||||
|
REQUIRE(counter == 6); // only 2 + 4
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_CASE("Member function subscription works", "[MessageBus]") {
|
||||||
|
message_bus bus;
|
||||||
|
Receiver r;
|
||||||
|
|
||||||
|
auto sub = bus.subscribe<ClassEvent>(&r, &Receiver::on_event);
|
||||||
|
bus.publish(ClassEvent{"hello"});
|
||||||
|
bus.publish(ClassEvent{"world"});
|
||||||
|
|
||||||
|
REQUIRE(r.received.size() == 2);
|
||||||
|
REQUIRE(r.received[0] == "hello");
|
||||||
|
REQUIRE(r.received[1] == "world");
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_CASE("Shared_ptr instance subscription works", "[MessageBus]") {
|
||||||
|
message_bus bus;
|
||||||
|
const auto r = std::make_shared<Receiver>();
|
||||||
|
|
||||||
|
auto sub = bus.subscribe<ClassEvent>(r, &Receiver::on_event);
|
||||||
|
bus.publish(ClassEvent{"foo"});
|
||||||
|
|
||||||
|
REQUIRE(r->received.size() == 1);
|
||||||
|
REQUIRE(r->received[0] == "foo");
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_CASE("RAII unsubscription works", "[MessageBus]") {
|
||||||
|
message_bus bus;
|
||||||
|
int counter = 0;
|
||||||
|
|
||||||
|
{
|
||||||
|
auto sub = bus.subscribe<TestEvent>([&](const TestEvent& e) {
|
||||||
|
counter += e.value;
|
||||||
|
});
|
||||||
|
bus.publish(TestEvent{5});
|
||||||
|
REQUIRE(counter == 5);
|
||||||
|
}
|
||||||
|
|
||||||
|
// sub goes out of scope, unsubscribed
|
||||||
|
bus.publish(TestEvent{5});
|
||||||
|
REQUIRE(counter == 5);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_CASE("Type-erased AnyMessage publish works", "[MessageBus]") {
|
||||||
|
message_bus bus;
|
||||||
|
int counter = 0;
|
||||||
|
|
||||||
|
auto sub = bus.subscribe<TestEvent>([&](const TestEvent& e) {
|
||||||
|
counter += e.value;
|
||||||
|
});
|
||||||
|
|
||||||
|
const message msg(TestEvent{10});
|
||||||
|
bus.publish(msg);
|
||||||
|
|
||||||
|
REQUIRE(counter == 10);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_CASE("Multiple subscribers all receive messages", "[MessageBus]") {
|
||||||
|
message_bus bus;
|
||||||
|
int a = 0, b = 0;
|
||||||
|
|
||||||
|
auto sub1 = bus.subscribe<TestEvent>([&](const TestEvent& e) { a += e.value; });
|
||||||
|
auto sub2 = bus.subscribe<TestEvent>([&](const TestEvent& e) { b += e.value; });
|
||||||
|
|
||||||
|
bus.publish(TestEvent{3});
|
||||||
|
REQUIRE(a == 3);
|
||||||
|
REQUIRE(b == 3);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_CASE("Stress test with multi-threaded publish and subscribe", "[MessageBus][stress]") {
|
||||||
|
message_bus bus;
|
||||||
|
std::atomic<int> counter{0};
|
||||||
|
constexpr int numThreads = 8;
|
||||||
|
constexpr int eventsPerThread = 5000;
|
||||||
|
|
||||||
|
// Subscribe before launching publishers
|
||||||
|
auto sub = bus.subscribe<TestEvent>([&](const TestEvent& e) {
|
||||||
|
counter.fetch_add(e.value, std::memory_order_relaxed);
|
||||||
|
});
|
||||||
|
|
||||||
|
std::vector<std::thread> threads;
|
||||||
|
threads.reserve(numThreads);
|
||||||
|
|
||||||
|
// Publisher threads
|
||||||
|
for (int t = 0; t < numThreads; ++t) {
|
||||||
|
threads.emplace_back([&] {
|
||||||
|
for (int i = 0; i < eventsPerThread; ++i) {
|
||||||
|
bus.publish(TestEvent{1});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Concurrent subscriber thread
|
||||||
|
threads.emplace_back([&] {
|
||||||
|
for (int i = 0; i < 1000; ++i) {
|
||||||
|
auto tempSub = bus.subscribe<TestEvent>([&](const TestEvent& e) {
|
||||||
|
counter.fetch_add(e.value, std::memory_order_relaxed);
|
||||||
|
});
|
||||||
|
std::this_thread::yield(); // simulate churn
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
for (auto& th : threads) {
|
||||||
|
th.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
REQUIRE(counter.load(std::memory_order_relaxed) >= numThreads * eventsPerThread);
|
||||||
|
}
|
||||||
|
|
@ -7,6 +7,8 @@
|
||||||
#include "matador/sql/error_code.hpp"
|
#include "matador/sql/error_code.hpp"
|
||||||
#include "matador/sql/statement_cache.hpp"
|
#include "matador/sql/statement_cache.hpp"
|
||||||
|
|
||||||
|
#include "matador/utils/message_bus.hpp"
|
||||||
|
|
||||||
#include "../backend/test_backend_service.hpp"
|
#include "../backend/test_backend_service.hpp"
|
||||||
|
|
||||||
#include "ConnectionPoolFixture.hpp"
|
#include "ConnectionPoolFixture.hpp"
|
||||||
|
|
@ -18,32 +20,47 @@
|
||||||
using namespace matador::test;
|
using namespace matador::test;
|
||||||
using namespace matador::sql;
|
using namespace matador::sql;
|
||||||
using namespace matador::query;
|
using namespace matador::query;
|
||||||
|
using namespace matador::utils;
|
||||||
|
|
||||||
class RecordingObserver final : public statement_cache_observer_interface {
|
class RecordingObserver final {
|
||||||
public:
|
public:
|
||||||
void on_event(const statement_cache_event& evt) override {
|
explicit RecordingObserver(message_bus &bus) {
|
||||||
|
subscriptions.push_back(bus.subscribe<statement_accessed_event>([this](const statement_accessed_event &ev) {
|
||||||
std::lock_guard lock(mutex);
|
std::lock_guard lock(mutex);
|
||||||
events.push(evt);
|
events.push(message::from_ref(ev));
|
||||||
|
}));
|
||||||
|
subscriptions.push_back(bus.subscribe<statement_added_event>([this](const statement_added_event &ev) {
|
||||||
|
std::lock_guard lock(mutex);
|
||||||
|
events.push(message::from_ref(ev));
|
||||||
|
}));
|
||||||
|
subscriptions.push_back(bus.subscribe<statement_evicted_event>([this](const statement_evicted_event &ev) {
|
||||||
|
std::lock_guard lock(mutex);
|
||||||
|
events.push(message::from_ref(ev));
|
||||||
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
std::optional<statement_cache_event> poll() {
|
std::optional<message> poll() {
|
||||||
std::lock_guard lock(mutex);
|
std::lock_guard lock(mutex);
|
||||||
if (events.empty()) return std::nullopt;
|
if (events.empty()) {
|
||||||
|
return std::nullopt;
|
||||||
|
}
|
||||||
auto evt = events.front();
|
auto evt = events.front();
|
||||||
events.pop();
|
events.pop();
|
||||||
return evt;
|
return evt;
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
std::vector<subscription> subscriptions;
|
||||||
std::mutex mutex;
|
std::mutex mutex;
|
||||||
std::queue<statement_cache_event> events;
|
std::queue<message> events;
|
||||||
};
|
};
|
||||||
|
|
||||||
TEST_CASE("Test statement cache", "[statement][cache]") {
|
TEST_CASE("Test statement cache", "[statement][cache]") {
|
||||||
backend_provider::instance().register_backend("noop", std::make_unique<orm::test_backend_service>());
|
backend_provider::instance().register_backend("noop", std::make_unique<orm::test_backend_service>());
|
||||||
|
|
||||||
|
matador::utils::message_bus bus;
|
||||||
connection_pool pool("noop://noop.db", 4);
|
connection_pool pool("noop://noop.db", 4);
|
||||||
statement_cache cache(pool, 2);
|
statement_cache cache(bus, pool, 2);
|
||||||
|
|
||||||
query_context ctx;
|
query_context ctx;
|
||||||
ctx.sql = "SELECT * FROM person";
|
ctx.sql = "SELECT * FROM person";
|
||||||
|
|
@ -83,9 +100,9 @@ TEST_CASE("Test LRU cache evicts oldest entries", "[statement][cache][evict]") {
|
||||||
backend_provider::instance().register_backend("noop", std::make_unique<orm::test_backend_service>());
|
backend_provider::instance().register_backend("noop", std::make_unique<orm::test_backend_service>());
|
||||||
|
|
||||||
connection_pool pool("noop://noop.db", 4);
|
connection_pool pool("noop://noop.db", 4);
|
||||||
statement_cache cache(pool, 2);
|
message_bus bus;
|
||||||
RecordingObserver observer;
|
statement_cache cache(bus, pool, 2);
|
||||||
cache.subscribe(observer);
|
RecordingObserver observer(bus);
|
||||||
|
|
||||||
REQUIRE(cache.capacity() == 2);
|
REQUIRE(cache.capacity() == 2);
|
||||||
REQUIRE(cache.empty());
|
REQUIRE(cache.empty());
|
||||||
|
|
@ -116,8 +133,12 @@ TEST_CASE("Test LRU cache evicts oldest entries", "[statement][cache][evict]") {
|
||||||
|
|
||||||
int added = 0, evicted = 0;
|
int added = 0, evicted = 0;
|
||||||
while (auto e = observer.poll()) {
|
while (auto e = observer.poll()) {
|
||||||
if (e->type == statement_cache_event::Type::Added) added++;
|
if (e->is<statement_added_event>()) {
|
||||||
if (e->type == statement_cache_event::Type::Evicted) evicted++;
|
added++;
|
||||||
|
}
|
||||||
|
if (e->is<statement_evicted_event>()) {
|
||||||
|
evicted++;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
REQUIRE(added >= 3);
|
REQUIRE(added >= 3);
|
||||||
REQUIRE(evicted >= 1);
|
REQUIRE(evicted >= 1);
|
||||||
|
|
@ -127,9 +148,9 @@ TEST_CASE("Test statement reuse avoids reprepare", "[statement][cache][prepare]"
|
||||||
backend_provider::instance().register_backend("noop", std::make_unique<orm::test_backend_service>());
|
backend_provider::instance().register_backend("noop", std::make_unique<orm::test_backend_service>());
|
||||||
|
|
||||||
connection_pool pool("noop://noop.db", 4);
|
connection_pool pool("noop://noop.db", 4);
|
||||||
statement_cache cache(pool, 2);
|
message_bus bus;
|
||||||
RecordingObserver observer;
|
statement_cache cache(bus, pool, 2);
|
||||||
cache.subscribe(observer);
|
RecordingObserver observer(bus);
|
||||||
|
|
||||||
REQUIRE(cache.capacity() == 2);
|
REQUIRE(cache.capacity() == 2);
|
||||||
REQUIRE(cache.empty());
|
REQUIRE(cache.empty());
|
||||||
|
|
@ -140,8 +161,6 @@ TEST_CASE("Test statement reuse avoids reprepare", "[statement][cache][prepare]"
|
||||||
result = cache.acquire({"SELECT * FROM person"});
|
result = cache.acquire({"SELECT * FROM person"});
|
||||||
REQUIRE(result);
|
REQUIRE(result);
|
||||||
auto stmt2 = result.value();
|
auto stmt2 = result.value();
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_CASE("Multithreaded stress test", "[statement][cache][stress]") {
|
TEST_CASE("Multithreaded stress test", "[statement][cache][stress]") {
|
||||||
|
|
@ -157,9 +176,9 @@ TEST_CASE("Multithreaded stress test", "[statement][cache][stress]") {
|
||||||
}
|
}
|
||||||
|
|
||||||
connection_pool pool("noop://noop.db", 4);
|
connection_pool pool("noop://noop.db", 4);
|
||||||
statement_cache cache(pool, 5);
|
message_bus bus;
|
||||||
RecordingObserver observer;
|
statement_cache cache(bus, pool, 5);
|
||||||
cache.subscribe(observer);
|
RecordingObserver observer(bus);
|
||||||
|
|
||||||
auto start_time = std::chrono::steady_clock::now();
|
auto start_time = std::chrono::steady_clock::now();
|
||||||
|
|
||||||
|
|
@ -203,7 +222,7 @@ TEST_CASE("Multithreaded stress test", "[statement][cache][stress]") {
|
||||||
// Some events should be generated
|
// Some events should be generated
|
||||||
int accessed = 0;
|
int accessed = 0;
|
||||||
while (auto e = observer.poll()) {
|
while (auto e = observer.poll()) {
|
||||||
if (e->type == statement_cache_event::Type::Accessed) accessed++;
|
if (e->is<statement_accessed_event>()) accessed++;
|
||||||
}
|
}
|
||||||
REQUIRE(accessed > 0);
|
REQUIRE(accessed > 0);
|
||||||
}
|
}
|
||||||
|
|
@ -212,12 +231,13 @@ TEST_CASE("Race condition simulation with mixed access", "[statement_cache][race
|
||||||
backend_provider::instance().register_backend("noop", std::make_unique<orm::test_backend_service>());
|
backend_provider::instance().register_backend("noop", std::make_unique<orm::test_backend_service>());
|
||||||
|
|
||||||
connection_pool pool("noop://noop.db", 4);
|
connection_pool pool("noop://noop.db", 4);
|
||||||
statement_cache cache(pool, 5);
|
message_bus bus;
|
||||||
|
statement_cache cache(bus, pool, 5);
|
||||||
|
|
||||||
constexpr int threads = 8;
|
constexpr int threads = 8;
|
||||||
constexpr int operations = 500;
|
constexpr int operations = 500;
|
||||||
|
|
||||||
auto task = [&](int id) {
|
auto task = [&](int /*id*/) {
|
||||||
for (int i = 0; i < operations; ++i) {
|
for (int i = 0; i < operations; ++i) {
|
||||||
auto sql = "SELECT " + std::to_string(i % 10);
|
auto sql = "SELECT " + std::to_string(i % 10);
|
||||||
auto result = cache.acquire({sql});
|
auto result = cache.acquire({sql});
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue