process

package module
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 3, 2020 License: MIT Imports: 12 Imported by: 2

README

Nacelle Process Runner GoDoc CircleCI Coverage Status

Process initializer and supervisor for nacelle.


Nacelle applications can be coarsely decomposed into several behavioral categories.

Process
A process is a long-running component of an application such as a server or an event processor. Processes will usually run for the life of the application (e.g. until a shutdown signal is received or an error occurs). There is a special class of processes that are allowed to exit once running, but these are the exception.
Initializer
An initializer is a component that is invoked once on application startup. An initializer usually instantiates a service or set up shared state required by other parts of the application
Service
A service is an object that encapsulates some data, state, or behavior, but does not have a rigid initialization. A service is generally instantiated by an initializer and inserted into a shared service container.

You can see additional examples of initializer and process definition and registration in the example repository. Specifically, there is an initializer to create a shared Redis connection and its registration in one of the program entrypoints. This project also provides a set of abstract base processes for common process types: an AWS Lambda event listener, a gRPC server, an HTTP server, and a generic worker process, which are a good and up-to-date source for best-practices.

Process Definition

A process is a struct with an Init, a Start, and a Stop method. The initialization method that takes a config object as a parameter. Each method may return an error value. For long-running processes, such as servers, the start method should be blocking. The stop method may signal the process to gracefully shut-down (via a channel or synchronization primitive), but does not need to wait until the application exits. A process is also an initializer, so the above also applies. The following example uses the database connection created by the initializer defined below, injected by the service container, and pings it on a loop to logs its latency. The stop method closes a channel to inform the start method to unblock.

type PingProcess struct {
    DB           *sqlx.DB       `service:"db"`
    Logger       nacelle.Logger `service:"logger"`
    halt         chan struct{}
    once         sync.Once
    tickInterval time.Duration
}

type Config struct {
    TickInterval int `env:"tick_interval"`
}

func (p *PingProcess) Init(config nacelle.Config) error {
    pingConfig := &Config{}
    if err := config.Load(pingConfig); err != nil {
        return err
    }

    p.tickInterval = time.Duration(pingConfig.tickInterval) * time.Second
    return nil
}

func (p *PingProcess) Start() error {
    for {
        select {
            case <-p.halt:
                return nil
            case <-time.After(p.tickInterval):
        }

        start := time.Now()
        err := p.DB.Ping()
        duration := time.Now().Sub(start)
        durationMs := float64(duration) / float64(time.Milliseconds)

        if err != nil {
            return err
        }

        p.Logger.Debug("Ping took %.2fms", durationMs)
    }

    return nil
}

func (p *PingProcess) Stop() error {
    p.once.Do(func() {
        close(p.halt)
    })

    return nil
}
Tracking Process Health

Processes can dynamically report their own health via a shared health tracker object available in the service container. The tracker maintains a list of reasons that an application is not fully healthy. When this list is empty, the application should be fully functional.

A process that does not interact with the health instance is assumed to be healthy when it is live. Usage of a global health instance should be used as follows.

type HealthConsciousProcess struct {
    Health nacelle.Health `service:"health"`
}

func (p *HealthConsciousProcess) Init(config nacelle.Config) error {
    // Before the process starts, add an "unhealthy" token unique to this process
    if err := p.Health.AddReason("health-conscious-process"); err != nil {
        return err
    }

    // ...
    return nil
}

func (p *HealthConsciousProcess) Start() error {
    // pre-healthy processing
    // ...

    // Once healthy, remove the reason registered above
    if err := p.Health.RemoveReason("health-conscious-process"); err != nil {
        return err
    }

    // post-healthy processing
    // ...

    return nil
}

func (p *HealthConsciousProcess) Stop() error {
    // ...
    return nil
}
Initializer Definition

An initializer is a struct with an Init method that takes a config object and may return an error value. Initializers should not perform long-running computation unless it is necessary for the startup of an application as they will block additional application startup. The following example creates a connection to a Postgres database and stores it in a service container for subsequent initializers and processes to use.

import (
    "github.com/go-nacelle/nacelle"
    "github.com/jmoiron/sqlx"
    _ "github.com/lib/pq"
)

type DatabaseInitializer struct {
    Services nacelle.ServiceContainer `service:"services"`
}

type Config struct {
    ConnectionString string `env:"connection_string" required:"true"`
}

func (i *DatabaseInitializer) Init(config nacelle.Config) error {
    dbConfig := &Config{}
    if err := config.Load(dbConfig); err != nil {
        return err
    }

    db, err := sqlx.Open("postgres", dbConfig.ConnectionString)
    if err != nil {
        return err
    }

    return i.Services.Set("db", db)
}

An initializer can also be a finalizer if it defines a Finalize method. This can be useful for initializers that need to do some cleanup action before the application shuts down such as closing a log or profile file, closing remote connections, or ensuring that certain buffers get flushed before the application ends.

Application Lifecycle

Concrete instances of initializers and processes are registered to a process container at application startup (as in the example shown below). The nacelle bootstrapper then handles initialization and invocation of the process container and runner that will control the initialization and supervision of registered components. The following sequence of events occur on application boot.

  • Initializer boot stage: Each initializer is booted sequentially in the order that it is registered. First, services are injected into the initializer instance via the shared service container. If a previous initializer had registered a service into the container, it will be available at this time. Next, the initializer's Init method is invoked. If an error occurs in either stage, the remainder of the boot process is abandoned. Any initializer that had successfully completed is unwound: each initializer that implements the finalizer interface will be have their finalization method invoked. Initializers are finalized sequentially in the reverse order of their initialization.

  • Process boot stage: After all initializers have completed successfully, the processes will begin to boot. First, services are injected into all processes, regardless of their priority order. If this fails, then the remainder of the boot process is abandoned and the initializers are unwound. Then, processes continue to boot in batches based on their (ascending) priority order. Within each batch, the following sequence of events occur.

    • Batch initialization: The Init method of each process is invoked sequentially in the order that it is registered. If this fails, then the remainder of the boot process is abandoned the initializers are unwound.

    • Batch launch: The Start method of each process is invoked. Each invocation is made concurrently and in a different goroutine. The remainder of the boot process is suspended until all processes within this priority become healthy. If a process returns from its Start method or does not become healthy within the given timeout period, the boot process is abandoned and the process is unwound, as described in the next stage.

  • Supervisory stage: Once all process batches have been started and have become healthy after initialization, the supervisory stage begins. This stage listens for one of the following events and begins to unwind the process.

    • The user sends the process a signal
    • A process's Start method returns with an error
    • A process's Start method returns without an error, but is not marked for silent exit

The process is unwound by stopping each process for which a Start goroutine was created. The Stop method of each process is called concurrently with all processes within its priority batch, and each priority batch is stopped by (descending) priority order. Finally, initializers are unwound as described above.

Registration

The following example registers a cache and a database initializer, then registers a health server, an HTTP server, and a gRPC server. The database initializer must complete within five seconds or the application fails (and, if orchestrated properly, will restart). The health server is registered at a lower priority than the other two. This ensures that the health server can take requests as early as the HTTP and gRPC servers, and also ensures that it will be shut down only after the higher-priority processes have been shut down.

func setupProcesses(processes nacelle.ProcessContainer, services nacelle.ServiceContainer) error {
    processes.RegisterInitializer(
        NewCacheInitializer(),
        nacelle.WithInitializerName("cache"),
    )

    processes.RegisterInitializer(
        NewDatabaseInitializer(),
        nacelle.WithInitializerName("db"),
        nacelle.WithInitializerTimeout(time.Second * 5),
    )

    processes.RegisterProcess(
        NewHealthServer(),
        nacelle.WithProcessName("health-server"),
    )

    processes.RegisterProcess(
        NewHTTPServer(),
        nacelle.WithProcessName("http-server"),
        nacelle.WithPriority(1),
    )

    processes.RegisterProcess(
        NewGRPCServer(),
        nacelle.WithProcessName("grpc-server"),
        nacelle.WithPriority(1),
    )

    return nil
}
Process Registration Options

The following options can be set on a process during registration.

WithPriority
WithPriority sets the priority group the process belongs to. Processes of the same - priority are initialized and started in parallel.
WithProcessInitTimeout
WithProcessInitTimeout sets the maximum time that the process can spend in its Init method.
WithProcessName
WithProcessName sets the name of the process in log messages.
WithProcessShutdownTimeout
WithProcessShutdownTimeout sets the maximum time that the process can spend waiting for its Start method to unblock after its Stop method is called.
WithProcessStartTimeout
WithProcessStartTimeout sets the maximum time that the process can spend unhealthy after its Start method is called. See health below.
WithSilentExit
WithSilentExit sets a flag that allows a nil error value to be returned without signaling an application shutdown. This can be useful for things like leader election on startup which should not stop hot standby processes from taking client requests.
Initializer Registration Options

The following options can be set on an initializer during registration.

WithFinalizerTimeout
WithFinalizerTimeout sets the maximum time that the initializer can spend on finalization. As the application is already shutting down, this will simply log and error and unblock the finalizer.
WithInitializerName
WithInitializerName sets the name of the initializer in log messages.
WithInitializerTimeout
WithInitializerTimeout sets the maximum time that the initializer can spend in its Init method. An error will be returned if this time is exceeded.
Parallel Initializers

Initializers run sequentially and non-concurrently so that a previously registered initializer can provide a service required by a subsequently registered initializer. However, this default sequencing behavior is not always necessary and can increase startup time.

When initializers can be run independently (for example, creating multiple SDK client instances for a list of AWS services), it is unnecessary to run them in sequence. Groups of such services can be registered to a ParallelInitializer. This is a bag of initializers that run the initializers registered to it in parallel. The initializer will block its siblings until all of its children complete successfully.

awsGroup := nacelle.NewParallelInitializer()
awsGroup.RegisterInitializer(NewDynamoDBClientInitializer())
awsGroup.RegisterInitializer(NewKinesisClientInitializer())
awsGroup.RegisterInitializer(NewLambdaClientInitializer())
awsGroup.RegisterInitializer(NewSQSClientInitializer())

// Register parallel group
processes.RegisterInitializer(awsGroup)

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Finalizer

type Finalizer interface {
	// Finalize is called after the application has stopped
	// all running processes.
	Finalize() error
}

Finalizer is an optional extension of an Initializer that supports finalization. This is useful for initializers that need to tear down a background process before the process exits, but needs to be started early in the boot process (such as flushing logs or metrics).

type Health

type Health interface {
	Reasons() []Reason
	LastChange() time.Duration
	AddReason(key interface{}) error
	RemoveReason(key interface{}) error
	HasReason(key interface{}) bool
}

func NewHealth

func NewHealth(configs ...HealthConfigFunc) Health

type HealthConfigFunc

type HealthConfigFunc func(*health)

func WithHealthClock

func WithHealthClock(clock glock.Clock) HealthConfigFunc

type Initializer

type Initializer interface {
	// Init reads the given configuration and prepares
	// something for use by a process. This can be loading
	// files from disk, connecting to a remote service,
	// initializing shared data structures, and inserting
	// a service into a shared service container.
	Init(config config.Config) error
}

Initializer is an interface that is called once on app startup.

type InitializerConfigFunc

type InitializerConfigFunc func(*InitializerMeta)

InitializerConfigFunc is a function used to append additional metadata to an initializer during registration.

func WithFinalizerTimeout

func WithFinalizerTimeout(timeout time.Duration) InitializerConfigFunc

WithFinalizerTimeout sets the time limit for the finalizer.

func WithInitializerLogFields added in v1.1.0

func WithInitializerLogFields(fields log.LogFields) InitializerConfigFunc

WithInitializerLogFields sets additional fields sent with every log message from this initializer.

func WithInitializerName

func WithInitializerName(name string) InitializerConfigFunc

WithInitializerName assigns a name to an initializer, visible in logs.

func WithInitializerTimeout

func WithInitializerTimeout(timeout time.Duration) InitializerConfigFunc

WithInitializerTimeout sets the time limit for the initializer.

type InitializerFunc

type InitializerFunc func(config config.Config) error

InitializerFunc is a non-struct version of an initializer.

func (InitializerFunc) Init

func (f InitializerFunc) Init(config config.Config) error

Init calls the underlying function.

type InitializerMeta

type InitializerMeta struct {
	Initializer
	// contains filtered or unexported fields
}

InitializerMeta wraps an initializer with some package private fields.

func (*InitializerMeta) FinalizeTimeout

func (m *InitializerMeta) FinalizeTimeout() time.Duration

FinalizeTimeout returns the maximum timeout allowed for a call to the Finalize function. A zero value indicates no timeout.

func (*InitializerMeta) InitTimeout

func (m *InitializerMeta) InitTimeout() time.Duration

InitTimeout returns the maximum timeout allowed for a call to the Init function. A zero value indicates no timeout.

func (*InitializerMeta) LogFields added in v1.1.0

func (m *InitializerMeta) LogFields() log.LogFields

LogFields returns logging fields registered to this initializer.

func (*InitializerMeta) Name

func (m *InitializerMeta) Name() string

Name returns the name of the initializer.

func (*InitializerMeta) Wrapped

func (m *InitializerMeta) Wrapped() interface{}

Wrapped returns the underlying initializer.

type ParallelInitializer

type ParallelInitializer struct {
	Logger   log.Logger               `service:"logger"`
	Services service.ServiceContainer `service:"services"`
	// contains filtered or unexported fields
}

ParallelInitializer is a container for initializers that are initialized in parallel. This is useful when groups of initializers are independent and may contain some longer-running process (such as dialing a remote service).

func NewParallelInitializer

func NewParallelInitializer(initializerConfigs ...ParallelInitializerConfigFunc) *ParallelInitializer

NewParallelInitializer creates a new parallel initializer.

func (*ParallelInitializer) Finalize

func (pi *ParallelInitializer) Finalize() error

Finalize runs Finalize on all registered initializers concurrently.

func (*ParallelInitializer) Init

func (pi *ParallelInitializer) Init(config config.Config) error

Init runs Init on all registered initializers concurrently.

func (*ParallelInitializer) RegisterInitializer

func (i *ParallelInitializer) RegisterInitializer(
	initializer Initializer,
	initializerConfigs ...InitializerConfigFunc,
)

RegisterInitializer adds an initializer to the initializer set with the given configuration.

type ParallelInitializerConfigFunc

type ParallelInitializerConfigFunc func(*ParallelInitializer)

ParallelInitializerConfigFunc is a function used to configure an instance of a ParallelInitializer.

func WithParallelInitializerClock

func WithParallelInitializerClock(clock glock.Clock) ParallelInitializerConfigFunc

WithParallelInitializerClock sets the clock used by the runner.

func WithParallelInitializerContainer

func WithParallelInitializerContainer(container service.ServiceContainer) ParallelInitializerConfigFunc

WithParallelInitializerContainer sets the service container used by the runner.

func WithParallelInitializerLogger

func WithParallelInitializerLogger(logger log.Logger) ParallelInitializerConfigFunc

WithParallelInitializerLogger sets the logger used by the runner.

type Process

type Process interface {
	Initializer

	// Start begins performing the core action of the process.
	// For servers, this will begin accepting clients ona  port.
	// For workers, this may begin reading from a remote work
	// queue and processing messages. This method should block
	// until a fatal error occurs, or until the Stop method is
	// called (at which point a nil error should be returned).
	// If this method is non-blocking, then the process should
	// be registered with the WithSilentExit option.
	Start() error

	// Stop informs the work being performed by the Start
	// method to begin a graceful shutdown. This method is
	// not expected to block until shutdown completes.
	Stop() error
}

Process is an interface that continually performs a behavior during the life of a program. Generally, one process should do a single thing. Multiple processes can be registered to a process container and those processes can coordinate and communicate through shared services.

type ProcessConfigFunc

type ProcessConfigFunc func(*ProcessMeta)

ProcessConfigFunc is a function used to append additional metadata to an process during registration.

func WithPriority

func WithPriority(priority int) ProcessConfigFunc

WithPriority assigns a priority to a process. A process with a lower-valued priority is initialized and started before a process with a higher-valued priority. Two processes with the same priority are started concurrently.

func WithProcessInitTimeout

func WithProcessInitTimeout(timeout time.Duration) ProcessConfigFunc

WithProcessInitTimeout sets the time limit for the process's Init method.

func WithProcessLogFields added in v1.1.0

func WithProcessLogFields(fields log.LogFields) ProcessConfigFunc

WithProcessLogFields sets additional fields sent with every log message from this process.

func WithProcessName

func WithProcessName(name string) ProcessConfigFunc

WithProcessName assigns a name to an process, visible in logs.

func WithProcessShutdownTimeout

func WithProcessShutdownTimeout(timeout time.Duration) ProcessConfigFunc

WithProcessShutdownTimeout sets the time limit for the process's Start method to yield after the Stop method has been called.

func WithProcessStartTimeout

func WithProcessStartTimeout(timeout time.Duration) ProcessConfigFunc

WithProcessStartTimeout sets the time limit for the process to become healthy.

func WithSilentExit

func WithSilentExit() ProcessConfigFunc

WithSilentExit allows a process to exit without causing the program to halt. The default is the opposite, where the completion of any registered process (even successful) causes a graceful shutdown of the other processes.

type ProcessContainer

type ProcessContainer interface {
	// RegisterInitializer adds an initializer to the container
	// with the given configuration.
	RegisterInitializer(Initializer, ...InitializerConfigFunc)

	// RegisterProcess adds a process to the container with the
	// given configuration.
	RegisterProcess(Process, ...ProcessConfigFunc)

	// NumInitializers returns the number of registered initializers.
	NumInitializers() int

	// NumProcesses returns the number of registered processes.
	NumProcesses() int

	// NumPriorities returns the number of distinct registered
	// process priorities.
	NumPriorities() int

	// GetInitializers returns a slice of meta objects wrapping
	// all registered initializers.
	GetInitializers() []*InitializerMeta

	// GetProcessesAtPriorityIndex returns  aslice of meta objects
	// wrapping all processes registered to this priority index,
	// where zero denotes the lowest priority, one the second
	// lowest, and so on. The index parameter is not checked for
	// validity before indexing an internal slice - caller beware.
	GetProcessesAtPriorityIndex(index int) []*ProcessMeta
}

ProcessContainer is a collection of initializers and processes.

func NewProcessContainer

func NewProcessContainer() ProcessContainer

NewProcessContainer creates an empty process container.

type ProcessMeta

type ProcessMeta struct {
	Process
	// contains filtered or unexported fields
}

ProcessMeta wraps a process with some package private fields.

func (*ProcessMeta) InitTimeout

func (m *ProcessMeta) InitTimeout() time.Duration

InitTimeout returns the maximum timeout allowed for a call to the Init function. A zero value indicates no timeout.

func (*ProcessMeta) LogFields added in v1.1.0

func (m *ProcessMeta) LogFields() log.LogFields

Logields returns logging fields registered to this process.

func (*ProcessMeta) Name

func (m *ProcessMeta) Name() string

Name returns the name of the process.

func (*ProcessMeta) Stop

func (m *ProcessMeta) Stop() (err error)

Stop wraps the underlying process's Stop method with a Once value in order to guarantee that the Stop method will not take effect multiple times.

func (*ProcessMeta) Wrapped

func (m *ProcessMeta) Wrapped() interface{}

Wrapped returns the underlying process.

type Reason

type Reason struct {
	Key   interface{}
	Added time.Time
}

type Runner

type Runner interface {
	// Run takes a loaded configuration object, then starts and monitors
	// the registered items in the process container. This method returns
	// a channel of errors. Each error from an initializer or a process will
	// be sent on this channel (nil errors are ignored). This channel will
	// close once all processes have exited (or, alternatively, when the
	// shutdown timeout has elapsed).
	Run(config.Config) <-chan error

	// Shutdown will begin a graceful exit of all processes. This method
	// will block until the runner has exited (the channel from the Run
	// method has closed) or the given duration has elapsed. In the later
	// case a non-nil error is returned.
	Shutdown(time.Duration) error
}

Runner wraps a process container. Given a loaded configuration object, it can run the registered initializers and processes and wait for them to exit (cleanly or via shutdown request).

func NewRunner

func NewRunner(
	processes ProcessContainer,
	services service.ServiceContainer,
	health Health,
	runnerConfigs ...RunnerConfigFunc,
) Runner

NewRunner creates a process runner from the given process and service containers.

type RunnerConfigFunc

type RunnerConfigFunc func(*runner)

RunnerConfigFunc is a function used to configure an instance of a ProcessRunner.

func WithClock

func WithClock(clock glock.Clock) RunnerConfigFunc

WithClock sets the clock used by the runner.

func WithHealthCheckInterval added in v1.1.0

func WithHealthCheckInterval(interval time.Duration) RunnerConfigFunc

WithHealthCheckInterval sets the frequency between checks of process health during startup.

func WithLogger

func WithLogger(logger log.Logger) RunnerConfigFunc

WithLogger sets the logger used by the runner.

func WithShutdownTimeout

func WithShutdownTimeout(timeout time.Duration) RunnerConfigFunc

WithShutdownTimeout sets the maximum time it will wait for a process to exit during a graceful shutdown.

func WithStartTimeout

func WithStartTimeout(timeout time.Duration) RunnerConfigFunc

WithStartTimeout sets the time it will wait for a process to become healthy after startup.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL