zoukankan      html  css  js  c++  java
  • MIT-6.824 Lab 3: Fault-tolerant Key/Value Service

    概述

    lab2中实现了raft协议,本lab将在raft之上实现一个可容错的k/v存储服务,第一部分是实现一个不带日志压缩的版本,第二部分是实现日志压缩。时间原因我只完成了第一部分。

    设计思路

    kvserver

    如上图,lab2实现了raft协议,本lab将实现kvserver。每个raft都关联一个kvserver,Clerks发送Put(), Append(), Get() RPC给leader服务器中的kvserver,kvserver收到请求后将操作打包成Log Entry提交给raft,然后阻塞等待raft将这个Entry拷贝到其它server,当Log Entry被拷贝到大部分的server后,leader 的raft会通知kvserver(raft往管道中塞comitted Entry,kvserver通过读这个管道获取通知),kvserver执行命令,然后响应Clerk。

    Clerk

    客户端通过Clerk发送请求,来看下Clerk代码:

    type Clerk struct {
    	servers []*labrpc.ClientEnd
    	// You will have to modify this struct.
    
    	lastLeader	int
    	cid		    int64
    	seq			int
    }
    
    func (ck *Clerk) Get(key string) string {
    
    	// You will have to modify this function.
        // 参数: 要读的key, 当前clerk的id,  请求序列号
    	getArgs := GetArgs{Key: key, Cid:ck.cid, Seq:ck.seq}
    	reply := GetReply{}
    
    	for {
    		doneCh := make(chan bool, 1)
    		go func() {
               //发送Get() RPC
    			ok := ck.servers[ck.lastLeader].Call("KVServer.Get", &getArgs, &reply)
    			doneCh <- ok
    		}()
    
    		select {
    		case <-time.After(600 * time.Millisecond):
    			DPrintf("clerk(%d) retry PutAppend after timeout
    ", ck.cid)
    			continue
    		case ok := <- doneCh:
               //收到响应后,并且是leader返回的,那么说明这个命令已经执行了
    			if ok && reply.WrongLeader != WrongLeader {
    				//请求序列号加1
                  ck.seq++
    				return reply.Value
    			}
    		}
    
           //换一个server重试
    		ck.lastLeader++
    		ck.lastLeader %= len(ck.servers)
    	}
    
    	return ""
    }
    

    这里只给出了Get()的代码,Put()和Append()类似,发送KVServer.Get给一个server,如果这个server不是leader,换一个server重试。直到发给真正的leader,并且leader将这个命令拷贝到大部分其它server后,然后成功执行该命令,Clerk.Get()才会返回。

    KVServer

    再来看下服务端的代码,KVServer处理Clerk的RPC请求:

    type KVServer struct {
    	mu      sync.Mutex
    	me      int
    	rf      *raft.Raft
    	applyCh chan raft.ApplyMsg
    
    	maxraftstate int // snapshot if log grows this big
    
    	// Your definitions here.
       // 保存键值对
    	db 		map[string]string
    	latestReplies map[int64]*LatestReply
    	notify map[int]chan struct{}
    }
    
    func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
    	// Your code here.
    	if _, isLeader := kv.rf.GetState(); !isLeader {
    		reply.WrongLeader = WrongLeader
    		reply.Err = ""
    		return
    	}
    
    	// 防止重复请求
    	kv.mu.Lock()
    	if latestReply, ok := kv.latestReplies[args.Cid]; ok && args.Seq <= latestReply.Seq {
    		reply.WrongLeader = IsLeader
    		reply.Value = latestReply.Reply.Value
    		reply.Err = latestReply.Reply.Err
    		kv.mu.Unlock()
    		return
    	}
    	kv.mu.Unlock()
    
    	command := Op{Operation:"Get", Key:args.Key, Cid:args.Cid, Seq:args.Seq}
    	index, term, _ := kv.rf.Start(command)
    
    	// 阻塞等待结果
    	kv.mu.Lock()
    	ch := make(chan struct{})
    	kv.notify[index] = ch
    	kv.mu.Unlock()
    
    	select {
    	case <-ch:
    		curTerm, isLeader := kv.rf.GetState()
    		DPrintf("%v got notify at index %v, isLeader = %v
    ", kv.me, index, isLeader)
    		if !isLeader || curTerm != term {
    			reply.WrongLeader = WrongLeader
    			reply.Err = ""
    		} else {
    			reply.WrongLeader = IsLeader
    			kv.mu.Lock()
    			if value, ok := kv.db[args.Key]; ok {
    				reply.Value = value
    				reply.Err = OK
    			} else {
    				reply.Err = ErrNoKey
    			}
    			kv.mu.Unlock()
    		}
    
    	}
    
    }
    

    KVServer.db用于保存键值对。
    KVServer.Get()首先判断自己是不是leader,如果不是leader,直接返回,这样Clerk好重试其它server。如果是leader,先在缓存中找,看这个请求是否已经执行过了。
    因为可能出现这么一种情况:如果leader commit一个Entry后立即奔溃了,那么Clerk就收不到响应,那么Clerk会将这个请求发给新的leader,新的leader收到请求后如果不做任何措施,将会二次commit该Log Entry,对于Put()和Append()请求执行两次是不正确的,所以需要一个办法防止一个请求执行两次。
    可以这么做:每个Clerk都分配一个唯一的cid,每个请求分配一个唯一的序列号seq,每成功一个请求,该序列号加一。服务端记录每个客户端cid最近一次apply的请求的序列号seq和对应的响应结果,根据这个信息可知,当再次收到这个客户端的序列号小于seq的请求时,说明已经执行过了,直接返回结果。

    如果之前没有执行过,那么调用

    kv.rf.Start(command)
    

    将Log Entry提交给raft,并且阻塞等待raft将这个Entry拷贝到其它大部分server,从阻塞返回后,说明这个Entry已经被拷贝到大部分server了,并且已经执行了命令,这时可以将结果返回给Clerk了。

    那么在哪里接收raft的消息呢?KVServer在创建的时候会在一个线程中执行如下函数:

    func (kv *KVServer) applyDaemon()  {
    	for appliedEntry := range kv.applyCh {
    		command := appliedEntry.Command.(Op)
    
    		// 执行命令, 过滤已经执行过得命令
    		kv.mu.Lock()
    		if latestReply, ok := kv.latestReplies[command.Cid]; !ok || command.Seq > latestReply.Seq {
    			switch command.Operation {
    			case "Get":
    				latestReply := LatestReply{Seq:command.Seq,}
    				reply := GetReply{}
    				if value, ok := kv.db[command.Key]; ok {
    					reply.Value = value
    				} else {
    					reply.Err = ErrNoKey
    				}
    				latestReply.Reply = reply
    				kv.latestReplies[command.Cid] = &latestReply
    			case "Put":
    				kv.db[command.Key] = command.Value
    				latestReply := LatestReply{Seq:command.Seq}
    				kv.latestReplies[command.Cid] = &latestReply
    			case "Append":
    				kv.db[command.Key] += command.Value
    				latestReply := LatestReply{Seq:command.Seq}
    				kv.latestReplies[command.Cid] = &latestReply
    			default:
    				panic("invalid command operation")
    			}
    		}
    
    		DPrintf("%d applied index:%d, cmd:%v
    ", kv.me, appliedEntry.CommandIndex, command)
    
    		// 通知
    		if ch, ok := kv.notify[appliedEntry.CommandIndex]; ok && ch != nil {
    			DPrintf("%d notify index %d
    ",kv.me, appliedEntry.CommandIndex)
    			close(ch)
    			delete(kv.notify, appliedEntry.CommandIndex)
    		}
    		kv.mu.Unlock()
    	}
    }
    

    kv.applyCh这个chanel会在创建raft的时候传给raft,当某个Log Entry可以被commit的时候,raft会往这个chanel中塞,只要for循环这个kv.applyCh,就能知道已经被commit的Entry,拿到Entry后,根据其中的命令执行相应的操作,然后通知KVServer.Get()继续执行。

    具体代码在:https://github.com/gatsbyd/mit_6.824_2018
    如有错误,欢迎指正:
    15313676365

  • 相关阅读:
    BNU 51002 BQG's Complexity Analysis
    BNU OJ 51003 BQG's Confusing Sequence
    BNU OJ 51000 BQG's Random String
    BNU OJ 50999 BQG's Approaching Deadline
    BNU OJ 50998 BQG's Messy Code
    BNU OJ 50997 BQG's Programming Contest
    CodeForces 609D Gadgets for dollars and pounds
    CodeForces 609C Load Balancing
    CodeForces 609B The Best Gift
    CodeForces 609A USB Flash Drives
  • 原文地址:https://www.cnblogs.com/gatsby123/p/10580757.html
Copyright © 2011-2022 走看看