Untitled
unknown
c_cpp
3 years ago
3.8 kB
13
Indexable
////////////////////////////////// queue.hpp //////////////////////////////////
#pragma once
#include <mutex>
#include <twist/ed/stdlike/mutex.hpp>
#include <twist/ed/stdlike/condition_variable.hpp>
#include <twist/ed/stdlike/atomic.hpp>
#include <optional>
#include <queue>
namespace tp {
// Unbounded blocking multi-producers/multi-consumers (MPMC) queue
template <typename T>
class UnboundedBlockingQueue {
 public:
  bool Put(T value) {
    std::unique_lock lock(mutex_);
    if (!working_) {
      return false;
    }
    buffer_.emplace(std::move(value));
    lock.unlock();
    condvar_.notify_one();
    return true;
  }
  std::optional<T> Take() {
    std::unique_lock lock(mutex_);
    if (!working_ && buffer_.empty()) {
      return std::nullopt;
    }
    while (buffer_.empty() && working_) {
      condvar_.wait(lock);
    }
    if (buffer_.empty()) {
      return std::nullopt;
    }
    T value{std::move(buffer_.front())};
    buffer_.pop();
    return value;
  }
  void Close() {
    std::unique_lock lock(mutex_);
    working_ = false;
    lock.unlock();
    condvar_.notify_all();
  }
 private:
  std::queue<T> buffer_{};
  twist::ed::stdlike::mutex mutex_;
  twist::ed::stdlike::condition_variable condvar_;
  bool working_{true};
};
}  // namespace tp
////////////////////////////////// pool.hpp //////////////////////////////////
#pragma once
#include <cstddef>
#include <cstdint>
#include <thread>
#include <tp/queue.hpp>
#include <tp/task.hpp>
#include <twist/ed/stdlike/thread.hpp>
#include <vector>
#include "twist/ed/stdlike/atomic.hpp"
namespace tp {
// Fixed-size pool of worker threads
class ThreadPool {
 public:
  explicit ThreadPool(size_t threads);
  ~ThreadPool();
  // Non-copyable
  ThreadPool(const ThreadPool&) = delete;
  ThreadPool& operator=(const ThreadPool&) = delete;
  // Non-movable
  ThreadPool(ThreadPool&&) = delete;
  ThreadPool& operator=(ThreadPool&&) = delete;
  // Launches worker threads
  void Start();
  // Schedules task for execution in one of the worker threads
  void Submit(Task);
  // Locates current thread pool from worker thread
  static ThreadPool* Current();
  // Waits until outstanding work count reaches zero
  void WaitIdle();
  // Stops the worker threads as soon as possible
  void Stop();
 private:
  void Process();
 private:
  const std::size_t num_threads_;
  UnboundedBlockingQueue<Task> tasks_{};
  std::vector<twist::ed::stdlike::thread> threads_;
  twist::ed::stdlike::atomic<bool> working_{false};
  twist::ed::stdlike::atomic<uint32_t> task_counter_{0};
};
}  // namespace tp
////////////////////////////////// pool.cpp //////////////////////////////////
#include <cassert>
#include <cstddef>
#include <tp/thread_pool.hpp>
#include <twist/ed/local/ptr.hpp>
#include <wheels/core/panic.hpp>
#include <iostream>
namespace tp {
TWISTED_THREAD_LOCAL_PTR(ThreadPool, pool)
ThreadPool::ThreadPool(size_t threads)
    : num_threads_(threads) {
}
void ThreadPool::Start() {
  working_.store(true);
  for (size_t i = 0; i < num_threads_; ++i) {
    threads_.emplace_back([this]() {
      Process();
    });
  }
}
void ThreadPool::Process() {
  pool = this;
  while (working_.load()) {
    auto task = tasks_.Take();
    if (task.has_value()) {
      task.value()();
      task_counter_.fetch_sub(1);
    }
  }
}
ThreadPool::~ThreadPool() {
  // Проверка что вызвали Stop
  assert(!working_.load());
}
void ThreadPool::Submit(Task task) {
  task_counter_.fetch_add(1);
  tasks_.Put(std::move(task));
}
ThreadPool* ThreadPool::Current() {
  return pool;
}
void ThreadPool::WaitIdle() {
  while (task_counter_.load() != 0) {
  }
}
void ThreadPool::Stop() {
  working_.store(false);
  tasks_.Close();
  for (auto& thread : threads_) {
    thread.join();
  }
}
}  // namespace tp
Editor is loading...