Openkruise/rollouts Source Code Walkthrough

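I recently came across the open-source project rollouts through work. It is a Kubernetes tool open-sourced by Alibaba.

Its main job is the batched release and rollback of workloads.

The currently supported workloads are: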

  • Deployment
  • CloneSet
  • StatefulSet

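It also supports traffic routing control for Nginx/ALB/Istio.

Normally, when we upgrade a workload (take a Deployment as an example), we change the image version of the Deployment, and the Deployment controller creates new replicas and replaces the old ones with them.
In that case, if the new replicas are Ready as processes but the business logic is not actually ready, the controller still goes on to update every remaining replica. Even if we notice the problem later and roll back, the blast radius is already very large.
So we need a window in which to change our minds: time for operators to check whether the business is healthy, and only then continue the gradual rollout, rather than going all in and then having to pull everything back out. This is exactly what rollouts does for us.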

Rollout state transitions

Basic usage

See the basic usage documentation.

Source code walkthrough

CRD design

Rollout

WorkloadRef defines the workload object we watch. When the PodTemplate of that object changes, the update strategy is executed.

// WorkloadRef holds a references to the Kubernetes object
type WorkloadRef struct {
	// API Version of the referent
	APIVersion string `json:"apiVersion"`
	// Kind of the referent
	Kind string `json:"kind"`
	// Name of the referent
	Name string `json:"name"`
}
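The Reconcile listing later passes a WorkloadRef to util.GetGVKFrom to decide which workload type to watch. As a rough idea of what such a conversion involves, here is a minimal sketch that assumes the usual Kubernetes convention that apiVersion is either "version" or "group/version". The names gvk and splitAPIVersion are hypothetical and are not the project's helpers.

```go
package main

import (
	"fmt"
	"strings"
)

// gvk is a hypothetical stand-in for a Kubernetes GroupVersionKind.
type gvk struct {
	Group, Version, Kind string
}

// splitAPIVersion parses "apps/v1" into group "apps" and version "v1".
// A bare "v1" belongs to the core group, whose name is the empty string.
func splitAPIVersion(apiVersion, kind string) (gvk, error) {
	parts := strings.Split(apiVersion, "/")
	switch {
	case len(parts) == 1 && parts[0] != "":
		return gvk{Version: parts[0], Kind: kind}, nil
	case len(parts) == 2 && parts[0] != "" && parts[1] != "":
		return gvk{Group: parts[0], Version: parts[1], Kind: kind}, nil
	default:
		return gvk{}, fmt.Errorf("unexpected apiVersion %q", apiVersion)
	}
}

func main() {
	refs := [][2]string{
		{"apps/v1", "Deployment"},
		{"apps.kruise.io/v1alpha1", "CloneSet"},
	}
	for _, ref := range refs {
		g, err := splitAPIVersion(ref[0], ref[1])
		fmt.Printf("%+v err=%v\n", g, err)
	}
}
```

The resulting group/version/kind triple is what a controller would use as the key for "have I already started watching this workload type".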

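CanaryStrategy defines the update strategy, and each CanaryStep defines what one step does: how many replicas to update, how long to pause once they are updated, and the traffic weight.
I won't go into the traffic-related parts here; interested readers can look at them on their own.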

// CanaryStrategy defines parameters for a Replica Based Canary
type CanaryStrategy struct {
	// Steps define the order of phases to execute release in batches(20%, 40%, 60%, 80%, 100%)
	// +optional
	Steps []CanaryStep `json:"steps,omitempty"`
	// TrafficRoutings hosts all the supported service meshes supported to enable more fine-grained traffic routing
	TrafficRoutings []*TrafficRouting `json:"trafficRoutings,omitempty"`
}

// CanaryStep defines a step of a canary workload.
type CanaryStep struct {
	// SetWeight sets what percentage of the canary pods should receive
	// +optional
	Weight *int32 `json:"weight,omitempty"`
	// Replicas is the number of expected canary pods in this batch
	// it can be an absolute number (ex: 5) or a percentage of total pods.
	Replicas *intstr.IntOrString `json:"replicas,omitempty"`
	// Pause defines a pause stage for a rollout, manual or auto
	// +optional
	Pause RolloutPause `json:"pause,omitempty"`
}
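The Replicas field accepts either an absolute number or a percentage of total pods. The sketch below shows one way such a value could be turned into a pod count. It works on plain strings instead of intstr.IntOrString to stay self-contained; resolveReplicas is a hypothetical helper, and rounding percentages up is an assumption of this sketch, not something the listing above confirms.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// resolveReplicas turns a step's replicas value ("5" or "20%") into a pod
// count for a workload with total replicas. Hypothetical helper.
func resolveReplicas(value string, total int) (int, error) {
	if strings.HasSuffix(value, "%") {
		pct, err := strconv.Atoi(strings.TrimSuffix(value, "%"))
		if err != nil {
			return 0, fmt.Errorf("invalid percentage %q: %w", value, err)
		}
		if pct < 0 || pct > 100 {
			return 0, fmt.Errorf("percentage %d out of range", pct)
		}
		// Integer ceiling of pct*total/100 (assumed rounding rule).
		return (pct*total + 99) / 100, nil
	}
	n, err := strconv.Atoi(value)
	if err != nil {
		return 0, fmt.Errorf("invalid replicas %q: %w", value, err)
	}
	if n < 0 {
		return 0, fmt.Errorf("replicas %d must not be negative", n)
	}
	return n, nil
}

func main() {
	for _, v := range []string{"1", "20%", "25%", "100%"} {
		n, err := resolveReplicas(v, 10)
		fmt.Printf("replicas=%s of 10 pods -> %d (err=%v)\n", v, n, err)
	}
}
```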

The part above defines the object we watch and the actions we take on it.
The part below defines the rollout's status; as the rollout runs, its state is recorded here.

type RolloutStatus struct {
	// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
	// Important: Run "make" to regenerate code after modifying this file

	// observedGeneration is the most recent generation observed for this Rollout.
	ObservedGeneration int64 `json:"observedGeneration,omitempty"`
	// CanaryRevision the hash of the canary pod template
	// +optional
	//CanaryRevision string `json:"canaryRevision,omitempty"`
	// StableRevision indicates the revision pods that has successfully rolled out
	StableRevision string `json:"stableRevision,omitempty"`
	// Conditions a list of conditions a rollout can have.
	// +optional
	Conditions []RolloutCondition `json:"conditions,omitempty"`
	// Canary describes the state of the canary rollout
	// +optional
	CanaryStatus *CanaryStatus `json:"canaryStatus,omitempty"`
	// +optional
	//BlueGreenStatus *BlueGreenStatus `json:"blueGreenStatus,omitempty"`
	// Phase is the rollout phase.
	Phase RolloutPhase `json:"phase,omitempty"`
	// Message provides details on why the rollout is in its current phase
	Message string `json:"message,omitempty"`
}

// RolloutCondition describes the state of a rollout at a certain point.
type RolloutCondition struct {
	// Type of rollout condition.
	Type RolloutConditionType `json:"type"`
	// Phase of the condition, one of True, False, Unknown.
	Status corev1.ConditionStatus `json:"status"`
	// The last time this condition was updated.
	LastUpdateTime metav1.Time `json:"lastUpdateTime,omitempty"`
	// Last time the condition transitioned from one status to another.
	LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"`
	// The reason for the condition's last transition.
	Reason string `json:"reason"`
	// A human readable message indicating details about the transition.
	Message string `json:"message"`
}

// CanaryStatus status fields that only pertain to the canary rollout
type CanaryStatus struct {
	// observedWorkloadGeneration is the most recent generation observed for this Rollout ref workload generation.
	ObservedWorkloadGeneration int64 `json:"observedWorkloadGeneration,omitempty"`
	// ObservedRolloutID will record the newest spec.RolloutID if status.canaryRevision equals to workload.updateRevision
	ObservedRolloutID string `json:"observedRolloutID,omitempty"`
	// RolloutHash from rollout.spec object
	RolloutHash string `json:"rolloutHash,omitempty"`
	// CanaryService holds the name of a service which selects pods with canary version and don't select any pods with stable version.
	CanaryService string `json:"canaryService"`
	// CanaryRevision is calculated by rollout based on podTemplateHash, and the internal logic flow uses
	// It may be different from rs podTemplateHash in different k8s versions, so it cannot be used as service selector label
	// +optional
	CanaryRevision string `json:"canaryRevision"`
	// pod template hash is used as service selector label
	PodTemplateHash string `json:"podTemplateHash"`
	// CanaryReplicas the numbers of canary revision pods
	CanaryReplicas int32 `json:"canaryReplicas"`
	// CanaryReadyReplicas the numbers of ready canary revision pods
	CanaryReadyReplicas int32 `json:"canaryReadyReplicas"`
	// CurrentStepIndex defines the current step of the rollout is on. If the current step index is null, the
	// controller will execute the rollout.
	// +optional
	CurrentStepIndex int32           `json:"currentStepIndex"`
	CurrentStepState CanaryStepState `json:"currentStepState"`
	Message          string          `json:"message,omitempty"`
	LastUpdateTime   *metav1.Time    `json:"lastUpdateTime,omitempty"`
}

The rollout as a whole goes through these phases:

const (
	// RolloutPhaseInitial indicates a rollout is Initial
	RolloutPhaseInitial RolloutPhase = "Initial"
	// RolloutPhaseHealthy indicates a rollout is healthy
	RolloutPhaseHealthy RolloutPhase = "Healthy"
	// RolloutPhaseProgressing indicates a rollout is not yet healthy but still making progress towards a healthy state
	RolloutPhaseProgressing RolloutPhase = "Progressing"
	// RolloutPhaseTerminating indicates a rollout is terminated
	RolloutPhaseTerminating RolloutPhase = "Terminating"
)

Each CanaryStep has these states:

const (
	CanaryStepStateUpgrade         CanaryStepState = "StepUpgrade"
	CanaryStepStateTrafficRouting  CanaryStepState = "StepTrafficRouting"
	CanaryStepStateMetricsAnalysis CanaryStepState = "StepMetricsAnalysis"
	CanaryStepStatePaused          CanaryStepState = "StepPaused"
	CanaryStepStateReady           CanaryStepState = "StepReady"
	CanaryStepStateCompleted       CanaryStepState = "Completed"
)

Each condition has one of these types, along with the reasons that go with it:

const (
	RolloutConditionProgressing RolloutConditionType = "Progressing"
	// Progressing Reason
	ProgressingReasonInitializing = "Initializing"
	ProgressingReasonInRolling    = "InRolling"
	ProgressingReasonFinalising   = "Finalising"
	ProgressingReasonSucceeded    = "Succeeded"
	ProgressingReasonCancelling   = "Cancelling"
	ProgressingReasonCanceled     = "Canceled"
	ProgressingReasonPaused       = "Paused"

	// Terminating condition
	RolloutConditionTerminating RolloutConditionType = "Terminating"
	// Terminating Reason
	TerminatingReasonInTerminating = "InTerminating"
	TerminatingReasonCompleted     = "Completed"
)

BatchRelease

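I won't say much about the BatchRelease Spec; it differs little from Rollout. The main difference lies in what Status records.

BatchReleaseCanaryStatus records the release state of each batch. CurrentBatchState has three states: Upgrading, Verifying, and Ready.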

type BatchReleaseCanaryStatus struct {
	// CurrentBatchState indicates the release state of the current batch.
	CurrentBatchState BatchReleaseBatchStateType `json:"batchState,omitempty"`
	// The current batch the rollout is working on/blocked, it starts from 0
	CurrentBatch int32 `json:"currentBatch"`
	// BatchReadyTime is the ready timestamp of the current batch or the last batch.
	// This field is updated once a batch ready, and the batches[x].pausedSeconds
	// relies on this field to calculate the real-time duration.
	BatchReadyTime *metav1.Time `json:"batchReadyTime,omitempty"`
	// UpdatedReplicas is the number of upgraded Pods.
	UpdatedReplicas int32 `json:"updatedReplicas,omitempty"`
	// UpdatedReadyReplicas is the number upgraded Pods that have a Ready Condition.
	UpdatedReadyReplicas int32 `json:"updatedReadyReplicas,omitempty"`
}

const (
	// UpgradingBatchState indicates that current batch is at upgrading pod state
	UpgradingBatchState BatchReleaseBatchStateType = "Upgrading"
	// VerifyingBatchState indicates that current batch is at verifying whether it's ready state
	VerifyingBatchState BatchReleaseBatchStateType = "Verifying"
	// ReadyBatchState indicates that current batch is at batch ready state
	ReadyBatchState BatchReleaseBatchStateType = "Ready"
)

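These two CRDs are the core of the project, and their state transitions make up the complete upgrade flow. Rollout controls the overall flow and is created by the user; BatchRelease carries out the actual release of each workload and is created by Rollout.

Code walkthrough

I won't cover main in detail. It mainly starts two controllers that manage the Rollout and BatchRelease CRDs, and a webhook that pauses the workload so that the native Kubernetes controller holds off on the upgrade for the time being.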

// main.go
func main() {
	...
	// Manager for Rollout
	mgr, err := ctrl.NewManager(cfg, ctrl.Options{
		Scheme:                 scheme,
		MetricsBindAddress:     metricsAddr,
		Port:                   9443,
		HealthProbeBindAddress: probeAddr,
		LeaderElection:         enableLeaderElection,
		LeaderElectionID:       "71ddec2c.kruise.io",
		NewClient:              utilclient.NewClient,
	})
	...
	// Controller for BatchRelease
	if err = br.Add(mgr); err != nil {
		setupLog.Error(err, "unable to create controller", "controller", "BatchRelease")
		os.Exit(1)
	}
	...
	// Webhook for workloads and rollouts
	if err = webhook.SetupWithManager(mgr); err != nil {
		setupLog.Error(err, "unable to setup webhook")
		os.Exit(1)
	}
	...
}

The Rollout reconcile process

type RolloutReconciler struct {
	client.Client
	Scheme *runtime.Scheme

	Recorder record.EventRecorder   // records events
	Finder   *util.ControllerFinder // finds the controller of the referenced workload
}

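Let's look at the main flow of the Rollout Reconcile in detail.
It can be summarized in the following steps:

  1. Pick up the Rollout change
  2. Obtain a dynamic watcher to watch for updates to the referenced workload
  3. Handle the finalizer: add it if the Rollout is newly created, remove it if the Rollout is to be deleted
  4. Update the rollout status; the main phase flow is RolloutPhaseInitial->RolloutPhaseHealthy->RolloutPhaseProgressing->RolloutPhaseHealthy->RolloutPhaseTerminating
  5. Handle the update flow in the RolloutPhaseProgressing phase
  6. Handle the termination flow in the RolloutPhaseTerminating phase
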
func (r *RolloutReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// 1. Pick up the Rollout change
	rollout := &rolloutv1alpha1.Rollout{}
	err := r.Get(context.TODO(), req.NamespacedName, rollout)
	if err != nil {
		if errors.IsNotFound(err) {
			return ctrl.Result{}, nil
		}
		return ctrl.Result{}, err
	}
	// 2. Obtain a dynamic watcher to watch for updates to the referenced workload
	workloadRef := rollout.Spec.ObjectRef.WorkloadRef
	workloadGVK := util.GetGVKFrom(workloadRef)
	_, exists := watchedWorkload.Load(workloadGVK.String())
	if workloadRef != nil && !exists {
		succeeded, err := util.AddWatcherDynamically(runtimeController, workloadHandler, workloadGVK)
		if err != nil {
			return ctrl.Result{}, err
		} else if succeeded {
			watchedWorkload.LoadOrStore(workloadGVK.String(), struct{}{})
			klog.Infof("Rollout controller begin to watch workload type: %s", workloadGVK.String())
			return ctrl.Result{}, nil
		}
	}
	// 3. Handle the finalizer
	err = r.handleFinalizer(rollout)
	if err != nil {
		return ctrl.Result{}, err
	}
	// 4. Update the rollout status
	done, err := r.updateRolloutStatus(rollout)
	if err != nil {
		return ctrl.Result{}, err
	} else if !done {
		return ctrl.Result{}, nil
	}
	var recheckTime *time.Time
	switch rollout.Status.Phase {
	case rolloutv1alpha1.RolloutPhaseProgressing:
		// 5. Handle the update flow in the RolloutPhaseProgressing phase
		recheckTime, err = r.reconcileRolloutProgressing(rollout)
	case rolloutv1alpha1.RolloutPhaseTerminating:
		// 6. Handle the termination flow in the RolloutPhaseTerminating phase
		recheckTime, err = r.reconcileRolloutTerminating(rollout)
	}
	if err != nil {
		return ctrl.Result{}, err
	} else if recheckTime != nil {
		return ctrl.Result{RequeueAfter: time.Until(*recheckTime)}, nil
	}
	return ctrl.Result{}, nil
}
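The body of handleFinalizer (step 3) is not shown in this article. The sketch below illustrates the general add-on-create, remove-on-delete pattern on a stripped-down object. The object type, the finalizer name, and the cleanupDone flag (standing in for "termination work has finished") are all assumptions made for illustration, not the project's code.

```go
package main

import "fmt"

// rolloutFinalizer is a placeholder name chosen for this sketch.
const rolloutFinalizer = "example.io/rollout-protection"

// object is a hypothetical, stripped-down view of a Kubernetes object.
type object struct {
	Deleting   bool // true once a deletion timestamp is set
	Finalizers []string
}

func hasFinalizer(o *object, name string) bool {
	for _, f := range o.Finalizers {
		if f == name {
			return true
		}
	}
	return false
}

// handleFinalizer adds the finalizer to a live object and removes it from
// a deleting object once cleanup is done. It reports whether o changed.
func handleFinalizer(o *object, cleanupDone bool) bool {
	has := hasFinalizer(o, rolloutFinalizer)
	switch {
	case !o.Deleting && !has:
		o.Finalizers = append(o.Finalizers, rolloutFinalizer)
		return true
	case o.Deleting && has && cleanupDone:
		kept := o.Finalizers[:0]
		for _, f := range o.Finalizers {
			if f != rolloutFinalizer {
				kept = append(kept, f)
			}
		}
		o.Finalizers = kept
		return true
	}
	return false
}

func main() {
	o := &object{}
	fmt.Println("created: changed =", handleFinalizer(o, false), o.Finalizers)
	o.Deleting = true
	fmt.Println("deleting, cleanup pending: changed =", handleFinalizer(o, false), o.Finalizers)
	fmt.Println("deleting, cleanup done: changed =", handleFinalizer(o, true), o.Finalizers)
}
```

Holding the finalizer until cleanup is done is what gives the controller a chance to run its Terminating flow before the API server actually removes the object.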

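Steps 1 to 3 are fairly simple, so I will skip them and go through steps 4, 5 and 6 in detail.

4. Updating the rollout status

Some of the code has been trimmed.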

func (r *RolloutReconciler) updateRolloutStatus(rollout *rolloutv1alpha1.Rollout) (done bool, err error) {
	...
	// 1. If the Rollout is being deleted, the status phase moves from whatever it was to
	// RolloutPhaseTerminating, and a RolloutConditionTerminating condition with status False is added.
	// Pay special attention to the Reason here, because the condition is itself one of the state-transition criteria.
	// 2. If the status phase is empty, it defaults to RolloutPhaseInitial.
	if !rollout.DeletionTimestamp.IsZero() && newStatus.Phase != rolloutv1alpha1.RolloutPhaseTerminating {
		newStatus.Phase = rolloutv1alpha1.RolloutPhaseTerminating
		cond := util.NewRolloutCondition(rolloutv1alpha1.RolloutConditionTerminating, corev1.ConditionFalse, rolloutv1alpha1.TerminatingReasonInTerminating, "Rollout is in terminating")
		util.SetRolloutCondition(&newStatus, *cond)
	} else if newStatus.Phase == "" {
		newStatus.Phase = rolloutv1alpha1.RolloutPhaseInitial
	}
	// 3. Fetch the workload referenced by the rollout; if it is not found, reset the status phase to RolloutPhaseInitial.
	workload, err := r.Finder.GetWorkloadForRef(rollout.Namespace, rollout.Spec.ObjectRef.WorkloadRef)
	if err != nil {
		klog.Errorf("rollout(%s/%s) get workload failed: %s", rollout.Namespace, rollout.Name, err.Error())
		return
	} else if workload == nil {
		if rollout.DeletionTimestamp.IsZero() {
			resetStatus(&newStatus)
			klog.Infof("rollout(%s/%s) workload not found, and reset status be Initial", rollout.Namespace, rollout.Name)
		}
		done = true
		return
	}
	...
	// 4. Phase transitions of the rollout status. Each reconcile performs one transition if its condition holds. The order is:
	// RolloutPhaseInitial --> RolloutPhaseHealthy
	// RolloutPhaseHealthy --> RolloutPhaseProgressing
	// RolloutPhaseProgressing --> RolloutPhaseHealthy
	// The Healthy --> Progressing transition sets a condition of Type RolloutConditionProgressing with Reason ProgressingReasonInitializing.
	// The Progressing --> Healthy transition requires the RolloutConditionProgressing condition to have Reason
	// ProgressingReasonSucceeded or ProgressingReasonCanceled, or no condition of Type RolloutConditionProgressing to exist.
	switch newStatus.Phase {
	case rolloutv1alpha1.RolloutPhaseInitial:
		klog.Infof("rollout(%s/%s) status phase from(%s) -> to(%s)", rollout.Namespace, rollout.Name, rolloutv1alpha1.RolloutPhaseInitial, rolloutv1alpha1.RolloutPhaseHealthy)
		newStatus.Phase = rolloutv1alpha1.RolloutPhaseHealthy
		newStatus.Message = "rollout is healthy"
	case rolloutv1alpha1.RolloutPhaseHealthy:
		if workload.InRolloutProgressing {
			// from healthy to progressing
			klog.Infof("rollout(%s/%s) status phase from(%s) -> to(%s)", rollout.Namespace, rollout.Name, rolloutv1alpha1.RolloutPhaseHealthy, rolloutv1alpha1.RolloutPhaseProgressing)
			newStatus.Phase = rolloutv1alpha1.RolloutPhaseProgressing
			cond := util.NewRolloutCondition(rolloutv1alpha1.RolloutConditionProgressing, corev1.ConditionFalse, rolloutv1alpha1.ProgressingReasonInitializing, "Rollout is in Progressing")
			util.SetRolloutCondition(&newStatus, *cond)
		} else if workload.IsInStable && newStatus.CanaryStatus == nil {
			newStatus.CanaryStatus = &rolloutv1alpha1.CanaryStatus{
				CanaryReplicas:             workload.CanaryReplicas,
				CanaryReadyReplicas:        workload.CanaryReadyReplicas,
				ObservedRolloutID:          getRolloutID(workload, rollout),
				ObservedWorkloadGeneration: workload.Generation,
				PodTemplateHash:            workload.PodTemplateHash,
				CanaryRevision:             workload.CanaryRevision,
				CurrentStepIndex:           int32(len(rollout.Spec.Strategy.Canary.Steps)),
				CurrentStepState:           rolloutv1alpha1.CanaryStepStateCompleted,
			}
			newStatus.Message = "workload deployment is completed"
		}
	case rolloutv1alpha1.RolloutPhaseProgressing:
		cond := util.GetRolloutCondition(newStatus, rolloutv1alpha1.RolloutConditionProgressing)
		if cond == nil || cond.Reason == rolloutv1alpha1.ProgressingReasonSucceeded || cond.Reason == rolloutv1alpha1.ProgressingReasonCanceled {
			newStatus.Phase = rolloutv1alpha1.RolloutPhaseHealthy
		}
	}
	done = true
	return
}
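Stripped of logging and status bookkeeping, the phase switch in updateRolloutStatus boils down to a small pure function. The following is a simplified sketch of that decision, assuming the workload was found; the phaseInput type and its field names are this sketch's own, and the strings mirror the constants listed earlier.

```go
package main

import "fmt"

type phase string

const (
	phaseInitial     phase = "Initial"
	phaseHealthy     phase = "Healthy"
	phaseProgressing phase = "Progressing"
	phaseTerminating phase = "Terminating"
)

// phaseInput gathers the facts the switch looks at (hypothetical type).
type phaseInput struct {
	Deleting           bool   // the Rollout has a deletion timestamp
	WorkloadInProgress bool   // the workload's pod template changed
	ProgressingReason  string // "" when no Progressing condition exists
}

// nextPhase returns the phase after one reconcile pass.
func nextPhase(cur phase, in phaseInput) phase {
	if in.Deleting {
		return phaseTerminating
	}
	switch cur {
	case "", phaseInitial:
		// An empty phase defaults to Initial, which then becomes Healthy.
		return phaseHealthy
	case phaseHealthy:
		if in.WorkloadInProgress {
			return phaseProgressing
		}
	case phaseProgressing:
		switch in.ProgressingReason {
		case "", "Succeeded", "Canceled":
			return phaseHealthy
		}
	}
	return cur
}

func main() {
	p := nextPhase("", phaseInput{})
	fmt.Println(p)
	p = nextPhase(p, phaseInput{WorkloadInProgress: true})
	fmt.Println(p)
	p = nextPhase(p, phaseInput{ProgressingReason: "InRolling"})
	fmt.Println(p)
	p = nextPhase(p, phaseInput{ProgressingReason: "Succeeded"})
	fmt.Println(p)
}
```

Keeping the decision free of side effects like this makes the phase logic easy to unit-test separately from the API calls.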

5. Handling the update flow in the RolloutPhaseProgressing phase

Once the rollout status phase is RolloutPhaseProgressing, subsequent state transitions are mainly changes to the Reason of the condition whose Type is RolloutConditionProgressing.

func (r *RolloutReconciler) reconcileRolloutProgressing(rollout *rolloutv1alpha1.Rollout) (*time.Time, error) {
	// 1. Get the condition of Type RolloutConditionProgressing, and the workload
	cond := util.GetRolloutCondition(rollout.Status, rolloutv1alpha1.RolloutConditionProgressing)
	workload, err := r.Finder.GetWorkloadForRef(rollout.Namespace, rollout.Spec.ObjectRef.WorkloadRef)
	if err != nil {
		return nil, err
	} else if workload == nil {
		return nil, nil
	} else if !workload.IsStatusConsistent {
		return nil, nil
	}
	var recheckTime *time.Time
	newStatus := rollout.Status.DeepCopy()
	// Condition Reason transitions. Each reconcile performs one transition if its condition holds. The order is:
	// ProgressingReasonInitializing --> ProgressingReasonInRolling
	// ProgressingReasonInRolling --> ProgressingReasonCancelling
	// ProgressingReasonInRolling --> ProgressingReasonPaused
	// ProgressingReasonInRolling --> ProgressingReasonInitializing
	// ProgressingReasonInRolling --> ProgressingReasonFinalising
	// ProgressingReasonFinalising --> ProgressingReasonSucceeded
	// ProgressingReasonPaused --> ProgressingReasonCancelling
	// ProgressingReasonPaused --> ProgressingReasonInRolling
	// ProgressingReasonCancelling --> ProgressingReasonCanceled
	switch cond.Reason {
	case rolloutv1alpha1.ProgressingReasonInitializing:
		newStatus.CanaryStatus = &rolloutv1alpha1.CanaryStatus{} // reset CanaryStatus
		done, _, err := r.doProgressingInitializing(rollout, newStatus)
		if done {
			progressingStateTransition(newStatus, corev1.ConditionFalse, rolloutv1alpha1.ProgressingReasonInRolling, "Rollout is in Progressing")
		}
	case rolloutv1alpha1.ProgressingReasonInRolling:
		recheckTime, err = r.doProgressingInRolling(rollout, workload, newStatus)
	case rolloutv1alpha1.ProgressingReasonFinalising:
		var done bool
		done, recheckTime, err = r.doFinalising(rollout, newStatus, true)
		if done {
			progressingStateTransition(newStatus, corev1.ConditionTrue, rolloutv1alpha1.ProgressingReasonSucceeded, "Rollout has been completed, and succeed")
		}
	case rolloutv1alpha1.ProgressingReasonPaused:
		if workload.IsInRollback {
			newStatus.CanaryStatus.CanaryRevision = workload.CanaryRevision
			r.Recorder.Eventf(rollout, corev1.EventTypeNormal, "Progressing", "workload has been rollback, then rollout is canceled")
			progressingStateTransition(newStatus, corev1.ConditionFalse, rolloutv1alpha1.ProgressingReasonCancelling, "The workload has been rolled back and the rollout process will be cancelled")
		} else if !rollout.Spec.Strategy.Paused {
			progressingStateTransition(newStatus, corev1.ConditionFalse, rolloutv1alpha1.ProgressingReasonInRolling, "")
		}
	case rolloutv1alpha1.ProgressingReasonCancelling:
		var done bool
		done, recheckTime, err = r.doFinalising(rollout, newStatus, false)
		if done {
			progressingStateTransition(newStatus, corev1.ConditionFalse, rolloutv1alpha1.ProgressingReasonCanceled, "")
		}
	case rolloutv1alpha1.ProgressingReasonSucceeded, rolloutv1alpha1.ProgressingReasonCanceled:
	}
	err = r.updateRolloutStatusInternal(rollout, *newStatus)
	if err != nil {
		return nil, err
	}
	return recheckTime, nil
}

The main state changes are:

  1. ProgressingReasonInitializing --> ProgressingReasonInRolling
  2. ProgressingReasonInRolling --> ProgressingReasonCancelling
  3. ProgressingReasonInRolling --> ProgressingReasonPaused
  4. ProgressingReasonInRolling --> ProgressingReasonInitializing
  5. ProgressingReasonInRolling --> ProgressingReasonFinalising
  6. ProgressingReasonFinalising --> ProgressingReasonSucceeded
  7. ProgressingReasonPaused --> ProgressingReasonCancelling
  8. ProgressingReasonPaused --> ProgressingReasonInRolling
  9. ProgressingReasonCancelling --> ProgressingReasonCanceled
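
The nine transitions above form a small directed graph. One compact way to reason about it is a lookup table, sketched below with the reason strings from the constants listed earlier. The transitions map and the canTransition and isTerminal helpers are illustrative names, not part of the rollouts code base.

```go
package main

import "fmt"

// transitions lists, for each Progressing reason, the reasons it may move to.
var transitions = map[string][]string{
	"Initializing": {"InRolling"},
	"InRolling":    {"Cancelling", "Paused", "Initializing", "Finalising"},
	"Finalising":   {"Succeeded"},
	"Paused":       {"Cancelling", "InRolling"},
	"Cancelling":   {"Canceled"},
}

// canTransition reports whether from --> to is one of the listed edges.
func canTransition(from, to string) bool {
	for _, next := range transitions[from] {
		if next == to {
			return true
		}
	}
	return false
}

// isTerminal reports whether a reason has no outgoing edge in the table.
// Reasons that are not in the table at all are also reported as terminal.
func isTerminal(reason string) bool {
	return len(transitions[reason]) == 0
}

func main() {
	fmt.Println(canTransition("InRolling", "Paused"))
	fmt.Println(canTransition("Paused", "Succeeded"))
	fmt.Println(isTerminal("Succeeded"), isTerminal("Canceled"))
}
```

Succeeded and Canceled are the only terminal reasons in this table, which matches the Progressing --> Healthy rule in updateRolloutStatus.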

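The ProgressingReasonInRolling reason is the core: it handles the situations a workload can run into during an update.

  1. If the workload needs to be rolled back, enter the ProgressingReasonCancelling state.
  2. If the workload needs to be paused, enter the ProgressingReasonPaused state.
  3. If the workload is released again while a rollout is still in progress (continuous release), delete the BatchRelease that was already created and re-enter the ProgressingReasonInitializing state.
  4. If the rollout itself has changed, recalculate which step of the plan we are currently on and update Status.CanaryStatus.
  5. If Status.CanaryStatus.CurrentStepState is CanaryStepStateCompleted, meaning the update is finished, enter the ProgressingReasonFinalising state.
  6. Otherwise, carry on with the normal update.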
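
The six cases above read like a priority list: the first one that applies wins. The sketch below expresses that as a single decision function. The input struct, its field names, and the action strings are hypothetical, and the check order simply follows the list; the article does not show doProgressingInRolling itself, so the real order may differ.

```go
package main

import "fmt"

// rollingInput collects the facts checked while the reason is InRolling.
type rollingInput struct {
	InRollback        bool // the workload is being rolled back
	Paused            bool // the rollout strategy is paused
	ContinuousRelease bool // a new revision arrived mid-rollout
	RolloutChanged    bool // the rollout plan itself was edited
	StepsCompleted    bool // CurrentStepState is Completed
}

type action string

const (
	actionCancel     action = "enter Cancelling"
	actionPause      action = "enter Paused"
	actionRestart    action = "delete BatchRelease, enter Initializing"
	actionRecalcStep action = "recalculate current step"
	actionFinalise   action = "enter Finalising"
	actionRunCanary  action = "run canary step"
)

// decide returns the first action whose precondition holds.
func decide(in rollingInput) action {
	switch {
	case in.InRollback:
		return actionCancel
	case in.Paused:
		return actionPause
	case in.ContinuousRelease:
		return actionRestart
	case in.RolloutChanged:
		return actionRecalcStep
	case in.StepsCompleted:
		return actionFinalise
	default:
		return actionRunCanary
	}
}

func main() {
	fmt.Println(decide(rollingInput{}))
	fmt.Println(decide(rollingInput{Paused: true, StepsCompleted: true}))
	fmt.Println(decide(rollingInput{InRollback: true, Paused: true}))
}
```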

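Next, let's look at how a normal update proceeds. The state changes now move to Status.CanaryStatus, which I will refer to simply as CanaryStatus from here on.

The core code is below. For readability I kept the comments and the core logic and removed the logging: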

// canary.go
func (r *rolloutContext) runCanary() error {
	canaryStatus := r.newStatus.CanaryStatus
	// init canary status
	if canaryStatus.CanaryRevision == "" {
		canaryStatus.CurrentStepState = rolloutv1alpha1.CanaryStepStateUpgrade
		canaryStatus.CanaryRevision = r.workload.CanaryRevision
		canaryStatus.CurrentStepIndex = 1
		canaryStatus.RolloutHash = r.rollout.Annotations[util.RolloutHashAnnotation]
	}
	// update canary status
	batch, err := r.batchControl.FetchBatchRelease()
	if err != nil {
		canaryStatus.CanaryReplicas = r.workload.CanaryReplicas
		canaryStatus.CanaryReadyReplicas = r.workload.CanaryReadyReplicas
	} else {
		canaryStatus.CanaryReplicas = batch.Status.CanaryStatus.UpdatedReplicas
		canaryStatus.CanaryReadyReplicas = batch.Status.CanaryStatus.UpdatedReadyReplicas
	}
	switch canaryStatus.CurrentStepState {
	case rolloutv1alpha1.CanaryStepStateUpgrade:
		done, err := r.doCanaryUpgrade()
		if err != nil {
			return err
		} else if done {
			canaryStatus.CurrentStepState = rolloutv1alpha1.CanaryStepStateTrafficRouting
			canaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()}
		}
	case rolloutv1alpha1.CanaryStepStateTrafficRouting:
		done, err := r.doCanaryTrafficRouting()
		if err != nil {
			return err
		} else if done {
			canaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()}
			canaryStatus.CurrentStepState = rolloutv1alpha1.CanaryStepStateMetricsAnalysis
		}
		expectedTime := time.Now().Add(time.Duration(defaultGracePeriodSeconds) * time.Second)
		r.recheckTime = &expectedTime
	case rolloutv1alpha1.CanaryStepStateMetricsAnalysis:
		done, err := r.doCanaryMetricsAnalysis()
		if err != nil {
			return err
		} else if done {
			canaryStatus.CurrentStepState = rolloutv1alpha1.CanaryStepStatePaused
		}
	case rolloutv1alpha1.CanaryStepStatePaused:
		done, err := r.doCanaryPaused()
		if err != nil {
			return err
		} else if done {
			canaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()}
			canaryStatus.CurrentStepState = rolloutv1alpha1.CanaryStepStateReady
		}
	case rolloutv1alpha1.CanaryStepStateReady:
		// run next step
		if len(r.rollout.Spec.Strategy.Canary.Steps) > int(canaryStatus.CurrentStepIndex) {
			canaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()}
			canaryStatus.CurrentStepIndex++
			canaryStatus.CurrentStepState = rolloutv1alpha1.CanaryStepStateUpgrade
		} else {
			canaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()}
			canaryStatus.CurrentStepState = rolloutv1alpha1.CanaryStepStateCompleted
		}
	// canary completed
	case rolloutv1alpha1.CanaryStepStateCompleted:
	}
	return nil
}

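Reading the code above, we can see that its main job is to drive the CanaryStatus state transitions.
As a preliminary step, the previous listing resets CanaryStatus to an empty value when the Reason is ProgressingReasonInitializing, so that it can be initialized here.

  1. Initialize CurrentStepState, CanaryRevision, CurrentStepIndex, and RolloutHash in CanaryStatus.
  2. Update CanaryReplicas and CanaryReadyReplicas in CanaryStatus.
  3. Transition CanaryStatus.CurrentStepState:
     • Once the workload upgrade for the current step is done: CanaryStepStateUpgrade --> CanaryStepStateTrafficRouting
     • CanaryStepStateTrafficRouting --> CanaryStepStateMetricsAnalysis --> CanaryStepStatePaused
     • Once the pause duration configured for the step has elapsed: CanaryStepStatePaused --> CanaryStepStateReady. If the duration is nil, the step has to be moved to CanaryStepStateReady manually.
     • If there are steps left to run: CanaryStepStateReady --> CanaryStepStateUpgrade, and the step index is incremented by 1
     • If all steps have run: CanaryStepStateReady --> CanaryStepStateCompleted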
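
The step-state transitions in runCanary can be condensed into a small state machine. The sketch below is a simplified model, not the project's code: advance and pauseElapsed are hypothetical names, done stands in for the result of the corresponding doCanary* call, and the pause check assumes the pause is measured from LastUpdateTime (doCanaryPaused itself is not shown in this article).

```go
package main

import (
	"fmt"
	"time"
)

type stepState string

const (
	stepUpgrade         stepState = "StepUpgrade"
	stepTrafficRouting  stepState = "StepTrafficRouting"
	stepMetricsAnalysis stepState = "StepMetricsAnalysis"
	stepPaused          stepState = "StepPaused"
	stepReady           stepState = "StepReady"
	stepCompleted       stepState = "Completed"
)

// advance returns the next state and step index. Steps are 1-based.
func advance(cur stepState, done bool, index, total int) (stepState, int) {
	switch cur {
	case stepUpgrade:
		if done {
			return stepTrafficRouting, index
		}
	case stepTrafficRouting:
		if done {
			return stepMetricsAnalysis, index
		}
	case stepMetricsAnalysis:
		if done {
			return stepPaused, index
		}
	case stepPaused:
		if done {
			return stepReady, index
		}
	case stepReady:
		if total > index {
			return stepUpgrade, index + 1
		}
		return stepCompleted, index
	}
	return cur, index
}

// pauseElapsed reports whether an automatic pause is over. A nil duration
// means the step waits for manual confirmation (assumed behaviour).
func pauseElapsed(lastUpdate time.Time, seconds *int32, now time.Time) bool {
	if seconds == nil {
		return false
	}
	return lastUpdate.Add(time.Duration(*seconds) * time.Second).Before(now)
}

// at is a tiny helper that builds a timestamp from a second offset.
func at(sec int64) time.Time { return time.Unix(sec, 0) }

func main() {
	state, index := stepUpgrade, 1
	for state != stepCompleted {
		state, index = advance(state, true, index, 2)
		fmt.Println(index, state)
	}
	wait := int32(60)
	fmt.Println(pauseElapsed(at(0), &wait, at(61)))
}
```

With two steps, the loop in main walks Upgrade, TrafficRouting, MetricsAnalysis, Paused and Ready twice before reaching Completed.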

The workload itself is updated in the CanaryStepStateUpgrade stage.
The core code is:

func (r *rolloutContext) doCanaryUpgrade() (bool, error) {
	// verify whether batchRelease configuration is the latest
	steps := len(r.rollout.Spec.Strategy.Canary.Steps)
	canaryStatus := r.newStatus.CanaryStatus
	isLatest, err := r.batchControl.Verify(canaryStatus.CurrentStepIndex)
	if err != nil {
		return false, err
	} else if !isLatest {
		return false, nil
	}
	// fetch batchRelease
	batch, err := r.batchControl.FetchBatchRelease()
	if err != nil {
		return false, err
	} else if batch.Status.ObservedReleasePlanHash != util.HashReleasePlanBatches(&batch.Spec.ReleasePlan) ||
		batch.Generation != batch.Status.ObservedGeneration {
		return false, nil
	}
	batchData := util.DumpJSON(batch.Status) // used by the log statements removed from this excerpt
	cond := util.GetRolloutCondition(*r.newStatus, rolloutv1alpha1.RolloutConditionProgressing)
	cond.Message = fmt.Sprintf("Rollout is in step(%d/%d), and upgrade workload new versions", canaryStatus.CurrentStepIndex, steps)
	r.newStatus.Message = cond.Message
	// promote workload next batch release
	if !batchrelease.IsPromoted(r.rollout, batch, r.workload.IsInRollback) {
		r.recorder.Eventf(r.rollout, corev1.EventTypeNormal, "Progressing", fmt.Sprintf("start upgrade step(%d) canary pods with new versions", canaryStatus.CurrentStepIndex))
		klog.Infof("rollout(%s/%s) will promote batch from(%d) -> to(%d)", r.rollout.Namespace, r.rollout.Name, *batch.Spec.ReleasePlan.BatchPartition+1, canaryStatus.CurrentStepIndex)
		return r.batchControl.Promote(canaryStatus.CurrentStepIndex, r.workload.IsInRollback, false)
	}
	// check whether batchRelease is ready
	if batch.Status.CanaryStatus.CurrentBatchState != rolloutv1alpha1.ReadyBatchState ||
		batch.Status.CanaryStatus.CurrentBatch+1 < canaryStatus.CurrentStepIndex {
		return false, nil
	}
	r.recorder.Eventf(r.rollout, corev1.EventTypeNormal, "Progressing", fmt.Sprintf("upgrade step(%d) canary pods with new versions done", canaryStatus.CurrentStepIndex))
	return true, nil
}
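
The final readiness check in doCanaryUpgrade compares CurrentBatch+1 with CurrentStepIndex because the two counters use different bases: CurrentBatch starts from 0 (per the BatchReleaseCanaryStatus comment), while runCanary initializes CurrentStepIndex to 1. A minimal sketch of just that check follows; batchReady is a hypothetical helper name, and the state is passed as a plain string.

```go
package main

import "fmt"

// batchReady reports whether the BatchRelease has finished the batch that
// corresponds to the rollout's current step. currentBatch is 0-based and
// stepIndex is 1-based, hence the +1.
func batchReady(batchState string, currentBatch, stepIndex int32) bool {
	if batchState != "Ready" {
		return false
	}
	return currentBatch+1 >= stepIndex
}

func main() {
	fmt.Println(batchReady("Ready", 0, 1))     // batch 0 serves step 1
	fmt.Println(batchReady("Ready", 0, 2))     // batch 0 is behind step 2
	fmt.Println(batchReady("Verifying", 1, 2)) // right batch, not ready yet
}
```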

To be continued

6. Handling the termination flow in the RolloutPhaseTerminating phase
