Mercury Coroutine Engine
await.hpp
1 //SPDX-License-Identifier: Apache-2.0
2 //Author: Blayne Dennis
7 #ifndef __MERCURY_COROUTINE_ENGINE_AWAIT__
8 #define __MERCURY_COROUTINE_ENGINE_AWAIT__
9 
10 #include <deque>
11 #include <memory>
12 
13 #include "function_utility.hpp"
14 #include "scheduler.hpp"
15 #include "threadpool.hpp"
16 #include "unbuffered_channel.hpp"
17 
18 namespace mce {
19 
20 //-----------------------------------------------------------------------------
21 // await operations
22 //-----------------------------------------------------------------------------
23 
24 namespace detail {
25 
26 /*
27  A coroutine which redirects user this_scheduler() and this_threadpool()
28  operations to point to the coroutine's original scheduler/threadpool,
29  implicitly allowing user scheduling to function correctly.
30  */
31 struct await_coroutine : public coroutine
32 {
33  template <typename... As>
34  await_coroutine(std::shared_ptr<threadpool>&& t, std::shared_ptr<scheduler>&& s, As&&... as) :
35  coroutine(std::forward<As>(as)...),
36  original_threadpool(std::move(t)),
37  original_scheduler(std::move(s))
38  { }
39 
40  virtual ~await_coroutine(){}
41 
42  template <typename... As>
43  static std::unique_ptr<coroutine> make(
44  std::shared_ptr<threadpool>&& t,
45  std::shared_ptr<scheduler>&& s,
46  As&&... as)
47  {
48  return std::unique_ptr<coroutine>(
49  static_cast<coroutine*>(
50  new await_coroutine{
51  std::move(t),
52  std::move(s),
53  std::forward<As>(as)...}));
54 
55  }
56 
57  /*
58  Every time this coroutine runs we swap the thread_local scheduler redirect
59  pointer and threadpool pointer, so that if user code calls this_scheduler()
60  or this_threadpool() it will actually get the coroutine's original
61  scheduler/threadpool, not the await_worker's scheduler/threadpool.
62  */
63  virtual inline void run()
64  {
65  threadpool*& tl_tp = detail::tl_this_threadpool();
66  scheduler*& tl_cs_re = detail::tl_this_scheduler_redirect();
67 
68  threadpool* parent_tp = tl_tp;
69  scheduler* parent_cs = tl_cs_re;
70 
71  tl_tp = original_threadpool.get();
72  tl_cs_re = original_scheduler.get();
73 
74  try { coroutine::run(); }
75  catch(...)
76  {
77  tl_cs_re = parent_cs;
78  tl_tp = parent_tp;
79  std::rethrow_exception(std::current_exception());
80  }
81 
82  tl_cs_re = parent_cs;
83  tl_tp = parent_tp;
84  }
85 
86  std::shared_ptr<threadpool> original_threadpool;
87  std::shared_ptr<scheduler> original_scheduler;
88  std::unique_ptr<coroutine> co;
89 };
90 
91 // workers implicitly start a scheduler on a new thread during
92 // construction and shut down said scheduler during destruction.
93 struct await_worker
94 {
95  std::shared_ptr<scheduler> sch;
96  std::thread thd;
97 
98  // thread_local value defaults to false
99  static bool& tl_is_await();
100 
101  // a functor which takes a copy of the scheduler pointer
102  struct worker_task
103  {
104  inline void operator()()
105  {
106  // thread_local value is true when executing as an IO worker thread
107  tl_is_await() = true;
108  sch->run();
109  }
110 
111  scheduler* sch;
112  };
113 
114  await_worker() :
115  // construct the scheduler
116  sch(scheduler::make()),
117  // spawn a worker thread with a running scheduler
118  thd(std::thread(worker_task{ sch.get() }))
119  { }
120 
121  ~await_worker()
122  {
123  if(sch)
124  {
125  sch->halt();
126  thd.join();
127  }
128  }
129 };
130 
134 struct await_threadpool
135 {
137  template <typename Callable, typename... As>
138  static void schedule(Callable&& cb, As&&... args)
139  {
140  if(in_scheduler() && !detail::await_worker::tl_is_await())
141  {
142  auto& tl_await_tp = await_threadpool::instance();
143 
144  // synchronization between caller and callee
145  scheduler::parkable pk;
146  mce::spinlock slk;
147 
148  // acquire a worker thread running a scheduler
149  auto w = tl_await_tp.checkout_worker();
150 
151  {
152  std::unique_lock<mce::spinlock> lk(slk);
153 
154  w->sch->schedule(detail::await_coroutine::make(
155  in_threadpool()
156  ? (std::shared_ptr<threadpool>)mce::this_threadpool()
157  : std::shared_ptr<threadpool>(),
158  in_scheduler()
159  ? (std::shared_ptr<scheduler>)mce::this_scheduler()
160  : std::shared_ptr<scheduler>(),
161  [&]
162  {
163  // execute Callable in coroutine running on the worker thread
164  cb(std::forward<As>(args)...);
165 
166  std::unique_lock<mce::spinlock> lk(slk);
167  pk.unpark(lk); // resume caller
168  }));
169 
170  pk.park(lk); // block until callee unparks
171  }
172 
173  tl_await_tp.checkin_worker(std::move(w));
174  }
175  else
176  {
177  // execute Callable directly if not in a coroutine running in a
178  // scheduler OR if we are already executing in a parent mce::await()
179  // call. No need to worry about blocking other running coroutines...
180  // we already have a dedicated system thread.
181  cb(std::forward<As>(args)...);
182  }
183  }
184 
185  static inline size_t worker_count()
186  {
187  return await_threadpool::instance().get_worker_count();
188  }
189 
190 private:
191  struct no_threads_t { };
192 
193  await_threadpool();
194  await_threadpool(no_threads_t) : min_worker_cnt_(0), worker_cnt_(0) { }
195  ~await_threadpool(){}
196 
198  static await_threadpool& instance();
199 
200  inline std::unique_ptr<await_worker> checkout_worker()
201  {
202  {
203  std::unique_lock<mce::spinlock> lk(lk_);
204  if(workers_.size())
205  {
206  auto w = std::move(workers_.front());
207  workers_.pop_front();
208  // return the first available worker
209  return w;
210  }
211  }
212 
213  // as a fallback generate a new await worker thread
214  ++worker_cnt_;
215  return std::unique_ptr<await_worker>(new await_worker);
216  }
217 
218  inline void checkin_worker(std::unique_ptr<await_worker>&& w)
219  {
220  std::unique_lock<mce::spinlock> lk(lk_);
221 
222  if(workers_.size() < min_worker_cnt_)
223  {
224  workers_.push_back(std::move(w));
225  }
226  else
227  {
228  --worker_cnt_;
229  }
230  }
231 
232  inline size_t get_worker_count()
233  {
234  std::unique_lock<mce::spinlock> lk(lk_);
235  return worker_cnt_;
236  }
237 
238  const size_t min_worker_cnt_;
239  size_t worker_cnt_;
240  mce::spinlock lk_;
241  std::deque<std::unique_ptr<await_worker>> workers_;
242 };
243 
244 template <typename Callable, typename... As>
245 detail::function_return_type<Callable,As...>
246 await_(std::false_type, Callable&& cb, As&&... args)
247 {
248  typedef detail::function_return_type<Callable,As...> R;
249 
250  R r;
251 
252  // assign the return value to the stack
253  await_threadpool::schedule([&]{ r = cb(std::forward<As>(args)...); });
254 
255  return r;
256 }
257 
258 template <typename Callable, typename... As>
259 int
260 await_(std::true_type, Callable&& cb, As&&... args)
261 {
262  await_threadpool::schedule([&]{ cb(std::forward<As>(args)...); });
263  return 0;
264 }
265 
266 }
267 
300 template <typename Callable, typename... As>
301 inline detail::convert_void_return<Callable,As...>
302 await(Callable&& cb, As&&... args)
303 {
304  using isv = typename std::is_void<detail::function_return_type<Callable,As...>>;
305  return detail::await_(
306  std::integral_constant<bool,isv::value>(),
307  std::forward<Callable>(cb),
308  std::forward<As>(args)...);
309 }
310 
314 inline bool is_await() { return detail::await_worker::tl_is_await(); }
315 
319 inline size_t await_count() { return detail::await_threadpool::worker_count(); }
320 
321 }
322 
323 #endif
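
The mce::await() call declared above (listing line 302) executes its Callable, potentially on a dedicated worker thread, and blocks the calling context until the operation completes, returning the Callable's result. The sketch below is not part of await.hpp: the include path, the slurp() helper, and the file name are illustrative assumptions. It only demonstrates the dispatch visible in the listing: a non-void Callable resolves through the std::false_type overload of detail::await_() and its result is handed back to the caller, while a void Callable resolves through the std::true_type overload.

#include <fstream>
#include <iostream>
#include <iterator>
#include <string>

#include "await.hpp" // assumed include path for this header

// A blocking operation that should not run on a scheduler's thread.
std::string slurp(const std::string& path)
{
    std::ifstream f(path); // blocking filesystem access
    return std::string(std::istreambuf_iterator<char>(f),
                       std::istreambuf_iterator<char>());
}

int main()
{
    // Non-void Callable with a forwarded argument: the result is returned to
    // the caller. main() is not running in a scheduler, so the Callable
    // executes directly (the else branch of await_threadpool::schedule()).
    std::string text = mce::await(
        [](const std::string& path){ return slurp(path); },
        std::string("example.txt"));

    // void Callable: dispatches to the std::true_type overload of detail::await_().
    mce::await([&]{ std::cout << text.size() << " bytes read" << std::endl; });

    // Prints false; is_await() is only true on an await worker thread.
    std::cout << std::boolalpha << mce::is_await() << std::endl;
    return 0;
}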
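
The comment on detail::await_coroutine explains that the worker swaps the thread_local redirect pointers so user code inside an mce::await() call still sees its originating scheduler and threadpool. Below is a minimal sketch of that behavior under stated assumptions: the include path, the printed strings, and the sleep-based synchronization are illustrative, and it assumes scheduler::schedule() also accepts a plain Callable (the listing above only shows it being handed a coroutine pointer).

#include <chrono>
#include <iostream>
#include <thread>

#include "await.hpp" // assumed include path for this header

int main()
{
    // Run a scheduler on its own thread, much as detail::await_worker does.
    auto sch = mce::scheduler::make();
    std::thread thd([&]{ sch->run(); });

    sch->schedule([]
    {
        // This coroutine runs on sch. The blocking Callable below is handed
        // to an await worker thread...
        mce::await([]
        {
            // ...but this_scheduler() is redirected by detail::await_coroutine,
            // so follow-up work lands back on the original scheduler rather
            // than on the worker's private scheduler.
            mce::this_scheduler().schedule(
                []{ std::cout << "follow-up on the original scheduler" << std::endl; });
        });
    });

    // Crude synchronization, for the sake of the sketch only.
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    sch->halt();
    thd.join();
    return 0;
}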
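
A related sketch, reusing the same illustrative scheduler setup, shows the worker bookkeeping: on an await worker thread tl_is_await() is true, so a nested mce::await() takes the direct-execution branch instead of checking out a second worker, and mce::await_count() reports how many worker threads currently exist (the exact value depends on the pool's minimum worker count, which is set by the default constructor defined outside this header).

#include <chrono>
#include <iostream>
#include <thread>

#include "await.hpp" // assumed include path for this header

int main()
{
    auto sch = mce::scheduler::make();
    std::thread thd([&]{ sch->run(); });

    sch->schedule([]
    {
        std::cout << "is_await() in coroutine: " << mce::is_await() << std::endl; // 0

        mce::await([]
        {
            std::cout << "is_await() in await: " << mce::is_await() << std::endl; // 1

            // Nested await: tl_is_await() is already true, so this Callable
            // runs inline on the same dedicated worker thread.
            mce::await([]{ std::cout << "nested await ran inline" << std::endl; });
        });

        // Number of await worker threads currently alive.
        std::cout << "await_count(): " << mce::await_count() << std::endl;
    });

    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    sch->halt();
    thd.join();
    return 0;
}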