pgque

package module
v0.0.0-...-d9cf596 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 20, 2026 License: Apache-2.0 Imports: 7 Imported by: 0

README

pgque

Go client for pgque, a zero-bloat PostgreSQL message queue built on PgQ.

Prerequisites

pgque must be installed in your Postgres database. Follow the upstream instructions, then create a queue and register at least one consumer:

SELECT pgque.create_queue('orders');
SELECT pgque.register_consumer('orders', 'processor');

Install

go get github.com/dio/pgque

Requires Go 1.26+ and pgx v5.

Quick start

client, err := pgque.Connect(ctx, "postgres://user:pass@localhost/mydb")
if err != nil {
    log.Fatal(err)
}
defer client.Close()

// Publish
id, err := client.Send(ctx, "orders", pgque.Event{
    Type:    "order.created",
    Payload: map[string]any{"order_id": 42},
})

// Consume
c := client.NewConsumer("orders", "processor")
c.Handle("order.created", func(ctx context.Context, msg pgque.Message) error {
    var payload map[string]any
    json.Unmarshal([]byte(msg.Payload), &payload)
    fmt.Println("got order:", payload["order_id"])
    return nil
})
log.Fatal(c.Start(ctx))

Consumer

Consumer.Start blocks until the context is cancelled. It opens a dedicated connection for LISTEN/NOTIFY so it wakes up as soon as pgque.ticker() fires, without constant polling. The fallback poll interval (default 30s) only kicks in when no notification arrives.

Returning an error from a handler nacks that message for redelivery. The batch is always acked when the handler loop finishes.

Catch-all handler

Register "*" to handle event types that have no explicit handler:

c.Handle("*", func(ctx context.Context, msg pgque.Message) error {
    log.Printf("unhandled type %s", msg.Type)
    return nil
})
Options
Option Default Description
WithPollInterval(d) 30s Fallback sleep when queue is empty and no NOTIFY arrives
WithMaxMessages(n) 100 Max messages per Receive call
WithRetryAfter(secs) 60 Delay before a nacked message is redelivered

Low-level API

Send, Receive, Ack, and Nack are also available directly if you prefer to manage the consume loop yourself.

// Send returns the assigned event ID.
eid, err := client.Send(ctx, "orders", pgque.Event{...})

// Receive fetches the next batch (empty slice = no batch ready).
msgs, err := client.Receive(ctx, "orders", "processor", 100)

// Ack finishes the batch.
err = client.Ack(ctx, msgs[0].BatchID)

// Nack re-queues a single message for retry.
err = client.Nack(ctx, msgs[0].BatchID, msgs[0], 60, "transient error")

Nack per message, then Ack the batch when done.

Testing

Unit tests
go test -race ./...
E2E tests

The e2e tests live in a separate Go module under e2e/ so that embedded-postgres and other test-only deps never appear in the main module's dependency graph. A go.work file ties them together for local development.

The tests spin up an embedded Postgres instance (port 5454, no external installation needed) and install the pgque schema into it at startup. The schema is fetched from a pinned upstream commit and is not checked in:

make fetch-schema   # downloads e2e/testdata/pgque.sql
make test.e2e       # runs the e2e suite (implies fetch-schema)

If e2e/testdata/pgque.sql is missing, the suite exits 0 with a message rather than failing, so CI without the fetch step does not break the build.

How the tests replace pg_cron

In production, pgque.ticker() is called by pg_cron on a schedule. In tests there is no pg_cron, so each test calls ticker() directly after publishing an event. Two queue config knobs make this reliable regardless of timing:

  • ticker_max_count=1 -- tick fires after a single new event (not a batch)
  • ticker_max_lag=0 -- tick fires even when lag is zero (no minimum wait)

These are set per queue in setupQueue and cleaned up after each test.

For retry flows (TestNack), the test calls pgque.maint_retry_events() directly. That function moves events from the retry queue back into the event table; it is intentionally separate from pgque.maint(), which only does table rotation.

License

Apache 2.0. See LICENSE.

Documentation

Overview

Package pgque is a Go client for PgQue — a zero-bloat PostgreSQL message queue built on top of PgQ.

Quick start:

client, err := pgque.Connect(ctx, dsn)
// produce
id, err := client.Send(ctx, "orders", pgque.Event{Type: "order.created", Payload: order})
// consume (high-level)
c := client.NewConsumer("orders", "processor")
c.Handle("order.created", func(ctx context.Context, msg pgque.Message) error { ... })
c.Start(ctx)

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is the main pgque client. It wraps a pgx connection pool and is safe for concurrent use.

func Connect

func Connect(ctx context.Context, dsn string) (*Client, error)

Connect opens a new Client using the given PostgreSQL DSN. The caller must call Close when done.

func (*Client) Ack

func (c *Client) Ack(ctx context.Context, batchID int64) error

Ack finishes a batch, advancing the consumer's position in the queue. batchID comes from Message.BatchID.

func (*Client) Close

func (c *Client) Close()

Close releases all connections in the pool.

func (*Client) Nack

func (c *Client) Nack(ctx context.Context, batchID int64, msg Message, retryAfterSecs int, reason string) error

Nack re-queues a single message for later redelivery. If the message has exceeded the queue's max_retries, it is routed to the dead-letter table instead. retryAfterSecs controls the delay before the next delivery attempt. reason is optional (pass "" for none).

After nacking individual messages, call Ack to finish the batch.

func (*Client) NewConsumer

func (c *Client) NewConsumer(queue, name string, opts ...Option) *Consumer

NewConsumer creates a Consumer for the given queue and consumer name. Use the returned Consumer's Handle method to register handlers, then call Start to begin consuming.

func (*Client) Pool

func (c *Client) Pool() *pgxpool.Pool

Pool returns the underlying pgxpool for direct SQL access (e.g. DDL like create_queue, register_consumer).

func (*Client) Receive

func (c *Client) Receive(ctx context.Context, queue, consumer string, maxMessages int) ([]Message, error)

Receive fetches up to maxMessages from the next available batch for the named consumer. Returns an empty slice when no batch is ready.

func (*Client) Send

func (c *Client) Send(ctx context.Context, queue string, ev Event) (int64, error)

Send publishes an event to the named queue and returns the assigned event ID. ev.Type defaults to "default" when empty.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer polls a pgque queue and dispatches messages to registered handlers. It uses LISTEN/NOTIFY to wake up immediately when new events are ticked, falling back to a configurable poll interval when no notification arrives.

func (*Consumer) Handle

func (c *Consumer) Handle(eventType string, fn HandlerFunc)

Handle registers a handler for the given event type. Use "*" to register a catch-all handler for any unregistered type.

func (*Consumer) Start

func (c *Consumer) Start(ctx context.Context) error

Start begins the consume loop, blocking until ctx is cancelled. It opens a dedicated connection for LISTEN/NOTIFY (separate from the pool). Callers that need signal-based shutdown should pass a context created with signal.NotifyContext.

type Event

type Event struct {
	Type    string
	Payload any
}

Event is a message to publish to a queue. Payload is JSON-marshaled before being sent.

type HandlerFunc

type HandlerFunc func(ctx context.Context, msg Message) error

HandlerFunc processes a single message. Return a non-nil error to nack the message; return nil to indicate successful processing.

type Message

type Message struct {
	MsgID      int64     `json:"msg_id"`
	BatchID    int64     `json:"batch_id"`
	Type       string    `json:"type"`
	Payload    string    `json:"payload"` // raw JSON
	RetryCount *int      `json:"retry_count"`
	CreatedAt  time.Time `json:"created_at"`
	Extra1     *string   `json:"extra1"`
	Extra2     *string   `json:"extra2"`
	Extra3     *string   `json:"extra3"`
	Extra4     *string   `json:"extra4"`
}

Message is a message received from a queue. Payload is the raw JSON string as stored in PostgreSQL.

type Option

type Option func(*Consumer)

Option configures a Consumer.

func WithMaxMessages

func WithMaxMessages(n int) Option

WithMaxMessages sets the maximum number of messages fetched per Receive call. Defaults to 100.

func WithPollInterval

func WithPollInterval(d time.Duration) Option

WithPollInterval sets the fallback sleep duration between poll cycles when the queue is empty and no NOTIFY arrives. Defaults to 30s.

func WithRetryAfter

func WithRetryAfter(secs int) Option

WithRetryAfter sets the number of seconds before a nacked message is redelivered. Defaults to 60.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL