queue

package
v1.8.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 17, 2022 License: BSD-2-Clause Imports: 4 Imported by: 6

Documentation

Overview

Package with implementation of methods for work with a Tarantool's queue implementations.

Since: 1.5.

See also

* Tarantool queue module https://github.com/tarantool/queue

Example (SimpleQueue)

Example demonstrates an operations like Put and Take with queue.

cfg := queue.Cfg{
	Temporary: false,
	Kind:      queue.FIFO,
	Opts: queue.Opts{
		Ttl: 10 * time.Second,
	},
}
opts := tarantool.Opts{
	Timeout: 2500 * time.Millisecond,
	User:    "test",
	Pass:    "test",
}

conn, err := tarantool.Connect("127.0.0.1:3013", opts)
if err != nil {
	fmt.Printf("error in prepare is %v", err)
	return
}
defer conn.Close()

q := queue.New(conn, "test_queue")
if err := q.Create(cfg); err != nil {
	fmt.Printf("error in queue is %v", err)
	return
}

defer q.Drop()

testData_1 := "test_data_1"
if _, err = q.Put(testData_1); err != nil {
	fmt.Printf("error in put is %v", err)
	return
}

testData_2 := "test_data_2"
task_2, err := q.PutWithOpts(testData_2, queue.Opts{Ttl: 2 * time.Second})
if err != nil {
	fmt.Printf("error in put with config is %v", err)
	return
}

task, err := q.Take()
if err != nil {
	fmt.Printf("error in take with is %v", err)
	return
}
task.Ack()
fmt.Println("data_1: ", task.Data())

err = task_2.Bury()
if err != nil {
	fmt.Printf("error in bury with is %v", err)
	return
}

task, err = q.TakeTimeout(2 * time.Second)
if err != nil {
	fmt.Printf("error in take with timeout")
}
if task != nil {
	fmt.Printf("Task should be nil, but %d", task.Id())
	return
}
Output:

data_1:  test_data_1
Example (SimpleQueueCustomMsgPack)

Example demonstrates an operations like Put and Take with queue and custom MsgPack structure.

Features of the implementation:

- If you use the connection timeout and call TakeWithTimeout with a parameter greater than the connection timeout, the parameter is reduced to it.

- If you use the connection timeout and call Take, we return an error if we cannot take the task out of the queue within the time corresponding to the connection timeout.

// Setup queue module and start Tarantool instance before execution:
// Terminal 1:
// $ make deps
// $ TEST_TNT_LISTEN=3013 tarantool queue/config.lua
//
// Terminal 2:
// $ cd queue
// $ go test -v example_msgpack_test.go
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/tarantool/go-tarantool"
	"github.com/tarantool/go-tarantool/queue"
)

type dummyData struct {
	Dummy bool
}

func (c *dummyData) DecodeMsgpack(d *decoder) error {
	var err error
	if c.Dummy, err = d.DecodeBool(); err != nil {
		return err
	}
	return nil
}

func (c *dummyData) EncodeMsgpack(e *encoder) error {
	return e.EncodeBool(c.Dummy)
}

// Example demonstrates an operations like Put and Take with queue and custom
// MsgPack structure.
//
// Features of the implementation:
//
// - If you use the connection timeout and call TakeWithTimeout with a
// parameter greater than the connection timeout, the parameter is reduced to
// it.
//
// - If you use the connection timeout and call Take, we return an error if we
// cannot take the task out of the queue within the time corresponding to the
// connection timeout.
func main() {
	opts := tarantool.Opts{
		Reconnect:     time.Second,
		Timeout:       2500 * time.Millisecond,
		MaxReconnects: 5,
		User:          "test",
		Pass:          "test",
	}
	conn, err := tarantool.Connect("127.0.0.1:3013", opts)
	if err != nil {
		log.Fatalf("connection: %s", err)
		return
	}
	defer conn.Close()

	cfg := queue.Cfg{
		Temporary:   true,
		IfNotExists: true,
		Kind:        queue.FIFO,
		Opts: queue.Opts{
			Ttl:   10 * time.Second,
			Ttr:   5 * time.Second,
			Delay: 3 * time.Second,
			Pri:   1,
		},
	}

	que := queue.New(conn, "test_queue_msgpack")
	if err = que.Create(cfg); err != nil {
		log.Fatalf("queue create: %s", err)
		return
	}

	// Put data.
	task, err := que.Put("test_data")
	if err != nil {
		log.Fatalf("put task: %s", err)
	}
	fmt.Println("Task id is", task.Id())

	// Take data.
	task, err = que.Take() // Blocking operation.
	if err != nil {
		log.Fatalf("take task: %s", err)
	}
	fmt.Println("Data is", task.Data())
	task.Ack()

	// Take typed example.
	putData := dummyData{}
	// Put data.
	task, err = que.Put(&putData)
	if err != nil {
		log.Fatalf("put typed task: %s", err)
	}
	fmt.Println("Task id is ", task.Id())

	takeData := dummyData{}
	// Take data.
	task, err = que.TakeTyped(&takeData) // Blocking operation.
	if err != nil {
		log.Fatalf("take take typed: %s", err)
	}
	fmt.Println("Data is ", takeData)
	// Same data.
	fmt.Println("Data is ", task.Data())

	task, err = que.Put([]int{1, 2, 3})
	if err != nil {
		log.Fatalf("Put failed: %s", err)
	}
	task.Bury()

	task, err = que.TakeTimeout(2 * time.Second)
	if err != nil {
		log.Fatalf("Take with timeout failed: %s", err)
	}
	if task == nil {
		fmt.Println("Task is nil")
	}

	que.Drop()

}
Output:

Task id is 0
Data is test_data
Task id is  0
Data is  {false}
Data is  &{false}
Task is nil

Index

Examples

Constants

View Source
const (
	READY   = "r"
	TAKEN   = "t"
	DONE    = "-"
	BURIED  = "!"
	DELAYED = "~"
)
View Source
const (
	FIFO      queueType = "fifo"
	FIFO_TTL  queueType = "fifottl"
	UTUBE     queueType = "utube"
	UTUBE_TTL queueType = "utubettl"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Cfg

type Cfg struct {
	Temporary   bool // If true, the contents do not persist on disk.
	IfNotExists bool // If true, no error will be returned if the tube already exists.
	Kind        queueType
	Opts
}

type Opts

type Opts struct {
	Pri   int           // Task priorities.
	Ttl   time.Duration // Task time to live.
	Ttr   time.Duration // Task time to execute.
	Delay time.Duration // Delayed execution.
	Utube string
}

type Queue

type Queue interface {
	// Exists checks tube for existence.
	// Note: it uses Eval, so user needs 'execute universe' privilege.
	Exists() (bool, error)
	// Create creates new tube with configuration.
	// Note: it uses Eval, so user needs 'execute universe' privilege
	// Note: you'd better not use this function in your application, cause it is
	// administrative task to create or delete queue.
	Create(cfg Cfg) error
	// Drop destroys tube.
	// Note: you'd better not use this function in your application, cause it is
	// administrative task to create or delete queue.
	Drop() error
	// Put creates new task in a tube.
	Put(data interface{}) (*Task, error)
	// PutWithOpts creates new task with options different from tube's defaults.
	PutWithOpts(data interface{}, cfg Opts) (*Task, error)
	// Take takes 'ready' task from a tube and marks it as 'in progress'.
	// Note: if connection has a request Timeout, then 0.9 * connection.Timeout is
	// used as a timeout.
	// If you use a connection timeout and we can not take task from queue in
	// a time equal to the connection timeout after calling `Take` then we
	// return an error.
	Take() (*Task, error)
	// TakeTimeout takes 'ready' task from a tube and marks it as "in progress",
	// or it is timeouted after "timeout" period.
	// Note: if connection has a request Timeout, and conn.Timeout * 0.9 < timeout
	// then timeout = conn.Timeout*0.9.
	// If you use connection timeout and call `TakeTimeout` with parameter
	// greater than the connection timeout then parameter reduced to it.
	TakeTimeout(timeout time.Duration) (*Task, error)
	// TakeTyped takes 'ready' task from a tube and marks it as 'in progress'
	// Note: if connection has a request Timeout, then 0.9 * connection.Timeout is
	// used as a timeout.
	// Data will be unpacked to result.
	TakeTyped(interface{}) (*Task, error)
	// TakeTypedTimeout takes 'ready' task from a tube and marks it as "in progress",
	// or it is timeouted after "timeout" period.
	// Note: if connection has a request Timeout, and conn.Timeout * 0.9 < timeout
	// then timeout = conn.Timeout*0.9.
	// Data will be unpacked to result.
	TakeTypedTimeout(timeout time.Duration, result interface{}) (*Task, error)
	// Peek returns task by its id.
	Peek(taskId uint64) (*Task, error)
	// Kick reverts effect of Task.Bury() for count tasks.
	Kick(count uint64) (uint64, error)
	// Delete the task identified by its id.
	Delete(taskId uint64) error
	// Statistic returns some statistic about queue.
	Statistic() (interface{}, error)
}

Queue is a handle to Tarantool queue's tube.

func New

func New(conn tarantool.Connector, name string) Queue

New creates a queue handle.

type Task

type Task struct {
	// contains filtered or unexported fields
}

Task represents a task from Tarantool queue's tube.

func (*Task) Ack

func (t *Task) Ack() error

Ack signals about task completion.

func (*Task) Bury

func (t *Task) Bury() error

Bury signals that task task cannot be executed in the current circumstances, task becomes "buried" - ie neither completed, nor ready, so it could not be deleted or taken by other worker. To revert "burying" call queue.Kick(numberOfBurried).

func (*Task) Data

func (t *Task) Data() interface{}

Data is a getter for task data.

func (*Task) DecodeMsgpack

func (t *Task) DecodeMsgpack(d *decoder) error

func (*Task) Delete

func (t *Task) Delete() error

Delete task from queue.

func (*Task) Id

func (t *Task) Id() uint64

Id is a getter for task id.

func (*Task) IsBuried

func (t *Task) IsBuried() bool

IsBurred returns if task is buried.

func (*Task) IsDelayed

func (t *Task) IsDelayed() bool

IsDelayed returns if task is delayed.

func (*Task) IsDone

func (t *Task) IsDone() bool

IsDone returns if task is done.

func (*Task) IsReady

func (t *Task) IsReady() bool

IsReady returns if task is ready.

func (*Task) IsTaken

func (t *Task) IsTaken() bool

IsTaken returns if task is taken.

func (*Task) Release

func (t *Task) Release() error

Release returns task back in the queue without making it complete. In outher words, this worker failed to complete the task, and it, so other worker could try to do that again.

func (*Task) ReleaseCfg

func (t *Task) ReleaseCfg(cfg Opts) error

ReleaseCfg returns task to a queue and changes its configuration.

func (*Task) Status

func (t *Task) Status() string

Status is a getter for task status.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL