Mercury Currency Engine
condition_variable.hpp
Go to the documentation of this file.
1 //SPDX-License-Identifier: Apache-2.0
2 //Author: Blayne Dennis
7 #ifndef __MERCURY_COROUTINE_ENGINE_CONDITION_VARIABLE__
8 #define __MERCURY_COROUTINE_ENGINE_CONDITION_VARIABLE__
9 
10 // c++
11 #include <deque>
12 #include <thread>
13 #include <condition_variable>
14 #include <memory>
15 
16 // local
17 #include "atomic.hpp"
18 #include "scheduler.hpp"
19 #include "timer.hpp"
20 
21 namespace mce {
22 
28 {
30  condition_variable(condition_variable&& rhs) = delete;
31  condition_variable(const condition_variable& rhs) = delete;
32  condition_variable& operator=(condition_variable&& rhs) = delete;
33  condition_variable& operator=(const condition_variable& rhs) = delete;
34 
38  template <class Lock>
39  void wait(Lock& lk)
40  {
42 
43  // interleave locks
44  std::unique_lock<mce::spinlock> inner_lk(lk_);
45  lk.unlock();
46 
47  auto key = borrow_key();
48  notify_queue_.push_back({ key, { &p, unparker{&p} }});
49  p.park(inner_lk);
50  return_key(key);
51 
52  inner_lk.unlock();
53  lk.lock();
54  }
55 
59  template <class Lock, class Pred>
60  void wait(Lock& lk, Pred p)
61  {
62  while(!p()) { wait(lk); }
63  }
64 
69  template <class Lock, class Rep, class Period>
70  std::cv_status wait_for(Lock& lk, const std::chrono::duration<Rep, Period>& d)
71  {
72  auto tp = d+mce::current_time();
73  return wait_until(lk, tp);
74  }
75 
80  template <class Lock, class Rep, class Period, class Pred>
81  bool wait_for(Lock& lk, const std::chrono::duration<Rep, Period>& d, Pred p)
82  {
83  bool result = false;
84  std::cv_status status = std::cv_status::no_timeout;
85 
86  while(!(result = p()))
87  {
88  status = wait_for(lk, d);
89 
90  // exit loop early because the wait operation took too long and we
91  // timed out
92  if(status == std::cv_status::timeout) { break; }
93  }
94 
95  return result;
96  }
97 
102  template <class Lock, class Clock, class Duration>
103  std::cv_status wait_until(
104  Lock& user_lk,
105  const std::chrono::time_point<Clock, Duration>& tp)
106  {
107  std::cv_status status = std::cv_status::no_timeout;
108  timer_id id;
110  bool notify_available = true;
111 
112  // interleave locks
113  std::unique_lock<mce::spinlock> lk(lk_);
114  user_lk.unlock();
115 
116  // acquire a unique key for this operation
117  auto key = borrow_key();
118 
119  notify_queue_.push_back({
120  key,
121  { &p, unparker_with_flag{&p, &notify_available} }
122  });
123 
124  // start the timer
125  id = ts_.timer(
126  tp,
127  clear_safe_handler{
128  std::make_shared<clear_safe_handler::resumer>(
129  [&]
130  {
131  std::unique_lock<mce::spinlock> lk(lk_);
132  if(notify_available)
133  {
134  status = std::cv_status::timeout;
135  notify_with_key(lk,key);
136  }
137  })
138  });
139 
140  // release the lock and park until unpark or timeout
141  p.park(lk);
142 
143  // return the key
144  return_key(key);
145 
146  // release the condition_variable's private lock
147  lk.unlock();
148 
149  // remove timer if it is enqueued, this operation is synchronized such
150  // that it is guaranteed to not return until the timer is removed and
151  // its timeout handler is not executing
152  ts_.remove(id);
153 
154  // re-acquire user lock
155  user_lk.lock();
156 
157  return status;
158  }
159 
164  template <class Lock, class Clock, class Duration, class Pred>
165  void wait_until(Lock& lk,
166  std::chrono::time_point<Clock, Duration> tp,
167  Pred p)
168  {
169  while(!p()) { wait_until(lk, tp); }
170  }
171 
173  inline void notify_one()
174  {
175  std::unique_lock<mce::spinlock> lk(lk_);
176  auto it = notify_queue_.begin();
177  if(it != notify_queue_.end())
178  {
179  it->second((void*)&lk);
180  notify_queue_.erase(it);
181  }
182  }
183 
185  inline void notify_all()
186  {
187  std::unique_lock<mce::spinlock> lk(lk_);
188  auto it = notify_queue_.begin();
189  while(it != notify_queue_.end())
190  {
191  it->second((void*)&lk);
192  ++it;
193  }
194  notify_queue_.clear();
195  }
196 
197 private:
198  typedef size_t key_type;
199  typedef std::deque<std::pair<key_type,scheduler::parkable_notify>> notify_queue;
200 
201  // ensure the handler is called even if the timer is cleared
202  struct clear_safe_handler
203  {
204  struct resumer
205  {
206  resumer(thunk hdl) :
207  hdl_(std::move(hdl))
208  { }
209 
210  ~resumer(){ hdl_(); }
211  const thunk hdl_;
212  };
213 
214  // timeout was reached
215  inline void operator()(){ }
216 
217  std::shared_ptr<resumer> resumer_;
218  };
219 
220  struct unparker
221  {
222  unparker(scheduler::parkable* p) : p_(p) { }
223  inline void operator()(void* m) { p_->unpark(*((std::unique_lock<mce::spinlock>*)m)); }
224  scheduler::parkable* p_;
225  };
226 
227  struct unparker_with_flag
228  {
229  unparker_with_flag(scheduler::parkable* p, bool* flag) :
230  p_(p),
231  flag_(flag)
232  { }
233 
234  inline void operator()(void* m)
235  {
236  // this is a guard from unparking twice, in the case that a
237  // call to notify_one/all wakes this up before timeout and
238  // before the timeout is successfully removed
239  if(*flag_)
240  {
241  *flag_ = false;
242  p_->unpark(*((std::unique_lock<mce::spinlock>*)m));
243  }
244  }
245 
246  scheduler::parkable* p_;
247  bool* flag_;
248  };
249 
250  // Acquire or generate a unique key. This method requires caller to
251  // implicitly own a lock
252  inline size_t borrow_key()
253  {
254  size_t ret;
255 
256  if(!free_keys_.empty())
257  {
258  ret = free_keys_.front();
259  free_keys_.pop_front();
260  }
261  else
262  {
263  ret = key_source_;
264  ++key_source_;
265  }
266 
267  return ret;
268  }
269 
270  inline void return_key(key_type key) { free_keys_.push_back(key); }
271 
272  // Notify a specific blocked operation, unlike notify_one()/notify_all()
273  // which are agnostic.
274  inline void notify_with_key(std::unique_lock<mce::spinlock>& lk, key_type key)
275  {
276  notify_queue::iterator it;
277  for(it = notify_queue_.begin(); it != notify_queue_.end(); ++it)
278  {
279  if(it->first == key)
280  {
281  it->second((void*)&lk);
282  notify_queue_.erase(it);
283  break;
284  }
285  }
286  }
287 
288  mce::spinlock lk_;
289  notify_queue notify_queue_;
290  std::deque<key_type> free_keys_;
291  size_t key_source_ = 0;
293 };
294 
295 }
296 #endif
result
enum for channel operation results
Definition: base_channel.hpp:23
std::function< void()> thunk
thunk type definition. Also known as a nullary function
Definition: function_utility.hpp:72
Definition: condition_variable.hpp:205
Definition: condition_variable.hpp:28
std::cv_status wait_until(Lock &user_lk, const std::chrono::time_point< Clock, Duration > &tp)
Definition: condition_variable.hpp:103
void wait_until(Lock &lk, std::chrono::time_point< Clock, Duration > tp, Pred p)
Definition: condition_variable.hpp:165
void notify_all()
Unblock all contexts calling a wait operation.
Definition: condition_variable.hpp:185
bool wait_for(Lock &lk, const std::chrono::duration< Rep, Period > &d, Pred p)
Definition: condition_variable.hpp:81
std::cv_status wait_for(Lock &lk, const std::chrono::duration< Rep, Period > &d)
Definition: condition_variable.hpp:70
void wait(Lock &lk, Pred p)
Definition: condition_variable.hpp:60
void notify_one()
Unblock one context calling a wait operation.
Definition: condition_variable.hpp:173
void wait(Lock &lk)
Definition: condition_variable.hpp:39
object containing information to block and unblock a coroutine (running in a scheduler) or thread
Definition: scheduler.hpp:268
void park(LOCK &lk)
blocking call until unpark, unlocks and relocks given lock as necessary
Definition: scheduler.hpp:283
Core mechanism for atomic synchronization.
Definition: atomic.hpp:20
Definition: timer.hpp:133
Definition: timer.hpp:131
timer_id timer(const mce::time_point &timeout, THUNK &&timeout_handler)
start timer
Definition: timer.hpp:205
bool remove(timer_id id)
remove a running timer
Definition: timer.hpp:281
mce::time_point current_time()
Return the current time. All mce timer time operations are calculated using this function.
Definition: timer.hpp:103
timer_service & default_timer_service()
Access to default mce::timer_service object.