Raft 7: Log Replication, Part 1

Reading Time: 9 minutes

In December, I took a course in which I attempted to implement the Raft distributed consensus algorithm from this paper. I continued my work on the implementation through January, then took a hiatus to design and teach MPCS 51039—a course on software development for computer science graduate students, my first ever full-length course of my own design. (If you’re curious, I wrote, and still write, about that process right here.)

The course is now in its second run, and while it’s still a lot of work, it is no longer a 30-hour-per-week commitment. (It needed to be good, okay?) So, your intrepid adventuress has returned from the high seas of syllabus design and exercise creation to the calm waters of…

[Image: studiostoks cartoon of a woman piloting a submarine]

…wait. To the altogether different but still roiling seas of implementing the Raft distributed consensus algorithm. So now, the Raft series continues. In case you need a refresher, here’s where you can see all the posts so far about building this thing.

In the previous post, we worked on the heartbeat—the means by which the leader server communicates to the other servers that it’s still in charge. To do that, though, we removed the part where a server sends new log entries for other servers to replicate.

We need to redo it. So let’s get started.

Replication the Raft Way

For this post, I will insert block-quotes from section 5.3 of the Raft paper, which focuses on log replication. I’ll highlight the pieces that describe the implementation details we need to address.

Logs are organized as shown in Figure 6. Each log entry stores a state machine command along with the term number when the entry was received by the leader. The term numbers in log entries are used to detect inconsistencies between logs and to ensure some of the properties in Figure 3. Each log entry also has an integer index identifying its position in the log.

[Figure 6 from the Raft paper, showing how logs are organized: each entry stores a term number and a state machine command at an integer index.]

Currently, each of our log entries includes a term number and a command for the state machine. This passage indicates that it also needs an index, which we will use to determine if logs are current by comparing the index and term number of the latest command in two separate logs. The idea is that we use this to fulfill the Log Matching Property:

Raft maintains the following properties, which together constitute the Log Matching Property in Figure 3:

• If two entries in different logs have the same index and term, then they store the same command.

• If two entries in different logs have the same index and term, then the logs are identical in all preceding entries.

The first property follows from the fact that a leader creates at most one entry with a given log index in a given term, and log entries never change their position in the log. The second property is guaranteed by a simple consistency check performed by AppendEntries.

I think I can get away with not adding indices to each row in my current log implementation. I think I should be able to use the indices from the collection that the state machine uses to execute the log commands. We’ll find out if I’m right about that.
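
To make that comparison concrete, here is a minimal sketch of the idea—mine, not code from the repo—assuming each entry is a (term, command) tuple in a plain Python list, so the list position doubles as the entry’s index:

def latest_entry_info(log):
    # Return (last_index, last_term) for a log of (term, command) tuples.
    if not log:
        return -1, -1
    last_index = len(log) - 1
    last_term = log[last_index][0]
    return last_index, last_term

def at_least_as_current(log_a, log_b):
    # Compare two logs by the term of their latest entries first,
    # then by index, per the comparison described above.
    index_a, term_a = latest_entry_info(log_a)
    index_b, term_b = latest_entry_info(log_b)
    return (term_a, index_a) >= (term_b, index_b)

So at_least_as_current([(1, "set a 1"), (2, "set b 2")], [(1, "set a 1")]) returns True, because the first log’s latest entry carries a later term.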

The leader decides when it is safe to apply a log entry to the state machines; such an entry is called committed. Raft guarantees that committed entries are durable and will eventually be executed by all of the available state machines. A log entry is committed once the leader that created the entry has replicated it on a majority of the servers (e.g., entry 7 in Figure 6). This also commits all preceding entries in the leader’s log, including entries created by previous leaders.

Once upon a time, our server would only update other servers’ logs upon being explicitly told to do so. Now, our leader server will endeavor to update followers’ logs each time it receives a write (a command to set or delete a key-value pair).

My server implementation relies on a conditional statement to parse requests and issue responses. Before my hiatus, I moved this conditional statement into its own file called parsing.py. As I got back into the code after six months away from it, I found that one of the first things I wanted to do was move the conditional back into the server class. You see that change reflected in this commit.

Now that I’ve done some more work and reacquainted myself with the code, I find myself wanting to pull it back out again, largely to make it easier to unit test. I feel weird about this because I pride myself on prioritizing ‘legibility’ in the code I write—but it seems that what makes my code ‘legible’ to me changes depending on my immediate familiarity with it. Maybe if the conditional statement were under unit tests, I’d like it in its own file even after coming back from hiatus. Hmm. Maybe I’ll save this for later and put it in a different blog post series.

In the meantime, back to Raft.

Sending out logs to be replicated

All right, let’s reimplement log replication! First of all, since we have already distinguished a leader server and a follower server, we can give them different behavior.

So when a request related to the data comes through the conditional statement, a leader server should pass on the write commands it receives for follower servers to replicate. A follower server should not process these requests. One might argue that a follower should redirect to the leader, or at least tell the requester where to find the leader. For now, though, my followers slam the door on these requests with a curt "I am not the leader. Please leave me alone."


else: # if the request is to get, set, or delete
    if self.leader:
        self.current_operation = string_operation
        key_value_store.write_to_log(string_operation, term_absent=True)
        if self.current_operation.split(" ")[0] in ["set", "delete"]:
            broadcast(self, with_return_address(self, "append_entries [" + self.current_operation + "]"))
            send_pending = False
    else:
        response = "I am not the leader. Please leave me alone."


You’ll notice that this implementation focuses on write requests and completely drops read requests on the floor.

We don’t have to replicate reads, and replication is what we’re focused on right now. So we’ll stick with the writes for now and come back to reimplement the reads later, in this commit.

When a leader broadcasts the append_entries request to all the other servers (the broadcast call in the snippet above), the follower servers accept the request and append the entry to their logs. They send a message back to the leader to indicate that they received, and appended, the new command.


if string_operation.split(" ")[0] == "append_entries":
    # followers do this to update their logs.
    stringified_logs_to_append = string_operation.replace("append_entries ", "")
    print("Preparing to append: " + stringified_logs_to_append)
    logs_to_append = ast.literal_eval(stringified_logs_to_append)
    [key_value_store.write_to_log(log, term_absent=True) for log in logs_to_append]
    print("State machine after appending: " + str(key_value_store.data))
    response = "Append entries call successful!"


Now, there are two steps here: write a command to the log, and commit the command to the server’s state machine. So far, the leader has done the first of these two things. It waits to do the second one until a majority of servers have written the command to their logs. So, when the follower servers have sent the "Append entries call successful!" message, it’s time for the server to make a tally.
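
Before the tally, a quick aside on those two steps. Here is a minimal sketch of a key-value store with a separate log and state machine; the method names mirror the calls in the snippets (write_to_log, write_to_state_machine), but the bodies and the __init__ are my guesses at the shape, not the repo’s actual KeyValueStore:

class KeyValueStore:
    def __init__(self):
        self.log = []   # step 1 lives here: an ordered list of command strings
        self.data = {}  # step 2 lives here: the state machine of committed keys and values

    def write_to_log(self, string_operation, term_absent=False):
        # Record the command; nothing visible in self.data changes yet.
        # term_absent is accepted to match the calls above; this sketch ignores it.
        self.log.append(string_operation)

    def write_to_state_machine(self, string_operation, term_absent=False, write=True):
        # Apply a committed command to the data dictionary, optionally also
        # recording it in the log first.
        pieces = string_operation.split(" ")
        if pieces[0] == "set":
            self.data[pieces[1]] = pieces[2]
        elif pieces[0] == "delete":
            self.data.pop(pieces[1], None)
        if write:
            self.write_to_log(string_operation, term_absent=term_absent)

The write=False flag matters for the leader: by the time it commits, it has already written the command to its own log, so it only needs the state machine update.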

We initialize the server with a dictionary that indicates all followers are not up to date:


class Server:
    def __init__(self, name):  # (other constructor arguments and setup elided)
        self.followers_with_update_status = {}
        self.current_operation = ''
        self.current_operation_committed = False
        for server_name in other_server_names(name):
            self.followers_with_update_status[server_name] = False


Then, when one of them responds, that response gets its own branch in the conditional statement, where the leader changes the update status for that server to True.


elif string_operation == "Append entries call successful!":
    if self.leader:
        self.mark_updated(server_name)
    send_pending = False


Each time the leader server updates the status of a follower server, it checks to determine if we have reached quorum: the point where a majority of servers have replicated the log.


def mark_updated(self, server_name):
    self.followers_with_update_status[server_name] = True
    trues = len(list(filter(lambda x: x is True, self.followers_with_update_status.values())))
    falses = len(list(filter(lambda x: x is False, self.followers_with_update_status.values())))
    if trues >= falses:
        print("Committing entry: " + self.current_operation)
        self.current_operation_committed = True
        self.key_value_store.write_to_state_machine(self.current_operation, term_absent=True, write=False)
        broadcast(self, with_return_address(self, "commit_entries ['" + self.current_operation + "']"))
        self.current_operation_committed = False
        for server_name in other_server_names(self.name):
            self.followers_with_update_status[server_name] = False


If we have quorum, then we commit the entry: we make the change in the KeyValueStore, updating its data dictionary to reflect the collection of keys and values produced by the most up-to-date version of the log. How does the leader wait for quorum? It sits in a while loop (see the while loop in the snippet below) until the current operation is marked committed.


else:
    if self.leader:
        self.current_operation = string_operation
        if self.current_operation.split(" ")[0] in ["set", "delete"]:
            key_value_store.write_to_log(string_operation, term_absent=True)
            broadcast(self, with_return_address(self, "append_entries ['" + self.current_operation + "']"))
            while not self.current_operation_committed:
                pass
            send_pending = False
        else:
            response = key_value_store.read(self.current_operation)
    else:
        response = "I am not the leader. Please leave me alone."


After we commit the entry to the leader server’s state machine, we need to let the follower servers know that the entry has been committed, so that they, too, can commit it to their state machines. So we broadcast a new commit_entries message (the broadcast call in mark_updated above), covered in this commit. Followers respond like so:


elif string_operation.split(" ")[0] == "commit_entries":
    # followers do this to update their state machines.
    stringified_logs_to_append = string_operation.replace("commit_entries ", "")
    print("Preparing to commit: " + stringified_logs_to_append)
    logs_to_append = ast.literal_eval(stringified_logs_to_append)
    [key_value_store.write_to_state_machine(command, term_absent=True) for command in logs_to_append]
    response = "Commit entries call successful!"
    print("State machine after committing: " + str(key_value_store.data))


If you check out this commit of the Raft repo, you are up to date on all of these changes.

This version ignores a lot of sad paths. A few existing fissures in our implementation:

  1. The leader is supposed to keep trying indefinitely if a call to a follower to append an entry fails (a sketch of what that retry might look like appears after this list). For now, we’re going to say that once an entry is committed, a server moves on. Later, we’ll implement a follower catching up on multiple log statements from a leader, which will have the same effect in practice in many cases.
  2. For now, if a follower successfully receives the message to commit a command, we assume that it also succeeded in writing that command to its log. So if a follower server comes back up right in the middle of this transaction, it could have an up-to-date state machine that is not reflected in its out-of-date log.
  3. For now, the cycle of sending out an append_entries call and waiting for quorum on it blocks its thread. I’m fine with this for the moment: if the follower servers aren’t moving fast enough to reach quorum, they’re probably not fast enough to also be sending in loads of other requests.
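
For the record, the indefinite retry the paper asks for in item 1 would look roughly like this sketch. The helper name, the send_to callable, and the delay are my own illustration, not code from the repo; only the acknowledgment string comes from the snippets above:

import time

def retry_append_entries(send_to, follower_name, message, delay_seconds=0.5):
    # Hypothetical helper: keep re-sending an append_entries message to one
    # follower until it acknowledges, as the Raft paper requires of a leader.
    while True:
        try:
            reply = send_to(follower_name, message)
            if reply == "Append entries call successful!":
                return reply
        except OSError:
            pass  # follower down or unreachable; wait and try again
        time.sleep(delay_seconds)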

We may come back to these fissures as we implement additional components of Raft. But I want to explicitly state our assumptions now, as well as where they fall down.

All right, that’s quite enough for one post. So far, though, follower servers still don’t exercise any scrutiny on the logs: they write and commit what they’re told to write and commit. They need to identify when their logs and states are out of date based on the requests they receive from the leader. We will leave that for the next post.

If you liked this piece, you might also like:

The time we discovered that people who tout numpy vectorization don’t know what it does

The time we discovered that people who scoff at SOAP don’t know what it does

The time we discovered that classic signs of bug-prone code don’t seem to cause bugs
