Documentation ¶
Constants ¶
This section is empty.
Variables ¶
var (
	// WithSubscribeBufferSize returns a SubscribeOption that configures the buffer
	// size of the channel returned by the Subscribe method.
	// A larger buffer can help prevent blocking (when using default delivery strategy)
	// if the subscriber processes messages slower than they are produced but consumes
	// more memory.
	WithSubscribeBufferSize = bridge.WithWatchBuffSize

	// WithSubscribeDrain returns a SubscribeOption that causes the subscription to
	// drop older messages in its buffer when it is full, removing the oldest message
	// to make room for the newest one. This strategy prioritizes newer messages at the
	// expense of potentially losing older ones when the subscriber cannot keep up with
	// the message rate.
	WithSubscribeDrain = bridge.WithWatchDrain

	// WithSubscribeSkip returns a SubscribeOption that causes the subscription to
	// drop messages if its buffer is full, instead of blocking the Castix multiplexer.
	// This can be useful when timely processing is less critical than overall system
	// responsiveness, but may lead to message loss.
	WithSubscribeSkip = bridge.WithWatchSkip
)
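The difference between the skip and drain strategies described above can be illustrated on a plain buffered channel. The following is a minimal sketch, not the package's implementation: `deliverSkip`, `deliverDrain`, and `snapshot` are hypothetical helpers, and the sketch assumes a single sender and a buffer capacity greater than zero.

```go
package main

import "fmt"

// deliverSkip (hypothetical) tries to enqueue v and drops it when the
// buffer is full. It reports whether v was enqueued.
func deliverSkip[T any](ch chan T, v T) bool {
	select {
	case ch <- v:
		return true
	default:
		return false // buffer full: the newest message is lost
	}
}

// deliverDrain (hypothetical) enqueues v, evicting the oldest buffered
// value when the buffer is full. Assumes one sender and cap(ch) > 0.
func deliverDrain[T any](ch chan T, v T) {
	for {
		select {
		case ch <- v:
			return
		default:
		}
		// Buffer full: discard the oldest value, unless a reader has
		// already taken it, and try again.
		select {
		case <-ch:
		default:
		}
	}
}

// snapshot empties ch and returns the buffered values, oldest first.
func snapshot[T any](ch chan T) []T {
	var out []T
	for {
		select {
		case v := <-ch:
			out = append(out, v)
		default:
			return out
		}
	}
}

func main() {
	skip := make(chan int, 2)
	drain := make(chan int, 2)
	for i := 1; i <= 4; i++ {
		deliverSkip(skip, i)
		deliverDrain(drain, i)
	}
	fmt.Println(snapshot(skip))  // [1 2]: later messages were dropped
	fmt.Println(snapshot(drain)) // [3 4]: older messages were evicted
}
```

With a slow subscriber, skip keeps the oldest messages that fit in the buffer, while drain keeps the newest ones.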
Functions ¶
func Pass ¶
func Pass[T any](in T) T
Pass is a utility Convert function that returns the input value unchanged. It can be used when no type conversion is necessary between the source and subscriber (i.e., IN and OUT are the same type).
func WithSourceFilter ¶
func WithSourceFilter[T any](f Filter[T]) bridge.AttachFilterOption[T]
WithSourceFilter returns a SourceOption that applies a filter to messages coming from a specific source. Only messages for which the filter function `f` returns true will be passed on for conversion and distribution. The filter is applied before type conversion.
func WithSubscribeFilter ¶
func WithSubscribeFilter[T any](f Filter[T]) bridge.WatchFilterOption[T]
WithSubscribeFilter returns a SubscribeOption that applies a filter to messages designated for a specific subscription. Only messages for which the filter function `f` returns true will be sent to the subscription's channel. The filter is applied before sending a message to the subscribed channel.
Types ¶
type Castix ¶
type Castix[IN, OUT any] struct {
	// contains filtered or unexported fields
}
Castix provides a flexible way to manage message streams. It acts as a multiplexer, allowing multiple message producers (sources) to broadcast messages to multiple consumers (subscribers). Castix also handles type conversion between the source message type (IN) and the subscriber's desired message type (OUT) using a provided Convert function.
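The multiplexing idea (many sources, one conversion step, many subscribers) can be sketched with plain channels. This is an illustrative model only and does not reflect how Castix is implemented: `broadcast` and `run` are hypothetical, one goroutine is started per source, and delivery simply blocks when a subscriber's buffer is full.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"sync"
)

// broadcast (hypothetical) reads every source, converts each message, and
// sends the result to every subscriber. It closes the subscriber channels
// once all sources have been closed and drained.
func broadcast[IN, OUT any](cv func(IN) OUT, sources []<-chan IN, subs []chan OUT) {
	var wg sync.WaitGroup
	for _, src := range sources {
		wg.Add(1)
		go func(src <-chan IN) {
			defer wg.Done()
			for m := range src {
				out := cv(m)
				for _, s := range subs {
					s <- out // blocks while this subscriber's buffer is full
				}
			}
		}(src)
	}
	wg.Wait()
	for _, s := range subs {
		close(s)
	}
}

// run feeds two sources into two subscribers and returns what each
// subscriber received, sorted because arrival order is not deterministic.
func run() [2][]string {
	a := make(chan int, 2)
	b := make(chan int, 1)
	a <- 1
	a <- 2
	close(a)
	b <- 3
	close(b)

	subs := []chan string{make(chan string, 3), make(chan string, 3)}
	broadcast(strconv.Itoa, []<-chan int{a, b}, subs)

	var got [2][]string
	for i, s := range subs {
		for m := range s {
			got[i] = append(got[i], m)
		}
		sort.Strings(got[i])
	}
	return got
}

func main() {
	fmt.Println(run()) // [[1 2 3] [1 2 3]]
}
```

Every subscriber receives every converted message, regardless of which source produced it.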
func New ¶
New creates and initializes a new Castix instance. It requires a Convert function `cv` that defines how messages of type IN are transformed into messages of type OUT. If no transformation is needed (IN and OUT are the same type), the Pass function can be used.
func (Castix[IN, OUT]) Source ¶
func (x Castix[IN, OUT]) Source(ch <-chan IN, opts ...SourceOption) Leave
Source attaches a Go channel `ch` as a message producer to the Castix instance. Messages sent to `ch` will be processed by Castix, converted according to the function provided to New, and then distributed to all active subscribers.
Callers can provide SourceOption arguments to customize the behavior of the source. Available options include:
- WithSourceFilter: Applies a filter to messages before they are sent to the subscribers.
Source returns a Leave function. This function MUST be called when the source channel is no longer needed. Calling Leave detaches the source from Castix and releases associated resources. The source is also detached automatically if the provided `ch` channel is closed; resources are cleaned up in both cases.
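The two detach paths (an explicit Leave call, or closing the source channel) can be modeled as a forwarding goroutine that watches both the source and a stop signal. The sketch below is hypothetical: `leave` and `attach` are stand-ins invented for illustration, and making `leave` safe to call more than once via `sync.Once` is an assumption of this sketch, not a documented guarantee of the package.

```go
package main

import (
	"fmt"
	"sync"
)

// leave is a hypothetical stand-in for the package's Leave type.
type leave func()

// attach (hypothetical) forwards ch into out until ch is closed or the
// returned leave function is called. The returned channel is closed once
// the forwarding goroutine has exited.
func attach[T any](ch <-chan T, out chan<- T) (leave, <-chan struct{}) {
	stop := make(chan struct{})
	exited := make(chan struct{})
	var once sync.Once
	go func() {
		defer close(exited)
		for {
			select {
			case <-stop:
				return // explicit detach
			case v, ok := <-ch:
				if !ok {
					return // source closed: detach automatically
				}
				select {
				case out <- v:
				case <-stop:
					return
				}
			}
		}
	}()
	return func() { once.Do(func() { close(stop) }) }, exited
}

func main() {
	out := make(chan int, 1)

	// Path 1: the source channel is closed by its producer.
	src1 := make(chan int)
	l1, done1 := attach[int](src1, out)
	src1 <- 1
	close(src1)
	<-done1
	l1() // harmless in this sketch, even after the automatic detach
	fmt.Println(<-out) // 1

	// Path 2: the caller detaches explicitly while the source stays open.
	src2 := make(chan int)
	l2, done2 := attach[int](src2, out)
	l2()
	<-done2
	fmt.Println("detached")
}
```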
func (Castix[IN, OUT]) Subscribe ¶
func (x Castix[IN, OUT]) Subscribe(opts ...SubscribeOption) (<-chan OUT, Leave)
Subscribe creates a new subscription to receive messages processed by the Castix instance. It returns two values:
- A receive-only channel on which converted messages of type OUT will be delivered.
- A Leave function.
Messages sent to this channel originate from all attached sources and are converted using the Convert function specified when the Castix instance was created.
Callers can provide `SubscribeOption` arguments to customize the subscription's behavior. Available options include:
- WithSubscribeBufferSize: Configures the buffer size of the returned message channel.
- WithSubscribeFilter: Applies a filter to messages before they are sent to the channel.
- WithSubscribeDrain: Drops the oldest buffered message to make room for the newest one when the subscription channel's buffer is full.
- WithSubscribeSkip: Allows dropping messages if the subscription channel's buffer is full.
The returned Leave function MUST be called when the subscription is no longer needed. This will close the message channel and release all associated resources. It's important to call Leave to prevent indefinite goroutine blocking and resource leaks.
type Convert ¶
type Convert[IN, OUT any] func(IN) OUT
Convert defines a function signature for converting a value of type IN to a value of type OUT. This is used by Castix to transform messages from their source type to the desired output type for subscribers.
type Filter ¶
Filter defines a function signature for a predicate that determines whether a message of type T should be processed or discarded. It must return true if the message passes the filter and false otherwise.
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
Option defines a configuration option that can be applied when creating a new Castix instance using New.
func UseGoroutine ¶
func UseGoroutine() Option
UseGoroutine returns an Option that configures Castix to use goroutine-based multiplexing for handling its input channels. This is generally the default behavior if no specific input handling option is provided. In this mode, each attached input channel is monitored by a dedicated goroutine.
func UseReflection ¶
func UseReflection() Option
UseReflection returns an Option that configures Castix to use reflection-based multiplexing (via reflect.Select) for handling its input channels. This approach is recommended when dealing with a large number of input channels that are expected to receive messages infrequently. It is particularly well-suited for scenarios where conserving goroutine resources across many potentially idle channels is beneficial, as it uses a single goroutine to manage all inputs. For scenarios with fewer channels experiencing high-throughput and frequent messages, goroutine-based multiplexing (see UseGoroutine) may offer better performance.
type SourceOption ¶
type SourceOption = bridge.AttachOption
SourceOption represents an option that can be applied when attaching a source to a Castix instance via the Source method.
type SubscribeOption ¶
type SubscribeOption = bridge.WatchOption
SubscribeOption represents an option that can be applied when creating a subscription to a Castix instance via the Subscribe method.