Labs are tentative and subject to change. It is wise to start working on them only after they are officially “released”.
- Lab 0: Basic parallel program
- Lab 1: Parallel execution
- Lab 2: Fault tolerance of workers
- Lab 3: Stream processing
- Lab 4: Redis FT
- Lab 5: Strong eventual consistency amidst network partitions
We will build a distributed word count application. But for simplicity, we will pretend that our multi-core system is a distributed system in itself.
We will start stateless Python processes to act as distributed workers. These workers will coordinate with each other through Redis. Redis behaves like the master (or driver program) in contemporary systems.
All workers have access to a shared file system for reading and writing large inputs. This is trivially true on our single system. In a real setup, workers would additionally use a distributed file system like HDFS or a blob store like S3.
Please keep track of every lab’s work and the code. You will be periodically asked to upload reports covering the labs.
Lab 0: Basic parallel program
The goal of this lab is to have the basic setup working. You need to have:

```shell
mkdir wss22
cd wss22/
conda create --name wss22
conda activate wss22
```
Further, you should download several large text files from here into a local folder. Next, write a simple Python program to count the words from all the files that you downloaded. You may print the top 10 words counted by your program. Later, we will use this program to verify that your parallel system is correctly counting the words and is hopefully much faster.
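A minimal sketch of such a serial counter (the `data` folder name and the `*.txt` pattern are assumptions; point them at wherever you downloaded the files):

```python
from collections import Counter
from pathlib import Path

def count_words(text):
    """Split on whitespace and count each word's frequency."""
    return Counter(text.split())

def count_folder(folder):
    """Sum the word counts of every .txt file in the folder."""
    total = Counter()
    for path in Path(folder).glob("*.txt"):
        total += count_words(path.read_text(errors="ignore"))
    return total

if __name__ == "__main__":
    folder = "data"  # assumed name of your download folder
    if Path(folder).is_dir():
        for word, n in count_folder(folder).most_common(10):
            print(word, n)
```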
Remember that $E = T_s / (p \, T_p)$, where $T_s$ is the time taken by the serial program, $p$ is the number of workers, and $T_p$ is the time taken by the parallel program. Next, draw the efficiency curves that we saw during the lectures.
- Keep the input files the same, and increase the number of workers.
- Keep the number of workers the same, increase the number of input files.
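Computing the efficiency explicitly helps when drawing the curves. A small sketch (the timings below are illustrative placeholders; substitute your own measurements):

```python
def efficiency(t_serial, p, t_parallel):
    """E = T_s / (p * T_p): fraction of ideal linear speedup achieved."""
    return t_serial / (p * t_parallel)

# Hypothetical timings in seconds: serial run took 100 s.
T_S = 100.0
parallel_runs = {1: 100.0, 2: 55.0, 4: 30.0, 8: 20.0}  # workers -> T_p
for p, t_p in parallel_runs.items():
    print(f"p={p}  speedup={T_S / t_p:.2f}  efficiency={efficiency(T_S, p, t_p):.2f}")
```

Efficiency of 1.0 means perfect linear scaling; the curves typically fall off as workers contend for shared resources.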
Once you are done with Part 1, familiarize yourself with Redis. You need Redis installed for this:

```shell
docker run -d -p 6379:6379 redis:7.0-rc3
```
Lab 1: Parallel execution
In this lab, we will make the word count application run end-to-end using Redis.
But before that, you should read about Redis streams. You need the following Redis stream commands for this lab: xadd, xreadgroup, and xack. Understand what they do.
Download the starter code.

```shell
unzip wss22-starter-lab2.zip
cd wss22-starter-lab2/
pip3 install -r requirements.txt
redis-cli CONFIG SET requirepass "pass"
```

Edit constants.py to point to your data folder. Run python3 client.py. In this lab, you have to modify the starter code.
The basic structure is as follows:
- client.py iterates over the folder with the text files to add the file paths into a Redis stream using xadd. It then starts the worker processes.
- Worker processes call xreadgroup to read one file name from the Redis stream. Call xreadgroup such that each worker gets a different file name.
- Worker process reads the file it is supposed to work on and counts each word’s frequency.
- When done, the worker process can use zincrby to increment each word’s count in a Redis sorted set, and finally xack the message containing the filename.
- Then it reads another file by again calling xreadgroup. If there are no more files to process, it exits.
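The loop above can be sketched with the redis-py client. This is a sketch under assumptions, not the starter code itself: the stream name, group name, message field name, and password are guesses (match them to constants.py), and it assumes client.py has already created the consumer group:

```python
import sys
from collections import Counter

STREAM, GROUP = "files", "workers"   # assumed names; match your starter code

def count_words(path):
    """Count each word's frequency in one file."""
    with open(path) as f:
        return Counter(f.read().split())

def worker(consumer_name):
    import redis  # imported lazily so count_words works without a server
    r = redis.Redis(password="pass", decode_responses=True)
    while True:
        # ">" asks for entries never delivered to this consumer group;
        # distinct consumer names therefore receive different files.
        resp = r.xreadgroup(GROUP, consumer_name, {STREAM: ">"}, count=1)
        if not resp:
            return                       # stream drained: exit
        msg_id, fields = resp[0][1][0]
        for word, n in count_words(fields["path"]).items():  # assumed field name
            r.zincrby("wordcounts", n, word)
        r.xack(STREAM, GROUP, msg_id)

if __name__ == "__main__" and len(sys.argv) > 1:
    worker(sys.argv[1])
```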
- Compare the performance with your serial and parallel implementations from Lab 0.
- What if, for a given text file, instead of locally counting all the words in one go and then doing zincrby, workers do zincrby with a count of 1 as soon as they read a word from the file?
- How does the file size impact completion time? What if each file is 100 bytes, 1 KB, 10 KB, …, 1 GB?
- How does your system handle skew? What if 100 input files are 100 MB each, but 1 file is 1 GB?
- How do stragglers affect your system? What if one of the workers is running at just half the speed of the other workers?
- You may create smaller files by opening a file in text editor and truncating the file after a few lines.
- You may create larger files by recursively running
cat in.txt in.txt in.txt in.txt > out.txt.
- You may use cgroups to slow down worker processes.
- The serial program you wrote in Lab 0 will come in handy to test the correctness of your system.
Lab 2: Fault tolerance of workers
Now we wish to ensure that our system is tolerant to worker failures. Since workers are stateless, we should be OK with losing worker state. But we still have to ensure two things:
- Updates to Redis should be made atomic. If a worker crashes after incrementing the counts of only a few words, then we will end up with incorrect counts. See Redis fcall to xack a file and to increment word counts as one atomic operation.
- Consumer groups in Redis streams ensure that each worker gets a unique file name. But if a worker crashes after getting a file name from the Redis stream, that file’s words may never get counted. Therefore, other workers will have to steal filenames that have not been xacked till a timeout. See xautoclaim to do so.
- Inject failures at different points in the worker code to verify that the word counts are indeed correct even after worker failures.
- How does the xautoclaim timeout affect your system? Compare the completion time with a very small timeout and a very large timeout, both with and without failures.
- Do stragglers still affect your system? Other workers should steal a straggler’s file and complete its count. Observe this behavior by severely slowing down a worker.
- But, now the straggler and the worker who stole its file might both try to update the count. How can you make this update idempotent?
- You may add crash points in the worker process to control where and how workers crash. For instance, after the worker reads a filename via xreadgroup, it may call sys.exit() if a certain worker flag is set. Configure different flags for different workers at the time of their creation to verify fault tolerance.
- Workers cannot exit until all the other workers are done with their files. Use xpending to verify this before exiting from workers.
- xack returns the number of Redis stream messages that were actually acknowledged. Verify that xack returns 1 before writing to the word count sorted set to get idempotence.
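The idempotence rule can be sketched in plain Python, with a dict standing in for the Redis sorted set and the xack return value passed in (in the real system this check lives inside the atomic Lua function):

```python
def apply_counts(ack_result, counts, scores):
    """Apply a file's word counts only if our xack actually acknowledged it.

    ack_result stands in for xack's return value: 1 if this call acked
    the message, 0 if someone (e.g. a thief worker) acked it earlier.
    """
    if ack_result != 1:
        return False                 # already counted: skip to stay idempotent
    for word, n in counts.items():
        scores[word] = scores.get(word, 0) + n   # stand-in for zincrby
    return True

# A straggler and the worker that stole its file both finish the same file:
scores = {}
assert apply_counts(1, {"a": 2, "b": 1}, scores)       # thief acks first
assert not apply_counts(0, {"a": 2, "b": 1}, scores)   # straggler is skipped
assert scores == {"a": 2, "b": 1}                      # no double counting
```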
Lab 3: Stream processing
Our implementation already supports a micro-batching approach to streaming. We will modify the client program to inject a certain number of files containing tweets into the Redis stream every second. These files can be thought of as a micro-batch.
What we want to do in this lab is to measure the latency of each file, i.e., the time from injecting the file into the stream to the time when the file was processed successfully. You can measure this inside your Redis function that completes the processing of a file.
We want to draw a timeline of this latency at different injection rates, in the presence of failed workers, and stragglers. We hope to see that when workers fail (or become slow), the latency temporarily goes up, but then it recovers. You might have to implement some backpressure handling strategies if workers are not able to keep up with the file injection rate.
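One way to get per-file latency is to stamp each entry with the injection time when the client does xadd, and subtract when the file completes. A toy sketch of the bookkeeping (the timestamps below are illustrative, not measured):

```python
def latencies(inject_ts, done_ts):
    """Seconds from injecting a file into the stream to finishing it."""
    return {f: done_ts[f] - inject_ts[f] for f in done_ts}

# Hypothetical micro-batches injected one second apart:
inject = {"a.txt": 0.0, "b.txt": 1.0, "c.txt": 2.0}
done = {"a.txt": 0.4, "b.txt": 4.5, "c.txt": 2.3}  # b.txt hit a failed worker
for f, lat in sorted(latencies(inject, done).items()):
    print(f, round(lat, 2))
```

Plotting these values against injection time gives the latency timeline: a spike when a worker fails, followed by recovery once its file is stolen.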
Lab 4: Redis FT
We would now like to ensure that our system tolerates Redis failures. We need not change the worker code for this lab. To reason about correctness, note that a Redis instance handles commands one after another in a single thread, which keeps it linearizable.
In this lab, we will configure Redis to periodically create a checkpoint: Redis starts periodically storing a dump.rdb file on disk.
You can run CONFIG GET dir from redis-cli to find the directory where dump.rdb gets stored. You may try to crash the Redis instance and then start a new Redis instance. Redis should automatically read the dump.rdb file and restore from it. Verify that this new instance has the keys from the old instance by running keys * from redis-cli.
Now while the job is running, try crashing the Redis instance and restarting another one. From a correctness standpoint, checkpoints are consistent because Redis has a single event loop and because all our edits were made atomic in lab 2.
In other words, let us say that a file foo was processed after the checkpoint. Now, after a failover, the new Redis instance (recovered from the checkpoint) will remember that the file has NOT yet been xacked. Therefore, a worker will again receive the file for processing, and it will again xack + increment word counts in one atomic operation. Since our workers are stateless and file counts are deterministic, recomputing a file’s word counts is OK.
Ensure that you set up the new instance in an identical manner, i.e., listening on the same port, with the same password set, and with the same Lua functions inserted.
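While the replacement instance boots, worker commands will fail with connection errors; one simple approach is to wrap Redis calls in a retry loop with exponential backoff. A hedged sketch (with redis-py you would catch redis.exceptions.ConnectionError; the builtin ConnectionError below keeps the sketch dependency-free):

```python
import time

def with_retry(op, retries=5, base_delay=0.1, retryable=(ConnectionError,)):
    """Call op(); on a retryable error, back off exponentially and retry."""
    for attempt in range(retries):
        try:
            return op()
        except retryable:
            if attempt == retries - 1:
                raise                # give up after the last attempt
            time.sleep(base_delay * (2 ** attempt))

# Simulated flaky Redis call: fails twice while the new instance boots.
calls = {"n": 0}
def flaky_zincrby():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("redis is restarting")
    return "ok"

assert with_retry(flaky_zincrby, base_delay=0.0) == "ok"
assert calls["n"] == 3
```

Because the xack + increment step is atomic (Lab 2), retrying a failed call cannot double-count a file.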
- Does the job still complete if you crash and restart a Redis instance?
- What if Redis crashes while the client was bootstrapping files into the Redis stream? How can you handle this?
- What if Redis crashes after the workers complete counting words from all the files but before the client reads the top-3 words? How can you handle this?
- Without any faults in the system, does the completion time increase due to the periodic backups?
- Does the frequency of backups affect the completion time without any Redis failures?
- How do input file sizes affect job completion time if we have Redis failures?
- How does the system behave if you keep killing and restarting the Redis instance every N seconds?
Using synchronous replication
Here, we create
2f+1 Redis replicas and connect them with Raft using the
RedisRaft module. The replicas are
always kept consistent by doing the replication synchronously. In other words,
the leader does not return from a Redis command until it hears back an
acknowledgement from a majority of replicas.
Try arbitrarily crashing and restarting
f replicas while the job is running
and observe that the job finishes successfully. The good thing about this design
is that we never have to recompute a file (rollback computation) after
failovers. But the bad thing is that during normal operations (without Redis
failures), each Redis write operation is now slower because of the added
overhead of replicating logs.
- Without any faults in the system, how does the completion time change with
RedisRaft compared to a single Redis instance?
- Try this experiment with small input files and with large input files.
- Does the job still complete if you crash one Redis instance? What if you crash two Redis instances?
- How does the system behave if you keep killing and restarting a random Redis instance every N seconds?
Lab 5: Strong eventual consistency amidst network partitions
In this lab, we create 3 identical VMs each containing multiple stateless worker processes, a driver process, and a Redis instance. Just like before, the driver process writes all the local files to be counted in the local Redis instance’s stream.
Each worker writes word counts to all of the 3 Redis instances. Once in a while, we arbitrarily partition workers from Redis instances. Workers consider a write to be successful if they are able to write to at least one Redis instance. After the network partition heals, we do read repair so that the partitioned Redis instances can learn about the word counts that they may have missed.
- Without any partitions and crashes, is the word count now slower than the previous fault tolerant implementations?
- If a Redis instance recovers from a checkpoint, do we still need to rollback the computation? Can this rollback be avoided now?
- How quick is read repair?
- If we partition and heal the partition multiple times, i.e., do read repair multiple times, is the final output still correct? Why?
- The main goal is to design your own CRDT implementation to keep the word counts so that you can get correct counts after multiple read repairs. Notice that incrementing the word count of one file commutes with incrementing the word count of another file.
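One possible design (an assumption, not the only valid CRDT): have each Redis instance keep a grow-only map from file id to that file's word counts. Because each file's counts are deterministic, merging two replicas is just set union on file ids, which is commutative, associative, and idempotent, so repeated read repairs converge to the same correct totals:

```python
from collections import Counter

class FileMapCRDT:
    """Grow-only map: file id -> that file's (deterministic) word counts.

    Merge is set union on file ids, so it commutes and is idempotent:
    repeating a read repair can never double-count a file.
    """
    def __init__(self):
        self.files = {}

    def record(self, file_id, counts):
        """Record the counts a worker computed for one file."""
        self.files[file_id] = dict(counts)

    def merge(self, other):
        """Read repair: absorb files this replica has not seen yet."""
        for fid, counts in other.files.items():
            self.files.setdefault(fid, counts)

    def totals(self):
        """Final word counts: sum each recorded file's contribution."""
        total = Counter()
        for counts in self.files.values():
            total.update(counts)
        return total

# Two partitioned replicas see different files:
a, b = FileMapCRDT(), FileMapCRDT()
a.record("f1", {"hello": 2})
b.record("f2", {"hello": 1, "world": 1})
a.merge(b); a.merge(b)          # repairing twice is harmless
assert a.totals() == Counter({"hello": 3, "world": 1})
```

The trade-off is storing per-file counts instead of a single sorted set; a compaction step could fold fully replicated files into a base counter.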