From 2271702e6cff23e6e5ed9aa6e66d634238c3ffc1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Sascha=20K=C3=BChl?=
Date: Thu, 26 Jun 2025 16:04:17 +0200
Subject: [PATCH] Add thread pool with tests
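
Introduce a fixed-size thread pool in matador::utils. Tasks are
scheduled with thread_pool::schedule(), which returns a
result<std::future<T>, std::string>. Every task receives a
cancel_token& as its first argument so that work still waiting in the
queue can be cancelled cooperatively via cancel_all_pending(). After
shutdown(), already queued tasks are drained, while scheduling new
tasks yields an error result.

A minimal scheduling sketch (illustrative):

    matador::utils::thread_pool pool(2);
    auto res = pool.schedule(
        [](matador::utils::cancel_token&, int x) { return x + 1; }, 41);
    if (res) {
      int v = res.value().get(); // v == 42
    }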
---
 include/matador/utils/result.hpp      |   2 +-
 include/matador/utils/thread_pool.hpp | 145 ++++++++++++++++++++++++++
 source/core/CMakeLists.txt            |   2 +
 source/core/utils/thread_pool.cpp     |  84 +++++++++++++++
 test/core/CMakeLists.txt              |   1 +
 test/core/utils/ThreadPoolTest.cpp    |  92 ++++++++++++++++
 6 files changed, 325 insertions(+), 1 deletion(-)

diff --git a/include/matador/utils/result.hpp b/include/matador/utils/result.hpp
index cc529f7..7ca9148 100644
--- a/include/matador/utils/result.hpp
+++ b/include/matador/utils/result.hpp
@@ -11,7 +11,7 @@ namespace matador::utils {
 template < typename ValueType, typename ErrorType >
 class [[nodiscard]] result;
 
-template
+template
 struct is_result : std::false_type {};
 
 template < typename ValueType, typename ErrorType >
diff --git a/include/matador/utils/thread_pool.hpp b/include/matador/utils/thread_pool.hpp
index e69de29..7754ff6 100644
--- a/include/matador/utils/thread_pool.hpp
+++ b/include/matador/utils/thread_pool.hpp
@@ -0,0 +1,145 @@
+#ifndef MATADOR_THREAD_POOL_HPP
+#define MATADOR_THREAD_POOL_HPP
+
+#include "matador/utils/result.hpp"
+
+#include <atomic>
+#include <condition_variable>
+#include <cstddef>
+#include <deque>
+#include <functional>
+#include <future>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+namespace matador::utils {
+
+/**
+ * A cancellation flag shared between the pool and a task.
+ * Tasks can poll is_cancelled() to stop work cooperatively.
+ */
+class cancel_token {
+public:
+  cancel_token() = default;
+  void request_cancel() {
+    cancelled_.store(true, std::memory_order_relaxed);
+  }
+  bool is_cancelled() const {
+    return cancelled_.load(std::memory_order_relaxed);
+  }
+private:
+  std::atomic_bool cancelled_{false};
+};
+
+template < typename T >
+using result_fut = result<std::future<T>, std::string>;
+
+/**
+ * A simple thread pool. The pool owns a fixed number of
+ * worker threads and a queue of tasks waiting to be executed.
+ * Every task is invoked with a cancel_token& as its first
+ * argument, allowing cooperative cancellation of queued work.
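+ *
+ * A short usage sketch (illustrative): schedule() hands the task a
+ * cancel_token& plus the forwarded arguments and returns a
+ * result-wrapped std::future.
+ *
+ * @code
+ * matador::utils::thread_pool pool(4);
+ * auto res = pool.schedule([](matador::utils::cancel_token& token, int x) {
+ *   return token.is_cancelled() ? 0 : x * 2;
+ * }, 21);
+ * if (res) {
+ *   int answer = res.value().get(); // 42
+ * }
+ * @endcode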
+ */
+class thread_pool {
+public:
+  thread_pool(const thread_pool&) = delete;
+  thread_pool& operator=(const thread_pool&) = delete;
+
+  /**
+   * Creates a new thread pool with the given
+   * number of threads.
+   *
+   * @param size Number of worker threads; must be greater than zero.
+   */
+  explicit thread_pool(std::size_t size);
+  ~thread_pool();
+
+  /**
+   * Pushes a task into the thread pool to be executed once
+   * a thread is available. The task is invoked with a
+   * cancel_token& as its first argument, followed by args.
+   *
+   * @tparam F Type of the task function
+   * @tparam Args Types of the additional arguments
+   * @param func Function to be executed on the next available thread.
+   * @param args Arguments passed to the function after the cancel token.
+   * @return A future for the task's result, or an error if the pool is shut down.
+   */
+  template < typename F, typename... Args >
+  auto schedule(F&& func, Args&&... args) -> result_fut<std::invoke_result_t<F, cancel_token&, Args...>> {
+    using return_type = std::invoke_result_t<F, cancel_token&, Args...>;
+    const auto token = std::make_shared<cancel_token>();
+
+    auto task_ptr = std::make_shared<std::packaged_task<return_type()>>(
+      std::bind(std::forward<F>(func), std::ref(*token), std::forward<Args>(args)...)
+    );
+    std::future<return_type> res = task_ptr->get_future();
+
+    {
+      std::lock_guard lock(mutex_);
+      if (!running_) {
+        return failure(std::string("Thread pool is shut down, cannot schedule new tasks."));
+      }
+      tasks_.emplace_back([task_ptr, token] {
+        // The packaged_task stores exceptions thrown by the callable in
+        // its future; this guard keeps anything else from escaping the
+        // worker thread.
+        try { (*task_ptr)(); }
+        catch (...) { /* prevent exception escape */ }
+      }, token);
+    }
+
+    condition_task_.notify_one();
+
+    return ok(std::move(res));
+  }
+
+  /**
+   * Returns the number of worker threads.
+   *
+   * @return Number of threads.
+   */
+  std::size_t size() const;
+
+  /**
+   * Shuts the thread pool down. Tasks already in the queue
+   * are still executed; new tasks are rejected.
+   */
+  void shutdown();
+
+  /**
+   * Waits until all worker threads have finished and joins them.
+   */
+  void wait();
+
+  /**
+   * Requests cancellation for all tasks still waiting in the
+   * queue. Tasks that are already running are not affected.
+   */
+  void cancel_all_pending() {
+    std::lock_guard lock(mutex_);
+    for (auto& t : tasks_) {
+      if (t.token)
+        t.token->request_cancel();
+    }
+  }
+
+  /**
+   * Returns the number of pending tasks.
+   *
+   * @return Number of pending tasks.
+   */
+  std::size_t pending() const;
+
+private:
+  /*
+   * Worker loop: waits for tasks and executes them until the
+   * pool is shut down and the queue is drained.
+   */
+  void execute();
+
+private:
+  struct task {
+    std::function<void()> func;
+    std::shared_ptr<cancel_token> token;
+    task(std::function<void()> f, std::shared_ptr<cancel_token> t)
+      : func(std::move(f)), token(std::move(t)) {}
+  };
+
+  std::deque<task> tasks_;
+  std::vector<std::thread> threads_;
+
+  mutable std::mutex mutex_;
+  std::condition_variable condition_task_;
+  std::condition_variable condition_finished_;
+  std::atomic_uint busy_{0};
+
+  bool running_{true};
+};
+
+}
+
+#endif //MATADOR_THREAD_POOL_HPP
diff --git a/source/core/CMakeLists.txt b/source/core/CMakeLists.txt
index ef4a506..a65e201 100644
--- a/source/core/CMakeLists.txt
+++ b/source/core/CMakeLists.txt
@@ -39,6 +39,7 @@ add_library(matador-core STATIC
         ../../include/matador/utils/result.hpp
         ../../include/matador/utils/singleton.hpp
         ../../include/matador/utils/string.hpp
+        ../../include/matador/utils/thread_pool.hpp
         ../../include/matador/utils/types.hpp
         ../../include/matador/utils/uuid.hpp
         ../../include/matador/utils/value.hpp
@@ -61,6 +62,7 @@ add_library(matador-core STATIC
         utils/library.cpp
         utils/os.cpp
         utils/string.cpp
+        utils/thread_pool.cpp
         utils/types.cpp
         utils/uuid.cpp
         utils/value.cpp
diff --git a/source/core/utils/thread_pool.cpp b/source/core/utils/thread_pool.cpp
index e69de29..5b00f42 100644
--- a/source/core/utils/thread_pool.cpp
+++ b/source/core/utils/thread_pool.cpp
@@ -0,0 +1,84 @@
+#include <stdexcept>
+#include <thread>
+
+#include "matador/utils/thread_pool.hpp"
+
+namespace matador::utils {
+
+thread_pool::thread_pool(const std::size_t size) {
+  if (size == 0) {
+    throw std::invalid_argument("Thread pool size must be positive");
+  }
+  for (std::size_t i = 0; i < size; ++i) {
+    threads_.emplace_back([this] { this->execute(); });
+  }
+}
+
+thread_pool::~thread_pool() {
+  shutdown();
+  wait();
+}
+
+std::size_t thread_pool::size() const {
+  return threads_.size();
+}
+
+void thread_pool::shutdown() {
+  {
+    std::lock_guard lock(mutex_);
+    running_ = false;
+  }
+  condition_task_.notify_all();
+}
+
+void thread_pool::wait() {
+  for (std::thread& t : threads_) {
+    if (t.joinable()) {
+      t.join();
+    }
+  }
+  threads_.clear();
+}
+
+std::size_t thread_pool::pending() const {
+  const std::lock_guard l(mutex_);
+  return tasks_.size();
+}
+
+void thread_pool::execute()
+{
+  while (true) {
+    task t{nullptr, nullptr};
+    {
+      std::unique_lock l(mutex_);
+      condition_task_.wait(l, [this] {
+        return !running_ || !tasks_.empty();
+      });
+      if (!running_ && tasks_.empty()) {
+        break;
+      }
+      // The wait predicate guarantees a non-empty queue at this point.
+      t = std::move(tasks_.front());
+      tasks_.pop_front();
+      ++busy_;
+    }
+    if (t.func) {
+      try { t.func(); } catch (...) {}
+    }
+
+    {
+      std::lock_guard lock(mutex_);
+      --busy_;
+      if (tasks_.empty() && busy_ == 0)
+        condition_finished_.notify_all();
+    }
+  }
+}
+
+}
\ No newline at end of file
diff --git a/test/core/CMakeLists.txt b/test/core/CMakeLists.txt
index 3857c92..29c3eb3 100644
--- a/test/core/CMakeLists.txt
+++ b/test/core/CMakeLists.txt
@@ -11,6 +11,7 @@ add_executable(CoreTests
         utils/ResultTest.cpp
         utils/FieldAttributeTest.cpp
         utils/VersionTest.cpp
+        utils/ThreadPoolTest.cpp
         utils/StringTest.cpp
         object/AttributeDefinitionGeneratorTest.cpp
         object/PrimaryKeyResolverTest.cpp
diff --git a/test/core/utils/ThreadPoolTest.cpp b/test/core/utils/ThreadPoolTest.cpp
index e69de29..9acf3da 100644
--- a/test/core/utils/ThreadPoolTest.cpp
+++ b/test/core/utils/ThreadPoolTest.cpp
@@ -0,0 +1,92 @@
+#include <catch2/catch_test_macros.hpp> // test framework (assumed from the Catch2-style TEST_CASE/REQUIRE macros)
+
+#include <atomic>
+#include <chrono>
+#include <future>
+#include <thread>
+#include <vector>
+
+#include "matador/utils/thread_pool.hpp"
+
+using namespace matador::utils;
+
+TEST_CASE("thread_pool creates and rejects zero-sized construction") {
+  REQUIRE_THROWS(thread_pool(0));
+
+  const thread_pool tp(2);
+  REQUIRE(tp.size() == 2);
+}
+
+TEST_CASE("thread_pool schedules and runs simple tasks") {
+  thread_pool pool(3);
+  auto fut = pool.schedule([](cancel_token&, const int x) { return x * 10; }, 7);
+  REQUIRE(fut);
+  REQUIRE(fut.value().get() == 70);
+}
+
+TEST_CASE("thread_pool parallel computation and results") {
+  thread_pool pool(4);
+  std::vector<std::future<int>> futs;
+  for (int i = 1; i <= 20; ++i) {
+    auto res = pool.schedule([](cancel_token&, const int y) { return y + 1; }, i);
+    REQUIRE(res);
+    futs.push_back(std::move(res.value()));
+  }
+  int sum = 0;
+  for (auto& f : futs) {
+    sum += f.get();
+  }
+  REQUIRE(sum == 230); // 2 + 3 + ... + 21 = 21 * 22 / 2 - 1 = 230
+}
+
+TEST_CASE("thread_pool rejects scheduling after shutdown") {
+  thread_pool pool(2);
+  pool.shutdown();
+  auto result = pool.schedule([](cancel_token&) { return 1; });
+  REQUIRE_FALSE(result);
+  REQUIRE(result.err().find("shut down") != std::string::npos);
+}
+
+TEST_CASE("thread_pool supports cancel_token for not-yet-run task") {
+  thread_pool pool(1);
+  std::atomic<bool> ran{false};
+
+  // Occupy the only worker with a long-running task.
+  auto fut1 = std::move(pool.schedule([](cancel_token&, const int delay) {
+    std::this_thread::sleep_for(std::chrono::milliseconds(delay));
+    return 123;
+  }, 250).value());
+
+  // This task has to wait in the queue behind the first one.
+  auto result = pool.schedule([&ran](const cancel_token& token) {
+    ran = true;
+    for (int i = 0; i < 10; ++i) {
+      if (token.is_cancelled())
+        return -1;
+      std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+    return 100;
+  });
+
+  REQUIRE(result);
+  pool.cancel_all_pending();
+  // Cancellation is best-effort: -1 if the token was cancelled while
+  // the task was still queued, 100 if the task started running before
+  // the cancel request arrived.
+  int v = result.value().get();
+  REQUIRE((v == -1 || v == 100));
+  REQUIRE(ran);
+}
+
+TEST_CASE("thread_pool: shutdown finishes all running tasks") {
+  thread_pool pool(2);
+  std::atomic<int> counter{0};
+  std::vector<std::future<void>> futs;
+  for (int i = 0; i < 4; ++i) {
+    auto fut = pool.schedule([&counter](cancel_token&) {
+      std::this_thread::sleep_for(std::chrono::milliseconds(50));
+      ++counter;
+    });
+    REQUIRE(fut);
+    futs.push_back(std::move(fut.value()));
+  }
+  pool.shutdown();
+  pool.wait();
+  for (auto& f : futs) { f.get(); }
+  REQUIRE(counter == 4);
+}
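+
+// A minimal extra check (sketch): pending() counts tasks still waiting
+// in the queue, so an idle pool is expected to report zero.
+TEST_CASE("thread_pool reports zero pending tasks when idle") {
+  const thread_pool pool(2);
+  REQUIRE(pool.pending() == 0);
+}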