diff --git a/include/matador/utils/leader_follower_thread_pool.hpp b/include/matador/utils/leader_follower_thread_pool.hpp new file mode 100644 index 0000000..748b13d --- /dev/null +++ b/include/matador/utils/leader_follower_thread_pool.hpp @@ -0,0 +1,125 @@ +#ifndef LEADER_FOLLOWER_THREAD_POOL_HPP +#define LEADER_FOLLOWER_THREAD_POOL_HPP + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace matador::utils { + +/** + * This thread pool class implements the + * leader follower pattern. + */ +class leader_follower_thread_pool +{ +private: + leader_follower_thread_pool(const leader_follower_thread_pool &); + + leader_follower_thread_pool &operator=(const leader_follower_thread_pool &); + +public: + /** + * Creates a new leader follower thread pool instance + * with the given thread pool size and given join + * function. + * + * @tparam F Type of join function + * @param size Number of threads + * @param join_func Join function. + */ + template + leader_follower_thread_pool(std::size_t size, F join_func) + : num_threads_(size), join_(join_func) + , follower_(size) {} + + ~leader_follower_thread_pool(); + + /** + * Starts the thread pool. + */ + void start(); + + /** + * Stops the thread pool. + */ + void stop(); + + /** + * Promotes the next new leading thread. + */ + void promote_new_leader(); + + /** + * Returns number of threads. + * + * @return Number of threads. + */ + std::size_t size() const; + + /** + * Shuts the thread pool down. + */ + void shutdown(); + + /** + * Returns the thread id of the current + * leading thread. + * + * @return Thread id of the leading thread. + */ + std::thread::id leader(); + + /** + * Returns the current number of + * thread followers. + * + * @return Number of follower threads. + */ + std::size_t num_follower() const; + + /** + * Returns true if the thread pool is running. + * + * @return True if thread pool is running. + */ + bool is_running() const; + +private: + /* + * wait for a task to execute + */ + void execute(); + +private: + typedef std::vector thread_vector_t; + typedef std::function join_func_t; + + std::size_t num_threads_{}; + + std::thread::id leader_{}; + std::thread::id null_id{}; + + join_func_t join_; + thread_vector_t threads_; + + std::mutex mutex_; + std::condition_variable condition_task_; + std::condition_variable condition_synchronizer_; + + std::atomic_bool signal_ready_ { false }; + std::atomic_bool signal_shutdown_ { false }; + + std::atomic_bool is_running_ { false }; + + std::atomic_size_t follower_{}; +}; + +} + +#endif //LEADER_FOLLOWER_THREAD_POOL_HPP diff --git a/source/core/CMakeLists.txt b/source/core/CMakeLists.txt index a65e201..2eaf6bb 100644 --- a/source/core/CMakeLists.txt +++ b/source/core/CMakeLists.txt @@ -32,6 +32,7 @@ add_library(matador-core STATIC ../../include/matador/utils/field_attributes.hpp ../../include/matador/utils/foreign_attributes.hpp ../../include/matador/utils/identifier.hpp + ../../include/matador/utils/leader_follower_thread_pool.hpp ../../include/matador/utils/library.hpp ../../include/matador/utils/macro_map.hpp ../../include/matador/utils/os.hpp @@ -59,6 +60,7 @@ add_library(matador-core STATIC utils/field_attributes.cpp utils/foreign_attributes.cpp utils/identifier.cpp + utils/leader_follower_thread_pool.cpp utils/library.cpp utils/os.cpp utils/string.cpp diff --git a/source/core/utils/leader_follower_thread_pool.cpp b/source/core/utils/leader_follower_thread_pool.cpp new file mode 100644 index 0000000..ee68071 --- /dev/null +++ b/source/core/utils/leader_follower_thread_pool.cpp @@ -0,0 +1,102 @@ +#include "matador/utils/leader_follower_thread_pool.hpp" + +#include + +namespace matador::utils { + +leader_follower_thread_pool::~leader_follower_thread_pool() +{ + shutdown(); +} + +void leader_follower_thread_pool::start() +{ + is_running_ = true; + for (std::size_t i = 0; i < num_threads_; ++i) { + threads_.emplace_back([this] { execute(); }); + } +} + +void leader_follower_thread_pool::stop() +{ + is_running_ = false; +} + +void leader_follower_thread_pool::promote_new_leader() +{ + std::lock_guard l(mutex_); + + if (leader_ != std::this_thread::get_id()) { + return; + } + + leader_ = null_id; + + signal_ready_ = true; + condition_synchronizer_.notify_one(); +} + +std::size_t leader_follower_thread_pool::size() const +{ + return threads_.size(); +} + +void leader_follower_thread_pool::shutdown() +{ + { + const std::lock_guard l(mutex_); +// if (!is_running_) { +// return; +// } + stop(); + signal_shutdown_ = true; + condition_synchronizer_.notify_all(); + } + + std::for_each(threads_.begin(), threads_.end(), [](thread_vector_t::reference item) { + if (item.joinable()) { + item.join(); + } + }); +} + +std::thread::id leader_follower_thread_pool::leader() { + const std::lock_guard l(mutex_); + return leader_; +} + +std::size_t leader_follower_thread_pool::num_follower() const { + return follower_; +} + +bool leader_follower_thread_pool::is_running() const +{ + return is_running_; +} + +void leader_follower_thread_pool::execute() { + std::unique_lock l(mutex_); + while (is_running_) { + while (leader_ != null_id) { +// log_.info("thread <%d> waiting for synchronizer (leader %d)", +// acquire_thread_index(std::this_thread::get_id()), +// acquire_thread_index(leader_)); + condition_synchronizer_.wait(l, [this]() { return signal_ready_ || signal_shutdown_; }); + signal_ready_ = false; + if (!is_running_) { + return; + } + } + +// log_.info("new leader <%d> (thread <%d> is now follower)",acquire_thread_index(std::this_thread::get_id()) , acquire_thread_index(leader_)); + leader_ = std::this_thread::get_id(); + + l.unlock(); + + join_(); + +// log_.info("thread <%d> finished work", acquire_thread_index(std::this_thread::get_id())); + l.lock(); + } +} +} \ No newline at end of file