pgmq

package module
v0.0.0-...-14f6485
Published: Feb 8, 2025 License: MIT Imports: 8 Imported by: 0

README

go-pgmq

Another Go (Golang) client for Postgres Message Queue (PGMQ) version 1.5.0.

It is derived from pgmq-go, which is based loosely on the Rust client.

The current implementation works with the latest PGMQ release (currently v1.5.0) and adds transaction support. New PGMQ versions will be tracked closely as they are released.

Also, being more opinionated, it depends on the following modules:

Usage

Start a PostgreSQL instance with the PGMQ extension installed:

docker run -d --name postgres -e POSTGRES_PASSWORD=password -p 5432:5432 tembo.docker.scarf.sh/tembo/pg17-pgmq:latest

Then, in your project, do something similar to:

package main

import (
    "context"
    "encoding/json"
    "fmt"

    "github.com/softexpert/go-pgmq"
)

func main() {
    ctx := context.Background()

    // the constructor also returns the underlying *pgxpool.Pool, unused here
    q, _, err := pgmq.NewFromPgxConnStr(ctx, "postgres://postgres:password@localhost:5432/postgres")
    if err != nil {
        panic(err)
    }

    q.WithDefaultVT(45) // set the default visibility timeout to 45 seconds

    err = q.CreateQueue(ctx, "my_queue")
    if err != nil {
        panic(err)
    }

    id, err := q.Send(ctx, "my_queue", json.RawMessage(`{"foo": "bar"}`), json.RawMessage(`{}`))
    if err != nil {
        panic(err)
    }

    // retrieve the message, rendering it invisible for 30s;
    // the empty condition is assumed here to apply no filtering
    msg, err := q.Read(ctx, "my_queue", 30, json.RawMessage(`{}`))
    if err != nil {
        panic(err)
    }
    fmt.Printf("read message %d: %s\n", msg.MsgID, msg.Message)

    // make the message "invisible" for 180s; SetVT returns an int64, not a message
    _, err = q.SetVT(ctx, "my_queue", id, 180)
    if err != nil {
        panic(err)
    }

    // Archive the message by moving it to the "pgmq.a_<queue_name>" table.
    // Alternatively, you can `Delete` the message, or read and delete in one
    // call by using `Pop`.
    _, err = q.Archive(ctx, "my_queue", id)
    if err != nil {
        panic(err)
    }
}

Concepts

Constructors

A connection string can be used directly to establish the connection to the underlying database.

    // hypothetical configuration; please adjust according to your needs
    q, pool, err := pgmq.NewFromPgxConnStr(ctx, "postgres://postgres:password@localhost:5432/postgres")

A pgxpool.Config object can be provided to establish the connection to the underlying database.

    // hypothetical configuration; please adjust according to your needs
    // note: pgx itself expects configs obtained from pgxpool.ParseConfig; if a
    // literal like this one is rejected, start from ParseConfig and override fields
    q, pool, err := pgmq.NewFromPgxConfig(ctx, &pgxpool.Config{
        ConnConfig: &pgx.ConnConfig{
            Config: pgconn.Config{
                Host:           "myhost",
                Port:           5432,
                Database:       "mydatabase",
                User:           "postgres",
                Password:       "password",
                ConnectTimeout: time.Second * 90,
            },
            StatementCacheCapacity: 100,
            DefaultQueryExecMode:   pgx.QueryExecModeDescribeExec,
        },
        MaxConns:              100,
        MinConns:              10,
        HealthCheckPeriod:     time.Second * 90,
        MaxConnLifetime:       time.Minute * 5,
        MaxConnIdleTime:       time.Minute,
        MaxConnLifetimeJitter: time.Second * 20,
    })

An existing pgxpool.Pool object can be reused as the connection to the underlying database.

    // hypothetical configuration; please adjust according to your needs
    myPool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
    ...
    q, pool, err := pgmq.NewFromPgxPool(ctx, myPool)

Other initialization parameters

Contributions

Your contributions are welcome ❤.

Documentation

Constants

This section is empty.

Variables

var ErrNoRows = errors.New("gopgmq: no rows in result set")

Functions

This section is empty.

Types

type PgMQ

type PgMQ struct {
	ActiveOps atomic.Int64 // number of ongoing activities - used to properly shutdown
	// contains filtered or unexported fields
}

func NewFromPgxConfig

func NewFromPgxConfig(ctx context.Context, pgConf *pgxpool.Config) (pgMQ *PgMQ, pool *pgxpool.Pool, err error)

NewFromPgxConfig creates a PgMQ object if pgConf is valid and a connection is established with the underlying database.

func NewFromPgxConnStr

func NewFromPgxConnStr(ctx context.Context, connString string) (pgMQ *PgMQ, pool *pgxpool.Pool, err error)

NewFromPgxConnStr creates a PgMQ object if connString is valid and a connection is established with the underlying database.

func NewFromPgxPool

func NewFromPgxPool(ctx context.Context, srcpool *pgxpool.Pool) (pgMQ *PgMQ, pool *pgxpool.Pool, err error)

NewFromPgxPool creates a PgMQ object if the provided srcpool has a valid connection to the underlying database and the pgmq extension is available.

func (*PgMQ) Archive

func (p *PgMQ) Archive(ctx context.Context, queue string, msgID int64) (archived bool, err error)

Archive moves a message from the queue table to the archive table by its id. Messages can be viewed on the archive table with SQL:

SELECT * FROM pgmq.a_<queue_name>;

func (*PgMQ) ArchiveBatch

func (p *PgMQ) ArchiveBatch(ctx context.Context, queue string, msgIDs []int64) (archived []int64, err error)

ArchiveBatch moves a batch of messages from the queue table to the archive table by their ids. Messages can be viewed on the archive table with SQL:

SELECT * FROM pgmq.a_<queue_name>;

func (*PgMQ) ArchiveBatchTX

func (p *PgMQ) ArchiveBatchTX(ctx context.Context, queue string, msgIDs []int64) (tx pgx.Tx, archived []int64, err error)

ArchiveBatchTX creates a transaction and moves a batch of messages from the queue table to the archive table by their ids. Messages can be viewed on the archive table with SQL:

SELECT * FROM pgmq.a_<queue_name>;

Returned transaction will be committed / rolled back by the caller
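Because every *TX method hands the open transaction back to the caller, some helper is usually needed to finish it. The following is a minimal sketch of one way to do that, not part of this package: `txFinisher`, `finishTx`, `recordingTx` and `outcomeFor` are hypothetical names, and the interface only lists the `Commit` and `Rollback` methods that `pgx.Tx` is known to provide. It also assumes the returned transaction is non-nil; the documentation does not say whether that holds when the method itself fails, so a nil check may be needed in real code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// txFinisher is the subset of pgx.Tx this sketch relies on (hypothetical name).
type txFinisher interface {
	Commit(ctx context.Context) error
	Rollback(ctx context.Context) error
}

// finishTx commits tx when opErr is nil and rolls it back otherwise.
// A rollback failure is joined with the original error so neither is lost.
func finishTx(ctx context.Context, tx txFinisher, opErr error) error {
	if opErr != nil {
		if rbErr := tx.Rollback(ctx); rbErr != nil {
			return errors.Join(opErr, rbErr)
		}
		return opErr
	}
	return tx.Commit(ctx)
}

// recordingTx is a stand-in that lets the sketch run without a database.
type recordingTx struct{ outcome string }

func (r *recordingTx) Commit(context.Context) error   { r.outcome = "commit"; return nil }
func (r *recordingTx) Rollback(context.Context) error { r.outcome = "rollback"; return nil }

// outcomeFor reports which path finishTx takes for a succeeding or failing operation.
func outcomeFor(failed bool) string {
	var opErr error
	if failed {
		opErr = errors.New("archive failed")
	}
	tx := &recordingTx{}
	_ = finishTx(context.Background(), tx, opErr)
	return tx.outcome
}

func main() {
	fmt.Println(outcomeFor(false), outcomeFor(true))
}
```

With the real client, the call site would look roughly like `tx, ids, err := q.ArchiveBatchTX(ctx, "my_queue", msgIDs)` followed by `err = finishTx(ctx, tx, err)`.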

func (*PgMQ) ArchiveBatchWithTX

func (p *PgMQ) ArchiveBatchWithTX(ctx context.Context, tx pgx.Tx, queue string, msgIDs []int64) (archived []int64, err error)

ArchiveBatchWithTX receives a transaction and moves a batch of messages from the queue table to the archive table by their ids. Messages can be viewed on the archive table with SQL:

SELECT * FROM pgmq.a_<queue_name>;

Transaction will be committed / rolled back by the caller

func (*PgMQ) ArchiveTX

func (p *PgMQ) ArchiveTX(ctx context.Context, queue string, msgID int64) (tx pgx.Tx, archived bool, err error)

ArchiveTX creates a transaction and moves a message from the queue table to the archive table by its id. Messages can be viewed on the archive table with SQL:

SELECT * FROM pgmq.a_<queue_name>;

Returned transaction will be committed / rolled back by the caller

func (*PgMQ) ArchiveWithTX

func (p *PgMQ) ArchiveWithTX(ctx context.Context, tx pgx.Tx, queue string, msgID int64) (archived bool, err error)

ArchiveWithTX receives a transaction and moves a message from the queue table to the archive table by its id. Messages can be viewed on the archive table with SQL:

SELECT * FROM pgmq.a_<queue_name>;

Transaction will be committed / rolled back by the caller

func (*PgMQ) Close

func (p *PgMQ) Close()

Close will cancel the context and wait for all transactions to be terminated

func (*PgMQ) CreateQueue

func (p *PgMQ) CreateQueue(ctx context.Context, queue string) (err error)

CreateQueue creates a new unpartitioned queue. It prepares the queue's tables, indexes, and metadata.

func (*PgMQ) CreateUnloggedQueue

func (p *PgMQ) CreateUnloggedQueue(ctx context.Context, queue string) (err error)

CreateUnloggedQueue creates a new unlogged queue, which uses an unlogged table under the hood. This sets up the queue's tables, indexes, and metadata.

func (*PgMQ) Delete

func (p *PgMQ) Delete(ctx context.Context, queue string, msgID int64) (deleted bool, err error)

Delete deletes a message from the queue by its id. This is a permanent delete and cannot be undone. If you want to retain a log of the message, use the Archive method.

func (*PgMQ) DeleteBatch

func (p *PgMQ) DeleteBatch(ctx context.Context, queue string, msgIDs []int64) (deleted []int64, err error)

DeleteBatch deletes a batch of messages from the queue by their ids. This is a permanent delete and cannot be undone. If you want to retain a log of the messages, use the ArchiveBatch method.

func (*PgMQ) DeleteBatchTX

func (p *PgMQ) DeleteBatchTX(ctx context.Context, queue string, msgIDs []int64) (tx pgx.Tx, deleted []int64, err error)

DeleteBatchTX creates a transaction and deletes a batch of messages from the queue by their ids. This is a permanent delete and cannot be undone. To retain a log of the messages, use the ArchiveBatch method instead.

Returned transaction will be committed / rolled back by the caller

func (*PgMQ) DeleteBatchWithTX

func (p *PgMQ) DeleteBatchWithTX(ctx context.Context, tx pgx.Tx, queue string, msgIDs []int64) (deleted []int64, err error)

DeleteBatchWithTX receives a transaction and deletes a batch of messages from the queue by their ids. This is a permanent delete and cannot be undone. To retain a log of the messages, the ArchiveBatch method has to be used.

Transaction will be committed / rolled back by the caller

func (*PgMQ) DeleteTX

func (p *PgMQ) DeleteTX(ctx context.Context, queue string, msgID int64) (tx pgx.Tx, deleted bool, err error)

DeleteTX creates a transaction and deletes a message from the queue by its id. This is a permanent delete and cannot be undone. To retain a log of the message, use the Archive method instead.

Returned transaction will be committed / rolled back by the caller

func (*PgMQ) DeleteWithTX

func (p *PgMQ) DeleteWithTX(ctx context.Context, tx pgx.Tx, queue string, msgID int64) (deleted bool, err error)

DeleteWithTX receives a transaction and deletes a message from the queue by its id. This is a permanent delete and cannot be undone. To retain a log of the message, use the Archive method instead.

Transaction will be committed / rolled back by the caller

func (*PgMQ) DropQueue

func (p *PgMQ) DropQueue(ctx context.Context, queue string) (err error)

DropQueue deletes the given queue. It deletes the queue's tables, indices, and metadata. It will return an error if the queue does not exist.

func (*PgMQ) Exec

func (p *PgMQ) Exec(ctx context.Context, sql string, args ...any) (r pgx.Rows, err error)

Exec executes the given query against the current DB connection.

Returned pgx.Rows must be closed by the calling code
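The requirement to close the returned rows is easiest to satisfy with `defer`. The sketch below illustrates the pattern under stated assumptions: `rowIterator` is a hypothetical interface listing only the `Next`, `Err` and `Close` methods that `pgx.Rows` provides, and `fakeRows` and `countAndCheckClosed` exist only so the example runs without a database.

```go
package main

import "fmt"

// rowIterator is the subset of pgx.Rows used in this sketch (hypothetical name).
type rowIterator interface {
	Next() bool
	Err() error
	Close()
}

// countRows iterates over rows and always closes them, even on early return.
// The iteration error is checked after the loop, as Next alone does not report it.
func countRows(rows rowIterator) (int, error) {
	defer rows.Close()
	n := 0
	for rows.Next() {
		n++
	}
	return n, rows.Err()
}

// fakeRows is a stand-in result set that yields a fixed number of rows.
type fakeRows struct {
	left   int
	closed bool
}

func (f *fakeRows) Next() bool {
	if f.left == 0 {
		return false
	}
	f.left--
	return true
}
func (f *fakeRows) Err() error { return nil }
func (f *fakeRows) Close()     { f.closed = true }

// countAndCheckClosed runs countRows on n fake rows and reports whether Close was called.
func countAndCheckClosed(n int) (int, bool) {
	rows := &fakeRows{left: n}
	got, _ := countRows(rows)
	return got, rows.closed
}

func main() {
	fmt.Println(countAndCheckClosed(3))
}
```

Since `pgx.Rows` satisfies this interface, the value returned by `q.Exec(ctx, sql, args...)` could be passed to `countRows` directly once the error has been checked.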

func (*PgMQ) ExecTX

func (p *PgMQ) ExecTX(ctx context.Context, sql string, args ...any) (tx pgx.Tx, r pgx.Rows, err error)

ExecTX creates a transaction and executes the given query against the current DB connection

Returned transaction will be committed / rolled back by the caller

func (*PgMQ) ExecWithTX

func (p *PgMQ) ExecWithTX(ctx context.Context, tx pgx.Tx, sql string, args ...any) (r pgx.Rows, err error)

ExecWithTX receives a transaction and executes the given query against the current DB connection

Transaction will be committed / rolled back by the caller

func (*PgMQ) Ping

func (p *PgMQ) Ping() error

Ping will check if the underlying DB connection is alive

func (*PgMQ) Pool

func (p *PgMQ) Pool() *pgxpool.Pool

Pool returns the current pool.

func (*PgMQ) Pop

func (p *PgMQ) Pop(ctx context.Context, queue string) (_ *PgMQMessage, err error)

Pop reads a single message from the queue and deletes it at the same time. As with Read and ReadBatch, ErrNoRows is returned if no messages are available. Unlike those methods, the visibility timeout does not apply, because the message is deleted immediately.

func (*PgMQ) PopTX

func (p *PgMQ) PopTX(ctx context.Context, queue string) (tx pgx.Tx, _ *PgMQMessage, err error)

PopTX creates a transaction, reads a single message from the queue and deletes it at the same time. As with ReadTX and ReadBatchTX, ErrNoRows is returned if no messages are available. The visibility timeout does not apply, because the message is deleted immediately.

Returned transaction will be committed / rolled back by the caller

func (*PgMQ) PopWithTX

func (p *PgMQ) PopWithTX(ctx context.Context, tx pgx.Tx, queue string) (_ *PgMQMessage, err error)

PopWithTX receives a transaction, reads a single message from the queue and deletes it at the same time. As with ReadWithTX and ReadBatchWithTX, ErrNoRows is returned if no messages are available. The visibility timeout does not apply, because the message is deleted immediately.

Transaction will be committed / rolled back by the caller

func (*PgMQ) Read

func (p *PgMQ) Read(ctx context.Context, queue string, vt int64, condition json.RawMessage) (_ *PgMQMessage, err error)

Read reads a single message from the queue. If the queue is empty or all messages are invisible, an ErrNoRows error is returned. If a message is returned, it is made invisible for the duration of the visibility timeout (vt) in seconds. If vt is negative, the default visibility duration is used.
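Since an empty queue is reported through ErrNoRows rather than a nil message, consumers typically need to tell that case apart from real failures. The following sketch shows one way to do so with `errors.Is`. It is self-contained for illustration only: `errNoRows` stands in for `pgmq.ErrNoRows`, and `readFunc`, `drain`, `fakeQueue` and `drainedCount` are hypothetical names, with `readFunc` playing the role of a closure around a call such as `q.Read(ctx, queue, vt, condition)`.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoRows stands in for pgmq.ErrNoRows so the sketch runs without the library.
var errNoRows = errors.New("gopgmq: no rows in result set")

// readFunc is a hypothetical wrapper around a single read call,
// reduced to the message id for brevity.
type readFunc func() (msgID int64, err error)

// drain calls read until the queue reports that nothing is visible.
// An empty queue ends the loop normally; any other error is returned.
func drain(read readFunc) ([]int64, error) {
	var ids []int64
	for {
		id, err := read()
		if errors.Is(err, errNoRows) {
			return ids, nil
		}
		if err != nil {
			return ids, err
		}
		ids = append(ids, id)
	}
}

// fakeQueue yields ids 1..n and then a wrapped errNoRows.
func fakeQueue(n int64) readFunc {
	var next int64
	return func() (int64, error) {
		if next >= n {
			return 0, fmt.Errorf("read: %w", errNoRows)
		}
		next++
		return next, nil
	}
}

// drainedCount reports how many messages drain collects from a fake queue of size n.
func drainedCount(n int64) int {
	ids, err := drain(fakeQueue(n))
	if err != nil {
		return -1
	}
	return len(ids)
}

func main() {
	fmt.Println(drainedCount(3))
}
```

`errors.Is` is used instead of `==` so that the check keeps working whether or not the error is wrapped on its way to the caller.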

func (*PgMQ) ReadBatch

func (p *PgMQ) ReadBatch(ctx context.Context, queue string, vt int64, numMsgs int64, condition json.RawMessage) (msgs []*PgMQMessage, err error)

ReadBatch reads a specified number of messages from the queue. Any messages that are returned are made invisible for the duration of the visibility timeout (vt) in seconds. A negative visibility value means using the default visibility value.

func (*PgMQ) ReadBatchTX

func (p *PgMQ) ReadBatchTX(ctx context.Context, queue string, vt int64, numMsgs int64, condition json.RawMessage) (tx pgx.Tx, msgs []*PgMQMessage, err error)

ReadBatchTX creates a transaction and reads a specified number of messages from the queue. Any messages that are returned are made invisible for the duration of the visibility timeout (vt) in seconds. If vt is negative, the default visibility duration is used.

Returned transaction will be committed / rolled back by the caller

func (*PgMQ) ReadBatchWithTX

func (p *PgMQ) ReadBatchWithTX(ctx context.Context, tx pgx.Tx, queue string, vt int64, numMsgs int64, condition json.RawMessage) (msgs []*PgMQMessage, err error)

ReadBatchWithTX receives a transaction and reads a specified number of messages from the queue. Any messages that are returned are made invisible for the duration of the visibility timeout (vt) in seconds. If vt is negative, the default visibility duration is used.

Transaction will be committed / rolled back by the caller

func (*PgMQ) ReadTX

func (p *PgMQ) ReadTX(ctx context.Context, queue string, vt int64, condition json.RawMessage) (tx pgx.Tx, _ *PgMQMessage, err error)

ReadTX creates a transaction and reads a single message from the queue. If the queue is empty or all messages are invisible, an ErrNoRows error is returned. If a message is returned, it is made invisible for the duration of the visibility timeout (vt) in seconds. If vt is negative, the default visibility duration is used.

Returned transaction will be committed / rolled back by the caller

func (*PgMQ) ReadWithTX

func (p *PgMQ) ReadWithTX(ctx context.Context, tx pgx.Tx, queue string, vt int64, condition json.RawMessage) (_ *PgMQMessage, err error)

ReadWithTX receives a transaction and reads a single message from the queue. If the queue is empty or all messages are invisible, an ErrNoRows error is returned. If a message is returned, it is made invisible for the duration of the visibility timeout (vt) in seconds. If vt is negative, the default visibility duration is used.

Transaction will be committed / rolled back by the caller

func (*PgMQ) Send

func (p *PgMQ) Send(ctx context.Context, queue string, msg json.RawMessage, headers json.RawMessage) (int64, error)

Send sends a single message to a queue. The message id, unique to the queue, is returned.
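Both msg and headers are `json.RawMessage` values, so Go values have to be marshalled before they are sent. A minimal sketch of that step follows; `orderCreated`, `encode` and `encodedOrder` are hypothetical names chosen for illustration and are not part of this package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// orderCreated is a hypothetical payload type.
type orderCreated struct {
	OrderID int64  `json:"order_id"`
	Status  string `json:"status"`
}

// encode marshals v into a json.RawMessage suitable for the msg or headers argument.
func encode(v any) (json.RawMessage, error) {
	b, err := json.Marshal(v)
	if err != nil {
		return nil, fmt.Errorf("encode %T: %w", v, err)
	}
	return b, nil
}

// encodedOrder returns the JSON text for a sample order, or "" on failure.
func encodedOrder(id int64, status string) string {
	raw, err := encode(orderCreated{OrderID: id, Status: status})
	if err != nil {
		return ""
	}
	return string(raw)
}

func main() {
	fmt.Println(encodedOrder(42, "new"))
}
```

The resulting value can then be passed along the lines of `q.Send(ctx, "my_queue", payload, json.RawMessage(`{}`))`, as in the usage example of the README.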

func (*PgMQ) SendBatch

func (p *PgMQ) SendBatch(ctx context.Context, queue string, msgs []json.RawMessage, headers []json.RawMessage) ([]int64, error)

SendBatch sends a batch of messages to a queue. The message ids, unique to the queue, are returned.

func (*PgMQ) SendBatchTX

func (p *PgMQ) SendBatchTX(ctx context.Context, queue string, msgs []json.RawMessage, headers []json.RawMessage) (tx pgx.Tx, msgIDs []int64, err error)

SendBatchTX starts a transaction and sends a batch of messages to a queue. The transaction and the message ids, unique to the queue, are returned.

Returned transaction will be committed / rolled back by the caller

func (*PgMQ) SendBatchWithDelay

func (p *PgMQ) SendBatchWithDelay(ctx context.Context, queue string, msgs []json.RawMessage, headers []json.RawMessage, delay int64) (msgIDs []int64, err error)

SendBatchWithDelay sends a batch of messages to a queue with a delay. The delay is specified in seconds. A negative duration value means using the default visibility value. The message ids, unique to the queue, are returned.

func (*PgMQ) SendBatchWithDelayTX

func (p *PgMQ) SendBatchWithDelayTX(ctx context.Context, queue string, msgs []json.RawMessage, headers []json.RawMessage, delay int64) (tx pgx.Tx, msgIDs []int64, err error)

SendBatchWithDelayTX starts a transaction and sends a batch of messages to a queue with a delay. The delay is specified in seconds; negative delay value means use the default value. The transaction and the message ids, unique to the queue, are returned.

Returned transaction will be committed / rolled back by the caller

func (*PgMQ) SendBatchWithTX

func (p *PgMQ) SendBatchWithTX(ctx context.Context, tx pgx.Tx, queue string, msgs []json.RawMessage, headers []json.RawMessage) (msgIDs []int64, err error)

SendBatchWithTX receives a transaction and sends a batch of messages to a queue. The message ids, unique to the queue, are returned.

Transaction will be committed / rolled back by the caller

func (*PgMQ) SendBatchWithTXDelay

func (p *PgMQ) SendBatchWithTXDelay(ctx context.Context, tx pgx.Tx, queue string, msgs []json.RawMessage, headers []json.RawMessage, delay int64) (msgIDs []int64, err error)

SendBatchWithTXDelay receives a transaction and sends a batch of messages to a queue with a delay. The delay is specified in seconds; a negative delay value means the default value is used. The message ids, unique to the queue, are returned.

Transaction will be committed / rolled back by the caller

func (*PgMQ) SendTX

func (p *PgMQ) SendTX(ctx context.Context, queue string, msg json.RawMessage, headers json.RawMessage) (pgx.Tx, int64, error)

SendTX starts a transaction and sends a single message to a queue. The transaction and the message id, unique to the queue, are returned.

Returned transaction will be committed / rolled back by the caller

func (*PgMQ) SendWithDelay

func (p *PgMQ) SendWithDelay(ctx context.Context, queue string, msg json.RawMessage, headers json.RawMessage, delay int64) (msgID int64, err error)

SendWithDelay sends a single message to a queue with a delay. The delay is specified in seconds. A negative duration value means using the default visibility value. The message id, unique to the queue, is returned.

func (*PgMQ) SendWithDelayTX

func (p *PgMQ) SendWithDelayTX(ctx context.Context, queue string, msg json.RawMessage, headers json.RawMessage, delay int64) (tx pgx.Tx, msgID int64, err error)

SendWithDelayTX starts a transaction and sends a single message to a queue with a delay. The delay is specified in seconds; negative delay value means use the default value. The transaction and the message id, unique to the queue, are returned.

Returned transaction will be committed / rolled back by the caller

func (*PgMQ) SendWithTX

func (p *PgMQ) SendWithTX(ctx context.Context, tx pgx.Tx, queue string, msg json.RawMessage, headers json.RawMessage) (int64, error)

SendWithTX receives a transaction and sends a single message to a queue. The message id, unique to the queue, is returned.

Transaction will be committed / rolled back by the caller

func (*PgMQ) SendWithTxDelay

func (p *PgMQ) SendWithTxDelay(ctx context.Context, tx pgx.Tx, queue string, msg json.RawMessage, headers json.RawMessage, delay int64) (msgID int64, err error)

SendWithTxDelay receives a transaction and sends a single message to a queue with a delay. The delay is specified in seconds; negative delay value means use the default value. The message id, unique to the queue, is returned.

Transaction will be committed / rolled back by the caller

func (*PgMQ) SetVT

func (p *PgMQ) SetVT(ctx context.Context, queue string, msgID int64, invTime int64) (_ int64, err error)

SetVT sets a new visibility time for a message from the queue.

If invTime is negative, then the defaultVT value is applied
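The "negative means default" rule, which recurs across the read, send-with-delay and SetVT methods, can be pictured as the small function below. This is only an illustration of the documented behaviour, not the package's actual code; `effectiveVT` is a hypothetical name, and treating zero as an explicit value is an assumption based on the documentation mentioning only negative inputs.

```go
package main

import "fmt"

// effectiveVT returns the visibility timeout that would apply for a requested
// value, falling back to defaultVT only when the request is negative.
func effectiveVT(requested, defaultVT int64) int64 {
	if requested < 0 {
		return defaultVT
	}
	return requested
}

func main() {
	// With a default of 45 seconds, as set by WithDefaultVT(45) in the README.
	fmt.Println(effectiveVT(180, 45), effectiveVT(-1, 45), effectiveVT(0, 45))
}
```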

func (*PgMQ) SetVTTX

func (p *PgMQ) SetVTTX(ctx context.Context, queue string, msgID int64, invTime int64) (tx pgx.Tx, _ int64, err error)

SetVTTX creates a transaction and sets a new visibility time for a message from the queue.

Returned transaction will be committed / rolled back by the caller

func (*PgMQ) SetVTWithTX

func (p *PgMQ) SetVTWithTX(ctx context.Context, tx pgx.Tx, queue string, msgID int64, invTime int64) (_ int64, err error)

SetVTWithTX receives a transaction and sets a new visibility time for a message from the queue.

Transaction will be committed / rolled back by the caller

func (*PgMQ) WithDefaultTxOptions

func (p *PgMQ) WithDefaultTxOptions(newTxOpt pgx.TxOptions) *PgMQ

WithDefaultTxOptions will change the default TX options

func (*PgMQ) WithDefaultVT

func (p *PgMQ) WithDefaultVT(newVT int64) *PgMQ

WithDefaultVT will change the default visibility time value

type PgMQMessage

type PgMQMessage struct {
	MsgID      int64
	ReadCount  int64
	EnqueuedAt time.Time
	// VT is "visibility time" - the UTC timestamp at which the message will
	// be available again for reading.
	VT      time.Time
	Message json.RawMessage
	Headers json.RawMessage
}
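Because Message is raw JSON and VT is a timestamp, a consumer usually decodes the payload into its own type and may want to know how long the message stays hidden. The sketch below shows both steps under stated assumptions: `message` mirrors only the two PgMQMessage fields it uses, and `payload`, `decode`, `remaining` and `fooOf` are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// message mirrors the two PgMQMessage fields this sketch uses.
type message struct {
	VT      time.Time
	Message json.RawMessage
}

// payload is a hypothetical application type matching {"foo": "bar"}.
type payload struct {
	Foo string `json:"foo"`
}

// decode unmarshals the raw message body into a payload.
func decode(m message) (payload, error) {
	var p payload
	if err := json.Unmarshal(m.Message, &p); err != nil {
		return payload{}, fmt.Errorf("decode message: %w", err)
	}
	return p, nil
}

// remaining reports how long the message stays invisible as seen from now,
// and never returns a negative duration.
func remaining(m message, now time.Time) time.Duration {
	if d := m.VT.Sub(now); d > 0 {
		return d
	}
	return 0
}

// fooOf decodes body and returns its "foo" field, or "" if decoding fails.
func fooOf(body string) string {
	p, err := decode(message{Message: json.RawMessage(body)})
	if err != nil {
		return ""
	}
	return p.Foo
}

func main() {
	now := time.Now()
	m := message{VT: now.Add(30 * time.Second), Message: json.RawMessage(`{"foo": "bar"}`)}
	p, err := decode(m)
	fmt.Println(p.Foo, err, remaining(m, now))
}
```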

type PgMQQueueRecord

type PgMQQueueRecord struct {
	QueueName     string
	IsPartitioned bool
	IsUnlogged    bool
	CreatedAt     time.Time
}
