Documentation
¶
Overview ¶
Package pgmq provides a Go client for Postgres Message Queue (PGMQ) v1.10.0+.
It supports PostgreSQL 16, 17, and 18 with full coverage of the PGMQ SQL API including queue management, message sending/reading, metrics, and LISTEN/NOTIFY.
The client works with any pgx-compatible connection type: *pgxpool.Pool, *pgx.Conn, or pgx.Tx.
Index ¶
- Variables
- type Client
- func (c *Client) Archive(ctx context.Context, queue string, msgID int64) (bool, error)
- func (c *Client) ArchiveBatch(ctx context.Context, queue string, msgIDs []int64) ([]int64, error)
- func (c *Client) ConvertArchivePartitioned(ctx context.Context, queue string, partitionInterval string, ...) error
- func (c *Client) CreateExtension(ctx context.Context) error
- func (c *Client) CreateFIFOIndex(ctx context.Context, queue string) error
- func (c *Client) CreateFIFOIndexesAll(ctx context.Context) error
- func (c *Client) CreateNonPartitionedQueue(ctx context.Context, queue string) error
- func (c *Client) CreatePartitionedQueue(ctx context.Context, queue string, partitionInterval string, ...) error
- func (c *Client) CreateQueue(ctx context.Context, queue string) error
- func (c *Client) CreateUnloggedQueue(ctx context.Context, queue string) error
- func (c *Client) DB() DB
- func (c *Client) Delete(ctx context.Context, queue string, msgID int64) (bool, error)
- func (c *Client) DeleteBatch(ctx context.Context, queue string, msgIDs []int64) ([]int64, error)
- func (c *Client) DetachArchive(ctx context.Context, queue string) error
- func (c *Client) DisableNotifyInsert(ctx context.Context, queue string) error
- func (c *Client) DropQueue(ctx context.Context, queue string) error
- func (c *Client) EnableNotifyInsert(ctx context.Context, queue string, throttleIntervalMs int) error
- func (c *Client) ListQueues(ctx context.Context) ([]QueueInfo, error)
- func (c *Client) Metrics(ctx context.Context, queue string) (*Metrics, error)
- func (c *Client) MetricsAll(ctx context.Context) ([]Metrics, error)
- func (c *Client) Ping(ctx context.Context) error
- func (c *Client) Pop(ctx context.Context, queue string) (*Message, error)
- func (c *Client) PopBatch(ctx context.Context, queue string, qty int64) ([]*Message, error)
- func (c *Client) Purge(ctx context.Context, queue string) (int64, error)
- func (c *Client) Read(ctx context.Context, queue string, vt int64, opts ...ReadOption) (*Message, error)
- func (c *Client) ReadBatch(ctx context.Context, queue string, vt int64, numMsgs int64, opts ...ReadOption) ([]*Message, error)
- func (c *Client) ReadGrouped(ctx context.Context, queue string, vt int64, qty int64) ([]*Message, error)
- func (c *Client) ReadGroupedRoundRobin(ctx context.Context, queue string, vt int64, qty int64) ([]*Message, error)
- func (c *Client) ReadGroupedRoundRobinWithPoll(ctx context.Context, queue string, vt int64, qty int64, opts ...PollOption) ([]*Message, error)
- func (c *Client) ReadGroupedWithPoll(ctx context.Context, queue string, vt int64, qty int64, opts ...PollOption) ([]*Message, error)
- func (c *Client) ReadWithPoll(ctx context.Context, queue string, vt int64, numMsgs int64, opts ...PollOption) ([]*Message, error)
- func (c *Client) Send(ctx context.Context, queue string, msg json.RawMessage, opts ...SendOption) (int64, error)
- func (c *Client) SendBatch(ctx context.Context, queue string, msgs []json.RawMessage, opts ...SendOption) ([]int64, error)
- func (c *Client) SetVT(ctx context.Context, queue string, msgID int64, vt int64) (*Message, error)
- func (c *Client) SetVTBatch(ctx context.Context, queue string, msgIDs []int64, vt int64) ([]*Message, error)
- func (c *Client) SetVTBatchTimestamp(ctx context.Context, queue string, msgIDs []int64, vt time.Time) ([]*Message, error)
- func (c *Client) SetVTTimestamp(ctx context.Context, queue string, msgID int64, vt time.Time) (*Message, error)
- type DB
- type Message
- type Metrics
- type PollOption
- type QueueInfo
- type ReadOption
- type SendOption
Constants ¶
This section is empty.
Variables ¶
var ( // ErrNoRows is returned when a read or pop operation finds no available messages. ErrNoRows = errors.New("pgmq: no rows in result set") // ErrQueueNotFound is returned when an operation targets a queue that does not exist. ErrQueueNotFound = errors.New("pgmq: queue not found") // ErrInvalidOption is returned when conflicting or invalid options are provided. ErrInvalidOption = errors.New("pgmq: invalid option") )
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is a PGMQ client that wraps a DB connection.
func New ¶
New creates a new PGMQ Client using the provided DB interface. The DB can be a *pgxpool.Pool, *pgx.Conn, or pgx.Tx.
func NewFromConnString ¶
NewFromConnString creates a new PGMQ Client by establishing a connection pool from the given connection string.
func (*Client) Archive ¶
Archive moves a single message from the queue to the archive table. Returns true if the message was found and archived, false otherwise. Archived messages can be viewed with: SELECT * FROM pgmq.a_<queue_name>
func (*Client) ArchiveBatch ¶
ArchiveBatch moves multiple messages from the queue to the archive table. Returns the IDs of messages that were actually archived.
func (*Client) ConvertArchivePartitioned ¶ added in v1.0.4
func (c *Client) ConvertArchivePartitioned(ctx context.Context, queue string, partitionInterval string, retentionInterval string, leadingPartition int) error
ConvertArchivePartitioned converts a non-partitioned archive table into a partitioned archive table for the specified queue. partitionInterval and retentionInterval control partition sizing and retention. leadingPartition defaults to 10 in PGMQ.
func (*Client) CreateExtension ¶
CreateExtension creates the PGMQ extension if it does not already exist.
func (*Client) CreateFIFOIndex ¶ added in v1.0.4
CreateFIFOIndex creates a FIFO index for the specified queue.
func (*Client) CreateFIFOIndexesAll ¶ added in v1.0.4
CreateFIFOIndexesAll creates FIFO indexes for all queues that do not have them.
func (*Client) CreateNonPartitionedQueue ¶ added in v1.0.4
CreateNonPartitionedQueue creates a new non-partitioned queue. This is functionally equivalent to CreateQueue.
func (*Client) CreatePartitionedQueue ¶
func (c *Client) CreatePartitionedQueue(ctx context.Context, queue string, partitionInterval string, retentionInterval string) error
CreatePartitionedQueue creates a new partitioned queue. Requires the pg_partman extension. partitionInterval controls the range of each partition (e.g. "10000" for ID-based), and retentionInterval controls how long partitions are kept (e.g. "100000").
func (*Client) CreateQueue ¶
CreateQueue creates a new standard queue with the given name. This sets up the queue's tables, indexes, and metadata.
func (*Client) CreateUnloggedQueue ¶
CreateUnloggedQueue creates a new unlogged queue. Unlogged queues use unlogged tables which do not write to WAL, providing better performance at the cost of durability (data is lost on crash).
func (*Client) DB ¶
DB returns the underlying DB interface for advanced usage such as starting transactions or running custom queries.
func (*Client) Delete ¶
Delete permanently deletes a single message from the queue by its ID. Returns true if the message was found and deleted, false otherwise. Use Archive instead if you want to retain the message for auditing.
func (*Client) DeleteBatch ¶
DeleteBatch permanently deletes multiple messages from the queue by their IDs. Returns the IDs of messages that were actually deleted.
func (*Client) DetachArchive ¶ added in v1.0.4
DetachArchive detaches a queue's archive table (deprecated in PGMQ; no-op).
func (*Client) DisableNotifyInsert ¶
DisableNotifyInsert disables LISTEN/NOTIFY on message inserts for the specified queue.
func (*Client) DropQueue ¶
DropQueue deletes the given queue, including its tables, indexes, and metadata. Returns ErrQueueNotFound if the queue does not exist.
func (*Client) EnableNotifyInsert ¶
func (c *Client) EnableNotifyInsert(ctx context.Context, queue string, throttleIntervalMs int) error
EnableNotifyInsert enables PostgreSQL LISTEN/NOTIFY on message inserts for the specified queue. When enabled, a NOTIFY event is fired when new messages are inserted, allowing consumers to react immediately instead of polling.
The throttleIntervalMs parameter controls the minimum interval between notifications in milliseconds. Use 0 to disable throttling.
func (*Client) ListQueues ¶
ListQueues returns metadata about all existing PGMQ queues.
func (*Client) Metrics ¶
Metrics returns statistics for the specified queue including queue length, message ages, and total message count.
func (*Client) MetricsAll ¶
MetricsAll returns statistics for all existing queues.
func (*Client) Pop ¶
Pop reads and immediately deletes a single message from the queue. Unlike Read, the visibility timeout does not apply because the message is deleted immediately.
Returns ErrNoRows if the queue is empty.
func (*Client) PopBatch ¶
PopBatch reads and immediately deletes up to qty messages from the queue. Returns an empty slice if the queue is empty.
func (*Client) Purge ¶
Purge removes all messages from the specified queue. Returns the number of messages that were purged.
func (*Client) Read ¶
func (c *Client) Read(ctx context.Context, queue string, vt int64, opts ...ReadOption) (*Message, error)
Read reads a single message from the queue. The message becomes invisible for the duration of the visibility timeout (vt) in seconds. If vt is 0, the default of 30 seconds is used.
Returns ErrNoRows if no messages are available.
func (*Client) ReadBatch ¶
func (c *Client) ReadBatch(ctx context.Context, queue string, vt int64, numMsgs int64, opts ...ReadOption) ([]*Message, error)
ReadBatch reads up to numMsgs messages from the queue. Each returned message becomes invisible for the duration of the visibility timeout (vt) in seconds. If vt is 0, the default of 30 seconds is used.
Returns an empty slice if no messages are available.
func (*Client) ReadGrouped ¶
func (c *Client) ReadGrouped(ctx context.Context, queue string, vt int64, qty int64) ([]*Message, error)
ReadGrouped reads up to qty messages from the queue using FIFO grouped ordering (similar to AWS SQS message groups). Messages are grouped by the "x-pgmq-group" header value.
func (*Client) ReadGroupedRoundRobin ¶
func (c *Client) ReadGroupedRoundRobin(ctx context.Context, queue string, vt int64, qty int64) ([]*Message, error)
ReadGroupedRoundRobin reads up to qty messages using round-robin grouped ordering.
func (*Client) ReadGroupedRoundRobinWithPoll ¶
func (c *Client) ReadGroupedRoundRobinWithPoll(ctx context.Context, queue string, vt int64, qty int64, opts ...PollOption) ([]*Message, error)
ReadGroupedRoundRobinWithPoll reads up to qty messages using round-robin grouped ordering with long-polling support.
func (*Client) ReadGroupedWithPoll ¶
func (c *Client) ReadGroupedWithPoll(ctx context.Context, queue string, vt int64, qty int64, opts ...PollOption) ([]*Message, error)
ReadGroupedWithPoll reads up to qty messages using FIFO grouped ordering with long-polling support.
func (*Client) ReadWithPoll ¶
func (c *Client) ReadWithPoll(ctx context.Context, queue string, vt int64, numMsgs int64, opts ...PollOption) ([]*Message, error)
ReadWithPoll reads up to numMsgs messages from the queue using long-polling. It will poll for messages for up to maxPollSeconds (default 5) with a polling interval of pollIntervalMs (default 100ms).
Returns an empty slice if no messages become available within the polling window.
func (*Client) Send ¶
func (c *Client) Send(ctx context.Context, queue string, msg json.RawMessage, opts ...SendOption) (int64, error)
Send sends a single message to the specified queue. Returns the message ID.
Options can be provided to set a delay or headers:
client.Send(ctx, "my_queue", msg) client.Send(ctx, "my_queue", msg, WithDelay(10)) client.Send(ctx, "my_queue", msg, WithHeaders(h)) client.Send(ctx, "my_queue", msg, WithHeaders(h), WithDelay(10)) client.Send(ctx, "my_queue", msg, WithDelayTimestamp(t)) client.Send(ctx, "my_queue", msg, WithHeaders(h), WithDelayTimestamp(t))
func (*Client) SendBatch ¶
func (c *Client) SendBatch(ctx context.Context, queue string, msgs []json.RawMessage, opts ...SendOption) ([]int64, error)
SendBatch sends multiple messages to the specified queue. Returns the message IDs.
Options can be provided to set a delay or per-message headers:
client.SendBatch(ctx, "my_queue", msgs) client.SendBatch(ctx, "my_queue", msgs, WithDelay(10)) client.SendBatch(ctx, "my_queue", msgs, WithBatchHeaders(headers)) client.SendBatch(ctx, "my_queue", msgs, WithBatchHeaders(headers), WithDelay(10)) client.SendBatch(ctx, "my_queue", msgs, WithDelayTimestamp(t)) client.SendBatch(ctx, "my_queue", msgs, WithBatchHeaders(headers), WithDelayTimestamp(t))
func (*Client) SetVT ¶
SetVT sets the visibility timeout of a single message to vt seconds from now. Returns the updated message record. Returns ErrNoRows if the message is not found.
func (*Client) SetVTBatch ¶
func (c *Client) SetVTBatch(ctx context.Context, queue string, msgIDs []int64, vt int64) ([]*Message, error)
SetVTBatch sets the visibility timeout of multiple messages to vt seconds from now. Returns the updated message records.
func (*Client) SetVTBatchTimestamp ¶
func (c *Client) SetVTBatchTimestamp(ctx context.Context, queue string, msgIDs []int64, vt time.Time) ([]*Message, error)
SetVTBatchTimestamp sets the visibility timeout of multiple messages to a specific timestamp. Returns the updated message records.
func (*Client) SetVTTimestamp ¶
func (c *Client) SetVTTimestamp(ctx context.Context, queue string, msgID int64, vt time.Time) (*Message, error)
SetVTTimestamp sets the visibility timeout of a single message to a specific timestamp. Returns the updated message record. Returns ErrNoRows if the message is not found.
type DB ¶
type DB interface {
Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error)
Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
}
DB is an interface satisfied by *pgxpool.Pool, *pgx.Conn, and pgx.Tx.
type Message ¶
type Message struct {
MsgID int64
ReadCount int64
EnqueuedAt time.Time
LastReadAt *time.Time // Available in PGMQ v1.10.0+
VT time.Time
Message json.RawMessage
Headers json.RawMessage
}
Message represents a message read from a PGMQ queue.
type Metrics ¶
type Metrics struct {
QueueName string
QueueLength int64
NewestMsgAgeSec *int64
OldestMsgAgeSec *int64
TotalMessages int64
ScrapeTime time.Time
QueueVisibleLength int64
}
Metrics represents statistics for a single PGMQ queue.
type PollOption ¶
type PollOption func(*pollOpts)
PollOption configures optional parameters for ReadWithPoll.
func WithMaxPollSeconds ¶
func WithMaxPollSeconds(seconds int) PollOption
WithMaxPollSeconds sets the maximum number of seconds to poll. Default is 5.
func WithPollConditional ¶
func WithPollConditional(filter json.RawMessage) PollOption
WithPollConditional sets an experimental JSONB filter for conditional poll reads.
func WithPollIntervalMs ¶
func WithPollIntervalMs(ms int) PollOption
WithPollIntervalMs sets the polling interval in milliseconds. Default is 100.
type ReadOption ¶
type ReadOption func(*readOpts)
ReadOption configures optional parameters for Read and ReadBatch.
func WithConditional ¶
func WithConditional(filter json.RawMessage) ReadOption
WithConditional sets an experimental JSONB filter for conditional reads.
type SendOption ¶
type SendOption func(*sendOpts)
SendOption configures optional parameters for Send and SendBatch.
func WithBatchHeaders ¶
func WithBatchHeaders(headers []json.RawMessage) SendOption
WithBatchHeaders sets per-message JSONB headers for a batch send. The length of the headers slice should match the number of messages. Used with SendBatch.
func WithDelay ¶
func WithDelay(seconds int) SendOption
WithDelay sets the message delay in seconds.
func WithDelayTimestamp ¶
func WithDelayTimestamp(t time.Time) SendOption
WithDelayTimestamp sets the message delay as an absolute timestamp. The message will become visible at the specified time.
func WithHeaders ¶
func WithHeaders(h json.RawMessage) SendOption
WithHeaders sets JSONB headers (metadata) on the message. Used with Send for a single message.