channel

package module
v5.0.15 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 22, 2026 License: Apache-2.0 Imports: 30 Imported by: 0

README

Channel

channel is a binary message framework. It includes the following features:

  • Supports registering message type handlers
  • Supports request/response pattern as well as unidirectional message streams
  • Allows plugging observers to enable metrics and other use cases
  • Async or sync patterns
  • Protobufs supported but not required

Documentation

Overview

Example
package main

import (
	"fmt"
	"time"

	"github.com/openziti/channel/v5"
	"github.com/openziti/identity"
	"github.com/openziti/transport/v2/tcp"
)

// main demonstrates the different ways a message can be sent on a channel. It
// parses a TCP address, builds a classic dialer for that address, creates a
// single-underlay channel from the dialer, and then sends the same message
// using each send style in turn. Any error aborts the example with a panic.
func main() {
	// Parse the address of the peer to dial.
	addr, err := tcp.AddressParser{}.Parse("tcp:localhost:6565")
	if err != nil {
		panic(err)
	}
	// The dialer is configured with the identity to dial as and the endpoint to dial.
	dialId := &identity.TokenId{Token: "echo-client"}
	underlayFactory := channel.NewClassicDialer(channel.DialerConfig{Identity: dialId, Endpoint: addr})

	// No bind handler and no options are supplied (both nil).
	ch, err := channel.NewSingleChannel("echo-test", underlayFactory, nil, nil)
	if err != nil {
		panic(err)
	}

	// Build the message that is reused by every send below.
	helloMessageType := int32(256)
	msg := channel.NewMessage(helloMessageType, []byte("hello!"))

	// Can send the message on the channel. The call will return once the message is queued.
	if err := ch.Send(msg); err != nil {
		panic(err)
	}

	// Can also have the message send itself on the channel.
	if err := msg.Send(ch); err != nil {
		panic(err)
	}

	// Can set a timeout before sending. If the message can't be queued before the timeout, an error will be returned.
	// If the timeout expires before the message can be sent, the message won't be sent.
	if err := msg.WithTimeout(time.Second).Send(ch); err != nil {
		panic(err)
	}

	// Can set a timeout before sending and wait for the message to be written to the wire. If the timeout expires
	// before the message is sent, the message won't be sent and a TimeoutError will be returned.
	if err := msg.WithTimeout(time.Second).SendAndWaitForWire(ch); err != nil {
		panic(err)
	}

	// Can set a timeout before sending and waiting for a reply message. If the timeout expires before the message is
	// sent, the message won't be sent and a TimeoutError will be returned. If the timeout expires before the reply
	// arrives, a TimeoutError will be returned.
	reply, err := msg.WithTimeout(time.Second).SendForReply(ch)
	if err != nil {
		panic(err)
	}
	// Print the body of the reply message as text.
	fmt.Println(string(reply.Body))
}

Index

Examples

Constants

View Source
const (
	DefaultOutstandingConnects = 16
	DefaultQueuedConnects      = 1
	DefaultConnectTimeout      = 5 * time.Second

	MinQueuedConnects      = 1
	MinOutstandingConnects = 1
	MinConnectTimeout      = 30 * time.Millisecond

	MaxQueuedConnects      = 5000
	MaxOutstandingConnects = 1000
	MaxConnectTimeout      = 60000 * time.Millisecond

	// DefaultOutQueueSize is the default capacity of the outgoing message queue.
	DefaultOutQueueSize = 4
)

Connection and queue size defaults, minimums, and maximums.

View Source
const (
	DefaultHeartbeatSendInterval  = 10 * time.Second
	DefaultHeartbeatCheckInterval = time.Second
	DefaultHeartbeatTimeout       = 30 * time.Second
)

Heartbeat timing defaults.

View Source
const (
	ConnectionIdHeader              = 0
	ReplyForHeader                  = 1
	ResultSuccessHeader             = 2
	HelloRouterAdvertisementsHeader = 3
	HelloVersionHeader              = 4
	HeartbeatHeader                 = 5
	HeartbeatResponseHeader         = 6
	TypeHeader                      = 7
	IdHeader                        = 8
	IsGroupedHeader                 = 9
	GroupSecretHeader               = 10
	IsFirstGroupConnection          = 11
	UnderlayTypeHeader              = 12

	// Headers in the range 128-255 inclusive will be reflected when creating replies
	ReflectedHeaderBitMask = 1 << 7
	MaxReflectedHeader     = (1 << 8) - 1
)

Message header notes:

  • 0-127 reserved for channel
  • 128-255 reserved for headers that need to be reflected back to sender on responses
  • 128 is used for a message UUID for tracing
  • 1000-1099 reserved for edge messages
  • 1100-1199 is reserved for control plane messages
  • 2000-2500 is reserved for xgress messages
  • 2000-2255 is reserved for xgress implementation headers
View Source
const (
	ContentTypeHelloType           = 0
	ContentTypePingType            = 1
	ContentTypeResultType          = 2
	ContentTypeLatencyType         = 3
	ContentTypeLatencyResponseType = 4
	ContentTypeHeartbeat           = 5
	ContentTypeRaw                 = 6
)

Built-in message content types used by the channel protocol.

View Source
const AnyContentType = -1

AnyContentType is a wildcard content type used to register a fallback receive handler.

View Source
const BadMagicNumberError = stringError("protocol error: invalid header")
View Source
const DECODER = "channel"

DECODER is the decoder name used in trace output for built-in channel messages.

View Source
const DecoderFieldName = "__decoder__"

DecoderFieldName is the JSON key for the decoder name in trace output.

View Source
const (

	// DefaultUnderlayType is used when an underlay's type header is missing or not in the valid types list.
	DefaultUnderlayType = "default"
)
View Source
const HelloSequence = -1

HelloSequence is the sequence number used for hello/handshake messages.

View Source
const MessageFieldName = "__message__"

MessageFieldName is the JSON key for the message type name in trace output.

Variables

View Source
var DefaultBackoffConfig = BackoffConfig{
	BaseDelay:         2 * time.Second,
	MaxDelay:          time.Minute,
	MinStableDuration: 30 * time.Second,
}

DefaultBackoffConfig provides sensible defaults for BackoffConfig.

View Source
var ErrClosed = errors.New("channel closed")

ErrClosed is returned when a read is attempted on a closed ReadAdapter.

View Source
var ListenerClosedError = listenerClosedError{}

ListenerClosedError is returned when an operation is attempted on a closed listener.

Functions

func AddReceiveHandlers added in v5.0.9

func AddReceiveHandlers(binding Binding, handlers ...ContentTypeReceiver)

AddReceiveHandlers registers receive handlers which provide their own content type.

func DecodeString

func DecodeString(t string, b []byte) ([]byte, string, error)

DecodeString reads a length-prefixed string from a byte slice, returning the remaining bytes and the string.

func DecodeStringSlice

func DecodeStringSlice(b []byte) ([]string, error)

DecodeStringSlice decodes a binary-encoded string slice.

func DecodeStringToStringMap

func DecodeStringToStringMap(b []byte) (map[string]string, error)

DecodeStringToStringMap decodes a binary-encoded string-to-string map.

func DecodeU32ToBytesMap

func DecodeU32ToBytesMap(b []byte) (map[uint32][]byte, error)

DecodeU32ToBytesMap decodes a binary-encoded uint32-to-bytes map.

func EncodeStringSlice

func EncodeStringSlice(strSlice []string) []byte

EncodeStringSlice encodes a string slice into a length-prefixed binary format.

func EncodeStringToStringMap

func EncodeStringToStringMap(m map[string]string) []byte

EncodeStringToStringMap encodes a string-to-string map into a length-prefixed binary format.

func EncodeU32ToBytesMap

func EncodeU32ToBytesMap(m map[uint32][]byte) []byte

EncodeU32ToBytesMap encodes a uint32-to-bytes map into a length-prefixed binary format.

func GetRetryVersion

func GetRetryVersion(err error) (uint32, bool)

GetRetryVersion extracts a compatible protocol version from an UnsupportedVersionError, if present.

func GetUnderlayType

func GetUnderlayType(underlay Underlay) string

GetUnderlayType returns the underlay type from the headers. If no type header is present, returns DefaultUnderlayType.

func IsNonRetryable added in v5.0.13

func IsNonRetryable(err error) bool

IsNonRetryable reports whether err (or any error it wraps) is a NonRetryableError.

func IsTimeout

func IsTimeout(err error) bool

IsTimeout returns true if the error is or wraps a TimeoutError.

func MarshalV2

func MarshalV2(m *Message) ([]byte, error)

MarshalV2 converts a *Message into a block of V2 wire format data.

func MarshalV2WithRaw

func MarshalV2WithRaw(m *Message) ([]byte, error)

MarshalV2WithRaw marshals a message using V2 format, except for ContentTypeRaw messages which are sent as raw body bytes without framing.

func NewClassicListenerF

func NewClassicListenerF(identity *identity.TokenId, endpoint transport.Address, config ListenerConfig, f func(underlay Underlay)) (io.Closer, error)

NewClassicListenerF creates a classic listener that calls f for each accepted underlay. f is invoked after the hello is acknowledged; use NewClassicListenerWithAcceptor for an acceptor that needs to record state before the acknowledgement.

func NewClassicListenerWithAcceptor added in v5.0.11

func NewClassicListenerWithAcceptor(identity *identity.TokenId, endpoint transport.Address, config ListenerConfig, acceptor HelloAcceptor) (io.Closer, error)

NewClassicListenerWithAcceptor creates a classic listener that hands each accepted underlay to the given HelloAcceptor, which controls when the hello is acknowledged.

func NewErrorContext

func NewErrorContext(err error) context.Context

func NewWSListener

func NewWSListener(peer transport.Conn) *wsListener

NewWSListener creates a WebSocket listener from an existing transport connection.

func NextConnectionId

func NextConnectionId() (string, error)

NextConnectionId returns a new unique connection identifier.

func RetryToQueues

func RetryToQueues(queues ...*MessageQueue) func(string, Sendable) bool

RetryToQueues returns a HandleTxFailed function that tries to push the sendable to each queue in order, returning true if any accepts it without blocking.

func SetLoggerFor added in v5.0.12

func SetLoggerFor(f func(name string) *slog.Logger)

SetLoggerFor installs the resolver that channel uses for events scoped to a channel's LogicalName (e.g. "ctrl", "link", "agent"), letting an embedding application route channel logging through its own slog setup and control verbosity per channel type. When no resolver is installed, channel falls back to a default logger that forwards to pfxlog/logrus, preserving the logging behavior channel had before this seam existed.

Set this once during startup, before any channels are created. Each channel resolves and caches its logger at creation, so installing a resolver after channels already exist applies only to channels created afterward.

func WriteUnknownVersionResponse

func WriteUnknownVersionResponse(writer io.Writer)

WriteUnknownVersionResponse sends a version negotiation response listing locally supported versions.

Types

type AsyncFunctionReceiveAdapter added in v5.0.9

type AsyncFunctionReceiveAdapter struct {
	Type    int32
	Handler ReceiveHandlerF
}

AsyncFunctionReceiveAdapter is a ContentTypeReceiver which handles each message in a new goroutine, for handlers which may block or run long. Receive handlers are otherwise invoked on the channel's receive loop.

func (*AsyncFunctionReceiveAdapter) ContentType added in v5.0.9

func (adapter *AsyncFunctionReceiveAdapter) ContentType() int32

ContentType returns the message content type this handler processes.

func (*AsyncFunctionReceiveAdapter) HandleReceive added in v5.0.9

func (adapter *AsyncFunctionReceiveAdapter) HandleReceive(m *Message, ch Channel)

HandleReceive dispatches the message to the wrapped handler in a new goroutine.

type BackoffConfig

type BackoffConfig struct {
	// BaseDelay is the initial delay after the first failure (default: 2s).
	BaseDelay time.Duration
	// MaxDelay is the maximum delay between attempts (default: 60s).
	MaxDelay time.Duration
	// MinStableDuration is how long a connection must live to be considered stable (default: 30s).
	// An underlay that closes before living this long is treated as short-lived and counts as
	// a failure for backoff purposes.
	MinStableDuration time.Duration
	// MinDialInterval is the minimum time between dial attempts (default: 0, no limit).
	// If a dial is requested before this interval has elapsed since the last dial,
	// the goroutine sleeps until the interval is satisfied.
	MinDialInterval time.Duration
}

BackoffConfig controls exponential backoff behavior.

type BackoffDialPolicy

type BackoffDialPolicy struct {
	Dialer  DialUnderlayFactory
	Backoff BackoffConfig
	// contains filtered or unexported fields
}

BackoffDialPolicy wraps a DialUnderlayFactory with exponential backoff retry logic. Failed dials and short-lived connections (reported via UnderlayClosed) count as failures; a connection that closes after a stable lifetime, or one that has demonstrably outlived MinStableDuration, resets the failure count. Accounting is intentionally global for the policy because all underlays dial the same destination. A stable underlay is evidence that the destination is not globally flapping.

func NewBackoffDialPolicy

func NewBackoffDialPolicy(dialer DialUnderlayFactory) *BackoffDialPolicy

NewBackoffDialPolicy creates a BackoffDialPolicy with default configuration.

func NewBackoffDialPolicyWithConfig

func NewBackoffDialPolicyWithConfig(dialer DialUnderlayFactory, config BackoffConfig) *BackoffDialPolicy

NewBackoffDialPolicyWithConfig creates a BackoffDialPolicy with the given configuration.

func (*BackoffDialPolicy) ConsecutiveFailures

func (self *BackoffDialPolicy) ConsecutiveFailures() uint32

ConsecutiveFailures returns the current consecutive failure count.

func (*BackoffDialPolicy) Dial

func (self *BackoffDialPolicy) Dial(underlayType string, connectionId string, groupSecret []byte, isFirst bool, connectTimeout time.Duration, cancel <-chan struct{}) (Underlay, error)

Dial attempts to create a new underlay, applying exponential backoff if previous attempts failed. When isFirst is true the dial (re)establishes the group: it advances the iteration, derives a fresh iteration-suffixed connection id, and sets the IsFirstGroupConnection header so the remote MultiListener creates a new channel. A fresh id avoids attaching to a still-closing channel of the prior iteration during a loss/reconnect race. Subsequent (isFirst == false) dials reuse the current iteration id with no header, so they attach to the established group.

func (*BackoffDialPolicy) LastDialTime

func (self *BackoffDialPolicy) LastDialTime() time.Time

LastDialTime returns the time of the most recent dial attempt.

func (*BackoffDialPolicy) UnderlayClosed added in v5.0.7

func (self *BackoffDialPolicy) UnderlayClosed(underlayType string, lifetime time.Duration)

UnderlayClosed records the lifetime of a closed underlay. A short-lived underlay (lifetime < MinStableDuration) counts as a failure for backoff purposes; a stable one resets the failure count.

type BaseSendListener

type BaseSendListener struct{}

BaseSendListener is a type that may be used to provide default methods for SendListener implementations.

func (BaseSendListener) NotifyAfterWrite

func (BaseSendListener) NotifyAfterWrite()

func (BaseSendListener) NotifyBeforeWrite

func (BaseSendListener) NotifyBeforeWrite()

func (BaseSendListener) NotifyErr

func (BaseSendListener) NotifyErr(error)

func (BaseSendListener) NotifyQueued

func (BaseSendListener) NotifyQueued()

type BaseSendable

type BaseSendable struct{}

BaseSendable is a type that may be used to provide default methods for Sendable implementations.

func (BaseSendable) Context

func (BaseSendable) Context() context.Context

func (BaseSendable) Msg

func (BaseSendable) Msg() *Message

func (BaseSendable) ReplyReceiver

func (BaseSendable) ReplyReceiver() ReplyReceiver

func (BaseSendable) SendListener

func (BaseSendable) SendListener() SendListener

type BindHandler

type BindHandler interface {
	BindChannel(binding Binding) error
}

BindHandler is a handler that configures a Channel via its Binding during setup.

func BindHandlers

func BindHandlers(handlers ...BindHandler) BindHandler

BindHandlers combines multiple BindHandlers into one, running them sequentially. Returns the first error encountered, or nil if all succeed.

type BindHandlerF

type BindHandlerF func(binding Binding) error

BindHandlerF is the function form of BindHandler.

func (BindHandlerF) BindChannel

func (f BindHandlerF) BindChannel(binding Binding) error

type Binder added in v5.0.6

type Binder struct {
	// contains filtered or unexported fields
}

Binder is an installed bind handler, produced by MakeBinder or MakeTypedBinder and supplied to a channel via Config.Binder. It is opaque: consumers obtain one from the factory functions rather than constructing it directly.

func MakeBinder

func MakeBinder(handler BindHandler) Binder

MakeBinder creates a Binder from a BindHandler.

func MakeTypedBinder

func MakeTypedBinder[S Senders](senders S, handler TypedBindHandler[S]) Binder

MakeTypedBinder creates a Binder from a TypedBindHandler and senders.

type Binding

type Binding interface {
	AddReceiveHandler(contentType int32, h ReceiveHandler)
	AddReceiveHandlerF(contentType int32, h ReceiveHandlerF)
	AddMsgReceiveHandler(contentType int32, h MsgReceiveHandler)
	AddMsgReceiveHandlerF(contentType int32, h MsgReceiveHandlerF)
	AddPeekHandler(h PeekHandler)
	AddTransformHandler(h TransformHandler)
	AddErrorHandler(h ErrorHandler)
	AddCloseHandler(h CloseHandler)
	SetUserData(data any)
	GetUserData() any
	GetChannel() Channel
}

Binding is used to add handlers to Channel.

NOTE: It is intended that the Add* methods are used at initial channel setup, and not invoked on an in-service Channel. The Binding should not be retained once the channel setup is complete.

type Channel

type Channel interface {
	Identity
	SetLogicalName(logicalName string)
	Sender
	UnderlayAcceptor
	io.Closer
	IsClosed() bool
	Underlay() Underlay
	Headers() map[int32][]byte
	GetTimeSinceLastRead() time.Duration
	GetUserData() interface{}
	// GetSenders returns the Senders for this channel. Consumers that need access to
	// priority-specific senders can type-assert the result to their concrete type:
	//
	//     mySenders := ch.GetSenders().(*MyCustomSenders)
	//     mySenders.GetHighPrioritySender().Send(msg)
	//
	// This replaces v4's MultiChannel.GetUnderlayHandler() pattern.
	GetSenders() Senders
	GetUnderlays() []Underlay
	GetUnderlayCountsByType() map[string]int
}

Channel represents an asynchronous, message-passing framework, designed to sit on top of an underlay.

func NewChannel

func NewChannel(config *Config) (Channel, error)

NewChannel creates a multi-underlay channel from the given configuration. The config must include Senders, a MessageSourceProvider, and an initial Underlay. An optional Binder is called to register handlers before the first underlay starts processing.

func NewSingleChannel

func NewSingleChannel(logicalName string, underlayFactory UnderlayFactory, bindHandler BindHandler, options *Options) (Channel, error)

NewSingleChannel dials the factory and creates a simple channel with a single underlay, single sender, and the given bind handler.

func NewSingleChannelWithUnderlay

func NewSingleChannelWithUnderlay(logicalName string, underlay Underlay, bindHandler BindHandler, options *Options) (Channel, error)

NewSingleChannelWithUnderlay creates a simple channel from an existing underlay, with a single sender and bind handler. Use this when you already have a connected underlay (e.g. from a listener).

type CloseHandler

type CloseHandler interface {
	HandleClose(ch Channel)
}

CloseHandler is notified when a channel closes.

type CloseHandlerF

type CloseHandlerF func(ch Channel)

CloseHandlerF is the function form of CloseHandler.

func (CloseHandlerF) HandleClose

func (self CloseHandlerF) HandleClose(ch Channel)

type CloseNotifier

type CloseNotifier struct {
	// contains filtered or unexported fields
}

CloseNotifier provides a one-shot close signal. Calling NotifyClosed closes the internal channel, unblocking any goroutines waiting on GetCloseNotify.

func NewCloseNotifier

func NewCloseNotifier() *CloseNotifier

NewCloseNotifier creates a new CloseNotifier.

func (*CloseNotifier) GetCloseNotify

func (self *CloseNotifier) GetCloseNotify() <-chan struct{}

func (*CloseNotifier) NotifyClosed

func (self *CloseNotifier) NotifyClosed()

type ClosedError

type ClosedError struct{}

ClosedError indicates an operation was attempted on a closed channel.

func (ClosedError) Error

func (ClosedError) Error() string

type Config

type Config struct {
	LogicalName string
	Options     *Options
	Binder      Binder
	Underlay    Underlay

	InjectUnderlayTypeIntoMessages bool

	// ValidUnderlayTypes lists the recognized underlay type strings for this channel.
	// Incoming underlays with types not in this list are mapped to DefaultUnderlayType.
	// If nil, any type string is accepted as-is.
	ValidUnderlayTypes []string

	Senders               Senders
	MessageSourceProvider MessageSourceProvider
	DialPolicy            DialPolicy
	Constraints           map[string]UnderlayConstraint
	MinTotalUnderlays     int

	// ConstraintStartupDelay delays the first constraint check after channel creation.
	// Useful when the initial underlay needs time to stabilize before additional
	// underlays are dialed.
	ConstraintStartupDelay time.Duration

	// UnderlayEventListeners are notified when underlays are added or removed.
	// Use this to react to underlay changes (e.g., tracking connection counts,
	// updating "has dedicated underlay" flags, calling change callbacks).
	UnderlayEventListeners []UnderlayEventListener
}

Config holds all the parameters needed to create a new Channel.

type ConnectOptions

type ConnectOptions struct {
	MaxQueuedConnects      int
	MaxOutstandingConnects int
	ConnectTimeout         time.Duration
}

ConnectOptions controls connection acceptance limits and timeouts.

func DefaultConnectOptions

func DefaultConnectOptions() ConnectOptions

DefaultConnectOptions returns ConnectOptions with sensible defaults.

func (*ConnectOptions) Validate

func (co *ConnectOptions) Validate() error

Validate checks that all connect options are within acceptable bounds.

type ConnectionHandler

type ConnectionHandler interface {
	HandleConnection(hello *Hello, certificates []*x509.Certificate) error
}

ConnectionHandler handles new incoming connections during the hello/handshake phase.

type ContentTypeReceiver added in v5.0.9

type ContentTypeReceiver interface {
	ContentType() int32
	ReceiveHandler
}

ContentTypeReceiver is a receive handler which reports the content type it handles. It allows handlers to be registered without separately specifying the content type, using AddReceiveHandlers.

type DatagramMessageStrategy

type DatagramMessageStrategy PacketMessageProducer

DatagramMessageStrategy is a MessageStrategy that uses MarshalV2WithRaw for marshalling and a custom PacketMessageProducer for reading.

func (DatagramMessageStrategy) GetMarshaller

func (self DatagramMessageStrategy) GetMarshaller() MessageMarshaller

func (DatagramMessageStrategy) GetPacketProducer

func (self DatagramMessageStrategy) GetPacketProducer() PacketMessageProducer

func (DatagramMessageStrategy) GetStreamProducer

func (self DatagramMessageStrategy) GetStreamProducer() StreamMessageProducer

type DatagramUnderlay

type DatagramUnderlay struct {
	// contains filtered or unexported fields
}

DatagramUnderlay is an Underlay implementation for packet-oriented (datagram) transports like DTLS.

func (*DatagramUnderlay) Certificates

func (self *DatagramUnderlay) Certificates() []*x509.Certificate

func (*DatagramUnderlay) Close

func (self *DatagramUnderlay) Close() error

func (*DatagramUnderlay) ConnectionId

func (self *DatagramUnderlay) ConnectionId() string

func (*DatagramUnderlay) CreatedAt added in v5.0.7

func (self *DatagramUnderlay) CreatedAt() time.Time

func (*DatagramUnderlay) GetLocalAddr

func (self *DatagramUnderlay) GetLocalAddr() net.Addr

func (*DatagramUnderlay) GetRemoteAddr

func (self *DatagramUnderlay) GetRemoteAddr() net.Addr

func (*DatagramUnderlay) Headers

func (self *DatagramUnderlay) Headers() map[int32][]byte

func (*DatagramUnderlay) Id

func (self *DatagramUnderlay) Id() string

func (*DatagramUnderlay) IsClosed

func (self *DatagramUnderlay) IsClosed() bool

func (*DatagramUnderlay) Label

func (self *DatagramUnderlay) Label() string

func (*DatagramUnderlay) LogicalName

func (self *DatagramUnderlay) LogicalName() string

func (*DatagramUnderlay) Rx

func (self *DatagramUnderlay) Rx() (*Message, error)

func (*DatagramUnderlay) SetWriteDeadline

func (self *DatagramUnderlay) SetWriteDeadline(deadline time.Time) error

func (*DatagramUnderlay) SetWriteTimeout

func (self *DatagramUnderlay) SetWriteTimeout(duration time.Duration) error

func (*DatagramUnderlay) Tx

func (self *DatagramUnderlay) Tx(m *Message) error

type Decoder

type Decoder struct{}

Decoder decodes built-in channel message types (hello, ping, result, latency, heartbeat) for tracing.

func (Decoder) Decode

func (d Decoder) Decode(msg *Message) ([]byte, bool)

type DialPolicy

type DialPolicy interface {
	Dial(underlayType string, connectionId string, groupSecret []byte, isFirst bool, connectTimeout time.Duration, cancel <-chan struct{}) (Underlay, error)
	UnderlayClosed(underlayType string, lifetime time.Duration)
}

DialPolicy controls how additional underlays are dialed for a multi-underlay channel. The channel passes all necessary context directly, so implementations are self-contained.

isFirst is true when the dial is (re)establishing the group's first underlay - either the initial connection or a reconnect after all underlays were lost. A first connection must set the IsFirstGroupConnection header so the remote MultiListener creates a new channel rather than rejecting the underlay; subsequent underlays attach to the existing group.

UnderlayClosed is invoked by the channel whenever an underlay is removed, with how long the underlay lived. This lets the policy judge connection stability directly, e.g. for flap detection, rather than inferring it from dial timing.

type DialUnderlayFactory

type DialUnderlayFactory interface {
	UnderlayFactory
	CreateWithHeaders(timeout time.Duration, headers map[int32][]byte) (Underlay, error)
}

DialUnderlayFactory extends UnderlayFactory with the ability to pass custom headers during creation. Used by DialPolicy to include connection metadata (type, connection ID, group secret) when dialing.

func NewClassicDialer

func NewClassicDialer(cfg DialerConfig) DialUnderlayFactory

NewClassicDialer creates a DialUnderlayFactory that dials using the classic transport protocol.

type DialerConfig

type DialerConfig struct {
	Identity        *identity.TokenId
	Endpoint        transport.Address
	LocalBinding    string
	Headers         map[int32][]byte
	MessageStrategy MessageStrategy
	TransportConfig transport.Configuration
	// HelloHeaderProvider, if set, is invoked after the transport connection is established and
	// before the hello is sent, to compute additional hello headers from the peer's certificates.
	HelloHeaderProvider HelloHeaderProvider
}

DialerConfig holds configuration for creating a classic dialer.

type Envelope

type Envelope interface {
	// WithTimeout returns a TimeoutEnvelope with a context using the given timeout
	WithTimeout(duration time.Duration) TimeoutEnvelope

	// WithContext returns a TimeoutEnvelope which uses the given context for timeout/cancellation
	WithContext(c context.Context) TimeoutEnvelope

	// Send sends the envelope on the given Channel
	Send(sender Sender) error

	// ReplyTo allows setting the reply header in a fluent style
	ReplyTo(msg *Message) Envelope

	// ToSendable converts the Envelope into a Sendable, which can be submitted to a Channel for sending
	ToSendable() Sendable
}

Envelope allows setting message context and timeouts. Message is an Envelope (as well as a Sendable).

func NewErrorEnvelope

func NewErrorEnvelope(err error) Envelope

type ErrorHandler

type ErrorHandler interface {
	HandleError(err error, ch Channel)
}

ErrorHandler handles errors that occur during channel operations.

type ErrorHandlerF

type ErrorHandlerF func(err error, ch Channel)

ErrorHandlerF is the function form of ErrorHandler.

func (ErrorHandlerF) HandleError

func (self ErrorHandlerF) HandleError(err error, ch Channel)

type Factory

type Factory func(underlay Underlay, closeCallback func()) (Channel, error)

Factory creates a new multi-underlay Channel from the first incoming underlay. The closeCallback should be called when the channel is closed to remove it from the listener.

type Headers

type Headers map[int32][]byte

Headers is a map of integer keys to byte-slice values, used for message metadata.

func (Headers) GetBoolHeader

func (self Headers) GetBoolHeader(key int32) (bool, bool)

func (Headers) GetByteHeader

func (self Headers) GetByteHeader(key int32) (byte, bool)

func (Headers) GetStringHeader

func (self Headers) GetStringHeader(key int32) (string, bool)

func (Headers) GetStringSliceHeader

func (self Headers) GetStringSliceHeader(key int32) ([]string, bool, error)

func (Headers) GetStringToStringMapHeader

func (self Headers) GetStringToStringMapHeader(key int32) (map[string]string, bool, error)

func (Headers) GetU32ToBytesMapHeader

func (self Headers) GetU32ToBytesMapHeader(key int32) (map[uint32][]byte, bool, error)

func (Headers) GetUint16Header

func (self Headers) GetUint16Header(key int32) (uint16, bool)

func (Headers) GetUint32Header

func (self Headers) GetUint32Header(key int32) (uint32, bool)

func (Headers) GetUint64Header

func (self Headers) GetUint64Header(key int32) (uint64, bool)

func (Headers) PutBoolHeader

func (self Headers) PutBoolHeader(key int32, value bool)

func (Headers) PutByteHeader

func (self Headers) PutByteHeader(key int32, value byte)

func (Headers) PutStringHeader

func (self Headers) PutStringHeader(key int32, value string)

func (Headers) PutStringSliceHeader

func (self Headers) PutStringSliceHeader(key int32, s []string)

func (Headers) PutStringToStringMapHeader

func (self Headers) PutStringToStringMapHeader(key int32, m map[string]string)

func (Headers) PutU32ToBytesMapHeader

func (self Headers) PutU32ToBytesMapHeader(key int32, m map[uint32][]byte)

func (Headers) PutUint16Header

func (self Headers) PutUint16Header(key int32, value uint16)

func (Headers) PutUint32Header

func (self Headers) PutUint32Header(key int32, value uint32)

func (Headers) PutUint64Header

func (self Headers) PutUint64Header(key int32, value uint64)

type HeartbeatCallback

type HeartbeatCallback interface {
	HeartbeatTx(ts int64)
	HeartbeatRx(ts int64)
	HeartbeatRespTx(ts int64)
	HeartbeatRespRx(ts int64)
	CheckHeartBeat()
}

HeartbeatCallback provides an interface that is notified when various heartbeat events take place.

type HeartbeatControl added in v5.0.11

type HeartbeatControl interface {
	// UpdateIntervals changes the heartbeat send and check intervals. The send
	// interval takes effect on the next outbound message; the check interval on
	// the next pulse of the heartbeat loop.
	UpdateIntervals(sendInterval, checkInterval time.Duration)
}

HeartbeatControl retunes the heartbeat timing of a running channel. It is returned by ConfigureHeartbeat so a caller can change the send and check intervals without rebuilding the channel, for example when applying updated configuration to an established link. UpdateIntervals is meant to be called from a single goroutine per channel.

func ConfigureHeartbeat

func ConfigureHeartbeat(binding Binding, heartbeatInterval time.Duration, checkInterval time.Duration, cb HeartbeatCallback) HeartbeatControl

ConfigureHeartbeat sets up heartbeats on the given channel. It assumes that an equivalent setup happens on the other side of the channel.

When possible, heartbeats will be sent on existing traffic. When a heartbeat is due to be sent, the next message sent will include a heartbeat header. If no message is sent by the time the checker runs on checkInterval, a standalone heartbeat message will be sent.

Similarly, when a message with a heartbeat header is received, the next sent message will have a header set with the heartbeat response. If no message is sent within a few milliseconds, a standalone heartbeat response will be sent.

The returned HeartbeatControl can be used to retune the send and check intervals later without rebuilding the channel.

type HeartbeatOptions

type HeartbeatOptions struct {
	SendInterval             time.Duration `json:"sendInterval"`
	CheckInterval            time.Duration `json:"checkInterval"`
	CloseUnresponsiveTimeout time.Duration `json:"closeUnresponsiveTimeout"`
	// contains filtered or unexported fields
}

HeartbeatOptions configures heartbeat send interval, check interval, and unresponsive timeout.

func DefaultHeartbeatOptions

func DefaultHeartbeatOptions() *HeartbeatOptions

DefaultHeartbeatOptions returns HeartbeatOptions with sensible defaults.

func LoadHeartbeatOptions

func LoadHeartbeatOptions(data map[interface{}]interface{}) (*HeartbeatOptions, error)

LoadHeartbeatOptions parses HeartbeatOptions from a configuration map.

func (*HeartbeatOptions) GetDuration

func (self *HeartbeatOptions) GetDuration(name string) (*time.Duration, error)

GetDuration parses a named duration value from the source configuration map.

type Hello

type Hello struct {
	IdToken string
	Headers map[int32][]byte
}

Hello is the initial handshake message exchanged when a connection is established.

func UnmarshalHello

func UnmarshalHello(message *Message) *Hello

UnmarshalHello extracts a Hello from a received Message.

type HelloAcceptor added in v5.0.11

type HelloAcceptor interface {
	AcceptUnderlay(underlay Underlay, ackHello func() error)
}

HelloAcceptor takes ownership of an underlay whose hello has been received but not yet acknowledged. Implementations must call ackHello exactly once before using the underlay for application traffic, and must close the underlay if they do not accept it (including when ackHello returns an error).

Deferring the ack lets an acceptor record state keyed off the underlay before the peer is released by the acknowledgement. MultiListener uses this to register a group before acking, so a racing subsequent underlay for the same group cannot reach the listener before the group is known.

func AsHelloAcceptor added in v5.0.11

func AsHelloAcceptor(acceptor UnderlayAcceptor) HelloAcceptor

AsHelloAcceptor adapts a legacy UnderlayAcceptor (which receives an already-acknowledged underlay) into a HelloAcceptor by acknowledging the hello before handing the underlay off. Use it for single-underlay acceptors, which don't need to reserve state before the ack; multi-underlay acceptors should implement HelloAcceptor directly.

type HelloAcceptorF added in v5.0.11

type HelloAcceptorF func(underlay Underlay)

HelloAcceptorF adapts a plain accept function into a HelloAcceptor that acknowledges the hello and then hands off the underlay: the original single-phase behavior, with no reservation before the ack.

func (HelloAcceptorF) AcceptUnderlay added in v5.0.11

func (f HelloAcceptorF) AcceptUnderlay(underlay Underlay, ackHello func() error)

AcceptUnderlay acknowledges the hello and, on success, passes the underlay to the wrapped function. If the acknowledgement fails the underlay is closed.

type HelloHeaderProvider added in v5.0.13

type HelloHeaderProvider func(peerCertificates []*x509.Certificate, headers map[int32][]byte) error

HelloHeaderProvider adjusts the hello headers based on the peer's certificates. It is invoked after the transport connection is established (so the peer's certificates are available) but before the hello is sent, letting a dialer set hello headers that depend on the peer's verified identity. headers contains the headers already set on the hello (statically-configured and per-dial); the provider may add, modify or remove entries directly. Entries should be replaced rather than having their value slices edited in place, as values may be shared with the dialer's configuration. Returning an error aborts the dial, and that error is treated as non-retryable: the dialer does not open another connection to retry, since a provider that returns an error has made a deliberate decision about this specific peer.

type Identity

type Identity interface {
	// The Id used to represent the identity of this channel to lower-level resources.
	//
	Id() string

	// The LogicalName represents the purpose or usage of this channel (e.g. 'ctrl', 'mgmt', 'r/001', etc.). Usually used
	// by humans to understand the logical purpose of a channel.
	//
	LogicalName() string

	// The ConnectionId represents the identity of this Channel to internal API components ("instance identifier").
	// Usually used by the Channel framework to differentiate Channel instances.
	//
	ConnectionId() string

	// Certificates contains the identity certificates provided by the peer.
	//
	Certificates() []*x509.Certificate

	// Label constructs a consistently-formatted string used for context logging purposes, from the components above.
	//
	Label() string
}

Identity exposes the identifying information of a channel to callers and lower-level resources.

type ListenerConfig

type ListenerConfig struct {
	ConnectOptions
	Headers            map[int32][]byte
	HeadersF           func() map[int32][]byte
	TransportConfig    transport.Configuration
	PoolConfigurator   func(config *goroutines.PoolConfig)
	ConnectionHandlers []ConnectionHandler
	MessageStrategy    MessageStrategy
}

ListenerConfig holds configuration for creating a classic listener.

func DefaultListenerConfig

func DefaultListenerConfig() ListenerConfig

DefaultListenerConfig returns a ListenerConfig with sensible defaults.

type Message

type Message struct {
	MessageHeader
	Body []byte
}

Message is a content-typed, sequenced message with headers and a body, suitable for wire transmission.

func NewHello

func NewHello(idToken string, attributes map[int32][]byte) *Message

NewHello creates a hello Message with the given identity token and attributes.

func NewMessage

func NewMessage(contentType int32, body []byte) *Message

NewMessage creates a new Message with the given content type and body.

func NewResult

func NewResult(success bool, message string) *Message

NewResult creates a result Message indicating success or failure with a text message.

func ReadV2

func ReadV2(peer io.Reader) (*Message, error)

ReadV2 reads a V2 message from the given reader and returns the unmarshalled message

func (*Message) Context

func (m *Message) Context() context.Context

func (*Message) Msg

func (m *Message) Msg() *Message

func (*Message) ReplyReceiver

func (m *Message) ReplyReceiver() ReplyReceiver

func (*Message) ReplyTo

func (m *Message) ReplyTo(o *Message) Envelope

func (*Message) Send

func (m *Message) Send(sender Sender) error

func (*Message) SendListener

func (m *Message) SendListener() SendListener

func (*Message) SetSequence

func (m *Message) SetSequence(seq int32)

func (*Message) String

func (m *Message) String() string

func (*Message) ToSendable

func (m *Message) ToSendable() Sendable

func (*Message) WithContext

func (m *Message) WithContext(c context.Context) TimeoutEnvelope

func (*Message) WithTimeout

func (m *Message) WithTimeout(duration time.Duration) TimeoutEnvelope

type MessageHeader

type MessageHeader struct {
	ContentType int32

	Headers Headers
	// contains filtered or unexported fields
}

MessageHeader contains the metadata for a channel message: content type, sequence, and headers.

func (*MessageHeader) GetBoolHeader

func (header *MessageHeader) GetBoolHeader(key int32) (bool, bool)

func (*MessageHeader) GetByteHeader

func (header *MessageHeader) GetByteHeader(key int32) (byte, bool)

func (*MessageHeader) GetStringHeader

func (header *MessageHeader) GetStringHeader(key int32) (string, bool)

func (*MessageHeader) GetStringSliceHeader

func (header *MessageHeader) GetStringSliceHeader(key int32) ([]string, bool, error)

func (*MessageHeader) GetStringToStringMapHeader

func (header *MessageHeader) GetStringToStringMapHeader(key int32) (map[string]string, bool, error)

func (*MessageHeader) GetU32ToBytesMapHeader

func (header *MessageHeader) GetU32ToBytesMapHeader(key int32) (map[uint32][]byte, bool, error)

func (*MessageHeader) GetUint16Header

func (header *MessageHeader) GetUint16Header(key int32) (uint16, bool)

func (*MessageHeader) GetUint32Header

func (header *MessageHeader) GetUint32Header(key int32) (uint32, bool)

func (*MessageHeader) GetUint64Header

func (header *MessageHeader) GetUint64Header(key int32) (uint64, bool)

func (*MessageHeader) IsReply

func (header *MessageHeader) IsReply() bool

func (*MessageHeader) IsReplyingTo

func (header *MessageHeader) IsReplyingTo(sequence int32) bool

func (*MessageHeader) PutBoolHeader

func (header *MessageHeader) PutBoolHeader(key int32, value bool)

func (*MessageHeader) PutByteHeader

func (header *MessageHeader) PutByteHeader(key int32, value byte)

func (*MessageHeader) PutStringHeader

func (header *MessageHeader) PutStringHeader(key int32, value string)

func (*MessageHeader) PutStringSliceHeader

func (header *MessageHeader) PutStringSliceHeader(key int32, s []string)

func (*MessageHeader) PutStringToStringMapHeader

func (header *MessageHeader) PutStringToStringMapHeader(key int32, m map[string]string)

func (*MessageHeader) PutU32ToBytesMapHeader

func (header *MessageHeader) PutU32ToBytesMapHeader(key int32, m map[uint32][]byte)

func (*MessageHeader) PutUint16Header

func (header *MessageHeader) PutUint16Header(key int32, value uint16)

func (*MessageHeader) PutUint32Header

func (header *MessageHeader) PutUint32Header(key int32, value uint32)

func (*MessageHeader) PutUint64Header

func (header *MessageHeader) PutUint64Header(key int32, value uint64)

func (*MessageHeader) ReplyFor

func (header *MessageHeader) ReplyFor() int32

func (*MessageHeader) Sequence

func (header *MessageHeader) Sequence() int32

type MessageMarshaller

type MessageMarshaller func(m *Message) ([]byte, error)

MessageMarshaller serializes a Message into bytes for transmission.

type MessageQueue

type MessageQueue struct {
	C      chan Sendable
	Sender Sender
}

MessageQueue pairs a Sender with its backing channel. Use this to create priority-based message routing: create one queue per priority level, then wire them to message sources using MakeSingleQueueMessageSource, MakeTwoQueueMessageSource, or MakeThreeQueueMessageSource.

func NewMessageQueue

func NewMessageQueue(ctx SenderContext, size int) *MessageQueue

NewMessageQueue creates a MessageQueue with the given buffer size.

type MessageSourceF

type MessageSourceF func(notifier *CloseNotifier) (Sendable, error)

MessageSourceF is a function that returns the next message to send on an underlay. It blocks until a message is available or the notifier signals closure.

func MakeSingleQueueMessageSource

func MakeSingleQueueMessageSource(closeNotify <-chan struct{}, q1 <-chan Sendable) MessageSourceF

MakeSingleQueueMessageSource returns a MessageSourceF that reads from one queue.

func MakeThreeQueueMessageSource

func MakeThreeQueueMessageSource(closeNotify <-chan struct{}, q1, q2, q3 <-chan Sendable) MessageSourceF

MakeThreeQueueMessageSource returns a MessageSourceF that reads from three queues with equal priority.

func MakeTwoQueueMessageSource

func MakeTwoQueueMessageSource(closeNotify <-chan struct{}, q1, q2 <-chan Sendable) MessageSourceF

MakeTwoQueueMessageSource returns a MessageSourceF that reads from two queues with equal priority.

type MessageSourceProvider

type MessageSourceProvider interface {
	GetMessageSource(underlayType string) MessageSourceF
}

MessageSourceProvider returns the message source for a given underlay type.

type MessageStrategy

type MessageStrategy interface {
	GetMarshaller() MessageMarshaller
	GetStreamProducer() StreamMessageProducer
	GetPacketProducer() PacketMessageProducer
}

MessageStrategy provides pluggable message serialization and deserialization.

type MsgReceiveHandler

type MsgReceiveHandler interface {
	HandleReceive(m *Message)
}

MsgReceiveHandler handles received messages without channel context.

type MsgReceiveHandlerF

type MsgReceiveHandlerF func(m *Message)

MsgReceiveHandlerF is the function form of MsgReceiveHandler.

func (MsgReceiveHandlerF) HandleReceive

func (self MsgReceiveHandlerF) HandleReceive(m *Message)

type MultiChannelOptions

type MultiChannelOptions struct {
	ConnectOptions
	WriteTimeout    time.Duration
	MessageStrategy MessageStrategy
}

MultiChannelOptions configures multi-underlay channels with connect, write, and message options.

type MultiListener

type MultiListener struct {
	// contains filtered or unexported fields
}

MultiListener routes incoming underlays to existing channels or creates new ones. Grouped underlays are matched by connection ID; ungrouped ones are passed to the fallback.

func NewMultiListener

func NewMultiListener(channelF Factory, fallback UngroupedChannelFallback) *MultiListener

NewMultiListener creates a MultiListener with the given channel factory and ungrouped fallback.

func (*MultiListener) AcceptUnderlay

func (self *MultiListener) AcceptUnderlay(underlay Underlay, ackHello func() error)

AcceptUnderlay routes an incoming underlay to an existing channel or creates a new one. It implements HelloAcceptor: for a grouped first connection it registers the group (reserving its id) before acknowledging the hello, so the ack - which releases the dialer to dial subsequent underlays - cannot precede the group being known. A subsequent underlay therefore always finds either the channel or a create-in-progress notifier and attaches, rather than racing group creation and being rejected.

func (*MultiListener) CloseChannel

func (self *MultiListener) CloseChannel(chId string)

CloseChannel removes the channel with the given ID from the listener's map.

type NonRetryableError added in v5.0.13

type NonRetryableError struct {
	// contains filtered or unexported fields
}

NonRetryableError marks an error as a deliberate, final dial failure that the dialer must not retry internally (in contrast to a protocol-version negotiation error, which is retried).

func (*NonRetryableError) Error added in v5.0.13

func (self *NonRetryableError) Error() string

func (*NonRetryableError) Unwrap added in v5.0.13

func (self *NonRetryableError) Unwrap() error

type Options

type Options struct {
	OutQueueSize int
	ConnectOptions
	WriteTimeout    time.Duration
	MessageStrategy MessageStrategy

	// Logger, when set, is the slog.Logger this channel uses for its lifecycle
	// events. It takes precedence over the resolver installed via SetLoggerFor,
	// letting an owner give each channel (or channel type) its own logger and level
	// control - e.g. an SDK context injecting its logger, or a router naming
	// its link vs ctrl channels distinctly. When nil, the channel falls back
	// to the SetLoggerFor resolver and then to the pfxlog default.
	Logger *slog.Logger `json:"-"`
}

Options configures channel behavior including queue sizes, timeouts, and message strategy.

func DefaultOptions

func DefaultOptions() *Options

DefaultOptions returns Options with sensible defaults.

func LoadOptions

func LoadOptions(data map[interface{}]interface{}) (*Options, error)

LoadOptions parses channel Options from a configuration map, starting from DefaultOptions.

func (*Options) Load added in v5.0.1

func (o *Options) Load(data map[interface{}]interface{}) error

Load applies configuration values from the given map onto the Options instance. This allows callers to adjust defaults before loading configuration.

func (Options) String

func (o Options) String() string

type PacketMessageProducer

type PacketMessageProducer func(b []byte) (*Message, error)

PacketMessageProducer reads a Message from a packet buffer.

type PeekHandler

type PeekHandler interface {
	Connect(ch Channel, remoteAddress string)
	Rx(m *Message, ch Channel)
	Tx(m *Message, ch Channel)
	Close(ch Channel)
}

PeekHandler observes messages as they flow through the channel without modifying them.

type ReadAdapter

type ReadAdapter struct {
	// contains filtered or unexported fields
}

ReadAdapter bridges push-based data delivery into an io.Reader interface with deadline support.

func NewReadAdapter

func NewReadAdapter(label string, channelDepth int) *ReadAdapter

NewReadAdapter creates a ReadAdapter with the given label and internal buffer depth.

func (*ReadAdapter) Close

func (self *ReadAdapter) Close()

func (*ReadAdapter) GetNext

func (self *ReadAdapter) GetNext() ([]byte, error)

GetNext blocks until data is available, the deadline expires, or the adapter is closed.

func (*ReadAdapter) PushData

func (self *ReadAdapter) PushData(data []byte) error

PushData enqueues data to be read. Blocks until the data is accepted or the adapter is closed.

func (*ReadAdapter) Read

func (self *ReadAdapter) Read(b []byte) (n int, err error)

func (*ReadAdapter) SetReadDeadline

func (self *ReadAdapter) SetReadDeadline(deadline time.Time) error

SetReadDeadline sets the deadline for the next GetNext/Read call. A zero value clears the deadline.

type ReadTimout

type ReadTimout struct{}

ReadTimout is returned when a read operation exceeds its deadline. Implements net.Error.

func (ReadTimout) Error

func (r ReadTimout) Error() string

func (ReadTimout) Temporary

func (r ReadTimout) Temporary() bool

func (ReadTimout) Timeout

func (r ReadTimout) Timeout() bool

type ReceiveHandler

type ReceiveHandler interface {
	HandleReceive(m *Message, ch Channel)
}

ReceiveHandler handles received messages for a specific content type.

type ReceiveHandlerF

type ReceiveHandlerF func(m *Message, ch Channel)

ReceiveHandlerF is the function form of ReceiveHandler.

func (ReceiveHandlerF) HandleReceive

func (self ReceiveHandlerF) HandleReceive(m *Message, ch Channel)

type ReconnectingDialerConfig

type ReconnectingDialerConfig struct {
	Identity          *identity.TokenId
	Endpoint          transport.Address
	LocalBinding      string
	Headers           map[int32][]byte
	TransportConfig   transport.Configuration
	ReconnectHandler  func()
	DisconnectHandler func()
}

ReconnectingDialerConfig holds configuration for creating a reconnecting dialer.

type ReplyReceiver

type ReplyReceiver interface {
	AcceptReply(*Message)
}

ReplyReceiver is used to get notified when a Message reply arrives

type Result

type Result struct {
	Success bool
	Message string
}

Result is a parsed result message containing a success flag and text.

func UnmarshalResult

func UnmarshalResult(message *Message) *Result

UnmarshalResult extracts a Result from a received Message.

type SendListener

type SendListener interface {
	// NotifyQueued is called when the message has been queued for send
	NotifyQueued()
	// NotifyBeforeWrite is called before send is called
	NotifyBeforeWrite()
	// NotifyAfterWrite is called after the message has been written to the Underlay
	NotifyAfterWrite()
	// NotifyErr is called if the Sendable context errors before send or if writing to the Underlay fails
	NotifyErr(error)
}

SendListener is notified at the various stages of a message send

type Sendable

type Sendable interface {
	// Msg returns the Message to send
	Msg() *Message

	// SetSequence sets a sequence number indicating in which order the message was received
	SetSequence(seq int32)

	// Sequence returns the sequence number
	Sequence() int32

	// Context returns the Context used for timeouts/cancelling message sends, etc
	Context() context.Context

	// SendListener returns the SendListener to invoke at each stage of the send operation
	SendListener() SendListener

	// ReplyReceiver returns the ReplyReceiver to be invoked when a reply for the message is received, or nil if
	// no ReplyReceiver should be invoked if or when a reply is received
	ReplyReceiver() ReplyReceiver
}

Sendable encapsulates all the data and callbacks that a Channel requires when sending a Message.

type Sender

type Sender interface {
	// Send will send the given Sendable. If the Sender is busy, it will wait until either the Sender
	// can process the Sendable, the channel is closed or the associated context.Context times out
	Send(s Sendable) error

	// TrySend will send the given Sendable. If the Sender is busy (outgoing message queue is full), it will return
	// immediately rather than wait. The boolean return indicates whether the message was queued or not
	TrySend(s Sendable) (bool, error)

	// CloseNotify returns a channel that is closed when the sender is closed
	CloseNotify() <-chan struct{}
}

Sender sends messages on a channel. It supports both blocking and non-blocking sends.

func NewSingleChSender

func NewSingleChSender(ctx SenderContext, msgC chan<- Sendable) Sender

NewSingleChSender creates a Sender that writes to a single Go channel.

type SenderContext

type SenderContext interface {
	NextSequence() int32
	GetCloseNotify() chan struct{}
}

SenderContext provides sequence numbering and close notification for senders.

func NewSenderContext

func NewSenderContext() SenderContext

NewSenderContext creates a new SenderContext with its own sequence counter and close channel.

type Senders

type Senders interface {
	SenderContext
	// GetDefaultSender returns the default sender for the channel.
	GetDefaultSender() Sender
	// HandleTxFailed is called when an underlay write fails.
	// Returns true if the message was requeued for retry.
	HandleTxFailed(underlayType string, sendable Sendable) bool
}

Senders provides access to senders and handles transmission failures.

type SimpleMessageSourceProvider

type SimpleMessageSourceProvider struct {
	// contains filtered or unexported fields
}

SimpleMessageSourceProvider maps underlay types to message sources, with a default fallback.

func NewSimpleMessageSourceProvider

func NewSimpleMessageSourceProvider(defaultSource MessageSourceF) *SimpleMessageSourceProvider

NewSimpleMessageSourceProvider creates a provider with the given default message source.

func (*SimpleMessageSourceProvider) AddSource

func (p *SimpleMessageSourceProvider) AddSource(underlayType string, source MessageSourceF)

AddSource registers a message source for a specific underlay type.

func (*SimpleMessageSourceProvider) GetMessageSource

func (p *SimpleMessageSourceProvider) GetMessageSource(underlayType string) MessageSourceF

GetMessageSource returns the message source for the given underlay type, falling back to the default.

type StreamMessageProducer

type StreamMessageProducer func(r io.Reader) (*Message, error)

StreamMessageProducer reads a Message from a stream-oriented reader.

type TimeoutEnvelope

type TimeoutEnvelope interface {
	Envelope

	// SendAndWaitForWire will wait until the configured timeout or until the message is sent, whichever comes first
	// If the timeout happens first, the context error will be returned, wrapped by a TimeoutError
	SendAndWaitForWire(sender Sender) error

	// SendForReply will wait until the configured timeout or until a reply is received, whichever comes first
	// If the timeout happens first, the context error will be returned, wrapped by a TimeoutError
	SendForReply(sender Sender) (*Message, error)
}

TimeoutEnvelope has timeout-related convenience methods, such as waiting for a Message to be written to the wire or waiting for a Message reply.

type TimeoutError

type TimeoutError struct {
	// contains filtered or unexported fields
}

TimeoutError is used to indicate a timeout happened

func (TimeoutError) Unwrap

func (self TimeoutError) Unwrap() error

type TraceHandler

type TraceHandler struct {
	// contains filtered or unexported fields
}

TraceHandler is a PeekHandler that writes channel message traces to a file.

func NewTraceHandler

func NewTraceHandler(path string, id string) (*TraceHandler, error)

NewTraceHandler creates a TraceHandler that writes trace output to the given file path.

func (*TraceHandler) AddDecoder

func (h *TraceHandler) AddDecoder(decoder TraceMessageDecoder)

AddDecoder registers a message decoder for trace output formatting.

func (TraceHandler) Close

func (h TraceHandler) Close(ch Channel)

func (*TraceHandler) Connect

func (h *TraceHandler) Connect(ch Channel, remoteAddress string)

func (TraceHandler) Rx

func (h TraceHandler) Rx(msg *Message, ch Channel)

func (TraceHandler) Tx

func (h TraceHandler) Tx(msg *Message, ch Channel)

type TraceMessageDecode

type TraceMessageDecode map[string]interface{}

TraceMessageDecode is a map of trace metadata fields, serializable to JSON.

func NewTraceMessageDecode

func NewTraceMessageDecode(decoder, message string) TraceMessageDecode

NewTraceMessageDecode creates a TraceMessageDecode with the decoder and message type pre-populated.

func (TraceMessageDecode) MarshalResult

func (d TraceMessageDecode) MarshalResult() ([]byte, bool)

MarshalResult serializes the trace metadata to JSON, returning (nil, true) on error.

func (TraceMessageDecode) MarshalTraceMessageDecode

func (d TraceMessageDecode) MarshalTraceMessageDecode() ([]byte, error)

MarshalTraceMessageDecode serializes the trace metadata to JSON.

type TraceMessageDecoder

type TraceMessageDecoder interface {
	Decode(msg *Message) ([]byte, bool)
}

TraceMessageDecoder decodes a Message into a JSON byte representation for tracing.

type TransformHandler

type TransformHandler interface {
	Rx(m *Message, ch Channel)
	Tx(m *Message, ch Channel)
}

TransformHandler can modify messages as they flow through the channel.

type TypeLoggingUnderlay

type TypeLoggingUnderlay struct {
	// contains filtered or unexported fields
}

TypeLoggingUnderlay wraps an Underlay and logs the content type of each transmitted message.

func (*TypeLoggingUnderlay) Certificates

func (self *TypeLoggingUnderlay) Certificates() []*x509.Certificate

func (*TypeLoggingUnderlay) Close

func (self *TypeLoggingUnderlay) Close() error

func (*TypeLoggingUnderlay) ConnectionId

func (self *TypeLoggingUnderlay) ConnectionId() string

func (*TypeLoggingUnderlay) CreatedAt added in v5.0.7

func (self *TypeLoggingUnderlay) CreatedAt() time.Time

func (*TypeLoggingUnderlay) GetLocalAddr

func (self *TypeLoggingUnderlay) GetLocalAddr() net.Addr

func (*TypeLoggingUnderlay) GetRemoteAddr

func (self *TypeLoggingUnderlay) GetRemoteAddr() net.Addr

func (*TypeLoggingUnderlay) Headers

func (self *TypeLoggingUnderlay) Headers() map[int32][]byte

func (*TypeLoggingUnderlay) Id

func (self *TypeLoggingUnderlay) Id() string

func (*TypeLoggingUnderlay) IsClosed

func (self *TypeLoggingUnderlay) IsClosed() bool

func (*TypeLoggingUnderlay) Label

func (self *TypeLoggingUnderlay) Label() string

func (*TypeLoggingUnderlay) LogicalName

func (self *TypeLoggingUnderlay) LogicalName() string

func (*TypeLoggingUnderlay) Rx

func (self *TypeLoggingUnderlay) Rx() (*Message, error)

func (*TypeLoggingUnderlay) SetWriteDeadline

func (self *TypeLoggingUnderlay) SetWriteDeadline(time time.Time) error

func (*TypeLoggingUnderlay) SetWriteTimeout

func (self *TypeLoggingUnderlay) SetWriteTimeout(duration time.Duration) error

func (*TypeLoggingUnderlay) Tx

func (self *TypeLoggingUnderlay) Tx(m *Message) error

type TypeRoutingAcceptor added in v5.0.11

type TypeRoutingAcceptor struct {
	Acceptors       map[string]HelloAcceptor
	DefaultAcceptor HelloAcceptor
}

TypeRoutingAcceptor is a HelloAcceptor that routes each underlay to a per-type HelloAcceptor based on its TypeHeader, falling back to DefaultAcceptor when the type is absent or unrecognized. It is the ack-aware analog of UnderlayDispatcher for use with NewClassicListenerWithAcceptor, letting multi-underlay acceptors reserve state before the hello is acknowledged.

func (*TypeRoutingAcceptor) AcceptUnderlay added in v5.0.11

func (self *TypeRoutingAcceptor) AcceptUnderlay(underlay Underlay, ackHello func() error)

AcceptUnderlay routes the underlay to the acceptor registered for its TypeHeader, or to the default acceptor. If no acceptor applies the underlay is closed without acking.

type TypedBindHandler

type TypedBindHandler[S Senders] interface {
	BindChannel(binding TypedBinding[S]) error
}

TypedBindHandler is a bind handler that receives a TypedBinding.

func TypedBindHandlers

func TypedBindHandlers[S Senders](handlers ...TypedBindHandler[S]) TypedBindHandler[S]

TypedBindHandlers combines multiple TypedBindHandlers into one.

type TypedBindHandlerF

type TypedBindHandlerF[S Senders] func(binding TypedBinding[S]) error

TypedBindHandlerF is the function form of TypedBindHandler.

func (TypedBindHandlerF[S]) BindChannel

func (self TypedBindHandlerF[S]) BindChannel(binding TypedBinding[S]) error

type TypedBinding

type TypedBinding[S Senders] interface {
	Binding
	GetSenders() S
	AddTypedReceiveHandler(contentType int32, h TypedReceiveHandler[S])
	AddTypedReceiveHandlerF(contentType int32, h TypedReceiveHandlerF[S])
}

TypedBinding extends Binding with typed senders access.

type TypedReceiveHandler

type TypedReceiveHandler[S Senders] interface {
	HandleReceive(m *Message, ch Channel, senders S)
}

TypedReceiveHandler is a receive handler that gets typed senders access.

type TypedReceiveHandlerF

type TypedReceiveHandlerF[S Senders] func(m *Message, ch Channel, senders S)

TypedReceiveHandlerF is the function form of TypedReceiveHandler.

func (TypedReceiveHandlerF[S]) HandleReceive

func (self TypedReceiveHandlerF[S]) HandleReceive(m *Message, ch Channel, senders S)

type Underlay

type Underlay interface {
	Rx() (*Message, error)
	Tx(m *Message) error
	Identity
	io.Closer
	IsClosed() bool
	Headers() map[int32][]byte
	SetWriteTimeout(duration time.Duration) error
	SetWriteDeadline(time time.Time) error
	GetLocalAddr() net.Addr
	GetRemoteAddr() net.Addr
	// CreatedAt returns the time at which the underlay was created. It is used to judge
	// connection stability, e.g. for flap detection when an underlay is closed.
	CreatedAt() time.Time
}

Underlay abstracts a physical communications channel, typically sitting on top of 'transport'.

type UnderlayAcceptor

type UnderlayAcceptor interface {
	AcceptUnderlay(u Underlay) error
}

An UnderlayAcceptor takes an Underlay and generally turns it into a channel for a specific use. It can be used when handling multiple channel types on a single listener.

type UnderlayConstraint

type UnderlayConstraint struct {
	Desired int
	Min     int
}

UnderlayConstraint specifies the desired and minimum number of underlays for a given type.

func OneUnderlay

func OneUnderlay() UnderlayConstraint

OneUnderlay returns a constraint requiring exactly one underlay.

func OptionalUnderlays

func OptionalUnderlays(desired int) UnderlayConstraint

OptionalUnderlays returns a constraint with min 0 and the given desired count.

type UnderlayDispatcher

type UnderlayDispatcher struct {
	// contains filtered or unexported fields
}

An UnderlayDispatcher accepts underlays from an underlay listener and hands them off to UnderlayAcceptor instances, based on the TypeHeader.

func NewUnderlayDispatcher

func NewUnderlayDispatcher(config UnderlayDispatcherConfig) *UnderlayDispatcher

NewUnderlayDispatcher creates a new UnderlayDispatcher from the given config.

func (*UnderlayDispatcher) Run

func (self *UnderlayDispatcher) Run()

Run accepts underlays in a loop, dispatching each to the appropriate acceptor based on TypeHeader.

type UnderlayDispatcherConfig

// UnderlayDispatcherConfig holds configuration for an UnderlayDispatcher.
type UnderlayDispatcherConfig struct {
	// Listener is the underlay listener the dispatcher accepts underlays from.
	Listener        UnderlayListener
	// NOTE(review): presumably the timeout applied while obtaining each underlay
	// from Listener — confirm against the dispatcher implementation.
	ConnectTimeout  time.Duration
	// Acceptors maps a string key to the acceptor that handles it.
	// NOTE(review): presumably keyed by the underlay's TypeHeader value — confirm.
	Acceptors       map[string]UnderlayAcceptor
	// NOTE(review): presumably used when no entry in Acceptors matches — confirm.
	DefaultAcceptor UnderlayAcceptor
}

UnderlayDispatcherConfig holds configuration for an UnderlayDispatcher.

type UnderlayEventListener

// UnderlayEventListener is notified when underlays are added or removed from a
// channel.
type UnderlayEventListener interface {
	// UnderlayAdded is the notification for an underlay being added; it receives
	// the channel and the underlay involved.
	UnderlayAdded(ch Channel, underlay Underlay)
	// UnderlayRemoved is the notification for an underlay being removed; it
	// receives the channel and the underlay involved.
	UnderlayRemoved(ch Channel, underlay Underlay)
}

UnderlayEventListener is notified when underlays are added or removed from a channel.

type UnderlayFactory

// UnderlayFactory is used by Channel to obtain an Underlay instance. An
// underlay "dialer" or "listener" implements UnderlayFactory to provide
// instances to Channel.
type UnderlayFactory interface {
	// Create returns an Underlay or an error.
	// NOTE(review): timeout presumably bounds how long creation may take — confirm.
	Create(timeout time.Duration) (Underlay, error)
}

UnderlayFactory is used by Channel to obtain an Underlay instance. An underlay "dialer" or "listener" implements UnderlayFactory to provide instances to Channel.

func NewExistingConnDialer

func NewExistingConnDialer(id *identity.TokenId, peer net.Conn, headers map[int32][]byte) UnderlayFactory

NewExistingConnDialer creates an UnderlayFactory that performs a hello handshake over an existing net.Conn.

func NewExistingConnListener

func NewExistingConnListener(identity *identity.TokenId, peer net.Conn, headers map[int32][]byte) UnderlayFactory

NewExistingConnListener creates an UnderlayFactory that accepts a hello handshake over an existing net.Conn.

func NewReconnectingDialer

func NewReconnectingDialer(config ReconnectingDialerConfig) UnderlayFactory

NewReconnectingDialer creates an UnderlayFactory that automatically reconnects on connection loss.

type UnderlayListener

// UnderlayListener represents a component designed to listen for incoming peer
// connections.
type UnderlayListener interface {
	// Listen takes zero or more ConnectionHandler values and returns an error.
	// NOTE(review): presumably starts listening and applies the handlers to
	// incoming connections — confirm against implementations.
	Listen(handlers ...ConnectionHandler) error
	// UnderlayFactory is embedded, so a listener also provides Create.
	UnderlayFactory
	// io.Closer contributes Close() error.
	io.Closer
}

UnderlayListener represents a component designed to listen for incoming peer connections.

func NewClassicListener

func NewClassicListener(identity *identity.TokenId, endpoint transport.Address, config ListenerConfig) UnderlayListener

NewClassicListener creates a classic listener that produces underlays via Create.

type Underlays

// Underlays manages a set of underlays with listener notification on
// add/remove.
type Underlays struct {
	// contains filtered or unexported fields
}

Underlays manages a set of underlays with listener notification on add/remove.

func NewUnderlays

func NewUnderlays() *Underlays

NewUnderlays creates a new empty Underlays collection.

func (*Underlays) Add

func (u *Underlays) Add(ch Channel, underlay Underlay)

Add appends the underlay and notifies all listeners.

func (*Underlays) AddListener

func (u *Underlays) AddListener(l UnderlayEventListener)

AddListener registers a listener to be notified on underlay add/remove events.

func (*Underlays) CountsByType

func (u *Underlays) CountsByType() map[string]int

CountsByType returns the number of underlays for each underlay type.

func (*Underlays) First

func (u *Underlays) First() Underlay

First returns the first underlay, or nil if empty.

func (*Underlays) GetAll

func (u *Underlays) GetAll() []Underlay

GetAll returns a snapshot copy of all current underlays.

func (*Underlays) Remove

func (u *Underlays) Remove(ch Channel, underlay Underlay) bool

Remove removes the underlay and notifies all listeners if it was found.

type UngroupedChannelFallback

// UngroupedChannelFallback handles incoming underlays that are not part of a
// grouped connection. It receives the underlay and returns an error.
type UngroupedChannelFallback func(underlay Underlay) error

UngroupedChannelFallback handles incoming underlays that are not part of a grouped connection.

type UnsupportedVersionError

// UnsupportedVersionError indicates the remote side does not support the
// requested protocol version.
type UnsupportedVersionError struct {
	// contains filtered or unexported fields
}

UnsupportedVersionError indicates the remote side does not support the requested protocol version.

func (UnsupportedVersionError) Error

func (u UnsupportedVersionError) Error() string

Directories

Path Synopsis
pb

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL