blob: acf1d628b6110c1f885aacc759eacc3eae90e4ca [file] [log] [blame]
// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2023 Charlie Schlosser <cs.schlosser@gmail.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
#ifndef EIGEN_CORE_THREAD_POOL_DEVICE_H
#define EIGEN_CORE_THREAD_POOL_DEVICE_H
namespace Eigen {
// CoreThreadPoolDevice provides an easy-to-understand Device for parallelizing Eigen Core expressions with
// Threadpool. Expressions are recursively split evenly until the evaluation cost is less than the threshold for
// delegating the task to a thread.
/*
a
/ \
/ \
/ \
/ \
/ \
/ \
/ \
a e
/ \ / \
/ \ / \
/ \ / \
a c e g
/ \ / \ / \ / \
/ \ / \ / \ / \
a b c d e f g h
*/
// Each task descends the binary tree to the left, delegates the right task to a new thread, and continues to the
// left. This ensures that work is evenly distributed to the thread pool as quickly as possible and minimizes the number
// of tasks created during the evaluation. Consider an expression that is divided into 8 chunks. The
// primary task 'a' creates tasks 'e' 'c' and 'b', and executes its portion of the expression at the bottom of the
// tree. Likewise, task 'e' creates tasks 'g' and 'f', and executes its portion of the expression.
struct CoreThreadPoolDevice {
using Task = std::function<void()>;
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE CoreThreadPoolDevice(ThreadPool& pool, float threadCostThreshold = 3e-5f)
: m_pool(pool) {
eigen_assert(threadCostThreshold >= 0.0f && "threadCostThreshold must be non-negative");
m_costFactor = threadCostThreshold;
}
template <int PacketSize>
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE int calculateLevels(Index size, float cost) const {
eigen_assert(cost >= 0.0f && "cost must be non-negative");
Index numOps = size / PacketSize;
int actualThreads = numOps < m_pool.NumThreads() ? static_cast<int>(numOps) : m_pool.NumThreads();
float totalCost = static_cast<float>(numOps) * cost;
float idealThreads = totalCost * m_costFactor;
if (idealThreads < static_cast<float>(actualThreads)) {
idealThreads = numext::maxi(idealThreads, 1.0f);
actualThreads = numext::mini(actualThreads, static_cast<int>(idealThreads));
}
int maxLevel = internal::log2_ceil(actualThreads);
return maxLevel;
}
// MSVC does not like inlining parallelForImpl
#if EIGEN_COMP_MSVC && !EIGEN_COMP_CLANG
#define EIGEN_PARALLEL_FOR_INLINE
#else
#define EIGEN_PARALLEL_FOR_INLINE EIGEN_STRONG_INLINE
#endif
template <typename UnaryFunctor, int PacketSize>
EIGEN_DEVICE_FUNC EIGEN_PARALLEL_FOR_INLINE void parallelForImpl(Index begin, Index end, UnaryFunctor& f,
Barrier& barrier, int level) {
while (level > 0) {
level--;
Index size = end - begin;
eigen_assert(size % PacketSize == 0 && "this function assumes size is a multiple of PacketSize");
Index mid = begin + numext::round_down(size >> 1, PacketSize);
Task right = [this, mid, end, &f, &barrier, level]() {
parallelForImpl<UnaryFunctor, PacketSize>(mid, end, f, barrier, level);
};
m_pool.Schedule(std::move(right));
end = mid;
}
for (Index i = begin; i < end; i += PacketSize) f(i);
barrier.Notify();
}
template <typename BinaryFunctor, int PacketSize>
EIGEN_DEVICE_FUNC EIGEN_PARALLEL_FOR_INLINE void parallelForImpl(Index outerBegin, Index outerEnd, Index innerBegin,
Index innerEnd, BinaryFunctor& f, Barrier& barrier,
int level) {
while (level > 0) {
level--;
Index outerSize = outerEnd - outerBegin;
if (outerSize > 1) {
Index outerMid = outerBegin + (outerSize >> 1);
Task right = [this, &f, &barrier, outerMid, outerEnd, innerBegin, innerEnd, level]() {
parallelForImpl<BinaryFunctor, PacketSize>(outerMid, outerEnd, innerBegin, innerEnd, f, barrier, level);
};
m_pool.Schedule(std::move(right));
outerEnd = outerMid;
} else {
Index innerSize = innerEnd - innerBegin;
eigen_assert(innerSize % PacketSize == 0 && "this function assumes innerSize is a multiple of PacketSize");
Index innerMid = innerBegin + numext::round_down(innerSize >> 1, PacketSize);
Task right = [this, &f, &barrier, outerBegin, outerEnd, innerMid, innerEnd, level]() {
parallelForImpl<BinaryFunctor, PacketSize>(outerBegin, outerEnd, innerMid, innerEnd, f, barrier, level);
};
m_pool.Schedule(std::move(right));
innerEnd = innerMid;
}
}
for (Index outer = outerBegin; outer < outerEnd; outer++)
for (Index inner = innerBegin; inner < innerEnd; inner += PacketSize) f(outer, inner);
barrier.Notify();
}
#undef EIGEN_PARALLEL_FOR_INLINE
template <typename UnaryFunctor, int PacketSize>
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void parallelFor(Index begin, Index end, UnaryFunctor& f, float cost) {
Index size = end - begin;
int maxLevel = calculateLevels<PacketSize>(size, cost);
Barrier barrier(1 << maxLevel);
parallelForImpl<UnaryFunctor, PacketSize>(begin, end, f, barrier, maxLevel);
barrier.Wait();
}
template <typename BinaryFunctor, int PacketSize>
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void parallelFor(Index outerBegin, Index outerEnd, Index innerBegin,
Index innerEnd, BinaryFunctor& f, float cost) {
Index outerSize = outerEnd - outerBegin;
Index innerSize = innerEnd - innerBegin;
Index size = outerSize * innerSize;
int maxLevel = calculateLevels<PacketSize>(size, cost);
Barrier barrier(1 << maxLevel);
parallelForImpl<BinaryFunctor, PacketSize>(outerBegin, outerEnd, innerBegin, innerEnd, f, barrier, maxLevel);
barrier.Wait();
}
ThreadPool& m_pool;
// costFactor is the cost of delegating a task to a thread
// the inverse is used to avoid a floating point division
float m_costFactor;
};
// specialization of coefficient-wise assignment loops for CoreThreadPoolDevice
namespace internal {
template <typename Kernel>
struct cost_helper {
using SrcEvaluatorType = typename Kernel::SrcEvaluatorType;
using DstEvaluatorType = typename Kernel::DstEvaluatorType;
using SrcXprType = typename SrcEvaluatorType::XprType;
using DstXprType = typename DstEvaluatorType::XprType;
static constexpr Index Cost = functor_cost<SrcXprType>::Cost + functor_cost<DstXprType>::Cost;
};
template <typename Kernel>
struct dense_assignment_loop_with_device<Kernel, CoreThreadPoolDevice, DefaultTraversal, NoUnrolling> {
static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost;
struct AssignmentFunctor : public Kernel {
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel& kernel) : Kernel(kernel) {}
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer, Index inner) {
this->assignCoeffByOuterInner(outer, inner);
}
};
static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) {
const Index innerSize = kernel.innerSize();
const Index outerSize = kernel.outerSize();
constexpr float cost = static_cast<float>(XprEvaluationCost);
AssignmentFunctor functor(kernel);
device.template parallelFor<AssignmentFunctor, 1>(0, outerSize, 0, innerSize, functor, cost);
}
};
template <typename Kernel>
struct dense_assignment_loop_with_device<Kernel, CoreThreadPoolDevice, DefaultTraversal, InnerUnrolling> {
using DstXprType = typename Kernel::DstEvaluatorType::XprType;
static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost, InnerSize = DstXprType::InnerSizeAtCompileTime;
struct AssignmentFunctor : public Kernel {
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel& kernel) : Kernel(kernel) {}
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer) {
copy_using_evaluator_DefaultTraversal_InnerUnrolling<Kernel, 0, InnerSize>::run(*this, outer);
}
};
static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) {
const Index outerSize = kernel.outerSize();
AssignmentFunctor functor(kernel);
constexpr float cost = static_cast<float>(XprEvaluationCost) * static_cast<float>(InnerSize);
device.template parallelFor<AssignmentFunctor, 1>(0, outerSize, functor, cost);
}
};
template <typename Kernel>
struct dense_assignment_loop_with_device<Kernel, CoreThreadPoolDevice, InnerVectorizedTraversal, NoUnrolling> {
using PacketType = typename Kernel::PacketType;
static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost, PacketSize = unpacket_traits<PacketType>::size,
SrcAlignment = Kernel::AssignmentTraits::SrcAlignment,
DstAlignment = Kernel::AssignmentTraits::DstAlignment;
struct AssignmentFunctor : public Kernel {
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel& kernel) : Kernel(kernel) {}
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer, Index inner) {
this->template assignPacketByOuterInner<Unaligned, Unaligned, PacketType>(outer, inner);
}
};
static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) {
const Index innerSize = kernel.innerSize();
const Index outerSize = kernel.outerSize();
const float cost = static_cast<float>(XprEvaluationCost) * static_cast<float>(innerSize);
AssignmentFunctor functor(kernel);
device.template parallelFor<AssignmentFunctor, PacketSize>(0, outerSize, 0, innerSize, functor, cost);
}
};
template <typename Kernel>
struct dense_assignment_loop_with_device<Kernel, CoreThreadPoolDevice, InnerVectorizedTraversal, InnerUnrolling> {
using PacketType = typename Kernel::PacketType;
using DstXprType = typename Kernel::DstEvaluatorType::XprType;
static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost, PacketSize = unpacket_traits<PacketType>::size,
SrcAlignment = Kernel::AssignmentTraits::SrcAlignment,
DstAlignment = Kernel::AssignmentTraits::DstAlignment,
InnerSize = DstXprType::InnerSizeAtCompileTime;
struct AssignmentFunctor : public Kernel {
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel& kernel) : Kernel(kernel) {}
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer) {
copy_using_evaluator_innervec_InnerUnrolling<Kernel, 0, InnerSize, SrcAlignment, DstAlignment>::run(*this, outer);
}
};
static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) {
const Index outerSize = kernel.outerSize();
constexpr float cost = static_cast<float>(XprEvaluationCost) * static_cast<float>(InnerSize);
AssignmentFunctor functor(kernel);
device.template parallelFor<AssignmentFunctor, PacketSize>(0, outerSize, functor, cost);
}
};
template <typename Kernel>
struct dense_assignment_loop_with_device<Kernel, CoreThreadPoolDevice, SliceVectorizedTraversal, NoUnrolling> {
using Scalar = typename Kernel::Scalar;
using PacketType = typename Kernel::PacketType;
static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost, PacketSize = unpacket_traits<PacketType>::size;
struct PacketAssignmentFunctor : public Kernel {
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE PacketAssignmentFunctor(Kernel& kernel) : Kernel(kernel) {}
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer, Index inner) {
this->template assignPacketByOuterInner<Unaligned, Unaligned, PacketType>(outer, inner);
}
};
struct ScalarAssignmentFunctor : public Kernel {
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE ScalarAssignmentFunctor(Kernel& kernel) : Kernel(kernel) {}
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer) {
const Index innerSize = this->innerSize();
const Index packetAccessSize = numext::round_down(innerSize, PacketSize);
for (Index inner = packetAccessSize; inner < innerSize; inner++) this->assignCoeffByOuterInner(outer, inner);
}
};
static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) {
const Index outerSize = kernel.outerSize();
const Index innerSize = kernel.innerSize();
const Index packetAccessSize = numext::round_down(innerSize, PacketSize);
constexpr float packetCost = static_cast<float>(XprEvaluationCost);
const float scalarCost = static_cast<float>(XprEvaluationCost) * static_cast<float>(innerSize - packetAccessSize);
PacketAssignmentFunctor packetFunctor(kernel);
ScalarAssignmentFunctor scalarFunctor(kernel);
device.template parallelFor<PacketAssignmentFunctor, PacketSize>(0, outerSize, 0, packetAccessSize, packetFunctor,
packetCost);
device.template parallelFor<ScalarAssignmentFunctor, 1>(0, outerSize, scalarFunctor, scalarCost);
};
};
template <typename Kernel>
struct dense_assignment_loop_with_device<Kernel, CoreThreadPoolDevice, LinearTraversal, NoUnrolling> {
static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost;
struct AssignmentFunctor : public Kernel {
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel& kernel) : Kernel(kernel) {}
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index index) { this->assignCoeff(index); }
};
static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) {
const Index size = kernel.size();
constexpr float cost = static_cast<float>(XprEvaluationCost);
AssignmentFunctor functor(kernel);
device.template parallelFor<AssignmentFunctor, 1>(0, size, functor, cost);
}
};
template <typename Kernel>
struct dense_assignment_loop_with_device<Kernel, CoreThreadPoolDevice, LinearVectorizedTraversal, NoUnrolling> {
using Scalar = typename Kernel::Scalar;
using PacketType = typename Kernel::PacketType;
static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost,
RequestedAlignment = Kernel::AssignmentTraits::LinearRequiredAlignment,
PacketSize = unpacket_traits<PacketType>::size,
DstIsAligned = Kernel::AssignmentTraits::DstAlignment >= RequestedAlignment,
DstAlignment = packet_traits<Scalar>::AlignedOnScalar ? RequestedAlignment
: Kernel::AssignmentTraits::DstAlignment,
SrcAlignment = Kernel::AssignmentTraits::JointAlignment;
struct AssignmentFunctor : public Kernel {
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel& kernel) : Kernel(kernel) {}
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index index) {
this->template assignPacket<DstAlignment, SrcAlignment, PacketType>(index);
}
};
static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) {
const Index size = kernel.size();
const Index alignedStart =
DstIsAligned ? 0 : internal::first_aligned<RequestedAlignment>(kernel.dstDataPtr(), size);
const Index alignedEnd = alignedStart + numext::round_down(size - alignedStart, PacketSize);
unaligned_dense_assignment_loop<DstIsAligned != 0>::run(kernel, 0, alignedStart);
constexpr float cost = static_cast<float>(XprEvaluationCost);
AssignmentFunctor functor(kernel);
device.template parallelFor<AssignmentFunctor, PacketSize>(alignedStart, alignedEnd, functor, cost);
unaligned_dense_assignment_loop<>::run(kernel, alignedEnd, size);
}
};
} // namespace internal
} // namespace Eigen
#endif // EIGEN_CORE_THREAD_POOL_DEVICE_H