added connection pool and tests

This commit is contained in:
Sascha Kühl 2025-02-05 15:47:51 +01:00
parent 45a7199ccf
commit bc3ffbda10
20 changed files with 495 additions and 53 deletions

View File

@ -10,13 +10,13 @@ include(cmake/CPM.cmake)
include(CTest)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
#set(CMAKE_CXX_STANDARD_REQUIRED ON)
#set(CMAKE_CXX_EXTENSIONS OFF)
#set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
SET(GCC_CLANG_COMMON_FLAGS "-Wall -Wextra -pedantic -ftemplate-backtrace-limit=0")
SET(GCC_CLANG_COMMON_FLAGS_DEBUG "-O0 -g -DDEBUG")
SET(GCC_CLANG_COMMON_FLAGS_RELEASE "-O1 -DNDEBUG")
#SET(GCC_CLANG_COMMON_FLAGS "-Wall -Wextra -pedantic -ftemplate-backtrace-limit=0")
#SET(GCC_CLANG_COMMON_FLAGS_DEBUG "-O0 -g -DDEBUG")
#SET(GCC_CLANG_COMMON_FLAGS_RELEASE "-O1 -DNDEBUG")
SET(CMAKE_POSITION_INDEPENDENT_CODE ON)
message(STATUS "C++ Compiler ID: ${CMAKE_CXX_COMPILER_ID}")
@ -30,9 +30,9 @@ if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set(CMAKE_CXX_FLAGS_RELEASE "${GCC_CLANG_COMMON_FLAGS_RELEASE}")
elseif(CMAKE_CXX_COMPILER_ID STREQUAL "MSVC")
message(STATUS "MSVC detected - Adding compiler flags")
set(CMAKE_CXX_FLAGS "/W3 /EHsc /bigobj")
set(CMAKE_CXX_FLAGS_DEBUG "/MDd /Od /Zi /D_DEBUG /DDEBUG")
set(CMAKE_CXX_FLAGS_RELEASE "/O1 /DNDEBUG")
# set(CMAKE_CXX_FLAGS "/W3 /EHsc /bigobj")
# set(CMAKE_CXX_FLAGS_DEBUG "/MDd /Od /Zi /D_DEBUG /DDEBUG")
# set(CMAKE_CXX_FLAGS_RELEASE "/O1 /DNDEBUG")
add_compile_options(/Zc:preprocessor)
endif ()
#if(ENABLE_COVERAGE)

View File

@ -0,0 +1,188 @@
#ifndef QUERY_CONNECTION_POOL_HPP
#define QUERY_CONNECTION_POOL_HPP
#include "matador/sql/connection_info.hpp"
#include <chrono>
#include <mutex>
#include <string>
#include <optional>
#include <condition_variable>
#include <thread>
#include <unordered_map>
namespace matador::sql {
template < class Connection >
class connection_pool;
template < class Connection >
using IdConnection = std::pair<size_t, Connection>;
template < class Connection >
class connection_ptr
{
public:
connection_ptr(IdConnection<Connection> *c, connection_pool<Connection> *pool)
: connection_(c), pool_(pool) {}
~connection_ptr();
connection_ptr(const connection_ptr &) = delete;
connection_ptr& operator=(const connection_ptr &) = delete;
connection_ptr(connection_ptr &&x) noexcept
: connection_(x.connection_)
, pool_(x.pool_)
{
x.connection_ = nullptr;
x.pool_ = nullptr;
}
connection_ptr& operator=(connection_ptr &&x) noexcept
{
if (this == &x) {
return *this;
}
std::swap(connection_, x.connection_);
std::swap(pool_, x.pool_);
return *this;
}
Connection* operator->() { return &connection_->second; }
Connection& operator*() { return connection_->second; }
[[nodiscard]] std::optional<size_t> id() const
{
if (connection_) {
return connection_->first;
} else {
return std::nullopt;
}
}
[[nodiscard]] bool valid() const { return connection_ != nullptr; }
private:
friend class connection_pool<Connection>;
IdConnection<Connection> *connection_{};
connection_pool<Connection> *pool_{};
};
template < class Connection >
class connection_pool
{
public:
using connection_pointer = connection_ptr<Connection>;
public:
connection_pool(const std::string &dns, size_t count)
: info_(connection_info::parse(dns)) {
connection_repo_.reserve(count);
while (count) {
connection_repo_.emplace_back(count, info_);
auto &conn = connection_repo_.back();
idle_connections_.emplace(conn.first, &conn);
conn.second.open();
--count;
}
}
connection_pointer acquire() {
std::unique_lock<std::mutex> lock(mutex_);
while (idle_connections_.empty()) {
cv.wait(lock);
}
return get_next_connection();
}
connection_pointer try_acquire() {
std::unique_lock<std::mutex> lock(mutex_);
if (idle_connections_.empty()) {
return {nullptr, this};
}
return get_next_connection();
}
connection_pointer acquire(size_t id) {
using namespace std::chrono_literals;
pointer next_connection{nullptr};
auto try_count{0};
std::unique_lock<std::mutex> lock(mutex_);
do {
if (auto it = idle_connections_.find(id); it != idle_connections_.end()) {
next_connection = it->second;
auto node = idle_connections_.extract(it);
inuse_connections_.insert(std::move(node));
} else {
lock.unlock();
std::this_thread::sleep_for(100ms);
lock.lock();
}
} while(try_count++ < 5);
return {next_connection, this};
}
void release(IdConnection<Connection> *c) {
if (c == nullptr) {
return;
}
std::unique_lock<std::mutex> lock(mutex_);
if (auto it = inuse_connections_.find(c->first); it != inuse_connections_.end()) {
auto node = inuse_connections_.extract(it);
idle_connections_.insert(std::move(node));
}
}
void release(connection_ptr<Connection> &c) {
release(c.connection_);
c.connection_ = nullptr;
}
std::size_t size() const { return connection_repo_.size(); }
std::size_t idle() const {
std::lock_guard<std::mutex> guard(mutex_);
return idle_connections_.size();
}
std::size_t inuse() const {
std::lock_guard<std::mutex> guard(mutex_);
return inuse_connections_.size();
}
const connection_info &info() const {
return info_;
}
private:
connection_pointer get_next_connection() {
pointer next_connection{nullptr};
for (auto &item : idle_connections_) {
next_connection = item.second;
auto node = idle_connections_.extract(item.first);
inuse_connections_.insert(std::move(node));
break;
}
return {next_connection, this};
}
private:
mutable std::mutex mutex_;
std::condition_variable cv;
std::vector<IdConnection<Connection>> connection_repo_;
using pointer = IdConnection<Connection>*;
using connection_map = std::unordered_map<size_t, pointer>;
connection_map inuse_connections_;
connection_map idle_connections_;
const connection_info info_;
};
template<class Connection>
connection_ptr<Connection>::~connection_ptr() {
pool_->release(connection_);
}
}
#endif //QUERY_CONNECTION_POOL_HPP

View File

@ -125,7 +125,7 @@ private:
friend class dialect_builder;
next_placeholder_func placeholder_func_ = [](size_t) { return "?"; };
// to_escaped_string_func to_escaped_string_func_ = [](const utils::blob &val) { return utils::to_string(val); };
to_escaped_string_func to_escaped_string_func_ = [](const utils::blob &val) { return utils::to_string(val); };
escape_identifier_t identifier_escape_type_ = escape_identifier_t::ESCAPE_BOTH_SAME;

View File

@ -17,6 +17,7 @@ public:
dialect_builder& with_placeholder_func(const dialect::next_placeholder_func &func);
dialect_builder& with_default_schema_name(const std::string &schema_name);
dialect_builder& with_bool_strings(const std::string &true_string, const std::string &false_string);
dialect_builder& with_escape_string_func(const dialect::to_escaped_string_func &func);
dialect build();

View File

@ -5,7 +5,7 @@ namespace matador::utils {
struct placeholder {};
inline constexpr bool operator==(const placeholder&, const placeholder&) { return true; }
constexpr bool operator==(const placeholder&, const placeholder&) { return true; }
static constexpr placeholder _;

View File

@ -18,6 +18,7 @@ using database_type = std::variant<
int8_t, int16_t, int32_t, int64_t,
float, double,
bool,
const char*,
std::string,
blob,
nullptr_t>;

View File

@ -9,7 +9,7 @@ query_update_intermediate::query_update_intermediate(const sql::table& table)
context_->parts.push_back(std::make_unique<internal::query_update_part>(table));
}
query_set_intermediate query_update_intermediate::set(std::initializer_list<internal::key_value_pair> columns)
query_set_intermediate query_update_intermediate::set(const std::initializer_list<internal::key_value_pair> columns)
{
return set(std::vector<internal::key_value_pair>{columns});
}

View File

@ -124,8 +124,7 @@ std::string dialect::to_escaped_string(const utils::blob &value, const connectio
return conn->to_escaped_string(value);
}
return {};
// return to_escaped_string_func_(value);
return to_escaped_string_func_(value);
}
std::string dialect::default_schema_name() const

View File

@ -52,6 +52,12 @@ dialect_builder &dialect_builder::with_bool_strings(const std::string &true_stri
return *this;
}
dialect_builder& dialect_builder::with_escape_string_func( const dialect::to_escaped_string_func& func ) {
dialect_.to_escaped_string_func_ = func;
return *this;
}
dialect dialect_builder::build()
{
return std::move(dialect_);

View File

@ -1,8 +1,6 @@
#include "matador/sql/interface/query_result_reader.hpp"
#include "matador/utils/basic_types.hpp"
#include "matador/utils/convert.hpp"
#include "matador/utils/string.hpp"
#include "matador/utils/value.hpp"
namespace matador::sql {
@ -19,20 +17,4 @@ void convert(const char *val_str, utils::value &val)
val = *res;
}
// utils::blob query_result_reader::read_blob(const size_t index)
// {
// const auto *data = column(index);
// if (data == nullptr) {
// return {};
// }
// const auto len = strlen(data);
// const auto *bytes = reinterpret_cast<const unsigned char*>(data);
// return utils::blob{bytes, bytes+len};
// }
// utils::attribute_reader &query_result_reader::result_binder()
// {
// return empty_result_binder_;
// }
}

View File

@ -7,19 +7,22 @@ add_executable(OrmTests
backend/test_backend_service.hpp
backend/test_connection.cpp
backend/test_connection.hpp
query/ConditionTests.cpp
query/QueryBuilderTest.cpp
query/QueryFixture.cpp
query/QueryFixture.hpp
sql/ColumnTest.cpp
sql/FieldTest.cpp
backend/test_parameter_binder.cpp
backend/test_parameter_binder.hpp
backend/test_result_reader.cpp
backend/test_result_reader.hpp
backend/test_statement.cpp
backend/test_statement.hpp
backend/test_parameter_binder.cpp
backend/test_parameter_binder.hpp
query/ConditionTests.cpp
query/QueryBuilderTest.cpp
query/QueryFixture.cpp
query/QueryFixture.hpp
query/QueryTest.cpp
sql/ColumnTest.cpp
sql/ConnectionPoolTest.cpp
sql/FieldTest.cpp
utils/auto_reset_event.cpp
utils/auto_reset_event.hpp
)
target_link_libraries(OrmTests matador-orm matador-core Catch2::Catch2WithMain)

View File

@ -24,7 +24,9 @@ void test_backend_service::destroy(sql::connection_impl *impl)
const sql::dialect *test_backend_service::dialect() const
{
static sql::dialect dialect_ = sql::dialect_builder::builder().create().build();
static sql::dialect dialect_ = sql::dialect_builder::builder()
.create()
.build();
return &dialect_;
}

View File

@ -87,6 +87,7 @@ void test_result_reader::read_value(const char *id, const size_t index, utils::v
val = "value";
}
utils::attribute_reader &test_result_reader::result_binder() {
return query_result_reader::result_binder();
return empty_binder_;
}
} // namespace matador::test::orm

View File

@ -5,6 +5,33 @@
namespace matador::test::orm {
namespace detail {
class empty_binder final : public utils::attribute_reader
{
public:
void read_value(const char *, size_t, int8_t &) override {}
void read_value(const char *, size_t, int16_t &) override {}
void read_value(const char *, size_t, int32_t &) override {}
void read_value(const char *, size_t, int64_t &) override {}
void read_value(const char *, size_t, uint8_t &) override {}
void read_value(const char *, size_t, uint16_t &) override {}
void read_value(const char *, size_t, uint32_t &) override {}
void read_value(const char *, size_t, uint64_t &) override {}
void read_value(const char *, size_t, bool &) override {}
void read_value(const char *, size_t, float &) override {}
void read_value(const char *, size_t, double &) override {}
void read_value(const char *, size_t, time &) override {}
void read_value(const char *, size_t, date &) override {}
void read_value(const char *, size_t, char *, size_t) override {}
void read_value(const char *, size_t, std::string &) override {}
void read_value(const char *, size_t, std::string &, size_t) override {}
void read_value(const char *, size_t, utils::blob &) override {}
void read_value(const char *, size_t, utils::value &, size_t) override {}
};
}
class test_result_reader final : public sql::query_result_reader {
public:
[[nodiscard]] size_t column_count() const override;
@ -36,6 +63,7 @@ protected:
private:
uint8_t rows_{5};
detail::empty_binder empty_binder_;
};
}

View File

@ -22,7 +22,7 @@ using namespace matador::utils;
TEST_CASE_METHOD(QueryFixture, "Test create table sql statement string", "[query]") {
auto result = query::create()
.table({"person"}, {
make_pk_column<unsigned long>("id"),
make_pk_column<uint32_t>("id"),
make_column<std::string>("name", 255),
make_column<unsigned short>("age")
}).str(*db);
@ -31,10 +31,10 @@ TEST_CASE_METHOD(QueryFixture, "Test create table sql statement string", "[query
result = query::create()
.table("person", {
make_pk_column<unsigned long>("id"),
make_pk_column<uint32_t>("id"),
make_column<std::string>("name", {255, constraints::UNIQUE}, null_option::NOT_NULL),
make_column<unsigned short>("age"),
make_fk_column<unsigned long>("address", "address", "id")
make_fk_column<uint32_t>("address", "address", "id")
}).str(*db);
REQUIRE(result == R"##(CREATE TABLE "person" ("id" BIGINT NOT NULL, "name" VARCHAR(255) NOT NULL UNIQUE, "age" INTEGER NOT NULL, "address" BIGINT NOT NULL, CONSTRAINT PK_person PRIMARY KEY (id), CONSTRAINT FK_person_address FOREIGN KEY (address) REFERENCES address(id)))##");
@ -61,7 +61,7 @@ TEST_CASE_METHOD(QueryFixture, "Test insert sql statement string", "[query]") {
.into("person", {
"id", "name", "age"
})
.values({7UL, "george", 65U})
.values({7U, "george", 65U})
.str(*db);
REQUIRE(result == R"(INSERT INTO "person" ("id", "name", "age") VALUES (7, 'george', 65))");
@ -70,7 +70,7 @@ TEST_CASE_METHOD(QueryFixture, "Test insert sql statement string", "[query]") {
TEST_CASE_METHOD(QueryFixture, "Test update sql statement string", "[query]") {
auto result = query::update("person")
.set({
{"id", 7UL},
{"id", 7U},
{"name", "george"},
{"age", 65U}
})
@ -80,7 +80,7 @@ TEST_CASE_METHOD(QueryFixture, "Test update sql statement string", "[query]") {
result = query::update("person")
.set({
{"id", 7UL},
{"id", 7U},
{"name", "george"},
{"age", 65U}
})
@ -95,7 +95,7 @@ TEST_CASE_METHOD(QueryFixture, "Test update sql statement string", "[query]") {
TEST_CASE_METHOD(QueryFixture, "Test update limit sql statement", "[query][update][limit]") {
const auto result = query::update("person")
.set({{"id", 7UL}, {"name", "george"}, {"age", 65U}})
.set({{"id", 7U}, {"name", "george"}, {"age", 65U}})
.where("name"_col == "george")
.order_by("id"_col).asc()
.limit(2)
@ -185,7 +185,7 @@ TEST_CASE_METHOD(QueryFixture, "Test select sql statement string with offset and
TEST_CASE_METHOD(QueryFixture, "Test create, insert and select a blob column", "[query][blob]") {
auto result = query::create()
.table("person", {
make_pk_column<unsigned long>("id"),
make_pk_column<uint32_t>("id"),
make_column<std::string>("name", 255),
make_column<blob>("data")
})
@ -195,7 +195,7 @@ TEST_CASE_METHOD(QueryFixture, "Test create, insert and select a blob column", "
result = query::insert()
.into("person", {"id", "name", "data"})
.values({7UL, "george", blob{1, 'A', 3, 4}})
.values({7U, "george", blob{1, 'A', 3, 4}})
.str(*db);
REQUIRE(result == R"(INSERT INTO "person" ("id", "name", "data") VALUES (7, 'george', X'01410304'))");

View File

@ -21,7 +21,7 @@ TEST_CASE("Test create empty column", "[column]") {
c.set(7);
REQUIRE(c.type() == basic_type::type_int32);
REQUIRE(c.as<std::string>() == "7");
REQUIRE(c.as<long>() == 7);
REQUIRE(c.as<int>() == 7);
REQUIRE(c.str() == "7");
}

View File

@ -0,0 +1,176 @@
#include <catch2/catch_test_macros.hpp>
#include "matador/sql/backend_provider.hpp"
#include "matador/sql/connection.hpp"
#include "matador/sql/connection_pool.hpp"
#include "../backend/test_connection.hpp"
#include "../backend/test_backend_service.hpp"
#include "../utils/auto_reset_event.hpp"
using namespace matador::sql;
using namespace matador::test::utils;
using namespace matador::test::orm;
namespace matador::test::orm {
class ConnectionPoolFixture {
public:
ConnectionPoolFixture() {
backend_provider::instance().register_backend("noop", std::make_unique<test_backend_service>());
db = std::make_unique<sql::connection>("noop://noop.db");
}
~ConnectionPoolFixture() = default;
protected:
std::unique_ptr<connection> db;
};
}
TEST_CASE_METHOD(ConnectionPoolFixture, "Create connection pool", "[connection pool]") {
using pool_t = connection_pool<test_connection>;
pool_t pool("noop://noop.db", 4);
REQUIRE(pool.size() == 4);
REQUIRE(pool.idle() == 4);
REQUIRE(pool.inuse() == 0);
auto ptr = pool.acquire();
REQUIRE(ptr.valid());
REQUIRE(ptr.id().value() > 0);
REQUIRE(ptr->is_open());
REQUIRE(pool.idle() == 3);
REQUIRE(pool.inuse() == 1);
pool.release(ptr);
REQUIRE(!ptr.valid());
REQUIRE(pool.idle() == 4);
REQUIRE(pool.inuse() == 0);
ptr = pool.acquire(3);
REQUIRE(ptr.valid());
REQUIRE(ptr.id() == 3);
REQUIRE(ptr->is_open());
{
auto ptr2 = pool.acquire();
REQUIRE(ptr2.valid());
REQUIRE(ptr2->is_open());
REQUIRE(pool.idle() == 2);
REQUIRE(pool.inuse() == 2);
}
REQUIRE(pool.idle() == 3);
REQUIRE(pool.inuse() == 1);
pool.release(ptr);
REQUIRE(!ptr.valid());
REQUIRE(pool.idle() == 4);
REQUIRE(pool.inuse() == 0);
}
TEST_CASE_METHOD(ConnectionPoolFixture, "Acquire connection by id", "[connection pool]") {
using pool_t = connection_pool<test_connection>;
pool_t pool("noop://noop.db", 4);
REQUIRE(pool.size() == 4);
REQUIRE(pool.idle() == 4);
REQUIRE(pool.inuse() == 0);
auto ptr = pool.acquire();
REQUIRE(ptr.valid());
REQUIRE(ptr.id());
REQUIRE(ptr.id().value() > 0);
REQUIRE(ptr->is_open());
auto same_ptr = pool.acquire(ptr.id().value());
REQUIRE(!same_ptr.valid());
const auto connection_id = ptr.id().value();
pool.release(ptr);
REQUIRE(!ptr.valid());
same_ptr = pool.acquire(connection_id);
REQUIRE(same_ptr.valid());
REQUIRE(same_ptr.id() == connection_id);
}
TEST_CASE("Try acquire connection", "[connection pool][try acquire]") {
using pool_t = connection_pool<test_connection>;
pool_t pool("noop://noop.db", 1);
REQUIRE(pool.size() == 1);
REQUIRE(pool.idle() == 1);
REQUIRE(pool.inuse() == 0);
auto ptr = pool.try_acquire();
REQUIRE(ptr.valid());
REQUIRE(ptr.id());
REQUIRE(ptr.id().value() > 0);
REQUIRE(ptr->is_open());
REQUIRE(pool.size() == 1);
REQUIRE(pool.idle() == 0);
REQUIRE(pool.inuse() == 1);
auto ptr2 = pool.try_acquire();
REQUIRE(!ptr2.valid());
pool.release(ptr);
REQUIRE(!ptr.valid());
REQUIRE(pool.size() == 1);
REQUIRE(pool.idle() == 1);
REQUIRE(pool.inuse() == 0);
ptr2 = pool.try_acquire();
REQUIRE(ptr2.valid());
REQUIRE(ptr2.id());
REQUIRE(ptr2.id().value() > 0);
REQUIRE(ptr2->is_open());
REQUIRE(pool.size() == 1);
REQUIRE(pool.idle() == 0);
REQUIRE(pool.inuse() == 1);
pool.release(ptr2);
auto_reset_event reset_main_event;
auto_reset_event reset_thread_event;
std::thread t([&reset_main_event, &reset_thread_event, &pool]() {
auto c1 = pool.acquire();
REQUIRE(c1.valid());
REQUIRE(c1.id());
REQUIRE(c1.id().value() > 0);
reset_main_event.set();
reset_thread_event.wait_one();
pool.release(c1);
REQUIRE(!c1.valid());
reset_main_event.set();
});
reset_main_event.wait_one();
ptr2 = pool.try_acquire();
REQUIRE(!ptr2.valid());
reset_thread_event.set();
reset_main_event.wait_one();
ptr2 = pool.try_acquire();
REQUIRE(ptr2.valid());
REQUIRE(ptr2.id());
REQUIRE(ptr2.id().value() > 0);
t.join();
}

View File

@ -16,7 +16,7 @@ TEST_CASE("Test field", "[field]") {
REQUIRE(!f.is_bool());
REQUIRE(!f.is_string());
f = 7UL;
f = 7U;
REQUIRE(!f.is_null());
REQUIRE(f.is_integer());
REQUIRE(!f.is_floating_point());

View File

@ -0,0 +1,27 @@
#include "auto_reset_event.hpp"
namespace matador::test::utils {
auto_reset_event::auto_reset_event() : state(false) {}
void auto_reset_event::wait_one()
{
std::unique_lock<std::mutex> lock(sync);
underlying.wait(lock, [this](){return state.load();});
state = false;
}
void auto_reset_event::set()
{
std::unique_lock<std::mutex> lock(sync);
state = true;
underlying.notify_one();
}
void auto_reset_event::reset()
{
std::unique_lock<std::mutex> lock(sync);
state = false;
}
}

View File

@ -0,0 +1,28 @@
#ifndef QUERY_AUTO_RESET_EVENT_HPP
#define QUERY_AUTO_RESET_EVENT_HPP
#include <atomic>
#include <condition_variable>
#include <mutex>
namespace matador::test::utils {
class auto_reset_event
{
public:
auto_reset_event();
auto_reset_event(const auto_reset_event& other) = delete;
void wait_one();
void set();
void reset();
private:
std::condition_variable underlying;
std::mutex sync;
std::atomic<bool> state;
};
}
#endif //QUERY_AUTO_RESET_EVENT_HPP