beanspike

package module
v0.1.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 21, 2018 License: BSD-3-Clause Imports: 17 Imported by: 0

README

Beanspike

Beanstalk inspired job queue backed by Aerospike KVS.

Basic usage

Put
conn, _ := DialDefault()
tube, _ := conn.Use("testtube")
id, err := tube.Put([]byte("hello"), 0, 0, true)

Put(body []byte, delay time.Duration, ttr time.Duration, lz bool) where delay will hold the job in a delayed mode before releasing it for processing. Note, in practice all delayed jobs will be behind any immediately available jobs in the tube. ttr is the time the consumer of the job must Touch the task within or have it return to the ready state. If lz is true, the action will try and compress the payload trading some performance for memory usage on Aerospike. Note, if the payload is too small or the LZ4 compression is unable to provide a space saving, the entry may be inserted uncompressed anyway. Passing true is largely safe from a performance prespective unless you know that body is large and high entorpy. Cases where compression was attempted and abandoned in currently present jobs are reflected in Stats.SkippedSize.

Reserve & Delete
conn, _ := DialDefault()
tube, _ := conn.Use("testtube")
id, body, ttl, err := tube.Reserve()
....
exists, err := tube.Delete(id)
Touch
err = tube.Touch(id)

If a job is Put in the tube with a ttr, the job must be touched at some period smaller than this value to keep it reserved. An error on Touch should be treated as an instruction to abandon the job because it has been deleted or has timed out.

Stats
conn, _ := DialDefault()
tube, _ := conn.Use("testtube")
stats, err := tube.Stats()
Delete a tube

This is mainly for unit test clean up but there may be a legitimate usecase.

conn, _ := DialDefault()
conn.Delete("testtube")

Connection information

If DialDefault() is used, various reasonable defaults are set.

The client uses environment variables AEROSPIKE_HOST and AEROSPIKE_PORT to connect to an Aerospike node.

The library will attempt to parse AEROSPIKE_PORT either as a port number or a Docker environment variable as might be set if the the application is in a container that has been linked to an Aerospike conatiner with --link aerospike:aerospike.

If nothing is specified, the client assumes aerospike and 3000

The alternative Dial(id string, host string, port int) requires everything to be set explicitly.

Unit Tests

$ gpm
$ source gvp
$ go test

Benchmarks

$ go test --bench=.
...
BenchmarkPut	    	2000	    589034 ns/op
BenchmarkReserve	     200	  14759261 ns/op
BenchmarkRelease	    3000	    607290 ns/op

TODO

  • Fix TTL related implementations.
  • Messaging implementation for Reserve.
  • Possibly review stats. Review UDF lifecycle management.
  • Kick and Bury unit tests

Documentation

Overview

Package bsjob is a wrapper around beanspike jobs

Index

Constants

View Source
const (
	IdleInTubeTime = 2 * time.Second
	OutTubeTTR     = 30 * time.Second
)
View Source
const (
	AerospikeHost      = "aerospike"
	AerospikePort      = 3000
	AerospikeHostEnv   = "AEROSPIKE_HOST"
	AerospikePortEnv   = "AEROSPIKE_PORT"
	AerospikeNamespace = "beanspike"

	CompressionSizeThreshold = 1024 * 2

	// run admin operations at most 1/10 seconds
	AerospikeAdminDelay    = 2
	AerospikeAdminScanSize = 2500

	AerospikeMetadataSet = "metadata"

	// Bin names. Don't exceed 14 chars
	AerospikeNameStatus         = "status"
	AerospikeNameBody           = "body"
	AerospikeNameBy             = "by"
	AerospikeNameReason         = "reason"
	AerospikeNameDelay          = "delay"
	AerospikeNameDelayValue     = "seconds" // these are not used, review
	AerospikeNameTtr            = "ttr"
	AerospikeNameTtrKey         = "ttrkey"
	AerospikeNameTtrValue       = "seconds" // these are not used, review
	AerospikeNameCompressedSize = "csize"
	AerospikeNameSize           = "size"
	AerospikeNameRetries        = "retries"
	AerospikeNameRetryFlag      = "retryflag"
	AerospikeNameMetadata       = "metadata"
	AerospikeNameBuriedMetadata = "buried_meta"

	// For the scan policy
	AerospikeQueryQueueSize = 4

	// For bin AerospikeNameStatus
	AerospikeSymReady       = "READY"
	AerospikeSymReserved    = "RESERVED"
	AerospikeSymReservedTtr = "RESERVEDTTR"
	AerospikeSymBuried      = "BURIED"
	AerospikeSymDelayed     = "DELAYED"
	AerospikeSymDeleted     = "DELETED"

	AerospikeKeySuffixTtr     = "reservedttr"
	AerospikeKeySuffixDelayed = "delayed"
)

Variables

View Source
var (
	ErrAlreadyConnected = errors.New("already connected")
	ErrNotConnected     = errors.New("not connected")
)
View Source
var (
	ErrEmptyRecord        = errors.New("ASSERT: Record empty")
	ErrNotBuriedOrDelayed = errors.New("Job is not buried or delayed")
	ErrJobNotFound        = errors.New("Job not found")
)
View Source
var ErrParsing = errors.New("error parsing body")

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(ctx context.Context) *Client

func (*Client) Close

func (c *Client) Close()

func (*Client) Connect

func (c *Client) Connect(statsHandler func(string, string, float64)) error

func (*Client) Handle

func (c *Client) Handle(id TubeID, h TaskHandler) error

func (*Client) Put

func (c *Client) Put(tubeID string, v interface{}, metadata string) (int64, error)

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

func Dial

func Dial(id string, host string, port int, statsHandler func(string, string, float64)) (*Conn, error)

func DialDefault

func DialDefault(statsHandler func(string, string, float64)) (*Conn, error)

func DialDefaultWithRetry

func DialDefaultWithRetry(statsHandler func(string, string, float64)) *Conn

func (*Conn) Delete

func (conn *Conn) Delete(name string) error

Any Tubes that reference this name should be discarded after this operation

func (*Conn) Use

func (conn *Conn) Use(name string) (*Tube, error)

Note, index limits may cause this to fail

type Job

type Job struct {
	ID        int64
	Body      []byte
	TTR       time.Duration
	Retries   int
	RetryFlag bool
	Tube      *Tube
}

Job represents a Beanspike Job

func Reserve

func Reserve(tube *Tube) (*Job, error)

func (*Job) Bury

func (job *Job) Bury(reason string) error

func (*Job) Delete

func (job *Job) Delete() (bool, error)

func (*Job) Monitor

func (job *Job) Monitor() chan struct{}

func (*Job) Release

func (job *Job) Release(delay time.Duration) error

func (*Job) ReleaseWithRetry

func (job *Job) ReleaseWithRetry(delay time.Duration, incr, retryFlag bool) error

func (*Job) RetryCount

func (job *Job) RetryCount() int

func (*Job) String

func (job *Job) String() string

func (*Job) Touch

func (job *Job) Touch() error

type Stats

type Stats struct {
	Jobs        int
	Ready       int
	Buried      int
	Delayed     int
	Reserved    int
	Deleted     int
	JobSize     int
	UsedSize    int
	SkippedSize int
}

type TaskHandler

type TaskHandler interface {
	// ReserveAndDecode reserve a task and decode it to appropriate value type
	ReserveAndDecode(*Tube) (*Job, interface{}, error)
	// Handle processes incoming message.
	// Calling context.CancelFunc stops all underlying goroutines and release a beanspike job
	Handle(context.CancelFunc, interface{})
	// OnRelease will be called when handler context canceled
	OnRelease(interface{})
}

type Tube

type Tube struct {
	Conn *Conn
	Name string
	// contains filtered or unexported fields
}

func (*Tube) Bury

func (tube *Tube) Bury(id int64, reason []byte) error

func (*Tube) Delete

func (tube *Tube) Delete(id int64) (bool, error)

func (*Tube) KickJob

func (tube *Tube) KickJob(id int64) error

Job does not have to be reserved by this client

func (*Tube) PeekBuried

func (tube *Tube) PeekBuried() (id int64, body []byte, ttr time.Duration, reason []byte, err error)

func (*Tube) Put

func (tube *Tube) Put(body []byte, delay time.Duration, ttr time.Duration, lz bool,
	metadata string) (id int64, err error)

func (*Tube) Release

func (tube *Tube) Release(id int64, delay time.Duration) error

func (*Tube) ReleaseWithRetry

func (tube *Tube) ReleaseWithRetry(id int64, delay time.Duration, incr, retryFlag bool) error

func (*Tube) Reserve

func (tube *Tube) Reserve() (id int64, body []byte, ttr time.Duration, retries int, retryFlag bool,
	metadata string, err error)

func (*Tube) ReserveBatch

func (tube *Tube) ReserveBatch(batchSize int) (jobs []*Job, err error)

func (*Tube) Stats

func (tube *Tube) Stats() (s *Stats, err error)

func (*Tube) Touch

func (tube *Tube) Touch(id int64) error

type TubeID

type TubeID string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL