store

package
v0.0.0-...-ff5f600 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 30, 2016 License: BSD-3-Clause Imports: 5 Imported by: 0

Documentation

Overview

Package store defines the API for the syncbase storage engine. Currently, this API and its implementations are meant to be internal.

Constants

const (
	ErrMsgClosedStore     = "closed store"
	ErrMsgAbortedSnapshot = "aborted snapshot"
	ErrMsgCanceledStream  = "canceled stream"
	ErrMsgCommittedTxn    = "already called commit"
	ErrMsgAbortedTxn      = "already called abort"
	ErrMsgExpiredTxn      = "expired transaction"
)

TODO(sadovsky): Maybe define verrors for these.

const EngineForTest = "leveldb"

Variables

var (

	// ConcurrentTransaction means that the current transaction failed to commit
	// because its read set was invalidated by some other transaction.
	ErrConcurrentTransaction = verror.Register("v.io/x/ref/services/syncbase/store.ConcurrentTransaction", verror.NoRetry, "{1:}{2:} Concurrent transaction{:_}")
	// UnknownKey means the given key does not exist in the store.
	ErrUnknownKey = verror.Register("v.io/x/ref/services/syncbase/store.UnknownKey", verror.NoRetry, "{1:}{2:} Unknown key{:_}")
)

Functions

func ConvertError

func ConvertError(err error) error

ConvertError returns a copy of the verror, appending the current stack to it.

func CopyBytes

func CopyBytes(dst, src []byte) []byte

CopyBytes copies elements from a source slice into a destination slice. The returned slice may be a sub-slice of dst if dst was large enough to hold src. Otherwise, a newly allocated slice will be returned. TODO(rogulenko): add some tests.
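
The buffer-reuse contract described above can be illustrated with a minimal sketch. The function copyBytes below is a hypothetical stand-in written only from the documented behavior, not the package's actual code, and it assumes that "large enough" refers to the capacity of dst.

```go
package main

import "fmt"

// copyBytes is a hypothetical stand-in for store.CopyBytes, written from its
// documented contract only: reuse dst when it can hold src, else allocate.
// Assumption: "large enough" is judged by cap(dst), not len(dst).
func copyBytes(dst, src []byte) []byte {
	if cap(dst) < len(src) {
		dst = make([]byte, len(src)) // dst is too small: allocate a new slice
	}
	dst = dst[:len(src)] // result is a sub-slice of dst (or of the new slice)
	copy(dst, src)
	return dst
}

func main() {
	buf := make([]byte, 0, 8)

	// Fits in buf: the result shares buf's backing array.
	out := copyBytes(buf, []byte("abc"))
	fmt.Println(string(out), cap(out))

	// Does not fit: the result is backed by a newly allocated array.
	out = copyBytes(buf, []byte("a longer value"))
	fmt.Println(string(out), len(out))
}
```

Callers that pass a scratch buffer in a loop can avoid one allocation per call this way, as long as they always use the returned slice rather than the buffer they passed in.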

func Delete

func Delete(ctx *context.T, st StoreWriter, k string) error

Delete does st.Delete(k) and wraps the returned error.

func Exists

func Exists(ctx *context.T, st StoreReader, k string) (bool, error)

Exists returns true if the key exists in the store. TODO(rdaoud): for now it only bypasses the Get's VOM decode step. It should be optimized further by adding a st.Exists(k) API and letting each implementation do its best to reduce data fetching in its key lookup.
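
One way to build such an existence check on top of a raw Get is sketched below. The names reader, mapReader, errUnknownKey and exists are hypothetical stand-ins; the real package reports unknown keys through a verror with the ErrUnknownKey ID rather than a plain sentinel error.

```go
package main

import (
	"errors"
	"fmt"
)

// errUnknownKey is a hypothetical stand-in for store.ErrUnknownKey.
var errUnknownKey = errors.New("unknown key")

// reader is a reduced form of the documented StoreReader interface.
type reader interface {
	Get(key, valbuf []byte) ([]byte, error)
}

// mapReader is a toy in-memory reader used only for this example.
type mapReader map[string][]byte

func (m mapReader) Get(key, valbuf []byte) ([]byte, error) {
	v, ok := m[string(key)]
	if !ok {
		return valbuf, errUnknownKey // valbuf is returned unchanged
	}
	return append(valbuf[:0], v...), nil
}

// exists fetches the raw bytes and never decodes them. An unknown-key error
// means "does not exist"; any other error is passed through to the caller.
func exists(st reader, k string) (bool, error) {
	_, err := st.Get([]byte(k), nil)
	switch {
	case err == nil:
		return true, nil
	case errors.Is(err, errUnknownKey):
		return false, nil
	default:
		return false, err
	}
}

func main() {
	st := mapReader{"a": []byte("1")}
	fmt.Println(exists(st, "a"))
	fmt.Println(exists(st, "b"))
}
```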

func Get

func Get(ctx *context.T, st StoreReader, k string, v interface{}) error

Get does st.Get(k, v) and wraps the returned error.

func NewErrConcurrentTransaction

func NewErrConcurrentTransaction(ctx *context.T) error

NewErrConcurrentTransaction returns an error with the ErrConcurrentTransaction ID.

func NewErrUnknownKey

func NewErrUnknownKey(ctx *context.T) error

NewErrUnknownKey returns an error with the ErrUnknownKey ID.

func Put

func Put(ctx *context.T, st StoreWriter, k string, v interface{}) error

Put does st.Put(k, v) and wraps the returned error.

func RunInTransaction

func RunInTransaction(st Store, fn func(tx Transaction) error) error

RunInTransaction runs the given fn in a transaction, managing retries and commit/abort.

func RunInTransactionWithOpts

func RunInTransactionWithOpts(st Store, opts *TransactionOptions, fn func(tx Transaction) error) error

RunInTransactionWithOpts runs the given fn in a transaction, managing retries and commit/abort. The number of attempts is taken from opts (see TransactionOptions.NumAttempts).

func RunWithSnapshot

func RunWithSnapshot(st Store, fn func(sntx SnapshotOrTransaction) error) error

RunWithSnapshot runs the given fn with a snapshot that is aborted afterwards.

Types

type InvalidSnapshot

type InvalidSnapshot struct {
	SnapshotSpecImpl
	Error error // returned by all methods
}

InvalidSnapshot is a Snapshot for which all methods return errors.

func (*InvalidSnapshot) Abort

func (s *InvalidSnapshot) Abort() error

Abort implements the store.Snapshot interface.

func (*InvalidSnapshot) Get

func (s *InvalidSnapshot) Get(key, valbuf []byte) ([]byte, error)

Get implements the store.StoreReader interface.

func (*InvalidSnapshot) Scan

func (s *InvalidSnapshot) Scan(start, limit []byte) Stream

Scan implements the store.StoreReader interface.

type InvalidStream

type InvalidStream struct {
	Error error // returned by all methods
}

InvalidStream is a Stream for which all methods return errors.

func (*InvalidStream) Advance

func (s *InvalidStream) Advance() bool

Advance implements the store.Stream interface.

func (*InvalidStream) Cancel

func (s *InvalidStream) Cancel()

Cancel implements the store.Stream interface.

func (*InvalidStream) Err

func (s *InvalidStream) Err() error

Err implements the store.Stream interface.

func (*InvalidStream) Key

func (s *InvalidStream) Key(keybuf []byte) []byte

Key implements the store.Stream interface.

func (*InvalidStream) Value

func (s *InvalidStream) Value(valbuf []byte) []byte

Value implements the store.Stream interface.

type InvalidTransaction

type InvalidTransaction struct {
	Error error // returned by all methods
}

InvalidTransaction is a Transaction for which all methods return errors.

func (*InvalidTransaction) Abort

func (tx *InvalidTransaction) Abort() error

Abort implements the store.Transaction interface.

func (*InvalidTransaction) Commit

func (tx *InvalidTransaction) Commit() error

Commit implements the store.Transaction interface.

func (*InvalidTransaction) Delete

func (tx *InvalidTransaction) Delete(key []byte) error

Delete implements the store.StoreWriter interface.

func (*InvalidTransaction) Get

func (tx *InvalidTransaction) Get(key, valbuf []byte) ([]byte, error)

Get implements the store.StoreReader interface.

func (*InvalidTransaction) Put

func (tx *InvalidTransaction) Put(key, value []byte) error

Put implements the store.StoreWriter interface.

func (*InvalidTransaction) ResetForRetry

func (tx *InvalidTransaction) ResetForRetry()

ResetForRetry implements the store.Transaction interface.

func (*InvalidTransaction) Scan

func (tx *InvalidTransaction) Scan(start, limit []byte) Stream

Scan implements the store.StoreReader interface.

type ResourceNode

type ResourceNode struct {
	// contains filtered or unexported fields
}

ResourceNode is a node in a dependency graph. This graph is used to ensure that when a resource is freed, downstream resources are also freed. For example, closing a store closes all downstream transactions, snapshots and streams.

func NewResourceNode

func NewResourceNode() *ResourceNode

NewResourceNode creates a new isolated node in the dependency graph.

func (*ResourceNode) AddChild

func (r *ResourceNode) AddChild(node *ResourceNode, closefn func())

AddChild adds a parent-child relation between this node and the provided node. The provided function is called to close the child when this node is closed.

func (*ResourceNode) Close

func (r *ResourceNode) Close()

Close closes this node and detaches it from its parent. All of this node's children are closed using close functions provided to AddChild.
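
The cascading-close behavior can be sketched with a small tree of nodes. The type node and the function closeOrder are hypothetical; this version does no locking, whereas a real implementation shared across goroutines would need to guard its fields.

```go
package main

import "fmt"

// node is a hypothetical, single-goroutine sketch of a ResourceNode-like
// type. It does no locking.
type node struct {
	parent   *node
	children map[*node]func()
}

func newNode() *node { return &node{children: map[*node]func(){}} }

// addChild records child together with the function that closes it.
func (n *node) addChild(child *node, closefn func()) {
	child.parent = n
	n.children[child] = closefn
}

// close detaches n from its parent, then closes every child of n.
func (n *node) close() {
	if n.parent != nil {
		delete(n.parent.children, n)
		n.parent = nil
	}
	children := n.children
	n.children = map[*node]func(){} // children detach from an empty map
	for _, closefn := range children {
		closefn()
	}
}

// closeOrder builds store -> transaction -> stream and closes the store.
func closeOrder() []string {
	var closed []string
	st, tx, stream := newNode(), newNode(), newNode()
	st.addChild(tx, func() { closed = append(closed, "transaction"); tx.close() })
	tx.addChild(stream, func() { closed = append(closed, "stream"); stream.close() })
	st.close()
	return closed
}

func main() {
	fmt.Println(closeOrder())
}
```

Closing the store here reaches the stream through the transaction, which mirrors the example given above of a store closing its downstream transactions, snapshots and streams.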

type Snapshot

type Snapshot interface {
	SnapshotOrTransaction
	// contains filtered or unexported methods
}

Snapshot is a handle to the state of a Store at a particular point in time.

All read operations are executed against a consistent view of Store commit history. Snapshots don't acquire locks and thus don't block transactions.

type SnapshotOrTransaction

type SnapshotOrTransaction interface {
	StoreReader

	// Abort closes the snapshot or transaction.
	// Any subsequent method calls will fail.
	// NOTE: this method is also used to distinguish between StoreReader and
	// SnapshotOrTransaction.
	Abort() error
}

SnapshotOrTransaction represents a Snapshot or a Transaction.

type SnapshotSpecImpl

type SnapshotSpecImpl struct{}

type Store

type Store interface {
	StoreReader
	StoreWriter

	// Close closes the store.
	Close() error

	// NewTransaction creates a transaction.
	// TODO(rogulenko): add transaction options.
	NewTransaction() Transaction

	// NewSnapshot creates a snapshot.
	// TODO(rogulenko): add snapshot options.
	NewSnapshot() Snapshot
}

Store is a CRUD-capable storage engine that supports transactions.

type StoreReader

type StoreReader interface {
	// Get returns the value for the given key. The returned slice may be a
	// sub-slice of valbuf if valbuf was large enough to hold the entire value.
	// Otherwise, a newly allocated slice will be returned. It is valid to pass a
	// nil valbuf.
	// If the given key is unknown, valbuf is returned unchanged and the function
	// fails with ErrUnknownKey.
	//
	// It is safe to modify the contents of the key after Get returns.
	Get(key, valbuf []byte) ([]byte, error)

	// Scan returns all rows with keys in range [start, limit). If limit is "",
	// all rows with keys >= start are included.
	// Concurrency semantics: It is legal to perform writes concurrently with
	// Scan. The returned stream may or may not reflect subsequent writes to keys
	// not yet reached by the stream.
	//
	// It is safe to modify the contents of the arguments after Scan returns.
	Scan(start, limit []byte) Stream
}

StoreReader reads data from a CRUD-capable storage engine.
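
The half-open range rule documented for Scan can be written as a small predicate. The function inRange is a hypothetical helper, not part of the package, and it assumes keys are compared bytewise.

```go
package main

import (
	"bytes"
	"fmt"
)

// inRange reports whether key falls in [start, limit). An empty limit means
// the range has no upper bound. Assumption: keys are ordered bytewise.
func inRange(key, start, limit []byte) bool {
	if bytes.Compare(key, start) < 0 {
		return false
	}
	return len(limit) == 0 || bytes.Compare(key, limit) < 0
}

func main() {
	start, limit := []byte("b"), []byte("d")
	for _, k := range []string{"a", "b", "c", "d"} {
		fmt.Println(k, inRange([]byte(k), start, limit))
	}
	// With an empty limit, every key >= start is included.
	fmt.Println("zzz", inRange([]byte("zzz"), start, nil))
}
```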

type StoreWriter

type StoreWriter interface {
	// Put writes the given value for the given key.
	//
	// WARNING: For performance reasons, a Put inside a transaction doesn't make
	// a defensive copy of the value. The client MUST keep the value unchanged
	// until the transaction commits or aborts.
	//
	// It is safe to modify the contents of the key after Put returns.
	Put(key, value []byte) error

	// Delete deletes the entry for the given key.
	// Succeeds (no-op) if the given key is unknown.
	//
	// It is safe to modify the contents of the key after Delete returns.
	Delete(key []byte) error
}

StoreWriter writes data to a CRUD-capable storage engine.
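
The warning on Put is easiest to see with a toy writer that buffers values by reference. The type pendingWrites is hypothetical and only assumes what the comment states: the key is safe to reuse after Put returns, while the value is not copied.

```go
package main

import "fmt"

// pendingWrites is a hypothetical buffer of uncommitted writes. Like the
// documented Put inside a transaction, it keeps the caller's value slice
// instead of copying it.
type pendingWrites struct {
	vals map[string][]byte
}

func (p *pendingWrites) Put(key, value []byte) error {
	// string(key) copies the key bytes; value is stored by reference.
	p.vals[string(key)] = value
	return nil
}

func main() {
	tx := &pendingWrites{vals: map[string][]byte{}}
	key, val := []byte("k"), []byte("v1")
	tx.Put(key, val)

	key[0] = 'x' // fine: the key may be modified once Put has returned
	val[1] = '2' // hazard: this also changes the buffered, uncommitted write

	fmt.Printf("%s\n", tx.vals["k"])

	// Safer pattern when a buffer must be reused before commit or abort:
	// hand Put a private copy of the value.
	tx.Put([]byte("k2"), append([]byte(nil), val...))
}
```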

type Stream

type Stream interface {
	// Advance stages an element so the client can retrieve it with Key or Value.
	// Advance returns true iff there is an element to retrieve. The client must
	// call Advance before calling Key or Value. The client must call Cancel if it
	// does not iterate through all elements (i.e. until Advance returns false).
	// Advance may block if an element is not immediately available.
	Advance() bool

	// Key returns the key of the element that was staged by Advance. The returned
	// slice may be a sub-slice of keybuf if keybuf was large enough to hold the
	// entire key. Otherwise, a newly allocated slice will be returned. It is
	// valid to pass a nil keybuf.
	// Key may panic if Advance returned false or was not called at all.
	// Key does not block.
	Key(keybuf []byte) []byte

	// Value returns the value of the element that was staged by Advance. The
	// returned slice may be a sub-slice of valbuf if valbuf was large enough to
	// hold the entire value. Otherwise, a newly allocated slice will be returned.
	// It is valid to pass a nil valbuf.
	// Value may panic if Advance returned false or was not called at all.
	// Value does not block.
	Value(valbuf []byte) []byte

	// Err returns a non-nil error iff the stream encountered any errors. Err does
	// not block.
	Err() error

	// Cancel notifies the stream provider that it can stop producing elements.
	// The client must call Cancel if it does not iterate through all elements
	// (i.e. until Advance returns false). Cancel is idempotent and can be called
	// concurrently with a goroutine that is iterating via Advance/Key/Value.
	// Cancel causes Advance to subsequently return false. Cancel does not block.
	Cancel()
}

Stream is an interface for iterating through a collection of key-value pairs.
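
A typical consumer loop for this interface, including the Cancel call required when stopping early, might look like the sketch below. The types stream, kv and sliceStream and the function firstN are hypothetical; sliceStream is a toy in-memory stream that never reports an error.

```go
package main

import "fmt"

// stream mirrors the method set documented for store.Stream.
type stream interface {
	Advance() bool
	Key(keybuf []byte) []byte
	Value(valbuf []byte) []byte
	Err() error
	Cancel()
}

type kv struct{ k, v string }

// sliceStream is a hypothetical in-memory stream over a fixed set of rows.
type sliceStream struct {
	rows     []kv
	pos      int // number of elements staged so far
	canceled bool
}

func (s *sliceStream) Advance() bool {
	if s.canceled || s.pos >= len(s.rows) {
		return false
	}
	s.pos++
	return true
}

// Key and Value reuse the caller's buffer when it has enough capacity.
func (s *sliceStream) Key(keybuf []byte) []byte {
	return append(keybuf[:0], s.rows[s.pos-1].k...)
}

func (s *sliceStream) Value(valbuf []byte) []byte {
	return append(valbuf[:0], s.rows[s.pos-1].v...)
}

func (s *sliceStream) Err() error { return nil }
func (s *sliceStream) Cancel()    { s.canceled = true }

// firstN collects at most n keys, then cancels the stream because it may
// have stopped before Advance returned false.
func firstN(s stream, n int) ([]string, error) {
	var keys []string
	var keybuf []byte
	for len(keys) < n && s.Advance() {
		keybuf = s.Key(keybuf)
		keys = append(keys, string(keybuf)) // string() copies out of keybuf
	}
	s.Cancel() // idempotent, so it is harmless after a full iteration
	return keys, s.Err()
}

func main() {
	s := &sliceStream{rows: []kv{{"a", "1"}, {"b", "2"}, {"c", "3"}}}
	fmt.Println(firstN(s, 2))
}
```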

type Transaction

type Transaction interface {
	SnapshotOrTransaction
	StoreWriter

	// Commit commits the transaction.
	// Fails if writes from outside this transaction conflict with reads from
	// within this transaction.
	Commit() error
}

Transaction provides a mechanism for atomic reads and writes. Instead of creating and committing transactions directly, clients are encouraged to use the RunInTransaction() helper function, which detects "concurrent transaction" errors and handles retries internally.

Default concurrency semantics:

  • Reads (e.g. gets, scans) inside a transaction operate over a consistent snapshot taken during NewTransaction(), and will see the effects of prior writes performed inside the transaction.
  • Commit() may fail with ErrConcurrentTransaction, indicating that after NewTransaction() but before Commit(), some concurrent routine wrote to a key that matches a key or row-range read inside this transaction.
  • Other methods will never fail with error ErrConcurrentTransaction, even if it is known that Commit() will fail with this error.

Once a transaction has been committed or aborted, subsequent method calls will fail with no effect.

type TransactionOptions

type TransactionOptions struct {
	NumAttempts int // number of attempts; only used by RunInTransaction
}

TransactionOptions holds options for running a transaction.

TODO(razvanm): Another copy of this is inside store/watchable/.
TODO(sadovsky): Move this to model.go and make it an argument to Store.NewTransaction.

Directories

Path Synopsis
Package leveldb provides a LevelDB-based implementation of store.Store.
Package memstore provides a simple, in-memory implementation of store.Store.
Package ptrie provides a ptrie to store a mapping from bit strings to arbitrary values.
Package watchable provides a Syncbase-specific store.Store wrapper that provides versioned storage for specified prefixes and maintains a watchable log of operations performed on versioned records.
