cerra

package module
v0.0.6 Latest
Published: Mar 26, 2025 License: MIT Imports: 8 Imported by: 0

README

Cerra

Cerra is a simple task queue library for Go that supports in-memory, Redis, and RabbitMQ (AMQP) backends.

Features

  • In-memory backend
  • Redis backend
  • RabbitMQ (AMQP) backend

Requirements

Make sure you have Go 1.18 or higher installed.

Installation

To install cerra, use go get:

go get github.com/zaidfadhil/cerra

Usage

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/zaidfadhil/cerra"
)

func main() {
	// Create a new queue with the in-memory backend.
	// The second argument is the max number of workers; pass 0 to use the
	// maximum (runtime.NumCPU() * 2).
	queue := cerra.NewQueue(cerra.NewInMemoryBackend(), 0)

	// Update max number of workers.
	queue.UpdateMaxWorkerNum(5)

	// Add a handler function
	queue.AddHandler(func(ctx context.Context, task *cerra.Task) error {
		fmt.Printf("Received task with ID %s and payload %v\n", task.ID, task.Payload)
		return nil
	})

	// Start the queue
	queue.Start()

	// Enqueue some tasks
	for i := 0; i < 10; i++ {
		task := cerra.NewTask([]byte(fmt.Sprint(i)))
		if err := queue.Enqueue(task); err != nil {
			fmt.Printf("error enqueueing task: %v\n", err)
			continue
		}
		fmt.Println("Enqueue", i)
	}

	// Wait for the tasks to be processed
	time.Sleep(3 * time.Second)

	// Close the queue
	queue.Close()
}

More Backends

To use Redis as the queue backend, replace the in-memory backend with the Redis backend:

// Create Redis Backend
backend := redis.New(redis.Options{
	Address:  "localhost:6379",
	Password: "redis",
	Stream:   "cerra",
	Group:    "cerra",
	Consumer: "cerra",
})

// Create a new queue
queue := cerra.NewQueue(backend, 0)

The same applies to RabbitMQ, using the amqp backend:

// Create amqp Backend
backend := amqp.New(amqp.Options{
	Address:      "amqp://user:pass@localhost:5672/",
	Queue:        "cerra",
	ExchangeName: "cerra-exchange",
	ExchangeType: "direct",
	RoutingKey:   "cerra-key",
})

// Create a new queue
queue := cerra.NewQueue(backend, 0)

License

Cerra is licensed under the MIT License.

Documentation

Index

Constants

This section is empty.

Variables

var (
	ErrEmtpyQueue    = errors.New("cerra: empty queue")
	ErrInActiveQueue = errors.New("cerra: inactive queue")
	ErrQueueClosed   = errors.New("cerra: queue closed")
)

Functions

func NewInMemoryBackend

func NewInMemoryBackend() *inMemoryBackend

Types

type Backend

type Backend interface {
	Enqueue(task *Task) error
	Dequeue() (*Task, error)
	Close() error
}

type Queue

type Queue struct {
	sync.Mutex
	Backend Backend
	// contains filtered or unexported fields
}

func NewQueue

func NewQueue(backend Backend, workers int) *Queue

func (*Queue) AddHandler

func (q *Queue) AddHandler(handler func(context.Context, *Task) error)

func (*Queue) Close

func (q *Queue) Close()

func (*Queue) Enqueue

func (q *Queue) Enqueue(t *Task) error

func (*Queue) Start

func (q *Queue) Start()

func (*Queue) UpdateMaxWorkerNum

func (q *Queue) UpdateMaxWorkerNum(num int)

type Task

type Task struct {
	ID         string        `json:"id"`
	Payload    []byte        `json:"payload"`
	Timeout    time.Duration `json:"timeout"`
	RetryLimit int           `json:"retry_limit"`
	RetryCount int           `json:"retry_count"`
}

func NewTask

func NewTask(payload []byte) *Task

func NewTaskWithID

func NewTaskWithID(id string, payload []byte) *Task

func (*Task) Encode

func (t *Task) Encode() ([]byte, error)

func (*Task) SetID

func (t *Task) SetID(id string)

func (*Task) SetRetryLimit

func (t *Task) SetRetryLimit(limit int)

func (*Task) SetTimeout

func (t *Task) SetTimeout(timeout time.Duration)

func (*Task) ToMap

func (t *Task) ToMap() map[string]interface{}

Directories

Path Synopsis
_example
