gue

package module
v2.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 4, 2021 License: MIT Imports: 14 Imported by: 0

README

gue

GoDev Coverage Status ReportCard License

Gue is Golang queue on top of PostgreSQL that uses transaction-level locks.

Originally this project used to be a fork of bgentry/que-go but because of some backward-compatibility breaking changes and original library author not being very responsive for PRs I turned fork into standalone project. Version 2 breaks internal backward-compatibility with the original project - DB table and all the internal logic (queries, algorithms) is completely rewritten.

The name Gue is yet another silly word transformation: Queue -> Que, Go + Que -> Gue.

Install

go get -u github.com/vgarvardt/gue/v2

Additionally, you need to apply DB migration.

Usage Example

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "os"
    "time"

    "github.com/jackc/pgx/v4/pgxpool"
    "golang.org/x/sync/errgroup"

    "github.com/vgarvardt/gue/v2"
    "github.com/vgarvardt/gue/v2/adapter/pgxv4"
)

type printNameArgs struct {
    Name string
}

func main() {
    printName := func(j *gue.Job) error {
        var args printNameArgs
        if err := json.Unmarshal(j.Args, &args); err != nil {
            return err
        }
        fmt.Printf("Hello %s!\n", args.Name)
        return nil
    }

    pgxCfg, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL"))
    if err != nil {
        log.Fatal(err)
    }

    pgxPool, err := pgxpool.ConnectConfig(context.Background(), pgxCfg)
    if err != nil {
        log.Fatal(err)
    }
    defer pgxPool.Close()

    poolAdapter := pgxv4.NewConnPool(pgxPool)

    gc := gue.NewClient(poolAdapter)
    wm := gue.WorkMap{
        "PrintName": printName,
    }
    // create a pool w/ 2 workers
    workers := gue.NewWorkerPool(gc, wm, 2, gue.WithPoolQueue("name_printer"))

    ctx, shutdown := context.WithCancel(context.Background())

    // work jobs in goroutine
    g, gctx := errgroup.WithContext(ctx)
    g.Go(func() error {
        err := workers.Run(gctx)
        if err != nil {
            // In a real-world applications, use a better way to shut down
            // application on unrecoverable error. E.g. fx.Shutdowner from
            // go.uber.org/fx module.
            log.Fatal(err)
        }
        return err
    })

    args, err := json.Marshal(printNameArgs{Name: "vgarvardt"})
    if err != nil {
        log.Fatal(err)
    }

    j := &gue.Job{
        Type:  "PrintName",
        Args:  args,
    }
    if err := gc.Enqueue(context.Background(), j); err != nil {
        log.Fatal(err)
    }

    j := &gue.Job{
        Type:  "PrintName",
        RunAt: time.Now().UTC().Add(30 * time.Second), // delay 30 seconds
        Args:  args,
    }
    if err := gc.Enqueue(context.Background(), j); err != nil {
        log.Fatal(err)
    }

    time.Sleep(30 * time.Second) // wait for while

    // send shutdown signal to worker
    shutdown()
    if err := g.Wait(); err != nil {
        log.Fatal(err)
    }
}

PostgreSQL drivers

Package supports several PostgreSQL drivers using adapter interface internally. Currently, adapters for the following drivers have been implemented:

pgx/v4
package main

import (
    "context"
    "log"
    "os"

    "github.com/jackc/pgx/v4/pgxpool"

    "github.com/vgarvardt/gue/v2"
    "github.com/vgarvardt/gue/v2/adapter/pgxv4"
)

func main() {
    pgxCfg, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL"))
    if err != nil {
        log.Fatal(err)
    }

    pgxPool, err := pgxpool.ConnectConfig(context.Background(), pgxCfg)
    if err != nil {
        log.Fatal(err)
    }
    defer pgxPool.Close()

    poolAdapter := pgxv4.NewConnPool(pgxPool)

    gc := gue.NewClient(poolAdapter)
    ...
}
pgx/v3
package main

import (
    "log"
    "os"

    "github.com/jackc/pgx"

    "github.com/vgarvardt/gue/v2"
    "github.com/vgarvardt/gue/v2/adapter/pgxv3"
)

func main() {
    pgxCfg, err := pgx.ParseURI(os.Getenv("DATABASE_URL"))
    if err != nil {
        log.Fatal(err)
    }
    pgxPool, err := pgx.NewConnPool(pgx.ConnPoolConfig{
        ConnConfig:   pgxCfg,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer pgxPool.Close()

    poolAdapter := pgxv3.NewConnPool(pgxPool)

    gc := gue.NewClient(poolAdapter)
    ...
}
lib/pq
package main

import (
    "database/sql"
    "log"
    "os"

    _ "github.com/lib/pq" // register postgres driver

    "github.com/vgarvardt/gue/v2"
    "github.com/vgarvardt/gue/v2/adapter/libpq"
)

func main() {
    db, err := sql.Open("postgres", os.Getenv("DATABASE_URL"))
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    poolAdapter := libpq.NewConnPool(db)

    gc := gue.NewClient(poolAdapter)
    ...
}
pg/v10
package main

import (
    "log"
    "os"

    "github.com/go-pg/pg/v10"

    "github.com/vgarvardt/gue/v2"
    "github.com/vgarvardt/gue/v2/adapter/gopgv10"
)

func main() {
    opts, err := pg.ParseURL(os.Getenv("DATABASE_URL"))
    if err != nil {
        log.Fatal(err)
    }

    db := pg.Connect(opts)
    defer db.Close()

    poolAdapter := gopgv10.NewConnPool(db)

    gc := gue.NewClient(poolAdapter)
    ...
}

Logging

Package supports several logging libraries using adapter interface internally. Currently, adapters for the following drivers have been implemented:

  • NoOp (adapter.NoOpLogger) - default adapter that does nothing, so it is basically /dev/null logger
  • Stdlib log - adapter that uses log logger for logs output. Instantiate it with adapter.NewStdLogger(...).
  • Uber zap - adapter that uses go.uber.org/zap logger for logs output. Instantiate it with adapter.zap.New(...).

Testing

Linter and tests are running for every Pull Request, but it is possible to run linter and tests locally using docker and make.

Run linter: make link. This command runs liner in docker container with the project source code mounted.

Run tests: make test. This command runs project dependencies in docker containers if they are not started yet and runs go tests with coverage.

Documentation

Overview

Package gue implements Golang queue on top of PostgreSQL. It uses transaction-level locks for concurrent work.

PostgreSQL drivers

Package supports several PostgreSQL drivers using adapter interface internally. Currently, adapters for the following drivers have been implemented:

  • github.com/jackc/pgx v4
  • github.com/jackc/pgx v3
  • github.com/lib/pq

Usage

Here is a complete example showing worker setup for pgx/v4 and two jobs enqueued, one with a delay:

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/jackc/pgx/v4/pgxpool"
	"golang.org/x/sync/errgroup"

	"github.com/vgarvardt/gue/v2"
	"github.com/vgarvardt/gue/v2/adapter/pgxv4"
)

type printNameArgs struct {
	Name string
}

func main() {
	printName := func(j *gue.Job) error {
		var args printNameArgs
		if err := json.Unmarshal(j.Args, &args); err != nil {
			return err
		}
		fmt.Printf("Hello %s!\n", args.Name)
		return nil
	}

	pgxCfg, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL"))
	if err != nil {
		log.Fatal(err)
	}

	pgxPool, err := pgxpool.ConnectConfig(context.Background(), pgxCfg)
	if err != nil {
		log.Fatal(err)
	}
	defer pgxPool.Close()

	poolAdapter := pgxv4.NewConnPool(pgxPool)

	gc := gue.NewClient(poolAdapter)
	wm := gue.WorkMap{
		"PrintName": printName,
	}
	// create a pool w/ 2 workers
	workers := gue.NewWorkerPool(gc, wm, 2, gue.WithPoolQueue("name_printer"))

	ctx, shutdown := context.WithCancel(context.Background())

	// work jobs in goroutine
	g, gctx := errgroup.WithContext(ctx)
	g.Go(func() error {
		err := workers.Run(gctx)
		if err != nil {
			// In a real-world applications, use a better way to shut down
			// application on unrecoverable error. E.g. fx.Shutdowner from
			// go.uber.org/fx module.
			log.Fatal(err)
		}
		return err
	})

	args, err := json.Marshal(printNameArgs{Name: "vgarvardt"})
	if err != nil {
		log.Fatal(err)
	}

	j := &gue.Job{
		Type:  "PrintName",
		Args:  args,
	}
	if err := gc.Enqueue(context.Background(), j); err != nil {
		log.Fatal(err)
	}

	j := &gue.Job{
		Type:  "PrintName",
		RunAt: time.Now().UTC().Add(30 * time.Second), // delay 30 seconds
		Args:  args,
	}
	if err := gc.Enqueue(context.Background(), j); err != nil {
		log.Fatal(err)
	}

	time.Sleep(30 * time.Second) // wait for while

	// send shutdown signal to worker
	shutdown()
	if err := g.Wait(); err != nil {
		log.Fatal(err)
	}
}

Index

Constants

This section is empty.

Variables

View Source
var ErrMissingType = errors.New("job type must be specified")

ErrMissingType is returned when you attempt to enqueue a job with no Type specified.

Functions

This section is empty.

Types

type Backoff

type Backoff func(retries int) time.Duration

Backoff is the interface for backoff implementation that will be used to reschedule errored jobs.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a Gue client that can add jobs to the queue and remove jobs from the queue.

func NewClient

func NewClient(pool adapter.ConnPool, options ...ClientOption) *Client

NewClient creates a new Client that uses the pgx pool.

func (*Client) Enqueue

func (c *Client) Enqueue(ctx context.Context, j *Job) error

Enqueue adds a job to the queue.

func (*Client) EnqueueTx added in v2.0.1

func (c *Client) EnqueueTx(ctx context.Context, j *Job, tx adapter.Tx) error

EnqueueTx adds a job to the queue within the scope of the transaction. This allows you to guarantee that an enqueued job will either be committed or rolled back atomically with other changes in the course of this transaction.

It is the caller's responsibility to Commit or Rollback the transaction after this function is called.

func (*Client) LockJob

func (c *Client) LockJob(ctx context.Context, queue string) (*Job, error)

LockJob attempts to retrieve a Job from the database in the specified queue. If a job is found, it will be locked on the transactional level, so other workers will be skipping it. If no job is found, nil will be returned instead of an error.

Because Gue uses transaction-level locks, we have to hold the same transaction throughout the process of getting a job, working it, deleting it, and releasing the lock.

After the Job has been worked, you must call either Done() or Error() on it in order to commit transaction to persist Job changes (remove or update it).

type ClientOption

type ClientOption func(*Client)

ClientOption defines a type that allows to set client properties during the build-time.

func WithClientBackoff

func WithClientBackoff(backoff Backoff) ClientOption

WithClientBackoff sets backoff implementation that will be applied to errored jobs within current client session.

func WithClientID

func WithClientID(id string) ClientOption

WithClientID sets client ID for easier identification in logs

func WithClientLogger

func WithClientLogger(logger adapter.Logger) ClientOption

WithClientLogger sets Logger implementation to client

type Job

type Job struct {
	// ID is the unique database ID of the Job. It is ignored on job creation.
	ID int64

	// Queue is the name of the queue. It defaults to the empty queue "".
	Queue string

	// Priority is the priority of the Job. The default priority is 0, and a
	// lower number means a higher priority.
	//
	// The highest priority is -32768, the lowest one is +32767
	Priority int16

	// RunAt is the time that this job should be executed. It defaults to now(),
	// meaning the job will execute immediately. Set it to a value in the future
	// to delay a job's execution.
	RunAt time.Time

	// Type maps job to a worker func.
	Type string

	// Args must be the bytes of a valid JSON string
	Args []byte

	// ErrorCount is the number of times this job has attempted to run, but
	// failed with an error. It is ignored on job creation.
	ErrorCount int32

	// LastError is the error message or stack trace from the last time the job
	// failed. It is ignored on job creation.
	LastError pgtype.Text
	// contains filtered or unexported fields
}

Job is a single unit of work for Gue to perform.

func (*Job) Delete

func (j *Job) Delete(ctx context.Context) error

Delete marks this job as complete by deleting it form the database.

You must also later call Done() to return this job's database connection to the pool.

func (*Job) Done

func (j *Job) Done(ctx context.Context) error

Done commits transaction that marks job as done.

func (*Job) Error

func (j *Job) Error(ctx context.Context, msg string) (err error)

Error marks the job as failed and schedules it to be reworked. An error message or backtrace can be provided as msg, which will be saved on the job. It will also increase the error count.

This call marks job as done and releases (commits) transaction, so calling Done() is not required, although calling it will not cause any issues.

func (*Job) Tx

func (j *Job) Tx() adapter.Tx

Tx returns DB transaction that this job is locked to. You may use it as you please until you call Done(). At that point, this transaction will be committed. This function will return nil if the Job's transaction was closed with Done().

type WorkFunc

type WorkFunc func(j *Job) error

WorkFunc is a function that performs a Job. If an error is returned, the job is re-enqueued with exponential backoff.

type WorkMap

type WorkMap map[string]WorkFunc

WorkMap is a map of Job names to WorkFuncs that are used to perform Jobs of a given type.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker is a single worker that pulls jobs off the specified queue. If no Job is found, the Worker will sleep for interval seconds.

func NewWorker

func NewWorker(c *Client, wm WorkMap, options ...WorkerOption) *Worker

NewWorker returns a Worker that fetches Jobs from the Client and executes them using WorkMap. If the type of Job is not registered in the WorkMap, it's considered an error and the job is re-enqueued with a backoff.

Worker defaults to a poll interval of 5 seconds, which can be overridden by WithWorkerPollInterval option. The default queue is the nameless queue "", which can be overridden by WithWorkerQueue option.

func (*Worker) Run added in v2.1.0

func (w *Worker) Run(ctx context.Context) error

Run pulls jobs off the Worker's queue at its interval. This function does not run in its own goroutine so it’s possible to wait for completion. Use context cancellation to shut it down.

func (*Worker) Start deprecated

func (w *Worker) Start(ctx context.Context) error

Start pulls jobs off the Worker's queue at its interval. This function runs in its own goroutine, use cancel context to shut it down.

Deprecated: use Run instead of Start. Start leaks resources and does not wait for shutdown to complete.

func (*Worker) WorkOne

func (w *Worker) WorkOne(ctx context.Context) (didWork bool)

WorkOne tries to consume single message from the queue.

type WorkerOption

type WorkerOption func(*Worker)

WorkerOption defines a type that allows to set worker properties during the build-time.

func WithWorkerID

func WithWorkerID(id string) WorkerOption

WithWorkerID sets worker ID for easier identification in logs

func WithWorkerLogger

func WithWorkerLogger(logger adapter.Logger) WorkerOption

WithWorkerLogger sets Logger implementation to worker

func WithWorkerPollInterval

func WithWorkerPollInterval(d time.Duration) WorkerOption

WithWorkerPollInterval overrides default poll interval with the given value. Poll interval is the "sleep" duration if there were no jobs found in the DB.

func WithWorkerQueue

func WithWorkerQueue(queue string) WorkerOption

WithWorkerQueue overrides default worker queue name with the given value.

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool is a pool of Workers, each working jobs from the queue queue at the specified interval using the WorkMap.

func NewWorkerPool

func NewWorkerPool(c *Client, wm WorkMap, poolSize int, options ...WorkerPoolOption) *WorkerPool

NewWorkerPool creates a new WorkerPool with count workers using the Client c.

Each Worker in the pool default to a poll interval of 5 seconds, which can be overridden by WithPoolPollInterval option. The default queue is the nameless queue "", which can be overridden by WithPoolQueue option.

func (*WorkerPool) Run added in v2.1.0

func (w *WorkerPool) Run(ctx context.Context) error

Run runs all of the Workers in the WorkerPool in own goroutines. Run blocks until all workers exit. Use context cancellation for shutdown.

func (*WorkerPool) Start deprecated

func (w *WorkerPool) Start(ctx context.Context) error

Start starts all of the Workers in the WorkerPool in own goroutines. Use cancel context to shut them down.

Deprecated: use Run instead of Start. Start leaks resources and does not wait for shutdown to complete.

type WorkerPoolOption

type WorkerPoolOption func(pool *WorkerPool)

WorkerPoolOption defines a type that allows to set worker pool properties during the build-time.

func WithPoolID

func WithPoolID(id string) WorkerPoolOption

WithPoolID sets worker pool ID for easier identification in logs

func WithPoolLogger

func WithPoolLogger(logger adapter.Logger) WorkerPoolOption

WithPoolLogger sets Logger implementation to worker pool

func WithPoolPollInterval

func WithPoolPollInterval(d time.Duration) WorkerPoolOption

WithPoolPollInterval overrides default poll interval with the given value. Poll interval is the "sleep" duration if there were no jobs found in the DB.

func WithPoolQueue

func WithPoolQueue(queue string) WorkerPoolOption

WithPoolQueue overrides default worker queue name with the given value.

Directories

Path Synopsis
gopgv10
Package gopgv10 implements github.com/go-pg/pg/v10 adapter.
Package gopgv10 implements github.com/go-pg/pg/v10 adapter.
zap

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL