taskstore

package module
v1.18.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 14, 2025 License: AGPL-3.0 Imports: 21 Imported by: 1

README

Task Store Open in Gitpod

Tests Status Go Report Card PkgGoDev

TaskStore is a robust, asynchronous task processing package that provides both durable task queues and scheduled task execution. It's designed to offload time-consuming or resource-intensive operations from your main application and automate recurring tasks.

By deferring tasks to the background, you can improve application responsiveness and prevent performance bottlenecks.

TaskStore leverages a durable database (SQLite, MySQL, or PostgreSQL) to ensure reliable persistence and fault tolerance.

License

This project is licensed under the GNU Affero General Public License v3.0 (AGPL-3.0). You can find a copy of the license at https://www.gnu.org/licenses/agpl-3.0.en.html

For commercial use, please use my contact page to obtain a commercial license.

Installation

go get github.com/dracory/taskstore

Queue Features

Atomic Task Claiming

Tasks are claimed atomically using database transactions with SELECT FOR UPDATE, preventing race conditions where multiple workers might process the same task simultaneously.

Concurrency Control
  • Default limit: 10 concurrent tasks per queue
  • Configurable: Set via MaxConcurrency in NewStoreOptions
  • Semaphore-based: Automatic backpressure when limit is reached
store, err := taskstore.NewStore(taskstore.NewStoreOptions{
    DB:                      databaseInstance,
    TaskDefinitionTableName: "task_definition",
    TaskQueueTableName:      "task_queue",
    MaxConcurrency:          20, // Allow 20 concurrent tasks
})
Graceful Shutdown

[!WARNING] The methods TaskQueueStop() and TaskQueueStopByName() are deprecated. Use the new TaskQueueRunner pattern instead. See Runners documentation.

  • TaskQueueStop() – Stop default queue and wait for all tasks to complete
  • TaskQueueStopByName(queueName) – Stop specific queue and wait for all tasks
  • Ensures no task goroutines are abandoned
// Deprecated approach - still works but not recommended
store.TaskQueueRunConcurrent(ctx, "emails", 10, 1)
store.TaskQueueStopByName("emails")
Error Handling

Configure custom error handlers for monitoring and alerting:

store.SetErrorHandler(func(queueName, taskID string, err error) {
    log.Printf("[ERROR] Queue: %s, Task: %s, Error: %v", queueName, taskID, err)
    // Send to monitoring system
    metrics.RecordTaskError(queueName, taskID)
})
Context Propagation (Optional)

Task handlers can optionally implement TaskHandlerWithContext to support cancellation:

func (h *EmailHandler) HandleWithContext(ctx context.Context) bool {
    select {
    case <-ctx.Done():
        h.LogInfo("Task cancelled")
        return false
    case <-time.After(5 * time.Second):
        // Send email...
        h.LogSuccess("Email sent")
        return true
    }
}

Note: Existing handlers without HandleWithContext continue to work - this is fully backward compatible.

Setup

myTaskStore, err := taskstore.NewStore(taskstore.NewStoreOptions{
    DB:                 databaseInstance,
    TaskDefinitionTableName: "my_task_definition",
    TaskQueueTableName:      "my_task_queue",
    ScheduleTableName:      "my_schedules",
    AutomigrateEnabled:      true,
    DebugEnabled:           false,
})
if err != nil {
    // handle error
}

Documentation

Task Definitions

The task definition specifies a unit of work to be completed. It can be performed immediately, or enqueued to the database and deferred for asynchronous processing, ensuring your application remains responsive.

Each task definition is uniquely identified by an alias and provides a human-readable title and description.

Each task definition is uniquely identified by an alias that allows the task to be easily called. A human-readable title and description give the user more information on the task definition.

To define a task definition, implement the TaskDefinitionHandlerInterface and provide a Handle method that contains the task's logic.

Optionally, extend the TaskDefinitionHandlerBase struct for additional features like parameter retrieval.

Task definitions can be executed directly from the command line (CLI) or as part of a background task queue.

The tasks placed in the task queue will be processed at a specified interval.

package tasks

func NewHelloWorldTask() *HelloWorldTask {
    return &HelloWorldTask{}
}

type HelloWorldTask struct {
    taskstore.TaskDefinitionHandlerBase
}

var _ taskstore.TaskDefinitionHandlerInterface = (*HelloWorldTask)(nil) // verify it extends the task handler interface

func (task *HelloWorldTask) Alias() string {
    return "HelloWorldTask"
}

func (task *HelloWorldTask) Title() string {
    return "Hello World"
}

func (task *HelloWorldTask) Description() string {
    return "Say hello world"
}

// Enqueue. Optional shortcut to quickly add this task to the task queue
func (task *HelloWorldTask) Enqueue(name string) (taskstore.TaskQueueInterface, error) {
    return myTaskStore.TaskDefinitionEnqueueByAlias(taskstore.DefaultQueueName, task.Alias(), map[string]any{
        "name": name,
    })
}

func (task *HelloWorldTask) Handle() bool {
    name := task.GetParam("name")

    // Optional to allow adding the task to the task queue manually. Useful while in development
    if !task.HasQueuedTask() && task.GetParam("enqueue") == "yes" {
        _, err := task.Enqueue(name)

        if err != nil {
            task.LogError("Error enqueuing task: " + err.Error())
        } else {
            task.LogSuccess("Task enqueued.")
        }
        
        return true
    }

    if name != "" {
        task.LogInfo("Hello " + name + "!")
    } else {
        task.LogInfo("Hello World!")
    }

    return true
}

Registering Task Definitions to the TaskStore

Registering the task definition to the task store will persist it in the database.

ctx := context.Background()
if err := myTaskStore.TaskHandlerAdd(ctx, NewHelloWorldTask(), true); err != nil {
    // handle error
}

Executing Task Definitions in the Terminal

To add the option to execute tasks from the terminal add the following to your main method

myTaskStore.TaskDefinitionExecuteCli(args[1], args[1:])

Example:

go run . HelloWorldTask --name="Tom Jones"

Adding the Task to the Task Queue

To add a task to the background task queue

_, err := myTaskStore.TaskDefinitionEnqueueByAlias(
    taskstore.DefaultQueueName,
    "HelloWorldTask",
    map[string]any{
        "name": name,
    },
)

Or if you have defined an Enqueue method as in the example task above.

NewHelloWorldTask().Enqueue("Tom Jones")

Starting the Task Queue

To start processing tasks, create and start a Task Queue Runner:

ctx := context.Background()

// Create a task queue runner for the default queue
queueRunner := taskstore.NewTaskQueueRunner(myTaskStore, taskstore.TaskQueueRunnerOptions{
    IntervalSeconds: 10,        // Check for tasks every 10 seconds
    UnstuckMinutes:  1,         // Reclaim stuck tasks after 1 minute
    QueueName:       "default", // Process the default queue
    Logger:          log.Default(),
})

// Start the runner
queueRunner.Start(ctx)

// Later: gracefully stop the runner
defer queueRunner.Stop()

For scheduled tasks, use the Schedule Runner:

// Create a schedule runner
scheduleRunner := taskstore.NewScheduleRunner(myTaskStore, taskstore.ScheduleRunnerOptions{
    IntervalSeconds: 60, // Check schedules every 60 seconds
    Logger:          log.Default(),
})

// Initialize next run times for existing schedules
if err := scheduleRunner.SetInitialRuns(ctx); err != nil {
    log.Printf("Error setting initial runs: %v", err)
}

// Start the runner
scheduleRunner.Start(ctx)

// Later: gracefully stop the runner
defer scheduleRunner.Stop()

See Runners documentation for more details.

Store Methods

  • AutoMigrate() error – automigrates (creates) the task definition and task queue tables
  • EnableDebug(debug bool) StoreInterface – enables / disables the debug option

Task Definition Methods

  • TaskDefinitionCreate(ctx context.Context, task TaskDefinitionInterface) error – creates a task definition
  • TaskDefinitionFindByAlias(ctx context.Context, alias string) (TaskDefinitionInterface, error) – finds a task definition by alias
  • TaskDefinitionFindByID(ctx context.Context, id string) (TaskDefinitionInterface, error) – finds a task definition by ID
  • TaskDefinitionList(ctx context.Context, options TaskDefinitionQueryInterface) ([]TaskDefinitionInterface, error) – lists task definitions
  • TaskDefinitionUpdate(ctx context.Context, task TaskDefinitionInterface) error – updates a task definition
  • TaskDefinitionSoftDelete(ctx context.Context, task TaskDefinitionInterface) error – soft deletes a task definition

Task Queue Methods

  • TaskQueueCreate(ctx context.Context, queue TaskQueueInterface) error – creates a new queued task
  • TaskQueueDeleteByID(ctx context.Context, id string) error – deletes a queued task by ID
  • TaskQueueFindByID(ctx context.Context, id string) (TaskQueueInterface, error) – finds a queued task by ID
  • TaskQueueSoftDeleteByID(ctx context.Context, id string) error – soft deletes a queued task by ID (populates the deleted_at field)
  • TaskQueueList(ctx context.Context, options TaskQueueQueryInterface) ([]TaskQueueInterface, error) – lists the queued tasks
  • TaskQueueUpdate(ctx context.Context, queue TaskQueueInterface) error – updates a queued task
Deprecated Methods

[!WARNING] The following methods are deprecated and will be removed in a future version. Use the new TaskQueueRunner pattern instead. See Runners documentation.

  • TaskQueueRunDefault(ctx context.Context, processSeconds int, unstuckMinutes int)Deprecated: starts the default queue worker
  • TaskQueueRunSerial(ctx context.Context, queueName string, processSeconds int, unstuckMinutes int)Deprecated: starts a serial worker for a named queue
  • TaskQueueRunConcurrent(ctx context.Context, queueName string, processSeconds int, unstuckMinutes int)Deprecated: starts a concurrent worker for a named queue
  • TaskQueueStop() / TaskQueueStopByName(queueName string)Deprecated: stops queue workers and waits for in‑flight tasks. Use TaskQueueRunner.Stop() instead.

Frequently Asked Questions (FAQ)

1. What is TaskStore used for?

TaskStore is a versatile tool for offloading time-consuming or resource-intensive tasks from your main application. By deferring these tasks to the background, you can improve application responsiveness and prevent performance bottlenecks.

It's ideal for tasks like data processing, sending emails, generating reports, or performing batch operations.

2. How does TaskStore work?

TaskStore creates a durable queue in your database (SQLite, MySQL, or PostgreSQL) to store tasks. These tasks are then processed asynchronously by a background worker. You can define tasks using a simple interface and schedule them to be executed at specific intervals or on demand.

3. What are the benefits of using TaskStore?
  • Improved application performance: Offload time-consuming tasks to prevent performance bottlenecks.
  • Asynchronous processing: Execute tasks independently of your main application flow.
  • Reliability: Ensure tasks are completed even if your application crashes.
  • Flexibility: Schedule tasks to run at specific intervals or on demand.
  • Ease of use: Define tasks using a simple interface and integrate with your existing application.
4. How do I create a task definition in TaskStore?

To create a task definition, you'll need to implement the TaskDefinitionHandlerInterface and provide a Handle method that contains the task's logic. You can also extend the TaskHandlerBase struct for additional features.

5. How do I schedule a task to run in the background?

Use TaskDefinitionEnqueueByAlias to add a task to the background task queue, and start a TaskQueueRunner to process tasks. For recurring schedules, create a Schedule entity and use ScheduleRunner to automatically enqueue tasks based on recurrence rules.

6. Can I monitor the status of tasks?

Yes, TaskStore provides methods to list tasks, check their status, and view task details.

7. How does TaskStore handle task failures?

If a task fails, it can be retried automatically or marked as failed. You can customize the retry logic to suit your specific needs.

8. Is TaskStore suitable for large-scale applications?

Yes, TaskStore is designed to handle large volumes of tasks. It can be scaled horizontally by adding more worker processes.

9. Does TaskStore support different database systems?

Yes, TaskStore supports SQLite, MySQL, and PostgreSQL.

10. Can I customize TaskStore to fit my specific needs?

Yes, TaskStore is highly customizable. You can extend and modify the code to suit your requirements.

Similar (in Alphabetical Order)

Documentation

Index

Constants

View Source
const ASC = "asc"
View Source
const COLUMN_ALIAS = "alias"
View Source
const COLUMN_ATTEMPTS = "attempts"
View Source
const COLUMN_COMPLETED_AT = "completed_at"
View Source
const COLUMN_CREATED_AT = "created_at"
View Source
const COLUMN_DESCRIPTION = "description"
View Source
const COLUMN_DETAILS = "details"
View Source
const COLUMN_END_AT = "end_at"
View Source
const COLUMN_EXECUTION_COUNT = "execution_count"
View Source
const COLUMN_ID = "id"
View Source
const COLUMN_IS_RECURRING = "is_recurring"
View Source
const COLUMN_LAST_RUN_AT = "last_run_at"
View Source
const COLUMN_MAX_EXECUTION_COUNT = "max_execution_count"
View Source
const COLUMN_MEMO = "memo"
View Source
const COLUMN_METAS = "metas"
View Source
const COLUMN_NAME = "name"
View Source
const COLUMN_NEXT_RUN_AT = "next_run_at"
View Source
const COLUMN_OUTPUT = "output"
View Source
const COLUMN_PARAMETERS = "parameters"
View Source
const COLUMN_QUEUE_NAME = "queue_name"
View Source
const COLUMN_RECURRENCE_RULE = "recurrence_rule"
View Source
const COLUMN_SOFT_DELETED_AT = "soft_deleted_at"
View Source
const COLUMN_STARTED_AT = "started_at"
View Source
const COLUMN_START_AT = "start_at"
View Source
const COLUMN_STATUS = "status"
View Source
const COLUMN_TASK_DEFINITION_ID = "task_definition_id"
View Source
const COLUMN_TASK_ID = "task_id"
View Source
const COLUMN_TITLE = "title"
View Source
const COLUMN_UPDATED_AT = "updated_at"
View Source
const DESC = "desc"
View Source
const DefaultQueueName = "default"
View Source
const TaskDefinitionStatusActive = "active"
View Source
const TaskDefinitionStatusCanceled = "canceled"
View Source
const TaskQueueStatusCanceled = "canceled"
View Source
const TaskQueueStatusDeleted = "deleted"
View Source
const TaskQueueStatusFailed = "failed"
View Source
const TaskQueueStatusPaused = "paused"
View Source
const TaskQueueStatusQueued = "queued"
View Source
const TaskQueueStatusRunning = "running"
View Source
const TaskQueueStatusSuccess = "success"

Variables

This section is empty.

Functions

func NextRunAt

func NextRunAt(rule RecurrenceRuleInterface, now *carbon.Carbon) (*carbon.Carbon, error)

NextRunAt calculates the next time a recurrence rule should run, given the current time. It respects the rule's start and end times, interval and frequency, and returns an error if no further runs exist.

Types

type DayOfWeek

type DayOfWeek string

DayOfWeek represents a day of the week used in weekly recurrence rules.

const (
	DayOfWeekMonday    DayOfWeek = "monday"
	DayOfWeekTuesday   DayOfWeek = "tuesday"
	DayOfWeekWednesday DayOfWeek = "wednesday"
	DayOfWeekThursday  DayOfWeek = "thursday"
	DayOfWeekFriday    DayOfWeek = "friday"
	DayOfWeekSaturday  DayOfWeek = "saturday"
	DayOfWeekSunday    DayOfWeek = "sunday"
)

type Frequency

type Frequency string

Define a string type alias Frequency represents how often a schedule recurs (daily, weekly, etc.). It is a string-based alias compatible with rrule-go frequencies.

const (
	FrequencyNone     Frequency = "none"
	FrequencySecondly Frequency = "secondly"
	FrequencyMinutely Frequency = "minutely"
	FrequencyHourly   Frequency = "hourly"
	FrequencyDaily    Frequency = "daily"
	FrequencyWeekly   Frequency = "weekly"
	FrequencyMonthly  Frequency = "monthly"
	FrequencyYearly   Frequency = "yearly"
)

Define the constants as strings

type MonthOfYear

type MonthOfYear string

MonthOfYear represents a month used in yearly or monthly recurrence rules.

const (
	MonthOfYearJanuary   MonthOfYear = "JANUARY"
	MonthOfYearFebruary  MonthOfYear = "FEBRUARY"
	MonthOfYearMarch     MonthOfYear = "MARCH"
	MonthOfYearApril     MonthOfYear = "APRIL"
	MonthOfYearMay       MonthOfYear = "MAY"
	MonthOfYearJune      MonthOfYear = "JUNE"
	MonthOfYearJuly      MonthOfYear = "JULY"
	MonthOfYearAugust    MonthOfYear = "AUGUST"
	MonthOfYearSeptember MonthOfYear = "SEPTEMBER"
	MonthOfYearOctober   MonthOfYear = "OCTOBER"
	MonthOfYearNovember  MonthOfYear = "NOVEMBER"
	MonthOfYearDecember  MonthOfYear = "DECEMBER"
)

type NewStoreOptions

type NewStoreOptions struct {
	TaskDefinitionTableName string
	TaskQueueTableName      string
	ScheduleTableName       string
	DB                      *sql.DB
	DbDriverName            string
	AutomigrateEnabled      bool
	DebugEnabled            bool
	MaxConcurrency          int                                       // Max concurrent tasks (default: 10, 0 = unlimited)
	ErrorHandler            func(queueName, taskID string, err error) // Optional error callback
}

NewStoreOptions define the options for creating a new task store

type RecurrenceRuleInterface added in v1.10.0

type RecurrenceRuleInterface interface {
	// GetFrequency returns how often the rule recurs (e.g. daily, weekly).
	GetFrequency() Frequency

	// SetFrequency sets how often the rule recurs.
	SetFrequency(Frequency) RecurrenceRuleInterface

	// GetStartsAt returns the UTC datetime when the rule becomes active.
	GetStartsAt() string

	// SetStartsAt sets the UTC datetime when the rule becomes active.
	SetStartsAt(dateTimeUTC string) RecurrenceRuleInterface

	// GetEndsAt returns the UTC datetime when the rule stops producing occurrences.
	GetEndsAt() string

	// SetEndsAt sets the UTC datetime when the rule stops producing occurrences.
	SetEndsAt(dateTimeUTC string) RecurrenceRuleInterface

	// GetInterval returns the step interval between occurrences (e.g. every N days).
	GetInterval() int

	// SetInterval sets the step interval between occurrences.
	SetInterval(int) RecurrenceRuleInterface

	// GetDaysOfWeek returns the days of the week the rule applies to (for weekly rules).
	GetDaysOfWeek() []DayOfWeek

	// SetDaysOfWeek sets the days of the week the rule applies to (for weekly rules).
	SetDaysOfWeek([]DayOfWeek) RecurrenceRuleInterface

	// GetDaysOfMonth returns the days of the month the rule applies to.
	GetDaysOfMonth() []int

	// SetDaysOfMonth sets the days of the month the rule applies to.
	SetDaysOfMonth([]int) RecurrenceRuleInterface

	// GetMonthsOfYear returns the months of the year the rule applies to.
	GetMonthsOfYear() []MonthOfYear

	// SetMonthsOfYear sets the months of the year the rule applies to.
	SetMonthsOfYear([]MonthOfYear) RecurrenceRuleInterface
}

RecurrenceRuleInterface defines the contract for recurrence rules used by schedules. It exposes frequency, start/end times, interval, and optional day/month filters.

func NewRecurrenceRule

func NewRecurrenceRule() RecurrenceRuleInterface

NewRecurrenceRule creates a new recurrence rule with default values. By default, it has no end time (MAX_DATETIME) and an interval of 1.

type ScheduleInterface

type ScheduleInterface interface {

	// GetID the unique identifier of the schedule
	GetID() string

	// SetID sets the unique identifier of the schedule
	SetID(string) ScheduleInterface

	// Name the name of the schedule
	GetName() string

	// SetName sets the name of the schedule
	SetName(string) ScheduleInterface

	// Description the description of the schedule
	GetDescription() string

	// SetDescription sets the description of the schedule
	SetDescription(string) ScheduleInterface

	// Status the status of the schedule
	// Valid values are "draft" (default), "active", "inactive"
	GetStatus() string

	// SetStatus sets the status of the schedule
	SetStatus(string) ScheduleInterface

	// RecurrenceRule the recurrence rule that defines when the schedule should run
	GetRecurrenceRule() RecurrenceRuleInterface

	// SetRecurrenceRule sets the recurrence rule that defines when the schedule should run
	SetRecurrenceRule(RecurrenceRuleInterface) ScheduleInterface

	// QueueName the name of the queue that this schedule is associated with
	GetQueueName() string

	// SetQueueName sets the name of the queue that this schedule is associated with
	SetQueueName(string) ScheduleInterface

	// TaskDefinitionID the unique identifier of the task definition
	// that this schedule is associated with
	GetTaskDefinitionID() string

	// SetTaskDefinitionID sets the unique identifier of the task definition
	// that this schedule is associated with
	SetTaskDefinitionID(string) ScheduleInterface

	// TaskParameters the parameters to be passed to the task definition
	// when it is executed
	GetTaskParameters() map[string]any

	// SetTaskParameters sets the parameters to be passed to the task definition
	// when it is executed
	SetTaskParameters(map[string]any) ScheduleInterface

	// StartAt the start date and time of the schedule
	GetStartAt() string

	// SetStartAt sets the start date and time of the schedule
	// If startAt is not set, the schedule will start at the current time
	SetStartAt(string) ScheduleInterface

	// EndAt the end date and time of the schedule
	// The default value is the maximum datetime (never expires)
	GetEndAt() string

	// SetEndAt sets the end date and time of the schedule
	SetEndAt(string) ScheduleInterface

	// ExecutionCount the number of times the schedule has been executed
	GetExecutionCount() int

	// SetExecutionCount sets the number of times the schedule has been executed
	SetExecutionCount(int) ScheduleInterface

	// MaxExecutionCount the maximum number of times the schedule is allowed to be executed
	// The default value is int max (no limit)
	// To execute only once, set maxExecutionCount to 1
	GetMaxExecutionCount() int

	// SetMaxExecutionCount sets the maximum number of times the schedule is allowed to be executed
	SetMaxExecutionCount(int) ScheduleInterface

	// LastRunAt the last date and time the schedule was executed
	GetLastRunAt() string

	// SetLastRunAt sets the last date and time the schedule was executed
	SetLastRunAt(string) ScheduleInterface

	// NextRunAt the next date and time the schedule is scheduled to run
	GetNextRunAt() string

	// SetNextRunAt sets the next date and time the schedule is scheduled to run
	SetNextRunAt(string) ScheduleInterface

	// CreatedAt the date and time the schedule was created
	GetCreatedAt() string

	// SetCreatedAt sets the date and time the schedule was created
	SetCreatedAt(string) ScheduleInterface

	// UpdatedAt the date and time the schedule was last updated
	GetUpdatedAt() string

	// SetUpdatedAt sets the date and time the schedule was last updated
	SetUpdatedAt(string) ScheduleInterface

	// SoftDeletedAt the date and time the schedule was soft deleted
	// The default value is max datetime (not soft deleted, 9999-12-31 23:59:59)
	// To soft delete a schedule, set softDeletedAt to the current time
	// To unsoft delete a schedule, set softDeletedAt to max datetime
	// A soft deleted schedule is when its in the past
	GetSoftDeletedAt() string

	// SetSoftDeletedAt sets the date and time the schedule was soft deleted
	SetSoftDeletedAt(string) ScheduleInterface

	// HasReachedEndDate returns true if the schedule has reached its end date
	HasReachedEndDate() bool

	// HasReachedMaxExecutions returns true if the schedule has reached its maximum number of executions
	HasReachedMaxExecutions() bool

	// GetNextOccurrence returns the next occurrence of the schedule
	// if invalid recurrence rule, returns error
	GetNextOccurrence() (string, error)

	// IncrementExecutionCount increments the execution count of the schedule by one
	IncrementExecutionCount() ScheduleInterface

	// UpdateNextRunAt calculates the next run at of the schedule and updates it
	UpdateNextRunAt() ScheduleInterface

	// UpdateLastRunAt updates the last run at of the schedule with current time
	UpdateLastRunAt() ScheduleInterface

	// IsDue returns true if the schedule is due to run
	IsDue() bool
}

ScheduleInterface defines the contract for a schedule, including its identity, metadata, recurrence rule, timing fields, execution limits, soft-delete semantics, and helper methods for evaluating schedule state.

func NewSchedule added in v1.15.0

func NewSchedule() ScheduleInterface

NewSchedule creates a new schedule with default values and a new recurrence rule.

type ScheduleQuery added in v1.15.0

type ScheduleQuery struct {
	// contains filtered or unexported fields
}

func (*ScheduleQuery) ID added in v1.15.0

func (q *ScheduleQuery) ID() string

func (*ScheduleQuery) Limit added in v1.15.0

func (q *ScheduleQuery) Limit() int

func (*ScheduleQuery) Name added in v1.15.0

func (q *ScheduleQuery) Name() string

func (*ScheduleQuery) Offset added in v1.15.0

func (q *ScheduleQuery) Offset() int

func (*ScheduleQuery) QueueName added in v1.15.0

func (q *ScheduleQuery) QueueName() string

func (*ScheduleQuery) SetID added in v1.15.0

func (*ScheduleQuery) SetLimit added in v1.15.0

func (q *ScheduleQuery) SetLimit(limit int) ScheduleQueryInterface

func (*ScheduleQuery) SetName added in v1.15.0

func (q *ScheduleQuery) SetName(name string) ScheduleQueryInterface

func (*ScheduleQuery) SetOffset added in v1.15.0

func (q *ScheduleQuery) SetOffset(offset int) ScheduleQueryInterface

func (*ScheduleQuery) SetQueueName added in v1.15.0

func (q *ScheduleQuery) SetQueueName(queueName string) ScheduleQueryInterface

func (*ScheduleQuery) SetStatus added in v1.15.0

func (q *ScheduleQuery) SetStatus(status string) ScheduleQueryInterface

func (*ScheduleQuery) SetTaskDefinitionID added in v1.15.0

func (q *ScheduleQuery) SetTaskDefinitionID(taskDefinitionID string) ScheduleQueryInterface

func (*ScheduleQuery) Status added in v1.15.0

func (q *ScheduleQuery) Status() string

func (*ScheduleQuery) TaskDefinitionID added in v1.15.0

func (q *ScheduleQuery) TaskDefinitionID() string

type ScheduleQueryInterface added in v1.15.0

type ScheduleQueryInterface interface {
	// ID the unique identifier of the schedule to filter by
	ID() string

	// SetID sets the unique identifier of the schedule to filter by
	SetID(string) ScheduleQueryInterface

	// Name the name of the schedule to filter by
	Name() string

	// SetName sets the name of the schedule to filter by
	SetName(string) ScheduleQueryInterface

	// Status the status of the schedule to filter by
	Status() string

	// SetStatus sets the status of the schedule to filter by
	SetStatus(string) ScheduleQueryInterface

	// QueueName the name of the queue that schedules are associated with to filter by
	QueueName() string

	// SetQueueName sets the name of the queue that schedules are associated with to filter by
	SetQueueName(string) ScheduleQueryInterface

	// TaskDefinitionID the unique identifier of the task definition that schedules are associated with to filter by
	TaskDefinitionID() string

	// SetTaskDefinitionID sets the unique identifier of the task definition that schedules are associated with to filter by
	SetTaskDefinitionID(string) ScheduleQueryInterface

	// Limit the maximum number of schedules to return
	Limit() int

	// SetLimit sets the maximum number of schedules to return
	SetLimit(int) ScheduleQueryInterface

	// Offset the number of schedules to skip before starting to return results
	Offset() int

	// SetOffset sets the number of schedules to skip before starting to return results
	SetOffset(int) ScheduleQueryInterface
}

ScheduleQueryInterface defines the query parameters used to filter and paginate schedules when listing or counting them.

func NewScheduleQuery added in v1.15.0

func NewScheduleQuery() ScheduleQueryInterface

type ScheduleRunnerInterface added in v1.15.0

type ScheduleRunnerInterface interface {
	Start(ctx context.Context)
	Stop()
	IsRunning() bool
	RunOnce(ctx context.Context) error
	SetInitialRuns(ctx context.Context) error
}

func NewScheduleRunner added in v1.15.0

func NewScheduleRunner(store StoreInterface, opts ScheduleRunnerOptions) ScheduleRunnerInterface

type ScheduleRunnerOptions added in v1.15.0

type ScheduleRunnerOptions struct {
	IntervalSeconds int
	Logger          *log.Logger
}

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store defines a session store

func NewStore

func NewStore(opts NewStoreOptions) (*Store, error)

NewStore creates a new task store

func (*Store) AutoMigrate

func (st *Store) AutoMigrate() error

AutoMigrate migrates the tables

func (*Store) EnableDebug

func (st *Store) EnableDebug(debugEnabled bool) StoreInterface

EnableDebug - enables the debug option

func (*Store) QueuedTaskForceFail

func (store *Store) QueuedTaskForceFail(ctx context.Context, queuedTask TaskQueueInterface, waitMinutes int) error

func (*Store) QueuedTaskProcessWithContext added in v1.10.0

func (store *Store) QueuedTaskProcessWithContext(ctx context.Context, queuedTask TaskQueueInterface) (bool, error)

QueuedTaskProcessWithContext processes a queued task with context support. It checks if the handler implements TaskHandlerWithContext and uses that if available, otherwise falls back to the standard Handle() method for backward compatibility.

func (*Store) ScheduleCount added in v1.15.0

func (store *Store) ScheduleCount(ctx context.Context, options ScheduleQueryInterface) (int64, error)

ScheduleCount returns the number of schedules that match the given query options.

func (*Store) ScheduleCreate added in v1.15.0

func (store *Store) ScheduleCreate(ctx context.Context, schedule ScheduleInterface) error

ScheduleCreate creates a new schedule record in the store.

func (*Store) ScheduleDelete added in v1.15.0

func (store *Store) ScheduleDelete(ctx context.Context, schedule ScheduleInterface) error

ScheduleDelete deletes the given schedule from the store.

func (*Store) ScheduleDeleteByID added in v1.15.0

func (store *Store) ScheduleDeleteByID(ctx context.Context, id string) error

ScheduleDeleteByID deletes the schedule with the given ID from the store.

func (*Store) ScheduleFindByID added in v1.15.0

func (store *Store) ScheduleFindByID(ctx context.Context, id string) (ScheduleInterface, error)

ScheduleFindByID finds a schedule by its ID.

func (*Store) ScheduleList added in v1.15.0

func (store *Store) ScheduleList(ctx context.Context, options ScheduleQueryInterface) ([]ScheduleInterface, error)

ScheduleList returns a list of schedules that match the given query options.

func (*Store) ScheduleRun added in v1.15.0

func (store *Store) ScheduleRun(ctx context.Context) error

ScheduleRun scans for due schedules and enqueues their associated tasks.

func (*Store) ScheduleSoftDelete added in v1.15.0

func (store *Store) ScheduleSoftDelete(ctx context.Context, schedule ScheduleInterface) error

ScheduleSoftDelete marks the given schedule as soft-deleted.

func (*Store) ScheduleSoftDeleteByID added in v1.15.0

func (store *Store) ScheduleSoftDeleteByID(ctx context.Context, id string) error

ScheduleSoftDeleteByID marks the schedule with the given ID as soft-deleted.

func (*Store) ScheduleUpdate added in v1.15.0

func (store *Store) ScheduleUpdate(ctx context.Context, schedule ScheduleInterface) error

ScheduleUpdate updates an existing schedule record in the store.

func (*Store) SetErrorHandler added in v1.10.0

func (st *Store) SetErrorHandler(handler func(queueName, taskID string, err error)) StoreInterface

SetErrorHandler - sets a custom error handler for queue processing errors

func (*Store) SqlCreateScheduleTable added in v1.15.0

func (st *Store) SqlCreateScheduleTable() string

SqlCreateScheduleTable - creates the schedule table

func (*Store) SqlCreateTaskDefinitionTable added in v1.10.0

func (st *Store) SqlCreateTaskDefinitionTable() string

SqlCreateTaskDefinitionTable - creates the task definition table

func (*Store) SqlCreateTaskQueueTable added in v1.10.0

func (st *Store) SqlCreateTaskQueueTable() string

SqlCreateTaskQueueTable - creates the task queue table

func (*Store) TaskDefinitionCount added in v1.10.0

func (store *Store) TaskDefinitionCount(ctx context.Context, options TaskDefinitionQueryInterface) (int64, error)

func (*Store) TaskDefinitionCreate added in v1.10.0

func (store *Store) TaskDefinitionCreate(ctx context.Context, task TaskDefinitionInterface) error

func (*Store) TaskDefinitionDelete added in v1.10.0

func (store *Store) TaskDefinitionDelete(ctx context.Context, task TaskDefinitionInterface) error

func (*Store) TaskDefinitionDeleteByID added in v1.10.0

func (store *Store) TaskDefinitionDeleteByID(ctx context.Context, id string) error

func (*Store) TaskDefinitionEnqueueByAlias added in v1.14.0

func (st *Store) TaskDefinitionEnqueueByAlias(
	ctx context.Context,
	queueName string,
	taskAlias string,
	parameters map[string]any,
) (TaskQueueInterface, error)

TaskDefinitionEnqueueByAlias finds a task by its alias and appends it to the queue

func (*Store) TaskDefinitionExecuteCli added in v1.14.0

func (store *Store) TaskDefinitionExecuteCli(alias string, args []string) bool

TaskDefinitionExecuteCli - CLI tool to find a task by its alias and execute its handler - alias "list" is reserved. it lists all the available commands

func (*Store) TaskDefinitionFindByAlias added in v1.10.0

func (store *Store) TaskDefinitionFindByAlias(ctx context.Context, alias string) (task TaskDefinitionInterface, err error)

func (*Store) TaskDefinitionFindByID added in v1.10.0

func (store *Store) TaskDefinitionFindByID(ctx context.Context, id string) (task TaskDefinitionInterface, err error)

func (*Store) TaskDefinitionList added in v1.10.0

func (store *Store) TaskDefinitionList(ctx context.Context, query TaskDefinitionQueryInterface) ([]TaskDefinitionInterface, error)

func (*Store) TaskDefinitionSoftDelete added in v1.10.0

func (store *Store) TaskDefinitionSoftDelete(ctx context.Context, task TaskDefinitionInterface) error

func (*Store) TaskDefinitionSoftDeleteByID added in v1.10.0

func (store *Store) TaskDefinitionSoftDeleteByID(ctx context.Context, id string) error

func (*Store) TaskDefinitionUpdate added in v1.10.0

func (store *Store) TaskDefinitionUpdate(ctx context.Context, task TaskDefinitionInterface) error

func (*Store) TaskHandlerAdd

func (store *Store) TaskHandlerAdd(ctx context.Context, taskHandler TaskDefinitionHandlerInterface, createIfMissing bool) error

func (*Store) TaskHandlerList

func (store *Store) TaskHandlerList() []TaskDefinitionHandlerInterface

func (*Store) TaskQueueClaimNext added in v1.10.0

func (store *Store) TaskQueueClaimNext(ctx context.Context, queueName string) (TaskQueueInterface, error)

TaskQueueClaimNext atomically claims the next queued task for processing. It uses SELECT FOR UPDATE within a transaction to prevent race conditions where multiple workers might try to process the same task.

Returns:

  • TaskQueueInterface: The claimed task (status updated to "running")
  • error: Any error that occurred during the operation

Returns (nil, nil) if no tasks are available to claim.

func (*Store) TaskQueueCount added in v1.10.0

func (store *Store) TaskQueueCount(ctx context.Context, options TaskQueueQueryInterface) (int64, error)

func (*Store) TaskQueueCreate added in v1.10.0

func (store *Store) TaskQueueCreate(ctx context.Context, queue TaskQueueInterface) error

TaskQueueCreate creates a queued task

func (*Store) TaskQueueDelete added in v1.10.0

func (store *Store) TaskQueueDelete(ctx context.Context, queue TaskQueueInterface) error

func (*Store) TaskQueueDeleteByID added in v1.10.0

func (st *Store) TaskQueueDeleteByID(ctx context.Context, id string) error

func (*Store) TaskQueueFail added in v1.10.0

func (st *Store) TaskQueueFail(ctx context.Context, queue TaskQueueInterface) error

TaskQueueFail fails a queued task

func (*Store) TaskQueueFindByID added in v1.10.0

func (store *Store) TaskQueueFindByID(ctx context.Context, id string) (TaskQueueInterface, error)

TaskQueueFindByID finds a Queue by ID

func (*Store) TaskQueueFindNextQueuedTask added in v1.10.0

func (store *Store) TaskQueueFindNextQueuedTask(ctx context.Context) (TaskQueueInterface, error)

func (*Store) TaskQueueFindNextQueuedTaskByQueue added in v1.10.0

func (store *Store) TaskQueueFindNextQueuedTaskByQueue(ctx context.Context, queueName string) (TaskQueueInterface, error)

func (*Store) TaskQueueFindRunning added in v1.10.0

func (store *Store) TaskQueueFindRunning(ctx context.Context, limit int) []TaskQueueInterface

func (*Store) TaskQueueFindRunningByQueue added in v1.10.0

func (store *Store) TaskQueueFindRunningByQueue(ctx context.Context, queueName string, limit int) []TaskQueueInterface

func (*Store) TaskQueueList added in v1.10.0

func (store *Store) TaskQueueList(ctx context.Context, query TaskQueueQueryInterface) ([]TaskQueueInterface, error)

func (*Store) TaskQueueProcessNext added in v1.10.0

func (store *Store) TaskQueueProcessNext(ctx context.Context) error

func (*Store) TaskQueueProcessNextAsyncByQueue added in v1.10.0

func (store *Store) TaskQueueProcessNextAsyncByQueue(ctx context.Context, queueName string) error

func (*Store) TaskQueueProcessNextByQueue added in v1.10.0

func (store *Store) TaskQueueProcessNextByQueue(ctx context.Context, queueName string) error

func (*Store) TaskQueueProcessTask added in v1.14.0

func (store *Store) TaskQueueProcessTask(ctx context.Context, queuedTask TaskQueueInterface) (bool, error)

func (*Store) TaskQueueRunConcurrent deprecated added in v1.14.0

func (store *Store) TaskQueueRunConcurrent(
	ctx context.Context,
	queueName string,
	processSeconds int,
	unstuckMinutes int,
)

TaskQueueRunConcurrent starts a queue processor that handles multiple tasks concurrently. Tasks are processed in parallel up to the configured MaxConcurrency limit. The processor runs in a background goroutine and can be stopped via TaskQueueStopByName.

Deprecated: Use NewTaskQueueRunner instead. This method will be removed in a future version. See docs/runners.md for the recommended approach.

func (*Store) TaskQueueRunDefault deprecated added in v1.14.0

func (store *Store) TaskQueueRunDefault(
	ctx context.Context,
	processSeconds int,
	unstuckMinutes int,
)

TaskQueueRunDefault starts the queue processor for the default queue. Equivalent to calling TaskQueueRunSerial with DefaultQueueName.

Deprecated: Use NewTaskQueueRunner instead. This method will be removed in a future version. See docs/runners.md for the recommended approach.

func (*Store) TaskQueueRunSerial deprecated added in v1.14.0

func (store *Store) TaskQueueRunSerial(
	ctx context.Context,
	queueName string,
	processSeconds int,
	unstuckMinutes int,
)

TaskQueueRunSerial starts a queue processor that handles tasks one at a time (serially). Each task must complete before the next one starts. The processor runs in a background goroutine and can be stopped via TaskQueueStopByName.

Deprecated: Use NewTaskQueueRunner instead. This method will be removed in a future version. See docs/runners.md for the recommended approach.

func (*Store) TaskQueueSoftDelete added in v1.10.0

func (store *Store) TaskQueueSoftDelete(ctx context.Context, queue TaskQueueInterface) error

func (*Store) TaskQueueSoftDeleteByID added in v1.10.0

func (store *Store) TaskQueueSoftDeleteByID(ctx context.Context, id string) error

func (*Store) TaskQueueStop deprecated added in v1.14.0

func (store *Store) TaskQueueStop()

TaskQueueStop stops the default queue processor. It blocks until the worker goroutine and all tasks have fully completed.

Deprecated: Use TaskQueueRunner.Stop() instead. This method will be removed in a future version. See docs/runners.md for the recommended approach.

func (*Store) TaskQueueStopByName deprecated added in v1.14.0

func (store *Store) TaskQueueStopByName(queueName string)

TaskQueueStopByName stops the specified queue processor. It cancels the context, waits for the queue loop to exit, and waits for all in-flight tasks to complete.

Deprecated: Use TaskQueueRunner.Stop() instead. This method will be removed in a future version. See docs/runners.md for the recommended approach.

func (*Store) TaskQueueSuccess added in v1.10.0

func (st *Store) TaskQueueSuccess(ctx context.Context, queue TaskQueueInterface) error

TaskQueueSuccess completes a queued task successfully

func (*Store) TaskQueueUnstuck added in v1.10.0

func (store *Store) TaskQueueUnstuck(ctx context.Context, waitMinutes int)

TaskQueueUnstuck clears the queue of tasks running for more than the specified wait time as most probably these have abnormally exited (panicked) and stop the rest of the queue from being processed

The tasks are marked as failed. However, if they are still running in the background and they are successfully completed, they will be marked as success

================================================================= Business Logic 1. Checks is there are running tasks in progress 2. If running for more than the specified wait minutes mark as failed =================================================================

func (*Store) TaskQueueUnstuckByQueue added in v1.10.0

func (store *Store) TaskQueueUnstuckByQueue(ctx context.Context, queueName string, waitMinutes int)

func (*Store) TaskQueueUpdate added in v1.10.0

func (store *Store) TaskQueueUpdate(ctx context.Context, queue TaskQueueInterface) error

TaskQueueUpdate creates a Queue

type StoreInterface

type StoreInterface interface {
	AutoMigrate() error
	EnableDebug(debug bool) StoreInterface
	SetErrorHandler(handler func(queueName, taskID string, err error)) StoreInterface

	TaskQueueCount(ctx context.Context, options TaskQueueQueryInterface) (int64, error)
	TaskQueueCreate(ctx context.Context, TaskQueue TaskQueueInterface) error
	TaskQueueDelete(ctx context.Context, TaskQueue TaskQueueInterface) error
	TaskQueueDeleteByID(ctx context.Context, id string) error
	TaskQueueFindByID(ctx context.Context, TaskQueueID string) (TaskQueueInterface, error)
	TaskQueueList(ctx context.Context, query TaskQueueQueryInterface) ([]TaskQueueInterface, error)
	TaskQueueSoftDelete(ctx context.Context, TaskQueue TaskQueueInterface) error
	TaskQueueSoftDeleteByID(ctx context.Context, id string) error
	TaskQueueUpdate(ctx context.Context, TaskQueue TaskQueueInterface) error
	TaskQueueClaimNext(ctx context.Context, queueName string) (TaskQueueInterface, error)

	// Deprecated: Use NewTaskQueueRunner instead. These methods will be removed in a future version.
	// See docs/runners.md for the recommended approach.
	TaskQueueRunDefault(ctx context.Context, processSeconds int, unstuckMinutes int)
	// Deprecated: Use NewTaskQueueRunner instead. These methods will be removed in a future version.
	// See docs/runners.md for the recommended approach.
	TaskQueueRunSerial(ctx context.Context, queueName string, processSeconds int, unstuckMinutes int)
	// Deprecated: Use NewTaskQueueRunner instead. These methods will be removed in a future version.
	// See docs/runners.md for the recommended approach.
	TaskQueueRunConcurrent(ctx context.Context, queueName string, processSeconds int, unstuckMinutes int)
	// Deprecated: Use TaskQueueRunner.Stop() instead. These methods will be removed in a future version.
	// See docs/runners.md for the recommended approach.
	TaskQueueStop()
	// Deprecated: Use TaskQueueRunner.Stop() instead. These methods will be removed in a future version.
	// See docs/runners.md for the recommended approach.
	TaskQueueStopByName(queueName string)
	TaskQueueProcessTask(ctx context.Context, queuedTask TaskQueueInterface) (bool, error)

	TaskDefinitionCount(ctx context.Context, options TaskDefinitionQueryInterface) (int64, error)
	TaskDefinitionCreate(ctx context.Context, TaskDefinition TaskDefinitionInterface) error
	TaskDefinitionDelete(ctx context.Context, TaskDefinition TaskDefinitionInterface) error
	TaskDefinitionDeleteByID(ctx context.Context, id string) error
	TaskDefinitionFindByAlias(ctx context.Context, alias string) (TaskDefinitionInterface, error)
	TaskDefinitionFindByID(ctx context.Context, id string) (TaskDefinitionInterface, error)
	TaskDefinitionList(ctx context.Context, options TaskDefinitionQueryInterface) ([]TaskDefinitionInterface, error)
	TaskDefinitionSoftDelete(ctx context.Context, TaskDefinition TaskDefinitionInterface) error
	TaskDefinitionSoftDeleteByID(ctx context.Context, id string) error
	TaskDefinitionUpdate(ctx context.Context, TaskDefinition TaskDefinitionInterface) error

	// TaskDefinition Operations
	TaskDefinitionEnqueueByAlias(ctx context.Context, queueName string, alias string, parameters map[string]any) (TaskQueueInterface, error)
	TaskDefinitionExecuteCli(alias string, args []string) bool

	TaskHandlerList() []TaskDefinitionHandlerInterface
	TaskHandlerAdd(ctx context.Context, taskHandler TaskDefinitionHandlerInterface, createIfMissing bool) error

	ScheduleCount(ctx context.Context, options ScheduleQueryInterface) (int64, error)
	ScheduleCreate(ctx context.Context, schedule ScheduleInterface) error
	ScheduleDelete(ctx context.Context, schedule ScheduleInterface) error
	ScheduleDeleteByID(ctx context.Context, id string) error
	ScheduleFindByID(ctx context.Context, id string) (ScheduleInterface, error)
	ScheduleList(ctx context.Context, options ScheduleQueryInterface) ([]ScheduleInterface, error)
	ScheduleSoftDelete(ctx context.Context, schedule ScheduleInterface) error
	ScheduleSoftDeleteByID(ctx context.Context, id string) error
	ScheduleUpdate(ctx context.Context, schedule ScheduleInterface) error
	ScheduleRun(ctx context.Context) error
}

type TaskDefinitionHandlerBase added in v1.17.0

type TaskDefinitionHandlerBase struct {
	// contains filtered or unexported fields
}

TaskDefinitionHandlerBase provides concurrency-safe shared behavior for task definition handlers, including access to the current queued task, parameter lookup and logging of error, info and success messages during task execution.

func (*TaskDefinitionHandlerBase) ErrorMessage added in v1.17.0

func (handler *TaskDefinitionHandlerBase) ErrorMessage() string

ErrorMessage alias is kept for backwards compatibility. Deprecated: use GetLastErrorMessage instead. Will be removed after 2026-11-30.

func (*TaskDefinitionHandlerBase) GetLastErrorMessage added in v1.17.0

func (handler *TaskDefinitionHandlerBase) GetLastErrorMessage() string

GetLastErrorMessage returns the last error message recorded via LogError.

func (*TaskDefinitionHandlerBase) GetLastInfoMessage added in v1.17.0

func (handler *TaskDefinitionHandlerBase) GetLastInfoMessage() string

GetLastInfoMessage returns the last informational message recorded via LogInfo.

func (*TaskDefinitionHandlerBase) GetLastSuccessMessage added in v1.17.0

func (handler *TaskDefinitionHandlerBase) GetLastSuccessMessage() string

GetLastSuccessMessage returns the last success message recorded via LogSuccess.

func (*TaskDefinitionHandlerBase) GetOptions added in v1.17.0

func (handler *TaskDefinitionHandlerBase) GetOptions() map[string]string

GetOptions returns the options map used when the handler is executed directly without an associated queued task.

func (*TaskDefinitionHandlerBase) GetOutput added in v1.17.0

func (handler *TaskDefinitionHandlerBase) GetOutput() string

func (*TaskDefinitionHandlerBase) GetParam added in v1.17.0

func (handler *TaskDefinitionHandlerBase) GetParam(paramName string) string

GetParam returns the value of a named parameter for the current execution. When a queued task is present it reads from the task's parameter map; otherwise it falls back to the handler options. If the parameter is missing or the queued task parameters cannot be decoded, an empty string is returned.

func (*TaskDefinitionHandlerBase) GetParamArray added in v1.17.0

func (handler *TaskDefinitionHandlerBase) GetParamArray(paramName string) []string

GetParamArray returns the named parameter split on semicolons into a slice. If the parameter is missing or empty, it returns an empty slice.

func (*TaskDefinitionHandlerBase) GetQueuedTask added in v1.17.0

func (handler *TaskDefinitionHandlerBase) GetQueuedTask() TaskQueueInterface

GetQueuedTask returns the currently associated queued task, if any.

func (*TaskDefinitionHandlerBase) HasQueuedTask added in v1.17.0

func (handler *TaskDefinitionHandlerBase) HasQueuedTask() bool

HasQueuedTask reports whether the handler is currently associated with a queued task.

func (*TaskDefinitionHandlerBase) InfoMessage added in v1.17.0

func (handler *TaskDefinitionHandlerBase) InfoMessage() string

InfoMessage alias is kept for backwards compatibility. Deprecated: use GetLastInfoMessage instead. Will be removed after 2026-11-30.

func (*TaskDefinitionHandlerBase) LastErrorMessage added in v1.17.0

func (handler *TaskDefinitionHandlerBase) LastErrorMessage() string

LastErrorMessage alias is kept for backwards compatibility. Deprecated: use GetLastErrorMessage instead. Will be removed after 2026-11-30.

func (*TaskDefinitionHandlerBase) LastInfoMessage added in v1.17.0

func (handler *TaskDefinitionHandlerBase) LastInfoMessage() string

LastInfoMessage alias is kept for backwards compatibility. Deprecated: use GetLastInfoMessage instead. Will be removed after 2026-11-30.

func (*TaskDefinitionHandlerBase) LastSuccessMessage added in v1.17.0

func (handler *TaskDefinitionHandlerBase) LastSuccessMessage() string

LastSuccessMessage alias is kept for backwards compatibility. Deprecated: use GetLastSuccessMessage instead. Will be removed after 2026-11-30.

func (*TaskDefinitionHandlerBase) LogError added in v1.17.0

func (handler *TaskDefinitionHandlerBase) LogError(message string)

LogError records an error message for the handler and either appends it to the queued task details (when a queued task is present) or prints it using cfmt.Errorln.

func (*TaskDefinitionHandlerBase) LogInfo added in v1.17.0

func (handler *TaskDefinitionHandlerBase) LogInfo(message string)

LogInfo records an informational message for the handler and either appends it to the queued task details (when a queued task is present) or prints it using cfmt.Infoln.

func (*TaskDefinitionHandlerBase) LogSuccess added in v1.17.0

func (handler *TaskDefinitionHandlerBase) LogSuccess(message string)

LogSuccess records a success message for the handler and either appends it to the queued task details (when a queued task is present) or prints it using cfmt.Successln.

func (*TaskDefinitionHandlerBase) Options added in v1.17.0

func (handler *TaskDefinitionHandlerBase) Options() map[string]string

Options alias is kept for backwards compatibility. Deprecated: use GetOptions instead. Will be removed after 2026-11-30.

func (*TaskDefinitionHandlerBase) QueuedTask added in v1.17.0

func (handler *TaskDefinitionHandlerBase) QueuedTask() TaskQueueInterface

QueuedTask alias is kept for backwards compatibility. Deprecated: use GetQueuedTask instead. Will be removed after 2026-11-30.

func (*TaskDefinitionHandlerBase) SetOptions added in v1.17.0

func (handler *TaskDefinitionHandlerBase) SetOptions(options map[string]string)

SetOptions sets the options map used when the handler is executed directly without an associated queued task.

func (*TaskDefinitionHandlerBase) SetOutput added in v1.17.0

func (handler *TaskDefinitionHandlerBase) SetOutput(output string)

func (*TaskDefinitionHandlerBase) SetQueuedTask added in v1.17.0

func (handler *TaskDefinitionHandlerBase) SetQueuedTask(queuedTask TaskQueueInterface)

SetQueuedTask associates the handler with a specific queued task instance.

func (*TaskDefinitionHandlerBase) SuccessMessage added in v1.17.0

func (handler *TaskDefinitionHandlerBase) SuccessMessage() string

SuccessMessage alias is kept for backwards compatibility. Deprecated: use GetLastSuccessMessage instead. Will be removed after 2026-11-30.

type TaskDefinitionHandlerInterface added in v1.17.0

type TaskDefinitionHandlerInterface interface {

	// Alias returns the unique identifier used to reference the task
	// definition when enqueuing or executing it.
	Alias() string

	// Title returns a short human-readable name for the task.
	Title() string

	// Description returns a longer human-readable description of what the
	// task does.
	Description() string

	// Handle executes the task logic and returns true on success.
	Handle() bool

	// HasQueuedTask reports whether the handler is currently associated with a
	// queued task.
	HasQueuedTask() bool

	// LogError records an error message for the handler and either appends it
	// to the queued task details or logs it directly.
	LogError(message string)

	// LogInfo records an informational message for the handler and either
	// appends it to the queued task details or logs it directly.
	LogInfo(message string)

	// LogSuccess records a success message for the handler and either appends
	// it to the queued task details or logs it directly.
	LogSuccess(message string)

	// GetQueuedTask returns the currently associated queued task, if any.
	GetQueuedTask() TaskQueueInterface

	// SetQueuedTask associates the handler with a queued task instance when
	// invoked as part of background processing.
	SetQueuedTask(queuedTask TaskQueueInterface)

	// GetOptions returns the options map used when the handler is executed
	// directly without an associated queued task.
	GetOptions() map[string]string

	// SetOptions provides key-value options when the handler is executed
	// directly, without an associated queued task.
	SetOptions(options map[string]string)

	// GetOutput returns the current output value for the handler. When a
	// queued task is associated, this typically reflects the queued task's
	// output; otherwise it is a handler-local value.
	GetOutput() string

	// SetOutput stores the output value for the handler. When a queued task is
	// associated, implementations should propagate this value to the queued
	// task's output as well.
	SetOutput(output string)

	// GetLastErrorMessage returns the last error message recorded during
	// handler execution.
	GetLastErrorMessage() string

	// GetLastInfoMessage returns the last informational message recorded during
	// handler execution.
	GetLastInfoMessage() string

	// GetLastSuccessMessage returns the last success message recorded during
	// handler execution.
	GetLastSuccessMessage() string

	// GetParam returns the value of a named parameter for the current
	// execution, reading from the queued task parameters when present or from
	// the handler options otherwise.
	GetParam(paramName string) string

	// GetParamArray returns the named parameter split on semicolons into a
	// slice. If the parameter is missing or empty, it returns an empty slice.
	GetParamArray(paramName string) []string
}

TaskDefinitionHandlerInterface defines the contract for a task definition handler implementation. Handlers provide metadata (alias, title, description), implement the task logic in Handle, and support being wired to a queued task or executed directly with options.

type TaskDefinitionInterface added in v1.10.0

type TaskDefinitionInterface interface {
	Data() map[string]string
	DataChanged() map[string]string
	MarkAsNotDirty()

	IsActive() bool
	IsCanceled() bool
	IsSoftDeleted() bool

	GetAlias() string

	// Alias alias is kept for backwards compatibility.
	// Deprecated: use GetAlias instead. Will be removed after 2026-11-30.
	Alias() string
	SetAlias(alias string) TaskDefinitionInterface

	GetCreatedAt() string

	// CreatedAt alias is kept for backwards compatibility.
	// Deprecated: use GetCreatedAt instead. Will be removed after 2026-11-30.
	CreatedAt() string
	CreatedAtCarbon() *carbon.Carbon
	SetCreatedAt(createdAt string) TaskDefinitionInterface

	GetDescription() string

	// Description alias is kept for backwards compatibility.
	// Deprecated: use GetDescription instead. Will be removed after 2026-11-30.
	Description() string
	SetDescription(description string) TaskDefinitionInterface

	GetID() string

	// ID alias is kept for backwards compatibility.
	// Deprecated: use GetID instead. Will be removed after 2026-11-30.
	ID() string
	SetID(id string) TaskDefinitionInterface

	GetMemo() string

	// Memo alias is kept for backwards compatibility.
	// Deprecated: use GetMemo instead. Will be removed after 2026-11-30.
	Memo() string
	SetMemo(memo string) TaskDefinitionInterface

	GetIsRecurring() int

	// IsRecurring alias is kept for backwards compatibility.
	// Deprecated: use GetIsRecurring instead. Will be removed after 2026-11-30.
	IsRecurring() int
	SetIsRecurring(isRecurring int) TaskDefinitionInterface

	GetRecurrenceRule() string

	// RecurrenceRule alias is kept for backwards compatibility.
	// Deprecated: use GetRecurrenceRule instead. Will be removed after 2026-11-30.
	RecurrenceRule() string
	SetRecurrenceRule(recurrenceRule string) TaskDefinitionInterface

	GetSoftDeletedAt() string

	// SoftDeletedAt alias is kept for backwards compatibility.
	// Deprecated: use GetSoftDeletedAt instead. Will be removed after 2026-11-30.
	SoftDeletedAt() string
	SoftDeletedAtCarbon() *carbon.Carbon
	SetSoftDeletedAt(deletedAt string) TaskDefinitionInterface

	GetStatus() string

	// Status alias is kept for backwards compatibility.
	// Deprecated: use GetStatus instead. Will be removed after 2026-11-30.
	Status() string
	SetStatus(status string) TaskDefinitionInterface

	GetTitle() string

	// Title alias is kept for backwards compatibility.
	// Deprecated: use GetTitle instead. Will be removed after 2026-11-30.
	Title() string
	SetTitle(title string) TaskDefinitionInterface

	GetUpdatedAt() string

	// UpdatedAt alias is kept for backwards compatibility.
	// Deprecated: use GetUpdatedAt instead. Will be removed after 2026-11-30.
	UpdatedAt() string
	UpdatedAtCarbon() *carbon.Carbon
	SetUpdatedAt(updatedAt string) TaskDefinitionInterface
}

func NewTaskDefinition added in v1.10.0

func NewTaskDefinition() TaskDefinitionInterface

func NewTaskDefinitionFromExistingData added in v1.10.0

func NewTaskDefinitionFromExistingData(data map[string]string) TaskDefinitionInterface

type TaskDefinitionQueryInterface added in v1.10.0

type TaskDefinitionQueryInterface interface {
	Validate() error

	Columns() []string
	SetColumns(columns []string) TaskDefinitionQueryInterface

	HasCountOnly() bool
	IsCountOnly() bool
	SetCountOnly(countOnly bool) TaskDefinitionQueryInterface

	HasAlias() bool
	Alias() string
	SetAlias(alias string) TaskDefinitionQueryInterface

	HasCreatedAtGte() bool
	CreatedAtGte() string
	SetCreatedAtGte(createdAtGte string) TaskDefinitionQueryInterface

	HasCreatedAtLte() bool
	CreatedAtLte() string
	SetCreatedAtLte(createdAtLte string) TaskDefinitionQueryInterface

	HasID() bool
	ID() string
	SetID(id string) TaskDefinitionQueryInterface

	HasIDIn() bool
	IDIn() []string
	SetIDIn(idIn []string) TaskDefinitionQueryInterface

	HasLimit() bool
	Limit() int
	SetLimit(limit int) TaskDefinitionQueryInterface

	HasOffset() bool
	Offset() int
	SetOffset(offset int) TaskDefinitionQueryInterface

	HasSortOrder() bool
	SortOrder() string
	SetSortOrder(sortOrder string) TaskDefinitionQueryInterface

	HasOrderBy() bool
	OrderBy() string
	SetOrderBy(orderBy string) TaskDefinitionQueryInterface

	HasSoftDeletedIncluded() bool
	SoftDeletedIncluded() bool
	SetSoftDeletedIncluded(withDeleted bool) TaskDefinitionQueryInterface

	HasStatus() bool
	Status() string
	SetStatus(status string) TaskDefinitionQueryInterface

	HasStatusIn() bool
	StatusIn() []string
	SetStatusIn(statusIn []string) TaskDefinitionQueryInterface
}

func TaskDefinitionQuery added in v1.10.0

func TaskDefinitionQuery() TaskDefinitionQueryInterface

type TaskHandlerBase

type TaskHandlerBase = TaskDefinitionHandlerBase

TaskHandlerBase alias is kept for backwards compatibility. Deprecated: use TaskDefinitionHandlerBase instead. Will be removed after 2026-11-30.

type TaskHandlerInterface

type TaskHandlerInterface = TaskDefinitionHandlerInterface

TaskHandlerInterface alias is kept for backwards compatibility. Deprecated: use TaskDefinitionHandlerInterface instead. Will be removed after 2026-11-30.

type TaskHandlerWithContext added in v1.10.0

type TaskHandlerWithContext interface {
	TaskDefinitionHandlerInterface
	HandleWithContext(ctx context.Context) bool
}

TaskHandlerWithContext is an optional interface that task handlers can implement to receive context for cancellation support. This is backward compatible - handlers that don't implement this will continue to work using the standard Handle() method.

Example usage:

type MyHandler struct {
    TaskDefinitionHandlerBase
}

func (h *MyHandler) HandleWithContext(ctx context.Context) bool {
    select {
    case <-ctx.Done():
        h.LogInfo("Task cancelled")
        return false
    case <-time.After(5 * time.Second):
        h.LogSuccess("Task completed")
        return true
    }
}

type TaskQueueInterface added in v1.10.0

type TaskQueueInterface interface {
	Data() map[string]string
	DataChanged() map[string]string
	MarkAsNotDirty()

	IsCanceled() bool
	IsDeleted() bool
	IsFailed() bool
	IsQueued() bool
	IsPaused() bool
	IsRunning() bool
	IsSuccess() bool
	IsSoftDeleted() bool

	GetAttempts() int

	// Attempts alias is kept for backwards compatibility.
	// Deprecated: use GetAttempts instead. Will be removed after 2026-11-30.
	Attempts() int
	SetAttempts(attempts int) TaskQueueInterface

	GetCompletedAt() string

	// CompletedAt alias is kept for backwards compatibility.
	// Deprecated: use GetCompletedAt instead. Will be removed after 2026-11-30.
	CompletedAt() string
	CompletedAtCarbon() *carbon.Carbon
	SetCompletedAt(completedAt string) TaskQueueInterface

	GetCreatedAt() string

	// CreatedAt alias is kept for backwards compatibility.
	// Deprecated: use GetCreatedAt instead. Will be removed after 2026-11-30.
	CreatedAt() string
	CreatedAtCarbon() *carbon.Carbon
	SetCreatedAt(createdAt string) TaskQueueInterface

	GetDetails() string

	// Details alias is kept for backwards compatibility.
	// Deprecated: use GetDetails instead. Will be removed after 2026-11-30.
	Details() string
	AppendDetails(details string) TaskQueueInterface
	SetDetails(details string) TaskQueueInterface

	GetID() string

	// ID alias is kept for backwards compatibility.
	// Deprecated: use GetID instead. Will be removed after 2026-11-30.
	ID() string
	SetID(id string) TaskQueueInterface

	GetOutput() string

	// Output alias is kept for backwards compatibility.
	// Deprecated: use GetOutput instead. Will be removed after 2026-11-30.
	Output() string
	SetOutput(output string) TaskQueueInterface

	GetParameters() string

	// Parameters alias is kept for backwards compatibility.
	// Deprecated: use GetParameters instead. Will be removed after 2026-11-30.
	Parameters() string
	SetParameters(parameters string) TaskQueueInterface
	ParametersMap() (map[string]string, error)
	SetParametersMap(parameters map[string]string) (TaskQueueInterface, error)

	GetSoftDeletedAt() string

	// SoftDeletedAt alias is kept for backwards compatibility.
	// Deprecated: use GetSoftDeletedAt instead. Will be removed after 2026-11-30.
	SoftDeletedAt() string
	SoftDeletedAtCarbon() *carbon.Carbon
	SetSoftDeletedAt(deletedAt string) TaskQueueInterface

	GetStartedAt() string

	// StartedAt alias is kept for backwards compatibility.
	// Deprecated: use GetStartedAt instead. Will be removed after 2026-11-30.
	StartedAt() string
	StartedAtCarbon() *carbon.Carbon
	SetStartedAt(startedAt string) TaskQueueInterface

	GetStatus() string

	// Status alias is kept for backwards compatibility.
	// Deprecated: use GetStatus instead. Will be removed after 2026-11-30.
	Status() string
	SetStatus(status string) TaskQueueInterface

	GetTaskID() string

	// TaskID alias is kept for backwards compatibility.
	// Deprecated: use GetTaskID instead. Will be removed after 2026-11-30.
	TaskID() string
	SetTaskID(taskID string) TaskQueueInterface

	GetUpdatedAt() string

	// UpdatedAt alias is kept for backwards compatibility.
	// Deprecated: use GetUpdatedAt instead. Will be removed after 2026-11-30.
	UpdatedAt() string
	UpdatedAtCarbon() *carbon.Carbon
	SetUpdatedAt(updatedAt string) TaskQueueInterface

	GetQueueName() string

	// QueueName alias is kept for backwards compatibility.
	// Deprecated: use GetQueueName instead. Will be removed after 2026-11-30.
	QueueName() string
	SetQueueName(queueName string) TaskQueueInterface
}

func NewTaskQueue added in v1.10.0

func NewTaskQueue(queueName ...string) TaskQueueInterface

NewTaskQueue creates a new task queue If a queue name is provided, it will be used; otherwise DefaultQueueName is used.

func NewTaskQueueFromExistingData added in v1.10.0

func NewTaskQueueFromExistingData(data map[string]string) TaskQueueInterface

type TaskQueueQueryInterface added in v1.10.0

type TaskQueueQueryInterface interface {
	Validate() error

	Columns() []string
	SetColumns(columns []string) TaskQueueQueryInterface

	HasCountOnly() bool
	IsCountOnly() bool
	SetCountOnly(countOnly bool) TaskQueueQueryInterface

	HasCreatedAtGte() bool
	CreatedAtGte() string
	SetCreatedAtGte(createdAtGte string) TaskQueueQueryInterface

	HasCreatedAtLte() bool
	CreatedAtLte() string
	SetCreatedAtLte(createdAtLte string) TaskQueueQueryInterface

	HasID() bool
	ID() string
	SetID(id string) TaskQueueQueryInterface

	HasIDIn() bool
	IDIn() []string
	SetIDIn(idIn []string) TaskQueueQueryInterface

	HasLimit() bool
	Limit() int
	SetLimit(limit int) TaskQueueQueryInterface

	HasOffset() bool
	Offset() int
	SetOffset(offset int) TaskQueueQueryInterface

	HasSortOrder() bool
	SortOrder() string
	SetSortOrder(sortOrder string) TaskQueueQueryInterface

	HasOrderBy() bool
	OrderBy() string
	SetOrderBy(orderBy string) TaskQueueQueryInterface

	HasSoftDeletedIncluded() bool
	SoftDeletedIncluded() bool
	SetSoftDeletedIncluded(withDeleted bool) TaskQueueQueryInterface

	HasStatus() bool
	Status() string
	SetStatus(status string) TaskQueueQueryInterface

	HasStatusIn() bool
	StatusIn() []string
	SetStatusIn(statusIn []string) TaskQueueQueryInterface

	HasTaskID() bool
	TaskID() string
	SetTaskID(taskID string) TaskQueueQueryInterface

	HasQueueName() bool
	QueueName() string
	SetQueueName(queueName string) TaskQueueQueryInterface
}

func TaskQueueQuery added in v1.10.0

func TaskQueueQuery() TaskQueueQueryInterface

type TaskQueueRunnerInterface added in v1.15.0

type TaskQueueRunnerInterface interface {
	Start(ctx context.Context)
	Stop()
	IsRunning() bool
	RunOnce(ctx context.Context) error
}

func NewTaskQueueRunner added in v1.15.0

func NewTaskQueueRunner(store StoreInterface, opts TaskQueueRunnerOptions) TaskQueueRunnerInterface

type TaskQueueRunnerOptions added in v1.15.0

type TaskQueueRunnerOptions struct {
	IntervalSeconds int
	UnstuckMinutes  int
	QueueName       string
	Logger          *log.Logger
	MaxConcurrency  int // 0 or 1 = serial, >1 = concurrent (default: 1)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL