Mercury Currency Engine
threadpool.hpp
//SPDX-License-Identifier: Apache-2.0
//Author: Blayne Dennis

#ifndef __MERCURY_COROUTINE_ENGINE_THREADPOOL__
#define __MERCURY_COROUTINE_ENGINE_THREADPOOL__

// c
#include <limits.h>

// c++
#include <vector>
#include <mutex>
#include <thread>
#include <iostream>
#include <algorithm>
#include <memory>

// local
#include "function_utility.hpp"
#include "timer.hpp"
#include "scheduler.hpp"

namespace mce {

struct threadpool;

namespace detail {

mce::threadpool*& tl_this_threadpool();

}

/// an object which manages a set of worker threads, each continuously executing a scheduler
struct threadpool : public lifecycle, protected lifecycle::implementation
{
    /// construct an allocated threadpool with a count of worker threads
    static inline std::shared_ptr<threadpool> make(size_t worker_count = 0)
    {
        threadpool* tpp = new threadpool(worker_count);
        std::shared_ptr<threadpool> tp(tpp);
        tpp->self_wptr_ = tp;
        tp->init_();
        return tp;
    }

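    /*
     Usage sketch (illustrative, not part of the original header): construct a
     threadpool, schedule a coroutine on its least busy worker, then shut it
     down through the inherited lifecycle API.

         auto tp = mce::threadpool::make(4); // 4 worker threads
         tp->worker().schedule([]{ std::cout << "hello from a worker" << std::endl; });
         tp->halt(); // halt(), join() and delete all workers
    */
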
    /// halt(), join() and delete all workers
    virtual ~threadpool() { }

    /// return the count of worker threads
    inline size_t size() const { return workers_schedulers_.size(); }

    /// access the scheduler for a worker at a given index
    inline scheduler& worker(size_t idx) const
    {
        return *(workers_schedulers_[idx]);
    }

    /// return the least busy worker scheduler at time of call
    inline scheduler& worker()
    {
        const size_t start_idx = current_scheduler_idx_();
        auto sz = workers_schedulers_.size();
        size_t i = start_idx;
        size_t end = sz; // end is our 'break out of loop' index
        auto least_weight = workers_schedulers_[i]->measure();
        scheduler* ret = workers_schedulers_[i];
        bool found_empty = false;

        ++i; // start comparisons at the index after start_idx

        auto compare = [&]
        {
            for(; i<end; ++i)
            {
                // acquire the current scheduling load of a scheduler
                auto cur_weight = workers_schedulers_[i]->measure();

                if(cur_weight)
                {
                    if(cur_weight < least_weight)
                    {
                        // update return value
                        least_weight = cur_weight;
                        ret = workers_schedulers_[i];
                    }
                }
                else
                {
                    // end iteration early if we find an empty scheduler
                    found_empty = true;
                    ret = workers_schedulers_[i];
                    break;
                }
            }
        };

        compare();

        // if we didn't find a 0 weight scheduler and our start_idx > 0
        if(!found_empty && start_idx)
        {
            i = 0; // rotate back around to 0
            end = start_idx; // our new end is our original beginning
            compare();
        }

        return *ret;
    }

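    /*
     Selection sketch: suppose three workers measure() to weights {4, 0, 2} and
     the rotating start index is 0. The scan stops at index 1 immediately,
     because an empty scheduler can accept work without competing with already
     scheduled coroutines. With weights {4, 3, 2} no worker is empty, so the
     full scan returns the worker at index 2, the least burdened one.
    */
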
    /// return a vector containing a shared_ptr to every worker scheduler
    inline std::vector<std::shared_ptr<scheduler>> workers()
    {
        std::vector<std::shared_ptr<scheduler>> ret(workers_schedulers_.size());

        std::transform(
            workers_schedulers_.begin(),
            workers_schedulers_.end(),
            ret.begin(),
            [](scheduler* sch){ return (std::shared_ptr<scheduler>)(*sch); }
        );

        return ret;
    }

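    /*
     Usage sketch (illustrative; `some_task` is a hypothetical callable, and
     the lifetime extension assumes the shared_ptr conversion shares ownership
     of each scheduler): the snapshot remains valid even if the threadpool is
     halted while it is held.

         auto snapshot = tp->workers();
         for(auto& sch : snapshot) { sch->schedule(some_task); }
    */
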
    /// return a shared_ptr to this threadpool
    inline operator std::shared_ptr<threadpool>() { return self_wptr_.lock(); }

protected:
    // return the workers' state
    inline lifecycle::state get_state_impl()
    {
        std::lock_guard<mce::spinlock> lk(lk_);

        auto lf = (lifecycle::implementation*)(workers_schedulers_.front());

        // state should be the same on all workers
        return lf->get_state_impl();
    }

    // suspend all workers, returning true if suspend() == true for every worker, else false
    inline bool suspend_impl()
    {
        bool ret = true;

        std::lock_guard<mce::spinlock> lk(lk_);

        for(auto& sch : workers_schedulers_)
        {
            // evaluate suspend_impl() first so that every worker is suspended
            // even if an earlier worker failed to suspend
            ret = ((lifecycle::implementation*)sch)->suspend_impl() && ret;
        }

        return ret;
    }

    // resume all workers
    inline void resume_impl()
    {
        std::lock_guard<mce::spinlock> lk(lk_);

        for(auto& sch : workers_schedulers_)
        {
            ((lifecycle::implementation*)sch)->resume_impl();
        }
    }

    // halt all workers and join their threads
    inline void halt_impl()
    {
        std::lock_guard<mce::spinlock> lk(lk_);

        for(auto& worker : workers_memory_)
        {
            auto lf = (lifecycle::implementation*)(worker->sch.get());

            if(lf->get_state_impl() != lifecycle::state::halted)
            {
                lf->halt_impl();
                worker->thd.join();
            }
        }
    }

private:
    // memory for a worker's scheduler and the thread which runs it
    struct worker_thread
    {
        std::shared_ptr<scheduler> sch;
        std::thread thd;

        worker_thread(std::shared_ptr<threadpool> tp) :
            sch(scheduler::make(tp.get())),
            thd([tp,this]() mutable
            {
                // publish the owning threadpool to this thread's thread_local
                // pointer, restoring the previous value when run() returns
                auto& tl_tp = detail::tl_this_threadpool();
                auto parent_tp = tl_tp;
                tl_tp = tp.get();

                try { while(this->sch->run()){ } }
                catch(...)
                {
                    tl_tp = parent_tp;
                    std::rethrow_exception(std::current_exception());
                }

                tl_tp = parent_tp;
            })
        { }

        worker_thread() = delete;
        worker_thread(worker_thread&&) = delete;

        ~worker_thread()
        {
            auto lf = (lifecycle::implementation*)(sch.get());

            if(lf->get_state_impl() != lifecycle::state::halted)
            {
                lf->halt_impl();
            }

            if(thd.joinable()) { thd.join(); }
        }
    };

    threadpool(size_t worker_count) :
        lifecycle(this),
        workers_memory_(
            [=]() mutable -> size_t
            {
                if(worker_count == 0)
                {
                    worker_count = std::thread::hardware_concurrency();

                    // enforce a minimum of 1 worker thread
                    if(worker_count == 0) { worker_count = 1; }
                }

                return worker_count;
            }()),
        workers_schedulers_(workers_memory_.size())
    { }

    // separate worker init from the constructor so the self shared_ptr can be set up
    inline void init_()
    {
        auto self = self_wptr_.lock();
        auto it = workers_schedulers_.begin();

        // initialize worker threads; no synchronization is needed because no
        // operations are scheduled on the schedulers until threadpool::make()
        // returns
        for(auto& w : workers_memory_)
        {
            w = std::unique_ptr<worker_thread>(new worker_thread(self));
            *it = w->sch.get();
            ++it;
        }
    }

    // return the index of the worker we should measure() first
    inline size_t current_scheduler_idx_()
    {
        std::lock_guard<mce::spinlock> lk(lk_);

        auto ret = current_scheduler_idx_val_;

        // rotate the current scheduler index to limit lock contention
        if((current_scheduler_idx_val_+1) < workers_memory_.size())
        {
            ++current_scheduler_idx_val_;
        }
        else
        {
            current_scheduler_idx_val_ = 0;
        }

        return ret;
    }

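    /*
     Rotation sketch: with 3 workers, successive calls return start indexes
     0, 1, 2, 0, 1, ... so that worker() begins its measure() scan at a
     different scheduler each time, spreading lock acquisitions evenly across
     the pool.
    */
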
    // as a general rule, anything relying on access to lk_ should not block
    // on anything else for the duration of the lock
    mce::spinlock lk_;

    // avoid circular shared memory structures through a weak_ptr
    std::weak_ptr<threadpool> self_wptr_;

    // This vector never changes post initialization until the threadpool is
    // destroyed. The fact that this vector doesn't change which worker is
    // stored at what index is important to ensure that calls to
    // `worker(size_t)` are consistent.
    //
    // Because this vector never changes until the threadpool is destroyed, it
    // can be read without a lock. However, a lock may be required for
    // synchronized calls to scheduler operations.
    std::vector<std::unique_ptr<worker_thread>> workers_memory_;

    // A vector of schedulers. This vector is not changed between
    // initialization and destruction; it can be read without a lock.
    std::vector<scheduler*> workers_schedulers_;

    // value which wraps back around to 0 when incremented past the worker
    // count, used to limit lock contention by spreading measure() calls
    // equally among all schedulers
    size_t current_scheduler_idx_val_ = 0;
};

/// return true if the calling context is running in a threadpool
inline bool in_threadpool()
{
    return detail::tl_this_threadpool();
}

/// return a reference to the threadpool the calling code is executing in
inline threadpool& this_threadpool()
{
    return *detail::tl_this_threadpool();
}
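
/*
 Usage sketch (illustrative; `task` is a hypothetical callable):

     if(mce::in_threadpool())
     {
         // schedule more work on the least busy worker of the current pool
         mce::this_threadpool().worker().schedule(task);
     }
*/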

/// return the default threadpool
threadpool& default_threadpool();

/// return true if default_threadpool() can be safely called, else false
bool default_threadpool_enabled();

/// return the balance ratio, set by compiler define: MCEBALANCERATIO
double balance_ratio();

namespace detail {

// return a consistently selected, arbitrary scheduler from the default threadpool
scheduler& default_threadpool_scheduler();

inline scheduler& concurrent_algorithm()
{
    return in_scheduler()
        ? this_scheduler()
        : default_threadpool_scheduler();
}

inline scheduler& parallel_algorithm()
{
    return in_threadpool()
        ? this_threadpool().worker()
        : default_threadpool().worker();
}

inline scheduler& balance_algorithm()
{
    if(in_threadpool())
    {
        scheduler* least_sch;

        // return true if the workload is imbalanced, else false
        auto imbalanced = [&]() -> bool
        {
            auto& tp = this_threadpool();
            size_t sz = tp.size();

            // workload of the least burdened scheduler found so far
            scheduler::measurement least;

            // workload of the most burdened scheduler found so far
            scheduler::measurement most;

            {
                auto& sch = tp.worker(0);

                least_sch = &(sch);
                least = sch.measure();
                most = least;
            }

            // begin on index 1, we've already taken 0
            for(size_t i=1; i<sz; ++i)
            {
                auto& sch = tp.worker(i);
                scheduler::measurement weight = sch.measure();

                if(weight < least)
                {
                    least_sch = &(sch);
                    least = weight;
                }
                else if(weight > most) { most = weight; }
            }

            // returns true if the ratio between the workloads is greater than
            // or equal to balance_ratio()
            auto past_limit = [](size_t lhs, size_t rhs) -> bool
            {
                // cast to long double for a floating point division
                return (static_cast<long double>(lhs) / rhs) >= balance_ratio();
            };

            return past_limit(most.scheduled(), least.scheduled());
        };

        return imbalanced()
            ? *least_sch // select the least burdened scheduler
            // select the current thread's scheduler
            : this_scheduler();
    }
    else { return default_threadpool().worker(); }
}
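
/*
 Worked example (the 2.0 threshold below is hypothetical; the real value comes
 from the MCEBALANCERATIO compiler define): if balance_ratio() == 2.0 and the
 most burdened worker has 8 scheduled coroutines while the least burdened has
 3, then 8.0 / 3 is roughly 2.67 >= 2.0, so the least burdened scheduler is
 returned. At 8 vs 5 the ratio is 1.6, and the calling thread's own scheduler
 is kept.
*/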

}

/// Launch user function and optional arguments as a coroutine running on a scheduler.
template <typename... As>
void concurrent(As&&... args)
{
    detail::concurrent_algorithm().schedule(std::forward<As>(args)...);
}

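/*
 Usage sketch (illustrative): launch a coroutine on the caller's scheduler if
 one exists, falling back to the default threadpool otherwise. Trailing
 arguments are forwarded to the callable by schedule().

     mce::concurrent([](int x){ std::cout << x << std::endl; }, 42);
*/
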
/// Launch user function and optional arguments as a coroutine running on a scheduler.
template <typename... As>
void parallel(As&&... args)
{
    detail::parallel_algorithm().schedule(std::forward<As>(args)...);
}

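/*
 Usage sketch (illustrative; `task_a` and `task_b` are hypothetical callables):
 unlike concurrent(), parallel() always targets a threadpool worker, so these
 two coroutines may execute on different system threads.

     mce::parallel(task_a);
     mce::parallel(task_b);
*/
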
/// Launch user function and optional arguments as a coroutine running on a scheduler,
/// preferring the least burdened worker when the threadpool's workload is imbalanced.
template <typename... As>
void balance(As&&... args)
{
    detail::balance_algorithm().schedule(std::forward<As>(args)...);
}

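/*
 Usage sketch (illustrative; `make_task` and `task_count` are hypothetical):
 like parallel(), but work only migrates off the calling worker when the
 pool's workload ratio reaches balance_ratio(), making it suitable for
 launching many coroutines from inside a threadpool without constant
 rebalancing.

     for(size_t i = 0; i < task_count; ++i) { mce::balance(make_task(i)); }
*/
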
}
#endif