backend

package v0.0.0-...-b9360c4
Published: Dec 15, 2023 License: BSD-3-Clause Imports: 39 Imported by: 0

Documentation

Index

Constants

const (
	InitialBatchSize = 50000
	MaximumBatchSize = 200000
)
const (
	MaximumDataRecordLength = 1024 * 10
)
const (
	MaximumRetriesBeforeTrash = 3
)
const (
	SecondsBetweenProgressUpdates = 1
)

Variables

var (
	ErrEmptyRefresh = errors.New("empty refresh")
)

Functions

func CreateMap

func CreateMap(ctx context.Context, services *BackgroundServices) gue.WorkMap

func Logger

func Logger(ctx context.Context) *zap.Logger

func NewMuxRecordHandler

func NewMuxRecordHandler(handlers []RecordHandler) *muxRecordHandler

func NewNoopRecordHandler

func NewNoopRecordHandler() *noopRecordHandler

func NotifyMentions

func NotifyMentions(mentions []*Mention) []*data.Notification

func ParseStationIDs

func ParseStationIDs(raw *string) []int32

func Register

func Register(ctx context.Context, services *BackgroundServices, work map[string]gue.WorkFunc, message interface{}, handler OurTransportMessageFunc)

func SignS3URL

func SignS3URL(svc *s3.S3, url string) (signed string, err error)
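
A minimal calling sketch, assuming the usual aws-sdk-go session setup; the region and object URL are illustrative, not taken from this package:

	// Assumes aws-sdk-go v1; values here are placeholders.
	sess := session.Must(session.NewSession(&aws.Config{Region: aws.String("us-east-1")}))
	svc := s3.New(sess)

	// Pre-sign a GET for a stored object so it can be fetched without credentials.
	signed, err := SignS3URL(svc, "https://example-bucket.s3.amazonaws.com/exports/data.csv")
	if err != nil {
		return err
	}
	fmt.Println(signed)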

func WalkerProgressNoop

func WalkerProgressNoop(ctx context.Context, progress float64) error

Types

type AggregateQueryParams

type AggregateQueryParams struct {
	Start              time.Time
	End                time.Time
	Stations           []int32
	Sensors            []ModuleAndSensor
	Complete           bool
	Interval           int32
	TimeGroupThreshold int32
	AggregateName      string
	ExpectedRecords    int64
	Summary            *AggregateSummary
}

func NewAggregateQueryParams

func NewAggregateQueryParams(qp *QueryParams, selectedAggregateName string, summary *AggregateSummary) (*AggregateQueryParams, error)

type AggregateSummary

type AggregateSummary struct {
	NumberRecords int64                 `db:"number_records" json:"numberRecords"`
	Start         *data.NumericWireTime `db:"start" json:"start"`
	End           *data.NumericWireTime `db:"end" json:"end"`
}

type AsyncFileWriter

type AsyncFileWriter struct {
	// contains filtered or unexported fields
}

func NewAsyncFileWriter

func NewAsyncFileWriter(readFunc AsyncReaderFunc, writeFunc AsyncWriterFunc) *AsyncFileWriter

func (*AsyncFileWriter) Start

func (w *AsyncFileWriter) Start(ctx context.Context) error

func (*AsyncFileWriter) Wait

func (w *AsyncFileWriter) Wait(ctx context.Context) error

type AsyncReaderFunc

type AsyncReaderFunc func(ctx context.Context, reader io.Reader) error

type AsyncWriterFunc

type AsyncWriterFunc func(ctx context.Context, writer io.Writer) error
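
A minimal sketch of pairing the two callbacks, assuming Start connects writeFunc's output to readFunc's input through an internal pipe, which the mirrored reader/writer signatures suggest:

	writeFunc := func(ctx context.Context, w io.Writer) error {
		// Produce the file's contents.
		_, err := io.WriteString(w, "hello")
		return err
	}
	readFunc := func(ctx context.Context, r io.Reader) error {
		// Consume the produced contents, e.g. upload or archive them.
		_, err := io.Copy(io.Discard, r)
		return err
	}

	w := NewAsyncFileWriter(readFunc, writeFunc)
	if err := w.Start(ctx); err != nil {
		return err
	}
	if err := w.Wait(ctx); err != nil {
		return err
	}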

type BackgroundServices

type BackgroundServices struct {
	// contains filtered or unexported fields
}

func NewBackgroundServices

func NewBackgroundServices(database *sqlxcache.DB, dbpool *pgxpool.Pool, metrics *logging.Metrics, fileArchives *FileArchives, que *gue.Client, timeScaleConfig *storage.TimeScaleDBConfig, locations *data.DescribeLocations) *BackgroundServices

type CanExport

type CanExport interface {
	Prepare(ctx context.Context, urls []string) error
	Export(ctx context.Context, urls []string) error
}
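
Both exporters below satisfy this interface, so callers can drive an export generically. A small helper sketch:

	func runExport(ctx context.Context, exporter CanExport, urls []string) error {
		// Prepare is assumed to perform any per-URL setup before the export pass.
		if err := exporter.Prepare(ctx, urls); err != nil {
			return err
		}
		return exporter.Export(ctx, urls)
	}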

type CsvExporter

type CsvExporter struct {
	// contains filtered or unexported fields
}

func NewCsvExporter

func NewCsvExporter(files files.FileArchive, metrics *logging.Metrics, writer io.Writer, progress OnWalkProgress) (self *CsvExporter)

func (*CsvExporter) Export

func (e *CsvExporter) Export(ctx context.Context, urls []string) error

func (*CsvExporter) OnData

func (e *CsvExporter) OnData(ctx context.Context, rawRecord *pb.DataRecord, rawMetaUnused *pb.DataRecord, bytes []byte) error

func (*CsvExporter) OnDone

func (e *CsvExporter) OnDone(ctx context.Context) (err error)

func (*CsvExporter) OnMeta

func (e *CsvExporter) OnMeta(ctx context.Context, recordNumber int64, rawRecord *pb.DataRecord, bytes []byte) error

func (*CsvExporter) OnSignedMeta

func (e *CsvExporter) OnSignedMeta(ctx context.Context, signedRecord *pb.SignedRecord, rawRecord *pb.DataRecord, bytes []byte) error

func (*CsvExporter) Prepare

func (e *CsvExporter) Prepare(ctx context.Context, urls []string) error

type DataQuerier

type DataQuerier struct {
	// contains filtered or unexported fields
}

func NewDataQuerier

func NewDataQuerier(db *sqlxcache.DB) *DataQuerier

func (*DataQuerier) GetIDs

func (*DataQuerier) GetStationIDs

func (dq *DataQuerier) GetStationIDs(ctx context.Context, stationIDs []int32) (*SensorDatabaseIDs, error)

func (*DataQuerier) QueryAggregate

func (dq *DataQuerier) QueryAggregate(ctx context.Context, aqp *AggregateQueryParams) (rows *sqlx.Rows, err error)

func (*DataQuerier) QueryMeta

func (dq *DataQuerier) QueryMeta(ctx context.Context, qp *QueryParams) (qm *QueryMeta, err error)

func (*DataQuerier) QueryOuterValues

func (dq *DataQuerier) QueryOuterValues(ctx context.Context, aqp *AggregateQueryParams) (rr []*DataRow, err error)

func (*DataQuerier) SelectAggregate

func (dq *DataQuerier) SelectAggregate(ctx context.Context, qp *QueryParams) (summaries map[string]*AggregateSummary, name string, err error)

type DataRow

type DataRow struct {
	Time      data.NumericWireTime `db:"time" json:"time"`
	StationID *int32               `db:"station_id" json:"stationId,omitempty"`
	SensorID  *int64               `db:"sensor_id" json:"sensorId,omitempty"`
	ModuleID  *string              `db:"module_id" json:"moduleId,omitempty"`
	Location  *data.Location       `db:"location" json:"location,omitempty"`
	Value     *float64             `db:"value" json:"value,omitempty"`

	// Deprecated
	Id        *int64 `db:"id" json:"-"`
	TimeGroup *int32 `db:"time_group" json:"-"`

	// TSDB
	BucketSamples *int32     `json:"-"`
	DataStart     *time.Time `json:"-"`
	DataEnd       *time.Time `json:"-"`
	AverageValue  *float64   `json:"avg,omitempty"`
	MinimumValue  *float64   `json:"min,omitempty"`
	MaximumValue  *float64   `json:"max,omitempty"`
	LastValue     *float64   `json:"last,omitempty"`
}

func (*DataRow) CoerceNaNs

func (row *DataRow) CoerceNaNs()

type DescribeStationLocationHandler

type DescribeStationLocationHandler struct {
	// contains filtered or unexported fields
}

func NewDescribeStationLocationHandler

func NewDescribeStationLocationHandler(db *sqlxcache.DB, metrics *logging.Metrics, publisher jobs.MessagePublisher, locations *data.DescribeLocations) *DescribeStationLocationHandler

func (*DescribeStationLocationHandler) Handle

type DirtyRange

type DirtyRange struct {
	ModifiedTime *time.Time `json:"modified"`
	DataStart    *time.Time `json:"data_start"`
	DataEnd      *time.Time `json:"data_end"`
}

type ExportDataHandler

type ExportDataHandler struct {
	// contains filtered or unexported fields
}

func NewExportDataHandler

func NewExportDataHandler(db *sqlxcache.DB, files files.FileArchive, metrics *logging.Metrics) *ExportDataHandler

func (*ExportDataHandler) Handle

type FileArchives

type FileArchives struct {
	Ingestion files.FileArchive
	Media     files.FileArchive
	Exported  files.FileArchive
}

type FkbHandler

type FkbHandler interface {
	OnSignedMeta(ctx context.Context, signedRecord *pb.SignedRecord, rawRecord *pb.DataRecord, bytes []byte) error
	OnMeta(ctx context.Context, recordNumber int64, rawMeta *pb.DataRecord, bytes []byte) error
	OnData(ctx context.Context, rawData *pb.DataRecord, rawMeta *pb.DataRecord, bytes []byte) error
	OnDone(ctx context.Context) error
}
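
A minimal handler sketch that only counts records; the pb import path is assumed to be the same one used in the signatures above:

	// countingHandler tallies records as FkbWalker visits them.
	type countingHandler struct {
		meta, data int64
	}

	func (h *countingHandler) OnSignedMeta(ctx context.Context, signedRecord *pb.SignedRecord, rawRecord *pb.DataRecord, bytes []byte) error {
		return nil
	}

	func (h *countingHandler) OnMeta(ctx context.Context, recordNumber int64, rawMeta *pb.DataRecord, bytes []byte) error {
		h.meta++
		return nil
	}

	func (h *countingHandler) OnData(ctx context.Context, rawData *pb.DataRecord, rawMeta *pb.DataRecord, bytes []byte) error {
		h.data++
		return nil
	}

	func (h *countingHandler) OnDone(ctx context.Context) error {
		return nil
	}

A handler like this can be passed to NewFkbWalker and driven with WalkUrl.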

type FkbWalker

type FkbWalker struct {
	// contains filtered or unexported fields
}

func NewFkbWalker

func NewFkbWalker(files files.FileArchive, metrics *logging.Metrics, handler FkbHandler, progress OnWalkProgress, verbose bool) (ra *FkbWalker)

func (*FkbWalker) WalkUrl

func (ra *FkbWalker) WalkUrl(ctx context.Context, url string) (info *WalkRecordsInfo, err error)

type HandlerCollectionHandler

type HandlerCollectionHandler struct {
	// contains filtered or unexported fields
}

func NewHandlerCollectionHandler

func NewHandlerCollectionHandler(handlers []RecordHandler) *HandlerCollectionHandler

func (*HandlerCollectionHandler) OnData

func (v *HandlerCollectionHandler) OnData(ctx context.Context, provision *data.Provision, rawData *pb.DataRecord, rawMeta *pb.DataRecord, db *data.DataRecord, meta *data.MetaRecord) error

func (*HandlerCollectionHandler) OnDone

func (*HandlerCollectionHandler) OnMeta

func (v *HandlerCollectionHandler) OnMeta(ctx context.Context, provision *data.Provision, rawMeta *pb.DataRecord, meta *data.MetaRecord) error

type IngestStationHandler

type IngestStationHandler struct {
	// contains filtered or unexported fields
}

func NewIngestStationHandler

func NewIngestStationHandler(db *sqlxcache.DB, dbpool *pgxpool.Pool, files files.FileArchive, metrics *logging.Metrics, publisher jobs.MessagePublisher, tsConfig *storage.TimeScaleDBConfig) *IngestStationHandler

func (*IngestStationHandler) IngestionCompleted

func (*IngestStationHandler) IngestionFailed

func (*IngestStationHandler) Start

type IngestionReceivedHandler

type IngestionReceivedHandler struct {
	// contains filtered or unexported fields
}

func NewIngestionReceivedHandler

func NewIngestionReceivedHandler(db *sqlxcache.DB, dbpool *pgxpool.Pool, files files.FileArchive, metrics *logging.Metrics, publisher jobs.MessagePublisher, tsConfig *storage.TimeScaleDBConfig) *IngestionReceivedHandler

func (*IngestionReceivedHandler) BatchCompleted

func (*IngestionReceivedHandler) Start

type IngestionSaga

type IngestionSaga struct {
	QueuedID  int64           `json:"queued_id"`
	UserID    int32           `json:"user_id"`
	StationID *int32          `json:"station_id"`
	DataStart time.Time       `json:"data_start"`
	DataEnd   time.Time       `json:"data_end"`
	Required  map[string]bool `json:"required"`
	Completed map[string]bool `json:"completed"`
}

func (*IngestionSaga) IsCompleted

func (s *IngestionSaga) IsCompleted() bool

type JsonLinesExporter

type JsonLinesExporter struct {
	// contains filtered or unexported fields
}

func NewJsonLinesExporter

func NewJsonLinesExporter(files files.FileArchive, metrics *logging.Metrics, writer io.Writer, progress OnWalkProgress) (self *JsonLinesExporter)

func (*JsonLinesExporter) Export

func (e *JsonLinesExporter) Export(ctx context.Context, urls []string) error

func (*JsonLinesExporter) OnData

func (e *JsonLinesExporter) OnData(ctx context.Context, rawRecord *pb.DataRecord, rawMetaUnused *pb.DataRecord, bytes []byte) error

func (*JsonLinesExporter) OnDone

func (e *JsonLinesExporter) OnDone(ctx context.Context) (err error)

func (*JsonLinesExporter) OnMeta

func (e *JsonLinesExporter) OnMeta(ctx context.Context, recordNumber int64, rawRecord *pb.DataRecord, bytes []byte) error

func (*JsonLinesExporter) OnSignedMeta

func (e *JsonLinesExporter) OnSignedMeta(ctx context.Context, signedRecord *pb.SignedRecord, rawRecord *pb.DataRecord, bytes []byte) error

func (*JsonLinesExporter) Prepare

func (e *JsonLinesExporter) Prepare(ctx context.Context, urls []string) error

type Mention

type Mention struct {
	UserID   int32 `json:"user_id"`
	PostID   int64 `json:"post_id"`
	AuthorID int32 `json:"author_id"`
}

func DiscoverMentions

func DiscoverMentions(ctx context.Context, postID int64, body string, authorID int32) ([]*Mention, error)

type MessageCollection

type MessageCollection []proto.Message

func ReadLengthPrefixedCollection

func ReadLengthPrefixedCollection(ctx context.Context, maximumMessageLength uint64, r io.Reader, f UnmarshalFunc) (pbs MessageCollection, totalBytesRead int, err error)

ReadLengthPrefixedCollection reads a collection of protocol buffer messages from the supplied reader. Each message is presumed to be prefixed by a varint (at most 32 bits) encoding the length of the message that follows. The UnmarshalFunc callback converts each message's raw bytes into the desired message type. The collection of messages is returned, along with the total number of bytes read and any error that arose. For more on this streaming technique, see the protocol buffer documentation: https://developers.google.com/protocol-buffers/docs/techniques#streaming.
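
A draining sketch, assuming records decode as pb.DataRecord and using the package's MaximumDataRecordLength as the size cap; the reader variable is assumed to exist:

	// Decode each length-prefixed frame into a DataRecord.
	unmarshal := func(b []byte) (proto.Message, error) {
		record := &pb.DataRecord{}
		if err := proto.Unmarshal(b, record); err != nil {
			return nil, err
		}
		return record, nil
	}

	records, bytesRead, err := ReadLengthPrefixedCollection(ctx, MaximumDataRecordLength, reader, unmarshal)
	if err != nil {
		return err
	}
	log.Printf("read %d records (%d bytes)", len(records), bytesRead)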

type ModuleAndSensor

type ModuleAndSensor struct {
	ModuleID string `json:"module_id"`
	SensorID int64  `json:"sensor_id"`
}

type OnWalkProgress

type OnWalkProgress func(ctx context.Context, progress WalkProgress) error

type OurTransportMessageFunc

type OurTransportMessageFunc func(ctx context.Context, j *gue.Job, services *BackgroundServices, tm *jobs.TransportMessage, mc *jobs.MessageContext) error

type ProgressReader

type ProgressReader struct {
	io.ReadCloser
	// contains filtered or unexported fields
}

func (*ProgressReader) Close

func (r *ProgressReader) Close() error

func (*ProgressReader) Read

func (r *ProgressReader) Read(p []byte) (int, error)

type QueriedModuleID

type QueriedModuleID struct {
	StationID  int32  `db:"station_id"`
	ModuleID   int64  `db:"module_id"`
	HardwareID []byte `db:"hardware_id"`
}

type QueryMeta

type QueryMeta struct {
	Sensors  map[int64]*SensorMeta
	Stations map[int32]*StationMeta
}

type QueryParams

type QueryParams struct {
	Start           time.Time         `json:"start"`
	End             time.Time         `json:"end"`
	Stations        []int32           `json:"stations"`
	Sensors         []ModuleAndSensor `json:"sensors"`
	Resolution      int32             `json:"resolution"`
	Aggregate       string            `json:"aggregate"`
	Tail            int32             `json:"tail"`
	Complete        bool              `json:"complete"`
	Backend         string            `json:"backend"`
	Eternity        bool              `json:"eternity"`
	BeginningOfTime bool              `json:"beginning_of_time"`
	EndOfTime       bool              `json:"end_of_time"`
}

func ExportQueryParams

func ExportQueryParams(de *data.DataExport) (*QueryParams, error)

type RawQueryParams

type RawQueryParams struct {
	Start      *int64  `json:"start"`
	End        *int64  `json:"end"`
	Resolution *int32  `json:"resolution"`
	Stations   *string `json:"stations"`
	Sensors    *string `json:"sensors"`
	Modules    *string `json:"modules"`
	Aggregate  *string `json:"aggregate"`
	Tail       *int32  `json:"tail"`
	Complete   *bool   `json:"complete"`
	Backend    *string `json:"backend"`
}

func (*RawQueryParams) BuildQueryParams

func (raw *RawQueryParams) BuildQueryParams() (qp *QueryParams, err error)
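
A sketch of building validated parameters from loosely-typed request values; the literal values are illustrative only:

	stations := "12,34"
	start := int64(1690000000000)

	raw := &RawQueryParams{
		Start:    &start, // epoch timestamp; the unit is an assumption here
		Stations: &stations,
	}

	qp, err := raw.BuildQueryParams()
	if err != nil {
		return err
	}
	_ = qp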

type RecordAdder

type RecordAdder struct {
	// contains filtered or unexported fields
}

func NewRecordAdder

func NewRecordAdder(db *sqlxcache.DB, files files.FileArchive, metrics *logging.Metrics, handler RecordHandler, verbose bool, saveData bool) (ra *RecordAdder)

func (*RecordAdder) OnData

func (ra *RecordAdder) OnData(ctx context.Context, rawRecord *pb.DataRecord, rawMetaUnused *pb.DataRecord, bytes []byte) error

func (*RecordAdder) OnDone

func (ra *RecordAdder) OnDone(ctx context.Context) (err error)

func (*RecordAdder) OnMeta

func (ra *RecordAdder) OnMeta(ctx context.Context, recordNumber int64, rawRecord *pb.DataRecord, bytes []byte) error

func (*RecordAdder) OnSignedMeta

func (ra *RecordAdder) OnSignedMeta(ctx context.Context, signedRecord *pb.SignedRecord, rawRecord *pb.DataRecord, bytes []byte) error

func (*RecordAdder) WriteRecords

func (ra *RecordAdder) WriteRecords(ctx context.Context, ingestion *data.Ingestion) (info *WriteInfo, err error)

type RecordHandler

type RecordHandler interface {
	OnMeta(ctx context.Context, provision *data.Provision, rawMeta *pb.DataRecord, db *data.MetaRecord) error
	OnData(ctx context.Context, provision *data.Provision, rawData *pb.DataRecord, rawMeta *pb.DataRecord, db *data.DataRecord, meta *data.MetaRecord) error
	OnDone(ctx context.Context) error
}
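
The mux and noop constructors above compose with this interface; a sketch fanning records out to several handlers, assuming both returned types satisfy RecordHandler:

	mux := NewMuxRecordHandler([]RecordHandler{
		NewNoopRecordHandler(), // stand-in; real handlers would persist or index records
	})

	// The mux can then be handed to NewRecordAdder or RecordWalker.WalkStation.
	_ = mux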

func NewAllHandlers

func NewAllHandlers(db *sqlxcache.DB, tsConfig *storage.TimeScaleDBConfig, publisher jobs.MessagePublisher, completions *jobs.CompletionIDs) RecordHandler

type RecordWalker

type RecordWalker struct {
	// contains filtered or unexported fields
}

func NewRecordWalker

func NewRecordWalker(db *sqlxcache.DB) (rw *RecordWalker)

func (*RecordWalker) Info

func (rw *RecordWalker) Info(ctx context.Context) (*WalkInfo, error)

func (*RecordWalker) WalkStation

func (rw *RecordWalker) WalkStation(ctx context.Context, handler RecordHandler, progress WalkerProgressFunc, params *WalkParameters) error
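
A walking sketch over one station's last day of records; db, handler, and stationID are assumed to exist:

	walker := NewRecordWalker(db)

	params := &WalkParameters{
		StationIDs: []int32{stationID},
		Start:      time.Now().Add(-24 * time.Hour),
		End:        time.Now(),
	}

	if err := walker.WalkStation(ctx, handler, WalkerProgressNoop, params); err != nil {
		return err
	}

	// Info reports totals accumulated during the walk.
	info, err := walker.Info(ctx)
	if err != nil {
		return err
	}
	log.Printf("meta=%d data=%d", info.MetaRecords, info.DataRecords)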

type RefreshMaterializedViewsHandler

type RefreshMaterializedViewsHandler struct {
	// contains filtered or unexported fields
}

func NewRefreshMaterializedViewsHandler

func NewRefreshMaterializedViewsHandler(metrics *logging.Metrics, tsConfig *storage.TimeScaleDBConfig) *RefreshMaterializedViewsHandler

func (*RefreshMaterializedViewsHandler) RefreshView

func (*RefreshMaterializedViewsHandler) Start

type RefreshStationHandler

type RefreshStationHandler struct {
	// contains filtered or unexported fields
}

func NewRefreshStationHandler

func NewRefreshStationHandler(db *sqlxcache.DB, tsConfig *storage.TimeScaleDBConfig) *RefreshStationHandler

func (*RefreshStationHandler) Handle

type RefreshWindows

type RefreshWindows struct {
	// contains filtered or unexported fields
}

func NewRefreshWindows

func NewRefreshWindows(pool *pgxpool.Pool) *RefreshWindows

func (*RefreshWindows) DeleteAll

func (rw *RefreshWindows) DeleteAll(ctx context.Context) error

func (*RefreshWindows) QueryForDirty

func (rw *RefreshWindows) QueryForDirty(ctx context.Context) ([]*DirtyRange, error)

type SensorDataBatchHandler

type SensorDataBatchHandler struct {
	// contains filtered or unexported fields
}

func NewSensorDataBatchHandler

func NewSensorDataBatchHandler(metrics *logging.Metrics, publisher jobs.MessagePublisher, tsConfig *storage.TimeScaleDBConfig) *SensorDataBatchHandler

func (*SensorDataBatchHandler) Handle

type SensorDataModifiedHandler

type SensorDataModifiedHandler struct {
	// contains filtered or unexported fields
}

func NewSensorDataModifiedHandler

func NewSensorDataModifiedHandler(db *sqlxcache.DB, metrics *logging.Metrics, publisher jobs.MessagePublisher, tsConfig *storage.TimeScaleDBConfig) *SensorDataModifiedHandler

func (*SensorDataModifiedHandler) Handle

type SensorDatabaseIDs

type SensorDatabaseIDs struct {
	ModuleIDs       []int64
	SensorIDs       []int64
	KeyToHardwareID map[int64]string
}

type SensorMeta

type SensorMeta struct {
	data.Sensor
}

type StationIngestionSaga

type StationIngestionSaga struct {
	UserID     int32           `json:"user_id"`
	StationID  int32           `json:"station_id"`
	Ingestions []int64         `json:"ingestions"`
	Required   map[int64]int64 `json:"required"`
	Completed  map[int64]bool  `json:"completed"`
}

func (*StationIngestionSaga) HasMoreIngestions

func (s *StationIngestionSaga) HasMoreIngestions() bool

func (*StationIngestionSaga) IsCompleted

func (s *StationIngestionSaga) IsCompleted() bool

func (*StationIngestionSaga) NextIngestionID

func (s *StationIngestionSaga) NextIngestionID() int64

type StationMeta

type StationMeta struct {
	ID   int32  `db:"id"`
	Name string `db:"name"`
}

type StationRefresher

type StationRefresher struct {
	// contains filtered or unexported fields
}

func NewStationRefresher

func NewStationRefresher(db *sqlxcache.DB, tsConfig *storage.TimeScaleDBConfig, tableSuffix string) (sr *StationRefresher, err error)

func (*StationRefresher) Refresh

func (sr *StationRefresher) Refresh(ctx context.Context, stationID int32, howRecently time.Duration, completely, skipManual bool) error

type UnmarshalFunc

type UnmarshalFunc func([]byte) (proto.Message, error)

type WalkInfo

type WalkInfo struct {
	MetaRecords int64
	DataRecords int64
}

type WalkParameters

type WalkParameters struct {
	StationIDs []int32
	Start      time.Time
	End        time.Time
}

type WalkProgress

type WalkProgress struct {
	// contains filtered or unexported fields
}

type WalkRecordsInfo

type WalkRecordsInfo struct {
	TotalRecords int64
	DataRecords  int64
	MetaRecords  int64
	MetaErrors   int64
	DataErrors   int64
}

type WalkStatistics

type WalkStatistics struct {
	Start   *time.Time `db:"start"`
	End     *time.Time `db:"end"`
	Records int64      `db:"records"`
}

type WalkerProgressFunc

type WalkerProgressFunc func(ctx context.Context, progress float64) error

type WorkFailed

type WorkFailed struct {
	Work *jobs.TransportMessage `json:"work"`
}

type WriteInfo

type WriteInfo struct {
	IngestionID   int64
	StationID     *int32
	Walk          *WalkRecordsInfo
	FutureIgnores int64
	DataStart     time.Time
	DataEnd       time.Time
}

