diff --git a/go.mod b/go.mod index 0920557..0eb20c4 100644 --- a/go.mod +++ b/go.mod @@ -23,9 +23,9 @@ require ( k8s.io/klog/v2 v2.130.1 k8s.io/kubernetes v1.22.2 k8s.io/utils v0.0.0-20241210054802-24370beab758 - kusionstack.io/kube-api v0.7.4-0.20250727122744-2399b387a919 + kusionstack.io/kube-api v0.7.4-0.20250902122009-5447303fc5ee kusionstack.io/kube-utils v0.2.1-0.20250613035327-11e9cdaec9d6 - kusionstack.io/resourceconsist v0.0.2 + kusionstack.io/resourceconsist v0.0.4 sigs.k8s.io/controller-runtime v0.21.0 ) diff --git a/go.sum b/go.sum index 8b90bb3..0708d11 100644 --- a/go.sum +++ b/go.sum @@ -1021,28 +1021,40 @@ k8s.io/sample-apiserver v0.22.2/go.mod h1:h+/DIV5EmuNq4vfPr5TSXy9mIBVXXlPAKQMPbj k8s.io/system-validators v1.5.0/go.mod h1:bPldcLgkIUK22ALflnsXk8pvkTEndYdNuaHH6gRrl0Q= k8s.io/utils v0.0.0-20240102154912-e7106e64919e h1:eQ/4ljkx21sObifjzXwlPKpdGLrCfRziVtos3ofG/sQ= k8s.io/utils v0.0.0-20240102154912-e7106e64919e/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= -kusionstack.io/kube-api v0.6.7-0.20250719054959-1cbe2be851f6 h1:ZXP+K55y4j9SKmLr8TMTgza5w1Zeu5Fmevk4MrTvFSQ= -kusionstack.io/kube-api v0.6.7-0.20250719054959-1cbe2be851f6/go.mod h1:ZrLpR6T7HzZp5UGSTXxzNCRizCC66mn2oGJWfL3VONc= -kusionstack.io/kube-api v0.6.7-0.20250720104212-3e44585627a1 h1:j/yoU/mjITbd5cQc7IaJGiEnVD9S36PsXbWkJcwYLKo= -kusionstack.io/kube-api v0.6.7-0.20250720104212-3e44585627a1/go.mod h1:ZrLpR6T7HzZp5UGSTXxzNCRizCC66mn2oGJWfL3VONc= -kusionstack.io/kube-api v0.6.7-0.20250720165902-a400661d33a0 h1:bgna3I0Xi0XGvdU+hwLc6ZFuNKPeTgZOAgUZaVW2ICQ= -kusionstack.io/kube-api v0.6.7-0.20250720165902-a400661d33a0/go.mod h1:ZrLpR6T7HzZp5UGSTXxzNCRizCC66mn2oGJWfL3VONc= -kusionstack.io/kube-api v0.6.7-0.20250721072644-0af27496d542 h1:JM7CsrsbwBK//eF8WkPlvw0IIj4sHT5SN3xHMryZSmM= -kusionstack.io/kube-api v0.6.7-0.20250721072644-0af27496d542/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= -kusionstack.io/kube-api v0.7.0 h1:jYjNq9LbpqVCNPU4cQZJI17G5gwZGJbhgAYi0BaTe7g= 
-kusionstack.io/kube-api v0.7.0/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= -kusionstack.io/kube-api v0.7.1 h1:m67X6mI1kOAvMPjiAYQRmjlugt25hN7jyd4nJn7LkPY= -kusionstack.io/kube-api v0.7.1/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= -kusionstack.io/kube-api v0.7.2 h1:qVrOnvGaqi4iZn6BZP0dQd/buLKLyDvY4PjkUCLVoZY= -kusionstack.io/kube-api v0.7.2/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= -kusionstack.io/kube-api v0.7.3 h1:Fxj/E+aixUuCB0VekEvEx6ss24Jke7MvcbRwAXwVYBo= -kusionstack.io/kube-api v0.7.3/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= kusionstack.io/kube-api v0.7.4-0.20250727122744-2399b387a919 h1:EMcsFMNMZO3oW7pqGOZd1nJC6YyQSt2RrcGdpC1J1gU= kusionstack.io/kube-api v0.7.4-0.20250727122744-2399b387a919/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= +kusionstack.io/kube-api v0.7.4-0.20250825051758-ecb7ab4bd47e h1:VK9EoO8P+ugxQE8sgCjUsTdVI90HsLJq8xAneSzhl7E= +kusionstack.io/kube-api v0.7.4-0.20250825051758-ecb7ab4bd47e/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= +kusionstack.io/kube-api v0.7.4-0.20250825063046-b78dd5a27a1d h1:eW004SEhcfGC/oGz2MQrZa88oDEMEES5Di9uP24Wp/g= +kusionstack.io/kube-api v0.7.4-0.20250825063046-b78dd5a27a1d/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= +kusionstack.io/kube-api v0.7.4-0.20250825073953-1db5a342fb25 h1:tjRlORACDHvIMudOp4ixEuz9MUzk+7Im4VDA5gE3mCc= +kusionstack.io/kube-api v0.7.4-0.20250825073953-1db5a342fb25/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= +kusionstack.io/kube-api v0.7.4-0.20250825080129-95c4ef2f91f2 h1:tiS+ABL0do2ccK+8tu/LuAf1p9Wt+Nvh+SAscUg5yKs= +kusionstack.io/kube-api v0.7.4-0.20250825080129-95c4ef2f91f2/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= +kusionstack.io/kube-api v0.7.4-0.20250825084555-df5d951e621b h1:EPM4aBdfXpIVexIO5F7mpmoRzJZVkRPaa1WlvmajSTU= +kusionstack.io/kube-api v0.7.4-0.20250825084555-df5d951e621b/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= +kusionstack.io/kube-api 
v0.7.4-0.20250825085826-67aefdbf9caa h1:8g+NRdhR04R2RE1hBCyGNyNqHOu60nX59nHSRnO6wR0= +kusionstack.io/kube-api v0.7.4-0.20250825085826-67aefdbf9caa/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= +kusionstack.io/kube-api v0.7.4-0.20250826035958-c1b30201ed5c h1:2165AAx76Vr1xDFgaHTzPVyvW83mPeB2TIh/AjVcIZU= +kusionstack.io/kube-api v0.7.4-0.20250826035958-c1b30201ed5c/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= +kusionstack.io/kube-api v0.7.4-0.20250826112948-7aeb5f5204de h1:c7L4xDVkHjGgzAkYYh1k4LCQD3j77pUxwo6buUCPIq0= +kusionstack.io/kube-api v0.7.4-0.20250826112948-7aeb5f5204de/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= +kusionstack.io/kube-api v0.7.4-0.20250828082633-8a73d98e9bf8 h1:1ARYh14b7ksYh1Ci3liyT9jhV2rV1Lyuvnxy8a5WQZU= +kusionstack.io/kube-api v0.7.4-0.20250828082633-8a73d98e9bf8/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= +kusionstack.io/kube-api v0.7.4-0.20250902063249-7e329edd6ea1 h1:o56pmV/Ip0EcYgxNablcaw/5Cy+nXbcOz5hlvt7Nyyk= +kusionstack.io/kube-api v0.7.4-0.20250902063249-7e329edd6ea1/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= +kusionstack.io/kube-api v0.7.4-0.20250902064055-250905b1fce5 h1:1s8FDyAlDet4qOjGts9GEiWC9Eya//z8rSNR0fLV9M4= +kusionstack.io/kube-api v0.7.4-0.20250902064055-250905b1fce5/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= +kusionstack.io/kube-api v0.7.4-0.20250902065440-03af2146e073 h1:Kel/+qk5fFf9QQsqTJo7KP6PfL1FZNMRINpvjNRGRho= +kusionstack.io/kube-api v0.7.4-0.20250902065440-03af2146e073/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= +kusionstack.io/kube-api v0.7.4-0.20250902100032-7b62d7607345 h1:SBfFLyJR2qRh19/qlC45iGZlMnnTAkKWmXtHrIlJFc8= +kusionstack.io/kube-api v0.7.4-0.20250902100032-7b62d7607345/go.mod h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= +kusionstack.io/kube-api v0.7.4-0.20250902122009-5447303fc5ee h1:s1X/0d8ryRzr6AQ1WwSZ1TKCrXsZzqYZSjEmwMRVju0= +kusionstack.io/kube-api v0.7.4-0.20250902122009-5447303fc5ee/go.mod 
h1:e1jtrQH2LK5fD2nTyfIXG6nYrYbU8VXShRxTRwVPaLk= kusionstack.io/kube-utils v0.2.1-0.20250613035327-11e9cdaec9d6 h1:HYE6Wa8EzSlA6UmaTLtNKUgkB2mmasp6Ul69d3/SpK0= kusionstack.io/kube-utils v0.2.1-0.20250613035327-11e9cdaec9d6/go.mod h1:5Uy3GCJ1JEGqZw/Sp/uVnHBJN1t9wjY6USPSZ9s4idk= -kusionstack.io/resourceconsist v0.0.2 h1:gf+c/LOMsiKoVR+GLzOomw8qcUbZbPckQLczZllNdVM= -kusionstack.io/resourceconsist v0.0.2/go.mod h1:4KKqnyCzTlUUQgAx7BlazLe+erMkAthTnspZOnc632M= +kusionstack.io/resourceconsist v0.0.4 h1:wRqLJuNh8O4TT6p0uOklFpHUKiRdRxcAH71Sw/q9LhE= +kusionstack.io/resourceconsist v0.0.4/go.mod h1:/mZWzD30euHSfEVx7WhzJO94+yONnqEwwYwV2EA8c0s= modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw= modernc.org/golex v1.0.0/go.mod h1:b/QX9oBD/LhixY6NDh+IdGv17hgB+51fET1i2kPSmvk= modernc.org/mathutil v1.0.0/go.mod h1:wU0vUrJsVWBZ4P6e7xtFJEhFSNsfRLJ8H458uRjg03k= diff --git a/pkg/controllers/rolloutrun/control/control.go b/pkg/controllers/rolloutrun/control/control.go index ffa8e2e..d01848c 100644 --- a/pkg/controllers/rolloutrun/control/control.go +++ b/pkg/controllers/rolloutrun/control/control.go @@ -276,6 +276,92 @@ func (c *CanaryReleaseControl) applyCanaryDefaults(canaryObj client.Object) { }) } +type RollbackReleaseControl struct { + workload workload.Accessor + control workload.RollbackReleaseControl + client client.Client +} + +func NewRollbackReleaseControl(impl workload.Accessor, c client.Client) *RollbackReleaseControl { + return &RollbackReleaseControl{ + workload: impl, + control: impl.(workload.RollbackReleaseControl), + client: c, + } +} + +func (c *RollbackReleaseControl) Initialize(ctx context.Context, info *workload.Info, ownerKind, ownerName, rolloutRun string, batchIndex int32) error { + // pre-check + if !c.control.Rollbackable() { + return fmt.Errorf("workload does not support rollback") + } + + if err := c.control.RollbackPreCheck(info.Object); err != nil { + return TerminalError(err) + } + + // add progressing annotation + pInfo := 
rolloutv1alpha1.ProgressingInfo{ + Kind: ownerKind, + RolloutName: ownerName, + RolloutID: rolloutRun, + Rollback: &rolloutv1alpha1.BatchProgressingInfo{ + CurrentBatchIndex: batchIndex, + }, + } + progress, _ := json.Marshal(pInfo) + + _, err := info.UpdateOnConflict(ctx, c.client, func(obj client.Object) error { + utils.MutateAnnotations(obj, func(annotations map[string]string) { + annotations[rolloutapi.AnnoRolloutProgressingInfo] = string(progress) + }) + return nil + }) + + return err +} + +func (c *RollbackReleaseControl) Revert(ctx context.Context, info *workload.Info) error { + // pre-check + if !c.control.Rollbackable() { + return fmt.Errorf("workload does not support rollback") + } + + if err := c.control.RollbackPreCheck(info.Object); err != nil { + return TerminalError(err) + } + + _, err := info.UpdateOnConflict(ctx, c.client, func(obj client.Object) error { + return c.control.RevertRevision(ctx, c.client, obj) + }) + + return err +} + +func (c *RollbackReleaseControl) UpdatePartition(ctx context.Context, info *workload.Info, expectedUpdated int32) (bool, error) { + ctx = clusterinfo.WithCluster(ctx, info.ClusterName) + obj := info.Object + return utils.UpdateOnConflict(ctx, c.client, c.client, obj, func() error { + return c.control.ApplyPartition(obj, expectedUpdated) + }) +} + +func (c *RollbackReleaseControl) Finalize(ctx context.Context, info *workload.Info) error { + // delete progressing annotation + changed, err := info.UpdateOnConflict(ctx, c.client, func(obj client.Object) error { + utils.MutateAnnotations(obj, func(annotations map[string]string) { + delete(annotations, rolloutapi.AnnoRolloutProgressingInfo) + }) + return nil + }) + + if changed { + logger := logr.FromContextOrDiscard(ctx) + logger.Info("delete progressing info on workload", "name", info.Name, "gvk", info.GroupVersionKind.String()) + } + return err +} + // TerminalError is an error that will not be retried but still be logged // and recorded in metrics. 
// diff --git a/pkg/controllers/rolloutrun/executor/alias.go b/pkg/controllers/rolloutrun/executor/alias.go index 0a017de..7efb79a 100644 --- a/pkg/controllers/rolloutrun/executor/alias.go +++ b/pkg/controllers/rolloutrun/executor/alias.go @@ -19,13 +19,15 @@ package executor import rolloutv1alpha1 "kusionstack.io/kube-api/rollout/v1alpha1" const ( - StepNone = rolloutv1alpha1.RolloutStepNone - StepPending = rolloutv1alpha1.RolloutStepPending - StepPreCanaryStepHook = rolloutv1alpha1.RolloutStepPreCanaryStepHook - StepPreBatchStepHook = rolloutv1alpha1.RolloutStepPreBatchStepHook - StepRunning = rolloutv1alpha1.RolloutStepRunning - StepPostCanaryStepHook = rolloutv1alpha1.RolloutStepPostCanaryStepHook - StepPostBatchStepHook = rolloutv1alpha1.RolloutStepPostBatchStepHook - StepSucceeded = rolloutv1alpha1.RolloutStepSucceeded - StepResourceRecycling = rolloutv1alpha1.RolloutStepResourceRecycling + StepNone = rolloutv1alpha1.RolloutStepNone + StepPending = rolloutv1alpha1.RolloutStepPending + StepPreCanaryStepHook = rolloutv1alpha1.RolloutStepPreCanaryStepHook + StepPreBatchStepHook = rolloutv1alpha1.RolloutStepPreBatchStepHook + StepPreRollbackStepHook = rolloutv1alpha1.RolloutStepPreRollbackStepHook + StepRunning = rolloutv1alpha1.RolloutStepRunning + StepPostCanaryStepHook = rolloutv1alpha1.RolloutStepPostCanaryStepHook + StepPostBatchStepHook = rolloutv1alpha1.RolloutStepPostBatchStepHook + StepPostRollbackStepHook = rolloutv1alpha1.RolloutStepPostRollbackStepHook + StepSucceeded = rolloutv1alpha1.RolloutStepSucceeded + StepResourceRecycling = rolloutv1alpha1.RolloutStepResourceRecycling ) diff --git a/pkg/controllers/rolloutrun/executor/batch.go b/pkg/controllers/rolloutrun/executor/batch.go index e619a02..e5098ee 100644 --- a/pkg/controllers/rolloutrun/executor/batch.go +++ b/pkg/controllers/rolloutrun/executor/batch.go @@ -221,7 +221,7 @@ func (e *batchExecutor) doBatchUpgrading(ctx *ExecutorContext) (bool, time.Durat allWorkloadReady = false 
logger.V(3).Info("still waiting for target to be ready", "target", item.CrossClusterObjectNameReference) - expectedReplicas, err := e.calculateExpectedReplicasBySlidingWindow(status, currentBatchExpectedReplicas, item.ReplicaSlidingWindow) + expectedReplicas, err := calculateExpectedReplicasBySlidingWindow(status, currentBatchExpectedReplicas, item.ReplicaSlidingWindow) if err != nil { return false, retryStop, err } @@ -250,7 +250,7 @@ func (e *batchExecutor) doBatchUpgrading(ctx *ExecutorContext) (bool, time.Durat // calculateExpectedReplicasBySlidingWindow calculate expected replicas by sliding window // if window is nil, return currentBatchExpectedReplicas // if window is not nil, return min(currentBatchExpectedReplicas, updatedAvailableReplicas + increment) -func (e *batchExecutor) calculateExpectedReplicasBySlidingWindow(status rolloutv1alpha1.RolloutWorkloadStatus, currentBatchExpectedReplicas int32, window *intstr.IntOrString) (int32, error) { +func calculateExpectedReplicasBySlidingWindow(status rolloutv1alpha1.RolloutWorkloadStatus, currentBatchExpectedReplicas int32, window *intstr.IntOrString) (int32, error) { if window == nil { return currentBatchExpectedReplicas, nil } diff --git a/pkg/controllers/rolloutrun/executor/context.go b/pkg/controllers/rolloutrun/executor/context.go index 6471523..559ea5b 100644 --- a/pkg/controllers/rolloutrun/executor/context.go +++ b/pkg/controllers/rolloutrun/executor/context.go @@ -86,6 +86,28 @@ func (c *ExecutorContext) Initialize() { newStatus.BatchStatus.Records = newStatus.BatchStatus.Records[:specBatchSize] } } + + // init RollbackStatus + if c.RolloutRun.Spec.Rollback != nil { + if newStatus.RollbackStatus == nil { + newStatus.RollbackStatus = &rolloutv1alpha1.RolloutRunBatchStatus{} + } + // resize records + specBatchSize := len(c.RolloutRun.Spec.Rollback.Batches) + statusBatchSize := len(newStatus.RollbackStatus.Records) + if specBatchSize > statusBatchSize { + for i := 0; i < specBatchSize-statusBatchSize; i++ 
{ + newStatus.RollbackStatus.Records = append(newStatus.RollbackStatus.Records, + rolloutv1alpha1.RolloutRunStepStatus{ + Index: ptr.To(int32(statusBatchSize + i)), + State: StepNone, + }, + ) + } + } else if specBatchSize < statusBatchSize { + newStatus.RollbackStatus.Records = newStatus.RollbackStatus.Records[:specBatchSize] + } + } }) } @@ -107,7 +129,10 @@ func (c *ExecutorContext) GetWebhooksAndLatestStatusBy(hookType rolloutv1alpha1. return nil, nil } var webhookStatuses []rolloutv1alpha1.RolloutWebhookStatus - if c.inCanary() { + if c.inRollback() { + index := newStatus.RollbackStatus.CurrentBatchIndex + webhookStatuses = newStatus.RollbackStatus.Records[index].Webhooks + } else if c.inCanary() { webhookStatuses = newStatus.CanaryStatus.Webhooks } else { index := newStatus.BatchStatus.CurrentBatchIndex @@ -127,7 +152,10 @@ func (c *ExecutorContext) SetWebhookStatus(status rolloutv1alpha1.RolloutWebhook c.Initialize() newStatus := c.NewStatus - if c.inCanary() { + if c.inRollback() { + index := newStatus.RollbackStatus.CurrentBatchIndex + newStatus.RollbackStatus.Records[index].Webhooks = appendWebhookStatus(newStatus.RollbackStatus.Records[index].Webhooks, status) + } else if c.inCanary() { newStatus.CanaryStatus.Webhooks = appendWebhookStatus(newStatus.CanaryStatus.Webhooks, status) } else { index := newStatus.BatchStatus.CurrentBatchIndex @@ -140,7 +168,9 @@ func isFinalStepState(state rolloutv1alpha1.RolloutStepState) bool { } func (c *ExecutorContext) GetCurrentState() (string, rolloutv1alpha1.RolloutStepState) { - if c.inCanary() { + if c.inRollback() { + return "rollback", c.NewStatus.RollbackStatus.CurrentBatchState + } else if c.inCanary() { return "canary", c.NewStatus.CanaryStatus.State } else { return "batch", c.NewStatus.BatchStatus.CurrentBatchState @@ -151,7 +181,16 @@ func (c *ExecutorContext) MoveToNextState(nextState rolloutv1alpha1.RolloutStepS c.Initialize() newStatus := c.NewStatus - if c.inCanary() { + if c.inRollback() { + index := 
newStatus.RollbackStatus.CurrentBatchIndex + newStatus.RollbackStatus.CurrentBatchState = nextState + newStatus.RollbackStatus.Records[index].State = nextState + if nextState == StepPreRollbackStepHook { + newStatus.RollbackStatus.Records[index].StartTime = ptr.To(metav1.Now()) + } else if isFinalStepState(nextState) { + newStatus.RollbackStatus.Records[index].FinishTime = ptr.To(metav1.Now()) + } + } else if c.inCanary() { newStatus.CanaryStatus.State = nextState if nextState == StepPreCanaryStepHook { newStatus.CanaryStatus.StartTime = ptr.To(metav1.Now()) @@ -174,7 +213,22 @@ func (c *ExecutorContext) SkipCurrentRelease() { c.Initialize() newStatus := c.NewStatus - if c.inCanary() { + if c.inRollback() { + newStatus.RollbackStatus.CurrentBatchIndex = int32(len(c.RolloutRun.Spec.Rollback.Batches) - 1) + newStatus.RollbackStatus.CurrentBatchState = StepSucceeded + for i := range newStatus.RollbackStatus.Records { + if newStatus.RollbackStatus.Records[i].State == StepNone || + newStatus.RollbackStatus.Records[i].State == StepPending { + newStatus.RollbackStatus.Records[i].State = StepSucceeded + } + if newStatus.RollbackStatus.Records[i].StartTime == nil { + newStatus.RollbackStatus.Records[i].StartTime = ptr.To(metav1.Now()) + } + if newStatus.RollbackStatus.Records[i].FinishTime == nil { + newStatus.RollbackStatus.Records[i].FinishTime = ptr.To(metav1.Now()) + } + } + } else if c.inCanary() { newStatus.CanaryStatus.State = StepSucceeded if newStatus.CanaryStatus.StartTime == nil { newStatus.CanaryStatus.StartTime = ptr.To(metav1.Now()) @@ -258,6 +312,51 @@ func (r *ExecutorContext) inCanary() bool { return false } +func (r *ExecutorContext) inBatchGray() bool { + // todo: need to consider case of every batch gray + r.Initialize() + run := r.RolloutRun + newStatus := r.NewStatus + if newStatus.BatchStatus == nil { + return false + } + + currentBatchIndex := newStatus.BatchStatus.CurrentBatchIndex + currentBatch := run.Spec.Batch.Batches[currentBatchIndex] + if 
currentBatch.Traffic == nil || int(currentBatchIndex+1) == len(run.Spec.Batch.Batches) { + return false + } + + currentBatchState := newStatus.BatchStatus.CurrentBatchState + if !isFinalStepState(currentBatchState) { + return true + } + + if int(currentBatchIndex+1) < len(run.Spec.Batch.Batches) { + nextBatch := run.Spec.Batch.Batches[currentBatchIndex+1] + if nextBatch.Traffic != nil { + return true + } + } + + return false +} + +func (r *ExecutorContext) inRollback() bool { + r.Initialize() + run := r.RolloutRun + newStatus := r.NewStatus + if newStatus.RollbackStatus == nil { + return false + } + if run.Spec.Rollback != nil && len(run.Spec.Rollback.Batches) > 0 { + if newStatus.Phase != rolloutv1alpha1.RolloutRunPhaseRollbacked { + return true + } + } + return false +} + func (r *ExecutorContext) makeRolloutWebhookReview(hookType rolloutv1alpha1.HookType, webhook rolloutv1alpha1.RolloutWebhook) rolloutv1alpha1.RolloutWebhookReview { r.Initialize() @@ -279,7 +378,13 @@ func (r *ExecutorContext) makeRolloutWebhookReview(hookType rolloutv1alpha1.Hook }, } - if r.inCanary() { + if r.inRollback() { + review.Spec.Rollback = &rolloutv1alpha1.RolloutWebhookReviewBatch{ + BatchIndex: newStatus.RollbackStatus.CurrentBatchIndex, + Targets: rolloutRun.Spec.Rollback.Batches[newStatus.RollbackStatus.CurrentBatchIndex].Targets, + Properties: rolloutRun.Spec.Rollback.Batches[newStatus.RollbackStatus.CurrentBatchIndex].Properties, + } + } else if r.inCanary() { review.Spec.Canary = &rolloutv1alpha1.RolloutWebhookReviewCanary{ Targets: rolloutRun.Spec.Canary.Targets, Properties: rolloutRun.Spec.Canary.Properties, @@ -322,3 +427,12 @@ func (e *ExecutorContext) GetBatchLogger() logr.Logger { func (e *ExecutorContext) GetCanaryLogger() logr.Logger { return e.GetLogger().WithValues("step", "canary") } + +func (e *ExecutorContext) GetRollbackLogger() logr.Logger { + e.Initialize() + l := e.GetLogger().WithValues("step", "rollback") + if e.NewStatus != nil && e.NewStatus.RollbackStatus 
!= nil { + l = l.WithValues("rollbackIndex", e.NewStatus.RollbackStatus.CurrentBatchIndex) + } + return l +} diff --git a/pkg/controllers/rolloutrun/executor/default.go b/pkg/controllers/rolloutrun/executor/default.go index ff5ba99..a081397 100644 --- a/pkg/controllers/rolloutrun/executor/default.go +++ b/pkg/controllers/rolloutrun/executor/default.go @@ -6,25 +6,29 @@ import ( "github.com/go-logr/logr" rolloutapis "kusionstack.io/kube-api/rollout" rolloutv1alpha1 "kusionstack.io/kube-api/rollout/v1alpha1" + "kusionstack.io/kube-api/rollout/v1alpha1/condition" ctrl "sigs.k8s.io/controller-runtime" "kusionstack.io/rollout/pkg/utils" ) type Executor struct { - logger logr.Logger - canary *canaryExecutor - batch *batchExecutor + logger logr.Logger + canary *canaryExecutor + batch *batchExecutor + rollback *rollbackExecutor } func NewDefaultExecutor(logger logr.Logger) *Executor { webhookExec := newWebhookExecutor(time.Second) canaryExec := newCanaryExecutor(webhookExec) batchExec := newBatchExecutor(webhookExec) + rollbackExec := newRollbackExecutor(webhookExec) e := &Executor{ - logger: logger, - canary: canaryExec, - batch: batchExec, + logger: logger, + canary: canaryExec, + batch: batchExec, + rollback: rollbackExec, } return e } @@ -67,6 +71,21 @@ func (r *Executor) lifecycle(executorContext *ExecutorContext) (done bool, resul return false, result, nil } + // determine whether to transit rolloutrun phase to rollbacking or not + if rolloutRun.Spec.Rollback != nil && len(rolloutRun.Spec.Rollback.Batches) > 0 && rolloutRun.DeletionTimestamp.IsZero() && + newStatus.Phase != rolloutv1alpha1.RolloutRunPhaseCanceling && newStatus.Phase != rolloutv1alpha1.RolloutRunPhaseRollbacking { + if newStatus.Phase == rolloutv1alpha1.RolloutRunPhasePaused { + progressingCond := condition.GetCondition(rolloutRun.Status.Conditions, rolloutv1alpha1.RolloutConditionProgressing) + if progressingCond == nil || progressingCond.Reason != rolloutv1alpha1.RolloutReasonProgressingRollbacking 
{ + newStatus.Phase = rolloutv1alpha1.RolloutRunPhaseRollbacking + return false, result, nil + } + } else { + newStatus.Phase = rolloutv1alpha1.RolloutRunPhaseRollbacking + return false, result, nil + } + } + switch newStatus.Phase { case rolloutv1alpha1.RolloutRunPhaseInitial: newStatus.Phase = rolloutv1alpha1.RolloutRunPhasePreRollout @@ -86,12 +105,18 @@ func (r *Executor) lifecycle(executorContext *ExecutorContext) (done bool, resul if processingDone { newStatus.Phase = rolloutv1alpha1.RolloutRunPhasePostRollout } + case rolloutv1alpha1.RolloutRunPhaseRollbacking: + var rollbacked bool + rollbacked, result, err = r.doRollbacking(executorContext) + if rollbacked { + newStatus.Phase = rolloutv1alpha1.RolloutRunPhaseRollbacked + } case rolloutv1alpha1.RolloutRunPhasePostRollout: newStatus.Phase = rolloutv1alpha1.RolloutRunPhaseSucceeded case rolloutv1alpha1.RolloutRunPhasePaused: // rolloutRun is paused, do not requeue result.Requeue = false - case rolloutv1alpha1.RolloutRunPhaseSucceeded, rolloutv1alpha1.RolloutRunPhaseCanceled: + case rolloutv1alpha1.RolloutRunPhaseSucceeded, rolloutv1alpha1.RolloutRunPhaseCanceled, rolloutv1alpha1.RolloutRunPhaseRollbacked: done = true result.Requeue = false } @@ -149,6 +174,13 @@ func (r *Executor) doCanceling(ctx *ExecutorContext) (bool, ctrl.Result, error) rolloutRun := ctx.RolloutRun newStatus := ctx.NewStatus + if ctx.inRollback() { + // init RollbackStatus + if len(newStatus.RollbackStatus.CurrentBatchState) == 0 { + newStatus.RollbackStatus.CurrentBatchState = StepNone + } + return r.rollback.Cancel(ctx) + } if ctx.inCanary() { canceled, result, err := r.canary.Cancel(ctx) if err != nil { @@ -166,3 +198,38 @@ func (r *Executor) doCanceling(ctx *ExecutorContext) (bool, ctrl.Result, error) return true, ctrl.Result{Requeue: true}, nil } + +func (r *Executor) doRollbacking(ctx *ExecutorContext) (bool, ctrl.Result, error) { + rolloutRun := ctx.RolloutRun + newStatus := ctx.NewStatus + + logger := ctx.GetLogger() + + if 
newStatus.Error != nil { + // if error occurred, do nothing + return false, ctrl.Result{Requeue: true}, nil + } + + if rolloutRun.Spec.Rollback != nil && len(rolloutRun.Spec.Rollback.Batches) > 0 { + // init RollbackStatus + if len(newStatus.RollbackStatus.CurrentBatchState) == 0 { + newStatus.RollbackStatus.CurrentBatchState = StepNone + } + preCurrentBatchIndex := newStatus.RollbackStatus.CurrentBatchIndex + preCurrentBatchState := newStatus.RollbackStatus.CurrentBatchState + defer func() { + if preCurrentBatchIndex != newStatus.RollbackStatus.CurrentBatchIndex || + preCurrentBatchState != newStatus.RollbackStatus.CurrentBatchState { + logger.Info("rollback batch state transition", + "current.index", preCurrentBatchIndex, + "current.state", preCurrentBatchState, + "next.index", newStatus.RollbackStatus.CurrentBatchIndex, + "next.state", newStatus.RollbackStatus.CurrentBatchState, + ) + } + }() + return r.rollback.Do(ctx) + } + + return true, ctrl.Result{Requeue: true}, nil +} diff --git a/pkg/controllers/rolloutrun/executor/do_command.go b/pkg/controllers/rolloutrun/executor/do_command.go index a2bad55..34dbca7 100644 --- a/pkg/controllers/rolloutrun/executor/do_command.go +++ b/pkg/controllers/rolloutrun/executor/do_command.go @@ -23,7 +23,11 @@ func (r *Executor) doCommand(ctx *ExecutorContext) ctrl.Result { newStatus.Phase = rolloutv1alpha1.RolloutRunPhasePausing case rolloutapis.AnnoManualCommandResume, rolloutapis.AnnoManualCommandContinue: // nolint if newStatus.Phase == rolloutv1alpha1.RolloutRunPhasePaused { - newStatus.Phase = rolloutv1alpha1.RolloutRunPhaseProgressing + if rolloutRun.Spec.Rollback != nil && len(rolloutRun.Spec.Rollback.Batches) > 0 { + newStatus.Phase = rolloutv1alpha1.RolloutRunPhaseRollbacking + } else { + newStatus.Phase = rolloutv1alpha1.RolloutRunPhaseProgressing + } } case rolloutapis.AnnoManualCommandRetry: if batchError != nil { diff --git a/pkg/controllers/rolloutrun/executor/rollback.go 
b/pkg/controllers/rolloutrun/executor/rollback.go new file mode 100644 index 0000000..e49a62f --- /dev/null +++ b/pkg/controllers/rolloutrun/executor/rollback.go @@ -0,0 +1,448 @@ +/** + * Copyright 2024 The KusionStack Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package executor + +import ( + "fmt" + "time" + + "github.com/go-logr/logr" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + rolloutv1alpha1 "kusionstack.io/kube-api/rollout/v1alpha1" + "kusionstack.io/kube-api/rollout/v1alpha1/condition" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + "kusionstack.io/rollout/pkg/controllers/rolloutrun/control" + "kusionstack.io/rollout/pkg/workload" +) + +func newDoRollbackError(reason, msg string) *rolloutv1alpha1.CodeReasonMessage { + return &rolloutv1alpha1.CodeReasonMessage{ + Code: "DoRollbackError", + Reason: reason, + Message: msg, + } +} + +type rollbackExecutor struct { + webhook webhookExecutor + stateEngine *stepStateEngine +} + +func newRollbackExecutor(webhook webhookExecutor) *rollbackExecutor { + e := &rollbackExecutor{ + webhook: webhook, + stateEngine: newStepStateEngine(), + } + + e.stateEngine.add(StepNone, StepPending, e.doPausing, e.release) + e.stateEngine.add(StepPending, StepPreRollbackStepHook, skipStep, e.release) + e.stateEngine.add(StepPreRollbackStepHook, StepRunning, e.doPreStepHook, e.release) + 
e.stateEngine.add(StepRunning, StepPostRollbackStepHook, e.doBatchUpgrading, e.release) + e.stateEngine.add(StepPostRollbackStepHook, StepResourceRecycling, e.doPostStepHook, e.release) + e.stateEngine.add(StepResourceRecycling, StepSucceeded, e.doFinalize, e.release) + e.stateEngine.add(StepSucceeded, "", skipStep, skipStep) + + return e +} + +func (e *rollbackExecutor) init(ctx *ExecutorContext) (done bool) { + logger := ctx.GetRollbackLogger() + if !e.isSupported(ctx) { + // skip rollback release if workload accessor don't support it. + logger.Info("workload accessor don't support rollback release, skip it") + ctx.SkipCurrentRelease() + return true + } + + if ctx.inCanary() { + ctx.TrafficManager.With(ctx.RolloutRun.Spec.Canary.Targets, ctx.RolloutRun.Spec.Canary.Traffic) + } else if ctx.inBatchGray() { + currentBatchIndex := ctx.NewStatus.BatchStatus.CurrentBatchIndex + currentBatch := ctx.RolloutRun.Spec.Batch.Batches[currentBatchIndex] + ctx.TrafficManager.With(currentBatch.Targets, currentBatch.Traffic) + } + return false +} + +func (e *rollbackExecutor) Do(ctx *ExecutorContext) (done bool, result ctrl.Result, err error) { + if e.init(ctx) { + return true, ctrl.Result{Requeue: true}, nil + } + + logger := ctx.GetRollbackLogger() + newStatus := ctx.NewStatus + currentBatchIndex := newStatus.RollbackStatus.CurrentBatchIndex + currentState := newStatus.RollbackStatus.CurrentBatchState + + // revert workload version and reset canary route before rollback stateEngine start + if currentBatchIndex == 0 && currentState == StepNone { + // revert revision + done, err := e.revert(ctx) + if !done { + logger.Error(err, "revert workload revision failed") + return false, ctrl.Result{}, err + } + // modify condition reason to rollbacking for progressing condition type after reverting revision, to prevent repeated revision reverting in each reconcile + newCondition := condition.NewCondition( + rolloutv1alpha1.RolloutConditionProgressing, + metav1.ConditionTrue, + 
rolloutv1alpha1.RolloutReasonProgressingRollbacking, + "rolloutRun is rollbacking", + ) + newStatus.Conditions = condition.SetCondition(newStatus.Conditions, *newCondition) + + // delete canary route + done, retry := e.deleteCanaryRoute(ctx) + if !done { + return false, ctrl.Result{RequeueAfter: retry}, nil + } + + // if in canary release, recycle canary resource before rollback batch progress + if ctx.inCanary() { + done, retry, err = e.recycle(ctx) + if !done { + if err != nil { + logger.Error(err, "recycle canary resource in canary batch failed") + } + switch retry { + case retryStop: + result = ctrl.Result{} + default: + result = ctrl.Result{RequeueAfter: retry} + } + return false, result, err + } + } + } + + stepDone, result, err := e.stateEngine.do(ctx, currentState) + if err != nil { + return false, result, err + } + if !stepDone { + return false, result, nil + } + + if int(currentBatchIndex+1) < len(ctx.RolloutRun.Spec.Rollback.Batches) { + // move to next batch + newStatus.RollbackStatus.CurrentBatchState = StepNone + newStatus.RollbackStatus.CurrentBatchIndex = currentBatchIndex + 1 + return false, result, nil + } + + // recycle canary resource after rollback batch progress + recycleDone, retry := e.recyleCanaryResource(ctx) + if !recycleDone { + logger.Info("recycle canary resource after rollbacking failed, retry later", "rolloutrun", ctx.RolloutRun.Name) + return false, ctrl.Result{RequeueAfter: retry}, nil + } + + return true, result, nil +} + +func (e *rollbackExecutor) Cancel(ctx *ExecutorContext) (done bool, result ctrl.Result, err error) { + done = e.init(ctx) + if done { + return true, ctrl.Result{Requeue: true}, nil + } + + return e.stateEngine.cancel(ctx, ctx.NewStatus.RollbackStatus.CurrentBatchState) +} + +func (e *rollbackExecutor) isSupported(ctx *ExecutorContext) bool { + _, ok := ctx.Accessor.(workload.RollbackReleaseControl) + return ok +} + +func (e *rollbackExecutor) doPausing(ctx *ExecutorContext) (bool, time.Duration, error) { + 
rolloutRunName := ctx.RolloutRun.Name + newStatus := ctx.NewStatus + currentBatchIndex := newStatus.RollbackStatus.CurrentBatchIndex + currentBatch := ctx.RolloutRun.Spec.Rollback.Batches[currentBatchIndex] + + rollbackControl := control.NewRollbackReleaseControl(ctx.Accessor, ctx.Client) + + for _, item := range currentBatch.Targets { + wi := ctx.Workloads.Get(item.Cluster, item.Name) + if wi == nil { + return false, retryStop, newWorkloadNotFoundError(item.CrossClusterObjectNameReference) + } + err := rollbackControl.Initialize(ctx, wi, ctx.OwnerKind, ctx.OwnerName, rolloutRunName, currentBatchIndex) + if err != nil { + return false, retryStop, err + } + } + + if ctx.RolloutRun.Spec.Rollback.Batches[currentBatchIndex].Breakpoint { + ctx.Pause() + } + return true, retryImmediately, nil +} + +func (e *rollbackExecutor) doPreStepHook(ctx *ExecutorContext) (bool, time.Duration, error) { + return e.webhook.Do(ctx, rolloutv1alpha1.PreRollbackStepHook) +} + +func (e *rollbackExecutor) doPostStepHook(ctx *ExecutorContext) (bool, time.Duration, error) { + return e.webhook.Do(ctx, rolloutv1alpha1.PostRollbackStepHook) +} + +// doBatchUpgrading process upgrading state +func (e *rollbackExecutor) doBatchUpgrading(ctx *ExecutorContext) (bool, time.Duration, error) { + rolloutRun := ctx.RolloutRun + newStatus := ctx.NewStatus + currentBatchIndex := newStatus.RollbackStatus.CurrentBatchIndex + currentBatch := rolloutRun.Spec.Rollback.Batches[currentBatchIndex] + + logger := ctx.GetBatchLogger() + + rollbackControl := control.NewRollbackReleaseControl(ctx.Accessor, ctx.Client) + + batchTargetStatuses := make([]rolloutv1alpha1.RolloutWorkloadStatus, 0) + + allWorkloadReady := true + for _, item := range currentBatch.Targets { + info := ctx.Workloads.Get(item.Cluster, item.Name) + if info == nil { + // If the target workload does not exist, the retries will stop. 
+ return false, retryStop, newWorkloadNotFoundError(item.CrossClusterObjectNameReference) + } + + status := info.APIStatus() + batchTargetStatuses = append(batchTargetStatuses, info.APIStatus()) + + currentBatchExpectedReplicas, _ := workload.CalculateUpdatedReplicas(&status.Replicas, item.Replicas) + + if info.CheckUpdatedReady(currentBatchExpectedReplicas) { + // if the target is ready, we will not change partition + continue + } + + allWorkloadReady = false + logger.V(3).Info("still waiting for target to be ready", "target", item.CrossClusterObjectNameReference) + + expectedReplicas, err := calculateExpectedReplicasBySlidingWindow(status, currentBatchExpectedReplicas, item.ReplicaSlidingWindow) + if err != nil { + return false, retryStop, err + } + + // ensure partition: upgradePartition is an idempotent function + changed, err := rollbackControl.UpdatePartition(ctx, info, expectedReplicas) + if err != nil { + return false, retryStop, err + } + if changed { + logger.V(2).Info("upgrade target partition", "target", item.CrossClusterObjectNameReference, "partition", expectedReplicas) + } + } + + // update target status in rollback + newStatus.RollbackStatus.Records[currentBatchIndex].Targets = batchTargetStatuses + + if allWorkloadReady { + return true, retryImmediately, nil + } + + // wait for next reconcile + return false, retryDefault, nil +} + +func (e *rollbackExecutor) revert(ctx *ExecutorContext) (bool, error) { + newStatus := ctx.NewStatus + progressingCond := condition.GetCondition(ctx.RolloutRun.Status.Conditions, rolloutv1alpha1.RolloutConditionProgressing) + if progressingCond != nil && progressingCond.Reason == rolloutv1alpha1.RolloutReasonProgressingRollbacking { + return true, nil + } + + currentBatchIndex := newStatus.RollbackStatus.CurrentBatchIndex + currentBatch := ctx.RolloutRun.Spec.Rollback.Batches[currentBatchIndex] + + rollbackControl := control.NewRollbackReleaseControl(ctx.Accessor, ctx.Client) + + for _, item := range currentBatch.Targets 
{ + wi := ctx.Workloads.Get(item.Cluster, item.Name) + if wi == nil { + return false, newWorkloadNotFoundError(item.CrossClusterObjectNameReference) + } + err := rollbackControl.Revert(ctx, wi) + if err != nil { + return false, err + } + } + return true, nil +} + +func (e *rollbackExecutor) deleteCanaryRoute(ctx *ExecutorContext) (bool, time.Duration) { + if !ctx.inCanary() && !ctx.inBatchGray() { + return true, retryDefault + } + + done, retry := e.modifyTraffic(ctx, "deleteCanaryRoute") + if !done { + return false, retry + } + + return true, retryDefault +} + +func (e *rollbackExecutor) deleteCanaryWorkloads(ctx *ExecutorContext) (bool, time.Duration, error) { + if !ctx.inCanary() { + return true, retryDefault, nil + } + + rolloutRun := ctx.RolloutRun + releaseControl := control.NewCanaryReleaseControl(ctx.Accessor, ctx.Client) + + for _, item := range rolloutRun.Spec.Canary.Targets { + wi := ctx.Workloads.Get(item.Cluster, item.Name) + if wi == nil { + return false, retryStop, newWorkloadNotFoundError(item.CrossClusterObjectNameReference) + } + + if err := releaseControl.Finalize(ctx, wi); err != nil { + return false, retryStop, newDoRollbackError( + "FailedFinalize", + fmt.Sprintf("failed to delete canary resource for workload(%s), err: %v", item.CrossClusterObjectNameReference, err), + ) + } + } + + return true, retryDefault, nil +} + +func (e *rollbackExecutor) recyleCanaryResource(ctx *ExecutorContext) (bool, time.Duration) { + if !ctx.inCanary() && !ctx.inBatchGray() { + return true, retryDefault + } + + done, retry := e.modifyTraffic(ctx, "resetRoute") + if !done { + return false, retry + } + + done, retry = e.modifyTraffic(ctx, "deleteForkedBackends") + if !done { + return false, retry + } + + return true, retryDefault +} + +func (e *rollbackExecutor) recycle(ctx *ExecutorContext) (bool, time.Duration, error) { + done, retry, err := e.deleteCanaryWorkloads(ctx) + if !done { + return false, retry, err + } + + done, retry = e.recyleCanaryResource(ctx) + if 
!done { + return false, retry, nil + } + + return true, retryDefault, nil +} + +func (e *rollbackExecutor) doFinalize(ctx *ExecutorContext) (bool, time.Duration, error) { + // recycling only works on the last batch for now + if int(ctx.NewStatus.RollbackStatus.CurrentBatchIndex+1) < len(ctx.RolloutRun.Spec.Rollback.Batches) { + return true, retryImmediately, nil + } + return e.release(ctx) +} + +func (e *rollbackExecutor) release(ctx *ExecutorContext) (bool, time.Duration, error) { + // first, try to stop the webhook + e.webhook.Cancel(ctx) + + // try to finalize all workloads + allTargets := map[rolloutv1alpha1.CrossClusterObjectNameReference]bool{} + // finalize rollback release + rollbackControl := control.NewRollbackReleaseControl(ctx.Accessor, ctx.Client) + + for _, item := range ctx.RolloutRun.Spec.Rollback.Batches { + for _, target := range item.Targets { + allTargets[target.CrossClusterObjectNameReference] = true + } + } + + var finalizeErrs []error + + for target := range allTargets { + wi := ctx.Workloads.Get(target.Cluster, target.Name) + if wi == nil { + // ignore not found workload + continue + } + err := rollbackControl.Finalize(ctx, wi) + if err != nil { + // try our best to finalize all workloads + finalizeErrs = append(finalizeErrs, err) + continue + } + } + + if len(finalizeErrs) > 0 { + return false, retryDefault, utilerrors.NewAggregate(finalizeErrs) + } + + return true, retryImmediately, nil +} + +func (e *rollbackExecutor) modifyTraffic(ctx *ExecutorContext, op string) (bool, time.Duration) { + logger := ctx.GetCanaryLogger() + rolloutRun := ctx.RolloutRun + currentBatchIndex := ctx.NewStatus.BatchStatus.CurrentBatchIndex + opResult := controllerutil.OperationResultNone + + if rolloutRun.Spec.Canary.Traffic == nil && rolloutRun.Spec.Batch.Batches[currentBatchIndex].Traffic == nil { + logger.V(3).Info("traffic is nil, skip modify traffic") + return true, retryImmediately + } + + goctx := logr.NewContext(ctx.Context, logger) + + var err error + switch op { + 
case "deleteCanaryRoute": + opResult, err = ctx.TrafficManager.DeleteCanaryRoute(goctx) + case "resetRoute": + opResult, err = ctx.TrafficManager.ResetRoute(goctx) + case "deleteForkedBackends": + opResult, err = ctx.TrafficManager.DeleteForkedBackends(goctx) + } + if err != nil { + logger.Error(err, "failed to modify traffic", "operation", op) + return false, retryDefault + } + + if opResult != controllerutil.OperationResultNone { + logger.Info("modify traffic routing", "operation", op, "result", opResult) + // check next time + return false, retryDefault + } + + // 1.b. waiting for traffic + ready := ctx.TrafficManager.CheckReady(ctx) + if !ready { + return false, retryDefault + } + + return true, retryImmediately +} diff --git a/pkg/controllers/rolloutrun/rolloutrun_controller.go b/pkg/controllers/rolloutrun/rolloutrun_controller.go index 7152aea..da90593 100644 --- a/pkg/controllers/rolloutrun/rolloutrun_controller.go +++ b/pkg/controllers/rolloutrun/rolloutrun_controller.go @@ -270,10 +270,22 @@ func (r *RolloutRunReconciler) syncRolloutRun( rolloutv1alpha1.RolloutReasonProgressingCompleted, "rolloutRun is completed", ) - if newStatus.Phase == rolloutv1alpha1.RolloutRunPhaseCanceled { - newCondition.Reason = rolloutv1alpha1.RolloutReasonProgressingCanceled - newCondition.Message = "rolloutRun is canceled" - } + newStatus.Conditions = condition.SetCondition(newStatus.Conditions, *newCondition) + } else if newStatus.Phase == rolloutv1alpha1.RolloutRunPhaseCanceled { + newCondition := condition.NewCondition( + rolloutv1alpha1.RolloutConditionProgressing, + metav1.ConditionFalse, + rolloutv1alpha1.RolloutReasonProgressingCanceled, + "rolloutRun is canceled", + ) + newStatus.Conditions = condition.SetCondition(newStatus.Conditions, *newCondition) + } else if newStatus.Phase == rolloutv1alpha1.RolloutRunPhaseRollbacked { + newCondition := condition.NewCondition( + rolloutv1alpha1.RolloutConditionProgressing, + metav1.ConditionFalse, + 
rolloutv1alpha1.RolloutReasonProgressingRollbacked, + "rolloutRun is rollbacked", + ) newStatus.Conditions = condition.SetCondition(newStatus.Conditions, *newCondition) } else if newStatus.Error != nil { newCondition := condition.NewCondition( @@ -283,7 +295,7 @@ func (r *RolloutRunReconciler) syncRolloutRun( "rolloutRun stop rolling since error exist", ) newStatus.Conditions = condition.SetCondition(newStatus.Conditions, *newCondition) - } else { + } else if obj.Spec.Rollback == nil || len(obj.Spec.Rollback.Batches) == 0 { newCondition := condition.NewCondition( rolloutv1alpha1.RolloutConditionProgressing, metav1.ConditionTrue, diff --git a/pkg/workload/collaset/release.go b/pkg/workload/collaset/release.go index 918edb3..467610e 100644 --- a/pkg/workload/collaset/release.go +++ b/pkg/workload/collaset/release.go @@ -17,6 +17,7 @@ package collaset import ( + "context" "fmt" "maps" @@ -29,8 +30,9 @@ import ( ) var ( - _ workload.CanaryReleaseControl = &accessorImpl{} - _ workload.BatchReleaseControl = &accessorImpl{} + _ workload.CanaryReleaseControl = &accessorImpl{} + _ workload.BatchReleaseControl = &accessorImpl{} + _ workload.RollbackReleaseControl = &accessorImpl{} ) func (c *accessorImpl) BatchPreCheck(object client.Object) error { @@ -95,6 +97,18 @@ func (c *accessorImpl) ApplyCanaryPatch(object client.Object, podTemplatePatch * return nil } +func (c *accessorImpl) Rollbackable() bool { + return false +} + +func (c *accessorImpl) RollbackPreCheck(object client.Object) error { + return nil +} + +func (c *accessorImpl) RevertRevision(ctx context.Context, cc client.Client, object client.Object) error { + return nil +} + func applyPodTemplateMetadataPatch(obj *operatingv1alpha1.CollaSet, patch *rolloutv1alpha1.MetadataPatch) { if patch == nil { return diff --git a/pkg/workload/info.go b/pkg/workload/info.go index df5f939..097ba7a 100644 --- a/pkg/workload/info.go +++ b/pkg/workload/info.go @@ -20,10 +20,13 @@ import ( "context" "fmt" "reflect" + "sort" + appsv1 
"k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/conversion" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" rolloutv1alpha1 "kusionstack.io/kube-api/rollout/v1alpha1" @@ -127,6 +130,51 @@ func (o *Info) UpdateOnConflict(ctx context.Context, c client.Client, mutateFn f return updated, nil } +func (o *Info) GetPreviousRevision(ctx context.Context, c client.Client, matchLabels map[string]string) (*appsv1.ControllerRevision, error) { + crs := &appsv1.ControllerRevisionList{} + err := c.List(clusterinfo.WithCluster(ctx, o.GetClusterName()), crs, &client.ListOptions{ + Namespace: o.GetNamespace(), + LabelSelector: labels.SelectorFromSet(matchLabels), + }) + if err != nil { + return nil, err + } + + revisions := make([]*appsv1.ControllerRevision, 0) + for _, revision := range crs.Items { + for _, owner := range revision.OwnerReferences { + if owner.Kind == o.Kind && owner.Name == o.Name { + revisions = append(revisions, &revision) + break + } + } + } + + if len(revisions) < 2 { + return nil, fmt.Errorf("no previous available controllerrevision found for workload %s/%s", o.Kind, o.Name) + } + + sort.Slice(revisions, func(i, j int) bool { + return revisions[i].Revision > revisions[j].Revision + }) + + for idx, revision := range revisions { + if revision.Name == o.Status.StableRevision { + if o.Status.StableRevision != o.Status.UpdatedRevision { + return revision, nil + } else { + if idx+1 > len(revisions)-1 { + return nil, fmt.Errorf("no previous available controllerrevision found for workload %s/%s", o.Kind, o.Name) + } else { + return revisions[idx+1], nil + } + } + } + } + + return nil, fmt.Errorf("no previous available controllerrevision found for workload %s/%s", o.Kind, o.Name) +} + func IsWaitingRollout(info Info) bool { if len(info.Status.StableRevision) != 0 && info.Status.StableRevision != info.Status.UpdatedRevision && diff 
--git a/pkg/workload/interface.go b/pkg/workload/interface.go index 5bead8b..7a0db06 100644 --- a/pkg/workload/interface.go +++ b/pkg/workload/interface.go @@ -57,6 +57,17 @@ type CanaryReleaseControl interface { ApplyCanaryPatch(canary client.Object, podTemplatePatch *v1alpha1.MetadataPatch) error } +type RollbackReleaseControl interface { + // Rollbackable indicates whether this workload type can be rollbacked. + Rollbackable() bool + // RollbackPreCheck checks object before rollback release. + RollbackPreCheck(obj client.Object) error + // Revert the workload revision to the stable. + RevertRevision(ctx context.Context, c client.Client, obj client.Object) error + // ApplyPartition use expectedUpdated replicas to calculate partition and apply it to the workload. + ApplyPartition(obj client.Object, expectedUpdatedReplicas int32) error +} + type ReplicaObjectControl interface { // RepliceType returns the type of replica object ReplicaType() schema.GroupVersionKind diff --git a/pkg/workload/statefulset/release.go b/pkg/workload/statefulset/release.go index 61f3748..9e258bb 100644 --- a/pkg/workload/statefulset/release.go +++ b/pkg/workload/statefulset/release.go @@ -17,10 +17,13 @@ package statefulset import ( + "context" + "encoding/json" "fmt" "maps" appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" "k8s.io/utils/ptr" rolloutv1alpha1 "kusionstack.io/kube-api/rollout/v1alpha1" "sigs.k8s.io/controller-runtime/pkg/client" @@ -29,8 +32,9 @@ import ( ) var ( - _ workload.CanaryReleaseControl = &accessorImpl{} - _ workload.BatchReleaseControl = &accessorImpl{} + _ workload.CanaryReleaseControl = &accessorImpl{} + _ workload.BatchReleaseControl = &accessorImpl{} + _ workload.RollbackReleaseControl = &accessorImpl{} ) func (c *accessorImpl) BatchPreCheck(object client.Object) error { @@ -94,6 +98,49 @@ func (c *accessorImpl) ApplyCanaryPatch(object client.Object, podTemplatePatch * return nil } +func (c *accessorImpl) Rollbackable() bool { + return true +} + +func 
(c *accessorImpl) RollbackPreCheck(object client.Object) error { + obj, err := checkObj(object) + if err != nil { + return err + } + if obj.Spec.UpdateStrategy.Type != appsv1.RollingUpdateStatefulSetStrategyType { + return fmt.Errorf("rollout can not upgrade partition of StatefulSet in rollback if the upgrade strategy type is not RollingUpdate") + } + return nil +} + +func (c *accessorImpl) RevertRevision(ctx context.Context, cc client.Client, object client.Object) error { + obj, err := checkObj(object) + if err != nil { + return err + } + + workloadInfo := workload.NewInfo(workload.GetClusterFromLabel(obj.GetLabels()), GVK, obj, c.getStatus(obj)) + lastRevision, err := workloadInfo.GetPreviousRevision(ctx, cc, nil) + if err != nil { + return err + } + + var oldTemplate corev1.PodTemplateSpec + if err := json.Unmarshal(lastRevision.Data.Raw, &oldTemplate); err != nil { + return fmt.Errorf("failed to unmarshal old statefulset template: %w", err) + } + + // update StatefulSet.spec.template to the previous revision's pod template + obj.Spec.Template = oldTemplate + + // set partition to the replica count, so no pods are updated until the batch progress lowers the partition + obj.Spec.UpdateStrategy.RollingUpdate = &appsv1.RollingUpdateStatefulSetStrategy{ + Partition: obj.Spec.Replicas, + } + + return nil +} + +func applyPodTemplateMetadataPatch(obj *appsv1.StatefulSet, patch *rolloutv1alpha1.MetadataPatch) { + if patch == nil { + return