7 #ifndef __MERCURY_COROUTINE_ENGINE_UNBUFFERED_CHANNEL__
8 #define __MERCURY_COROUTINE_ENGINE_UNBUFFERED_CHANNEL__
13 #include <type_traits>
54 ctx(std::move(rhs.ctx))
61 ctx = std::make_shared<unbuffered_channel_context>();
65 inline void*
context()
const {
return (
void*)(ctx.get()); }
68 inline const std::type_info&
type_info()
const {
return typeid(*this); }
71 inline void close()
const {
return ctx->close(); }
74 inline bool closed()
const {
return ctx->closed(); }
77 inline bool send(
const T& s)
const
79 return ctx->send(s,
true) == result::success;
83 inline bool send(T&& s)
const
85 return ctx->send(std::move(s),
true) == result::success;
89 inline bool recv(T& r)
const
91 return ctx->recv(r,
true) == result::success;
98 inline result try_send(T&& s)
const {
return ctx->send(std::move(s),
false); }
112 this->ctx = std::move(rhs.ctx);
121 send_pair(
void* rhs_target,
bool rhs_is_rvalue) :
122 is_rvalue(rhs_is_rvalue),
127 struct unbuffered_channel_context
134 unbuffered_channel_context() : closed_flag(false) {}
138 std::unique_lock<mce::spinlock> lk(spin_lk);
150 unpark_queue(parked_send);
151 unpark_queue(parked_recv);
156 std::unique_lock<mce::spinlock> lk(spin_lk);
160 inline result send_(
void* s,
bool block,
bool is_rvalue)
162 std::unique_lock<mce::spinlock> lk(spin_lk);
167 return result::closed;
171 if(parked_recv.empty())
176 scheduler::parkable p;
177 parked_send.push_back({&p,[&](
void* m)
181 if(is_rvalue){ *((T*)m) = std::move(*((T*)s)); }
182 else{ *((T*)m) = *((
const T*)s); }
184 else{ failed =
true; }
192 return result::closed;
200 return result::failure;
206 send_pair sp(s,is_rvalue);
207 parked_recv.front()((
void*)(&sp));
208 parked_recv.pop_front();
214 return result::success;
219 return send_((
void*)&s,block,
false);
224 return send_((
void*)&s,block,
true);
229 std::unique_lock<mce::spinlock> lk(spin_lk);
234 return result::closed;
238 if(parked_send.empty())
243 scheduler::parkable p;
244 parked_recv.push_back({&p,[&](
void* m)
248 send_pair* sp = (send_pair*)m;
251 r = std::move(*((T*)(sp->target)));
253 else{ r = *((
const T*)(sp->target)); }
255 else { failed =
true; }
263 return result::closed;
271 return result::failure;
277 parked_send.front()(&r);
278 parked_send.pop_front();
284 return result::success;
289 mutable std::shared_ptr<unbuffered_channel_context> ctx;
result
enum for channel operation results
Definition: base_channel.hpp:23
void yield()
Definition: coroutine.hpp:186
Definition: base_channel.hpp:49
Definition: base_channel.hpp:229
mce::detail::queue< parkable_notify > parkable_notify_queue
blocked queue for parkable_notify structs
Definition: scheduler.hpp:457
Core mechanism for atomic synchronization.
Definition: atomic.hpp:20
Definition: unbuffered_channel.hpp:39
bool send(T &&s) const
blocking move data through channel
Definition: unbuffered_channel.hpp:83
result try_send(T &&s) const
nonblocking attempt to move data through channel
Definition: unbuffered_channel.hpp:98
result try_recv(T &r) const
nonblocking attempt to retrieve data from channel
Definition: unbuffered_channel.hpp:101
const std::type_info & type_info() const
retrieve type_info
Definition: unbuffered_channel.hpp:68
void construct() const
construct channel context
Definition: unbuffered_channel.hpp:59
bool recv(T &r) const
blocking retrieve data from channel
Definition: unbuffered_channel.hpp:89
bool send(const T &s) const
blocking send a copy of data through channel
Definition: unbuffered_channel.hpp:77
void assign(const unbuffered_channel< T > &rhs) const
copy internal context of argument channel
Definition: unbuffered_channel.hpp:104
void * context() const
retrieve internal context pointer
Definition: unbuffered_channel.hpp:65
void assign(unbuffered_channel< T > &&rhs) const
move internal context of argument channel
Definition: unbuffered_channel.hpp:110
void close() const
close channel
Definition: unbuffered_channel.hpp:71
result try_send(const T &s) const
nonblocking attempt to send a copy of data through channel
Definition: unbuffered_channel.hpp:95
bool closed() const
report if channel is closed
Definition: unbuffered_channel.hpp:74