7 Etcd服务端实现
7.1 Etcd启动
Etcd有多种启动方式,我们从最简单的方式入手,也就是从embed的etcd.go开始启动,最后会启动EtcdServer。
先看看etcd.go中的启动代码:
func StartEtcd(inCfg *Config) (e *Etcd, err error)
从StartEtcd方法启动etcd服务,参数是初始配置信息config,启动集群间监听进程和客户端监听进程,最后启动EtcdServer。
主要代码:
e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})}
cfg := &e.cfg
if e.Peers, err = startPeerListeners(cfg); err != nil {
return
}
if e.sctxs, err = startClientListeners(cfg); err != nil {
return
}
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
return
}
e.Server.Start()
startPeerListeners启动Peer监听,等待集群中其他机器连接自己。startClientListeners启动客户端监听Socket,等待客户端请求并响应。最后调用Start方法启动EtcdServer。
7.2 EtcdServer
EtcdServer位于etcdserver/server.go,定义了Server接口和EtcdServer对象。EtcdServer从逻辑上讲代表了一个完整的Etcd服务。
图7.1 Etcd服务端的功能示意图
Etcd服务端主要提供两大类客户端接口:
(1)集群配置
由memberHandler负责,提供添加集群成员,删除成员,更新成员信息三种接口服务。
(2)KV键值:由keysHandler负责。
KeysHandler接收到客户端请求后,调用EtcdServer的Do方法处理请求,Watcher类的客户端请求信息同样包含在keysHandler中了。KV键值响应主要在v2_server.go中定义,etcd新版本同时还提供了v3操作命令集,本文不讨论v3的源码实现。
7.2.1 接口定义
Etcdserver/server.go中定义了Server接口,是服务端的主接口,其中Do方法处理客户端请求。
Server.go中定义了EtcdServer对象,它是Server接口的实现类。Server中的Do接口是专门用来响应客户端请求的。
Server接口定义:
-
start
读取配置文件,启动本Server。
-
stop
停止本Server
-
ID
获取本节点server的ID,集群中所有的机器都有唯一ID,用于标识自己。
-
Leader
获取leader的ID
-
Do
处理客户群请求,返回处理结果。
定义:
func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error)
在server.go中并没有看到Go接口的实现,其实它是在v2_server.go文件中定义的。
-
Process
Process(ctx context.Context, m raftpb.Message) error
处理Raft消息。
-
AddMember
向Etcd集群中增加一台服务器,新增服务器的ID必须唯一标识。
-
RemoveMember
从集群删除一台服务器,删除服务器的ID必须已经存在于集群中。
-
UpdateMember
修改集群成员属性,如果成员ID不存在则返回ErrIDNotFound错误。
7.2.2 实体定义
EtcdServer表示一个独立运行的Etcd节点。
type EtcdServer struct {
inflightSnapshots int64
appliedIndex uint64
committedIndex uint64.
consistIndex consistentIndex
Cfg *ServerConfig
readych chan struct{}
r raftNode
snapCount uint64
w wait.Wait
readMu sync.RWMutex
readwaitc chan struct{}
readNotifier *notifier
stop chan struct{}
stopping chan struct{}
done chan struct{}
errorc chan error
id types.ID
attributes membership.Attributes
cluster *membership.RaftCluster
store store.Store
applyV2 ApplierV2
applyV3 applierV3
applyV3Base applierV3
applyWait wait.WaitTime
kv mvcc.ConsistentWatchableKV
lessor lease.Lessor
bemu sync.Mutex
be backend.Backend
authStore auth.AuthStore
alarmStore *alarm.AlarmStore
stats *stats.ServerStats
lstats *stats.LeaderStats
SyncTicker *time.Ticker
compactor *compactor.Periodic
peerRt http.RoundTripper
reqIDGen *idutil.Generator
forceVersionC chan struct{}
wgMu sync.RWMutex
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
leadTimeMu sync.RWMutex
leadElectedTime time.Time
}
7.2.3 Do
Do定义在v2_server.go中,处理客户群请求包,调用raftNode的Propose方法。在上一章已经介绍过。
对于KV键值请求,Do方法是在etcdServer/v2_server.go中定义的,它的相关代码逻辑如下:
func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
r.ID = s.reqIDGen.Next()
if r.Method == "GET" && r.Quorum {
r.Method = "QGET"
}
v2api := (v2API)(&v2apiStore{s})
switch r.Method {
case "POST":
return v2api.Post(ctx, &r)
case "PUT":
return v2api.Put(ctx, &r)
case "DELETE":
return v2api.Delete(ctx, &r)
case "QGET":
return v2api.QGet(ctx, &r)
case "GET":
return v2api.Get(ctx, &r)
case "HEAD":
return v2api.Head(ctx, &r)
}
return Response{}, ErrUnknownMethod
}
可以看到对客户端的KV键值请求,最终是通过v2apiStore的相关方法来实现。客户端的命令前缀为"/v2/keys"。支持的命令有以下这些:
-
GET/QGET:读取键值
-
POST:创建一个新的KV键值
-
PUT:重新设置键值的值
-
DELETE:删除已有键值
v2apiStore包含了EtcdServer引用。
type v2apiStore struct{ s *EtcdServer }
除了GET命令,其余Post,Put和Delete每个写操作请求最后都是通过processRaftRequest方法来处理的。
我们先看看GET命令的处理:
func (a *v2apiStore) Get(ctx context.Context, r *pb.Request) (Response, error) {
if r.Wait {
wc, err := a.s.store.Watch(r.Path, r.Recursive, r.Stream, r.Since)
if err != nil {
return Response{}, err
}
return Response{Watcher: wc}, nil
}
ev, err := a.s.store.Get(r.Path, r.Recursive, r.Sorted)
if err != nil {
return Response{}, err
}
return Response{Event: ev}, nil
}
看到对于普通的GET操作,直接调用store.Get方法获取KV值返回给客户端,如果是Watcher操作,则返回Watcher给客户端,客户端后续通过Watcher接口读取变化值。
对于POST,PUT,DELETE命令,走下述Propose流程处理。
图7.2 Propose流程示意图
比如"DELETE"命令。
func (a *v2apiStore) Delete(ctx context.Context, r *pb.Request) (Response, error) {
return a.processRaftRequest(ctx, r)
}
processRaftRequest方法的源码如下:
func (a *v2apiStore) processRaftRequest(ctx context.Context, r *pb.Request) (Response, error) {
data, err := r.Marshal()
if err != nil {
return Response{}, err
}
ch := a.s.w.Register(r.ID)
start := time.Now()
a.s.r.Propose(ctx, data)
proposalsPending.Inc()
defer proposalsPending.Dec()
select {
case x := <-ch:
resp := x.(Response)
return resp, resp.err
case <-ctx.Done():
proposalsFailed.Inc()
a.s.w.Trigger(r.ID, nil) // GC wait
return Response{}, a.s.parseProposeCtxErr(ctx.Err(), start)
case <-a.s.stopping:
}
return Response{}, ErrStopped
}
-
data, err := r.Marshal()语句:
这条语句从pb.request得到请求数据data
-
ch := a.s.w.Register(r.ID)语句:
注册chain,一直等待直到ch有响应数据。
Register方法是wait的Register方法。该方法直到调用wait的Trigger方法后才会有数据从而触发select在该Register Id上线程被唤醒。Wait在pkg/wait中定义。
-
a.s.r.Propose(ctx, data)
Propose方法在node中定义,raftNode在etcdserver/node.go文件中。Propose将写事务请求发给Leader,等待集群间同步。Propose集群间同步消息完成后会唤醒a.s.w.Register语句。
调用raft/node的Propose方法处理写事务请求,进一步调用step方法将写事务封装成MsgProp消息并传递给集群中其他机器。
func (n *node) Propose(ctx context.Context, data []byte) error { return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) }
step会调用StepFunc函数来处理MsgProp消息,根据leader,follower,candidate等运行状态分别调用不同的实现函数。
-
select语句
select … case …语句类似于Socket通信中的select语句,它的含义是只要任意一个case语句有数据返回就往下执行,否则就阻塞在这里让出CPU给其他线程执行。
case x := <-ch:当ch有值时,将ch赋值给x变量,同时唤醒case语句被执行,这里将执行以下代码:
resp := x.(Response) return resp, resp.err
此时将ch中的返回结果Response回复给调用者(即客户端)。
case <-ctx.Done():说明上下文被中断,Context的Done()被触发,此时写事务执行失败,返回空Response。
7.2.4 初始化
Etcd服务端主要由5大组件构成,他们的分工如下:
-
etcdServer:主进程,相当于整个Etcd的容器,包含了raftNode,WAL,snapshotter等多个关键组件。
-
raftNode:执行raft协议,保证写事务的集群一致性维护。
-
Store:管理维护Etcd数据库
-
Wal:管理事务日志
-
Snapshotter:负责数据快照,管理store数据库在内存中和磁盘上的相互转换。
raftNode除了负责集群间raft消息交互,还负责事务和快照的存储,保持数据一致性。
Etcd定义了一个storage数据结构,一起负责事务和快照。
type storage struct {
*wal.WAL
*snap.Snapshotter
}
storage中没有指定WAL和Snapshotter的变量名称,这两个类的方法都可直接通过storage来调用,比如WAL的Save方法,可以通过storage.Save来调用,也可以通过storage.WAL.Save来调用,这两者是等价的,在阅读源码的时候要注意这一点,否则对Go语法不太了解的读者会感到迷惑。
func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
st := store.New(StoreClusterPrefix, StoreKeysPrefix)
var (
w *wal.WAL
n raft.Node
s *raft.MemoryStorage
id types.ID
cl *membership.RaftCluster
)
haveWAL := wal.Exist(cfg.WALDir())
ss := snap.New(cfg.SnapDir())
bepath := filepath.Join(cfg.SnapDir(), databaseFilename)
beExist := fileutil.Exist(bepath)
switch {
case haveWAL:
snapshot, err = ss.Load()
if snapshot != nil {
if err = st.Recovery(snapshot.Data); err != nil {
plog.Panicf("recovered store from snapshot error: %v", err)
}
}
cfg.Print()
if !cfg.ForceNewCluster {
id, cl, n, s, w = restartNode(cfg, snapshot)
} else {
id, cl, n, s, w = restartAsStandaloneNode(cfg, snapshot)
}
cl.SetStore(st)
cl.SetBackend(be)
cl.Recover(api.UpdateCapability)
}
if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil {
return