Mercury Currency Engine
buffered_channel.hpp
Go to the documentation of this file.
1 //SPDX-License-Identifier: Apache-2.0
2 //Author: Blayne Dennis
7 #ifndef __MERCURY_COROUTINE_ENGINE_BUFFERED_CHANNEL__
8 #define __MERCURY_COROUTINE_ENGINE_BUFFERED_CHANNEL__
9 
10 // c++
#include <memory>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <typeinfo>
#include <utility>
16 
17 // boost
18 #include <boost/circular_buffer.hpp>
19 
20 // local
21 #include "scheduler.hpp"
22 #include "base_channel.hpp"
23 
24 namespace mce {
25 
30 template <typename T>
31 struct buffered_channel : public base_channel<T>,
32  public channel_operators<T,buffered_channel<T>>
33 {
34  inline buffered_channel() :
35  base_channel<T>(this),
37  { }
38 
39  inline buffered_channel(const buffered_channel<T>& rhs) :
40  base_channel<T>(this),
42  ctx(rhs.ctx)
43  { }
44 
45  inline buffered_channel(buffered_channel<T>&& rhs) :
46  base_channel<T>(this),
48  ctx(std::move(rhs.ctx))
49  { }
50 
51  // operations
53  inline void construct(size_t sz) const
54  {
55  if(sz<1){ sz = 1; }
56  ctx = std::make_shared<buffered_channel_context>(sz);
57  }
58 
60  inline void construct() const { construct(0); }
61 
63  inline void* context() const { return (void*)(ctx.get()); }
64 
66  inline const std::type_info& type_info() const { return typeid(*this); }
67 
69  inline void close() const { ctx->close(); }
70 
72  inline bool closed() const { return ctx->closed(); }
73 
75  inline size_t size() const { return ctx->size(); }
76 
78  inline bool empty() const { return ctx->empty(); }
79 
81  inline bool full() const { return ctx->full(); }
82 
84  inline size_t capacity() const { return ctx->capacity(); }
85 
87  inline size_t reserve() const { return ctx->reserve(); }
88 
90  inline bool send(const T& s) const
91  {
92  return ctx->send(s,true) == result::success;
93  }
94 
96  inline bool send(T&& s) const
97  {
98  return ctx->send(s,true) == result::success;
99  }
100 
102  inline bool recv(T& r) const
103  {
104  return ctx->recv(r,true) == result::success;
105  }
106 
108  inline result try_send(const T& r) const { return ctx->send(r,false); }
109 
111  inline result try_send(T&& r) const { return ctx->send(std::move(r),false); }
112 
114  inline result try_recv(T& ret) const { return ctx->recv(ret,false); }
115 
117  inline void assign(const buffered_channel<T>& rhs) const
118  {
119  this->ctx = rhs.ctx;
120  }
121 
123  inline void assign(buffered_channel<T>&& rhs) const
124  {
125  this->ctx = std::move(rhs.ctx);
126  }
127 
128 private:
129  struct buffered_channel_context
130  {
131  mutable mce::spinlock spin_lk;
132  mutable bool closed_flag;
133  mutable boost::circular_buffer<T> buf;
134  mutable scheduler::parkable_notify_queue parked_send;
135  mutable scheduler::parkable_notify_queue parked_recv;
136 
137  buffered_channel_context(size_t sz=1) : closed_flag(false), buf(sz) { }
138 
139  inline void close() const
140  {
141  std::unique_lock<mce::spinlock> lk(spin_lk);
142  closed_flag=true;
143 
144  auto unpark_queue = [&](scheduler::parkable_notify_queue& lst)
145  {
146  while(!lst.empty())
147  {
148  lst.front()(NULL);
149  lst.pop_front();
150  }
151  };
152 
153  unpark_queue(parked_send);
154  unpark_queue(parked_recv);
155  }
156 
157  inline bool closed() const
158  {
159  std::unique_lock<mce::spinlock> lk(spin_lk);
160  return closed_flag;
161  }
162 
163  // return number of values stored in channel
164  inline size_t size() const
165  {
166  std::unique_lock<mce::spinlock> lk(spin_lk);
167  return buf.size();
168  }
169 
170  inline bool empty() const
171  {
172  std::unique_lock<mce::spinlock> lk(spin_lk);
173  return buf.empty();
174  }
175 
176  inline bool full() const
177  {
178  std::unique_lock<mce::spinlock> lk(spin_lk);
179  return buf.full();
180  }
181 
182  // return maximum number of storable values
183  inline size_t capacity() const
184  {
185  std::unique_lock<mce::spinlock> lk(spin_lk);
186  return buf.capacity();
187  }
188 
189  inline size_t reserve() const
190  {
191  std::unique_lock<mce::spinlock> lk(spin_lk);
192  return buf.reserve();
193  }
194 
195  inline result send_(void* s, bool block, bool is_rvalue) const
196  {
197  std::unique_lock<mce::spinlock> lk(spin_lk);
198 
199  if(closed_flag)
200  {
201  lk.unlock();
202  return result::closed;
203  }
204 
205  auto internal_send = [&]
206  {
207  // enqueue data
208  if(is_rvalue){ buf.push_back(std::move(*((T*)s))); }
209  else{ buf.push_back(*((const T*)s)); }
210  };
211 
212  if(buf.full())
213  {
214  if(block)// if full and this is a blocking send, park sender
215  {
216  bool failed = false;
217  scheduler::parkable p;
218  parked_send.push_back({&p,[&](void* m)
219  {
220  if(m){ internal_send(); }
221  else{ failed = true; }
222  p.unpark(lk);
223  }});
224  p.park(lk);
225 
226  if(failed)
227  {
228  lk.unlock();
229  return result::closed;
230  }
231  }
232  else
233  {
234  // let other coroutines run
235  lk.unlock();
236  mce::yield();
237  return result::failure;
238  }
239  }
240  else { internal_send(); }
241 
242  if(!closed_flag)
243  {
244  // if recv available, wakeup
245  if(parked_recv.size() && !buf.empty())
246  {
247  parked_recv.front()((void*)1);
248  parked_recv.pop_front();
249  }
250  }
251 
252  // let other coroutines run
253  lk.unlock();
254  mce::yield();
255  return result::success;
256  }
257 
258  inline result send(const T& s, bool block) const
259  {
260  return send_((void*)&s,block,false);
261  }
262 
263  inline result send(T&& s, bool block) const
264  {
265  return send_((void*)&s,block,true);
266  }
267 
268  inline result recv(T& r, bool block) const
269  {
270  std::unique_lock<mce::spinlock> lk(spin_lk);
271 
272  if(closed_flag)
273  {
274  lk.unlock();
275  return result::closed;
276  }
277 
278  auto internal_recv = [&]
279  {
280  r = std::move(buf.front());
281  buf.pop_front();
282  };
283 
284  if(buf.empty())
285  {
286  // if empty and this is a blocking recv, park receiver
287  if(block)
288  {
289  bool failed = false;
290  scheduler::parkable p;
291  parked_recv.push_back({&p,[&](void* m)
292  {
293  if(m){ internal_recv(); }
294  else { failed = true; }
295  p.unpark(lk);
296  }});
297  p.park(lk);
298 
299  if(failed)
300  {
301  lk.unlock();
302  return result::closed;
303  }
304  }
305  else
306  {
307  // let other coroutines run
308  lk.unlock();
309  mce::yield();
310  return result::failure;
311  }
312  }
313  else { internal_recv(); }
314 
315  if(!closed_flag)
316  {
317  // check if any senders are available, unpark one
318  if(parked_send.size() && !buf.full())
319  {
320  // senders have no memory m pointer to manage
321  parked_send.front()((void*)1);
322  parked_send.pop_front();
323  }
324  }
325 
326  // let other coroutines run
327  lk.unlock();
328  mce::yield();
329  return result::success;
330  }
331  };
332 
// Forcibly ignore const: ctx is mutable so that const member functions
// such as construct() and assign() can replace the shared context
334  mutable std::shared_ptr<buffered_channel_context> ctx;
335 };
336 
337 }
338 
339 #endif
result
enum for channel operation results
Definition: base_channel.hpp:23
void yield()
Definition: coroutine.hpp:186
Definition: base_channel.hpp:49
Definition: buffered_channel.hpp:33
result try_send(const T &r) const
nonblocking attempt to send a copy of data through channel
Definition: buffered_channel.hpp:108
size_t size() const
return number of values stored in channel
Definition: buffered_channel.hpp:75
void close() const
close channel
Definition: buffered_channel.hpp:69
void assign(const buffered_channel< T > &rhs) const
copy internal context of argument channel
Definition: buffered_channel.hpp:117
result try_recv(T &ret) const
nonblocking attempt to retrieve data from channel
Definition: buffered_channel.hpp:114
size_t reserve() const
return count of unused values in the buffer
Definition: buffered_channel.hpp:87
bool send(const T &s) const
blocking send a copy of data through channel
Definition: buffered_channel.hpp:90
void construct(size_t sz) const
construct channel context by specifying the internal buffer size
Definition: buffered_channel.hpp:53
result try_send(T &&r) const
nonblocking attempt to move data through channel
Definition: buffered_channel.hpp:111
bool empty() const
return if there are no values in the channel
Definition: buffered_channel.hpp:78
void assign(buffered_channel< T > &&rhs) const
move internal context of argument channel
Definition: buffered_channel.hpp:123
size_t capacity() const
return maximum number of storable values
Definition: buffered_channel.hpp:84
void construct() const
construct channel context with a default internal buffer size
Definition: buffered_channel.hpp:60
void * context() const
retrieve internal context pointer
Definition: buffered_channel.hpp:63
bool send(T &&s) const
blocking move data through channel
Definition: buffered_channel.hpp:96
bool closed() const
report if channel is closed
Definition: buffered_channel.hpp:72
bool recv(T &r) const
blocking retrieve data from channel
Definition: buffered_channel.hpp:102
bool full() const
return if channel buffer is full
Definition: buffered_channel.hpp:81
const std::type_info & type_info() const
retrieve type_info
Definition: buffered_channel.hpp:66
Definition: base_channel.hpp:229
mce::detail::queue< parkable_notify > parkable_notify_queue
blocked queue for parkable_notify structs
Definition: scheduler.hpp:457
Core mechanism for atomic synchronization.
Definition: atomic.hpp:20