added a timeout parameter to connection_pool::acquire and a connection_pool::try_acquire with connection id
This commit is contained in:
parent
d6de8da3c8
commit
167e2ef382
|
|
@ -4,6 +4,7 @@
|
||||||
#include "matador/sql/connection.hpp"
|
#include "matador/sql/connection.hpp"
|
||||||
#include "matador/sql/connection_info.hpp"
|
#include "matador/sql/connection_info.hpp"
|
||||||
|
|
||||||
|
#include <chrono>
|
||||||
#include <condition_variable>
|
#include <condition_variable>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
|
@ -11,7 +12,6 @@
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
|
|
||||||
namespace matador::sql {
|
namespace matador::sql {
|
||||||
|
|
||||||
class connection_pool;
|
class connection_pool;
|
||||||
|
|
||||||
struct identifiable_connection {
|
struct identifiable_connection {
|
||||||
|
|
@ -49,7 +49,8 @@ public:
|
||||||
|
|
||||||
connection_ptr acquire();
|
connection_ptr acquire();
|
||||||
connection_ptr try_acquire();
|
connection_ptr try_acquire();
|
||||||
connection_ptr acquire(size_t id);
|
connection_ptr try_acquire(size_t id);
|
||||||
|
connection_ptr acquire(size_t id, std::chrono::milliseconds timeout = std::chrono::milliseconds(5000));
|
||||||
|
|
||||||
void release(identifiable_connection *c);
|
void release(identifiable_connection *c);
|
||||||
void release(connection_ptr &c);
|
void release(connection_ptr &c);
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,5 @@
|
||||||
#include "matador/sql/connection_pool.hpp"
|
#include "matador/sql/connection_pool.hpp"
|
||||||
|
|
||||||
#include <chrono>
|
|
||||||
#include <thread>
|
#include <thread>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
|
|
||||||
|
|
@ -98,42 +97,29 @@ connection_ptr connection_pool::try_acquire() {
|
||||||
return get_next_connection();
|
return get_next_connection();
|
||||||
}
|
}
|
||||||
|
|
||||||
connection_ptr connection_pool::acquire(const size_t id) {
|
connection_ptr connection_pool::try_acquire(const size_t id) {
|
||||||
using namespace std::chrono_literals;
|
std::unique_lock lock(mutex_);
|
||||||
|
const auto it = idle_connections_.find(id);
|
||||||
|
if (it == idle_connections_.end()) {
|
||||||
|
return {nullptr, this};
|
||||||
|
}
|
||||||
|
return {it->second, this};
|
||||||
|
}
|
||||||
|
|
||||||
|
connection_ptr connection_pool::acquire(const size_t id, const std::chrono::milliseconds timeout) {
|
||||||
std::unique_lock lock(mutex_);
|
std::unique_lock lock(mutex_);
|
||||||
|
|
||||||
if (!cv.wait_for(lock,
|
if (!cv.wait_for(lock, timeout, [this, id] {
|
||||||
5s,
|
return idle_connections_.find(id) != idle_connections_.end();
|
||||||
[this, id] {
|
})) {
|
||||||
return idle_connections_.find(id) != idle_connections_.end();
|
|
||||||
})) {
|
|
||||||
return {nullptr, this};
|
return {nullptr, this};
|
||||||
}
|
}
|
||||||
|
|
||||||
auto it = idle_connections_.find(id);
|
const auto it = idle_connections_.find(id);
|
||||||
auto next_connection = it->second;
|
auto next_connection = it->second;
|
||||||
auto node = idle_connections_.extract(it);
|
auto node = idle_connections_.extract(it);
|
||||||
inuse_connections_.insert(std::move(node));
|
inuse_connections_.insert(std::move(node));
|
||||||
return {next_connection, this};
|
return {next_connection, this};
|
||||||
|
|
||||||
// using namespace std::chrono_literals;
|
|
||||||
// pointer next_connection{nullptr};
|
|
||||||
// auto try_count{0};
|
|
||||||
// std::unique_lock lock(mutex_);
|
|
||||||
//
|
|
||||||
// do {
|
|
||||||
// if (auto it = idle_connections_.find(id); it != idle_connections_.end()) {
|
|
||||||
// next_connection = it->second;
|
|
||||||
// auto node = idle_connections_.extract(it);
|
|
||||||
// inuse_connections_.insert(std::move(node));
|
|
||||||
// } else {
|
|
||||||
// lock.unlock();
|
|
||||||
// std::this_thread::sleep_for(100ms);
|
|
||||||
// lock.lock();
|
|
||||||
// }
|
|
||||||
// } while(try_count++ < 5);
|
|
||||||
//
|
|
||||||
// return {next_connection, this};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void connection_pool::release(identifiable_connection* c) {
|
void connection_pool::release(identifiable_connection* c) {
|
||||||
|
|
|
||||||
|
|
@ -57,6 +57,8 @@ TEST_CASE_METHOD(ConnectionPoolFixture, "Create connection pool", "[connection p
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_CASE_METHOD(ConnectionPoolFixture, "Acquire connection by id", "[connection pool]") {
|
TEST_CASE_METHOD(ConnectionPoolFixture, "Acquire connection by id", "[connection pool]") {
|
||||||
|
using namespace std::chrono_literals;
|
||||||
|
|
||||||
connection_pool pool("noop://noop.db", 4);
|
connection_pool pool("noop://noop.db", 4);
|
||||||
|
|
||||||
REQUIRE(pool.size() == 4);
|
REQUIRE(pool.size() == 4);
|
||||||
|
|
@ -69,8 +71,10 @@ TEST_CASE_METHOD(ConnectionPoolFixture, "Acquire connection by id", "[connection
|
||||||
REQUIRE(ptr.id().value() > 0);
|
REQUIRE(ptr.id().value() > 0);
|
||||||
REQUIRE(ptr->is_open());
|
REQUIRE(ptr->is_open());
|
||||||
|
|
||||||
auto same_ptr = pool.acquire(ptr.id().value());
|
auto same_ptr = pool.try_acquire(ptr.id().value());
|
||||||
|
REQUIRE(!same_ptr.valid());
|
||||||
|
|
||||||
|
same_ptr = pool.acquire(ptr.id().value(), 3000ms);
|
||||||
REQUIRE(!same_ptr.valid());
|
REQUIRE(!same_ptr.valid());
|
||||||
|
|
||||||
const auto connection_id = ptr.id().value();
|
const auto connection_id = ptr.id().value();
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue