Documentation
¶
Overview ¶
Go SDK for S2.
The Go SDK provides ergonomic wrappers and utilities to interact with the S2 API.
Installation ¶
go get github.com/s2-streamstore/s2-sdk-go/s2@latest
Authentication ¶
Generate an authentication token by logging onto the web console at s2.dev.
Quick Start ¶
package main
import (
"context"
"fmt"
"log"
"github.com/s2-streamstore/s2-sdk-go/s2"
)
func main() {
ctx := context.Background()
client := s2.New("your-access-token", nil)
stream := client.Basin("my-basin").Stream("my-stream")
ack, err := stream.Append(ctx, &s2.AppendInput{
Records: []s2.AppendRecord{
{Body: []byte("first record")},
{Body: []byte("second record")},
},
})
if err != nil {
log.Fatal(err)
}
fmt.Printf("wrote records at seq %d\n", ack.Start.SeqNum)
session, err := stream.ReadSession(ctx, &s2.ReadOptions{
SeqNum: s2.Uint64(0),
})
if err != nil {
log.Fatal(err)
}
defer session.Close()
for session.Next(ctx) {
rec := session.Record()
fmt.Printf("[%d] %s\n", rec.SeqNum, string(rec.Body))
}
if err := session.Err(); err != nil {
log.Fatal(err)
}
}
How the SDK is Organized ¶
The SDK has three levels of clients:
- Client — managing basins, access tokens, metrics
- BasinClient — managing streams within a basin
- StreamClient — reading and writing records
The clients can be accessed like so:
client := s2.New("token", nil) // account level
basin := client.Basin("my-basin") // basin level
stream := basin.Stream("my-stream") // stream level
Reading Records ¶
Read session
session, err := stream.ReadSession(ctx, &s2.ReadOptions{
SeqNum: s2.Uint64(0), // start from the beginning
Wait: s2.Int32(30), // wait up to 30s for new records
})
if err != nil {
log.Fatal(err)
}
defer session.Close()
for session.Next(ctx) {
rec := session.Record()
// process rec
}
if err := session.Err(); err != nil {
log.Fatal(err)
}
// resume from where you left off
if pos := session.NextReadPosition(); pos != nil {
fmt.Printf("next time, start from seq %d\n", pos.SeqNum)
}
Unary Read
batch, err := stream.Read(ctx, &s2.ReadOptions{
SeqNum: s2.Uint64(0),
Count: s2.Uint64(100),
})
if err != nil {
log.Fatal(err)
}
for _, rec := range batch.Records {
fmt.Printf("[%d] %s\n", rec.SeqNum, rec.Body)
}
Writing Records ¶
Append Session (pipelining)
session, err := stream.AppendSession(ctx, nil)
if err != nil {
log.Fatal(err)
}
defer session.Close()
future, err := session.Submit(&s2.AppendInput{
Records: []s2.AppendRecord{{Body: []byte("hello")}},
})
if err != nil {
log.Fatal(err)
}
ticket, err := future.Wait(ctx)
if err != nil {
log.Fatal(err)
}
ack, err := ticket.Ack(ctx)
if err != nil {
log.Fatal(err)
}
fmt.Printf("written at seq %d\n", ack.Start.SeqNum)
Unary Append
ack, err := stream.Append(ctx, &s2.AppendInput{
Records: []s2.AppendRecord{
{Body: []byte("hello")},
{Body: []byte("world")},
},
})
if err != nil {
log.Fatal(err)
}
fmt.Printf("wrote %d records starting at seq %d\n",
ack.End.SeqNum-ack.Start.SeqNum, ack.Start.SeqNum)
Batching with Producer ¶
Group records into batches based on the number of records collected or a linger time:
session, _ := stream.AppendSession(ctx, nil)
defer session.Close()
batcher := s2.NewBatcher(ctx, &s2.BatchingOptions{
Linger: 100 * time.Millisecond, // flush after 100ms
MaxRecords: 500, // or after 500 records
})
producer := s2.NewProducer(ctx, batcher, session)
defer producer.Close()
for i := 0; i < 1000; i++ {
future, _ := producer.Submit(s2.AppendRecord{
Body: []byte(fmt.Sprintf("record %d", i)),
})
go func(f *s2.RecordSubmitFuture) {
ticket, _ := f.Wait(ctx)
ack, err := ticket.Ack(ctx)
if err != nil {
log.Printf("ack error: %v", err)
return
}
log.Printf("seq: %d", ack.SeqNum())
}(future)
}
Managing Basins and Streams ¶
Basins
// Create a basin
info, err := client.Basins.Create(ctx, s2.CreateBasinArgs{
Basin: "my-basin",
Scope: s2.Ptr(s2.BasinScopeAwsUsEast1),
})
// List basins
iter := client.Basins.Iter(ctx, nil)
for iter.Next() {
fmt.Println(iter.Value().Name)
}
// Delete a basin
client.Basins.Delete(ctx, "my-basin")
Streams
basin := client.Basin("my-basin")
// Create a stream
info, err := basin.Streams.Create(ctx, s2.CreateStreamArgs{
Stream: "my-stream",
Config: &s2.StreamConfig{
StorageClass: s2.Ptr(s2.StorageClassExpress),
RetentionPolicy: &s2.RetentionPolicy{
Age: s2.Ptr(int64(86400 * 7)), // keep for 7 days
},
},
})
// List streams
iter := basin.Streams.Iter(ctx, nil)
for iter.Next() {
fmt.Println(iter.Value().Name)
}
// Delete a stream
basin.Streams.Delete(ctx, "my-stream")
Access Tokens ¶
You can create scoped tokens for your applications:
resp, err := client.AccessTokens.Issue(ctx, s2.IssueAccessTokenArgs{
ID: "my-service",
Scope: s2.AccessTokenScope{
Basins: &s2.ResourceSet{Prefix: s2.Ptr("prod-")},
OpGroups: &s2.PermittedOperationGroups{
Stream: &s2.ReadWritePermissions{Read: true, Write: true},
},
},
})
fmt.Printf("token: %s\n", resp.AccessToken)
// Revoke
client.AccessTokens.Revoke(ctx, s2.RevokeAccessTokenArgs{ID: "my-service"})
Error Handling ¶
Errors returned by the SDK are typically *S2Error, which includes the error message, the status code, and whether the error is retryable:
ack, err := stream.Append(ctx, input)
if err != nil {
var s2Err *s2.S2Error
if errors.As(err, &s2Err) {
fmt.Printf("S2 error: %s (status %d, retryable: %v)\n",
s2Err.Message, s2Err.Status, s2Err.IsRetryable())
}
}
Index ¶
- Constants
- Variables
- func Bool(v bool) *bool
- func Int32(v int32) *int32
- func Int64(v int64) *int64
- func MeteredBatchBytes(records []AppendRecord) int64
- func MeteredPayloadBytes(record AppendRecord) uint64
- func MeteredSequencedRecordBytes(record SequencedRecord) uint64
- func Ptr[T any](v T) *T
- func String(v string) *string
- func Uint32(v uint32) *uint32
- func Uint64(v uint64) *uint64
- type AccessTokenID
- type AccessTokenInfo
- type AccessTokenIterator
- type AccessTokenScope
- type AccessTokensClient
- func (a *AccessTokensClient) Issue(ctx context.Context, args IssueAccessTokenArgs) (*IssueAccessTokenResponse, error)
- func (a *AccessTokensClient) Iter(ctx context.Context, args *ListAccessTokensArgs) *AccessTokenIterator
- func (a *AccessTokensClient) List(ctx context.Context, args *ListAccessTokensArgs) (*ListAccessTokensResponse, error)
- func (a *AccessTokensClient) Revoke(ctx context.Context, args RevokeAccessTokenArgs) error
- type AccountMetricSet
- type AccountMetricsArgs
- type AccumulationMetric
- type AppendAck
- type AppendInput
- type AppendRecord
- type AppendRetryPolicy
- type AppendSession
- type AppendSessionOptions
- type BasinClient
- type BasinConfig
- type BasinInfo
- type BasinMetricSet
- type BasinMetricsArgs
- type BasinName
- type BasinReconfiguration
- type BasinScope
- type BasinsClient
- func (b *BasinsClient) Create(ctx context.Context, args CreateBasinArgs) (*BasinInfo, error)
- func (b *BasinsClient) Delete(ctx context.Context, basinName BasinName) error
- func (b *BasinsClient) GetConfig(ctx context.Context, basinName BasinName) (*BasinConfig, error)
- func (b *BasinsClient) Iter(ctx context.Context, args *ListBasinsArgs) *BasinsIterator
- func (b *BasinsClient) List(ctx context.Context, args *ListBasinsArgs) (*ListBasinsResponse, error)
- func (b *BasinsClient) Reconfigure(ctx context.Context, args ReconfigureBasinArgs) (*BasinConfig, error)
- type BasinsIterator
- type BatchOutput
- type BatchSubmitTicket
- type Batcher
- type BatchingOptions
- type Client
- type ClientOptions
- type CompressionType
- type Config
- type CreateBasinArgs
- type CreateStreamArgs
- type DeleteOnEmptyConfig
- type DeleteOnEmptyReconfiguration
- type ErrorInfo
- type FencingTokenMismatchError
- type GaugeMetric
- type Header
- type IndexedAppendAck
- type InfiniteRetention
- type IssueAccessTokenArgs
- type IssueAccessTokenResponse
- type LabelMetric
- type ListAccessTokensArgs
- type ListAccessTokensResponse
- type ListBasinsArgs
- type ListBasinsResponse
- type ListStreamsArgs
- type ListStreamsResponse
- type Metric
- type MetricSample
- type MetricSetResponse
- type MetricUnit
- type MetricsClient
- func (m *MetricsClient) Account(ctx context.Context, args *AccountMetricsArgs) (*MetricSetResponse, error)
- func (m *MetricsClient) Basin(ctx context.Context, args *BasinMetricsArgs) (*MetricSetResponse, error)
- func (m *MetricsClient) Stream(ctx context.Context, args *StreamMetricsArgs) (*MetricSetResponse, error)
- type PermittedOperationGroups
- type Producer
- type RangeNotSatisfiableError
- type ReadBatch
- type ReadOptions
- type ReadSession
- type ReadWritePermissions
- type ReconfigureBasinArgs
- type ReconfigureStreamArgs
- type RecordSubmitFuture
- type RecordSubmitTicket
- type ResourceSet
- type RetentionPolicy
- type RetryConfig
- type RevokeAccessTokenArgs
- type S2Error
- type ScalarMetric
- type SeqNumMismatchError
- type SequencedRecord
- type StorageClass
- type StreamClient
- func (s *StreamClient) Append(ctx context.Context, input *AppendInput) (*AppendAck, error)
- func (s *StreamClient) AppendSession(ctx context.Context, opts *AppendSessionOptions) (*AppendSession, error)
- func (s *StreamClient) CheckTail(ctx context.Context) (*TailResponse, error)
- func (s *StreamClient) Name() StreamName
- func (s *StreamClient) Read(ctx context.Context, opts *ReadOptions) (*ReadBatch, error)
- func (s *StreamClient) ReadSession(ctx context.Context, opts *ReadOptions) (*ReadSession, error)
- type StreamConfig
- type StreamInfo
- type StreamMetricSet
- type StreamMetricsArgs
- type StreamName
- type StreamPosition
- type StreamReconfiguration
- type StreamsClient
- func (s *StreamsClient) Create(ctx context.Context, args CreateStreamArgs) (*StreamInfo, error)
- func (s *StreamsClient) Delete(ctx context.Context, streamName StreamName) error
- func (s *StreamsClient) GetConfig(ctx context.Context, streamName StreamName) (*StreamConfig, error)
- func (s *StreamsClient) Iter(ctx context.Context, args *ListStreamsArgs) *StreamsIterator
- func (s *StreamsClient) List(ctx context.Context, args *ListStreamsArgs) (*ListStreamsResponse, error)
- func (s *StreamsClient) Reconfigure(ctx context.Context, args ReconfigureStreamArgs) (*StreamConfig, error)
- type StreamsIterator
- type SubmitFuture
- type TailResponse
- type TimeseriesInterval
- type TimestampingConfig
- type TimestampingMode
- type TimestampingReconfiguration
Examples ¶
- AccessTokensClient.Issue
- AccessTokensClient.Iter
- AccessTokensClient.List
- AccessTokensClient.Revoke
- BasinsClient.Create
- BasinsClient.Delete
- BasinsClient.GetConfig
- BasinsClient.Iter
- BasinsClient.List
- BasinsClient.Reconfigure
- BatchSubmitTicket.Ack
- MetricsClient.Account
- MetricsClient.Basin
- MetricsClient.Stream
- NewFenceCommandRecord
- NewTrimCommandRecord
- Producer
- StreamClient.Append
- StreamClient.AppendSession
- StreamClient.CheckTail
- StreamClient.Read
- StreamClient.ReadSession
- StreamsClient.Create
- StreamsClient.Delete
- StreamsClient.GetConfig
- StreamsClient.Iter
- StreamsClient.List
- StreamsClient.Reconfigure
Constants ¶
const ( // Maximum number of records allowed in a single append batch. MaxBatchRecords = 1000 // Maximum metered size in bytes for a single append batch (1 MiB). MaxBatchMeteredBytes = 1 * 1024 * 1024 // 1 MiB // Maximum fencing token length in bytes. MaxFencingTokenLength = 36 )
const ( OperationListBasins = "list-basins" OperationCreateBasin = "create-basin" OperationDeleteBasin = "delete-basin" OperationReconfigureBasin = "reconfigure-basin" OperationGetBasinConfig = "get-basin-config" OperationIssueAccessToken = "issue-access-token" OperationRevokeAccessToken = "revoke-access-token" OperationListAccessTokens = "list-access-tokens" OperationListStreams = "list-streams" OperationCreateStream = "create-stream" OperationDeleteStream = "delete-stream" OperationGetStreamConfig = "get-stream-config" OperationReconfigureStream = "reconfigure-stream" OperationCheckTail = "check-tail" OperationAppend = "append" OperationRead = "read" OperationTrim = "trim" OperationFence = "fence" OperationAccountMetrics = "account-metrics" OperationBasinMetrics = "basin-metrics" OperationStreamMetrics = "stream-metrics" )
const (
DefaultBaseURL = "https://aws.s2.dev/v1"
)
Variables ¶
var ( // ErrSessionClosed is returned when an operation is attempted on a closed session. ErrSessionClosed = errors.New("session closed") // ErrTimeout is returned when an operation times out. ErrTimeout = errors.New("operation timed out") // ErrMaxAttemptsExhausted is returned when all retry attempts have been exhausted. ErrMaxAttemptsExhausted = errors.New("max attempts exhausted") )
Sentinel errors for SDK-originated errors. Use errors.Is() to check for these conditions.
var DefaultRetryConfig = &RetryConfig{ MaxAttempts: defaultMaxAttempts, MinBaseDelay: defaultMinBaseDelay, MaxBaseDelay: defaultMaxBaseDelay, AppendRetryPolicy: AppendRetryPolicyAll, }
Default retry configuration. It retries up to 3 times with exponential backoff between 100ms and 1s.
Functions ¶
func MeteredBatchBytes ¶ added in v0.11.0
func MeteredBatchBytes(records []AppendRecord) int64
Calculates the total metered size in bytes of a batch of append records.
func MeteredPayloadBytes ¶ added in v0.11.0
func MeteredPayloadBytes(record AppendRecord) uint64
Calculates the metered size in bytes of an append record.
func MeteredSequencedRecordBytes ¶ added in v0.11.0
func MeteredSequencedRecordBytes(record SequencedRecord) uint64
Calculates the metered size in bytes of a sequenced record.
Types ¶
type AccessTokenInfo ¶ added in v0.8.0
type AccessTokenInfo struct {
// Access token ID.
// It must be unique to the account and between 1 and 96 bytes in length.
ID AccessTokenID `json:"id"`
// Access token scope.
Scope AccessTokenScope `json:"scope"`
// Namespace streams based on the configured stream-level scope, which must be a prefix.
// Stream name arguments will be automatically prefixed, and the prefix will be stripped when listing streams.
AutoPrefixStreams bool `json:"auto_prefix_streams,omitempty"`
// Expiration time.
// If not set, the expiration will be set to that of the requestor's token.
ExpiresAt *time.Time `json:"expires_at,omitempty"`
}
type AccessTokenIterator ¶ added in v0.11.0
type AccessTokenIterator struct {
// contains filtered or unexported fields
}
Iterator over access tokens returned by AccessTokensClient.Iter. Use Next to advance, Value to get the current item, and Err to check for errors.
func (*AccessTokenIterator) Err ¶ added in v0.11.0
func (it *AccessTokenIterator) Err() error
Returns any error encountered during iteration. Should be called after Next returns false to check if iteration stopped due to an error.
func (*AccessTokenIterator) Next ¶ added in v0.11.0
func (it *AccessTokenIterator) Next() bool
Advances the iterator to the next access token. Returns true if there is a next item, false when iteration is complete or an error occurred. Call Err after iteration to check for errors.
func (*AccessTokenIterator) Value ¶ added in v0.11.0
func (it *AccessTokenIterator) Value() AccessTokenInfo
Returns the current access token. Only valid after a successful call to Next.
type AccessTokenScope ¶ added in v0.8.0
type AccessTokenScope struct {
// Token IDs allowed.
AccessTokens *ResourceSet `json:"access_tokens,omitempty"`
// Basin names allowed.
Basins *ResourceSet `json:"basins,omitempty"`
// Access permissions at operation group level.
OpGroups *PermittedOperationGroups `json:"op_groups,omitempty"`
// Operations allowed for the token.
// A union of allowed operations and groups is used as an effective set of allowed operations.
Ops []string `json:"ops,omitempty"`
// Stream names allowed.
Streams *ResourceSet `json:"streams,omitempty"`
}
Access token scope.
type AccessTokensClient ¶ added in v0.11.0
type AccessTokensClient struct {
// contains filtered or unexported fields
}
func (*AccessTokensClient) Issue ¶ added in v0.11.0
func (a *AccessTokensClient) Issue(ctx context.Context, args IssueAccessTokenArgs) (*IssueAccessTokenResponse, error)
Issue an access token.
Example ¶
package main
import (
"context"
"fmt"
"log"
"github.com/s2-streamstore/s2-sdk-go/s2"
)
func main() {
client := s2.New("your-access-token", nil)
ctx := context.Background()
resp, err := client.AccessTokens.Issue(ctx, s2.IssueAccessTokenArgs{
ID: "my-service-token",
Scope: s2.AccessTokenScope{
Basins: &s2.ResourceSet{Prefix: s2.Ptr("my-")},
OpGroups: &s2.PermittedOperationGroups{
Stream: &s2.ReadWritePermissions{Read: true, Write: true},
},
},
})
if err != nil {
log.Fatalf("issue token: %v", err)
}
fmt.Printf("issued token: %s\n", resp.AccessToken)
}
Output:
func (*AccessTokensClient) Iter ¶ added in v0.11.0
func (a *AccessTokensClient) Iter(ctx context.Context, args *ListAccessTokensArgs) *AccessTokenIterator
Iterate over access tokens.
Example ¶
package main
import (
"context"
"fmt"
"log"
"github.com/s2-streamstore/s2-sdk-go/s2"
)
func main() {
client := s2.New("your-access-token", nil)
ctx := context.Background()
iter := client.AccessTokens.Iter(ctx, nil)
for iter.Next() {
token := iter.Value()
fmt.Printf("token: %s\n", token.ID)
}
if err := iter.Err(); err != nil {
log.Fatalf("list tokens: %v", err)
}
}
Output:
func (*AccessTokensClient) List ¶ added in v0.11.0
func (a *AccessTokensClient) List(ctx context.Context, args *ListAccessTokensArgs) (*ListAccessTokensResponse, error)
List access tokens.
Example ¶
package main
import (
"context"
"fmt"
"log"
"github.com/s2-streamstore/s2-sdk-go/s2"
)
func main() {
client := s2.New("your-access-token", nil)
ctx := context.Background()
resp, err := client.AccessTokens.List(ctx, &s2.ListAccessTokensArgs{
Prefix: "my-",
Limit: s2.Ptr(10),
})
if err != nil {
log.Fatalf("list access tokens: %v", err)
}
for _, token := range resp.AccessTokens {
fmt.Printf("token: %s\n", token.ID)
}
fmt.Printf("has more: %v\n", resp.HasMore)
}
Output:
func (*AccessTokensClient) Revoke ¶ added in v0.11.0
func (a *AccessTokensClient) Revoke(ctx context.Context, args RevokeAccessTokenArgs) error
Revoke an access token.
Example ¶
package main
import (
"context"
"fmt"
"log"
"github.com/s2-streamstore/s2-sdk-go/s2"
)
func main() {
client := s2.New("your-access-token", nil)
ctx := context.Background()
if err := client.AccessTokens.Revoke(ctx, s2.RevokeAccessTokenArgs{
ID: "my-service-token",
}); err != nil {
log.Fatalf("revoke token: %v", err)
}
fmt.Println("token revoked")
}
Output:
type AccountMetricSet ¶ added in v0.11.0
type AccountMetricSet string
const ( AccountMetricSetActiveBasins AccountMetricSet = "active-basins" AccountMetricSetAccountOps AccountMetricSet = "account-ops" )
type AccountMetricsArgs ¶ added in v0.11.0
type AccountMetricsArgs struct {
// Metric set to return.
Set AccountMetricSet `json:"set"`
// Start timestamp as Unix epoch seconds, if applicable for the metric set.
Start *int64 `json:"start,omitempty"`
// End timestamp as Unix epoch seconds, if applicable for the metric set.
End *int64 `json:"end,omitempty"`
// Interval to aggregate over for timeseries metric sets.
Interval *TimeseriesInterval `json:"interval,omitempty"`
}
type AccumulationMetric ¶ added in v0.11.0
type AccumulationMetric struct {
// The interval at which data points are accumulated.
Interval TimeseriesInterval `json:"interval"`
// Timeseries name.
Name string `json:"name"`
// Unit of the metric.
Unit MetricUnit `json:"unit"`
// Timeseries values.
// Each element is a tuple of a timestamp in Unix epoch seconds and a data point.
// The data point represents the accumulated value for the time period starting at the timestamp,
// spanning one interval.
Values []MetricSample `json:"values"`
}
type AppendAck ¶ added in v0.11.0
type AppendAck struct {
// Sequence number and timestamp of the first record that was appended.
Start StreamPosition `json:"start"`
// Sequence number of the last record that was appended `+ 1`, and timestamp of the last record that was appended.
// The difference between `end.seq_num` and `start.seq_num` will be the number of records appended.
End StreamPosition `json:"end"`
// Sequence number that will be assigned to the next record on the stream, and timestamp of the last record on the stream.
// This can be greater than the `end` position in case of concurrent appends.
Tail StreamPosition `json:"tail"`
}
Success response to an `append` request.
type AppendInput ¶
type AppendInput struct {
// Batch of records to append atomically, which must contain at least one record, and no more than 1000.
// The total size of a batch of records may not exceed 1 MiB of metered bytes.
Records []AppendRecord `json:"records"`
// Enforce that the sequence number assigned to the first record matches.
MatchSeqNum *uint64 `json:"match_seq_num,omitempty"`
// Enforce a fencing token, which starts out as an empty string that can be overridden by a `fence` command record.
FencingToken *string `json:"fencing_token,omitempty"`
}
Payload of an `append` request.
type AppendRecord ¶
type AppendRecord struct {
// Timestamp for this record.
// The service will always ensure monotonicity by adjusting it up if necessary to the maximum observed timestamp.
// Refer to stream timestamping configuration for the finer semantics around whether a client-specified timestamp is required,
// and whether it will be capped at the arrival time.
Timestamp *uint64 `json:"timestamp,omitempty"`
// Series of name-value pairs for this record.
Headers []Header `json:"headers,omitempty"`
// Body of the record.
Body []byte `json:"body,omitempty"`
}
Record to be appended to a stream.
func NewFenceCommandRecord ¶ added in v0.11.0
func NewFenceCommandRecord(token string, timestamp *uint64) AppendRecord
Create a new Fence command record.
Example ¶
package main
import (
"context"
"fmt"
"log"
"github.com/s2-streamstore/s2-sdk-go/s2"
)
func main() {
client := s2.New("your-access-token", nil)
stream := client.Basin("my-basin").Stream("my-stream")
ctx := context.Background()
fenceRecord := s2.NewFenceCommandRecord("my-writer-id", nil)
ack, err := stream.Append(ctx, &s2.AppendInput{
Records: []s2.AppendRecord{fenceRecord},
})
if err != nil {
log.Fatalf("fence: %v", err)
}
fmt.Printf("fence set at seq=%d\n", ack.Start.SeqNum)
}
Output:
func NewTrimCommandRecord ¶ added in v0.11.0
func NewTrimCommandRecord(seqNum uint64, timestamp *uint64) AppendRecord
Create a new Trim command record.
Example ¶
package main
import (
"context"
"fmt"
"log"
"github.com/s2-streamstore/s2-sdk-go/s2"
)
func main() {
client := s2.New("your-access-token", nil)
stream := client.Basin("my-basin").Stream("my-stream")
ctx := context.Background()
trimRecord := s2.NewTrimCommandRecord(1000, nil)
ack, err := stream.Append(ctx, &s2.AppendInput{
Records: []s2.AppendRecord{trimRecord},
})
if err != nil {
log.Fatalf("trim: %v", err)
}
fmt.Printf("trim command at seq=%d\n", ack.Start.SeqNum)
}
Output:
type AppendRetryPolicy ¶ added in v0.11.0
type AppendRetryPolicy string
Policy for retrying append operations.
const ( // Retry all append operations, including those that may have side effects (default). AppendRetryPolicyAll AppendRetryPolicy = "all" // Only retry append operations that are guaranteed to have no side effects. AppendRetryPolicyNoSideEffects AppendRetryPolicy = "noSideEffects" )
type AppendSession ¶ added in v0.11.0
type AppendSession struct {
// contains filtered or unexported fields
}
AppendSession provides ordered, pipelined appends with automatic retries.
func (*AppendSession) Close ¶ added in v0.11.0
func (r *AppendSession) Close() error
Closes the session after all inflight batches have been acknowledged. New submissions after Close is called will be rejected.
func (*AppendSession) LastAckedPosition ¶ added in v0.11.0
func (r *AppendSession) LastAckedPosition() *AppendAck
Returns the last acknowledged position, if any.
func (*AppendSession) Submit ¶ added in v0.11.0
func (r *AppendSession) Submit(input *AppendInput) (*SubmitFuture, error)
Submit an append request. Returns a SubmitFuture that resolves to a BatchSubmitTicket once the batch is enqueued (has capacity). Call Wait on the future to obtain the ticket, then call the ticket's Ack to receive the AppendAck once the batch is durable. This method applies backpressure and will block if capacity limits are reached.
type AppendSessionOptions ¶ added in v0.11.0
type AppendSessionOptions struct {
// Aggregate size of records to allow in-flight before applying backpressure (default: 5 MiB).
MaxInflightBytes uint64
// Maximum number of batches allowed in-flight before applying backpressure.
MaxInflightBatches uint32
// Retry configuration for handling transient failures.
// Applies to management operations (basins, streams, tokens) and stream operations (read, append).
RetryConfig *RetryConfig
}
type BasinClient ¶
type BasinClient struct {
Streams *StreamsClient
// contains filtered or unexported fields
}
Basin client.
func (*BasinClient) Stream ¶ added in v0.11.0
func (b *BasinClient) Stream(name StreamName) *StreamClient
type BasinConfig ¶
type BasinConfig struct {
// Create stream on append if it doesn't exist, using the default stream configuration.
// Defaults to false.
CreateStreamOnAppend *bool `json:"create_stream_on_append,omitempty"`
// Create stream on read if it doesn't exist, using the default stream configuration.
// Defaults to false.
CreateStreamOnRead *bool `json:"create_stream_on_read,omitempty"`
// Default stream configuration.
DefaultStreamConfig *StreamConfig `json:"default_stream_config,omitempty"`
}
type BasinMetricSet ¶ added in v0.11.0
type BasinMetricSet string
const ( BasinMetricSetStorage BasinMetricSet = "storage" BasinMetricSetAppendOps BasinMetricSet = "append-ops" BasinMetricSetReadOps BasinMetricSet = "read-ops" BasinMetricSetReadThroughput BasinMetricSet = "read-throughput" BasinMetricSetAppendThroughput BasinMetricSet = "append-throughput" BasinMetricSetBasinOps BasinMetricSet = "basin-ops" )
type BasinMetricsArgs ¶ added in v0.11.0
type BasinMetricsArgs struct {
// Basin name.
Basin string `json:"basin"`
// Metric set to return.
Set BasinMetricSet `json:"set"`
// Start timestamp as Unix epoch seconds, if applicable for the metric set.
Start *int64 `json:"start,omitempty"`
// End timestamp as Unix epoch seconds, if applicable for the metric set.
End *int64 `json:"end,omitempty"`
// Interval to aggregate over for timeseries metric sets.
Interval *TimeseriesInterval `json:"interval,omitempty"`
}
type BasinName ¶ added in v0.11.0
type BasinName string
Basin name which must be globally unique. It can be between 8 and 48 characters in length, and comprise lowercase letters, numbers and hyphens. It cannot begin or end with a hyphen.
type BasinReconfiguration ¶ added in v0.11.0
type BasinReconfiguration struct {
// Create a stream on append.
CreateStreamOnAppend *bool `json:"create_stream_on_append,omitempty"`
// Create a stream on read.
CreateStreamOnRead *bool `json:"create_stream_on_read,omitempty"`
// Basin configuration.
DefaultStreamConfig *StreamReconfiguration `json:"default_stream_config,omitempty"`
}
type BasinScope ¶ added in v0.7.0
type BasinScope string
Basin scope.
const (
BasinScopeAwsUsEast1 BasinScope = "aws:us-east-1"
)
type BasinsClient ¶ added in v0.11.0
type BasinsClient struct {
// contains filtered or unexported fields
}
func (*BasinsClient) Create ¶ added in v0.11.0
func (b *BasinsClient) Create(ctx context.Context, args CreateBasinArgs) (*BasinInfo, error)
Create a basin.
Example ¶
package main
import (
"context"
"fmt"
"log"
"github.com/s2-streamstore/s2-sdk-go/s2"
)
func main() {
client := s2.New("your-access-token", nil)
ctx := context.Background()
info, err := client.Basins.Create(ctx, s2.CreateBasinArgs{
Basin: "my-new-basin",
Scope: s2.Ptr(s2.BasinScopeAwsUsEast1),
Config: &s2.BasinConfig{
DefaultStreamConfig: &s2.StreamConfig{
StorageClass: s2.Ptr(s2.StorageClassStandard),
},
},
})
if err != nil {
log.Fatalf("create basin: %v", err)
}
fmt.Printf("created basin: %s (created_at=%s)\n", info.Name, info.CreatedAt)
}
Output:
func (*BasinsClient) Delete ¶ added in v0.11.0
func (b *BasinsClient) Delete(ctx context.Context, basinName BasinName) error
Delete a basin.
Example ¶
package main
import (
"context"
"fmt"
"log"
"github.com/s2-streamstore/s2-sdk-go/s2"
)
func main() {
client := s2.New("your-access-token", nil)
ctx := context.Background()
if err := client.Basins.Delete(ctx, "my-basin"); err != nil {
log.Fatalf("delete basin: %v", err)
}
fmt.Println("basin deleted")
}
Output:
func (*BasinsClient) GetConfig ¶ added in v0.11.0
func (b *BasinsClient) GetConfig(ctx context.Context, basinName BasinName) (*BasinConfig, error)
Get basin configuration.
Example ¶
package main
import (
"context"
"fmt"
"log"
"github.com/s2-streamstore/s2-sdk-go/s2"
)
func main() {
client := s2.New("your-access-token", nil)
ctx := context.Background()
config, err := client.Basins.GetConfig(ctx, "my-basin")
if err != nil {
log.Fatalf("get basin config: %v", err)
}
fmt.Printf("basin config: %+v\n", config)
}
Output:
func (*BasinsClient) Iter ¶ added in v0.11.0
func (b *BasinsClient) Iter(ctx context.Context, args *ListBasinsArgs) *BasinsIterator
Iterate over basins. By default, basins that are being deleted are excluded. Set IncludeDeleted to true to include them.
Example ¶
package main
import (
"context"
"fmt"
"log"
"github.com/s2-streamstore/s2-sdk-go/s2"
)
func main() {
client := s2.New("your-access-token", nil)
ctx := context.Background()
iter := client.Basins.Iter(ctx, &s2.ListBasinsArgs{
Prefix: "my-",
})
for iter.Next() {
basin := iter.Value()
fmt.Printf("basin: %s (scope=%s, created_at=%s)\n", basin.Name, basin.Scope, basin.CreatedAt)
}
if err := iter.Err(); err != nil {
log.Fatalf("list basins: %v", err)
}
}
Output:
func (*BasinsClient) List ¶ added in v0.11.0
func (b *BasinsClient) List(ctx context.Context, args *ListBasinsArgs) (*ListBasinsResponse, error)
List basins.
Example ¶
package main
import (
"context"
"fmt"
"log"
"github.com/s2-streamstore/s2-sdk-go/s2"
)
func main() {
client := s2.New("your-access-token", nil)
ctx := context.Background()
resp, err := client.Basins.List(ctx, &s2.ListBasinsArgs{
Prefix: "my-",
Limit: s2.Ptr(10),
})
if err != nil {
log.Fatalf("list basins: %v", err)
}
for _, basin := range resp.Basins {
fmt.Printf("basin: %s (scope=%s, created_at=%s)\n", basin.Name, basin.Scope, basin.CreatedAt)
}
fmt.Printf("has more: %v\n", resp.HasMore)
}
Output:
func (*BasinsClient) Reconfigure ¶ added in v0.11.0
func (b *BasinsClient) Reconfigure(ctx context.Context, args ReconfigureBasinArgs) (*BasinConfig, error)
Reconfigure a basin.
Example ¶
package main
import (
"context"
"fmt"
"log"
"github.com/s2-streamstore/s2-sdk-go/s2"
)
func main() {
client := s2.New("your-access-token", nil)
ctx := context.Background()
config, err := client.Basins.Reconfigure(ctx, s2.ReconfigureBasinArgs{
Basin: "my-basin",
Config: s2.BasinReconfiguration{
CreateStreamOnAppend: s2.Ptr(true),
},
})
if err != nil {
log.Fatalf("reconfigure basin: %v", err)
}
fmt.Printf("basin config: %+v\n", config)
}
Output:
type BasinsIterator ¶ added in v0.11.0
type BasinsIterator struct {
// contains filtered or unexported fields
}
Iterator over basins returned by BasinsClient.Iter. Use Next to advance, Value to get the current item, and Err to check for errors.
func (*BasinsIterator) Err ¶ added in v0.11.0
func (it *BasinsIterator) Err() error
Returns any error encountered during iteration. Should be called after Next returns false to check if iteration stopped due to an error.
func (*BasinsIterator) Next ¶ added in v0.11.0
func (it *BasinsIterator) Next() bool
Advances the iterator to the next basin. Returns true if there is a next item, false when iteration is complete or an error occurred. Call Err after iteration to check for errors.
func (*BasinsIterator) Value ¶ added in v0.11.0
func (it *BasinsIterator) Value() BasinInfo
Returns the current basin. Only valid after a successful call to Next.
type BatchOutput ¶ added in v0.11.0
type BatchOutput struct {
// Input contains the records to append.
Input *AppendInput
// contains filtered or unexported fields
}
BatchOutput represents a flushed batch ready for appending.
type BatchSubmitTicket ¶ added in v0.11.0
type BatchSubmitTicket struct {
// contains filtered or unexported fields
}
Resolves with the AppendAck once the batch is durable.
func (*BatchSubmitTicket) Ack ¶ added in v0.11.0
func (t *BatchSubmitTicket) Ack(ctx context.Context) (*AppendAck, error)
Resolves BatchSubmitTicket with the AppendAck once the batch is durable.
Example ¶
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/s2-streamstore/s2-sdk-go/s2"
)
func main() {
client := s2.New("your-access-token", nil)
stream := client.Basin("my-basin").Stream("my-stream")
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
session, err := stream.AppendSession(ctx, nil)
if err != nil {
log.Fatalf("open append session: %v", err)
}
defer session.Close()
fut, err := session.Submit(&s2.AppendInput{
Records: []s2.AppendRecord{
{Body: []byte("first")},
{Body: []byte("second")},
},
})
if err != nil {
log.Fatalf("submit append: %v", err)
}
ticket, err := fut.Wait(ctx)
if err != nil {
log.Fatalf("enqueue failed: %v", err)
}
ack, err := ticket.Ack(ctx)
if err != nil {
log.Fatalf("append failed: %v", err)
}
fmt.Printf("acknowledged records %d-%d\n", ack.Start.SeqNum, ack.End.SeqNum-1)
}
Output:
type Batcher ¶ added in v0.11.0
type Batcher struct {
// contains filtered or unexported fields
}
Accumulates AppendRecords and emits them as batches based on configurable size, count, or time thresholds.
func NewBatcher ¶ added in v0.11.0
func NewBatcher(ctx context.Context, opts *BatchingOptions) *Batcher
Create a new Batcher that accumulates AppendRecords and emits them as batches based on configurable size, count, or time thresholds.
func (*Batcher) Add ¶ added in v0.11.0
func (b *Batcher) Add(record AppendRecord, resultCh chan *producerOutcome) error
Adds a record to the current batch.
func (*Batcher) Batches ¶ added in v0.11.0
func (b *Batcher) Batches() <-chan *BatchOutput
Batches returns a receive-only channel of flushed batches.
type BatchingOptions ¶ added in v0.11.0
type BatchingOptions struct {
// Duration to wait before flushing a batch (default: 5ms)
Linger time.Duration
// Maximum number of records in a batch (default: 1000, max: 1000)
MaxRecords int
// Maximum batch size in metered bytes (default: 1 MiB, max: 1 MiB)
MaxMeteredBytes uint64
// Optional sequence number to match for first batch (auto-increments for subsequent batches)
MatchSeqNum *uint64
// Optional fencing token to enforce (remains static across batches)
FencingToken *string
// Buffer size for the internal batches channel (default: 16)
ChannelBuffer int
}
type Client ¶
type Client struct {
// Client for access tokens.
AccessTokens *AccessTokensClient
// Client for basins.
Basins *BasinsClient
// Client for metrics.
Metrics *MetricsClient
// contains filtered or unexported fields
}
func New ¶ added in v0.11.0
func New(accessToken string, opts *ClientOptions) *Client
Create a new Client.
func NewFromEnvironment ¶ added in v0.11.0
func NewFromEnvironment(opts *ClientOptions) *Client
Create a client using configuration from environment variables. Environment variables: S2_ACCESS_TOKEN, S2_ACCOUNT_ENDPOINT, S2_BASIN_ENDPOINT. ClientOptions fields override environment variables. Panics if S2_ACCESS_TOKEN is not set.
func (*Client) Basin ¶ added in v0.11.0
func (c *Client) Basin(name string) *BasinClient
Create a new BasinClient.
type ClientOptions ¶ added in v0.11.0
type ClientOptions struct {
// Endpoint to connect to S2.
// Defaults to "https://aws.s2.dev/v1".
BaseURL string
// HTTP client used for requests.
HTTPClient *http.Client
// Allows customizing how basin endpoints are constructed.
// If provided, this function is used to derive the endpoint for a given basin.
// When provided, the "s2-basin" HTTP header is automatically included in basin-scoped requests.
MakeBasinBaseURL func(basin string) string
// Retry configuration.
RetryConfig *RetryConfig
// Get SDK level logs.
Logger *slog.Logger
// Overall timeout for HTTP requests.
// Defaults to 5 seconds.
RequestTimeout time.Duration
// Timeout for establishing TCP connections.
// Defaults to 3 seconds.
ConnectionTimeout time.Duration
// Compression algorithm for request bodies.
// Defaults to CompressionNone (no compression).
Compression CompressionType
}
func LoadConfigFromEnv ¶ added in v0.11.8
func LoadConfigFromEnv() *ClientOptions
Returns ClientOptions with endpoints from S2_ACCOUNT_ENDPOINT and S2_BASIN_ENDPOINT.
type CompressionType ¶ added in v0.11.0
type CompressionType = internalframing.CompressionType
const ( CompressionNone CompressionType = internalframing.CompressionNone CompressionZstd CompressionType = internalframing.CompressionZstd CompressionGzip CompressionType = internalframing.CompressionGzip )
type Config ¶ added in v0.11.0
type Config struct {
AccessToken string
AccountTemplate *endpointTemplate
BasinTemplate *endpointTemplate
}
type CreateBasinArgs ¶ added in v0.11.0
type CreateBasinArgs struct {
// Basin name which must be globally unique.
// It can be between 8 and 48 characters in length, and comprise lowercase letters, numbers and hyphens.
// It cannot begin or end with a hyphen.
Basin BasinName `json:"basin"`
// Basin configuration.
Config *BasinConfig `json:"config,omitempty"`
// Basin scope.
Scope *BasinScope `json:"scope,omitempty"`
}
type CreateStreamArgs ¶ added in v0.11.0
type CreateStreamArgs struct {
// Stream name.
Stream StreamName `json:"stream"`
// Stream configuration.
Config *StreamConfig `json:"config,omitempty"`
}
type DeleteOnEmptyConfig ¶ added in v0.11.0
type DeleteOnEmptyConfig struct {
// Minimum age in seconds before an empty stream can be deleted.
// Set to 0 (default) to disable delete-on-empty (don't delete automatically).
MinAgeSecs *int64 `json:"min_age_secs,omitempty"`
}
type DeleteOnEmptyReconfiguration ¶ added in v0.11.0
type DeleteOnEmptyReconfiguration struct {
// Minimum age in seconds before an empty stream can be deleted.
// Set to 0 to disable delete-on-empty (don't delete automatically).
MinAgeSecs *int64 `json:"min_age_secs,omitempty"`
}
type FencingTokenMismatchError ¶ added in v0.11.0
func (*FencingTokenMismatchError) Unwrap ¶ added in v0.11.0
func (e *FencingTokenMismatchError) Unwrap() error
type GaugeMetric ¶ added in v0.11.0
type GaugeMetric struct {
// Timeseries name.
Name string `json:"name"`
// Unit of the metric.
Unit MetricUnit `json:"unit"`
// Timeseries values.
// Each element is a tuple of a timestamp in Unix epoch seconds and a data point.
// The data point represents the value at the instant of the timestamp.
Values []MetricSample `json:"values"`
}
Named series of `(timestamp, value)` points each representing an instantaneous value.
type Header ¶
type Header struct {
// Header name.
// The name cannot be empty, with the exception of an S2 command record.
Name []byte
// Header value.
Value []byte
}
Header adds structured information to a record as a name-value pair.
type IndexedAppendAck ¶ added in v0.11.0
type IndexedAppendAck struct {
// contains filtered or unexported fields
}
Represents the acknowledgment for a single record within a batch.
func (*IndexedAppendAck) BatchAppendAck ¶ added in v0.11.0
func (a *IndexedAppendAck) BatchAppendAck() *AppendAck
Returns the underlying batch AppendAck.
func (*IndexedAppendAck) SeqNum ¶ added in v0.11.0
func (a *IndexedAppendAck) SeqNum() uint64
Returns the sequence number assigned to this specific record.
type InfiniteRetention ¶ added in v0.11.0
type InfiniteRetention struct{}
Retain records unless explicitly trimmed.
type IssueAccessTokenArgs ¶ added in v0.11.0
type IssueAccessTokenArgs struct {
// Access token ID.
// It must be unique to the account and between 1 and 96 bytes in length.
ID AccessTokenID `json:"id"`
// Access token scope.
Scope AccessTokenScope `json:"scope"`
// Namespace streams based on the configured stream-level scope, which must be a prefix.
// Stream name arguments will be automatically prefixed, and the prefix will be stripped when listing streams.
AutoPrefixStreams bool `json:"auto_prefix_streams,omitempty"`
// Expiration time. If not set, the expiration will be set to that of the requestor's token.
ExpiresAt *time.Time `json:"expires_at,omitempty"`
}
type IssueAccessTokenResponse ¶ added in v0.11.0
type IssueAccessTokenResponse struct {
// Created access token.
AccessToken string `json:"access_token"`
}
type LabelMetric ¶ added in v0.11.0
type LabelMetric struct {
// Label name.
Name string `json:"name"`
// Label values.
Values []string `json:"values"`
}
Set of string labels.
type ListAccessTokensArgs ¶ added in v0.11.0
type ListAccessTokensArgs struct {
// Filter to access tokens whose ID begins with this prefix.
Prefix string `json:"prefix,omitempty"`
// Filter to access tokens whose ID lexicographically starts after this string.
StartAfter string `json:"start_after,omitempty"`
// Number of results, up to a maximum of 1000.
Limit *int `json:"limit,omitempty"`
}
type ListAccessTokensResponse ¶ added in v0.8.0
type ListAccessTokensResponse struct {
// Matching access tokens.
AccessTokens []AccessTokenInfo `json:"access_tokens"`
// Indicates that there are more access tokens that match the criteria.
HasMore bool `json:"has_more"`
}
type ListBasinsArgs ¶ added in v0.11.0
type ListBasinsArgs struct {
// Filter to basins whose names begin with this prefix.
Prefix string `json:"prefix,omitempty"`
// Filter to basins whose names lexicographically start after this string.
// It must be greater than or equal to the `prefix` if specified.
StartAfter string `json:"start_after,omitempty"`
// Number of results, up to a maximum of 1000.
Limit *int `json:"limit,omitempty"`
// Include basins that are being deleted (default: false).
// Only applies to the Iter method.
IncludeDeleted bool `json:"-"`
}
type ListBasinsResponse ¶
type ListStreamsArgs ¶ added in v0.11.0
type ListStreamsArgs struct {
// Filter to streams whose name begins with this prefix.
Prefix string `json:"prefix,omitempty"`
// Filter to streams whose name lexicographically starts after this string.
// It must be greater than or equal to the `prefix` if specified.
StartAfter string `json:"start_after,omitempty"`
// Number of results, up to a maximum of 1000.
Limit *int `json:"limit,omitempty"`
// Include streams that are being deleted (default: false).
// Only applies to the Iter method.
IncludeDeleted bool `json:"-"`
}
type ListStreamsResponse ¶
type ListStreamsResponse struct {
// Matching streams.
Streams []StreamInfo `json:"streams"`
// Indicates that there are more results that match the criteria.
HasMore bool `json:"has_more"`
}
type Metric ¶ added in v0.11.0
type Metric struct {
// Single named value.
Scalar *ScalarMetric `json:"scalar,omitempty"`
// Named series of `(timestamp, value)` points representing an accumulation over a specified bucket.
Accumulation *AccumulationMetric `json:"accumulation,omitempty"`
// Named series of `(timestamp, value)` points each representing an instantaneous value.
Gauge *GaugeMetric `json:"gauge,omitempty"`
// Set of string labels.
Label *LabelMetric `json:"label,omitempty"`
}
type MetricSample ¶ added in v0.11.0
type MetricSample [2]float64
type MetricSetResponse ¶ added in v0.11.0
type MetricSetResponse struct {
// Metrics comprising the set.
Values []Metric `json:"values"`
}
type MetricUnit ¶ added in v0.11.0
type MetricUnit string
const ( MetricUnitBytes MetricUnit = "bytes" MetricUnitOperations MetricUnit = "operations" )
type MetricsClient ¶ added in v0.11.0
type MetricsClient struct {
// contains filtered or unexported fields
}
Metrics client.
func (*MetricsClient) Account ¶ added in v0.11.0
func (m *MetricsClient) Account(ctx context.Context, args *AccountMetricsArgs) (*MetricSetResponse, error)
Example ¶
package main
import (
"context"
"fmt"
"log"
"github.com/s2-streamstore/s2-sdk-go/s2"
)
func main() {
client := s2.New("your-access-token", nil)
metrics, err := client.Metrics.Account(context.Background(), &s2.AccountMetricsArgs{
Set: s2.AccountMetricSetActiveBasins,
})
if err != nil {
log.Fatalf("account metrics: %v", err)
}
for _, m := range metrics.Values {
if m.Scalar != nil {
fmt.Printf("%s: %.0f %s\n", m.Scalar.Name, m.Scalar.Value, m.Scalar.Unit)
}
}
}
Output:
func (*MetricsClient) Basin ¶ added in v0.11.0
func (m *MetricsClient) Basin(ctx context.Context, args *BasinMetricsArgs) (*MetricSetResponse, error)
Example ¶
package main
import (
"context"
"fmt"
"log"
"github.com/s2-streamstore/s2-sdk-go/s2"
)
func main() {
client := s2.New("your-access-token", nil)
metrics, err := client.Metrics.Basin(context.Background(), &s2.BasinMetricsArgs{
Basin: "my-basin",
Set: s2.BasinMetricSetStorage,
})
if err != nil {
log.Fatalf("basin metrics: %v", err)
}
for _, m := range metrics.Values {
if m.Scalar != nil {
fmt.Printf("%s: %.0f %s\n", m.Scalar.Name, m.Scalar.Value, m.Scalar.Unit)
}
}
}
Output:
func (*MetricsClient) Stream ¶ added in v0.11.0
func (m *MetricsClient) Stream(ctx context.Context, args *StreamMetricsArgs) (*MetricSetResponse, error)
Example ¶
package main
import (
"context"
"fmt"
"log"
"github.com/s2-streamstore/s2-sdk-go/s2"
)
func main() {
client := s2.New("your-access-token", nil)
metrics, err := client.Metrics.Stream(context.Background(), &s2.StreamMetricsArgs{
Basin: "my-basin",
Stream: "my-stream",
Set: s2.StreamMetricSetStorage,
})
if err != nil {
log.Fatalf("stream metrics: %v", err)
}
for _, m := range metrics.Values {
if m.Scalar != nil {
fmt.Printf("%s: %.0f %s\n", m.Scalar.Name, m.Scalar.Value, m.Scalar.Unit)
}
}
}
Output:
type PermittedOperationGroups ¶ added in v0.8.0
type PermittedOperationGroups struct {
// Account-level access permissions.
Account *ReadWritePermissions `json:"account,omitempty"`
// Basin-level access permissions.
Basin *ReadWritePermissions `json:"basin,omitempty"`
// Stream-level access permissions.
Stream *ReadWritePermissions `json:"stream,omitempty"`
}
Access permissions at operation group level.
type Producer ¶ added in v0.11.0
type Producer struct {
// contains filtered or unexported fields
}
Producer provides per-record append semantics on top of a batched AppendSession.
- Submit(record) returns a RecordSubmitFuture that resolves once the record has been accepted (written to the batcher). Backpressure is applied automatically via the batcher when the AppendSession is at capacity.
- ticket.Ack() returns an IndexedAppendAck that resolves once the record is durable.
Example ¶
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/s2-streamstore/s2-sdk-go/s2"
)
func main() {
client := s2.New("your-access-token", nil)
stream := client.Basin("my-basin").Stream("my-stream")
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
session, err := stream.AppendSession(ctx, nil)
if err != nil {
log.Fatalf("open append session: %v", err)
}
defer session.Close()
batcher := s2.NewBatcher(ctx, &s2.BatchingOptions{
Linger: 100 * time.Millisecond,
MaxRecords: 500,
})
producer := s2.NewProducer(ctx, batcher, session)
defer producer.Close()
for i := range 100 {
fut, err := producer.Submit(s2.AppendRecord{
Body: []byte(fmt.Sprintf("record %d", i)),
})
if err != nil {
log.Fatalf("producer submit: %v", err)
}
go func(f *s2.RecordSubmitFuture) {
ticket, err := f.Wait(ctx)
if err != nil {
log.Printf("enqueue error: %v", err)
return
}
ack, err := ticket.Ack(ctx)
if err != nil {
log.Printf("ack error: %v", err)
return
}
fmt.Printf("ack: seqNum=%d\n", ack.SeqNum())
}(fut)
}
time.Sleep(time.Second)
}
Output:
func NewProducer ¶ added in v0.11.0
func NewProducer(ctx context.Context, batcher *Batcher, session *AppendSession) *Producer
Create a new Producer.
func (*Producer) Close ¶ added in v0.11.0
func (p *Producer) Close()
Stops the producer, flushes pending batches, and waits for in-flight acks. Close blocks until in-flight work completes or the producer context is canceled.
func (*Producer) Submit ¶ added in v0.11.0
func (p *Producer) Submit(record AppendRecord) (*RecordSubmitFuture, error)
Submit a single record for appending. Returns a RecordSubmitFuture that resolves to a RecordSubmitTicket once the record has been accepted. Blocks if the underlying AppendSession is at capacity.
type RangeNotSatisfiableError ¶ added in v0.11.0
type RangeNotSatisfiableError struct {
*S2Error
// The current tail position of the stream, if available.
Tail *StreamPosition
}
func (*RangeNotSatisfiableError) Unwrap ¶ added in v0.11.0
func (e *RangeNotSatisfiableError) Unwrap() error
type ReadBatch ¶ added in v0.11.0
type ReadBatch struct {
// Records that are durably sequenced on the stream, retrieved based on the requested criteria.
// This can only be empty in response to a unary read (i.e. not SSE),
// if the request cannot be satisfied without violating an explicit bound (`count`, `bytes`, or `until`).
Records []SequencedRecord `json:"records"`
// Sequence number that will be assigned to the next record on the stream, and timestamp of the last record.
// This will only be present when reading recent records.
Tail *StreamPosition `json:"tail,omitempty"`
}
type ReadOptions ¶ added in v0.11.0
type ReadOptions struct {
// Start from a sequence number.
SeqNum *uint64 `json:"seq_num,omitempty"`
// Start from a timestamp.
Timestamp *uint64 `json:"timestamp,omitempty"`
// Start from number of records before the next sequence number.
TailOffset *int64 `json:"tail_offset,omitempty"`
// Record count limit.
// Non-streaming reads are capped by the default limit of 1000 records.
Count *uint64 `json:"count,omitempty"`
// Metered bytes limit.
// Non-streaming reads are capped by the default limit of 1 MiB.
Bytes *uint64 `json:"bytes,omitempty"`
// Duration in seconds to wait for new records.
// The default duration is 0 if there is a bound on `count`, `bytes`, or `until`, and otherwise infinite.
// Non-streaming reads are always bounded on `count` and `bytes`, so you can achieve long poll semantics by specifying a non-zero duration up to 60 seconds.
// In the context of an SSE or S2S streaming read, the duration will bound how much time can elapse between records throughout the lifetime of the session.
Wait *int32 `json:"wait,omitempty"`
// Exclusive timestamp to read until.
Until *uint64 `json:"until,omitempty"`
// Start reading from the tail if the requested position is beyond it.
// Otherwise, a `416 Range Not Satisfiable` response is returned.
Clamp *bool `json:"clamp,omitempty"`
// Whether to filter out command records from read results.
// Filtering is performed client-side.
// Defaults to false.
IgnoreCommandRecords bool `json:"-"`
}
type ReadSession ¶ added in v0.11.0
type ReadSession struct {
// contains filtered or unexported fields
}
func (*ReadSession) Err ¶ added in v0.11.0
func (s *ReadSession) Err() error
Reports the terminal error (if any) that caused Next to stop.
func (*ReadSession) LastObservedTail ¶ added in v0.11.0
func (s *ReadSession) LastObservedTail() *StreamPosition
Returns the last observed tail position, if known.
func (*ReadSession) Next ¶ added in v0.11.0
func (s *ReadSession) Next() bool
Blocks until a record is available, the context is done, or the session encounters a fatal error. When it returns true, Record() yields the fetched record. When it returns false, call Err().
func (*ReadSession) NextReadPosition ¶ added in v0.11.0
func (s *ReadSession) NextReadPosition() *StreamPosition
Returns the next read position, if known.
func (*ReadSession) Record ¶ added in v0.11.0
func (s *ReadSession) Record() SequencedRecord
Returns the most recent record fetched by Next.
type ReadWritePermissions ¶ added in v0.8.0
type ReconfigureBasinArgs ¶ added in v0.11.0
type ReconfigureBasinArgs struct {
// Basin name.
Basin BasinName
// Basin reconfiguration.
Config BasinReconfiguration
}
type ReconfigureStreamArgs ¶ added in v0.11.0
type ReconfigureStreamArgs struct {
// Stream name.
Stream StreamName
// Stream reconfiguration.
Config StreamReconfiguration
}
type RecordSubmitFuture ¶ added in v0.11.0
type RecordSubmitFuture struct {
// contains filtered or unexported fields
}
Represents a pending single-record submission to a Producer.
func (*RecordSubmitFuture) Wait ¶ added in v0.11.0
func (f *RecordSubmitFuture) Wait(ctx context.Context) (*RecordSubmitTicket, error)
Blocks until the record is accepted and returns a RecordSubmitTicket.
type RecordSubmitTicket ¶ added in v0.11.0
type RecordSubmitTicket struct {
// contains filtered or unexported fields
}
Returned after a record is accepted by the Producer. Use RecordSubmitTicket.Ack to wait for the record to become durable.
func (*RecordSubmitTicket) Ack ¶ added in v0.11.0
func (t *RecordSubmitTicket) Ack(ctx context.Context) (*IndexedAppendAck, error)
Blocks until the record is durable and returns the IndexedAppendAck.
type ResourceSet ¶ added in v0.8.0
type ResourceSet struct {
// Match only the resource with this exact name.
// Use an empty string to match no resources.
Exact *string `json:"exact,omitempty"`
// Match all resources that start with this prefix.
// Use an empty string to match all resources.
Prefix *string `json:"prefix,omitempty"`
}
type RetentionPolicy ¶ added in v0.8.0
type RetentionPolicy struct {
// Age limits records to a specific age window (seconds).
Age *int64 `json:"age,omitempty"`
// Retain records unless explicitly trimmed.
Infinite *InfiniteRetention `json:"infinite,omitempty"`
}
type RetryConfig ¶ added in v0.11.0
type RetryConfig struct {
// Total number of attempts, including the initial try. Must be >= 1. A value of 1 means no retries.
MaxAttempts int
// Minimum base delay for exponential backoff. Defaults to 100ms.
MinBaseDelay time.Duration
// Maximum base delay for exponential backoff. Defaults to 1s.
// Note: actual delay with jitter can be up to 2x this value.
MaxBaseDelay time.Duration
// Policy for retrying append operations.
AppendRetryPolicy AppendRetryPolicy
}
Retry configuration.
type RevokeAccessTokenArgs ¶ added in v0.11.0
type RevokeAccessTokenArgs struct {
// Access token ID.
ID AccessTokenID `json:"id"`
}
type S2Error ¶ added in v0.11.0
type S2Error struct {
Message string
Code string
Status int
Origin string // "server", "sdk", "network"
}
func (*S2Error) HasNoSideEffects ¶ added in v0.13.3
func (e *S2Error) HasNoSideEffects() bool
HasNoSideEffects reports whether this error guarantees no mutation occurred.
func (*S2Error) IsNetworkError ¶ added in v0.11.0
func (e *S2Error) IsNetworkError() bool
func (*S2Error) IsRetryable ¶ added in v0.11.0
func (e *S2Error) IsRetryable() bool
type ScalarMetric ¶ added in v0.11.0
type ScalarMetric struct {
// Metric name.
Name string `json:"name"`
// Unit of the metric.
Unit MetricUnit `json:"unit"`
// Metric value.
Value float64 `json:"value"`
}
type SeqNumMismatchError ¶ added in v0.11.0
func (*SeqNumMismatchError) Unwrap ¶ added in v0.11.0
func (e *SeqNumMismatchError) Unwrap() error
type SequencedRecord ¶
type SequencedRecord struct {
// Body of the record.
Body []byte
// Series of name-value pairs for this record.
Headers []Header
// Sequence number assigned by the service.
SeqNum uint64
// Timestamp for this record.
Timestamp uint64
}
func (SequencedRecord) IsCommandRecord ¶ added in v0.13.0
func (r SequencedRecord) IsCommandRecord() bool
Reports whether this record is a command record. Command records have exactly one header with an empty name.
type StorageClass ¶
type StorageClass string
const ( StorageClassStandard StorageClass = "standard" StorageClassExpress StorageClass = "express" )
type StreamClient ¶
type StreamClient struct {
// contains filtered or unexported fields
}
func (*StreamClient) Append ¶
func (s *StreamClient) Append(ctx context.Context, input *AppendInput) (*AppendAck, error)
Appends one or more records to the stream. All records in a single append call must use the same format (either all string or all bytes). For high-throughput sequential appends, use AppendSession instead.
Example ¶
package main
import (
"context"
"fmt"
"log"
"github.com/s2-streamstore/s2-sdk-go/s2"
)
func main() {
client := s2.New("your-access-token", nil)
stream := client.Basin("my-basin").Stream("my-stream")
ctx := context.Background()
ack, err := stream.Append(ctx, &s2.AppendInput{
Records: []s2.AppendRecord{
{Body: []byte("hello world")},
},
})
if err != nil {
log.Fatalf("append: %v", err)
}
fmt.Printf("appended at seq=%d\n", ack.Start.SeqNum)
}
Output:
func (*StreamClient) AppendSession ¶
func (s *StreamClient) AppendSession(ctx context.Context, opts *AppendSessionOptions) (*AppendSession, error)
Creates an append session that guarantees ordering of submissions. Use this to coordinate high-throughput, sequential appends with backpressure.
Example ¶
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/s2-streamstore/s2-sdk-go/s2"
)
func main() {
client := s2.New("your-access-token", nil)
stream := client.Basin("my-basin").Stream("my-stream")
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
session, err := stream.AppendSession(ctx, nil)
if err != nil {
log.Fatalf("open append session: %v", err)
}
defer session.Close()
fut, err := session.Submit(&s2.AppendInput{
Records: []s2.AppendRecord{
{Body: []byte("hello")},
{Body: []byte("world")},
},
})
if err != nil {
log.Fatalf("submit: %v", err)
}
ticket, err := fut.Wait(ctx)
if err != nil {
log.Fatalf("enqueue: %v", err)
}
ack, err := ticket.Ack(ctx)
if err != nil {
log.Fatalf("get ack: %v", err)
}
fmt.Printf("appended records %d to %d\n", ack.Start.SeqNum, ack.End.SeqNum-1)
}
Output:
func (*StreamClient) CheckTail ¶
func (s *StreamClient) CheckTail(ctx context.Context) (*TailResponse, error)
Check the tail of the stream. Returns the next sequence number and timestamp to be assigned (tail).
Example ¶
package main
import (
"context"
"fmt"
"log"
"github.com/s2-streamstore/s2-sdk-go/s2"
)
func main() {
client := s2.New("your-access-token", nil)
stream := client.Basin("my-basin").Stream("my-stream")
ctx := context.Background()
tail, err := stream.CheckTail(ctx)
if err != nil {
log.Fatalf("check tail: %v", err)
}
fmt.Printf("tail: seq=%d ts=%d\n", tail.Tail.SeqNum, tail.Tail.Timestamp)
}
Output:
func (*StreamClient) Name ¶ added in v0.11.0
func (s *StreamClient) Name() StreamName
func (*StreamClient) Read ¶
func (s *StreamClient) Read(ctx context.Context, opts *ReadOptions) (*ReadBatch, error)
Example ¶
package main
import (
"context"
"fmt"
"log"
"github.com/s2-streamstore/s2-sdk-go/s2"
)
func main() {
client := s2.New("your-access-token", nil)
stream := client.Basin("my-basin").Stream("my-stream")
ctx := context.Background()
batch, err := stream.Read(ctx, &s2.ReadOptions{
SeqNum: s2.Uint64(0),
Count: s2.Uint64(100),
})
if err != nil {
log.Fatalf("read: %v", err)
}
for _, rec := range batch.Records {
fmt.Printf("seq=%d body=%q\n", rec.SeqNum, string(rec.Body))
}
}
Output:
func (*StreamClient) ReadSession ¶
func (s *StreamClient) ReadSession(ctx context.Context, opts *ReadOptions) (*ReadSession, error)
Opens a streaming read session. Call Close when done consuming records.
Example ¶
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/s2-streamstore/s2-sdk-go/s2"
)
func main() {
client := s2.New("your-access-token", nil)
stream := client.Basin("my-basin").Stream("my-stream")
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
session, err := stream.ReadSession(ctx, &s2.ReadOptions{
SeqNum: s2.Uint64(0), // Start from beginning
Wait: s2.Int32(10), // Wait up to 10s for new records
})
if err != nil {
log.Fatalf("open read session: %v", err)
}
defer session.Close()
for session.Next() {
rec := session.Record()
fmt.Printf("seq=%d body=%q\n", rec.SeqNum, string(rec.Body))
}
if err := session.Err(); err != nil {
log.Fatalf("read session error: %v", err)
}
if next := session.NextReadPosition(); next != nil {
fmt.Printf("resume from seq=%d\n", next.SeqNum)
}
}
Output:
type StreamConfig ¶
type StreamConfig struct {
// Delete-on-empty configuration.
DeleteOnEmpty *DeleteOnEmptyConfig `json:"delete_on_empty,omitempty"`
// Retention policy for the stream.
// If unspecified, the default is to retain records for 7 days.
RetentionPolicy *RetentionPolicy `json:"retention_policy,omitempty"`
// Storage class for recent writes.
StorageClass *StorageClass `json:"storage_class,omitempty"`
// Timestamping behavior.
Timestamping *TimestampingConfig `json:"timestamping,omitempty"`
}
type StreamInfo ¶
type StreamInfo struct {
Name StreamName `json:"name"`
// Creation time.
CreatedAt time.Time `json:"created_at"`
// Deletion time, if the stream is being deleted.
DeletedAt *time.Time `json:"deleted_at,omitempty"`
}
type StreamMetricSet ¶ added in v0.11.0
type StreamMetricSet string
const (
StreamMetricSetStorage StreamMetricSet = "storage"
)
type StreamMetricsArgs ¶ added in v0.11.0
type StreamMetricsArgs struct {
// Basin name.
Basin string `json:"basin"`
// Stream name.
Stream string `json:"stream"`
// Metric set to return.
Set StreamMetricSet `json:"set"`
// Start timestamp as Unix epoch seconds, if applicable for the metric set.
Start *int64 `json:"start,omitempty"`
// End timestamp as Unix epoch seconds, if applicable for the metric set.
End *int64 `json:"end,omitempty"`
// Interval to aggregate over for timeseries metric sets.
Interval *TimeseriesInterval `json:"interval,omitempty"`
}
type StreamName ¶ added in v0.11.0
type StreamName string
Stream name that is unique to the basin. It can be between 1 and 512 bytes in length.
type StreamPosition ¶ added in v0.11.0
type StreamPosition struct {
// Sequence number assigned by the service.
SeqNum uint64 `json:"seq_num"`
// Timestamp, which may be client-specified or assigned by the service.
// If it is assigned by the service, it will represent milliseconds since Unix epoch.
Timestamp uint64 `json:"timestamp"`
}
Position of a record in a stream.
type StreamReconfiguration ¶ added in v0.11.0
type StreamReconfiguration struct {
// Delete-on-empty configuration.
DeleteOnEmpty *DeleteOnEmptyReconfiguration `json:"delete_on_empty,omitempty"`
// Retention policy for the stream.
// If unspecified, the default is to retain records for 7 days.
RetentionPolicy *RetentionPolicy `json:"retention_policy,omitempty"`
// Storage class for recent writes.
StorageClass *StorageClass `json:"storage_class,omitempty"`
// Timestamping behavior.
Timestamping *TimestampingReconfiguration `json:"timestamping,omitempty"`
}
type StreamsClient ¶ added in v0.11.0
type StreamsClient struct {
// contains filtered or unexported fields
}
func (*StreamsClient) Create ¶ added in v0.11.0
func (s *StreamsClient) Create(ctx context.Context, args CreateStreamArgs) (*StreamInfo, error)
Create a stream.
Example ¶
package main
import (
"context"
"fmt"
"log"
"github.com/s2-streamstore/s2-sdk-go/s2"
)
func main() {
client := s2.New("your-access-token", nil)
basin := client.Basin("my-basin")
ctx := context.Background()
info, err := basin.Streams.Create(ctx, s2.CreateStreamArgs{
Stream: "my-stream",
Config: &s2.StreamConfig{
StorageClass: s2.Ptr(s2.StorageClassExpress),
RetentionPolicy: &s2.RetentionPolicy{
Age: s2.Ptr(int64(86400 * 7)), // 7 days
},
},
})
if err != nil {
log.Fatalf("create stream: %v", err)
}
fmt.Printf("created stream: %s\n", info.Name)
}
Output:
func (*StreamsClient) Delete ¶ added in v0.11.0
func (s *StreamsClient) Delete(ctx context.Context, streamName StreamName) error
Delete a stream.
Example ¶
package main
import (
"context"
"fmt"
"log"
"github.com/s2-streamstore/s2-sdk-go/s2"
)
func main() {
client := s2.New("your-access-token", nil)
basin := client.Basin("my-basin")
ctx := context.Background()
if err := basin.Streams.Delete(ctx, "my-stream"); err != nil {
log.Fatalf("delete stream: %v", err)
}
fmt.Println("stream deleted")
}
Output:
func (*StreamsClient) GetConfig ¶ added in v0.11.0
func (s *StreamsClient) GetConfig(ctx context.Context, streamName StreamName) (*StreamConfig, error)
Get stream configuration.
Example ¶
package main
import (
"context"
"fmt"
"log"
"github.com/s2-streamstore/s2-sdk-go/s2"
)
func main() {
client := s2.New("your-access-token", nil)
basin := client.Basin("my-basin")
ctx := context.Background()
config, err := basin.Streams.GetConfig(ctx, "my-stream")
if err != nil {
log.Fatalf("get stream config: %v", err)
}
fmt.Printf("stream config: %+v\n", config)
}
Output:
func (*StreamsClient) Iter ¶ added in v0.11.0
func (s *StreamsClient) Iter(ctx context.Context, args *ListStreamsArgs) *StreamsIterator
Iterate over streams in a basin. By default, streams that are being deleted are excluded. Set IncludeDeleted to true to include them.
Example ¶
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/s2-streamstore/s2-sdk-go/s2"
)

func main() {
	client := s2.New("your-access-token", nil)
	basin := client.Basin("my-basin")
	ctx := context.Background()
	iter := basin.Streams.Iter(ctx, &s2.ListStreamsArgs{
		Prefix: "events-",
	})
	for iter.Next() {
		stream := iter.Value()
		fmt.Printf("stream: %s (created=%s)\n", stream.Name, stream.CreatedAt)
	}
	if err := iter.Err(); err != nil {
		log.Fatalf("list streams: %v", err)
	}
}
Output:
func (*StreamsClient) List ¶ added in v0.11.0
func (s *StreamsClient) List(ctx context.Context, args *ListStreamsArgs) (*ListStreamsResponse, error)
List streams.
Example ¶
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/s2-streamstore/s2-sdk-go/s2"
)

func main() {
	client := s2.New("your-access-token", nil)
	basin := client.Basin("my-basin")
	ctx := context.Background()
	resp, err := basin.Streams.List(ctx, &s2.ListStreamsArgs{
		Prefix: "events-",
		Limit:  s2.Ptr(10),
	})
	if err != nil {
		log.Fatalf("list streams: %v", err)
	}
	for _, stream := range resp.Streams {
		fmt.Printf("stream: %s (created=%s)\n", stream.Name, stream.CreatedAt)
	}
	fmt.Printf("has more: %v\n", resp.HasMore)
}
Output:
func (*StreamsClient) Reconfigure ¶ added in v0.11.0
func (s *StreamsClient) Reconfigure(ctx context.Context, args ReconfigureStreamArgs) (*StreamConfig, error)
Reconfigure a stream.
Example ¶
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/s2-streamstore/s2-sdk-go/s2"
)

func main() {
	client := s2.New("your-access-token", nil)
	basin := client.Basin("my-basin")
	ctx := context.Background()
	config, err := basin.Streams.Reconfigure(ctx, s2.ReconfigureStreamArgs{
		Stream: "my-stream",
		Config: s2.StreamReconfiguration{
			RetentionPolicy: &s2.RetentionPolicy{
				Age: s2.Ptr(int64(86400 * 30)),
			},
		},
	})
	if err != nil {
		log.Fatalf("reconfigure stream: %v", err)
	}
	fmt.Printf("stream config: %+v\n", config)
}
Output:
type StreamsIterator ¶ added in v0.11.0
type StreamsIterator struct {
	// contains filtered or unexported fields
}
Iterates over streams returned by StreamsClient.Iter. Use Next to advance, Value to get the current item, and Err to check for errors.
func (*StreamsIterator) Err ¶ added in v0.11.0
func (it *StreamsIterator) Err() error
Returns any error encountered during iteration. Should be called after Next returns false to check if iteration stopped due to an error.
func (*StreamsIterator) Next ¶ added in v0.11.0
func (it *StreamsIterator) Next() bool
Advances the iterator to the next stream. Returns true if there is a next item, false when iteration is complete or an error occurred. Call Err after iteration to check for errors.
func (*StreamsIterator) Value ¶ added in v0.11.0
func (it *StreamsIterator) Value() StreamInfo
Returns the current stream. Only valid after a successful call to Next.
type SubmitFuture ¶ added in v0.11.0
type SubmitFuture struct {
	// contains filtered or unexported fields
}
Represents a pending batch submission to an AppendSession. Call SubmitFuture.Wait to block until the batch is accepted.
func (*SubmitFuture) Wait ¶ added in v0.11.0
func (f *SubmitFuture) Wait(ctx context.Context) (*BatchSubmitTicket, error)
Blocks until the batch is accepted by the AppendSession and returns a BatchSubmitTicket that can be used to wait for the append acknowledgment.
type TailResponse ¶ added in v0.11.0
type TailResponse struct {
	// Sequence number that will be assigned to the next record on the stream, and timestamp of the last record.
	Tail StreamPosition `json:"tail"`
}
type TimeseriesInterval ¶ added in v0.11.0
type TimeseriesInterval string
const (
	TimeseriesIntervalMinute TimeseriesInterval = "minute"
	TimeseriesIntervalHour   TimeseriesInterval = "hour"
	TimeseriesIntervalDay    TimeseriesInterval = "day"
)
type TimestampingConfig ¶ added in v0.11.0
type TimestampingConfig struct {
	// Timestamping mode for appends that influences how timestamps are handled.
	Mode *TimestampingMode `json:"mode,omitempty"`
	// Allow client-specified timestamps to exceed the arrival time.
	// If this is false or not set, client timestamps will be capped at the arrival time.
	Uncapped *bool `json:"uncapped,omitempty"`
}
Timestamping behavior.
type TimestampingMode ¶ added in v0.8.0
type TimestampingMode string
const (
	TimestampingModeClientPrefer  TimestampingMode = "client-prefer"
	TimestampingModeClientRequire TimestampingMode = "client-require"
	TimestampingModeArrival       TimestampingMode = "arrival"
)
type TimestampingReconfiguration ¶ added in v0.11.0
type TimestampingReconfiguration struct {
	// Timestamping mode for appends that influences how timestamps are handled.
	Mode *TimestampingMode `json:"mode,omitempty"`
	// Allow client-specified timestamps to exceed the arrival time.
	Uncapped *bool `json:"uncapped,omitempty"`
}