This article explains how the Kubernetes Job Controller is constructed and how it reconciles Jobs, following the controller's source code step by step.
Without further ado, here is the complete construction code.
type JobController struct {
    kubeClient clientset.Interface
    podControl controller.PodControlInterface

    // To allow injection of updateJobStatus for testing.
    updateHandler func(job *batch.Job) error
    syncHandler   func(jobKey string) (bool, error)
    // podStoreSynced returns true if the pod store has been synced at least once.
    // Added as a member to the struct to allow injection for testing.
    podStoreSynced cache.InformerSynced
    // jobStoreSynced returns true if the job store has been synced at least once.
    // Added as a member to the struct to allow injection for testing.
    jobStoreSynced cache.InformerSynced

    // A TTLCache of pod creates/deletes each rc expects to see
    expectations controller.ControllerExpectationsInterface

    // A store of jobs
    jobLister batchv1listers.JobLister

    // A store of pods, populated by the podController
    podStore corelisters.PodLister

    // Jobs that need to be updated
    queue workqueue.RateLimitingInterface

    recorder record.EventRecorder
}

func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *JobController {
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(glog.Infof)
    // TODO: remove the wrapper when every clients have moved to use the clientset.
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")})

    if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
        metrics.RegisterMetricAndTrackRateLimiterUsage("job_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
    }

    jm := &JobController{
        kubeClient: kubeClient,
        podControl: controller.RealPodControl{
            KubeClient: kubeClient,
            Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
        },
        expectations: controller.NewControllerExpectations(),
        queue:        workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job"),
        recorder:     eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
    }

    jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    jm.enqueueController,
        UpdateFunc: jm.updateJob,
        DeleteFunc: jm.enqueueController,
    })
    jm.jobLister = jobInformer.Lister()
    jm.jobStoreSynced = jobInformer.Informer().HasSynced

    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    jm.addPod,
        UpdateFunc: jm.updatePod,
        DeleteFunc: jm.deletePod,
    })
    jm.podStore = podInformer.Lister()
    jm.podStoreSynced = podInformer.Informer().HasSynced

    jm.updateHandler = jm.updateJobStatus
    jm.syncHandler = jm.syncJob
    return jm
}
Construct the JobController and initialize its internal data, such as the rate-limited work queue;
Watch Pod and Job objects through the shared informers;
Register the podInformer add/update/delete event handlers;
Register the jobInformer add/update/delete event handlers (a sketch of how these handlers feed the queue follows this list);
Register updateHandler as updateJobStatus, which is used to update Job status;
Register syncHandler as syncJob, which processes the Jobs taken from the queue.
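The bodies of the add/delete handlers (enqueueController and friends) are not reproduced in this article. As a rough illustration of what such a handler does, here is a minimal, hand-written stand-in that converts an informer-delivered object into a "namespace/name" key and puts it on the work queue; the function name and the plain queue.Add call are assumptions for this sketch, not the verbatim upstream code.

package jobsketch

import (
    "fmt"

    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
)

// enqueue is a minimal stand-in for the controller's enqueueController: it
// turns an object delivered by the informer into a "<namespace>/<name>" key
// and adds that key to the rate-limited work queue.
func enqueue(queue workqueue.RateLimitingInterface, obj interface{}) {
    // DeletionHandlingMetaNamespaceKeyFunc also copes with the tombstone
    // objects that informers deliver on deletion.
    key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
        return
    }
    // Only the key is queued; the worker re-reads the latest Job from the
    // lister when it dequeues the key, so duplicate events collapse naturally.
    queue.Add(key)
}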
// Run the main goroutine responsible for watching and syncing jobs.
func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer jm.queue.ShutDown()

    glog.Infof("Starting job controller")
    defer glog.Infof("Shutting down job controller")

    if !controller.WaitForCacheSync("job", stopCh, jm.podStoreSynced, jm.jobStoreSynced) {
        return
    }

    for i := 0; i < workers; i++ {
        go wait.Until(jm.worker, time.Second, stopCh)
    }

    <-stopCh
}
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (jm *JobController) worker() {
    for jm.processNextWorkItem() {
    }
}

func (jm *JobController) processNextWorkItem() bool {
    key, quit := jm.queue.Get()
    if quit {
        return false
    }
    defer jm.queue.Done(key)

    forget, err := jm.syncHandler(key.(string))
    if err == nil {
        if forget {
            jm.queue.Forget(key)
        }
        return true
    }

    utilruntime.HandleError(fmt.Errorf("Error syncing job: %v", err))
    jm.queue.AddRateLimited(key)

    return true
}
WaitForCacheSync waits for the JobController's pod and job caches to be synced;
Start `workers` goroutines (5 in the default kube-controller-manager configuration); each runs worker in a wait.Until loop with a 1-second interval, over and over until stopCh is closed;
worker pulls a job key from the queue and calls syncJob on it. If syncJob succeeds and asks to forget the key, the key is Forgotten (i.e. the rate limiter stops tracking it); otherwise the key is re-added to the queue with rate limiting and will be synced again later. A sketch of how the controller is wired up and started follows.
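For context, this is roughly how the pieces fit together when starting the controller: build a client, create the shared informers, construct the JobController, start the informers, then call Run with a worker count. The snippet below is a simplified, hand-written sketch, assuming NewJobController and its dependencies are in scope; it is not the actual kube-controller-manager start-up code, and the kubeconfig handling is only illustrative.

package main

import (
    "time"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // Assumes a kubeconfig at the default location; in-cluster config also works.
    config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }
    kubeClient := kubernetes.NewForConfigOrDie(config)

    stopCh := make(chan struct{})
    defer close(stopCh)

    // The shared informer factory supplies the pod and job informers that the
    // constructor expects.
    factory := informers.NewSharedInformerFactory(kubeClient, 30*time.Second)
    jm := NewJobController(
        factory.Core().V1().Pods(),
        factory.Batch().V1().Jobs(),
        kubeClient,
    )

    // Start the informers first so the caches can sync, then run the workers.
    factory.Start(stopCh)
    jm.Run(5, stopCh) // 5 matches the default ConcurrentJobSyncs
}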
// syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
// concurrently with the same key.
func (jm *JobController) syncJob(key string) (bool, error) {
    startTime := time.Now()
    defer func() {
        glog.V(4).Infof("Finished syncing job %q (%v)", key, time.Now().Sub(startTime))
    }()

    ns, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return false, err
    }
    if len(ns) == 0 || len(name) == 0 {
        return false, fmt.Errorf("invalid job key %q: either namespace or name is missing", key)
    }
    sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
    if err != nil {
        if errors.IsNotFound(err) {
            glog.V(4).Infof("Job has been deleted: %v", key)
            jm.expectations.DeleteExpectations(key)
            return true, nil
        }
        return false, err
    }
    job := *sharedJob

    // if job was finished previously, we don't want to redo the termination
    if IsJobFinished(&job) {
        return true, nil
    }

    // retrieve the previous number of retry
    previousRetry := jm.queue.NumRequeues(key)

    // Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
    // and update the expectations after we've retrieved active pods from the store. If a new pod enters
    // the store after we've checked the expectation, the job sync is just deferred till the next relist.
    jobNeedsSync := jm.expectations.SatisfiedExpectations(key)

    pods, err := jm.getPodsForJob(&job)
    if err != nil {
        return false, err
    }

    activePods := controller.FilterActivePods(pods)
    active := int32(len(activePods))
    succeeded, failed := getStatus(pods)
    conditions := len(job.Status.Conditions)
    // job first start
    if job.Status.StartTime == nil {
        now := metav1.Now()
        job.Status.StartTime = &now
        // enqueue a sync to check if job past ActiveDeadlineSeconds
        if job.Spec.ActiveDeadlineSeconds != nil {
            glog.V(4).Infof("Job %s have ActiveDeadlineSeconds will sync after %d seconds",
                key, *job.Spec.ActiveDeadlineSeconds)
            jm.queue.AddAfter(key, time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second)
        }
    }

    var manageJobErr error
    jobFailed := false
    var failureReason string
    var failureMessage string

    jobHaveNewFailure := failed > job.Status.Failed

    // check if the number of failed jobs increased since the last syncJob
    if jobHaveNewFailure && (int32(previousRetry)+1 > *job.Spec.BackoffLimit) {
        jobFailed = true
        failureReason = "BackoffLimitExceeded"
        failureMessage = "Job has reach the specified backoff limit"
    } else if pastActiveDeadline(&job) {
        jobFailed = true
        failureReason = "DeadlineExceeded"
        failureMessage = "Job was active longer than specified deadline"
    }

    if jobFailed {
        errCh := make(chan error, active)
        jm.deleteJobPods(&job, activePods, errCh)
        select {
        case manageJobErr = <-errCh:
            if manageJobErr != nil {
                break
            }
        default:
        }

        // update status values accordingly
        failed += active
        active = 0
        job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, failureReason, failureMessage))
        jm.recorder.Event(&job, v1.EventTypeWarning, failureReason, failureMessage)
    } else {
        if jobNeedsSync && job.DeletionTimestamp == nil {
            active, manageJobErr = jm.manageJob(activePods, succeeded, &job)
        }
        completions := succeeded
        complete := false
        if job.Spec.Completions == nil {
            // This type of job is complete when any pod exits with success.
            // Each pod is capable of
            // determining whether or not the entire Job is done.  Subsequent pods are
            // not expected to fail, but if they do, the failure is ignored.  Once any
            // pod succeeds, the controller waits for remaining pods to finish, and
            // then the job is complete.
            if succeeded > 0 && active == 0 {
                complete = true
            }
        } else {
            // Job specifies a number of completions.  This type of job signals
            // success by having that number of successes.  Since we do not
            // start more pods than there are remaining completions, there should
            // not be any remaining active pods once this count is reached.
            if completions >= *job.Spec.Completions {
                complete = true
                if active > 0 {
                    jm.recorder.Event(&job, v1.EventTypeWarning, "TooManyActivePods", "Too many active pods running after completion count reached")
                }
                if completions > *job.Spec.Completions {
                    jm.recorder.Event(&job, v1.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached")
                }
            }
        }
        if complete {
            job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", ""))
            now := metav1.Now()
            job.Status.CompletionTime = &now
        }
    }

    forget := false
    // no need to update the job if the status hasn't changed since last time
    if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || len(job.Status.Conditions) != conditions {
        job.Status.Active = active
        job.Status.Succeeded = succeeded
        job.Status.Failed = failed

        if err := jm.updateHandler(&job); err != nil {
            return false, err
        }

        if jobHaveNewFailure && !IsJobFinished(&job) {
            // returning an error will re-enqueue Job after the backoff period
            return false, fmt.Errorf("failed pod(s) detected for job key %q", key)
        }

        forget = true
    }

    return forget, manageJobErr
}
Look up the Job in the indexer by its key. If it no longer exists, delete its entry from the expectations cache and finish, returning true. Otherwise continue.
Decide whether the Job is finished by checking for a Complete or Failed JobCondition. If it is finished, return true and stop; otherwise continue.
Call SatisfiedExpectations: if the ControlleeExpectations for this Job show that both the pending adds and dels are <= 0, or the expectations have not been updated for more than 5 minutes, it returns jobNeedsSync=true, meaning a manageJob pass is needed.
For a Job starting for the first time (StartTime == nil), set StartTime; if ActiveDeadlineSeconds is not nil, re-enqueue the Job key after ActiveDeadlineSeconds so the deadline can be checked.
Fetch all Pods managed by the Job, filter out the activePods, and compute the counts of active, succeeded, and failed Pods. If failed > job.Status.Failed, the Job has new failed Pods since the last sync and jobHaveNewFailure is true.
If jobHaveNewFailure and the queue's retry count for this Job plus one exceeds *job.Spec.BackoffLimit (default 6), the Job has hit BackoffLimitExceeded and is failed. If the time elapsed since the Job's StartTime is >= ActiveDeadlineSeconds, the Job is failed with DeadlineExceeded (see the deadline sketch after this list).
If the Job is failed, delete all of the previously filtered activePods concurrently (waiting on a sync.WaitGroup). On success, set failed += active, active = 0, and append a Failed condition.
If the Job is not failed, jobNeedsSync is true, and the Job's DeletionTimestamp is nil (it is not marked for deletion), call manageJob to add or delete the Job's Pods according to the policies described below.
If the Job is not failed and job.Spec.Completions is nil, this type of Job is complete when any Pod exits with success; so if succeeded > 0 && active == 0, the Job is complete.
If the Job is not failed and job.Spec.Completions is not nil, this type of Job signals success by reaching that number of successes; so if succeeded >= *job.Spec.Completions, the Job is complete.
If the Job is complete, append a Complete condition and set CompletionTime.
Finally, invoke updateJobStatus to update the Job status in etcd. If the update fails, return false and the Job is re-queued. If jobHaveNewFailure is true and the Job's conditions show it is not finished, also return false so the Job is re-queued after the backoff period.
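The pastActiveDeadline helper referenced above is not reproduced in this article. Based on the behavior described (compare elapsed time since StartTime with ActiveDeadlineSeconds), a minimal sketch against the batch/v1 types might look like the following; the function and package names are ours, and the body is an illustration rather than a verbatim copy of the upstream source.

package jobutil

import (
    "time"

    batch "k8s.io/api/batch/v1"
)

// pastActiveDeadlineSketch reports whether the Job has been active longer than
// its spec.activeDeadlineSeconds. A Job without a deadline, or one the
// controller has not yet started (StartTime == nil), can never exceed it.
func pastActiveDeadlineSketch(job *batch.Job) bool {
    if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil {
        return false
    }
    // Time elapsed since the controller recorded the Job's start.
    duration := time.Since(job.Status.StartTime.Time)
    allowed := time.Duration(*job.Spec.ActiveDeadlineSeconds) * time.Second
    return duration >= allowed
}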
// manageJob is the core method responsible for managing the number of running
// pods according to what is specified in the job.Spec.
// Does NOT modify <activePods>.
func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) {
    var activeLock sync.Mutex
    active := int32(len(activePods))
    parallelism := *job.Spec.Parallelism
    jobKey, err := controller.KeyFunc(job)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
        return 0, nil
    }

    var errCh chan error
    if active > parallelism {
        diff := active - parallelism
        errCh = make(chan error, diff)
        jm.expectations.ExpectDeletions(jobKey, int(diff))
        glog.V(4).Infof("Too many pods running job %q, need %d, deleting %d", jobKey, parallelism, diff)
        // Sort the pods in the order such that not-ready < ready, unscheduled
        // < scheduled, and pending < running. This ensures that we delete pods
        // in the earlier stages whenever possible.
        sort.Sort(controller.ActivePods(activePods))

        active -= diff
        wait := sync.WaitGroup{}
        wait.Add(int(diff))
        for i := int32(0); i < diff; i++ {
            go func(ix int32) {
                defer wait.Done()
                if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, job); err != nil {
                    defer utilruntime.HandleError(err)
                    // Decrement the expected number of deletes because the informer won't observe this deletion
                    glog.V(2).Infof("Failed to delete %v, decrementing expectations for job %q/%q", activePods[ix].Name, job.Namespace, job.Name)
                    jm.expectations.DeletionObserved(jobKey)
                    activeLock.Lock()
                    active++
                    activeLock.Unlock()
                    errCh <- err
                }
            }(i)
        }
        wait.Wait()

    } else if active < parallelism {
        wantActive := int32(0)
        if job.Spec.Completions == nil {
            // Job does not specify a number of completions.  Therefore, number active
            // should be equal to parallelism, unless the job has seen at least
            // once success, in which leave whatever is running, running.
            if succeeded > 0 {
                wantActive = active
            } else {
                wantActive = parallelism
            }
        } else {
            // Job specifies a specific number of completions.  Therefore, number
            // active should not ever exceed number of remaining completions.
            wantActive = *job.Spec.Completions - succeeded
            if wantActive > parallelism {
                wantActive = parallelism
            }
        }
        diff := wantActive - active
        if diff < 0 {
            utilruntime.HandleError(fmt.Errorf("More active than wanted: job %q, want %d, have %d", jobKey, wantActive, active))
            diff = 0
        }
        jm.expectations.ExpectCreations(jobKey, int(diff))
        errCh = make(chan error, diff)
        glog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff)

        active += diff
        wait := sync.WaitGroup{}

        // Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
        // and double with each successful iteration in a kind of "slow start".
        // This handles attempts to start large numbers of pods that would
        // likely all fail with the same error. For example a project with a
        // low quota that attempts to create a large number of pods will be
        // prevented from spamming the API service with the pod create requests
        // after one of its pods fails.  Conveniently, this also prevents the
        // event spam that those failures would generate.
        for batchSize := int32(integer.IntMin(int(diff), controller.SlowStartInitialBatchSize)); diff > 0; batchSize = integer.Int32Min(2*batchSize, diff) {
            errorCount := len(errCh)
            wait.Add(int(batchSize))
            for i := int32(0); i < batchSize; i++ {
                go func() {
                    defer wait.Done()
                    err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))
                    if err != nil && errors.IsTimeout(err) {
                        // Pod is created but its initialization has timed out.
                        // If the initialization is successful eventually, the
                        // controller will observe the creation via the informer.
                        // If the initialization fails, or if the pod keeps
                        // uninitialized for a long time, the informer will not
                        // receive any update, and the controller will create a new
                        // pod when the expectation expires.
                        return
                    }
                    if err != nil {
                        defer utilruntime.HandleError(err)
                        // Decrement the expected number of creates because the informer won't observe this pod
                        glog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name)
                        jm.expectations.CreationObserved(jobKey)
                        activeLock.Lock()
                        active--
                        activeLock.Unlock()
                        errCh <- err
                    }
                }()
            }
            wait.Wait()

            // any skipped pods that we never attempted to start shouldn't be expected.
            skippedPods := diff - batchSize
            if errorCount < len(errCh) && skippedPods > 0 {
                glog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for job %q/%q", skippedPods, job.Namespace, job.Name)
                active -= skippedPods
                for i := int32(0); i < skippedPods; i++ {
                    // Decrement the expected number of creates because the informer won't observe this pod
                    jm.expectations.CreationObserved(jobKey)
                }
                // The skipped pods will be retried later. The next controller resync will
                // retry the slow start process.
                break
            }
            diff -= batchSize
        }
    }

    select {
    case err := <-errCh:
        // all errors have been reported before, we only need to inform the controller that there was an error and it should re-try this job once more next time.
        if err != nil {
            return active, err
        }
    default:
    }

    return active, nil
}
If active > *job.Spec.Parallelism, the Job needs to scale down:
Compute diff = active - parallelism and call ExpectDeletions to record diff pending deletions for this Job in the ControllerExpectations.
Sort activePods so that not-ready < ready, unscheduled < scheduled, and pending < running, ensuring that Pods in earlier stages are deleted first whenever possible.
Update active (active -= diff) and delete those Pods concurrently, waiting on a sync.WaitGroup. If deleting a Pod fails, increment active by 1 and call DeletionObserved so the expected deletions are decremented by 1.
Return active.
If active < *job.Spec.Parallelism, the Job needs to scale up:
If job.Spec.Completions is nil and succeeded > 0, diff is 0; if job.Spec.Completions is nil and succeeded == 0, diff is parallelism - active; if job.Spec.Completions is not nil, diff is min(*job.Spec.Completions - succeeded, parallelism) - active, clamped to 0 if negative (a worked example follows after this list).
Call ExpectCreations to record diff pending creations for this Job in the ControllerExpectations.
Update active (active += diff) and create the Pods in batches, waiting on a sync.WaitGroup: the first batch creates 1 Pod (SlowStartInitialBatchSize is hard-coded to 1), the next 2, then 4, 8, 16, and so on, never exceeding the remaining diff; after each batch, diff is reduced by the batch size. If any creation in a batch fails, active and the expected creations are adjusted accordingly and the remaining batches are skipped.
If active == *job.Spec.Parallelism, simply return active.
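To make the scale-up arithmetic concrete, here is a small, self-contained sketch of the wantActive/diff computation, mirroring the branch of manageJob shown above; the helper name and the example numbers are ours, chosen purely for illustration.

package main

import "fmt"

// wantActivePods mirrors the scale-up branch of manageJob: it computes how
// many pods should be running given the Job's parallelism, completions
// (nil means "any single success completes the job"), and the current counts.
func wantActivePods(parallelism int32, completions *int32, succeeded, active int32) (wantActive, diff int32) {
    if completions == nil {
        if succeeded > 0 {
            wantActive = active // a pod already succeeded: leave what is running
        } else {
            wantActive = parallelism
        }
    } else {
        // Never run more pods than there are remaining completions,
        // and never more than parallelism.
        wantActive = *completions - succeeded
        if wantActive > parallelism {
            wantActive = parallelism
        }
    }
    diff = wantActive - active
    if diff < 0 {
        diff = 0
    }
    return wantActive, diff
}

func main() {
    completions := int32(5)
    // Example: completions=5, parallelism=3, 2 pods already succeeded,
    // 1 pod currently active -> wantActive = min(5-2, 3) = 3, diff = 2,
    // so manageJob would create 2 more pods.
    wantActive, diff := wantActivePods(3, &completions, 2, 1)
    fmt.Printf("wantActive=%d diff=%d\n", wantActive, diff)
}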