网站设计团队名称,游戏网站建设成功案例,个人备案的网站 做企业站,wordpress主题 免费 cmsleader-election选主机制
1 为什么需要leader-election#xff1f;
在集群中存在某种业务场景#xff0c;一批相同功能的进程同时运行#xff0c;但是同一时刻#xff0c;只能有一个工作#xff0c;只有当正在工作的进程异常时#xff0c;才会由另一个进程进行接管。这…leader-election选主机制
1 为什么需要leader-election
在集群中存在某种业务场景一批相同功能的进程同时运行但是同一时刻只能有一个工作只有当正在工作的进程异常时才会由另一个进程进行接管。这种业务逻辑通常用于实现一主多从。
如果有人认为传统应用需要部署多个通常是为了容灾而在k8s上运行的Pod受控制器管理如果Pod异常或者Pod所在宿主机宕机Pod是可以漂移到其他节点的所以不需要部署多个Pod只需要部署一个Pod就行。k8s上的Pod确实可以漂移但是如果宿主机宕机k8s认为Pod异常并在其他节点重建Pod是有周期的不能在查询不到Pod状态时立刻就将Pod驱逐掉也许节点只是临时不可用呢例如负载很高因此判断宿主机宕机需要有个时间短。
k8s节点故障时工作负载的调度周期
因此在k8s中运行一主多从是为了能够实现主的快速切换。
2 kubernetes中的leader-election
k8s中也有这种业务场景在多master场景下只能有一个master上的进程工作例如scheduler和controller-manager。以scheduler来说它的工作是给Pod分配合适的宿主机如果有多个scheduler同时运行就会出现竞争因此如果允许这种场景存在的话就又需要实现一种调度逻辑某个Pod由哪个scheduler进行调度这相当于又要实现一层调度。但是实际上调度工作是相对比较简单的不需要多个scheduler进行负载只需要一个scheduler进行调度就行。因此k8s提供了leader-election的能力。
leader-election的具体工作方式是各候选者将自身的信息写入某一个资源如果写成功某个后选择就称为了主其他就是备同时在之后主会定期更新资源的时间如果超过一段时间未更新时间其他候选者发现资源的最后更新时间超过一定值就会认为主挂掉然后会向资源写入自身信息从而成为新的主。
基于该原理有一个现成的镜像可以使用instana/leader-elector。
apiVersion: apps/v1
kind: Deployment
metadata:labels:app: leadername: leader
spec:replicas: 3selector:matchLabels:app: leadertemplate:metadata:labels:app: leaderspec:containers:- image: instana/leader-elector:0.5.13name: leadercommand: [/app/server,--id$(POD_NAME),--electionleader-election,--http0.0.0.0:4040]env:- name: POD_NAMEvalueFrom:fieldRef:fieldPath: metadata.name上面的yaml有两个需要注意的地方
/app/server是二进制程序id参数是候选者的唯一标识election是资源名称http是应用监听的IP和端口号将pod的名称作为id参数也就是候选者的唯一标识
创建deploy后会启动三个Pod通过kubectl logs可以看到只有一个Pod成为主也就是向资源名称为leader-election的Endpoint写入了自身的Pod名称。然后通过代理(kubectl proxy)访问http://localhost:8001/api/v1/namespaces/default/pods/:4040/proxy/就会看到主的Pod名称。
知道了leader-election的大概原理也知道了上面的镜像可以直接实现主的选举那么如何使用呢
2.1 Sidecar
直接将上面的leader-elector镜像作为Sidecar将Pod名称作为候选者的唯一标识然后将Pod名称也注入到环境变量在业务进程起来后定时调用http://localhost:4040就可以获取主如果发现主的名称与自身的Pod的名称一致就执行业务逻辑否则一直等待。
2.2 SDK
使用Sidecar的好处是比较方便开发成本低不便的地方就是适用场景有限只能写入Endpoint资源。因此在某些场景下可以使用SDK直接基于leader-election库开发。
k8s-leader-election
创建一个Lease类型的锁(当然也可以是其他类型但是lease更加轻量)创建资源时需要指定资源的命名空间、名称、标识(这一批Pod都会该命名空间的资源写入自身的唯一标识)。然后调用leaderelection库中的RunOrDie()函数此时会指定
Lock资源锁将前面创建的Lease类型锁填入ReleaseOnCancelLeaseDuration租约时间RenewDeadlineleader刷新超时RetryPeriod刷新租约的时间间隔Callbacks指定成为leader时要执行的业务逻辑(OnStartedLeading)从leader变成非leader时要执行的逻辑(OnStoppedLeading)leader变更时要执行的逻辑(OnNewLeader)。
3 具体实现机制
// leaderelection/leaderelection.go
func (le *LeaderElector) Run(ctx context.Context) {defer runtime.HandleCrash()defer func() {le.config.Callbacks.OnStoppedLeading()}()// 申请资源锁有三种情形// 1 出错则返回falseRun()直接退出// 2 获取到锁了则返回true执行回调函数// 3 未获取到锁acquire()函数不会返回if !le.acquire(ctx) {return // ctx signalled done}ctx, cancel : context.WithCancel(ctx)defer cancel()// 申请成功后执行回调函数go le.config.Callbacks.OnStartedLeading(ctx)// 定时刷新租约le.renew(ctx)
}// 申请资源锁
func (le *LeaderElector) acquire(ctx context.Context) bool {ctx, cancel : context.WithCancel(ctx)defer cancel()succeeded : falsedesc : le.config.Lock.Describe()klog.Infof(attempting to acquire leader lease %v..., desc)// 每隔RetryPeriod去申请资源锁或者更新wait.JitterUntil(func() {succeeded le.tryAcquireOrRenew(ctx)le.maybeReportTransition()if !succeeded {// 没有获取到锁下一次再尝试klog.V(4).Infof(failed to acquire lease %v, desc)return}// 成功获取到锁则退出le.config.Lock.RecordEvent(became leader)le.metrics.leaderOn(le.config.Name)klog.Infof(successfully acquired lease %v, desc)cancel()}, le.config.RetryPeriod, JitterFactor, true, ctx.Done())return succeeded
}func (le *LeaderElector) renew(ctx context.Context) {ctx, cancel : context.WithCancel(ctx)defer cancel()// 每隔RetryPeriod尝试更新租约wait.Until(func() {timeoutCtx, timeoutCancel : context.WithTimeout(ctx, le.config.RenewDeadline)defer timeoutCancel()err : wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {return le.tryAcquireOrRenew(timeoutCtx), nil}, timeoutCtx.Done())le.maybeReportTransition()desc : le.config.Lock.Describe()if err nil {klog.V(5).Infof(successfully renewed lease %v, desc)return}le.config.Lock.RecordEvent(stopped leading)le.metrics.leaderOff(le.config.Name)klog.Infof(failed to renew lease %v: %v, desc, err)cancel()}, le.config.RetryPeriod, ctx.Done())// if we hold the lease, give it upif le.config.ReleaseOnCancel {le.release()}
}// 尝试获取或者更新资源锁
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {now : metav1.Now()leaderElectionRecord : rl.LeaderElectionRecord{HolderIdentity: le.config.Lock.Identity(),LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),RenewTime: now,AcquireTime: now,}// 1 获取资源锁记录oldLeaderElectionRecord, oldLeaderElectionRawRecord, err : le.config.Lock.Get(ctx)if err ! nil {if !errors.IsNotFound(err) {klog.Errorf(error retrieving resource lock %v: %v, le.config.Lock.Describe(), err)return false}// 创建资源锁if err le.config.Lock.Create(ctx, leaderElectionRecord); err ! nil {klog.Errorf(error initially creating leader election record: %v, err)return false}le.setObservedRecord(leaderElectionRecord)return true}// 2 将资源锁记录与缓存的上一次的值进行对比// 如果当前不是leader并且资源锁没有过期则退出if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {le.setObservedRecord(oldLeaderElectionRecord)le.observedRawRecord oldLeaderElectionRawRecord}if len(oldLeaderElectionRecord.HolderIdentity) 0 le.observedTime.Add(le.config.LeaseDuration).After(now.Time) !le.IsLeader() {klog.V(4).Infof(lock is held by %v and has not yet expired, oldLeaderElectionRecord.HolderIdentity)return false}// 3. Were going to try to update. The leaderElectionRecord is set to its default// here. Lets correct it before updating.if le.IsLeader() {// 当前是leader锁资源未过期将之前的资源锁的数据填充到新的资源锁中(申请锁时间切换次数)leaderElectionRecord.AcquireTime oldLeaderElectionRecord.AcquireTimeleaderElectionRecord.LeaderTransitions oldLeaderElectionRecord.LeaderTransitions} else {// 当前不是leaderleaderElectionRecord.LeaderTransitions oldLeaderElectionRecord.LeaderTransitions 1}// 更新资源锁if err le.config.Lock.Update(ctx, leaderElectionRecord); err ! nil {klog.Errorf(Failed to update lock: %v, err)return false}le.setObservedRecord(leaderElectionRecord)return true
}