Documentation ¶
Overview ¶
Package pgqueue is a library allows to use a single PostgreSQL instance as a low-throughput queue server.
client, err := pgqueue.Open(dsn) if err != nil { log.Fatalln("cannot open database connection:", err) } defer client.Close() if err := client.CreateTable(); err != nil { log.Fatalln("cannot create queue table:", err) } queue := client.Queue("example-queue-reservation") defer queue.Close() content := []byte("content") if err := queue.Push(content); err != nil { log.Fatalln("cannot push message to queue:", err) } r, err := queue.Reserve(1 * time.Minute) if err != nil { log.Fatalln("cannot reserve message from the queue:", err) } fmt.Printf("content: %s\n", r.Content) if err := r.Done(); err != nil { log.Fatalln("cannot mark message as done:", err) }
Example (Basic) ¶
package main import ( "fmt" "log" "os" "cirello.io/pgqueue" _ "github.com/lib/pq" ) var dsn = os.Getenv("PGQUEUE_TEST_DSN") func main() { client, err := pgqueue.Open(dsn) if err != nil { log.Fatalln("cannot open database connection:", err) } defer client.Close() if err := client.CreateTable(); err != nil { log.Fatalln("cannot create queue table:", err) } queue := client.Queue("example-queue-name") defer queue.Close() content := []byte("content") if err := queue.Push(content); err != nil { log.Fatalln("cannot push message to queue:", err) } poppedContent, err := queue.Pop() if err != nil { log.Fatalln("cannot pop message from the queue:", err) } fmt.Printf("content: %s\n", poppedContent) }
Output: content: content
Example (EmptyQueue) ¶
package main import ( "fmt" "log" "os" "cirello.io/pgqueue" _ "github.com/lib/pq" ) var dsn = os.Getenv("PGQUEUE_TEST_DSN") func main() { client, err := pgqueue.Open(dsn) if err != nil { log.Fatalln("cannot open database connection:", err) } defer client.Close() if err := client.CreateTable(); err != nil { log.Fatalln("cannot create queue table:", err) } queue := client.Queue("empty-name") defer queue.Close() _, err = queue.Pop() fmt.Println("err:", err) }
Output: err: empty queue
Example (LargeMessage) ¶
package main import ( "bytes" "fmt" "log" "os" "cirello.io/pgqueue" _ "github.com/lib/pq" ) var dsn = os.Getenv("PGQUEUE_TEST_DSN") func main() { client, err := pgqueue.Open(dsn) if err != nil { log.Fatalln("cannot open database connection:", err) } defer client.Close() if err := client.CreateTable(); err != nil { log.Fatalln("cannot create queue table:", err) } queue := client.Queue("example-queue-large-message") defer queue.Close() content := bytes.Repeat([]byte{0}, pgqueue.DefaultMaxMessageLength+1) err = queue.Push(content) fmt.Println("err:", err) }
Output: err: message is too large
Example (Listen) ¶
package main import ( "fmt" "log" "os" "time" "cirello.io/pgqueue" _ "github.com/lib/pq" ) var dsn = os.Getenv("PGQUEUE_TEST_DSN") func main() { client, err := pgqueue.Open(dsn) if err != nil { log.Fatalln("cannot open database connection:", err) } defer client.Close() queue := client.Queue("example-queue-listen") defer queue.Close() go queue.Push([]byte("content")) watch := queue.Watch(time.Minute) for watch.Next() { msg := watch.Message() fmt.Printf("msg: %s\n", msg.Content) msg.Done() queue.Close() } if err := watch.Err(); err != nil && err != pgqueue.ErrAlreadyClosed { log.Fatalln("cannot observe queue:", err) } }
Output: msg: content
Example (Reservation) ¶
package main import ( "fmt" "log" "os" "time" "cirello.io/pgqueue" _ "github.com/lib/pq" ) var dsn = os.Getenv("PGQUEUE_TEST_DSN") func main() { client, err := pgqueue.Open(dsn) if err != nil { log.Fatalln("cannot open database connection:", err) } defer client.Close() if err := client.CreateTable(); err != nil { log.Fatalln("cannot create queue table:", err) } queue := client.Queue("example-queue-reservation") defer queue.Close() content := []byte("content") if err := queue.Push(content); err != nil { log.Fatalln("cannot push message to queue:", err) } r, err := queue.Reserve(1 * time.Minute) if err != nil { log.Fatalln("cannot reserve message from the queue:", err) } fmt.Printf("content: %s\n", r.Content) if err := r.Done(); err != nil { log.Fatalln("cannot mark message as done:", err) } }
Output: content: content
Example (ReservedReleased) ¶
package main import ( "fmt" "log" "os" "time" "cirello.io/pgqueue" _ "github.com/lib/pq" ) var dsn = os.Getenv("PGQUEUE_TEST_DSN") func main() { client, err := pgqueue.Open(dsn) if err != nil { log.Fatalln("cannot open database connection:", err) } defer client.Close() if err := client.CreateTable(); err != nil { log.Fatalln("cannot create queue table:", err) } queue := client.Queue("example-queue-release") defer queue.Close() content := []byte("content") if err := queue.Push(content); err != nil { log.Fatalln("cannot push message to queue:", err) } r, err := queue.Reserve(1 * time.Minute) if err != nil { log.Fatalln("cannot pop message from the queue:", err) } fmt.Printf("content: %s\n", r.Content) if err := r.Release(); err != nil { log.Fatalln("cannot release the message back to the queue:", err) } }
Output: content: content
Example (ReservedTouch) ¶
package main import ( "fmt" "log" "os" "time" "cirello.io/pgqueue" _ "github.com/lib/pq" ) var dsn = os.Getenv("PGQUEUE_TEST_DSN") func main() { client, err := pgqueue.Open(dsn) if err != nil { log.Fatalln("cannot open database connection:", err) } defer client.Close() if err := client.CreateTable(); err != nil { log.Fatalln("cannot create queue table:", err) } queue := client.Queue("example-queue-touch") defer queue.Close() content := []byte("content") if err := queue.Push(content); err != nil { log.Fatalln("cannot push message to queue:", err) } r, err := queue.Reserve(10 * time.Second) if err != nil { log.Fatalln("cannot pop message from the queue:", err) } fmt.Printf("content: %s\n", r.Content) time.Sleep(5 * time.Second) if err := r.Touch(1 * time.Minute); err != nil { log.Fatalln("cannot extend message lease:", err) } }
Output: content: content
Example (Vacuum) ¶
package main import ( "log" "os" "time" "cirello.io/pgqueue" _ "github.com/lib/pq" ) var dsn = os.Getenv("PGQUEUE_TEST_DSN") func main() { const reservationTime = 500 * time.Millisecond client, err := pgqueue.Open(dsn, pgqueue.WithMaxDeliveries(1), pgqueue.DisableAutoVacuum()) if err != nil { log.Fatalln("cannot open database connection:", err) } defer client.Close() if err := client.CreateTable(); err != nil { log.Fatalln("cannot create queue table:", err) } queue := client.Queue("example-queue-vacuum") defer queue.Close() for i := 0; i < 10; i++ { content := []byte("content") if err := queue.Push(content); err != nil { log.Fatalln("cannot push message to queue:", err) } if _, err := queue.Pop(); err != nil { log.Fatalln("cannot pop message from the queue:", err) } } client.Vacuum() stats := queue.VacuumStats() if err := stats.Err; err != nil { log.Fatalln("cannot clean up queue:", err) } }
Output:
Index ¶
Examples ¶
Constants ¶
const DefaultDeadLetterQueueNamePrefix = "deadletter"
DefaultDeadLetterQueueNamePrefix indicates the name of the dead letter queue.
const DefaultMaxDeliveriesCount = 5
DefaultMaxDeliveriesCount is how many delivery attempt each message gets before getting skipped on Pop and Reserve calls.
const DefaultMaxMessageLength = 65536
DefaultMaxMessageLength indicates the maximum content length acceptable for new messages. Although it is theoretically possible to use large messages, the idea here is to be conservative until the properties of PostgreSQL are fully mapped.
Variables ¶
var ErrAlreadyClosed = errors.New("queue is already closed")
ErrAlreadyClosed indicates the queue is closed and all its watchers are going to report the queue is no longer available.
var ErrDeadletterQueueDisabled = errors.New("deadletter queue disabled")
ErrDeadletterQueueDisabled indicates that is not possible to dump messages from the target deadletter queue because its support has been disabled.
var ErrEmptyQueue = fmt.Errorf("empty queue")
ErrEmptyQueue indicates there isn't any message available at the head of the queue.
var ErrInvalidDuration = errors.New("invalid duration")
ErrInvalidDuration indicates the duration used is too small. It must larger than a millisecond and be multiple of a millisecond.
var ErrMessageExpired = errors.New("message expired")
ErrMessageExpired indicates the message deadline has been reached and the current message pointer can no longer be used to update it.
var ErrMessageTooLarge = fmt.Errorf("message is too large")
ErrMessageTooLarge indicates the content to be pushed is too large.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client uses a postgreSQL database to run a queue system.
func Open ¶
func Open(dsn string, opts ...ClientOption) (*Client, error)
Open uses the given database connection and start operating the queue system.
func (*Client) CreateTable ¶
CreateTable prepares the underlying table for the queue system.
func (*Client) DumpDeadLetterQueue ¶
DumpDeadLetterQueue writes the messages into the writer and remove them from the database.
type ClientCloseError ¶
ClientCloseError reports all the errors that happened during client close.
func (*ClientCloseError) Error ¶
func (e *ClientCloseError) Error() string
func (*ClientCloseError) Is ¶
func (e *ClientCloseError) Is(target error) bool
Is detects if the given target matches either the listener or the driver error.
type ClientOption ¶
type ClientOption func(*Client)
ClientOption reconfigures the behavior of the pgqueue Client.
func DisableAutoVacuum ¶
func DisableAutoVacuum() ClientOption
DisableAutoVacuum forces the use of manual queue clean up.
func DisableDeadletterQueue ¶
func DisableDeadletterQueue() ClientOption
DisableDeadletterQueue forces the errored messages to be deleted from the queue.
func WithCustomTable ¶
func WithCustomTable(tableName string) ClientOption
WithCustomTable changes the name of the postgresql table used for the queue.
func WithMaxDeliveries ¶
func WithMaxDeliveries(maxDeliveries int) ClientOption
WithMaxDeliveries indicates how many delivery attempts each message gets. If zero, the client retries the message forever.
func WithMaxMessageLength ¶
func WithMaxMessageLength(maxMessageLength int) ClientOption
WithMaxMessageLength indicates how long each message can be before it is considered an error. If zero, it imposes no limit
type Message ¶
type Message struct { Content []byte LeasedUntil time.Time // contains filtered or unexported fields }
Message represents on message from the queue
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue holds the configuration definition for one queue.
func (*Queue) Pop ¶
Pop retrieves the pending message from the queue, if any available. If the queue is empty, it returns ErrEmptyQueue.
func (*Queue) Reserve ¶
Reserve retrieves the pending message from the queue, if any available. It marks as it as InProgress until the defined lease duration. If the message is not marked as Done by the lease time, it is returned to the queue. Lease duration must be multiple of milliseconds.
func (*Queue) VacuumStats ¶
func (q *Queue) VacuumStats() VacuumStats
VacuumStats reports the result of the last vacuum cycle.
type VacuumStats ¶
type VacuumStats struct { // LastRun indicates the time of the lastest vacuum cycle. LastRun time.Time // PageSize indicates how large the vacuum operation was in order to // keep it short and non-disruptive. PageSize int64 // Err indicates why the vacuum cycle failed. If nil, it succeeded. Err error }
VacuumStats reports the consequences of the clean up.
type Watcher ¶
type Watcher struct {
// contains filtered or unexported fields
}
Watcher holds the pointer necessary to listen for postgreSQL events that indicates a new message has arrive in the pipe.