Documentation
¶
Overview ¶
Package queue defines a simple message queue pool interface and utilities.
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ErrClosed = fmt.Errorf("pool is already closed")
Functions ¶
This section is empty.
Types ¶
type BasePool ¶
type BasePool[T any] struct { // contains filtered or unexported fields }
BasePool implements the boring parts of ListenerPool.
func NewBasePool ¶
func NewBasePool[T any](listen func(string) bool, close func() error) (pool *BasePool[T], send func(msg Message[T]))
NewBasePool creates a BasePool managing waiters. The functions listen and close do the same as specified in the ListenerPool interface. They are called when the corresponding functions are called on the created BasePool. The returned function send propagates msg to waiters.
type JSONParser ¶
type JSONParser[T any] struct{}
func (JSONParser[T]) Parse ¶
func (j JSONParser[T]) Parse(construct value.Constructor[T], data string) (v T, err error)
type ListenerPool ¶
type ListenerPool[T any] interface { // Wait adds ch to the list of Go channels sent to when a notification // on channel occurs. If not currently listening to channel, Wait calls // Listen(channel). As a special case, the value "" (the empty string) // subscribes to all channels currently being listened to. Wait(channel string, ch WaiterChan[T]) (ok bool) // WaitFn is Wait, but instead of sending on a Go channel, calls fn. WaitFn(channel string, fn WaiterFn[T]) (ok bool) // Listen begins listening to channel. Listen(channel string) (ok bool) // Closes the connection with the message queue and stops listening for // notifications. This does not close any channels passed to Wait. // Subsequent calls after already closed return ErrClosed. Close() error }
A ListenerPool allows multiple users to listen to a channel in a queue service such as SQS queues without each acquiring a connection. Queue services have strings as channel names and handle data of type T. For example, Postgres channels allow a ListenerPool[string].
type ParsingPool ¶
type ParsingPool[T any] struct { ListenerPool[T] SendOnErr bool // contains filtered or unexported fields }
A ParsingPool proxies messages from a ListenerPool[string] and sends the parsed T to waiters. If parser returns an error, the message will be discarded unless SendOnErr is true, which will send the channel name and a zero-value payload.
func NewParsingPool ¶
func NewParsingPool[T any](pool ListenerPool[string], parser Parser[T]) *ParsingPool[T]
NewParsingPool creates a new ParsingPool using pool and parser. Returns a nil pointer if no pool or parser is provided or pool.Wait fails.