beanspike

v1.1.1 · Published: Jun 19, 2024 · License: BSD-3-Clause

README

Beanspike

Beanstalk inspired job queue backed by Aerospike KVS.

Basic usage

Put
conn, _ := DialDefault(func(string, string, float64) {})
tube, _ := conn.Use("testtube")
id, err := tube.Put([]byte("hello"), 0, 0, true, "", 0)

Put(body []byte, delay time.Duration, ttr time.Duration, lz bool, metadata string, tob int64), where delay holds the job in a delayed state before releasing it for processing. Note that, in practice, all delayed jobs end up behind any immediately available jobs in the tube. ttr is the window within which the consumer of the job must Touch it, or the job returns to the ready state. If lz is true, the operation will try to compress the payload, trading some performance for memory usage on Aerospike. Note that if the payload is too small, or LZ4 compression cannot provide a space saving, the entry may be inserted uncompressed anyway. Passing true is largely safe from a performance perspective unless you know that body is large and high entropy. Jobs currently in the tube for which compression was attempted and abandoned are reflected in Stats.SkippedSize. See the Tube.Put entry under Types for the metadata and tob arguments.
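
For example, assuming the conn and tube from the snippet above (the empty metadata and zero tob are placeholder values):

// Hold the job for 30 seconds, then require the consumer to Touch it
// at least every 2 minutes once reserved; attempt LZ4 compression.
id, err := tube.Put([]byte("hello"), 30*time.Second, 2*time.Minute, true, "", 0)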

Reserve & Delete
conn, _ := DialDefault(func(string, string, float64) {})
tube, _ := conn.Use("testtube")
id, body, ttr, retries, retryFlag, tob, err := tube.Reserve()
....
exists, err := tube.Delete(id)
Touch
err = tube.Touch(id)

If a job is Put in the tube with a ttr, the job must be touched at intervals shorter than this value to keep it reserved. An error from Touch should be treated as an instruction to abandon the job, because it has been deleted or has timed out.
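
A minimal keep-alive sketch, assuming id and ttr come from a preceding Reserve and the worker signals completion by closing done:

done := make(chan struct{})
go func() {
	// Touch at half the TTR so a single missed tick does not lose the reservation.
	ticker := time.NewTicker(ttr / 2)
	defer ticker.Stop()
	for {
		select {
		case <-done:
			return
		case <-ticker.C:
			if err := tube.Touch(id); err != nil {
				// The job has been deleted or has timed out; abandon it.
				return
			}
		}
	}
}()
// ... process the job, then close(done) and Delete or Release it.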

Stats
conn, _ := DialDefault(func(string, string, float64) {})
tube, _ := conn.Use("testtube")
stats, err := tube.Stats()
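
The returned Stats struct (see the Types section) reports per-tube counters, for example:

fmt.Printf("jobs=%d ready=%d reserved=%d delayed=%d buried=%d\n",
	stats.Jobs, stats.Ready, stats.Reserved, stats.Delayed, stats.Buried)
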
Delete a tube

This is mainly for unit-test clean-up, but there may be legitimate use cases.

conn, _ := DialDefault(func(string, string, float64) {})
conn.Delete("testtube")

Connection information

If DialDefault() is used, various reasonable defaults are set.

The client uses environment variables AEROSPIKE_HOST and AEROSPIKE_PORT to connect to an Aerospike node.

The library will attempt to parse AEROSPIKE_PORT either as a plain port number or as a Docker-style address (e.g. tcp://172.17.0.2:3000), as might be set if the application runs in a container that has been linked to an Aerospike container with --link aerospike:aerospike.

If neither is specified, the client assumes the host aerospike and port 3000.

The alternative Dial(id string, host string, port int, statsHandler func(string, string, float64)) requires everything to be set explicitly.
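
For example (the id and host below are placeholder values, and the stats handler is a no-op):

conn, err := Dial("worker-1", "aerospike.internal", 3000, func(string, string, float64) {})
if err != nil {
	// handle the connection error
}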

Unit Tests

$ gpm
$ source gvp
$ go test

Benchmarks

$ go test --bench=.
...
BenchmarkPut	    	2000	    589034 ns/op
BenchmarkReserve	     200	  14759261 ns/op
BenchmarkRelease	    3000	    607290 ns/op

TODO

  • Fix TTL related implementations.
  • Messaging implementation for Reserve.
  • Possibly review stats. Review UDF lifecycle management.
  • Kick and Bury unit tests.

Documentation

Overview

Package bsjob is a wrapper around beanspike jobs

Index

Examples

Constants

const (
	IdleInTubeTime = 2 * time.Second
	OutTubeTTR     = 30 * time.Second
)

const (
	AerospikeHost      = "aerospike-exports"
	AerospikePort      = 3000
	AerospikeHostEnv   = "AEROSPIKE_HOST"
	AerospikePortEnv   = "AEROSPIKE_PORT"
	AerospikeNamespace = "beanspike"

	CompressionSizeThreshold = 1024 * 2

	// run admin operations at most 1/10 seconds
	AerospikeAdminDelay    = 2
	AerospikeAdminScanSize = 2500

	AerospikeMetadataSet = "metadata"

	// Bin names. Don't exceed 14 chars
	AerospikeNameStatus         = "status"
	AerospikeNameBody           = "body"
	AerospikeNameBy             = "by"
	AerospikeNameReason         = "reason"
	AerospikeNameDelay          = "delay"
	AerospikeNameDelayValue     = "seconds" // these are not used, review
	AerospikeNameTtr            = "ttr"
	AerospikeNameTtrKey         = "ttrkey"
	AerospikeNameTtrValue       = "seconds" // these are not used, review
	AerospikeNameCompressedSize = "csize"
	AerospikeNameSize           = "size"
	AerospikeNameRetries        = "retries"
	AerospikeNameRetryFlag      = "retryflag"
	AerospikeNameMetadata       = "metadata"
	AerospikeNameBuriedMetadata = "buried_meta"
	AerospikeNameToB            = "tob"

	// For the scan policy
	AerospikeQueryQueueSize = 4

	// For bin AerospikeNameStatus
	AerospikeSymReady       = "READY"
	AerospikeSymReserved    = "RESERVED"
	AerospikeSymReservedTtr = "RESERVEDTTR"
	AerospikeSymBuried      = "BURIED"
	AerospikeSymDelayed     = "DELAYED"
	AerospikeSymDeleted     = "DELETED"

	AerospikeKeySuffixTtr     = "reservedttr"
	AerospikeKeySuffixDelayed = "delayed"
)

const DefaultJobIDBatchSize = 100

const IDOverflowThreshold = math.MaxInt64 - 900*(math.MaxInt64/(100000000*60*60*24*365))

At roughly 100 million increments per second, the counter would take about 2,900 years to overflow, so placing the threshold around the value reached after roughly 2,000 years gives both an early signal that IDs are growing unexpectedly fast and time to solve the problem.
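
As a back-of-the-envelope check of the ~2,900-year figure (not part of the package):

perYear := int64(100_000_000) * 60 * 60 * 24 * 365 // increments per year at 100M/s
years := math.MaxInt64 / perYear                    // ≈ 2924 years until int64 overflow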

Variables

var (
	ErrAlreadyConnected = errors.New("already connected")
	ErrNotConnected     = errors.New("not connected")
)

var (
	ErrEmptyRecord        = errors.New("ASSERT: Record empty")
	ErrNotBuriedOrDelayed = errors.New("Job is not buried or delayed")
	ErrJobNotFound        = errors.New("Job not found")
	ErrDecodingError      = errors.New("decoding error")
)

var ErrParsing = errors.New("error parsing body")

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(ctx context.Context) *Client

func (*Client) Close

func (c *Client) Close()

func (*Client) Connect

func (c *Client) Connect(host string, port int, statsHandler func(string, string, float64)) error

func (*Client) Put

func (c *Client) Put(tubeID string, v interface{}, metadata string, tob int64) (int64, error)

func (*Client) StartReserveAndKeepLoop added in v1.0.0

func (c *Client) StartReserveAndKeepLoop(id TubeID, dec JobDecoder, handler JobHandler, batchSize int) error
Example
log.SetFlags(0)

beanspike.DefaultErrorLogger = beanspike.ErrorLoggerFunc(func(err error) {
	log.Printf("ERROR %s", err)
})

client := beanspike.NewClient(context.TODO())

if err := client.Connect("localhost", 3000, func(string, string, float64) {}); err != nil {
	log.Fatal("dial: ", err)
}

decoder := beanspike.JobDecoderFunc(func(_ []byte) (interface{}, error) { return nil, nil })

reserved := make(chan struct{}, 1)

var numReserved, numReleased int32

handler := beanspike.JobHandlerFunc(func(ctx context.Context, job *beanspike.ManagedJob, _ interface{}) {
	go func() {
		<-ctx.Done()
		//log.Printf("job %d has been released", job.ID)
		atomic.AddInt32(&numReleased, 1)
	}()
	//log.Printf("job %d has been reserved", job.ID)
	reserved <- struct{}{}
	atomic.AddInt32(&numReserved, 1)
})

if err := client.StartReserveAndKeepLoop("jmappush_google_stream_in", decoder, handler, 500); err != nil {
	log.Fatal("failed to start reserve-n-keep loop: ", err)
}

sinceStart := func(round time.Duration) func() time.Duration {
	start := time.Now()
	return func() time.Duration { return time.Since(start).Round(round) }
}(time.Second)

done := make(chan struct{})

go func() {
	n := 0
	for {
		select {
		case <-reserved:
			n++
			if n%200 == 0 {
				log.Printf("reserved %d in %s", n, sinceStart())
			}
			if n > 4000 {
				log.Printf("all jobs reserved in %s", sinceStart())
				close(done)
				return
			}
		}
	}
}()

sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

select {
case <-sigs:
	log.Printf("interrupted")
case <-done:
	log.Printf("done; reserved %d, released %d",
		atomic.LoadInt32(&numReserved), atomic.LoadInt32(&numReleased))
}

client.Close()

log.Printf("client.Close(); reserved %d, released %d",
	atomic.LoadInt32(&numReserved), atomic.LoadInt32(&numReleased))

log.Printf("finished in %s", sinceStart())

// NOTE: the example's Output is not checked because the timing and actual number of jobs will vary;
// however, one can expect output like this:
//
// reserved 200 in 2s
// reserved 400 in 2s
// reserved 600 in 3s
// reserved 800 in 3s
// reserved 1000 in 3s
// reserved 1200 in 3s
// reserved 1400 in 3s
// reserved 1600 in 4s
// reserved 1800 in 4s
// reserved 2000 in 4s
// reserved 2200 in 4s
// reserved 2400 in 4s
// reserved 2600 in 4s
// reserved 2800 in 5s
// reserved 3000 in 5s
// reserved 3200 in 5s
// reserved 3400 in 5s
// reserved 3600 in 5s
// reserved 3800 in 5s
// reserved 4000 in 5s
// all jobs reserved in 6s
// done; reserved 4001, released 0
// client.Close(); reserved 4001, released 4001
// finished in 7s

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

func Dial

func Dial(id string, host string, port int, statsHandler func(string, string, float64)) (*Conn, error)

func DialDefault

func DialDefault(statsHandler func(string, string, float64)) (*Conn, error)

func DialDefaultWithRetry

func DialDefaultWithRetry(statsHandler func(string, string, float64)) *Conn

func (*Conn) Delete

func (conn *Conn) Delete(name string) error

Any Tubes that reference this name should be discarded after this operation

func (*Conn) SetCollector added in v1.0.3

func (conn *Conn) SetCollector(collector stats.Collector)

SetCollector sets the collector to use for stats. TODO: this is a kludge for collecting scan metrics and should be refactored.

func (*Conn) SetIDBatchSize added in v1.1.1

func (conn *Conn) SetIDBatchSize(size int64)

func (*Conn) Use

func (conn *Conn) Use(name string) (*Tube, error)

Note, index limits may cause this to fail

type ErrorLogger added in v1.0.0

type ErrorLogger interface {
	LogError(error)
}

ErrorLogger defines an error-logging interface the application can implement to catch errors.

var DefaultErrorLogger ErrorLogger = ErrorLoggerFunc(func(err error) {})

TODO consider replacing this variable with "Options" pattern.
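
Applications typically install a logger once at startup, as in the package examples:

beanspike.DefaultErrorLogger = beanspike.ErrorLoggerFunc(func(err error) {
	log.Printf("beanspike: %v", err)
})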

type ErrorLoggerFunc added in v1.0.0

type ErrorLoggerFunc func(error)

The ErrorLoggerFunc type is an adapter to allow the use of ordinary functions as error loggers. If f is a function with the appropriate signature, ErrorLoggerFunc(f) is an ErrorLogger that calls f.

func (ErrorLoggerFunc) LogError added in v1.0.0

func (f ErrorLoggerFunc) LogError(err error)

LogError calls f(err)

type Job

type Job struct {
	ID        int64
	Body      []byte
	TTR       time.Duration
	Retries   int
	RetryFlag bool
	Tube      *Tube
	ToB       int64
}

Job represents a Beanspike Job

func Reserve

func Reserve(tube *Tube) (*Job, error)

func (*Job) Bury

func (job *Job) Bury(reason string) error

func (*Job) Delete

func (job *Job) Delete() (bool, error)

func (*Job) Monitor

func (job *Job) Monitor() chan struct{}

func (*Job) Release

func (job *Job) Release(delay time.Duration) error

func (*Job) ReleaseWithRetry

func (job *Job) ReleaseWithRetry(delay time.Duration, incr, retryFlag bool) error

func (*Job) RetryCount

func (job *Job) RetryCount() int

func (*Job) String

func (job *Job) String() string

func (*Job) Touch

func (job *Job) Touch() error

type JobDecoder added in v1.0.0

type JobDecoder interface {
	// Decode unmarshals the job's payload.
	Decode([]byte) (interface{}, error)
}

JobDecoder defines the interface an application should implement to decode incoming jobs into application-specific objects.

type JobDecoderFunc added in v1.0.0

type JobDecoderFunc func([]byte) (interface{}, error)

func (JobDecoderFunc) Decode added in v1.0.0

func (f JobDecoderFunc) Decode(b []byte) (interface{}, error)

type JobHandler added in v1.0.0

type JobHandler interface {
	// Handle processes an incoming job.
	// The application should watch the given context; it is canceled when the job is released.
	Handle(context.Context, *ManagedJob, interface{})
}

type JobHandlerFunc added in v1.0.0

type JobHandlerFunc func(context.Context, *ManagedJob, interface{})

func (JobHandlerFunc) Handle added in v1.0.0

func (f JobHandlerFunc) Handle(ctx context.Context, job *ManagedJob, v interface{})

type ManagedJob added in v1.0.0

type ManagedJob struct {
	*Job
	// contains filtered or unexported fields
}

func NewManagedJob added in v1.0.0

func NewManagedJob(job *Job, cancel func()) *ManagedJob

func (*ManagedJob) StopAndRelease added in v1.0.0

func (job *ManagedJob) StopAndRelease()

StopAndRelease provides the application with a way to stop the job-keeping activity and release the job.

type Stats

type Stats struct {
	Jobs        int
	Ready       int
	Buried      int
	Delayed     int
	Reserved    int
	Deleted     int
	JobSize     int
	UsedSize    int
	SkippedSize int
}

type Tube

type Tube struct {
	Conn *Conn
	Name string
	// contains filtered or unexported fields
}

func (*Tube) BumpDelayedEntries added in v0.1.11

func (tube *Tube) BumpDelayedEntries() (int, error)

func (*Tube) BumpReservedEntries added in v0.1.10

func (tube *Tube) BumpReservedEntries() (int, error)

func (*Tube) Bury

func (tube *Tube) Bury(id int64, reason []byte) error

func (*Tube) Delete

func (tube *Tube) Delete(id int64) (bool, error)

func (*Tube) KickJob

func (tube *Tube) KickJob(id int64) error

Job does not have to be reserved by this client

func (*Tube) PeekBuried

func (tube *Tube) PeekBuried() (id int64, body []byte, ttr time.Duration, reason []byte, err error)

func (*Tube) Put

func (tube *Tube) Put(body []byte, delay time.Duration, ttr time.Duration, lz bool,
	metadata string, tob int64) (id int64, err error)

func (*Tube) Release

func (tube *Tube) Release(id int64, delay time.Duration) error

func (*Tube) ReleaseWithRetry

func (tube *Tube) ReleaseWithRetry(id int64, delay time.Duration, incr, retryFlag bool) error

func (*Tube) Reserve

func (tube *Tube) Reserve() (id int64, body []byte, ttr time.Duration, retries int, retryFlag bool, tob int64, err error)

func (*Tube) ReserveBatch

func (tube *Tube) ReserveBatch(ctx context.Context, dec JobDecoder, h JobHandler, batchSize int) (reserved int)

ReserveBatch reserves at most batchSize jobs and returns the number of jobs reserved. It uses the given JobDecoder to unmarshal each job's payload and passes the result to the given JobHandler. Any errors are logged with DefaultErrorLogger.

Example
log.SetFlags(0)

conn, err := beanspike.Dial("", "localhost", 3000, func(string, string, float64) {})
if err != nil {
	log.Fatal("dial ", err)
}

tube, err := conn.Use("jmappush_google_stream_in")
if err != nil {
	log.Fatal("use ", err)
}

beanspike.DefaultErrorLogger = beanspike.ErrorLoggerFunc(func(err error) {
	log.Printf("ERROR %s\n", err)
})

decoder := beanspike.JobDecoderFunc(func(_ []byte) (interface{}, error) { return nil, nil })

handler := beanspike.JobHandlerFunc(func(_ context.Context, _ *beanspike.ManagedJob, _ interface{}) {})

sinceStart := func(round time.Duration) func() time.Duration {
	start := time.Now()
	return func() time.Duration { return time.Since(start).Round(round) }
}(time.Second)

var reserved, cycles, emptyCycles int

ctx := context.TODO()

for {
	cycles++
	n := tube.ReserveBatch(ctx, decoder, handler, 500)

	log.Printf("Reserved %d\n", n)
	if n == 0 {
		emptyCycles++
		if emptyCycles == 3 {
			break
		}

		log.Printf("Zzzz... elapsed %s\n", sinceStart())
		time.Sleep(2 * time.Second)
		continue
	} else {
		emptyCycles = 0
	}

	reserved += n
}

log.Printf("Total reserved %d in %s and %d cycles\n", reserved, sinceStart(), cycles)

// NOTE: the example's Output is not checked because the timing and actual number of jobs will vary;
// however, one can expect output like this:
//
// Reserved 500
// Reserved 500
// Reserved 500
// Reserved 500
// Reserved 500
// Reserved 500
// Reserved 500
// Reserved 500
// Reserved 123
// Reserved 0
// Zzzz... elapsed 14s
// Reserved 0
// Zzzz... elapsed 16s
// Reserved 0
// Total reserved 4123 in 19s and 12 cycles

func (*Tube) SetRetries added in v1.1.1

func (tube *Tube) SetRetries(maxRetries int, sleepBetweenRetries time.Duration, sleepMultiplier float64)

SetRetries sets the retry policy used by the Put function.
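
A hedged usage sketch (the values are illustrative, and the multiplier is presumed to scale the sleep between successive attempts):

// Retry failed Puts up to 5 times, starting at 100ms between attempts.
tube.SetRetries(5, 100*time.Millisecond, 2.0)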

func (*Tube) Stats

func (tube *Tube) Stats() (s *Stats, err error)

func (*Tube) Touch

func (tube *Tube) Touch(id int64) error

func (*Tube) UpdateJobBody added in v1.0.4

func (tube *Tube) UpdateJobBody(id int64, body []byte, lz bool) error

UpdateJobBody updates the body of a job without any other changes to the job's metadata. This function is supposed to be called only if the job is already "owned" by the client and the client is going to keep it. The use case is updating a job after refreshing a JWE token as a result of key rotation.
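
A minimal sketch, assuming the job with this id is already reserved by this client and refreshedBody holds the re-encoded payload:

if err := tube.UpdateJobBody(id, refreshedBody, true); err != nil {
	// handle the update error
}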

type TubeID

type TubeID string
