comet

package module v1.0.0
Published: Aug 1, 2025  License: MIT  Imports: 19  Imported by: 0

README

☄️ Comet

High-performance embedded segmented log for edge observability. Built for single-digit microsecond latency and bounded resources.

Architecture Guide | Performance Guide | Troubleshooting | Security | API Reference

What is Comet?

Comet is a segmented append-only log optimized for observability data (metrics, logs, traces) at edge locations. It implements the same pattern as Kafka's storage engine - append-only segments with time/size-based retention - but embedded directly in your service with aggressive deletion policies for resource-constrained environments.

Each shard maintains a series of immutable segment files that are rotated at size boundaries and deleted based on retention policies, ensuring predictable resource usage without the complexity of circular buffers or in-place overwrites.

Comet requires local filesystems (ext4, xfs, etc.) for its microsecond latency guarantees. It is unapologetically local. If you need distributed storage, use a proper distributed system like NATS JetStream or Kafka instead.

The Edge Storage Problem

Edge deployments need local observability buffering, but other solutions fall short:

  • Kafka: Requires clusters, complex ops, ~1-5ms latency
  • RocksDB: Single-threaded writes, 50-200μs writes, 100ms+ during compaction stalls
  • Redis: Requires separate server, memory-only without persistence config
  • Ring buffers: No persistence, no compression, data loss on overflow
  • Files + rotation: No indexing, no consumer tracking, manual everything

The gap: No embedded solution with Kafka's reliability at microsecond latencies.

Features

  • Ultra-low latency: 1.7μs single-process, 33μs multi-process writes
  • Predictable performance: No compaction stalls or write amplification like LSM-trees
  • True multi-process support: Hybrid coordination (mmap + file locks), crash-safe rotation, real OS processes
  • O(log n) lookups: Binary searchable index with bounded memory usage
  • Lock-free reads: Atomic pointers, zero-copy via mmap
  • Automatic retention: Time and size-based cleanup, protects unconsumed data
  • Production ready: Crash recovery, built-in metrics, extensive testing
  • Smart sharding: Consistent hashing, automatic discovery, batch optimizations
  • Optional zstd compression: ~37% storage savings when needed

Multi-Process Coordination

Unlike other embedded solutions, Comet enables true multi-process coordination, making it a natural fit for prefork web servers such as Go Fiber (see the sketch after this list).

  • Hybrid coordination strategy - Memory-mapped atomics for high-frequency operations, OS file locks for critical sections
  • Lock-free writes - Direct memory writes to mapped files with atomic space allocation
  • Crash-safe rotation - File locks provide automatic cleanup when processes crash
  • Real process testing - Spawns actual OS processes, not just goroutines
  • 33μs write latency - Only 19x slower than single-process (vs 4,470x with traditional file locking)
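
For example, here is a minimal sketch of wiring Comet into a Fiber prefork server. The comet calls (MultiProcessConfig, NewClientWithConfig, PickShardStream, Append) are from this package; the Fiber setup, the route, and the comet import path are illustrative assumptions.

package main

import (
    "log"

    "github.com/gofiber/fiber/v2"
    comet "example.com/comet" // placeholder import path
)

func main() {
    // Each prefork worker is a separate OS process; multi-process mode
    // coordinates them via memory-mapped state and file locks.
    client, err := comet.NewClientWithConfig("/var/lib/comet", comet.MultiProcessConfig())
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    app := fiber.New(fiber.Config{Prefork: true})
    app.Post("/events", func(c *fiber.Ctx) error {
        // Shard by client IP (illustrative key) and append the raw request body.
        stream := comet.PickShardStream(c.IP(), "events", "v1", 16)
        if _, err := client.Append(c.UserContext(), stream, [][]byte{c.Body()}); err != nil {
            return c.SendStatus(fiber.StatusInternalServerError)
        }
        return c.SendStatus(fiber.StatusAccepted)
    })
    log.Fatal(app.Listen(":8080"))
}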

How Does Comet Compare?

Feature          | Comet                       | Kafka               | Redis Streams      | RocksDB            | Proof
-----------------|-----------------------------|---------------------|--------------------|--------------------|----------
Write Latency    | 1.7μs (33μs multi-process)  | 1-5ms               | 50-100μs           | 50-200μs           | Code
Multi-Process    | ✅ Real OS processes        | ✅ Distributed      | ❌ Single process  | ⚠️ Mutex locks     | Test
Resource Bounds  | ✅ Time & size limits       | ⚠️ JVM heap         | ⚠️ Memory only     | ⚠️ Manual compact  | Retention
Crash Recovery   | ✅ Automatic                | ✅ Replicas         | ⚠️ AOF/RDB         | ✅ WAL             | Test
Zero Copy Reads  | ✅ mmap                     | ❌ Network          | ❌ Serialization   | ❌ Deserialization | Code
Storage Overhead | ~12 bytes/entry             | ~50 bytes/entry     | ~20 bytes/entry    | ~30 bytes/entry    | Format
Sharding         | ✅ Built-in                 | ✅ Partitions       | ❌ Manual          | ❌ Manual          | Code
Compression      | ✅ Optional zstd            | ✅ Multiple codecs  | ❌ None            | ✅ Multiple        | Code
Embedded         | ✅ Native                   | ❌ Requires cluster | ❌ Requires server | ✅ Native          | -

Quick Start

The Easy Way™

Step 1: Create a client

client, err := comet.NewClient("/var/lib/comet")
if err != nil {
    log.Fatal(err)
}
defer client.Close()

Step 2: Write your events (sharding handled automatically)

// Comet automatically shards by user ID for optimal performance
stream := comet.PickShardStream(event.UserID, "events", "v1", 16)
ids, err := client.Append(ctx, stream, [][]byte{
    []byte(event.ToJSON()),
})

Step 3: Process events (everything automatic!)

consumer := comet.NewConsumer(client, comet.ConsumerOptions{
    Group: "my-processor",
})

// This is it! Auto-discovery, auto-retry, auto-ACK, auto-everything!
err = consumer.Process(ctx, func(messages []comet.StreamMessage) error {
    for _, msg := range messages {
        processEvent(msg.Data)  // Your logic here
    }
    return nil  // Success = automatic progress tracking
})

That's it! Comet handles:
  • Compression - Large events compressed automatically
  • Sharding - Load distributed across 16 shards
  • Retries - Failed batches retry automatically
  • Progress - Consumer offsets tracked per shard
  • Cleanup - Old data deleted automatically
  • Recovery - Crash? Picks up where it left off

Want more control? Scale horizontally:
// Deploy this same code across 3 processes:
err = consumer.Process(ctx, processEvents,
    comet.WithStream("events:v1:shard:*"),
    comet.WithConsumerAssignment(workerID, numWorkers),  // This worker + total count
)
// Each worker processes different shards automatically!
// Each worker processes different shards automatically!
// No coordination needed - Comet handles it

Production-Ready Example
err = consumer.Process(ctx, processEvents,
    comet.WithStream("events:v1:shard:*"),
    comet.WithBatchSize(1000),
    comet.WithPollInterval(50 * time.Millisecond),

    // Optional: Add observability
    comet.WithErrorHandler(func(err error, retryCount int) {
        metrics.Increment("comet.errors", 1)
        log.Printf("Retry %d: %v", retryCount, err)
    }),
    comet.WithBatchCallback(func(size int, duration time.Duration) {
        metrics.Histogram("comet.batch.size", float64(size))
        metrics.Histogram("comet.batch.duration_ms", duration.Milliseconds())
    }),
)

Need to tweak something?
// Only override what you need:
config := comet.DefaultCometConfig()
config.Retention.MaxAge = 24 * time.Hour  // Keep data longer
client, err := comet.NewClientWithConfig("/var/lib/comet", config)

// Or use a preset:
config = comet.HighCompressionConfig()      // Optimize for storage
config = comet.MultiProcessConfig()         // For prefork deployments
config = comet.HighThroughputConfig()       // For maximum write speed

Configuration Structure
type CometConfig struct {
    Compression CompressionConfig  // Controls compression behavior
    Indexing    IndexingConfig     // Controls indexing and lookup
    Storage     StorageConfig      // Controls file storage
    Concurrency ConcurrencyConfig  // Controls multi-process behavior
    Retention   RetentionConfig    // Controls data retention
}
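
Each sub-config can be overridden individually before creating the client. A sketch using field names from the reference section further down (the specific values are illustrative):

config := comet.DefaultCometConfig()
config.Storage.MaxFileSize = 256 << 20             // rotate segment files at 256MB
config.Indexing.MaxIndexEntries = 5000             // cap index memory per shard
config.Compression.MinCompressSize = 1024          // skip compression for small entries
config.Retention.MaxAge = 12 * time.Hour           // delete segments older than 12 hours
config.Retention.MaxShardSize = 1 << 30            // and cap each shard at 1GB
config.Retention.ProtectUnconsumed = true          // never delete unread data
config.Concurrency.EnableMultiProcessMode = false  // single-process mode (the default)

client, err := comet.NewClientWithConfig("/var/lib/comet", config)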

Architecture

┌─────────────────┐     ┌─────────────────┐
│   Your Service  │     │  Your Service   │
│                 │     │                 │
│  ┌───────────┐  │     │  ┌───────────┐  │
│  │   Comet   │  │     │  │   Comet   │  │
│  │  Writer   │  │     │  │  Reader   │  │
│  └─────┬─────┘  │     │  └─────┬─────┘  │
└────────┼────────┘     └────────┼────────┘
         │                       │
         ▼                       ▼
    ┌──────────────────────────────────┐
    │      Segmented Log Storage       │
    │                                  │
    │  Shard 0: [seg0][seg1][seg2]→    │
    │  Shard 1: [seg0][seg1]→          │
    │  ...                             │
    │                                  │
    │  ↓ segments deleted by retention │
    └──────────────────────────────────┘

Performance Optimizations

Comet achieves microsecond-level latency through careful optimization:

  1. Lock-Free Reads: Memory-mapped files with atomic pointers enable concurrent reads without locks
  2. I/O Outside Locks: Compression and disk writes happen outside critical sections
  3. Binary Searchable Index: O(log n) entry lookups instead of linear scans
  4. Vectored I/O: Batches multiple writes into single syscalls
  5. Batch ACKs: Groups acknowledgments by shard to minimize lock acquisitions
  6. Pre-allocated Buffers: Reuses buffers to minimize allocations
  7. Concurrent Shards: Each shard has independent locks for parallel operations
  8. Memory-Mapped Multi-Process Coordination: Direct memory writes with atomic sequence allocation

How It Works

  1. Append-Only Segments: Data is written to segment files that grow up to MaxFileSize
  2. Segment Rotation: When a segment reaches max size, it's closed and a new one starts
  3. Binary Index: Entry locations are indexed for O(log n) lookups with bounded memory
  4. Retention: Old segments are deleted based on age (MaxAge) or total size limits
  5. Sharding: Load is distributed across multiple independent segmented logs
  6. Index Limits: Index memory is capped by MaxIndexEntries - older entries are pruned

Resource Usage

With default settings:

  • Memory: ~2MB per shard (10k index entries × ~200 bytes/entry)
  • Disk: Bounded by retention policy (MaxShardSize × shard count)
  • CPU: Minimal - compression happens outside locks

Example: 16 shards × 2MB index = 32MB memory for indexes
Example: 16 shards × 1GB/shard = 16GB max disk usage

Sharding

Comet uses deterministic sharding for load distribution:

// Smart sharding by key (consistent distribution)
stream := comet.PickShardStream("user-123", "events", "v1", 16)

// Read from all shards
shards := comet.AllShardsRange(16)
messages, err := consumer.Read(ctx, shards, 1000)

for _, msg := range messages {
    fmt.Printf("Shard: %d, Entry: %d\n", msg.ID.ShardID, msg.ID.EntryNumber)
}

// Batch acknowledgments (automatically grouped by shard)
var messageIDs []comet.MessageID
for _, msg := range messages {
    messageIDs = append(messageIDs, msg.ID)
}
consumer.Ack(ctx, messageIDs...)

Use Cases

Right tool for:

  • Edge deployments with limited storage
  • High-frequency observability data (metrics, logs, traces)
  • Recent data access patterns (debugging last N hours)
  • Local buffering before shipping to cloud
  • Multi-service nodes requiring predictable resource usage

Not for:

  • Network filesystems (NFS, CIFS, etc.)
  • Long-term storage (use S3/GCS)
  • Transactional data requiring ACID
  • Random access patterns
  • Complex queries or aggregations

Performance

Benchmarked on Apple M2 with SSD (see Performance Guide for detailed analysis):

Single-Process Mode (default)

Optimized for single-process deployments with best performance:

  • Single entry: 1.7μs latency (594k entries/sec)
  • 10-entry batch: 2.7μs per batch (3.6M entries/sec)
  • 100-entry batch: 9.0μs per batch (11.1M entries/sec)
  • 1000-entry batch: 112μs per batch (8.9M entries/sec)
  • 10000-entry batch: 548μs per batch (18.2M entries/sec)

Multi-Process Mode

For prefork/multi-process deployments with memory-mapped coordination:

  • Single entry: 33μs latency (30k entries/sec) - ultra-fast for multi-process!
  • 10-entry batch: 35μs per batch (283k entries/sec)
  • 100-entry batch: 56μs per batch (1.8M entries/sec)
  • 1000-entry batch: 170μs per batch (5.9M entries/sec)
  • 10000-entry batch: 2.0ms per batch (5.0M entries/sec)

Note on Multi-Process Latency: While single-entry writes are ~19x slower in multi-process mode (33μs vs 1.7μs), this difference is often irrelevant in production:

  • With async batching: If you're buffering writes (like most ingest services), the latency is hidden from your request path
  • With large batches: At 1000-entry batches, multi-process is only ~1.5x slower per batch (170μs vs 112μs)
  • With prefork benefits: You gain linear CPU scaling, process isolation, and crash resilience

When the 33μs matters: direct, synchronous writes where every microsecond counts.
When it doesn't: HTTP APIs, batched ingestion, async workers, or any pattern that decouples the write from the request.

Other Performance Metrics

  • ACK performance: 30ns per ACK (34M ACKs/sec) with batch optimization
  • Memory efficiency: Zero allocations for ACKs, 5 allocations per write batch
  • Multi-process coordination: Memory-mapped atomic operations for lock-free sequence allocation
  • Storage overhead: 12 bytes per entry (4-byte length + 8-byte timestamp)

Configuration

Single-Process vs Multi-Process Mode

Comet defaults to single-process mode for optimal single-entry performance. Enable multi-process mode when needed:

// Single-process mode (default) - fastest performance
client, err := comet.NewClient("/data/streams")

// Multi-process mode - for prefork/multi-process deployments
config := comet.DefaultCometConfig()
config.Concurrency.EnableMultiProcessMode = true
client, err := comet.NewClientWithConfig("/data/streams", config)

When to use multi-process mode:

  • Async/batched writes where the 33μs latency is hidden from clients
  • Fiber prefork mode or similar multi-process web servers

License

MIT

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AllShardStreams

func AllShardStreams(namespace, version string, shardCount uint32) []string

AllShardStreams returns stream names for all shards in a namespace

func AllShardsRange

func AllShardsRange(shardCount uint32) []uint32

AllShardsRange returns a slice of shard IDs from 0 to shardCount-1. Useful for consumers that need to read from all shards.

func PickShard

func PickShard(key string, shardCount uint32) uint32

PickShard selects a shard based on a key using consistent hashing. Returns a shard ID in the range [0, shardCount).

func PickShardStream

func PickShardStream(key, namespace, version string, shardCount uint32) string

PickShardStream combines PickShard and ShardStreamName for convenience. Example: PickShardStream("user123", "events", "v1", 16) -> "events:v1:shard:0007"

func ShardStreamName

func ShardStreamName(namespace string, version string, shardID uint32) string

ShardStreamName generates a stream name for a specific shard

Types

type AtomicSlice

type AtomicSlice struct {
	// contains filtered or unexported fields
}

AtomicSlice provides atomic access to a byte slice using atomic.Value

func (*AtomicSlice) Load

func (a *AtomicSlice) Load() []byte

Load atomically loads the slice

func (*AtomicSlice) Store

func (a *AtomicSlice) Store(data []byte)

Store atomically stores a new slice

type BinarySearchableIndex

type BinarySearchableIndex struct {
	// Sorted slice of index nodes for binary search
	Nodes []EntryIndexNode `json:"nodes"`
	// Interval between indexed entries (default: 1000)
	IndexInterval int `json:"index_interval"`
	// Maximum number of nodes to keep (0 = unlimited)
	MaxNodes int `json:"max_nodes"`
}

BinarySearchableIndex provides O(log n) entry lookups

func (*BinarySearchableIndex) AddIndexNode

func (bi *BinarySearchableIndex) AddIndexNode(entryNumber int64, position EntryPosition)

AddIndexNode adds a new entry to the binary searchable index

func (*BinarySearchableIndex) FindEntry

func (bi *BinarySearchableIndex) FindEntry(entryNumber int64) (EntryPosition, bool)

FindEntry uses binary search to locate an entry position

func (*BinarySearchableIndex) GetScanStartPosition

func (bi *BinarySearchableIndex) GetScanStartPosition(entryNumber int64) (EntryPosition, int64, bool)

GetScanStartPosition returns the best starting position for scanning to find an entry

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client implements a local file-based stream client with append-only storage

func NewClient

func NewClient(dataDir string) (*Client, error)

NewClient creates a new comet client for local file-based streaming with default config

func NewClientWithConfig

func NewClientWithConfig(dataDir string, config CometConfig) (*Client, error)

NewClientWithConfig creates a new comet client with custom configuration

func (*Client) Append

func (c *Client) Append(ctx context.Context, stream string, entries [][]byte) ([]MessageID, error)

Append adds entries to a stream shard (append-only semantics)

func (*Client) Close

func (c *Client) Close() error

Close gracefully shuts down the client

func (*Client) ForceRetentionCleanup

func (c *Client) ForceRetentionCleanup()

ForceRetentionCleanup forces an immediate retention cleanup pass. This is primarily useful for testing retention behavior.

func (*Client) GetRetentionStats

func (c *Client) GetRetentionStats() *RetentionStats

GetRetentionStats returns current retention statistics
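
A sketch of logging retention state periodically (field names come from RetentionStats below; the logging itself is illustrative):

if rs := client.GetRetentionStats(); rs != nil {
    log.Printf("retained %.2fGB across %d files, oldest=%s",
        rs.TotalSizeGB, rs.TotalFiles, rs.OldestData.Format(time.RFC3339))
}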

func (*Client) GetStats

func (c *Client) GetStats() CometStats

GetStats returns current metrics for monitoring
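
For example, a sketch of polling the built-in metrics and forwarding a few counters to your metrics system (the log call stands in for whatever exporter you use):

stats := client.GetStats()
log.Printf("entries=%d bytes=%d rotations=%d errors=%d avg_write_ns=%d",
    stats.TotalEntries, stats.TotalBytes, stats.FileRotations,
    stats.ErrorCount, stats.WriteLatencyNano)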

func (*Client) Len

func (c *Client) Len(ctx context.Context, stream string) (int64, error)

Len returns the number of entries in a stream shard

func (*Client) Sync

func (c *Client) Sync(ctx context.Context) error

Sync ensures all buffered data is durably written to disk

type ClientMetrics

type ClientMetrics struct {
	TotalEntries       atomic.Uint64
	TotalBytes         atomic.Uint64
	TotalCompressed    atomic.Uint64
	WriteLatencyNano   atomic.Uint64
	MinWriteLatency    atomic.Uint64
	MaxWriteLatency    atomic.Uint64
	CompressionRatio   atomic.Uint64
	CompressedEntries  atomic.Uint64
	SkippedCompression atomic.Uint64
	TotalFiles         atomic.Uint64
	FileRotations      atomic.Uint64
	CheckpointCount    atomic.Uint64
	LastCheckpoint     atomic.Uint64
	ActiveReaders      atomic.Uint64
	ConsumerLag        atomic.Uint64
	ErrorCount         atomic.Uint64
	LastErrorNano      atomic.Uint64
	CompressionWait    atomic.Uint64 // Time waiting for compression
	IndexPersistErrors atomic.Uint64 // Failed index persistence attempts
}

ClientMetrics holds atomic counters for thread-safe metrics tracking

type CometConfig

type CometConfig struct {
	// Compression settings
	Compression CompressionConfig `json:"compression"`

	// Indexing settings
	Indexing IndexingConfig `json:"indexing"`

	// Storage settings
	Storage StorageConfig `json:"storage"`

	// Concurrency settings
	Concurrency ConcurrencyConfig `json:"concurrency"`

	// Retention policy
	Retention RetentionConfig `json:"retention"`
}

CometConfig holds configuration for comet client

func DefaultCometConfig

func DefaultCometConfig() CometConfig

DefaultCometConfig returns sensible defaults optimized for logging workloads

func HighCompressionConfig

func HighCompressionConfig() CometConfig

HighCompressionConfig returns a config optimized for high compression ratios. Suitable for text-heavy logs and structured data.

func HighThroughputConfig

func HighThroughputConfig() CometConfig

HighThroughputConfig returns a config optimized for high write throughput. Trades some memory for better performance.

func MultiProcessConfig

func MultiProcessConfig() CometConfig

MultiProcessConfig returns a config suitable for multi-process deployments. Enable this for prefork servers or when multiple processes write to the same stream.

type CometStats

type CometStats struct {
	// Write metrics
	TotalEntries     uint64 `json:"total_entries"`
	TotalBytes       uint64 `json:"total_bytes"`
	TotalCompressed  uint64 `json:"total_compressed_bytes"`
	WriteLatencyNano uint64 `json:"avg_write_latency_nano"`
	MinWriteLatency  uint64 `json:"min_write_latency_nano"`
	MaxWriteLatency  uint64 `json:"max_write_latency_nano"`

	// Compression metrics
	CompressionRatio   uint64 `json:"compression_ratio_x100"` // x100 for fixed point
	CompressedEntries  uint64 `json:"compressed_entries"`
	SkippedCompression uint64 `json:"skipped_compression"`

	// File management
	TotalFiles      uint64 `json:"total_files"`
	FileRotations   uint64 `json:"file_rotations"`
	CheckpointCount uint64 `json:"checkpoint_count"`
	LastCheckpoint  uint64 `json:"last_checkpoint_nano"`

	// Consumer metrics
	ActiveReaders uint64 `json:"active_readers"`
	ConsumerLag   uint64 `json:"max_consumer_lag_bytes"`

	// Error tracking
	ErrorCount         uint64 `json:"error_count"`
	LastErrorNano      uint64 `json:"last_error_nano"`
	IndexPersistErrors uint64 `json:"index_persist_errors"`

	// Compression metrics
	CompressionWaitNano uint64 `json:"compression_wait_nano"`
}

CometStats tracks key metrics for monitoring comet performance

type CompressedEntry

type CompressedEntry struct {
	Data           []byte
	OriginalSize   uint64
	CompressedSize uint64
	WasCompressed  bool
}

CompressedEntry represents a pre-compressed entry ready for writing

type CompressionConfig

type CompressionConfig struct {
	MinCompressSize int `json:"min_compress_size"` // Don't compress entries smaller than this
}

CompressionConfig controls compression behavior

type ConcurrencyConfig

type ConcurrencyConfig struct {
	EnableMultiProcessMode bool `json:"enable_file_locking"` // Enable file-based locking for multi-process safety
}

ConcurrencyConfig controls multi-process behavior

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer reads from comet stream shards. Fields ordered for optimal memory alignment.

func NewConsumer

func NewConsumer(client *Client, opts ConsumerOptions) *Consumer

NewConsumer creates a new consumer for comet streams

func (*Consumer) Ack

func (c *Consumer) Ack(ctx context.Context, messageIDs ...MessageID) error

Ack acknowledges one or more processed messages and updates consumer offset

func (*Consumer) AckRange

func (c *Consumer) AckRange(ctx context.Context, shardID uint32, fromEntry, toEntry int64) error

AckRange acknowledges all messages in a contiguous range for a shard. This is more efficient than individual acks for bulk processing.
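
For example, after reading a batch from a single shard, the whole contiguous range can be acknowledged at once (a sketch; assumes messages are returned in entry order):

messages, err := consumer.Read(ctx, []uint32{0}, 1000)
if err == nil && len(messages) > 0 {
    first := messages[0].ID.EntryNumber
    last := messages[len(messages)-1].ID.EntryNumber
    err = consumer.AckRange(ctx, 0, first, last)
}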

func (*Consumer) Close

func (c *Consumer) Close() error

Close closes the consumer and releases all resources

func (*Consumer) GetLag

func (c *Consumer) GetLag(ctx context.Context, shardID uint32) (int64, error)

GetLag returns how many entries behind this consumer group is
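
A sketch of periodic lag monitoring across all shards (the threshold and logging are illustrative):

for _, shardID := range comet.AllShardsRange(16) {
    lag, err := consumer.GetLag(ctx, shardID)
    if err != nil {
        continue
    }
    if lag > 100000 {
        log.Printf("shard %d is %d entries behind", shardID, lag)
    }
}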

func (*Consumer) GetShardStats

func (c *Consumer) GetShardStats(ctx context.Context, shardID uint32) (*StreamStats, error)

GetShardStats returns statistics for a specific shard

func (*Consumer) Process

func (c *Consumer) Process(ctx context.Context, handler ProcessFunc, opts ...ProcessOption) error

Process continuously reads and processes messages from shards. The simplest usage processes all discoverable shards:

err := consumer.Process(ctx, handleMessages)

With options:

err := consumer.Process(ctx, handleMessages,
    comet.WithStream("events:v1:shard:*"),
    comet.WithBatchSize(1000),
    comet.WithErrorHandler(logError),
)

For distributed processing:

err := consumer.Process(ctx, handleMessages,
    comet.WithStream("events:v1:shard:*"),
    comet.WithConsumerAssignment(workerID, totalWorkers),
)

func (*Consumer) Read

func (c *Consumer) Read(ctx context.Context, shards []uint32, count int) ([]StreamMessage, error)

Read reads up to count entries from the specified shards

func (*Consumer) ResetOffset

func (c *Consumer) ResetOffset(ctx context.Context, shardID uint32, entryNumber int64) error

ResetOffset sets the consumer offset to a specific entry number
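
For example, to replay a shard from the beginning for this consumer group (a sketch; assumes entry numbering starts at 0):

// Rewind shard 3 and reprocess whatever is still retained.
if err := consumer.ResetOffset(ctx, 3, 0); err != nil {
    log.Fatal(err)
}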

type ConsumerOptions

type ConsumerOptions struct {
	Group string
}

ConsumerOptions configures a consumer

type EntryIndexNode

type EntryIndexNode struct {
	EntryNumber int64         `json:"entry_number"` // Entry number this node covers
	Position    EntryPosition `json:"position"`     // Position in files
}

EntryIndexNode represents a node in the binary searchable index

type EntryPosition

type EntryPosition struct {
	ByteOffset int64 `json:"byte_offset"` // Byte position within that file (8 bytes)
	FileIndex  int   `json:"file_index"`  // Which file in Files array (8 bytes)
}

EntryPosition tracks where an entry is located. Fields ordered for optimal memory alignment.

type FileInfo

type FileInfo struct {
	// 64-bit aligned fields first
	StartOffset int64     `json:"start_offset"`
	EndOffset   int64     `json:"end_offset"`
	StartEntry  int64     `json:"start_entry"` // First entry number in this file
	Entries     int64     `json:"entries"`
	StartTime   time.Time `json:"start_time"`
	EndTime     time.Time `json:"end_time"`

	// String last (will use remaining space efficiently)
	Path string `json:"path"`
}

FileInfo tracks a single data file. Fields ordered for optimal memory alignment.

type IndexingConfig

type IndexingConfig struct {
	BoundaryInterval int `json:"boundary_interval"` // Store boundaries every N entries
	MaxIndexEntries  int `json:"max_index_entries"` // Max boundary entries per shard (0 = unlimited)
}

IndexingConfig controls indexing behavior

type MappedFile

type MappedFile struct {
	FileInfo // Embedded struct (already aligned)
	// contains filtered or unexported fields
}

MappedFile represents a memory-mapped data file with atomic data updates. Fields ordered for optimal memory alignment (embedded struct first).

type MessageID

type MessageID struct {
	EntryNumber int64  `json:"entry_number"`
	ShardID     uint32 `json:"shard_id"`
}

MessageID represents a structured message ID. Fields ordered for optimal memory alignment: int64 first, then uint32.

func ParseMessageID

func ParseMessageID(str string) (MessageID, error)

ParseMessageID parses a string ID back to MessageID

func (MessageID) String

func (id MessageID) String() string

String returns the string representation of the ID (ShardID-EntryNumber format)
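
A round trip between a MessageID and its string form (a sketch; the exact string layout is whatever String produces):

id := comet.MessageID{ShardID: 7, EntryNumber: 12345}
s := id.String()                    // "ShardID-EntryNumber" form
parsed, err := comet.ParseMessageID(s)
if err == nil && parsed == id {
    // IDs survive being stored or transmitted as strings
}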

type MmapCoordinationState

type MmapCoordinationState struct {
	// Core coordination fields
	ActiveFileIndex atomic.Int64 // Current file being written to
	WriteOffset     atomic.Int64 // Current write position in active file
	FileSize        atomic.Int64 // Current size of active file

	// Performance tracking
	LastWriteNanos atomic.Int64 // Timestamp of last write
	TotalWrites    atomic.Int64 // Total number of writes
	// contains filtered or unexported fields
}

MmapCoordinationState represents shared state for ultra-fast multi-process coordination

type MmapSharedState

type MmapSharedState struct {
	LastUpdateNanos int64 // Nanosecond timestamp of last index change
}

MmapSharedState provides instant multi-process coordination. Just 8 bytes - a timestamp indicating when the index was last modified.

type MmapWriter

type MmapWriter struct {
	// contains filtered or unexported fields
}

MmapWriter implements ultra-fast memory-mapped writes for multi-process mode

func NewMmapWriter

func NewMmapWriter(shardDir string, maxFileSize int64, index *ShardIndex, metrics *ClientMetrics, rotationLockFile *os.File) (*MmapWriter, error)

NewMmapWriter creates a new memory-mapped writer for a shard

func (*MmapWriter) Close

func (w *MmapWriter) Close() error

Close cleans up resources

func (*MmapWriter) CoordinationState

func (w *MmapWriter) CoordinationState() *MmapCoordinationState

CoordinationState returns the coordination state for external access

func (*MmapWriter) Sync

func (w *MmapWriter) Sync() error

Sync ensures data is persisted to disk

func (*MmapWriter) Write

func (w *MmapWriter) Write(entries [][]byte, entryNumbers []int64) error

Write appends entries using memory-mapped I/O

type ProcessFunc

type ProcessFunc func(messages []StreamMessage) error

ProcessFunc handles a batch of messages, returning error to trigger retry

type ProcessOption

type ProcessOption func(*processConfig)

ProcessOption configures the Process method

func WithAutoAck

func WithAutoAck(enabled bool) ProcessOption

WithAutoAck controls automatic acknowledgment (default: true)

func WithBatchCallback

func WithBatchCallback(callback func(size int, duration time.Duration)) ProcessOption

WithBatchCallback sets a callback after each batch completes

func WithBatchSize

func WithBatchSize(size int) ProcessOption

WithBatchSize sets the number of messages to read at once

func WithConsumerAssignment

func WithConsumerAssignment(id, total int) ProcessOption

WithConsumerAssignment configures distributed processing

func WithErrorHandler

func WithErrorHandler(handler func(err error, retryCount int)) ProcessOption

WithErrorHandler sets a callback for processing errors

func WithMaxRetries

func WithMaxRetries(retries int) ProcessOption

WithMaxRetries sets the number of retry attempts for failed batches

func WithPollInterval

func WithPollInterval(interval time.Duration) ProcessOption

WithPollInterval sets how long to wait when no messages are available

func WithRetryDelay

func WithRetryDelay(delay time.Duration) ProcessOption

WithRetryDelay sets the base delay between retries

func WithShards

func WithShards(shards ...uint32) ProcessOption

WithShards specifies explicit shards to process
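
For example, to pin a consumer to an explicit set of shards instead of pattern-based discovery (a sketch; handleMessages is your ProcessFunc):

err := consumer.Process(ctx, handleMessages,
    comet.WithShards(0, 1, 2, 3),
)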

func WithStream

func WithStream(pattern string) ProcessOption

WithStream specifies a stream pattern for shard discovery

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader provides memory-mapped read access to a shard. Fields ordered for optimal memory alignment.

func NewReader

func NewReader(shardID uint32, index *ShardIndex) (*Reader, error)

NewReader creates a new reader for a shard

func (*Reader) Close

func (r *Reader) Close() error

Close unmaps all files and cleans up resources

func (*Reader) ReadEntryAtPosition

func (r *Reader) ReadEntryAtPosition(pos EntryPosition) ([]byte, error)

ReadEntryAtPosition reads a single entry at the given position

type RetentionConfig

type RetentionConfig struct {
	// Time-based retention
	MaxAge time.Duration `json:"max_age"` // Delete files older than this

	// Size-based retention
	MaxTotalSize int64 `json:"max_total_size"` // Total size limit across all shards
	MaxShardSize int64 `json:"max_shard_size"` // Size limit per shard

	// Cleanup behavior
	CleanupInterval   time.Duration `json:"cleanup_interval"`   // How often to run cleanup
	MinFilesToKeep    int           `json:"min_files_to_keep"`  // Always keep at least N files
	ProtectUnconsumed bool          `json:"protect_unconsumed"` // Don't delete unread data
	ForceDeleteAfter  time.Duration `json:"force_delete_after"` // Delete even if unread after this time
}

RetentionConfig defines data retention policies

type RetentionStats

type RetentionStats struct {
	// 8-byte aligned fields first
	TotalSizeBytes int64         `json:"total_size_bytes"`
	TotalSizeGB    float64       `json:"total_size_gb"`
	MaxTotalSizeGB float64       `json:"max_total_size_gb"`
	RetentionAge   time.Duration `json:"retention_age"`
	TotalFiles     int           `json:"total_files"`

	// Pointer fields (8 bytes)
	ShardStats map[uint32]ShardRetentionStats `json:"shard_stats,omitempty"`

	// Larger composite types last (time.Time is 24 bytes)
	OldestData time.Time `json:"oldest_data"`
	NewestData time.Time `json:"newest_data"`
}

RetentionStats provides type-safe retention statistics. Fields ordered for optimal memory alignment.

type SequenceState

type SequenceState struct {
	LastEntryNumber int64 // Last allocated entry number
}

SequenceState is a memory-mapped structure for atomic entry number generation

type Shard

type Shard struct {
	// contains filtered or unexported fields
}

Shard represents a single stream shard with its own files. Fields ordered for optimal memory alignment (64-bit words first).

type ShardIndex

type ShardIndex struct {
	// 64-bit aligned fields first (8 bytes each)
	CurrentEntryNumber int64 `json:"current_entry_number"` // Entry-based tracking (not byte offsets!)
	CurrentWriteOffset int64 `json:"current_write_offset"` // Still track for file management

	// Maps (8 bytes pointer each)
	ConsumerOffsets map[string]int64 `json:"consumer_entry_offsets"` // Consumer tracking by entry number (not bytes!)

	// Composite types
	BinaryIndex BinarySearchableIndex `json:"binary_index"` // Binary searchable index for O(log n) lookups

	// Slices (24 bytes: ptr + len + cap)
	Files []FileInfo `json:"files"` // File management

	// Strings (24 bytes: ptr + len + cap)
	CurrentFile string `json:"current_file"`

	// Smaller fields last
	BoundaryInterval int `json:"boundary_interval"` // 8 bytes
}

ShardIndex tracks files and consumer offsets. Fixed to use entry-based addressing instead of byte offsets. Fields ordered for optimal memory alignment.

type ShardRetentionStats

type ShardRetentionStats struct {
	// 8-byte aligned fields first
	SizeBytes int64 `json:"size_bytes"`
	Files     int   `json:"files"`

	// Pointer field (8 bytes)
	ConsumerLag map[string]int64 `json:"consumer_lag,omitempty"`

	// Larger composite types last (time.Time is 24 bytes)
	OldestEntry time.Time `json:"oldest_entry"`
	NewestEntry time.Time `json:"newest_entry"`
}

ShardRetentionStats provides retention stats for a single shard. Fields ordered for optimal memory alignment.

type StorageConfig

type StorageConfig struct {
	MaxFileSize    int64 `json:"max_file_size"`      // Maximum size per file before rotation
	CheckpointTime int   `json:"checkpoint_time_ms"` // Checkpoint every N milliseconds
}

StorageConfig controls file storage behavior

type StreamMessage

type StreamMessage struct {
	Stream string    // Stream name/identifier
	ID     MessageID // Unique message ID
	Data   []byte    // Raw message data
}

StreamMessage represents a message read from a stream

type StreamStats

type StreamStats struct {
	// 64-bit aligned fields first
	TotalEntries int64
	TotalBytes   int64
	OldestEntry  time.Time
	NewestEntry  time.Time

	// Composite types
	ConsumerOffsets map[string]int64

	// Smaller fields last
	FileCount int    // 8 bytes
	ShardID   uint32 // 4 bytes

}

StreamStats returns statistics about a shard. Fields ordered for optimal memory alignment.

type WriteRequest

type WriteRequest struct {
	UpdateFunc   func() error // Function to update index state after successful write (8 bytes)
	WriteBuffers [][]byte     // Buffers to write (24 bytes)
	IDs          []MessageID  // Message IDs for the batch (24 bytes)
}

WriteRequest represents a batch write operation. Fields ordered for optimal memory alignment.

Directories

Path               Synopsis
cmd
    test_worker    command
