Mercury Currency Engine
unbuffered_channel.hpp
Go to the documentation of this file.
1 //SPDX-License-Identifier: Apache-2.0
2 //Author: Blayne Dennis
7 #ifndef __MERCURY_COROUTINE_ENGINE_UNBUFFERED_CHANNEL__
8 #define __MERCURY_COROUTINE_ENGINE_UNBUFFERED_CHANNEL__
9 
10 // c++
11 #include <memory>
12 #include <utility>
13 #include <type_traits>
14 #include <typeinfo>
15 
16 // local
17 #include "scheduler.hpp"
18 #include "base_channel.hpp"
19 
20 // test, only uncomment for development of this library
21 //#include "dev_print.hpp"
22 
23 namespace mce {
24 
25 //-----------------------------------------------------------------------------
36 template <typename T>
37 struct unbuffered_channel : public base_channel<T>,
38  public channel_operators<T,unbuffered_channel<T>>
39 {
40  inline unbuffered_channel() :
41  base_channel<T>(this),
43  { }
44 
45  inline unbuffered_channel(const unbuffered_channel<T>& rhs) :
46  base_channel<T>(this),
48  ctx(rhs.ctx)
49  { }
50 
52  base_channel<T>(this),
54  ctx(std::move(rhs.ctx))
55  { }
56 
57  // operations
59  inline void construct() const
60  {
61  ctx = std::make_shared<unbuffered_channel_context>();
62  }
63 
65  inline void* context() const { return (void*)(ctx.get()); }
66 
68  inline const std::type_info& type_info() const { return typeid(*this); }
69 
71  inline void close() const { return ctx->close(); }
72 
74  inline bool closed() const { return ctx->closed(); }
75 
77  inline bool send(const T& s) const
78  {
79  return ctx->send(s,true) == result::success;
80  }
81 
83  inline bool send(T&& s) const
84  {
85  return ctx->send(std::move(s),true) == result::success;
86  }
87 
89  inline bool recv(T& r) const
90  {
91  return ctx->recv(r,true) == result::success;
92  }
93 
95  inline result try_send(const T& s) const { return ctx->send(s,false); }
96 
98  inline result try_send(T&& s) const { return ctx->send(std::move(s),false); }
99 
101  inline result try_recv(T& r) const { return ctx->recv(r,false); }
102 
104  inline void assign(const unbuffered_channel<T>& rhs) const
105  {
106  this->ctx = rhs.ctx;
107  }
108 
110  inline void assign(unbuffered_channel<T>&& rhs) const
111  {
112  this->ctx = std::move(rhs.ctx);
113  }
114 
115 private:
116  struct send_pair
117  {
118  bool is_rvalue;
119  void* target;
120 
121  send_pair(void* rhs_target, bool rhs_is_rvalue) :
122  is_rvalue(rhs_is_rvalue),
123  target(rhs_target)
124  { }
125  };
126 
127  struct unbuffered_channel_context
128  {
129  mce::spinlock spin_lk;
130  bool closed_flag;
133 
134  unbuffered_channel_context() : closed_flag(false) {}
135 
136  inline void close()
137  {
138  std::unique_lock<mce::spinlock> lk(spin_lk);
139  closed_flag=true;
140 
141  auto unpark_queue = [&](scheduler::parkable_notify_queue& lst)
142  {
143  while(!lst.empty())
144  {
145  lst.front()(NULL);
146  lst.pop_front();
147  }
148  };
149 
150  unpark_queue(parked_send);
151  unpark_queue(parked_recv);
152  }
153 
154  inline bool closed()
155  {
156  std::unique_lock<mce::spinlock> lk(spin_lk);
157  return closed_flag;
158  }
159 
160  inline result send_(void* s, bool block, bool is_rvalue)
161  {
162  std::unique_lock<mce::spinlock> lk(spin_lk);
163 
164  if(closed_flag)
165  {
166  lk.unlock();
167  return result::closed;
168  }
169 
170  // park if no recv is available
171  if(parked_recv.empty())
172  {
173  if(block)
174  {
175  bool failed = false;
176  scheduler::parkable p;
177  parked_send.push_back({&p,[&](void* m)
178  {
179  if(m)
180  {
181  if(is_rvalue){ *((T*)m) = std::move(*((T*)s)); }
182  else{ *((T*)m) = *((const T*)s); }
183  }
184  else{ failed = true; }
185  p.unpark(lk);
186  }});
187  p.park(lk);
188 
189  if(failed)
190  {
191  lk.unlock();
192  return result::closed;
193  }
194  }
195  else
196  {
197  // let other coroutines run
198  lk.unlock();
199  mce::yield();
200  return result::failure;
201  }
202  }
203  // else send value
204  else
205  {
206  send_pair sp(s,is_rvalue);
207  parked_recv.front()((void*)(&sp));
208  parked_recv.pop_front();
209  }
210 
211  // let other coroutines run
212  lk.unlock();
213  mce::yield();
214  return result::success;
215  }
216 
217  inline result send(const T& s, bool block)
218  {
219  return send_((void*)&s,block,false);
220  }
221 
222  inline result send(T&& s, bool block)
223  {
224  return send_((void*)&s,block,true);
225  }
226 
227  inline result recv(T& r, bool block)
228  {
229  std::unique_lock<mce::spinlock> lk(spin_lk);
230 
231  if(closed_flag)
232  {
233  lk.unlock();
234  return result::closed;
235  }
236 
237  // park if no send is available
238  if(parked_send.empty())
239  {
240  if(block)
241  {
242  bool failed = false;
243  scheduler::parkable p;
244  parked_recv.push_back({&p,[&](void* m)
245  {
246  if(m)
247  {
248  send_pair* sp = (send_pair*)m;
249  if(sp->is_rvalue)
250  {
251  r = std::move(*((T*)(sp->target)));
252  }
253  else{ r = *((const T*)(sp->target)); }
254  }
255  else { failed = true; }
256  p.unpark(lk);
257  }});
258  p.park(lk);
259 
260  if(failed)
261  {
262  lk.unlock();
263  return result::closed;
264  }
265  }
266  else
267  {
268  // let other coroutines run
269  lk.unlock();
270  mce::yield();
271  return result::failure;
272  }
273  }
274  // else recv value
275  else
276  {
277  parked_send.front()(&r);
278  parked_send.pop_front();
279  }
280 
281  // let other coroutines run
282  lk.unlock();
283  mce::yield();
284  return result::success;
285  }
286  };
287 
288  // Forcibly ignore the existence of the const concept
289  mutable std::shared_ptr<unbuffered_channel_context> ctx;
290 };
291 
292 }
293 
294 #endif
result
enum for channel operation results
Definition: base_channel.hpp:23
void yield()
Definition: coroutine.hpp:186
Definition: base_channel.hpp:49
Definition: base_channel.hpp:229
mce::detail::queue< parkable_notify > parkable_notify_queue
blocked queue for parkable_notify structs
Definition: scheduler.hpp:457
Core mechanism for atomic synchronization.
Definition: atomic.hpp:20
Definition: unbuffered_channel.hpp:39
bool send(T &&s) const
blocking move data through channel
Definition: unbuffered_channel.hpp:83
result try_send(T &&s) const
nonblocking attempt to move data through channel
Definition: unbuffered_channel.hpp:98
result try_recv(T &r) const
nonblocking attempt to retrieve data from channel
Definition: unbuffered_channel.hpp:101
const std::type_info & type_info() const
retrieve type_info
Definition: unbuffered_channel.hpp:68
void construct() const
construct channel context
Definition: unbuffered_channel.hpp:59
bool recv(T &r) const
blocking retrieve data from channel
Definition: unbuffered_channel.hpp:89
bool send(const T &s) const
blocking send a copy of data through channel
Definition: unbuffered_channel.hpp:77
void assign(const unbuffered_channel< T > &rhs) const
copy internal context of argument channel
Definition: unbuffered_channel.hpp:104
void * context() const
retrieve internal context pointer
Definition: unbuffered_channel.hpp:65
void assign(unbuffered_channel< T > &&rhs) const
move internal context of argument channel
Definition: unbuffered_channel.hpp:110
void close() const
close channel
Definition: unbuffered_channel.hpp:71
result try_send(const T &s) const
nonblocking attempt to send a copy of data through channel
Definition: unbuffered_channel.hpp:95
bool closed() const
report if channel is closed
Definition: unbuffered_channel.hpp:74