Documentation ¶
Constants ¶
const (
	EventQueueDepth               = "event_queue_depth"
	DroppedQueueCount             = "dropped_queue_count"
	WorkersCount                  = "workers_count"
	ConsumerDropUntilGauge        = "consumer_drop_until"
	SlowConsumerCounter           = "slow_consumer_cut_off_count"
	SlowConsumerDroppedMsgCounter = "slow_consumer_dropped_message_count"
	IncomingContentTypeCounter    = "incoming_content_type_count"
	ConsumerDeliverUntilGauge     = "consumer_deliver_until"
	DeliveryCounter               = "delivery_count"
	DroppedPanic                  = "dropped_panic_count"
	DroppedNetworkErrCounter      = "dropped_network_error_count"
	DroppedInvalidConfig          = "dropped_invalid_config"
	DroppedExpiredCounter         = "dropped_expired_count"
)
Dispatcher metrics
Variables ¶
This section is empty.
Functions ¶
func ProvideMetrics ¶
ProvideMetrics returns the Measures relevant to the dispatch package.
Types ¶
type D ¶
type D interface {
	// Start sets up any initial sessions or connections needed in order to send events.
	Start(context.Context) error

	// Send delivers the message. Sending is done concurrently, so no error is returned.
	Send(*wrp.Message)

	// Update is called to make sure the dispatcher keeps up to date with the norn config.
	Update(norn model.Norn)
}
D is the dispatcher interface that abstracts away the exact delivery mechanism (e.g., HTTP or SQS).
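As a rough illustration of how a type might satisfy this interface, the sketch below defines an in-memory dispatcher. It is not taken from the package: `Message` and `Norn` are hypothetical stand-ins for `wrp.Message` and `model.Norn`, and `recorder` is an invented type, all chosen so the example compiles on its own.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Message and Norn are hypothetical stand-ins for wrp.Message and model.Norn,
// used only so that this sketch is self-contained.
type Message struct{ Destination string }
type Norn struct{ DeviceID string }

// D mirrors the documented interface, using the stand-in types.
type D interface {
	Start(context.Context) error
	Send(*Message)
	Update(Norn)
}

// recorder is a hypothetical dispatcher that keeps messages in memory
// instead of delivering them anywhere.
type recorder struct {
	mu   sync.Mutex
	sent []*Message
	norn Norn
}

// Start has nothing to set up for an in-memory dispatcher.
func (r *recorder) Start(context.Context) error { return nil }

// Send records the message; like the documented Send, it returns no error.
func (r *recorder) Send(m *Message) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.sent = append(r.sent, m)
}

// Update stores the latest norn config.
func (r *recorder) Update(n Norn) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.norn = n
}

// Sent reports how many messages have been recorded so far.
func (r *recorder) Sent() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.sent)
}

func main() {
	rec := &recorder{}
	var d D = rec // compile-time check that recorder satisfies D

	if err := d.Start(context.Background()); err != nil {
		fmt.Println("start failed:", err)
		return
	}
	d.Send(&Message{Destination: "event:device-status"})
	fmt.Println("messages recorded:", rec.Sent())
}
```

A stand-in like this can be handy in tests, since callers depend only on the interface and not on HTTP or SQS being reachable.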
type DispatcherSender ¶ added in v0.2.0
DispatcherSender contains config to construct an HTTPDispatcher.
type Filter ¶ added in v0.2.0
type Filter struct {
// contains filtered or unexported fields
}
Filter is used to implement the Filterer interface.
func NewFilter ¶ added in v0.2.0
func NewFilter(fs *FilterSender, dispatcher D, norn model.Norn, sender *http.Client, measures Measures) (*Filter, error)
NewFilter validates the config and creates a new Filter.
func (*Filter) Filter ¶ added in v0.2.0
Filter decides whether an event should be sent by checking whether the event's device ID matches the device ID of the norn. If they match, the event is queued for dispatch; if not, the event is dropped. If the queue is full when the event is queued, the event is dropped, the queue is emptied, and the norn consumer is cut off.
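The queueing rules described above can be sketched with a buffered channel. This is a simplified illustration and not the package's implementation: the `event` and `filter` types, the `offer` and `drain` helpers, and the counters are all hypothetical, and the sketch assumes a single goroutine calls `offer`.

```go
package main

import "fmt"

// event is a hypothetical stand-in for a WRP message.
type event struct{ DeviceID string }

// filter is a hypothetical, simplified model of the documented behavior.
type filter struct {
	deviceID string
	queue    chan event
	cutOffs  int // number of times the consumer was cut off
	dropped  int // number of events discarded for any reason
}

func newFilter(deviceID string, depth int) *filter {
	return &filter{deviceID: deviceID, queue: make(chan event, depth)}
}

// offer reports whether the event was queued for dispatch.
func (f *filter) offer(e event) bool {
	if e.DeviceID != f.deviceID {
		f.dropped++ // device ID does not match the norn
		return false
	}
	select {
	case f.queue <- e:
		return true
	default:
		// Queue is full: drop this event, empty the queue,
		// and record that the consumer was cut off.
		f.dropped++
		f.drain()
		f.cutOffs++
		return false
	}
}

// drain discards everything currently waiting in the queue.
func (f *filter) drain() {
	for {
		select {
		case <-f.queue:
			f.dropped++
		default:
			return
		}
	}
}

func main() {
	f := newFilter("mac:112233445566", 2)
	for i := 0; i < 3; i++ {
		queued := f.offer(event{DeviceID: "mac:112233445566"})
		fmt.Println("queued:", queued)
	}
	fmt.Println("cut-offs:", f.cutOffs, "dropped:", f.dropped, "waiting:", len(f.queue))
}
```

The non-blocking `select` with a `default` case is what keeps a slow consumer from stalling the producer: a full queue is detected immediately instead of blocking the caller.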
func (*Filter) Start ¶ added in v0.2.0
Start begins pulling events from the queue and passes each one to sendEvents() so that the dispatcher can deliver it.
type FilterSender ¶ added in v0.2.0
FilterSender contains config to construct a Filter.
type HTTPDispatcher ¶ added in v0.2.0
type HTTPDispatcher struct {
// contains filtered or unexported fields
}
HTTPDispatcher implements the dispatcher interface to send events over HTTP.
func NewHTTPDispatcher ¶ added in v0.2.0
func NewHTTPDispatcher(ds *DispatcherSender, httpConfig model.HttpConfig, sender *http.Client, logger log.Logger, measures Measures) (*HTTPDispatcher, error)
NewHTTPDispatcher creates an HTTP dispatcher that implements the dispatcher interface.
func (*HTTPDispatcher) Send ¶ added in v0.2.0
func (h *HTTPDispatcher) Send(msg *wrp.Message)
Send uses the configured HTTP client to send a WRP message as JSON. The request also includes a signature created by hashing the WRP message with the norn secret.
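The documentation does not say which hash is used or how the signature is attached to the request. As one possible reading, the sketch below signs a payload with HMAC-SHA1 keyed by the secret. The algorithm, the `sha1=` prefix, and the `sign` and `verify` helpers are all assumptions made for illustration.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha1"
	"encoding/hex"
	"fmt"
)

// sign returns a hex-encoded HMAC-SHA1 of body keyed by secret.
// Both the algorithm and the "sha1=" prefix are assumptions.
func sign(secret string, body []byte) string {
	mac := hmac.New(sha1.New, []byte(secret))
	mac.Write(body)
	return "sha1=" + hex.EncodeToString(mac.Sum(nil))
}

// verify recomputes the signature and compares it in constant time,
// which is how a receiver could check the message.
func verify(secret string, body []byte, sig string) bool {
	return hmac.Equal([]byte(sign(secret, body)), []byte(sig))
}

func main() {
	body := []byte(`{"msg_type":4,"dest":"event:device-status"}`)
	sig := sign("norn-secret", body)

	fmt.Println("signature:", sig)
	fmt.Println("valid with same secret:", verify("norn-secret", body, sig))
	fmt.Println("valid with other secret:", verify("wrong-secret", body, sig))
}
```

Using `hmac.Equal` rather than `==` for the comparison avoids leaking timing information about how many leading bytes of the signature matched.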
func (*HTTPDispatcher) Start ¶ added in v0.2.0
func (h *HTTPDispatcher) Start(context.Context) error
func (*HTTPDispatcher) Update ¶ added in v0.2.0
func (h *HTTPDispatcher) Update(norn model.Norn)
Update refreshes the HTTP dispatcher's secret from the given norn.
type Measures ¶
type Measures struct {
	fx.In
	EventQueueDepthGauge     metrics.Gauge   `name:"event_queue_depth"`
	DroppedQueueCount        metrics.Counter `name:"dropped_queue_count"`
	WorkersCount             metrics.Gauge   `name:"workers_count"`
	DropUntilGauge           metrics.Gauge   `name:"consumer_drop_until"`
	CutOffCounter            metrics.Counter `name:"slow_consumer_cut_off_count"`
	DroppedCutoffCounter     metrics.Counter `name:"slow_consumer_dropped_message_count"`
	ContentTypeCounter       metrics.Counter `name:"incoming_content_type_count"`
	DroppedNetworkErrCounter metrics.Counter `name:"dropped_network_error_count"`
	DeliveryCounter          metrics.Counter `name:"delivery_count"`
	DroppedInvalidConfig     metrics.Counter `name:"dropped_invalid_config"`
	DroppedExpiredCounter    metrics.Counter `name:"dropped_expired_count"`
	DroppedPanicCounter      metrics.Counter `name:"dropped_panic_count"`
}
Measures describes the defined metrics that will be used by the dispatcher.
func NewMeasures ¶
NewMeasures returns the desired metrics.
type SQSDispatcher ¶ added in v0.2.0
type SQSDispatcher struct {
// contains filtered or unexported fields
}
SQSDispatcher implements the dispatcher interface to send events to SQS.
func NewSqsDispatcher ¶ added in v0.2.0
func NewSqsDispatcher(ds *DispatcherSender, awsConfig model.AWSConfig, logger log.Logger, measures Measures) (*SQSDispatcher, error)
NewSqsDispatcher validates the AWS config and creates an SQS dispatcher.
func (*SQSDispatcher) Send ¶ added in v0.2.0
func (s *SQSDispatcher) Send(msg *wrp.Message)
Send uses the configured SQS client to send a WRP message as JSON.
func (*SQSDispatcher) Start ¶ added in v0.2.0
func (s *SQSDispatcher) Start(context.Context) error
Start creates a new AWS session with the provided AWS config for event delivery to SQS.
func (*SQSDispatcher) Update ¶ added in v0.2.0
func (s *SQSDispatcher) Update(norn model.Norn)
Update creates a new AWS session with updated credentials for a norn.