Documentation
¶
Index ¶
- Variables
- func CountRecordBatchMessages(recordSet []byte) int
- func PatchRecordBatchBaseOffset(batch *RecordBatch, baseOffset int64)
- type AppendResult
- type ByteRange
- type IndexBuilder
- type IndexEntry
- type MemoryS3Client
- func (m *MemoryS3Client) DownloadIndex(ctx context.Context, key string) ([]byte, error)
- func (m *MemoryS3Client) DownloadSegment(ctx context.Context, key string, rng *ByteRange) ([]byte, error)
- func (m *MemoryS3Client) EnsureBucket(ctx context.Context) error
- func (m *MemoryS3Client) ListSegments(ctx context.Context, prefix string) ([]S3Object, error)
- func (m *MemoryS3Client) UploadIndex(ctx context.Context, key string, body []byte) error
- func (m *MemoryS3Client) UploadSegment(ctx context.Context, key string, body []byte) error
- type PartitionLog
- func (l *PartitionLog) AppendBatch(ctx context.Context, batch RecordBatch) (*AppendResult, error)
- func (l *PartitionLog) EarliestOffset() int64
- func (l *PartitionLog) Flush(ctx context.Context) error
- func (l *PartitionLog) Read(ctx context.Context, offset int64, maxBytes int32) ([]byte, error)
- func (l *PartitionLog) RestoreFromS3(ctx context.Context) (int64, error)
- type PartitionLogConfig
- type RecordBatch
- type S3Client
- type S3Config
- type S3Object
- type SegmentArtifact
- type SegmentWriterConfig
- type WriteBuffer
- type WriteBufferConfig
Constants ¶
This section is empty.
Variables ¶
var ErrOffsetOutOfRange = errors.New("offset out of range")
ErrOffsetOutOfRange is returned when the requested offset is outside persisted data.
Functions ¶
func CountRecordBatchMessages ¶
CountRecordBatchMessages sums the message counts encoded in a record set. The record set is expected to be a concatenation of Kafka record batches as produced by the broker.
func PatchRecordBatchBaseOffset ¶
func PatchRecordBatchBaseOffset(batch *RecordBatch, baseOffset int64)
PatchRecordBatchBaseOffset overwrites the base offset field in the Kafka record batch header.
Types ¶
type AppendResult ¶
AppendResult contains offsets for a flushed batch.
type IndexBuilder ¶
type IndexBuilder struct {
// contains filtered or unexported fields
}
IndexBuilder tracks offsets and file positions for sparse indexing.
func NewIndexBuilder ¶
func NewIndexBuilder(interval int32) *IndexBuilder
NewIndexBuilder creates a builder that emits an entry every interval messages.
func (*IndexBuilder) BuildBytes ¶
func (b *IndexBuilder) BuildBytes() ([]byte, error)
BuildBytes encodes the index header and entries.
func (*IndexBuilder) Entries ¶
func (b *IndexBuilder) Entries() []*IndexEntry
Entries returns the recorded index entries.
type IndexEntry ¶
IndexEntry mirrors a sparse index row.
func ParseIndex ¶
func ParseIndex(data []byte) ([]*IndexEntry, error)
ParseIndex validates and returns entries from serialized bytes.
type MemoryS3Client ¶
type MemoryS3Client struct {
// contains filtered or unexported fields
}
MemoryS3Client is an in-memory implementation of S3Client for development/testing.
func NewMemoryS3Client ¶
func NewMemoryS3Client() *MemoryS3Client
NewMemoryS3Client initializes the in-memory S3 client.
func (*MemoryS3Client) DownloadIndex ¶
func (*MemoryS3Client) DownloadSegment ¶
func (*MemoryS3Client) EnsureBucket ¶
func (m *MemoryS3Client) EnsureBucket(ctx context.Context) error
func (*MemoryS3Client) ListSegments ¶
func (*MemoryS3Client) UploadIndex ¶
func (*MemoryS3Client) UploadSegment ¶
type PartitionLog ¶
type PartitionLog struct {
// contains filtered or unexported fields
}
PartitionLog coordinates buffering, segment serialization, S3 uploads, and caching.
func NewPartitionLog ¶
func NewPartitionLog(namespace string, topic string, partition int32, startOffset int64, s3Client S3Client, cache *cache.SegmentCache, cfg PartitionLogConfig, onFlush func(context.Context, *SegmentArtifact), onS3Op func(string, time.Duration, error)) *PartitionLog
NewPartitionLog constructs a log for a topic partition.
func (*PartitionLog) AppendBatch ¶
func (l *PartitionLog) AppendBatch(ctx context.Context, batch RecordBatch) (*AppendResult, error)
AppendBatch writes a record batch to the log, updating offsets and flushing as needed.
func (*PartitionLog) EarliestOffset ¶
func (l *PartitionLog) EarliestOffset() int64
EarliestOffset returns the lowest offset available in the log.
func (*PartitionLog) Flush ¶
func (l *PartitionLog) Flush(ctx context.Context) error
Flush forces buffered batches to be written to S3 immediately.
func (*PartitionLog) RestoreFromS3 ¶
func (l *PartitionLog) RestoreFromS3(ctx context.Context) (int64, error)
RestoreFromS3 rebuilds segment ranges from objects already stored in S3.
type PartitionLogConfig ¶
type PartitionLogConfig struct {
Buffer WriteBufferConfig
Segment SegmentWriterConfig
ReadAheadSegments int
CacheEnabled bool
}
PartitionLogConfig configures per-partition log behavior.
type RecordBatch ¶
RecordBatch carries a Kafka record batch blob plus metadata required for indexing.
func NewRecordBatchFromBytes ¶
func NewRecordBatchFromBytes(data []byte) (RecordBatch, error)
NewRecordBatchFromBytes parses Kafka record batch metadata and returns a RecordBatch struct.
type S3Client ¶
type S3Client interface {
UploadSegment(ctx context.Context, key string, body []byte) error
UploadIndex(ctx context.Context, key string, body []byte) error
DownloadSegment(ctx context.Context, key string, rng *ByteRange) ([]byte, error)
DownloadIndex(ctx context.Context, key string) ([]byte, error)
ListSegments(ctx context.Context, prefix string) ([]S3Object, error)
EnsureBucket(ctx context.Context) error
}
S3Client is the abstraction used by storage to read/write segments.
type S3Config ¶
type S3Config struct {
Bucket string
Region string
Endpoint string
ForcePathStyle bool
AccessKeyID string
SecretAccessKey string
SessionToken string
KMSKeyARN string
}
S3Config describes connection details for AWS S3 or compatible endpoints.
type SegmentArtifact ¶
type SegmentArtifact struct {
BaseOffset int64
LastOffset int64
MessageCount int32
CreatedAt time.Time
SegmentBytes []byte
IndexBytes []byte
RelativeIndex []*IndexEntry
}
SegmentArtifact contains serialized segment + index bytes ready for upload.
func BuildSegment ¶
func BuildSegment(cfg SegmentWriterConfig, batches []RecordBatch, created time.Time) (*SegmentArtifact, error)
BuildSegment assembles segment + index bytes from buffered batches.
type SegmentWriterConfig ¶
type SegmentWriterConfig struct {
IndexIntervalMessages int32
}
SegmentWriterConfig controls serialization.
type WriteBuffer ¶
type WriteBuffer struct {
// contains filtered or unexported fields
}
WriteBuffer accumulates record batches prior to segment serialization.
func NewWriteBuffer ¶
func NewWriteBuffer(cfg WriteBufferConfig) *WriteBuffer
NewWriteBuffer creates an empty buffer.
func (*WriteBuffer) Append ¶
func (b *WriteBuffer) Append(batch RecordBatch)
Append adds a batch to the buffer.
func (*WriteBuffer) Drain ¶
func (b *WriteBuffer) Drain() []RecordBatch
Drain returns all buffered batches and resets counters.
func (*WriteBuffer) ShouldFlush ¶
func (b *WriteBuffer) ShouldFlush(now time.Time) bool
ShouldFlush checks if size thresholds or time elapsed require a flush.
func (*WriteBuffer) Size ¶
func (b *WriteBuffer) Size() int
Size returns the accumulated byte count (for tests/metrics).