// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#ifndef EIGEN_CXX11_THREADPOOL_RUNQUEUE_H
#define EIGEN_CXX11_THREADPOOL_RUNQUEUE_H

// IWYU pragma: private
#include "./InternalHeaderCheck.h"

namespace Eigen {

// RunQueue is a fixed-size, partially non-blocking deque of Work items.
// Operations on front of the queue must be done by a single thread (owner),
// operations on back of the queue can be done by multiple threads concurrently.
//
// Algorithm outline:
// All remote threads operating on the queue back are serialized by a mutex.
// This ensures that at most two threads access state: owner and one remote
// thread (Size aside). The algorithm ensures that the occupied region of the
// underlying array is logically contiguous (it can wrap around, but there are
// no stray occupied elements). The owner operates on one end of this region,
// the remote thread operates on the other end. Synchronization between these
// threads (potential consumption of the last element and claiming of the last
// empty element) happens by means of a state variable in each element. States
// are: empty, busy (in the process of insertion or removal) and ready. Threads
// claim elements (empty->busy and ready->busy transitions) by means of a CAS
// operation. The finishing transitions (busy->empty and busy->ready) are done
// with a plain store, as the element is exclusively owned by the current thread.
//
// Note: we could permit only pointers as elements; then we would not need a
// separate state variable, as a null/non-null pointer value would serve as the
// state. But that would require a malloc/free per operation for large, complex
// values (and this queue is designed to store std::function<()>).
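//
// Illustrative usage sketch (not part of the library; in Eigen's thread pool
// the queue is driven internally by the thread pool implementation, and the
// element type shown here is only an example, assuming <functional> is
// available):
//
//   RunQueue<std::function<void()>, 1024> q;
//   // Owner thread: push and pop at the front.
//   std::function<void()> overflow = q.PushFront([] { /* work */ });
//   if (overflow) { /* queue was full; run or re-queue the item */ }
//   if (std::function<void()> w = q.PopFront()) w();
//   // Any other thread: steal from the back (serialized by a mutex).
//   if (std::function<void()> stolen = q.PopBack()) stolen();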
template <typename Work, unsigned kSize>
class RunQueue {
 public:
  RunQueue() : front_(0), back_(0) {
    // require power-of-two for fast masking
    eigen_plain_assert((kSize & (kSize - 1)) == 0);
    eigen_plain_assert(kSize > 2); // why would you do this?
    eigen_plain_assert(kSize <= (64 << 10)); // leave enough space for counter
    for (unsigned i = 0; i < kSize; i++) array_[i].state.store(kEmpty, std::memory_order_relaxed);
  }

  ~RunQueue() { eigen_plain_assert(Size() == 0); }

  // PushFront inserts w at the beginning of the queue.
  // If the queue is full, returns w; otherwise returns a default-constructed Work.
  Work PushFront(Work w) {
    unsigned front = front_.load(std::memory_order_relaxed);
    Elem* e = &array_[front & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kEmpty || !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire)) return w;
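    // Advance the front position by one and bump the modification counter
    // stored in the high bits (kSize << 1 is the counter's lowest bit).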
    front_.store(front + 1 + (kSize << 1), std::memory_order_relaxed);
    e->w = std::move(w);
    e->state.store(kReady, std::memory_order_release);
    return Work();
  }

  // PopFront removes and returns the first element in the queue.
  // If the queue is empty, returns a default-constructed Work.
  Work PopFront() {
    unsigned front = front_.load(std::memory_order_relaxed);
    Elem* e = &array_[(front - 1) & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kReady || !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire)) return Work();
    Work w = std::move(e->w);
    e->state.store(kEmpty, std::memory_order_release);
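    // Step the front position back by one modulo 2 * kSize while leaving the
    // modification counter bits untouched.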
    front = ((front - 1) & kMask2) | (front & ~kMask2);
    front_.store(front, std::memory_order_relaxed);
    return w;
  }

  // PushBack adds w at the end of the queue.
  // If the queue is full, returns w; otherwise returns a default-constructed Work.
  Work PushBack(Work w) {
    EIGEN_MUTEX_LOCK lock(mutex_);
    unsigned back = back_.load(std::memory_order_relaxed);
    Elem* e = &array_[(back - 1) & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kEmpty || !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire)) return w;
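    // Step the back position back by one modulo 2 * kSize (this is the new
    // element's slot) while leaving the modification counter bits untouched.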
    back = ((back - 1) & kMask2) | (back & ~kMask2);
    back_.store(back, std::memory_order_relaxed);
    e->w = std::move(w);
    e->state.store(kReady, std::memory_order_release);
    return Work();
  }

  // PopBack removes and returns the last element in the queue.
  // If the queue is empty, returns a default-constructed Work.
  Work PopBack() {
    if (Empty()) return Work();
    EIGEN_MUTEX_LOCK lock(mutex_);
    unsigned back = back_.load(std::memory_order_relaxed);
    Elem* e = &array_[back & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kReady || !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire)) return Work();
    Work w = std::move(e->w);
    e->state.store(kEmpty, std::memory_order_release);
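    // Advance the back position by one and bump the modification counter
    // stored in the high bits.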
    back_.store(back + 1 + (kSize << 1), std::memory_order_relaxed);
    return w;
  }

  // PopBackHalf removes up to half of the elements from the back of the queue
  // and appends them to *result. Returns the number of elements removed.
  unsigned PopBackHalf(std::vector<Work>* result) {
    if (Empty()) return 0;
    EIGEN_MUTEX_LOCK lock(mutex_);
    unsigned back = back_.load(std::memory_order_relaxed);
    unsigned size = Size();
    unsigned mid = back;
    if (size > 1) mid = back + (size - 1) / 2;
    unsigned n = 0;
    unsigned start = 0;
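    // Scan from mid down to back; the first element successfully claimed via
    // CAS marks the start of the stolen range, and the remaining elements
    // toward back are then exclusively owned by this thread.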
    for (; static_cast<int>(mid - back) >= 0; mid--) {
      Elem* e = &array_[mid & kMask];
      uint8_t s = e->state.load(std::memory_order_relaxed);
      if (n == 0) {
        if (s != kReady || !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire)) continue;
        start = mid;
      } else {
        // Note: no need to store the temporary kBusy state, we exclusively own
        // these elements.
        eigen_plain_assert(s == kReady);
      }
      result->push_back(std::move(e->w));
      e->state.store(kEmpty, std::memory_order_release);
      n++;
    }
    if (n != 0) back_.store(start + 1 + (kSize << 1), std::memory_order_relaxed);
    return n;
  }

  // Size returns the current queue size.
  // Can be called by any thread at any time.
  unsigned Size() const { return SizeOrNotEmpty<true>(); }

  // Empty tests whether the container is empty.
  // Can be called by any thread at any time.
  bool Empty() const { return SizeOrNotEmpty<false>() == 0; }

  // Delete all the elements from the queue.
  void Flush() {
    while (!Empty()) {
      PopFront();
    }
  }

 private:
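  // kMask masks a position down to an array index; kMask2 masks a position
  // modulo 2 * kSize. The extra position bit lets a full queue
  // (front - back == kSize) be distinguished from an empty one (front == back).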
  static const unsigned kMask = kSize - 1;
  static const unsigned kMask2 = (kSize << 1) - 1;

  enum State {
    kEmpty,
    kBusy,
    kReady,
  };

  struct Elem {
    std::atomic<uint8_t> state;
    Work w;
  };

  // The low log2(kSize) + 1 bits of front_ and back_ contain the rolling index
  // of the front/back, respectively. The remaining bits contain modification
  // counters that are incremented on PushFront (for front_) and on
  // PopBack/PopBackHalf (for back_). This allows us to (1) distinguish between
  // empty and full conditions (if we used only log2(kSize) bits for the
  // position, these conditions would be indistinguishable); (2) obtain a
  // consistent snapshot of front_/back_ for the Size operation using the
  // modification counters.
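  // For example, with kSize = 1024 the low 11 bits hold a rolling position in
  // [0, 2 * kSize), and adding (kSize << 1) == 2048 bumps the counter stored
  // in the remaining high bits.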
  EIGEN_ALIGN_TO_AVOID_FALSE_SHARING std::atomic<unsigned> front_;
  EIGEN_ALIGN_TO_AVOID_FALSE_SHARING std::atomic<unsigned> back_;
  EIGEN_MUTEX mutex_; // guards `PushBack` and `PopBack` (accesses `back_`)

  EIGEN_ALIGN_TO_AVOID_FALSE_SHARING Elem array_[kSize];

  // SizeOrNotEmpty returns the current queue size; if NeedSizeEstimate is false,
  // only whether the size is 0 is guaranteed to be correct.
  // Can be called by any thread at any time.
  template <bool NeedSizeEstimate>
  unsigned SizeOrNotEmpty() const {
    // Emptiness plays a critical role in thread pool blocking. So we go to
    // great lengths not to produce false positives (claiming a non-empty queue
    // as empty).
    unsigned front = front_.load(std::memory_order_acquire);
    for (;;) {
      // Capture a consistent snapshot of front/back.
      unsigned back = back_.load(std::memory_order_acquire);
      unsigned front1 = front_.load(std::memory_order_relaxed);
      if (front != front1) {
        front = front1;
        std::atomic_thread_fence(std::memory_order_acquire);
        continue;
      }
      if (NeedSizeEstimate) {
        return CalculateSize(front, back);
      } else {
        // This value will be 0 if the queue is empty, and some unspecified
        // non-zero value otherwise.
        unsigned maybe_zero = ((front ^ back) & kMask2);
        // Queue size estimate must agree with the maybe_zero check on the
        // queue empty/non-empty state.
        eigen_assert((CalculateSize(front, back) == 0) == (maybe_zero == 0));
        return maybe_zero;
      }
    }
  }

  EIGEN_ALWAYS_INLINE unsigned CalculateSize(unsigned front, unsigned back) const {
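    // Positions are taken modulo 2 * kSize. Worked example with kSize = 4:
    // front = 1, back = 6 gives 1 - 6 = -5, corrected to 3 by adding 2 * kSize;
    // front = 6, back = 1 gives 5, which is clamped to kSize below.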
    int size = (front & kMask2) - (back & kMask2);
    // Fix overflow.
    if (EIGEN_PREDICT_FALSE(size < 0)) size += 2 * kSize;
    // Order of modification in push/pop is crafted to make the queue look
    // larger than it is during concurrent modifications. E.g. push can
    // increment size before the corresponding pop has decremented it.
    // So the computed size can be up to kSize + 1, fix it.
    if (EIGEN_PREDICT_FALSE(size > static_cast<int>(kSize))) size = kSize;
    return static_cast<unsigned>(size);
  }

  RunQueue(const RunQueue&) = delete;
  void operator=(const RunQueue&) = delete;
};

} // namespace Eigen

#endif // EIGEN_CXX11_THREADPOOL_RUNQUEUE_H