Documentation ¶
Overview ¶
Package disque implements a simple client for the Disque in-memory distributed queue [https://github.com/antirez/disque].
Index ¶
- Constants
- type AddRequest
- type Client
- type DialFunc
- type HelloResponse
- type Job
- type Node
- type Pool
- type RedisClient
- func (c *RedisClient) Ack(jobIds ...string) error
- func (c *RedisClient) Add(r AddRequest) (string, error)
- func (c *RedisClient) AddMulti(rs []AddRequest) ([]string, error)
- func (c *RedisClient) Close() error
- func (c *RedisClient) Enqueue(jobIds ...string) error
- func (c *RedisClient) FastAck(jobIds ...string) error
- func (c *RedisClient) Get(timeout time.Duration, queues ...string) (Job, error)
- func (c *RedisClient) GetMulti(count int, timeout time.Duration, queues ...string) ([]Job, error)
- func (c *RedisClient) Hello() (HelloResponse, error)
- func (c *RedisClient) Qlen(qname string) (int, error)
Examples ¶
Constants ¶
const HelloVersionId = 1
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AddRequest ¶
type AddRequest struct {
	// The job about to be added to the queue
	Job Job
	// If the add is not async, this is the maximal timeout for replicating the job to the minimal number of nodes
	Timeout time.Duration
	// the number of nodes the job should be replicated to.
	Replicate int
	// the duration that should elapse before the job is queued by any server
	Delay time.Duration
	// A duration after which, if no ACK is received, the job is put again into the queue for delivery
	Retry time.Duration
	// the max job life in seconds. After this time, the job is deleted even if it was not successfully delivered
	TTL time.Duration
	// if there are already Maxlen messages queued for the specified queue, adding returns an error
	Maxlen int
	// If set to true, the add command returns ASAP and replicates the job to other nodes in the background
	Async bool
}
AddRequest describes how you want a job to be enqueued
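As a rough illustration of what the fields above control, the following sketch maps an AddRequest onto the arguments of Disque's ADDJOB command (queue, payload, timeout in milliseconds, then optional REPLICATE, DELAY, RETRY, TTL, MAXLEN and ASYNC). The types are local copies so the block compiles on its own, and addJobArgs, seconds and sampleCommand are hypothetical helpers; this is not necessarily how the package builds the command.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// Job and AddRequest are local copies of the documented types so the sketch
// is self-contained; the real Job also has unexported fields.
type Job struct {
	Queue string
	Data  []byte
}

type AddRequest struct {
	Job       Job
	Timeout   time.Duration
	Replicate int
	Delay     time.Duration
	Retry     time.Duration
	TTL       time.Duration
	Maxlen    int
	Async     bool
}

// seconds renders a duration as whole seconds.
func seconds(d time.Duration) string {
	return strconv.FormatInt(int64(d/time.Second), 10)
}

// addJobArgs is a hypothetical helper (not part of the package) that turns an
// AddRequest into ADDJOB arguments. Zero-valued options are omitted so that
// the server defaults apply.
func addJobArgs(r AddRequest) []string {
	args := []string{
		"ADDJOB", r.Job.Queue, string(r.Job.Data),
		strconv.FormatInt(r.Timeout.Milliseconds(), 10), // timeout in ms
	}
	if r.Replicate > 0 {
		args = append(args, "REPLICATE", strconv.Itoa(r.Replicate))
	}
	if r.Delay > 0 {
		args = append(args, "DELAY", seconds(r.Delay))
	}
	if r.Retry > 0 {
		args = append(args, "RETRY", seconds(r.Retry))
	}
	if r.TTL > 0 {
		args = append(args, "TTL", seconds(r.TTL))
	}
	if r.Maxlen > 0 {
		args = append(args, "MAXLEN", strconv.Itoa(r.Maxlen))
	}
	if r.Async {
		args = append(args, "ASYNC")
	}
	return args
}

// sampleCommand builds the command line for one illustrative request.
func sampleCommand() string {
	req := AddRequest{
		Job:       Job{Queue: "test1", Data: []byte("foo")},
		Timeout:   100 * time.Millisecond,
		Replicate: 2,
		Retry:     30 * time.Second,
	}
	return strings.Join(addJobArgs(req), " ")
}

func main() {
	fmt.Println(sampleCommand())
}
```

Note that dropping zero values means this sketch cannot express RETRY 0, which Disque treats as a meaningful setting; a real implementation would need a way to tell "unset" apart from zero.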
type Client ¶
type Client interface {
	// Add sends an ADDJOB command to disque, as specified by the AddRequest. Returns the job id or an error
	Add(AddRequest) (string, error)
	// AddMulti sends multiple ADDJOB commands in a pipeline
	AddMulti([]AddRequest) ([]string, error)
	// Get gets one job from any of the given queues, or times out if timeout has elapsed without a job being available. Returns a job or an error
	Get(timeout time.Duration, queues ...string) (Job, error)
	// GetMulti gets <count> jobs from the given queues, or times out if timeout has elapsed without enough jobs being available. Returns a list of jobs or an error
	GetMulti(count int, timeout time.Duration, queues ...string) ([]Job, error)
	// Ack sends an ACKJOB command with the given job ids
	Ack(jobIds ...string) error
	// FastAck sends a FASTACK command with the given job ids. See the disque docs about the
	// difference between ACK and FASTACK
	FastAck(jobIds ...string) error
	// Qlen returns the length of a given queue
	Qlen(qname string) (int, error)
	// Hello is a handshake request with the server, returns a description of the cluster state
	Hello() (HelloResponse, error)
	// Close closes the underlying connection
	Close() error
}
Client is the interface that describes a disque client
Example ¶
pool := NewPool(DialFunc(dial), addr)
client, err := pool.Get()
if err != nil {
	panic(err)
}
defer client.Close()

qname := "test1"

// Create an "add" request with optional parameters.
// TODO: create a builder-style API for this
ja := AddRequest{
	Job: Job{
		Queue: qname,
		Data:  []byte("foo"),
	},
	Timeout:   time.Millisecond * 100,
	Replicate: pool.Size(),
}

// Add the job to the queue
if _, err := client.Add(ja); err != nil {
	panic(err)
}

job, err := client.Get(time.Second, qname)
if err != nil {
	panic(err)
}

fmt.Println(string(job.Data))
Output: foo
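The example above adds and fetches a single job. A consumer would typically also acknowledge each job once it has been handled, so that it is not redelivered after the Retry interval. The sketch below shows that Get-then-Ack loop against an in-memory fake that implements only the two methods it needs. Everything here is a stand-in: the exported ID field on Job is an assumption (the documented Job hides its id in unexported fields), and errTimeout is a hypothetical sentinel, since this page does not say how the client reports a timeout.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Job is a local stand-in for the documented type. The ID field is an
// assumption made for this sketch.
type Job struct {
	Queue string
	Data  []byte
	ID    string
}

// getAcker is the subset of the Client interface used by the loop.
type getAcker interface {
	Get(timeout time.Duration, queues ...string) (Job, error)
	Ack(jobIds ...string) error
}

// errTimeout is a hypothetical sentinel for "no job arrived in time".
var errTimeout = errors.New("timeout waiting for job")

// fakeClient serves jobs from memory and records acknowledged ids.
type fakeClient struct {
	pending []Job
	acked   []string
}

func (f *fakeClient) Get(timeout time.Duration, queues ...string) (Job, error) {
	if len(f.pending) == 0 {
		return Job{}, errTimeout
	}
	j := f.pending[0]
	f.pending = f.pending[1:]
	return j, nil
}

func (f *fakeClient) Ack(jobIds ...string) error {
	f.acked = append(f.acked, jobIds...)
	return nil
}

// drain fetches jobs until Get times out. A job is acknowledged only after
// handle succeeds; a failed job is left unacknowledged.
func drain(c getAcker, queue string, handle func(Job) error) (int, error) {
	processed := 0
	for {
		job, err := c.Get(time.Second, queue)
		if errors.Is(err, errTimeout) {
			return processed, nil
		}
		if err != nil {
			return processed, err
		}
		if err := handle(job); err != nil {
			continue // no ack: the server may deliver the job again
		}
		if err := c.Ack(job.ID); err != nil {
			return processed, err
		}
		processed++
	}
}

// runSample drains three jobs, one of which fails, and reports how many
// were processed and how many ids were acknowledged.
func runSample() (int, int) {
	c := &fakeClient{pending: []Job{
		{Queue: "test1", Data: []byte("ok"), ID: "a"},
		{Queue: "test1", Data: []byte("bad"), ID: "b"},
		{Queue: "test1", Data: []byte("ok"), ID: "c"},
	}}
	n, err := drain(c, "test1", func(j Job) error {
		if string(j.Data) == "bad" {
			return errors.New("bad payload")
		}
		return nil
	})
	if err != nil {
		panic(err)
	}
	return n, len(c.acked)
}

func main() {
	processed, acked := runSample()
	fmt.Println(processed, acked)
}
```

Writing the loop against a small interface rather than RedisClient keeps it testable without a running Disque node.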
type HelloResponse ¶
type HelloResponse struct {
	NodeId string
	Nodes  nodeList
}
HelloResponse is returned from the Hello command and tells us the state of the cluster
type Job ¶
type Job struct {
	// The name of the queue this job is sent/received to/from
	Queue string
	// Job data - this can be anything
	Data []byte
	// contains filtered or unexported fields
}
Job describes a job that is about to be enqueued or is received from the client for processing
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool is a client pool that keeps track of the available servers in the cluster, and retrieves clients connected to random nodes in the cluster. Close a pooled connection to return it to the pool automatically.
func NewPool ¶
NewPool creates a new client pool, with a given redis dial function, and an initial list of ip:port addresses to try connecting to. You should call RefreshNodes after creating the pool to update the list of all nodes in the pool, and optionally call RunRefreshLoop to let the pool do this periodically in the background.
func (*Pool) RefreshNodes ¶
RefreshNodes uses a HELLO call to refresh the node list in the cluster
func (*Pool) RunRefreshLoop ¶
func (p *Pool) RunRefreshLoop()
RunRefreshLoop starts a goroutine that periodically refreshes the node list using HELLO
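For readers unfamiliar with the pattern, a periodic background refresh of this kind can be sketched with a tick channel and a stop channel. This is only an illustration of the general technique: refreshLoop and runSample are hypothetical names, the refresh callback stands in for a HELLO round trip, and the documented RunRefreshLoop takes no arguments, so its interval and shutdown behavior are not described on this page.

```go
package main

import (
	"fmt"
	"time"
)

// refreshLoop calls refresh once per tick until stop is closed.
// A failed refresh is ignored so the previous node list stays in use
// until the next tick.
func refreshLoop(ticks <-chan time.Time, stop <-chan struct{}, refresh func() error) {
	for {
		select {
		case <-ticks:
			_ = refresh()
		case <-stop:
			return
		}
	}
}

// runSample drives the loop with three manual ticks and returns how many
// times refresh ran. Real code would pass time.NewTicker(interval).C.
func runSample() int {
	ticks := make(chan time.Time) // unbuffered: each send waits for the loop
	stop := make(chan struct{})
	done := make(chan struct{})
	count := 0

	go func() {
		defer close(done)
		refreshLoop(ticks, stop, func() error {
			count++ // stand-in for a HELLO call that updates the node list
			return nil
		})
	}()

	for i := 0; i < 3; i++ {
		ticks <- time.Now()
	}
	close(stop)
	<-done // wait for the goroutine so reading count is safe
	return count
}

func main() {
	fmt.Println(runSample())
}
```

Passing the tick channel in, instead of creating a ticker inside the loop, is a design choice that makes the loop deterministic to test.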
func (*Pool) UpdateNodes ¶
func (p *Pool) UpdateNodes(nodes nodeList)
UpdateNodes explicitly sets the nodes of the pool
type RedisClient ¶
type RedisClient struct {
// contains filtered or unexported fields
}
RedisClient implements a redigo based client
func (*RedisClient) Ack ¶
func (c *RedisClient) Ack(jobIds ...string) error
Ack sends an ACKJOB command with the given job ids
func (*RedisClient) Add ¶
func (c *RedisClient) Add(r AddRequest) (string, error)
Add sends an ADDJOB command to disque, as specified by the AddRequest. Returns the job id or an error
func (*RedisClient) AddMulti ¶
func (c *RedisClient) AddMulti(rs []AddRequest) ([]string, error)
AddMulti sends multiple ADDJOB commands in a pipeline
func (*RedisClient) Close ¶
func (c *RedisClient) Close() error
Close closes the underlying connection
func (*RedisClient) Enqueue ¶
func (c *RedisClient) Enqueue(jobIds ...string) error
Enqueue an already existing job by jobId. This can be used for fast retries
func (*RedisClient) FastAck ¶
func (c *RedisClient) FastAck(jobIds ...string) error
FastAck sends a FASTACK command with the given job ids. See the disque docs about the difference between ACK and FASTACK
func (*RedisClient) Get ¶
Get gets one job from any of the given queues, or times out if timeout has elapsed without a job being available. Returns a job or an error
func (*RedisClient) GetMulti ¶
GetMulti gets <count> jobs from the given queues, or times out if timeout has elapsed without enough jobs being available. Returns a list of jobs or an error
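Since Ack and Enqueue both accept several job ids, a batch fetched with GetMulti can be settled in two calls: one Ack for the jobs that succeeded and one Enqueue for those that should be retried quickly. The sketch below shows that flow against an in-memory fake. As before, the exported ID field on Job is an assumption, and batchClient, fakeClient, processBatch and runSample are hypothetical names introduced for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// Job is a local stand-in; the exported ID field is an assumption.
type Job struct {
	Queue string
	Data  []byte
	ID    string
}

// batchClient is the subset of client methods used by processBatch.
type batchClient interface {
	GetMulti(count int, timeout time.Duration, queues ...string) ([]Job, error)
	Ack(jobIds ...string) error
	Enqueue(jobIds ...string) error
}

// fakeClient serves jobs from memory and records what was acked or requeued.
type fakeClient struct {
	pending  []Job
	acked    []string
	requeued []string
}

func (f *fakeClient) GetMulti(count int, timeout time.Duration, queues ...string) ([]Job, error) {
	if count > len(f.pending) {
		count = len(f.pending)
	}
	batch := f.pending[:count]
	f.pending = f.pending[count:]
	return batch, nil
}

func (f *fakeClient) Ack(jobIds ...string) error {
	f.acked = append(f.acked, jobIds...)
	return nil
}

func (f *fakeClient) Enqueue(jobIds ...string) error {
	f.requeued = append(f.requeued, jobIds...)
	return nil
}

// processBatch fetches up to count jobs, runs handle on each, then sends one
// Ack for the successes and one Enqueue for the failures.
func processBatch(c batchClient, queue string, count int, handle func(Job) error) error {
	jobs, err := c.GetMulti(count, time.Second, queue)
	if err != nil {
		return err
	}
	var done, failed []string
	for _, j := range jobs {
		if err := handle(j); err != nil {
			failed = append(failed, j.ID)
			continue
		}
		done = append(done, j.ID)
	}
	if len(done) > 0 {
		if err := c.Ack(done...); err != nil {
			return err
		}
	}
	if len(failed) > 0 {
		return c.Enqueue(failed...)
	}
	return nil
}

// runSample processes a batch of three jobs where one fails, and returns
// the number of acknowledged and requeued ids.
func runSample() (int, int) {
	c := &fakeClient{pending: []Job{
		{Queue: "test1", Data: []byte("ok"), ID: "a"},
		{Queue: "test1", Data: []byte("bad"), ID: "b"},
		{Queue: "test1", Data: []byte("ok"), ID: "c"},
	}}
	err := processBatch(c, "test1", 3, func(j Job) error {
		if string(j.Data) == "bad" {
			return fmt.Errorf("cannot handle job %s", j.ID)
		}
		return nil
	})
	if err != nil {
		panic(err)
	}
	return len(c.acked), len(c.requeued)
}

func main() {
	acked, requeued := runSample()
	fmt.Println(acked, requeued)
}
```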
func (*RedisClient) Hello ¶
func (c *RedisClient) Hello() (HelloResponse, error)
Hello is a handshake request with the server. It returns a description of the cluster state. TODO: implement this