kreconciler

package module
v1.2.1 Latest Latest
Published: Mar 22, 2024 License: MIT Imports: 12 Imported by: 0

README

kreconciler

A library to build control-loops for things other than Kubernetes.

Principle

Kubernetes operators are amazing for building reliable operational tooling.

Unfortunately, as the name suggests, they are specific to Kubernetes. This library provides a simple way to build reconcilers, which are the core of an operator. It runs a loop that triggers the control-loop for each incoming event.

Its core goals are:

  1. Remain simple: caching and resync are not meant to be built in because they are hard to make generic.
  2. Observability is important, so it is instrumented with OpenTelemetry.
  3. Keep the number of dependencies low.

Goal 2 matters because testing an operator is incredibly complicated. The execution of the reconciler therefore has to be as observable as possible so that issues can be diagnosed.

Who uses it

At Koyeb we've built reconcilers that help keep our models in sync with HashiCorp Nomad and Kuma.

Contributing

See Contributing.

Documentation

Index

Constants

This section is empty.

Variables

var DefaultHasher = WorkerHasherFunc(func(_ context.Context, id string, count int) (int, error) {
	if count == 1 {
		return 0, nil
	}
	algorithm := fnv.New32a()
	algorithm.Write([]byte(id))
	return int(algorithm.Sum32() % uint32(count)), nil
})

DefaultHasher is a WorkerHasher that hashes the id and returns `hash % count`.
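To make the routing concrete, here is a minimal standalone sketch of the same computation using only the standard library. The helper name `route` and the sample ids are hypothetical; the body follows the DefaultHasher source shown above.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// route is a hypothetical helper that repeats the DefaultHasher logic:
// FNV-1a (32 bit) hash of the id, modulo the number of workers.
func route(id string, count int) int {
	if count == 1 {
		return 0 // a single worker receives everything
	}
	h := fnv.New32a()
	h.Write([]byte(id)) // writes to a hash.Hash never return an error
	return int(h.Sum32() % uint32(count))
}

func main() {
	// The same id always lands on the same worker, so events for one
	// item are never handled by two workers at the same time.
	for _, id := range []string{"job-a", "job-b", "job-a"} {
		fmt.Printf("%s -> worker %d\n", id, route(id, 4))
	}
}
```

Because the worker index depends only on the id and the worker count, changing the worker count changes the assignment of ids to workers.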

var NoopStream = EventStreamFunc(func(ctx context.Context, handler EventHandler) error {
	<-ctx.Done()
	return nil
})

NoopStream is a stream that does nothing: it blocks until the context is cancelled.

Functions

This section is empty.

Types

type Config

type Config struct {
	// MaxItemRetries the number of times an item gets retried before dropping it
	MaxItemRetries int
	// WorkerQueueSize the size of the worker queue (outstanding reconciles)
	WorkerQueueSize int
	// WorkerHasher the function to assign work between workers
	WorkerHasher WorkerHasher
	// WorkerCount the number of workers
	WorkerCount int
	// LeaderElectionEnabled whether or not we should use
	LeaderElectionEnabled bool
	// DelayResolution the lowest possible time for a delay retry
	DelayResolution time.Duration
	// DelayQueueSize the maximum number of items in the scheduled delay queue
	DelayQueueSize int
	// MaxReconcileTime the maximum time a handle of an item should take
	MaxReconcileTime time.Duration
	// Observability configuration for logs, metrics and traces
	Observability Observability
}

Config is used to configure a controller.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a good set of configuration values to get started.

type Controller

type Controller interface {
	// Run executes the control-loop until the context is cancelled
	Run(ctx context.Context) error
	// BecomeLeader notifies this controller that it is now the leader and that it should start the control-loop
	BecomeLeader()
}

Controller is the core interface to define a control-loop.

func New

func New(config Config, reconciler Reconciler, streams map[string]EventStream) Controller

New creates a new controller.

type Error

type Error interface {
	error
	// RetryDelay how long to wait before adding back in the queue
	RetryDelay() time.Duration
}

Error is an error that has a custom retry delay.

type EventHandler

type EventHandler interface {
	Call(ctx context.Context, jobId string) error
}

EventHandler is called whenever an event is triggered.

func MeteredEventHandler

func MeteredEventHandler(meter metric.Meter, name string, child EventHandler) (EventHandler, error)

MeteredEventHandler adds metrics to any EventHandler.

type EventHandlerFunc

type EventHandlerFunc func(ctx context.Context, jobId string) error

EventHandlerFunc see EventHandler

func (EventHandlerFunc) Call

func (f EventHandlerFunc) Call(ctx context.Context, jobId string) error

Call calls f(ctx, jobId).

type EventStream

type EventStream interface {
	Subscribe(ctx context.Context, handler EventHandler) error
}

EventStream calls `handler` whenever a new event is triggered. Examples of EventStreams are Kafka consumers, pub/sub systems, and the Nomad event stream. It is usually a way to signal that an external change happened and that the control-loop should be rerun for the element with a given id.

func ResyncLoopEventStream

func ResyncLoopEventStream(obs Observability, duration time.Duration, listFn func(ctx context.Context) ([]string, error)) (EventStream, error)

ResyncLoopEventStream is an EventStream that calls `listFn` every `duration`. This is used for rerunning the control-loop for all entities periodically. Having one of these is recommended for any controller.

type EventStreamFunc

type EventStreamFunc func(ctx context.Context, handler EventHandler) error

EventStreamFunc see EventStream

func (EventStreamFunc) Subscribe

func (f EventStreamFunc) Subscribe(ctx context.Context, handler EventHandler) error

Subscribe calls f(ctx, handler)

type Logger

type Logger interface {
	// With create a new logger with fixed keys
	With(keyValues ...interface{}) Logger
	// Debug log at debug level
	Debug(msg string, keyValues ...interface{})
	// Info log at info level
	Info(msg string, keyValues ...interface{})
	// Warn log at warn level
	Warn(msg string, keyValues ...interface{})
	// Error log at error level
	Error(msg string, keyValues ...interface{})
}

Logger is a wrapper for your logger implementation.

type NoopLogger

type NoopLogger struct{}

NoopLogger a logger that does nothing

func (NoopLogger) Debug

func (n NoopLogger) Debug(msg string, keyValues ...interface{})

Debug noop

func (NoopLogger) Error

func (n NoopLogger) Error(msg string, keyValues ...interface{})

Error noop

func (NoopLogger) Info

func (n NoopLogger) Info(msg string, keyValues ...interface{})

Info noop

func (NoopLogger) Warn

func (n NoopLogger) Warn(msg string, keyValues ...interface{})

Warn noop

func (NoopLogger) With

func (n NoopLogger) With(keyValues ...interface{}) Logger

With return self

type Observability

type Observability struct {
	Logger
	metric.Meter
	trace.Tracer
}

Observability holds everything needed for instrumenting the reconciler code

func DefaultObservability

func DefaultObservability() Observability

DefaultObservability uses noopLogger and otel.GetMeter and otel.GetTracer

func NewObservability

func NewObservability(l Logger, m metric.MeterProvider, t trace.TracerProvider) Observability

NewObservability creates a new observability wrapper (it is usually easier to use DefaultObservability).

func (Observability) LoggerWithCtx

func (o Observability) LoggerWithCtx(ctx context.Context) Logger

LoggerWithCtx adds the tracing context to the logger.

type Reconciler

type Reconciler interface {
	// Apply handle the item and potentially return an error
	Apply(ctx context.Context, id string) Result
}

Reconciler is the core implementation of the control-loop.

type ReconcilerFunc

type ReconcilerFunc func(ctx context.Context, id string) Result

ReconcilerFunc see Reconciler

func (ReconcilerFunc) Apply

func (f ReconcilerFunc) Apply(ctx context.Context, id string) Result

Apply calls f(ctx, id).

type Result

type Result struct {
	// RequeueDelay the time to wait before requeuing, ignored if Error is not nil
	RequeueDelay time.Duration
	// Error the error
	Error error
}

Result is a wrapper that is returned by a Reconciler.

func (Result) RequeueDelayWithDefault

func (r Result) RequeueDelayWithDefault(defaultDelay time.Duration) time.Duration

RequeueDelayWithDefault returns the requeue delay and uses the default delay if the error is not an Error.

type WorkerHasher

type WorkerHasher interface {
	// Route decides which worker this item will go to (return a value < 0 to drop this item), count is the number of workers
	Route(ctx context.Context, id string, count int) (int, error)
}

WorkerHasher specifies which of the control-loop workers should handle this specific item.

type WorkerHasherFunc

type WorkerHasherFunc func(ctx context.Context, id string, count int) (int, error)

WorkerHasherFunc see WorkerHasher

func (WorkerHasherFunc) Route

func (f WorkerHasherFunc) Route(ctx context.Context, id string, count int) (int, error)

Route calls f(ctx, id, count).

Directories

Path Synopsis
examples
