observation

package
v1.13.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 30, 2023 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Marshal

func Marshal(insertedEvent InsertedEvent) ([]byte, error)

Marshal converts the given ObservationsInsertedEvent to a []byte.

Types

type DimensionHeaderCache

type DimensionHeaderCache interface {
	GetOrder(ctx context.Context, instanceID string) ([]string, error)
}

DimensionHeaderCache provides the an array of dimension names to define the order of dimensions (v4 format)

type DimensionIDCache

type DimensionIDCache interface {
	GetNodeIDs(ctx context.Context, instanceID string) (map[string]string, error)
}

DimensionIDCache provides database ID's of dimensions when inserting observations.

type InsertedEvent

type InsertedEvent struct {
	ObservationsInserted int32  `avro:"observations_inserted"`
	InstanceID           string `avro:"instance_id"`
}

InsertedEvent is the data that is output for each observation batch inserted.

type Mapper

type Mapper struct {
	// contains filtered or unexported fields
}

Mapper interprets a CSV line and returns an observation instance.

func NewMapper

func NewMapper(dimensionOrderCache DimensionHeaderCache) *Mapper

NewMapper returns a new Mapper instance

func (*Mapper) Map

func (mapper *Mapper) Map(ctx context.Context, row string, rowIndex int64, instanceID string) (*models.Observation, error)

Map the given CSV row to an observation instance.

type MessageProducer

type MessageProducer interface {
	Channels() *kafka.ProducerChannels
}

MessageProducer dependency that writes messages

type MessageWriter

type MessageWriter struct {
	MessageProducer MessageProducer
}

MessageWriter writes observations as messages

func NewResultWriter

func NewResultWriter(messageProducer MessageProducer) *MessageWriter

NewResultWriter returns a new observation message writer.

func (MessageWriter) Write

func (messageWriter MessageWriter) Write(ctx context.Context, results []*Result)

Write results as messages.

type Result

type Result struct {
	InstanceID           string
	ObservationsInserted int32
}

Result holds the result for an individual instance

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store provides persistence for observations.

func NewStore

func NewStore(dimensionIDCache DimensionIDCache, db graph.Observation, errorReporter reporter.ErrorReporter, getGraphDimensionID bool) *Store

NewStore returns a new Observation store instance that uses the given dimension ID cache and db connection.

func (*Store) SaveAll

func (store *Store) SaveAll(ctx context.Context, observations []*models.Observation) ([]*Result, error)

SaveAll the observations against the provided dimension options and instanceID.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL