oxia

package
v0.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 26, 2024 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Overview

Package oxia provides a Go client library for interacting with Oxia service.

Example
standaloneServer := initExampleServer(exampleServerAddr)
defer standaloneServer.Close()

// Creates a client instance
// Once created, a client instance will be valid until it's explicitly closed, and it can
// be used from different go-routines.
client, err := NewSyncClient(exampleServerAddr, WithRequestTimeout(10*time.Second))
if err != nil {
	log.Fatal(err)
}
defer client.Close()

// Write a record to Oxia with the specified key and value, and with the expectation
// that the record does not already exist.
res1, err := client.Put(context.Background(), "/my-key", []byte("value-1"), ExpectedRecordNotExists())
if err != nil {
	log.Fatal(err)
}

// Write a record with the expectation that it has not changed since the previous write.
// If there was any change, the operation will fail
_, err = client.Put(context.Background(), "/my-key", []byte("value-2"), ExpectedVersionId(res1.VersionId))
if err != nil {
	log.Fatal(err)
}

// Read the value of a record
value, version, err := client.Get(context.Background(), "/my-key")
if err != nil {
	log.Fatal(err)
}
_ = client.Close()
// Sleep to avoid DATA RACE on zerolog read at os.Stdout,and runExamples write at os.Stdout
time.Sleep(2 * time.Second)

fmt.Printf("Result: %s - Version: %#v\n", string(value), version.VersionId)
Output:

Result: value-2 - Version: 1

Index

Examples

Constants

View Source
const (
	DefaultBatchLinger         = 5 * time.Millisecond
	DefaultMaxRequestsPerBatch = 1000
	DefaultMaxBatchSize        = 128 * 1024
	DefaultRequestTimeout      = 30 * time.Second
	DefaultSessionTimeout      = 15 * time.Second
	DefaultNamespace           = common.DefaultNamespace
)
View Source
const (
	// VersionIdNotExists represent the VersionId of a non-existing record.
	VersionIdNotExists int64 = -1
)

Variables

View Source
var (
	// ErrKeyNotFound A record associated with the specified key was not found.
	ErrKeyNotFound = errors.New("key not found")

	// ErrUnexpectedVersionId The expected version id passed as a condition does not match
	// the current version id of the stored record.
	ErrUnexpectedVersionId = errors.New("unexpected version id")

	// ErrRequestTooLarge is returned when a request is larger than the maximum batch size.
	ErrRequestTooLarge = batch.ErrRequestTooLarge

	// ErrUnknownStatus Unknown error.
	ErrUnknownStatus = errors.New("unknown status")
)
View Source
var (
	ErrInvalidOptionBatchLinger         = errors.New("BatchLinger must be greater than or equal to zero")
	ErrInvalidOptionMaxRequestsPerBatch = errors.New("MaxRequestsPerBatch must be greater than zero")
	ErrInvalidOptionMaxBatchSize        = errors.New("MaxBatchSize must be greater than zero")
	ErrInvalidOptionRequestTimeout      = errors.New("RequestTimeout must be greater than zero")
	ErrInvalidOptionSessionTimeout      = errors.New("SessionTimeout must be greater than zero")
	ErrInvalidOptionIdentity            = errors.New("Identity must be non-empty")
	ErrInvalidOptionNamespace           = errors.New("Namespace cannot be empty")
	ErrInvalidOptionTLS                 = errors.New("Tls cannot be empty")
)

Functions

This section is empty.

Types

type AsyncClient

type AsyncClient interface {
	io.Closer

	// Put Associates a value with a key
	//
	// There are few options that can be passed to the Put operation:
	//  - The Put operation can be made conditional on that the record hasn't changed from
	//    a specific existing version by passing the [ExpectedVersionId] option.
	//  - Client can assert that the record does not exist by passing [ExpectedRecordNotExists]
	//  - Client can create an ephemeral record with [Ephemeral]
	//
	// Returns a [Version] object that contains information about the newly updated record
	// Returns [ErrorUnexpectedVersionId] if the expected version id does not match the
	// current version id of the record
	Put(key string, value []byte, options ...PutOption) <-chan PutResult

	// Delete removes the key and its associated value from the data store.
	//
	// The Delete operation can be made conditional on that the record hasn't changed from
	// a specific existing version by passing the [ExpectedVersionId] option.
	// Returns [ErrorUnexpectedVersionId] if the expected version id does not match the
	// current version id of the record
	Delete(key string, options ...DeleteOption) <-chan error

	// DeleteRange deletes any records with keys within the specified range.
	// Note: Oxia uses a custom sorting order that treats `/` characters in special way.
	// Refer to this documentation for the specifics:
	// https://github.com/streamnative/oxia/blob/main/docs/oxia-key-sorting.md
	DeleteRange(minKeyInclusive string, maxKeyExclusive string) <-chan error

	// Get returns the value associated with the specified key.
	// In addition to the value, a version object is also returned, with information
	// about the record state.
	// Returns ErrorKeyNotFound if the record does not exist
	Get(key string) <-chan GetResult

	// List any existing keys within the specified range.
	// Note: Oxia uses a custom sorting order that treats `/` characters in special way.
	// Refer to this documentation for the specifics:
	// https://github.com/streamnative/oxia/blob/main/docs/oxia-key-sorting.md
	List(ctx context.Context, minKeyInclusive string, maxKeyExclusive string) <-chan ListResult

	// GetNotifications creates a new subscription to receive the notifications
	// from Oxia for any change that is applied to the database
	GetNotifications() (Notifications, error)
}

AsyncClient Oxia client with methods suitable for asynchronous operations.

This interface exposes the same functionality as SyncClient, though it returns a channel instead of an actual result for the performed operations.

This allows to enqueue multiple operations which the client library will be able to group and automatically batch.

Batching of requests will ensure a larger throughput and more efficient handling. Applications can control the batching by configuring the linger-time with WithBatchLinger option in NewAsyncClient.

Example
standaloneServer := initExampleServer(exampleServerAddr)
defer standaloneServer.Close()

// Creates a client instance
// Once created, a client instance will be valid until it's explicitly closed, and it can
// be used from different go-routines.
client, err := NewAsyncClient(exampleServerAddr, WithRequestTimeout(10*time.Second))
if err != nil {
	log.Fatal(err)
}

// Write a record to Oxia with the specified key and value, and with the expectation
// that the record does not already exist.
// The client library will try to batch multiple operations into a single request, to
// achieve much better efficiency and performance
c1 := client.Put("/my-key-1", []byte("value-1"))
c2 := client.Put("/my-key-2", []byte("value-2"))
c3 := client.Put("/my-key-3", []byte("value-3"))

// Wait for the async operations to complete
r1 := <-c1
fmt.Printf("First operation complete: version: %#v - error: %#v\n", r1.Version.VersionId, r1.Err)

r2 := <-c2
fmt.Printf("First operation complete: version: %#v - error: %#v\n", r2.Version.VersionId, r2.Err)

r3 := <-c3
fmt.Printf("First operation complete: version: %#v - error: %#v\n", r3.Version.VersionId, r3.Err)

_ = client.Close()
// Sleep to avoid DATA RACE on zerolog read at os.Stdout,and runExamples write at os.Stdout
time.Sleep(2 * time.Second)
Output:

First operation complete: version: 0 - error: <nil>
First operation complete: version: 0 - error: <nil>
First operation complete: version: 0 - error: <nil>

func NewAsyncClient

func NewAsyncClient(serviceAddress string, opts ...ClientOption) (AsyncClient, error)

NewAsyncClient creates a new Oxia client with the async interface

ServiceAddress is the target host:port of any Oxia server to bootstrap the client. It is used for establishing the shard assignments. Ideally this should be a load-balanced endpoint.

A list of ClientOption arguments can be passed to configure the Oxia client. Example:

client, err := oxia.NewAsyncClient("my-oxia-service:6648", oxia.WithBatchLinger(10*time.Milliseconds))

type Cache

type Cache[Value any] interface {
	io.Closer

	// Put Associates a value with a key
	//
	// There are few options that can be passed to the Put operation:
	//  - The Put operation can be made conditional on that the record hasn't changed from
	//    a specific existing version by passing the [ExpectedVersionId] option.
	//  - Client can assert that the record does not exist by passing [ExpectedRecordNotExists]
	//  - Client can create an ephemeral record with [Ephemeral]
	//
	// Returns a [Version] object that contains information about the newly updated record
	// Returns [ErrorUnexpectedVersionId] if the expected version id does not match the
	// current version id of the record
	Put(ctx context.Context, key string, value Value, options ...PutOption) (Version, error)

	// ReadModifyUpdate applies atomically the result of the `modifyFunc` argument into
	// the database.
	// The `modifyFunc` will be pass the current value and will return the modified value
	ReadModifyUpdate(ctx context.Context, key string, modifyFunc ModifyFunc[Value]) error

	// Delete removes the key and its associated value from the data store.
	//
	// The Delete operation can be made conditional on that the record hasn't changed from
	// a specific existing version by passing the [ExpectedVersionId] option.
	// Returns [ErrorUnexpectedVersionId] if the expected version id does not match the
	// current version id of the record
	Delete(ctx context.Context, key string, options ...DeleteOption) error

	// Get returns the value associated with the specified key.
	// In addition to the value, a version object is also returned, with information
	// about the record state.
	// Returns ErrorKeyNotFound if the record does not exist
	Get(ctx context.Context, key string) (Value, Version, error)
}

Cache provides a view of the data stored in Oxia that is locally cached.

The cached values are automatically updated when there are updates or deletions. The cache is storing de-serialized object.

func NewCache

func NewCache[T any](client SyncClient, serializeFunc SerializeFunc, deserializeFunc DeserializeFunc) (Cache[T], error)

NewCache creates a new cache object for a specific type Uses the `serializeFunc` and `deserializeFunc` for SerDe.

type ClientOption

type ClientOption interface {
	// contains filtered or unexported methods
}

ClientOption is an interface for applying Oxia client options.

func WithBatchLinger

func WithBatchLinger(batchLinger time.Duration) ClientOption

WithBatchLinger defines how long the batcher will wait before sending a batched request. The value must be greater than or equal to zero. A value of zero will disable linger, effectively disabling batching.

func WithGlobalMeterProvider

func WithGlobalMeterProvider() ClientOption

WithGlobalMeterProvider instructs the Oxia client to use the global OpenTelemetry MeterProvider.

func WithIdentity

func WithIdentity(identity string) ClientOption

func WithMaxRequestsPerBatch

func WithMaxRequestsPerBatch(maxRequestsPerBatch int) ClientOption

WithMaxRequestsPerBatch defines how many individual requests a batch can contain before the batched request is sent. The value must be greater than zero. A value of one will effectively disable batching.

func WithMeterProvider

func WithMeterProvider(meterProvider metric.MeterProvider) ClientOption

func WithNamespace

func WithNamespace(namespace string) ClientOption

WithNamespace set the Oxia namespace to be used for this client. If not set, the client will be using the `default` namespace.

func WithRequestTimeout

func WithRequestTimeout(requestTimeout time.Duration) ClientOption

func WithSessionTimeout

func WithSessionTimeout(sessionTimeout time.Duration) ClientOption

WithSessionTimeout specifies the session timeout to.

func WithTLS added in v0.4.0

func WithTLS(tlsConf *tls.Config) ClientOption

type DeleteOption

type DeleteOption interface {
	PutOption
	// contains filtered or unexported methods
}

DeleteOption represents an option for the [SyncClient.Delete] operation.

func ExpectedVersionId

func ExpectedVersionId(versionId int64) DeleteOption

ExpectedVersionId Marks that the operation should only be successful if the versionId of the record stored in the server matches the expected one.

type DeserializeFunc

type DeserializeFunc func(data []byte, value any) error

DeserializeFunc is the deserialization function. eg: [json.Unmarshall].

type GetResult

type GetResult struct {
	// Value is the value of the record
	Value []byte

	// The version information
	Version Version

	// The error if the `Get` operation failed
	Err error
}

GetResult structure is wrapping a Value, its version information and an eventual error as results for a `Get` operation in the AsyncClient.

type ListResult

type ListResult struct {
	// The list of keys returned by [List]
	Keys []string
	// The eventual error in the [List] operation
	Err error
}

ListResult structure is wrapping a list of keys, and a potential error as results for a `List` operation in the AsyncClient.

type ModifyFunc

type ModifyFunc[Value any] func(v Optional[Value]) (Value, error)

ModifyFunc is the transformation function to apply on ReadModifyUpdate.

type Notification

type Notification struct {
	// The type of the modification
	Type NotificationType

	// The Key of the record to which the notification is referring
	Key string

	// The current VersionId of the record, or -1 for a KeyDeleted event
	VersionId int64
}

Notification represents one change in the Oxia database.

type NotificationType

type NotificationType int

NotificationType represents the type of the notification event.

const (
	// KeyCreated A record that didn't exist was created.
	KeyCreated NotificationType = iota
	// KeyModified An existing record was modified.
	KeyModified
	// KeyDeleted A record was deleted.
	KeyDeleted
)

func (NotificationType) String

func (n NotificationType) String() string

type Notifications

type Notifications interface {
	io.Closer

	// Ch exposes the channel where all the notification events are published
	Ch() <-chan *Notification
}

Notifications allow applications to receive the feed of changes that are happening in the Oxia database.

Example
standaloneServer := initExampleServer(exampleServerAddr)
defer standaloneServer.Close()

client, err := NewSyncClient(exampleServerAddr)
if err != nil {
	log.Fatal(err)
}

notifications, err := client.GetNotifications()
if err != nil {
	log.Fatal(err)
}

_, err = client.Put(context.Background(), "/my-key", []byte("value-1"), ExpectedRecordNotExists())
if err != nil {
	log.Fatal(err)
}

var wg sync.WaitGroup
wg.Add(1)
go func() {
	defer wg.Done()
	time.Sleep(1 * time.Second)
	_ = notifications.Close()
}()

// Receive all the notification from the server
for notification := range notifications.Ch() {
	fmt.Printf("Type %#v - Key: %s - VersionId: %d\n",
		notification.Type, notification.Key, notification.VersionId)
}

_ = client.Close()
// Sleep to avoid DATA RACE on zerolog read at os.Stdout,and runExamples write at os.Stdout
time.Sleep(2 * time.Second)
wg.Wait()
Output:

Type 0 - Key: /my-key - VersionId: 0

type Optional

type Optional[T any] interface {
	// Present is true if the optional value is set
	Present() bool

	// Empty is true if the optional value is not set
	Empty() bool

	// Get the value and test if it was present
	Get() (value T, ok bool)

	// MustGet get the value and panic if it's not present
	MustGet() T
}

Optional represents a wrapper for some value that can be present or not.

type PutOption

type PutOption interface {
	// contains filtered or unexported methods
}

PutOption represents an option for the [SyncClient.Put] operation.

func Ephemeral

func Ephemeral() PutOption

Ephemeral marks the record to be created as an ephemeral record. Ephemeral records have their lifecycle tied to a particular client instance, and they are automatically deleted when the client instance is closed. These records are also deleted if the client cannot communicate with the Oxia service for some extended amount of time, and the session between the client and the service "expires". Application can control the session behavior by setting the session timeout appropriately with WithSessionTimeout option when creating the client instance.

func ExpectedRecordNotExists

func ExpectedRecordNotExists() PutOption

ExpectedRecordNotExists Marks that the put operation should only be successful if the record does not exist yet.

type PutResult

type PutResult struct {
	// The Version information
	Version Version

	// The error if the `Put` operation failed
	Err error
}

PutResult structure is wrapping the version information for the result of a `Put` operation and an eventual error in the AsyncClient.

type SerializeFunc

type SerializeFunc func(value any) ([]byte, error)

SerializeFunc is the serialization function. eg: [json.Marshall].

type SyncClient

type SyncClient interface {
	io.Closer

	// Put Associates a value with a key
	//
	// There are few options that can be passed to the Put operation:
	//  - The Put operation can be made conditional on that the record hasn't changed from
	//    a specific existing version by passing the [ExpectedVersionId] option.
	//  - Client can assert that the record does not exist by passing [ExpectedRecordNotExists]
	//  - Client can create an ephemeral record with [Ephemeral]
	//
	// Returns a [Version] object that contains information about the newly updated record
	// Returns [ErrorUnexpectedVersionId] if the expected version id does not match the
	// current version id of the record
	Put(ctx context.Context, key string, value []byte, options ...PutOption) (Version, error)

	// Delete removes the key and its associated value from the data store.
	//
	// The Delete operation can be made conditional on that the record hasn't changed from
	// a specific existing version by passing the [ExpectedVersionId] option.
	// Returns [ErrorUnexpectedVersionId] if the expected version id does not match the
	// current version id of the record
	Delete(ctx context.Context, key string, options ...DeleteOption) error

	// DeleteRange deletes any records with keys within the specified range.
	// Note: Oxia uses a custom sorting order that treats `/` characters in special way.
	// Refer to this documentation for the specifics:
	// https://github.com/streamnative/oxia/blob/main/docs/oxia-key-sorting.md
	DeleteRange(ctx context.Context, minKeyInclusive string, maxKeyExclusive string) error

	// Get returns the value associated with the specified key.
	// In addition to the value, a version object is also returned, with information
	// about the record state.
	// Returns ErrorKeyNotFound if the record does not exist
	Get(ctx context.Context, key string) (value []byte, version Version, err error)

	// List any existing keys within the specified range.
	// Note: Oxia uses a custom sorting order that treats `/` characters in special way.
	// Refer to this documentation for the specifics:
	// https://github.com/streamnative/oxia/blob/main/docs/oxia-key-sorting.md
	List(ctx context.Context, minKeyInclusive string, maxKeyExclusive string) <-chan ListResult

	// GetNotifications creates a new subscription to receive the notifications
	// from Oxia for any change that is applied to the database
	GetNotifications() (Notifications, error)
}

SyncClient is the main interface to perform operations with Oxia.

Once a client instance is created, it will be valid until it gets explicitly closed, and it can be shared across different go-routines.

If any ephemeral records are created (using the Ephemeral PutOption), they will all be automatically deleted when the client instance is closed, or if the process crashed.

func NewSyncClient

func NewSyncClient(serviceAddress string, opts ...ClientOption) (SyncClient, error)

NewSyncClient creates a new Oxia client with the sync interface

ServiceAddress is the target host:port of any Oxia server to bootstrap the client. It is used for establishing the shard assignments. Ideally this should be a load-balanced endpoint.

A list of ClientOption arguments can be passed to configure the Oxia client. Example:

client, err := oxia.NewSyncClient("my-oxia-service:6648", oxia.WithRequestTimeout(30*time.Second))

type Version

type Version struct {
	// VersionId represents an identifier that can be used to refer to a particular version
	// of a record.
	// Applications shouldn't make assumptions on the actual values of the VersionId. VersionIds
	// are only meaningful for a given key and don't reflect the number of changes that were made
	// on a given record.
	// Applications can use the VersionId when making [SyncClient.Put] or [SyncClient.Delete]
	// operations by passing an [ExpectedVersionId] option.
	VersionId int64

	// The time when the record was last created
	// (If the record gets deleted and recreated, it will have a new CreatedTimestamp value)
	CreatedTimestamp uint64

	// The time when the record was last modified
	ModifiedTimestamp uint64

	// The number of modifications to the record since it was last created
	// (If the record gets deleted and recreated, the ModificationsCount will restart at 0)
	ModificationsCount int64

	// Whether the record is ephemeral. See [Ephemeral]
	Ephemeral bool

	// For ephemeral records, the unique identity of the Oxia client that did last modify it.
	// It will be empty for all non-ephemeral records.
	ClientIdentity string
}

Version includes some information regarding the state of a record.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL