zsched

package module
v1.2.14 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 13, 2025 License: MIT Imports: 17 Imported by: 0

README

zsched

Go Reference

A lightweight, opinionated mix of a queue system and task orchestrator built in and for Go, using TimescaleDB and AMQP broker. Designed for simplicity and performance.

[!CAUTION] Until v2 is released, the SDK is not considered stable and breaking changes might happen.

Web UI

✨ Features

  • Queue System: Built on LavinMQ (AMQP 0.9.1) for reliable message delivery
  • Cron Scheduling: Dispatch tasks at specific times, with parameters
  • Retry Logic: Configurable retry mechanisms for handling task failures
  • Concurrency Control: Fine-grained control over task execution concurrency
  • Persistent Storage: TimescaleDB integration for storing tasks and execution logs
  • REST API: Complete HTTP API for task dispatch and log retrieval
  • Web Dashboard: Clean, lightweight UI for task management and monitoring
  • Hooks: Execute actions before and after task executions, including Prometheus metrics

🚀 Quick Start

Installation
go get github.com/vlourme/zsched
Basic Usage
  1. Define a task:
var helloTask = zsched.NewTask(
	"hello",
	func(ctx *zsched.Context[UserCtx]) error {
        time.Sleep(2 * time.Second) // Simulate a long running task

		ctx.Infoln("Hello", ctx.GetStr("name"))

		return nil
	},
	zsched.WithConcurrency(10),
	zsched.WithMaxRetries(3),
	zsched.WithSchedule("* * * * * *", map[string]any{"name": "John"}),
)

Note: Schedules might not be respected by the engine, it will dispatch the task and executed when workers are available. This is useful for tasks that might dispatch children tasks.

  1. Create and configure the engine:
func main() {
    userCtx := UserCtx{} // User context, can be any type

	engine, err := zsched.NewBuilder(userCtx).
		WithRabbitMQBroker(os.Getenv("RABBITMQ_URL")).
		WithTimescaleDBStorage(os.Getenv("POSTGRES_URL")).
		WithHooks(&hooks.PrometheusHook{}, &hooks.TaskLoggerHook{}). // Optional hooks
		WithAPI(":8080").
		Build()
	if err != nil {
		panic(err)
	}

	engine.Register(helloTask)
	engine.Start()
}

Check out the example for a complete example.

  1. Run your application:
go run main.go
  1. Dispatch tasks via API:
curl -X POST http://localhost:8080/tasks/hello-world \
  -H "Content-Type: application/json" \
  -d '{"name": "John"}'

📦 Hooks

Hooks are used to execute actions before and after task executions.

Available Hooks
  • PrometheusHook: Exposes Prometheus metrics for task execution.
  • TaskLoggerHook: Logs task executions to the database, required when using the API and Web UI.

🐳 Docker support

You will have to dockerize your tasks and the engine, we advice to follow the Dockerfile (example/Dockerfile) and docker-compose.yml files to get started.

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

Documentation

Index

Constants

View Source
const (
	StatusPending stateStatus = "pending"
	StatusRunning stateStatus = "running"
	StatusSuccess stateStatus = "success"
	StatusFailed  stateStatus = "failed"
)

Variables

This section is empty.

Functions

func GetTask

func GetTask[T any](c *gin.Context)

GetTask returns a task by name

func GetTasks

func GetTasks[T any](c *gin.Context)

GetTasks returns all tasks

func NewBuilder

func NewBuilder[T any](userContext T) *builder[T]

NewBuilder creates a new builder for the engine

func NewTaskLogger added in v1.2.12

func NewTaskLogger[T any](storage storage.Storage) (*taskLogger[T], error)

func PostTask

func PostTask[T any](c *gin.Context)

PostTask dispatches a task

func WithCollector

func WithCollector(collector TaskCollectorAction, bufferSize ...int) func(*taskConfig)

WithCollector sets the collector for the task with optional buffer size

func WithConcurrency

func WithConcurrency(concurrency int) func(*taskConfig)

WithConcurrency sets the concurrency for the task

func WithDefaultParameters added in v1.2.5

func WithDefaultParameters(parameters map[string]any) func(*taskConfig)

WithDefaultParameters sets the default parameters for the task

func WithMaxRetries

func WithMaxRetries(maxRetries int) func(*taskConfig)

WithMaxRetries sets the max retries for the task, default is 3. -1 means infinite retries.

func WithSchedule

func WithSchedule(schedule string, parameters map[string]any) func(*taskConfig)

WithSchedule adds a schedule to the task. The schedule is a cron expression with seconds precision (e.g. "0 0 * * * *").

func WithTags added in v1.2.9

func WithTags(tags ...string) func(*taskConfig)

WithTags sets the tags for the task

Types

type AnyTask

type AnyTask interface {
	Name() string
}

AnyTask is the interface that represents any task, no matter the inner type of the user context

type Collector

type Collector struct {
	// contains filtered or unexported fields
}

func (*Collector) Close

func (c *Collector) Close()

Close closes the collector

func (*Collector) Consume

func (c *Collector) Consume(fn func(value any))

Consume consumes values from the collector

func (*Collector) ConsumeWithCtx

func (c *Collector) ConsumeWithCtx(ctx context.Context, fn func(value any))

Consume consumes values from the collector

func (*Collector) Pull

func (c *Collector) Pull() any

Pull pulls a value from the collector

func (*Collector) Push

func (c *Collector) Push(value any)

Push pushes a value to the collector

type Context

type Context[T any] struct {
	logger.Logger
	State
	// contains filtered or unexported fields
}

Context is a temporary object into the task execution context It allow logging, outputting values and accessing the user context

func (*Context[T]) Execute

func (c *Context[T]) Execute(params ...map[string]any) error

Execute starts the same task with given parameters

func (*Context[T]) Push

func (c *Context[T]) Push(value any)

Push pushes a value to the collector

func (*Context[T]) UserContext

func (c *Context[T]) UserContext() T

UserContext returns the user context of the task

type Engine

type Engine[T any] struct {
	// contains filtered or unexported fields
}

Engine is the main engine for Zsched scheduler

func (*Engine[T]) Close

func (e *Engine[T]) Close() error

Close closes the engine

func (*Engine[T]) Register

func (e *Engine[T]) Register(task ...*Task[T])

Register registers new tasks to the scheduler

func (*Engine[T]) Start

func (e *Engine[T]) Start() error

Start starts the engine, this function is blocking until the engine is stopped

type Hook

type Hook interface {
	// Initialize is called when the engine is initialized
	Initialize(storage storage.Storage) error

	// BeforeExecute is called before the task is executed
	BeforeExecute(task AnyTask, state *State) error

	// AfterExecute is called after the task is executed
	AfterExecute(task AnyTask, state *State) error
}

Hook is the interface for the hooks

type State

type State struct {
	// id is the id of the state
	ID uuid.UUID `json:"id"`

	// taskID is the id of the task
	TaskID uuid.UUID `json:"task_id"`

	// parentID is the id of the parent task
	ParentID uuid.UUID `json:"parent_id,omitempty"`

	// parameters is the parameters for the task
	Parameters map[string]any `json:"parameters"`

	// InitializedAt is the time the state was initialized
	InitializedAt time.Time `json:"initialized_at"`

	// StartedAt is the time the task started executing
	StartedAt time.Time `json:"started_at"`

	// Iteration is the current iteration of the task
	Iterations int `json:"iterations"`

	Status stateStatus `json:"status"`

	// LastError is the last error of the task
	LastError string `json:"last_error"`
}

State is the State of the task

func (*State) EncodeParameters

func (s *State) EncodeParameters() (string, error)

EncodeParameters encodes the parameters to a JSON string

func (*State) GetAny added in v1.2.5

func (s *State) GetAny(name string, defaultValue ...any) any

GetAny returns the any value of the parameter by name

func (*State) GetBool

func (s *State) GetBool(name string, defaultValue ...bool) bool

GetBool returns the bool value of the parameter by name

func (*State) GetFloat

func (s *State) GetFloat(name string, defaultValue ...float64) float64

GetFloat returns the float value of the parameter by name

func (*State) GetInt

func (s *State) GetInt(name string, defaultValue ...int) int

GetInt returns the int value of the parameter by name

func (*State) GetStr

func (s *State) GetStr(name string, defaultValue ...string) string

GetStr returns the string value of the parameter by name

func (*State) Serialize

func (s *State) Serialize() ([]byte, error)

Serialize serializes the state to a byte array

type Task

type Task[T any] struct {

	// Name of the task, should be unique without any spaces or special characters
	TaskName string `json:"name"`

	// Action to be performed by the task
	Action taskAction[T] `json:"-"`
	// contains filtered or unexported fields
}

func NewTask

func NewTask[T any](name string, action taskAction[T], opts ...func(*taskConfig)) *Task[T]

NewTask creates a new task

func (*Task[T]) Collector

func (t *Task[T]) Collector() *Collector

Collector returns the collector for the task

func (*Task[T]) Execute

func (t *Task[T]) Execute(params ...map[string]any) error

Execute executes one or multiple executions of the task

func (*Task[T]) Name

func (t *Task[T]) Name() string

formatName formats the name of the task to a valid RabbitMQ queue name TODO: Move to a separate package

type TaskCollectorAction

type TaskCollectorAction func(c *Collector, userContext any)

Directories

Path Synopsis
pkg

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL