Etcd Raft linearizable read implementation

    Linearizable

    A bit puzzling: I am not sure whether the version I am reading is simply unfinished, or whether it deliberately does not implement linearizability exactly as the paper describes.

    According to the paper, a client request should be a strongly consistent, exactly-once process.

    In etcd, only the read side of linearizability is visible, and it is used in places such as reading the member list and starting a transaction.

    Write and read consistency can be examined at two levels:

    Business data stored in etcd

    Since the leader is the single entry point for writes in raft, data ordering can be handled at the leader.

    The ordering of business data, however, is something etcd cannot guarantee, because too many factors lie outside that entry point:

    Concurrency: business data arrives concurrently, so the order in which it reaches the leader cannot be guaranteed.

    Request routing: business requests are concurrent and may be routed to different followers, so the order in which they reach the leader cannot be guaranteed either.

    Network latency: absolute ordering is even further out of reach.

    So the leader cannot guarantee an absolute order over business data; that has to be designed by the client, i.e. the consumer, just as with a database.

    What the leader can guarantee is that, once data enters it, it is stored consistently.

    So at the business-data level, etcd cannot guarantee ordering unless a special protocol is layered on top; transactions get you halfway. Absolute ordering requires a protocol embedded in or wrapped around the business data itself.

    My take, therefore, is that there is no need for write consistency at the business-data level; a sketch of consumer-side ordering follows.
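    As a rough illustration of "designed by the consumer": if an application needs a strict order across its own writes, it has to serialize them itself, for example by funneling every write through a single goroutine. A minimal sketch (the clientv3 calls are real API; the orderedWriter wrapper is hypothetical):

    import (
       "context"

       clientv3 "go.etcd.io/etcd/client/v3"
    )

    type kv struct{ key, val string }

    // orderedWriter is a hypothetical wrapper: the write order is fixed by
    // this single goroutine, not by etcd or raft.
    type orderedWriter struct {
       cli  *clientv3.Client
       reqs chan kv // producers send here concurrently
    }

    func (w *orderedWriter) loop(ctx context.Context) {
       for r := range w.reqs {
          // Exactly one in-flight Put at a time, so writes reach the
          // leader in the order they are drained from the channel.
          if _, err := w.cli.Put(ctx, r.key, r.val); err != nil {
             return // a real client would retry or surface the error
          }
       }
    }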

    Application config data of etcd

    Every entry point for linearizable reads in etcd is related to application configuration, e.g. the member list, transactions, and downgrade.

    The logic that decides when a linearizable read is satisfied is:

    confirmedIndex, err := s.requestCurrentIndex(leaderChangedNotifier, requestId)
    if isStopped(err) {
       return
    }
    if err != nil {
       nr.notify(err)
       continue
    }
    
    trace.Step("read index received")
    
    trace.AddField(traceutil.Field{Key: "readStateIndex", Value: confirmedIndex})
    
    appliedIndex := s.getAppliedIndex()
    trace.AddField(traceutil.Field{Key: "appliedIndex", Value: strconv.FormatUint(appliedIndex, 10)})
    
    if appliedIndex < confirmedIndex {
       select {
       case <-s.applyWait.Wait(confirmedIndex):
       case <-s.stopping:
          return
       }
    }
    // unblock all l-reads requested at indices before confirmedIndex
    nr.notify(nil)
    

    requestCurrentIndex returns the commitIndex the current leader is working with. The design here is neat: in short, the leader's commitIndex is returned, and the node then compares it with its own appliedIndex, waiting until appliedIndex >= commitIndex before the linearizable read counts as complete. For example, if the leader reports commitIndex 100 while the local appliedIndex is 97, the read blocks until entries through 100 have been applied locally.

    Putting the call sites and this logic together, the guarantee is that everything the current leader was processing at request time has been processed across the cluster. Things may change again afterwards, e.g. the membership may change, so in effect this is point-in-time consistency plus eventual consistency.

    For example, when txn opens a transaction, it is guaranteed that the leader's data at request time has all been processed, and that data includes the membership-change configuration.

    What is guaranteed is that dirty reads cannot happen within a single request; the data returned by this request may still differ from the data returned by the next one, which is exactly eventual consistency.

    Is write consistency worth implementing here? If this request demanded write consistency, the next request would depend on this one completing; while this request is still in flight, every later request would sit waiting, cascading without bound. So eventual consistency stands in for write consistency, a textbook BASE trade-off.

    Combining the two levels above: write consistency at the business-data level is left to the consumer's design, and at the config-data level it is replaced by eventual consistency.

    Read consistency at the business-data level is likewise up to the consumer; at the config-data level, each individual request is guaranteed to read no dirty data, with eventual consistency covering the rest.

    Raft log replication flow

    (The raft log replication diagram is omitted here.) The exchange still follows the usual raft message flow; the only difference is that the message type is MsgReadIndex. Unlike a proposal there is no commit phase: the MsgReadIndex message is simply sent to the leader, and the leader replies with MsgReadIndexResp.
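    At the API level the difference shows up as follows: a linearizable read calls Node.ReadIndex instead of Node.Propose, and the answer comes back as a ReadState inside Ready rather than as a committed entry. A minimal sketch against the go.etcd.io/etcd/raft/v3 Node interface (method and field names are from that package; the loop shape is an assumption, not etcd's code):

    import (
       "bytes"
       "context"

       "go.etcd.io/etcd/raft/v3"
    )

    func readIndexFlow(ctx context.Context, n raft.Node, rctx []byte) (uint64, error) {
       // Unlike Propose, nothing is appended or committed; the request
       // travels to the leader as a MsgReadIndex message.
       if err := n.ReadIndex(ctx, rctx); err != nil {
          return 0, err
       }
       for {
          rd := <-n.Ready()
          // Sketch only: a real application must also persist rd.Entries
          // and rd.Snapshot and send rd.Messages before calling Advance.
          for _, rs := range rd.ReadStates {
             if bytes.Equal(rs.RequestCtx, rctx) {
                n.Advance()
                return rs.Index, nil // the leader's commit index
             }
          }
          n.Advance()
       }
    }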

    Logic flow

    Trigger linearizable read

    The entry points were listed above; here is one of them:

    func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
       if isTxnReadonly(r) {
          trace := traceutil.New("transaction",
             s.Logger(),
             traceutil.Field{Key: "read_only", Value: true},
          )
          ctx = context.WithValue(ctx, traceutil.TraceKey, trace)
          if !isTxnSerializable(r) {
             err := s.linearizableReadNotify(ctx)
             trace.Step("agreement among raft nodes before linearized reading")
             if err != nil {
                return nil, err
             }
          }
    

    s.linearizableReadNotify(ctx) blocks until the linearizable read completes.

    The logic that kicks off a linearizable read:

    func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error {
       s.readMu.RLock()
       nc := s.readNotifier
       s.readMu.RUnlock()
    
       // signal linearizable loop for current notify if it hasn't been already
       select {
       case s.readwaitc <- struct{}{}:
       default:
       }
    
       // wait for read state notification
       select {
       case <-nc.c:
          return nc.err
       case <-ctx.Done():
          return ctx.Err()
       case <-s.done:
          return ErrStopped
       }
    }
    

    This is the glue function that talks to linearizableReadLoop:

    s.readwaitc signals linearizableReadLoop to start querying the read index

    s.readNotifier carries the signal back that the linearizable read has completed
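    The notifier itself is a one-shot broadcast built on closing a channel; reconstructed from the usage above, it is essentially this shape:

    // notifier wakes all waiters at once: <-nc.c unblocks when notify
    // closes the channel, and err carries the result.
    // (Reconstructed from usage; not copied from a specific etcd version.)
    type notifier struct {
       c   chan struct{}
       err error
    }

    func newNotifier() *notifier {
       return &notifier{c: make(chan struct{})}
    }

    func (nc *notifier) notify(err error) {
       nc.err = err
       close(nc.c) // wakes every goroutine blocked on <-nc.c
    }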

    Read loop on server start

    When the server starts, it launches the linearizable-read loop, i.e. the logic shown above:

    func (s *EtcdServer) linearizableReadLoop() {
       for {
          requestId := s.reqIDGen.Next()
          leaderChangedNotifier := s.LeaderChangedNotify()
          select {
          case <-leaderChangedNotifier:
             continue
          case <-s.readwaitc:
          case <-s.stopping:
             return
          }
    
          // as a single loop can unlock multiple reads, it is not very useful
          // to propagate the trace from Txn or Range.
          trace := traceutil.New("linearizableReadLoop", s.Logger())
    
          nextnr := newNotifier()
          s.readMu.Lock()
          nr := s.readNotifier
          s.readNotifier = nextnr
          s.readMu.Unlock()
    
          confirmedIndex, err := s.requestCurrentIndex(leaderChangedNotifier, requestId)
          if isStopped(err) {
             return
          }
          if err != nil {
             nr.notify(err)
             continue
          }
    
          trace.Step("read index received")
    
          trace.AddField(traceutil.Field{Key: "readStateIndex", Value: confirmedIndex})
    
          appliedIndex := s.getAppliedIndex()
          trace.AddField(traceutil.Field{Key: "appliedIndex", Value: strconv.FormatUint(appliedIndex, 10)})
    
          if appliedIndex < confirmedIndex {
             select {
             case <-s.applyWait.Wait(confirmedIndex):
             case <-s.stopping:
                return
             }
          }
          // unblock all l-reads requested at indices before confirmedIndex
          nr.notify(nil)
          trace.Step("applied index is now lower than readState.Index")
    
          trace.LogAllStepsIfLong(traceThreshold)
       }
    }
    

    confirmedIndex, err := s.requestCurrentIndex(leaderChangedNotifier, requestId)

    A requestId is generated on every iteration to tell each read apart: the raft side keeps spinning and reads may be issued concurrently, so each one needs a unique identity.

    s.readwaitc is a channel the server creates specifically for linearizable reads; after generating the requestId, the loop blocks here until a signal triggers the rest of the logic.

    s.readNotifier is the notification channel: once appliedIndex >= confirmedIndex, the caller is notified through it that the linearizable read has completed.
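    Matching a response to its request relies on a fixed 8-byte big-endian encoding of the requestId; the helper is presumably along these lines (name taken from the code above, body assumed):

    import "encoding/binary"

    // uint64ToBigEndianBytes encodes the request id as 8 big-endian bytes so
    // it can travel as the opaque RequestCtx payload and be compared
    // byte-for-byte (bytes.Equal) when the response comes back.
    func uint64ToBigEndianBytes(number uint64) []byte {
       byteResult := make([]byte, 8)
       binary.BigEndian.PutUint64(byteResult, number)
       return byteResult
    }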

    Request of read current index

    func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{}, requestId uint64) (uint64, error) {
       err := s.sendReadIndex(requestId)
       if err != nil {
          return 0, err
       }
    
       lg := s.Logger()
       errorTimer := time.NewTimer(s.Cfg.ReqTimeout())
       defer errorTimer.Stop()
       retryTimer := time.NewTimer(readIndexRetryTime)
       defer retryTimer.Stop()
    
       firstCommitInTermNotifier := s.FirstCommitInTermNotify()
    
       for {
          select {
          case rs := <-s.r.readStateC:
             requestIdBytes := uint64ToBigEndianBytes(requestId)
             gotOwnResponse := bytes.Equal(rs.RequestCtx, requestIdBytes)
             if !gotOwnResponse {
                // a previous request might time out. now we should ignore the response of it and
                // continue waiting for the response of the current requests.
                responseId := uint64(0)
                if len(rs.RequestCtx) == 8 {
                   responseId = binary.BigEndian.Uint64(rs.RequestCtx)
                }
                lg.Warn(
                   "ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader",
                   zap.Uint64("sent-request-id", requestId),
                   zap.Uint64("received-request-id", responseId),
                )
                slowReadIndex.Inc()
                continue
             }
             return rs.Index, nil
          case <-leaderChangedNotifier:
             readIndexFailed.Inc()
             // return a retryable error.
             return 0, ErrLeaderChanged
          case <-firstCommitInTermNotifier:
             firstCommitInTermNotifier = s.FirstCommitInTermNotify()
             lg.Info("first commit in current term: resending ReadIndex request")
             err := s.sendReadIndex(requestId)
             if err != nil {
                return 0, err
             }
             retryTimer.Reset(readIndexRetryTime)
             continue
          case <-retryTimer.C:
             lg.Warn(
                "waiting for ReadIndex response took too long, retrying",
                zap.Uint64("sent-request-id", requestId),
                zap.Duration("retry-timeout", readIndexRetryTime),
             )
             err := s.sendReadIndex(requestId)
             if err != nil {
                return 0, err
             }
             retryTimer.Reset(readIndexRetryTime)
             continue
          case <-errorTimer.C:
             lg.Warn(
                "timed out waiting for read index response (local node might have slow network)",
                zap.Duration("timeout", s.Cfg.ReqTimeout()),
             )
             slowReadIndex.Inc()
             return 0, ErrTimeout
          case <-s.stopping:
             return 0, ErrStopped
          }
       }
    }
    

    s.sendReadIndex(requestId) wraps the requestId and hands the message down to the raft module.
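    It is a thin wrapper, roughly like this sketch (error handling and metrics simplified):

    // Simplified sketch of sendReadIndex: encode the id, bound the call
    // with the request timeout, and push it into the raft module.
    func (s *EtcdServer) sendReadIndex(requestIndex uint64) error {
       ctxToSend := uint64ToBigEndianBytes(requestIndex)

       cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
       err := s.r.ReadIndex(cctx, ctxToSend)
       cancel()
       return err
    }

    s.r.ReadIndex in turn reaches the raft node's ReadIndex: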

    func (n *node) ReadIndex(ctx context.Context, rctx []byte) error {
       return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}})
    }
    

    The message is handed down to the raft module's stepFunc for processing.

    On a follower, it is forwarded to the leader:

    case pb.MsgReadIndex:
       if r.lead == None {
          r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
          return nil
       }
       m.To = r.lead
       r.send(m)
    

    On the leader:

    case pb.MsgReadIndex:
       // only one voting member (the leader) in the cluster
       if r.prs.IsSingleton() {
          if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None {
             r.send(resp)
          }
          return nil
       }
    
       // Postpone read only request when this leader has not committed
       // any log entry at its term.
       if !r.committedEntryInCurrentTerm() {
          r.pendingReadIndexMessages = append(r.pendingReadIndexMessages, m)
          return nil
       }
    
       sendMsgReadIndexResponse(r, m)
    
       return nil
    

    sendMsgReadIndexResponse(r, m) replies with the read index to the requester:

    func sendMsgReadIndexResponse(r *raft, m pb.Message) {
       // thinking: use an internally defined context instead of the user given context.
       // We can express this in terms of the term and index instead of a user-supplied value.
       // This would allow multiple reads to piggyback on the same message.
       switch r.readOnly.option {
       // If more than the local vote is needed, go through a full broadcast.
       case ReadOnlySafe:
          r.readOnly.addRequest(r.raftLog.committed, m)
          // The local node automatically acks the request.
          r.readOnly.recvAck(r.id, m.Entries[0].Data)
          r.bcastHeartbeatWithCtx(m.Entries[0].Data)
       case ReadOnlyLeaseBased:
          if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None {
             r.send(resp)
          }
       }
    }
    

    Either way, the leader replies with its committed index as the read index to the requester:

    func (r *raft) responseToReadIndexReq(req pb.Message, readIndex uint64) pb.Message {
       if req.From == None || req.From == r.id {
          r.readStates = append(r.readStates, ReadState{
             Index:      readIndex,
             RequestCtx: req.Entries[0].Data,
          })
          return pb.Message{}
       }
       return pb.Message{
          Type:    pb.MsgReadIndexResp,
          To:      req.From,
          Index:   readIndex,
          Entries: req.Entries,
       }
    }
    

    Response of read current index

    When the follower receives MsgReadIndexResp:

    case pb.MsgReadIndexResp:
       if len(m.Entries) != 1 {
          r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
          return nil
       }
       r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
    

    RequestCtx: m.Entries[0].Data is where the requestId is carried.

    Back in requestCurrentIndex:

    for {
       select {
       case rs := <-s.r.readStateC:
          requestIdBytes := uint64ToBigEndianBytes(requestId)
          gotOwnResponse := bytes.Equal(rs.RequestCtx, requestIdBytes)
          if !gotOwnResponse {
             // a previous request might time out. now we should ignore the response of it and
             // continue waiting for the response of the current requests.
             responseId := uint64(0)
             if len(rs.RequestCtx) == 8 {
                responseId = binary.BigEndian.Uint64(rs.RequestCtx)
             }
             lg.Warn(
                "ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader",
                zap.Uint64("sent-request-id", requestId),
                zap.Uint64("received-request-id", responseId),
             )
             slowReadIndex.Inc()
             continue
          }
          return rs.Index, nil
    

    s.r.readStateC is fed from raft's Ready:

    if len(rd.ReadStates) != 0 {
       select {
       case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
       case <-time.After(internalTimeout):
          r.lg.Warn("timed out sending read state", zap.Duration("timeout", internalTimeout))
       case <-r.stopped:
          return
       }
    }
    

    Thanks to the for loop in requestCurrentIndex, it keeps waiting until the requestId matches, then returns the read index to linearizableReadLoop.

    Back in linearizableReadLoop:

    for {
       requestId := s.reqIDGen.Next()
       leaderChangedNotifier := s.LeaderChangedNotify()
       select {
       case <-leaderChangedNotifier:
          continue
       case <-s.readwaitc:
       case <-s.stopping:
          return
       }
       // ... elided ...
       if appliedIndex < confirmedIndex {
          select {
          case <-s.applyWait.Wait(confirmedIndex):
          case <-s.stopping:
             return
          }
       }
       // unblock all l-reads requested at indices before confirmedIndex
       nr.notify(nil)
    }
    

    s.applyWait.Wait(confirmedIndex)

    Even though the leader has returned the read index, the corresponding entries may not yet be applied on this node; the linearizable read is only reported complete after apply.

    Apply is what actually stores the data; if it has not been stored and the cluster crashes, a dirty read would still be possible.

    // unblock all l-reads requested at indices before confirmedIndex
    nr.notify(nil)

    This notifies the waiters that the linearizable read has completed.
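    A note on s.applyWait: it comes from etcd's wait package (a WaitTime created with wait.NewTimeList()). The idea can be sketched as follows; this is a simplified reconstruction, not the actual implementation:

    import "sync"

    // waitTime is a simplified reconstruction: Wait(index) returns a channel
    // that is closed once Trigger has been called with an index >= it.
    // (One waiter per index, which is enough for this sketch.)
    type waitTime struct {
       mu          sync.Mutex
       lastTrigger uint64                   // highest index applied so far
       waiters     map[uint64]chan struct{} // index -> closed when reached
    }

    func (w *waitTime) Wait(index uint64) <-chan struct{} {
       w.mu.Lock()
       defer w.mu.Unlock()
       ch := make(chan struct{})
       if index <= w.lastTrigger {
          close(ch) // already applied: unblock immediately
          return ch
       }
       w.waiters[index] = ch
       return ch
    }

    // Trigger is called from the apply loop after entries up to index
    // have been applied to the store.
    func (w *waitTime) Trigger(index uint64) {
       w.mu.Lock()
       defer w.mu.Unlock()
       w.lastTrigger = index
       for i, ch := range w.waiters {
          if i <= index {
             close(ch)
             delete(w.waiters, i)
          }
       }
    }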

    Summary

    The follower fetches the leader's current committed index and, after receiving it, waits until its own apply has caught up; together these steps form the process that avoids dirty reads.

    So the data returned is internally consistent as of that point in time.
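    From a client's point of view this is the default behavior: a plain Get is linearizable and goes through the ReadIndex flow described above, while WithSerializable opts out of it and reads the local store directly. A small clientv3 usage sketch (the endpoint is a placeholder):

    package main

    import (
       "context"
       "time"

       clientv3 "go.etcd.io/etcd/client/v3"
    )

    func main() {
       cli, err := clientv3.New(clientv3.Config{
          Endpoints:   []string{"127.0.0.1:2379"}, // placeholder endpoint
          DialTimeout: 5 * time.Second,
       })
       if err != nil {
          panic(err)
       }
       defer cli.Close()

       ctx := context.Background()

       // Default Get is linearizable: it waits for ReadIndex before
       // reading from the local store.
       _, _ = cli.Get(ctx, "foo")

       // Serializable reads skip ReadIndex and may return stale data,
       // trading consistency for latency.
       _, _ = cli.Get(ctx, "foo", clientv3.WithSerializable())
    }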
