tables

package
v0.0.0-...-cdb9ebd Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 20, 2024 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultBulkConcurrency is the number of concurrent updates or actions
	// permitted at once when using bulk operations.
	DefaultBulkConcurrency = 64

	// DefaultPageSize is the number of records fetched in a page.
	DefaultPageSize = 100

	// TracingModuleName is the name of the module to show in any OpenTelemetry
	// trace records for this package.
	TracingModuleName = "charydbis"
)

Variables

View Source
var ErrPreconditionFailed = errors.New("precondition failed for LWT operation")

ErrPreconditionFailed indicates an IF predicate on an LWT was not satisfied

Functions

This section is empty.

Types

type ChangeHook

type ChangeHook[T any] func(ctx context.Context, updatedRecord *T) error

ChangeHook is a function that receives an object in response to a pre or post change event.

type GreedyScanner

type GreedyScanner[T any] struct {
	// contains filtered or unexported fields
}

GreedyScanner is a helper type that lets us read all records from a table without iterating. It is essentially able have its OnPage function passed as the PageHandlerFn to other functions.

func (*GreedyScanner[T]) OnPage

func (g *GreedyScanner[T]) OnPage(ctx context.Context, records []*T, originalPagingState []byte, newPagingState []byte) (bool, error)

OnPage is a PageHandlerFn that always keeps requesting more data

func (*GreedyScanner[T]) Preallocate

func (g *GreedyScanner[T]) Preallocate(cap int)

Preallocate ensures the slice exists, pre-allocated to a given size

func (*GreedyScanner[T]) Result

func (g *GreedyScanner[T]) Result() []*T

Result of the scan operation

type InsertOption

type InsertOption interface {
	// contains filtered or unexported methods
}

InsertOption is an interface that describes options that can mutate an insert

func WithInsertTTL

func WithInsertTTL(d time.Duration) InsertOption

WithInsertTTL sets the TTL option for an insert.

func WithNotExists

func WithNotExists() InsertOption

WithNotExists sets IF NOT EXISTS on the query to ensure an insert is a new record.

type ManagerOption

type ManagerOption interface {
	// contains filtered or unexported methods
}

ManagerOption defines an option for the table manager

func WithCluster

func WithCluster(cluster utils.ClusterConfigGeneratorFn) ManagerOption

WithCluster sets the cluster connection to use when working with the manager instance

func WithDefaultReadConsistency

func WithDefaultReadConsistency(level gocql.Consistency) ManagerOption

WithDefaultReadConsistency sets the default read consistency

func WithDefaultTTL

func WithDefaultTTL(d time.Duration) ManagerOption

WithDefaultTTL is a table-manager option that sets the default TTL for inserts and updates.

func WithDefaultWriteConsistency

func WithDefaultWriteConsistency(level gocql.Consistency) ManagerOption

WithDefaultWriteConsistency sets the default read consistency

func WithKeyspace

func WithKeyspace(keyspace string) ManagerOption

WithKeyspace sets the keyspace of the table-manager

func WithLogger

func WithLogger(log *zap.Logger) ManagerOption

WithLogger sets the logger for the table manager

func WithSpecMutator

func WithSpecMutator(mutator SpecMutator) ManagerOption

WithSpecMutator mutates the table/view specifications on startup

func WithStartupFn

func WithStartupFn(fn TableManagerStartupFn) ManagerOption

WithStartupFn attaches a function at startup time for the table-manager. This can for example be used to perform DDL/table specificiation maintainance.

func WithTableSpecification

func WithTableSpecification(spec *metadata.TableSpecification) ManagerOption

WithTableSpecification sets the table specification to use

func WithTraceProvider

func WithTraceProvider(provider trace.TracerProvider) ManagerOption

WithTraceProvider sets the trace provider

func WithViewSpecification

func WithViewSpecification(spec *metadata.ViewSpecification) ManagerOption

WithViewSpecification sets the view specification to use

type PageHandlerFn

type PageHandlerFn[T any] func(ctx context.Context, records []*T, originalPagingState []byte, newPagingState []byte) (bool, error)

PageHandlerFn is a function used when querying a block of records from the table. If true is returned the scan will continue advancing. The page state is an opaque value that can be passed using WithPageState to resume a query later.

type QueryBuilderFn

type QueryBuilderFn func(ctx context.Context, sess gocqlx.Session) *gocqlx.Queryx

QueryBuilderFn is a function used to provide custom query instances to execute.

type QueryOption

type QueryOption interface {
	// contains filtered or unexported methods
}

QueryOption is an interface that describes options that can mutate a scan.

func WithPaging

func WithPaging(pageSize int, state []byte) QueryOption

WithPaging sets the paging state to enable resuming a query on a revisit

func WithSort

func WithSort(column string, order int) QueryOption

WithSort sets the sort order for a query result

type SessionFactory

type SessionFactory func(keyspace string) (*gocql.Session, error)

type SinglePageScanner

type SinglePageScanner[T any] struct {
	// contains filtered or unexported fields
}

SinglePageScanner reads a single page of records and stops iteration.

func (*SinglePageScanner[T]) OnPage

func (g *SinglePageScanner[T]) OnPage(ctx context.Context, records []*T, originalPagingState []byte, newPagingState []byte) (bool, error)

OnPage is a PageHandlerFn that stops after a single page

func (*SinglePageScanner[T]) PageState

func (g *SinglePageScanner[T]) PageState() []byte

PageState returns the current page state of the scanner

func (*SinglePageScanner[T]) Preallocate

func (g *SinglePageScanner[T]) Preallocate(cap int)

Preallocate ensures the slice exists, pre-allocated to a given size

func (*SinglePageScanner[T]) Result

func (g *SinglePageScanner[T]) Result() []*T

Result of the scan operation

type TableManager

type TableManager[T any] interface {
	// Count the number of records in the table.
	Count(ctx context.Context) (int64, error)

	// CountByPartitionKey gets the number of records in the partition.
	CountByPartitionKey(ctx context.Context, partitionKeys ...any) (int64, error)

	// CountByCustomQuery gets the number of records in a custom query.
	CountByCustomQuery(ctx context.Context, queryBuilder QueryBuilderFn) (int64, error)

	// Delete removes an object. Technically only the object keys need be present.
	Delete(ctx context.Context, instance *T) error

	// DeleteByPrimaryKey removes a single row by its primary key values. Keys must be specified in order.
	DeleteByPrimaryKey(ctx context.Context, keys ...any) error

	// GetByPartitionKey gets the first record from a partition. If there are multiple records, the
	// behaviour is to return the first record by clustering order. Equivalent to GetByPrimaryKey
	// if no clustering key is set
	GetByPartitionKey(ctx context.Context, keys ...any) (*T, error)

	// GetByPrimaryKey gets by the full primary key (partitioning and clustering keys)
	GetByPrimaryKey(ctx context.Context, primaryKeys ...any) (*T, error)

	// GetByIndexedColumn gets the first record matching an index
	GetByIndexedColumn(ctx context.Context, columnName string, value any, opts ...QueryOption) (*T, error)

	// GetTableSpec gets the table specification for this table-manager
	GetTableSpec() *metadata.TableSpecification

	// Insert a single record
	Insert(ctx context.Context, instance *T, options ...InsertOption) error

	// InsertOrReplace inserts a single record if there is no existing record.
	InsertOrReplace(ctx context.Context, instance *T, options ...InsertOption) error

	// InsertBulk inserts many objects in parallel, up to a given number. If the concurrency limit is not set,
	// then a default of DefaultBulkConcurrency is used.
	InsertBulk(ctx context.Context, instances []*T, concurrency int, opts ...InsertOption) error

	// Scan performs a paged scan of the table, processing each batch of records. If the ScanFn returns true,
	// the scan will continue advancing until no more records are returned.
	Scan(ctx context.Context, fn PageHandlerFn[T], opts ...QueryOption) error

	// SelectByCustomQuery gets all records by a custom query in a paged fashion
	SelectByCustomQuery(ctx context.Context, queryBuilder QueryBuilderFn, pagingFn PageHandlerFn[T], opts ...QueryOption) error

	// SelectByPartitionKey gets all records from a partition
	SelectByPartitionKey(ctx context.Context, fn PageHandlerFn[T], opts []QueryOption, partitionKeys ...any) error

	// SelectByPrimaryKey gets all records by partition key and any clustering keys provided
	SelectByPrimaryKey(ctx context.Context, fn PageHandlerFn[T], opts []QueryOption, primaryKeys ...any) error

	// SelectByIndexedColumn gets all records matching an indexed column
	SelectByIndexedColumn(ctx context.Context, fn PageHandlerFn[T], columnName string, columnValue any, opts ...QueryOption) error

	// Update an object. Will error if the object does not exist.
	Update(ctx context.Context, instance *T, opts ...UpdateOption) error

	// Upsert overwrites or inserts an object.
	Upsert(ctx context.Context, instance *T, opts ...UpsertOption) error

	// UpsertBulk upserts many objects in parallel, up to a given number. If the concurrency limit is not set,
	// then a default of DefaultBulkConcurrency is used.
	UpsertBulk(ctx context.Context, instances []*T, concurrency int, opts ...UpsertOption) error

	// AddPreChangeHook adds a pre-change hook. These hooks do not fire for deletes.
	AddPreChangeHook(hook ChangeHook[T])

	// AddPostChangeHook adds a post-change hook. Note that post-change hooks that fail
	// will leave the base tables updated. These hooks do not fire for deletes.
	AddPostChangeHook(hook ChangeHook[T])

	// AddPreDeleteHook adds a pre-delete hook. This will force an additional cost, in
	// that we must retrieve the full record first before.
	AddPreDeleteHook(hook ChangeHook[T])
}

TableManager is an object that provides an abstraction over a table in ScyllaDB

func NewTableManager

func NewTableManager[T any](ctx context.Context, options ...ManagerOption) (TableManager[T], error)

NewTableManager creates a table-manager instance

type TableManagerStartupFn

type TableManagerStartupFn func(ctx context.Context, keyspace string, table *metadata.TableSpecification, view *metadata.ViewSpecification, extraOps ...metadata.DDLOperation) error

TableManagerStartupFn is a startup function called before the table-manager is deemed ready to use.

type UpdateOption

type UpdateOption interface {
	// contains filtered or unexported methods
}

UpdateOption is an interface that describes options that can mutate an update

func WithConditionalUpdate

func WithConditionalUpdate(cmp qb.Cmp, payload map[string]any) UpdateOption

WithConditionalUpdate does a conditional update with a custom predicate and many values.

func WithSimpleIf

func WithSimpleIf(targetColumn string, val any) UpdateOption

WithSimpleIf allows for a LWT that does a simple value-based comparison on a single column

func WithUpdateTTL

func WithUpdateTTL(ttl time.Duration) UpdateOption

WithUpdateTTL sets the TTL option for an update.

type UpsertOption

type UpsertOption interface {
	InsertOption
	UpdateOption
}

UpsertOption is an option that can be used for inserts or update

func WithConditionalUpsert

func WithConditionalUpsert(cmp qb.Cmp, payload map[string]any) UpsertOption

WithConditionalUpsert does a conditional update with a custom predicate and many values.

func WithSimpleUpsertIf

func WithSimpleUpsertIf(targetColumn string, val any) UpsertOption

WithSimpleUpsertIf allows for a LWT that does a simple value-based comparison on a single column

func WithTTL

func WithTTL(d time.Duration) UpsertOption

WithTTL sets the TTL option for an upsert.

type ViewManager

type ViewManager[T any] interface {
	// CountByPartitionKey gets the number of records in the partition.
	CountByPartitionKey(ctx context.Context, partitionKeys ...any) (int64, error)

	// CountByCustomQuery gets the number of records in a custom query.
	CountByCustomQuery(ctx context.Context, queryBuilder QueryBuilderFn) (int64, error)

	// GetByPartitionKey gets the first record from a partition. If there are multiple records, the
	// behaviour is to return the first record by clustering order. Equivalent to GetByPrimaryKey
	// if no clustering key is set
	GetByPartitionKey(ctx context.Context, keys ...any) (*T, error)

	// GetByPrimaryKey gets by the full primary key (partitioning and clustering keys)
	GetByPrimaryKey(ctx context.Context, primaryKeys ...any) (*T, error)

	// GetByIndexedColumn gets the first record matching an index
	GetByIndexedColumn(ctx context.Context, columnName string, value any, opts ...QueryOption) (*T, error)

	// Scan performs a paged scan of the table, processing each batch of records. If the ScanFn returns true,
	// the scan will continue advancing until no more records are returned.
	Scan(ctx context.Context, fn PageHandlerFn[T], opts ...QueryOption) error

	// SelectByCustomQuery gets all records by a custom query in a paged fashion
	SelectByCustomQuery(ctx context.Context, queryBuilder QueryBuilderFn, pagingFn PageHandlerFn[T], opts ...QueryOption) error

	// SelectByPartitionKey gets all records from a partition
	SelectByPartitionKey(ctx context.Context, fn PageHandlerFn[T], opts []QueryOption, partitionKeys ...any) error

	// SelectByPrimaryKey gets all records from a partition
	SelectByPrimaryKey(ctx context.Context, fn PageHandlerFn[T], opts []QueryOption, primaryKeys ...any) error

	// SelectByIndexedColumn gets all records matching an indexed column
	SelectByIndexedColumn(ctx context.Context, fn PageHandlerFn[T], columnName string, columnValue any, opts ...QueryOption) error
}

ViewManager is an object that provides an abstraction over a view in ScyllaDB

func NewViewManager

func NewViewManager[T any](ctx context.Context, options ...ManagerOption) (ViewManager[T], error)

NewViewManager creates a view-manager instance. This is essentially a table-manager but without the insert/update/delete operations available.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL