Mercury Currency Engine
The functions `mce::concurrent()`, `mce::parallel()`, and `mce::balance()` will launch a given function or Callable (and any arguments) as a concurrently executing coroutine. Data can be communicated between coroutines and threads via channels (`mce::chan`, `mce::unbuffered_channel`, `mce::buffered_channel`) and their `send()` and `recv()` methods.
A trivial example:
See the concurrency summary for more information.
`mce::unbuffered_channel`s and `mce::buffered_channel`s are synchronized communication mechanisms for sending values between places in code (running on any combination of normal threads or coroutines).
All channels must have their `construct()` function called OR be assigned their internals from another channel before they can function (a new channel with constructed context can be generated with a call to a channel's static `make()` procedure). This is because channels are merely interfaces to shared data internals.
`mce::unbuffered_channel`s communicate data and force both the sender and recipient to block until both are ready. This means that a sender or receiver will always block if there is no complementary blocking receiver or sender.
`mce::buffered_channel`s communicate data asynchronously. Send operations only block when no room remains in the internal data container; receive operations only block when no values remain unclaimed in the internal data container.
Both variants use `send()` operations to send data into the channel and `recv()` operations to retrieve data from it.
Warning: It is best to make copies of channels, so the shared internals do not unexpectedly go out of scope.
As a note, all channels support the function `void close()`, which causes all current and future communication operations (such as `send()` and `recv()`) to fail. It also unblocks any coroutines and threads using the channel.
`mce::unbuffered_channel`s and `mce::buffered_channel`s also have other functions, such as `try_recv()`, in addition to their standard methods. See the data communication summary for more information.
The `mce::chan` object is a special wrapper which can represent any channel that implements `mce::base_channel`, such as an `mce::unbuffered_channel` or `mce::buffered_channel`. Because of this, it only directly exposes the API provided by `mce::base_channel` (or implementable with that API).
If you don't want subsequent functions to care about what kind of channel they are using, or simply want more readable code, use `mce::chan`.
All channels provide iterator support with `begin()` and `end()`, allowing usage in range-for loops. This behavior is very useful because channel iterators compare equal to `end()` when the channel is closed, meaning you don't have to watch for operations to begin failing.
Iterators generated from the parent type `base_channel<TYPE>` are `base_channel<TYPE>::iterator`. This pattern follows for the other channel types as well (i.e. `unbuffered_channel<TYPE>` has `unbuffered_channel<TYPE>::iterator`).
Combining the `mce::concurrent` function with channels allows trivial implementation of futures (where the channel is analogous to the `std::future` object provided when `std::async()` is called):
Coroutines that detect they are blocked will suspend execution by communicating their blocked state and yielding to their calling context. Normally the calling context within this framework is an `mce::scheduler` object, which is responsible for efficiently executing the next coroutine.
Unit testing shows that coroutine yield context switching is significantly faster than OS-driven condition synchronization. Assuming that useful work is being done in at least one scheduler, the overhead cost of this operation is minimal.
Such an example can be run alongside a process observation tool (like Task Manager on Windows or top on Linux) to see that the process is not using CPU while blocked.
A consequence of true blocking behavior is that, by definition, it blocks the entire thread. In the case of a thread running multiple coroutines, this blocks all other coroutines on that thread from running, causing extreme delay (or deadlock, if any coroutine requires interaction with another coroutine on the blocked thread).
This is a common pain point for tasks that need to do blocking operations, that is, to interact with the operating system or other processes in a way that causes the caller to block for an indefinite amount of time.
Instead, `await()` blocks the calling coroutine in a way that allows other coroutines to execute in the meantime, and executes the function passed to `await()` on another (non-threadpool) thread running a scheduler.
Here is a working example:
Any function or lambda passed to `await()` can reference local variables safely. This is because those local variables exist in a context which is guaranteed to remain blocked while the function passed to `await()` is running.
The `mce` library provides an object called `mce::timer_service` in `timer.hpp` (see "timer service" in the Summary of Concurrent Operations for more information on this object), which can schedule timers. This object requires its own system thread to run on. Unless disabled by the compiler define `MCE_DISABLE_DEFAULT_TIMER_THREADS`, this framework provides a default `mce::timer_service` running on a background worker thread, accessible via the function `std::shared_ptr<mce::timer_service> mce::default_timer_service()`.
It should be noted that the toplevel `mce::timer()` operation launches timers whose timeouts execute as coroutines on the same `mce::scheduler` as the system thread `mce::timer()` was called on. This behavior causes timeouts to be "asynchronous", but they execute on the same thread that launched them instead of on a dedicated `timer_service` thread.
To alert coroutines on timeout with an `mce::timer_service`, the user can use channels:
The above business logic is similar to how `mce::sleep()` calls operate.
Throughout this library, various functions take another function and an optional set of arguments which are internally bound together using `std::bind()`. Example functions with this behavior include `mce::concurrent()`, `mce::parallel()`, and `mce::balance()`.
When this happens, the ONLY way to pass an argument by reference is to use `std::ref` or `std::cref`. This limitation is imposed by C++'s `std::bind`.
An easier solution is to use lambda reference captures:
However, it should be noted that modifying a variable by reference from different threads without synchronization is often an error and can cause the program to crash. This can be avoided when the coroutines run on the same thread, but it is generally better to avoid the situation entirely and communicate through channels instead.
This project makes extensive use of thunks, also known as nullary lambdas. Lambdas are anonymous functions, which is a complicated way of saying they are functions we can define and manage inside other functions. A thunk is a function which takes no arguments and returns no value. While this sounds useless, lambdas can "capture context", allowing them to take references to, or make copies of, data available in the scope where they are constructed:
Using the ability to capture context we can do some incredible things. As a usability improvement mce provides some helper types and functions in "thunk.hpp" should the user desire to implement features similar to this library.
At the moment, coroutines are created with whatever default stack allocator the boost coroutine2 library provides (which is itself determined by what is available from boost context). A more optimal solution would probably be to use "segmented_stack" allocators. However, segmented_stack objects require that boost context be compiled with specific flags, which are not guaranteed to be set.
A potential enhancement might be the ability to initialize threadpools with a specific allocator type, allowing the default_threadpool to be enhanced when available. Of note, I believe the default allocator is a segmented stack allocator when it is available, so the current implementation would use that by default when compiled against a boost context built with segmented stacks enabled.
As it stands, coroutines launched via concurrent() potentially take more memory than necessary by default.