// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#ifndef EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
#define EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H

namespace Eigen {
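
// Example usage (illustrative sketch; Barrier is the synchronization helper
// shipped with Eigen's ThreadPool module):
//
//   Eigen::ThreadPool pool(4 /*num_threads*/);
//   Eigen::Barrier barrier(1);
//   pool.Schedule([&]() { barrier.Notify(); });
//   barrier.Wait();  // block until the scheduled task has run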
template <typename Environment>
class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
 public:
  typedef typename Environment::Task Task;
  typedef RunQueue<Task, 1024> Queue;

  ThreadPoolTempl(int num_threads, Environment env = Environment())
      : ThreadPoolTempl(num_threads, true, env) {}

  ThreadPoolTempl(int num_threads, bool allow_spinning,
                  Environment env = Environment())
      : env_(env),
        num_threads_(num_threads),
        allow_spinning_(allow_spinning),
        thread_data_(num_threads),
        all_coprimes_(num_threads),
        waiters_(num_threads),
        global_steal_partition_(EncodePartition(0, num_threads_)),
        blocked_(0),
        spinning_(0),
        done_(false),
        cancelled_(false),
        ec_(waiters_) {
    waiters_.resize(num_threads_);
    // Calculate coprimes of all numbers [1, num_threads].
    // Coprimes are used for random walks over all threads in Steal
    // and NonEmptyQueueIndex. Iteration is based on the fact that if we take
    // a random starting thread index t and calculate num_threads - 1 subsequent
    // indices as (t + coprime) % num_threads, we will cover all threads without
    // repetitions (effectively getting a pseudo-random permutation of thread
    // indices).
    eigen_plain_assert(num_threads_ < kMaxThreads);
    for (int i = 1; i <= num_threads_; ++i) {
      all_coprimes_.emplace_back(i);
      ComputeCoprimes(i, &all_coprimes_.back());
    }
#ifndef EIGEN_THREAD_LOCAL
    init_barrier_.reset(new Barrier(num_threads_));
#endif
    thread_data_.resize(num_threads_);
    for (int i = 0; i < num_threads_; i++) {
      SetStealPartition(i, EncodePartition(0, num_threads_));
      thread_data_[i].thread.reset(
          env_.CreateThread([this, i]() { WorkerLoop(i); }));
    }
#ifndef EIGEN_THREAD_LOCAL
    // Wait for workers to initialize per_thread_map_. Otherwise we might race
    // with them in Schedule or CurrentThreadId.
    init_barrier_->Wait();
#endif
  }
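
  // Example (illustrative): a pool that never spins, trading latency for CPU:
  //
  //   Eigen::ThreadPool pool(8, /*allow_spinning=*/false);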

  ~ThreadPoolTempl() {
    done_ = true;

    // Now if all threads block without work, they will start exiting.
    // But note that threads can continue to work for arbitrarily long,
    // block, submit new work, unblock, and otherwise live a full life.
    if (!cancelled_) {
      ec_.Notify(true);
    } else {
      // Since we were cancelled, there might be entries in the queues.
      // Empty them to prevent their destructor from asserting.
      for (size_t i = 0; i < thread_data_.size(); i++) {
        thread_data_[i].queue.Flush();
      }
    }
    // Join threads explicitly (by destroying them) to avoid destruction order
    // issues within this class.
    for (size_t i = 0; i < thread_data_.size(); ++i)
      thread_data_[i].thread.reset();
  }

  void SetStealPartitions(
      const std::vector<std::pair<unsigned, unsigned>>& partitions) {
    eigen_plain_assert(partitions.size() ==
                       static_cast<std::size_t>(num_threads_));

    // Pass this information to each thread queue.
    for (int i = 0; i < num_threads_; i++) {
      const auto& pair = partitions[i];
      unsigned start = pair.first, end = pair.second;
      AssertBounds(start, end);
      unsigned val = EncodePartition(start, end);
      SetStealPartition(i, val);
    }
  }
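
  // Example (illustrative): split a 4-thread pool into two steal domains,
  // workers {0, 1} and workers {2, 3}:
  //
  //   pool.SetStealPartitions({{0, 2}, {0, 2}, {2, 4}, {2, 4}});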

  void Schedule(std::function<void()> fn) EIGEN_OVERRIDE {
    ScheduleWithHint(std::move(fn), 0, num_threads_);
  }

  void ScheduleWithHint(std::function<void()> fn, int start,
                        int limit) override {
    Task t = env_.CreateTask(std::move(fn));
    PerThread* pt = GetPerThread();
    if (pt->pool == this) {
      // Worker thread of this pool, push onto the thread's queue.
      Queue& q = thread_data_[pt->thread_id].queue;
      t = q.PushFront(std::move(t));
    } else {
      // A free-standing thread (or a worker of another pool), push onto a
      // random queue.
      eigen_plain_assert(start < limit);
      eigen_plain_assert(limit <= num_threads_);
      int num_queues = limit - start;
      int rnd = Rand(&pt->rand) % num_queues;
      eigen_plain_assert(start + rnd < limit);
      Queue& q = thread_data_[start + rnd].queue;
      t = q.PushBack(std::move(t));
    }
    // Note: below we touch `this` after making the task available to worker
    // threads. Strictly speaking, this can lead to a racy use-after-free.
    // Consider that Schedule is called from a thread that is neither the main
    // thread nor a worker thread of this pool. Then, execution of the task
    // directly or indirectly completes the overall computation, which in turn
    // leads to the destruction of `this`. We expect such a scenario to be
    // prevented by the program, that is, `this` must be kept alive while any
    // thread can potentially be in Schedule.
    if (!t.f) {
      ec_.Notify(false);
    } else {
      env_.ExecuteTask(t);  // Push failed, execute directly.
    }
  }
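
  // Example (illustrative): from a non-worker thread, bias placement towards
  // the queues of workers 0 and 1:
  //
  //   pool.ScheduleWithHint([] { /* work */ }, /*start=*/0, /*limit=*/2);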

  void Cancel() EIGEN_OVERRIDE {
    cancelled_ = true;
    done_ = true;

    // Let each thread know it's been cancelled.
#ifdef EIGEN_THREAD_ENV_SUPPORTS_CANCELLATION
    for (size_t i = 0; i < thread_data_.size(); i++) {
      thread_data_[i].thread->OnCancel();
    }
#endif

    // Wake up the threads without work to let them exit on their own.
    ec_.Notify(true);
  }

  int NumThreads() const EIGEN_FINAL { return num_threads_; }

  int CurrentThreadId() const EIGEN_FINAL {
    const PerThread* pt = const_cast<ThreadPoolTempl*>(this)->GetPerThread();
    if (pt->pool == this) {
      return pt->thread_id;
    } else {
      return -1;
    }
  }
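
  // Example (illustrative): from inside a scheduled task, CurrentThreadId()
  // returns the worker index; from any other thread it returns -1:
  //
  //   pool.Schedule([&]() { int id = pool.CurrentThreadId(); /* 0..N-1 */ });
  //   int outside = pool.CurrentThreadId();  // -1 on a non-worker thread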

 private:
  // Encode both the start and limit of a thread's steal partition into a
  // single std::atomic<unsigned> per thread.
  // We expect num_threads_ < 65536, so start and limit fit in 16 bits each.
  // Exposed publicly as static functions so that external callers can reuse
  // this encode/decode logic for maintaining their own thread-safe copies of
  // scheduling and steal domain(s).
  static const int kMaxPartitionBits = 16;
  static const int kMaxThreads = 1 << kMaxPartitionBits;

  inline unsigned EncodePartition(unsigned start, unsigned limit) {
    return (start << kMaxPartitionBits) | limit;
  }

  inline void DecodePartition(unsigned val, unsigned* start, unsigned* limit) {
    *limit = val & (kMaxThreads - 1);
    val >>= kMaxPartitionBits;
    *start = val;
  }
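
  // Example (illustrative): with kMaxPartitionBits == 16,
  // EncodePartition(2, 6) == (2 << 16) | 6 == 0x00020006, and decoding
  // 0x00020006 recovers start == 2, limit == 6.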

  void AssertBounds(int start, int end) {
    eigen_plain_assert(start >= 0);
    eigen_plain_assert(start < end);  // non-zero sized partition
    eigen_plain_assert(end <= num_threads_);
  }

  inline void SetStealPartition(size_t i, unsigned val) {
    thread_data_[i].steal_partition.store(val, std::memory_order_relaxed);
  }

  inline unsigned GetStealPartition(int i) {
    return thread_data_[i].steal_partition.load(std::memory_order_relaxed);
  }

  void ComputeCoprimes(int N, MaxSizeVector<unsigned>* coprimes) {
    for (int i = 1; i <= N; i++) {
      unsigned a = i;
      unsigned b = N;
      // If GCD(a, b) == 1, then a and b are coprime.
      while (b != 0) {
        unsigned tmp = a;
        a = b;
        b = tmp % b;
      }
      if (a == 1) {
        coprimes->push_back(i);
      }
    }
  }
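
  // Example (illustrative): ComputeCoprimes(10, &c) yields {1, 3, 7, 9}. With
  // inc == 3, the walk (t + 3) % 10 from any start t visits all 10 queues
  // exactly once, since gcd(3, 10) == 1.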

  typedef typename Environment::EnvThread Thread;

  struct PerThread {
    constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) {}
    ThreadPoolTempl* pool;  // Parent pool, or null for normal threads.
    uint64_t rand;          // Random generator state.
    int thread_id;          // Worker thread index in pool.
#ifndef EIGEN_THREAD_LOCAL
    // Prevent false sharing.
    char pad_[128];
#endif
  };

  struct ThreadData {
    constexpr ThreadData() : thread(), steal_partition(0), queue() {}
    std::unique_ptr<Thread> thread;
    std::atomic<unsigned> steal_partition;
    Queue queue;
  };

  Environment env_;
  const int num_threads_;
  const bool allow_spinning_;
  MaxSizeVector<ThreadData> thread_data_;
  MaxSizeVector<MaxSizeVector<unsigned>> all_coprimes_;
  MaxSizeVector<EventCount::Waiter> waiters_;
  unsigned global_steal_partition_;
  std::atomic<unsigned> blocked_;
  std::atomic<bool> spinning_;
  std::atomic<bool> done_;
  std::atomic<bool> cancelled_;
  EventCount ec_;
#ifndef EIGEN_THREAD_LOCAL
  std::unique_ptr<Barrier> init_barrier_;
  std::mutex per_thread_map_mutex_;  // Protects per_thread_map_.
  std::unordered_map<uint64_t, std::unique_ptr<PerThread>> per_thread_map_;
#endif

  // Main worker thread loop.
  void WorkerLoop(int thread_id) {
#ifndef EIGEN_THREAD_LOCAL
    std::unique_ptr<PerThread> new_pt(new PerThread());
    per_thread_map_mutex_.lock();
    bool insertOK =
        per_thread_map_.emplace(GlobalThreadIdHash(), std::move(new_pt)).second;
    eigen_plain_assert(insertOK);
    EIGEN_UNUSED_VARIABLE(insertOK);
    per_thread_map_mutex_.unlock();
    init_barrier_->Notify();
    init_barrier_->Wait();
#endif
    PerThread* pt = GetPerThread();
    pt->pool = this;
    pt->rand = GlobalThreadIdHash();
    pt->thread_id = thread_id;
    Queue& q = thread_data_[thread_id].queue;
    EventCount::Waiter* waiter = &waiters_[thread_id];
    // TODO(dvyukov,rmlarsen): The time spent in NonEmptyQueueIndex() is
    // proportional to num_threads_ and we assume that new work is scheduled at
    // a constant rate, so we set spin_count to 5000 / num_threads_. The
    // constant was picked based on a fair dice roll, tune it.
    const int spin_count =
        allow_spinning_ && num_threads_ > 0 ? 5000 / num_threads_ : 0;
    if (num_threads_ == 1) {
      // For num_threads_ == 1 there is no point in going through the expensive
      // steal loop. Moreover, since NonEmptyQueueIndex() calls PopBack() on
      // the victim queues, it might reverse the order in which ops are
      // executed compared to the order in which they are scheduled, which
      // tends to be counter-productive for the types of I/O workloads that
      // single-thread pools tend to be used for.
      while (!cancelled_) {
        Task t = q.PopFront();
        for (int i = 0; i < spin_count && !t.f; i++) {
          if (!cancelled_.load(std::memory_order_relaxed)) {
            t = q.PopFront();
          }
        }
        if (!t.f) {
          if (!WaitForWork(waiter, &t)) {
            return;
          }
        }
        if (t.f) {
          env_.ExecuteTask(t);
        }
      }
    } else {
      while (!cancelled_) {
        Task t = q.PopFront();
        if (!t.f) {
          t = LocalSteal();
          if (!t.f) {
            t = GlobalSteal();
            if (!t.f) {
              // Leave one thread spinning. This reduces latency.
              if (allow_spinning_ && !spinning_ && !spinning_.exchange(true)) {
                for (int i = 0; i < spin_count && !t.f; i++) {
                  if (!cancelled_.load(std::memory_order_relaxed)) {
                    t = GlobalSteal();
                  } else {
                    return;
                  }
                }
                spinning_ = false;
              }
              if (!t.f) {
                if (!WaitForWork(waiter, &t)) {
                  return;
                }
              }
            }
          }
        }
        if (t.f) {
          env_.ExecuteTask(t);
        }
      }
    }
  }

  // Steal tries to steal work from other worker threads in the range [start,
  // limit) in a best-effort manner.
  Task Steal(unsigned start, unsigned limit) {
    PerThread* pt = GetPerThread();
    const size_t size = limit - start;
    unsigned r = Rand(&pt->rand);
    // Reduce r into the [0, size) range; this uses the trick from
    // https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
    eigen_plain_assert(all_coprimes_[size - 1].size() < (1 << 30));
    unsigned victim = ((uint64_t)r * (uint64_t)size) >> 32;
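    // Example (illustrative): r is uniform in [0, 2^32), so the 64-bit product
    // shifted right by 32 lands (near-)uniformly in [0, size) without a
    // modulo; e.g. for size == 4, r == 0x80000000 maps to victim == 2.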
    unsigned index =
        ((uint64_t)all_coprimes_[size - 1].size() * (uint64_t)r) >> 32;
    unsigned inc = all_coprimes_[size - 1][index];

    for (unsigned i = 0; i < size; i++) {
      eigen_plain_assert(start + victim < limit);
      Task t = thread_data_[start + victim].queue.PopBack();
      if (t.f) {
        return t;
      }
      victim += inc;
      if (victim >= size) {
        victim -= size;
      }
    }
    return Task();
  }

  // Steals work from threads within the calling thread's partition.
  Task LocalSteal() {
    PerThread* pt = GetPerThread();
    unsigned partition = GetStealPartition(pt->thread_id);
    // If the thread's steal partition is the same as the global partition,
    // there is no need to go through the steal loop twice.
    if (global_steal_partition_ == partition) return Task();
    unsigned start, limit;
    DecodePartition(partition, &start, &limit);
    AssertBounds(start, limit);

    return Steal(start, limit);
  }

  // Steals work from any other thread in the pool.
  Task GlobalSteal() {
    return Steal(0, num_threads_);
  }

  // WaitForWork blocks until new work is available (returns true), or until it
  // is time to exit (returns false). It can optionally return a task to
  // execute in t (in that case t.f != nullptr on return).
  bool WaitForWork(EventCount::Waiter* waiter, Task* t) {
    eigen_plain_assert(!t->f);
    // We already did a best-effort emptiness check in Steal, so prepare for
    // blocking.
    ec_.Prewait();
    // Now do a reliable emptiness check.
    int victim = NonEmptyQueueIndex();
    if (victim != -1) {
      ec_.CancelWait();
      if (cancelled_) {
        return false;
      } else {
        *t = thread_data_[victim].queue.PopBack();
        return true;
      }
    }
    // The number of blocked threads is used as the termination condition.
    // If we are shutting down and all worker threads are blocked without
    // work, then we are done.
    blocked_++;
    // TODO is blocked_ required to be unsigned?
    if (done_ && blocked_ == static_cast<unsigned>(num_threads_)) {
      ec_.CancelWait();
      // Almost done, but we need to re-check the queues.
      // Consider that all queues are empty and all worker threads are
      // preempted right after incrementing blocked_ above. Now a free-standing
      // thread submits work and calls the destructor (which sets done_). If we
      // don't re-check the queues, we will exit leaving the work unexecuted.
      if (NonEmptyQueueIndex() != -1) {
        // Note: we must not pop from the queues before we decrement blocked_,
        // otherwise the following scenario is possible. Consider that instead
        // of checking for emptiness we popped the only element from the
        // queues. Now other worker threads can start exiting, which is bad if
        // the work item submits other work. So we just check emptiness here,
        // which ensures that all worker threads exit at the same time.
        blocked_--;
        return true;
      }
      // Reached a stable termination state.
      ec_.Notify(true);
      return false;
    }
    ec_.CommitWait(waiter);
    blocked_--;
    return true;
  }

  int NonEmptyQueueIndex() {
    PerThread* pt = GetPerThread();
    // We intentionally design NonEmptyQueueIndex to look for work in any queue
    // in the pool so threads don't block in WaitForWork() forever when all
    // threads in their partition go to sleep. Steal is still local.
    const size_t size = thread_data_.size();
    unsigned r = Rand(&pt->rand);
    unsigned inc = all_coprimes_[size - 1][r % all_coprimes_[size - 1].size()];
    unsigned victim = r % size;
    for (unsigned i = 0; i < size; i++) {
      if (!thread_data_[victim].queue.Empty()) {
        return victim;
      }
      victim += inc;
      if (victim >= size) {
        victim -= size;
      }
    }
    return -1;
  }

  static EIGEN_STRONG_INLINE uint64_t GlobalThreadIdHash() {
    return std::hash<std::thread::id>()(std::this_thread::get_id());
  }

  EIGEN_STRONG_INLINE PerThread* GetPerThread() {
#ifndef EIGEN_THREAD_LOCAL
    static PerThread dummy;
    auto it = per_thread_map_.find(GlobalThreadIdHash());
    if (it == per_thread_map_.end()) {
      return &dummy;
    } else {
      return it->second.get();
    }
#else
    EIGEN_THREAD_LOCAL PerThread per_thread_;
    PerThread* pt = &per_thread_;
    return pt;
#endif
  }
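
  // Example (illustrative): Rand below is a PCG-XSH-RS style generator; the
  // per-thread state is seeded with GlobalThreadIdHash() in WorkerLoop, so
  // each worker draws an independent sequence:
  //
  //   uint64_t state = GlobalThreadIdHash();
  //   unsigned r = Rand(&state);  // roughly uniform in [0, 2^32)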
  static EIGEN_STRONG_INLINE unsigned Rand(uint64_t* state) {
    uint64_t current = *state;
    // Update the internal state.
    *state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
    // Generate the random output (using the PCG-XSH-RS scheme).
    return static_cast<unsigned>((current ^ (current >> 22)) >>
                                 (22 + (current >> 61)));
  }
};

typedef ThreadPoolTempl<StlThreadEnvironment> ThreadPool;

}  // namespace Eigen

#endif  // EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H