7 #ifndef __MERCURY_COROUTINE_ENGINE_AWAIT__
8 #define __MERCURY_COROUTINE_ENGINE_AWAIT__
33 template <
typename... As>
34 await_coroutine(std::shared_ptr<threadpool>&& t, std::shared_ptr<scheduler>&& s, As&&... as) :
36 original_threadpool(std::move(t)),
37 original_scheduler(std::move(s))
42 template <
typename... As>
43 static std::unique_ptr<coroutine> make(
44 std::shared_ptr<threadpool>&& t,
45 std::shared_ptr<scheduler>&& s,
48 return std::unique_ptr<coroutine>(
53 std::forward<As>(as)...}));
63 virtual inline void run()
65 threadpool*& tl_tp = detail::tl_this_threadpool();
66 scheduler*& tl_cs_re = detail::tl_this_scheduler_redirect();
71 tl_tp = original_threadpool.get();
72 tl_cs_re = original_scheduler.get();
79 std::rethrow_exception(std::current_exception());
86 std::shared_ptr<threadpool> original_threadpool;
87 std::shared_ptr<scheduler> original_scheduler;
88 std::unique_ptr<coroutine> co;
95 std::shared_ptr<scheduler> sch;
99 static bool& tl_is_await();
104 inline void operator()()
107 tl_is_await() =
true;
137 template <
typename Callable,
typename... As>
140 if(
in_scheduler() && !detail::await_worker::tl_is_await())
142 auto& tl_await_tp = await_threadpool::instance();
149 auto w = tl_await_tp.checkout_worker();
152 std::unique_lock<mce::spinlock> lk(slk);
154 w->sch->schedule(detail::await_coroutine::make(
157 : std::shared_ptr<threadpool>(),
160 : std::shared_ptr<scheduler>(),
164 cb(std::forward<As>(args)...);
166 std::unique_lock<mce::spinlock> lk(slk);
173 tl_await_tp.checkin_worker(std::move(w));
181 cb(std::forward<As>(args)...);
185 static inline size_t worker_count()
187 return await_threadpool::instance().get_worker_count();
// Empty tag type: passing a no_threads_t value selects the await_threadpool
// constructor overload that takes it as its only parameter.
191 struct no_threads_t { };
// Tag-selected constructor: initializes both min_worker_cnt_ and worker_cnt_ to 0.
// Because min_worker_cnt_ is a const size_t, the `workers_.size() < min_worker_cnt_`
// test in checkin_worker() can never be true for an instance built this way.
194 await_threadpool(no_threads_t) : min_worker_cnt_(0), worker_cnt_(0) { }
// Destructor with an empty body: no explicit teardown is performed here; the
// members are destroyed by their own destructors.
195 ~await_threadpool(){}
198 static await_threadpool& instance();
200 inline std::unique_ptr<await_worker> checkout_worker()
203 std::unique_lock<mce::spinlock> lk(lk_);
206 auto w = std::move(workers_.front());
207 workers_.pop_front();
215 return std::unique_ptr<await_worker>(
new await_worker);
218 inline void checkin_worker(std::unique_ptr<await_worker>&& w)
220 std::unique_lock<mce::spinlock> lk(lk_);
222 if(workers_.size() < min_worker_cnt_)
224 workers_.push_back(std::move(w));
232 inline size_t get_worker_count()
234 std::unique_lock<mce::spinlock> lk(lk_);
238 const size_t min_worker_cnt_;
241 std::deque<std::unique_ptr<await_worker>> workers_;
244 template <
typename Callable,
typename... As>
245 detail::function_return_type<Callable,As...>
246 await_(std::false_type, Callable&& cb, As&&... args)
248 typedef detail::function_return_type<Callable,As...> R;
258 template <
typename Callable,
typename... As>
260 await_(std::true_type, Callable&& cb, As&&... args)
300 template <
typename Callable,
typename... As>
301 inline detail::convert_void_return<Callable,As...>
304 using isv =
typename std::is_void<detail::function_return_type<Callable,As...>>;
305 return detail::await_(
306 std::integral_constant<bool,isv::value>(),
307 std::forward<Callable>(cb),
308 std::forward<As>(args)...);
// Report whether the caller is flagged as running on an await worker.
// Returns the current value of the flag exposed by
// detail::await_worker::tl_is_await(), i.e. the flag that the worker's
// operator()() earlier in this file sets to true.
// NOTE(review): the `tl_` prefix suggests the flag is thread-local — confirm at
// the definition of tl_is_await().
314 inline bool is_await() {
return detail::await_worker::tl_is_await(); }
// Return the await threadpool's worker count.
// Forwards to detail::await_threadpool::worker_count(), which in turn queries
// await_threadpool::instance().get_worker_count().
319 inline size_t await_count() {
return detail::await_threadpool::worker_count(); }
detail::convert_void_return< Callable, As... > await(Callable &&cb, As &&... args)
Execute Callable, potentially on a different thread, and block the current context until the operation completes.
Definition: await.hpp:302
size_t await_count()
Definition: await.hpp:319
bool is_await()
Definition: await.hpp:314
bool in_scheduler()
returns true if calling scope is executing inside a running scheduler
Definition: scheduler.hpp:1095
scheduler & this_scheduler()
returns a reference to the scheduler the calling scope is running in
Definition: scheduler.hpp:1106
Definition: coroutine.hpp:43
virtual void run()
Execute until thunk completes or yield() is called.
Definition: coroutine.hpp:105
coroutine(THUNK &&th)
construct a coroutine from a thunk
Definition: coroutine.hpp:62
virtual void run()
Execute until thunk completes or yield() is called.
Definition: await.hpp:63
specialized threadpool implementation for handling blocking tasks
Definition: await.hpp:135
static void schedule(Callable &&cb, As &&... args)
schedule a callable on the await_threadpool
Definition: await.hpp:138
Definition: await.hpp:103
object containing information to block and unblock a coroutine (running in a scheduler) or thread
Definition: scheduler.hpp:268
void unpark(LOCK &lk)
unblock parked operation and reschedule it for execution.
Definition: scheduler.hpp:306
void park(LOCK &lk)
blocking call until unpark, unlocks and relocks given lock as necessary
Definition: scheduler.hpp:283
object responsible for scheduling and executing coroutines
Definition: scheduler.hpp:180
Core mechanism for atomic synchronization.
Definition: atomic.hpp:20
Definition: threadpool.hpp:39
bool in_threadpool()
Return true if calling context is running in a threadpool.
Definition: threadpool.hpp:320
threadpool & this_threadpool()
return a reference to the threadpool the calling code is executing in
Definition: threadpool.hpp:326