From 3695ba28574f5a304c944d772ec46d1b84041c25 Mon Sep 17 00:00:00 2001 From: youngLiuHY Date: Mon, 1 Sep 2025 11:17:49 +0800 Subject: [PATCH 1/9] feat : rollback init --- go.mod | 2 +- go.sum | 18 + pkg/controllers/rolloutrun/control/control.go | 87 ++++ pkg/controllers/rolloutrun/executor/alias.go | 20 +- pkg/controllers/rolloutrun/executor/batch.go | 4 +- .../rolloutrun/executor/context.go | 114 +++++ .../rolloutrun/executor/default.go | 70 ++- .../rolloutrun/executor/do_command.go | 6 +- .../rolloutrun/executor/rollback.go | 427 ++++++++++++++++++ pkg/workload/collaset/release.go | 18 +- pkg/workload/info.go | 51 +++ pkg/workload/interface.go | 12 + pkg/workload/statefulset/release.go | 47 +- 13 files changed, 852 insertions(+), 24 deletions(-) create mode 100644 pkg/controllers/rolloutrun/executor/rollback.go diff --git a/go.mod b/go.mod index 0920557..308658c 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( k8s.io/klog/v2 v2.130.1 k8s.io/kubernetes v1.22.2 k8s.io/utils v0.0.0-20241210054802-24370beab758 - kusionstack.io/kube-api v0.7.4-0.20250727122744-2399b387a919 + kusionstack.io/kube-api v0.7.4-0.20250828082633-8a73d98e9bf8 kusionstack.io/kube-utils v0.2.1-0.20250613035327-11e9cdaec9d6 kusionstack.io/resourceconsist v0.0.2 sigs.k8s.io/controller-runtime v0.21.0 diff --git a/go.sum b/go.sum index 8b90bb3..3665ced 100644 --- a/go.sum +++ b/go.sum @@ -1039,6 +1039,24 @@ kusionstack.io/kube-api v0.7.3 h1:Fxj/E+aixUuCB0VekEvEx6ss24Jke7MvcbRwAXwVYBo= kusionstack.io/kube-api v0.7.3/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= kusionstack.io/kube-api v0.7.4-0.20250727122744-2399b387a919 h1:EMcsFMNMZO3oW7pqGOZd1nJC6YyQSt2RrcGdpC1J1gU= kusionstack.io/kube-api v0.7.4-0.20250727122744-2399b387a919/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= +kusionstack.io/kube-api v0.7.4-0.20250825051758-ecb7ab4bd47e h1:VK9EoO8P+ugxQE8sgCjUsTdVI90HsLJq8xAneSzhl7E= +kusionstack.io/kube-api v0.7.4-0.20250825051758-ecb7ab4bd47e/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= +kusionstack.io/kube-api v0.7.4-0.20250825063046-b78dd5a27a1d h1:eW004SEhcfGC/oGz2MQrZa88oDEMEES5Di9uP24Wp/g= +kusionstack.io/kube-api v0.7.4-0.20250825063046-b78dd5a27a1d/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= +kusionstack.io/kube-api v0.7.4-0.20250825073953-1db5a342fb25 h1:tjRlORACDHvIMudOp4ixEuz9MUzk+7Im4VDA5gE3mCc= +kusionstack.io/kube-api v0.7.4-0.20250825073953-1db5a342fb25/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= +kusionstack.io/kube-api v0.7.4-0.20250825080129-95c4ef2f91f2 h1:tiS+ABL0do2ccK+8tu/LuAf1p9Wt+Nvh+SAscUg5yKs= +kusionstack.io/kube-api v0.7.4-0.20250825080129-95c4ef2f91f2/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= +kusionstack.io/kube-api v0.7.4-0.20250825084555-df5d951e621b h1:EPM4aBdfXpIVexIO5F7mpmoRzJZVkRPaa1WlvmajSTU= +kusionstack.io/kube-api v0.7.4-0.20250825084555-df5d951e621b/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= +kusionstack.io/kube-api v0.7.4-0.20250825085826-67aefdbf9caa h1:8g+NRdhR04R2RE1hBCyGNyNqHOu60nX59nHSRnO6wR0= +kusionstack.io/kube-api v0.7.4-0.20250825085826-67aefdbf9caa/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= +kusionstack.io/kube-api v0.7.4-0.20250826035958-c1b30201ed5c h1:2165AAx76Vr1xDFgaHTzPVyvW83mPeB2TIh/AjVcIZU= +kusionstack.io/kube-api v0.7.4-0.20250826035958-c1b30201ed5c/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= +kusionstack.io/kube-api v0.7.4-0.20250826112948-7aeb5f5204de h1:c7L4xDVkHjGgzAkYYh1k4LCQD3j77pUxwo6buUCPIq0= +kusionstack.io/kube-api 
v0.7.4-0.20250826112948-7aeb5f5204de/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= +kusionstack.io/kube-api v0.7.4-0.20250828082633-8a73d98e9bf8 h1:1ARYh14b7ksYh1Ci3liyT9jhV2rV1Lyuvnxy8a5WQZU= +kusionstack.io/kube-api v0.7.4-0.20250828082633-8a73d98e9bf8/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= kusionstack.io/kube-utils v0.2.1-0.20250613035327-11e9cdaec9d6 h1:HYE6Wa8EzSlA6UmaTLtNKUgkB2mmasp6Ul69d3/SpK0= kusionstack.io/kube-utils v0.2.1-0.20250613035327-11e9cdaec9d6/go.mod h1:5Uy3GCJ1JEGqZw/Sp/uVnHBJN1t9wjY6USPSZ9s4idk= kusionstack.io/resourceconsist v0.0.2 h1:gf+c/LOMsiKoVR+GLzOomw8qcUbZbPckQLczZllNdVM= kusionstack.io/resourceconsist v0.0.2/go.mod h1:4KKqnyCzTlUUQgAx7BlazLe+erMkAthTnspZOnc632M= diff --git a/pkg/controllers/rolloutrun/control/control.go b/pkg/controllers/rolloutrun/control/control.go index ffa8e2e..384423d 100644 --- a/pkg/controllers/rolloutrun/control/control.go +++ b/pkg/controllers/rolloutrun/control/control.go @@ -24,6 +24,7 @@ import ( "strings" "github.com/go-logr/logr" + appsv1 "k8s.io/api/apps/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -276,6 +277,92 @@ func (c *CanaryReleaseControl) applyCanaryDefaults(canaryObj client.Object) { }) } +type RollbackReleaseControl struct { + workload workload.Accessor + control workload.RollbackReleaseControl + client client.Client +} + +func NewRollbackReleaseControl(impl workload.Accessor, c client.Client) *RollbackReleaseControl { + return &RollbackReleaseControl{ + workload: impl, + control: impl.(workload.RollbackReleaseControl), + client: c, + } +} + +func (c *RollbackReleaseControl) Initialize(ctx context.Context, info *workload.Info, ownerKind, ownerName, rolloutRun string, batchIndex int32) error { + // pre-check + if !c.control.Rollbackable() { + return fmt.Errorf("workload does not support rollback") + } + + if err := c.control.RollbackPreCheck(info.Object); err != nil { + return TerminalError(err) + } + + // add progressing annotation + pInfo := rolloutv1alpha1.ProgressingInfo{ + Kind: ownerKind, + RolloutName: ownerName, + RolloutID: rolloutRun, + Rollback: &rolloutv1alpha1.BatchProgressingInfo{ + CurrentBatchIndex: batchIndex, + }, + } + progress, _ := json.Marshal(pInfo) + + _, err := info.UpdateOnConflict(ctx, c.client, func(obj client.Object) error { + utils.MutateAnnotations(obj, func(annotations map[string]string) { + annotations[rolloutapi.AnnoRolloutProgressingInfo] = string(progress) + }) + return nil + }) + + return err +} + +func (c *RollbackReleaseControl) Revert(ctx context.Context, info *workload.Info) error { + // pre-check + if !c.control.Rollbackable() { + return fmt.Errorf("workload does not support rollback") + } + + if err := c.control.RollbackPreCheck(info.Object); err != nil { + return TerminalError(err) + } + + _, err := info.UpdateForRevisionOnConflict(ctx, c.client, func(obj client.Object, revision *appsv1.ControllerRevision) error { + return c.control.RevertRevision(obj, revision) + }) + + return err +} + +func (c *RollbackReleaseControl) UpdatePartition(ctx context.Context, info *workload.Info, expectedUpdated int32) (bool, error) { + ctx = clusterinfo.WithCluster(ctx, info.ClusterName) + obj := info.Object + return utils.UpdateOnConflict(ctx, c.client, c.client, obj, func() error { + return c.control.ApplyPartition(obj, expectedUpdated) + }) +} + +func (c *RollbackReleaseControl) Finalize(ctx context.Context, info *workload.Info) error { + // delete progressing annotation + changed, err := info.UpdateOnConflict(ctx, c.client, func(obj client.Object) error {
utils.MutateAnnotations(obj, func(annotations map[string]string) { + delete(annotations, rolloutapi.AnnoRolloutProgressingInfo) + }) + return nil + }) + + if changed { + logger := logr.FromContextOrDiscard(ctx) + logger.Info("delete progressing info on workload", "name", info.Name, "gvk", info.GroupVersionKind.String()) + } + return err +} + // TerminalError is an error that will not be retried but still be logged // and recorded in metrics. // diff --git a/pkg/controllers/rolloutrun/executor/alias.go b/pkg/controllers/rolloutrun/executor/alias.go index 0a017de..7efb79a 100644 --- a/pkg/controllers/rolloutrun/executor/alias.go +++ b/pkg/controllers/rolloutrun/executor/alias.go @@ -19,13 +19,15 @@ package executor import rolloutv1alpha1 "kusionstack.io/kube-api/rollout/v1alpha1" const ( - StepNone = rolloutv1alpha1.RolloutStepNone - StepPending = rolloutv1alpha1.RolloutStepPending - StepPreCanaryStepHook = rolloutv1alpha1.RolloutStepPreCanaryStepHook - StepPreBatchStepHook = rolloutv1alpha1.RolloutStepPreBatchStepHook - StepRunning = rolloutv1alpha1.RolloutStepRunning - StepPostCanaryStepHook = rolloutv1alpha1.RolloutStepPostCanaryStepHook - StepPostBatchStepHook = rolloutv1alpha1.RolloutStepPostBatchStepHook - StepSucceeded = rolloutv1alpha1.RolloutStepSucceeded - StepResourceRecycling = rolloutv1alpha1.RolloutStepResourceRecycling + StepNone = rolloutv1alpha1.RolloutStepNone + StepPending = rolloutv1alpha1.RolloutStepPending + StepPreCanaryStepHook = rolloutv1alpha1.RolloutStepPreCanaryStepHook + StepPreBatchStepHook = rolloutv1alpha1.RolloutStepPreBatchStepHook + StepPreRollbackStepHook = rolloutv1alpha1.RolloutStepPreRollbackStepHook + StepRunning = rolloutv1alpha1.RolloutStepRunning + StepPostCanaryStepHook = rolloutv1alpha1.RolloutStepPostCanaryStepHook + StepPostBatchStepHook = rolloutv1alpha1.RolloutStepPostBatchStepHook + StepPostRollbackStepHook = rolloutv1alpha1.RolloutStepPostRollbackStepHook + StepSucceeded = rolloutv1alpha1.RolloutStepSucceeded + StepResourceRecycling = rolloutv1alpha1.RolloutStepResourceRecycling ) diff --git a/pkg/controllers/rolloutrun/executor/batch.go b/pkg/controllers/rolloutrun/executor/batch.go index e619a02..e5098ee 100644 --- a/pkg/controllers/rolloutrun/executor/batch.go +++ b/pkg/controllers/rolloutrun/executor/batch.go @@ -221,7 +221,7 @@ func (e *batchExecutor) doBatchUpgrading(ctx *ExecutorContext) (bool, time.Durat allWorkloadReady = false logger.V(3).Info("still waiting for target to be ready", "target", item.CrossClusterObjectNameReference) - expectedReplicas, err := e.calculateExpectedReplicasBySlidingWindow(status, currentBatchExpectedReplicas, item.ReplicaSlidingWindow) + expectedReplicas, err := calculateExpectedReplicasBySlidingWindow(status, currentBatchExpectedReplicas, item.ReplicaSlidingWindow) if err != nil { return false, retryStop, err } @@ -250,7 +250,7 @@ func (e *batchExecutor) doBatchUpgrading(ctx *ExecutorContext) (bool, time.Durat // calculateExpectedReplicasBySlidingWindow calculate expected replicas by sliding window // if window is nil, return currentBatchExpectedReplicas // if window is not nil, return min(currentBatchExpectedReplicas, updatedAvailableReplicas + increment) -func (e *batchExecutor) calculateExpectedReplicasBySlidingWindow(status rolloutv1alpha1.RolloutWorkloadStatus, currentBatchExpectedReplicas int32, window *intstr.IntOrString) (int32, error) { +func calculateExpectedReplicasBySlidingWindow(status rolloutv1alpha1.RolloutWorkloadStatus, currentBatchExpectedReplicas int32, window *intstr.IntOrString) 
(int32, error) { if window == nil { return currentBatchExpectedReplicas, nil } diff --git a/pkg/controllers/rolloutrun/executor/context.go b/pkg/controllers/rolloutrun/executor/context.go index 6471523..29d3b29 100644 --- a/pkg/controllers/rolloutrun/executor/context.go +++ b/pkg/controllers/rolloutrun/executor/context.go @@ -86,6 +86,28 @@ func (c *ExecutorContext) Initialize() { newStatus.BatchStatus.Records = newStatus.BatchStatus.Records[:specBatchSize] } } + + // init RollbackStatus + if c.RolloutRun.Spec.Rollback != nil { + if newStatus.RollbackStatus == nil { + newStatus.RollbackStatus = &rolloutv1alpha1.RolloutRunBatchStatus{} + } + // resize records + specBatchSize := len(c.RolloutRun.Spec.Rollback.Batches) + statusBatchSize := len(newStatus.RollbackStatus.Records) + if specBatchSize > statusBatchSize { + for i := 0; i < specBatchSize-statusBatchSize; i++ { + newStatus.RollbackStatus.Records = append(newStatus.RollbackStatus.Records, + rolloutv1alpha1.RolloutRunStepStatus{ + Index: ptr.To(int32(statusBatchSize + i)), + State: StepNone, + }, + ) + } + } else if specBatchSize < statusBatchSize { + newStatus.RollbackStatus.Records = newStatus.RollbackStatus.Records[:specBatchSize] + } + } }) } @@ -109,6 +131,9 @@ func (c *ExecutorContext) GetWebhooksAndLatestStatusBy(hookType rolloutv1alpha1. var webhookStatuses []rolloutv1alpha1.RolloutWebhookStatus if c.inCanary() { webhookStatuses = newStatus.CanaryStatus.Webhooks + } else if c.inRollback() { + index := newStatus.RollbackStatus.CurrentBatchIndex + webhookStatuses = newStatus.RollbackStatus.Records[index].Webhooks } else { index := newStatus.BatchStatus.CurrentBatchIndex webhookStatuses = newStatus.BatchStatus.Records[index].Webhooks @@ -129,6 +154,9 @@ func (c *ExecutorContext) SetWebhookStatus(status rolloutv1alpha1.RolloutWebhook newStatus := c.NewStatus if c.inCanary() { newStatus.CanaryStatus.Webhooks = appendWebhookStatus(newStatus.CanaryStatus.Webhooks, status) + } else if c.inRollback() { + index := newStatus.RollbackStatus.CurrentBatchIndex + newStatus.RollbackStatus.Records[index].Webhooks = appendWebhookStatus(newStatus.RollbackStatus.Records[index].Webhooks, status) } else { index := newStatus.BatchStatus.CurrentBatchIndex newStatus.BatchStatus.Records[index].Webhooks = appendWebhookStatus(newStatus.BatchStatus.Records[index].Webhooks, status) @@ -142,6 +170,8 @@ func isFinalStepState(state rolloutv1alpha1.RolloutStepState) bool { func (c *ExecutorContext) GetCurrentState() (string, rolloutv1alpha1.RolloutStepState) { if c.inCanary() { return "canary", c.NewStatus.CanaryStatus.State + } else if c.inRollback() { + return "rollback", c.NewStatus.RollbackStatus.CurrentBatchState } else { return "batch", c.NewStatus.BatchStatus.CurrentBatchState } @@ -158,6 +188,15 @@ func (c *ExecutorContext) MoveToNextState(nextState rolloutv1alpha1.RolloutStepS } else if isFinalStepState(nextState) { newStatus.CanaryStatus.FinishTime = ptr.To(metav1.Now()) } + } else if c.inRollback() { + index := newStatus.RollbackStatus.CurrentBatchIndex + newStatus.RollbackStatus.CurrentBatchState = nextState + newStatus.RollbackStatus.Records[index].State = nextState + if nextState == StepPreRollbackStepHook { + newStatus.RollbackStatus.Records[index].StartTime = ptr.To(metav1.Now()) + } else if isFinalStepState(nextState) { + newStatus.RollbackStatus.Records[index].FinishTime = ptr.To(metav1.Now()) + } } else { index := newStatus.BatchStatus.CurrentBatchIndex newStatus.BatchStatus.CurrentBatchState = nextState @@ -180,6 +219,21 @@ func (c 
*ExecutorContext) SkipCurrentRelease() { newStatus.CanaryStatus.StartTime = ptr.To(metav1.Now()) } newStatus.CanaryStatus.FinishTime = ptr.To(metav1.Now()) + } else if c.inRollback() { + newStatus.RollbackStatus.CurrentBatchIndex = int32(len(c.RolloutRun.Spec.Rollback.Batches) - 1) + newStatus.RollbackStatus.CurrentBatchState = StepSucceeded + for i := range newStatus.RollbackStatus.Records { + if newStatus.RollbackStatus.Records[i].State == StepNone || + newStatus.RollbackStatus.Records[i].State == StepPending { + newStatus.RollbackStatus.Records[i].State = StepSucceeded + } + if newStatus.RollbackStatus.Records[i].StartTime == nil { + newStatus.RollbackStatus.Records[i].StartTime = ptr.To(metav1.Now()) + } + if newStatus.RollbackStatus.Records[i].FinishTime == nil { + newStatus.RollbackStatus.Records[i].FinishTime = ptr.To(metav1.Now()) + } + } } else { newStatus.BatchStatus.CurrentBatchIndex = int32(len(c.RolloutRun.Spec.Batch.Batches) - 1) newStatus.BatchStatus.CurrentBatchState = StepSucceeded @@ -258,6 +312,51 @@ func (r *ExecutorContext) inCanary() bool { return false } +func (r *ExecutorContext) inBatchGray() bool { + // todo: need to consider case of every batch gray + r.Initialize() + run := r.RolloutRun + newStatus := r.NewStatus + if newStatus.BatchStatus == nil { + return false + } + + currentBatchIndex := newStatus.BatchStatus.CurrentBatchIndex + currentBatch := run.Spec.Batch.Batches[currentBatchIndex] + if currentBatch.Traffic == nil || int(currentBatchIndex+1) == len(run.Spec.Batch.Batches) { + return false + } + + currentBatchState := newStatus.BatchStatus.CurrentBatchState + if !isFinalStepState(currentBatchState) { + return true + } + + if int(currentBatchIndex+1) < len(run.Spec.Batch.Batches) { + nextBatch := run.Spec.Batch.Batches[currentBatchIndex+1] + if nextBatch.Traffic != nil { + return true + } + } + + return false +} + +func (r *ExecutorContext) inRollback() bool { + r.Initialize() + run := r.RolloutRun + newStatus := r.NewStatus + if newStatus.RollbackStatus == nil { + return false + } + if run.Spec.Rollback != nil { + if newStatus.Phase != rolloutv1alpha1.RolloutRunPhaseRollbacked { + return true + } + } + return false +} + func (r *ExecutorContext) makeRolloutWebhookReview(hookType rolloutv1alpha1.HookType, webhook rolloutv1alpha1.RolloutWebhook) rolloutv1alpha1.RolloutWebhookReview { r.Initialize() @@ -284,6 +383,12 @@ func (r *ExecutorContext) makeRolloutWebhookReview(hookType rolloutv1alpha1.Hook Targets: rolloutRun.Spec.Canary.Targets, Properties: rolloutRun.Spec.Canary.Properties, } + } else if r.inRollback() { + review.Spec.Rollback = &rolloutv1alpha1.RolloutWebhookReviewBatch{ + BatchIndex: newStatus.RollbackStatus.CurrentBatchIndex, + Targets: rolloutRun.Spec.Rollback.Batches[newStatus.RollbackStatus.CurrentBatchIndex].Targets, + Properties: rolloutRun.Spec.Rollback.Batches[newStatus.RollbackStatus.CurrentBatchIndex].Properties, + } } else { review.Spec.Batch = &rolloutv1alpha1.RolloutWebhookReviewBatch{ BatchIndex: newStatus.BatchStatus.CurrentBatchIndex, @@ -322,3 +427,12 @@ func (e *ExecutorContext) GetBatchLogger() logr.Logger { func (e *ExecutorContext) GetCanaryLogger() logr.Logger { return e.GetLogger().WithValues("step", "canary") } + +func (e *ExecutorContext) GetRollbackLogger() logr.Logger { + e.Initialize() + l := e.GetLogger().WithValues("step", "rollback") + if e.NewStatus != nil && e.NewStatus.RollbackStatus != nil { + l = l.WithValues("rollbackIndex", e.NewStatus.RollbackStatus.CurrentBatchIndex) + } + return l +} diff --git 
a/pkg/controllers/rolloutrun/executor/default.go b/pkg/controllers/rolloutrun/executor/default.go index ff5ba99..69aa438 100644 --- a/pkg/controllers/rolloutrun/executor/default.go +++ b/pkg/controllers/rolloutrun/executor/default.go @@ -12,19 +12,22 @@ import ( ) type Executor struct { - logger logr.Logger - canary *canaryExecutor - batch *batchExecutor + logger logr.Logger + canary *canaryExecutor + batch *batchExecutor + rollback *rollbackExecutor } func NewDefaultExecutor(logger logr.Logger) *Executor { webhookExec := newWebhookExecutor(time.Second) canaryExec := newCanaryExecutor(webhookExec) batchExec := newBatchExecutor(webhookExec) + rollbackExec := newRollbackExecutor(webhookExec) e := &Executor{ - logger: logger, - canary: canaryExec, - batch: batchExec, + logger: logger, + canary: canaryExec, + batch: batchExec, + rollback: rollbackExec, } return e } @@ -67,6 +70,11 @@ func (r *Executor) lifecycle(executorContext *ExecutorContext) (done bool, resul return false, result, nil } + if rolloutRun.Spec.Rollback != nil && len(rolloutRun.Spec.Rollback.Batches) > 0 && newStatus.Phase != rolloutv1alpha1.RolloutRunPhaseRollbacking { + newStatus.Phase = rolloutv1alpha1.RolloutRunPhaseRollbacking + return false, result, nil + } + switch newStatus.Phase { case rolloutv1alpha1.RolloutRunPhaseInitial: newStatus.Phase = rolloutv1alpha1.RolloutRunPhasePreRollout @@ -86,12 +94,18 @@ func (r *Executor) lifecycle(executorContext *ExecutorContext) (done bool, resul if processingDone { newStatus.Phase = rolloutv1alpha1.RolloutRunPhasePostRollout } + case rolloutv1alpha1.RolloutRunPhaseRollbacking: + var rollbacked bool + rollbacked, result, err = r.doRollbacking(executorContext) + if rollbacked { + newStatus.Phase = rolloutv1alpha1.RolloutRunPhaseRollbacked + } case rolloutv1alpha1.RolloutRunPhasePostRollout: newStatus.Phase = rolloutv1alpha1.RolloutRunPhaseSucceeded case rolloutv1alpha1.RolloutRunPhasePaused: // rolloutRun is paused, do not requeue result.Requeue = false - case rolloutv1alpha1.RolloutRunPhaseSucceeded, rolloutv1alpha1.RolloutRunPhaseCanceled: + case rolloutv1alpha1.RolloutRunPhaseSucceeded, rolloutv1alpha1.RolloutRunPhaseCanceled, rolloutv1alpha1.RolloutRunPhaseRollbacked: done = true result.Requeue = false } @@ -156,6 +170,13 @@ func (r *Executor) doCanceling(ctx *ExecutorContext) (bool, ctrl.Result, error) } return canceled, result, nil } + if rolloutRun.Spec.Rollback != nil && len(rolloutRun.Spec.Rollback.Batches) > 0 { + // init RollbackStatus + if len(newStatus.RollbackStatus.CurrentBatchState) == 0 { + newStatus.RollbackStatus.CurrentBatchState = StepNone + } + return r.rollback.Cancel(ctx) + } if rolloutRun.Spec.Batch != nil && len(rolloutRun.Spec.Batch.Batches) > 0 { // init BatchStatus if len(newStatus.BatchStatus.CurrentBatchState) == 0 { @@ -166,3 +187,38 @@ func (r *Executor) doCanceling(ctx *ExecutorContext) (bool, ctrl.Result, error) return true, ctrl.Result{Requeue: true}, nil } + +func (r *Executor) doRollbacking(ctx *ExecutorContext) (bool, ctrl.Result, error) { + rolloutRun := ctx.RolloutRun + newStatus := ctx.NewStatus + + logger := ctx.GetLogger() + + if newStatus.Error != nil { + // if error occurred, do nothing + return false, ctrl.Result{Requeue: true}, nil + } + + if rolloutRun.Spec.Rollback != nil && len(rolloutRun.Spec.Rollback.Batches) > 0 { + // init RollbackStatus + if len(newStatus.RollbackStatus.CurrentBatchState) == 0 { + newStatus.RollbackStatus.CurrentBatchState = StepNone + } + preCurrentBatchIndex := newStatus.RollbackStatus.CurrentBatchIndex + 
preCurrentBatchState := newStatus.RollbackStatus.CurrentBatchState + defer func() { + if preCurrentBatchIndex != newStatus.RollbackStatus.CurrentBatchIndex || + preCurrentBatchState != newStatus.RollbackStatus.CurrentBatchState { + logger.Info("rollback batch state transition", + "current.index", preCurrentBatchIndex, + "current.state", preCurrentBatchState, + "next.index", newStatus.RollbackStatus.CurrentBatchIndex, + "next.state", newStatus.RollbackStatus.CurrentBatchState, + ) + } + }() + return r.rollback.Do(ctx) + } + + return true, ctrl.Result{Requeue: true}, nil +} diff --git a/pkg/controllers/rolloutrun/executor/do_command.go b/pkg/controllers/rolloutrun/executor/do_command.go index a2bad55..34dbca7 100644 --- a/pkg/controllers/rolloutrun/executor/do_command.go +++ b/pkg/controllers/rolloutrun/executor/do_command.go @@ -23,7 +23,11 @@ func (r *Executor) doCommand(ctx *ExecutorContext) ctrl.Result { newStatus.Phase = rolloutv1alpha1.RolloutRunPhasePausing case rolloutapis.AnnoManualCommandResume, rolloutapis.AnnoManualCommandContinue: // nolint if newStatus.Phase == rolloutv1alpha1.RolloutRunPhasePaused { - newStatus.Phase = rolloutv1alpha1.RolloutRunPhaseProgressing + if rolloutRun.Spec.Rollback != nil && len(rolloutRun.Spec.Rollback.Batches) > 0 { + newStatus.Phase = rolloutv1alpha1.RolloutRunPhaseRollbacking + } else { + newStatus.Phase = rolloutv1alpha1.RolloutRunPhaseProgressing + } } case rolloutapis.AnnoManualCommandRetry: if batchError != nil { diff --git a/pkg/controllers/rolloutrun/executor/rollback.go b/pkg/controllers/rolloutrun/executor/rollback.go new file mode 100644 index 0000000..ee1d650 --- /dev/null +++ b/pkg/controllers/rolloutrun/executor/rollback.go @@ -0,0 +1,427 @@ +/** + * Copyright 2024 The KusionStack Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package executor + +import ( + "fmt" + "time" + + "github.com/go-logr/logr" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + rolloutv1alpha1 "kusionstack.io/kube-api/rollout/v1alpha1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + "kusionstack.io/rollout/pkg/controllers/rolloutrun/control" + "kusionstack.io/rollout/pkg/workload" +) + +func newDoRollbackError(reason, msg string) *rolloutv1alpha1.CodeReasonMessage { + return &rolloutv1alpha1.CodeReasonMessage{ + Code: "DoRollbackError", + Reason: reason, + Message: msg, + } +} + +type rollbackExecutor struct { + webhook webhookExecutor + stateEngine *stepStateEngine +} + +func newRollbackExecutor(webhook webhookExecutor) *rollbackExecutor { + e := &rollbackExecutor{ + webhook: webhook, + stateEngine: newStepStateEngine(), + } + + e.stateEngine.add(StepNone, StepPending, e.doPausing, e.release) + e.stateEngine.add(StepPending, StepPreRollbackStepHook, skipStep, e.release) + e.stateEngine.add(StepPreRollbackStepHook, StepRunning, e.doPreStepHook, e.release) + e.stateEngine.add(StepRunning, StepPostRollbackStepHook, e.doBatchUpgrading, e.release) + e.stateEngine.add(StepPostRollbackStepHook, StepResourceRecycling, e.doPostStepHook, e.release) + e.stateEngine.add(StepResourceRecycling, StepSucceeded, e.doFinalize, e.release) + e.stateEngine.add(StepSucceeded, "", skipStep, skipStep) + + return e +} + +func (e *rollbackExecutor) init(ctx *ExecutorContext) (done bool) { + logger := ctx.GetRollbackLogger() + if !e.isSupported(ctx) { + // skip rollback release if the workload accessor doesn't support it. + logger.Info("workload accessor doesn't support rollback release, skip it") + ctx.SkipCurrentRelease() + return true + } + + if ctx.inCanary() { + ctx.TrafficManager.With(ctx.RolloutRun.Spec.Canary.Targets, ctx.RolloutRun.Spec.Canary.Traffic) + } else if ctx.inBatchGray() { + currentBatchIndex := ctx.NewStatus.BatchStatus.CurrentBatchIndex + currentBatch := ctx.RolloutRun.Spec.Batch.Batches[currentBatchIndex] + ctx.TrafficManager.With(currentBatch.Targets, currentBatch.Traffic) + } + return false +} + +func (e *rollbackExecutor) Do(ctx *ExecutorContext) (done bool, result ctrl.Result, err error) { + if e.init(ctx) { + return true, ctrl.Result{Requeue: true}, nil + } + + newStatus := ctx.NewStatus + currentBatchIndex := newStatus.RollbackStatus.CurrentBatchIndex + currentState := newStatus.RollbackStatus.CurrentBatchState + + // revert the workload version and reset the canary route before the rollback stateEngine starts + if currentBatchIndex == 0 && currentState == StepNone { + done, err := e.revert(ctx) + if !done { + return false, ctrl.Result{}, err + } + + done, retry := e.deleteCanaryRoute(ctx) + if !done { + return false, ctrl.Result{RequeueAfter: retry}, nil + } + + if ctx.inCanary() { + done, retry, err = e.recycle(ctx) + if !done { + switch retry { + case retryStop: + result = ctrl.Result{} + default: + result = ctrl.Result{RequeueAfter: retry} + } + return false, result, err + } + } + } + + stepDone, result, err := e.stateEngine.do(ctx, currentState) + if err != nil { + return false, result, err + } + if !stepDone { + return false, result, nil + } + + if int(currentBatchIndex+1) < len(ctx.RolloutRun.Spec.Rollback.Batches) { + // move to next batch + newStatus.RollbackStatus.CurrentBatchState = StepNone + newStatus.RollbackStatus.CurrentBatchIndex = currentBatchIndex + 1 + return false, result, nil + } + + recycleDone, retry := e.recyleCanaryResource(ctx) + if !recycleDone { + return
false, ctrl.Result{RequeueAfter: retry}, nil + } + + return true, result, nil +} + +func (e *rollbackExecutor) Cancel(ctx *ExecutorContext) (done bool, result ctrl.Result, err error) { + done = e.init(ctx) + if done { + return true, ctrl.Result{Requeue: true}, nil + } + + return e.stateEngine.cancel(ctx, ctx.NewStatus.RollbackStatus.CurrentBatchState) +} + +func (e *rollbackExecutor) isSupported(ctx *ExecutorContext) bool { + _, ok := ctx.Accessor.(workload.RollbackReleaseControl) + return ok +} + +func (e *rollbackExecutor) doPausing(ctx *ExecutorContext) (bool, time.Duration, error) { + rolloutRunName := ctx.RolloutRun.Name + newStatus := ctx.NewStatus + currentBatchIndex := newStatus.RollbackStatus.CurrentBatchIndex + currentBatch := ctx.RolloutRun.Spec.Rollback.Batches[currentBatchIndex] + + rollbackControl := control.NewRollbackReleaseControl(ctx.Accessor, ctx.Client) + + for _, item := range currentBatch.Targets { + wi := ctx.Workloads.Get(item.Cluster, item.Name) + if wi == nil { + return false, retryStop, newWorkloadNotFoundError(item.CrossClusterObjectNameReference) + } + err := rollbackControl.Initialize(ctx, wi, ctx.OwnerKind, ctx.OwnerName, rolloutRunName, currentBatchIndex) + if err != nil { + return false, retryStop, err + } + } + + if ctx.RolloutRun.Spec.Rollback.Batches[currentBatchIndex].Breakpoint { + ctx.Pause() + } + return true, retryImmediately, nil +} + +func (e *rollbackExecutor) doPreStepHook(ctx *ExecutorContext) (bool, time.Duration, error) { + return e.webhook.Do(ctx, rolloutv1alpha1.PreRollbackStepHook) +} + +func (e *rollbackExecutor) doPostStepHook(ctx *ExecutorContext) (bool, time.Duration, error) { + done, retry, err := e.webhook.Do(ctx, rolloutv1alpha1.PostRollbackStepHook) + if done { + ctx.Pause() + } + return done, retry, err +} + +// doBatchUpgrading process upgrading state +func (e *rollbackExecutor) doBatchUpgrading(ctx *ExecutorContext) (bool, time.Duration, error) { + rolloutRun := ctx.RolloutRun + newStatus := ctx.NewStatus + currentBatchIndex := newStatus.RollbackStatus.CurrentBatchIndex + currentBatch := rolloutRun.Spec.Rollback.Batches[currentBatchIndex] + + logger := ctx.GetBatchLogger() + + rollbackControl := control.NewRollbackReleaseControl(ctx.Accessor, ctx.Client) + + batchTargetStatuses := make([]rolloutv1alpha1.RolloutWorkloadStatus, 0) + + allWorkloadReady := true + for _, item := range currentBatch.Targets { + info := ctx.Workloads.Get(item.Cluster, item.Name) + if info == nil { + // If the target workload does not exist, the retries will stop. 
+ return false, retryStop, newWorkloadNotFoundError(item.CrossClusterObjectNameReference) + } + + status := info.APIStatus() + batchTargetStatuses = append(batchTargetStatuses, info.APIStatus()) + + currentBatchExpectedReplicas, _ := workload.CalculateUpdatedReplicas(&status.Replicas, item.Replicas) + + if info.CheckUpdatedReady(currentBatchExpectedReplicas) { + // if the target is ready, we will not change partition + continue + } + + allWorkloadReady = false + logger.V(3).Info("still waiting for target to be ready", "target", item.CrossClusterObjectNameReference) + + expectedReplicas, err := calculateExpectedReplicasBySlidingWindow(status, currentBatchExpectedReplicas, item.ReplicaSlidingWindow) + if err != nil { + return false, retryStop, err + } + + // ensure partition: upgradePartition is an idempotent function + changed, err := rollbackControl.UpdatePartition(ctx, info, expectedReplicas) + if err != nil { + return false, retryStop, err + } + if changed { + logger.V(2).Info("upgrade target partition", "target", item.CrossClusterObjectNameReference, "partition", expectedReplicas) + } + } + + // update target status in rollback + newStatus.RollbackStatus.Records[currentBatchIndex].Targets = batchTargetStatuses + + if allWorkloadReady { + return true, retryImmediately, nil + } + + // wait for next reconcile + return false, retryDefault, nil +} + +func (e *rollbackExecutor) revert(ctx *ExecutorContext) (bool, error) { + newStatus := ctx.NewStatus + currentBatchIndex := newStatus.RollbackStatus.CurrentBatchIndex + currentBatch := ctx.RolloutRun.Spec.Rollback.Batches[currentBatchIndex] + + rollbackControl := control.NewRollbackReleaseControl(ctx.Accessor, ctx.Client) + + for _, item := range currentBatch.Targets { + wi := ctx.Workloads.Get(item.Cluster, item.Name) + if wi == nil { + return false, newWorkloadNotFoundError(item.CrossClusterObjectNameReference) + } + err := rollbackControl.Revert(ctx, wi) + if err != nil { + return false, err + } + } + return true, nil +} + +func (e *rollbackExecutor) deleteCanaryRoute(ctx *ExecutorContext) (bool, time.Duration) { + if !ctx.inCanary() && !ctx.inBatchGray() { + return true, retryDefault + } + + done, retry := e.modifyTraffic(ctx, "deleteCanaryRoute") + if !done { + return false, retry + } + + return true, retryDefault +} + +func (e *rollbackExecutor) deleteCanaryWorkloads(ctx *ExecutorContext) (bool, time.Duration, error) { + if !ctx.inCanary() { + return true, retryDefault, nil + } + + rolloutRun := ctx.RolloutRun + releaseControl := control.NewCanaryReleaseControl(ctx.Accessor, ctx.Client) + + for _, item := range rolloutRun.Spec.Canary.Targets { + wi := ctx.Workloads.Get(item.Cluster, item.Name) + if wi == nil { + return false, retryStop, newWorkloadNotFoundError(item.CrossClusterObjectNameReference) + } + + if err := releaseControl.Finalize(ctx, wi); err != nil { + return false, retryStop, newDoRollbackError( + "FailedFinalize", + fmt.Sprintf("failed to delete canary resource for workload(%s), err: %v", item.CrossClusterObjectNameReference, err), + ) + } + } + + return true, retryDefault, nil +} + +func (e *rollbackExecutor) recyleCanaryResource(ctx *ExecutorContext) (bool, time.Duration) { + if !ctx.inCanary() && !ctx.inBatchGray() { + return true, retryDefault + } + + done, retry := e.modifyTraffic(ctx, "resetRoute") + if !done { + return false, retry + } + + done, retry = e.modifyTraffic(ctx, "deleteForkedBackends") + if !done { + return false, retry + } + + return true, retryDefault +} + +func (e *rollbackExecutor) recycle(ctx 
*ExecutorContext) (bool, time.Duration, error) { + done, retry, err := e.deleteCanaryWorkloads(ctx) + if !done { + return false, retry, err + } + + done, retry = e.recyleCanaryResource(ctx) + if !done { + return false, retry, nil + } + + return true, retryDefault, nil +} + +func (e *rollbackExecutor) doFinalize(ctx *ExecutorContext) (bool, time.Duration, error) { + // recycling only works on the last batch now + if int(ctx.NewStatus.RollbackStatus.CurrentBatchIndex+1) < len(ctx.RolloutRun.Spec.Rollback.Batches) { + return true, retryImmediately, nil + } + return e.release(ctx) +} + +func (e *rollbackExecutor) release(ctx *ExecutorContext) (bool, time.Duration, error) { + // firstly, try to stop the webhook + e.webhook.Cancel(ctx) + + // try to finalize all workloads + allTargets := map[rolloutv1alpha1.CrossClusterObjectNameReference]bool{} + // finalize rollback release + rollbackControl := control.NewRollbackReleaseControl(ctx.Accessor, ctx.Client) + + for _, item := range ctx.RolloutRun.Spec.Rollback.Batches { + for _, target := range item.Targets { + allTargets[target.CrossClusterObjectNameReference] = true + } + } + + var finalizeErrs []error + + for target := range allTargets { + wi := ctx.Workloads.Get(target.Cluster, target.Name) + if wi == nil { + // ignore not found workload + continue + } + err := rollbackControl.Finalize(ctx, wi) + if err != nil { + // try our best to finalize all workloads + finalizeErrs = append(finalizeErrs, err) + continue + } + } + + if len(finalizeErrs) > 0 { + return false, retryDefault, utilerrors.NewAggregate(finalizeErrs) + } + + return true, retryImmediately, nil +} + +func (e *rollbackExecutor) modifyTraffic(ctx *ExecutorContext, op string) (bool, time.Duration) { + logger := ctx.GetCanaryLogger() + rolloutRun := ctx.RolloutRun + currentBatchIndex := ctx.NewStatus.BatchStatus.CurrentBatchIndex + opResult := controllerutil.OperationResultNone + + if rolloutRun.Spec.Canary.Traffic == nil && rolloutRun.Spec.Batch.Batches[currentBatchIndex].Traffic == nil { + logger.V(3).Info("traffic is nil, skip modifying traffic") + return true, retryImmediately + } + + goctx := logr.NewContext(ctx.Context, logger) + + var err error + switch op { + case "deleteCanaryRoute": + opResult, err = ctx.TrafficManager.DeleteCanaryRoute(goctx) + case "resetRoute": + opResult, err = ctx.TrafficManager.ResetRoute(goctx) + case "deleteForkedBackends": + opResult, err = ctx.TrafficManager.DeleteForkedBackends(goctx) + } + if err != nil { + logger.Error(err, "failed to modify traffic", "operation", op) + return false, retryDefault + } + + if opResult != controllerutil.OperationResultNone { + logger.Info("modify traffic routing", "operation", op, "result", opResult) + // check next time + return false, retryDefault + } + + // 1.b.
waiting for traffic + ready := ctx.TrafficManager.CheckReady(ctx) + if !ready { + return false, retryDefault + } + + return true, retryImmediately +} diff --git a/pkg/workload/collaset/release.go b/pkg/workload/collaset/release.go index 918edb3..a213e3c 100644 --- a/pkg/workload/collaset/release.go +++ b/pkg/workload/collaset/release.go @@ -20,6 +20,7 @@ import ( "fmt" "maps" + appsv1 "k8s.io/api/apps/v1" "k8s.io/utils/ptr" operatingv1alpha1 "kusionstack.io/kube-api/apps/v1alpha1" rolloutv1alpha1 "kusionstack.io/kube-api/rollout/v1alpha1" @@ -29,8 +30,9 @@ import ( ) var ( - _ workload.CanaryReleaseControl = &accessorImpl{} - _ workload.BatchReleaseControl = &accessorImpl{} + _ workload.CanaryReleaseControl = &accessorImpl{} + _ workload.BatchReleaseControl = &accessorImpl{} + _ workload.RollbackReleaseControl = &accessorImpl{} ) func (c *accessorImpl) BatchPreCheck(object client.Object) error { @@ -95,6 +97,18 @@ func (c *accessorImpl) ApplyCanaryPatch(object client.Object, podTemplatePatch * return nil } +func (c *accessorImpl) Rollbackable() bool { + return false +} + +func (c *accessorImpl) RollbackPreCheck(object client.Object) error { + return nil +} + +func (c *accessorImpl) RevertRevision(object client.Object, revision *appsv1.ControllerRevision) error { + return nil +} + func applyPodTemplateMetadataPatch(obj *operatingv1alpha1.CollaSet, patch *rolloutv1alpha1.MetadataPatch) { if patch == nil { return diff --git a/pkg/workload/info.go b/pkg/workload/info.go index df5f939..406a354 100644 --- a/pkg/workload/info.go +++ b/pkg/workload/info.go @@ -20,7 +20,9 @@ import ( "context" "fmt" "reflect" + "sort" + appsv1 "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/conversion" @@ -127,6 +129,55 @@ func (o *Info) UpdateOnConflict(ctx context.Context, c client.Client, mutateFn f return updated, nil } +func (o *Info) UpdateForRevisionOnConflict(ctx context.Context, c client.Client, mutateFn func(client.Object, *appsv1.ControllerRevision) error) (bool, error) { + ctx = clusterinfo.WithCluster(ctx, o.ClusterName) + obj := o.Object + lastRevision, err := o.GetPreviousRevision(ctx, c) + if err != nil { + return false, err + } + updated, err := utils.UpdateOnConflict(ctx, c, c, obj, func() error { + return mutateFn(obj, lastRevision) + }) + if err != nil { + return false, err + } + if updated { + o.Object = obj + } + return updated, nil +} + +func (o *Info) GetPreviousRevision(ctx context.Context, c client.Client) (*appsv1.ControllerRevision, error) { + crs := &appsv1.ControllerRevisionList{} + err := c.List(clusterinfo.WithCluster(ctx, o.GetClusterName()), crs, &client.ListOptions{ + Namespace: o.GetNamespace(), + }) + if err != nil { + return nil, err + } + + revisions := make([]*appsv1.ControllerRevision, 0) + for _, revision := range crs.Items { + for _, owner := range revision.OwnerReferences { + if owner.Kind == o.Kind && owner.Name == o.Name { + revisions = append(revisions, &revision) + break + } + } + } + + if len(revisions) < 2 { + return nil, fmt.Errorf("no previous available controllerrevision found for workload %s/%s", o.Kind, o.Name) + } + + sort.Slice(revisions, func(i, j int) bool { + return revisions[i].Revision > revisions[j].Revision + }) + + return revisions[len(revisions)-2], nil +} + func IsWaitingRollout(info Info) bool { if len(info.Status.StableRevision) != 0 && info.Status.StableRevision != info.Status.UpdatedRevision && diff --git a/pkg/workload/interface.go b/pkg/workload/interface.go index 
5bead8b..9321c6a 100644 --- a/pkg/workload/interface.go +++ b/pkg/workload/interface.go @@ -17,6 +17,7 @@ package workload import ( "context" + appsv1 "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/runtime/schema" "kusionstack.io/kube-api/rollout/v1alpha1" "sigs.k8s.io/controller-runtime/pkg/client" @@ -57,6 +58,17 @@ type CanaryReleaseControl interface { ApplyCanaryPatch(canary client.Object, podTemplatePatch *v1alpha1.MetadataPatch) error } +type RollbackReleaseControl interface { + // Rollbackable indicates whether this workload type can be rolled back. + Rollbackable() bool + // RollbackPreCheck checks the object before a rollback release. + RollbackPreCheck(obj client.Object) error + // RevertRevision reverts the workload revision to the stable one. + RevertRevision(obj client.Object, revision *appsv1.ControllerRevision) error + // ApplyPartition uses the expected updated replicas to calculate the partition and applies it to the workload. + ApplyPartition(obj client.Object, expectedUpdatedReplicas int32) error +} + type ReplicaObjectControl interface { // ReplicaType returns the type of replica object ReplicaType() schema.GroupVersionKind diff --git a/pkg/workload/statefulset/release.go b/pkg/workload/statefulset/release.go index 61f3748..52234bb 100644 --- a/pkg/workload/statefulset/release.go +++ b/pkg/workload/statefulset/release.go @@ -17,10 +17,12 @@ package statefulset import ( + "encoding/json" "fmt" "maps" appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" "k8s.io/utils/ptr" rolloutv1alpha1 "kusionstack.io/kube-api/rollout/v1alpha1" "sigs.k8s.io/controller-runtime/pkg/client" @@ -29,8 +31,9 @@ import ( ) var ( - _ workload.CanaryReleaseControl = &accessorImpl{} - _ workload.BatchReleaseControl = &accessorImpl{} + _ workload.CanaryReleaseControl = &accessorImpl{} + _ workload.BatchReleaseControl = &accessorImpl{} + _ workload.RollbackReleaseControl = &accessorImpl{} ) func (c *accessorImpl) BatchPreCheck(object client.Object) error { @@ -94,6 +97,46 @@ func (c *accessorImpl) ApplyCanaryPatch(object client.Object, podTemplatePatch * return nil } +func (c *accessorImpl) Rollbackable() bool { + return true +} + +func (c *accessorImpl) RollbackPreCheck(object client.Object) error { + obj, err := checkObj(object) + if err != nil { + return err + } + if obj.Spec.UpdateStrategy.Type != appsv1.RollingUpdateStatefulSetStrategyType { + return fmt.Errorf("rollout cannot upgrade the partition of a StatefulSet during rollback if the update strategy type is not RollingUpdate") + } + return nil +} + +func (c *accessorImpl) RevertRevision(object client.Object, revision *appsv1.ControllerRevision) error { + obj, err := checkObj(object) + if err != nil { + return err + } + + var oldTemplate corev1.PodTemplateSpec + if err := json.Unmarshal(revision.Data.Raw, &oldTemplate); err != nil { + return fmt.Errorf("failed to unmarshal old statefulset template: %w", err) + } + + // update StatefulSet.spec.template + obj.Spec.Template = oldTemplate + + // set the partition to the replica count + obj.Spec.UpdateStrategy = appsv1.StatefulSetUpdateStrategy{ + Type: appsv1.RollingUpdateStatefulSetStrategyType, + RollingUpdate: &appsv1.RollingUpdateStatefulSetStrategy{ + Partition: obj.Spec.Replicas, + }, + } + + return nil +} + func applyPodTemplateMetadataPatch(obj *appsv1.StatefulSet, patch *rolloutv1alpha1.MetadataPatch) { if patch == nil { return From 21e73144d0c4722f5f3daa6fd4b9f6a2694b8fbf Mon Sep 17 00:00:00 2001 From: WeichengWang1 <124144418+WeichengWang1@users.noreply.github.com> Date: Tue, 26 Aug 2025 12:10:13 +0800 Subject: [PATCH 2/9] upgrade resourceconsist to
v0.0.4 (#141) --- go.mod | 2 +- go.sum | 20 ++------------------ 2 files changed, 3 insertions(+), 19 deletions(-) diff --git a/go.mod b/go.mod index 308658c..75119f2 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( k8s.io/utils v0.0.0-20241210054802-24370beab758 kusionstack.io/kube-api v0.7.4-0.20250828082633-8a73d98e9bf8 kusionstack.io/kube-utils v0.2.1-0.20250613035327-11e9cdaec9d6 - kusionstack.io/resourceconsist v0.0.2 + kusionstack.io/resourceconsist v0.0.4 sigs.k8s.io/controller-runtime v0.21.0 ) diff --git a/go.sum b/go.sum index 3665ced..3b4efc9 100644 --- a/go.sum +++ b/go.sum @@ -1021,22 +1021,6 @@ k8s.io/sample-apiserver v0.22.2/go.mod h1:h+/DIV5EmuNq4vfPr5TSXy9mIBVXXlPAKQMPbj k8s.io/system-validators v1.5.0/go.mod h1:bPldcLgkIUK22ALflnsXk8pvkTEndYdNuaHH6gRrl0Q= k8s.io/utils v0.0.0-20240102154912-e7106e64919e h1:eQ/4ljkx21sObifjzXwlPKpdGLrCfRziVtos3ofG/sQ= k8s.io/utils v0.0.0-20240102154912-e7106e64919e/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= -kusionstack.io/kube-api v0.6.7-0.20250719054959-1cbe2be851f6 h1:ZXP+K55y4j9SKmLr8TMTgza5w1Zeu5Fmevk4MrTvFSQ= -kusionstack.io/kube-api v0.6.7-0.20250719054959-1cbe2be851f6/go.mod h1:ZrLpR6T7HzZp5UGSTXxzNCRizCC66mn2oGJWfL3VONc= -kusionstack.io/kube-api v0.6.7-0.20250720104212-3e44585627a1 h1:j/yoU/mjITbd5cQc7IaJGiEnVD9S36PsXbWkJcwYLKo= -kusionstack.io/kube-api v0.6.7-0.20250720104212-3e44585627a1/go.mod h1:ZrLpR6T7HzZp5UGSTXxzNCRizCC66mn2oGJWfL3VONc= -kusionstack.io/kube-api v0.6.7-0.20250720165902-a400661d33a0 h1:bgna3I0Xi0XGvdU+hwLc6ZFuNKPeTgZOAgUZaVW2ICQ= -kusionstack.io/kube-api v0.6.7-0.20250720165902-a400661d33a0/go.mod h1:ZrLpR6T7HzZp5UGSTXxzNCRizCC66mn2oGJWfL3VONc= -kusionstack.io/kube-api v0.6.7-0.20250721072644-0af27496d542 h1:JM7CsrsbwBK//eF8WkPlvw0IIj4sHT5SN3xHMryZSmM= -kusionstack.io/kube-api v0.6.7-0.20250721072644-0af27496d542/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= -kusionstack.io/kube-api v0.7.0 h1:jYjNq9LbpqVCNPU4cQZJI17G5gwZGJbhgAYi0BaTe7g= -kusionstack.io/kube-api v0.7.0/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= -kusionstack.io/kube-api v0.7.1 h1:m67X6mI1kOAvMPjiAYQRmjlugt25hN7jyd4nJn7LkPY= -kusionstack.io/kube-api v0.7.1/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= -kusionstack.io/kube-api v0.7.2 h1:qVrOnvGaqi4iZn6BZP0dQd/buLKLyDvY4PjkUCLVoZY= -kusionstack.io/kube-api v0.7.2/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= -kusionstack.io/kube-api v0.7.3 h1:Fxj/E+aixUuCB0VekEvEx6ss24Jke7MvcbRwAXwVYBo= -kusionstack.io/kube-api v0.7.3/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= kusionstack.io/kube-api v0.7.4-0.20250727122744-2399b387a919 h1:EMcsFMNMZO3oW7pqGOZd1nJC6YyQSt2RrcGdpC1J1gU= kusionstack.io/kube-api v0.7.4-0.20250727122744-2399b387a919/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= kusionstack.io/kube-api v0.7.4-0.20250825051758-ecb7ab4bd47e h1:VK9EoO8P+ugxQE8sgCjUsTdVI90HsLJq8xAneSzhl7E= @@ -1059,8 +1043,8 @@ kusionstack.io/kube-api v0.7.4-0.20250828082633-8a73d98e9bf8 h1:1ARYh14b7ksYh1Ci kusionstack.io/kube-api v0.7.4-0.20250828082633-8a73d98e9bf8/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= kusionstack.io/kube-utils v0.2.1-0.20250613035327-11e9cdaec9d6 h1:HYE6Wa8EzSlA6UmaTLtNKUgkB2mmasp6Ul69d3/SpK0= kusionstack.io/kube-utils v0.2.1-0.20250613035327-11e9cdaec9d6/go.mod h1:5Uy3GCJ1JEGqZw/Sp/uVnHBJN1t9wjY6USPSZ9s4idk= -kusionstack.io/resourceconsist v0.0.2 h1:gf+c/LOMsiKoVR+GLzOomw8qcUbZbPckQLczZllNdVM= -kusionstack.io/resourceconsist v0.0.2/go.mod h1:4KKqnyCzTlUUQgAx7BlazLe+erMkAthTnspZOnc632M= 
+kusionstack.io/resourceconsist v0.0.4 h1:wRqLJuNh8O4TT6p0uOklFpHUKiRdRxcAH71Sw/q9LhE= +kusionstack.io/resourceconsist v0.0.4/go.mod h1:/mZWzD30euHSfEVx7WhzJO94+yONnqEwwYwV2EA8c0s= modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw= modernc.org/golex v1.0.0/go.mod h1:b/QX9oBD/LhixY6NDh+IdGv17hgB+51fET1i2kPSmvk= modernc.org/mathutil v1.0.0/go.mod h1:wU0vUrJsVWBZ4P6e7xtFJEhFSNsfRLJ8H458uRjg03k= From dd2d11d000487d5656f6ea42291f887c18e928d0 Mon Sep 17 00:00:00 2001 From: youngLiuHY Date: Mon, 1 Sep 2025 20:25:58 +0800 Subject: [PATCH 3/9] fix: rollbacking decision && get last revision --- pkg/controllers/rolloutrun/executor/default.go | 3 ++- pkg/workload/info.go | 12 +++++++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/pkg/controllers/rolloutrun/executor/default.go b/pkg/controllers/rolloutrun/executor/default.go index 69aa438..20e11e1 100644 --- a/pkg/controllers/rolloutrun/executor/default.go +++ b/pkg/controllers/rolloutrun/executor/default.go @@ -70,7 +70,8 @@ func (r *Executor) lifecycle(executorContext *ExecutorContext) (done bool, resul return false, result, nil } - if rolloutRun.Spec.Rollback != nil && len(rolloutRun.Spec.Rollback.Batches) > 0 && newStatus.Phase != rolloutv1alpha1.RolloutRunPhaseRollbacking { + if rolloutRun.Spec.Rollback != nil && rolloutRun.DeletionTimestamp.IsZero() && newStatus.Phase != rolloutv1alpha1.RolloutRunPhaseCanceling && + len(rolloutRun.Spec.Rollback.Batches) > 0 && newStatus.Phase != rolloutv1alpha1.RolloutRunPhaseRollbacking { newStatus.Phase = rolloutv1alpha1.RolloutRunPhaseRollbacking return false, result, nil } diff --git a/pkg/workload/info.go b/pkg/workload/info.go index 406a354..2f395f7 100644 --- a/pkg/workload/info.go +++ b/pkg/workload/info.go @@ -175,7 +175,17 @@ func (o *Info) GetPreviousRevision(ctx context.Context, c client.Client) (*appsv return revisions[i].Revision > revisions[j].Revision }) - return revisions[len(revisions)-2], nil + if o.Status.StableRevision == o.Status.UpdatedRevision { + return revisions[len(revisions)-2], nil + } else { + for _, revision := range revisions { + if revision.Name == o.Status.StableRevision { + return revision, nil + } + } + } + + return nil, fmt.Errorf("no previous available controllerrevision found for workload %s/%s", o.Kind, o.Name) } func IsWaitingRollout(info Info) bool { From add3943e42552e5f31d966a06f50e2fdbb645638 Mon Sep 17 00:00:00 2001 From: youngLiuHY Date: Tue, 2 Sep 2025 11:57:44 +0800 Subject: [PATCH 4/9] fix: rollback batch move to next state --- pkg/controllers/rolloutrun/executor/context.go | 16 ++++++++-------- pkg/controllers/rolloutrun/executor/rollback.go | 9 ++++----- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/pkg/controllers/rolloutrun/executor/context.go b/pkg/controllers/rolloutrun/executor/context.go index 29d3b29..eb794f4 100644 --- a/pkg/controllers/rolloutrun/executor/context.go +++ b/pkg/controllers/rolloutrun/executor/context.go @@ -181,14 +181,7 @@ func (c *ExecutorContext) MoveToNextState(nextState rolloutv1alpha1.RolloutStepS c.Initialize() newStatus := c.NewStatus - if c.inCanary() { - newStatus.CanaryStatus.State = nextState - if nextState == StepPreCanaryStepHook { - newStatus.CanaryStatus.StartTime = ptr.To(metav1.Now()) - } else if isFinalStepState(nextState) { - newStatus.CanaryStatus.FinishTime = ptr.To(metav1.Now()) - } - } else if c.inRollback() { + if c.inRollback() { index := newStatus.RollbackStatus.CurrentBatchIndex newStatus.RollbackStatus.CurrentBatchState = nextState 
newStatus.RollbackStatus.Records[index].State = nextState @@ -197,6 +190,13 @@ func (c *ExecutorContext) MoveToNextState(nextState rolloutv1alpha1.RolloutStepS } else if isFinalStepState(nextState) { newStatus.RollbackStatus.Records[index].FinishTime = ptr.To(metav1.Now()) } + } else if c.inCanary() { + newStatus.CanaryStatus.State = nextState + if nextState == StepPreCanaryStepHook { + newStatus.CanaryStatus.StartTime = ptr.To(metav1.Now()) + } else if isFinalStepState(nextState) { + newStatus.CanaryStatus.FinishTime = ptr.To(metav1.Now()) + } } else { index := newStatus.BatchStatus.CurrentBatchIndex newStatus.BatchStatus.CurrentBatchState = nextState diff --git a/pkg/controllers/rolloutrun/executor/rollback.go b/pkg/controllers/rolloutrun/executor/rollback.go index ee1d650..c288676 100644 --- a/pkg/controllers/rolloutrun/executor/rollback.go +++ b/pkg/controllers/rolloutrun/executor/rollback.go @@ -152,6 +152,8 @@ func (e *rollbackExecutor) isSupported(ctx *ExecutorContext) bool { } func (e *rollbackExecutor) doPausing(ctx *ExecutorContext) (bool, time.Duration, error) { + logger := ctx.GetRollbackLogger() + rolloutRunName := ctx.RolloutRun.Name newStatus := ctx.NewStatus currentBatchIndex := newStatus.RollbackStatus.CurrentBatchIndex @@ -171,6 +173,7 @@ func (e *rollbackExecutor) doPausing(ctx *ExecutorContext) (bool, time.Duration, } if ctx.RolloutRun.Spec.Rollback.Batches[currentBatchIndex].Breakpoint { + logger.Info("current batch has breakpoint, set status phase to Paused") ctx.Pause() } return true, retryImmediately, nil @@ -181,11 +184,7 @@ func (e *rollbackExecutor) doPreStepHook(ctx *ExecutorContext) (bool, time.Durat } func (e *rollbackExecutor) doPostStepHook(ctx *ExecutorContext) (bool, time.Duration, error) { - done, retry, err := e.webhook.Do(ctx, rolloutv1alpha1.PostRollbackStepHook) - if done { - ctx.Pause() - } - return done, retry, err + return e.webhook.Do(ctx, rolloutv1alpha1.PostRollbackStepHook) } // doBatchUpgrading process upgrading state From 7b9df6d937e3376d2c9a176adf268ceaf611a4fa Mon Sep 17 00:00:00 2001 From: youngLiuHY Date: Tue, 2 Sep 2025 16:06:16 +0800 Subject: [PATCH 5/9] fix: mark rolloutrun into rollbacking --- go.mod | 2 +- go.sum | 6 +++ .../rolloutrun/executor/context.go | 44 +++++++++---------- .../rolloutrun/executor/default.go | 17 +++++-- .../rolloutrun/executor/rollback.go | 7 +-- 5 files changed, 46 insertions(+), 30 deletions(-) diff --git a/go.mod b/go.mod index 75119f2..d73cb79 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( k8s.io/klog/v2 v2.130.1 k8s.io/kubernetes v1.22.2 k8s.io/utils v0.0.0-20241210054802-24370beab758 - kusionstack.io/kube-api v0.7.4-0.20250828082633-8a73d98e9bf8 + kusionstack.io/kube-api v0.7.4-0.20250902065440-03af2146e073 kusionstack.io/kube-utils v0.2.1-0.20250613035327-11e9cdaec9d6 kusionstack.io/resourceconsist v0.0.4 sigs.k8s.io/controller-runtime v0.21.0 diff --git a/go.sum b/go.sum index 3b4efc9..8357312 100644 --- a/go.sum +++ b/go.sum @@ -1041,6 +1041,12 @@ kusionstack.io/kube-api v0.7.4-0.20250826112948-7aeb5f5204de h1:c7L4xDVkHjGgzAkY kusionstack.io/kube-api v0.7.4-0.20250826112948-7aeb5f5204de/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= kusionstack.io/kube-api v0.7.4-0.20250828082633-8a73d98e9bf8 h1:1ARYh14b7ksYh1Ci3liyT9jhV2rV1Lyuvnxy8a5WQZU= kusionstack.io/kube-api v0.7.4-0.20250828082633-8a73d98e9bf8/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= +kusionstack.io/kube-api v0.7.4-0.20250902063249-7e329edd6ea1 h1:o56pmV/Ip0EcYgxNablcaw/5Cy+nXbcOz5hlvt7Nyyk= 
+kusionstack.io/kube-api v0.7.4-0.20250902063249-7e329edd6ea1/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= +kusionstack.io/kube-api v0.7.4-0.20250902064055-250905b1fce5 h1:1s8FDyAlDet4qOjGts9GEiWC9Eya//z8rSNR0fLV9M4= +kusionstack.io/kube-api v0.7.4-0.20250902064055-250905b1fce5/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= +kusionstack.io/kube-api v0.7.4-0.20250902065440-03af2146e073 h1:Kel/+qk5fFf9QQsqTJo7KP6PfL1FZNMRINpvjNRGRho= +kusionstack.io/kube-api v0.7.4-0.20250902065440-03af2146e073/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= kusionstack.io/kube-utils v0.2.1-0.20250613035327-11e9cdaec9d6 h1:HYE6Wa8EzSlA6UmaTLtNKUgkB2mmasp6Ul69d3/SpK0= kusionstack.io/kube-utils v0.2.1-0.20250613035327-11e9cdaec9d6/go.mod h1:5Uy3GCJ1JEGqZw/Sp/uVnHBJN1t9wjY6USPSZ9s4idk= kusionstack.io/resourceconsist v0.0.4 h1:wRqLJuNh8O4TT6p0uOklFpHUKiRdRxcAH71Sw/q9LhE= diff --git a/pkg/controllers/rolloutrun/executor/context.go b/pkg/controllers/rolloutrun/executor/context.go index eb794f4..39e922e 100644 --- a/pkg/controllers/rolloutrun/executor/context.go +++ b/pkg/controllers/rolloutrun/executor/context.go @@ -129,11 +129,11 @@ func (c *ExecutorContext) GetWebhooksAndLatestStatusBy(hookType rolloutv1alpha1. return nil, nil } var webhookStatuses []rolloutv1alpha1.RolloutWebhookStatus - if c.inCanary() { - webhookStatuses = newStatus.CanaryStatus.Webhooks - } else if c.inRollback() { + if c.inRollback() { index := newStatus.RollbackStatus.CurrentBatchIndex webhookStatuses = newStatus.RollbackStatus.Records[index].Webhooks + } else if c.inCanary() { + webhookStatuses = newStatus.CanaryStatus.Webhooks } else { index := newStatus.BatchStatus.CurrentBatchIndex webhookStatuses = newStatus.BatchStatus.Records[index].Webhooks @@ -152,11 +152,11 @@ func (c *ExecutorContext) SetWebhookStatus(status rolloutv1alpha1.RolloutWebhook c.Initialize() newStatus := c.NewStatus - if c.inCanary() { - newStatus.CanaryStatus.Webhooks = appendWebhookStatus(newStatus.CanaryStatus.Webhooks, status) - } else if c.inRollback() { + if c.inRollback() { index := newStatus.RollbackStatus.CurrentBatchIndex newStatus.RollbackStatus.Records[index].Webhooks = appendWebhookStatus(newStatus.RollbackStatus.Records[index].Webhooks, status) + } else if c.inCanary() { + newStatus.CanaryStatus.Webhooks = appendWebhookStatus(newStatus.CanaryStatus.Webhooks, status) } else { index := newStatus.BatchStatus.CurrentBatchIndex newStatus.BatchStatus.Records[index].Webhooks = appendWebhookStatus(newStatus.BatchStatus.Records[index].Webhooks, status) @@ -168,10 +168,10 @@ func isFinalStepState(state rolloutv1alpha1.RolloutStepState) bool { } func (c *ExecutorContext) GetCurrentState() (string, rolloutv1alpha1.RolloutStepState) { - if c.inCanary() { - return "canary", c.NewStatus.CanaryStatus.State - } else if c.inRollback() { + if c.inRollback() { return "rollback", c.NewStatus.RollbackStatus.CurrentBatchState + } else if c.inCanary() { + return "canary", c.NewStatus.CanaryStatus.State } else { return "batch", c.NewStatus.BatchStatus.CurrentBatchState } @@ -213,13 +213,7 @@ func (c *ExecutorContext) SkipCurrentRelease() { c.Initialize() newStatus := c.NewStatus - if c.inCanary() { - newStatus.CanaryStatus.State = StepSucceeded - if newStatus.CanaryStatus.StartTime == nil { - newStatus.CanaryStatus.StartTime = ptr.To(metav1.Now()) - } - newStatus.CanaryStatus.FinishTime = ptr.To(metav1.Now()) - } else if c.inRollback() { + if c.inRollback() { newStatus.RollbackStatus.CurrentBatchIndex = int32(len(c.RolloutRun.Spec.Rollback.Batches) - 
1) newStatus.RollbackStatus.CurrentBatchState = StepSucceeded for i := range newStatus.RollbackStatus.Records { @@ -234,6 +228,12 @@ func (c *ExecutorContext) SkipCurrentRelease() { newStatus.RollbackStatus.Records[i].FinishTime = ptr.To(metav1.Now()) } } + } else if c.inCanary() { + newStatus.CanaryStatus.State = StepSucceeded + if newStatus.CanaryStatus.StartTime == nil { + newStatus.CanaryStatus.StartTime = ptr.To(metav1.Now()) + } + newStatus.CanaryStatus.FinishTime = ptr.To(metav1.Now()) } else { newStatus.BatchStatus.CurrentBatchIndex = int32(len(c.RolloutRun.Spec.Batch.Batches) - 1) newStatus.BatchStatus.CurrentBatchState = StepSucceeded @@ -378,17 +378,17 @@ func (r *ExecutorContext) makeRolloutWebhookReview(hookType rolloutv1alpha1.Hook }, } - if r.inCanary() { - review.Spec.Canary = &rolloutv1alpha1.RolloutWebhookReviewCanary{ - Targets: rolloutRun.Spec.Canary.Targets, - Properties: rolloutRun.Spec.Canary.Properties, - } - } else if r.inRollback() { + if r.inRollback() { review.Spec.Rollback = &rolloutv1alpha1.RolloutWebhookReviewBatch{ BatchIndex: newStatus.RollbackStatus.CurrentBatchIndex, Targets: rolloutRun.Spec.Rollback.Batches[newStatus.RollbackStatus.CurrentBatchIndex].Targets, Properties: rolloutRun.Spec.Rollback.Batches[newStatus.RollbackStatus.CurrentBatchIndex].Properties, } + } else if r.inCanary() { + review.Spec.Canary = &rolloutv1alpha1.RolloutWebhookReviewCanary{ + Targets: rolloutRun.Spec.Canary.Targets, + Properties: rolloutRun.Spec.Canary.Properties, + } } else { review.Spec.Batch = &rolloutv1alpha1.RolloutWebhookReviewBatch{ BatchIndex: newStatus.BatchStatus.CurrentBatchIndex, diff --git a/pkg/controllers/rolloutrun/executor/default.go b/pkg/controllers/rolloutrun/executor/default.go index 20e11e1..7be4e2b 100644 --- a/pkg/controllers/rolloutrun/executor/default.go +++ b/pkg/controllers/rolloutrun/executor/default.go @@ -70,10 +70,19 @@ func (r *Executor) lifecycle(executorContext *ExecutorContext) (done bool, resul return false, result, nil } - if rolloutRun.Spec.Rollback != nil && rolloutRun.DeletionTimestamp.IsZero() && newStatus.Phase != rolloutv1alpha1.RolloutRunPhaseCanceling && - len(rolloutRun.Spec.Rollback.Batches) > 0 && newStatus.Phase != rolloutv1alpha1.RolloutRunPhaseRollbacking { - newStatus.Phase = rolloutv1alpha1.RolloutRunPhaseRollbacking - return false, result, nil + // determine whether to transit rolloutrun phase to rollbacking or not + if rolloutRun.Spec.Rollback != nil && len(rolloutRun.Spec.Rollback.Batches) > 0 && rolloutRun.DeletionTimestamp.IsZero() && + newStatus.Phase != rolloutv1alpha1.RolloutRunPhaseCanceling && newStatus.Phase != rolloutv1alpha1.RolloutRunPhaseRollbacking { + if newStatus.Phase == rolloutv1alpha1.RolloutRunPhasePaused { + rollback, ok := utils.GetMapValue(rolloutRun.Annotations, rolloutapis.AnnoRolloutPhaseRollbacking) + if !ok || rollback != "true" { + newStatus.Phase = rolloutv1alpha1.RolloutRunPhaseRollbacking + return false, result, nil + } + } else { + newStatus.Phase = rolloutv1alpha1.RolloutRunPhaseRollbacking + return false, result, nil + } } switch newStatus.Phase { diff --git a/pkg/controllers/rolloutrun/executor/rollback.go b/pkg/controllers/rolloutrun/executor/rollback.go index c288676..eafcbf2 100644 --- a/pkg/controllers/rolloutrun/executor/rollback.go +++ b/pkg/controllers/rolloutrun/executor/rollback.go @@ -22,6 +22,7 @@ import ( "github.com/go-logr/logr" utilerrors "k8s.io/apimachinery/pkg/util/errors" + "kusionstack.io/kube-api/rollout" rolloutv1alpha1 "kusionstack.io/kube-api/rollout/v1alpha1" 
ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -95,6 +96,9 @@ func (e *rollbackExecutor) Do(ctx *ExecutorContext) (done bool, result ctrl.Resu return false, ctrl.Result{}, err } + //mark rollout into rollback + ctx.RolloutRun.Annotations[rollout.AnnoRolloutPhaseRollbacking] = "true" + done, retry := e.deleteCanaryRoute(ctx) if !done { return false, ctrl.Result{RequeueAfter: retry}, nil @@ -152,8 +156,6 @@ func (e *rollbackExecutor) isSupported(ctx *ExecutorContext) bool { } func (e *rollbackExecutor) doPausing(ctx *ExecutorContext) (bool, time.Duration, error) { - logger := ctx.GetRollbackLogger() - rolloutRunName := ctx.RolloutRun.Name newStatus := ctx.NewStatus currentBatchIndex := newStatus.RollbackStatus.CurrentBatchIndex @@ -173,7 +175,6 @@ func (e *rollbackExecutor) doPausing(ctx *ExecutorContext) (bool, time.Duration, } if ctx.RolloutRun.Spec.Rollback.Batches[currentBatchIndex].Breakpoint { - logger.Info("current batch has breakpoint, set status phase to Paused") ctx.Pause() } return true, retryImmediately, nil From 590b6bc50e801b31756e5420a6f82e6eb2212f7c Mon Sep 17 00:00:00 2001 From: youngLiuHY Date: Tue, 2 Sep 2025 16:32:24 +0800 Subject: [PATCH 6/9] fix: mark rollout into rollback --- pkg/controllers/rolloutrun/executor/rollback.go | 6 +++--- pkg/controllers/rolloutrun/rolloutrun_controller.go | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/controllers/rolloutrun/executor/rollback.go b/pkg/controllers/rolloutrun/executor/rollback.go index eafcbf2..a0d15f7 100644 --- a/pkg/controllers/rolloutrun/executor/rollback.go +++ b/pkg/controllers/rolloutrun/executor/rollback.go @@ -96,9 +96,6 @@ func (e *rollbackExecutor) Do(ctx *ExecutorContext) (done bool, result ctrl.Resu return false, ctrl.Result{}, err } - //mark rollout into rollback - ctx.RolloutRun.Annotations[rollout.AnnoRolloutPhaseRollbacking] = "true" - done, retry := e.deleteCanaryRoute(ctx) if !done { return false, ctrl.Result{RequeueAfter: retry}, nil @@ -116,6 +113,9 @@ func (e *rollbackExecutor) Do(ctx *ExecutorContext) (done bool, result ctrl.Resu return false, result, err } } + + //mark rollout into rollback + ctx.RolloutRun.Annotations[rollout.AnnoRolloutPhaseRollbacking] = "true" } stepDone, result, err := e.stateEngine.do(ctx, currentState) diff --git a/pkg/controllers/rolloutrun/rolloutrun_controller.go b/pkg/controllers/rolloutrun/rolloutrun_controller.go index 7152aea..c298e4c 100644 --- a/pkg/controllers/rolloutrun/rolloutrun_controller.go +++ b/pkg/controllers/rolloutrun/rolloutrun_controller.go @@ -138,6 +138,7 @@ func (r *RolloutRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) var result ctrl.Result result, err = r.syncRolloutRun(ctx, obj, newStatus, accessor, workloads) + logger.Info("current rolloutrun annotations: ", obj.Annotations) if tempErr := r.cleanupAnnotation(ctx, obj); tempErr != nil { logger.Error(tempErr, "failed to clean up annotation") } From 99494c00609968911f06f629b21f4a9cd6817347 Mon Sep 17 00:00:00 2001 From: youngLiuHY Date: Tue, 2 Sep 2025 16:39:40 +0800 Subject: [PATCH 7/9] fix: anno log --- pkg/controllers/rolloutrun/rolloutrun_controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controllers/rolloutrun/rolloutrun_controller.go b/pkg/controllers/rolloutrun/rolloutrun_controller.go index c298e4c..d93c58b 100644 --- a/pkg/controllers/rolloutrun/rolloutrun_controller.go +++ b/pkg/controllers/rolloutrun/rolloutrun_controller.go @@ -138,7 +138,7 @@ func (r 
*RolloutRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) var result ctrl.Result result, err = r.syncRolloutRun(ctx, obj, newStatus, accessor, workloads) - logger.Info("current rolloutrun annotations: ", obj.Annotations) + logger.Info("current rolloutrun annotations", "rollbacking", utils.GetMapValueByDefault(obj.Annotations, rollout.AnnoRolloutPhaseRollbacking, "false")) if tempErr := r.cleanupAnnotation(ctx, obj); tempErr != nil { logger.Error(tempErr, "failed to clean up annotation") } From e0aeecb8348b750d7f4e823e6f5754a48cc0eeb8 Mon Sep 17 00:00:00 2001 From: youngLiuHY Date: Tue, 2 Sep 2025 16:55:11 +0800 Subject: [PATCH 8/9] fix: add rollback anno --- .../rolloutrun/rolloutrun_controller.go | 23 ++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/pkg/controllers/rolloutrun/rolloutrun_controller.go b/pkg/controllers/rolloutrun/rolloutrun_controller.go index d93c58b..1c28fc0 100644 --- a/pkg/controllers/rolloutrun/rolloutrun_controller.go +++ b/pkg/controllers/rolloutrun/rolloutrun_controller.go @@ -138,7 +138,14 @@ func (r *RolloutRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) var result ctrl.Result result, err = r.syncRolloutRun(ctx, obj, newStatus, accessor, workloads) - logger.Info("current rolloutrun annotations", "rollbacking", utils.GetMapValueByDefault(obj.Annotations, rollout.AnnoRolloutPhaseRollbacking, "false")) + // add rollback annotations to mark rolloutrun entering rollbacking phase + rollback := utils.GetMapValueByDefault(obj.Annotations, rollout.AnnoRolloutPhaseRollbacking, "false") + if rollback == "true" { + if tempErr := r.addRollbackAnnotation(ctx, obj); tempErr != nil { + logger.Error(tempErr, "failed to add rollback annotation") + } + } + if tempErr := r.cleanupAnnotation(ctx, obj); tempErr != nil { logger.Error(tempErr, "failed to clean up annotation") } @@ -198,6 +205,20 @@ func (r *RolloutRunReconciler) cleanupAnnotation(ctx context.Context, obj *rollo return nil } +func (r *RolloutRunReconciler) addRollbackAnnotation(ctx context.Context, obj *rolloutv1alpha1.RolloutRun) error { + // delete manual command annotations from rollout + _, err := utils.UpdateOnConflict(clusterinfo.WithCluster(ctx, clusterinfo.Fed), r.Client, r.Client, obj, func() error { + obj.Annotations[rollout.AnnoRolloutPhaseRollbacking] = "true" + return nil + }) + if err != nil { + return err + } + key := utils.ObjectKeyString(obj) + r.rvExpectation.ExpectUpdate(key, obj.ResourceVersion) // nolint + return nil +} + func (r *RolloutRunReconciler) findTrafficTopology(ctx context.Context, obj *rolloutv1alpha1.RolloutRun) ([]rolloutv1alpha1.TrafficTopology, error) { topologies := make([]rolloutv1alpha1.TrafficTopology, 0) for _, name := range obj.Spec.TrafficTopologyRefs { From 0dcdad49d82e60dd04e11f153555f793505be38f Mon Sep 17 00:00:00 2001 From: youngLiuHY Date: Thu, 4 Sep 2025 16:53:18 +0800 Subject: [PATCH 9/9] fix: rollout rollback --- go.mod | 2 +- go.sum | 4 ++ pkg/controllers/rolloutrun/control/control.go | 5 +-- .../rolloutrun/executor/context.go | 2 +- .../rolloutrun/executor/default.go | 21 ++++----- .../rolloutrun/executor/rollback.go | 31 ++++++++++--- .../rolloutrun/rolloutrun_controller.go | 44 +++++++------------ pkg/workload/collaset/release.go | 4 +- pkg/workload/info.go | 39 ++++++---------- pkg/workload/interface.go | 3 +- pkg/workload/statefulset/release.go | 18 +++++--- 11 files changed, 89 insertions(+), 84 deletions(-) diff --git a/go.mod b/go.mod index d73cb79..0eb20c4 100644 --- a/go.mod +++ b/go.mod @@ -23,7 
+23,7 @@ require ( k8s.io/klog/v2 v2.130.1 k8s.io/kubernetes v1.22.2 k8s.io/utils v0.0.0-20241210054802-24370beab758 - kusionstack.io/kube-api v0.7.4-0.20250902065440-03af2146e073 + kusionstack.io/kube-api v0.7.4-0.20250902122009-5447303fc5ee kusionstack.io/kube-utils v0.2.1-0.20250613035327-11e9cdaec9d6 kusionstack.io/resourceconsist v0.0.4 sigs.k8s.io/controller-runtime v0.21.0 diff --git a/go.sum b/go.sum index 8357312..0708d11 100644 --- a/go.sum +++ b/go.sum @@ -1047,6 +1047,10 @@ kusionstack.io/kube-api v0.7.4-0.20250902064055-250905b1fce5 h1:1s8FDyAlDet4qOjG kusionstack.io/kube-api v0.7.4-0.20250902064055-250905b1fce5/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= kusionstack.io/kube-api v0.7.4-0.20250902065440-03af2146e073 h1:Kel/+qk5fFf9QQsqTJo7KP6PfL1FZNMRINpvjNRGRho= kusionstack.io/kube-api v0.7.4-0.20250902065440-03af2146e073/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= +kusionstack.io/kube-api v0.7.4-0.20250902100032-7b62d7607345 h1:SBfFLyJR2qRh19/qlC45iGZlMnnTAkKWmXtHrIlJFc8= +kusionstack.io/kube-api v0.7.4-0.20250902100032-7b62d7607345/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= +kusionstack.io/kube-api v0.7.4-0.20250902122009-5447303fc5ee h1:s1X/0d8ryRzr6AQ1WwSZ1TKCrXsZzqYZSjEmwMRVju0= +kusionstack.io/kube-api v0.7.4-0.20250902122009-5447303fc5ee/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= kusionstack.io/kube-utils v0.2.1-0.20250613035327-11e9cdaec9d6 h1:HYE6Wa8EzSlA6UmaTLtNKUgkB2mmasp6Ul69d3/SpK0= kusionstack.io/kube-utils v0.2.1-0.20250613035327-11e9cdaec9d6/go.mod h1:5Uy3GCJ1JEGqZw/Sp/uVnHBJN1t9wjY6USPSZ9s4idk= kusionstack.io/resourceconsist v0.0.4 h1:wRqLJuNh8O4TT6p0uOklFpHUKiRdRxcAH71Sw/q9LhE= diff --git a/pkg/controllers/rolloutrun/control/control.go b/pkg/controllers/rolloutrun/control/control.go index 384423d..d01848c 100644 --- a/pkg/controllers/rolloutrun/control/control.go +++ b/pkg/controllers/rolloutrun/control/control.go @@ -24,7 +24,6 @@ import ( "strings" "github.com/go-logr/logr" - appsv1 "k8s.io/api/apps/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -332,8 +331,8 @@ func (c *RollbackReleaseControl) Revert(ctx context.Context, info *workload.Info return TerminalError(err) } - _, err := info.UpdateForRevisionOnConflict(ctx, c.client, func(obj client.Object, revison *appsv1.ControllerRevision) error { - return c.control.RevertRevision(obj, revison) + _, err := info.UpdateOnConflict(ctx, c.client, func(obj client.Object) error { + return c.control.RevertRevision(ctx, c.client, obj) }) return err diff --git a/pkg/controllers/rolloutrun/executor/context.go b/pkg/controllers/rolloutrun/executor/context.go index 39e922e..559ea5b 100644 --- a/pkg/controllers/rolloutrun/executor/context.go +++ b/pkg/controllers/rolloutrun/executor/context.go @@ -349,7 +349,7 @@ func (r *ExecutorContext) inRollback() bool { if newStatus.RollbackStatus == nil { return false } - if run.Spec.Rollback != nil { + if run.Spec.Rollback != nil && len(run.Spec.Rollback.Batches) > 0 { if newStatus.Phase != rolloutv1alpha1.RolloutRunPhaseRollbacked { return true } diff --git a/pkg/controllers/rolloutrun/executor/default.go b/pkg/controllers/rolloutrun/executor/default.go index 7be4e2b..a081397 100644 --- a/pkg/controllers/rolloutrun/executor/default.go +++ b/pkg/controllers/rolloutrun/executor/default.go @@ -6,6 +6,7 @@ import ( "github.com/go-logr/logr" rolloutapis "kusionstack.io/kube-api/rollout" rolloutv1alpha1 "kusionstack.io/kube-api/rollout/v1alpha1" + 
"kusionstack.io/kube-api/rollout/v1alpha1/condition" ctrl "sigs.k8s.io/controller-runtime" "kusionstack.io/rollout/pkg/utils" @@ -72,10 +73,10 @@ func (r *Executor) lifecycle(executorContext *ExecutorContext) (done bool, resul // determine whether to transit rolloutrun phase to rollbacking or not if rolloutRun.Spec.Rollback != nil && len(rolloutRun.Spec.Rollback.Batches) > 0 && rolloutRun.DeletionTimestamp.IsZero() && - newStatus.Phase != rolloutv1alpha1.RolloutRunPhaseCanceling && newStatus.Phase != rolloutv1alpha1.RolloutRunPhaseRollbacking { + newStatus.Phase != rolloutv1alpha1.RolloutRunPhaseCanceling && newStatus.Phase != rolloutv1alpha1.RolloutRunPhaseRollbacking { if newStatus.Phase == rolloutv1alpha1.RolloutRunPhasePaused { - rollback, ok := utils.GetMapValue(rolloutRun.Annotations, rolloutapis.AnnoRolloutPhaseRollbacking) - if !ok || rollback != "true" { + progressingCond := condition.GetCondition(rolloutRun.Status.Conditions, rolloutv1alpha1.RolloutConditionProgressing) + if progressingCond == nil || progressingCond.Reason != rolloutv1alpha1.RolloutReasonProgressingRollbacking { newStatus.Phase = rolloutv1alpha1.RolloutRunPhaseRollbacking return false, result, nil } @@ -173,6 +174,13 @@ func (r *Executor) doCanceling(ctx *ExecutorContext) (bool, ctrl.Result, error) rolloutRun := ctx.RolloutRun newStatus := ctx.NewStatus + if ctx.inRollback() { + // init RollbackStatus + if len(newStatus.RollbackStatus.CurrentBatchState) == 0 { + newStatus.RollbackStatus.CurrentBatchState = StepNone + } + return r.rollback.Cancel(ctx) + } if ctx.inCanary() { canceled, result, err := r.canary.Cancel(ctx) if err != nil { @@ -180,13 +188,6 @@ func (r *Executor) doCanceling(ctx *ExecutorContext) (bool, ctrl.Result, error) } return canceled, result, nil } - if rolloutRun.Spec.Rollback != nil && len(rolloutRun.Spec.Rollback.Batches) > 0 { - // init RollbackStatus - if len(newStatus.RollbackStatus.CurrentBatchState) == 0 { - newStatus.RollbackStatus.CurrentBatchState = StepNone - } - return r.rollback.Cancel(ctx) - } if rolloutRun.Spec.Batch != nil && len(rolloutRun.Spec.Batch.Batches) > 0 { // init BatchStatus if len(newStatus.BatchStatus.CurrentBatchState) == 0 { diff --git a/pkg/controllers/rolloutrun/executor/rollback.go b/pkg/controllers/rolloutrun/executor/rollback.go index a0d15f7..e49a62f 100644 --- a/pkg/controllers/rolloutrun/executor/rollback.go +++ b/pkg/controllers/rolloutrun/executor/rollback.go @@ -21,9 +21,10 @@ import ( "time" "github.com/go-logr/logr" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilerrors "k8s.io/apimachinery/pkg/util/errors" - "kusionstack.io/kube-api/rollout" rolloutv1alpha1 "kusionstack.io/kube-api/rollout/v1alpha1" + "kusionstack.io/kube-api/rollout/v1alpha1/condition" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -85,25 +86,41 @@ func (e *rollbackExecutor) Do(ctx *ExecutorContext) (done bool, result ctrl.Resu return true, ctrl.Result{Requeue: true}, nil } + logger := ctx.GetRollbackLogger() newStatus := ctx.NewStatus currentBatchIndex := newStatus.RollbackStatus.CurrentBatchIndex currentState := newStatus.RollbackStatus.CurrentBatchState // revert workload version and reset canary route before rollback stateEngine start if currentBatchIndex == 0 && currentState == StepNone { + // revert revision done, err := e.revert(ctx) if !done { + logger.Error(err, "revert workload revision failed") return false, ctrl.Result{}, err } - + // modify condition reason to rollbacking for progressing condition type after 
reverting the revision, to avoid reverting it again on every reconcile + newCondition := condition.NewCondition( + rolloutv1alpha1.RolloutConditionProgressing, + metav1.ConditionTrue, + rolloutv1alpha1.RolloutReasonProgressingRollbacking, + "rolloutRun is rollbacking", + ) + newStatus.Conditions = condition.SetCondition(newStatus.Conditions, *newCondition) + + // delete canary route done, retry := e.deleteCanaryRoute(ctx) if !done { return false, ctrl.Result{RequeueAfter: retry}, nil } + // if in canary release, recycle canary resource before rollback batch progress if ctx.inCanary() { done, retry, err = e.recycle(ctx) if !done { + if err != nil { + logger.Error(err, "recycle canary resource in canary batch failed") + } switch retry { case retryStop: result = ctrl.Result{} @@ -113,9 +130,6 @@ func (e *rollbackExecutor) Do(ctx *ExecutorContext) (done bool, result ctrl.Resu return false, result, err } } - - //mark rollout into rollback - ctx.RolloutRun.Annotations[rollout.AnnoRolloutPhaseRollbacking] = "true" } stepDone, result, err := e.stateEngine.do(ctx, currentState) @@ -133,8 +147,10 @@ func (e *rollbackExecutor) Do(ctx *ExecutorContext) (done bool, result ctrl.Resu return false, result, nil } + // recycle canary resource after rollback batch progress recycleDone, retry := e.recyleCanaryResource(ctx) if !recycleDone { + logger.Info("recycle canary resource after rollbacking failed, retry later", "rolloutrun", ctx.RolloutRun.Name) return false, ctrl.Result{RequeueAfter: retry}, nil } @@ -250,6 +266,11 @@ func (e *rollbackExecutor) doBatchUpgrading(ctx *ExecutorContext) (bool, time.Du func (e *rollbackExecutor) revert(ctx *ExecutorContext) (bool, error) { newStatus := ctx.NewStatus + progressingCond := condition.GetCondition(ctx.RolloutRun.Status.Conditions, rolloutv1alpha1.RolloutConditionProgressing) + if progressingCond != nil && progressingCond.Reason == rolloutv1alpha1.RolloutReasonProgressingRollbacking { + return true, nil + } + currentBatchIndex := newStatus.RollbackStatus.CurrentBatchIndex currentBatch := ctx.RolloutRun.Spec.Rollback.Batches[currentBatchIndex] diff --git a/pkg/controllers/rolloutrun/rolloutrun_controller.go b/pkg/controllers/rolloutrun/rolloutrun_controller.go index 1c28fc0..da90593 100644 --- a/pkg/controllers/rolloutrun/rolloutrun_controller.go +++ b/pkg/controllers/rolloutrun/rolloutrun_controller.go @@ -138,14 +138,6 @@ func (r *RolloutRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) var result ctrl.Result result, err = r.syncRolloutRun(ctx, obj, newStatus, accessor, workloads) - // add rollback annotations to mark rolloutrun entering rollbacking phase - rollback := utils.GetMapValueByDefault(obj.Annotations, rollout.AnnoRolloutPhaseRollbacking, "false") - if rollback == "true" { - if tempErr := r.addRollbackAnnotation(ctx, obj); tempErr != nil { - logger.Error(tempErr, "failed to add rollback annotation") - } - } - if tempErr := r.cleanupAnnotation(ctx, obj); tempErr != nil { logger.Error(tempErr, "failed to clean up annotation") } @@ -205,20 +197,6 @@ func (r *RolloutRunReconciler) cleanupAnnotation(ctx context.Context, obj *rollo return nil } -func (r *RolloutRunReconciler) addRollbackAnnotation(ctx context.Context, obj *rolloutv1alpha1.RolloutRun) error { - // delete manual command annotations from rollout - _, err := utils.UpdateOnConflict(clusterinfo.WithCluster(ctx, clusterinfo.Fed), r.Client, r.Client, obj, func() error { - obj.Annotations[rollout.AnnoRolloutPhaseRollbacking] = "true" - return nil - }) - if err != nil { - return err
- } - key := utils.ObjectKeyString(obj) - r.rvExpectation.ExpectUpdate(key, obj.ResourceVersion) // nolint - return nil -} - func (r *RolloutRunReconciler) findTrafficTopology(ctx context.Context, obj *rolloutv1alpha1.RolloutRun) ([]rolloutv1alpha1.TrafficTopology, error) { topologies := make([]rolloutv1alpha1.TrafficTopology, 0) for _, name := range obj.Spec.TrafficTopologyRefs { @@ -292,10 +270,22 @@ func (r *RolloutRunReconciler) syncRolloutRun( rolloutv1alpha1.RolloutReasonProgressingCompleted, "rolloutRun is completed", ) - if newStatus.Phase == rolloutv1alpha1.RolloutRunPhaseCanceled { - newCondition.Reason = rolloutv1alpha1.RolloutReasonProgressingCanceled - newCondition.Message = "rolloutRun is canceled" - } + newStatus.Conditions = condition.SetCondition(newStatus.Conditions, *newCondition) + } else if newStatus.Phase == rolloutv1alpha1.RolloutRunPhaseCanceled { + newCondition := condition.NewCondition( + rolloutv1alpha1.RolloutConditionProgressing, + metav1.ConditionFalse, + rolloutv1alpha1.RolloutReasonProgressingCanceled, + "rolloutRun is canceled", + ) + newStatus.Conditions = condition.SetCondition(newStatus.Conditions, *newCondition) + } else if newStatus.Phase == rolloutv1alpha1.RolloutRunPhaseRollbacked { + newCondition := condition.NewCondition( + rolloutv1alpha1.RolloutConditionProgressing, + metav1.ConditionFalse, + rolloutv1alpha1.RolloutReasonProgressingRollbacked, + "rolloutRun is rollbacked", + ) newStatus.Conditions = condition.SetCondition(newStatus.Conditions, *newCondition) } else if newStatus.Error != nil { newCondition := condition.NewCondition( @@ -305,7 +295,7 @@ func (r *RolloutRunReconciler) syncRolloutRun( "rolloutRun stop rolling since error exist", ) newStatus.Conditions = condition.SetCondition(newStatus.Conditions, *newCondition) - } else { + } else if obj.Spec.Rollback == nil || len(obj.Spec.Rollback.Batches) == 0 { newCondition := condition.NewCondition( rolloutv1alpha1.RolloutConditionProgressing, metav1.ConditionTrue, diff --git a/pkg/workload/collaset/release.go b/pkg/workload/collaset/release.go index a213e3c..467610e 100644 --- a/pkg/workload/collaset/release.go +++ b/pkg/workload/collaset/release.go @@ -17,10 +17,10 @@ package collaset import ( + "context" "fmt" "maps" - appsv1 "k8s.io/api/apps/v1" "k8s.io/utils/ptr" operatingv1alpha1 "kusionstack.io/kube-api/apps/v1alpha1" rolloutv1alpha1 "kusionstack.io/kube-api/rollout/v1alpha1" @@ -105,7 +105,7 @@ func (c *accessorImpl) RollbackPreCheck(object client.Object) error { return nil } -func (c *accessorImpl) RevertRevision(object client.Object, revision *appsv1.ControllerRevision) error { +func (c *accessorImpl) RevertRevision(ctx context.Context, cc client.Client, object client.Object) error { return nil } diff --git a/pkg/workload/info.go b/pkg/workload/info.go index 2f395f7..097ba7a 100644 --- a/pkg/workload/info.go +++ b/pkg/workload/info.go @@ -26,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/conversion" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" rolloutv1alpha1 "kusionstack.io/kube-api/rollout/v1alpha1" @@ -129,29 +130,11 @@ func (o *Info) UpdateOnConflict(ctx context.Context, c client.Client, mutateFn f return updated, nil } -func (o *Info) UpdateForRevisionOnConflict(ctx context.Context, c client.Client, mutateFn func(client.Object, *appsv1.ControllerRevision) error) (bool, error) { - ctx = clusterinfo.WithCluster(ctx, o.ClusterName) - obj := 
o.Object - lastRevision, err := o.GetPreviousRevision(ctx, c) - if err != nil { - return false, err - } - updated, err := utils.UpdateOnConflict(ctx, c, c, obj, func() error { - return mutateFn(obj, lastRevision) - }) - if err != nil { - return false, err - } - if updated { - o.Object = obj - } - return updated, nil -} - -func (o *Info) GetPreviousRevision(ctx context.Context, c client.Client) (*appsv1.ControllerRevision, error) { +func (o *Info) GetPreviousRevision(ctx context.Context, c client.Client, matchLabels map[string]string) (*appsv1.ControllerRevision, error) { crs := &appsv1.ControllerRevisionList{} err := c.List(clusterinfo.WithCluster(ctx, o.GetClusterName()), crs, &client.ListOptions{ - Namespace: o.GetNamespace(), + Namespace: o.GetNamespace(), + LabelSelector: labels.SelectorFromSet(matchLabels), }) if err != nil { return nil, err @@ -175,12 +158,16 @@ func (o *Info) GetPreviousRevision(ctx context.Context, c client.Client) (*appsv return revisions[i].Revision > revisions[j].Revision }) - if o.Status.StableRevision == o.Status.UpdatedRevision { - return revisions[len(revisions)-2], nil - } else { - for _, revision := range revisions { - if revision.Name == o.Status.StableRevision { + for idx, revision := range revisions { + if revision.Name == o.Status.StableRevision { + if o.Status.StableRevision != o.Status.UpdatedRevision { return revision, nil + } else { + if idx+1 > len(revisions)-1 { + return nil, fmt.Errorf("no previous available controllerrevision found for workload %s/%s", o.Kind, o.Name) + } else { + return revisions[idx+1], nil + } } } } diff --git a/pkg/workload/interface.go b/pkg/workload/interface.go index 9321c6a..7a0db06 100644 --- a/pkg/workload/interface.go +++ b/pkg/workload/interface.go @@ -17,7 +17,6 @@ package workload import ( "context" - appsv1 "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/runtime/schema" "kusionstack.io/kube-api/rollout/v1alpha1" "sigs.k8s.io/controller-runtime/pkg/client" @@ -64,7 +63,7 @@ type RollbackReleaseControl interface { // RollbackPreCheck checks object before rollback release. RollbackPreCheck(obj client.Object) error // Revert the workload revision to the stable. - RevertRevision(obj client.Object, revision *appsv1.ControllerRevision) error + RevertRevision(ctx context.Context, c client.Client, obj client.Object) error // ApplyPartition use expectedUpdated replicas to calculate partition and apply it to the workload. 
ApplyPartition(obj client.Object, expectedUpdatedReplicas int32) error } diff --git a/pkg/workload/statefulset/release.go b/pkg/workload/statefulset/release.go index 52234bb..9e258bb 100644 --- a/pkg/workload/statefulset/release.go +++ b/pkg/workload/statefulset/release.go @@ -17,6 +17,7 @@ package statefulset import ( + "context" "encoding/json" "fmt" "maps" @@ -112,14 +113,20 @@ func (c *accessorImpl) RollbackPreCheck(object client.Object) error { return nil } -func (c *accessorImpl) RevertRevision(object client.Object, revision *appsv1.ControllerRevision) error { +func (c *accessorImpl) RevertRevision(ctx context.Context, cc client.Client, object client.Object) error { obj, err := checkObj(object) if err != nil { return err } + workloadInfo := workload.NewInfo(workload.GetClusterFromLabel(obj.GetLabels()), GVK, obj, c.getStatus(obj)) + lastRevision, err := workloadInfo.GetPreviousRevision(ctx, cc, nil) + if err != nil { + return err + } + var oldTemplate corev1.PodTemplateSpec - if err := json.Unmarshal(revision.Data.Raw, &oldTemplate); err != nil { + if err := json.Unmarshal(lastRevision.Data.Raw, &oldTemplate); err != nil { return fmt.Errorf("failed to unmarshal old statefulset template: %w", err) } @@ -127,11 +134,8 @@ func (c *accessorImpl) RevertRevision(object client.Object, revision *appsv1.Con obj.Spec.Template = oldTemplate // set partition to the number of replicas - obj.Spec.UpdateStrategy = appsv1.StatefulSetUpdateStrategy{ - Type: appsv1.RollingUpdateStatefulSetStrategyType, - RollingUpdate: &appsv1.RollingUpdateStatefulSetStrategy{ - Partition: obj.Spec.Replicas, - }, + obj.Spec.UpdateStrategy.RollingUpdate = &appsv1.RollingUpdateStatefulSetStrategy{ + Partition: obj.Spec.Replicas, } return nil
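
Note on the phase handling in the final patch: instead of the temporary rollbacking annotation, Executor.lifecycle keys the decision off the Progressing condition reason. Below is a minimal, self-contained sketch of that decision as a pure function; the helper name and the string constants are invented for the example and are not part of the repository.

package main

import "fmt"

const (
	phasePaused      = "Paused"
	phaseCanceling   = "Canceling"
	phaseRollbacking = "Rollbacking"

	reasonRollbacking = "Rollbacking"
)

// shouldEnterRollbacking reports whether the executor should flip the
// rolloutRun phase to Rollbacking on this reconcile.
func shouldEnterRollbacking(hasRollbackBatches, deleting bool, phase, progressingReason string) bool {
	if !hasRollbackBatches || deleting {
		return false
	}
	if phase == phaseCanceling || phase == phaseRollbacking {
		return false
	}
	if phase == phasePaused {
		// A paused run already marked as rollbacking via the Progressing
		// condition reason stays paused (e.g. at a rollback breakpoint).
		return progressingReason != reasonRollbacking
	}
	return true
}

func main() {
	fmt.Println(shouldEnterRollbacking(true, false, "Progressing", ""))              // true
	fmt.Println(shouldEnterRollbacking(true, false, phasePaused, reasonRollbacking)) // false
	fmt.Println(shouldEnterRollbacking(true, false, phasePaused, ""))                // true
	fmt.Println(shouldEnterRollbacking(false, false, "Progressing", ""))             // false
}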
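The reworked workload.Info.GetPreviousRevision picks the rollback target from the ControllerRevision history: the stable revision itself when it differs from the updated one, otherwise the next-older revision after stable. The sketch below illustrates that selection rule over an already-listed slice; the function and example names are invented here and do not exist in the repository.

package main

import (
	"fmt"
	"sort"

	appsv1 "k8s.io/api/apps/v1"
)

// previousRevision mirrors the selection rule: return the stable revision
// when it differs from the updated one, otherwise the next-older revision.
func previousRevision(revisions []*appsv1.ControllerRevision, stable, updated string) (*appsv1.ControllerRevision, error) {
	// newest first, as in GetPreviousRevision
	sort.Slice(revisions, func(i, j int) bool {
		return revisions[i].Revision > revisions[j].Revision
	})
	for idx, r := range revisions {
		if r.Name != stable {
			continue
		}
		if stable != updated {
			return r, nil
		}
		if idx+1 <= len(revisions)-1 {
			return revisions[idx+1], nil
		}
		return nil, fmt.Errorf("no previous available controllerrevision found")
	}
	return nil, fmt.Errorf("stable revision %q not found", stable)
}

func main() {
	revs := []*appsv1.ControllerRevision{{Revision: 3}, {Revision: 2}, {Revision: 1}}
	revs[0].Name, revs[1].Name, revs[2].Name = "web-3", "web-2", "web-1"

	// stable == updated: roll back to the revision just before stable.
	r, _ := previousRevision(revs, "web-3", "web-3")
	fmt.Println(r.Name) // web-2

	// stable != updated: roll back to the stable revision itself.
	r, _ = previousRevision(revs, "web-2", "web-3")
	fmt.Println(r.Name) // web-2
}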
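For the StatefulSet accessor, RevertRevision restores the previous pod template and pins spec.updateStrategy.rollingUpdate.partition to spec.replicas so no pod moves until the rollback batches lower the partition via ApplyPartition. The sketch below illustrates that interaction with plain client-go types; the partition arithmetic (replicas minus expected updated replicas) is an assumption for the example, not necessarily the repository's exact formula.

package main

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	"k8s.io/utils/ptr"
)

func main() {
	sts := appsv1.StatefulSet{}
	sts.Spec.Replicas = ptr.To[int32](10)

	// After RevertRevision: every pod is held on its current revision.
	sts.Spec.UpdateStrategy.RollingUpdate = &appsv1.RollingUpdateStatefulSetStrategy{
		Partition: sts.Spec.Replicas,
	}

	// A rollback batch that expects 3 pods on the reverted template would
	// lower the partition accordingly (assumed formula for illustration).
	expectedUpdated := int32(3)
	sts.Spec.UpdateStrategy.RollingUpdate.Partition = ptr.To(*sts.Spec.Replicas - expectedUpdated)

	fmt.Println(*sts.Spec.UpdateStrategy.RollingUpdate.Partition) // 7
}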