reconnect

package
v0.6.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 23, 2025 License: MIT Imports: 20 Imported by: 2

README

Generic Reconnecting Network Client

The reconnect package provides a robust, generic network client with automatic reconnection capabilities for both TCP and Unix domain sockets. It implements retry mechanisms, connection lifecycle management, and configurable callbacks for handling connection events.

Overview

The reconnecting network client automatically handles connection failures by implementing configurable retry logic. It supports both TCP connections and Unix domain sockets, provides hooks for custom connection handling, session management, and error processing whilst maintaining thread safety and proper resource clean-up.

Key Features

  • Automatic Reconnection: Transparent reconnection with configurable retry strategies.
  • Multiple Network Types: Support for both TCP and Unix domain sockets.
  • Lifecycle Callbacks: Customisable hooks for socket, connect, session, disconnect, and error events.
  • Thread Safety: Built-in synchronisation for concurrent operations.
  • Timeout Management: Separate dial, read, and write timeout configuration.
  • Structured Logging: Integration with darvaza.org/slog for comprehensive logging.
  • Context Support: Full context.Context integration for cancellation and deadlines.
  • Configuration Validation: Built-in validation and default-setting mechanisms.

Basic Usage

import (
    "context"
    "net"
    "time"

    "darvaza.org/slog"
    "darvaza.org/x/net/reconnect"
)

// Get a logger instance (implementation-specific).
// This could be from any slog handler (filter, zap, etc.)
// or a custom implementation.
logger := getLogger()

// Create a configuration.
cfg := &reconnect.Config{
    Context: context.Background(),
    Logger:  logger,

    // Connection settings.
    // TCP: "host:port" or Unix: "/path/to/socket" or "unix:/path"
    Remote:       "example.com:8080",
    KeepAlive:    5 * time.Second,  // Default: 5s.
    DialTimeout:  2 * time.Second,  // Default: 2s.
    ReadTimeout:  2 * time.Second,  // Default: 2s.
    WriteTimeout: 2 * time.Second,  // Default: 2s.

    // Retry configuration.
    ReconnectDelay: time.Second,  // Delay between reconnection attempts.
}

// Create client.
client, err := reconnect.New(cfg)
if err != nil {
    logger.Fatal().Printf("Failed to create client: %v", err)
}

// Connect and start the client.
if err := client.Connect(); err != nil {
    logger.Fatal().Printf("Failed to connect: %v", err)
}

// Wait for completion.
defer func() {
    if err := client.Wait(); err != nil {
        logger.Error().Printf("Client error: %v", err)
    }
}()
Unix Domain Socket Usage

The client automatically detects Unix domain sockets based on the Remote string:

// Unix socket with explicit prefix
cfg := &reconnect.Config{
    Remote: "unix:/var/run/app.sock",
    // ... other configuration
}

// Unix socket with absolute path (auto-detected)
cfg := &reconnect.Config{
    Remote: "/var/run/app.sock",
    // ... other configuration
}

// Unix socket with .sock extension (auto-detected)
cfg := &reconnect.Config{
    Remote: "./app.sock",
    // ... other configuration
}

Advanced Configuration

The client supports extensive customisation through callback functions and options.

Connection Lifecycle Callbacks
import (
    "context"
    "net"
    "syscall"

    "darvaza.org/slog"
    "darvaza.org/x/net/reconnect"
)

// Assume logger is already obtained.
var logger slog.Logger

cfg := &reconnect.Config{
    // Called against the raw socket before connecting.
    OnSocket: func(ctx context.Context, conn syscall.RawConn) error {
        // Configure socket options.
        return nil
    },

    // Called when a connection is established.
    OnConnect: func(ctx context.Context, conn net.Conn) error {
        logger.Info().Printf("Connected to %s", conn.RemoteAddr())
        // Perform handshake or initial setup.
        return nil
    },

    // Called for each session after connection.
    OnSession: func(ctx context.Context) error {
        // Implement your protocol logic here.
        // This function should block until the session ends.
        return handleSession(ctx)
    },

    // Called when connection is about to close.
    OnDisconnect: func(ctx context.Context, conn net.Conn) error {
        logger.Info().Printf("Disconnecting from %s", conn.RemoteAddr())
        // Clean up connection-specific resources.
        return nil
    },

    // Called when errors occur.
    OnError: func(ctx context.Context, conn net.Conn, err error) error {
        logger.Error().
            WithField("error", err).
            Printf("Connection error")
        // Return nil to allow retry, non-nil to stop reconnection.
        return nil
    },
}
Retry Configuration
import (
    "context"
    "time"

    "darvaza.org/x/net/reconnect"
)

cfg := &reconnect.Config{
    // Simple constant delay between retries.
    ReconnectDelay: time.Second,

    // Custom wait function for retry logic.
    WaitReconnect: func(ctx context.Context) error {
        // Implement custom backoff logic.
        // Return error to stop reconnection.
        return customBackoff(ctx)
    },
}

// Use helper functions for common patterns.
cfg.WaitReconnect = reconnect.NewConstantWaiter(5 * time.Second)

// Immediate error return (no retry).
cfg.WaitReconnect = reconnect.NewImmediateErrorWaiter(errNoRetry)

// Prevent all reconnection attempts.
cfg.WaitReconnect = reconnect.NewDoNotReconnectWaiter(errStop)

Configuration Options

The Config structure supports the following fields:

Field Type Default Description
Context context.Context context.Background() Base context for the client.
Logger slog.Logger Default logger Logger for structured logging.
Remote string Required Target address. TCP: host:port, Unix: /path/to/socket or unix:/path
KeepAlive time.Duration 5s TCP keep-alive interval (ignored for Unix sockets).
DialTimeout time.Duration 2s Connection establishment timeout.
ReadTimeout time.Duration 2s Read deadline for connections.
WriteTimeout time.Duration 2s Write deadline for connections.
ReconnectDelay time.Duration 0 Delay between reconnection attempts.
WaitReconnect Waiter Constant waiter Custom reconnection wait function.
OnSocket func nil Raw socket configuration callback.
OnConnect func nil Connection establishment callback.
OnSession func nil Session handler (blocks until done).
OnDisconnect func nil Disconnection callback.
OnError func nil Error handler callback.

Client Methods

Core Methods
// New creates a new Client with options.
func New(cfg *Config, options ...OptionFunc) (*Client, error)

// Connect initiates the connection and starts the reconnection loop.
func (c *Client) Connect() error

// Config returns the configuration object.
func (c *Client) Config() *Config

// Reload attempts to apply configuration changes.
// Note: Currently returns ErrTODO.
func (c *Client) Reload() error

// Wait blocks until the client stops and returns the cancellation reason.
func (c *Client) Wait() error

// Err returns the cancellation reason.
func (c *Client) Err() error

// Done returns a channel that watches the client workers.
func (c *Client) Done() <-chan struct{}

// Shutdown initiates a shutdown and waits until done or context timeout.
func (c *Client) Shutdown(ctx context.Context) error
Configuration Methods
// SetDefaults fills gaps in the configuration.
func (cfg *Config) SetDefaults() error

// Valid checks if the configuration is usable.
func (cfg *Config) Valid() error

// ExportDialer creates a net.Dialer from the configuration.
func (cfg *Config) ExportDialer() net.Dialer
Utility Functions
// ParseRemote determines network type and address from a remote string.
// Returns (network, address, error) where network is reconnect.NetworkTCP or
// reconnect.NetworkUnix.
func ParseRemote(remote string) (network, address string, err error)

// ValidateRemote validates a remote address for TCP or Unix socket connection.
// Returns nil if the address is valid for either protocol.
func ValidateRemote(remote string) error

// TimeoutToAbsoluteTime adds duration to base time.
// Returns zero time if duration is negative.
func TimeoutToAbsoluteTime(base time.Time, d time.Duration) time.Time
Using Utility Functions
import "darvaza.org/x/net/reconnect"

// Parse and validate remote addresses
network, address, err := reconnect.ParseRemote("unix:/var/run/app.sock")
if err != nil {
    // Handle parsing error
}
// network = "unix", address = "/var/run/app.sock"

// Validate before using in configuration
if err := reconnect.ValidateRemote("example.com:8080"); err != nil {
    // Handle invalid address
}

// Convert relative timeout to absolute time
deadline := reconnect.TimeoutToAbsoluteTime(time.Now(), 30*time.Second)

Error Handling

The client distinguishes between recoverable and non-recoverable errors.

Error Types
var (
    // ErrConfigBusy indicates the Config is already in use.
    ErrConfigBusy = core.QuietWrap(fs.ErrPermission,
        "config already in use")

    // ErrRunning indicates the client is already running.
    ErrRunning = errors.New("already running")

    // Additional errors defined in the errors.go file.
)
Error Classification
  • Recoverable: Network timeouts, connection refused, temporary failures.
  • Non-recoverable: Context cancellation, configuration errors, explicit stop requests.

The OnError callback can override the default retry behaviour by returning a non-nil error to stop reconnection attempts.

Thread Safety

All client operations are thread-safe. Multiple goroutines can safely:

  • Call client methods concurrently.
  • Access the client's context.
  • Trigger cancellation.

The configuration becomes immutable after creating a client. Attempting to reuse a configuration for another client returns ErrConfigBusy.

Resource Management

The client properly manages resources:

  • Connections are automatically closed on errors.
  • Goroutines are cleaned up on shutdown.
  • Context cancellation is propagated throughout.
  • The Wait() method ensures proper shutdown sequencing.

Helper Functions

Waiter Functions
// NewConstantWaiter creates a waiter with fixed delay.
func NewConstantWaiter(d time.Duration) func(context.Context) error

// NewImmediateErrorWaiter returns an error immediately.
func NewImmediateErrorWaiter(err error) func(context.Context) error

// NewDoNotReconnectWaiter prevents reconnection.
func NewDoNotReconnectWaiter(err error) func(context.Context) error
Worker Functions
// NewShutdownFunc creates a worker that shuts down gracefully.
func NewShutdownFunc(s Shutdowner, timeout time.Duration) WorkerFunc

// NewCatchFunc creates an error catcher with exceptions.
func NewCatchFunc(nonErrors ...error) CatcherFunc

Implementation Notes

Configuration Lifecycle
  1. Create a Config with required fields.
  2. Call SetDefaults() to fill optional fields (done automatically by New).
  3. Call Valid() to verify the configuration (done automatically by New).
  4. Pass to New() to create a client.
  5. Configuration becomes immutable and bound to the client.
Connection Flow
  1. Connect() initiates the first connection attempt.
  2. On success, OnConnect callback is invoked.
  3. OnSession callback runs (blocks until session ends).
  4. On disconnection, OnDisconnect callback is invoked.
  5. WaitReconnect determines the retry delay.
  6. Process repeats until context cancellation or fatal error.
Session Handler Guidelines

The OnSession callback should:

  • Block until the session is complete.
  • Handle all protocol-specific logic.
  • Return nil to trigger reconnection on completion.
  • Return an error to stop reconnection.
  • Respect context cancellation.

Integration with darvaza.org/x/net

The reconnect client integrates seamlessly with other darvaza.org/x/net components:

  • Uses the same net.Dialer interface.
  • Supports all standard network configurations.
  • Compatible with the bind package for advanced binding.

Example: Protocol Implementation

import (
    "context"
    "net"
    "time"

    "darvaza.org/x/net/reconnect"
)

func createClient(addr string) (*reconnect.Client, error) {
    cfg := &reconnect.Config{
        Remote:         addr,
        ReconnectDelay: 5 * time.Second,

        OnConnect: func(ctx context.Context, conn net.Conn) error {
            // Send initial handshake.
            return sendHandshake(conn)
        },

        OnSession: func(ctx context.Context) error {
            // Main protocol loop.
            for {
                select {
                case <-ctx.Done():
                    return ctx.Err()
                case msg := <-messages:
                    if err := processMessage(msg); err != nil {
                        return err
                    }
                }
            }
        },

        OnError: func(ctx context.Context, conn net.Conn, err error) error {
            if isTemporary(err) {
                // Retry on temporary errors.
                return nil
            }
            // Stop on permanent errors.
            return err
        },
    }

    return reconnect.New(cfg)
}
Example: Unix Domain Socket Connection
import (
    "context"
    "net"
    "time"

    "darvaza.org/x/net/reconnect"
)

func createUnixClient() (*reconnect.Client, error) {
    cfg := &reconnect.Config{
        // Automatically detected as Unix socket
        Remote:         "/var/run/myapp.sock",
        ReconnectDelay: 5 * time.Second,

        OnConnect: func(ctx context.Context, conn net.Conn) error {
            // Unix socket connected
            logger.Info().Printf("Connected to Unix: %s", conn.RemoteAddr())
            return nil
        },

        OnSession: func(ctx context.Context) error {
            // Handle Unix socket communication
            return handleUnixProtocol(ctx)
        },
    }

    return reconnect.New(cfg)
}

// Alternative: explicit Unix socket prefix
func createExplicitUnixClient() (*reconnect.Client, error) {
    cfg := &reconnect.Config{
        Remote: "unix:/tmp/app.sock",
        // ... rest of configuration
    }
    return reconnect.New(cfg)
}

Network Type Auto-Detection

The client automatically determines the network type based on the Remote string:

Pattern Network Type Example
unix: prefix Unix socket unix:/var/run/app.sock
Absolute path (/) Unix socket /tmp/socket
Ends with .sock Unix socket ./app.sock, run/app.sock
All others TCP example.com:8080, 192.168.1.1:443

This implementation provides enterprise-grade reliability for both TCP and Unix domain socket connections whilst maintaining simplicity and flexibility for various use cases.

Documentation

Overview

Package reconnect implements a generic retrying network client.

Index

Constants

View Source
const (
	// LogFieldAddress is the field name used to store the address
	// when logging.
	LogFieldAddress = "addr"

	// LogFieldError is the field name used to store the error
	// when logging.
	LogFieldError = slog.ErrorFieldName
)
View Source
const (
	// NetworkTCP represents TCP network type
	NetworkTCP = "tcp"
	// NetworkUnix represents Unix domain socket network type
	NetworkUnix = "unix"
)
View Source
const (
	// DefaultWaitReconnect specifies how long we will wait for
	// to reconnect by default
	DefaultWaitReconnect = 5 * time.Second
)

Variables

View Source
var (
	// ErrAbnormalConnect indicates the dialer didn't return error
	// nor connection.
	ErrAbnormalConnect = core.QuietWrap(syscall.ECONNABORTED, "abnormal response")

	// ErrDoNotReconnect indicates the Waiter
	// instructed us to not reconnect
	ErrDoNotReconnect = errors.New("don't reconnect")

	// ErrNotConnected indicates the [Client] isn't currently connected.
	ErrNotConnected = core.QuietWrap(fs.ErrClosed, "not connected")

	// ErrRunning indicates the [Client] has already being started.
	ErrRunning = core.QuietWrap(syscall.EBUSY, "client already running")
)
View Source
var (
	// ErrConfigBusy indicates the [Config] is in use and can't
	// be used to create another [Client].
	ErrConfigBusy = core.QuietWrap(fs.ErrPermission, "config already in use")
)

Functions

func IsFatal

func IsFatal(err error) bool

IsFatal tells if the error means the connection should be closed and not retried.

func IsNonError

func IsNonError(err error) bool

IsNonError checks if the error is an actual error instead of a manual shutdown.

func NewConstantWaiter

func NewConstantWaiter(d time.Duration) func(context.Context) error

NewConstantWaiter blocks for a given amount of time, or until the context is cancelled. If the given duration is negative, the Waiter won't wait, but it will still check for context terminations. If zero, the Waiter will wait the default amount.

func NewDoNotReconnectWaiter

func NewDoNotReconnectWaiter(err error) func(context.Context) error

NewDoNotReconnectWaiter returns a Waiter that will return the context cancellation cause, the specified error, or ErrDoNotReconnect.

func NewImmediateErrorWaiter

func NewImmediateErrorWaiter(err error) func(context.Context) error

NewImmediateErrorWaiter returns a Waiter that will return the context cancellation cause or the specified error, if any. There is no actual waiting.

func ParseRemote added in v0.6.1

func ParseRemote(remote string) (network, address string, err error)

ParseRemote determines the network type and address from a remote string. It supports: - "unix:/path/to/socket" - explicit Unix socket. - "/path/to/socket" - Unix socket (absolute path). - "path/to/file.sock" - Unix socket (ends with .sock). - "host:port" - TCP socket.

func TimeoutToAbsoluteTime

func TimeoutToAbsoluteTime(base time.Time, d time.Duration) time.Time

TimeoutToAbsoluteTime adds the given time.Duration to a base time.Time. If the duration is negative, a zero time.Time will be returned. If the base is zero, the current time will be used.

func ValidateRemote added in v0.6.1

func ValidateRemote(remote string) error

ValidateRemote validates a remote address for use with reconnect clients. It returns nil if the address is valid for either TCP or Unix socket connection. For TCP addresses, it validates host:port format. For Unix socket addresses, it accepts the address as-is.

Types

type CatcherFunc added in v0.3.0

type CatcherFunc func(context.Context, error) error

CatcherFunc is a catch function for core.ErrGroup's GoCatch.

func NewCatchFunc added in v0.3.0

func NewCatchFunc(nonErrors ...error) CatcherFunc

NewCatchFunc creates a CatcherFunc turning any of the given errors into nil.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a reconnecting network client.

func Must

func Must(cfg *Config, options ...OptionFunc) *Client

Must is like New but it panics on errors.

func New

func New(cfg *Config, options ...OptionFunc) (*Client, error)

New creates a new Client using the given Config and options.

func (*Client) Close

func (c *Client) Close() error

Close terminates the current connection

func (*Client) Config

func (c *Client) Config() *Config

Config returns the Config object used when [Reload] is called.

func (*Client) Connect

func (c *Client) Connect() error

Connect launches the Client.

func (*Client) Done

func (c *Client) Done() <-chan struct{}

Done returns a channel that watches the Client workers, and provides the cancellation reason.

func (*Client) Err

func (c *Client) Err() error

Err returns the cancellation reason. It will return nil if the cause was initiated by the user.

func (*Client) Go

func (c *Client) Go(funcs ...WorkerFunc)

Go spawns a goroutine within the Client's context.

func (*Client) GoCatch added in v0.3.0

func (c *Client) GoCatch(run WorkerFunc, catch CatcherFunc)

GoCatch spawns a goroutine within the Client's context, optionally allowing filtering the error to stop cascading.

func (*Client) LocalAddr added in v0.2.4

func (c *Client) LocalAddr() net.Addr

LocalAddr returns the local address if connected.

func (*Client) Read

func (c *Client) Read(p []byte) (int, error)

Read reads from the TCP connection, if connected.

func (*Client) Reload

func (c *Client) Reload() error

Reload attempts to apply changes done to the Config since the last time, or since created.

func (*Client) RemoteAddr added in v0.2.4

func (c *Client) RemoteAddr() net.Addr

RemoteAddr returns the remote address if connected.

func (*Client) ResetDeadline

func (c *Client) ResetDeadline() error

ResetDeadline sets the connection's read and write deadlines using the default values.

func (*Client) ResetReadDeadline

func (c *Client) ResetReadDeadline() error

ResetReadDeadline resets the connection's read deadline using the default duration.

func (*Client) ResetWriteDeadline

func (c *Client) ResetWriteDeadline() error

ResetWriteDeadline resets the connection's write deadline using the default duration.

func (*Client) SetDeadline

func (c *Client) SetDeadline(read, write time.Duration) error

SetDeadline sets the connections's read and write deadlines. if write is zero but read is positive, write is set using the same value as read. zero or negative can be used to disable the deadline.

func (*Client) SetReadDeadline added in v0.2.3

func (c *Client) SetReadDeadline(d time.Duration) error

SetReadDeadline sets the connections' read deadline to the specified duration. Use zero or negative to disable it.

func (*Client) SetWriteDeadline added in v0.2.3

func (c *Client) SetWriteDeadline(d time.Duration) error

SetWriteDeadline sets the connections' write deadline to the specified duration. Use zero or negative to disable it.

func (*Client) Shutdown

func (c *Client) Shutdown(ctx context.Context) error

Shutdown initiates a shutdown and waits until the workers are done, or the given context times out.

func (*Client) Wait

func (c *Client) Wait() error

Wait blocks until the Client workers have finished, and returns the cancellation reason.

func (*Client) WithDebug added in v0.2.4

func (c *Client) WithDebug(addr net.Addr) (slog.Logger, bool)

WithDebug gets a logger at Debug level optionally annotated by an IP address. If the Debug log-level is disabled, it will return `nil, false`.

func (*Client) WithError

func (c *Client) WithError(addr net.Addr, err error) (slog.Logger, bool)

WithError gets a logger at Error level optionally annotated by an IP address. If the Error log-level is disabled, it will return `nil, false`.

func (*Client) WithInfo

func (c *Client) WithInfo(addr net.Addr) (slog.Logger, bool)

WithInfo gets a logger at Info level optionally annotated by an IP address. If the Info log-level is disabled, it will return `nil, false`.

func (*Client) Write

func (c *Client) Write(p []byte) (int, error)

Write writes to the TCP connection, if connected.

type Config

type Config struct {
	Context context.Context
	Logger  slog.Logger

	// Remote indicates the remote endpoint: TCP "host:port" or Unix socket path,
	// e.g., "/path/to/socket" or "unix:/path".
	Remote string

	// KeepAlive indicates the value to be set to TCP connections
	// for the low level keep alive messages.
	KeepAlive time.Duration `default:"5s"`
	// DialTimeout indicates how long are we willing to wait for new
	// connections getting established.
	DialTimeout time.Duration `default:"2s"`
	// ReadTimeout is the default read deadline for the connection.
	// Zero or negative disables the deadline.
	ReadTimeout time.Duration `default:"2s"`
	// WriteTimeout is the default write deadline for the connection.
	// Zero or negative disables the deadline.
	WriteTimeout time.Duration `default:"2s"`

	// ReconnectDelay specifies how long to wait between re-connections
	// unless [WaitReconnect] is specified. Negative implies reconnecting is disabled.
	ReconnectDelay time.Duration
	// WaitReconnect is a helper used to wait between re-connection attempts.
	WaitReconnect Waiter

	// OnSocket is called, when defined, against the raw socket before attempting to
	// connect
	OnSocket func(context.Context, syscall.RawConn) error
	// OnConnect is called, when defined, immediately after the connection is established
	// but before the session is created.
	OnConnect func(context.Context, net.Conn) error

	// OnSession is expected to block until it's done.
	OnSession func(context.Context) error
	// OnDisconnect is called after closing the connection and can be used to
	// prevent further connection retries.
	OnDisconnect func(context.Context, net.Conn) error
	// OnError is called after all errors and gives us the opportunity to
	// decide how the error should be treated by the reconnection logic.
	OnError func(context.Context, net.Conn, error) error
	// contains filtered or unexported fields
}

Config describes the operation of the Client.

func (*Config) ExportDialer

func (cfg *Config) ExportDialer() net.Dialer

ExportDialer creates a net.Dialer from the Config.

func (*Config) SetDefaults

func (cfg *Config) SetDefaults() error

SetDefaults fills any gap in the config.

func (*Config) Valid

func (cfg *Config) Valid() error

Valid checks if the Config is fit to be used.

type OptionFunc

type OptionFunc func(*Config) error

An OptionFunc modifies a Config consistently before SetDefaults() and Validate().

type Shutdowner added in v0.3.0

type Shutdowner interface {
	Shutdown(context.Context) error
}

A Shutdowner is an object that provides a Shutdown method that takes a context with deadline to shut down all associated workers.

type StreamSession added in v0.2.2

type StreamSession[Input, Output any] struct {

	// QueueSize specifies how many [Output] type entries can be buffered
	// for delivery before [StreamSession.Send] blocks.
	QueueSize uint
	// Conn specifies the underlying connection
	Conn io.ReadWriteCloser
	// Context is an optional [context.Context] to allow cascading cancellations.
	Context context.Context

	// Split identifies the next encoded [Input] type in the inbound stream.
	// If not set, [bufio.SplitLine] will be used.
	Split bufio.SplitFunc
	// Marshal is used, if MarshalTo isn't set, to encode an [Output] type.
	// If neither is set, [StreamSession.Go] will fail.
	Marshal func(Output) ([]byte, error)
	// MarshalTo, if set, is used to write the encoded representation of
	// and [Output] type.
	MarshalTo func(Output, io.Writer) error
	// Unmarshal is used to decode an [Input] type previously identified
	// by [StreamSession.Split].
	// If not net, [StreamSession.Go] will fail.
	Unmarshal func([]byte) (Input, error)

	// SetReadDeadline is an optional hook called before reading the a message
	SetReadDeadline func() error
	// SetWriteDeadline is an optional hook called before writing a message
	SetWriteDeadline func() error
	// UnsetReadDeadline is an optional hook called after having read a message
	UnsetReadDeadline func() error
	// UnsetWriteDeadline is an optional hook called after having wrote a message
	UnsetWriteDeadline func() error

	// OnError is optionally called when an error occurs
	OnError func(error)
	// contains filtered or unexported fields
}

StreamSession provides an asynchronous stream session using message types for receiving and sending.

func (*StreamSession[_, _]) Close added in v0.2.2

func (s *StreamSession[_, _]) Close() error

Close initiates a shutdown of the session.

func (*StreamSession[_, _]) Done added in v0.2.2

func (s *StreamSession[_, _]) Done() <-chan struct{}

Done returns a channel that will be closed when all workers are done.

func (*StreamSession[Input, Output]) Err added in v0.3.0

func (s *StreamSession[Input, Output]) Err() error

Err returns the error that initiated a shutdown.

func (*StreamSession[_, _]) Go added in v0.2.4

func (s *StreamSession[_, _]) Go(funcs ...WorkerFunc)

Go spawns a goroutine within the session's context.

func (*StreamSession[_, _]) GoCatch added in v0.3.0

func (s *StreamSession[_, _]) GoCatch(run WorkerFunc, catch CatcherFunc)

GoCatch spawns a goroutine within the session's context, and allows a catcher function to filter returned errors.

func (*StreamSession[Input, _]) Next added in v0.2.2

func (s *StreamSession[Input, _]) Next() (Input, bool)

Next blocks until a new message is received.

func (*StreamSession[Input, _]) Recv added in v0.2.2

func (s *StreamSession[Input, _]) Recv() <-chan Input

Recv returns a channel where inbound messages can be received.

func (*StreamSession[_, Output]) Send added in v0.2.2

func (s *StreamSession[_, Output]) Send(m Output) error

Send sends a message asynchronously, unless the queue is full.

func (*StreamSession[_, _]) Shutdown added in v0.3.0

func (s *StreamSession[_, _]) Shutdown(ctx context.Context) error

Shutdown initiates a shutdown and waits until it's done or the given context has expired.

func (*StreamSession[_, _]) Spawn added in v0.2.2

func (s *StreamSession[_, _]) Spawn() error

Spawn starts the StreamSession.

func (*StreamSession[_, _]) Wait added in v0.2.2

func (s *StreamSession[_, _]) Wait() error

Wait blocks until all workers are done.

type Waiter

type Waiter func(context.Context) error

A Waiter is a function that blocks and returns an error when cancelled or nil when we are good to continue.

type WorkGroup added in v0.3.0

type WorkGroup interface {
	Go(...WorkerFunc)
	GoCatch(WorkerFunc, CatcherFunc)

	Shutdown(context.Context) error

	Wait() error
	Done() <-chan struct{}
	Err() error
}

A WorkGroup is an error group interface.

type WorkerFunc added in v0.3.0

type WorkerFunc func(context.Context) error

WorkerFunc is a run function for core.ErrGroup's GoCatch.

func NewShutdownFunc added in v0.3.0

func NewShutdownFunc(s Shutdowner, tio time.Duration) WorkerFunc

NewShutdownFunc creates a shutdown WorkerFunc, optionally with a deadline.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL