Tutorial¶
The best way to learn a new library is to learn from examples, let's go through some examples and you'll learn all you want :)
This framework is based on std::execution,
but it's fine if you are not familiar with it, the example is easy enough to understand.
Note: C++26 is not finalized now. Currently we're based on the early implementation of std::execution - nvidia/stdexec,
so you'll see namespace stdexec instead of std::execution in the following examples.
Basic case - turn your class into an actor¶
First let's go through a basic example - create your first actor and call it.
Nearly all of our APIs are async, so let's put everything in a coroutine, then we can easily use co_await to wait for the result non-blockingly.
Here we use std::execution::task, which is the standard coroutine type in std::execution.
#include <cassert>
#include "ex_actor/api.h"
// 0. Assume you have a class, you want to turn it into an actor.
struct Counter {
int Add(int x) { return count += x; }
int count = 0;
};
stdexec::task<void> MainCoroutine() {
/*
1. First, initialize ex_actor runtime.
*/
ex_actor::Init(/*thread_pool_size=*/1);
/*
2. Create an actor, returns an `ActorRef` object.
this object can be copied and passed between actors.
*/
ex_actor::ActorRef actor = co_await ex_actor::Spawn<Counter>();
/*
3. Everything is setup, you can call the actor's method now using `actor_ref.Send`.
This method returns a standard std::execution sender, compatible with everything
in the `std::execution` ecosystem. If you met "unsupported type" compile error: (1)
*/
auto sender = actor.Send<&Counter::Add>(1);
/*
3.1 For local actors, you can try `SendLocal`, which doesn't require the args to be serializable.
*/
auto sender2 = actor.SendLocal<&Counter::Add>(1);
/*
4. The sender is lazy executed. To execute the sender and wait for the result non-blockingly,
use `co_await`. Note that the sender is not copyable, so you need to use `std::move`.
*/
auto res = co_await std::move(sender);
assert(res == 1);
// A shorter way
res = co_await actor.Send<&Counter::Add>(1);
assert(res == 2);
ex_actor::Shutdown();
}
int main() { stdexec::sync_wait(MainCoroutine()); }
-
This method requires your args can be serialized by reflect-cpp, because it can potentially be passed through the network in distributed mode.
reflect-cpp can handle simple structs and common containers automatically, but if your type is non-trivial(e.g. has private fields), you may meet compile errors like "Unsupported type", please refer https://rfl.getml.com/concepts/custom_classes to add a serializer for it.
If you only run ex_actor in a single process, you can use
SendLocal()instead, which doesn't require the args to be serializable, see below.
Create and call an actor inside another actor¶
When you want to create or call an actor inside another actor. You should make the method a coroutine.
The following example shows how to create an actor inside an actor and call it without blocking the scheduler thread.
#include <cassert>
#include "ex_actor/api.h"
class Child {
public:
std::string Ping() {
return "Dad, I'm here!";
}
};
class Father {
public:
// Actor's method can be a sender.
stdexec::task<std::string> SpawnChildAndPing() {
if (child_.IsEmpty()) {
// this line won't block the scheduler thread
child_ = co_await ex_actor::Spawn<Child>();
}
// this line won't either
std::string child_res = co_await child_.Send<&Child::Ping>();
co_return "Where is my child? " + child_res;
}
private:
// ActorRef is default-constructible, you can create an empty ActorRef and set it later.
ex_actor::ActorRef<Child> child_;
};
stdexec::task<void> MainCoroutine() {
// Here we have only one thread in scheduler, but it still can finish the entire work,
// because everything is async, there is no blocking wait
ex_actor::Init(/*thread_pool_size=*/1);
ex_actor::ActorRef<Father> father = co_await ex_actor::Spawn<Father>();
// When the return type of your method is a std::execution sender, we'll automatically
// unwrap the result for you, so you don't need to `co_await` twice.
std::string res = co_await father.Send<&Father::SpawnChildAndPing>();
assert(res == "Where is my child? Dad, I'm here!");
ex_actor::Shutdown();
}
int main() { stdexec::sync_wait(MainCoroutine()); }
Pass ActorRef between actors¶
ActorRef is copyable and can be passed freely between actors.
#include <cassert>
#include <string>
#include "ex_actor/api.h"
class PingWorker {
public:
std::string Ping() { return "Hi"; }
};
class Proxy {
public:
explicit Proxy(ex_actor::ActorRef<PingWorker> actor_ref) : actor_ref_(actor_ref) {}
// Actor's method can be a sender.
stdexec::task<std::string> ProxyPing() {
// This line won't block the scheduler thread.
std::string ping_res = co_await actor_ref_.template Send<&PingWorker::Ping>();
co_return ping_res + " from Proxy";
}
private:
ex_actor::ActorRef<PingWorker> actor_ref_;
};
stdexec::task<void> MainCoroutine() {
ex_actor::Init(/*thread_pool_size=*/1);
ex_actor::ActorRef ping_worker = co_await ex_actor::Spawn<PingWorker>();
// 1. create a proxy actor, who has a reference to the ping_worker actor
ex_actor::ActorRef proxy = co_await ex_actor::Spawn<Proxy>(ping_worker);
// 2. call through the proxy actor.
std::string res = co_await proxy.Send<&Proxy::ProxyPing>();
assert(res == "Hi from Proxy");
ex_actor::Shutdown();
}
int main() { stdexec::sync_wait(MainCoroutine()); }
Trigger senders without waiting for the result immediately¶
You can use simple_counting_scope + spawn/spawn_future to trigger a task without waiting for the result immediately.
#include <cassert>
#include "ex_actor/api.h"
struct Counter {
int AddAndGet(int x) { return count += x; }
int count = 0;
};
stdexec::task<void> MainCoroutine() {
ex_actor::Init(/*thread_pool_size=*/1);
ex_actor::ActorRef actor = co_await ex_actor::Spawn<Counter>();
stdexec::simple_counting_scope scope;
// `spawn` requires the sender to be void and exception-free, use `ex_actor::DiscardResult()`
// to drop the result and swallow errors.
stdexec::spawn(actor.Send<&Counter::AddAndGet>(1) | ex_actor::DiscardResult(), scope.get_token());
// If you care about the result, use `stdexec::spawn_future` to get a future-like object.
auto future = stdexec::spawn_future(actor.Send<&Counter::AddAndGet>(1), scope.get_token());
int res = co_await std::move(future);
// actor executes messages in serial, so the result must be 2.
assert(res == 2);
// you must wait for all tasks to complete before destroying the scope,
// or std::terminate will be called.
co_await scope.join();
ex_actor::Shutdown();
}
int main() { stdexec::sync_wait(MainCoroutine()); }
Execute multiple senders in parallel¶
You can execute multiple senders in parallel using when_all or simple_counting_scope + spawn/spawn_future in std::execution.
#include <cassert>
#include "ex_actor/api.h"
struct Counter {
int AddAndGet(int x) { return count += x; }
void Add(int x) { count += x; }
int GetValue() const { return count; }
int count = 0;
};
stdexec::task<void> MainCoroutine() {
ex_actor::Init(/*thread_pool_size=*/3);
// create multiple counters, you want to increase them in parallel
std::vector<ex_actor::ActorRef<Counter>> counters;
for (int i = 0; i < 3; ++i) {
counters.push_back(co_await ex_actor::Spawn<Counter>());
}
// 1. `when_all` example, handy for small number of senders.
auto [res1, res2, res3] = co_await stdexec::when_all(
counters[0].Send<&Counter::AddAndGet>(1),
counters[1].Send<&Counter::AddAndGet>(1),
counters[2].Send<&Counter::AddAndGet>(1)
);
assert(res1 == 1);
assert(res2 == 1);
assert(res3 == 1);
// 2. for large number of senders where you need a loop, use `spawn`/`spawn_future` + `simple_counting_scope`.
stdexec::simple_counting_scope scope;
// 2.1 `spawn` example. Wrap with ex_actor::DiscardResult() to drop the result and errors.
for (int i = 0; i < counters.size(); ++i) {
stdexec::spawn(counters[i].Send<&Counter::Add>(1) | ex_actor::DiscardResult(), scope.get_token());
}
co_await scope.join();
for (int i = 0; i < counters.size(); ++i) {
int value = co_await counters[i].Send<&Counter::GetValue>();
assert(value == 2);
}
// 2.2 `spawn_future` example, which returns a future-like object which you can wait for later.
stdexec::simple_counting_scope scope2;
using FutureType = decltype(stdexec::spawn_future(counters[0].Send<&Counter::AddAndGet>(1), scope2.get_token()));
std::vector<FutureType> futures;
for (int i = 0; i < counters.size(); ++i) {
auto future = stdexec::spawn_future(counters[i].Send<&Counter::AddAndGet>(1), scope2.get_token());
futures.push_back(std::move(future));
}
for (int i = 0; i < futures.size(); ++i) {
int value = co_await std::move(futures[i]);
assert(value == 3);
}
co_await scope2.join();
ex_actor::Shutdown();
}
int main() { stdexec::sync_wait(MainCoroutine()); }
[Optional Read] Wrap the result using plain sender adapter¶
We recommend you to use std::execution::task coroutine to wrap the result, which is easier and more readable.
But if you insist on using plain sender adapter for some reason, be cautious that you'll lose the scheduler affinity std::execution::task provided,
the then callback will run on the actor's thread, do not capture local variable's reference, or this pointer of an actor instance in the callback.
#include <cassert>
#include "ex_actor/api.h"
struct Counter {
int Add(int x) { return count += x; }
int count = 0;
};
int main() {
ex_actor::Init(/*thread_pool_size=*/2);
auto [actor] = stdexec::sync_wait(ex_actor::Spawn<Counter>()).value();
// Sender adapter style
int local_var = 1;
auto task1 = actor.Send<&Counter::Add>(1) | stdexec::then([&local_var](int value) {
// this line will be executed on the actor's thread.
// local_var will have data race
local_var++;
return value + 1;
});
// data race
local_var++;
auto [res1] = stdexec::sync_wait(std::move(task1)).value();
assert(res1 == 2);
ex_actor::Shutdown();
}
In the above example, the then callback runs on the actor's thread. If you capture local variables in the then callback, it will cause a data race.
A more dangerous example is capturing this in actor's method.
#include <cassert>
#include <iostream>
#include "ex_actor/api.h"
struct DummyActor {
void Ping() {}
};
class Proxy {
public:
explicit Proxy(ex_actor::ActorRef<DummyActor> actor_ref) : actor_ref_(actor_ref) {}
stdexec::sender auto SomeMethod() {
// DO NOT capture `this` in the callback, it will cause data race
return actor_ref_.template Send<&DummyActor::Ping>() | stdexec::then([this]() {
// this line will be executed on the DummyActor's thread.
// the next line will have data race
actor_member_var++;
});
}
void AnotherMethod() {
// the other side of the data race
actor_member_var++;
}
private:
int actor_member_var = 0;
ex_actor::ActorRef<DummyActor> actor_ref_;
};
stdexec::task<void> MainCoroutine() {
ex_actor::Init(/*thread_pool_size=*/2);
ex_actor::ActorRef dummy_actor = co_await ex_actor::Spawn<DummyActor>();
// 2. create a proxy actor, who has a reference to the dummy actor
ex_actor::ActorRef proxy = co_await ex_actor::Spawn<Proxy>(dummy_actor);
// 3. call through the proxy actor
stdexec::simple_counting_scope scope;
stdexec::spawn(proxy.Send<&Proxy::SomeMethod>() | ex_actor::DiscardResult(), scope.get_token());
stdexec::spawn(proxy.Send<&Proxy::AnotherMethod>() | ex_actor::DiscardResult(), scope.get_token());
co_await scope.join();
ex_actor::Shutdown();
}
int main() { stdexec::sync_wait(MainCoroutine()); }
Understanding the scheduler affinity of std::execution::task¶
To understand why the callback of then runs on the target actor's thread, while in coroutine it runs on the caller's thread, you need to know the scheduler switching mechanism in std::execution.
In std::execution, scheduler's switch should be explicit - by calling continue_on explicitly.
An actor itself is a scheduler (not the scheduler inside the runtime which used to shceudle the actor, but actor itself), when you call its method, you schedule a task on it. So all the callbacks will run on the actor's thread.
But in a coroutine, the code looks like they are executing in the same thread.
So in order not to confuse the user, make coroutine easy to use, std::execution::task has scheduler affinity - it will keep the scheduler the same across the entire coroutine.
In other words, after any co_await in the coroutine, std::execution::task will help you to switch back to the coroutine's scheduler.
(See std::execution::task's proposal for more details).