Documentation
¶
Overview ¶
Package source bridges NATS (Core or JetStream) into mig's Sink interface.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Conn ¶
type Conn interface {
Subscribe(subject string, cb nats.MsgHandler) (*nats.Subscription, error)
Status() nats.Status
ConnectedUrl() string
Close()
}
Conn is the read-only interface a Source needs from a NATS connection. Defined here so tests can swap it out for a fake.
type CoreSource ¶
type CoreSource struct {
// contains filtered or unexported fields
}
CoreSource subscribes via Core NATS (no JetStream, no acks, no replay).
type InspectMsg ¶
type InspectMsg struct {
Subject string
Data []byte
Headers nats.Header
Sequence *uint64 // JetStream stream sequence; nil for Core
Timestamp *time.Time // JetStream server timestamp; nil for Core
ReceivedAt time.Time // local clock at ingest
Size int // len(Data), cached
JSONValid bool // single json.Valid pass at ingest
}
InspectMsg is the normalized observation produced by Sources. All fields are populated when applicable; pointer fields are nil when not.
type JetStreamSource ¶
type JetStreamSource struct {
// contains filtered or unexported fields
}
JetStreamSource consumes from a JetStream stream via an in-memory OrderedConsumer (AckNone, ephemeral, gap-healing).
func NewJetStream ¶
NewJetStream prepares a JetStreamSource. If cfg.Stream is empty it auto-detects the stream by intersecting its config subjects with cfg.Subjects. Returns an error including the list of available streams if auto-detection fails.
func (*JetStreamSource) Run ¶
func (j *JetStreamSource) Run(ctx context.Context, sink Sink) error
Run starts the OrderedConsumer and blocks until ctx is cancelled.
func (*JetStreamSource) StreamName ¶
func (j *JetStreamSource) StreamName() string
StreamName returns the stream this source is bound to.
type NoStreamError ¶
NoStreamError is returned when auto-detection finds no stream. The Error() formatting includes the available streams so users can choose --stream or --core. Empty reports whether no streams exist at all (signal to fall back to Core).
func (*NoStreamError) Empty ¶
func (e *NoStreamError) Empty() bool
Empty reports whether no streams existed at all (signal to fall back to Core).
func (*NoStreamError) Error ¶
func (e *NoStreamError) Error() string
type PrintSink ¶
type PrintSink struct {
// contains filtered or unexported fields
}
PrintSink implements Sink by writing each message as a single line to w. Designed for --no-tui mode. Safe for concurrent use.
func NewPrintSink ¶
NewPrintSink returns a PrintSink writing to w. When pretty is true, valid JSON payloads are indented across multiple lines.
func (*PrintSink) Record ¶
func (p *PrintSink) Record(m InspectMsg)
Record writes a single observation line to the sink's writer.
func (*PrintSink) RecordDrop ¶
RecordDrop emits a single-line drop marker for a slow-consumer or similar event.
func (*PrintSink) RecordError ¶
RecordError emits a single-line error marker.
type Sink ¶
type Sink interface {
Record(InspectMsg)
RecordDrop(reason string)
RecordError(err error)
}
Sink is the consumer side of a Source. Implementations must be safe for concurrent calls from arbitrary goroutines and MUST NOT block; doing so would stall the NATS callback and back-pressure the entire connection.