delta

package module
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2025 License: Apache-2.0 Imports: 23 Imported by: 0

README

delta

License Contributor Covenant CI

Delta is a robust high-performance resource synchronization system for Go and Postgres.

Overview

In robotics and automation, a control loop is a non-terminating loop that regulates the state of a system.

In Delta, controllers are control loops that watch the state of your database resources, check for state changes externally, and then make or request changes where needed. Each controller tries to move the current database state closer to the desired state.

Controller Pattern

A controller tracks exactly one Delta resource type. These objects are defined as structs that implement the Object interface. The controller(s) for that resource are responsible for making the current state come closer to that desired state.

Desired versus current state

Your database or external systems could be changing at any point as work happens and control loops automatically fix failures and discrepancies. This means that, potentially, your database never reaches a stable state.

As long as the controllers for your database are running and able to make useful changes, it doesn't matter if the overall state is stable or not.

Resources

Resources are defined via an struct that implements the Object interface:

type User struct {
    Email string
    Name  string
    UpdatedAt int64
}

func (u *User) ID() string {
    // The Email uniquely identifies the user on the external system.
    return e.Email
}

func (u *User) Kind() string {
    return "user"
}

func (u *User) Compare(other delta.Object) (int, bool) {
    user, ok := other.(User)
    if !ok {
        return 0, false
    }
    if u.Email != user.Email {
        return 0, false
    }
    return u.UpdatedAt - user.UpdatedAt, true
}

Controller

Controllers are defined via a struct that implement the Worker and Informer interfaces:

type UserController struct {
    // An embedded WorkerDefaults sets up default methods to fulfill the rest of
    // the Worker interface:
    delta.WorkerDefaults[User]
    // An embedded StreamDefaults sets up default methods to fulfill the rest of
    // the Stream interface:
    delta.InformerDefaults[User]
}

// Work does the heavy lifting of processing a resource.
func (c *UserController) Work(ctx context.Context, resource *delta.Resource[User]) error {
    fmt.Printf("Worked user: %+v\n", resource.Object.Email)
    return nil
}

// Inform pushes resources into a channel for processing.
func (c *UserController) Inform(ctx context.Context, opts *delta.InformOptions) (<-chan User, error) {
    userChan := make(chan User)
    resp, _ := http.DefaultClient.Get("https://api.example.com/users", nil)
    defer resp.Body.Close()

    var users []User
    if err := json.NewDecoder(resp.Body).Decode(&users); err != nil {
        return err
    }

    go func() {
        defer close(userChan)
        for _, user := range users {
            if !c.Match(&user) {
                continue
            }
            select {
            case <-ctx.Done():
                return ctx.Err()
            case userChan <- user:
            }
        }
    }()

    return userChan, nil
}
Registering controllers

Resources are uniquely identified by their kind string. Controllers are registered on start up so that Delta knows how to assign resources to controllers:

controllers := delta.NewControllers()
// AddWorker panics if the controller is already registered or invalid:
delta.AddController(controllers, &UserController{})

Informing resources

[Client.InformTx] is used in conjunction with an instance implementation of Resource to inform a Delta Controller about a resource object:

_, err = deltaClient.InformTx(ctx, tx, User{
    Email: "bob@hello.com",
    Name:  "Bob",
    UpdatedAt: time.Now().Unix(),
}, nil)

if err != nil {
    panic(err)
}

Starting a client

A Delta [Client] provides an interface for resource synchronization and background job processing. A client's created with a database pool and config struct containing a Controllers bundle and other settings. Here's a client Client working one namespace ("default") with up to 10 controller goroutines at a time:

deltaClient, err := delta.NewClient(dbPool, &delta.Config{
    Namespaces: map[string]delta.NamespaceConfig{
        delta.NamespaceDefault: {MaxWorkers: 10},
    },
    Controllers: controllers,
})
if err != nil {
    panic(err)
}

// Run the client inline. All executed processes will inherit from ctx:
if err := deltaClient.Start(ctx); err != nil {
    panic(err)
}

Stopping

The client should also be stopped on program shutdown:

// Stop fetching new work and wait for active jobs to finish.
if err := deltaClient.Stop(ctx); err != nil {
    panic(err)
}

There are some complexities around ensuring clients stop cleanly, but also in a timely manner.

Control Plane Components

The control plane's components make global decisions about how Delta manages resources (for example, scheduling and executing jobs) as well as detecting and responding to events.

postgres

Persisted state is stored in a Postgres database.

river

Control plane component that manages, queues, and schedules execution of background jobs. River relies on Postgres to store job and queue state, and also embeds a leader election mechanism to manage periodic maintenance of Delta's control plane.

You can learn more about River in the official documentation.

Architecture

The Delta architecture is designed to be modular and extensible. Below you will a high-level diagram of the Delta architecture and how an application interacts with the Delta system.

Architecture

Getting Started

To get started with Delta, you can follow the Getting Started Guide.

Contributing

Please read the Contributing Guide for details on our code of conduct, and the process for submitting pull requests to us.

Acknowledgements

This project uses the following dependencies:

  • River, which is licensed under the Mozilla Public License 2.0 (MPL-2.0).
  • pgx, which is licensed under the MIT License.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AddController

func AddController[T Object](controllers *Controllers, controller Controller[T])

AddController registers a Controller on the provided Controllers bundle. Each Controller must be registered so that the Client knows it should handle a specific kind of resource (as returned by its `Kind()` method).

Use by explicitly specifying a Object type and then passing an instance of a controller for the same type:

delta.AddController(controllers, &UserController{})

Note that AddController can panic in some situations, such as if the controller is already registered or if its configuration is otherwise invalid. This default probably makes sense for most applications because you wouldn't want to start an application with invalid hardcoded runtime configuration. If you want to avoid panics, use AddControllerSafely instead.

func AddControllerSafely

func AddControllerSafely[T Object](controllers *Controllers, controller Controller[T]) error

AddControllerSafely registers a controller on the provided Controllers bundle. Unlike AddController, AddControllerSafely does not panic and instead returns an error if the controller is already registered or if its configuration is invalid.

Use by explicitly specifying a Object type and then passing an instance of a controller for the same type:

delta.AddControllerSafely[User](controllers, &UserController{}).

func ResourceDelete

func ResourceDelete(err error) error

ResourceDelete wraps err and can be returned from a Controller's Work method to indicate that the resource should be deleted.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a single isolated instance of Delta. Your application may use multiple instances operating on different databases or Postgres schemas within a single database.

func ClientFromContext

func ClientFromContext(ctx context.Context) *Client

ClientFromContext returns the Client from the context. This function can only be used within a Controller's Work() or Inform() methods because that is the only place Delta sets the Client on the context.

It panics if the context does not contain a Client, which will never happen from the context provided to a Controller's Work() or Inform() methods.

func ClientFromContextSafely

func ClientFromContextSafely(ctx context.Context) (*Client, error)

ClientFromContext returns the Client from the context. This function can only be used within a Controller's Work() or Inform() methods because that is the only place Delta sets the Client on the context.

It returns an error if the context does not contain a Client, which will never happen from the context provided to a Controller's Work() or Inform() methods.

See the examples for ClientFromContext to understand how to use this function.

func NewClient

func NewClient(dbPool *pgxpool.Pool, config *Config) (*Client, error)

func (*Client) Inform

func (c *Client) Inform(ctx context.Context, object Object, opts *InformOpts) (*deltatype.ObjectInformResult, error)

Inform the Delta system of an object.

func (*Client) InformTx

func (c *Client) InformTx(ctx context.Context, tx pgx.Tx, object Object, opts *InformOpts) (*deltatype.ObjectInformResult, error)

InformTx is the same as Inform but allows you to pass in a transaction.

func (*Client) Invalidate

func (c *Client) Invalidate(ctx context.Context, object Object) (*deltatype.ResourceRow, error)

Invalidate marks a resource as expired. This will cause the resource to be re-enqueued for processing/syncing. Normally, this is done automatically by the expirer maintenance job if the resource exists within a namespace that has an expiry ttl.

If the resource does not exist within a namespace that has an expiry ttl, then you must manually call Invalidate to re-enqueue the resource.

This is useful if you want to re-enqueue a resource that was previously synced, but should be re-processed for some reason.

func (*Client) ScheduleInform

func (c *Client) ScheduleInform(ctx context.Context, params ScheduleInformParams, informOpts *InformOptions) error

ScheduleInform schedules an inform job for a controller to sync resources.

func (*Client) Start

func (c *Client) Start(ctx context.Context) error

func (*Client) Stop

func (c *Client) Stop(ctx context.Context) error

func (*Client) Subscribe

func (c *Client) Subscribe(cateogories ...EventCategory) (<-chan Event, func())

Subscribe subscribes to the provided categories of object events that occur within the client, like ObjectCreated for when an object is created.

Returns a channel over which to receive events along with a cancel function that can be used to cancel and tear down resources associated with the subscription. It's recommended but not necessary to invoke the cancel function. Resources will be freed when the client stops in case it's not.

The event channel is buffered and sends on it are non-blocking. Consumers must process events in a timely manner or it's possible for events to be dropped. Any slow operations performed in a response to a receipt (e.g. persisting to a database) should be made asynchronous to avoid event loss.

Callers must specify the categories of events they're interested in. This allows for forward compatibility in case new categories of events are added in future versions. If new event categories are added, callers will have to explicitly add them to their requested list and ensure they can be handled correctly.

type ComparableObject

type ComparableObject interface {
	Object
	Compare(other Object) int
}

ComparableObject is an interface that combines Object with a method for comparison.

type Config

type Config struct {
	// Logger is the structured logger to use for logging purposes. If none is
	// specified, logs will be emitted to STDOUT with messages at warn level
	// or higher.
	Logger *slog.Logger

	// Controllers is a bundle of registered resource controllers.
	//
	// This field may be omitted for a program that's only informing resources
	// rather than working them, but if it is configured the client can validate
	// ahead of time that a controller is properly registered for an inserted resource.
	// (i.e.  That it wasn't forgotten by accident.)
	Controllers *Controllers

	// Namespaces is a list of namespaces for this client to operate on along
	// with configuration for each namespace.
	//
	// This field may be omitted for a program that's only informing resources rather
	// than managing them. If it's specified, then Controllers must also be given.
	Namespaces map[string]NamespaceConfig

	// ResourceWorkTimeout is the maximum amount of time a controlle worker is allowed to run before its
	// context is cancelled. A timeout of zero means ResourceWorkTimeout will be
	// used, whereas a value of -1 means the controller's context will not be cancelled
	// unless the Client is shutting down.
	//
	// Defaults to 1 minute.
	ResourceWorkTimeout time.Duration

	// ControllerInformTimeout is the maximum amount of time a controller informer is allowed to run before its
	// context is cancelled. A timeout of zero means ControllerInformTimeout will be
	// used, whereas a value of -1 means the controller's context will not be cancelled
	// unless the Client is shutting down.
	//
	// Defaults to 1 minutes.
	ControllerInformTimeout time.Duration

	// MaintenanceJobInterval is the interval at which the maintenance jobs
	// will run.
	//
	// Defaults to 1 minute.
	MaintenanceJobInterval time.Duration

	// ResourceInformerInterval is the interval at which the resource informer
	// will run. If this is 0, the default inform interval of 1 hour is used.
	//
	// If this is < 0, the resource informers will be disabled.
	//
	// Defaults to 1 hour.
	ResourceInformInterval time.Duration

	// ResourceCleanerTimeout is the timeout of the individual queries within the
	// resource cleaner.
	//
	// Defaults to 30 seconds, which should be more than enough time for most
	// deployments.
	ResourceCleanerTimeout time.Duration

	// DeletedResourceRetentionPeriod is the amount of time to keep deleted resources
	// around before they're removed permanently.
	//
	// Defaults to 24 hours.
	DeletedResourceRetentionPeriod time.Duration

	// SyncedResourceRetentionPeriod is the amount of time to keep synced resources
	// around before they're removed permanently.
	//
	// Defaults to 24 hours.
	SyncedResourceRetentionPeriod time.Duration

	// DegradedResourceRetentionPeriod is the amount of time to keep degraded resources
	// around before they're removed permanently.
	//
	// Defaults to 24 hours.
	DegradedResourceRetentionPeriod time.Duration
}

Config is the configuration for a Client.

type Controller

type Controller[T Object] interface {
	Informer[T]
	Worker[T]
}

Controller is an interface that can manage a resource with object of type T.

type Controllers

type Controllers struct {
	// contains filtered or unexported fields
}

Controllers is a list of available resource controllers. A Controller must be registered for each type of Resource to be handled.

Use the top-level AddController function combined with a Controllers to register a controller.

func NewControllers

func NewControllers() *Controllers

NewControllers initializes a new registry of available resource controllers.

Use the top-level AddController function combined with a Controllers registry to register each available controller.

type Event

type Event struct {
	// Resource is the object for the event.
	Resource *deltatype.ResourceRow
	// EventCategory returns the type of event that occurred.
	EventCategory EventCategory
	// Timestamp is the time the event occurred.
	Timestamp time.Time
}

Event is a record of an action that occurred on an object.

type EventCategory

type EventCategory string
const (
	// EventCategoryObjectSynced occurs when a resource is synced.
	// Callers can use the resource object fields like `Attempt` and `CreatedAt`
	// to differentiate if the resource was created or updated.
	EventCategoryObjectSynced EventCategory = "object_synced"

	// EventCategoryObjectFailed occurs when a resource fails to sync.
	// Occurs both when a resource work fails and will be retried and when
	// a resource fails for the last time and is marked as degraded.
	// Callers can use the resource object fields like `Attempt` and `State`
	// to differentiate each type of occurrence.
	EventCategoryObjectFailed EventCategory = "object_failed"

	// EventCategoryObjectDeleted occurs when a resource is deleted.
	EventCategoryObjectDeleted EventCategory = "object_deleted"
)

type InformArgs

type InformArgs[T Object] struct {
	// ResourceKind is the kind of resource to inform
	// Required parameter
	ResourceKind string
	// ProcessExisting is used to determine if the informer should process existing resources
	// The informer checks existence based on object.Compare() or hash comparison
	// Defaults to false (skip existing)
	ProcessExisting bool
	// RunForeground is used to determine if the informer should run the work in the foreground or background
	// Defaults to false (background)
	// TODO: implement this
	RunForeground bool
	// Options are optional settings for filtering resources during inform.
	Options *InformOptions
	// contains filtered or unexported fields
}

func (InformArgs[T]) InsertOpts

func (i InformArgs[T]) InsertOpts() river.InsertOpts

func (InformArgs[T]) Kind

func (i InformArgs[T]) Kind() string

type InformOptions

type InformOptions struct {
	// Labels is a map of key-value pairs that can be used to filter resources
	Labels map[string]string
	// After allows filtering resources based on a specific time.
	// If specified, informers should inform resources that have been created or updated
	// after the specified time.
	//
	// Note that while the logic for which datetime field that this is used against is dependent
	// on the external system's API and the expected behavior of the resource/informer.
	// This should always be treated as exclusive date/time meaning the resources created or updated
	// at the exact time specified should not be included in the inform results.
	//
	// Note that this is mostly useful as a performance optimization to help keep periodic informer
	// jobs from re-informing entire datasets.
	After *time.Time
	// Limit sets the maximum number of resources to return (0 means no limit)
	Limit int
	// OrderBy specifies the field and direction for sorting (e.g., "created_at DESC")
	OrderBy string
}

InformOptions are optional settings for filtering resources during inform. These are kept purposefully generic and it's up to the application to define what they mean.

type InformOpts

type InformOpts struct {
	// Namespace is the namespace to use for resource isolation.
	//
	// Defaults to NamespaceDefault.
	Namespace string

	// Metadata is a JSON object blob of arbitrary data that will be stored with
	// the resource. Users should not overwrite or remove anything stored in this
	// field by Delta.
	Metadata []byte

	// MaxAttempts is the maximum number of times a resource will be attempted to be
	// synced.
	//
	// Defaults to 10.
	MaxAttempts int16

	// Tags are an arbitrary list of keywords to add to the resource. They have no
	// functional behavior and are meant entirely as a user-specified construct
	// to help group and categorize resources.
	//
	// Tags should conform to the regex `\A[\w][\w\-]+[\w]\z` and be a maximum
	// of 255 characters long. No special characters are allowed.
	//
	// If tags are specified from both a resource override and from options on
	// Inform, the latter takes precedence. Tags are not merged.
	Tags []string
}

InformOpts are optional settings for a new resource which can be provided at resource information time. These will override any default InformOpts settings provided by ObjectWithInformOpts, as well as any global defaults.

type InformScheduleArgs

type InformScheduleArgs struct{}

func (InformScheduleArgs) InsertOpts

func (s InformScheduleArgs) InsertOpts() river.InsertOpts

func (InformScheduleArgs) Kind

func (s InformScheduleArgs) Kind() string

type Informer

type Informer[T Object] interface {
	// Inform informs the controller of resources.
	//
	// // It is important to respect context cancellation to enable
	// the delta client to respond to shutdown requests.
	// There is no way to cancel a running controller that does not respect
	// context cancellation, other than terminating the process.
	Inform(ctx context.Context, opts *InformOptions) (<-chan T, error)
	// InformTimeout is the maximum amount of time the inform job is allowed to run before
	// its context is cancelled. A timeout of zero (the default) means the job
	// will inherit the Client-level timeout (defaults to 1 minute).
	// A timeout of -1 means the job's context will never time out.
	InformTimeout(args *InformArgs[T]) time.Duration
}

Informer is an interface that provides a way to inform the controller of resources.

type InformerDefaults

type InformerDefaults[T Object] struct{}

InformerDefaults is an empty struct that can be embedded in your controller struct to make it fulfill the Informer interface with default values.

func (InformerDefaults[T]) InformTimeout

func (w InformerDefaults[T]) InformTimeout(*InformArgs[T]) time.Duration

Timeout returns the inform arg-specific timeout. Override this method to set a inform-specific timeout, otherwise the Client-level timeout will be applied.

type NamespaceConfig

type NamespaceConfig struct {
	// MaxWorkers is the maximum number of workers to run for the namespace, or put
	// otherwise, the maximum parallelism to run.
	//
	// This is the maximum number of workers within this particular client
	// instance, but note that it doesn't control the total number of workers
	// across parallel processes. Installations will want to calculate their
	// total number by multiplying this number by the number of parallel nodes
	// running River clients configured to the same database and queue.
	//
	// Requires a minimum of 1, and a maximum of 10,000.
	MaxWorkers int

	// ResourceExpiry is the duration (seconds) after which a previously
	// synced resource is considered expired.
	//
	// This can be used to re-sync resources that may change or update in the
	// external system that may not be automatically re-informed based on a
	// creation time After filter. For immutable resources, this should NOT be set
	// to prevent unnecessary work for objects that will never change.
	//
	// If this is set to 0, resources within the namespace will never expire.
	// Defaults to 0 (resources never expire).
	ResourceExpiry time.Duration

	// DeletedResourceRetentionPeriod is the amount of time to keep deleted resources
	// around before they're removed permanently.
	//
	// Defaults to 24 hours.
	DeletedResourceRetentionPeriod time.Duration

	// SyncedResourceRetentionPeriod is the amount of time to keep synced resources
	// around before they're removed permanently.
	// This value is only respected if ResourceExpiry is set to a zero value.
	//
	// Defaults to 24 hours.
	SyncedResourceRetentionPeriod time.Duration

	// DegradedResourceRetentionPeriod is the amount of time to keep degraded resources
	// around before they're removed permanently.
	//
	// Defaults to 24 hours.
	DegradedResourceRetentionPeriod time.Duration
}

NamespaceConfig contains namespace-specific configuration.

type Object

type Object interface {
	// ID is a string that uniquely identifies the object.
	ID() string

	// Kind is a string that uniquely identifies the type of resource. This must be
	// provided on your resource object struct.
	Kind() string
}

Object is an interface that represents the objects for a resource of type T. This object is serialized into JSON and stored in the database.

The struct is serialized using `encoding/json`. All exported fields are serialized, unless skipped with a struct field tag.

type ObjectSettings

type ObjectSettings struct {
	// Parallelism is the number of concurrent workers that can be run for this
	// kind of resource. If this is 0, the default parallelism of 1 is used.
	//
	// Must be a zero or positive integer less than or equal to 10,000.
	Parallelism int

	// InformInterval is the interval at which the informer should run for this
	// kind of resource. If this is 0, the client's config inform interval is used.
	//
	// If < 0, the informer is disabled.
	InformInterval time.Duration
}

type ObjectWithCreatedAt

type ObjectWithCreatedAt interface {
	// CreatedAt returns the time the object was created.
	// This is used to determine the ordering of resources
	// and ensure periodic informers have 100% coverage of resources from
	// the controller's previous inform interval.
	CreatedAt() time.Time
}

ObjectWithCreatedAt is an extra interface that a resource may implement on top of Object to provide a created at time as reported by an external system. It is strongly recommended to implement this interface for all resources.

This is important to prevent race conditions where a resource is created after the informer has finished but before the controller has updated its last inform time.

If this is not implemented, the current time will be used to track controller inform periods which is subject to Delta potentially excluding resources from being informed.

type ObjectWithInformOpts

type ObjectWithInformOpts interface {
	// InformOpts returns options for all resources of this job type, overriding any
	// system defaults. These can also be overridden at inform time.
	InformOpts() InformOpts
}

ObjectWithInformArgs is an extra interface that a resource may implement on top of Object to provide inform-time options for all resources of this type.

type ObjectWithSettings

type ObjectWithSettings interface {
	// Behavior returns the settings for this object.
	Settings() ObjectSettings
}

ObjectWithSettings is an extra interface that a resource may implement on top of Object to provide settings for all resources of this type.

type RescheduleResourceArgs

type RescheduleResourceArgs struct{}

func (RescheduleResourceArgs) InsertOpts

func (r RescheduleResourceArgs) InsertOpts() river.InsertOpts

func (RescheduleResourceArgs) Kind

func (r RescheduleResourceArgs) Kind() string

type Resource

type Resource[T Object] struct {
	*deltatype.ResourceRow

	// Object is the object for the resource.
	Object T
}

Resource represents a single object, holding both the object and information for a resource with object of type T.

func (Resource[T]) Kind

func (r Resource[T]) Kind() string

Kind is specific to the River package and NOT meant to be used as the Resource/Object Kind.

type ResourceDeleteError

type ResourceDeleteError struct {
	// contains filtered or unexported fields
}

ResourceDeleteError is the error type returned by ResourceDelete. It should not be initialized directly, but is returned from the ResourceDelete function and can be used for test assertions.

func (*ResourceDeleteError) Error

func (e *ResourceDeleteError) Error() string

func (*ResourceDeleteError) Is

func (e *ResourceDeleteError) Is(target error) bool

func (*ResourceDeleteError) Unwrap

func (e *ResourceDeleteError) Unwrap() error

type ScheduleArgs

type ScheduleArgs[T Object] struct {
	ResourceID int64
	// contains filtered or unexported fields
}

func (ScheduleArgs[T]) InsertOpts

func (s ScheduleArgs[T]) InsertOpts() river.InsertOpts

func (ScheduleArgs[T]) Kind

func (s ScheduleArgs[T]) Kind() string

type ScheduleInformParams

type ScheduleInformParams struct {
	ResourceKind    string
	ProcessExisting bool
	RunForeground   bool
}

type SubscribeConfig

type SubscribeConfig struct {
	// ChanSize is the size of the buffered channel that will be created for the
	// subscription. Incoming events that overall this number because a listener
	// isn't reading from the channel in a timely manner will be dropped.
	//
	// Defaults to 1000.
	ChanSize int

	// Categories are the category of object events that the subscription will receive.
	// Requiring that categories are specified explicitly allows for forward
	// compatibility in case new kinds of events are added in future versions.
	// If new event categories are added, callers will have to explicitly add them to
	// their requested list and ensure they can be handled correctly.
	Categories []EventCategory
}

SubscribeConfig is more thorough subscription configuration used for Client.SubscribeConfig.

type Worker

type Worker[T Object] interface {
	// Work performs the work on the resource object. This method must be idempotent.
	// The context will be configured with a timeout according to the worker settings
	// and may be canceled for other reasons.
	//
	// If no error is returned, the resource will be marked as synced.
	//
	// It is important to respect context cancellation to enable
	// the delta client to respond to shutdown requests.
	// There is no way to cancel a running resource that does not respect
	// context cancellation, other than terminating the process.
	Work(ctx context.Context, resource *Resource[T]) error
	// ResourceTimeout is the maximum amount of time the resource is allowed to be worked before
	// its context is cancelled. A timeout of zero (the default) means the job
	// will inherit the Client-level timeout (defaults to 1 minute).
	// A timeout of -1 means the job's context will never time out.
	ResourceTimeout(resource *Resource[T]) time.Duration
}

Worker is an interface that can perform work on a resource object of type T. The Worker interface is one of two interfaces that all Controller[T]s must implement.

type WorkerDefaults

type WorkerDefaults[T Object] struct{}

WorkerDefaults is an empty struct that can be embedded in your controller struct to make it fulfill the Worker interface with default values.

func (WorkerDefaults[T]) ResourceTimeout

func (w WorkerDefaults[T]) ResourceTimeout(*Resource[T]) time.Duration

ResourceTimeout returns the resource-specific timeout. Override this method to set a resource-specific timeout, otherwise the Client-level timeout will be applied.

Directories

Path Synopsis
examples
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL