blob: 66fd63c71bcc283a8d741f1cddfc8462107b7178 [file] [log] [blame]
// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
#ifndef EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
#define EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
// IWYU pragma: private
#include "./InternalHeaderCheck.h"
namespace Eigen {
template <typename Environment>
class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
public:
typedef typename Environment::Task Task;
typedef RunQueue<Task, 1024> Queue;
ThreadPoolTempl(int num_threads, Environment env = Environment()) : ThreadPoolTempl(num_threads, true, env) {}
ThreadPoolTempl(int num_threads, bool allow_spinning, Environment env = Environment())
: env_(env),
num_threads_(num_threads),
allow_spinning_(allow_spinning),
thread_data_(num_threads),
all_coprimes_(num_threads),
waiters_(num_threads),
global_steal_partition_(EncodePartition(0, num_threads_)),
blocked_(0),
spinning_(0),
done_(false),
cancelled_(false),
ec_(waiters_) {
waiters_.resize(num_threads_);
// Calculate coprimes of all numbers [1, num_threads].
// Coprimes are used for random walks over all threads in Steal
// and NonEmptyQueueIndex. Iteration is based on the fact that if we take
// a random starting thread index t and calculate num_threads - 1 subsequent
// indices as (t + coprime) % num_threads, we will cover all threads without
// repetitions (effectively getting a presudo-random permutation of thread
// indices).
eigen_plain_assert(num_threads_ < kMaxThreads);
for (int i = 1; i <= num_threads_; ++i) {
all_coprimes_.emplace_back(i);
ComputeCoprimes(i, &all_coprimes_.back());
}
#ifndef EIGEN_THREAD_LOCAL
init_barrier_.reset(new Barrier(num_threads_));
#endif
thread_data_.resize(num_threads_);
for (int i = 0; i < num_threads_; i++) {
SetStealPartition(i, EncodePartition(0, num_threads_));
thread_data_[i].thread.reset(env_.CreateThread([this, i]() { WorkerLoop(i); }));
}
#ifndef EIGEN_THREAD_LOCAL
// Wait for workers to initialize per_thread_map_. Otherwise we might race
// with them in Schedule or CurrentThreadId.
init_barrier_->Wait();
#endif
}
~ThreadPoolTempl() {
done_ = true;
// Now if all threads block without work, they will start exiting.
// But note that threads can continue to work arbitrary long,
// block, submit new work, unblock and otherwise live full life.
if (!cancelled_) {
ec_.Notify(true);
} else {
// Since we were cancelled, there might be entries in the queues.
// Empty them to prevent their destructor from asserting.
for (size_t i = 0; i < thread_data_.size(); i++) {
thread_data_[i].queue.Flush();
}
}
// Join threads explicitly (by destroying) to avoid destruction order within
// this class.
for (size_t i = 0; i < thread_data_.size(); ++i) thread_data_[i].thread.reset();
}
void SetStealPartitions(const std::vector<std::pair<unsigned, unsigned>>& partitions) {
eigen_plain_assert(partitions.size() == static_cast<std::size_t>(num_threads_));
// Pass this information to each thread queue.
for (int i = 0; i < num_threads_; i++) {
const auto& pair = partitions[i];
unsigned start = pair.first, end = pair.second;
AssertBounds(start, end);
unsigned val = EncodePartition(start, end);
SetStealPartition(i, val);
}
}
void Schedule(std::function<void()> fn) EIGEN_OVERRIDE { ScheduleWithHint(std::move(fn), 0, num_threads_); }
void ScheduleWithHint(std::function<void()> fn, int start, int limit) override {
Task t = env_.CreateTask(std::move(fn));
PerThread* pt = GetPerThread();
if (pt->pool == this) {
// Worker thread of this pool, push onto the thread's queue.
Queue& q = thread_data_[pt->thread_id].queue;
t = q.PushFront(std::move(t));
} else {
// A free-standing thread (or worker of another pool), push onto a random
// queue.
eigen_plain_assert(start < limit);
eigen_plain_assert(limit <= num_threads_);
int num_queues = limit - start;
int rnd = Rand(&pt->rand) % num_queues;
eigen_plain_assert(start + rnd < limit);
Queue& q = thread_data_[start + rnd].queue;
t = q.PushBack(std::move(t));
}
// Note: below we touch this after making w available to worker threads.
// Strictly speaking, this can lead to a racy-use-after-free. Consider that
// Schedule is called from a thread that is neither main thread nor a worker
// thread of this pool. Then, execution of w directly or indirectly
// completes overall computations, which in turn leads to destruction of
// this. We expect that such scenario is prevented by program, that is,
// this is kept alive while any threads can potentially be in Schedule.
if (!t.f) {
ec_.Notify(false);
} else {
env_.ExecuteTask(t); // Push failed, execute directly.
}
}
void Cancel() EIGEN_OVERRIDE {
cancelled_ = true;
done_ = true;
// Let each thread know it's been cancelled.
#ifdef EIGEN_THREAD_ENV_SUPPORTS_CANCELLATION
for (size_t i = 0; i < thread_data_.size(); i++) {
thread_data_[i].thread->OnCancel();
}
#endif
// Wake up the threads without work to let them exit on their own.
ec_.Notify(true);
}
int NumThreads() const EIGEN_FINAL { return num_threads_; }
int CurrentThreadId() const EIGEN_FINAL {
const PerThread* pt = const_cast<ThreadPoolTempl*>(this)->GetPerThread();
if (pt->pool == this) {
return pt->thread_id;
} else {
return -1;
}
}
private:
// Create a single atomic<int> that encodes start and limit information for
// each thread.
// We expect num_threads_ < 65536, so we can store them in a single
// std::atomic<unsigned>.
// Exposed publicly as static functions so that external callers can reuse
// this encode/decode logic for maintaining their own thread-safe copies of
// scheduling and steal domain(s).
static const int kMaxPartitionBits = 16;
static const int kMaxThreads = 1 << kMaxPartitionBits;
inline unsigned EncodePartition(unsigned start, unsigned limit) { return (start << kMaxPartitionBits) | limit; }
inline void DecodePartition(unsigned val, unsigned* start, unsigned* limit) {
*limit = val & (kMaxThreads - 1);
val >>= kMaxPartitionBits;
*start = val;
}
void AssertBounds(int start, int end) {
eigen_plain_assert(start >= 0);
eigen_plain_assert(start < end); // non-zero sized partition
eigen_plain_assert(end <= num_threads_);
}
inline void SetStealPartition(size_t i, unsigned val) {
thread_data_[i].steal_partition.store(val, std::memory_order_relaxed);
}
inline unsigned GetStealPartition(int i) { return thread_data_[i].steal_partition.load(std::memory_order_relaxed); }
void ComputeCoprimes(int N, MaxSizeVector<unsigned>* coprimes) {
for (int i = 1; i <= N; i++) {
unsigned a = i;
unsigned b = N;
// If GCD(a, b) == 1, then a and b are coprimes.
while (b != 0) {
unsigned tmp = a;
a = b;
b = tmp % b;
}
if (a == 1) {
coprimes->push_back(i);
}
}
}
typedef typename Environment::EnvThread Thread;
struct PerThread {
constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) {}
ThreadPoolTempl* pool; // Parent pool, or null for normal threads.
uint64_t rand; // Random generator state.
int thread_id; // Worker thread index in pool.
#ifndef EIGEN_THREAD_LOCAL
// Prevent false sharing.
char pad_[128];
#endif
};
struct ThreadData {
constexpr ThreadData() : thread(), steal_partition(0), queue() {}
std::unique_ptr<Thread> thread;
std::atomic<unsigned> steal_partition;
Queue queue;
};
Environment env_;
const int num_threads_;
const bool allow_spinning_;
MaxSizeVector<ThreadData> thread_data_;
MaxSizeVector<MaxSizeVector<unsigned>> all_coprimes_;
MaxSizeVector<EventCount::Waiter> waiters_;
unsigned global_steal_partition_;
std::atomic<unsigned> blocked_;
std::atomic<bool> spinning_;
std::atomic<bool> done_;
std::atomic<bool> cancelled_;
EventCount ec_;
#ifndef EIGEN_THREAD_LOCAL
std::unique_ptr<Barrier> init_barrier_;
EIGEN_MUTEX per_thread_map_mutex_; // Protects per_thread_map_.
std::unordered_map<uint64_t, std::unique_ptr<PerThread>> per_thread_map_;
#endif
// Main worker thread loop.
void WorkerLoop(int thread_id) {
#ifndef EIGEN_THREAD_LOCAL
std::unique_ptr<PerThread> new_pt(new PerThread());
per_thread_map_mutex_.lock();
bool insertOK = per_thread_map_.emplace(GlobalThreadIdHash(), std::move(new_pt)).second;
eigen_plain_assert(insertOK);
EIGEN_UNUSED_VARIABLE(insertOK);
per_thread_map_mutex_.unlock();
init_barrier_->Notify();
init_barrier_->Wait();
#endif
PerThread* pt = GetPerThread();
pt->pool = this;
pt->rand = GlobalThreadIdHash();
pt->thread_id = thread_id;
Queue& q = thread_data_[thread_id].queue;
EventCount::Waiter* waiter = &waiters_[thread_id];
// TODO(dvyukov,rmlarsen): The time spent in NonEmptyQueueIndex() is
// proportional to num_threads_ and we assume that new work is scheduled at
// a constant rate, so we set spin_count to 5000 / num_threads_. The
// constant was picked based on a fair dice roll, tune it.
const int spin_count = allow_spinning_ && num_threads_ > 0 ? 5000 / num_threads_ : 0;
if (num_threads_ == 1) {
// For num_threads_ == 1 there is no point in going through the expensive
// steal loop. Moreover, since NonEmptyQueueIndex() calls PopBack() on the
// victim queues it might reverse the order in which ops are executed
// compared to the order in which they are scheduled, which tends to be
// counter-productive for the types of I/O workloads the single thread
// pools tend to be used for.
while (!cancelled_) {
Task t = q.PopFront();
for (int i = 0; i < spin_count && !t.f; i++) {
if (!cancelled_.load(std::memory_order_relaxed)) {
t = q.PopFront();
}
}
if (!t.f) {
if (!WaitForWork(waiter, &t)) {
return;
}
}
if (t.f) {
env_.ExecuteTask(t);
}
}
} else {
while (!cancelled_) {
Task t = q.PopFront();
if (!t.f) {
t = LocalSteal();
if (!t.f) {
t = GlobalSteal();
if (!t.f) {
// Leave one thread spinning. This reduces latency.
if (allow_spinning_ && !spinning_ && !spinning_.exchange(true)) {
for (int i = 0; i < spin_count && !t.f; i++) {
if (!cancelled_.load(std::memory_order_relaxed)) {
t = GlobalSteal();
} else {
return;
}
}
spinning_ = false;
}
if (!t.f) {
if (!WaitForWork(waiter, &t)) {
return;
}
}
}
}
}
if (t.f) {
env_.ExecuteTask(t);
}
}
}
}
// Steal tries to steal work from other worker threads in the range [start,
// limit) in best-effort manner.
Task Steal(unsigned start, unsigned limit) {
PerThread* pt = GetPerThread();
const size_t size = limit - start;
unsigned r = Rand(&pt->rand);
// Reduce r into [0, size) range, this utilizes trick from
// https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
eigen_plain_assert(all_coprimes_[size - 1].size() < (1 << 30));
unsigned victim = ((uint64_t)r * (uint64_t)size) >> 32;
unsigned index = ((uint64_t)all_coprimes_[size - 1].size() * (uint64_t)r) >> 32;
unsigned inc = all_coprimes_[size - 1][index];
for (unsigned i = 0; i < size; i++) {
eigen_plain_assert(start + victim < limit);
Task t = thread_data_[start + victim].queue.PopBack();
if (t.f) {
return t;
}
victim += inc;
if (victim >= size) {
victim -= static_cast<unsigned int>(size);
}
}
return Task();
}
// Steals work within threads belonging to the partition.
Task LocalSteal() {
PerThread* pt = GetPerThread();
unsigned partition = GetStealPartition(pt->thread_id);
// If thread steal partition is the same as global partition, there is no
// need to go through the steal loop twice.
if (global_steal_partition_ == partition) return Task();
unsigned start, limit;
DecodePartition(partition, &start, &limit);
AssertBounds(start, limit);
return Steal(start, limit);
}
// Steals work from any other thread in the pool.
Task GlobalSteal() { return Steal(0, num_threads_); }
// WaitForWork blocks until new work is available (returns true), or if it is
// time to exit (returns false). Can optionally return a task to execute in t
// (in such case t.f != nullptr on return).
bool WaitForWork(EventCount::Waiter* waiter, Task* t) {
eigen_plain_assert(!t->f);
// We already did best-effort emptiness check in Steal, so prepare for
// blocking.
ec_.Prewait();
// Now do a reliable emptiness check.
int victim = NonEmptyQueueIndex();
if (victim != -1) {
ec_.CancelWait();
if (cancelled_) {
return false;
} else {
*t = thread_data_[victim].queue.PopBack();
return true;
}
}
// Number of blocked threads is used as termination condition.
// If we are shutting down and all worker threads blocked without work,
// that's we are done.
blocked_++;
// TODO is blocked_ required to be unsigned?
if (done_ && blocked_ == static_cast<unsigned>(num_threads_)) {
ec_.CancelWait();
// Almost done, but need to re-check queues.
// Consider that all queues are empty and all worker threads are preempted
// right after incrementing blocked_ above. Now a free-standing thread
// submits work and calls destructor (which sets done_). If we don't
// re-check queues, we will exit leaving the work unexecuted.
if (NonEmptyQueueIndex() != -1) {
// Note: we must not pop from queues before we decrement blocked_,
// otherwise the following scenario is possible. Consider that instead
// of checking for emptiness we popped the only element from queues.
// Now other worker threads can start exiting, which is bad if the
// work item submits other work. So we just check emptiness here,
// which ensures that all worker threads exit at the same time.
blocked_--;
return true;
}
// Reached stable termination state.
ec_.Notify(true);
return false;
}
ec_.CommitWait(waiter);
blocked_--;
return true;
}
int NonEmptyQueueIndex() {
PerThread* pt = GetPerThread();
// We intentionally design NonEmptyQueueIndex to steal work from
// anywhere in the queue so threads don't block in WaitForWork() forever
// when all threads in their partition go to sleep. Steal is still local.
const size_t size = thread_data_.size();
unsigned r = Rand(&pt->rand);
unsigned inc = all_coprimes_[size - 1][r % all_coprimes_[size - 1].size()];
unsigned victim = r % size;
for (unsigned i = 0; i < size; i++) {
if (!thread_data_[victim].queue.Empty()) {
return victim;
}
victim += inc;
if (victim >= size) {
victim -= static_cast<unsigned int>(size);
}
}
return -1;
}
static EIGEN_STRONG_INLINE uint64_t GlobalThreadIdHash() {
return std::hash<std::thread::id>()(std::this_thread::get_id());
}
EIGEN_STRONG_INLINE PerThread* GetPerThread() {
#ifndef EIGEN_THREAD_LOCAL
static PerThread dummy;
auto it = per_thread_map_.find(GlobalThreadIdHash());
if (it == per_thread_map_.end()) {
return &dummy;
} else {
return it->second.get();
}
#else
EIGEN_THREAD_LOCAL PerThread per_thread_;
PerThread* pt = &per_thread_;
return pt;
#endif
}
static EIGEN_STRONG_INLINE unsigned Rand(uint64_t* state) {
uint64_t current = *state;
// Update the internal state
*state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
// Generate the random output (using the PCG-XSH-RS scheme)
return static_cast<unsigned>((current ^ (current >> 22)) >> (22 + (current >> 61)));
}
};
typedef ThreadPoolTempl<StlThreadEnvironment> ThreadPool;
} // namespace Eigen
#endif // EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H