#include #include #include "matador/utils/thread_pool.hpp" namespace matador::utils { thread_pool::thread_pool(const std::size_t size) { if (size == 0) { throw std::invalid_argument("Thread pool size must be positive"); } for (std::size_t i = 0; i < size; ++i) { threads_.emplace_back([this] { this->execute(); }); } } thread_pool::~thread_pool() { shutdown(); wait(); } std::size_t thread_pool::size() const { return threads_.size(); } void thread_pool::shutdown() { { std::lock_guard lock(mutex_); running_ = false; } condition_task_.notify_all(); } void thread_pool::wait() { for (std::thread& t : threads_) { if (t.joinable()) { t.join(); } } threads_.clear(); } std::size_t thread_pool::pending() const { const std::lock_guard l(mutex_); return tasks_.size(); } void thread_pool::execute() { while (true) { task t{nullptr, nullptr}; { std::unique_lock l(mutex_); condition_task_.wait(l, [this] { return !running_ ||!tasks_.empty(); }); if (!running_ && tasks_.empty()) { break; } if (!tasks_.empty()) { t = std::move(tasks_.front()); tasks_.pop_front(); ++busy_; if (t.token) { t.token->request_cancel(); // or signal task as "started" } } else { continue; } } if (t.func) { try { t.func(); } catch (...) {} } { std::lock_guard lock(mutex_); --busy_; if (tasks_.empty() && busy_ == 0) condition_finished_.notify_all(); } } } }