9 #ifndef __MERCURY_COROUTINE_ENGINE_SCHEDULER__
10 #define __MERCURY_COROUTINE_ENGINE_SCHEDULER__
17 #include <condition_variable>
22 #include <type_traits>
51 virtual state get_state_impl() = 0;
52 virtual bool suspend_impl() = 0;
53 virtual void resume_impl() = 0;
54 virtual void halt_impl() = 0;
99 if(root_){
return root_->get_state_impl(); }
100 else {
return self_->get_state_impl(); }
110 if(root_){
return root_->suspend_impl(); }
111 else {
return self_->suspend_impl(); }
119 if(root_){ root_->resume_impl(); }
120 else { self_->resume_impl(); }
133 if(root_){ root_->halt_impl(); }
134 else { self_->halt_impl(); }
138 implementation* self_;
139 implementation* root_;
147 scheduler*& tl_this_scheduler();
151 scheduler*& tl_this_scheduler_redirect();
153 template <
typename T>
154 using queue = std::deque<T>;
213 std::unique_ptr<mce::coroutine>*
coroutine;
216 std::weak_ptr<mce::scheduler>* source;
222 void(*cleanup)(
void*);
239 detail::tl_this_scheduler()->park_continuation_ = &c;
272 virtual const char* what()
const throw()
274 return "Cannot unpark a parkable that is not parked";
282 template <
typename LOCK>
287 wc = waiting_context::make<scheduled_context<LOCK>>();
291 wc = waiting_context::make<coroutine_context<LOCK>>();
295 wc = waiting_context::make<thread_context<LOCK>>();
298 wc->wait((
void*)&lk);
305 template <
typename LOCK>
308 if(wc) { wc->notify(); }
317 struct waiting_context
319 template <
typename CONTEXT>
320 static std::unique_ptr<waiting_context>
make()
322 return std::unique_ptr<waiting_context>(
323 static_cast<waiting_context*
>(
new CONTEXT));
326 virtual ~waiting_context() { }
327 virtual void wait(
void* m) = 0;
328 virtual void notify() = 0;
331 template <
typename LOCK>
332 struct scheduled_context :
public waiting_context
334 virtual ~scheduled_context(){}
336 inline void wait(
void* m)
338 LOCK& lk = *((LOCK*)m);
343 park::continuation pc{&co, &wsch, m, unlocker<LOCK>};
354 auto sch = wsch.lock();
356 if(sch) { sch->schedule(std::move(co)); }
359 std::unique_ptr<coroutine> co;
360 std::weak_ptr<scheduler> wsch;
363 template <
typename LOCK>
364 struct coroutine_context :
public waiting_context
366 virtual ~coroutine_context() { }
368 inline void wait(
void* m)
370 LOCK& lk = *((LOCK*)m);
385 co_not_ready =
false;
388 bool co_not_ready =
false;
391 template <
typename LOCK>
392 struct thread_context :
public waiting_context
394 virtual ~thread_context() { }
396 inline void wait(
void* m)
398 LOCK& lk = *((LOCK*)m);
415 std::condition_variable_any cv;
418 template <
typename LOCK>
419 static inline void unlocker(
void* lk)
421 ((LOCK*)lk)->unlock();
424 std::unique_ptr<waiting_context> wc;
443 template <
typename... As>
445 std::function<void(void*)>(std::forward<As>(as)...),
450 inline operator parkable*() {
return parkable_; }
469 std::shared_ptr<scheduler> s(sp);
514 scheduler*& tl_cs = detail::tl_this_scheduler();
515 scheduler*& tl_cs_re = detail::tl_this_scheduler_redirect();
522 bool child = parent_cs;
528 auto push_scheduler_state_ = [&]
535 auto pop_scheduler_state_ = [&]
538 tl_cs_re = parent_cs;
541 push_scheduler_state_();
543 std::unique_lock<mce::spinlock> lk(lk_);
546 auto execute_co = [&]
550 cur_co = task_queue_.front();
560 if(park_continuation_)
562 auto& pc = *park_continuation_;
563 *(pc.coroutine) = std::unique_ptr<coroutine>(cur_co);
564 *(pc.source) = self_wptr_;
565 pc.cleanup(pc.memory);
566 park_continuation_ =
nullptr;
568 task_queue_.pop_front();
575 task_queue_.pop_front();
582 task_queue_.pop_front();
583 task_queue_.push_back(cur_co);
588 while(state_ == lifecycle::state::suspended) { resume_wait_(lk); }
592 if(state_ == lifecycle::state::ready)
596 state_ = lifecycle::state::running;
603 while(can_continue_())
605 if(task_queue_.size())
611 pop_scheduler_state_();
618 push_scheduler_state_();
624 pop_scheduler_state_();
626 while(task_queue_.empty() && can_continue_())
628 tasks_available_child_wait_(lk);
631 push_scheduler_state_();
638 while(can_continue_())
640 if(task_queue_.size()) { execute_co(); }
643 while(task_queue_.empty() && can_continue_())
645 tasks_available_wait_(lk);
654 if(cur_co) {
delete cur_co; }
662 std::rethrow_exception(std::current_exception());
666 pop_scheduler_state_();
668 if(state_ == lifecycle::state::suspended)
679 halt_complete_ =
true;
682 halt_complete_cv_.notify_all();
684 while(halt_complete_waiters_.size())
686 halt_complete_waiters_.front()->unpark(lk_);
687 halt_complete_waiters_.pop_front();
713 template <
typename A,
typename... As>
716 std::unique_lock<mce::spinlock> lk(lk_);
718 if(state_ != lifecycle::state::halted)
720 schedule_(std::forward<A>(a), std::forward<As>(as)...);
761 weight_ = rhs.weight_;
765 inline measurement& operator=(measurement&& rhs)
767 weight_ = rhs.weight_;
771 inline bool operator ==(
const measurement& rhs)
773 return weight_ == rhs.weight_;
776 inline bool operator <(
const measurement& rhs)
778 return weight_ < rhs.weight_;
781 inline bool operator <=(
const measurement& rhs)
783 return weight_ <= rhs.weight_;
787 inline operator size_t()
const {
return weight_; }
790 inline size_t scheduled()
const {
return weight_ & right_mask; }
793 inline size_t enqueued()
const {
return (weight_ & left_mask) >> half_width; }
799 static constexpr
size_t size_width =
sizeof(size_t) * 8;
800 static constexpr
size_t half_width = size_width / 2;
801 static constexpr
size_t right_mask = SIZE_MAX >> half_width;
802 static constexpr
size_t left_mask = SIZE_MAX << half_width;
820 size_t enqueued = task_queue_.size();
821 size_t scheduled = scheduled_;
825 return { enqueued, scheduled };
829 inline operator std::shared_ptr<scheduler>() {
return self_wptr_.lock(); }
832 inline state get_state_impl()
834 std::unique_lock<mce::spinlock> lk(lk_);
838 inline bool suspend_impl()
840 std::unique_lock<mce::spinlock> lk(lk_);
842 if(state_ == lifecycle::state::halted) {
return false; }
845 state_ = lifecycle::state::suspended;
848 tasks_available_notify_();
853 inline void resume_impl()
855 std::unique_lock<mce::spinlock> lk(lk_);
857 if(state_ == lifecycle::state::suspended)
859 state_ = lifecycle::state::ready;
864 inline void halt_impl()
866 std::unique_lock<mce::spinlock> lk(lk_);
869 state_ = lifecycle::state::halted;
884 tasks_available_notify_();
890 while(!halt_complete_)
893 halt_complete_waiters_.push_back(&p);
900 while(!halt_complete_){ halt_complete_cv_.wait(lk); }
906 scheduler(lifecycle::implementation* root) :
918 inline void reset_flags_()
920 halt_complete_ =
false;
921 waiting_for_resume_ =
false;
922 waiting_for_tasks_ =
false;
923 tasks_available_park_ =
nullptr;
927 inline bool can_continue_() {
return state_ < lifecycle::state::suspended; }
929 void clear_task_queue_()
931 while(task_queue_.size())
933 delete task_queue_.front();
934 task_queue_.pop_front();
939 template <
typename LOCK>
940 void resume_wait_(LOCK& lk)
942 waiting_for_resume_ =
true;
947 inline void resume_notify_()
950 if(waiting_for_resume_)
952 waiting_for_resume_ =
false;
953 resume_cv_.notify_one();
957 inline void update_running_state_()
961 if(state_ == lifecycle::state::ready)
963 state_ = lifecycle::state::running;
968 template <
typename LOCK>
969 void tasks_available_wait_(LOCK& lk)
971 waiting_for_tasks_ =
true;
972 tasks_available_cv_.wait(lk);
973 update_running_state_();
976 template <
typename LOCK>
977 void tasks_available_child_wait_(LOCK& lk)
979 waiting_for_tasks_ =
true;
981 tasks_available_park_ = &p;
983 update_running_state_();
987 inline void tasks_available_notify_()
990 if(waiting_for_tasks_)
992 waiting_for_tasks_ =
false;
994 if(tasks_available_park_)
996 tasks_available_park_->
unpark(lk_);
997 tasks_available_park_ =
nullptr;
1001 tasks_available_cv_.notify_one();
1007 inline void schedule_() { tasks_available_notify_(); }
1009 template <
typename... As>
1010 void schedule_(std::unique_ptr<coroutine>&& c, As&&... as)
1013 schedule_coroutine_(std::move(c));
1014 schedule_(std::forward<As>(as)...);
1017 template <
typename A,
typename... As>
1018 void schedule_(A&& a, As&&... as)
1022 detail::is_container<
typename std::decay<A>::type>(),
1024 std::forward<As>(as)...);
1027 inline void schedule_coroutine_(std::unique_ptr<coroutine>&& c)
1032 task_queue_.push_back(c.release());
1036 template <
typename Container,
typename... As>
1037 void schedule_fallback_(std::true_type, Container&& coroutines, As&&... as)
1039 for(
auto& c : coroutines)
1041 schedule_coroutine_(std::move(c));
1044 schedule_(std::forward<As>(as)...);
1047 template <
typename Callable,
typename... As>
1048 void schedule_fallback_(std::false_type, Callable&& cb, As&&... as)
1050 schedule_coroutine_(
1052 std::forward<Callable>(cb),
1053 std::forward<As>(as)...));
1062 bool halt_complete_;
1069 std::condition_variable_any resume_cv_;
1070 bool waiting_for_resume_;
1072 std::condition_variable_any tasks_available_cv_;
1073 bool waiting_for_tasks_;
1074 scheduler::parkable* tasks_available_park_;
1076 std::condition_variable_any halt_complete_cv_;
1079 std::weak_ptr<scheduler> self_wptr_;
1082 park::continuation* park_continuation_ =
nullptr;
1089 std::deque<coroutine*> task_queue_;
1097 return detail::tl_this_scheduler_redirect();
1108 return *(detail::tl_this_scheduler_redirect());
Definition: scheduler.hpp:270
coroutine * this_coroutine()
Definition: coroutine.hpp:178
bool in_coroutine()
Returns true if executing in a coroutine, else false.
Definition: coroutine.hpp:169
bool in_scheduler()
returns true if calling scope is executing inside a running scheduler
Definition: scheduler.hpp:1095
scheduler & this_scheduler()
returns a shared pointer to the scheduler the calling scope is running in
Definition: scheduler.hpp:1106
Definition: coroutine.hpp:43
void yield()
Pause execution and return to run() caller.
Definition: coroutine.hpp:137
virtual void run()
Execute until thunk completes or yield() is called.
Definition: coroutine.hpp:105
static std::unique_ptr< coroutine > make(Callable &&cb, As &&... as)
construct an allocated coroutine from a Callable and arguments
Definition: coroutine.hpp:51
bool complete()
Returns true if thunk is complete, else false since the coroutine yielded early.
Definition: coroutine.hpp:143
virtual interface for implementors of lifecycle
Definition: scheduler.hpp:49
an interface for implementing lifecycle control operations
Definition: scheduler.hpp:38
void halt()
halt and join lifecycle execution
Definition: scheduler.hpp:131
lifecycle(implementation *self, implementation *root)
Definition: scheduler.hpp:89
bool suspend()
temporarily suspend operations
Definition: scheduler.hpp:108
state
an enumeration which represents the lifecycle object's current state
Definition: scheduler.hpp:41
@ running
is initial ready state
Definition: scheduler.hpp:43
@ halted
temporarily halted by a call to suspend()
Definition: scheduler.hpp:45
@ suspended
is running
Definition: scheduler.hpp:44
void resume()
resume any current or future call to run() after an suspend()
Definition: scheduler.hpp:117
lifecycle(implementation *self)
Definition: scheduler.hpp:62
state get_state()
return the state of the lifecycle
Definition: scheduler.hpp:97
a struct allowing comparison of scheduler workload
Definition: scheduler.hpp:734
size_t enqueued() const
count of coroutines actively enqueued for execution
Definition: scheduler.hpp:793
size_t blocked() const
count of coroutines blocked on the scheduler
Definition: scheduler.hpp:796
size_t scheduled() const
count of all scheduled coroutines, blocked or enqueued
Definition: scheduler.hpp:790
measurement(size_t enqueued, size_t scheduled)
Definition: scheduler.hpp:753
Definition: scheduler.hpp:211
fundamental structure for allowing blocking of coroutines running on a scheduler
Definition: scheduler.hpp:209
static void suspend(continuation &c)
the fundamental operation required for blocking a coroutine running in a scheduler
Definition: scheduler.hpp:234
Definition: scheduler.hpp:442
object containing information to block and unblock a coroutine (running in a scheduler) or thread
Definition: scheduler.hpp:268
void unpark(LOCK &lk)
unblock parked operation and reschedule it for execution.
Definition: scheduler.hpp:306
void park(LOCK &lk)
blocking call until unpark, unlocks and relocks given lock as necessary
Definition: scheduler.hpp:283
object responsible for scheduling and executing coroutines
Definition: scheduler.hpp:180
void schedule(A &&a, As &&... as)
schedule allocated coroutine(s)
Definition: scheduler.hpp:714
mce::detail::queue< parkable_notify > parkable_notify_queue
blocked queue for parkable_notify structs
Definition: scheduler.hpp:457
static std::shared_ptr< scheduler > make(lifecycle::implementation *root=nullptr)
return an allocated and initialized scheduler
Definition: scheduler.hpp:466
mce::detail::queue< parkable * > parkable_queue
most straightforward blocked queue
Definition: scheduler.hpp:428
bool run()
Definition: scheduler.hpp:505
measurement measure()
Definition: scheduler.hpp:816
Core mechanism for atomic synchronization.
Definition: atomic.hpp:20