store

package
Version: v0.0.0-...-5107381
Published: Mar 8, 2022 License: MIT Imports: 6 Imported by: 0

Documentation

Overview

Package store encompasses access handling for the stream store

Variables

var ErrRequestTimedOut = errors.New("request timed out")

ErrRequestTimedOut is returned if the store takes too long to answer a query.

Types

type Option

type Option func(p *providerConfig)

Option defines an optional configuration parameter to the constructor of the Provider

func WithEventBufferSize

func WithEventBufferSize(size int) Option

WithEventBufferSize sets the size of the outgoing event buffer. There shouldn't be any real reason to change this unless the event consumer is very slow at consuming updates.

The default value is 10000.

func WithQueryTimeout

func WithQueryTimeout(t time.Duration) Option

WithQueryTimeout sets the timeout for read accesses to the store provider. Since all store accesses happen on the same goroutine, a misbehaving update can block reads. If that happens, the query methods on the provider time out after this duration.

The default value is 5 seconds.

func WithRequestBufferSize

func WithRequestBufferSize(size int) Option

WithRequestBufferSize sets the size of the internal request queue. There shouldn't be any real reason to change this unless the core API is getting hammered with requests.

The default value is 10.

func WithUpdateBufferSize

func WithUpdateBufferSize(size int) Option

WithUpdateBufferSize sets the size of the store provider channel that is responsible for receiving updates. There shouldn't be any real reason to change this unless the provider is very slow at consuming updates.

The default value is 10000.

type Provider

type Provider struct {
	// contains filtered or unexported fields
}

Provider provides access to the store. It runs as a long-running service that updates the store in response to update events. All updates and accesses are handled in a single event loop, which serializes changes to the store for thread safety. Provider also emits events for updates made to the store.

func NewProvider

func NewProvider(
	logger *zap.Logger,
	opts ...Option,
) *Provider

NewProvider creates a new store provider and initializes the store. Options can be provided as follows:

provider = store.NewProvider(
	logger,
	store.WithQueryTimeout(10*time.Millisecond),
	store.WithUpdateBufferSize(10),
	store.WithEventBufferSize(10),
	store.WithRequestBufferSize(10),
)

func (*Provider) Entity

func (p *Provider) Entity(streamID int, entityID uint64) (*models.Entity, error)

Entity returns a specific entity in a specific stream from the store, queried by streamID and entityID. It returns an error if the streamID is not found or if the entityID is not found in the stream. This query will also return an error if the request exceeds the timeout duration.

func (*Provider) EntityEventSource

func (p *Provider) EntityEventSource() models.EntityEventSource

EntityEventSource returns an event source that allows consumers to subscribe to entity events

func (*Provider) Serve

func (p *Provider) Serve()

Serve runs the main loop for the provider. It runs inside a goroutine as a service and is exclusively responsible for applying all updates and serving all read requests.

func (*Provider) Stop

func (p *Provider) Stop()

Stop shuts down this service and waits for it to stop before returning.
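
The Serve and Stop pairing can be pictured with the following sketch of a stoppable event loop. It is a generic illustration under stated assumptions rather than the Provider's implementation: the service type, its channels, and newService are all hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// service is a hypothetical stand-in for a long-running provider.
type service struct {
	updates chan func(state *int) // mutations, applied one at a time
	quit    chan struct{}         // closed to ask the loop to exit
	done    chan struct{}         // closed by the loop once it has exited
	once    sync.Once
	state   int
}

func newService() *service {
	return &service{
		updates: make(chan func(*int), 16),
		quit:    make(chan struct{}),
		done:    make(chan struct{}),
	}
}

// Serve runs the loop until Stop is called. Only this goroutine touches
// state while the loop is running.
func (s *service) Serve() {
	defer close(s.done)
	for {
		select {
		case u := <-s.updates:
			u(&s.state)
		case <-s.quit:
			return
		}
	}
}

// Stop signals the loop to exit and blocks until it has done so.
// sync.Once makes repeated calls safe.
func (s *service) Stop() {
	s.once.Do(func() { close(s.quit) })
	<-s.done
}

func main() {
	s := newService()
	go s.Serve()

	applied := make(chan struct{})
	s.updates <- func(state *int) { *state++; close(applied) }
	<-applied

	s.Stop()
	fmt.Println("state after stop:", s.state)
}
```

In this sketch Stop blocks forever if Serve was never started, so the loop should be launched in its own goroutine before Stop is called.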

func (*Provider) Stream

func (p *Provider) Stream(streamID int) (*models.Stream, error)

Stream returns a specific stream from the store, queried by streamID. This query will return an error if the request exceeds the timeout duration.

func (*Provider) StreamEventSource

func (p *Provider) StreamEventSource() models.StreamEventSource

StreamEventSource returns an event source that allows consumers to subscribe to stream events

func (*Provider) Streams

func (p *Provider) Streams() ([]models.Stream, error)

Streams returns all the streams from the internal store. This query will return an error if the request exceeds the timeout duration.

func (*Provider) UpdatesChan

func (p *Provider) UpdatesChan() chan<- Update

UpdatesChan returns a channel on which other services can send store updates
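
Because the returned channel is send-only (chan<- Update), producers can push updates but cannot read from or close the queue. The sketch below shows the idea with hypothetical stand-in types. The non-blocking trySend helper is an assumption added for illustration and is not part of the package.

```go
package main

import "fmt"

// update is a hypothetical stand-in for the Update interface.
type update string

// provider is a hypothetical stand-in that owns the bidirectional channel.
type provider struct {
	updates chan update
}

// UpdatesChan exposes the internal channel as send-only.
func (p *provider) UpdatesChan() chan<- update { return p.updates }

// trySend attempts a non-blocking send and reports whether the update
// was queued. It returns false when the buffer is full.
func trySend(ch chan<- update, u update) bool {
	select {
	case ch <- u:
		return true
	default:
		return false
	}
}

func main() {
	p := &provider{updates: make(chan update, 1)}
	ch := p.UpdatesChan()
	fmt.Println(trySend(ch, "first"), trySend(ch, "second"))
}
```

A plain send (ch <- u) blocks once the buffer is full, which is the situation WithUpdateBufferSize is meant to make unlikely.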

type Streams

type Streams struct {
	Map      map[int]*models.Stream
	KeyOrder []int
}

Streams defines the structure of the data store in the store provider and is consumed by updates for modification
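
The documentation does not say what KeyOrder is for. Since Go map iteration order is unspecified, one plausible reading is that KeyOrder records the order in which streams should be listed. The sketch below works under that assumption, with a hypothetical Stream type standing in for models.Stream and a hypothetical ordered helper.

```go
package main

import "fmt"

// Stream is a hypothetical stand-in for models.Stream.
type Stream struct {
	ID   int
	Name string
}

// Streams mirrors the documented struct, using the stand-in Stream type.
type Streams struct {
	Map      map[int]*Stream
	KeyOrder []int
}

// ordered (hypothetical) returns copies of the streams in KeyOrder,
// skipping any key that has no entry in Map.
func ordered(s *Streams) []Stream {
	out := make([]Stream, 0, len(s.KeyOrder))
	for _, id := range s.KeyOrder {
		if st, ok := s.Map[id]; ok && st != nil {
			out = append(out, *st)
		}
	}
	return out
}

func main() {
	s := &Streams{
		Map:      map[int]*Stream{2: {ID: 2, Name: "b"}, 1: {ID: 1, Name: "a"}},
		KeyOrder: []int{2, 1},
	}
	for _, st := range ordered(s) {
		fmt.Println(st.ID, st.Name)
	}
}
```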

type Update

type Update interface {
	ModifyStore(streams *Streams) ([]models.StreamEvent, []models.EntityEvent, error)
}

Update defines the interface for making modifications to the streams store and emitting the resulting stream and entity events. ModifyStore is expected to run in a thread-safe environment, so it does not compete with other updates running at the same time. It is assumed that the resulting stream events are applied first, in order, and that the resulting entity events are then applied in order.
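
As a rough illustration of the interface, the sketch below implements a hypothetical update that adds a stream. The Stream, StreamEvent, and EntityEvent types are local stand-ins for the models package, whose real fields are not shown in this documentation, and addStream is an invented example rather than one of the package's updates.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for the models package types.
type Stream struct{ ID int }

type StreamEvent struct {
	Kind     string
	StreamID int
}

type EntityEvent struct{}

// Streams and Update mirror the documented shapes, using the stand-in types.
type Streams struct {
	Map      map[int]*Stream
	KeyOrder []int
}

type Update interface {
	ModifyStore(streams *Streams) ([]StreamEvent, []EntityEvent, error)
}

// addStream is a hypothetical update that inserts a new, empty stream.
type addStream struct{ id int }

// ModifyStore mutates the store directly. No locking is needed because
// updates are expected to be applied one at a time by the provider loop.
func (u addStream) ModifyStore(streams *Streams) ([]StreamEvent, []EntityEvent, error) {
	if _, exists := streams.Map[u.id]; exists {
		return nil, nil, errors.New("stream already exists")
	}
	streams.Map[u.id] = &Stream{ID: u.id}
	streams.KeyOrder = append(streams.KeyOrder, u.id)
	return []StreamEvent{{Kind: "added", StreamID: u.id}}, nil, nil
}

func main() {
	store := &Streams{Map: map[int]*Stream{}}
	var u Update = addStream{id: 7}
	events, _, err := u.ModifyStore(store)
	fmt.Println(len(events), err, store.KeyOrder)
}
```

In this sketch the update leaves the store untouched when it returns an error.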

Directories

Path Synopsis
Package update describes the updates to the data store that are generated in response to network data
