taskflow

package module
v0.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 8, 2025 License: MIT Imports: 9 Imported by: 0

README

tracesight

⏳taskflow

taskflow is a lightweight Go library for running background jobs from a MySQL queue table. It handles:

  • Fetching jobs from the database
  • Locking and retrying failed jobs
  • Creating new jobs programmatically
  • Running custom job logic with optional timeouts
  • Structured logging via user-defined callbacks
  • Graceful shutdown of worker pools

Installation

go get github.com/sky93/taskflow

Database Schema

You must create or already have a card.jobs table. For example:

CREATE TABLE IF NOT EXISTS card.jobs (
  id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
  operation VARCHAR(50) NOT NULL,
  status ENUM('PENDING','IN_PROGRESS','COMPLETED','FAILED') NOT NULL DEFAULT 'PENDING',
  payload TEXT NULL,
  output TEXT NULL,
  locked_by VARCHAR(50) NULL,
  locked_until DATETIME NULL,
  retry_count INT UNSIGNED NOT NULL DEFAULT 0,
  available_at DATETIME NOT NULL DEFAULT '1970-01-01 00:00:00',
  created_at DATETIME NOT NULL,
  updated_at DATETIME NOT NULL
);

Quick Start Example

Below is a complete example demonstrating how to:

  1. Initialize a *sql.DB
  2. Create a Config and pass it to taskflow.New(...)
  3. Register a custom job handler
  4. Create a job
  5. Start workers
  6. Shut down gracefully
package main

import (
    "context"
    "database/sql"
    "fmt"
    "time"

    "github.com/sky93/taskflow"
    _ "github.com/go-sql-driver/mysql"
)

// Our "Hello" job
type HelloJob struct {
    name   string
    output string
}

func NewHelloJob(payload *string) (taskflow.Job, error) {
    if payload == nil || *payload == "" {
        return nil, fmt.Errorf("invalid or empty payload")
    }
    return &HelloJob{name: *payload}, nil
}

func (j *HelloJob) Run() error {
    // Simulate some logic
    j.output = "Hello, " + j.name + "!"
    return nil
}

func (j *HelloJob) GetOutput() *string {
    if j.output == "" {
        return nil
    }
    return &j.output
}

func main() {
    // 1) Connect to the DB
    dsn := "root:password@tcp(127.0.0.1:3306)/card?parseTime=true"
    db, err := sql.Open("mysql", dsn)
    if err != nil {
        panic(err)
    }
    if err := db.Ping(); err != nil {
        panic(err)
    }
    fmt.Println("Connected to database.")

    // 2) Create the taskflow config
    cfg := &taskflow.Config{
        DB:           db,
        RetryCount:   3,
        BackoffTime:  30 * time.Second,
        PollInterval: 5 * time.Second,
        JobTimeout:   10 * time.Second,

        // Optional structured logging
        InfoLog: func(ev taskflow.LogEvent) {
            fmt.Printf("[INFO] %s\n", ev.Message)
            if ev.Err != nil {
                fmt.Printf("       error: %v\n", ev.Err)
            }
        },
        ErrorLog: func(ev taskflow.LogEvent) {
            fmt.Printf("[ERROR] %s\n", ev.Message)
            if ev.Err != nil {
                fmt.Printf("        error: %v\n", ev.Err)
            }
            fmt.Println()
        },
    }

    // 3) Create an instance of TaskFlow
    flow := taskflow.New(cfg)

    // 4) Register our handler (globally for "HELLO")
    taskflow.RegisterHandler("HELLO", taskflow.MakeHandler(NewHelloJob))

    // 5) Create a new job
    jobID, err := flow.CreateJob("HELLO", `"Alice"`)
    if err != nil {
        panic(err)
    }
    fmt.Printf("Created job ID %d\n", jobID)

    // 6) Start workers
    ctx := context.Background()
    flow.StartWorkers(ctx, 2)

    // Let it run for 30 seconds
    time.Sleep(30 * time.Second)

    // 7) Shutdown gracefully
    flow.Shutdown(10 * time.Second)
    fmt.Println("All done.")
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AdvancedJob

type AdvancedJob interface {
	Run(jr JobRecord) (any, error)
	RetryCount() uint           // returns how many times we allow this job to fail
	BackoffTime() time.Duration // how long to wait after a failure
	JobTimeout() time.Duration  // how long this job may run before timing out
}

AdvancedJob Job is the interface each job must implement.

type AdvancedJobConstructor

type AdvancedJobConstructor func() (AdvancedJob, error)

type Config

type Config struct {
	// DB is the user-provided database connection where the card.jobs table is stored.
	DB *sql.DB

	// RetryCount is how many times we allow a job to fail before ignoring it.
	RetryCount uint

	// BackoffTime is how long we wait before letting a failed job become available again.
	BackoffTime time.Duration

	// PollInterval is how frequently workers check for new jobs.
	PollInterval time.Duration

	// JobTimeout is how long we allow an individual job to run before marking it as failed.
	// If zero, there is no enforced timeout.
	JobTimeout time.Duration

	// InfoLog is called for informational or success logs.
	// If nil, defaults to printing to stdout.
	InfoLog func(ev LogEvent)

	// ErrorLog is called for error logs.
	// If nil, defaults to printing to stderr (or stdout).
	ErrorLog func(ev LogEvent)
}

Config holds the settings and resources needed by the queue system.

type JobHandler

type JobHandler func(jr JobRecord) (any, error)

JobHandler is a function that receives the payload string, constructs and runs the job, and returns an optional output string plus an error.

type JobRecord

type JobRecord struct {
	ID        uint64
	Operation Operation
	Status    JobStatus

	Output      any
	LockedBy    *string
	LockedUntil *time.Time
	RetryCount  uint
	AvailableAt *time.Time
	CreatedAt   time.Time
	UpdatedAt   time.Time
	// contains filtered or unexported fields
}

JobRecord corresponds to one row in the card.jobs table.

func (*JobRecord) GetPayload

func (jr *JobRecord) GetPayload(input any) error

type JobStatus

type JobStatus string

JobStatus enumerates the possible states of a job.

const (
	JobPending    JobStatus = "PENDING"
	JobInProgress JobStatus = "IN_PROGRESS"
	JobCompleted  JobStatus = "COMPLETED"
	JobFailed     JobStatus = "FAILED"
)

type LogEvent

type LogEvent struct {
	// A human-readable message about the event.
	Message string

	// The ID of the worker that triggered the log (if any).
	WorkerID string

	// The Job ID, if available.
	JobID *uint64

	// The operation name, if available.
	Operation *string

	// Any error associated with the event.
	Err error

	// How long the job or operation took, if relevant.
	Duration *time.Duration
}

LogEvent captures information about a logging event.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

func (*Manager) Shutdown

func (m *Manager) Shutdown(timeout time.Duration)

Shutdown attempts a graceful shutdown: cancel context, wait for workers up to 'timeout'.

type Operation

type Operation string

Operation is a type for your job "name" or "action" (e.g., "ADD_CUSTOMER").

type TaskFlow

type TaskFlow struct {
	// contains filtered or unexported fields
}

func New

func New(cfg Config) *TaskFlow

func (*TaskFlow) CreateJob

func (tf *TaskFlow) CreateJob(ctx context.Context, operation Operation, payload any, executeAt time.Time) (int64, error)

CreateJob inserts a new job into the database

func (*TaskFlow) RegisterAdvancedHandler

func (tf *TaskFlow) RegisterAdvancedHandler(op Operation, constructor func() AdvancedJob)

func (*TaskFlow) RegisterHandler

func (tf *TaskFlow) RegisterHandler(op Operation, handler JobHandler)

RegisterHandler allows end users to associate an Operation with a JobHandler.

func (*TaskFlow) Shutdown

func (tf *TaskFlow) Shutdown(timeout time.Duration)

Shutdown gracefully stops all workers, waiting up to `timeout` for them to exit.

func (*TaskFlow) StartWorkers

func (tf *TaskFlow) StartWorkers(ctx context.Context, count int)

StartWorkers spawns `count` workers to process jobs using the current config. It returns immediately, but you can call Shutdown(...) later to stop them.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func (*Worker) Run

func (w *Worker) Run(ctx context.Context)

Run keeps polling the DB for jobs until context is canceled.

type WorkerStatus

type WorkerStatus int
const (
	WorkerIdle WorkerStatus = iota
	WorkerBusy
	WorkerFailing
	WorkerExecFailed
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL