governor

v0.28.0

This package is not in the latest version of its module.

Published: Feb 3, 2024 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Package governor controls the concurrency of a network-wide process.

Using this, one can, for example, create cron jobs that trigger in their hundreds or thousands concurrently but where most wait until a slot under a set limit becomes free, in effect limiting the overall concurrency of these executions.

To do this, a Stream is created that has a maximum message limit and rejects new entries when full.

Workers try to place themselves in the Stream. If they succeed they do their work, then remove themselves from the Stream once they are done.

As a fail-safe, the stack evicts entries after a set time, based on the Stream max age.
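The enter, work, leave cycle described above can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: the names slots, enter, leave and errFull are hypothetical and not part of this package, the real implementation stores entries in a JetStream Stream rather than a map, and times are given as offsets from an arbitrary start so the example stays deterministic.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errFull is a hypothetical error returned when every slot is taken.
var errFull = errors.New("governor full")

// slots is a hypothetical in-memory stand-in for the capped Stream.
type slots struct {
	limit   int
	maxAge  time.Duration
	nextSeq uint64
	entries map[uint64]time.Duration // sequence -> offset at which it was added
}

func newSlots(limit int, maxAge time.Duration) *slots {
	return &slots{limit: limit, maxAge: maxAge, entries: map[uint64]time.Duration{}}
}

// enter first evicts entries older than maxAge (the fail-safe), then either
// takes a slot and returns its sequence or rejects the caller with errFull.
func (s *slots) enter(now time.Duration) (uint64, error) {
	for seq, added := range s.entries {
		if now-added >= s.maxAge {
			delete(s.entries, seq)
		}
	}
	if len(s.entries) >= s.limit {
		return 0, errFull
	}
	s.nextSeq++
	s.entries[s.nextSeq] = now
	return s.nextSeq, nil
}

// leave releases a slot once the worker is done.
func (s *slots) leave(seq uint64) { delete(s.entries, seq) }

func main() {
	s := newSlots(2, time.Minute)

	first, _ := s.enter(0)
	_, _ = s.enter(0)
	_, err := s.enter(0)
	fmt.Println("third worker:", err)

	s.leave(first)
	_, err = s.enter(0)
	fmt.Println("after release:", err)

	_, err = s.enter(2 * time.Minute)
	fmt.Println("after max age:", err)
}
```

In this model the third worker is rejected while two slots are held, succeeds once one is released, and a later worker gets in after the max age has passed even though nobody left.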

A manager is included to create, observe and edit these streams, and the choria CLI has a new command built on this library: choria governor

Constants

const DefaultInterval = 250 * time.Millisecond

DefaultInterval is the default sleep between tries; set it with WithInterval()

Functions

func List

func List(nc *nats.Conn, collective string) ([]string, error)

func StreamName

func StreamName(governor string) string

Types

type Finisher

type Finisher func() error

Finisher signals that work is completed, releasing the slot on the stack

type Governor

type Governor interface {
	// Start attempts to get a spot in the Governor and gives up when ctx is done; call the Finisher to signal end of work
	Start(ctx context.Context, name string) (fin Finisher, seq uint64, err error)
	// Connection is the NATS connection used to communicate
	Connection() *nats.Conn
}

Governor controls concurrency of distributed processes using a named governor stream
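The behaviour documented for Start (keep trying for a spot, sleep between tries, give up when the context ends) can be sketched with the standard library alone. This is an illustration, not the package's code: waitForSlot and simulate are hypothetical helpers, the try callback stands in for one attempt to enter the Stream, and defaultInterval only mirrors the value of DefaultInterval.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// defaultInterval mirrors the value of the package's DefaultInterval constant.
const defaultInterval = 250 * time.Millisecond

// waitForSlot calls try every interval until it reports success or ctx ends.
// try is a hypothetical stand-in for one attempt to enter the Stream.
func waitForSlot(ctx context.Context, interval time.Duration, try func() bool) (int, error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for attempts := 1; ; attempts++ {
		if try() {
			return attempts, nil
		}
		select {
		case <-ctx.Done():
			return attempts, ctx.Err() // gave up: the context ended first
		case <-ticker.C:
			// sleep is over, try again
		}
	}
}

// simulate runs waitForSlot against a fake slot that frees up on try number
// succeedOn; a succeedOn of 0 means it never frees up.
func simulate(timeout, interval time.Duration, succeedOn int) (int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	calls := 0
	return waitForSlot(ctx, interval, func() bool {
		calls++
		return calls == succeedOn
	})
}

func main() {
	attempts, err := simulate(5*time.Second, defaultInterval, 3)
	fmt.Println(attempts, err)
}
```

Here the simulated slot frees up on the third try, so the program prints `3 <nil>`. With a slot that never frees up, the loop returns the context's error once the timeout expires.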

func New

func New(name string, nc *nats.Conn, opts ...Option) Governor

type Logger

type Logger interface {
	Debugf(format string, a ...any)
	Infof(format string, a ...any)
	Warnf(format string, a ...any)
	Errorf(format string, a ...any)
}

Logger is a custom logger

type Manager

type Manager interface {
	// Limit is the configured maximum entries in the Governor
	Limit() int64
	// MaxAge is the time after which entries will be evicted
	MaxAge() time.Duration
	// Name is the Governor name
	Name() string
	// Replicas is how many replicas of the data are kept
	Replicas() int
	// SetLimit configures the maximum entries in the Governor and takes immediate effect
	SetLimit(uint64) error
	// SetMaxAge configures the maximum age of entries, takes immediate effect
	SetMaxAge(time.Duration) error
	// SetSubject configures the underlying NATS subject the Governor listens on for entry campaigns
	SetSubject(subj string) error
	// Stream is the underlying JetStream stream
	Stream() *jsm.Stream
	// Subject is the subject the Governor listens on for entry campaigns
	Subject() string
	// Reset resets the governor removing all current entries from it
	Reset() error
	// Active is the number of active entries in the Governor
	Active() (uint64, error)
	// Evict removes an entry from the Governor given its unique id, returns the name that was on that entry
	Evict(entry uint64) (name string, err error)
	// LastActive returns the time the last entry was added to the Governor, can be zero time when no entries were added
	LastActive() (time.Time, error)
	// Connection is the NATS connection used to communicate
	Connection() *nats.Conn
}

Manager controls concurrent executions of work distributed throughout a NATS network by using a stream as a capped stack, where workers reserve a slot and later release it

func NewManager

func NewManager(name string, limit uint64, maxAge time.Duration, replicas uint, nc *nats.Conn, update bool, opts ...Option) (Manager, error)

type Option

type Option func(mgr *jsGMgr)

func WithBackoff

func WithBackoff(p backoff.Policy) Option

WithBackoff sets a backoff policy for gradually reducing try interval

func WithInterval

func WithInterval(i time.Duration) Option

WithInterval sets the interval between tries

func WithLogger

func WithLogger(log Logger) Option

WithLogger configures the logger to use; nothing is logged when none is given

func WithSubject

func WithSubject(s string) Option

WithSubject configures a specific subject for the governor to act on

func WithoutLeavingOnCompletion

func WithoutLeavingOnCompletion() Option

WithoutLeavingOnCompletion prevents removal from the governor after execution
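The Option functions above follow Go's functional options pattern: each returns a closure that adjusts the unexported jsGMgr before use. A minimal sketch of that pattern follows. The config struct, its fields, the lower-case option names and the "example." default subject are all hypothetical placeholders, not the package's internals.

```go
package main

import (
	"fmt"
	"time"
)

// config is a hypothetical stand-in for the package's unexported jsGMgr.
type config struct {
	interval time.Duration
	subject  string
	noLeave  bool
}

// option mutates a config, mirroring the shape of Option.
type option func(*config)

// withInterval overrides the sleep between tries.
func withInterval(i time.Duration) option {
	return func(c *config) { c.interval = i }
}

// withSubject overrides the subject to act on.
func withSubject(s string) option {
	return func(c *config) { c.subject = s }
}

// withoutLeaving keeps the entry in place after the work completes.
func withoutLeaving() option {
	return func(c *config) { c.noLeave = true }
}

// newConfig starts from defaults and applies the options in order.
// The default subject here is a made-up placeholder.
func newConfig(name string, opts ...option) *config {
	c := &config{interval: 250 * time.Millisecond, subject: "example." + name}
	for _, o := range opts {
		o(c)
	}
	return c
}

func main() {
	c := newConfig("backups", withInterval(time.Second), withoutLeaving())
	fmt.Println(c.interval, c.subject, c.noLeave)
}
```

Because the options are applied in order over the defaults, callers pass only the settings they want to change, and a later option overrides an earlier one that touches the same field.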
