7 #ifndef __MERCURY_COROUTINE_ENGINE_THREADPOOL__
8 #define __MERCURY_COROUTINE_ENGINE_THREADPOOL__
43 static inline std::shared_ptr<threadpool>
make(
size_t worker_count = 0)
46 std::shared_ptr<threadpool> tp(tpp);
/// @brief Return the count of worker schedulers held by this threadpool.
/// @return `workers_schedulers_.size()`. The value is read directly, without
///         acquiring `lk_`.
56 inline size_t size()
const {
return workers_schedulers_.size(); }
61 return *(workers_schedulers_[idx]);
67 const size_t start_idx = current_scheduler_idx_();
68 auto sz = workers_schedulers_.size();
71 auto least_weight = workers_schedulers_[i]->measure();
73 bool found_empty =
false;
82 auto cur_weight = workers_schedulers_[i]->measure();
87 if(cur_weight < least_weight)
90 least_weight = cur_weight;
91 ret = workers_schedulers_[i];
97 ret = workers_schedulers_[i];
106 if(!found_empty && start_idx)
124 inline std::vector<std::shared_ptr<scheduler>>
workers()
126 std::vector<std::shared_ptr<scheduler>> ret(workers_schedulers_.size());
129 workers_schedulers_.begin(),
130 workers_schedulers_.end(),
132 [](
scheduler* sch){ return (std::shared_ptr<scheduler>)(*sch); }
/// @brief Convert this threadpool into a shared_ptr by locking `self_wptr_`.
/// @return the result of `self_wptr_.lock()`; per std::weak_ptr semantics this
///         is an empty shared_ptr if the weak pointer is expired or unset.
/// NOTE(review): the conversion is implicit (not `explicit`) — confirm callers
/// rely on that before tightening it.
139 inline operator std::shared_ptr<threadpool>() {
return self_wptr_.lock(); }
145 std::lock_guard<mce::spinlock> lk(lk_);
150 return lf->get_state_impl();
154 inline bool suspend_impl()
158 std::lock_guard<mce::spinlock> lk(lk_);
160 for(
auto& sch : workers_schedulers_)
162 ret = ret && ((lifecycle::implementation*)sch)->suspend_impl();
169 inline void resume_impl()
171 std::lock_guard<mce::spinlock> lk(lk_);
173 for(
auto& sch : workers_schedulers_)
175 ((lifecycle::implementation*)sch)->resume_impl();
180 inline void halt_impl()
182 std::lock_guard<mce::spinlock> lk(lk_);
184 for(
auto&
worker : workers_memory_)
186 auto lf = (lifecycle::implementation*)(
worker->sch.get());
188 if(lf->get_state_impl() != lifecycle::state::halted)
199 std::shared_ptr<scheduler> sch;
202 worker_thread(std::shared_ptr<threadpool> tp) :
203 sch(scheduler::
make(tp.get())),
204 thd([tp,this]() mutable
206 auto& tl_tp = detail::tl_this_threadpool();
207 auto parent_tp = tl_tp;
210 try {
while(this->sch->run()){ } }
214 std::rethrow_exception(std::current_exception());
/// Default construction is disabled: the only usable constructor is the one
/// taking a `std::shared_ptr<threadpool>`.
221 worker_thread() =
delete;
/// Move construction is disabled. The thread lambda captures `this` and calls
/// `this->sch->run()`, so relocating the object would presumably leave the
/// running thread with a dangling pointer.
222 worker_thread(worker_thread&&) =
delete;
226 auto lf = (lifecycle::implementation*)(sch.get());
228 if(lf->get_state_impl() != lifecycle::state::halted)
233 if(thd.joinable()) { thd.join(); }
237 threadpool(
size_t worker_count) :
240 [=]() mutable -> size_t
242 if(worker_count == 0)
244 worker_count = std::thread::hardware_concurrency();
247 if(worker_count == 0) { worker_count = 1; }
252 workers_schedulers_(workers_memory_.size())
258 auto self = self_wptr_.lock();
259 auto it = workers_schedulers_.begin();
264 for(
auto& w : workers_memory_)
266 w = std::unique_ptr<worker_thread>(
new worker_thread(
self));
273 inline size_t current_scheduler_idx_()
275 std::lock_guard<mce::spinlock> lk(lk_);
277 auto ret = current_scheduler_idx_val_;
280 if((current_scheduler_idx_val_+1) < workers_memory_.size())
282 ++current_scheduler_idx_val_;
286 current_scheduler_idx_val_ = 0;
297 std::weak_ptr<threadpool> self_wptr_;
307 std::vector<std::unique_ptr<worker_thread>> workers_memory_;
311 std::vector<scheduler*> workers_schedulers_;
316 size_t current_scheduler_idx_val_ = 0;
322 return detail::tl_this_threadpool();
328 return *detail::tl_this_threadpool();
343 scheduler& default_threadpool_scheduler();
349 : default_threadpool_scheduler();
352 inline scheduler& parallel_algorithm()
354 return in_threadpool()
355 ? this_threadpool().
worker()
356 : default_threadpool().
worker();
367 auto imbalanced = [&]() ->
bool
370 size_t sz = tp.size();
379 auto& sch = tp.worker(0);
382 least = sch.measure();
387 for(
size_t i=1; i<sz; ++i)
389 auto& sch = tp.worker(i);
397 else if(weight > most) { most = weight; }
402 auto past_limit = [](
size_t lhs,
size_t rhs) ->
bool
405 return (
static_cast<long double>(lhs) / rhs) >=
balance_ratio();
433 template <
typename... As>
436 detail::concurrent_algorithm().schedule(std::forward<As>(args)...);
451 template <
typename... As>
454 detail::parallel_algorithm().schedule(std::forward<As>(args)...);
491 template <
typename... As>
494 detail::balance_algorithm().schedule(std::forward<As>(args)...);
bool in_scheduler()
returns true if calling scope is executing inside a running scheduler
Definition: scheduler.hpp:1095
scheduler & this_scheduler()
returns a reference to the scheduler the calling scope is running in
Definition: scheduler.hpp:1106
virtual interface for implementors of lifecycle
Definition: scheduler.hpp:49
an interface for implementing lifecycle control operations
Definition: scheduler.hpp:38
state
an enumeration which represents the lifecycle object's current state
Definition: scheduler.hpp:41
lifecycle(implementation *self)
Definition: scheduler.hpp:62
a struct allowing comparison of scheduler workload
Definition: scheduler.hpp:734
size_t scheduled() const
count of all scheduled coroutines, blocked or enqueued
Definition: scheduler.hpp:790
object responsible for scheduling and executing coroutines
Definition: scheduler.hpp:180
Core mechanism for atomic synchronization.
Definition: atomic.hpp:20
Definition: threadpool.hpp:39
scheduler & worker()
return the least busy worker scheduler at time of call
Definition: threadpool.hpp:65
scheduler & worker(size_t idx) const
access the scheduler for a worker at a given index
Definition: threadpool.hpp:59
std::vector< std::shared_ptr< scheduler > > workers()
Definition: threadpool.hpp:124
virtual ~threadpool()
halt(), join() and delete all workers
Definition: threadpool.hpp:53
static std::shared_ptr< threadpool > make(size_t worker_count=0)
construct an allocated threadpool with a count of worker threads
Definition: threadpool.hpp:43
size_t size() const
return the count of worker threads
Definition: threadpool.hpp:56
void concurrent(As &&... args)
Launch user function and optional arguments as a coroutine running on a scheduler.
Definition: threadpool.hpp:434
void parallel(As &&... args)
Launch user function and optional arguments as a coroutine running on a scheduler.
Definition: threadpool.hpp:452
void balance(As &&... args)
Launch user function and optional arguments as a coroutine running on a scheduler....
Definition: threadpool.hpp:492
bool default_threadpool_enabled()
return true if default_threadpool() can be safely called, else false
scheduler & balance_algorithm()
Definition: threadpool.hpp:359
bool in_threadpool()
Return true if calling context is running in a threadpool.
Definition: threadpool.hpp:320
threadpool & default_threadpool()
return a reference to the default threadpool
double balance_ratio()
return the balance ratio, set by compiler define: MCEBALANCERATIO
threadpool & this_threadpool()
return a reference to the threadpool the calling code is executing in
Definition: threadpool.hpp:326