layers

package
v0.0.0-...-f5e1f6b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 9, 2024 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DatasetRequest

// DatasetRequest groups the parameters of a dataset request: a dataset name,
// a since value and a limit.
//
// NOTE(review): the exported functions and methods shown for this package
// (NewPostgresDataset, Layer.Dataset, PostLayer.Dataset) take
// db.DatasetRequest rather than this type — confirm whether this local
// definition is still in use.
type DatasetRequest struct {
	// DatasetName is the name of the requested dataset.
	DatasetName string
	// Since is presumably a continuation token for incremental reads — TODO confirm.
	Since       string
	// Limit is presumably the maximum number of entities to return — TODO confirm.
	Limit       int64
}

type Layer

// Layer is the type returned by NewLayer. Its Dataset method returns a
// ReadableDataset, and its other methods expose dataset names, context and
// table definitions.
type Layer struct {
	// NOTE(review): "deferred from main" presumably means main defers the
	// cleanup of this repository (e.g. closing its DB pool) — confirm.
	Repo *Repository // exported because it needs to be deferred from main
	// contains filtered or unexported fields
}

func NewLayer

func NewLayer(lc fx.Lifecycle, cmgr *conf.ConfigurationManager, env *conf.Env) *Layer

func (*Layer) Dataset

func (l *Layer) Dataset(request db.DatasetRequest) (ReadableDataset, error)

func (*Layer) DoesDatasetExist

func (l *Layer) DoesDatasetExist(datasetName string) bool

func (*Layer) GetContext

func (l *Layer) GetContext(datasetName string) map[string]any

func (*Layer) GetDatasetNames

func (l *Layer) GetDatasetNames() []string

func (*Layer) GetDatasetPostNames

func (l *Layer) GetDatasetPostNames() []string

func (*Layer) GetTableDefinition

func (l *Layer) GetTableDefinition(datasetName string) *conf.TableMapping

type PostLayer

// PostLayer is the type returned by NewPostLayer. Its Dataset method returns
// a WriteableDataset.
type PostLayer struct {
	// NOTE(review): the original author marked this rationale as uncertain;
	// "deferred from main" presumably refers to main deferring cleanup of the
	// repository — confirm whether the field really needs to be exported.
	PostRepo *PostRepository // exported because it needs to be deferred from main (unconfirmed)
	// contains filtered or unexported fields
}

func NewPostLayer

func NewPostLayer(cmgr *conf.ConfigurationManager, logger *zap.SugaredLogger) *PostLayer

func (*PostLayer) Dataset

func (postLayer *PostLayer) Dataset(request db.DatasetRequest) (WriteableDataset, error)

type PostRepository

// PostRepository is the repository type referenced by PostLayer.PostRepo.
// It exposes a pgx connection pool.
type PostRepository struct {
	// DB is the pgxpool connection pool held by this repository.
	DB *pgxpool.Pool
	// contains filtered or unexported fields
}

type PostgresDataset

// PostgresDataset is the dataset type created by NewPostgresDataset. It has
// Read, ReadChanges and Write methods whose signatures match the
// ReadableDataset and WriteableDataset interfaces.
type PostgresDataset struct {
	// contains filtered or unexported fields
}

func NewPostgresDataset

func NewPostgresDataset(pg *pgxpool.Pool, table *db.ReadTable, writeTable *db.WriteTable, request db.DatasetRequest) *PostgresDataset

func (*PostgresDataset) Read

func (ds *PostgresDataset) Read(ctx context.Context, entities chan<- *uda.Entity) error

Read reads from a Postgres query result and emits uda.Entity values on the entities channel.

func (*PostgresDataset) ReadChanges

func (ds *PostgresDataset) ReadChanges(ctx context.Context, since string, entities chan<- *uda.Entity) (string, error)

func (*PostgresDataset) Write

func (ds *PostgresDataset) Write(ctx context.Context, entities <-chan *uda.Entity, entityContext *uda.Context) error

Write takes a channel of uda.Entity values and queues them in a batch request. Once the batch contains batchChunkSize entities, it attempts to commit the batch in a transaction. If the transaction fails, Write returns an error and control is returned to the caller.

type ReadableDataset

// ReadableDataset is the interface returned by Layer.Dataset. It describes a
// dataset that can send entities on a channel.
type ReadableDataset interface {
	// Read sends entities on the send-only entities channel and reports any
	// failure as an error.
	Read(ctx context.Context, entities chan<- *uda.Entity) error
	// ReadChanges is like Read but additionally takes a since value and
	// returns a string alongside the error.
	// NOTE(review): the returned string is presumably the next continuation
	// token to pass as since — confirm against the implementation.
	ReadChanges(ctx context.Context, since string, entities chan<- *uda.Entity) (string, error)
}

type Repository

// Repository is the repository type referenced by Layer.Repo. It exposes a
// pgx connection pool.
type Repository struct {
	// DB is the pgxpool connection pool held by this repository.
	DB *pgxpool.Pool
	// contains filtered or unexported fields
}

type WriteableDataset

// WriteableDataset is the interface returned by PostLayer.Dataset. It
// describes a dataset that can receive entities from a channel.
type WriteableDataset interface {
	// Write receives entities from the receive-only entities channel, together
	// with an entity context, and reports any failure as an error.
	Write(ctx context.Context, entities <-chan *uda.Entity, entityContext *uda.Context) error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL