Documentation
¶
Overview ¶
Package storage implements the per-channel file writer and related storage primitives. All file I/O is serialized through a single goroutine per channel so the OS O_APPEND guarantee covers records under PIPE_BUF; larger records are additionally protected by an advisory flock. This package targets POSIX systems (Linux, macOS) only.
Index ¶
- Variables
- func NewChannelWatcher(log logger) (*channelWatcher, error)
- func NewChannelWriter(channelDir string, maxSegmentBytes int64, policy SyncPolicy, ...) (*channelWriter, error)
- func OffsetFileExists(path string) bool
- func ReadOffset(path string) (int64, error)
- func WriteOffset(path string, offset int64) error
- type ChannelWatcher
- type ChannelWriter
- type Compactor
- type HandlerFunc
- type LocalNotifier
- type Subscriber
- type SyncPolicy
Constants ¶
This section is empty.
Variables ¶
var ErrWriterClosed = errors.New("channel writer is closed")
ErrWriterClosed is returned by Write after Close has been called.
Functions ¶
func NewChannelWatcher ¶
func NewChannelWatcher(log logger) (*channelWatcher, error)
NewChannelWatcher creates a ChannelWatcher backed by fsnotify. log may be nil.
func NewChannelWriter ¶
func NewChannelWriter(channelDir string, maxSegmentBytes int64, policy SyncPolicy, syncInterval time.Duration, notifyFn func(), log logger) (*channelWriter, error)
NewChannelWriter creates channelDir if needed and starts the writer goroutine. maxSegmentBytes controls when the writer rolls to a new segment file; 0 means never roll (all data goes into one segment). notifyFn is called after each successful write; it may be nil. log may be nil.
func OffsetFileExists ¶
OffsetFileExists reports whether an offset file exists at path.
func ReadOffset ¶
ReadOffset reads the byte offset stored at path. The file must contain a decimal integer as written by WriteOffset.
func WriteOffset ¶
WriteOffset atomically persists offset to path. It writes to a sibling .tmp file, fsyncs, then renames over path so the original is never left in a half-written state.
Types ¶
type ChannelWatcher ¶
ChannelWatcher abstracts fsnotify vs polling so subscriber code is testable.
type ChannelWriter ¶
ChannelWriter appends envelopes to a channel's segment files. Write blocks until confirmed (rendezvous — no in-memory queue).
type Compactor ¶
type Compactor struct {
// contains filtered or unexported fields
}
Compactor tracks registered subscribers for a single channel and deletes fully-consumed sealed segment files. Unlike the old copy-based approach, compaction is a simple O(1)-per-segment file deletion with no writer pause.
func NewCompactor ¶
NewCompactor returns a Compactor that reads subscriber offsets from offsetDir. maxLagBytes is the per-subscriber lag threshold above which a warning is logged; 0 disables lag warnings. log may be nil.
func (*Compactor) DeregisterSubscriber ¶
DeregisterSubscriber removes id and deletes its offset file. After removal, compaction is no longer constrained by this subscriber's position.
func (*Compactor) MaybeCompact ¶
MaybeCompact deletes any sealed segment files in channelDir whose contents have been fully consumed by all registered subscribers. The active (newest) segment is never deleted. No writer pause is needed — the writer only ever appends to the active segment, and Unix allows open-fd reads of deleted files to complete normally.
Lag warnings are logged for any subscriber whose unconsumed bytes exceed maxLagBytes (if configured).
func (*Compactor) MinOffset ¶
MinOffset returns the smallest persisted offset across all registered subscribers. Returns 0 if no subscribers are registered.
func (*Compactor) RegisterSubscriber ¶
RegisterSubscriber marks id as a subscriber whose offset constrains deletion.
type HandlerFunc ¶
HandlerFunc processes a decoded message. A non-nil return or a panic triggers the retry / dead-letter logic.
type LocalNotifier ¶
type LocalNotifier struct {
// contains filtered or unexported fields
}
LocalNotifier provides an in-process notification channel that bypasses the filesystem watcher, giving same-process subscribers lower latency. Use LocalNotifier.Notify as the notifyFn argument to NewChannelWriter; have the subscriber select on LocalNotifier.C().
func NewLocalNotifier ¶
func NewLocalNotifier() *LocalNotifier
NewLocalNotifier creates a LocalNotifier with a capacity-1 channel.
func (*LocalNotifier) C ¶
func (n *LocalNotifier) C() <-chan struct{}
C returns the read-only channel that subscribers should select on.
func (*LocalNotifier) Notify ¶
func (n *LocalNotifier) Notify()
Notify sends a non-blocking notification. Safe to call from any goroutine.
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber reads envelopes from a channel's segment files, dispatches them to a handler, and persists a global byte offset for at-least-once delivery.
func NewSubscriber ¶
func NewSubscriber( id, channelDir, offsetDir string, reg payloadDecoder, maxRetries int, dlWriter ChannelWriter, log logger, flushInterval time.Duration, ) (*Subscriber, error)
NewSubscriber constructs a Subscriber and initialises its offset file. If no offset file exists the subscriber starts from the current end of the channel (new subscriber, skips history). Otherwise it resumes from the persisted offset (restarting subscriber). log may be nil.
func (*Subscriber) Start ¶
func (s *Subscriber) Start(notifyC <-chan struct{}, handler HandlerFunc)
Start launches the subscriber goroutine. notifyC should come from ChannelWatcher.Watch or LocalNotifier.C(). Call Stop to wait for exit.
func (*Subscriber) Stop ¶
func (s *Subscriber) Stop()
Stop signals the goroutine to exit and blocks until it does. Safe to call more than once.
type SyncPolicy ¶
type SyncPolicy string
SyncPolicy controls when channel file writes are flushed to stable storage.
const ( SyncPolicyNone SyncPolicy = "none" SyncPolicyPeriodic SyncPolicy = "periodic" SyncPolicyAlways SyncPolicy = "always" )
SyncPolicy constants.