Lab1

本实验实现了一个简单的,单机多进程的MapReduce框架。这个框架包括一个coordinator进程和若干个worker进程,它们使用Go语言的RPC框架进行通信。

在coordinator启动时,会传入待处理的文件列表,此即map task的数目。coordinator中还硬编码了reduce task的数目。

框架代码已经给出了coordinator的运行逻辑:使用go起一个rpc server处理worker发来的请求,主线程每秒钟轮询coordinator.Done函数,如果已经完成所有任务则退出。

我在coordinator中实现了如下的RPC调用:

函数 参数 返回值
Register NULL WorkerID int Nreducer int
MapGetTask WorkerID int FileID int FileName string AllFileDone bool
MapTaskDone WorkerID int FileId int NULL
ReduceGetTask WorkerID int ReducerID int NFiles int AllReducerDone bool
ReduceTaskDone WorkerID int ReducerID int NULL
GraceExit WorkerID int NULL

worker的工作流程是:

  1. 通过register调用向coordinator注册自己,获得worker ID。
  2. Map阶段:循环,通过MapGetTask获取map task并进行处理,将结果保存到中间文件,完成后发送MapTaskDone。如果在MapGetTask中看到了AllFileDone,则跳出循环。
  3. Reduce阶段:循环,通过ReduceGetTask获取reduce task,从中间文件读取map结果并进行处理,完成后发送ReduceTaskDone。如果在ReduceGetTask中看到了AllReducerDone,则跳出循环。
  4. 发送GraceExit并终止。

coordinator的实现:

  1. 追踪所有task的状态,并使用空闲链表维护当前未分配的task。
  2. 起另外一个goroutine,定期遍历所有task。如果某个task已经在>10s前分配给worker,但还没有完成,则认为该worker已经crash,将该task重新置为未分配的状态。该worker可能只是跑得慢,会诈尸,因此coordinator应注意不再响应被视为crash的worker发来的请求。
  3. 维护每个worker是否已经退出的信息(通过GraceExit),crash的worker视为已经退出。
  4. 当任务处理完成且所有worker已经退出后,coordinator可以退出。

需要注意的:

  1. 中间文件使用map和reduce task的id命名,和worker id没有关系。

  2. 写中间文件和计算结果时应该使用临时文件,完成后再rename,以防worker中途crash。

Lab2

A

本部分实现Raft的leader选举。

每台server包括两个循环例程:

  • ticker():定期检查election timeout,如超时则启动选举。
  • broadcast():(作为leader时)发送AppendEntries心跳。

需要注意的:

  1. 论文5.1节指出,某台server在通信时发现其他server的term更大,应该跟上更新的term。如果自己是candidate或者leader,应该转变为follower。RequestVoteAppendEntries应该实现这样的逻辑。

  2. 论文5.2节中提到:

    If the leader’s term (included in its RPC) is at least as large as the candidate’s current term, then the candidate recognizes the leader as legitimate and returns to follower state.

    对于AppendEntries,如果某server收到了term相同的请求,且自己是candidate,应该承认新的leader并将自己降级为follower。

  3. 使用-race确保没有数据竞争。

  4. 选举时主线程对每台server起goroutine发送RequestVote后,不需要等待其返回。这是因为在rpc目标被disconnect之后,这些请求可能不会返回(把主线程卡死),而是在reconnect之后才返回(需要正确处理RequestVote在选举过程以外返回的情况)。

B

本部分实现Raft的日志同步。每台server通过Start()接受上层应用发来的cmd,并同步返回,只有leader返回success。Raft集群根据协议同步,提交和应用(apply)这些cmd。所谓apply指的是向每台server创建时传入的applyCh中放入消息,通知上层应用。

每台server引入了另外两个循环例程:

  • commit():(作为leader)时按照图2右下角描述的过程更新commitIndex
  • apply():将已提交的cmdapply到状态机(即发送ApplyMsgapplyCh)。

上述两个例程可以用条件变量实现以避免盲等。commit()在当选为leader和matchIndex被更新时被唤醒;apply()commitIndex被更新时唤醒。

Start()需要记录本地日志,起一个goroutine向其他服务器广播AppendEntries,主线程可以直接返回。按照论文的要求,leader应该一直重试发送AppendEntries,直到确认所有server都已经收到日志更新为止。在我的实现中,该例程只尝试广播一次,后续的重试由心跳广播去做。

需要注意的:

  1. 接收到AppendEntries请求后,如果leaderCommit > commitIndex,需要根据index of last new entry更新commitIndex。此处index of last new entry应该是本次请求参数中最新的log entry的下标,而不是当前server中最后一条log的下标。

  2. 所有服务器都应该在接收到term更大的Request/Reply之后将自己降级为follower,并跟上新的term。由于votedFor是当前term的状态,term更新时应该将votedFor一并重置。(AppendEntries在term相同时也有特殊逻辑,见A部分第3条注意)

  3. RequestVote中,某server在向其他server投票之后,也要重置election timer。

  4. 在发送AppendEntries时,entries[]要对rf.log[]进行深拷贝,不能只存一个切片,否则接收方处理AppendEntries请求时会出现数据竞争。(竞争出现在rpc库的encode和decode中,没有研究具体原因)

  5. 一些请求可能延迟发出,或者结果在server的状态发生变化后才返回,造成一些corner case。两个例子:

    • 一台term过时的leader发出AppendEntries,试图用(过时的)日志更新其他server。正常情况下该请求应该因为term太小而被拒绝。

      然而,假设过时leader刚创建了AppendEntries的goroutine,此时收到某个term更高的rpc请求,因此转变为follower,同时提升term。此时该goroutine被调度运行,使用新的term作为请求参数,构造了一个足够新的请求并被其他server接受,效果表现为接受了过时leader的请求,造成错误。

      解决方法:上述goroutine在准备请求参数前应该再次检查当前server是否为leader,如果不是则early return,这样一来,请求参数不可能使用更新后的term值。虽然该goroutine仍然可能在server不再是leader之后发出AppendEntries请求,但这样的请求因为term太小而不会被接受,因而是无害的。

    • 一台leader发送了AppendEntries,但是reply很晚才到达。在这个时间内,该leader已经被罢免,发生了重新选举,并再次成为leader,初始化了matchIndex。此时如果这个AppendEntries迟到的reply返回了false,会将matchIndex减到0甚至负值。

      解决方法:更新matchIndex进行校验,保证matchIndex >= 1

观察到这样的执行情况:

  1. 集群包含3台服务器。S0当选为leader,但在还没有发送AppendEntries心跳时,S1就已经超时并成为candidate。
  2. S0向S1和S2发送带有cmd1的AppendEntries。S2成功接受,但S1因为term更新而拒绝接受。
  3. S0收到了S1发来的RequestVote并成为follower。由于cmd1存在于S0和S2中,S1无法当选,最终一定是S0或者S2成为新的leader。
  4. cmd1最终可以被同步到全部3台服务器中,但是由于term已经变得更新,而cmd1日志的term是1,cmd1永远无法被新的leader提交。

似乎Raft是允许这样的情况出现的:虽然cmd1暂时无法被提交(以及apply),但是当下一条指令cmd2到来时,新的同步建立,cmd2的提交隐式导致了之前的日志(即cmd1)也被提交。在真实系统中这不算一个问题,但是本实验测试中有一些零散的Start()测试,需要确认cmd1已经成功apply,才会继续执行,此时会造成错误。

这个问题实际上是由于S1过快的超时并重新开始选举导致的。论文图2中指出:server向其他server投票后也要重置election timer,而我在A部分忽略了这一点,修复后该问题不再出现。

真实系统中不稳定的网络仍然可能导致上述情况出现,但不会出现在本实验的测试中。

C

本部分要求Raft实例在crash之后能够从持久化的状态中恢复。

按照论文的要求,每台服务器只需要持久化currentTerm votedForlogs[]。每次修改这些变量后都应使用persist()将其持久化。

本部分还必须实现论文一笔带过的优化项:允许nextIndexAppendEntries请求失败时每次回退多个entry。否则,部分测试用例中,崩溃的leader恢复后需要数百次AppendEntries才能将自己的log同步给其他server,无法在时限内完成同步。实验文档中给出了该优化项的实现方式。

本部分中,我还限制leader发送AppendEntries的时间间隔不能小于50ms,不过并没有观察到明显的性能提升。

D

本部分按照论文第7节的要求实现snapshot功能。

不同于2C部分中实现的persist,snapshot的实现对上层应用是有感知的,具体来说:

  • 上层应用决定进行snapshot的时机。
  • snapshot的对象是上层应用的状态机,也即只有被raft模块apply之后的cmd可以被snapshot。
  • 上层应用调用raft模块的Snapshot()方法,告诉raft模块index及之前的log可以被清理掉。

首先,需要对已有代码进行改进,允许每个server的log[]只是完整日志的一部分,甚至为空数组。我引入了logBaselastLogTerm两个变量,分别表示log[0]在完整日志中的下标,以及snapshot中最后一个元素的term。这两个变量和log[]一样,属于server的持久状态。

log[]不是完整日志会导致以下问题:

  1. leader需要发送的日志太老,可能已经被snapshot了。

    解决方法:此时需要通过InstallSnapshot发送leader拥有的snapshot。

  2. follower处理AppendEntriesRPC时,拥有的日志太新,无法验证arg.lastLogIndexarg.lastLogTerm是否匹配。

    解决方法:这种情况下通过reply告诉leader,向自己发送snapshot。

  3. commit和apply线程要如何处理不在log[]中,已经被snapshot的日志项?

    解决方法:已经被snapshot的日志项一定是已经被apply过的,因此可以直接递增commitIndexlastApplied指针,直到它们大大于等于logBase为止。

正确支持不完整的log[]后,可以开始实现InstallSnapshotRPC:

  1. 我的实现不会把snapshot拆成多个chunk发送,因此不需要论文里的offsetdone参数。

  2. 在发送AppendEntries的逻辑中,leader根据nextIndex[i]得出需要向server发送的日志下标。如果发现该下标不在自己的log[]中,转为发送InstallSnapshot。发送成功后,leader更新nextIndex[i]=logBasematchIndex[i] = logBase - 1

  3. InstallSnapshot的接收方首先判断snapshot中lastIncludedIndex和自己logBase的关系。如果lastIncludedIndex < logBase,说明该snapshot比已有的snapshot还要旧,是完整log的一个前缀,直接返回即可。

    否则,接收方删除自己log[]中于snapshot相同的前缀,保留剩余部分。最后,接收方需要发送ApplyMsg,告知上层应用自己收到了一个snapshot,需要更新状态机。

测试框架提供的applyCh是一个容量为1的channel,这就导致了如下问题:

  1. 测试框架在applierSnap()中消费applyCh中的消息,并在需要快照时调用rf.Snapshot(),这需要拿锁rf.mu
  2. raft模块的rf.apply()后台例程向applyCh中生产消息,同时拿锁rf.mu

此时,applierSnap()等待拿锁rf.murf.apply()等待applyCh变空,两者互相等待,出现死锁。

解决方案:自己引入一个channel做buffer。applyToBuf()把消息放进buffer,另有一个applyToService把buffer中的消息交给上层应用提供的applyCh

Lab3

本实验在Raft的基础上实现一个fault-tolerant的KV存储。该系统包含多个server和多个client,server之间通过Raft协议同步数据以保证一致性,client通过RPC调用server的Get(key), Put(key, value)Append(key, arg)接口实现读写。

工作流程:

  1. client通过RPC调用server,server调用Raft模块的Start()接口。
  2. 如果server不是leader,则直接返回。client应寻找其他server,直至找到leader为止。
  3. 否则,server通过等待ApplyMsg,在Raft完成数据同步后,更新本地KV存储的状态机,并使RPC handler返回。

由于网络可能丢包或发生partition,client可能收不到reply,造成请求超时,可能的原因有:

  1. client发送请求后,server提交Raft同步请求后发生crash,或是发出的reply丢包。
  2. server不可达,没有收到client的请求。

由于Append()操作并非幂等,系统对于这两种情况的解决方法是不一样的:对情况1,重新发送请求会导致请求被重复执行;对情况2,重新发送请求则是安全的。

实验文档已经给出了以上问题的解决方案

  • 每个RPC请求应该带有一个ID,便于server发现重复请求。对重复请求,server应该返回上一次执行的结果。
  • client应该为重传的RPC请求赋予相同ID,否则赋予新ID。

由于每个client都是同步发送请求,即上一个请求返回后才会发送下一个,上述策略可以实现为:

  • 每个client生成随机的clientID,并维护一个自增的reqID。每个请求都带上这两个ID。
  • server为每一个client维护最后一个请求的reqID及其执行结果(即Get()的返回值)。如果收到的请求的reqID小于等于该值,则直接返回上一次的执行结果。
  • server为每个client维护的上述信息应该被记入Raft日志以保证同步。
  • 如果reply丢失,client在请求超时后,server完成同步前重新发送,可能导致Raft日志中存在两个clientIDreqID均相同的条目。这是合法的,只需要保证KV存储在处理ApplyMsg时不执行重复请求的状态转移即可。

我的KV server实现:

  1. 维护一个kv.chanMap,将每个client映射到一个channel。每个RPC handler会创建channel,写入kv.chanMap中并等待。
  2. 一个后台例程kv.apply()applyCh中读取ApplyMsg,并应用状态转移,同时通过channel向对应的client发送结果。RPC handler收到结果之后返回。

注意点:

  1. 当网络出现partition时,跨partition的RPC调用有时会直接失败,有时会无限阻塞而不返回。因此需要在client中对RPC调用封装一层,在超时后返回错误。

  2. 用于RPC handler和kv.apply()之间通信的channel是一次性的,同一client的每次请求都是一个不同的channel,因此需要:

    • RPC handler需要校验kv.apply()处理的是否是自己发出的请求(通过reqID),以免收到的是对该client之前请求的响应。
    • kv.apply()每处理完一个请求后将channel从kv.chanMap中删除。
  3. 我在Lab2中限制了每个server发送AppendEntries()的频率,这是不必要的,因为Lab3对KV存储的吞吐量有要求。去除该限制后,暴露出Lab2中存在的问题:

    • Leader向follower发送AppendEntries(),follower处理了请求,但是leader还没有收到reply。
    • 此时leader发生了snapshot,并收到了新条目,需要向follower发送。由于leader尚未收到reply,此时该follower的nextIndex没有发生变化。
    • 由于leader的log已经被snapshot,这次请求会发送InstallSnapshot()给follower。
    • 此时follower的log其实是与leader的上一次AppendEntries同步的,并且很可能都已经被apply过。在InstallSnapshot()的handler中发送ApplyMsg,会导致上层应用观测到apply下标发生了回退。

    解决方法:接收到snapshot后,仅当rf.lastApplied < args.LastIncludedIndex时,才需要发送ApplyMsg给上层应用。

Snapshot

3B部分需要实现KV存储系统的快照功能,该部分较为简单。

  1. server需要快照的内容是存储键值对的kvMap和维护每client请求ID的clientMap
  2. server通过一个后台例程检查maxraftstate的大小,在接近阈值时保存快照。
  3. server在启动时读取快照(如果有的话)。
  4. server在kv.apply()中处理snapshot相关的ApplyMsg,并使用接收到的snapshot更新快照。

Lab4

实验的第4部分是构建一个分片的KV存储引擎。

A

分片KV存储系统中存在若干个replica group,每个分片(shard)都会映射到某个replica group,所有导向该分片的请求都由它处理。这个映射关系是动态的,并由一个中心化的shard controller负责维护:client/server会通过RPC向其询问当前的配置情况,admin可以通过RPC更改当前配置。shard controller本身也是一个多台server组成的raft集群。

本部分的任务是实现shard controller的增删改查RPC调用,并做到负载均衡,即shard被尽可能平均分配给replica group。

RPC 参数 返回值 描述
Join Servers map[int][]string / 添加Servers指定的replica group集合
Leave GIDs []int / 删除GIDs指定的replica group集合
Move Shard intGID int / Shard被映射到GID指定的replica group
Query Num int Config 查询ID为Num的配置信息

注:每个replica group是一个GID(int)到服务器名列表([]string)的映射。

实现上,与Raft交互的部分基本和Lab3相同,需要考虑的主要是负载均衡(Rebalance)的过程。

  • 对于增加group的情形,可以按照拥有的shard数目对所有group排序,计算平均分配下每个group应拥有的shard数目,将shard从数量超标的group移动到数量不足的group。
  • 对于减少group的情形,将shard从被删除的group移动到拥有shard最少的group。这个过程可以用堆来实现。

B

本部分在3B和4A的基础上,实现带分片的KV存储,我参考了这里这里的思路后完成实现。

每个server应该维护每个分片的状态,并在配置更新时完成分片在server之间的转移。我采用的是push转移方式,即server将新配置下不再属于自己的分片推送给该分片应该属于的server。分片的所有状态如下:

状态 描述
Unavailable 该分片不属于本server
Available 该分片属于,且已经存储在本serever,可以提供服务
Pushing 该分片不属于本server,但仍然存储在本server,正在向外推送
Pulling 该分片属于本server,但还没有从其他server推送过来,暂时无法提供服务

相应地,需要引入一些额外的后台例程:

名称 描述
tryUpdateConfig() 每个group的leader试图更新config
pushShards() 每个group的leader将处于Pushing状态下的分片推送到另一个group的leader
cleanShards() 每个group中的server将处于Unavailable状态下的分片从本地存储中删除
submitEmptyLog() 每个group中新当选的leader应向新term提交一条日志,以打破活锁

其中:

  1. tryUpdateConfig()只能在当前server中不存在Pushing/Pulling状态的分片时才能更新config,以防分片转移被跳过。

  2. 有一些操作需要通过Raft同步,保证group中的各server状态一致,包括:

    Raft日志类型 提交条件 处理动作
    ReConf leader发现存在新配置且满足更新条件后提交 应用新配置,并根据新旧配置之间的diff更新分片状态
    RecvShards leader接收到其他group发来的分片后提交 将接收到的分片应用到本地对应的Pulling分片上
    PushShardsDone 正在Pushing的leader确认目标group已经接收了分片 将被接收分片的本地状态由Pulling变为Unavailable
  3. 提交空日志的目的:Raft的leader在重启后需要有新entry的加入,才会推进之前term中log的commit。如果两台server互相需要向对方push数据(即也需要接收来自对方的数据),无法接收client请求(也就没有新的raft entry),可能出现活锁。解决方法就是定期检查raft日志,如果当前term还没有提交过日志,就提交一个NullOp打破可能出现的活锁。