disque

package
v0.0.0-...-5d0e3c9
Published: Aug 16, 2016 License: BSD-2-Clause Imports: 10 Imported by: 9

Documentation

Overview

Package disque implements a simple client for the Disque in-memory distributed queue [https://github.com/antirez/disque]

Constants

const HelloVersionId = 1

Variables

This section is empty.

Functions

This section is empty.

Types

type AddRequest

type AddRequest struct {
	// The job about to be added to the queue
	Job Job
	// If the add is not async, this is the maximum time to wait for the job to be replicated to the required number of nodes
	Timeout time.Duration
	// The number of nodes the job should be replicated to
	Replicate int
	// The duration that should elapse before the job is queued by any server
	Delay time.Duration
	// The duration after which, if no ACK is received, the job is put back into the queue for delivery
	Retry time.Duration
	// The maximum job lifetime. After this time, the job is deleted even if it was not successfully delivered
	TTL time.Duration
	// If there are already Maxlen messages queued for the specified queue, adding returns an error
	Maxlen int
	// If set to true, the add command returns as soon as possible and replicates the job to other nodes in the background
	Async bool
}

AddRequest describes how you want a job to be enqueued

type Client

type Client interface {

	// Add sends an ADDJOB command to disque, as specified by the AddRequest. Returns the job id or an error
	Add(AddRequest) (string, error)

	// AddMulti sends multiple ADDJOB in pipeline
	AddMulti([]AddRequest) ([]string, error)

	// Get gets one job from any of the given queues, or times out if timeout has elapsed without a job being available. Returns a job or an error
	Get(timeout time.Duration, queues ...string) (Job, error)

	// GetMulti gets <count> jobs from the given queues, or times out if timeout has elapsed without enough jobs being available. Returns a list of jobs or an error
	GetMulti(count int, timeout time.Duration, queues ...string) ([]Job, error)

	// Ack sends an ACKJOB command with the given job ids
	Ack(jobIds ...string) error

	// FastAck sends a FASTACK command with the given job ids. See the disque docs about the
	// difference between ACK and FASTACK
	FastAck(jobIds ...string) error

	// Qlen returns the length of a given queue
	Qlen(qname string) (int, error)

	// Hello is a handshake request with the server, returns a description of the cluster state
	Hello() (HelloResponse, error)

	// Close closes the underlying connection
	Close() error
}

Client is the interface that describes a disque client

Example
pool := NewPool(DialFunc(dial), addr)
client, err := pool.Get()
if err != nil {
	panic(err)
}
defer client.Close()

qname := "test1"

// Create an "add" request with optional parameters.
// TODO: create a builder-style API for this
ja := AddRequest{
	Job: Job{
		Queue: qname,
		Data:  []byte("foo"),
	},
	Timeout:   time.Millisecond * 100,
	Replicate: pool.Size(),
}

// Add the job to the queue
if _, err := client.Add(ja); err != nil {
	panic(err)
}

job, err := client.Get(time.Second, qname)
if err != nil {
	panic(err)
}

fmt.Println(string(job.Data))
Output:

foo

type DialFunc

type DialFunc func(string) (redis.Conn, error)

DialFunc is a redis dialer function that should be supplied to the pool

type HelloResponse

type HelloResponse struct {
	NodeId string
	Nodes  nodeList
}

HelloResponse is returned from the Hello command and tells us the state of the cluster

type Job

type Job struct {
	// The name of the queue this job is sent/received to/from
	Queue string
	// Job data - this can be anything
	Data []byte
	// contains filtered or unexported fields
}

Job describes a job that is about to be enqueued or is received from the client for processing

func (Job) Id

func (j Job) Id() string

Id returns the job id. The underlying id is only settable by the client, so there is only a getter for it

type Node

type Node struct {
	Id       string
	Addr     string
	Priority int
}

Node describes a node in the cluster, received from Hello

func (Node) IsNull

func (n Node) IsNull() bool

IsNull checks if a node is empty or not

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool is a client pool that keeps track of the available servers in the cluster and returns clients connected to random nodes in the cluster. Close a pooled connection to return it to the pool automatically

func NewPool

func NewPool(f DialFunc, addrs ...string) *Pool

NewPool creates a new client pool, with a given redis dial function, and an initial list of ip:port addresses to try connecting to. You should call RefreshNodes after creating the pool to update the list of all nodes in the pool, and optionally call RunRefreshLoop to let the pool do this periodically in the background

func (*Pool) Close

func (p *Pool) Close() error

Close closes all pools

func (*Pool) Get

func (p *Pool) Get() (Client, error)

Get returns a client, or an error if we could not init one

func (*Pool) RefreshNodes

func (p *Pool) RefreshNodes() error

RefreshNodes uses a HELLO call to refresh the node list in the cluster

func (*Pool) RunRefreshLoop

func (p *Pool) RunRefreshLoop()

RunRefreshLoop starts a goroutine that periodically refreshes the node list using HELLO

func (*Pool) Size

func (p *Pool) Size() int

func (*Pool) UpdateNodes

func (p *Pool) UpdateNodes(nodes nodeList)

UpdateNodes explicitly sets the nodes of the pool

type RedisClient

type RedisClient struct {
	// contains filtered or unexported fields
}

RedisClient implements a redigo based client

func (*RedisClient) Ack

func (c *RedisClient) Ack(jobIds ...string) error

Ack sends an ACKJOB command with the given job ids

func (*RedisClient) Add

func (c *RedisClient) Add(r AddRequest) (string, error)

Add sends an ADDJOB command to disque, as specified by the AddRequest. Returns the job id or an error

func (*RedisClient) AddMulti

func (c *RedisClient) AddMulti(rs []AddRequest) ([]string, error)

AddMulti sends multiple ADDJOB in pipeline

func (*RedisClient) Close

func (c *RedisClient) Close() error

Close closes the underlying connection

func (*RedisClient) Enqueue

func (c *RedisClient) Enqueue(jobIds ...string) error

Enqueue an already existing job by jobId. This can be used for fast retries

func (*RedisClient) FastAck

func (c *RedisClient) FastAck(jobIds ...string) error

FastAck sends a FASTACK command with the given job ids. See the disque docs about the difference between ACK and FASTACK

func (*RedisClient) Get

func (c *RedisClient) Get(timeout time.Duration, queues ...string) (Job, error)

Get gets one job from any of the given queues, or times out if timeout has elapsed without a job being available. Returns a job or an error

func (*RedisClient) GetMulti

func (c *RedisClient) GetMulti(count int, timeout time.Duration, queues ...string) ([]Job, error)

GetMulti gets <count> jobs from the given queues, or times out if timeout has elapsed without enough jobs being available. Returns a list of jobs or an error

func (*RedisClient) Hello

func (c *RedisClient) Hello() (HelloResponse, error)

Hello is a handshake request with the server, returns a description of the cluster state

TODO: implement this

func (*RedisClient) Qlen

func (c *RedisClient) Qlen(qname string) (int, error)

Qlen returns the length of a given queue
