pgmq

package module
v1.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 14, 2026 License: MIT Imports: 8 Imported by: 0

README

pgmq-go

A Go client for Postgres Message Queue (PGMQ).

Supports PGMQ v1.10.0+ and PostgreSQL 16, 17, and 18.

CI

Features

  • Struct-based Client with method API
  • Functional options for send and read operations
  • Works with *pgxpool.Pool, *pgx.Conn, and pgx.Tx
  • Message headers support
  • Long-polling reads (read_with_poll)
  • FIFO grouped reads (standard and round-robin)
  • Batch operations (send, read, pop, delete, archive, set_vt)
  • Queue metrics and listing
  • LISTEN/NOTIFY support
  • Visibility timeout with int or timestamp
  • Partitioned and unlogged queues

Installation

go get github.com/0uz/pgmq-go

Quick Start

Start a Postgres instance with PGMQ:

docker run -d --name pgmq-postgres -e POSTGRES_PASSWORD=password -p 5432:5432 ghcr.io/pgmq/pg18-pgmq:v1.7.0

Then connect, create a queue, and send, read, and archive a message:

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"

    pgmq "github.com/0uz/pgmq-go"
)

func main() {
    ctx := context.Background()

    // Create a client from a connection string.
    client, err := pgmq.NewFromConnString(ctx, "postgres://postgres:password@localhost:5432/postgres")
    if err != nil {
        log.Fatal(err)
    }

    // Create the PGMQ extension.
    if err := client.CreateExtension(ctx); err != nil {
        log.Fatal(err)
    }

    // Create a queue.
    if err := client.CreateQueue(ctx, "my_queue"); err != nil {
        log.Fatal(err)
    }

    // Send a message.
    msgID, err := client.Send(ctx, "my_queue", json.RawMessage(`{"hello": "world"}`))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Sent message: %d\n", msgID)

    // Read a message (30 second visibility timeout).
    msg, err := client.Read(ctx, "my_queue", 30)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Read message: %s\n", msg.Message)

    // Archive the message.
    if _, err := client.Archive(ctx, "my_queue", msg.MsgID); err != nil {
        log.Fatal(err)
    }
}

API Reference

Client
// Create from an existing pgx-compatible DB (pool, conn, or tx).
client := pgmq.New(pool)

// Create from a connection string (creates a pool internally).
client, err := pgmq.NewFromConnString(ctx, connString)

// Create the PGMQ extension.
client.CreateExtension(ctx)

// Ping the database.
client.Ping(ctx)
Queue Management
client.CreateQueue(ctx, "my_queue")
client.CreateUnloggedQueue(ctx, "my_queue")
client.CreatePartitionedQueue(ctx, "my_queue", "10000", "100000")
client.DropQueue(ctx, "my_queue")

queues, _ := client.ListQueues(ctx)
count, _ := client.Purge(ctx, "my_queue")
Sending Messages
// Simple send.
id, _ := client.Send(ctx, "my_queue", msg)

// With delay (seconds).
id, _ := client.Send(ctx, "my_queue", msg, pgmq.WithDelay(10))

// With delay (timestamp).
id, _ := client.Send(ctx, "my_queue", msg, pgmq.WithDelayTimestamp(time.Now().Add(time.Minute)))

// With headers.
id, _ := client.Send(ctx, "my_queue", msg, pgmq.WithHeaders(json.RawMessage(`{"key": "val"}`)))

// Combined.
id, _ := client.Send(ctx, "my_queue", msg, pgmq.WithHeaders(headers), pgmq.WithDelay(5))

// Batch send.
ids, _ := client.SendBatch(ctx, "my_queue", msgs)
ids, _ := client.SendBatch(ctx, "my_queue", msgs, pgmq.WithDelay(10))
ids, _ := client.SendBatch(ctx, "my_queue", msgs, pgmq.WithBatchHeaders(headerSlice))
Reading Messages
// Read one message (visibility timeout in seconds, 0 = default 30s).
msg, _ := client.Read(ctx, "my_queue", 30)

// Read batch.
msgs, _ := client.ReadBatch(ctx, "my_queue", 30, 10)

// Read with long-polling.
msgs, _ := client.ReadWithPoll(ctx, "my_queue", 30, 10,
    pgmq.WithMaxPollSeconds(5),
    pgmq.WithPollIntervalMs(100),
)

// Pop (read + delete).
msg, _ := client.Pop(ctx, "my_queue")
msgs, _ := client.PopBatch(ctx, "my_queue", 10)

// Grouped reads (FIFO by x-pgmq-group header).
msgs, _ := client.ReadGrouped(ctx, "my_queue", 30, 10)
msgs, _ := client.ReadGroupedRoundRobin(ctx, "my_queue", 30, 10)
Message Management
// Delete.
ok, _ := client.Delete(ctx, "my_queue", msgID)
deleted, _ := client.DeleteBatch(ctx, "my_queue", msgIDs)

// Archive.
ok, _ := client.Archive(ctx, "my_queue", msgID)
archived, _ := client.ArchiveBatch(ctx, "my_queue", msgIDs)

// Visibility timeout.
msg, _ := client.SetVT(ctx, "my_queue", msgID, 60)             // seconds
msg, _ := client.SetVTTimestamp(ctx, "my_queue", msgID, ts)     // timestamp
msgs, _ := client.SetVTBatch(ctx, "my_queue", msgIDs, 60)
msgs, _ := client.SetVTBatchTimestamp(ctx, "my_queue", msgIDs, ts)
Metrics
m, _ := client.Metrics(ctx, "my_queue")
fmt.Printf("Queue length: %d, Total messages: %d\n", m.QueueLength, m.TotalMessages)

all, _ := client.MetricsAll(ctx)
LISTEN/NOTIFY
// Enable notifications on insert (throttle = 250ms).
client.EnableNotifyInsert(ctx, "my_queue", 250)

// Disable notifications.
client.DisableNotifyInsert(ctx, "my_queue")
Transaction Support
tx, _ := pool.Begin(ctx)
txClient := pgmq.New(tx)

id, _ := txClient.Send(ctx, "my_queue", msg)
msg, _ := txClient.Read(ctx, "my_queue", 30)
txClient.Archive(ctx, "my_queue", id)

tx.Commit(ctx)

Types

Message
type Message struct {
    MsgID      int64
    ReadCount  int64
    EnqueuedAt time.Time
    LastReadAt *time.Time      // PGMQ v1.10.0+
    VT         time.Time
    Message    json.RawMessage
    Headers    json.RawMessage
}
QueueInfo
type QueueInfo struct {
    QueueName     string
    IsPartitioned bool
    IsUnlogged    bool
    CreatedAt     time.Time
}
Metrics
type Metrics struct {
    QueueName          string
    QueueLength        int64
    NewestMsgAgeSec    *int64
    OldestMsgAgeSec    *int64
    TotalMessages      int64
    ScrapeTime         time.Time
    QueueVisibleLength int64
}

Errors

pgmq.ErrNoRows       // No messages available
pgmq.ErrQueueNotFound // Queue does not exist
pgmq.ErrInvalidOption // Conflicting options provided

Documentation

Overview

Package pgmq provides a Go client for Postgres Message Queue (PGMQ) v1.10.0+.

It supports PostgreSQL 16, 17, and 18 with full coverage of the PGMQ SQL API including queue management, message sending/reading, metrics, and LISTEN/NOTIFY.

The client works with any pgx-compatible connection type: *pgxpool.Pool, *pgx.Conn, or pgx.Tx.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrNoRows is returned when a read or pop operation finds no available messages.
	ErrNoRows = errors.New("pgmq: no rows in result set")

	// ErrQueueNotFound is returned when an operation targets a queue that does not exist.
	ErrQueueNotFound = errors.New("pgmq: queue not found")

	// ErrInvalidOption is returned when conflicting or invalid options are provided.
	ErrInvalidOption = errors.New("pgmq: invalid option")
)

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a PGMQ client that wraps a DB connection.

func New

func New(db DB) *Client

New creates a new PGMQ Client using the provided DB interface. The DB can be a *pgxpool.Pool, *pgx.Conn, or pgx.Tx.

func NewFromConnString

func NewFromConnString(ctx context.Context, connString string) (*Client, error)

NewFromConnString creates a new PGMQ Client by establishing a connection pool from the given connection string.

func (*Client) Archive

func (c *Client) Archive(ctx context.Context, queue string, msgID int64) (bool, error)

Archive moves a single message from the queue to the archive table. Returns true if the message was found and archived, false otherwise. Archived messages can be viewed with: SELECT * FROM pgmq.a_<queue_name>

func (*Client) ArchiveBatch

func (c *Client) ArchiveBatch(ctx context.Context, queue string, msgIDs []int64) ([]int64, error)

ArchiveBatch moves multiple messages from the queue to the archive table. Returns the IDs of messages that were actually archived.

func (*Client) ConvertArchivePartitioned added in v1.0.4

func (c *Client) ConvertArchivePartitioned(ctx context.Context, queue string, partitionInterval string, retentionInterval string, leadingPartition int) error

ConvertArchivePartitioned converts a non-partitioned archive table into a partitioned archive table for the specified queue. partitionInterval and retentionInterval control partition sizing and retention. leadingPartition defaults to 10 in PGMQ.

func (*Client) CreateExtension

func (c *Client) CreateExtension(ctx context.Context) error

CreateExtension creates the PGMQ extension if it does not already exist.

func (*Client) CreateFIFOIndex added in v1.0.4

func (c *Client) CreateFIFOIndex(ctx context.Context, queue string) error

CreateFIFOIndex creates a FIFO index for the specified queue.

func (*Client) CreateFIFOIndexesAll added in v1.0.4

func (c *Client) CreateFIFOIndexesAll(ctx context.Context) error

CreateFIFOIndexesAll creates FIFO indexes for all queues that do not have them.

func (*Client) CreateNonPartitionedQueue added in v1.0.4

func (c *Client) CreateNonPartitionedQueue(ctx context.Context, queue string) error

CreateNonPartitionedQueue creates a new non-partitioned queue. This is functionally equivalent to CreateQueue.

func (*Client) CreatePartitionedQueue

func (c *Client) CreatePartitionedQueue(ctx context.Context, queue string, partitionInterval string, retentionInterval string) error

CreatePartitionedQueue creates a new partitioned queue. Requires the pg_partman extension. partitionInterval controls the range of each partition (e.g. "10000" for ID-based), and retentionInterval controls how long partitions are kept (e.g. "100000").

func (*Client) CreateQueue

func (c *Client) CreateQueue(ctx context.Context, queue string) error

CreateQueue creates a new standard queue with the given name. This sets up the queue's tables, indexes, and metadata.

func (*Client) CreateUnloggedQueue

func (c *Client) CreateUnloggedQueue(ctx context.Context, queue string) error

CreateUnloggedQueue creates a new unlogged queue. Unlogged queues use unlogged tables which do not write to WAL, providing better performance at the cost of durability (data is lost on crash).

func (*Client) DB

func (c *Client) DB() DB

DB returns the underlying DB interface for advanced usage such as starting transactions or running custom queries.

func (*Client) Delete

func (c *Client) Delete(ctx context.Context, queue string, msgID int64) (bool, error)

Delete permanently deletes a single message from the queue by its ID. Returns true if the message was found and deleted, false otherwise. Use Archive instead if you want to retain the message for auditing.

func (*Client) DeleteBatch

func (c *Client) DeleteBatch(ctx context.Context, queue string, msgIDs []int64) ([]int64, error)

DeleteBatch permanently deletes multiple messages from the queue by their IDs. Returns the IDs of messages that were actually deleted.

func (*Client) DetachArchive added in v1.0.4

func (c *Client) DetachArchive(ctx context.Context, queue string) error

DetachArchive detaches a queue's archive table (deprecated in PGMQ; no-op).

func (*Client) DisableNotifyInsert

func (c *Client) DisableNotifyInsert(ctx context.Context, queue string) error

DisableNotifyInsert disables LISTEN/NOTIFY on message inserts for the specified queue.

func (*Client) DropQueue

func (c *Client) DropQueue(ctx context.Context, queue string) error

DropQueue deletes the given queue, including its tables, indexes, and metadata. Returns ErrQueueNotFound if the queue does not exist.

func (*Client) EnableNotifyInsert

func (c *Client) EnableNotifyInsert(ctx context.Context, queue string, throttleIntervalMs int) error

EnableNotifyInsert enables PostgreSQL LISTEN/NOTIFY on message inserts for the specified queue. When enabled, a NOTIFY event is fired when new messages are inserted, allowing consumers to react immediately instead of polling.

The throttleIntervalMs parameter controls the minimum interval between notifications in milliseconds. Use 0 to disable throttling.

func (*Client) ListQueues

func (c *Client) ListQueues(ctx context.Context) ([]QueueInfo, error)

ListQueues returns metadata about all existing PGMQ queues.

func (*Client) Metrics

func (c *Client) Metrics(ctx context.Context, queue string) (*Metrics, error)

Metrics returns statistics for the specified queue including queue length, message ages, and total message count.

func (*Client) MetricsAll

func (c *Client) MetricsAll(ctx context.Context) ([]Metrics, error)

MetricsAll returns statistics for all existing queues.

func (*Client) Ping

func (c *Client) Ping(ctx context.Context) error

Ping verifies the database connection is alive.

func (*Client) Pop

func (c *Client) Pop(ctx context.Context, queue string) (*Message, error)

Pop reads and immediately deletes a single message from the queue. Unlike Read, the visibility timeout does not apply because the message is deleted immediately.

Returns ErrNoRows if the queue is empty.

func (*Client) PopBatch

func (c *Client) PopBatch(ctx context.Context, queue string, qty int64) ([]*Message, error)

PopBatch reads and immediately deletes up to qty messages from the queue. Returns an empty slice if the queue is empty.

func (*Client) Purge

func (c *Client) Purge(ctx context.Context, queue string) (int64, error)

Purge removes all messages from the specified queue. Returns the number of messages that were purged.

func (*Client) Read

func (c *Client) Read(ctx context.Context, queue string, vt int64, opts ...ReadOption) (*Message, error)

Read reads a single message from the queue. The message becomes invisible for the duration of the visibility timeout (vt) in seconds. If vt is 0, the default of 30 seconds is used.

Returns ErrNoRows if no messages are available.

func (*Client) ReadBatch

func (c *Client) ReadBatch(ctx context.Context, queue string, vt int64, numMsgs int64, opts ...ReadOption) ([]*Message, error)

ReadBatch reads up to numMsgs messages from the queue. Each returned message becomes invisible for the duration of the visibility timeout (vt) in seconds. If vt is 0, the default of 30 seconds is used.

Returns an empty slice if no messages are available.

func (*Client) ReadGrouped

func (c *Client) ReadGrouped(ctx context.Context, queue string, vt int64, qty int64) ([]*Message, error)

ReadGrouped reads up to qty messages from the queue using FIFO grouped ordering (similar to AWS SQS message groups). Messages are grouped by the "x-pgmq-group" header value.

func (*Client) ReadGroupedRoundRobin

func (c *Client) ReadGroupedRoundRobin(ctx context.Context, queue string, vt int64, qty int64) ([]*Message, error)

ReadGroupedRoundRobin reads up to qty messages using round-robin grouped ordering.

func (*Client) ReadGroupedRoundRobinWithPoll

func (c *Client) ReadGroupedRoundRobinWithPoll(ctx context.Context, queue string, vt int64, qty int64, opts ...PollOption) ([]*Message, error)

ReadGroupedRoundRobinWithPoll reads up to qty messages using round-robin grouped ordering with long-polling support.

func (*Client) ReadGroupedWithPoll

func (c *Client) ReadGroupedWithPoll(ctx context.Context, queue string, vt int64, qty int64, opts ...PollOption) ([]*Message, error)

ReadGroupedWithPoll reads up to qty messages using FIFO grouped ordering with long-polling support.

func (*Client) ReadWithPoll

func (c *Client) ReadWithPoll(ctx context.Context, queue string, vt int64, numMsgs int64, opts ...PollOption) ([]*Message, error)

ReadWithPoll reads up to numMsgs messages from the queue using long-polling. It will poll for messages for up to maxPollSeconds (default 5) with a polling interval of pollIntervalMs (default 100ms).

Returns an empty slice if no messages become available within the polling window.

func (*Client) Send

func (c *Client) Send(ctx context.Context, queue string, msg json.RawMessage, opts ...SendOption) (int64, error)

Send sends a single message to the specified queue. Returns the message ID.

Options can be provided to set a delay or headers:

client.Send(ctx, "my_queue", msg)
client.Send(ctx, "my_queue", msg, WithDelay(10))
client.Send(ctx, "my_queue", msg, WithHeaders(h))
client.Send(ctx, "my_queue", msg, WithHeaders(h), WithDelay(10))
client.Send(ctx, "my_queue", msg, WithDelayTimestamp(t))
client.Send(ctx, "my_queue", msg, WithHeaders(h), WithDelayTimestamp(t))

func (*Client) SendBatch

func (c *Client) SendBatch(ctx context.Context, queue string, msgs []json.RawMessage, opts ...SendOption) ([]int64, error)

SendBatch sends multiple messages to the specified queue. Returns the message IDs.

Options can be provided to set a delay or per-message headers:

client.SendBatch(ctx, "my_queue", msgs)
client.SendBatch(ctx, "my_queue", msgs, WithDelay(10))
client.SendBatch(ctx, "my_queue", msgs, WithBatchHeaders(headers))
client.SendBatch(ctx, "my_queue", msgs, WithBatchHeaders(headers), WithDelay(10))
client.SendBatch(ctx, "my_queue", msgs, WithDelayTimestamp(t))
client.SendBatch(ctx, "my_queue", msgs, WithBatchHeaders(headers), WithDelayTimestamp(t))

func (*Client) SetVT

func (c *Client) SetVT(ctx context.Context, queue string, msgID int64, vt int64) (*Message, error)

SetVT sets the visibility timeout of a single message to vt seconds from now. Returns the updated message record. Returns ErrNoRows if the message is not found.

func (*Client) SetVTBatch

func (c *Client) SetVTBatch(ctx context.Context, queue string, msgIDs []int64, vt int64) ([]*Message, error)

SetVTBatch sets the visibility timeout of multiple messages to vt seconds from now. Returns the updated message records.

func (*Client) SetVTBatchTimestamp

func (c *Client) SetVTBatchTimestamp(ctx context.Context, queue string, msgIDs []int64, vt time.Time) ([]*Message, error)

SetVTBatchTimestamp sets the visibility timeout of multiple messages to a specific timestamp. Returns the updated message records.

func (*Client) SetVTTimestamp

func (c *Client) SetVTTimestamp(ctx context.Context, queue string, msgID int64, vt time.Time) (*Message, error)

SetVTTimestamp sets the visibility timeout of a single message to a specific timestamp. Returns the updated message record. Returns ErrNoRows if the message is not found.

type DB

type DB interface {
	Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error)
	Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
	QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
}

DB is an interface satisfied by *pgxpool.Pool, *pgx.Conn, and pgx.Tx.

type Message

type Message struct {
	MsgID      int64
	ReadCount  int64
	EnqueuedAt time.Time
	LastReadAt *time.Time // Available in PGMQ v1.10.0+
	VT         time.Time
	Message    json.RawMessage
	Headers    json.RawMessage
}

Message represents a message read from a PGMQ queue.

type Metrics

type Metrics struct {
	QueueName          string
	QueueLength        int64
	NewestMsgAgeSec    *int64
	OldestMsgAgeSec    *int64
	TotalMessages      int64
	ScrapeTime         time.Time
	QueueVisibleLength int64
}

Metrics represents statistics for a single PGMQ queue.

type PollOption

type PollOption func(*pollOpts)

PollOption configures optional parameters for the long-polling reads: ReadWithPoll, ReadGroupedWithPoll, and ReadGroupedRoundRobinWithPoll.

func WithMaxPollSeconds

func WithMaxPollSeconds(seconds int) PollOption

WithMaxPollSeconds sets the maximum number of seconds to poll. Default is 5.

func WithPollConditional

func WithPollConditional(filter json.RawMessage) PollOption

WithPollConditional sets an experimental JSONB filter for conditional poll reads.

func WithPollIntervalMs

func WithPollIntervalMs(ms int) PollOption

WithPollIntervalMs sets the polling interval in milliseconds. Default is 100.

type QueueInfo

type QueueInfo struct {
	QueueName     string
	IsPartitioned bool
	IsUnlogged    bool
	CreatedAt     time.Time
}

QueueInfo represents metadata about a PGMQ queue, returned by ListQueues.

type ReadOption

type ReadOption func(*readOpts)

ReadOption configures optional parameters for Read and ReadBatch.

func WithConditional

func WithConditional(filter json.RawMessage) ReadOption

WithConditional sets an experimental JSONB filter for conditional reads.

type SendOption

type SendOption func(*sendOpts)

SendOption configures optional parameters for Send and SendBatch.

func WithBatchHeaders

func WithBatchHeaders(headers []json.RawMessage) SendOption

WithBatchHeaders sets per-message JSONB headers for a batch send. The length of the headers slice should match the number of messages. Used with SendBatch.

func WithDelay

func WithDelay(seconds int) SendOption

WithDelay sets the message delay in seconds.

func WithDelayTimestamp

func WithDelayTimestamp(t time.Time) SendOption

WithDelayTimestamp sets the message delay as an absolute timestamp. The message will become visible at the specified time.

func WithHeaders

func WithHeaders(h json.RawMessage) SendOption

WithHeaders sets JSONB headers (metadata) on the message. Used with Send for a single message.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL