record

package
v0.47.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 17, 2026 License: MPL-2.0 Imports: 8 Imported by: 0

Documentation

Overview

Package record provides topic recording, deterministic replay, and fault injection for go-DDS applications.

Recording captures all samples delivered to a set of subscribers and writes them to an io.Writer as newline-delimited JSON (JSONL). Each line is a RecordedSample object.

Replay reads a JSONL recording and re-publishes the samples through a Participant, either preserving the original inter-sample timing or scaling it by a speed factor.

Fault injection wraps any Publisher and injects configurable faults (packet loss, delay, payload corruption, duplication, and reordering) to stress-test DDS consumers without modifying the network or transport.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type FaultOptions

type FaultOptions struct {
	// LossRate is the probability that a Write call is silently dropped.
	LossRate float64
	// DelayMin and DelayMax bound the random additional latency added before
	// forwarding a sample. DelayMin == DelayMax imposes a fixed delay.
	// Both zero means no added latency.
	DelayMin time.Duration
	DelayMax time.Duration
	// CorruptRate is the probability that one byte of the payload is bit-flipped
	// before forwarding.
	CorruptRate float64
	// DuplicateRate is the probability that a sample is forwarded twice.
	DuplicateRate float64
	// ReorderWindow, when > 1, buffers up to ReorderWindow samples and emits
	// them in randomised order when the window fills, simulating out-of-order
	// delivery. Samples buffered at Close are flushed in shuffled order.
	ReorderWindow int
}

FaultOptions configures the fault injection behaviour of a FaultPublisher. All probability fields are in [0.0, 1.0]; values outside that range are clamped at call time.
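The clamping and delay rules above can be illustrated with a small sketch. The helpers clamp01 and pickDelay are hypothetical names, not part of this package, and the uniform draw between the two bounds is an assumption; the documentation only says the bounds limit a random delay.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// clamp01 is a hypothetical helper: it forces a probability into [0.0, 1.0].
func clamp01(p float64) float64 {
	if p < 0 {
		return 0
	}
	if p > 1 {
		return 1
	}
	return p
}

// pickDelay is a hypothetical helper: it draws a duration uniformly from
// [lo, hi]. Equal bounds give a fixed delay; both zero give no delay.
// Treating hi < lo as "use lo" is an assumption of this sketch.
func pickDelay(rng *rand.Rand, lo, hi time.Duration) time.Duration {
	if hi <= lo {
		return lo
	}
	return lo + time.Duration(rng.Int63n(int64(hi-lo)+1))
}

func main() {
	rng := rand.New(rand.NewSource(1))
	fmt.Println(clamp01(-0.2), clamp01(0.25), clamp01(1.7))
	fmt.Println(pickDelay(rng, 5*time.Millisecond, 5*time.Millisecond))
	d := pickDelay(rng, time.Millisecond, 3*time.Millisecond)
	fmt.Println(d >= time.Millisecond && d <= 3*time.Millisecond)
}
```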

type FaultPublisher

type FaultPublisher struct {
	// contains filtered or unexported fields
}

FaultPublisher wraps a dds.Publisher and injects faults on Write according to the given FaultOptions. It satisfies the dds.Publisher interface.

func NewFaultPublisher

func NewFaultPublisher(pub dds.Publisher, opts FaultOptions, seed int64) *FaultPublisher

NewFaultPublisher wraps pub with fault injection configured by opts. seed is passed to the internal PRNG; pass 0 for a time-derived seed.
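One plausible reading of the seed rule, sketched with math/rand. The helper newRNG is hypothetical; how the package actually builds its PRNG is not documented here. A fixed non-zero seed makes a fault sequence repeatable across test runs.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// newRNG is a hypothetical helper: a zero seed falls back to the current
// time, any other seed is used as-is so runs are reproducible.
func newRNG(seed int64) *rand.Rand {
	if seed == 0 {
		seed = time.Now().UnixNano()
	}
	return rand.New(rand.NewSource(seed))
}

func main() {
	a, b := newRNG(42), newRNG(42)
	fmt.Println(a.Float64() == b.Float64()) // same seed, same first draw: true
}
```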

func (*FaultPublisher) Close

func (f *FaultPublisher) Close() error

Close flushes any buffered reorder window, then closes the underlying publisher.
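The reorder window and its flush on Close might work roughly as follows. This is a sketch under assumptions, not the package's code: reorderBuf, push, and flush are hypothetical names, and samples are emitted through a plain callback instead of a dds.Publisher.

```go
package main

import (
	"fmt"
	"math/rand"
)

// reorderBuf is a hypothetical buffer that holds up to window samples and
// emits them in shuffled order once the window fills.
type reorderBuf struct {
	window int
	rng    *rand.Rand
	buf    [][]byte
	emit   func([]byte)
}

func newReorderBuf(window int, seed int64, emit func([]byte)) *reorderBuf {
	return &reorderBuf{window: window, rng: rand.New(rand.NewSource(seed)), emit: emit}
}

// push buffers one sample; a window of 0 or 1 disables reordering.
func (r *reorderBuf) push(p []byte) {
	if r.window <= 1 {
		r.emit(p)
		return
	}
	r.buf = append(r.buf, append([]byte(nil), p...)) // copy: caller may reuse p
	if len(r.buf) >= r.window {
		r.flush()
	}
}

// flush shuffles and emits whatever is buffered; a Close method would call
// this before closing the wrapped publisher.
func (r *reorderBuf) flush() {
	r.rng.Shuffle(len(r.buf), func(i, j int) { r.buf[i], r.buf[j] = r.buf[j], r.buf[i] })
	for _, p := range r.buf {
		r.emit(p)
	}
	r.buf = r.buf[:0]
}

func main() {
	r := newReorderBuf(3, 1, func(p []byte) { fmt.Printf("%s ", p) })
	for _, s := range []string{"a", "b", "c", "d"} {
		r.push([]byte(s))
	}
	r.flush() // emits the leftover "d"
	fmt.Println()
}
```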

func (*FaultPublisher) Write

func (f *FaultPublisher) Write(payload []byte) error

Write applies the configured faults and then forwards to the underlying publisher. The call may block when a delay is configured (DelayMax > 0).
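A minimal sketch of how loss, corruption, and duplication could be applied in a wrapping Write. Everything here is illustrative: publisher stands in for dds.Publisher (whose real method set may differ), faulty and newFaulty are hypothetical, the order in which faults are applied is a guess, and "bit-flipped" is interpreted as flipping a single bit of one randomly chosen byte.

```go
package main

import (
	"fmt"
	"math/rand"
)

// publisher is a stand-in for dds.Publisher; the real interface may differ.
type publisher interface {
	Write(payload []byte) error
}

// memPublisher records every forwarded payload so results can be inspected.
type memPublisher struct{ got [][]byte }

func (m *memPublisher) Write(p []byte) error {
	m.got = append(m.got, append([]byte(nil), p...))
	return nil
}

// faulty is a hypothetical wrapper that injects faults before forwarding.
type faulty struct {
	next               publisher
	rng                *rand.Rand
	loss, corrupt, dup float64
}

func newFaulty(next publisher, seed int64, loss, corrupt, dup float64) *faulty {
	return &faulty{next: next, rng: rand.New(rand.NewSource(seed)), loss: loss, corrupt: corrupt, dup: dup}
}

func (f *faulty) Write(payload []byte) error {
	if f.rng.Float64() < f.loss {
		return nil // dropped silently: the caller sees success
	}
	out := append([]byte(nil), payload...) // never mutate the caller's slice
	if len(out) > 0 && f.rng.Float64() < f.corrupt {
		i := f.rng.Intn(len(out))
		out[i] ^= 1 << uint(f.rng.Intn(8)) // flip one bit of one byte
	}
	if err := f.next.Write(out); err != nil {
		return err
	}
	if f.rng.Float64() < f.dup {
		return f.next.Write(out) // forward the same sample a second time
	}
	return nil
}

func main() {
	sink := &memPublisher{}
	f := newFaulty(sink, 7, 0.3, 0.1, 0.2)
	for i := 0; i < 10; i++ {
		_ = f.Write([]byte("sample"))
	}
	fmt.Println("writes: 10, forwarded:", len(sink.got))
}
```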

func (*FaultPublisher) WriteCtx added in v0.10.0

func (f *FaultPublisher) WriteCtx(ctx context.Context, payload []byte) error

WriteCtx applies configured faults and forwards to the underlying publisher, honouring ctx cancellation during any configured delay.
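Honouring cancellation during a delay is commonly done by selecting on a timer and ctx.Done(). The sketch below shows that standard pattern; sleepCtx and demo are hypothetical helpers, and whether WriteCtx is implemented this way is not stated in the documentation.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// sleepCtx is a hypothetical helper: it waits for d or until ctx is done,
// whichever comes first, and returns ctx.Err() on cancellation.
func sleepCtx(ctx context.Context, d time.Duration) error {
	if d <= 0 {
		return ctx.Err()
	}
	t := time.NewTimer(d)
	defer t.Stop() // release the timer if ctx wins the race
	select {
	case <-t.C:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demo exercises both paths: a long delay cut short by a deadline, and a
// short delay that completes normally.
func demo() (timedOut, finished error) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	timedOut = sleepCtx(ctx, time.Hour)
	finished = sleepCtx(context.Background(), time.Millisecond)
	return timedOut, finished
}

func main() {
	timedOut, finished := demo()
	fmt.Println(timedOut) // context deadline exceeded
	fmt.Println(finished) // <nil>
}
```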

type Player

type Player struct {
	// contains filtered or unexported fields
}

Player reads a JSONL recording written by Recorder and replays all samples through freshly created publishers on a Participant.

func NewPlayer

func NewPlayer(r io.Reader, p dds.Participant) *Player

NewPlayer returns a Player that reads from r and creates publishers on p.

func (*Player) Play

func (pl *Player) Play(ctx context.Context) error

Play replays the recording at 1× speed (real-time). The first sample is published immediately; subsequent samples are delayed to match the original inter-sample timing recorded by the Recorder.
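The timing rule can be sketched as a function from recorded timestamps to per-sample waits. The helpers gaps and timesAt are hypothetical. Which field the Player reads (Timestamp or RecordedAt) is not stated here, and treating a backwards step as a zero wait is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"time"
)

// timesAt builds timestamps from millisecond offsets, for illustration only.
func timesAt(ms ...int64) []time.Time {
	out := make([]time.Time, len(ms))
	for i, m := range ms {
		out[i] = time.UnixMilli(m)
	}
	return out
}

// gaps returns how long to wait before publishing each sample: zero for the
// first one, then the difference to the previous timestamp. Negative
// differences (out-of-order stamps) are treated as zero.
func gaps(stamps []time.Time) []time.Duration {
	out := make([]time.Duration, len(stamps))
	for i := 1; i < len(stamps); i++ {
		if d := stamps[i].Sub(stamps[i-1]); d > 0 {
			out[i] = d
		}
	}
	return out
}

func main() {
	fmt.Println(gaps(timesAt(0, 100, 350, 340))) // prints [0s 100ms 250ms 0s]
}
```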

func (*Player) PlayFiltered

func (pl *Player) PlayFiltered(ctx context.Context, topics []string) error

PlayFiltered replays only samples whose topic is in the allow-list. A nil or empty allow-list plays all topics.
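The allow-list rule, including the "nil or empty plays everything" case, can be expressed with a set lookup. The helpers allowSet and allowed are hypothetical names used only for this sketch.

```go
package main

import "fmt"

// allowSet turns the topics slice into a set for constant-time lookups.
func allowSet(topics []string) map[string]struct{} {
	set := make(map[string]struct{}, len(topics))
	for _, t := range topics {
		set[t] = struct{}{}
	}
	return set
}

// allowed reports whether a sample on topic should be replayed.
// An empty set means no filtering at all.
func allowed(set map[string]struct{}, topic string) bool {
	if len(set) == 0 {
		return true
	}
	_, ok := set[topic]
	return ok
}

func main() {
	set := allowSet([]string{"pose", "imu"})
	fmt.Println(allowed(set, "pose"), allowed(set, "lidar")) // true false
	fmt.Println(allowed(allowSet(nil), "lidar"))             // true
}
```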

func (*Player) PlayScaled

func (pl *Player) PlayScaled(ctx context.Context, speed float64) error

PlayScaled replays with time scaling. speed > 1.0 compresses time (2.0 halves every inter-sample delay); speed < 1.0 stretches it (0.5 doubles every delay). Values ≤ 0 are treated as 1.0.
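The scaling arithmetic amounts to dividing each recorded gap by the speed factor. The helper scaleGap is a hypothetical name; the fallback for non-positive speeds follows the rule stated above.

```go
package main

import (
	"fmt"
	"time"
)

// scaleGap divides a recorded inter-sample gap by the speed factor.
// Non-positive speeds fall back to 1.0 (real-time).
func scaleGap(gap time.Duration, speed float64) time.Duration {
	if speed <= 0 {
		speed = 1.0
	}
	return time.Duration(float64(gap) / speed)
}

func main() {
	gap := 200 * time.Millisecond
	fmt.Println(scaleGap(gap, 2.0)) // 100ms
	fmt.Println(scaleGap(gap, 0.5)) // 400ms
	fmt.Println(scaleGap(gap, 0))   // 200ms
}
```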

type RecordedSample

type RecordedSample struct {
	Topic      string    `json:"topic"`
	Payload    []byte    `json:"payload"`
	Timestamp  time.Time `json:"timestamp"`
	RecordedAt time.Time `json:"recorded_at"`
}

RecordedSample is the JSON-line format written by Recorder. Payload is base64-encoded when marshalled to JSON (standard encoding/json behaviour for []byte fields).
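To see what one recorded line looks like, the sketch below marshals a local copy of the struct and reads it back line by line. encodeLine and decodeAll are hypothetical helpers, not part of this package; only the struct layout and JSON tags are taken from the documentation above.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"strings"
	"time"
)

// RecordedSample mirrors the documented struct so the example is self-contained.
type RecordedSample struct {
	Topic      string    `json:"topic"`
	Payload    []byte    `json:"payload"`
	Timestamp  time.Time `json:"timestamp"`
	RecordedAt time.Time `json:"recorded_at"`
}

// encodeLine renders one sample as a single JSON line.
func encodeLine(s RecordedSample) (string, error) {
	b, err := json.Marshal(s)
	if err != nil {
		return "", err
	}
	return string(b) + "\n", nil
}

// decodeAll parses newline-delimited JSON back into samples.
// bufio.Scanner caps a line at 64 KiB by default; large payloads would
// need sc.Buffer to raise that limit.
func decodeAll(jsonl string) ([]RecordedSample, error) {
	var out []RecordedSample
	sc := bufio.NewScanner(strings.NewReader(jsonl))
	for sc.Scan() {
		if len(sc.Bytes()) == 0 {
			continue // tolerate blank lines
		}
		var s RecordedSample
		if err := json.Unmarshal(sc.Bytes(), &s); err != nil {
			return nil, err
		}
		out = append(out, s)
	}
	return out, sc.Err()
}

func main() {
	now := time.Date(2026, 6, 17, 12, 0, 0, 0, time.UTC)
	line, err := encodeLine(RecordedSample{Topic: "pose", Payload: []byte("hi"), Timestamp: now, RecordedAt: now})
	if err != nil {
		panic(err)
	}
	fmt.Print(line) // the payload "hi" appears as the base64 string "aGk="
	samples, err := decodeAll(line)
	if err != nil {
		panic(err)
	}
	fmt.Println(samples[0].Topic, string(samples[0].Payload))
}
```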

type Recorder

type Recorder struct {
	// contains filtered or unexported fields
}

Recorder subscribes to one or more topics and writes all received samples to an io.Writer as JSONL. Each call to AddTopic registers a subscriber whose channel will be drained into the recording.

func NewRecorder

func NewRecorder(w io.Writer) *Recorder

NewRecorder returns a Recorder that writes JSONL to w.

func (*Recorder) AddTopic

func (r *Recorder) AddTopic(sub dds.Subscriber) *Recorder

AddTopic registers sub for recording. Must be called before Start. Returns r for chaining.

func (*Recorder) Start

func (r *Recorder) Start() *Recorder

Start launches one drain goroutine per registered subscriber. Returns r for chaining.

func (*Recorder) Stop

func (r *Recorder) Stop()

Stop signals all drain goroutines to exit and waits for them to finish. Safe to call more than once.
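The Start/Stop lifecycle described above (one drain goroutine per source, a stop signal, a wait, and safe repeated Stop calls) can be sketched with a done channel, a sync.WaitGroup, and sync.Once. The drainer type is hypothetical, and plain string channels stand in for dds.Subscriber, whose real API is not shown in this documentation.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"sync"
)

// drainer is a hypothetical stand-in for Recorder.
type drainer struct {
	mu    sync.Mutex // serialises writes from concurrent goroutines
	w     io.Writer
	chans []<-chan string
	done  chan struct{}
	wg    sync.WaitGroup
	once  sync.Once
}

func newDrainer(w io.Writer) *drainer {
	return &drainer{w: w, done: make(chan struct{})}
}

func (d *drainer) add(ch <-chan string) *drainer {
	d.chans = append(d.chans, ch)
	return d
}

// start launches one goroutine per registered channel.
func (d *drainer) start() *drainer {
	for _, ch := range d.chans {
		d.wg.Add(1)
		go func(ch <-chan string) {
			defer d.wg.Done()
			for {
				select {
				case <-d.done:
					return
				case line, ok := <-ch:
					if !ok {
						return
					}
					d.mu.Lock()
					fmt.Fprintln(d.w, line)
					d.mu.Unlock()
				}
			}
		}(ch)
	}
	return d
}

// stop closes done exactly once, then waits; extra calls are harmless.
func (d *drainer) stop() {
	d.once.Do(func() { close(d.done) })
	d.wg.Wait()
}

// demo records two lines from one channel and stops twice.
func demo() string {
	var buf bytes.Buffer
	ch := make(chan string) // unbuffered: each send returns only after receipt
	d := newDrainer(&buf).add(ch).start()
	ch <- "a"
	ch <- "b"
	d.stop()
	d.stop()
	return buf.String()
}

func main() {
	fmt.Print(demo())
}
```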
