koda

package module
v0.0.0-...-ed9c6d1 Latest
Warning

This package is not in the latest version of its module.

Published: Aug 6, 2016 License: MIT Imports: 11 Imported by: 0

README

koda-go

A managed, hybrid priority/delayed job queue. Built on redis.


Getting Started

// Add a job to the priority queue. Lowest priority is 0, highest priority is 100.
koda.Submit("send-newsletter", 100, map[string][]string{
    "users": []string{"bob@google.com"},
})

// Alternatively, a job can be put on the delayed queue.
koda.SubmitDelayed("send-newsletter", 5*time.Minute, map[string][]string{
    "users": []string{"mary@google.com"},
})

// Register a HandlerFunc and specify the number of workers that process the queue.
koda.Register("send-newsletter", 10, func(job *koda.Job) error {
    // Unmarshal the payload specified by Submit/SubmitDelayed
    var payload map[string][]string
    if err := job.UnmarshalPayload(&payload); err != nil {
        return err
    }

    for _, user := range payload["users"] {
        fmt.Println("Sending newsletter to", user)
        // TODO: actually send the email to the user
    }

    return nil
})

koda.WorkForever()

Documentation


Constants

const (
	// Initial jobs are created jobs, but not associated with a queue.
	Initial JobState = 0
	// Queued jobs are in a queue, waiting to be processed.
	Queued = 1
	// Working jobs are currently being processed.
	Working = 2
	// Finished jobs have completed successfully.
	Finished = 3
	// Dead jobs have failed more than Queue.MaxAttempts times.
	Dead = 4
)

Variables

var DefaultClient = NewClient(nil)

DefaultClient is the Client used by the package-level functions.

Functions

func Configure

func Configure(opts *Options)

Configure the DefaultClient with the given Options

func Register

func Register(queue string, numWorkers int, f HandlerFunc)

Register a given HandlerFunc with a queue

func WorkForever

func WorkForever()

WorkForever will begin processing registered queues. This routine will block until SIGINT is received.

Types

type Canceller

type Canceller interface {
	Cancel()
	CancelWithTimeout(d time.Duration)
}

Canceller allows the user to cancel all working jobs. If no timeout is set (Cancel), all currently working jobs are immediately marked as failed.
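The following is a minimal sketch of how the two cancellation modes might behave. It does not use koda itself: the pool type, its counters, and the demo helper are hypothetical, and the reading that CancelWithTimeout lets in-flight jobs finish within the timeout before failing the rest is an assumption, not something the documentation states.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Canceller is copied from the documented interface.
type Canceller interface {
	Cancel()
	CancelWithTimeout(d time.Duration)
}

// pool is a hypothetical stand-in for koda's workers; it only counts outcomes.
type pool struct {
	ctx    context.Context
	stop   context.CancelFunc
	wg     sync.WaitGroup
	mu     sync.Mutex
	done   int // jobs that finished on their own
	failed int // jobs interrupted by cancellation
}

func newPool() *pool {
	ctx, stop := context.WithCancel(context.Background())
	return &pool{ctx: ctx, stop: stop}
}

// run starts one simulated job that needs `work` to complete.
func (p *pool) run(work time.Duration) {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		select {
		case <-time.After(work):
			p.count(&p.done)
		case <-p.ctx.Done():
			p.count(&p.failed)
		}
	}()
}

// count increments one of the pool's counters under the mutex.
func (p *pool) count(n *int) {
	p.mu.Lock()
	defer p.mu.Unlock()
	*n++
}

// Cancel interrupts every in-flight job immediately.
func (p *pool) Cancel() { p.CancelWithTimeout(0) }

// CancelWithTimeout waits up to d for in-flight jobs, then interrupts the rest.
func (p *pool) CancelWithTimeout(d time.Duration) {
	idle := make(chan struct{})
	go func() {
		p.wg.Wait()
		close(idle)
	}()
	if d > 0 {
		select {
		case <-idle:
		case <-time.After(d):
		}
	}
	p.stop()
	<-idle
}

// demo runs one quick and one slow job, then cancels with the given grace period.
func demo(graceMillis int) (done, failed int) {
	p := newPool()
	var c Canceller = p
	p.run(10 * time.Millisecond)
	p.run(time.Hour)
	c.CancelWithTimeout(time.Duration(graceMillis) * time.Millisecond)
	return p.done, p.failed
}

func main() {
	fmt.Println(demo(0))   // no grace period: both jobs are interrupted
	fmt.Println(demo(500)) // the quick job finishes, the slow one is interrupted
}
```

With a grace period of zero the sketch treats every in-flight job as failed, which matches the documented behaviour of cancelling without a timeout.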

func Work

func Work() Canceller

Work will begin processing any registered queues in a separate goroutine. Use returned Canceller to stop any outstanding workers.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client represents a managed koda client. Users should not attempt to instantiate their own Client, but should use NewClient instead.

func NewClient

func NewClient(opts *Options) *Client

NewClient creates a new client with the given Options.

func (*Client) CreateJob

func (c *Client) CreateJob(payload interface{}) (Job, error)

CreateJob will create a job in the Initial state.

func (*Client) Job

func (c *Client) Job(id int) (Job, error)

Job fetches a job with the given job ID

func (*Client) Register

func (c *Client) Register(queue Queue, f HandlerFunc)

Register a HandlerFunc for a given Queue

func (*Client) Submit

func (c *Client) Submit(queue Queue, priority int, payload interface{}) (Job, error)

Submit creates a job and puts it on the priority queue.

func (*Client) SubmitDelayed

func (c *Client) SubmitDelayed(queue Queue, d time.Duration, payload interface{}) (Job, error)

SubmitDelayed creates a job and puts it on the delayed queue.

func (*Client) SubmitDelayedJob

func (c *Client) SubmitDelayedJob(queue Queue, d time.Duration, job Job) (Job, error)

SubmitDelayedJob puts an existing job on the delayed queue.

func (*Client) SubmitJob

func (c *Client) SubmitJob(queue Queue, priority int, job Job) (Job, error)

SubmitJob puts an existing job on the priority queue.

func (*Client) Work

func (c *Client) Work() Canceller

Work will begin processing any registered queues in a separate goroutine. Use returned Canceller to stop any outstanding workers.

func (*Client) WorkForever

func (c *Client) WorkForever()

WorkForever will begin processing registered queues. This routine will block until SIGINT is received.

type Conn

type Conn interface {
	Incr(key string) (int, error)
	// TODO: Update this to return a map[string]string
	HGetAll(key string) ([]string, error)
	HSetAll(key string, fields map[string]string) error
	RPush(key string, value ...string) (int, error)
	BLPop(timeout time.Duration, keys ...string) ([]string, error)
	ZAddNX(key string, score float64, member string) (int, error)
	// ZPopByScore has the same interface as ZRANGEBYSCORE, but also removes each member
	ZPopByScore(key string, min, max float64, minIncl, maxIncl bool, offset, count int) ([]string, error)
	Subscribe(channel string) (<-chan string, error)
	Close() error
}

Conn may be implemented by custom redis connections. See Options.ConnFactory. Note to implementers: each function must be atomic.
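To illustrate what "atomic" means for a method like ZPopByScore, here is a rough in-memory sketch of a single sorted set. It is not a Conn implementation: memZSet is a hypothetical type, the key argument and error returns are dropped for brevity, and treating a negative count as "no limit" and a negative offset as zero are choices made for this sketch only.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// memZSet is a hypothetical in-memory stand-in for one redis sorted set.
type memZSet struct {
	mu     sync.Mutex
	scores map[string]float64
}

func newMemZSet() *memZSet {
	return &memZSet{scores: map[string]float64{}}
}

// ZAddNX adds member only if it is absent and reports how many members were added.
func (z *memZSet) ZAddNX(score float64, member string) int {
	z.mu.Lock()
	defer z.mu.Unlock()
	if _, ok := z.scores[member]; ok {
		return 0
	}
	z.scores[member] = score
	return 1
}

// ZPopByScore selects members whose score lies in the range, orders them by
// score and then by member, applies offset and count, and removes the result.
// The mutex is held for the whole read-and-remove, so no other caller can
// observe or pop the same members in between.
func (z *memZSet) ZPopByScore(min, max float64, minIncl, maxIncl bool, offset, count int) []string {
	z.mu.Lock()
	defer z.mu.Unlock()

	var hits []string
	for m, s := range z.scores {
		aboveMin := s > min || (minIncl && s == min)
		belowMax := s < max || (maxIncl && s == max)
		if aboveMin && belowMax {
			hits = append(hits, m)
		}
	}
	sort.Slice(hits, func(i, j int) bool {
		si, sj := z.scores[hits[i]], z.scores[hits[j]]
		if si != sj {
			return si < sj
		}
		return hits[i] < hits[j]
	})

	if offset < 0 {
		offset = 0
	}
	if offset > len(hits) {
		offset = len(hits)
	}
	hits = hits[offset:]
	if count >= 0 && count < len(hits) {
		hits = hits[:count]
	}
	for _, m := range hits {
		delete(z.scores, m)
	}
	return hits
}

func main() {
	z := newMemZSet()
	z.ZAddNX(30, "c")
	z.ZAddNX(10, "a")
	z.ZAddNX(20, "b")
	fmt.Println(z.ZPopByScore(0, 20, true, true, 0, -1))
	fmt.Println(z.ZPopByScore(0, 20, true, true, 0, -1))
}
```

A redis-backed implementation would need the same guarantee from the server side, for example by running the range query and the removal as a single script or transaction.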

type HandlerFunc

type HandlerFunc func(j *Job) error

HandlerFunc is used when registering a worker for a queue. If an error is returned, the job will be marked as failed. If Job.NumAttempts exceeds Queue.MaxAttempts, the job is placed in the Dead state. Otherwise, it is placed on the delayed queue with a delay of Queue.RetryInterval.
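The retry rule above can be sketched as a small decision function. This is only an illustration under stated assumptions: nextStep, queue, and demoQueue are hypothetical names, "exceeds" is read literally as a strict greater-than, NumAttempts is assumed to already include the attempt that just failed, and a job waiting on the delayed queue is assumed to be in the Queued state.

```go
package main

import (
	"fmt"
	"time"
)

// JobState values match the documented constants.
type JobState int

const (
	Queued JobState = 1
	Dead   JobState = 4
)

// queue carries only the two fields the retry rule needs.
type queue struct {
	MaxAttempts   int
	RetryInterval time.Duration
}

// demoQueue is a hypothetical configuration used for illustration.
var demoQueue = queue{MaxAttempts: 3, RetryInterval: 30 * time.Second}

// nextStep decides what happens to a job whose handler just returned an error.
// It returns the job's new state and, for a retry, how long the job is delayed.
func nextStep(numAttempts int, q queue) (JobState, time.Duration) {
	if numAttempts > q.MaxAttempts {
		return Dead, 0
	}
	return Queued, q.RetryInterval
}

func main() {
	for attempts := 1; attempts <= 4; attempts++ {
		state, delay := nextStep(attempts, demoQueue)
		fmt.Println(attempts, state, delay)
	}
}
```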

type Job

type Job struct {
	ID             int
	State          JobState
	DelayedUntil   time.Time
	CreationTime   time.Time
	CompletionTime time.Time
	Priority       int
	NumAttempts    int
	// contains filtered or unexported fields
}

Job represents a koda job. Job should not be instantiated directly. Instead use Client.CreateJob, Client.Submit and Client.SubmitDelayed to create a Job.

func Submit

func Submit(queue string, priority int, payload interface{}) (Job, error)

Submit creates a job and puts it on the priority queue.

func SubmitDelayed

func SubmitDelayed(queue string, d time.Duration, payload interface{}) (Job, error)

SubmitDelayed creates a job and puts it on the delayed queue.

func (*Job) UnmarshalPayload

func (j *Job) UnmarshalPayload(v interface{}) error

UnmarshalPayload will unmarshal the associated payload into v.

type JobState

type JobState int

JobState represents the state of a job.

func (JobState) String

func (s JobState) String() string
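The page gives no description for String. As a hedged sketch of what a Stringer for these states could look like, the following maps each documented constant to its name; the stateNames table and the numeric fallback for unknown values are assumptions, not taken from the koda source.

```go
package main

import "fmt"

// JobState mirrors the documented type; the values match the listed constants.
type JobState int

const (
	Initial JobState = iota
	Queued
	Working
	Finished
	Dead
)

// stateNames is a hypothetical lookup table indexed by state value.
var stateNames = [...]string{"Initial", "Queued", "Working", "Finished", "Dead"}

// String returns the state's name, or a numeric fallback for unknown values.
func (s JobState) String() string {
	if s < 0 || int(s) >= len(stateNames) {
		return fmt.Sprintf("JobState(%d)", int(s))
	}
	return stateNames[s]
}

func main() {
	// fmt uses the String method automatically when printing a JobState.
	fmt.Println(Working, JobState(9))
}
```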

type Options

type Options struct {
	// Redis URL (format: redis://[auth@]host[:port][/database])
	// Default: redis://localhost:6379
	URL string

	// Prefix for redis keys
	// Default: koda
	Prefix string

	ConnFactory func() Conn
}

Options for a Client.
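As an illustration of the URL format documented on Options.URL, the sketch below parses a redis URL with the standard library and falls back to the documented default. The redisAddr type and parseRedisURL function are hypothetical, and the sketch assumes the auth part is a single token before the @ sign and that a missing port means 6379.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strconv"
	"strings"
)

// redisAddr is a hypothetical holder for the parts of a redis URL.
type redisAddr struct {
	Auth     string
	HostPort string
	DB       int
}

// parseRedisURL splits redis://[auth@]host[:port][/database] into its parts.
// An empty input falls back to the documented default URL.
func parseRedisURL(raw string) (redisAddr, error) {
	if raw == "" {
		raw = "redis://localhost:6379"
	}
	u, err := url.Parse(raw)
	if err != nil {
		return redisAddr{}, err
	}
	if u.Scheme != "redis" {
		return redisAddr{}, fmt.Errorf("unsupported scheme %q", u.Scheme)
	}

	addr := redisAddr{HostPort: u.Host}
	if u.Port() == "" {
		// Assumption: a URL without a port uses the default redis port.
		addr.HostPort = net.JoinHostPort(u.Hostname(), "6379")
	}
	if u.User != nil {
		addr.Auth = u.User.Username()
	}
	if db := strings.Trim(u.Path, "/"); db != "" {
		n, err := strconv.Atoi(db)
		if err != nil {
			return redisAddr{}, fmt.Errorf("invalid database %q", db)
		}
		addr.DB = n
	}
	return addr, nil
}

func main() {
	fmt.Println(parseRedisURL(""))
	fmt.Println(parseRedisURL("redis://secret@example.com/2"))
}
```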

type Queue

type Queue struct {
	Name string

	// The number of simultaneous workers
	// Default: 1
	NumWorkers int

	// The maximum number of attempts for a single job.
	// Default: 1
	MaxAttempts int

	// The interval between attempts
	// Default: 0
	RetryInterval time.Duration
	// contains filtered or unexported fields
}

Queue represents a configurable queue.
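The documented defaults can be sketched as a small helper that fills in unset fields. The Queue type below copies only the exported fields shown above, withDefaults is a hypothetical function, and treating zero or negative values as "unset" is an assumption about how the library applies its defaults.

```go
package main

import (
	"fmt"
	"time"
)

// Queue copies the exported fields from the documented struct.
type Queue struct {
	Name          string
	NumWorkers    int
	MaxAttempts   int
	RetryInterval time.Duration
}

// withDefaults returns a copy of q with the documented defaults applied to
// any field that was left at its zero value (or set to a negative number).
func withDefaults(q Queue) Queue {
	if q.NumWorkers <= 0 {
		q.NumWorkers = 1
	}
	if q.MaxAttempts <= 0 {
		q.MaxAttempts = 1
	}
	if q.RetryInterval < 0 {
		q.RetryInterval = 0
	}
	return q
}

func main() {
	fmt.Printf("%+v\n", withDefaults(Queue{Name: "send-newsletter"}))
	fmt.Printf("%+v\n", withDefaults(Queue{Name: "send-newsletter", NumWorkers: 10, MaxAttempts: 5}))
}
```

A Queue value configured this way would then be passed to Client.Register, Client.Submit, or Client.SubmitDelayed, which take a Queue rather than a queue name.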

Directories

Path Synopsis
internal
