Mercury Currency Engine
timer.hpp
Go to the documentation of this file.
1 //SPDX-License-Identifier: Apache-2.0
2 //Author: Blayne Dennis
7 #ifndef __MERCURY_COROUTINE_ENGINE_TIMER__
8 #define __MERCURY_COROUTINE_ENGINE_TIMER__
9 
10 // c++ includes
11 #include <chrono>
12 #include <condition_variable>
13 #include <functional>
14 #include <memory>
15 #include <thread>
16 #include <utility>
17 #include <list>
18 
19 // local
20 #include "function_utility.hpp"
21 #include "atomic.hpp"
22 #include "scheduler.hpp"
23 
24 namespace mce {
25 
26 enum time_unit
27 {
28  hour,
29  minute,
30  second,
31  millisecond,
32  microsecond,
33  nanosecond
34 };
35 
36 // use steady clock as we don't want to risk the clock changing near boot if
37 // if clock is determined by a runtime source
38 typedef std::chrono::steady_clock::time_point time_point;
39 typedef std::chrono::steady_clock::duration duration;
40 
41 // Timer utility functions
42 
44 inline mce::duration get_duration(mce::time_unit u, size_t count)
45 {
46  mce::duration dur;
47  switch(u)
48  {
49  case time_unit::hour:
50  dur = std::chrono::hours(count);
51  break;
52  case time_unit::minute:
53  dur = std::chrono::minutes(count);
54  break;
55  case time_unit::second:
56  dur = std::chrono::seconds(count);
57  break;
58  case time_unit::millisecond:
59  dur = std::chrono::milliseconds(count);
60  break;
61  case time_unit::microsecond:
62  dur = std::chrono::microseconds(count);
63  break;
64  default:
65  dur = std::chrono::milliseconds(0);
66  break;
67  }
68  return dur;
69 }
70 
72 inline size_t get_time_point_difference(mce::time_unit u, mce::time_point p0, mce::time_point p1)
73 {
74  size_t dif=0;
75 
76  auto calc = p0 > p1 ? (p0 - p1) : (p1 - p0);
77 
78  switch(u)
79  {
80  case time_unit::hour:
81  dif = (size_t)(std::chrono::duration_cast<std::chrono::hours>(calc).count());
82  break;
83  case time_unit::minute:
84  dif = (size_t)(std::chrono::duration_cast<std::chrono::minutes>(calc).count());
85  break;
86  case time_unit::second:
87  dif = (size_t)(std::chrono::duration_cast<std::chrono::seconds>(calc).count());
88  break;
89  case time_unit::millisecond:
90  dif = (size_t)(std::chrono::duration_cast<std::chrono::milliseconds>(calc).count());
91  break;
92  case time_unit::microsecond:
93  dif = (size_t)(std::chrono::duration_cast<std::chrono::microseconds>(calc).count());
94  break;
95  default:
96  break;
97  }
98 
99  return dif;
100 }
101 
103 inline mce::time_point current_time()
104 {
105  return std::chrono::steady_clock::now();
106 }
107 
108 
131 {
132  struct timer_id
133  {
134  // Utilizing the uniqueness of allocated smart pointers in comparison
135  bool operator==(const timer_id& rhs) { return valid == rhs.valid; }
136 
137  private:
138  // The validy of the timer_id token is determined by if the callback for
139  // this timer has completed or not. Once callback finishes,
140  // *valid==false.
141  //
142  // Read/write to this value can only be done by the timer_service when
143  // holding its spinlock.
144  std::shared_ptr<bool> valid;
145 
146  friend struct timer_service;
147  };
148 
151  continue_running_(false),
152  new_timers_(false),
153  running_(false),
154  executing_remove_sync_required_(false),
155  waiting_for_timeouts_(false)
156  { }
157 
163 
165  inline void start()
166  {
167  std::unique_lock<mce::spinlock> lk(mut_);
168  continue_running_ = true;
169  running_ = true;
170  executing_remove_sync_required_ = false;
171  waiting_for_timeouts_ = false;
172  ready_cv_.notify_all();
173 
174  // will never return unless shutdown() is called
175  check_timers(lk);
176 
177  running_ = false;
178  join_cv_.notify_all();
179  }
180 
182  inline void ready()
183  {
184  std::unique_lock<mce::spinlock> lk(mut_);
185  while(!running_){ ready_cv_.wait(lk); }
186  }
187 
189  inline void shutdown()
190  {
191  std::unique_lock<mce::spinlock> lk(mut_);
192  if(continue_running_) { continue_running_ = false; }
193  cv_.notify_one(); // wakeup sleeping thread
194  while(running_) { join_cv_.wait(lk); }
195  }
196 
204  template <typename THUNK>
205  timer_id timer(const mce::time_point& timeout, THUNK&& timeout_handler)
206  {
207  return create_timer({ timeout, std::forward<THUNK>(timeout_handler) });
208  }
209 
210 
218  template <typename THUNK>
219  timer_id timer(const mce::duration& d, THUNK&& timeout_handler)
220  {
221  time_point tp = d+current_time();
222  return timer(tp, std::forward<THUNK>(timeout_handler));
223  }
224 
225 
234  template <typename THUNK>
235  timer_id timer(const time_unit u, size_t count, THUNK&& timeout_handler)
236  {
237  time_point timeout;
238 
239  switch(u)
240  {
241  case time_unit::hour:
242  timeout = current_time() + std::chrono::hours(count);
243  break;
244  case time_unit::minute:
245  timeout = current_time() + std::chrono::minutes(count);
246  break;
247  case time_unit::second:
248  timeout = current_time() + std::chrono::seconds(count);
249  break;
250  case time_unit::millisecond:
251  timeout = current_time() + std::chrono::milliseconds(count);
252  break;
253  case time_unit::microsecond:
254  timeout = current_time() + std::chrono::microseconds(count);
255  break;
256  default:
257  timeout = current_time();
258  break;
259  }
260 
261  return timer(timeout, std::forward<THUNK>(timeout_handler));
262  }
263 
265  inline bool running(timer_id id)
266  {
267  std::lock_guard<mce::spinlock> lk(mut_);
268  return *(id.valid) || executing_timer_ == id;
269  }
270 
281  inline bool remove(timer_id id)
282  {
283  bool success = false;
284 
285  std::unique_lock<mce::spinlock> lk(mut_);
286 
287  if(executing_timer_ == id)
288  {
289  // block until timeout handler finishes
290  do
291  {
292  executing_remove_sync_required_ = true;
293  remove_sync_cv.wait(lk);
294  } while(executing_timer_ == id);
295  }
296  else if(*(id.valid))
297  {
298  *(id.valid) = false;
299 
300  timer_queue::iterator it;
301  for(it = timers_.begin(); it != timers_.end(); ++it)
302  {
303  if(it->id == id)
304  {
305  success = true;
306  it = timers_.erase(it);
307  break;
308  }
309  }
310  }
311 
312  return success;
313  }
314 
322  inline void clear()
323  {
324  std::lock_guard<mce::spinlock> lk(mut_);
325  timers_.clear();
326  }
327 
329  inline size_t count()
330  {
331  std::unique_lock<mce::spinlock> lk(mut_);
332  return timers_.size();
333  }
334 
335 private:
336  struct timer_data
337  {
338  mce::time_point tp;
339  thunk timeout_handler;
340  timer_id id;
341 
342  bool operator<(const timer_data& rhs)
343  {
344  return tp < rhs.tp;
345  }
346  };
347 
348  bool continue_running_;
349  bool new_timers_;
350  bool running_;
351  bool executing_remove_sync_required_;
352  bool waiting_for_timeouts_;
353 
354  timer_id executing_timer_;
355 
356  using timer_queue = std::list<timer_data>;
357 
358  // queue of timers ordered by soonest timeout to latest
359  timer_queue timers_;
360 
361  mce::spinlock mut_;
362  std::condition_variable_any cv_;
363  std::condition_variable_any ready_cv_;
364  std::condition_variable_any join_cv_;
365  std::condition_variable_any remove_sync_cv;
366 
367  timer_id create_timer(timer_data&& td);
368 
369  inline timer_id get_id()
370  {
371  timer_id id;
372  id.valid = std::make_shared<bool>(true);
373  return id;
374  }
375 
376  inline void check_timers(std::unique_lock<mce::spinlock>& lk)
377  {
378  bool resume_required = false;
379  mce::time_point cur_time;
380  mce::time_point sleep_time;
381  timer_queue::iterator it;
382 
383  while(continue_running_)
384  {
385  do
386  {
387  new_timers_ = false;
388  cur_time = current_time();
389  it = timers_.begin();
390 
391  while(it != timers_.end())
392  {
393  // handle timeout
394  if(it->tp <= cur_time)
395  {
396  executing_timer_ = it->id;
397  *(executing_timer_.valid) = false;
398 
399  {
400  // move timer from map
401  const thunk t = std::move(it->timeout_handler);
402 
403  // do not use this again until checked in while()
404  timers_.erase(it);
405 
406  // execute timeouts *outside* the service lock in case a
407  // timeout calls this timer service
408  lk.unlock();
409  t();
410 
411  // ensure timeout thunk is destroyed at this point
412  }
413 
414  lk.lock();
415 
416  executing_timer_.valid.reset();
417 
418  if(executing_remove_sync_required_)
419  {
420  executing_remove_sync_required_ = false;
421  remove_sync_cv.notify_all();
422  }
423 
424  // during callback execution service was shutdown, stop
425  // evaluating timers
426  if(!continue_running_) { break; }
427  }
428  else
429  {
430  // We've reached the last timer we need to iterate. Set sleep
431  // time to the current timeout.
432  resume_required = true;
433  sleep_time = it->tp;
434  break;
435  }
436 
437  cur_time = current_time();
438  it = timers_.begin();
439  }
440 
441  // repeat if timer() was called during timeout execution
442  } while(new_timers_);
443 
444 
445  // don't do any sleeps (especially not indefinite ones) if we know we need
446  // to shutdown
447  if(continue_running_)
448  {
449  // Release lock and sleep until woken up or sleep_time has been reached
450  if(resume_required)
451  {
452  waiting_for_timeouts_ = true;
453  cv_.wait_until(lk,sleep_time);
454  resume_required = false;
455  }
456  else
457  {
458  // no timers running, block until timers are added
459  waiting_for_timeouts_ = true;
460  cv_.wait(lk);
461  }
462  }
463  }
464  }
465 };
466 
469 
472 
473 namespace detail {
474 
476 {
477  // reschedule handler
478  inline void operator()()
479  {
480  if(scheduler) { scheduler->schedule(timeout_handler); }
481  else { timeout_handler(); }
482  }
483 
484  std::shared_ptr<mce::scheduler> scheduler;
485  mce::thunk timeout_handler;
486 };
487 
488 }
489 
498 template <typename Callable, typename... As>
499 timer_id timer(time_unit u, size_t count, Callable&& cb, As&&... as)
500 {
501  return default_timer_service().timer(
502  u,
503  count,
505  in_scheduler() ? this_scheduler() : std::shared_ptr<scheduler>(),
506  make_thunk(std::forward<Callable>(cb), std::forward<As>(as)...)
507  });
508 }
509 
517 template <typename Callable, typename... As>
518 timer_id timer(const mce::time_point& timeout, Callable&& cb, As&&... as)
519 {
520  return default_timer_service().timer(
521  timeout,
523  in_scheduler() ? this_scheduler() : std::shared_ptr<scheduler>(),
524  make_thunk(std::forward<Callable>(cb), std::forward<As>(as)...)
525  });
526 }
527 
535 template <typename Callable, typename... As>
536 timer_id timer(const mce::duration& timeout, Callable&& cb, As&&... as)
537 {
538  return default_timer_service().timer(
539  timeout,
541  in_scheduler() ? this_scheduler() : std::shared_ptr<scheduler>(),
542  make_thunk(std::forward<Callable>(cb), std::forward<As>(as)...)
543  });
544 }
545 
547 inline bool remove_timer(timer_id id)
548 {
549  return default_timer_service().remove(id);
550 }
551 
553 inline size_t count_timers()
554 {
555  return default_timer_service().count();
556 }
557 
563 inline bool sleep(mce::duration d)
564 {
565  // timeout handler functor
566  struct wakeup
567  {
568 
569  // callback implementation only sets the success flag, indicating that
570  // the timer timed out properly instead of being cleared early
571  inline void operator()() { *success = true; }
572 
573  struct resumer
574  {
575  // the actual logic is in the destructor in this case because we need
576  // to ensure that the coroutine wakes up again no matter what
577  ~resumer()
578  {
579  std::unique_lock<mce::spinlock> lk(*slk);
580  pk->unpark(lk);
581  }
582 
584  mce::spinlock* slk;
585  };
586 
587  bool* success;
588  std::shared_ptr<resumer> resumer_;
589  };
590 
591  // data persists on the coroutine or thread stack
593  mce::spinlock slk;
594  bool success = false;
595 
596  std::unique_lock<mce::spinlock> lk(slk);
597 
598  timer(
599  d+current_time(),
600  wakeup{
601  &success,
602  std::shared_ptr<wakeup::resumer>(new wakeup::resumer{ &pk, &slk })
603  }
604  );
605 
606  pk.park(lk);
607 
608  return success;
609 }
610 
617 inline bool sleep(time_unit u, size_t count)
618 {
619  return sleep(get_duration(u, count));
620 }
621 
622 }
623 #endif
@ success
channel is closed, operation failed
Definition: base_channel.hpp:26
std::function< void()> thunk
thunk type definition. Also known as a nullary function
Definition: function_utility.hpp:72
bool in_scheduler()
returns true if calling scope is executing inside a running scheduler
Definition: scheduler.hpp:1095
scheduler & this_scheduler()
returns a shared pointer to the scheduler the calling scope is running in
Definition: scheduler.hpp:1106
Definition: timer.hpp:476
object containing information to block and unblock a coroutine (running in a scheduler) or thread
Definition: scheduler.hpp:268
void park(LOCK &lk)
blocking call until unpark, unlocks and relocks given lock as necessary
Definition: scheduler.hpp:283
object responsible for scheduling and executing coroutines
Definition: scheduler.hpp:180
void schedule(A &&a, As &&... as)
schedule allocated coroutine(s)
Definition: scheduler.hpp:714
Core mechanism for atomic synchronization.
Definition: atomic.hpp:20
Definition: timer.hpp:133
Definition: timer.hpp:131
void clear()
remove all pending timers
Definition: timer.hpp:322
size_t count()
Return the number of running timers.
Definition: timer.hpp:329
timer_id timer(const time_unit u, size_t count, THUNK &&timeout_handler)
start timer
Definition: timer.hpp:235
timer_service()
default constructor
Definition: timer.hpp:150
bool running(timer_id id)
return true if timer is running, else false
Definition: timer.hpp:265
timer_id timer(const mce::time_point &timeout, THUNK &&timeout_handler)
start timer
Definition: timer.hpp:205
timer_id timer(const mce::duration &d, THUNK &&timeout_handler)
start timer
Definition: timer.hpp:219
void start()
start timer service on current thread
Definition: timer.hpp:165
bool remove(timer_id id)
remove a running timer
Definition: timer.hpp:281
void shutdown()
inform service to shutdown and join with service
Definition: timer.hpp:189
~timer_service()
Definition: timer.hpp:162
void ready()
blocks until service is running
Definition: timer.hpp:182
mce::time_point current_time()
Return the current time. All mce timer time operations are calculated using this function.
Definition: timer.hpp:103
timer_id timer(time_unit u, size_t count, Callable &&cb, As &&... as)
launch a timer with a Callable to be called on timeout
Definition: timer.hpp:499
bool remove_timer(timer_id id)
remove a running timer, return true if successful, else false
Definition: timer.hpp:547
mce::duration get_duration(mce::time_unit u, size_t count)
Return duration for count units of the given time unit.
Definition: timer.hpp:44
size_t get_time_point_difference(mce::time_unit u, mce::time_point p0, mce::time_point p1)
Return the difference in time units between two time points.
Definition: timer.hpp:72
timer_service::timer_id timer_id
define mce::timer_id
Definition: timer.hpp:468
bool sleep(mce::duration d)
Put coroutine or thread to sleep in a blocking fashion.
Definition: timer.hpp:563
timer_service & default_timer_service()
Access to default mce::timer_service object.
size_t count_timers()
return a count of running timers
Definition: timer.hpp:553