castix

package module
v1.0.0-beta.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 28, 2025 License: MIT Imports: 3 Imported by: 0

README

Castix

Go Version Go Reference Go Report Card

Castix provides a flexible way to manage message streams in your applications. It acts as a central hub ( multiplexer) where multiple message producers (sources) can send messages, and multiple consumers (subscribers) can receive them.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// WithSubscribeBufferSize returns a SubscribeOption that configures the buffer
	// size of the channel returned by the Subscribe method.
	// A larger buffer can help prevent blocking (when using default delivery strategy)
	// if the subscriber processes messages slower than they are produced but consumes
	// more memory.
	WithSubscribeBufferSize = bridge.WithWatchBuffSize

	// WithSubscribeDrain returns a SubscribeOption that causes the subscription to
	// drop older messages in its buffer when it is full, removing the oldest message
	// to make room for the newest one. This strategy prioritizes newer messages at the
	// expense of potentially losing older ones when the subscriber cannot keep up with
	// the message rate.
	WithSubscribeDrain = bridge.WithWatchDrain

	// WithSubscribeSkip returns a SubscribeOption that causes the subscription to
	// drop messages if its buffer is full, instead of blocking the Castix multiplexer.
	// This can be useful when timely processing is less critical than overall system
	// responsiveness, but may lead to message loss.
	WithSubscribeSkip = bridge.WithWatchSkip
)

Functions

func Pass

func Pass[T any](in T) T

Pass is a utility Convert function that returns the input value unchanged. It can be used when no type conversion is necessary between the source and subscriber (i.e., IN and OUT are the same type).

func WithSourceFilter

func WithSourceFilter[T any](f Filter[T]) bridge.AttachFilterOption[T]

WithSourceFilter returns a SourceOption that applies a filter to messages coming from a specific source. Only messages for which the filter function `f` returns true will be passed on for conversion and distribution. The filter is applied before type conversion.

func WithSubscribeFilter

func WithSubscribeFilter[T any](f Filter[T]) bridge.WatchFilterOption[T]

WithSubscribeFilter returns a SubscribeOption that applies a filter to messages designated for a specific subscription. Only messages for which the filter function `f` returns true will be sent to the subscription's channel. The filter is applied before sending a message to the subscribed channel.

Types

type Castix

type Castix[IN, OUT any] struct {
	// contains filtered or unexported fields
}

Castix provides a flexible way to manage message streams. It acts as a multiplexer, allowing multiple message producers (sources) to broadcast messages to multiple consumers (subscribers). Castix also handles type conversion between the source message type (IN) and the subscriber's desired message type (OUT) using a provided Convert function.

func New

func New[IN, OUT any](cv Convert[IN, OUT], opts ...Option) *Castix[IN, OUT]

New creates and initializes a new Castix instance. It requires a Convert function `cv` that defines how messages of type IN are transformed into messages of type OUT. If no transformation is needed (IN and OUT are the same type), the Pass function can be used.

func (Castix[IN, OUT]) Source

func (x Castix[IN, OUT]) Source(ch <-chan IN, opts ...SourceOption) Leave

Source attaches a Go channel `ch` as a message producer to the Castix instance. Messages sent to `ch` will be processed by Castix, converted according to the function provided to New, and then distributed to all active subscribers.

Callers can provide SourceOption arguments to customize the behavior of the source. Available options include:

  • WithSourceFilter: Applies a filter to messages before they are sent to the subscribers.

It returns a Leave function. This function MUST be called when the source channel is no longer needed. Calling Leave detaches the source from Castix and releases associated resources. The source will also be detached automatically if the provided `ch` channel is closed. In both scenarios, resources are cleaned up.

func (Castix[IN, OUT]) Subscribe

func (x Castix[IN, OUT]) Subscribe(opts ...SubscribeOption) (<-chan OUT, Leave)

Subscribe creates a new subscription to receive messages processed by the Castix instance. It returns two values:

  1. A receive-only channel on which converted messages of type OUT will be delivered.
  2. A Leave function.

Messages sent to this channel originate from all attached sources and are converted using the Convert function specified when the Castix instance was created.

Callers can provide `SubscribeOption` arguments to customize the subscription's behavior. Available options include:

  • WithSubscribeBufferSize: Configures the buffer size of the returned message channel.
  • WithSubscribeFilter: Applies a filter to messages before they are sent to the channel.
  • WithSubscribeDrain: Ensures all buffered messages are processed when unsubscribing.
  • WithSubscribeSkip: Allows dropping messages if the subscription channel's buffer is full.

The returned Leave function MUST be called when the subscription is no longer needed. This will close the message channel and release all associated resources. It's important to call Leave to prevent indefinite goroutine blocking and resource leaks.

type Convert

type Convert[IN, OUT any] func(IN) OUT

Convert defines a function signature for converting a value of type IN to a value of type OUT. This is used by Castix to transform messages from their source type to the desired output type for subscribers.

type Filter

type Filter[T any] func(T) bool

Filter defines a function signature for a predicate that determines whether a message of type T should be processed or discarded/skipped. Must return true if a trial is passed and false otherwise

type Leave

type Leave = bridge.Leave

Leave is a function that should be called to clean up resources

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option defines a configuration option that can be applied when creating a new Castix instance using New.

func UseGoroutine

func UseGoroutine() Option

UseGoroutine returns an Option that configures Castix to use goroutine-based multiplexing for handling its input channels. This is generally the default behavior if no specific input handling option is provided. In this mode, each attached input channel is monitored by a dedicated goroutine.

func UseReflection

func UseReflection() Option

UseReflection returns an Option that configures Castix to use reflection-based multiplexing (via reflect.Select) for handling its input channels. This approach is recommended when dealing with a large number of input channels that are expected to receive messages infrequently. It is particularly well-suited for scenarios where conserving goroutine resources across many potentially idle channels is beneficial, as it uses a single goroutine to manage all inputs. For scenarios with fewer channels experiencing high-throughput and frequent messages, goroutine-based multiplexing (see UseGoroutine) may offer better performance.

type SourceOption

type SourceOption = bridge.AttachOption

SourceOption represents an option that can be applied when attaching a source to a Castix instance via the Source method.

type SubscribeOption

type SubscribeOption = bridge.WatchOption

SubscribeOption represents an option that can be applied when creating a subscription to a Castix instance via the Subscribe method.

Directories

Path Synopsis
internal
mux

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL