C++ synchronisations for SimGrid
Published:
Updated:
This is an overview of some recent additions to the SimGrid code related to actor synchronisation. It might be interesting for people using SimGrid, working on SimGrid or for people interested in generic C++ code for synchronisation or asynchronicity.
Table of content
SimGrid as a Discrete Event Simulator
SimGrid is a discrete event simulator of distributed systems: it does not simulate the world by small fixed-size steps but determines the date of the next event (such as the end of a communication, the end of a computation) and jumps to this date.
A number of actors executing user-provided code run on top of the simulation kernel[1]. When an actor needs to interact with the simulation kernel (eg. to start a communication), it issues a simcall (simulation call, an analogy to system calls) to the simulation kernel. This freezes the actor until it is woken up by the simulation kernel (eg. when the communication is finished).
The key ideas here are:
- The simulator is a discrete event simulator (event-driven).
- An actor can issue a blocking simcall and will be suspended until it is woken up by the simulation kernel (when the operation is completed).
- In order to move forward in (simulated) time, the simulation kernel needs to know which actions the actors want to do.
- As a consequence, the simulated time can only move forward when all the actors are waiting on a simcall.
- An actor cannot synchronise with another actor using OS-level primitives
such as
pthread_mutex_lock()
orstd::mutex
. The simulation kernel would wait for the actor to issue a simcall and would deadlock. Instead it must use simulation-level synchronisation primitives (such assimcall_mutex_lock()
). - Similarly, it cannot sleep using
std::this_thread::sleep_for()
which waits in the real world but must instead wait in the simulation withsimcall_process_sleep()
which waits in the simulation. - The simulation kernel cannot block. Only the actors can block (using simulation primitives).
Futures
What is a future?
We need a generic way to represent asynchronous operations in the simulation kernel. Futures are a nice abstraction for this which have been added to a lot languages (Java, Python, C++ since C++11, ECMAScript, etc.)[2].
A future represents the result of an asynchronous operation. As the operation may not be completed yet, its result is not available yet. Two different sort of APIs may be available to expose this future result:
- a blocking API where we wait for the result to be available
(
res = f.get()
); - a continuation-based API[3] where we say what should be
done with the result when the operation completes
(
future.then(something_to_do_with_the_result)
).
C++11 includes a generic class (std::future<T>
) which implements a blocking API.
The continuation-based API
is not available in the standard (yet) but is described in the
Concurrency Technical
Specification.
Which future do we need?
We might want to use a solution based on std::future
but our need is slightly
different from the C++11 futures. C++11 futures are not suitable for usage inside
the simulation kernel because they are only providing a blocking API
(future.get()
) whereas the simulation kernel cannot block.
Instead, we need a continuation-based API to be used in our event-driven
simulation kernel.
The C++ Concurrency TS describes a continuation-based API. Our future are based on this with a few differences[4]:
- The simulation kernel is single-threaded so we do not need thread synchronisation for out futures.
- As the simulation kernel cannot block,
f.wait()
and its variants are not meaningful in this context. - Similarly,
future.get()
does an implicit wait. Calling this method in the simulation kernel only makes sense if the future is already ready. If the future is not ready, this would deadlock the simulator and an error is raised instead. - We always call the continuations in the simulation loop (and not
inside the
future.then()
orpromise.set_value()
calls)[5].
Implementing Future
The implementation of future is in simgrid::kernel::Future
and
simgrid::kernel::Promise
[6] and is based on the Concurrency
TS[7]:
The future and the associated promise use a shared state defined with:
enum class FutureStatus {
not_ready,
ready,
done,
};
class FutureStateBase : private boost::noncopyable {
public:
void schedule(simgrid::xbt::Task<void()>&& job);
void set_exception(std::exception_ptr exception);
void set_continuation(simgrid::xbt::Task<void()>&& continuation);
FutureStatus get_status() const;
bool is_ready() const;
// [...]
private:
FutureStatus status_ = FutureStatus::not_ready;
std::exception_ptr exception_;
simgrid::xbt::Task<void()> continuation_;
};
template<class T>
class FutureState : public FutureStateBase {
public:
void set_value(T value);
T get();
private:
boost::optional<T> value_;
};
template<class T>
class FutureState<T&> : public FutureStateBase {
// ...
};
template<>
class FutureState<void> : public FutureStateBase {
// ...
};
Both Future
and Promise
have a reference to the shared state:
template<class T>
class Future {
// [...]
private:
std::shared_ptr<FutureState<T>> state_;
};
template<class T>
class Promise {
// [...]
private:
std::shared_ptr<FutureState<T>> state_;
bool future_get_ = false;
};
The crux of future.then()
is:
template<class T>
template<class F>
auto simgrid::kernel::Future<T>::thenNoUnwrap(F continuation)
-> Future<decltype(continuation(std::move(*this)))>
{
typedef decltype(continuation(std::move(*this))) R;
if (state_ == nullptr)
throw std::future_error(std::future_errc::no_state);
auto state = std::move(state_);
// Create a new future...
Promise<R> promise;
Future<R> future = promise.get_future();
// ...and when the current future is ready...
state->set_continuation(simgrid::xbt::makeTask(
[](Promise<R> promise, std::shared_ptr<FutureState<T>> state,
F continuation) {
// ...set the new future value by running the continuation.
Future<T> future(std::move(state));
simgrid::xbt::fulfillPromise(promise,[&]{
return continuation(std::move(future));
});
},
std::move(promise), state, std::move(continuation)));
return std::move(future);
}
We added a (much simpler) future.then_()
method which does not
create a new future:
template<class T>
template<class F>
void simgrid::kernel::Future<T>::then_(F continuation)
{
if (state_ == nullptr)
throw std::future_error(std::future_errc::no_state);
// Give shared-ownership to the continuation:
auto state = std::move(state_);
state->set_continuation(simgrid::xbt::makeTask(
std::move(continuation), state));
}
The .get()
delegates to the shared state. As we mentioned previously, an
error is raised if the future is not ready:
template<class T>
T simgrid::kernel::Future::get()
{
if (state_ == nullptr)
throw std::future_error(std::future_errc::no_state);
std::shared_ptr<FutureState<T>> state = std::move(state_);
return state->get();
}
template<class T>
T simgrid::kernel::FutureState<T>::get()
{
if (status_ != FutureStatus::ready)
xbt_die("Deadlock: this future is not ready");
status_ = FutureStatus::done;
if (exception_) {
std::exception_ptr exception = std::move(exception_);
exception_ = nullptr;
std::rethrow_exception(std::move(exception));
}
xbt_assert(this->value_);
auto result = std::move(this->value_.get());
this->value_ = boost::optional<T>();
return std::move(result);
}
Generic simcalls
Motivation
Simcalls are not so easy to understand and adding a new one is not so easy either. In order to add one simcall, one has to first add it to the list of simcalls which looks like this:
# This looks like C++ but it is a basic IDL-like language
# (one definition per line) parsed by a python script:
void process_kill(smx_process_t process);
void process_killall(int reset_pid);
void process_cleanup(smx_process_t process) [[nohandler]];
void process_suspend(smx_process_t process) [[block]];
void process_resume(smx_process_t process);
void process_set_host(smx_process_t process, sg_host_t dest);
int process_is_suspended(smx_process_t process) [[nohandler]];
int process_join(smx_process_t process, double timeout) [[block]];
int process_sleep(double duration) [[block]];
smx_mutex_t mutex_init();
void mutex_lock(smx_mutex_t mutex) [[block]];
int mutex_trylock(smx_mutex_t mutex);
void mutex_unlock(smx_mutex_t mutex);
[...]
At runtime, a simcall is represented by a structure containing a simcall number and its arguments (among some other things):
struct s_smx_simcall {
// Simcall number:
e_smx_simcall_t call;
// Issuing actor:
smx_process_t issuer;
// Arguments of the simcall:
union u_smx_scalar args[11];
// Result of the simcall:
union u_smx_scalar result;
// Some additional stuff:
smx_timer_t timer;
int mc_value;
};
with the a scalar union type:
union u_smx_scalar {
char c;
short s;
int i;
long l;
long long ll;
unsigned char uc;
unsigned short us;
unsigned int ui;
unsigned long ul;
unsigned long long ull;
double d;
void* dp;
FPtr fp;
};
Then one has to call (manually 😢) a Python script which generates a bunch of C++ files:
- an enum of all the simcall numbers;
- user-side wrappers
responsible for wrapping the parameters in the
struct s_smx_simcall
; and wrapping out the result; - accessors
to get/set values of of
struct s_smx_simcall
; - a simulation-kernel-side big switch handling all the simcall numbers.
Then one has to write the code of the kernel side handler for the simcall and the code of the simcall itself (which calls the code-generated marshaling/unmarshaling stuff).
In order to simplify this process, we added two generic simcalls which can be used to execute a function in the simulation kernel context:
# This one should really be called run_immediate:
void run_kernel(std::function<void()> const* code) [[nohandler]];
void run_blocking(std::function<void()> const* code) [[block,nohandler]];
Immediate simcall
The first one (simcall_run_kernel()
) executes a function in the simulation
kernel context and returns immediately (without blocking the actor):
void simcall_run_kernel(std::function<void()> const& code)
{
simcall_BODY_run_kernel(&code);
}
template<class F> inline
void simcall_run_kernel(F& f)
{
simcall_run_kernel(std::function<void()>(std::ref(f)));
}
On top of this, we add a wrapper which can be used to return a value of any type and properly handles exceptions:
template<class F>
typename std::result_of<F()>::type kernelImmediate(F&& code)
{
// If we are in the simulation kernel, we take the fast path and
// execute the code directly without simcall
// marshalling/unmarshalling/dispatch:
if (SIMIX_is_maestro())
return std::forward<F>(code)();
// If we are in the application, pass the code to the simulation
// kernel which executes it for us and reports the result:
typedef typename std::result_of<F()>::type R;
simgrid::xbt::Result<R> result;
simcall_run_kernel([&]{
xbt_assert(SIMIX_is_maestro(), "Not in maestro");
simgrid::xbt::fulfillPromise(result, std::forward<F>(code));
});
return result.get();
}
where Result<R>
can store either a R
or an exception.
Example of usage:
xbt_dict_t Host::properties() {
return simgrid::simix::kernelImmediate([&] {
simgrid::surf::HostImpl* surf_host =
this->extension<simgrid::surf::HostImpl>();
return surf_host->getProperties();
});
}
In this example, the kernelImmediate()
call is not in user code but
in the framework code. We do not expect the normal user to write
simulator kernel code. Those mechanisms are intended to be used by
the implementer of the framework in order to implement user
primitives.
Blocking simcall
The second generic simcall (simcall_run_blocking()
) executes a function in
the SimGrid simulation kernel immediately but does not wake up the calling actor
immediately:
void simcall_run_blocking(std::function<void()> const& code);
template<class F>
void simcall_run_blocking(F& f)
{
simcall_run_blocking(std::function<void()>(std::ref(f)));
}
The f
function is expected to setup some callbacks in the simulation
kernel which will wake up the actor (with
simgrid::simix::unblock(actor)
) when the operation is completed.
This is wrapped in a higher-level primitive as well. The
kernelSync()
function expects a function-object which is executed
immediately in the simulation kernel and returns a Future<T>
. The
simulator blocks the actor and resumes it when the Future<T>
becomes
ready with its result:
template<class F>
auto kernelSync(F code) -> decltype(code().get())
{
typedef decltype(code().get()) T;
if (SIMIX_is_maestro())
xbt_die("Can't execute blocking call in kernel mode");
smx_process_t self = SIMIX_process_self();
simgrid::xbt::Result<T> result;
simcall_run_blocking([&result, self, &code]{
try {
auto future = code();
future.then_([&result, self](simgrid::kernel::Future<T> value) {
// Propagate the result from the future
// to the simgrid::xbt::Result:
simgrid::xbt::setPromise(result, value);
simgrid::simix::unblock(self);
});
}
catch (...) {
// The code failed immediately. We can wake up the actor
// immediately with the exception:
result.set_exception(std::current_exception());
simgrid::simix::unblock(self);
}
});
// Get the result of the operation (which might be an exception):
return result.get();
}
A contrived example of this would be:
int res = simgrid::simix::kernelSync([&] {
return kernel_wait_until(30).then(
[](simgrid::kernel::Future<void> future) {
return 42;
}
);
});
A more realistic example (implementing user-level primitives) would be:
sg_size_t File::read(sg_size_t size)
{
return simgrid::simix::kernelSync([&] {
return file_->async_read(size);
});
}
Asynchronous operations
We can write the related kernelAsync()
which wakes up the actor immediately
and returns a future to the actor. As this future is used in the actor context,
it is a different future
(simgrid::simix::Future
instead of simgrid::kernel::Future
)
which implements a C++11 std::future
wait-based API:
template <class T>
class Future {
public:
Future() {}
Future(simgrid::kernel::Future<T> future) : future_(std::move(future)) {}
bool valid() const { return future_.valid(); }
T get();
bool is_ready() const;
void wait();
private:
// We wrap an event-based kernel future:
simgrid::kernel::Future<T> future_;
};
The future.get()
method is implemented as[8]:
template<class T>
T simgrid::simix::Future<T>::get()
{
if (!valid())
throw std::future_error(std::future_errc::no_state);
smx_process_t self = SIMIX_process_self();
simgrid::xbt::Result<T> result;
simcall_run_blocking([this, &result, self]{
try {
// When the kernel future is ready...
this->future_.then_(
[this, &result, self](simgrid::kernel::Future<T> value) {
// ... wake up the process with the result of the kernel future.
simgrid::xbt::setPromise(result, value);
simgrid::simix::unblock(self);
});
}
catch (...) {
result.set_exception(std::current_exception());
simgrid::simix::unblock(self);
}
});
return result.get();
}
kernelAsync()
simply 😉 calls kernelImmediate()
and wraps the
simgrid::kernel::Future
into a simgrid::simix::Future
:
template<class F>
auto kernelAsync(F code)
-> Future<decltype(code().get())>
{
typedef decltype(code().get()) T;
// Execute the code in the simulation kernel and get the kernel future:
simgrid::kernel::Future<T> future =
simgrid::simix::kernelImmediate(std::move(code));
// Wrap the kernel future in a user future:
return simgrid::simix::Future<T>(std::move(future));
}
A contrived example of this would be:
simgrid::simix::Future<int> future = simgrid::simix::kernelSync([&] {
return kernel_wait_until(30).then(
[](simgrid::kernel::Future<void> future) {
return 42;
}
);
});
do_some_stuff();
int res = future.get();
A more realistic example (implementing user-level primitives) would be:
simgrid::simix::Future<sg_size_t> File::async_read(sg_size_t size)
{
return simgrid::simix::kernelAsync([&] {
return file_->async_read(size);
});
}
kernelSync()
could be rewritten as:
template<class F>
auto kernelSync(F code) -> decltype(code().get())
{
return kernelAsync(std::move(code)).get();
}
The semantic is equivalent but this form would require two simcalls
instead of one to do the same job (one in kernelAsync()
and one in
.get()
).
Representing the simulated time
SimGrid uses double
for representing the simulated time:
- durations are expressed in seconds;
- timepoints are expressed as seconds from the beginning of the simulation.
In contrast, all the C++ APIs use std::chrono::duration
and
std::chrono::time_point
. They are used in:
std::this_thread::wait_for()
andstd::this_thread::wait_until()
;future.wait_for()
andfuture.wait_until()
;condvar.wait_for()
andcondvar.wait_until()
.
We can define future.wait_for(duration)
and future.wait_until(timepoint)
for our futures but for better compatibility with standard C++ code, we might
want to define versions expecting std::chrono::duration
and
std::chrono::time_point
.
For time points, we need to define a clock (which meets the
TrivialClock
requirements, see
[time.clock.req]
working in the simulated time in the C++14 standard):
struct SimulationClock {
using rep = double;
using period = std::ratio<1>;
using duration = std::chrono::duration<rep, period>;
using time_point = std::chrono::time_point<SimulationClock, duration>;
static constexpr bool is_steady = true;
static time_point now()
{
return time_point(duration(SIMIX_get_clock()));
}
};
A time point in the simulation is a time point using this clock:
template<class Duration>
using SimulationTimePoint =
std::chrono::time_point<SimulationClock, Duration>;
This is used for example in simgrid::s4u::this_actor::sleep_for()
and
simgrid::s4u::this_actor::sleep_until()
:
void sleep_for(double duration)
{
if (duration > 0)
simcall_process_sleep(duration);
}
void sleep_until(double timeout)
{
double now = SIMIX_get_clock();
if (timeout > now)
simcall_process_sleep(timeout - now);
}
template<class Rep, class Period>
void sleep_for(std::chrono::duration<Rep, Period> duration)
{
auto seconds =
std::chrono::duration_cast<SimulationClockDuration>(duration);
this_actor::sleep_for(seconds.count());
}
template<class Duration>
void sleep_until(const SimulationTimePoint<Duration>& timeout_time)
{
auto timeout_native =
std::chrono::time_point_cast<SimulationClockDuration>(timeout_time);
this_actor::sleep_until(timeout_native.time_since_epoch().count());
}
Which means it is possible to use (since C++14):
using namespace std::chrono_literals;
simgrid::s4u::actor::sleep_for(42s);
Mutexes and condition variables
Mutexes
SimGrid has had a C-based API for mutexes and condition variables for
some time. These mutexes are different from the standard
system-level mutex (std::mutex
, pthread_mutex_t
, etc.) because
they work at simulation-level. Locking on a simulation mutex does
not block the thread directly but makes a simcall
(simcall_mutex_lock()
) which asks the simulation kernel to wake the calling
actor when it can get ownership of the mutex. Blocking directly at the
OS level would deadlock the simulation.
Reusing the C++ standard API for our simulation mutexes has many benefits:
- it makes it easier for people familiar with the
std::mutex
to understand and use SimGrid mutexes; - we can benefit from a proven API;
- we can reuse from generic library code in SimGrid.
We defined a reference-counted Mutex
class for this (which supports
the Lockable
requirements, see
[thread.req.lockable.req]
in the C++14 standard):
class Mutex {
friend ConditionVariable;
private:
friend simgrid::simix::Mutex;
simgrid::simix::Mutex* mutex_;
Mutex(simgrid::simix::Mutex* mutex) : mutex_(mutex) {}
public:
friend void intrusive_ptr_add_ref(Mutex* mutex);
friend void intrusive_ptr_release(Mutex* mutex);
using Ptr = boost::intrusive_ptr<Mutex>;
// No copy:
Mutex(Mutex const&) = delete;
Mutex& operator=(Mutex const&) = delete;
static Ptr createMutex();
public:
void lock();
void unlock();
bool try_lock();
};
The methods are simply wrappers around existing simcalls:
void Mutex::lock()
{
simcall_mutex_lock(mutex_);
}
Using the same API as std::mutex
(Lockable
) means we can use existing
C++-standard code such as std::unique_lock<Mutex>
or
std::lock_guard<Mutex>
for exception-safe mutex handling[9]:
{
std::lock_guard<simgrid::s4u::Mutex> lock(*mutex);
sum += 1;
}
Condition Variables
Similarly SimGrid already had simulation-level condition variables
which can be exposed using the same API as std::condition_variable
:
class ConditionVariable {
private:
friend s_smx_cond;
smx_cond_t cond_;
ConditionVariable(smx_cond_t cond) : cond_(cond) {}
public:
ConditionVariable(ConditionVariable const&) = delete;
ConditionVariable& operator=(ConditionVariable const&) = delete;
friend void intrusive_ptr_add_ref(ConditionVariable* cond);
friend void intrusive_ptr_release(ConditionVariable* cond);
using Ptr = boost::intrusive_ptr<ConditionVariable>;
static Ptr createConditionVariable();
void wait(std::unique_lock<Mutex>& lock);
template<class P>
void wait(std::unique_lock<Mutex>& lock, P pred);
// Wait functions taking a plain double as time:
std::cv_status wait_until(std::unique_lock<Mutex>& lock,
double timeout_time);
std::cv_status wait_for(
std::unique_lock<Mutex>& lock, double duration);
template<class P>
bool wait_until(std::unique_lock<Mutex>& lock,
double timeout_time, P pred);
template<class P>
bool wait_for(std::unique_lock<Mutex>& lock,
double duration, P pred);
// Wait functions taking a std::chrono time:
template<class Rep, class Period, class P>
bool wait_for(std::unique_lock<Mutex>& lock,
std::chrono::duration<Rep, Period> duration, P pred);
template<class Rep, class Period>
std::cv_status wait_for(std::unique_lock<Mutex>& lock,
std::chrono::duration<Rep, Period> duration);
template<class Duration>
std::cv_status wait_until(std::unique_lock<Mutex>& lock,
const SimulationTimePoint<Duration>& timeout_time);
template<class Duration, class P>
bool wait_until(std::unique_lock<Mutex>& lock,
const SimulationTimePoint<Duration>& timeout_time, P pred);
// Notify:
void notify_one();
void notify_all();
};
We currently accept both double
(for simplicity and consistency with
the current codebase) and std::chrono
types (for compatibility with
C++ code) as durations and timepoints. One important thing to notice here is
that cond.wait_for()
and cond.wait_until()
work in the simulated time,
not in the real time.
The simple cond.wait()
and cond.wait_for()
delegate to
pre-existing simcalls:
void ConditionVariable::wait(std::unique_lock<Mutex>& lock)
{
simcall_cond_wait(cond_, lock.mutex()->mutex_);
}
std::cv_status ConditionVariable::wait_for(
std::unique_lock<Mutex>& lock, double timeout)
{
// The simcall uses -1 for "any timeout" but we don't want this:
if (timeout < 0)
timeout = 0.0;
try {
simcall_cond_wait_timeout(cond_, lock.mutex()->mutex_, timeout);
return std::cv_status::no_timeout;
}
catch (xbt_ex& e) {
// If the exception was a timeout, we have to take the lock again:
if (e.category == timeout_error) {
try {
lock.mutex()->lock();
return std::cv_status::timeout;
}
catch (...) {
std::terminate();
}
}
std::terminate();
}
catch (...) {
std::terminate();
}
}
Other methods are simple wrappers around those two:
template<class P>
void ConditionVariable::wait(std::unique_lock<Mutex>& lock, P pred)
{
while (!pred())
wait(lock);
}
template<class P>
bool ConditionVariable::wait_until(std::unique_lock<Mutex>& lock,
double timeout_time, P pred)
{
while (!pred())
if (this->wait_until(lock, timeout_time) == std::cv_status::timeout)
return pred();
return true;
}
template<class P>
bool ConditionVariable::wait_for(std::unique_lock<Mutex>& lock,
double duration, P pred)
{
return this->wait_until(lock,
SIMIX_get_clock() + duration, std::move(pred));
}
Conclusion
We wrote two future implementations based on the std::future
API:
- the first one is a non-blocking event-based (
future.then(stuff)
) future used inside our (non-blocking event-based) simulation kernel; - the second one is a wait-based (
future.get()
) future used in the actors which waits using a simcall.
These futures are used to implement kernelSync()
and kernelAsync()
which
expose asynchronous operations in the simulation kernel to the actors.
In addition, we wrote variations of some other C++ standard library
classes (SimulationClock
, Mutex
, ConditionVariable
) which work in
the simulation:
- using simulated time;
- using simcalls for synchronisation.
Reusing the same API as the C++ standard library is very useful because:
- we use a proven API with a clearly defined semantic;
- people already familiar with those API can use our own easily;
- users can rely on documentation, examples and tutorials made by other people;
- we can reuse generic code with our types (
std::unique_lock
,std::lock_guard
, etc.).
This type of approach might be useful for other libraries which define
their own contexts. An example of this is
Mordor, a I/O library using fibers
(cooperative scheduling): it implements cooperative/fiber
mutex,
recursive
mutex
which are compatible with the
BasicLockable
requirements (see
[thread.req.lockable.basic]
in the C++14 standard).
Appendix: useful helpers
Result
Result is like a mix of std::future
and std::promise
in a
single-object without shared-state and synchronisation:
template<class T>
class Result {
enum class ResultStatus {
invalid,
value,
exception,
};
public:
Result();
~Result();
Result(Result const& that);
Result& operator=(Result const& that);
Result(Result&& that);
Result& operator=(Result&& that);
bool is_valid() const;
void reset();
void set_exception(std::exception_ptr e);
void set_value(T&& value);
void set_value(T const& value);
T get();
private:
ResultStatus status_ = ResultStatus::invalid;
union {
T value_;
std::exception_ptr exception_;
};
};
Promise helpers
Those helper are useful for dealing with generic future-based code:
template<class R, class F>
auto fulfillPromise(R& promise, F&& code)
-> decltype(promise.set_value(code()))
{
try {
promise.set_value(std::forward<F>(code)());
}
catch(...) {
promise.set_exception(std::current_exception());
}
}
template<class P, class F>
auto fulfillPromise(P& promise, F&& code)
-> decltype(promise.set_value())
{
try {
std::forward<F>(code)();
promise.set_value();
}
catch(...) {
promise.set_exception(std::current_exception());
}
}
template<class P, class F>
void setPromise(P& promise, F&& future)
{
fulfillPromise(promise, [&]{ return std::forward<F>(future).get(); });
}
Task
Task<R(F...)>
is a type-erased callable object similar to
std::function<R(F...)>
but works for move-only types. It is similar to
std::package_task<R(F...)>
but does not wrap the result in a std::future<R>
(it is not packaged).
std::function |
std::packaged_task |
simgrid::xbt::Task |
|
---|---|---|---|
Copyable | Yes | No | No |
Movable | Yes | Yes | Yes |
Call | const |
non-const |
non-const |
Callable | multiple times | once | once |
Sets a promise | No | Yes | No |
It could be implemented as:
template<class T>
class Task {
private:
std::packaged_task<T> task_;
public:
template<class F>
void Task(F f) :
task_(std::forward<F>(f))
{}
template<class... ArgTypes>
auto operator()(ArgTypes... args)
-> decltype(task_.get_future().get())
{
task_(std::forward<ArgTypes)(args)...);
return task_.get_future().get();
}
};
but we don't need a shared-state.
This is useful in order to bind move-only type arguments:
template<class F, class... Args>
class TaskImpl {
private:
F code_;
std::tuple<Args...> args_;
typedef decltype(simgrid::xbt::apply(
std::move(code_), std::move(args_))) result_type;
public:
TaskImpl(F code, std::tuple<Args...> args) :
code_(std::move(code)),
args_(std::move(args))
{}
result_type operator()()
{
// simgrid::xbt::apply is C++17 std::apply:
return simgrid::xbt::apply(std::move(code_), std::move(args_));
}
};
template<class F, class... Args>
auto makeTask(F code, Args... args)
-> Task< decltype(code(std::move(args)...))() >
{
TaskImpl<F, Args...> task(
std::move(code), std::make_tuple(std::move(args)...));
return std::move(task);
}
Upate (2018-08-15): there is a
proposal
for including this as std::unique_function
in the C++ standard.
In addition to the implementations listed in the paper, there is also
folly::Function
or stdlab::task
.
There is a later proposal
for extending std::function
with non-copyable move-only types and one shot call
with eg. std::function<void()&&>
.
Update (2023-07-08):
C++23 features std::move_only_function<R(...)>
which is similar.
In contrast to xbt::Task<R(...)>
,
std::move_only_function<R(...)>
can be called multiple times.
The relationship between the SimGrid simulation kernel and the simulated actors is similar to the relationship between a OS kernel and the OS processes: the simulation kernel manages (schedules) the execution of the actors; the actors make requests to the simulation kernel using simcalls. However, both the simulation kernel and the actors currently run in the same OS process (and use same address space). ↩︎
There is an interesting library implementation in Rust as well. ↩︎
This is the kind of futures that are available in ECMAScript which use the same kind of never-blocking asynchronous model as our discrete event simulator. ↩︎
(which are related to the fact that we are in a non-blocking single-threaded simulation engine) ↩︎
Calling the continuations from simulation loop means that we don't have to fear problems like invariants not being restored when the callbacks are called 😨 or stack overflows triggered by deeply nested continuations chains 😰. The continuations are all called in a nice and predictable place in the simulator with a nice and predictable state 😌. ↩︎
In the C++ standard library,
std::future<T>
is used by the consumer of the result. On the other hand,std::promise<T>
is used by the producer of the result. The consumer callspromise.set_value(42)
orpromise.set_exception(e)
in order to set the result which will be made available to the consumer byfuture.get()
. ↩︎Currently, we did not implement some features such as shared futures. ↩︎
You might want to compare this method with
simgrid::kernel::Future::get()
we showed previously: the method of the kernel future does not block and raises an error if the future is not ready; the method of the actor future blocks after having set a continuation to wake the actor when the future is ready. ↩︎std::lock()
might kinda work too but it may not be such as good idea to use it as it may use adeadlock avoidance algorithm such as try-and-back-off
. A backoff would probably uselessly wait in real time instead of simulated time. The deadlock avoidance algorithm might as well add non-determinism in the simulation which we would like to avoid.std::try_lock()
should be safe to use though. ↩︎