Documentation ¶
Index ¶
- func AllShardStreams(namespace, version string, shardCount uint32) []string
- func AllShardsRange(shardCount uint32) []uint32
- func PickShard(key string, shardCount uint32) uint32
- func PickShardStream(key, namespace, version string, shardCount uint32) string
- func ShardStreamName(namespace string, version string, shardID uint32) string
- type AtomicSlice
- type BinarySearchableIndex
- type Client
- func (c *Client) Append(ctx context.Context, stream string, entries [][]byte) ([]MessageID, error)
- func (c *Client) Close() error
- func (c *Client) ForceRetentionCleanup()
- func (c *Client) GetRetentionStats() *RetentionStats
- func (c *Client) GetStats() CometStats
- func (c *Client) Len(ctx context.Context, stream string) (int64, error)
- func (c *Client) Sync(ctx context.Context) error
- type ClientMetrics
- type CometConfig
- type CometStats
- type CompressedEntry
- type CompressionConfig
- type ConcurrencyConfig
- type Consumer
- func (c *Consumer) Ack(ctx context.Context, messageIDs ...MessageID) error
- func (c *Consumer) AckRange(ctx context.Context, shardID uint32, fromEntry, toEntry int64) error
- func (c *Consumer) Close() error
- func (c *Consumer) GetLag(ctx context.Context, shardID uint32) (int64, error)
- func (c *Consumer) GetShardStats(ctx context.Context, shardID uint32) (*StreamStats, error)
- func (c *Consumer) Process(ctx context.Context, handler ProcessFunc, opts ...ProcessOption) error
- func (c *Consumer) Read(ctx context.Context, shards []uint32, count int) ([]StreamMessage, error)
- func (c *Consumer) ResetOffset(ctx context.Context, shardID uint32, entryNumber int64) error
- type ConsumerOptions
- type EntryIndexNode
- type EntryPosition
- type FileInfo
- type IndexingConfig
- type MappedFile
- type MessageID
- type MmapCoordinationState
- type MmapSharedState
- type MmapWriter
- type ProcessFunc
- type ProcessOption
- func WithAutoAck(enabled bool) ProcessOption
- func WithBatchCallback(callback func(size int, duration time.Duration)) ProcessOption
- func WithBatchSize(size int) ProcessOption
- func WithConsumerAssignment(id, total int) ProcessOption
- func WithErrorHandler(handler func(err error, retryCount int)) ProcessOption
- func WithMaxRetries(retries int) ProcessOption
- func WithPollInterval(interval time.Duration) ProcessOption
- func WithRetryDelay(delay time.Duration) ProcessOption
- func WithShards(shards ...uint32) ProcessOption
- func WithStream(pattern string) ProcessOption
- type Reader
- type RetentionConfig
- type RetentionStats
- type SequenceState
- type Shard
- type ShardIndex
- type ShardRetentionStats
- type StorageConfig
- type StreamMessage
- type StreamStats
- type WriteRequest
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AllShardStreams ¶
func AllShardStreams(namespace, version string, shardCount uint32) []string
AllShardStreams returns stream names for all shards in a namespace
func AllShardsRange ¶
func AllShardsRange(shardCount uint32) []uint32
AllShardsRange returns a slice of shard IDs from 0 to shardCount-1. Useful for consumers that need to read from all shards.
func PickShard ¶
func PickShard(key string, shardCount uint32) uint32
PickShard selects a shard based on a key using consistent hashing. Returns a shard ID in the range [0, shardCount).
func PickShardStream ¶
func PickShardStream(key, namespace, version string, shardCount uint32) string
PickShardStream combines PickShard and ShardStreamName for convenience. Example: PickShardStream("user123", "events", "v1", 16) -> "events:v1:shard:0007"
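func ShardStreamName ¶
func ShardStreamName(namespace string, version string, shardID uint32) string
ShardStreamName builds the stream name for a single shard, e.g. ShardStreamName("events", "v1", 7) -> "events:v1:shard:0007" (compare PickShardStream above)
Taken together, these helpers route writes by key and let consumers enumerate shards. A minimal sketch (error handling omitted; the package is assumed to be imported as comet):
shardCount := uint32(16)

// Route a write by partition key to a stable stream name.
stream := comet.PickShardStream("user123", "events", "v1", shardCount) // "events:v1:shard:0007"

// Enumerate every shard for a consumer that reads the whole namespace.
shardIDs := comet.AllShardsRange(shardCount)                 // [0 1 2 ... 15]
streams := comet.AllShardStreams("events", "v1", shardCount) // all 16 stream names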
Types ¶
type AtomicSlice ¶
type AtomicSlice struct {
// contains filtered or unexported fields
}
AtomicSlice provides atomic access to a byte slice using atomic.Value
func (*AtomicSlice) Store ¶
func (a *AtomicSlice) Store(data []byte)
Store atomically stores a new slice
type BinarySearchableIndex ¶
type BinarySearchableIndex struct {
    // Sorted slice of index nodes for binary search
    Nodes []EntryIndexNode `json:"nodes"`

    // Interval between indexed entries (default: 1000)
    IndexInterval int `json:"index_interval"`

    // Maximum number of nodes to keep (0 = unlimited)
    MaxNodes int `json:"max_nodes"`
}
BinarySearchableIndex provides O(log n) entry lookups
func (*BinarySearchableIndex) AddIndexNode ¶
func (bi *BinarySearchableIndex) AddIndexNode(entryNumber int64, position EntryPosition)
AddIndexNode adds a new entry to the binary searchable index
func (*BinarySearchableIndex) FindEntry ¶
func (bi *BinarySearchableIndex) FindEntry(entryNumber int64) (EntryPosition, bool)
FindEntry uses binary search to locate an entry position
func (*BinarySearchableIndex) GetScanStartPosition ¶
func (bi *BinarySearchableIndex) GetScanStartPosition(entryNumber int64) (EntryPosition, int64, bool)
GetScanStartPosition returns the best starting position for scanning to find an entry
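A sketch of how these pieces fit, assuming nodes are added at IndexInterval boundaries during writes and that GetScanStartPosition returns the nearest earlier node for entries that are not directly indexed:
var idx comet.BinarySearchableIndex
idx.IndexInterval = 1000

// Record a node at each interval boundary as entries are written.
idx.AddIndexNode(0, comet.EntryPosition{FileIndex: 0, ByteOffset: 0})
idx.AddIndexNode(1000, comet.EntryPosition{FileIndex: 0, ByteOffset: 65536})

// Indexed entries resolve in O(log n).
pos, ok := idx.FindEntry(1000)

// Unindexed entries get a scan origin at the nearest earlier node.
start, fromEntry, found := idx.GetScanStartPosition(1500)
_, _, _, _, _ = pos, ok, start, fromEntry, found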
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client implements a local file-based stream client with append-only storage
func NewClient ¶
NewClient creates a new comet client for local file-based streaming with default config
func NewClientWithConfig ¶
func NewClientWithConfig(dataDir string, config CometConfig) (*Client, error)
NewClientWithConfig creates a new comet client with custom configuration
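A minimal write path, assuming the package is imported as comet (the module path is not shown on this page):
cfg := comet.DefaultCometConfig()
client, err := comet.NewClientWithConfig("/var/lib/myapp/comet", cfg)
if err != nil {
    log.Fatal(err)
}
defer client.Close()

ctx := context.Background()
stream := comet.PickShardStream("user123", "events", "v1", 16)

// Append returns one MessageID per entry in the batch.
ids, err := client.Append(ctx, stream, [][]byte{
    []byte(`{"event":"signup"}`),
    []byte(`{"event":"login"}`),
})
if err != nil {
    log.Fatal(err)
}
_ = ids

// Flush to disk before shutdown.
if err := client.Sync(ctx); err != nil {
    log.Fatal(err)
}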
func (*Client) ForceRetentionCleanup ¶
func (c *Client) ForceRetentionCleanup()
ForceRetentionCleanup forces an immediate retention cleanup pass. This is primarily useful for testing retention behavior.
func (*Client) GetRetentionStats ¶
func (c *Client) GetRetentionStats() *RetentionStats
GetRetentionStats returns current retention statistics
func (*Client) GetStats ¶
func (c *Client) GetStats() CometStats
GetStats returns current metrics for monitoring
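Continuing the example above, the stats are plain values that can be exported to any metrics system; note the fixed-point encoding of the compression ratio:
stats := client.GetStats()
log.Printf("entries=%d bytes=%d rotations=%d errors=%d",
    stats.TotalEntries, stats.TotalBytes, stats.FileRotations, stats.ErrorCount)

// CompressionRatio is scaled by 100 for fixed-point math (see CometStats below).
log.Printf("compression ratio: %.2f", float64(stats.CompressionRatio)/100)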
type ClientMetrics ¶
type ClientMetrics struct {
    TotalEntries       atomic.Uint64
    TotalBytes         atomic.Uint64
    TotalCompressed    atomic.Uint64
    WriteLatencyNano   atomic.Uint64
    MinWriteLatency    atomic.Uint64
    MaxWriteLatency    atomic.Uint64
    CompressionRatio   atomic.Uint64
    CompressedEntries  atomic.Uint64
    SkippedCompression atomic.Uint64
    TotalFiles         atomic.Uint64
    FileRotations      atomic.Uint64
    CheckpointCount    atomic.Uint64
    LastCheckpoint     atomic.Uint64
    ActiveReaders      atomic.Uint64
    ConsumerLag        atomic.Uint64
    ErrorCount         atomic.Uint64
    LastErrorNano      atomic.Uint64
    CompressionWait    atomic.Uint64 // Time waiting for compression
    IndexPersistErrors atomic.Uint64 // Failed index persistence attempts
}
ClientMetrics holds atomic counters for thread-safe metrics tracking
type CometConfig ¶
type CometConfig struct {
    // Compression settings
    Compression CompressionConfig `json:"compression"`

    // Indexing settings
    Indexing IndexingConfig `json:"indexing"`

    // Storage settings
    Storage StorageConfig `json:"storage"`

    // Concurrency settings
    Concurrency ConcurrencyConfig `json:"concurrency"`

    // Retention policy
    Retention RetentionConfig `json:"retention"`
}
CometConfig holds configuration for comet client
func DefaultCometConfig ¶
func DefaultCometConfig() CometConfig
DefaultCometConfig returns sensible defaults optimized for logging workloads
func HighCompressionConfig ¶
func HighCompressionConfig() CometConfig
HighCompressionConfig returns a config optimized for high compression ratios. Suitable for text-heavy logs and structured data.
func HighThroughputConfig ¶
func HighThroughputConfig() CometConfig
HighThroughputConfig returns a config optimized for high write throughput. Trades some memory for better performance.
func MultiProcessConfig ¶
func MultiProcessConfig() CometConfig
MultiProcessConfig returns a config suitable for multi-process deployments. Enable this for prefork servers or when multiple processes write to the same stream.
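Configs are plain structs, so a preset can be adjusted field by field before the client is created. The values below are illustrative, not recommendations:
cfg := comet.MultiProcessConfig()
cfg.Storage.MaxFileSize = 512 << 20        // rotate data files at 512 MB
cfg.Storage.CheckpointTime = 5000          // checkpoint every 5000 ms
cfg.Compression.MinCompressSize = 256      // skip compression below 256 bytes
cfg.Indexing.BoundaryInterval = 1000       // store a boundary every 1000 entries
cfg.Retention.MaxAge = 7 * 24 * time.Hour  // keep at most a week of data

client, err := comet.NewClientWithConfig("/var/lib/myapp/comet", cfg)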
type CometStats ¶
type CometStats struct {
    // Write metrics
    TotalEntries     uint64 `json:"total_entries"`
    TotalBytes       uint64 `json:"total_bytes"`
    TotalCompressed  uint64 `json:"total_compressed_bytes"`
    WriteLatencyNano uint64 `json:"avg_write_latency_nano"`
    MinWriteLatency  uint64 `json:"min_write_latency_nano"`
    MaxWriteLatency  uint64 `json:"max_write_latency_nano"`

    // Compression metrics
    CompressionRatio   uint64 `json:"compression_ratio_x100"` // x100 for fixed point
    CompressedEntries  uint64 `json:"compressed_entries"`
    SkippedCompression uint64 `json:"skipped_compression"`

    // File management
    TotalFiles      uint64 `json:"total_files"`
    FileRotations   uint64 `json:"file_rotations"`
    CheckpointCount uint64 `json:"checkpoint_count"`
    LastCheckpoint  uint64 `json:"last_checkpoint_nano"`

    // Consumer metrics
    ActiveReaders uint64 `json:"active_readers"`
    ConsumerLag   uint64 `json:"max_consumer_lag_bytes"`

    // Error tracking
    ErrorCount         uint64 `json:"error_count"`
    LastErrorNano      uint64 `json:"last_error_nano"`
    IndexPersistErrors uint64 `json:"index_persist_errors"`

    // Compression metrics
    CompressionWaitNano uint64 `json:"compression_wait_nano"`
}
CometStats tracks key metrics for monitoring comet performance
type CompressedEntry ¶
type CompressedEntry struct {
    Data           []byte
    OriginalSize   uint64
    CompressedSize uint64
    WasCompressed  bool
}
CompressedEntry represents a pre-compressed entry ready for writing
type CompressionConfig ¶
type CompressionConfig struct {
MinCompressSize int `json:"min_compress_size"` // Don't compress entries smaller than this
}
CompressionConfig controls compression behavior
type ConcurrencyConfig ¶
type ConcurrencyConfig struct {
EnableMultiProcessMode bool `json:"enable_file_locking"` // Enable file-based locking for multi-process safety
}
ConcurrencyConfig controls multi-process behavior
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer reads from comet stream shards. Fields ordered for optimal memory alignment.
func NewConsumer ¶
func NewConsumer(client *Client, opts ConsumerOptions) *Consumer
NewConsumer creates a new consumer for comet streams
func (*Consumer) AckRange ¶
func (c *Consumer) AckRange(ctx context.Context, shardID uint32, fromEntry, toEntry int64) error
AckRange acknowledges all messages in a contiguous range for a shard. This is more efficient than individual acks for bulk processing.
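A sketch of a manual consume loop using Read, Ack, and AckRange (signatures as in the index above); Process, below, is the higher-level alternative:
consumer := comet.NewConsumer(client, comet.ConsumerOptions{Group: "analytics"})
defer consumer.Close()

msgs, err := consumer.Read(ctx, []uint32{0, 1, 2}, 100)
if err != nil {
    log.Fatal(err)
}
for _, msg := range msgs {
    if err := handle(msg.Data); err != nil { // handle is application code
        continue // leave unacked so it can be redelivered
    }
    if err := consumer.Ack(ctx, msg.ID); err != nil {
        log.Printf("ack failed: %v", err)
    }
}

// Or, after processing entries 0 through 99 on shard 0, ack them in one call:
// err = consumer.AckRange(ctx, 0, 0, 99)

lag, _ := consumer.GetLag(ctx, 0) // how far behind the group is on shard 0
_ = lag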
func (*Consumer) GetShardStats ¶
func (c *Consumer) GetShardStats(ctx context.Context, shardID uint32) (*StreamStats, error)
GetShardStats returns statistics for a specific shard
func (*Consumer) Process ¶
func (c *Consumer) Process(ctx context.Context, handler ProcessFunc, opts ...ProcessOption) error
Process continuously reads and processes messages from shards. The simplest usage processes all discoverable shards:
err := consumer.Process(ctx, handleMessages)
With options:
err := consumer.Process(ctx, handleMessages,
    comet.WithStream("events:v1:shard:*"),
    comet.WithBatchSize(1000),
    comet.WithErrorHandler(logError),
)
For distributed processing:
err := consumer.Process(ctx, handleMessages,
    comet.WithStream("events:v1:shard:*"),
    comet.WithConsumerAssignment(workerID, totalWorkers),
)
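Putting it together, a complete consumer might look like this (returning an error from the handler fails the batch and triggers a retry, per ProcessFunc below):
consumer := comet.NewConsumer(client, comet.ConsumerOptions{Group: "analytics"})
defer consumer.Close()

handleMessages := func(messages []comet.StreamMessage) error {
    for _, msg := range messages {
        if err := process(msg.Data); err != nil { // process is application code
            return err // fail the batch so it is retried
        }
    }
    return nil // batch is auto-acked by default
}

err := consumer.Process(ctx, handleMessages,
    comet.WithStream("events:v1:shard:*"),
    comet.WithBatchSize(500),
    comet.WithPollInterval(100*time.Millisecond),
    comet.WithMaxRetries(3),
)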
type ConsumerOptions ¶
type ConsumerOptions struct {
Group string
}
ConsumerOptions configures a consumer
type EntryIndexNode ¶
type EntryIndexNode struct {
    EntryNumber int64         `json:"entry_number"` // Entry number this node covers
    Position    EntryPosition `json:"position"`     // Position in files
}
EntryIndexNode represents a node in the binary searchable index
type EntryPosition ¶
type EntryPosition struct {
    ByteOffset int64 `json:"byte_offset"` // Byte position within that file (8 bytes)
    FileIndex  int   `json:"file_index"`  // Which file in Files array (8 bytes)
}
EntryPosition tracks where an entry is located. Fields ordered for optimal memory alignment.
type FileInfo ¶
type FileInfo struct {
    // 64-bit aligned fields first
    StartOffset int64 `json:"start_offset"`
    EndOffset   int64 `json:"end_offset"`
    StartEntry  int64 `json:"start_entry"` // First entry number in this file
    Entries     int64 `json:"entries"`

    StartTime time.Time `json:"start_time"`
    EndTime   time.Time `json:"end_time"`

    // String last (will use remaining space efficiently)
    Path string `json:"path"`
}
FileInfo tracks a single data file. Fields ordered for optimal memory alignment.
type IndexingConfig ¶
type IndexingConfig struct {
    BoundaryInterval int `json:"boundary_interval"` // Store boundaries every N entries
    MaxIndexEntries  int `json:"max_index_entries"` // Max boundary entries per shard (0 = unlimited)
}
IndexingConfig controls indexing behavior
type MappedFile ¶
type MappedFile struct {
    FileInfo // Embedded struct (already aligned)
    // contains filtered or unexported fields
}
MappedFile represents a memory-mapped data file with atomic data updates. Fields ordered for optimal memory alignment (embedded struct first).
type MessageID ¶
MessageID represents a structured message ID. Fields ordered for optimal memory alignment: int64 first, then uint32.
func ParseMessageID ¶
ParseMessageID parses a string ID back to MessageID
type MmapCoordinationState ¶
type MmapCoordinationState struct {
    // Core coordination fields
    ActiveFileIndex atomic.Int64 // Current file being written to
    WriteOffset     atomic.Int64 // Current write position in active file
    FileSize        atomic.Int64 // Current size of active file

    // Performance tracking
    LastWriteNanos atomic.Int64 // Timestamp of last write
    TotalWrites    atomic.Int64 // Total number of writes
    // contains filtered or unexported fields
}
MmapCoordinationState represents shared state for ultra-fast multi-process coordination
type MmapSharedState ¶
type MmapSharedState struct {
    // contains filtered or unexported fields
}
MmapSharedState provides instant multi-process coordination. Just 8 bytes - a timestamp indicating when the index was last modified.
type MmapWriter ¶
type MmapWriter struct {
// contains filtered or unexported fields
}
MmapWriter implements ultra-fast memory-mapped writes for multi-process mode
func NewMmapWriter ¶
func NewMmapWriter(shardDir string, maxFileSize int64, index *ShardIndex, metrics *ClientMetrics, rotationLockFile *os.File) (*MmapWriter, error)
NewMmapWriter creates a new memory-mapped writer for a shard
func (*MmapWriter) CoordinationState ¶
func (w *MmapWriter) CoordinationState() *MmapCoordinationState
CoordinationState returns the coordination state for external access
type ProcessFunc ¶
type ProcessFunc func(messages []StreamMessage) error
ProcessFunc handles a batch of messages, returning error to trigger retry
type ProcessOption ¶
type ProcessOption func(*processConfig)
ProcessOption configures the Process method
func WithAutoAck ¶
func WithAutoAck(enabled bool) ProcessOption
WithAutoAck controls automatic acknowledgment (default: true)
func WithBatchCallback ¶
func WithBatchCallback(callback func(size int, duration time.Duration)) ProcessOption
WithBatchCallback sets a callback after each batch completes
func WithBatchSize ¶
func WithBatchSize(size int) ProcessOption
WithBatchSize sets the number of messages to read at once
func WithConsumerAssignment ¶
func WithConsumerAssignment(id, total int) ProcessOption
WithConsumerAssignment configures distributed processing
func WithErrorHandler ¶
func WithErrorHandler(handler func(err error, retryCount int)) ProcessOption
WithErrorHandler sets a callback for processing errors
func WithMaxRetries ¶
func WithMaxRetries(retries int) ProcessOption
WithMaxRetries sets the number of retry attempts for failed batches
func WithPollInterval ¶
func WithPollInterval(interval time.Duration) ProcessOption
WithPollInterval sets how long to wait when no messages are available
func WithRetryDelay ¶
func WithRetryDelay(delay time.Duration) ProcessOption
WithRetryDelay sets the base delay between retries
func WithShards ¶
func WithShards(shards ...uint32) ProcessOption
WithShards specifies explicit shards to process
func WithStream ¶
func WithStream(pattern string) ProcessOption
WithStream specifies a stream pattern for shard discovery
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Reader provides memory-mapped read access to a shard. Fields ordered for optimal memory alignment.
func NewReader ¶
func NewReader(shardID uint32, index *ShardIndex) (*Reader, error)
NewReader creates a new reader for a shard
func (*Reader) ReadEntryAtPosition ¶
func (r *Reader) ReadEntryAtPosition(pos EntryPosition) ([]byte, error)
ReadEntryAtPosition reads a single entry at the given position
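A sketch, assuming a *ShardIndex obtained from the shard's persisted state and a position found via the binary index above:
r, err := comet.NewReader(shardID, index)
if err != nil {
    log.Fatal(err)
}

if pos, ok := index.BinaryIndex.FindEntry(1000); ok {
    data, err := r.ReadEntryAtPosition(pos)
    if err != nil {
        log.Fatal(err)
    }
    _ = data // raw entry bytes
}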
type RetentionConfig ¶
type RetentionConfig struct {
    // Time-based retention
    MaxAge time.Duration `json:"max_age"` // Delete files older than this

    // Size-based retention
    MaxTotalSize int64 `json:"max_total_size"` // Total size limit across all shards
    MaxShardSize int64 `json:"max_shard_size"` // Size limit per shard

    // Cleanup behavior
    CleanupInterval   time.Duration `json:"cleanup_interval"`   // How often to run cleanup
    MinFilesToKeep    int           `json:"min_files_to_keep"`  // Always keep at least N files
    ProtectUnconsumed bool          `json:"protect_unconsumed"` // Don't delete unread data
    ForceDeleteAfter  time.Duration `json:"force_delete_after"` // Delete even if unread after this time
}
RetentionConfig defines data retention policies
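A sketch of a policy that bounds both age and size while protecting unread data, with a hard upper limit on how long unread data is kept:
cfg := comet.DefaultCometConfig()
cfg.Retention = comet.RetentionConfig{
    MaxAge:            24 * time.Hour, // drop files older than a day...
    MaxTotalSize:      10 << 30,       // ...or once all shards together exceed 10 GB
    MaxShardSize:      1 << 30,        // cap any single shard at 1 GB
    CleanupInterval:   time.Minute,    // run the cleanup pass every minute
    MinFilesToKeep:    2,              // never delete below two files per shard
    ProtectUnconsumed: true,           // keep unread data...
    ForceDeleteAfter:  72 * time.Hour, // ...unless it is three days stale
}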
type RetentionStats ¶
type RetentionStats struct {
    // 8-byte aligned fields first
    TotalSizeBytes int64         `json:"total_size_bytes"`
    TotalSizeGB    float64       `json:"total_size_gb"`
    MaxTotalSizeGB float64       `json:"max_total_size_gb"`
    RetentionAge   time.Duration `json:"retention_age"`
    TotalFiles     int           `json:"total_files"`

    // Pointer fields (8 bytes)
    ShardStats map[uint32]ShardRetentionStats `json:"shard_stats,omitempty"`

    // Larger composite types last (time.Time is 24 bytes)
    OldestData time.Time `json:"oldest_data"`
    NewestData time.Time `json:"newest_data"`
}
RetentionStats provides type-safe retention statistics. Fields ordered for optimal memory alignment.
type SequenceState ¶
type SequenceState struct {
LastEntryNumber int64 // Last allocated entry number
}
SequenceState is a memory-mapped structure for atomic entry number generation
type Shard ¶
type Shard struct {
// contains filtered or unexported fields
}
Shard represents a single stream shard with its own files. Fields ordered for optimal memory alignment (64-bit words first).
type ShardIndex ¶
type ShardIndex struct {
    // 64-bit aligned fields first (8 bytes each)
    CurrentEntryNumber int64 `json:"current_entry_number"` // Entry-based tracking (not byte offsets!)
    CurrentWriteOffset int64 `json:"current_write_offset"` // Still track for file management

    // Maps (8 bytes pointer each)
    ConsumerOffsets map[string]int64 `json:"consumer_entry_offsets"` // Consumer tracking by entry number (not bytes!)

    // Composite types
    BinaryIndex BinarySearchableIndex `json:"binary_index"` // Binary searchable index for O(log n) lookups

    // Slices (24 bytes: ptr + len + cap)
    Files []FileInfo `json:"files"` // File management

    // Strings (24 bytes: ptr + len + cap)
    CurrentFile string `json:"current_file"`

    // Smaller fields last
    BoundaryInterval int `json:"boundary_interval"` // 8 bytes
}
ShardIndex tracks files and consumer offsets, using entry-based addressing instead of byte offsets. Fields ordered for optimal memory alignment.
type ShardRetentionStats ¶
type ShardRetentionStats struct {
    // 8-byte aligned fields first
    SizeBytes int64 `json:"size_bytes"`
    Files     int   `json:"files"`

    // Pointer field (8 bytes)
    ConsumerLag map[string]int64 `json:"consumer_lag,omitempty"`

    // Larger composite types last (time.Time is 24 bytes)
    OldestEntry time.Time `json:"oldest_entry"`
    NewestEntry time.Time `json:"newest_entry"`
}
ShardRetentionStats provides retention stats for a single shard. Fields ordered for optimal memory alignment.
type StorageConfig ¶
type StorageConfig struct {
    MaxFileSize    int64 `json:"max_file_size"`      // Maximum size per file before rotation
    CheckpointTime int   `json:"checkpoint_time_ms"` // Checkpoint every N milliseconds
}
StorageConfig controls file storage behavior
type StreamMessage ¶
type StreamMessage struct {
    Stream string    // Stream name/identifier
    ID     MessageID // Unique message ID
    Data   []byte    // Raw message data
}
StreamMessage represents a message read from a stream
type StreamStats ¶
type StreamStats struct {
    // 64-bit aligned fields first
    TotalEntries int64
    TotalBytes   int64

    OldestEntry time.Time
    NewestEntry time.Time

    // Composite types
    ConsumerOffsets map[string]int64

    // Smaller fields last
    FileCount int    // 8 bytes
    ShardID   uint32 // 4 bytes
}
StreamStats holds statistics about a shard. Fields ordered for optimal memory alignment.
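Since consumer offsets are entry numbers (see ShardIndex above), per-group lag on a shard can be estimated from these fields; a sketch:
stats, err := consumer.GetShardStats(ctx, 0)
if err != nil {
    log.Fatal(err)
}
for group, offset := range stats.ConsumerOffsets {
    lag := stats.TotalEntries - offset // entries this group has not yet consumed
    log.Printf("shard=%d group=%s lag=%d", stats.ShardID, group, lag)
}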
type WriteRequest ¶
type WriteRequest struct {
    UpdateFunc   func() error // Function to update index state after successful write (8 bytes)
    WriteBuffers [][]byte     // Buffers to write (24 bytes)
    IDs          []MessageID  // Message IDs for the batch (24 bytes)
}
WriteRequest represents a batch write operation. Fields ordered for optimal memory alignment.