Documentation ¶
Overview ¶
Example (CustomLogger) ¶
package main

import (
    "log/slog"
    "os"
    "path/filepath"

    "github.com/orbiterhq/comet"
)

func main() {
    // Example: Using the default logger (writes to stderr)
    config := comet.DefaultCometConfig()
    config.Log.Level = "info" // Options: debug, info, warn, error, none

    // Example: Using slog
    slogger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
        Level: slog.LevelDebug,
    }))
    config.Log.Logger = comet.NewSlogAdapter(slogger)

    // Example: Disabling logs entirely
    config.Log.Logger = comet.NoOpLogger{}

    // Create client with configured logging
    client, err := comet.NewClient(filepath.Join(os.TempDir(), "logger"), config)
    if err != nil {
        panic(err)
    }
    defer client.Close()
}
Index ¶
- Constants
- Variables
- func AllShardStreams(prefix string, shardCount uint32) []string
- func AllShardsRange(shardCount uint32) []uint32
- func GetProcessID(shmFile ...string) int
- func GetProcessIDWithFile(shmFile string) int
- func IsDebug() bool
- func ReleaseProcessID(shmFile_ ...string)
- func SetDebug(enabled bool)
- func ShardStreamName(prefix string, shardID uint32) string
- type AtomicSlice
- type BinarySearchableIndex
- type Client
- func (c *Client) Append(ctx context.Context, stream string, entries [][]byte) ([]MessageID, error)
- func (c *Client) Close() error
- func (c *Client) ForceRetentionCleanup()
- func (c *Client) GetRetentionStats() *RetentionStats
- func (c *Client) GetShardDiagnostics(shardID uint32) *ShardDiagnostics
- func (c *Client) GetShardStats(shardID uint32) (map[string]any, error)
- func (c *Client) GetStats() CometStats
- func (c *Client) GetUnhealthyShards() []ShardDiagnostics
- func (c *Client) Health() CometHealth
- func (c *Client) Len(ctx context.Context, stream string) (int64, error)
- func (c *Client) ListRecent(ctx context.Context, streamName string, limit int) ([]StreamMessage, error)
- func (c *Client) PickShard(key string, shardCount uint32) uint32
- func (c *Client) PickShardStream(prefix string, key string, shardCount uint32) string
- func (c *Client) ScanAll(ctx context.Context, streamName string, ...) error
- func (c *Client) Sync(ctx context.Context) error
- type ClientMetrics
- type CometConfig
- func DefaultCometConfig() CometConfig
- func DeprecatedMultiProcessConfig(processID, processCount int) CometConfig
- func HighCompressionConfig() CometConfig
- func HighThroughputConfig() CometConfig
- func MultiProcessConfig(sharedMemoryFile ...string) CometConfig
- func OptimizedConfig(shardCount int, memoryBudget int) CometConfig
- type CometHealth
- type CometState
- func (s *CometState) AddErrorCount(delta uint64) uint64
- func (s *CometState) AddFailedWrites(delta uint64) uint64
- func (s *CometState) AddFileRotations(delta uint64) uint64
- func (s *CometState) AddFilesCreated(delta uint64) uint64
- func (s *CometState) AddLastFileSequence(delta uint64) uint64
- func (s *CometState) AddTotalBytes(delta uint64) uint64
- func (s *CometState) AddTotalWrites(delta uint64) uint64
- func (s *CometState) AddWriteLatencyCount(delta uint64) uint64
- func (s *CometState) AddWriteLatencySum(delta uint64) uint64
- func (s *CometState) AddWriteOffset(delta uint64) uint64
- func (s *CometState) AllocateEntryNumbers(count int) int64
- func (s *CometState) GetAverageWriteLatency() uint64
- func (s *CometState) GetCompressionRatioFloat() float64
- func (s *CometState) GetErrorRate() float64
- func (s *CometState) GetLastEntryNumber() int64
- func (s *CometState) GetLastIndexUpdate() int64
- func (s *CometState) GetLastWriteNanos() int64
- func (s *CometState) GetMaxWriteLatency() uint64
- func (s *CometState) GetMinWriteLatency() uint64
- func (s *CometState) GetTotalWrites() uint64
- func (s *CometState) GetVersion() uint64
- func (s *CometState) GetWriteOffset() uint64
- func (s *CometState) IncrementLastEntryNumber() int64
- func (s *CometState) SetLastIndexUpdate(nanos int64)
- func (s *CometState) SetVersion(v uint64)
- func (s *CometState) StoreLastWriteNanos(val int64)
- func (s *CometState) StoreWriteOffset(val uint64)
- type CometStats
- type CompressedEntry
- type CompressionConfig
- type ConcurrencyConfig
- type Consumer
- func (c *Consumer) Ack(ctx context.Context, messageIDs ...MessageID) error
- func (c *Consumer) AckRange(ctx context.Context, shardID uint32, fromEntry, toEntry int64) error
- func (c *Consumer) Close() error
- func (c *Consumer) FilterDuplicates(messages []StreamMessage) []StreamMessage
- func (c *Consumer) FlushACKs(ctx context.Context) error
- func (c *Consumer) GetLag(ctx context.Context, shardID uint32) (int64, error)
- func (c *Consumer) GetShardStats(ctx context.Context, shardID uint32) (*StreamStats, error)
- func (c *Consumer) MarkBatchProcessed(messages []StreamMessage)
- func (c *Consumer) Process(ctx context.Context, handler ProcessFunc, opts ...ProcessOption) error
- func (c *Consumer) Read(ctx context.Context, shards []uint32, count int) ([]StreamMessage, error)
- func (c *Consumer) ResetOffset(ctx context.Context, shardID uint32, entryNumber int64) error
- func (c *Consumer) Sync(ctx context.Context) error
- type ConsumerEntry
- type ConsumerOffsetHeader
- type ConsumerOffsetMmap
- func (c *ConsumerOffsetMmap) Close() error
- func (c *ConsumerOffsetMmap) Get(group string) (int64, bool)
- func (c *ConsumerOffsetMmap) GetAll() map[string]int64
- func (c *ConsumerOffsetMmap) GetStats() (used, total int)
- func (c *ConsumerOffsetMmap) Remove(group string)
- func (c *ConsumerOffsetMmap) Set(group string, offset int64) error
- type ConsumerOptions
- type EntryIndexMetadata
- type EntryIndexNode
- type EntryPosition
- type FileInfo
- type IndexingConfig
- type LogConfig
- type LogLevel
- type Logger
- type MappedFile
- type MessageID
- type NoOpLogger
- func (NoOpLogger) Debug(msg string, keysAndValues ...any)
- func (NoOpLogger) Error(msg string, keysAndValues ...any)
- func (NoOpLogger) Info(msg string, keysAndValues ...any)
- func (NoOpLogger) Warn(msg string, keysAndValues ...any)
- func (n NoOpLogger) WithContext(ctx context.Context) Logger
- func (n NoOpLogger) WithFields(keysAndValues ...any) Logger
- type ProcessFunc
- type ProcessOption
- func WithAutoAck(enabled bool) ProcessOption
- func WithBatchCallback(callback func(size int, duration time.Duration)) ProcessOption
- func WithBatchSize(size int) ProcessOption
- func WithConsumerAssignment(id, total int) ProcessOption
- func WithErrorHandler(handler func(err error, retryCount int)) ProcessOption
- func WithMaxRetries(retries int) ProcessOption
- func WithPollInterval(interval time.Duration) ProcessOption
- func WithRetryDelay(delay time.Duration) ProcessOption
- func WithShardDiscoveryInterval(interval time.Duration) ProcessOption
- func WithShards(shards ...uint32) ProcessOption
- func WithStream(pattern string) ProcessOption
- type Reader
- func (r *Reader) Close() error
- func (r *Reader) GetMemoryUsage() (int64, int)
- func (r *Reader) ReadEntryAtPosition(pos EntryPosition) ([]byte, error)
- func (r *Reader) ReadEntryByNumber(entryNumber int64) ([]byte, error)
- func (r *Reader) SetClient(client *Client)
- func (r *Reader) SetState(state *CometState)
- func (r *Reader) UpdateFiles(newFiles *[]FileInfo) error
- type ReaderConfig
- type RetentionConfig
- type RetentionStats
- type Shard
- type ShardDiagnostics
- type ShardIndex
- type ShardRetentionStats
- type SlogAdapter
- func (s *SlogAdapter) Debug(msg string, keysAndValues ...any)
- func (s *SlogAdapter) Error(msg string, keysAndValues ...any)
- func (s *SlogAdapter) Info(msg string, keysAndValues ...any)
- func (s *SlogAdapter) Warn(msg string, keysAndValues ...any)
- func (s *SlogAdapter) WithContext(ctx context.Context) Logger
- func (s *SlogAdapter) WithFields(keysAndValues ...any) Logger
- type StdLogger
- func (s *StdLogger) Debug(msg string, keysAndValues ...any)
- func (s *StdLogger) Error(msg string, keysAndValues ...any)
- func (s *StdLogger) Info(msg string, keysAndValues ...any)
- func (s *StdLogger) Warn(msg string, keysAndValues ...any)
- func (s *StdLogger) WithContext(ctx context.Context) Logger
- func (s *StdLogger) WithFields(keysAndValues ...any) Logger
- type StorageConfig
- type StreamMessage
- type StreamStats
- type WriteMode
- type WriteRequest
Examples ¶

- Package (CustomLogger)
- OptimizedConfig
Constants ¶
const (
    // ConsumerOffsetMagic identifies a valid consumer offset file
    ConsumerOffsetMagic = 0xC0FE0FF5
    // ConsumerOffsetVersion for format changes
    ConsumerOffsetVersion = 1
    // MaxConsumerGroups supported per shard (must be power of 2 for efficient hashing)
    MaxConsumerGroups = 512
    // ConsumerGroupNameSize maximum bytes for group name
    ConsumerGroupNameSize = 48
    // ConsumerOffsetFileSize total size of mmap file
    ConsumerOffsetFileSize = 64 + (MaxConsumerGroups * 128) // 64KB + 64 bytes header
)
const (
    CometStateVersion1 = 1
    CometStateSize     = 1024
)
Constants for the unified state
const MaxShardCount = uint32(256)
MaxShardCount is the maximum number of shards supported (0-255)
Variables ¶
var ErrStopProcessing = errors.New("stop processing")
ErrStopProcessing is a sentinel error that can be returned from ProcessFunc to gracefully stop processing while still ACKing the current batch
Functions ¶
func AllShardStreams ¶
AllShardStreams returns all stream names for the given prefix and shard count
func AllShardsRange ¶
AllShardsRange returns a slice containing all shard IDs from 0 to shardCount-1
func GetProcessID ¶ added in v1.3.0
GetProcessID returns a unique process ID (0 to N-1) for multi-process deployments. It uses a shared memory file to coordinate process ID assignment across multiple processes that may start in any order.
This is useful for deployments where you don't have explicit control over process startup order, such as:

- systemd with multiple service instances
- Container orchestration (Kubernetes, Docker Swarm)
- Process managers (PM2, Supervisor)
- Manual process spawning
The function will return:

- 0 to (NumCPU-1): Successfully acquired a process ID
- -1: Failed to acquire a process ID (all slots taken or error)
Example usage:
processID := comet.GetProcessID()
if processID < 0 {
    log.Fatal("Failed to acquire process ID")
}
config := comet.DeprecatedMultiProcessConfig(processID, runtime.NumCPU())
client, err := comet.NewClient(dataDir, config)
func GetProcessIDWithFile ¶ added in v1.3.0
GetProcessIDWithFile is like GetProcessID but allows specifying a custom shared memory file. This is useful when running multiple independent Comet deployments on the same machine.
func IsDebug ¶ added in v1.1.1
func IsDebug() bool
IsDebug returns whether debug mode is enabled (thread-safe)
func ReleaseProcessID ¶ added in v1.3.0
func ReleaseProcessID(shmFile_ ...string)
ReleaseProcessID releases the process ID when shutting down gracefully. This is optional but helps with faster slot reuse.
func SetDebug ¶ added in v1.1.1
func SetDebug(enabled bool)
SetDebug allows runtime control of debug mode
func ShardStreamName ¶
ShardStreamName constructs a stream name from a prefix and shard ID. Example: ShardStreamName("events:v1", 255) returns "events:v1:0255".
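A minimal sketch of these naming helpers together (the prefix and shard count are illustrative):

// Build one stream name per shard
streams := comet.AllShardStreams("events:v1", 4)
// equivalent to calling ShardStreamName for each ID:
for _, shardID := range comet.AllShardsRange(4) {
    _ = comet.ShardStreamName("events:v1", shardID) // "events:v1:0000" ... "events:v1:0003"
}
_ = streams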
Types ¶
type AtomicSlice ¶
type AtomicSlice struct {
// contains filtered or unexported fields
}
AtomicSlice provides atomic access to a byte slice using atomic.Value
func (*AtomicSlice) Store ¶
func (a *AtomicSlice) Store(data []byte)
Store atomically stores a new slice
type BinarySearchableIndex ¶
type BinarySearchableIndex struct {
    // Sorted slice of index nodes for binary search
    Nodes []EntryIndexNode `json:"nodes"`
    // Interval between indexed entries (default: 1000)
    IndexInterval int `json:"index_interval"`
    // Maximum number of nodes to keep (0 = unlimited)
    MaxNodes int `json:"max_nodes"`
}
BinarySearchableIndex provides O(log n) entry lookups
func (*BinarySearchableIndex) AddIndexNode ¶
func (idx *BinarySearchableIndex) AddIndexNode(entryNumber int64, position EntryPosition)
AddIndexNode adds a node to the binary searchable index
func (*BinarySearchableIndex) FindEntryPosition ¶ added in v1.3.0
func (idx *BinarySearchableIndex) FindEntryPosition(targetEntry int64) (EntryPosition, bool)
FindEntryPosition uses binary search to find the position of an entry
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client implements a local file-based stream client with append-only storage
func NewClient ¶
func NewClient(dataDir string, config ...CometConfig) (*Client, error)
NewClient creates a new comet client with custom configuration
func NewMultiProcessClient ¶ added in v1.3.0
func NewMultiProcessClient(dataDir string, cfg ...CometConfig) (*Client, error)
NewMultiProcessClient creates a new comet client with automatic multi-process coordination. It uses the default shared memory file for process ID coordination. The process ID is automatically released when the client is closed.
Example usage:
client, err := comet.NewMultiProcessClient("./data")
if err != nil {
    log.Fatal(err)
}
defer client.Close() // Automatically releases process ID
func (*Client) ForceRetentionCleanup ¶
func (c *Client) ForceRetentionCleanup()
ForceRetentionCleanup forces an immediate retention cleanup pass. This is primarily useful for testing retention behavior.
func (*Client) GetRetentionStats ¶
func (c *Client) GetRetentionStats() *RetentionStats
GetRetentionStats returns current retention statistics
func (*Client) GetShardDiagnostics ¶ added in v1.3.8
func (c *Client) GetShardDiagnostics(shardID uint32) *ShardDiagnostics
GetShardDiagnostics returns detailed diagnostics for a specific shard
func (*Client) GetShardStats ¶ added in v1.3.0
GetShardStats returns detailed statistics for a specific shard
func (*Client) GetStats ¶
func (c *Client) GetStats() CometStats
GetStats returns current metrics for monitoring
func (*Client) GetUnhealthyShards ¶ added in v1.3.8
func (c *Client) GetUnhealthyShards() []ShardDiagnostics
GetUnhealthyShards returns diagnostics for all unhealthy shards
func (*Client) Health ¶ added in v1.1.1
func (c *Client) Health() CometHealth
Health returns basic health status
func (*Client) ListRecent ¶ added in v1.2.0
func (c *Client) ListRecent(ctx context.Context, streamName string, limit int) ([]StreamMessage, error)
ListRecent returns the N most recent messages from a stream. This is a browse operation that doesn't affect consumer offsets.
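A hedged usage sketch, assuming client and ctx are already in scope (the stream name is illustrative):

messages, err := client.ListRecent(ctx, "events:v1:0000", 10)
if err != nil {
    log.Fatal(err)
}
for _, msg := range messages {
    fmt.Printf("%v: %s\n", msg.ID, msg.Data)
}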
func (*Client) PickShard ¶ added in v1.3.1
PickShard selects a shard ID based on consistent hashing of the key. In multi-process mode, it returns a shard owned by this process.
func (*Client) PickShardStream ¶ added in v1.3.1
PickShardStream returns the complete stream name for the shard picked by key. Example: client.PickShardStream("events:v1", "user123", 256) returns "events:v1:0255". In multi-process mode, this will only pick from shards owned by this client.
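For example, a sketch of appending to the shard picked for a key (assuming client and ctx are in scope; the prefix, key, and payload are illustrative):

stream := client.PickShardStream("events:v1", "user-123", 16)
ids, err := client.Append(ctx, stream, [][]byte{
    []byte(`{"event":"signup"}`),
})
if err != nil {
    log.Fatal(err)
}
_ = ids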
func (*Client) ScanAll ¶ added in v1.2.0
func (c *Client) ScanAll(ctx context.Context, streamName string, fn func(context.Context, StreamMessage) bool) error
ScanAll scans all entries in a stream, calling fn for each message. Return false from fn to stop scanning early. This operation bypasses consumer groups and doesn't affect offsets.
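A sketch of scanning with early termination (assuming client and ctx are in scope):

count := 0
err := client.ScanAll(ctx, "events:v1:0000", func(ctx context.Context, msg comet.StreamMessage) bool {
    count++
    return count < 100 // returning false stops the scan early
})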
type ClientMetrics ¶
type ClientMetrics struct {
    // Write metrics
    WritesTotal        atomic.Uint64
    BytesWritten       atomic.Uint64
    WriteErrors        atomic.Uint64
    WriteLatencyNanos  atomic.Uint64
    CompressionSaves   atomic.Uint64
    CompressionSkipped atomic.Uint64

    // Read metrics
    ReadsTotal       atomic.Uint64
    BytesRead        atomic.Uint64
    ReadErrors       atomic.Uint64
    ConsumerLagTotal atomic.Uint64

    // File metrics
    FileRotations atomic.Uint64
    FilesCreated  atomic.Uint64

    // Additional metrics
    LastErrorNano      atomic.Int64
    TotalEntries       atomic.Uint64
    FilesDeleted       atomic.Uint64
    CheckpointsWritten atomic.Uint64

    // Process coordination metrics
    ShardConflicts    atomic.Uint64
    LockWaitNanos     atomic.Uint64
    RecoveryAttempts  atomic.Uint64
    RecoverySuccesses atomic.Uint64

    // Index persistence errors
    IndexPersistErrors atomic.Uint64
    ErrorCount         atomic.Uint64

    // Consumer metrics
    AckCount         atomic.Uint64
    ActiveConsumers  atomic.Uint64
    ConsumerTimeouts atomic.Uint64
    ConsumerResets   atomic.Uint64

    // Retention metrics
    RetentionRuns    atomic.Uint64
    FilesCleanedUp   atomic.Uint64
    BytesCleanedUp   atomic.Uint64
    RetentionErrors  atomic.Uint64
    RetentionSkipped atomic.Uint64
}
ClientMetrics tracks global client metrics
type CometConfig ¶
type CometConfig struct {
    // Write mode
    WriteMode WriteMode `json:"write_mode"`

    // Compression settings
    Compression CompressionConfig `json:"compression"`

    // Indexing settings
    Indexing IndexingConfig `json:"indexing"`

    // Storage settings
    Storage StorageConfig `json:"storage"`

    // Concurrency settings
    Concurrency ConcurrencyConfig `json:"concurrency"`

    // Retention policy
    Retention RetentionConfig `json:"retention"`

    // Logging configuration
    Log LogConfig `json:"log"`

    // Reader settings
    Reader ReaderConfig `json:"reader"`
}
CometConfig represents the complete comet configuration
func DefaultCometConfig ¶
func DefaultCometConfig() CometConfig
DefaultCometConfig returns sensible defaults optimized for logging workloads
func DeprecatedMultiProcessConfig ¶ added in v1.3.0
func DeprecatedMultiProcessConfig(processID, processCount int) CometConfig
DeprecatedMultiProcessConfig creates a config for multi-process mode with N processes
func HighCompressionConfig ¶
func HighCompressionConfig() CometConfig
HighCompressionConfig returns a config optimized for compression ratio
func HighThroughputConfig ¶
func HighThroughputConfig() CometConfig
HighThroughputConfig returns a config optimized for write throughput
func MultiProcessConfig ¶
func MultiProcessConfig(sharedMemoryFile ...string) CometConfig
MultiProcessConfig returns a configuration optimized for multi-process deployments. It automatically acquires a unique process ID and configures the client for multi-process mode. The process ID is automatically released when the client is closed.
func OptimizedConfig ¶ added in v1.3.0
func OptimizedConfig(shardCount int, memoryBudget int) CometConfig
OptimizedConfig returns a configuration optimized for a specific number of shards. Based on benchmarking, it adjusts file size and other parameters for optimal performance.
Recommended shard counts:
- 16 shards: Good for small deployments
- 64 shards: Balanced performance
- 256 shards: Optimal for high throughput (2.4M ops/sec)
Example:
config := comet.OptimizedConfig(256, 3072) // For 256 shards with 3GB memory budget
client, err := comet.NewClient("/data", config)
Example ¶
ExampleOptimizedConfig demonstrates using the optimized configuration
package main

import (
    "os"
    "path/filepath"

    "github.com/orbiterhq/comet"
)

func main() {
    // Create a client optimized for 256 shards with 3GB memory budget
    cfg := comet.OptimizedConfig(256, 3072)
    client, err := comet.NewClient(filepath.Join(os.TempDir(), "comet"), cfg)
    if err != nil {
        panic(err)
    }
    defer client.Close()

    // Use 256 shards when writing
    stream := client.PickShardStream("events:v1", "user-123", 256)

    // Write to the stream...
    _ = stream
}
type CometHealth ¶ added in v1.3.0
type CometHealth struct {
    Healthy       bool      `json:"healthy"`
    Status        string    `json:"status"`
    ActiveShards  int       `json:"active_shards"`
    Details       string    `json:"details"`
    Uptime        int64     `json:"uptime_seconds"`
    WritesOK      bool      `json:"writes_ok"`
    ReadsOK       bool      `json:"reads_ok"`
    LastWriteTime time.Time `json:"last_write_time"`
    ErrorCount    int64     `json:"error_count"`
}
CometHealth represents the health status of the client
type CometState ¶ added in v1.1.1
type CometState struct {
    // ======== Header (0-63): Version and core state ========
    Version          uint64 // 0-7: Format version (start with 1)
    WriteOffset      uint64 // 8-15: Current write position in active file
    LastEntryNumber  int64  // 16-23: Sequence counter for entry IDs
    LastIndexUpdate  int64  // 24-31: Timestamp of last index modification
    LastFileSequence uint64 // 48-55: Sequence counter for file naming

    TotalBytes       uint64 // 72-79: Total uncompressed bytes
    TotalWrites      uint64 // 80-87: Total write operations
    LastWriteNanos   int64  // 88-95: Timestamp of last write
    CurrentBatchSize uint64 // 96-103: Current batch being built
    TotalBatches     uint64 // 104-111: Total batches written
    FailedWrites     uint64 // 112-119: Write failures

    // ======== Cache Line 2 (128-191): Compression metrics ========
    TotalCompressed      uint64 // 128-135: Total compressed bytes
    CompressedEntries    uint64 // 136-143: Number of compressed entries
    SkippedCompression   uint64 // 144-151: Entries too small to compress
    CompressionRatio     uint64 // 152-159: Average ratio * 100
    CompressionTimeNanos int64  // 160-167: Total time compressing

    // ======== Cache Line 3 (192-255): Latency metrics ========
    WriteLatencySum   uint64 // 192-199: Sum for averaging
    WriteLatencyCount uint64 // 200-207: Count for averaging
    MinWriteLatency   uint64 // 208-215: Minimum seen
    MaxWriteLatency   uint64 // 216-223: Maximum seen
    SyncLatencyNanos  int64  // 224-231: Time spent in fsync

    // ======== Cache Line 4 (256-319): File operation metrics ========
    FilesCreated      uint64 // 256-263: Total files created
    FilesDeleted      uint64 // 264-271: Files removed by retention
    FileRotations     uint64 // 272-279: Successful rotations
    RotationTimeNanos int64  // 280-287: Time spent rotating
    CurrentFiles      uint64 // 288-295: Current file count
    TotalFileBytes    uint64 // 296-303: Total size on disk
    FailedRotations   uint64 // 304-311: Rotation failures
    SyncCount         uint64 // 312-319: Total sync operations

    // ======== Cache Line 5 (320-383): Checkpoint/Index metrics ========
    CheckpointCount     uint64 // 320-327: Total checkpoints
    LastCheckpointNanos int64  // 328-335: Last checkpoint time
    CheckpointTimeNanos int64  // 336-343: Total checkpoint time
    IndexPersistCount   uint64 // 344-351: Index saves
    IndexPersistErrors  uint64 // 352-359: Failed index saves
    BinaryIndexNodes    uint64 // 360-367: Nodes in binary index

    // ======== Cache Line 6 (384-447): Consumer metrics ========
    ActiveReaders    uint64 // 384-391: Current reader count
    TotalReaders     uint64 // 392-399: Total readers created
    MaxConsumerLag   uint64 // 400-407: Max entries behind
    TotalEntriesRead uint64 // 408-415: Total read operations
    ConsumerGroups   uint64 // 416-423: Active consumer groups
    AckedEntries     uint64 // 424-431: Acknowledged entries
    ReaderCacheHits  uint64 // 432-439: Cache hit count

    // ======== Cache Line 7 (448-511): Error/Recovery metrics ========
    ErrorCount         uint64 // 448-455: Total errors
    LastErrorNanos     int64  // 456-463: Last error timestamp
    CorruptionDetected uint64 // 464-471: Corrupted entries found
    RecoveryAttempts   uint64 // 472-479: Auto-recovery attempts
    RecoverySuccesses  uint64 // 480-487: Successful recoveries
    PartialWrites      uint64 // 488-495: Incomplete writes detected
    ReadErrors         uint64 // 496-503: Read failures

    // ======== Cache Lines 8-9 (512-639): Retention metrics ========
    RetentionRuns        uint64 // 512-519: Cleanup executions
    LastRetentionNanos   int64  // 520-527: Last cleanup time
    RetentionTimeNanos   int64  // 528-535: Total cleanup time
    EntriesDeleted       uint64 // 536-543: Entries removed
    BytesReclaimed       uint64 // 544-551: Space freed
    OldestEntryNanos     int64  // 552-559: Oldest data timestamp
    RetentionErrors      uint64 // 560-567: Cleanup failures
    ProtectedByConsumers uint64 // 568-575: Files kept for consumers

    // ======== Cache Lines 10-11 (640-767): Reader cache metrics ========
    ReaderFileMaps    uint64 // 640-647: Files mapped into memory
    ReaderFileUnmaps  uint64 // 648-655: Files unmapped from memory
    ReaderCacheBytes  uint64 // 656-663: Current cache memory usage
    ReaderMappedFiles uint64 // 664-671: Current number of mapped files
    ReaderFileRemaps  uint64 // 672-679: File remappings due to growth
    ReaderCacheEvicts uint64 // 680-687: Files evicted due to pressure
    // contains filtered or unexported fields
}
CometState consolidates ALL mmap state and comprehensive metrics. Stored in: comet.state (1KB file per shard). Total size: 1024 bytes (1KB) for plenty of room to grow.
Design principles:

 1. Group related metrics in the same cache line
 2. Keep hot-path metrics in early cache lines
 3. Reserve generous space for future additions
 4. All fields are atomic-safe for multi-process access
 5. CRITICAL: use raw int64/uint64 fields with atomic operations, NOT atomic.Int64
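Because the exported fields are raw integers shared across processes via mmap, any direct access must go through sync/atomic; the Get/Add/Store helper methods below wrap the same atomic loads and adds. A sketch of sampling a few counters, assuming a *CometState obtained elsewhere (e.g. via Reader.SetState):

writes := state.GetTotalWrites() // helper wrapping an atomic load
offset := state.GetWriteOffset()
total := atomic.LoadUint64(&state.TotalBytes) // direct field access, still atomic
fmt.Println(writes, offset, total)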
func (*CometState) AddErrorCount ¶ added in v1.2.0
func (s *CometState) AddErrorCount(delta uint64) uint64
ErrorCount methods - simple atomics since processes own shards exclusively
func (*CometState) AddFailedWrites ¶ added in v1.2.0
func (s *CometState) AddFailedWrites(delta uint64) uint64
FailedWrites methods - simple atomics since processes own shards exclusively
func (*CometState) AddFileRotations ¶ added in v1.1.1
func (s *CometState) AddFileRotations(delta uint64) uint64
FileRotations methods - simple atomics since processes own shards exclusively
func (*CometState) AddFilesCreated ¶ added in v1.1.1
func (s *CometState) AddFilesCreated(delta uint64) uint64
FilesCreated methods - simple atomics since processes own shards exclusively
func (*CometState) AddLastFileSequence ¶ added in v1.1.1
func (s *CometState) AddLastFileSequence(delta uint64) uint64
LastFileSequence methods
func (*CometState) AddTotalBytes ¶ added in v1.1.1
func (s *CometState) AddTotalBytes(delta uint64) uint64
TotalBytes methods - simple atomics since processes own shards exclusively
func (*CometState) AddTotalWrites ¶ added in v1.1.1
func (s *CometState) AddTotalWrites(delta uint64) uint64
func (*CometState) AddWriteLatencyCount ¶ added in v1.1.1
func (s *CometState) AddWriteLatencyCount(delta uint64) uint64
WriteLatencyCount methods
func (*CometState) AddWriteLatencySum ¶ added in v1.1.1
func (s *CometState) AddWriteLatencySum(delta uint64) uint64
WriteLatencySum methods
func (*CometState) AddWriteOffset ¶ added in v1.1.1
func (s *CometState) AddWriteOffset(delta uint64) uint64
func (*CometState) AllocateEntryNumbers ¶ added in v1.3.0
func (s *CometState) AllocateEntryNumbers(count int) int64
AllocateEntryNumbers atomically reserves a batch of entry numbers
func (*CometState) GetAverageWriteLatency ¶ added in v1.1.1
func (s *CometState) GetAverageWriteLatency() uint64
Computed metrics helpers
func (*CometState) GetCompressionRatioFloat ¶ added in v1.1.1
func (s *CometState) GetCompressionRatioFloat() float64
func (*CometState) GetErrorRate ¶ added in v1.1.1
func (s *CometState) GetErrorRate() float64
func (*CometState) GetLastEntryNumber ¶ added in v1.1.1
func (s *CometState) GetLastEntryNumber() int64
Helper methods for atomic operations on uint64 fields
func (*CometState) GetLastIndexUpdate ¶ added in v1.1.1
func (s *CometState) GetLastIndexUpdate() int64
func (*CometState) GetLastWriteNanos ¶ added in v1.1.1
func (s *CometState) GetLastWriteNanos() int64
LastWriteNanos methods
func (*CometState) GetMaxWriteLatency ¶ added in v1.1.1
func (s *CometState) GetMaxWriteLatency() uint64
MaxWriteLatency methods
func (*CometState) GetMinWriteLatency ¶ added in v1.1.1
func (s *CometState) GetMinWriteLatency() uint64
MinWriteLatency methods
func (*CometState) GetTotalWrites ¶ added in v1.1.1
func (s *CometState) GetTotalWrites() uint64
TotalWrites methods - simple atomics since processes own shards exclusively
func (*CometState) GetVersion ¶ added in v1.1.1
func (s *CometState) GetVersion() uint64
Helper methods for non-atomic version field
func (*CometState) GetWriteOffset ¶ added in v1.1.1
func (s *CometState) GetWriteOffset() uint64
WriteOffset methods
func (*CometState) IncrementLastEntryNumber ¶ added in v1.1.1
func (s *CometState) IncrementLastEntryNumber() int64
func (*CometState) SetLastIndexUpdate ¶ added in v1.1.1
func (s *CometState) SetLastIndexUpdate(nanos int64)
func (*CometState) SetVersion ¶ added in v1.1.1
func (s *CometState) SetVersion(v uint64)
func (*CometState) StoreLastWriteNanos ¶ added in v1.1.1
func (s *CometState) StoreLastWriteNanos(val int64)
func (*CometState) StoreWriteOffset ¶ added in v1.1.1
func (s *CometState) StoreWriteOffset(val uint64)
type CometStats ¶
type CometStats struct {
    // Core metrics
    TotalEntries        int64            `json:"total_entries"`
    TotalBytes          int64            `json:"total_bytes"`
    TotalCompressed     int64            `json:"total_compressed"`
    CompressedEntries   int64            `json:"compressed_entries"`
    SkippedCompression  int64            `json:"skipped_compression"`
    FileRotations       int64            `json:"file_rotations"`
    CompressionWaitNano int64            `json:"compression_wait_nano"`
    ConsumerGroups      int              `json:"consumer_groups"`
    ConsumerOffsets     map[string]int64 `json:"consumer_offsets"`
    WriteThroughput     float64          `json:"write_throughput_mbps"`
    CompressionRatio    float64          `json:"compression_ratio"`
    OpenReaders         int64            `json:"open_readers"`
    MaxLag              int64            `json:"max_lag"`
    FileCount           int              `json:"file_count"`
    IndexSize           int64            `json:"index_size"`
    UptimeSeconds       int64            `json:"uptime_seconds"`

    // Production diagnostics (calculated on-the-fly)
    WriteErrors       int64            `json:"write_errors"`
    ReadErrors        int64            `json:"read_errors"`
    TotalErrors       int64            `json:"total_errors"`
    WritesTotal       int64            `json:"writes_total"`
    ReadsTotal        int64            `json:"reads_total"`
    RecoveryAttempts  int64            `json:"recovery_attempts"`
    RecoverySuccesses int64            `json:"recovery_successes"`
    AvgWriteLatencyNs int64            `json:"avg_write_latency_ns"`
    GoroutineCount    int              `json:"goroutine_count"`
    ConsumerLags      map[string]int64 `json:"consumer_lags"`
    TotalFileBytes    int64            `json:"total_file_bytes"`
    FailedWrites      int64            `json:"failed_writes"`
    FailedRotations   int64            `json:"failed_rotations"`
    SyncCount         int64            `json:"sync_count"`
    ErrorRate         float64          `json:"error_rate_per_sec"`
    WriteRate         float64          `json:"write_rate_per_sec"`
    ReadRate          float64          `json:"read_rate_per_sec"`
}
CometStats provides runtime statistics
type CompressedEntry ¶
type CompressedEntry struct {
    Data           []byte
    OriginalSize   uint64
    CompressedSize uint64
    WasCompressed  bool
}
CompressedEntry represents a pre-compressed entry ready for writing
type CompressionConfig ¶
type CompressionConfig struct {
MinCompressSize int `json:"min_compress_size"` // Minimum size to compress (bytes)
}
CompressionConfig controls compression behavior
type ConcurrencyConfig ¶
type ConcurrencyConfig struct {
    // Process-level shard ownership (simplifies multi-process coordination)
    // When ProcessCount > 1, multi-process mode is automatically enabled
    ProcessID    int    `json:"process_id"`    // This process's ID (0-based)
    ProcessCount int    `json:"process_count"` // Total number of processes (0 = single-process)
    SHMFile      string `json:"shm_file"`      // Shared memory file path (empty = os.TempDir()/comet-worker-slots-shm)
}
ConcurrencyConfig controls multi-process behavior
func (ConcurrencyConfig) IsMultiProcess ¶ added in v1.3.0
func (c ConcurrencyConfig) IsMultiProcess() bool
IsMultiProcess returns true if running in multi-process mode
func (ConcurrencyConfig) Owns ¶ added in v1.3.0
func (c ConcurrencyConfig) Owns(shardID uint32) bool
Owns checks if this process owns a particular shard
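A sketch of restricting work to owned shards in multi-process mode (the shard count is illustrative):

config := comet.MultiProcessConfig()
for _, shardID := range comet.AllShardsRange(16) {
    if config.Concurrency.Owns(shardID) {
        // this process is responsible for shardID
    }
}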
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer reads from comet stream shards. Fields ordered for optimal memory alignment.
func NewConsumer ¶
func NewConsumer(client *Client, opts ConsumerOptions) *Consumer
NewConsumer creates a new consumer for comet streams
func (*Consumer) AckRange ¶
AckRange acknowledges all messages in a contiguous range for a shard. This is more efficient than individual acks for bulk processing. IMPORTANT: this method only allows ACKing messages that have been read by this consumer.
func (*Consumer) FilterDuplicates ¶ added in v1.3.0
func (c *Consumer) FilterDuplicates(messages []StreamMessage) []StreamMessage
FilterDuplicates removes already-processed messages from a batch, returning only new messages
func (*Consumer) FlushACKs ¶ added in v1.3.0
FlushACKs forces immediate persistence of all consumer offsets. This is useful for ensuring durability before shutdown or tests.
func (*Consumer) GetShardStats ¶
GetShardStats returns statistics for a specific shard
func (*Consumer) MarkBatchProcessed ¶ added in v1.3.0
func (c *Consumer) MarkBatchProcessed(messages []StreamMessage)
MarkBatchProcessed marks all messages in a batch as processed
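Together with FilterDuplicates, this supports manual deduplication when auto-ACK is not in use. A hedged sketch, assuming consumer and ctx are in scope:

messages, err := consumer.Read(ctx, []uint32{0}, 100)
if err != nil {
    log.Fatal(err)
}
fresh := consumer.FilterDuplicates(messages)
// ... process fresh ...
consumer.MarkBatchProcessed(fresh)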
func (*Consumer) Process ¶
func (c *Consumer) Process(ctx context.Context, handler ProcessFunc, opts ...ProcessOption) error
Process continuously reads and processes messages from shards. This is the high-level API for stream processing - handles discovery, retries, and ACKing automatically.
The simplest usage processes all discoverable shards:
err := consumer.Process(ctx, handleMessages)
Recommended usage with explicit configuration:
err := consumer.Process(ctx, handleMessages,
    comet.WithStream("events:v1:shard:*"), // Wildcard pattern for shard discovery
    comet.WithBatchSize(1000),             // Process up to 1000 messages at once
    comet.WithErrorHandler(logError),      // Handle processing errors
    comet.WithAutoAck(true),               // Automatically ACK successful batches
)
For distributed processing across multiple consumer instances:
err := consumer.Process(ctx, handleMessages,
    comet.WithStream("events:v1:shard:*"),
    comet.WithConsumerAssignment(workerID, totalWorkers), // Distribute shards across workers
)
Stream patterns must end with ":*" for wildcard matching, e.g.:

- "events:v1:shard:*" matches events:v1:shard:0000, events:v1:shard:0001, etc.
- "logs:*:*:*" matches any 4-part stream name starting with "logs"
func (*Consumer) Read ¶
Read reads up to count entries from the specified shards starting from the consumer group's current offset. This is a low-level method for manual message processing - you probably want Process() instead.
Key differences from Process():

- Read() is one-shot; Process() is continuous
- Read() requires manual ACKing; Process() can auto-ACK
- Read() uses explicit shard IDs; Process() can use wildcards
- Read() has no retry logic; Process() has configurable retries
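A minimal manual read-then-ACK loop, sketched under the assumption that consumer and ctx are in scope:

messages, err := consumer.Read(ctx, []uint32{0, 1}, 50)
if err != nil {
    log.Fatal(err)
}
ids := make([]comet.MessageID, 0, len(messages))
for _, msg := range messages {
    // ... handle msg.Data ...
    ids = append(ids, msg.ID)
}
// Read() never ACKs for you; acknowledge explicitly
if err := consumer.Ack(ctx, ids...); err != nil {
    log.Fatal(err)
}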
func (*Consumer) ResetOffset ¶
ResetOffset sets the consumer offset to a specific entry number
type ConsumerEntry ¶ added in v1.3.7
type ConsumerEntry struct {
    // Cache line 1 (64 bytes)
    GroupName  [ConsumerGroupNameSize]byte // 48 bytes - null-terminated string
    Offset     int64                       // 8 bytes - atomic access
    LastUpdate int64                       // 8 bytes - unix nano timestamp

    // Cache line 2 (64 bytes)
    AckCount uint64    // 8 bytes - total ACKs for metrics
    Reserved [56]uint8 // 56 bytes - future use
}
ConsumerEntry represents a single consumer group's offset. Sized to exactly 128 bytes (2 cache lines) for alignment.
type ConsumerOffsetHeader ¶ added in v1.3.7
type ConsumerOffsetHeader struct {
    Version  uint32    // 4 bytes
    Magic    uint32    // 4 bytes
    Reserved [56]uint8 // 56 bytes padding to 64 bytes
}
ConsumerOffsetHeader is the header of the memory-mapped file
type ConsumerOffsetMmap ¶ added in v1.3.7
type ConsumerOffsetMmap struct {
// contains filtered or unexported fields
}
ConsumerOffsetMmap provides lock-free access to consumer offsets
func NewConsumerOffsetMmap ¶ added in v1.3.7
func NewConsumerOffsetMmap(shardPath string, shardID uint32) (*ConsumerOffsetMmap, error)
NewConsumerOffsetMmap creates or opens a memory-mapped consumer offset file
func (*ConsumerOffsetMmap) Close ¶ added in v1.3.7
func (c *ConsumerOffsetMmap) Close() error
Close unmaps the file and closes it
func (*ConsumerOffsetMmap) Get ¶ added in v1.3.7
func (c *ConsumerOffsetMmap) Get(group string) (int64, bool)
Get returns the offset for a consumer group
func (*ConsumerOffsetMmap) GetAll ¶ added in v1.3.7
func (c *ConsumerOffsetMmap) GetAll() map[string]int64
GetAll returns all consumer offsets
func (*ConsumerOffsetMmap) GetStats ¶ added in v1.3.7
func (c *ConsumerOffsetMmap) GetStats() (used, total int)
GetStats returns statistics about consumer offset usage
func (*ConsumerOffsetMmap) Remove ¶ added in v1.3.7
func (c *ConsumerOffsetMmap) Remove(group string)
Remove removes a consumer group (for testing/admin)
type ConsumerOptions ¶
type ConsumerOptions struct {
    Group         string
    ConsumerID    int // 0, 1, 2, etc. for deterministic shard assignment
    ConsumerCount int // Total consumers in this group for deterministic shard assignment
}
ConsumerOptions configures a consumer
type EntryIndexMetadata ¶ added in v1.3.0
type EntryIndexMetadata struct {
    EntryNumber int64         `json:"entry_number"`
    Position    EntryPosition `json:"position"`
}
EntryIndexMetadata represents a stored index node
type EntryIndexNode ¶
type EntryIndexNode struct {
    EntryNumber int64         `json:"entry_number"` // Entry number this node covers
    Position    EntryPosition `json:"position"`     // Position in files
}
EntryIndexNode represents a node in the binary searchable index
type EntryPosition ¶
type EntryPosition struct {
    FileIndex  int   `json:"file_index"`  // Index in the Files array
    ByteOffset int64 `json:"byte_offset"` // Byte offset within the file
}
EntryPosition represents the location of an entry in the file system
type FileInfo ¶
type FileInfo struct {
    // 64-bit aligned fields first (8 bytes each)
    StartOffset int64 `json:"start_offset"` // Byte offset in virtual stream
    EndOffset   int64 `json:"end_offset"`   // Last byte offset + 1
    StartEntry  int64 `json:"start_entry"`  // First entry number
    Entries     int64 `json:"entries"`      // Number of entries

    // Time fields (24 bytes each on 64-bit due to location pointer)
    StartTime time.Time `json:"start_time"` // File creation time
    EndTime   time.Time `json:"end_time"`   // Last write time

    // String last (will use remaining space efficiently)
    Path string `json:"path"`
}
FileInfo represents a data file in the shard. Fields ordered for optimal memory alignment.
type IndexingConfig ¶
type IndexingConfig struct {
    BoundaryInterval int `json:"boundary_interval"` // Store boundary every N entries
    MaxIndexEntries  int `json:"max_index_entries"` // Max boundary entries per shard (0 = unlimited)
}
IndexingConfig controls indexing behavior
type LogConfig ¶ added in v1.1.1
type LogConfig struct {
    // Logger allows injecting a custom logger
    // If nil, a default logger will be created based on Level
    Logger Logger `json:"-"`

    // Level controls log level when using default logger
    // Options: "debug", "info", "warn", "error", "none"
    Level string `json:"level"`
}
LogConfig controls logging behavior
type Logger ¶ added in v1.1.1
type Logger interface {
    // Debug logs a debug message with optional key-value pairs
    Debug(msg string, keysAndValues ...any)
    // Info logs an informational message with optional key-value pairs
    Info(msg string, keysAndValues ...any)
    // Warn logs a warning message with optional key-value pairs
    Warn(msg string, keysAndValues ...any)
    // Error logs an error message with optional key-value pairs
    Error(msg string, keysAndValues ...any)
    // WithContext returns a logger with the given context
    WithContext(ctx context.Context) Logger
    // WithFields returns a logger with the given fields attached
    WithFields(keysAndValues ...any) Logger
}
Logger is the interface for Comet's logging needs. It's designed to be simple and easy to adapt to various logging libraries.
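A minimal custom implementation might look like the following sketch (PrefixLogger is hypothetical, not part of the package):

type PrefixLogger struct{ prefix string }

func (l *PrefixLogger) Debug(msg string, kv ...any) { log.Println(l.prefix, "DEBUG", msg, kv) }
func (l *PrefixLogger) Info(msg string, kv ...any)  { log.Println(l.prefix, "INFO", msg, kv) }
func (l *PrefixLogger) Warn(msg string, kv ...any)  { log.Println(l.prefix, "WARN", msg, kv) }
func (l *PrefixLogger) Error(msg string, kv ...any) { log.Println(l.prefix, "ERROR", msg, kv) }
func (l *PrefixLogger) WithContext(ctx context.Context) comet.Logger { return l }
func (l *PrefixLogger) WithFields(kv ...any) comet.Logger            { return l }

// Wire it into the client configuration:
// config.Log.Logger = &PrefixLogger{prefix: "[comet]"}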
type MappedFile ¶
type MappedFile struct {
    FileInfo
    // contains filtered or unexported fields
}
MappedFile represents a memory-mapped file with atomic data access
type MessageID ¶
MessageID represents a structured message ID. Fields ordered for optimal memory alignment: int64 first, then uint32.
func ParseMessageID ¶
ParseMessageID parses a string ID back to MessageID
type NoOpLogger ¶ added in v1.1.1
type NoOpLogger struct{}
NoOpLogger is a logger that discards all log messages
func (NoOpLogger) Debug ¶ added in v1.1.1
func (NoOpLogger) Debug(msg string, keysAndValues ...any)
func (NoOpLogger) Error ¶ added in v1.1.1
func (NoOpLogger) Error(msg string, keysAndValues ...any)
func (NoOpLogger) Info ¶ added in v1.1.1
func (NoOpLogger) Info(msg string, keysAndValues ...any)
func (NoOpLogger) Warn ¶ added in v1.1.1
func (NoOpLogger) Warn(msg string, keysAndValues ...any)
func (NoOpLogger) WithContext ¶ added in v1.1.1
func (n NoOpLogger) WithContext(ctx context.Context) Logger
func (NoOpLogger) WithFields ¶ added in v1.1.1
func (n NoOpLogger) WithFields(keysAndValues ...any) Logger
type ProcessFunc ¶
type ProcessFunc func(ctx context.Context, messages []StreamMessage) error
ProcessFunc handles a batch of messages, returning error to trigger retry
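A sketch of a handler (handle is a hypothetical per-message function; consumer and ctx are assumed in scope):

handler := func(ctx context.Context, messages []comet.StreamMessage) error {
    for _, msg := range messages {
        if err := handle(msg); err != nil {
            return err // a non-nil error triggers a retry of the batch
        }
    }
    // return comet.ErrStopProcessing to stop gracefully while still ACKing
    return nil // nil ACKs the batch when auto-ACK is enabled
}
err := consumer.Process(ctx, handler)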
type ProcessOption ¶
type ProcessOption func(*processConfig)
ProcessOption configures the Process method
func WithAutoAck ¶
func WithAutoAck(enabled bool) ProcessOption
WithAutoAck controls automatic acknowledgment (default: true)
func WithBatchCallback ¶
func WithBatchCallback(callback func(size int, duration time.Duration)) ProcessOption
WithBatchCallback sets a callback after each batch completes
func WithBatchSize ¶
func WithBatchSize(size int) ProcessOption
WithBatchSize sets the number of messages to read at once
func WithConsumerAssignment ¶
func WithConsumerAssignment(id, total int) ProcessOption
WithConsumerAssignment configures distributed processing
func WithErrorHandler ¶
func WithErrorHandler(handler func(err error, retryCount int)) ProcessOption
WithErrorHandler sets a callback for processing errors
func WithMaxRetries ¶
func WithMaxRetries(retries int) ProcessOption
WithMaxRetries sets the number of retry attempts for failed batches
func WithPollInterval ¶
func WithPollInterval(interval time.Duration) ProcessOption
WithPollInterval sets how long to wait when no messages are available
func WithRetryDelay ¶
func WithRetryDelay(delay time.Duration) ProcessOption
WithRetryDelay sets the base delay between retries
func WithShardDiscoveryInterval ¶ added in v1.3.8
func WithShardDiscoveryInterval(interval time.Duration) ProcessOption
WithShardDiscoveryInterval sets how often to check for new shards. Default is 5 seconds. Set to 0 to disable periodic rediscovery.
func WithShards ¶
func WithShards(shards ...uint32) ProcessOption
WithShards specifies explicit shards to process
func WithStream ¶
func WithStream(pattern string) ProcessOption
WithStream specifies a stream pattern for shard discovery
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Reader provides bounded memory-mapped read access to a shard with intelligent file management
func NewReader ¶
func NewReader(shardID uint32, index *ShardIndex, config ...ReaderConfig) (*Reader, error)
NewReader creates a new bounded reader for a shard with smart file mapping
func (*Reader) GetMemoryUsage ¶ added in v1.1.1
GetMemoryUsage returns current memory usage statistics
func (*Reader) ReadEntryAtPosition ¶
func (r *Reader) ReadEntryAtPosition(pos EntryPosition) ([]byte, error)
ReadEntryAtPosition reads a single entry at the given position
func (*Reader) ReadEntryByNumber ¶ added in v1.1.1
ReadEntryByNumber reads an entry by its sequential entry number, using the file metadata to locate it
func (*Reader) SetState ¶ added in v1.1.1
func (r *Reader) SetState(state *CometState)
SetState sets the shared state for metrics tracking
func (*Reader) UpdateFiles ¶ added in v1.1.1
UpdateFiles updates the reader with new file information
type ReaderConfig ¶ added in v1.1.1
type ReaderConfig struct {
    MaxMappedFiles  int   // Maximum number of files to keep mapped (default: 10)
    MaxMemoryBytes  int64 // Maximum memory to use for mapping (default: 1GB)
    CleanupInterval int   // How often to run cleanup in milliseconds (default: 5000)
}
ReaderConfig configures the bounded reader behavior
func DefaultReaderConfig ¶ added in v1.1.1
func DefaultReaderConfig() ReaderConfig
DefaultReaderConfig returns the default configuration for Reader
func ReaderConfigForStorage ¶ added in v1.3.0
func ReaderConfigForStorage(maxFileSize int64) ReaderConfig
ReaderConfigForStorage returns a reader configuration optimized for the given storage settings
type RetentionConfig ¶
type RetentionConfig struct {
    MaxAge            time.Duration `json:"max_age"`             // Delete files older than this
    MaxBytes          int64         `json:"max_bytes"`           // Delete oldest files if total size exceeds
    MaxTotalSize      int64         `json:"max_total_size"`      // Alias for MaxBytes for compatibility
    MaxShardSize      int64         `json:"max_shard_size"`      // Maximum size per shard
    CheckUnconsumed   bool          `json:"check_unconsumed"`    // Protect unconsumed messages
    ProtectUnconsumed bool          `json:"protect_unconsumed"`  // Alias for CheckUnconsumed
    CleanupInterval   time.Duration `json:"cleanup_interval"`    // How often to run cleanup (0 = disabled)
    FileGracePeriod   time.Duration `json:"file_grace_period"`   // Don't delete files newer than this
    ForceDeleteAfter  time.Duration `json:"force_delete_after"`  // Force delete files older than this
    SafetyMargin      float64       `json:"safety_margin"`       // Keep this fraction of space free (0.0-1.0)
    MinFilesToRetain  int           `json:"min_files_to_retain"` // Always keep at least N files per shard
    MinFilesToKeep    int           `json:"min_files_to_keep"`   // Alias for MinFilesToRetain
}
RetentionConfig controls data retention policies
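A sketch of tuning retention on top of the defaults (values are illustrative, not recommendations):

config := comet.DefaultCometConfig()
config.Retention.MaxAge = 24 * time.Hour       // drop files older than one day
config.Retention.MaxShardSize = 1 << 30        // cap each shard at 1 GiB
config.Retention.MinFilesToRetain = 2          // always keep the newest files
config.Retention.CleanupInterval = time.Minute // run cleanup every minute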
type RetentionStats ¶
type RetentionStats struct {
    // 8-byte aligned fields first
    TotalSizeBytes int64         `json:"total_size_bytes"`
    TotalSizeGB    float64       `json:"total_size_gb"`
    MaxTotalSizeGB float64       `json:"max_total_size_gb"`
    RetentionAge   time.Duration `json:"retention_age"`
    TotalFiles     int           `json:"total_files"`

    // Pointer fields (8 bytes)
    ShardStats map[uint32]ShardRetentionStats `json:"shard_stats,omitempty"`

    // Larger composite types last (time.Time is 24 bytes)
    OldestData time.Time `json:"oldest_data"`
    NewestData time.Time `json:"newest_data"`
}
RetentionStats provides type-safe retention statistics. Fields ordered for optimal memory alignment.
type Shard ¶
type Shard struct {
// contains filtered or unexported fields
}
Shard represents a single stream shard with its own files. Fields ordered for optimal memory alignment (64-bit words first).
func (*Shard) GetConsumerOffset ¶ added in v1.3.0
GetConsumerOffset returns the current offset for a consumer group
func (*Shard) UpdateConsumerOffset ¶ added in v1.3.0
UpdateConsumerOffset updates the offset for a consumer group
type ShardDiagnostics ¶ added in v1.3.8
type ShardDiagnostics struct {
    // 8-byte aligned fields first
    WriteRate          float64          `json:"write_rate_per_sec"`
    ErrorRate          float64          `json:"error_rate_per_sec"`
    TotalEntries       int64            `json:"total_entries"`
    TotalBytes         int64            `json:"total_bytes"`
    FileRotateFailures int64            `json:"file_rotate_failures"`
    ConsumerLags       map[string]int64 `json:"consumer_lags"`
    UnhealthyReasons   []string         `json:"unhealthy_reasons,omitempty"`
    LastError          string           `json:"last_error,omitempty"`

    // Time fields (24 bytes each)
    LastWriteTime time.Time `json:"last_write_time"`
    LastErrorTime time.Time `json:"last_error_time,omitempty"`

    // Smaller fields last
    ShardID   uint32 `json:"shard_id"`
    FileCount int    `json:"file_count"`
    IsHealthy bool   `json:"is_healthy"`
}
ShardDiagnostics provides detailed diagnostics for a specific shard. Fields ordered for optimal memory alignment.
type ShardIndex ¶
type ShardIndex struct {
    // 64-bit aligned fields first (8 bytes each)
    CurrentEntryNumber int64 `json:"current_entry_number"` // Entry-based tracking (not byte offsets!)
    CurrentWriteOffset int64 `json:"current_write_offset"` // Still track for file management

    // Maps (8 bytes pointer each)
    ConsumerOffsets map[string]int64 `json:"consumer_entry_offsets"` // Consumer tracking by entry number (not bytes!)

    // Composite types
    BinaryIndex BinarySearchableIndex `json:"binary_index"` // Binary searchable index for O(log n) lookups

    // Slices (24 bytes: ptr + len + cap)
    Files []FileInfo `json:"files"` // File management

    // Strings (24 bytes: ptr + len + cap)
    CurrentFile string `json:"current_file"`

    // Smaller fields last
    BoundaryInterval int `json:"boundary_interval"` // Store boundaries every N entries (4 bytes)
}
ShardIndex tracks files and consumer offsets. Fixed to use entry-based addressing instead of byte offsets. Fields ordered for optimal memory alignment.
type ShardRetentionStats ¶
type ShardRetentionStats struct {
    // 8-byte aligned fields first
    SizeBytes int64 `json:"size_bytes"`
    Files     int   `json:"files"`

    // Pointer field (8 bytes)
    ConsumerLag map[string]int64 `json:"consumer_lag,omitempty"`

    // Larger composite types last (time.Time is 24 bytes)
    OldestEntry time.Time `json:"oldest_entry"`
    NewestEntry time.Time `json:"newest_entry"`
}
ShardRetentionStats provides retention stats for a single shard. Fields ordered for optimal memory alignment.
type SlogAdapter ¶ added in v1.1.1
type SlogAdapter struct {
// contains filtered or unexported fields
}
SlogAdapter adapts slog.Logger to the Comet Logger interface
func NewSlogAdapter ¶ added in v1.1.1
func NewSlogAdapter(logger *slog.Logger) *SlogAdapter
NewSlogAdapter creates a new adapter for slog.Logger
func (*SlogAdapter) Debug ¶ added in v1.1.1
func (s *SlogAdapter) Debug(msg string, keysAndValues ...any)
func (*SlogAdapter) Error ¶ added in v1.1.1
func (s *SlogAdapter) Error(msg string, keysAndValues ...any)
func (*SlogAdapter) Info ¶ added in v1.1.1
func (s *SlogAdapter) Info(msg string, keysAndValues ...any)
func (*SlogAdapter) Warn ¶ added in v1.1.1
func (s *SlogAdapter) Warn(msg string, keysAndValues ...any)
func (*SlogAdapter) WithContext ¶ added in v1.1.1
func (s *SlogAdapter) WithContext(ctx context.Context) Logger
func (*SlogAdapter) WithFields ¶ added in v1.1.1
func (s *SlogAdapter) WithFields(keysAndValues ...any) Logger
type StdLogger ¶ added in v1.1.1
type StdLogger struct {
// contains filtered or unexported fields
}
StdLogger is a simple logger that writes to stdout/stderr
func NewStdLogger ¶ added in v1.1.1
NewStdLogger creates a new standard logger
func (*StdLogger) WithContext ¶ added in v1.1.1
func (*StdLogger) WithFields ¶ added in v1.1.1
type StorageConfig ¶
type StorageConfig struct {
    MaxFileSize        int64         `json:"max_file_size"`       // Maximum size per file before rotation
    CheckpointInterval time.Duration `json:"checkpoint_interval"` // Checkpoint interval
    CheckpointEntries  int           `json:"checkpoint_entries"`  // Checkpoint every N entries (default: 100000)
    FlushInterval      time.Duration `json:"flush_interval"`      // Flush to OS cache interval (memory management, not durability)
    FlushEntries       int           `json:"flush_entries"`       // Flush to OS cache every N entries (memory management, not durability)
}
StorageConfig controls file storage behavior
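A sketch of adjusting storage behavior from the defaults (values are illustrative):

config := comet.DefaultCometConfig()
config.Storage.MaxFileSize = 256 << 20                // rotate data files at 256 MiB
config.Storage.CheckpointEntries = 100000             // checkpoint every 100k entries
config.Storage.FlushInterval = 100 * time.Millisecond // flush to OS cache frequently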
type StreamMessage ¶
type StreamMessage struct {
    Stream string    // Stream name/identifier
    ID     MessageID // Unique message ID
    Data   []byte    // Raw message data
}
StreamMessage represents a message read from a stream
type StreamStats ¶
type StreamStats struct {
    // 64-bit aligned fields first
    TotalEntries int64
    TotalBytes   int64
    OldestEntry  time.Time
    NewestEntry  time.Time

    // Composite types
    ConsumerOffsets map[string]int64

    // Smaller fields last
    FileCount int    // 8 bytes
    ShardID   uint32 // 4 bytes
}
StreamStats contains statistics about a shard. Fields ordered for optimal memory alignment.
type WriteMode ¶ added in v1.3.0
type WriteMode int
WriteMode determines how data is written to disk
type WriteRequest ¶
type WriteRequest struct {
    CurrentWriteOffset int64       // Snapshot of write offset at request time (8 bytes)
    WriteBuffers       [][]byte    // Buffers to write (24 bytes)
    IDs                []MessageID // Message IDs for the batch (24 bytes)
    ShouldFlush        bool        // Whether to flush after this write (1 byte)
}
WriteRequest represents a batch write operation. Fields ordered for optimal memory alignment.