added thread pool with tests

This commit is contained in:
Sascha Kühl 2025-06-26 16:04:17 +02:00
parent dafa81aef0
commit 2271702e6c
6 changed files with 325 additions and 1 deletions

View File

@ -11,7 +11,7 @@ namespace matador::utils {
template < typename ValueType, typename ErrorType >
class [[nodiscard]] result;
template <typename ValueType>
template <typename>
struct is_result : std::false_type {};
template < typename ValueType, typename ErrorType >

View File

@ -0,0 +1,145 @@
#ifndef MATADOR_THREAD_POOL_HPP
#define MATADOR_THREAD_POOL_HPP
#include "matador/utils/result.hpp"
#include <thread>
#include <vector>
#include <future>
#include <atomic>
#include <functional>
#include <deque>
#include <mutex>
#include <condition_variable>
namespace matador::utils {
class cancel_token {
public:
cancel_token() = default;
void request_cancel() {
cancelled_.store(true, std::memory_order_relaxed);
}
bool is_cancelled() const {
return cancelled_.load(std::memory_order_relaxed);
}
private:
std::atomic_bool cancelled_{false};
};
template<typename T>
using result_fut = result<std::future<T>, std::string>;
/**
* Simple thread pool class. The pool
* consists of a given number of threads and
* a queue of tasks to be executed.
*/
class thread_pool {
public:
thread_pool(const thread_pool&) = delete;
thread_pool& operator=(const thread_pool&) = delete;
/**
* Creating a new thread pool with given
* numbers of threads.
*
* @param size Number of provided threads.
*/
explicit thread_pool(std::size_t size);
~thread_pool();
/**
* Push a task into the thread pool
* to be executed once a thread is available
*
* @tparam F Type of threaded function
* @tparam Args Types of the arguments
* @param func Function to be executed in the next available thread.
* @param args Passed arguments
*/
template <typename F, typename... Args>
auto schedule(F&& func, Args&&... args) -> result_fut<std::result_of_t<F(cancel_token&, Args...)>> {
using return_type = std::result_of_t<F(cancel_token&, Args...)>;
const auto token = std::make_shared<cancel_token>();
auto task_ptr = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(func), std::ref(*token), std::forward<Args>(args)...)
);
std::future<return_type> res = task_ptr->get_future();
{
std::lock_guard lock(mutex_);
if (!running_) {
return failure(std::string("Thread pool is shut down, cannot schedule new tasks."));
}
tasks_.emplace_back([task_ptr, token] {
try { (*task_ptr)(); }
catch (...) { /* Prevent exception escape */ }
}, token);
}
condition_task_.notify_one();
return ok(std::move(res));
}
/**
* Returns the number of available threads.
*
* @return Number of threads.
*/
std::size_t size() const;
/**
* Shuts the thread pool down.
*/
void shutdown();
/**
* Waits until all threads are finished.
*/
void wait();
void cancel_all_in_progress() const {
std::lock_guard lock(mutex_);
for (auto& t : tasks_) {
if (t.token)
t.token->request_cancel();
}
}
/**
* Returns the number of pending tasks.
*
* @return Number of pending tasks.
*/
std::size_t pending() const;
private:
/*
* wait for a task to execute
*/
void execute();
private:
struct task {
std::function<void()> func;
std::shared_ptr<cancel_token> token;
task(std::function<void()> f, std::shared_ptr<cancel_token> t) : func(std::move(f)), token(std::move(t)) {}
};
std::deque<task> tasks_;
std::vector<std::thread> threads_;
mutable std::mutex mutex_;
std::condition_variable condition_task_;
std::condition_variable condition_finished_;
std::atomic_uint busy_{0};
bool running_{true};
};
}
#endif //MATADOR_THREAD_POOL_HPP

View File

@ -39,6 +39,7 @@ add_library(matador-core STATIC
../../include/matador/utils/result.hpp
../../include/matador/utils/singleton.hpp
../../include/matador/utils/string.hpp
../../include/matador/utils/thread_pool.hpp
../../include/matador/utils/types.hpp
../../include/matador/utils/uuid.hpp
../../include/matador/utils/value.hpp
@ -61,6 +62,7 @@ add_library(matador-core STATIC
utils/library.cpp
utils/os.cpp
utils/string.cpp
utils/thread_pool.cpp
utils/types.cpp
utils/uuid.cpp
utils/value.cpp

View File

@ -0,0 +1,84 @@
#include <thread>
#include <algorithm>
#include "matador/utils/thread_pool.hpp"
namespace matador::utils {
thread_pool::thread_pool(const std::size_t size) {
if (size == 0) {
throw std::invalid_argument("Thread pool size must be positive");
}
for (std::size_t i = 0; i < size; ++i) {
threads_.emplace_back([this] { this->execute(); });
}
}
thread_pool::~thread_pool() {
shutdown();
wait();
}
std::size_t thread_pool::size() const {
return threads_.size();
}
void thread_pool::shutdown() {
{
std::lock_guard lock(mutex_);
running_ = false;
}
condition_task_.notify_all();
}
void thread_pool::wait() {
for (std::thread& t : threads_) {
if (t.joinable()) {
t.join();
}
}
threads_.clear();
}
std::size_t thread_pool::pending() const {
const std::lock_guard l(mutex_);
return tasks_.size();
}
void thread_pool::execute()
{
while (true) {
task t{nullptr, nullptr};
{
std::unique_lock l(mutex_);
condition_task_.wait(l, [this] {
return !running_ ||!tasks_.empty();
});
if (!running_ && tasks_.empty()) {
break;
}
if (!tasks_.empty()) {
t = std::move(tasks_.front());
tasks_.pop_front();
++busy_;
if (t.token) {
t.token->request_cancel(); // or signal task as "started"
}
} else {
continue;
}
}
if (t.func) {
try { t.func(); } catch (...) {}
}
{
std::lock_guard lock(mutex_);
--busy_;
if (tasks_.empty() && busy_ == 0)
condition_finished_.notify_all();
}
}
}
}

View File

@ -11,6 +11,7 @@ add_executable(CoreTests
utils/ResultTest.cpp
utils/FieldAttributeTest.cpp
utils/VersionTest.cpp
utils/ThreadPoolTest.cpp
utils/StringTest.cpp
object/AttributeDefinitionGeneratorTest.cpp
object/PrimaryKeyResolverTest.cpp

View File

@ -0,0 +1,92 @@
#include <catch2/catch_test_macros.hpp>
#include "matador/utils/thread_pool.hpp"
using namespace matador::utils;
TEST_CASE("thread_pool creates and rejects zero-sized construction") {
REQUIRE_THROWS(thread_pool(0));
const thread_pool tp(2);
REQUIRE(tp.size() == 2);
}
TEST_CASE("thread_pool schedules and runs simple tasks") {
thread_pool pool(3);
auto fut = pool.schedule([](cancel_token&, const int x) { return x * 10; }, 7);
REQUIRE(fut);
REQUIRE(fut.value().get() == 70);
}
TEST_CASE("thread_pool parallel computation and results") {
thread_pool pool(4);
std::vector<std::future<int>> futs;
for (int i = 1; i <= 20; ++i) {
auto res = pool.schedule([](cancel_token&, const int y) { return y + 1; }, i);
REQUIRE(res);
futs.push_back(std::move(res.value()));
}
int sum = 0;
for (auto& f : futs) {
sum += f.get();
}
REQUIRE(sum == 230); // (2+3+...+21) = (21*22/2)-(1) = 231-1
}
TEST_CASE("thread_pool rejects scheduling after shutdown") {
thread_pool pool(2);
pool.shutdown();
auto result = pool.schedule([](cancel_token&) { return 1; });
REQUIRE_FALSE(result);
REQUIRE(result.err().find("shut down") != std::string::npos);
}
TEST_CASE("thread_pool supports cancel_token for not-yet-run task") {
thread_pool pool(1);
std::atomic ran{false};
// Fill the only worker
auto fut1 = std::move(pool.schedule([](cancel_token&, const int delay) {
std::this_thread::sleep_for(std::chrono::milliseconds(delay));
return 123;
}, 250).value());
// This will wait in the queue
std::shared_ptr<cancel_token> ct;
auto result = pool.schedule([&ran]( const cancel_token& token) {
ran = true;
for (int i = 0; i < 10; ++i) {
if (token.is_cancelled())
return -1;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
return 100;
});
REQUIRE(result);
// ct = std::make_shared<cancel_token>(); // Dummy; real token is internal
pool.cancel_all_in_progress();
// Give time for first task to finish and second to start
int v = result.value().get();
REQUIRE((v == -1 || v == 100)); // Cancel best-effort, may not run at all
// If cancelled while still not started, should be -1
// If started before cancel, may be 100 (rare but possible)
}
TEST_CASE("thread_pool: shutdown finishes all running tasks") {
thread_pool pool(2);
std::atomic counter{0};
std::vector<std::future<void>> futs;
for (int i = 0; i < 4; ++i) {
auto fut = pool.schedule([&counter](cancel_token&) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
++counter;
});
REQUIRE(fut);
futs.push_back(std::move(fut.value()));
}
pool.shutdown();
pool.wait();
for (auto& f : futs) { f.get(); }
REQUIRE(counter == 4);
}