msg

package
v0.0.0-...-1b86826 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 6, 2025 License: Apache-2.0 Imports: 2 Imported by: 0

Documentation

Overview

Package msg provides tools for message passing and queues between the different nodes of the Beyla pipelines.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Queue

type Queue[T any] struct {
	// contains filtered or unexported fields
}

Queue is a simple message queue that allows sending messages to multiple subscribers. It also allows bypassing messages to other queues, so that a message sent to one queue can be received by the subscribers of another queue. If a message is sent to a queue that has no subscribers, it will not block the sender and the message will be lost. This is by design, as the queue is meant to be used for fire-and-forget messaging.
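The fan-out behavior described above can be illustrated with a minimal sketch. This is not the package's implementation: `fanOut`, `subscribe`, `send`, and the buffer size of 8 are hypothetical names and values chosen for illustration, and the sketch ignores thread safety.

```go
package main

import "fmt"

// fanOut is a hypothetical stand-in for the documented Queue, not the
// package's implementation: it keeps one buffered channel per subscriber.
type fanOut[T any] struct {
	subs []chan T
}

// subscribe registers a new buffered channel and returns its receive side.
// The buffer size (8) is an arbitrary value for this sketch.
func (f *fanOut[T]) subscribe() <-chan T {
	ch := make(chan T, 8)
	f.subs = append(f.subs, ch)
	return ch
}

// send copies the message to every subscriber registered so far.
// With no subscribers the loop does nothing, so the message is lost.
func (f *fanOut[T]) send(msg T) {
	for _, ch := range f.subs {
		ch <- msg
	}
}

func main() {
	q := &fanOut[string]{}
	q.send("lost") // no subscribers yet: nobody will ever receive this

	a, b := q.subscribe(), q.subscribe()
	q.send("hello")
	fmt.Println(<-a, <-b) // prints "hello hello"
}
```

In this sketch a message sent before any subscription is simply dropped, which mirrors the fire-and-forget behavior the documentation describes.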

func NewQueue

func NewQueue[T any](opts ...QueueOpts) *Queue[T]

NewQueue creates a new Queue instance with the given options.

func (*Queue[T]) Bypass

func (q *Queue[T]) Bypass(to *Queue[T])

Bypass allows this queue to bypass messages to another queue. This means that messages sent to this queue will also be sent to the other queue. This operation is not thread-safe and does not control for graph cycles.
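As a rough illustration of bypassing, the sketch below forwards messages along a chain of queues. It assumes that a bypassing queue has no subscribers of its own (the Subscribe documentation states that you can't subscribe to a queue that is bypassing). The `relay` type and its methods are hypothetical and do not reflect how the package implements Bypass.

```go
package main

import "fmt"

// relay is a hypothetical stand-in for a queue that can forward to another one.
type relay[T any] struct {
	to   *relay[T] // destination when bypassing, nil otherwise
	subs []chan T
}

// bypass makes r forward every message to the given relay.
func (r *relay[T]) bypass(to *relay[T]) { r.to = to }

// subscribe registers a buffered channel (size 4 is arbitrary).
func (r *relay[T]) subscribe() <-chan T {
	ch := make(chan T, 4)
	r.subs = append(r.subs, ch)
	return ch
}

// send follows the bypass chain to the last relay and delivers there.
// Like the documented Bypass, nothing here detects cycles: a cycle in
// the chain would make this loop run forever.
func (r *relay[T]) send(msg T) {
	dst := r
	for dst.to != nil {
		dst = dst.to
	}
	for _, ch := range dst.subs {
		ch <- msg
	}
}

func main() {
	src, dst := &relay[int]{}, &relay[int]{}
	src.bypass(dst)
	out := dst.subscribe()
	src.send(42)
	fmt.Println(<-out) // prints 42
}
```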

func (*Queue[T]) Close

func (q *Queue[T]) Close()

Close closes all the subscribers of this queue. This will close all the channels or will close the bypassed channel.

func (*Queue[T]) MarkCloseable

func (q *Queue[T]) MarkCloseable()

MarkCloseable decreases the internal counter of submitters and, if it reaches 0 (meaning that all senders have closed their channels), closes the queue. This method is useful when multiple nodes send messages to the same queue and the queue should be closed only when all of them have marked it as closeable.
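The counting idea behind MarkCloseable can be sketched with an atomic counter that closes a channel when it reaches zero. This is an illustration under assumptions, not the package's code: `countdownCloser`, `newCountdownCloser`, and `sumFromProducers` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// countdownCloser is a hypothetical stand-in, not the package's Queue: it
// closes its channel once markCloseable has been called `attempts` times.
type countdownCloser struct {
	remaining atomic.Int32
	ch        chan int
}

func newCountdownCloser(attempts int) *countdownCloser {
	c := &countdownCloser{ch: make(chan int, attempts)}
	c.remaining.Store(int32(attempts))
	return c
}

// markCloseable decrements the counter and closes the channel at zero.
func (c *countdownCloser) markCloseable() {
	if c.remaining.Add(-1) == 0 {
		close(c.ch)
	}
}

// sumFromProducers starts n producers (n >= 1) that each send their ID once
// and then mark the channel as closeable. It returns the sum of all IDs.
func sumFromProducers(n int) int {
	c := newCountdownCloser(n)
	for id := 1; id <= n; id++ {
		go func(id int) {
			c.ch <- id
			c.markCloseable()
		}(id)
	}
	sum := 0
	for v := range c.ch { // ends when the last producer marks closeable
		sum += v
	}
	return sum
}

func main() {
	fmt.Println(sumFromProducers(3)) // prints 6
}
```

Because every producer sends before it decrements the counter, the channel is closed only after all sends have completed, so no producer ever writes to a closed channel.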

func (*Queue[T]) Send

func (q *Queue[T]) Send(o T)

Send a message to all subscribers of this queue. If there are no subscribers and the internal channel is full, the sender might block unless the Queue has been instantiated with the NotBlockIfNoSubscribers option. In that case, the message will be lost and the sender will not be blocked.

func (*Queue[T]) Subscribe

func (q *Queue[T]) Subscribe() <-chan T

Subscribe to this queue. This will return a channel that will receive messages. It's important to note that, if Subscribe is invoked after Send, the sent message will be lost, or forwarded to other subscribers but not to the channel resulting from the last invocation. You can't subscribe to a queue that is bypassing to another queue. Concurrent invocations of Subscribe and Bypass are thread-safe with respect to each other, so you can be sure that any subscriber will get its own effective channel. However, invocations of Subscribe are not thread-safe with respect to the Send method. This means that concurrent invocations of Subscribe and Send might result in a few initial messages being lost.

type QueueOpts

type QueueOpts func(*queueConfig)

QueueOpts allows configuring some aspects of a queue's operation.
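The signature `func(*queueConfig)` follows the functional options pattern. The sketch below shows how that pattern generally works. The `settings` struct, its fields, and its default values are hypothetical, since the real `queueConfig` is unexported and its contents are not documented here.

```go
package main

import "fmt"

// settings and its defaults are hypothetical; they only illustrate the pattern.
type settings struct {
	bufferLen       int
	closingAttempts int
}

// option mutates a settings value, in the same shape as QueueOpts.
type option func(*settings)

// bufferLen returns an option that overrides the buffer length.
func bufferLen(l int) option {
	return func(s *settings) { s.bufferLen = l }
}

// closingAttempts returns an option that overrides the closing counter.
func closingAttempts(n int) option {
	return func(s *settings) { s.closingAttempts = n }
}

// newSettings starts from (assumed) defaults and applies each option in order.
func newSettings(opts ...option) settings {
	s := settings{bufferLen: 1, closingAttempts: 1}
	for _, opt := range opts {
		opt(&s)
	}
	return s
}

func main() {
	s := newSettings(bufferLen(10), closingAttempts(3))
	fmt.Println(s.bufferLen, s.closingAttempts) // prints "10 3"
}
```

The pattern lets a constructor such as NewQueue accept any number of options while keeping the configuration struct private.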

func ChannelBufferLen

func ChannelBufferLen(l int) QueueOpts

ChannelBufferLen sets the length of the channel buffer for the queue.

func ClosingAttempts

func ClosingAttempts(attempts int) QueueOpts

ClosingAttempts sets the number of invocations to MarkCloseable before the channel is effectively closed. This is useful when multiple nodes are sending messages to the same queue, and we want to close the queue only when all of them have marked the channel as closeable.

func NotBlockIfNoSubscribers

func NotBlockIfNoSubscribers() QueueOpts

NotBlockIfNoSubscribers will prevent the Send operation from blocking when there are no subscribers to the channel. This is useful to define connections to destination nodes that are optional and might not be instantiated.
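A common Go technique for a send that never blocks is a `select` with a `default` branch. The sketch below shows that general technique only. It is not necessarily how the package implements this option, and `trySend` is a hypothetical helper.

```go
package main

import "fmt"

// trySend attempts to put msg on ch without blocking. It reports whether
// the message was accepted; when the buffer is full and nobody is
// receiving, the message is dropped. This is a generic technique, not
// the package's implementation.
func trySend[T any](ch chan T, msg T) bool {
	select {
	case ch <- msg:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan string, 1) // room for exactly one pending message

	fmt.Println(trySend(ch, "first"))  // prints true: the buffer had space
	fmt.Println(trySend(ch, "second")) // prints false: buffer full, dropped
	fmt.Println(<-ch)                  // prints first
}
```

The trade-off is the one the documentation describes: the sender is never blocked, but messages that nobody is ready to receive are lost.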
