queue

package
v2.0.0-...-2a4499d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 8, 2023 License: BSD-2-Clause Imports: 5 Imported by: 0

Documentation

Overview

Package queue provides methods for working with Tarantool's queue implementations.

Since: 1.5.

See also

* Tarantool queue module https://github.com/tarantool/queue

Example (ConnectionPool)

Example demonstrates how to use the queue package with the pool package. First of all, you need to create a ConnectionHandler implementation for a ConnectionPool object to process new connections from RW-instances.

You need to register a shared session UUID on the first master connection. That UUID must then be used to re-identify as the shared session on new RW-instances. See the QueueConnectionHandler.Discovered() implementation.

After that, you need to create a ConnectorAdapter object with RW mode for the ConnectionPool to send requests into RW-instances. This adapter can be used to create a ready-to-work queue object.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/GruffGemini/go-tarantool/v2"
	"github.com/GruffGemini/go-tarantool/v2/pool"
	"github.com/GruffGemini/go-tarantool/v2/queue"
	"github.com/GruffGemini/go-tarantool/v2/test_helpers"
	"github.com/google/uuid"
)

// QueueConnectionHandler handles new connections in a ConnectionPool.
//
// Discovered configures the queue on each instance it is given and keeps the
// shared session UUID so that later master instances can re-identify with it.
type QueueConnectionHandler struct {
	// name is the queue name passed to queue.New for every connection.
	name string
	// cfg is the tube configuration passed to Queue.Create on masters.
	cfg  queue.Cfg

	// uuid is the shared session UUID returned by the first Identify call
	// and reused for re-identification on later master connections.
	uuid       uuid.UUID
	// registered is true once the shared session UUID has been obtained.
	registered bool
	// err is the error stored by Discovered; while it is non-nil,
	// Discovered returns it immediately.
	err        error
	// mutex is held by Discovered for the whole call and guards the fields
	// above.
	mutex      sync.Mutex
	// updated receives one value per signalled Discovered call; it is
	// created with a buffer of 10 and closed by Close.
	updated    chan struct{}
	// masterCnt is updated atomically: incremented when a master instance is
	// ready, decremented when a master connection is deactivated.
	masterCnt  int32
}

// Compile-time check that *QueueConnectionHandler satisfies the
// pool.ConnectionHandler interface.
var _ pool.ConnectionHandler = (*QueueConnectionHandler)(nil)

// NewQueueConnectionHandler returns a handler for the queue called name that
// creates the tube with cfg on master instances. The notification channel is
// buffered for 10 pending signals.
func NewQueueConnectionHandler(name string, cfg queue.Cfg) *QueueConnectionHandler {
	handler := new(QueueConnectionHandler)
	handler.name = name
	handler.cfg = cfg
	handler.updated = make(chan struct{}, 10)
	return handler
}

// Discovered configures a queue for an instance and identifies a shared queue
// session on master instances.
//
// The handler mutex is held for the whole call. Once an error has been stored
// in h.err, every later call returns that error without touching the
// connection. A value is sent on h.updated after a failed state request and
// after every call that gets past the state check; a call rejected because
// of an unexpected queue state neither signals nor stores its error.
//
// It returns nil for a replica once the queue is configured, and for a master
// once the shared session is identified and the tube is created.
//
// NOTE: the Queue supports only a master-replica cluster configuration. It
// does not support a master-master configuration.
func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection,
	role pool.Role) error {
	h.mutex.Lock()
	defer h.mutex.Unlock()

	if h.err != nil {
		return h.err
	}

	master := role == pool.MasterRole

	q := queue.New(conn, h.name)

	// Check is queue ready to work.
	state, err := q.State()
	switch {
	case err != nil:
		// Store the error before signalling, so that a waiter woken by the
		// send is able to observe it.
		h.err = err
		h.updated <- struct{}{}
		return err
	case master && state != queue.RunningState:
		return fmt.Errorf("queue state is not RUNNING: %d", state)
	case !master && state != queue.InitState && state != queue.WaitingState:
		return fmt.Errorf("queue state is not INIT and not WAITING: %d", state)
	}

	// From here on every exit path signals the waiter. The deferred send
	// runs before the deferred Unlock, after h.err has its final value.
	defer func() {
		h.updated <- struct{}{}
	}()

	// Set up a queue module configuration for an instance. Ideally, this
	// should be done before box.cfg({}) or you need to wait some time
	// before start a work.
	//
	// See:
	// https://github.com/tarantool/queue/issues/206
	opts := queue.CfgOpts{InReplicaset: true, Ttr: 60 * time.Second}

	if h.err = q.Cfg(opts); h.err != nil {
		return fmt.Errorf("unable to configure queue: %w", h.err)
	}

	// The queue only works with a master instance.
	if !master {
		return nil
	}

	if !h.registered {
		// We register a shared session at the first time.
		if h.uuid, h.err = q.Identify(nil); h.err != nil {
			return h.err
		}
		h.registered = true
	} else {
		// We re-identify as the shared session.
		if _, h.err = q.Identify(&h.uuid); h.err != nil {
			return h.err
		}
	}

	if h.err = q.Create(h.cfg); h.err != nil {
		return h.err
	}

	fmt.Printf("Master %s is ready to work!\n", conn.Addr())
	atomic.AddInt32(&h.masterCnt, 1)

	return nil
}

// Deactivated is called with a connection and its role. For a master
// connection it atomically decrements the handler's master counter; for any
// other role it does nothing. It always returns nil.
func (h *QueueConnectionHandler) Deactivated(conn *tarantool.Connection,
	role pool.Role) error {
	if role != pool.MasterRole {
		return nil
	}
	atomic.AddInt32(&h.masterCnt, -1)
	return nil
}

// Close closes a QueueConnectionHandler object by closing its updated
// channel.
//
// NOTE(review): Discovered sends on the same channel, so Close presumably
// has to be called only after the pool has stopped invoking the handler —
// confirm against the caller.
func (h *QueueConnectionHandler) Close() {
	close(h.updated)
}

// Example demonstrates how to use the queue package with the pool
// package. First of all, you need to create a ConnectionHandler implementation
// for a ConnectionPool object to process new connections from
// RW-instances.
//
// You need to register a shared session UUID at a first master connection.
// It needs to be used to re-identify as the shared session on new
// RW-instances. See QueueConnectionHandler.Discovered() implementation.
//
// After that, you need to create a ConnectorAdapter object with RW mode for
// the ConnectionPool to send requests into RW-instances. This adapter can
// be used to create a ready-to-work queue object.
//
// The handler's error field is written by Discovered under the handler
// mutex, so it is always read here under that mutex as well.
func main() {
	// Create a ConnectionHandler object.
	cfg := queue.Cfg{
		Temporary:   false,
		IfNotExists: true,
		Kind:        queue.FIFO,
		Opts: queue.Opts{
			Ttl: 10 * time.Second,
		},
	}
	h := NewQueueConnectionHandler("test_queue", cfg)
	defer h.Close()

	// Create a ConnectionPool object.
	servers := []string{
		"127.0.0.1:3014",
		"127.0.0.1:3015",
	}
	connOpts := tarantool.Opts{
		Timeout: 5 * time.Second,
		User:    "test",
		Pass:    "test",
	}
	poolOpts := pool.Opts{
		CheckTimeout:      5 * time.Second,
		ConnectionHandler: h,
	}
	connPool, err := pool.ConnectWithOpts(servers, connOpts, poolOpts)
	if err != nil {
		fmt.Printf("Unable to connect to the pool: %s", err)
		return
	}
	// Deferred calls run in reverse order, so the pool is closed before
	// h.Close() closes the notification channel.
	defer connPool.Close()

	// Wait for a queue initialization and master instance identification in
	// the queue.
	<-h.updated
	<-h.updated
	// Read the handler error under its mutex: Discovered may still be
	// running for another instance.
	h.mutex.Lock()
	err = h.err
	h.mutex.Unlock()

	if err != nil {
		fmt.Printf("Unable to identify in the pool: %s", err)
		return
	}

	// Create a Queue object from the ConnectionPool object via
	// a ConnectorAdapter.
	rw := pool.NewConnectorAdapter(connPool, pool.RW)
	q := queue.New(rw, "test_queue")
	fmt.Println("A Queue object is ready to work.")

	testData := "test_data"
	fmt.Println("Send data:", testData)
	if _, err = q.Put(testData); err != nil {
		fmt.Printf("Unable to put data into the queue: %s", err)
		return
	}

	// Switch a master instance in the pool. The call is retried until it
	// succeeds.
	roles := []bool{true, false}
	for {
		err := test_helpers.SetClusterRO(servers, connOpts, roles)
		if err == nil {
			break
		}
	}

	// Wait for a replica instance connection and a new master instance
	// re-identification.
	<-h.updated
	<-h.updated
	h.mutex.Lock()
	err = h.err
	h.mutex.Unlock()

	if err != nil {
		fmt.Printf("Unable to re-identify in the pool: %s", err)
		return
	}

	for i := 0; i < 2 && atomic.LoadInt32(&h.masterCnt) != 1; i++ {
		// The pool does not immediately detect role switching. It may happen
		// that requests will be sent to RO instances. In that case q.Take()
		// method will return a nil value.
		//
		// We need to make the example test output deterministic so we need to
		// avoid it here. But in real life, you need to take this into account.
		time.Sleep(poolOpts.CheckTimeout)
	}

	for {
		// Take a data from the new master instance.
		task, err := q.Take()

		if err == pool.ErrNoRwInstance {
			// It may be not registered yet by the pool.
			continue
		} else if err != nil {
			fmt.Println("Unable to got task:", err)
		} else if task == nil {
			fmt.Println("task == nil")
		} else if task.Data() == nil {
			fmt.Println("task.Data() == nil")
		} else {
			task.Ack()
			fmt.Println("Got data:", task.Data())
		}
		break
	}

}
Output:

Master 127.0.0.1:3014 is ready to work!
A Queue object is ready to work.
Send data: test_data
Master 127.0.0.1:3015 is ready to work!
Got data: test_data
Example (SimpleQueue)

Example demonstrates operations such as Put and Take on a queue.

cfg := queue.Cfg{
	Temporary: false,
	Kind:      queue.FIFO,
	Opts: queue.Opts{
		Ttl: 10 * time.Second,
	},
}
opts := tarantool.Opts{
	Timeout: 2500 * time.Millisecond,
	User:    "test",
	Pass:    "test",
}

conn, err := tarantool.Connect("127.0.0.1:3013", opts)
if err != nil {
	fmt.Printf("error in prepare is %v", err)
	return
}
defer conn.Close()

q := queue.New(conn, "test_queue")
if err := q.Create(cfg); err != nil {
	fmt.Printf("error in queue is %v", err)
	return
}

defer q.Drop()

testData_1 := "test_data_1"
if _, err = q.Put(testData_1); err != nil {
	fmt.Printf("error in put is %v", err)
	return
}

testData_2 := "test_data_2"
task_2, err := q.PutWithOpts(testData_2, queue.Opts{Ttl: 2 * time.Second})
if err != nil {
	fmt.Printf("error in put with config is %v", err)
	return
}

task, err := q.Take()
if err != nil {
	fmt.Printf("error in take with is %v", err)
	return
}
task.Ack()
fmt.Println("data_1: ", task.Data())

err = task_2.Bury()
if err != nil {
	fmt.Printf("error in bury with is %v", err)
	return
}

task, err = q.TakeTimeout(2 * time.Second)
if err != nil {
	fmt.Printf("error in take with timeout")
}
if task != nil {
	fmt.Printf("Task should be nil, but %d", task.Id())
	return
}
Output:

data_1:  test_data_1
Example (SimpleQueueCustomMsgPack)

Example demonstrates operations such as Put and Take on a queue with a custom MsgPack structure.

Features of the implementation:

- If you use the connection timeout and call TakeTimeout with a parameter greater than the connection timeout, the parameter is reduced to it.

- If you use the connection timeout and call Take, we return an error if we cannot take the task out of the queue within the time corresponding to the connection timeout.

// Setup queue module and start Tarantool instance before execution:
// Terminal 1:
// $ make deps
// $ TEST_TNT_LISTEN=3013 tarantool queue/config.lua
//
// Terminal 2:
// $ cd queue
// $ go test -v example_msgpack_test.go
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/vmihailenco/msgpack/v5"

	"github.com/GruffGemini/go-tarantool/v2"
	"github.com/GruffGemini/go-tarantool/v2/queue"
)

// dummyData is the payload type used by the typed Put/Take part of the
// example. Its EncodeMsgpack and DecodeMsgpack methods write and read it as
// a single MsgPack boolean.
type dummyData struct {
	// Dummy is the only value carried by the payload.
	Dummy bool
}

// DecodeMsgpack reads one boolean from d into c.Dummy and returns the
// decoder's error, if any. The field is assigned whatever value the decoder
// returned, even when decoding failed.
func (c *dummyData) DecodeMsgpack(d *msgpack.Decoder) error {
	decoded, err := d.DecodeBool()
	c.Dummy = decoded
	return err
}

// EncodeMsgpack writes c.Dummy to e as a boolean and returns the encoder's
// error, if any.
func (c *dummyData) EncodeMsgpack(e *msgpack.Encoder) error {
	return e.EncodeBool(c.Dummy)
}

// Example demonstrates operations such as Put and Take on a queue with a
// custom MsgPack structure.
//
// Features of the implementation:
//
// - If you use the connection timeout and call TakeTimeout with a
// parameter greater than the connection timeout, the parameter is reduced to
// it.
//
// - If you use the connection timeout and call Take, we return an error if we
// cannot take the task out of the queue within the time corresponding to the
// connection timeout.
//
// The tube created by the example is dropped on every exit path that follows
// its successful creation.
func main() {
	opts := tarantool.Opts{
		Reconnect:     time.Second,
		Timeout:       5 * time.Second,
		MaxReconnects: 5,
		User:          "test",
		Pass:          "test",
	}
	conn, err := tarantool.Connect("127.0.0.1:3013", opts)
	if err != nil {
		// log.Fatalf terminates the program, so nothing may follow it.
		log.Fatalf("connection: %s", err)
	}
	defer conn.Close()

	cfg := queue.Cfg{
		Temporary:   true,
		IfNotExists: true,
		Kind:        queue.FIFO,
		Opts: queue.Opts{
			Ttl:   20 * time.Second,
			Ttr:   10 * time.Second,
			Delay: 6 * time.Second,
			Pri:   1,
		},
	}

	que := queue.New(conn, "test_queue_msgpack")
	if err = que.Create(cfg); err != nil {
		fmt.Printf("queue create: %s", err)
		return
	}
	// Drop the tube on every return below, not only on success. The deferred
	// call runs before the deferred conn.Close().
	defer que.Drop()

	// Put data.
	task, err := que.Put("test_data")
	if err != nil {
		fmt.Printf("put task: %s", err)
		return
	}
	fmt.Println("Task id is", task.Id())

	// Take data.
	task, err = que.Take() // Blocking operation.
	if err != nil {
		fmt.Printf("take task: %s", err)
		return
	}
	fmt.Println("Data is", task.Data())
	task.Ack()

	// Take typed example.
	putData := dummyData{}
	// Put data.
	task, err = que.Put(&putData)
	if err != nil {
		fmt.Printf("put typed task: %s", err)
		return
	}
	fmt.Println("Task id is ", task.Id())

	takeData := dummyData{}
	// Take data.
	task, err = que.TakeTyped(&takeData) // Blocking operation.
	if err != nil {
		fmt.Printf("take take typed: %s", err)
		return
	}
	fmt.Println("Data is ", takeData)
	// Same data.
	fmt.Println("Data is ", task.Data())

	task, err = que.Put([]int{1, 2, 3})
	if err != nil {
		fmt.Printf("Put failed: %s", err)
		return
	}
	task.Bury()

	// The only remaining task is buried, so no task is expected here.
	task, err = que.TakeTimeout(2 * time.Second)
	if err != nil {
		fmt.Printf("Take with timeout failed: %s", err)
		return
	}
	if task == nil {
		fmt.Println("Task is nil")
	}
}
Output:

Task id is 0
Data is test_data
Task id is  0
Data is  {false}
Data is  &{false}
Task is nil

Index

Examples

Constants

View Source
// Task status codes. Presumably these are the values returned by
// Task.Status() and tested by the Task.Is* helpers — confirm against the
// implementation.
const (
	READY   = "r"
	TAKEN   = "t"
	DONE    = "-"
	BURIED  = "!"
	DELAYED = "~"
)
View Source
// Queue kinds that can be assigned to Cfg.Kind.
const (
	FIFO      queueType = "fifo"
	FIFO_TTL  queueType = "fifottl"
	UTUBE     queueType = "utube"
	UTUBE_TTL queueType = "utubettl"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Cfg

// Cfg is the tube configuration passed to Queue.Create.
type Cfg struct {
	Temporary   bool // If true, the contents do not persist on disk.
	IfNotExists bool // If true, no error will be returned if the tube already exists.
	// Kind selects the queue kind: FIFO, FIFO_TTL, UTUBE or UTUBE_TTL.
	Kind        queueType
	// Opts is embedded; presumably these are the tube's default task
	// options that PutWithOpts can override — confirm.
	Opts
}

type CfgOpts

type CfgOpts struct {
	// Enable replication mode. Must be true if the queue is used in master and
	// replica mode. With replication mode enabled, the potential loss of
	// performance can be ~20% compared to single mode. Default value is false.
	InReplicaset bool
	// Time to release. The time after which, if there is no active
	// connection in the session, it will be released with all its tasks.
	// The value is a time.Duration (for example 60 * time.Second).
	// NOTE(review): the original comment said "in seconds"; presumably the
	// duration is converted to seconds when sent to the queue — confirm.
	Ttr time.Duration
}

CfgOpts is argument type for the Queue.Cfg() call.

type Opts

// Opts holds task options. It is embedded in Cfg and accepted by
// Queue.PutWithOpts and Task.ReleaseCfg.
type Opts struct {
	Pri   int           // Task priority.
	Ttl   time.Duration // Task time to live.
	Ttr   time.Duration // Task time to execute.
	Delay time.Duration // Delayed execution.
	// Utube is presumably the sub-queue name used by the utube and utubettl
	// queue kinds — TODO confirm against the queue module documentation.
	Utube string
}

type Queue

type Queue interface {
	// Cfg sets queue settings.
	Cfg(opts CfgOpts) error
	// Exists checks tube for existence.
	// Note: it uses Eval, so user needs 'execute universe' privilege.
	Exists() (bool, error)
	// Identify identifies the connection to a shared session.
	// In the queue the session has a unique UUID and many connections may
	// share one logical session. Also, the consumer can reconnect to the
	// existing session during the ttr time.
	// To get the UUID of the current session, call Queue.Identify(nil).
	Identify(u *uuid.UUID) (uuid.UUID, error)
	// Create creates a new tube with the given configuration.
	// Note: it uses Eval, so user needs 'execute universe' privilege.
	// Note: you'd better not use this function in your application, because
	// creating or deleting a queue is an administrative task.
	Create(cfg Cfg) error
	// Drop destroys the tube.
	// Note: you'd better not use this function in your application, because
	// creating or deleting a queue is an administrative task.
	Drop() error
	// ReleaseAll forcibly returns all taken tasks to a ready state.
	ReleaseAll() error
	// Put creates a new task in a tube.
	Put(data interface{}) (*Task, error)
	// PutWithOpts creates a new task with options different from the tube's
	// defaults.
	PutWithOpts(data interface{}, cfg Opts) (*Task, error)
	// Take takes a 'ready' task from a tube and marks it as 'in progress'.
	// Note: if connection has a request Timeout, then 0.9 * connection.Timeout is
	// used as a timeout.
	// If you use a connection timeout and we can not take a task from the
	// queue in a time equal to the connection timeout after calling `Take`
	// then we return an error.
	Take() (*Task, error)
	// TakeTimeout takes a 'ready' task from a tube and marks it as
	// "in progress", or times out after the "timeout" period.
	// Note: if connection has a request Timeout, and conn.Timeout * 0.9 < timeout
	// then timeout = conn.Timeout*0.9.
	// If you use connection timeout and call `TakeTimeout` with a parameter
	// greater than the connection timeout then the parameter is reduced to it.
	TakeTimeout(timeout time.Duration) (*Task, error)
	// TakeTyped takes a 'ready' task from a tube and marks it as 'in progress'.
	// Note: if connection has a request Timeout, then 0.9 * connection.Timeout is
	// used as a timeout.
	// Data will be unpacked to result.
	TakeTyped(interface{}) (*Task, error)
	// TakeTypedTimeout takes a 'ready' task from a tube and marks it as
	// "in progress", or times out after the "timeout" period.
	// Note: if connection has a request Timeout, and conn.Timeout * 0.9 < timeout
	// then timeout = conn.Timeout*0.9.
	// Data will be unpacked to result.
	TakeTypedTimeout(timeout time.Duration, result interface{}) (*Task, error)
	// Peek returns a task by its id.
	Peek(taskId uint64) (*Task, error)
	// Kick reverts the effect of Task.Bury() for count tasks.
	Kick(count uint64) (uint64, error)
	// Delete deletes the task identified by its id.
	Delete(taskId uint64) error
	// State returns the current queue state.
	State() (State, error)
	// Statistic returns some statistics about the queue.
	Statistic() (interface{}, error)
}

Queue is a handle to Tarantool queue's tube.

func New

func New(conn tarantool.Connector, name string) Queue

New creates a queue handle.

type State

// State is the queue state reported by Queue.State().
type State int
// Queue states, numbered from 0 (UnknownState) in declaration order.
const (
	UnknownState State = iota
	InitState
	StartupState
	RunningState
	EndingState
	WaitingState
)

type Task

type Task struct {
	// contains filtered or unexported fields
}

Task represents a task from Tarantool queue's tube.

func (*Task) Ack

func (t *Task) Ack() error

Ack signals about task completion.

func (*Task) Bury

func (t *Task) Bury() error

Bury signals that the task cannot be executed in the current circumstances. The task becomes "buried", i.e. neither completed nor ready, so it cannot be deleted or taken by another worker. To revert "burying", call queue.Kick(numberOfBuried).

func (*Task) Data

func (t *Task) Data() interface{}

Data is a getter for task data.

func (*Task) DecodeMsgpack

func (t *Task) DecodeMsgpack(d *msgpack.Decoder) error

func (*Task) Delete

func (t *Task) Delete() error

Delete task from queue.

func (*Task) Id

func (t *Task) Id() uint64

Id is a getter for task id.

func (*Task) IsBuried

func (t *Task) IsBuried() bool

IsBuried returns if task is buried.

func (*Task) IsDelayed

func (t *Task) IsDelayed() bool

IsDelayed returns if task is delayed.

func (*Task) IsDone

func (t *Task) IsDone() bool

IsDone returns if task is done.

func (*Task) IsReady

func (t *Task) IsReady() bool

IsReady returns if task is ready.

func (*Task) IsTaken

func (t *Task) IsTaken() bool

IsTaken returns if task is taken.

func (*Task) Release

func (t *Task) Release() error

Release returns the task back to the queue without marking it complete. In other words, this worker failed to complete the task, so another worker can try to do it again.

func (*Task) ReleaseCfg

func (t *Task) ReleaseCfg(cfg Opts) error

ReleaseCfg returns task to a queue and changes its configuration.

func (*Task) Status

func (t *Task) Status() string

Status is a getter for task status.

func (*Task) Touch

func (t *Task) Touch(increment time.Duration) error

Touch increases ttr of running task.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL