Mercury Currency Engine
scheduler.hpp
Go to the documentation of this file.
1 //SPDX-License-Identifier: Apache-2.0
2 //Author: Blayne Dennis
9 #ifndef __MERCURY_COROUTINE_ENGINE_SCHEDULER__
10 #define __MERCURY_COROUTINE_ENGINE_SCHEDULER__
11 
12 // c
13 #include <limits.h>
14 
15 // c++
16 #include <memory>
17 #include <condition_variable>
18 #include <deque>
19 #include <thread>
20 #include <exception>
21 #include <utility>
22 #include <type_traits>
23 
24 // local
25 #include "function_utility.hpp"
26 #include "atomic.hpp"
27 #include "coroutine.hpp"
28 
29 // test, only uncomment for development of this library
30 //#include "dev_print.hpp"
31 
32 namespace mce {
33 
/**
 @brief an interface for implementing lifecycle control operations
 */
struct lifecycle
{
    /// an enumeration which represents the lifecycle object's current state
    enum state
    {
        ready, ///< initial ready state
        running, ///< currently running
        suspended, ///< temporarily halted by a call to suspend()
        halted ///< permanently halted by a call to halt()
    };

    /// virtual interface for implementors of lifecycle
    struct implementation
    {
        virtual ~implementation() { }
        virtual state get_state_impl() = 0;
        virtual bool suspend_impl() = 0;
        virtual void resume_impl() = 0;
        virtual void halt_impl() = 0;
        friend struct lifecycle;
    };

    /// construct with an implementation and no root redirection
    lifecycle(implementation* self) : self_(self), root_(nullptr) { }

    /// construct with an implementation and a root implementation; when a
    /// root is set it receives all lifecycle operations instead of self
    lifecycle(implementation* self, implementation* root) :
        self_(self),
        root_(root)
    { }

    virtual ~lifecycle() { }

    /// return the state of the lifecycle
    inline state get_state()
    {
        return target_()->get_state_impl();
    }

    /// temporarily suspend operations
    inline bool suspend()
    {
        return target_()->suspend_impl();
    }

    /// resume any current or future call to run() after a suspend()
    inline void resume()
    {
        target_()->resume_impl();
    }

    /// halt and join lifecycle execution
    inline void halt()
    {
        target_()->halt_impl();
    }

private:
    // operations go to the root implementation when one is set, else to self
    inline implementation* target_() { return root_ ? root_ : self_; }

    implementation* self_;
    implementation* root_;
};
141 
struct scheduler;

namespace detail {

// always points to the true scheduler running on the thread
scheduler*& tl_this_scheduler();

// points to the scheduler accessible via this_scheduler(); usually identical
// to tl_this_scheduler() but may differ in certain redirection cases
scheduler*& tl_this_scheduler_redirect();

template <typename T>
using queue = std::deque<T>;

}


//-----------------------------------------------------------------------------
// scheduler
//-----------------------------------------------------------------------------

/// returns true if the calling scope is executing inside a running scheduler
bool in_scheduler();

/// returns a reference to the scheduler the calling scope is running in
scheduler& this_scheduler();
168 
180 {
208  struct park
209  {
211  {
212  // pointer to coroutine storage location
213  std::unique_ptr<mce::coroutine>* coroutine;
214 
215  // memory to source scheduler storage location
216  std::weak_ptr<mce::scheduler>* source;
217 
218  // arbitrary memory
219  void* memory;
220 
221  // cleanup procedure executed after acquiring coroutine
222  void(*cleanup)(void*);
223  };
224 
234  static inline void suspend(continuation& c)
235  {
236  // can safely write and read these variables without synchronization
237  // because they are only read/written by this thread, and this
238  // thread will only read/write to said values during a `run()` call
239  detail::tl_this_scheduler()->park_continuation_ = &c;
240  this_coroutine()->yield();
241  }
242  };
243 
267  struct parkable
268  {
269  class unpark_exception : public std::exception
270  {
271  public:
272  virtual const char* what() const throw()
273  {
274  return "Cannot unpark a parkable that is not parked";
275  }
276  };
277 
278  parkable() { }
279  ~parkable() { }
280 
282  template <typename LOCK>
283  void park(LOCK& lk)
284  {
285  if(in_scheduler())
286  {
287  wc = waiting_context::make<scheduled_context<LOCK>>();
288  }
289  else if(in_coroutine())
290  {
291  wc = waiting_context::make<coroutine_context<LOCK>>();
292  }
293  else
294  {
295  wc = waiting_context::make<thread_context<LOCK>>();
296  }
297 
298  wc->wait((void*)&lk);
299  }
300 
305  template <typename LOCK>
306  void unpark(LOCK& lk)
307  {
308  if(wc) { wc->notify(); }
309  else
310  {
311  lk.unlock();
312  throw unpark_exception();
313  }
314  }
315 
316  private:
317  struct waiting_context
318  {
319  template <typename CONTEXT>
320  static std::unique_ptr<waiting_context> make()
321  {
322  return std::unique_ptr<waiting_context>(
323  static_cast<waiting_context*>(new CONTEXT));
324  }
325 
326  virtual ~waiting_context() { }
327  virtual void wait(void* m) = 0;
328  virtual void notify() = 0;
329  };
330 
331  template <typename LOCK>
332  struct scheduled_context : public waiting_context
333  {
334  virtual ~scheduled_context(){}
335 
336  inline void wait(void* m)
337  {
338  LOCK& lk = *((LOCK*)m);
339 
340  // Indirectly retrieve this coroutine's pointer, with the
341  // side effect of the calling context releasing all
342  // ownership of said pointer.
343  park::continuation pc{&co, &wsch, m, unlocker<LOCK>};
344 
345  //blocking call until unpark
346  park::suspend(pc);
347 
348  // reacquire lock after unpark()
349  lk.lock();
350  }
351 
352  inline void notify()
353  {
354  auto sch = wsch.lock();
355 
356  if(sch) { sch->schedule(std::move(co)); }
357  }
358 
359  std::unique_ptr<coroutine> co;
360  std::weak_ptr<scheduler> wsch;
361  };
362 
363  template <typename LOCK>
364  struct coroutine_context : public waiting_context
365  {
366  virtual ~coroutine_context() { }
367 
368  inline void wait(void* m)
369  {
370  LOCK& lk = *((LOCK*)m);
371 
372  co_not_ready = true;
373  while(co_not_ready)
374  {
375  lk.unlock();
376  this_coroutine()->yield();
377  lk.lock();
378  }
379  }
380 
381  inline void notify()
382  {
383  // there is no coroutine to internally manage, break
384  // out of yield loop
385  co_not_ready = false;
386  }
387 
388  bool co_not_ready = false;
389  };
390 
391  template <typename LOCK>
392  struct thread_context : public waiting_context
393  {
394  virtual ~thread_context() { }
395 
396  inline void wait(void* m)
397  {
398  LOCK& lk = *((LOCK*)m);
399 
400  do
401  {
402  cv.wait(lk);
403  } while(!ready);
404 
405  ready = false;
406  }
407 
408  inline void notify()
409  {
410  ready = true;
411  cv.notify_one();
412  }
413 
414  bool ready = false;
415  std::condition_variable_any cv;
416  };
417 
418  template <typename LOCK>
419  static inline void unlocker(void* lk)
420  {
421  ((LOCK*)lk)->unlock();
422  }
423 
424  std::unique_ptr<waiting_context> wc;
425  };
426 
428  typedef mce::detail::queue<parkable*> parkable_queue;
429 
441  struct parkable_notify : public std::function<void(void*)>
442  {
443  template <typename... As>
444  parkable_notify(parkable* p, As&&... as) :
445  std::function<void(void*)>(std::forward<As>(as)...),
446  parkable_(p)
447  { }
448 
449  // convert to parkable*
450  inline operator parkable*() { return parkable_; }
451 
452  private:
453  parkable* parkable_;
454  };
455 
457  typedef mce::detail::queue<parkable_notify> parkable_notify_queue;
458 
459  virtual ~scheduler()
460  {
461  // ensure all coroutines are manually deleted
462  clear_task_queue_();
463  }
464 
466  static inline std::shared_ptr<scheduler> make(lifecycle::implementation* root=nullptr)
467  {
468  scheduler* sp = new scheduler(root);
469  std::shared_ptr<scheduler> s(sp);
470  s->self_wptr_ = s;
471  return s;
472  };
473 
505  inline bool run()
506  {
507  // stack variables
508 
509  // the currently running coroutine
510  coroutine* cur_co = nullptr;
511 
512  // only call function tl_this_scheduler() once to acquire reference to
513  // thread shared scheduler pointer
514  scheduler*& tl_cs = detail::tl_this_scheduler();
515  scheduler*& tl_cs_re = detail::tl_this_scheduler_redirect();
516 
517  // acquire the parent, if any, of the current coroutine scheduler
518  scheduler* parent_cs = tl_cs;
519 
520  // clarity flag to indicate this scheduler is running inside another
521  // scheduler on the same thread
522  bool child = parent_cs;
523 
524  // both can be pushed without a lock because self_wptr_ is only written
525  // in threadsafe scheduler::make()
526 
527  // temporarily reassign thread_local this_scheduler state to this scheduler
528  auto push_scheduler_state_ = [&]
529  {
530  tl_cs = this;
531  tl_cs_re = this;
532  };
533 
534  // restore parent thread_local this_scheduler state
535  auto pop_scheduler_state_ = [&]
536  {
537  tl_cs = parent_cs;
538  tl_cs_re = parent_cs;
539  };
540 
541  push_scheduler_state_();
542 
543  std::unique_lock<mce::spinlock> lk(lk_);
544 
545  // this should be trivially inlined by the compiler
546  auto execute_co = [&]
547  {
548  // Acquire a new task. Don't bother swapping, it's slower than
549  // copying the pointer
550  cur_co = task_queue_.front();
551 
552  // unlock scheduler state when running a task
553  lk.unlock();
554 
555  // execute coroutine
556  cur_co->run();
557 
558  // if park_continuation_ is set that means the coroutine is
559  // requesting to be parked.
560  if(park_continuation_)
561  {
562  auto& pc = *park_continuation_;
563  *(pc.coroutine) = std::unique_ptr<coroutine>(cur_co);
564  *(pc.source) = self_wptr_;
565  pc.cleanup(pc.memory);
566  park_continuation_ = nullptr;
567  lk.lock();
568  task_queue_.pop_front();
569  }
570  else if(cur_co->complete())
571  {
572  // cleanup coroutine
573  delete cur_co;
574  lk.lock();
575  task_queue_.pop_front();
576  --scheduled_;
577  }
578  else
579  {
580  // re-enqueue coroutine
581  lk.lock();
582  task_queue_.pop_front();
583  task_queue_.push_back(cur_co);
584  }
585  };
586 
587  // block until no longer suspended
588  while(state_ == lifecycle::state::suspended) { resume_wait_(lk); }
589 
590  // if halt() has been called, return immediately
591  // only one caller of run() is possible
592  if(state_ == lifecycle::state::ready)
593  {
594  // the current caller of run() claims this scheduler, any calls to
595  // run() while it is already running will halt the scheduler
596  state_ = lifecycle::state::running;
597 
598  try
599  {
600  if(child)
601  {
602  // this scheduler is running inside another scheduler
603  while(can_continue_())
604  {
605  if(task_queue_.size())
606  {
607  execute_co();
608 
609  lk.unlock();
610 
611  pop_scheduler_state_();
612 
613  // Allow parent scheduler to run. This must be
614  // called when scheduler lock is *not* held and
615  // parent tl_this_scheduler() state is restored.
616  this_coroutine()->yield();
617 
618  push_scheduler_state_();
619 
620  lk.lock();
621  }
622  else
623  {
624  pop_scheduler_state_();
625 
626  while(task_queue_.empty() && can_continue_())
627  {
628  tasks_available_child_wait_(lk);
629  }
630 
631  push_scheduler_state_();
632  }
633  }
634  }
635  else
636  {
637  // root scheduler evaluation loop
638  while(can_continue_())
639  {
640  if(task_queue_.size()) { execute_co(); }
641  else
642  {
643  while(task_queue_.empty() && can_continue_())
644  {
645  tasks_available_wait_(lk);
646  }
647  }
648  }
649  }
650  }
651  catch(...) // catch all other exceptions
652  {
653  // free memory we errored out of
654  if(cur_co) { delete cur_co; }
655 
656  // reset state in case of uncaught exception
657  if(tl_cs == this)
658  {
659  tl_cs = parent_cs; // pop
660  }
661 
662  std::rethrow_exception(std::current_exception());
663  }
664  }
665 
666  pop_scheduler_state_();
667 
668  if(state_ == lifecycle::state::suspended)
669  {
670  // reset scheduler state so run() can be called again
671  reset_flags_();
672  return true;
673  }
674  else
675  {
676  // clear task queue so coroutine controlled memory can be released
677  clear_task_queue_();
678 
679  halt_complete_ = true;
680 
681  // notify any listeners that the scheduler is halted
682  halt_complete_cv_.notify_all();
683 
684  while(halt_complete_waiters_.size())
685  {
686  halt_complete_waiters_.front()->unpark(lk_);
687  halt_complete_waiters_.pop_front();
688  }
689 
690  return false;
691  }
692  }
693 
713  template <typename A, typename... As>
714  void schedule(A&& a, As&&... as)
715  {
716  std::unique_lock<mce::spinlock> lk(lk_);
717 
718  if(state_ != lifecycle::state::halted)
719  {
720  schedule_(std::forward<A>(a), std::forward<As>(as)...);
721  }
722  }
723 
733  struct measurement
734  {
735  measurement() : weight_(0) { }
736  measurement(const measurement& rhs) : weight_(rhs.weight_) { }
737  measurement(measurement&& rhs) : weight_(rhs.weight_) { }
738 
753  measurement(size_t enqueued, size_t scheduled) :
754  weight_(
755  (enqueued > right_mask ? right_mask : enqueued) << half_width |
756  (scheduled > right_mask ? right_mask : scheduled))
757  { }
758 
759  inline measurement& operator=(const measurement& rhs)
760  {
761  weight_ = rhs.weight_;
762  return *this;
763  }
764 
765  inline measurement& operator=(measurement&& rhs)
766  {
767  weight_ = rhs.weight_;
768  return *this;
769  }
770 
771  inline bool operator ==(const measurement& rhs)
772  {
773  return weight_ == rhs.weight_;
774  }
775 
776  inline bool operator <(const measurement& rhs)
777  {
778  return weight_ < rhs.weight_;
779  }
780 
781  inline bool operator <=(const measurement& rhs)
782  {
783  return weight_ <= rhs.weight_;
784  }
785 
787  inline operator size_t() const { return weight_; }
788 
790  inline size_t scheduled() const { return weight_ & right_mask; }
791 
793  inline size_t enqueued() const { return (weight_ & left_mask) >> half_width; }
794 
796  inline size_t blocked() const { return scheduled() - enqueued(); }
797 
798  private:
799  static constexpr size_t size_width = sizeof(size_t) * 8;
800  static constexpr size_t half_width = size_width / 2;
801  static constexpr size_t right_mask = SIZE_MAX >> half_width;
802  static constexpr size_t left_mask = SIZE_MAX << half_width;
803 
804  size_t weight_;
805  };
806 
817  {
818  lk_.lock();
819 
820  size_t enqueued = task_queue_.size();
821  size_t scheduled = scheduled_;
822 
823  lk_.unlock();
824 
825  return { enqueued, scheduled };
826  }
827 
829  inline operator std::shared_ptr<scheduler>() { return self_wptr_.lock(); }
830 
831 protected:
832  inline state get_state_impl()
833  {
834  std::unique_lock<mce::spinlock> lk(lk_);
835  return state_;
836  }
837 
838  inline bool suspend_impl()
839  {
840  std::unique_lock<mce::spinlock> lk(lk_);
841 
842  if(state_ == lifecycle::state::halted) { return false; }
843  else
844  {
845  state_ = lifecycle::state::suspended;
846  // wakeup scheduler if necessary from waiting for tasks to force
847  // run() to exit
848  tasks_available_notify_();
849  return true;
850  }
851  }
852 
853  inline void resume_impl()
854  {
855  std::unique_lock<mce::spinlock> lk(lk_);
856 
857  if(state_ == lifecycle::state::suspended)
858  {
859  state_ = lifecycle::state::ready;
860  resume_notify_();
861  }
862  }
863 
864  inline void halt_impl()
865  {
866  std::unique_lock<mce::spinlock> lk(lk_);
867 
868  // set the scheduler to the permanent halted state
869  state_ = lifecycle::state::halted;
870 
871  // wakeup from suspend if necessary
872  resume_notify_();
873 
874  // handle case where halt is called from within running scheduler
875  if(in_scheduler() && &(this_scheduler()) == this)
876  {
877  // no need to notify here, running on the thread we are notifying
878  lk.unlock();
879  this_coroutine()->yield();
880  }
881  else
882  {
883  // wakeup scheduler if necessary
884  tasks_available_notify_();
885 
886  // handle case where halt() is called by a coroutine running in
887  // a different scheduler
888  if(in_coroutine())
889  {
890  while(!halt_complete_)
891  {
892  parkable p;
893  halt_complete_waiters_.push_back(&p);
894  p.park(lk);
895  }
896  }
897  // handle final case where halt() is called from another std::thread
898  else
899  {
900  while(!halt_complete_){ halt_complete_cv_.wait(lk); }
901  }
902  }
903  }
904 
905 private:
906  scheduler(lifecycle::implementation* root) :
907  lifecycle(this,root),
908  state_(lifecycle::state::ready), // state_ persists between suspends
909  scheduled_(0) // scheduled_ persists between suspends
910  {
911  reset_flags_(); // initialize flags
912  }
913 
914  // Reset scheduler state flags, etc. Does not reset scheduled coroutine
915  // queues.
916  //
917  // This method can ONLY be safely called by the constructor or run()
918  inline void reset_flags_()
919  {
920  halt_complete_ = false;
921  waiting_for_resume_ = false;
922  waiting_for_tasks_ = false;
923  tasks_available_park_ = nullptr;
924  }
925 
926  // abstract frequently used inlined state to this function for correctness
927  inline bool can_continue_() { return state_ < lifecycle::state::suspended; }
928 
929  void clear_task_queue_()
930  {
931  while(task_queue_.size())
932  {
933  delete task_queue_.front();
934  task_queue_.pop_front();
935  }
936  }
937 
938  // attempt to wait for resumption
939  template <typename LOCK>
940  void resume_wait_(LOCK& lk)
941  {
942  waiting_for_resume_ = true;
943  resume_cv_.wait(lk);
944  }
945 
946  // notify caller of run() that resume() has been called
947  inline void resume_notify_()
948  {
949  // only do notify if necessary
950  if(waiting_for_resume_)
951  {
952  waiting_for_resume_ = false;
953  resume_cv_.notify_one();
954  }
955  }
956 
957  inline void update_running_state_()
958  {
959  // reacquire running state in edgecase where suspend() and resume()
960  // happened quickly
961  if(state_ == lifecycle::state::ready)
962  {
963  state_ = lifecycle::state::running;
964  }
965  }
966 
967  // attempt to wait for more tasks
968  template <typename LOCK>
969  void tasks_available_wait_(LOCK& lk)
970  {
971  waiting_for_tasks_ = true;
972  tasks_available_cv_.wait(lk);
973  update_running_state_();
974  }
975 
976  template <typename LOCK>
977  void tasks_available_child_wait_(LOCK& lk)
978  {
979  waiting_for_tasks_ = true;
980  parkable p;
981  tasks_available_park_ = &p;
982  p.park(lk);
983  update_running_state_();
984  }
985 
986  // notify caller of run() that tasks are available
987  inline void tasks_available_notify_()
988  {
989  // only do notify if necessary
990  if(waiting_for_tasks_)
991  {
992  waiting_for_tasks_ = false;
993 
994  if(tasks_available_park_)
995  {
996  tasks_available_park_->unpark(lk_);
997  tasks_available_park_ = nullptr;
998  }
999  else
1000  {
1001  tasks_available_cv_.notify_one();
1002  }
1003  }
1004  }
1005 
1006  // when all coroutines are scheduled, notify
1007  inline void schedule_() { tasks_available_notify_(); }
1008 
1009  template <typename... As>
1010  void schedule_(std::unique_ptr<coroutine>&& c, As&&... as)
1011  {
1012  // detect if A is a container or a Callable
1013  schedule_coroutine_(std::move(c));
1014  schedule_(std::forward<As>(as)...);
1015  }
1016 
1017  template <typename A, typename... As>
1018  void schedule_(A&& a, As&&... as)
1019  {
1020  // detect if A is a container or a Callable
1021  schedule_fallback_(
1022  detail::is_container<typename std::decay<A>::type>(),
1023  std::forward<A>(a),
1024  std::forward<As>(as)...);
1025  }
1026 
1027  inline void schedule_coroutine_(std::unique_ptr<coroutine>&& c)
1028  {
1029  if(c)
1030  {
1031  ++scheduled_;
1032  task_queue_.push_back(c.release());
1033  }
1034  }
1035 
1036  template <typename Container, typename... As>
1037  void schedule_fallback_(std::true_type, Container&& coroutines, As&&... as)
1038  {
1039  for(auto& c : coroutines)
1040  {
1041  schedule_coroutine_(std::move(c));
1042  }
1043 
1044  schedule_(std::forward<As>(as)...);
1045  }
1046 
1047  template <typename Callable, typename... As>
1048  void schedule_fallback_(std::false_type, Callable&& cb, As&&... as)
1049  {
1050  schedule_coroutine_(
1052  std::forward<Callable>(cb),
1053  std::forward<As>(as)...));
1054 
1055  schedule_();
1056  }
1057 
1058  // the current state of the scheduler
1059  lifecycle::state state_;
1060 
1061  // true if halt has completed, used for synchronizing calls to halt()
1062  bool halt_complete_;
1063 
1064  // a count of coroutines on this scheduler, including coroutines
1065  // actively queued to be run AND parked coroutines associated with this
1066  // scheduler
1067  size_t scheduled_;
1068 
1069  std::condition_variable_any resume_cv_;
1070  bool waiting_for_resume_;
1071 
1072  std::condition_variable_any tasks_available_cv_;
1073  bool waiting_for_tasks_; // true if waiting on tasks_available_cv_
1074  scheduler::parkable* tasks_available_park_;
1075 
1076  std::condition_variable_any halt_complete_cv_;
1077  scheduler::parkable_queue halt_complete_waiters_;
1078 
1079  std::weak_ptr<scheduler> self_wptr_;
1080 
1081  // the continuation requested by the running coroutine
1082  park::continuation* park_continuation_ = nullptr;
1083 
1084  // queue holding scheduled coroutines. Raw coroutine pointers are used
1085  // internally because we don't want even the cost of rvalue swapping
1086  // unique pointers during internal operations (potentially 3 instructions),
1087  // when we can deep copy a word size value intead. This requires careful
1088  // calls to `delete` when a coroutine goes out of scope in this object.
1089  std::deque<coroutine*> task_queue_;
1090 
1091  mce::spinlock lk_;
1092 };
1093 
1095 inline bool in_scheduler()
1096 {
1097  return detail::tl_this_scheduler_redirect();
1098 }
1099 
1107 {
1108  return *(detail::tl_this_scheduler_redirect());
1109 }
1110 
1111 }
1112 
1113 #endif
Definition: scheduler.hpp:270
coroutine * this_coroutine()
Definition: coroutine.hpp:178
bool in_coroutine()
Returns true if executing in a coroutine, else false.
Definition: coroutine.hpp:169
bool in_scheduler()
returns true if calling scope is executing inside a running scheduler
Definition: scheduler.hpp:1095
scheduler & this_scheduler()
returns a shared pointer to the scheduler the calling scope is running in
Definition: scheduler.hpp:1106
Definition: coroutine.hpp:43
void yield()
Pause execution and return to run() caller.
Definition: coroutine.hpp:137
virtual void run()
Execute until thunk completes or yield() is called.
Definition: coroutine.hpp:105
static std::unique_ptr< coroutine > make(Callable &&cb, As &&... as)
construct an allocated coroutine from a Callable and arguments
Definition: coroutine.hpp:51
bool complete()
Returns true if thunk is complete, else false since the coroutine yielded early.
Definition: coroutine.hpp:143
virtual interface for implementors of lifecycle
Definition: scheduler.hpp:49
an interface for implementing lifecycle control operations
Definition: scheduler.hpp:38
void halt()
halt and join lifecycle execution
Definition: scheduler.hpp:131
lifecycle(implementation *self, implementation *root)
Definition: scheduler.hpp:89
bool suspend()
temporarily suspend operations
Definition: scheduler.hpp:108
state
an enumeration which represents the lifecycle object's current state
Definition: scheduler.hpp:41
@ running
is currently running
Definition: scheduler.hpp:43
@ halted
permanently halted by a call to halt()
Definition: scheduler.hpp:45
@ suspended
temporarily halted by a call to suspend()
Definition: scheduler.hpp:44
void resume()
resume any current or future call to run() after an suspend()
Definition: scheduler.hpp:117
lifecycle(implementation *self)
Definition: scheduler.hpp:62
state get_state()
return the state of the lifecycle
Definition: scheduler.hpp:97
a struct allowing comparison of scheduler workload
Definition: scheduler.hpp:734
size_t enqueued() const
count of coroutines actively enqueued for execution
Definition: scheduler.hpp:793
size_t blocked() const
count of coroutines blocked on the scheduler
Definition: scheduler.hpp:796
size_t scheduled() const
count of all scheduled coroutines, blocked or enqueued
Definition: scheduler.hpp:790
measurement(size_t enqueued, size_t scheduled)
Definition: scheduler.hpp:753
Definition: scheduler.hpp:211
fundamental structure for allowing blocking of coroutines running on a scheduler
Definition: scheduler.hpp:209
static void suspend(continuation &c)
the fundamental operation required for blocking a coroutine running in a scheduler
Definition: scheduler.hpp:234
Definition: scheduler.hpp:442
object containing information to block and unblock a coroutine (running in a scheduler) or thread
Definition: scheduler.hpp:268
void unpark(LOCK &lk)
unblock parked operation and reschedule it for execution.
Definition: scheduler.hpp:306
void park(LOCK &lk)
blocking call until unpark, unlocks and relocks given lock as necessary
Definition: scheduler.hpp:283
object responsible for scheduling and executing coroutines
Definition: scheduler.hpp:180
void schedule(A &&a, As &&... as)
schedule allocated coroutine(s)
Definition: scheduler.hpp:714
mce::detail::queue< parkable_notify > parkable_notify_queue
blocked queue for parkable_notify structs
Definition: scheduler.hpp:457
static std::shared_ptr< scheduler > make(lifecycle::implementation *root=nullptr)
return an allocated and initialized scheduler
Definition: scheduler.hpp:466
mce::detail::queue< parkable * > parkable_queue
most straightforward blocked queue
Definition: scheduler.hpp:428
bool run()
Definition: scheduler.hpp:505
measurement measure()
Definition: scheduler.hpp:816
Core mechanism for atomic synchronization.
Definition: atomic.hpp:20