added more metrics to statement_cache
This commit is contained in:
parent
01d7179604
commit
6ac3f80887
|
|
@ -15,8 +15,8 @@ namespace matador::sql {
|
|||
class connection_pool;
|
||||
|
||||
struct statement_event {
|
||||
std::string sql;
|
||||
std::chrono::steady_clock::time_point timestamp;
|
||||
std::string sql{};
|
||||
std::chrono::steady_clock::time_point timestamp{};
|
||||
};
|
||||
|
||||
struct statement_accessed_event : statement_event {};
|
||||
|
|
@ -24,17 +24,15 @@ struct statement_added_event : statement_event {};
|
|||
struct statement_evicted_event : statement_event {};
|
||||
|
||||
struct statement_lock_failed_event : statement_event {
|
||||
std::chrono::steady_clock::time_point lock_attempt_start;
|
||||
std::chrono::nanoseconds duration{};
|
||||
};
|
||||
|
||||
struct statement_lock_acquired_event : statement_event {
|
||||
std::chrono::steady_clock::time_point lock_attempt_start;
|
||||
std::chrono::steady_clock::time_point lock_attempt_end;
|
||||
std::chrono::nanoseconds duration{};
|
||||
};
|
||||
|
||||
struct statement_execution_event : statement_event {
|
||||
std::chrono::steady_clock::time_point execution_start;
|
||||
std::chrono::steady_clock::time_point execution_end;
|
||||
std::chrono::nanoseconds duration{};
|
||||
};
|
||||
|
||||
struct statement_cache_config {
|
||||
|
|
|
|||
|
|
@ -34,18 +34,21 @@ public:
|
|||
execution_metrics metrics{std::chrono::steady_clock::now()};
|
||||
|
||||
auto result = try_with_retry([this, &bindings, &metrics]() -> utils::result<size_t, utils::error> {
|
||||
const auto query = sql();
|
||||
if (!try_lock()) {
|
||||
++metrics.lock_attempts;
|
||||
bus_.publish<statement_lock_failed_event>({sql(), std::chrono::steady_clock::now(), metrics.lock_attempt_start});
|
||||
const auto now = std::chrono::steady_clock::now();
|
||||
bus_.publish(statement_lock_failed_event{query, now, now - metrics.lock_attempt_start});
|
||||
return utils::failure(utils::error{
|
||||
error_code::STATEMENT_LOCKED,
|
||||
"Failed to execute statement because it is already in use"
|
||||
});
|
||||
}
|
||||
metrics.lock_acquired = std::chrono::steady_clock::now();
|
||||
bus_.publish<statement_lock_acquired_event>({sql(), std::chrono::steady_clock::now(), metrics.lock_attempt_start, metrics.lock_acquired});
|
||||
const auto now = std::chrono::steady_clock::now();
|
||||
bus_.publish(statement_lock_acquired_event{query, now, now - metrics.lock_attempt_start});
|
||||
|
||||
auto guard = statement_guard(*this);
|
||||
metrics.execution_start = std::chrono::steady_clock::now();
|
||||
if (const auto conn = pool_.acquire(connection_id_); !conn.valid()) {
|
||||
return utils::failure(utils::error{
|
||||
error_code::EXECUTE_FAILED,
|
||||
|
|
@ -53,11 +56,10 @@ public:
|
|||
});
|
||||
}
|
||||
|
||||
metrics.execution_start = std::chrono::steady_clock::now();
|
||||
auto execution_result = statement_->execute(bindings);
|
||||
metrics.execution_end = std::chrono::steady_clock::now();
|
||||
|
||||
bus_.publish<statement_execution_event>({sql(), std::chrono::steady_clock::now(), metrics.execution_start, metrics.execution_end});
|
||||
bus_.publish(statement_execution_event{query, metrics.execution_end, metrics.execution_end - metrics.execution_start});
|
||||
|
||||
return execution_result;
|
||||
});
|
||||
|
|
|
|||
|
|
@ -24,39 +24,33 @@ using namespace matador::utils;
|
|||
|
||||
class MetricsObserver {
|
||||
public:
|
||||
MetricsObserver(message_bus &bus) {
|
||||
|
||||
}
|
||||
void on_event(const statement_cache_event& evt) override {
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
|
||||
switch (evt.type) {
|
||||
case statement_cache_event::Type::LockFailed:
|
||||
explicit MetricsObserver(message_bus &bus) {
|
||||
subscriptions.push_back(bus.subscribe<statement_lock_failed_event>([this](const statement_lock_failed_event &ev) {
|
||||
std::lock_guard lock(mutex_);
|
||||
lock_failure_count_++;
|
||||
if (evt.duration) {
|
||||
total_lock_wait_time_ += evt.duration.value();
|
||||
}
|
||||
break;
|
||||
|
||||
case statement_cache_event::Type::ExecutionEnded:
|
||||
total_lock_wait_time_ += ev.duration;
|
||||
}));
|
||||
subscriptions.push_back(bus.subscribe<statement_execution_event>([this](const statement_execution_event &ev) {
|
||||
std::lock_guard lock(mutex_);
|
||||
execution_count_++;
|
||||
if (evt.duration) {
|
||||
total_execution_time_ += evt.duration.value();
|
||||
}
|
||||
break;
|
||||
}
|
||||
total_execution_time_ += ev.duration;
|
||||
}));
|
||||
|
||||
}
|
||||
|
||||
// Metrik-Zugriffsmethoden
|
||||
double get_average_lock_wait_time() const {
|
||||
std::lock_guard lock(mutex_);
|
||||
if (lock_failure_count_ == 0) return 0.0;
|
||||
if (lock_failure_count_ == 0) {
|
||||
return 0.0;
|
||||
}
|
||||
return std::chrono::duration<double>(total_lock_wait_time_).count() / lock_failure_count_;
|
||||
}
|
||||
|
||||
double get_average_execution_time() const {
|
||||
std::lock_guard lock(mutex_);
|
||||
if (execution_count_ == 0) return 0.0;
|
||||
if (execution_count_ == 0) {
|
||||
return 0.0;
|
||||
}
|
||||
return std::chrono::duration<double>(total_execution_time_).count() / execution_count_;
|
||||
}
|
||||
|
||||
|
|
@ -66,6 +60,7 @@ public:
|
|||
}
|
||||
|
||||
private:
|
||||
std::vector<subscription> subscriptions;
|
||||
mutable std::mutex mutex_;
|
||||
size_t lock_failure_count_{0};
|
||||
size_t execution_count_{0};
|
||||
|
|
@ -73,23 +68,6 @@ private:
|
|||
std::chrono::nanoseconds total_execution_time_{0};
|
||||
};
|
||||
|
||||
// void example_with_metrics() {
|
||||
// connection_pool pool("noop://noop.db", 4);
|
||||
// statement_cache cache(pool, 5);
|
||||
//
|
||||
// // Metriken-Observer hinzufügen
|
||||
// auto metrics = std::make_shared<MetricsObserver>();
|
||||
// cache.subscribe(*metrics);
|
||||
//
|
||||
// // Nach der Ausführung der Statements
|
||||
// std::cout << "Durchschnittliche Lock-Wartezeit: "
|
||||
// << metrics->get_average_lock_wait_time() << " s\n";
|
||||
// std::cout << "Durchschnittliche Ausführungszeit: "
|
||||
// << metrics->get_average_execution_time() << " s\n";
|
||||
// std::cout << "Anzahl der Lock-Failures: "
|
||||
// << metrics->get_lock_failure_count() << "\n";
|
||||
// }
|
||||
|
||||
class RecordingObserver final {
|
||||
public:
|
||||
explicit RecordingObserver(message_bus &bus) {
|
||||
|
|
@ -247,6 +225,7 @@ TEST_CASE("Multithreaded stress test", "[statement][cache][stress]") {
|
|||
message_bus bus;
|
||||
statement_cache cache(bus, pool, 5);
|
||||
RecordingObserver observer(bus);
|
||||
MetricsObserver metrics(bus);
|
||||
|
||||
auto start_time = std::chrono::steady_clock::now();
|
||||
|
||||
|
|
@ -287,6 +266,10 @@ TEST_CASE("Multithreaded stress test", "[statement][cache][stress]") {
|
|||
|
||||
std::cout << "[Performance] Executed " << (thread_count * iterations) << " statements in " << duration.count() << " ms (lock failed: " << lock_failed_count << ", execute failed: " << exec_failed_count << ")\n";
|
||||
|
||||
std::cout << "Average lock wait time: " << metrics.get_average_lock_wait_time() << "s\n";
|
||||
std::cout << "Average execution time: " << metrics.get_average_execution_time() << "s\n";
|
||||
std::cout << "Number of lock failures: " << metrics.get_lock_failure_count() << "\n";
|
||||
|
||||
// Some events should be generated
|
||||
int accessed = 0;
|
||||
while (auto e = observer.poll()) {
|
||||
|
|
|
|||
Loading…
Reference in New Issue