pgqueue

package module
v0.0.0-...-1e697c3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 18, 2021 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package pgqueue implements a durable, at-least-once, optionally ordered message queue on top of PostgreSQL.

Index

Constants

View Source
const MarkAsDeliveredSQL string = `
	deliveries = deliveries + 1,
	last_delivered_at = NOW() AT TIME ZONE 'UTC'
`
View Source
const UpdateLastAckSQL string = `
	last_ack_at = NOW() AT TIME ZONE 'UTC'
`

Variables

View Source
var (
	Ordered   = OrderGuarantee{/* contains filtered or unexported fields */}
	Unordered = OrderGuarantee{/* contains filtered or unexported fields */}
)
View Source
var ErrRequeued = errors.New("a message was requeued; redelivery not guaranteed unless consuming starts again")

Functions

This section is empty.

Types

type AcceptFunc

type AcceptFunc = func(context.Context, func(Delivery)) error

type AcceptQueriesFunc

type AcceptQueriesFunc = func(context.Context, func(QueryWithArgs)) error

type Ack

type Ack bool
const (
	OK      Ack = true
	Requeue Ack = false
)

func (Ack) String

func (ack Ack) String() string

type ConsumeFunc

type ConsumeFunc = func(context.Context, GetHandler) error

func Subscribe

func Subscribe(ctx context.Context, driver SubscriptionDriver) (consume ConsumeFunc, err error)

Subscribe creates a subscription and returns a function to consume from it.

A published message will be copied to all existing subscriptions at the time, even if they aren't any active consumers from it.

It depends on the provided SubscriptionDriver how message delivery for concurrent consumers to the same subscription behaves.

type Delivery

type Delivery struct {
	// Unwrap unwraps the delivery as it comes from the queue into a value
	// that a handler can use.
	Unwrap  func(into interface{}) error
	OK      func(context.Context) error
	Requeue func(context.Context) error
}

A Delivery is an attempted delivery of a message.

func (Delivery) Ack

func (d Delivery) Ack(ctx context.Context, ack Ack) error

type DeliveryIterator

type DeliveryIterator struct {
	Next     coro.Resume
	Yielded  Delivery
	Returned error
}

func NewDeliveryIterator

func NewDeliveryIterator(g func(func() error), f func(yield func(Delivery)) error, options ...coro.SetOption) *DeliveryIterator

type DeliveryRows

type DeliveryRows struct {
	Rows sqler.Rows
	sqler.Tx
	Deliver func(Delivery)
}

DeliveryRows is a row for a delivery with a transaction to ACK it.

type DeliveryRowsIterator

type DeliveryRowsIterator struct {
	Next     coro.Resume
	Yielded  DeliveryRows
	Returned error
}

func NewDeliveryRowsIterator

func NewDeliveryRowsIterator(g func(func() error), f func(yield func(DeliveryRows)) error, options ...coro.SetOption) *DeliveryRowsIterator

type GetHandler

type GetHandler = func() (unwrapInto interface{}, handle HandleFunc)

GetHandler is a function that is called with each incoming message. The function provides a value to unwrap the message into, and a handler function to then use this value.

When a message arrives as a Delivery from the ListenForDeliveries or FetchPendingDeliveries methods of the SubscriptionDriver, this function, provided to the consume function returned by Subscriber, is called. Then, the Delivery's UnwrapMessage is called with the returned unwrapInto value. If that doesn't fail, the handle function is called.

The handle function should return OK to acknowledge that the message has been processed and should be removed from the queue, or Requeue otherwise.

type HandleFunc

type HandleFunc = func(context.Context) (context.Context, Ack)

type NextRowFunc

type NextRowFunc sqlcoro.NextFunc

type OrderGuarantee

type OrderGuarantee struct {
	// contains filtered or unexported fields
}

func (OrderGuarantee) FetchIncomingRows

func (o OrderGuarantee) FetchIncomingRows(
	ctx context.Context,
	yieldDelivery func(Delivery),
	beginRowDelivery beginRowDeliveryFunc,
	query func(context.Context, sqler.Queryer, func(baseQuery string) string) (sqler.Rows, error),
) error

func (OrderGuarantee) FetchPendingRows

func (o OrderGuarantee) FetchPendingRows(
	ctx context.Context,
	yieldDelivery func(Delivery),
	beginRowDelivery beginRowDeliveryFunc,
	query func(context.Context, sqler.Queryer, func(baseQuery string) string) (sqler.Rows, error),
) error

type PQSubscriptionDriver

type PQSubscriptionDriver struct {
	DB                           sqler.DB
	ExecInsertSubscription       func(context.Context) error
	ListenForIncomingBaseQueries func(context.Context) (accept AcceptQueriesFunc, close func() error, err error)
	PendingBaseQuery             QueryWithArgs
	RowsToDeliveries             func(context.Context, *DeliveryRowsIterator) error
	Ordered                      OrderGuarantee
}

func (PQSubscriptionDriver) FetchPendingDeliveries

func (drv PQSubscriptionDriver) FetchPendingDeliveries(ctx context.Context, yield func(Delivery)) error

func (PQSubscriptionDriver) InsertSubscription

func (drv PQSubscriptionDriver) InsertSubscription(ctx context.Context) error

func (PQSubscriptionDriver) ListenForDeliveries

func (drv PQSubscriptionDriver) ListenForDeliveries(ctx context.Context) (accept AcceptFunc, close func() error, err error)

type Panic

type Panic struct {
	// contains filtered or unexported fields
}

A Panic is a panic captured as an error. Is is returned by the consume function returned by Subscribe when either the deliveries listener or the provided handler panic.

func (Panic) Error

func (p Panic) Error() string

func (Panic) Unwrap

func (p Panic) Unwrap() error

type QueryWithArgs

type QueryWithArgs struct {
	Query string
	Args  []interface{}
}

func NewQueryWithArgs

func NewQueryWithArgs(query string, args ...interface{}) QueryWithArgs

func (QueryWithArgs) PollIncomingDeliveries

func (q QueryWithArgs) PollIncomingDeliveries(each time.Duration) func(context.Context) (accept AcceptQueriesFunc, close func() error, err error)

type QueryWithArgsIterator

type QueryWithArgsIterator struct {
	Next     coro.Resume
	Yielded  QueryWithArgs
	Returned error
}

func NewQueryWithArgsIterator

func NewQueryWithArgsIterator(g func(func() error), f func(yield func(QueryWithArgs)) error, options ...coro.SetOption) *QueryWithArgsIterator

type SubscriptionDriver

type SubscriptionDriver interface {
	InsertSubscription(context.Context) error
	ListenForDeliveries(context.Context) (accept AcceptFunc, close func() error, err error)
	FetchPendingDeliveries(context.Context, func(Delivery)) error
}

A SubscriptionDriver is the abstract interface that

Directories

Path Synopsis
Package stopcontext extentds the standard Context interface with an additional Stopped signal, intended for gracefully stopping some iterative process associated with the Context.
Package stopcontext extentds the standard Context interface with an additional Stopped signal, intended for gracefully stopping some iterative process associated with the Context.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL