client

package
v1.0.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 31, 2021 License: MIT Imports: 12 Imported by: 0

README

Lmstfy client

Usage

Initialize
import github.com/bitleak/lmstfy/client

c := client.NewLmstfyClient(host, port, namespace, token)

c.ConfigRetry(3, 50) // optional, config the client to retry when some errors happened. retry 3 times with 50ms interval 
Producer example
// Publish a job with ttl==forever, tries==3, delay==5s
jobID, err := c.Publish("q1", []byte("hello"), 0, 3, 5)
Consumer example
// Consume a job from the q1, if there's not job availble, wait until 12s passed (polling).
// And if this consumer fail to ACK the job in 10s, the job can be retried by other consumers.
job, err := c.Consume("q1", 10, 12)
if err != nil {
    panic(err)
}

// Do something with the `job`
fmt.Println(string(job.Data))

err := c.Ack("q1", job.ID)
if err != nil {
    panic(err)
}
// Consume 5 jobs from the q1, if there's not job availble, wait until 12s passed (polling).
// If there are some jobs but not enough 5, return jobs as much as possible.
// And if this consumer fail to ACK any job in 10s, the job can be retried by other consumers.
jobs, err := c.BatchConsume("q1", 5, 10, 12)
if err != nil {
    panic(err)
}

// Do something with the `job`
for _, job := range jobs {
    fmt.Println(string(job.Data))
    err := c.Ack("q1", job.ID)
    if err != nil {
        panic(err)
    }
}

CAUTION: consume would return nil job and error when queue was empty or not found, you can enable the client option to return error when the job is nil by revoking EnableErrorOnNilJob() function.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type APIError

type APIError struct {
	Type      ErrType
	Reason    string
	JobID     string
	RequestID string
}

API error. implements the error interface.

func (*APIError) Error

func (e *APIError) Error() string

type ErrType

type ErrType int

API error type. implements the Stringer interface.

const (
	RequestErr ErrType = iota + 1
	ResponseErr
)

func (ErrType) String

func (t ErrType) String() string

type Job

type Job struct {
	Namespace   string `json:"namespace"`
	Queue       string `json:"queue"`
	Data        []byte `json:"data"`
	ID          string `json:"job_id"`
	TTL         int64  `json:"ttl"`
	ElapsedMS   int64  `json:"elapsed_ms"`
	RemainTries int64  `json:"remain_tries"`
}

type LmstfyClient

type LmstfyClient struct {
	Namespace string
	Token     string
	Host      string
	Port      int
	// contains filtered or unexported fields
}

func NewLmstfyClient

func NewLmstfyClient(host string, port int, namespace, token string) *LmstfyClient

func NewLmstfyWithClient added in v1.0.5

func NewLmstfyWithClient(cli *http.Client, host string, port int, namespace, token string) *LmstfyClient

NewLmstfyWithClient allow using user defined http client to setup the lmstfy client

func (*LmstfyClient) Ack

func (c *LmstfyClient) Ack(queue, jobID string) *APIError

Mark a job as finished, so it won't be retried by others.

func (*LmstfyClient) BatchConsume

func (c *LmstfyClient) BatchConsume(queues []string, count, ttrSecond, timeoutSecond uint32) (jobs []*Job, e error)

BatchConsume consume some jobs. Consuming will decrease these jobs tries by 1 first.

  • ttrSecond is the time-to-run of these jobs. If these jobs are not finished before the TTR expires, these job will be released for consuming again if the `(tries - 1) > 0`.
  • count is the job count of this consume. If it's zero or over 100, this method will return an error. If it's positive, this method would return some jobs, and it's count is between 0 and count.

func (*LmstfyClient) BatchConsumeWithFreezeTries added in v1.0.6

func (c *LmstfyClient) BatchConsumeWithFreezeTries(queues []string, count, ttrSecond, timeoutSecond uint32) (jobs []*Job, e error)

BatchConsume consume some jobs. Consuming with freeze tries will not decrease these jobs tries.

  • ttrSecond is the time-to-run of these jobs. If these jobs are not finished before the TTR expires, these job will be released for consuming again if the `(tries - 1) > 0`.
  • count is the job count of this consume. If it's zero or over 100, this method will return an error. If it's positive, this method would return some jobs, and it's count is between 0 and count.

func (*LmstfyClient) BatchPublish added in v1.0.3

func (c *LmstfyClient) BatchPublish(queue string, jobs []interface{}, ttlSecond uint32, tries uint16, delaySecond uint32) (jobIDs []string, e error)

BatchPublish publish lots of jobs at one time

  • ttlSecond is the time-to-live of the job. If it's zero, job won't expire; if it's positive, the value is the TTL.
  • tries is the maximum times the job can be fetched.
  • delaySecond is the duration before the job is released for consuming. When it's zero, no delay is applied.

func (*LmstfyClient) ConfigRetry

func (c *LmstfyClient) ConfigRetry(retryCount int, backOffMillisecond int)

func (*LmstfyClient) Consume

func (c *LmstfyClient) Consume(queue string, ttrSecond, timeoutSecond uint32) (job *Job, e error)

Consume a job. Consuming will decrease the job's tries by 1 first.

  • ttrSecond is the time-to-run of the job. If the job is not finished before the TTR expires, the job will be released for consuming again if the `(tries - 1) > 0`.
  • timeoutSecond is the long-polling wait time. If it's zero, this method will return immediately with or without a job; if it's positive, this method would polling for new job until timeout.

func (*LmstfyClient) ConsumeFromQueues

func (c *LmstfyClient) ConsumeFromQueues(ttrSecond, timeoutSecond uint32, queues ...string) (job *Job, e error)

Consume from multiple queues with priority. The order of the queues in the params implies the priority. eg.

ConsumeFromQueues(120, 5, "queue-a", "queue-b", "queue-c")

if all the queues have jobs to be fetched, the job in `queue-a` will be return.

func (*LmstfyClient) ConsumeFromQueuesWithFreezeTries added in v1.0.7

func (c *LmstfyClient) ConsumeFromQueuesWithFreezeTries(ttrSecond, timeoutSecond uint32, queues ...string) (job *Job, e error)

func (*LmstfyClient) ConsumeWithFreezeTries added in v1.0.6

func (c *LmstfyClient) ConsumeWithFreezeTries(queue string, ttrSecond, timeoutSecond uint32) (job *Job, e error)

ConsumeWithFreezeTries a job. Consuming with retries will not decrease the job's tries.

  • ttrSecond is the time-to-run of the job. If the job is not finished before the TTR expires, the job will be released for consuming again if the `(tries - 1) > 0`.
  • timeoutSecond is the long-polling wait time. If it's zero, this method will return immediately with or without a job; if it's positive, this method would polling for new job until timeout.

func (*LmstfyClient) DeleteDeadLetter added in v1.0.6

func (c *LmstfyClient) DeleteDeadLetter(queue string, limit int64) *APIError

func (*LmstfyClient) EnableErrorOnNilJob added in v1.0.3

func (c *LmstfyClient) EnableErrorOnNilJob()

EnableErrorOnNilJob would make the client return error when the job was nil(maybe queue was not found)

func (*LmstfyClient) PeekDeadLetter

func (c *LmstfyClient) PeekDeadLetter(queue string) (deadLetterSize int, deadLetterHead string, e *APIError)

Peek the deadletter of the queue

func (*LmstfyClient) PeekJob

func (c *LmstfyClient) PeekJob(queue, jobID string) (job *Job, e *APIError)

Peek a specific job data

func (*LmstfyClient) PeekQueue

func (c *LmstfyClient) PeekQueue(queue string) (job *Job, e *APIError)

Peek the job in the head of the queue

func (*LmstfyClient) Publish

func (c *LmstfyClient) Publish(queue string, data []byte, ttlSecond uint32, tries uint16, delaySecond uint32) (jobID string, e error)

Publish a new job to the queue.

  • ttlSecond is the time-to-live of the job. If it's zero, job won't expire; if it's positive, the value is the TTL.
  • tries is the maximum times the job can be fetched.
  • delaySecond is the duration before the job is released for consuming. When it's zero, no delay is applied.

func (*LmstfyClient) QueueSize

func (c *LmstfyClient) QueueSize(queue string) (int, *APIError)

Get queue size. how many jobs are ready for consuming

func (*LmstfyClient) RePublish

func (c *LmstfyClient) RePublish(job *Job, ttlSecond uint32, tries uint16, delaySecond uint32) (jobID string, e error)

RePublish delete(ack) the job of the queue and publish the job again.

  • ttlSecond is the time-to-live of the job. If it's zero, job won't expire; if it's positive, the value is the TTL.
  • tries is the maximum times the job can be fetched.
  • delaySecond is the duration before the job is released for consuming. When it's zero, no delay is applied.

func (*LmstfyClient) RespawnDeadLetter

func (c *LmstfyClient) RespawnDeadLetter(queue string, limit, ttlSecond int64) (count int, e *APIError)

Directories

Path Synopsis
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL