asyncjobs

package module
v0.1.2
Published: Jan 11, 2023 License: Apache-2.0 Imports: 28 Imported by: 1

README

Choria Asynchronous Jobs

Overview

This is an Asynchronous Job Queue system that relies on NATS JetStream for storage and general job life cycle management. It is compatible with any NATS JetStream based system such as a privately hosted JetStream, Choria Streams or a commercial SaaS.

Each Task is stored in JetStream under a unique ID, and a Work Queue item is created that references the Task. JetStream handles scheduling, retries, acknowledgements and more for the Work Queue item, while the stored Task is updated throughout its lifecycle.

Multiple processes can process jobs concurrently, so job processing is both horizontally and vertically scalable. Job handlers are implemented in Go, with one process hosting one or many handlers. Other languages can implement Job Handlers using NATS Request-Reply services. Per-process concurrency and overall per-queue concurrency controls exist.

This package is heavily inspired by hibiken/asynq.


Status

This is a brand-new project under heavy development. The core Task handling is in good shape and reasonably stable, while the Task Scheduler is still subject to some change.

Synopsis

Tasks are published to Work Queues:

// establish a connection to the EMAIL work queue using an existing NATS connection
client, _ := asyncjobs.NewClient(asyncjobs.NatsConn(nc), asyncjobs.BindWorkQueue("EMAIL"))

// create a task with the type 'email:new' and body from newEmail()
task, _ := asyncjobs.NewTask("email:new", newEmail())

// store it in the Work Queue
client.EnqueueTask(ctx, task)

Tasks are processed by horizontally and vertically scalable worker processes. Typically, a Handler handles one type of Task. The example below configures Prometheus integration, concurrency and retry backoff.

// establish a connection to the EMAIL work queue using a 
// NATS context, with concurrency, prometheus stats and backoff
client, _ := asyncjobs.NewClient(
	asyncjobs.NatsContext("EMAIL"), 
	asyncjobs.BindWorkQueue("EMAIL"),
	asyncjobs.ClientConcurrency(10),
	asyncjobs.PrometheusListenPort(8080),
	asyncjobs.RetryBackoffPolicy(asyncjobs.RetryLinearTenMinutes))

router := asyncjobs.NewTaskRouter()
router.HandleFunc("email:new", func(ctx context.Context, log asyncjobs.Logger, task *asyncjobs.Task) (any, error) {
	log.Infof("Processing task %s", task.ID)

	// do work here using task.Payload

	return "sent", nil
})

client.Run(ctx, router)

See our documentation for a deep dive into the use cases, architecture, abilities and more.

Requirements

NATS 2.8.0 or newer with JetStream enabled.

Features

See the Feature List page for a full feature breakdown.

Documentation

Constants

const (
	// ShortedScheduledDeadline is the shortest deadline a scheduled task may have
	ShortedScheduledDeadline = 30 * time.Second
	// DefaultJobRunTime when not configured for a queue this is the default run-time handlers will get
	DefaultJobRunTime = time.Hour
	// DefaultMaxTries when not configured for a task this is the default tries it will get
	DefaultMaxTries = 10
	// DefaultQueueMaxConcurrent when not configured for a queue this is the default concurrency setting
	DefaultQueueMaxConcurrent = 100
)
const (
	// TaskStateChangeEventType is the event type for TaskStateChangeEvent events
	TaskStateChangeEventType = "io.choria.asyncjobs.v1.task_state"

	// LeaderElectedEventType is the event type for LeaderElectedEvent events
	LeaderElectedEventType = "io.choria.asyncjobs.v1.leader_elected"
)
const (
	// RequestReplyContentTypeHeader is the header text sent to indicate the body encoding and type
	RequestReplyContentTypeHeader = "AJ-Content-Type"
	// RequestReplyDeadlineHeader is the header indicating the deadline for processing the item
	RequestReplyDeadlineHeader = "AJ-Handler-Deadline"
	// RequestReplyTerminateError is the header to send in a reply that the task should be terminated via ErrTerminateTask
	RequestReplyTerminateError = "AJ-Terminate"
	// RequestReplyError is the header indicating a generic failure in handling an item
	RequestReplyError = "AJ-Error"
	// RequestReplyTaskType is the content type indicating the payload is a Task in JSON format
	RequestReplyTaskType = "application/x-asyncjobs-task+json"
)
const (
	// TasksStreamName is the name of the JetStream Stream storing tasks
	TasksStreamName = "CHORIA_AJ_TASKS"
	// TasksStreamSubjects is a NATS wildcard matching all tasks
	TasksStreamSubjects = "CHORIA_AJ.T.*"
	// TasksStreamSubjectPattern is the printf pattern that can be used to find an individual task by its task ID
	TasksStreamSubjectPattern = "CHORIA_AJ.T.%s"

	// EventsSubjectWildcard is the NATS wildcard for receiving all events
	EventsSubjectWildcard = "CHORIA_AJ.E.>"
	// TaskStateChangeEventSubjectPattern is a printf pattern for determining the event publish subject
	TaskStateChangeEventSubjectPattern = "CHORIA_AJ.E.task_state.%s"
	// TaskStateChangeEventSubjectWildcard is a NATS wildcard for receiving all TaskStateChangeEvent messages
	TaskStateChangeEventSubjectWildcard = "CHORIA_AJ.E.task_state.*"
	// LeaderElectedEventSubjectPattern is the pattern for determining the event publish subject
	LeaderElectedEventSubjectPattern = "CHORIA_AJ.E.leader_election.%s"
	// LeaderElectedEventSubjectWildcard is the NATS wildcard for receiving all LeaderElectedEvent messages
	LeaderElectedEventSubjectWildcard = "CHORIA_AJ.E.leader_election.>"

	// WorkStreamNamePattern is the printf pattern for determining JetStream Stream names per queue
	WorkStreamNamePattern = "CHORIA_AJ_Q_%s"
	// WorkStreamSubjectPattern is the printf pattern individual items are placed in, placeholders for JobID and JobType
	WorkStreamSubjectPattern = "CHORIA_AJ.Q.%s.%s"
	// WorkStreamSubjectWildcard is a NATS filter matching all enqueued items for any task store
	WorkStreamSubjectWildcard = "CHORIA_AJ.Q.>"
	// WorkStreamNamePrefix is the prefix that, when removed, reveals the queue name
	WorkStreamNamePrefix = "CHORIA_AJ_Q_"

	// RequestReplyTaskHandlerPattern is the subject request reply task handlers should listen on by default
	RequestReplyTaskHandlerPattern = "CHORIA_AJ.H.T.%s"

	// ConfigBucketName is the KV bucket for configuration like scheduled tasks
	ConfigBucketName = "CHORIA_AJ_CONFIGURATION"

	// LeaderElectionBucketName is the KV bucket that will manage leader elections
	LeaderElectionBucketName = "CHORIA_AJ_ELECTIONS"
)

Variables

var (
	// ErrTaskNotFound is the error indicating a task does not exist rather than a failure to load
	ErrTaskNotFound = errors.New("task not found")
	// ErrTerminateTask indicates that a task failed, and no further processing attempts should be made
	ErrTerminateTask = fmt.Errorf("terminate task")
	// ErrNoTasks indicates the task store is empty
	ErrNoTasks = fmt.Errorf("no tasks found")
	// ErrTaskPastDeadline indicates a task that was scheduled for handling is past its deadline
	ErrTaskPastDeadline = fmt.Errorf("past deadline")
	// ErrTaskExceedsMaxTries indicates a task exceeded its maximum attempts
	ErrTaskExceedsMaxTries = fmt.Errorf("exceeded maximum tries")
	// ErrTaskAlreadyActive indicates that a task is already in the active state
	ErrTaskAlreadyActive = fmt.Errorf("task already active")
	// ErrTaskTypeCannotEnqueue indicates that a task is in a state where it cannot be enqueued as new
	ErrTaskTypeCannotEnqueue = fmt.Errorf("cannot enqueue a task in state")
	// ErrTaskUpdateFailed indicates a task update failed
	ErrTaskUpdateFailed = fmt.Errorf("failed updating task state")
	// ErrTaskAlreadyInState indicates an update failed because a task was already in the desired state
	ErrTaskAlreadyInState = fmt.Errorf("%w, already in desired state", ErrTaskUpdateFailed)
	// ErrTaskLoadFailed indicates a task failed for an unknown reason
	ErrTaskLoadFailed = fmt.Errorf("loading task failed")
	// ErrTaskTypeRequired indicates an empty task type was given
	ErrTaskTypeRequired = fmt.Errorf("task type is required")
	// ErrTaskTypeInvalid indicates an invalid task type was given
	ErrTaskTypeInvalid = fmt.Errorf("task type is invalid")
	// ErrTaskDependenciesFailed indicates that the task cannot be run as its dependencies failed
	ErrTaskDependenciesFailed = fmt.Errorf("task dependencies failed")

	// ErrNoHandlerForTaskType indicates that a task could not be handled by any known handlers
	ErrNoHandlerForTaskType = fmt.Errorf("no handler for task type")
	// ErrDuplicateHandlerForTaskType indicates a task handler for a specific type is already registered
	ErrDuplicateHandlerForTaskType = fmt.Errorf("duplicate handler for task type")

	// ErrInvalidHeaders indicates that message headers from JetStream were not valid
	ErrInvalidHeaders = fmt.Errorf("could not decode headers")
	// ErrContextWithoutDeadline indicates a context.Context was passed without deadline when it was expected
	ErrContextWithoutDeadline = fmt.Errorf("non deadline context given")
	// ErrInvalidStorageItem indicates a Work Queue item had no JetStream state associated with it
	ErrInvalidStorageItem = fmt.Errorf("invalid storage item")
	// ErrNoNatsConn indicates that a nil connection was supplied
	ErrNoNatsConn = fmt.Errorf("no NATS connection supplied")
	// ErrNoMux indicates that a processor was started with no routing mux configured
	ErrNoMux = fmt.Errorf("mux is required")
	// ErrStorageNotReady indicates the underlying storage is not ready
	ErrStorageNotReady = fmt.Errorf("storage not ready")

	// ErrQueueNotFound is the error indicating a queue does not exist rather than a failure to load
	ErrQueueNotFound = errors.New("queue not found")
	// ErrQueueConsumerNotFound indicates that the Work Queue store has no consumers defined
	ErrQueueConsumerNotFound = errors.New("queue consumer not found")
	// ErrQueueNameRequired indicates a queue has no name
	ErrQueueNameRequired = fmt.Errorf("queue name is required")
	// ErrQueueItemCorrupt indicates that an item received from the work queue was invalid - perhaps invalid JSON
	ErrQueueItemCorrupt = fmt.Errorf("corrupt queue item received")
	// ErrQueueItemInvalid is an item read from the queue with no data or obviously bad data
	ErrQueueItemInvalid = fmt.Errorf("invalid queue item received")
	// ErrInvalidQueueState indicates a queue was attempted to be used but no internal state is known of that queue
	ErrInvalidQueueState = fmt.Errorf("invalid queue storage state")
	// ErrDuplicateItem indicates that the Work Queue deduplication protection refused a message
	ErrDuplicateItem = fmt.Errorf("duplicate work queue item")
	// ErrExternalCommandNotFound indicates a command for an ExternalProcess handler was not found
	ErrExternalCommandNotFound = fmt.Errorf("command not found")
	// ErrExternalCommandFailed indicates a command for an ExternalProcess handler failed
	ErrExternalCommandFailed = fmt.Errorf("execution failed")
	// ErrUnknownEventType indicates that while parsing an event an unknown type of event was encountered
	ErrUnknownEventType = fmt.Errorf("unknown event type")

	// ErrUnknownRetryPolicy indicates the requested retry policy does not exist
	ErrUnknownRetryPolicy = fmt.Errorf("unknown retry policy")

	// ErrUnknownDiscardPolicy indicates a discard policy could not be found matching a name
	ErrUnknownDiscardPolicy = fmt.Errorf("unknown discard policy")

	// ErrRequestReplyFailed indicates a callout to a remote handler failed due to a timeout, lack of listeners or network error
	ErrRequestReplyFailed = fmt.Errorf("request-reply callout failed")
	// ErrRequestReplyNoDeadline indicates a request-reply handler was called without a deadline
	ErrRequestReplyNoDeadline = fmt.Errorf("request-reply requires deadline context")
	// ErrRequestReplyShortDeadline indicates a deadline context has a too short timeout
	ErrRequestReplyShortDeadline = fmt.Errorf("deadline too short")

	// ErrScheduleNameIsRequired indicates a schedule name is needed when creating new schedules
	ErrScheduleNameIsRequired = errors.New("name is required")
	// ErrScheduleNameInvalid indicates the name given to a task is invalid
	ErrScheduleNameInvalid = errors.New("name is invalid")
	// ErrScheduleIsRequired indicates a cron schedule must be supplied when creating new schedules
	ErrScheduleIsRequired = errors.New("schedule is required")
	// ErrScheduleInvalid indicates an invalid cron schedule was supplied
	ErrScheduleInvalid = errors.New("invalid cron schedule")
	// ErrScheduledTaskAlreadyExist indicates a scheduled task that was being created already existed
	ErrScheduledTaskAlreadyExist = errors.New("scheduled task already exist")
	// ErrScheduledTaskNotFound indicates the requested task does not exist
	ErrScheduledTaskNotFound = errors.New("scheduled task not found")
	// ErrScheduledTaskInvalid indicates a loaded task was invalid
	ErrScheduledTaskInvalid = errors.New("invalid scheduled task")
	// ErrScheduledTaskShortDeadline indicates the time allowed for task execution is too short
	ErrScheduledTaskShortDeadline = errors.New("deadline too short")
)
var (
	// RetryLinearTenMinutes is a 50-step policy between 1 and 10 minutes
	RetryLinearTenMinutes = linearPolicy(50, 0.90, time.Minute, 10*time.Minute)

	// RetryLinearOneHour is a 20-step policy between 10 minutes and 1 hour
	RetryLinearOneHour = linearPolicy(20, 0.90, 10*time.Minute, 60*time.Minute)

	// RetryLinearOneMinute is a 20-step policy between 1 second and 1 minute
	RetryLinearOneMinute = linearPolicy(20, 0.5, time.Second, time.Minute)

	// RetryDefault is the default retry policy
	RetryDefault = RetryLinearTenMinutes
)

Functions

func IsRetryPolicyKnown added in v0.0.5

func IsRetryPolicyKnown(name string) bool

IsRetryPolicyKnown determines if the named policy exists

func IsValidName added in v0.0.5

func IsValidName(name string) bool

IsValidName is a generic, strict name validator for values such as task names and other strings that become NATS subjects

func ParseEventJSON added in v0.0.2

func ParseEventJSON(event []byte) (any, string, error)

ParseEventJSON parses event bytes returning the parsed Event and its event type
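
For example, the event subjects and types defined in the constants above can be combined with ParseEventJSON to observe the task life cycle. The following is a minimal sketch, assuming nc is an established *nats.Conn and that parsed events are returned as pointers to their structs:

// a sketch only: nc is assumed to be an already connected *nats.Conn
sub, err := nc.Subscribe(asyncjobs.EventsSubjectWildcard, func(m *nats.Msg) {
	event, kind, err := asyncjobs.ParseEventJSON(m.Data)
	if err != nil {
		log.Printf("could not parse event: %v", err)
		return
	}

	switch kind {
	case asyncjobs.TaskStateChangeEventType:
		// assumption: the event is returned as *asyncjobs.TaskStateChangeEvent
		if e, ok := event.(*asyncjobs.TaskStateChangeEvent); ok {
			log.Printf("task %s is now %q after %d tries", e.TaskID, e.State, e.Tries)
		}
	case asyncjobs.LeaderElectedEventType:
		if e, ok := event.(*asyncjobs.LeaderElectedEvent); ok {
			log.Printf("%s %s became leader", e.Component, e.Name)
		}
	}
})
panicIfErr(err)
defer sub.Unsubscribe()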

func RequestReplySubjectForTaskType added in v0.1.0

func RequestReplySubjectForTaskType(taskType string) string

RequestReplySubjectForTaskType returns the subject a request-reply handler should listen on for a specified task type

func RetryPolicyNames added in v0.0.5

func RetryPolicyNames() []string

RetryPolicyNames returns the names of the pre-generated retry policies

func RetrySleep added in v0.0.2

func RetrySleep(ctx context.Context, p RetryPolicyProvider, n int) error

RetrySleep sleeps for the duration for try n or until interrupted by ctx
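
As a sketch, RetrySleep can also drive a simple retry loop outside the task handling machinery; callRemote below is a hypothetical fallible operation:

// retry a hypothetical operation using the default backoff policy
for try := 0; try < asyncjobs.DefaultMaxTries; try++ {
	if err := callRemote(); err == nil {
		break
	}

	// sleep for the policy duration for this try, or stop early when ctx is interrupted
	if err := asyncjobs.RetrySleep(ctx, asyncjobs.RetryDefault, try); err != nil {
		return err
	}
}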

Types

type BaseEvent added in v0.0.2

type BaseEvent struct {
	EventID   string    `json:"event_id"`
	EventType string    `json:"type"`
	TimeStamp time.Time `json:"timestamp"`
}

BaseEvent is present in all event types and can be used to detect the type

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client connects Task producers and Task handlers to the backend

Example (Consumer)
queue := &Queue{
	Name:          "P100",
	MaxRunTime:    60 * time.Minute,
	MaxConcurrent: 20,
	MaxTries:      100,
}

// Uses the NATS CLI context WQ for connection details, will create the queue if
// it does not already exist
client, err := NewClient(NatsContext("WQ"), WorkQueue(queue), RetryBackoffPolicy(RetryLinearOneHour))
panicIfErr(err)

router := NewTaskRouter()
err = router.HandleFunc("email:send", func(_ context.Context, _ Logger, t *Task) (any, error) {
	log.Printf("Processing task: %s", t.ID)

	// handle task.Payload which is a JSON encoded email

	// task record will be updated with this payload result
	return "success", nil
})
panicIfErr(err)

// Starts handling registered tasks, blocks until canceled
err = client.Run(context.Background(), router)
panicIfErr(err)
Output:

Example (Producer)
queue := &Queue{
	Name:          "P100",
	MaxRunTime:    60 * time.Minute,
	MaxConcurrent: 20,
	MaxTries:      100,
}

email := newEmail("user@example.net", "Test Subject", "Test Body")

// Creates a new task that has a deadline for processing 1 hour from now
task, err := NewTask("email:send", email, TaskDeadline(time.Now().Add(time.Hour)))
panicIfErr(err)

// Uses the NATS CLI context WQ for connection details, will create the queue if
// it does not already exist
client, err := NewClient(NatsContext("WQ"), WorkQueue(queue))
panicIfErr(err)

// Adds the task to the queue called P100
err = client.EnqueueTask(context.Background(), task)
panicIfErr(err)
Output:

func NewClient

func NewClient(opts ...ClientOpt) (*Client, error)

NewClient creates a new client. One of NatsConn() or NatsContext() must be passed; all other options are optional.

When no Queue() is supplied a default queue called DEFAULT will be used.

func (*Client) EnqueueTask

func (c *Client) EnqueueTask(ctx context.Context, task *Task) error

EnqueueTask adds a task to the named queue which must already exist

func (*Client) LoadScheduledTaskByName added in v0.0.5

func (c *Client) LoadScheduledTaskByName(name string) (*ScheduledTask, error)

LoadScheduledTaskByName loads a scheduled task by name

func (*Client) LoadTaskByID

func (c *Client) LoadTaskByID(id string) (*Task, error)

LoadTaskByID loads a task from the backend using its ID

Example
client, err := NewClient(NatsContext("WQ"))
panicIfErr(err)

task, err := client.LoadTaskByID("24ErgVol4ZjpoQ8FAima9R2jEHB")
panicIfErr(err)

fmt.Printf("Loaded task %s in state %s", task.ID, task.State)
Output:

func (*Client) NewScheduledTask added in v0.0.5

func (c *Client) NewScheduledTask(name string, schedule string, queue string, task *Task) error

NewScheduledTask creates a new scheduled task; attempting to create a schedule that already exists will result in failure

func (*Client) RemoveScheduledTask added in v0.0.5

func (c *Client) RemoveScheduledTask(name string) error

RemoveScheduledTask removes a scheduled task

func (*Client) RetryTaskByID added in v0.0.2

func (c *Client) RetryTaskByID(ctx context.Context, id string) error

RetryTaskByID will retry a task, first removing an entry from the Work Queue if already there

func (*Client) Run

func (c *Client) Run(ctx context.Context, router *Mux) error

Run starts processing messages using the router until error or interruption

func (*Client) ScheduledTasksStorage added in v0.0.5

func (c *Client) ScheduledTasksStorage() ScheduledTaskStorage

ScheduledTasksStorage gives access to administrative functions for task maintenance

func (*Client) StorageAdmin

func (c *Client) StorageAdmin() StorageAdmin

StorageAdmin accesses admin features of the storage backend

type ClientOpt

type ClientOpt func(opts *ClientOpts) error

ClientOpt configures the client

func BindWorkQueue

func BindWorkQueue(queue string) ClientOpt

BindWorkQueue binds the client to a work queue that should already exist

func ClientConcurrency

func ClientConcurrency(c int) ClientOpt

ClientConcurrency sets the concurrency to use when executing tasks within this client. This is capped by the per-queue maximum concurrency set using the queue setting MaxConcurrent. Generally a queue would have a larger concurrency, like 100 (DefaultQueueMaxConcurrent), and an individual task processor would be below that. This allows for horizontal and vertical scaling without unbounded growth - the queue MaxConcurrent is the absolute upper limit for in-flight jobs on one specific queue.
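
As an illustrative sketch of that relationship, the queue below allows 100 in-flight tasks in total while this particular worker process handles at most 10; the NATS context name "AJ" is an assumption:

// the per-queue limit is 100 in-flight tasks across all workers combined
queue := &asyncjobs.Queue{
	Name:          "EMAIL",
	MaxConcurrent: asyncjobs.DefaultQueueMaxConcurrent,
}

client, err := asyncjobs.NewClient(
	asyncjobs.NatsContext("AJ"), // assumed NATS CLI context name
	asyncjobs.WorkQueue(queue),
	asyncjobs.ClientConcurrency(10)) // this process runs at most 10 handlers concurrently
panicIfErr(err)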

func CustomLogger

func CustomLogger(log Logger) ClientOpt

CustomLogger sets a custom logger to use for all logging

func DiscardTaskStates added in v0.0.2

func DiscardTaskStates(states ...TaskState) ClientOpt

DiscardTaskStates configures the client to discard Tasks that reach a final state in the list of supplied TaskState
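
For example, a worker that only keeps failed tasks around for inspection could discard the terminal states it does not care about; a sketch, reusing the EMAIL queue and an assumed NATS context name:

// discard tasks once they reach the completed or expired terminal states
client, err := asyncjobs.NewClient(
	asyncjobs.NatsContext("AJ"), // assumed NATS CLI context name
	asyncjobs.BindWorkQueue("EMAIL"),
	asyncjobs.DiscardTaskStates(asyncjobs.TaskStateCompleted, asyncjobs.TaskStateExpired))
panicIfErr(err)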

func DiscardTaskStatesByName added in v0.1.0

func DiscardTaskStatesByName(states ...string) ClientOpt

DiscardTaskStatesByName configures the client to discard Tasks that reach a final state in the list of supplied TaskState names

func MemoryStorage

func MemoryStorage() ClientOpt

MemoryStorage enables storing tasks and the work queue in memory within JetStream

func NatsConn

func NatsConn(nc *nats.Conn) ClientOpt

NatsConn sets an already connected NATS connection as the communications channel

func NatsContext

func NatsContext(c string, opts ...nats.Option) ClientOpt

NatsContext attempts to connect to the NATS client context c

func NoStorageInit added in v0.0.2

func NoStorageInit() ClientOpt

NoStorageInit skips setting up any queues or task stores when creating a client

func PrometheusListenPort

func PrometheusListenPort(port int) ClientOpt

PrometheusListenPort enables Prometheus statistics listening on a specific port

func RetryBackoffPolicy

func RetryBackoffPolicy(p RetryPolicyProvider) ClientOpt

RetryBackoffPolicy uses p to schedule job retries, defaults to a linear curve backoff with jitter between 1 and 10 minutes

func RetryBackoffPolicyName added in v0.0.5

func RetryBackoffPolicyName(name string) ClientOpt

RetryBackoffPolicyName uses the policy named to schedule job retries by using RetryPolicyLookup(name)

func StoreReplicas

func StoreReplicas(r uint) ClientOpt

StoreReplicas sets the replica level to keep for the tasks store and work queue

Used only when initially creating the underlying streams.

func TaskRetention

func TaskRetention(r time.Duration) ClientOpt

TaskRetention is the time tasks will be kept for in the task storage

Used only when initially creating the underlying streams.

func WorkQueue

func WorkQueue(queue *Queue) ClientOpt

WorkQueue configures the client to consume messages from a specific queue

When not set the "DEFAULT" queue will be used.

type ClientOpts

type ClientOpts struct {
	// contains filtered or unexported fields
}

ClientOpts configures the client

type HandlerFunc

type HandlerFunc func(ctx context.Context, log Logger, t *Task) (any, error)

HandlerFunc handles a single task, the response bytes will be stored in the original task

type ItemKind

type ItemKind int

ItemKind indicates the kind of job a work queue entry represents

var (
	// TaskItem is a task as defined by Task
	TaskItem ItemKind = 0
)

type LeaderElectedEvent added in v0.0.5

type LeaderElectedEvent struct {
	BaseEvent

	// Name of the process that gained leadership
	Name string `json:"name"`
	// Component is the component that is reporting
	Component string `json:"component"`
}

LeaderElectedEvent notifies that a leader election was won

func NewLeaderElectedEvent added in v0.0.5

func NewLeaderElectedEvent(name string, component string) (*LeaderElectedEvent, error)

NewLeaderElectedEvent creates a new event notifying of a leader election win

type Logger

type Logger interface {
	Debugf(format string, v ...any)
	Infof(format string, v ...any)
	Warnf(format string, v ...any)
	Errorf(format string, v ...any)
}

Logger is a pluggable logger interface

type Mux

type Mux struct {
	// contains filtered or unexported fields
}

Mux routes messages

Note: this will change to be nearer to a server mux and include support for middleware

func NewTaskRouter

func NewTaskRouter() *Mux

NewTaskRouter creates a new Mux

func (*Mux) ExternalProcess added in v0.0.7

func (m *Mux) ExternalProcess(taskType string, command string) error

ExternalProcess sets up a delegated handler that calls an external command to handle the task.

The task will be passed in JSON format on STDIN; any STDOUT/STDERR output will become the task result. Any non-zero exit code will be treated as a task failure.
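
A sketch of delegating a task type to an external command; the task type and command path are hypothetical:

// "image:resize" tasks are handled by an external command that receives the
// Task as JSON on STDIN and whose STDOUT becomes the task result
router := asyncjobs.NewTaskRouter()
err := router.ExternalProcess("image:resize", "/usr/local/bin/resize-handler")
panicIfErr(err)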

func (*Mux) HandleFunc

func (m *Mux) HandleFunc(taskType string, h HandlerFunc) error

HandleFunc registers a handler for a taskType. The taskType must exactly match the Type of the tasks it should handle

func (*Mux) Handler

func (m *Mux) Handler(t *Task) HandlerFunc

Handler looks up the handler function for a task

func (*Mux) RequestReply added in v0.0.4

func (m *Mux) RequestReply(taskType string, client *Client) error

RequestReply sets up a delegated handler via NATS Request-Reply
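
A sketch of delegating a hypothetical task type over NATS Request-Reply; client is assumed to be an already constructed *Client. Remote handlers, written in any language, would subscribe on the subject returned by RequestReplySubjectForTaskType() and honour the AJ-* headers listed in the constants above:

// delegate "email:send" tasks to remote request-reply handlers
router := asyncjobs.NewTaskRouter()
err := router.RequestReply("email:send", client)
panicIfErr(err)

// remote handlers subscribe on the matching handler subject
log.Printf("remote handlers should listen on %s", asyncjobs.RequestReplySubjectForTaskType("email:send"))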

type ProcessItem

type ProcessItem struct {
	Kind  ItemKind `json:"kind"`
	JobID string   `json:"job"`
	// contains filtered or unexported fields
}

ProcessItem is an individual item stored in the work queue

type Queue

type Queue struct {
	// Name is a unique name for the work queue, should be in the character range a-zA-Z0-9
	Name string `json:"name"`
	// MaxAge is the absolute longest time an entry can stay in the queue. When not set items will not expire
	MaxAge time.Duration `json:"max_age"`
	// MaxEntries represents the maximum number of entries that can be in the queue. When it's full new entries will be rejected. When unset no limit is applied.
	MaxEntries int `json:"max_entries"`
	// DiscardOld indicates that when MaxEntries are reached old entries will be discarded rather than new ones rejected
	DiscardOld bool `json:"discard_old"`
	// MaxTries is the maximum number of times an entry can be tried, entries will be tried every MaxRunTime with some jitter applied. Defaults to DefaultMaxTries
	MaxTries int `json:"max_tries"`
	// MaxRunTime is the maximum time a task can be processed. Defaults to DefaultJobRunTime
	MaxRunTime time.Duration `json:"max_runtime"`
	// MaxConcurrent is the total number of in-flight tasks across all active task handlers combined. Defaults to DefaultQueueMaxConcurrent
	MaxConcurrent int `json:"max_concurrent"`
	// NoCreate will not try to create a queue, will bind to an existing one or fail
	NoCreate bool
	// contains filtered or unexported fields
}

Queue represents a work queue

type QueueInfo

type QueueInfo struct {
	// Name is the name of the queue
	Name string `json:"name"`
	// Time is when the information was gathered
	Time time.Time `json:"time"`
	// Stream is the active JetStream Stream Information
	Stream *api.StreamInfo `json:"stream_info"`
	// Consumer is the worker stream information
	Consumer *api.ConsumerInfo `json:"consumer_info"`
}

QueueInfo holds information about a queue state

type RetryPolicy

type RetryPolicy struct {
	// Intervals is a range of time periods backoff will be based off
	Intervals []time.Duration
	// Jitter is a factor applied to the specific interval to avoid repeating the same backoff periods
	Jitter float64
}

RetryPolicy defines a period that failed jobs will be retried against

func (RetryPolicy) Duration

func (p RetryPolicy) Duration(n int) time.Duration

Duration is the period to sleep for try n, it includes a jitter

type RetryPolicyProvider added in v0.0.2

type RetryPolicyProvider interface {
	Duration(n int) time.Duration
}

RetryPolicyProvider is the interface that RetryPolicy implements, use this to implement your own exponential backoff system or similar for task retries.
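
A minimal sketch of a custom provider, here a hypothetical fixed 30 second delay regardless of the attempt number, which could then be passed to RetryBackoffPolicy():

// fixedBackoff is a hypothetical RetryPolicyProvider with a constant delay
type fixedBackoff struct{}

func (fixedBackoff) Duration(_ int) time.Duration { return 30 * time.Second }

// used when constructing a client:
//
//	client, err := asyncjobs.NewClient(asyncjobs.NatsContext("AJ"), asyncjobs.RetryBackoffPolicy(fixedBackoff{}))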

func RetryPolicyLookup added in v0.0.5

func RetryPolicyLookup(name string) (RetryPolicyProvider, error)

RetryPolicyLookup loads a policy by name

type ScheduleWatchEntry added in v0.0.5

type ScheduleWatchEntry struct {
	Name   string
	Task   *ScheduledTask
	Delete bool
}

type ScheduledTask added in v0.0.5

type ScheduledTask struct {
	// Name is a unique name for the scheduled task
	Name string `json:"name"`
	// Schedule is a cron specification for the schedule
	Schedule string `json:"schedule"`
	// Queue is the name of a queue to enqueue the task into
	Queue string `json:"queue"`
	// TaskType is the type of task to create
	TaskType string `json:"task_type"`
	// Payload is the task payload for the enqueued tasks
	Payload []byte `json:"payload"`
	// Deadline is the duration after scheduling at which the created task's deadline will be set
	Deadline time.Duration `json:"deadline,omitempty"`
	// MaxTries is how many times the created task could be tried
	MaxTries int `json:"max_tries"`
	// CreatedAt is when the schedule was created
	CreatedAt time.Time `json:"created_at"`
}

ScheduledTask represents a cron-like schedule and task properties that result in new tasks being created regularly on that schedule

type ScheduledTaskStorage added in v0.0.5

type ScheduledTaskStorage interface {
	SaveScheduledTask(st *ScheduledTask, update bool) error
	LoadScheduledTaskByName(name string) (*ScheduledTask, error)
	DeleteScheduledTaskByName(name string) error
	ScheduledTasks(ctx context.Context) ([]*ScheduledTask, error)
	ScheduledTasksWatch(ctx context.Context) (chan *ScheduleWatchEntry, error)
	EnqueueTask(ctx context.Context, queue *Queue, task *Task) error
	ElectionStorage() (nats.KeyValue, error)
	PublishLeaderElectedEvent(ctx context.Context, name string, component string) error
}

type Storage

type Storage interface {
	SaveTaskState(ctx context.Context, task *Task, notify bool) error
	EnqueueTask(ctx context.Context, queue *Queue, task *Task) error
	RetryTaskByID(ctx context.Context, queue *Queue, id string) error
	LoadTaskByID(id string) (*Task, error)
	DeleteTaskByID(id string) error
	PublishTaskStateChangeEvent(ctx context.Context, task *Task) error
	AckItem(ctx context.Context, item *ProcessItem) error
	NakBlockedItem(ctx context.Context, item *ProcessItem) error
	NakItem(ctx context.Context, item *ProcessItem) error
	TerminateItem(ctx context.Context, item *ProcessItem) error
	PollQueue(ctx context.Context, q *Queue) (*ProcessItem, error)
	PrepareQueue(q *Queue, replicas int, memory bool) error
	PrepareTasks(memory bool, replicas int, retention time.Duration) error
	PrepareConfigurationStore(memory bool, replicas int) error
	SaveScheduledTask(st *ScheduledTask, update bool) error
	LoadScheduledTaskByName(name string) (*ScheduledTask, error)
	DeleteScheduledTaskByName(name string) error
	ScheduledTasks(ctx context.Context) ([]*ScheduledTask, error)
	ScheduledTasksWatch(ctx context.Context) (chan *ScheduleWatchEntry, error)
}

Storage implements the backend access

type StorageAdmin

type StorageAdmin interface {
	Queues() ([]*QueueInfo, error)
	QueueNames() ([]string, error)
	QueueInfo(name string) (*QueueInfo, error)
	PurgeQueue(name string) error
	DeleteQueue(name string) error
	PrepareQueue(q *Queue, replicas int, memory bool) error
	ConfigurationInfo() (*nats.KeyValueBucketStatus, error)
	PrepareConfigurationStore(memory bool, replicas int) error
	PrepareTasks(memory bool, replicas int, retention time.Duration) error
	DeleteTaskByID(id string) error
	TasksInfo() (*TasksInfo, error)
	Tasks(ctx context.Context, limit int32) (chan *Task, error)
	TasksStore() (*jsm.Manager, *jsm.Stream, error)
	ElectionStorage() (nats.KeyValue, error)
}

StorageAdmin provides helpers mainly to support the CLI. It leaks a number of JetStream details, but that is acceptable since there is no intention of changing the storage backend or supporting additional ones

type Task

type Task struct {
	// ID is a k-sortable unique ID for the task
	ID string `json:"id"`
	// Type is a free form string that can later be used as a routing key to send tasks to handlers
	Type string `json:"type"`
	// Queue is the name of the queue the task was enqueued with, set only during the enqueue operation else empty
	Queue string `json:"queue"`
	// Dependencies are IDs of tasks that should complete before this one becomes unblocked
	Dependencies []string `json:"dependencies,omitempty"`
	// DependencyResults holds the results of the tasks this task depends on
	DependencyResults map[string]*TaskResult `json:"dependency_results,omitempty"`
	// LoadDependencies indicates if this task should load dependency results before executing
	LoadDependencies bool `json:"load_dependencies,omitempty"`
	// Payload is a JSON representation of the associated work
	Payload []byte `json:"payload"`
	// Deadline is a cut-off time for the job to complete, should a job be scheduled after this time it will fail.
	// In-Flight jobs are allowed to continue past this time. Only starting handlers are impacted by this deadline.
	Deadline *time.Time `json:"deadline,omitempty"`
	// MaxTries sets a per-task maximum try limit. If this task is in a queue that allows fewer tries the queue max tries
	// will override this setting. A task may not exceed the work queue max tries
	MaxTries int `json:"max_tries"`
	// Result is the outcome of the job, only set for successful jobs
	Result *TaskResult `json:"result,omitempty"`
	// State is the most recent recorded state the job is in
	State TaskState `json:"state"`
	// CreatedAt is the time the job was created in UTC timezone
	CreatedAt time.Time `json:"created"`
	// LastTriedAt is a time stamp for when the job was last handed to a handler
	LastTriedAt *time.Time `json:"tried,omitempty"`
	// Tries is how many times the job was handled
	Tries int `json:"tries"`
	// LastErr is the most recent handling error if any
	LastErr string `json:"last_err,omitempty"`
	// contains filtered or unexported fields
}

Task represents a job item that handlers will execute

func NewTask

func NewTask(taskType string, payload any, opts ...TaskOpt) (*Task, error)

NewTask creates a new task of taskType that can later be used to route tasks to handlers. The task will carry a JSON encoded representation of payload.

Example (With_deadline)
email := newEmail("user@example.net", "Test Subject", "Test Body")

// Creates a new task that has a deadline for processing 1 hour from now
task, err := NewTask("email:send", email, TaskDeadline(time.Now().Add(time.Hour)))
if err != nil {
	panic(fmt.Sprintf("Could not create task: %v", err))
}

fmt.Printf("Task ID: %s\n", task.ID)
Output:

func (*Task) HasDependencies added in v0.1.0

func (t *Task) HasDependencies() bool

HasDependencies determines if the task has any dependencies

func (*Task) IsPastDeadline added in v0.0.2

func (t *Task) IsPastDeadline() bool

IsPastDeadline determines if the task is past its deadline

type TaskOpt

type TaskOpt func(*Task) error

TaskOpt configures Tasks made using NewTask()

func TaskDeadline

func TaskDeadline(deadline time.Time) TaskOpt

TaskDeadline sets an absolute time after which the task should not be handled

func TaskDependsOn added in v0.1.0

func TaskDependsOn(tasks ...*Task) TaskOpt

TaskDependsOn adds Tasks that this task depends on; it can be called multiple times

func TaskDependsOnIDs added in v0.1.0

func TaskDependsOnIDs(ids ...string) TaskOpt

TaskDependsOnIDs adds task IDs that this task depends on; it can be called multiple times

func TaskMaxTries added in v0.0.5

func TaskMaxTries(tries int) TaskOpt

TaskMaxTries sets a maximum number of processing attempts a task will have; the queue maximum tries will override this

func TaskRequiresDependencyResults added in v0.1.0

func TaskRequiresDependencyResults() TaskOpt

TaskRequiresDependencyResults indicates that if a task has any dependencies their results should be loaded before execution
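
A sketch combining the dependency options above; the task types, payload variables and client are hypothetical:

// "report:render" stays blocked until "report:gather" completes, and loads
// its result before executing
gather, err := asyncjobs.NewTask("report:gather", gatherRequest)
panicIfErr(err)

render, err := asyncjobs.NewTask("report:render", renderRequest,
	asyncjobs.TaskDependsOn(gather),
	asyncjobs.TaskRequiresDependencyResults())
panicIfErr(err)

panicIfErr(client.EnqueueTask(context.Background(), gather))
panicIfErr(client.EnqueueTask(context.Background(), render))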

type TaskResult

type TaskResult struct {
	Payload     any       `json:"payload"`
	CompletedAt time.Time `json:"completed"`
}

TaskResult is the result of task execution, this will only be set for successfully processed jobs

type TaskScheduler added in v0.0.5

type TaskScheduler struct {
	// contains filtered or unexported fields
}

func NewTaskScheduler added in v0.0.5

func NewTaskScheduler(name string, c *Client) (*TaskScheduler, error)

NewTaskScheduler creates a new Task Scheduler service

func (*TaskScheduler) Count added in v0.0.5

func (s *TaskScheduler) Count() int

Count reports how many schedules are managed by this Scheduler

func (*TaskScheduler) Run added in v0.0.5

func (s *TaskScheduler) Run(ctx context.Context, wg *sync.WaitGroup) error

Run starts the Task Scheduler, it will block until done
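
A sketch of running a scheduler instance; the scheduler name is hypothetical, client is assumed to be an existing *Client, and whether Run manages the WaitGroup counter itself is an assumption:

// create and run a scheduler that turns stored schedules into enqueued tasks
scheduler, err := asyncjobs.NewTaskScheduler("SCHEDULER", client)
panicIfErr(err)

wg := &sync.WaitGroup{}
wg.Add(1)

// Run blocks until the context is canceled or an error occurs
err = scheduler.Run(context.Background(), wg)
panicIfErr(err)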

func (*TaskScheduler) Stop added in v0.0.5

func (s *TaskScheduler) Stop()

Stop stops the scheduler service

type TaskState

type TaskState string

TaskState indicates the current state a task is in

const (
	// TaskStateUnknown is for tasks that do not have a state set
	TaskStateUnknown TaskState = ""
	// TaskStateNew newly created tasks that have not been handled yet
	TaskStateNew TaskState = "new"
	// TaskStateActive tasks that are currently being handled
	TaskStateActive TaskState = "active"
	// TaskStateRetry tasks that previously failed and are waiting retry
	TaskStateRetry TaskState = "retry"
	// TaskStateExpired tasks that reached their deadline or maximum tries
	TaskStateExpired TaskState = "expired"
	// TaskStateTerminated indicates that the task was terminated via the ErrTerminateTask error
	TaskStateTerminated TaskState = "terminated"
	// TaskStateCompleted tasks that are completed
	TaskStateCompleted TaskState = "complete"
	// TaskStateQueueError tasks that could not have their associated Work Queue item created
	TaskStateQueueError TaskState = "queue_error"
	// TaskStateBlocked tasks that are waiting on dependencies
	TaskStateBlocked TaskState = "blocked"
	// TaskStateUnreachable tasks that could not be run due to dependency problems
	TaskStateUnreachable TaskState = "unreachable"
)

type TaskStateChangeEvent added in v0.0.2

type TaskStateChangeEvent struct {
	BaseEvent

	// TaskID is the ID of the task, use with LoadTaskByID() to access the task
	TaskID string `json:"task_id"`
	// State is the new state of the Task
	State TaskState `json:"state"`
	// Tries is how many times the Task has been processed
	Tries int `json:"tries"`
	// Queue is the queue the task is in, can be empty
	Queue string `json:"queue,omitempty"`
	// TaskType is the task routing type
	TaskType string `json:"task_type"`
	// LastErr is the error that caused a task to change state, for error state changes
	LastErr string `json:"last_error,omitempty"`
	// Age is the time since the task was created in milliseconds
	Age time.Duration `json:"task_age,omitempty"`
}

TaskStateChangeEvent notifies that a significant change occurred in a Task

func NewTaskStateChangeEvent added in v0.0.2

func NewTaskStateChangeEvent(t *Task) (*TaskStateChangeEvent, error)

NewTaskStateChangeEvent creates a new event notifying of a change in task state

type TasksInfo

type TasksInfo struct {
	// Time is when the information was gathered
	Time time.Time `json:"time"`
	// Stream is the active JetStream Stream Information
	Stream *api.StreamInfo `json:"stream_info"`
}

TasksInfo is state about the tasks store
