queue

package
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 13, 2024 License: AGPL-3.0 Imports: 3 Imported by: 0

Documentation

Overview

Package queue defines a simple message queue pool interface and utilities.

Index

Constants

This section is empty.

Variables

View Source
var ErrClosed = fmt.Errorf("pool is already closed")

Functions

This section is empty.

Types

type BasePool

type BasePool[T any] struct {
	// contains filtered or unexported fields
}

BasePool implements the boring parts of ListenerPool.

func NewBasePool

func NewBasePool[T any](listen func(string) bool, close func() error) (pool *BasePool[T], send func(msg Message[T]))

NewBasePool creates a BasePool managing waiters. The functions listen and close do the same as specified in the ListenerPool interface. They are called when the corresponding functions are called on the created BasePool. The returned function send propagates msg to waiters.

func (*BasePool[T]) Close

func (p *BasePool[T]) Close() (err error)

func (*BasePool[T]) Listen

func (p *BasePool[T]) Listen(channel string) bool

func (*BasePool[T]) Wait

func (p *BasePool[T]) Wait(channel string, ch WaiterChan[T]) bool

func (*BasePool[T]) WaitFn

func (p *BasePool[T]) WaitFn(channel string, fn WaiterFn[T]) bool

type JSONParser

type JSONParser[T any] struct{}

func (JSONParser[T]) Parse

func (j JSONParser[T]) Parse(construct value.Constructor[T], data string) (v T, err error)

type ListenerPool

type ListenerPool[T any] interface {
	// Wait adds ch to the list of Go channels sent to when a notification
	// on channel occurs. If not currently listening to channel, Wait calls
	// Listen(channel). As a special case, the value "" (the empty string)
	// subscribes to all channels currently being listened to.
	Wait(channel string, ch WaiterChan[T]) (ok bool)
	// WaitFn is Wait, but instead of sending on a Go channel, calls fn.
	WaitFn(channel string, fn WaiterFn[T]) (ok bool)
	// Listen begins listening to channel.
	Listen(channel string) (ok bool)

	// Closes the connection with the message queue and stops listening for
	// notifications. This does not close any channels passed to Wait.
	// Subsequent calls after already closed return ErrClosed.
	Close() error
}

A ListenerPool allows multiple users to listen to a channel in a queue service such as SQS queues without each acquiring a connection. Queue services have strings as channel names and handle data of type T. For example, Postgres channels allow a ListenerPool[string].

type Message

type Message[T any] struct {
	Channel string
	Payload T
}

type Parser

type Parser[T any] interface {
	Parse(value.Constructor[T], string) (T, error)
}

type ParsingPool

type ParsingPool[T any] struct {
	ListenerPool[T]

	SendOnErr bool
	// contains filtered or unexported fields
}

A ParsingPool proxies messages from a ListenerPool[string] and sends the parsed T to waiters. If parser returns an error, the message will be discarded unless SendOnErr is true, which will send the channel name and a zero-value payload.

func NewParsingPool

func NewParsingPool[T any](pool ListenerPool[string], parser Parser[T]) *ParsingPool[T]

NewParsingPool creates a new ParsingPool using pool and parser. Returns a nil pointer if no pool or parser is provided or pool.Wait fails.

type WaiterChan

type WaiterChan[T any] chan Message[T]

type WaiterFn

type WaiterFn[T any] func(Message[T])

Directories

Path Synopsis
providers
pgxqueue
Package pgxqueue implements ListenerPool for PostgreSQL's LISTEN/NOTIFY.
Package pgxqueue implements ListenerPool for PostgreSQL's LISTEN/NOTIFY.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL