Documentation ¶
Overview ¶
Package queue provides methods for working with Tarantool's queue implementations.
Since: 1.5.
See also ¶
* Tarantool queue module https://github.com/tarantool/queue
Example (ConnectionPool) ¶
Example demonstrates how to use the queue package with the pool package. First of all, you need to create a ConnectionHandler implementation for a ConnectionPool object to process new connections from RW-instances.
You need to register a shared session UUID on the first master connection. That UUID must then be used to re-identify as the shared session on new RW-instances. See the QueueConnectionHandler.Discovered() implementation.
After that, you need to create a ConnectorAdapter object with RW mode for the ConnectionPool to send requests into RW-instances. This adapter can be used to create a ready-to-work queue object.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/GruffGemini/go-tarantool/v2"
	"github.com/GruffGemini/go-tarantool/v2/pool"
	"github.com/GruffGemini/go-tarantool/v2/queue"
	"github.com/GruffGemini/go-tarantool/v2/test_helpers"
	"github.com/google/uuid"
)

// QueueConnectionHandler handles new connections in a ConnectionPool.
type QueueConnectionHandler struct {
	// name is the queue (tube) name passed to queue.New.
	name string
	// cfg is the configuration passed to Queue.Create on master instances.
	cfg queue.Cfg

	// uuid is the shared session UUID obtained by the first Identify call.
	uuid uuid.UUID
	// registered reports whether uuid has already been obtained.
	registered bool
	// err is a sticky error: once set, every later Discovered call returns it.
	err error
	// mutex guards the fields above; Discovered holds it for its whole body.
	mutex sync.Mutex
	// updated receives one signal per Discovered call that got past the
	// queue state check (or failed while reading the state).
	updated chan struct{}
	// masterCnt counts master instances that are ready to work. It is
	// accessed only through sync/atomic.
	masterCnt int32
}

// QueueConnectionHandler implements the ConnectionHandler interface.
var _ pool.ConnectionHandler = &QueueConnectionHandler{}

// NewQueueConnectionHandler creates a QueueConnectionHandler object.
func NewQueueConnectionHandler(name string, cfg queue.Cfg) *QueueConnectionHandler {
	return &QueueConnectionHandler{
		name:    name,
		cfg:     cfg,
		updated: make(chan struct{}, 10),
	}
}

// lastErr returns the sticky error recorded by Discovered. The value is read
// under the handler's mutex because Discovered writes it under that mutex.
func (h *QueueConnectionHandler) lastErr() error {
	h.mutex.Lock()
	defer h.mutex.Unlock()

	return h.err
}

// Discovered configures a queue for an instance and identifies a shared queue
// session on master instances.
//
// NOTE: the Queue supports only a master-replica cluster configuration. It
// does not support a master-master configuration.
func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection,
	role pool.Role) error {
	h.mutex.Lock()
	defer h.mutex.Unlock()

	// A previously recorded error is sticky: fail fast.
	if h.err != nil {
		return h.err
	}

	master := role == pool.MasterRole

	q := queue.New(conn, h.name)

	// Check is queue ready to work.
	if state, err := q.State(); err != nil {
		// Record the error before signaling so a waiter that wakes up
		// always observes it.
		h.err = err
		h.updated <- struct{}{}
		return err
	} else if master && state != queue.RunningState {
		// State mismatches are returned without touching h.err or
		// h.updated. NOTE(review): presumably so that the pool can retry
		// this instance later — confirm against the pool implementation.
		return fmt.Errorf("queue state is not RUNNING: %d", state)
	} else if !master && state != queue.InitState && state != queue.WaitingState {
		return fmt.Errorf("queue state is not INIT and not WAITING: %d", state)
	}

	// From here on every exit path signals the waiter exactly once.
	defer func() {
		h.updated <- struct{}{}
	}()

	// Set up a queue module configuration for an instance. Ideally, this
	// should be done before box.cfg({}) or you need to wait some time
	// before start a work.
	//
	// See:
	// https://github.com/tarantool/queue/issues/206
	opts := queue.CfgOpts{InReplicaset: true, Ttr: 60 * time.Second}

	if h.err = q.Cfg(opts); h.err != nil {
		return fmt.Errorf("unable to configure queue: %w", h.err)
	}

	// The queue only works with a master instance.
	if !master {
		return nil
	}

	if !h.registered {
		// We register a shared session at the first time.
		if h.uuid, h.err = q.Identify(nil); h.err != nil {
			return h.err
		}
		h.registered = true
	} else {
		// We re-identify as the shared session.
		if _, h.err = q.Identify(&h.uuid); h.err != nil {
			return h.err
		}
	}

	if h.err = q.Create(h.cfg); h.err != nil {
		return h.err
	}

	fmt.Printf("Master %s is ready to work!\n", conn.Addr())
	atomic.AddInt32(&h.masterCnt, 1)

	return nil
}

// Deactivated decrements the ready-master counter when a connection with the
// master role is deactivated. Other roles are ignored.
func (h *QueueConnectionHandler) Deactivated(conn *tarantool.Connection,
	role pool.Role) error {
	if role == pool.MasterRole {
		atomic.AddInt32(&h.masterCnt, -1)
	}
	return nil
}

// Close closes a QueueConnectionHandler object.
func (h *QueueConnectionHandler) Close() {
	close(h.updated)
}

// Example demonstrates how to use the queue package with the pool
// package. First of all, you need to create a ConnectionHandler implementation
// for a ConnectionPool object to process new connections from
// RW-instances.
//
// You need to register a shared session UUID at a first master connection.
// It needs to be used to re-identify as the shared session on new
// RW-instances. See QueueConnectionHandler.Discovered() implementation.
//
// After that, you need to create a ConnectorAdapter object with RW mode for
// the ConnectionPool to send requests into RW-instances. This adapter can
// be used to create a ready-to-work queue object.
func main() {
	// Create a ConnectionHandler object.
	cfg := queue.Cfg{
		Temporary:   false,
		IfNotExists: true,
		Kind:        queue.FIFO,
		Opts: queue.Opts{
			Ttl: 10 * time.Second,
		},
	}
	h := NewQueueConnectionHandler("test_queue", cfg)
	defer h.Close()

	// Create a ConnectionPool object.
	servers := []string{
		"127.0.0.1:3014",
		"127.0.0.1:3015",
	}
	connOpts := tarantool.Opts{
		Timeout: 5 * time.Second,
		User:    "test",
		Pass:    "test",
	}
	poolOpts := pool.Opts{
		CheckTimeout:      5 * time.Second,
		ConnectionHandler: h,
	}
	connPool, err := pool.ConnectWithOpts(servers, connOpts, poolOpts)
	if err != nil {
		fmt.Printf("Unable to connect to the pool: %s", err)
		return
	}
	defer connPool.Close()

	// Wait for a queue initialization and master instance identification in
	// the queue.
	<-h.updated
	<-h.updated

	// Read the handler error under its mutex: Discovered may still be
	// running on a pool goroutine.
	if err = h.lastErr(); err != nil {
		fmt.Printf("Unable to identify in the pool: %s", err)
		return
	}

	// Create a Queue object from the ConnectionPool object via
	// a ConnectorAdapter.
	rw := pool.NewConnectorAdapter(connPool, pool.RW)
	q := queue.New(rw, "test_queue")
	fmt.Println("A Queue object is ready to work.")

	testData := "test_data"
	fmt.Println("Send data:", testData)
	if _, err = q.Put(testData); err != nil {
		fmt.Printf("Unable to put data into the queue: %s", err)
		return
	}

	// Switch a master instance in the pool. The call is retried until it
	// succeeds.
	roles := []bool{true, false}
	for {
		err := test_helpers.SetClusterRO(servers, connOpts, roles)
		if err == nil {
			break
		}
	}

	// Wait for a replica instance connection and a new master instance
	// re-identification.
	<-h.updated
	<-h.updated

	if err = h.lastErr(); err != nil {
		fmt.Printf("Unable to re-identify in the pool: %s", err)
		return
	}

	for i := 0; i < 2 && atomic.LoadInt32(&h.masterCnt) != 1; i++ {
		// The pool does not immediately detect role switching. It may happen
		// that requests will be sent to RO instances. In that case q.Take()
		// method will return a nil value.
		//
		// We need to make the example test output deterministic so we need to
		// avoid it here. But in real life, you need to take this into account.
		time.Sleep(poolOpts.CheckTimeout)
	}

	for {
		// Take a data from the new master instance.
		task, err := q.Take()
		if err == pool.ErrNoRwInstance {
			// It may be not registered yet by the pool.
			continue
		}

		switch {
		case err != nil:
			fmt.Println("Unable to got task:", err)
		case task == nil:
			fmt.Println("task == nil")
		case task.Data() == nil:
			fmt.Println("task.Data() == nil")
		default:
			// Report a failed acknowledgement instead of silently
			// dropping the error.
			if ackErr := task.Ack(); ackErr != nil {
				fmt.Println("Unable to ack task:", ackErr)
			} else {
				fmt.Println("Got data:", task.Data())
			}
		}
		break
	}
}
Output: Master 127.0.0.1:3014 is ready to work! A Queue object is ready to work. Send data: test_data Master 127.0.0.1:3015 is ready to work! Got data: test_data
Example (SimpleQueue) ¶
Example demonstrates operations such as Put and Take with a queue.
// Body of the SimpleQueue example: create a FIFO tube, put two tasks, take
// and acknowledge the first one, bury the second one, and check that a
// subsequent TakeTimeout returns no task.
cfg := queue.Cfg{
	Temporary: false,
	Kind:      queue.FIFO,
	Opts: queue.Opts{
		Ttl: 10 * time.Second,
	},
}
opts := tarantool.Opts{
	Timeout: 2500 * time.Millisecond,
	User:    "test",
	Pass:    "test",
}

conn, err := tarantool.Connect("127.0.0.1:3013", opts)
if err != nil {
	fmt.Printf("error in prepare is %v", err)
	return
}
defer conn.Close()

q := queue.New(conn, "test_queue")
if err := q.Create(cfg); err != nil {
	fmt.Printf("error in queue is %v", err)
	return
}

// Drop the tube on every exit path below. Deferred calls run in reverse
// order, so the drop happens before conn.Close().
defer q.Drop()

testData1 := "test_data_1"
if _, err = q.Put(testData1); err != nil {
	fmt.Printf("error in put is %v", err)
	return
}

// The second task overrides the tube's default Ttl.
testData2 := "test_data_2"
task2, err := q.PutWithOpts(testData2, queue.Opts{Ttl: 2 * time.Second})
if err != nil {
	fmt.Printf("error in put with config is %v", err)
	return
}

task, err := q.Take()
if err != nil {
	fmt.Printf("error in take with is %v", err)
	return
}
task.Ack()
fmt.Println("data_1: ", task.Data())

// Bury the second task so that it is no longer ready to be taken.
err = task2.Bury()
if err != nil {
	fmt.Printf("error in bury with is %v", err)
	return
}

// Nothing is ready now, so the timed take is expected to yield no task.
task, err = q.TakeTimeout(2 * time.Second)
if err != nil {
	// Report the actual error and stop instead of falling through.
	fmt.Printf("error in take with timeout is %v", err)
	return
}
if task != nil {
	fmt.Printf("Task should be nil, but %d", task.Id())
	return
}
Output: data_1: test_data_1
Example (SimpleQueueCustomMsgPack) ¶
Example demonstrates operations such as Put and Take with a queue and a custom MsgPack structure.
Features of the implementation:
- If you use the connection timeout and call TakeTimeout with a parameter greater than the connection timeout, the parameter is reduced to it.
- If you use the connection timeout and call Take, we return an error if we cannot take the task out of the queue within the time corresponding to the connection timeout.
// Setup queue module and start Tarantool instance before execution: // Terminal 1: // $ make deps // $ TEST_TNT_LISTEN=3013 tarantool queue/config.lua // // Terminal 2: // $ cd queue // $ go test -v example_msgpack_test.go package main import ( "fmt" "log" "time" "github.com/vmihailenco/msgpack/v5" "github.com/GruffGemini/go-tarantool/v2" "github.com/GruffGemini/go-tarantool/v2/queue" ) type dummyData struct { Dummy bool } func (c *dummyData) DecodeMsgpack(d *msgpack.Decoder) error { var err error if c.Dummy, err = d.DecodeBool(); err != nil { return err } return nil } func (c *dummyData) EncodeMsgpack(e *msgpack.Encoder) error { return e.EncodeBool(c.Dummy) } // Example demonstrates an operations like Put and Take with queue and custom // MsgPack structure. // // Features of the implementation: // // - If you use the connection timeout and call TakeWithTimeout with a // parameter greater than the connection timeout, the parameter is reduced to // it. // // - If you use the connection timeout and call Take, we return an error if we // cannot take the task out of the queue within the time corresponding to the // connection timeout. func main() { opts := tarantool.Opts{ Reconnect: time.Second, Timeout: 5 * time.Second, MaxReconnects: 5, User: "test", Pass: "test", } conn, err := tarantool.Connect("127.0.0.1:3013", opts) if err != nil { log.Fatalf("connection: %s", err) return } defer conn.Close() cfg := queue.Cfg{ Temporary: true, IfNotExists: true, Kind: queue.FIFO, Opts: queue.Opts{ Ttl: 20 * time.Second, Ttr: 10 * time.Second, Delay: 6 * time.Second, Pri: 1, }, } que := queue.New(conn, "test_queue_msgpack") if err = que.Create(cfg); err != nil { fmt.Printf("queue create: %s", err) return } // Put data. task, err := que.Put("test_data") if err != nil { fmt.Printf("put task: %s", err) return } fmt.Println("Task id is", task.Id()) // Take data. task, err = que.Take() // Blocking operation. 
if err != nil { fmt.Printf("take task: %s", err) return } fmt.Println("Data is", task.Data()) task.Ack() // Take typed example. putData := dummyData{} // Put data. task, err = que.Put(&putData) if err != nil { fmt.Printf("put typed task: %s", err) return } fmt.Println("Task id is ", task.Id()) takeData := dummyData{} // Take data. task, err = que.TakeTyped(&takeData) // Blocking operation. if err != nil { fmt.Printf("take take typed: %s", err) return } fmt.Println("Data is ", takeData) // Same data. fmt.Println("Data is ", task.Data()) task, err = que.Put([]int{1, 2, 3}) if err != nil { fmt.Printf("Put failed: %s", err) return } task.Bury() task, err = que.TakeTimeout(2 * time.Second) if err != nil { fmt.Printf("Take with timeout failed: %s", err) return } if task == nil { fmt.Println("Task is nil") } que.Drop() }
Output: Task id is 0 Data is test_data Task id is 0 Data is {false} Data is &{false} Task is nil
Index ¶
- Constants
- type Cfg
- type CfgOpts
- type Opts
- type Queue
- type State
- type Task
- func (t *Task) Ack() error
- func (t *Task) Bury() error
- func (t *Task) Data() interface{}
- func (t *Task) DecodeMsgpack(d *msgpack.Decoder) error
- func (t *Task) Delete() error
- func (t *Task) Id() uint64
- func (t *Task) IsBuried() bool
- func (t *Task) IsDelayed() bool
- func (t *Task) IsDone() bool
- func (t *Task) IsReady() bool
- func (t *Task) IsTaken() bool
- func (t *Task) Release() error
- func (t *Task) ReleaseCfg(cfg Opts) error
- func (t *Task) Status() string
- func (t *Task) Touch(increment time.Duration) error
Examples ¶
Constants ¶
// Task status codes, one short string per status.
// NOTE(review): presumably these are the raw values behind Task.Status() and
// the Task.IsReady/IsTaken/IsDone/IsBuried/IsDelayed predicates — confirm
// against the Task implementation.
const (
	READY   = "r"
	TAKEN   = "t"
	DONE    = "-"
	BURIED  = "!"
	DELAYED = "~"
)
// Queue kinds. The examples pass FIFO as Cfg.Kind when creating a tube.
// NOTE(review): queueType is declared outside this view; the string values
// presumably name the tube drivers of the Tarantool queue module — confirm.
const (
	FIFO      queueType = "fifo"
	FIFO_TTL  queueType = "fifottl"
	UTUBE     queueType = "utube"
	UTUBE_TTL queueType = "utubettl"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CfgOpts ¶
// CfgOpts is the argument type for the Queue.Cfg() call.
type CfgOpts struct {
	// Enable replication mode. Must be true if the queue is used in master and
	// replica mode. With replication mode enabled, the potential loss of
	// performance can be ~20% compared to single mode. Default value is false.
	InReplicaset bool
	// Time to release. The time after which, if there is no active
	// connection in the session, it will be released with all its tasks.
	// The field is a time.Duration, so pass a duration value such as
	// 60 * time.Second.
	// NOTE(review): the earlier wording said "in seconds"; presumably the
	// duration is converted to seconds when sent to the queue — confirm.
	Ttr time.Duration
}
CfgOpts is the argument type for the Queue.Cfg() call.
type Queue ¶
// Queue is a handle to a Tarantool queue's tube.
type Queue interface {
	// Cfg sets queue settings.
	Cfg(opts CfgOpts) error
	// Exists checks the tube for existence.
	// Note: it uses Eval, so the user needs the 'execute universe' privilege.
	Exists() (bool, error)
	// Identify identifies to a shared session.
	// In the queue, a session has a unique UUID and many connections may
	// share one logical session. Also, a consumer can reconnect to the
	// existing session during the ttr time.
	// To get the UUID of the current session, call Queue.Identify(nil).
	Identify(u *uuid.UUID) (uuid.UUID, error)
	// Create creates a new tube with the given configuration.
	// Note: it uses Eval, so the user needs the 'execute universe' privilege.
	// Note: you'd better not use this function in your application, because
	// creating or deleting a queue is an administrative task.
	Create(cfg Cfg) error
	// Drop destroys the tube.
	// Note: you'd better not use this function in your application, because
	// creating or deleting a queue is an administrative task.
	Drop() error
	// ReleaseAll forcibly returns all taken tasks to a ready state.
	ReleaseAll() error
	// Put creates a new task in the tube.
	Put(data interface{}) (*Task, error)
	// PutWithOpts creates a new task with options different from the tube's
	// defaults.
	PutWithOpts(data interface{}, cfg Opts) (*Task, error)
	// Take takes a 'ready' task from the tube and marks it as 'in progress'.
	// Note: if the connection has a request Timeout, then
	// 0.9 * connection.Timeout is used as a timeout.
	// If you use a connection timeout and the task cannot be taken from the
	// queue within a time equal to the connection timeout after calling
	// `Take`, an error is returned.
	Take() (*Task, error)
	// TakeTimeout takes a 'ready' task from the tube and marks it as
	// "in progress", or times out after the "timeout" period.
	// Note: if the connection has a request Timeout, and
	// conn.Timeout * 0.9 < timeout, then timeout = conn.Timeout*0.9.
	// If you use a connection timeout and call `TakeTimeout` with a parameter
	// greater than the connection timeout, the parameter is reduced to it.
	TakeTimeout(timeout time.Duration) (*Task, error)
	// TakeTyped takes a 'ready' task from the tube and marks it as
	// 'in progress'.
	// Note: if the connection has a request Timeout, then
	// 0.9 * connection.Timeout is used as a timeout.
	// Data will be unpacked to the result argument.
	TakeTyped(interface{}) (*Task, error)
	// TakeTypedTimeout takes a 'ready' task from the tube and marks it as
	// "in progress", or times out after the "timeout" period.
	// Note: if the connection has a request Timeout, and
	// conn.Timeout * 0.9 < timeout, then timeout = conn.Timeout*0.9.
	// Data will be unpacked to result.
	TakeTypedTimeout(timeout time.Duration, result interface{}) (*Task, error)
	// Peek returns a task by its id.
	Peek(taskId uint64) (*Task, error)
	// Kick reverts the effect of Task.Bury() for count tasks.
	Kick(count uint64) (uint64, error)
	// Delete deletes the task identified by its id.
	Delete(taskId uint64) error
	// State returns the current queue state.
	State() (State, error)
	// Statistic returns some statistics about the queue.
	Statistic() (interface{}, error)
}
Queue is a handle to Tarantool queue's tube.
type Task ¶
type Task struct {
// contains filtered or unexported fields
}
Task represents a task from Tarantool queue's tube.
func (*Task) Bury ¶
Bury signals that the task cannot be executed in the current circumstances. The task becomes "buried", i.e. neither completed nor ready, so it cannot be deleted or taken by another worker. To revert "burying", call queue.Kick(numberOfBuried).
func (*Task) DecodeMsgpack ¶
func (*Task) Release ¶
Release returns the task back to the queue without marking it complete. In other words, this worker failed to complete the task, so another worker can try to do it again.
func (*Task) ReleaseCfg ¶
ReleaseCfg returns task to a queue and changes its configuration.