// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
#ifndef EIGEN_CXX11_TENSOR_TENSOR_NONBLOCKING_THREAD_POOL_H
#define EIGEN_CXX11_TENSOR_TENSOR_NONBLOCKING_THREAD_POOL_H
// TODO(rmlarsen): The implementation without thread_local below is
// a temporary hack to allow this code to build on iOS. Support for
// thread_local is allegedly forthcoming in Apple XCode 8, at which point
// we can get rid of this. See:
// http://stackoverflow.com/questions/28094794/why-does-apple-clang-disallow-c11-thread-local-when-official-clang-supports
#ifndef __has_feature
#define __has_feature(x) 0 // Compilers without __has_feature (e.g. GCC) treat it as 0.
#endif
#if __cplusplus > 199711L && (!defined(__APPLE__) || __has_feature(cxx_thread_local))
#define EIGEN_HAS_THREAD_LOCAL
#endif
#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <thread>
#include <vector>
#ifndef EIGEN_HAS_THREAD_LOCAL
#include <mutex>
#include <unordered_map>
#include "base/port.h"
#endif
#include "TensorEventCount.h"
#include "TensorRunQueue.h"
namespace Eigen {
template <typename Environment>
class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
public:
typedef typename Environment::Task Task;
typedef RunQueue<Task, 1024> Queue;
ThreadPoolTempl(int num_threads, Environment env = Environment())
: env_(env),
threads_(num_threads),
queues_(num_threads),
coprimes_(num_threads),
waiters_(num_threads),
blocked_(0),
spinning_(0),
done_(false),
ec_(waiters_) {
// Calculate coprimes of num_threads.
// Coprimes are used for a random walk over all threads in Steal
// and NonEmptyQueueIndex. Iteration is based on the fact that if we take
// a random starting thread index t and calculate num_threads - 1 subsequent
// indices as (t + coprime) % num_threads, we will cover all threads without
// repetition (effectively getting a pseudo-random permutation of thread
// indices).
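// For example, with num_threads = 4 the stored coprimes are {1, 3}; starting
// at t = 2 with coprime 3 visits queues 2, 1, 0, 3 -- every queue exactly once.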
for (unsigned i = 1; i <= num_threads; i++) {
unsigned a = i;
unsigned b = num_threads;
// If GCD(a, b) == 1, then a and b are coprimes.
while (b != 0) {
unsigned tmp = a;
a = b;
b = tmp % b;
}
if (a == 1) {
coprimes_.push_back(i);
}
}
for (int i = 0; i < num_threads; i++) {
queues_.push_back(new Queue());
}
#ifndef EIGEN_HAS_THREAD_LOCAL
init_barrier_.reset(new Barrier(num_threads));
#endif
for (int i = 0; i < num_threads; i++) {
threads_.push_back(env_.CreateThread([this, i]() { WorkerLoop(i); }));
}
}
~ThreadPoolTempl() {
done_ = true;
// Now if all threads block without work, they will start exiting.
// Note, however, that threads may continue to work for an arbitrarily long
// time: they can block, submit new work, unblock, and otherwise live a full life.
ec_.Notify(true);
// Join threads explicitly to avoid destruction order issues.
for (size_t i = 0; i < threads_.size(); i++) delete threads_[i];
for (size_t i = 0; i < queues_.size(); i++) delete queues_[i];
#ifndef EIGEN_HAS_THREAD_LOCAL
for (auto it : per_thread_map_) delete it.second;
#endif
}
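// Schedule wraps fn in a Task and pushes it onto a run queue: a worker thread
// of this pool pushes onto its own queue, any other thread onto a random
// queue. On success one waiting worker is woken; if the target queue is full
// the task is executed inline on the calling thread.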
void Schedule(std::function<void()> fn) {
Task t = env_.CreateTask(std::move(fn));
PerThread* pt = GetPerThread();
if (pt->pool == this) {
// Worker thread of this pool, push onto the thread's queue.
Queue* q = queues_[pt->thread_id];
t = q->PushFront(std::move(t));
} else {
// A free-standing thread (or worker of another pool), push onto a random
// queue.
Queue* q = queues_[Rand(&pt->rand) % queues_.size()];
t = q->PushBack(std::move(t));
}
// Note: below we touch `this` after making the task available to worker
// threads. Strictly speaking, this can lead to a racy use-after-free.
// Consider that Schedule is called from a thread that is neither the main
// thread nor a worker thread of this pool. Then, executing the task directly
// or indirectly completes the overall computation, which in turn leads to the
// destruction of `this`. We expect the program to prevent such a scenario,
// that is, to keep `this` alive while any thread can potentially be inside
// Schedule.
if (!t.f)
ec_.Notify(false); // Push succeeded; wake up one waiting worker.
else
env_.ExecuteTask(t); // Push failed (queue full), execute directly.
}
int NumThreads() const final { return static_cast<int>(threads_.size()); }
int CurrentThreadId() const final {
const PerThread* pt = const_cast<ThreadPoolTempl*>(this)->GetPerThread();
if (pt->pool == this) {
eigen_assert(pt->thread_id >= 0);
return pt->thread_id;
} else {
return -1;
}
}
private:
typedef typename Environment::EnvThread Thread;
struct PerThread {
PerThread() : pool(nullptr), thread_id(-1) { rand = GlobalThreadIdHash(); }
ThreadPoolTempl* pool; // Parent pool, or null for normal threads.
uint64_t rand; // Random generator state.
int thread_id; // Worker thread index in pool.
};
Environment env_;
FixedSizeVector<Thread*> threads_;
FixedSizeVector<Queue*> queues_;
FixedSizeVector<unsigned> coprimes_;
std::vector<EventCount::Waiter> waiters_;
std::atomic<unsigned> blocked_; // Number of workers blocked in WaitForWork.
std::atomic<bool> spinning_; // True while a worker is spin-waiting for work.
std::atomic<bool> done_; // Set by the destructor to signal shutdown.
EventCount ec_; // Used to block and wake up waiting workers.
#ifndef EIGEN_HAS_THREAD_LOCAL
std::unique_ptr<Barrier> init_barrier_;
std::mutex mu; // Protects per_thread_map_.
std::unordered_map<uint64_t, PerThread*> per_thread_map_;
#endif
// Main worker thread loop.
void WorkerLoop(unsigned thread_id) {
#ifndef EIGEN_HAS_THREAD_LOCAL
PerThread* pt = new PerThread();
mu.lock();
per_thread_map_[GlobalThreadIdHash()] = pt;
mu.unlock();
init_barrier_->Notify();
init_barrier_->Wait();
#else
PerThread* pt = GetPerThread();
#endif
pt->pool = this;
pt->thread_id = thread_id;
Queue* q = queues_[thread_id];
EventCount::Waiter* waiter = &waiters_[thread_id];
for (;;) {
Task t = q->PopFront();
if (!t.f) {
t = Steal();
if (!t.f) {
// Leave one thread spinning. This reduces latency.
// TODO(dvyukov): 1000 iterations is based on fair dice roll, tune it.
// Also, the time it takes to attempt to steal work 1000 times depends
// on the size of the thread pool. However, the speed at which the user
// of the thread pool submits tasks is independent of the size of the
// pool. Consider a time based limit instead.
if (!spinning_ && !spinning_.exchange(true)) {
for (int i = 0; i < 1000 && !t.f; i++) {
t = Steal();
}
spinning_ = false;
}
if (!t.f) {
if (!WaitForWork(waiter, &t)) {
return;
}
}
}
}
if (t.f) {
env_.ExecuteTask(t);
}
}
}
// Steal tries to steal work from other worker threads in a best-effort manner.
Task Steal() {
PerThread* pt = GetPerThread();
unsigned size = queues_.size();
unsigned r = Rand(&pt->rand);
unsigned inc = coprimes_[r % coprimes_.size()];
unsigned victim = r % size;
for (unsigned i = 0; i < size; i++) {
Task t = queues_[victim]->PopBack();
if (t.f) {
return t;
}
victim += inc;
if (victim >= size) {
victim -= size;
}
}
return Task();
}
// WaitForWork blocks until new work is available (returns true), or if it is
// time to exit (returns false). Can optionally return a task to execute in t
// (in this case t.f != nullptr on return).
bool WaitForWork(EventCount::Waiter* waiter, Task* t) {
eigen_assert(!t->f);
// We already did best-effort emptiness check in Steal, so prepare for
// blocking.
ec_.Prewait(waiter);
// Now do a reliable emptiness check.
int victim = NonEmptyQueueIndex();
if (victim != -1) {
ec_.CancelWait(waiter);
*t = queues_[victim]->PopBack();
return true;
}
// The number of blocked threads is used as the termination condition.
// If we are shutting down and all worker threads are blocked without work,
// we are done.
blocked_++;
if (done_ && blocked_ == threads_.size()) {
ec_.CancelWait(waiter);
// Almost done, but need to re-check queues.
// Consider that all queues are empty and all worker threads are preempted
// right after incrementing blocked_ above. Now a free-standing thread
// submits work and calls the destructor (which sets done_). If we don't
// re-check the queues, we will exit leaving that work unexecuted.
if (NonEmptyQueueIndex() != -1) {
// Note: we must not pop from queues before we decrement blocked_,
// otherwise the following scenario is possible. Consider that instead
// of checking for emptiness we popped the only element from queues.
// Now other worker threads can start exiting, which is bad if the
// work item submits other work. So we just check emptiness here,
// which ensures that all worker threads exit at the same time.
blocked_--;
return true;
}
// Reached stable termination state.
ec_.Notify(true);
return false;
}
ec_.CommitWait(waiter);
blocked_--;
return true;
}
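// NonEmptyQueueIndex scans all queues in a random coprime-stride order and
// returns the index of the first non-empty one, or -1 if all queues are empty.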
int NonEmptyQueueIndex() {
PerThread* pt = GetPerThread();
unsigned size = queues_.size();
unsigned r = Rand(&pt->rand);
unsigned inc = coprimes_[r % coprimes_.size()];
unsigned victim = r % size;
for (unsigned i = 0; i < size; i++) {
if (!queues_[victim]->Empty()) {
return victim;
}
victim += inc;
if (victim >= size) {
victim -= size;
}
}
return -1;
}
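// GlobalThreadIdHash hashes the calling thread's std::thread::id. It seeds
// the per-thread random generator and, when thread_local is unavailable,
// serves as the key into per_thread_map_.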
static EIGEN_STRONG_INLINE uint64_t GlobalThreadIdHash() {
return std::hash<std::thread::id>()(std::this_thread::get_id());
}
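// GetPerThread returns the calling thread's PerThread state. Without
// thread_local support it is looked up in per_thread_map_; threads that never
// registered (non-workers) get a shared dummy entry with pool == nullptr.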
EIGEN_STRONG_INLINE PerThread* GetPerThread() {
#ifndef EIGEN_HAS_THREAD_LOCAL
static PerThread dummy;
auto it = per_thread_map_.find(GlobalThreadIdHash());
if (it == per_thread_map_.end()) {
return &dummy;
} else {
return it->second;
}
#else
static thread_local PerThread per_thread;
PerThread* pt = &per_thread;
return pt;
#endif
}
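// Rand is a small per-thread PRNG: a 64-bit linear congruential state update
// with PCG-XSH-RS output. Each thread owns its own state, so no
// synchronization is needed; it is used only to pick victim/queue indices.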
static EIGEN_STRONG_INLINE unsigned Rand(uint64_t* state) {
uint64_t current = *state;
// Update the internal state
*state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
// Generate the random output (using the PCG-XSH-RS scheme)
return (current ^ (current >> 22)) >> (22 + (current >> 61));
}
};
#ifdef EIGEN_USE_CUSTOM_THREAD_POOL
typedef ThreadPoolTempl<StlThreadEnvironment> ThreadPool;
#endif
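// Usage sketch (assumes EIGEN_USE_CUSTOM_THREAD_POOL is defined so that the
// ThreadPool typedef above is available):
//
//   Eigen::ThreadPool pool(4);                  // start 4 worker threads
//   pool.Schedule([]() { /* do some work */ }); // enqueue a closure
//   // ~ThreadPool() notifies all workers and joins them on destruction.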
} // namespace Eigen
#endif // EIGEN_CXX11_TENSOR_TENSOR_NONBLOCKING_THREAD_POOL_H