Distributed System notes
Design goals
-
Infrastructure services: storage, communication, computing. The course goal is to build an abstraction that hides the distribution while realizing high performance.
-
performance -> scalability
- scenario: n users <=> 1 web server <=> 1 database
- When n is 1million, scaling up web servers and spliting users accounts into multiple webs can achieve performance until web-db communication becomes bottle neck, that is, scalable web servers no longer help. Then db will need to be refactored so that is splitting data into multiple dbs.
- terms
scalability
: N times computers/resources can achieve N times throughput.
- special cases
- quick response time for a single user request
- all users want to update the same data
-
fault tolerance -> availability, recoveribility
-
scenario: always sth brokes when running 1000 computers vs running 1
-
rare events become real problems
-
terms
-
availability
: system will keep operating when certain kinds of failures occur. It will not be available when too many failures occur. -
recoverability
: system stops running/reponding to requests until the failed componets are repaired. Need to save latest data on disk etc so they can recover it when powering up. -
normal scale: 3 GHz microprocessor
-
-
tools
- non-volatile storage: store log of states. -> writing NV storage is too slow (move disk arms and wait for a disk platter to rotate)
- replicated servers: use a copy -> two replicas drifted out of sync
-
-
consistency
-
General-purpose infrastructure needs well-defined behavior.
-
everything could broke and lead to lose replica.
-
strong consistency often means poor performance relative to weaker consistency.
-
case studies: MapReduce 2004¶
Consistency and linearizability¶
case studies: GFS 2003¶
Primary/Backup Replication¶
- One way of providing fault tolerance: Replication.
- Replication deals with “fail-stop” failure of a single replica. It may not be able to detect h/w or bugs in s/w or human config errors. Geo-failure are dealt with only if replicas are physically separated.
- Failures in one server are independent from another otherwise it’s useless to use replication.
- Whether replication is worthwhile the N*expense of computing resources depends on the consequences of failures.
- Two main approaches:
- state transfer: primary sends internal memory (state) to backup
- replicated state machine: clients send operations to primary and primary sends those external events to backups. All replicas have deterministic same state as the primary if they execute the operations in the same order.
- tradeoffs:
- state transfer is simpler but may be large and slow to transfer over network
- replicated state machine are comparably smaller than state and generates less network traffic but complex to get right.
- example: VM-FT uses replicated state machine on single-core processor. It uses state transfer scheme when it is expanded to multi-core.
- Design points of replication schema
- What state to replicate?
- Does primary have to wait for backup?
- When to cut over to backup?
- Are anomalies visible at cut-over?
- How to bring a replacement backup up to speed?
Problem 1: At what level do we want replicas to be identical?
application state: forward op stream, can be efficient. ex: GFS.
machine level, registers and RAM content: forward machine events (interrupts, DMA, &c), not as efficient. ex: run an existing software without modification
case studies: VMware FT 2010¶
Terms:
- vm-ft: fault-tolerant virtual machines
- failstop: if something goes wrong, the computer would stop executing instead of generating incorrect results. E.g. unplug the power cable out of the server, CPU overheats.
- failover: if something goes wrong, the computer will be replaced by another one.
- virtual lockstep
- mode: logging, replaying, normal
- a deferred-execution context (similar to a tasklet in Linux) is a mechanism for deferring work to be executed later at a more appropriate time
FT design¶
- Synchronization of primary and backup VMs is based on the technique of deterministic replay. Briefly talking, deterministic replay transfers the non-deterministic operations/events to the log entries of the file. The backup can read from the file and replay execution of the primary.
- non-determinism of events and operations in VMs: virtual interrupts, reading the clock cycle counter of the processor
- question: non-deterministic input (capture and apply) -> deterministic execution + performance unaffected (at instruction level)
- However, that technique for VMware vSphere platform is introduced in 2007.
- FT protocol: No data is lost if a backup takes over after the primary fails.
- Output requirement <- output rule: The backup is supposed to execute consistently as the primary after it takes over the primary VM. The rule to guarantee that is two phase commit. The primary delays sending the output until the backup VM has received and acknowledged the log entries of the output operations.
- Can tolerate lost packets: incoming packets may be dropped during failure of the primary due to reasons unrelated to that failure.
failure handling¶
-
context: time lag on the execution of the backup VM. To control the time lag to be less than 100ms and no greater than 1s, the primary VM’s CPU limit is managed to increase/decrease to occupy/spare more execution time.
-
failure detection: use heartbeating (UDP) to monitor the traffic on the logging channel that connects primary and backup. A timeout in the flow of log entries and acknowledgements is detected as a failure.
-
split-brain problem: solution: (requirement) only one VM (primary or backup) takes over the execution.
Problem 0: How does VM-FT handle network partitions? That is, is it possible that if the primary and the backup end up in different network partitions that the backup will become a primary too and the system will run with two primaries?
There will be only one VM executing during network partitions. The primary VM delays sending outputs to the external world until the backup receives and acknowledges the log entries sent by it. When network partitions occur, the primary and backup VMs lost information of each other. The failure whether caused by network connection or VM faults is unknown.
VM-FT hence performs an atomic op test-and-set on shared storage that stores the virtual disks of VMs when the primary or the backup wants to go live. If the op succeeds, then the VM is allowed to go live. If not, that means a VM must have been live, so the current VM halts itself. If the VM cannot access shared storage, it waits until it can.
-
Start a new backup on another host: (primary, backup) = (P1, B1) -> (P1 is down, B1 takes over) -> (B1, ?) -> (B1, B2)
- Based on VMware VMotion: migration of a running VM from one server to another server within one second.
- Modified FT VMotion clones a VM to a remote host rather than migrating it within minutes or unnoticeable interruption.
- Simplified steps: the mode of B1, B2: (logging, replaying) -> choose a server to run B2: cluster service decides it -> cloning…
-
To eliminate non-determinism
-
on disk IO issues:
- parallel disk IOs try to access the same location on the shared storage -> detect IO races and make it sequential
- a disk op races with an application op when they are reading the same memory block at the same time -> MMU protection on pages - traps - (too expensive) => bounce buffers (cheaper) to read/write from/at
- when failures happen, no way to find out the IOs issuing during that time are completed or not. -> reissue the pending IOs
-
network IO issues: async updates to a VM’s state while executing ?-> VM traps and interrupts + delaying the sending packets-> performance challenges
-
Aside: Ignored: operations on VM-FT
design alternatives¶
non-shared storage vs shared storage
advantages:
- no delaying disk writes for the primary VM
- adds availability when the primary and backup VMs are far apart
disadvantages:
- need the alternative to handle network partition
- need to sync disks of VMs
Verified Primary/Backup¶
case studies: Grove 2023¶
Raft¶
site:
- http://nil.csail.mit.edu/6.824/2022/notes/l-raft.txt
- http://nil.csail.mit.edu/6.824/2022/notes/l-raft2.txt
Problem 0: a machine doesn’t understand the real-life events like network broken, disk failures,
server crashed. All it can notice is the state changes.
- It appears that no response to a query over the network.
- Unless an outside agent will decide when to switch servers/a perfectly reliable network
(never breaks)/ a single perfectly reliable server (vmware FT’s test-and-set server) -> possible
single-point failure - How to automate it?
Main idea:
- majority rule: same as Paxos. At lease one server is in the intersection of any two intersect
which can convey info about previous decisions. Such a system often is called quorum systems. - electing a new leader
- Logs of each server can be different but they will converge to be identical despite failures.
The crucial property:
- committed: an entry that is commited won’t be forgotten despite failures.
- logs with ordered commands:
- It stores the ordered commands in case leader must re-send to followers or or for replay after
reboot (persistence). - help to decide if the machines have identical logs
- help replicas agree on a single execution order
- It stores the ordered commands in case leader must re-send to followers or or for replay after
- leader: leader ensures the identical execution order of the same commands on all servers.
Raft provides:
- interface:
- Start(command) (index, term, isleader): only leader can start proposals.
- ApplyMsg(index, command): each peer sends this message for each committed entry to local
service.
- more understandable than Paxos
A little about history of consensus protocols: paxos (1998), viewstamped replication (1988,
2012). Raft (2014) is more modern. They are all partition-tolerant.
1 | Raft diagram: |
Problem 1: How Raft works?
Leader election¶
Leader (srv, term): the term helps servers to follow latest leader.
1 | Election diagram: |
Election results:
Each term has one leader at most despite failures (network partitions, server crashes).
- One leader is newly elected:
- The leader sends heartbeat messages to followers in a heartbeat interval.
- A server learns about newly elected leader from heartbeats with a higher term number in
AppendEntries requests.
- Election failed for less than a maj or notes: New election starts with new term number. Old
candidates quit.
- Split votes
- Not enough reachable servers
- Old leader holds hallucination that it is still a leader.
Election rules:
- A peer stays follower if it hears from current leader with heartbeat message.
- A peer starts leader election when it doesn’t receive the heartbeat message from current leader
for anelection timeout
. - Each server can cast one vote per term.
- A candidate will vote for itself.
- A peer will vote for the first candidate that asks
- A candidate becomes a leader when it receives votes from a majority of peers for a given term.
Problem 1.1: how does Raft avoid split votes?
Split votes can happen when simutaneous candidates ask for votes from a equal number of peers. Raft
uses random election timeout to avoid this problem. The election timeout must be chosen wisely. It
should be at least a few heartbeat intervals in case of network delays, long enough to elect a
leader before the next election starts. It should also be short enough to allow retries and react to
failures quickly.
Log¶
Keeping a log for old commands is common. But changing leader is the reason to have an ordered log
which helps leader to check follower states.
Every server should hold an identical log. That means no server can execute a different command for
a log entry that has been executed a command by any other server.
1 | Log diagram: |
The first question is could this scenario arise and how.
Example 2: AE(log entry) details after Leader (S3, 6):
- S3 is chosen as new leader for term 6.
- nextIndex[S2] = 13, nextIndex[S1] = 13
- It sends AE(13) with prevLogIndex=12, prevLogTerm=5
- S2 replies false, mismatched prevLogTerm.
- S3 decrements nextIndex[S2] to 12 (roll-back scheme, roll back one entry at a time)
- S3 sends AE(12+13) with prevLogIndex=11, prevLogTerm=3
- S2 deleted its entry 12
The same for S1.
The result of roll-back: followers hold the identical log with the leader by deleting the tail of log
with mismatched term and accepting leader’s entries after that point.
It’s ok to forget about S2’s (12, 4) entry. It’s not received by any server yet and hence not
committed. The client will resend the discarded commands of that term by and by.
New leader would not roll back committed entries from end of previous term.
-> Raft needs to ensure elected leader has all committed log entries.
1 | Example 3: |
Example 3: the longest log wins? -> disclose voting details.
step 1: S2 or S3 wins election for term 5. It sends AEs to all.
step 2: S1 incurs network partition: S1 is isolated from S2 and S3.
step 3:
- S1: it doesnot hear from leader. Next election for term 6 starts. Can’t get maj and start new
election for term 7… - S2 or S3: happily lives after S1 is disconnected since one of them will get maj and become leader.
The problem is why won’t happy server choose 6 as next term?
Backing things up a litter, S1 is leader for term 6; crash + reboot; leader in term 7; crash and
stay down. It crashes after it only sends AE to itself.
Because of leader (S1, 7), one of S2 or S3 learns about the last term is 7 while voting. So the next
term will be 8.
All peers reboot. While S1 holds the longest log, it can’t be a leader since entry 8 could have
committed.
end of section 5.4.1:
election restriction
, voters only cast votes for candidate who is at least as up-to-date.
- candidate has higher term in last log entry, or
- candidate has same last term and same length or longer log
The next leader will be S2 or S3. They vote for each other even if network connectivity is intact.
Because they have higher term.
Then S1 will be forced to discard 6,7. -> not committed -> clients will resend the discarded commands
1 | Example 4: |
end of section 5.3: a faster roll-back scheme than backs up one entry per request
How to roll back quickly?
S2: AE(prevLogTerm=6)
-> Reply: S1 rejects with (XTerm, XIndex, XLen)
- case 1, leader doesn’t have XTerm -> nextIndex[S1] = XIndex
- case 2, leader has XTerm -> nextIndex[S1] = leader’s last entry for XTerm
- case 3, follwer’s log too short -> nextIndex[S1] = XLen
(? binary search)
Phrases:
XTerm: term in the conflicting entry (if any)
XIndex: index of first entry with that term (if any)
XLen: log length
Persistence¶
After a server crashes, we can repair it by replacing with a fresh (empty) server or reboot the
crashed server.
- new: requires transfer of entire log/snapshots to new server, slow but necessary in case failure
is permanent. - old: requires state that persists across crashes. Must support this for simultaneous power
failures. -> persistence
What to remember?
- Essential info to let a Raft server rejoin the group. Save them after each change or before
sending RPCs. - log[], a server that has latest log entries needs to keep that for furture use of new leader.
- currentTerm, to ensure terms of system only increase. Each term has at most one leader to detect
RPCs from stale leaders and candidates. - votedFor, a server needs to remember who it voted for in the currrent term in case it is
rebooted and votes for a different candidate.
What can be volatile?
If a state will be reset after reboot, it doesn’t need to be persistent.
Performance cost:
- Persistence is often the bottleneck for performance.
- SSD (0.1ms per write) writes 100 times faster than a hard disk (10 ms per write), limiting us
to 100-10,000 ops/sec.
- SSD (0.1ms per write) writes 100 times faster than a hard disk (10 ms per write), limiting us
- optimizations
- batch many log entries per disk write
- persist to faster storage (battery-backed RAM)
- Another bottleneck RPC takes << 1 ms on a LAN
How does the service recover its state after a crash+reboot?
- Simple approach: start with empty state and re-play Raft’s entire log.
- Faster approach: snapshot + replay the tail of the log (that snapshot didn’t cover)
Log compaction and Snapshots¶
State = operation history
service states are usually much smaller than the complete log. A server can’t discard un-executed
entries which is not yet reflected in the state and un-committed entries which could be part of
leader’s majority.
To solve the problem of huge replicated log, service periodically creates persistent “snapshot”. A
snapshot includes copy of service state and index of last included log entry. Service tells Raft it
is snapshotted through some log index so Raft can discard log before that index. A server can
create a snapshot and discard prefix of log whenever.
In the case of crash+restart, service reads snapshot from disk and tells Raft last included index
to avoid duplicate applying of log entries.
When follower’s log ends before leader’s log starts, that part p is lost and can’t be recovered by
AppendEntries RPCs. Instead leader sends InstallSnapshot RPCs.
1 | L: |-p-|xxxxxxxxxxxxxx| |
Linearizability¶
notion of correctness for strong consistency: linearizability
An execution history is linearizable if one can find a total order of all operations,
that matches real-time (for non-overlapping ops), and
in which each read sees the value from the write preceding it in the order.
A history is a record of client operations, each with arguments, return value, time of start,
time completed.
draw the constraint arrows by time rule + value rule
- the order obeys value constraints (W -> R)
- the order obeys real-time constraints
Notation:
- Wx1 means write value 1 to record x, Rx1 means a read of rocord x yielded value 1.
- the history is client-centric view of requests. It is linearizable if there is no cycle
- the form of overlapping operations denotes concurrent operations
1 | example 4: |
From the examples in the lecture, it is known that:
- service can pick either order for concurrent operations
- all clients must see the writes in the same order expecially in the cases of replicas and caches
- reads must return fresh data. Stale values aren’t linearizable
- Duplicate request handling:
- supposing clients re-send requests if they don’t get a reply, leader rememembers client
requests it has seen and replies with saved response from first execution when it sees
duplicate. - it is linearizable to return the old saved value in this case. The reason is that the
duplicate request is seen as the same process of the failed first request, which is concurrent
to the write request.
- supposing clients re-send requests if they don’t get a reply, leader rememembers client
Duplicate RPC detection¶
Q: what should a client do if an operation times out? -> Re-send th request
- server is dead or request dropped
- server executed but request lost. Client still needs the result.
The idea is duplicate RPC detection. Client picks an ID for each request. Re-sends will use the
same ID in same RPC. Service keeps track a table indexed by ID and records value after executing.
If the second RPC arrives with the same ID, the service knows it’s a duplicate and generates reply
from the old saved value in the table.
Q: When can we delete table entries?
- If new leader takes over, how to get the duplicate table?
- No need. Every server updates their duplicate table as they execute.
- If server crashes, how to restore its table?
- replay of log populates the table or snapshots which contains a copy of the table
- as discussed in linearizability section, it is allowed to return the old saved value in this
case.
To keep the table small, service has one table entry per client rather than one per request. Each
client numbers requests sequentially. When server receives a client request with new number, it can
forget about client’s lower entries and update the table entry with the new request.
If a duplicate req arrives before the original executes, the service just call Start() again.
Server won’t execute a req that is already seen.
Read-only operations¶
end of section 8
Q: does the Raft leader have to commit read-only operations in the log before replying?
e.g. Get(key)?
Leader cannot respond immediately to a read-only operation using its current content. The reason is
that a server might have lost the recent election but not realized it yet in cases of network
partitions. Then a new leader processes Put()s for the key so that value in that server becomes
stale. Thus leader must commit read-only operations.
But read-heavy workload are quite normal. Committing read-only operations can take a long time. In
practice, people are often willing to exchange higher performance with stale data.
The idea is leases. Modify Raft as follows: defining a lease period, after each time the leader
gets an AE maj, it is entitled to respond to read-only requests for a lease period without
committing those requests to the log (w/o sending AEs).
A new leader can’t execute write requests until previous lease period has expired. So followers
keep track of the last time they responded to an AE, and tell the new leader in the RequestVote
reply.
As a result, we get faster read-only operations and the history is still linearizable.
*Paxos¶
Assumptions: non-Byzantine failures. Allows failures of missing messages and unordered requests.
P1. An acceptor must accept the first proposal that it receives.
For the acceptance of majority, one acceptor must accecpt >= 1 proposals. Assign number to the proposal as (seq_number, value).
A value is chosen only when it is accepted by a majority of acceptors. -> A value is chosen only when the proposal is chosen.
If multiple proposals are chosen, then it is guaranteed that all chosen proposals have the same value.
P2. If a proposal with value v is chosen, then every higher-numbered proposal that is chosen has value v.
-> 2a. accepted by any acceptor
-> 2b. issued by any proposer
-> 2c. For any v and n, if a proposal(n, v) is issued, then there is a set S consisting of a majority of acceptors such that either (a) no acceptor in S has accepted any proposal numbered less than n, or (b) v is the value of the highest-numbered proposal among all proposals numbered less than n accepted by the acceptors in S.
To prove 2c, the proposer (n, v) controls future acceptance by requesting a promise from acceptors that no more such acceptances of proposals numbered less than n.
Proposer’s algorithm:
(1) a prepare
req
A proposer send a prososal (n) and demands its acceptors to respond with:
- a promise never again to agree a proposal numbered less than n
- the proposal with the highest number less than n that it has accepted (previous accepted value), if any.
After the proposer gets the responses from a majority of the acceptors, it is getting into phase 2.
(2) an accept
req, sharing the same set of acceptors as phase-1.
The proposer issues a proposal (n, v) where v is the value either the highest-numbered proposal among the responses or the any one selected by the proposer if reported no proposals.
P1a. An acceptor can accept a proposal numbered n iff it has not responded to a prepare request having a number greater than n.
Phase 1:
(1) proposer: sends a prepare req (n)
(2) acceptor: responds the proposal(n, v) or null
Phase 2:
(1) proposer: sends an accept req (n, v) to the same majority group of acceptors
(2) acceptor: accepts (n, v) unless there are prepare requests numbered greater than n.
Optimizations:
- acceptors ignore the prepare requests numbered less than its accepted request number.
- acceptors notify the proposers the highest number of its prepare requests so that proposers can drop the proposals with lower numbered req.
Phase 3: learner: the acceptors can respond their acceptances to some set of distinguished learners.
# of distinguished learners^, reliability^ & communication complexity^.
Termination policy: a distinguished proposer must be selected as the only one to issue proposals. A reliable algorithm for electing a proposer must use either randomness or real time (timeouts).
implementation:
Paxos chooses a leader to perform the role of the distinguished proposer and the distinguished learner.
For persistence, an acceptor records its response in storage before sending. And each proposer keeps the highest-numbered proposal in storage before issuing.
Suppose that the acceptors are A, B, and C. A and B are also proposers. How does Paxos ensure that the following sequence of events can’t happen? What actually happens, and which value is ultimately chosen?
- A sends prepare requests with proposal number 1, and gets responses from A, B, and C.
- A sends
accept(1, "foo")
to A and C and gets responses from both. Because a majority accepted, A thinks that"foo"
has been chosen. However, A crashes before sending anaccept
to B.- B sends prepare messages with proposal number 2, and gets responses from B and C.
- B sends
accept(2, "bar")
messages to B and C and gets responses from both, so B thinks that"bar"
has been chosen.
After step 2, B and C have had the prepare message from A which is #1. After step 3, B would get response from C with value “foo” and gets null from B. At step 4, B will not send accecpt(2, “bar”) but (2, “foo”).
handout¶
site:
- http://nil.csail.mit.edu/6.824/2015/notes/l-paxos.txt
- http://nil.csail.mit.edu/6.824/2015/notes/paxos-code.html
There are protocols such as Paxos, Raft, Viewstamped Replication, Raft, Zookeeper providing strong
consistency. Paxos is different for its simplicity and wide application. Many other protocols can
be viewed as variants of Paxos.
Two topics are:
Problem 1: How Paxos works?
Agreement is hard :(
- multiple rounds for the op, tentative initialy but don’t know when agreement is permanent
- Agreement has to be able to complete even with failed servers. some servers decides a value but
other servers don’t respond. Can’t distinguish between network partition (servers are running
but cannot be reached by other partition) or failed servers.
Two main ideas in Paxos to address these problems:
- many rounds are possible but they will converge on one value
- majority rule: a key point is any two majorities overlap. At least one server with earlier
majority is shared by the later majority.
Paxos sketch:
- each server consists of three logical entities, acceptor, proposer, learner. Maybe more than
one proposer when multiple clients submit requests to different servers at the same time. - Proposers contacts acceptors to assemble a majority. if not get a majority, new round.
The crucial property:
- If a value was chosen, any subsequent choice must be the same value
- chosen is system-wide property
Why n?
- It distingushes among multiple rounds. Later rounds can supersede earlier rounds.
- n = <time, server_id>, n must be unique and roughly follow time.
- round(=proposal) numbers are WITHIN a particular instance.
What’s the commit point?
i.e. a point that agreement has reached or a server has executed the op
After a majority has the same (v_a, n_a). Neither v_a nor n_a is sufficient.
Thinking (discussing with Claude):
- why does the proposer need to pick v_a with highest n_a?
The proposer needs to pick an accepted value which is associated with the highest proposal
number that acceptors have seen. It represents the most recent accepted value.
- why does prepare handler check that n > n_p?
It picks a proposal number that is higher than current highest prepare seen. The proposer
should not send a proposal that is older than previous ones.
- why does accept handler check n >= n_p?
The acceptor promises not to accept any proposal with number less than n_p.
- why does accept handler update n_p = n?
The accept handler updates n_a because it’s accepting a value. The real question is why n_p is
updated too. If n_p < n, then n_p < n_a. The proposal with n_p will be rejected by the
acceptor.
- what if proposer S2 chooses n < S1’s n?
It will be rejected by any acceptor that has seen S1’s proposal.
- what if an acceptor crashes after receiving accept?
Assume that an acceptor receiving accept means that states were updated but it didn’t send
accept_ok, then the proposer will retry with a new proposal number and get rejected since the
acceptor is dead.
One acceptor crash will not affect the the system as long as the majority of the paxos peers
is alive. Since the paxos lab is not persistent, the peer will lost its states. It will get
updated when it is alive.
- what if an acceptor reboots after sending prepare_ok?
The next accept will be rejected because the acceptor forgets the previous prepare which updates
n_p.
Paxos gets stuck:
- not a majority that can communicate
- propsers retry immediately after accept_reject (should retry after a random amount of time)
1 | example 1 (normal operation): |
Problem 2: How to integrate Paxos into a real system?
Paxos-based replication:
Each client ops are appended to a replicated log of servers with a separate proposal number.
Log entries are numbered, called instances (seq). Paxos reaches agreement on each log entry.
Paxos provides:
- interface:
- Start (seq, v): Propose v for seq, without waiting for agreement to complete
- fate, v := Status(seq): find out the agreed value for seq. Only check local states, w/o
communications between servers.
- correctness: once any agreement reached, never changes. After agreement, servers may update state
- fault tolerance: tolerate non-reachability of a minority of servers
- liveness: will reach agreement if a majority of servers can communicate for long enough
Example (context): Server Si uses Paxos to get all servers to agree that a log entry x holds a
client op (Get, Put, Append).
Why a log is better than all replicas to agree on each op in lock-step:
- when state is small: agree on entire state -> a tie
- when state is very large: log describes changes -> win
- log helps to recover from failures: slow, miss messages, crash, start
Summary of how to use Paxos for RSM:
A log of Paxos instances (client op). Different instances’ Paxos agreements are independent.
*case studies: ZooKeeper 2010¶
ZooKeeper: Wait-free coordination for Internet-scale systems
Read: 1-2
Problem 0: One use of Zookeeper is as a fault-tolerant lock service (see the section “Simple locks” on page 6). Why isn’t possible for two clients to acquire the same lock? In particular, how does Zookeeper decide if a client has failed and it can give the client’s locks to other clients?
Because a lock can only be created once and held by one client. Once a lock is created, another client that tries to acquire the same lock will read the znode with the watch flag set. Zookeeper decides a client has failed by watch events.
> Claude: The key safety guarantee comes from combining these mechanisms: locks are typically implemented using ephemeral znodes, and session expiration is what ultimately determines if a client has failed. A client’s lock (ephemeral znode) will only be released when its session expires, ensuring no two clients can hold the same lock simultaneously
site: http://nil.csail.mit.edu/6.824/2020/notes/l-zookeeper.txt
Zookeeper is a Raft-like service with leader-based replication. We care about Zookeeper from two perspectives:
-
APIs for a general purpose coordination service
-
performance of replication: will Nx replica server bring Nx performance?
Normally, it wouldn’t for leader-based replication. The server requires leader to replicate one by one, slowing down the performance inverse proportionally to the number of replicas.
When a replica servers read-only client requests from their local state w/o other peers, reads from followers are not linearizable because they may return stale data. Even if a client is reading from an up-to-date replica, it still risks the possibility of seeing data values go backwards in time.
Zookeeper avoids this issue by changing the definition of correctness. It looses the constraints of linearizability by allowing reads to yield stale data so that clients are able to read from all replicas, changing total read capacity from O(1) to O(# servers). That highly increases the performance for read-heavy workloads.
Ordering guarantees:
- linearizable writes: writes are ordered with
zxid
and execute in zxid order. - FIFO client order

-
API overview
-
atomicity for mini-transaction: Zookeeper is good for small piece of data like 100MB, not as big as 100GB.
1
2
3
4
5
6# mini-transaction example
while true:
x, v := getData("f")
if setData(x + 1, version=v):
break
(sleep)
Scalable lock compared to threaded lock of Go:
: service creates sequential files. Each lock at most has one file requests to acquire. For example, supposing files are numbered with 0 - 3 and f0 holds the lock, the other files trying to acquire the lock look like the following diagram.
1 | # Locks w/o herd effect |
*case studies: Chubby 2006¶
The Chubby lock service for loosely-coupled distributed systems
Terms:
- Advisory locks are a type of locking mechanism where processes voluntarily check and respect locks, but the system doesn’t enforce the locks.
- Whole-file reads refer to a pattern where applications read entire files from beginning to end, rather than accessing random portions or small parts of files. This was one of the key workload assumptions in systems like Google File System (GFS).
This paper introduces Chubby, a distributed lock service, for use within a distributed system consisting of fairly large numbers of small machines connected by a high-speed network. It aims to deal with the problem of electing a leader from among a set of servers. The authors only want to discuss the design and implementation through real experience.
*case studies: Harp 1991¶
site: http://nil.csail.mit.edu/6.824/2015/notes/l-harp.txt
Replication in the Harp File System
Read: 1, 3-6
Question: Figures 5-1, 5-2, and 5-3 show that Harp often finishes benchmarks faster than a conventional non-replicated NFS server. This may be surprising, since you might expect Harp to do strictly more work than a conventional NFS server (for example, Harp must manage the replication). Why is Harp often faster? Will all NFS operations be faster with Harp than on a conventional NFS server, or just some of them? Which?
Harp is often faster because it applies three approaches to increase performance.
First, for non-modification operations, the primary node doesn’t send requests to backups and processes by local content.
Furthermore, since read operations in a file system are modifiled which changes the last viewed time of files upon reading, Harp makes a concession of consistency by allowing read requests to return to the client immediately from the primary. Such requests don’t have to wait for the acknowledgements of backups and increase performance. The rest of processing read requests between primary and backup is running at the background.
Secondly, a part of work is put background compared to unreplicated NFS server which is:
- in the phase 2 of 2PC, backups are informed about the commit
- committed records of event are applies to file system (apply)
Harp is a file service in a distributed network which was designed for small clusters. It adopts primary copy replication technique and write-ahead log, providing users similar interface to NFS at that time. Additional hardwares are used for reliable storage and UPS (uninterruptible power supply) which persists the log despite (short) power failures.
Environment: nodes are connected via a network; synchronized clocks allows a skew of <100ms, node failures include network, hardware, power; a crash of node is fail-stop.
Problem: how replication works?
The structure of section 4 is: overview, system behavior of failure cases, no failure case (normal case processing), failure case (fail-over), atomicity of transactions
The things I noticed are:
- Some familiar stuff: Harp uses primary/back replication model + WAL (limited size; volatile; redo log, not undo) to achieve reliability, consistency, correctness, availability.
- The replication models applies 2PC + view servers
- It is fault-tolerant, allowing <maj # of node failures
- It is able to tolerate simultaneous failures.
- reduce log size by exchanging and comparing LB and GLB.
- loosely synchronized clocks with a skew of < 100ms.
- Harp speeds up read op processing by using leases and sacrifice of a little consistency of last viewed time. When T(primary’s clock) > T (backup’s clock) + t (promised by backup which won’t start a new view less than this time), the read requests are sent to backups. Otherwise, the primary returns the result of reads directly.
- The member of Harp’s group has three roles: primary, backup, witness. Harp distributes work to all members by assigning roles with unmatched group. For example, in a system with three nodes A, B, C, A could be primary of g0, backup of g1, and witness of g2.
- Witness servers don’t store files because only (n+1) servers to store is enough for a system to tolerate n failure nodes. A witness server can be promoted and demoted.
- A view of the system is numbered. The higher, the most recent. Harp avoids simultaneous view changes which slow down fail-over.
Other things I wasn’t aware of:
- Harp uses Raft-like replication techniques, pre-dating Raft by 20+ years. It describes complete replicated systems and how it adapts to state machine abstraction.
- question: how does it handle n/w partition for modified operations?
- q: When can Harp form a new view?
- Optimizations of Harp: pipelining of requests to backup (numbered event records?)