This article explains how the Kubernetes HPA Controller works. The walkthrough below follows the source code step by step, from how the controller is started to how it reconciles autoscalers, so let's dig in together.
The main code of the HorizontalPodAutoscaler (HPA for short) is listed below; only a few files are involved.
```
cmd/kube-controller-manager/app/autoscaling.go   // startup code for the HPA Controller

/pkg/controller/podautoscaler
.
├── BUILD
├── OWNERS
├── doc.go
├── horizontal.go              // core podautoscaler code, including its creation and run logic
├── horizontal_test.go
├── metrics
│   ├── BUILD
│   ├── metrics_client.go
│   ├── metrics_client_test.go
│   ├── metrics_client_test.go.orig
│   ├── metrics_client_test.go.rej
│   └── utilization.go
├── replica_calculator.go      // creation of the ReplicaCalculator, plus the methods that compute replicas from cpu/metrics
└── replica_calculator_test.go
```
Of these, horizontal.go and replica_calculator.go are the most important files; their structures are shown below:
horizontal.go
replica_calculator.go
Like the other controllers, the HPA Controller is initialized and started when kube-controller-manager starts, as the following code shows.
```go
cmd/kube-controller-manager/app/controllermanager.go:224

func newControllerInitializers() map[string]InitFunc {
	controllers := map[string]InitFunc{}
	...
	controllers["horizontalpodautoscaling"] = startHPAController
	...
	return controllers
}
```
kube-controller-manager initializes a whole batch of controllers at startup; for the HPA controller, startup is handed off to startHPAController.
```go
cmd/kube-controller-manager/app/autoscaling.go:29

func startHPAController(ctx ControllerContext) (bool, error) {
	...
	// The HPA Controller requires Heapster to be deployed in the cluster;
	// Heapster supplies the monitoring data used to compute replicas.
	metricsClient := metrics.NewHeapsterMetricsClient(
		hpaClient,
		metrics.DefaultHeapsterNamespace,
		metrics.DefaultHeapsterScheme,
		metrics.DefaultHeapsterService,
		metrics.DefaultHeapsterPort,
	)

	// Create the ReplicaCalculator, which is later used to compute the desired replicas.
	replicaCalc := podautoscaler.NewReplicaCalculator(metricsClient, hpaClient.Core())

	// Create the HPA Controller and start a goroutine running its Run method to begin work.
	go podautoscaler.NewHorizontalController(
		hpaClient.Core(),
		hpaClient.Extensions(),
		hpaClient.Autoscaling(),
		replicaCalc,
		ctx.Options.HorizontalPodAutoscalerSyncPeriod.Duration,
	).Run(ctx.Stop)

	return true, nil
}
```
First, let's look at how NewHorizontalController creates the HPA Controller.
```go
pkg/controller/podautoscaler/horizontal.go:112

func NewHorizontalController(evtNamespacer v1core.EventsGetter, scaleNamespacer unversionedextensions.ScalesGetter, hpaNamespacer unversionedautoscaling.HorizontalPodAutoscalersGetter, replicaCalc *ReplicaCalculator, resyncPeriod time.Duration) *HorizontalController {
	...
	// Build the HPA Controller.
	controller := &HorizontalController{
		replicaCalc:     replicaCalc,
		eventRecorder:   recorder,
		scaleNamespacer: scaleNamespacer,
		hpaNamespacer:   hpaNamespacer,
	}

	// Create the Informer, wiring up its ListWatch funcs and the event handlers that
	// watch for Add and Update events on HPA resources. newInformer is the core
	// entry point of the HPA code.
	store, frameworkController := newInformer(controller, resyncPeriod)
	controller.store = store
	controller.controller = frameworkController

	return controller
}
```
It's worth looking at the definition of the HorizontalController struct:
```go
pkg/controller/podautoscaler/horizontal.go:59

type HorizontalController struct {
	scaleNamespacer unversionedextensions.ScalesGetter
	hpaNamespacer   unversionedautoscaling.HorizontalPodAutoscalersGetter

	replicaCalc   *ReplicaCalculator
	eventRecorder record.EventRecorder

	// A store of HPA objects, populated by the controller.
	store cache.Store
	// Watches changes to all HPA objects.
	controller *cache.Controller
}
```
scaleNamespacer is essentially a ScaleInterface, exposing Get and Update for the Scale subresource.
hpaNamespacer is a HorizontalPodAutoscalerInterface, exposing Create, Update, UpdateStatus, Delete, Get, List, Watch, and related methods for HorizontalPodAutoscalers.
replicaCalc computes the desired replicas from the monitoring data provided by Heapster.
```go
pkg/controller/podautoscaler/replica_calculator.go:31

type ReplicaCalculator struct {
	metricsClient metricsclient.MetricsClient
	podsGetter    v1core.PodsGetter
}
```
store and controller: the controller watches HPA objects and keeps the store cache up to date.
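To make the store concrete, here is a small hand-written helper (not in the Kubernetes source) that could live inside the podautoscaler package; it simply reads back the HPA objects the informer has written into the cache.

```go
// listHPAs is a hypothetical helper showing that the store is a plain cache.Store:
// it holds whatever HPA objects the controller's list-watch has delivered so far.
func listHPAs(c *HorizontalController) []*autoscaling.HorizontalPodAutoscaler {
	objs := c.store.List()
	hpas := make([]*autoscaling.HorizontalPodAutoscaler, 0, len(objs))
	for _, obj := range objs {
		hpas = append(hpas, obj.(*autoscaling.HorizontalPodAutoscaler))
	}
	return hpas
}
```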
We mentioned the Scale subresource above, so what exactly is it? Let's look at the definition of Scale.
```go
pkg/apis/extensions/v1beta1/types.go:56

// represents a scaling request for a resource.
type Scale struct {
	metav1.TypeMeta `json:",inline"`
	// Standard object metadata; More info: http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata.
	// +optional
	v1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`

	// defines the behavior of the scale. More info: http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#spec-and-status.
	// +optional
	Spec ScaleSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`

	// current status of the scale. More info: http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#spec-and-status. Read-only.
	// +optional
	Status ScaleStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
}

// describes the attributes of a scale subresource
type ScaleSpec struct {
	// desired number of instances for the scaled object.
	Replicas int `json:"replicas,omitempty"`
}

// represents the current status of a scale subresource.
type ScaleStatus struct {
	// actual number of observed instances of the scaled object.
	Replicas int `json:"replicas"`

	// label query over pods that should match the replicas count.
	Selector map[string]string `json:"selector,omitempty"`
}
```
The Scale struct carries the request data for a single scale operation. Its Spec holds the desired replica count, while ScaleStatus holds the current replica count.
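To make that round trip concrete, here is a hand-written sketch (not from the source tree) of how the ScalesGetter interface mentioned above is used from inside the podautoscaler package; the helper name and its arguments are hypothetical, while the Get/Update call signatures mirror the ones reconcileAutoscaler uses later in this post.

```go
// setReplicas is a hypothetical helper: fetch the target's Scale subresource,
// put the desired replica count into its Spec, and write it back through the
// API Server. This is the same pattern reconcileAutoscaler follows when it
// decides to rescale.
func setReplicas(scales unversionedextensions.ScalesGetter, namespace, kind, name string, replicas int32) error {
	scale, err := scales.Scales(namespace).Get(kind, name) // Status.Replicas holds the current count
	if err != nil {
		return err
	}
	scale.Spec.Replicas = replicas // Spec.Replicas is the desired count
	_, err = scales.Scales(namespace).Update(kind, scale)
	return err
}
```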
Having seen the HorizontalController struct, let's look at newInformer, which NewHorizontalController calls. As noted in the comment above, newInformer is the core entry point of the HPA code.
```go
pkg/controller/podautoscaler/horizontal.go:75

func newInformer(controller *HorizontalController, resyncPeriod time.Duration) (cache.Store, *cache.Controller) {
	return cache.NewInformer(
		// The ListFunc and WatchFunc used to periodically list and watch HPA resources.
		&cache.ListWatch{
			ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
				return controller.hpaNamespacer.HorizontalPodAutoscalers(v1.NamespaceAll).List(options)
			},
			WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
				return controller.hpaNamespacer.HorizontalPodAutoscalers(v1.NamespaceAll).Watch(options)
			},
		},
		// The expected object type is HorizontalPodAutoscaler.
		&autoscaling.HorizontalPodAutoscaler{},
		// The periodic list (resync) interval.
		resyncPeriod,
		// Event handlers for HPA resources (AddFunc, UpdateFunc).
		cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				hpa := obj.(*autoscaling.HorizontalPodAutoscaler)
				hasCPUPolicy := hpa.Spec.TargetCPUUtilizationPercentage != nil
				_, hasCustomMetricsPolicy := hpa.Annotations[HpaCustomMetricsTargetAnnotationName]
				if !hasCPUPolicy && !hasCustomMetricsPolicy {
					controller.eventRecorder.Event(hpa, v1.EventTypeNormal, "DefaultPolicy", "No scaling policy specified - will use default one. See documentation for details")
				}
				// Reconcile the HPA against the current monitoring data.
				err := controller.reconcileAutoscaler(hpa)
				if err != nil {
					glog.Warningf("Failed to reconcile %s: %v", hpa.Name, err)
				}
			},
			UpdateFunc: func(old, cur interface{}) {
				hpa := cur.(*autoscaling.HorizontalPodAutoscaler)
				// Reconcile the HPA against the current monitoring data.
				err := controller.reconcileAutoscaler(hpa)
				if err != nil {
					glog.Warningf("Failed to reconcile %s: %v", hpa.Name, err)
				}
			},
			// We are not interested in deletions.
		},
	)
}
```
The newInformer code isn't long. In short, it configures the ListWatch funcs for the HPA resource and registers handler funcs for its Add and Update events.
Both handlers ultimately call reconcileAutoscaler to bring the HPA's data back in line.
In the code above, the HPA resource's ListWatch funcs are wired to the List and Watch methods defined by the HorizontalPodAutoscaler interface.
Wait a moment: after all this, we still haven't seen the definition of the HorizontalPodAutoscaler struct! Let's look at it now, since the HorizontalPodAutoscaler interface has just brought it up.
```go
pkg/apis/autoscaling/v1/types.go:76

// configuration of a horizontal pod autoscaler.
type HorizontalPodAutoscaler struct {
	metav1.TypeMeta `json:",inline"`
	// Standard object metadata. More info: http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata
	// +optional
	v1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`

	// behaviour of autoscaler. More info: http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#spec-and-status.
	// +optional
	Spec HorizontalPodAutoscalerSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`

	// current information about the autoscaler.
	// +optional
	Status HorizontalPodAutoscalerStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
}
```
Spec HorizontalPodAutoscalerSpec
Holds the HPA's declared scaling policy, i.e. what the user configures on the HPA object: the minimum replica count MinReplicas, the maximum replica count MaxReplicas, and the target average CPU utilization across all of the HPA's pods, expressed as a percentage, TargetCPUUtilizationPercentage. A populated example follows the struct definition below.
```go
pkg/apis/autoscaling/v1/types.go:36

// specification of a horizontal pod autoscaler.
type HorizontalPodAutoscalerSpec struct {
	// reference to scaled resource; horizontal pod autoscaler will learn the current resource consumption
	// and will set the desired number of pods by using its Scale subresource.
	ScaleTargetRef CrossVersionObjectReference `json:"scaleTargetRef" protobuf:"bytes,1,opt,name=scaleTargetRef"`
	// lower limit for the number of pods that can be set by the autoscaler, default 1.
	// +optional
	MinReplicas *int32 `json:"minReplicas,omitempty" protobuf:"varint,2,opt,name=minReplicas"`
	// upper limit for the number of pods that can be set by the autoscaler; cannot be smaller than MinReplicas.
	MaxReplicas int32 `json:"maxReplicas" protobuf:"varint,3,opt,name=maxReplicas"`
	// target average CPU utilization (represented as a percentage of requested CPU) over all the pods;
	// if not specified the default autoscaling policy will be used.
	// +optional
	TargetCPUUtilizationPercentage *int32 `json:"targetCPUUtilizationPercentage,omitempty" protobuf:"varint,4,opt,name=targetCPUUtilizationPercentage"`
}
```
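As a concrete illustration (hand-written, not taken from the Kubernetes source), here is what a populated HorizontalPodAutoscalerSpec looks like; the target name "my-rc" and the 2/10/80% numbers are made up, and the import path follows the tree analyzed in this post.

```go
package main

import (
	"fmt"

	autoscaling "k8s.io/kubernetes/pkg/apis/autoscaling/v1"
)

func main() {
	minReplicas := int32(2)
	targetCPU := int32(80)

	// Scale the ReplicationController "my-rc" between 2 and 10 replicas,
	// targeting an average CPU utilization of 80% of requested CPU.
	spec := autoscaling.HorizontalPodAutoscalerSpec{
		ScaleTargetRef: autoscaling.CrossVersionObjectReference{
			Kind:       "ReplicationController",
			Name:       "my-rc",
			APIVersion: "v1",
		},
		MinReplicas:                    &minReplicas,
		MaxReplicas:                    10,
		TargetCPUUtilizationPercentage: &targetCPU,
	}
	fmt.Printf("%+v\n", spec)
}
```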
Status HorizontalPodAutoscalerStatus
Holds the HPA's current state: the generation most recently observed by the autoscaler, ObservedGeneration; the timestamp of the last scale operation, LastScaleTime, used to throttle how often the pod count is changed; the current replica count CurrentReplicas; the desired replica count DesiredReplicas; and the current average CPU utilization across all of the HPA's pods, expressed as a percentage, CurrentCPUUtilizationPercentage.
```go
pkg/apis/autoscaling/v1/types.go:52

// current status of a horizontal pod autoscaler
type HorizontalPodAutoscalerStatus struct {
	// most recent generation observed by this autoscaler.
	// +optional
	ObservedGeneration *int64 `json:"observedGeneration,omitempty" protobuf:"varint,1,opt,name=observedGeneration"`

	// last time the HorizontalPodAutoscaler scaled the number of pods;
	// used by the autoscaler to control how often the number of pods is changed.
	// +optional
	LastScaleTime *metav1.Time `json:"lastScaleTime,omitempty" protobuf:"bytes,2,opt,name=lastScaleTime"`

	// current number of replicas of pods managed by this autoscaler.
	CurrentReplicas int32 `json:"currentReplicas" protobuf:"varint,3,opt,name=currentReplicas"`

	// desired number of replicas of pods managed by this autoscaler.
	DesiredReplicas int32 `json:"desiredReplicas" protobuf:"varint,4,opt,name=desiredReplicas"`

	// current average CPU utilization over all pods, represented as a percentage of requested CPU,
	// e.g. 70 means that an average pod is using now 70% of its requested CPU.
	// +optional
	CurrentCPUUtilizationPercentage *int32 `json:"currentCPUUtilizationPercentage,omitempty" protobuf:"varint,5,opt,name=currentCPUUtilizationPercentage"`
}
```
As the newInformer code shows, whether the event on an HPA resource is an Add or an Update, reconcileAutoscaler is ultimately called to drive the update of the HorizontalPodAutoscaler's data.
```go
pkg/controller/podautoscaler/horizontal.go:272

func (a *HorizontalController) reconcileAutoscaler(hpa *autoscaling.HorizontalPodAutoscaler) error {
	...
	// Fetch the scale subresource of the target resource.
	scale, err := a.scaleNamespacer.Scales(hpa.Namespace).Get(hpa.Spec.ScaleTargetRef.Kind, hpa.Spec.ScaleTargetRef.Name)
	...
	// The current replica count.
	currentReplicas := scale.Status.Replicas

	cpuDesiredReplicas := int32(0)
	cpuCurrentUtilization := new(int32)
	cpuTimestamp := time.Time{}

	cmDesiredReplicas := int32(0)
	cmMetric := ""
	cmStatus := ""
	cmTimestamp := time.Time{}

	desiredReplicas := int32(0)
	rescaleReason := ""
	timestamp := time.Now()

	rescale := true

	if scale.Spec.Replicas == 0 {
		// A spec replica count of 0 means autoscaling is disabled for this resource; do not scale.
		desiredReplicas = 0
		rescale = false
	} else if currentReplicas > hpa.Spec.MaxReplicas {
		// The desired replica count must not exceed the maximum configured in the HPA.
		rescaleReason = "Current number of replicas above Spec.MaxReplicas"
		desiredReplicas = hpa.Spec.MaxReplicas
	} else if hpa.Spec.MinReplicas != nil && currentReplicas < *hpa.Spec.MinReplicas {
		// The desired replica count must not fall below the configured minimum.
		rescaleReason = "Current number of replicas below Spec.MinReplicas"
		desiredReplicas = *hpa.Spec.MinReplicas
	} else if currentReplicas == 0 {
		// The desired replica count must be at least 1.
		rescaleReason = "Current number of replicas must be greater than 0"
		desiredReplicas = 1
	} else {
		// The current replica count is between Min and Max, so compute the desired count
		// from the cpu and/or custom metrics data (if the corresponding annotation is present).
		// All basic scenarios covered, the state should be sane, lets use metrics.
		cmAnnotation, cmAnnotationFound := hpa.Annotations[HpaCustomMetricsTargetAnnotationName]

		if hpa.Spec.TargetCPUUtilizationPercentage != nil || !cmAnnotationFound {
			// Compute the desired replica count from CPU utilization.
			cpuDesiredReplicas, cpuCurrentUtilization, cpuTimestamp, err = a.computeReplicasForCPUUtilization(hpa, scale)
			if err != nil {
				// Record the current replica count in the HPA status.
				a.updateCurrentReplicasInStatus(hpa, currentReplicas)
				return fmt.Errorf("failed to compute desired number of replicas based on CPU utilization for %s: %v", reference, err)
			}
		}

		if cmAnnotationFound {
			// Compute the desired replica count from custom metrics data.
			cmDesiredReplicas, cmMetric, cmStatus, cmTimestamp, err = a.computeReplicasForCustomMetrics(hpa, scale, cmAnnotation)
			if err != nil {
				// Record the current replica count in the HPA status.
				a.updateCurrentReplicasInStatus(hpa, currentReplicas)
				return fmt.Errorf("failed to compute desired number of replicas based on Custom Metrics for %s: %v", reference, err)
			}
		}

		// Take the larger of the cpu-based and custom-metric-based values as the final
		// desired replicas, and keep it within the min/max range.
		rescaleMetric := ""
		if cpuDesiredReplicas > desiredReplicas {
			desiredReplicas = cpuDesiredReplicas
			timestamp = cpuTimestamp
			rescaleMetric = "CPU utilization"
		}
		if cmDesiredReplicas > desiredReplicas {
			desiredReplicas = cmDesiredReplicas
			timestamp = cmTimestamp
			rescaleMetric = cmMetric
		}
		if desiredReplicas > currentReplicas {
			rescaleReason = fmt.Sprintf("%s above target", rescaleMetric)
		}
		if desiredReplicas < currentReplicas {
			rescaleReason = "All metrics below target"
		}

		if hpa.Spec.MinReplicas != nil && desiredReplicas < *hpa.Spec.MinReplicas {
			desiredReplicas = *hpa.Spec.MinReplicas
		}

		// never scale down to 0, reserved for disabling autoscaling
		if desiredReplicas == 0 {
			desiredReplicas = 1
		}

		if desiredReplicas > hpa.Spec.MaxReplicas {
			desiredReplicas = hpa.Spec.MaxReplicas
		}

		// Do not upscale too much to prevent incorrect rapid increase of the number of master replicas caused by
		// bogus CPU usage report from heapster/kubelet (like in issue #32304).
		if desiredReplicas > calculateScaleUpLimit(currentReplicas) {
			desiredReplicas = calculateScaleUpLimit(currentReplicas)
		}

		// Compare currentReplicas with desiredReplicas, and check whether the time since the last
		// scale satisfies the configured windows, to decide whether a rescale is needed now.
		rescale = shouldScale(hpa, currentReplicas, desiredReplicas, timestamp)
	}

	if rescale {
		scale.Spec.Replicas = desiredReplicas
		// Call the ScaleInterface's Update to have the API Server update the resource's scale
		// subresource. This ultimately modifies the replicas of the target rc or deployment;
		// the rc or deployment controller then scales up or down until the replica count
		// reaches the new desired value.
		_, err = a.scaleNamespacer.Scales(hpa.Namespace).Update(hpa.Spec.ScaleTargetRef.Kind, scale)
		if err != nil {
			a.eventRecorder.Eventf(hpa, v1.EventTypeWarning, "FailedRescale", "New size: %d; reason: %s; error: %v", desiredReplicas, rescaleReason, err.Error())
			return fmt.Errorf("failed to rescale %s: %v", reference, err)
		}
		a.eventRecorder.Eventf(hpa, v1.EventTypeNormal, "SuccessfulRescale", "New size: %d; reason: %s", desiredReplicas, rescaleReason)
		glog.Infof("Successfull rescale of %s, old size: %d, new size: %d, reason: %s", hpa.Name, currentReplicas, desiredReplicas, rescaleReason)
	} else {
		desiredReplicas = currentReplicas
	}

	// Update the HPA resource's status.
	return a.updateStatus(hpa, currentReplicas, desiredReplicas, cpuCurrentUtilization, cmStatus, rescale)
}
```
The reconcileAutoscaler code above is important; everything I wanted to point out is in the inline comments. computeReplicasForCPUUtilization and computeReplicasForCustomMetrics deserve a closer look, since these two methods embody the HPA algorithm. The actual calculation is implemented in pkg/controller/podautoscaler/replica_calculator.go:45#GetResourceReplicas and pkg/controller/podautoscaler/replica_calculator.go:153#GetMetricReplicas:
pkg/controller/podautoscaler/replica_calculator.go:45#GetResourceReplicas computes the desired replica count from the CPU utilization data provided by Heapster.
pkg/controller/podautoscaler/replica_calculator.go:153#GetMetricReplicas computes the desired replica count from the custom raw metric data provided by Heapster.
I'll write a separate post later analyzing the HPA algorithm's source code; follow along if you're interested (for most readers there's no need, unless you have to customize the HPA algorithm).
All in all, after the desired replicas are computed separately from the CPU data and the custom metric data, the larger of the two values is taken, capped at the configured MaxReplicas.
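The heart of that calculation is a simple proportional rule. The sketch below is my simplified rendering of the idea behind GetResourceReplicas, not the actual implementation; the real code also handles pods with missing metrics, unready pods, and a tolerance band around the target before deciding to scale.

```go
package main

import (
	"fmt"
	"math"
)

// desiredReplicas is a simplified sketch of the proportional calculation:
// scale the current replica count by the ratio of observed to target utilization,
// rounding up so the load per pod does not exceed the target.
func desiredReplicas(currentReplicas, currentUtilization, targetUtilization int32) int32 {
	usageRatio := float64(currentUtilization) / float64(targetUtilization)
	return int32(math.Ceil(usageRatio * float64(currentReplicas)))
}

func main() {
	// 4 replicas averaging 90% CPU against an 80% target -> 5 replicas.
	fmt.Println(desiredReplicas(4, 90, 80))
}
```

The result then goes through the min/max clamping and the scale-up limit shown in reconcileAutoscaler above.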
Hold on: computing the desired replicas isn't enough by itself. shouldScale also checks whether enough time has passed since the last scaling operation:
Two consecutive scale-downs must be at least 5 minutes apart.
Two consecutive scale-ups must be at least 3 minutes apart.
The shouldScale code is as follows:
```go
pkg/controller/podautoscaler/horizontal.go:387

...
var downscaleForbiddenWindow = 5 * time.Minute
var upscaleForbiddenWindow = 3 * time.Minute
...

func shouldScale(hpa *autoscaling.HorizontalPodAutoscaler, currentReplicas, desiredReplicas int32, timestamp time.Time) bool {
	if desiredReplicas == currentReplicas {
		return false
	}

	if hpa.Status.LastScaleTime == nil {
		return true
	}

	// Going down only if the usageRatio dropped significantly below the target
	// and there was no rescaling in the last downscaleForbiddenWindow.
	if desiredReplicas < currentReplicas && hpa.Status.LastScaleTime.Add(downscaleForbiddenWindow).Before(timestamp) {
		return true
	}

	// Going up only if the usage ratio increased significantly above the target
	// and there was no rescaling in the last upscaleForbiddenWindow.
	if desiredReplicas > currentReplicas && hpa.Status.LastScaleTime.Add(upscaleForbiddenWindow).Before(timestamp) {
		return true
	}

	return false
}
```
Only when this condition is satisfied does the controller go on to call the Scales.Update interface against the API Server, setting the replicas of the RC behind the Scale. Taking the rc controller as an example (the deployment controller is much the same), the API Server's implementation behind Scales.Update looks like this:
```go
pkg/registry/core/rest/storage_core.go:91

func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
	...
	if autoscalingGroupVersion := (schema.GroupVersion{Group: "autoscaling", Version: "v1"}); registered.IsEnabledVersion(autoscalingGroupVersion) {
		apiGroupInfo.SubresourceGroupVersionKind["replicationcontrollers/scale"] = autoscalingGroupVersion.WithKind("Scale")
	}
	...
	restStorageMap := map[string]rest.Storage{
		...
		"replicationControllers":        controllerStorage.Controller,
		"replicationControllers/status": controllerStorage.Status,
		...
	}
	return restStorage, apiGroupInfo, nil
}

pkg/registry/core/controller/etcd/etcd.go:124

func (r *ScaleREST) Update(ctx api.Context, name string, objInfo rest.UpdatedObjectInfo) (runtime.Object, bool, error) {
	rc, err := r.registry.GetController(ctx, name, &metav1.GetOptions{})
	if err != nil {
		return nil, false, errors.NewNotFound(autoscaling.Resource("replicationcontrollers/scale"), name)
	}

	oldScale := scaleFromRC(rc)

	obj, err := objInfo.UpdatedObject(ctx, oldScale)
	if err != nil {
		return nil, false, err
	}
	if obj == nil {
		return nil, false, errors.NewBadRequest("nil update passed to Scale")
	}
	scale, ok := obj.(*autoscaling.Scale)
	if !ok {
		return nil, false, errors.NewBadRequest(fmt.Sprintf("wrong object passed to Scale update: %v", obj))
	}

	if errs := validation.ValidateScale(scale); len(errs) > 0 {
		return nil, false, errors.NewInvalid(autoscaling.Kind("Scale"), scale.Name, errs)
	}

	// Set the rc's spec.replicas to the desired replica count carried in the Scale.
	rc.Spec.Replicas = scale.Spec.Replicas
	rc.ResourceVersion = scale.ResourceVersion
	// Persist the update to etcd.
	rc, err = r.registry.UpdateController(ctx, rc)
	if err != nil {
		return nil, false, err
	}
	return scaleFromRC(rc), false, nil
}
```
Anyone familiar with the Kubernetes rc controller knows what happens next: once the rc's replicas field is modified, the rc controller observes the change via its watch and creates or destroys the corresponding number of replicas, eventually bringing the replica count to the desired value computed by the HPA. In other words, it is the rc controller that ultimately performs the actual scale up or scale down.
Finally, let's look at the HorizontalController's Run method:
```go
pkg/controller/podautoscaler/horizontal.go:130

func (a *HorizontalController) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	glog.Infof("Starting HPA Controller")

	go a.controller.Run(stopCh)

	<-stopCh
	glog.Infof("Shutting down HPA Controller")
}
```
It's very simple: it just runs the list-watch of HPA resources and writes changes into the corresponding store (cache).
The HPA resource sync period is set with --horizontal-pod-autoscaler-sync-period and defaults to 30s. Because the informer resyncs at this interval, UpdateFunc (and therefore reconcileAutoscaler) runs for every HPA at least once per period even when nothing has changed.