faktory_worker

Version: v1.6.1
Published: Sep 29, 2022 License: MPL-2.0 Imports: 16 Imported by: 21

README

faktory_worker_go

This repository provides a Faktory worker process for Go apps. This worker process fetches background jobs from the Faktory server and processes them.

How is this different from all the other Go background worker libraries? They all use Redis or another "dumb" datastore. This library is far simpler because the Faktory server implements most of the data storage, retry logic, Web UI, etc.

Installation

You must install Faktory first. Then:

go get -u github.com/contribsys/faktory_worker_go

Usage

To process background jobs, follow these steps:

  1. Register your job types and their associated funcs
  2. Set a few optional parameters
  3. Start the processing

There are a couple of ways to stop the process. The first example stops when the process receives a TERM or INT signal.

package main

import (
  "context"
  "log"

  worker "github.com/contribsys/faktory_worker_go"
)

func someFunc(ctx context.Context, args ...interface{}) error {
  help := worker.HelperFor(ctx)
  log.Printf("Working on job %s\n", help.Jid())
  return nil
}

func main() {
  mgr := worker.NewManager()

  // register job types and the function to execute them
  mgr.Register("SomeJob", someFunc)
  //mgr.Register("AnotherJob", anotherFunc)

  // use up to N goroutines to execute jobs
  mgr.Concurrency = 20

  // pull jobs from these queues, in this order of precedence
  mgr.ProcessStrictPriorityQueues("critical", "default", "bulk")

  // alternatively you can use weights to avoid starvation
  //mgr.ProcessWeightedPriorityQueues(map[string]int{"critical":3, "default":2, "bulk":1})

  // Start processing jobs, this method does not return.
  mgr.Run()
}

Alternatively you can control the stopping of the Manager using RunWithContext. You must process any signals yourself.

package main

import (
  "context"
  "log"
  "os"
  "os/signal"
  "syscall"

  worker "github.com/contribsys/faktory_worker_go"
)

func someFunc(ctx context.Context, args ...interface{}) error {
  help := worker.HelperFor(ctx)
  log.Printf("Working on job %s\n", help.Jid())
  return nil
}

func main() {
  ctx, cancel := context.WithCancel(context.Background())
  mgr := worker.NewManager()

  // register job types and the function to execute them
  mgr.Register("SomeJob", someFunc)
  //mgr.Register("AnotherJob", anotherFunc)

  // use up to N goroutines to execute jobs
  mgr.Concurrency = 20

  // pull jobs from these queues, in this order of precedence
  mgr.ProcessStrictPriorityQueues("critical", "default", "bulk")

  // alternatively you can use weights to avoid starvation
  //mgr.ProcessWeightedPriorityQueues(map[string]int{"critical":3, "default":2, "bulk":1})

  go func(){
    // Start processing jobs in background routine, this method does not return 
    // unless an error is returned or cancel() is called
    mgr.RunWithContext(ctx)
  }()
  
  go func() {
    stopSignals := []os.Signal{
      syscall.SIGTERM, 
      syscall.SIGINT,
    }
    stop := make(chan os.Signal, len(stopSignals))
    for _, s := range stopSignals {
       signal.Notify(stop, s)
    }

    for {
      select {
      case <-ctx.Done():
        return
      case <-stop:
        cancel()
      }
    }
  }()
  
  <-ctx.Done()
}

See test/main.go for a working example.

FAQ

  • How do I specify the Faktory server location?

By default, the worker connects to localhost:7419, which is sufficient for local development. Use FAKTORY_URL to specify the URL, e.g. tcp://faktory.example.com:12345, or use FAKTORY_PROVIDER to name the environment variable that contains the URL: FAKTORY_PROVIDER=FAKTORYTOGO_URL. This level of indirection is useful for SaaSes, Heroku Addons, etc.
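
The indirection is easier to see in code. The following is a minimal sketch of the lookup, not the client's actual source. It assumes that FAKTORY_PROVIDER is consulted before FAKTORY_URL and that the default uses the tcp scheme. resolveURL and defaultURL are names invented for this example.

```go
package main

import (
	"fmt"
	"strings"
)

// defaultURL is the assumed default; the tcp scheme is an assumption.
const defaultURL = "tcp://localhost:7419"

// resolveURL sketches the lookup order. getenv has the same shape as
// os.LookupEnv so the logic can run without touching the real environment.
func resolveURL(getenv func(string) (string, bool)) (string, error) {
	if name, ok := getenv("FAKTORY_PROVIDER"); ok {
		// FAKTORY_PROVIDER holds the NAME of another variable, not a URL.
		if strings.Contains(name, ":") {
			return "", fmt.Errorf("FAKTORY_PROVIDER should name an environment variable, got %q", name)
		}
		url, ok := getenv(name)
		if !ok {
			return "", fmt.Errorf("FAKTORY_PROVIDER points at %s, which is not set", name)
		}
		return url, nil
	}
	if url, ok := getenv("FAKTORY_URL"); ok {
		return url, nil
	}
	return defaultURL, nil
}

func main() {
	env := map[string]string{
		"FAKTORY_PROVIDER": "FAKTORYTOGO_URL",
		"FAKTORYTOGO_URL":  "tcp://faktory.example.com:12345",
	}
	lookup := func(k string) (string, bool) { v, ok := env[k]; return v, ok }

	url, err := resolveURL(lookup)
	fmt.Println(url, err) // prints "tcp://faktory.example.com:12345 <nil>"
}
```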

  • How do I push new jobs to Faktory?
  1. Inside a job, you can check out a connection from the Pool of Faktory connections using the job helper's With method:
func someFunc(ctx context.Context, args ...interface{}) error {
  help := worker.HelperFor(ctx)
  return help.With(func(cl *faktory.Client) error {
    job := faktory.NewJob("SomeJob", 1, 2, 3)
    return cl.Push(job)
  })
}
  2. You can always open a client connection to Faktory directly, but this won't perform as well:
import (
  faktory "github.com/contribsys/faktory/client"
)

client, err := faktory.Open()
if err != nil {
  return err
}
job := faktory.NewJob("SomeJob", 1, 2, 3)
err = client.Push(job)

NB: Client instances are not safe to share between goroutines. Use a Pool of Clients instead, which is thread-safe.

See the Faktory Client API for Go or Ruby. You can implement a Faktory client in any programming language. See the wiki for details.

Author

Mike Perham, @getajobmike, @contribsys

License

This codebase is licensed via the Mozilla Public License, v2.0. https://choosealicense.com/licenses/mpl-2.0/

Documentation

Constants

const (
	Startup  lifecycleEventType = 1
	Quiet    lifecycleEventType = 2
	Shutdown lifecycleEventType = 3
)
const (
	Version = "1.6.0"
)

Variables

var (
	SIGTERM os.Signal = syscall.SIGTERM
	SIGTSTP os.Signal = syscall.SIGTSTP
	SIGTTIN os.Signal = syscall.SIGTTIN
	SIGINT  os.Signal = os.Interrupt
)
var (
	NoAssociatedBatchError = fmt.Errorf("No batch associated with this job")
)

Functions

This section is empty.

Types

type Handler

type Handler func(ctx context.Context, job *faktory.Job) error

type Helper added in v1.4.0

type Helper interface {
	Jid() string
	JobType() string

	// Custom provides access to the job custom hash.
	// Returns the value and `ok=true` if the key was found.
	// If not, returns `nil` and `ok=false`.
	//
	// No type checking is performed, please use with caution.
	Custom(key string) (value interface{}, ok bool)

	// Faktory Enterprise:
	// the BID of the Batch associated with this job
	Bid() string

	// Faktory Enterprise:
	// the BID of the Batch associated with this callback (complete or success) job
	CallbackBid() string

	// Faktory Enterprise:
	// open the batch associated with this job so we can add more jobs to it.
	//
	//   func myJob(ctx context.Context, args ...interface{}) error {
	//     helper := worker.HelperFor(ctx)
	//     helper.Batch(func(b *faktory.Batch) error {
	//       return b.Push(faktory.NewJob("sometype", 1, 2, 3))
	//     })
	Batch(func(*faktory.Batch) error) error

	// allows direct access to the Faktory server from the job
	With(func(*faktory.Client) error) error

	// Faktory Enterprise:
	// this method integrates with Faktory Enterprise's Job Tracking feature.
	// `reserveUntil` is optional, only needed for long jobs which have more dynamic
	// lifetimes.
	//
	//     helper.TrackProgress(10, "Updating code...", nil)
	//     helper.TrackProgress(20, "Cleaning caches...", &time.Now().Add(1 * time.Hour)))
	//
	TrackProgress(percent int, desc string, reserveUntil *time.Time) error
}

The Helper provides access to valuable data and APIs within an executing job.

We're pretty strict about what's exposed in the Helper because execution should be orthogonal to most of the Job payload contents.

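  func myJob(ctx context.Context, args ...interface{}) error {
    helper := worker.HelperFor(ctx)
    jid := helper.Jid()
    log.Printf("Working on job %s", jid)

    // Push takes a *faktory.Job, so build one with NewJob.
    return helper.With(func(cl *faktory.Client) error {
      return cl.Push(faktory.NewJob("anotherJob", 4, "arg"))
    })
  }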

func HelperFor added in v1.4.0

func HelperFor(ctx context.Context) Helper

Caution: this method must only be called within the context of an executing job. It will panic if it cannot create a Helper due to missing context values.

type LifecycleEventHandler added in v1.4.0

type LifecycleEventHandler func(*Manager) error

type Logger

type Logger interface {
	Debug(v ...interface{})
	Debugf(format string, args ...interface{})
	Info(v ...interface{})
	Infof(format string, args ...interface{})
	Warn(v ...interface{})
	Warnf(format string, args ...interface{})
	Error(v ...interface{})
	Errorf(format string, args ...interface{})
	Fatal(v ...interface{})
	Fatalf(format string, args ...interface{})
}

func NewStdLogger

func NewStdLogger() Logger

type Manager

type Manager struct {
	Concurrency int
	Logger      Logger
	ProcessWID  string
	Labels      []string
	Pool        *faktory.Pool
	// contains filtered or unexported fields
}

Manager coordinates the processes for the worker. It is responsible for starting and stopping goroutines to perform work at the desired concurrency level.

func NewManager

func NewManager() *Manager

NewManager returns a new manager with default values.

func (*Manager) On

func (mgr *Manager) On(event lifecycleEventType, fn LifecycleEventHandler)

Register a callback to be fired when a process lifecycle event occurs. These are useful for hooking into process startup or shutdown.

func (*Manager) ProcessStrictPriorityQueues

func (mgr *Manager) ProcessStrictPriorityQueues(queues ...string)

One of the Process*Queues methods should be called once before Run().

func (*Manager) ProcessWeightedPriorityQueues

func (mgr *Manager) ProcessWeightedPriorityQueues(queues map[string]int)

func (*Manager) Quiet

func (mgr *Manager) Quiet()

After calling Quiet(), no more jobs will be pulled from Faktory by this process.

func (*Manager) Register

func (mgr *Manager) Register(name string, fn Perform)

Register a handler for the given jobtype. It is expected that all jobtypes are registered upon process startup.

mgr.Register("ImportantJob", ImportantFunc)

func (*Manager) Run

func (mgr *Manager) Run() error

Run starts processing jobs. This method does not return unless an error is encountered while starting.

func (*Manager) RunWithContext added in v1.6.0

func (mgr *Manager) RunWithContext(ctx context.Context) error

RunWithContext starts processing jobs. The method returns if an error is encountered while starting. When a context is supplied, OS signals are ignored, and the context must be canceled for the method to return after it has started running.

func (*Manager) Terminate

func (mgr *Manager) Terminate(reallydie bool)

Terminate signals that the various components should shutdown. Blocks on the shutdownWaiter until all components have finished.

func (*Manager) Use

func (mgr *Manager) Use(middleware ...MiddlewareFunc)

Use(...) adds middleware to the chain.

type MiddlewareFunc

type MiddlewareFunc func(ctx context.Context, job *faktory.Job, next func(ctx context.Context) error) error

type NoHandlerError added in v1.4.0

type NoHandlerError struct {
	JobType string
}

func (*NoHandlerError) Error added in v1.4.0

func (s *NoHandlerError) Error() string

type Perform

type Perform func(ctx context.Context, args ...interface{}) error

Perform actually executes the job. It must be thread-safe.
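
Thread safety matters because the same function may run on many goroutines at once, up to the configured concurrency. As a minimal sketch, assuming a job that updates shared state, the mutex-guarded counter below has a method with the documented Perform shape. The counter type and runConcurrently helper are hypothetical and exist only to exercise the method from several goroutines.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// counter is shared state guarded by a mutex.
type counter struct {
	mu sync.Mutex
	n  int
}

// perform has the documented Perform shape and is safe to call from
// many goroutines because every update happens under the mutex.
func (c *counter) perform(ctx context.Context, args ...interface{}) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.n++
	return nil
}

// value reads the counter under the same mutex.
func (c *counter) value() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.n
}

// runConcurrently calls fn from `workers` goroutines, `calls` times each,
// and waits for all of them to finish.
func runConcurrently(workers, calls int, fn func(context.Context, ...interface{}) error) {
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < calls; i++ {
				_ = fn(context.Background(), i)
			}
		}()
	}
	wg.Wait()
}

func main() {
	c := &counter{}
	runConcurrently(20, 100, c.perform)
	fmt.Println(c.value()) // prints 2000
}
```

Without the mutex, the increments would race and the final count could be lower; running such code with the race detector (go run -race) reports the problem.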

type PerformExecutor added in v1.4.0

type PerformExecutor interface {
	Execute(*faktory.Job, Perform) error
}

func NewTestExecutor added in v1.4.0

func NewTestExecutor(p *faktory.Pool) PerformExecutor

type StdLogger

type StdLogger struct {
	*log.Logger
}

func (*StdLogger) Debug

func (l *StdLogger) Debug(v ...interface{})

func (*StdLogger) Debugf

func (l *StdLogger) Debugf(format string, v ...interface{})

func (*StdLogger) Error

func (l *StdLogger) Error(v ...interface{})

func (*StdLogger) Errorf

func (l *StdLogger) Errorf(format string, v ...interface{})

func (*StdLogger) Info

func (l *StdLogger) Info(v ...interface{})

func (*StdLogger) Infof

func (l *StdLogger) Infof(format string, v ...interface{})

func (*StdLogger) Warn

func (l *StdLogger) Warn(v ...interface{})

func (*StdLogger) Warnf

func (l *StdLogger) Warnf(format string, v ...interface{})
