taskflow

package module
v0.0.13 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 28, 2025 License: MIT Imports: 10 Imported by: 0

README

tracesight

⏳taskflow

Go Reference Go Report Card

Taskflow is a lightweight Go library for running background jobs out of a MySQL queue table. It handles:

  • Fetching jobs from the database
  • Locking and retrying failed jobs
  • Creating new jobs programmatically
  • Running custom job logic with optional timeouts
  • Structured logging via user-defined callbacks
  • Graceful shutdown of worker pools

Table of Contents

  1. Installation
  2. Database Schema
  3. Quick Start Example
  4. Contributing
  5. License

Installation

go get github.com/sky93/taskflow

Database Schema

Your database should contain a jobs table. For example:

CREATE TABLE IF NOT EXISTS jobs (
  id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
  operation VARCHAR(50) NOT NULL,
  status ENUM('PENDING','IN_PROGRESS','COMPLETED','FAILED') NOT NULL DEFAULT 'PENDING',
  payload JSON NULL,
  output JSON NULL,
  error_output JSON NULL,
  locked_by VARCHAR(50) NULL,
  locked_until DATETIME NULL,
  retry_count INT UNSIGNED NOT NULL DEFAULT 0,
  available_at DATETIME NOT NULL DEFAULT '1970-01-01 00:00:00',
  created_at DATETIME NOT NULL,
  updated_at DATETIME NOT NULL
);

Quick Start Example

Below is a complete, minimal example showing:

  1. Connecting to the database
  2. Creating a taskflow.Config and a new TaskFlow
  3. Registering a custom job handler
  4. Starting workers
  5. Creating a job
  6. Shutting down gracefully
package main

import (
    "context"
    "database/sql"
    "fmt"
    "time"

    _ "github.com/go-sql-driver/mysql"
    "github.com/sky93/taskflow"
)

// MyPayload is the shape of the data we expect in the job payload.
type MyPayload struct {
    Greeting string
}

// HelloHandler processes jobs of type "HELLO".
func HelloHandler(jr taskflow.JobRecord) (any, error) {
    var payload MyPayload
    if err := jr.GetPayload(&payload); err != nil {
        return nil, err
    }

    // Here we just print the greeting; real logic can be anything.
    fmt.Println("Received greeting:", payload.Greeting)
    return nil, nil
}

func main() {
    ctx := context.Background()

    // 1) Connect to the DB
    dsn := "root:password@tcp(127.0.0.1:3306)/myDbName?parseTime=true"
    db, err := sql.Open("mysql", dsn)
    if err != nil {
        panic(err)
    }
    if err := db.Ping(); err != nil {
        panic(err)
    }
    fmt.Println("Connected to database.")

    // 2) Create the taskflow config
    cfg := taskflow.Config{
        DB:           db,
        RetryCount:   3,
        BackoffTime:  30 * time.Second,
        PollInterval: 5 * time.Second,
        JobTimeout:   10 * time.Second,

        // Optional logging
        InfoLog: func(ev taskflow.LogEvent) {
            fmt.Printf("[INFO] %s\n", ev.Message)
        },
        ErrorLog: func(ev taskflow.LogEvent) {
            fmt.Printf("[ERROR] %s\n", ev.Message)
        },
    }

    // 3) Create an instance of TaskFlow
    flow := taskflow.New(cfg)

    // 4) Register our "HELLO" handler
    flow.RegisterHandler("HELLO", HelloHandler)

    // 5) Start workers (2 concurrent workers)
    flow.StartWorkers(ctx, 2)

    // Create a new "HELLO" job
    jobID, err := flow.CreateJob(ctx, "HELLO", MyPayload{Greeting: "Hello from TaskFlow!"}, time.Now())
    if err != nil {
        panic(err)
    }
    fmt.Printf("Created job ID %d\n", jobID)

    // Let it run for a few seconds
    time.Sleep(5 * time.Second)

    // 6) Shutdown gracefully
    flow.Shutdown(10 * time.Second)
    fmt.Println("All done.")
}

Contributing

Contributions are welcome! Please follow these steps:

  1. Fork the repository
  2. Create a new branch for your feature or fix
  3. Commit your changes, and add tests if possible
  4. Submit a pull request and provide a clear description of your changes

License

This project is licensed under the MIT License.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AdvancedJob

type AdvancedJob interface {
	Run(jr JobRecord) (any, error)
	RetryCount() uint           // returns how many times we allow this job to fail
	BackoffTime() time.Duration // how long to wait after a failure
	JobTimeout() time.Duration  // how long this job may run before timing out
}

AdvancedJob Job is the interface each job must implement.

type AdvancedJobConstructor

type AdvancedJobConstructor func() (AdvancedJob, error)

type Config

type Config struct {
	// DB is the user-provided database connection where the jobs table is stored.
	DB *sql.DB

	// DbName is name of the database.
	DbName string

	// RetryCount is how many times we allow a job to fail before ignoring it.
	RetryCount uint

	// BackoffTime is how long we wait before letting a failed job become available again.
	BackoffTime time.Duration

	// PollInterval is how frequently workers check for new jobs.
	PollInterval time.Duration

	// JobTimeout is how long we allow an individual job to run before marking it as failed.
	// If zero, there is no enforced timeout.
	JobTimeout time.Duration

	// InfoLog is called for informational or success logs.
	// If nil, defaults to printing to stdout.
	InfoLog func(ev LogEvent)

	// ErrorLog is called for error logs.
	// If nil, defaults to printing to stderr (or stdout).
	ErrorLog func(ev LogEvent)
}

Config holds the settings and resources needed by the queue system.

type JobHandler

type JobHandler func(jr JobRecord) (any, error)

JobHandler is a function that receives the payload string, constructs and runs the job, and returns an optional output string plus an error.

type JobRecord

type JobRecord struct {
	ID        uint64
	Operation Operation
	Status    JobStatus

	Output      any
	LockedBy    *string
	LockedUntil *time.Time
	RetryCount  uint
	AvailableAt *time.Time
	CreatedAt   time.Time
	UpdatedAt   time.Time
	// contains filtered or unexported fields
}

JobRecord corresponds to one row in the jobs table.

func (*JobRecord) GetPayload

func (jr *JobRecord) GetPayload(input any) error

type JobStatus

type JobStatus string

JobStatus enumerates the possible states of a job.

const (
	JobPending    JobStatus = "PENDING"
	JobInProgress JobStatus = "IN_PROGRESS"
	JobCompleted  JobStatus = "COMPLETED"
	JobFailed     JobStatus = "FAILED"
)

type LogEvent

type LogEvent struct {
	// A human-readable message about the event.
	Message string

	// The ID of the worker that triggered the log (if any).
	WorkerID string

	// The Job ID, if available.
	JobID *uint64

	// The operation name, if available.
	Operation *string

	// Any error associated with the event.
	Err error

	// How long the job or operation took, if relevant.
	Duration *time.Duration
}

LogEvent captures information about a logging event.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

func (*Manager) Shutdown

func (m *Manager) Shutdown(timeout time.Duration)

Shutdown attempts a graceful shutdown: cancel context, wait for workers up to 'timeout'.

type Operation

type Operation string

Operation is a type for your job "name" or "action" (e.g., "ADD_CUSTOMER").

type TaskFlow

type TaskFlow struct {
	// contains filtered or unexported fields
}

func New

func New(cfg Config) *TaskFlow

func (*TaskFlow) CreateJob

func (tf *TaskFlow) CreateJob(ctx context.Context, operation Operation, payload any, executeAt time.Time) (int64, error)

CreateJob inserts a new job into the database

func (*TaskFlow) RegisterAdvancedHandler

func (tf *TaskFlow) RegisterAdvancedHandler(op Operation, constructor func() AdvancedJob)

func (*TaskFlow) RegisterHandler

func (tf *TaskFlow) RegisterHandler(op Operation, handler JobHandler)

RegisterHandler allows end users to associate an Operation with a JobHandler.

func (*TaskFlow) Shutdown

func (tf *TaskFlow) Shutdown(timeout time.Duration)

Shutdown gracefully stops all workers, waiting up to `timeout` for them to exit.

func (*TaskFlow) StartWorkers

func (tf *TaskFlow) StartWorkers(ctx context.Context, count int)

StartWorkers spawns `count` workers to process jobs using the current config. It returns immediately, but you can call Shutdown(...) later to stop them.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func (*Worker) Run

func (w *Worker) Run(ctx context.Context)

Run keeps polling the DB for jobs until context is canceled.

type WorkerStatus

type WorkerStatus int
const (
	WorkerIdle WorkerStatus = iota
	WorkerBusy
	WorkerFailing
	WorkerExecFailed
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL