Untitled

mail@pastecode.io avatar
unknown
c_cpp
a year ago
3.8 kB
7
Indexable
Never
////////////////////////////////// queue.hpp //////////////////////////////////
#pragma once

#include <mutex>
#include <twist/ed/stdlike/mutex.hpp>
#include <twist/ed/stdlike/condition_variable.hpp>
#include <twist/ed/stdlike/atomic.hpp>

#include <optional>
#include <queue>

namespace tp {

// Unbounded blocking multi-producers/multi-consumers (MPMC) queue

template <typename T>
class UnboundedBlockingQueue {
 public:
  bool Put(T value) {
    std::unique_lock lock(mutex_);
    if (!working_) {
      return false;
    }
    buffer_.emplace(std::move(value));
    lock.unlock();
    condvar_.notify_one();
    return true;
  }

  std::optional<T> Take() {
    std::unique_lock lock(mutex_);
    if (!working_ && buffer_.empty()) {
      return std::nullopt;
    }
    while (buffer_.empty() && working_) {
      condvar_.wait(lock);
    }
    if (buffer_.empty()) {
      return std::nullopt;
    }
    T value{std::move(buffer_.front())};
    buffer_.pop();
    return value;
  }

  void Close() {
    std::unique_lock lock(mutex_);
    working_ = false;
    lock.unlock();
    condvar_.notify_all();
  }

 private:
  std::queue<T> buffer_{};
  twist::ed::stdlike::mutex mutex_;
  twist::ed::stdlike::condition_variable condvar_;
  bool working_{true};
};

}  // namespace tp


////////////////////////////////// pool.hpp //////////////////////////////////
#pragma once

#include <cstddef>
#include <cstdint>
#include <thread>
#include <tp/queue.hpp>
#include <tp/task.hpp>

#include <twist/ed/stdlike/thread.hpp>
#include <vector>
#include "twist/ed/stdlike/atomic.hpp"

namespace tp {

// Fixed-size pool of worker threads

class ThreadPool {
 public:
  explicit ThreadPool(size_t threads);
  ~ThreadPool();

  // Non-copyable
  ThreadPool(const ThreadPool&) = delete;
  ThreadPool& operator=(const ThreadPool&) = delete;

  // Non-movable
  ThreadPool(ThreadPool&&) = delete;
  ThreadPool& operator=(ThreadPool&&) = delete;

  // Launches worker threads
  void Start();

  // Schedules task for execution in one of the worker threads
  void Submit(Task);

  // Locates current thread pool from worker thread
  static ThreadPool* Current();

  // Waits until outstanding work count reaches zero
  void WaitIdle();

  // Stops the worker threads as soon as possible
  void Stop();

 private:
  void Process();

 private:
  const std::size_t num_threads_;
  UnboundedBlockingQueue<Task> tasks_{};
  std::vector<twist::ed::stdlike::thread> threads_;
  twist::ed::stdlike::atomic<bool> working_{false};
  twist::ed::stdlike::atomic<uint32_t> task_counter_{0};
};

}  // namespace tp


////////////////////////////////// pool.cpp //////////////////////////////////
#include <cassert>
#include <cstddef>
#include <tp/thread_pool.hpp>

#include <twist/ed/local/ptr.hpp>

#include <wheels/core/panic.hpp>

#include <iostream>

namespace tp {

TWISTED_THREAD_LOCAL_PTR(ThreadPool, pool)

ThreadPool::ThreadPool(size_t threads)
    : num_threads_(threads) {
}

void ThreadPool::Start() {
  working_.store(true);
  for (size_t i = 0; i < num_threads_; ++i) {
    threads_.emplace_back([this]() {
      Process();
    });
  }
}

void ThreadPool::Process() {
  pool = this;
  while (working_.load()) {
    auto task = tasks_.Take();
    if (task.has_value()) {
      task.value()();
      task_counter_.fetch_sub(1);
    }
  }
}

ThreadPool::~ThreadPool() {
  // Проверка что вызвали Stop
  assert(!working_.load());
}

void ThreadPool::Submit(Task task) {
  task_counter_.fetch_add(1);
  tasks_.Put(std::move(task));
}

ThreadPool* ThreadPool::Current() {
  return pool;
}

void ThreadPool::WaitIdle() {
  while (task_counter_.load() != 0) {
  }
}

void ThreadPool::Stop() {
  working_.store(false);
  tasks_.Close();
  for (auto& thread : threads_) {
    thread.join();
  }
}

}  // namespace tp