dqueue

v0.0.0-...-6bbb11d
Published: Aug 6, 2024 License: MIT Imports: 4 Imported by: 1

README

D(EFERRED) Queue

This started as a test task that I finished in 2 hours in 2017. In 2023 I polished the code a little, created an example with contexts, and added unit tests with 100% coverage.

What does it do?

With this package we can build a deferred queue of tasks to be executed later, for example: execute this in 3 minutes, execute that in 15 seconds from now, and so on. Concurrent goroutines can then consume these tasks, and the queue hands them out in order of execution time, so the first task delivered is the one due in 15 seconds.
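
To make the ordering concrete, here is a minimal sketch of one way such a queue could be built, using a min-heap keyed on the due time and guarded by a mutex. It is an illustration only and not necessarily how dqueue is implemented: the names item, byDue, sketchQueue, add, get and drainOrder are all hypothetical and do not come from the package.

```go
package main

import (
	"container/heap"
	"fmt"
	"sync"
	"time"
)

// item is a hypothetical stand-in for a scheduled task.
type item struct {
	due     time.Time
	payload any
}

// byDue implements heap.Interface and keeps the earliest due item at index 0.
type byDue []item

func (q byDue) Len() int           { return len(q) }
func (q byDue) Less(i, j int) bool { return q[i].due.Before(q[j].due) }
func (q byDue) Swap(i, j int)      { q[i], q[j] = q[j], q[i] }
func (q *byDue) Push(x any)        { *q = append(*q, x.(item)) }
func (q *byDue) Pop() any {
	old := *q
	last := old[len(old)-1]
	*q = old[:len(old)-1]
	return last
}

// sketchQueue guards the heap with a mutex so goroutines can share it.
type sketchQueue struct {
	mu    sync.Mutex
	items byDue
}

// add schedules payload for the given moment.
func (s *sketchQueue) add(payload any, due time.Time) {
	s.mu.Lock()
	defer s.mu.Unlock()
	heap.Push(&s.items, item{due: due, payload: payload})
}

// get removes and returns the earliest item, but only if it is due at now.
func (s *sketchQueue) get(now time.Time) (item, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if len(s.items) == 0 || s.items[0].due.After(now) {
		return item{}, false
	}
	return heap.Pop(&s.items).(item), true
}

// drainOrder schedules two tasks out of order and returns the payloads
// in the order the queue delivers them once both are due.
func drainOrder() []string {
	start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	q := &sketchQueue{}
	q.add("in 3 minutes", start.Add(3*time.Minute))
	q.add("in 15 seconds", start.Add(15*time.Second))

	var out []string
	later := start.Add(5 * time.Minute) // both tasks are due by now
	for {
		it, ok := q.get(later)
		if !ok {
			break
		}
		out = append(out, it.payload.(string))
	}
	return out
}

func main() {
	fmt.Println(drainOrder())
}
```

The sketch passes the current time into get explicitly, which keeps the ordering demonstration deterministic; a real queue would more likely read the clock itself.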

Basic usage

Make queue handler:

handler := dqueue.New() // import "github.com/vodolaz095/dqueue"

// payload can be anything - number, string, buffer, struct...
something := "task"

// Create tasks to be executed in future
handler.ExecuteAt(something, time.Now().Add(time.Minute))
handler.ExecuteAfter(something, time.Minute)

// Extract task ready to be executed
task, ready := handler.Get()
if ready { // task is ready
	fmt.Printf("Task %s is ready to be executed at %s\n",
		task.Payload.(string),
		task.ExecuteAt.Format(time.Kitchen),
	)
} else {
	fmt.Println("No tasks are ready to be executed")
}
// Count tasks left
tasksInQueue := handler.Len()

// Extract all tasks so that we can, for example, save the whole delivery queue before closing the application
tasks := handler.Dump()

// Prune queue:
handler.Prune()

Concurrent consumers example

See full example at example.go


handler := dqueue.New() // import "github.com/vodolaz095/dqueue"

// Publish tasks
something := "task" // payload can be anything - number, string, buffer, struct...
handler.ExecuteAt(something, time.Now().Add(time.Minute))
handler.ExecuteAfter(something, time.Minute)

// Make a global context to be canceled when the application is stopping
mainCtx, mainCancel := context.WithTimeout(context.Background(), 3*time.Second)
defer mainCancel()

// Start concurrent consumers
wg := sync.WaitGroup{}
for j := 0; j < 10; j++ {
	wg.Add(1)
	go func(workerNumber int, initialCtx context.Context) {
		defer wg.Done()
		ctx, cancel := context.WithCancel(initialCtx)
		defer cancel()
		ticker := time.NewTicker(time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				task, ready := handler.Get()
				if ready { // task is ready
					// ProcessTask stands for your own task handler, it is not part of this package
					err := ProcessTask(task)
					if err != nil { // requeue the task to be delivered in 1 minute
						handler.ExecuteAfter(task.Payload, time.Minute)
					}
				}
			case <-ctx.Done():
				fmt.Printf("Closing worker %v, there are %v tasks in queue\n", workerNumber, handler.Len())
				return
			}
		}
	}(j, mainCtx)
}
wg.Wait()

// Inspect the tasks left so they can be restored when the application is restarted
tasks := handler.Dump()
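
For readers who want to try the consumer pattern without installing the package, the following is a self-contained sketch of the same ticker-plus-context loop. It is written under stated assumptions: the source interface, fakeQueue, runWorkers and processAll are hypothetical names invented for this illustration, and fakeQueue treats every task as ready immediately, which the real queue does not. Task is redeclared here to mirror the documented struct. The sketch needs Go 1.19 or newer for atomic.Int64.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// Task mirrors the documented dqueue.Task fields; redeclared so the
// sketch compiles without the package.
type Task struct {
	ExecuteAt time.Time
	Payload   any
}

// source is the hypothetical subset of the queue that workers need.
type source interface {
	Get() (Task, bool)
}

// fakeQueue is a stand-in for the real queue: a mutex-guarded slice
// whose tasks are all considered ready immediately.
type fakeQueue struct {
	mu    sync.Mutex
	tasks []Task
}

func (f *fakeQueue) Get() (Task, bool) {
	f.mu.Lock()
	defer f.mu.Unlock()
	if len(f.tasks) == 0 {
		return Task{}, false
	}
	t := f.tasks[0]
	f.tasks = f.tasks[1:]
	return t, true
}

// runWorkers polls src from n goroutines until ctx is done and
// returns how many tasks were taken from the queue.
func runWorkers(ctx context.Context, src source, n int) int64 {
	var processed atomic.Int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			ticker := time.NewTicker(time.Millisecond)
			defer ticker.Stop()
			for {
				select {
				case <-ticker.C:
					if _, ready := src.Get(); ready {
						processed.Add(1) // real code would process the task here
					}
				case <-ctx.Done():
					return
				}
			}
		}()
	}
	wg.Wait()
	return processed.Load()
}

// processAll fills a fake queue with count tasks and lets four workers
// consume it for up to 300 milliseconds.
func processAll(count int) int64 {
	q := &fakeQueue{}
	for i := 0; i < count; i++ {
		q.tasks = append(q.tasks, Task{Payload: i})
	}
	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()
	return runWorkers(ctx, q, 4)
}

func main() {
	fmt.Println("processed:", processAll(20))
}
```

Deferring wg.Done and ticker.Stop at the top of each goroutine means a worker releases its resources on every exit path, not only on cancellation.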


Documentation

Overview

Package dqueue implements a thread-safe deferred queue for tasks to be executed in the future after some delay.

Types

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler is a deferred queue handler used to store tasks and retrieve them once they are due for execution.

func New

func New() Handler

New creates a new deferred queue Handler.

func (*Handler) Dump

func (h *Handler) Dump() (ret []Task)

Dump returns a sorted copy of the queue's contents, leaving the queue intact.
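
The README suggests saving the dumped queue before the application closes. The package itself does not prescribe a storage format, so the following is only a sketch of one option, JSON via encoding/json. The helpers encodeTasks, decodeTasks and roundTrip are hypothetical, and Task is redeclared here to mirror the documented struct so the example compiles on its own.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Task mirrors the documented dqueue.Task fields; redeclared so the
// sketch compiles without the package.
type Task struct {
	ExecuteAt time.Time
	Payload   any
}

// encodeTasks serializes a dumped queue, for example before shutdown.
func encodeTasks(tasks []Task) ([]byte, error) {
	return json.Marshal(tasks)
}

// decodeTasks restores tasks written by encodeTasks. Payload comes back
// as a generic JSON value (string, float64, map[string]any, ...).
func decodeTasks(data []byte) ([]Task, error) {
	var tasks []Task
	if err := json.Unmarshal(data, &tasks); err != nil {
		return nil, err
	}
	return tasks, nil
}

// roundTrip encodes two sample tasks, decodes them again, and reports the
// payloads plus whether every ExecuteAt value survived unchanged.
func roundTrip() (payloads []string, timesMatch bool, err error) {
	base := time.Date(2024, 8, 6, 12, 0, 0, 0, time.UTC)
	in := []Task{
		{ExecuteAt: base.Add(15 * time.Second), Payload: "send email"},
		{ExecuteAt: base.Add(3 * time.Minute), Payload: "rebuild cache"},
	}
	data, err := encodeTasks(in)
	if err != nil {
		return nil, false, err
	}
	out, err := decodeTasks(data)
	if err != nil {
		return nil, false, err
	}
	if len(out) != len(in) {
		return nil, false, fmt.Errorf("got %d tasks, want %d", len(out), len(in))
	}
	timesMatch = true
	for i, t := range out {
		s, ok := t.Payload.(string)
		if !ok {
			return nil, false, fmt.Errorf("task %d: payload is not a string", i)
		}
		payloads = append(payloads, s)
		if !t.ExecuteAt.Equal(in[i].ExecuteAt) {
			timesMatch = false
		}
	}
	return payloads, timesMatch, nil
}

func main() {
	payloads, timesMatch, err := roundTrip()
	fmt.Println(payloads, timesMatch, err)
}
```

After decoding on restart, each task could be put back with the documented ExecuteAt method, passing its Payload and ExecuteAt fields. Because Payload is typed as any, struct payloads decode as maps, so an application with typed payloads would need its own decoding step.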

func (*Handler) ExecuteAfter

func (h *Handler) ExecuteAfter(payload any, after time.Duration) (ok bool)

ExecuteAfter schedules a task for execution after the provided time.Duration has passed. It returns true if the task is accepted.

func (*Handler) ExecuteAt

func (h *Handler) ExecuteAt(payload any, when time.Time) (ok bool)

ExecuteAt schedules a task for execution at the desired time. It returns true if the task is accepted.

func (*Handler) Get

func (h *Handler) Get() (task Task, ready bool)

Get extracts one task from the queue in a thread-safe manner. If the second return value is true, the task is ready to be executed and has been removed from the queue. If there are no ready tasks in the queue, the first return value is a zero Task struct and the second is false.

func (*Handler) Len

func (h *Handler) Len() int

Len is a thread-safe function that reports how many tasks are in the queue.

func (*Handler) Prune

func (h *Handler) Prune()

Prune resets the queue.

type Task

type Task struct {
	ExecuteAt time.Time
	Payload   any
}

Task is an element describing work to be done in the future.
