client

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 7, 2025 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package client exposes a high-level libfabric client abstraction layered on top of the fi primitives.

Index

Constants

This section is empty.

Variables

View Source
var ErrClosed = errors.New("libfabric client: closed")

ErrClosed indicates the client has already been closed.

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client owns the resources necessary to perform message send/receive operations.

func Connect

func Connect(cfg Config) (*Client, error)

Connect dials a MSG endpoint exposed by a listener using the supplied configuration.

func Dial

func Dial(cfg Config) (*Client, error)

Dial discovers a compatible provider and prepares the client resources.

func (*Client) Close

func (c *Client) Close() error

Close releases the underlying libfabric resources.

func (*Client) DefaultPeer

func (c *Client) DefaultPeer() fi.Address

DefaultPeer returns the currently configured destination address for automatic sends.

func (*Client) LocalAddress

func (c *Client) LocalAddress() ([]byte, error)

LocalAddress returns the provider-specific address bytes for the client's endpoint.

func (*Client) Receive

func (c *Client) Receive(ctx context.Context, buf []byte) (int, error)

Receive posts a blocking receive, filling buf once the completion resolves.

func (*Client) ReceiveAsync

func (c *Client) ReceiveAsync(buf []byte) (*ReceiveFuture, error)

ReceiveAsync posts a receive and returns a future that resolves when data arrives.

func (*Client) ReceiveFrom

func (c *Client) ReceiveFrom(ctx context.Context, buf []byte) (int, fi.Address, error)

ReceiveFrom behaves like Receive but also returns the peer address.

func (*Client) RegisterPeer

func (c *Client) RegisterPeer(addr []byte, setDefault bool) (fi.Address, error)

RegisterPeer inserts the supplied provider address into the client's address vector. When setDefault is true, subsequent calls to Send/SendAsync target the peer automatically.

func (*Client) RegisterReceiveHandler

func (c *Client) RegisterReceiveHandler(handler ReceiveHandler) func()

RegisterReceiveHandler installs a callback invoked for every completed receive. The returned function unregisters the handler when invoked. Passing a nil handler is a no-op.

func (*Client) RegisterSendHandler

func (c *Client) RegisterSendHandler(handler SendHandler) func()

RegisterSendHandler installs a callback invoked for every completed send. The returned function unregisters the handler when invoked. Passing a nil handler is a no-op.

func (*Client) Send

func (c *Client) Send(ctx context.Context, payload []byte) error

Send posts a blocking send operation using the configured timeout when the supplied context lacks a deadline.

func (*Client) SendAsync

func (c *Client) SendAsync(payload []byte) (*SendFuture, error)

SendAsync posts a send and returns a future that resolves when libfabric reports completion.

func (*Client) SendTo

func (c *Client) SendTo(ctx context.Context, dest fi.Address, payload []byte) error

SendTo transmits payload to the specified destination address.

func (*Client) SendToAsync

func (c *Client) SendToAsync(payload []byte, dest fi.Address) (*SendFuture, error)

SendToAsync posts a send targeted at the provided destination address.

func (*Client) SetDefaultPeer

func (c *Client) SetDefaultPeer(addr fi.Address)

SetDefaultPeer configures the destination address used by Send/SendAsync for RDM endpoints.

func (*Client) Stats

func (c *Client) Stats() Stats

Stats returns a snapshot of client counters.

type Config

type Config struct {
	Provider         string
	EndpointType     fi.EndpointType
	Timeout          time.Duration
	MRPoolSize       int
	MRPoolCapacity   int
	MRPoolAccess     fi.MRAccessFlag
	Node             string
	Service          string
	Logger           Logger
	StructuredLogger StructuredLogger
	Tracer           Tracer
	Metrics          MetricHook
}

Config controls Dial behaviour for the high-level Client.

type Listener

type Listener struct {
	// contains filtered or unexported fields
}

Listener accepts MSG connections and converts them into high-level Clients.

func Listen

func Listen(cfg ListenerConfig) (*Listener, error)

Listen prepares a MSG listener using the provided configuration.

func (*Listener) Accept

func (l *Listener) Accept(ctx context.Context) (*Client, error)

Accept waits for the next MSG connection request and returns a fully initialised Client.

func (*Listener) Addr

func (l *Listener) Addr() ([]byte, error)

Addr returns the bound provider address for the listener.

func (*Listener) Close

func (l *Listener) Close() error

Close releases listener resources.

type ListenerConfig

type ListenerConfig struct {
	Provider         string
	Node             string
	Service          string
	Logger           Logger
	StructuredLogger StructuredLogger
	Tracer           Tracer
	Metrics          MetricHook
}

ListenerConfig controls MSG listener setup.

type Logger

type Logger interface {
	Debugf(format string, args ...any)
}

Logger provides structured debug logging hooks for the client.

type MetricHook

type MetricHook interface {
	DispatcherStarted(attrs map[string]string)
	DispatcherStopped(attrs map[string]string)
	DispatcherCQError(kind string, err error, attrs map[string]string)
	SendCompleted(attrs map[string]string)
	SendFailed(err error, attrs map[string]string)
	ReceiveCompleted(attrs map[string]string)
	ReceiveFailed(err error, attrs map[string]string)
}

MetricHook captures dispatcher telemetry events.

type OTelMetrics

type OTelMetrics struct {
	// contains filtered or unexported fields
}

OTelMetrics implements MetricHook using OpenTelemetry counters.

func NewOTelMetrics

func NewOTelMetrics(opts OTelMetricsOptions) (*OTelMetrics, error)

NewOTelMetrics constructs a MetricHook that emits OpenTelemetry counter measurements.

func (*OTelMetrics) DispatcherCQError

func (o *OTelMetrics) DispatcherCQError(kind string, _ error, attrs map[string]string)

DispatcherCQError counts completion queue errors observed by the dispatcher.

func (*OTelMetrics) DispatcherStarted

func (o *OTelMetrics) DispatcherStarted(attrs map[string]string)

DispatcherStarted records that the dispatcher loop has started executing.

func (*OTelMetrics) DispatcherStopped

func (o *OTelMetrics) DispatcherStopped(attrs map[string]string)

DispatcherStopped records that the dispatcher loop has exited.

func (*OTelMetrics) ReceiveCompleted

func (o *OTelMetrics) ReceiveCompleted(attrs map[string]string)

ReceiveCompleted records a successful receive completion.

func (*OTelMetrics) ReceiveFailed

func (o *OTelMetrics) ReceiveFailed(_ error, attrs map[string]string)

ReceiveFailed records a failed receive completion.

func (*OTelMetrics) SendCompleted

func (o *OTelMetrics) SendCompleted(attrs map[string]string)

SendCompleted records a successful send completion.

func (*OTelMetrics) SendFailed

func (o *OTelMetrics) SendFailed(_ error, attrs map[string]string)

SendFailed records a failed send completion.

type OTelMetricsOptions

type OTelMetricsOptions struct {
	MeterProvider          metric.MeterProvider
	Meter                  metric.Meter
	InstrumentationName    string
	InstrumentationVersion string
}

OTelMetricsOptions configures NewOTelMetrics.

type OperationError

type OperationError struct {
	Kind        OperationKind
	Errno       fi.Errno
	ProviderErr int
	Flags       uint64
	Length      uint64
	Data        uint64
	Tag         uint64
}

OperationError exposes detailed completion error information surfaced by libfabric.

func (OperationError) Error

func (e OperationError) Error() string

func (OperationError) Unwrap

func (e OperationError) Unwrap() error

Unwrap allows errors.Is / errors.As to match against the underlying Errno.

type OperationKind

type OperationKind int

OperationKind identifies the type of libfabric operation tracked by a future.

const (
	// OperationSend identifies send operations tracked by OperationKind.
	OperationSend OperationKind = iota
	// OperationReceive identifies receive operations tracked by OperationKind.
	OperationReceive
)

func (OperationKind) String

func (k OperationKind) String() string

type PrometheusMetrics

type PrometheusMetrics struct {
	// contains filtered or unexported fields
}

PrometheusMetrics implements MetricHook using Prometheus counters.

func NewPrometheusMetrics

func NewPrometheusMetrics(opts PrometheusMetricsOptions) (*PrometheusMetrics, error)

NewPrometheusMetrics constructs a MetricHook backed by Prometheus counters.

func (*PrometheusMetrics) DispatcherCQError

func (p *PrometheusMetrics) DispatcherCQError(kind string, _ error, attrs map[string]string)

DispatcherCQError counts completion queue errors observed by the dispatcher.

func (*PrometheusMetrics) DispatcherStarted

func (p *PrometheusMetrics) DispatcherStarted(attrs map[string]string)

DispatcherStarted records that the dispatcher loop has started executing.

func (*PrometheusMetrics) DispatcherStopped

func (p *PrometheusMetrics) DispatcherStopped(attrs map[string]string)

DispatcherStopped records that the dispatcher loop has exited.

func (*PrometheusMetrics) ReceiveCompleted

func (p *PrometheusMetrics) ReceiveCompleted(attrs map[string]string)

ReceiveCompleted records a successful receive completion.

func (*PrometheusMetrics) ReceiveFailed

func (p *PrometheusMetrics) ReceiveFailed(_ error, attrs map[string]string)

ReceiveFailed records a failed receive completion.

func (*PrometheusMetrics) SendCompleted

func (p *PrometheusMetrics) SendCompleted(attrs map[string]string)

SendCompleted records a successful send completion.

func (*PrometheusMetrics) SendFailed

func (p *PrometheusMetrics) SendFailed(_ error, attrs map[string]string)

SendFailed records a failed send completion.

type PrometheusMetricsOptions

type PrometheusMetricsOptions struct {
	Registerer  prometheus.Registerer
	Namespace   string
	Subsystem   string
	ConstLabels prometheus.Labels
}

PrometheusMetricsOptions configures NewPrometheusMetrics.

type ReceiveCompletion

type ReceiveCompletion struct {
	Payload []byte
	Source  fi.Address
	Err     error
}

ReceiveCompletion describes a completed receive operation delivered through a handler.

type ReceiveFuture

type ReceiveFuture struct {
	// contains filtered or unexported fields
}

ReceiveFuture tracks the completion of a posted receive operation.

func (*ReceiveFuture) Await

func (f *ReceiveFuture) Await(ctx context.Context) (int, error)

Await blocks until the receive resolves or the context is cancelled.

func (*ReceiveFuture) Buffer

func (f *ReceiveFuture) Buffer() []byte

Buffer returns the caller-provided buffer passed to ReceiveAsync.

func (*ReceiveFuture) Done

func (f *ReceiveFuture) Done() <-chan struct{}

Done exposes a channel that closes when the receive completes.

func (*ReceiveFuture) OnComplete

func (f *ReceiveFuture) OnComplete(fn func(int, error))

OnComplete registers a callback invoked asynchronously once data arrives.

func (*ReceiveFuture) Source

func (f *ReceiveFuture) Source() fi.Address

Source returns the address of the peer that produced the data, when available.

type ReceiveHandler

type ReceiveHandler func(ReceiveCompletion)

ReceiveHandler is invoked when a receive operation completes.

type SendCompletion

type SendCompletion struct {
	Size int
	Err  error
}

SendCompletion describes the outcome of a send operation dispatched through a handler.

type SendFuture

type SendFuture struct {
	// contains filtered or unexported fields
}

SendFuture tracks the completion of a posted send operation.

func (*SendFuture) Await

func (f *SendFuture) Await(ctx context.Context) error

Await blocks until the send operation completes or the context is cancelled.

func (*SendFuture) Done

func (f *SendFuture) Done() <-chan struct{}

Done exposes a channel that closes when the send operation resolves.

func (*SendFuture) OnComplete

func (f *SendFuture) OnComplete(fn func(error))

OnComplete registers a callback invoked asynchronously when the send resolves.

type SendHandler

type SendHandler func(SendCompletion)

SendHandler is invoked when a send operation completes.

type Span

type Span interface {
	End(err error)
	AddEvent(name string, attrs ...TraceAttribute)
	RecordError(err error)
}

Span records dispatcher lifecycle, events, and errors for tracing systems.

type Stats

type Stats struct {
	SendPosted     uint64
	SendCompleted  uint64
	SendErrored    uint64
	ReceivePosted  uint64
	ReceiveMatched uint64
	ReceiveErrored uint64
}

Stats contains counters for client operations.

type StructuredLogger

type StructuredLogger interface {
	Debugw(msg string, keyvals ...any)
}

StructuredLogger emits key/value pairs for structured logging backends.

type TraceAttribute

type TraceAttribute struct {
	Key   string
	Value any
}

TraceAttribute represents a tracing attribute attached to dispatcher spans or events.

type Tracer

type Tracer interface {
	StartSpan(name string, attrs ...TraceAttribute) Span
}

Tracer starts spans that wrap dispatcher activity.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL