#ifndef MATADOR_THREAD_POOL_HPP #define MATADOR_THREAD_POOL_HPP #include "matador/utils/result.hpp" #include #include #include #include #include #include #include #include namespace matador::utils { class cancel_token { public: cancel_token() = default; void request_cancel() { cancelled_.store(true, std::memory_order_relaxed); } bool is_cancelled() const { return cancelled_.load(std::memory_order_relaxed); } private: std::atomic_bool cancelled_{false}; }; template using result_fut = result, std::string>; /** * Simple thread pool class. The pool * consists of a given number of threads and * a queue of tasks to be executed. */ class thread_pool { public: thread_pool(const thread_pool&) = delete; thread_pool& operator=(const thread_pool&) = delete; /** * Creating a new thread pool with given * numbers of threads. * * @param size Number of provided threads. */ explicit thread_pool(std::size_t size); ~thread_pool(); /** * Push a task into the thread pool * to be executed once a thread is available * * @tparam F Type of threaded function * @tparam Args Types of the arguments * @param func Function to be executed in the next available thread. * @param args Passed arguments */ template auto schedule(F&& func, Args&&... args) -> result_fut> { using return_type = std::result_of_t; const auto token = std::make_shared(); auto task_ptr = std::make_shared>( std::bind(std::forward(func), std::ref(*token), std::forward(args)...) ); std::future res = task_ptr->get_future(); { std::lock_guard lock(mutex_); if (!running_) { return failure(std::string("Thread pool is shut down, cannot schedule new tasks.")); } tasks_.emplace_back([task_ptr, token] { try { (*task_ptr)(); } catch (...) { /* Prevent exception escape */ } }, token); } condition_task_.notify_one(); return ok(std::move(res)); } /** * Returns the number of available threads. * * @return Number of threads. */ std::size_t size() const; /** * Shuts the thread pool down. */ void shutdown(); /** * Waits until all threads are finished. */ void wait(); void cancel_all_in_progress() const { std::lock_guard lock(mutex_); for (auto& t : tasks_) { if (t.token) t.token->request_cancel(); } } /** * Returns the number of pending tasks. * * @return Number of pending tasks. */ std::size_t pending() const; private: /* * wait for a task to execute */ void execute(); private: struct task { std::function func; std::shared_ptr token; task(std::function f, std::shared_ptr t) : func(std::move(f)), token(std::move(t)) {} }; std::deque tasks_; std::vector threads_; mutable std::mutex mutex_; std::condition_variable condition_task_; std::condition_variable condition_finished_; std::atomic_uint busy_{0}; bool running_{true}; }; } #endif //MATADOR_THREAD_POOL_HPP