Documentation ¶
Overview ¶
Package msg provides tools for message passing and queues between the different nodes of the Beyla pipelines.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Queue ¶
type Queue[T any] struct {
	// contains filtered or unexported fields
}
Queue is a simple message queue that allows sending messages to multiple subscribers. It also allows bypassing messages to other queues, so that a message sent to one queue can be received by subscribers of another queue. If a message is sent to a queue that has no subscribers, it will not block the sender and the message will be lost. This is by design, as the queue is meant to be used for fire-and-forget messaging.
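The fan-out behavior described above can be illustrated with plain Go channels. This is only a sketch under stated assumptions: the fanOut type and its subscribe and send methods are hypothetical stand-ins, not the package's implementation, and the buffer size of 8 is arbitrary.

```go
package main

import "fmt"

// fanOut is a hypothetical stand-in for the queue described above: it copies
// every message to each subscriber channel.
type fanOut[T any] struct {
	subs []chan T
}

// subscribe registers a new buffered channel and returns its receive side.
func (f *fanOut[T]) subscribe() <-chan T {
	ch := make(chan T, 8)
	f.subs = append(f.subs, ch)
	return ch
}

// send delivers the message to every subscriber. With no subscribers the loop
// body never runs, so the message is dropped and the caller is not blocked.
func (f *fanOut[T]) send(msg T) {
	for _, ch := range f.subs {
		ch <- msg
	}
}

func main() {
	q := &fanOut[string]{}
	q.send("lost") // no subscribers yet: dropped without blocking

	a, b := q.subscribe(), q.subscribe()
	q.send("hello")
	fmt.Println(<-a, <-b) // prints "hello hello"
}
```

Each subscriber gets its own channel, so a slow reader on one channel does not consume messages intended for another.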
func (*Queue[T]) Bypass ¶
Bypass allows this queue to bypass messages to another queue. This means that messages sent to this queue will also be sent to the other queue. This operation is not thread-safe and does not control for graph cycles.
func (*Queue[T]) Close ¶
func (q *Queue[T]) Close()
Close closes all the subscribers of this queue. It closes all the subscriber channels or, if this queue bypasses to another queue, closes the bypassed queue.
func (*Queue[T]) MarkCloseable ¶
func (q *Queue[T]) MarkCloseable()
MarkCloseable decreases the internal counter of submitters and, if it reaches 0 (meaning that all senders have closed their channels), closes the queue. This method is useful when multiple nodes send messages to the same queue and the queue must be closed only when all of them have marked it as closeable.
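The countdown described above can be sketched as follows. The closer type, its remaining field, and the markCloseable method are hypothetical names used only for illustration; the sketch is not goroutine-safe and makes no claim about how the package guards its counter.

```go
package main

import "fmt"

// closer is a hypothetical stand-in: it closes ch once markCloseable has been
// called as many times as the initial value of remaining.
type closer struct {
	remaining int
	ch        chan int
}

// markCloseable decrements the counter and closes the channel at zero.
func (c *closer) markCloseable() {
	c.remaining--
	if c.remaining == 0 {
		close(c.ch)
	}
}

func main() {
	c := &closer{remaining: 2, ch: make(chan int, 4)}
	c.ch <- 1
	c.markCloseable() // first sender done: channel stays open
	c.ch <- 2
	c.markCloseable() // second sender done: channel is closed

	// Buffered values are still delivered after close; then the loop ends.
	for v := range c.ch {
		fmt.Println(v)
	}
}
```

Closing only after the last sender has finished avoids the panic that Go raises when a goroutine sends on a channel that is already closed.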
func (*Queue[T]) Send ¶
func (q *Queue[T]) Send(o T)
Send a message to all subscribers of this queue. If there are no subscribers and the internal channel is full, the sender might block unless the Queue has been instantiated with the NotBlockIfNoSubscribers option. In that case, the message will be lost and the sender will not be blocked.
func (*Queue[T]) Subscribe ¶
func (q *Queue[T]) Subscribe() <-chan T
Subscribe to this queue. This will return a channel that will receive messages. Note that if Subscribe is invoked after Send, the sent message will be lost, or forwarded to the other subscribers but not to the channel returned by the latest invocation. You can't subscribe to a queue that is bypassing to another queue. Concurrent invocations of Subscribe and Bypass are thread-safe with respect to each other, so you can be sure that every subscriber will get its own effective channel. However, invocations of Subscribe are not thread-safe with respect to the Send method: concurrent invocations of Subscribe and Send might result in a few initial messages being lost.
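One way to avoid losing initial messages is to register every subscriber before any producer goroutine starts. The sketch below shows that ordering with a hypothetical broadcaster type built on plain channels; it is an illustration of the ordering concern, not the package's code.

```go
package main

import (
	"fmt"
	"sync"
)

// broadcaster is a hypothetical stand-in for the queue: send only reaches the
// subscribers that are registered at the time it is called.
type broadcaster struct {
	subs []chan int
}

func (b *broadcaster) subscribe() <-chan int {
	ch := make(chan int, 4)
	b.subs = append(b.subs, ch)
	return ch
}

func (b *broadcaster) send(v int) {
	for _, ch := range b.subs {
		ch <- v
	}
}

func main() {
	b := &broadcaster{}
	early := b.subscribe() // registered before any producer runs

	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // the producer starts only after the subscription exists
		defer wg.Done()
		b.send(1)
		b.send(2)
	}()
	wg.Wait()

	late := b.subscribe()              // registered after both sends
	fmt.Println(len(early), len(late)) // prints "2 0"
}
```

The late subscriber's channel stays empty because both messages were sent before it was registered.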
type QueueOpts ¶
type QueueOpts func(*queueConfig)
QueueOpts allows configuring some aspects of a queue's operation.
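The shape of QueueOpts, a function that receives a pointer to a configuration struct, is the functional options pattern. Below is a minimal sketch of that pattern. The config struct, its fields, the default values, and the withBufferLen and withClosingAttempts helpers are all hypothetical; the real queueConfig is unexported and its contents are not shown on this page.

```go
package main

import "fmt"

// config and its fields are hypothetical placeholders for an unexported
// configuration struct.
type config struct {
	bufferLen       int
	closingAttempts int
}

// option mirrors the shape of QueueOpts: a function that mutates a config.
type option func(*config)

// withBufferLen returns an option that overrides the buffer length.
func withBufferLen(n int) option {
	return func(c *config) { c.bufferLen = n }
}

// withClosingAttempts returns an option that overrides the closing attempts.
func withClosingAttempts(n int) option {
	return func(c *config) { c.closingAttempts = n }
}

// newConfig starts from assumed defaults and applies the options in order.
func newConfig(opts ...option) config {
	c := config{bufferLen: 1, closingAttempts: 1}
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	c := newConfig(withBufferLen(10), withClosingAttempts(3))
	fmt.Println(c.bufferLen, c.closingAttempts) // prints "10 3"
}
```

Options that are not passed leave the corresponding default untouched, which lets callers set only the values they care about.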
func ChannelBufferLen ¶
ChannelBufferLen sets the length of the channel buffer for the queue.
func ClosingAttempts ¶
ClosingAttempts sets the number of invocations to MarkCloseable before the channel is effectively closed. This is useful when multiple nodes are sending messages to the same queue, and we want to close the queue only when all of them have marked the channel as closeable.
func NotBlockIfNoSubscribers ¶
func NotBlockIfNoSubscribers() QueueOpts
NotBlockIfNoSubscribers prevents the Send operation from blocking when there are no subscribers to the channel. This is useful to define connections to destination nodes that are optional and might not be instantiated.
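A common Go idiom for a send that never blocks is a select statement with a default branch. The sketch below shows that idiom in isolation; trySend is a hypothetical helper, and nothing here asserts that the package implements this option the same way.

```go
package main

import "fmt"

// trySend attempts to deliver v without blocking and reports whether the
// message was accepted (true) or dropped (false).
func trySend[T any](ch chan T, v T) bool {
	select {
	case ch <- v:
		return true
	default:
		// The buffer is full and no receiver is waiting: drop the message.
		return false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1)) // prints "true": the buffer has room
	fmt.Println(trySend(ch, 2)) // prints "false": buffer full, message dropped
}
```

The trade-off is the one the option's description implies: the sender keeps making progress, at the cost of silently losing messages that nobody is ready to receive.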