gorder

package module
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 3, 2024 License: MIT Imports: 10 Imported by: 1

README

gorder

Go Version GoDoc Build GoReport

gorder logo

Gorder is a highly efficient, in-memory task worker with strict ordering capabilities. It is designed for parallel task execution using a worker pool and supports safe concurrent usage. With features such as task retries, backoff mechanisms, and task queue management, Gorder ensures reliable and ordered task processing. It can be used for scheduling tasks in a particular order, eg. for async database operations with a strict order of execution.

Installation

To use Gorder in your Go project, install it via Go modules:

go get github.com/yourusername/gorder

Features

  • Worker Pool: Utilizes multiple workers to handle tasks in parallel.
  • Strict Ordering: Ensures tasks with the same key are processed in the order they are received.
  • Retry Mechanism: Configurable number of retries with exponential backoff.
  • Graceful Shutdown: Options to wait for task completion before shutdown.
  • Logging Interface: Customizable logging for better monitoring and debugging.
Usage
q := gorder.New[string](ctx, gorder.Options{
    Workers: 100,
    Logger: slog.Default(),
})

q.Push("queue1", "task1", func(ctx context.Context) error {
    time.Sleep(1 * time.Second)
    println("1")
    return nil
})
q.Push("queue1", "task2", func(ctx context.Context) error {
    time.Sleep(200*time.Millisecond)
    println("2")
    return nil
})
q.Push("queue1", "task3", func(ctx context.Context) error {
    println("3")
    return nil
})

// Output:
// 1
// 2
// 3
  • Tasks in different queues can be executed in parallel. Queue key passed to Push() as first argument is used to identify the queue.
  • You can use any comparable type as queue key. For example, string, int, or any struct with string fields.
  • It will retry failed tasks until the maximum number of retries is reached. If the number of retries is exceeded, the task will be thrown. You can configure the maximum number of retries in Options.
Configuration

Gorder provides several configuration options via the Options struct:

  • Workers: Number of workers (default: number of CPUs).
  • FlushInterval: Maximum time between flushing tasks to workers (default: 100ms).
  • UnusedThreshold: Time to keep idle queues before deletion (default: 5 minutes).
  • Retries: Maximum number of retries for a failed task (default: 10).
  • RetryBackoffMinTimeout: Minimum backoff time between retries (default: 100ms).
  • RetryBackoffMaxTimeout: Maximum backoff time (default: 10s).
  • NoRetries: Disable retries completely (default: false).
  • DoNotThrowOnShutdown: Wait for task completion on shutdown (default: false).
  • Logger: Custom logger for error, info, and debug logs.
Handling Errors

Gorder provides feedback on task execution through logging. Ensure your logger is properly set up to capture logs at different severity levels for effective error handling and debugging.

Cons
  • No Persistence: Tasks are handled in-memory, risking loss on crash without durable storage integration.

License

This project is licensed under the terms of the MIT License.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrWaitForAck is returned in Next() when the previous task is waiting for ack
	ErrWaitForAck = errors.New("previous task iswaiting for ack")
	// ErrQueueNotFound is returned in Next() when the queue is not found
	ErrQueueNotFound = errors.New("queue not found")
	// ErrQueueIsEmpty is returned in Next() when the queue is empty
	ErrQueueIsEmpty = errors.New("queue is empty")
	// ErrNothingToAck is returned in Ack() when there is no tasks to ack
	ErrNothingToAck = errors.New("nothing to ack")
)

Functions

This section is empty.

Types

type Gorder

type Gorder[T comparable] struct {
	// contains filtered or unexported fields
}

Gorder is a in-memory task worker with strict ordering. It handles tasks in parallel using worker pool. It is safe for concurrent use.

func New

func New[T comparable](ctx context.Context, rawOpts ...Options) *Gorder[T]

New creates a new Gorder. It starts workers and flusher goroutines.

func (*Gorder[T]) BrokenQueues

func (q *Gorder[T]) BrokenQueues() map[T]int

BrokenQueues returns number of retries for each broken queue.

func (*Gorder[T]) Push

func (q *Gorder[T]) Push(queueKey T, name string, f TaskFunc)

Push adds task to the end of queue. It is non blocking and safe for concurrent use. Tasks with different queue keys are executed in parallel, not sequentially. Tasks with the same queue key are executed in order. Don't use it after Shutdown.

func (*Gorder[T]) Shutdown

func (q *Gorder[T]) Shutdown(ctx context.Context) error

Shutdown stops the Gorder and waits for all tasks to complete. By default it throws broken tasks without retries. To change this behavior, use DoNotThrowOnShutdown option.

func (*Gorder[T]) Stat

func (q *Gorder[T]) Stat() map[T]QueueStat

Stat returns number of tasks for each queue and some additional info.

type Logger

type Logger interface {
	Debug(string, ...any)
	Info(string, ...any)
	Warn(string, ...any)
	Error(string, ...any)
}

Logger is an interface for logging. You can use slog.

type Options

type Options struct {
	// Workers is the number of workers to handle tasks functions.
	// The default value is the number of CPUs.
	Workers int

	// FlushInterval is the max time of sleeping between flushing tasks to workers.
	// The default value is 100ms.
	FlushInterval time.Duration

	// UnusedThreshold is the max time between pushing tasks to queue before it gets deleted.
	// The default value is 5 minutes.
	UnusedThreshold time.Duration

	// Retries is the max number of retries for failed task before throwing it.
	// Use -1 to make retries unlimited.
	// The default value is 10.
	Retries int

	// RetryBackoffMinTimeout is the min timeout between retries.
	// The default value is 100ms.
	RetryBackoffMinTimeout time.Duration

	// RetryBackoffMaxTimeout is the max timeout between retries.
	// The default value is 10s.
	RetryBackoffMaxTimeout time.Duration

	// NoRetries is a flag that indicates that we should not retry failed tasks.
	// The default value is false.
	NoRetries bool

	// DoNotThrowOnShutdown is a flag that indicates that we should wait for all tasks to complete before shutting down.
	// The default value is false.
	DoNotThrowOnShutdown bool

	// Logger is used to log messages in case of errors.
	// The default value is nil, so there is no logging.
	Logger Logger
}

Options contains options for Gorder. Every field is optional.

type Queue

type Queue[T comparable] struct {
	// contains filtered or unexported fields
}

Queue provides in memory task queue with requirements

  1. Strict task ordering;
  2. Guarantees that tasks won't get lost with Ack mechanics.

It is safe for concurrent use.

func NewQueue

func NewQueue[T comparable]() *Queue[T]

NewQueue creates a new queue.

func (*Queue[T]) Ack

func (q *Queue[T]) Ack(queueKey T) error

Ack removes task from queue, it can be called only after Next() or it will return error.

func (*Queue[T]) AllQueues

func (q *Queue[T]) AllQueues() []T

AllQueues returns list of all queue keys.

func (*Queue[T]) DeleteUnusedQueues

func (q *Queue[T]) DeleteUnusedQueues(activeThreshold time.Duration) (deleted []T, struggling []T)

DeleteUnusedQueues removes empty queues with last active duration > activeThreshold; return keys of deleted queues. It also returns keys of queues with active tasks, but with last active duration > activeThreshold

func (*Queue[T]) Next

func (q *Queue[T]) Next(queueKey T) (Task, error)

Next returns a next task from the queue according to the order key. You should call Ack() after handling this task to remove it from queue. Two subsequent calls of Next() without Ack() will return error.

func (*Queue[T]) Push

func (q *Queue[T]) Push(queueKey T, task Task)

Push adds task to the end of queue.

func (*Queue[T]) Stat

func (q *Queue[T]) Stat() map[T]QueueStat

Stat returns number of tasks for each queue

func (*Queue[T]) WaitingQueues

func (q *Queue[T]) WaitingQueues() []T

WaitingQueues returns list of all queue keys with waiting tasks.

type QueueStat

type QueueStat struct {
	Length int

	IsWaitForAck bool
	// contains filtered or unexported fields
}

QueueStat contains information about the queue.

type Task

type Task struct {
	Name string
	Foo  TaskFunc
}

Task contains a name for information and a function to be executed.

func NewTask

func NewTask(name string, f TaskFunc) Task

NewTask creates a new task.

type TaskFunc

type TaskFunc func(ctx context.Context) error

TaskFunc is a general function to execute as a task.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL