Documentation
¶
Index ¶
- func AddController[T Object](controllers *Controllers, controller Controller[T])
- func AddControllerSafely[T Object](controllers *Controllers, controller Controller[T]) error
- func ResourceDelete(err error) error
- type Client
- func (c *Client) Inform(ctx context.Context, object Object, opts *InformOpts) (*deltatype.ObjectInformResult, error)
- func (c *Client) InformTx(ctx context.Context, tx pgx.Tx, object Object, opts *InformOpts) (*deltatype.ObjectInformResult, error)
- func (c *Client) Invalidate(ctx context.Context, object Object) (*deltatype.ResourceRow, error)
- func (c *Client) ScheduleInform(ctx context.Context, params ScheduleInformParams, informOpts *InformOptions) error
- func (c *Client) Start(ctx context.Context) error
- func (c *Client) Stop(ctx context.Context) error
- func (c *Client) Subscribe(cateogories ...EventCategory) (<-chan Event, func())
- type ComparableObject
- type Config
- type Controller
- type Controllers
- type Event
- type EventCategory
- type InformArgs
- type InformOptions
- type InformOpts
- type InformScheduleArgs
- type Informer
- type InformerDefaults
- type NamespaceConfig
- type Object
- type ObjectSettings
- type ObjectWithCreatedAt
- type ObjectWithInformOpts
- type ObjectWithSettings
- type RescheduleResourceArgs
- type Resource
- type ResourceDeleteError
- type ScheduleArgs
- type ScheduleInformParams
- type SubscribeConfig
- type Worker
- type WorkerDefaults
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AddController ¶
func AddController[T Object](controllers *Controllers, controller Controller[T])
AddController registers a Controller on the provided Controllers bundle. Each Controller must be registered so that the Client knows it should handle a specific kind of resource (as returned by its `Kind()` method).
Use by explicitly specifying a Object type and then passing an instance of a controller for the same type:
delta.AddController(controllers, &UserController{})
Note that AddController can panic in some situations, such as if the controller is already registered or if its configuration is otherwise invalid. This default probably makes sense for most applications because you wouldn't want to start an application with invalid hardcoded runtime configuration. If you want to avoid panics, use AddControllerSafely instead.
func AddControllerSafely ¶
func AddControllerSafely[T Object](controllers *Controllers, controller Controller[T]) error
AddControllerSafely registers a controller on the provided Controllers bundle. Unlike AddController, AddControllerSafely does not panic and instead returns an error if the controller is already registered or if its configuration is invalid.
Use by explicitly specifying a Object type and then passing an instance of a controller for the same type:
delta.AddControllerSafely[User](controllers, &UserController{}).
func ResourceDelete ¶
ResourceDelete wraps err and can be returned from a Controller's Work method to indicate that the resource should be deleted.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is a single isolated instance of Delta. Your application may use multiple instances operating on different databases or Postgres schemas within a single database.
func ClientFromContext ¶
ClientFromContext returns the Client from the context. This function can only be used within a Controller's Work() or Inform() methods because that is the only place Delta sets the Client on the context.
It panics if the context does not contain a Client, which will never happen from the context provided to a Controller's Work() or Inform() methods.
func ClientFromContextSafely ¶
ClientFromContext returns the Client from the context. This function can only be used within a Controller's Work() or Inform() methods because that is the only place Delta sets the Client on the context.
It returns an error if the context does not contain a Client, which will never happen from the context provided to a Controller's Work() or Inform() methods.
See the examples for ClientFromContext to understand how to use this function.
func (*Client) Inform ¶
func (c *Client) Inform(ctx context.Context, object Object, opts *InformOpts) (*deltatype.ObjectInformResult, error)
Inform the Delta system of an object.
func (*Client) InformTx ¶
func (c *Client) InformTx(ctx context.Context, tx pgx.Tx, object Object, opts *InformOpts) (*deltatype.ObjectInformResult, error)
InformTx is the same as Inform but allows you to pass in a transaction.
func (*Client) Invalidate ¶
Invalidate marks a resource as expired. This will cause the resource to be re-enqueued for processing/syncing. Normally, this is done automatically by the expirer maintenance job if the resource exists within a namespace that has an expiry ttl.
If the resource does not exist within a namespace that has an expiry ttl, then you must manually call Invalidate to re-enqueue the resource.
This is useful if you want to re-enqueue a resource that was previously synced, but should be re-processed for some reason.
func (*Client) ScheduleInform ¶
func (c *Client) ScheduleInform(ctx context.Context, params ScheduleInformParams, informOpts *InformOptions) error
ScheduleInform schedules an inform job for a controller to sync resources.
func (*Client) Subscribe ¶
func (c *Client) Subscribe(cateogories ...EventCategory) (<-chan Event, func())
Subscribe subscribes to the provided categories of object events that occur within the client, like ObjectCreated for when an object is created.
Returns a channel over which to receive events along with a cancel function that can be used to cancel and tear down resources associated with the subscription. It's recommended but not necessary to invoke the cancel function. Resources will be freed when the client stops in case it's not.
The event channel is buffered and sends on it are non-blocking. Consumers must process events in a timely manner or it's possible for events to be dropped. Any slow operations performed in a response to a receipt (e.g. persisting to a database) should be made asynchronous to avoid event loss.
Callers must specify the categories of events they're interested in. This allows for forward compatibility in case new categories of events are added in future versions. If new event categories are added, callers will have to explicitly add them to their requested list and ensure they can be handled correctly.
type ComparableObject ¶
ComparableObject is an interface that combines Object with a method for comparison.
type Config ¶
type Config struct { // Logger is the structured logger to use for logging purposes. If none is // specified, logs will be emitted to STDOUT with messages at warn level // or higher. Logger *slog.Logger // Controllers is a bundle of registered resource controllers. // // This field may be omitted for a program that's only informing resources // rather than working them, but if it is configured the client can validate // ahead of time that a controller is properly registered for an inserted resource. // (i.e. That it wasn't forgotten by accident.) Controllers *Controllers // Namespaces is a list of namespaces for this client to operate on along // with configuration for each namespace. // // This field may be omitted for a program that's only informing resources rather // than managing them. If it's specified, then Controllers must also be given. Namespaces map[string]NamespaceConfig // ResourceWorkTimeout is the maximum amount of time a controlle worker is allowed to run before its // context is cancelled. A timeout of zero means ResourceWorkTimeout will be // used, whereas a value of -1 means the controller's context will not be cancelled // unless the Client is shutting down. // // Defaults to 1 minute. ResourceWorkTimeout time.Duration // ControllerInformTimeout is the maximum amount of time a controller informer is allowed to run before its // context is cancelled. A timeout of zero means ControllerInformTimeout will be // used, whereas a value of -1 means the controller's context will not be cancelled // unless the Client is shutting down. // // Defaults to 1 minutes. ControllerInformTimeout time.Duration // MaintenanceJobInterval is the interval at which the maintenance jobs // will run. // // Defaults to 1 minute. MaintenanceJobInterval time.Duration // ResourceInformerInterval is the interval at which the resource informer // will run. If this is 0, the default inform interval of 1 hour is used. // // If this is < 0, the resource informers will be disabled. // // Defaults to 1 hour. ResourceInformInterval time.Duration // ResourceCleanerTimeout is the timeout of the individual queries within the // resource cleaner. // // Defaults to 30 seconds, which should be more than enough time for most // deployments. ResourceCleanerTimeout time.Duration // DeletedResourceRetentionPeriod is the amount of time to keep deleted resources // around before they're removed permanently. // // Defaults to 24 hours. DeletedResourceRetentionPeriod time.Duration // SyncedResourceRetentionPeriod is the amount of time to keep synced resources // around before they're removed permanently. // // Defaults to 24 hours. SyncedResourceRetentionPeriod time.Duration // DegradedResourceRetentionPeriod is the amount of time to keep degraded resources // around before they're removed permanently. // // Defaults to 24 hours. DegradedResourceRetentionPeriod time.Duration }
Config is the configuration for a Client.
type Controller ¶
Controller is an interface that can manage a resource with object of type T.
type Controllers ¶
type Controllers struct {
// contains filtered or unexported fields
}
Controllers is a list of available resource controllers. A Controller must be registered for each type of Resource to be handled.
Use the top-level AddController function combined with a Controllers to register a controller.
func NewControllers ¶
func NewControllers() *Controllers
NewControllers initializes a new registry of available resource controllers.
Use the top-level AddController function combined with a Controllers registry to register each available controller.
type Event ¶
type Event struct { // Resource is the object for the event. Resource *deltatype.ResourceRow // EventCategory returns the type of event that occurred. EventCategory EventCategory // Timestamp is the time the event occurred. Timestamp time.Time }
Event is a record of an action that occurred on an object.
type EventCategory ¶
type EventCategory string
const ( // EventCategoryObjectSynced occurs when a resource is synced. // Callers can use the resource object fields like `Attempt` and `CreatedAt` // to differentiate if the resource was created or updated. EventCategoryObjectSynced EventCategory = "object_synced" // EventCategoryObjectFailed occurs when a resource fails to sync. // Occurs both when a resource work fails and will be retried and when // a resource fails for the last time and is marked as degraded. // Callers can use the resource object fields like `Attempt` and `State` // to differentiate each type of occurrence. EventCategoryObjectFailed EventCategory = "object_failed" // EventCategoryObjectDeleted occurs when a resource is deleted. EventCategoryObjectDeleted EventCategory = "object_deleted" )
type InformArgs ¶
type InformArgs[T Object] struct { // ResourceKind is the kind of resource to inform // Required parameter ResourceKind string // ProcessExisting is used to determine if the informer should process existing resources // The informer checks existence based on object.Compare() or hash comparison // Defaults to false (skip existing) ProcessExisting bool // RunForeground is used to determine if the informer should run the work in the foreground or background // Defaults to false (background) // TODO: implement this RunForeground bool // Options are optional settings for filtering resources during inform. Options *InformOptions // contains filtered or unexported fields }
func (InformArgs[T]) InsertOpts ¶
func (i InformArgs[T]) InsertOpts() river.InsertOpts
func (InformArgs[T]) Kind ¶
func (i InformArgs[T]) Kind() string
type InformOptions ¶
type InformOptions struct { // Labels is a map of key-value pairs that can be used to filter resources Labels map[string]string // After allows filtering resources based on a specific time. // If specified, informers should inform resources that have been created or updated // after the specified time. // // Note that while the logic for which datetime field that this is used against is dependent // on the external system's API and the expected behavior of the resource/informer. // This should always be treated as exclusive date/time meaning the resources created or updated // at the exact time specified should not be included in the inform results. // // Note that this is mostly useful as a performance optimization to help keep periodic informer // jobs from re-informing entire datasets. After *time.Time // Limit sets the maximum number of resources to return (0 means no limit) Limit int // OrderBy specifies the field and direction for sorting (e.g., "created_at DESC") OrderBy string }
InformOptions are optional settings for filtering resources during inform. These are kept purposefully generic and it's up to the application to define what they mean.
type InformOpts ¶
type InformOpts struct { // Namespace is the namespace to use for resource isolation. // // Defaults to NamespaceDefault. Namespace string // Metadata is a JSON object blob of arbitrary data that will be stored with // the resource. Users should not overwrite or remove anything stored in this // field by Delta. Metadata []byte // MaxAttempts is the maximum number of times a resource will be attempted to be // synced. // // Defaults to 10. MaxAttempts int16 // Tags are an arbitrary list of keywords to add to the resource. They have no // functional behavior and are meant entirely as a user-specified construct // to help group and categorize resources. // // Tags should conform to the regex `\A[\w][\w\-]+[\w]\z` and be a maximum // of 255 characters long. No special characters are allowed. // // If tags are specified from both a resource override and from options on // Inform, the latter takes precedence. Tags are not merged. Tags []string }
InformOpts are optional settings for a new resource which can be provided at resource information time. These will override any default InformOpts settings provided by ObjectWithInformOpts, as well as any global defaults.
type InformScheduleArgs ¶
type InformScheduleArgs struct{}
func (InformScheduleArgs) InsertOpts ¶
func (s InformScheduleArgs) InsertOpts() river.InsertOpts
func (InformScheduleArgs) Kind ¶
func (s InformScheduleArgs) Kind() string
type Informer ¶
type Informer[T Object] interface { // Inform informs the controller of resources. // // // It is important to respect context cancellation to enable // the delta client to respond to shutdown requests. // There is no way to cancel a running controller that does not respect // context cancellation, other than terminating the process. Inform(ctx context.Context, opts *InformOptions) (<-chan T, error) // InformTimeout is the maximum amount of time the inform job is allowed to run before // its context is cancelled. A timeout of zero (the default) means the job // will inherit the Client-level timeout (defaults to 1 minute). // A timeout of -1 means the job's context will never time out. InformTimeout(args *InformArgs[T]) time.Duration }
Informer is an interface that provides a way to inform the controller of resources.
type InformerDefaults ¶
type InformerDefaults[T Object] struct{}
InformerDefaults is an empty struct that can be embedded in your controller struct to make it fulfill the Informer interface with default values.
func (InformerDefaults[T]) InformTimeout ¶
func (w InformerDefaults[T]) InformTimeout(*InformArgs[T]) time.Duration
Timeout returns the inform arg-specific timeout. Override this method to set a inform-specific timeout, otherwise the Client-level timeout will be applied.
type NamespaceConfig ¶
type NamespaceConfig struct { // MaxWorkers is the maximum number of workers to run for the namespace, or put // otherwise, the maximum parallelism to run. // // This is the maximum number of workers within this particular client // instance, but note that it doesn't control the total number of workers // across parallel processes. Installations will want to calculate their // total number by multiplying this number by the number of parallel nodes // running River clients configured to the same database and queue. // // Requires a minimum of 1, and a maximum of 10,000. MaxWorkers int // ResourceExpiry is the duration (seconds) after which a previously // synced resource is considered expired. // // This can be used to re-sync resources that may change or update in the // external system that may not be automatically re-informed based on a // creation time After filter. For immutable resources, this should NOT be set // to prevent unnecessary work for objects that will never change. // // If this is set to 0, resources within the namespace will never expire. // Defaults to 0 (resources never expire). ResourceExpiry time.Duration // DeletedResourceRetentionPeriod is the amount of time to keep deleted resources // around before they're removed permanently. // // Defaults to 24 hours. DeletedResourceRetentionPeriod time.Duration // SyncedResourceRetentionPeriod is the amount of time to keep synced resources // around before they're removed permanently. // This value is only respected if ResourceExpiry is set to a zero value. // // Defaults to 24 hours. SyncedResourceRetentionPeriod time.Duration // DegradedResourceRetentionPeriod is the amount of time to keep degraded resources // around before they're removed permanently. // // Defaults to 24 hours. DegradedResourceRetentionPeriod time.Duration }
NamespaceConfig contains namespace-specific configuration.
type Object ¶
type Object interface { // ID is a string that uniquely identifies the object. ID() string // Kind is a string that uniquely identifies the type of resource. This must be // provided on your resource object struct. Kind() string }
Object is an interface that represents the objects for a resource of type T. This object is serialized into JSON and stored in the database.
The struct is serialized using `encoding/json`. All exported fields are serialized, unless skipped with a struct field tag.
type ObjectSettings ¶
type ObjectSettings struct { // Parallelism is the number of concurrent workers that can be run for this // kind of resource. If this is 0, the default parallelism of 1 is used. // // Must be a zero or positive integer less than or equal to 10,000. Parallelism int // InformInterval is the interval at which the informer should run for this // kind of resource. If this is 0, the client's config inform interval is used. // // If < 0, the informer is disabled. InformInterval time.Duration }
type ObjectWithCreatedAt ¶
type ObjectWithCreatedAt interface { // CreatedAt returns the time the object was created. // This is used to determine the ordering of resources // and ensure periodic informers have 100% coverage of resources from // the controller's previous inform interval. CreatedAt() time.Time }
ObjectWithCreatedAt is an extra interface that a resource may implement on top of Object to provide a created at time as reported by an external system. It is strongly recommended to implement this interface for all resources.
This is important to prevent race conditions where a resource is created after the informer has finished but before the controller has updated its last inform time.
If this is not implemented, the current time will be used to track controller inform periods which is subject to Delta potentially excluding resources from being informed.
type ObjectWithInformOpts ¶
type ObjectWithInformOpts interface { // InformOpts returns options for all resources of this job type, overriding any // system defaults. These can also be overridden at inform time. InformOpts() InformOpts }
ObjectWithInformArgs is an extra interface that a resource may implement on top of Object to provide inform-time options for all resources of this type.
type ObjectWithSettings ¶
type ObjectWithSettings interface { // Behavior returns the settings for this object. Settings() ObjectSettings }
ObjectWithSettings is an extra interface that a resource may implement on top of Object to provide settings for all resources of this type.
type RescheduleResourceArgs ¶
type RescheduleResourceArgs struct{}
func (RescheduleResourceArgs) InsertOpts ¶
func (r RescheduleResourceArgs) InsertOpts() river.InsertOpts
func (RescheduleResourceArgs) Kind ¶
func (r RescheduleResourceArgs) Kind() string
type Resource ¶
type Resource[T Object] struct { *deltatype.ResourceRow // Object is the object for the resource. Object T }
Resource represents a single object, holding both the object and information for a resource with object of type T.
type ResourceDeleteError ¶
type ResourceDeleteError struct {
// contains filtered or unexported fields
}
ResourceDeleteError is the error type returned by ResourceDelete. It should not be initialized directly, but is returned from the ResourceDelete function and can be used for test assertions.
func (*ResourceDeleteError) Error ¶
func (e *ResourceDeleteError) Error() string
func (*ResourceDeleteError) Is ¶
func (e *ResourceDeleteError) Is(target error) bool
func (*ResourceDeleteError) Unwrap ¶
func (e *ResourceDeleteError) Unwrap() error
type ScheduleArgs ¶
func (ScheduleArgs[T]) InsertOpts ¶
func (s ScheduleArgs[T]) InsertOpts() river.InsertOpts
func (ScheduleArgs[T]) Kind ¶
func (s ScheduleArgs[T]) Kind() string
type ScheduleInformParams ¶
type SubscribeConfig ¶
type SubscribeConfig struct { // ChanSize is the size of the buffered channel that will be created for the // subscription. Incoming events that overall this number because a listener // isn't reading from the channel in a timely manner will be dropped. // // Defaults to 1000. ChanSize int // Categories are the category of object events that the subscription will receive. // Requiring that categories are specified explicitly allows for forward // compatibility in case new kinds of events are added in future versions. // If new event categories are added, callers will have to explicitly add them to // their requested list and ensure they can be handled correctly. Categories []EventCategory }
SubscribeConfig is more thorough subscription configuration used for Client.SubscribeConfig.
type Worker ¶
type Worker[T Object] interface { // Work performs the work on the resource object. This method must be idempotent. // The context will be configured with a timeout according to the worker settings // and may be canceled for other reasons. // // If no error is returned, the resource will be marked as synced. // // It is important to respect context cancellation to enable // the delta client to respond to shutdown requests. // There is no way to cancel a running resource that does not respect // context cancellation, other than terminating the process. Work(ctx context.Context, resource *Resource[T]) error // ResourceTimeout is the maximum amount of time the resource is allowed to be worked before // its context is cancelled. A timeout of zero (the default) means the job // will inherit the Client-level timeout (defaults to 1 minute). // A timeout of -1 means the job's context will never time out. ResourceTimeout(resource *Resource[T]) time.Duration }
Worker is an interface that can perform work on a resource object of type T. The Worker interface is one of two interfaces that all Controller[T]s must implement.
type WorkerDefaults ¶
type WorkerDefaults[T Object] struct{}
WorkerDefaults is an empty struct that can be embedded in your controller struct to make it fulfill the Worker interface with default values.
func (WorkerDefaults[T]) ResourceTimeout ¶
func (w WorkerDefaults[T]) ResourceTimeout(*Resource[T]) time.Duration
ResourceTimeout returns the resource-specific timeout. Override this method to set a resource-specific timeout, otherwise the Client-level timeout will be applied.