Documentation ¶
Index ¶
- Constants
- Variables
- func CreatePostgresSchema(ctx context.Context, sqlTx *pachsql.Tx) error
- func IsErrExists(err error) bool
- func IsErrNotFound(err error) bool
- func IsErrNotUnique(err error) bool
- func NewDryrunSQLTx(ctx context.Context, db *pachsql.DB, apply func(*pachsql.Tx) error) error
- func NewDryrunSTM(ctx context.Context, c *v3.Client, apply func(STM) error) error
- func NewSTM(ctx context.Context, c *v3.Client, apply func(STM) error) (*v3.TxnResponse, error)
- func SetupPostgresCollections(ctx context.Context, sqlTx *pachsql.Tx, collections ...PostgresCollection) error
- func SetupPostgresV0(ctx context.Context, sqlTx *pachsql.Tx) error
- type ErrExists
- type ErrNotFound
- type ErrNotUnique
- type EtcdCollection
- type EtcdReadOnlyCollection
- type EtcdReadWriteCollection
- type Index
- type Notification
- type Notifier
- type Option
- func WithExistsMessage(format func(interface{}) string) Option
- func WithKeyCheck(check func(string) error) Option
- func WithKeyGen(gen func(interface{}) (string, error)) Option
- func WithListBufferCapacity(cap int) Option
- func WithNotFoundMessage(format func(interface{}) string) Option
- func WithPutHook(putHook func(*pachsql.Tx, interface{}) error) Option
- type Options
- type PostgresCollection
- type PostgresListener
- type PostgresReadOnlyCollection
- type PostgresReadWriteCollection
- type ReadOnlyCollection
- type ReadWriteCollection
- type Renewer
- type STM
- type SortOrder
- type SortTarget
- type TestItem
- func (*TestItem) Descriptor() ([]byte, []int)deprecated
- func (x *TestItem) GetData() string
- func (x *TestItem) GetId() string
- func (x *TestItem) GetValue() string
- func (x *TestItem) MarshalLogObject(enc zapcore.ObjectEncoder) error
- func (*TestItem) ProtoMessage()
- func (x *TestItem) ProtoReflect() protoreflect.Message
- func (x *TestItem) Reset()
- func (x *TestItem) String() string
Constants ¶
const (
ChannelBufferSize = 1000
)
const (
DefaultPrefix string = "pachyderm/1.7.0"
)
defaultLimit was experimentally determined to be the highest value that could work (It gets scaled down for specific collections if they trip the max-message size.)
Variables ¶
var ( // ErrNotClaimed is an error used to indicate that a different requester beat // the current requester to a key claim. ErrNotClaimed = errors.New("NOT_CLAIMED") DefaultTTL = int64(30) )
var ( SortByCreateRevision = etcd.SortByCreateRevision SortByModRevision = etcd.SortByModRevision SortByKey = etcd.SortByKey SortNone = etcd.SortNone SortAscend = etcd.SortAscend SortDescend = etcd.SortDescend )
Hoist these consts so users don't have to import etcd
var File_internal_collection_test_proto protoreflect.FileDescriptor
Functions ¶
func IsErrExists ¶
IsErrExists determines if an error is an ErrExists error
func IsErrNotFound ¶
IsErrNotFound determines if an error is an ErrNotFound error
func IsErrNotUnique ¶
IsErrNotUnique determines if an error is an ErrNotUnique error
func NewDryrunSQLTx ¶
NewDryrunSQLTx is identical to NewSQLTx except it will always roll back the transaction instead of committing it.
func NewDryrunSTM ¶
NewDryrunSTM intiates a new STM operation, but the final commit is skipped. It uses a serializable model.
Types ¶
type ErrExists ¶
ErrExists indicates that a key was found to exist when it was expected not to.
func (ErrExists) GRPCStatus ¶
type ErrNotFound ¶
ErrNotFound indicates that a key was not found when it was expected to exist.
func (ErrNotFound) Error ¶
func (err ErrNotFound) Error() string
func (ErrNotFound) GRPCStatus ¶
func (e ErrNotFound) GRPCStatus() *status.Status
func (ErrNotFound) Is ¶
func (err ErrNotFound) Is(other error) bool
type ErrNotUnique ¶
ErrNotUnique indicates that an indexed query expected to have exactly one result but had more than one result.
func (ErrNotUnique) Error ¶
func (err ErrNotUnique) Error() string
func (ErrNotUnique) Is ¶
func (err ErrNotUnique) Is(other error) bool
type EtcdCollection ¶
type EtcdCollection interface { // ReadWrite enables reads and writes on a collection in a // transactional manner. Specifically, all writes are applied // atomically, and writes are only applied if reads have not been // invalidated at the end of the transaction. Basically, it's // software transactional memory. See this blog post for details: // https://coreos.com/blog/transactional-memory-with-etcd3.html ReadWrite(stm STM) EtcdReadWriteCollection // For read-only operations, use the ReadOnly for better performance ReadOnly(ctx context.Context) EtcdReadOnlyCollection // Claim attempts to claim a key and run the passed in callback with // the context for the claim. Claim(ctx context.Context, key string, val proto.Message, f func(context.Context) error) error WithRenewer(ctx context.Context, cb func(context.Context, *Renewer) error) error }
type EtcdReadOnlyCollection ¶
type EtcdReadOnlyCollection interface { ReadOnlyCollection // TTL returns the number of seconds that 'key' will continue to exist in the // collection, or '0' if 'key' will remain in the collection indefinitely // TODO: TTL might be unused TTL(key string) (int64, error) // CountRev returns the number of items in the collection at a specific // revision, it's only in EtcdReadOnlyCollection because only etcd has // revs. // TODO: CountRev might be unused CountRev(int64) (int64, int64, error) }
type EtcdReadWriteCollection ¶
type EtcdReadWriteCollection interface { ReadWriteCollection // TTL returns the amount of time that 'key' will continue to exist in the // collection, or '0' if 'key' will remain in the collection indefinitely TTL(key string) (int64, error) // PutTTL is the same as Put except that the object is removed after // TTL seconds. // WARNING: using PutTTL with a collection that has secondary indices // can result in inconsistency, as the indices are removed at roughly // but not exactly the same time as the documents. PutTTL(key string, val proto.Message, ttl int64) error DeleteAllPrefix(prefix string) error }
type Index ¶
type Index struct { Name string Extract func(val proto.Message) string // contains filtered or unexported fields }
Index specifies a secondary index on a collection.
Indexes are created in a transactional manner thanks to etcd's transactional support.
A secondary index for collection "foo" on field "bar" will reside under the path `/foo__index_bar`. Each item under the path is in turn a directory whose name is the value of the field `bar`. For instance, if you have a object in collection `foo` whose `bar` field is `test`, then you will see a directory at path `/foo__index_bar/test`.
Under that directory, you have keys that point to items in the collection. For instance, if the aforementioned object has the key "buzz", then you will see an item at `/foo__index_bar/test/buzz`. The value of this item is empty. Thus, to get all items in collection `foo` whose values of field `bar` is `test`, we issue a query for all items under `foo__index_bar/test`.
type Notification ¶
type Notification = pq.Notification
type Notifier ¶
type Notifier interface { // ID is a unique identifier for the notifier. ID() string // Channel is the channel that this notifier should receive notifications for. Channel() string // Notify sends a notification to the notifier. Notify(*Notification) // Error sends an error to the notifier. Error(error) }
type Option ¶
type Option func(collection *postgresCollection)
func WithExistsMessage ¶
func WithKeyCheck ¶
func WithKeyGen ¶
func WithListBufferCapacity ¶
func WithNotFoundMessage ¶
type Options ¶
type Options struct { Target SortTarget Order SortOrder // Limit is only implemented for postgres collections Limit int }
Options are the sort options when iterating through etcd key/values. Currently implemented sort targets are CreateRevision and ModRevision.
func DefaultOptions ¶
func DefaultOptions() *Options
DefaultOptions are the default sort options when iterating through etcd key/values.
type PostgresCollection ¶
type PostgresCollection interface { // ReadWrite enables reads and writes on a collection in a // transactional manner. Specifically, all writes are applied // atomically, and writes are only applied if reads have not been // invalidated at the end of the transaction. Basically, it's // software transactional memory. See this blog post for details: // https://coreos.com/blog/transactional-memory-with-etcd3.html ReadWrite(tx *pachsql.Tx) PostgresReadWriteCollection // For read-only operations, use the ReadOnly for better performance ReadOnly(ctx context.Context) PostgresReadOnlyCollection }
func NewPostgresCollection ¶
func NewPostgresCollection(name string, db *pachsql.DB, listener PostgresListener, template proto.Message, indexes []*Index, opts ...Option) PostgresCollection
NewPostgresCollection creates a new collection backed by postgres.
type PostgresListener ¶
type PostgresListener interface { // Register registers a notifier with the postgres listener. // A notifier will receive notifications for the channel it is associated // with while registered with the postgres listener. Register(Notifier) error // Unregister unregisters a notifier with the postgres listener. // A notifier will no longer receive notifications when this call completes. Unregister(Notifier) error // Close closes the postgres listener. Close() error }
func NewPostgresListener ¶
func NewPostgresListener(dsn string) PostgresListener
type PostgresReadOnlyCollection ¶
type PostgresReadOnlyCollection interface { ReadOnlyCollection GetRevByIndex(index *Index, indexVal string, val proto.Message, opts *Options, f func(string, int64) error) error // GetUniqueByIndex is identical to GetByIndex except it is an error if // exactly one row is not found. // TODO: decide if we should merge this with GetByIndex and use an `Options`. GetUniqueByIndex(index *Index, indexVal string, val proto.Message) error }
type PostgresReadWriteCollection ¶
type PostgresReadWriteCollection interface { ReadWriteCollection DeleteByIndex(index *Index, indexVal string) error // GetByIndex can have a large impact on database contention if used to retrieve // a large number of rows. Consider using a read-only collection if possible GetByIndex(index *Index, indexVal string, val proto.Message, opts *Options, f func(string) error) error // GetUniqueByIndex is identical to GetByIndex except it is an error if // exactly one row is not found. // TODO: decide if we should merge this with GetByIndex and use an `Options`. GetUniqueByIndex(index *Index, indexVal string, val proto.Message) error // NOTE: List scans the collection over multiple queries, // making this method susceptible to inconsistent reads List(val proto.Message, opts *Options, f func(string) error) error }
type ReadOnlyCollection ¶
type ReadOnlyCollection interface { Get(key interface{}, val proto.Message) error GetByIndex(index *Index, indexVal string, val proto.Message, opts *Options, f func(string) error) error List(val proto.Message, opts *Options, f func(string) error) error ListRev(val proto.Message, opts *Options, f func(string, int64) error) error Count() (int64, error) Watch(opts ...watch.Option) (watch.Watcher, error) WatchF(f func(*watch.Event) error, opts ...watch.Option) error WatchOne(key interface{}, opts ...watch.Option) (watch.Watcher, error) WatchOneF(key interface{}, f func(*watch.Event) error, opts ...watch.Option) error WatchByIndex(index *Index, val string, opts ...watch.Option) (watch.Watcher, error) WatchByIndexF(index *Index, val string, f func(*watch.Event) error, opts ...watch.Option) error }
ReadOnlyCollection is a collection interface that only supports read ops.
type ReadWriteCollection ¶
type ReadWriteCollection interface { Get(key interface{}, val proto.Message) error Put(key interface{}, val proto.Message) error // Update reads the current value associated with 'key', calls 'f' to update // the value, and writes the new value back to the collection. 'key' must be // present in the collection, or a 'Not Found' error is returned Update(key interface{}, val proto.Message, f func() error) error // Upsert is like Update but 'key' is not required to be present Upsert(key interface{}, val proto.Message, f func() error) error Create(key interface{}, val proto.Message) error Delete(key interface{}) error DeleteAll() error }
ReadWriteCollection is a collection interface that supports read,write and delete operations.
type STM ¶
type STM interface { // Get returns the value for a key and inserts the key in the txn's read set. // If Get fails, it aborts the transaction with an error, never returning. Get(key string) (string, error) // Put adds a value for a key to the write set. Put(key, val string, ttl int64, ptr uintptr) error PutLease(key, val string, lease v3.LeaseID, ptr uintptr) error PutIgnoreLease(key, val string, ptr uintptr) error // Del deletes a key. Del(key string) // TTL returns the remaining time to live for 'key', or 0 if 'key' has no TTL TTL(key string) (int64, error) // DelAll deletes all keys with the given prefix // Note that the current implementation of DelAll is incomplete. // To use DelAll safely, do not issue any Get/Put operations after // DelAll is called. DelAll(key string) Context() context.Context // SetSafePutCheck sets the bit pattern to check if a put is safe. SetSafePutCheck(key string, ptr uintptr) // IsSafePut checks against the bit pattern for a key to see if it is safe to put. IsSafePut(key string, ptr uintptr) bool // contains filtered or unexported methods }
STM is an interface for software transactional memory.
type SortTarget ¶
type SortTarget = etcd.SortTarget
type TestItem ¶
type TestItem struct { Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` Data string `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"` // contains filtered or unexported fields }
func (*TestItem) Descriptor
deprecated
func (*TestItem) MarshalLogObject ¶
func (x *TestItem) MarshalLogObject(enc zapcore.ObjectEncoder) error
func (*TestItem) ProtoMessage ¶
func (*TestItem) ProtoMessage()
func (*TestItem) ProtoReflect ¶ added in v2.7.0
func (x *TestItem) ProtoReflect() protoreflect.Message