Documentation ¶
Overview ¶
Package oxia provides a Go client library for interacting with the Oxia service.
Example ¶
standaloneServer := initExampleServer(exampleServerAddr) defer standaloneServer.Close() // Creates a client instance // Once created, a client instance will be valid until it's explicitly closed, and it can // be used from different go-routines. client, err := NewSyncClient(exampleServerAddr, WithRequestTimeout(10*time.Second)) if err != nil { log.Fatal(err) } defer client.Close() // Write a record to Oxia with the specified key and value, and with the expectation // that the record does not already exist. _, res1, err := client.Put(context.Background(), "/my-key", []byte("value-1"), ExpectedRecordNotExists()) if err != nil { log.Fatal(err) } // Write a record with the expectation that it has not changed since the previous write. // If there was any change, the operation will fail _, _, err = client.Put(context.Background(), "/my-key", []byte("value-2"), ExpectedVersionId(res1.VersionId)) if err != nil { log.Fatal(err) } // Read the value of a record key, value, version, err := client.Get(context.Background(), "/my-key") if err != nil { log.Fatal(err) } _ = client.Close() // Sleep to avoid DATA RACE on zerolog read at os.Stdout,and runExamples write at os.Stdout time.Sleep(2 * time.Second) fmt.Printf("Result: key: %s - Value: %s - Version: %#v\n", key, string(value), version.VersionId)
Output: Result: key: /my-key - Value: value-2 - Version: 1
Index ¶
- Constants
- Variables
- type AsyncClient
- type BaseOption
- type Cache
- type ClientOption
- func WithBatchLinger(batchLinger time.Duration) ClientOption
- func WithGlobalMeterProvider() ClientOption
- func WithIdentity(identity string) ClientOption
- func WithMaxRequestsPerBatch(maxRequestsPerBatch int) ClientOption
- func WithMeterProvider(meterProvider metric.MeterProvider) ClientOption
- func WithNamespace(namespace string) ClientOption
- func WithRequestTimeout(requestTimeout time.Duration) ClientOption
- func WithSessionTimeout(sessionTimeout time.Duration) ClientOption
- func WithTLS(tlsConf *tls.Config) ClientOption
- type DeleteOption
- type DeleteRangeOption
- type DeserializeFunc
- type GetOption
- type GetResult
- type ListOption
- type ListResult
- type ModifyFunc
- type Notification
- type NotificationType
- type Notifications
- type Optional
- type PutOption
- type PutResult
- type RangeScanOption
- type ResultAndChannel
- type ResultHeap
- type SerializeFunc
- type SyncClient
- type Version
Examples ¶
Constants ¶
const ( DefaultBatchLinger = 5 * time.Millisecond DefaultMaxRequestsPerBatch = 1000 DefaultMaxBatchSize = 128 * 1024 DefaultRequestTimeout = 30 * time.Second DefaultSessionTimeout = 15 * time.Second DefaultNamespace = common.DefaultNamespace )
const ( // VersionIdNotExists represent the VersionId of a non-existing record. VersionIdNotExists int64 = -1 )
Variables ¶
var ( // ErrKeyNotFound A record associated with the specified key was not found. ErrKeyNotFound = errors.New("key not found") // ErrUnexpectedVersionId The expected version id passed as a condition does not match // the current version id of the stored record. ErrUnexpectedVersionId = errors.New("unexpected version id") ErrInvalidOptions = errors.New("invalid options") // ErrRequestTooLarge is returned when a request is larger than the maximum batch size. ErrRequestTooLarge = batch.ErrRequestTooLarge // ErrUnknownStatus Unknown error. ErrUnknownStatus = errors.New("unknown status") )
var ( ErrInvalidOptionBatchLinger = errors.New("BatchLinger must be greater than or equal to zero") ErrInvalidOptionMaxRequestsPerBatch = errors.New("MaxRequestsPerBatch must be greater than zero") ErrInvalidOptionMaxBatchSize = errors.New("MaxBatchSize must be greater than zero") ErrInvalidOptionRequestTimeout = errors.New("RequestTimeout must be greater than zero") ErrInvalidOptionSessionTimeout = errors.New("SessionTimeout must be greater than zero") ErrInvalidOptionIdentity = errors.New("Identity must be non-empty") ErrInvalidOptionNamespace = errors.New("Namespace cannot be empty") ErrInvalidOptionTLS = errors.New("Tls cannot be empty") )
Functions ¶
This section is empty.
Types ¶
type AsyncClient ¶
type AsyncClient interface { io.Closer // Put Associates a value with a key // // There are few options that can be passed to the Put operation: // - The Put operation can be made conditional on that the record hasn't changed from // a specific existing version by passing the [ExpectedVersionId] option. // - Client can assert that the record does not exist by passing [ExpectedRecordNotExists] // - Client can create an ephemeral record with [Ephemeral] // // Returns a [Version] object that contains information about the newly updated record // Returns [ErrorUnexpectedVersionId] if the expected version id does not match the // current version id of the record Put(key string, value []byte, options ...PutOption) <-chan PutResult // Delete removes the key and its associated value from the data store. // // The Delete operation can be made conditional on that the record hasn't changed from // a specific existing version by passing the [ExpectedVersionId] option. // Returns [ErrorUnexpectedVersionId] if the expected version id does not match the // current version id of the record Delete(key string, options ...DeleteOption) <-chan error // DeleteRange deletes any records with keys within the specified range. // Note: Oxia uses a custom sorting order that treats `/` characters in special way. // Refer to this documentation for the specifics: // https://github.com/streamnative/oxia/blob/main/docs/oxia-key-sorting.md DeleteRange(minKeyInclusive string, maxKeyExclusive string, options ...DeleteRangeOption) <-chan error // Get returns the value associated with the specified key. // In addition to the value, a version object is also returned, with information // about the record state. // Returns ErrorKeyNotFound if the record does not exist Get(key string, options ...GetOption) <-chan GetResult // List any existing keys within the specified range. // Note: Oxia uses a custom sorting order that treats `/` characters in special way. 
// Refer to this documentation for the specifics: // https://github.com/streamnative/oxia/blob/main/docs/oxia-key-sorting.md List(ctx context.Context, minKeyInclusive string, maxKeyExclusive string, options ...ListOption) <-chan ListResult // RangeScan perform a scan for existing records with any keys within the specified range. // Note: Oxia uses a custom sorting order that treats `/` characters in special way. // Refer to this documentation for the specifics: // https://github.com/streamnative/oxia/blob/main/docs/oxia-key-sorting.md RangeScan(ctx context.Context, minKeyInclusive string, maxKeyExclusive string, options ...RangeScanOption) <-chan GetResult // GetNotifications creates a new subscription to receive the notifications // from Oxia for any change that is applied to the database GetNotifications() (Notifications, error) }
AsyncClient Oxia client with methods suitable for asynchronous operations.
This interface exposes the same functionality as SyncClient, though it returns a channel instead of an actual result for the performed operations.
This allows multiple operations to be enqueued, which the client library can then group and batch automatically.
Batching of requests will ensure a larger throughput and more efficient handling. Applications can control the batching by configuring the linger-time with WithBatchLinger option in NewAsyncClient.
Example ¶
standaloneServer := initExampleServer(exampleServerAddr) defer standaloneServer.Close() // Creates a client instance // Once created, a client instance will be valid until it's explicitly closed, and it can // be used from different go-routines. client, err := NewAsyncClient(exampleServerAddr, WithRequestTimeout(10*time.Second)) if err != nil { log.Fatal(err) } // Write a record to Oxia with the specified key and value, and with the expectation // that the record does not already exist. // The client library will try to batch multiple operations into a single request, to // achieve much better efficiency and performance c1 := client.Put("/my-key-1", []byte("value-1")) c2 := client.Put("/my-key-2", []byte("value-2")) c3 := client.Put("/my-key-3", []byte("value-3")) // Wait for the async operations to complete r1 := <-c1 fmt.Printf("First operation complete: version: %#v - error: %#v\n", r1.Version.VersionId, r1.Err) r2 := <-c2 fmt.Printf("First operation complete: version: %#v - error: %#v\n", r2.Version.VersionId, r2.Err) r3 := <-c3 fmt.Printf("First operation complete: version: %#v - error: %#v\n", r3.Version.VersionId, r3.Err) _ = client.Close() // Sleep to avoid DATA RACE on zerolog read at os.Stdout,and runExamples write at os.Stdout time.Sleep(2 * time.Second)
Output: First operation complete: version: 0 - error: <nil> First operation complete: version: 0 - error: <nil> First operation complete: version: 0 - error: <nil>
func NewAsyncClient ¶
func NewAsyncClient(serviceAddress string, opts ...ClientOption) (AsyncClient, error)
NewAsyncClient creates a new Oxia client with the async interface
ServiceAddress is the target host:port of any Oxia server to bootstrap the client. It is used for establishing the shard assignments. Ideally this should be a load-balanced endpoint.
A list of ClientOption arguments can be passed to configure the Oxia client. Example:
client, err := oxia.NewAsyncClient("my-oxia-service:6648", oxia.WithBatchLinger(10*time.Millisecond))
type BaseOption ¶ added in v0.5.0
type BaseOption interface { PutOption GetOption DeleteOption DeleteRangeOption ListOption RangeScanOption }
BaseOption is an option that applies to all the client operations.
func PartitionKey ¶ added in v0.5.0
func PartitionKey(partitionKey string) BaseOption
PartitionKey overrides the partition routing with the specified `partitionKey` instead of the regular record key. Records with the same partitionKey will always be guaranteed to be co-located in the same Oxia shard.
type Cache ¶
type Cache[Value any] interface { io.Closer // Put Associates a value with a key // // There are few options that can be passed to the Put operation: // - The Put operation can be made conditional on that the record hasn't changed from // a specific existing version by passing the [ExpectedVersionId] option. // - Client can assert that the record does not exist by passing [ExpectedRecordNotExists] // - Client can create an ephemeral record with [Ephemeral] // // Returns a [Version] object that contains information about the newly updated record // Returns [ErrorUnexpectedVersionId] if the expected version id does not match the // current version id of the record Put(ctx context.Context, key string, value Value, options ...PutOption) (string, Version, error) // ReadModifyUpdate applies atomically the result of the `modifyFunc` argument into // the database. // The `modifyFunc` will be pass the current value and will return the modified value ReadModifyUpdate(ctx context.Context, key string, modifyFunc ModifyFunc[Value]) error // Delete removes the key and its associated value from the data store. // // The Delete operation can be made conditional on that the record hasn't changed from // a specific existing version by passing the [ExpectedVersionId] option. // Returns [ErrorUnexpectedVersionId] if the expected version id does not match the // current version id of the record Delete(ctx context.Context, key string, options ...DeleteOption) error // Get returns the value associated with the specified key. // In addition to the value, a version object is also returned, with information // about the record state. // Returns ErrorKeyNotFound if the record does not exist Get(ctx context.Context, key string) (Value, Version, error) }
Cache provides a view of the data stored in Oxia that is locally cached.
The cached values are automatically updated when there are updates or deletions. The cache stores deserialized objects.
func NewCache ¶
func NewCache[T any](client SyncClient, serializeFunc SerializeFunc, deserializeFunc DeserializeFunc) (Cache[T], error)
NewCache creates a new cache object for a specific type. It uses the `serializeFunc` and `deserializeFunc` for SerDe.
type ClientOption ¶
type ClientOption interface {
// contains filtered or unexported methods
}
ClientOption is an interface for applying Oxia client options.
func WithBatchLinger ¶
func WithBatchLinger(batchLinger time.Duration) ClientOption
WithBatchLinger defines how long the batcher will wait before sending a batched request. The value must be greater than or equal to zero. A value of zero will disable linger, effectively disabling batching.
func WithGlobalMeterProvider ¶
func WithGlobalMeterProvider() ClientOption
WithGlobalMeterProvider instructs the Oxia client to use the global OpenTelemetry MeterProvider.
func WithIdentity ¶
func WithIdentity(identity string) ClientOption
func WithMaxRequestsPerBatch ¶
func WithMaxRequestsPerBatch(maxRequestsPerBatch int) ClientOption
WithMaxRequestsPerBatch defines how many individual requests a batch can contain before the batched request is sent. The value must be greater than zero. A value of one will effectively disable batching.
func WithMeterProvider ¶
func WithMeterProvider(meterProvider metric.MeterProvider) ClientOption
func WithNamespace ¶
func WithNamespace(namespace string) ClientOption
WithNamespace sets the Oxia namespace to be used for this client. If not set, the client will use the `default` namespace.
func WithRequestTimeout ¶
func WithRequestTimeout(requestTimeout time.Duration) ClientOption
func WithSessionTimeout ¶
func WithSessionTimeout(sessionTimeout time.Duration) ClientOption
WithSessionTimeout specifies the session timeout to use for the client. The value must be greater than zero.
func WithTLS ¶ added in v0.4.0
func WithTLS(tlsConf *tls.Config) ClientOption
type DeleteOption ¶
type DeleteOption interface { PutOption // contains filtered or unexported methods }
DeleteOption represents an option for the [SyncClient.Delete] operation.
func ExpectedVersionId ¶
func ExpectedVersionId(versionId int64) DeleteOption
ExpectedVersionId Marks that the operation should only be successful if the versionId of the record stored in the server matches the expected one.
type DeleteRangeOption ¶ added in v0.5.0
type DeleteRangeOption interface {
// contains filtered or unexported methods
}
DeleteRangeOption represents an option for the [SyncClient.DeleteRange] operation.
type DeserializeFunc ¶
DeserializeFunc is the deserialization function, e.g. [json.Unmarshal].
type GetOption ¶ added in v0.5.0
type GetOption interface {
// contains filtered or unexported methods
}
GetOption represents an option for the [SyncClient.Get] operation.
func ComparisonCeiling ¶ added in v0.5.0
func ComparisonCeiling() GetOption
ComparisonCeiling option will make the get operation search for the record whose key is the lowest key >= the supplied key.
func ComparisonEqual ¶ added in v0.5.0
func ComparisonEqual() GetOption
ComparisonEqual sets the Get() operation to compare the stored key for equality.
func ComparisonFloor ¶ added in v0.5.0
func ComparisonFloor() GetOption
ComparisonFloor option will make the get operation search for the record whose key is the highest key <= the supplied key.
func ComparisonHigher ¶ added in v0.5.0
func ComparisonHigher() GetOption
ComparisonHigher option will make the get operation search for the record whose key is strictly > the supplied key.
func ComparisonLower ¶ added in v0.5.0
func ComparisonLower() GetOption
ComparisonLower option will make the get operation search for the record whose key is strictly < the supplied key.
type GetResult ¶
type GetResult struct { // Key is the key of the record Key string // Value is the value of the record Value []byte // The version information Version Version // The error if the `Get` operation failed Err error }
GetResult structure is wrapping a record, with its Key and Value, its version information and an eventual error as results for a `Get` operation in the AsyncClient.
type ListOption ¶ added in v0.5.0
type ListOption interface {
// contains filtered or unexported methods
}
ListOption represents an option for the [SyncClient.List] operation.
type ListResult ¶
type ListResult struct { // The list of keys returned by [List] Keys []string // The eventual error in the [List] operation Err error }
ListResult structure is wrapping a list of keys, and a potential error as results for a `List` operation in the AsyncClient.
type ModifyFunc ¶
ModifyFunc is the transformation function to apply on ReadModifyUpdate.
type Notification ¶
type Notification struct { // The type of the modification Type NotificationType // The Key of the record to which the notification is referring Key string // The current VersionId of the record, or -1 for a KeyDeleted event VersionId int64 }
Notification represents one change in the Oxia database.
type NotificationType ¶
type NotificationType int
NotificationType represents the type of the notification event.
const ( // KeyCreated A record that didn't exist was created. KeyCreated NotificationType = iota // KeyModified An existing record was modified. KeyModified // KeyDeleted A record was deleted. KeyDeleted )
func (NotificationType) String ¶
func (n NotificationType) String() string
type Notifications ¶
type Notifications interface { io.Closer // Ch exposes the channel where all the notification events are published Ch() <-chan *Notification }
Notifications allow applications to receive the feed of changes that are happening in the Oxia database.
Example ¶
standaloneServer := initExampleServer(exampleServerAddr) defer standaloneServer.Close() client, err := NewSyncClient(exampleServerAddr) if err != nil { log.Fatal(err) } notifications, err := client.GetNotifications() if err != nil { log.Fatal(err) } _, _, err = client.Put(context.Background(), "/my-key", []byte("value-1"), ExpectedRecordNotExists()) if err != nil { log.Fatal(err) } var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() time.Sleep(1 * time.Second) _ = notifications.Close() }() // Receive all the notification from the server for notification := range notifications.Ch() { fmt.Printf("Type %#v - Key: %s - VersionId: %d\n", notification.Type, notification.Key, notification.VersionId) } _ = client.Close() // Sleep to avoid DATA RACE on zerolog read at os.Stdout,and runExamples write at os.Stdout time.Sleep(2 * time.Second) wg.Wait()
Output: Type 0 - Key: /my-key - VersionId: 0
type Optional ¶
type Optional[T any] interface { // Present is true if the optional value is set Present() bool // Empty is true if the optional value is not set Empty() bool // Get the value and test if it was present Get() (value T, ok bool) // MustGet get the value and panic if it's not present MustGet() T }
Optional represents a wrapper for some value that can be present or not.
type PutOption ¶
type PutOption interface {
// contains filtered or unexported methods
}
PutOption represents an option for the [SyncClient.Put] operation.
func Ephemeral ¶
func Ephemeral() PutOption
Ephemeral marks the record to be created as an ephemeral record. Ephemeral records have their lifecycle tied to a particular client instance, and they are automatically deleted when the client instance is closed. These records are also deleted if the client cannot communicate with the Oxia service for some extended amount of time, and the session between the client and the service "expires". Application can control the session behavior by setting the session timeout appropriately with WithSessionTimeout option when creating the client instance.
func ExpectedRecordNotExists ¶
func ExpectedRecordNotExists() PutOption
ExpectedRecordNotExists Marks that the put operation should only be successful if the record does not exist yet.
func SequenceKeysDeltas ¶ added in v0.5.0
SequenceKeysDeltas will request that the final record key be assigned by the server, based on the prefix record key and appending one or more sequences. The sequence numbers will be atomically added based on the deltas. Deltas must be >= 0 and the first one strictly > 0. SequenceKeysDeltas also requires that a PartitionKey option is provided.
type PutResult ¶
type PutResult struct { // The Key of the inserted record Key string // The Version information Version Version // The error if the `Put` operation failed Err error }
PutResult structure is wrapping the version information for the result of a `Put` operation and an eventual error in the AsyncClient.
type RangeScanOption ¶ added in v0.5.0
type RangeScanOption interface {
// contains filtered or unexported methods
}
RangeScanOption represents an option for the [SyncClient.RangeScan] operation.
type ResultAndChannel ¶ added in v0.5.0
type ResultAndChannel struct {
// contains filtered or unexported fields
}
type ResultHeap ¶ added in v0.5.0
type ResultHeap []*ResultAndChannel
func (ResultHeap) Len ¶ added in v0.5.0
func (h ResultHeap) Len() int
func (ResultHeap) Less ¶ added in v0.5.0
func (h ResultHeap) Less(i, j int) bool
func (*ResultHeap) Pop ¶ added in v0.5.0
func (h *ResultHeap) Pop() any
func (*ResultHeap) Push ¶ added in v0.5.0
func (h *ResultHeap) Push(x any)
func (ResultHeap) Swap ¶ added in v0.5.0
func (h ResultHeap) Swap(i, j int)
type SerializeFunc ¶
SerializeFunc is the serialization function, e.g. [json.Marshal].
type SyncClient ¶
type SyncClient interface { io.Closer // Put Associates a value with a key // // There are few options that can be passed to the Put operation: // - The Put operation can be made conditional on that the record hasn't changed from // a specific existing version by passing the [ExpectedVersionId] option. // - Client can assert that the record does not exist by passing [ExpectedRecordNotExists] // - Client can create an ephemeral record with [Ephemeral] // // Returns the actual key of the inserted record // Returns a [Version] object that contains information about the newly updated record // Returns [ErrorUnexpectedVersionId] if the expected version id does not match the // current version id of the record Put(ctx context.Context, key string, value []byte, options ...PutOption) (insertedKey string, version Version, err error) // Delete removes the key and its associated value from the data store. // // The Delete operation can be made conditional on that the record hasn't changed from // a specific existing version by passing the [ExpectedVersionId] option. // Returns [ErrorUnexpectedVersionId] if the expected version id does not match the // current version id of the record Delete(ctx context.Context, key string, options ...DeleteOption) error // DeleteRange deletes any records with keys within the specified range. // Note: Oxia uses a custom sorting order that treats `/` characters in special way. // Refer to this documentation for the specifics: // https://github.com/streamnative/oxia/blob/main/docs/oxia-key-sorting.md DeleteRange(ctx context.Context, minKeyInclusive string, maxKeyExclusive string, options ...DeleteRangeOption) error // Get returns the value associated with the specified key. // In addition to the value, a version object is also returned, with information // about the record state. 
// Returns ErrorKeyNotFound if the record does not exist Get(ctx context.Context, key string, options ...GetOption) (storedKey string, value []byte, version Version, err error) // List any existing keys within the specified range. // Note: Oxia uses a custom sorting order that treats `/` characters in special way. // Refer to this documentation for the specifics: // https://github.com/streamnative/oxia/blob/main/docs/oxia-key-sorting.md List(ctx context.Context, minKeyInclusive string, maxKeyExclusive string, options ...ListOption) (keys []string, err error) // RangeScan perform a scan for existing records with any keys within the specified range. // Ordering in results channel is respected only if a [PartitionKey] option is passed (and the keys were // inserted with that partition key). RangeScan(ctx context.Context, minKeyInclusive string, maxKeyExclusive string, options ...RangeScanOption) <-chan GetResult // GetNotifications creates a new subscription to receive the notifications // from Oxia for any change that is applied to the database GetNotifications() (Notifications, error) }
SyncClient is the main interface to perform operations with Oxia.
Once a client instance is created, it will be valid until it gets explicitly closed, and it can be shared across different go-routines.
If any ephemeral records are created (using the Ephemeral PutOption), they will all be automatically deleted when the client instance is closed, or if the process crashes.
func NewSyncClient ¶
func NewSyncClient(serviceAddress string, opts ...ClientOption) (SyncClient, error)
NewSyncClient creates a new Oxia client with the sync interface
ServiceAddress is the target host:port of any Oxia server to bootstrap the client. It is used for establishing the shard assignments. Ideally this should be a load-balanced endpoint.
A list of ClientOption arguments can be passed to configure the Oxia client. Example:
client, err := oxia.NewSyncClient("my-oxia-service:6648", oxia.WithRequestTimeout(30*time.Second))
type Version ¶
type Version struct { // VersionId represents an identifier that can be used to refer to a particular version // of a record. // Applications shouldn't make assumptions on the actual values of the VersionId. VersionIds // are only meaningful for a given key and don't reflect the number of changes that were made // on a given record. // Applications can use the VersionId when making [SyncClient.Put] or [SyncClient.Delete] // operations by passing an [ExpectedVersionId] option. VersionId int64 // The time when the record was last created // (If the record gets deleted and recreated, it will have a new CreatedTimestamp value) CreatedTimestamp uint64 // The time when the record was last modified ModifiedTimestamp uint64 // The number of modifications to the record since it was last created // (If the record gets deleted and recreated, the ModificationsCount will restart at 0) ModificationsCount int64 // Whether the record is ephemeral. See [Ephemeral] Ephemeral bool // For ephemeral records, the unique identity of the Oxia client that did last modify it. // It will be empty for all non-ephemeral records. ClientIdentity string }
Version includes some information regarding the state of a record.