serving

package
v0.3.29 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 23, 2022 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BatchQueries

func BatchQueries(values <-chan *model.EventRequest, maxItems int, maxTimeout time.Duration) chan []*model.EventRequest

BatchQueries Reads from a channel of UpdateRequests and batches them up, waiting for either maxItems to be present in the batch or maxTimeout to have elapsed since the batch was started (whichever occurs first). Once the condition has been met, the batch is released and the next batch is started.

Types

type DefaultSequenceManager

type DefaultSequenceManager struct {
	// contains filtered or unexported fields
}

func NewStaticSequenceManager

func NewStaticSequenceManager(initialSequences map[int64]int64) *DefaultSequenceManager

NewStaticSequenceManager returns a SequenceManager that only updates if `Update` is called. This is mainly usefuly for test purposes

func NewUpdatingSequenceManager

func NewUpdatingSequenceManager(ctx context.Context, eventDb *eventdb.EventDb, pulsarClient pulsar.Client, updateTopic string) (*DefaultSequenceManager, error)

NewUpdatingSequenceManager returns a SequenceManager that is initialised from the eventDb and then receives updates from pulsar

func (*DefaultSequenceManager) Get

func (sm *DefaultSequenceManager) Get(jobsetId int64) (int64, bool)

Get Retrieves the latets sequence for the given jobset. The boolean returned will be true if an offset exists and false otherwise

func (*DefaultSequenceManager) Update

func (sm *DefaultSequenceManager) Update(newSequences map[int64]int64)

Update updates the sequences for the supplied jobsets. Any sequences in the update which are lower than the sequences we already store will be ignored.

type DefaultSubscriptionManager

type DefaultSubscriptionManager struct {
	// contains filtered or unexported fields
}

func NewSubscriptionManager

func NewSubscriptionManager(sequenceManager SequenceManager, db eventDbRO, maxBatchSize int, maxTimeout time.Duration, pollPeriod time.Duration, queryConcurrency int, maxFetchSize int, clock clock.Clock) *DefaultSubscriptionManager

NewSubscriptionManager returns a DefaultSubscriptionManager that can fetch events from postgres and manage subscription requests for new data

func (*DefaultSubscriptionManager) Subscribe

func (sm *DefaultSubscriptionManager) Subscribe(jobset int64, fromOffset int64) *model.EventSubscription

Subscribe returns an EventSubscription which consists of a stream of events along with a subscription id Callers should pass back the subscriptionId when they want to Unsubscribe.

func (*DefaultSubscriptionManager) Unsubscribe

func (sm *DefaultSubscriptionManager) Unsubscribe(subscriptionId int64)

Unsubscribe frees up resources associated with the stream

type EventApi

type EventApi struct {
	// contains filtered or unexported fields
}

EventApi is responsible for serveing User requests for event messages

func NewEventApi

func NewEventApi(jobsetMapper eventapi.JobsetMapper, subscriptionManager SubscriptionManager, sequenceManager SequenceManager) *EventApi

func (*EventApi) GetJobSetEvents

func (r *EventApi) GetJobSetEvents(request *api.JobSetRequest, stream api.Event_GetJobSetEventsServer) error

GetJobSetEvents Returns a stream of events from the events Db If request.Watch is set then the stream will only end when the user requests it, otherwise it will return all events present in the database when the request was made.

type SequenceManager

type SequenceManager interface {
	Get(jobsetId int64) (int64, bool)
	Update(newOffsets map[int64]int64)
}

SequenceManager is responsible for storing the latest available Sequence number for each jobset

type SubscriptionManager

type SubscriptionManager interface {
	Subscribe(jobset int64, fromOffset int64) *model.EventSubscription
	Unsubscribe(subscriptionId int64)
}

SubscriptionManager lets callers subscribe to channels of events in an efficient manner

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL