s2

package
v0.14.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 11, 2026 License: MIT Imports: 28 Imported by: 1

Documentation

Overview

Go SDK for S2.

The Go SDK provides ergonomic wrappers and utilities to interact with the S2 API.

Installation

go get github.com/s2-streamstore/s2-sdk-go/s2@latest

Authentication

Generate an authentication token by logging onto the web console at s2.dev.

Quick Start

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/s2-streamstore/s2-sdk-go/s2"
)

func main() {
	ctx := context.Background()
	client := s2.New("your-access-token", nil)

	stream := client.Basin("my-basin").Stream("my-stream")

	ack, err := stream.Append(ctx, &s2.AppendInput{
		Records: []s2.AppendRecord{
			{Body: []byte("first record")},
			{Body: []byte("second record")},
		},
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("wrote records at seq %d\n", ack.Start.SeqNum)

	session, err := stream.ReadSession(ctx, &s2.ReadOptions{
		SeqNum: s2.Uint64(0),
	})
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()

	for session.Next(ctx) {
		rec := session.Record()
		fmt.Printf("[%d] %s\n", rec.SeqNum, string(rec.Body))
	}
	if err := session.Err(); err != nil {
		log.Fatal(err)
	}
}

How the SDK is Organized

The SDK has three levels of clients:

  • Client — managing basins, access tokens, metrics
  • BasinClient — managing streams within a basin
  • StreamClient — reading and writing records

The clients can be accessed like so:

client := s2.New("token", nil)           // account level
basin := client.Basin("my-basin")        // basin level
stream := basin.Stream("my-stream")      // stream level

Reading Records

Read session

session, err := stream.ReadSession(ctx, &s2.ReadOptions{
	SeqNum: s2.Uint64(0),   // start from the beginning
	Wait:   s2.Int32(30),   // wait up to 30s for new records
})
if err != nil {
	log.Fatal(err)
}
defer session.Close()

for session.Next(ctx) {
	rec := session.Record()
	// process rec
}
if err := session.Err(); err != nil {
	log.Fatal(err)
}

// resume from where you left off
if pos := session.NextReadPosition(); pos != nil {
	fmt.Printf("next time, start from seq %d\n", pos.SeqNum)
}

Unary Read

batch, err := stream.Read(ctx, &s2.ReadOptions{
	SeqNum: s2.Uint64(0),
	Count:  s2.Uint64(100),
})
if err != nil {
	log.Fatal(err)
}

for _, rec := range batch.Records {
	fmt.Printf("[%d] %s\n", rec.SeqNum, rec.Body)
}

Writing Records

Append Session (pipelining)

session, err := stream.AppendSession(ctx, nil)
if err != nil {
	log.Fatal(err)
}
defer session.Close()

future, err := session.Submit(&s2.AppendInput{
	Records: []s2.AppendRecord{{Body: []byte("hello")}},
})
if err != nil {
	log.Fatal(err)
}

ticket, err := future.Wait(ctx)
if err != nil {
	log.Fatal(err)
}

ack, err := ticket.Ack(ctx)
if err != nil {
	log.Fatal(err)
}
fmt.Printf("written at seq %d\n", ack.Start.SeqNum)

Unary Append

ack, err := stream.Append(ctx, &s2.AppendInput{
	Records: []s2.AppendRecord{
		{Body: []byte("hello")},
		{Body: []byte("world")},
	},
})
if err != nil {
	log.Fatal(err)
}
fmt.Printf("wrote %d records starting at seq %d\n",
	ack.End.SeqNum-ack.Start.SeqNum, ack.Start.SeqNum)

Batching with Producer

Group records based on the amount collected or a linger time:

session, _ := stream.AppendSession(ctx, nil)
defer session.Close()

batcher := s2.NewBatcher(ctx, &s2.BatchingOptions{
	Linger:     100 * time.Millisecond,  // flush after 100ms
	MaxRecords: 500,                      // or after 500 records
})
producer := s2.NewProducer(ctx, batcher, session)
defer producer.Close()

for i := 0; i < 1000; i++ {
	future, _ := producer.Submit(s2.AppendRecord{
		Body: []byte(fmt.Sprintf("record %d", i)),
	})
	go func(f *s2.RecordSubmitFuture) {
		ticket, _ := f.Wait(ctx)
		ack, err := ticket.Ack(ctx)
		if err != nil {
			log.Printf("ack error: %v", err)
			return
		}
		log.Printf("seq: %d", ack.SeqNum())
	}(future)
}

Managing Basins and Streams

Basins

// Create a basin
info, err := client.Basins.Create(ctx, s2.CreateBasinArgs{
	Basin: "my-basin",
	Scope: s2.Ptr(s2.BasinScopeAwsUsEast1),
})

// List basins
iter := client.Basins.Iter(ctx, nil)
for iter.Next() {
	fmt.Println(iter.Value().Name)
}

// Delete a basin
client.Basins.Delete(ctx, "my-basin")

Streams

basin := client.Basin("my-basin")

// Create a stream
info, err := basin.Streams.Create(ctx, s2.CreateStreamArgs{
	Stream: "my-stream",
	Config: &s2.StreamConfig{
		StorageClass: s2.Ptr(s2.StorageClassExpress),
		RetentionPolicy: &s2.RetentionPolicy{
			Age: s2.Ptr(int64(86400 * 7)),  // keep for 7 days
		},
	},
})

// List streams
iter := basin.Streams.Iter(ctx, nil)
for iter.Next() {
	fmt.Println(iter.Value().Name)
}

// Delete a stream
basin.Streams.Delete(ctx, "my-stream")

Access Tokens

You can create scoped tokens for your applications:

resp, err := client.AccessTokens.Issue(ctx, s2.IssueAccessTokenArgs{
	ID: "my-service",
	Scope: s2.AccessTokenScope{
		Basins: &s2.ResourceSet{Prefix: s2.Ptr("prod-")},
		OpGroups: &s2.PermittedOperationGroups{
			Stream: &s2.ReadWritePermissions{Read: true, Write: true},
		},
	},
})
fmt.Printf("token: %s\n", resp.AccessToken)

// Revoke
client.AccessTokens.Revoke(ctx, s2.RevokeAccessTokenArgs{ID: "my-service"})

Error Handling

Errors from the SDK are typically *S2Error which includes the status code and other info:

ack, err := stream.Append(ctx, input)
if err != nil {
	var s2Err *s2.S2Error
	if errors.As(err, &s2Err) {
		fmt.Printf("S2 error: %s (status %d, retryable: %v)\n",
			s2Err.Message, s2Err.Status, s2Err.IsRetryable())
	}
}

Index

Examples

Constants

View Source
const (
	// Maximum number of records allowed in a single append batch.
	MaxBatchRecords = 1000
	// Maximum metered size in bytes for a single append batch (1 MiB).
	MaxBatchMeteredBytes = 1 * 1024 * 1024 // 1 MiB
	// Maximum fencing token length in bytes.
	MaxFencingTokenLength = 36
)
View Source
const (
	OperationListBasins        = "list-basins"
	OperationCreateBasin       = "create-basin"
	OperationDeleteBasin       = "delete-basin"
	OperationReconfigureBasin  = "reconfigure-basin"
	OperationGetBasinConfig    = "get-basin-config"
	OperationIssueAccessToken  = "issue-access-token"
	OperationRevokeAccessToken = "revoke-access-token"
	OperationListAccessTokens  = "list-access-tokens"
	OperationListStreams       = "list-streams"
	OperationCreateStream      = "create-stream"
	OperationDeleteStream      = "delete-stream"
	OperationGetStreamConfig   = "get-stream-config"
	OperationReconfigureStream = "reconfigure-stream"
	OperationCheckTail         = "check-tail"
	OperationAppend            = "append"
	OperationRead              = "read"
	OperationTrim              = "trim"
	OperationFence             = "fence"
	OperationAccountMetrics    = "account-metrics"
	OperationBasinMetrics      = "basin-metrics"
	OperationStreamMetrics     = "stream-metrics"
)
View Source
const (
	DefaultBaseURL = "https://aws.s2.dev/v1"
)

Variables

View Source
var (
	// ErrSessionClosed is returned when an operation is attempted on a closed session.
	ErrSessionClosed = errors.New("session closed")

	// ErrTimeout is returned when an operation times out.
	ErrTimeout = errors.New("operation timed out")

	// ErrMaxAttemptsExhausted is returned when all retry attempts have been exhausted.
	ErrMaxAttemptsExhausted = errors.New("max attempts exhausted")
)

Sentinel errors for SDK-originated errors. Use errors.Is() to check for these conditions.

View Source
var DefaultRetryConfig = &RetryConfig{
	MaxAttempts:       defaultMaxAttempts,
	MinBaseDelay:      defaultMinBaseDelay,
	MaxBaseDelay:      defaultMaxBaseDelay,
	AppendRetryPolicy: AppendRetryPolicyAll,
}

Default retry configuration. It retries up to 3 times with exponential backoff between 100ms and 1s.

Functions

func Bool added in v0.11.0

func Bool(v bool) *bool

Returns a pointer to v.

func Int32 added in v0.11.0

func Int32(v int32) *int32

Returns a pointer to v.

func Int64 added in v0.11.0

func Int64(v int64) *int64

Returns a pointer to v.

func MeteredBatchBytes added in v0.11.0

func MeteredBatchBytes(records []AppendRecord) int64

Calculates the total metered size in bytes of a batch of append records.

func MeteredPayloadBytes added in v0.11.0

func MeteredPayloadBytes(record AppendRecord) uint64

Calculates the metered size in bytes of an append record.

func MeteredSequencedRecordBytes added in v0.11.0

func MeteredSequencedRecordBytes(record SequencedRecord) uint64

Calculates the metered size in bytes of a sequenced record.

func Ptr added in v0.11.0

func Ptr[T any](v T) *T

Returns a pointer to v for any type.

func String added in v0.11.0

func String(v string) *string

Returns a pointer to v.

func Uint32 added in v0.11.0

func Uint32(v uint32) *uint32

Returns a pointer to v.

func Uint64 added in v0.11.0

func Uint64(v uint64) *uint64

Returns a pointer to v.

Types

type AccessTokenID added in v0.11.0

type AccessTokenID string

Access token ID.

type AccessTokenInfo added in v0.8.0

type AccessTokenInfo struct {
	// Access token ID.
	// It must be unique to the account and between 1 and 96 bytes in length.
	ID AccessTokenID `json:"id"`
	// Access token scope.
	Scope AccessTokenScope `json:"scope"`
	// Namespace streams based on the configured stream-level scope, which must be a prefix.
	// Stream name arguments will be automatically prefixed, and the prefix will be stripped when listing streams.
	AutoPrefixStreams bool `json:"auto_prefix_streams,omitempty"`
	// Expiration time.
	// If not set, the expiration will be set to that of the requestor's token.
	ExpiresAt *time.Time `json:"expires_at,omitempty"`
}

type AccessTokenIterator added in v0.11.0

type AccessTokenIterator struct {
	// contains filtered or unexported fields
}

Iterator over access tokens returned by AccessTokensClient.Iter. Use Next to advance, Value to get the current item, and Err to check for errors.

func (*AccessTokenIterator) Err added in v0.11.0

func (it *AccessTokenIterator) Err() error

Returns any error encountered during iteration. Should be called after Next returns false to check if iteration stopped due to an error.

func (*AccessTokenIterator) Next added in v0.11.0

func (it *AccessTokenIterator) Next() bool

Advances the iterator to the next access token. Returns true if there is a next item, false when iteration is complete or an error occurred. Call Err after iteration to check for errors.

func (*AccessTokenIterator) Value added in v0.11.0

Returns the current access token. Only valid after a successful call to Next.

type AccessTokenScope added in v0.8.0

type AccessTokenScope struct {
	// Token IDs allowed.
	AccessTokens *ResourceSet `json:"access_tokens,omitempty"`
	// Basin names allowed.
	Basins *ResourceSet `json:"basins,omitempty"`
	// Access permissions at operation group level.
	OpGroups *PermittedOperationGroups `json:"op_groups,omitempty"`
	// Operations allowed for the token.
	// A union of allowed operations and groups is used as an effective set of allowed operations.
	Ops []string `json:"ops,omitempty"`
	// Stream names allowed.
	Streams *ResourceSet `json:"streams,omitempty"`
}

Access token scope.

type AccessTokensClient added in v0.11.0

type AccessTokensClient struct {
	// contains filtered or unexported fields
}

func (*AccessTokensClient) Issue added in v0.11.0

Issue an access token.

Example
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/s2-streamstore/s2-sdk-go/s2"
)

func main() {
	client := s2.New("your-access-token", nil)
	ctx := context.Background()

	resp, err := client.AccessTokens.Issue(ctx, s2.IssueAccessTokenArgs{
		ID: "my-service-token",
		Scope: s2.AccessTokenScope{
			Basins: &s2.ResourceSet{Prefix: s2.Ptr("my-")},
			OpGroups: &s2.PermittedOperationGroups{
				Stream: &s2.ReadWritePermissions{Read: true, Write: true},
			},
		},
	})
	if err != nil {
		log.Fatalf("issue token: %v", err)
	}

	fmt.Printf("issued token: %s\n", resp.AccessToken)
}

func (*AccessTokensClient) Iter added in v0.11.0

Iterate over access tokens.

Example
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/s2-streamstore/s2-sdk-go/s2"
)

func main() {
	client := s2.New("your-access-token", nil)
	ctx := context.Background()

	iter := client.AccessTokens.Iter(ctx, nil)
	for iter.Next() {
		token := iter.Value()
		fmt.Printf("token: %s\n", token.ID)
	}
	if err := iter.Err(); err != nil {
		log.Fatalf("list tokens: %v", err)
	}
}

func (*AccessTokensClient) List added in v0.11.0

List access tokens.

Example
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/s2-streamstore/s2-sdk-go/s2"
)

func main() {
	client := s2.New("your-access-token", nil)
	ctx := context.Background()

	resp, err := client.AccessTokens.List(ctx, &s2.ListAccessTokensArgs{
		Prefix: "my-",
		Limit:  s2.Ptr(10),
	})
	if err != nil {
		log.Fatalf("list access tokens: %v", err)
	}

	for _, token := range resp.AccessTokens {
		fmt.Printf("token: %s\n", token.ID)
	}
	fmt.Printf("has more: %v\n", resp.HasMore)
}

func (*AccessTokensClient) Revoke added in v0.11.0

Revoke an access token.

Example
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/s2-streamstore/s2-sdk-go/s2"
)

func main() {
	client := s2.New("your-access-token", nil)
	ctx := context.Background()

	if err := client.AccessTokens.Revoke(ctx, s2.RevokeAccessTokenArgs{
		ID: "my-service-token",
	}); err != nil {
		log.Fatalf("revoke token: %v", err)
	}

	fmt.Println("token revoked")
}

type AccountMetricSet added in v0.11.0

type AccountMetricSet string
const (
	AccountMetricSetActiveBasins AccountMetricSet = "active-basins"
	AccountMetricSetAccountOps   AccountMetricSet = "account-ops"
)

type AccountMetricsArgs added in v0.11.0

type AccountMetricsArgs struct {
	// Metric set to return.
	Set AccountMetricSet `json:"set"`
	// Start timestamp as Unix epoch seconds, if applicable for the metric set.
	Start *int64 `json:"start,omitempty"`
	// End timestamp as Unix epoch seconds, if applicable for the metric set.
	End *int64 `json:"end,omitempty"`
	// Interval to aggregate over for timeseries metric sets.
	Interval *TimeseriesInterval `json:"interval,omitempty"`
}

type AccumulationMetric added in v0.11.0

type AccumulationMetric struct {
	// The interval at which data points are accumulated.
	Interval TimeseriesInterval `json:"interval"`
	// Timeseries name.
	Name string `json:"name"`
	// Unit of the metric.
	Unit MetricUnit `json:"unit"`
	// Timeseries values.
	// Each element is a tuple of a timestamp in Unix epoch seconds and a data point.
	// The data point represents the accumulated value for the time period starting at the timestamp,
	// spanning one interval.
	Values []MetricSample `json:"values"`
}

type AppendAck added in v0.11.0

type AppendAck struct {
	// Sequence number and timestamp of the first record that was appended.
	Start StreamPosition `json:"start"`
	// Sequence number of the last record that was appended `+ 1`, and timestamp of the last record that was appended.
	// The difference between `end.seq_num` and `start.seq_num` will be the number of records appended.
	End StreamPosition `json:"end"`
	// Sequence number that will be assigned to the next record on the stream, and timestamp of the last record on the stream.
	// This can be greater than the `end` position in case of concurrent appends.
	Tail StreamPosition `json:"tail"`
}

Success response to an `append` request.

type AppendInput

type AppendInput struct {
	// Batch of records to append atomically, which must contain at least one record, and no more than 1000.
	// The total size of a batch of records may not exceed 1 MiB of metered bytes.
	Records []AppendRecord `json:"records"`
	// Enforce that the sequence number assigned to the first record matches.
	MatchSeqNum *uint64 `json:"match_seq_num,omitempty"`
	// Enforce a fencing token, which starts out as an empty string that can be overridden by a `fence` command record.
	FencingToken *string `json:"fencing_token,omitempty"`
}

Payload of an `append` request.

type AppendRecord

type AppendRecord struct {
	// Timestamp for this record.
	// The service will always ensure monotonicity by adjusting it up if necessary to the maximum observed timestamp.
	// Refer to stream timestamping configuration for the finer semantics around whether a client-specified timestamp is required,
	// and whether it will be capped at the arrival time.
	Timestamp *uint64 `json:"timestamp,omitempty"`
	// Series of name-value pairs for this record.
	Headers []Header `json:"headers,omitempty"`
	// Body of the record.
	Body []byte `json:"body,omitempty"`
}

Record to be appended to a stream.

func NewFenceCommandRecord added in v0.11.0

func NewFenceCommandRecord(token string, timestamp *uint64) AppendRecord

Create a new Fence command record.

Example
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/s2-streamstore/s2-sdk-go/s2"
)

func main() {
	client := s2.New("your-access-token", nil)
	stream := client.Basin("my-basin").Stream("my-stream")
	ctx := context.Background()

	fenceRecord := s2.NewFenceCommandRecord("my-writer-id", nil)

	ack, err := stream.Append(ctx, &s2.AppendInput{
		Records: []s2.AppendRecord{fenceRecord},
	})
	if err != nil {
		log.Fatalf("fence: %v", err)
	}

	fmt.Printf("fence set at seq=%d\n", ack.Start.SeqNum)
}

func NewTrimCommandRecord added in v0.11.0

func NewTrimCommandRecord(seqNum uint64, timestamp *uint64) AppendRecord

Create a new Trim command record.

Example
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/s2-streamstore/s2-sdk-go/s2"
)

func main() {
	client := s2.New("your-access-token", nil)
	stream := client.Basin("my-basin").Stream("my-stream")
	ctx := context.Background()

	trimRecord := s2.NewTrimCommandRecord(1000, nil)

	ack, err := stream.Append(ctx, &s2.AppendInput{
		Records: []s2.AppendRecord{trimRecord},
	})
	if err != nil {
		log.Fatalf("trim: %v", err)
	}

	fmt.Printf("trim command at seq=%d\n", ack.Start.SeqNum)
}

type AppendRetryPolicy added in v0.11.0

type AppendRetryPolicy string

Policy for retrying append operations.

const (
	// Retry all append operations, including those that may have side effects (default).
	AppendRetryPolicyAll AppendRetryPolicy = "all"
	// Only retry append operations that are guaranteed to have no side effects.
	AppendRetryPolicyNoSideEffects AppendRetryPolicy = "noSideEffects"
)

type AppendSession added in v0.11.0

type AppendSession struct {
	// contains filtered or unexported fields
}

AppendSession provides ordered, pipelined appends with automatic retries.

func (*AppendSession) Close added in v0.11.0

func (r *AppendSession) Close() error

Closes the session after all inflight batches have been acknowledged. New submissions after Close is called will be rejected.

func (*AppendSession) LastAckedPosition added in v0.11.0

func (r *AppendSession) LastAckedPosition() *AppendAck

Returns the last acknowledged position, if any.

func (*AppendSession) Submit added in v0.11.0

func (r *AppendSession) Submit(input *AppendInput) (*SubmitFuture, error)

Submit an append request. Returns a SubmitFuture that resolves to a submit ticket once the batch is enqueued (has capacity). Call ticket.Wait() to get a SubmitFuture for the AppendAck once the batch is durable. This method applies backpressure and will block if capacity limits are reached.

type AppendSessionOptions added in v0.11.0

type AppendSessionOptions struct {
	// Aggregate size of records, to allow in-flight before applying backpressure (default: 5 MiB).
	MaxInflightBytes uint64
	// Maximum number of batches allowed in-flight before applying backpressure.
	MaxInflightBatches uint32
	// Retry configuration for handling transient failures.
	// Applies to management operations (basins, streams, tokens) and stream operations (read, append).
	RetryConfig *RetryConfig
}

type BasinClient

type BasinClient struct {
	Streams *StreamsClient
	// contains filtered or unexported fields
}

Basin client.

func (*BasinClient) Name added in v0.11.0

func (b *BasinClient) Name() string

Name of the basin.

func (*BasinClient) Stream added in v0.11.0

func (b *BasinClient) Stream(name StreamName) *StreamClient

type BasinConfig

type BasinConfig struct {
	// Create stream on append if it doesn't exist, using the default stream configuration.
	// Defaults to false.
	CreateStreamOnAppend *bool `json:"create_stream_on_append,omitempty"`
	// Create stream on read if it doesn't exist, using the default stream configuration.
	// Defaults to false.
	CreateStreamOnRead *bool `json:"create_stream_on_read,omitempty"`
	// Default stream configuration.
	DefaultStreamConfig *StreamConfig `json:"default_stream_config,omitempty"`
}

type BasinInfo

type BasinInfo struct {
	// Basin name.
	Name BasinName `json:"name"`
	// Basin scope.
	Scope BasinScope `json:"scope"`
	// Creation time.
	CreatedAt time.Time `json:"created_at"`
	// Deletion time, if the basin is being deleted.
	DeletedAt *time.Time `json:"deleted_at,omitempty"`
}

type BasinMetricSet added in v0.11.0

type BasinMetricSet string
const (
	BasinMetricSetStorage          BasinMetricSet = "storage"
	BasinMetricSetAppendOps        BasinMetricSet = "append-ops"
	BasinMetricSetReadOps          BasinMetricSet = "read-ops"
	BasinMetricSetReadThroughput   BasinMetricSet = "read-throughput"
	BasinMetricSetAppendThroughput BasinMetricSet = "append-throughput"
	BasinMetricSetBasinOps         BasinMetricSet = "basin-ops"
)

type BasinMetricsArgs added in v0.11.0

type BasinMetricsArgs struct {
	//  Basin name.
	Basin string `json:"basin"`
	// Metric set to return.
	Set BasinMetricSet `json:"set"`
	// Start timestamp as Unix epoch seconds, if applicable for the metric set.
	Start *int64 `json:"start,omitempty"`
	// End timestamp as Unix epoch seconds, if applicable for the metric set.
	End *int64 `json:"end,omitempty"`
	// Interval to aggregate over for timeseries metric sets.
	Interval *TimeseriesInterval `json:"interval,omitempty"`
}

type BasinName added in v0.11.0

type BasinName string

Basin name which must be globally unique. It can be between 8 and 48 characters in length, and comprise lowercase letters, numbers and hyphens. It cannot begin or end with a hyphen.

type BasinReconfiguration added in v0.11.0

type BasinReconfiguration struct {
	// Create a stream on append.
	CreateStreamOnAppend *bool `json:"create_stream_on_append,omitempty"`
	// Create a stream on read.
	CreateStreamOnRead *bool `json:"create_stream_on_read,omitempty"`
	// Basin configuration.
	DefaultStreamConfig *StreamReconfiguration `json:"default_stream_config,omitempty"`
}

type BasinScope added in v0.7.0

type BasinScope string

Basin scope.

const (
	BasinScopeAwsUsEast1 BasinScope = "aws:us-east-1"
)

type BasinsClient added in v0.11.0

type BasinsClient struct {
	// contains filtered or unexported fields
}

func (*BasinsClient) Create added in v0.11.0

func (b *BasinsClient) Create(ctx context.Context, args CreateBasinArgs) (*BasinInfo, error)

Create a basin.

Example
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/s2-streamstore/s2-sdk-go/s2"
)

func main() {
	client := s2.New("your-access-token", nil)
	ctx := context.Background()

	info, err := client.Basins.Create(ctx, s2.CreateBasinArgs{
		Basin: "my-new-basin",
		Scope: s2.Ptr(s2.BasinScopeAwsUsEast1),
		Config: &s2.BasinConfig{
			DefaultStreamConfig: &s2.StreamConfig{
				StorageClass: s2.Ptr(s2.StorageClassStandard),
			},
		},
	})
	if err != nil {
		log.Fatalf("create basin: %v", err)
	}

	fmt.Printf("created basin: %s (created_at=%s)\n", info.Name, info.CreatedAt)
}

func (*BasinsClient) Delete added in v0.11.0

func (b *BasinsClient) Delete(ctx context.Context, basinName BasinName) error

Delete a basin.

Example
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/s2-streamstore/s2-sdk-go/s2"
)

func main() {
	client := s2.New("your-access-token", nil)
	ctx := context.Background()

	if err := client.Basins.Delete(ctx, "my-basin"); err != nil {
		log.Fatalf("delete basin: %v", err)
	}

	fmt.Println("basin deleted")
}

func (*BasinsClient) GetConfig added in v0.11.0

func (b *BasinsClient) GetConfig(ctx context.Context, basinName BasinName) (*BasinConfig, error)

Get basin configuration.

Example
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/s2-streamstore/s2-sdk-go/s2"
)

func main() {
	client := s2.New("your-access-token", nil)
	ctx := context.Background()

	config, err := client.Basins.GetConfig(ctx, "my-basin")
	if err != nil {
		log.Fatalf("get basin config: %v", err)
	}

	fmt.Printf("basin config: %+v\n", config)
}

func (*BasinsClient) Iter added in v0.11.0

Iterate over basins. By default, basins that are being deleted are excluded. Set IncludeDeleted to true to include them.

Example
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/s2-streamstore/s2-sdk-go/s2"
)

func main() {
	client := s2.New("your-access-token", nil)
	ctx := context.Background()

	iter := client.Basins.Iter(ctx, &s2.ListBasinsArgs{
		Prefix: "my-",
	})
	for iter.Next() {
		basin := iter.Value()
		fmt.Printf("basin: %s (scope=%s, created_at=%s)\n", basin.Name, basin.Scope, basin.CreatedAt)
	}
	if err := iter.Err(); err != nil {
		log.Fatalf("list basins: %v", err)
	}
}

func (*BasinsClient) List added in v0.11.0

List basins.

Example
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/s2-streamstore/s2-sdk-go/s2"
)

func main() {
	client := s2.New("your-access-token", nil)
	ctx := context.Background()

	resp, err := client.Basins.List(ctx, &s2.ListBasinsArgs{
		Prefix: "my-",
		Limit:  s2.Ptr(10),
	})
	if err != nil {
		log.Fatalf("list basins: %v", err)
	}

	for _, basin := range resp.Basins {
		fmt.Printf("basin: %s (scope=%s, created_at=%s)\n", basin.Name, basin.Scope, basin.CreatedAt)
	}
	fmt.Printf("has more: %v\n", resp.HasMore)
}

func (*BasinsClient) Reconfigure added in v0.11.0

func (b *BasinsClient) Reconfigure(ctx context.Context, args ReconfigureBasinArgs) (*BasinConfig, error)

Reconfigure a basin.

Example
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/s2-streamstore/s2-sdk-go/s2"
)

func main() {
	client := s2.New("your-access-token", nil)
	ctx := context.Background()

	config, err := client.Basins.Reconfigure(ctx, s2.ReconfigureBasinArgs{
		Basin: "my-basin",
		Config: s2.BasinReconfiguration{
			CreateStreamOnAppend: s2.Ptr(true),
		},
	})
	if err != nil {
		log.Fatalf("reconfigure basin: %v", err)
	}

	fmt.Printf("basin config: %+v\n", config)
}

type BasinsIterator added in v0.11.0

type BasinsIterator struct {
	// contains filtered or unexported fields
}

Iterator over basins returned by BasinsClient.Iter. Use Next to advance, Value to get the current item, and Err to check for errors.

func (*BasinsIterator) Err added in v0.11.0

func (it *BasinsIterator) Err() error

Returns any error encountered during iteration. Should be called after Next returns false to check if iteration stopped due to an error.

func (*BasinsIterator) Next added in v0.11.0

func (it *BasinsIterator) Next() bool

Advances the iterator to the next basin. Returns true if there is a next item, false when iteration is complete or an error occurred. Call Err after iteration to check for errors.

func (*BasinsIterator) Value added in v0.11.0

func (it *BasinsIterator) Value() BasinInfo

Returns the current basin. Only valid after a successful call to Next.

type BatchOutput added in v0.11.0

type BatchOutput struct {
	// Input contains the records to append.
	Input *AppendInput
	// contains filtered or unexported fields
}

BatchOutput represents a flushed batch ready for appending.

type BatchSubmitTicket added in v0.11.0

type BatchSubmitTicket struct {
	// contains filtered or unexported fields
}

Resolves with the AppendAck once the batch is durable.

func (*BatchSubmitTicket) Ack added in v0.11.0

Resolves BatchSubmitTicket with the AppendAck once the batch is durable.

Example
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/s2-streamstore/s2-sdk-go/s2"
)

func main() {
	client := s2.New("your-access-token", nil)
	stream := client.Basin("my-basin").Stream("my-stream")

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	session, err := stream.AppendSession(ctx, nil)
	if err != nil {
		log.Fatalf("open append session: %v", err)
	}
	defer session.Close()

	fut, err := session.Submit(&s2.AppendInput{
		Records: []s2.AppendRecord{
			{Body: []byte("first")},
			{Body: []byte("second")},
		},
	})
	if err != nil {
		log.Fatalf("submit append: %v", err)
	}

	ticket, err := fut.Wait(ctx)
	if err != nil {
		log.Fatalf("enqueue failed: %v", err)
	}

	ack, err := ticket.Ack(ctx)
	if err != nil {
		log.Fatalf("append failed: %v", err)
	}
	fmt.Printf("acknowledged records %d-%d\n", ack.Start.SeqNum, ack.End.SeqNum-1)
}

type Batcher added in v0.11.0

type Batcher struct {
	// contains filtered or unexported fields
}

Accumulates AppendRecords and emits them as batches based on configurable size, count, or time thresholds.

func NewBatcher added in v0.11.0

func NewBatcher(ctx context.Context, opts *BatchingOptions) *Batcher

Create a new Batcher that accumulates AppendRecords and emits them as batches based on configurable size, count, or time thresholds.

func (*Batcher) Add added in v0.11.0

func (b *Batcher) Add(record AppendRecord, resultCh chan *producerOutcome) error

Adds a record to the current batch.

func (*Batcher) Batches added in v0.11.0

func (b *Batcher) Batches() <-chan *BatchOutput

Batches returns a receive-only channel of flushed batches.

func (*Batcher) Close added in v0.11.0

func (b *Batcher) Close()

Flushes any remaining records and closes the batcher.

func (*Batcher) Flush added in v0.11.0

func (b *Batcher) Flush()

Flush forces the current batch to be emitted immediately.

type BatchingOptions added in v0.11.0

type BatchingOptions struct {
	// Duration to wait before flushing a batch (default: 5ms)
	Linger time.Duration
	// Maximum number of records in a batch (default: 1000, max: 1000)
	MaxRecords int
	// Maximum batch size in metered bytes (default: 1 MiB, max: 1 MiB)
	MaxMeteredBytes uint64
	// Optional sequence number to match for first batch (auto-increments for subsequent batches)
	MatchSeqNum *uint64
	// Optional fencing token to enforce (remains static across batches)
	FencingToken *string
	// Buffer size for the internal batches channel (default: 16)
	ChannelBuffer int
}

type Client

type Client struct {

	// Client for access tokens.
	AccessTokens *AccessTokensClient
	// Client for basins.
	Basins *BasinsClient
	// Client for metrics.
	Metrics *MetricsClient
	// contains filtered or unexported fields
}

func New added in v0.11.0

func New(accessToken string, opts *ClientOptions) *Client

Create a new Client.

func NewFromEnvironment added in v0.11.0

func NewFromEnvironment(opts *ClientOptions) *Client

Create a client using configuration from environment variables. Environment variables: S2_ACCESS_TOKEN, S2_ACCOUNT_ENDPOINT, S2_BASIN_ENDPOINT. ClientOptions fields override environment variables. Panics if S2_ACCESS_TOKEN is not set.

func (*Client) Basin added in v0.11.0

func (c *Client) Basin(name string) *BasinClient

Create a new BasinClient.

type ClientOptions added in v0.11.0

type ClientOptions struct {
	// Endpoint to connect to S2.
	// Defaults to "https://aws.s2.dev/v1".
	BaseURL string
	// HTTP client used for requests.
	HTTPClient *http.Client
	// Allows customizing how basin endpoints are constructed.
	// If provided, this function is used to derive the endpoint for a given basin.
	// When provided, the "s2-basin" HTTP header is automatically included in basin-scoped requests.
	MakeBasinBaseURL func(basin string) string
	// Retry configuration.
	RetryConfig *RetryConfig
	// Get SDK level logs.
	Logger *slog.Logger
	// Overall timeout for HTTP requests.
	// Defaults to 5 seconds.
	RequestTimeout time.Duration
	// Timeout for establishing TCP connections.
	// Defaults to 3 seconds.
	ConnectionTimeout time.Duration
	// Compression algorithm for request bodies.
	// Defaults to CompressionNone (no compression).
	Compression CompressionType
}

func LoadConfigFromEnv added in v0.11.8

func LoadConfigFromEnv() *ClientOptions

Returns ClientOptions with endpoints from S2_ACCOUNT_ENDPOINT and S2_BASIN_ENDPOINT.

type CompressionType added in v0.11.0

type CompressionType = internalframing.CompressionType

type Config added in v0.11.0

type Config struct {
	AccessToken     string
	AccountTemplate *endpointTemplate
	BasinTemplate   *endpointTemplate
}

type CreateBasinArgs added in v0.11.0

type CreateBasinArgs struct {
	// Basin name which must be globally unique.
	// It can be between 8 and 48 characters in length, and comprise lowercase letters, numbers and hyphens.
	// It cannot begin or end with a hyphen.
	Basin BasinName `json:"basin"`
	// Basin configuration.
	Config *BasinConfig `json:"config,omitempty"`
	// Basin scope.
	Scope *BasinScope `json:"scope,omitempty"`
}

type CreateStreamArgs added in v0.11.0

type CreateStreamArgs struct {
	// Stream name.
	Stream StreamName `json:"stream"`
	// Stream configuration.
	Config *StreamConfig `json:"config,omitempty"`
}

type DeleteOnEmptyConfig added in v0.11.0

type DeleteOnEmptyConfig struct {
	// Minimum age in seconds before an empty stream can be deleted.
	// Set to 0 (default) to disable delete-on-empty (don't delete automatically).
	MinAgeSecs *int64 `json:"min_age_secs,omitempty"`
}

type DeleteOnEmptyReconfiguration added in v0.11.0

type DeleteOnEmptyReconfiguration struct {
	// Minimum age in seconds before an empty stream can be deleted.
	// Set to 0 to disable delete-on-empty (don't delete automatically).
	MinAgeSecs *int64 `json:"min_age_secs,omitempty"`
}

type ErrorInfo added in v0.11.0

type ErrorInfo struct {
	Code    string `json:"code"`
	Message string `json:"message"`
}

type FencingTokenMismatchError added in v0.11.0

type FencingTokenMismatchError struct {
	*S2Error
	ExpectedFencingToken string
}

func (*FencingTokenMismatchError) Unwrap added in v0.11.0

func (e *FencingTokenMismatchError) Unwrap() error

type GaugeMetric added in v0.11.0

type GaugeMetric struct {
	// Timeseries name.
	Name string `json:"name"`
	// Unit of the metric.
	Unit MetricUnit `json:"unit"`
	// Timeseries values.
	// Each element is a tuple of a timestamp in Unix epoch seconds and a data point.
	// The data point represents the value at the instant of the timestamp.
	Values []MetricSample `json:"values"`
}

Named series of `(timestamp, value)` points each representing an instantaneous value.

type Header struct {
	// Header name.
	// The name cannot be empty, with the exception of an S2 command record.
	Name []byte
	// Header value.
	Value []byte
}

Header adds structured information to a record as a name-value pair.

func NewHeader added in v0.11.0

func NewHeader(name, value string) Header

Creates a Header from string name and value.

type IndexedAppendAck added in v0.11.0

type IndexedAppendAck struct {
	// contains filtered or unexported fields
}

Represents the acknowledgment for a single record within a batch.

func (*IndexedAppendAck) BatchAppendAck added in v0.11.0

func (a *IndexedAppendAck) BatchAppendAck() *AppendAck

Returns the underlying batch AppendAck.

func (*IndexedAppendAck) SeqNum added in v0.11.0

func (a *IndexedAppendAck) SeqNum() uint64

Returns the sequence number assigned to this specific record.

type InfiniteRetention added in v0.11.0

type InfiniteRetention struct{}

Retain records unless explicitly trimmed.

type IssueAccessTokenArgs added in v0.11.0

type IssueAccessTokenArgs struct {
	// Access token ID.
	// It must be unique to the account and between 1 and 96 bytes in length.
	ID AccessTokenID `json:"id"`
	// Access token scope.
	Scope AccessTokenScope `json:"scope"`
	// Namespace streams based on the configured stream-level scope, which must be a prefix.
	// Stream name arguments will be automatically prefixed, and the prefix will be stripped when listing streams.
	AutoPrefixStreams bool `json:"auto_prefix_streams,omitempty"`
	// Expiration time. If not set, the expiration will be set to that of the requestor's token.
	ExpiresAt *time.Time `json:"expires_at,omitempty"`
}

type IssueAccessTokenResponse added in v0.11.0

type IssueAccessTokenResponse struct {
	// Created access token.
	AccessToken string `json:"access_token"`
}

type LabelMetric added in v0.11.0

type LabelMetric struct {
	// Label name.
	Name string `json:"name"`
	// Label values.
	Values []string `json:"values"`
}

Set of string labels.

type ListAccessTokensArgs added in v0.11.0

type ListAccessTokensArgs struct {
	// Filter to access tokens whose ID begins with this prefix.
	Prefix string `json:"prefix,omitempty"`
	// Filter to access tokens whose ID lexicographically starts after this string.
	StartAfter string `json:"start_after,omitempty"`
	// Number of results, up to a maximum of 1000.
	Limit *int `json:"limit,omitempty"`
}

type ListAccessTokensResponse added in v0.8.0

type ListAccessTokensResponse struct {
	// Matching access tokens.
	AccessTokens []AccessTokenInfo `json:"access_tokens"`
	// Indicates that there are more access tokens that match the criteria.
	HasMore bool `json:"has_more"`
}

type ListBasinsArgs added in v0.11.0

type ListBasinsArgs struct {
	// Filter to basins whose names begin with this prefix.
	Prefix string `json:"prefix,omitempty"`
	// Filter to basins whose names lexicographically start after this string.
	// It must be greater than or equal to the `prefix` if specified.
	StartAfter string `json:"start_after,omitempty"`
	// Number of results, up to a maximum of 1000.
	Limit *int `json:"limit,omitempty"`
	// Include basins that are being deleted (default: false).
	// Only applies to the Iter method.
	IncludeDeleted bool `json:"-"`
}

type ListBasinsResponse

type ListBasinsResponse struct {
	// Matching basins.
	Basins []BasinInfo `json:"basins"`
	// Indicates that there are more basins that match the criteria.
	HasMore bool `json:"has_more"`
}

type ListStreamsArgs added in v0.11.0

type ListStreamsArgs struct {
	// Filter to streams whose name begins with this prefix.
	Prefix string `json:"prefix,omitempty"`
	// Filter to streams whose name begins with this prefix.
	// It must be greater than or equal to the `prefix` if specified.
	StartAfter string `json:"start_after,omitempty"`
	// Number of results, up to a maximum of 1000.
	Limit *int `json:"limit,omitempty"`
	// Include streams that are being deleted (default: false).
	// Only applies to the Iter method.
	IncludeDeleted bool `json:"-"`
}

type ListStreamsResponse

type ListStreamsResponse struct {
	// Matching streams.
	Streams []StreamInfo `json:"streams"`
	// Indicates that there are more results that match the criteria.
	HasMore bool `json:"has_more"`
}

type Metric added in v0.11.0

type Metric struct {
	// Single named value.
	Scalar *ScalarMetric `json:"scalar,omitempty"`
	// Named series of `(timestamp, value)` points representing an accumulation over a specified bucket.
	Accumulation *AccumulationMetric `json:"accumulation,omitempty"`
	// Named series of `(timestamp, value)` points each representing an instantaneous value.
	Gauge *GaugeMetric `json:"gauge,omitempty"`
	// Set of string labels.
	Label *LabelMetric `json:"label,omitempty"`
}

type MetricSample added in v0.11.0

type MetricSample [2]float64

type MetricSetResponse added in v0.11.0

type MetricSetResponse struct {
	// Metrics comprising the set.
	Values []Metric `json:"values"`
}

type MetricUnit added in v0.11.0

type MetricUnit string
const (
	MetricUnitBytes      MetricUnit = "bytes"
	MetricUnitOperations MetricUnit = "operations"
)

type MetricsClient added in v0.11.0

type MetricsClient struct {
	// contains filtered or unexported fields
}

Metrics client.

func (*MetricsClient) Account added in v0.11.0

Example
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/s2-streamstore/s2-sdk-go/s2"
)

func main() {
	client := s2.New("your-access-token", nil)

	metrics, err := client.Metrics.Account(context.Background(), &s2.AccountMetricsArgs{
		Set: s2.AccountMetricSetActiveBasins,
	})
	if err != nil {
		log.Fatalf("account metrics: %v", err)
	}

	for _, m := range metrics.Values {
		if m.Scalar != nil {
			fmt.Printf("%s: %.0f %s\n", m.Scalar.Name, m.Scalar.Value, m.Scalar.Unit)
		}
	}
}

func (*MetricsClient) Basin added in v0.11.0

Example
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/s2-streamstore/s2-sdk-go/s2"
)

func main() {
	client := s2.New("your-access-token", nil)

	metrics, err := client.Metrics.Basin(context.Background(), &s2.BasinMetricsArgs{
		Basin: "my-basin",
		Set:   s2.BasinMetricSetStorage,
	})
	if err != nil {
		log.Fatalf("basin metrics: %v", err)
	}

	for _, m := range metrics.Values {
		if m.Scalar != nil {
			fmt.Printf("%s: %.0f %s\n", m.Scalar.Name, m.Scalar.Value, m.Scalar.Unit)
		}
	}
}

func (*MetricsClient) Stream added in v0.11.0

Example
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/s2-streamstore/s2-sdk-go/s2"
)

func main() {
	client := s2.New("your-access-token", nil)

	metrics, err := client.Metrics.Stream(context.Background(), &s2.StreamMetricsArgs{
		Basin:  "my-basin",
		Stream: "my-stream",
		Set:    s2.StreamMetricSetStorage,
	})
	if err != nil {
		log.Fatalf("stream metrics: %v", err)
	}

	for _, m := range metrics.Values {
		if m.Scalar != nil {
			fmt.Printf("%s: %.0f %s\n", m.Scalar.Name, m.Scalar.Value, m.Scalar.Unit)
		}
	}
}

type PermittedOperationGroups added in v0.8.0

type PermittedOperationGroups struct {
	// Account-level access permissions.
	Account *ReadWritePermissions `json:"account,omitempty"`
	// Basin-level access permissions.
	Basin *ReadWritePermissions `json:"basin,omitempty"`
	// Stream-level access permissions.
	Stream *ReadWritePermissions `json:"stream,omitempty"`
}

Access permissions at operation group level.

type Producer added in v0.11.0

type Producer struct {
	// contains filtered or unexported fields
}

Producer provides per-record append semantics on top of a batched AppendSession.

  • submit(record) returns a RecordSubmitFuture that resolves once the record has been accepted (written to the batcher). Backpressure is applied automatically via the batcher when the AppendSession is at capacity.
  • ticket.Ack() returns an IndexedAppendAck that resolves once the record is durable.
Example
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/s2-streamstore/s2-sdk-go/s2"
)

func main() {
	client := s2.New("your-access-token", nil)
	stream := client.Basin("my-basin").Stream("my-stream")

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	session, err := stream.AppendSession(ctx, nil)
	if err != nil {
		log.Fatalf("open append session: %v", err)
	}
	defer session.Close()

	batcher := s2.NewBatcher(ctx, &s2.BatchingOptions{
		Linger:     100 * time.Millisecond,
		MaxRecords: 500,
	})
	producer := s2.NewProducer(ctx, batcher, session)
	defer producer.Close()

	for i := range 100 {
		fut, err := producer.Submit(s2.AppendRecord{
			Body: []byte(fmt.Sprintf("record %d", i)),
		})
		if err != nil {
			log.Fatalf("producer submit: %v", err)
		}

		go func(f *s2.RecordSubmitFuture) {
			ticket, err := f.Wait(ctx)
			if err != nil {
				log.Printf("enqueue error: %v", err)
				return
			}
			ack, err := ticket.Ack(ctx)
			if err != nil {
				log.Printf("ack error: %v", err)
				return
			}
			fmt.Printf("ack: seqNum=%d\n", ack.SeqNum())
		}(fut)
	}

	time.Sleep(time.Second)
}

func NewProducer added in v0.11.0

func NewProducer(ctx context.Context, batcher *Batcher, session *AppendSession) *Producer

Create a new Producer.

func (*Producer) Close added in v0.11.0

func (p *Producer) Close() error

Stops the producer, flushes pending batches, and waits for in-flight acks. Close blocks until in-flight work completes or the producer context is canceled.

func (*Producer) Submit added in v0.11.0

func (p *Producer) Submit(record AppendRecord) (*RecordSubmitFuture, error)

Submit a single record for appending. Returns a RecordSubmitFuture that resolves to a RecordSubmitTicket once the record has been accepted. Blocks if the underlying AppendSession is at capacity.

type RangeNotSatisfiableError added in v0.11.0

type RangeNotSatisfiableError struct {
	*S2Error
	// The current tail position of the stream, if available.
	Tail *StreamPosition
}

func (*RangeNotSatisfiableError) Unwrap added in v0.11.0

func (e *RangeNotSatisfiableError) Unwrap() error

type ReadBatch added in v0.11.0

type ReadBatch struct {
	// Records that are durably sequenced on the stream, retrieved based on the requested criteria.
	// This can only be empty in response to a unary read (i.e. not SSE),
	// if the request cannot be satisfied without violating an explicit bound (`count`, `bytes`, or `until`).
	Records []SequencedRecord `json:"records"`
	// Sequence number that will be assigned to the next record on the stream, and timestamp of the last record.
	// This will only be present when reading recent records.
	Tail *StreamPosition `json:"tail,omitempty"`
}

type ReadOptions added in v0.11.0

type ReadOptions struct {
	// Start from a sequence number.
	SeqNum *uint64 `json:"seq_num,omitempty"`
	// Start from a timestamp.
	Timestamp *uint64 `json:"timestamp,omitempty"`
	// Start from number of records before the next sequence number.
	TailOffset *int64 `json:"tail_offset,omitempty"`
	// Record count limit.
	// Non-streaming reads are capped by the default limit of 1000 records.
	Count *uint64 `json:"count,omitempty"`
	// Metered bytes limit.
	// Non-streaming reads are capped by the default limit of 1 MiB.
	Bytes *uint64 `json:"bytes,omitempty"`
	// Duration in seconds to wait for new records.
	// The default duration is 0 if there is a bound on `count`, `bytes`, or `until`, and otherwise infinite.
	// Non-streaming reads are always bounded on `count` and `bytes`, so you can achieve long poll semantics by specifying a non-zero duration up to 60 seconds.
	// In the context of an SSE or S2S streaming read, the duration will bound how much time can elapse between records throughout the lifetime of the session.
	Wait *int32 `json:"wait,omitempty"`
	// Exclusive timestamp to read until.
	Until *uint64 `json:"until,omitempty"`
	// Start reading from the tail if the requested position is beyond it.
	// Otherwise, a `416 Range Not Satisfiable` response is returned.
	Clamp *bool `json:"clamp,omitempty"`
	// Whether to filter out command records from read results.
	// Filtering is performed client-side.
	// Defaults to false.
	IgnoreCommandRecords bool `json:"-"`
}

type ReadSession added in v0.11.0

type ReadSession struct {
	// contains filtered or unexported fields
}

func (*ReadSession) Close added in v0.11.0

func (s *ReadSession) Close() error

Stops the session.

func (*ReadSession) Err added in v0.11.0

func (s *ReadSession) Err() error

Reports the terminal error (if any) that caused Next to stop.

func (*ReadSession) LastObservedTail added in v0.11.0

func (s *ReadSession) LastObservedTail() *StreamPosition

Returns the last observed tail position, if known.

func (*ReadSession) Next added in v0.11.0

func (s *ReadSession) Next() bool

Blocks until a record is available, the context is done, or the session encounters a fatal error. When it returns true, Record() yields the fetched record. When it returns false, call Err().

func (*ReadSession) NextReadPosition added in v0.11.0

func (s *ReadSession) NextReadPosition() *StreamPosition

Returns the next read position, if known.

func (*ReadSession) Record added in v0.11.0

func (s *ReadSession) Record() SequencedRecord

Returns the most recent record fetched by Next.

type ReadWritePermissions added in v0.8.0

type ReadWritePermissions struct {
	// Read permission.
	Read bool `json:"read,omitempty"`
	// Write permission.
	Write bool `json:"write,omitempty"`
}

type ReconfigureBasinArgs added in v0.11.0

type ReconfigureBasinArgs struct {
	// Basin name.
	Basin BasinName
	// Basin reconfiguration.
	Config BasinReconfiguration
}

type ReconfigureStreamArgs added in v0.11.0

type ReconfigureStreamArgs struct {
	// Stream name.
	Stream StreamName
	// Stream reconfiguration.
	Config StreamReconfiguration
}

type RecordSubmitFuture added in v0.11.0

type RecordSubmitFuture struct {
	// contains filtered or unexported fields
}

Represents a pending single-record submission to a Producer.

func (*RecordSubmitFuture) Wait added in v0.11.0

Blocks until the record is accepted and returns a RecordSubmitTicket.

type RecordSubmitTicket added in v0.11.0

type RecordSubmitTicket struct {
	// contains filtered or unexported fields
}

Returned after a record is accepted by the Producer. Use RecordSubmitTicket.Ack to wait for the record to become durable.

func (*RecordSubmitTicket) Ack added in v0.11.0

Blocks until the record is durable and returns the IndexedAppendAck.

type ResourceSet added in v0.8.0

type ResourceSet struct {
	// Match only the resource with this exact name.
	// Use an empty string to match no resources.
	Exact *string `json:"exact,omitempty"`
	// Match all resources that start with this prefix.
	// Use an empty string to match all resource.
	Prefix *string `json:"prefix,omitempty"`
}

type RetentionPolicy added in v0.8.0

type RetentionPolicy struct {
	// Age limits records to a specific age window (seconds).
	Age *int64 `json:"age,omitempty"`
	// Retain records unless explicitly trimmed.
	Infinite *InfiniteRetention `json:"infinite,omitempty"`
}

type RetryConfig added in v0.11.0

type RetryConfig struct {
	// Total number of attempts, including the initial try. Must be >= 1. A value of 1 means no retries.
	MaxAttempts int
	// Minimum base delay for exponential backoff. Defaults to 100ms.
	MinBaseDelay time.Duration
	// Maximum base delay for exponential backoff. Defaults to 1s.
	// Note: actual delay with jitter can be up to 2x this value.
	MaxBaseDelay time.Duration
	// Policy for retrying append operations.
	AppendRetryPolicy AppendRetryPolicy
}

Retry configuration.

type RevokeAccessTokenArgs added in v0.11.0

type RevokeAccessTokenArgs struct {
	// Access token ID.
	ID AccessTokenID `json:"id"`
}

type S2Error added in v0.11.0

type S2Error struct {
	Message string
	Code    string
	Status  int
	Origin  string // "server", "sdk", "network"
}

func (*S2Error) Error added in v0.11.0

func (e *S2Error) Error() string

func (*S2Error) HasNoSideEffects added in v0.13.3

func (e *S2Error) HasNoSideEffects() bool

HasNoSideEffects reports whether this error guarantees no mutation occurred.

func (*S2Error) IsNetworkError added in v0.11.0

func (e *S2Error) IsNetworkError() bool

func (*S2Error) IsRetryable added in v0.11.0

func (e *S2Error) IsRetryable() bool

type ScalarMetric added in v0.11.0

type ScalarMetric struct {
	// Metric name.
	Name string `json:"name"`
	// Unit of the metric.
	Unit MetricUnit `json:"unit"`
	// Metric value.
	Value float64 `json:"value"`
}

type SeqNumMismatchError added in v0.11.0

type SeqNumMismatchError struct {
	*S2Error
	ExpectedSeqNum uint64
}

func (*SeqNumMismatchError) Unwrap added in v0.11.0

func (e *SeqNumMismatchError) Unwrap() error

type SequencedRecord

type SequencedRecord struct {
	// Body of the record.
	Body []byte
	// Series of name-value pairs for this record.
	Headers []Header
	// Sequence number assigned by the service.
	SeqNum uint64
	// Timestamp for this record.
	Timestamp uint64
}

func (SequencedRecord) IsCommandRecord added in v0.13.0

func (r SequencedRecord) IsCommandRecord() bool

Reports whether this record is a command record. Command records have exactly one header with an empty name.

type StorageClass

type StorageClass string
const (
	StorageClassStandard StorageClass = "standard"
	StorageClassExpress  StorageClass = "express"
)

type StreamClient

type StreamClient struct {
	// contains filtered or unexported fields
}

func (*StreamClient) Append

func (s *StreamClient) Append(ctx context.Context, input *AppendInput) (*AppendAck, error)

Appends one or more records to the stream. All records in a single append call must use the same format (either all string or all bytes). For high-throughput sequential appends, use AppendSession instead.

Example
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/s2-streamstore/s2-sdk-go/s2"
)

func main() {
	client := s2.New("your-access-token", nil)
	stream := client.Basin("my-basin").Stream("my-stream")
	ctx := context.Background()

	ack, err := stream.Append(ctx, &s2.AppendInput{
		Records: []s2.AppendRecord{
			{Body: []byte("hello world")},
		},
	})
	if err != nil {
		log.Fatalf("append: %v", err)
	}

	fmt.Printf("appended at seq=%d\n", ack.Start.SeqNum)
}

func (*StreamClient) AppendSession

func (s *StreamClient) AppendSession(ctx context.Context, opts *AppendSessionOptions) (*AppendSession, error)

Creates an append session that guarantees ordering of submissions. Use this to coordinate high-throughput, sequential appends with backpressure.

Example
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/s2-streamstore/s2-sdk-go/s2"
)

func main() {
	client := s2.New("your-access-token", nil)
	stream := client.Basin("my-basin").Stream("my-stream")

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	session, err := stream.AppendSession(ctx, nil)
	if err != nil {
		log.Fatalf("open append session: %v", err)
	}
	defer session.Close()

	fut, err := session.Submit(&s2.AppendInput{
		Records: []s2.AppendRecord{
			{Body: []byte("hello")},
			{Body: []byte("world")},
		},
	})
	if err != nil {
		log.Fatalf("submit: %v", err)
	}

	ticket, err := fut.Wait(ctx)
	if err != nil {
		log.Fatalf("enqueue: %v", err)
	}

	ack, err := ticket.Ack(ctx)
	if err != nil {
		log.Fatalf("get ack: %v", err)
	}

	fmt.Printf("appended records %d to %d\n", ack.Start.SeqNum, ack.End.SeqNum)
}

func (*StreamClient) CheckTail

func (s *StreamClient) CheckTail(ctx context.Context) (*TailResponse, error)

Check the tail of the stream. Returns the next sequence number and timestamp to be assigned (tail).

Example
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/s2-streamstore/s2-sdk-go/s2"
)

func main() {
	client := s2.New("your-access-token", nil)
	stream := client.Basin("my-basin").Stream("my-stream")
	ctx := context.Background()

	tail, err := stream.CheckTail(ctx)
	if err != nil {
		log.Fatalf("check tail: %v", err)
	}

	fmt.Printf("tail: seq=%d ts=%d\n", tail.Tail.SeqNum, tail.Tail.Timestamp)
}

func (*StreamClient) Name added in v0.11.0

func (s *StreamClient) Name() StreamName

func (*StreamClient) Read

func (s *StreamClient) Read(ctx context.Context, opts *ReadOptions) (*ReadBatch, error)
Example
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/s2-streamstore/s2-sdk-go/s2"
)

func main() {
	client := s2.New("your-access-token", nil)
	stream := client.Basin("my-basin").Stream("my-stream")
	ctx := context.Background()

	batch, err := stream.Read(ctx, &s2.ReadOptions{
		SeqNum: s2.Uint64(0),
		Count:  s2.Uint64(100),
	})
	if err != nil {
		log.Fatalf("read: %v", err)
	}

	for _, rec := range batch.Records {
		fmt.Printf("seq=%d body=%q\n", rec.SeqNum, string(rec.Body))
	}
}

func (*StreamClient) ReadSession

func (s *StreamClient) ReadSession(ctx context.Context, opts *ReadOptions) (*ReadSession, error)

Opens a streaming read session. Call Close when done consuming records.

Example
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/s2-streamstore/s2-sdk-go/s2"
)

func main() {
	client := s2.New("your-access-token", nil)
	stream := client.Basin("my-basin").Stream("my-stream")

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	session, err := stream.ReadSession(ctx, &s2.ReadOptions{
		SeqNum: s2.Uint64(0), // Start from beginning
		Wait:   s2.Int32(10), // Wait up to 10s for new records
	})
	if err != nil {
		log.Fatalf("open read session: %v", err)
	}
	defer session.Close()

	for session.Next() {
		rec := session.Record()
		fmt.Printf("seq=%d body=%q\n", rec.SeqNum, string(rec.Body))
	}
	if err := session.Err(); err != nil {
		log.Fatalf("read session error: %v", err)
	}

	if next := session.NextReadPosition(); next != nil {
		fmt.Printf("resume from seq=%d\n", next.SeqNum)
	}
}

type StreamConfig

type StreamConfig struct {
	// Delete-on-empty configuration.
	DeleteOnEmpty *DeleteOnEmptyConfig `json:"delete_on_empty,omitempty"`
	// Retention policy for the stream.
	// If unspecified, the default is to retain records for 7 days.
	RetentionPolicy *RetentionPolicy `json:"retention_policy,omitempty"`
	// Storage class for recent writes.
	StorageClass *StorageClass `json:"storage_class,omitempty"`
	// Timestamping behavior.
	Timestamping *TimestampingConfig `json:"timestamping,omitempty"`
}

type StreamInfo

type StreamInfo struct {
	Name StreamName `json:"name"`
	// Creation time.
	CreatedAt time.Time `json:"created_at"`
	// Deletion time, if the stream is being deleted.
	DeletedAt *time.Time `json:"deleted_at,omitempty"`
}

type StreamMetricSet added in v0.11.0

type StreamMetricSet string
const (
	StreamMetricSetStorage StreamMetricSet = "storage"
)

type StreamMetricsArgs added in v0.11.0

type StreamMetricsArgs struct {
	// Basin name.
	Basin string `json:"basin"`
	// Stream name.
	Stream string `json:"stream"`
	// Metric set to return.
	Set StreamMetricSet `json:"set"`
	// Start timestamp as Unix epoch seconds, if applicable for the metric set.
	Start *int64 `json:"start,omitempty"`
	// End timestamp as Unix epoch seconds, if applicable for metric set.
	End *int64 `json:"end,omitempty"`
	// Interval to aggregate over for timeseries metric sets.
	Interval *TimeseriesInterval `json:"interval,omitempty"`
}

type StreamName added in v0.11.0

type StreamName string

Stream name that is unique to the basin. It can be between 1 and 512 bytes in length.

type StreamPosition added in v0.11.0

type StreamPosition struct {
	// Sequence number assigned by the service.
	SeqNum uint64 `json:"seq_num"`
	// Timestamp, which may be client-specified or assigned by the service.
	// If it is assigned by the service, it will represent milliseconds since Unix epoch.
	Timestamp uint64 `json:"timestamp"`
}

Position of a record in a stream.

type StreamReconfiguration added in v0.11.0

type StreamReconfiguration struct {
	// Delete-on-empty configuration.
	DeleteOnEmpty *DeleteOnEmptyReconfiguration `json:"delete_on_empty,omitempty"`
	// Retention policy for the stream.
	// If unspecified, the default is to retain records for 7 days.
	RetentionPolicy *RetentionPolicy `json:"retention_policy,omitempty"`
	// Storage class for recent writes.
	StorageClass *StorageClass `json:"storage_class,omitempty"`
	// Timestamping behavior.
	Timestamping *TimestampingReconfiguration `json:"timestamping,omitempty"`
}

type StreamsClient added in v0.11.0

type StreamsClient struct {
	// contains filtered or unexported fields
}

func (*StreamsClient) Create added in v0.11.0

func (s *StreamsClient) Create(ctx context.Context, args CreateStreamArgs) (*StreamInfo, error)

Create a stream.

Example
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/s2-streamstore/s2-sdk-go/s2"
)

func main() {
	client := s2.New("your-access-token", nil)
	basin := client.Basin("my-basin")
	ctx := context.Background()

	info, err := basin.Streams.Create(ctx, s2.CreateStreamArgs{
		Stream: "my-stream",
		Config: &s2.StreamConfig{
			StorageClass: s2.Ptr(s2.StorageClassExpress),
			RetentionPolicy: &s2.RetentionPolicy{
				Age: s2.Ptr(int64(86400 * 7)), // 7 days
			},
		},
	})
	if err != nil {
		log.Fatalf("create stream: %v", err)
	}

	fmt.Printf("created stream: %s\n", info.Name)
}

func (*StreamsClient) Delete added in v0.11.0

func (s *StreamsClient) Delete(ctx context.Context, streamName StreamName) error

Delete a stream.

Example
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/s2-streamstore/s2-sdk-go/s2"
)

func main() {
	client := s2.New("your-access-token", nil)
	basin := client.Basin("my-basin")
	ctx := context.Background()

	if err := basin.Streams.Delete(ctx, "my-stream"); err != nil {
		log.Fatalf("delete stream: %v", err)
	}

	fmt.Println("stream deleted")
}

func (*StreamsClient) GetConfig added in v0.11.0

func (s *StreamsClient) GetConfig(ctx context.Context, streamName StreamName) (*StreamConfig, error)

Get stream configuration.

Example
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/s2-streamstore/s2-sdk-go/s2"
)

func main() {
	client := s2.New("your-access-token", nil)
	basin := client.Basin("my-basin")
	ctx := context.Background()

	config, err := basin.Streams.GetConfig(ctx, "my-stream")
	if err != nil {
		log.Fatalf("get stream config: %v", err)
	}

	fmt.Printf("stream config: %+v\n", config)
}

func (*StreamsClient) Iter added in v0.11.0

Iterate over streams in a basin. By default, streams that are being deleted are excluded. Set IncludeDeleted to true to include them.

Example
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/s2-streamstore/s2-sdk-go/s2"
)

func main() {
	client := s2.New("your-access-token", nil)
	basin := client.Basin("my-basin")
	ctx := context.Background()

	iter := basin.Streams.Iter(ctx, &s2.ListStreamsArgs{
		Prefix: "events-",
	})
	for iter.Next() {
		stream := iter.Value()
		fmt.Printf("stream: %s (created=%s)\n", stream.Name, stream.CreatedAt)
	}
	if err := iter.Err(); err != nil {
		log.Fatalf("list streams: %v", err)
	}
}

func (*StreamsClient) List added in v0.11.0

List streams.

Example
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/s2-streamstore/s2-sdk-go/s2"
)

func main() {
	client := s2.New("your-access-token", nil)
	basin := client.Basin("my-basin")
	ctx := context.Background()

	resp, err := basin.Streams.List(ctx, &s2.ListStreamsArgs{
		Prefix: "events-",
		Limit:  s2.Ptr(10),
	})
	if err != nil {
		log.Fatalf("list streams: %v", err)
	}

	for _, stream := range resp.Streams {
		fmt.Printf("stream: %s (created=%s)\n", stream.Name, stream.CreatedAt)
	}
	fmt.Printf("has more: %v\n", resp.HasMore)
}

func (*StreamsClient) Reconfigure added in v0.11.0

func (s *StreamsClient) Reconfigure(ctx context.Context, args ReconfigureStreamArgs) (*StreamConfig, error)

Reconfigure a stream.

Example
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/s2-streamstore/s2-sdk-go/s2"
)

func main() {
	client := s2.New("your-access-token", nil)
	basin := client.Basin("my-basin")
	ctx := context.Background()

	config, err := basin.Streams.Reconfigure(ctx, s2.ReconfigureStreamArgs{
		Stream: "my-stream",
		Config: s2.StreamReconfiguration{
			RetentionPolicy: &s2.RetentionPolicy{
				Age: s2.Ptr(int64(86400 * 30)),
			},
		},
	})
	if err != nil {
		log.Fatalf("reconfigure stream: %v", err)
	}

	fmt.Printf("stream config: %+v\n", config)
}

type StreamsIterator added in v0.11.0

type StreamsIterator struct {
	// contains filtered or unexported fields
}

Iterates over streams returned by StreamsClient.Iter. Use Next to advance, Value to get the current item, and Err to check for errors.

func (*StreamsIterator) Err added in v0.11.0

func (it *StreamsIterator) Err() error

Returns any error encountered during iteration. Should be called after Next returns false to check if iteration stopped due to an error.

func (*StreamsIterator) Next added in v0.11.0

func (it *StreamsIterator) Next() bool

Advances the iterator to the next stream. Returns true if there is a next item, false when iteration is complete or an error occurred. Call Err after iteration to check for errors.

func (*StreamsIterator) Value added in v0.11.0

func (it *StreamsIterator) Value() StreamInfo

Returns the current stream. Only valid after a successful call to Next.

type SubmitFuture added in v0.11.0

type SubmitFuture struct {
	// contains filtered or unexported fields
}

Represents a pending batch submission to an AppendSession. Call SubmitFuture.Wait to block until the batch is accepted.

func (*SubmitFuture) Wait added in v0.11.0

Blocks until the batch is accepted by the AppendSession and returns a BatchSubmitTicket that can be used to wait for the append acknowledgment.

type TailResponse added in v0.11.0

type TailResponse struct {
	// Sequence number that will be assigned to the next record on the stream, and timestamp of the last record.
	Tail StreamPosition `json:"tail"`
}

type TimeseriesInterval added in v0.11.0

type TimeseriesInterval string
const (
	TimeseriesIntervalMinute TimeseriesInterval = "minute"
	TimeseriesIntervalHour   TimeseriesInterval = "hour"
	TimeseriesIntervalDay    TimeseriesInterval = "day"
)

type TimestampingConfig added in v0.11.0

type TimestampingConfig struct {
	// Timestamping mode for appends that influences how timestamps are handled.
	Mode *TimestampingMode `json:"mode,omitempty"`
	// Allow client-specified timestamps to exceed the arrival time.
	// If this is false or not set, client timestamps will be capped at the arrival time.
	Uncapped *bool `json:"uncapped,omitempty"`
}

Timestamping behavior.

type TimestampingMode added in v0.8.0

type TimestampingMode string
const (
	TimestampingModeClientPrefer  TimestampingMode = "client-prefer"
	TimestampingModeClientRequire TimestampingMode = "client-require"
	TimestampingModeArrival       TimestampingMode = "arrival"
)

type TimestampingReconfiguration added in v0.11.0

type TimestampingReconfiguration struct {
	// Timestamping mode for appends that influences how timestamps are handled.
	Mode *TimestampingMode `json:"mode,omitempty"`
	// Allow client-specified timestamps to exceed the arrival time.
	Uncapped *bool `json:"uncapped,omitempty"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL