srv

package
v0.10.9
Published: Jan 14, 2024 License: Apache-2.0 Imports: 20 Imported by: 10

Documentation

Overview

Package srv provides a framework and toolkit for service orchestration.

srv contains three basic components: a Service type for managing the lifecycle of specific services, an Orchestrator for process-level management of services, and a collection of tools.

Constants

const (
	// ErrServiceAlreadyStarted is returned by the Start() method
	// of a service that is already running. These are safe to
	// ignore in many contexts.
	ErrServiceAlreadyStarted ers.Error = ers.Error("service already started")
	// ErrServiceReturned is returned by the Start() method of a
	// service that has already returned.
	ErrServiceReturned ers.Error = ers.Error("service returned")
	// ErrServiceNotStarted is returned by the Wait() method if
	// the Start method has not yet returned.
	ErrServiceNotStarted ers.Error = ers.Error("service not started")
)
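
Because these values are constants of a string-backed error type, callers can compare against them with errors.Is. The following is a minimal sketch of that pattern using only the standard library: constError, toyService, and ensureStarted are hypothetical stand-ins, not the ers or srv types.

```go
package main

import (
	"errors"
	"fmt"
)

// constError is a hypothetical stand-in for a string-backed error type,
// which is what allows sentinel errors to be declared as constants.
type constError string

func (e constError) Error() string { return string(e) }

const (
	errAlreadyStarted constError = "service already started"
	errReturned       constError = "service returned"
)

// toyService is an illustrative type, not srv.Service.
type toyService struct {
	started  bool
	finished bool
}

// Start reports a sentinel error when called on a running or finished service.
func (s *toyService) Start() error {
	switch {
	case s.finished:
		return errReturned
	case s.started:
		return errAlreadyStarted
	}
	s.started = true
	return nil
}

// ensureStarted treats "already started" as success and passes other errors on.
func ensureStarted(s *toyService) error {
	if err := s.Start(); err != nil && !errors.Is(err, errAlreadyStarted) {
		return fmt.Errorf("starting service: %w", err)
	}
	return nil
}

func main() {
	s := &toyService{}
	fmt.Println(ensureStarted(s)) // first start
	fmt.Println(ensureStarted(s)) // duplicate start is ignored
	s.finished = true
	fmt.Println(ensureStarted(s)) // a finished service is a real error
}
```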

Variables

This section is empty.

Functions

func AddCleanup added in v0.9.0

func AddCleanup(ctx context.Context, cleanup fun.Worker)

AddCleanup appends a cleanup function to the cleanup service pending in the context. Raises an invariant failure if the cleanup service was not previously configured, or if you attempt to add a new cleanup function while shutdown is running.

func AddCleanupError added in v0.9.0

func AddCleanupError(ctx context.Context, err error)

AddCleanupError adds an error to the cleanup handler, which is returned when that service shuts down. Useful for propagating errors encountered during runtime that don't require a panic to process shutdown.

func AddToWorkerPool added in v0.8.0

func AddToWorkerPool(ctx context.Context, key string, fn fun.Worker) error

AddToWorkerPool dispatches work to the WorkerPool's queue. If there is no worker pool attached with the given key, an error is returned. If the queue has been closed or the queue is full, errors from the pubsub package are propagated to the caller.

AddToWorkerPool will propagate worker functions to both conventional and observer pools. Conventional pools will retain any error produced by a worker function until the service exits, while observer pools pass errors to the observer function and then release them.

func GetBaseContext added in v0.6.0

func GetBaseContext(ctx context.Context) context.Context

GetBaseContext gets a base context attached with SetBaseContext from the current context. If the base context is not attached, GetBaseContext panics with a fun.ErrInvariantViolation error.

Use this context to start background services that should respect global shutdown and have access to the process' context, in the scope of a request that has a context that will be canceled early.
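
The idea of recovering a long-lived context from inside a short-lived one can be sketched with the standard library alone. In this sketch, baseKey, withBase, and base are hypothetical names that model the pattern; they are not the srv implementation, and unlike GetBaseContext the lookup returns a boolean instead of panicking.

```go
package main

import (
	"context"
	"fmt"
)

// baseKey is a private context key; the name is hypothetical.
type baseKey struct{}

// withBase stores ctx inside the returned context so it can be recovered later.
func withBase(ctx context.Context) context.Context {
	return context.WithValue(ctx, baseKey{}, ctx)
}

// base recovers the stored context and reports whether one was attached.
func base(ctx context.Context) (context.Context, bool) {
	b, ok := ctx.Value(baseKey{}).(context.Context)
	return b, ok
}

// demo cancels a request-scoped context and reports which contexts are done.
func demo() (requestDone, baseDone bool) {
	root := withBase(context.Background())

	// A request handler typically derives a shorter-lived context.
	reqCtx, cancel := context.WithCancel(root)
	cancel() // the request ends early

	b, _ := base(reqCtx)
	return reqCtx.Err() != nil, b.Err() != nil
}

func main() {
	requestDone, baseDone := demo()
	fmt.Println("request canceled:", requestDone)
	fmt.Println("base canceled:", baseDone)
}
```

Background work started with the recovered context keeps running after the request context is canceled, which is the situation the paragraph above describes.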

func GetShutdownSignal added in v0.8.0

func GetShutdownSignal(ctx context.Context) context.CancelFunc

GetShutdownSignal returns the previously attached cancellation function (via SetShutdownSignal) for this context chain. To cancel all contexts that descend from the context created by SetShutdownSignal, callers must call the cancel function returned by GetShutdownSignal.

If a shutdown function was not set, GetShutdownSignal panics with a fun.ErrInvariantViolation error.
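
A minimal standard-library sketch of carrying a cancel function inside a context follows. The names shutdownKey, withShutdown, and shutdown are hypothetical, and the lookup returns a boolean where GetShutdownSignal would panic.

```go
package main

import (
	"context"
	"fmt"
)

// shutdownKey is a private context key; the name is hypothetical.
type shutdownKey struct{}

// withShutdown derives a cancelable context and stores its CancelFunc in it.
func withShutdown(ctx context.Context) context.Context {
	ctx, cancel := context.WithCancel(ctx)
	return context.WithValue(ctx, shutdownKey{}, cancel)
}

// shutdown returns the stored CancelFunc, or false if none was attached.
func shutdown(ctx context.Context) (context.CancelFunc, bool) {
	cancel, ok := ctx.Value(shutdownKey{}).(context.CancelFunc)
	return cancel, ok
}

// demo triggers shutdown from a descendant context and reports whether the
// root was canceled before and after the call.
func demo() (before, after bool) {
	root := withShutdown(context.Background())
	child, cancelChild := context.WithCancel(root)
	defer cancelChild()

	before = root.Err() != nil
	if cancel, ok := shutdown(child); ok {
		cancel() // cancels root and everything derived from it
	}
	after = root.Err() != nil && child.Err() != nil
	return before, after
}

func main() {
	before, after := demo()
	fmt.Println("canceled before shutdown:", before)
	fmt.Println("canceled after shutdown:", after)
}
```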

func HasBaseContext added in v0.8.0

func HasBaseContext(ctx context.Context) bool

HasBaseContext returns true if a base context is already set, and false otherwise.

func HasCleanup added in v0.9.0

func HasCleanup(ctx context.Context) bool

HasCleanup returns true if a cleanup process is registered in the context.

func HasOrchestrator added in v0.8.0

func HasOrchestrator(ctx context.Context) bool

HasOrchestrator returns true if an orchestrator is attached to the context, and false otherwise.

func HasShutdownSignal added in v0.8.0

func HasShutdownSignal(ctx context.Context) bool

HasShutdownSignal returns true if a shutdown signal has already been set, and false otherwise.

func SetBaseContext added in v0.6.0

func SetBaseContext(ctx context.Context) context.Context

SetBaseContext attaches the current context as an accessible value from the returned context. Once you attach core services to a base context (e.g. loggers, orchestrators, etc.) call SetBaseContext to make that context accessible later. This base context is useful for starting background services or dispatching other asynchronous work in the context of request driven work which can be canceled early.

If a base context is already set on this context, this operation panics with an invariant violation.

func SetHandlerWorkerPool added in v0.10.2

func SetHandlerWorkerPool(
	ctx context.Context,
	key string,
	queue *pubsub.Queue[fun.Worker],
	observer fun.Handler[error],
	optp ...fun.OptionProvider[*fun.WorkerGroupConf],
) context.Context

SetHandlerWorkerPool constructs a WorkerPool based on the *pubsub.Queue provided. The lifecycle of the WorkerPool service is managed by the orchestrator attached to this context: if no orchestrator is attached to the context, a new one is created and added to the context.

Errors produced by the workers, including captured panics where appropriate, are passed to the observer function and are not retained.

The number of goroutines servicing the work queue is determined by options.NumWorkers: the minimum value is 1, and values less than one become one.

To attach multiple worker pools to one context at the same time, give each pool a distinct key.

Use AddToWorkerPool with the specified key to dispatch work to this worker pool.

func SetOrchestrator added in v0.6.0

func SetOrchestrator(ctx context.Context, or *Orchestrator) context.Context

SetOrchestrator attaches an orchestrator to a context. If one is already set, this operation panics with an invariant violation.

func SetShutdownSignal added in v0.8.0

func SetShutdownSignal(ctx context.Context) context.Context

SetShutdownSignal attaches a context.CancelFunc for the current context to that context, which can be accessed with the GetShutdownSignal function. This makes it possible to trigger a safe and clean shutdown from functions that have access to the context but not to the cancel function itself.

If a shutdown function is already set on this context, this operation is a no-op.

func SetWorkerPool added in v0.8.0

func SetWorkerPool(
	ctx context.Context,
	key string,
	queue *pubsub.Queue[fun.Worker],
	optp ...fun.OptionProvider[*fun.WorkerGroupConf],
) context.Context

SetWorkerPool constructs a WorkerPool based on the *pubsub.Queue provided. The lifecycle of the WorkerPool service is managed by the orchestrator attached to this context: if no orchestrator is attached to the context, a new one is created and added to the context.

The number of goroutines servicing the work queue is determined by options.NumWorkers: the minimum value is 1, and values less than one become one.

To attach multiple worker pools to one context at the same time, give each pool a distinct key.

Use AddToWorkerPool with the specified key to dispatch work to this worker pool.

func WithCleanup added in v0.9.0

func WithCleanup(ctx context.Context) context.Context

WithCleanup adds a Cleanup service, as created by the Cleanup() constructor, to the orchestrator attached to the context (creating the orchestrator if needed).

func WithHandlerWorkerPool added in v0.10.2

func WithHandlerWorkerPool(
	ctx context.Context,
	key string,
	observer fun.Handler[error],
	optp ...fun.OptionProvider[*fun.WorkerGroupConf],
) context.Context

WithHandlerWorkerPool sets up a long-running WorkerPool service, starts it, and attaches it to the returned context. The lifecycle of the WorkerPool service is managed by the orchestrator attached to this context: if no orchestrator is attached to the context, a new one is created and added to the context.

To attach multiple worker pools to one context at the same time, give each pool a distinct key.

The number of goroutines servicing the work queue is determined by options.NumWorkers: the minimum value is 1, and values less than one become one.

The default queue created by WithHandlerWorkerPool has a flexible capped size that's roughly twice the number of active workers and a hard limit of 4 times the number of active workers. You can use SetHandlerWorkerPool to create an unbounded queue or a queue with different capacity limits.

All errors encountered during the execution of worker functions, including panics, are passed to the observer function and are not retained.

func WithOrchestrator added in v0.6.0

func WithOrchestrator(ctx context.Context) context.Context

WithOrchestrator creates a new *Orchestrator, starts the associated service, and attaches it to the returned context. You should also wait on the orchestrator's service to return before your process exits, as in:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx = srv.WithOrchestrator(ctx)
defer srv.GetOrchestrator(ctx).Service().Wait()

There are two flaws with this example: nothing calls cancel on the orchestrator's context, and nothing observes the error from Wait(). The base context passed to the orchestrator could easily be a signal.NotifyContext() so that the context is eventually canceled, or the caller should call cancel explicitly. An alternate implementation that resolves these issues:

ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM)
defer cancel()
ctx = srv.WithOrchestrator(ctx)
defer func() { risky.Force(srv.GetOrchestrator(ctx).Service().Wait()) }()

In this example, the wait will begin during shutdown and risky.Force will raise an ErrInvariantViolation panic with the contents of Wait's error.

If an Orchestrator is already set on the context, this operation panics with an invariant violation.

func WithWorkerPool added in v0.8.0

func WithWorkerPool(
	ctx context.Context,
	key string,
	optp ...fun.OptionProvider[*fun.WorkerGroupConf],
) context.Context

WithWorkerPool sets up a long-running WorkerPool service, starts it, and attaches it to the returned context. The lifecycle of the WorkerPool service is managed by the orchestrator attached to this context: if no orchestrator is attached to the context, a new one is created and added to the context.

To attach multiple worker pools to one context at the same time, give each pool a distinct key.

The number of goroutines servicing the work queue is determined by options.NumWorkers: the minimum value is 1, and values less than one become one.

The default queue created by WithWorkerPool has a flexible capped size that's roughly twice the number of active workers and a hard limit of 4 times the number of active workers. You can use SetWorkerPool to create an unbounded queue or a queue with different capacity limits.

Be aware that with this pool, errors returned from worker functions remain in memory until they are returned when the service exits. In many cases keeping these errors is reasonable; however, for very long lived processes with a high error volume, this may not be workable. In these cases: avoid returning errors, collect and process them within your worker functions, or use an observer worker pool.

Use AddToWorkerPool with the specified key to dispatch work to this worker pool.
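
The trade-off between retaining errors and handing them to an observer can be illustrated with a small standard-library pool. This is a sketch under stated assumptions: runPool and its parameters are hypothetical, it uses a plain channel where srv uses a pubsub.Queue, and it does not model the queue capacity limits described above.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// runPool runs jobs on n goroutines. If observe is nil, errors are retained
// and returned joined; otherwise each error is passed to observe and released.
func runPool(n int, jobs []func() error, observe func(error)) error {
	if n < 1 {
		n = 1 // values less than one become one
	}
	queue := make(chan func() error)
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		retained []error
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range queue {
				err := job()
				if err == nil {
					continue
				}
				mu.Lock()
				if observe != nil {
					observe(err)
				} else {
					retained = append(retained, err)
				}
				mu.Unlock()
			}
		}()
	}
	for _, job := range jobs {
		queue <- job
	}
	close(queue)
	wg.Wait()
	return errors.Join(retained...)
}

// demo runs the same jobs in both modes and reports what each mode kept.
func demo() (ok bool, observed int) {
	jobs := []func() error{
		func() error { return nil },
		func() error { return errors.New("job failed") },
		func() error { return errors.New("job failed again") },
	}
	retained := runPool(0, jobs, nil)
	err := runPool(2, jobs, func(error) { observed++ })
	return retained != nil && err == nil, observed
}

func main() {
	ok, observed := demo()
	fmt.Println("retained in first run, released in second:", ok)
	fmt.Println("errors seen by observer:", observed)
}
```

In the retaining mode the slice of errors grows for the life of the pool, which is the memory concern raised above for long-lived, error-heavy processes.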

Types

type Orchestrator

type Orchestrator struct {
	// Name is used in the string format of the Orchestrator
	// object and also propagated to the name of services.
	Name string
	// contains filtered or unexported fields
}

Orchestrator manages groups of services and makes it possible to add services to a running system while retaining the coordinated shutdown mechanisms of normal services.

Use the Service() method to generate a service that will wait for new services to be added to the orchestrator and start them (if needed).

func GetOrchestrator added in v0.6.0

func GetOrchestrator(ctx context.Context) *Orchestrator

GetOrchestrator resolves the Orchestrator attached to the context or panics if there is no orchestrator attached to the context.

func (*Orchestrator) Add

func (or *Orchestrator) Add(s *Service) error

Add sends a *Service to the orchestrator. If the orchestrator's service is running, the new service is started after all previously added services have started. There is no limit on the number of services an orchestrator can manage, and you can add services before starting the orchestrator's service. Services are started in the order they're added.

Nil services are ignored without an error.

Once the orchestrator's service is closed, or its context is canceled, all successive calls to Add will return an error.

func (*Orchestrator) Service

func (or *Orchestrator) Service() *Service

Service returns a service that runs all of the constituent services of the orchestrator. The service must be started by the caller.

Constituent Services are started in the order they are passed to Add.

The orchestrator's service is blocking and will wait until it's closed or its context is canceled.

When called more than once, Service will return the same object. However, if you call Add or Service on an orchestrator whose Service has already completed and returned, the Orchestrator will reset and a new service will be created. The new service must be started, but this allows the orchestrator to drop all references to completed services that would otherwise remain.

func (*Orchestrator) Start added in v0.6.5

func (or *Orchestrator) Start(ctx context.Context) error

Start is a convenience function that runs the Start method of the orchestrator's service.

func (*Orchestrator) String added in v0.6.0

func (or *Orchestrator) String() string

String implements fmt.Stringer and returns the type name and the orchestrator's Name.

func (*Orchestrator) Wait added in v0.6.5

func (or *Orchestrator) Wait() error

Wait is a convenience function that blocks until the Orchestrator's service completes.

type Service

type Service struct {
	// Name is a human-readable name for the service, and is used
	// in the String() method, and to annotate errors.
	Name string

	// Run is a blocking function that does the main action of the
	// service. When the Run function returns the shutdown
	// function is called if it exists. The error produced by the
	// Run function is propagated to callers via the results of
	// the Wait method. Panics in the Run method are converted to
	// errors and propagated to the Wait method.
	//
	// Implementations are responsible for returning promptly when
	// the context is canceled.
	Run func(context.Context) error

	// Cleanup is optional, but if defined is always called after the
	// Run function returned. Cleanup operations should all return
	// relatively quickly and be used for releasing state rather
	// than doing potentially blocking work.
	Cleanup func() error

	// Shutdown is optional, but provides a hook that
	// implementations can use to trigger a shutdown when the
	// context passed to Start is canceled but before the Run
	// function returns. The shutdown function, when defined,
	// must return before the Cleanup function runs.
	Shutdown func() error

	// ErrorHandler functions, if specified, are called when the
	// service returns, with the output of the Service's error
	// output. This is always the result of an
	// erc.Collector.Resolve() method, and so contains the
	// aggregated errors and panics collected during the service's
	// execution (e.g. Run, Shutdown, Cleanup), and should be
	// identical to the output of Wait(). Callers set this value
	// during the execution of the service.
	ErrorHandler adt.Atomic[func(error)]
	// contains filtered or unexported fields
}

Service defines a background operation. The behavior of the service is defined by the Run, Shutdown, and Cleanup functions defined in the Service structure, with the service lifecycle managed by the context passed to Start.

Applications often consist of a number of services, and the Group function amalgamates a number of services into a single service, while the Orchestrator provides a more dynamic mechanism for managing services during an application's lifetime.

Services have no special construction requirements: implementations must define Run, and the other fields are optional. A service can only be run once. Services provide no particular concurrency control, except via the atomic setter on the ErrorHandler. Callers should not modify the other attributes of the service, although after Start() returns the Run/Cleanup/Shutdown functions are not referenced.
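
The ordering of the hooks can be modeled with a small standard-library type. This is a sketch of the lifecycle as the field comments describe it, not the srv.Service implementation: toyService and its methods are hypothetical. Run panics are converted to errors, Shutdown fires when the context is canceled or Run returns, and Cleanup runs last.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// toyService mirrors the shape of the fields described above; it is an
// illustrative stand-in and not srv.Service.
type toyService struct {
	Run      func(context.Context) error
	Shutdown func() error
	Cleanup  func() error

	done chan struct{}
	err  error
}

// Start launches Run in the background and wires up the optional hooks.
func (s *toyService) Start(ctx context.Context) {
	s.done = make(chan struct{})
	ctx, cancel := context.WithCancel(ctx)
	shutdownErr := make(chan error, 1)

	go func() {
		<-ctx.Done() // fires on caller cancellation or after Run returns
		if s.Shutdown != nil {
			shutdownErr <- s.Shutdown()
			return
		}
		shutdownErr <- nil
	}()

	go func() {
		defer close(s.done)
		errs := []error{s.safeRun(ctx)}
		cancel()
		errs = append(errs, <-shutdownErr) // Shutdown finishes before Cleanup
		if s.Cleanup != nil {
			errs = append(errs, s.Cleanup())
		}
		s.err = errors.Join(errs...)
	}()
}

// safeRun converts a panic in Run into an ordinary error.
func (s *toyService) safeRun(ctx context.Context) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("panic in Run: %v", r)
		}
	}()
	return s.Run(ctx)
}

// Wait blocks until the service has finished and returns the joined errors.
func (s *toyService) Wait() error { <-s.done; return s.err }

// demo runs a service whose Run panics and records the order of the hooks.
func demo() (events []string, failed bool) {
	svc := &toyService{
		Run:      func(context.Context) error { panic("boom") },
		Shutdown: func() error { events = append(events, "shutdown"); return nil },
		Cleanup:  func() error { events = append(events, "cleanup"); return nil },
	}
	svc.Start(context.Background())
	failed = svc.Wait() != nil
	return events, failed
}

func main() {
	events, failed := demo()
	fmt.Println(events, failed)
}
```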

func Broker added in v0.6.6

func Broker[T any](broker *pubsub.Broker[T]) *Service

Broker creates a Service implementation that wraps a pubsub.Broker[T] implementation for integration into service orchestration frameworks.

func Cleanup added in v0.9.0

func Cleanup(pipe *pubsub.Queue[fun.Worker], timeout time.Duration) *Service

Cleanup provides a service which services the provided queue of worker functions and runs the shutdown functions during service shutdown (e.g. either after Close() is called or when the context is canceled). The timeout, when non-zero, is passed to the cleanup operation. Cleanup functions are dispatched in parallel.

func Cmd added in v0.8.2

func Cmd(c *exec.Cmd, shutdownTimeout time.Duration) *Service

Cmd wraps an exec.Command execution that **has not started** into a service. If the command fails, the service returns.

When the service is closed or the context is canceled, if the command has not returned, the process is sent SIGTERM. If, after the shutdownTimeout duration, the service has not returned, the process is sent SIGKILL. In all cases, the service will not return until the underlying command has returned, potentially blocking until the command returns.
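
The SIGTERM-then-SIGKILL escalation resembles what the standard library's os/exec offers through the Cmd.Cancel and Cmd.WaitDelay fields (Go 1.20 and later). The sketch below uses only those standard-library fields; runWithGrace is a hypothetical helper, it builds its own command rather than accepting an unstarted *exec.Cmd as srv.Cmd does, and it assumes a Unix-like system with a sleep binary on the PATH.

```go
package main

import (
	"context"
	"fmt"
	"os/exec"
	"syscall"
	"time"
)

// runWithGrace runs a command until it exits or ctx is canceled. On
// cancellation it sends SIGTERM, and os/exec kills the process if it is
// still running once the grace period has elapsed.
func runWithGrace(ctx context.Context, grace time.Duration, name string, args ...string) error {
	cmd := exec.CommandContext(ctx, name, args...)
	cmd.Cancel = func() error { return cmd.Process.Signal(syscall.SIGTERM) }
	cmd.WaitDelay = grace
	return cmd.Run() // blocks until the process has actually exited
}

// demo starts a long sleep, cancels it shortly afterwards, and reports
// whether the command was stopped and how long the call took.
func demo() (stopped bool, elapsed time.Duration) {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	start := time.Now()
	err := runWithGrace(ctx, time.Second, "sleep", "30")
	return err != nil, time.Since(start)
}

func main() {
	stopped, elapsed := demo()
	fmt.Println("stopped early:", stopped, "after", elapsed.Round(10*time.Millisecond))
}
```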

func Daemon added in v0.8.2

func Daemon(s *Service, minInterval time.Duration) *Service

Daemon produces a service that wraps another service, restarting the base service in the case of errors or early termination. The minInterval governs the minimum time between successive starts of the base service. The daemon service returns if the base service's Run function returns an error that is either context.Canceled or context.DeadlineExceeded, if the context passed to the daemon service is canceled, or if the new service's Close() method is called. In all other cases the service will restart.

All errors encountered, *except* errors that occur after the context has been canceled *or* that are rooted in context cancellation errors, are collected and aggregated into the Daemon service's Wait() response.

The input service's Run/Shutdown/Cleanup/ErrorHandler are captured when the Daemon service is created, and modifications to the base service are not reflected in the daemon service. The base Service's Run function is passed a context that is always canceled after that instance of the Run invocation returns, to give each invocation of the daemon a chance to release its specific resources.

The minInterval value ensures that services don't crash in a tight loop: if the time between starting the input service and the next loop is less than the minInterval value, then the Daemon service will wait until at least that interval has passed from the last time the service started.
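
The restart rules can be sketched as a loop over a plain function. This is a standard-library model of the behavior described above, not the Daemon implementation: restart is a hypothetical helper, and it omits the Close() path.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// restart calls run repeatedly until ctx is canceled or run returns a context
// error, waiting so that consecutive starts are at least minInterval apart.
func restart(ctx context.Context, minInterval time.Duration, run func(context.Context) error) error {
	var errs []error
	for {
		started := time.Now()
		runCtx, cancel := context.WithCancel(ctx)
		err := run(runCtx)
		cancel() // release resources tied to this invocation

		// Errors seen after cancellation, or rooted in it, are not collected.
		if ctx.Err() != nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
			return errors.Join(errs...)
		}
		if err != nil {
			errs = append(errs, err)
		}
		if wait := minInterval - time.Since(started); wait > 0 {
			select {
			case <-time.After(wait):
			case <-ctx.Done():
				return errors.Join(errs...)
			}
		}
	}
}

// demo lets a crashing function run three times before canceling the loop.
func demo() (runs int, failed bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	err := restart(ctx, 5*time.Millisecond, func(context.Context) error {
		runs++
		if runs == 3 {
			cancel() // stop the loop during the third run
		}
		return errors.New("crashed")
	})
	return runs, err != nil
}

func main() {
	runs, failed := demo()
	fmt.Println("runs:", runs, "errors collected:", failed)
}
```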

func Group

func Group(services *fun.Iterator[*Service]) *Service

Group makes it possible to have a collection of services, provided via an iterator, that behave as a single service. All services are started concurrently (and without order) and shut down concurrently (and without order). The Service produced by Group has the same semantics as any other Service.

Use Group(itertool.Slice([]*Service)) to produce a group from a slice of *Services.
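
The composition can be modeled over plain run functions. In this standard-library sketch, group is a hypothetical helper that takes functions rather than an iterator of *Service values; it starts every function concurrently and joins their errors.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// group combines several run functions into one that starts them all
// concurrently and returns their joined errors once every one has returned.
func group(runs ...func(context.Context) error) func(context.Context) error {
	return func(ctx context.Context) error {
		errs := make([]error, len(runs))
		var wg sync.WaitGroup
		for i, run := range runs {
			wg.Add(1)
			go func(i int, run func(context.Context) error) {
				defer wg.Done()
				errs[i] = run(ctx)
			}(i, run)
		}
		wg.Wait()
		return errors.Join(errs...)
	}
}

// demo groups a blocking function with a failing one and runs them until a
// short timeout cancels the shared context.
func demo() (started int, failed bool) {
	var n atomic.Int32
	blocking := func(ctx context.Context) error { n.Add(1); <-ctx.Done(); return nil }
	failing := func(context.Context) error { n.Add(1); return errors.New("service failed") }

	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()

	err := group(blocking, failing)(ctx)
	return int(n.Load()), err != nil
}

func main() {
	started, failed := demo()
	fmt.Println("started:", started, "failed:", failed)
}
```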

func HTTP

func HTTP(name string, shutdownTimeout time.Duration, hs *http.Server) *Service

HTTP wraps an http.Server object in a Service, both for common use and convenience, and as an example for implementing arbitrary services.

If the http.Server's BaseContext field is not set, it is overridden with a function that provides a copy of the context passed to the service's Start method: this ensures that requests have access to the same underlying context as the service.
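
The BaseContext behavior can be reproduced with net/http directly. The sketch below is an assumption-laden model, not the srv.HTTP implementation: serve, ctxKey, and demo are hypothetical names, and the server listens on a loopback port chosen by the operating system.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net"
	"net/http"
	"time"
)

// ctxKey is a private context key used only by this sketch.
type ctxKey struct{}

// serve runs hs on ln until ctx is canceled, then shuts it down gracefully.
func serve(ctx context.Context, ln net.Listener, shutdownTimeout time.Duration, hs *http.Server) error {
	if hs.BaseContext == nil {
		// Requests inherit values (and cancellation) from the service context.
		hs.BaseContext = func(net.Listener) context.Context { return ctx }
	}
	errc := make(chan error, 1)
	go func() { errc <- hs.Serve(ln) }()

	select {
	case err := <-errc:
		return err
	case <-ctx.Done():
	}
	sctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
	defer cancel()
	if err := hs.Shutdown(sctx); err != nil {
		return err
	}
	if err := <-errc; !errors.Is(err, http.ErrServerClosed) {
		return err
	}
	return nil
}

// demo serves one request whose handler reads a value from the service context.
func demo() (body string, err error) {
	ctx, cancel := context.WithCancel(context.WithValue(context.Background(), ctxKey{}, "from service context"))
	defer cancel()

	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", err
	}
	hs := &http.Server{Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, r.Context().Value(ctxKey{}))
	})}
	done := make(chan error, 1)
	go func() { done <- serve(ctx, ln, time.Second, hs) }()

	resp, err := http.Get("http://" + ln.Addr().String())
	if err != nil {
		return "", err
	}
	data, err := io.ReadAll(resp.Body)
	resp.Body.Close()
	if err != nil {
		return "", err
	}
	cancel() // stop the server and wait for a clean shutdown
	return string(data), <-done
}

func main() {
	fmt.Println(demo())
}
```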

func HandlerWorkerPool added in v0.10.2

func HandlerWorkerPool(
	workQueue *pubsub.Queue[fun.Worker],
	observer fun.Handler[error],
	optp ...fun.OptionProvider[*fun.WorkerGroupConf],
) *Service

HandlerWorkerPool has similar semantics and use to the WorkerPool, but rather than aggregating errors, all errors are passed to the observer function, which is responsible for ignoring or processing the errors. The worker pool will respect continue/abort on error or panics as expected.

Handler pools may be more operable if your workers generate many errors, and/or your process is long lived.

The service itself may still produce execution- or shutdown-related errors, particularly if there is an invariant violation or panic during service execution; these are propagated to the return value of the service's Wait method. All errors that occur during the execution of the workload, including panics, are passed to the observer.

func ProcessIterator added in v0.6.6

func ProcessIterator[T any](
	iter *fun.Iterator[T],
	processor fun.Processor[T],
	optp ...fun.OptionProvider[*fun.WorkerGroupConf],
) *Service

ProcessIterator runs an itertool.ParallelForEach operation as a *Service. For a long running service, use an iterator that is blocking (e.g. based on a pubsub queue/deque or a channel.)

func Wait added in v0.6.3

func Wait(iter *fun.Iterator[fun.Operation]) *Service

Wait creates a service that runs until *both* all wait functions have returned *and* the iterator is exhausted. The Service's wait function returns an error that aggregates all errors (e.g. panics) encountered by the constituent wait functions.

Wait produces a service that fills the same role as the fun.WaitMerge function, but that can be more easily integrated into existing orchestration tools.

When the service returns, all worker goroutines as well as the input worker will have returned. Use a blocking pubsub iterator to dispatch wait functions throughout the lifecycle of your program.
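
The "both exhausted and finished" condition can be modeled with a channel and a sync.WaitGroup. This is a standard-library sketch, not the srv.Wait implementation: waitAll is hypothetical, a channel stands in for the iterator, and the func(context.Context) operation shape is an assumption about fun.Operation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// waitAll launches every operation received from ops and returns once the
// channel is closed and all launched operations have returned. Panics in
// operations are captured and aggregated into the returned error.
func waitAll(ctx context.Context, ops <-chan func(context.Context)) error {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		errs []error
	)
	for op := range ops {
		wg.Add(1)
		go func(op func(context.Context)) {
			defer wg.Done()
			defer func() {
				if r := recover(); r != nil {
					mu.Lock()
					errs = append(errs, fmt.Errorf("panic: %v", r))
					mu.Unlock()
				}
			}()
			op(ctx)
		}(op)
	}
	wg.Wait()
	return errors.Join(errs...)
}

// demo dispatches three operations, one of which panics.
func demo() (completed int, failed bool) {
	var n atomic.Int32
	ops := make(chan func(context.Context), 3)
	ops <- func(context.Context) { n.Add(1) }
	ops <- func(context.Context) { panic("boom") }
	ops <- func(context.Context) { n.Add(1) }
	close(ops)

	err := waitAll(context.Background(), ops)
	return int(n.Load()), err != nil
}

func main() {
	completed, failed := demo()
	fmt.Println("completed:", completed, "failed:", failed)
}
```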

func WorkerPool added in v0.8.0

func WorkerPool(workQueue *pubsub.Queue[fun.Worker], optp ...fun.OptionProvider[*fun.WorkerGroupConf]) *Service

WorkerPool wraps a pubsub.Queue of functions that represent units of work in a worker pool. The pool follows the semantics configured by the itertool.Options with regard to error handling, panic handling, and parallelism. Errors are collected and propagated to the service's Wait function.

func (*Service) Close

func (s *Service) Close()

Close forcibly shuts down the service, causing the background process to terminate and Wait to return. If the service hasn't started or isn't running, this has no effect. Close is safe to call multiple times.

func (*Service) Running

func (s *Service) Running() bool

Running returns true as soon as Start returns, and returns false after Close is called.

func (*Service) Start

func (s *Service) Start(ctx context.Context) error

Start launches the configured service and tracks its lifecycle. If the context is canceled, the service returns, and any errors collected during the service's lifecycle are returned by the Wait method.

If the service is running or is finished, Start returns the appropriate sentinel error.

func (*Service) String

func (s *Service) String() string

String implements fmt.Stringer and includes the value of s.Name.

func (*Service) Wait

func (s *Service) Wait() error

Wait blocks until the service returns or the service's start context is canceled. If the service hasn't been started, Wait returns ErrServiceNotStarted. It is safe to call Wait multiple times. While Wait does not accept a context, it respects the context passed to Start. Wait returns nil if the service's background operation encountered no errors, and otherwise returns an erc.ErrorStack object that contains the errors encountered when running the service, the shutdown hook, and any panics encountered during the service's execution.

func (*Service) Worker added in v0.8.5

func (s *Service) Worker() fun.Worker

Worker runs the service, starting it if needed and then waiting for the service to return. If the service has already finished, the fun.Worker, like Wait(), will return the same aggregated error each time it is called.

Unlike Wait(), the fun.Worker respects the context and will return early if the context is canceled, returning a context cancellation error.
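
The difference between the two ways of waiting comes down to a select on the caller's context. The following standard-library sketch shows that shape; waitCtx and its parameters are hypothetical and stand in for a service's completion signal and stored result.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitCtx blocks until done is closed or ctx is canceled, whichever comes
// first. result supplies the stored outcome once the work has finished.
func waitCtx(ctx context.Context, done <-chan struct{}, result func() error) error {
	select {
	case <-done:
		return result()
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demo waits once with a short deadline while the work is still pending, and
// once more after the work has completed.
func demo() (early, finished bool) {
	done := make(chan struct{})
	result := func() error { return nil }

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	early = errors.Is(waitCtx(ctx, done, result), context.DeadlineExceeded)

	close(done) // the work completes
	finished = waitCtx(context.Background(), done, result) == nil
	return early, finished
}

func main() {
	early, finished := demo()
	fmt.Println("returned early on deadline:", early)
	fmt.Println("returned result after completion:", finished)
}
```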
