added leader follower thread pool

This commit is contained in:
Sascha Kühl 2025-06-26 16:23:18 +02:00
parent 2271702e6c
commit acd30afe6f
3 changed files with 229 additions and 0 deletions

View File

@ -0,0 +1,125 @@
#ifndef LEADER_FOLLOWER_THREAD_POOL_HPP
#define LEADER_FOLLOWER_THREAD_POOL_HPP
#include <memory>
#include <thread>
#include <vector>
#include <functional>
#include <deque>
#include <atomic>
#include <mutex>
#include <condition_variable>
namespace matador::utils {
/**
* This thread pool class implements the
* leader follower pattern.
*/
class leader_follower_thread_pool
{
private:
leader_follower_thread_pool(const leader_follower_thread_pool &);
leader_follower_thread_pool &operator=(const leader_follower_thread_pool &);
public:
/**
* Creates a new leader follower thread pool instance
* with the given thread pool size and given join
* function.
*
* @tparam F Type of join function
* @param size Number of threads
* @param join_func Join function.
*/
template<typename F>
leader_follower_thread_pool(std::size_t size, F join_func)
: num_threads_(size), join_(join_func)
, follower_(size) {}
~leader_follower_thread_pool();
/**
* Starts the thread pool.
*/
void start();
/**
* Stops the thread pool.
*/
void stop();
/**
* Promotes the next new leading thread.
*/
void promote_new_leader();
/**
* Returns number of threads.
*
* @return Number of threads.
*/
std::size_t size() const;
/**
* Shuts the thread pool down.
*/
void shutdown();
/**
* Returns the thread id of the current
* leading thread.
*
* @return Thread id of the leading thread.
*/
std::thread::id leader();
/**
* Returns the current number of
* thread followers.
*
* @return Number of follower threads.
*/
std::size_t num_follower() const;
/**
* Returns true if the thread pool is running.
*
* @return True if thread pool is running.
*/
bool is_running() const;
private:
/*
* wait for a task to execute
*/
void execute();
private:
typedef std::vector<std::thread> thread_vector_t;
typedef std::function<void()> join_func_t;
std::size_t num_threads_{};
std::thread::id leader_{};
std::thread::id null_id{};
join_func_t join_;
thread_vector_t threads_;
std::mutex mutex_;
std::condition_variable condition_task_;
std::condition_variable condition_synchronizer_;
std::atomic_bool signal_ready_ { false };
std::atomic_bool signal_shutdown_ { false };
std::atomic_bool is_running_ { false };
std::atomic_size_t follower_{};
};
}
#endif //LEADER_FOLLOWER_THREAD_POOL_HPP

View File

@ -32,6 +32,7 @@ add_library(matador-core STATIC
../../include/matador/utils/field_attributes.hpp
../../include/matador/utils/foreign_attributes.hpp
../../include/matador/utils/identifier.hpp
../../include/matador/utils/leader_follower_thread_pool.hpp
../../include/matador/utils/library.hpp
../../include/matador/utils/macro_map.hpp
../../include/matador/utils/os.hpp
@ -59,6 +60,7 @@ add_library(matador-core STATIC
utils/field_attributes.cpp
utils/foreign_attributes.cpp
utils/identifier.cpp
utils/leader_follower_thread_pool.cpp
utils/library.cpp
utils/os.cpp
utils/string.cpp

View File

@ -0,0 +1,102 @@
#include "matador/utils/leader_follower_thread_pool.hpp"
#include <algorithm>
namespace matador::utils {
leader_follower_thread_pool::~leader_follower_thread_pool()
{
shutdown();
}
void leader_follower_thread_pool::start()
{
is_running_ = true;
for (std::size_t i = 0; i < num_threads_; ++i) {
threads_.emplace_back([this] { execute(); });
}
}
void leader_follower_thread_pool::stop()
{
is_running_ = false;
}
void leader_follower_thread_pool::promote_new_leader()
{
std::lock_guard<std::mutex> l(mutex_);
if (leader_ != std::this_thread::get_id()) {
return;
}
leader_ = null_id;
signal_ready_ = true;
condition_synchronizer_.notify_one();
}
std::size_t leader_follower_thread_pool::size() const
{
return threads_.size();
}
void leader_follower_thread_pool::shutdown()
{
{
const std::lock_guard<std::mutex> l(mutex_);
// if (!is_running_) {
// return;
// }
stop();
signal_shutdown_ = true;
condition_synchronizer_.notify_all();
}
std::for_each(threads_.begin(), threads_.end(), [](thread_vector_t::reference item) {
if (item.joinable()) {
item.join();
}
});
}
std::thread::id leader_follower_thread_pool::leader() {
const std::lock_guard<std::mutex> l(mutex_);
return leader_;
}
std::size_t leader_follower_thread_pool::num_follower() const {
return follower_;
}
bool leader_follower_thread_pool::is_running() const
{
return is_running_;
}
void leader_follower_thread_pool::execute() {
std::unique_lock<std::mutex> l(mutex_);
while (is_running_) {
while (leader_ != null_id) {
// log_.info("thread <%d> waiting for synchronizer (leader %d)",
// acquire_thread_index(std::this_thread::get_id()),
// acquire_thread_index(leader_));
condition_synchronizer_.wait(l, [this]() { return signal_ready_ || signal_shutdown_; });
signal_ready_ = false;
if (!is_running_) {
return;
}
}
// log_.info("new leader <%d> (thread <%d> is now follower)",acquire_thread_index(std::this_thread::get_id()) , acquire_thread_index(leader_));
leader_ = std::this_thread::get_id();
l.unlock();
join_();
// log_.info("thread <%d> finished work", acquire_thread_index(std::this_thread::get_id()));
l.lock();
}
}
}