Redis/Valkey Replication Internals: The Architecture Behind Zero-Copy Command Propagation

How replBufBlock's reference-counted shared memory design enables efficient replication

The Mystery of Disappearing Data

Recently I started working on a fix for a specific bug in Valkey. The problem: if a hash key contains fields with a TTL set, executing a command with KEEPTTL on the master node can cause a data inconsistency between the master and the replica.

Think about it: you already have a key on the master and you overwrite it while passing the KEEPTTL flag, and replication executes that same command on the replica. The problem shows up in the following:

shell
> hsetex EX 3 myhash fields f1 v1
> hsetex myhash keepttl fields f1 v1

Do you see anything special above? The first statement creates a new hash called myhash and adds a field f1 with value v1 and a time-to-live of 3 seconds. We then run the same command again, this time with keepttl. The idea behind keepttl is that you ask Valkey to preserve the TTL already set on that field.

However, things are never this simple! Say there was a network partition or a delay between the first and the last command. The master ran the two commands in quick succession, but on the replica the first command ran and, because of the delay, the second one ran, say, 4 seconds later.

Now we are in a state of inconsistency! On the master, the field f1 of myhash has expired, yet the replica still contains it. With what TTL exactly? Since the replicated command carried only KEEPTTL, the field will live forever. (By the time you're reading this blog the fix has already been merged; we'll talk about it soon ;)

Note that this blog is written on the assumption that you, my reader, are also an engineer who knows and understands C (don't fret too much, I'll keep explaining).
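
To make the timeline concrete, here is a minimal toy model of the scenario above. It is a sketch, not Valkey code: toy_field, toy_set_keepttl and toy_replay are hypothetical names, times are plain seconds, and it assumes that a node treats an already expired field as missing when the KEEPTTL command arrives.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical toy model: one hash field with an optional absolute expiry.
 * expire_at == 0 means "no TTL". Times are plain seconds. */
typedef struct {
    bool exists;
    long long expire_at;
} toy_field;

/* True if the field is present and has not expired at time `now`. */
bool toy_field_alive(const toy_field *f, long long now) {
    return f->exists && (f->expire_at == 0 || now < f->expire_at);
}

/* Set the field with an absolute expiry time. */
void toy_set_expire_at(toy_field *f, long long expire_at) {
    f->exists = true;
    f->expire_at = expire_at;
}

/* Set the field with KEEPTTL semantics: a live field keeps its expiry,
 * while a missing or already expired field is created fresh, without a TTL.
 * (Assumption of this sketch, see the lead-in.) */
void toy_set_keepttl(toy_field *f, long long now) {
    if (!toy_field_alive(f, now)) f->expire_at = 0;
    f->exists = true;
}

/* Replays the two commands on one node: the first at t=0 with a 3 second
 * TTL, the second (KEEPTTL) at `second_cmd_time`. Returns whether the field
 * is still alive at `check_time`. */
bool toy_replay(long long second_cmd_time, long long check_time) {
    assert(second_cmd_time >= 0);
    toy_field f = {false, 0};
    toy_set_expire_at(&f, 0 + 3);
    toy_set_keepttl(&f, second_cmd_time);
    return toy_field_alive(&f, check_time);
}
```

Calling toy_replay(1, 5) plays the master's timeline (second command after 1 second) and toy_replay(4, 5) plays the delayed replica's timeline. Under the stated assumption the two calls disagree about whether f1 still exists at t=5, which is exactly the divergence described above.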

Replication 101: How Distributed Key-Value Stores Stay in Sync

Let's talk about the elephant in the room! Btw, we'll create a toy version of the replication model followed by Valkey and Redis, so stick around (make sure to understand this, otherwise the rest will feel like magic).

I am assuming you already know what replication is. If you don't: "Replication is a simple process by which data is propagated from one node to another". That's the simplest way to put it. In practice, data replication is usually a broadcast in which one node (the master) distributes its data to all the available nodes (the replicas).

Primary-Replica Architecture

Valkey also follows the Primary-Replica architecture: there is one master which accepts writes, and these writes are then propagated to all the replicas subscribed to it. The general flow is that all your writes go to the master, while reads can be served by the replicas.

Valkey uses asynchronous replication by default. The primary keeps track, for every replica client, of how far into the replication stream it has sent. Here's a simplified version of the client struct from server.h in Valkey:

c
// server.h

typedef struct ClientReplicationData {
    ... fields ...

    listNode *ref_repl_buf_node;         /* Referenced node of replication buffer blocks,
                                           see the definition of replBufBlock. */
    size_t ref_block_pos;                /* Access position of referenced buffer block,
                                           i.e. the next offset to send. */

    ... fields ...
} ClientReplicationData;

typedef struct client {
    ... fields ...

    /* Client state structs. */
    ClientReplicationData *repl_data;     /* Required for Replication operations. lazily initialized when first needed */

    ... fields ...
} client;

Observe that we have a pointer called ref_repl_buf_node. It points to the block of memory on the master in which the replication data is stored. We will dive deeper into replBufBlock, which is the magic behind the efficient replication strategy used by Valkey and Redis.

Valkey is also smart about reconnecting replicas. The primary maintains something called a "Repl Backlog": a bounded window of the most recent RESP commands in the replication stream. If a replica asks to resume from an offset that is still inside the backlog, the primary only sends the missing part. If the requested offset is older than what the backlog holds, Valkey falls back to a full resynchronization, in which the replica receives a full copy of the data as an RDB snapshot (RDB being one of the two persistence options, the other being AOF).
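
The partial-versus-full decision boils down to a range check. The sketch below is a simplified illustration, not the actual Valkey function: toy_backlog and toy_can_partial_sync are hypothetical names, and the backlog is reduced to a replication ID, a start offset and a length.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical, simplified view of a primary's backlog: it covers the byte
 * range [start_offset, start_offset + length) of the replication stream. */
typedef struct {
    char replid[41];
    long long start_offset;
    long long length;
} toy_backlog;

/* Returns true when the primary can continue the stream from `offset`
 * (partial resync). Returns false when the replication ID does not match or
 * the offset falls outside the backlog, in which case a full resync with an
 * RDB transfer would be needed. */
bool toy_can_partial_sync(const toy_backlog *b, const char *replid, long long offset) {
    assert(b->length >= 0);
    if (strcmp(b->replid, replid) != 0) return false;
    /* offset == start + length means "fully caught up, send me what comes next". */
    return offset >= b->start_offset && offset <= b->start_offset + b->length;
}
```

With a backlog that starts at offset 100 and holds 50 bytes, a replica asking for offset 120 can resume, while one asking for offset 99 has fallen behind the backlog and needs the full copy.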

A quick word about Hash Field Expiration (HFE)

One thing before we go any further: let's talk about HFE. Redis and Valkey are strong contenders among KV stores, especially when it comes to caching, which means that keys often have an associated TTL. Valkey is therefore responsible for getting rid of a key when it expires. However, expiration has traditionally applied only to top-level keys.

Valkey supports something called Hash Field Expiration, wherein individual fields in a hash can also have their own TTLs. To understand this, let's quickly talk about how Valkey handles expiration. In general there are two ways expired keys are reclaimed:

  1. Lazy Expiration -- when the user accesses a key (a GET, for example), Valkey checks whether the requested key should already have expired and, if so, deletes it immediately.
  2. Active Expiration -- Valkey runs a background cron job (10 times every second by default) that samples a small set of keys with an expiration and deletes the ones that have expired.

Valkey extended this job to support hash fields. Note that the core challenge here is that you could have several hashes that each contain millions of fields. They do it via something called a vset (Volatile Set), a semi-structured data structure. Honestly, if you want to read more about it, Ran wrote a blog called Introducing Hash Field Expirations, and you can check vset.h and vset.c for the implementation.
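
The two reclamation paths above can be sketched in a few lines. This is a toy illustration under simplifying assumptions: toy_entry, toy_lookup and toy_active_cycle are hypothetical names, and the active cycle walks entries round-robin with a cursor instead of sampling them randomly as a real server would.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical toy entry: expire_at == 0 means the entry has no TTL. */
typedef struct {
    bool present;
    long long expire_at;
} toy_entry;

/* True if the entry exists, has a TTL, and that TTL has passed. */
bool toy_is_expired(const toy_entry *e, long long now) {
    return e->present && e->expire_at != 0 && now >= e->expire_at;
}

/* Lazy expiration: runs on access. Deletes the entry if it has expired and
 * returns whether it can still be served. */
bool toy_lookup(toy_entry *e, long long now) {
    if (toy_is_expired(e, now)) e->present = false;
    return e->present;
}

/* Active expiration: one cycle inspects up to `sample` entries starting at
 * *cursor (wrapping around) and deletes the expired ones.
 * Returns the number of entries deleted. */
size_t toy_active_cycle(toy_entry *entries, size_t n, size_t *cursor,
                        size_t sample, long long now) {
    size_t deleted = 0;
    assert(entries != NULL || n == 0);
    for (size_t i = 0; i < sample && n > 0; i++) {
        toy_entry *e = &entries[*cursor];
        *cursor = (*cursor + 1) % n;
        if (toy_is_expired(e, now)) {
            e->present = false;
            deleted++;
        }
    }
    return deleted;
}
```

The lazy path costs nothing until someone touches the key, while the active path bounds the work per cycle through `sample`, so memory is eventually reclaimed even for keys nobody reads again.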

Replication

The elephant in the room! Let's now talk about how it all works.

RESP

Redis and Valkey both use RESP (Redis Serialization Protocol) for communication. A client needs to implement this protocol to talk to redis-server or valkey-server.

The good part about RESP is that it's pretty easy to implement and parse, and it's very readable. If you have ever run GET <key> or SET <key> then you've used RESP; the only part hidden from you is how the command is encoded on the wire.

In the Valkey server, processCommand is called with a serverCommand set on the client. In networking.c, the following code is the entry point:
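
To show what "encoded on the wire" means, here is a small sketch that serializes a command as a RESP array of bulk strings, which is the form clients use to send commands. toy_resp_encode is a hypothetical helper written for this post; unlike a real client it is not binary-safe, because it relies on NUL-terminated strings.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Encodes argv as a RESP array of bulk strings: "*<argc>\r\n" followed by
 * "$<len>\r\n<arg>\r\n" for every argument.
 * Returns the number of bytes written (excluding the trailing NUL), or -1
 * if `out` is too small. Not binary-safe: arguments must be C strings. */
int toy_resp_encode(int argc, const char **argv, char *out, size_t cap) {
    assert(argc >= 0 && out != NULL);
    size_t pos = 0;
    int n = snprintf(out, cap, "*%d\r\n", argc);
    if (n < 0 || (size_t)n >= cap) return -1;
    pos += (size_t)n;
    for (int i = 0; i < argc; i++) {
        n = snprintf(out + pos, cap - pos, "$%zu\r\n%s\r\n", strlen(argv[i]), argv[i]);
        if (n < 0 || (size_t)n >= cap - pos) return -1;
        pos += (size_t)n;
    }
    return (int)pos;
}
```

For SET myname sourav this produces *3\r\n$3\r\nSET\r\n$6\r\nmyname\r\n$6\r\nsourav\r\n: an array of three bulk strings, each prefixed with its length.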

c
int processInputBuffer(client *c) {
    /* Parse the query buffer and/or execute already parsed commands. */
    while ((c->querybuf && c->qb_pos < sdslen(c->querybuf)) ||
           c->cmd_queue.off < c->cmd_queue.len) {
        ... code ...
        /* If commands are queued up, pop from the queue first */
        if (!consumeCommandQueue(c)) {
            parseInputBuffer(c); // <-- Observe parse input buffer
            prepareCommandQueue(c);
        }
... more code ...

The flow that starts in the main function (declared with __attribute__((weak))) reads incoming data into c->querybuf, which is defined as an sds:

c
typedef struct client {
  ... code ...

  sds querybuf; /* data accumulated through incoming client queries */

  ... code ...

  struct serverCommand *cmd; /* current command */

  ... code ...
} client;

Once the command has been read into the query buffer, the buffer is parsed and handed to the processCommand dispatcher, which forwards it to the call function.

Rewriting commands

Let's say you were creating a simple KV store with 3 nodes, 1 master and 2 replicas. If you were asked to replicate data from the master node to the replicas, how would you handle that?

A simple way to do this is to have the master generate a log: a record of every operation performed on the master. The master then sends this log over to the replicas, which simply replay it on their end. Pretty simple and straightforward.

Valkey and Redis have something called an Append-only File (AOF). AOF is one of the two available ways Valkey can persist data. Valkey can also take snapshots of the data (RDB), but writing a full snapshot for every change would be inefficient, whereas an append-only file records each write as it happens.

Replication uses both ideas. When a replica is first created, the master sends over an entire snapshot of the data at a point in time, which brings the replica to a stable state. From then on the replica is kept up to date by a stream of the RESP commands that were executed on the master.

Note that although we do have AOF files, what the server sends to replicas is the RESP commands that were executed, and these commands are sometimes altered before they are sent. What do I mean by that? Observe the following code:
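
Here is that log-and-replay idea as a toy model. Everything in it is made up for illustration (toy_store, toy_log, toy_primary_write, toy_replica_catch_up): keys are small integers, values are ints, and the log lives in a fixed-size array instead of going over a network.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_KEYS 4
#define TOY_LOG_CAP 16

typedef enum { TOY_SET, TOY_DEL } toy_op;
typedef struct { toy_op op; int key; int value; } toy_cmd;

/* A node's data set: one value per key slot, has[] marks presence. */
typedef struct { int value[TOY_KEYS]; int has[TOY_KEYS]; } toy_store;

/* The primary's log of every write it has applied, in order. */
typedef struct { toy_cmd entries[TOY_LOG_CAP]; size_t len; } toy_log;

/* Applies one command to a store. Used by both primary and replicas. */
void toy_apply(toy_store *s, toy_cmd c) {
    assert(c.key >= 0 && c.key < TOY_KEYS);
    if (c.op == TOY_SET) {
        s->value[c.key] = c.value;
        s->has[c.key] = 1;
    } else {
        s->has[c.key] = 0;
    }
}

/* Primary path: apply the write locally, then append it to the log.
 * Returns 0 on success, -1 if the log is full. */
int toy_primary_write(toy_store *primary, toy_log *log, toy_cmd c) {
    if (log->len == TOY_LOG_CAP) return -1;
    toy_apply(primary, c);
    log->entries[log->len++] = c;
    return 0;
}

/* Replica path: replay every log entry after *offset and advance the
 * offset. Returns the number of commands replayed. */
size_t toy_replica_catch_up(toy_store *replica, const toy_log *log, size_t *offset) {
    size_t replayed = 0;
    while (*offset < log->len) {
        toy_apply(replica, log->entries[(*offset)++]);
        replayed++;
    }
    return replayed;
}
```

Each replica only needs to remember its own offset into the log. That single number is what lets a replica disconnect, come back, and continue where it stopped, as long as the primary still has the entries it missed.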

C
void hsetexCommand(client *c) {

    ... code ...

    int need_rewrite_argv = 0;

    ... code ...

    if (flags & (ARGS_SET_NX | ARGS_SET_XX | ARGS_SET_FNX | ARGS_SET_FXX | ARGS_EX | ARGS_PX | ARGS_EXAT)) {
        need_rewrite_argv = 1;
    }

    ... code ...

    /* Handle parsing and calculating the expiration time. */
    if (flags & ARGS_KEEPTTL)
        set_flags |= HASH_SET_KEEP_EXPIRY;
    else if (expire) {
        ... code ...

        if (checkAlreadyExpired(when)) {
            set_expired = 1;
        }
    }

    ... code ...

    bool has_volatile_fields = hashTypeHasVolatileFields(o);

    /* Prepare a new argv when rewriting the command. If set_expired is true,
     * all expired fields will be deleted. Otherwise, if rewriting is needed due to NX/XX/FNX/FXX flags,
     * copy the command, key, and optional arguments, skipping the NX/XX/FNX/FXX flags. */
    if (set_expired) {
        new_argv = zmalloc(sizeof(robj *) * (num_fields + 2));
        new_argv[new_argc++] = shared.hdel;
        incrRefCount(shared.hdel);
        new_argv[new_argc++] = c->argv[1];
        incrRefCount(c->argv[1]);
    } else if (need_rewrite_argv) {
        /* We use new_argv for rewrite */
        new_argv = zmalloc(sizeof(robj *) * c->argc); // <-- Allocates an array of c->argc robj pointers
        // Copy optional args (skip NX/XX/FNX/FXX)
        for (int i = 0; i < fields_index; i++) {
            if (strcasecmp(objectGetVal(c->argv[i]), "NX") &&
                strcasecmp(objectGetVal(c->argv[i]), "XX") &&
                strcasecmp(objectGetVal(c->argv[i]), "FNX") &&
                strcasecmp(objectGetVal(c->argv[i]), "FXX")) {
                /* Propagate as HSETEX Key Value PXAT millisecond-timestamp if there is
                 * EX/PX/EXAT flag. */
                if (expire && !(flags & ARGS_PXAT) && c->argv[i + 1] == expire) {
                    robj *milliseconds_obj = createStringObjectFromLongLong(when);
                    new_argv[new_argc++] = shared.pxat;
                    new_argv[new_argc++] = milliseconds_obj;
                    i++; // skip the original expire argument
                } else {
                    new_argv[new_argc++] = c->argv[i];
                    incrRefCount(c->argv[i]);
                }
            }
        }
    }

    ... code ...
}

Note the following two things:

  1. If the fields are already expired, notice how we rewrite the command into a new argv that uses hdel! The client never executed an hdel command; it is Valkey that rewrites the command and propagates it, so the fields get deleted on the replicas, bringing the two nodes to a common state.
  2. Notice that when flags such as NX/XX are set, we rewrite the entire command vector. We also handle the case of a relative expiration, where we replace the original argument with a new PXAT argument carrying an absolute timestamp.

Do note that commands that depend on things like the current time are non-deterministic, so Valkey propagates them differently. The same goes for floating point values, for example:
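
The reason for rewriting to PXAT is that a relative TTL means something different depending on when the command runs, while an absolute timestamp does not. A minimal sketch of that conversion follows; toy_expire_to_pxat is a hypothetical helper, the option names are matched case-sensitively for brevity, and now_ms stands for the primary's clock at execution time.

```c
#include <assert.h>
#include <string.h>

/* Converts an expire option and its argument into an absolute unix time in
 * milliseconds, the form that is safe to replay later on another node.
 * `now_ms` is the primary's clock when the command executed.
 * Returns -1 for options that carry no expiration time. */
long long toy_expire_to_pxat(const char *opt, long long arg, long long now_ms) {
    assert(opt != NULL);
    if (strcmp(opt, "EX") == 0) return now_ms + arg * 1000; /* relative seconds */
    if (strcmp(opt, "PX") == 0) return now_ms + arg;        /* relative milliseconds */
    if (strcmp(opt, "EXAT") == 0) return arg * 1000;        /* absolute seconds */
    if (strcmp(opt, "PXAT") == 0) return arg;               /* already absolute ms */
    return -1;
}
```

With the primary's clock at 1000000 ms, EX 3 becomes PXAT 1003000. A replica that applies the command four seconds late still computes the same expiry instead of starting a fresh 3 second countdown.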

INCRBYFLOAT key 1.5

Will be converted to something like this (assuming key previously held 1):

SET key 2.5 KEEPTTL

Why? Because float precision can differ between systems. By rewriting to the final value with SET, replicas get exactly the same result. Do note that the original command (c->original_argv) is preserved for monitoring, but the rewritten version (c->argv) is what gets replicated!

What can a block do?

Now that we have almost everything we need at a conceptual level, let's dive into the fundamental architecture and data structures that make this possible.

If you go through server.h and server.c you'll come across a struct called replBufBlock, with a rather detailed comment above it. Simplified, the struct looks like this:

C
typedef struct replBufBlock {
   int refcount;              // How many readers need this
   long long id;              // Monotonic ID
   long long repl_offset;     // Offset in replication stream
   size_t size, used;         // Buffer capacity and usage
   char buf[];                // Flexible array for commands
} replBufBlock;

What does that do? The best way to think about it is through the diagram that sits right above it in the source:

Replication Buffer (linked list of blocks):

+--------------+       +--------------+       +--------------+
| refcount = 1 |  ...  | refcount = 0 |  ...  | refcount = 2 |
+--------------+       +--------------+       +--------------+
     |                                            /       \
     |                                           /         \
     |                                          /           \
 Repl Backlog                              Replica_A    Replica_B

Let's take one block, which is just one instance of replBufBlock. The idea here is that buf is a shared memory buffer which holds the commands that are to be sent over to the replicas. That's all there is to it.

The primary instance is the one which manages almost everything: the master node is responsible for creating the replication buffer, which is a linked list with replBufBlock instances as its nodes.

Let's talk about the structure of the block itself. The refcount field counts the number of readers actively using this node. The replication buffer (every time I say replication buffer, I mean this linked list) contains all the RESP commands run on the master, in sequence. At the same time there are replicas actively requesting data, so every replica also sends an offset: the position in the stream from which it wants data.

How this all ties together: every primary, when it starts, is given a replication ID. Here is the valkeyServer struct:
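
The refcounting idea can be sketched as a toy version of the linked list. None of this is Valkey's code: toy_block uses a tiny fixed 16-byte buffer instead of a flexible array, and toy_feed, toy_attach, toy_read and toy_trim are hypothetical helpers. What it keeps is the core trick: command bytes are written once, every reader pins the block it is currently reading, and a block can be freed only when nobody pins it.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

typedef struct toy_block {
    int refcount;            /* how many readers currently point at this block */
    struct toy_block *next;
    size_t used;
    char buf[16];            /* tiny fixed capacity, for illustration only */
} toy_block;

typedef struct { toy_block *head, *tail; } toy_repl_buffer;

/* A reader (a replica, or the backlog) pins one block and remembers how far
 * into it it has read. */
typedef struct { toy_block *block; size_t pos; } toy_reader;

/* Appends bytes once, for all readers; a new block is linked in when the
 * tail is full. Returns 0 on success, -1 on allocation failure. */
int toy_feed(toy_repl_buffer *rb, const char *data, size_t len) {
    while (len > 0) {
        toy_block *t = rb->tail;
        if (t == NULL || t->used == sizeof(t->buf)) {
            toy_block *b = calloc(1, sizeof(*b));
            if (b == NULL) return -1;
            if (t) t->next = b; else rb->head = b;
            rb->tail = t = b;
        }
        size_t room = sizeof(t->buf) - t->used;
        size_t n = len < room ? len : room;
        memcpy(t->buf + t->used, data, n);
        t->used += n;
        data += n;
        len -= n;
    }
    return 0;
}

/* Points a reader at a block and pins it. */
void toy_attach(toy_reader *r, toy_block *b) {
    r->block = b;
    r->pos = 0;
    if (b) b->refcount++;
}

/* Copies up to `cap` unread bytes into `out`, moving the pin from block to
 * block as the reader advances. Returns the number of bytes copied. */
size_t toy_read(toy_reader *r, char *out, size_t cap) {
    size_t copied = 0;
    while (r->block && copied < cap) {
        toy_block *b = r->block;
        size_t avail = b->used - r->pos;
        size_t n = avail < cap - copied ? avail : cap - copied;
        memcpy(out + copied, b->buf + r->pos, n);
        copied += n;
        r->pos += n;
        if (r->pos == b->used) {
            if (b->next == NULL) break; /* stay on the tail and wait for data */
            assert(b->refcount > 0);
            b->refcount--;
            b->next->refcount++;
            r->block = b->next;
            r->pos = 0;
        }
    }
    return copied;
}

/* Frees blocks at the head that no reader pins (always keeping the tail).
 * Returns the number of blocks freed. */
size_t toy_trim(toy_repl_buffer *rb) {
    size_t freed = 0;
    while (rb->head && rb->head != rb->tail && rb->head->refcount == 0) {
        toy_block *b = rb->head;
        rb->head = b->next;
        free(b);
        freed++;
    }
    return freed;
}
```

Notice that adding a second replica costs one pointer and one counter increment, not a second copy of the command stream. A slow reader simply keeps older blocks alive longer, because toy_trim refuses to free anything with a non-zero refcount.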

c
struct valkeyServer {
    ...code...

    /* Replication (primary) */
    char replid[CONFIG_RUN_ID_SIZE + 1];       /* My current replication ID. */
    long long primary_repl_offset;             /* My current replication offset */
    int repl_ping_replica_period;              /* Primary pings the replica every N seconds */
    replBacklog *repl_backlog;                 /* Replication backlog for partial syncs */
    long long repl_backlog_size;               /* Backlog circular buffer size */
    int repl_min_replicas_to_write;            /* Min number of replicas to write. */
    int repl_min_replicas_max_lag;             /* Max lag of <count> replicas to write. */
    int repl_good_replicas_count;              /* Number of replicas with lag <= max_lag. */
    int wait_before_rdb_client_free;           /* Grace period in seconds for replica main channel
                                                * to establish psync. */
    size_t repl_buffer_mem;                    /* The memory of replication buffer. */

    ...code...
}

Observe that I have left a few more fields from that struct in there; this is the core struct that represents the Valkey server instance. Note the replid field: it is the replication ID that belongs to this primary. Within the same struct we also have the following:

c
...code...

/* Replication (replica) */
char *primary_host;     /* Hostname of primary */
int primary_port;       /* Port of primary */
int repl_timeout;       /* Timeout after N seconds of primary idle */
client *primary;        /* Client that is primary for this replica */
uint64_t rdb_client_id; /* Rdb client id as it defined at primary side */
struct {
    connection *conn;
    char replid[CONFIG_RUN_ID_SIZE + 1];
    long long reploff;
    long long read_reploff;
    int dbid;
} repl_provisional_primary;
client *cached_primary;              /* Cached primary to be reused for PSYNC. */
int repl_state;                      /* Replication status if the instance is a replica */
int repl_rdb_channel_state;          /* State of the replica's rdb channel during dual-channel-replication */
off_t repl_transfer_last_fsync_off;  /* Offset when we fsync-ed last time. */
connection *repl_transfer_s;         /* Replica -> Primary SYNC connection */
connection *repl_rdb_transfer_s;     /* Primary FULL SYNC connection (RDB download) */

...code...

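Observe that this part of the struct carries a replid (inside repl_provisional_primary) and repl_transfer_last_fsync_off. The replication ID, together with a replication offset, is what the replica sends to the primary with the PSYNC command. repl_transfer_last_fsync_off does a different job: it tracks how much of the incoming RDB file has already been synced to disk during a full sync. In the function replicaReceiveRDBFromPrimaryToDisk we have the following: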

c
...code...

/* Now read the actual sync data and save it to disk */
do {
  ...code...

  /* Write what we got from the socket to the dump file on disk */
  if ((nwritten = write(server.repl_transfer_fd, buf, nread)) != nread) {
      ...code...
  }

  ...code...

  /* Sync data on disk from time to time, otherwise at the end of the
   * transfer we may suffer a big delay as the memory buffers are copied
   * into the actual disk. */
  if (server.bio_repl_transfer_read >= repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC) {
      off_t sync_size = server.bio_repl_transfer_read - repl_transfer_last_fsync_off;
      rdb_fsync_range(server.repl_transfer_fd, repl_transfer_last_fsync_off, sync_size);
      repl_transfer_last_fsync_off += sync_size; // <-- WE INCREASE THE OFFSET
  }

  ...code...

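While the RDB transfer is in progress, the replica keeps advancing its last fsync offset so that the file is flushed to disk in chunks rather than all at once. When the replica later connects with the PSYNC command, it sends the replication ID and the replication offset it has reached, and the primary continues the replication stream from there if its backlog still covers that offset.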

Going down the rabbit hole

Now let's take one command and follow it just so that we can see all this in action and what happens!

As mentioned earlier, we start from the main function and go down to the call function! The call chain to reach this function is pretty long. Valkey runs on an event loop which starts at the bottom of the main function: aeMain keeps looping and calls aeProcessEvents, which, when an event arrives, calls connSocketEventHandler. Note that at this point these events are nothing but socket events received over TCP.

The socket handler uses a generic callHandler function, which accepts a handler function and ends up calling readQueryFromClient. You can already guess what happens here: we read from the client connection and populate an input buffer. Then one of the core functions, processInputBuffer, is called, which parses the received input via parseInputBuffer. Valkey uses the RESP protocol, which is extremely easy to parse as well.

At this point the parsed command is also resolved to a serverCommand struct, which looks like this:

c
struct serverCommand {
    /* Declarative data */
    const char *declared_name;    /* A string representing the command declared_name.
                                   * It is a const char * for native commands and SDS for module commands. */
    const char *summary;          /* Summary of the command (optional). */
    const char *complexity;       /* Complexity description (optional). */
    const char **tips; /* An array of strings that are meant to be tips for clients/proxies regarding this command */
    int num_tips;
    serverCommandProc *proc;        /* Command implementation */
    int arity;                      /* Number of arguments, it is possible to use -N to say >= N */
    ... code ...
};

Note that there are some really cool things here that you normally only see through the Redis or Valkey CLI, i.e. arity and tips! After a lot of checks we are finally ready to proceed and invoke the call function.

The "Call"-ing

The call function is one of the most important functions in Valkey's codebase. Once all the processing, ACL checks, and whatnot are done, the call function is what actually calls the proc above.

c
c->cmd->proc(c);

The cmd field points to the current command, and proc is the function which holds the logic and implementation of that command. For example, if you called SET myname sourav, this is what my debugger shows:

shell
 cmd serverCommand * = {declared_name:"set", summary:"Sets the string value of a key, ignoring its type. The key is created if it doesn't exist.", ...}
  declared_name const char * = "set"
  summary const char * = "Sets the string value of a key, ignoring its type. The key is created if it doesn't exist."
  complexity const char * = "O(1)"
  since const char * = "1.0.0"
   doc_flags int = 0
   group serverCommandGroup = COMMAND_GROUP_STRING
  history commandHistory * = {since:"2.6.12", changes:"Added the `EX`, `PX`, `NX` and `XX` options."}
   num_history int = 5
   proc serverCommandProc * = (valkey-server`setCommand at t_string.c:231)
   arity int = -3
   flags uint64_t = 5
   acl_categories uint64_t = 32900
   get_dbid_args commandDbIdArgs * = <null>
 ▸ key_specs keySpec * = {notes:"RW and ACCESS due to the optional `GET` argument", ...}
   key_specs_num int = 1
   getkeys_proc serverGetKeysProc * = (valkey-server`setGetKeys at db.c:2921)
   num_args int = 5
  args serverCommandArg * = {name:"key", type:ARG_TYPE_KEY, key_spec_index:0, ...}
 ▸ fullname sds = "set"
 ▸ current_name sds = "set"
 ▸ latency_histogram hdr_histogram * = {lowest_discernible_value:1, highest_trackable_value:1000000000, ...}
 ▸ legacy_range_key_spec keySpec = {notes:"RW and ACCESS due to the optional `GET` argument", ...}
 ▸ subcommands_ht hashtable * = <null>
 ▸ parent serverCommand * = <null>
 ▸ module_cmd ValkeyModuleCommand * = <null>

Observe that from this dump alone you learn a lot about what the command is trying to do and where it will land next! And we finally end up calling the implementation.

However, what we want is what comes after this! Once the logical part is done we run into the following code, which, if you're still here, is probably what got you excited to read all this in the first place (pat yourself on the back):
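
The dispatch through proc, together with the arity convention seen in the dump (arity = -3 for set), can be sketched with a tiny command table. The names here (toy_command, toy_table, toy_call) are hypothetical, the lookup is a linear, case-sensitive scan for brevity, and the "implementations" only record that they ran.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

typedef struct toy_client {
    int argc;
    const char **argv;
    int last_result;   /* set by whichever implementation ran */
} toy_client;

typedef void toy_proc(toy_client *c);

typedef struct {
    const char *name;
    toy_proc *proc;    /* command implementation */
    int arity;         /* N means exactly N arguments, -N means at least N */
} toy_command;

/* Stand-in implementations that only mark which command was executed. */
void toy_set(toy_client *c) { c->last_result = 1; }
void toy_get(toy_client *c) { c->last_result = 2; }

static const toy_command toy_table[] = {
    {"set", toy_set, -3},
    {"get", toy_get, 2},
};

/* The command name itself counts as an argument, as in argv[0]. */
int toy_arity_ok(int arity, int argc) {
    return arity >= 0 ? argc == arity : argc >= -arity;
}

/* Looks the command up, checks arity, then jumps through the function
 * pointer. Returns 0 on success, -1 for an unknown command, -2 for a
 * wrong number of arguments. */
int toy_call(toy_client *c) {
    assert(c->argc > 0);
    for (size_t i = 0; i < sizeof(toy_table) / sizeof(toy_table[0]); i++) {
        const toy_command *cmd = &toy_table[i];
        if (strcmp(cmd->name, c->argv[0]) != 0) continue;
        if (!toy_arity_ok(cmd->arity, c->argc)) return -2;
        cmd->proc(c);
        return 0;
    }
    return -1;
}
```

An arity of -3 for set reads as "at least three arguments": the command name, a key and a value, with any options after that. The check happens before the implementation is ever entered, which is why the implementation can index argv[1] and argv[2] without re-validating.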

c
/* Propagate the command into the AOF and replication link.
 * We never propagate EXEC explicitly, it will be implicitly
 * propagated if needed (see propagatePendingCommands).
 * Also, module commands take care of themselves */
if (flags & CMD_CALL_PROPAGATE && !c->flag.prevent_prop && c->cmd->proc != execCommand &&
  !(c->cmd->flags & CMD_MODULE)) {
  int propagate_flags = PROPAGATE_NONE;

  /* Check if the command operated changes in the data set. If so
   * set for replication / AOF propagation. */
  if (dirty) propagate_flags |= (PROPAGATE_AOF | PROPAGATE_REPL);

  /* If the client forced AOF / replication of the command, set
   * the flags regardless of the command effects on the data set. */
  if (c->flag.force_repl) propagate_flags |= PROPAGATE_REPL;
  if (c->flag.force_aof) propagate_flags |= PROPAGATE_AOF;

  /* However prevent AOF / replication propagation if the command
   * implementation called preventCommandPropagation() or similar,
   * or if we don't have the call() flags to do so. */
  if (c->flag.prevent_repl_prop || c->flag.module_prevent_repl_prop || !(flags & CMD_CALL_PROPAGATE_REPL))
      propagate_flags &= ~PROPAGATE_REPL;
  if (c->flag.prevent_aof_prop || c->flag.module_prevent_aof_prop || !(flags & CMD_CALL_PROPAGATE_AOF))
      propagate_flags &= ~PROPAGATE_AOF;

  /* Call alsoPropagate() only if at least one of AOF / replication
   * propagation is needed. */
  if (propagate_flags != PROPAGATE_NONE) alsoPropagate(c->db->id, c->argv, c->argc, propagate_flags, c->slot);
}
$ cat ./author
name: Sourav Singh Rawat
role: systems engineer who mass produces bugs
status: probably reading kernel code rn
$