genie

package module
v0.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 15, 2021 License: MIT Imports: 17 Imported by: 0

README

🧞 Genie

Genie is a dead simple job-queue library/tool.

Usage

import "github.com/spy16/genie"

func main() {
    // connect to sqlite3.
    q, err := genie.Open("sqlite3://my-queue.db", nil)
    if err != nil {
        panic(err)
    }
    defer q.Close()


    // enqueue items on the queue.
    // this can be exposed as http api or something.
    _ = q.Push(ctx, Item{
        ID: "job1",
        Type:"job-category",
        Payload:"arbitrary data for executing job",
    })

    // run the poll-excute loop
    log.Fatalf("exited: %v", q.Run(ctx, myExecutorFunc))
}

func myExecutorFunc(ctx context.Context, item genie.Item) error {
    // do your thing

    return nil
    // return genie.ErrFail to fail immediately
    // return genie.ErrSkip to skip this item
    // return any other error to signal retry.
}

Documentation

Index

Constants

View Source
const (
	StatusDone    = "DONE"    // fn finished successfully.
	StatusFailed  = "FAILED"  // all attempts failed or fn returned ErrFail.
	StatusPending = "PENDING" // attempts are still remaining.
	StatusSkipped = "SKIPPED" // fn returned ErrSkip
)

Status values an item on the queue can have.

Variables

View Source
var (
	// ErrSkip can be returned by HandlerFn to indicate that the queued item
	// be skipped immediately.
	ErrSkip = errors.New("skip")

	// ErrFail can be returned by HandlerFn to indicate no further retries
	// should be attempted.
	ErrFail = errors.New("failed")
)

Functions

func Router

func Router(q Queue, customBanner ...string) http.Handler

Router returns a new web portal handler.

Types

type Fn added in v0.3.0

type Fn func(ctx context.Context, item Item) error

type Handler added in v0.2.0

type Handler interface {
	Handle(ctx context.Context, item Item) ([]byte, error)
	Sanitize(ctx context.Context, item *Item) error
}

HandlerFn is invoked by the queue instance when an item is available for execution or for validation when items are being enqueued.

type HandlerFn

type HandlerFn func(ctx context.Context, item Item) ([]byte, error)

HandlerFn implements Handler using Go native func value.

func (HandlerFn) Handle added in v0.2.0

func (h HandlerFn) Handle(ctx context.Context, item Item) ([]byte, error)

func (HandlerFn) Sanitize added in v0.2.0

func (h HandlerFn) Sanitize(_ context.Context, _ *Item) error

type Item

type Item struct {
	ID          string    `json:"id"`
	Type        string    `json:"type"`
	Payload     string    `json:"payload"`
	GroupID     string    `json:"group_id"`
	Attempt     int       `json:"attempt"`
	MaxAttempts int       `json:"max_attempts"`
	NextAttempt time.Time `json:"next_attempt"`
	Result      string    `json:"result"`
}

Item represents an item on the queue.

type Options

type Options struct {
	PollInt      time.Duration
	FnTimeout    time.Duration
	MaxAttempts  int
	RetryBackoff time.Duration
}

Options represents optional queue configurations.

type Queue

type Queue interface {
	ForEach(ctx context.Context, groupID, status string, fn Fn) error
	Push(ctx context.Context, items ...Item) error
	Run(ctx context.Context) error
	Stats() ([]Stats, error)
	JobTypes() []string
	Close() error
}

Queue represents a priority or delay queue.

func Open

func Open(queueSpec string, enableTypes []string, h Handler) (Queue, error)

Open opens a queue based on the spec and returns it. If the keys/tables required for the queue are not present, they will be created as needed.

type Stats

type Stats struct {
	GroupID string `json:"group_id" db:"group_id"`
	Type    string `json:"type"`
	Total   int    `json:"total"`
	Done    int    `json:"done"`
	Pending int    `json:"pending"`
	Failed  int    `json:"failed"`
	Skipped int    `json:"skipped"`
}

Stats represents queue status break down by type.

Directories

Path Synopsis
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL