rtps

package
v0.47.0
Published: Jun 17, 2026 License: MPL-2.0 Imports: 21 Imported by: 0

Documentation

Overview

Package rtps provides a pure-Go RTPS/UDP implementation of the dds interfaces. It requires no CGo and no installed system libraries; it speaks the OMG RTPS 2.3 wire protocol over UDP multicast/unicast so it can interoperate with any standards-compliant DDS implementation.

Create a participant with rtps.New. For reliable delivery, pass dds.ReliableQoS to NewPublisher/NewSubscriber. For payload-level security, pass rtps.WithSecurity to rtps.New:

p, err := rtps.New(dds.Domain(0), rtps.WithSecurity(myPlugin))

The package runs on any platform that supports UDP sockets.

Two participants in the same process, or on different hosts on the same LAN, can exchange samples via the standard RTPS wire protocol (OMG spec, version 2.3). Use the mock package for in-process testing; use this package when real network transport or interoperability with other DDS implementations is required.

Port Assignment

Ports follow the RTPS 2.3 §9.6.1 formula:

metaMulticast(domain) = 7400 + 250*domain
metaUnicast(domain,i) = 7400 + 250*domain + 10 + 2*i
dataUnicast(domain,i) = 7400 + 250*domain + 11 + 2*i

Discovery uses the multicast group 239.255.0.1.
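
As a worked example of the formula above, the following sketch computes the three ports for a given domain and participant index. The constant and function names are illustrative and are not identifiers exported by this package.

```go
package main

import "fmt"

// Constants taken from the formula above. The names are illustrative only.
const (
	portBase        = 7400 // base port
	domainGain      = 250  // ports reserved per domain
	participantGain = 2    // ports reserved per participant index
	metaUnicastOff  = 10   // offset of the discovery (meta) unicast port
	dataUnicastOff  = 11   // offset of the user-data unicast port
)

// metaMulticast returns the discovery multicast port for a domain.
func metaMulticast(domain int) int {
	return portBase + domainGain*domain
}

// metaUnicast returns the discovery unicast port for participant index i.
func metaUnicast(domain, i int) int {
	return portBase + domainGain*domain + metaUnicastOff + participantGain*i
}

// dataUnicast returns the user-data unicast port for participant index i.
func dataUnicast(domain, i int) int {
	return portBase + domainGain*domain + dataUnicastOff + participantGain*i
}

func main() {
	fmt.Println(metaMulticast(0))  // 7400
	fmt.Println(metaUnicast(0, 0)) // 7410
	fmt.Println(dataUnicast(0, 0)) // 7411
	fmt.Println(metaUnicast(1, 2)) // 7664
	fmt.Println(dataUnicast(1, 2)) // 7665
}
```

Each participant index consumes two consecutive unicast ports, so two participants on the same host and domain never collide as long as their indices differ.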

Notes

  • IPv6 transport is supported via WithIPv6 but has had limited interop testing.

Constants

const (
	EndpointSPDPAnnouncer    uint32 = 1 << 0
	EndpointSPDPDetector     uint32 = 1 << 1
	EndpointSEDPPubAnnouncer uint32 = 1 << 2
	EndpointSEDPPubDetector  uint32 = 1 << 3
	EndpointSEDPSubAnnouncer uint32 = 1 << 4
	EndpointSEDPSubDetector  uint32 = 1 << 5
)

Builtin endpoint availability bitmask (§8.5.4.3 / §9.6.2.2).

const (
	LocatorKindInvalid = -1
	LocatorKindUDPv4   = 1
	LocatorKindUDPv6   = 2
)

Variables

var (
	EntityIdParticipant        = EntityId{0x00, 0x00, 0x01, 0xC1}
	EntityIdSPDPWriter         = EntityId{0x00, 0x01, 0x00, 0xC2}
	EntityIdSPDPReader         = EntityId{0x00, 0x01, 0x00, 0xC7}
	EntityIdSEDPPubWriter      = EntityId{0x00, 0x00, 0x03, 0xC2}
	EntityIdSEDPPubReader      = EntityId{0x00, 0x00, 0x03, 0xC7}
	EntityIdSEDPSubWriter      = EntityId{0x00, 0x00, 0x04, 0xC2}
	EntityIdSEDPSubReader      = EntityId{0x00, 0x00, 0x04, 0xC7}
	EntityIdUnknown            = EntityId{0x00, 0x00, 0x00, 0x00}
	EntityIdBuiltinParticipant = EntityId{0x00, 0x00, 0x01, 0xC1}
)

Well-known entity IDs per RTPS 2.3 Table 9.1.

Functions

func New

func New(domain dds.Domain, opts ...Option) (dds.Participant, error)

New creates an RTPS participant joined to the given DDS domain. It binds UDP sockets, starts SPDP/SEDP, and returns a dds.Participant.

func NewLoaningPublisher added in v0.16.0

func NewLoaningPublisher(p dds.Participant, topic string, qos dds.QoS, bufSize int) (dds.LoaningPublisher, error)

NewLoaningPublisher creates a LoaningPublisher for topic using the given QoS. bufSize is the maximum sample size the pool will pre-allocate; pass 0 for the pool default (4096 bytes).

func TopicMatches added in v0.3.0

func TopicMatches(pattern, topic string) bool

TopicMatches reports whether pattern (which may contain MQTT-style + and # wildcards) matches the concrete topic name.

Rules:

  • '+' matches exactly one topic level (no slashes).
  • '#' at the end of a segment matches zero or more remaining levels.
  • Literal segments must match exactly (case-sensitive).
  • "foo/" and "foo" are distinct topics (two levels vs one level).
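
The rules above can be sketched as a level-by-level comparison. The function below is a hypothetical re-implementation written for illustration, not the package's own code. It assumes that '#' is only valid as the final level of the pattern, which the rules do not state explicitly.

```go
package main

import (
	"fmt"
	"strings"
)

// topicMatches is an illustrative sketch of the matching rules.
// Assumption: '#' is honoured only when it is the last pattern level.
func topicMatches(pattern, topic string) bool {
	p := strings.Split(pattern, "/")
	t := strings.Split(topic, "/")
	for i, seg := range p {
		if seg == "#" {
			// Matches zero or more remaining levels, but only at the end.
			return i == len(p)-1
		}
		if i >= len(t) {
			return false // the pattern has more levels than the topic
		}
		if seg != "+" && seg != t[i] {
			return false // literal levels must match exactly
		}
	}
	// Without a trailing '#', the level counts must be equal.
	return len(p) == len(t)
}

func main() {
	fmt.Println(topicMatches("sensors/+/temp", "sensors/a/temp")) // true
	fmt.Println(topicMatches("sensors/#", "sensors"))             // true
	fmt.Println(topicMatches("sensors/+", "sensors/a/b"))         // false
	fmt.Println(topicMatches("foo", "foo/"))                      // false
	fmt.Println(topicMatches("Foo", "foo"))                       // false
}
```

Splitting on "/" yields an empty trailing level for "foo/", which is why it counts as two levels and does not match the one-level pattern "foo".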

Types

type AccessController added in v0.46.0

type AccessController interface {
	CanRead(topic string) bool
	CanWrite(topic string) bool
}

AccessController authorises endpoint creation per topic. security.AccessPolicy satisfies it. When configured via WithAccessControl, NewPublisher is rejected for topics that fail CanWrite and NewSubscriber for topics that fail CanRead.

type AckNack

type AckNack struct {
	ReaderEntityId EntityId
	WriterEntityId EntityId
	Base           SequenceNumber // first missing sequence number
	Bitmap         uint32         // bit N set → Base+N is missing
	Count          int32
}

AckNack holds the parsed fields of an ACKNACK submessage. We use a fixed 32-bit bitmap (NumBits always 32, one bitmap word).
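
To illustrate how Base and Bitmap combine, the sketch below lists the sequence numbers an ACKNACK requests. The types are trimmed copies of the declarations on this page, and the Int64 and missing helpers are hypothetical. It also assumes that bit N is counted from the least-significant bit of Bitmap, which the field comment does not specify.

```go
package main

import "fmt"

// SequenceNumber mirrors the type documented on this page.
type SequenceNumber struct {
	High int32
	Low  uint32
}

// AckNack is a trimmed copy holding only the fields used here.
type AckNack struct {
	Base   SequenceNumber
	Bitmap uint32
}

// Int64 is a hypothetical helper that flattens the High/Low pair.
func (s SequenceNumber) Int64() int64 {
	return int64(s.High)<<32 | int64(s.Low)
}

// missing returns every sequence number flagged in the bitmap.
// Assumption: bit N is the Nth least-significant bit.
func missing(a AckNack) []int64 {
	var out []int64
	base := a.Base.Int64()
	for n := 0; n < 32; n++ {
		if a.Bitmap&(uint32(1)<<uint(n)) != 0 {
			out = append(out, base+int64(n))
		}
	}
	return out
}

func main() {
	a := AckNack{Base: SequenceNumber{High: 0, Low: 10}, Bitmap: 0b101}
	fmt.Println(missing(a)) // [10 12]
}
```

A bitmap of zero yields an empty result, which corresponds to a pure acknowledgement of everything below Base.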

type DataFrag added in v0.3.0

type DataFrag struct {
	WriterEntityId      EntityId
	ReaderEntityId      EntityId
	WriterSeqNum        SequenceNumber
	FragmentStartingNum uint32 // 1-based index of the first fragment in this submessage
	FragmentsInSubmsg   uint16 // number of fragments in this submessage
	FragmentSize        uint16 // size of each fragment in bytes (last may be smaller)
	DataSize            uint32 // total (unfragmented) data size in bytes
	Payload             []byte // raw bytes of the fragment(s)
}

DataFrag carries the fields of an RTPS DATA_FRAG submessage.
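
The relationship between DataSize, FragmentSize, and the 1-based FragmentStartingNum can be sketched with two small helpers. Both function names are hypothetical and are not part of this package.

```go
package main

import "fmt"

// totalFragments returns how many fragments a sample of dataSize bytes
// needs when each fragment carries fragSize bytes (the last may be smaller).
func totalFragments(dataSize uint32, fragSize uint16) uint32 {
	if fragSize == 0 {
		return 0 // avoid division by zero on malformed input
	}
	// Ceiling division, done in 64 bits so the addition cannot overflow.
	return uint32((uint64(dataSize) + uint64(fragSize) - 1) / uint64(fragSize))
}

// fragmentOffset returns the byte offset, within the reassembled sample,
// of the fragment with 1-based index startingNum.
func fragmentOffset(startingNum uint32, fragSize uint16) uint32 {
	if startingNum == 0 {
		return 0 // fragment numbers are 1-based; treat 0 as the first
	}
	return (startingNum - 1) * uint32(fragSize)
}

func main() {
	fmt.Println(totalFragments(3000, 1024)) // 3
	fmt.Println(fragmentOffset(3, 1024))    // 2048
}
```

In this example the third fragment starts at byte 2048 and carries the remaining 952 bytes of the 3000-byte sample.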

type DataSubmessage

type DataSubmessage struct {
	ReaderEntityId EntityId
	WriterEntityId EntityId
	SeqNum         SequenceNumber
	Payload        []byte // nil when Data flag not set
}

DataSubmessage holds the parsed fields of a DATA submessage.

type DiscoveryPlugin added in v0.9.1

type DiscoveryPlugin interface {
	// SignDiscovery returns an authentication tag for the given GUID prefix
	// (12 bytes). The tag is embedded in the SPDP announcement.
	SignDiscovery(guidPrefix []byte) []byte
	// VerifyDiscovery returns true when tag is a valid authentication tag for
	// guidPrefix. A nil or empty tag must return false.
	VerifyDiscovery(guidPrefix, tag []byte) bool
}

DiscoveryPlugin authenticates SPDP participant-discovery announcements. When configured via WithDiscoverySecurity, outbound announcements are tagged with a token produced by SignDiscovery, and inbound announcements whose token does not verify are silently discarded.

The built-in implementation is security.HMACDiscoveryPlugin.
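
A custom plugin only needs the two methods of the interface. The sketch below uses HMAC-SHA256 over the GUID prefix with a shared key. The hmacPlugin type is hypothetical and makes no claim about how security.HMACDiscoveryPlugin is implemented.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"fmt"
)

// hmacPlugin is an illustrative type that satisfies the DiscoveryPlugin
// method set shown above. It is not the built-in implementation.
type hmacPlugin struct {
	key []byte
}

// SignDiscovery returns HMAC-SHA256(key, guidPrefix).
func (p hmacPlugin) SignDiscovery(guidPrefix []byte) []byte {
	m := hmac.New(sha256.New, p.key)
	m.Write(guidPrefix)
	return m.Sum(nil)
}

// VerifyDiscovery rejects nil or empty tags, then compares in constant time.
func (p hmacPlugin) VerifyDiscovery(guidPrefix, tag []byte) bool {
	if len(tag) == 0 {
		return false
	}
	return hmac.Equal(tag, p.SignDiscovery(guidPrefix))
}

func main() {
	prefix := make([]byte, 12) // a 12-byte GUID prefix (all zeros here)
	a := hmacPlugin{key: []byte("shared-secret")}
	b := hmacPlugin{key: []byte("other-secret")}

	tag := a.SignDiscovery(prefix)
	fmt.Println(a.VerifyDiscovery(prefix, tag)) // true
	fmt.Println(a.VerifyDiscovery(prefix, nil)) // false
	fmt.Println(b.VerifyDiscovery(prefix, tag)) // false
}
```

Using hmac.Equal rather than bytes.Equal avoids leaking tag contents through comparison timing.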

type EndpointPlugin added in v0.10.0

type EndpointPlugin interface {
	// SignEndpoint returns a tag for the endpoint identified by guidPrefix and
	// topic. The tag is embedded in the SEDP announcement.
	SignEndpoint(guidPrefix []byte, topic string) []byte
	// VerifyEndpoint returns true when tag is a valid authentication tag for
	// the given guidPrefix and topic. A nil or empty tag must return false.
	VerifyEndpoint(guidPrefix []byte, topic string, tag []byte) bool
}

EndpointPlugin optionally extends a DiscoveryPlugin to authenticate SEDP endpoint announcements. When the configured DiscoveryPlugin also implements this interface, the participant signs outbound endpoint announcements and rejects inbound announcements whose tag does not verify. The built-in implementation is security.HMACDiscoveryPlugin.

type EntityId

type EntityId [4]byte

EntityId identifies a specific endpoint within a participant (§9.3.1).

func (EntityId) String

func (e EntityId) String() string

type GUID

type GUID struct {
	Prefix GuidPrefix
	Entity EntityId
}

GUID globally identifies a DDS entity: participant + endpoint.

func (GUID) String

func (g GUID) String() string

type Gap added in v0.2.1

type Gap struct {
	ReaderEntityId EntityId
	WriterEntityId EntityId
	GapStart       SequenceNumber // first irrelevant SN (inclusive)
	GapEnd         SequenceNumber // last irrelevant SN (inclusive)
}

Gap indicates a contiguous range of sequence numbers that are permanently unavailable from this writer (evicted from history). Receiving a GAP tells the reader to advance its expected-SN pointer past the covered range.
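
The pointer advance described above can be sketched as follows. The advance function is hypothetical, and sequence numbers are flattened to int64 for brevity rather than using the SequenceNumber struct.

```go
package main

import "fmt"

// advance returns the reader's next expected sequence number after a GAP
// covering [gapStart, gapEnd], both inclusive. The pointer moves only when
// it currently sits inside the gap; otherwise it is left unchanged.
func advance(expected, gapStart, gapEnd int64) int64 {
	if gapStart <= expected && expected <= gapEnd {
		return gapEnd + 1
	}
	return expected
}

func main() {
	fmt.Println(advance(5, 5, 9))  // 10: the reader was waiting on an evicted sample
	fmt.Println(advance(3, 5, 9))  // 3: earlier samples are still outstanding
	fmt.Println(advance(12, 5, 9)) // 12: the reader is already past the gap
}
```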

type GuidPrefix

type GuidPrefix [12]byte

GuidPrefix is the 12-byte participant identifier (RTPS 2.3 §9.3.1).

func (GuidPrefix) String

func (p GuidPrefix) String() string

type Header

type Header struct {
	ProtocolVersion [2]byte // {major, minor}
	VendorId        [2]byte
	GuidPrefix      GuidPrefix
}

Header is the fixed 20-byte RTPS message header (§9.4.1).
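
The struct fields account for 16 of the 20 bytes; on the wire they are preceded by the 4-byte protocol identifier "RTPS". The sketch below serialises a header under that layout. The marshal method is hypothetical, and the vendor ID used in main is a placeholder rather than a registered value.

```go
package main

import "fmt"

// GuidPrefix and Header mirror the declarations on this page.
type GuidPrefix [12]byte

type Header struct {
	ProtocolVersion [2]byte // {major, minor}
	VendorId        [2]byte
	GuidPrefix      GuidPrefix
}

// marshal is an illustrative encoder: "RTPS" magic, version, vendor, prefix.
func (h Header) marshal() []byte {
	buf := make([]byte, 0, 20)
	buf = append(buf, 'R', 'T', 'P', 'S')
	buf = append(buf, h.ProtocolVersion[:]...)
	buf = append(buf, h.VendorId[:]...)
	buf = append(buf, h.GuidPrefix[:]...)
	return buf
}

func main() {
	h := Header{
		ProtocolVersion: [2]byte{2, 3},
		VendorId:        [2]byte{0x00, 0x00}, // placeholder vendor ID
	}
	b := h.marshal()
	fmt.Printf("%d %q\n", len(b), b[:4]) // 20 "RTPS"
}
```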

type Heartbeat

type Heartbeat struct {
	ReaderEntityId EntityId
	WriterEntityId EntityId
	FirstSN        SequenceNumber // lowest SN still in the writer's history
	LastSN         SequenceNumber // highest SN sent so far
	Count          int32          // monotonically increasing per writer
}

Heartbeat holds the parsed fields of a HEARTBEAT submessage.

type Locator

type Locator struct {
	Kind    int32
	Port    uint32
	Address [16]byte
}

Locator is the 24-byte transport endpoint address, Locator_t in RTPS 2.3 §9.3.2.
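
For a UDPv4 locator, RTPS places the IPv4 address in the last four bytes of the 16-byte Address field and zeroes the first twelve. The sketch below builds such a locator from a dotted-quad string. The udpv4Locator helper is hypothetical and is not exported by this package.

```go
package main

import (
	"fmt"
	"net"
)

// LocatorKindUDPv4 and Locator mirror the declarations on this page.
const LocatorKindUDPv4 = 1

type Locator struct {
	Kind    int32
	Port    uint32
	Address [16]byte
}

// udpv4Locator is an illustrative constructor for UDPv4 locators.
func udpv4Locator(addr string, port uint32) (Locator, error) {
	ip := net.ParseIP(addr).To4()
	if ip == nil {
		return Locator{}, fmt.Errorf("not an IPv4 address: %q", addr)
	}
	l := Locator{Kind: LocatorKindUDPv4, Port: port}
	copy(l.Address[12:], ip) // bytes 0..11 stay zero
	return l, nil
}

func main() {
	l, err := udpv4Locator("239.255.0.1", 7400)
	fmt.Println(l.Address[12:], err) // [239 255 0 1] <nil>
}
```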

type Option

type Option func(*participant)

Option configures a Participant at creation time.

func WithAccessControl added in v0.46.0

func WithAccessControl(ac AccessController) Option

WithAccessControl enforces a topic ACL on this participant. Enforcement is opt-in: with no controller configured, all topics are permitted. It composes with WithSecurity (encryption) and WithAntiReplay.

func WithAntiReplay added in v0.46.0

func WithAntiReplay(rc ReplayChecker) Option

WithAntiReplay enables anti-replay protection on inbound samples. Enforcement is opt-in: with no checker configured, no samples are dropped. It composes with WithSecurity (encryption) and WithAccessControl.

func WithConfig added in v0.6.0

func WithConfig(cfg *config.ParticipantConfig) Option

WithConfig applies all fields from cfg to the participant. It is equivalent to calling the corresponding WithXxx options individually and is intended for use with JSON configuration files loaded via config.LoadConfig.

func WithContext added in v0.10.0

func WithContext(ctx context.Context) Option

WithContext returns an Option that closes the participant when ctx is done. This is the idiomatic Go shutdown pattern: pass a context with a cancel function or deadline to tie the participant's lifetime to an outer scope.

func WithDeadlineCallback added in v0.3.0

func WithDeadlineCallback(fn func(topic string)) Option

WithDeadlineCallback sets a function that is called when a publisher has not written for longer than its QoS.Deadline period.

func WithDiscoverySecurity added in v0.9.1

func WithDiscoverySecurity(plugin DiscoveryPlugin) Option

WithDiscoverySecurity returns an Option that applies plugin to SPDP discovery announcements. Outbound announcements are signed; inbound announcements with missing or invalid tokens are rejected.

func WithHeartbeatPeriod added in v0.6.0

func WithHeartbeatPeriod(d time.Duration) Option

WithHeartbeatPeriod sets the period of the periodic HEARTBEAT ticker used by reliable writers. The default is 200 ms. Use shorter values for low-latency reliable delivery; use longer values to reduce control traffic.

func WithIPv6 added in v0.2.0

func WithIPv6() Option

WithIPv6 enables the IPv6 multicast transport. When set, the participant binds an additional pair of IPv6 UDP sockets and joins the RTPS IPv6 discovery group (FF03::1). IPv4 sockets are still created so the participant is reachable from both IPv4 and IPv6 peers.

func WithLivelinessCallback added in v0.4.0

func WithLivelinessCallback(fn func(dds.GUID, dds.LivelinessEvent)) Option

WithLivelinessCallback registers fn to be called when a remote participant is discovered (LivelinessGained) or loses its lease (LivelinessLost). The GUID passed is the 16-byte participant GUID (prefix + built-in entity 0x000001c1).

func WithLogger added in v0.4.0

func WithLogger(l *slog.Logger) Option

WithLogger sets the structured logger used by this participant. Passing nil (the default) disables all log output with zero overhead.

func WithNoMulticast added in v0.3.0

func WithNoMulticast() Option

WithNoMulticast disables SPDP multicast discovery. The participant will not join or send to 239.255.0.1; use WithPeerLocators to supply peers manually.

func WithPeerLocators added in v0.3.0

func WithPeerLocators(addrs ...string) Option

WithPeerLocators adds static peer unicast addresses for unicast-only discovery. Each address must be a host:port string parseable by net.ResolveUDPAddr.

func WithPersistentHistory added in v0.3.0

func WithPersistentHistory(dir string) Option

WithPersistentHistory configures TransientLocal durability to be backed by files in dir. On participant start the last sample for each topic is loaded from disk; on every Write the new last sample is flushed to disk. The file name for topic T is <dir>/topic-<sanitised(T)>.bin. Each file holds a 4-byte length prefix followed by the raw sample bytes.

The option is a no-op when dir is "". Failures during load are silently ignored (missing files are normal on first run); failures during flush are also silently ignored so that a write to a read-only directory does not block the caller.
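
The on-disk record described above (4-byte length prefix, then the raw sample) can be sketched as an encode/decode pair. The function names are hypothetical, and little-endian byte order is an assumption: the documentation does not state the endianness of the prefix.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// encodeSample builds a record: 4-byte length prefix followed by the sample.
// Assumption: the prefix is little-endian.
func encodeSample(sample []byte) []byte {
	buf := make([]byte, 4+len(sample))
	binary.LittleEndian.PutUint32(buf, uint32(len(sample)))
	copy(buf[4:], sample)
	return buf
}

// decodeSample validates the prefix and returns the sample bytes.
func decodeSample(file []byte) ([]byte, error) {
	if len(file) < 4 {
		return nil, errors.New("record shorter than length prefix")
	}
	n := binary.LittleEndian.Uint32(file)
	if uint64(n) > uint64(len(file)-4) {
		return nil, errors.New("record truncated")
	}
	return file[4 : 4+n], nil
}

func main() {
	rec := encodeSample([]byte("hello"))
	got, err := decodeSample(rec)
	fmt.Println(len(rec), string(got), err) // 9 hello <nil>
}
```

Checking the prefix against the actual file length lets a loader skip a partially written file instead of reading past the end of the buffer.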

func WithSPDPInterval added in v0.5.0

func WithSPDPInterval(d time.Duration) Option

WithSPDPInterval sets the SPDP participant announcement interval. The default is 2 seconds. TSN networks with bounded-latency requirements may prefer longer intervals (e.g. 10 s) to reduce discovery overhead.

func WithSPDPJitter added in v0.5.0

func WithSPDPJitter(d time.Duration) Option

WithSPDPJitter adds a random delay of up to d before each SPDP announcement. This prevents synchronised floods when many participants start simultaneously on a TSN segment. A typical value is 500 ms.

func WithSecurity

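func WithSecurity(plugin SecurityPlugin) Option

WithSecurity returns an Option that applies plugin to every payload transmitted and received by this participant. All peers that communicate with this participant must use the same plugin and key material.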

func WithStaticPeers added in v0.5.0

func WithStaticPeers(addrs ...string) Option

WithStaticPeers adds static peer unicast addresses for unicast-only discovery on TSN networks where SPDP multicast is undesirable. Equivalent to WithPeerLocators; provided for TSN configuration clarity.

func WithTSNConfig added in v0.5.0

func WithTSNConfig(cfg *tsn.StreamConfig) Option

WithTSNConfig registers a TSN stream configuration with the participant. When a publisher is created for a topic in the config, the participant allocates a dedicated socket for that traffic class, marks it with SO_PRIORITY / IP_TOS, and (on Linux) enables SO_TXTIME if TxOffsetUS > 0.

func WithTracer added in v0.4.0

func WithTracer(t dds.Tracer) Option

WithTracer wires an OpenTelemetry-compatible Tracer into the participant. Pass dds.NoopTracer (the default) to disable tracing with zero cost.

type ReplayChecker added in v0.46.0

type ReplayChecker interface {
	Check(seq uint64, ts time.Time) error
}

ReplayChecker rejects replayed samples. security.ReplayGuard satisfies it. When configured via WithAntiReplay, every inbound DATA sample is checked and dropped (not delivered) if Check reports it as a replay.
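
A minimal checker can be sketched as follows. The replayGuard type is hypothetical and deliberately simple: it rejects samples older than a maximum age and any sequence number not strictly greater than the highest accepted so far. security.ReplayGuard may use a different policy, for example a sliding window that tolerates reordering.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// replayGuard is an illustrative type with the ReplayChecker method set.
type replayGuard struct {
	mu      sync.Mutex
	seen    bool          // whether any sample has been accepted yet
	highest uint64        // highest accepted sequence number
	maxAge  time.Duration // samples older than this are rejected
}

// Check returns an error for stale or already-seen sequence numbers.
func (g *replayGuard) Check(seq uint64, ts time.Time) error {
	g.mu.Lock()
	defer g.mu.Unlock()
	if time.Since(ts) > g.maxAge {
		return errors.New("sample timestamp too old")
	}
	if g.seen && seq <= g.highest {
		return errors.New("sequence number already seen")
	}
	g.seen, g.highest = true, seq
	return nil
}

// runReplayDemo reports which of four samples the guard accepts.
func runReplayDemo() []bool {
	g := &replayGuard{maxAge: time.Minute}
	now := time.Now()
	return []bool{
		g.Check(1, now) == nil,                 // first sample
		g.Check(2, now) == nil,                 // next in sequence
		g.Check(2, now) == nil,                 // replayed sequence number
		g.Check(3, now.Add(-time.Hour)) == nil, // stale timestamp
	}
}

func main() {
	fmt.Println(runReplayDemo()) // [true true false false]
}
```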

type SecurityPlugin

type SecurityPlugin interface {
	Seal(plaintext []byte) ([]byte, error)
	Open(ciphertext []byte) ([]byte, error)
}

SecurityPlugin is satisfied by any type that can seal (encrypt / sign) and open (decrypt / verify) a DDS payload byte slice. Built-in implementations live in the security sub-package; NullPlugin (pass-through) is also available there for development and testing.
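
As an illustration of the Seal/Open contract, the sketch below wraps AES-GCM from the standard library. The gcmPlugin type and its constructor are hypothetical and are not one of the built-in implementations. Each sealed payload is laid out as a random nonce followed by the ciphertext and authentication tag.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"errors"
	"fmt"
)

// gcmPlugin is an illustrative type with the SecurityPlugin method set.
type gcmPlugin struct {
	aead cipher.AEAD
}

// newGCMPlugin builds the plugin from a 16-, 24-, or 32-byte AES key.
func newGCMPlugin(key []byte) (*gcmPlugin, error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	aead, err := cipher.NewGCM(block)
	if err != nil {
		return nil, err
	}
	return &gcmPlugin{aead: aead}, nil
}

// Seal encrypts plaintext and prepends a fresh random nonce.
func (p *gcmPlugin) Seal(plaintext []byte) ([]byte, error) {
	nonce := make([]byte, p.aead.NonceSize())
	if _, err := rand.Read(nonce); err != nil {
		return nil, err
	}
	return p.aead.Seal(nonce, nonce, plaintext, nil), nil
}

// Open splits off the nonce, then decrypts and authenticates the rest.
func (p *gcmPlugin) Open(ciphertext []byte) ([]byte, error) {
	n := p.aead.NonceSize()
	if len(ciphertext) < n {
		return nil, errors.New("ciphertext shorter than nonce")
	}
	return p.aead.Open(nil, ciphertext[:n], ciphertext[n:], nil)
}

func main() {
	key := make([]byte, 32) // all-zero demo key; never use in production
	p, err := newGCMPlugin(key)
	if err != nil {
		panic(err)
	}
	sealed, _ := p.Seal([]byte("sample"))
	opened, err := p.Open(sealed)
	fmt.Println(string(opened), err) // sample <nil>
}
```

Because the key must match on every peer, a participant configured with a different key fails in Open rather than delivering corrupted samples.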

type SequenceNumber

type SequenceNumber struct {
	High int32
	Low  uint32
}

SequenceNumber is the 8-byte writer sequence number (§9.3.2).
