reductgo

package module
v1.16.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 26, 2025 License: MIT Imports: 14 Imported by: 0

README

ReductStore Client SDK for Golang

Go Reference Go Report Card GitHub Workflow Status

The ReductStore Client SDK for Golang is an HTTP client wrapper for interacting with a ReductStore instance from a Golang application. It provides a set of APIs for accessing and manipulating data stored in ReductStore.

Requirements

  • Go 1.24 or later

Features

  • HTTP client wrapper with context support
  • Clean API design following Go idioms
  • Support for ReductStore HTTP API v1.16
  • Token-based authentication for secure access to the database
  • Labeling for read-write operations and querying
  • Batch operations for efficient data processing

Getting Started

To get started with the ReductStore Client SDK for Golang, you'll need to have ReductStore installed and running on your machine. You can find instructions for installing ReductStore here.

Once you have ReductStore up and running, you can install the ReductStore Client SDK for Golang using go get:

go get github.com/reductstore/reduct-go

Then, you can use the following example code to start interacting with your ReductStore database from your Go application:

package main

import (
	"context"
	reduct "github.com/reductstore/reduct-go"
	model "github.com/reductstore/reduct-go/model"
	"time"
)

func main() {
	ctx := context.Background()
	// 1. Create a ReductStore client
	client := reduct.NewClient("http://localhost:8383", reduct.ClientOptions{
		APIToken: "my-token",
	})

	// 2. Get or create a bucket with 1Gb quota
	settings := model.NewBucketSettingBuilder().
		WithQuotaType(model.QuotaTypeFifo).
		WithQuotaSize(1_000_000_000).
		Build()

	bucket, err := client.CreateOrGetBucket(ctx, "my-bucket", &settings)
	if err != nil {
		panic(err)
	}

	// 3. Write some data with timestamps in the 'entry-1' entry
	ts := time.Now().UnixMicro()
	writer := bucket.BeginWrite(ctx, "entry-1",
		&reduct.WriteOptions{Timestamp: ts, Labels: map[string]any{"score": 10}})
	err = writer.Write("<Blob data>")
	if err != nil {
		panic(err)
	}

	writer = bucket.BeginWrite(ctx, "entry-1",
		&reduct.WriteOptions{Timestamp: ts + 1, Labels: map[string]any{"score": 20}})
	err = writer.Write("<Blob data 2>")
	if err != nil {
		panic(err)
	}

	// 4. Query the data by time range and condition
	queryOptions := reduct.NewQueryOptionsBuilder().
		WithStart(ts).
		WithStop(ts + 2).
		WithWhen(map[string]any{"&score": map[string]any{"$gt": 15}}).
		Build()

	query, err := bucket.Query(ctx, "entry-1", &queryOptions)
	if err != nil {
		panic(err)
	}

	for rec := range query.Records() {
		data, err := rec.Read()
		if err != nil {
			panic(err)
		}
		timestamp := rec.Time()
		labels := rec.Labels()
		println("Record at time:", timestamp)
		println("Labels:", labels)
		println("Data:", string(data))
	}
}

For more examples, see the Guides section in the ReductStore documentation.

Supported ReductStore Versions and Backward Compatibility

The library is backward compatible with the previous versions. However, some methods have been deprecated and will be removed in future releases. Please refer to the Changelog for more details.

The SDK supports the following ReductStore API versions:

  • v1.16
  • v1.15
  • v1.14

It can work with newer and older versions, but it is not guaranteed that all features will work as expected because the API may change and some features may be deprecated or the SDK may not support them yet.

Documentation

Overview

Package reductgo provides functionality for managing Reduct object storage, including client operations, bucket management

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

func (*Batch) Add

func (b *Batch) Add(ts int64, data []byte, contentType string, labels LabelMap)

Add adds a record to the batch.

func (*Batch) AddOnlyLabels

func (b *Batch) AddOnlyLabels(ts int64, labels LabelMap)

AddOnlyLabels adds an empty record with only labels.

func (*Batch) AddOnlyTimestamp

func (b *Batch) AddOnlyTimestamp(ts int64)

AddOnlyTimestamp adds an empty record with only a timestamp.

func (*Batch) Clear

func (b *Batch) Clear()

Clear removes all records from the batch and resets its state.

func (*Batch) LastAccessTime

func (b *Batch) LastAccessTime() time.Time

LastAccessTime returns the last access time of the batch.

func (*Batch) RecordCount

func (b *Batch) RecordCount() int

RecordCount returns the number of records in the batch.

func (*Batch) Size

func (b *Batch) Size() int64

Size returns the total size of the batch.

func (*Batch) Write

func (b *Batch) Write(ctx context.Context) (ErrorMap, error)

Write writes the batch to the server. It returns an ErrorMap with timestamps as keys and APIError as values for individual records that failed to write. If the whole batch fails, it returns an error.

type BatchOptions

type BatchOptions struct{}

type BatchType

type BatchType int
const (
	BatchWrite BatchType = iota
	BatchUpdate
	BatchRemove
)

type Bucket

type Bucket struct {
	HTTPClient httpclient.HTTPClient
	Name       string
}

func (*Bucket) BeginMetadataRead added in v0.0.7

func (b *Bucket) BeginMetadataRead(ctx context.Context, entry string, ts *int64) (*ReadableRecord, error)

BeginMetadataRead starts reading only the metadata of a record from the given entry at the specified timestamp.

Parameters:

  • ctx: Context for cancellation and timeout control.
  • entry: Name of the entry to read from.
  • ts: Optional A UNIX timestamp in microseconds. If it is empty, the latest record is returned.

It returns a readableRecord or an error if the read fails.

Use readableRecord.Read() to read the content of the reader.

func (*Bucket) BeginRead

func (b *Bucket) BeginRead(ctx context.Context, entry string, ts *int64) (*ReadableRecord, error)

BeginRead starts reading a record from the given entry at the specified timestamp.

Parameters:

  • ctx: Context for cancellation and timeout control.
  • entry: Name of the entry to read from.
  • ts: Optional A UNIX timestamp in microseconds. If it is empty, the latest record is returned.

It returns a readableRecord or an error if the read fails.

Use readableRecord.Read() to read the content of the reader.

func (*Bucket) BeginRemoveBatch

func (b *Bucket) BeginRemoveBatch(_ context.Context, entry string) *Batch

func (*Bucket) BeginUpdateBatch

func (b *Bucket) BeginUpdateBatch(_ context.Context, entry string) *Batch

func (*Bucket) BeginWrite

func (b *Bucket) BeginWrite(_ context.Context, entry string, options *WriteOptions) *WritableRecord

BeginWrite starts a record writer for an entry.

Parameters:

  • entry the name of the entry to write the record to.
  • options:
  • TimeStamp: timestamp in microseconds, it is set to current time if not provided
  • ContentType: "text/plain"
  • Labels: record label kev:value pairs {label1: "value1", label2: "value2"}.

func (*Bucket) BeginWriteBatch

func (b *Bucket) BeginWriteBatch(_ context.Context, entry string) *Batch

func (*Bucket) CheckExists

func (b *Bucket) CheckExists(ctx context.Context) (bool, error)

CheckExists checks if the bucket exists on the server.

func (*Bucket) GetEntries added in v1.15.0

func (b *Bucket) GetEntries(ctx context.Context) ([]model.EntryInfo, error)

GetEntries retrieves the list of entries and their information in the bucket.

func (*Bucket) GetFullInfo added in v1.15.0

func (b *Bucket) GetFullInfo(ctx context.Context) (model.FullBucketDetail, error)

GetFullInfo retrieves the full details of the bucket, including its settings and entries.

func (*Bucket) GetInfo

func (b *Bucket) GetInfo(ctx context.Context) (model.BucketInfo, error)

GetInfo retrieves the basic information about the bucket, such as its name, size, and quota.

func (*Bucket) GetSettings

func (b *Bucket) GetSettings(ctx context.Context) (model.BucketSetting, error)

GetSettings retrieves the settings of the bucket.

func (*Bucket) Query

func (b *Bucket) Query(ctx context.Context, entry string, options *QueryOptions) (*QueryResult, error)

Query queries records for a time interval and returns them through a channel

Parameters:

  • ctx: Context for cancellation and timeout control
  • entry: Name of the entry to query
  • start: Optional start point of the time period in microseconds
  • end: Optional end point of the time period in microseconds
  • options: Optional query options for filtering and controlling the query behavior

Example:

records, err := bucket.Query(ctx, "entry-1", start, end, nil)
if err != nil {
    return err
}
for record := range records {
    fmt.Printf("Time: %d, Size: %d\n", record.Time(), record.Size())
    fmt.Printf("Labels: %v\n", record.Labels())
    content, err := record.Read()
    if err != nil {
        return err
    }
    // Process content...
}

func (*Bucket) Remove

func (b *Bucket) Remove(ctx context.Context) error

Remove deletes the bucket from the server.

func (*Bucket) RemoveEntry added in v1.15.0

func (b *Bucket) RemoveEntry(ctx context.Context, entry string) error

RemoveEntry removes an entry from the bucket and all its records.

Parameters:

  • ctx: Context for cancellation and timeout control.
  • entry: Name of the entry to remove.

func (*Bucket) RemoveQuery

func (b *Bucket) RemoveQuery(ctx context.Context, entry string, options *QueryOptions) (int64, error)

RemoveQuery removes records by query.

Parameters:

  • ctx: Context for cancellation and timeout control
  • entry: Name of the entry
  • start: Optional start point of the time period in microseconds. If nil, starts from the first record
  • end: Optional end point of the time period in microseconds. If nil, ends at the last record
  • options: Optional query options. Only When and Ext fields are used, other options are ignored

Note: remove is exclusive of the end point. [start, end) Returns the number of records removed.

func (*Bucket) RemoveRecord added in v1.15.0

func (b *Bucket) RemoveRecord(ctx context.Context, entry string, ts int64) error

RemoveRecord removes a record from the bucket.

Parameters:

  • ctx: Context for cancellation and timeout control.
  • entry: Name of the entry to remove the record from.
  • ts: Timestamp of the record to remove in microseconds.

func (*Bucket) Rename

func (b *Bucket) Rename(ctx context.Context, newName string) error

Rename changes the name of the bucket.

func (*Bucket) RenameEntry added in v1.15.0

func (b *Bucket) RenameEntry(ctx context.Context, entry, newName string) error

RenameEntry renames an entry.

Parameters:

  • ctx: Context for cancellation and timeout control.
  • entry: Name of the entry to rename.
  • newName: New name of the entry.

func (*Bucket) SetSettings

func (b *Bucket) SetSettings(ctx context.Context, settings model.BucketSetting) error

SetSettings updates the settings of the bucket.

func (*Bucket) Update

func (b *Bucket) Update(ctx context.Context, entry string, ts int64, labels LabelMap) error

Update updates the labels of an existing record. If a label has an empty string value, it will be removed.

Parameters:

  • ctx: Context for cancellation and timeout control
  • entry: Name of the entry
  • ts: Timestamp of record in microseconds
  • labels: Labels to update

type CSVRowResult

type CSVRowResult struct {
	Size        int64    `json:"size"`
	ContentType string   `json:"content_type,omitempty"`
	Labels      LabelMap `json:"labels"`
}

CSVRowResult represents the parsed result of a CSV row.

func ParseCSVRow

func ParseCSVRow(row string) CSVRowResult

ParseCSVRow parses a CSV row with support for escaped values.

type Client

type Client interface {
	// Get Info
	GetInfo(ctx context.Context) (model.ServerInfo, error)
	// Check if the storage engine is working
	IsLive(ctx context.Context) (bool, error)
	// Get a list of the buckets with their stats
	GetBuckets(ctx context.Context) ([]model.BucketInfo, error)
	// Create a new bucket
	CreateBucket(ctx context.Context, name string, settings *model.BucketSetting) (Bucket, error)
	// Create a new bucket if it doesn't exist and return it
	CreateOrGetBucket(ctx context.Context, name string, settings *model.BucketSetting) (Bucket, error)
	// Get a bucket
	GetBucket(ctx context.Context, name string) (Bucket, error)
	// Check if a bucket exists
	CheckBucketExists(ctx context.Context, name string) (bool, error)
	// Remove a bucket
	RemoveBucket(ctx context.Context, name string) error
	// Get a list of Tokens
	GetTokens(ctx context.Context) ([]model.Token, error)
	// Show Information about a Token
	GetToken(ctx context.Context, name string) (model.Token, error)
	// Create a New Token
	CreateToken(ctx context.Context, name string, permissions model.TokenPermissions) (string, error)
	// Remove a Token
	RemoveToken(ctx context.Context, name string) error
	// Get Full Information about Current API Token
	GetCurrentToken(ctx context.Context) (model.Token, error)
	// Get a list of Replication Tasks
	GetReplicationTasks(ctx context.Context) ([]model.ReplicationInfo, error)
	// Get a Replication Task
	GetReplicationTask(ctx context.Context, name string) (model.FullReplicationInfo, error)
	// Create a Replication Task
	CreateReplicationTask(ctx context.Context, name string, task model.ReplicationSettings) error
	// Update a Replication Task
	UpdateReplicationTask(ctx context.Context, name string, task model.ReplicationSettings) error
	// Remove a Replication Task
	RemoveReplicationTask(ctx context.Context, name string) error
}

this is a client for a ReductStore instance.

func NewClient

func NewClient(url string, options ClientOptions) Client

NewClient creates a new ReductClient.

type ClientOptions

type ClientOptions struct {
	APIToken  string
	Timeout   time.Duration
	VerifySSL bool
}

type ErrorMap added in v1.15.0

type ErrorMap map[int64]model.APIError

type LabelMap

type LabelMap map[string]any

type QueryOptions

type QueryOptions struct {
	QueryType    QueryType     `json:"query_type"`
	Start        int64         `json:"start,omitempty"`
	Stop         int64         `json:"stop,omitempty"`
	When         any           `json:"when,omitempty"`
	Ext          any           `json:"ext,omitempty"`
	Strict       bool          `json:"strict,omitempty"`
	Continuous   bool          `json:"continuous,omitempty"`
	Head         bool          `json:"head,omitempty"`
	PollInterval time.Duration `json:"-"`
}

QueryOptions represents a query to run on an entry.

type QueryOptionsBuilder added in v0.0.3

type QueryOptionsBuilder struct {
	// contains filtered or unexported fields
}

func NewQueryOptionsBuilder added in v0.0.3

func NewQueryOptionsBuilder() *QueryOptionsBuilder

func (*QueryOptionsBuilder) Build added in v0.0.3

func (q *QueryOptionsBuilder) Build() QueryOptions

Build builds the QueryOptions from the builder.

func (*QueryOptionsBuilder) WithContinuous added in v0.0.3

func (q *QueryOptionsBuilder) WithContinuous(continuous bool) *QueryOptionsBuilder

WithContinuous WithQueryType makes the query continuous. If set, the query doesn't finish if no records are found and waits for new records to be added. Returns the QueryOptionsBuilder to allow method chaining.

func (*QueryOptionsBuilder) WithExt added in v0.0.3

func (q *QueryOptionsBuilder) WithExt(ext any) *QueryOptionsBuilder

WithExt sets the ext field for the query to pass additional parameters to extensions Returns the QueryOptionsBuilder to allow method chaining.

func (*QueryOptionsBuilder) WithHead added in v0.0.3

func (q *QueryOptionsBuilder) WithHead(head bool) *QueryOptionsBuilder

WithHead if set to true, only metadata is fetched without the content. Returns the QueryOptionsBuilder to allow method chaining.

func (*QueryOptionsBuilder) WithPollInterval added in v0.0.3

func (q *QueryOptionsBuilder) WithPollInterval(pollInterval time.Duration) *QueryOptionsBuilder

WithPollInterval sets the interval for polling the query if it is continuous. Returns the QueryOptionsBuilder to allow method chaining.

func (*QueryOptionsBuilder) WithStart added in v0.0.3

func (q *QueryOptionsBuilder) WithStart(start int64) *QueryOptionsBuilder

WithStart sets the start timestamp (in microseconds) for the query. Returns the QueryOptionsBuilder to allow method chaining.

func (*QueryOptionsBuilder) WithStop added in v0.0.3

func (q *QueryOptionsBuilder) WithStop(stop int64) *QueryOptionsBuilder

WithStop sets the stop timestamp (in microseconds) for the query. Returns the QueryOptionsBuilder to allow method chaining.

func (*QueryOptionsBuilder) WithStrict added in v0.0.3

func (q *QueryOptionsBuilder) WithStrict(strict bool) *QueryOptionsBuilder

WithStrict sets the strict mode for the query. Returns the QueryOptionsBuilder to allow method chaining.

func (*QueryOptionsBuilder) WithWhen added in v0.0.3

func (q *QueryOptionsBuilder) WithWhen(when any) *QueryOptionsBuilder

WithWhen sets the when condition for the query. Example: map[string]any{"&label": map[string]any{"$eq": "test"}} Returns the QueryOptionsBuilder to allow method chaining.

type QueryResponse

type QueryResponse struct {
	ID             int64 `json:"id,omitempty"`
	RemovedRecords int64 `json:"removed_records,omitempty"`
}

QueryResponse represents the response from a query operation.

type QueryResult

type QueryResult struct {
	// contains filtered or unexported fields
}

func (*QueryResult) Records

func (q *QueryResult) Records() <-chan *ReadableRecord

type QueryType

type QueryType string

QueryType represents the type of query to run.

const (
	QueryTypeQuery  QueryType = "QUERY"
	QueryTypeRemove QueryType = "REMOVE"
)

type ReadableRecord

type ReadableRecord struct {
	// contains filtered or unexported fields
}

func NewReadableRecord

func NewReadableRecord(time int64,
	size int64,
	last bool,
	stream io.Reader,
	labels LabelMap,
	contentType string,
) *ReadableRecord

func (*ReadableRecord) ContentType

func (r *ReadableRecord) ContentType() string

ContentType returns the content type of the record.

func (*ReadableRecord) IsLast

func (r *ReadableRecord) IsLast() bool

IsLast is true if this is the last record in the query.

This is not the same as IsLastInBatch(), which is true if this is the last record in the batch.

func (*ReadableRecord) IsLastInBatch added in v0.0.8

func (r *ReadableRecord) IsLastInBatch() bool

IsLastInBatch is true if this is the last record in the batch.

This is not the same as IsLast(), which is true if this is the last record in the query. use this to check if the record is the last in the batch which has to be processed in a stream.

func (*ReadableRecord) Labels

func (r *ReadableRecord) Labels() LabelMap

Labels returns the labels of the record.

func (*ReadableRecord) Read

func (r *ReadableRecord) Read() ([]byte, error)

Read reads the record from the stream.

note: calling read on last record will return no error, but may return empty data.

calling this method on a last record is not recommended, use Stream().Read() instead.

func (*ReadableRecord) ReadAsString

func (r *ReadableRecord) ReadAsString() (string, error)

ReadAsString reads the record from the stream and returns it as a string.

use this to read the record at once.

func (*ReadableRecord) Size

func (r *ReadableRecord) Size() int64

Size returns the size of the record.

func (*ReadableRecord) Stream

func (r *ReadableRecord) Stream() io.Reader

Stream returns the stream of the record.

use this to read the record in a stream.

func (*ReadableRecord) Time

func (r *ReadableRecord) Time() int64

Time returns the timestamp of the record.

type Record

type Record struct {
	Data        []byte
	ContentType string
	Labels      LabelMap
}

type ReductClient

type ReductClient struct {
	APIToken string
	// this is a custom http client
	HTTPClient httpclient.HTTPClient
	// contains filtered or unexported fields
}

func (*ReductClient) CheckBucketExists

func (c *ReductClient) CheckBucketExists(ctx context.Context, name string) (bool, error)

CheckBucketExists checks if a bucket exists.

func (*ReductClient) CreateBucket

func (c *ReductClient) CreateBucket(ctx context.Context, name string, settings *model.BucketSetting) (Bucket, error)

func (*ReductClient) CreateOrGetBucket

func (c *ReductClient) CreateOrGetBucket(ctx context.Context, name string, settings *model.BucketSetting) (Bucket, error)

func (*ReductClient) CreateReplicationTask added in v0.0.3

func (c *ReductClient) CreateReplicationTask(ctx context.Context, name string, task model.ReplicationSettings) error

CreateReplicationTask creates a new replication task.

func (*ReductClient) CreateToken added in v0.0.3

func (c *ReductClient) CreateToken(ctx context.Context, name string, permissions model.TokenPermissions) (string, error)

CreateToken creates a new token.

func (*ReductClient) GetBucket

func (c *ReductClient) GetBucket(ctx context.Context, name string) (Bucket, error)

GetBucket returns a bucket.

func (*ReductClient) GetBuckets added in v0.0.2

func (c *ReductClient) GetBuckets(ctx context.Context) ([]model.BucketInfo, error)

GetBuckets returns a list of buckets with their stats.

func (*ReductClient) GetCurrentToken added in v0.0.3

func (c *ReductClient) GetCurrentToken(ctx context.Context) (model.Token, error)

GetCurrentToken returns the current token.

func (*ReductClient) GetInfo added in v0.0.2

func (c *ReductClient) GetInfo(ctx context.Context) (model.ServerInfo, error)

GetInfo returns information about the server.

func (*ReductClient) GetReplicationTask added in v0.0.3

func (c *ReductClient) GetReplicationTask(ctx context.Context, name string) (model.FullReplicationInfo, error)

GetReplicationTask returns a replication task.

func (*ReductClient) GetReplicationTasks added in v0.0.3

func (c *ReductClient) GetReplicationTasks(ctx context.Context) ([]model.ReplicationInfo, error)

GetReplicationTasks returns a list of replication tasks.

func (*ReductClient) GetToken added in v0.0.3

func (c *ReductClient) GetToken(ctx context.Context, name string) (model.Token, error)

GetToken returns information about a token.

func (*ReductClient) GetTokens added in v0.0.3

func (c *ReductClient) GetTokens(ctx context.Context) ([]model.Token, error)

GetTokens returns a list of tokens.

func (*ReductClient) IsLive added in v0.0.2

func (c *ReductClient) IsLive(ctx context.Context) (bool, error)

IsLive checks if the server is live.

func (*ReductClient) RemoveBucket

func (c *ReductClient) RemoveBucket(ctx context.Context, name string) error

RemoveBucket removes a bucket.

func (*ReductClient) RemoveReplicationTask added in v0.0.3

func (c *ReductClient) RemoveReplicationTask(ctx context.Context, name string) error

RemoveReplicationTask removes a replication task.

func (*ReductClient) RemoveToken added in v0.0.3

func (c *ReductClient) RemoveToken(ctx context.Context, name string) error

RemoveToken removes a token.

func (*ReductClient) UpdateReplicationTask added in v0.0.3

func (c *ReductClient) UpdateReplicationTask(ctx context.Context, name string, task model.ReplicationSettings) error

UpdateReplicationTask updates an existing replication task.

type WritableRecord

type WritableRecord struct {
	// contains filtered or unexported fields
}

func NewWritableRecord

func NewWritableRecord(bucketName string,
	entryName string,
	httpClient httpclient.HTTPClient,
	options WriteOptions,
) *WritableRecord

func (*WritableRecord) Write

func (w *WritableRecord) Write(data any) error

Write writes the record to the bucket.

data can be a string, []byte, or io.Reader. size is the size of the data to write. if size is not provided, it will be calculated from the data.

type WriteOptions

type WriteOptions struct {
	Timestamp   int64
	ContentType string
	Labels      LabelMap
	Size        int64
}

Directories

Path Synopsis
Package model defines the core data structures (models) used throughout the application.
Package model defines the core data structures (models) used throughout the application.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL