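// Paste-site metadata (not part of the program):
//   title: Untitled, author: unknown, syntax: c_cpp,
//   posted: 3 years ago, size: 3.8 kB, views: 15, Indexable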
////////////////////////////////// queue.hpp //////////////////////////////////
#pragma once
#include <mutex>
#include <twist/ed/stdlike/mutex.hpp>
#include <twist/ed/stdlike/condition_variable.hpp>
#include <twist/ed/stdlike/atomic.hpp>
#include <optional>
#include <queue>
namespace tp {
// Unbounded blocking multi-producers/multi-consumers (MPMC) queue
// Unbounded blocking queue for any number of producers and consumers.
//
// All state lives behind a single mutex. Consumers sleep on a condition
// variable while the queue is open and has nothing to hand out. Once Close()
// has been called, Put() rejects new elements, while Take() keeps returning
// whatever is still buffered and then reports std::nullopt.
template <typename T>
class UnboundedBlockingQueue {
 public:
  // Appends `value` to the tail of the queue and wakes one waiting consumer.
  // Returns true on success, false if the queue is closed (the value is
  // discarded in that case).
  bool Put(T value) {
    {
      std::unique_lock guard(mutex_);
      if (!working_) {
        return false;
      }
      buffer_.push(std::move(value));
    }
    // The mutex is already released here, so the woken consumer
    // does not immediately block on it.
    condvar_.notify_one();
    return true;
  }

  // Removes and returns the element at the head of the queue.
  // Blocks while the queue is open and empty.
  // Returns std::nullopt once the queue is both closed and empty.
  std::optional<T> Take() {
    std::unique_lock guard(mutex_);
    // Loop (not a single wait) so that wakeups which find nothing to take
    // simply go back to sleep.
    while (working_ && buffer_.empty()) {
      condvar_.wait(guard);
    }
    if (buffer_.empty()) {
      // Only reachable when the queue has been closed.
      return std::nullopt;
    }
    T head{std::move(buffer_.front())};
    buffer_.pop();
    return head;
  }

  // Closes the queue for new elements and wakes every blocked consumer
  // so that each of them can observe the closed state.
  void Close() {
    {
      std::unique_lock guard(mutex_);
      working_ = false;
    }
    condvar_.notify_all();
  }

 private:
  std::queue<T> buffer_{};  // Pending elements, FIFO order
  twist::ed::stdlike::mutex mutex_;  // Guards buffer_ and working_
  twist::ed::stdlike::condition_variable condvar_;  // Signalled by Put/Close
  bool working_{true};  // false after Close()
};
} // namespace tp
////////////////////////////////// pool.hpp //////////////////////////////////
#pragma once
#include <cstddef>
#include <cstdint>
#include <thread>
#include <tp/queue.hpp>
#include <tp/task.hpp>
#include <twist/ed/stdlike/thread.hpp>
#include <vector>
#include "twist/ed/stdlike/atomic.hpp"
namespace tp {
// Fixed-size pool of worker threads
// Pool with a fixed number of worker threads that execute tasks taken
// from one shared UnboundedBlockingQueue<Task>.
//
// The destructor asserts that the pool is not marked as running, so a
// started pool has to be stopped with Stop() before it is destroyed.
class ThreadPool {
 public:
  // `threads` is the number of workers that Start() will launch.
  explicit ThreadPool(size_t threads);
  ~ThreadPool();

  // Copying is disabled
  ThreadPool(const ThreadPool&) = delete;
  ThreadPool& operator=(const ThreadPool&) = delete;

  // Moving is disabled
  ThreadPool(ThreadPool&&) = delete;
  ThreadPool& operator=(ThreadPool&&) = delete;

  // Marks the pool as running and spawns the worker threads
  void Start();

  // Enqueues `task` to be run by one of the worker threads
  void Submit(Task task);

  // Returns the pool recorded for the calling thread
  // (worker threads record their pool when they start processing)
  static ThreadPool* Current();

  // Returns once the count of submitted-but-unfinished tasks is zero
  void WaitIdle();

  // Clears the running flag, closes the task queue and joins the workers
  void Stop();

 private:
  // Body of every worker thread
  void Process();

  const std::size_t num_threads_;  // Number of workers launched by Start()
  UnboundedBlockingQueue<Task> tasks_{};  // Tasks waiting for a worker
  std::vector<twist::ed::stdlike::thread> threads_;  // Worker handles
  twist::ed::stdlike::atomic<bool> working_{false};  // true between Start() and Stop()
  twist::ed::stdlike::atomic<uint32_t> task_counter_{0};  // Submitted minus finished
};
} // namespace tp
////////////////////////////////// pool.cpp //////////////////////////////////
#include <cassert>
#include <cstddef>
#include <tp/thread_pool.hpp>
#include <twist/ed/local/ptr.hpp>
#include <wheels/core/panic.hpp>
#include <iostream>
namespace tp {
// `pool` is assigned in ThreadPool::Process() and read in ThreadPool::Current().
// NOTE(review): presumably this macro declares a thread-local `ThreadPool*`
// named `pool` — confirm against twist/ed/local/ptr.hpp.
TWISTED_THREAD_LOCAL_PTR(ThreadPool, pool)
ThreadPool::ThreadPool(size_t threads)
: num_threads_(threads) {
}
// Marks the pool as running and launches `num_threads_` worker threads,
// each of which executes Process().
void ThreadPool::Start() {
  // The flag is raised before any worker exists, so every worker sees
  // `true` on its first check in Process().
  working_.store(true);

  // The final size is known in advance: allocate storage for the thread
  // handles once instead of regrowing the vector while spawning.
  threads_.reserve(num_threads_);
  for (size_t i = 0; i < num_threads_; ++i) {
    threads_.emplace_back([this] {
      Process();
    });
  }
}
void ThreadPool::Process() {
pool = this;
while (working_.load()) {
auto task = tasks_.Take();
if (task.has_value()) {
task.value()();
task_counter_.fetch_sub(1);
}
}
}
// Destroying a pool that is still marked as running is treated as a usage
// error: when assertions are enabled, this aborts if `working_` is still true.
// `working_` starts out false, so a pool that was never started also passes.
ThreadPool::~ThreadPool() {
// Check that Stop was called
assert(!working_.load());
}
// Schedules `task` for execution by one of the worker threads.
//
// The outstanding-task counter is incremented before the task becomes
// visible to workers, so a worker can never decrement it first.
// If the queue refuses the task (it has already been closed), the task is
// dropped and the increment is rolled back; otherwise the counter would
// stay above zero forever and WaitIdle() could never return.
void ThreadPool::Submit(Task task) {
  task_counter_.fetch_add(1);
  if (!tasks_.Put(std::move(task))) {
    // Rejected by a closed queue: no worker will ever run this task,
    // so nobody else will undo the increment above.
    task_counter_.fetch_sub(1);
  }
}
// Returns the value of `pool` for the calling thread. Process() stores
// `this` there, so inside a worker thread the result is the pool that
// owns the worker.
// NOTE(review): presumably null on threads that never ran Process() —
// confirm the default of TWISTED_THREAD_LOCAL_PTR.
ThreadPool* ThreadPool::Current() {
return pool;
}
// Blocks the caller until the outstanding-task counter reads zero.
// This is a busy-wait: the counter is polled in a tight loop with no
// sleeping or yielding in between.
void ThreadPool::WaitIdle() {
  for (;;) {
    if (task_counter_.load() == 0) {
      return;
    }
  }
}
void ThreadPool::Stop() {
working_.store(false);
tasks_.Close();
for (auto& thread : threads_) {
thread.join();
}
}
} // namespace tp
// (paste-site page residue: "Editor is loading...")