Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 58 additions & 18 deletions images/virtualization-artifact/pkg/controller/gc/cron_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/robfig/cron/v3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/clock"
Expand All @@ -33,33 +34,41 @@ import (
"github.com/deckhouse/virtualization-controller/pkg/logger"
)

/**
CronSource is an implementation of the controller-runtime Source interface.
It periodically triggers and emit events for a list of objects.

The component is independent of kubernetes client: developer should implement
ObjectLister interface that CronSource will use to determine what to enqueue on trigger.

NewSingleObjectLister can be used if the main objective is to get periodical event,
but specific namespace and name are not important. Also, for this situation
object name can be used to distinguish cron trigger from the kubernetes trigger.
*/

var _ source.Source = &CronSource{}

const sourceName = "CronSource"

type SourceGCManager interface {
ListForDelete(ctx context.Context, now time.Time) ([]client.Object, error)
}

func NewCronSource(scheduleSpec string, mgr SourceGCManager, log *log.Logger) (*CronSource, error) {
func NewCronSource(scheduleSpec string, objLister ObjectLister, log *log.Logger) (*CronSource, error) {
schedule, err := cron.ParseStandard(scheduleSpec)
if err != nil {
return nil, fmt.Errorf("parsing standard spec %q: %w", scheduleSpec, err)
}

return &CronSource{
schedule: schedule,
mgr: mgr,
log: log.With("WatchSource", sourceName),
clock: &clock.RealClock{},
schedule: schedule,
objLister: objLister,
log: log.With("watchSource", sourceName),
clock: &clock.RealClock{},
}, nil
}

type CronSource struct {
schedule cron.Schedule
mgr SourceGCManager
log *log.Logger
clock clock.Clock
schedule cron.Schedule
objLister ObjectLister
log *log.Logger
clock clock.Clock
}

func (c *CronSource) Start(ctx context.Context, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
Expand All @@ -70,28 +79,29 @@ func (c *CronSource) Start(ctx context.Context, queue workqueue.TypedRateLimitin
case <-ctx.Done():
return
case <-c.clock.After(nextTime):
c.addObjects(ctx, queue.Add)
c.enqueueObjects(ctx, queue.Add)
nextTime = nextScheduleTimeDuration(c.schedule, c.clock.Now())
}
}
}()
return nil
}

func (c *CronSource) addObjects(ctx context.Context, addToQueue func(reconcile.Request)) {
objs, err := c.mgr.ListForDelete(ctx, c.clock.Now())
func (c *CronSource) enqueueObjects(ctx context.Context, queueAddFunc func(reconcile.Request)) {
now := c.clock.Now()
objs, err := c.objLister.List(ctx, now)
if err != nil {
c.log.Error("Failed to get ObjectList for delete", logger.SlogErr(err))
return
}

if len(objs) == 0 {
c.log.Debug("No resources, skip")
c.log.Debug(fmt.Sprintf("No resources at %s, skip queueing", now))
return
}

for _, obj := range objs {
addToQueue(reconcile.Request{
queueAddFunc(reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: obj.GetNamespace(),
Name: obj.GetName(),
Expand All @@ -104,3 +114,33 @@ func (c *CronSource) addObjects(ctx context.Context, addToQueue func(reconcile.R
func nextScheduleTimeDuration(schedule cron.Schedule, now time.Time) time.Duration {
return schedule.Next(now).Sub(now)
}

type ObjectLister interface {
List(ctx context.Context, now time.Time) ([]client.Object, error)
}

type ObjectListerImpl struct {
ListFunc func(ctx context.Context, now time.Time) ([]client.Object, error)
}

func (o *ObjectListerImpl) List(ctx context.Context, now time.Time) ([]client.Object, error) {
if o.ListFunc == nil {
return nil, nil
}
return o.ListFunc(ctx, now)
}

func NewObjectLister(listFunc func(ctx context.Context, now time.Time) ([]client.Object, error)) *ObjectListerImpl {
return &ObjectListerImpl{listFunc}
}

func NewSingleObjectLister(namespace, name string) *ObjectListerImpl {
return &ObjectListerImpl{ListFunc: func(ctx context.Context, now time.Time) ([]client.Object, error) {
return []client.Object{&metav1.PartialObjectMetadata{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: name,
},
}}, nil
}}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (

var _ = Describe("CronSource", func() {
const (
// Every day a 00:00
// Every day at 00:00
scheduleSpec = "0 0 * * *"
)

Expand All @@ -58,7 +58,7 @@ var _ = Describe("CronSource", func() {
})

newSource := func(scheduleSpec string) *CronSource {
source, err := NewCronSource(scheduleSpec, mgr, log)
source, err := NewCronSource(scheduleSpec, NewObjectLister(mgr.ListForDelete), log)
Expect(err).NotTo(HaveOccurred())
source.clock = fakeClock
return source
Expand Down
5 changes: 1 addition & 4 deletions images/virtualization-artifact/pkg/controller/gc/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,7 @@ const (
fakeObjectPhaseCompleted = "Completed"
)

var (
_ SourceGCManager = &fakeGCManager{}
_ ReconcileGCManager = &fakeGCManager{}
)
var _ ReconcileGCManager = &fakeGCManager{}

func newFakeGCManager(client client.Client, ttl time.Duration, max int) *fakeGCManager {
if ttl == 0 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ limitations under the License.
package gc

import (
"context"
"time"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/source"

"github.com/deckhouse/deckhouse/pkg/log"
"github.com/deckhouse/virtualization-controller/pkg/logger"
Expand All @@ -28,22 +30,29 @@ import (
type ReconcileGCManager interface {
New() client.Object
ShouldBeDeleted(obj client.Object) bool
ListForDelete(ctx context.Context, now time.Time) ([]client.Object, error)
}

func SetupGcController(
controllerName string,
mgr manager.Manager,
log *log.Logger,
watchSource source.Source,
schedule string,
gcMgr ReconcileGCManager,
) error {
log = log.With(logger.SlogController(controllerName))

cronSource, err := NewCronSource(schedule, NewObjectLister(gcMgr.ListForDelete), log)
if err != nil {
return err
}

reconciler := NewReconciler(mgr.GetClient(),
watchSource,
cronSource,
gcMgr,
)

err := reconciler.SetupWithManager(controllerName, mgr, log)
err = reconciler.SetupWithManager(controllerName, mgr, log)
if err != nil {
return err
}
Expand Down
10 changes: 4 additions & 6 deletions images/virtualization-artifact/pkg/controller/vm/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,11 @@ func SetupGC(
) error {
mgrClient := mgr.GetClient()
vmimGCMgr := newVMIMGCManager(mgrClient, gcSettings.TTL.Duration, 10)
source, err := gc.NewCronSource(gcSettings.Schedule, vmimGCMgr, log.With("resource", "vmi-migration"))
if err != nil {
return err
}

return gc.SetupGcController(gcVMMigrationControllerName,
mgr,
log,
source,
log.With("resource", "vmi-migration"),
gcSettings.Schedule,
vmimGCMgr,
)
}
Expand All @@ -65,6 +61,8 @@ func newVMIMGCManager(client client.Client, ttl time.Duration, max int) *vmimGCM
}
}

var _ gc.ReconcileGCManager = &vmimGCManager{}

type vmimGCManager struct {
client client.Client
ttl time.Duration
Expand Down
13 changes: 3 additions & 10 deletions images/virtualization-artifact/pkg/controller/vmop/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,11 @@ const gcControllerName = "vmop-gc-controller"

func SetupGC(mgr manager.Manager, log *log.Logger, gcSettings config.BaseGcSettings) error {
vmopGCMgr := newVMOPGCManager(mgr.GetClient(), gcSettings.TTL.Duration, 10)
source, err := gc.NewCronSource(gcSettings.Schedule, vmopGCMgr, log.With("resource", "vmop"))
if err != nil {
return err
}

return gc.SetupGcController(gcControllerName,
mgr,
log,
source,
log.With("resource", "vmop"),
gcSettings.Schedule,
vmopGCMgr,
)
}
Expand All @@ -61,10 +57,7 @@ func newVMOPGCManager(client client.Client, ttl time.Duration, max int) *vmopGCM
}
}

var (
_ gc.ReconcileGCManager = &vmopGCManager{}
_ gc.SourceGCManager = &vmopGCManager{}
)
var _ gc.ReconcileGCManager = &vmopGCManager{}

type vmopGCManager struct {
client client.Client
Expand Down
Loading