Untitled
unknown
c_cpp
2 years ago
3.8 kB
12
Indexable
////////////////////////////////// queue.hpp ////////////////////////////////// #pragma once #include <mutex> #include <twist/ed/stdlike/mutex.hpp> #include <twist/ed/stdlike/condition_variable.hpp> #include <twist/ed/stdlike/atomic.hpp> #include <optional> #include <queue> namespace tp { // Unbounded blocking multi-producers/multi-consumers (MPMC) queue template <typename T> class UnboundedBlockingQueue { public: bool Put(T value) { std::unique_lock lock(mutex_); if (!working_) { return false; } buffer_.emplace(std::move(value)); lock.unlock(); condvar_.notify_one(); return true; } std::optional<T> Take() { std::unique_lock lock(mutex_); if (!working_ && buffer_.empty()) { return std::nullopt; } while (buffer_.empty() && working_) { condvar_.wait(lock); } if (buffer_.empty()) { return std::nullopt; } T value{std::move(buffer_.front())}; buffer_.pop(); return value; } void Close() { std::unique_lock lock(mutex_); working_ = false; lock.unlock(); condvar_.notify_all(); } private: std::queue<T> buffer_{}; twist::ed::stdlike::mutex mutex_; twist::ed::stdlike::condition_variable condvar_; bool working_{true}; }; } // namespace tp ////////////////////////////////// pool.hpp ////////////////////////////////// #pragma once #include <cstddef> #include <cstdint> #include <thread> #include <tp/queue.hpp> #include <tp/task.hpp> #include <twist/ed/stdlike/thread.hpp> #include <vector> #include "twist/ed/stdlike/atomic.hpp" namespace tp { // Fixed-size pool of worker threads class ThreadPool { public: explicit ThreadPool(size_t threads); ~ThreadPool(); // Non-copyable ThreadPool(const ThreadPool&) = delete; ThreadPool& operator=(const ThreadPool&) = delete; // Non-movable ThreadPool(ThreadPool&&) = delete; ThreadPool& operator=(ThreadPool&&) = delete; // Launches worker threads void Start(); // Schedules task for execution in one of the worker threads void Submit(Task); // Locates current thread pool from worker thread static ThreadPool* Current(); // Waits until outstanding work count reaches zero void WaitIdle(); // Stops the worker threads as soon as possible void Stop(); private: void Process(); private: const std::size_t num_threads_; UnboundedBlockingQueue<Task> tasks_{}; std::vector<twist::ed::stdlike::thread> threads_; twist::ed::stdlike::atomic<bool> working_{false}; twist::ed::stdlike::atomic<uint32_t> task_counter_{0}; }; } // namespace tp ////////////////////////////////// pool.cpp ////////////////////////////////// #include <cassert> #include <cstddef> #include <tp/thread_pool.hpp> #include <twist/ed/local/ptr.hpp> #include <wheels/core/panic.hpp> #include <iostream> namespace tp { TWISTED_THREAD_LOCAL_PTR(ThreadPool, pool) ThreadPool::ThreadPool(size_t threads) : num_threads_(threads) { } void ThreadPool::Start() { working_.store(true); for (size_t i = 0; i < num_threads_; ++i) { threads_.emplace_back([this]() { Process(); }); } } void ThreadPool::Process() { pool = this; while (working_.load()) { auto task = tasks_.Take(); if (task.has_value()) { task.value()(); task_counter_.fetch_sub(1); } } } ThreadPool::~ThreadPool() { // Проверка что вызвали Stop assert(!working_.load()); } void ThreadPool::Submit(Task task) { task_counter_.fetch_add(1); tasks_.Put(std::move(task)); } ThreadPool* ThreadPool::Current() { return pool; } void ThreadPool::WaitIdle() { while (task_counter_.load() != 0) { } } void ThreadPool::Stop() { working_.store(false); tasks_.Close(); for (auto& thread : threads_) { thread.join(); } } } // namespace tp
Editor is loading...