source

package
v0.4.0-rc2 Latest
Warning

This package is not in the latest version of its module.

Published: May 5, 2026 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

Package source bridges NATS (Core or JetStream) into mig's Sink interface.

Constants

This section is empty.

Variables

This section is empty.

Functions

func Connect

func Connect(cfg config.Config, sink Sink) (*nats.Conn, error)

Connect builds a *nats.Conn from cfg. Auth precedence: explicit flags override --context. Cold-start failures (for example, an unreachable server) return immediately with a clear error; once connected, mid-session drops trigger reconnect attempts indefinitely.

Types

type Conn

type Conn interface {
	Subscribe(subject string, cb nats.MsgHandler) (*nats.Subscription, error)
	Status() nats.Status
	ConnectedUrl() string
	Close()
}

Conn is the read-only interface a Source needs from a NATS connection. Defined here so tests can swap it out for a fake.

type CoreSource

type CoreSource struct {
	// contains filtered or unexported fields
}

CoreSource subscribes via Core NATS (no JetStream, no acks, no replay).

func NewCore

func NewCore(nc *nats.Conn, cfg config.Config) *CoreSource

NewCore returns a CoreSource bound to nc. Caller owns nc and must close it.

func (*CoreSource) Kind

func (c *CoreSource) Kind() string

Kind returns "core".

func (*CoreSource) Run

func (c *CoreSource) Run(ctx context.Context, sink Sink) error

Run subscribes to all configured subjects and blocks until ctx is cancelled.

type InspectMsg

type InspectMsg struct {
	Subject    string
	Data       []byte
	Headers    nats.Header
	Sequence   *uint64    // JetStream stream sequence; nil for Core
	Timestamp  *time.Time // JetStream server timestamp; nil for Core
	ReceivedAt time.Time  // local clock at ingest
	Size       int        // len(Data), cached
	JSONValid  bool       // single json.Valid pass at ingest
}

InspectMsg is the normalized observation produced by Sources. All fields are populated when applicable; pointer fields are nil when not.
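
To make the field semantics concrete, here is a minimal sketch of how a message might be normalized. The struct mirrors the documented fields, with Headers declared as map[string][]string (the underlying type of nats.Header) so the example needs only the standard library. The helpers normalizeCore and withJetStreamMeta are hypothetical names for illustration and are not part of this package.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// InspectMsg mirrors the documented struct. Headers uses the underlying
// type of nats.Header so this sketch stays stdlib-only.
type InspectMsg struct {
	Subject    string
	Data       []byte
	Headers    map[string][]string
	Sequence   *uint64
	Timestamp  *time.Time
	ReceivedAt time.Time
	Size       int
	JSONValid  bool
}

// normalizeCore is a hypothetical helper: it computes the derived fields
// once at ingest and leaves the JetStream-only pointers nil.
func normalizeCore(subject string, data []byte) InspectMsg {
	return InspectMsg{
		Subject:    subject,
		Data:       data,
		ReceivedAt: time.Now(),
		Size:       len(data),
		JSONValid:  json.Valid(data),
	}
}

// withJetStreamMeta is a hypothetical helper that attaches the stream
// sequence and server timestamp to a copy of m.
func withJetStreamMeta(m InspectMsg, seq uint64, ts time.Time) InspectMsg {
	m.Sequence = &seq
	m.Timestamp = &ts
	return m
}

func main() {
	core := normalizeCore("orders.new", []byte(`{"id":1}`))
	fmt.Println(core.Size, core.JSONValid, core.Sequence == nil)

	js := withJetStreamMeta(core, 42, time.Now())
	fmt.Println(*js.Sequence, js.Timestamp != nil)
}
```

Consumers can then branch on Sequence == nil to tell Core observations from JetStream ones without a separate kind field.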

type JetStreamSource

type JetStreamSource struct {
	// contains filtered or unexported fields
}

JetStreamSource consumes from a JetStream stream via an in-memory OrderedConsumer (AckNone, ephemeral, gap-healing).

func NewJetStream

func NewJetStream(ctx context.Context, nc *nats.Conn, cfg config.Config) (*JetStreamSource, error)

NewJetStream prepares a JetStreamSource. If cfg.Stream is empty, it auto-detects the stream by intersecting each stream's configured subjects with cfg.Subjects. If auto-detection fails, it returns an error that includes the list of available streams.
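
The auto-detection described above could look roughly like the sketch below. This is an illustration only: the package's actual matching logic is not shown in this documentation, and subjectsOverlap, anyOverlap, and detectStreams are hypothetical names. The sketch assumes standard NATS wildcard semantics, where "*" matches exactly one token and ">" matches one or more trailing tokens.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// subjectsOverlap reports whether two NATS subject patterns can match at
// least one common subject.
func subjectsOverlap(a, b string) bool {
	at, bt := strings.Split(a, "."), strings.Split(b, ".")
	for i := 0; i < len(at) && i < len(bt); i++ {
		if at[i] == ">" || bt[i] == ">" {
			return true // ">" swallows every remaining token
		}
		if at[i] != "*" && bt[i] != "*" && at[i] != bt[i] {
			return false // two different literal tokens
		}
	}
	// Without ">", both patterns must have the same number of tokens.
	return len(at) == len(bt)
}

// anyOverlap reports whether any stream subject overlaps any filter.
func anyOverlap(subjects, filters []string) bool {
	for _, s := range subjects {
		for _, f := range filters {
			if subjectsOverlap(s, f) {
				return true
			}
		}
	}
	return false
}

// detectStreams returns the names of streams whose configured subjects
// overlap the requested filters, sorted for stable output.
func detectStreams(streams map[string][]string, filters []string) []string {
	var hits []string
	for name, subjects := range streams {
		if anyOverlap(subjects, filters) {
			hits = append(hits, name)
		}
	}
	sort.Strings(hits)
	return hits
}

func main() {
	streams := map[string][]string{
		"ORDERS": {"orders.>"},
		"AUDIT":  {"audit.*"},
	}
	fmt.Println(detectStreams(streams, []string{"orders.eu.new"}))
	fmt.Println(detectStreams(streams, []string{"billing.>"}))
}
```

In this sketch an empty result is the point at which a caller would build a NoStreamError from the filters and the full stream list.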

func (*JetStreamSource) Kind

func (j *JetStreamSource) Kind() string

Kind returns "jetstream".

func (*JetStreamSource) Run

func (j *JetStreamSource) Run(ctx context.Context, sink Sink) error

Run starts the OrderedConsumer and blocks until ctx is cancelled.

func (*JetStreamSource) StreamName

func (j *JetStreamSource) StreamName() string

StreamName returns the stream this source is bound to.

type NoStreamError

type NoStreamError struct {
	Filters []string
	All     []string
}

NoStreamError is returned when auto-detection finds no matching stream. Its Error() message lists the available streams so users can choose --stream or --core.

func (*NoStreamError) Empty

func (e *NoStreamError) Empty() bool

Empty reports whether no streams existed at all (signal to fall back to Core).

func (*NoStreamError) Error

func (e *NoStreamError) Error() string
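
A caller might use Empty to decide between falling back to Core and surfacing the error, as in the sketch below. The NoStreamError type here is a local mirror of the documented fields; the method bodies, the message wording, and the pickKind helper are assumptions made for illustration, not the package's implementation.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// NoStreamError mirrors the documented fields. The method bodies below
// are assumptions for this sketch.
type NoStreamError struct {
	Filters []string
	All     []string
}

// Empty is assumed to mean "the server reported no streams at all".
func (e *NoStreamError) Empty() bool { return len(e.All) == 0 }

// Error uses a hypothetical message format that lists available streams.
func (e *NoStreamError) Error() string {
	if e.Empty() {
		return "no JetStream streams exist"
	}
	return fmt.Sprintf("no stream matches %s; available: %s",
		strings.Join(e.Filters, ","), strings.Join(e.All, ","))
}

// pickKind is a hypothetical helper showing how a caller might react to
// the error returned by a NewJetStream-style constructor.
func pickKind(err error) (string, error) {
	if err == nil {
		return "jetstream", nil
	}
	var nse *NoStreamError
	if errors.As(err, &nse) && nse.Empty() {
		return "core", nil // nothing to consume from, so use Core NATS
	}
	return "", err // streams exist; let the user pick one explicitly
}

func main() {
	wrapped := fmt.Errorf("prepare source: %w", &NoStreamError{Filters: []string{"orders.>"}})
	fmt.Println(pickKind(wrapped))

	_, err := pickKind(&NoStreamError{
		Filters: []string{"orders.>"},
		All:     []string{"AUDIT", "BILLING"},
	})
	fmt.Println(err)
}
```

Using errors.As rather than a type assertion keeps the check working when the error has been wrapped on its way up the call stack.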

type PrintSink

type PrintSink struct {
	// contains filtered or unexported fields
}

PrintSink implements Sink by writing each message as a single line to w (valid JSON payloads span multiple lines when pretty-printing is enabled). Designed for --no-tui mode. Safe for concurrent use.

func NewPrintSink

func NewPrintSink(w io.Writer, pretty bool) *PrintSink

NewPrintSink returns a PrintSink writing to w. When pretty is true, valid JSON payloads are indented across multiple lines.

func (*PrintSink) Record

func (p *PrintSink) Record(m InspectMsg)

Record writes a single observation line to the sink's writer.

func (*PrintSink) RecordDrop

func (p *PrintSink) RecordDrop(reason string)

RecordDrop emits a single-line drop marker for a slow-consumer or similar event.

func (*PrintSink) RecordError

func (p *PrintSink) RecordError(err error)

RecordError emits a single-line error marker.
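
The combination of line-oriented output, optional pretty-printing, and concurrency safety can be sketched as follows. The linePrinter type, the render helper, and the "subject payload" line layout are all hypothetical; the real PrintSink's output format is not documented here.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"sync"
)

// linePrinter is a hypothetical stand-in for PrintSink.
type linePrinter struct {
	mu     sync.Mutex
	w      io.Writer
	pretty bool
}

// Record writes "subject payload" followed by a newline. The payload is
// formatted before the lock is taken so the critical section stays short.
func (p *linePrinter) Record(subject string, data []byte) {
	out := data
	if p.pretty && json.Valid(data) {
		var buf bytes.Buffer
		if err := json.Indent(&buf, data, "", "  "); err == nil {
			out = buf.Bytes()
		}
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	fmt.Fprintf(p.w, "%s %s\n", subject, out)
}

// render runs one Record call against an in-memory buffer and returns
// what was written.
func render(pretty bool, subject string, data []byte) string {
	var buf bytes.Buffer
	p := &linePrinter{w: &buf, pretty: pretty}
	p.Record(subject, data)
	return buf.String()
}

func main() {
	p := &linePrinter{w: os.Stdout, pretty: true}
	p.Record("orders.new", []byte(`{"id":1}`))
	p.Record("orders.raw", []byte("plain text"))
}
```

Holding the mutex for the whole write keeps lines from different goroutines from interleaving, which is one simple way to satisfy the concurrency requirement on Sink.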

type Sink

type Sink interface {
	Record(InspectMsg)
	RecordDrop(reason string)
	RecordError(err error)
}

Sink is the consumer side of a Source. Implementations must be safe for concurrent calls from arbitrary goroutines and MUST NOT block; doing so would stall the NATS callback and back-pressure the entire connection.
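
One way to honor the non-blocking requirement is a buffered channel with a drop counter, sketched below. The chanSink type and newChanSink constructor are hypothetical, and InspectMsg is trimmed to two fields to keep the example short.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// InspectMsg is trimmed to two fields for this sketch.
type InspectMsg struct {
	Subject string
	Data    []byte
}

// Sink mirrors the documented interface.
type Sink interface {
	Record(InspectMsg)
	RecordDrop(reason string)
	RecordError(err error)
}

// chanSink is a hypothetical Sink that never blocks its caller.
type chanSink struct {
	msgs    chan InspectMsg
	dropped atomic.Int64
	errs    atomic.Int64
}

// Compile-time check that chanSink satisfies Sink.
var _ Sink = (*chanSink)(nil)

func newChanSink(capacity int) *chanSink {
	return &chanSink{msgs: make(chan InspectMsg, capacity)}
}

// Record enqueues m if the buffer has room and counts a drop otherwise.
func (s *chanSink) Record(m InspectMsg) {
	select {
	case s.msgs <- m:
	default:
		s.dropped.Add(1)
	}
}

// RecordDrop and RecordError only bump counters, so they cannot block.
func (s *chanSink) RecordDrop(reason string) { s.dropped.Add(1) }
func (s *chanSink) RecordError(err error)    { s.errs.Add(1) }

func main() {
	s := newChanSink(1)
	s.Record(InspectMsg{Subject: "a"})
	s.Record(InspectMsg{Subject: "b"}) // buffer full: counted, not blocked
	fmt.Println(len(s.msgs), s.dropped.Load())
}
```

The trade-off is explicit: when the reader falls behind, messages are lost and counted rather than stalling the NATS callback.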

type Source

type Source interface {
	Run(ctx context.Context, sink Sink) error
	Kind() string
}

Source delivers InspectMsg into Sink until ctx is cancelled.
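
For tests, a fake Source that follows the same contract can be handy. The sketch below is hypothetical throughout: replaySource, countSink, and runBriefly are illustration names, InspectMsg is trimmed to one field, and returning nil on cancellation is an assumption, since the documentation does not state what Run returns when ctx is cancelled.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// InspectMsg is trimmed to one field for this sketch.
type InspectMsg struct{ Subject string }

// Sink and Source mirror the documented interfaces.
type Sink interface {
	Record(InspectMsg)
	RecordDrop(reason string)
	RecordError(err error)
}

type Source interface {
	Run(ctx context.Context, sink Sink) error
	Kind() string
}

// replaySource is a hypothetical Source that emits a fixed list of
// messages and then blocks until ctx is cancelled.
type replaySource struct{ msgs []InspectMsg }

func (r *replaySource) Kind() string { return "replay" }

func (r *replaySource) Run(ctx context.Context, sink Sink) error {
	for _, m := range r.msgs {
		if ctx.Err() != nil {
			return nil // assumption: cancellation is not reported as an error
		}
		sink.Record(m)
	}
	<-ctx.Done()
	return nil
}

// countSink counts calls. It is only used from one goroutine here, so it
// skips the synchronization a real Sink would need.
type countSink struct{ records, drops, errs int }

func (c *countSink) Record(InspectMsg)  { c.records++ }
func (c *countSink) RecordDrop(string)  { c.drops++ }
func (c *countSink) RecordError(error)  { c.errs++ }

// runBriefly runs src with a short timeout so Run returns on its own.
func runBriefly(src Source, sink Sink) error {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	return src.Run(ctx, sink)
}

func main() {
	src := &replaySource{msgs: []InspectMsg{{Subject: "a"}, {Subject: "b"}}}
	sink := &countSink{}
	err := runBriefly(src, sink)
	fmt.Println(src.Kind(), sink.records, err)
}
```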
