当前位置: 首页 > news >正文

网站建设信(信科网络)良品铺子网站建设

网站建设信(信科网络),良品铺子网站建设,网站开发一定要用框架吗,dede网站地图位置欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码)#xff1a;https://github.com/zq2599/blog_demos 本篇概览 本文是《client-go实战》系列的第十二篇#xff0c;又有一个精彩的知识点在本章呈现#xff1a;选主(leader-election)在解释什么是选主之前https://github.com/zq2599/blog_demos 本篇概览 本文是《client-go实战》系列的第十二篇又有一个精彩的知识点在本章呈现选主(leader-election)在解释什么是选主之前咱们先来看一个场景有真实适用场景的技术学起来才有动力如下图所示稍后有详细说明 上图所描述的业务场景是个普通的controller应用 右侧是人工操作通过kubectl命令修改了service资源左侧的业务应用订阅了service的变化在收到service变更的事件后对pod进行写操作例如将收到事件的时间写入pod的label 以上的业务应用就是个很普通的controller很简单运行起来也没啥问题但是如果这个业务应用有多个实例呢 多实例的问题 所谓多个实例就是同样的业务应用我们运行了多个进程例如三个为什么多个进程同一个应用运行多个进程不是很正常么横向扩容不就是多进程嘛多个进程运行的时候如果service发生变化那么每个进程都会去修改pod的label这不是我们想要的只要修改一次就行了所以如何解决这个问题呢三个进程都是同一套代码都会订阅service的变化但是最终只修改一次pod经验丰富的您应该会想到分布式锁三个进程去抢分布式锁抢到的负责更新没错这是一个正确的解法但是分布式锁需要引入相关组件吧redis的setnx或者mysql的乐观锁这样就需要维护新的组件了其实这在kubernetes是个很典型的问题毕竟pod多实例在kubernetes是常态了所以当然也有官方的解法页就是本文的主题选主(leader-election) 选主(leader-election) 说到这里您应该能理解选主的含义了多个进程竞争某个key的leader咱们可以把特定的代码放在竞争成功后再执行由于同一时刻只有一个进程可以竞争成功这就相当于在不引入额外组件的情况下只用client-go就实现了分布式锁由于选主只是个特定的小知识点本篇就没什么多余的理论要研究了接下来直接开始实战编码实现一个功能来说明选主的用法实战的业务需求如下 开发一个应用该应用同时运行多个进程当kubernetes的指定namespace下的service发生变化时在pod的label中记录这个service的变化时间每次serivce变化pod的label只能修改一次尽管此时有多个进程 让我们少些套路多一点真诚不说废话直接开始动手实战吧 源码下载 如果您不想编写代码也可以从GitHub上直接下载地址和链接信息如下表所示(https://github.com/zq2599/blog_demos) 名称链接备注项目主页https://github.com/zq2599/blog_demos该项目在GitHub上的主页git仓库地址(https)https://github.com/zq2599/blog_demos.git该项目源码的仓库地址https协议git仓库地址(ssh)gitgithub.com:zq2599/blog_demos.git该项目源码的仓库地址ssh协议 这个git项目中有多个文件夹本篇的源码在leader-tutorials文件夹下如下图黄框所示 提前了解选主的代码 接下来会开发一个完整的controller应用以此来说明选主功能如果您觉得完整应用的代码太多懒得看只想了解选主部分那就在此提前将整个工程中选主相关的代码贴出来核心代码如下所示先创建锁对象就像分布式锁一样总要有个key然后执行leaderelection.RunOrDie方法参与选主一旦有了结果OnNewLeader方法会被回调这时候通过自身id和leader的id比较就知道是不是自己了另外当OnStartedLeading被执行的时候就意味着当前进程就是leader并且可以立即开始执行只有leader才能做的事情了 // startLeaderElection 选主的核心逻辑代码 func startLeaderElection(ctx context.Context, clientset *kubernetes.Clientset, stop chan struct{}) {klog.Infof([%s]创建选主所需的锁对象, processIndentify)// 创建锁对象lock : resourcelock.LeaseLock{LeaseMeta: metav1.ObjectMeta{Name: leader-tutorials,Namespace: NAMESPACE,},Client: clientset.CoordinationV1(),LockConfig: resourcelock.ResourceLockConfig{Identity: processIndentify,},}klog.Infof([%s]开始选主, processIndentify)// 启动选主操作leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{Lock: lock,ReleaseOnCancel: true,LeaseDuration: 10 * time.Second,RenewDeadline: 5 * time.Second,RetryPeriod: 2 * time.Second,Callbacks: leaderelection.LeaderCallbacks{OnStartedLeading: func(ctx context.Context) {klog.Infof([%s]当前进程是leader只有leader才能执行的业务逻辑立即开始, processIndentify)// 在这里写入选主成功的代码// 就像抢分布式锁一样当前进程选举成功的时候这的代码就会被执行// 所以在这里填写抢锁成功的业务逻辑吧本例中就是监听service变化然后修改pod的labelCreateAndStartController(ctx, clientset, v1.Service{}, services, NAMESPACE, stop)},OnStoppedLeading: func() {// 失去了leader时的逻辑klog.Infof([%s]失去leader身份不再是leader了, processIndentify)os.Exit(0)},OnNewLeader: func(identity string) {// 收到通知知道最终的选举结果if identity processIndentify {klog.Infof([%s]选主结果出来了当前进程就是leader, processIndentify)// I just got the lockreturn}klog.Infof([%s]选主结果出来了leader是 : [%s], processIndentify, identity)},},}) }实战部署service和deployment 首先请准备好k8s环境这在《client-go实战之六:时隔两年刷新版本继续实战》里面已有详细说明然后把本次实战所需的service和deployment部署好- 所有要部署的内容我都集中在这个名为nginx-deployment-service.yaml脚本中了 --- apiVersion: apps/v1 kind: Deployment metadata:namespace: client-go-tutorialsname: nginx-deploymentlabels:app: nginx-apptype: front-end spec:replicas: 3selector:matchLabels:app: nginx-apptype: front-endtemplate:metadata:labels:app: nginx-apptype: front-end# 这是第一个业务自定义label指定了mysql的语言类型是c语言language: c# 这是第二个业务自定义label指定了这个pod属于哪一类服务nginx属于web类business-service-type: webspec:containers:- name: nginx-containerimage: nginx:latestresources:limits:cpu: 0.5memory: 128Mirequests:cpu: 0.1memory: 64Mi --- apiVersion: v1 kind: Service metadata:namespace: client-go-tutorialsname: nginx-service spec:type: NodePortselector:app: nginx-apptype: front-endports:- port: 80targetPort: 80nodePort: 30011先执行以下命令创建namespace kubectl create namespace client-go-tutorials再执行以下命令即可完成资源的创建 kubectl apply -f nginx-deployment-service.yaml来查看一下资源情况如下图service和pod都创建好了准备工作完成可以开始编码了 编码准备工程 执行命令名为go mod init leader-tutorials新建module确保您的goproxy是正常的执行命令go get k8s.io/client-gov0.22.8下载client-go的指定版本现在工程已经准备好了接着就是具体的编码 编码梳理 咱们按照开发顺序开始写代码如果您看过欣宸的《client-go实战》系列此刻对使用client-go开发简易版controller应该很熟悉了这里再简单提一下开发的流程 将controller完整的写出来功能是监听service一旦有变化就更新pod的label在主控逻辑中根据选主结果决定是否启动步骤1中的controller 下面开始写代码 编码controller 新建controller.go文件在controller.go中增加常量和数据结构的定义 package mainimport (contextencoding/jsonfmttimek8s.io/klog/v2metav1 k8s.io/apimachinery/pkg/apis/meta/v1k8s.io/apimachinery/pkg/fieldsobjectruntime k8s.io/apimachinery/pkg/runtimek8s.io/apimachinery/pkg/typesk8s.io/apimachinery/pkg/util/runtimek8s.io/apimachinery/pkg/util/waitk8s.io/client-go/kubernetesk8s.io/client-go/tools/cachek8s.io/client-go/util/workqueue )const (LABLE_SERVICE_UPDATE_TIME service-update-time // 这个label用来记录service的更新时间 )// 自定义controller数据结构嵌入了真实的控制器 type Controller struct {ctx context.Contextclientset *kubernetes.Clientset// 本地缓存关注的对象都会同步到这里indexer cache.Indexer// 消息队列用来触发对真实对象的处理事件queue workqueue.RateLimitingInterface// 实际运行运行的控制器informer cache.Controller }然后是controller的套路代码主要是从队列中不断获取数据并处理的逻辑 // processNextItem 不间断从队列中取得数据并处理 func (c *Controller) processNextItem() bool {// 注意队列里面不是对象而是key这是个阻塞队列会一直等待key, quit : c.queue.Get()if quit {return false}// Tell the queue that we are done with processing this key. This unblocks the key for other workers// This allows safe parallel processing because two pods with the same key are never processed in// parallel.defer c.queue.Done(key)// 注意这里的syncToStdout应该是业务代码处理对象变化的事件err : c.updatePodsLabel(key.(string))// 如果前面的业务逻辑遇到了错误就在此处理c.handleErr(err, key)// 外面的调用逻辑是返回true就继续调用processNextItem方法return true }// runWorker 这是个无限循环不断地从队列取出数据处理 func (c *Controller) runWorker() {for c.processNextItem() {} }// handleErr 如果前面的业务逻辑执行出现错误就在此集中处理错误本例中主要是重试次数的控制 func (c *Controller) handleErr(err error, key interface{}) {if err nil {// Forget about the #AddRateLimited history of the key on every successful synchronization.// This ensures that future processing of updates for this key is not delayed because of// an outdated error history.c.queue.Forget(key)return}// 如果重试次数未超过5次就继续重试if c.queue.NumRequeues(key) 5 {klog.Infof(Error syncing pod %v: %v, key, err)// Re-enqueue the key rate limited. Based on the rate limiter on the// queue and the re-enqueue history, the key will be processed later again.c.queue.AddRateLimited(key)return}// 代码走到这里意味着有错误并且重试超过了5次应该立即丢弃c.queue.Forget(key)// 这种连续五次重试还未成功的错误交给全局处理逻辑runtime.HandleError(err)klog.Infof(Dropping pod %q out of the queue: %v, key, err) }// Run 开始常规的控制器模式持续响应资源变化事件 func (c *Controller) Run(threadiness int, stopCh chan struct{}) {defer runtime.HandleCrash()// Let the workers stop when we are donedefer c.queue.ShutDown()klog.Info(Starting Pod controller)go c.informer.Run(stopCh)// Wait for all involved caches to be synced, before processing items from the queue is started// 刚开始启动从api-server一次性全量同步所有数据if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {runtime.HandleError(fmt.Errorf(timed out waiting for caches to sync))return}// 支持多个线程并行从队列中取得数据进行处理for i : 0; i threadiness; i {go wait.Until(c.runWorker, time.Second, stopCh)}-stopChklog.Info(Stopping Pod controller) } 从上述代码可见监听的资源发生变化时调用的是updatePodsLabel方法此方法的作用就是查找该namespace下的所有pod依次用patch的方式更新pod的label // updatePodsLabel 这是业务逻辑代码一旦service发生变化就修改pod的label将service的变化事件记录进去 func (c *Controller) updatePodsLabel(key string) error {// 开始进入controller的业务逻辑klog.Infof([%s]这里是controller的业务逻辑key [%s], processIndentify, key)// 从本地缓存中取出完整的对象_, exists, err : c.indexer.GetByKey(key)if err ! nil {klog.Errorf([%s]根据key[%s]从本地缓存获取对象失败 : %v, processIndentify, key, err)return err}if !exists {klog.Infof([%s]对象不存在key [%s]这是个删除事件, processIndentify, key)} else {klog.Infof([%s]对象存在key [%s]这是个新增或修改事件, processIndentify, key)}// 代码走到这里表示监听的对象发生了变化// 按照业务设定需要修改pod的指定label,// 准备好操作pod的接口podInterface : c.clientset.CoreV1().Pods(NAMESPACE)// 远程取得最新的pod列表pods, err : podInterface.List(c.ctx, metav1.ListOptions{})if err ! nil {klog.Errorf([%s]远程获取pod列表失败 : %v, processIndentify, err)return err}// 将service的变化时间写入pod的指定label这里先获取当前时间updateTime : time.Now().Format(20060102150405)// 准备patch对象patchData : map[string]interface{}{metadata: map[string]interface{}{labels: map[string]interface{}{LABLE_SERVICE_UPDATE_TIME: updateTime,},},}// 转为byte数组稍后更新pod的时候就用这个数组进行patch更新patchByte, _ : json.Marshal(patchData)// 遍历所有pod逐个更新labelfor _, pod : range pods.Items {podName : pod.Nameklog.Infof([%s]正在更新pod [%s], processIndentify, podName)_, err : podInterface.Patch(c.ctx, podName, types.MergePatchType, patchByte, metav1.PatchOptions{})// 失败就返回会导致整体重试if err ! nil {klog.Infof([%s]更新pod [%s]失败, %v, processIndentify, podName, err)return err}klog.Infof([%s]更新pod [%s]成功, processIndentify, podName)}return nil }到这里controller的代码已经写得七七八八了还剩创建controller对象以及运行informer的代码这里将它们集中封装在一个方法中一旦这个方法被调用就意味着controller会被创建然后监听service变化再更新pod的label的逻辑就会被执行 // CreateAndStartController 为了便于外部使用这里将controller的创建和启动封装在一起 func CreateAndStartController(ctx context.Context, clientset *kubernetes.Clientset, objType objectruntime.Object, resource string, namespace string, stopCh chan struct{}) {// ListWatcher用于获取数据并监听资源的事件podListWatcher : cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), resource, NAMESPACE, fields.Everything())// 限速队列里面存的是有事件发生的对象的身份信息而非对象本身queue : workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())// 创建本地缓存并对指定类型的资源开始监听// 注意如果业务上有必要其实可以将新增、修改、删除等事件放入不同队列然后分别做针对性处理// 但是controller对应的模式主要是让status与spec达成一致也就是说增删改等事件对应的都是查到实际情况令其与期望情况保持一致// 因此多数情况下增删改用一个队列即可里面放入变化的对象的身份至于处理方式只有一种查到实际情况令其与期望情况保持一致indexer, informer : cache.NewIndexerInformer(podListWatcher, objType, 0, cache.ResourceEventHandlerFuncs{AddFunc: func(obj interface{}) {key, err : cache.MetaNamespaceKeyFunc(obj)if err nil {// 再次注意这里放入队列的并非对象而是对象的身份作用是仅仅告知消费方该对象有变化// 至于有什么变化需要消费方自行判断然后再做针对性处理queue.Add(key)}},UpdateFunc: func(old interface{}, new interface{}) {key, err : cache.MetaNamespaceKeyFunc(new)if err nil {queue.Add(key)}},DeleteFunc: func(obj interface{}) {key, err : cache.DeletionHandlingMetaNamespaceKeyFunc(obj)if err nil {queue.Add(key)}},}, cache.Indexers{})controller : Controller{ctx: ctx,clientset: clientset,informer: informer,indexer: indexer,queue: queue,}go controller.Run(1, stopCh) }编码主控程序选主逻辑也在里面 本文是讲选主(leader-election)的前面做了这么多铺垫主角该上场了新建main.go文件定义常量以及全局变量 package mainimport (contextflagospath/filepathtimegithub.com/google/uuidv1 k8s.io/api/core/v1metav1 k8s.io/apimachinery/pkg/apis/meta/v1k8s.io/client-go/kubernetesk8s.io/client-go/tools/clientcmdk8s.io/client-go/tools/leaderelectionk8s.io/client-go/tools/leaderelection/resourcelockk8s.io/client-go/util/homedirk8s.io/klog/v2 )const (NAMESPACE client-go-tutorials )// 用于表明当前进程身份的全局变量目前用的是uuid var processIndentify string先把套路的代码写了就是client-go初始化的那部分以及main方法里面是整个程序的启动和业务调用流程可见选主有关的代码都放在名为startLeaderElection的方法中 // initOrDie client有关的初始化操作 func initOrDie() *kubernetes.Clientset {klog.Infof([%s]开始初始化kubernetes客户端相关对象, processIndentify)var kubeconfig *stringvar master string// 试图取到当前账号的家目录if home : homedir.HomeDir(); home ! {// 如果能取到就把家目录下的.kube/config作为默认配置文件kubeconfig flag.String(kubeconfig, filepath.Join(home, .kube, config), (optional) absolute path to the kubeconfig file)master } else {// 如果取不到就没有默认配置文件必须通过kubeconfig参数来指定flag.StringVar(kubeconfig, kubeconfig, , absolute path to the kubeconfig file)flag.StringVar(master, master, , master url)flag.Parse()}config, err : clientcmd.BuildConfigFromFlags(master, *kubeconfig)if err ! nil {klog.Fatal(err)}clientset, err : kubernetes.NewForConfig(config)if err ! nil {klog.Fatal(err)}klog.Infof([%s]kubernetes客户端相关对象创建成功, processIndentify)return clientset }func main() {// 一次性确定当前进程身份processIndentify uuid.New().String()// 准备一个带cancel的context这样在主程序退出的时候可以将停止的信号传递给业务ctx, cancel : context.WithCancel(context.Background())// 这个是用来停止controller的stop : make(chan struct{})// 主程序结束的时候下面的操作可以将业务逻辑都停掉defer func() {close(stop)cancel()}()// 初始化clientSet配置因为是启动阶段所以必须初始化成功否则进程退出clientset : initOrDie()// 在一个新的协程中执行选主逻辑以及选主成功的后的逻辑go startLeaderElection(ctx, clientset, stop)// 这里可以继续做其他事情klog.Infof(选主的协程已经在运行接下来可以执行其他业务 [%s], processIndentify)select {} }最后是选主的代码如下所示先创建锁对象就像分布式锁一样总要有个key然后执行leaderelection.RunOrDie方法参与选主一旦有了结果OnNewLeader方法会被回调这时候通过自身id和leader的id比较就知道是不是自己了另外当OnStartedLeading被执行的时候就意味着当前进程就是leader并且可以立即开始执行只有leader才能做的事情了 // startLeaderElection 选主的核心逻辑代码 func startLeaderElection(ctx context.Context, clientset *kubernetes.Clientset, stop chan struct{}) {klog.Infof([%s]创建选主所需的锁对象, processIndentify)// 创建锁对象lock : resourcelock.LeaseLock{LeaseMeta: metav1.ObjectMeta{Name: leader-tutorials,Namespace: NAMESPACE,},Client: clientset.CoordinationV1(),LockConfig: resourcelock.ResourceLockConfig{Identity: processIndentify,},}klog.Infof([%s]开始选主, processIndentify)// 启动选主操作leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{Lock: lock,ReleaseOnCancel: true,LeaseDuration: 10 * time.Second,RenewDeadline: 5 * time.Second,RetryPeriod: 2 * time.Second,Callbacks: leaderelection.LeaderCallbacks{OnStartedLeading: func(ctx context.Context) {klog.Infof([%s]当前进程是leader只有leader才能执行的业务逻辑立即开始, processIndentify)// 在这里写入选主成功的代码// 就像抢分布式锁一样当前进程选举成功的时候这的代码就会被执行// 所以在这里填写抢锁成功的业务逻辑吧本例中就是监听service变化然后修改pod的labelCreateAndStartController(ctx, clientset, v1.Service{}, services, NAMESPACE, stop)},OnStoppedLeading: func() {// 失去了leader时的逻辑klog.Infof([%s]失去leader身份不再是leader了, processIndentify)os.Exit(0)},OnNewLeader: func(identity string) {// 收到通知知道最终的选举结果if identity processIndentify {klog.Infof([%s]选主结果出来了当前进程就是leader, processIndentify)// I just got the lockreturn}klog.Infof([%s]选主结果出来了leader是 : [%s], processIndentify, identity)},},}) }上述代码中请注意LeaderElectionConfig对象的几个重要字段例如LeaseDuration、RenewDeadline、RetryPeriod这些是和选主时候的续租、超时、重试相关需要按照您的实际网络情况进行调整现在代码写完了可以开始验证了 验证 这里捋一下验证的步骤 构建项目生产二进制文件执行此二进制文件启动三个进程观察日志应该有一个进程选举成功另外两个只会在日志输出选主结果修改service资源再去观察日志发现leader进程会输出日志再检查pod的label发现已经修改用ctrlC命令将leader进程退出可见另外两个进程会有一个成为新的leader再次修改service资源新的leader会负责更新pod的label 接下来开始操作 执行命令go build对当前工程进行编译构建得到二进制文件leader-tutorials打开三个终端窗口输入同样的命令./leader-tutorials选主成功的进程日志如下之前操作过的残留所以没有一开始就选主成功而是等了几秒后才成为leader一旦成为leader全量同步service会触发一次pod的更新操作 再去看另外两个进程的日志可见已经识别到leader的身份于是就没有执行controller的逻辑 现在去修改service用命令kubectl edit service nginx-service -n client-go-tutorials编辑我这里是给service增加了一个label如下图所示 此刻leader进程会监听到service变化下图黄色箭头以下的内容就是处理pod的日志 去看另外两个进程的日志不会有任何变化因为controller都没有执行以下命令查看pod的修改情况(注意pod的名字要从您自己的环境复制) kubectl describe pod nginx-deployment-78f6b696d9-cr47w -n client-go-tutorials可以看到pod的label有变化如下图黄色箭头所示这和上面的leader日志的时间是一致的 目前leader进程工作正常再来试试leader进程退出后的情况用ctrlC终止leader进程再去看另外两个进程的日志发现其中一个成功成为新的leader 验证完成都符合预期至此client-go的选主功能实战就完成了如果您在寻找kubernetes原生的分布式锁方案希望本篇能给您一些参考 你不孤单欣宸原创一路相伴 Java系列Spring系列Docker系列kubernetes系列数据库中间件系列DevOps系列
http://www.hkea.cn/news/14479371/

相关文章:

  • 网站后台管理系统模板htmldnsprefetch wordpress
  • 手机网站内容模块北京市住房和城乡建设部官方网站
  • 邵东网站建设莱芜地板街50一次
  • 做汽配找哪个网站好好品质高端网站设计推荐
  • 网站目录结构设计应注意的问题手机网站左右滑动
  • 深圳响应式网站开发如何进行电商网站设计
  • 佛山设计网站公司0797 网站制作
  • 博州住房和城乡建设局网站如何建造自己的网站
  • 网站提现功能怎么做多仓库版仓库管理网站建设源码
  • 网站搜索网站建设平台选用及分析
  • 桐城住房建设网站搜狗推广下架
  • 公网ip购买wordpress站内seo
  • 深圳网站设计 商城外网视频网站做泥声控
  • 动态的网站怎么做设计一个网站的优势
  • 目前旅游网站开发电商平台网站制作
  • 网站ftp地址查询房产网站有哪些
  • 台州最新消息今天最新动态上海快速优化排名
  • 网站建设A系列套餐报价推销商务网站的途径有哪些
  • 外贸企业网站策划给企业做网站推广好么?
  • 景区网站建设策划书哪些网站上推广比较好
  • 凡科建站网站建设做网页用的网站
  • 网站界面设计实训的意义企业网站制作公司合肥
  • 珠海网站建设珠海易推网seo网络优化前景怎么样
  • 券优惠网站如何做云主机怎么装网站
  • 大连建设工程信息网官网官网官免费网站建设seo
  • 网站备案编号小程序需不需要服务器
  • 哈尔滨响应式网站建设公司图片类网站模板
  • 网站开发技术课程设计说明书网页设计公司联系方式
  • 重庆城市建设集团官方网站手机网站建设事项
  • 大学做兼职英语作文网站点击下载app安装