sinks

package
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 7, 2024 License: MIT Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
// EventBridgeSizeLimit is 256 * 1024 (262144).
//
// NOTE(review): presumably the maximum payload size, in bytes, accepted by
// EventBridge for a PutEvents call - confirm against where it is used.
const EventBridgeSizeLimit = 256 * 1024

Variables

This section is empty.

Functions

This section is empty.

Types

type DocumentDetail

// DocumentDetail is a JSON-serialisable description of a document. It is the
// type returned by DetailFromDocument and carried in EventDetail.Document.
//
// The string slice fields are tagged omitempty and are left out of the JSON
// when empty. Links and Meta are not, so they are always emitted (a nil map
// encodes as null).
type DocumentDetail struct {
	UUID         string                    `json:"uuid"`
	URI          string                    `json:"uri"`
	Type         string                    `json:"type"`
	Title        string                    `json:"title"`
	LinkTypes    []string                  `json:"link_types,omitempty"`
	// Links is serialised under the key "rels", not "links".
	// NOTE(review): presumably keyed by link rel - confirm.
	Links        map[string][]DocumentLink `json:"rels"`
	MetaTypes    []string                  `json:"meta_types,omitempty"`
	// NOTE(review): presumably keyed by meta block type - confirm.
	Meta         map[string][]DocumentMeta `json:"meta"`
	ContentTypes []string                  `json:"content_types,omitempty"`
	ContentUUIDs []string                  `json:"content_uuids,omitempty"`
	ContentURIs  []string                  `json:"content_uris,omitempty"`
}

func DetailFromDocument

func DetailFromDocument(d newsdoc.Document) *DocumentDetail

type DocumentLink
// DocumentLink is the element type of the DocumentDetail.Links map values.
// Every field is tagged omitempty, so empty strings are left out of the JSON
// encoding.
type DocumentLink struct {
	UUID  string `json:"uuid,omitempty"`
	URI   string `json:"uri,omitempty"`
	Type  string `json:"type,omitempty"`
	Role  string `json:"role,omitempty"`
	Value string `json:"value,omitempty"`
}

type DocumentMeta

// DocumentMeta is the element type of the DocumentDetail.Meta map values.
// Every field is tagged omitempty.
type DocumentMeta struct {
	Role  string          `json:"role,omitempty"`
	Value string          `json:"value,omitempty"`
	Data  newsdoc.DataMap `json:"data,omitempty"`
}

type EventBridge

// EventBridge is an EventSink implementation (see its SendEvents and SinkName
// methods). Create one with NewEventBridge, which takes an
// EventBridgeEventPutter client and EventBridgeOptions.
type EventBridge struct {
	// contains filtered or unexported fields
}

func NewEventBridge

func NewEventBridge(
	client EventBridgeEventPutter, opts EventBridgeOptions,
) *EventBridge

func (*EventBridge) SendEvents

func (eb *EventBridge) SendEvents(
	ctx context.Context, evts []EventDetail,
	skipMetric IncrementSkipMetricFunc,
) (int, error)

SendEvents implements EventSink.

func (*EventBridge) SinkName

func (*EventBridge) SinkName() string

SinkName implements EventSink.

type EventBridgeEventPutter

// EventBridgeEventPutter is the client dependency accepted by NewEventBridge:
// anything that exposes a PutEvents method with this signature.
//
// NOTE(review): presumably satisfied by the AWS SDK EventBridge client, with
// the interface serving as a seam for tests - confirm.
type EventBridgeEventPutter interface {
	PutEvents(
		ctx context.Context, params *eventbridge.PutEventsInput,
		optFns ...func(*eventbridge.Options),
	) (*eventbridge.PutEventsOutput, error)
}

type EventBridgeOptions

// EventBridgeOptions is the configuration passed to NewEventBridge.
type EventBridgeOptions struct {
	Logger       *slog.Logger
	// NOTE(review): presumably the name of the event bus that events are put
	// on - confirm against SendEvents.
	EventBusName string
}

type EventDetail

// EventDetail pairs a repository event with a description of a document. It
// is the unit that EventSink.SendEvents accepts.
type EventDetail struct {
	Event    repository.Event `json:"event"`
	// Document has no omitempty tag, so a nil pointer is encoded as a JSON
	// null under the "document" key.
	Document *DocumentDetail  `json:"document"`
}

type EventForwarder

// EventForwarder is created with NewEventForwarder from EventForwarderOptions
// and exposes Run and Stop.
//
// NOTE(review): presumably reads repository events and forwards them to the
// configured EventSink - confirm against the implementation.
type EventForwarder struct {
	// contains filtered or unexported fields
}

func NewEventForwarder

func NewEventForwarder(opts EventForwarderOptions) (*EventForwarder, error)

func (*EventForwarder) Run

func (r *EventForwarder) Run(ctx context.Context)

func (*EventForwarder) Stop

func (r *EventForwarder) Stop()

type EventForwarderOptions

// EventForwarderOptions holds the dependencies passed to NewEventForwarder.
//
// NOTE(review): which of these fields are required is not visible here;
// NewEventForwarder returns an error, so check it for validation rules.
type EventForwarderOptions struct {
	Logger            *slog.Logger
	DB                *pgxpool.Pool
	Documents         repository.Documents
	MetricsRegisterer prometheus.Registerer
	// Sink is the EventSink used by the forwarder.
	Sink              EventSink
	// StateStore reads and writes per-sink positions (see SinkStateStore).
	StateStore        SinkStateStore
}

type EventSink

// EventSink is a destination that batches of EventDetail values can be sent
// to. EventBridge is the implementation visible in this package.
type EventSink interface {
	// SinkName returns the name of the sink.
	// NOTE(review): presumably the name used with SinkStateStore - confirm.
	SinkName() string

	// SendEvents to the sink. Returns the number of events that were sent and
	// an error if the send failed. Due to batching behaviours some events
	// might have been sent before the processing fails, so the number of
	// sent events should not be ignored when an error is returned.
	SendEvents(
		ctx context.Context, evts []EventDetail,
		skipMetric IncrementSkipMetricFunc,
	) (int, error)
}

type IncrementSkipMetricFunc

// IncrementSkipMetricFunc is the callback type passed to EventSink.SendEvents
// as skipMetric. It receives an event type and a reason.
//
// NOTE(review): presumably called once per event that a sink skips instead of
// sending, to increment a metric - confirm against the sink implementations.
type IncrementSkipMetricFunc func(eventType string, reason string)

type SinkStateStore

// SinkStateStore reads and writes an int64 position stored per sink name.
//
// NOTE(review): presumably the position is the ID of the last event that was
// forwarded to the named sink - confirm against EventForwarder.
type SinkStateStore interface {
	// GetSinkPosition returns the stored position for the named sink.
	GetSinkPosition(ctx context.Context, name string) (int64, error)
	// SetSinkPosition stores pos as the position for the named sink.
	SetSinkPosition(ctx context.Context, name string, pos int64) error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL