dds

package module
v0.47.0
Published: Jun 17, 2026 License: MPL-2.0 Imports: 12 Imported by: 0

README

go-DDS

A generic Go library for DDS (Data Distribution Service) publish/subscribe. Works in any domain — IoT, robotics, industrial control, vehicle networks, simulation, and more.

The API is a stable Go interface. Implementations are swappable without changing application code.

Packages

| Package | Description | Requires |
| --- | --- | --- |
| mock | In-process broker. Zero dependencies. Default for development and testing. | Nothing |
| rtps | Pure-Go RTPS/UDP wire protocol. Real DDS across processes and hosts. | Nothing |
| cyclone | CycloneDDS via CGo. Full wire interop with non-Go participants. | libcyclonedds-dev + -tags cyclone |
| shmem | Shared-memory transport. Zero UDP overhead for same-host pub/sub. | Nothing |
| security | Pluggable payload security: NullPlugin, HMAC-SHA-256, AES-256-GCM, CertPlugin (X.509/ECDSA), AccessPolicy (topic ACL), ReplayGuard (anti-replay); HMACDiscoveryPlugin for SPDP-layer peer authentication. | Nothing |
| xtypes | Dynamic Data / XTypes: TypeDescriptor, TypeIdentifier, DynamicData, TypeRegistry, CheckCompatibility. | Nothing |
| config | JSON/YAML participant configuration with validation. | Nothing |
| monitor | Real-time web dashboard. /health, /api/topics, /api/diagnostics, SSE discovery events. | Nothing |
| tsn | TSN stream model, TAPRIO scheduling, stream health tracking. | Nothing |
| bridge/domain | Domain bridge: forward samples between two Participant domains in-process. | Nothing |
| bridge/mqtt | Bidirectional DDS ↔ MQTT bridge with QoS and topic mapping. | Nothing |
| bridge/wan | WAN bridge: forward DDS samples between domains over TCP (length-framed JSON, 16 MiB cap); optional TLS + shared-token auth. | Nothing |
| admin | HTTP admin API: /admin/health, /admin/topics, /admin/discovery, /admin/publish; bearer-token auth. | Nothing |
| services | Managed service lifecycle: RecorderService, ReplayService (loop + seek), MonitorService. | Nothing |
| record | Topic recording to JSONL, deterministic replay (real-time or scaled), fault injection. | Nothing |
| pool | Allocation-free byte-slice recycling and fixed-capacity sample ring buffer. | Nothing |
| safety | E2E protection header (CRC-16, sequence counter, freshness) and deterministic queue. | Nothing |
| testutil | Test harness helpers: NewParticipant, AssertSample, TopicRecorder, BurstPublish. | Nothing |
| cmd/ddstool | CLI tool: pub, sub, discover subcommands. | Nothing |

Install

go get github.com/SoundMatt/go-DDS

Quick start

import (
    "fmt"

    dds "github.com/SoundMatt/go-DDS"
    "github.com/SoundMatt/go-DDS/mock"
)

p, _ := mock.New(dds.Domain(0))
defer p.Close()

sub, _ := p.NewSubscriber("sensors/temperature", dds.DefaultQoS)
pub, _ := p.NewPublisher("sensors/temperature", dds.DefaultQoS)

pub.Write([]byte(`{"value": 21.5, "unit": "celsius"}`))

sample := <-sub.C()
fmt.Println(string(sample.Payload)) // {"value": 21.5, "unit": "celsius"}

Switching implementations

Application code only ever references the dds interface package. Swap at the call site:

// Development / tests — no system library needed:
import "github.com/SoundMatt/go-DDS/mock"
p, err := mock.New(dds.Domain(0))

// Production — pure-Go UDP transport, no native deps:
import "github.com/SoundMatt/go-DDS/rtps"
p, err := rtps.New(dds.Domain(0))

// Same host, zero UDP overhead:
import "github.com/SoundMatt/go-DDS/shmem"
p, err := shmem.New(dds.Domain(0))

// Interop with non-Go DDS participants:
// go build -tags cyclone ./...
import "github.com/SoundMatt/go-DDS/cyclone"
p, err := cyclone.New(dds.Domain(0))

QoS

// Live data — best-effort, volatile (default)
pub, _ := p.NewPublisher("robot/joint/angles", dds.DefaultQoS)

// Commands — reliable delivery, late joiners see current state
cmd, _ := p.NewPublisher("robot/joint/target", dds.ReliableQoS)

WaitSet

dds.WaitSet multiplexes over multiple subscribers — no polling loop required:

subTemp, _ := p.NewSubscriber("sensors/temp", dds.DefaultQoS)
subSpeed, _ := p.NewSubscriber("vehicle/speed", dds.DefaultQoS)

ws := dds.NewWaitSet(subTemp, subSpeed)
for {
    sample, sub, err := ws.Wait(ctx)
    if err != nil { break }
    switch sub {
    case subTemp:  fmt.Println("temp:", string(sample.Payload))
    case subSpeed: fmt.Println("speed:", string(sample.Payload))
    }
}

Security

Pluggable payload-level security via the security package:

import (
    "github.com/SoundMatt/go-DDS/rtps"
    "github.com/SoundMatt/go-DDS/security"
)

key := security.NewRandomKey(32)

// AES-256-GCM: full encryption + authentication
aesPlugin, _ := security.NewAESGCMPlugin(key)
p, _ := rtps.New(dds.Domain(0), rtps.WithSecurity(aesPlugin))

// HMAC-SHA-256: integrity + authentication, no encryption
hmacPlugin := security.NewHMACPlugin(key)
p, _ = rtps.New(dds.Domain(0), rtps.WithSecurity(hmacPlugin))

All peers on a topic must use the same plugin and key.
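The shared-key requirement follows from how a keyed MAC works. The sketch below is not the library's HMACPlugin; it is a minimal stand-alone illustration using only the standard library, and it assumes a hypothetical layout in which the 32-byte tag is appended to the payload. The names seal, open, and errAuth are illustrative.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"errors"
	"fmt"
)

// errAuth is a hypothetical sentinel for any verification failure.
var errAuth = errors.New("authentication failed")

// seal returns payload || HMAC-SHA-256(key, payload).
// The trailing-tag layout is an assumption made for this sketch.
func seal(key, payload []byte) []byte {
	mac := hmac.New(sha256.New, key)
	mac.Write(payload)
	// Sum appends the 32-byte tag to a copy of the payload.
	return mac.Sum(append([]byte(nil), payload...))
}

// open verifies the trailing tag and returns the payload without it.
func open(key, sealed []byte) ([]byte, error) {
	if len(sealed) < sha256.Size {
		return nil, errAuth
	}
	split := len(sealed) - sha256.Size
	payload, tag := sealed[:split], sealed[split:]
	mac := hmac.New(sha256.New, key)
	mac.Write(payload)
	if !hmac.Equal(tag, mac.Sum(nil)) { // constant-time comparison
		return nil, errAuth
	}
	return payload, nil
}

func main() {
	keyA := []byte("0123456789abcdef0123456789abcdef")
	keyB := []byte("a different shared key for peerB")
	sealed := seal(keyA, []byte(`{"kmh":80}`))

	p, err := open(keyA, sealed) // same key: payload is recovered
	fmt.Println(string(p), err)

	_, err = open(keyB, sealed) // different key: verification fails
	fmt.Println(err)
}
```

A peer holding a different key cannot verify the tag, so it would have to reject every sample, which is why the plugin and key must match across a topic.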

Enterprise security (v0.9): X.509/ECDSA certificate authentication, topic ACL, and anti-replay protection:

// CertPlugin: mutual authentication + payload encryption via X.509/ECDSA.
// Implements the rtps.SecurityPlugin (Seal/Open) interface, so it wires into
// the data path with WithSecurity. Argument order: cert, key, CA.
certPlugin, _ := security.NewCertPlugin(myCertPEM, myKeyPEM, caPEM)
p, _ := rtps.New(dds.Domain(0), rtps.WithSecurity(certPlugin))

// Secure Discovery: HMAC-SHA-256 authentication of SPDP announcements.
// Peers without the same key are silently ignored at the discovery layer.
discPlugin := security.NewHMACDiscoveryPlugin([]byte("shared-discovery-key"))
p, _ = rtps.New(dds.Domain(0), rtps.WithDiscoverySecurity(discPlugin))

AccessPolicy (topic ACL) and ReplayGuard (anti-replay) are enforced in the data path when configured, and compose with the encryption plugin above. Both are opt-in: with neither set, all topics are permitted and no samples are dropped.

// AccessPolicy: per-topic read/write ACL with path.Match patterns.
// NewPublisher fails with dds.ErrAccessDenied on a topic that fails CanWrite;
// NewSubscriber fails on a topic that fails CanRead.
policy := security.NewAccessPolicy(
    security.Rule{Pattern: "vehicle/speed", Allow: security.PermWrite},
    security.Rule{Pattern: "vehicle/*", Allow: security.PermRead},
)

// ReplayGuard: inbound samples whose sequence number repeats within the window
// are dropped before delivery.
guard := security.NewReplayGuard(5 * time.Second)

p, _ := rtps.New(dds.Domain(0),
    rtps.WithSecurity(certPlugin),     // encryption + authentication
    rtps.WithAccessControl(policy),    // topic ACL, enforced at endpoint creation
    rtps.WithAntiReplay(guard),        // anti-replay, enforced on the receive path
)
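
The AccessPolicy comments above say patterns use path.Match. The stand-alone sketch below shows what that implies for topic names: `*` matches within one path segment and does not cross `/`. The rule, perm, and allowed names are hypothetical, and the additive rule evaluation is an assumption; the real AccessPolicy may resolve rules differently.

```go
package main

import (
	"fmt"
	"path"
)

// perm is a hypothetical permission bit set.
type perm int

const (
	permRead perm = 1 << iota
	permWrite
)

// rule pairs a path.Match pattern with the permissions it grants.
type rule struct {
	pattern string
	allow   perm
}

// exampleRules mirrors the two rules from the snippet above.
var exampleRules = []rule{
	{pattern: "vehicle/speed", allow: permWrite},
	{pattern: "vehicle/*", allow: permRead},
}

// allowed reports whether any rule matching topic grants want.
// Assumption: rules are additive and there are no deny rules.
func allowed(rules []rule, topic string, want perm) bool {
	for _, r := range rules {
		ok, err := path.Match(r.pattern, topic)
		if err != nil {
			continue // malformed pattern: skip it
		}
		if ok && r.allow&want != 0 {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(allowed(exampleRules, "vehicle/speed", permWrite))     // exact match grants write
	fmt.Println(allowed(exampleRules, "vehicle/rpm", permWrite))       // only read is granted here
	fmt.Println(allowed(exampleRules, "vehicle/rpm", permRead))        // "vehicle/*" matches one segment
	fmt.Println(allowed(exampleRules, "vehicle/engine/rpm", permRead)) // "*" does not cross "/"
}
```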

Context API

The core interfaces also provide a context-aware write, a non-destructive unsubscribe, and a domain accessor:

// Publisher.WriteCtx — returns ctx.Err() immediately if context is done.
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
err := pub.WriteCtx(ctx, payload)

// Subscriber.Unsubscribe — stops delivery without closing the channel.
// Use when you want to stop receiving but keep the channel readable for
// any already-buffered samples.
sub.Unsubscribe()   // no more samples delivered
<-sub.C()           // receive one already-buffered sample (channel still open)
sub.Close()         // close channel when done

// Participant.Domain — inspect which domain a participant joined.
fmt.Println(p.Domain()) // dds.Domain(0)

Configuration

Load participant settings from a JSON or YAML file:

import "github.com/SoundMatt/go-DDS/config"

cfg, err := config.LoadConfig("dds.json")
// or:  cfg, err := config.ParseConfig([]byte(`{...}`))
if err != nil { log.Fatal(err) }

p, err := rtps.New(dds.Domain(0), rtps.WithConfig(cfg))

Runtime monitoring

The monitor package serves a live web dashboard with no external dependencies:

import "github.com/SoundMatt/go-DDS/monitor"

mon := monitor.New(p) // p implements MetricsProvider, HealthProvider, etc.
log.Fatal(http.ListenAndServe(":8080", mon))

Endpoints:

| Path | Description |
| --- | --- |
| GET /health | JSON health status (ok / degraded / down) |
| GET /api/topics | Per-topic metrics (write/deliver/drop counts, bytes) |
| GET /api/diagnostics | Discovery metrics and transport statistics |
| GET /api/events | SSE stream of discovery events (peer joined/left) |

Recording and replay

Record live traffic to a JSONL file and replay it deterministically:

import "github.com/SoundMatt/go-DDS/record"

// Record
sub, _ := p.NewSubscriber("vehicle/speed", dds.DefaultQoS)
f, _ := os.Create("capture.jsonl")
rec := record.NewRecorder(f).AddTopic(sub).Start()
time.Sleep(10 * time.Second)
rec.Stop()

// Replay (real-time)
f2, _ := os.Open("capture.jsonl")
pl := record.NewPlayer(f2, p)
pl.Play(ctx)

// Replay at 4× speed
pl.PlayScaled(ctx, 4.0)

// Replay only one topic
pl.PlayFiltered(ctx, []string{"vehicle/speed"})

Fault injection

Stress-test consumers by wrapping any publisher with configurable faults:

import "github.com/SoundMatt/go-DDS/record"

pub, _ := p.NewPublisher("vehicle/speed", dds.DefaultQoS)
faulty := record.NewFaultPublisher(pub, record.FaultOptions{
    LossRate:      0.05,               // 5% packet loss
    DelayMin:      2 * time.Millisecond,
    DelayMax:      10 * time.Millisecond,
    CorruptRate:   0.01,               // 1% payload corruption
    DuplicateRate: 0.02,               // 2% duplicate delivery
}, 0 /* seed: 0 = time-seeded */)
defer faulty.Close()

faulty.Write(payload) // faults applied transparently

Safety communication

Protect topics with an 18-byte E2E header (DataID, SourceID, sequence counter, timestamp, CRC-16):

import "github.com/SoundMatt/go-DDS/safety"

cfg := safety.E2EConfig{
    DataID:   42,
    SourceID: 1,
    MaxAge:   100 * time.Millisecond, // freshness check
}

// Publisher side — header added automatically
rawPub, _ := p.NewPublisher("safety/speed", dds.DefaultQoS)
pub := safety.NewE2EPublisher(rawPub, cfg)

// Subscriber side — header stripped, checks run
rawSub, _ := p.NewSubscriber("safety/speed", dds.DefaultQoS)
sub := safety.NewE2ESubscriber(rawSub, cfg)

go func() {
    for e := range sub.Errors() {
        log.Printf("E2E fault: %v", e) // ErrCRCMismatch, ErrSequenceGap, ErrStaleSample
    }
}()

for s := range sub.C() {
    // s.Payload is the original payload, header already removed
}
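
To make the header checks concrete, here is a stand-alone sketch of sealing and opening a protected frame. The field widths (2 + 2 + 4 + 8 + 2 = 18 bytes), the big-endian byte order, the CRC coverage, and the CRC-16/CCITT-FALSE variant are all assumptions chosen to fit the 18-byte total; the safety package's actual layout may differ. Sequence-gap detection needs per-source state and is left out.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

const headerLen = 18 // assumed widths: DataID 2, SourceID 2, seq 4, timestamp 8, CRC 2

// Hypothetical sentinels for this sketch only.
var (
	errShort = errors.New("frame shorter than header")
	errCRC   = errors.New("CRC mismatch")
	errStale = errors.New("stale sample")
)

// crc16 computes CRC-16/CCITT-FALSE (poly 0x1021, init 0xFFFF) over all parts.
func crc16(parts ...[]byte) uint16 {
	crc := uint16(0xFFFF)
	for _, p := range parts {
		for _, b := range p {
			crc ^= uint16(b) << 8
			for i := 0; i < 8; i++ {
				if crc&0x8000 != 0 {
					crc = (crc << 1) ^ 0x1021
				} else {
					crc <<= 1
				}
			}
		}
	}
	return crc
}

// seal prepends the header. The CRC covers header bytes 0..15 and the payload.
func seal(dataID, sourceID uint16, seq uint32, tsNanos int64, payload []byte) []byte {
	out := make([]byte, headerLen+len(payload))
	binary.BigEndian.PutUint16(out[0:2], dataID)
	binary.BigEndian.PutUint16(out[2:4], sourceID)
	binary.BigEndian.PutUint32(out[4:8], seq)
	binary.BigEndian.PutUint64(out[8:16], uint64(tsNanos))
	copy(out[headerLen:], payload)
	binary.BigEndian.PutUint16(out[16:18], crc16(out[:16], payload))
	return out
}

// open checks CRC and freshness, then returns the payload without the header.
func open(frame []byte, nowNanos, maxAgeNanos int64) ([]byte, error) {
	if len(frame) < headerLen {
		return nil, errShort
	}
	want := binary.BigEndian.Uint16(frame[16:18])
	if crc16(frame[:16], frame[headerLen:]) != want {
		return nil, errCRC
	}
	ts := int64(binary.BigEndian.Uint64(frame[8:16]))
	if nowNanos-ts > maxAgeNanos {
		return nil, errStale
	}
	return frame[headerLen:], nil
}

func main() {
	fmt.Printf("%04X\n", crc16([]byte("123456789"))) // 29B1, the standard check value
	frame := seal(42, 1, 7, 1000, []byte("80"))
	p, err := open(frame, 1500, 1000)
	fmt.Println(len(frame), string(p), err)
}
```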

Decouple writes from transport using a bounded, panic-containing queue:

q := safety.NewDeterministicQueue(pub, 128 /* depth */).Start()
defer q.Stop()

if err := q.Enqueue(payload); errors.Is(err, safety.ErrQueueFull) {
    // back-pressure: queue is full
}

Edge performance

Recycle payload buffers on high-throughput paths:

import "github.com/SoundMatt/go-DDS/pool"

bp := pool.New(1500) // capacity in bytes
buf := bp.Get()
buf = append(buf, payload...)
pub.Write(buf)
bp.Put(buf) // returned to pool; no allocation next cycle

Fixed-capacity ring buffer for decoupling a subscriber from a processing loop:

sb := pool.NewSampleBuffer(256)

// producer goroutine
go func() {
    for s := range sub.C() {
        if !sb.Push(s) { /* drop or handle back-pressure */ }
    }
}()

// consumer goroutine at its own rate
s, ok := sb.Pop()
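
For readers unfamiliar with the pattern, a fixed-capacity ring with the same Push/Pop shape can be sketched as follows. This is not pool.SampleBuffer's implementation; the generic ring type and its mutex-based locking are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// ring is a hypothetical fixed-capacity FIFO. Push never allocates after
// construction and reports false instead of growing when the ring is full.
type ring[T any] struct {
	mu   sync.Mutex
	buf  []T
	head int // index of the oldest element
	n    int // number of stored elements
}

func newRing[T any](capacity int) *ring[T] {
	return &ring[T]{buf: make([]T, capacity)}
}

// Push stores v and returns true, or returns false when the ring is full.
func (r *ring[T]) Push(v T) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.n == len(r.buf) {
		return false
	}
	r.buf[(r.head+r.n)%len(r.buf)] = v
	r.n++
	return true
}

// Pop removes and returns the oldest element; ok is false when empty.
func (r *ring[T]) Pop() (v T, ok bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.n == 0 {
		return v, false
	}
	var zero T
	v = r.buf[r.head]
	r.buf[r.head] = zero // release the reference held by the slot
	r.head = (r.head + 1) % len(r.buf)
	r.n--
	return v, true
}

func main() {
	r := newRing[string](2)
	fmt.Println(r.Push("a"), r.Push("b"), r.Push("c")) // third push is refused
	v, ok := r.Pop()
	fmt.Println(v, ok)
}
```

Returning false from Push keeps the back-pressure decision with the producer, which matches the drop-or-handle comment in the snippet above.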

Testing

The testutil package provides ready-made fixtures for unit and integration tests:

import "github.com/SoundMatt/go-DDS/testutil"

p := testutil.NewParticipant(t, dds.Domain(0)) // auto-closed at test end

sub, _ := p.NewSubscriber("test/topic", dds.DefaultQoS)
pub, _ := p.NewPublisher("test/topic", dds.DefaultQoS)

pub.Write([]byte("ping"))
testutil.AssertSample(t, sub, []byte("ping"), time.Second)

rec := testutil.NewTopicRecorder(sub).Start()
testutil.BurstPublish(pub, 10, []byte("burst"))
rec.WaitFor(10, 2*time.Second)

The ddstool CLI inspects a live domain without writing application code:

go run github.com/SoundMatt/go-DDS/cmd/ddstool pub -topic vehicle/speed -payload '{"kmh":80}' -count 10
go run github.com/SoundMatt/go-DDS/cmd/ddstool sub -topic vehicle/speed -count 5
go run github.com/SoundMatt/go-DDS/cmd/ddstool discover -wait 5s

TSN (Time-Sensitive Networking)

Map DDS topics directly to IEEE 802.1Qbv TAPRIO streams:

import "github.com/SoundMatt/go-DDS/tsn"

cfg, _ := tsn.LoadConfig("streams.json")
p, _ := rtps.New(dds.Domain(0), rtps.WithConfig(cfg.ParticipantConfig))

// Monitor write timing health per stream
stream := cfg.Streams[0]
tracker := tsn.NewHealthTracker(stream, 100 /* window */)
tracker.Record(time.Now())
health := tracker.Health() // Healthy=true when <5% of writes are late

// Generate tc-qdisc command for a TAPRIO gate
taprioCfg, _ := tsn.TAPRIOFromStreams(cfg)
fmt.Println(taprioCfg.TCCommand("eth0", 0))
// tc qdisc replace dev eth0 parent root handle 100 taprio ...

Dynamic Data (XTypes)

Inspect and construct DDS samples at runtime without compile-time types:

import "github.com/SoundMatt/go-DDS/xtypes"

// Describe the type once
td := xtypes.TypeDescriptor{
    Kind: xtypes.KindStruct,
    Fields: []xtypes.FieldDescriptor{
        {Name: "kmh",    Kind: xtypes.KindFloat64},
        {Name: "source", Kind: xtypes.KindString},
    },
}

// Build and validate a sample against the descriptor
d := xtypes.NewDynamicData(&td)
_ = d.Set("kmh", 80.0)
_ = d.Set("source", "gps")

payload, _ := d.ToJSON()   // serialize
d2 := xtypes.NewDynamicData(&td)
_ = d2.FromJSON(payload)   // deserialize with schema validation

// Forward/backward compatibility check
compatible, err := xtypes.CheckCompatibility(&writerDesc, &readerDesc)

Domain Bridge

Forward DDS samples between two in-process Participant domains (e.g. a simulator domain and an integration-test domain):

import "github.com/SoundMatt/go-DDS/bridge/domain"

b, err := domain.New(srcParticipant, dstParticipant, domain.Options{
    Topics: []string{"vehicle/speed", "vehicle/status"},
})
b.Start()
defer b.Close()

WAN Bridge

Forward DDS samples between domains over a TCP connection (e.g. edge → cloud):

import "github.com/SoundMatt/go-DDS/bridge/wan"

// Cloud side — receive frames and publish to the cloud participant
srv, err := wan.Serve(cloudParticipant, ":9000", wan.Options{})
defer srv.Close()

// Edge side — subscribe to topics and stream to the cloud
cli, err := wan.Connect(edgeParticipant, "cloud.example.com:9000", wan.Options{
    Topics: []string{"vehicle/speed", "vehicle/status"},
})
defer cli.Close()

Wire format: 4-byte big-endian length prefix + JSON {"t":"<topic>","p":"<base64>"}. Hard cap: 16 MiB per frame. For bidirectional bridging, create one pair in each direction.
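
The framing described above can be sketched with the standard library alone. This is an illustration of the stated format, not the bridge's code: the function names are hypothetical, and applying the 16 MiB cap to the JSON body length (rather than to body plus prefix) is an assumption.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"encoding/json"
	"errors"
	"fmt"
	"io"
)

// maxFrame is the 16 MiB cap, applied here to the JSON body (assumption).
const maxFrame = 16 << 20

var errFrameTooLarge = errors.New("frame exceeds 16 MiB cap")

// frame mirrors {"t":"<topic>","p":"<base64>"}.
// encoding/json encodes a []byte field as standard base64.
type frame struct {
	Topic   string `json:"t"`
	Payload []byte `json:"p"`
}

// checkLen rejects a declared length above the cap before any allocation.
func checkLen(n uint32) error {
	if n > maxFrame {
		return errFrameTooLarge
	}
	return nil
}

// writeFrame writes a 4-byte big-endian length prefix, then the JSON body.
func writeFrame(w io.Writer, topic string, payload []byte) error {
	body, err := json.Marshal(frame{Topic: topic, Payload: payload})
	if err != nil {
		return err
	}
	if len(body) > maxFrame {
		return errFrameTooLarge
	}
	var hdr [4]byte
	binary.BigEndian.PutUint32(hdr[:], uint32(len(body)))
	if _, err := w.Write(hdr[:]); err != nil {
		return err
	}
	_, err = w.Write(body)
	return err
}

// readFrame reads exactly one frame from r.
func readFrame(r io.Reader) (string, []byte, error) {
	var hdr [4]byte
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return "", nil, err
	}
	n := binary.BigEndian.Uint32(hdr[:])
	if err := checkLen(n); err != nil {
		return "", nil, err
	}
	body := make([]byte, n)
	if _, err := io.ReadFull(r, body); err != nil {
		return "", nil, err
	}
	var f frame
	if err := json.Unmarshal(body, &f); err != nil {
		return "", nil, err
	}
	return f.Topic, f.Payload, nil
}

// roundTrip encodes one frame into memory and decodes it again.
func roundTrip(topic string, payload []byte) (string, []byte, error) {
	var buf bytes.Buffer
	if err := writeFrame(&buf, topic, payload); err != nil {
		return "", nil, err
	}
	return readFrame(&buf)
}

func main() {
	topic, payload, err := roundTrip("vehicle/speed", []byte(`{"kmh":80}`))
	fmt.Println(topic, string(payload), err)
}
```

Checking the declared length before allocating the body buffer is what makes the cap useful against a peer that sends a bogus prefix.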

Admin API

HTTP endpoints for runtime inspection and publishing without a DDS client:

import "github.com/SoundMatt/go-DDS/admin"

srv, err := admin.New(p, admin.Options{
    Addr:  ":8081",
    Token: "secret",   // Bearer token; empty = no auth
})
defer srv.Close()

| Endpoint | Method | Description |
| --- | --- | --- |
| /admin/health | GET | Participant health status |
| /admin/topics | GET | Per-topic metrics |
| /admin/discovery | GET | Discovery metrics |
| /admin/publish | POST | Publish a payload to a topic |

Services

Managed lifecycle wrappers for recorder, replay, and monitor:

import "github.com/SoundMatt/go-DDS/services"

// Record to file with managed start/stop
recSvc := services.NewRecorderService(p, services.RecorderOptions{
    Output: f,
    Topics: []string{"vehicle/speed"},
})
recSvc.Start()
time.Sleep(10 * time.Second)
recSvc.Stop()

// Replay from file (loop until stopped)
replSvc := services.NewReplayService(p, services.ReplayOptions{
    Input: f2,
    Loop:  true,
    Speed: 2.0,   // 2× real-time
})
replSvc.Start()
defer replSvc.Stop()

// Run the monitor as a managed service
monSvc := services.NewMonitorService(p, ":8080")
monSvc.Start()
defer monSvc.Stop()

RTPS interoperability testing

Wire-compatibility tests against a live CycloneDDS peer (gated behind the interop build tag):

docker compose -f interop/docker-compose.yml up -d cyclone-peer
go test -tags interop -v -timeout 60s ./interop/...
docker compose -f interop/docker-compose.yml down

Using CycloneDDS

# Linux
apt-get install -y libcyclonedds-dev

# macOS
brew install cyclonedds

# Build + test
go build -tags cyclone ./...
go test -tags cyclone ./cyclone/...

CI

| Job | Platforms | Notes |
| --- | --- | --- |
| test-mock | ubuntu, macOS, Windows × Go 1.25/1.26 | race detector, full coverage |
| test-rtps | ubuntu | -short |
| test-cyclone | ubuntu-22.04 | skips cleanly if libcyclonedds-dev absent |
| benchmark-smoke | ubuntu | 1 iteration each, catches panics/deadlocks |
| fuzz-short | ubuntu | 10 s per fuzz target |
| lint | ubuntu | golangci-lint v2 |
| dco | PR only | Signed-off-by check |

Roadmap

See ROADMAP.md for per-milestone goals, sub-items, and success criteria.

Released — v0.1 – v0.3

  • Go interface (Participant, Publisher, Subscriber, QoS, WaitSet)
  • In-process mock broker — 100% statement coverage
  • CycloneDDS CGo implementation (-tags cyclone)
  • Pure-Go RTPS/UDP — no CGo, all platforms
  • Reliable QoS retransmission (HEARTBEAT / ACKNACK)
  • DDS-Security plugin interface (NullPlugin, HMAC-SHA-256, AES-256-GCM)
  • TransientLocal durability — late-joiner last-value cache
  • IPv6 multicast transport
  • RTPS interop testing with CycloneDDS (Docker Compose)
  • Sentinel errors, unicast discovery, content filters, deadline QoS
  • Large-payload fragmentation (DATA_FRAG), topic wildcards, metrics
  • Persistent history, real-time web monitor

Released — v0.4

  • Configurable channel depth and back-pressure (DropNewest / DropOldest / Block)
  • Structured logging (WithLogger(*slog.Logger))
  • Participant liveliness detection (WithLivelinessCallback)
  • Graceful shutdown with reliable-ACK drain (CloseWithDrain)
  • Multicast data delivery
  • Shared-memory transport (shmem/)
  • INFO_TS submessage — source timestamps in Sample.Timestamp
  • MQTT bridge (bridge/mqtt/)
  • Typed generics (TypedPublisher[T], TypedSubscriber[T], JSONCodec[T])
  • OpenTelemetry-compatible tracing

Released — v0.5 — TSN

  • TSN-extended QoS fields (TransportPriority, LatencyBudget, Lifespan, PublishPeriod)
  • DDS-to-TSN stream model (tsn.Stream, tsn.StreamConfig, tsn.LoadConfig)
  • VLAN, PCP, and DSCP socket marking (Linux)
  • Scheduled transmit time (SO_TXTIME + CLOCK_TAI + ETF/TAPRIO, Linux)
  • Per-PCP traffic-class sockets, TSN-safe discovery, fragmentation bounds

Released — v0.6 — Production Runtime + Observability

  • JSON/YAML participant configuration (config/)
  • DiscoveryMetrics, TopicMetrics, Health interfaces
  • Per-topic atomic counters in rtps and mock
  • WithHeartbeatPeriod, WithConfig RTPS options
  • Monitor /health, /api/topics, /api/diagnostics, SSE discovery events

Released — v0.7 — Developer Experience + Deterministic Networking

  • Test harness helpers (testutil/) — NewParticipant, AssertSample, TopicRecorder, BurstPublish, PeriodicPublish
  • CLI tool (cmd/ddstool) — pub, sub, discover subcommands
  • TSN diagnostics (tsn.HealthTracker, tsn.TAPRIOConfig, TCCommand)
  • CI upgraded to Node.js 24 and golangci-lint v2

Released — v0.8 — Verification, Edge Performance, Safety

  • Topic recording to JSONL (record.Recorder) and deterministic replay (record.Player) — real-time, time-scaled, topic-filtered
  • Fault injection wrapper (record.FaultPublisher) — packet loss, delay, corruption, duplication, reordering
  • Allocation-free buffer recycling (pool.BytePool) and sample ring buffer (pool.SampleBuffer)
  • E2E protection header (safety.E2EPublisher / safety.E2ESubscriber) — CRC-16/CCITT, sequence counter, configurable freshness window
  • Deterministic queue with panic containment (safety.DeterministicQueue)

Released — v0.9 — Enterprise Security, Dynamic Data, Services, Context API

  • CertPlugin (X.509/ECDSA mutual auth), AccessPolicy (topic ACL), ReplayGuard (anti-replay) — security/
  • XTypes dynamic data — TypeDescriptor, TypeIdentifier (content hash), DynamicData, TypeRegistry, CheckCompatibility — xtypes/
  • Domain bridge — in-process participant-to-participant forwarding — bridge/domain/
  • WAN bridge — TCP forwarding with length-framed JSON wire format, 16 MiB cap — bridge/wan/
  • HTTP admin API — health, metrics, discovery, publish; bearer-token auth — admin/
  • Managed service lifecycle — RecorderService, ReplayService (loop + seek + speed), MonitorService — services/
  • Participant.Domain() accessor — dds.Participant interface
  • Publisher.WriteCtx(ctx, payload) — context-aware writes across all transports
  • Subscriber.Unsubscribe() — non-destructive deregistration (channel stays open)
  • mock.IsolatedBroker() — per-test broker isolation, eliminates cross-test echo loops
  • HMACDiscoveryPlugin + rtps.WithDiscoverySecurity() — HMAC-SHA-256 authenticated SPDP announcements

Released — v0.9.1 — Spec Completeness and Go Idioms

  • Sample.SequenceNumber and Sample.WriterGUID — per-writer monotonic counter and endpoint identity on all transports (mock, shmem, rtps)
  • Subscriber.TryRead() — non-blocking read on all transports (mock, shmem, rtps, cyclone)
  • Active subscriber Deadline QoS enforcement — WithDeadlineMissed(fn) fires callback when no sample arrives within QoS.Deadline period
  • Wildcard subscriptions in rtps and shmem transports (MQTT-style + and #)
  • rpc package — OMG DDS-RPC style request-reply via Requester[Req,Rep] and Replier[Req,Rep]
  • GobCodec[T] — stdlib binary codec complementing JSONCodec[T]
  • ErrQoSMismatch, ErrDeadlineMissed, ErrSampleRejected, ErrResourceLimits sentinel errors
  • TypedSample[T] gains SequenceNumber and WriterGUID fields

Released — v0.9.2 — ProtoCodec, Reorder Fault Injection, WithContext, Fuzz Coverage

  • ProtoCodec[T proto.Message] — Protocol Buffers codec complementing JSONCodec[T] and GobCodec[T]
  • FaultPublisher.ReorderWindow — shuffle a window of N samples on emit, simulating out-of-order delivery
  • FaultPublisher.WriteCtx(ctx, payload) — context-aware fault writes honouring deadline cancellation during delay
  • mock.WithContext(ctx) / rtps.WithContext(ctx) — tie participant lifetime to a context.Context
  • Fuzz targets for security (HMAC + AES-GCM round-trip and arbitrary-input safety)
  • Fuzz targets for rpc wire format (reply/request dispatch robustness, round-trip)
  • Fuzz targets for ProtoCodec (round-trip and arbitrary Unmarshal)

Released — v0.10 — Dynamic WaitSet, REST/SSE Bridge, Secure SEDP, TypeRegistry, Docker Quickstart

  • WaitSet.Attach(subs...) / WaitSet.Detach(subs...) — dynamically add/remove subscribers from an in-flight WaitSet (snapshot-safe, race-free)
  • bridge/rest — HTTP/SSE gateway: GET /topics (list), GET /topics/{t} (SSE stream, base64 payload), POST /topics/{t} (publish); Bearer token auth; keepalive pings
  • rtps.EndpointPlugin + HMACDiscoveryPlugin.SignEndpoint/VerifyEndpoint — HMAC-SHA-256 endpoint tags embedded in SEDP announcements (vendor PID 0x8002); unauthenticated endpoints are silently rejected
  • xtypes.TopicTypeRegistry + RegisterTopicCodec[T] + GlobalTopicRegistry — map topic names to Go type names for codec autodiscovery
  • Docker Quickstart — cmd/monitor, examples/quickstart/{pub,sub}, multi-stage docker/Dockerfile, docker/docker-compose.yml (network_mode: host for RTPS multicast)

Released — v0.11 — gRPC Bridge, Key Rotation, Bridge Networking, Interop, DevContainer, GHCR

  • bridge/grpc — gRPC gateway (JSON codec): Subscribe server-streaming, Publish unary, StreamPublish client-streaming; Bearer token auth interceptors; per-topic filter and transform hooks; YAML config (LoadConfig/ApplyConfig)
  • HMACDiscoveryPlugin.Rekey(newKey) — atomic key rotation; RWMutex-safe; old tags are immediately invalidated for both SPDP and SEDP
  • Docker bridge networking — DDS_PEERS + WithNoMulticast across all quickstart binaries; docker/docker-compose.yml updated to bridge network (works on macOS/Windows Docker Desktop); docker/docker-compose.host.yml overlay for Linux host networking
  • docker/compose.interop.yml — CycloneDDS peer containers on the same bridge network for cross-implementation wire-compat testing
  • .devcontainer/devcontainer.json — Go 1.25 dev container with golangci-lint, Docker-in-Docker, and VS Code Go extension; works in GitHub Codespaces
  • .github/workflows/docker-publish.yml — multi-arch (linux/amd64, linux/arm64) GHCR publish on push to main and version tags

Released — v0.12 — Examples, Safety Completeness, IDL/CDR Compiler, TAPRIO

  • examples/sensor-pipeline, examples/command-response, examples/secure-topic, examples/taprio-stream — self-contained runnable examples with go run .
  • E2ESubscriber schema validation — per-topic payload-length and schema-hash checks with SafetyEventKindSchemaViolation
  • safety.Metrics per-topic violation counters (CRC, sequence gap, stale, schema) via SafetyMetrics() snapshot
  • Monitor SSE safety event type — WatchSafety() broadcasts SafetyEvent to browser clients
  • idl/: .idl → Go struct + Codec[T] code generation; CDR/XCDR1 encode/decode
  • tsn.TAPRIOConfig.Apply() — TAPRIO qdisc configuration via netlink
  • go-FuSa: 280+ requirements, all [traced+tested]

Released — v0.13.x — IDL Factory Codegen, RateMonitor, TSN Dashboard

  • IDL NewXxxPublisher/NewXxxSubscriber typed factory wrappers
  • safety.RateMonitor — threshold-based violation-rate alerting (events/sec per topic per kind)
  • TSN health dashboard in monitor — /api/tsn, tsn_health SSE events, tsn.HealthTracker
  • IDL nested struct CDR encode/decode (v0.13.1)
  • IDL array (T name[N]), enum, qualified type names, ddstool idl CLI (v0.13.2)

Released — v0.14.x — IDL Completeness, go-FuSa Compliance, Safety Certification Package

  • IDL go/format output, @key annotation, typedef, end-to-end roundtrip harness, --package flag (v0.14.0)
  • IDL fuzz targets (FuzzIDLParse, FuzzIDLGenerate) + parseEnum infinite-loop fix (v0.14.1)
  • go-FuSa v0.19.0 → v0.21.0 compliance: LINT001/ANA007/CYBER017 fixes; cycle detection in IDL generator (v0.14.1–v0.14.3)
  • CI: pinned gofusa safety gate (0 errors required to merge) (v0.14.2)
  • Release workflow — auto-regenerates FMEA, safety case, SBOM, provenance on every tag (v0.14.3)
  • HARA.md — tabletop ISO 26262-3 HARA; H-01 late delivery → ASIL-B
  • GC_LATENCY.md — measured STW pause MAX 146µs, E2E latency MAX 305µs; formal GSN argument
  • cmd/latmon — continuous rolling-window latency monitor with JSON output
  • cert/ — complete certification package: PSAC, SDP, SVP, SCMP, SQAP, LLR, SCR, DCA, TQPs, PRP, DEVIATIONS, RELEASE_LOG
  • SAFETY_PLAN.md, CODING_STANDARD.md, STANDARDS_GAP.md — ISO 26262 / IEC 61508 / DO-178C gap analysis
  • 238 requirements, all traced + tested; gofusa check 0 errors

Example use cases

| Domain | Topic example | QoS |
| --- | --- | --- |
| Robotics | robot/arm/joint_states | BestEffort (100 Hz sensor) |
| Industrial | plc/conveyor/speed | Reliable (actuator command) |
| Vehicle networks | vehicle/speed | BestEffort |
| Simulation | sim/entity/pose | BestEffort |
| IoT | building/floor3/temp | Reliable |

Wire format

Each DDS sample payload is raw bytes. The application chooses the encoding — JSON, Protobuf, MessagePack, plain text, or anything else. go-DDS does not impose a schema.

The RTPS transport encodes payloads as CDR_LE byte arrays, compatible with the RTPS 2.3 wire format. The CycloneDDS implementation uses an opaque RawMessage DDS type.

Contributing

See CONTRIBUTING.md. All commits require a DCO sign-off (Signed-off-by:).

License

Mozilla Public License v2.0 — see LICENSE.
Copyright (c) 2026 Matt Jones.

Documentation

Overview

Package dds defines the Go interface for Data Distribution Service (DDS) publish/subscribe operations.

The interface is intentionally narrow: it covers the pub/sub primitives needed for vehicle-signal transport and nothing more.

Choose an implementation by importing one of the sub-packages and calling its New function:

import "github.com/SoundMatt/go-DDS/mock"    // in-process, no CGo
import "github.com/SoundMatt/go-DDS/cyclone" // CycloneDDS via CGo
import "github.com/SoundMatt/go-DDS/rtps"    // pure-Go RTPS/UDP

All packages expose a New(Domain) (Participant, error) constructor that satisfies this package's Participant interface.

Constants

const SpecVersion = relay.SpecVersion

SpecVersion is the RELAY spec version this package conforms to (§17 req 12). It tracks the linked RELAY module's constant so the two never drift; bumping the RELAY dependency advances this automatically.

Variables

var DefaultQoS = QoS{
	Reliability:  BestEffort,
	Durability:   Volatile,
	HistoryDepth: 1,
}

DefaultQoS is BestEffort + Volatile with implementation-default history.

var ErrAccessDenied = errors.New("dds: access denied by topic ACL")

ErrAccessDenied is returned when a topic ACL forbids creating a publisher or subscriber for the requested topic. It is only ever returned when an access controller has been configured on the participant.

var ErrClosed = fmt.Errorf("dds: entity is closed: %w", relay.ErrClosed)

ErrClosed is returned when an operation is attempted on a closed entity.

var ErrDeadlineMissed = fmt.Errorf("dds: deadline missed — no sample within QoS.Deadline period: %w", relay.ErrTimeout)

ErrDeadlineMissed is returned when a subscriber receives no sample within its QoS.Deadline period. Wraps relay.ErrTimeout per spec §5.3.

var ErrDomainOutOfRange = fmt.Errorf("dds: domain out of range [0,232]: %w", ErrNotConnected)

ErrDomainOutOfRange is returned when a Domain value is outside 0–232 inclusive (spec §15.2 dds.Domain). It wraps ErrNotConnected so callers that already test for a participant start-up failure continue to match.

var ErrLoanBuffer = errors.New("dds: loan buffer unavailable or invalid")

ErrLoanBuffer is returned when a loaned buffer cannot be obtained (pool exhausted or size exceeds pool capacity) or when Commit is called with a buffer that was not issued by the same LoaningPublisher.

var ErrNotConnected = fmt.Errorf("dds: not connected: %w", relay.ErrNotConnected)

ErrNotConnected is returned before a Participant has joined its domain or when socket allocation fails at startup.

var ErrPayloadTooLarge = fmt.Errorf("dds: payload exceeds QoS MaxSampleSize: %w", relay.ErrPayloadTooLarge)

ErrPayloadTooLarge is returned when Write is called with a payload that exceeds the MaxSampleSize set in the publisher's QoS.

var ErrQoSMismatch = errors.New("dds: QoS incompatibility between publisher and subscriber")

ErrQoSMismatch is returned when a publisher and subscriber have incompatible QoS policies.

var ErrResourceLimits = errors.New("dds: resource limit exceeded")

ErrResourceLimits is returned when a resource limit is exceeded.

var ErrSampleRejected = fmt.Errorf("dds: sample rejected — resource limits exceeded: %w", relay.ErrPayloadTooLarge)

ErrSampleRejected is returned when a sample is rejected because resource limits are exceeded. Wraps relay.ErrPayloadTooLarge per spec §5.3.

var ErrTimeout = fmt.Errorf("dds: timeout: %w", relay.ErrTimeout)

ErrTimeout is returned when an operation does not complete within the permitted time (e.g. WriteCtx with an expired context).

var ErrTopicEmpty = errors.New("dds: topic name must not be empty")

ErrTopicEmpty is returned when an empty topic string is passed.

var ReliableQoS = QoS{
	Reliability:  Reliable,
	Durability:   TransientLocal,
	HistoryDepth: 1,
}

ReliableQoS is Reliable + TransientLocal. Use for actuator commands and any topic where a late-joining subscriber must receive the current value.

Functions

func Adapt added in v0.41.0

func Adapt(p Participant) relay.Node

Adapt wraps p as a relay.Node (spec §10.3). Send publishes to the topic named by msg.ID, caching publishers by topic for efficiency. Subscribe returns a channel that closes when the node closes; DDS subscriptions are topic-specific — use Participant.NewSubscriber for per-topic receive paths.

func CloseWithDrain added in v0.4.0

func CloseWithDrain(ctx context.Context, p Participant) error

CloseWithDrain waits for all pending reliable ACKs then closes p. If p does not implement Drainer, it calls p.Close() directly.
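
The fallback described above is the usual Go optional-interface pattern: type-assert for the extra capability and degrade gracefully when it is absent. The sketch below is stand-alone and makes two assumptions: the Drainer method is taken to be Drain(ctx) error, and a drain failure still closes the participant, with both errors joined. Neither is confirmed by the documentation above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
)

// closer is the minimal capability every participant has in this sketch.
type closer interface{ Close() error }

// drainer is a hypothetical shape for the optional Drainer interface.
type drainer interface{ Drain(ctx context.Context) error }

// closeWithDrain drains first when p supports it, then always closes.
func closeWithDrain(ctx context.Context, p closer) error {
	var drainErr error
	if d, ok := p.(drainer); ok {
		drainErr = d.Drain(ctx)
	}
	return errors.Join(drainErr, p.Close())
}

// plain implements only Close and records the calls it receives.
type plain struct{ calls []string }

func (p *plain) Close() error  { p.calls = append(p.calls, "close"); return nil }
func (p *plain) Calls() string { return strings.Join(p.calls, ",") }

// draining adds Drain on top of plain.
type draining struct{ plain }

func (d *draining) Drain(ctx context.Context) error {
	d.calls = append(d.calls, "drain")
	return ctx.Err()
}

// run invokes closeWithDrain with a background context.
func run(p closer) error { return closeWithDrain(context.Background(), p) }

func main() {
	p, d := &plain{}, &draining{}
	fmt.Println(run(p), p.Calls()) // Close only
	fmt.Println(run(d), d.Calls()) // Drain, then Close
}
```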

func ValidateDomain added in v0.42.0

func ValidateDomain(d Domain) error

ValidateDomain returns ErrDomainOutOfRange if d is outside 0–232 inclusive, matching the RELAY dds.Domain canonical range (spec §15.2). It is the single source of truth for domain validation across all transports.

Types

type BackPressurePolicy added in v0.4.0

type BackPressurePolicy int

BackPressurePolicy controls what happens when a subscriber's channel is full.

const (
	// DropNewest silently discards the incoming sample when the channel is full.
	// This is the default policy and matches pre-v0.4 behaviour.
	DropNewest BackPressurePolicy = iota
	// DropOldest evicts the oldest queued sample to make room for the new one.
	DropOldest
	// Block waits until the channel has capacity (may block the writer goroutine).
	Block
)
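To make the three policies concrete, here is one way a delivery step could apply them to a buffered channel. It is a sketch under stated assumptions: deliver and drain are hypothetical helpers, there is a single writer, and the channel capacity is at least 1. The package's internal delivery code may differ.

```go
package main

import "fmt"

// BackPressurePolicy and its constants mirror the documented declarations.
type BackPressurePolicy int

const (
	DropNewest BackPressurePolicy = iota
	DropOldest
	Block
)

// deliver (hypothetical) enqueues s on ch according to policy and reports
// whether s ended up in the channel. Assumes one writer and cap(ch) >= 1.
func deliver(ch chan string, s string, policy BackPressurePolicy) bool {
	switch policy {
	case Block:
		ch <- s // waits for a reader to free a slot
		return true
	case DropOldest:
		for {
			select {
			case ch <- s:
				return true
			default:
			}
			select {
			case <-ch: // channel full: evict the oldest queued sample
			default: // a reader emptied it first; retry the send
			}
		}
	default: // DropNewest
		select {
		case ch <- s:
			return true
		default:
			return false // channel full: discard the incoming sample
		}
	}
}

// drain empties ch without blocking and returns what was queued.
func drain(ch chan string) []string {
	var out []string
	for {
		select {
		case s := <-ch:
			out = append(out, s)
		default:
			return out
		}
	}
}

func main() {
	ch := make(chan string, 2)
	for _, s := range []string{"a", "b", "c"} {
		deliver(ch, s, DropNewest)
	}
	fmt.Println(drain(ch)) // [a b]

	for _, s := range []string{"a", "b", "c"} {
		deliver(ch, s, DropOldest)
	}
	fmt.Println(drain(ch)) // [b c]
}
```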

type Codec added in v0.4.0

type Codec[T any] interface {
	Marshal(v T) ([]byte, error)
	Unmarshal(data []byte) (T, error)
}

Codec[T] marshals and unmarshals values of type T to/from []byte. Implement this interface to bind a schema (JSON, protobuf, msgpack, …) to a TypedPublisher or TypedSubscriber.
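As an illustration of implementing the interface, the sketch below defines a fixed-width codec for float64 values. Float64Codec and roundTrip are hypothetical names that are not part of the package, and the Codec interface is redeclared locally so the example compiles on its own.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"math"
)

// Codec mirrors the documented interface; redeclared so the sketch stands alone.
type Codec[T any] interface {
	Marshal(v T) ([]byte, error)
	Unmarshal(data []byte) (T, error)
}

// Float64Codec (hypothetical) encodes a float64 as 8 little-endian bytes.
type Float64Codec struct{}

func (Float64Codec) Marshal(v float64) ([]byte, error) {
	buf := make([]byte, 8)
	binary.LittleEndian.PutUint64(buf, math.Float64bits(v))
	return buf, nil
}

func (Float64Codec) Unmarshal(data []byte) (float64, error) {
	if len(data) != 8 {
		return 0, errors.New("float64 codec: want exactly 8 bytes")
	}
	return math.Float64frombits(binary.LittleEndian.Uint64(data)), nil
}

// Compile-time check that Float64Codec satisfies Codec[float64].
var _ Codec[float64] = Float64Codec{}

// roundTrip encodes v and decodes it again through the interface.
func roundTrip(v float64) (float64, error) {
	var c Codec[float64] = Float64Codec{}
	b, err := c.Marshal(v)
	if err != nil {
		return 0, err
	}
	return c.Unmarshal(b)
}

func main() {
	got, err := roundTrip(21.5)
	fmt.Println(got, err) // 21.5 <nil>
}
```

A codec like this would be passed as the codec argument of NewTypedPublisher or NewTypedSubscriber in place of JSONCodec.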

type DiscoveryMetrics added in v0.6.0

type DiscoveryMetrics struct {
	AnnouncesSent     uint64 // SPDP announcements sent
	AnnouncesReceived uint64 // SPDP announcements received from remote peers
	PeersKnown        uint64 // current number of known remote participants
	PeerEvictions     uint64 // cumulative peers evicted due to lease expiry
	EndpointMatches   uint64 // cumulative topic endpoint matches (local↔remote)
}

DiscoveryMetrics holds cumulative discovery statistics for a participant.

type DiscoveryMetricsProvider added in v0.6.0

type DiscoveryMetricsProvider interface {
	DiscoveryMetrics() DiscoveryMetrics
}

DiscoveryMetricsProvider is implemented by participants that expose discovery-layer statistics.

type Domain

type Domain int

Domain is a DDS domain identifier (0–232 inclusive per the DDS spec). Participants on the same domain and network segment discover each other automatically without a broker.

type Drainer added in v0.4.0

type Drainer interface {
	CloseWithDrain(ctx context.Context) error
}

Drainer is optionally implemented by Participants that support graceful shutdown: waiting for all in-flight reliable writes to be acknowledged before closing. If a Participant does not implement Drainer, CloseWithDrain falls back to a plain Close.

type DurabilityKind

type DurabilityKind int

DurabilityKind controls whether late-joining subscribers receive historical samples that were published before they joined.

const (
	// Volatile discards samples as soon as they are delivered.
	Volatile DurabilityKind = iota
	// TransientLocal retains the last N samples so that late joiners
	// receive current state on subscription.
	TransientLocal
)
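The difference between the two kinds can be pictured as a small writer-side history cache that is replayed to late joiners. The sketch below is illustrative only: historyCache is a hypothetical type, and treating a non-positive depth as 1 is an assumption taken from the "typically 1" note on QoS.HistoryDepth.

```go
package main

import "fmt"

// historyCache (hypothetical) keeps the last depth samples when
// transientLocal is true and nothing otherwise (Volatile behaviour).
type historyCache struct {
	transientLocal bool
	depth          int
	samples        []string
}

// add records a written sample, evicting the oldest beyond depth.
func (h *historyCache) add(s string) {
	if !h.transientLocal {
		return // Volatile: nothing is retained after delivery
	}
	depth := h.depth
	if depth <= 0 {
		depth = 1 // assumed default
	}
	h.samples = append(h.samples, s)
	if len(h.samples) > depth {
		h.samples = h.samples[len(h.samples)-depth:]
	}
}

// replay returns a copy of what a late-joining subscriber would receive.
func (h *historyCache) replay() []string {
	return append([]string(nil), h.samples...)
}

func main() {
	tl := &historyCache{transientLocal: true, depth: 2}
	vol := &historyCache{}
	for _, s := range []string{"a", "b", "c"} {
		tl.add(s)
		vol.add(s)
	}
	fmt.Println(tl.replay())       // [b c]
	fmt.Println(len(vol.replay())) // 0
}
```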

type GUID added in v0.4.0

type GUID [16]byte

GUID is a globally unique 16-byte DDS participant or endpoint identifier. The first 12 bytes are the GuidPrefix; the last 4 bytes are the EntityId.
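The 12 + 4 byte layout can be split with plain slicing. In this sketch, guidPrefix and guidEntityID are hypothetical helpers; the documentation above does not list accessor methods on GUID.

```go
package main

import "fmt"

// GUID mirrors the documented type.
type GUID [16]byte

// guidPrefix (hypothetical) returns the first 12 bytes, the GuidPrefix.
func guidPrefix(g GUID) [12]byte {
	var p [12]byte
	copy(p[:], g[:12])
	return p
}

// guidEntityID (hypothetical) returns the last 4 bytes, the EntityId.
func guidEntityID(g GUID) [4]byte {
	var e [4]byte
	copy(e[:], g[12:])
	return e
}

func main() {
	var g GUID
	for i := range g {
		g[i] = byte(i)
	}
	// The zero GUID means "not set" on Sample.WriterGUID.
	fmt.Printf("prefix=%x entity=%x zero=%t\n", guidPrefix(g), guidEntityID(g), g == GUID{})
}
```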

type GobCodec added in v0.9.1

type GobCodec[T any] struct{}

GobCodec[T] implements Codec[T] using encoding/gob. It is a zero-size struct; the zero value is ready to use. Suitable for concrete struct types exchanged within a single Go binary.

func (GobCodec[T]) Marshal added in v0.9.1

func (GobCodec[T]) Marshal(v T) ([]byte, error)

Marshal encodes v using gob encoding.

func (GobCodec[T]) Unmarshal added in v0.9.1

func (GobCodec[T]) Unmarshal(data []byte) (T, error)

Unmarshal decodes data from gob encoding into T.

type Health added in v0.6.0

type Health struct {
	// Status is the overall health classification.
	Status HealthStatus `json:"status"`
	// Details carries an optional human-readable or JSON-encoded string
	// describing per-subsystem state. Empty string means no details.
	Details string `json:"details,omitempty"`
}

Health is a point-in-time health snapshot for a participant.

type HealthProvider added in v0.6.0

type HealthProvider interface {
	Health() Health
}

HealthProvider is implemented by participants that expose health reporting.

type HealthStatus added in v0.6.0

type HealthStatus int

HealthStatus is the overall operational status of a participant.

const (
	// HealthOK means the participant is running normally.
	HealthOK HealthStatus = iota
	// HealthDegraded means the participant is running with reduced capability.
	HealthDegraded
	// HealthDown means the participant has been closed or has failed.
	HealthDown
)

func (HealthStatus) String added in v0.6.0

func (h HealthStatus) String() string

String returns a lowercase, JSON-friendly representation of the status.

type JSONCodec added in v0.4.0

type JSONCodec[T any] struct{}

JSONCodec[T] implements Codec[T] using encoding/json. It is a zero-size struct; the zero value is ready to use.

func (JSONCodec[T]) Marshal added in v0.4.0

func (JSONCodec[T]) Marshal(v T) ([]byte, error)

Marshal encodes v to JSON bytes.

func (JSONCodec[T]) Unmarshal added in v0.4.0

func (JSONCodec[T]) Unmarshal(data []byte) (T, error)

Unmarshal decodes data from JSON into T.

type LivelinessEvent added in v0.4.0

type LivelinessEvent int

LivelinessEvent reports whether a remote participant has been discovered or lost its lease.

const (
	// LivelinessGained fires when a new remote participant is discovered via SPDP.
	LivelinessGained LivelinessEvent = iota
	// LivelinessLost fires when a remote participant's lease has expired.
	LivelinessLost
)

type LoaningPublisher added in v0.16.0

type LoaningPublisher interface {
	Publisher
	// Loan returns a pre-allocated byte slice of the given size backed by the
	// publisher's internal pool. Returns ErrLoanBuffer if the pool is exhausted
	// or size exceeds the pool's configured capacity.
	Loan(size int) ([]byte, error)
	// Commit publishes the loaned buffer (by calling Write) and returns it to
	// the pool. buf must be a slice previously returned by Loan on this same
	// publisher. The buffer must not be used after Commit returns.
	Commit(buf []byte) error
}

LoaningPublisher extends Publisher with zero-copy loaned-sample support. Use Loan to obtain a pre-allocated buffer from the publisher's internal pool, write payload data into it, then call Commit to publish and return the buffer. Commit calls Write internally; the buffer is returned to the pool afterwards regardless of whether Write succeeds.

The loaned buffer must not be used after Commit returns.

type Metrics added in v0.3.0

type Metrics struct {
	WriteCount     uint64 `json:"write_count"`
	DeliverCount   uint64 `json:"deliver_count"`
	DropCount      uint64 `json:"drop_count"`
	BytesWritten   uint64 `json:"bytes_written"`
	BytesDelivered uint64 `json:"bytes_delivered"`
	ErrorCount     uint64 `json:"error_count"`
}

Metrics holds cumulative statistics for a participant.

type MetricsProvider added in v0.3.0

type MetricsProvider interface {
	Metrics() Metrics
}

MetricsProvider is implemented by participants that expose runtime statistics.

type Participant

type Participant interface {
	// NewPublisher creates a writer for the named topic using the given QoS.
	NewPublisher(topic string, qos QoS) (Publisher, error)

	// NewSubscriber creates a reader for the named topic using the given QoS.
	// Optional SubscriberOption values configure content filtering and other
	// per-subscriber policies.
	NewSubscriber(topic string, qos QoS, opts ...SubscriberOption) (Subscriber, error)

	// Domain returns the DDS domain this participant joined.
	Domain() Domain

	// Close releases all DDS resources held by this participant.
	Close() error
}

Participant is the DDS domain participant — the root factory for all DDS entities. Create one per process per domain. A Participant is safe for concurrent use from multiple goroutines.

type ProtoCodec added in v0.10.0

type ProtoCodec[T proto.Message] struct{}

ProtoCodec[T] implements Codec[T] using google.golang.org/protobuf. T must be a pointer to a protobuf-generated message struct (e.g. *mypkg.Msg). The zero value is ready to use.

func (ProtoCodec[T]) Marshal added in v0.10.0

func (ProtoCodec[T]) Marshal(v T) ([]byte, error)

Marshal encodes v to protobuf wire format.

func (ProtoCodec[T]) Unmarshal added in v0.10.0

func (ProtoCodec[T]) Unmarshal(data []byte) (T, error)

Unmarshal decodes data from protobuf wire format into a new T.

type Publisher

type Publisher interface {
	Write(payload []byte) error
	// WriteCtx is Write with context cancellation support. If ctx is already
	// done when WriteCtx is called it returns ctx.Err() immediately.
	WriteCtx(ctx context.Context, payload []byte) error
	Close() error
}

Publisher writes samples to a single DDS topic. A Publisher is safe for concurrent use from multiple goroutines.

type QoS

type QoS struct {
	Reliability  ReliabilityKind `json:"reliability"`
	Durability   DurabilityKind  `json:"durability"`
	HistoryDepth int             `json:"history_depth"` // 0 means implementation default (typically 1)
	Deadline     time.Duration   `json:"deadline"`      // 0 = disabled

	// TransportPriority sets the network-level priority (maps to VLAN PCP /
	// SO_PRIORITY on Linux). 0 = normal, 1–7 = elevated; 7 is highest.
	TransportPriority int `json:"transport_priority"`
	// LatencyBudget is the acceptable end-to-end delivery latency for this
	// endpoint. 0 = unspecified. Informational in v0.5; future releases may
	// enforce it via qdisc admission control.
	LatencyBudget time.Duration `json:"latency_budget"`
	// Lifespan is the sample time-to-live measured from the write timestamp.
	// Samples older than Lifespan are dropped before delivery. 0 = infinite.
	Lifespan time.Duration `json:"lifespan"`
	// PublishPeriod is the periodic publish rate for TSN streams. 0 = aperiodic.
	// The application is responsible for calling Write at this rate; the value
	// is used by TSN stream reservation and scheduling.
	PublishPeriod time.Duration `json:"publish_period"`
	// MaxSampleSize is the maximum Write payload size in bytes. Write returns
	// ErrPayloadTooLarge if the payload exceeds this limit. 0 = unlimited.
	MaxSampleSize int `json:"max_sample_size"`
}

QoS bundles the policies that govern a single publisher or subscriber endpoint.

type ReliabilityKind

type ReliabilityKind int

ReliabilityKind controls delivery guarantees for a topic endpoint.

const (
	// BestEffort delivers samples without retransmission. Suitable for
	// high-frequency sensor data where occasional loss is acceptable.
	BestEffort ReliabilityKind = iota
	// Reliable retransmits lost samples until acknowledged. Required for
	// command/control and actuator writes.
	Reliable
)

type Sample

type Sample struct {
	Topic          string    `json:"topic"`
	Payload        []byte    `json:"payload"`
	Timestamp      time.Time `json:"timestamp"`
	SequenceNumber uint64    `json:"seq"`         // monotonically increasing per writer; 0 = not set
	WriterGUID     GUID      `json:"writer_guid"` // identity of the publishing endpoint; zero = not set
}

Sample is a single data sample delivered to a Subscriber. Timestamp is the source time of the write; zero means no timestamp was set (INFO_TS was not present in the RTPS message, or the mock transport was used).

func FromMessage added in v0.41.0

func FromMessage(m relay.Message) (Sample, error)

FromMessage converts a relay.Message envelope back to a Sample (spec §15.7.2).

func (Sample) ToMessage added in v0.41.0

func (s Sample) ToMessage() relay.Message

ToMessage converts this Sample to a relay.Message envelope (spec §15.7.2).

type Span added in v0.4.0

type Span interface {
	// SetAttribute attaches a key/value attribute to the span.
	SetAttribute(key, value string)
	// End finalises the span and records it to the tracer backend.
	End()
}

Span is a single tracing span. Call End when the operation is complete.

type SpanAttribute added in v0.4.0

type SpanAttribute struct {
	Key   string
	Value string
}

SpanAttribute is a key/value pair attached to a tracing span.

type Subscriber

type Subscriber interface {
	C() <-chan Sample
	// TryRead attempts a non-blocking read. Returns (zero, false) when the
	// channel is empty or closed.
	TryRead() (Sample, bool)
	// Unsubscribe removes this subscriber from the topic without closing its
	// channel. After Unsubscribe the channel remains open but no new samples
	// are delivered. Call Close to stop delivery AND close the channel.
	// Idempotent; second call is a no-op (spec §6.4).
	Unsubscribe()
	Close() error
}

Subscriber reads samples from a single DDS topic as a Go channel. A Subscriber is safe for concurrent use from multiple goroutines.
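The TryRead contract, (zero, false) when the channel is empty or closed, maps directly onto a select with a default case. A minimal sketch over a plain channel, with Sample reduced to two fields and tryRead as a hypothetical free function:

```go
package main

import "fmt"

// Sample is a reduced stand-in for the documented struct.
type Sample struct {
	Topic   string
	Payload []byte
}

// tryRead (hypothetical) performs a non-blocking receive on ch.
func tryRead(ch <-chan Sample) (Sample, bool) {
	select {
	case s, ok := <-ch:
		if !ok {
			return Sample{}, false // channel closed
		}
		return s, true
	default:
		return Sample{}, false // nothing queued right now
	}
}

func main() {
	ch := make(chan Sample, 1)
	ch <- Sample{Topic: "sensors/temp", Payload: []byte("21.5")}

	s, ok := tryRead(ch)
	fmt.Println(string(s.Payload), ok) // 21.5 true

	_, ok = tryRead(ch)
	fmt.Println(ok) // false
}
```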

type SubscriberConfig added in v0.3.0

type SubscriberConfig struct {
	Filter                 func(Sample) bool
	ChannelDepth           int                // 0 = implementation default (64)
	BackPressure           BackPressurePolicy // default: DropNewest
	DeadlineMissedCallback func()             // called when subscriber deadline expires; nil = disabled
}

SubscriberConfig holds per-subscriber options applied at construction time. It is exported so that implementation packages (mock, rtps, cyclone) can read the resolved configuration without duplicating the option-merge logic.

func ApplySubscriberOpts added in v0.3.0

func ApplySubscriberOpts(opts []SubscriberOption) SubscriberConfig

ApplySubscriberOpts merges a slice of SubscriberOption into a SubscriberConfig.

func (SubscriberConfig) ChanDepth added in v0.4.0

func (c SubscriberConfig) ChanDepth(defaultDepth int) int

ChanDepth returns the resolved channel depth: c.ChannelDepth if it is greater than 0, otherwise the provided default.

type SubscriberOption added in v0.3.0

type SubscriberOption func(*SubscriberConfig)

SubscriberOption configures a subscriber at creation time.

func WithBackPressure added in v0.4.0

func WithBackPressure(policy BackPressurePolicy) SubscriberOption

WithBackPressure sets the back-pressure policy applied when the subscriber channel is full. The default policy is DropNewest.

func WithChannelDepth added in v0.4.0

func WithChannelDepth(n int) SubscriberOption

WithChannelDepth sets the capacity of the subscriber's internal channel. A depth of 0 uses the implementation default (typically 64).

func WithDeadlineMissed added in v0.9.1

func WithDeadlineMissed(fn func()) SubscriberOption

WithDeadlineMissed registers fn to be called when the subscriber has not received a sample within its QoS.Deadline period. fn must be non-nil. The option has no effect when the subscriber's QoS.Deadline is 0.

func WithFilter added in v0.3.0

func WithFilter(fn func(Sample) bool) SubscriberOption

WithFilter returns a SubscriberOption that applies fn as a content filter. Only samples for which fn returns true are delivered to the subscriber's channel; non-matching samples are discarded silently.
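The subscriber options follow the functional-options pattern: each option is a function that mutates a config struct, and the options are merged in order. The sketch below re-creates that flow with lower-case local names, all of them stand-ins rather than the package's code; the accepts helper is hypothetical.

```go
package main

import (
	"bytes"
	"fmt"
)

// Sample is a reduced stand-in for the documented struct.
type Sample struct {
	Topic   string
	Payload []byte
}

// subscriberConfig and subscriberOption mirror the documented shapes.
type subscriberConfig struct {
	Filter       func(Sample) bool
	ChannelDepth int
}

type subscriberOption func(*subscriberConfig)

func withFilter(fn func(Sample) bool) subscriberOption {
	return func(c *subscriberConfig) { c.Filter = fn }
}

func withChannelDepth(n int) subscriberOption {
	return func(c *subscriberConfig) { c.ChannelDepth = n }
}

// applyOpts merges options in order; later options override earlier ones.
func applyOpts(opts []subscriberOption) subscriberConfig {
	var cfg subscriberConfig
	for _, opt := range opts {
		if opt != nil {
			opt(&cfg)
		}
	}
	return cfg
}

// chanDepth resolves the depth, falling back to def when unset.
func (c subscriberConfig) chanDepth(def int) int {
	if c.ChannelDepth > 0 {
		return c.ChannelDepth
	}
	return def
}

// accepts (hypothetical) applies the content filter; no filter accepts all.
func (c subscriberConfig) accepts(s Sample) bool {
	return c.Filter == nil || c.Filter(s)
}

func main() {
	cfg := applyOpts([]subscriberOption{
		withFilter(func(s Sample) bool { return bytes.HasPrefix(s.Payload, []byte("temp=")) }),
	})
	fmt.Println(cfg.chanDepth(64))                                  // 64
	fmt.Println(cfg.accepts(Sample{Payload: []byte("temp=21.5")})) // true
	fmt.Println(cfg.accepts(Sample{Payload: []byte("rpm=900")}))   // false
}
```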

type TopicMetrics added in v0.6.0

type TopicMetrics struct {
	Topic          string
	WriteCount     uint64
	DeliverCount   uint64
	DropCount      uint64
	BytesWritten   uint64
	BytesDelivered uint64
}

TopicMetrics holds per-topic statistics for a single DDS topic.

type TopicMetricsProvider added in v0.6.0

type TopicMetricsProvider interface {
	TopicMetrics() []TopicMetrics
}

TopicMetricsProvider is implemented by participants that expose per-topic statistics. The returned slice contains one entry per observed topic.

type Tracer added in v0.4.0

type Tracer interface {
	Start(ctx context.Context, spanName string, attrs ...SpanAttribute) (context.Context, Span)
}

Tracer is satisfied by any OpenTelemetry-compatible tracer implementation. go-DDS does not import go.opentelemetry.io/otel; callers bridge their tracer to this interface using a thin adapter.

var NoopTracer Tracer = noopTracerImpl{}

NoopTracer is the zero-cost tracer used when no OTel backend is configured.

type TypedPublisher added in v0.4.0

type TypedPublisher[T any] struct {
	// contains filtered or unexported fields
}

TypedPublisher[T] wraps a Publisher to encode values with a Codec before writing.

func NewTypedPublisher added in v0.4.0

func NewTypedPublisher[T any](pub Publisher, codec Codec[T]) *TypedPublisher[T]

NewTypedPublisher wraps pub so that Write accepts T values, encoding them with codec before passing bytes to the underlying Publisher.

func (*TypedPublisher[T]) Close added in v0.4.0

func (tp *TypedPublisher[T]) Close() error

Close closes the underlying publisher.

func (*TypedPublisher[T]) Write added in v0.4.0

func (tp *TypedPublisher[T]) Write(v T) error

Write encodes v with the configured Codec and writes it to the underlying publisher.

func (*TypedPublisher[T]) WriteCtx added in v0.9.1

func (tp *TypedPublisher[T]) WriteCtx(ctx context.Context, v T) error

WriteCtx encodes v and writes it, honouring ctx cancellation.

type TypedSample added in v0.4.0

type TypedSample[T any] struct {
	Topic          string
	Value          T
	Timestamp      time.Time
	SequenceNumber uint64
	WriterGUID     GUID
}

TypedSample[T] is a decoded sample delivered by TypedSubscriber[T].

type TypedSubscriber added in v0.4.0

type TypedSubscriber[T any] struct {
	// contains filtered or unexported fields
}

TypedSubscriber[T] wraps a Subscriber to decode samples with a Codec. Samples that fail to decode are silently dropped.

func NewTypedSubscriber added in v0.4.0

func NewTypedSubscriber[T any](sub Subscriber, codec Codec[T]) *TypedSubscriber[T]

NewTypedSubscriber wraps sub so that its channel delivers TypedSample[T] values decoded with codec. A background goroutine pumps samples from sub.C().
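The pump described above can be sketched as a goroutine that decodes each raw sample and forwards only the ones that decode cleanly. This is an approximation under stated assumptions: pump and decodeAll are hypothetical, the decoder is a plain function rather than a Codec, and the goroutine stops when the input channel closes instead of on an explicit Close call.

```go
package main

import (
	"fmt"
	"strconv"
)

// Sample and TypedSample are reduced stand-ins for the documented structs.
type Sample struct {
	Topic   string
	Payload []byte
}

type TypedSample[T any] struct {
	Topic string
	Value T
}

// pump (hypothetical) decodes samples from in and forwards them on the
// returned channel. Samples that fail to decode are dropped silently.
func pump[T any](in <-chan Sample, decode func([]byte) (T, error)) <-chan TypedSample[T] {
	out := make(chan TypedSample[T])
	go func() {
		defer close(out)
		for s := range in {
			v, err := decode(s.Payload)
			if err != nil {
				continue
			}
			out <- TypedSample[T]{Topic: s.Topic, Value: v}
		}
	}()
	return out
}

// decodeAll feeds payloads through pump and collects the decoded values.
func decodeAll(payloads ...string) []float64 {
	in := make(chan Sample, len(payloads))
	for _, p := range payloads {
		in <- Sample{Topic: "sensors/temp", Payload: []byte(p)}
	}
	close(in)
	decode := func(b []byte) (float64, error) { return strconv.ParseFloat(string(b), 64) }
	var out []float64
	for ts := range pump(in, decode) {
		out = append(out, ts.Value)
	}
	return out
}

func main() {
	fmt.Println(decodeAll("21.5", "oops", "22")) // [21.5 22]
}
```

Because decode failures are invisible to the reader of the typed channel, an application that needs to count them may prefer to read the raw Subscriber and decode itself.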

func (*TypedSubscriber[T]) C added in v0.4.0

func (ts *TypedSubscriber[T]) C() <-chan TypedSample[T]

C returns the typed sample channel.

func (*TypedSubscriber[T]) Close added in v0.4.0

func (ts *TypedSubscriber[T]) Close() error

Close stops the pump goroutine and closes the underlying subscriber.

type WaitSet

type WaitSet struct {
	// contains filtered or unexported fields
}

WaitSet multiplexes over a dynamic set of Subscribers. Use NewWaitSet to construct one and Attach/Detach to modify the set at any time. Wait blocks until one of the attached subscribers delivers a sample.

func NewWaitSet

func NewWaitSet(subs ...Subscriber) *WaitSet

NewWaitSet creates a WaitSet that monitors the given subscribers.

Example

ExampleNewWaitSet demonstrates multiplexing over two subscribers without a polling loop. The WaitSet blocks until any attached subscriber delivers a sample, then returns it together with the subscriber it arrived on.

package main

import (
	"context"
	"fmt"
	"time"

	dds "github.com/SoundMatt/go-DDS"
	"github.com/SoundMatt/go-DDS/mock"
)

func main() {
	p, err := mock.New(dds.Domain(0))
	if err != nil {
		panic("New: " + err.Error())
	}
	defer p.Close()

	subTemp, err := p.NewSubscriber("sensors/temp", dds.DefaultQoS)
	if err != nil {
		panic("NewSubscriber: " + err.Error())
	}
	subSpeed, err := p.NewSubscriber("vehicle/speed", dds.DefaultQoS)
	if err != nil {
		panic("NewSubscriber: " + err.Error())
	}
	defer func() { _ = subTemp.Close() }()
	defer func() { _ = subSpeed.Close() }()

	pubTemp, err := p.NewPublisher("sensors/temp", dds.DefaultQoS)
	if err != nil {
		panic("NewPublisher: " + err.Error())
	}
	defer func() { _ = pubTemp.Close() }()

	_ = pubTemp.Write([]byte("21.5"))

	ws := dds.NewWaitSet(subTemp, subSpeed)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	sample, sub, err := ws.Wait(ctx)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	switch sub {
	case subTemp:
		fmt.Println("temp:", string(sample.Payload))
	case subSpeed:
		fmt.Println("speed:", string(sample.Payload))
	}
}
Output:
temp: 21.5

func (*WaitSet) Attach added in v0.10.0

func (ws *WaitSet) Attach(subs ...Subscriber) *WaitSet

Attach adds subs to the WaitSet. Changes take effect on the next Wait call; an in-progress Wait sees the snapshot taken at its start.

func (*WaitSet) Detach added in v0.10.0

func (ws *WaitSet) Detach(subs ...Subscriber) *WaitSet

Detach removes subs from the WaitSet. Changes take effect on the next Wait call; an in-progress Wait sees the snapshot taken at its start.

func (*WaitSet) Wait

func (ws *WaitSet) Wait(ctx context.Context) (Sample, Subscriber, error)

Wait blocks until a sample is available on any attached subscriber, or until ctx is cancelled. It snapshots the subscriber list at call time; Attach/Detach calls from other goroutines take effect on the next Wait invocation.

Directories

Path Synopsis
Package admin provides an HTTP administration API for a go-DDS participant (Milestone 10 — Enterprise Services: Administration).
Package auto provides automatic DDS transport selection.
bridge
domain
Package domain provides a Bridge that forwards DDS samples between two Participant domains (Milestone 10 — Enterprise Services: Routing).
grpc
Package grpcbridge provides a gRPC gateway that bridges a DDS participant to gRPC clients using JSON encoding.
mqtt
Package mqtt provides a bidirectional bridge between a DDS participant and an MQTT broker.
rest
Package rest provides an HTTP/SSE gateway that bridges a DDS participant to HTTP clients.
wan
Package wan provides a WAN Bridge that forwards DDS samples between two Participant domains over a TCP connection (Milestone 10 — Routing: WAN bridge).
Package cdr implements CDR/XCDR1 (Common Data Representation) encoding and decoding as specified by the OMG DDS-XTypes 1.3 standard.
cmd
ddstool command
Command ddstool is a command-line interface for DDS publish/subscribe operations, participant diagnostics, and IDL code generation.
go-dds command
Command go-dds is the RELAY-conformant CLI for the go-DDS library.
latmon command
monitor command
Command monitor starts a real-time web dashboard for a DDS domain.
Package config provides JSON-based configuration for DDS participants.
Package cyclone provides a CycloneDDS-backed implementation of the dds interfaces via CGo.
examples
auto-transport command
Command auto-transport demonstrates the auto transport selector.
command-response command
command-response demonstrates the OMG DDS-RPC request-reply pattern using rpc.Requester and rpc.Replier built on two DDS topics.
loaned-samples command
Command loaned-samples demonstrates zero-copy publishing with LoaningPublisher.
otel-tracing command
Command otel-tracing demonstrates the OpenTelemetry adapter for go-DDS.
quickstart/pub command
Command pub publishes sensor readings on DDS topic "sensors/temperature".
quickstart/sub command
Command sub subscribes to DDS topic "sensors/temperature" and logs samples.
secure-topic command
secure-topic demonstrates end-to-end payload protection using the security package.
sensor-pipeline command
sensor-pipeline demonstrates periodic publishing and aggregating subscription using TypedPublisher/TypedSubscriber with the JSON codec.
taprio-stream command
taprio-stream demonstrates how to configure a TSN stream for bounded-latency DDS publishing using tsn.StreamConfig and tsn.TAPRIOConfig.
idl
Package idl provides an OMG IDL parser and Go source code generator for go-DDS (Milestone M2 — Developer Experience).
Package interop contains RTPS wire-compatibility tests that require a live CycloneDDS peer.
Package mock provides an in-process, CGo-free implementation of the dds interfaces.
Package monitor provides a real-time web dashboard for a DDS participant.
Package otel bridges the dds.Tracer interface to an OpenTelemetry TracerProvider.
Package pool provides allocation-efficient data structures for high-throughput DDS workloads on embedded and edge hardware.
Package record provides topic recording, deterministic replay, and fault injection for go-DDS applications.
Package rpc implements OMG DDS-RPC style request-reply over a DDS participant.
Package rtps provides a pure-Go RTPS/UDP implementation of the dds interfaces.
Package safety provides end-to-end data protection and deterministic queuing primitives for safety-oriented DDS deployments.
Package security provides pluggable transport-security for go-DDS.
Package services provides long-running operational service wrappers for go-DDS (Milestone 10 — Enterprise Services: Service Framework).
Package shmem provides a cross-process shared-memory transport for same-host DDS communication.
Package testutil provides test-harness helpers for DDS participants.
scenario
Package scenario provides a declarative scenario DSL for DDS integration tests.
Package tsn provides Time-Sensitive Networking stream descriptors and configuration utilities following the OMG DDS-TSN specification.
Package xtypes implements Dynamic Data for go-DDS (Milestone 9).
