Documentation
¶
Index ¶
- Constants
- Variables
- func CreatePostgresSchema(ctx context.Context, sqlTx *pachsql.Tx) error
- func IsErrExists(err error) bool
- func IsErrNotFound(err error) bool
- func IsErrNotUnique(err error) bool
- func NewDryrunSQLTx(ctx context.Context, db *pachsql.DB, apply func(*pachsql.Tx) error) error
- func NewDryrunSTM(ctx context.Context, c *v3.Client, apply func(STM) error) error
- func NewSTM(ctx context.Context, c *v3.Client, apply func(STM) error) (*v3.TxnResponse, error)
- func SetupPostgresCollections(ctx context.Context, sqlTx *pachsql.Tx, collections ...PostgresCollection) error
- func SetupPostgresV0(ctx context.Context, sqlTx *pachsql.Tx) error
- type ErrExists
- type ErrNotFound
- type ErrNotUnique
- type EtcdCollection
- type EtcdReadOnlyCollection
- type EtcdReadWriteCollection
- type Index
- type Notification
- type Notifier
- type Option
- func WithExistsMessage(format func(interface{}) string) Option
- func WithKeyCheck(check func(string) error) Option
- func WithKeyGen(gen func(interface{}) (string, error)) Option
- func WithListBufferCapacity(cap int) Option
- func WithNotFoundMessage(format func(interface{}) string) Option
- func WithPutHook(putHook func(*pachsql.Tx, interface{}) error) Option
- type Options
- type PostgresCollection
- type PostgresListener
- type PostgresReadOnlyCollection
- type PostgresReadWriteCollection
- type ReadOnlyCollection
- type ReadWriteCollection
- type Renewer
- type STM
- type SortOrder
- type SortTarget
- type TestItem
- func (*TestItem) Descriptor() ([]byte, []int)deprecated
- func (x *TestItem) GetData() string
- func (x *TestItem) GetId() string
- func (x *TestItem) GetValue() string
- func (x *TestItem) MarshalLogObject(enc zapcore.ObjectEncoder) error
- func (*TestItem) ProtoMessage()
- func (x *TestItem) ProtoReflect() protoreflect.Message
- func (x *TestItem) Reset()
- func (x *TestItem) String() string
Constants ¶
// ChannelBufferSize is the untyped constant 1000.
// NOTE(review): presumably the buffer capacity used for this package's
// channels — confirm against its call sites.
const ChannelBufferSize = 1000
// DefaultPrefix is the string constant "pachyderm/1.7.0".
// NOTE(review): presumably the default key prefix under which collections
// are stored — confirm against its call sites.
const DefaultPrefix string = "pachyderm/1.7.0"
defaultLimit was experimentally determined to be the highest value that could work (It gets scaled down for specific collections if they trip the max-message size.)
Variables ¶
var (
	// ErrNotClaimed is an error used to indicate that a different requester beat
	// the current requester to a key claim.
	ErrNotClaimed = errors.New("NOT_CLAIMED")

	// DefaultTTL is the default time-to-live value, 30.
	// NOTE(review): presumably expressed in seconds, as the TTL methods in
	// this package are — confirm.
	DefaultTTL = int64(30)
)
// Re-exports of the etcd package's sort targets and sort orders under this
// package's names.
var (
	// Sort targets.
	SortByCreateRevision = etcd.SortByCreateRevision
	SortByModRevision = etcd.SortByModRevision
	SortByKey = etcd.SortByKey
	SortNone = etcd.SortNone
	// Sort orders.
	SortAscend = etcd.SortAscend
	SortDescend = etcd.SortDescend
)
Hoist these consts so users don't have to import etcd
// File_internal_collection_test_proto holds a protoreflect.FileDescriptor.
// NOTE(review): the name suggests it is the generated descriptor for
// internal/collection/test.proto — confirm against the generator output.
var File_internal_collection_test_proto protoreflect.FileDescriptor
Functions ¶
func IsErrExists ¶
IsErrExists determines if an error is an ErrExists error
func IsErrNotFound ¶
IsErrNotFound determines if an error is an ErrNotFound error
func IsErrNotUnique ¶
IsErrNotUnique determines if an error is an ErrNotUnique error
func NewDryrunSQLTx ¶
NewDryrunSQLTx is identical to NewSQLTx except it will always roll back the transaction instead of committing it.
func NewDryrunSTM ¶
NewDryrunSTM initiates a new STM operation, but the final commit is skipped. It uses a serializable model.
Types ¶
type ErrExists ¶
ErrExists indicates that a key was found to exist when it was expected not to.
func (ErrExists) GRPCStatus ¶
type ErrNotFound ¶
ErrNotFound indicates that a key was not found when it was expected to exist.
func (ErrNotFound) Error ¶
func (err ErrNotFound) Error() string
func (ErrNotFound) GRPCStatus ¶
func (e ErrNotFound) GRPCStatus() *status.Status
func (ErrNotFound) Is ¶
func (err ErrNotFound) Is(other error) bool
type ErrNotUnique ¶
ErrNotUnique indicates that an indexed query expected to have exactly one result but had more than one result.
func (ErrNotUnique) Error ¶
func (err ErrNotUnique) Error() string
func (ErrNotUnique) Is ¶
func (err ErrNotUnique) Is(other error) bool
type EtcdCollection ¶
// EtcdCollection hands out transactional read-write and read-only views of a
// collection, and additionally supports key claims and renewer-scoped
// callbacks.
type EtcdCollection interface {
	// ReadWrite enables reads and writes on a collection in a
	// transactional manner. Specifically, all writes are applied
	// atomically, and writes are only applied if reads have not been
	// invalidated at the end of the transaction. Basically, it's
	// software transactional memory. See this blog post for details:
	// https://coreos.com/blog/transactional-memory-with-etcd3.html
	ReadWrite(stm STM) EtcdReadWriteCollection
	// ReadOnly returns a read-only view of the collection for ctx. Prefer it
	// over ReadWrite for read-only operations, for better performance.
	ReadOnly(ctx context.Context) EtcdReadOnlyCollection
	// Claim attempts to claim a key and run the passed in callback with
	// the context for the claim.
	Claim(ctx context.Context, key string, val proto.Message, f func(context.Context) error) error
	// WithRenewer takes a callback that receives a context and a *Renewer.
	// NOTE(review): presumably cb is invoked once and the Renewer is only
	// valid for the duration of cb — confirm against the implementation.
	WithRenewer(ctx context.Context, cb func(context.Context, *Renewer) error) error
}
type EtcdReadOnlyCollection ¶
// EtcdReadOnlyCollection extends ReadOnlyCollection with etcd-specific
// read operations (TTL lookup and revision-based counting).
type EtcdReadOnlyCollection interface {
	ReadOnlyCollection
	// TTL returns the number of seconds that 'key' will continue to exist in the
	// collection, or '0' if 'key' will remain in the collection indefinitely
	// TODO: TTL might be unused
	TTL(key string) (int64, error)
	// CountRev returns the number of items in the collection at a specific
	// revision, it's only in EtcdReadOnlyCollection because only etcd has
	// revs.
	// NOTE(review): two int64 values are returned; presumably the count and a
	// revision, but the order is not visible here — confirm.
	// TODO: CountRev might be unused
	CountRev(int64) (int64, int64, error)
}
type EtcdReadWriteCollection ¶
// EtcdReadWriteCollection extends ReadWriteCollection with etcd-specific
// operations: TTL lookup, TTL-bounded puts, and prefix deletion.
type EtcdReadWriteCollection interface {
	ReadWriteCollection
	// TTL returns the amount of time that 'key' will continue to exist in the
	// collection, or '0' if 'key' will remain in the collection indefinitely
	// NOTE(review): presumably in seconds, as PutTTL's ttl is — confirm.
	TTL(key string) (int64, error)
	// PutTTL is the same as Put except that the object is removed after
	// TTL seconds.
	// WARNING: using PutTTL with a collection that has secondary indices
	// can result in inconsistency, as the indices are removed at roughly
	// but not exactly the same time as the documents.
	PutTTL(key string, val proto.Message, ttl int64) error
	// DeleteAllPrefix takes a key prefix.
	// NOTE(review): presumably deletes every key in the collection that starts
	// with prefix — confirm against the implementation.
	DeleteAllPrefix(prefix string) error
}
type Index ¶
// Index describes a secondary index on a collection by a name and a function
// that derives a string from a stored message.
type Index struct {
	// Name identifies the index.
	// NOTE(review): presumably the indexed field's name, used to build the
	// index path — confirm.
	Name string
	// Extract returns a string for the given message.
	// NOTE(review): presumably the value under which val is indexed — confirm.
	Extract func(val proto.Message) string
	// contains filtered or unexported fields
}
Index specifies a secondary index on a collection.
Indexes are created in a transactional manner thanks to etcd's transactional support.
A secondary index for collection "foo" on field "bar" will reside under the path `/foo__index_bar`. Each item under the path is in turn a directory whose name is the value of the field `bar`. For instance, if you have an object in collection `foo` whose `bar` field is `test`, then you will see a directory at path `/foo__index_bar/test`.
Under that directory, you have keys that point to items in the collection. For instance, if the aforementioned object has the key "buzz", then you will see an item at `/foo__index_bar/test/buzz`. The value of this item is empty. Thus, to get all items in collection `foo` whose value for field `bar` is `test`, we issue a query for all items under `/foo__index_bar/test`.
type Notification ¶
// Notification is an alias for pq.Notification, so that users of this package
// can name the type without referring to pq directly.
type Notification = pq.Notification
type Notifier ¶
// Notifier is a uniquely identified recipient of notifications and errors
// for a single channel. It is the type accepted by PostgresListener's
// Register and Unregister methods.
type Notifier interface {
	// ID is a unique identifier for the notifier.
	ID() string
	// Channel is the channel that this notifier should receive notifications for.
	Channel() string
	// Notify sends a notification to the notifier.
	Notify(*Notification)
	// Error sends an error to the notifier.
	Error(error)
}
type Option ¶
// Option is a function that receives a *postgresCollection; options are
// passed variadically to NewPostgresCollection.
// NOTE(review): presumably each Option mutates the collection's configuration
// during construction — confirm.
type Option func(collection *postgresCollection)
func WithExistsMessage ¶
func WithKeyCheck ¶
func WithKeyGen ¶
func WithListBufferCapacity ¶
func WithNotFoundMessage ¶
type Options ¶
// Options carries the sort target, sort order, and an optional limit for
// operations that iterate over a collection.
type Options struct {
	// Target is the SortTarget to sort by.
	Target SortTarget
	// Order is the SortOrder to apply.
	Order SortOrder
	// Limit is only implemented for postgres collections
	// NOTE(review): presumably caps the number of results returned; the
	// meaning of a zero value is not visible here — confirm.
	Limit int
}
Options are the sort options when iterating through etcd key/values. Currently implemented sort targets are CreateRevision and ModRevision.
func DefaultOptions ¶
func DefaultOptions() *Options
DefaultOptions are the default sort options when iterating through etcd key/values.
type PostgresCollection ¶
// PostgresCollection hands out read-write views tied to a *pachsql.Tx and
// read-only views tied to a context.
type PostgresCollection interface {
	// ReadWrite enables reads and writes on a collection in a
	// transactional manner. Specifically, all writes are applied
	// atomically, and writes are only applied if reads have not been
	// invalidated at the end of the transaction. Basically, it's
	// software transactional memory. See this blog post for details:
	// https://coreos.com/blog/transactional-memory-with-etcd3.html
	// NOTE(review): this wording matches EtcdCollection.ReadWrite and links
	// to an etcd article, while this method takes a *pachsql.Tx — confirm the
	// description holds for postgres transactions.
	ReadWrite(tx *pachsql.Tx) PostgresReadWriteCollection
	// ReadOnly returns a read-only view of the collection for ctx. Prefer it
	// over ReadWrite for read-only operations, for better performance.
	ReadOnly(ctx context.Context) PostgresReadOnlyCollection
}
func NewPostgresCollection ¶
func NewPostgresCollection(name string, db *pachsql.DB, listener PostgresListener, template proto.Message, indexes []*Index, opts ...Option) PostgresCollection
NewPostgresCollection creates a new collection backed by postgres.
type PostgresListener ¶
// PostgresListener delivers notifications to registered Notifiers and can be
// closed when no longer needed.
type PostgresListener interface {
	// Register registers a notifier with the postgres listener.
	// A notifier will receive notifications for the channel it is associated
	// with while registered with the postgres listener.
	Register(Notifier) error
	// Unregister unregisters a notifier with the postgres listener.
	// A notifier will no longer receive notifications when this call completes.
	Unregister(Notifier) error
	// Close closes the postgres listener.
	Close() error
}
func NewPostgresListener ¶
func NewPostgresListener(dsn string) PostgresListener
type PostgresReadOnlyCollection ¶
// PostgresReadOnlyCollection extends ReadOnlyCollection with additional
// index-based lookups.
type PostgresReadOnlyCollection interface {
	ReadOnlyCollection
	// GetRevByIndex has the same parameters as GetByIndex except that the
	// callback f also receives an int64.
	// NOTE(review): presumably that int64 is the row's revision — confirm.
	GetRevByIndex(index *Index, indexVal string, val proto.Message, opts *Options, f func(string, int64) error) error
	// GetUniqueByIndex is identical to GetByIndex except it is an error if
	// exactly one row is not found.
	// TODO: decide if we should merge this with GetByIndex and use an `Options`.
	GetUniqueByIndex(index *Index, indexVal string, val proto.Message) error
}
type PostgresReadWriteCollection ¶
// PostgresReadWriteCollection extends ReadWriteCollection with index-based
// deletes and lookups, and with listing.
type PostgresReadWriteCollection interface {
	ReadWriteCollection
	// DeleteByIndex takes an index and an index value.
	// NOTE(review): presumably deletes every row whose value for index equals
	// indexVal — confirm against the implementation.
	DeleteByIndex(index *Index, indexVal string) error
	// GetByIndex can have a large impact on database contention if used to retrieve
	// a large number of rows. Consider using a read-only collection if possible
	GetByIndex(index *Index, indexVal string, val proto.Message, opts *Options, f func(string) error) error
	// GetUniqueByIndex is identical to GetByIndex except it is an error if
	// exactly one row is not found.
	// TODO: decide if we should merge this with GetByIndex and use an `Options`.
	GetUniqueByIndex(index *Index, indexVal string, val proto.Message) error
	// NOTE: List scans the collection over multiple queries,
	// making this method susceptible to inconsistent reads
	List(val proto.Message, opts *Options, f func(string) error) error
}
type ReadOnlyCollection ¶
// ReadOnlyCollection groups the read-side operations of a collection:
// point lookups, index lookups, listing, counting, and watching.
// NOTE(review): the per-method notes below are inferred from signatures
// only — confirm against the implementations.
type ReadOnlyCollection interface {
	// Get takes a key and a message; presumably the stored value for key is
	// decoded into val.
	Get(key interface{}, val proto.Message) error
	// GetByIndex takes an index, an index value, sort options, and a callback
	// that receives a string (presumably the key of each matching item).
	GetByIndex(index *Index, indexVal string, val proto.Message, opts *Options, f func(string) error) error
	// List takes sort options and a callback that receives a string
	// (presumably the key of each item).
	List(val proto.Message, opts *Options, f func(string) error) error
	// ListRev has the same parameters as List except that the callback also
	// receives an int64 (presumably a revision).
	ListRev(val proto.Message, opts *Options, f func(string, int64) error) error
	// Count returns an int64, presumably the number of items in the collection.
	Count() (int64, error)
	// Watch returns a watch.Watcher configured by opts.
	Watch(opts ...watch.Option) (watch.Watcher, error)
	// WatchF takes a callback that receives *watch.Event values, instead of
	// returning a watch.Watcher.
	WatchF(f func(*watch.Event) error, opts ...watch.Option) error
	// WatchOne is the single-key form of Watch.
	WatchOne(key interface{}, opts ...watch.Option) (watch.Watcher, error)
	// WatchOneF is the single-key form of WatchF.
	WatchOneF(key interface{}, f func(*watch.Event) error, opts ...watch.Option) error
	// WatchByIndex is the form of Watch that takes an index and an index value.
	WatchByIndex(index *Index, val string, opts ...watch.Option) (watch.Watcher, error)
	// WatchByIndexF is the form of WatchF that takes an index and an index value.
	WatchByIndexF(index *Index, val string, f func(*watch.Event) error, opts ...watch.Option) error
}
ReadOnlyCollection is a collection interface that only supports read ops.
type ReadWriteCollection ¶
// ReadWriteCollection groups the operations available on a collection inside
// a read-write context: get, put, update, upsert, create, and delete.
type ReadWriteCollection interface {
	// Get takes a key and a message.
	// NOTE(review): presumably the stored value for key is decoded into val —
	// confirm.
	Get(key interface{}, val proto.Message) error
	// Put takes a key and a message.
	// NOTE(review): presumably writes val under key whether or not key already
	// exists — confirm.
	Put(key interface{}, val proto.Message) error
	// Update reads the current value associated with 'key', calls 'f' to update
	// the value, and writes the new value back to the collection. 'key' must be
	// present in the collection, or a 'Not Found' error is returned
	Update(key interface{}, val proto.Message, f func() error) error
	// Upsert is like Update but 'key' is not required to be present
	Upsert(key interface{}, val proto.Message, f func() error) error
	// Create takes a key and a message.
	// NOTE(review): presumably writes val under key and fails if key already
	// exists — confirm.
	Create(key interface{}, val proto.Message) error
	// Delete takes a key.
	// NOTE(review): presumably removes key from the collection; behavior when
	// key is absent is not visible here — confirm.
	Delete(key interface{}) error
	// DeleteAll takes no arguments.
	// NOTE(review): presumably removes every item in the collection — confirm.
	DeleteAll() error
}
ReadWriteCollection is a collection interface that supports read, write, and delete operations.
type STM ¶
// STM is the set of operations available inside a software-transactional-
// memory transaction: reads, writes (optionally with a TTL or lease),
// deletes, and safe-put bookkeeping.
type STM interface {
	// Get returns the value for a key and inserts the key in the txn's read set.
	// If Get fails, it aborts the transaction with an error, never returning.
	// NOTE(review): the signature does return an error, which conflicts with
	// "never returning" above — confirm which behavior callers should expect.
	Get(key string) (string, error)
	// Put adds a value for a key to the write set.
	// NOTE(review): presumably ttl is in seconds and ptr is the value checked
	// by IsSafePut — confirm.
	Put(key, val string, ttl int64, ptr uintptr) error
	// PutLease has the same parameters as Put except that it takes a
	// v3.LeaseID in place of a ttl.
	PutLease(key, val string, lease v3.LeaseID, ptr uintptr) error
	// PutIgnoreLease has the same parameters as Put except that it takes
	// neither a ttl nor a lease.
	// NOTE(review): presumably keeps whatever lease the key already has —
	// confirm.
	PutIgnoreLease(key, val string, ptr uintptr) error
	// Del deletes a key.
	Del(key string)
	// TTL returns the remaining time to live for 'key', or 0 if 'key' has no TTL
	TTL(key string) (int64, error)
	// DelAll deletes all keys with the given prefix
	// Note that the current implementation of DelAll is incomplete.
	// To use DelAll safely, do not issue any Get/Put operations after
	// DelAll is called.
	DelAll(key string)
	// Context returns a context.Context.
	// NOTE(review): presumably the context the transaction runs under — confirm.
	Context() context.Context
	// SetSafePutCheck sets the bit pattern to check if a put is safe.
	SetSafePutCheck(key string, ptr uintptr)
	// IsSafePut checks against the bit pattern for a key to see if it is safe to put.
	IsSafePut(key string, ptr uintptr) bool
	// contains filtered or unexported methods
}
STM is an interface for software transactional memory.
type SortTarget ¶
// SortTarget is an alias for etcd.SortTarget, so that users of this package
// can name the type without importing etcd.
type SortTarget = etcd.SortTarget
type TestItem ¶
// TestItem is a protobuf (proto3) message with three string fields, as
// described by its struct tags.
// NOTE(review): the name suggests it exists for this package's tests — confirm.
type TestItem struct {
	// Id is protobuf field 1 ("id").
	Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
	// Value is protobuf field 2 ("value").
	Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
	// Data is protobuf field 3 ("data").
	Data string `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"`
	// contains filtered or unexported fields
}
func (*TestItem) Descriptor
deprecated
func (*TestItem) MarshalLogObject ¶
func (x *TestItem) MarshalLogObject(enc zapcore.ObjectEncoder) error
func (*TestItem) ProtoMessage ¶
func (*TestItem) ProtoMessage()
func (*TestItem) ProtoReflect ¶ added in v2.7.0
func (x *TestItem) ProtoReflect() protoreflect.Message