Skip to content

Distributed Mode

Note

This feature is currently experimental. While it's fully functional and ready for use, it's not extensively tested in production yet. Bugs, performance issues and API changes should be expected. Welcome to have a try and build together with us!

Distributed mode enables you to create actors at remote nodes. When calling a remote actor, all arguments will be serialized and sent to the remote node through network, after execution the return value will be deserialized and sent back to the caller.

Example

#include <algorithm>
#include <cassert>
#include <iostream>
#include <string>
#include "ex_actor/api.h"

class PingWorker {
 public:
  explicit PingWorker(std::string name) : name_(std::move(name)) {}
  static PingWorker Create(std::string name) { return PingWorker(std::move(name)); }
  std::string Ping(const std::string& message) { return "ack from " + name_ + ", msg got: " + message; }
 private:
  std::string name_;
};

// 1. Register the class & methods using EXA_REMOTE macro.
// The first argument is a function to create your class,
// the rest are methods you want to call remotely.
// Why need such boilerplate? (1) Note about inheritance: (2)
EXA_REMOTE(&PingWorker::Create, &PingWorker::Ping); 

stdexec::task<void> MainCoroutine(int argc, char** argv) {
  std::string listen_address = argv[1];
  std::string node_name = argv[2];
  std::string peer_node_name = argv[3];
  std::string contact_address = (argc > 4) ? argv[4] : "";

  ex_actor::Init(/*thread_pool_size=*/4);
  // 2. start or join a cluster.
  co_await ex_actor::StartOrJoinCluster(ex_actor::ClusterConfig {
      // The public address other nodes use to connect to you,
      // we'll open a listening port at this address.
      .listen_address = listen_address,
      // If you're the first node of the cluster, leave this empty.
      // Otherwise, set to any node in the cluster to join.
      .contact_node_address = contact_address,
      // Optional friendly name for this node.
      .node_name = node_name,
  });

  // 3. Wait until the peer we care about has joined the cluster.
  auto [cluster_state, condition_met] = co_await ex_actor::WaitClusterState(
      /*predicate=*/[&](const ex_actor::ClusterState& state) {
        return std::ranges::any_of(state.nodes,
            [&](const auto& n) { return n.node_name == peer_node_name; });
      },
      /*timeout_ms=*/5000);
  assert(condition_met);

  // 4. Look up the peer node by its user-defined name.
  auto it = std::ranges::find_if(cluster_state.nodes,
      [&](const auto& n) { return n.node_name == peer_node_name; });
  auto remote_node_id = it->node_id;

  // 5. Create actor at remote node and play!
  auto ping_worker = co_await
      ex_actor::Spawn<&PingWorker::Create>(/*name=*/"Alice") // (3)
      .ToNode(remote_node_id);
  std::string ping_res = co_await ping_worker.Send<&PingWorker::Ping>("hello");
  assert(ping_res == "ack from Alice, msg got: hello");
  std::cout << "All work done" << std::endl;


  // helper function to wait for OS exit signal(like CTRL+C or kill) before shutting down, otherwise the process
  // might exit earlier than the peer node finishes its work, causing error in the peer node.
  co_await ex_actor::WaitOsExitSignal();
  ex_actor::Shutdown();
}

int main(int argc, char** argv) { stdexec::sync_wait(MainCoroutine(argc, argv)); }
  1. In C++20, there is no way to get your class's methods at compile-time, so we need you to list them manually. We'll eliminate it in C++26 with reflection, stay tuned!

  2. If you want to call a virtual method through base class, you need to register the base class's methods as well, like EXA_REMOTE(&Derived::Create, &Base::Foo, &Derived::Foo);

  3. Note here in Spawn<>, you should provide the factory function pointer (the first argument of EXA_REMOTE) as the template argument. This differs from local actors, where you would pass the class type itself.

Compile this program into a binary, let's say distributed_node.

# usage: `./distributed_node <listen_address> <node_name> <peer_node_name> [contact_address]`

# in one shell
./distributed_node tcp://127.0.0.1:5301 node0 node1

# in another shell
./distributed_node tcp://127.0.0.1:5302 node1 node0 tcp://127.0.0.1:5301

Both processes should print "All work done" log.

The process will block on ex_actor::WaitOsExitSignal(). You should kill them manually by CTRL+C or kill command.

Fault tolerance

When a node can't be reached by any node in the cluster, it's considered dead, all in-flight remote calls will throw ex_actor::NetworkError, by catching this exception you can handle the failure gracefully. (ex_actor::ConnectionLost is a type alias kept for backward compatibility.)

try {
  co_await ref.Send<&YourClass::Method>();
} catch (const ex_actor::NetworkError& e) {
  // e.remote_node_id tells you which node was lost
  // e.what() has a human-readable message
}

For example, now a node dies, you want to recreate your actor to another node:

bool connection_lost = false;
try {
  co_await ref.Send<&YourClass::Method>();
} catch (const ex_actor::NetworkError& e) {
  connection_lost = true;
}

if (connection_lost) {
  std::cout << "I need a new node to join the cluster!" << std::endl;
  // 1. launch a new node, either by starting a process manually,
  // or using outer system's API if you are using k8s or slurm etc.

  // 2. wait for the new node to be ready.
  co_await ex_actor::WaitClusterState(...);

  // 3. recreate your actor to the new node. The original actor's state
  // is lost forever, it's your responsibility to handle the state recovery.
  auto new_ref = co_await ex_actor::Spawn<&YourClass::Create>().ToNode(new_node_id);
}

Architecture details

Serialization

We choose reflect-cpp as the serialization library. It is a reflection-based C++20 serialization library, which can serialize basic structs automatically, avoiding a lot of boilerplate code.

As for the protocol, since we have a fixed schema by nature(all nodes use the same binary, so the types are the same across all nodes), we can take advantage of a schemafull protocol. For this reason, we choose Cap'n Proto from reflect-cpp's supported protocols.

Network

We choose ZeroMQ, it's a well-known and sophisticated message passing library.

The node states are synchronized via gossip protocol, no centralized coordination node.