natstransport

package
v0.7.0
Published: Mar 29, 2026 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package natstransport implements toc.Publisher and toc.Subscriber over core NATS (not JetStream).

Delivery semantics

This is an at-most-once transport. A nil publish error means only that the NATS client accepted the message for transmission; it is NOT an acknowledgment that the server received it. Messages may be lost during disconnect/reconnect windows. For durable delivery, use JetStream, which requires different interfaces.

Error handling

All subscriber-side failures — handler errors, unmarshal errors, validation errors, and recovered panics — are routed to the error handler configured via WithErrorHandler. The default error handler discards all errors silently. Callers who need observability MUST configure an error handler.

Handler contract

Handlers are invoked serially per subscription. Handlers MUST NOT call toc.Subscription.Close synchronously — doing so deadlocks because the handler holds the active callback count that Close waits to reach zero. Handlers should return promptly; a handler that blocks indefinitely prevents toc.Subscription.Close from completing.
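
The deadlock rule can be illustrated without NATS. In the sketch below, fakeSub is a hypothetical stand-in that only mimics the behavior described above (Close waits for active callbacks to reach zero); it is not this package's implementation. The handler requests shutdown from a separate goroutine so that it can return first.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeSub is a hypothetical stand-in for a subscription whose Close waits
// for in-flight callbacks, as the handler contract describes.
type fakeSub struct {
	active sync.WaitGroup // counts callbacks currently running
	once   sync.Once
	closed chan struct{}
}

func newFakeSub() *fakeSub { return &fakeSub{closed: make(chan struct{})} }

// deliver runs the handler as one tracked callback.
func (s *fakeSub) deliver(handler func()) {
	s.active.Add(1)
	defer s.active.Done()
	handler()
}

// Close blocks until no callback is active, then marks the subscription closed.
func (s *fakeSub) Close() {
	s.active.Wait()
	s.once.Do(func() { close(s.closed) })
}

// closeFromHandler delivers one message whose handler requests shutdown
// without blocking: Close runs in its own goroutine, so the handler returns.
func closeFromHandler() bool {
	sub := newFakeSub()
	sub.deliver(func() {
		go sub.Close() // calling sub.Close() directly here would never return
	})
	<-sub.closed // Close completes once the handler has returned
	return true
}

func main() {
	fmt.Println("closed:", closeFromHandler())
}
```

The same shape should apply to a real toc.Subscription: signal or spawn from inside the handler, and let the Close call run elsewhere.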

Diagnosis wire format

The Diagnosis protobuf message lacks the pipeline_id and timestamp_unix_nano envelope fields (unlike ObservationBatch, which has them). This transport carries both values in NATS message metadata as a shim:

  • Pipeline ID: extracted from the NATS subject
  • Timestamp: HeaderTimestamp NATS header (base-10 int64, >= 0)

This is a transport-specific compatibility measure. If diagnosis needs a transport-agnostic wire contract, add a proto envelope.

Connection ownership

The caller owns the nats.Conn lifecycle. Transport is an immutable, copyable handle — copies share the underlying connection.

Constants

const HeaderTimestamp = "Toc-Timestamp-Unix-Nano"

HeaderTimestamp is the NATS header key for diagnosis timestamp. Diagnosis proto lacks envelope fields — this header carries timestamp_unix_nano as a transport-specific shim.
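
As an illustration of the documented value format (base-10 int64, >= 0), the sketch below encodes and decodes such a header value with strconv. The helper names parseTimestampHeader and formatTimestampHeader are hypothetical and are not part of this package; the package's actual parsing rules may be stricter.

```go
package main

import (
	"fmt"
	"strconv"
)

// parseTimestampHeader is a hypothetical helper: it decodes a header value
// as a base-10 int64 and rejects negative values.
func parseTimestampHeader(value string) (int64, error) {
	ts, err := strconv.ParseInt(value, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("timestamp header %q: %w", value, err)
	}
	if ts < 0 {
		return 0, fmt.Errorf("timestamp header %q: negative value", value)
	}
	return ts, nil
}

// formatTimestampHeader is the hypothetical inverse: it renders a
// Unix-nanosecond timestamp as a base-10 string.
func formatTimestampHeader(ts int64) string {
	return strconv.FormatInt(ts, 10)
}

func main() {
	for _, v := range []string{"1743206400000000000", "-1", "12.5", ""} {
		ts, err := parseTimestampHeader(v)
		fmt.Println(ts, err)
	}
}
```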

Variables

This section is empty.

Functions

func ValidatePipelineID

func ValidatePipelineID(id string) error

ValidatePipelineID returns an error if id is not a valid single NATS subject token. It rejects the empty string and any string that contains a NATS-reserved character (`.`, `*`, `>`) or whitespace.
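
The rules above can be approximated with the standard library. The function validateToken below is a hypothetical re-implementation for illustration only; the package's own check and error messages may differ (for example, in what counts as whitespace).

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"unicode"
)

// validateToken is a hypothetical sketch of a single-token check:
// non-empty, no '.', '*' or '>', and no whitespace.
func validateToken(id string) error {
	if id == "" {
		return errors.New("pipeline ID is empty")
	}
	if strings.ContainsAny(id, ".*>") {
		return fmt.Errorf("pipeline ID %q contains a NATS-reserved character", id)
	}
	// Assumption: any Unicode whitespace is rejected, not just ASCII space.
	if strings.IndexFunc(id, unicode.IsSpace) >= 0 {
		return fmt.Errorf("pipeline ID %q contains whitespace", id)
	}
	return nil
}

func main() {
	for _, id := range []string{"orders-eu", "", "a.b", "all.*", "tail>", "two words"} {
		fmt.Printf("%q -> %v\n", id, validateToken(id))
	}
}
```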

Types

type Option

type Option func(*config) error

Option configures a Transport.

func WithErrorHandler

func WithErrorHandler(fn func(error)) Option

WithErrorHandler sets the callback for subscriber-side failures. The handler may be called concurrently across subscriptions. It is called synchronously from the callback path and must return quickly. It must not panic and must not call toc.Subscription.Close synchronously.

Default: discard all errors.
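
One way to satisfy these constraints (safe for concurrent calls, quick to return, no panics) is a non-blocking hand-off to a buffered channel. The errorSink type below is a hypothetical sketch, not part of this package; its Handle method has the func(error) shape that WithErrorHandler accepts.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// errorSink is a hypothetical collector: it queues errors for a slower
// consumer and counts the ones it had to drop.
type errorSink struct {
	ch      chan error
	dropped atomic.Int64
}

func newErrorSink(capacity int) *errorSink {
	return &errorSink{ch: make(chan error, capacity)}
}

// Handle never blocks: if the buffer is full, the error is dropped
// and the drop counter is incremented instead.
func (s *errorSink) Handle(err error) {
	select {
	case s.ch <- err:
	default:
		s.dropped.Add(1)
	}
}

func main() {
	sink := newErrorSink(2)

	// Simulate several subscriptions reporting failures concurrently.
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			sink.Handle(fmt.Errorf("failure %d", n))
		}(i)
	}
	wg.Wait()

	fmt.Println("queued:", len(sink.ch), "dropped:", sink.dropped.Load())
}
```

Passing sink.Handle to WithErrorHandler and draining sink.ch in a separate goroutine keeps logging and metrics work off the callback path.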

func WithPrefix

func WithPrefix(prefix string) Option

WithPrefix sets the NATS subject prefix. Default is "toc". Multi-token prefixes (e.g. "toc.dev") are allowed.
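
The Option signature, func(*config) error, follows the functional-options pattern. The sketch below shows how such options are typically applied over defaults. The config fields, the lowercase names, and the validation rules (empty prefix, nil handler) are assumptions for illustration; only the defaults ("toc" prefix, discard-all error handler) come from the documentation above.

```go
package main

import (
	"errors"
	"fmt"
)

// config and its fields are hypothetical; the real struct is unexported.
type config struct {
	prefix  string
	onError func(error)
}

type option func(*config) error

func withPrefix(prefix string) option {
	return func(c *config) error {
		if prefix == "" { // assumed validation rule
			return errors.New("prefix must not be empty")
		}
		c.prefix = prefix
		return nil
	}
}

func withErrorHandler(fn func(error)) option {
	return func(c *config) error {
		if fn == nil { // assumed validation rule
			return errors.New("error handler must not be nil")
		}
		c.onError = fn
		return nil
	}
}

// build starts from the documented defaults and applies each option in
// order, stopping at the first option that reports an error.
func build(opts ...option) (config, error) {
	cfg := config{prefix: "toc", onError: func(error) {}}
	for _, opt := range opts {
		if err := opt(&cfg); err != nil {
			return config{}, err
		}
	}
	return cfg, nil
}

func main() {
	cfg, err := build(withPrefix("toc.dev"))
	fmt.Println(cfg.prefix, err)
}
```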

type Transport

type Transport struct {
	// contains filtered or unexported fields
}

Transport implements toc.Publisher and toc.Subscriber over core NATS.

Transport is an immutable, copyable handle. Copies share the underlying nats.Conn. The caller owns connection lifecycle.
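
The "copyable handle" idea can be shown with stand-in types. Below, conn and handle are hypothetical substitutes for nats.Conn and Transport; the point is only that copying a struct that holds a pointer duplicates the pointer, so both copies act on the same connection.

```go
package main

import "fmt"

// conn is a hypothetical stand-in for a connection.
type conn struct{ published int }

// handle is a hypothetical stand-in for Transport: a small value type
// holding a pointer to the shared connection plus immutable settings.
type handle struct {
	c      *conn
	prefix string
}

// publish uses a value receiver, so it cannot modify the handle itself;
// it only acts on the shared connection.
func (h handle) publish() { h.c.published++ }

func main() {
	shared := &conn{}
	a := handle{c: shared, prefix: "toc"}
	b := a // copies the pointer, not the connection

	a.publish()
	b.publish()

	fmt.Println(shared.published, a.c == b.c)
}
```

Because the caller owns the connection, closing it invalidates every copy of the handle at once.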

func New

func New(conn *nats.Conn, opts ...Option) (Transport, error)

New creates a Transport. The caller owns the conn lifecycle.

func (Transport) PublishDiagnosis

func (t Transport) PublishDiagnosis(ctx context.Context, msg toc.DiagnosisMessage) error

PublishDiagnosis implements toc.DiagnosisPublisher.

func (Transport) PublishObservations

func (t Transport) PublishObservations(ctx context.Context, batch toc.ObservationBatch) error

PublishObservations implements toc.ObservationPublisher.

func (Transport) SubscribeAllDiagnosis

func (t Transport) SubscribeAllDiagnosis(ctx context.Context, fn toc.DiagnosisHandler) (toc.Subscription, error)

SubscribeAllDiagnosis implements toc.DiagnosisSubscriber.

func (Transport) SubscribeAllObservations

func (t Transport) SubscribeAllObservations(ctx context.Context, fn toc.ObservationHandler) (toc.Subscription, error)

SubscribeAllObservations implements toc.ObservationSubscriber.

func (Transport) SubscribeDiagnosis

func (t Transport) SubscribeDiagnosis(ctx context.Context, pipelineID string, fn toc.DiagnosisHandler) (toc.Subscription, error)

SubscribeDiagnosis implements toc.DiagnosisSubscriber.

func (Transport) SubscribeObservations

func (t Transport) SubscribeObservations(ctx context.Context, pipelineID string, fn toc.ObservationHandler) (toc.Subscription, error)

SubscribeObservations implements toc.ObservationSubscriber.
