verity

package module
v0.2.0 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 26, 2024 License: MIT Imports: 44 Imported by: 0

README

Verity

Verity is an event-sourced Dogma engine.


This project will be superseded by Veracity in the future.

Its tests depend on the Docker stack provided by dogmatiq/sqltest.

Documentation

Constants

This section is empty.

Variables

var (
	// DefaultPersistenceProvider is the default persistence provider.
	//
	// It is overridden by the WithPersistence() option.
	DefaultPersistenceProvider persistence.Provider = &boltpersistence.FileProvider{
		Path: "/var/run/verity.boltdb",
	}

	// DefaultMessageTimeout is the default duration the engine allows for the
	// handling of a single message by a Dogma handler.
	//
	// It is overridden by the WithMessageTimeout() option.
	DefaultMessageTimeout = 5 * time.Second

	// DefaultMessageBackoff is the default backoff strategy for message
	// handling retries.
	//
	// It is overridden by the WithMessageBackoff() option.
	DefaultMessageBackoff backoff.Strategy = backoff.WithTransforms(
		backoff.Exponential(100*time.Millisecond),
		linger.FullJitter,
		linger.Limiter(0, 1*time.Hour),
	)

	// DefaultConcurrencyLimit is the default number of messages to handle
	// (and projections to compact) concurrently.
	//
	// It is overridden by the WithConcurrencyLimit() option.
	DefaultConcurrencyLimit = uint(runtime.GOMAXPROCS(0) * 2)

	// DefaultProjectionCompactInterval is the default interval at which the
	// engine compacts projections.
	//
	// It is overridden by the WithProjectionCompactInterval() option.
	DefaultProjectionCompactInterval = 24 * time.Hour

	// DefaultProjectionCompactTimeout is the default timeout to use when
	// compacting a projection.
	//
	// It is overridden by the WithProjectionCompactTimeout() option.
	DefaultProjectionCompactTimeout = 5 * time.Minute

	// DefaultLogger is the default target for log messages produced by the
	// engine.
	//
	// It is overridden by the WithLogger() option.
	DefaultLogger = logging.DefaultLogger
)
var (
	// DefaultListenAddress is the default TCP address for the gRPC listener.
	//
	// It is overridden by the WithListenAddress() option.
	DefaultListenAddress = net.JoinHostPort("", discoverkit.DefaultGRPCPort)

	// DefaultDialer is the default dialer used to connect to other engines'
	// gRPC servers.
	//
	// It is overridden by the WithDialer() option.
	DefaultDialer discoverkit.Dialer = grpc.DialContext

	// DefaultDialerBackoff is the default backoff strategy for gRPC dialer
	// retries.
	//
	// It is overridden by the WithDialerBackoff() option.
	DefaultDialerBackoff backoff.Strategy = backoff.WithTransforms(
		backoff.Exponential(100*time.Millisecond),
		linger.FullJitter,
		linger.Limiter(0, 30*time.Second),
	)

	// DefaultDiscoverer is the default discoverer used to find other engine
	// instances on the network.
	//
	// It is overridden by the WithDiscoverer() option.
	DefaultDiscoverer discoverkit.TargetDiscoverer = discoverkit.StaticTargetDiscoverer{}
)
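DefaultListenAddress is built with net.JoinHostPort and an empty host. The sketch below shows what that call produces for a few hosts. The port "50051" is a placeholder chosen for illustration; the value of discoverkit.DefaultGRPCPort is not shown in this documentation. The listenAddress helper is hypothetical.

```go
package main

import (
	"fmt"
	"net"
)

// examplePort is a placeholder. The real default comes from
// discoverkit.DefaultGRPCPort, whose value is not documented here.
const examplePort = "50051"

// listenAddress builds a TCP listen address. An empty host produces an
// address of the form ":port", which listens on all interfaces.
func listenAddress(host, port string) string {
	return net.JoinHostPort(host, port)
}

func main() {
	fmt.Println(listenAddress("", examplePort))          // :50051
	fmt.Println(listenAddress("127.0.0.1", examplePort)) // 127.0.0.1:50051
	fmt.Println(listenAddress("::1", examplePort))       // [::1]:50051
}
```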

Functions

func NewDefaultMarshaler

func NewDefaultMarshaler(configs []configkit.RichApplication) marshalkit.Marshaler

NewDefaultMarshaler returns the default marshaler to use for the given applications.

It is used if the WithMarshaler() option is omitted.

func Run

func Run(ctx context.Context, app dogma.Application, options ...EngineOption) error

Run creates and runs a new engine that hosts the given application.

It runs until ctx is canceled or an error occurs.

Types

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine hosts a Dogma application.

func New

func New(app dogma.Application, options ...EngineOption) *Engine

New returns a new engine that hosts the given application.

app is the Dogma application to host on the engine. It may be nil, in which case at least one WithApplication() option must be specified.

func (*Engine) ExecuteCommand

func (e *Engine) ExecuteCommand(
	ctx context.Context,
	m dogma.Message,
	options ...dogma.ExecuteCommandOption,
) error

ExecuteCommand enqueues a command for execution.

func (*Engine) Run

func (e *Engine) Run(ctx context.Context) error

Run hosts the given application until ctx is canceled or an error occurs.

type EngineOption

type EngineOption func(*engineOptions)

EngineOption configures the behavior of an engine.

func WithApplication

func WithApplication(app dogma.Application) EngineOption

WithApplication returns an engine option that hosts an additional application on the engine.

There must always be at least one application specified either by using WithApplication(), the app parameter to New(), or both.

func WithConcurrencyLimit

func WithConcurrencyLimit(n uint) EngineOption

WithConcurrencyLimit returns an engine option that limits the number of messages that will be handled (and projections that will be compacted) at the same time.

If this option is omitted or n is zero, DefaultConcurrencyLimit is used.

func WithLogger

func WithLogger(l any) EngineOption

WithLogger returns an engine option that sets the target for log messages produced by the engine.

l must be a logging.Logger or a *zap.Logger.

If this option is omitted or l is nil, DefaultLogger is used.

func WithMarshaler

func WithMarshaler(m marshalkit.Marshaler) EngineOption

WithMarshaler returns an engine option that sets the marshaler used to marshal and unmarshal messages and other types.

If this option is omitted or m is nil, NewDefaultMarshaler() is called to obtain the default marshaler.

func WithMessageBackoff

func WithMessageBackoff(s backoff.Strategy) EngineOption

WithMessageBackoff returns an engine option that sets the backoff strategy used to delay message handling retries.

If this option is omitted or s is nil, DefaultMessageBackoff is used.

func WithMessageTimeout

func WithMessageTimeout(d time.Duration) EngineOption

WithMessageTimeout returns an engine option that sets the duration the engine allows for the handling of a single message by a Dogma handler.

If this option is omitted or d is zero, DefaultMessageTimeout is used.

Individual handler implementations within the application may provide timeout "hints", which the engine may use instead of, or in conjunction with, the duration specified by this option.

func WithNetworking

func WithNetworking(options ...NetworkOption) EngineOption

WithNetworking returns an engine option that enables network communication between engine instances running different Dogma applications.

Engine instances communicate using gRPC APIs.

func WithPersistence

func WithPersistence(p persistence.Provider) EngineOption

WithPersistence returns an engine option that sets the persistence provider used to store and retrieve application state.

If this option is omitted or p is nil, DefaultPersistenceProvider is used.

func WithProjectionCompactInterval

func WithProjectionCompactInterval(d time.Duration) EngineOption

WithProjectionCompactInterval returns an engine option that sets the interval at which projections are compacted.

If this option is omitted or d is zero, DefaultProjectionCompactInterval is used.

func WithProjectionCompactTimeout

func WithProjectionCompactTimeout(d time.Duration) EngineOption

WithProjectionCompactTimeout returns an engine option that sets the duration the engine allows for a single projection to be compacted.

If this option is omitted or d is zero, DefaultProjectionCompactTimeout is used.

type NetworkOption

type NetworkOption func(*networkOptions)

NetworkOption configures the networking-related behavior of an engine.

func WithDialer

func WithDialer(d discoverkit.Dialer) NetworkOption

WithDialer returns a network option that sets the dialer used to connect to other engines' gRPC servers.

If this option is omitted or d is nil, DefaultDialer is used.

func WithDialerBackoff

func WithDialerBackoff(s backoff.Strategy) NetworkOption

WithDialerBackoff returns a network option that sets the backoff strategy used to delay gRPC dialing retries.

If this option is omitted or s is nil, DefaultDialerBackoff is used.

func WithDiscoverer

func WithDiscoverer(d discoverkit.TargetDiscoverer) NetworkOption

WithDiscoverer returns a network option that sets the discoverer used to find other engine instances on the network.

Currently this option MUST be specified.

TODO: https://github.com/dogmatiq/discoverkit/issues/2 Use Bonjour as the default discovery mechanism.

func WithListenAddress

func WithListenAddress(addr string) NetworkOption

WithListenAddress returns a network option that sets the TCP address for the engine's gRPC listener.

If this option is omitted or addr is empty, DefaultListenAddress is used.

func WithServerOptions

func WithServerOptions(options ...grpc.ServerOption) NetworkOption

WithServerOptions returns a network option that adds gRPC server options.

Directories

Path Synopsis
Package eventstream provides abstractions for consuming ordered streams of event messages.
internal/streamtest
Package streamtest contains a common test suite for eventstream.Stream implementations.
memorystream
Package memorystream provides an in-memory implementation of eventstream.Stream.
networkstream
Package networkstream presents a persistence.EventRepository as an eventstream.Stream via the dogma.messaging.v1 EventStream service.
persistedstream
Package persistedstream provides an implementation of eventstream.Stream that streams persisted events from an event repository.
Package fixtures is a set of test fixtures and mocks of the various types.
aggregate
Package aggregate provides an adaptor that exposes Dogma aggregate message handlers as Verity handlers.
cache
Package cache provides an in-memory cache for aggregate and process instances.
integration
Package integration provides an adaptor that exposes Dogma integration message handlers as Verity handlers.
process
Package process provides an adaptor that exposes Dogma process message handlers as Verity handlers.
projection
Package projection provides an adaptor that exposes Dogma projection message handlers as Verity event stream handlers.
projection/resource
Package resource contains utilities for performing low-level manipulations of projection resource versions.
internal
mlog
Package mlog contains utilities for logging about messages.
x/bboltx
Package bboltx contains utilities for working with BoltDB databases.
x/grpcx
Package grpcx contains utilities for writing gRPC clients and servers.
x/sqlx
Package sqlx contains utilities for working with SQL databases.
Package parcel provides a high-level container for messages and their envelopes.
Package persistence provides abstractions for data persistence.
boltpersistence
Package boltpersistence is a BoltDB (bbolt) persistence provider.
internal/providertest
Package providertest contains a common test suite for persistence.Provider implementations.
memorypersistence
Package memorypersistence is an in-memory persistence provider.
sqlpersistence
Package sqlpersistence is an SQL-based persistence provider with drivers for several popular SQL database systems.
sqlpersistence/mysql
Package mysql is a MySQL driver for the SQL persistence provider.
sqlpersistence/postgres
Package postgres is a PostgreSQL driver for the SQL persistence provider.
sqlpersistence/sqlite
Package sqlite is an SQLite v3 driver for the SQL persistence provider.
Package queue is an in-memory representation of the message queue.
