7 #ifndef __MERCURY_COROUTINE_ENGINE_BUFFERED_CHANNEL__
8 #define __MERCURY_COROUTINE_ENGINE_BUFFERED_CHANNEL__
14 #include <condition_variable>
18 #include <boost/circular_buffer.hpp>
48 ctx(std::move(rhs.ctx))
56 ctx = std::make_shared<buffered_channel_context>(sz);
63 inline void*
context()
const {
return (
void*)(ctx.get()); }
66 inline const std::type_info&
type_info()
const {
return typeid(*this); }
69 inline void close()
const { ctx->close(); }
72 inline bool closed()
const {
return ctx->closed(); }
75 inline size_t size()
const {
return ctx->size(); }
78 inline bool empty()
const {
return ctx->empty(); }
81 inline bool full()
const {
return ctx->full(); }
84 inline size_t capacity()
const {
return ctx->capacity(); }
87 inline size_t reserve()
const {
return ctx->reserve(); }
90 inline bool send(
const T& s)
const
92 return ctx->send(s,
true) == result::success;
96 inline bool send(T&& s)
const
98 return ctx->send(s,
true) == result::success;
104 return ctx->recv(r,
true) == result::success;
125 this->ctx = std::move(rhs.ctx);
129 struct buffered_channel_context
132 mutable bool closed_flag;
133 mutable boost::circular_buffer<T> buf;
137 buffered_channel_context(
size_t sz=1) : closed_flag(false), buf(sz) { }
139 inline void close()
const
141 std::unique_lock<mce::spinlock> lk(spin_lk);
153 unpark_queue(parked_send);
154 unpark_queue(parked_recv);
157 inline bool closed()
const
159 std::unique_lock<mce::spinlock> lk(spin_lk);
164 inline size_t size()
const
166 std::unique_lock<mce::spinlock> lk(spin_lk);
170 inline bool empty()
const
172 std::unique_lock<mce::spinlock> lk(spin_lk);
176 inline bool full()
const
178 std::unique_lock<mce::spinlock> lk(spin_lk);
185 std::unique_lock<mce::spinlock> lk(spin_lk);
186 return buf.capacity();
191 std::unique_lock<mce::spinlock> lk(spin_lk);
192 return buf.reserve();
195 inline result send_(
void* s,
bool block,
bool is_rvalue)
const
197 std::unique_lock<mce::spinlock> lk(spin_lk);
202 return result::closed;
205 auto internal_send = [&]
208 if(is_rvalue){ buf.push_back(std::move(*((T*)s))); }
209 else{ buf.push_back(*((
const T*)s)); }
217 scheduler::parkable p;
218 parked_send.push_back({&p,[&](
void* m)
220 if(m){ internal_send(); }
221 else{ failed =
true; }
229 return result::closed;
237 return result::failure;
240 else { internal_send(); }
245 if(parked_recv.size() && !buf.empty())
247 parked_recv.front()((
void*)1);
248 parked_recv.pop_front();
255 return result::success;
258 inline result send(
const T& s,
bool block)
const
260 return send_((
void*)&s,block,
false);
265 return send_((
void*)&s,block,
true);
270 std::unique_lock<mce::spinlock> lk(spin_lk);
275 return result::closed;
278 auto internal_recv = [&]
280 r = std::move(buf.front());
290 scheduler::parkable p;
291 parked_recv.push_back({&p,[&](
void* m)
293 if(m){ internal_recv(); }
294 else { failed =
true; }
302 return result::closed;
310 return result::failure;
313 else { internal_recv(); }
318 if(parked_send.size() && !buf.full())
321 parked_send.front()((
void*)1);
322 parked_send.pop_front();
329 return result::success;
334 mutable std::shared_ptr<buffered_channel_context> ctx;
result
enum for channel operation results
Definition: base_channel.hpp:23
void yield()
Definition: coroutine.hpp:186
Definition: base_channel.hpp:49
Definition: buffered_channel.hpp:33
result try_send(const T &r) const
nonblocking attempt to send a copy of data through channel
Definition: buffered_channel.hpp:108
size_t size() const
return number of values stored in channel
Definition: buffered_channel.hpp:75
void close() const
close channel
Definition: buffered_channel.hpp:69
void assign(const buffered_channel< T > &rhs) const
copy internal context of argument channel
Definition: buffered_channel.hpp:117
result try_recv(T &ret) const
nonblocking attempt to retrieve data from channel
Definition: buffered_channel.hpp:114
size_t reserve() const
return count of unused values in the buffer
Definition: buffered_channel.hpp:87
bool send(const T &s) const
blocking send a copy of data through channel
Definition: buffered_channel.hpp:90
void construct(size_t sz) const
construct channel context by specifying the internal buffer size
Definition: buffered_channel.hpp:53
result try_send(T &&r) const
nonblocking attempt to move data through channel
Definition: buffered_channel.hpp:111
bool empty() const
return if there are no values in the channel
Definition: buffered_channel.hpp:78
void assign(buffered_channel< T > &&rhs) const
move internal context of argument channel
Definition: buffered_channel.hpp:123
size_t capacity() const
return maximum number of storable values
Definition: buffered_channel.hpp:84
void construct() const
construct channel context with a default internal buffer size
Definition: buffered_channel.hpp:60
void * context() const
retrieve internal context pointer
Definition: buffered_channel.hpp:63
bool send(T &&s) const
blocking move data through channel
Definition: buffered_channel.hpp:96
bool closed() const
report if channel is closed
Definition: buffered_channel.hpp:72
bool recv(T &r) const
blocking retrieve data from channel
Definition: buffered_channel.hpp:102
bool full() const
return if channel buffer is full
Definition: buffered_channel.hpp:81
const std::type_info & type_info() const
retrieve type_info
Definition: buffered_channel.hpp:66
Definition: base_channel.hpp:229
mce::detail::queue< parkable_notify > parkable_notify_queue
blocked queue for parkable_notify structs
Definition: scheduler.hpp:457
Core mechanism for atomic synchronization.
Definition: atomic.hpp:20