SD distributed system

our goal is to build reliable systems from unreliable components. we want to build systems that serve many clients, store a lot of data, perform well, all while keeping availability high.

L14:RAID:reliability via replication

Question: how to build fault-tolerant/reliable systems?

The high-level process of dealing with failures is to identify the faults, detect/contain the faults, and handle the faults. in lecture, we will build a set of abstractions to make that process more manageable.

How to measure success?

  • availability is one way to measure reliability. Reliability metrics differ from different systems.
  • Mean time to failure, mean time to repair, mean time between failures, availability

Problem: single disk failures

RAID is a data storage virtualization technology that combines multiple physical disk drive components into one or more logical units for the purposes of data redundancy, performance improvement, or both.

Three effective approaches are as following:

  1. RAID 1: mirroring. Make a replica to store the same data inside.
  2. RAID 4: make a dedicated parity disk. A xor B = C. We can induce the failed one no matter which one failed.
  3. RAID 5: intersperse the parity sectors amongst all N+1 disks to load balance writes.

Review: RAID 不能解决现有的问题,因为现实中不会永远只是单个磁盘崩溃,需要更系统地处理方法。

L15:Transactions: atomicity & isolation

problem: Possible crash in some point at thousand lines of code.

1
2
3
4
5
6
transfer (bank_file, account_a, account_b, amount): 
bank = read_accounts(bank_file)
bank[account_a] = bank[account_a] - amount
bank[account_b] = bank[account_b] + amount
write_accounts(tmp_file)
rename(tmp_file, bank_file)

Atomicity

What is atomicity?

  • an action is atomic if it happens completely or not at all. if we can guarantee atomicity, it will be much easier to reason about failures.

  • Understanding that this code should be atomic comes from understanding what the application is doing. What actions need to be atomic depends on the application.

分析:为了实现文件读写的原子性,我们需要保证文件的数据不能因为某行程序崩溃而丢失。

Golden rule: never modify the only copy.

Attempt 1: 只用一个文件存储账户,账户的任何操作(读写,加载,更新)都在这个文件中直接修改,当修改完成,再从内存写回到磁盘中。

Attempt 2: shadow copy:创建一个复制的文件后再对原文件进行操作(比如rename). 这里需要一步重命名文件,以使能对failures更准确地推断。(在Example中详述)

summary of shadow copy: it meets our goal. However, it perform badly. Problems include requiring copying the entire file for even small changes…

Example: file crash

Problem1: a crash during rename() potentially leaves bank_file in an intermediate state.

Solution 1: make rename() atomic

  • Single-sector writes are atomic
1
2
3
4
5
6
rename(tmp_file, orig_file): 
tmp_inode = lookup(tmp_file) // = 2
orig_inode = lookup(orig_file) // = 1
orig_file dirent = tmp_inode <- crash!
remove tmp_file dirent
decref(orig_inode)

Problem2: rename happened, but refcounts wrong

Solution2: recover the disk after a crash.

如果在recover时crash,那么继续recover

1
2
3
4
5
recover(disk): 
for inode in disk.inodes:
inode.refcount = find_all_refs(disk.root_dir, inode)
if exists(tmp_file):
unlink(tmp_file)

isolation

isolation refers to how and when the effects of one action (A1) are visible to another (A2) where A1 and A2 appear to have executed serially, even if they are actually executed in parallel.

Problem: how to use lock?

Problem: transactions are powerful abstraction that provides atomicity and isolation. How to implement transaction?

Example:

​ T1 T2
​ begin begin
​ transfer(A, B, 20) transfer(B, C, 5)
​ withdraw(B, 10) deposit(A, 5)
​ end end

L16:Logging(atomicity)

Problem 0: how do systems guarantee atomicity and isolation?

Basic idea: Keep a log of all changes and whether a transaction commits.

changes include:

  1. begin(allocate new transaction ID),
  2. write(append entry to log),
  3. read(scan log to find last committed value),
  4. commit(write commit record),
  5. recover(doing nothing).

Problem 1: which operation in(read, write, recovery) is slowest?

Answer: Read is slowest because it must scan entire log.

Improvement 1: use cell storage(non-volatile memory) to store data. Updates go to log and cell storage, then we can read from cell storage without scanning the entire log.

Changes with improvement 1:

  1. how to write?

    Log before install. And the record helps to recover from a crash in between the two writes. (Write-ahead logging)

  2. how to recover?

    basically, scan the log entry requiring recovering

1
2
3
4
5
6
7
8
9
10
11
12
13
read(var): 
return cell_read(var)
write(var, value):
log.append(current_tid, “UPDATE”, var,
read(var), value)
cell_write(var, value)
recover(log):
commits = []
for record r in log[len(log)-1] .. log[0]:
if r.type == COMMIT:
commits.add(r.tid)
if r.type == UPDATE and r.tid not in commits:
cell_write(r.var, r.old_val) // undo

problem 2: the changes made in problem 1 cause bad performance in recovery for it needs to scan the entire log.

Improvement 2: use a cache(volatile memory).

Problem 2.1: After crash, there may be updates that didn’t make it to cell storage (were in cache but not flushed)

Improvement 2.1: We need a redo phase in addition to an undo phase in our recovery (see slide for code)

Problem 2.2: recovery takes longer and longer as the log grows

Improvement 2.2: write the checkpoint record and truncate the log

checkpoint is a certain point in the process of transactions where all previous transactions are known to store in the NVM storage. Flush all cached entries, write a checkpoint record, then truncate the log prior to the checkpoint record.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
read(var): 
if var in cache:
return cache[var]
else:
// may evict others from cache to cell storage
cache[var] = cell_read(var)
return cache[var]

write(var, value):
log.append(current_tid, update, var,
read(var), value)
cache[var] = value

flush(): // called “occasionally”
cell_write(var, cache[var]) for each var

recover(log):
commits = {}
for record r in log[len(log)-1] .. log[0]:
if r.type == commit:
commits.add(r.tid)
if r.type == update and r.tid not in commits:
cell_write(r.var, r.old_val) // undo
for record r in log[0] .. log[len(log)-1]:
if r.type == UPDATE and r.tid in commits:
cell_write(r.var, r.new_value) // redo, improvement 2.1

real problem: one single failure in high-scalability system can cause problems as well. like bank system, hospital system…

L17:two-phase locking(isolation)

Problem: We have multiple transactions – T1, T2, …, TN – all of which must be atomic, and all of which can have multiple steps. We want to schedule the steps of these transactions so that it appears as if they ran sequentially.

Solution: we need a fine-grained locking. our goal (in lecture) is to run transactions concurrently, but to produce a schedule that is conflict serializable

Serializability

what type of serializability you want depends on what your application needs/what state do we need/want?

different type of serializability: conflict serializability

  • Conflicts: two operations conflict if they operate on the same object and at least one of them is a write.

  • a schedule is conflict serializable if the order of all of its conflicts is the same as the order of the conflicts in some sequential schedule.

Problem: how do we generate conflict-serializable schedules?

Two-phase locking

  1. each shared variable has a lock
  2. before any operation on a variable, the transaction must acquire the corresponding lock
  3. multiple transactions can hold reader locks for the same variable at once; a transaction can only hold a writer lock for a variable if there are no other locks held for that variable.(improvement 1)
  4. after a transaction releases a lock, it may not acquire any other locks(improvement 0)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
T1 
begin acquire(x.lock)
T1.1 read(x)
acquire(y.lock)
T1.2 tmp = read(y)
T1.3 write(y, tmp+10)
commit release(x.lock)
release(y.lock)

T2
begin acquire(x.lock)
T2.1 write(x, 20)
acquire(y.lock)
T2.2 write(y, 30)
commit release(x.lock)
release(y.lock)

Lemma:2PL (without improvement version) produces a conflict-serializable schedule.

Proof: prove by contradiction. If 2PL is not conflict serializable, then a cycle exists in the conflict graph. There are T1, T2, … Tk transactions in the graph. To cause conflicts, each pair of confict graph must share a lock as T1 and T2 shares lock x1, (T2, T3, x2), (Tk, T1, xk). To process in the graph, x1 must be released by T1 before T2 can acquire it which contradicts #4 in 2PL.

problem 1: dead lock

Better solution than global ordering on locks is to take advantage of atomicity and abort one of the transactions.

performance improvement(improvement 1): reader-writer locks

  • Rules: many readers, one writer.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
T1 
begin acquire(x.reader_lock)
T1.1 read(x)
acquire(y.reader_lock)
T1.2 tmp = read(y)
acquire(y.writer_lock)
T1.3 write(y, tmp+10)
commit release(x.reader_lock)
release(y.reader_lock)
release(y.writer_lock)

T2
begin acquire(x.writer_lock)
T2.1 write(x, 20)
acquire(y.writer_lock)
T2.2 write(y, 30)
commit release(x.writer_lock)
release(y.writer_lock)

improvement 2: give up conflict serializable in view serializable cases

why not view serializable?

  • view serializablity is the intermediate reads and final state (writes) are the same as in some sequential schedule.

  • test reason: hard to detect view serializable cases. While checking whether a graph is acyclic is fast.

  • aside: how to check a graph is acyclic? dfs, O(|V| + |E|)

L18:distributed transactions: two-phase commit(multi-site atomicity)

Scenario: Client + coordinator + two servers: one with accounts A-M, the other with accounts N-Z.

Goal: develop a protocol that can provide multi-site atomicity in the face of all sorts of failures(every part of the system can fail)

Problem: one server committed, the other did not

Approach: two-phase commit, nodes agree that they’re ready to commit before committing

more specifically,

  1. Coordinator sends tasks to servers (workers)
  2. Once all tasks are done, coordinator sends prepare messages to workers.
  3. Once all workers are confirmed to be prepared, coordinator will tell them to commit, and tell client that the transaction has committed.

Understanding the process is possible to infer the stage where the failure occurred and solve it.

basic idea: it’s ok to abort the process before the commit point. Otherwise, it cannot abort, and it needs recovery approach.

question: how things work?

  • Worker: worker prepared, worker acknowledged

  • Coordinator

    • if heared all workers prepared, it writes COMMIT to its own log,(commit point)
    • if heard all workers committed, it writes DONE to its own log.(transaction is totally done at that point)

approach applies in different types of failures:

  1. Worker/Network Failures Prior to the Commit Point
    • Lost prepared message
    • lost ACK for prepare
    • Worker failure before prepare
  2. Worker Failures After the Commit Point
    • Before receiving commit
    • After commit received
  3. Coordinator Failures
    • Before prepare
    • After commit point, before DONE
    • After writing DONE

performance issues:

Problem -1: when sys fails, some data becomes unavailable.

Solution: replication.

approach: single-copy consistency

Problem -1.1: we need to keep multiple copies of the same pieces of data consistent, and we need to decide what type of consistency we want in different scenarios.

L19:Replication:replicated state machines

goal:solve problem -1.1 of L18

Attempt 1: replicate data on two servers

Problem 1: messages can arrive at replicas in different orders resulting in inconsistent state.

Attempt 2: make one replica the primary replica, and have a coordinator in place to help manage failures

Problem 2: network partition splits the communication in half

Attempt 3: use a view server to determine which replica is primary, in hopes that we can deal with network partitions

problem 3.1: network partition prevents S1 from communicating with VS

problem 4: VS fails. It can be a bottleneck of the system.

Solution: Distribute the view server and achive agreement from them. View sever 1 through n, each manages a partition of replica sets.

  • machanisms for distributed consensus
    • Raft
    • Paxos