Documentation
¶
Overview ¶
Example ¶
package main
import (
"fmt"
"time"
"github.com/openziti/channel/v5"
"github.com/openziti/identity"
"github.com/openziti/transport/v2/tcp"
)
// main connects to an echo server listening on tcp:localhost:6565, wraps the
// connection in a single-underlay channel, and then walks through each of the
// ways a message can be sent on that channel. Any failure aborts with a panic.
func main() {
	endpoint, err := tcp.AddressParser{}.Parse("tcp:localhost:6565")
	if err != nil {
		panic(err)
	}

	clientId := &identity.TokenId{Token: "echo-client"}
	dialer := channel.NewClassicDialer(channel.DialerConfig{
		Identity: clientId,
		Endpoint: endpoint,
	})

	// No bind handler and no options are supplied.
	ch, err := channel.NewSingleChannel("echo-test", dialer, nil, nil)
	if err != nil {
		panic(err)
	}

	const helloMessageType int32 = 256
	msg := channel.NewMessage(helloMessageType, []byte("hello!"))

	// Option 1: hand the message to the channel. Send returns as soon as the
	// message has been queued.
	err = ch.Send(msg)
	if err != nil {
		panic(err)
	}

	// Option 2: ask the message to send itself on the channel.
	err = msg.Send(ch)
	if err != nil {
		panic(err)
	}

	// Option 3: attach a timeout first. If the message cannot be queued before
	// the timeout expires, an error is returned and the message is not sent.
	err = msg.WithTimeout(time.Second).Send(ch)
	if err != nil {
		panic(err)
	}

	// Option 4: attach a timeout and block until the message has been written
	// to the wire. If the timeout expires first, the message is not sent and a
	// TimeoutError is returned.
	err = msg.WithTimeout(time.Second).SendAndWaitForWire(ch)
	if err != nil {
		panic(err)
	}

	// Option 5: attach a timeout and block until a reply arrives. A
	// TimeoutError is returned if the timeout expires before the message is
	// sent (in which case it is not sent) or before the reply arrives.
	reply, err := msg.WithTimeout(time.Second).SendForReply(ch)
	if err != nil {
		panic(err)
	}

	fmt.Println(string(reply.Body))
}
Output:
Index ¶
- Constants
- Variables
- func AddReceiveHandlers(binding Binding, handlers ...ContentTypeReceiver)
- func DecodeString(t string, b []byte) ([]byte, string, error)
- func DecodeStringSlice(b []byte) ([]string, error)
- func DecodeStringToStringMap(b []byte) (map[string]string, error)
- func DecodeU32ToBytesMap(b []byte) (map[uint32][]byte, error)
- func EncodeStringSlice(strSlice []string) []byte
- func EncodeStringToStringMap(m map[string]string) []byte
- func EncodeU32ToBytesMap(m map[uint32][]byte) []byte
- func GetRetryVersion(err error) (uint32, bool)
- func GetUnderlayType(underlay Underlay) string
- func IsNonRetryable(err error) bool
- func IsTimeout(err error) bool
- func MarshalV2(m *Message) ([]byte, error)
- func MarshalV2WithRaw(m *Message) ([]byte, error)
- func NewClassicListenerF(identity *identity.TokenId, endpoint transport.Address, config ListenerConfig, ...) (io.Closer, error)
- func NewClassicListenerWithAcceptor(identity *identity.TokenId, endpoint transport.Address, config ListenerConfig, ...) (io.Closer, error)
- func NewErrorContext(err error) context.Context
- func NewWSListener(peer transport.Conn) *wsListener
- func NextConnectionId() (string, error)
- func RetryToQueues(queues ...*MessageQueue) func(string, Sendable) bool
- func SetLoggerFor(f func(name string) *slog.Logger)
- func WriteUnknownVersionResponse(writer io.Writer)
- type AsyncFunctionReceiveAdapter
- type BackoffConfig
- type BackoffDialPolicy
- func (self *BackoffDialPolicy) ConsecutiveFailures() uint32
- func (self *BackoffDialPolicy) Dial(underlayType string, connectionId string, groupSecret []byte, isFirst bool, ...) (Underlay, error)
- func (self *BackoffDialPolicy) LastDialTime() time.Time
- func (self *BackoffDialPolicy) UnderlayClosed(underlayType string, lifetime time.Duration)
- type BaseSendListener
- type BaseSendable
- type BindHandler
- type BindHandlerF
- type Binder
- type Binding
- type Channel
- type CloseHandler
- type CloseHandlerF
- type CloseNotifier
- type ClosedError
- type Config
- type ConnectOptions
- type ConnectionHandler
- type ContentTypeReceiver
- type DatagramMessageStrategy
- type DatagramUnderlay
- func (self *DatagramUnderlay) Certificates() []*x509.Certificate
- func (self *DatagramUnderlay) Close() error
- func (self *DatagramUnderlay) ConnectionId() string
- func (self *DatagramUnderlay) CreatedAt() time.Time
- func (self *DatagramUnderlay) GetLocalAddr() net.Addr
- func (self *DatagramUnderlay) GetRemoteAddr() net.Addr
- func (self *DatagramUnderlay) Headers() map[int32][]byte
- func (self *DatagramUnderlay) Id() string
- func (self *DatagramUnderlay) IsClosed() bool
- func (self *DatagramUnderlay) Label() string
- func (self *DatagramUnderlay) LogicalName() string
- func (self *DatagramUnderlay) Rx() (*Message, error)
- func (self *DatagramUnderlay) SetWriteDeadline(deadline time.Time) error
- func (self *DatagramUnderlay) SetWriteTimeout(duration time.Duration) error
- func (self *DatagramUnderlay) Tx(m *Message) error
- type Decoder
- type DialPolicy
- type DialUnderlayFactory
- type DialerConfig
- type Envelope
- type ErrorHandler
- type ErrorHandlerF
- type Factory
- type Headers
- func (self Headers) GetBoolHeader(key int32) (bool, bool)
- func (self Headers) GetByteHeader(key int32) (byte, bool)
- func (self Headers) GetStringHeader(key int32) (string, bool)
- func (self Headers) GetStringSliceHeader(key int32) ([]string, bool, error)
- func (self Headers) GetStringToStringMapHeader(key int32) (map[string]string, bool, error)
- func (self Headers) GetU32ToBytesMapHeader(key int32) (map[uint32][]byte, bool, error)
- func (self Headers) GetUint16Header(key int32) (uint16, bool)
- func (self Headers) GetUint32Header(key int32) (uint32, bool)
- func (self Headers) GetUint64Header(key int32) (uint64, bool)
- func (self Headers) PutBoolHeader(key int32, value bool)
- func (self Headers) PutByteHeader(key int32, value byte)
- func (self Headers) PutStringHeader(key int32, value string)
- func (self Headers) PutStringSliceHeader(key int32, s []string)
- func (self Headers) PutStringToStringMapHeader(key int32, m map[string]string)
- func (self Headers) PutU32ToBytesMapHeader(key int32, m map[uint32][]byte)
- func (self Headers) PutUint16Header(key int32, value uint16)
- func (self Headers) PutUint32Header(key int32, value uint32)
- func (self Headers) PutUint64Header(key int32, value uint64)
- type HeartbeatCallback
- type HeartbeatControl
- type HeartbeatOptions
- type Hello
- type HelloAcceptor
- type HelloAcceptorF
- type HelloHeaderProvider
- type Identity
- type ListenerConfig
- type Message
- func (m *Message) Context() context.Context
- func (m *Message) Msg() *Message
- func (m *Message) ReplyReceiver() ReplyReceiver
- func (m *Message) ReplyTo(o *Message) Envelope
- func (m *Message) Send(sender Sender) error
- func (m *Message) SendListener() SendListener
- func (m *Message) SetSequence(seq int32)
- func (m *Message) String() string
- func (m *Message) ToSendable() Sendable
- func (m *Message) WithContext(c context.Context) TimeoutEnvelope
- func (m *Message) WithTimeout(duration time.Duration) TimeoutEnvelope
- type MessageHeader
- func (header *MessageHeader) GetBoolHeader(key int32) (bool, bool)
- func (header *MessageHeader) GetByteHeader(key int32) (byte, bool)
- func (header *MessageHeader) GetStringHeader(key int32) (string, bool)
- func (header *MessageHeader) GetStringSliceHeader(key int32) ([]string, bool, error)
- func (header *MessageHeader) GetStringToStringMapHeader(key int32) (map[string]string, bool, error)
- func (header *MessageHeader) GetU32ToBytesMapHeader(key int32) (map[uint32][]byte, bool, error)
- func (header *MessageHeader) GetUint16Header(key int32) (uint16, bool)
- func (header *MessageHeader) GetUint32Header(key int32) (uint32, bool)
- func (header *MessageHeader) GetUint64Header(key int32) (uint64, bool)
- func (header *MessageHeader) IsReply() bool
- func (header *MessageHeader) IsReplyingTo(sequence int32) bool
- func (header *MessageHeader) PutBoolHeader(key int32, value bool)
- func (header *MessageHeader) PutByteHeader(key int32, value byte)
- func (header *MessageHeader) PutStringHeader(key int32, value string)
- func (header *MessageHeader) PutStringSliceHeader(key int32, s []string)
- func (header *MessageHeader) PutStringToStringMapHeader(key int32, m map[string]string)
- func (header *MessageHeader) PutU32ToBytesMapHeader(key int32, m map[uint32][]byte)
- func (header *MessageHeader) PutUint16Header(key int32, value uint16)
- func (header *MessageHeader) PutUint32Header(key int32, value uint32)
- func (header *MessageHeader) PutUint64Header(key int32, value uint64)
- func (header *MessageHeader) ReplyFor() int32
- func (header *MessageHeader) Sequence() int32
- type MessageMarshaller
- type MessageQueue
- type MessageSourceF
- func MakeSingleQueueMessageSource(closeNotify <-chan struct{}, q1 <-chan Sendable) MessageSourceF
- func MakeThreeQueueMessageSource(closeNotify <-chan struct{}, q1, q2, q3 <-chan Sendable) MessageSourceF
- func MakeTwoQueueMessageSource(closeNotify <-chan struct{}, q1, q2 <-chan Sendable) MessageSourceF
- type MessageSourceProvider
- type MessageStrategy
- type MsgReceiveHandler
- type MsgReceiveHandlerF
- type MultiChannelOptions
- type MultiListener
- type NonRetryableError
- type Options
- type PacketMessageProducer
- type PeekHandler
- type ReadAdapter
- type ReadTimout
- type ReceiveHandler
- type ReceiveHandlerF
- type ReconnectingDialerConfig
- type ReplyReceiver
- type Result
- type SendListener
- type Sendable
- type Sender
- type SenderContext
- type Senders
- type SimpleMessageSourceProvider
- type StreamMessageProducer
- type TimeoutEnvelope
- type TimeoutError
- type TraceHandler
- type TraceMessageDecode
- type TraceMessageDecoder
- type TransformHandler
- type TypeLoggingUnderlay
- func (self *TypeLoggingUnderlay) Certificates() []*x509.Certificate
- func (self *TypeLoggingUnderlay) Close() error
- func (self *TypeLoggingUnderlay) ConnectionId() string
- func (self *TypeLoggingUnderlay) CreatedAt() time.Time
- func (self *TypeLoggingUnderlay) GetLocalAddr() net.Addr
- func (self *TypeLoggingUnderlay) GetRemoteAddr() net.Addr
- func (self *TypeLoggingUnderlay) Headers() map[int32][]byte
- func (self *TypeLoggingUnderlay) Id() string
- func (self *TypeLoggingUnderlay) IsClosed() bool
- func (self *TypeLoggingUnderlay) Label() string
- func (self *TypeLoggingUnderlay) LogicalName() string
- func (self *TypeLoggingUnderlay) Rx() (*Message, error)
- func (self *TypeLoggingUnderlay) SetWriteDeadline(time time.Time) error
- func (self *TypeLoggingUnderlay) SetWriteTimeout(duration time.Duration) error
- func (self *TypeLoggingUnderlay) Tx(m *Message) error
- type TypeRoutingAcceptor
- type TypedBindHandler
- type TypedBindHandlerF
- type TypedBinding
- type TypedReceiveHandler
- type TypedReceiveHandlerF
- type Underlay
- type UnderlayAcceptor
- type UnderlayConstraint
- type UnderlayDispatcher
- type UnderlayDispatcherConfig
- type UnderlayEventListener
- type UnderlayFactory
- type UnderlayListener
- type Underlays
- func (u *Underlays) Add(ch Channel, underlay Underlay)
- func (u *Underlays) AddListener(l UnderlayEventListener)
- func (u *Underlays) CountsByType() map[string]int
- func (u *Underlays) First() Underlay
- func (u *Underlays) GetAll() []Underlay
- func (u *Underlays) Remove(ch Channel, underlay Underlay) bool
- type UngroupedChannelFallback
- type UnsupportedVersionError
Examples ¶
Constants ¶
const ( DefaultOutstandingConnects = 16 DefaultQueuedConnects = 1 DefaultConnectTimeout = 5 * time.Second MinQueuedConnects = 1 MinOutstandingConnects = 1 MinConnectTimeout = 30 * time.Millisecond MaxQueuedConnects = 5000 MaxOutstandingConnects = 1000 MaxConnectTimeout = 60000 * time.Millisecond // DefaultOutQueueSize is the default capacity of the outgoing message queue. DefaultOutQueueSize = 4 )
Connection and queue size defaults, minimums, and maximums.
const ( DefaultHeartbeatSendInterval = 10 * time.Second DefaultHeartbeatCheckInterval = time.Second DefaultHeartbeatTimeout = 30 * time.Second )
Heartbeat timing defaults.
const ( ConnectionIdHeader = 0 ReplyForHeader = 1 ResultSuccessHeader = 2 HelloRouterAdvertisementsHeader = 3 HelloVersionHeader = 4 HeartbeatHeader = 5 HeartbeatResponseHeader = 6 TypeHeader = 7 IdHeader = 8 IsGroupedHeader = 9 GroupSecretHeader = 10 IsFirstGroupConnection = 11 UnderlayTypeHeader = 12 // Headers in the range 128-255 inclusive will be reflected when creating replies ReflectedHeaderBitMask = 1 << 7 MaxReflectedHeader = (1 << 8) - 1 )
Message headers notes:
- 0-127 reserved for channel
- 128-255 reserved for headers that need to be reflected back to sender on responses
- 128 is used for a message UUID for tracing
- 1000-1099 reserved for edge messages
- 1100-1199 is reserved for control plane messages
- 2000-2500 is reserved for xgress messages
- 2000-2255 is reserved for xgress implementation headers
const ( ContentTypeHelloType = 0 ContentTypePingType = 1 ContentTypeResultType = 2 ContentTypeLatencyType = 3 ContentTypeLatencyResponseType = 4 ContentTypeHeartbeat = 5 ContentTypeRaw = 6 )
Built-in message content types used by the channel protocol.
const AnyContentType = -1
AnyContentType is a wildcard content type used to register a fallback receive handler.
const BadMagicNumberError = stringError("protocol error: invalid header")
const DECODER = "channel"
DECODER is the decoder name used in trace output for built-in channel messages.
const DecoderFieldName = "__decoder__"
DecoderFieldName is the JSON key for the decoder name in trace output.
const (
// DefaultUnderlayType is used when an underlay's type header is missing or not in the valid types list.
DefaultUnderlayType = "default"
)
const HelloSequence = -1
HelloSequence is the sequence number used for hello/handshake messages.
const MessageFieldName = "__message__"
MessageFieldName is the JSON key for the message type name in trace output.
Variables ¶
var DefaultBackoffConfig = BackoffConfig{ BaseDelay: 2 * time.Second, MaxDelay: time.Minute, MinStableDuration: 30 * time.Second, }
DefaultBackoffConfig provides sensible defaults for BackoffConfig.
var ErrClosed = errors.New("channel closed")
ErrClosed is returned when a read is attempted on a closed ReadAdapter.
var ListenerClosedError = listenerClosedError{}
ListenerClosedError is returned when an operation is attempted on a closed listener.
Functions ¶
func AddReceiveHandlers ¶ added in v5.0.9
func AddReceiveHandlers(binding Binding, handlers ...ContentTypeReceiver)
AddReceiveHandlers registers receive handlers which provide their own content type.
func DecodeString ¶
DecodeString reads a length-prefixed string from a byte slice, returning the remaining bytes and the string.
func DecodeStringSlice ¶
DecodeStringSlice decodes a binary-encoded string slice.
func DecodeStringToStringMap ¶
DecodeStringToStringMap decodes a binary-encoded string-to-string map.
func DecodeU32ToBytesMap ¶
DecodeU32ToBytesMap decodes a binary-encoded uint32-to-bytes map.
func EncodeStringSlice ¶
EncodeStringSlice encodes a string slice into a length-prefixed binary format.
func EncodeStringToStringMap ¶
EncodeStringToStringMap encodes a string-to-string map into a length-prefixed binary format.
func EncodeU32ToBytesMap ¶
EncodeU32ToBytesMap encodes a uint32-to-bytes map into a length-prefixed binary format.
func GetRetryVersion ¶
GetRetryVersion extracts a compatible protocol version from an UnsupportedVersionError, if present.
func GetUnderlayType ¶
GetUnderlayType returns the underlay type from the headers. If no type header is present, returns DefaultUnderlayType.
func IsNonRetryable ¶ added in v5.0.13
IsNonRetryable reports whether err (or any error it wraps) is a NonRetryableError.
func MarshalV2WithRaw ¶
MarshalV2WithRaw marshals a message using V2 format, except for ContentTypeRaw messages which are sent as raw body bytes without framing.
func NewClassicListenerF ¶
func NewClassicListenerF(identity *identity.TokenId, endpoint transport.Address, config ListenerConfig, f func(underlay Underlay)) (io.Closer, error)
NewClassicListenerF creates a classic listener that calls f for each accepted underlay. f is invoked after the hello is acknowledged; use NewClassicListenerWithAcceptor for an acceptor that needs to record state before the acknowledgement.
func NewClassicListenerWithAcceptor ¶ added in v5.0.11
func NewClassicListenerWithAcceptor(identity *identity.TokenId, endpoint transport.Address, config ListenerConfig, acceptor HelloAcceptor) (io.Closer, error)
NewClassicListenerWithAcceptor creates a classic listener that hands each accepted underlay to the given HelloAcceptor, which controls when the hello is acknowledged.
func NewErrorContext ¶
func NewWSListener ¶
NewWSListener creates a WebSocket listener from an existing transport connection.
func NextConnectionId ¶
NextConnectionId returns a new unique connection identifier.
func RetryToQueues ¶
func RetryToQueues(queues ...*MessageQueue) func(string, Sendable) bool
RetryToQueues returns a HandleTxFailed function that tries to push the sendable to each queue in order, returning true if any accepts it without blocking.
func SetLoggerFor ¶ added in v5.0.12
SetLoggerFor installs the resolver that channel uses for events scoped to a channel's LogicalName (e.g. "ctrl", "link", "agent"), letting an embedding application route channel logging through its own slog setup and control verbosity per channel type. When no resolver is installed, channel falls back to a default logger that forwards to pfxlog/logrus, preserving the logging behavior channel had before this seam existed.
Set this once during startup, before any channels are created. Each channel resolves and caches its logger at creation, so installing a resolver after channels already exist applies only to channels created afterward.
func WriteUnknownVersionResponse ¶
WriteUnknownVersionResponse sends a version negotiation response listing locally supported versions.
Types ¶
type AsyncFunctionReceiveAdapter ¶ added in v5.0.9
type AsyncFunctionReceiveAdapter struct {
Type int32
Handler ReceiveHandlerF
}
AsyncFunctionReceiveAdapter is a ContentTypeReceiver which handles each message in a new goroutine, for handlers which may block or run long. Receive handlers are otherwise invoked on the channel's receive loop.
func (*AsyncFunctionReceiveAdapter) ContentType ¶ added in v5.0.9
func (adapter *AsyncFunctionReceiveAdapter) ContentType() int32
ContentType returns the message content type this handler processes.
func (*AsyncFunctionReceiveAdapter) HandleReceive ¶ added in v5.0.9
func (adapter *AsyncFunctionReceiveAdapter) HandleReceive(m *Message, ch Channel)
HandleReceive dispatches the message to the wrapped handler in a new goroutine.
type BackoffConfig ¶
type BackoffConfig struct {
// BaseDelay is the initial delay after the first failure (default: 2s).
BaseDelay time.Duration
// MaxDelay is the maximum delay between attempts (default: 60s).
MaxDelay time.Duration
// MinStableDuration is how long a connection must live to be considered stable (default: 30s).
// An underlay that closes before living this long is treated as short-lived and counts as
// a failure for backoff purposes.
MinStableDuration time.Duration
// MinDialInterval is the minimum time between dial attempts (default: 0, no limit).
// If a dial is requested before this interval has elapsed since the last dial,
// the goroutine sleeps until the interval is satisfied.
MinDialInterval time.Duration
}
BackoffConfig controls exponential backoff behavior.
type BackoffDialPolicy ¶
type BackoffDialPolicy struct {
Dialer DialUnderlayFactory
Backoff BackoffConfig
// contains filtered or unexported fields
}
BackoffDialPolicy wraps a DialUnderlayFactory with exponential backoff retry logic. Failed dials and short-lived connections (reported via UnderlayClosed) count as failures; a connection that closes after a stable lifetime, or one that has demonstrably outlived MinStableDuration, resets the failure count. Accounting is intentionally global for the policy because all underlays dial the same destination. A stable underlay is evidence that the destination is not globally flapping.
func NewBackoffDialPolicy ¶
func NewBackoffDialPolicy(dialer DialUnderlayFactory) *BackoffDialPolicy
NewBackoffDialPolicy creates a BackoffDialPolicy with default configuration.
func NewBackoffDialPolicyWithConfig ¶
func NewBackoffDialPolicyWithConfig(dialer DialUnderlayFactory, config BackoffConfig) *BackoffDialPolicy
NewBackoffDialPolicyWithConfig creates a BackoffDialPolicy with the given configuration.
func (*BackoffDialPolicy) ConsecutiveFailures ¶
func (self *BackoffDialPolicy) ConsecutiveFailures() uint32
ConsecutiveFailures returns the current consecutive failure count.
func (*BackoffDialPolicy) Dial ¶
func (self *BackoffDialPolicy) Dial(underlayType string, connectionId string, groupSecret []byte, isFirst bool, connectTimeout time.Duration, cancel <-chan struct{}) (Underlay, error)
Dial attempts to create a new underlay, applying exponential backoff if previous attempts failed. When isFirst is true the dial (re)establishes the group: it advances the iteration, derives a fresh iteration-suffixed connection id, and sets the IsFirstGroupConnection header so the remote MultiListener creates a new channel. A fresh id avoids attaching to a still-closing channel of the prior iteration during a loss/reconnect race. Subsequent (isFirst == false) dials reuse the current iteration id with no header, so they attach to the established group.
func (*BackoffDialPolicy) LastDialTime ¶
func (self *BackoffDialPolicy) LastDialTime() time.Time
LastDialTime returns the time of the most recent dial attempt.
func (*BackoffDialPolicy) UnderlayClosed ¶ added in v5.0.7
func (self *BackoffDialPolicy) UnderlayClosed(underlayType string, lifetime time.Duration)
UnderlayClosed records the lifetime of a closed underlay. A short-lived underlay (lifetime < MinStableDuration) counts as a failure for backoff purposes; a stable one resets the failure count.
type BaseSendListener ¶
type BaseSendListener struct{}
BaseSendListener is a type that may be used to provide default methods for SendListener implementations.
func (BaseSendListener) NotifyAfterWrite ¶
func (BaseSendListener) NotifyAfterWrite()
func (BaseSendListener) NotifyBeforeWrite ¶
func (BaseSendListener) NotifyBeforeWrite()
func (BaseSendListener) NotifyErr ¶
func (BaseSendListener) NotifyErr(error)
func (BaseSendListener) NotifyQueued ¶
func (BaseSendListener) NotifyQueued()
type BaseSendable ¶
type BaseSendable struct{}
BaseSendable is a type that may be used to provide default methods for Sendable implementations.
func (BaseSendable) Context ¶
func (BaseSendable) Context() context.Context
func (BaseSendable) Msg ¶
func (BaseSendable) Msg() *Message
func (BaseSendable) ReplyReceiver ¶
func (BaseSendable) ReplyReceiver() ReplyReceiver
func (BaseSendable) SendListener ¶
func (BaseSendable) SendListener() SendListener
type BindHandler ¶
BindHandler is a handler that configures a Channel via its Binding during setup.
func BindHandlers ¶
func BindHandlers(handlers ...BindHandler) BindHandler
BindHandlers combines multiple BindHandlers into one, running them sequentially. Returns the first error encountered, or nil if all succeed.
type BindHandlerF ¶
BindHandlerF is the function form of BindHandler.
func (BindHandlerF) BindChannel ¶
func (f BindHandlerF) BindChannel(binding Binding) error
type Binder ¶ added in v5.0.6
type Binder struct {
// contains filtered or unexported fields
}
Binder is an installed bind handler, produced by MakeBinder or MakeTypedBinder and supplied to a channel via Config.Binder. It is opaque: consumers obtain one from the factory functions rather than constructing it directly.
func MakeBinder ¶
func MakeBinder(handler BindHandler) Binder
MakeBinder creates a Binder from a BindHandler.
func MakeTypedBinder ¶
func MakeTypedBinder[S Senders](senders S, handler TypedBindHandler[S]) Binder
MakeTypedBinder creates a Binder from a TypedBindHandler and senders.
type Binding ¶
type Binding interface {
AddReceiveHandler(contentType int32, h ReceiveHandler)
AddReceiveHandlerF(contentType int32, h ReceiveHandlerF)
AddMsgReceiveHandler(contentType int32, h MsgReceiveHandler)
AddMsgReceiveHandlerF(contentType int32, h MsgReceiveHandlerF)
AddPeekHandler(h PeekHandler)
AddTransformHandler(h TransformHandler)
AddErrorHandler(h ErrorHandler)
AddCloseHandler(h CloseHandler)
SetUserData(data any)
GetUserData() any
GetChannel() Channel
}
Binding is used to add handlers to Channel.
NOTE: The Add* methods are intended to be used during initial channel setup, not invoked on an in-service Channel. The Binding should not be retained once channel setup is complete.
type Channel ¶
type Channel interface {
Identity
SetLogicalName(logicalName string)
Sender
UnderlayAcceptor
io.Closer
IsClosed() bool
Underlay() Underlay
Headers() map[int32][]byte
GetTimeSinceLastRead() time.Duration
GetUserData() interface{}
// GetSenders returns the Senders for this channel. Consumers that need access to
// priority-specific senders can type-assert the result to their concrete type:
//
// mySenders := ch.GetSenders().(*MyCustomSenders)
// mySenders.GetHighPrioritySender().Send(msg)
//
// This replaces v4's MultiChannel.GetUnderlayHandler() pattern.
GetSenders() Senders
GetUnderlays() []Underlay
GetUnderlayCountsByType() map[string]int
}
Channel represents an asynchronous, message-passing framework, designed to sit on top of an underlay.
func NewChannel ¶
NewChannel creates a multi-underlay channel from the given configuration. The config must include Senders, a MessageSourceProvider, and an initial Underlay. An optional Binder is called to register handlers before the first underlay starts processing.
func NewSingleChannel ¶
func NewSingleChannel(logicalName string, underlayFactory UnderlayFactory, bindHandler BindHandler, options *Options) (Channel, error)
NewSingleChannel dials the factory and creates a simple channel with a single underlay, single sender, and the given bind handler.
func NewSingleChannelWithUnderlay ¶
func NewSingleChannelWithUnderlay(logicalName string, underlay Underlay, bindHandler BindHandler, options *Options) (Channel, error)
NewSingleChannelWithUnderlay creates a simple channel from an existing underlay, with a single sender and bind handler. Use this when you already have a connected underlay (e.g. from a listener).
type CloseHandler ¶
type CloseHandler interface {
HandleClose(ch Channel)
}
CloseHandler is notified when a channel closes.
type CloseHandlerF ¶
type CloseHandlerF func(ch Channel)
CloseHandlerF is the function form of CloseHandler.
func (CloseHandlerF) HandleClose ¶
func (self CloseHandlerF) HandleClose(ch Channel)
type CloseNotifier ¶
type CloseNotifier struct {
// contains filtered or unexported fields
}
CloseNotifier provides a one-shot close signal. Calling NotifyClosed closes the internal channel, unblocking any goroutines waiting on GetCloseNotify.
func NewCloseNotifier ¶
func NewCloseNotifier() *CloseNotifier
NewCloseNotifier creates a new CloseNotifier.
func (*CloseNotifier) GetCloseNotify ¶
func (self *CloseNotifier) GetCloseNotify() <-chan struct{}
func (*CloseNotifier) NotifyClosed ¶
func (self *CloseNotifier) NotifyClosed()
type ClosedError ¶
type ClosedError struct{}
ClosedError indicates an operation was attempted on a closed channel.
func (ClosedError) Error ¶
func (ClosedError) Error() string
type Config ¶
type Config struct {
LogicalName string
Options *Options
Binder Binder
Underlay Underlay
InjectUnderlayTypeIntoMessages bool
// ValidUnderlayTypes lists the recognized underlay type strings for this channel.
// Incoming underlays with types not in this list are mapped to DefaultUnderlayType.
// If nil, any type string is accepted as-is.
ValidUnderlayTypes []string
Senders Senders
MessageSourceProvider MessageSourceProvider
DialPolicy DialPolicy
Constraints map[string]UnderlayConstraint
MinTotalUnderlays int
// ConstraintStartupDelay delays the first constraint check after channel creation.
// Useful when the initial underlay needs time to stabilize before additional
// underlays are dialed.
ConstraintStartupDelay time.Duration
// UnderlayEventListeners are notified when underlays are added or removed.
// Use this to react to underlay changes (e.g., tracking connection counts,
// updating "has dedicated underlay" flags, calling change callbacks).
UnderlayEventListeners []UnderlayEventListener
}
Config holds all the parameters needed to create a new Channel.
type ConnectOptions ¶
type ConnectOptions struct {
MaxQueuedConnects int
MaxOutstandingConnects int
ConnectTimeout time.Duration
}
ConnectOptions controls connection acceptance limits and timeouts.
func DefaultConnectOptions ¶
func DefaultConnectOptions() ConnectOptions
DefaultConnectOptions returns ConnectOptions with sensible defaults.
func (*ConnectOptions) Validate ¶
func (co *ConnectOptions) Validate() error
Validate checks that all connect options are within acceptable bounds.
type ConnectionHandler ¶
type ConnectionHandler interface {
HandleConnection(hello *Hello, certificates []*x509.Certificate) error
}
ConnectionHandler handles new incoming connections during the hello/handshake phase.
type ContentTypeReceiver ¶ added in v5.0.9
type ContentTypeReceiver interface {
ContentType() int32
ReceiveHandler
}
ContentTypeReceiver is a receive handler which reports the content type it handles. It allows handlers to be registered without separately specifying the content type, using AddReceiveHandlers.
type DatagramMessageStrategy ¶
type DatagramMessageStrategy PacketMessageProducer
DatagramMessageStrategy is a MessageStrategy that uses MarshalV2WithRaw for marshalling and a custom PacketMessageProducer for reading.
func (DatagramMessageStrategy) GetMarshaller ¶
func (self DatagramMessageStrategy) GetMarshaller() MessageMarshaller
func (DatagramMessageStrategy) GetPacketProducer ¶
func (self DatagramMessageStrategy) GetPacketProducer() PacketMessageProducer
func (DatagramMessageStrategy) GetStreamProducer ¶
func (self DatagramMessageStrategy) GetStreamProducer() StreamMessageProducer
type DatagramUnderlay ¶
type DatagramUnderlay struct {
// contains filtered or unexported fields
}
DatagramUnderlay is an Underlay implementation for packet-oriented (datagram) transports like DTLS.
func (*DatagramUnderlay) Certificates ¶
func (self *DatagramUnderlay) Certificates() []*x509.Certificate
func (*DatagramUnderlay) Close ¶
func (self *DatagramUnderlay) Close() error
func (*DatagramUnderlay) ConnectionId ¶
func (self *DatagramUnderlay) ConnectionId() string
func (*DatagramUnderlay) CreatedAt ¶ added in v5.0.7
func (self *DatagramUnderlay) CreatedAt() time.Time
func (*DatagramUnderlay) GetLocalAddr ¶
func (self *DatagramUnderlay) GetLocalAddr() net.Addr
func (*DatagramUnderlay) GetRemoteAddr ¶
func (self *DatagramUnderlay) GetRemoteAddr() net.Addr
func (*DatagramUnderlay) Headers ¶
func (self *DatagramUnderlay) Headers() map[int32][]byte
func (*DatagramUnderlay) Id ¶
func (self *DatagramUnderlay) Id() string
func (*DatagramUnderlay) IsClosed ¶
func (self *DatagramUnderlay) IsClosed() bool
func (*DatagramUnderlay) Label ¶
func (self *DatagramUnderlay) Label() string
func (*DatagramUnderlay) LogicalName ¶
func (self *DatagramUnderlay) LogicalName() string
func (*DatagramUnderlay) Rx ¶
func (self *DatagramUnderlay) Rx() (*Message, error)
func (*DatagramUnderlay) SetWriteDeadline ¶
func (self *DatagramUnderlay) SetWriteDeadline(deadline time.Time) error
func (*DatagramUnderlay) SetWriteTimeout ¶
func (self *DatagramUnderlay) SetWriteTimeout(duration time.Duration) error
func (*DatagramUnderlay) Tx ¶
func (self *DatagramUnderlay) Tx(m *Message) error
type Decoder ¶
type Decoder struct{}
Decoder decodes built-in channel message types (hello, ping, result, latency, heartbeat) for tracing.
type DialPolicy ¶
type DialPolicy interface {
Dial(underlayType string, connectionId string, groupSecret []byte, isFirst bool, connectTimeout time.Duration, cancel <-chan struct{}) (Underlay, error)
UnderlayClosed(underlayType string, lifetime time.Duration)
}
DialPolicy controls how additional underlays are dialed for a multi-underlay channel. The channel passes all necessary context directly, so implementations are self-contained.
isFirst is true when the dial is (re)establishing the group's first underlay - either the initial connection or a reconnect after all underlays were lost. A first connection must set the IsFirstGroupConnection header so the remote MultiListener creates a new channel rather than rejecting the underlay; subsequent underlays attach to the existing group.
UnderlayClosed is invoked by the channel whenever an underlay is removed, with how long the underlay lived. This lets the policy judge connection stability directly, e.g. for flap detection, rather than inferring it from dial timing.
type DialUnderlayFactory ¶
type DialUnderlayFactory interface {
UnderlayFactory
CreateWithHeaders(timeout time.Duration, headers map[int32][]byte) (Underlay, error)
}
DialUnderlayFactory extends UnderlayFactory with the ability to pass custom headers during creation. Used by DialPolicy to include connection metadata (type, connection ID, group secret) when dialing.
func NewClassicDialer ¶
func NewClassicDialer(cfg DialerConfig) DialUnderlayFactory
NewClassicDialer creates a DialUnderlayFactory that dials using the classic transport protocol.
type DialerConfig ¶
type DialerConfig struct {
Identity *identity.TokenId
Endpoint transport.Address
LocalBinding string
Headers map[int32][]byte
MessageStrategy MessageStrategy
TransportConfig transport.Configuration
// HelloHeaderProvider, if set, is invoked after the transport connection is established and
// before the hello is sent, to compute additional hello headers from the peer's certificates.
HelloHeaderProvider HelloHeaderProvider
}
DialerConfig holds configuration for creating a classic dialer.
type Envelope ¶
type Envelope interface {
// WithTimeout returns a TimeoutEnvelope with a context using the given timeout
WithTimeout(duration time.Duration) TimeoutEnvelope
// WithContext returns a TimeoutEnvelope which uses the given context for timeout/cancellation
WithContext(c context.Context) TimeoutEnvelope
// Send sends the envelope on the given Channel
Send(sender Sender) error
// ReplyTo allows setting the reply header in a fluent style
ReplyTo(msg *Message) Envelope
// ToSendable converts the Envelope into a Sendable, which can be submitted to a Channel for sending
ToSendable() Sendable
}
Envelope allows setting message context and timeouts. Message is an Envelope (as well as a Sendable)
func NewErrorEnvelope ¶
type ErrorHandler ¶
ErrorHandler handles errors that occur during channel operations.
type ErrorHandlerF ¶
ErrorHandlerF is the function form of ErrorHandler.
func (ErrorHandlerF) HandleError ¶
func (self ErrorHandlerF) HandleError(err error, ch Channel)
type Factory ¶
Factory creates a new multi-underlay Channel from the first incoming underlay. The closeCallback should be called when the channel is closed to remove it from the listener.
type Headers ¶
Headers is a map of integer keys to byte-slice values, used for message metadata.
func (Headers) GetStringSliceHeader ¶
func (Headers) GetStringToStringMapHeader ¶
func (Headers) GetU32ToBytesMapHeader ¶
func (Headers) PutBoolHeader ¶
func (Headers) PutByteHeader ¶
func (Headers) PutStringHeader ¶
func (Headers) PutStringSliceHeader ¶
func (Headers) PutStringToStringMapHeader ¶
func (Headers) PutU32ToBytesMapHeader ¶
func (Headers) PutUint16Header ¶
func (Headers) PutUint32Header ¶
func (Headers) PutUint64Header ¶
type HeartbeatCallback ¶
type HeartbeatCallback interface {
HeartbeatTx(ts int64)
HeartbeatRx(ts int64)
HeartbeatRespTx(ts int64)
HeartbeatRespRx(ts int64)
CheckHeartBeat()
}
HeartbeatCallback provides an interface that is notified when various heartbeat events take place.
type HeartbeatControl ¶ added in v5.0.11
type HeartbeatControl interface {
// UpdateIntervals changes the heartbeat send and check intervals. The send
// interval takes effect on the next outbound message; the check interval on
// the next pulse of the heartbeat loop.
UpdateIntervals(sendInterval, checkInterval time.Duration)
}
HeartbeatControl retunes the heartbeat timing of a running channel. It is returned by ConfigureHeartbeat so a caller can change the send and check intervals without rebuilding the channel, for example when applying updated configuration to an established link. UpdateIntervals is meant to be called from a single goroutine per channel.
func ConfigureHeartbeat ¶
func ConfigureHeartbeat(binding Binding, heartbeatInterval time.Duration, checkInterval time.Duration, cb HeartbeatCallback) HeartbeatControl
ConfigureHeartbeat sets up heartbeats on the given channel. It assumes that an equivalent setup happens on the other side of the channel.
When possible, heartbeats will be sent on existing traffic. When a heartbeat is due to be sent, the next message sent will include a heartbeat header. If no message is sent by the time the checker runs on checkInterval, a standalone heartbeat message will be sent.
Similarly, when a message with a heartbeat header is received, the next sent message will have a header set with the heartbeat response. If no message is sent within a few milliseconds, a standalone heartbeat response will be sent.
The returned HeartbeatControl can be used to retune the send and check intervals later without rebuilding the channel.
type HeartbeatOptions ¶
type HeartbeatOptions struct {
SendInterval time.Duration `json:"sendInterval"`
CheckInterval time.Duration `json:"checkInterval"`
CloseUnresponsiveTimeout time.Duration `json:"closeUnresponsiveTimeout"`
// contains filtered or unexported fields
}
HeartbeatOptions configures heartbeat send interval, check interval, and unresponsive timeout.
func DefaultHeartbeatOptions ¶
func DefaultHeartbeatOptions() *HeartbeatOptions
DefaultHeartbeatOptions returns HeartbeatOptions with sensible defaults.
func LoadHeartbeatOptions ¶
func LoadHeartbeatOptions(data map[interface{}]interface{}) (*HeartbeatOptions, error)
LoadHeartbeatOptions parses HeartbeatOptions from a configuration map.
func (*HeartbeatOptions) GetDuration ¶
func (self *HeartbeatOptions) GetDuration(name string) (*time.Duration, error)
GetDuration parses a named duration value from the source configuration map.
type Hello ¶
Hello is the initial handshake message exchanged when a connection is established.
func UnmarshalHello ¶
UnmarshalHello extracts a Hello from a received Message.
type HelloAcceptor ¶ added in v5.0.11
HelloAcceptor takes ownership of an underlay whose hello has been received but not yet acknowledged. Implementations must call ackHello exactly once before using the underlay for application traffic, and must close the underlay if they do not accept it (including when ackHello returns an error).
Deferring the ack lets an acceptor record state keyed off the underlay before the peer is released by the acknowledgement. MultiListener uses this to register a group before acking, so a racing subsequent underlay for the same group cannot reach the listener before the group is known.
func AsHelloAcceptor ¶ added in v5.0.11
func AsHelloAcceptor(acceptor UnderlayAcceptor) HelloAcceptor
AsHelloAcceptor adapts a legacy UnderlayAcceptor (which receives an already-acknowledged underlay) into a HelloAcceptor by acknowledging the hello before handing the underlay off. Use it for single-underlay acceptors, which don't need to reserve state before the ack; multi-underlay acceptors should implement HelloAcceptor directly.
type HelloAcceptorF ¶ added in v5.0.11
type HelloAcceptorF func(underlay Underlay)
HelloAcceptorF adapts a plain accept function into a HelloAcceptor that acknowledges the hello and then hands off the underlay: the original single-phase behavior, with no reservation before the ack.
func (HelloAcceptorF) AcceptUnderlay ¶ added in v5.0.11
func (f HelloAcceptorF) AcceptUnderlay(underlay Underlay, ackHello func() error)
AcceptUnderlay acknowledges the hello and, on success, passes the underlay to the wrapped function. If the acknowledgement fails the underlay is closed.
type HelloHeaderProvider ¶ added in v5.0.13
type HelloHeaderProvider func(peerCertificates []*x509.Certificate, headers map[int32][]byte) error
HelloHeaderProvider adjusts the hello headers based on the peer's certificates. It is invoked after the transport connection is established (so the peer's certificates are available) but before the hello is sent, letting a dialer set hello headers that depend on the peer's verified identity. headers contains the headers already set on the hello (statically-configured and per-dial); the provider may add, modify or remove entries directly. Entries should be replaced rather than having their value slices edited in place, as values may be shared with the dialer's configuration. Returning an error aborts the dial, and that error is treated as non-retryable: the dialer does not open another connection to retry, since a provider that returns an error has made a deliberate decision about this specific peer.
type Identity ¶
type Identity interface {
// The Id used to represent the identity of this channel to lower-level resources.
//
Id() string
// The LogicalName represents the purpose or usage of this channel (e.g. 'ctrl', 'mgmt', 'r/001', etc.). Usually used
// by humans to understand the logical purpose of a channel.
//
LogicalName() string
// The ConnectionId represents the identity of this Channel to internal API components ("instance identifier").
// Usually used by the Channel framework to differentiate Channel instances.
//
ConnectionId() string
// Certificates contains the identity certificates provided by the peer.
//
Certificates() []*x509.Certificate
// Label constructs a consistently-formatted string used for context logging purposes, from the components above.
//
Label() string
}
Identity exposes the identifying information of a channel to callers and lower-level resources.
type ListenerConfig ¶
type ListenerConfig struct {
ConnectOptions
Headers map[int32][]byte
HeadersF func() map[int32][]byte
TransportConfig transport.Configuration
PoolConfigurator func(config *goroutines.PoolConfig)
ConnectionHandlers []ConnectionHandler
MessageStrategy MessageStrategy
}
ListenerConfig holds configuration for creating a classic listener.
func DefaultListenerConfig ¶
func DefaultListenerConfig() ListenerConfig
DefaultListenerConfig returns a ListenerConfig with sensible defaults.
type Message ¶
type Message struct {
MessageHeader
Body []byte
}
Message is a content-typed, sequenced message with headers and a body, suitable for wire transmission.
func NewMessage ¶
NewMessage creates a new Message with the given content type and body.
func NewResult ¶
NewResult creates a result Message indicating success or failure with a text message.
func (*Message) ReplyReceiver ¶
func (m *Message) ReplyReceiver() ReplyReceiver
func (*Message) SendListener ¶
func (m *Message) SendListener() SendListener
func (*Message) SetSequence ¶
func (*Message) ToSendable ¶
func (*Message) WithContext ¶
func (m *Message) WithContext(c context.Context) TimeoutEnvelope
func (*Message) WithTimeout ¶
func (m *Message) WithTimeout(duration time.Duration) TimeoutEnvelope
type MessageHeader ¶
type MessageHeader struct {
ContentType int32
Headers Headers
// contains filtered or unexported fields
}
MessageHeader contains the metadata for a channel message: content type, sequence, and headers.
func (*MessageHeader) GetBoolHeader ¶
func (header *MessageHeader) GetBoolHeader(key int32) (bool, bool)
func (*MessageHeader) GetByteHeader ¶
func (header *MessageHeader) GetByteHeader(key int32) (byte, bool)
func (*MessageHeader) GetStringHeader ¶
func (header *MessageHeader) GetStringHeader(key int32) (string, bool)
func (*MessageHeader) GetStringSliceHeader ¶
func (header *MessageHeader) GetStringSliceHeader(key int32) ([]string, bool, error)
func (*MessageHeader) GetStringToStringMapHeader ¶
func (*MessageHeader) GetU32ToBytesMapHeader ¶
func (*MessageHeader) GetUint16Header ¶
func (header *MessageHeader) GetUint16Header(key int32) (uint16, bool)
func (*MessageHeader) GetUint32Header ¶
func (header *MessageHeader) GetUint32Header(key int32) (uint32, bool)
func (*MessageHeader) GetUint64Header ¶
func (header *MessageHeader) GetUint64Header(key int32) (uint64, bool)
func (*MessageHeader) IsReply ¶
func (header *MessageHeader) IsReply() bool
func (*MessageHeader) IsReplyingTo ¶
func (header *MessageHeader) IsReplyingTo(sequence int32) bool
func (*MessageHeader) PutBoolHeader ¶
func (header *MessageHeader) PutBoolHeader(key int32, value bool)
func (*MessageHeader) PutByteHeader ¶
func (header *MessageHeader) PutByteHeader(key int32, value byte)
func (*MessageHeader) PutStringHeader ¶
func (header *MessageHeader) PutStringHeader(key int32, value string)
func (*MessageHeader) PutStringSliceHeader ¶
func (header *MessageHeader) PutStringSliceHeader(key int32, s []string)
func (*MessageHeader) PutStringToStringMapHeader ¶
func (header *MessageHeader) PutStringToStringMapHeader(key int32, m map[string]string)
func (*MessageHeader) PutU32ToBytesMapHeader ¶
func (header *MessageHeader) PutU32ToBytesMapHeader(key int32, m map[uint32][]byte)
func (*MessageHeader) PutUint16Header ¶
func (header *MessageHeader) PutUint16Header(key int32, value uint16)
func (*MessageHeader) PutUint32Header ¶
func (header *MessageHeader) PutUint32Header(key int32, value uint32)
func (*MessageHeader) PutUint64Header ¶
func (header *MessageHeader) PutUint64Header(key int32, value uint64)
func (*MessageHeader) ReplyFor ¶
func (header *MessageHeader) ReplyFor() int32
func (*MessageHeader) Sequence ¶
func (header *MessageHeader) Sequence() int32
type MessageMarshaller ¶
MessageMarshaller serializes a Message into bytes for transmission.
type MessageQueue ¶
MessageQueue pairs a Sender with its backing channel. Use this to create priority-based message routing: create one queue per priority level, then wire them to message sources using MakeSingleQueueMessageSource, MakeTwoQueueMessageSource, or MakeThreeQueueMessageSource.
func NewMessageQueue ¶
func NewMessageQueue(ctx SenderContext, size int) *MessageQueue
NewMessageQueue creates a MessageQueue with the given buffer size.
type MessageSourceF ¶
type MessageSourceF func(notifier *CloseNotifier) (Sendable, error)
MessageSourceF is a function that returns the next message to send on an underlay. It blocks until a message is available or the notifier signals closure.
func MakeSingleQueueMessageSource ¶
func MakeSingleQueueMessageSource(closeNotify <-chan struct{}, q1 <-chan Sendable) MessageSourceF
MakeSingleQueueMessageSource returns a MessageSourceF that reads from one queue.
func MakeThreeQueueMessageSource ¶
func MakeThreeQueueMessageSource(closeNotify <-chan struct{}, q1, q2, q3 <-chan Sendable) MessageSourceF
MakeThreeQueueMessageSource returns a MessageSourceF that reads from three queues with equal priority.
func MakeTwoQueueMessageSource ¶
func MakeTwoQueueMessageSource(closeNotify <-chan struct{}, q1, q2 <-chan Sendable) MessageSourceF
MakeTwoQueueMessageSource returns a MessageSourceF that reads from two queues with equal priority.
type MessageSourceProvider ¶
type MessageSourceProvider interface {
GetMessageSource(underlayType string) MessageSourceF
}
MessageSourceProvider returns the message source for a given underlay type.
type MessageStrategy ¶
type MessageStrategy interface {
GetMarshaller() MessageMarshaller
GetStreamProducer() StreamMessageProducer
GetPacketProducer() PacketMessageProducer
}
MessageStrategy provides pluggable message serialization and deserialization.
type MsgReceiveHandler ¶
type MsgReceiveHandler interface {
HandleReceive(m *Message)
}
MsgReceiveHandler handles received messages without channel context.
type MsgReceiveHandlerF ¶
type MsgReceiveHandlerF func(m *Message)
MsgReceiveHandlerF is the function form of MsgReceiveHandler.
func (MsgReceiveHandlerF) HandleReceive ¶
func (self MsgReceiveHandlerF) HandleReceive(m *Message)
type MultiChannelOptions ¶
type MultiChannelOptions struct {
ConnectOptions
WriteTimeout time.Duration
MessageStrategy MessageStrategy
}
MultiChannelOptions configures multi-underlay channels with connect, write, and message options.
type MultiListener ¶
type MultiListener struct {
// contains filtered or unexported fields
}
MultiListener routes incoming underlays to existing channels or creates new ones. Grouped underlays are matched by connection ID; ungrouped ones are passed to the fallback.
func NewMultiListener ¶
func NewMultiListener(channelF Factory, fallback UngroupedChannelFallback) *MultiListener
NewMultiListener creates a MultiListener with the given channel factory and ungrouped fallback.
func (*MultiListener) AcceptUnderlay ¶
func (self *MultiListener) AcceptUnderlay(underlay Underlay, ackHello func() error)
AcceptUnderlay routes an incoming underlay to an existing channel or creates a new one. It implements HelloAcceptor: for a grouped first connection it registers the group (reserving its id) before acknowledging the hello, so the ack - which releases the dialer to dial subsequent underlays - cannot precede the group being known. A subsequent underlay therefore always finds either the channel or a create-in-progress notifier and attaches, rather than racing group creation and being rejected.
func (*MultiListener) CloseChannel ¶
func (self *MultiListener) CloseChannel(chId string)
CloseChannel removes the channel with the given ID from the listener's map.
type NonRetryableError ¶ added in v5.0.13
type NonRetryableError struct {
// contains filtered or unexported fields
}
NonRetryableError marks an error as a deliberate, final dial failure that the dialer must not retry internally (in contrast to a protocol-version negotiation error, which is retried).
func (*NonRetryableError) Error ¶ added in v5.0.13
func (self *NonRetryableError) Error() string
func (*NonRetryableError) Unwrap ¶ added in v5.0.13
func (self *NonRetryableError) Unwrap() error
type Options ¶
type Options struct {
OutQueueSize int
ConnectOptions
WriteTimeout time.Duration
MessageStrategy MessageStrategy
// Logger, when set, is the slog.Logger this channel uses for its lifecycle
// events. It takes precedence over the resolver installed via SetLoggerFor,
// letting an owner give each channel (or channel type) its own logger and level
// control - e.g. an SDK context injecting its logger, or a router naming
// its link vs ctrl channels distinctly. When nil, the channel falls back
// to the SetLoggerFor resolver and then to the pfxlog default.
Logger *slog.Logger `json:"-"`
}
Options configures channel behavior including queue sizes, timeouts, and message strategy.
func DefaultOptions ¶
func DefaultOptions() *Options
DefaultOptions returns Options with sensible defaults.
func LoadOptions ¶
LoadOptions parses channel Options from a configuration map, starting from DefaultOptions.
type PacketMessageProducer ¶
PacketMessageProducer reads a Message from a packet buffer.
type PeekHandler ¶
type PeekHandler interface {
Connect(ch Channel, remoteAddress string)
Rx(m *Message, ch Channel)
Tx(m *Message, ch Channel)
Close(ch Channel)
}
PeekHandler observes messages as they flow through the channel without modifying them.
type ReadAdapter ¶
type ReadAdapter struct {
// contains filtered or unexported fields
}
ReadAdapter bridges push-based data delivery into an io.Reader interface with deadline support.
func NewReadAdapter ¶
func NewReadAdapter(label string, channelDepth int) *ReadAdapter
NewReadAdapter creates a ReadAdapter with the given label and internal buffer depth.
func (*ReadAdapter) Close ¶
func (self *ReadAdapter) Close()
func (*ReadAdapter) GetNext ¶
func (self *ReadAdapter) GetNext() ([]byte, error)
GetNext blocks until data is available, the deadline expires, or the adapter is closed.
func (*ReadAdapter) PushData ¶
func (self *ReadAdapter) PushData(data []byte) error
PushData enqueues data to be read. Blocks until the data is accepted or the adapter is closed.
func (*ReadAdapter) SetReadDeadline ¶
func (self *ReadAdapter) SetReadDeadline(deadline time.Time) error
SetReadDeadline sets the deadline for the next GetNext/Read call. A zero value clears the deadline.
type ReadTimout ¶
type ReadTimout struct{}
ReadTimout is returned when a read operation exceeds its deadline. Implements net.Error.
func (ReadTimout) Error ¶
func (r ReadTimout) Error() string
func (ReadTimout) Temporary ¶
func (r ReadTimout) Temporary() bool
func (ReadTimout) Timeout ¶
func (r ReadTimout) Timeout() bool
type ReceiveHandler ¶
ReceiveHandler handles received messages for a specific content type.
type ReceiveHandlerF ¶
ReceiveHandlerF is the function form of ReceiveHandler.
func (ReceiveHandlerF) HandleReceive ¶
func (self ReceiveHandlerF) HandleReceive(m *Message, ch Channel)
type ReconnectingDialerConfig ¶
type ReconnectingDialerConfig struct {
Identity *identity.TokenId
Endpoint transport.Address
LocalBinding string
Headers map[int32][]byte
TransportConfig transport.Configuration
ReconnectHandler func()
DisconnectHandler func()
}
ReconnectingDialerConfig holds configuration for creating a reconnecting dialer.
type ReplyReceiver ¶
type ReplyReceiver interface {
AcceptReply(*Message)
}
ReplyReceiver is used to get notified when a Message reply arrives
type Result ¶
Result is a parsed result message containing a success flag and text.
func UnmarshalResult ¶
UnmarshalResult extracts a Result from a received Message.
type SendListener ¶
type SendListener interface {
// NotifyQueued is called when the message has been queued for send
NotifyQueued()
// NotifyBeforeWrite is called before send is called
NotifyBeforeWrite()
// NotifyAfterWrite is called after the message has been written to the Underlay
NotifyAfterWrite()
// NotifyErr is called if the Sendable context errors before send or if writing to the Underlay fails
NotifyErr(error)
}
SendListener is notified at the various stages of a message send
type Sendable ¶
type Sendable interface {
// Msg returns the Message to send
Msg() *Message
// SetSequence sets a sequence number indicating in which order the message was received
SetSequence(seq int32)
// Sequence returns the sequence number
Sequence() int32
// Context returns the Context used for timeouts/cancelling message sends, etc
Context() context.Context
// SendListener returns the SendListener to invoke at each stage of the send operation
SendListener() SendListener
// ReplyReceiver returns the ReplyReceiver to be invoked when a reply for the message is received, or nil if
// no ReplyReceiver should be invoked if or when a reply is received
ReplyReceiver() ReplyReceiver
}
Sendable encapsulates all the data and callbacks that a Channel requires when sending a Message.
type Sender ¶
type Sender interface {
// Send will send the given Sendable. If the Sender is busy, it will wait until either the Sender
// can process the Sendable, the channel is closed or the associated context.Context times out
Send(s Sendable) error
// TrySend will send the given Sendable. If the Sender is busy (outgoing message queue is full), it will return
// immediately rather than wait. The boolean return indicates whether the message was queued or not
TrySend(s Sendable) (bool, error)
// CloseNotify returns a channel that is closed when the sender is closed
CloseNotify() <-chan struct{}
}
Sender sends messages on a channel. It supports both blocking and non-blocking sends.
func NewSingleChSender ¶
func NewSingleChSender(ctx SenderContext, msgC chan<- Sendable) Sender
NewSingleChSender creates a Sender that writes to a single Go channel.
type SenderContext ¶
type SenderContext interface {
NextSequence() int32
GetCloseNotify() chan struct{}
}
SenderContext provides sequence numbering and close notification for senders.
func NewSenderContext ¶
func NewSenderContext() SenderContext
NewSenderContext creates a new SenderContext with its own sequence counter and close channel.
type Senders ¶
type Senders interface {
SenderContext
// GetDefaultSender returns the default sender for the channel.
GetDefaultSender() Sender
// HandleTxFailed is called when an underlay write fails.
// Returns true if the message was requeued for retry.
HandleTxFailed(underlayType string, sendable Sendable) bool
}
Senders provides access to senders and handles transmission failures.
type SimpleMessageSourceProvider ¶
type SimpleMessageSourceProvider struct {
// contains filtered or unexported fields
}
SimpleMessageSourceProvider maps underlay types to message sources, with a default fallback.
func NewSimpleMessageSourceProvider ¶
func NewSimpleMessageSourceProvider(defaultSource MessageSourceF) *SimpleMessageSourceProvider
NewSimpleMessageSourceProvider creates a provider with the given default message source.
func (*SimpleMessageSourceProvider) AddSource ¶
func (p *SimpleMessageSourceProvider) AddSource(underlayType string, source MessageSourceF)
AddSource registers a message source for a specific underlay type.
func (*SimpleMessageSourceProvider) GetMessageSource ¶
func (p *SimpleMessageSourceProvider) GetMessageSource(underlayType string) MessageSourceF
GetMessageSource returns the message source for the given underlay type, falling back to the default.
type StreamMessageProducer ¶
StreamMessageProducer reads a Message from a stream-oriented reader.
type TimeoutEnvelope ¶
type TimeoutEnvelope interface {
Envelope
// SendAndWaitForWire will wait until the configured timeout or until the message is sent, whichever comes first
// If the timeout happens first, the context error will be returned, wrapped by a TimeoutError
SendAndWaitForWire(sender Sender) error
// SendForReply will wait until the configured timeout or until a reply is received, whichever comes first
// If the timeout happens first, the context error will be returned, wrapped by a TimeoutError
SendForReply(sender Sender) (*Message, error)
}
TimeoutEnvelope has timeout related convenience methods, such as waiting for a Message to be written to the wire or waiting for a Message reply
type TimeoutError ¶
type TimeoutError struct {
// contains filtered or unexported fields
}
TimeoutError is used to indicate a timeout happened
func (TimeoutError) Unwrap ¶
func (self TimeoutError) Unwrap() error
type TraceHandler ¶
type TraceHandler struct {
// contains filtered or unexported fields
}
TraceHandler is a PeekHandler that writes channel message traces to a file.
func NewTraceHandler ¶
func NewTraceHandler(path string, id string) (*TraceHandler, error)
NewTraceHandler creates a TraceHandler that writes trace output to the given file path.
func (*TraceHandler) AddDecoder ¶
func (h *TraceHandler) AddDecoder(decoder TraceMessageDecoder)
AddDecoder registers a message decoder for trace output formatting.
func (TraceHandler) Close ¶
func (h TraceHandler) Close(ch Channel)
func (*TraceHandler) Connect ¶
func (h *TraceHandler) Connect(ch Channel, remoteAddress string)
func (TraceHandler) Rx ¶
func (h TraceHandler) Rx(msg *Message, ch Channel)
func (TraceHandler) Tx ¶
func (h TraceHandler) Tx(msg *Message, ch Channel)
type TraceMessageDecode ¶
type TraceMessageDecode map[string]interface{}
TraceMessageDecode is a map of trace metadata fields, serializable to JSON.
func NewTraceMessageDecode ¶
func NewTraceMessageDecode(decoder, message string) TraceMessageDecode
NewTraceMessageDecode creates a TraceMessageDecode with the decoder and message type pre-populated.
func (TraceMessageDecode) MarshalResult ¶
func (d TraceMessageDecode) MarshalResult() ([]byte, bool)
MarshalResult serializes the trace metadata to JSON, returning (nil, true) on error.
func (TraceMessageDecode) MarshalTraceMessageDecode ¶
func (d TraceMessageDecode) MarshalTraceMessageDecode() ([]byte, error)
MarshalTraceMessageDecode serializes the trace metadata to JSON.
type TraceMessageDecoder ¶
TraceMessageDecoder decodes a Message into a JSON byte representation for tracing.
type TransformHandler ¶
TransformHandler can modify messages as they flow through the channel.
type TypeLoggingUnderlay ¶
type TypeLoggingUnderlay struct {
// contains filtered or unexported fields
}
TypeLoggingUnderlay wraps an Underlay and logs the content type of each transmitted message.
func (*TypeLoggingUnderlay) Certificates ¶
func (self *TypeLoggingUnderlay) Certificates() []*x509.Certificate
func (*TypeLoggingUnderlay) Close ¶
func (self *TypeLoggingUnderlay) Close() error
func (*TypeLoggingUnderlay) ConnectionId ¶
func (self *TypeLoggingUnderlay) ConnectionId() string
func (*TypeLoggingUnderlay) CreatedAt ¶ added in v5.0.7
func (self *TypeLoggingUnderlay) CreatedAt() time.Time
func (*TypeLoggingUnderlay) GetLocalAddr ¶
func (self *TypeLoggingUnderlay) GetLocalAddr() net.Addr
func (*TypeLoggingUnderlay) GetRemoteAddr ¶
func (self *TypeLoggingUnderlay) GetRemoteAddr() net.Addr
func (*TypeLoggingUnderlay) Headers ¶
func (self *TypeLoggingUnderlay) Headers() map[int32][]byte
func (*TypeLoggingUnderlay) Id ¶
func (self *TypeLoggingUnderlay) Id() string
func (*TypeLoggingUnderlay) IsClosed ¶
func (self *TypeLoggingUnderlay) IsClosed() bool
func (*TypeLoggingUnderlay) Label ¶
func (self *TypeLoggingUnderlay) Label() string
func (*TypeLoggingUnderlay) LogicalName ¶
func (self *TypeLoggingUnderlay) LogicalName() string
func (*TypeLoggingUnderlay) Rx ¶
func (self *TypeLoggingUnderlay) Rx() (*Message, error)
func (*TypeLoggingUnderlay) SetWriteDeadline ¶
func (self *TypeLoggingUnderlay) SetWriteDeadline(time time.Time) error
func (*TypeLoggingUnderlay) SetWriteTimeout ¶
func (self *TypeLoggingUnderlay) SetWriteTimeout(duration time.Duration) error
func (*TypeLoggingUnderlay) Tx ¶
func (self *TypeLoggingUnderlay) Tx(m *Message) error
type TypeRoutingAcceptor ¶ added in v5.0.11
type TypeRoutingAcceptor struct {
Acceptors map[string]HelloAcceptor
DefaultAcceptor HelloAcceptor
}
TypeRoutingAcceptor is a HelloAcceptor that routes each underlay to a per-type HelloAcceptor based on its TypeHeader, falling back to DefaultAcceptor when the type is absent or unrecognized. It is the ack-aware analog of UnderlayDispatcher for use with NewClassicListenerWithAcceptor, letting multi-underlay acceptors reserve state before the hello is acknowledged.
func (*TypeRoutingAcceptor) AcceptUnderlay ¶ added in v5.0.11
func (self *TypeRoutingAcceptor) AcceptUnderlay(underlay Underlay, ackHello func() error)
AcceptUnderlay routes the underlay to the acceptor registered for its TypeHeader, or to the default acceptor. If no acceptor applies the underlay is closed without acking.
type TypedBindHandler ¶
type TypedBindHandler[S Senders] interface { BindChannel(binding TypedBinding[S]) error }
TypedBindHandler is a bind handler that receives a TypedBinding.
func TypedBindHandlers ¶
func TypedBindHandlers[S Senders](handlers ...TypedBindHandler[S]) TypedBindHandler[S]
TypedBindHandlers combines multiple TypedBindHandlers into one.
type TypedBindHandlerF ¶
type TypedBindHandlerF[S Senders] func(binding TypedBinding[S]) error
TypedBindHandlerF is the function form of TypedBindHandler.
func (TypedBindHandlerF[S]) BindChannel ¶
func (self TypedBindHandlerF[S]) BindChannel(binding TypedBinding[S]) error
type TypedBinding ¶
type TypedBinding[S Senders] interface { Binding GetSenders() S AddTypedReceiveHandler(contentType int32, h TypedReceiveHandler[S]) AddTypedReceiveHandlerF(contentType int32, h TypedReceiveHandlerF[S]) }
TypedBinding extends Binding with typed senders access.
type TypedReceiveHandler ¶
TypedReceiveHandler is a receive handler that gets typed senders access.
type TypedReceiveHandlerF ¶
TypedReceiveHandlerF is the function form of TypedReceiveHandler.
func (TypedReceiveHandlerF[S]) HandleReceive ¶
func (self TypedReceiveHandlerF[S]) HandleReceive(m *Message, ch Channel, senders S)
type Underlay ¶
type Underlay interface {
Rx() (*Message, error)
Tx(m *Message) error
Identity
io.Closer
IsClosed() bool
Headers() map[int32][]byte
SetWriteTimeout(duration time.Duration) error
SetWriteDeadline(time time.Time) error
GetLocalAddr() net.Addr
GetRemoteAddr() net.Addr
// CreatedAt returns the time at which the underlay was created. It is used to judge
// connection stability, e.g. for flap detection when an underlay is closed.
CreatedAt() time.Time
}
Underlay abstracts a physical communications channel, typically sitting on top of 'transport'.
type UnderlayAcceptor ¶
An UnderlayAcceptor takes an Underlay and generally turns it into a channel for a specific use. It can be used when handling multiple channel types on a single listener.
type UnderlayConstraint ¶
UnderlayConstraint specifies the desired and minimum number of underlays for a given type.
func OneUnderlay ¶
func OneUnderlay() UnderlayConstraint
OneUnderlay returns a constraint requiring exactly one underlay.
func OptionalUnderlays ¶
func OptionalUnderlays(desired int) UnderlayConstraint
OptionalUnderlays returns a constraint with min 0 and the given desired count.
type UnderlayDispatcher ¶
type UnderlayDispatcher struct {
// contains filtered or unexported fields
}
An UnderlayDispatcher accepts underlays from an underlay listener and hands them off to UnderlayAcceptor instances, based on the TypeHeader.
func NewUnderlayDispatcher ¶
func NewUnderlayDispatcher(config UnderlayDispatcherConfig) *UnderlayDispatcher
NewUnderlayDispatcher creates a new UnderlayDispatcher from the given config.
func (*UnderlayDispatcher) Run ¶
func (self *UnderlayDispatcher) Run()
Run accepts underlays in a loop, dispatching each to the appropriate acceptor based on TypeHeader.
type UnderlayDispatcherConfig ¶
type UnderlayDispatcherConfig struct {
Listener UnderlayListener
ConnectTimeout time.Duration
Acceptors map[string]UnderlayAcceptor
DefaultAcceptor UnderlayAcceptor
}
UnderlayDispatcherConfig holds configuration for an UnderlayDispatcher.
type UnderlayEventListener ¶
type UnderlayEventListener interface {
UnderlayAdded(ch Channel, underlay Underlay)
UnderlayRemoved(ch Channel, underlay Underlay)
}
UnderlayEventListener is notified when underlays are added or removed from a channel.
type UnderlayFactory ¶
UnderlayFactory is used by Channel to obtain an Underlay instance. An underlay "dialer" or "listener" implements UnderlayFactory to provide instances to Channel.
func NewExistingConnDialer ¶
func NewExistingConnDialer(id *identity.TokenId, peer net.Conn, headers map[int32][]byte) UnderlayFactory
NewExistingConnDialer creates an UnderlayFactory that performs a hello handshake over an existing net.Conn.
func NewExistingConnListener ¶
func NewExistingConnListener(identity *identity.TokenId, peer net.Conn, headers map[int32][]byte) UnderlayFactory
NewExistingConnListener creates an UnderlayFactory that accepts a hello handshake over an existing net.Conn.
func NewReconnectingDialer ¶
func NewReconnectingDialer(config ReconnectingDialerConfig) UnderlayFactory
NewReconnectingDialer creates an UnderlayFactory that automatically reconnects on connection loss.
type UnderlayListener ¶
type UnderlayListener interface {
Listen(handlers ...ConnectionHandler) error
UnderlayFactory
io.Closer
}
UnderlayListener represents a component designed to listen for incoming peer connections.
func NewClassicListener ¶
func NewClassicListener(identity *identity.TokenId, endpoint transport.Address, config ListenerConfig) UnderlayListener
NewClassicListener creates a classic listener that produces underlays via Create.
type Underlays ¶
type Underlays struct {
// contains filtered or unexported fields
}
Underlays manages a set of underlays with listener notification on add/remove.
func NewUnderlays ¶
func NewUnderlays() *Underlays
NewUnderlays creates a new empty Underlays collection.
func (*Underlays) AddListener ¶
func (u *Underlays) AddListener(l UnderlayEventListener)
AddListener registers a listener to be notified on underlay add/remove events.
func (*Underlays) CountsByType ¶
CountsByType returns the number of underlays for each underlay type.
type UngroupedChannelFallback ¶
UngroupedChannelFallback handles incoming underlays that are not part of a grouped connection.
type UnsupportedVersionError ¶
type UnsupportedVersionError struct {
// contains filtered or unexported fields
}
UnsupportedVersionError indicates the remote side does not support the requested protocol version.
func (UnsupportedVersionError) Error ¶
func (u UnsupportedVersionError) Error() string
Source Files
¶
- accept_dispatcher.go
- bind.go
- channel.go
- classic_dialer.go
- classic_impl.go
- classic_listener.go
- constants.go
- datagram_underlay.go
- decoder.go
- dial_policy.go
- envelope.go
- existing_conn_dialer.go
- existing_conn_impl.go
- existing_conn_listener.go
- handler.go
- heartbeater.go
- hello_acceptor.go
- impl.go
- logging.go
- message.go
- message_source.go
- messages.go
- multi_listener.go
- options.go
- read_adapter.go
- reconnecting_dialer.go
- reconnecting_impl.go
- senders.go
- trace.go
- trace_decode.go
- type_logging_underlay.go
- underlays.go
- ws_impl.go
- ws_listener.go