MIT6.824实验记录
Lab1
本实验实现了一个简单的,单机多进程的MapReduce框架。这个框架包括一个coordinator进程和若干个worker进程,它们使用Go语言的RPC框架进行通信。
在coordinator启动时,会传入待处理的文件列表,此即map task的数目。coordinator中还硬编码了reduce task的数目。
框架代码已经给出了coordinator的运行逻辑:使用go
起一个rpc
server处理worker发来的请求,主线程每秒钟轮询coordinator.Done
函数,如果已经完成所有任务则退出。
我在coordinator中实现了如下的RPC调用:
函数 | 参数 | 返回值 |
---|---|---|
Register | NULL | WorkerID int Nreducer int |
MapGetTask | WorkerID int |
FileID int FileName string
AllFileDone bool |
MapTaskDone | WorkerID int FileId int |
NULL |
ReduceGetTask | WorkerID int |
ReducerID int NFiles int
AllReducerDone bool |
ReduceTaskDone | WorkerID int ReducerID int |
NULL |
GraceExit | WorkerID int |
NULL |
worker的工作流程是:
- 通过
register
调用向coordinator注册自己,获得worker ID。 - Map阶段:循环,通过
MapGetTask
获取map task并进行处理,将结果保存到中间文件,完成后发送MapTaskDone
。如果在MapGetTask
中看到了AllFileDone
,则跳出循环。 - Reduce阶段:循环,通过
ReduceGetTask
获取reduce task,从中间文件读取map结果并进行处理,完成后发送ReduceTaskDone
。如果在ReduceGetTask
中看到了AllReducerDone
,则跳出循环。 - 发送
GraceExit
并终止。
coordinator的实现:
- 追踪所有task的状态,并使用空闲链表维护当前未分配的task。
- 起另外一个goroutine,定期遍历所有task。如果某个task已经在>10s前分配给worker,但还没有完成,则认为该worker已经crash,将该task重新置为未分配的状态。该worker可能只是跑得慢,会诈尸,因此coordinator应注意不再响应被视为crash的worker发来的请求。
- 维护每个worker是否已经退出的信息(通过
GraceExit
),crash的worker视为已经退出。 - 当任务处理完成且所有worker已经退出后,coordinator可以退出。
需要注意的:
中间文件使用map和reduce task的id命名,和worker id没有关系。
写中间文件和计算结果时应该使用临时文件,完成后再rename,以防worker中途crash。
Lab2
A
本部分实现Raft的leader选举。
每台server包括两个循环例程:
ticker()
:定期检查election timeout,如超时则启动选举。broadcast()
:(作为leader时)发送AppendEntries心跳。
需要注意的:
论文5.1节指出,某台server在通信时发现其他server的term更大,应该跟上更新的term。如果自己是candidate或者leader,应该转变为follower。
RequestVote
和AppendEntries
应该实现这样的逻辑。论文5.2节中提到:
If the leader’s term (included in its RPC) is at least as large as the candidate’s current term, then the candidate recognizes the leader as legitimate and returns to follower state.
对于
AppendEntries
,如果某server收到了term相同的请求,且自己是candidate,应该承认新的leader并将自己降级为follower。使用
-race
确保没有数据竞争。选举时主线程对每台server起goroutine发送
RequestVote
后,不需要等待其返回。这是因为在rpc目标被disconnect之后,这些请求可能不会返回(把主线程卡死),而是在reconnect之后才返回(需要正确处理RequestVote
在选举过程以外返回的情况)。
B
本部分实现Raft的日志同步。每台server通过Start()
接受上层应用发来的cmd
,并同步返回,只有leader返回success。Raft集群根据协议同步,提交和应用(apply)这些cmd
。所谓apply指的是向每台server创建时传入的applyCh
中放入消息,通知上层应用。
每台server引入了另外两个循环例程:
commit()
:(作为leader)时按照图2右下角描述的过程更新commitIndex
。apply()
:将已提交的cmd
apply到状态机(即发送ApplyMsg
到applyCh
)。
上述两个例程可以用条件变量实现以避免盲等。commit()
在当选为leader和matchIndex
被更新时被唤醒;apply()
在commitIndex
被更新时唤醒。
Start()
需要记录本地日志,起一个goroutine向其他服务器广播AppendEntries
,主线程可以直接返回。按照论文的要求,leader应该一直重试发送AppendEntries
,直到确认所有server都已经收到日志更新为止。在我的实现中,该例程只尝试广播一次,后续的重试由心跳广播去做。
需要注意的:
接收到
AppendEntries
请求后,如果leaderCommit > commitIndex
,需要根据index of last new entry更新commitIndex。此处index of last new entry应该是本次请求参数中最新的log entry的下标,而不是当前server中最后一条log的下标。所有服务器都应该在接收到term更大的Request/Reply之后将自己降级为follower,并跟上新的term。由于
votedFor
是当前term的状态,term更新时应该将votedFor
一并重置。(AppendEntries
在term相同时也有特殊逻辑,见A部分第3条注意)RequestVote
中,某server在向其他server投票之后,也要重置election timer。在发送
AppendEntries
时,entries[]
要对rf.log[]
进行深拷贝,不能只存一个切片,否则接收方处理AppendEntries
请求时会出现数据竞争。(竞争出现在rpc库的encode和decode中,没有研究具体原因)一些请求可能延迟发出,或者结果在server的状态发生变化后才返回,造成一些corner case。两个例子:
一台term过时的leader发出
AppendEntries
,试图用(过时的)日志更新其他server。正常情况下该请求应该因为term太小而被拒绝。然而,假设过时leader刚创建了
AppendEntries
的goroutine,此时收到某个term更高的rpc请求,因此转变为follower,同时提升term。此时该goroutine被调度运行,使用新的term作为请求参数,构造了一个足够新的请求并被其他server接受,效果表现为接受了过时leader的请求,造成错误。解决方法:上述goroutine在准备请求参数前应该再次检查当前server是否为leader,如果不是则early return,这样一来,请求参数不可能使用更新后的term值。虽然该goroutine仍然可能在server不再是leader之后发出
AppendEntries
请求,但这样的请求因为term太小而不会被接受,因而是无害的。一台leader发送了
AppendEntries
,但是reply很晚才到达。在这个时间内,该leader已经被罢免,发生了重新选举,并再次成为leader,初始化了matchIndex
。此时如果这个AppendEntries
迟到的reply返回了false,会将matchIndex
减到0甚至负值。解决方法:更新
matchIndex
进行校验,保证matchIndex >= 1
。
观察到这样的执行情况:
- 集群包含3台服务器。S0当选为leader,但在还没有发送
AppendEntries
心跳时,S1就已经超时并成为candidate。 - S0向S1和S2发送带有
cmd1
的AppendEntries。S2成功接受,但S1因为term更新而拒绝接受。 - S0收到了S1发来的
RequestVote
并成为follower。由于cmd1
存在于S0和S2中,S1无法当选,最终一定是S0或者S2成为新的leader。 cmd1
最终可以被同步到全部3台服务器中,但是由于term已经变得更新,而cmd1
日志的term是1,cmd1
永远无法被新的leader提交。
似乎Raft是允许这样的情况出现的:虽然cmd1
暂时无法被提交(以及apply),但是当下一条指令cmd2
到来时,新的同步建立,cmd2
的提交隐式导致了之前的日志(即cmd1
)也被提交。在真实系统中这不算一个问题,但是本实验测试中有一些零散的Start()
测试,需要确认cmd1
已经成功apply,才会继续执行,此时会造成错误。
这个问题实际上是由于S1过快的超时并重新开始选举导致的。论文图2中指出:server向其他server投票后也要重置election timer,而我在A部分忽略了这一点,修复后该问题不再出现。
真实系统中不稳定的网络仍然可能导致上述情况出现,但不会出现在本实验的测试中。
C
本部分要求Raft实例在crash之后能够从持久化的状态中恢复。
按照论文的要求,每台服务器只需要持久化currentTerm
votedFor
和logs[]
。每次修改这些变量后都应使用persist()
将其持久化。
本部分还必须实现论文一笔带过的优化项:允许nextIndex
在AppendEntries
请求失败时每次回退多个entry。否则,部分测试用例中,崩溃的leader恢复后需要数百次AppendEntries
才能将自己的log同步给其他server,无法在时限内完成同步。实验文档中给出了该优化项的实现方式。
本部分中,我还限制leader发送AppendEntries
的时间间隔不能小于50ms,不过并没有观察到明显的性能提升。
D
本部分按照论文第7节的要求实现snapshot功能。
不同于2C部分中实现的persist,snapshot的实现对上层应用是有感知的,具体来说:
- 上层应用决定进行snapshot的时机。
- snapshot的对象是上层应用的状态机,也即只有被raft模块apply之后的cmd可以被snapshot。
- 上层应用调用raft模块的
Snapshot()
方法,告诉raft模块index
及之前的log可以被清理掉。
首先,需要对已有代码进行改进,允许每个server的log[]
只是完整日志的一部分,甚至为空数组。我引入了logBase
和lastLogTerm
两个变量,分别表示log[0]
在完整日志中的下标,以及snapshot中最后一个元素的term。这两个变量和log[]
一样,属于server的持久状态。
log[]
不是完整日志会导致以下问题:
leader需要发送的日志太老,可能已经被snapshot了。
解决方法:此时需要通过
InstallSnapshot
发送leader拥有的snapshot。follower处理
AppendEntries
RPC时,拥有的日志太新,无法验证arg.lastLogIndex
和arg.lastLogTerm
是否匹配。解决方法:这种情况下通过reply告诉leader,向自己发送snapshot。
commit和apply线程要如何处理不在
log[]
中,已经被snapshot的日志项?解决方法:已经被snapshot的日志项一定是已经被apply过的,因此可以直接递增
commitIndex
和lastApplied
指针,直到它们大大于等于logBase
为止。
正确支持不完整的log[]
后,可以开始实现InstallSnapshot
RPC:
我的实现不会把snapshot拆成多个chunk发送,因此不需要论文里的
offset
和done
参数。在发送
AppendEntries
的逻辑中,leader根据nextIndex[i]
得出需要向server发送的日志下标。如果发现该下标不在自己的log[]
中,转为发送InstallSnapshot
。发送成功后,leader更新nextIndex[i]=logBase
及matchIndex[i] = logBase - 1
。InstallSnapshot
的接收方首先判断snapshot中lastIncludedIndex
和自己logBase
的关系。如果lastIncludedIndex < logBase
,说明该snapshot比已有的snapshot还要旧,是完整log的一个前缀,直接返回即可。否则,接收方删除自己
log[]
中于snapshot相同的前缀,保留剩余部分。最后,接收方需要发送ApplyMsg
,告知上层应用自己收到了一个snapshot,需要更新状态机。
测试框架提供的applyCh
是一个容量为1的channel,这就导致了如下问题:
- 测试框架在
applierSnap()
中消费applyCh
中的消息,并在需要快照时调用rf.Snapshot()
,这需要拿锁rf.mu
。 - raft模块的
rf.apply()
后台例程向applyCh
中生产消息,同时拿锁rf.mu
。
此时,applierSnap()
等待拿锁rf.mu
,rf.apply()
等待applyCh
变空,两者互相等待,出现死锁。
解决方案:自己引入一个channel做buffer。applyToBuf()
把消息放进buffer,另有一个applyToService
把buffer中的消息交给上层应用提供的applyCh
。
Lab3
本实验在Raft的基础上实现一个fault-tolerant的KV存储。该系统包含多个server和多个client,server之间通过Raft协议同步数据以保证一致性,client通过RPC调用server的Get(key)
,
Put(key, value)
和
Append(key, arg)
接口实现读写。
工作流程:
- client通过RPC调用server,server调用Raft模块的
Start()
接口。 - 如果server不是leader,则直接返回。client应寻找其他server,直至找到leader为止。
- 否则,server通过等待
ApplyMsg
,在Raft完成数据同步后,更新本地KV存储的状态机,并使RPC handler返回。
由于网络可能丢包或发生partition,client可能收不到reply,造成请求超时,可能的原因有:
- client发送请求后,server提交Raft同步请求后发生crash,或是发出的reply丢包。
- server不可达,没有收到client的请求。
由于Append()
操作并非幂等,系统对于这两种情况的解决方法是不一样的:对情况1,重新发送请求会导致请求被重复执行;对情况2,重新发送请求则是安全的。
实验文档已经给出了以上问题的解决方案:
- 每个RPC请求应该带有一个ID,便于server发现重复请求。对重复请求,server应该返回上一次执行的结果。
- client应该为重传的RPC请求赋予相同ID,否则赋予新ID。
由于每个client都是同步发送请求,即上一个请求返回后才会发送下一个,上述策略可以实现为:
- 每个client生成随机的
clientID
,并维护一个自增的reqID
。每个请求都带上这两个ID。 - server为每一个client维护最后一个请求的
reqID
及其执行结果(即Get()
的返回值)。如果收到的请求的reqID
小于等于该值,则直接返回上一次的执行结果。 - server为每个client维护的上述信息应该被记入Raft日志以保证同步。
- 如果reply丢失,client在请求超时后,server完成同步前重新发送,可能导致Raft日志中存在两个
clientID
和reqID
均相同的条目。这是合法的,只需要保证KV存储在处理ApplyMsg
时不执行重复请求的状态转移即可。
我的KV server实现:
- 维护一个
kv.chanMap
,将每个client映射到一个channel。每个RPC handler会创建channel,写入kv.chanMap
中并等待。 - 一个后台例程
kv.apply()
从applyCh
中读取ApplyMsg
,并应用状态转移,同时通过channel向对应的client发送结果。RPC handler收到结果之后返回。
注意点:
当网络出现partition时,跨partition的RPC调用有时会直接失败,有时会无限阻塞而不返回。因此需要在client中对RPC调用封装一层,在超时后返回错误。
用于RPC handler和
kv.apply()
之间通信的channel是一次性的,同一client的每次请求都是一个不同的channel,因此需要:- RPC
handler需要校验
kv.apply()
处理的是否是自己发出的请求(通过reqID
),以免收到的是对该client之前请求的响应。 kv.apply()
每处理完一个请求后将channel从kv.chanMap
中删除。
- RPC
handler需要校验
我在Lab2中限制了每个server发送
AppendEntries()
的频率,这是不必要的,因为Lab3对KV存储的吞吐量有要求。去除该限制后,暴露出Lab2中存在的问题:- Leader向follower发送
AppendEntries()
,follower处理了请求,但是leader还没有收到reply。 - 此时leader发生了snapshot,并收到了新条目,需要向follower发送。由于leader尚未收到reply,此时该follower的
nextIndex
没有发生变化。 - 由于leader的log已经被snapshot,这次请求会发送
InstallSnapshot()
给follower。 - 此时follower的log其实是与leader的上一次
AppendEntries
同步的,并且很可能都已经被apply过。在InstallSnapshot()
的handler中发送ApplyMsg
,会导致上层应用观测到apply下标发生了回退。
解决方法:接收到snapshot后,仅当
rf.lastApplied < args.LastIncludedIndex
时,才需要发送ApplyMsg
给上层应用。- Leader向follower发送
Snapshot
3B部分需要实现KV存储系统的快照功能,该部分较为简单。
- server需要快照的内容是存储键值对的
kvMap
和维护每client请求ID的clientMap
。 - server通过一个后台例程检查
maxraftstate
的大小,在接近阈值时保存快照。 - server在启动时读取快照(如果有的话)。
- server在
kv.apply()
中处理snapshot相关的ApplyMsg
,并使用接收到的snapshot更新快照。
Lab4
实验的第4部分是构建一个分片的KV存储引擎。
A
分片KV存储系统中存在若干个replica group,每个分片(shard)都会映射到某个replica group,所有导向该分片的请求都由它处理。这个映射关系是动态的,并由一个中心化的shard controller负责维护:client/server会通过RPC向其询问当前的配置情况,admin可以通过RPC更改当前配置。shard controller本身也是一个多台server组成的raft集群。
本部分的任务是实现shard controller的增删改查RPC调用,并做到负载均衡,即shard被尽可能平均分配给replica group。
RPC | 参数 | 返回值 | 描述 |
---|---|---|---|
Join | Servers map[int][]string |
/ | 添加Servers 指定的replica group集合 |
Leave | GIDs []int |
/ | 删除GIDs 指定的replica group集合 |
Move | Shard int 和GID int |
/ | 令Shard 被映射到GID 指定的replica group |
Query | Num int |
Config | 查询ID为Num 的配置信息 |
注:每个replica
group是一个GID(int
)到服务器名列表([]string
)的映射。
实现上,与Raft交互的部分基本和Lab3相同,需要考虑的主要是负载均衡(Rebalance)的过程。
- 对于增加group的情形,可以按照拥有的shard数目对所有group排序,计算平均分配下每个group应拥有的shard数目,将shard从数量超标的group移动到数量不足的group。
- 对于减少group的情形,将shard从被删除的group移动到拥有shard最少的group。这个过程可以用堆来实现。
B
本部分在3B和4A的基础上,实现带分片的KV存储,我参考了这里和这里的思路后完成实现。
每个server应该维护每个分片的状态,并在配置更新时完成分片在server之间的转移。我采用的是push转移方式,即server将新配置下不再属于自己的分片推送给该分片应该属于的server。分片的所有状态如下:
状态 | 描述 |
---|---|
Unavailable | 该分片不属于本server |
Available | 该分片属于,且已经存储在本serever,可以提供服务 |
Pushing | 该分片不属于本server,但仍然存储在本server,正在向外推送 |
Pulling | 该分片属于本server,但还没有从其他server推送过来,暂时无法提供服务 |
相应地,需要引入一些额外的后台例程:
名称 | 描述 |
---|---|
tryUpdateConfig() |
每个group的leader试图更新config |
pushShards() |
每个group的leader将处于Pushing状态下的分片推送到另一个group的leader |
cleanShards() |
每个group中的server将处于Unavailable状态下的分片从本地存储中删除 |
submitEmptyLog() |
每个group中新当选的leader应向新term提交一条日志,以打破活锁 |
其中:
tryUpdateConfig()
只能在当前server中不存在Pushing/Pulling状态的分片时才能更新config,以防分片转移被跳过。有一些操作需要通过Raft同步,保证group中的各server状态一致,包括:
Raft日志类型 提交条件 处理动作 ReConf leader发现存在新配置且满足更新条件后提交 应用新配置,并根据新旧配置之间的diff更新分片状态 RecvShards leader接收到其他group发来的分片后提交 将接收到的分片应用到本地对应的Pulling分片上 PushShardsDone 正在Pushing的leader确认目标group已经接收了分片 将被接收分片的本地状态由Pulling变为Unavailable 提交空日志的目的:Raft的leader在重启后需要有新entry的加入,才会推进之前term中log的commit。如果两台server互相需要向对方push数据(即也需要接收来自对方的数据),无法接收client请求(也就没有新的raft entry),可能出现活锁。解决方法就是定期检查raft日志,如果当前term还没有提交过日志,就提交一个NullOp打破可能出现的活锁。