gorelay

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 12, 2026 License: MIT Imports: 11 Imported by: 0

README

GoRelay - Durable Task Queue for Go

Go Report Card Go Reference

GoRelay is a simple, durable background job library for Go. It turns any function into a crash-resistant, retryable task with zero infrastructure required.

Features

  • Simple API - Just Register, Enqueue, and Schedule
  • Durable - Tasks survive crashes and restarts
  • Zero Infrastructure - SQLite by default, no external dependencies
  • Dashboard - Beautiful web UI for monitoring
  • Retries - Automatic with exponential backoff
  • Scheduling - Run tasks at any time in the future
  • Multiple Backends - SQLite, PostgreSQL, Redis support

Quick Start

package main

import (
    "fmt"
    "log"

    "github.com/amitstephen-dev/gorelay"
)

// EmailPayload is the example payload type used by the Quick Start: a
// pointer to it is passed to Register as the payload type and to Enqueue as
// the task payload, and SendEmail reads both fields.
type EmailPayload struct {
    To   string // recipient, printed by SendEmail
    Body string // message text, printed by SendEmail
}

// SendEmail is the example task handler. It expects payload to be a non-nil
// *EmailPayload and prints the recipient and body to standard output.
//
// It returns an error, instead of panicking, when payload has any other
// dynamic type or is a nil *EmailPayload, so a malformed task surfaces as a
// handler failure rather than crashing the calling goroutine.
func SendEmail(payload interface{}) error {
    // Comma-ok assertion: the single-value form would panic on a mismatch.
    p, ok := payload.(*EmailPayload)
    if !ok || p == nil {
        return fmt.Errorf("email.send: expected non-nil *EmailPayload, got %T", payload)
    }
    fmt.Printf("Sending email to %s: %s\n", p.To, p.Body)
    return nil
}

func main() {
    r := gorelay.New()
    
    r.Register("email.send", SendEmail, &EmailPayload{})
    r.EnableDashboard(":8080")
    r.Start()
    
    r.Enqueue("email.send", &EmailPayload{
        To:   "user@example.com",
        Body: "Hello!",
    })
    
    select {}
}package main

import "fmt"

// hasDuplicate reports whether any value in nums occurs more than once and
// returns the repeated values.
//
// Each repeated value appears exactly once in the result, in the order in
// which its second occurrence is encountered in nums, so the output is
// deterministic. (Collecting the result by ranging over a map would yield a
// different order from run to run.) When nothing repeats, it returns false
// and an empty, non-nil slice.
func hasDuplicate(nums []int) (bool, []int) {
    // occurrences counts how many times each value has been seen so far.
    occurrences := make(map[int]int, len(nums))
    duplicates := []int{}

    for _, num := range nums {
        occurrences[num]++
        // Record a value only at its second sighting so it is reported once.
        if occurrences[num] == 2 {
            duplicates = append(duplicates, num)
        }
    }

    return len(duplicates) > 0, duplicates
}

// main runs hasDuplicate on a fixed sample slice and prints both results.
func main() {
    sample := []int{1, 2, 3, 1, 2}

    found, repeated := hasDuplicate(sample)
    fmt.Println("Contains duplicates:", found)
    fmt.Println("Duplicate values:", repeated)
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors defined by the package. Each is a distinct value created
// with errors.New, so a caller can test for a specific one with errors.Is.
// NOTE(review): which operation returns which error is not visible in this
// listing — confirm against the implementation before relying on a mapping.
var (
	// Core errors: handler lookup and task identity/state.
	ErrHandlerNotFound     = errors.New("handler not found for topic")
	ErrTaskNotFound        = errors.New("task not found")
	ErrTaskAlreadyExists   = errors.New("task already exists")
	ErrTaskAlreadyComplete = errors.New("task already completed")

	// Storage errors: capacity, timeout and connection problems.
	ErrStorageFull      = errors.New("storage is full")
	ErrStorageTimeout   = errors.New("storage operation timeout")
	ErrConnectionFailed = errors.New("connection to storage failed")

	// Worker errors: pool capacity and execution timeout.
	ErrWorkerPoolFull = errors.New("worker pool is full")
	ErrWorkerTimeout  = errors.New("worker execution timeout")

	// Queue errors: full and empty queue conditions.
	ErrQueueFull  = errors.New("queue is full")
	ErrQueueEmpty = errors.New("queue is empty")

	// Validation errors: rejected topic, payload or schedule time.
	ErrInvalidTopic        = errors.New("invalid topic name")
	ErrInvalidPayload      = errors.New("invalid payload")
	ErrInvalidScheduleTime = errors.New("invalid schedule time")

	// Configuration errors: rejected configuration or storage type.
	ErrInvalidConfig      = errors.New("invalid configuration")
	ErrInvalidStorageType = errors.New("invalid storage type")
)

Functions

This section is empty.

Types

type Config

// Config is the package's settings struct. DefaultConfig returns a *Config,
// the Validate method returns an error for one, and every Option is a
// function that receives a *Config. Each field carries a snake_case JSON key.
// NOTE(review): field meanings, units and defaults are not shown in this
// listing; the group comments below only reflect the field names — confirm
// against DefaultConfig and Validate.
type Config struct {
	// Worker configuration
	WorkerCount     int `json:"worker_count"`
	WorkerBatchSize int `json:"worker_batch_size"`

	// Retry configuration
	MaxRetries    int           `json:"max_retries"`
	RetryBackoff  time.Duration `json:"retry_backoff"`
	RetryMaxDelay time.Duration `json:"retry_max_delay"`

	// Storage configuration
	StorageType string `json:"storage_type"`
	StorageDSN  string `json:"storage_dsn"`

	// Dashboard configuration
	// NOTE(review): DashboardPass is a plain string with a JSON tag, so it is
	// included whenever this struct is serialized — confirm that is intended.
	DashboardAddr string `json:"dashboard_addr"`
	DashboardAuth bool   `json:"dashboard_auth"`
	DashboardUser string `json:"dashboard_user"`
	DashboardPass string `json:"dashboard_pass"`

	// Task configuration
	TaskRetention     time.Duration `json:"task_retention"`
	FailedRetention   time.Duration `json:"failed_retention"`
	VisibilityTimeout time.Duration `json:"visibility_timeout"`

	// Performance
	// NOTE(review): the unit of MemoryLimit is not stated here — presumably
	// bytes; verify.
	BatchSize      int   `json:"batch_size"`
	RingBufferSize int   `json:"ring_buffer_size"`
	MemoryLimit    int64 `json:"memory_limit"`
}

func DefaultConfig

func DefaultConfig() *Config

func (*Config) Validate

func (c *Config) Validate() error

type Option

// Option is a function that receives a *Config. New accepts a variadic list
// of Options, and WithDashboard, WithMaxRetries, WithStorage and
// WithWorkerCount each return one.
type Option func(*Config)

func WithDashboard

func WithDashboard(addr string) Option

func WithMaxRetries

func WithMaxRetries(retries int) Option

func WithStorage

func WithStorage(dsn string) Option

func WithWorkerCount

func WithWorkerCount(count int) Option

type Relay

// Relay is the type returned by New. Its methods — Register, Enqueue,
// Schedule, EnableDashboard, Start and Shutdown — make up the package's task
// API. All of its fields are unexported.
type Relay struct {
	// contains filtered or unexported fields
}

func New

func New(opts ...Option) *Relay

New creates a new Relay instance

func (*Relay) EnableDashboard

func (r *Relay) EnableDashboard(addr string)

EnableDashboard enables the dashboard on the specified address

func (*Relay) Enqueue

func (r *Relay) Enqueue(topic string, payload interface{}) (string, error)

Enqueue adds a task to the queue

func (*Relay) Register

func (r *Relay) Register(topic string, fn registry.HandlerFunc, payloadType interface{}) error

Register registers a handler for a topic

func (*Relay) Schedule

func (r *Relay) Schedule(executeAt time.Time, topic string, payload interface{}) (string, error)

Schedule adds a task to be executed at a specific time

func (*Relay) Shutdown

func (r *Relay) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the relay

func (*Relay) Start

func (r *Relay) Start() error

Start starts the relay workers and scheduler

type RelayError

// RelayError pairs a string label with an underlying error. *RelayError has
// an Error method, so it satisfies the error interface, and an Unwrap method.
// NOTE(review): presumably Op names the operation that failed and Unwrap
// returns Err, which would let errors.Is / errors.As see through the
// wrapper — confirm against the method bodies.
type RelayError struct {
	Op  string // label for the error; presumably the failing operation — TODO confirm
	Err error  // underlying error
}

func (*RelayError) Error

func (e *RelayError) Error() string

func (*RelayError) Unwrap

func (e *RelayError) Unwrap() error

Directories

Path Synopsis
examples
basic command
priority command
webapp command
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL