146 lines
3.4 KiB
C++
146 lines
3.4 KiB
C++
#ifndef MATADOR_THREAD_POOL_HPP
|
|
#define MATADOR_THREAD_POOL_HPP
|
|
|
|
#include "matador/utils/result.hpp"

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>
|
|
|
|
namespace matador::utils {
|
|
|
|
/**
 * Cooperative cancellation flag.
 *
 * One side calls request_cancel(); the other side polls
 * is_cancelled() and decides on its own how to react. The
 * flag can only go from "not cancelled" to "cancelled";
 * there is no way to reset it.
 *
 * Both operations act on a single atomic boolean and may
 * therefore be called concurrently from different threads.
 */
class cancel_token {
public:
  cancel_token() = default;

  /**
   * Marks the token as cancelled. Calling it more
   * than once has no additional effect.
   */
  void request_cancel() noexcept {
    // Relaxed ordering: only the flag itself is communicated,
    // no other data is published through this store.
    cancelled_.store(true, std::memory_order_relaxed);
  }

  /**
   * Returns whether cancellation has been requested.
   *
   * @return True once request_cancel() has been called.
   */
  [[nodiscard]] bool is_cancelled() const noexcept {
    return cancelled_.load(std::memory_order_relaxed);
  }

private:
  std::atomic_bool cancelled_{false};
};
|
|
|
|
template<typename T>
|
|
using result_fut = result<std::future<T>, std::string>;
|
|
|
|
/**
|
|
* Simple thread pool class. The pool
|
|
* consists of a given number of threads and
|
|
* a queue of tasks to be executed.
|
|
*/
|
|
class thread_pool {
|
|
public:
|
|
thread_pool(const thread_pool&) = delete;
|
|
thread_pool& operator=(const thread_pool&) = delete;
|
|
|
|
/**
|
|
* Creating a new thread pool with given
|
|
* numbers of threads.
|
|
*
|
|
* @param size Number of provided threads.
|
|
*/
|
|
explicit thread_pool(std::size_t size);
|
|
~thread_pool();
|
|
|
|
/**
|
|
* Push a task into the thread pool
|
|
* to be executed once a thread is available
|
|
*
|
|
* @tparam F Type of threaded function
|
|
* @tparam Args Types of the arguments
|
|
* @param func Function to be executed in the next available thread.
|
|
* @param args Passed arguments
|
|
*/
|
|
template <typename F, typename... Args>
|
|
auto schedule(F&& func, Args&&... args) -> result_fut<std::result_of_t<F(cancel_token&, Args...)>> {
|
|
using return_type = std::result_of_t<F(cancel_token&, Args...)>;
|
|
const auto token = std::make_shared<cancel_token>();
|
|
|
|
auto task_ptr = std::make_shared<std::packaged_task<return_type()>>(
|
|
std::bind(std::forward<F>(func), std::ref(*token), std::forward<Args>(args)...)
|
|
);
|
|
std::future<return_type> res = task_ptr->get_future();
|
|
|
|
{
|
|
std::lock_guard lock(mutex_);
|
|
if (!running_) {
|
|
return failure(std::string("Thread pool is shut down, cannot schedule new tasks."));
|
|
}
|
|
tasks_.emplace_back([task_ptr, token] {
|
|
try { (*task_ptr)(); }
|
|
catch (...) { /* Prevent exception escape */ }
|
|
}, token);
|
|
}
|
|
|
|
condition_task_.notify_one();
|
|
|
|
return ok(std::move(res));
|
|
}
|
|
|
|
/**
|
|
* Returns the number of available threads.
|
|
*
|
|
* @return Number of threads.
|
|
*/
|
|
std::size_t size() const;
|
|
|
|
/**
|
|
* Shuts the thread pool down.
|
|
*/
|
|
void shutdown();
|
|
|
|
/**
|
|
* Waits until all threads are finished.
|
|
*/
|
|
void wait();
|
|
|
|
void cancel_all_in_progress() const {
|
|
std::lock_guard lock(mutex_);
|
|
for (auto& t : tasks_) {
|
|
if (t.token)
|
|
t.token->request_cancel();
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Returns the number of pending tasks.
|
|
*
|
|
* @return Number of pending tasks.
|
|
*/
|
|
std::size_t pending() const;
|
|
|
|
private:
|
|
/*
|
|
* wait for a task to execute
|
|
*/
|
|
void execute();
|
|
|
|
private:
|
|
struct task {
|
|
std::function<void()> func;
|
|
std::shared_ptr<cancel_token> token;
|
|
task(std::function<void()> f, std::shared_ptr<cancel_token> t) : func(std::move(f)), token(std::move(t)) {}
|
|
};
|
|
|
|
std::deque<task> tasks_;
|
|
std::vector<std::thread> threads_;
|
|
|
|
mutable std::mutex mutex_;
|
|
std::condition_variable condition_task_;
|
|
std::condition_variable condition_finished_;
|
|
std::atomic_uint busy_{0};
|
|
|
|
bool running_{true};
|
|
};
|
|
|
|
}
|
|
|
|
#endif //MATADOR_THREAD_POOL_HPP
|