#include #include "matador/utils/thread_pool.hpp" using namespace matador::utils; TEST_CASE("thread_pool creates and rejects zero-sized construction") { REQUIRE_THROWS(thread_pool(0)); const thread_pool tp(2); REQUIRE(tp.size() == 2); } TEST_CASE("thread_pool schedules and runs simple tasks") { thread_pool pool(3); auto fut = pool.schedule([](cancel_token&, const int x) { return x * 10; }, 7); REQUIRE(fut); REQUIRE(fut.value().get() == 70); } TEST_CASE("thread_pool parallel computation and results") { thread_pool pool(4); std::vector> futs; for (int i = 1; i <= 20; ++i) { auto res = pool.schedule([](cancel_token&, const int y) { return y + 1; }, i); REQUIRE(res); futs.push_back(std::move(res.value())); } int sum = 0; for (auto& f : futs) { sum += f.get(); } REQUIRE(sum == 230); // (2+3+...+21) = (21*22/2)-(1) = 231-1 } TEST_CASE("thread_pool rejects scheduling after shutdown") { thread_pool pool(2); pool.shutdown(); auto result = pool.schedule([](cancel_token&) { return 1; }); REQUIRE_FALSE(result); REQUIRE(result.err().find("shut down") != std::string::npos); } TEST_CASE("thread_pool supports cancel_token for not-yet-run task") { thread_pool pool(1); std::atomic ran{false}; // Fill the only worker auto fut1 = std::move(pool.schedule([](cancel_token&, const int delay) { std::this_thread::sleep_for(std::chrono::milliseconds(delay)); return 123; }, 250).value()); // This will wait in the queue std::shared_ptr ct; auto result = pool.schedule([&ran]( const cancel_token& token) { ran = true; for (int i = 0; i < 10; ++i) { if (token.is_cancelled()) return -1; std::this_thread::sleep_for(std::chrono::milliseconds(10)); } return 100; }); REQUIRE(result); // ct = std::make_shared(); // Dummy; real token is internal pool.cancel_all_in_progress(); // Give time for first task to finish and second to start int v = result.value().get(); REQUIRE((v == -1 || v == 100)); // Cancel best-effort, may not run at all // If cancelled while still not started, should be -1 // If started before cancel, may be 100 (rare but possible) } TEST_CASE("thread_pool: shutdown finishes all running tasks") { thread_pool pool(2); std::atomic counter{0}; std::vector> futs; for (int i = 0; i < 4; ++i) { auto fut = pool.schedule([&counter](cancel_token&) { std::this_thread::sleep_for(std::chrono::milliseconds(50)); ++counter; }); REQUIRE(fut); futs.push_back(std::move(fut.value())); } pool.shutdown(); pool.wait(); for (auto& f : futs) { f.get(); } REQUIRE(counter == 4); }