dispatch

package
v0.2.0 Latest
Warning

This package is not in the latest version of its module.

Published: Oct 11, 2022 License: Apache-2.0 Imports: 31 Imported by: 0

Documentation

Constants

const (
	EventQueueDepth               = "event_queue_depth"
	DroppedQueueCount             = "dropped_queue_count"
	WorkersCount                  = "workers_count"
	ConsumerDropUntilGauge        = "consumer_drop_until"
	SlowConsumerCounter           = "slow_consumer_cut_off_count"
	SlowConsumerDroppedMsgCounter = "slow_consumer_dropped_message_count"
	IncomingContentTypeCounter    = "incoming_content_type_count"
	ConsumerDeliverUntilGauge     = "consumer_deliver_until"
	DeliveryCounter               = "delivery_count"
	DroppedPanic                  = "dropped_panic_count"
	DroppedNetworkErrCounter      = "dropped_network_error_count"
	DroppedInvalidConfig          = "dropped_invalid_config"
	DroppedExpiredCounter         = "dropped_expired_count"
)

Dispatcher metrics

Variables

This section is empty.

Functions

func ProvideMetrics

func ProvideMetrics() fx.Option

ProvideMetrics returns the Measures relevant to the dispatch package.

Types

type D

type D interface {
	// Start sets up any initial sessions or connections needed in order to send events.
	Start(context.Context) error

	// Send delivers the message.  Sending is done concurrently, so no error is returned.
	Send(*wrp.Message)

	// Update is called to make sure the dispatcher keeps up to date with the norn config.
	Update(norn model.Norn)
}

D is the dispatcher interface that abstracts away the exact delivery mechanism (e.g., HTTP or SQS).

type DispatcherSender added in v0.2.0

type DispatcherSender struct {
	DeliveryInterval time.Duration
	DeliveryRetries  int
}

DispatcherSender contains config to construct an HTTPDispatcher.

type Filter added in v0.2.0

type Filter struct {
	// contains filtered or unexported fields
}

Filter is used to implement the Filterer interface.

func NewFilter added in v0.2.0

func NewFilter(fs *FilterSender, dispatcher D, norn model.Norn, sender *http.Client, measures Measures) (*Filter, error)

NewFilter validates the config and creates a new Filter.

func (*Filter) Filter added in v0.2.0

func (f *Filter) Filter(deviceID string, event *wrp.Message)

Filter decides whether an event should be sent by checking whether the event's device ID matches the norn's device ID. If they match, the event is queued for dispatch; otherwise it is dropped. If the queue is full when the event is queued, the event is dropped, the queue is emptied, and the norn's consumer is cut off.
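The decision logic described above can be modelled with a buffered channel and a non-blocking send. This is a simplified sketch, not the package's implementation: Message is a stand-in for wrp.Message, the lowercase filter type and its counters are hypothetical, and the counters are plain ints, so the type is not safe for concurrent use.

```go
package main

import "fmt"

// Message is a stand-in (assumption) for wrp.Message.
type Message struct{ Payload string }

// filter is a hypothetical, simplified model of the documented behavior.
type filter struct {
	deviceID string        // device ID of the norn
	queue    chan *Message // bounded event queue
	dropped  int           // events dropped for any reason
	cutOffs  int           // times the consumer was cut off
}

func newFilter(deviceID string, queueSize int) *filter {
	return &filter{deviceID: deviceID, queue: make(chan *Message, queueSize)}
}

// Filter queues the event when deviceID matches the norn's device ID.
// It reports whether the event was queued.
func (f *filter) Filter(deviceID string, event *Message) bool {
	if deviceID != f.deviceID {
		f.dropped++
		return false
	}
	select {
	case f.queue <- event:
		return true
	default:
		// Queue is full: drop this event, empty the queue, cut off the consumer.
		f.dropped++
		f.empty()
		f.cutOffs++
		return false
	}
}

// empty discards everything currently queued and counts it as dropped.
func (f *filter) empty() {
	for {
		select {
		case <-f.queue:
			f.dropped++
		default:
			return
		}
	}
}

func main() {
	f := newFilter("mac:112233445566", 2)
	f.Filter("mac:112233445566", &Message{Payload: "a"})
	f.Filter("mac:112233445566", &Message{Payload: "b"})
	queued := f.Filter("mac:112233445566", &Message{Payload: "c"}) // queue is already full
	fmt.Println(queued, len(f.queue), f.dropped, f.cutOffs)
}
```

The non-blocking select keeps a slow consumer from stalling the caller, at the cost of discarding the backlog once the queue fills.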

func (*Filter) Start added in v0.2.0

func (f *Filter) Start(context.Context) error

Start begins pulling events from the queue and calls sendEvents() so that each event is delivered by the Filter's dispatcher.

func (*Filter) Stop added in v0.2.0

func (f *Filter) Stop(context.Context) error

Stop closes the queue and resets its metric.

func (*Filter) Update added in v0.2.0

func (f *Filter) Update(norn model.Norn)

Update updates the time at which a norn expires.

type FilterSender added in v0.2.0

type FilterSender struct {
	QueueSize  int
	NumWorkers int
}

FilterSender contains config to construct a Filter.

type HTTPDispatcher added in v0.2.0

type HTTPDispatcher struct {
	// contains filtered or unexported fields
}

HTTPDispatcher implements the dispatcher interface to send events over HTTP.

func NewHTTPDispatcher added in v0.2.0

func NewHTTPDispatcher(ds *DispatcherSender, httpConfig model.HttpConfig, sender *http.Client, logger log.Logger, measures Measures) (*HTTPDispatcher, error)

NewHTTPDispatcher creates an HTTP dispatcher that implements the dispatcher interface.

func (*HTTPDispatcher) Send added in v0.2.0

func (h *HTTPDispatcher) Send(msg *wrp.Message)

Send uses the configured HTTP client to send a WRP message as JSON. The request also includes a signature created by hashing the WRP message with the norn secret.
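The documentation does not name the hash function or the header that carries the signature. The sketch below assumes HMAC-SHA256 over the JSON body and a hypothetical X-Signature header, and uses a stand-in Message type instead of wrp.Message. It builds the request without sending it.

```go
package main

import (
	"bytes"
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"net/http"
)

// Message is a stand-in (assumption) for wrp.Message.
type Message struct {
	Source      string `json:"source"`
	Destination string `json:"dest"`
}

// sign returns the hex-encoded HMAC-SHA256 of body keyed with secret.
// The choice of HMAC-SHA256 is an assumption, not taken from the package.
func sign(secret string, body []byte) string {
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write(body)
	return hex.EncodeToString(mac.Sum(nil))
}

// verify recomputes the signature on the receiving side and compares it in
// constant time.
func verify(secret string, body []byte, signature string) bool {
	got, err := hex.DecodeString(signature)
	if err != nil {
		return false
	}
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write(body)
	return hmac.Equal(mac.Sum(nil), got)
}

// buildRequest encodes msg as JSON and attaches the signature header.
func buildRequest(url, secret string, msg *Message) (*http.Request, error) {
	body, err := json.Marshal(msg)
	if err != nil {
		return nil, err
	}
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Signature", sign(secret, body)) // hypothetical header name
	return req, nil
}

func main() {
	msg := &Message{Source: "mac:112233445566", Destination: "event:device-status"}
	req, err := buildRequest("http://localhost:8080/events", "norn-secret", msg)
	if err != nil {
		fmt.Println("build failed:", err)
		return
	}
	fmt.Println("signature:", req.Header.Get("X-Signature"))
}
```

A receiver holding the same secret can call verify on the raw request body to confirm that the event came from the dispatcher and was not altered in transit.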

func (*HTTPDispatcher) Start added in v0.2.0

func (h *HTTPDispatcher) Start(context.Context) error

func (*HTTPDispatcher) Update added in v0.2.0

func (h *HTTPDispatcher) Update(norn model.Norn)

Update updates the HTTP dispatcher's secret for a norn.

type Measures

type Measures struct {
	fx.In
	EventQueueDepthGauge     metrics.Gauge   `name:"event_queue_depth"`
	DroppedQueueCount        metrics.Counter `name:"dropped_queue_count"`
	WorkersCount             metrics.Gauge   `name:"workers_count"`
	DropUntilGauge           metrics.Gauge   `name:"consumer_drop_until"`
	CutOffCounter            metrics.Counter `name:"slow_consumer_cut_off_count"`
	DroppedCutoffCounter     metrics.Counter `name:"slow_consumer_dropped_message_count"`
	ContentTypeCounter       metrics.Counter `name:"incoming_content_type_count"`
	DroppedNetworkErrCounter metrics.Counter `name:"dropped_network_error_count"`
	DeliveryCounter          metrics.Counter `name:"delivery_count"`
	DroppedInvalidConfig     metrics.Counter `name:"dropped_invalid_config"`
	DroppedExpiredCounter    metrics.Counter `name:"dropped_expired_count"`
	DroppedPanicCounter      metrics.Counter `name:"dropped_panic_count"`
}

Measures describes the defined metrics that will be used by the dispatcher.

func NewMeasures

func NewMeasures(p provider.Provider) *Measures

NewMeasures returns the desired metrics.

type SQSDispatcher added in v0.2.0

type SQSDispatcher struct {
	// contains filtered or unexported fields
}

SQSDispatcher implements the dispatcher interface to send events to SQS.

func NewSqsDispatcher added in v0.2.0

func NewSqsDispatcher(ds *DispatcherSender, awsConfig model.AWSConfig, logger log.Logger, measures Measures) (*SQSDispatcher, error)

NewSqsDispatcher validates the AWS config and creates an SQS dispatcher.

func (*SQSDispatcher) Send added in v0.2.0

func (s *SQSDispatcher) Send(msg *wrp.Message)

Send uses the configured SQS client to send a WRP message as JSON.

func (*SQSDispatcher) Start added in v0.2.0

func (s *SQSDispatcher) Start(context.Context) error

Start creates a new AWS session with the provided AWS config for event delivery to SQS.

func (*SQSDispatcher) Update added in v0.2.0

func (s *SQSDispatcher) Update(norn model.Norn)

Update creates a new AWS session with updated credentials for a norn.

type SenderConfig

type SenderConfig struct {
	MaxWorkers            int
	ResponseHeaderTimeout time.Duration
	IdleConnTimeout       time.Duration
	DeliveryInterval      time.Duration
	DeliveryRetries       int
	FilterQueueSize       int
}

SenderConfig contains config to construct HTTPDispatcher, Transport, and Filter.
