disque

package module
v0.0.0-...-8b96326 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 24, 2020 License: MIT Imports: 4 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Disque

type Disque struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(o *redis.Options) *Disque

NewClient creates a new Disque client

func (*Disque) AckJob

func (me *Disque) AckJob(ctx context.Context, ID ...string) (err error)

AckJob command acknowledges the execution of one or more jobs via job IDs.

func (*Disque) AddJob

func (me *Disque) AddJob(ctx context.Context, queueName string, data string, options map[string]string) (ID string, err error)

AddJob adds a job onto a Disque

ADDJOB queue_name job <ms-timeout>

[REPLICATE <count>]
[DELAY <sec>]
[RETRY <sec>]
[TTL <sec>]
[MAXLEN <count>]
[ASYNC]

Example:

options := make(map[string]string)
options["ms-timeout"] = msec
options["TTL"] = "600"
options["ASYNC"] = "true"
d.AddJob("queue_name", "data", options)

func (*Disque) DeleteJob

func (me *Disque) DeleteJob(ctx context.Context, ID ...string) (err error)

DeleteJob completely delete jobs from the queue

func (*Disque) DequeueJob

func (me *Disque) DequeueJob(ctx context.Context, ID ...string) (err error)

DequeueJob command removes one or more jobs via job IDs from the queue.

func (*Disque) EnqueueJob

func (me *Disque) EnqueueJob(ctx context.Context, ID ...string) (err error)

EnqueueJob command queues jobs if not already queued one or more jobs via job IDs.

func (*Disque) FastAckJob

func (me *Disque) FastAckJob(ctx context.Context, ID ...string) (err error)

FastAckJob command acknowledges the execution of one or more jobs via job IDs.

func (*Disque) GetJob

func (me *Disque) GetJob(ctx context.Context, timeout time.Duration, queueName ...string) (job *Job, err error)

GetJob a single job from a Disque queue.

func (*Disque) GetJobDetails

func (me *Disque) GetJobDetails(ctx context.Context, ID string) (details *JobDetails, err error)

GetJobDetails will retrieve multiple jobs from a Disque queue.

func (*Disque) GetJobs

func (me *Disque) GetJobs(ctx context.Context, count int, timeout time.Duration, queueName ...string) (jobs []*Job, err error)

GetJobs will retrieve multiple jobs from a Disque queue.

func (*Disque) JobScan

func (me *Disque) JobScan(ctx context.Context, queueName string, count int, idOnly bool) (jobs []*JobDetails, err error)

JobScan scans queue for jobs count is the number of jobs to return -1 == all, 0 == default (currently 100), > 0 count WARNING: idOnly must currently be set to true, otherwise an error is reported

func (*Disque) NackJob

func (me *Disque) NackJob(ctx context.Context, ID ...string) (err error)

NackJob command tells Disque to put the job back in the queue ASAP

func (*Disque) QueueLength

func (me *Disque) QueueLength(ctx context.Context, queueName string) (queueLength int64, err error)

QueueLength will return the number of items in specified queue

func (*Disque) QueueList

func (me *Disque) QueueList(ctx context.Context) (queues []string, err error)

QueueList will return a list of all Disque queues

func (*Disque) QueueStat

func (me *Disque) QueueStat(ctx context.Context, queueName string) (stats *QueueDetails, err error)

QueueStat will return the statistics for specified queue

func (*Disque) WorkingJob

func (me *Disque) WorkingJob(ctx context.Context, ID string) (postponed int64, err error)

WorkingJob command The next delivery is postponed for the job retry time, however the command works in a best effort way since there is no way to guarantee during failures that another node in a different network partition won't perform a delivery of the same job. Another limitation of the WORKING command is that it cannot be sent to nodes not knowing about this particular job. In such a case the command replies with a NOJOB error. Similarly, if the job is already acknowledged an error is returned. Note that the WORKING command is refused by Disque nodes if 50% of the job time to live has already elapsed. This limitation makes Disque safer since usually the retry time is much smaller than the time-to-live of a job, so it can't happen that a set of broken workers monopolize a job with WORKING and never process it. After 50% of the TTL has elapsed, the job will be delivered to other workers anyway.

Note that WORKING returns the number of seconds you (likely) postponed the message visibility for other workers (the command basically returns the retry time of the job)

type Job

type Job struct {
	ID            string
	Data          string
	QueueName     string
	NackCount     int64
	DeliveryCount int64
}

Job represents a Disque job

type JobDetails

type JobDetails struct {
	ID            string
	Data          string
	QueueName     string
	NackCount     int64
	DeliveryCount int64

	State             string
	ReplicationFactor int
	TTL               time.Duration
	CreatedAt         time.Time
	Delay             time.Duration
	Retry             time.Duration
	NodesDelivered    []string
	NodesConfirmed    []string
	NextRequeueWithin time.Duration
	NextAwakeWithin   time.Duration
}

JobDetails contains details for a specific Disque job

type QueueDetails

type QueueDetails struct {
	QueueName  string
	Length     int64
	Age        int64
	Idle       int64
	Blocked    int64
	ImportFrom []string
	ImportRate int64
	JobsIn     int64
	JobsOut    int64
	Pause      string
}

QueueDetails contains details for a specific Disque queue

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL