zoukankan      html  css  js  c++  java
  • MIT-6.824 Lab 3: Fault-tolerant Key/Value Service

    概述

    lab2中实现了raft协议,本lab将在raft之上实现一个可容错的k/v存储服务,第一部分是实现一个不带日志压缩的版本,第二部分是实现日志压缩。时间原因我只完成了第一部分。

    设计思路

    kvserver

    如上图,lab2实现了raft协议,本lab将实现kvserver。每个raft都关联一个kvserver,Clerks发送Put(), Append(), Get() RPC给leader服务器中的kvserver,kvserver收到请求后将操作打包成Log Entry提交给raft,然后阻塞等待raft将这个Entry拷贝到其它server,当Log Entry被拷贝到大部分的server后,leader 的raft会通知kvserver(raft往管道中塞comitted Entry,kvserver通过读这个管道获取通知),kvserver执行命令,然后响应Clerk。

    Clerk

    客户端通过Clerk发送请求,来看下Clerk代码:

    type Clerk struct {
    	servers []*labrpc.ClientEnd
    	// You will have to modify this struct.
    
    	lastLeader	int
    	cid		    int64
    	seq			int
    }
    
    func (ck *Clerk) Get(key string) string {
    
    	// You will have to modify this function.
        // 参数: 要读的key, 当前clerk的id,  请求序列号
    	getArgs := GetArgs{Key: key, Cid:ck.cid, Seq:ck.seq}
    	reply := GetReply{}
    
    	for {
    		doneCh := make(chan bool, 1)
    		go func() {
               //发送Get() RPC
    			ok := ck.servers[ck.lastLeader].Call("KVServer.Get", &getArgs, &reply)
    			doneCh <- ok
    		}()
    
    		select {
    		case <-time.After(600 * time.Millisecond):
    			DPrintf("clerk(%d) retry PutAppend after timeout
    ", ck.cid)
    			continue
    		case ok := <- doneCh:
               //收到响应后,并且是leader返回的,那么说明这个命令已经执行了
    			if ok && reply.WrongLeader != WrongLeader {
    				//请求序列号加1
                  ck.seq++
    				return reply.Value
    			}
    		}
    
           //换一个server重试
    		ck.lastLeader++
    		ck.lastLeader %= len(ck.servers)
    	}
    
    	return ""
    }
    

    这里只给出了Get()的代码,Put()和Append()类似,发送KVServer.Get给一个server,如果这个server不是leader,换一个server重试。直到发给真正的leader,并且leader将这个命令拷贝到大部分其它server后,然后成功执行该命令,Clerk.Get()才会返回。

    KVServer

    再来看下服务端的代码,KVServer处理Clerk的RPC请求:

    type KVServer struct {
    	mu      sync.Mutex
    	me      int
    	rf      *raft.Raft
    	applyCh chan raft.ApplyMsg
    
    	maxraftstate int // snapshot if log grows this big
    
    	// Your definitions here.
       // 保存键值对
    	db 		map[string]string
    	latestReplies map[int64]*LatestReply
    	notify map[int]chan struct{}
    }
    
    func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
    	// Your code here.
    	if _, isLeader := kv.rf.GetState(); !isLeader {
    		reply.WrongLeader = WrongLeader
    		reply.Err = ""
    		return
    	}
    
    	// 防止重复请求
    	kv.mu.Lock()
    	if latestReply, ok := kv.latestReplies[args.Cid]; ok && args.Seq <= latestReply.Seq {
    		reply.WrongLeader = IsLeader
    		reply.Value = latestReply.Reply.Value
    		reply.Err = latestReply.Reply.Err
    		kv.mu.Unlock()
    		return
    	}
    	kv.mu.Unlock()
    
    	command := Op{Operation:"Get", Key:args.Key, Cid:args.Cid, Seq:args.Seq}
    	index, term, _ := kv.rf.Start(command)
    
    	// 阻塞等待结果
    	kv.mu.Lock()
    	ch := make(chan struct{})
    	kv.notify[index] = ch
    	kv.mu.Unlock()
    
    	select {
    	case <-ch:
    		curTerm, isLeader := kv.rf.GetState()
    		DPrintf("%v got notify at index %v, isLeader = %v
    ", kv.me, index, isLeader)
    		if !isLeader || curTerm != term {
    			reply.WrongLeader = WrongLeader
    			reply.Err = ""
    		} else {
    			reply.WrongLeader = IsLeader
    			kv.mu.Lock()
    			if value, ok := kv.db[args.Key]; ok {
    				reply.Value = value
    				reply.Err = OK
    			} else {
    				reply.Err = ErrNoKey
    			}
    			kv.mu.Unlock()
    		}
    
    	}
    
    }
    

    KVServer.db用于保存键值对。
    KVServer.Get()首先判断自己是不是leader,如果不是leader,直接返回,这样Clerk好重试其它server。如果是leader,先在缓存中找,看这个请求是否已经执行过了。
    因为可能出现这么一种情况:如果leader commit一个Entry后立即奔溃了,那么Clerk就收不到响应,那么Clerk会将这个请求发给新的leader,新的leader收到请求后如果不做任何措施,将会二次commit该Log Entry,对于Put()和Append()请求执行两次是不正确的,所以需要一个办法防止一个请求执行两次。
    可以这么做:每个Clerk都分配一个唯一的cid,每个请求分配一个唯一的序列号seq,每成功一个请求,该序列号加一。服务端记录每个客户端cid最近一次apply的请求的序列号seq和对应的响应结果,根据这个信息可知,当再次收到这个客户端的序列号小于seq的请求时,说明已经执行过了,直接返回结果。

    如果之前没有执行过,那么调用

    kv.rf.Start(command)
    

    将Log Entry提交给raft,并且阻塞等待raft将这个Entry拷贝到其它大部分server,从阻塞返回后,说明这个Entry已经被拷贝到大部分server了,并且已经执行了命令,这时可以将结果返回给Clerk了。

    那么在哪里接收raft的消息呢?KVServer在创建的时候会在一个线程中执行如下函数:

    func (kv *KVServer) applyDaemon()  {
    	for appliedEntry := range kv.applyCh {
    		command := appliedEntry.Command.(Op)
    
    		// 执行命令, 过滤已经执行过得命令
    		kv.mu.Lock()
    		if latestReply, ok := kv.latestReplies[command.Cid]; !ok || command.Seq > latestReply.Seq {
    			switch command.Operation {
    			case "Get":
    				latestReply := LatestReply{Seq:command.Seq,}
    				reply := GetReply{}
    				if value, ok := kv.db[command.Key]; ok {
    					reply.Value = value
    				} else {
    					reply.Err = ErrNoKey
    				}
    				latestReply.Reply = reply
    				kv.latestReplies[command.Cid] = &latestReply
    			case "Put":
    				kv.db[command.Key] = command.Value
    				latestReply := LatestReply{Seq:command.Seq}
    				kv.latestReplies[command.Cid] = &latestReply
    			case "Append":
    				kv.db[command.Key] += command.Value
    				latestReply := LatestReply{Seq:command.Seq}
    				kv.latestReplies[command.Cid] = &latestReply
    			default:
    				panic("invalid command operation")
    			}
    		}
    
    		DPrintf("%d applied index:%d, cmd:%v
    ", kv.me, appliedEntry.CommandIndex, command)
    
    		// 通知
    		if ch, ok := kv.notify[appliedEntry.CommandIndex]; ok && ch != nil {
    			DPrintf("%d notify index %d
    ",kv.me, appliedEntry.CommandIndex)
    			close(ch)
    			delete(kv.notify, appliedEntry.CommandIndex)
    		}
    		kv.mu.Unlock()
    	}
    }
    

    kv.applyCh这个chanel会在创建raft的时候传给raft,当某个Log Entry可以被commit的时候,raft会往这个chanel中塞,只要for循环这个kv.applyCh,就能知道已经被commit的Entry,拿到Entry后,根据其中的命令执行相应的操作,然后通知KVServer.Get()继续执行。

    具体代码在:https://github.com/gatsbyd/mit_6.824_2018
    如有错误,欢迎指正:
    15313676365

  • 相关阅读:
    outer join,inner join,left join,right join的区别是什么?
    hdu 数值转换
    hdu 4
    hdu
    NET Framework数据提供程序的4种核心对象及其作用
    时间复杂度和空间复杂度
    hdu 1004
    hdu 级数求和
    [记录]微软生成wsdl代理类
    css position:relative 在IE6, 7下有bug
  • 原文地址:https://www.cnblogs.com/gatsby123/p/10580757.html
Copyright © 2011-2022 走看看