dds

package module
v0.3.0
Published: Jun 7, 2026 License: MPL-2.0 Imports: 4 Imported by: 0

README

go-DDS

A generic Go library for DDS (Data Distribution Service) publish/subscribe. Works in any domain — IoT, robotics, industrial control, vehicle networks, simulation, and more.

The API is a stable Go interface. Implementations are swappable without changing application code:

Package   Description                                                                   Requires
mock      In-process, pure Go. Zero dependencies. Default for development and testing.  Nothing
rtps      Pure-Go RTPS/UDP wire protocol. Real DDS, zero native dependencies.           Nothing
cyclone   CycloneDDS via CGo. Interoperates with non-Go DDS participants.               libcyclonedds-dev + -tags cyclone
security  Pluggable security: NullPlugin, HMAC-SHA-256, AES-256-GCM.                    Nothing

Install

go get github.com/SoundMatt/go-DDS

Quick start

import (
    "fmt"

    dds "github.com/SoundMatt/go-DDS"
    "github.com/SoundMatt/go-DDS/mock"
)

p, _ := mock.New(dds.Domain(0))
defer p.Close()

sub, _ := p.NewSubscriber("sensors/temperature", dds.DefaultQoS)
pub, _ := p.NewPublisher("sensors/temperature", dds.DefaultQoS)

pub.Write([]byte(`{"value": 21.5, "unit": "celsius"}`))

sample := <-sub.C()
fmt.Println(string(sample.Payload)) // {"value": 21.5, "unit": "celsius"}

Switching implementations

Application code only ever references the dds interface package. Swap implementations at the call site:

// Development / tests — no system library needed:
import "github.com/SoundMatt/go-DDS/mock"
p, err := mock.New(dds.Domain(0))

// Production — pure-Go UDP transport, no native deps:
import "github.com/SoundMatt/go-DDS/rtps"
p, err := rtps.New(dds.Domain(0))

// Interop — real CycloneDDS domain, multi-host:
// (rebuild with: go build -tags cyclone ./...)
import "github.com/SoundMatt/go-DDS/cyclone"
p, err := cyclone.New(dds.Domain(0))

QoS

// Live data — best-effort, volatile (default)
pub, _ := p.NewPublisher("robot/joint/angles", dds.DefaultQoS)

// Commands — reliable delivery, late joiners see current state
cmd, _ := p.NewPublisher("robot/joint/target", dds.ReliableQoS)

WaitSet

dds.WaitSet multiplexes over multiple subscribers — no polling loop required:

subTemp, _ := p.NewSubscriber("sensors/temp", dds.DefaultQoS)
subSpeed, _ := p.NewSubscriber("vehicle/speed", dds.DefaultQoS)

ws := dds.NewWaitSet(subTemp, subSpeed)
ctx := context.Background()

for {
    sample, sub, err := ws.Wait(ctx)
    if err != nil {
        break
    }
    switch sub {
    case subTemp:
        fmt.Println("temp:", string(sample.Payload))
    case subSpeed:
        fmt.Println("speed:", string(sample.Payload))
    }
}

Security

Pluggable payload-level security via the security package:

import (
    "github.com/SoundMatt/go-DDS/rtps"
    "github.com/SoundMatt/go-DDS/security"
)

key := security.NewRandomKey(32)

// AES-256-GCM: full encryption + authentication
aesPlugin, _ := security.NewAESGCMPlugin(key)
p, _ := rtps.New(dds.Domain(0), rtps.WithSecurity(aesPlugin))

// HMAC-SHA-256: integrity + authentication, no encryption
hmacPlugin := security.NewHMACPlugin(key)
p, _ = rtps.New(dds.Domain(0), rtps.WithSecurity(hmacPlugin))

All peers communicating on a topic must use the same plugin and key.
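
To illustrate why every peer needs the same key, here is a minimal standard-library sketch of HMAC-SHA-256 payload authentication. The framing (payload followed by a 32-byte tag) and the names seal, open, and errBadTag are assumptions made for this example; the security package's actual wire layout is not described here.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"errors"
	"fmt"
)

// errBadTag is a hypothetical error for this sketch; it is not part of go-DDS.
var errBadTag = errors.New("sketch: authentication tag mismatch")

// seal returns payload || HMAC-SHA-256(key, payload). The framing is assumed.
func seal(key, payload []byte) []byte {
	mac := hmac.New(sha256.New, key)
	mac.Write(payload)
	out := make([]byte, 0, len(payload)+sha256.Size)
	out = append(out, payload...)
	return mac.Sum(out) // Sum appends the tag to out
}

// open verifies the trailing tag and returns the payload without it.
func open(key, framed []byte) ([]byte, error) {
	if len(framed) < sha256.Size {
		return nil, errBadTag
	}
	split := len(framed) - sha256.Size
	payload, tag := framed[:split], framed[split:]
	mac := hmac.New(sha256.New, key)
	mac.Write(payload)
	if !hmac.Equal(tag, mac.Sum(nil)) { // constant-time comparison
		return nil, errBadTag
	}
	return payload, nil
}

func main() {
	keyA := []byte("0123456789abcdef0123456789abcdef")
	keyB := []byte("fedcba9876543210fedcba9876543210")

	framed := seal(keyA, []byte("21.5"))

	got, err := open(keyA, framed) // same key: accepted
	fmt.Println(string(got), err)

	_, err = open(keyB, framed) // different key: rejected
	fmt.Println(err)
}
```

A subscriber holding a different key rejects every sample, which is the failure mode to expect when peers are configured inconsistently.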

Example use cases

Domain            Topic example           QoS
Robotics          robot/arm/joint_states  BestEffort (100 Hz sensor)
Industrial        plc/conveyor/speed      Reliable (actuator command)
Vehicle networks  vehicle/speed           BestEffort
Simulation        sim/entity/pose         BestEffort
IoT               building/floor3/temp    Reliable

Wire format

Each DDS sample payload is raw bytes. The application chooses the encoding — JSON, Protobuf, MessagePack, plain text, or anything else. go-DDS does not impose a schema.
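
Because the payload is opaque, encoding and decoding live entirely in application code. The sketch below uses JSON from the standard library; the reading type and the two helper functions are hypothetical names chosen for this example, and any other codec would fit the same shape.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// reading is a hypothetical application type; go-DDS does not define it.
type reading struct {
	Value float64 `json:"value"`
	Unit  string  `json:"unit"`
}

// encodeReading produces the bytes you would pass to Publisher.Write.
func encodeReading(r reading) ([]byte, error) {
	return json.Marshal(r)
}

// decodeReading parses the bytes you would receive in Sample.Payload.
func decodeReading(payload []byte) (reading, error) {
	var r reading
	err := json.Unmarshal(payload, &r)
	return r, err
}

func main() {
	payload, err := encodeReading(reading{Value: 21.5, Unit: "celsius"})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(payload))

	r, err := decodeReading(payload)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%.1f %s\n", r.Value, r.Unit)
}
```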

The RTPS transport encodes payloads as CDR_LE byte arrays, compatible with the RTPS 2.3 wire format. The CycloneDDS implementation uses an opaque RawMessage DDS type.
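
As a rough picture of what a CDR_LE byte array looks like on the wire, the sketch below assumes the usual RTPS serialized-payload layout: a 4-byte encapsulation header (identifier 0x0001 for CDR_LE, then two option bytes), a little-endian uint32 length, and the raw bytes. Whether the rtps package emits exactly this layout, including any trailing padding, is an assumption; encodeOctets and decodeOctets are hypothetical helpers.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// cdrLEHeader is the encapsulation header assumed here: CDR_LE, no options.
var cdrLEHeader = [4]byte{0x00, 0x01, 0x00, 0x00}

// encodeOctets wraps payload as header + uint32 length (LE) + bytes.
func encodeOctets(payload []byte) []byte {
	out := make([]byte, 8+len(payload))
	copy(out, cdrLEHeader[:])
	binary.LittleEndian.PutUint32(out[4:8], uint32(len(payload)))
	copy(out[8:], payload)
	return out
}

// decodeOctets reverses encodeOctets and rejects malformed input.
func decodeOctets(buf []byte) ([]byte, error) {
	if len(buf) < 8 || buf[0] != 0x00 || buf[1] != 0x01 {
		return nil, errors.New("sketch: not a CDR_LE payload")
	}
	n := binary.LittleEndian.Uint32(buf[4:8])
	if uint64(n) > uint64(len(buf)-8) {
		return nil, errors.New("sketch: truncated sequence")
	}
	return buf[8 : 8+int(n)], nil
}

func main() {
	wire := encodeOctets([]byte("hi"))
	fmt.Printf("% x\n", wire)

	payload, err := decodeOctets(wire)
	fmt.Println(string(payload), err)
}
```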

RTPS interoperability testing

The interop/ directory contains wire-compatibility tests against a live CycloneDDS peer. These tests are gated behind the interop build tag so they never run in normal CI.

# Start CycloneDDS peer in Docker
docker compose -f interop/docker-compose.yml up -d cyclone-peer

# Run interop tests
go test -tags interop -v -timeout 60s ./interop/...

# Tear down
docker compose -f interop/docker-compose.yml down

Three tests are provided:

Test                                       Direction            What it verifies
TestInterop_GoPublisher_CycloneSubscriber  go-DDS → CycloneDDS  RTPS writer is interoperable
TestInterop_CyclonePublisher_GoSubscriber  CycloneDDS → go-DDS  RTPS reader is interoperable
TestInterop_BidirectionalEcho              both                 End-to-end round-trip

Set INTEROP_DOMAIN (default 0) and INTEROP_TIMEOUT (default 15s) to configure the tests.
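
One plausible way a test helper could read these two variables is sketched below. It assumes INTEROP_TIMEOUT is a Go duration string (the documented default "15s" suggests so); interopConfig and loadInteropConfig are hypothetical names, not taken from the interop package.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

// interopConfig is a hypothetical holder for the two documented settings.
type interopConfig struct {
	Domain  int
	Timeout time.Duration
}

// loadInteropConfig starts from the documented defaults and overrides them
// with any non-empty values returned by getenv.
func loadInteropConfig(getenv func(string) string) (interopConfig, error) {
	cfg := interopConfig{Domain: 0, Timeout: 15 * time.Second}
	if v := getenv("INTEROP_DOMAIN"); v != "" {
		d, err := strconv.Atoi(v)
		if err != nil {
			return cfg, fmt.Errorf("INTEROP_DOMAIN: %w", err)
		}
		cfg.Domain = d
	}
	if v := getenv("INTEROP_TIMEOUT"); v != "" {
		t, err := time.ParseDuration(v) // assumed format, e.g. "30s"
		if err != nil {
			return cfg, fmt.Errorf("INTEROP_TIMEOUT: %w", err)
		}
		cfg.Timeout = t
	}
	return cfg, nil
}

func main() {
	cfg, err := loadInteropConfig(os.Getenv)
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Printf("domain=%d timeout=%s\n", cfg.Domain, cfg.Timeout)
}
```

Passing the lookup function as a parameter keeps the helper testable without touching the process environment.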

Using CycloneDDS (production interop)

# Linux
apt-get install -y libcyclonedds-dev

# macOS
brew install cyclonedds

# Build
go build -tags cyclone ./...
go test -tags cyclone ./cyclone/...

CI status

Job              Platforms                              Notes
test-mock        ubuntu, macOS, Windows × Go 1.22/1.23  race detector, full coverage
test-rtps        ubuntu                                 -short (skips 2.2 s two-participant test)
test-cyclone     ubuntu-22.04                           probe-and-flag: skips cleanly if libcyclonedds-dev is absent
benchmark-smoke  ubuntu                                 1 iteration each, catches panics/deadlocks
fuzz-short       ubuntu                                 10 s per fuzz target
lint             ubuntu                                 golangci-lint
dco              PR only                                Signed-off-by check

Roadmap

See ROADMAP.md for per-item context, release notes, and implementation guidance.

Released

  • Go interface (Participant, Publisher, Subscriber, QoS)
  • In-process mock — 100% statement coverage
  • CycloneDDS CGo implementation (-tags cyclone)
  • Configurable poll interval (cyclone.Options)
  • Pure-Go RTPS/UDP — no CGo, all platforms
  • Reliable QoS retransmission (HEARTBEAT / ACKNACK)
  • WaitSet — sub-millisecond multi-topic blocking receive
  • DDS-Security plugin interface (NullPlugin, HMAC-SHA-256, AES-256-GCM)
  • TransientLocal durability (last-value cache for late joiners)
  • IPv6 multicast transport (WithIPv6() option, LocatorKindUDPv6)
  • RTPS interop testing with CycloneDDS (Docker Compose + CycloneDDS peer)

Known protocol bugs (all fixed in v0.2.1)

  • matchedReaderLocators ignores topic — sends data to all peers (participant.go:497)
  • SPDP lease duration advertised but never enforced — stale peers never evicted
  • RTPS GAP submessage not sent — reliable subscribers stall after history eviction

Planned — core

  • Typed sentinel errors (errors.Is / errors.As support)
  • Unicast-only / no-multicast discovery mode (Docker, container, NAT environments)
  • Content-filtered subscriptions (server-side predicate before channel delivery)
  • Deadline QoS (configurable missed-deadline callback)
  • Large payload fragmentation (RTPS DATA_FRAG submessage)
  • Topic wildcards (sensors/#, vehicle/*/speed)
  • Metrics / statistics API (publish count, drop count, latency histogram)
  • RTPS persistent history (disk-backed TransientLocal for crash recovery)
  • Real-time web monitor (monitor/ sub-package, SSE, no external dependencies)

Planned — operational

  • Configurable subscriber channel depth and back-pressure policy (drop-newest / drop-oldest / block)
  • Structured logging (WithLogger(*slog.Logger) option, zero-cost when unused)
  • Participant liveliness detection and callback (LivelinessGained / LivelinessLost)
  • Graceful shutdown with reliable-ACK drain (CloseWithDrain(ctx))

Planned — transport

  • Multicast data delivery (topic-specific multicast group, one packet per write)
  • Shared memory transport (shmem/ sub-package, cross-process same-host, zero UDP copies)
  • INFO_TS submessage — source timestamps in Sample.SourceTimestamp

Planned — integration

  • MQTT bridge (bridge/mqtt/ — DDS ↔ MQTT bidirectional, QoS mapping)
  • IDL / protobuf schema binding (TypedPublisher[T] / TypedSubscriber[T] generics, go generate)
  • OpenTelemetry tracing (WithOTelTracer, per-write and per-deliver spans)

Planned — TSN (Time-Sensitive Networking)

  • TSN-extended QoS fields (TransportPriority, LatencyBudget, Lifespan, PublishPeriod, MaxSampleSize)
  • DDS-to-TSN stream model (tsn.Stream descriptor; OMG DDS-TSN spec)
  • VLAN, PCP, and DSCP socket marking (SO_PRIORITY, IP_TOS, VLAN interface, Linux-only)
  • Scheduled transmit time (SO_TXTIME + CLOCK_TAI + ETF/taprio qdisc, Linux-only)
  • gPTP / IEEE 802.1AS time base integration (CLOCK_TAI via golang.org/x/sys/unix)
  • Separate traffic-class sockets (discovery, best-effort data, per-TSN-writer)
  • TSN-safe discovery (configurable SPDP interval, jitter budget, static peers)
  • Fragmentation bounds for TSN streams (reject oversize, deterministic DATA_FRAG)
  • External TSN configuration (YAML/JSON tsn_streams: file, tsn.LoadConfig)

Contributing

See CONTRIBUTING.md. All commits require a DCO sign-off.

License

Mozilla Public License v2.0 — see LICENSE.
Copyright (c) 2026 Matt Jones.

Documentation

Overview

Package dds defines the Go interface for Data Distribution Service (DDS) publish/subscribe operations.

The interface is intentionally narrow: it covers the pub/sub primitives needed for vehicle-signal transport and nothing more.

Choose an implementation by importing one of the sub-packages and calling its New function:

import "github.com/SoundMatt/go-DDS/mock"    // in-process, no CGo
import "github.com/SoundMatt/go-DDS/cyclone" // CycloneDDS via CGo
import "github.com/SoundMatt/go-DDS/rtps"    // pure-Go RTPS/UDP

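Each implementation package (mock, rtps, cyclone) exposes a New constructor that takes a Domain and returns (Participant, error), satisfying this package's Participant interface. rtps.New additionally accepts options such as rtps.WithSecurity.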

Constants

This section is empty.

Variables

var DefaultQoS = QoS{
	Reliability:  BestEffort,
	Durability:   Volatile,
	HistoryDepth: 1,
}

DefaultQoS is BestEffort + Volatile with a history depth of 1.

var ErrClosed = errors.New("dds: entity is closed")

ErrClosed is returned when an operation is attempted on a closed entity.

var ErrTopicEmpty = errors.New("dds: topic name must not be empty")

ErrTopicEmpty is returned when an empty topic string is passed.
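
Sentinel errors like these are normally matched with errors.Is, which also works if an implementation wraps them with extra context. The sketch below uses local stand-ins for the two variables so that it runs on its own; openTopic and classify are hypothetical helpers, and whether the implementations wrap their errors is not stated here.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring dds.ErrClosed and dds.ErrTopicEmpty.
var (
	ErrClosed     = errors.New("dds: entity is closed")
	ErrTopicEmpty = errors.New("dds: topic name must not be empty")
)

// openTopic is a hypothetical stand-in for a NewPublisher-style call.
func openTopic(topic string, closed bool) error {
	if closed {
		// Wrapped with context; errors.Is still matches ErrClosed.
		return fmt.Errorf("open %q: %w", topic, ErrClosed)
	}
	if topic == "" {
		return ErrTopicEmpty
	}
	return nil
}

// classify maps an error to the action a caller might take.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrTopicEmpty):
		return "fix the topic name"
	case errors.Is(err, ErrClosed):
		return "recreate the participant"
	default:
		return "unexpected"
	}
}

func main() {
	fmt.Println(classify(openTopic("sensors/temp", false)))
	fmt.Println(classify(openTopic("", false)))
	fmt.Println(classify(openTopic("sensors/temp", true)))
}
```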

var ReliableQoS = QoS{
	Reliability:  Reliable,
	Durability:   TransientLocal,
	HistoryDepth: 1,
}

ReliableQoS is Reliable + TransientLocal. Use for actuator commands and any topic where a late-joining subscriber must receive the current value.

Functions

This section is empty.

Types

type Domain

type Domain int

Domain is a DDS domain identifier (0–232 inclusive per the DDS spec). Participants on the same domain and network segment discover each other automatically without a broker.
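
The upper bound of 232 falls out of the default RTPS port mapping, where the discovery multicast port is PB + DG × domainId + d0 with PB = 7400, DG = 250, and d0 = 0. The sketch below works through that arithmetic; it assumes the rtps package follows these spec defaults, and discoveryMulticastPort is a hypothetical helper.

```go
package main

import "fmt"

// Default RTPS port-mapping parameters from the RTPS specification.
const (
	portBase   = 7400 // PB
	domainGain = 250  // DG
	maxDomain  = 232  // highest domain whose ports fit below 65536
)

// discoveryMulticastPort returns PB + DG*domain (offset d0 is 0).
func discoveryMulticastPort(domain int) (int, error) {
	if domain < 0 || domain > maxDomain {
		return 0, fmt.Errorf("domain %d outside 0..%d", domain, maxDomain)
	}
	return portBase + domainGain*domain, nil
}

func main() {
	for _, d := range []int{0, 1, 232, 233} {
		port, err := discoveryMulticastPort(d)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("domain %d -> UDP port %d\n", d, port)
	}
}
```

Domain 232 maps to port 65400, while domain 233 would need 65650, which exceeds the 16-bit UDP port range.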

type DurabilityKind

type DurabilityKind int

DurabilityKind controls whether late-joining subscribers receive historical samples that were published before they joined.

const (
	// Volatile discards samples as soon as they are delivered.
	Volatile DurabilityKind = iota
	// TransientLocal retains the last N samples so that late joiners
	// receive current state on subscription.
	TransientLocal
)
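
The "last N samples" behaviour of TransientLocal can be pictured as a small bounded cache that is replayed to each late joiner. The sketch below is one way to write such a cache, with N playing the role of QoS.HistoryDepth; the history type and its methods are hypothetical, it is not goroutine-safe, and it is not the library's actual implementation.

```go
package main

import "fmt"

// history keeps at most depth payloads, oldest first.
type history struct {
	depth   int
	samples [][]byte
}

// newHistory treats a non-positive depth as 1.
func newHistory(depth int) *history {
	if depth <= 0 {
		depth = 1
	}
	return &history{depth: depth}
}

// add stores a copy of payload and evicts the oldest entries beyond depth.
func (h *history) add(payload []byte) {
	h.samples = append(h.samples, append([]byte(nil), payload...))
	if len(h.samples) > h.depth {
		h.samples = h.samples[len(h.samples)-h.depth:]
	}
}

// replay returns the retained payloads for delivery to a late joiner.
func (h *history) replay() [][]byte {
	out := make([][]byte, len(h.samples))
	copy(out, h.samples)
	return out
}

func main() {
	h := newHistory(2)
	for _, v := range []string{"10", "20", "30"} {
		h.add([]byte(v))
	}
	// A subscriber joining now would see only the two most recent values.
	for _, p := range h.replay() {
		fmt.Println(string(p))
	}
}
```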

type Metrics added in v0.3.0

type Metrics struct {
	WriteCount     uint64
	DeliverCount   uint64
	DropCount      uint64
	BytesWritten   uint64
	BytesDelivered uint64
}

Metrics holds cumulative statistics for a participant.

type MetricsProvider added in v0.3.0

type MetricsProvider interface {
	Metrics() Metrics
}

MetricsProvider is implemented by participants that expose runtime statistics.
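
Since MetricsProvider is an optional interface, callers would typically discover it with a type assertion. The sketch below uses local copies of the two types so it runs standalone; snapshot, dropRatio, and fakeParticipant are hypothetical, and the drop-ratio formula is an illustrative choice rather than something the package defines.

```go
package main

import "fmt"

// Local copies of dds.Metrics and dds.MetricsProvider for a standalone sketch.
type Metrics struct {
	WriteCount     uint64
	DeliverCount   uint64
	DropCount      uint64
	BytesWritten   uint64
	BytesDelivered uint64
}

type MetricsProvider interface {
	Metrics() Metrics
}

// snapshot returns metrics if p implements MetricsProvider.
func snapshot(p any) (Metrics, bool) {
	mp, ok := p.(MetricsProvider)
	if !ok {
		return Metrics{}, false
	}
	return mp.Metrics(), true
}

// dropRatio is drops / (delivered + drops), or 0 when nothing was seen.
func dropRatio(m Metrics) float64 {
	total := m.DeliverCount + m.DropCount
	if total == 0 {
		return 0
	}
	return float64(m.DropCount) / float64(total)
}

// fakeParticipant stands in for a participant that exposes statistics.
type fakeParticipant struct{ m Metrics }

func (f fakeParticipant) Metrics() Metrics { return f.m }

func main() {
	p := fakeParticipant{m: Metrics{WriteCount: 4, DeliverCount: 3, DropCount: 1}}
	if m, ok := snapshot(p); ok {
		fmt.Printf("writes=%d drop ratio=%.2f\n", m.WriteCount, dropRatio(m))
	}
	if _, ok := snapshot(struct{}{}); !ok {
		fmt.Println("no metrics available")
	}
}
```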

type Participant

type Participant interface {
	// NewPublisher creates a writer for the named topic using the given QoS.
	NewPublisher(topic string, qos QoS) (Publisher, error)

	// NewSubscriber creates a reader for the named topic using the given QoS.
	// Optional SubscriberOption values configure content filtering and other
	// per-subscriber policies.
	NewSubscriber(topic string, qos QoS, opts ...SubscriberOption) (Subscriber, error)

	// Close releases all DDS resources held by this participant.
	Close() error
}

Participant is the DDS domain participant — the root factory for all DDS entities. Create one per process per domain. A Participant is safe for concurrent use from multiple goroutines.

type Publisher

type Publisher interface {
	Write(payload []byte) error
	Close() error
}

Publisher writes samples to a single DDS topic. A Publisher is safe for concurrent use from multiple goroutines.

type QoS

type QoS struct {
	Reliability  ReliabilityKind
	Durability   DurabilityKind
	HistoryDepth int           // 0 means implementation default (typically 1)
	Deadline     time.Duration // 0 = disabled; publisher fires DeadlineCallback if no Write within this period
}

QoS bundles the policies that govern a single publisher or subscriber endpoint.
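
The two zero-value conventions in the struct comments (HistoryDepth 0 means the default, Deadline 0 means disabled) can be made explicit with small helpers. The sketch below is hypothetical: effectiveDepth assumes the "typically 1" default, and deadlineMissed only shows the comparison a deadline monitor would make, not how the library schedules its callback.

```go
package main

import (
	"fmt"
	"time"
)

// effectiveDepth resolves the "0 means implementation default" convention,
// assuming a default of 1.
func effectiveDepth(historyDepth int) int {
	if historyDepth <= 0 {
		return 1
	}
	return historyDepth
}

// deadlineMissed reports whether the time since the last Write exceeds the
// deadline. A zero (or negative) deadline disables the check.
func deadlineMissed(deadline, sinceLastWrite time.Duration) bool {
	if deadline <= 0 {
		return false
	}
	return sinceLastWrite > deadline
}

func main() {
	fmt.Println(effectiveDepth(0), effectiveDepth(8))
	fmt.Println(deadlineMissed(0, time.Hour))
	fmt.Println(deadlineMissed(100*time.Millisecond, 150*time.Millisecond))
}
```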

type ReliabilityKind

type ReliabilityKind int

ReliabilityKind controls delivery guarantees for a topic endpoint.

const (
	// BestEffort delivers samples without retransmission. Suitable for
	// high-frequency sensor data where occasional loss is acceptable.
	BestEffort ReliabilityKind = iota
	// Reliable retransmits lost samples until acknowledged. Required for
	// command/control and actuator writes.
	Reliable
)

type Sample

type Sample struct {
	Topic   string
	Payload []byte
}

Sample is a single data sample delivered to a Subscriber.

type Subscriber

type Subscriber interface {
	C() <-chan Sample
	Close() error
}

Subscriber reads samples from a single DDS topic as a Go channel. A Subscriber is safe for concurrent use from multiple goroutines.

type SubscriberConfig added in v0.3.0

type SubscriberConfig struct {
	Filter func(Sample) bool
}

SubscriberConfig holds per-subscriber options applied at construction time. It is exported so that implementation packages (mock, rtps, cyclone) can read the resolved configuration without duplicating the option-merge logic.

func ApplySubscriberOpts added in v0.3.0

func ApplySubscriberOpts(opts []SubscriberOption) SubscriberConfig

ApplySubscriberOpts merges a slice of SubscriberOption into a SubscriberConfig.

type SubscriberOption added in v0.3.0

type SubscriberOption func(*SubscriberConfig)

SubscriberOption configures a subscriber at creation time.

func WithFilter added in v0.3.0

func WithFilter(fn func(Sample) bool) SubscriberOption

WithFilter returns a SubscriberOption that applies fn as a content filter. Only samples for which fn returns true are delivered to the subscriber's channel; non-matching samples are discarded silently.
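
The option types above follow Go's functional-options pattern. The sketch below re-declares them locally to show how the pieces could fit together; the function bodies and the deliver helper are assumptions made for illustration, not the package's source.

```go
package main

import (
	"bytes"
	"fmt"
)

// Local stand-ins mirroring the documented types.
type Sample struct {
	Topic   string
	Payload []byte
}

type SubscriberConfig struct {
	Filter func(Sample) bool
}

type SubscriberOption func(*SubscriberConfig)

// WithFilter records fn as the content filter (assumed behaviour).
func WithFilter(fn func(Sample) bool) SubscriberOption {
	return func(c *SubscriberConfig) { c.Filter = fn }
}

// ApplySubscriberOpts applies each option in order to a zero config.
func ApplySubscriberOpts(opts []SubscriberOption) SubscriberConfig {
	var cfg SubscriberConfig
	for _, opt := range opts {
		if opt != nil {
			opt(&cfg)
		}
	}
	return cfg
}

// deliver is a hypothetical helper: a nil filter accepts everything.
func deliver(cfg SubscriberConfig, s Sample) bool {
	return cfg.Filter == nil || cfg.Filter(s)
}

func main() {
	onlyCelsius := WithFilter(func(s Sample) bool {
		return bytes.Contains(s.Payload, []byte("celsius"))
	})
	cfg := ApplySubscriberOpts([]SubscriberOption{onlyCelsius})

	fmt.Println(deliver(cfg, Sample{Payload: []byte(`{"unit":"celsius"}`)}))
	fmt.Println(deliver(cfg, Sample{Payload: []byte(`{"unit":"kelvin"}`)}))
}
```

With the real package, the same option would be passed as the trailing argument to NewSubscriber.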

type WaitSet

type WaitSet struct {
	// contains filtered or unexported fields
}

WaitSet multiplexes over a set of subscribers, blocking until any one of them delivers a sample.

func NewWaitSet

func NewWaitSet(subs ...Subscriber) *WaitSet

NewWaitSet creates a WaitSet that monitors the given subscribers.

Example

ExampleNewWaitSet demonstrates multiplexing over two subscribers without a polling loop. The WaitSet blocks until any attached subscriber delivers a sample, then returns it together with the subscriber it arrived on.

package main

import (
	"context"
	"fmt"
	"time"

	dds "github.com/SoundMatt/go-DDS"
	"github.com/SoundMatt/go-DDS/mock"
)

func main() {
	p, _ := mock.New(dds.Domain(0))
	defer p.Close()

	subTemp, _ := p.NewSubscriber("sensors/temp", dds.DefaultQoS)
	subSpeed, _ := p.NewSubscriber("vehicle/speed", dds.DefaultQoS)
	defer subTemp.Close()
	defer subSpeed.Close()

	pubTemp, _ := p.NewPublisher("sensors/temp", dds.DefaultQoS)
	defer pubTemp.Close()

	_ = pubTemp.Write([]byte("21.5"))

	ws := dds.NewWaitSet(subTemp, subSpeed)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	sample, sub, err := ws.Wait(ctx)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	switch sub {
	case subTemp:
		fmt.Println("temp:", string(sample.Payload))
	case subSpeed:
		fmt.Println("speed:", string(sample.Payload))
	}
}
Output:
temp: 21.5

func (*WaitSet) Wait

func (ws *WaitSet) Wait(ctx context.Context) (Sample, Subscriber, error)

Wait blocks until a sample is available on any attached subscriber, or until ctx is cancelled.

Directories

Path      Synopsis
cyclone   Package cyclone provides a CycloneDDS-backed implementation of the dds interfaces via CGo.
interop   Package interop contains RTPS wire-compatibility tests that require a live CycloneDDS peer.
mock      Package mock provides an in-process, CGo-free implementation of the dds interfaces.
monitor   Package monitor provides a real-time web dashboard for a DDS participant.
rtps      Package rtps provides a pure-Go RTPS/UDP implementation of the dds interfaces.
security  Package security provides pluggable transport-security for go-DDS.
