client

package
v0.0.0-...-6ea62f9 Latest
Warning

This package is not in the latest version of its module.
Published: May 10, 2021 License: BSD-3-Clause Imports: 16 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type APIError

type APIError struct {
	Type      ErrType
	Reason    string
	JobID     string
	RequestID string
}

APIError describes an API error. It implements the error interface.

func (*APIError) Error

func (e *APIError) Error() string
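
Several methods on this page (for example Ack and PeekJob) return a concrete *APIError rather than the error interface. The sketch below illustrates the usual Go pitfall this invites: a nil *APIError stored in an error variable is not equal to nil. It uses a local stand-in type that copies the documented fields; the Error message format, the simplified int Type field, and the helpers ackStub and asError are assumptions made for illustration, not the package's code.

```go
package main

import "fmt"

// APIError is a local stand-in with the documented field names.
// The real type lives in the client package; Type is simplified to int here.
type APIError struct {
	Type      int
	Reason    string
	JobID     string
	RequestID string
}

// Error uses a hypothetical format; the real message layout may differ.
func (e *APIError) Error() string {
	return fmt.Sprintf("type=%d reason=%s job=%s req=%s", e.Type, e.Reason, e.JobID, e.RequestID)
}

// ackStub is a hypothetical function shaped like Ack: it returns *APIError.
func ackStub(fail bool) *APIError {
	if fail {
		return &APIError{Type: 1, Reason: "stub failure", JobID: "job-1"}
	}
	return nil
}

// asError converts a *APIError to error without producing a non-nil
// interface value that wraps a nil pointer.
func asError(e *APIError) error {
	if e == nil {
		return nil
	}
	return e
}

func main() {
	var wrong error = ackStub(false) // typed nil: the interface itself is not nil
	fmt.Println(wrong != nil)        // true, although no error occurred
	fmt.Println(asError(ackStub(false)) == nil)
	fmt.Println(asError(ackStub(true)))
}
```

Comparing the returned pointer directly (apiErr := c.Ack(...); if apiErr != nil) avoids the problem; a conversion helper is only needed when the value has to travel as an error.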

type Consumer

type Consumer struct {
	*LmstfyClient

	ErrorCallback func(err error)
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(cfg *ConsumerConfig) *Consumer

NewConsumer creates a new instance of the high-level consumer.

func (*Consumer) Close

func (c *Consumer) Close()

Close stops all polling threads.

func (*Consumer) Receive

func (c *Consumer) Receive(ctx context.Context, fn func(ctx context.Context, job *Job)) error

Receive creates the polling threads and waits until all of them finish.
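
The description suggests a worker pattern: start a fixed number of goroutines, run the handler for every job, and block until all goroutines are done. The sketch below models only that pattern and does not use the package. The function runWorkers, the string job type, and the channel that replaces the lmstfy server are all hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// runWorkers is a hypothetical stand-in for the pattern described for Receive:
// it starts `threads` goroutines and returns only after all of them finish.
// Jobs come from a channel here instead of an lmstfy server.
func runWorkers(ctx context.Context, threads int, jobs <-chan string, fn func(ctx context.Context, job string)) {
	var wg sync.WaitGroup
	for i := 0; i < threads; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return // stop polling when the context is cancelled
				case job, ok := <-jobs:
					if !ok {
						return // the job source is exhausted
					}
					fn(ctx, job)
				}
			}
		}()
	}
	wg.Wait()
}

// countJobs feeds n jobs through runWorkers and reports how many were handled.
func countJobs(n, threads int) int64 {
	jobs := make(chan string, n)
	for i := 0; i < n; i++ {
		jobs <- fmt.Sprintf("job-%d", i)
	}
	close(jobs)

	var handled int64
	runWorkers(context.Background(), threads, jobs, func(ctx context.Context, job string) {
		atomic.AddInt64(&handled, 1) // handlers run concurrently, so count atomically
	})
	return handled
}

func main() {
	fmt.Println(countJobs(10, 3)) // prints 10
}
```

Because handlers run on several goroutines at once, any state they share needs synchronization, as the atomic counter shows.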

type ConsumerConfig

type ConsumerConfig struct {
	Host      string
	Port      int
	Namespace string
	Token     string
	Queues    []string
	TTR       uint32

	Threads int
}

type ErrType

type ErrType int

ErrType is the API error type. It implements the Stringer interface.

const (
	RequestErr ErrType = iota + 1
	ResponseErr
)

func (ErrType) String

func (t ErrType) String() string
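
Because the constants start at iota + 1, RequestErr is 1 and ResponseErr is 2, so the zero value of ErrType matches neither. The sketch below repeats the documented declaration to show this; the String method and its labels are hypothetical, since the real strings are not shown on this page.

```go
package main

import "fmt"

// ErrType mirrors the documented declaration.
type ErrType int

const (
	RequestErr ErrType = iota + 1 // 1
	ResponseErr                   // 2
)

// String is a hypothetical implementation; the package's labels may differ.
func (t ErrType) String() string {
	switch t {
	case RequestErr:
		return "request error"
	case ResponseErr:
		return "response error"
	default:
		return "unknown error"
	}
}

func main() {
	var zero ErrType
	fmt.Println(int(RequestErr), int(ResponseErr))         // prints "1 2"
	fmt.Println(zero == RequestErr || zero == ResponseErr) // prints "false"
	fmt.Println(RequestErr)                                // fmt uses the String method
}
```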

type Job

type Job struct {
	*LmstfyClient

	Namespace string `json:"namespace"`
	Queue     string `json:"queue"`
	Data      []byte `json:"data"`
	ID        string `json:"job_id"`
	TTL       int64  `json:"ttl"`
	ElapsedMS int64  `json:"elapsed_ms"`
}

func (*Job) Ack

func (job *Job) Ack() *APIError

Ack allows users to acknowledge the job.

type LmstfyClient

type LmstfyClient struct {
	Namespace string
	Token     string
	Host      string
	Port      int
	// contains filtered or unexported fields
}

func NewLmstfyClient

func NewLmstfyClient(host string, port int, namespace, token string) *LmstfyClient

func NewLmstfyWithClient

func NewLmstfyWithClient(cli *http.Client, host string, port int, namespace, token string) *LmstfyClient

NewLmstfyWithClient allows a user-defined HTTP client to be used when setting up the lmstfy client.
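
One reason to supply your own *http.Client is timeout control. http.Client.Timeout bounds the whole request, so, assuming each consume call is a single HTTP request held open for up to timeoutSecond, the client timeout should exceed the long-polling wait. The helper newPollingHTTPClient and its 5-second margin are hypothetical choices for this sketch.

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// newPollingHTTPClient returns an *http.Client whose overall timeout leaves
// headroom above the long-polling wait. The 5-second margin is an arbitrary
// value chosen for this sketch.
func newPollingHTTPClient(pollSeconds uint32) *http.Client {
	const margin = 5 * time.Second
	return &http.Client{
		Timeout: time.Duration(pollSeconds)*time.Second + margin,
	}
}

func main() {
	cli := newPollingHTTPClient(10)
	fmt.Println(cli.Timeout) // prints 15s

	// The client could then be passed to the documented constructor, e.g.
	//   client.NewLmstfyWithClient(cli, host, port, namespace, token)
	// where host, port, namespace and token are your own values.
}
```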

func (*LmstfyClient) Ack

func (c *LmstfyClient) Ack(queue, jobID string) *APIError

Ack marks a job as finished, so that it will not be retried by other consumers.

func (*LmstfyClient) BatchConsume

func (c *LmstfyClient) BatchConsume(queues []string, count, ttrSecond, timeoutSecond uint32) (jobs []*Job, e error)

BatchConsume consumes a batch of jobs. Consuming decreases each job's tries by 1 first.

  • ttrSecond is the time-to-run of these jobs. If the jobs are not finished before the TTR expires, they are released for consuming again if `(tries - 1) > 0`.
  • count is the number of jobs requested. If it is zero or greater than 100, this method returns an error. Otherwise it returns between 0 and count jobs.
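
The count rule can be checked before making the call. The sketch below is a pre-flight check that mirrors the documented bounds; checkBatchCount and its error message are hypothetical and not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// maxBatch is the upper bound stated in the documentation.
const maxBatch = 100

// checkBatchCount is a hypothetical pre-flight check mirroring the documented
// rule: a count of zero or more than 100 is rejected.
func checkBatchCount(count uint32) error {
	if count == 0 || count > maxBatch {
		return errors.New("count must be between 1 and 100")
	}
	return nil
}

func main() {
	for _, c := range []uint32{0, 1, 100, 101} {
		fmt.Println(c, checkBatchCount(c) == nil)
	}
}
```

A valid count is only an upper bound: the returned slice may hold fewer jobs, or none, so callers should range over the result rather than index into it.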

func (*LmstfyClient) BatchConsumeWithFreezeTries

func (c *LmstfyClient) BatchConsumeWithFreezeTries(queues []string, count, ttrSecond, timeoutSecond uint32) (jobs []*Job, e error)

BatchConsumeWithFreezeTries consumes a batch of jobs. Consuming with freeze tries does not decrease the jobs' tries.

  • ttrSecond is the time-to-run of these jobs. If the jobs are not finished before the TTR expires, they are released for consuming again if `(tries - 1) > 0`.
  • count is the number of jobs requested. If it is zero or greater than 100, this method returns an error. Otherwise it returns between 0 and count jobs.

func (*LmstfyClient) BatchPublish

func (c *LmstfyClient) BatchPublish(queue string, jobs []interface{}, ttlSecond uint32, tries uint16, delaySecond uint32) (jobIDs []string, e error)

BatchPublish publishes multiple jobs at once.

  • ttlSecond is the time-to-live of the job. If it is zero, the job does not expire; if it is positive, the value is the TTL.
  • tries is the maximum number of times the job can be fetched.
  • delaySecond is the duration before the job is released for consuming. When it is zero, no delay is applied.

func (*LmstfyClient) ConfigRetry

func (c *LmstfyClient) ConfigRetry(retryCount int, backOffMillisecond int)

func (*LmstfyClient) Consume

func (c *LmstfyClient) Consume(queue string, ttrSecond, timeoutSecond uint32) (job *Job, e error)

Consume consumes a job. Consuming decreases the job's tries by 1 first.

  • ttrSecond is the time-to-run of the job. If the job is not finished before the TTR expires, the job is released for consuming again if `(tries - 1) > 0`.
  • timeoutSecond is the long-polling wait time. If it is zero, this method returns immediately, with or without a job; if it is positive, this method polls for a new job until the timeout expires.
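
The interaction between tries and TTR can be followed with a toy in-memory model. The sketch below is not the package's implementation; the job type and its methods are hypothetical and only encode the rule as documented: tries is decreased on consume, and an unfinished job is released again after the TTR only while tries remain.

```go
package main

import "fmt"

// job is a toy in-memory model, not the package's Job type.
type job struct {
	tries uint16
	ready bool
}

// consume models the documented rule: tries is decreased by 1 first.
func (j *job) consume() bool {
	if !j.ready || j.tries == 0 {
		return false
	}
	j.tries--
	j.ready = false
	return true
}

// ttrExpired models a TTR expiry without an ack: the job is released
// again only if tries remain after the decrement.
func (j *job) ttrExpired() {
	if j.tries > 0 {
		j.ready = true
	}
}

// deliveries counts how often a job that is never acked gets handed out.
func deliveries(tries uint16) int {
	j := &job{tries: tries, ready: true}
	n := 0
	for j.consume() {
		n++
		j.ttrExpired()
	}
	return n
}

func main() {
	fmt.Println(deliveries(1), deliveries(3)) // prints "1 3"
}
```

In this model a job published with tries = 1 is handed out exactly once, even if the consumer never acks it.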

func (*LmstfyClient) ConsumeFromQueues

func (c *LmstfyClient) ConsumeFromQueues(ttrSecond, timeoutSecond uint32, queues ...string) (job *Job, e error)

ConsumeFromQueues consumes from multiple queues with priority. The order of the queues in the parameters implies the priority, e.g.

ConsumeFromQueues(120, 5, "queue-a", "queue-b", "queue-c")

If all the queues have jobs to be fetched, the job in `queue-a` will be returned.
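
The priority rule can be modelled without a server. The sketch below picks the head of the first non-empty queue in argument order; that the call falls through to the next queue when an earlier one is empty is an assumption drawn from the word "priority". The function pickByPriority and the pending map are hypothetical.

```go
package main

import "fmt"

// pickByPriority is a toy model: it returns the head of the first non-empty
// queue in argument order. pending maps a queue name to its waiting job IDs.
func pickByPriority(pending map[string][]string, queues ...string) (queue, jobID string, ok bool) {
	for _, q := range queues {
		if jobs := pending[q]; len(jobs) > 0 {
			return q, jobs[0], true
		}
	}
	return "", "", false
}

func main() {
	pending := map[string][]string{
		"queue-a": {"a1"},
		"queue-b": {"b1", "b2"},
		"queue-c": {"c1"},
	}
	q, id, _ := pickByPriority(pending, "queue-a", "queue-b", "queue-c")
	fmt.Println(q, id) // prints "queue-a a1"

	pending["queue-a"] = nil // queue-a drained
	q, id, _ = pickByPriority(pending, "queue-a", "queue-b", "queue-c")
	fmt.Println(q, id) // prints "queue-b b1"
}
```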

func (*LmstfyClient) ConsumeFromQueuesWithFreezeTries

func (c *LmstfyClient) ConsumeFromQueuesWithFreezeTries(ttrSecond, timeoutSecond uint32, queues ...string) (job *Job, e error)

func (*LmstfyClient) ConsumeWithFreezeTries

func (c *LmstfyClient) ConsumeWithFreezeTries(queue string, ttrSecond, timeoutSecond uint32) (job *Job, e error)

ConsumeWithFreezeTries consumes a job. Consuming with freeze tries does not decrease the job's tries.

  • ttrSecond is the time-to-run of the job. If the job is not finished before the TTR expires, the job is released for consuming again if `(tries - 1) > 0`.
  • timeoutSecond is the long-polling wait time. If it is zero, this method returns immediately, with or without a job; if it is positive, this method polls for a new job until the timeout expires.

func (*LmstfyClient) DeleteDeadLetter

func (c *LmstfyClient) DeleteDeadLetter(queue string, limit int64) *APIError

func (*LmstfyClient) EnableErrorOnNilJob

func (c *LmstfyClient) EnableErrorOnNilJob()

EnableErrorOnNilJob makes the client return an error when the job is nil (for example, because the queue was not found).
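
The description implies that, without this option, a consume call can return a nil job together with a nil error. A minimal sketch of the guard this calls for follows. It uses a trimmed local Job type and a function type shaped like the documented Consume method so that it runs without the package; handleOnce, the stub functions, and the 30-second TTR are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Job is a trimmed local stand-in for the documented Job type.
type Job struct {
	ID   string
	Data []byte
}

// consumeFunc has the shape of the documented Consume method,
// using the local Job type.
type consumeFunc func(queue string, ttrSecond, timeoutSecond uint32) (*Job, error)

// handleOnce consumes at most one job and guards against a nil job
// that arrives without an error.
func handleOnce(consume consumeFunc, queue string) (string, error) {
	job, err := consume(queue, 30, 0)
	if err != nil {
		return "", err
	}
	if job == nil {
		return "", nil // nothing to do; not treated as a failure here
	}
	return job.ID, nil
}

func main() {
	empty := func(string, uint32, uint32) (*Job, error) { return nil, nil }
	one := func(string, uint32, uint32) (*Job, error) { return &Job{ID: "job-1"}, nil }
	failing := func(string, uint32, uint32) (*Job, error) { return nil, errors.New("stub error") }

	fmt.Println(handleOnce(empty, "q"))
	fmt.Println(handleOnce(one, "q"))
	fmt.Println(handleOnce(failing, "q"))
}
```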

func (*LmstfyClient) PeekDeadLetter

func (c *LmstfyClient) PeekDeadLetter(queue string) (deadLetterSize int, deadLetterHead string, e *APIError)

PeekDeadLetter peeks at the dead letter of the queue, returning its size and head.

func (*LmstfyClient) PeekJob

func (c *LmstfyClient) PeekJob(queue, jobID string) (job *Job, e *APIError)

PeekJob peeks at the data of a specific job.

func (*LmstfyClient) PeekQueue

func (c *LmstfyClient) PeekQueue(queue string) (job *Job, e *APIError)

PeekQueue peeks at the job at the head of the queue.

func (*LmstfyClient) Publish

func (c *LmstfyClient) Publish(queue string, data []byte, ttlSecond uint32, tries uint16, delaySecond uint32) (jobID string, e error)

Publish publishes a new job to the queue.

  • ttlSecond is the time-to-live of the job. If it is zero, the job does not expire; if it is positive, the value is the TTL.
  • tries is the maximum number of times the job can be fetched.
  • delaySecond is the duration before the job is released for consuming. When it is zero, no delay is applied.
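
To show how the three parameters line up in a call, the sketch below declares a small interface with the documented Publish signature and drives it with an in-memory fake, so it runs without a server. The publisher interface, fakePublisher, publishDelayed, and the chosen TTL and tries values are hypothetical.

```go
package main

import "fmt"

// publisher captures only the documented Publish signature, so the sketch
// compiles without the client package.
type publisher interface {
	Publish(queue string, data []byte, ttlSecond uint32, tries uint16, delaySecond uint32) (jobID string, e error)
}

// fakePublisher is a hypothetical in-memory stand-in that records the last call.
type fakePublisher struct {
	ttl, delay uint32
	tries      uint16
	n          int
}

func (f *fakePublisher) Publish(queue string, data []byte, ttlSecond uint32, tries uint16, delaySecond uint32) (string, error) {
	f.ttl, f.tries, f.delay = ttlSecond, tries, delaySecond
	f.n++
	return fmt.Sprintf("fake-%d", f.n), nil
}

// publishDelayed publishes a job that expires after one hour, may be fetched
// up to 3 times, and becomes consumable after delaySecond seconds.
// The one-hour TTL and the 3 tries are arbitrary values for this sketch.
func publishDelayed(p publisher, queue string, data []byte, delaySecond uint32) (string, error) {
	const ttlSecond = 3600
	const tries = 3
	return p.Publish(queue, data, ttlSecond, tries, delaySecond)
}

func main() {
	f := &fakePublisher{}
	id, err := publishDelayed(f, "test-queue", []byte("hello"), 10)
	fmt.Println(id, err, f.ttl, f.tries, f.delay) // prints "fake-1 <nil> 3600 3 10"
}
```

Since *LmstfyClient has a method with this signature, a value returned by NewLmstfyClient should satisfy the same interface, which keeps calling code testable against a fake.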

func (*LmstfyClient) QueueSize

func (c *LmstfyClient) QueueSize(queue string) (int, *APIError)

QueueSize returns the queue size, i.e. how many jobs are ready for consuming.

func (*LmstfyClient) RePublish

func (c *LmstfyClient) RePublish(job *Job, ttlSecond uint32, tries uint16, delaySecond uint32) (jobID string, e error)

RePublish deletes (acks) the job from the queue and publishes it again.

  • ttlSecond is the time-to-live of the job. If it is zero, the job does not expire; if it is positive, the value is the TTL.
  • tries is the maximum number of times the job can be fetched.
  • delaySecond is the duration before the job is released for consuming. When it is zero, no delay is applied.

func (*LmstfyClient) RespawnDeadLetter

func (c *LmstfyClient) RespawnDeadLetter(queue string, limit, ttlSecond int64) (count int, e *APIError)
