workers

package module
v0.7.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 26, 2018 License: MIT Imports: 21 Imported by: 8

README

Build Status GoDoc

Sidekiq compatible background workers in golang.

  • reliable queueing for all queues using brpoplpush
  • handles retries
  • support custom middleware
  • customize concurrency per queue
  • responds to Unix signals to safely wait for jobs to finish before exiting.
  • provides stats on what jobs are currently running
  • redis sentinel support
  • well tested

Example usage:

package main

import (
	"github.com/digitalocean/go-workers2"
)

func myJob(message *workers.Msg) error {
  // do something with your message
  // message.Jid()
  // message.Args() is a wrapper around go-simplejson (http://godoc.org/github.com/bitly/go-simplejson)
  return nil
}

func myMiddleware(queue string, next JobFunc) JobFunc {
  return func(message *workers.Msg) (err error) {
    // do something before each message is processed
    err = next()
    // do something after each message is processed
    return
  }
}

func main() {
  workers.Configure(Options{
    // location of redis instance
    ServerAddr: "localhost:6379",
    // instance of the database
    Database:   0,
    // number of connections to keep open with redis
    PoolSize:   30,
    // unique process id for this instance of workers (for proper recovery of inprogress jobs on crash)
    ProcessID:  "1",
  })

  // create a middleware chain with the default middlewares, and append myMiddleware
  mids := workers.DefaultMiddlewares().Append(myMiddleware)

  // pull messages from "myqueue" with concurrency of 10
  // this processor will not run myMiddleware, but will run the default middlewares
  workers.Process("myqueue", myJob, 10)

  // pull messages from "myqueue2" with concurrency of 20
  // this processor will run the default middlewares and myMiddleware
  workers.Process("myqueue2", myJob, 20, mids...)

  // pull messages from "myqueue3" with concurrency of 20
  // this processor will only run myMiddleware
  workers.Process("myqueue3", myJob, 20, myMiddleware)

  // Add a job to a queue
  workers.Enqueue("myqueue3", "Add", []int{1, 2})

  // Add a job to a queue with retry
  workers.EnqueueWithOptions("myqueue3", "Add", []int{1, 2}, workers.EnqueueOptions{Retry: true})

  // stats will be available at http://localhost:8080/stats
  go workers.StatsServer(8080)

  // Blocks until process is told to exit via unix signal
  workers.Run()
}

Development sponsored by DigitalOcean. Code forked from github/jrallison/go-workers. Initial development sponsored by Customer.io.

Documentation

Index

Constants

View Source
const (
	DefaultRetryMax = 25
	RetryTimeFormat = "2006-01-02 15:04:05 MST"
)
View Source
const (
	RETRY_KEY          = "goretry"
	SCHEDULED_JOBS_KEY = "schedule"
)
View Source
const (
	NanoSecondPrecision = 1000000000.0
)

Variables

View Source
var Config *config

Functions

func BeforeStart

func BeforeStart(f func())

func Configure

func Configure(options Options) error

func DuringDrain

func DuringDrain(f func())

func Enqueue

func Enqueue(queue, class string, args interface{}) (string, error)

func EnqueueAt

func EnqueueAt(queue, class string, at time.Time, args interface{}) (string, error)

func EnqueueIn

func EnqueueIn(queue, class string, in float64, args interface{}) (string, error)

func EnqueueWithOptions

func EnqueueWithOptions(queue, class string, args interface{}, opts EnqueueOptions) (string, error)

func Process

func Process(queue string, job JobFunc, concurrency int, mids ...MiddlewareFunc)

func Quit

func Quit()

func ResetManagers

func ResetManagers() error

func Run

func Run()

func Start

func Start()

func Stats

func Stats(w http.ResponseWriter, req *http.Request)

func StatsServer

func StatsServer(port int)

Types

type Args

type Args struct {
	// contains filtered or unexported fields
}

func (Args) Equals

func (d Args) Equals(other interface{}) bool

func (Args) ToJson

func (d Args) ToJson() string

type EnqueueData

type EnqueueData struct {
	Queue      string      `json:"queue,omitempty"`
	Class      string      `json:"class"`
	Args       interface{} `json:"args"`
	Jid        string      `json:"jid"`
	EnqueuedAt float64     `json:"enqueued_at"`
	EnqueueOptions
}

type EnqueueOptions

type EnqueueOptions struct {
	RetryCount int     `json:"retry_count,omitempty"`
	Retry      bool    `json:"retry,omitempty"`
	At         float64 `json:"at,omitempty"`
}

type Fetcher

type Fetcher interface {
	Queue() string
	Fetch()
	Acknowledge(*Msg)
	Ready() chan bool
	FinishedWork() chan bool
	Messages() chan *Msg
	Close()
	Closed() bool
}

func NewFetch

func NewFetch(queue string, messages chan *Msg, ready chan bool) Fetcher

type JobFunc

type JobFunc func(message *Msg) error

func LogMiddleware

func LogMiddleware(queue string, next JobFunc) JobFunc

func NopMiddleware

func NopMiddleware(queue string, final JobFunc) JobFunc

NopMiddleware does nothing

func RetryMiddleware

func RetryMiddleware(queue string, next JobFunc) JobFunc

func StatsMiddleware

func StatsMiddleware(queue string, next JobFunc) JobFunc

type MiddlewareFunc

type MiddlewareFunc func(queue string, next JobFunc) JobFunc

type Middlewares

type Middlewares []MiddlewareFunc

func DefaultMiddlewares

func DefaultMiddlewares() Middlewares

func NewMiddlewares

func NewMiddlewares(mids ...MiddlewareFunc) Middlewares

func (Middlewares) Append

func (m Middlewares) Append(mid MiddlewareFunc) Middlewares

func (Middlewares) Prepend

func (m Middlewares) Prepend(mid MiddlewareFunc) Middlewares

type Msg

type Msg struct {
	// contains filtered or unexported fields
}

func NewMsg

func NewMsg(content string) (*Msg, error)

func (*Msg) Args

func (m *Msg) Args() *Args

func (Msg) Equals

func (d Msg) Equals(other interface{}) bool

func (*Msg) Jid

func (m *Msg) Jid() string

func (*Msg) OriginalJson

func (m *Msg) OriginalJson() string

func (Msg) ToJson

func (d Msg) ToJson() string

type Options

type Options struct {
	ProcessID    string
	Namespace    string
	PollInterval int
	Database     int
	Password     string
	PoolSize     int

	// Provide one of ServerAddr or (SentinelAddrs + RedisMasterName)
	ServerAddr      string
	SentinelAddrs   string
	RedisMasterName string
}

type WorkersLogger

type WorkersLogger interface {
	Println(...interface{})
	Printf(string, ...interface{})
}
var Logger WorkersLogger = log.New(os.Stdout, "workers: ", log.Ldate|log.Lmicroseconds)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL