当前位置: 首页 > news >正文

网站建设芜湖建站空间

网站建设芜湖,建站空间,网站百度收录怎么做,cms 官方网站
Volcano Controller控制器源码解析 本文从源码的角度分析Volcano Controller相关功能的实现。 本篇Volcano版本为v1.8.0。 Volcano项目地址: https://github.com/volcano-sh/volcano controller命令main入口: cmd/controller-manager/main.go controller相关代码目录: pkg/co…

Volcano Controller控制器源码解析

本文从源码的角度分析Volcano Controller相关功能的实现。

本篇Volcano版本为v1.8.0。

Volcano项目地址: https://github.com/volcano-sh/volcano
controller命令main入口: cmd/controller-manager/main.go
controller相关代码目录: pkg/controllers

更多文章访问: https://www.cyisme.top

整体实现并不复杂，而且项目比较简洁、风格一致，与k8s controller的代码风格也一致，可以作为学习开发k8s controller的一个参考。

代码风格

controller需要实现framework中定义的interface：

type Controller interface {
	Name() string
	// 初始化
	Initialize(opt *ControllerOption) error
	// 运行
	Run(stopCh <-chan struct{})
}

Initialize方法是根据option初始化controller的入口，informer设置、queue设置、cache设置等都在这里完成。

func (jf *jobflowcontroller) Initialize(opt *framework.ControllerOption) error {
	// client
	jf.kubeClient = opt.KubeClient
	jf.vcClient = opt.VolcanoClient

	// informer
	jf.jobFlowInformer = informerfactory.NewSharedInformerFactory(jf.vcClient, 0).Flow().V1alpha1().JobFlows()
	jf.jobFlowSynced = jf.jobFlowInformer.Informer().HasSynced
	jf.jobFlowLister = jf.jobFlowInformer.Lister()
	jf.jobFlowInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    jf.addJobFlow,
		UpdateFunc: jf.updateJobFlow,
	})

	// 参数
	jf.maxRequeueNum = opt.MaxRequeueNum
	if jf.maxRequeueNum < 0 {
		jf.maxRequeueNum = -1
	}

	// queue
	jf.queue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

	// 入队的工具函数
	jf.enqueueJobFlow = jf.enqueue

	// 处理队列中数据的处理函数
	jf.syncHandler = jf.handleJobFlow

	// ...
}

Run方法会运行多个goroutine执行操作：

func (jf *jobflowcontroller) Run(stopCh <-chan struct{}) {
	defer jf.queue.ShutDown()

	go jf.jobFlowInformer.Informer().Run(stopCh)
	go jf.jobTemplateInformer.Informer().Run(stopCh)
	go jf.jobInformer.Informer().Run(stopCh)

	cache.WaitForCacheSync(stopCh, jf.jobSynced, jf.jobFlowSynced, jf.jobTemplateSynced)

	// 使用k8s pkg中的util，与k8s controller的风格一致
	go wait.Until(jf.worker, time.Second, stopCh)

	klog.Infof("JobFlowController is running ...... ")
	<-stopCh
}

worker负责处理队列中的数据，将其交给handler处理。volcano中所有controller的外层都是这种执行逻辑（可能会有细微差别），差异化的是具体的handler。所以后面介绍各个controller时不会再提这一部分，而是着重介绍handler的实现。

func (jf *jobflowcontroller) worker() {
	// 代理一层
	for jf.processNextWorkItem() {
	}
}

func (jf *jobflowcontroller) processNextWorkItem() bool {
	// 获取数据
	obj, shutdown := jf.queue.Get()
	if shutdown {
		// Stop working
		return false
	}
	defer jf.queue.Done(obj)

	req, ok := obj.(*apis.FlowRequest)
	if !ok {
		klog.Errorf("%v is not a valid queue request struct.", obj)
		return true
	}

	// 具体处理handler
	err := jf.syncHandler(req)
	jf.handleJobFlowErr(err, obj)

	return true
}

Queue Controller

Queue Controller主要监听三个资源对象：Queue、PodGroup、Command。控制器会监听它们的状态，用以更新Queue资源的状态，从而实现依据Queue资源的调度。

func (c *queuecontroller) Initialize(opt *framework.ControllerOption) error {
	// 省略部分代码
	queueInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    c.addQueue,
		UpdateFunc: c.updateQueue,
		DeleteFunc: c.deleteQueue,
	})

	pgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		// 省略部分代码
	})

	if utilfeature.DefaultFeatureGate.Enabled(features.QueueCommandSync) {
		c.cmdInformer = factory.Bus().V1alpha1().Commands()
		c.cmdInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
			FilterFunc: func(obj interface{}) bool {
				switch v := obj.(type) {
				case *busv1alpha1.Command:
					return IsQueueReference(v.TargetObject)
				default:
					return false
				}
			},
			Handler: cache.ResourceEventHandlerFuncs{
				AddFunc: c.addCommand,
			},
		})
		c.cmdLister = c.cmdInformer.Lister()
		c.cmdSynced = c.cmdInformer.Informer().HasSynced
	}

	// 省略部分代码
}

监听到的消息会放到队列中，队列是通过k8s pkg中的WorkQueue实现的。

type 
queuecontroller struct {
	// 省略部分代码
	// ...

	// queues that need to be updated.
	queue        workqueue.RateLimitingInterface
	commandQueue workqueue.RateLimitingInterface

	// queue name -> podgroup namespace/name
	podGroups map[string]map[string]struct{}

	// 省略部分代码
	// ...
}

queuecontroller.queue接收apis.Request对象作为消息，queuecontroller.commandQueue接收busv1alpha1.Command对象作为消息。经过queuecontroller.handleCommand方法处理后，queuecontroller.commandQueue中的busv1alpha1.Command对象会被转换成apis.Request事件，放到queuecontroller.queue中统一处理。

func (c *queuecontroller) handleCommand(cmd *busv1alpha1.Command) error {
	// 接收到command后，先删除该command对象
	err := c.vcClient.BusV1alpha1().Commands(cmd.Namespace).Delete(context.TODO(), cmd.Name, metav1.DeleteOptions{})
	if err != nil {
		// 省略部分代码
	}

	// command对象中会有ownerReference（TargetObject），从中提取queue对象名称
	req := &apis.Request{
		QueueName: cmd.TargetObject.Name,
		// CommandIssuedEvent是内部事件类型，用户下发命令时会触发该事件
		Event:  busv1alpha1.CommandIssuedEvent,
		Action: busv1alpha1.Action(cmd.Action),
	}

	// 将command事件转换成request事件放到queue中
	c.enqueueQueue(req)

	return nil
}

queuecontroller.handleQueue是queuecontroller.queue的事件处理函数，主要是根据Queue资源的当前状态和request事件中的动作调用不同的处理函数，更新Queue资源的状态。

func (c *queuecontroller) handleQueue(req *apis.Request) error {
	// 这里的queue是集群中的Queue资源对象（Volcano CRD），而不是工作队列
	queue, err := c.queueLister.Get(req.QueueName)
	if err != nil {
		// 省略部分代码
	}

	// 根据queue当前的状态生成不同的执行器
	queueState := queuestate.NewState(queue)
	// 执行操作
	if err := queueState.Execute(req.Action); err !=
nil {// 省略部分代码}return nil }Queue资源有4中状态QueueState 四种状态分别对应四种执行器 Open -- openStateClosed -- closedStateClosing -- closingStateUnknown -- unknownState 以closeState执行器为例代码实现如下其他的执行器实现类似不再举例 type closedState struct {queue *v1beta1.Queue } func (cs *closedState) Execute(action v1alpha1.Action) error {switch action {// 开启动作case v1alpha1.OpenQueueAction:return OpenQueue(cs.queue, func(status *v1beta1.QueueStatus, podGroupList []string) {status.State v1beta1.QueueStateOpen})// 关闭动作case v1alpha1.CloseQueueAction:return SyncQueue(cs.queue, func(status *v1beta1.QueueStatus, podGroupList []string) {status.State v1beta1.QueueStateClosed})// 默认动作default:return SyncQueue(cs.queue, func(status *v1beta1.QueueStatus, podGroupList []string) {specState : cs.queue.Status.Stateif specState v1beta1.QueueStateOpen {status.State v1beta1.QueueStateOpenreturn}if specState v1beta1.QueueStateClosed {status.State v1beta1.QueueStateClosedreturn}status.State v1beta1.QueueStateUnknown})} } Queue资源在volcano中有4种动作Action 执行器中将根据动作执行不同的操作 EnqueueJob 这个动作执行器中没有用到SyncQueue 这个动作执行器中执行默认操作OpenQueueCloseQueue 实际上 对应这三个动作会有三个处理函数,他们被定义为QueueActionFn类型 type QueueActionFn func(queue *v1beta1.Queue, fn UpdateQueueStatusFn) error因为Queue资源可以重复的Close或者Open, 所以其实执行器中并没有拦截或者限制这种操作 而是比较简单的对状态进行重置。 操作调用的函数如下 closedState和closingState状态执行器中 #mermaid-svg-dDu0i1WDlsqGkQT3 {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-dDu0i1WDlsqGkQT3 .error-icon{fill:#552222;}#mermaid-svg-dDu0i1WDlsqGkQT3 .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-dDu0i1WDlsqGkQT3 .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-dDu0i1WDlsqGkQT3 .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-dDu0i1WDlsqGkQT3 .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-dDu0i1WDlsqGkQT3 .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-dDu0i1WDlsqGkQT3 .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-dDu0i1WDlsqGkQT3 .marker{fill:#333333;stroke:#333333;}#mermaid-svg-dDu0i1WDlsqGkQT3 
.marker.cross{stroke:#333333;}#mermaid-svg-dDu0i1WDlsqGkQT3 svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-dDu0i1WDlsqGkQT3 .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-dDu0i1WDlsqGkQT3 .cluster-label text{fill:#333;}#mermaid-svg-dDu0i1WDlsqGkQT3 .cluster-label span{color:#333;}#mermaid-svg-dDu0i1WDlsqGkQT3 .label text,#mermaid-svg-dDu0i1WDlsqGkQT3 span{fill:#333;color:#333;}#mermaid-svg-dDu0i1WDlsqGkQT3 .node rect,#mermaid-svg-dDu0i1WDlsqGkQT3 .node circle,#mermaid-svg-dDu0i1WDlsqGkQT3 .node ellipse,#mermaid-svg-dDu0i1WDlsqGkQT3 .node polygon,#mermaid-svg-dDu0i1WDlsqGkQT3 .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-dDu0i1WDlsqGkQT3 .node .label{text-align:center;}#mermaid-svg-dDu0i1WDlsqGkQT3 .node.clickable{cursor:pointer;}#mermaid-svg-dDu0i1WDlsqGkQT3 .arrowheadPath{fill:#333333;}#mermaid-svg-dDu0i1WDlsqGkQT3 .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-dDu0i1WDlsqGkQT3 .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-dDu0i1WDlsqGkQT3 .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-dDu0i1WDlsqGkQT3 .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-dDu0i1WDlsqGkQT3 .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-dDu0i1WDlsqGkQT3 .cluster text{fill:#333;}#mermaid-svg-dDu0i1WDlsqGkQT3 .cluster span{color:#333;}#mermaid-svg-dDu0i1WDlsqGkQT3 div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-dDu0i1WDlsqGkQT3 :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} OpenQueueAction OpenQueue CloseQueueAction SyncQueue Other openState状态执行器中 #mermaid-svg-aR1EtNOZ67FHE82N {font-family:"trebuchet 
ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-aR1EtNOZ67FHE82N .error-icon{fill:#552222;}#mermaid-svg-aR1EtNOZ67FHE82N .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-aR1EtNOZ67FHE82N .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-aR1EtNOZ67FHE82N .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-aR1EtNOZ67FHE82N .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-aR1EtNOZ67FHE82N .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-aR1EtNOZ67FHE82N .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-aR1EtNOZ67FHE82N .marker{fill:#333333;stroke:#333333;}#mermaid-svg-aR1EtNOZ67FHE82N .marker.cross{stroke:#333333;}#mermaid-svg-aR1EtNOZ67FHE82N svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-aR1EtNOZ67FHE82N .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-aR1EtNOZ67FHE82N .cluster-label text{fill:#333;}#mermaid-svg-aR1EtNOZ67FHE82N .cluster-label span{color:#333;}#mermaid-svg-aR1EtNOZ67FHE82N .label text,#mermaid-svg-aR1EtNOZ67FHE82N span{fill:#333;color:#333;}#mermaid-svg-aR1EtNOZ67FHE82N .node rect,#mermaid-svg-aR1EtNOZ67FHE82N .node circle,#mermaid-svg-aR1EtNOZ67FHE82N .node ellipse,#mermaid-svg-aR1EtNOZ67FHE82N .node polygon,#mermaid-svg-aR1EtNOZ67FHE82N .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-aR1EtNOZ67FHE82N .node .label{text-align:center;}#mermaid-svg-aR1EtNOZ67FHE82N .node.clickable{cursor:pointer;}#mermaid-svg-aR1EtNOZ67FHE82N .arrowheadPath{fill:#333333;}#mermaid-svg-aR1EtNOZ67FHE82N .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-aR1EtNOZ67FHE82N .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-aR1EtNOZ67FHE82N .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-aR1EtNOZ67FHE82N .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-aR1EtNOZ67FHE82N .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-aR1EtNOZ67FHE82N .cluster 
text{fill:#333;}#mermaid-svg-aR1EtNOZ67FHE82N .cluster span{color:#333;}#mermaid-svg-aR1EtNOZ67FHE82N div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-aR1EtNOZ67FHE82N :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} OpenQueueAction SyncQueue CloseQueueAction CloseQueue Other unknownState状态执行器中 #mermaid-svg-uVGvkAqTVR95vRIq {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-uVGvkAqTVR95vRIq .error-icon{fill:#552222;}#mermaid-svg-uVGvkAqTVR95vRIq .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-uVGvkAqTVR95vRIq .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-uVGvkAqTVR95vRIq .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-uVGvkAqTVR95vRIq .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-uVGvkAqTVR95vRIq .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-uVGvkAqTVR95vRIq .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-uVGvkAqTVR95vRIq .marker{fill:#333333;stroke:#333333;}#mermaid-svg-uVGvkAqTVR95vRIq .marker.cross{stroke:#333333;}#mermaid-svg-uVGvkAqTVR95vRIq svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-uVGvkAqTVR95vRIq .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-uVGvkAqTVR95vRIq .cluster-label text{fill:#333;}#mermaid-svg-uVGvkAqTVR95vRIq .cluster-label span{color:#333;}#mermaid-svg-uVGvkAqTVR95vRIq .label text,#mermaid-svg-uVGvkAqTVR95vRIq span{fill:#333;color:#333;}#mermaid-svg-uVGvkAqTVR95vRIq .node rect,#mermaid-svg-uVGvkAqTVR95vRIq .node circle,#mermaid-svg-uVGvkAqTVR95vRIq .node ellipse,#mermaid-svg-uVGvkAqTVR95vRIq .node polygon,#mermaid-svg-uVGvkAqTVR95vRIq .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-uVGvkAqTVR95vRIq .node 
.label{text-align:center;}#mermaid-svg-uVGvkAqTVR95vRIq .node.clickable{cursor:pointer;}#mermaid-svg-uVGvkAqTVR95vRIq .arrowheadPath{fill:#333333;}#mermaid-svg-uVGvkAqTVR95vRIq .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-uVGvkAqTVR95vRIq .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-uVGvkAqTVR95vRIq .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-uVGvkAqTVR95vRIq .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-uVGvkAqTVR95vRIq .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-uVGvkAqTVR95vRIq .cluster text{fill:#333;}#mermaid-svg-uVGvkAqTVR95vRIq .cluster span{color:#333;}#mermaid-svg-uVGvkAqTVR95vRIq div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-uVGvkAqTVR95vRIq :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} OpenQueueAction OpenQueue CloseQueueAction CloseQueue Other SyncQueue 可以看出 执行逻辑 如果当前状态与预期状态一致 则调用SyncQueue同步状态如果当前状态与预期状态不一致 则调用OpenQueue或者CloseQueue更新状态如果状态未知 则调用SyncQueue同步状态 然后来看一下具体的函数实现 // syncQueue主要是更新queue中podgroup的状态计数 func (c *queuecontroller) syncQueue(queue *schedulingv1beta1.Queue, updateStateFn state.UpdateQueueStatusFn) error {// 获取queue中的podgrouppodGroups : c.getPodGroups(queue.Name)queueStatus : schedulingv1beta1.QueueStatus{}for _, pgKey : range podGroups {// 获取podgroup对象pg, err : c.pgLister.PodGroups(ns).Get(name)// 更新计数器switch pg.Status.Phase {case schedulingv1beta1.PodGroupPending:queueStatus.Pendingcase schedulingv1beta1.PodGroupRunning:queueStatus.Runningcase schedulingv1beta1.PodGroupUnknown:queueStatus.Unknowncase schedulingv1beta1.PodGroupInqueue:queueStatus.Inqueue}}// updateStateFn是在执行器中定义的函数 用于更新queue的状态if updateStateFn ! 
nil {
		updateStateFn(&queueStatus, podGroups)
	} else {
		queueStatus.State = queue.Status.State
	}

	// 省略部分代码
	// ...

	// 调用api更新queue的状态
	if _, err := c.vcClient.SchedulingV1beta1().Queues().UpdateStatus(context.TODO(), newQueue, metav1.UpdateOptions{}); err != nil {
	}

	return nil
}

func (c *queuecontroller) openQueue(queue *schedulingv1beta1.Queue, updateStateFn state.UpdateQueueStatusFn) error {
	newQueue := queue.DeepCopy()
	newQueue.Status.State = schedulingv1beta1.QueueStateOpen

	// 这里调用Update没有看懂：copy出来的对象除了状态之外，其他应该都是一样的，
	// 而Update方法是更新对象，而不是更新状态
	if queue.Status.State != newQueue.Status.State {
		if _, err := c.vcClient.SchedulingV1beta1().Queues().Update(context.TODO(), newQueue, metav1.UpdateOptions{}); err != nil {
			c.recorder.Event(newQueue, v1.EventTypeWarning, string(v1alpha1.OpenQueueAction),
				fmt.Sprintf("Open queue failed for %v", err))
			return err
		}

		c.recorder.Event(newQueue, v1.EventTypeNormal, string(v1alpha1.OpenQueueAction), "Open queue succeed")
	} else {
		return nil
	}

	// 获取queue对象
	q, err := c.vcClient.SchedulingV1beta1().Queues().Get(context.TODO(), newQueue.Name, metav1.GetOptions{})
	newQueue = q.DeepCopy()

	// 执行操作
	if updateStateFn != nil {
		updateStateFn(&newQueue.Status, nil)
	} else {
		return fmt.Errorf("internal error, update state function should be provided")
	}

	// 调用api更新queue的状态
	if queue.Status.State != newQueue.Status.State {
		if _, err := c.vcClient.SchedulingV1beta1().Queues().UpdateStatus(context.TODO(), newQueue, metav1.UpdateOptions{}); err != nil {
		}
	}

	return nil
}

// closeQueue与之类似，不再举例

PodGroup Controller

PodGroup Controller比较简单，它负责为未指定PodGroup的Pod分配PodGroup。

func (pg *pgcontroller) processNextReq() bool {
	// 省略部分代码

	// 获取pod对象
	pod, err := pg.podLister.Pods(req.podNamespace).Get(req.podName)

	// 根据调度器名称过滤
	if !commonutil.Contains(pg.schedulerNames, pod.Spec.SchedulerName) {
		return true
	}

	// 如果pod已经有podgroup，则不再处理
	if pod.Annotations != nil && pod.Annotations[scheduling.KubeGroupNameAnnotationKey] != "" {
		return true
	}

	// 为pod分配podgroup
	if err := pg.createNormalPodPGIfNotExist(pod); err !=
nil {
		// AddRateLimited将在一段时间后重新添加req到队列中
		pg.queue.AddRateLimited(req)
		return true
	}

	// 省略部分代码
}

func (pg *pgcontroller) createNormalPodPGIfNotExist(pod *v1.Pod) error {
	// pgName将以"podgroup-"开头
	pgName := helpers.GeneratePodgroupName(pod)

	if _, err := pg.pgLister.PodGroups(pod.Namespace).Get(pgName); err != nil {
		// 非"不存在"的错误直接返回；podgroup不存在则创建
		if !apierrors.IsNotFound(err) {
			return err
		}

		// 省略了一些从pod中继承赋值的代码
		obj := &scheduling.PodGroup{
			ObjectMeta: metav1.ObjectMeta{
				// podgroup的ownerReference是pod
				OwnerReferences: newPGOwnerReferences(pod),
			},
			Spec: scheduling.PodGroupSpec{
				// 最小成员数为1
				MinMember: 1,
			},
			Status: scheduling.PodGroupStatus{
				// 状态为pending
				Phase: scheduling.PodGroupPending,
			},
		}

		// 继承pod的owner信息，写入到annotations
		pg.inheritUpperAnnotations(pod, obj)
		// 从pod的annotations中继承queue名称
		if queueName, ok := pod.Annotations[scheduling.QueueNameAnnotationKey]; ok {
			obj.Spec.Queue = queueName
		}

		// 省略annotations继承的代码
		// ...

		// 创建podgroup
		if _, err := pg.vcClient.SchedulingV1beta1().PodGroups(pod.Namespace).Create(context.TODO(), obj, metav1.CreateOptions{}); err !=
nil {
		}
	}

	// 无论podgroup是新创建的还是已经存在的，都会更新pod的annotations
	return pg.updatePodAnnotations(pod, pgName)
}

JobFlow Controller

JobFlow是从volcano v1.8开始引入的CRD对象，它配合JobTemplate使用，用于vcjob任务的编排。

JobFlow Controller主要监听JobFlow和Job两个对象的变化，并更新JobFlow的状态。

func (jf *jobflowcontroller) Initialize(opt *framework.ControllerOption) error {
	// ...
	jf.jobFlowInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    jf.addJobFlow,
		UpdateFunc: jf.updateJobFlow,
	})

	jf.jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		UpdateFunc: jf.updateJob,
	})

	// 省略部分代码
}

Job更新时会判断它是否属于某个JobFlow，如果是，则将关联的JobFlow加入到队列。

func (jf *jobflowcontroller) updateJob(oldObj, newObj interface{}) {
	// ...

	if newJob.ResourceVersion == oldJob.ResourceVersion {
		return
	}

	jobFlowName := getJobFlowNameByJob(newJob)
	if jobFlowName == "" {
		return
	}

	req := &apis.FlowRequest{
		Namespace:   newJob.Namespace,
		JobFlowName: jobFlowName,

		Action: jobflowv1alpha1.SyncJobFlowAction,
		Event:  jobflowv1alpha1.OutOfSyncEvent,
	}

	jf.queue.Add(req)
}

放入队列的apis.FlowRequest对象最终会由handleJobFlow函数处理，然后根据当前JobFlow的状态生成并调用不同的执行器。这里的运行逻辑和Queue Controller的差不多。

func (jf *jobflowcontroller) handleJobFlow(req *apis.FlowRequest) error {
	// 省略部分代码
	// ...
	jobflow, err := jf.jobFlowLister.JobFlows(req.Namespace).Get(req.JobFlowName)

	// 根据jobflow的状态生成不同的执行器
	jobFlowState := jobflowstate.NewState(jobflow)
	if err := jobFlowState.Execute(req.Action); err != nil {
	}

	return nil
}

JobFlow有5种状态(Flow Phase)，分别对应5种执行器：
- Succeed -> succeedState
- Terminating -> terminatingState（这个状态的执行器并没有实际动作，因为资源即将释放）
- Failed -> failedState（这个状态的执行器并没有实际动作，因为状态异常）
- Running -> runningState
- Pending -> pendingState

JobFlow目前只有1种动作(Action)：SyncJobFlow，由syncJobFlow函数执行具体操作。

func (jf *jobflowcontroller) syncJobFlow(jobFlow *v1alpha1flow.JobFlow, updateStateFn state.UpdateJobFlowStatusFn) error {
	// ...

	// 如果当前jobflow的状态为succeed，且job的保留策略为delete，则删除所有由jobflow创建的job
	if jobFlow.Spec.JobRetainPolicy == v1alpha1flow.Delete && jobFlow.Status.State.Phase == v1alpha1flow.Succeed {
		if err := jf.deleteAllJobsCreatedByJobFlow(jobFlow); err !=
nil {
		}
		return nil
	}

	// 根据jobflow中声明的jobtemplate创建job，声明顺序即为创建顺序
	if err := jf.deployJob(jobFlow); err != nil {
	}

	// 获取jobflow下所有job的状态
	jobFlowStatus, err := jf.getAllJobStatus(jobFlow)
	if err != nil {
		return err
	}

	// 更新jobflow的状态
	jobFlow.Status = *jobFlowStatus
	updateStateFn(&jobFlow.Status, len(jobFlow.Spec.Flows))
	_, err = jf.vcClient.FlowV1alpha1().JobFlows(jobFlow.Namespace).UpdateStatus(context.Background(), jobFlow, metav1.UpdateOptions{})

	return nil
}

func (jf *jobflowcontroller) deployJob(jobFlow *v1alpha1flow.JobFlow) error {
	for _, flow := range jobFlow.Spec.Flows {
		jobName := getJobName(jobFlow.Name, flow.Name)
		if _, err := jf.jobLister.Jobs(jobFlow.Namespace).Get(jobName); err != nil {
			if errors.IsNotFound(err) {
				// 如果job没有依赖，则直接创建
				if flow.DependsOn == nil || flow.DependsOn.Targets == nil {
					// createJob根据jobtemplate创建job
					// 创建已经存在的job不会报错
					if err := jf.createJob(jobFlow, flow); err != nil {
						return err
					}
				} else {
					// 有依赖则判断依赖的job是否已经完成，
					// 任何一个依赖的job未完成都不会创建
					flag, err := jf.judge(jobFlow, flow)
					if flag {
						if err := jf.createJob(jobFlow, flow); err != nil {
							return err
						}
					}
				}
				continue
			}
			return err
		}
	}
	return nil
}

Job Controller

Job是volcano中的核心资源对象，为了避免与k8s中的Job对象混淆，也会称之为vcjob或者vj。

Job Controller监听多个资源对象的变更事件：

func (cc *jobcontroller) Initialize(opt *framework.ControllerOption) error {
	// ...
	cc.jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    cc.addJob,
		UpdateFunc: cc.updateJob,
		DeleteFunc: cc.deleteJob,
	})

	cc.cmdInformer.Informer().AddEventHandler(
		cache.FilteringResourceEventHandler{
			FilterFunc: func(obj interface{}) bool {
				switch v := obj.(type) {
				case *busv1alpha1.Command:
					if v.TargetObject !=
nil &&
						v.TargetObject.APIVersion == batchv1alpha1.SchemeGroupVersion.String() &&
						v.TargetObject.Kind == "Job" {
						return true
					}

					return false
				default:
					return false
				}
			},
			Handler: cache.ResourceEventHandlerFuncs{
				AddFunc: cc.addCommand,
			},
		},
	)

	cc.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    cc.addPod,
		UpdateFunc: cc.updatePod,
		DeleteFunc: cc.deletePod,
	})

	cc.pgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		UpdateFunc: cc.updatePodGroup,
	})
	// ...
}

vcjob的处理量会比较大，所以Job Controller会启动多个worker来处理事件，每个worker都有属于自己的queue。

func (cc *jobcontroller) Run(stopCh <-chan struct{}) {
	// ...

	// commandQueue是用于处理busv1alpha1.Command对象的队列，
	// 与Queue Controller中类似，最终会转换成apis.Request对象放入queue中
	go wait.Until(cc.handleCommands, 0, stopCh)

	var i uint32
	// 启动多个worker
	for i = 0; i < cc.workers; i++ {
		go func(num uint32) {
			wait.Until(
				func() {
					cc.worker(num)
				},
				time.Second,
				stopCh)
		}(i)
	}

	// cache用于缓存资源状态
	go cc.cache.Run(stopCh)

	// 处理错误task
	go wait.Until(cc.processResyncTask, 0, stopCh)

	// ...
}

新的事件会通过getWorkerQueue函数获取对应的queue，然后放入该队列中。

func (cc *jobcontroller) getWorkerQueue(key string) workqueue.RateLimitingInterface {
	// ...
	hashVal = fnv.New32()
	hashVal.Write([]byte(key))
	val = hashVal.Sum32()

	// 通过hash值取模来获取queue
	queue := cc.queueList[val%cc.workers]

	return queue
}

command事件转换成request事件的过程与Queue Controller类似，这里不再赘述。

queue中的数据由processNextReq函数接收并处理。

func (cc *jobcontroller) processNextReq(count uint32) bool {
	// 获取queue，queue的数量与worker数量相同并一一对应
	queue := cc.queueList[count]
	// ...（省略从queue中取出obj的代码）
	req := obj.(apis.Request)

	key := jobcache.JobKeyByReq(&req)
	if !cc.belongsToThisRoutine(key, count) {
		// 这里做了校验：如果key不属于当前worker，则重新放入其对应的queue中
		queueLocal := cc.getWorkerQueue(key)
		queueLocal.Add(req)
		return true
	}

	jobInfo, err := cc.cache.Get(key)

	// state.NewState这个名字见过很多次了，用于生成执行器
	st := state.NewState(jobInfo)
	if st == nil {
		return true
	}

	// 获取当前需要执行的动作
	action := applyPolicies(jobInfo.Job, &req)

	// 非同步动作，记录事件
	if action !=
busv1alpha1.SyncJobAction {
		cc.recordJobEvent(jobInfo.Job.Namespace, jobInfo.Job.Name, batchv1alpha1.ExecuteAction, fmt.Sprintf(
			"Start to execute action %s ", action))
	}

	// 执行动作
	if err := st.Execute(action); err != nil {
		// 如果执行失败，则根据重试次数决定是否重新放入queue中。
		// maxRequeueNum == -1 表示无限重试
		if cc.maxRequeueNum == -1 || queue.NumRequeues(req) < cc.maxRequeueNum {
			queue.AddRateLimited(req)
			return true
		}
	}

	// 执行成功（或重试次数已用尽）时，调用Forget清除该事件在queue中的重试记录
	queue.Forget(req)

	return true
}

vcjob有10种状态(JobPhase)，对应8种执行器：
- Pending -> pendingState
- Aborting -> abortingState
- Aborted -> abortedState
- Running -> runningState
- Restarting -> restartingState
- Completing -> completingState
- Terminating -> terminatingState
- Terminated、Failed、Completed -> finishedState

以abortedState为例，代码实现如下：

func (as *abortedState) Execute(action v1alpha1.Action) error {
	switch action {
	case v1alpha1.ResumeJobAction:
		return KillJob(as.job, PodRetainPhaseSoft, func(status *vcbatch.JobStatus) bool {
			status.State.Phase = vcbatch.Restarting
			status.RetryCount++
			return true
		})
	default:
		return KillJob(as.job, PodRetainPhaseSoft, nil)
	}
}

vcjob有11种动作(Action)，执行器中将根据动作执行不同的操作：
- AbortJob：如果设置此操作，整个作业将被中止；所有作业的Pod都将被驱逐，并且不会重新创建任何Pod。
- RestartJob：如果设置了此操作，整个作业将重新启动。
- RestartTask：如果设置此操作，则仅重新启动任务（默认操作）。
- TerminateJob：如果设置了此操作，整个作业将被终止并且无法恢复；所有作业的Pod都将被驱逐，并且不会重新创建任何Pod。
- CompleteJob：如果设置此操作，未完成的pod将被杀死，作业完成。
- ResumeJob：恢复中止的作业。
- SyncJob：同步Job/Pod状态的操作（内部动作）。
- EnqueueJob：同步作业入队状态的操作（内部动作）。
- SyncQueue：同步队列状态的操作（内部动作）。
- OpenQueue：打开队列的操作（内部动作）。
- CloseQueue：关闭队列的操作（内部动作）。

实际上，这些动作会对应不同的处理函数，它们被定义为ActionFn类型和KillActionFn类型。SyncJob和KillJob分别被声明为这两种类型的函数，并被执行器调用。

type ActionFn func(job *apis.JobInfo, fn UpdateStatusFn) error

type KillActionFn func(job *apis.JobInfo, podRetainPhase PhaseMap, fn UpdateStatusFn) error

var (
	// SyncJob将根据Job的规范创建或删除Pod。
	SyncJob ActionFn
	// KillJob将杀死状态不在podRetainPhase中的pod。
	KillJob KillActionFn
)

操作调用的函数如下（虽然不同动作调用的操作可能相同，但是会更新不同的状态信息）：

pendingState和runningState状态执行器中: #mermaid-svg-9flh94xkJCuA3ix3 {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-9flh94xkJCuA3ix3 
.error-icon{fill:#552222;}#mermaid-svg-9flh94xkJCuA3ix3 .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-9flh94xkJCuA3ix3 .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-9flh94xkJCuA3ix3 .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-9flh94xkJCuA3ix3 .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-9flh94xkJCuA3ix3 .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-9flh94xkJCuA3ix3 .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-9flh94xkJCuA3ix3 .marker{fill:#333333;stroke:#333333;}#mermaid-svg-9flh94xkJCuA3ix3 .marker.cross{stroke:#333333;}#mermaid-svg-9flh94xkJCuA3ix3 svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-9flh94xkJCuA3ix3 .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-9flh94xkJCuA3ix3 .cluster-label text{fill:#333;}#mermaid-svg-9flh94xkJCuA3ix3 .cluster-label span{color:#333;}#mermaid-svg-9flh94xkJCuA3ix3 .label text,#mermaid-svg-9flh94xkJCuA3ix3 span{fill:#333;color:#333;}#mermaid-svg-9flh94xkJCuA3ix3 .node rect,#mermaid-svg-9flh94xkJCuA3ix3 .node circle,#mermaid-svg-9flh94xkJCuA3ix3 .node ellipse,#mermaid-svg-9flh94xkJCuA3ix3 .node polygon,#mermaid-svg-9flh94xkJCuA3ix3 .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-9flh94xkJCuA3ix3 .node .label{text-align:center;}#mermaid-svg-9flh94xkJCuA3ix3 .node.clickable{cursor:pointer;}#mermaid-svg-9flh94xkJCuA3ix3 .arrowheadPath{fill:#333333;}#mermaid-svg-9flh94xkJCuA3ix3 .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-9flh94xkJCuA3ix3 .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-9flh94xkJCuA3ix3 .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-9flh94xkJCuA3ix3 .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-9flh94xkJCuA3ix3 .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-9flh94xkJCuA3ix3 .cluster text{fill:#333;}#mermaid-svg-9flh94xkJCuA3ix3 .cluster 
span{color:#333;}#mermaid-svg-9flh94xkJCuA3ix3 div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-9flh94xkJCuA3ix3 :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} RestartJobAction KillJob AbortJobAction CompleteJobAction TerminateJobAction Other SyncJob restartingState状态执行器中, 直接调用KillJob。finishedState为最终状态 所以不会执行任何动作。terminatingState 直接调用KillJob。abortingState和abortedState状态执行器中: #mermaid-svg-SbnPVzuQwuVf0rub {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-SbnPVzuQwuVf0rub .error-icon{fill:#552222;}#mermaid-svg-SbnPVzuQwuVf0rub .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-SbnPVzuQwuVf0rub .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-SbnPVzuQwuVf0rub .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-SbnPVzuQwuVf0rub .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-SbnPVzuQwuVf0rub .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-SbnPVzuQwuVf0rub .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-SbnPVzuQwuVf0rub .marker{fill:#333333;stroke:#333333;}#mermaid-svg-SbnPVzuQwuVf0rub .marker.cross{stroke:#333333;}#mermaid-svg-SbnPVzuQwuVf0rub svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-SbnPVzuQwuVf0rub .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-SbnPVzuQwuVf0rub .cluster-label text{fill:#333;}#mermaid-svg-SbnPVzuQwuVf0rub .cluster-label span{color:#333;}#mermaid-svg-SbnPVzuQwuVf0rub .label text,#mermaid-svg-SbnPVzuQwuVf0rub span{fill:#333;color:#333;}#mermaid-svg-SbnPVzuQwuVf0rub .node rect,#mermaid-svg-SbnPVzuQwuVf0rub .node circle,#mermaid-svg-SbnPVzuQwuVf0rub .node ellipse,#mermaid-svg-SbnPVzuQwuVf0rub .node polygon,#mermaid-svg-SbnPVzuQwuVf0rub .node 
path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-SbnPVzuQwuVf0rub .node .label{text-align:center;}#mermaid-svg-SbnPVzuQwuVf0rub .node.clickable{cursor:pointer;}#mermaid-svg-SbnPVzuQwuVf0rub .arrowheadPath{fill:#333333;}#mermaid-svg-SbnPVzuQwuVf0rub .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-SbnPVzuQwuVf0rub .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-SbnPVzuQwuVf0rub .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-SbnPVzuQwuVf0rub .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-SbnPVzuQwuVf0rub .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-SbnPVzuQwuVf0rub .cluster text{fill:#333;}#mermaid-svg-SbnPVzuQwuVf0rub .cluster span{color:#333;}#mermaid-svg-SbnPVzuQwuVf0rub div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-SbnPVzuQwuVf0rub :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} ResumeJobAction KillJob Other completingState直接调用KillJob。 可以看出 执行逻辑 如果是干预vcjob状态的动作 则调用KillJob。反之 则调用SyncJob。 然后来看一下具体实现函数。 killJob killJob对应删除pod的操作。 #mermaid-svg-uyOSS4r22YR0vtbW {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-uyOSS4r22YR0vtbW .error-icon{fill:#552222;}#mermaid-svg-uyOSS4r22YR0vtbW .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-uyOSS4r22YR0vtbW .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-uyOSS4r22YR0vtbW .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-uyOSS4r22YR0vtbW .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-uyOSS4r22YR0vtbW .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-uyOSS4r22YR0vtbW .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-uyOSS4r22YR0vtbW 
流程：遍历pod执行动作 → 设置job状态 → 执行删除插件 → 更新job状态 → 删除podgroup

```go
func (cc *jobcontroller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.PhaseMap, updateStatus state.UpdateStatusFn) error {
	job := jobInfo.Job
	// job已经处于删除状态，则不再处理
	if job.DeletionTimestamp != nil {
		return nil
	}

	// 状态计数器，用于更新job的状态
	var pending, running, terminating, succeeded, failed, unknown int32
	taskStatusCount := make(map[string]batch.TaskState)

	var errs []error
	var total int

	for _, pods := range jobInfo.Pods {
		for _, pod := range pods {
			total++

			if pod.DeletionTimestamp != nil {
				// pod处于删除状态，则不再处理
				continue
			}

			maxRetry := job.Spec.MaxRetry
			lastRetry := false
			// 判断是否是最后一次重试
			if job.Status.RetryCount >= maxRetry-1 {
				lastRetry = true
			}

			// 如果是最后一次重试，则保留失败和成功的pod
			retainPhase := podRetainPhase
			if lastRetry {
				// var PodRetainPhaseSoft = PhaseMap{
				//     v1.PodSucceeded: {},
				//     v1.PodFailed:    {},
				// }
				retainPhase = state.PodRetainPhaseSoft
			}
			_, retain := retainPhase[pod.Status.Phase]

			// 如果不保留pod，则删除pod
			if !retain {
				err := cc.deleteJobPod(job.Name, pod)
				if err == nil {
					terminating++
					continue
				}
				// 失败则放入重试队列
				errs = append(errs, err)
				cc.resyncTask(pod)
			}

			// 更新状态计数器
			classifyAndAddUpPodBaseOnPhase(pod, &pending, &running, &succeeded, &failed, &unknown)
			calcPodStatus(pod, taskStatusCount)
		}
	}

	if len(errs) != 0 {
		return fmt.Errorf("failed to kill %d pods of %d", len(errs), total)
	}

	// 更新job的状态计数
	job = job.DeepCopy()
	job.Status.Version++
	job.Status.Pending = pending
	job.Status.Running = running
	job.Status.Succeeded = succeeded
	job.Status.Failed = failed
	job.Status.Terminating = terminating
	job.Status.Unknown = unknown
	job.Status.TaskStatusCount = taskStatusCount

	// 更新运行持续时间
	job.Status.RunningDuration = &metav1.Duration{Duration: time.Since(jobInfo.Job.CreationTimestamp.Time)}

	// 更新job的状态
	if updateStatus != nil {
		if updateStatus(&job.Status) {
			job.Status.State.LastTransitionTime = metav1.Now()
			jobCondition := newCondition(job.Status.State.Phase, &job.Status.State.LastTransitionTime)
			job.Status.Conditions = append(job.Status.Conditions, jobCondition)
		}
	}

	// 执行删除插件
	if err := cc.pluginOnJobDelete(job); err != nil {
		return err
	}

	// 调用api更新job的状态
	newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{})
	if err != nil {
		return err
	}
	if e := cc.cache.Update(newJob); e != nil {
		return e
	}

	// 删除podgroup
	pgName := job.Name + "-" + string(job.UID)
	if err := cc.vcClient.SchedulingV1beta1().PodGroups(job.Namespace).Delete(context.TODO(), pgName, metav1.DeleteOptions{}); err != nil {
		if !apierrors.IsNotFound(err) {
			return err
		}
	}

	return nil
}
```

syncJob

syncJob主要对应创建pod的操作（缩容时也会删除多余的pod）。
流程：遍历task统计add/del的pod → 创建pod → 删除pod → 更新job状态

```go
func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateStatusFn) error {
	job := jobInfo.Job
	if jobInfo.Job.DeletionTimestamp != nil {
		return nil
	}
	// ...

	// 获取job所属queue的信息
	queueInfo, err := cc.GetQueueInfo(job.Spec.Queue)
	if err != nil {
		return err
	}

	var jobForwarding bool
	// ExtendClusters 这个属性没有找到介绍，好像只在这里用到了
	if len(queueInfo.Spec.ExtendClusters) != 0 {
		jobForwarding = true
		if len(job.Annotations) == 0 {
			job.Annotations = make(map[string]string)
		}
		job.Annotations[batch.JobForwardingKey] = "true"
		job, err = cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).Update(context.TODO(), job, metav1.UpdateOptions{})
		if err != nil {
			return err
		}
	}

	// 初始化job
	if !isInitiated(job) {
		// initiateJob中会更新job状态、调用add插件、更新podgroup
		if job, err = cc.initiateJob(job); err != nil {
			return err
		}
	} else {
		// initOnJobUpdate会调用add插件、更新podgroup
		if err = cc.initOnJobUpdate(job); err != nil {
			return err
		}
	}

	// ... 省略 queueInfo.Spec.ExtendClusters 的处理

	var syncTask bool
	pgName := job.Name + "-" + string(job.UID)
	if pg, _ := cc.pgLister.PodGroups(job.Namespace).Get(pgName); pg != nil {
		if pg.Status.Phase != "" && pg.Status.Phase != scheduling.PodGroupPending {
			syncTask = true
		}
		// ...
	}

	var jobCondition batch.JobCondition
	// 如果podgroup尚未被调度（phase为空或Pending），则只更新job状态并返回，不同步pod
	if !syncTask {
		if updateStatus != nil {
			if updateStatus(&job.Status) {
				job.Status.State.LastTransitionTime = metav1.Now()
				jobCondition = newCondition(job.Status.State.Phase, &job.Status.State.LastTransitionTime)
				job.Status.Conditions = append(job.Status.Conditions, jobCondition)
			}
		}
		newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{})
		// ...
		return nil
	}

	// ... 省略一些计数声明代码
	// ...

	waitCreationGroup := sync.WaitGroup{}
	// 遍历job中的task
	for _, ts := range job.Spec.Tasks {
		// ...
		var podToCreateEachTask []*v1.Pod
		// 根据副本数，创建缺失的pod；超出副本数的pod会留在pods中，随后被删除
		for i := 0; i < int(ts.Replicas); i++ {
			podName := fmt.Sprintf(jobhelpers.PodNameFmt, job.Name, name, i)
			if pod, found := pods[podName]; !found {
				newPod := createJobPod(job, tc, ts.TopologyPolicy, i, jobForwarding)
				if err := cc.pluginOnPodCreate(job, newPod); err != nil {
					return err
				}
				podToCreateEachTask = append(podToCreateEachTask, newPod)
				waitCreationGroup.Add(1)
			} else {
				delete(pods, podName)
				if pod.DeletionTimestamp != nil {
					atomic.AddInt32(&terminating, 1)
					continue
				}
				// 更新状态计数器
				classifyAndAddUpPodBaseOnPhase(pod, &pending, &running, &succeeded, &failed, &unknown)
				calcPodStatus(pod, taskStatusCount)
			}
		}
		// 统计需要创建和删除的pod
		podToCreate[ts.Name] = podToCreateEachTask
		for _, pod := range pods {
			podToDelete = append(podToDelete, pod)
		}
	}

	// 创建pod
	for taskName, podToCreateEachTask := range podToCreate {
		if len(podToCreateEachTask) == 0 {
			continue
		}
		go func(taskName string, podToCreateEachTask []*v1.Pod) {
			taskIndex := jobhelpers.GetTasklndexUnderJob(taskName, job)
			if job.Spec.Tasks[taskIndex].DependsOn != nil {
				// 统一判断依赖关系是否满足需求，不满足则不创建pod
				if !cc.waitDependsOnTaskMeetCondition(taskName, taskIndex, podToCreateEachTask, job) {
					for _, pod := range podToCreateEachTask {
						go func(pod *v1.Pod) {
							defer waitCreationGroup.Done()
						}(pod)
					}
					return
				}
			}

			// 执行创建
			for _, pod := range podToCreateEachTask {
				go func(pod *v1.Pod) {
					defer waitCreationGroup.Done()
					newPod, err := cc.kubeClient.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
					if err != nil && !apierrors.IsAlreadyExists(err) {
						appendError(&creationErrs, fmt.Errorf("failed to create pod %s, err: %#v", pod.Name, err))
					} else {
						classifyAndAddUpPodBaseOnPhase(newPod, &pending, &running, &succeeded, &failed, &unknown)
						calcPodStatus(pod, taskStatusCount)
					}
				}(pod)
			}
		}(taskName, podToCreateEachTask)
	}

	// 等待创建完成
	waitCreationGroup.Wait()

	if len(creationErrs) != 0 {
		return fmt.Errorf("failed to create %d pods of %d", len(creationErrs), len(podToCreate))
	}

	// 删除pod
	waitDeletionGroup := sync.WaitGroup{}
	waitDeletionGroup.Add(len(podToDelete))
	for _, pod := range podToDelete {
		go func(pod *v1.Pod) {
			defer waitDeletionGroup.Done()
			err := cc.deleteJobPod(job.Name, pod)
			if err != nil {
				appendError(&deletionErrs, err)
				cc.resyncTask(pod)
			} else {
				klog.V(3).Infof("Deleted Task <%s> of Job <%s/%s>",
					pod.Name, job.Namespace, job.Name)
				atomic.AddInt32(&terminating, 1)
			}
		}(pod)
	}
	// 等待删除完成
	waitDeletionGroup.Wait()

	if len(deletionErrs) != 0 {
		return fmt.Errorf("failed to delete %d pods of %d", len(deletionErrs), len(podToDelete))
	}

	job.Status = batch.JobStatus{
		State:               job.Status.State,
		Pending:             pending,
		Running:             running,
		Succeeded:           succeeded,
		Failed:              failed,
		Terminating:         terminating,
		Unknown:             unknown,
		Version:             job.Status.Version,
		MinAvailable:        job.Spec.MinAvailable,
		TaskStatusCount:     taskStatusCount,
		ControlledResources: job.Status.ControlledResources,
		Conditions:          job.Status.Conditions,
		RetryCount:          job.Status.RetryCount,
	}

	// 更新job状态
	if updateStatus != nil && updateStatus(&job.Status) {
		job.Status.State.LastTransitionTime = metav1.Now()
		jobCondition = newCondition(job.Status.State.Phase, &job.Status.State.LastTransitionTime)
		job.Status.Conditions = append(job.Status.Conditions, jobCondition)
	}

	// 调用api更新job状态
	newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{})
	if err != nil {
		klog.Errorf("Failed to update status of Job %v/%v: %v",
			job.Namespace, job.Name, err)
		return err
	}
	if e := cc.cache.Update(newJob); e != nil {
		return e
	}

	return nil
}
```

其他控制器

其他一些控制器因为逻辑比较简单，就不再从代码层面解析了。

- jobTemplate controller：监听vcjob和jobtemplate，用于更新jobtemplate状态中的JobDependsOnList，即有哪些vcjob依赖于该jobtemplate。jobTemplate被官方称为vcjob的“套壳”（jobTemplate.spec = vcjob.spec），目的是为了职责区分。
- gc controller：监听具有.spec.ttlSecondsAfterFinished属性的vcjob，ttl过期后则删除job。
http://www.hkea.cn/news/14298518/

相关文章:

  • 金华专业网站制作公司wordpress二次元网站
  • 淘宝网站建设没法上传怎么办好的产品设计网站
  • 天气预报网站开发山东建设监理协会继续教育网站
  • 福田做商城网站建设哪家便宜龙岗网站建设流程
  • 石家庄有学校交做网站和优化的吗wordpress被植入广告
  • 如何设计网站建设引导页这么做输入文字的网站
  • 鼓楼微网站开发深圳建设集团有限公司官网
  • 信息网站 模板创意设计公司官网
  • 微信公众号的网站企业营销是啥意思
  • 哪个域名注册网站好西安网站建设多少钱
  • 网站建设颜色注意事项电子商城网站建设参考文献
  • 网站建设介绍推广用语西安网站建立
  • 教学成果申报网站 化工专业建设买了网站模版怎么做
  • 成都专业网站营销北辰手机网站建设
  • 有创意的网站开发都匀网站建设公司
  • 网站目录权限设置东莞软文推广
  • 关于怎样把网站建设好的一些建议cms影视源码采集
  • 校园局域网站建设费用给个网站能用的2022
  • 做网站需要备案吗服装网站建设发展状况
  • 哪个网站代做ppt便宜长清网站建设
  • 石家庄集团公司网站建设wordpress主题怎么设置tdk
  • 内衣网站建设推广手机网站 微信平台
  • 做类似简书的网站排名
  • php企业网站 源码企业电子商务网站建设总结
  • 做建筑效果图最好的网站12380网站建设情况
  • 集团公司网站模板装修网页设计网站
  • 推荐佛山伦教网站设计网站空间和数据库空间
  • 泰州企业网站模板建站网站重大建设项目公开发布制度
  • 网站微信登录怎么做杭州比较好的软装设计公司
  • 企业网站建设好处中国制造网简介