Documentation ¶
Overview ¶
Package rtps provides a pure-Go RTPS/UDP implementation of the dds interfaces. It requires no CGo and no installed system libraries; it speaks the OMG RTPS 2.3 wire protocol over UDP multicast/unicast so it can interoperate with any standards-compliant DDS implementation.
Create a participant with rtps.New. For reliable delivery, pass dds.ReliableQoS to NewPublisher/NewSubscriber. For payload-level security, pass rtps.WithSecurity to rtps.New:
p, err := rtps.New(dds.Domain(0), rtps.WithSecurity(myPlugin))
The package runs on any platform that supports UDP sockets.
Two participants in the same process, or on different hosts on the same LAN, can exchange samples via the standard RTPS wire protocol (OMG spec, version 2.3). Use the mock package for in-process testing; use this package when real network transport or interoperability with other DDS implementations is required.
Port Assignment ¶
Ports follow the RTPS 2.3 §9.6.1 formula:
metaMulticast(domain)  = 7400 + 250*domain
metaUnicast(domain, i) = 7400 + 250*domain + 10 + 2*i
dataUnicast(domain, i) = 7400 + 250*domain + 11 + 2*i
Discovery uses the multicast group 239.255.0.1.
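The port formula above can be sketched as plain Go functions. This is a standalone illustration; the function and constant names are chosen here for readability and are not exported by the package.

```go
package main

import "fmt"

// Constants taken from the formula above. The names are illustrative only.
const (
	portBase        = 7400 // base port
	domainGain      = 250  // ports reserved per domain
	participantGain = 2    // ports reserved per participant index
)

// metaMulticast returns the discovery multicast port for a domain.
func metaMulticast(domain int) int {
	return portBase + domainGain*domain
}

// metaUnicast returns the discovery unicast port for participant index i.
func metaUnicast(domain, i int) int {
	return portBase + domainGain*domain + 10 + participantGain*i
}

// dataUnicast returns the user-data unicast port for participant index i.
func dataUnicast(domain, i int) int {
	return portBase + domainGain*domain + 11 + participantGain*i
}

func main() {
	fmt.Println(metaMulticast(0), metaUnicast(0, 0), dataUnicast(0, 0)) // 7400 7410 7411
	fmt.Println(metaMulticast(1), metaUnicast(1, 2), dataUnicast(1, 2)) // 7650 7664 7665
}
```

Because each participant index consumes two consecutive unicast ports, two participants on the same host and domain never collide as long as their indices differ.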
Notes ¶
- IPv6 transport is supported via WithIPv6 but has had limited interop testing.
Index ¶
- Constants
- Variables
- func New(domain dds.Domain, opts ...Option) (dds.Participant, error)
- func NewLoaningPublisher(p dds.Participant, topic string, qos dds.QoS, bufSize int) (dds.LoaningPublisher, error)
- func TopicMatches(pattern, topic string) bool
- type AccessController
- type AckNack
- type DataFrag
- type DataSubmessage
- type DiscoveryPlugin
- type EndpointPlugin
- type EntityId
- type GUID
- type Gap
- type GuidPrefix
- type Header
- type Heartbeat
- type Locator
- type Option
- func WithAccessControl(ac AccessController) Option
- func WithAntiReplay(rc ReplayChecker) Option
- func WithConfig(cfg *config.ParticipantConfig) Option
- func WithContext(ctx context.Context) Option
- func WithDeadlineCallback(fn func(topic string)) Option
- func WithDiscoverySecurity(plugin DiscoveryPlugin) Option
- func WithHeartbeatPeriod(d time.Duration) Option
- func WithIPv6() Option
- func WithLivelinessCallback(fn func(dds.GUID, dds.LivelinessEvent)) Option
- func WithLogger(l *slog.Logger) Option
- func WithNoMulticast() Option
- func WithPeerLocators(addrs ...string) Option
- func WithPersistentHistory(dir string) Option
- func WithSPDPInterval(d time.Duration) Option
- func WithSPDPJitter(d time.Duration) Option
- func WithSecurity(plugin SecurityPlugin) Option
- func WithStaticPeers(addrs ...string) Option
- func WithTSNConfig(cfg *tsn.StreamConfig) Option
- func WithTracer(t dds.Tracer) Option
- type ReplayChecker
- type SecurityPlugin
- type SequenceNumber
Constants ¶
const (
	EndpointSPDPAnnouncer    uint32 = 1 << 0
	EndpointSPDPDetector     uint32 = 1 << 1
	EndpointSEDPPubAnnouncer uint32 = 1 << 2
	EndpointSEDPPubDetector  uint32 = 1 << 3
	EndpointSEDPSubAnnouncer uint32 = 1 << 4
	EndpointSEDPSubDetector  uint32 = 1 << 5
)
Builtin endpoint availability bitmask (§8.5.4.3 / §9.6.2.2).
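As a rough illustration of how such a bitmask is typically built and queried, the sketch below redeclares the constants locally so it compiles on its own. The helper hasEndpoint is hypothetical and not part of the package.

```go
package main

import "fmt"

// Local copies of the constants documented above, so the sketch is standalone.
const (
	EndpointSPDPAnnouncer    uint32 = 1 << 0
	EndpointSPDPDetector     uint32 = 1 << 1
	EndpointSEDPPubAnnouncer uint32 = 1 << 2
	EndpointSEDPPubDetector  uint32 = 1 << 3
	EndpointSEDPSubAnnouncer uint32 = 1 << 4
	EndpointSEDPSubDetector  uint32 = 1 << 5
)

// hasEndpoint reports whether every bit of flag is set in mask.
// Hypothetical helper, shown only to illustrate the bit test.
func hasEndpoint(mask, flag uint32) bool {
	return mask&flag == flag
}

func main() {
	// A participant that announces itself, detects others, and announces publications.
	mask := EndpointSPDPAnnouncer | EndpointSPDPDetector | EndpointSEDPPubAnnouncer
	fmt.Printf("%#x\n", mask)                               // 0x7
	fmt.Println(hasEndpoint(mask, EndpointSEDPPubAnnouncer)) // true
	fmt.Println(hasEndpoint(mask, EndpointSEDPSubDetector))  // false
}
```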
const (
	LocatorKindInvalid = -1
	LocatorKindUDPv4   = 1
	LocatorKindUDPv6   = 2
)
Variables ¶
var (
	EntityIdParticipant        = EntityId{0x00, 0x00, 0x01, 0xC1}
	EntityIdSPDPWriter         = EntityId{0x00, 0x01, 0x00, 0xC2}
	EntityIdSPDPReader         = EntityId{0x00, 0x01, 0x00, 0xC7}
	EntityIdSEDPPubWriter      = EntityId{0x00, 0x00, 0x03, 0xC2}
	EntityIdSEDPPubReader      = EntityId{0x00, 0x00, 0x03, 0xC7}
	EntityIdSEDPSubWriter      = EntityId{0x00, 0x00, 0x04, 0xC2}
	EntityIdSEDPSubReader      = EntityId{0x00, 0x00, 0x04, 0xC7}
	EntityIdUnknown            = EntityId{0x00, 0x00, 0x00, 0x00}
	EntityIdBuiltinParticipant = EntityId{0x00, 0x00, 0x01, 0xC1}
)
Well-known entity IDs per RTPS 2.3 Table 9.1.
Functions ¶
func New ¶
func New(domain dds.Domain, opts ...Option) (dds.Participant, error)
New creates an RTPS participant joined to the given DDS domain. It binds UDP sockets, starts SPDP/SEDP discovery, and returns a dds.Participant.
func NewLoaningPublisher ¶ added in v0.16.0
func NewLoaningPublisher(p dds.Participant, topic string, qos dds.QoS, bufSize int) (dds.LoaningPublisher, error)
NewLoaningPublisher creates a LoaningPublisher for topic using the given QoS. bufSize is the maximum sample size the pool will pre-allocate; pass 0 for the pool default (4096 bytes).
func TopicMatches ¶ added in v0.3.0
func TopicMatches(pattern, topic string) bool
TopicMatches reports whether pattern (which may contain MQTT-style + and # wildcards) matches the concrete topic name.
Rules:
- '+' matches exactly one topic level (no slashes).
- '#' at the end of a segment matches zero or more remaining levels.
- Literal segments must match exactly (case-sensitive).
- "foo/" and "foo" are distinct topics (two levels vs one level).
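The rules above can be approximated with a small level-by-level matcher. This is a sketch of the described behaviour, not the package's implementation; it assumes '#' is only meaningful as the final pattern segment and that '+' also matches an empty level, neither of which the text above spells out.

```go
package main

import (
	"fmt"
	"strings"
)

// topicMatchesSketch is an illustrative matcher for the rules listed above.
// It is not the package's TopicMatches.
func topicMatchesSketch(pattern, topic string) bool {
	p := strings.Split(pattern, "/")
	t := strings.Split(topic, "/")
	for i, seg := range p {
		if seg == "#" {
			// Assumption: '#' must be the last pattern segment; it then
			// matches zero or more remaining topic levels.
			return i == len(p)-1
		}
		if i >= len(t) {
			return false // pattern has more levels than the topic
		}
		if seg != "+" && seg != t[i] {
			return false // literal segments compare case-sensitively
		}
	}
	// Without '#', the level counts must agree, so "foo" != "foo/".
	return len(p) == len(t)
}

func main() {
	fmt.Println(topicMatchesSketch("sensors/+/temp", "sensors/a/temp")) // true
	fmt.Println(topicMatchesSketch("sensors/#", "sensors/a/temp"))      // true
	fmt.Println(topicMatchesSketch("foo", "foo/"))                      // false
}
```

Splitting on "/" makes the "foo/" versus "foo" distinction fall out naturally: the former splits into two levels (the second one empty), the latter into one.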
Types ¶
type AccessController ¶ added in v0.46.0
AccessController authorises endpoint creation per topic. security.AccessPolicy satisfies it. When configured via WithAccessControl, NewPublisher is rejected for topics that fail CanWrite and NewSubscriber for topics that fail CanRead.
type AckNack ¶
type AckNack struct {
ReaderEntityId EntityId
WriterEntityId EntityId
Base SequenceNumber // first missing sequence number
Bitmap uint32 // bit N set → Base+N is missing
Count int32
}
AckNack holds the parsed fields of an ACKNACK submessage. We use a fixed 32-bit bitmap (NumBits always 32, one bitmap word).
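To illustrate how Base and Bitmap combine, the following sketch expands the pair into the list of missing sequence numbers. It assumes SequenceNumber can be represented as an int64 and that "bit N" counts from the least-significant bit; both are assumptions made for this example, and the helper name is hypothetical.

```go
package main

import "fmt"

// missingSeqNums expands an ACKNACK (base, bitmap) pair into the sequence
// numbers reported missing. Assumption: bit N is counted from the
// least-significant bit of the 32-bit word.
func missingSeqNums(base int64, bitmap uint32) []int64 {
	var out []int64
	for n := 0; n < 32; n++ {
		if bitmap&(uint32(1)<<uint(n)) != 0 {
			out = append(out, base+int64(n))
		}
	}
	return out
}

func main() {
	// 0x0B sets bits 0, 1 and 3.
	fmt.Println(missingSeqNums(10, 0x0B)) // [10 11 13]
}
```

A fixed single-word bitmap means one ACKNACK can report gaps only within a window of 32 sequence numbers starting at Base.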
type DataFrag ¶ added in v0.3.0
type DataFrag struct {
WriterEntityId EntityId
ReaderEntityId EntityId
WriterSeqNum SequenceNumber
FragmentStartingNum uint32 // 1-based index of the first fragment in this submessage
FragmentsInSubmsg uint16 // number of fragments in this submessage
FragmentSize uint16 // size of each fragment in bytes (last may be smaller)
DataSize uint32 // total (unfragmented) data size in bytes
Payload []byte // raw bytes of the fragment(s)
}
DataFrag carries the fields of an RTPS DATA_FRAG submessage.
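The relationship between DataSize, FragmentSize, and the 1-based fragment numbers can be sketched as follows. The helper names are hypothetical and exist only for this illustration.

```go
package main

import "fmt"

// totalFragments returns how many fragments a sample of dataSize bytes
// needs when each fragment carries fragmentSize bytes (the last may be smaller).
func totalFragments(dataSize uint32, fragmentSize uint16) uint32 {
	if fragmentSize == 0 {
		return 0
	}
	fs := uint64(fragmentSize)
	return uint32((uint64(dataSize) + fs - 1) / fs) // ceiling division
}

// fragmentOffset returns the byte offset, within the reassembled sample,
// of the fragment with the given 1-based number.
func fragmentOffset(num uint32, fragmentSize uint16) uint64 {
	if num == 0 {
		return 0 // fragment numbers start at 1; treat 0 defensively
	}
	return uint64(num-1) * uint64(fragmentSize)
}

func main() {
	fmt.Println(totalFragments(2500, 1024)) // 3
	fmt.Println(fragmentOffset(3, 1024))    // 2048
}
```

In this example the third fragment starts at offset 2048 and carries the remaining 452 bytes of the 2500-byte sample.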
type DataSubmessage ¶
type DataSubmessage struct {
ReaderEntityId EntityId
WriterEntityId EntityId
SeqNum SequenceNumber
Payload []byte // nil when Data flag not set
}
DataSubmessage holds the parsed fields of a DATA submessage.
type DiscoveryPlugin ¶ added in v0.9.1
type DiscoveryPlugin interface {
// SignDiscovery returns an authentication tag for the given GUID prefix
// (12 bytes). The tag is embedded in the SPDP announcement.
SignDiscovery(guidPrefix []byte) []byte
// VerifyDiscovery returns true when tag is a valid authentication tag for
// guidPrefix. A nil or empty tag must return false.
VerifyDiscovery(guidPrefix, tag []byte) bool
}
DiscoveryPlugin authenticates SPDP participant-discovery announcements. When configured via WithDiscoverySecurity, outbound announcements are tagged with a token produced by SignDiscovery, and inbound announcements whose token does not verify are silently discarded.
The built-in implementation is security.HMACDiscoveryPlugin.
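For readers writing their own plugin, a minimal HMAC-SHA256 implementation of the interface might look like the sketch below. It redeclares the interface locally so it compiles standalone, and it is not the source of security.HMACDiscoveryPlugin; the type name hmacPlugin and the choice of SHA-256 are assumptions.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"fmt"
)

// DiscoveryPlugin mirrors the interface documented above.
type DiscoveryPlugin interface {
	SignDiscovery(guidPrefix []byte) []byte
	VerifyDiscovery(guidPrefix, tag []byte) bool
}

// hmacPlugin is a hypothetical plugin keyed with a shared secret.
type hmacPlugin struct{ key []byte }

// SignDiscovery returns HMAC-SHA256(key, guidPrefix).
func (h hmacPlugin) SignDiscovery(guidPrefix []byte) []byte {
	mac := hmac.New(sha256.New, h.key)
	mac.Write(guidPrefix)
	return mac.Sum(nil)
}

// VerifyDiscovery rejects nil or empty tags, then compares in constant time.
func (h hmacPlugin) VerifyDiscovery(guidPrefix, tag []byte) bool {
	if len(tag) == 0 {
		return false
	}
	return hmac.Equal(tag, h.SignDiscovery(guidPrefix))
}

// Compile-time check that hmacPlugin satisfies the interface.
var _ DiscoveryPlugin = hmacPlugin{}

func main() {
	p := hmacPlugin{key: []byte("shared-secret")}
	prefix := make([]byte, 12) // a 12-byte GUID prefix
	tag := p.SignDiscovery(prefix)
	fmt.Println(p.VerifyDiscovery(prefix, tag)) // true
	fmt.Println(p.VerifyDiscovery(prefix, nil)) // false
}
```

Using hmac.Equal rather than bytes.Equal avoids leaking tag contents through comparison timing.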
type EndpointPlugin ¶ added in v0.10.0
type EndpointPlugin interface {
// SignEndpoint returns a tag for the endpoint identified by guidPrefix and
// topic. The tag is embedded in the SEDP announcement.
SignEndpoint(guidPrefix []byte, topic string) []byte
// VerifyEndpoint returns true when tag is a valid authentication tag for
// the given guidPrefix and topic. A nil or empty tag must return false.
VerifyEndpoint(guidPrefix []byte, topic string, tag []byte) bool
}
EndpointPlugin optionally extends a DiscoveryPlugin to authenticate SEDP endpoint announcements. When the configured plugin also implements this interface, the participant signs outbound endpoint announcements and rejects inbound announcements whose tag does not verify. The built-in implementation is security.HMACDiscoveryPlugin.
type EntityId ¶
type EntityId [4]byte
EntityId identifies a specific endpoint within a participant (§9.3.1).
type GUID ¶
type GUID struct {
Prefix GuidPrefix
Entity EntityId
}
GUID globally identifies a DDS entity: participant + endpoint.
type Gap ¶ added in v0.2.1
type Gap struct {
ReaderEntityId EntityId
WriterEntityId EntityId
GapStart SequenceNumber // first irrelevant SN (inclusive)
GapEnd SequenceNumber // last irrelevant SN (inclusive)
}
Gap indicates a contiguous range of sequence numbers that are permanently unavailable from this writer (evicted from history). Receiving a GAP tells the reader to advance its expected-SN pointer past the covered range.
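The reader-side effect of a GAP can be sketched as a single function that advances the expected sequence number. The sketch models sequence numbers as int64 and uses a hypothetical helper name; the package's internal reader logic may differ.

```go
package main

import "fmt"

// applyGap returns the reader's next expected sequence number after
// receiving a GAP covering [gapStart, gapEnd], both inclusive.
// The pointer only moves when the reader is currently waiting for a
// sequence number inside the covered range.
func applyGap(expected, gapStart, gapEnd int64) int64 {
	if gapStart <= expected && expected <= gapEnd {
		return gapEnd + 1
	}
	return expected
}

func main() {
	fmt.Println(applyGap(5, 3, 9))  // 10: reader skips past the gap
	fmt.Println(applyGap(2, 3, 9))  // 2: still waiting for an earlier sample
	fmt.Println(applyGap(12, 3, 9)) // 12: gap is already behind the reader
}
```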
type GuidPrefix ¶
type GuidPrefix [12]byte
GuidPrefix is the 12-byte participant identifier (RTPS 2.3 §9.3.1).
func (GuidPrefix) String ¶
func (p GuidPrefix) String() string
type Header ¶
type Header struct {
ProtocolVersion [2]byte // {major, minor}
VendorId [2]byte
GuidPrefix GuidPrefix
}
Header is the fixed 20-byte RTPS message header (§9.4.1).
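The 20 bytes are the 4-byte "RTPS" protocol identifier followed by the 16 bytes of the struct fields. A minimal encoder, assuming locally redeclared types and a hypothetical function name, could look like this:

```go
package main

import "fmt"

// Local copies of the documented types so the sketch compiles standalone.
type GuidPrefix [12]byte

type Header struct {
	ProtocolVersion [2]byte // {major, minor}
	VendorId        [2]byte
	GuidPrefix      GuidPrefix
}

// encodeHeader lays out the header as: "RTPS" magic (4 bytes),
// protocol version (2), vendor ID (2), GUID prefix (12).
// Hypothetical helper; the package's own encoder is not shown in the docs.
func encodeHeader(h Header) [20]byte {
	var b [20]byte
	copy(b[0:4], "RTPS")
	copy(b[4:6], h.ProtocolVersion[:])
	copy(b[6:8], h.VendorId[:])
	copy(b[8:20], h.GuidPrefix[:])
	return b
}

func main() {
	h := Header{
		ProtocolVersion: [2]byte{2, 3},       // RTPS 2.3
		VendorId:        [2]byte{0x00, 0x00}, // placeholder, not a real vendor ID
	}
	b := encodeHeader(h)
	fmt.Printf("%s %d.%d len=%d\n", b[:4], b[4], b[5], len(b))
}
```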
type Heartbeat ¶
type Heartbeat struct {
ReaderEntityId EntityId
WriterEntityId EntityId
FirstSN SequenceNumber // lowest SN still in the writer's history
LastSN SequenceNumber // highest SN sent so far
Count int32 // monotonically increasing per writer
}
Heartbeat holds the parsed fields of a HEARTBEAT submessage.
type Option ¶
type Option func(*participant)
Option configures a Participant at creation time.
func WithAccessControl ¶ added in v0.46.0
func WithAccessControl(ac AccessController) Option
WithAccessControl enforces a topic ACL on this participant. Enforcement is opt-in: with no controller configured, all topics are permitted. It composes with WithSecurity (encryption) and WithAntiReplay.
func WithAntiReplay ¶ added in v0.46.0
func WithAntiReplay(rc ReplayChecker) Option
WithAntiReplay enables anti-replay protection on inbound samples. Enforcement is opt-in: with no checker configured, no samples are dropped. It composes with WithSecurity (encryption) and WithAccessControl.
func WithConfig ¶ added in v0.6.0
func WithConfig(cfg *config.ParticipantConfig) Option
WithConfig applies all fields from cfg to the participant. It is equivalent to calling the corresponding WithXxx options individually and is intended for use with JSON configuration files loaded via config.LoadConfig.
func WithContext ¶ added in v0.10.0
func WithContext(ctx context.Context) Option
WithContext returns an Option that closes the participant when ctx is done. This is the idiomatic Go shutdown pattern: pass a context with a cancel function or deadline to tie the participant's lifetime to an outer scope.
func WithDeadlineCallback ¶ added in v0.3.0
func WithDeadlineCallback(fn func(topic string)) Option
WithDeadlineCallback sets a function that is called when a publisher has not written for longer than its QoS.Deadline period.
func WithDiscoverySecurity ¶ added in v0.9.1
func WithDiscoverySecurity(plugin DiscoveryPlugin) Option
WithDiscoverySecurity returns an Option that applies plugin to SPDP discovery announcements. Outbound announcements are signed; inbound announcements with missing or invalid tokens are rejected.
func WithHeartbeatPeriod ¶ added in v0.6.0
func WithHeartbeatPeriod(d time.Duration) Option
WithHeartbeatPeriod sets the interval at which reliable writers send periodic HEARTBEAT submessages. The default is 200 ms. Use shorter values for low-latency reliable delivery; use longer values to reduce control traffic.
func WithIPv6 ¶ added in v0.2.0
func WithIPv6() Option
WithIPv6 enables the IPv6 multicast transport. When set, the participant binds an additional pair of IPv6 UDP sockets and joins the RTPS IPv6 discovery group (FF03::1). IPv4 sockets are still created so the participant is reachable from both IPv4 and IPv6 peers.
func WithLivelinessCallback ¶ added in v0.4.0
func WithLivelinessCallback(fn func(dds.GUID, dds.LivelinessEvent)) Option
WithLivelinessCallback registers fn to be called when a remote participant is discovered (LivelinessGained) or loses its lease (LivelinessLost). The GUID passed is the 16-byte participant GUID (prefix + built-in entity 0x000001c1).
func WithLogger ¶ added in v0.4.0
func WithLogger(l *slog.Logger) Option
WithLogger sets the structured logger used by this participant. Passing nil (the default) disables all log output with zero overhead.
func WithNoMulticast ¶ added in v0.3.0
func WithNoMulticast() Option
WithNoMulticast disables SPDP multicast discovery. The participant will not join or send to 239.255.0.1; use WithPeerLocators to supply peers manually.
func WithPeerLocators ¶ added in v0.3.0
func WithPeerLocators(addrs ...string) Option
WithPeerLocators adds static peer unicast addresses for unicast-only discovery. Each address must be a host:port string parseable by net.ResolveUDPAddr.
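Peer addresses can be checked up front with the same standard-library call the text mentions. The sketch below is a pre-flight validator, not something the package provides; validatePeers is a hypothetical name and the addresses are examples.

```go
package main

import (
	"fmt"
	"net"
)

// validatePeers returns an error for the first address that
// net.ResolveUDPAddr cannot parse as host:port.
func validatePeers(addrs ...string) error {
	for _, a := range addrs {
		if _, err := net.ResolveUDPAddr("udp", a); err != nil {
			return fmt.Errorf("peer %q: %w", a, err)
		}
	}
	return nil
}

func main() {
	// 7410 is the discovery unicast port of participant 0 in domain 0.
	fmt.Println(validatePeers("192.168.1.20:7410")) // <nil>
	fmt.Println(validatePeers("192.168.1.20") != nil) // true: port is missing
}
```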
func WithPersistentHistory ¶ added in v0.3.0
func WithPersistentHistory(dir string) Option
WithPersistentHistory configures TransientLocal durability to be backed by files in dir. On participant start, the last sample for each topic is loaded from disk; on every Write, the new last sample is flushed to disk. The file name for topic T is <dir>/topic-<sanitised(T)>.bin. Each file holds a 4-byte length prefix followed by the raw sample bytes.
The option is a no-op when dir is "". Failures during load are silently ignored (missing files are normal on first run); failures during flush are also silently ignored so that a write to a read-only directory does not block the caller.
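The on-disk record layout (4-byte length prefix, then the raw sample) can be sketched with in-memory buffers. The byte order of the prefix is not stated above; this example assumes big-endian, and the helper names and the 1 MiB sanity cap are likewise assumptions made for illustration.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

// maxSample is an arbitrary sanity cap used by this sketch only.
const maxSample = 1 << 20

// encodeSample prepends a 4-byte length prefix to the sample.
// Assumption: the prefix is big-endian.
func encodeSample(sample []byte) []byte {
	buf := make([]byte, 4+len(sample))
	binary.BigEndian.PutUint32(buf[:4], uint32(len(sample)))
	copy(buf[4:], sample)
	return buf
}

// decodeSample reads one length-prefixed sample from r.
func decodeSample(r io.Reader) ([]byte, error) {
	var hdr [4]byte
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return nil, err
	}
	n := binary.BigEndian.Uint32(hdr[:])
	if n > maxSample {
		return nil, errors.New("sample length exceeds sanity cap")
	}
	sample := make([]byte, n)
	if _, err := io.ReadFull(r, sample); err != nil {
		return nil, err
	}
	return sample, nil
}

// roundTrip encodes and then decodes a sample through an in-memory buffer.
func roundTrip(sample []byte) ([]byte, error) {
	return decodeSample(bytes.NewReader(encodeSample(sample)))
}

func main() {
	got, err := roundTrip([]byte("last-sample"))
	fmt.Println(string(got), err)
}
```

A truncated file surfaces as an io.ErrUnexpectedEOF from io.ReadFull, which a loader that ignores load failures would simply treat as "no stored sample".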
func WithSPDPInterval ¶ added in v0.5.0
func WithSPDPInterval(d time.Duration) Option
WithSPDPInterval sets the SPDP participant announcement interval. The default is 2 seconds. TSN networks with bounded-latency requirements may prefer longer intervals (e.g. 10 s) to reduce discovery overhead.
func WithSPDPJitter ¶ added in v0.5.0
func WithSPDPJitter(d time.Duration) Option
WithSPDPJitter adds a random delay of up to d before each SPDP announcement. This prevents synchronised floods when many participants start simultaneously on a TSN segment. A typical value is 500 ms.
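The jitter described above amounts to drawing a uniform random delay in the range 0 to d before each send. A minimal sketch, with a hypothetical helper name and math/rand as the assumed randomness source:

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// jitterDelay returns a uniformly distributed delay in [0, d].
// Non-positive d disables jitter.
func jitterDelay(d time.Duration) time.Duration {
	if d <= 0 {
		return 0
	}
	return time.Duration(rand.Int63n(int64(d) + 1))
}

func main() {
	interval := 2 * time.Second      // default SPDP interval from the docs above
	jitter := 500 * time.Millisecond // the "typical value" mentioned above
	for i := 0; i < 3; i++ {
		// Each announcement waits the base interval plus a fresh random delay.
		fmt.Println(interval + jitterDelay(jitter))
	}
}
```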
func WithSecurity ¶
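func WithSecurity(plugin SecurityPlugin) Option
WithSecurity returns an Option that applies plugin to every payload transmitted and received by this participant. All peers that communicate with this participant must use the same plugin and key material.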
func WithStaticPeers ¶ added in v0.5.0
func WithStaticPeers(addrs ...string) Option
WithStaticPeers adds static peer unicast addresses for unicast-only discovery on TSN networks where SPDP multicast is undesirable. Equivalent to WithPeerLocators; provided for TSN configuration clarity.
func WithTSNConfig ¶ added in v0.5.0
func WithTSNConfig(cfg *tsn.StreamConfig) Option
WithTSNConfig registers a TSN stream configuration with the participant. When a publisher is created for a topic in the config, the participant allocates a dedicated socket for that traffic class, marks it with SO_PRIORITY / IP_TOS, and (on Linux) enables SO_TXTIME if TxOffsetUS > 0.
func WithTracer ¶ added in v0.4.0
func WithTracer(t dds.Tracer) Option
WithTracer wires an OpenTelemetry-compatible Tracer into the participant. Pass dds.NoopTracer (the default) to disable tracing with zero cost.
type ReplayChecker ¶ added in v0.46.0
ReplayChecker rejects replayed samples. security.ReplayGuard satisfies it. When configured via WithAntiReplay, every inbound DATA sample is checked and dropped (not delivered) if Check reports it as a replay.
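The ReplayChecker method set is not shown on this page, so the following is only a conceptual sketch of one common anti-replay strategy: tracking the highest sequence number accepted per writer. The type highWaterGuard, its IsReplay method, and the string writer key are all hypothetical and do not reflect the signature of Check or the behaviour of security.ReplayGuard.

```go
package main

import (
	"fmt"
	"sync"
)

// highWaterGuard remembers the highest sequence number accepted per writer.
// Hypothetical type for illustration only.
type highWaterGuard struct {
	mu   sync.Mutex
	last map[string]int64
}

func newHighWaterGuard() *highWaterGuard {
	return &highWaterGuard{last: make(map[string]int64)}
}

// IsReplay reports whether seq is at or below the highest sequence number
// already accepted from writer. Fresh samples advance the high-water mark.
func (g *highWaterGuard) IsReplay(writer string, seq int64) bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	if last, ok := g.last[writer]; ok && seq <= last {
		return true
	}
	g.last[writer] = seq
	return false
}

func main() {
	g := newHighWaterGuard()
	fmt.Println(g.IsReplay("writer-A", 1)) // false: first sample
	fmt.Println(g.IsReplay("writer-A", 2)) // false: newer sample
	fmt.Println(g.IsReplay("writer-A", 2)) // true: already seen
}
```

A pure high-water mark also drops legitimately reordered samples; a sliding window of recently seen sequence numbers is the usual refinement when reordering is expected.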
type SecurityPlugin ¶
type SecurityPlugin interface {
Seal(plaintext []byte) ([]byte, error)
Open(ciphertext []byte) ([]byte, error)
}
SecurityPlugin is satisfied by any type that can seal (encrypt / sign) and open (decrypt / verify) a DDS payload byte slice. Built-in implementations live in the security sub-package; NullPlugin (pass-through) is also available there for development and testing.
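A custom plugin only needs the two methods shown above. The sketch below implements them with AES-GCM from the standard library, prefixing each sealed payload with its random nonce. It is an illustration under those assumptions, not one of the built-in implementations; gcmPlugin and newGCMPlugin are hypothetical names.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"errors"
	"fmt"
)

// SecurityPlugin mirrors the interface documented above.
type SecurityPlugin interface {
	Seal(plaintext []byte) ([]byte, error)
	Open(ciphertext []byte) ([]byte, error)
}

// gcmPlugin is a hypothetical AES-GCM plugin; output is nonce || ciphertext.
type gcmPlugin struct{ aead cipher.AEAD }

// newGCMPlugin builds a plugin from a 16-, 24-, or 32-byte AES key.
func newGCMPlugin(key []byte) (*gcmPlugin, error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	aead, err := cipher.NewGCM(block)
	if err != nil {
		return nil, err
	}
	return &gcmPlugin{aead: aead}, nil
}

// Seal encrypts and authenticates plaintext under a fresh random nonce.
func (p *gcmPlugin) Seal(plaintext []byte) ([]byte, error) {
	nonce := make([]byte, p.aead.NonceSize())
	if _, err := rand.Read(nonce); err != nil {
		return nil, err
	}
	return p.aead.Seal(nonce, nonce, plaintext, nil), nil
}

// Open splits off the nonce, then decrypts and verifies the remainder.
func (p *gcmPlugin) Open(ciphertext []byte) ([]byte, error) {
	n := p.aead.NonceSize()
	if len(ciphertext) < n {
		return nil, errors.New("ciphertext shorter than nonce")
	}
	return p.aead.Open(nil, ciphertext[:n], ciphertext[n:], nil)
}

// Compile-time check that gcmPlugin satisfies the interface.
var _ SecurityPlugin = (*gcmPlugin)(nil)

func main() {
	p, err := newGCMPlugin(make([]byte, 32)) // all-zero demo key; never use in production
	if err != nil {
		panic(err)
	}
	sealed, _ := p.Seal([]byte("sample"))
	opened, err := p.Open(sealed)
	fmt.Println(string(opened), err)
}
```

Since all peers must share the same plugin and key material, every participant in a deployment would construct the plugin from the same key.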
type SequenceNumber ¶
SequenceNumber is the 8-byte writer sequence number (§9.3.2).