Versions in this module Expand all Collapse all v1 v1.0.1 Feb 21, 2024 v1.0.0 Feb 21, 2024 Changes in this version + const FinalizerKey + func CalculateHoursToDelete(retentionPeriodHours, currentHourOfDay int) []string + func CalculateHoursToKeep(retentionPeriodHours int, currentTime time.Time) []string + func CompletedWorkflowsLabelSelector() *v1.LabelSelector + func CompletedWorkflowsSelectorOutsideRetentionPeriod(retentionPeriodHours int, currentTime time.Time) *v1.LabelSelector + func CreateControllerManager(ctx context.Context, cfg *config.Config, options manager.Options) (manager.Manager, error) + func DeprecatedCompletedWorkflowsSelectorOutsideRetentionPeriod(retentionPeriodHours int, currentTime time.Time) *v1.LabelSelector + func FinalizersIdentical(o1 v1.Object, o2 v1.Object) bool + func FormatTimeForLabel(currentTime time.Time) string + func HasCompletedLabel(w *v1alpha1.KozmoWorkflow) bool + func HasFinalizer(meta v1.Object) bool + func IgnoreCompletedWorkflowsLabelSelector() *v1.LabelSelector + func IsDeleted(meta v1.Object) bool + func NewWorkQueue(ctx context.Context, cfg config.WorkqueueConfig, name string) (workqueue.RateLimitingInterface, error) + func RecordSystemError(w *v1alpha1.KozmoWorkflow, err error) *v1alpha1.KozmoWorkflow + func ResetFinalizers(meta v1.Object) + func SetCompletedLabel(w *v1alpha1.KozmoWorkflow, currentTime time.Time) + func SetDefinitionVersionIfEmpty(wf *v1alpha1.KozmoWorkflow, version v1alpha1.WorkflowDefinitionVersion) + func SetFinalizerIfEmpty(meta v1.Object, finalizer string) + func SharedInformerOptions(cfg *config.Config, defaultNamespace string) []informers.SharedInformerOption + func StartController(ctx context.Context, cfg *config.Config, defaultNamespace string, ...) error
+ func StartControllerManager(ctx context.Context, mgr manager.Manager) error + type BatchingWorkQueue struct + func (b *BatchingWorkQueue) AddToSubQueue(item interface{}) + func (b *BatchingWorkQueue) AddToSubQueueAfter(item interface{}, duration time.Duration) + func (b *BatchingWorkQueue) AddToSubQueueRateLimited(item interface{}) + func (b *BatchingWorkQueue) ShutdownAll() + func (b *BatchingWorkQueue) Start(ctx context.Context) + type CompositeWorkQueue interface + AddToSubQueue func(item interface{}) + AddToSubQueueAfter func(item interface{}, duration time.Duration) + AddToSubQueueRateLimited func(item interface{}) + ShutdownAll func() + Start func(ctx context.Context) + func NewCompositeWorkQueue(ctx context.Context, cfg config.CompositeQueueConfig, scope promutils.Scope) (CompositeWorkQueue, error) + type Controller struct + func New(ctx context.Context, cfg *config.Config, kubeClientset kubernetes.Interface, ...) (*Controller, error) + func (c *Controller) Run(ctx context.Context) error + type GarbageCollector struct + func NewGarbageCollector(cfg *config.Config, scope promutils.Scope, clk clock.WithTicker, ...) (*GarbageCollector, error) + func (g *GarbageCollector) StartGC(ctx context.Context) error + type Handler interface + Handle func(ctx context.Context, namespace, key string) error + Initialize func(ctx context.Context) error + type Propeller struct + func NewPropellerHandler(_ context.Context, cfg *config.Config, store *storage.DataStore, ...) *Propeller
+ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error + func (p *Propeller) Initialize(ctx context.Context) error + func (p *Propeller) TryMutateWorkflow(ctx context.Context, originalW *v1alpha1.KozmoWorkflow) (*v1alpha1.KozmoWorkflow, error) + type ResourceLevelMonitor struct + CollectorTimer promutils.StopWatch + Scope promutils.Scope + func NewResourceLevelMonitor(scope promutils.Scope, lister lister.KozmoWorkflowLister) *ResourceLevelMonitor + func (r *ResourceLevelMonitor) RunCollector(ctx context.Context) + type SimpleWorkQueue struct + func (s *SimpleWorkQueue) AddToSubQueue(item interface{}) + func (s *SimpleWorkQueue) AddToSubQueueAfter(item interface{}, duration time.Duration) + func (s *SimpleWorkQueue) AddToSubQueueRateLimited(item interface{}) + func (s *SimpleWorkQueue) ShutdownAll() + func (s *SimpleWorkQueue) Start(ctx context.Context) + type WorkerPool struct + func NewWorkerPool(ctx context.Context, scope promutils.Scope, workQueue CompositeWorkQueue, ...) *WorkerPool + func (w *WorkerPool) Initialize(ctx context.Context) error + func (w *WorkerPool) Run(ctx context.Context, threadiness int, synced ...cache.InformerSynced) error