Back to godoc.org
cirello.io/pgqueue

Package pgqueue

v0.0.0-...-d83c58f
Latest Go to latest

The highest tagged major version is .

Published: Aug 17, 2020 | License: Apache-2.0 | Module: cirello.io/pgqueue

Overview

Package pgqueue is a library that allows using a single PostgreSQL instance as a low-throughput queue server.

// Open the client against the database identified by dsn.
client, err := pgqueue.Open(dsn)
if err != nil {
    log.Fatalln("cannot open database connection:", err)
}
defer client.Close()
// Prepare the underlying table for the queue system.
if err := client.CreateTable(); err != nil {
    log.Fatalln("cannot create queue table:", err)
}
queue := client.Queue("example-queue-reservation")
defer queue.Close()
// Enqueue one message.
content := []byte("content")
if err := queue.Push(content); err != nil {
    log.Fatalln("cannot push message to queue:", err)
}
// Reserve the pending message with a one-minute lease.
r, err := queue.Reserve(1 * time.Minute)
if err != nil {
    log.Fatalln("cannot reserve message from the queue:", err)
}
fmt.Printf("content: %s\n", r.Content)
// Mark the reserved message as done.
if err := r.Done(); err != nil {
    log.Fatalln("cannot mark message as done:", err)
}
Example (Basic)

Code:

package main

import (
	"cirello.io/pgqueue"
	"fmt"
	"log"
	"os"

	_ "github.com/lib/pq"
)

var dsn = os.Getenv("PGQUEUE_TEST_DSN")

// main pushes a single message onto a queue, pops it back and prints the
// content it received.
func main() {
	cl, err := pgqueue.Open(dsn)
	if err != nil {
		log.Fatalln("cannot open database connection:", err)
	}
	defer cl.Close()

	// Prepare the underlying table used by the queue system.
	err = cl.CreateTable()
	if err != nil {
		log.Fatalln("cannot create queue table:", err)
	}

	q := cl.Queue("example-queue-name")
	defer q.Close()

	err = q.Push([]byte("content"))
	if err != nil {
		log.Fatalln("cannot push message to queue:", err)
	}

	popped, err := q.Pop()
	if err != nil {
		log.Fatalln("cannot pop message from the queue:", err)
	}
	fmt.Printf("content: %s\n", popped)
}
content: content
Example (EmptyQueue)

Code:

package main

import (
	"cirello.io/pgqueue"
	"fmt"
	"log"
	"os"

	_ "github.com/lib/pq"
)

var dsn = os.Getenv("PGQUEUE_TEST_DSN")

// main pops from a queue that never received a message and prints the error
// returned by Pop.
func main() {
	cl, err := pgqueue.Open(dsn)
	if err != nil {
		log.Fatalln("cannot open database connection:", err)
	}
	defer cl.Close()

	err = cl.CreateTable()
	if err != nil {
		log.Fatalln("cannot create queue table:", err)
	}

	q := cl.Queue("empty-name")
	defer q.Close()

	// Nothing was pushed, so the error is printed instead of being treated as
	// fatal.
	_, popErr := q.Pop()
	fmt.Println("err:", popErr)
}
err: empty queue
Example (LargeMessage)

Code:

package main

import (
	"bytes"
	"cirello.io/pgqueue"
	"fmt"
	"log"
	"os"

	_ "github.com/lib/pq"
)

var dsn = os.Getenv("PGQUEUE_TEST_DSN")

// main tries to push a message one byte longer than
// pgqueue.DefaultMaxMessageLength and prints the error returned by Push.
func main() {
	cl, err := pgqueue.Open(dsn)
	if err != nil {
		log.Fatalln("cannot open database connection:", err)
	}
	defer cl.Close()

	err = cl.CreateTable()
	if err != nil {
		log.Fatalln("cannot create queue table:", err)
	}

	q := cl.Queue("example-queue-large-message")
	defer q.Close()

	// One byte past the default limit.
	oversized := bytes.Repeat([]byte{0}, pgqueue.DefaultMaxMessageLength+1)
	fmt.Println("err:", q.Push(oversized))
}
err: message is too large
Example (Listen)

Code:

package main

import (
	"cirello.io/pgqueue"
	"fmt"
	"log"
	"os"
	"time"

	_ "github.com/lib/pq"
)

var dsn = os.Getenv("PGQUEUE_TEST_DSN")

// main watches a queue for new messages while a background goroutine pushes
// one. The first message received is printed and marked as done, then the
// queue is closed, which ends the watch loop.
func main() {
	client, err := pgqueue.Open(dsn)
	if err != nil {
		log.Fatalln("cannot open database connection:", err)
	}
	defer client.Close()
	// Prepare the underlying table so the example does not depend on another
	// program having created it.
	if err := client.CreateTable(); err != nil {
		log.Fatalln("cannot create queue table:", err)
	}
	queue := client.Queue("example-queue-listen")
	defer queue.Close()
	go func() {
		// Report a failed push instead of dropping the error: without it
		// there would be no hint why the watcher never sees a message.
		if err := queue.Push([]byte("content")); err != nil {
			log.Println("cannot push message to queue:", err)
		}
	}()
	watch := queue.Watch(time.Minute)
	for watch.Next() {
		msg := watch.Message()
		fmt.Printf("msg: %s\n", msg.Content)
		if err := msg.Done(); err != nil {
			log.Fatalln("cannot mark message as done:", err)
		}
		// Closing the queue is what terminates the loop; the watcher then
		// reports pgqueue.ErrAlreadyClosed, which is tolerated below.
		queue.Close()
	}
	if err := watch.Err(); err != nil && err != pgqueue.ErrAlreadyClosed {
		log.Fatalln("cannot observe queue:", err)
	}
}
msg: content
Example (Reservation)

Code:

package main

import (
	"cirello.io/pgqueue"
	"fmt"
	"log"
	"os"
	"time"

	_ "github.com/lib/pq"
)

var dsn = os.Getenv("PGQUEUE_TEST_DSN")

// main pushes one message, reserves it with a one-minute lease, prints its
// content and marks it as done.
func main() {
	cl, err := pgqueue.Open(dsn)
	if err != nil {
		log.Fatalln("cannot open database connection:", err)
	}
	defer cl.Close()

	err = cl.CreateTable()
	if err != nil {
		log.Fatalln("cannot create queue table:", err)
	}

	q := cl.Queue("example-queue-reservation")
	defer q.Close()

	err = q.Push([]byte("content"))
	if err != nil {
		log.Fatalln("cannot push message to queue:", err)
	}

	reserved, err := q.Reserve(1 * time.Minute)
	if err != nil {
		log.Fatalln("cannot reserve message from the queue:", err)
	}
	fmt.Printf("content: %s\n", reserved.Content)

	err = reserved.Done()
	if err != nil {
		log.Fatalln("cannot mark message as done:", err)
	}
}
content: content
Example (ReservedReleased)

Code:

package main

import (
	"cirello.io/pgqueue"
	"fmt"
	"log"
	"os"
	"time"

	_ "github.com/lib/pq"
)

var dsn = os.Getenv("PGQUEUE_TEST_DSN")

// main pushes one message, reserves it with a one-minute lease, prints its
// content and then releases it back to the queue instead of marking it done.
func main() {
	client, err := pgqueue.Open(dsn)
	if err != nil {
		log.Fatalln("cannot open database connection:", err)
	}
	defer client.Close()
	if err := client.CreateTable(); err != nil {
		log.Fatalln("cannot create queue table:", err)
	}
	queue := client.Queue("example-queue-release")
	defer queue.Close()
	content := []byte("content")
	if err := queue.Push(content); err != nil {
		log.Fatalln("cannot push message to queue:", err)
	}
	r, err := queue.Reserve(1 * time.Minute)
	if err != nil {
		// The failing call is Reserve, not Pop; name it correctly.
		log.Fatalln("cannot reserve message from the queue:", err)
	}
	fmt.Printf("content: %s\n", r.Content)
	if err := r.Release(); err != nil {
		log.Fatalln("cannot release the message back to the queue:", err)
	}
}
content: content
Example (ReservedTouch)

Code:

package main

import (
	"cirello.io/pgqueue"
	"fmt"
	"log"
	"os"
	"time"

	_ "github.com/lib/pq"
)

var dsn = os.Getenv("PGQUEUE_TEST_DSN")

// main pushes one message, reserves it with a ten-second lease, waits five
// seconds and then extends the lease through Touch.
func main() {
	client, err := pgqueue.Open(dsn)
	if err != nil {
		log.Fatalln("cannot open database connection:", err)
	}
	defer client.Close()
	if err := client.CreateTable(); err != nil {
		log.Fatalln("cannot create queue table:", err)
	}
	queue := client.Queue("example-queue-touch")
	defer queue.Close()
	content := []byte("content")
	if err := queue.Push(content); err != nil {
		log.Fatalln("cannot push message to queue:", err)
	}
	r, err := queue.Reserve(10 * time.Second)
	if err != nil {
		// The failing call is Reserve, not Pop; name it correctly.
		log.Fatalln("cannot reserve message from the queue:", err)
	}
	fmt.Printf("content: %s\n", r.Content)
	// Let half of the ten-second lease pass before extending it.
	time.Sleep(5 * time.Second)
	if err := r.Touch(1 * time.Minute); err != nil {
		log.Fatalln("cannot extend message lease:", err)
	}
}
content: content
Example (Vacuum)

Code:

package main

import (
	"cirello.io/pgqueue"
	"log"
	"os"
	"time"

	_ "github.com/lib/pq"
)

var dsn = os.Getenv("PGQUEUE_TEST_DSN")

// main opens a client with automatic vacuum disabled and a single delivery
// attempt per message, pushes and pops ten messages, triggers a manual vacuum
// and fails if the reported vacuum stats carry an error.
func main() {
	// NOTE(review): reservationTime is not referenced anywhere in this
	// function; it is the only use of the "time" import in this example.
	const reservationTime = 500 * time.Millisecond

	cl, err := pgqueue.Open(
		dsn,
		pgqueue.WithMaxDeliveries(1),
		pgqueue.DisableAutoVacuum(),
	)
	if err != nil {
		log.Fatalln("cannot open database connection:", err)
	}
	defer cl.Close()

	err = cl.CreateTable()
	if err != nil {
		log.Fatalln("cannot create queue table:", err)
	}

	q := cl.Queue("example-queue-vacuum")
	defer q.Close()

	for round := 0; round < 10; round++ {
		if err := q.Push([]byte("content")); err != nil {
			log.Fatalln("cannot push message to queue:", err)
		}
		_, err := q.Pop()
		if err != nil {
			log.Fatalln("cannot pop message from the queue:", err)
		}
	}

	// Auto vacuum is disabled above, so the clean up is requested explicitly.
	cl.Vacuum()
	if err := q.VacuumStats().Err; err != nil {
		log.Fatalln("cannot clean up queue:", err)
	}
}

Index

Examples

Constants

const DefaultDeadLetterQueueNamePrefix = "deadletter"

DefaultDeadLetterQueueNamePrefix indicates the name of the dead letter queue.

const DefaultMaxDeliveriesCount = 5

DefaultMaxDeliveriesCount is how many delivery attempts each message gets before getting skipped on Pop and Reserve calls.

const DefaultMaxMessageLength = 65536

DefaultMaxMessageLength indicates the maximum content length acceptable for new messages. Although it is theoretically possible to use large messages, the idea here is to be conservative until the properties of PostgreSQL are fully mapped.

Variables

var ErrAlreadyClosed = errors.New("queue is already closed")

ErrAlreadyClosed indicates the queue is closed and all its watchers are going to report the queue is no longer available.

var ErrDeadletterQueueDisabled = errors.New("deadletter queue disabled")

ErrDeadletterQueueDisabled indicates that it is not possible to dump messages from the target deadletter queue because its support has been disabled.

var ErrEmptyQueue = fmt.Errorf("empty queue")

ErrEmptyQueue indicates there isn't any message available at the head of the queue.

var ErrInvalidDuration = errors.New("invalid duration")

ErrInvalidDuration indicates the duration used is too small. It must be larger than a millisecond and be a multiple of a millisecond.

var ErrMessageExpired = errors.New("message expired")

ErrMessageExpired indicates the message deadline has been reached and the current message pointer can no longer be used to update it.

var ErrMessageTooLarge = fmt.Errorf("message is too large")

ErrMessageTooLarge indicates the content to be pushed is too large.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client uses a PostgreSQL database to run a queue system.

func Open

func Open(dsn string, opts ...ClientOption) (*Client, error)

Open uses the given database connection and starts operating the queue system.

func (*Client) Close

func (c *Client) Close() error

Close stops the queue system.

func (*Client) CreateTable

func (c *Client) CreateTable() error

CreateTable prepares the underlying table for the queue system.

func (*Client) DumpDeadLetterQueue

func (c *Client) DumpDeadLetterQueue(queue string, w io.Writer) error

DumpDeadLetterQueue writes the messages into the writer and removes them from the database.

func (*Client) Queue

func (c *Client) Queue(queue string) *Queue

Queue configures a queue.

func (*Client) Vacuum

func (c *Client) Vacuum()

Vacuum cleans up the queue from done or dead messages.

type ClientCloseError

type ClientCloseError struct {
	ListenerError error
	DriverError   error
}

ClientCloseError reports all the errors that happened during client close.

func (*ClientCloseError) Error

func (e *ClientCloseError) Error() string

func (*ClientCloseError) Is

func (e *ClientCloseError) Is(target error) bool

Is detects if the given target matches either the listener or the driver error.

type ClientOption

type ClientOption func(*Client)

ClientOption reconfigures the behavior of the pgqueue Client.

func DisableAutoVacuum

func DisableAutoVacuum() ClientOption

DisableAutoVacuum forces the use of manual queue clean up.

func DisableDeadletterQueue

func DisableDeadletterQueue() ClientOption

DisableDeadletterQueue forces the errored messages to be deleted from the queue.

func WithCustomTable

func WithCustomTable(tableName string) ClientOption

WithCustomTable changes the name of the PostgreSQL table used for the queue.

func WithMaxDeliveries

func WithMaxDeliveries(maxDeliveries int) ClientOption

WithMaxDeliveries indicates how many delivery attempts each message gets. If zero, the client retries the message forever.

func WithMaxMessageLength

func WithMaxMessageLength(maxMessageLength int) ClientOption

WithMaxMessageLength indicates how long each message can be before it is considered an error. If zero, it imposes no limit.

type Message

type Message struct {
	Content     []byte
	LeasedUntil time.Time
	// contains filtered or unexported fields
}

Message represents one message from the queue.

func (*Message) Done

func (m *Message) Done() error

Done marks the message as done.

func (*Message) Release

func (m *Message) Release() error

Release puts the message back to the queue.

func (*Message) Touch

func (m *Message) Touch(extension time.Duration) error

Touch extends the lease by the given duration. The duration must be a multiple of a millisecond.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue holds the configuration definition for one queue.

func (*Queue) Close

func (q *Queue) Close() error

Close closes the queue.

func (*Queue) Pop

func (q *Queue) Pop() ([]byte, error)

Pop retrieves the pending message from the queue, if any is available. If the queue is empty, it returns ErrEmptyQueue.

func (*Queue) Push

func (q *Queue) Push(content []byte) error

Push enqueues the given content to the target queue.

func (*Queue) Reserve

func (q *Queue) Reserve(lease time.Duration) (*Message, error)

Reserve retrieves the pending message from the queue, if any is available. It marks it as InProgress until the defined lease duration. If the message is not marked as Done by the lease time, it is returned to the queue. The lease duration must be a multiple of a millisecond.

func (*Queue) VacuumStats

func (q *Queue) VacuumStats() VacuumStats

VacuumStats reports the result of the last vacuum cycle.

func (*Queue) Watch

func (q *Queue) Watch(lease time.Duration) *Watcher

Watch observes new messages for the target queue.

type State

type State string

State indicates the possible states of a message

const (
	New        State = "new"
	InProgress State = "in-progress"
	Done       State = "done"
)

Acceptable states for messages.

type VacuumStats

type VacuumStats struct {
	// LastRun indicates the time of the latest vacuum cycle.
	LastRun time.Time
	// PageSize indicates how large the vacuum operation was in order to
	// keep it short and non-disruptive.
	PageSize int64
	// Err indicates why the vacuum cycle failed. If nil, it succeeded.
	Err error
}

VacuumStats reports the consequences of the clean up.

type Watcher

type Watcher struct {
	// contains filtered or unexported fields
}

Watcher holds the pointer necessary to listen for PostgreSQL events that indicate a new message has arrived in the pipe.

func (*Watcher) Err

func (w *Watcher) Err() error

Err holds the last known error that might have happened in Watcher lifespan.

func (*Watcher) Message

func (w *Watcher) Message() *Message

Message returns the current message stored in the Watcher.

func (*Watcher) Next

func (w *Watcher) Next() bool

Next waits for the next message to arrive and stores it in the Watcher.

Package Files

Documentation was rendered with GOOS=linux and GOARCH=amd64.

Jump to identifier

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to identifier