workers

v0.10.3

This package is not in the latest version of its module.

Published: Mar 10, 2021 License: MIT Imports: 24 Imported by: 5

README


Sidekiq-compatible background workers in Go.

  • reliable queueing for all queues using BRPOPLPUSH
  • handles retries
  • supports custom middleware
  • customizable concurrency per queue
  • responds to Unix signals to safely wait for jobs to finish before exiting
  • provides stats on what jobs are currently running
  • Redis Sentinel support
  • well tested

Example usage:

package main

import (
  "fmt"

  workers "github.com/digitalocean/go-workers2"
)

func myJob(message *workers.Msg) error {
  // do something with your message
  // message.Jid()
  // message.Args() is a wrapper around go-simplejson (http://godoc.org/github.com/bitly/go-simplejson)
  return nil
}

func myMiddleware(queue string, mgr *workers.Manager, next workers.JobFunc) workers.JobFunc {
  return func(message *workers.Msg) (err error) {
    // do something before each message is processed
    err = next(message)
    // do something after each message is processed
    return
  }
}

func main() {
  // Create a manager, which manages workers
  manager, err := workers.NewManager(workers.Options{
    // location of redis instance
    ServerAddr: "localhost:6379",
    // instance of the database
    Database:   0,
    // number of connections to keep open with redis
    PoolSize:   30,
    // unique process id for this instance of workers (for proper recovery of inprogress jobs on crash)
    ProcessID:  "1",
  })

  if err != nil {
    // without a manager nothing below can run, so stop here
    fmt.Println(err)
    return
  }

  // create a middleware chain with the default middlewares, and append myMiddleware
  mids := workers.DefaultMiddlewares().Append(myMiddleware)

  // pull messages from "myqueue" with concurrency of 10
  // this worker will not run myMiddleware, but will run the default middlewares
  manager.AddWorker("myqueue", 10, myJob)

  // pull messages from "myqueue2" with concurrency of 20
  // this worker will run the default middlewares and myMiddleware
  manager.AddWorker("myqueue2", 20, myJob, mids...)

  // pull messages from "myqueue3" with concurrency of 20
  // this worker will only run myMiddleware
  manager.AddWorker("myqueue3", 20, myJob, myMiddleware)

  // If you already have a manager and want to enqueue
  // to the same place:
  producer := manager.Producer()

  // Alternatively, if you want to create a producer to enqueue messages
  // producer, err := workers.NewProducer(workers.Options{
  //   // location of redis instance
  //   ServerAddr: "localhost:6379",
  //   // instance of the database
  //   Database:   0,
  //   // number of connections to keep open with redis
  //   PoolSize:   30,
  //   // unique process id for this instance of workers (for proper recovery of inprogress jobs on crash)
  //   ProcessID:  "1",
  // })

  // Add a job to a queue
  producer.Enqueue("myqueue3", "Add", []int{1, 2})

  // Add a job to a queue with retry
  producer.EnqueueWithOptions("myqueue3", "Add", []int{1, 2}, workers.EnqueueOptions{Retry: true})

  // stats will be available at http://localhost:8080/stats
  go workers.StartAPIServer(8080)

  // Blocks until process is told to exit via unix signal
  manager.Run()
}

Running the example above produces output like the following at http://localhost:8080/stats:

[
  {
    "manager_name": "",
    "processed": 5,
    "failed": 57,
    "jobs": {
      "myqueue": null,
      "myqueue2": null,
      "myqueue3": null
    },
    "enqueued": {
      "myqueue": 0,
      "myqueue2": 0,
      "myqueue3": 0
    },
    "retry_count": 4
  }
]
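A client polling the stats endpoint can decode this payload with encoding/json. The following is a minimal standalone sketch, not library code: the stats and jobStatus structs are local mirrors of the JSON tags documented for Stats and JobStatus (with the message kept as raw JSON), and decodeStats and samplePayload are hypothetical names introduced for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jobStatus mirrors the JSON tags documented for workers.JobStatus.
// The message is kept as raw JSON because Msg has unexported fields.
type jobStatus struct {
	Message   json.RawMessage `json:"message"`
	StartedAt int64           `json:"started_at"`
}

// stats mirrors the JSON tags documented for workers.Stats.
type stats struct {
	Name       string                 `json:"manager_name"`
	Processed  int64                  `json:"processed"`
	Failed     int64                  `json:"failed"`
	Jobs       map[string][]jobStatus `json:"jobs"`
	Enqueued   map[string]int64       `json:"enqueued"`
	RetryCount int64                  `json:"retry_count"`
}

// decodeStats (hypothetical helper) parses the JSON array served at /stats.
func decodeStats(payload []byte) ([]stats, error) {
	var out []stats
	if err := json.Unmarshal(payload, &out); err != nil {
		return nil, err
	}
	return out, nil
}

// samplePayload is a trimmed copy of the sample output shown above.
const samplePayload = `[{"manager_name":"","processed":5,"failed":57,
  "jobs":{"myqueue":null},"enqueued":{"myqueue":0},"retry_count":4}]`

func main() {
	all, err := decodeStats([]byte(samplePayload))
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	for _, s := range all {
		fmt.Printf("processed=%d failed=%d retries=%d\n", s.Processed, s.Failed, s.RetryCount)
	}
}
```

The endpoint returns an array, so the sketch decodes into a slice; a queue with no running jobs arrives as null and decodes to a nil slice.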

Development sponsored by DigitalOcean. Code forked from github.com/jrallison/go-workers. Initial development sponsored by Customer.io.

Documentation

Constants

const (
	// DefaultRetryMax is default for max number of retries for a job
	DefaultRetryMax = 25

	// RetryTimeFormat is default for retry time format
	RetryTimeFormat = "2006-01-02 15:04:05 MST"
)
const (
	// NanoSecondPrecision is a constant for the number of nanoseconds in a second
	NanoSecondPrecision = 1000000000.0
)
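As a rough illustration of how these constants can be used, the sketch below formats a time with the retry layout and converts a time to fractional Unix seconds. It uses local copies of the constant values so it runs standalone. The helpers formatRetryTime, toSeconds and sampleTime are hypothetical, and the conversion is an assumed use of the precision constant, since the library's internals are not shown here.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of the documented constant values.
const (
	retryTimeFormat     = "2006-01-02 15:04:05 MST"
	nanoSecondPrecision = 1000000000.0
)

// formatRetryTime renders t using the documented retry layout.
func formatRetryTime(t time.Time) string {
	return t.Format(retryTimeFormat)
}

// toSeconds converts t to fractional Unix seconds.
// Assumed usage: this is one plausible role for the precision constant.
func toSeconds(t time.Time) float64 {
	return float64(t.UnixNano()) / nanoSecondPrecision
}

// sampleTime returns a fixed UTC timestamp for the demonstration.
func sampleTime() time.Time {
	return time.Date(2021, time.March, 10, 12, 0, 0, 0, time.UTC)
}

func main() {
	fmt.Println(formatRetryTime(sampleTime()))      // prints 2021-03-10 12:00:00 UTC
	fmt.Println(toSeconds(time.Unix(1, 500000000))) // prints 1.5
}
```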

Variables

var Logger = log.New(os.Stdout, "go-workers2: ", log.Ldate|log.Lmicroseconds)

Logger is the default go-workers2 logger, only used here in this file. TODO: remove this

Functions

func ConfigureAPIServer added in v0.10.0

func ConfigureAPIServer(options APIOptions)

ConfigureAPIServer allows global API server configuration with the given options

func RegisterAPIEndpoints added in v0.10.0

func RegisterAPIEndpoints(mux *http.ServeMux)

RegisterAPIEndpoints sets up API server endpoints

func StartAPIServer added in v0.9.0

func StartAPIServer(port int)

StartAPIServer starts the API server

func StopAPIServer added in v0.9.2

func StopAPIServer()

StopAPIServer stops the API server

Types

type APIOptions added in v0.10.0

type APIOptions struct {
	Logger *log.Logger
	Mux    *http.ServeMux
}

APIOptions contains the set of configuration options for the global api

type Args

type Args struct {
	// contains filtered or unexported fields
}

Args is the set of parameters for a message

func (Args) Equals

func (d Args) Equals(other interface{}) bool

func (Args) ToJson

func (d Args) ToJson() string

ToJson returns the data in JSON format

type EnqueueData

type EnqueueData struct {
	Queue      string      `json:"queue,omitempty"`
	Class      string      `json:"class"`
	Args       interface{} `json:"args"`
	Jid        string      `json:"jid"`
	EnqueuedAt float64     `json:"enqueued_at"`
	EnqueueOptions
}

EnqueueData stores data and configuration for new work

type EnqueueOptions

type EnqueueOptions struct {
	RetryCount int     `json:"retry_count,omitempty"`
	RetryMax   int     `json:"retry_max,omitempty"`
	Retry      bool    `json:"retry,omitempty"`
	At         float64 `json:"at,omitempty"`
}

EnqueueOptions stores configuration for new work
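To see what the struct tags above imply for the serialized job, the sketch below marshals local copies of EnqueueData and EnqueueOptions with encoding/json. The types are redeclared here so the example runs without the library; encode and sampleJob are hypothetical helpers, and the Jid value is made up.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// EnqueueOptions is a local copy of the documented struct.
type EnqueueOptions struct {
	RetryCount int     `json:"retry_count,omitempty"`
	RetryMax   int     `json:"retry_max,omitempty"`
	Retry      bool    `json:"retry,omitempty"`
	At         float64 `json:"at,omitempty"`
}

// EnqueueData is a local copy of the documented struct.
// The embedded options are flattened into the same JSON object.
type EnqueueData struct {
	Queue      string      `json:"queue,omitempty"`
	Class      string      `json:"class"`
	Args       interface{} `json:"args"`
	Jid        string      `json:"jid"`
	EnqueuedAt float64     `json:"enqueued_at"`
	EnqueueOptions
}

// encode (hypothetical helper) returns the JSON form of d.
func encode(d EnqueueData) (string, error) {
	b, err := json.Marshal(d)
	return string(b), err
}

// sampleJob builds a job resembling the README's "Add" example.
func sampleJob() EnqueueData {
	return EnqueueData{
		Queue:          "myqueue3",
		Class:          "Add",
		Args:           []int{1, 2},
		Jid:            "example-jid", // made-up id for illustration
		EnqueuedAt:     1.5,
		EnqueueOptions: EnqueueOptions{Retry: true},
	}
}

func main() {
	out, err := encode(sampleJob())
	if err != nil {
		fmt.Println("encode error:", err)
		return
	}
	fmt.Println(out)
	// prints {"queue":"myqueue3","class":"Add","args":[1,2],"jid":"example-jid","enqueued_at":1.5,"retry":true}
}
```

Because every option field carries omitempty, zero-valued options such as RetryCount and At do not appear in the payload.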

type Fetcher

type Fetcher interface {
	Queue() string
	Fetch()
	Acknowledge(*Msg)
	Ready() chan bool
	Messages() chan *Msg
	Close()
	Closed() bool
}

Fetcher is an interface for managing work messages

type JobFunc

type JobFunc func(message *Msg) error

JobFunc is a message processor

func LogMiddleware

func LogMiddleware(queue string, mgr *Manager, next JobFunc) JobFunc

LogMiddleware is the default logging middleware

func NopMiddleware

func NopMiddleware(queue string, mgr *Manager, final JobFunc) JobFunc

NopMiddleware does nothing

func RetryMiddleware

func RetryMiddleware(queue string, mgr *Manager, next JobFunc) JobFunc

RetryMiddleware is middleware that allows retries for job failures

func StatsMiddleware

func StatsMiddleware(queue string, mgr *Manager, next JobFunc) JobFunc

StatsMiddleware is middleware that collects stats on processed messages

type JobStatus added in v0.8.1

type JobStatus struct {
	Message   *Msg  `json:"message"`
	StartedAt int64 `json:"started_at"`
}

JobStatus contains the status and data for active jobs of a manager

type Manager added in v0.8.0

type Manager struct {
	// contains filtered or unexported fields
}

Manager coordinates work, workers, and signaling needed for job processing

func NewManager added in v0.8.0

func NewManager(options Options) (*Manager, error)

NewManager creates a new manager with the provided options

func NewManagerWithRedisClient added in v0.8.3

func NewManagerWithRedisClient(options Options, client *redis.Client) (*Manager, error)

NewManagerWithRedisClient creates a new manager with the provided options and a pre-configured Redis client

func (*Manager) AddBeforeStartHooks added in v0.8.0

func (m *Manager) AddBeforeStartHooks(hooks ...func())

AddBeforeStartHooks adds functions to be executed before the manager starts

func (*Manager) AddDuringDrainHooks added in v0.8.0

func (m *Manager) AddDuringDrainHooks(hooks ...func())

AddDuringDrainHooks adds functions to be executed during a drain operation

func (*Manager) AddRetriesExhaustedHandlers added in v0.9.3

func (m *Manager) AddRetriesExhaustedHandlers(handlers ...RetriesExhaustedFunc)

AddRetriesExhaustedHandlers adds function(s) to be executed when retries are exhausted for a job.

func (*Manager) AddWorker added in v0.8.0

func (m *Manager) AddWorker(queue string, concurrency int, job JobFunc, mids ...MiddlewareFunc)

AddWorker adds a new job processing worker

func (*Manager) GetRedisClient added in v0.8.1

func (m *Manager) GetRedisClient() *redis.Client

GetRedisClient returns the Redis client used by the manager

func (*Manager) GetRetries added in v0.9.0

func (m *Manager) GetRetries(page uint64, pageSize int64, match string) (Retries, error)

GetRetries returns the set of retry jobs for the manager

func (*Manager) GetStats added in v0.8.1

func (m *Manager) GetStats() (Stats, error)

GetStats returns the set of stats for the manager

func (*Manager) Producer added in v0.8.0

func (m *Manager) Producer() *Producer

Producer creates a new work producer with configuration identical to the manager

func (*Manager) Run added in v0.8.0

func (m *Manager) Run()

Run starts all workers under this Manager and blocks until they exit.

func (*Manager) SetRetriesExhaustedHandlers added in v0.9.3

func (m *Manager) SetRetriesExhaustedHandlers(handlers ...RetriesExhaustedFunc)

SetRetriesExhaustedHandlers sets function(s) that will be sequentially executed when retries are exhausted for a job.

func (*Manager) Stop added in v0.8.0

func (m *Manager) Stop()

Stop stops all workers under this Manager and returns immediately.

type MiddlewareFunc

type MiddlewareFunc func(queue string, m *Manager, next JobFunc) JobFunc

MiddlewareFunc is an extra function on the processing pipeline

type Middlewares

type Middlewares []MiddlewareFunc

Middlewares contains the list of all configured middleware functions

func DefaultMiddlewares

func DefaultMiddlewares() Middlewares

DefaultMiddlewares creates the default middleware pipeline

func NewMiddlewares

func NewMiddlewares(mids ...MiddlewareFunc) Middlewares

NewMiddlewares creates the processing pipeline given the list of middleware funcs

func (Middlewares) Append

func (m Middlewares) Append(mid MiddlewareFunc) Middlewares

Append adds middleware to the end of the processing pipeline

func (Middlewares) Prepend

func (m Middlewares) Prepend(mid MiddlewareFunc) Middlewares

Prepend adds middleware to the front of the processing pipeline
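The effect of Append and Prepend on execution order can be illustrated with a standalone sketch. Everything below is a local stand-in with the same shapes as the documented types, not the library's code. The build method and the tracer and runTrace helpers are hypothetical, and the rule that the first middleware in the chain runs outermost is an assumption of this sketch.

```go
package main

import "fmt"

// Local stand-ins for the documented types, so the sketch runs without Redis.
type Msg struct{ body string }
type Manager struct{}
type JobFunc func(message *Msg) error
type MiddlewareFunc func(queue string, m *Manager, next JobFunc) JobFunc
type Middlewares []MiddlewareFunc

// Append returns a new chain with mid at the end.
func (m Middlewares) Append(mid MiddlewareFunc) Middlewares {
	return append(append(Middlewares{}, m...), mid)
}

// Prepend returns a new chain with mid at the front.
func (m Middlewares) Prepend(mid MiddlewareFunc) Middlewares {
	return append(Middlewares{mid}, m...)
}

// build wraps final so that the first middleware in the chain runs outermost.
// This ordering is an assumption of the sketch.
func (m Middlewares) build(queue string, mgr *Manager, final JobFunc) JobFunc {
	for i := len(m) - 1; i >= 0; i-- {
		final = m[i](queue, mgr, final)
	}
	return final
}

// tracer returns a middleware that records when it is entered and left.
func tracer(name string, trace *[]string) MiddlewareFunc {
	return func(queue string, mgr *Manager, next JobFunc) JobFunc {
		return func(message *Msg) error {
			*trace = append(*trace, name+":before")
			err := next(message)
			*trace = append(*trace, name+":after")
			return err
		}
	}
}

// runTrace builds the chain [first, second] and returns the recorded order.
func runTrace() []string {
	var trace []string
	chain := Middlewares{}.Append(tracer("second", &trace)).Prepend(tracer("first", &trace))
	job := func(message *Msg) error {
		trace = append(trace, "job")
		return nil
	}
	_ = chain.build("myqueue", &Manager{}, job)(&Msg{body: "{}"})
	return trace
}

func main() {
	fmt.Println(runTrace())
	// prints [first:before second:before job second:after first:after]
}
```

Under that assumption, a prepended middleware sees the message before everything else in the chain and sees the result last.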

type Msg

type Msg struct {
	// contains filtered or unexported fields
}

Msg is the struct for job data (parameters and metadata)

func NewMsg

func NewMsg(content string) (*Msg, error)

NewMsg returns a new message

func (*Msg) Args

func (m *Msg) Args() *Args

Args returns arguments attribute of a message

func (*Msg) Class added in v0.9.3

func (m *Msg) Class() string

Class returns class attribute of a message

func (Msg) Equals

func (d Msg) Equals(other interface{}) bool

func (*Msg) Jid

func (m *Msg) Jid() string

Jid returns job id attribute of a message

func (*Msg) OriginalJson

func (m *Msg) OriginalJson() string

OriginalJson returns the original JSON message

func (Msg) ToJson

func (d Msg) ToJson() string

ToJson returns the data in JSON format

type Options

type Options struct {
	ProcessID    string
	Namespace    string
	PollInterval time.Duration
	Database     int
	Password     string
	PoolSize     int

	// Provide one of ServerAddr or (SentinelAddrs + RedisMasterName)
	ServerAddr      string
	SentinelAddrs   string
	RedisMasterName string
	RedisTLSConfig  *tls.Config

	// Optional display name used when displaying manager stats
	ManagerDisplayName string

	// Log
	Logger *log.Logger
	// contains filtered or unexported fields
}

Options contains the set of configuration options for a manager and/or producer
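The comment on the address fields ("Provide one of ServerAddr or (SentinelAddrs + RedisMasterName)") can be checked up front before building a manager. The sketch below is one possible pre-flight check, not something the library is documented to do: redisTarget is a local stand-in holding only the three address fields, validate is a hypothetical helper, and rejecting configurations that set both modes is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// redisTarget is a local stand-in holding only the three address fields
// documented on Options.
type redisTarget struct {
	ServerAddr      string
	SentinelAddrs   string
	RedisMasterName string
}

// validate (hypothetical) enforces "one of ServerAddr or
// (SentinelAddrs + RedisMasterName)". Rejecting both at once is an
// assumption of this sketch.
func validate(t redisTarget) error {
	direct := t.ServerAddr != ""
	sentinel := t.SentinelAddrs != "" && t.RedisMasterName != ""
	switch {
	case direct && sentinel:
		return errors.New("set either ServerAddr or the sentinel fields, not both")
	case direct || sentinel:
		return nil
	case t.SentinelAddrs != "" || t.RedisMasterName != "":
		return errors.New("SentinelAddrs and RedisMasterName must be set together")
	default:
		return errors.New("no Redis address configured")
	}
}

func main() {
	fmt.Println(validate(redisTarget{ServerAddr: "localhost:6379"}))
	fmt.Println(validate(redisTarget{SentinelAddrs: "localhost:26379"}))
}
```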

type Producer added in v0.8.0

type Producer struct {
	// contains filtered or unexported fields
}

Producer is used to enqueue new work

func NewProducer added in v0.8.0

func NewProducer(options Options) (*Producer, error)

NewProducer creates a new producer with the given options

func NewProducerWithRedisClient added in v0.8.3

func NewProducerWithRedisClient(options Options, client *redis.Client) (*Producer, error)

NewProducerWithRedisClient creates a new producer with the given options and Redis client

func (*Producer) Enqueue added in v0.8.0

func (p *Producer) Enqueue(queue, class string, args interface{}) (string, error)

Enqueue enqueues new work for immediate processing

func (*Producer) EnqueueAt added in v0.8.0

func (p *Producer) EnqueueAt(queue, class string, at time.Time, args interface{}) (string, error)

EnqueueAt enqueues new work for processing at a specific time

func (*Producer) EnqueueIn added in v0.8.0

func (p *Producer) EnqueueIn(queue, class string, in float64, args interface{}) (string, error)

EnqueueIn enqueues new work for delayed processing

func (*Producer) EnqueueWithOptions added in v0.8.0

func (p *Producer) EnqueueWithOptions(queue, class string, args interface{}, opts EnqueueOptions) (string, error)

EnqueueWithOptions enqueues new work for processing with the given options

func (*Producer) GetRedisClient added in v0.8.1

func (p *Producer) GetRedisClient() *redis.Client

GetRedisClient returns the Redis client used by the producer.

Deprecated: the Redis client is an internal implementation and access will be removed.

type Retries added in v0.9.0

type Retries struct {
	TotalRetryCount int64  `json:"total_retry_count"`
	RetryJobs       []*Msg `json:"retry_jobs"`
}

Retries stores retry information

type RetriesExhaustedFunc added in v0.9.3

type RetriesExhaustedFunc func(queue string, message *Msg, err error)

RetriesExhaustedFunc gets executed when retry attempts have been exhausted.
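A handler with this signature receives the queue name, the failed message, and the final error. The sketch below shows two handlers being invoked one after another, which is how SetRetriesExhaustedHandlers is described. Msg here is a local stand-in, and notifyExhausted and collect are hypothetical helpers written for illustration, not the library's dispatch code.

```go
package main

import (
	"errors"
	"fmt"
)

// Msg is a local stand-in for workers.Msg.
type Msg struct{ jid string }

// RetriesExhaustedFunc mirrors the documented signature.
type RetriesExhaustedFunc func(queue string, message *Msg, err error)

// notifyExhausted (hypothetical) calls each handler in registration order.
func notifyExhausted(handlers []RetriesExhaustedFunc, queue string, message *Msg, err error) {
	for _, h := range handlers {
		h(queue, message, err)
	}
}

// collect runs two handlers against one failed job and returns what they recorded.
func collect() []string {
	var lines []string
	handlers := []RetriesExhaustedFunc{
		func(queue string, message *Msg, err error) {
			lines = append(lines, "alert: "+queue+" "+message.jid)
		},
		func(queue string, message *Msg, err error) {
			lines = append(lines, "reason: "+err.Error())
		},
	}
	notifyExhausted(handlers, "myqueue", &Msg{jid: "abc123"}, errors.New("boom"))
	return lines
}

func main() {
	for _, line := range collect() {
		fmt.Println(line)
	}
}
```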

type Stats

type Stats struct {
	Name       string                 `json:"manager_name"`
	Processed  int64                  `json:"processed"`
	Failed     int64                  `json:"failed"`
	Jobs       map[string][]JobStatus `json:"jobs"`
	Enqueued   map[string]int64       `json:"enqueued"`
	RetryCount int64                  `json:"retry_count"`
}

Stats contains current stats for a manager

Directories

Path Synopsis
cmd
