dds

package module
v0.6.0 Latest
Published: Jun 7, 2026 | License: MPL-2.0 | Imports: 6 | Imported by: 0

README

go-DDS

A generic Go library for DDS (Data Distribution Service) publish/subscribe. Works in any domain — IoT, robotics, industrial control, vehicle networks, simulation, and more.

The API is a stable Go interface. Implementations are swappable without changing application code:

Package | Description | Requires
mock | In-process, pure Go. Zero dependencies. Default for development and testing. | Nothing
rtps | Pure-Go RTPS/UDP wire protocol. Real DDS, zero native dependencies. | Nothing
cyclone | CycloneDDS via CGo. Interoperates with non-Go DDS participants. | libcyclonedds-dev + -tags cyclone
security | Pluggable security: NullPlugin, HMAC-SHA-256, AES-256-GCM. | Nothing

Install

go get github.com/SoundMatt/go-DDS

Quick start

import (
    "fmt"

    dds "github.com/SoundMatt/go-DDS"
    "github.com/SoundMatt/go-DDS/mock"
)

p, _ := mock.New(dds.Domain(0))
defer p.Close()

sub, _ := p.NewSubscriber("sensors/temperature", dds.DefaultQoS)
pub, _ := p.NewPublisher("sensors/temperature", dds.DefaultQoS)

pub.Write([]byte(`{"value": 21.5, "unit": "celsius"}`))

sample := <-sub.C()
fmt.Println(string(sample.Payload)) // {"value": 21.5, "unit": "celsius"}

Switching implementations

Application code only ever references the dds interface package. Swap implementations at the call site:

// Development / tests — no system library needed:
import "github.com/SoundMatt/go-DDS/mock"
p, err := mock.New(dds.Domain(0))

// Production — pure-Go UDP transport, no native deps:
import "github.com/SoundMatt/go-DDS/rtps"
p, err := rtps.New(dds.Domain(0))

// Interop — real CycloneDDS domain, multi-host:
// (rebuild with: go build -tags cyclone ./...)
import "github.com/SoundMatt/go-DDS/cyclone"
p, err := cyclone.New(dds.Domain(0))

QoS

// Live data — best-effort, volatile (default)
pub, _ := p.NewPublisher("robot/joint/angles", dds.DefaultQoS)

// Commands — reliable delivery, late joiners see current state
cmd, _ := p.NewPublisher("robot/joint/target", dds.ReliableQoS)

WaitSet

dds.WaitSet multiplexes over multiple subscribers — no polling loop required:

subTemp, _ := p.NewSubscriber("sensors/temp", dds.DefaultQoS)
subSpeed, _ := p.NewSubscriber("vehicle/speed", dds.DefaultQoS)

ws := dds.NewWaitSet(subTemp, subSpeed)
ctx := context.Background()

for {
    sample, sub, err := ws.Wait(ctx)
    if err != nil {
        break
    }
    switch sub {
    case subTemp:
        fmt.Println("temp:", string(sample.Payload))
    case subSpeed:
        fmt.Println("speed:", string(sample.Payload))
    }
}

Security

Pluggable payload-level security via the security package:

import (
    "github.com/SoundMatt/go-DDS/rtps"
    "github.com/SoundMatt/go-DDS/security"
)

key := security.NewRandomKey(32)

// AES-256-GCM: full encryption + authentication
aesPlugin, _ := security.NewAESGCMPlugin(key)
p, _ := rtps.New(dds.Domain(0), rtps.WithSecurity(aesPlugin))

// HMAC-SHA-256: integrity + authentication, no encryption
hmacPlugin := security.NewHMACPlugin(key)
p, _ = rtps.New(dds.Domain(0), rtps.WithSecurity(hmacPlugin))

All peers communicating on a topic must use the same plugin and key.
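
The shared-key requirement follows from how keyed MACs work: a tag produced under one key cannot be verified under another. The sketch below illustrates this with the standard library's crypto/hmac only. It is not the security package's code; the sign and verify helpers and the "payload followed by a 32-byte tag" framing are assumptions made for the example.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"fmt"
)

// sign returns payload followed by its HMAC-SHA-256 tag (hypothetical framing).
func sign(key, payload []byte) []byte {
	mac := hmac.New(sha256.New, key)
	mac.Write(payload)
	// Sum appends the 32-byte tag to a copy of the payload.
	return mac.Sum(append([]byte(nil), payload...))
}

// verify splits off the trailing tag and checks it against key.
func verify(key, msg []byte) ([]byte, bool) {
	if len(msg) < sha256.Size {
		return nil, false
	}
	payload, tag := msg[:len(msg)-sha256.Size], msg[len(msg)-sha256.Size:]
	mac := hmac.New(sha256.New, key)
	mac.Write(payload)
	return payload, hmac.Equal(tag, mac.Sum(nil))
}

func main() {
	keyA := []byte("0123456789abcdef0123456789abcdef")
	keyB := []byte("fedcba9876543210fedcba9876543210")

	msg := sign(keyA, []byte("21.5"))
	_, sameKey := verify(keyA, msg)
	_, otherKey := verify(keyB, msg)
	fmt.Println(sameKey, otherKey) // true false
}
```

A subscriber holding a different key would reject every sample, which is why the key has to be distributed to all peers on the topic.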

Example use cases

Domain | Topic example | QoS
Robotics | robot/arm/joint_states | BestEffort (100 Hz sensor)
Industrial | plc/conveyor/speed | Reliable (actuator command)
Vehicle networks | vehicle/speed | BestEffort
Simulation | sim/entity/pose | BestEffort
IoT | building/floor3/temp | Reliable

Wire format

Each DDS sample payload is raw bytes. The application chooses the encoding — JSON, Protobuf, MessagePack, plain text, or anything else. go-DDS does not impose a schema.
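
Because the payload is opaque bytes, encoding is an application concern. As a minimal sketch, the JSON payload from the quick start could be produced and parsed with encoding/json; the Reading type and the encode/decode helpers are hypothetical names introduced for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Reading is a hypothetical application schema; go-DDS does not define it.
type Reading struct {
	Value float64 `json:"value"`
	Unit  string  `json:"unit"`
}

// encode produces the bytes that would be handed to Publisher.Write.
func encode(r Reading) ([]byte, error) { return json.Marshal(r) }

// decode parses the bytes found in Sample.Payload.
func decode(payload []byte) (Reading, error) {
	var r Reading
	err := json.Unmarshal(payload, &r)
	return r, err
}

func main() {
	payload, err := encode(Reading{Value: 21.5, Unit: "celsius"})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(payload)) // {"value":21.5,"unit":"celsius"}

	r, err := decode(payload)
	fmt.Println(r.Value, r.Unit, err) // 21.5 celsius <nil>
}
```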

The RTPS transport encodes payloads as CDR_LE byte arrays, compatible with the RTPS 2.3 wire format. The CycloneDDS implementation uses an opaque RawMessage DDS type.

RTPS interoperability testing

The interop/ directory contains wire-compatibility tests against a live CycloneDDS peer. These tests are gated behind the interop build tag so they never run in normal CI.

# Start CycloneDDS peer in Docker
docker compose -f interop/docker-compose.yml up -d cyclone-peer

# Run interop tests
go test -tags interop -v -timeout 60s ./interop/...

# Tear down
docker compose -f interop/docker-compose.yml down

Three tests are provided:

Test | Direction | What it verifies
TestInterop_GoPublisher_CycloneSubscriber | go-DDS → CycloneDDS | RTPS writer is interoperable
TestInterop_CyclonePublisher_GoSubscriber | CycloneDDS → go-DDS | RTPS reader is interoperable
TestInterop_BidirectionalEcho | both | End-to-end round-trip

Set INTEROP_DOMAIN (default 0) and INTEROP_TIMEOUT (default 15s) to configure the tests.

Using CycloneDDS (production interop)

# Linux
apt-get install -y libcyclonedds-dev

# macOS
brew install cyclonedds

# Build
go build -tags cyclone ./...
go test -tags cyclone ./cyclone/...

CI status

Job | Platforms | Notes
test-mock | ubuntu, macOS, Windows × Go 1.22/1.23 | race detector, full coverage
test-rtps | ubuntu | -short (skips 2.2 s two-participant test)
test-cyclone | ubuntu-22.04 | probe-and-flag; skips cleanly if libcyclonedds-dev is absent
benchmark-smoke | ubuntu | 1 iteration each, catches panics/deadlocks
fuzz-short | ubuntu | 10 s per fuzz target
lint | ubuntu | golangci-lint
dco | PR only | Signed-off-by check

Roadmap

See ROADMAP.md for per-item context, release notes, and implementation guidance.

Released — v0.1 – v0.3

  • Go interface (Participant, Publisher, Subscriber, QoS)
  • In-process mock — 100% statement coverage
  • CycloneDDS CGo implementation (-tags cyclone)
  • Configurable poll interval (cyclone.Options)
  • Pure-Go RTPS/UDP — no CGo, all platforms
  • Reliable QoS retransmission (HEARTBEAT / ACKNACK)
  • WaitSet — sub-millisecond multi-topic blocking receive
  • DDS-Security plugin interface (NullPlugin, HMAC-SHA-256, AES-256-GCM)
  • TransientLocal durability (last-value cache for late joiners)
  • IPv6 multicast transport (WithIPv6() option, LocatorKindUDPv6)
  • RTPS interop testing with CycloneDDS (Docker Compose + CycloneDDS peer)
  • Typed sentinel errors (ErrClosed, ErrTopicEmpty; errors.Is support)
  • Unicast-only / no-multicast discovery mode (WithNoMulticast, WithPeerLocators)
  • Content-filtered subscriptions (WithFilter)
  • Deadline QoS (WithDeadlineCallback)
  • Large payload fragmentation (RTPS DATA_FRAG submessage)
  • Topic wildcards (sensors/#, vehicle/+/speed)
  • Metrics / statistics API (Metrics() / MetricsProvider)
  • RTPS persistent history (WithPersistentHistory)
  • Real-time web monitor (monitor/ sub-package, SSE, no external dependencies)

Released — v0.4

  • Configurable subscriber channel depth (WithChannelDepth) and back-pressure policy (DropNewest / DropOldest / Block)
  • Structured logging (WithLogger(*slog.Logger), zero-cost when unused)
  • Participant liveliness detection and callback (WithLivelinessCallback, LivelinessGained / LivelinessLost)
  • Graceful shutdown with reliable-ACK drain (CloseWithDrain(ctx), Drainer interface)
  • Multicast data delivery (domain-scoped multicast group, one packet per write)
  • Shared memory transport (shmem/ sub-package, cross-process same-host)
  • INFO_TS submessage — source timestamps in Sample.Timestamp
  • MQTT bridge (bridge/mqtt/ — DDS ↔ MQTT bidirectional, QoS and topic mapping, no external dep)
  • Typed generics (TypedPublisher[T] / TypedSubscriber[T], Codec[T], JSONCodec[T])
  • OpenTelemetry-compatible tracing (WithTracer(dds.Tracer), per-deliver spans, zero dep)

Released — v0.5 — TSN (Time-Sensitive Networking)

  • TSN-extended QoS fields (TransportPriority, LatencyBudget, Lifespan, PublishPeriod, MaxSampleSize)
  • DDS-to-TSN stream model (tsn.Stream descriptor, tsn.StreamConfig, tsn.LoadConfig)
  • VLAN, PCP, and DSCP socket marking (SO_PRIORITY, IP_TOS, Linux-only build tag)
  • Scheduled transmit time (SO_TXTIME + CLOCK_TAI + ETF/taprio qdisc, Linux-only)
  • gPTP / IEEE 802.1AS time base (CLOCK_TAI via syscall.ClockGettime, no external dep)
  • Separate traffic-class sockets (per-PCP socket map, tsnSocketForPCP)
  • TSN-safe discovery (WithSPDPInterval, WithSPDPJitter, WithStaticPeers)
  • Fragmentation bounds for TSN streams (MaxSampleSize guard, splitIntoFragmentsN)
  • External TSN configuration (JSON file, tsn.LoadConfig, WithTSNConfig)

Future — 10-milestone roadmap targeting v0.9

Version | Theme
v0.6 | Production Runtime + Observability
v0.7 | Developer Experience + Deterministic Networking
v0.8 | Verification, Edge Performance, Safety
v0.9 | Enterprise Security, Dynamic Data, Services

See ROADMAP.md for goals, sub-items, and success criteria per milestone.

Contributing

See CONTRIBUTING.md. All commits require a DCO sign-off.

License

Mozilla Public License v2.0 — see LICENSE.
Copyright (c) 2026 Matt Jones.

Documentation

Overview

Package dds defines the Go interface for Data Distribution Service (DDS) publish/subscribe operations.

The interface is intentionally narrow: it covers the pub/sub primitives needed for vehicle-signal transport and nothing more.

Choose an implementation by importing one of the sub-packages and calling its New function:

import "github.com/SoundMatt/go-DDS/mock"    // in-process, no CGo
import "github.com/SoundMatt/go-DDS/cyclone" // CycloneDDS via CGo
import "github.com/SoundMatt/go-DDS/rtps"    // pure-Go RTPS/UDP

All packages expose a New(Domain) (Participant, error) constructor that satisfies this package's Participant interface.

Constants

This section is empty.

Variables

var DefaultQoS = QoS{
	Reliability:  BestEffort,
	Durability:   Volatile,
	HistoryDepth: 1,
}

DefaultQoS is BestEffort + Volatile with a history depth of 1.

var ErrClosed = errors.New("dds: entity is closed")

ErrClosed is returned when an operation is attempted on a closed entity.

var ErrPayloadTooLarge = errors.New("dds: payload exceeds QoS MaxSampleSize")

ErrPayloadTooLarge is returned when Write is called with a payload that exceeds the MaxSampleSize set in the publisher's QoS.

var ErrTopicEmpty = errors.New("dds: topic name must not be empty")

ErrTopicEmpty is returned when an empty topic string is passed.

var ReliableQoS = QoS{
	Reliability:  Reliable,
	Durability:   TransientLocal,
	HistoryDepth: 1,
}

ReliableQoS is Reliable + TransientLocal. Use for actuator commands and any topic where a late-joining subscriber must receive the current value.

Functions

func CloseWithDrain added in v0.4.0

func CloseWithDrain(ctx context.Context, p Participant) error

CloseWithDrain waits for all pending reliable ACKs then closes p. If p does not implement Drainer, it calls p.Close() directly.

Types

type BackPressurePolicy added in v0.4.0

type BackPressurePolicy int

BackPressurePolicy controls what happens when a subscriber's channel is full.

const (
	// DropNewest silently discards the incoming sample when the channel is full.
	// This is the default policy and matches pre-v0.4 behaviour.
	DropNewest BackPressurePolicy = iota
	// DropOldest evicts the oldest queued sample to make room for the new one.
	DropOldest
	// Block waits until the channel has capacity (may block the writer goroutine).
	Block
)
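
To make the three policies concrete, here is a sketch of how each could behave on a buffered channel. The deliver helper is hypothetical and assumes a single producer and a channel depth greater than zero; it is not the library's delivery code.

```go
package main

import "fmt"

type BackPressurePolicy int

const (
	DropNewest BackPressurePolicy = iota
	DropOldest
	Block
)

// deliver applies policy when ch is full (single producer assumed).
func deliver(ch chan string, s string, policy BackPressurePolicy) {
	switch policy {
	case Block:
		ch <- s // waits for a reader to free a slot
	case DropOldest:
		for {
			select {
			case ch <- s:
				return
			default:
			}
			select {
			case <-ch: // evict the oldest queued sample, then retry
			default:
			}
		}
	default: // DropNewest
		select {
		case ch <- s:
		default: // channel full: discard the incoming sample
		}
	}
}

// drain empties ch without blocking and returns what was queued.
func drain(ch chan string) []string {
	var out []string
	for {
		select {
		case s := <-ch:
			out = append(out, s)
		default:
			return out
		}
	}
}

func main() {
	for _, p := range []BackPressurePolicy{DropNewest, DropOldest} {
		ch := make(chan string, 2)
		for _, s := range []string{"a", "b", "c"} {
			deliver(ch, s, p)
		}
		fmt.Println(drain(ch))
	}
	// Output:
	// [a b]
	// [b c]
}
```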

type Codec added in v0.4.0

type Codec[T any] interface {
	Marshal(v T) ([]byte, error)
	Unmarshal(data []byte) (T, error)
}

Codec[T] marshals and unmarshals values of type T to/from []byte. Implement this interface to bind a schema (JSON, protobuf, msgpack, …) to a TypedPublisher or TypedSubscriber.
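
As an illustration, a codec for a non-JSON encoding might look like the sketch below. The Codec interface is copied locally so the example compiles on its own, and FloatTextCodec is a hypothetical type that encodes a float64 as decimal text.

```go
package main

import (
	"fmt"
	"strconv"
)

// Codec is a local copy of the documented interface.
type Codec[T any] interface {
	Marshal(v T) ([]byte, error)
	Unmarshal(data []byte) (T, error)
}

// FloatTextCodec (hypothetical) encodes a float64 as decimal text.
type FloatTextCodec struct{}

func (FloatTextCodec) Marshal(v float64) ([]byte, error) {
	return strconv.AppendFloat(nil, v, 'g', -1, 64), nil
}

func (FloatTextCodec) Unmarshal(data []byte) (float64, error) {
	return strconv.ParseFloat(string(data), 64)
}

// Compile-time check that the codec satisfies the interface.
var _ Codec[float64] = FloatTextCodec{}

func main() {
	var c Codec[float64] = FloatTextCodec{}
	b, _ := c.Marshal(21.5)
	v, err := c.Unmarshal(b)
	fmt.Println(string(b), v, err) // 21.5 21.5 <nil>
}
```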

type DiscoveryMetrics added in v0.6.0

type DiscoveryMetrics struct {
	AnnouncesSent     uint64 // SPDP announcements sent
	AnnouncesReceived uint64 // SPDP announcements received from remote peers
	PeersKnown        uint64 // current number of known remote participants
	PeerEvictions     uint64 // cumulative peers evicted due to lease expiry
	EndpointMatches   uint64 // cumulative topic endpoint matches (local↔remote)
}

DiscoveryMetrics holds cumulative discovery statistics for a participant.

type DiscoveryMetricsProvider added in v0.6.0

type DiscoveryMetricsProvider interface {
	DiscoveryMetrics() DiscoveryMetrics
}

DiscoveryMetricsProvider is implemented by participants that expose discovery-layer statistics.

type Domain

type Domain int

Domain is a DDS domain identifier (0–232 inclusive per the DDS spec). Participants on the same domain and network segment discover each other automatically without a broker.
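
The upper bound of 232 can be understood from the default RTPS port mapping (port base 7400, domain gain 250), under which domain 233 would already exceed the 16-bit UDP port range. The sketch below works through that arithmetic using the specification's default constants; whether the rtps package uses exactly these defaults is an assumption, and the helper names are hypothetical.

```go
package main

import "fmt"

// Default RTPS port-mapping parameters from the RTPS specification.
const (
	portBase        = 7400 // PB
	domainGain      = 250  // DG
	participantGain = 2    // PG
	offsetD0        = 0    // discovery multicast offset
	offsetD1        = 10   // discovery unicast offset
)

// discoveryMulticastPort returns PB + DG*domain + d0.
func discoveryMulticastPort(domain int) int {
	return portBase + domainGain*domain + offsetD0
}

// discoveryUnicastPort returns PB + DG*domain + d1 + PG*participant.
func discoveryUnicastPort(domain, participant int) int {
	return portBase + domainGain*domain + offsetD1 + participantGain*participant
}

// validDomain applies the documented 0..232 range.
func validDomain(domain int) bool { return domain >= 0 && domain <= 232 }

func main() {
	fmt.Println(discoveryMulticastPort(0))   // 7400
	fmt.Println(discoveryUnicastPort(0, 1))  // 7412
	fmt.Println(discoveryMulticastPort(232)) // 65400
	fmt.Println(discoveryMulticastPort(233)) // 65650, above the UDP maximum of 65535
	fmt.Println(validDomain(233))            // false
}
```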

type Drainer added in v0.4.0

type Drainer interface {
	CloseWithDrain(ctx context.Context) error
}

Drainer is optionally implemented by Participants that support graceful shutdown: waiting for all in-flight reliable writes to be acknowledged before closing. If a Participant does not implement Drainer, CloseWithDrain falls back to a plain Close.

type DurabilityKind

type DurabilityKind int

DurabilityKind controls whether late-joining subscribers receive historical samples that were published before they joined.

const (
	// Volatile discards samples as soon as they are delivered.
	Volatile DurabilityKind = iota
	// TransientLocal retains the last N samples so that late joiners
	// receive current state on subscription.
	TransientLocal
)

type GUID added in v0.4.0

type GUID [16]byte

GUID is a globally unique 16-byte DDS participant or endpoint identifier. The first 12 bytes are the GuidPrefix; the last 4 bytes are the EntityId.
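
The 12 + 4 byte layout can be sliced apart directly. A short sketch, with a local GUID type and hypothetical prefix and entityID helpers (the package itself documents no such accessors here):

```go
package main

import "fmt"

// GUID is a local copy of the documented 16-byte type.
type GUID [16]byte

// prefix returns the first 12 bytes (the GuidPrefix).
func prefix(g GUID) [12]byte {
	var p [12]byte
	copy(p[:], g[:12])
	return p
}

// entityID returns the last 4 bytes (the EntityId).
func entityID(g GUID) [4]byte {
	var e [4]byte
	copy(e[:], g[12:])
	return e
}

func main() {
	var g GUID
	for i := range g {
		g[i] = byte(i)
	}
	p, e := prefix(g), entityID(g)
	fmt.Printf("%x %x\n", p[:], e[:]) // 000102030405060708090a0b 0c0d0e0f
}
```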

type Health added in v0.6.0

type Health struct {
	// Status is the overall health classification.
	Status HealthStatus
	// Details carries optional per-subsystem messages (may be nil).
	Details map[string]string
}

Health is a point-in-time health snapshot for a participant.

type HealthProvider added in v0.6.0

type HealthProvider interface {
	Health() Health
}

HealthProvider is implemented by participants that expose health reporting.

type HealthStatus added in v0.6.0

type HealthStatus int

HealthStatus is the overall operational status of a participant.

const (
	// HealthOK means the participant is running normally.
	HealthOK HealthStatus = iota
	// HealthDegraded means the participant is running with reduced capability.
	HealthDegraded
	// HealthDown means the participant has been closed or has failed.
	HealthDown
)

func (HealthStatus) String added in v0.6.0

func (h HealthStatus) String() string

String returns a lowercase, JSON-friendly representation of the status.

type JSONCodec added in v0.4.0

type JSONCodec[T any] struct{}

JSONCodec[T] implements Codec[T] using encoding/json. It is a zero-size struct; the zero value is ready to use.

func (JSONCodec[T]) Marshal added in v0.4.0

func (JSONCodec[T]) Marshal(v T) ([]byte, error)

Marshal encodes v to JSON bytes.

func (JSONCodec[T]) Unmarshal added in v0.4.0

func (JSONCodec[T]) Unmarshal(data []byte) (T, error)

Unmarshal decodes data from JSON into T.

type LivelinessEvent added in v0.4.0

type LivelinessEvent int

LivelinessEvent reports whether a remote participant has been discovered or lost its lease.

const (
	// LivelinessGained fires when a new remote participant is discovered via SPDP.
	LivelinessGained LivelinessEvent = iota
	// LivelinessLost fires when a remote participant's lease has expired.
	LivelinessLost
)

type Metrics added in v0.3.0

type Metrics struct {
	WriteCount     uint64
	DeliverCount   uint64
	DropCount      uint64
	BytesWritten   uint64
	BytesDelivered uint64
}

Metrics holds cumulative statistics for a participant.

type MetricsProvider added in v0.3.0

type MetricsProvider interface {
	Metrics() Metrics
}

MetricsProvider is implemented by participants that expose runtime statistics.

type Participant

type Participant interface {
	// NewPublisher creates a writer for the named topic using the given QoS.
	NewPublisher(topic string, qos QoS) (Publisher, error)

	// NewSubscriber creates a reader for the named topic using the given QoS.
	// Optional SubscriberOption values configure content filtering and other
	// per-subscriber policies.
	NewSubscriber(topic string, qos QoS, opts ...SubscriberOption) (Subscriber, error)

	// Close releases all DDS resources held by this participant.
	Close() error
}

Participant is the DDS domain participant — the root factory for all DDS entities. Create one per process per domain. A Participant is safe for concurrent use from multiple goroutines.

type Publisher

type Publisher interface {
	Write(payload []byte) error
	Close() error
}

Publisher writes samples to a single DDS topic. A Publisher is safe for concurrent use from multiple goroutines.

type QoS

type QoS struct {
	Reliability  ReliabilityKind
	Durability   DurabilityKind
	HistoryDepth int           // 0 means implementation default (typically 1)
	Deadline     time.Duration // 0 = disabled; publisher fires DeadlineCallback if no Write within this period

	// TransportPriority sets the network-level priority (maps to VLAN PCP /
	// SO_PRIORITY on Linux). 0 = normal, 1–7 = elevated; 7 is highest.
	TransportPriority int
	// LatencyBudget is the acceptable end-to-end delivery latency for this
	// endpoint. 0 = unspecified. Informational in v0.5; future releases may
	// enforce it via qdisc admission control.
	LatencyBudget time.Duration
	// Lifespan is the sample time-to-live measured from the write timestamp.
	// Samples older than Lifespan are dropped before delivery. 0 = infinite.
	Lifespan time.Duration
	// PublishPeriod is the periodic publish rate for TSN streams. 0 = aperiodic.
	// The application is responsible for calling Write at this rate; the value
	// is used by TSN stream reservation and scheduling.
	PublishPeriod time.Duration
	// MaxSampleSize is the maximum Write payload size in bytes. Write returns
	// ErrPayloadTooLarge if the payload exceeds this limit. 0 = unlimited.
	MaxSampleSize int
}

QoS bundles the policies that govern a single publisher or subscriber endpoint.

type ReliabilityKind

type ReliabilityKind int

ReliabilityKind controls delivery guarantees for a topic endpoint.

const (
	// BestEffort delivers samples without retransmission. Suitable for
	// high-frequency sensor data where occasional loss is acceptable.
	BestEffort ReliabilityKind = iota
	// Reliable retransmits lost samples until acknowledged. Required for
	// command/control and actuator writes.
	Reliable
)

type Sample

type Sample struct {
	Topic     string
	Payload   []byte
	Timestamp time.Time
}

Sample is a single data sample delivered to a Subscriber. Timestamp is the source time of the write; zero means no timestamp was set (INFO_TS was not present in the RTPS message, or the mock transport was used).

type Span added in v0.4.0

type Span interface {
	// SetAttribute attaches a key/value attribute to the span.
	SetAttribute(key, value string)
	// End finalises the span and records it to the tracer backend.
	End()
}

Span is a single tracing span. Call End when the operation is complete.

type SpanAttribute added in v0.4.0

type SpanAttribute struct {
	Key   string
	Value string
}

SpanAttribute is a key/value pair attached to a tracing span.

type Subscriber

type Subscriber interface {
	C() <-chan Sample
	Close() error
}

Subscriber reads samples from a single DDS topic as a Go channel. A Subscriber is safe for concurrent use from multiple goroutines.

type SubscriberConfig added in v0.3.0

type SubscriberConfig struct {
	Filter       func(Sample) bool
	ChannelDepth int                // 0 = implementation default (64)
	BackPressure BackPressurePolicy // default: DropNewest
}

SubscriberConfig holds per-subscriber options applied at construction time. It is exported so that implementation packages (mock, rtps, cyclone) can read the resolved configuration without duplicating the option-merge logic.

func ApplySubscriberOpts added in v0.3.0

func ApplySubscriberOpts(opts []SubscriberOption) SubscriberConfig

ApplySubscriberOpts merges a slice of SubscriberOption into a SubscriberConfig.

func (SubscriberConfig) ChanDepth added in v0.4.0

func (c SubscriberConfig) ChanDepth(defaultDepth int) int

ChanDepth returns the resolved channel depth: cfg.ChannelDepth if > 0, otherwise the provided default.

type SubscriberOption added in v0.3.0

type SubscriberOption func(*SubscriberConfig)

SubscriberOption configures a subscriber at creation time.

func WithBackPressure added in v0.4.0

func WithBackPressure(policy BackPressurePolicy) SubscriberOption

WithBackPressure sets the back-pressure policy applied when the subscriber channel is full. The default policy is DropNewest.

func WithChannelDepth added in v0.4.0

func WithChannelDepth(n int) SubscriberOption

WithChannelDepth sets the capacity of the subscriber's internal channel. A depth of 0 uses the implementation default (typically 64).

func WithFilter added in v0.3.0

func WithFilter(fn func(Sample) bool) SubscriberOption

WithFilter returns a SubscriberOption that applies fn as a content filter. Only samples for which fn returns true are delivered to the subscriber's channel; non-matching samples are discarded silently.
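
The option functions above follow Go's functional-options pattern: each option mutates a config struct, and the merged result is read by the implementation. The sketch below re-implements that pattern locally to show how the pieces fit; applyOpts and chanDepth are simplified stand-ins for ApplySubscriberOpts and ChanDepth, not the package's source.

```go
package main

import "fmt"

type Sample struct {
	Topic   string
	Payload []byte
}

type SubscriberConfig struct {
	Filter       func(Sample) bool
	ChannelDepth int
}

type SubscriberOption func(*SubscriberConfig)

func WithChannelDepth(n int) SubscriberOption {
	return func(c *SubscriberConfig) { c.ChannelDepth = n }
}

func WithFilter(fn func(Sample) bool) SubscriberOption {
	return func(c *SubscriberConfig) { c.Filter = fn }
}

// applyOpts merges options in order; later options override earlier ones.
func applyOpts(opts []SubscriberOption) SubscriberConfig {
	var cfg SubscriberConfig
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

// chanDepth resolves the depth, falling back to the supplied default.
func chanDepth(c SubscriberConfig, defaultDepth int) int {
	if c.ChannelDepth > 0 {
		return c.ChannelDepth
	}
	return defaultDepth
}

func main() {
	nonEmpty := func(s Sample) bool { return len(s.Payload) > 0 }
	cfg := applyOpts([]SubscriberOption{WithChannelDepth(256), WithFilter(nonEmpty)})

	fmt.Println(chanDepth(cfg, 64))                // 256
	fmt.Println(chanDepth(SubscriberConfig{}, 64)) // 64
	fmt.Println(cfg.Filter(Sample{}))              // false
}
```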

type TopicMetrics added in v0.6.0

type TopicMetrics struct {
	Topic          string
	WriteCount     uint64
	DeliverCount   uint64
	DropCount      uint64
	BytesWritten   uint64
	BytesDelivered uint64
}

TopicMetrics holds per-topic statistics for a single DDS topic.

type TopicMetricsProvider added in v0.6.0

type TopicMetricsProvider interface {
	TopicMetrics() []TopicMetrics
}

TopicMetricsProvider is implemented by participants that expose per-topic statistics. The returned slice contains one entry per observed topic.

type Tracer added in v0.4.0

type Tracer interface {
	Start(ctx context.Context, spanName string, attrs ...SpanAttribute) (context.Context, Span)
}

Tracer is satisfied by any OpenTelemetry-compatible tracer implementation. go-DDS does not import go.opentelemetry.io/otel; callers bridge their tracer to this interface using a thin adapter.

var NoopTracer Tracer = noopTracerImpl{}

NoopTracer is the zero-cost tracer used when no OTel backend is configured.

type TypedPublisher added in v0.4.0

type TypedPublisher[T any] struct {
	// contains filtered or unexported fields
}

TypedPublisher[T] wraps a Publisher to encode values with a Codec before writing.

func NewTypedPublisher added in v0.4.0

func NewTypedPublisher[T any](pub Publisher, codec Codec[T]) *TypedPublisher[T]

NewTypedPublisher wraps pub so that Write accepts T values, encoding them with codec before passing bytes to the underlying Publisher.

func (*TypedPublisher[T]) Close added in v0.4.0

func (tp *TypedPublisher[T]) Close() error

Close closes the underlying publisher.

func (*TypedPublisher[T]) Write added in v0.4.0

func (tp *TypedPublisher[T]) Write(v T) error

Write encodes v with the configured Codec and writes it to the underlying publisher.

type TypedSample added in v0.4.0

type TypedSample[T any] struct {
	Topic     string
	Value     T
	Timestamp time.Time
}

TypedSample[T] is a decoded sample delivered by TypedSubscriber[T].

type TypedSubscriber added in v0.4.0

type TypedSubscriber[T any] struct {
	// contains filtered or unexported fields
}

TypedSubscriber[T] wraps a Subscriber to decode samples with a Codec. Samples that fail to decode are silently dropped.

func NewTypedSubscriber added in v0.4.0

func NewTypedSubscriber[T any](sub Subscriber, codec Codec[T]) *TypedSubscriber[T]

NewTypedSubscriber wraps sub so that its channel delivers TypedSample[T] values decoded with codec. A background goroutine pumps samples from sub.C().

func (*TypedSubscriber[T]) C added in v0.4.0

func (ts *TypedSubscriber[T]) C() <-chan TypedSample[T]

C returns the typed sample channel.

func (*TypedSubscriber[T]) Close added in v0.4.0

func (ts *TypedSubscriber[T]) Close() error

Close stops the pump goroutine and closes the underlying subscriber.

type WaitSet

type WaitSet struct {
	// contains filtered or unexported fields
}

WaitSet multiplexes over a set of subscribers, blocking until any one of them delivers a sample.

func NewWaitSet

func NewWaitSet(subs ...Subscriber) *WaitSet

NewWaitSet creates a WaitSet that monitors the given subscribers.

Example

ExampleNewWaitSet demonstrates multiplexing over two subscribers without a polling loop. The WaitSet blocks until any attached subscriber delivers a sample, then returns it together with the subscriber it arrived on.

package main

import (
	"context"
	"fmt"
	"time"

	dds "github.com/SoundMatt/go-DDS"
	"github.com/SoundMatt/go-DDS/mock"
)

func main() {
	p, _ := mock.New(dds.Domain(0))
	defer p.Close()

	subTemp, _ := p.NewSubscriber("sensors/temp", dds.DefaultQoS)
	subSpeed, _ := p.NewSubscriber("vehicle/speed", dds.DefaultQoS)
	defer subTemp.Close()
	defer subSpeed.Close()

	pubTemp, _ := p.NewPublisher("sensors/temp", dds.DefaultQoS)
	defer pubTemp.Close()

	_ = pubTemp.Write([]byte("21.5"))

	ws := dds.NewWaitSet(subTemp, subSpeed)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	sample, sub, err := ws.Wait(ctx)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	switch sub {
	case subTemp:
		fmt.Println("temp:", string(sample.Payload))
	case subSpeed:
		fmt.Println("speed:", string(sample.Payload))
	}
}
Output:
temp: 21.5

func (*WaitSet) Wait

func (ws *WaitSet) Wait(ctx context.Context) (Sample, Subscriber, error)

Wait blocks until a sample is available on any attached subscriber, or until ctx is cancelled.

Directories

Path Synopsis
bridge/mqtt: Package mqtt provides a bidirectional bridge between a DDS participant and an MQTT broker.
Package config provides JSON-based configuration for DDS participants.
Package cyclone provides a CycloneDDS-backed implementation of the dds interfaces via CGo.
Package interop contains RTPS wire-compatibility tests that require a live CycloneDDS peer.
Package mock provides an in-process, CGo-free implementation of the dds interfaces.
Package monitor provides a real-time web dashboard for a DDS participant.
Package rtps provides a pure-Go RTPS/UDP implementation of the dds interfaces.
Package security provides pluggable transport-security for go-DDS.
Package shmem provides a cross-process shared-memory transport for same-host DDS communication.
Package tsn provides Time-Sensitive Networking stream descriptors and configuration utilities following the OMG DDS-TSN specification.
