storage

package
v1.4.2 (Latest)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 19, 2026 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrOffsetOutOfRange = errors.New("offset out of range")

ErrOffsetOutOfRange is returned when the requested offset is outside persisted data.

Functions

func CountRecordBatchMessages

func CountRecordBatchMessages(recordSet []byte) int

CountRecordBatchMessages sums the message counts encoded in a record set. The record set is expected to be a concatenation of Kafka record batches as produced by the broker.

func PatchRecordBatchBaseOffset

func PatchRecordBatchBaseOffset(batch *RecordBatch, baseOffset int64)

PatchRecordBatchBaseOffset overwrites the base offset field in the Kafka record batch header.

Types

type AppendResult

type AppendResult struct {
	BaseOffset int64
	LastOffset int64
}

AppendResult contains offsets for a flushed batch.

type ByteRange

type ByteRange struct {
	Start int64
	End   int64
}

ByteRange represents an inclusive byte range for reads.

type IndexBuilder

type IndexBuilder struct {
	// contains filtered or unexported fields
}

IndexBuilder tracks offsets and file positions for sparse indexing.

func NewIndexBuilder

func NewIndexBuilder(interval int32) *IndexBuilder

NewIndexBuilder creates a builder that emits an index entry once every `interval` messages.

func (*IndexBuilder) BuildBytes

func (b *IndexBuilder) BuildBytes() ([]byte, error)

BuildBytes encodes the index header and entries.

func (*IndexBuilder) Entries

func (b *IndexBuilder) Entries() []*IndexEntry

Entries returns the recorded index entries.

func (*IndexBuilder) MaybeAdd

func (b *IndexBuilder) MaybeAdd(offset int64, position int32, batchMessages int32)

MaybeAdd records an index entry when the interval has elapsed or no entry exists yet.

type IndexEntry

type IndexEntry struct {
	Offset   int64
	Position int32
}

IndexEntry represents a single row of the sparse index: a message offset and its file position.

func ParseIndex

func ParseIndex(data []byte) ([]*IndexEntry, error)

ParseIndex validates and returns entries from serialized bytes.

type MemoryS3Client

type MemoryS3Client struct {
	// contains filtered or unexported fields
}

MemoryS3Client is an in-memory implementation of S3Client for development/testing.

func NewMemoryS3Client

func NewMemoryS3Client() *MemoryS3Client

NewMemoryS3Client initializes the in-memory S3 client.

func (*MemoryS3Client) DownloadIndex

func (m *MemoryS3Client) DownloadIndex(ctx context.Context, key string) ([]byte, error)

func (*MemoryS3Client) DownloadSegment

func (m *MemoryS3Client) DownloadSegment(ctx context.Context, key string, rng *ByteRange) ([]byte, error)

func (*MemoryS3Client) EnsureBucket

func (m *MemoryS3Client) EnsureBucket(ctx context.Context) error

func (*MemoryS3Client) ListSegments

func (m *MemoryS3Client) ListSegments(ctx context.Context, prefix string) ([]S3Object, error)

func (*MemoryS3Client) UploadIndex

func (m *MemoryS3Client) UploadIndex(ctx context.Context, key string, body []byte) error

func (*MemoryS3Client) UploadSegment

func (m *MemoryS3Client) UploadSegment(ctx context.Context, key string, body []byte) error

type PartitionLog

type PartitionLog struct {
	// contains filtered or unexported fields
}

PartitionLog coordinates buffering, segment serialization, S3 uploads, and caching.

func NewPartitionLog

func NewPartitionLog(namespace string, topic string, partition int32, startOffset int64, s3Client S3Client, cache *cache.SegmentCache, cfg PartitionLogConfig, onFlush func(context.Context, *SegmentArtifact), onS3Op func(string, time.Duration, error)) *PartitionLog

NewPartitionLog constructs a log for a topic partition.

func (*PartitionLog) AppendBatch

func (l *PartitionLog) AppendBatch(ctx context.Context, batch RecordBatch) (*AppendResult, error)

AppendBatch writes a record batch to the log, updating offsets and flushing as needed.

func (*PartitionLog) EarliestOffset

func (l *PartitionLog) EarliestOffset() int64

EarliestOffset returns the lowest offset available in the log.

func (*PartitionLog) Flush

func (l *PartitionLog) Flush(ctx context.Context) error

Flush forces buffered batches to be written to S3 immediately.

func (*PartitionLog) Read

func (l *PartitionLog) Read(ctx context.Context, offset int64, maxBytes int32) ([]byte, error)

Read loads the segment containing the requested offset.

func (*PartitionLog) RestoreFromS3

func (l *PartitionLog) RestoreFromS3(ctx context.Context) (int64, error)

RestoreFromS3 rebuilds segment ranges from objects already stored in S3.

type PartitionLogConfig

type PartitionLogConfig struct {
	Buffer            WriteBufferConfig
	Segment           SegmentWriterConfig
	ReadAheadSegments int
	CacheEnabled      bool
}

PartitionLogConfig configures per-partition log behavior.

type RecordBatch

type RecordBatch struct {
	BaseOffset      int64
	LastOffsetDelta int32
	MessageCount    int32
	Bytes           []byte
}

RecordBatch carries a Kafka record batch blob plus metadata required for indexing.

func NewRecordBatchFromBytes

func NewRecordBatchFromBytes(data []byte) (RecordBatch, error)

NewRecordBatchFromBytes parses Kafka record batch metadata and returns a RecordBatch struct.

type S3Client

type S3Client interface {
	UploadSegment(ctx context.Context, key string, body []byte) error
	UploadIndex(ctx context.Context, key string, body []byte) error
	DownloadSegment(ctx context.Context, key string, rng *ByteRange) ([]byte, error)
	DownloadIndex(ctx context.Context, key string) ([]byte, error)
	ListSegments(ctx context.Context, prefix string) ([]S3Object, error)
	EnsureBucket(ctx context.Context) error
}

S3Client is the abstraction used by storage to read/write segments.

func NewS3Client

func NewS3Client(ctx context.Context, cfg S3Config) (S3Client, error)

NewS3Client returns an AWS-backed S3 client.

type S3Config

type S3Config struct {
	Bucket          string
	Region          string
	Endpoint        string
	ForcePathStyle  bool
	AccessKeyID     string
	SecretAccessKey string
	SessionToken    string
	KMSKeyARN       string
}

S3Config describes connection details for AWS S3 or S3-compatible endpoints.

type S3Object

type S3Object struct {
	Key  string
	Size int64
}

S3Object describes a stored segment object.

type SegmentArtifact

type SegmentArtifact struct {
	BaseOffset    int64
	LastOffset    int64
	MessageCount  int32
	CreatedAt     time.Time
	SegmentBytes  []byte
	IndexBytes    []byte
	RelativeIndex []*IndexEntry
}

SegmentArtifact contains the serialized segment and index bytes, ready for upload.

func BuildSegment

func BuildSegment(cfg SegmentWriterConfig, batches []RecordBatch, created time.Time) (*SegmentArtifact, error)

BuildSegment assembles segment and index bytes from buffered batches.

type SegmentWriterConfig

type SegmentWriterConfig struct {
	IndexIntervalMessages int32
}

SegmentWriterConfig controls segment serialization.

type WriteBuffer

type WriteBuffer struct {
	// contains filtered or unexported fields
}

WriteBuffer accumulates record batches prior to segment serialization.

func NewWriteBuffer

func NewWriteBuffer(cfg WriteBufferConfig) *WriteBuffer

NewWriteBuffer creates an empty buffer.

func (*WriteBuffer) Append

func (b *WriteBuffer) Append(batch RecordBatch)

Append adds a batch to the buffer.

func (*WriteBuffer) Drain

func (b *WriteBuffer) Drain() []RecordBatch

Drain returns all buffered batches and resets counters.

func (*WriteBuffer) ShouldFlush

func (b *WriteBuffer) ShouldFlush(now time.Time) bool

ShouldFlush reports whether a size threshold has been reached or the flush interval has elapsed, meaning the buffer should be flushed.

func (*WriteBuffer) Size

func (b *WriteBuffer) Size() int

Size returns the accumulated byte count (for tests/metrics).

type WriteBufferConfig

type WriteBufferConfig struct {
	MaxBytes      int
	MaxMessages   int
	MaxBatches    int
	FlushInterval time.Duration
}

WriteBufferConfig controls flush thresholds.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL