thermocline


README

thermocline

[DEPRECATED] - Using channels in the core Broker interface was a poor decision in hindsight. ¯\_(ツ)_/¯ Check out github.com/fortytw2/hoplite instead.

A library for implementing background-job-processing systems. Think of it as the business logic of sidekiq, without any convenience methods or helpful scheduling logic: just raw workers, pools, and queues.

Basic Usage

Processor functions

A Processor is the function handed to a worker, of the type

type Processor func(*Task) ([]*Task, error)

Returning more tasks from a Processor is optional, but useful in many cases. I would recommend implementing your Processor as a small wrapper that type-asserts Task.Info, an interface{}, to the concrete type you actually want to work with, as sketched below.
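For example, a minimal sketch of such a wrapper (EmailJob is a hypothetical application type, not part of thermocline; fmt and log imports are omitted as in the other snippets):

type EmailJob struct {
    To      string
    Subject string
}

// processEmail type-asserts Task.Info back to the concrete EmailJob
// before doing any work; it returns no follow-up tasks.
func processEmail(task *thermocline.Task) ([]*thermocline.Task, error) {
    job, ok := task.Info.(EmailJob)
    if !ok {
        return nil, fmt.Errorf("unexpected payload type %T", task.Info)
    }
    log.Printf("sending %q to %s", job.Subject, job.To)
    return nil, nil
}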

Pool Usage

You should use thermocline through the Pool API; driving individual workers directly is more error-prone and harder to work with. Basic Pool usage is shown below.

// A Broker is a simple message-queue implementation - an interface
// with only three methods to implement if you want to bring your own.
var b thermocline.Broker
b = mem.NewBroker()

// enqueue a new task every 10ms
ticker := time.NewTicker(10 * time.Millisecond)
go func() {
    w, err := b.Write("test", thermocline.NoVersion)
    if err != nil {
        log.Fatalf("cannot get write chan: %s", err)
    }
    for t := range ticker.C {
        tk, _ := thermocline.NewTask(t)
        w <- tk
    }
}()

var worked int64
// create a pool of 30 workers on the unversioned queue "test"
p, err := thermocline.NewPool("test", thermocline.NoVersion, b, func(task *thermocline.Task) ([]*thermocline.Task, error) {
    atomic.AddInt64(&worked, 1)
    return nil, nil
}, 30)
if err != nil {
    log.Fatalf("cannot create pool: %s", err)
}

time.Sleep(500 * time.Millisecond)
ticker.Stop()
if err := p.Stop(); err != nil {
    log.Fatalf("cannot stop pool: %s", err)
}

fmt.Println(atomic.LoadInt64(&worked))

Runtime tune-able pools

The number of workers in a given pool can be tuned at runtime via Pool.Add(n int), which can both add and remove workers from a pool.

To stop a pool, simply call Pool.Stop(), which shuts down all workers and waits for them to exit. A short sketch of both operations follows.
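A rough sketch of runtime tuning, continuing from the pool p in the example above (that a negative count to Add removes workers is an assumption based on "can both add and remove"):

// grow the pool by 10 workers
if err := p.Add(10); err != nil {
    log.Fatalf("cannot add workers: %s", err)
}
// assumed: a negative count shrinks the pool
if err := p.Add(-5); err != nil {
    log.Fatalf("cannot remove workers: %s", err)
}
fmt.Println(p.Len()) // current number of workers

// shut every worker down and wait for all of them to exit
if err := p.Stop(); err != nil {
    log.Fatalf("cannot stop pool: %s", err)
}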

LICENSE

MIT

Documentation

Index

Constants

This section is empty.

Variables

var DefaultPriority = 1

DefaultPriority is the default priority of tasks

var MaxRetries = 3

MaxRetries is the number of times a task will be attempted before it's marked "failed"

var NoVersion = "n/a"

NoVersion is used to denote non-versioned queues

Functions

This section is empty.

Types

type Broker

type Broker interface {
	// Read returns a channel of incoming tasks
	Read(queue string, version string) (<-chan *Task, error)
	// Write returns a channel of outgoing tasks
	Write(queue string, version string) (chan<- *Task, error)
	// Stats returns current statistics of the broker (most likely just a bunch
	// of counters.)
	Stats() *Stats
}

A Broker is used as the communication interface between the workers and the queuer (whatever enqueues tasks)
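A minimal sketch of a custom channel-backed Broker (the mem subpackage already ships a real in-memory implementation; the buffer size and key scheme here are arbitrary assumptions, and the Stats are left empty):

type chanBroker struct {
	mu     sync.Mutex
	queues map[string]chan *thermocline.Task
	stats  *thermocline.Stats
}

func newChanBroker() *chanBroker {
	return &chanBroker{
		queues: make(map[string]chan *thermocline.Task),
		stats:  &thermocline.Stats{},
	}
}

// queue lazily creates one buffered channel per queue/version pair.
func (b *chanBroker) queue(queue, version string) chan *thermocline.Task {
	b.mu.Lock()
	defer b.mu.Unlock()
	key := queue + "/" + version
	if _, ok := b.queues[key]; !ok {
		b.queues[key] = make(chan *thermocline.Task, 1024)
	}
	return b.queues[key]
}

func (b *chanBroker) Read(queue, version string) (<-chan *thermocline.Task, error) {
	return b.queue(queue, version), nil
}

func (b *chanBroker) Write(queue, version string) (chan<- *thermocline.Task, error) {
	return b.queue(queue, version), nil
}

func (b *chanBroker) Stats() *thermocline.Stats {
	return b.stats
}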

type Pool

type Pool struct {
	Queue   string
	Version string

	*sync.RWMutex
	// contains filtered or unexported fields
}

A Pool is a set of workers that all function on the same queue

func NewPool

func NewPool(queue, version string, b Broker, fn Processor, workers int) (*Pool, error)

NewPool returns a running worker pool on the given queue/version

func (*Pool) Add

func (p *Pool) Add(n int) error

Add changes the number of workers

func (*Pool) Len

func (p *Pool) Len() int

Len returns the total number of workers in this group

func (*Pool) Stop

func (p *Pool) Stop() error

Stop turns off all workers in the pool

type Processor

type Processor func(*Task) ([]*Task, error)

A Processor is the function given to any worker

type Stats

type Stats struct {
	Failures map[string]int `json:"failures"`
	Retries  map[string]int `json:"retries"`
	Finished map[string]int `json:"finished"`
	Total    map[string]int `json:"total"`
}

Stats is the set of statistics that a Broker should return
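For instance, a small sketch of reading stats from a Broker b (that the maps are keyed by queue name is an assumption; it depends on the Broker implementation):

s := b.Stats()
fmt.Printf("total: %d finished: %d failed: %d retried: %d\n",
	s.Total["test"], s.Finished["test"], s.Failures["test"], s.Retries["test"])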

type Task

type Task struct {
	ID      string `json:"id"`
	Retries int    `json:"retries"`

	// CreatedAt is when the task was created
	CreatedAt time.Time `json:"created_at"`
	// WorkAt is the earliest this task can be worked
	WorkAt   time.Time `json:"work_at"`
	Priority int       `json:"priority"`

	Info interface{} `json:"info"`
}

A Task contains information sent from the queuer to the worker

func NewDelayedTask

func NewDelayedTask(info interface{}, delay time.Duration) (*Task, error)

NewDelayedTask returns a task that will not be worked until the given delay has elapsed
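A quick sketch of delaying work (the write channel w comes from Broker.Write as in the README example; the string payload is arbitrary):

// will not be worked for at least five minutes
tk, err := thermocline.NewDelayedTask("resend-invoice", 5*time.Minute)
if err != nil {
	log.Fatalf("cannot create task: %s", err)
}
w <- tk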

func NewTask

func NewTask(info interface{}) (*Task, error)

NewTask returns a normal old task

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

A Worker is the basic unit of task execution

func NewWorker

func NewWorker(reader <-chan *Task, writer chan<- *Task, fn Processor, stopper chan struct{}) *Worker

NewWorker creates a new worker

func (*Worker) Work

func (w *Worker) Work(wg *sync.WaitGroup)

Work turns on the worker
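Direct Worker use is discouraged in favor of Pool, but a rough sketch looks like this, reusing the Broker b and the processEmail Processor from the README sketches above (that Work calls wg.Done on exit and that closing stopper shuts the worker down are assumptions inferred from the signature):

r, _ := b.Read("test", thermocline.NoVersion)
w, _ := b.Write("test", thermocline.NoVersion)
stopper := make(chan struct{})

var wg sync.WaitGroup
wg.Add(1)
worker := thermocline.NewWorker(r, w, processEmail, stopper)
go worker.Work(&wg)

// later: signal the worker to stop and wait for it to exit
close(stopper)
wg.Wait()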

Directories

Path Synopsis
mem
