Documentation
¶
Overview ¶
Package queue provides compression utility functions for message payloads.
Package queue provides deduplication tracking for message idempotency.
Package queue provides message dequeue operations. This file contains FIFO and priority-based dequeue functionality.
Package queue provides dead letter queue (DLQ) operations. This file contains retry handling, acknowledgement, and DLQ management functionality.
Package queue provides message enqueue operations. This file contains all variants of enqueue functionality including batch operations.
Package queue provides lifecycle management for queue operations. This file contains sync, stats, compaction, and timer management functionality.
Package queue provides configuration and validation for queue options. This file contains the Options struct and related functions.
Package queue provides priority queue management functionality. This file contains priority index operations for priority-aware message dequeuing.
Package queue provides a persistent message queue implementation.
Queue provides a durable, disk-backed message queue with:
- Persistent storage with automatic segment rotation
- Message ordering guarantees (FIFO)
- Offset-based message tracking
- Efficient batching support
- Crash recovery
Basic usage:
q, err := queue.Open("/path/to/queue", nil)
if err != nil {
log.Fatal(err)
}
defer q.Close()
// Enqueue a message
offset, err := q.Enqueue([]byte("hello"))
if err != nil {
log.Fatal(err)
}
// Dequeue a message
msg, err := q.Dequeue()
if err != nil {
log.Fatal(err)
}
Package queue provides seek operations for queue navigation. This file contains methods for seeking to specific positions in the queue.
Package queue provides streaming API for real-time message delivery. This file contains the streaming functionality for continuous message processing.
Package queue provides validation utilities for queue operations. This file contains platform-independent validation and sanitization functions.
Package queue provides validation utilities for queue operations. This file contains Unix-specific disk space checking functionality.
Index ¶
- Constants
- func CalculateBackoff(retryCount int, baseDelay, maxBackoff time.Duration) time.Duration
- type BatchEnqueueOptions
- type DedupTracker
- func (dt *DedupTracker) ActiveCount() int
- func (dt *DedupTracker) Check(dedupID string, window time.Duration) (uint64, bool)
- func (dt *DedupTracker) CleanExpired() int
- func (dt *DedupTracker) Close() error
- func (dt *DedupTracker) Count() int
- func (dt *DedupTracker) EnsureStateDir() error
- func (dt *DedupTracker) Load() error
- func (dt *DedupTracker) Persist() error
- func (dt *DedupTracker) Track(dedupID string, msgID uint64, offset uint64, window time.Duration) error
- type Message
- type Metadata
- type MetricsCollector
- type Options
- type Queue
- func (q *Queue) Ack(msgID uint64) error
- func (q *Queue) Close() error
- func (q *Queue) Compact() (*segment.CompactionResult, error)
- func (q *Queue) Dequeue() (*Message, error)
- func (q *Queue) DequeueBatch(maxMessages int) ([]*Message, error)
- func (q *Queue) Enqueue(payload []byte) (uint64, error)
- func (q *Queue) EnqueueBatch(payloads [][]byte) ([]uint64, error)
- func (q *Queue) EnqueueBatchWithOptions(messages []BatchEnqueueOptions) ([]uint64, error)
- func (q *Queue) EnqueueWithAllOptions(payload []byte, priority uint8, ttl time.Duration, headers map[string]string) (uint64, error)
- func (q *Queue) EnqueueWithCompression(payload []byte, compression format.CompressionType) (uint64, error)
- func (q *Queue) EnqueueWithDedup(payload []byte, dedupID string, window time.Duration) (uint64, bool, error)
- func (q *Queue) EnqueueWithHeaders(payload []byte, headers map[string]string) (uint64, error)
- func (q *Queue) EnqueueWithOptions(payload []byte, ttl time.Duration, headers map[string]string) (uint64, error)
- func (q *Queue) EnqueueWithPriority(payload []byte, priority uint8) (uint64, error)
- func (q *Queue) EnqueueWithTTL(payload []byte, ttl time.Duration) (uint64, error)
- func (q *Queue) GetDLQ() *Queue
- func (q *Queue) GetRetryInfo(msgID uint64) *RetryInfo
- func (q *Queue) IsClosed() bool
- func (q *Queue) Nack(msgID uint64, reason string) error
- func (q *Queue) RequeueFromDLQ(dlqMsgID uint64) error
- func (q *Queue) SeekToMessageID(msgID uint64) error
- func (q *Queue) SeekToTimestamp(timestamp int64) error
- func (q *Queue) Stats() *Stats
- func (q *Queue) Stream(ctx context.Context, handler StreamHandler) error
- func (q *Queue) Sync() error
- type RetryInfo
- type RetryTracker
- type Stats
- type StreamHandler
Constants ¶
const ( // MetadataFileName is the name of the metadata file MetadataFileName = "metadata.dat" // MetadataVersion is the current metadata format version MetadataVersion uint32 = 1 // MetadataSize is the fixed size of metadata file MetadataSize = 24 // version(4) + nextMsgID(8) + readMsgID(8) + reserved(4) )
Variables ¶
This section is empty.
Functions ¶
func CalculateBackoff ¶
CalculateBackoff calculates exponential backoff duration based on retry count (v1.2.0+). This is a helper function for implementing retry logic with the DLQ system. The backoff duration increases exponentially: base * 2^retryCount, capped at maxBackoff.
Parameters:
- retryCount: Number of previous retry attempts (typically from RetryInfo.RetryCount)
- baseDelay: Base delay for the first retry (e.g., 1 second)
- maxBackoff: Maximum backoff duration to prevent excessively long waits
Example usage:
msg, _ := q.Dequeue()
if info := q.GetRetryInfo(msg.ID); info != nil && info.RetryCount > 0 {
backoff := CalculateBackoff(info.RetryCount, time.Second, 5*time.Minute)
time.Sleep(backoff)
}
Returns the calculated backoff duration, always between baseDelay and maxBackoff.
Types ¶
type BatchEnqueueOptions ¶
type BatchEnqueueOptions struct {
// Payload is the message data
Payload []byte
// Priority is the message priority level (v1.1.0+)
// Default: PriorityLow (0)
Priority uint8
// TTL is the time-to-live duration for the message
// Set to 0 for no expiration
// Default: 0 (no expiration)
TTL time.Duration
// Headers contains key-value metadata for the message
// Default: nil
Headers map[string]string
// Compression is the compression type for the message (v1.3.0+)
// Set to CompressionNone to use queue's DefaultCompression
// Default: CompressionNone (uses queue default)
Compression format.CompressionType
}
BatchEnqueueOptions contains options for enqueueing a single message in a batch operation.
type DedupTracker ¶
type DedupTracker struct {
// contains filtered or unexported fields
}
DedupTracker manages hash-based deduplication with time windows. It provides O(1) duplicate detection using SHA-256 hashing and maintains crash-safe persistent state.
func NewDedupTracker ¶
func NewDedupTracker(statePath string, maxSize int) *DedupTracker
NewDedupTracker creates a new deduplication tracker.
func (*DedupTracker) ActiveCount ¶
func (dt *DedupTracker) ActiveCount() int
ActiveCount returns the number of non-expired entries.
func (*DedupTracker) Check ¶
Check returns the original offset if the dedup ID is a duplicate. Returns (offset, true) if duplicate found, (0, false) if new.
func (*DedupTracker) CleanExpired ¶
func (dt *DedupTracker) CleanExpired() int
CleanExpired removes all expired entries from the table. Returns the number of entries removed.
func (*DedupTracker) Close ¶
func (dt *DedupTracker) Close() error
Close persists state and cleans up resources.
func (*DedupTracker) Count ¶
func (dt *DedupTracker) Count() int
Count returns the total number of entries (including expired).
func (*DedupTracker) EnsureStateDir ¶
func (dt *DedupTracker) EnsureStateDir() error
EnsureStateDir creates the state directory if it doesn't exist.
func (*DedupTracker) Load ¶
func (dt *DedupTracker) Load() error
Load restores the deduplication state from disk. Returns nil if state file doesn't exist (fresh start).
func (*DedupTracker) Persist ¶
func (dt *DedupTracker) Persist() error
Persist saves the deduplication state to disk using atomic writes. Uses temp file + rename pattern for crash safety.
type Message ¶
type Message struct {
// ID is the unique message identifier
ID uint64
// Offset is the file offset where the message is stored
Offset uint64
// Payload is the message data
Payload []byte
// Timestamp is when the message was enqueued (Unix nanoseconds)
Timestamp int64
// ExpiresAt is when the message expires (Unix nanoseconds), 0 if no TTL
ExpiresAt int64
// Priority is the message priority level (v1.1.0+)
Priority uint8
// Headers contains key-value metadata for the message
Headers map[string]string
}
Message represents a dequeued message.
type Metadata ¶
type Metadata struct {
Version uint32 // Format version
NextMsgID uint64 // Next message ID to assign
ReadMsgID uint64 // Next message ID to read
// contains filtered or unexported fields
}
Metadata stores queue state for persistence
func OpenMetadata ¶
OpenMetadata opens or creates a metadata file
func (*Metadata) SetNextMsgID ¶
SetNextMsgID updates the next message ID and persists it
func (*Metadata) SetReadMsgID ¶
SetReadMsgID updates the read message ID and persists it
func (*Metadata) UpdateState ¶
UpdateState updates both IDs and persists them atomically
type MetricsCollector ¶
type MetricsCollector interface {
RecordEnqueue(payloadSize int, duration time.Duration)
RecordDequeue(payloadSize int, duration time.Duration)
RecordEnqueueBatch(count, totalPayloadSize int, duration time.Duration)
RecordDequeueBatch(count, totalPayloadSize int, duration time.Duration)
RecordEnqueueError()
RecordDequeueError()
RecordSeek()
RecordCompaction(segmentsRemoved int, bytesFreed int64, duration time.Duration)
RecordCompactionError()
UpdateQueueState(pending, segments, nextMsgID, readMsgID uint64)
}
MetricsCollector defines the interface for recording queue metrics.
type Options ¶
type Options struct {
// SegmentOptions configures segment management
SegmentOptions *segment.ManagerOptions
// AutoSync enables automatic syncing after each write
AutoSync bool
// SyncInterval for periodic syncing (if AutoSync is false)
SyncInterval time.Duration
// CompactionInterval for automatic background compaction (0 = disabled)
CompactionInterval time.Duration
// EnablePriorities enables priority queue mode (v1.1.0+)
// When disabled, all messages are treated as PriorityLow (FIFO behavior)
EnablePriorities bool
// PriorityStarvationWindow prevents low-priority message starvation (v1.1.0+)
// Low-priority messages waiting longer than this duration will be promoted
// Set to 0 to disable starvation prevention
PriorityStarvationWindow time.Duration
// DLQPath is the path to the dead letter queue directory (v1.2.0+)
// If empty, DLQ is disabled. Messages that fail processing after MaxRetries
// will be moved to this separate queue for inspection and potential reprocessing.
DLQPath string
// MaxRetries is the maximum number of delivery attempts before moving to DLQ (v1.2.0+)
// Set to 0 for unlimited retries (messages never move to DLQ).
// Only effective when DLQPath is configured.
// Default: 3
MaxRetries int
// MaxMessageSize is the maximum size in bytes for a single message payload (v1.2.0+)
// Messages larger than this will be rejected during enqueue.
// Set to 0 for unlimited message size (not recommended for production).
// Default: 10 MB
MaxMessageSize int64
// MinFreeDiskSpace is the minimum required free disk space in bytes (v1.2.0+)
// Enqueue operations will fail if available disk space falls below this threshold.
// Set to 0 to disable disk space checking (not recommended for production).
// Default: 100 MB
MinFreeDiskSpace int64
// DLQMaxAge is the maximum age for messages in the DLQ (v1.2.0+)
// Messages older than this duration will be removed during compaction.
// Set to 0 to keep DLQ messages indefinitely.
// Default: 0 (no age-based cleanup)
DLQMaxAge time.Duration
// DLQMaxSize is the maximum total size in bytes for the DLQ (v1.2.0+)
// When DLQ exceeds this size, oldest messages will be removed during compaction.
// Set to 0 for unlimited DLQ size.
// Default: 0 (no size limit)
DLQMaxSize int64
// DefaultCompression is the compression type used when not explicitly specified (v1.3.0+)
// Set to CompressionNone to disable compression by default
// Default: CompressionNone (no compression)
DefaultCompression format.CompressionType
// CompressionLevel is the compression level for algorithms that support it (v1.3.0+)
// For GZIP: 1 (fastest) to 9 (best compression), 0 = default (6)
// Higher values = better compression but slower
// Default: 0 (use algorithm default, which is 6 for gzip)
CompressionLevel int
// MinCompressionSize is the minimum payload size to compress (v1.3.0+)
// Messages smaller than this are not compressed even if compression is requested
// This avoids the CPU overhead when compression doesn't help much
// Default: 1024 bytes (1KB)
MinCompressionSize int
// DefaultDeduplicationWindow is the default time window for dedup tracking (v1.4.0+)
// Set to 0 to disable deduplication by default
// Default: 0 (disabled)
DefaultDeduplicationWindow time.Duration
// MaxDeduplicationEntries is the maximum number of dedup entries to track (v1.4.0+)
// Prevents unbounded memory growth
// Default: 100,000 entries (~6.4 MB)
MaxDeduplicationEntries int
// Logger for structured logging (nil = no logging)
Logger logging.Logger
// MetricsCollector for collecting queue metrics (nil = no metrics)
MetricsCollector MetricsCollector
}
Options configures queue behavior.
func DefaultOptions ¶
DefaultOptions returns sensible defaults for queue configuration.
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue is a persistent, disk-backed message queue.
func Open ¶
Open opens or creates a queue at the specified directory. Creates the directory if it doesn't exist.
func (*Queue) Ack ¶
Ack acknowledges a message processing success (v1.2.0+). When DLQ is enabled, this clears retry tracking for the message. If DLQ is not configured, this method is a no-op.
Ack should be called after successfully processing a message from the queue to indicate that the message does not need to be retried.
func (*Queue) Compact ¶
func (q *Queue) Compact() (*segment.CompactionResult, error)
Compact manually triggers compaction of old segments based on retention policy. Returns the compaction result with segments removed and bytes freed.
func (*Queue) Dequeue ¶
Dequeue retrieves the next message from the queue. Returns an error if no messages available. Automatically skips expired messages (with TTL). When EnablePriorities is true, returns messages in priority order (High → Medium → Low).
func (*Queue) DequeueBatch ¶
DequeueBatch retrieves up to maxMessages from the queue in a single operation. Returns fewer messages if the queue has fewer than maxMessages available. Returns an error if no messages are available. Automatically skips expired messages (with TTL).
func (*Queue) Enqueue ¶
Enqueue appends a message to the queue. Returns the offset where the message was written.
func (*Queue) EnqueueBatch ¶
EnqueueBatch appends multiple messages to the queue in a single operation. This is more efficient than calling Enqueue() multiple times as it performs a single fsync for all messages. Returns the offsets where the messages were written.
func (*Queue) EnqueueBatchWithOptions ¶
func (q *Queue) EnqueueBatchWithOptions(messages []BatchEnqueueOptions) ([]uint64, error)
EnqueueBatchWithOptions appends multiple messages with individual options to the queue. This is more efficient than calling EnqueueWithAllOptions() multiple times as it performs a single fsync for all messages. Each message can have different priority, TTL, and headers. Returns the offsets where the messages were written.
func (*Queue) EnqueueWithAllOptions ¶
func (q *Queue) EnqueueWithAllOptions(payload []byte, priority uint8, ttl time.Duration, headers map[string]string) (uint64, error)
EnqueueWithAllOptions appends a message with priority, TTL, and headers (v1.1.0+). This is the most flexible enqueue method, combining all available features. Returns the offset where the message was written.
func (*Queue) EnqueueWithCompression ¶
func (q *Queue) EnqueueWithCompression(payload []byte, compression format.CompressionType) (uint64, error)
EnqueueWithCompression appends a message with explicit compression (v1.3.0+). This allows overriding the queue's DefaultCompression setting for individual messages. Returns the offset where the message was written.
func (*Queue) EnqueueWithDedup ¶
func (q *Queue) EnqueueWithDedup(payload []byte, dedupID string, window time.Duration) (uint64, bool, error)
EnqueueWithDedup appends a message with deduplication tracking (v1.4.0+). If a message with the same dedupID was enqueued within the deduplication window, this returns the original message ID and offset without writing a duplicate. Returns (offset, isDuplicate, error).
dedupID: A unique identifier for this message (e.g., order ID, request ID) window: How long to track this message for deduplication (0 = use queue default)
Example:
offset, isDup, err := q.EnqueueWithDedup(payload, "order-12345", 5*time.Minute)
if err != nil { ... }
if isDup {
// Message was already processed, offset is the original message
}
func (*Queue) EnqueueWithHeaders ¶
EnqueueWithHeaders appends a message to the queue with key-value metadata headers. Headers can be used for routing, tracing, content-type indication, or message classification. Returns the offset where the message was written.
func (*Queue) EnqueueWithOptions ¶
func (q *Queue) EnqueueWithOptions(payload []byte, ttl time.Duration, headers map[string]string) (uint64, error)
EnqueueWithOptions appends a message with both TTL and headers. This allows combining multiple features in a single enqueue operation. Returns the offset where the message was written.
func (*Queue) EnqueueWithPriority ¶
EnqueueWithPriority appends a message with a specific priority level (v1.1.0+). Returns the offset where the message was written.
func (*Queue) EnqueueWithTTL ¶
EnqueueWithTTL appends a message to the queue with a time-to-live duration. The message will expire after the specified TTL and will be skipped during dequeue. Returns the offset where the message was written.
func (*Queue) GetDLQ ¶
GetDLQ returns the dead letter queue for inspection (v1.2.0+). Returns nil if DLQ is not configured. The returned queue can be used to inspect or dequeue messages from the DLQ.
func (*Queue) GetRetryInfo ¶
GetRetryInfo returns retry information for a message (v1.2.0+). Returns nil if DLQ is not configured or if the message has no retry tracking. This is useful for implementing custom retry logic and backoff strategies.
The returned RetryInfo contains:
- MessageID: The message ID being tracked
- RetryCount: Number of times Nack() has been called for this message
- LastFailure: Timestamp of the most recent Nack() call
- FailureReason: Reason string from the most recent Nack() call
Example usage:
msg, _ := q.Dequeue()
info := q.GetRetryInfo(msg.ID)
if info != nil && info.RetryCount > 0 {
// Calculate backoff based on retry count
backoff := time.Duration(1<<uint(info.RetryCount)) * time.Second
time.Sleep(backoff)
}
func (*Queue) Nack ¶
Nack reports a message processing failure (v1.2.0+). When DLQ is enabled, this increments the retry count and potentially moves the message to the dead letter queue if max retries are exceeded. If DLQ is not configured, this method is a no-op.
The reason parameter should describe why the message processing failed. This reason is stored with the retry metadata for debugging purposes.
Returns an error if the operation fails.
func (*Queue) RequeueFromDLQ ¶
RequeueFromDLQ moves a message from the DLQ back to the main queue (v1.2.0+). The message ID should be from the DLQ (not the original message ID). Returns an error if DLQ is not configured or the message is not found.
func (*Queue) SeekToMessageID ¶
SeekToMessageID sets the read position to a specific message ID. Subsequent Dequeue() calls will start reading from this message. This is useful for replay scenarios or skipping messages.
The message ID must exist in the queue (between first and next message ID). After seeking, the read position is persisted to metadata.
func (*Queue) SeekToTimestamp ¶
SeekToTimestamp sets the read position to the first message at or after the given timestamp. Uses the segment index for efficient lookup. Returns an error if no messages exist at or after the timestamp.
func (*Queue) Stream ¶
func (q *Queue) Stream(ctx context.Context, handler StreamHandler) error
Stream continuously reads messages from the queue and calls the handler for each message. Streaming continues until the context is cancelled, an error occurs, or no more messages are available. The handler is called for each message in order.
The Stream method polls for new messages with a configurable interval (100ms by default). When a message is available, it's immediately passed to the handler. If no messages are available, Stream waits briefly before checking again.
Context cancellation will gracefully stop streaming and return context.Canceled. Handler errors will stop streaming and return the handler error.
Example usage:
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
err := q.Stream(ctx, func(msg *Message) error {
fmt.Printf("Received: %s\n", msg.Payload)
return nil
})
type RetryInfo ¶
type RetryInfo struct {
MessageID uint64 `json:"msg_id"`
RetryCount int `json:"retry_count"`
LastFailure time.Time `json:"last_failure"`
FailureReason string `json:"failure_reason,omitempty"`
}
RetryInfo contains retry metadata for a single message.
type RetryTracker ¶
type RetryTracker struct {
// contains filtered or unexported fields
}
RetryTracker manages message retry state for Dead Letter Queue functionality. It tracks how many times each message has failed processing and stores failure metadata for debugging purposes.
The retry state is persisted to disk as JSON for crash recovery.
func NewRetryTracker ¶
func NewRetryTracker(path string, maxRetries int) (*RetryTracker, error)
NewRetryTracker creates a new retry tracker. If a retry state file exists at the given path, it will be loaded. Otherwise, a new tracker is created.
func (*RetryTracker) Ack ¶
func (rt *RetryTracker) Ack(msgID uint64) error
Ack removes retry tracking for a successfully processed message. This is called when a message is processed without errors.
func (*RetryTracker) Clear ¶
func (rt *RetryTracker) Clear() error
Clear removes all retry tracking state. This is useful for testing or manual cleanup.
func (*RetryTracker) Close ¶
func (rt *RetryTracker) Close() error
Close persists the final state and cleans up resources.
func (*RetryTracker) Count ¶
func (rt *RetryTracker) Count() int
Count returns the number of messages currently being tracked. This indicates how many messages have active retry state.
func (*RetryTracker) GetInfo ¶
func (rt *RetryTracker) GetInfo(msgID uint64) *RetryInfo
GetInfo returns retry information for a message. Returns nil if no retry information exists (message hasn't failed yet).
type Stats ¶
type Stats struct {
// TotalMessages is the total number of messages ever enqueued
TotalMessages uint64
// PendingMessages is the number of unread messages
PendingMessages uint64
// NextMessageID is the ID that will be assigned to the next enqueued message
NextMessageID uint64
// ReadMessageID is the ID of the next message to be dequeued
ReadMessageID uint64
// SegmentCount is the number of segments
SegmentCount int
// DLQMessages is the total number of messages in the DLQ
DLQMessages uint64
// DLQPendingMessages is the number of unprocessed messages in the DLQ
DLQPendingMessages uint64
// RetryTrackedMessages is the number of messages currently being tracked for retries
RetryTrackedMessages int
// DedupTrackedEntries is the number of active dedup entries (v1.4.0+)
DedupTrackedEntries int
}
Stats returns queue statistics.
type StreamHandler ¶
StreamHandler is called for each message in the stream. Return an error to stop streaming.