client-go 在 tools/leaderelection 包中提供了基于资源锁(resource lock)实现领导者选举(leader election)的工具。
tools/leaderelection/resourcelock/interface.go 中定义了资源锁接口 Interface:
type Interface interface { Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) //获取资源锁的所有信息 Create(ctx context.Context, ler LeaderElectionRecord) error //创建资源锁 Update(ctx context.Context, ler LeaderElectionRecord) error //更新资源锁 RecordEvent(string) //通过EventBroadcaster事件管理器记录事件 Identity() string //获取领导者身份标识 Describe() string //获取资源锁的信息 }
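k8s 中实现了三种资源锁:EndpointsLock、ConfigMapLock 和 LeaseLock,分别以 Endpoints、ConfigMap 和 Lease 对象作为锁对象(key)。每种资源锁都实现了上述 Interface,即对锁对象的获取、创建、更新等操作方法。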
以 EndpointsLock 为例,锁对象中保存的是当前竞选成为领导者的节点的信息(序列化为 JSON 后存放在 Endpoints 对象的 control-plane.alpha.kubernetes.io/leader 注解中),该信息由 LeaderElectionRecord 结构体描述:
type LeaderElectionRecord struct { HolderIdentity string `json:"holderIdentity”` //领导者身份标识,通常为Hostname_<hash值> LeaseDurationSeconds int `json:"leaseDurationSeconds”` //领导者租约的时长 AcquireTime metav1.Time `json:"acquireTime”` //领导者获得锁的时间 RenewTime metav1.Time `json:"renewTime”` //领导者续租的时间 LeaderTransitions int `json:"leaderTransitions”` //领导者选举切换的次数 }
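领导者选举的主流程位于 tools/leaderelection/leaderelection.go,入口为 LeaderElector.Run:先通过 acquire 获得资源锁,成功后在新的 goroutine 中执行 OnStartedLeading 回调,并通过 renew 持续续约: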
func (le *LeaderElector) Run(ctx context.Context) { defer func() { runtime.HandleCrash() le.config.Callbacks.OnStoppedLeading() }() if !le.acquire(ctx) { //尝试获得锁 return } ctx, cancel := context.WithCancel(ctx) defer cancel() go le.config.Callbacks.OnStartedLeading(ctx) //通过回调函数执行主要逻辑 le.renew(ctx) //对资源锁续约 }
资源锁获取过程(acquire):以 RetryPeriod 为周期(带随机抖动 JitterFactor)反复调用 tryAcquireOrRenew,直到成功获得锁或 ctx 被取消:
func (le *LeaderElector) acquire(ctx context.Context) bool { ctx, cancel := context.WithCancel(ctx) defer cancel() succeeded := false desc := le.config.Lock.Describe() wait.JitterUntil(func() { succeeded = le.tryAcquireOrRenew(ctx) //获取资源锁 le.maybeReportTransition() if !succeeded { return //获取资源锁失败,return等待下一次定时获取 } le.config.Lock.RecordEvent("became leader") le.metrics.leaderOn(le.config.Name) cancel() }, le.config.RetryPeriod, JitterFactor, true, ctx.Done()) //通过wait.JitterUntil定时器定时执行匿名函数 return succeeded }
tryAcquireOrRenew(ctx):在锁不存在、无持有者或租约已过期时尝试获取资源锁,若当前节点已是领导者则为其续约;成功时返回 true:
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool { now := metav1.Now() leaderElectionRecord := rl.LeaderElectionRecord{ HolderIdentity: le.config.Lock.Identity(), LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second), RenewTime: now, AcquireTime: now, } oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx) //获取资源锁 if err != nil { if !errors.IsNotFound(err) { return false } if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil { //创建资源锁 return false //创建失败 } le.observedRecord = leaderElectionRecord le.observedTime = le.clock.Now() return true //当前节点成为leader,返回 } if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) { le.observedRecord = *oldLeaderElectionRecord le.observedRawRecord = oldLeaderElectionRawRecord le.observedTime = le.clock.Now() //更新本地缓存的租约信息 } if len(oldLeaderElectionRecord.HolderIdentity) > 0 && le.observedTime.Add(le.config.LeaseDuration).After(now.Time) && !le.IsLeader() { return false //当前leader租约未到期,暂时不能抢占,返回 } if le.IsLeader() { //当前节点是leader leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime //资源锁获得时间保持不变 leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions //leader切换次数保持不变 } else { leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1 //领导者切换次数+1,抢占资源锁 } if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil { //尝试更新租约 return false //更新不成功,返回 } le.observedRecord = leaderElectionRecord le.observedTime = le.clock.Now() return true }
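资源锁续约过程(renew):每隔 RetryPeriod 续约一次;若在 RenewDeadline 内始终无法续约成功,则记录 "stopped leading" 事件并退出续约循环,如果配置了 ReleaseOnCancel 还会释放资源锁: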
func (le *LeaderElector) renew(ctx context.Context) { ctx, cancel := context.WithCancel(ctx) defer cancel() wait.Until(func() { timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline) defer timeoutCancel() err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) { return le.tryAcquireOrRenew(timeoutCtx), nil //返回true说明续约成功;返回false则退出并执行le.release释放资源锁 }, timeoutCtx.Done()) //通过wait.PollImmediateUntil定时器定时执行匿名函数 le.maybeReportTransition() desc := le.config.Lock.Describe() if err == nil { return } le.config.Lock.RecordEvent("stopped leading") le.metrics.leaderOff(le.config.Name) cancel() //更新租约失败 }, le.config.RetryPeriod, ctx.Done()) if le.config.ReleaseOnCancel { le.release() } }