Documentation ¶
Overview ¶
Package pgque is a Go client for PgQue — a zero-bloat PostgreSQL message queue built on top of PgQ.
Quick start:
	client, err := pgque.Connect(ctx, dsn)

	// produce
	id, err := client.Send(ctx, "orders", pgque.Event{Type: "order.created", Payload: order})

	// consume (high-level)
	c := client.NewConsumer("orders", "processor")
	c.Handle("order.created", func(ctx context.Context, msg pgque.Message) error { ... })
	c.Start(ctx)
Index ¶
- type Client
- func (c *Client) Ack(ctx context.Context, batchID int64) error
- func (c *Client) Close()
- func (c *Client) Nack(ctx context.Context, batchID int64, msg Message, retryAfterSecs int, ...) error
- func (c *Client) NewConsumer(queue, name string, opts ...Option) *Consumer
- func (c *Client) Pool() *pgxpool.Pool
- func (c *Client) Receive(ctx context.Context, queue, consumer string, maxMessages int) ([]Message, error)
- func (c *Client) Send(ctx context.Context, queue string, ev Event) (int64, error)
- type Consumer
- type Event
- type HandlerFunc
- type Message
- type Option
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
	// contains filtered or unexported fields
}
Client is the main pgque client. It wraps a pgx connection pool and is safe for concurrent use.
func Connect ¶
Connect opens a new Client using the given PostgreSQL DSN. The caller must call Close when done.
func (*Client) Ack ¶
func (c *Client) Ack(ctx context.Context, batchID int64) error
Ack finishes a batch, advancing the consumer's position in the queue. batchID comes from Message.BatchID.
func (*Client) Nack ¶
func (c *Client) Nack(ctx context.Context, batchID int64, msg Message, retryAfterSecs int, reason string) error
Nack re-queues a single message for later redelivery. If the message has exceeded the queue's max_retries, it is routed to the dead-letter table instead. retryAfterSecs controls the delay before the next delivery attempt. reason is optional (pass "" for none).
After nacking individual messages, call Ack to finish the batch.
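The receive, nack, ack sequence described above can be sketched as follows. This is a minimal illustration, not the library's own code: `batchClient` is a hypothetical local interface that mirrors the documented Receive, Nack, and Ack signatures, `Message` is a cut-down stand-in for pgque.Message, and `fakeClient` is an in-memory substitute so the example runs without a database. The fixed 60-second retry delay and the choice to ack every distinct BatchID seen are assumptions.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Message is a local stand-in with only the fields this sketch needs.
type Message struct {
	MsgID   int64
	BatchID int64
	Type    string
}

// batchClient mirrors the documented Receive, Nack, and Ack signatures (hypothetical interface).
type batchClient interface {
	Receive(ctx context.Context, queue, consumer string, maxMessages int) ([]Message, error)
	Nack(ctx context.Context, batchID int64, msg Message, retryAfterSecs int, reason string) error
	Ack(ctx context.Context, batchID int64) error
}

// processOnce receives messages, nacks those whose handler fails,
// then acks every batch it saw so the consumer position advances.
func processOnce(ctx context.Context, c batchClient, queue, consumer string, maxMessages int, handle func(Message) error) (int, error) {
	msgs, err := c.Receive(ctx, queue, consumer, maxMessages)
	if err != nil {
		return 0, fmt.Errorf("receive: %w", err)
	}
	seen := make(map[int64]bool)
	var batches []int64
	for _, m := range msgs {
		if !seen[m.BatchID] {
			seen[m.BatchID] = true
			batches = append(batches, m.BatchID)
		}
		if herr := handle(m); herr != nil {
			// Assumed retry delay of 60 seconds; the handler error becomes the reason.
			if err := c.Nack(ctx, m.BatchID, m, 60, herr.Error()); err != nil {
				return 0, fmt.Errorf("nack message %d: %w", m.MsgID, err)
			}
		}
	}
	for _, id := range batches {
		if err := c.Ack(ctx, id); err != nil {
			return 0, fmt.Errorf("ack batch %d: %w", id, err)
		}
	}
	return len(msgs), nil
}

// fakeClient is an in-memory substitute that records nack and ack calls.
type fakeClient struct {
	pending []Message
	nacked  []int64
	acked   []int64
}

func (f *fakeClient) Receive(_ context.Context, _, _ string, maxMessages int) ([]Message, error) {
	n := len(f.pending)
	if maxMessages < n {
		n = maxMessages
	}
	out := f.pending[:n]
	f.pending = f.pending[n:]
	return out, nil
}

func (f *fakeClient) Nack(_ context.Context, _ int64, msg Message, _ int, _ string) error {
	f.nacked = append(f.nacked, msg.MsgID)
	return nil
}

func (f *fakeClient) Ack(_ context.Context, batchID int64) error {
	f.acked = append(f.acked, batchID)
	return nil
}

// demo processes two messages from one batch; the handler rejects message 2.
func demo() (int, []int64, []int64, error) {
	f := &fakeClient{pending: []Message{
		{MsgID: 1, BatchID: 7, Type: "order.created"},
		{MsgID: 2, BatchID: 7, Type: "order.created"},
	}}
	handle := func(m Message) error {
		if m.MsgID == 2 {
			return errors.New("simulated failure")
		}
		return nil
	}
	n, err := processOnce(context.Background(), f, "orders", "processor", 100, handle)
	return n, f.nacked, f.acked, err
}

func main() {
	n, nacked, acked, err := demo()
	fmt.Println(n, nacked, acked, err)
}
```

Against a real Client, the same `processOnce` shape would apply as long as the client satisfies the interface with the package's own Message type.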
func (*Client) NewConsumer ¶
func (c *Client) NewConsumer(queue, name string, opts ...Option) *Consumer
NewConsumer creates a Consumer for the given queue and consumer name. Use the returned Consumer's Handle method to register handlers, then call Start to begin consuming.
func (*Client) Pool ¶
func (c *Client) Pool() *pgxpool.Pool
Pool returns the underlying *pgxpool.Pool for direct SQL access (e.g. setup calls such as create_queue and register_consumer).
type Consumer ¶
type Consumer struct {
	// contains filtered or unexported fields
}
Consumer polls a pgque queue and dispatches messages to registered handlers. It uses LISTEN/NOTIFY to wake up immediately when new events are ticked, falling back to a configurable poll interval when no notification arrives.
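The wake-up behaviour can be pictured as a select over three events: a notification, the poll timer, and context cancellation. The sketch below is an assumption about the general shape, not the package's implementation. `waitForWork` is a hypothetical helper, and the `notify` channel stands in for PostgreSQL NOTIFY messages forwarded by a listener.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitForWork blocks until a notification arrives, the fallback poll
// interval elapses, or ctx is cancelled. It reports which one happened.
func waitForWork(ctx context.Context, notify <-chan struct{}, pollInterval time.Duration) (string, error) {
	timer := time.NewTimer(pollInterval)
	defer timer.Stop()
	select {
	case <-notify:
		return "notify", nil
	case <-timer.C:
		return "poll", nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// demo exercises the three wake-up paths in turn.
func demo() []string {
	ctx := context.Background()
	notify := make(chan struct{}, 1)
	var got []string

	// A pending notification wins over a long poll interval.
	notify <- struct{}{}
	reason, _ := waitForWork(ctx, notify, time.Minute)
	got = append(got, reason)

	// With no notification, the short poll interval elapses.
	reason, _ = waitForWork(ctx, notify, 10*time.Millisecond)
	got = append(got, reason)

	// A cancelled context stops the wait with an error.
	cancelled, cancel := context.WithCancel(ctx)
	cancel()
	_, err := waitForWork(cancelled, notify, time.Minute)
	got = append(got, fmt.Sprint(err))
	return got
}

func main() {
	fmt.Println(demo())
}
```

A long poll interval costs little in this design, because the timer only matters when a notification is missed or never sent.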
func (*Consumer) Handle ¶
func (c *Consumer) Handle(eventType string, fn HandlerFunc)
Handle registers a handler for the given event type. Use "*" to register a catch-all handler for any unregistered type.
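One way to picture handler lookup with a "*" fallback is a map keyed by event type. This is an illustrative sketch only: `registry` and `dispatch` are hypothetical names, `Message` is a local stand-in, and returning an error when no handler matches is an assumption, since the source does not say what the Consumer does in that case.

```go
package main

import (
	"context"
	"fmt"
)

// Message is a local stand-in for the fields used here.
type Message struct {
	Type    string
	Payload string
}

// HandlerFunc matches the handler shape used in the quick start.
type HandlerFunc func(ctx context.Context, msg Message) error

// registry is a hypothetical handler table keyed by event type.
type registry struct {
	handlers map[string]HandlerFunc
}

// Handle registers fn for eventType; "*" registers the catch-all.
func (r *registry) Handle(eventType string, fn HandlerFunc) {
	if r.handlers == nil {
		r.handlers = make(map[string]HandlerFunc)
	}
	r.handlers[eventType] = fn
}

// dispatch prefers an exact match and falls back to the "*" handler.
func (r *registry) dispatch(ctx context.Context, msg Message) error {
	if fn, ok := r.handlers[msg.Type]; ok {
		return fn(ctx, msg)
	}
	if fn, ok := r.handlers["*"]; ok {
		return fn(ctx, msg)
	}
	// Assumption: an unmatched type is reported as an error.
	return fmt.Errorf("no handler for event type %q", msg.Type)
}

// demo records which handler saw each message.
func demo() []string {
	var calls []string
	r := &registry{}
	r.Handle("order.created", func(_ context.Context, m Message) error {
		calls = append(calls, "specific:"+m.Type)
		return nil
	})
	r.Handle("*", func(_ context.Context, m Message) error {
		calls = append(calls, "catchall:"+m.Type)
		return nil
	})
	ctx := context.Background()
	_ = r.dispatch(ctx, Message{Type: "order.created"})
	_ = r.dispatch(ctx, Message{Type: "order.cancelled"})
	return calls
}

func main() {
	fmt.Println(demo())
}
```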
type HandlerFunc ¶
HandlerFunc processes a single message. Return a non-nil error to nack the message; return nil to indicate successful processing.
type Message ¶
type Message struct {
	MsgID      int64     `json:"msg_id"`
	BatchID    int64     `json:"batch_id"`
	Type       string    `json:"type"`
	Payload    string    `json:"payload"` // raw JSON
	RetryCount *int      `json:"retry_count"`
	CreatedAt  time.Time `json:"created_at"`
	Extra1     *string   `json:"extra1"`
	Extra2     *string   `json:"extra2"`
	Extra3     *string   `json:"extra3"`
	Extra4     *string   `json:"extra4"`
}
Message is a message received from a queue. Payload is the raw JSON string as stored in PostgreSQL.
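Because Payload arrives as a raw JSON string, a handler typically decodes it before use. The sketch below shows one way to do that with encoding/json. `orderCreated` and its fields are hypothetical, as is the `decodeOrder` helper, and `Message` is a local stand-in with only the fields needed here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message is a local stand-in for the fields used here.
type Message struct {
	Type    string
	Payload string // raw JSON
}

// orderCreated is a hypothetical payload shape for "order.created" events.
type orderCreated struct {
	OrderID int64   `json:"order_id"`
	Total   float64 `json:"total"`
}

// decodeOrder unmarshals the raw JSON payload into a typed value.
func decodeOrder(msg Message) (orderCreated, error) {
	var o orderCreated
	if err := json.Unmarshal([]byte(msg.Payload), &o); err != nil {
		return orderCreated{}, fmt.Errorf("decode %s payload: %w", msg.Type, err)
	}
	return o, nil
}

func main() {
	msg := Message{Type: "order.created", Payload: `{"order_id": 42, "total": 19.5}`}
	order, err := decodeOrder(msg)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(order.OrderID, order.Total)
}
```

Returning the decode error from a handler would, per the HandlerFunc contract, cause the message to be nacked.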
type Option ¶
type Option func(*Consumer)
Option configures a Consumer.
func WithMaxMessages ¶
WithMaxMessages sets the maximum number of messages fetched per Receive call. Defaults to 100.
func WithPollInterval ¶
WithPollInterval sets the fallback sleep duration between poll cycles when the queue is empty and no NOTIFY arrives. Defaults to 30s.
func WithRetryAfter ¶
WithRetryAfter sets the number of seconds before a nacked message is redelivered. Defaults to 60.
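Option follows the functional options pattern: each option is a function that mutates the Consumer before it starts. The sketch below reproduces the pattern with local stand-ins and the documented defaults (100 messages, 30s poll interval, 60s retry delay). The field names, the `newConsumer` helper, and the parameter types of the three option constructors are assumptions, since the source does not show their signatures.

```go
package main

import (
	"fmt"
	"time"
)

// Consumer is a local stand-in; the field names are hypothetical.
type Consumer struct {
	maxMessages  int
	pollInterval time.Duration
	retryAfter   int // seconds
}

// Option configures a Consumer, matching the documented type.
type Option func(*Consumer)

// WithMaxMessages caps the messages fetched per receive (assumed int parameter).
func WithMaxMessages(n int) Option {
	return func(c *Consumer) { c.maxMessages = n }
}

// WithPollInterval sets the fallback poll interval (assumed time.Duration parameter).
func WithPollInterval(d time.Duration) Option {
	return func(c *Consumer) { c.pollInterval = d }
}

// WithRetryAfter sets the redelivery delay in seconds (assumed int parameter).
func WithRetryAfter(secs int) Option {
	return func(c *Consumer) { c.retryAfter = secs }
}

// newConsumer starts from the documented defaults and applies opts in order.
func newConsumer(opts ...Option) *Consumer {
	c := &Consumer{
		maxMessages:  100,
		pollInterval: 30 * time.Second,
		retryAfter:   60,
	}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newConsumer(WithMaxMessages(10), WithPollInterval(5*time.Second))
	fmt.Println(c.maxMessages, c.pollInterval, c.retryAfter)
}
```

Options left out keep their defaults, so callers only state what differs.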