storage

package
v0.25.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 5, 2024 License: Apache-2.0 Imports: 26 Imported by: 1

Documentation

Overview

Package storage provide generic interface to interact with storage backend.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNotFound             = errors.New("record not found")
	ErrStreamDone           = errors.New("record stream done")
	ErrInvalidServerVersion = status.Error(codes.Aborted, "invalid server version")
)

Errors

Functions

func GetRecordIndex added in v0.18.0

func GetRecordIndex(msg proto.Message) *structpb.Struct

GetRecordIndex gets a record's index. If there is no index, nil is returned.

func GetRecordIndexCIDR added in v0.18.0

func GetRecordIndexCIDR(msg proto.Message) *netip.Prefix

GetRecordIndexCIDR returns the $index.cidr for a record's data. If none is available nil is returned.

func IsNotFound added in v0.18.0

func IsNotFound(err error) bool

IsNotFound returns true if the error is because a record was not found.

func MatchAny added in v0.11.0

func MatchAny(any *anypb.Any, query string) bool

MatchAny searches any data with a query.

func PatchRecord added in v0.24.0

func PatchRecord(existing, record *databroker.Record, fields *fieldmaskpb.FieldMask) error

PatchRecord extracts the data from existing and record, updates the existing data subject to the provided field mask, and stores the result back into record. The existing record is not modified.

func RecordStreamToList added in v0.17.3

func RecordStreamToList(recordStream RecordStream) ([]*databroker.Record, error)

RecordStreamToList converts a record stream to a list.

func WithQuerier added in v0.18.0

func WithQuerier(ctx context.Context, querier Querier) context.Context

WithQuerier sets the databroker Querier on a context.

Types

type AndFilterExpression added in v0.18.0

type AndFilterExpression []FilterExpression

An AndFilterExpression represents a logical-and comparison operator.

type Backend

type Backend interface {
	// Close closes the backend.
	Close() error
	// Get is used to retrieve a record.
	Get(ctx context.Context, recordType, id string) (*databroker.Record, error)
	// GetOptions gets the options for a type.
	GetOptions(ctx context.Context, recordType string) (*databroker.Options, error)
	// Lease acquires a lease, or renews an existing one. If the lease is acquired true is returned.
	Lease(ctx context.Context, leaseName, leaseID string, ttl time.Duration) (bool, error)
	// ListTypes lists all the known record types.
	ListTypes(ctx context.Context) ([]string, error)
	// Put is used to insert or update records.
	Put(ctx context.Context, records []*databroker.Record) (serverVersion uint64, err error)
	// Patch is used to update specific fields of existing records.
	Patch(ctx context.Context, records []*databroker.Record, fields *fieldmaskpb.FieldMask) (serverVersion uint64, patchedRecords []*databroker.Record, err error)
	// SetOptions sets the options for a type.
	SetOptions(ctx context.Context, recordType string, options *databroker.Options) error
	// Sync syncs record changes after the specified version.
	Sync(ctx context.Context, recordType string, serverVersion, recordVersion uint64) (RecordStream, error)
	// SyncLatest syncs all the records.
	SyncLatest(ctx context.Context, recordType string, filter FilterExpression) (serverVersion, recordVersion uint64, stream RecordStream, err error)
}

Backend is the interface required for a storage backend.

type Cache added in v0.18.0

type Cache interface {
	GetOrUpdate(
		ctx context.Context,
		key []byte,
		update func(ctx context.Context) ([]byte, error),
	) ([]byte, error)
	Invalidate(key []byte)
}

A Cache will return cached data when available or call update when not.

func NewGlobalCache added in v0.18.0

func NewGlobalCache(ttl time.Duration) Cache

NewGlobalCache creates a new Cache backed by fastcache and a TTL.

func NewLocalCache added in v0.18.0

func NewLocalCache() Cache

NewLocalCache creates a new Cache backed by a map.

type EqualsFilterExpression added in v0.18.0

type EqualsFilterExpression struct {
	Fields []string
	Value  string
}

An EqualsFilterExpression represents a field comparison operator.

type FilterExpression added in v0.18.0

type FilterExpression interface {
	// contains filtered or unexported methods
}

A FilterExpression describes an AST for record stream filters.

func FilterExpressionFromStruct added in v0.18.0

func FilterExpressionFromStruct(s *structpb.Struct) (FilterExpression, error)

FilterExpressionFromStruct creates a FilterExpression from a protobuf struct.

type OrFilterExpression added in v0.18.0

type OrFilterExpression []FilterExpression

An OrFilterExpression represents a logical-or comparison operator.

type Querier added in v0.18.0

type Querier interface {
	InvalidateCache(ctx context.Context, in *databroker.QueryRequest)
	Query(ctx context.Context, in *databroker.QueryRequest, opts ...grpc.CallOption) (*databroker.QueryResponse, error)
}

A Querier is a read-only subset of the client methods

func GetQuerier added in v0.18.0

func GetQuerier(ctx context.Context) Querier

GetQuerier gets the databroker Querier from the context.

func NewCachingQuerier added in v0.18.0

func NewCachingQuerier(q Querier, cache Cache) Querier

NewCachingQuerier creates a new querier that caches results in a Cache.

func NewQuerier added in v0.18.0

func NewQuerier(client databroker.DataBrokerServiceClient) Querier

NewQuerier creates a new Querier that implements the Querier interface by making calls to the databroker over gRPC.

func NewStaticQuerier added in v0.18.0

func NewStaticQuerier(msgs ...proto.Message) Querier

NewStaticQuerier creates a Querier that returns statically defined protobuf records.

type QueryTrace added in v0.18.0

type QueryTrace struct {
	ServerVersion, RecordVersion uint64

	RecordType string
	Query      string
	Filter     *structpb.Struct
}

A QueryTrace traces a call to Query.

type RecordStream added in v0.14.0

type RecordStream interface {
	// Close closes the record stream and releases any underlying resources.
	Close() error
	// Next is called to retrieve the next record. If one is available it will
	// be returned immediately. If none is available and block is true, the method
	// will block until one is available or an error occurs. The error should be
	// checked with a call to `.Err()`.
	Next(block bool) bool
	// Record returns the current record.
	Record() *databroker.Record
	// Err returns any error that occurred while streaming.
	Err() error
}

A RecordStream is a stream of records.

func NewConcatenatedRecordStream added in v0.18.0

func NewConcatenatedRecordStream(streams ...RecordStream) RecordStream

NewConcatenatedRecordStream creates a new record stream that streams all the records from the first stream before streaming all the records of the subsequent streams.

func NewRecordStream added in v0.17.3

func NewRecordStream(
	ctx context.Context,
	backendClosed chan struct{},
	generators []RecordStreamGenerator,
	onClose func(),
) RecordStream

NewRecordStream creates a new RecordStream from a list of generators and an onClose function.

func RecordListToStream added in v0.17.3

func RecordListToStream(ctx context.Context, records []*databroker.Record) RecordStream

RecordListToStream converts a record list to a stream.

type RecordStreamFilter added in v0.18.0

type RecordStreamFilter func(record *databroker.Record) (keep bool)

A RecordStreamFilter filters a RecordStream.

func RecordStreamFilterFromFilterExpression added in v0.18.0

func RecordStreamFilterFromFilterExpression(
	expr FilterExpression,
) (filter RecordStreamFilter, err error)

RecordStreamFilterFromFilterExpression returns a RecordStreamFilter from a FilterExpression.

func (RecordStreamFilter) And added in v0.18.0

And creates a new RecordStreamFilter by applying both functions to a record.

type RecordStreamGenerator added in v0.17.3

type RecordStreamGenerator = func(ctx context.Context, block bool) (*databroker.Record, error)

A RecordStreamGenerator generates records for a record stream.

func FilteredRecordStreamGenerator added in v0.18.0

func FilteredRecordStreamGenerator(
	generator RecordStreamGenerator,
	filter RecordStreamFilter,
) RecordStreamGenerator

FilteredRecordStreamGenerator creates a RecordStreamGenerator that only returns records that pass the filter.

type TracingQuerier added in v0.18.0

type TracingQuerier struct {
	// contains filtered or unexported fields
}

A TracingQuerier records calls to Query.

func NewTracingQuerier added in v0.18.0

func NewTracingQuerier(q Querier) *TracingQuerier

NewTracingQuerier creates a new TracingQuerier.

func (*TracingQuerier) InvalidateCache added in v0.18.0

func (q *TracingQuerier) InvalidateCache(ctx context.Context, in *databroker.QueryRequest)

InvalidateCache invalidates the cache.

func (*TracingQuerier) Query added in v0.18.0

Query queries for records.

func (*TracingQuerier) Traces added in v0.18.0

func (q *TracingQuerier) Traces() []QueryTrace

Traces returns all the traces.

Directories

Path Synopsis
Package inmemory contains an in-memory implementation of the databroker backend.
Package inmemory contains an in-memory implementation of the databroker backend.
Package postgres contains an implementation of the storage.Backend backed by postgres.
Package postgres contains an implementation of the storage.Backend backed by postgres.
Package storagetest contains test cases for use in verifying the behavior of a storage.Backend implementation.
Package storagetest contains test cases for use in verifying the behavior of a storage.Backend implementation.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL