Raft 3: Threads, Processes, and State Machines

In November, I took Dave Beazley’s week-long course on The Structure and Interpretation of Computer Programs.

In December, I took Rafting Trip, in which we attempt to implement the Raft algorithm from this paper. In this series, I’ll share memorable insights from the course (here’s where you can see all the posts so far).

We started setting up the pieces in the previous post, and we left off here:

We’re doing pretty good, but we still need to take care of a detail. Currently, each of our servers will only accept connections from one client at a time. We would like each of them to handle multiple clients at once. Why? Because Raft achieves fault tolerance, in part, by electing one server the “leader” of the cluster, and all client requests are redirected to that leader. Any of the servers in the cluster could be elected leader, so every one of them needs to be able to do this.

Why does each server only accept connections from one client at a time? And how would we go about changing that?

Building Concurrent Solutions

What are our options for allowing a process to facilitate multiple concurrent interactions?

One common solution: threads. Before we move on, let’s quickly go over the distinction between a thread and a process. A process describes an individual running instance of a program. It has its own memory, which it does not share with other programs. An instance of our server running is one process. An instance of our client running is one process. Five instances of our server running represents five processes—they all run from the same code, but they’re allocated different address spaces in memory.

A process can run code in multiple separate threads. These threads can run code independently of each other. This is how a server might accept multiple clients: we would spin up a separate thread for each client so the server could have an independent conversation with that client. Unlike processes, the threads inside one process share memory with each other.
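Here is a minimal sketch of that memory sharing, using Python's threading module. The names greetings and greet are made up for illustration and are not part of our server.

```python
import threading

# One list, owned by the process. Every thread sees the same object.
greetings = []

def greet(client_name):
    # Hypothetical per-client work: each thread appends to the shared list.
    greetings.append("hello, " + client_name)

threads = [threading.Thread(target=greet, args=(name,)) for name in ("a", "b", "c")]
for t in threads:
    t.start()
for t in threads:
    t.join()  # wait for every thread to finish

# The threads may finish in any order, so sort before printing.
print(sorted(greetings))
```

Three separate processes running the same code would each get their own greetings list. The three threads here all write into one.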

The memory sharing thing makes it convenient for threads to communicate with one another. But it also means that, if two conflicting operations execute in separate threads, we have a race condition. Suppose one thread spun up in our server sets the key chelsea to point to the value rules, while another thread sets the key chelsea to point to the value rocks. Does Chelsea rock or rule? Or does chelsea not get set at all? Or does chelsea get set to some amalgamation of the two values, like rucks? (As it so happens, I do that too, but it was the intention of neither client to say so.)

We tend to ameliorate this issue with threads by using discrete Events (which awaken threads when a specific action needs to be performed) or a mutex (short for “mutual exclusion,” a mutex allows only one thread at a time to execute the section of code it guards). Or we may use a Lock object, which locks a specific resource (like a row of a database, or a key-value pair in our data store) such that only one thread can do stuff to it at one time.
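As a rough sketch of how these pieces look in Python, assuming the standard threading module: an Event holds both writer threads back until we signal them, and a Lock lets only one of them touch the store at a time. The names store, store_lock, ready, and set_key are invented for this example.

```python
import threading

store = {}
store_lock = threading.Lock()   # guards every access to store
ready = threading.Event()       # used to wake the writers up together

def set_key(key, value):
    ready.wait()                # sleep until the main thread calls ready.set()
    with store_lock:            # only one thread at a time gets past this line
        store[key] = value
        store["writes"] = store.get("writes", 0) + 1  # read-modify-write, safe under the lock

writers = [
    threading.Thread(target=set_key, args=("chelsea", "rules")),
    threading.Thread(target=set_key, args=("chelsea", "rocks")),
]
for w in writers:
    w.start()
ready.set()                     # release both writers
for w in writers:
    w.join()

print(store["chelsea"], store["writes"])
```

Which value wins depends on which thread grabs the lock last, so the lock does not decide whether Chelsea rocks or rules. What it does promise is that the two updates happen one after the other rather than interleaved.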

Sometimes we organize this scheduling with thread queues, in which separate threads put stuff that needs to happen onto a queue, and items are popped off the queue in order.
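A small sketch of that queue pattern, assuming Python's queue.Queue (which is safe to share between threads). The producer and consumer functions, and the None sentinel used to stop the consumer, are choices made for this example.

```python
import queue
import threading

work = queue.Queue()
applied = []  # only the consumer thread writes to this

def producer(name, count):
    # Each producer thread puts its items on the shared queue.
    for i in range(count):
        work.put((name, i))

def consumer():
    # A single consumer pops items off in the order they were queued.
    while True:
        item = work.get()
        if item is None:  # sentinel: no more work is coming
            break
        applied.append(item)

consumer_thread = threading.Thread(target=consumer)
consumer_thread.start()

producers = [threading.Thread(target=producer, args=(n, 3)) for n in ("t1", "t2")]
for p in producers:
    p.start()
for p in producers:
    p.join()

work.put(None)  # tell the consumer to stop
consumer_thread.join()
print(applied)
```

Because only one thread applies the items, the work itself needs no lock. Items from t1 and t2 may interleave, but each producer's own items come out in the order it queued them.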

Mutability and State Management

Ultimately our Raft implementation will need to manage two different sets of states:

  • The state of the data among all the servers
  • The state of the server (who is up, who is down, who is the leader)

We’ll talk more about these cases, but first let’s talk about the key component here: a state machine.

The term state machine doesn’t originally come from programming. It refers to a mathematical model of computation that can be in exactly one of a finite number of states at any given time. In code, we model this idea with classes that compute, and then report on, the instance being in one of a finite series of states. Usually, we do this via an instance attribute called state or status that takes one of a finite series of values.

Let’s look at a well-circumscribed example: a class representing a traffic light.


class TrafficLight:
    def __init__(self, name, state):
        self.name = str(name)
        self.current_state = state

    def progress(self):
        if self.current_state == "green":
            print(self.name + " light yellow")
            self.current_state = "yellow"
        elif self.current_state == "yellow":
            print(self.name + " light red")
            self.current_state = "red"
        elif self.current_state == "red":
            print(self.name + " light green")
            self.current_state = "green"

Here we have a traffic light class with a current_state attribute that takes three values: red, yellow, green. Our progress() method documents the order that these states should happen and moves the light from one state to the next.

Imagine that we wanted to operate two of these lights simultaneously. How might we do that?


import queue
import time
from threading import Thread


class TrafficLight:
    def __init__(self, name, state):
        self.name = str(name)
        self.current_state = state

    def progress(self):
        if self.current_state == "green":
            print(self.name + " light yellow")
            self.current_state = "yellow"
        elif self.current_state == "yellow":
            print(self.name + " light red")
            self.current_state = "red"
        elif self.current_state == "red":
            print(self.name + " light green")
            self.current_state = "green"


class TrafficSystem:
    # How many seconds a light should spend in each state
    states = {
        "red": 30,
        "yellow": 5,
        "green": 25
    }
    current_time = 0

    def __init__(self, speed=3):
        self.traffic_lights = {}
        self.traffic_lights['1'] = TrafficLight(1, "red")
        self.traffic_lights['2'] = TrafficLight(2, "green")
        self.speed = speed
        self.event_queue = queue.Queue()
        # One daemon timer thread per light
        Thread(target=self.timer_thread, args=(self.traffic_lights['1'],), daemon=True).start()
        Thread(target=self.timer_thread, args=(self.traffic_lights['2'],), daemon=True).start()
        # Blocks forever: events are processed on the thread that built the system
        self.start_traffic()

    def start_traffic(self):
        while True:
            # Events are strings like "progress 1"
            event = self.event_queue.get().split(" ")
            if event[0] == "progress":
                self.traffic_lights[event[1]].progress()

    def timer_thread(self, light):
        # Loop instead of recursing, so a long-running light never hits
        # Python's recursion limit.
        while True:
            self.event_queue.put('progress ' + light.name)
            # The state is read here without waiting for the event above to be
            # applied, so the sleep may be based on the light's previous state.
            time.sleep(self.states[light.current_state] / self.speed)

Here, you see a TrafficSystem class. Usually, it makes sense to keep our objects with state separate from the management of that state. Two reasons for this: first, as the state and the management each become more complex, separating these concerns improves the legibility of our code. Second, separating the state from its coordination and timing allows us to write fast, time-independent tests for the state computation itself.
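To make the second reason concrete, here is a sketch of a time-independent test. The TrafficLight below is a condensed stand-in for the one above (a lookup table, no printing), and test_full_cycle is a name invented for this example.

```python
class TrafficLight:
    # Condensed stand-in for the class above: same transitions, as a table.
    NEXT_STATE = {"green": "yellow", "yellow": "red", "red": "green"}

    def __init__(self, name, state):
        self.name = str(name)
        self.current_state = state

    def progress(self):
        self.current_state = self.NEXT_STATE[self.current_state]


def test_full_cycle():
    # No threads, no sleeps: we just call progress() and check the order.
    light = TrafficLight(1, "green")
    seen = []
    for _ in range(3):
        light.progress()
        seen.append(light.current_state)
    assert seen == ["yellow", "red", "green"]


test_full_cycle()
print("ok")
```

The test runs in a fraction of a second because the 30-second waits live in the system that schedules transitions, not in the light itself.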

In the example above, we use several of the components we have discussed for concurrent code execution: threads, events, and a queue. We spin up a thread for each traffic light (the two Thread(...) calls in __init__), assign an amount of time that a light should live in each state (the states dictionary), and push events onto our queue to progress each light after waiting the specified amount of time (the timer_thread method).

If we new up a TrafficSystem and start traffic, our lights start up red and green, then the green one goes yellow for five seconds, then it switches to red as the other switches to green. Repeat.

Now, what if we want to be able to interrupt this state—say, with a button for crossing the street, that should change the light colors as long as the green one has already been green for at least 30 seconds? This now means we must be able to introspect on the state at any given time. It also means that, if events are coordinated, we must have a unified understanding of time. So we cannot accomplish the goal with sleeps (which we can’t introspect) or threads (which don’t coordinate).

How might we get around something like this?

We might produce a system that divides each state into ticks of the clock, that counts each tick as its own sort of mini-state and transitions only as each state has run its course…or has run at least 30 seconds of its course when the crossing button is pressed.


# Initial state of traffic light
Init = {
    'out1': 'G',
    'out2': 'R',
    'clock': 0,
    'button': False,
    'pc': 'G1'
}

# Define functions that determine state membership and state update.
# Each returns the next state (a dict) if it applies, or False if it does not.
# How to incorporate the button press?
G1 = lambda s: s['pc'] == 'G1' and (
    (s['clock'] < 30 and dict(s, clock=s['clock'] + 1))
    or (s['clock'] == 30 and dict(s, out1="Y", clock=0, pc="Y1"))
)
Y1 = lambda s: s['pc'] == 'Y1' and (
    (s['clock'] < 5 and dict(s, clock=s['clock'] + 1))
    or (s['clock'] == 5 and dict(s, out1='R', out2='G', clock=0, pc='G2'))
)
G2 = lambda s: s['pc'] == 'G2' and (
    ((s['clock'] == 60 or (s['clock'] >= 30 and s['button']))
     and dict(s, out2="Y", clock=0, button=False, pc="Y2"))
    or (s['clock'] < 60 and dict(s, clock=s['clock'] + 1))
)
Y2 = lambda s: s['pc'] == 'Y2' and (
    (s['clock'] < 5 and dict(s, clock=s['clock'] + 1))
    or (s['clock'] == 5 and dict(s, out1='G', out2='R', clock=0, pc='G1'))
)

# Next state relationship
Next = lambda s: G1(s) or Y1(s) or G2(s) or Y2(s)


def run():
    s = Init
    while s:
        print(s)
        s = Next(s)  # State transition
    # If we get here, there was no next state
    print("DEADLOCK")


run()

This code looks strange, doesn’t it? It leans on the way Python evaluates and and or: rather than returning a strict True or False, each returns one of its operands. So a and b evaluates to b when a is truthy, which lets each lambda hand back the next state dictionary when its conditions hold and something falsy when they don’t. When you run this, you’ll see the traffic light cycle through its states in rapid succession. Right now, this code manages its transitions independently of a timer and focuses chiefly on their order. We could insert a timer when that is needed.
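A quick illustration of that operand-returning behavior, separate from the traffic light code. The state dictionary and the step helper here are made up for the example.

```python
state = {"pc": "G1", "clock": 3}

# "and" returns its first operand if that is falsy, otherwise its second.
print(False and state)                # False
print(True and dict(state, clock=4))  # {'pc': 'G1', 'clock': 4}

# "or" returns its first truthy operand, so we can chain alternatives.
def step(s):
    return (
        (s["clock"] < 30 and dict(s, clock=s["clock"] + 1))
        or (s["clock"] == 30 and dict(s, clock=0))
    )

print(step(state))                        # {'pc': 'G1', 'clock': 4}
print(step({"pc": "G1", "clock": 30}))    # {'pc': 'G1', 'clock': 0}
print(step({"pc": "G1", "clock": 31}))    # False
```

One thing to watch with this trick: an empty dictionary is falsy, so it only works when every "next state" is a non-empty value.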

So what might a first pass at concurrency look like for our server?

We might spin up a new thread for each client connection we receive, such that the client and server communications each happen in their own thread:

[Screenshot of code: a new thread for each client connection]
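For readers who want something to run, here is a minimal sketch of the thread-per-connection idea. It is not the actual server from this series: handle_client, serve, and the echo protocol are all assumptions made for illustration, and it needs Python 3.8+ for socket.create_server.

```python
import socket
import threading

def handle_client(conn):
    # Hypothetical handler: echo each message back until the client hangs up.
    with conn:
        while True:
            data = conn.recv(1024)
            if not data:
                break
            conn.sendall(b"echo: " + data)

def serve(listener):
    # Accept forever, handing each new connection to its own thread.
    while True:
        conn, _addr = listener.accept()
        threading.Thread(target=handle_client, args=(conn,), daemon=True).start()

listener = socket.create_server(("127.0.0.1", 0))  # port 0: let the OS pick a free port
port = listener.getsockname()[1]
threading.Thread(target=serve, args=(listener,), daemon=True).start()

# Two clients connected at the same time.
a = socket.create_connection(("127.0.0.1", port))
b = socket.create_connection(("127.0.0.1", port))

# The second client gets an answer while the first is still connected and idle.
b.sendall(b"second")
reply_b = b.recv(1024)
a.sendall(b"first")
reply_a = a.recv(1024)

a.close()
b.close()
print(reply_a, reply_b)
```

With a server that handles one client at a time, the second client would sit waiting until the first disconnected. Here each conversation runs in its own thread.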

Then we add a lock in our KeyValueStore to prevent simultaneous writes on the same key from corrupting our data:

[Screenshots of code: the KeyValueStore with a lock added]
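Again as a sketch rather than the series' real code: one simple way to do this is a single lock around the whole store. The method names set, get, and delete are assumptions, and a lock per key would be a finer-grained alternative.

```python
import threading

class KeyValueStore:
    def __init__(self):
        self._data = {}
        self._lock = threading.Lock()  # one lock guarding the whole dictionary

    def set(self, key, value):
        with self._lock:  # only one thread may write at a time
            self._data[key] = value

    def get(self, key):
        with self._lock:
            return self._data.get(key)  # None if the key is missing

    def delete(self, key):
        with self._lock:
            self._data.pop(key, None)   # deleting a missing key is a no-op


store = KeyValueStore()
writers = [
    threading.Thread(target=store.set, args=("chelsea", "rules")),
    threading.Thread(target=store.set, args=("chelsea", "rocks")),
]
for w in writers:
    w.start()
for w in writers:
    w.join()

print(store.get("chelsea"))
```

Whichever client writes last wins, but the stored value is always one client's complete value.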

We now have our building blocks in place. It’s time to start getting more specific to the Raft implementation.

As we do that, it’s worth reviewing Raft’s ultimate purpose: to allow a server cluster to operate as a single source of truth without possessing a single point of failure. For that to work, it must be possible to coordinate the servers’ understandings of the up-to-date data. How do the servers catch each other up on the data if some of them go down and come back up?

It is this piece—log replication—that we turn to next.
