Sorock Guide

Multi-Raft

The Raft consensus algorithm is nowadays a widely used building block of distributed applications. The algorithm reaches consensus on a sequence of log entries, which are applied to a state machine. When two logs are identical, the resulting states are identical too. In this sense, the construct is also called a replicated state machine.

The typical usage of the Raft algorithm is to implement a distributed key-value store. However, a naive implementation will suffer from scalability issues due to the serial nature of the algorithm. Consider putting (k1,v1) and (k2,v2) into the data store: a naive implementation can't handle these two requests concurrently because the operations are serialized.

Sharding is a common solution to this kind of problem. If k1 and k2 are far enough apart in the key space, the operations can be handled concurrently by sharded Raft clusters. Since we can split a single Raft cluster into multiple small Raft clusters, this naturally addresses the capacity scalability problem at the same time. TiKV uses this technique (ref).

There are two ways of implementing multi-raft. One is deploying multiple Raft clusters independently. One problem with this approach is that resources like IP addresses or ports must be allocated per node to identify them, which introduces unnecessary complexity in deployment. Another problem is that shards can't share resources efficiently: in a key-value store implementation, the embedded datastore should be shared among the shards for efficient I/O such as write batching. To avoid these problems, I take the second approach.

sorock implements in-process multi-raft. As the name implies, sorock allows you to place multiple Raft processes in a single gRPC server process. These Raft processes can form Raft clusters on independent shards.

Heartbeat Multiplexing

In Raft, the leader is responsible for periodically sending heartbeats to all followers to maintain its leadership. In multi-raft, without any optimization, the load from these heartbeats can be non-negligible. However, we can reduce the number of heartbeat RPCs by multiplexing the heartbeats from different shards.

Problem of naive implementation

In the following diagram, there are two nodes, Node1 and Node2, and two shards. A process marked with an asterisk (*) is a leader: P1 sends heartbeats to P3, and P2 sends heartbeats to P4. So two RPC messages are sent independently, one from P1 to P3 and one from P2 to P4.

graph LR
  subgraph Node1
    P1(P1*)
    P2(P2*)
  end
  subgraph Node2
    P3(P3)
    P4(P4)
  end
  P1 -->|heartbeat| P3
  P2 --> P4

Multiplexing heartbeats

To reduce the number of RPCs to only one, we can introduce a multiplexer and a demultiplexer in the nodes. The two heartbeats are buffered in the multiplexer and sent in a single batched RPC to the destination node. On the destination node, the demultiplexer splits the batched RPC into individual heartbeats and delivers them to the corresponding processes.

graph LR
  subgraph Node1
    P1(P1*)
    P2(P2*)
    MUX(Mux)
  end
  subgraph Node2
    DEMUX(Demux)
    P3(P3)
    P4(P4)
  end
  P1 --> MUX
  P2 --> MUX
  MUX --> DEMUX
  DEMUX --> P3
  DEMUX --> P4
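
The sender side of this pattern can be sketched as follows. This is a minimal illustration, not sorock's actual code; the Heartbeat fields and the send_batched_rpc callback are hypothetical.

use std::collections::HashMap;

// One heartbeat produced by a leader process for one shard (hypothetical fields).
struct Heartbeat {
    shard_id: u32,
    leader_term: u64,
    commit_index: u64,
}

// Buffers heartbeats per destination node and flushes each buffer as one RPC.
#[derive(Default)]
struct HeartbeatMultiplexer {
    buffers: HashMap<String, Vec<Heartbeat>>, // destination node id -> buffered heartbeats
}

impl HeartbeatMultiplexer {
    // Called by every leader process on this node instead of sending its own RPC.
    fn push(&mut self, dest_node: &str, hb: Heartbeat) {
        self.buffers.entry(dest_node.to_owned()).or_default().push(hb);
    }

    // Called once per heartbeat period: one batched RPC per destination node.
    fn flush(&mut self, send_batched_rpc: impl Fn(&str, Vec<Heartbeat>)) {
        for (dest, batch) in self.buffers.drain() {
            send_batched_rpc(&dest, batch);
        }
    }
}

On the receiving node, the demultiplexer simply looks up the process registered for each shard_id and delivers the individual heartbeats.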

In this two-shard example, the reduction rate is 2. But what would the reduction rate be in general? Let's do some math.

Math: Reduction rate

Let's say there are N nodes and L shards. Each shard has K replicas, and the leader processes are balanced among the nodes.

In this case, each leader sends K-1 heartbeats per period, so the total number of heartbeats sent in a period is L(K-1). The total number of directed paths connecting two nodes is N(N-1), and the heartbeats are evenly distributed over these paths. Therefore, the number of heartbeats sent on each path is L(K-1)/(N(N-1)), and this is the reduction rate. For example, with 5 nodes and 1000 shards with 3 replicas, the reduction rate is 100.
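
Written out with the example numbers (assuming perfectly balanced leader and follower placement):

\[
\text{reduction rate} = \frac{L(K-1)}{N(N-1)} = \frac{1000 \times (3-1)}{5 \times (5-1)} = \frac{2000}{20} = 100
\]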

Batched Write

In multi-raft, multiple shards process write requests. Conceptually, each shard maintains its own log for entry insertion.

Having a physically independent log for each shard isn't efficient, because each write then requires its own transaction to persist the data on storage.

However, an optimization technique called "batching" can be used. Here, each shard maintains a virtual log, and the entries are temporarily queued in a shared queue. These queued entries are then processed in a single transaction, reducing the number of transactions.

This approach often presents a throughput versus latency dilemma. However, this implementation increases throughput without sacrificing latency.

graph LR
  CLI(Client)

  subgraph P1
    T1(redb::Table)
  end
  subgraph P2
    T2(redb::Table)
  end
  subgraph P3
    T3(redb::Table)
  end

  subgraph Reaper
    Q(Queue)
  end
  DB[(redb::Database)]

  CLI -->|entry| T1
  CLI --> T2
  CLI --> T3
  T1 -->|lazy entry| Q
  T2 --> Q
  T3 --> Q
  Q -->|transaction| DB
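
The reaper loop in the diagram can be sketched roughly as follows. This is an illustration of the batching idea, not sorock's actual code: LazyEntry and persist_batch are hypothetical stand-ins, and in sorock the persistence step would be a single redb write transaction covering the queued entries.

use std::sync::mpsc;

// A log entry queued by a shard's virtual log (hypothetical fields).
struct LazyEntry {
    shard_id: u32,
    index: u64,
    bytes: Vec<u8>,
}

// Drains whatever is queued and persists it in one transaction per iteration.
fn reaper_loop(rx: mpsc::Receiver<LazyEntry>, persist_batch: impl Fn(&[LazyEntry])) {
    // Block until at least one entry arrives, then grab everything else that is
    // already queued so the whole batch shares a single storage transaction.
    while let Ok(first) = rx.recv() {
        let mut batch = vec![first];
        batch.extend(rx.try_iter());
        persist_batch(&batch);
    }
}

Because the batch grows under load and shrinks to a single entry when the system is idle, this kind of batching can improve throughput without adding a fixed latency penalty.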

Raft Process

The core of multi-raft is the Raft process. Each multi-raft server has one or more Raft processes.

To implement multi-raft, sorock implements the Raft process so that it is agnostic to the details of node-to-node communication over gRPC. Since the Raft process doesn't know anything about the I/O, we call it Pure Raft.

To let a Raft process communicate with other Raft processes over the network, a RaftDriver must be provided. Everything about the actual network communication is encapsulated behind RaftDriver.

impl RaftProcess {
    pub async fn new(
        app: impl RaftApp,
        log_store: impl RaftLogStore,
        ballot_store: impl RaftBallotStore,
        driver: RaftDriver,
    ) -> Result<Self> {
        // ...
    }
}

Multi Threading

Inside RaftProcess, there is a state called RaftCore and many threads that process it concurrently on an event-driven basis; most of them are driven by timers.
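
As a rough sketch of this pattern (the thread set, tick intervals, and RaftCore methods below are hypothetical; sorock's actual internals are richer), each concern runs in its own timer-driven task over the shared state:

use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::{interval, Duration};

struct RaftCore { /* term, log positions, membership, ... */ }

impl RaftCore {
    async fn send_heartbeats(&mut self) { /* only when this process is the leader */ }
    async fn advance_commit_and_apply(&mut self) { /* apply newly committed entries */ }
}

fn spawn_timer_tasks(core: Arc<Mutex<RaftCore>>) {
    // Heartbeat task: wakes up periodically and lets the core send heartbeats.
    let hb_core = core.clone();
    tokio::spawn(async move {
        let mut tick = interval(Duration::from_millis(100));
        loop {
            tick.tick().await;
            hb_core.lock().await.send_heartbeats().await;
        }
    });
    // Apply task: wakes up more frequently and applies committed entries.
    tokio::spawn(async move {
        let mut tick = interval(Duration::from_millis(10));
        loop {
            tick.tick().await;
            core.lock().await.advance_commit_and_apply().await;
        }
    });
}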

Application State

In RaftCore, RaftApp and RaftLogStore are especially important because these two compose the application state. This section explains the conceptual aspects of them.

RaftApp

RaftApp is an abstraction that represents the FSM (finite state machine) of the application and the snapshot repository. The snapshot repository isn't separated out because the contents of a snapshot are strongly coupled with the application state.

The application state can be updated by applying log entries. In this figure, the last applied entry is at index 55.

RaftApp can generate a snapshot at any time to compact the log. When RaftApp makes a snapshot, it stores it in the snapshot repository. The latest snapshot is immediately picked up by the Raft process, which updates the log (right in the figure) by replacing the old snapshot entry with the new one. In this figure, the snapshot index is 51.

Old snapshots will be garbage collected.

Snapshots may also be fetched from other nodes. This happens when a Raft process is far behind the leader and the leader no longer has the needed log entries because they have been garbage collected.
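
Conceptually, the RaftApp side of the state looks like the following interface. This is a hypothetical sketch for illustration only; the method names and signatures of sorock's actual RaftApp trait differ.

use async_trait::async_trait;

// A conceptual FSM plus snapshot repository (hypothetical; not sorock's RaftApp).
#[async_trait]
trait ApplicationFsm {
    // Apply a committed log entry and produce the response for the client.
    async fn apply(&mut self, index: u64, command: &[u8]) -> Vec<u8>;
    // Store a snapshot of the current state, tagged with the last applied index.
    async fn save_snapshot(&self, index: u64) -> std::io::Result<()>;
    // Replace the current state with a snapshot, possibly fetched from another node.
    async fn restore_snapshot(&mut self, index: u64) -> std::io::Result<()>;
}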

RaftLogStore

RaftLogStore is an abstraction that represents the log entries. In the figure, log entries from 45 to 50 are scheduled for garbage collection. The snapshot entry is at index 51, and it is guaranteed that the corresponding snapshot exists in the snapshot repository. Entries 52 to 55 are applied. Entries at index 56 or later are not applied yet; they are either committed or uncommitted.
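
The same layout can be summarized as a set of index invariants (illustrative only; the field names are not sorock's):

// Index positions from the figure above, written as invariants.
struct LogIndices {
    snapshot_index: u64, // 51: its snapshot exists in the snapshot repository
    applied_index: u64,  // 55: applied to the application state
    commit_index: u64,   // agreed by a majority, possibly not applied yet
    last_index: u64,     // last entry in the log, possibly uncommitted
}

fn check(ix: &LogIndices) {
    assert!(ix.snapshot_index <= ix.applied_index);
    assert!(ix.applied_index <= ix.commit_index);
    assert!(ix.commit_index <= ix.last_index);
}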

Client Interaction

You can send application-defined commands to the Raft cluster.

sorock distinguishes commands into two types: read-write and read-only. A read-only command is called a query, and queries can be processed through an optimized path.

R/W command

An R/W command is a normal application command that is inserted into the log to be applied later.

You can send an R/W command to the cluster with this API.

message WriteRequest {
  uint32 shard_id = 1;
  bytes message = 2;
  string request_id = 3;
}

request_id is used to avoid applying the same command twice. Let's think about this scenario:

  1. The client sends an R/W command to add 1 to the value.
  2. The leader replicates the command to the followers but crashes before applying it (and responding).
  3. The client resends the command to a new leader after a timeout.
  4. The result is that 2 is added to the value, whereas the expectation is 1.

To avoid this issue, request_id is added so that the same command can be recognized and applied only once.
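
On the apply path, such deduplication can be sketched like this. This is an illustration of the idea, not sorock's actual mechanism; the cache type and its eviction policy are left out.

use std::collections::HashMap;

// Remembers the response of each applied request so a retry isn't applied again.
#[derive(Default)]
struct DedupState {
    responses: HashMap<String, Vec<u8>>, // request_id -> cached response
}

impl DedupState {
    fn apply_once(
        &mut self,
        request_id: &str,
        command: &[u8],
        apply: impl FnOnce(&[u8]) -> Vec<u8>,
    ) -> Vec<u8> {
        if let Some(resp) = self.responses.get(request_id) {
            // Already applied under a previous leader: return the cached response
            // instead of adding 1 to the value a second time.
            return resp.clone();
        }
        let resp = apply(command);
        self.responses.insert(request_id.to_owned(), resp.clone());
        resp
    }
}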

R/O command

An R/O command can bypass the log because it is safe to execute the query once the commit index observed at query time has been applied. This is called the read_index optimization.

You can send an R/O command to the cluster with the following API.

When read_locally is set to true, the request isn't proxied to the leader but processed locally.

message ReadRequest {
  uint32 shard_id = 1;
  bytes message = 2;
  bool read_locally = 3;
}
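
The read path can be sketched as follows (hypothetical types; sorock's internals differ): record the commit index at query time, wait until the local state machine has applied up to that index, then answer from local state.

use tokio::sync::watch;

struct QueryPath {
    commit_index: u64,                // commit index observed when the query arrived
    applied_rx: watch::Receiver<u64>, // broadcasts the applied index as it advances
}

async fn wait_readable(mut q: QueryPath) {
    // The query is safe to execute once applied_index >= commit_index.
    while *q.applied_rx.borrow() < q.commit_index {
        if q.applied_rx.changed().await.is_err() {
            break; // the applier task has shut down
        }
    }
    // ...execute the query against the local application state here...
}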

Cluster Management

Single server change approach

There are two approaches to membership change. One is joint consensus and the other is the so-called single server change approach. sorock implements the latter.

This approach exploits the log replication mechanism for membership change and therefore can be implemented as an extension of normal log replication by defining AddServer and RemoveServer commands. From the admin client, the following APIs add or remove a Raft process in a cluster.

message AddServerRequest {
  uint32 shard_id = 1;
  string server_id = 2;
}

message RemoveServerRequest {
  uint32 shard_id = 1;
  string server_id = 2;
}

Cluster bootstrapping

In Raft, any command is directed to the leader. So the question is how to add a Raft process to an empty cluster.

The answer is so-called cluster bootstrapping. When a node receives an AddServer command and recognizes that the cluster is empty, it forms a new cluster with itself and immediately becomes the leader as a result of a self election.
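
The decision can be sketched as follows (a simplified illustration, not sorock's actual code):

// How an incoming AddServer request is handled (simplified).
enum AddServerAction {
    // The cluster is empty: form a single-member cluster and win a self election.
    Bootstrap,
    // Otherwise: append an AddServer entry through normal log replication.
    ReplicateMembershipChange,
}

fn on_add_server(cluster_is_empty: bool) -> AddServerAction {
    if cluster_is_empty {
        AddServerAction::Bootstrap
    } else {
        AddServerAction::ReplicateMembershipChange
    }
}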

Leadership

Command redirection

Raft is a leader-based consensus algorithm. Only a single leader can exist in the cluster at a time and all commands are directed to the leader.

In sorock, if the receiving Raft process isn't the leader, the command is redirected to the leader.

Adaptive leader failure detection

Detecting the leader's failure is a very important issue in the Raft algorithm. A naive implementation sends heartbeats to the followers periodically, and the followers detect the leader's failure by timeout. However, this approach requires the heartbeat interval and the timeout duration to be set properly before deployment, which adds complexity. Moreover, these durations can't be fixed to a single value when the distance between nodes is heterogeneous, such as in a geo-distributed environment.

To solve this problem, sorock uses an adaptive failure detection algorithm called the phi accrual failure detector. With this approach, users are freed from setting the timing parameters.
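
The idea behind the detector can be sketched as follows. For simplicity this sketch assumes exponentially distributed heartbeat inter-arrival times; the original phi accrual paper (and likely sorock) estimates the distribution from a sliding window, so treat this as an illustration only.

use std::collections::VecDeque;
use std::time::Instant;

// Tracks recent heartbeat inter-arrival times and turns "how long since the last
// heartbeat" into a suspicion level phi instead of a hard timeout.
struct PhiDetector {
    intervals: VecDeque<f64>, // recent inter-arrival times, in seconds
    last_heartbeat: Instant,
    window: usize,
}

impl PhiDetector {
    fn on_heartbeat(&mut self) {
        let now = Instant::now();
        self.intervals
            .push_back(now.duration_since(self.last_heartbeat).as_secs_f64());
        if self.intervals.len() > self.window {
            self.intervals.pop_front();
        }
        self.last_heartbeat = now;
    }

    // phi = -log10(probability that a live leader is still silent this long).
    fn phi(&self) -> f64 {
        let elapsed = self.last_heartbeat.elapsed().as_secs_f64();
        let n = self.intervals.len().max(1) as f64;
        let mean = (self.intervals.iter().sum::<f64>() / n).max(1e-6);
        // With exponential inter-arrivals, P(silence >= elapsed) = exp(-elapsed / mean),
        // so phi = (elapsed / mean) * log10(e).
        (elapsed / mean) * std::f64::consts::LOG10_E
    }
}

A follower starts an election when phi exceeds a threshold, so the effective timeout adapts to the observed heartbeat intervals instead of being a fixed parameter.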

Leadership transfer extension

In multi-raft, changing the cluster members is not a rare event. A typical example is rebalancing: to balance CPU/disk usage between nodes, a Raft process may be moved to another node.

If the Raft process to be removed is the leader, the cluster will not have a leader until a new leader is elected, which causes downtime.

To mitigate this problem, the admin client can send a TimeoutNow command to any remaining Raft process to forcibly start a new election (by promoting that process to a candidate, in Raft terms).

message TimeoutNow {
  uint32 shard_id = 1;
}

Development

Testing

cargo test to run unit tests.

Benchmark

cargo +nightly bench to run benchmarks.

Load Testing

We have a nice load testing tool named "Mr. Bench". To ask him for help, run cargo run -p mrbench. For details, please refer to the README.

Documentation

To edit the mdbook under the doc/ directory, you can run mdbook serve doc to start a local mdbook server.