Documentation ¶
Overview ¶
Package dds defines the Go interface for Data Distribution Service (DDS) publish/subscribe operations.
The interface is intentionally narrow: it covers the pub/sub primitives needed for vehicle-signal transport and nothing more.
Choose an implementation by importing one of the sub-packages and calling its New function:
import "github.com/SoundMatt/go-DDS/mock"    // in-process, no CGo
import "github.com/SoundMatt/go-DDS/cyclone" // CycloneDDS via CGo
import "github.com/SoundMatt/go-DDS/rtps"    // pure-Go RTPS/UDP
Each of these packages exposes a New(Domain) (Participant, error) constructor whose return value satisfies this package's Participant interface.
Constants ¶
This section is empty.
Variables ¶
var DefaultQoS = QoS{
	Reliability:  BestEffort,
	Durability:   Volatile,
	HistoryDepth: 1,
}
DefaultQoS is BestEffort + Volatile with a history depth of 1.
var ErrClosed = errors.New("dds: entity is closed")
ErrClosed is returned when an operation is attempted on a closed entity.
var ErrTopicEmpty = errors.New("dds: topic name must not be empty")
ErrTopicEmpty is returned when an empty topic string is passed.
var ReliableQoS = QoS{
	Reliability:  Reliable,
	Durability:   TransientLocal,
	HistoryDepth: 1,
}
ReliableQoS is Reliable + TransientLocal. Use for actuator commands and any topic where a late-joining subscriber must receive the current value.
Functions ¶
This section is empty.
Types ¶
type Domain ¶
type Domain int
Domain is a DDS domain identifier (0–232 inclusive per the DDS spec). Participants on the same domain and network segment discover each other automatically without a broker.
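The upper bound of 232 follows from how RTPS maps domain IDs to UDP ports. The sketch below assumes the default RTPS port-mapping parameters (port base 7400, domain gain 250); an implementation may configure different values. Domain is redeclared locally so the snippet runs without the module, and discoveryPort and validDomain are hypothetical helpers, not part of this package.

```go
package main

import "fmt"

// Domain mirrors the documented dds.Domain type so the sketch is standalone.
type Domain int

// Default RTPS port-mapping parameters (assumed, not read from go-DDS).
const (
	portBase   = 7400  // PB
	domainGain = 250   // DG
	maxUDPPort = 65535 // largest valid UDP port
)

// discoveryPort returns the default multicast discovery port for a domain.
// Hypothetical helper for illustration only.
func discoveryPort(d Domain) int {
	return portBase + domainGain*int(d)
}

// validDomain reports whether d lies in the documented 0 to 232 range.
func validDomain(d Domain) bool {
	return d >= 0 && d <= 232
}

func main() {
	for _, d := range []Domain{0, 232, 233} {
		port := discoveryPort(d)
		fmt.Printf("domain %d: valid=%v port=%d fits=%v\n",
			int(d), validDomain(d), port, port <= maxUDPPort)
	}
}
```

Under these assumptions domain 232 maps to port 65400, while domain 233 would map to 65650, which is not a valid UDP port.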
type DurabilityKind ¶
type DurabilityKind int
DurabilityKind controls whether late-joining subscribers receive historical samples that were published before they joined.
const (
	// Volatile discards samples as soon as they are delivered.
	Volatile DurabilityKind = iota
	// TransientLocal retains the last N samples so that late joiners
	// receive current state on subscription.
	TransientLocal
)
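To make the difference between the two durability kinds concrete, here is a minimal in-memory sketch. It does not use or reproduce any go-DDS implementation: topic, reader, and lateJoinerCount are hypothetical names, and the retention depth stands in for the QoS HistoryDepth field.

```go
package main

import "fmt"

// DurabilityKind mirrors the documented constants so the sketch is standalone.
type DurabilityKind int

const (
	Volatile DurabilityKind = iota
	TransientLocal
)

// reader collects every sample delivered to it.
type reader struct{ received [][]byte }

// topic is a hypothetical in-memory stand-in for a DDS topic.
type topic struct {
	durability DurabilityKind
	depth      int      // how many samples TransientLocal retains
	history    [][]byte // retained samples, oldest first
	readers    []*reader
}

// write delivers p to current readers and, for TransientLocal,
// keeps only the most recent depth samples for future readers.
func (t *topic) write(p []byte) {
	for _, r := range t.readers {
		r.received = append(r.received, p)
	}
	if t.durability == TransientLocal && t.depth > 0 {
		t.history = append(t.history, p)
		if len(t.history) > t.depth {
			t.history = t.history[len(t.history)-t.depth:]
		}
	}
}

// subscribe attaches a reader and replays any retained history to it.
func (t *topic) subscribe() *reader {
	r := &reader{}
	r.received = append(r.received, t.history...)
	t.readers = append(t.readers, r)
	return r
}

// lateJoinerCount publishes three samples, then subscribes, and reports
// how many samples the late joiner received.
func lateJoinerCount(d DurabilityKind, depth int) int {
	t := &topic{durability: d, depth: depth}
	for _, s := range []string{"1", "2", "3"} {
		t.write([]byte(s))
	}
	return len(t.subscribe().received)
}

func main() {
	fmt.Println("volatile:", lateJoinerCount(Volatile, 1))
	fmt.Println("transient-local:", lateJoinerCount(TransientLocal, 1))
}
```

In this model a Volatile late joiner sees nothing published before it subscribed, while a TransientLocal late joiner with depth 1 sees only the latest sample.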
type Metrics ¶ added in v0.3.0
type Metrics struct {
WriteCount uint64
DeliverCount uint64
DropCount uint64
BytesWritten uint64
BytesDelivered uint64
}
Metrics holds cumulative statistics for a participant.
type MetricsProvider ¶ added in v0.3.0
type MetricsProvider interface {
Metrics() Metrics
}
MetricsProvider is implemented by participants that expose runtime statistics.
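Because MetricsProvider is a separate interface, callers would typically probe for it with a type assertion rather than assume every participant provides it. The sketch below redeclares the documented types locally so it runs without the module. The closer interface, the two participant types, and dropRatio are hypothetical, and treating DropCount and DeliverCount as disjoint counters is an assumption the source does not confirm.

```go
package main

import "fmt"

// Metrics and MetricsProvider mirror the documented declarations.
type Metrics struct {
	WriteCount     uint64
	DeliverCount   uint64
	DropCount      uint64
	BytesWritten   uint64
	BytesDelivered uint64
}

type MetricsProvider interface {
	Metrics() Metrics
}

// closer stands in for dds.Participant; only Close matters here.
type closer interface{ Close() error }

// plainParticipant is a hypothetical participant without metrics.
type plainParticipant struct{}

func (plainParticipant) Close() error { return nil }

// countingParticipant is a hypothetical participant that exposes metrics.
type countingParticipant struct{ m Metrics }

func (countingParticipant) Close() error       { return nil }
func (c countingParticipant) Metrics() Metrics { return c.m }

// dropRatio returns drops / (deliveries + drops). The boolean is false
// when p exposes no metrics or has seen no traffic yet.
func dropRatio(p closer) (float64, bool) {
	mp, ok := p.(MetricsProvider)
	if !ok {
		return 0, false
	}
	m := mp.Metrics()
	total := m.DeliverCount + m.DropCount
	if total == 0 {
		return 0, false
	}
	return float64(m.DropCount) / float64(total), true
}

func main() {
	participants := []closer{
		plainParticipant{},
		countingParticipant{m: Metrics{DeliverCount: 3, DropCount: 1}},
	}
	for _, p := range participants {
		if r, ok := dropRatio(p); ok {
			fmt.Printf("drop ratio: %.2f\n", r)
		} else {
			fmt.Println("no metrics available")
		}
	}
}
```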
type Participant ¶
type Participant interface {
// NewPublisher creates a writer for the named topic using the given QoS.
NewPublisher(topic string, qos QoS) (Publisher, error)
// NewSubscriber creates a reader for the named topic using the given QoS.
// Optional SubscriberOption values configure content filtering and other
// per-subscriber policies.
NewSubscriber(topic string, qos QoS, opts ...SubscriberOption) (Subscriber, error)
// Close releases all DDS resources held by this participant.
Close() error
}
Participant is the DDS domain participant — the root factory for all DDS entities. Create one per process per domain. A Participant is safe for concurrent use from multiple goroutines.
type Publisher ¶
Publisher writes samples to a single DDS topic. A Publisher is safe for concurrent use from multiple goroutines.
type QoS ¶
type QoS struct {
Reliability ReliabilityKind
Durability DurabilityKind
HistoryDepth int // 0 means implementation default (typically 1)
Deadline time.Duration // 0 = disabled; publisher fires DeadlineCallback if no Write within this period
}
QoS bundles the policies that govern a single publisher or subscriber endpoint.
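Since QoS is a plain struct, a custom profile can be derived by copying one of the predefined values and changing individual fields. The following sketch redeclares the documented types locally so it runs on its own. commandQoS and effectiveDepth are hypothetical helpers, and resolving a zero HistoryDepth to 1 reflects the "typically 1" comment, not a guarantee about any implementation.

```go
package main

import (
	"fmt"
	"time"
)

// The declarations below mirror the documented dds types.
type ReliabilityKind int

const (
	BestEffort ReliabilityKind = iota
	Reliable
)

type DurabilityKind int

const (
	Volatile DurabilityKind = iota
	TransientLocal
)

type QoS struct {
	Reliability  ReliabilityKind
	Durability   DurabilityKind
	HistoryDepth int
	Deadline     time.Duration
}

var ReliableQoS = QoS{Reliability: Reliable, Durability: TransientLocal, HistoryDepth: 1}

// commandQoS derives a profile from ReliableQoS with a write deadline.
// Assignment copies the struct, so ReliableQoS itself is left untouched.
func commandQoS(deadline time.Duration) QoS {
	q := ReliableQoS
	q.Deadline = deadline
	return q
}

// effectiveDepth resolves the "0 means implementation default" rule,
// assuming a default of 1 (hypothetical; implementations may differ).
func effectiveDepth(q QoS) int {
	if q.HistoryDepth <= 0 {
		return 1
	}
	return q.HistoryDepth
}

func main() {
	q := commandQoS(100 * time.Millisecond)
	fmt.Println("deadline:", q.Deadline, "depth:", effectiveDepth(q))
	fmt.Println("shared profile deadline:", ReliableQoS.Deadline)
}
```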
type ReliabilityKind ¶
type ReliabilityKind int
ReliabilityKind controls delivery guarantees for a topic endpoint.
const (
	// BestEffort delivers samples without retransmission. Suitable for
	// high-frequency sensor data where occasional loss is acceptable.
	BestEffort ReliabilityKind = iota
	// Reliable retransmits lost samples until acknowledged. Required for
	// command/control and actuator writes.
	Reliable
)
type Subscriber ¶
Subscriber reads samples from a single DDS topic as a Go channel. A Subscriber is safe for concurrent use from multiple goroutines.
type SubscriberConfig ¶ added in v0.3.0
SubscriberConfig holds per-subscriber options applied at construction time. It is exported so that implementation packages (mock, rtps, cyclone) can read the resolved configuration without duplicating the option-merge logic.
func ApplySubscriberOpts ¶ added in v0.3.0
func ApplySubscriberOpts(opts []SubscriberOption) SubscriberConfig
ApplySubscriberOpts merges a slice of SubscriberOption into a SubscriberConfig.
type SubscriberOption ¶ added in v0.3.0
type SubscriberOption func(*SubscriberConfig)
SubscriberOption configures a subscriber at creation time.
func WithFilter ¶ added in v0.3.0
func WithFilter(fn func(Sample) bool) SubscriberOption
WithFilter returns a SubscriberOption that applies fn as a content filter. Only samples for which fn returns true are delivered to the subscriber's channel; non-matching samples are discarded silently.
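SubscriberOption, ApplySubscriberOpts, and WithFilter together follow Go's functional-options pattern. The sketch below shows one way the pieces could fit together. It is not the package's actual source: the Filter field on SubscriberConfig is an assumption (the field list is not shown above), Sample is reduced to the Payload field used in the WaitSet example, and deliver is a hypothetical helper that models what reaches the subscriber's channel.

```go
package main

import (
	"bytes"
	"fmt"
)

// Sample carries only the Payload field for this sketch.
type Sample struct{ Payload []byte }

// SubscriberConfig holds resolved options. Filter is an assumed field name.
type SubscriberConfig struct {
	Filter func(Sample) bool
}

// SubscriberOption matches the documented signature.
type SubscriberOption func(*SubscriberConfig)

// WithFilter records fn as the content filter.
func WithFilter(fn func(Sample) bool) SubscriberOption {
	return func(c *SubscriberConfig) { c.Filter = fn }
}

// ApplySubscriberOpts applies each option in order to a zero config.
func ApplySubscriberOpts(opts []SubscriberOption) SubscriberConfig {
	var cfg SubscriberConfig
	for _, opt := range opts {
		if opt != nil {
			opt(&cfg)
		}
	}
	return cfg
}

// deliver returns the samples that pass the filter; with no filter set,
// every sample passes.
func deliver(cfg SubscriberConfig, in []Sample) []Sample {
	var out []Sample
	for _, s := range in {
		if cfg.Filter == nil || cfg.Filter(s) {
			out = append(out, s)
		}
	}
	return out
}

func main() {
	cfg := ApplySubscriberOpts([]SubscriberOption{
		WithFilter(func(s Sample) bool {
			return bytes.HasPrefix(s.Payload, []byte("temp:"))
		}),
	})
	in := []Sample{
		{Payload: []byte("temp:21.5")},
		{Payload: []byte("speed:88")},
	}
	for _, s := range deliver(cfg, in) {
		fmt.Println(string(s.Payload))
	}
}
```

Here only the sample whose payload starts with "temp:" survives the filter; the other is dropped without any error, matching the silent-discard behaviour described for WithFilter.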
type WaitSet ¶
type WaitSet struct {
// contains filtered or unexported fields
}
WaitSet multiplexes over a set of subscribers, blocking until any one of them delivers a sample.
func NewWaitSet ¶
func NewWaitSet(subs ...Subscriber) *WaitSet
NewWaitSet creates a WaitSet that monitors the given subscribers.
Example ¶
ExampleNewWaitSet demonstrates multiplexing over two subscribers without a polling loop. The WaitSet blocks until any attached subscriber delivers a sample, then returns it together with the subscriber it arrived on.
package main

import (
	"context"
	"fmt"
	"time"

	dds "github.com/SoundMatt/go-DDS"
	"github.com/SoundMatt/go-DDS/mock"
)

func main() {
	p, _ := mock.New(dds.Domain(0))
	defer p.Close()

	subTemp, _ := p.NewSubscriber("sensors/temp", dds.DefaultQoS)
	subSpeed, _ := p.NewSubscriber("vehicle/speed", dds.DefaultQoS)
	defer subTemp.Close()
	defer subSpeed.Close()

	pubTemp, _ := p.NewPublisher("sensors/temp", dds.DefaultQoS)
	defer pubTemp.Close()
	_ = pubTemp.Write([]byte("21.5"))

	ws := dds.NewWaitSet(subTemp, subSpeed)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	sample, sub, err := ws.Wait(ctx)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	switch sub {
	case subTemp:
		fmt.Println("temp:", string(sample.Payload))
	case subSpeed:
		fmt.Println("speed:", string(sample.Payload))
	}
}
Output:
temp: 21.5
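For readers curious how blocking on several subscribers at once can work, the sketch below multiplexes over a variable number of channels with reflect.Select from the standard library. It is one possible technique, not a description of how WaitSet is implemented. Sample is reduced to its Payload field, and waitAny and runDemo are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"reflect"
	"time"
)

// Sample carries only the Payload field for this sketch.
type Sample struct{ Payload []byte }

// waitAny blocks until one of chans yields a sample or ctx is done.
// It returns the sample and the index of the channel it arrived on.
func waitAny(ctx context.Context, chans ...<-chan Sample) (Sample, int, error) {
	cases := make([]reflect.SelectCase, 0, len(chans)+1)
	for _, ch := range chans {
		cases = append(cases, reflect.SelectCase{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(ch),
		})
	}
	// The final case watches for cancellation or timeout.
	cases = append(cases, reflect.SelectCase{
		Dir:  reflect.SelectRecv,
		Chan: reflect.ValueOf(ctx.Done()),
	})

	i, v, ok := reflect.Select(cases)
	if i == len(chans) {
		return Sample{}, -1, ctx.Err()
	}
	if !ok {
		return Sample{}, i, errors.New("channel closed")
	}
	return v.Interface().(Sample), i, nil
}

// runDemo queues one sample on the first of two channels and waits.
func runDemo() (string, int, error) {
	temp := make(chan Sample, 1)
	speed := make(chan Sample, 1)
	temp <- Sample{Payload: []byte("21.5")}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	s, i, err := waitAny(ctx, temp, speed)
	return string(s.Payload), i, err
}

func main() {
	payload, idx, err := runDemo()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("channel %d delivered %s\n", idx, payload)
}
```

reflect.Select is used here because an ordinary select statement needs a fixed set of cases at compile time; a fan-in goroutine per subscriber would be an alternative design with different trade-offs.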
Directories ¶
| Package | Synopsis |
|---|---|
| cyclone | Package cyclone provides a CycloneDDS-backed implementation of the dds interfaces via CGo. |
| interop | Package interop contains RTPS wire-compatibility tests that require a live CycloneDDS peer. |
| mock | Package mock provides an in-process, CGo-free implementation of the dds interfaces. |
| monitor | Package monitor provides a real-time web dashboard for a DDS participant. |
| rtps | Package rtps provides a pure-Go RTPS/UDP implementation of the dds interfaces. |
| security | Package security provides pluggable transport-security for go-DDS. |