comet

package module
v1.3.8

Published: Aug 8, 2025 License: MIT Imports: 24 Imported by: 0

README

☄️ Comet

High-performance embedded segmented log for edge observability. Built for single-digit microsecond latency and bounded resources.

Architecture Guide | Performance Guide | Troubleshooting | Security | API Reference

[!NOTE] This is very much an experiment in vibe coding. While the ideas are sound and the test coverage is robust, you may want to keep that in mind before using it for now.

What is Comet?

Comet is a segmented append-only log optimized for observability data (metrics, logs, traces) at edge locations. It implements the same pattern as Kafka's storage engine - append-only segments with time/size-based retention - but embedded directly in your service with aggressive deletion policies for resource-constrained environments.

Each shard maintains a series of immutable segment files that are rotated at size boundaries and deleted based on retention policies, ensuring predictable resource usage without the complexity of circular buffers or in-place overwrites.

Comet requires local filesystems (ext4, xfs, etc.) for its microsecond latency guarantees. It is unapologetically local. If you need distributed storage, use a proper distributed system like NATS JetStream or Kafka instead.

The Edge Storage Problem

Edge deployments need local observability buffering, but other solutions fall short:

  • Kafka: Requires clusters, complex ops, ~1-5ms latency
  • RocksDB: Single-threaded writes, 50-200μs write latency, 100ms+ stalls during compaction
  • Redis: Requires separate server, memory-only without persistence config
  • Ring buffers: No persistence, no compression, data loss on overflow
  • Files + rotation: No indexing, no consumer tracking, manual everything

The gap: No embedded solution with Kafka's reliability at microsecond latencies.

Features

  • Ultra-low latency: microsecond writes, up to 12,634,200 entries/sec with batching (2.4M ops/sec with optimal sharding)
    • Comet uses periodic checkpoints (default: every 1000 writes or 1 second) to persist data to disk. Between checkpoints, writes are acknowledged after being written to the OS page cache.
  • Predictable performance: No compaction stalls or write amplification like LSM-trees
  • True multi-process support: Hybrid coordination (mmap + file locks), crash-safe rotation, real OS processes
  • O(log n) lookups: Binary searchable index with bounded memory usage
  • Lock-free reads: Atomic pointers, zero-copy via mmap with memory safety
  • Automatic retention: Time and size-based cleanup, protects unconsumed data
  • Production ready: Crash recovery, built-in metrics, extensive testing
  • Smart sharding: Consistent hashing, automatic discovery, batch optimizations
  • Optional zstd compression: ~37% storage savings when needed

Multi-Process Coordination

Unlike other embedded solutions, Comet enables true multi-process coordination through memory-mapped state files. Perfect for prefork web servers and multi-process deployments.

  • Automatic shard ownership - Each process owns specific shards based on shardID % processCount == processID
  • Per-shard state files - Each shard has its own comet.state for metrics and recovery
  • Memory-mapped coordination - Lock-free operations through atomic memory access
  • Crash-safe design - State files enable automatic recovery on restart
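
The ownership rule above is just a modulo over the shard ID. A minimal sketch of the check (Comet applies this internally, and ConcurrencyConfig.Owns exposes the same logic):

// Illustrative only - Comet performs this check for you.
func ownsShard(shardID uint32, processID, processCount int) bool {
    if processCount <= 1 {
        return true // single-process mode owns every shard
    }
    return shardID%uint32(processCount) == uint32(processID)
}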

How Does Comet Compare?

| Feature | Comet | Kafka | Redis Streams | RocksDB | Proof |
|---|---|---|---|---|---|
| Write Latency | 1.7μs (33μs multi-process) | 1-5ms | 50-100μs | 50-200μs | Benchmarks |
| Multi-Process | ✅ Real OS processes | ✅ Distributed | ❌ Single process | ⚠️ Mutex locks | Tests |
| Resource Bounds | ✅ Time & size limits | ⚠️ JVM heap | ⚠️ Memory only | ⚠️ Manual compact | Retention |
| Crash Recovery | ✅ Automatic | ✅ Replicas | ⚠️ AOF/RDB | ✅ WAL | Recovery |
| Zero Copy Reads | ✅ mmap | ❌ Network | ❌ Serialization | ❌ Deserialization | Reader |
| Storage Overhead | ~12 bytes/entry | ~50 bytes/entry | ~20 bytes/entry | ~30 bytes/entry | Format |
| Sharding | ✅ Built-in | ✅ Partitions | ❌ Manual | ❌ Manual | Client |
| Compression | ✅ Optional zstd | ✅ Multiple codecs | ❌ None | ✅ Multiple | Config |
| Embedded | ✅ Native | ❌ Requires cluster | ❌ Requires server | ✅ Native | - |

Quick Start

The Easy Way™

Step 1: Create a client

client, err := comet.NewClient("/var/lib/comet")
if err != nil {
    log.Fatal(err)
}
defer client.Close()

Step 2: Write your events

// Pick which shard to write to based on a key (for consistent routing)
// High-cardinality keys (e.g. a UUID) are recommended for even distribution across shards
stream := client.PickShardStream("events:v1", event.ID, 256)
// This returns something like "events:v1:shard:00A7" based on hash(event.ID) % 256

ids, err := client.Append(ctx, stream, [][]byte{
    []byte(event.ToJSON()),
})

Step 3: Process events

consumer := comet.NewConsumer(client, comet.ConsumerOptions{
    Group: "my-processor",
})

// Process() is the main API - it handles everything for you!
// By default, it discovers and processes ALL shards automatically
err = consumer.Process(ctx, func(ctx context.Context, messages []comet.StreamMessage) error {
    for _, msg := range messages {
        processEvent(msg.Data)  // Your logic here
    }
    return nil  // Success = automatic progress tracking
})
That's it! Comet handles:
  • Compression - Large events compressed automatically
  • Sharding - Load distributed across your shards (256 in the example above)
  • Retries - Failed batches retry automatically
  • Progress - Consumer offsets tracked per shard
  • Cleanup - Old data deleted automatically
  • Recovery - Crash? Picks up where it left off
Want more control? Scale horizontally:
// Deploy this same code across 3 processes:
err = consumer.Process(ctx, processEvents,
    comet.WithStream("events:v1:shard:*"),
    comet.WithConsumerAssignment(workerID, numWorkers),  // This worker + total count
)
// Each worker processes different shards automatically!
// No coordination needed - Comet handles it
Production-Ready Example
// Define your processing function with context support
processEvents := func(ctx context.Context, messages []comet.StreamMessage) error {
    for _, msg := range messages {
        // Check for cancellation
        if ctx.Err() != nil {
            return ctx.Err()
        }
        // Process each message
        if err := handleEvent(msg.Data); err != nil {
            return err // Will trigger retry
        }
    }
    return nil
}

err = consumer.Process(ctx, processEvents,
    comet.WithStream("events:v1:shard:*"),
    comet.WithBatchSize(1000),
    comet.WithPollInterval(50 * time.Millisecond),

    // Optional: Add observability
    comet.WithErrorHandler(func(err error, retryCount int) {
        metrics.Increment("comet.errors", 1)
        log.Printf("Retry %d: %v", retryCount, err)
    }),
    comet.WithBatchCallback(func(size int, duration time.Duration) {
        metrics.Histogram("comet.batch.size", float64(size))
        metrics.Histogram("comet.batch.duration_ms", duration.Milliseconds())
    }),
)
Need to tweak something?
// Only override what you need:
config := comet.DefaultCometConfig()
config.Retention.MaxAge = 24 * time.Hour  // Keep data longer
client, err := comet.NewClient("/var/lib/comet", config)
Configuration Structure
type CometConfig struct {
    Compression CompressionConfig  // Controls compression behavior
    Indexing    IndexingConfig     // Controls indexing and lookup
    Storage     StorageConfig      // Controls file storage
    Concurrency ConcurrencyConfig  // Controls multi-process behavior
    Retention   RetentionConfig    // Controls data retention
}
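
If you would rather start from a preset than tune individual fields, the package also ships ready-made configurations (DefaultCometConfig, HighThroughputConfig, HighCompressionConfig, OptimizedConfig). A minimal sketch, with illustrative values:

// Start from a preset, then override only what you need.
config := comet.HighThroughputConfig()      // tuned for write throughput
// config := comet.HighCompressionConfig()  // or: tuned for compression ratio
config.Retention.MaxAge = 4 * time.Hour     // illustrative override

client, err := comet.NewClient("/var/lib/comet", config)
if err != nil {
    log.Fatal(err)
}
defer client.Close()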

Architecture

┌─────────────────┐     ┌─────────────────┐
│   Your Service  │     │  Your Service   │
│    Process 0    │     │    Process 1    │
│  ┌───────────┐  │     │  ┌───────────┐  │
│  │   Comet   │  │     │  │   Comet   │  │
│  │  Client   │  │     │  │  Client   │  │
│  └─────┬─────┘  │     │  └─────┬─────┘  │
└────────┼────────┘     └────────┼────────┘
         │                       │
         ▼                       ▼
    ┌──────────────────────────────────┐
    │      Segmented Log Storage       │
    │                                  │
    │  Shard 0: [seg0][seg1][seg2]→    │ ← Process 0
    │           [comet.state]          │
    │  Shard 1: [seg0][seg1]→          │ ← Process 1
    │           [comet.state]          │
    │  Shard 2: [seg0][seg1][seg2]→    │ ← Process 0
    │           [comet.state]          │
    │  Shard 3: [seg0][seg1]→          │ ← Process 1
    │           [comet.state]          │
    │  ...                             │
    │                                  │
    │  ↓ segments deleted by retention │
    └──────────────────────────────────┘

Performance Optimizations

Comet achieves microsecond-level latency through careful optimization:

  1. Lock-Free Reads: Memory-mapped files with atomic pointers and defensive copying for memory safety
  2. Binary Searchable Index: O(log n) entry lookups instead of linear scans
  3. Vectored I/O: Batches multiple writes into single syscalls
  4. Batch ACKs: Groups acknowledgments by shard to minimize lock acquisitions
  5. Pre-allocated Buffers: Reuses buffers to minimize allocations
  6. Concurrent Shards: Each shard has independent locks for parallel operations
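
As an illustration of the binary searchable index (point 2), a lookup over sorted index nodes comes down to a standard binary search. This is a sketch of the idea using the exported comet.EntryIndexNode type, not Comet's internal code (the real lookup is exposed as BinarySearchableIndex.FindEntryPosition):

// findNearestNode returns the position of the closest indexed entry at or
// before target, so a reader only has to scan forward from that point.
func findNearestNode(nodes []comet.EntryIndexNode, target int64) (comet.EntryPosition, bool) {
    // nodes are sorted by EntryNumber; find the first node past the target.
    i := sort.Search(len(nodes), func(j int) bool {
        return nodes[j].EntryNumber > target
    })
    if i == 0 {
        return comet.EntryPosition{}, false // target precedes the first indexed entry
    }
    return nodes[i-1].Position, true
}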

How It Works

  1. Append-Only Segments: Data is written to segment files that grow up to MaxFileSize
  2. Segment Rotation: When a segment reaches max size, it's closed and a new one starts
  3. Binary Index: Entry locations are indexed for O(log n) lookups with bounded memory
  4. Retention: Old segments are deleted based on age (MaxAge) or total size limits
  5. Sharding: Load is distributed across multiple independent segmented logs
  6. Index Limits: Index memory is capped by MaxIndexEntries - older entries are pruned
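
The knobs behind these steps live in the configuration. A minimal sketch of the relevant fields (values are illustrative, not recommendations):

config := comet.DefaultCometConfig()
config.Storage.MaxFileSize = 64 << 20    // rotate segments at 64MB
config.Retention.MaxAge = 6 * time.Hour  // delete segments older than 6 hours
config.Retention.MaxShardSize = 1 << 30  // cap each shard at 1GB on disk
config.Indexing.MaxIndexEntries = 10000  // bound index memory per shard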

Resource Usage

With default settings:

  • Memory: ~2MB per shard (10k index entries × ~200 bytes/entry)
  • Disk: Bounded by retention policy (MaxShardSize × shard count)
  • CPU: Minimal - compression happens outside locks

Example: 16 shards × 2MB index = 32MB memory for indexes
Example: 16 shards × 1GB/shard = 16GB max disk usage

Sharding

Comet uses deterministic sharding for load distribution. Here's how it works:

Writing
// The key (event ID, user ID, tenant, etc.) determines which shard gets the data
stream := client.PickShardStream("events:v1", event.ID, 256)
// Returns "events:v1:shard:0007" (hash("user-123") % 16 = 7)

// The key is ONLY used for routing - it's not stored anywhere!
client.Append(ctx, stream, data)
Reading
// Process ALL shards (recommended)
err = consumer.Process(ctx, handler,
    comet.WithStream("events:v1:shard:*"))  // The * wildcard finds all shards

// Process specific shards only
err = consumer.Process(ctx, handler,
    comet.WithShards(0, 1, 2))  // Only process shards 0, 1, and 2

// Process with advanced options
err = consumer.Process(ctx, handler,
    comet.WithStream("events:v1:shard:*"),
    comet.WithBatchSize(1000),
    comet.WithConsumerAssignment(workerID, 3))  // This is worker 'workerID' of 3 total workers

Use Cases

Right tool for:

  • Edge deployments with limited storage
  • High-frequency observability data (metrics, logs, traces)
  • Recent data access patterns (debugging last N hours)
  • Local buffering before shipping to cloud
  • Multi-service nodes requiring predictable resource usage

Not for:

  • Network filesystems (NFS, CIFS, etc.)
  • Long-term storage (use S3/GCS)
  • Transactional data requiring ACID
  • Random access patterns
  • Complex queries or aggregations

Performance notes

Durability Note: Comet uses periodic checkpoints (default: every 1000 writes or 1 second) to persist metadata to disk. Between checkpoints, writes are acknowledged after being written to the OS page cache. This provides excellent performance while maintaining durability through:

  • OS page cache (typically synced within 30 seconds)
  • Explicit fsync on file rotation
  • Crash recovery that rebuilds state from data files
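
When you need durability at a specific point (for example during graceful shutdown), you can force a flush explicitly. A minimal sketch, assuming the client and consumer from the Quick Start:

// Force durability at a known point (e.g. graceful shutdown).
if err := client.Sync(ctx); err != nil { // durably write buffered data
    log.Printf("sync failed: %v", err)
}
if err := consumer.FlushACKs(ctx); err != nil { // persist consumer offsets
    log.Printf("flushing ACKs failed: %v", err)
}
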
Performance Metrics
  • ACK performance: 201ns per ACK (5M ACKs/sec) for single ACKs
  • Memory efficiency: 7 allocations per write batch, 4 allocations per ACK
  • Storage overhead: 12 bytes per entry (4-byte length + 8-byte timestamp)
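
These metrics are exposed at runtime through GetStats and Health. A small monitoring sketch (metrics.Gauge is the same hypothetical metrics helper used in the examples above):

// Periodically export client metrics to your monitoring system.
stats := client.GetStats()
metrics.Gauge("comet.total_entries", float64(stats.TotalEntries))
metrics.Gauge("comet.write_rate_per_sec", stats.WriteRate)
metrics.Gauge("comet.error_rate_per_sec", stats.ErrorRate)

if h := client.Health(); !h.Healthy {
    log.Printf("comet unhealthy: %s (%s)", h.Status, h.Details)
}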

Configuration

Single-Process vs Multi-Process Mode

Comet supports both single-process and multi-process deployments with a unified state management system:

// Single-process mode (default) - fastest performance
client, err := comet.NewClient("/data/streams")

// Multi-process mode - for prefork/multi-process deployments
client, err := comet.NewMultiProcessClient("/data/streams")

How multi-process mode works:

  • Each process owns specific shards based on: shardID % processCount == processID
  • Processes automatically coordinate through memory-mapped state files
  • No configuration needed - just use NewMultiProcessClient()
  • Automatic process ID assignment from a pool

When to use multi-process mode:

  • Process isolation is critical
  • Using prefork web servers (e.g., Go Fiber with prefork)
  • Need independent process scaling
  • You're already batching writes (reduces the latency impact)

When to use single-process mode (default):

  • Single service deployment
  • Don't need process-level isolation
Sharding Configuration and Performance

Comet uses sharding to enable parallel writes and horizontal scaling. Based on extensive benchmarking, here are the optimal configurations:

Performance Benchmarks

With a 3GB memory budget and 16 concurrent threads on 2023 MacBook Air w/ Apple M2 chip:

| Configuration | Ops/sec | Latency | Performance vs Default |
|---|---|---|---|
| 1 shard × 1GB | 748K | 1.0μs | Baseline |
| 4 shards × 768MB (default) | 912K | 1.0μs | - |
| 16 shards × 192MB | 1.3M | <1μs | +45% |
| 64 shards × 48MB | 1.5M | <1μs | +65% |
| 256 shards × 10MB | 2.4M | <1μs | +168% |
| 1024 shards × 4MB | 7.8K | 127μs | -91% (degradation) |
Multi-Process Performance

With 256 shards and varying process counts on 2023 MacBook Air w/ Apple M2 chip:

| Processes | Shards/Process | Ops/sec | Performance Gain |
|---|---|---|---|
| 1 | 256 | 1.3M | Baseline |
| 4 | 64 | 5.3M | 4x |
| 8 | 32 | 6.2M | 4.8x |
| 16 | 16 | 6.4M | 4.9x |

For single-process deployments:

// Option 1: Use the OptimizedConfig helper
config := comet.OptimizedConfig(256, 3072)  // 256 shards with 3GB memory budget
client, err := comet.NewClient("/data", config)

// Option 2: Manual configuration
config := comet.DefaultCometConfig()
config.Storage.MaxFileSize = 10 << 20  // 10MB

// Use 256 shards when creating streams
stream := client.PickShardStream("events:v1", event.ID, 256)

For multi-process deployments:

// 4-8 processes with 256 total shards works best
config := comet.MultiProcessConfig()
config.Storage.MaxFileSize = 10 << 20  // 10MB
config.Concurrency.ProcessCount = 4    // 64 shards per process
Sharding Best Practices
  1. Choose shard count upfront: Changing shard count requires data migration
  2. Use high-cardinality keys: UUIDs, user IDs, or request IDs for even distribution
  3. Powers of 2: Use 16, 64, 256 shards for optimal hash distribution
  4. File size scaling: Smaller files (10-50MB) work better with many shards
  5. Memory budget: Plan for memory mapped capacity for optimal performance
How Sharding Works
// Helper functions for shard management
stream := client.PickShardStream("events:v1", uniqueKey, 256) // One-liner
shardID := client.PickShard(uniqueKey, 256)                   // Get shard ID
streamName := comet.ShardStreamName("events:v1", shardID)     // e.g. "events:v1:shard:00FF"

// In multi-process mode, PickShard returns only shards owned by this process
// In single-process mode, returns any shard from 0 to shardCount-1

// Get all shards for parallel processing
shardIDs := comet.AllShardsRange(256)                   // [0, 1, ..., 255]
streams := comet.AllShardStreams("events", "v1", 256)   // All stream names

// Consumers automatically discover all shards
consumer.Process(ctx, handler,
    comet.WithStream("events:v1:shard:*"))  // Wildcard pattern

License

MIT

Documentation

Overview

Example (CustomLogger)
package main

import (
	"log/slog"
	"os"
	"path/filepath"

	"github.com/orbiterhq/comet"
)

func main() {
	// Example: Using the default logger (writes to stderr)
	config := comet.DefaultCometConfig()
	config.Log.Level = "info" // Options: debug, info, warn, error, none

	// Example: Using slog
	slogger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
		Level: slog.LevelDebug,
	}))
	config.Log.Logger = comet.NewSlogAdapter(slogger)

	// Example: Disabling logs entirely
	config.Log.Logger = comet.NoOpLogger{}

	// Create client with configured logging
	client, err := comet.NewClient(filepath.Join(os.TempDir(), "logger"), config)
	if err != nil {
		panic(err)
	}
	defer client.Close()
}

Index

Examples

Constants

View Source
const (
	// ConsumerOffsetMagic identifies a valid consumer offset file
	ConsumerOffsetMagic = 0xC0FE0FF5

	// ConsumerOffsetVersion for format changes
	ConsumerOffsetVersion = 1

	// MaxConsumerGroups supported per shard (must be power of 2 for efficient hashing)
	MaxConsumerGroups = 512

	// ConsumerGroupNameSize maximum bytes for group name
	ConsumerGroupNameSize = 48

	// ConsumerOffsetFileSize total size of mmap file
	ConsumerOffsetFileSize = 64 + (MaxConsumerGroups * 128) // 64KB + 64 bytes header
)
View Source
const (
	CometStateVersion1 = 1
	CometStateSize     = 1024
)

Constants for the unified state

View Source
const MaxShardCount = uint32(256)

MaxShardCount is the maximum number of shards supported (0-255)

Variables

View Source
var ErrStopProcessing = errors.New("stop processing")

ErrStopProcessing is a sentinel error that can be returned from ProcessFunc to gracefully stop processing while still ACKing the current batch
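
A minimal sketch of returning it from a ProcessFunc (consumer setup assumed; isShutdownMarker and handle are hypothetical application helpers):

err := consumer.Process(ctx, func(ctx context.Context, msgs []comet.StreamMessage) error {
    for _, msg := range msgs {
        if isShutdownMarker(msg.Data) {
            return comet.ErrStopProcessing // current batch is still ACKed
        }
        handle(msg.Data)
    }
    return nil
})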

Functions

func AllShardStreams

func AllShardStreams(prefix string, shardCount uint32) []string

AllShardStreams returns all stream names for the given prefix and shard count

func AllShardsRange

func AllShardsRange(shardCount uint32) []uint32

AllShardsRange returns a slice containing all shard IDs from 0 to shardCount-1

func GetProcessID added in v1.3.0

func GetProcessID(shmFile ...string) int

GetProcessID returns a unique process ID (0 to N-1) for multi-process deployments. It uses a shared memory file to coordinate process ID assignment across multiple processes that may start in any order.

This is useful for deployments where you don't have explicit control over process startup order, such as:

  • systemd with multiple service instances
  • Container orchestration (Kubernetes, Docker Swarm)
  • Process managers (PM2, Supervisor)
  • Manual process spawning

The function will return:

  • 0 to (NumCPU-1): Successfully acquired process ID
  • -1: Failed to acquire a process ID (all slots taken or error)

Example usage:

processID := comet.GetProcessID()
if processID < 0 {
    log.Fatal("Failed to acquire process ID")
}
config := comet.MultiProcessConfig(processID, runtime.NumCPU())
client, err := comet.NewClient(dataDir, config)

func GetProcessIDWithFile added in v1.3.0

func GetProcessIDWithFile(shmFile string) int

GetProcessIDWithFile is like GetProcessID but allows specifying a custom shared memory file. This is useful when running multiple independent Comet deployments on the same machine.

func IsDebug added in v1.1.1

func IsDebug() bool

IsDebug returns whether debug mode is enabled (thread-safe)

func ReleaseProcessID added in v1.3.0

func ReleaseProcessID(shmFile_ ...string)

ReleaseProcessID releases the process ID when shutting down gracefully. This is optional but helps with faster slot reuse.

func SetDebug added in v1.1.1

func SetDebug(enabled bool)

SetDebug allows runtime control of debug mode

func ShardStreamName

func ShardStreamName(prefix string, shardID uint32) string

ShardStreamName constructs a stream name from prefix and shard ID. Example: ShardStreamName("events:v1", 255) returns "events:v1:shard:00FF"

Types

type AtomicSlice

type AtomicSlice struct {
	// contains filtered or unexported fields
}

AtomicSlice provides atomic access to a byte slice using atomic.Value

func (*AtomicSlice) Load

func (a *AtomicSlice) Load() []byte

Load atomically loads the slice

func (*AtomicSlice) Store

func (a *AtomicSlice) Store(data []byte)

Store atomically stores a new slice

type BinarySearchableIndex

type BinarySearchableIndex struct {
	// Sorted slice of index nodes for binary search
	Nodes []EntryIndexNode `json:"nodes"`
	// Interval between indexed entries (default: 1000)
	IndexInterval int `json:"index_interval"`
	// Maximum number of nodes to keep (0 = unlimited)
	MaxNodes int `json:"max_nodes"`
}

BinarySearchableIndex provides O(log n) entry lookups

func (*BinarySearchableIndex) AddIndexNode

func (idx *BinarySearchableIndex) AddIndexNode(entryNumber int64, position EntryPosition)

AddIndexNode adds a node to the binary searchable index

func (*BinarySearchableIndex) FindEntryPosition added in v1.3.0

func (idx *BinarySearchableIndex) FindEntryPosition(targetEntry int64) (EntryPosition, bool)

FindEntryPosition uses binary search to find the position of an entry

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client implements a local file-based stream client with append-only storage

func NewClient

func NewClient(dataDir string, config ...CometConfig) (*Client, error)

NewClient creates a new comet client with custom configuration

func NewMultiProcessClient added in v1.3.0

func NewMultiProcessClient(dataDir string, cfg ...CometConfig) (*Client, error)

NewMultiProcessClient creates a new comet client with automatic multi-process coordination. It uses the default shared memory file for process ID coordination. The process ID is automatically released when the client is closed.

Example usage:

client, err := comet.NewMultiProcessClient("./data")
if err != nil {
    log.Fatal(err)
}
defer client.Close() // Automatically releases process ID

func (*Client) Append

func (c *Client) Append(ctx context.Context, stream string, entries [][]byte) ([]MessageID, error)

Append adds entries to a stream shard (append-only semantics)

func (*Client) Close

func (c *Client) Close() error

Close gracefully shuts down the client

func (*Client) ForceRetentionCleanup

func (c *Client) ForceRetentionCleanup()

ForceRetentionCleanup forces an immediate retention cleanup pass. This is primarily useful for testing retention behavior.

func (*Client) GetRetentionStats

func (c *Client) GetRetentionStats() *RetentionStats

GetRetentionStats returns current retention statistics

func (*Client) GetShardDiagnostics added in v1.3.8

func (c *Client) GetShardDiagnostics(shardID uint32) *ShardDiagnostics

GetShardDiagnostics returns detailed diagnostics for a specific shard

func (*Client) GetShardStats added in v1.3.0

func (c *Client) GetShardStats(shardID uint32) (map[string]any, error)

GetShardStats returns detailed statistics for a specific shard

func (*Client) GetStats

func (c *Client) GetStats() CometStats

GetStats returns current metrics for monitoring

func (*Client) GetUnhealthyShards added in v1.3.8

func (c *Client) GetUnhealthyShards() []ShardDiagnostics

GetUnhealthyShards returns diagnostics for all unhealthy shards

func (*Client) Health added in v1.1.1

func (c *Client) Health() CometHealth

Health returns basic health status

func (*Client) Len

func (c *Client) Len(ctx context.Context, stream string) (int64, error)

Len returns the number of entries in a stream shard

func (*Client) ListRecent added in v1.2.0

func (c *Client) ListRecent(ctx context.Context, streamName string, limit int) ([]StreamMessage, error)

ListRecent returns the N most recent messages from a stream. This is a browse operation that doesn't affect consumer offsets.
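
A minimal usage sketch (client setup assumed):

// Peek at the 10 most recent messages without touching consumer offsets.
msgs, err := client.ListRecent(ctx, "events:v1:shard:0001", 10)
if err != nil {
    log.Fatal(err)
}
for _, msg := range msgs {
    fmt.Printf("%s\n", msg.Data)
}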

func (*Client) PickShard added in v1.3.1

func (c *Client) PickShard(key string, shardCount uint32) uint32

PickShard selects a shard ID based on consistent hashing of the key In multi-process mode, returns a shard owned by this process

func (*Client) PickShardStream added in v1.3.1

func (c *Client) PickShardStream(prefix string, key string, shardCount uint32) string

PickShardStream returns a complete stream name for the shard picked by key. Example: client.PickShardStream("events:v1", "user123", 256) returns "events:v1:0255". In multi-process mode, this will only pick from shards owned by this client.

func (*Client) ScanAll added in v1.2.0

func (c *Client) ScanAll(ctx context.Context, streamName string, fn func(context.Context, StreamMessage) bool) error

ScanAll scans all entries in a stream, calling fn for each message. Return false from fn to stop scanning early. This operation bypasses consumer groups and doesn't affect offsets.
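
A minimal usage sketch (client setup assumed):

// Count every entry in a shard without affecting any consumer group.
var count int
err := client.ScanAll(ctx, "events:v1:shard:0001", func(ctx context.Context, msg comet.StreamMessage) bool {
    count++
    return true // return false to stop scanning early
})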

func (*Client) Sync

func (c *Client) Sync(ctx context.Context) error

Sync ensures all buffered data is durably written to disk

type ClientMetrics

type ClientMetrics struct {
	// Write metrics
	WritesTotal        atomic.Uint64
	BytesWritten       atomic.Uint64
	WriteErrors        atomic.Uint64
	WriteLatencyNanos  atomic.Uint64
	CompressionSaves   atomic.Uint64
	CompressionSkipped atomic.Uint64

	// Read metrics
	ReadsTotal       atomic.Uint64
	BytesRead        atomic.Uint64
	ReadErrors       atomic.Uint64
	ConsumerLagTotal atomic.Uint64

	// File metrics
	FileRotations atomic.Uint64
	FilesCreated  atomic.Uint64

	// Additional metrics
	LastErrorNano      atomic.Int64
	TotalEntries       atomic.Uint64
	FilesDeleted       atomic.Uint64
	CheckpointsWritten atomic.Uint64

	// Process coordination metrics
	ShardConflicts    atomic.Uint64
	LockWaitNanos     atomic.Uint64
	RecoveryAttempts  atomic.Uint64
	RecoverySuccesses atomic.Uint64

	// Index persistence errors
	IndexPersistErrors atomic.Uint64
	ErrorCount         atomic.Uint64

	// Consumer metrics
	AckCount         atomic.Uint64
	ActiveConsumers  atomic.Uint64
	ConsumerTimeouts atomic.Uint64
	ConsumerResets   atomic.Uint64

	// Retention metrics
	RetentionRuns    atomic.Uint64
	FilesCleanedUp   atomic.Uint64
	BytesCleanedUp   atomic.Uint64
	RetentionErrors  atomic.Uint64
	RetentionSkipped atomic.Uint64
}

ClientMetrics tracks global client metrics

type CometConfig

type CometConfig struct {
	// Write mode
	WriteMode WriteMode `json:"write_mode"`

	// Compression settings
	Compression CompressionConfig `json:"compression"`

	// Indexing settings
	Indexing IndexingConfig `json:"indexing"`

	// Storage settings
	Storage StorageConfig `json:"storage"`

	// Concurrency settings
	Concurrency ConcurrencyConfig `json:"concurrency"`

	// Retention policy
	Retention RetentionConfig `json:"retention"`
	// Logging configuration
	Log LogConfig `json:"log"`

	// Reader settings
	Reader ReaderConfig `json:"reader"`
}

CometConfig represents the complete comet configuration

func DefaultCometConfig

func DefaultCometConfig() CometConfig

DefaultCometConfig returns sensible defaults optimized for logging workloads

func DeprecatedMultiProcessConfig added in v1.3.0

func DeprecatedMultiProcessConfig(processID, processCount int) CometConfig

DeprecatedMultiProcessConfig creates a config for multi-process mode with N processes

func HighCompressionConfig

func HighCompressionConfig() CometConfig

HighCompressionConfig returns a config optimized for compression ratio

func HighThroughputConfig

func HighThroughputConfig() CometConfig

HighThroughputConfig returns a config optimized for write throughput

func MultiProcessConfig

func MultiProcessConfig(sharedMemoryFile ...string) CometConfig

MultiProcessConfig returns a configuration optimized for multi-process deployments. It automatically acquires a unique process ID and configures the client for multi-process mode. The process ID is automatically released when the client is closed.

func OptimizedConfig added in v1.3.0

func OptimizedConfig(shardCount int, memoryBudget int) CometConfig

OptimizedConfig returns a configuration optimized for a specific number of shards. Based on benchmarking, it adjusts file size and other parameters for optimal performance.

Recommended shard counts:

  • 16 shards: Good for small deployments
  • 64 shards: Balanced performance
  • 256 shards: Optimal for high throughput (2.4M ops/sec)

Example:

config := comet.OptimizedConfig(256, 3072) // For 256 shards with 3GB memory budget
client, err := comet.NewClient("/data", config)
Example

ExampleOptimizedConfig demonstrates using the optimized configuration

package main

import (
	"os"
	"path/filepath"

	"github.com/orbiterhq/comet"
)

func main() {
	// Create a client optimized for 256 shards with 3GB memory budget
	cfg := comet.OptimizedConfig(256, 3072)
	client, err := comet.NewClient(filepath.Join(os.TempDir(), "comet"), cfg)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// Use 256 shards when writing
	stream := client.PickShardStream("events:v1", "user-123", 256)
	// Write to the stream...
	_ = stream
}

type CometHealth added in v1.3.0

type CometHealth struct {
	Healthy       bool      `json:"healthy"`
	Status        string    `json:"status"`
	ActiveShards  int       `json:"active_shards"`
	Details       string    `json:"details"`
	Uptime        int64     `json:"uptime_seconds"`
	WritesOK      bool      `json:"writes_ok"`
	ReadsOK       bool      `json:"reads_ok"`
	LastWriteTime time.Time `json:"last_write_time"`
	ErrorCount    int64     `json:"error_count"`
}

CometHealth represents the health status of the client

type CometState added in v1.1.1

type CometState struct {
	// ======== Header (0-63): Version and core state ========
	Version         uint64 // 0-7:   Format version (start with 1)
	WriteOffset     uint64 // 8-15:  Current write position in active file
	LastEntryNumber int64  // 16-23: Sequence counter for entry IDs
	LastIndexUpdate int64  // 24-31: Timestamp of last index modification

	LastFileSequence uint64 // 48-55: Sequence counter for file naming

	TotalBytes       uint64 // 72-79:  Total uncompressed bytes
	TotalWrites      uint64 // 80-87:  Total write operations
	LastWriteNanos   int64  // 88-95:  Timestamp of last write
	CurrentBatchSize uint64 // 96-103: Current batch being built
	TotalBatches     uint64 // 104-111: Total batches written
	FailedWrites     uint64 // 112-119: Write failures

	// ======== Cache Line 2 (128-191): Compression metrics ========
	TotalCompressed      uint64 // 128-135: Total compressed bytes
	CompressedEntries    uint64 // 136-143: Number of compressed entries
	SkippedCompression   uint64 // 144-151: Entries too small to compress
	CompressionRatio     uint64 // 152-159: Average ratio * 100
	CompressionTimeNanos int64  // 160-167: Total time compressing

	// ======== Cache Line 3 (192-255): Latency metrics ========
	WriteLatencySum   uint64 // 192-199: Sum for averaging
	WriteLatencyCount uint64 // 200-207: Count for averaging
	MinWriteLatency   uint64 // 208-215: Minimum seen
	MaxWriteLatency   uint64 // 216-223: Maximum seen
	SyncLatencyNanos  int64  // 224-231: Time spent in fsync

	// ======== Cache Line 4 (256-319): File operation metrics ========
	FilesCreated      uint64 // 256-263: Total files created
	FilesDeleted      uint64 // 264-271: Files removed by retention
	FileRotations     uint64 // 272-279: Successful rotations
	RotationTimeNanos int64  // 280-287: Time spent rotating
	CurrentFiles      uint64 // 288-295: Current file count
	TotalFileBytes    uint64 // 296-303: Total size on disk
	FailedRotations   uint64 // 304-311: Rotation failures
	SyncCount         uint64 // 312-319: Total sync operations

	// ======== Cache Line 5 (320-383): Checkpoint/Index metrics ========
	CheckpointCount     uint64 // 320-327: Total checkpoints
	LastCheckpointNanos int64  // 328-335: Last checkpoint time
	CheckpointTimeNanos int64  // 336-343: Total checkpoint time
	IndexPersistCount   uint64 // 344-351: Index saves
	IndexPersistErrors  uint64 // 352-359: Failed index saves
	BinaryIndexNodes    uint64 // 360-367: Nodes in binary index

	// ======== Cache Line 6 (384-447): Consumer metrics ========
	ActiveReaders    uint64 // 384-391: Current reader count
	TotalReaders     uint64 // 392-399: Total readers created
	MaxConsumerLag   uint64 // 400-407: Max entries behind
	TotalEntriesRead uint64 // 408-415: Total read operations
	ConsumerGroups   uint64 // 416-423: Active consumer groups
	AckedEntries     uint64 // 424-431: Acknowledged entries
	ReaderCacheHits  uint64 // 432-439: Cache hit count

	// ======== Cache Line 7 (448-511): Error/Recovery metrics ========
	ErrorCount         uint64 // 448-455: Total errors
	LastErrorNanos     int64  // 456-463: Last error timestamp
	CorruptionDetected uint64 // 464-471: Corrupted entries found
	RecoveryAttempts   uint64 // 472-479: Auto-recovery attempts
	RecoverySuccesses  uint64 // 480-487: Successful recoveries
	PartialWrites      uint64 // 488-495: Incomplete write detected
	ReadErrors         uint64 // 496-503: Read failures

	// ======== Cache Lines 8-9 (512-639): Retention metrics ========
	RetentionRuns        uint64 // 512-519: Cleanup executions
	LastRetentionNanos   int64  // 520-527: Last cleanup time
	RetentionTimeNanos   int64  // 528-535: Total cleanup time
	EntriesDeleted       uint64 // 536-543: Entries removed
	BytesReclaimed       uint64 // 544-551: Space freed
	OldestEntryNanos     int64  // 552-559: Oldest data timestamp
	RetentionErrors      uint64 // 560-567: Cleanup failures
	ProtectedByConsumers uint64 // 568-575: Files kept for consumers

	// ======== Cache Lines 10-11 (640-767): Reader cache metrics ========
	ReaderFileMaps    uint64 // 640-647: Files mapped into memory
	ReaderFileUnmaps  uint64 // 648-655: Files unmapped from memory
	ReaderCacheBytes  uint64 // 656-663: Current cache memory usage
	ReaderMappedFiles uint64 // 664-671: Current number of mapped files
	ReaderFileRemaps  uint64 // 672-679: File remappings due to growth
	ReaderCacheEvicts uint64 // 680-687: Files evicted due to pressure
	// contains filtered or unexported fields
}

CometState consolidates ALL mmap state and comprehensive metrics. Stored in: comet.state (1KB file per shard). Total size: 1024 bytes (1KB) for plenty of room to grow.

Design principles:

  1. Group related metrics in same cache line
  2. Hot path metrics in early cache lines
  3. Generous reserved space for future additions
  4. All fields are atomic-safe for multi-process access
  5. CRITICAL: Use raw int64/uint64 fields with atomic operations, NOT atomic.Int64
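
As an illustration of principle 5, updates go through the sync/atomic package against the raw fields (the CometState helpers such as AddTotalWrites expose this pattern). A sketch, not the library's internal code:

// state is a *CometState backed by the memory-mapped comet.state file.
atomic.AddUint64(&state.TotalWrites, 1)
atomic.StoreInt64(&state.LastWriteNanos, time.Now().UnixNano())
writes := atomic.LoadUint64(&state.TotalWrites)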

func (*CometState) AddErrorCount added in v1.2.0

func (s *CometState) AddErrorCount(delta uint64) uint64

ErrorCount methods - simple atomics since processes own shards exclusively

func (*CometState) AddFailedWrites added in v1.2.0

func (s *CometState) AddFailedWrites(delta uint64) uint64

FailedWrites methods - simple atomics since processes own shards exclusively

func (*CometState) AddFileRotations added in v1.1.1

func (s *CometState) AddFileRotations(delta uint64) uint64

FileRotations methods - simple atomics since processes own shards exclusively

func (*CometState) AddFilesCreated added in v1.1.1

func (s *CometState) AddFilesCreated(delta uint64) uint64

FilesCreated methods - simple atomics since processes own shards exclusively

func (*CometState) AddLastFileSequence added in v1.1.1

func (s *CometState) AddLastFileSequence(delta uint64) uint64

LastFileSequence methods

func (*CometState) AddTotalBytes added in v1.1.1

func (s *CometState) AddTotalBytes(delta uint64) uint64

TotalBytes methods - simple atomics since processes own shards exclusively

func (*CometState) AddTotalWrites added in v1.1.1

func (s *CometState) AddTotalWrites(delta uint64) uint64

func (*CometState) AddWriteLatencyCount added in v1.1.1

func (s *CometState) AddWriteLatencyCount(delta uint64) uint64

WriteLatencyCount methods

func (*CometState) AddWriteLatencySum added in v1.1.1

func (s *CometState) AddWriteLatencySum(delta uint64) uint64

WriteLatencySum methods

func (*CometState) AddWriteOffset added in v1.1.1

func (s *CometState) AddWriteOffset(delta uint64) uint64

func (*CometState) AllocateEntryNumbers added in v1.3.0

func (s *CometState) AllocateEntryNumbers(count int) int64

AllocateEntryNumbers atomically reserves a batch of entry numbers

func (*CometState) GetAverageWriteLatency added in v1.1.1

func (s *CometState) GetAverageWriteLatency() uint64

Computed metrics helpers

func (*CometState) GetCompressionRatioFloat added in v1.1.1

func (s *CometState) GetCompressionRatioFloat() float64

func (*CometState) GetErrorRate added in v1.1.1

func (s *CometState) GetErrorRate() float64

func (*CometState) GetLastEntryNumber added in v1.1.1

func (s *CometState) GetLastEntryNumber() int64

Helper methods for atomic operations on uint64 fields

func (*CometState) GetLastIndexUpdate added in v1.1.1

func (s *CometState) GetLastIndexUpdate() int64

func (*CometState) GetLastWriteNanos added in v1.1.1

func (s *CometState) GetLastWriteNanos() int64

LastWriteNanos methods

func (*CometState) GetMaxWriteLatency added in v1.1.1

func (s *CometState) GetMaxWriteLatency() uint64

MaxWriteLatency methods

func (*CometState) GetMinWriteLatency added in v1.1.1

func (s *CometState) GetMinWriteLatency() uint64

MinWriteLatency methods

func (*CometState) GetTotalWrites added in v1.1.1

func (s *CometState) GetTotalWrites() uint64

TotalWrites methods - simple atomics since processes own shards exclusively

func (*CometState) GetVersion added in v1.1.1

func (s *CometState) GetVersion() uint64

Helper methods for non-atomic version field

func (*CometState) GetWriteOffset added in v1.1.1

func (s *CometState) GetWriteOffset() uint64

WriteOffset methods

func (*CometState) IncrementLastEntryNumber added in v1.1.1

func (s *CometState) IncrementLastEntryNumber() int64

func (*CometState) SetLastIndexUpdate added in v1.1.1

func (s *CometState) SetLastIndexUpdate(nanos int64)

func (*CometState) SetVersion added in v1.1.1

func (s *CometState) SetVersion(v uint64)

func (*CometState) StoreLastWriteNanos added in v1.1.1

func (s *CometState) StoreLastWriteNanos(val int64)

func (*CometState) StoreWriteOffset added in v1.1.1

func (s *CometState) StoreWriteOffset(val uint64)

type CometStats

type CometStats struct {
	// Core metrics
	TotalEntries        int64            `json:"total_entries"`
	TotalBytes          int64            `json:"total_bytes"`
	TotalCompressed     int64            `json:"total_compressed"`
	CompressedEntries   int64            `json:"compressed_entries"`
	SkippedCompression  int64            `json:"skipped_compression"`
	FileRotations       int64            `json:"file_rotations"`
	CompressionWaitNano int64            `json:"compression_wait_nano"`
	ConsumerGroups      int              `json:"consumer_groups"`
	ConsumerOffsets     map[string]int64 `json:"consumer_offsets"`
	WriteThroughput     float64          `json:"write_throughput_mbps"`
	CompressionRatio    float64          `json:"compression_ratio"`
	OpenReaders         int64            `json:"open_readers"`
	MaxLag              int64            `json:"max_lag"`
	FileCount           int              `json:"file_count"`
	IndexSize           int64            `json:"index_size"`
	UptimeSeconds       int64            `json:"uptime_seconds"`

	// Production diagnostics (calculated on-the-fly)
	WriteErrors       int64            `json:"write_errors"`
	ReadErrors        int64            `json:"read_errors"`
	TotalErrors       int64            `json:"total_errors"`
	WritesTotal       int64            `json:"writes_total"`
	ReadsTotal        int64            `json:"reads_total"`
	RecoveryAttempts  int64            `json:"recovery_attempts"`
	RecoverySuccesses int64            `json:"recovery_successes"`
	AvgWriteLatencyNs int64            `json:"avg_write_latency_ns"`
	GoroutineCount    int              `json:"goroutine_count"`
	ConsumerLags      map[string]int64 `json:"consumer_lags"`
	TotalFileBytes    int64            `json:"total_file_bytes"`
	FailedWrites      int64            `json:"failed_writes"`
	FailedRotations   int64            `json:"failed_rotations"`
	SyncCount         int64            `json:"sync_count"`
	ErrorRate         float64          `json:"error_rate_per_sec"`
	WriteRate         float64          `json:"write_rate_per_sec"`
	ReadRate          float64          `json:"read_rate_per_sec"`
}

CometStats provides runtime statistics

type CompressedEntry

type CompressedEntry struct {
	Data           []byte
	OriginalSize   uint64
	CompressedSize uint64
	WasCompressed  bool
}

CompressedEntry represents a pre-compressed entry ready for writing

type CompressionConfig

type CompressionConfig struct {
	MinCompressSize int `json:"min_compress_size"` // Minimum size to compress (bytes)
}

CompressionConfig controls compression behavior

type ConcurrencyConfig

type ConcurrencyConfig struct {
	// Process-level shard ownership (simplifies multi-process coordination)
	// When ProcessCount > 1, multi-process mode is automatically enabled
	ProcessID    int    `json:"process_id"`    // This process's ID (0-based)
	ProcessCount int    `json:"process_count"` // Total number of processes (0 = single-process)
	SHMFile      string `json:"shm_file"`      // Shared memory file path (empty = os.TempDir()/comet-worker-slots-shm)
}

ConcurrencyConfig controls multi-process behavior

func (ConcurrencyConfig) IsMultiProcess added in v1.3.0

func (c ConcurrencyConfig) IsMultiProcess() bool

IsMultiProcess returns true if running in multi-process mode

func (ConcurrencyConfig) Owns added in v1.3.0

func (c ConcurrencyConfig) Owns(shardID uint32) bool

Owns checks if this process owns a particular shard

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer reads from comet stream shards. Fields ordered for optimal memory alignment.

func NewConsumer

func NewConsumer(client *Client, opts ConsumerOptions) *Consumer

NewConsumer creates a new consumer for comet streams

func (*Consumer) Ack

func (c *Consumer) Ack(ctx context.Context, messageIDs ...MessageID) error

Ack acknowledges one or more processed messages and updates consumer offset

func (*Consumer) AckRange

func (c *Consumer) AckRange(ctx context.Context, shardID uint32, fromEntry, toEntry int64) error

AckRange acknowledges all messages in a contiguous range for a shard. This is more efficient than individual acks for bulk processing. IMPORTANT: This method only allows ACKing messages that have been read by this consumer.

func (*Consumer) Close

func (c *Consumer) Close() error

Close closes the consumer and releases all resources

func (*Consumer) FilterDuplicates added in v1.3.0

func (c *Consumer) FilterDuplicates(messages []StreamMessage) []StreamMessage

FilterDuplicates removes already-processed messages from a batch, returning only new messages

func (*Consumer) FlushACKs added in v1.3.0

func (c *Consumer) FlushACKs(ctx context.Context) error

FlushACKs forces immediate persistence of all consumer offsets. This is useful for ensuring durability before shutdown or tests.

func (*Consumer) GetLag

func (c *Consumer) GetLag(ctx context.Context, shardID uint32) (int64, error)

GetLag returns how many entries behind this consumer group is
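
A minimal sketch for monitoring lag across all shards (consumer setup assumed; metrics.Gauge is a hypothetical metrics helper):

// Report how far behind the consumer group is on each shard.
for _, shardID := range comet.AllShardsRange(256) {
    lag, err := consumer.GetLag(ctx, shardID)
    if err != nil {
        continue // shard may not exist yet
    }
    metrics.Gauge("comet.consumer.lag", float64(lag))
}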

func (*Consumer) GetShardStats

func (c *Consumer) GetShardStats(ctx context.Context, shardID uint32) (*StreamStats, error)

GetShardStats returns statistics for a specific shard

func (*Consumer) MarkBatchProcessed added in v1.3.0

func (c *Consumer) MarkBatchProcessed(messages []StreamMessage)

MarkBatchProcessed marks all messages in a batch as processed

func (*Consumer) Process

func (c *Consumer) Process(ctx context.Context, handler ProcessFunc, opts ...ProcessOption) error

Process continuously reads and processes messages from shards. This is the high-level API for stream processing - handles discovery, retries, and ACKing automatically.

The simplest usage processes all discoverable shards:

err := consumer.Process(ctx, handleMessages)

Recommended usage with explicit configuration:

err := consumer.Process(ctx, handleMessages,
    comet.WithStream("events:v1:shard:*"),     // Wildcard pattern for shard discovery
    comet.WithBatchSize(1000),                 // Process up to 1000 messages at once
    comet.WithErrorHandler(logError),          // Handle processing errors
    comet.WithAutoAck(true),                   // Automatically ACK successful batches
)

For distributed processing across multiple consumer instances:

err := consumer.Process(ctx, handleMessages,
    comet.WithStream("events:v1:shard:*"),
    comet.WithConsumerAssignment(workerID, totalWorkers), // Distribute shards across workers
)

Stream patterns must end with ":*" for wildcard matching, e.g.:

  • "events:v1:shard:*" matches events:v1:shard:0000, events:v1:shard:0001, etc.
  • "logs:*:*:*" matches any 4-part stream name starting with "logs"

func (*Consumer) Read

func (c *Consumer) Read(ctx context.Context, shards []uint32, count int) ([]StreamMessage, error)

Read reads up to count entries from the specified shards starting from the consumer group's current offset. This is a low-level method for manual message processing - you probably want Process() instead.

Key differences from Process():

  • Read() is one-shot, Process() is continuous
  • Read() requires manual ACKing, Process() can auto-ACK
  • Read() uses explicit shard IDs, Process() can use wildcards
  • Read() has no retry logic, Process() has configurable retries

func (*Consumer) ResetOffset

func (c *Consumer) ResetOffset(ctx context.Context, shardID uint32, entryNumber int64) error

ResetOffset sets the consumer offset to a specific entry number

func (*Consumer) Sync added in v1.3.0

func (c *Consumer) Sync(ctx context.Context) error

Sync forces immediate persistence of any pending ACKs

type ConsumerEntry added in v1.3.7

type ConsumerEntry struct {
	// Cache line 1 (64 bytes)
	GroupName  [ConsumerGroupNameSize]byte // 48 bytes - null-terminated string
	Offset     int64                       // 8 bytes - atomic access
	LastUpdate int64                       // 8 bytes - unix nano timestamp

	// Cache line 2 (64 bytes)
	AckCount uint64    // 8 bytes - total ACKs for metrics
	Reserved [56]uint8 // 56 bytes - future use
}

ConsumerEntry represents a single consumer group's offset. Sized to exactly 128 bytes (2 cache lines) for alignment.

type ConsumerOffsetHeader added in v1.3.7

type ConsumerOffsetHeader struct {
	Version  uint32    // 4 bytes
	Magic    uint32    // 4 bytes
	Reserved [56]uint8 // 56 bytes padding to 64 bytes
}

ConsumerOffsetHeader is the header of the memory-mapped file

type ConsumerOffsetMmap added in v1.3.7

type ConsumerOffsetMmap struct {
	// contains filtered or unexported fields
}

ConsumerOffsetMmap provides lock-free access to consumer offsets

func NewConsumerOffsetMmap added in v1.3.7

func NewConsumerOffsetMmap(shardPath string, shardID uint32) (*ConsumerOffsetMmap, error)

NewConsumerOffsetMmap creates or opens a memory-mapped consumer offset file

func (*ConsumerOffsetMmap) Close added in v1.3.7

func (c *ConsumerOffsetMmap) Close() error

Close unmaps the file and closes it

func (*ConsumerOffsetMmap) Get added in v1.3.7

func (c *ConsumerOffsetMmap) Get(group string) (int64, bool)

Get returns the offset for a consumer group

func (*ConsumerOffsetMmap) GetAll added in v1.3.7

func (c *ConsumerOffsetMmap) GetAll() map[string]int64

GetAll returns all consumer offsets

func (*ConsumerOffsetMmap) GetStats added in v1.3.7

func (c *ConsumerOffsetMmap) GetStats() (used, total int)

GetStats returns statistics about consumer offset usage

func (*ConsumerOffsetMmap) Remove added in v1.3.7

func (c *ConsumerOffsetMmap) Remove(group string)

Remove removes a consumer group (for testing/admin)

func (*ConsumerOffsetMmap) Set added in v1.3.7

func (c *ConsumerOffsetMmap) Set(group string, offset int64) error

Set updates the offset for a consumer group

type ConsumerOptions

type ConsumerOptions struct {
	Group         string
	ConsumerID    int // 0, 1, 2, etc. for deterministic shard assignment
	ConsumerCount int // Total consumers in this group for deterministic shard assignment
}

ConsumerOptions configures a consumer

type EntryIndexMetadata added in v1.3.0

type EntryIndexMetadata struct {
	EntryNumber int64         `json:"entry_number"`
	Position    EntryPosition `json:"position"`
}

EntryIndexMetadata represents a stored index node

type EntryIndexNode

type EntryIndexNode struct {
	EntryNumber int64         `json:"entry_number"` // Entry number this node covers
	Position    EntryPosition `json:"position"`     // Position in files
}

EntryIndexNode represents a node in the binary searchable index

type EntryPosition

type EntryPosition struct {
	FileIndex  int   `json:"file_index"`  // Index in the Files array
	ByteOffset int64 `json:"byte_offset"` // Byte offset within the file
}

EntryPosition represents the location of an entry in the file system

type FileInfo

type FileInfo struct {
	// 64-bit aligned fields first (8 bytes each)
	StartOffset int64 `json:"start_offset"` // Byte offset in virtual stream
	EndOffset   int64 `json:"end_offset"`   // Last byte offset + 1
	StartEntry  int64 `json:"start_entry"`  // First entry number
	Entries     int64 `json:"entries"`      // Number of entries

	// Time fields (24 bytes each on 64-bit due to location pointer)
	StartTime time.Time `json:"start_time"` // File creation time
	EndTime   time.Time `json:"end_time"`   // Last write time

	// String last (will use remaining space efficiently)
	Path string `json:"path"`
}

FileInfo represents a data file in the shard. Fields ordered for optimal memory alignment.

type IndexingConfig

type IndexingConfig struct {
	BoundaryInterval int `json:"boundary_interval"` // Store boundary every N entries
	MaxIndexEntries  int `json:"max_index_entries"` // Max boundary entries per shard (0 = unlimited)
}

IndexingConfig controls indexing behavior

type LogConfig added in v1.1.1

type LogConfig struct {
	// Logger allows injecting a custom logger
	// If nil, a default logger will be created based on Level
	Logger Logger `json:"-"`

	// Level controls log level when using default logger
	// Options: "debug", "info", "warn", "error", "none"
	Level string `json:"level"`
}

LogConfig controls logging behavior

type LogLevel added in v1.1.1

type LogLevel int

LogLevel represents the severity of a log message

const (
	LogLevelDebug LogLevel = iota
	LogLevelInfo
	LogLevelWarn
	LogLevelError
)

type Logger added in v1.1.1

type Logger interface {
	// Debug logs a debug message with optional key-value pairs
	Debug(msg string, keysAndValues ...any)

	// Info logs an informational message with optional key-value pairs
	Info(msg string, keysAndValues ...any)

	// Warn logs a warning message with optional key-value pairs
	Warn(msg string, keysAndValues ...any)

	// Error logs an error message with optional key-value pairs
	Error(msg string, keysAndValues ...any)

	// WithContext returns a logger with the given context
	WithContext(ctx context.Context) Logger

	// WithFields returns a logger with the given fields attached
	WithFields(keysAndValues ...any) Logger
}

Logger is the interface for Comet's logging needs. It's designed to be simple and easy to adapt to various logging libraries.

type MappedFile

type MappedFile struct {
	FileInfo
	// contains filtered or unexported fields
}

MappedFile represents a memory-mapped file with atomic data access

type MessageID

type MessageID struct {
	EntryNumber int64  `json:"entry_number"`
	ShardID     uint32 `json:"shard_id"`
}

MessageID represents a structured message ID. Fields ordered for optimal memory alignment: int64 first, then uint32.

func ParseMessageID

func ParseMessageID(str string) (MessageID, error)

ParseMessageID parses a string ID back to MessageID

func (MessageID) String

func (id MessageID) String() string

String returns the string representation of the ID (ShardID-EntryNumber format)

type NoOpLogger added in v1.1.1

type NoOpLogger struct{}

NoOpLogger is a logger that discards all log messages

func (NoOpLogger) Debug added in v1.1.1

func (NoOpLogger) Debug(msg string, keysAndValues ...any)

func (NoOpLogger) Error added in v1.1.1

func (NoOpLogger) Error(msg string, keysAndValues ...any)

func (NoOpLogger) Info added in v1.1.1

func (NoOpLogger) Info(msg string, keysAndValues ...any)

func (NoOpLogger) Warn added in v1.1.1

func (NoOpLogger) Warn(msg string, keysAndValues ...any)

func (NoOpLogger) WithContext added in v1.1.1

func (n NoOpLogger) WithContext(ctx context.Context) Logger

func (NoOpLogger) WithFields added in v1.1.1

func (n NoOpLogger) WithFields(keysAndValues ...any) Logger

type ProcessFunc

type ProcessFunc func(ctx context.Context, messages []StreamMessage) error

ProcessFunc handles a batch of messages, returning error to trigger retry

type ProcessOption

type ProcessOption func(*processConfig)

ProcessOption configures the Process method

func WithAutoAck

func WithAutoAck(enabled bool) ProcessOption

WithAutoAck controls automatic acknowledgment (default: true)

func WithBatchCallback

func WithBatchCallback(callback func(size int, duration time.Duration)) ProcessOption

WithBatchCallback sets a callback after each batch completes

func WithBatchSize

func WithBatchSize(size int) ProcessOption

WithBatchSize sets the number of messages to read at once

func WithConsumerAssignment

func WithConsumerAssignment(id, total int) ProcessOption

WithConsumerAssignment configures distributed processing

func WithErrorHandler

func WithErrorHandler(handler func(err error, retryCount int)) ProcessOption

WithErrorHandler sets a callback for processing errors

func WithMaxRetries

func WithMaxRetries(retries int) ProcessOption

WithMaxRetries sets the number of retry attempts for failed batches

func WithPollInterval

func WithPollInterval(interval time.Duration) ProcessOption

WithPollInterval sets how long to wait when no messages are available

func WithRetryDelay

func WithRetryDelay(delay time.Duration) ProcessOption

WithRetryDelay sets the base delay between retries

func WithShardDiscoveryInterval added in v1.3.8

func WithShardDiscoveryInterval(interval time.Duration) ProcessOption

WithShardDiscoveryInterval sets how often to check for new shards. Default is 5 seconds. Set to 0 to disable periodic rediscovery.

func WithShards

func WithShards(shards ...uint32) ProcessOption

WithShards specifies explicit shards to process

func WithStream

func WithStream(pattern string) ProcessOption

WithStream specifies a stream pattern for shard discovery

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader provides bounded memory-mapped read access to a shard with intelligent file management

func NewReader

func NewReader(shardID uint32, index *ShardIndex, config ...ReaderConfig) (*Reader, error)

NewReader creates a new bounded reader for a shard with smart file mapping

func (*Reader) Close

func (r *Reader) Close() error

Close unmaps all files and cleans up resources

func (*Reader) GetMemoryUsage added in v1.1.1

func (r *Reader) GetMemoryUsage() (int64, int)

GetMemoryUsage returns current memory usage statistics

func (*Reader) ReadEntryAtPosition

func (r *Reader) ReadEntryAtPosition(pos EntryPosition) ([]byte, error)

ReadEntryAtPosition reads a single entry at the given position

func (*Reader) ReadEntryByNumber added in v1.1.1

func (r *Reader) ReadEntryByNumber(entryNumber int64) ([]byte, error)

ReadEntryByNumber reads an entry by its sequential entry number, using the file metadata to locate it

func (*Reader) SetClient added in v1.3.5

func (r *Reader) SetClient(client *Client)

SetClient sets the client reference for index refreshing

func (*Reader) SetState added in v1.1.1

func (r *Reader) SetState(state *CometState)

SetState sets the shared state for metrics tracking

func (*Reader) UpdateFiles added in v1.1.1

func (r *Reader) UpdateFiles(newFiles *[]FileInfo) error

UpdateFiles updates the reader with new file information

type ReaderConfig added in v1.1.1

type ReaderConfig struct {
	MaxMappedFiles  int   // Maximum number of files to keep mapped (default: 10)
	MaxMemoryBytes  int64 // Maximum memory to use for mapping (default: 1GB)
	CleanupInterval int   // How often to run cleanup in milliseconds (default: 5000)
}

ReaderConfig configures the bounded reader behavior

func DefaultReaderConfig added in v1.1.1

func DefaultReaderConfig() ReaderConfig

DefaultReaderConfig returns the default configuration for Reader

func ReaderConfigForStorage added in v1.3.0

func ReaderConfigForStorage(maxFileSize int64) ReaderConfig

ReaderConfigForStorage returns a reader configuration optimized for the given storage settings

type RetentionConfig

type RetentionConfig struct {
	MaxAge            time.Duration `json:"max_age"`             // Delete files older than this
	MaxBytes          int64         `json:"max_bytes"`           // Delete oldest files if total size exceeds
	MaxTotalSize      int64         `json:"max_total_size"`      // Alias for MaxBytes for compatibility
	MaxShardSize      int64         `json:"max_shard_size"`      // Maximum size per shard
	CheckUnconsumed   bool          `json:"check_unconsumed"`    // Protect unconsumed messages
	ProtectUnconsumed bool          `json:"protect_unconsumed"`  // Alias for CheckUnconsumed
	CleanupInterval   time.Duration `json:"cleanup_interval"`    // How often to run cleanup (0 = disabled)
	FileGracePeriod   time.Duration `json:"file_grace_period"`   // Don't delete files newer than this
	ForceDeleteAfter  time.Duration `json:"force_delete_after"`  // Force delete files older than this
	SafetyMargin      float64       `json:"safety_margin"`       // Keep this fraction of space free (0.0-1.0)
	MinFilesToRetain  int           `json:"min_files_to_retain"` // Always keep at least N files per shard
	MinFilesToKeep    int           `json:"min_files_to_keep"`   // Alias for MinFilesToRetain
}

RetentionConfig controls data retention policies
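
A worked example of a retention policy for a space-constrained edge node. The values are illustrative, and where the struct is wired in (presumably the client configuration) is an assumption; only the fields themselves come from this section.

```go
package demo

import (
	"time"

	comet "github.com/example/comet" // placeholder import path
)

// edgeRetention returns an illustrative retention policy.
func edgeRetention() comet.RetentionConfig {
	return comet.RetentionConfig{
		MaxAge:           4 * time.Hour,   // delete files older than 4 hours
		MaxBytes:         8 << 30,         // keep total size under 8 GiB
		MaxShardSize:     1 << 30,         // and under 1 GiB per shard
		CheckUnconsumed:  true,            // protect unconsumed messages...
		ForceDeleteAfter: 24 * time.Hour,  // ...but never keep anything past 24 hours
		CleanupInterval:  time.Minute,     // run cleanup every minute
		FileGracePeriod:  5 * time.Minute, // never delete files newer than 5 minutes
		SafetyMargin:     0.1,             // keep 10% of space free
		MinFilesToRetain: 2,               // always keep at least 2 files per shard
	}
}
```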

type RetentionStats

type RetentionStats struct {
	// 8-byte aligned fields first
	TotalSizeBytes int64         `json:"total_size_bytes"`
	TotalSizeGB    float64       `json:"total_size_gb"`
	MaxTotalSizeGB float64       `json:"max_total_size_gb"`
	RetentionAge   time.Duration `json:"retention_age"`
	TotalFiles     int           `json:"total_files"`

	// Pointer fields (8 bytes)
	ShardStats map[uint32]ShardRetentionStats `json:"shard_stats,omitempty"`

	// Larger composite types last (time.Time is 24 bytes)
	OldestData time.Time `json:"oldest_data"`
	NewestData time.Time `json:"newest_data"`
}

RetentionStats provides type-safe retention statistics. Fields are ordered for optimal memory alignment.

type Shard

type Shard struct {
	// contains filtered or unexported fields
}

Shard represents a single stream shard with its own files. Fields are ordered for optimal memory alignment (64-bit words first).

func (*Shard) GetConsumerOffset added in v1.3.0

func (s *Shard) GetConsumerOffset(group string) int64

GetConsumerOffset returns the current offset for a consumer group

func (*Shard) UpdateConsumerOffset added in v1.3.0

func (s *Shard) UpdateConsumerOffset(group string, offset int64)

UpdateConsumerOffset updates the offset for a consumer group

type ShardDiagnostics added in v1.3.8

type ShardDiagnostics struct {
	// 8-byte aligned fields first
	WriteRate          float64          `json:"write_rate_per_sec"`
	ErrorRate          float64          `json:"error_rate_per_sec"`
	TotalEntries       int64            `json:"total_entries"`
	TotalBytes         int64            `json:"total_bytes"`
	FileRotateFailures int64            `json:"file_rotate_failures"`
	ConsumerLags       map[string]int64 `json:"consumer_lags"`
	UnhealthyReasons   []string         `json:"unhealthy_reasons,omitempty"`
	LastError          string           `json:"last_error,omitempty"`

	// Time fields (24 bytes each)
	LastWriteTime time.Time `json:"last_write_time"`
	LastErrorTime time.Time `json:"last_error_time,omitempty"`

	// Smaller fields last
	ShardID   uint32 `json:"shard_id"`
	FileCount int    `json:"file_count"`
	IsHealthy bool   `json:"is_healthy"`
}

ShardDiagnostics provides detailed diagnostics for a specific shard. Fields are ordered for optimal memory alignment.

type ShardIndex

type ShardIndex struct {
	// 64-bit aligned fields first (8 bytes each)
	CurrentEntryNumber int64 `json:"current_entry_number"` // Entry-based tracking (not byte offsets!)
	CurrentWriteOffset int64 `json:"current_write_offset"` // Still track for file management

	// Maps (8 bytes pointer each)
	ConsumerOffsets map[string]int64 `json:"consumer_entry_offsets"` // Consumer tracking by entry number (not bytes!)

	// Composite types
	BinaryIndex BinarySearchableIndex `json:"binary_index"` // Binary searchable index for O(log n) lookups

	// Slices (24 bytes: ptr + len + cap)
	Files []FileInfo `json:"files"` // File management

	// Strings (24 bytes: ptr + len + cap)
	CurrentFile string `json:"current_file"`

	// Smaller fields last
	BoundaryInterval int `json:"boundary_interval"` // Store boundaries every N entries (4 bytes)
}

ShardIndex tracks files and consumer offsets. It uses entry-based addressing instead of byte offsets. Fields are ordered for optimal memory alignment.

type ShardRetentionStats

type ShardRetentionStats struct {
	// 8-byte aligned fields first
	SizeBytes int64 `json:"size_bytes"`
	Files     int   `json:"files"`

	// Pointer field (8 bytes)
	ConsumerLag map[string]int64 `json:"consumer_lag,omitempty"`

	// Larger composite types last (time.Time is 24 bytes)
	OldestEntry time.Time `json:"oldest_entry"`
	NewestEntry time.Time `json:"newest_entry"`
}

ShardRetentionStats provides retention stats for a single shard. Fields are ordered for optimal memory alignment.

type SlogAdapter added in v1.1.1

type SlogAdapter struct {
	// contains filtered or unexported fields
}

SlogAdapter adapts slog.Logger to the Comet Logger interface

func NewSlogAdapter added in v1.1.1

func NewSlogAdapter(logger *slog.Logger) *SlogAdapter

NewSlogAdapter creates a new adapter for slog.Logger

func (*SlogAdapter) Debug added in v1.1.1

func (s *SlogAdapter) Debug(msg string, keysAndValues ...any)

func (*SlogAdapter) Error added in v1.1.1

func (s *SlogAdapter) Error(msg string, keysAndValues ...any)

func (*SlogAdapter) Info added in v1.1.1

func (s *SlogAdapter) Info(msg string, keysAndValues ...any)

func (*SlogAdapter) Warn added in v1.1.1

func (s *SlogAdapter) Warn(msg string, keysAndValues ...any)

func (*SlogAdapter) WithContext added in v1.1.1

func (s *SlogAdapter) WithContext(ctx context.Context) Logger

func (*SlogAdapter) WithFields added in v1.1.1

func (s *SlogAdapter) WithFields(keysAndValues ...any) Logger
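
A minimal sketch of adapting slog. How the resulting Logger is handed to Comet (for example via a configuration field) is assumed; this section only documents the adapter itself.

```go
package demo

import (
	"log/slog"
	"os"

	comet "github.com/example/comet" // placeholder import path
)

// newLogger wraps a structured slog.Logger in Comet's SlogAdapter and
// returns it with a bound field.
func newLogger() comet.Logger {
	base := slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}))
	logger := comet.NewSlogAdapter(base)
	logger.Info("comet logging initialized", "handler", "json")
	return logger.WithFields("component", "comet") // returns a Logger with the field attached
}
```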

type StdLogger added in v1.1.1

type StdLogger struct {
	// contains filtered or unexported fields
}

StdLogger is a simple logger that writes to stdout/stderr

func NewStdLogger added in v1.1.1

func NewStdLogger(level LogLevel) *StdLogger

NewStdLogger creates a new standard logger

func (*StdLogger) Debug added in v1.1.1

func (s *StdLogger) Debug(msg string, keysAndValues ...any)

func (*StdLogger) Error added in v1.1.1

func (s *StdLogger) Error(msg string, keysAndValues ...any)

func (*StdLogger) Info added in v1.1.1

func (s *StdLogger) Info(msg string, keysAndValues ...any)

func (*StdLogger) Warn added in v1.1.1

func (s *StdLogger) Warn(msg string, keysAndValues ...any)

func (*StdLogger) WithContext added in v1.1.1

func (s *StdLogger) WithContext(ctx context.Context) Logger

func (*StdLogger) WithFields added in v1.1.1

func (s *StdLogger) WithFields(keysAndValues ...any) Logger

type StorageConfig

type StorageConfig struct {
	MaxFileSize        int64         `json:"max_file_size"`       // Maximum size per file before rotation
	CheckpointInterval time.Duration `json:"checkpoint_interval"` // Checkpoint interval
	CheckpointEntries  int           `json:"checkpoint_entries"`  // Checkpoint every N entries (default: 100000)
	FlushInterval      time.Duration `json:"flush_interval"`      // Flush to OS cache interval (memory management, not durability)
	FlushEntries       int           `json:"flush_entries"`       // Flush to OS cache every N entries (memory management, not durability)
}

StorageConfig controls file storage behavior
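
An illustrative storage configuration; the values are examples, and where the struct is wired in is assumed.

```go
package demo

import (
	"time"

	comet "github.com/example/comet" // placeholder import path
)

// edgeStorage returns example storage tuning for modest segment sizes.
func edgeStorage() comet.StorageConfig {
	return comet.StorageConfig{
		MaxFileSize:        256 << 20,              // rotate segment files at 256 MiB
		CheckpointInterval: time.Second,            // checkpoint at least once per second
		CheckpointEntries:  100000,                 // ...or every 100k entries, whichever comes first (assumed)
		FlushInterval:      100 * time.Millisecond, // flush to OS cache (memory management, not durability)
		FlushEntries:       10000,                  // ...or every 10k entries
	}
}
```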

type StreamMessage

type StreamMessage struct {
	Stream string    // Stream name/identifier
	ID     MessageID // Unique message ID
	Data   []byte    // Raw message data
}

StreamMessage represents a message read from a stream
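
A hypothetical batch handler over StreamMessage values. The handler signature expected by the consumer is an assumption based on the fields above, not something this section defines.

```go
package demo

import (
	"context"
	"fmt"

	comet "github.com/example/comet" // placeholder import path
)

// printBatch logs basic details of each message in a batch.
func printBatch(ctx context.Context, msgs []comet.StreamMessage) error {
	for _, m := range msgs {
		fmt.Printf("stream=%s id=%v bytes=%d\n", m.Stream, m.ID, len(m.Data))
	}
	return nil // a nil return is assumed to acknowledge the batch
}
```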

type StreamStats

type StreamStats struct {
	// 64-bit aligned fields first
	TotalEntries int64
	TotalBytes   int64
	OldestEntry  time.Time
	NewestEntry  time.Time

	// Composite types
	ConsumerOffsets map[string]int64

	// Smaller fields last
	FileCount int    // 8 bytes
	ShardID   uint32 // 4 bytes
}

StreamStats provides statistics about a shard. Fields are ordered for optimal memory alignment.

type WriteMode added in v1.3.0

type WriteMode int

WriteMode determines how data is written to disk

const (
	WriteModeDirect   WriteMode = iota // Direct I/O with O_SYNC
	WriteModeBuffered                  // Standard buffered writes (fastest)
	WriteModeFSync                     // Buffered writes + explicit fsync
)
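
A tiny sketch of the tradeoff between the modes, assuming the chosen value is assigned to a configuration field not shown in this section (the comet import is also assumed):

```go
// chooseWriteMode picks a mode for a given durability requirement.
func chooseWriteMode(mustSurvivePowerLoss bool) comet.WriteMode {
	if mustSurvivePowerLoss {
		return comet.WriteModeFSync // buffered writes plus explicit fsync: durable, higher latency
	}
	return comet.WriteModeBuffered // fastest: relies on the OS page cache and periodic checkpoints
}
```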

type WriteRequest

type WriteRequest struct {
	CurrentWriteOffset int64       // Snapshot of write offset at request time (8 bytes)
	WriteBuffers       [][]byte    // Buffers to write (24 bytes)
	IDs                []MessageID // Message IDs for the batch (24 bytes)
	ShouldFlush        bool        // Whether to flush after this write (1 byte)
}

WriteRequest represents a batch write operation. Fields are ordered for optimal memory alignment.

Directories

| Path | Synopsis |
|------|----------|
| cmd | |
| cmd/test_unified | command |
| cmd/test_worker | command |
