Documentation
¶
Overview ¶
Package natstransport implements toc.Publisher and toc.Subscriber over core NATS (not JetStream).
Delivery semantics ¶
This is an at-most-once transport. A nil publish error means the NATS client accepted the message for transmission — it is NOT an acknowledgment of server receipt. Messages may be lost during disconnect/reconnect windows. For durable delivery, use JetStream (requires different interfaces).
Error handling ¶
All subscriber-side failures — handler errors, unmarshal errors, validation errors, and recovered panics — are routed to the error handler configured via WithErrorHandler. The default error handler discards all errors silently. Callers who need observability MUST configure an error handler.
Handler contract ¶
Handlers are invoked serially per subscription. Handlers MUST NOT call toc.Subscription.Close synchronously — doing so deadlocks because the handler holds the active callback count that Close waits to reach zero. Handlers should return promptly; a handler that blocks indefinitely prevents toc.Subscription.Close from completing.
Diagnosis wire format ¶
The Diagnosis protobuf message lacks pipeline_id and timestamp_unix_nano envelope fields (unlike ObservationBatch which has them). This transport uses NATS message headers as a shim:
- Pipeline ID: extracted from the NATS subject
- Timestamp: HeaderTimestamp NATS header (base-10 int64, >= 0)
This is a transport-specific compatibility measure. If diagnosis needs a transport-agnostic wire contract, add a proto envelope.
Connection ownership ¶
The caller owns the nats.Conn lifecycle. Transport is an immutable, copyable handle — copies share the underlying connection.
Index ¶
- Constants
- func ValidatePipelineID(id string) error
- type Option
- type Transport
- func (t Transport) PublishDiagnosis(ctx context.Context, msg toc.DiagnosisMessage) error
- func (t Transport) PublishObservations(ctx context.Context, batch toc.ObservationBatch) error
- func (t Transport) SubscribeAllDiagnosis(ctx context.Context, fn toc.DiagnosisHandler) (toc.Subscription, error)
- func (t Transport) SubscribeAllObservations(ctx context.Context, fn toc.ObservationHandler) (toc.Subscription, error)
- func (t Transport) SubscribeDiagnosis(ctx context.Context, pipelineID string, fn toc.DiagnosisHandler) (toc.Subscription, error)
- func (t Transport) SubscribeObservations(ctx context.Context, pipelineID string, fn toc.ObservationHandler) (toc.Subscription, error)
Constants ¶
const HeaderTimestamp = "Toc-Timestamp-Unix-Nano"
HeaderTimestamp is the NATS header key for diagnosis timestamp. Diagnosis proto lacks envelope fields — this header carries timestamp_unix_nano as a transport-specific shim.
Variables ¶
This section is empty.
Functions ¶
func ValidatePipelineID ¶
ValidatePipelineID returns an error if id is not a valid single NATS subject token. It rejects empty strings, NATS-reserved characters (`.`, `*`, `>`), and whitespace.
Types ¶
type Option ¶
type Option func(*config) error
Option configures a Transport.
func WithErrorHandler ¶
WithErrorHandler sets the callback for subscriber-side failures. The handler may be called concurrently across subscriptions. It is called synchronously from the callback path and must return quickly. It must not panic and must not call toc.Subscription.Close synchronously.
Default: discard all errors.
func WithPrefix ¶
WithPrefix sets the NATS subject prefix. Default is "toc". Multi-token prefixes (e.g. "toc.dev") are allowed.
type Transport ¶
type Transport struct {
// contains filtered or unexported fields
}
Transport implements toc.Publisher and toc.Subscriber over core NATS.
Transport is an immutable, copyable handle. Copies share the underlying nats.Conn. The caller owns connection lifecycle.
func (Transport) PublishDiagnosis ¶
PublishDiagnosis implements toc.DiagnosisPublisher.
func (Transport) PublishObservations ¶
PublishObservations implements toc.ObservationPublisher.
func (Transport) SubscribeAllDiagnosis ¶
func (t Transport) SubscribeAllDiagnosis(ctx context.Context, fn toc.DiagnosisHandler) (toc.Subscription, error)
SubscribeAllDiagnosis implements toc.DiagnosisSubscriber.
func (Transport) SubscribeAllObservations ¶
func (t Transport) SubscribeAllObservations(ctx context.Context, fn toc.ObservationHandler) (toc.Subscription, error)
SubscribeAllObservations implements toc.ObservationSubscriber.
func (Transport) SubscribeDiagnosis ¶
func (t Transport) SubscribeDiagnosis(ctx context.Context, pipelineID string, fn toc.DiagnosisHandler) (toc.Subscription, error)
SubscribeDiagnosis implements toc.DiagnosisSubscriber.
func (Transport) SubscribeObservations ¶
func (t Transport) SubscribeObservations(ctx context.Context, pipelineID string, fn toc.ObservationHandler) (toc.Subscription, error)
SubscribeObservations implements toc.ObservationSubscriber.