integrated object cache into session and added message bus to object_cache and update tests

This commit is contained in:
sascha 2026-05-22 15:26:16 +02:00
parent ddf19bbf07
commit 5ec7c13420
8 changed files with 212 additions and 21 deletions

View File

@ -5,6 +5,7 @@
#include "matador/object/object_resolver.hpp"
#include "matador/utils/identifier.hpp"
#include "matador/utils/message_bus.hpp"
#include <unordered_map>
#include <memory>
@ -35,6 +36,23 @@ struct cache_entry : cache_entry_base {
}
};
struct object_cache_event {
std::type_index type{typeid(void)};
utils::identifier id{};
std::chrono::steady_clock::time_point timestamp{};
};
struct object_cache_accessed_event : object_cache_event {};
struct object_cache_proxy_added_event : object_cache_event {};
struct object_cache_entity_attached_event : object_cache_event {};
struct object_cache_imported_event : object_cache_event {};
struct object_cache_erased_event : object_cache_event {};
struct object_cache_sweep_event {
std::size_t removed{};
std::chrono::steady_clock::time_point timestamp{};
};
/**
* @brief Thread-sicherer Cache für Objekt-Proxies und (optional) geladene Entities.
*
@ -54,7 +72,8 @@ public:
/**
* @brief Erzeugt einen leeren Cache.
*/
object_cache() = default;
explicit object_cache(utils::message_bus &bus)
: bus_(bus) {}
/**
* @brief Liefert einen Proxy für (T, id) und erstellt ihn bei Bedarf.
@ -83,6 +102,7 @@ public:
// found entry, return std::shared_ptr of proxy
auto *entry = entry_cast_<Type>(it->second.get());
if (auto proxy_ptr = entry->proxy.lock()) {
bus_.publish<object_cache_accessed_event>({k.type, id, std::chrono::steady_clock::now()});
return proxy_ptr;
}
@ -96,6 +116,8 @@ public:
entry->proxy = proxy_ptr;
bus_.publish<object_cache_proxy_added_event>({k.type, id, std::chrono::steady_clock::now()});
// return the shared_ptr of the proxy
return proxy_ptr;
}
@ -117,10 +139,65 @@ public:
entry->proxy = proxy_ptr;
map_.emplace(k, std::move(entry_ptr));
bus_.publish<object_cache_proxy_added_event>({k.type, id, std::chrono::steady_clock::now()});
// return the shared_ptr of the proxy
return proxy_ptr;
}
template<typename Type, typename ResolverPointerType>
bool import(const utils::identifier &id, const std::shared_ptr<object_proxy<Type>> &proxy, ResolverPointerType &&resolver_ptr) {
if (!proxy) {
return false;
}
auto obj = proxy->object();
auto weak_resolver = to_weak<Type>(std::forward<ResolverPointerType>(resolver_ptr));
const auto k = make_key<Type>(id);
std::unique_lock lock(mutex_);
auto it = map_.find(k);
if (it != map_.end()) {
auto *entry = entry_cast_<Type>(it->second.get());
if (auto existing_proxy = entry->proxy.lock()) {
if (existing_proxy != proxy) {
return false;
}
}
proxy->primary_key(id);
proxy->resolver(std::move(weak_resolver));
if (obj) {
entry->entity = obj;
proxy->attach(std::move(obj));
}
entry->proxy = proxy;
bus_.publish<object_cache_imported_event>({k.type, id, std::chrono::steady_clock::now()});
return true;
}
auto entry_ptr = std::make_unique<cache_entry<Type>>();
auto *entry = entry_ptr.get();
proxy->primary_key(id);
proxy->resolver(std::move(weak_resolver));
if (obj) {
entry->entity = obj;
proxy->attach(std::move(obj));
}
entry->proxy = proxy;
map_.emplace(k, std::move(entry_ptr));
bus_.publish<object_cache_imported_event>({k.type, id, std::chrono::steady_clock::now()});
return true;
}
/**
* @brief Verknüpft eine geladene Entity mit einem Cache-Eintrag (Type, id).
*
@ -145,6 +222,7 @@ public:
auto *entry = entry_ptr.get();
entry->entity = obj;
map_.emplace(k, std::move(entry_ptr));
bus_.publish<object_cache_entity_attached_event>({k.type, id, std::chrono::steady_clock::now()});
return;
}
@ -155,6 +233,8 @@ public:
proxy->attach(std::move(obj));
}
bus_.publish<object_cache_entity_attached_event>({k.type, id, std::chrono::steady_clock::now()});
prune_if_dead(it);
}
@ -179,6 +259,9 @@ public:
auto *entry = entry_cast_<Type>(it->second.get());
auto entity_ptr = entry->entity.lock();
if (entity_ptr) {
bus_.publish<object_cache_accessed_event>({k.type, id, std::chrono::steady_clock::now()});
}
prune_if_dead(it);
return entity_ptr;
}
@ -204,6 +287,9 @@ public:
auto *entry = entry_cast_<Type>(it->second.get());
const bool loaded = !entry->entity.expired();
if (loaded) {
bus_.publish<object_cache_accessed_event>({k.type, id, std::chrono::steady_clock::now()});
}
prune_if_dead(it);
return loaded;
}
@ -224,6 +310,7 @@ public:
}
map_.erase(it);
bus_.publish<object_cache_erased_event>({k.type, id, std::chrono::steady_clock::now()});
}
/**
@ -246,6 +333,10 @@ public:
}
}
if (removed > 0) {
bus_.publish<object_cache_sweep_event>({removed, std::chrono::steady_clock::now()});
}
return removed;
}
@ -325,6 +416,7 @@ private:
private:
mutable std::shared_mutex mutex_{};
utils::message_bus &bus_;
std::unordered_map<key, std::unique_ptr<cache_entry_base>, key_hash> map_;
};

View File

@ -53,6 +53,18 @@ public:
}
}
void resolver(std::weak_ptr<object_resolver<Type>> resolver) {
std::lock_guard lock(mutex_);
resolver_ = std::move(resolver);
}
[[nodiscard]] std::shared_ptr<Type> object() const {
if (!obj_) {
std::ignore = resolve();
}
return obj_;
}
void invalidate() {
std::lock_guard lock(mutex_);
obj_.reset();

View File

@ -57,6 +57,8 @@ public:
void reset() { proxy_.reset(); }
void reset(const std::shared_ptr<object_proxy<Type>>& proxy) { proxy_ = proxy; }
[[nodiscard]] std::shared_ptr<object_proxy<Type>> proxy() const { return proxy_; }
operator bool() const { return valid(); }
[[nodiscard]] bool valid() const { return proxy_ != nullptr && !proxy_->empty(); }

View File

@ -3,21 +3,25 @@
#include <utility>
#include "matador/utils/identifier.hpp"
#include "matador/utils/primary_key_accessor.hpp"
#include "matador/utils/error.hpp"
#include "matador/utils/result.hpp"
#include "matador/object/object_cache.hpp"
#include "matador/object/object_ptr.hpp"
#include "matador/sql/query_context.hpp"
#include "matador/sql/executor.hpp"
#include "matador/sql/execute_result.hpp"
#include "matador/sql/executor.hpp"
#include "matador/sql/statement.hpp"
#include "matador/query/error_code.hpp"
#include "matador/query/abstract_pk_generator.hpp"
namespace matador::query {
using resolver_service_ptr = std::shared_ptr<sql::resolver_service>;
class insert_step {
public:
explicit insert_step(sql::query_context ctx)
@ -26,13 +30,41 @@ public:
virtual utils::result<void, utils::error> prepare(sql::executor &conn) = 0;
virtual utils::result<void, utils::error> insert(sql::statement &stmt) = 0;
virtual utils::result<void, utils::error> finalize(object::object_cache& cache, const resolver_service_ptr& resolver_service) = 0;
[[nodiscard]] const sql::query_context& ctx() const { return ctx_; }
protected:
utils::identifier id_;
utils::primary_key_accessor pk_accessor_;
sql::query_context ctx_;
};
template<typename ObjectType>
utils::result<void, utils::error> finalize_inserted_object(object::object_ptr<ObjectType> &ptr,
const utils::identifier& pk,
object::object_cache &cache,
const std::shared_ptr<sql::resolver_service> &resolver_service) {
if (!ptr) {
return utils::failure(utils::error(error_code::InvalidObject, "Inserted object is null."));
}
if (!resolver_service) {
return utils::failure(utils::error(error_code::UnknownType, "Missing resolver service."));
}
auto resolver = resolver_service->template object_resolver<ObjectType>();
if (!resolver) {
return utils::failure(utils::error(error_code::UnknownType, "Missing object resolver for inserted type."));
}
if (!cache.import<ObjectType>(pk, ptr.proxy(), resolver)) {
return utils::failure(utils::error(error_code::FailedToFindObject, "Object cache already contains another live proxy for inserted object."));
}
ptr.change_state(object::object_state::Persistent);
return utils::ok<void>();
}
template <typename ObjectType>
class insert_step_pk_generated : public insert_step {
public:
@ -46,7 +78,8 @@ public:
if (!result.is_ok()) {
return utils::failure(result.err());
}
pk_accessor_.set(*ptr_, utils::identifier{*result});
id_ = *result;
pk_accessor_.set(*ptr_, id_);
return utils::ok<void>();
}
@ -62,6 +95,10 @@ public:
return utils::ok<void>();
}
utils::result<void, utils::error> finalize(object::object_cache& cache, const resolver_service_ptr& resolver_service) override {
return finalize_inserted_object(ptr_, id_, cache, resolver_service);
}
private:
object::object_ptr<ObjectType> ptr_;
abstract_pk_generator& pk_generator_;
@ -92,16 +129,19 @@ public:
auto rec = result->value();
const auto& f = rec.at(pk_column_name_);
utils::identifier id;
if (auto res = id.assign(f.value()); !res.is_ok()) {
if (auto res = id_.assign(f.value()); !res.is_ok()) {
return utils::failure(res.err());
}
pk_accessor_.set(*ptr_, id);
pk_accessor_.set(*ptr_, id_);
ptr_.change_state(object::object_state::Persistent);
return utils::ok<void>();
}
utils::result<void, utils::error> finalize(object::object_cache& cache, const resolver_service_ptr& resolver_service) override {
return finalize_inserted_object(ptr_, id_, cache, resolver_service);
}
private:
object::object_ptr<ObjectType> ptr_;
std::string pk_column_name_;
@ -123,10 +163,15 @@ public:
return utils::failure(exec_result.err());
}
id_ = object::primary_key_resolver::resolve_object(*ptr_).pk;
ptr_.change_state(object::object_state::Persistent);
return utils::ok<void>();
}
utils::result<void, utils::error> finalize(object::object_cache& cache, const resolver_service_ptr& resolver_service) override {
return finalize_inserted_object(ptr_, id_, cache, resolver_service);
}
private:
object::object_ptr<ObjectType> ptr_;
};
@ -150,6 +195,10 @@ public:
return utils::ok<void>();
}
utils::result<void, utils::error> finalize(object::object_cache& /*cache*/, const resolver_service_ptr& /*resolver_service*/) override {
return utils::ok<void>();
}
private:
object::object_ptr<ObjectType> ptr_;
};

View File

@ -16,6 +16,7 @@
#include "matador/sql/statement.hpp"
#include "matador/sql/statement_cache.hpp"
#include "matador/object/object_cache.hpp"
#include "matador/object/object_ptr.hpp"
#include <unordered_map>
@ -69,6 +70,8 @@ private:
mutable sql::statement_cache cache_;
const sql::dialect &dialect_;
object::object_cache object_cache_;
const basic_schema &schema_;
mutable std::unordered_map<std::string, std::vector<object::attribute> > prototypes_;
std::shared_ptr<sql::resolver_service> resolver_service_;
@ -114,7 +117,13 @@ utils::result<object::object_ptr<Type>, utils::error> session::insert(object::ob
}
}
obj.change_state(object::object_state::Persistent);
// After successfully executed all inserts, add them to the object cache
for (auto &step : *steps) {
if (const auto result = step->finalize(object_cache_, resolver_service_); !result.is_ok()) {
return utils::failure(result.err());
}
}
return utils::ok(obj);
}
@ -233,6 +242,15 @@ utils::result<object::object_ptr<Type>, utils::error> session::find(const Primar
return utils::failure(make_error(error_code::FailedToFindPrimaryKey, "Type hasn't primary key."));
}
auto resolver = resolver_service_->template object_resolver<Type>();
if (!resolver) {
return utils::failure(utils::error(error_code::UnknownType, "Missing object resolver for inserted type."));
}
if (object_cache_.is_loaded<Type>(utils::identifier{pk})) {
return utils::ok(object::object_ptr(object_cache_.acquire_proxy<Type>(utils::identifier{pk}, resolver)));
}
select_query_builder eqb(schema_);
auto data = eqb.build<Type>(*it->second.table().primary_key_column() == pk);
if (!data.is_ok()) {

View File

@ -18,6 +18,7 @@ session::session(session_context&& ctx, const basic_schema &scm)
: pool_(ctx.dns, ctx.connection_count, [ctx](const sql::connection_info& info) { return sql::connection(info, ctx.resolver_service); })
, cache_(ctx.bus, pool_, ctx.cache_size)
, dialect_(sql::backend_provider::instance().connection_dialect(pool_.info().type))
, object_cache_(ctx.bus)
, schema_(scm)
, resolver_service_(ctx.resolver_service) {
using namespace matador::utils;

View File

@ -79,6 +79,7 @@ TEST_CASE_METHOD(SessionFixture, "Test insert object with has many relation", "[
session ses({bus, connection::dns, 4}, schema);
{
auto s_king = make_object<author>(1, "Steven", "King", "21.9.1947", 1956, false);
s_king->books.push_back(make_object<book>(2, "Carrie", nullobj, 1974));
@ -91,6 +92,17 @@ TEST_CASE_METHOD(SessionFixture, "Test insert object with has many relation", "[
auto res = ses.insert(s_king);
REQUIRE(res);
validate_author_state(s_king, object_state::Persistent);
auto author_result = ses.find<author>(s_king->id);
REQUIRE(author_result);
REQUIRE(author_result->is_persistent());
REQUIRE(author_result.value()->books.size() == 5);
}
auto author_result = ses.find<author>(1);
REQUIRE(author_result);
REQUIRE(author_result->is_persistent());
REQUIRE(author_result.value()->books.size() == 5);
}
TEST_CASE_METHOD(SessionFixture, "Test insert object with has many relation with identity", "[session][insert][has_many][identity]") {

View File

@ -2,7 +2,9 @@
#include "matador/object/object_cache.hpp"
#include "matador/object/object_resolver.hpp"
#include "matador/utils/identifier.hpp"
#include "matador/utils/message_bus.hpp"
#include "../test/models/person.hpp"
@ -58,7 +60,8 @@ private:
} // namespace
TEST_CASE("object_cache: acquire_proxy returns the same proxy instance across threads", "[object][cache][threadsafe]") {
matador::object::object_cache cache;
matador::utils::message_bus bus;
matador::object::object_cache cache(bus);
const matador::utils::identifier id{123};
std::atomic_int calls{0};
@ -89,7 +92,8 @@ TEST_CASE("object_cache: acquire_proxy returns the same proxy instance across th
}
TEST_CASE("object_cache: attach_entity makes is_loaded/get_entity reflect presence", "[object][cache]") {
matador::object::object_cache cache;
matador::utils::message_bus bus;
matador::object::object_cache cache(bus);
const matador::utils::identifier id{42};
REQUIRE_FALSE(cache.is_loaded<person>(id));
@ -107,7 +111,8 @@ TEST_CASE("object_cache: attach_entity makes is_loaded/get_entity reflect presen
}
TEST_CASE("object_cache: erase invalidates existing proxies", "[object][cache]") {
matador::object::object_cache cache;
matador::utils::message_bus bus;
matador::object::object_cache cache(bus);
const matador::utils::identifier id{9};
std::atomic_int calls{0};