Documentation
¶
Overview ¶
Package bsjob is a wrapper around beanspike jobs
Index ¶
- Constants
- Variables
- type Client
- func (c *Client) Close()
- func (c *Client) Connect(host string, port int, statsHandler func(string, string, float64)) error
- func (c *Client) Put(tubeID string, v interface{}, metadata string, tob int64) (int64, error)
- func (c *Client) StartReserveAndKeepLoop(id TubeID, dec JobDecoder, handler JobHandler, batchSize int) error
- type Conn
- type ErrorLogger
- type ErrorLoggerFunc
- type Job
- func (job *Job) Bury(reason string) error
- func (job *Job) Delete() (bool, error)
- func (job *Job) Monitor() chan struct{}
- func (job *Job) Release(delay time.Duration) error
- func (job *Job) ReleaseWithRetry(delay time.Duration, incr, retryFlag bool) error
- func (job *Job) RetryCount() int
- func (job *Job) String() string
- func (job *Job) Touch() error
- type JobDecoder
- type JobDecoderFunc
- type JobHandler
- type JobHandlerFunc
- type ManagedJob
- type Stats
- type Tube
- func (tube *Tube) BumpDelayedEntries() (int, error)
- func (tube *Tube) BumpReservedEntries() (int, error)
- func (tube *Tube) Bury(id int64, reason []byte) error
- func (tube *Tube) Delete(id int64) (bool, error)
- func (tube *Tube) KickJob(id int64) error
- func (tube *Tube) PeekBuried() (id int64, body []byte, ttr time.Duration, reason []byte, err error)
- func (tube *Tube) Put(body []byte, delay time.Duration, ttr time.Duration, lz bool, metadata string, ...) (id int64, err error)
- func (tube *Tube) Release(id int64, delay time.Duration) error
- func (tube *Tube) ReleaseWithRetry(id int64, delay time.Duration, incr, retryFlag bool) error
- func (tube *Tube) Reserve() (id int64, body []byte, ttr time.Duration, retries int, retryFlag bool, ...)
- func (tube *Tube) ReserveBatch(ctx context.Context, dec JobDecoder, h JobHandler, batchSize int) (reserved int)
- func (tube *Tube) SetRetries(maxRetries int, sleepBetweenRetries time.Duration, sleepMultiplier float64)
- func (tube *Tube) Stats() (s *Stats, err error)
- func (tube *Tube) Touch(id int64) error
- func (tube *Tube) UpdateJobBody(id int64, body []byte, lz bool) error
- type TubeID
Examples ¶
Constants ¶
const (
	IdleInTubeTime = 2 * time.Second
	OutTubeTTR     = 30 * time.Second
)
const (
	AerospikeHost      = "aerospike-exports"
	AerospikePort      = 3000
	AerospikeHostEnv   = "AEROSPIKE_HOST"
	AerospikePortEnv   = "AEROSPIKE_PORT"
	AerospikeNamespace = "beanspike"

	CompressionSizeThreshold = 1024 * 2

	// run admin operations at most 1/10 seconds
	AerospikeAdminDelay    = 2
	AerospikeAdminScanSize = 2500

	AerospikeMetadataSet = "metadata"

	// Bin names. Don't exceed 14 chars
	AerospikeNameStatus         = "status"
	AerospikeNameBody           = "body"
	AerospikeNameBy             = "by"
	AerospikeNameReason         = "reason"
	AerospikeNameDelay          = "delay"
	AerospikeNameDelayValue     = "seconds" // these are not used, review
	AerospikeNameTtr            = "ttr"
	AerospikeNameTtrKey         = "ttrkey"
	AerospikeNameTtrValue       = "seconds" // these are not used, review
	AerospikeNameCompressedSize = "csize"
	AerospikeNameSize           = "size"
	AerospikeNameRetries        = "retries"
	AerospikeNameRetryFlag      = "retryflag"
	AerospikeNameMetadata       = "metadata"
	AerospikeNameBuriedMetadata = "buried_meta"
	AerospikeNameToB            = "tob"

	// For the scan policy
	AerospikeQueryQueueSize = 4

	// For bin AerospikeNameStatus
	AerospikeSymReady       = "READY"
	AerospikeSymReserved    = "RESERVED"
	AerospikeSymReservedTtr = "RESERVEDTTR"
	AerospikeSymBuried      = "BURIED"
	AerospikeSymDelayed     = "DELAYED"
	AerospikeSymDeleted     = "DELETED"

	AerospikeKeySuffixTtr     = "reservedttr"
	AerospikeKeySuffixDelayed = "delayed"
)
const DefaultJobIDBatchSize = 100
const IDOverflowThreshold = math.MaxInt64 - 900*(math.MaxInt64/(100000000*60*60*24*365))
The current projection is roughly 2900 years if the counter is incremented in 100M steps per second, so a threshold around the 2000-year mark gives us both a signal that IDs are growing quickly and time to solve the problem.
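A rough back-of-the-envelope check of that projection (a hypothetical snippet, not part of the package API):

	// At the assumed growth rate of 100M new IDs per second:
	const incrementsPerYear = 100000000 * 60 * 60 * 24 * 365
	// Years until an int64 counter overflows at that rate: ~2924, i.e. roughly 2900 years.
	const yearsToOverflow = math.MaxInt64 / incrementsPerYear
	fmt.Println(yearsToOverflow)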
Variables ¶
var (
	ErrAlreadyConnected = errors.New("already connected")
	ErrNotConnected     = errors.New("not connected")
)
var (
	ErrEmptyRecord        = errors.New("ASSERT: Record empty")
	ErrNotBuriedOrDelayed = errors.New("Job is not buried or delayed")
	ErrJobNotFound        = errors.New("Job not found")
	ErrDecodingError      = errors.New("decoding error")
)
var ErrParsing = errors.New("error parsing body")
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func (*Client) StartReserveAndKeepLoop ¶ added in v1.0.0
func (c *Client) StartReserveAndKeepLoop(id TubeID, dec JobDecoder, handler JobHandler, batchSize int) error
Example ¶
log.SetFlags(0)

beanspike.DefaultErrorLogger = beanspike.ErrorLoggerFunc(func(err error) {
	log.Printf("ERROR %s", err)
})

client := beanspike.NewClient(context.TODO())
if err := client.Connect("localhost", 3000, func(string, string, float64) {}); err != nil {
	log.Fatal("dial: ", err)
}

decoder := beanspike.JobDecoderFunc(func(_ []byte) (interface{}, error) { return nil, nil })

reserved := make(chan struct{}, 1)
var numReserved, numReleased int32
handler := beanspike.JobHandlerFunc(func(ctx context.Context, job *beanspike.ManagedJob, _ interface{}) {
	go func() {
		<-ctx.Done()
		//log.Printf("job %d has been released", job.ID)
		atomic.AddInt32(&numReleased, 1)
	}()
	//log.Printf("job %d has been reserved", job.ID)
	reserved <- struct{}{}
	atomic.AddInt32(&numReserved, 1)
})

if err := client.StartReserveAndKeepLoop("jmappush_google_stream_in", decoder, handler, 500); err != nil {
	log.Fatal("failed to start reserve-n-keep loop: ", err)
}

sinceStart := func(round time.Duration) func() time.Duration {
	start := time.Now()
	return func() time.Duration { return time.Since(start).Round(round) }
}(time.Second)

done := make(chan struct{})
go func() {
	n := 0
	for {
		select {
		case <-reserved:
			n++
			if n%200 == 0 {
				log.Printf("reserved %d in %s", n, sinceStart())
			}
			if n > 4000 {
				log.Printf("all jobs reserved in %s", sinceStart())
				close(done)
				return
			}
		}
	}
}()

sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

select {
case <-sigs:
	log.Printf("interrupted")
case <-done:
	log.Printf("done; reserved %d, released %d", atomic.LoadInt32(&numReserved), atomic.LoadInt32(&numReleased))
}

client.Close()
log.Printf("client.Close(); reserved %d, released %d", atomic.LoadInt32(&numReserved), atomic.LoadInt32(&numReleased))
log.Printf("finished in %s", sinceStart())

// NOTE: this example does not declare an Output, as timing and the actual number of jobs will vary;
// however, one can expect output like this:
//
// reserved 200 in 2s
// reserved 400 in 2s
// reserved 600 in 3s
// reserved 800 in 3s
// reserved 1000 in 3s
// reserved 1200 in 3s
// reserved 1400 in 3s
// reserved 1600 in 4s
// reserved 1800 in 4s
// reserved 2000 in 4s
// reserved 2200 in 4s
// reserved 2400 in 4s
// reserved 2600 in 4s
// reserved 2800 in 5s
// reserved 3000 in 5s
// reserved 3200 in 5s
// reserved 3400 in 5s
// reserved 3600 in 5s
// reserved 3800 in 5s
// reserved 4000 in 5s
// all jobs reserved in 6s
// done; reserved 4001, released 0
// client.Close(); reserved 4001, released 4001
// finished in 7s
type Conn ¶
type Conn struct {
// contains filtered or unexported fields
}
func DialDefaultWithRetry ¶
func (*Conn) SetCollector ¶ added in v1.0.3
SetCollector sets the collector to use for stats. TODO: this is a kludge for collecting scan metrics and should be refactored.
func (*Conn) SetIDBatchSize ¶ added in v1.1.1
type ErrorLogger ¶ added in v1.0.0
type ErrorLogger interface {
LogError(error)
}
ErrorLogger defines the error logging interface an application can implement to catch errors.
var DefaultErrorLogger ErrorLogger = ErrorLoggerFunc(func(err error) {})
TODO: consider replacing this variable with the "Options" pattern.
type ErrorLoggerFunc ¶ added in v1.0.0
type ErrorLoggerFunc func(error)
The ErrorLoggerFunc type is an adapter to allow the use of ordinary functions as error loggers. If f is a function with the appropriate signature, ErrorLoggerFunc(f) is an ErrorLogger that calls f.
func (ErrorLoggerFunc) LogError ¶ added in v1.0.0
func (f ErrorLoggerFunc) LogError(err error)
LogError calls f(err)
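For instance, an application can route the package's internal errors to its own logger by installing a plain function (a minimal sketch, mirroring the examples on this page; the log format is arbitrary):

	beanspike.DefaultErrorLogger = beanspike.ErrorLoggerFunc(func(err error) {
		log.Printf("bsjob: %v", err)
	})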
type Job ¶
type Job struct {
	ID        int64
	Body      []byte
	TTR       time.Duration
	Retries   int
	RetryFlag bool
	Tube      *Tube
	ToB       int64
}
Job represents a Beanspike job.
func (*Job) ReleaseWithRetry ¶
func (*Job) RetryCount ¶
type JobDecoder ¶ added in v1.0.0
type JobDecoder interface {
	// Decode unmarshals the job's payload.
	Decode([]byte) (interface{}, error)
}
JobDecoder defines the interface an application should implement for decoding incoming jobs into application-specific objects.
type JobDecoderFunc ¶ added in v1.0.0
type JobDecoderFunc func([]byte) (interface{}, error)
func (JobDecoderFunc) Decode ¶ added in v1.0.0
func (f JobDecoderFunc) Decode(b []byte) (interface{}, error)
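As an illustration, a decoder that unmarshals job payloads into an application-specific struct can be built from a plain function (a sketch; PushMessage is a hypothetical application type and encoding/json is assumed to be imported):

	type PushMessage struct {
		Token   string `json:"token"`
		Payload string `json:"payload"`
	}

	decoder := beanspike.JobDecoderFunc(func(b []byte) (interface{}, error) {
		var msg PushMessage
		if err := json.Unmarshal(b, &msg); err != nil {
			return nil, err
		}
		return &msg, nil
	})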
type JobHandler ¶ added in v1.0.0
type JobHandler interface {
	// Handle processes an incoming job.
	// The application should watch the given context; it is canceled when the job is released.
	Handle(context.Context, *ManagedJob, interface{})
}
type JobHandlerFunc ¶ added in v1.0.0
type JobHandlerFunc func(context.Context, *ManagedJob, interface{})
func (JobHandlerFunc) Handle ¶ added in v1.0.0
func (f JobHandlerFunc) Handle(ctx context.Context, job *ManagedJob, v interface{})
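A minimal handler sketch that respects the release semantics (doWork is a hypothetical application function that stops as soon as the context is canceled):

	handler := beanspike.JobHandlerFunc(func(ctx context.Context, job *beanspike.ManagedJob, v interface{}) {
		// ctx is canceled when the job is released (for example, on client shutdown),
		// so any long-running work should watch ctx.Done() and stop promptly.
		if err := doWork(ctx, v); err != nil {
			beanspike.DefaultErrorLogger.LogError(err)
		}
	})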
type ManagedJob ¶ added in v1.0.0
type ManagedJob struct {
	*Job
	// contains filtered or unexported fields
}
func NewManagedJob ¶ added in v1.0.0
func NewManagedJob(job *Job, cancel func()) *ManagedJob
func (*ManagedJob) StopAndRelease ¶ added in v1.0.0
func (job *ManagedJob) StopAndRelease()
StopAndRelease provides the application a way to stop the job-keeping activity and release the job.
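For example, a handler that decides not to process a job can hand it back to the tube (a sketch; shouldProcessNow is a hypothetical application check, and whether to release or delete is application-specific):

	handler := beanspike.JobHandlerFunc(func(ctx context.Context, job *beanspike.ManagedJob, v interface{}) {
		if !shouldProcessNow(v) {
			// Stop the keep-alive activity for this job and release it back to the tube.
			job.StopAndRelease()
			return
		}
		// ... process the job ...
	})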
type Tube ¶
func (*Tube) BumpDelayedEntries ¶ added in v0.1.11
func (*Tube) BumpReservedEntries ¶ added in v0.1.10
func (*Tube) PeekBuried ¶
func (*Tube) ReleaseWithRetry ¶
func (*Tube) ReserveBatch ¶
func (tube *Tube) ReserveBatch(ctx context.Context, dec JobDecoder, h JobHandler, batchSize int) (reserved int)
ReserveBatch reserves at most batchSize jobs and returns the number of reserved jobs. ReserveBatch uses the JobDecoder to unmarshal each job's payload and passes the result to the given JobHandler. Any errors are logged with DefaultErrorLogger.
Example ¶
log.SetFlags(0)

conn, err := beanspike.Dial("", "localhost", 3000, func(string, string, float64) {})
if err != nil {
	log.Fatal("dial ", err)
}
tube, err := conn.Use("jmappush_google_stream_in")
if err != nil {
	log.Fatal("use ", err)
}

beanspike.DefaultErrorLogger = beanspike.ErrorLoggerFunc(func(err error) {
	log.Printf("ERROR %s\n", err)
})

decoder := beanspike.JobDecoderFunc(func(_ []byte) (interface{}, error) { return nil, nil })
handler := beanspike.JobHandlerFunc(func(_ context.Context, _ *beanspike.ManagedJob, _ interface{}) {})

sinceStart := func(round time.Duration) func() time.Duration {
	start := time.Now()
	return func() time.Duration { return time.Since(start).Round(round) }
}(time.Second)

var reserved, cycles, emptyCycles int
ctx := context.TODO()
for {
	cycles++
	n := tube.ReserveBatch(ctx, decoder, handler, 500)
	log.Printf("Reserved %d\n", n)
	if n == 0 {
		emptyCycles++
		if emptyCycles == 3 {
			break
		}
		log.Printf("Zzzz... elapsed %s\n", sinceStart())
		time.Sleep(2 * time.Second)
		continue
	} else {
		emptyCycles = 0
	}
	reserved += n
}
log.Printf("Total reserved %d in %s and %d cycles\n", reserved, sinceStart(), cycles)

// NOTE: this example does not declare an Output, as timing and the actual number of jobs will vary;
// however, one can expect output like this:
//
// Reserved 500
// Reserved 500
// Reserved 500
// Reserved 500
// Reserved 500
// Reserved 500
// Reserved 500
// Reserved 500
// Reserved 123
// Reserved 0
// Zzzz... elapsed 14s
// Reserved 0
// Zzzz... elapsed 16s
// Reserved 0
// Total reserved 4123 in 19s and 12 cycles
func (*Tube) SetRetries ¶ added in v1.1.1
func (tube *Tube) SetRetries(maxRetries int, sleepBetweenRetries time.Duration, sleepMultiplier float64)
SetRetries sets the retry policy used in the Put function.
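For instance, assuming sleepMultiplier scales the sleep before each subsequent attempt, a tube could be configured like this (a sketch; the exact backoff semantics are an assumption):

	// Allow up to 5 retries, starting at 100ms between attempts and
	// scaling the sleep by 2x each time (assumed semantics of sleepMultiplier).
	tube.SetRetries(5, 100*time.Millisecond, 2.0)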
func (*Tube) UpdateJobBody ¶ added in v1.0.4
UpdateJobBody updates the body of a job without any other changes to the job metadata. This function is supposed to be called only if the job is already "owned" by the client and the client is going to keep it. The use case is updating a job after refreshing a JWE token as a result of key rotation.
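A sketch of that use case, assuming the handler already holds the job and refreshToken is a hypothetical application function that returns the refreshed, re-encrypted payload:

	newBody, err := refreshToken(job.Body) // hypothetical token refresh
	if err != nil {
		beanspike.DefaultErrorLogger.LogError(err)
		return
	}
	// The lz argument mirrors the compression flag of Put.
	if err := job.Tube.UpdateJobBody(job.ID, newBody, false); err != nil {
		beanspike.DefaultErrorLogger.LogError(err)
	}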