Documentation
¶
Overview ¶
Package queue implements methods for working with Tarantool's queue implementations.
Since: 1.5.
See also ¶
* Tarantool queue module https://github.com/tarantool/queue
Example (SimpleQueue) ¶
Example demonstrates operations like Put and Take with a queue.
// Describe the tube: a non-temporary FIFO queue with a default task Ttl of
// 10 seconds.
cfg := queue.Cfg{
	Temporary: false,
	Kind:      queue.FIFO,
	Opts: queue.Opts{
		Ttl: 10 * time.Second,
	},
}
opts := tarantool.Opts{
	Timeout: 2500 * time.Millisecond,
	User:    "test",
	Pass:    "test",
}
conn, err := tarantool.Connect("127.0.0.1:3013", opts)
if err != nil {
	fmt.Printf("error in prepare is %v", err)
	return
}
defer conn.Close()

q := queue.New(conn, "test_queue")
if err := q.Create(cfg); err != nil {
	fmt.Printf("error in queue is %v", err)
	return
}
// Drop the tube when the example finishes.
defer q.Drop()

// Put the first task using the tube's default options.
testData1 := "test_data_1"
if _, err = q.Put(testData1); err != nil {
	fmt.Printf("error in put is %v", err)
	return
}

// Put the second task with its own Ttl and keep the handle to bury it later.
testData2 := "test_data_2"
task2, err := q.PutWithOpts(testData2, queue.Opts{Ttl: 2 * time.Second})
if err != nil {
	fmt.Printf("error in put with config is %v", err)
	return
}

// Take the first task and acknowledge it; a failed Ack is reported instead
// of being silently dropped.
task, err := q.Take()
if err != nil {
	fmt.Printf("error in take with is %v", err)
	return
}
if err = task.Ack(); err != nil {
	fmt.Printf("error in ack is %v", err)
	return
}
fmt.Println("data_1: ", task.Data())

if err = task2.Bury(); err != nil {
	fmt.Printf("error in bury with is %v", err)
	return
}

// The first task was acknowledged and the second one is buried, so this take
// is expected to return no task.
task, err = q.TakeTimeout(2 * time.Second)
if err != nil {
	fmt.Printf("error in take with timeout")
	return
}
if task != nil {
	fmt.Printf("Task should be nil, but %d", task.Id())
	return
}
Output: data_1: test_data_1
Example (SimpleQueueCustomMsgPack) ¶
Example demonstrates operations like Put and Take with a queue and a custom MsgPack structure.
Features of the implementation:
- If you use the connection timeout and call TakeTimeout with a parameter greater than the connection timeout, the parameter is reduced to it.
- If you use the connection timeout and call Take, we return an error if we cannot take the task out of the queue within the time corresponding to the connection timeout.
// Setup queue module and start Tarantool instance before execution: // Terminal 1: // $ make deps // $ TEST_TNT_LISTEN=3013 tarantool queue/config.lua // // Terminal 2: // $ cd queue // $ go test -v example_msgpack_test.go package main import ( "fmt" "log" "time" "github.com/tarantool/go-tarantool" "github.com/tarantool/go-tarantool/queue" ) type dummyData struct { Dummy bool } func (c *dummyData) DecodeMsgpack(d *decoder) error { var err error if c.Dummy, err = d.DecodeBool(); err != nil { return err } return nil } func (c *dummyData) EncodeMsgpack(e *encoder) error { return e.EncodeBool(c.Dummy) } // Example demonstrates an operations like Put and Take with queue and custom // MsgPack structure. // // Features of the implementation: // // - If you use the connection timeout and call TakeWithTimeout with a // parameter greater than the connection timeout, the parameter is reduced to // it. // // - If you use the connection timeout and call Take, we return an error if we // cannot take the task out of the queue within the time corresponding to the // connection timeout. func main() { opts := tarantool.Opts{ Reconnect: time.Second, Timeout: 2500 * time.Millisecond, MaxReconnects: 5, User: "test", Pass: "test", } conn, err := tarantool.Connect("127.0.0.1:3013", opts) if err != nil { log.Fatalf("connection: %s", err) return } defer conn.Close() cfg := queue.Cfg{ Temporary: true, IfNotExists: true, Kind: queue.FIFO, Opts: queue.Opts{ Ttl: 10 * time.Second, Ttr: 5 * time.Second, Delay: 3 * time.Second, Pri: 1, }, } que := queue.New(conn, "test_queue_msgpack") if err = que.Create(cfg); err != nil { log.Fatalf("queue create: %s", err) return } // Put data. task, err := que.Put("test_data") if err != nil { log.Fatalf("put task: %s", err) } fmt.Println("Task id is", task.Id()) // Take data. task, err = que.Take() // Blocking operation. if err != nil { log.Fatalf("take task: %s", err) } fmt.Println("Data is", task.Data()) task.Ack() // Take typed example. 
putData := dummyData{} // Put data. task, err = que.Put(&putData) if err != nil { log.Fatalf("put typed task: %s", err) } fmt.Println("Task id is ", task.Id()) takeData := dummyData{} // Take data. task, err = que.TakeTyped(&takeData) // Blocking operation. if err != nil { log.Fatalf("take take typed: %s", err) } fmt.Println("Data is ", takeData) // Same data. fmt.Println("Data is ", task.Data()) task, err = que.Put([]int{1, 2, 3}) if err != nil { log.Fatalf("Put failed: %s", err) } task.Bury() task, err = que.TakeTimeout(2 * time.Second) if err != nil { log.Fatalf("Take with timeout failed: %s", err) } if task == nil { fmt.Println("Task is nil") } que.Drop() }
Output: Task id is 0 Data is test_data Task id is 0 Data is {false} Data is &{false} Task is nil
Index ¶
- Constants
- type Cfg
- type Opts
- type Queue
- type Task
- func (t *Task) Ack() error
- func (t *Task) Bury() error
- func (t *Task) Data() interface{}
- func (t *Task) DecodeMsgpack(d *decoder) error
- func (t *Task) Delete() error
- func (t *Task) Id() uint64
- func (t *Task) IsBuried() bool
- func (t *Task) IsDelayed() bool
- func (t *Task) IsDone() bool
- func (t *Task) IsReady() bool
- func (t *Task) IsTaken() bool
- func (t *Task) Release() error
- func (t *Task) ReleaseCfg(cfg Opts) error
- func (t *Task) Status() string
Examples ¶
Constants ¶
// Task status markers, one short string per task state.
// NOTE(review): presumably these are the values reported by Task.Status and
// checked by the Task.Is* predicates - confirm against the implementation.
const (
	READY   = "r"
	TAKEN   = "t"
	DONE    = "-"
	BURIED  = "!"
	DELAYED = "~"
)
const ( FIFO queueType = "fifo" FIFO_TTL queueType = "fifottl" UTUBE queueType = "utube" UTUBE_TTL queueType = "utubettl" )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Queue ¶
type Queue interface { // Exists checks tube for existence. // Note: it uses Eval, so user needs 'execute universe' privilege. Exists() (bool, error) // Create creates new tube with configuration. // Note: it uses Eval, so user needs 'execute universe' privilege // Note: you'd better not use this function in your application, cause it is // administrative task to create or delete queue. Create(cfg Cfg) error // Drop destroys tube. // Note: you'd better not use this function in your application, cause it is // administrative task to create or delete queue. Drop() error // Put creates new task in a tube. Put(data interface{}) (*Task, error) // PutWithOpts creates new task with options different from tube's defaults. PutWithOpts(data interface{}, cfg Opts) (*Task, error) // Take takes 'ready' task from a tube and marks it as 'in progress'. // Note: if connection has a request Timeout, then 0.9 * connection.Timeout is // used as a timeout. // If you use a connection timeout and we can not take task from queue in // a time equal to the connection timeout after calling `Take` then we // return an error. Take() (*Task, error) // TakeTimeout takes 'ready' task from a tube and marks it as "in progress", // or it is timeouted after "timeout" period. // Note: if connection has a request Timeout, and conn.Timeout * 0.9 < timeout // then timeout = conn.Timeout*0.9. // If you use connection timeout and call `TakeTimeout` with parameter // greater than the connection timeout then parameter reduced to it. TakeTimeout(timeout time.Duration) (*Task, error) // TakeTyped takes 'ready' task from a tube and marks it as 'in progress' // Note: if connection has a request Timeout, then 0.9 * connection.Timeout is // used as a timeout. // Data will be unpacked to result. TakeTyped(interface{}) (*Task, error) // TakeTypedTimeout takes 'ready' task from a tube and marks it as "in progress", // or it is timeouted after "timeout" period. 
// Note: if connection has a request Timeout, and conn.Timeout * 0.9 < timeout // then timeout = conn.Timeout*0.9. // Data will be unpacked to result. TakeTypedTimeout(timeout time.Duration, result interface{}) (*Task, error) // Peek returns task by its id. Peek(taskId uint64) (*Task, error) // Kick reverts effect of Task.Bury() for count tasks. Kick(count uint64) (uint64, error) // Delete the task identified by its id. Delete(taskId uint64) error // Statistic returns some statistic about queue. Statistic() (interface{}, error) }
Queue is a handle to Tarantool queue's tube.
type Task ¶
// Task represents a task from Tarantool queue's tube.
type Task struct {
// contains filtered or unexported fields
}
Task represents a task from Tarantool queue's tube.
func (*Task) Bury ¶
Bury signals that the task cannot be executed in the current circumstances; the task becomes "buried", i.e. neither completed nor ready, so it cannot be deleted or taken by another worker. To revert "burying", call queue.Kick(numberOfBuried).
func (*Task) DecodeMsgpack ¶
func (*Task) Release ¶
Release returns the task back to the queue without making it complete. In other words, this worker failed to complete the task, so another worker can try to do it again.
func (*Task) ReleaseCfg ¶
ReleaseCfg returns task to a queue and changes its configuration.