collection

package
v2.9.0-nightly.20240112
Published: Jan 11, 2024 License: Apache-2.0 Imports: 47 Imported by: 0

README

Collections

Collections are a library for storing protobufs in a persistent database with a simple interface for reading, writing, and watching for changes. There are two existing implementations, one for etcd and one for postgres.

A collection can be constructed via NewEtcdCollection or NewPostgresCollection, depending on the backend to be used. Either constructor must be given:

  • a template protobuf Message, which will be cloned for use in serialization and deserialization
  • a set of fields to create secondary indexes for (used by GetByIndex and WatchByIndex)

API

The collection API is split into two means of accessing a collection - ReadOnly and ReadWrite. A ReadOnly collection does not make any consistency guarantees, while a ReadWrite collection is guaranteed to do all reads and writes within a single transaction, although this comes with trade-offs.

ReadOnly

A ReadOnlyCollection is constructed with a context that can be used for cancellation.

Operations:

  • Get, GetByIndex - point reads
  • List - stream the entire collection with some sorting options
  • Count - get the number of items in the collection
  • Watch, WatchF - stream the entire collection, then get notified of changes until canceled
  • WatchOne, WatchOneF - stream changes to a single item in the collection

EtcdReadOnlyCollection additionally supports:

  • TTL - get the time-to-live of a given item in the collection
  • ListRev - stream the entire collection alongside the etcd revision of each item
  • WatchByIndex - watch the entire collection based on an index (unused)

ReadWrite

A ReadWriteCollection is constructed with a transaction (STM for etcd, sqlx.Tx for postgres) which is used to isolate any operations until the transaction is finished. The transaction may rerun its callback several times if it is invalidated by other clients of the database. Transactionality cannot be preserved between an STM and a sqlx.Tx, so any code that must change things in both etcd and postgres must be aware of this (and generally, such code should be avoided).

Operations:

  • Get - same as in ReadOnlyCollection, but guaranteed to be consistent with the state of the transaction
  • Put - create or overwrite a given item in the collection
  • Update - get a given item from the collection, modify it in a callback, then write it out afterwards
  • Upsert - same as Update except it will continue with the callback and insert if the item does not exist already
  • Create - create a given item in the collection, error if it already exists
  • Delete - remove a given item from the collection, error if it does not exist
  • DeleteAll - remove all items from the collection
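
The differences between these write operations can be sketched with an in-memory map. This is a minimal model, not the package's implementation: memCollection, errExists, and errNotFound are hypothetical stand-ins, values are plain strings rather than protobufs, and no transaction is involved.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errExists   = errors.New("already exists") // hypothetical stand-in for ErrExists
	errNotFound = errors.New("not found")      // hypothetical stand-in for ErrNotFound
)

// memCollection is a hypothetical in-memory model of the write operations.
type memCollection struct{ items map[string]string }

func newMemCollection() *memCollection {
	return &memCollection{items: map[string]string{}}
}

// Put creates or overwrites the item.
func (c *memCollection) Put(key, val string) { c.items[key] = val }

// Create fails if the item already exists.
func (c *memCollection) Create(key, val string) error {
	if _, ok := c.items[key]; ok {
		return errExists
	}
	c.items[key] = val
	return nil
}

// Update reads the item, lets f modify it, then writes it back.
// The item must already exist.
func (c *memCollection) Update(key string, f func(val *string) error) error {
	val, ok := c.items[key]
	if !ok {
		return errNotFound
	}
	if err := f(&val); err != nil {
		return err
	}
	c.items[key] = val
	return nil
}

// Upsert is like Update, but a missing item starts from the zero value.
func (c *memCollection) Upsert(key string, f func(val *string) error) error {
	val := c.items[key]
	if err := f(&val); err != nil {
		return err
	}
	c.items[key] = val
	return nil
}

// Delete fails if the item does not exist.
func (c *memCollection) Delete(key string) error {
	if _, ok := c.items[key]; !ok {
		return errNotFound
	}
	delete(c.items, key)
	return nil
}

func main() {
	c := newMemCollection()
	appendX := func(val *string) error { *val += "x"; return nil }
	fmt.Println(c.Update("a", appendX)) // item is missing, so Update fails
	fmt.Println(c.Upsert("a", appendX)) // Upsert inserts instead
	fmt.Println(c.Create("a", "other")) // item now exists, so Create fails
	fmt.Println(c.Delete("a"), c.Delete("a"))
}
```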

EtcdReadWriteCollection additionally supports:

  • TTL - get the time-to-live of a given item in the collection
  • PutTTL - write an item to the collection that will be removed after the given time-to-live

Implementation

Etcd

TODO

STM

The STM can be constructed via NewSTM (or NewDryrunSTM, which will not commit changes). This code is copied from the etcd library in order to extend it. It is a relatively common problem that etcd transactions will get too large and be rejected by etcd. While the etcd config can be adjusted to increase the limit on transaction size, this can only do so much. For arbitrarily large transactions, you may need to find a way to break them up into pieces of a controlled size.
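
One way to keep transactions to a controlled size is to split the keys to be written into fixed-size batches and run one transaction per batch. The sketch below shows only the batching; chunk is a hypothetical helper, and the batch size is an arbitrary illustration rather than a limit taken from etcd or from this package.

```go
package main

import "fmt"

// chunk splits keys into consecutive batches of at most size elements.
// It returns nil for a non-positive size.
func chunk(keys []string, size int) [][]string {
	if size <= 0 {
		return nil
	}
	var batches [][]string
	for start := 0; start < len(keys); start += size {
		end := start + size
		if end > len(keys) {
			end = len(keys)
		}
		batches = append(batches, keys[start:end])
	}
	return batches
}

func main() {
	keys := []string{"a", "b", "c", "d", "e"}
	for i, batch := range chunk(keys, 2) {
		// Each batch would be written inside its own transaction.
		fmt.Println(i, batch)
	}
}
```

Splitting a write this way gives up atomicity across batches: a failure partway through leaves the earlier batches committed, so the caller has to tolerate or repair partial progress.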

Postgres

The postgres implementation is written to mirror the functionality of the etcd implementation rather than to provide an ORM-style table of the protobuf fields inside a database.

sqlx.Tx

The sqlx.Tx can be constructed via NewSQLTx. This will occupy a sql session for its lifetime. Constructing the sqlx.Tx will result in a BEGIN; at the start, and a COMMIT; at the end of the callback, unless an error occurs, in which case a ROLLBACK; will be issued. This will reattempt up to three times in the case of a commit error.
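
The begin, commit, rollback, and retry flow described above can be sketched against a fake transaction. Everything here is hypothetical: the tx interface, withTx, and fakeDB are illustrations and not NewSQLTx itself, and the sketch reads "up to three times" as three attempts in total, which is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// tx is a hypothetical stand-in for a SQL transaction handle.
type tx interface {
	Commit() error
	Rollback() error
}

// maxAttempts treats "up to three times" as three attempts in total (assumption).
const maxAttempts = 3

var errCommit = errors.New("commit conflict")

// withTx runs apply inside a transaction. A callback error triggers a
// rollback and is returned immediately; a commit error triggers a retry.
func withTx(begin func() (tx, error), apply func(tx) error) error {
	var commitErr error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		t, err := begin() // BEGIN;
		if err != nil {
			return err
		}
		if err := apply(t); err != nil {
			_ = t.Rollback() // ROLLBACK; the callback's error is returned as-is
			return err
		}
		if commitErr = t.Commit(); commitErr == nil { // COMMIT;
			return nil
		}
	}
	return fmt.Errorf("commit failed after %d attempts: %w", maxAttempts, commitErr)
}

// fakeDB records the statements a real session would see.
type fakeDB struct {
	failCommits int // number of commits that should fail before one succeeds
	log         []string
}

type fakeTx struct{ db *fakeDB }

func (db *fakeDB) begin() (tx, error) {
	db.log = append(db.log, "BEGIN")
	return fakeTx{db}, nil
}

func (t fakeTx) Commit() error {
	if t.db.failCommits > 0 {
		t.db.failCommits--
		t.db.log = append(t.db.log, "COMMIT (failed)")
		return errCommit
	}
	t.db.log = append(t.db.log, "COMMIT")
	return nil
}

func (t fakeTx) Rollback() error {
	t.db.log = append(t.db.log, "ROLLBACK")
	return nil
}

func main() {
	db := &fakeDB{failCommits: 1}
	err := withTx(db.begin, func(_ tx) error { return nil })
	fmt.Println(err, db.log)
}
```

Because the callback may run more than once, it should not have side effects outside the transaction that cannot safely be repeated.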

Tables

A table will be created for each postgres collection. Construction of the collection will check that the table exists, as well as all relevant triggers and indexes. If any are missing, they will be created.

The model struct exists to mirror the columns in the database for reading results from sqlx into a struct:

  • CreatedAt (golang) -> createdat (postgres) - the timestamp at which the item was created
  • UpdatedAt (golang) -> updatedat (postgres) - the timestamp at which the item was last modified
  • Version (golang) -> version (postgres) - the version of pachyderm which serialized the protobuf
  • Key (golang) -> key (postgres) - the primary key for the item
  • Proto (golang) -> proto (postgres) - the item's serialized protobuf

Additionally, columns will be created for each indexed field, following the pattern idx_<field>. These are only used for indexing purposes and are never read back out by a client; the user can get that information from the protobuf.
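
A struct of this shape might look like the sketch below. The field and column names come from the list above; the Go field types and the use of sqlx's `db` struct tags are assumptions, and indexColumn and columns are hypothetical helpers added for illustration.

```go
package main

import (
	"fmt"
	"reflect"
	"time"
)

// model sketches a row struct for sqlx to scan into.
// The field types are assumptions; only the names come from the README.
type model struct {
	CreatedAt time.Time `db:"createdat"`
	UpdatedAt time.Time `db:"updatedat"`
	Version   string    `db:"version"`
	Key       string    `db:"key"`
	Proto     []byte    `db:"proto"`
}

// indexColumn returns the column name for an indexed field (idx_<field>).
func indexColumn(field string) string { return "idx_" + field }

// columns lists the `db` tag of every field in a struct value.
func columns(v any) []string {
	t := reflect.TypeOf(v)
	cols := make([]string, 0, t.NumField())
	for i := 0; i < t.NumField(); i++ {
		cols = append(cols, t.Field(i).Tag.Get("db"))
	}
	return cols
}

func main() {
	fmt.Println(columns(model{}))
	fmt.Println(indexColumn("value"))
}
```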

Triggers

Two triggers exist on each postgres collection table, update_modified_trigger and notify_watch_trigger.

update_modified_trigger will modify the row before any write to update the updatedat column to the current time.

notify_watch_trigger will issue NOTIFY commands to one or several channels in case any clients are running watches on the collection. For table-wide watches, the channel is straightforwardly pwc_<table>. For point-watches, the trigger will also notify one channel for each index on the collection. The channel name must depend on the value of the field being indexed so that clients do not have to discard O(n) events per interesting event. Unfortunately, channel names must be valid postgres identifiers which puts several limitations on the channel name, so the value must be hashed. This still leaves the possibility open for a collision in the channel name, so the client must still check that the event belongs to the watch in question (see Listeners for details).

The payload on the channel includes several fields:

  • the field name of the index being notified
  • the value of the index being notified
  • the primary key of the row
  • the postgres operation that caused the trigger ('INSERT', 'UPDATE', 'DELETE')
  • the timestamp of the transaction that caused the trigger
  • a base64-encoded serialized protobuf of the item
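
A client has to split the payload back into these six fields before it can act on a notification. The README does not give the wire layout, so the sketch below assumes the fields are separated by single spaces in the order listed, with no spaces inside any field; payload and parsePayload are hypothetical names. The timestamp is kept as a string because its format is not specified.

```go
package main

import (
	"encoding/base64"
	"fmt"
	"strings"
)

// payload holds the six fields listed above.
type payload struct {
	Field     string // index field name
	Value     string // index value
	Key       string // primary key of the row
	Op        string // INSERT, UPDATE, or DELETE
	Timestamp string // left unparsed; the format is not specified
	Proto     []byte // decoded protobuf bytes
}

// parsePayload splits a space-separated payload (assumed layout) and
// decodes the trailing base64 protobuf.
func parsePayload(s string) (*payload, error) {
	parts := strings.Split(s, " ")
	if len(parts) != 6 {
		return nil, fmt.Errorf("expected 6 fields, got %d", len(parts))
	}
	switch parts[3] {
	case "INSERT", "UPDATE", "DELETE":
	default:
		return nil, fmt.Errorf("unknown operation %q", parts[3])
	}
	proto, err := base64.StdEncoding.DecodeString(parts[5])
	if err != nil {
		return nil, fmt.Errorf("decoding protobuf: %w", err)
	}
	return &payload{
		Field:     parts[0],
		Value:     parts[1],
		Key:       parts[2],
		Op:        parts[3],
		Timestamp: parts[4],
		Proto:     proto,
	}, nil
}

func main() {
	raw := "bar test buzz INSERT 1700000000 " +
		base64.StdEncoding.EncodeToString([]byte("serialized item"))
	p, err := parsePayload(raw)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(p.Field, p.Value, p.Key, p.Op, len(p.Proto))
}
```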

Listeners

In order to provide watch semantics, a postgres connection must be dedicated to listening for notifications. This functionality is not available on sqlx connections, so the pq library is used instead. This is the same library that provides the underlying connection for sqlx, although we don't get much benefit from this. A PostgresListener must be passed to any collection at construction time so that watches can be performed. The PostgresListener connects lazily on the first LISTEN operation, and all LISTENs should be multiplexed through a single connection.

Each Watch operation will listen on a postgres channel according to the watch type:

  • Watch, WatchF - listens to the table-wide pwc_<table> channel
  • WatchOne, WatchOneF, WatchByIndex - listens to a hashed channel pwc_<table>_<hash> based on the index key
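
The two channel name shapes can be sketched as follows. The hash function, its input, and its length are not specified in this README, so the choice here (the first 8 bytes of a SHA-256 over the field name and value, hex-encoded) is purely an assumption; tableChannel and indexChannel are hypothetical helpers. Hex output keeps the name within the characters allowed in an unquoted identifier, and truncation helps it stay under postgres's 63-byte identifier limit.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// tableChannel returns the table-wide channel name, pwc_<table>.
func tableChannel(table string) string { return "pwc_" + table }

// indexChannel returns pwc_<table>_<hash>. Hashing the field name together
// with the value, and truncating to 8 bytes, are assumptions of this sketch.
func indexChannel(table, field, value string) string {
	sum := sha256.Sum256([]byte(field + "\x00" + value))
	return "pwc_" + table + "_" + hex.EncodeToString(sum[:8])
}

func main() {
	fmt.Println(tableChannel("repos"))
	fmt.Println(indexChannel("repos", "key", "buzz"))
}
```

A truncated hash makes collisions more likely, which is acceptable only because the client re-checks each event against the watch's parameters.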

Once the LISTEN has returned, events must be buffered until the user has read out the current state of objects in the collections. The Watch operation will then issue a Get or List operation to load the current state and keep track of the last modified timestamp. The existing items will be provided to the Watch as EventPut events, and once these are done, the buffered events will be forwarded to the Watch, filtering out any that arrived before loading the existing items.
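
The ordering described above (listen first, load the current state, then replay buffered events) can be sketched as below. This is a simplified, single-goroutine model: event and startWatch are hypothetical, timestamps are plain integers, and the buffered events are passed in as a slice rather than arriving concurrently. Dropping events whose timestamp equals the last modified timestamp is a choice made for this sketch, not something the README specifies.

```go
package main

import "fmt"

// event is a hypothetical watch event.
type event struct {
	Key       string
	Type      string // "put" or "delete"
	Timestamp int64  // hypothetical modification time
}

// startWatch emits the current state as put events, then forwards only the
// buffered events that are newer than anything seen while loading.
func startWatch(list func() []event, buffered []event, emit func(event)) {
	var lastModified int64
	for _, item := range list() {
		if item.Timestamp > lastModified {
			lastModified = item.Timestamp
		}
		emit(event{Key: item.Key, Type: "put", Timestamp: item.Timestamp})
	}
	for _, ev := range buffered {
		if ev.Timestamp <= lastModified {
			continue // already reflected in the loaded state
		}
		emit(ev)
	}
}

func main() {
	existing := func() []event {
		return []event{{Key: "a", Timestamp: 10}, {Key: "b", Timestamp: 20}}
	}
	buffered := []event{
		{Key: "a", Type: "put", Timestamp: 10},    // duplicate of loaded state
		{Key: "c", Type: "put", Timestamp: 25},    // new item
		{Key: "b", Type: "delete", Timestamp: 30}, // later deletion
	}
	startWatch(existing, buffered, func(ev event) {
		fmt.Println(ev.Type, ev.Key, ev.Timestamp)
	})
}
```

Issuing the LISTEN before the read is what closes the gap: a change that lands between the two steps is either in the loaded state or in the buffer, never lost.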

Events that arrive from the LISTEN may not belong to the Watch due to hash collisions or a race condition in how we set up the Watch. Therefore, the payload must be parsed to determine if the Watch is interested in the event, by comparing the index field and value to the Watch's parameters, and potentially filtering out 'Delete' or 'Put' events, or events that arrived before the Watch was ready.
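
The per-event check might be sketched as a small predicate. The types and field names below are hypothetical, and mapping INSERT and UPDATE to put events and DELETE to delete events is an assumption based on the operation names in the payload.

```go
package main

import "fmt"

// notification holds the payload fields needed for filtering.
type notification struct {
	Field, Value, Op string
}

// watchFilter describes what a single watch is interested in.
type watchFilter struct {
	Field, Value string
	SkipPut      bool // drop INSERT and UPDATE events
	SkipDelete   bool // drop DELETE events
}

// wants reports whether the notification belongs to this watch.
func (w watchFilter) wants(n notification) bool {
	if n.Field != w.Field || n.Value != w.Value {
		return false // same channel, different index value: a hash collision
	}
	switch n.Op {
	case "INSERT", "UPDATE":
		return !w.SkipPut
	case "DELETE":
		return !w.SkipDelete
	default:
		return false
	}
}

func main() {
	w := watchFilter{Field: "bar", Value: "test", SkipDelete: true}
	fmt.Println(w.wants(notification{Field: "bar", Value: "test", Op: "UPDATE"}))
	fmt.Println(w.wants(notification{Field: "bar", Value: "other", Op: "UPDATE"}))
	fmt.Println(w.wants(notification{Field: "bar", Value: "test", Op: "DELETE"}))
}
```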

Documentation

Constants

const (
	ChannelBufferSize = 1000
)
const (
	DefaultPrefix string = "pachyderm/1.7.0"
)

defaultLimit was experimentally determined to be the highest value that could work (It gets scaled down for specific collections if they trip the max-message size.)

Variables

var (
	// ErrNotClaimed is an error used to indicate that a different requester beat
	// the current requester to a key claim.
	ErrNotClaimed = errors.New("NOT_CLAIMED")
	DefaultTTL    = int64(30)
)
var (
	SortByCreateRevision = etcd.SortByCreateRevision
	SortByModRevision    = etcd.SortByModRevision
	SortByKey            = etcd.SortByKey
	SortNone             = etcd.SortNone

	SortAscend  = etcd.SortAscend
	SortDescend = etcd.SortDescend
)

Hoist these consts so users don't have to import etcd

var File_internal_collection_test_proto protoreflect.FileDescriptor

Functions

func CreatePostgresSchema

func CreatePostgresSchema(ctx context.Context, sqlTx *pachsql.Tx) error

func IsErrExists

func IsErrExists(err error) bool

IsErrExists determines if an error is an ErrExists error

func IsErrNotFound

func IsErrNotFound(err error) bool

IsErrNotFound determines if an error is an ErrNotFound error

func IsErrNotUnique

func IsErrNotUnique(err error) bool

IsErrNotUnique determines if an error is an ErrNotUnique error
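
The error types in this package expose an Is method, which is the hook that errors.Is consults. How the package implements Is and the IsErr helpers is not shown here, so the sketch below uses a local stand-in type (errNotFound) and a local helper (isNotFound) to illustrate one way such a check can match any value of a type, even through wrapping.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound is a local stand-in, not the package's ErrNotFound.
type errNotFound struct {
	Type string
	Key  string
}

func (e errNotFound) Error() string {
	return fmt.Sprintf("%s %q not found", e.Type, e.Key)
}

// Is matches any errNotFound, regardless of its field values.
func (e errNotFound) Is(other error) bool {
	_, ok := other.(errNotFound)
	return ok
}

// isNotFound reports whether err is, or wraps, an errNotFound.
func isNotFound(err error) bool {
	return errors.Is(err, errNotFound{})
}

func main() {
	wrapped := fmt.Errorf("loading item: %w", errNotFound{Type: "repos", Key: "buzz"})
	fmt.Println(isNotFound(wrapped))
	fmt.Println(isNotFound(errors.New("connection refused")))
}
```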

func NewDryrunSQLTx

func NewDryrunSQLTx(ctx context.Context, db *pachsql.DB, apply func(*pachsql.Tx) error) error

NewDryrunSQLTx is identical to NewSQLTx except it will always roll back the transaction instead of committing it.

func NewDryrunSTM

func NewDryrunSTM(ctx context.Context, c *v3.Client, apply func(STM) error) error

NewDryrunSTM initiates a new STM operation, but the final commit is skipped. It uses a serializable model.

func NewSTM

func NewSTM(ctx context.Context, c *v3.Client, apply func(STM) error) (*v3.TxnResponse, error)

NewSTM initiates a new STM operation. It uses a serializable model.

func SetupPostgresCollections

func SetupPostgresCollections(ctx context.Context, sqlTx *pachsql.Tx, collections ...PostgresCollection) error

func SetupPostgresV0

func SetupPostgresV0(ctx context.Context, sqlTx *pachsql.Tx) error

DO NOT MODIFY THIS FUNCTION IT HAS BEEN USED IN A RELEASED MIGRATION

Types

type ErrExists

type ErrExists struct {
	Type string
	Key  string
	// contains filtered or unexported fields
}

ErrExists indicates that a key was found to exist when it was expected not to.

func (ErrExists) Error

func (err ErrExists) Error() string

func (ErrExists) GRPCStatus

func (e ErrExists) GRPCStatus() *status.Status

func (ErrExists) Is

func (err ErrExists) Is(other error) bool

type ErrNotFound

type ErrNotFound struct {
	Type string
	Key  string
	// contains filtered or unexported fields
}

ErrNotFound indicates that a key was not found when it was expected to exist.

func (ErrNotFound) Error

func (err ErrNotFound) Error() string

func (ErrNotFound) GRPCStatus

func (e ErrNotFound) GRPCStatus() *status.Status

func (ErrNotFound) Is

func (err ErrNotFound) Is(other error) bool

type ErrNotUnique

type ErrNotUnique struct {
	Type  string
	Index string
	Value string
}

ErrNotUnique indicates that an indexed query expected to have exactly one result but had more than one result.

func (ErrNotUnique) Error

func (err ErrNotUnique) Error() string

func (ErrNotUnique) Is

func (err ErrNotUnique) Is(other error) bool

type EtcdCollection

type EtcdCollection interface {
	// ReadWrite enables reads and writes on a collection in a
	// transactional manner.  Specifically, all writes are applied
	// atomically, and writes are only applied if reads have not been
	// invalidated at the end of the transaction.  Basically, it's
	// software transactional memory.  See this blog post for details:
	// https://coreos.com/blog/transactional-memory-with-etcd3.html
	ReadWrite(stm STM) EtcdReadWriteCollection

	// For read-only operations, use the ReadOnly for better performance
	ReadOnly(ctx context.Context) EtcdReadOnlyCollection

	// Claim attempts to claim a key and run the passed in callback with
	// the context for the claim.
	Claim(ctx context.Context, key string, val proto.Message, f func(context.Context) error) error

	WithRenewer(ctx context.Context, cb func(context.Context, *Renewer) error) error
}

func NewEtcdCollection

func NewEtcdCollection(etcdClient *etcd.Client, prefix string, indexes []*Index, template proto.Message, keyCheck func(string) error, valCheck func(proto.Message) error) EtcdCollection

NewEtcdCollection creates a new collection backed by etcd.

type EtcdReadOnlyCollection

type EtcdReadOnlyCollection interface {
	ReadOnlyCollection

	// TTL returns the number of seconds that 'key' will continue to exist in the
	// collection, or '0' if 'key' will remain in the collection indefinitely
	// TODO: TTL might be unused
	TTL(key string) (int64, error)
	// CountRev returns the number of items in the collection at a specific
	// revision, it's only in EtcdReadOnlyCollection because only etcd has
	// revs.
	// TODO: CountRev might be unused
	CountRev(int64) (int64, int64, error)
}

type EtcdReadWriteCollection

type EtcdReadWriteCollection interface {
	ReadWriteCollection

	// TTL returns the amount of time that 'key' will continue to exist in the
	// collection, or '0' if 'key' will remain in the collection indefinitely
	TTL(key string) (int64, error)
	// PutTTL is the same as Put except that the object is removed after
	// TTL seconds.
	// WARNING: using PutTTL with a collection that has secondary indices
	// can result in inconsistency, as the indices are removed at roughly
	// but not exactly the same time as the documents.
	PutTTL(key string, val proto.Message, ttl int64) error

	DeleteAllPrefix(prefix string) error
}

type Index

type Index struct {
	Name    string
	Extract func(val proto.Message) string
	// contains filtered or unexported fields
}

Index specifies a secondary index on a collection.

Indexes are created in a transactional manner thanks to etcd's transactional support.

A secondary index for collection "foo" on field "bar" will reside under the path `/foo__index_bar`. Each item under the path is in turn a directory whose name is the value of the field `bar`. For instance, if you have an object in collection `foo` whose `bar` field is `test`, then you will see a directory at path `/foo__index_bar/test`.

Under that directory, you have keys that point to items in the collection. For instance, if the aforementioned object has the key "buzz", then you will see an item at `/foo__index_bar/test/buzz`. The value of this item is empty. Thus, to get all items in collection `foo` whose `bar` field is `test`, we issue a query for all items under `/foo__index_bar/test`.
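
The path layout described above can be written out as two small helpers. indexDir and indexKey are hypothetical names, and the sketch ignores any prefix that the collection itself may live under.

```go
package main

import "fmt"

// indexDir returns the directory holding every item whose indexed field
// has the given value, e.g. /foo__index_bar/test.
func indexDir(collection, field, value string) string {
	return fmt.Sprintf("/%s__index_%s/%s", collection, field, value)
}

// indexKey returns the (empty-valued) entry that points at one item,
// e.g. /foo__index_bar/test/buzz.
func indexKey(collection, field, value, key string) string {
	return indexDir(collection, field, value) + "/" + key
}

func main() {
	fmt.Println(indexDir("foo", "bar", "test"))
	fmt.Println(indexKey("foo", "bar", "test", "buzz"))
}
```

A lookup by index is then a prefix query on the directory, followed by a read of each item whose key appears as the final path segment.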

type Notification

type Notification = pq.Notification

type Notifier

type Notifier interface {
	// ID is a unique identifier for the notifier.
	ID() string
	// Channel is the channel that this notifier should receive notifications for.
	Channel() string
	// Notify sends a notification to the notifier.
	Notify(*Notification)
	// Error sends an error to the notifier.
	Error(error)
}
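
A channel-backed type with the same four methods might look like the sketch below. It does not import this package: Notification is a local stand-in carrying two of the fields of pq.Notification, and chanNotifier, its buffer size, and its overflow policy are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Notification is a local stand-in with two of pq.Notification's fields.
type Notification struct {
	Channel string
	Extra   string // the payload
}

// chanNotifier forwards notifications and errors onto buffered channels.
type chanNotifier struct {
	id, channel string
	events      chan *Notification
	errs        chan error
}

func newChanNotifier(id, channel string, buffer int) *chanNotifier {
	return &chanNotifier{
		id:      id,
		channel: channel,
		events:  make(chan *Notification, buffer),
		errs:    make(chan error, 1),
	}
}

func (n *chanNotifier) ID() string      { return n.id }
func (n *chanNotifier) Channel() string { return n.channel }

// Notify never blocks: if the buffer is full it reports an error instead,
// so that one slow watcher cannot stall the shared listener.
func (n *chanNotifier) Notify(m *Notification) {
	select {
	case n.events <- m:
	default:
		n.Error(errors.New("notification buffer full"))
	}
}

// Error records the first pending error and drops later ones.
func (n *chanNotifier) Error(err error) {
	select {
	case n.errs <- err:
	default:
	}
}

func main() {
	n := newChanNotifier("watch-1", "pwc_repos", 1)
	n.Notify(&Notification{Channel: "pwc_repos", Extra: "first"})
	n.Notify(&Notification{Channel: "pwc_repos", Extra: "second"}) // buffer is full
	fmt.Println((<-n.events).Extra, <-n.errs)
}
```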

type Option

type Option func(collection *postgresCollection)

func WithExistsMessage

func WithExistsMessage(format func(interface{}) string) Option

func WithKeyCheck

func WithKeyCheck(check func(string) error) Option

func WithKeyGen

func WithKeyGen(gen func(interface{}) (string, error)) Option

func WithListBufferCapacity

func WithListBufferCapacity(cap int) Option

func WithNotFoundMessage

func WithNotFoundMessage(format func(interface{}) string) Option

func WithPutHook

func WithPutHook(putHook func(*pachsql.Tx, interface{}) error) Option

type Options

type Options struct {
	Target SortTarget
	Order  SortOrder
	// Limit is only implemented for postgres collections
	Limit int
}

Options are the sort options when iterating through etcd key/values. Currently implemented sort targets are CreateRevision and ModRevision.

func DefaultOptions

func DefaultOptions() *Options

DefaultOptions are the default sort options when iterating through etcd key/values.

type PostgresCollection

type PostgresCollection interface {
	// ReadWrite enables reads and writes on a collection in a
	// transactional manner.  Specifically, all writes are applied
	// atomically, and writes are only applied if reads have not been
	// invalidated at the end of the transaction.  Basically, it's
	// software transactional memory.  See this blog post for details:
	// https://coreos.com/blog/transactional-memory-with-etcd3.html
	ReadWrite(tx *pachsql.Tx) PostgresReadWriteCollection

	// For read-only operations, use the ReadOnly for better performance
	ReadOnly(ctx context.Context) PostgresReadOnlyCollection
}

func NewPostgresCollection

func NewPostgresCollection(name string, db *pachsql.DB, listener PostgresListener, template proto.Message, indexes []*Index, opts ...Option) PostgresCollection

NewPostgresCollection creates a new collection backed by postgres.

type PostgresListener

type PostgresListener interface {
	// Register registers a notifier with the postgres listener.
	// A notifier will receive notifications for the channel it is associated
	// with while registered with the postgres listener.
	Register(Notifier) error
	// Unregister unregisters a notifier with the postgres listener.
	// A notifier will no longer receive notifications when this call completes.
	Unregister(Notifier) error
	// Close closes the postgres listener.
	Close() error
}
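
Multiplexing many notifiers over one connection amounts to a registry keyed by channel name. The sketch below models that bookkeeping only: mux, its notifier interface, and the recorded LISTEN and UNLISTEN strings are hypothetical, and no database connection is involved.

```go
package main

import (
	"fmt"
	"sync"
)

// notifier is a reduced, hypothetical version of the Notifier interface.
type notifier interface {
	ID() string
	Channel() string
	Notify(payload string)
}

// mux fans notifications from one connection out to registered notifiers.
type mux struct {
	mu        sync.Mutex
	byChannel map[string]map[string]notifier
	issued    []string // statements a real listener would send
}

func newMux() *mux {
	return &mux{byChannel: map[string]map[string]notifier{}}
}

// Register adds n and issues LISTEN only for the first notifier on a channel.
func (m *mux) Register(n notifier) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	subs, ok := m.byChannel[n.Channel()]
	if !ok {
		subs = map[string]notifier{}
		m.byChannel[n.Channel()] = subs
		m.issued = append(m.issued, "LISTEN "+n.Channel())
	}
	if _, dup := subs[n.ID()]; dup {
		return fmt.Errorf("notifier %q already registered", n.ID())
	}
	subs[n.ID()] = n
	return nil
}

// Unregister removes n and issues UNLISTEN when the channel has no notifiers left.
func (m *mux) Unregister(n notifier) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	subs := m.byChannel[n.Channel()]
	if _, ok := subs[n.ID()]; !ok {
		return fmt.Errorf("notifier %q is not registered", n.ID())
	}
	delete(subs, n.ID())
	if len(subs) == 0 {
		delete(m.byChannel, n.Channel())
		m.issued = append(m.issued, "UNLISTEN "+n.Channel())
	}
	return nil
}

// dispatch delivers one incoming notification to every notifier on its channel.
func (m *mux) dispatch(channel, payload string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	for _, n := range m.byChannel[channel] {
		n.Notify(payload)
	}
}

// recorder is a trivial notifier that remembers what it received.
type recorder struct {
	id, channel string
	got         []string
}

func (r *recorder) ID() string            { return r.id }
func (r *recorder) Channel() string       { return r.channel }
func (r *recorder) Notify(payload string) { r.got = append(r.got, payload) }

func main() {
	m := newMux()
	a := &recorder{id: "a", channel: "pwc_repos"}
	b := &recorder{id: "b", channel: "pwc_repos"}
	_ = m.Register(a)
	_ = m.Register(b)
	m.dispatch("pwc_repos", "event-1")
	fmt.Println(m.issued, a.got, b.got)
}
```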

func NewPostgresListener

func NewPostgresListener(dsn string) PostgresListener

type PostgresReadOnlyCollection

type PostgresReadOnlyCollection interface {
	ReadOnlyCollection

	GetRevByIndex(index *Index, indexVal string, val proto.Message, opts *Options, f func(string, int64) error) error

	// GetUniqueByIndex is identical to GetByIndex except that it returns an
	// error unless exactly one row is found.
	// TODO: decide if we should merge this with GetByIndex and use an `Options`.
	GetUniqueByIndex(index *Index, indexVal string, val proto.Message) error
}

type PostgresReadWriteCollection

type PostgresReadWriteCollection interface {
	ReadWriteCollection

	DeleteByIndex(index *Index, indexVal string) error
	// GetByIndex can have a large impact on database contention if used to retrieve
	// a large number of rows. Consider using a read-only collection if possible
	GetByIndex(index *Index, indexVal string, val proto.Message, opts *Options, f func(string) error) error

	// GetUniqueByIndex is identical to GetByIndex except that it returns an
	// error unless exactly one row is found.
	// TODO: decide if we should merge this with GetByIndex and use an `Options`.
	GetUniqueByIndex(index *Index, indexVal string, val proto.Message) error
	// NOTE: List scans the collection over multiple queries,
	// making this method susceptible to inconsistent reads
	List(val proto.Message, opts *Options, f func(string) error) error
}

type ReadOnlyCollection

type ReadOnlyCollection interface {
	Get(key interface{}, val proto.Message) error
	GetByIndex(index *Index, indexVal string, val proto.Message, opts *Options, f func(string) error) error
	List(val proto.Message, opts *Options, f func(string) error) error
	ListRev(val proto.Message, opts *Options, f func(string, int64) error) error
	Count() (int64, error)
	Watch(opts ...watch.Option) (watch.Watcher, error)
	WatchF(f func(*watch.Event) error, opts ...watch.Option) error
	WatchOne(key interface{}, opts ...watch.Option) (watch.Watcher, error)
	WatchOneF(key interface{}, f func(*watch.Event) error, opts ...watch.Option) error
	WatchByIndex(index *Index, val string, opts ...watch.Option) (watch.Watcher, error)
	WatchByIndexF(index *Index, val string, f func(*watch.Event) error, opts ...watch.Option) error
}

ReadOnlyCollection is a collection interface that only supports read ops.

type ReadWriteCollection

type ReadWriteCollection interface {
	Get(key interface{}, val proto.Message) error
	Put(key interface{}, val proto.Message) error
	// Update reads the current value associated with 'key', calls 'f' to update
	// the value, and writes the new value back to the collection. 'key' must be
	// present in the collection, or a 'Not Found' error is returned
	Update(key interface{}, val proto.Message, f func() error) error
	// Upsert is like Update but 'key' is not required to be present
	Upsert(key interface{}, val proto.Message, f func() error) error
	Create(key interface{}, val proto.Message) error
	Delete(key interface{}) error
	DeleteAll() error
}

ReadWriteCollection is a collection interface that supports read, write, and delete operations.

type Renewer

type Renewer struct {
	// contains filtered or unexported fields
}

func (*Renewer) Put

func (r *Renewer) Put(ctx context.Context, key string, val proto.Message) error

type STM

type STM interface {
	// Get returns the value for a key and inserts the key in the txn's read set.
	// If Get fails, it aborts the transaction with an error, never returning.
	Get(key string) (string, error)
	// Put adds a value for a key to the write set.
	Put(key, val string, ttl int64, ptr uintptr) error
	PutLease(key, val string, lease v3.LeaseID, ptr uintptr) error
	PutIgnoreLease(key, val string, ptr uintptr) error
	// Del deletes a key.
	Del(key string)
	// TTL returns the remaining time to live for 'key', or 0 if 'key' has no TTL
	TTL(key string) (int64, error)
	// DelAll deletes all keys with the given prefix
	// Note that the current implementation of DelAll is incomplete.
	// To use DelAll safely, do not issue any Get/Put operations after
	// DelAll is called.
	DelAll(key string)
	Context() context.Context
	// SetSafePutCheck sets the bit pattern to check if a put is safe.
	SetSafePutCheck(key string, ptr uintptr)
	// IsSafePut checks against the bit pattern for a key to see if it is safe to put.
	IsSafePut(key string, ptr uintptr) bool
	// contains filtered or unexported methods
}

STM is an interface for software transactional memory.

type SortOrder

type SortOrder = etcd.SortOrder

type SortTarget

type SortTarget = etcd.SortTarget

type TestItem

type TestItem struct {
	Id    string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
	Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
	Data  string `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"`
	// contains filtered or unexported fields
}

func (*TestItem) Descriptor deprecated

func (*TestItem) Descriptor() ([]byte, []int)

Deprecated: Use TestItem.ProtoReflect.Descriptor instead.

func (*TestItem) GetData

func (x *TestItem) GetData() string

func (*TestItem) GetId added in v2.7.0

func (x *TestItem) GetId() string

func (*TestItem) GetValue

func (x *TestItem) GetValue() string

func (*TestItem) MarshalLogObject

func (x *TestItem) MarshalLogObject(enc zapcore.ObjectEncoder) error

func (*TestItem) ProtoMessage

func (*TestItem) ProtoMessage()

func (*TestItem) ProtoReflect added in v2.7.0

func (x *TestItem) ProtoReflect() protoreflect.Message

func (*TestItem) Reset

func (x *TestItem) Reset()

func (*TestItem) String

func (x *TestItem) String() string

func (*TestItem) Validate added in v2.8.0

func (m *TestItem) Validate() error

Validate checks the field values on TestItem with the rules defined in the proto definition for this message. If any rules are violated, the first error encountered is returned, or nil if there are no violations.

func (*TestItem) ValidateAll added in v2.8.0

func (m *TestItem) ValidateAll() error

ValidateAll checks the field values on TestItem with the rules defined in the proto definition for this message. If any rules are violated, the result is a list of violation errors wrapped in TestItemMultiError, or nil if none found.

type TestItemMultiError added in v2.8.0

type TestItemMultiError []error

TestItemMultiError is an error wrapping multiple validation errors returned by TestItem.ValidateAll() if the designated constraints aren't met.

func (TestItemMultiError) AllErrors added in v2.8.0

func (m TestItemMultiError) AllErrors() []error

AllErrors returns a list of validation violation errors.

func (TestItemMultiError) Error added in v2.8.0

func (m TestItemMultiError) Error() string

Error returns a concatenation of all the error messages it wraps.

type TestItemValidationError added in v2.8.0

type TestItemValidationError struct {
	// contains filtered or unexported fields
}

TestItemValidationError is the validation error returned by TestItem.Validate if the designated constraints aren't met.

func (TestItemValidationError) Cause added in v2.8.0

func (e TestItemValidationError) Cause() error

Cause function returns cause value.

func (TestItemValidationError) Error added in v2.8.0

func (e TestItemValidationError) Error() string

Error satisfies the builtin error interface

func (TestItemValidationError) ErrorName added in v2.8.0

func (e TestItemValidationError) ErrorName() string

ErrorName returns error name.

func (TestItemValidationError) Field added in v2.8.0

func (e TestItemValidationError) Field() string

Field function returns field value.

func (TestItemValidationError) Key added in v2.8.0

func (e TestItemValidationError) Key() bool

Key function returns key value.

func (TestItemValidationError) Reason added in v2.8.0

func (e TestItemValidationError) Reason() string

Reason function returns reason value.
