tinyq

Version: v1.0.0-alpha
Published: Aug 25, 2021 | License: MIT | Imports: 4 | Imported by: 0

Note: this package is not in the latest version of its module.

README

Queue

An implementation of a persistent FIFO queue, with the ability to pause and resume dequeuing.

NOTE: This package is provided "as is" with no guarantee. Use it at your own risk and always test it yourself before using it in a production environment. If you find any issues, please create a new issue.

Install:

go get -u github.com/twiny/tinyq/...

Examples:

See the usage examples in the _examples directory.

Known Issues:

  • "fatal error: all goroutines are asleep - deadlock!" occurs when the channel returned by queue.Dequeue is never closed.
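
The deadlock above is a general Go failure mode rather than something specific to this package: a `for range` loop over a channel only ends when the channel is closed. The sketch below reproduces the pattern with a plain channel standing in for the one returned by Dequeue. It does not use tinyq, and `produce` and `drain` are hypothetical names.

```go
package main

import "fmt"

// produce is a hypothetical stand-in for a queue's dequeue channel.
// It sends n values and then closes the channel so receivers can stop.
func produce(n int) <-chan int {
	ch := make(chan int)
	go func() {
		// Without this close, the range loop in drain would block forever
		// and the runtime would report a deadlock once no goroutine can run.
		defer close(ch)
		for i := 0; i < n; i++ {
			ch <- i
		}
	}()
	return ch
}

// drain counts the values received until the channel is closed.
func drain(ch <-chan int) int {
	count := 0
	for range ch {
		count++
	}
	return count
}

func main() {
	fmt.Println(drain(produce(3)))
}
```

When the channel cannot be closed by the consumer, a `select` with a done channel or a context is the usual alternative to ranging over it.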

Documentation

Constants

This section is empty.

Variables

var (
	ErrQueueIsPaused  = errors.New("queue is already paused")
	ErrQueueIsRunning = errors.New("queue is already running")
	ErrUnknownCommand = errors.New("unknown command")
)
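
Because these are sentinel errors, callers can match them with `errors.Is`, which also works when the error has been wrapped. The sketch below redeclares the three variables locally so it runs without tinyq. `describe` is a hypothetical helper, and treating the "already paused/running" errors as harmless is an assumption, not documented behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the package's sentinel errors, declared here only so the
// sketch is self-contained.
var (
	ErrQueueIsPaused  = errors.New("queue is already paused")
	ErrQueueIsRunning = errors.New("queue is already running")
	ErrUnknownCommand = errors.New("unknown command")
)

// describe is a hypothetical helper that classifies an error from Exec.
func describe(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrQueueIsPaused), errors.Is(err, ErrQueueIsRunning):
		return "no-op" // assumed harmless: the queue is already in that state
	case errors.Is(err, ErrUnknownCommand):
		return "bad command"
	default:
		return "unexpected"
	}
}

func main() {
	// errors.Is still matches after wrapping with %w.
	wrapped := fmt.Errorf("exec: %w", ErrQueueIsPaused)
	fmt.Println(describe(wrapped))
}
```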

Functions

This section is empty.

Types

type Command

type Command string
const (
	Start Command = "start"
	Pause Command = "pause"
	Stop  Command = "stop"
)

type Message

type Message struct {
	UUID   string        `json:"uuid"`
	Status MessageStatus `json:"status"`
	Body   []byte        `json:"body"`
	Detail string        `json:"detail"`
}

Message

func NewMessage

func NewMessage(key string, data interface{}) (Message, error)

NewMessage

func (Message) Value

func (msg Message) Value(data interface{}) error

Data
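
The signatures suggest that NewMessage serializes an arbitrary value into Body and that Value decodes it back into a pointer. The documentation does not say which encoding is used, so the sketch below assumes JSON purely for illustration. `newMessage`, `value`, and `job` are hypothetical stand-ins, not the package's code.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type MessageStatus string

const Pending MessageStatus = "pending"

// Message mirrors the documented struct.
type Message struct {
	UUID   string        `json:"uuid"`
	Status MessageStatus `json:"status"`
	Body   []byte        `json:"body"`
	Detail string        `json:"detail"`
}

// newMessage is a hypothetical constructor; JSON encoding is an assumption.
func newMessage(key string, data interface{}) (Message, error) {
	body, err := json.Marshal(data)
	if err != nil {
		return Message{}, err
	}
	return Message{UUID: key, Status: Pending, Body: body}, nil
}

// value decodes the message body into data, which must be a pointer.
func value(msg Message, data interface{}) error {
	return json.Unmarshal(msg.Body, data)
}

// job is a hypothetical payload type.
type job struct {
	URL string `json:"url"`
}

func main() {
	msg, err := newMessage("id-1", job{URL: "https://example.com"})
	if err != nil {
		fmt.Println("encode:", err)
		return
	}
	var out job
	if err := value(msg, &out); err != nil {
		fmt.Println("decode:", err)
		return
	}
	fmt.Println(msg.UUID, msg.Status, out.URL)
}
```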

type MessageStatus

type MessageStatus string

MessageStatus

const (
	Pending MessageStatus = "pending"
	Success MessageStatus = "success"
	Failed  MessageStatus = "failed"
)

type Messages

type Messages struct {
	Total  uint64    `json:"total"`
	Offset uint64    `json:"offset"`
	Limit  uint64    `json:"limit"`
	Page   uint64    `json:"page"`
	Items  []Message `json:"items"`
}

Messages
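
The fields of Messages describe one page of a larger result set. How the package derives Page from Offset and Limit is not documented. The sketch below assumes 1-based pages computed as offset/limit + 1, and `paginate` is a hypothetical helper over an in-memory slice.

```go
package main

import "fmt"

// Message is trimmed to the one field this sketch needs.
type Message struct {
	UUID string `json:"uuid"`
}

// Messages mirrors the documented struct.
type Messages struct {
	Total  uint64    `json:"total"`
	Offset uint64    `json:"offset"`
	Limit  uint64    `json:"limit"`
	Page   uint64    `json:"page"`
	Items  []Message `json:"items"`
}

// paginate returns the window [offset, offset+limit) of all.
// The 1-based Page formula is an assumption.
func paginate(all []Message, offset, limit uint64) Messages {
	total := uint64(len(all))
	out := Messages{Total: total, Offset: offset, Limit: limit}
	if limit == 0 || offset >= total {
		return out // nothing to return; Items stays nil
	}
	end := total
	if limit < total-offset {
		end = offset + limit
	}
	out.Page = offset/limit + 1
	out.Items = all[offset:end]
	return out
}

func main() {
	all := []Message{{UUID: "a"}, {UUID: "b"}, {UUID: "c"}, {UUID: "d"}, {UUID: "e"}}
	page := paginate(all, 2, 2)
	fmt.Println(page.Total, page.Page, len(page.Items), page.Items[0].UUID)
}
```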

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue

func NewQueue

func NewQueue(store Store, autostart bool) *Queue

NewQueue

func (*Queue) Close

func (q *Queue) Close()

Close

func (*Queue) Dequeue

func (q *Queue) Dequeue() <-chan Message

Dequeue

func (*Queue) Enqueue

func (q *Queue) Enqueue(obj interface{}) error

Enqueue

func (*Queue) Errs

func (q *Queue) Errs() <-chan error

Errors

func (*Queue) Exec

func (q *Queue) Exec(cmd Command) error
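
Exec has no doc comment, but the Command constants and the error variables hint at a small state machine. The sketch below is one plausible reading, inferred only from the error messages. `toyQueue` is a hypothetical type, and the handling of Stop in particular is a guess.

```go
package main

import (
	"errors"
	"fmt"
)

type Command string

const (
	Start Command = "start"
	Pause Command = "pause"
	Stop  Command = "stop"
)

var (
	ErrQueueIsPaused  = errors.New("queue is already paused")
	ErrQueueIsRunning = errors.New("queue is already running")
	ErrUnknownCommand = errors.New("unknown command")
)

// toyQueue is a hypothetical stand-in that tracks only a running flag.
type toyQueue struct {
	running bool
}

// Exec applies cmd to the toy state. The transitions are assumptions,
// not tinyq's actual logic.
func (q *toyQueue) Exec(cmd Command) error {
	switch cmd {
	case Start:
		if q.running {
			return ErrQueueIsRunning
		}
		q.running = true
	case Pause:
		if !q.running {
			return ErrQueueIsPaused
		}
		q.running = false
	case Stop:
		q.running = false // assumed: stopping always succeeds
	default:
		return ErrUnknownCommand
	}
	return nil
}

func main() {
	q := &toyQueue{}
	fmt.Println(q.Exec(Start))
	fmt.Println(q.Exec(Start))
	fmt.Println(q.Exec(Command("flush")))
}
```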

func (*Queue) List

func (q *Queue) List(typ MessageStatus, offset, limit uint64) (Messages, error)

List

func (*Queue) Notify

func (q *Queue) Notify(uuid string, merr error) error

Notify

func (*Queue) Remove

func (q *Queue) Remove(typ MessageStatus) error

Remove

func (*Queue) Retry

func (q *Queue) Retry() error

Retry

func (*Queue) Statistic

func (q *Queue) Statistic() (Stats, error)

Statistic

type State

type State string

State

type Stats

type Stats struct {
	IsRunning bool   `json:"is_running"`
	Pending   uint64 `json:"pending"`
	Failed    uint64 `json:"failed"`
}

Stats

type Store

type Store interface {
	Enqueue(msg Message) error
	Dequeue() (Message, error)
	IsEmpty() error
	Notify(uuid string, merr error) error
	List(typ MessageStatus, offset, limit uint64) (Messages, error)
	Retry() error
	Remove(typ MessageStatus) error
	Statistic() (Stats, error)
	Close()
}

Store
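
To make the shape of the Store interface more concrete, here is a partial in-memory sketch covering Enqueue, Dequeue, IsEmpty, Notify, Retry, and Statistic. List, Remove, and Close are omitted, so it does not satisfy the full interface. It is not persistent and not safe for concurrent use. The semantics are assumptions inferred from the method names: a nil error passed to Notify means success, and Retry moves failed messages back to pending. `memStore` and `errEmpty` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

type MessageStatus string

const (
	Pending MessageStatus = "pending"
	Failed  MessageStatus = "failed"
)

type Message struct {
	UUID   string
	Status MessageStatus
	Body   []byte
	Detail string
}

type Stats struct {
	IsRunning bool
	Pending   uint64
	Failed    uint64
}

var errEmpty = errors.New("store is empty")

// memStore keeps everything in slices and a map; nothing is written to disk.
type memStore struct {
	pending  []Message          // FIFO order, oldest first
	inflight map[string]Message // dequeued but not yet acknowledged
	failed   []Message
}

func newMemStore() *memStore { return &memStore{inflight: map[string]Message{}} }

func (s *memStore) Enqueue(msg Message) error {
	msg.Status = Pending
	s.pending = append(s.pending, msg)
	return nil
}

// IsEmpty returns an error when there is nothing pending (assumed meaning).
func (s *memStore) IsEmpty() error {
	if len(s.pending) == 0 {
		return errEmpty
	}
	return nil
}

func (s *memStore) Dequeue() (Message, error) {
	if len(s.pending) == 0 {
		return Message{}, errEmpty
	}
	msg := s.pending[0]
	s.pending = s.pending[1:]
	s.inflight[msg.UUID] = msg
	return msg, nil
}

// Notify records the outcome of a dequeued message: nil means success
// (the message is dropped here), non-nil marks it as failed.
func (s *memStore) Notify(uuid string, merr error) error {
	msg, ok := s.inflight[uuid]
	if !ok {
		return fmt.Errorf("unknown message %q", uuid)
	}
	delete(s.inflight, uuid)
	if merr != nil {
		msg.Status, msg.Detail = Failed, merr.Error()
		s.failed = append(s.failed, msg)
	}
	return nil
}

// Retry moves every failed message back to the end of the pending list.
func (s *memStore) Retry() error {
	for _, msg := range s.failed {
		msg.Status, msg.Detail = Pending, ""
		s.pending = append(s.pending, msg)
	}
	s.failed = nil
	return nil
}

func (s *memStore) Statistic() (Stats, error) {
	return Stats{Pending: uint64(len(s.pending)), Failed: uint64(len(s.failed))}, nil
}

func main() {
	s := newMemStore()
	s.Enqueue(Message{UUID: "a"})
	s.Enqueue(Message{UUID: "b"})
	msg, _ := s.Dequeue()
	s.Notify(msg.UUID, errors.New("boom"))
	before, _ := s.Statistic()
	s.Retry()
	after, _ := s.Statistic()
	fmt.Println(msg.UUID, before.Pending, before.Failed, after.Pending, after.Failed)
}
```

A real implementation would need to persist each state change and guard the state with a mutex, since the queue is likely to call the store from more than one goroutine.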

Directories

Path Synopsis
_examples
http command
stores
