Documentation
¶
Index ¶
- Constants
- Variables
- func IsRetryPolicyKnown(name string) bool
- func IsValidName(name string) bool
- func ParseEventJSON(event []byte) (any, string, error)
- func RequestReplySubjectForTaskType(taskType string) string
- func RetryPolicyNames() []string
- func RetrySleep(ctx context.Context, p RetryPolicyProvider, n int) error
- type BaseEvent
- type Client
- func (c *Client) EnqueueTask(ctx context.Context, task *Task) error
- func (c *Client) LoadScheduledTaskByName(name string) (*ScheduledTask, error)
- func (c *Client) LoadTaskByID(id string) (*Task, error)
- func (c *Client) NewScheduledTask(name string, schedule string, queue string, task *Task) error
- func (c *Client) RemoveScheduledTask(name string) error
- func (c *Client) RetryTaskByID(ctx context.Context, id string) error
- func (c *Client) Run(ctx context.Context, router *Mux) error
- func (c *Client) ScheduledTasksStorage() ScheduledTaskStorage
- func (c *Client) StorageAdmin() StorageAdmin
- type ClientOpt
- func BindWorkQueue(queue string) ClientOpt
- func ClientConcurrency(c int) ClientOpt
- func CustomLogger(log Logger) ClientOpt
- func DiscardTaskStates(states ...TaskState) ClientOpt
- func DiscardTaskStatesByName(states ...string) ClientOpt
- func MemoryStorage() ClientOpt
- func NatsConn(nc *nats.Conn) ClientOpt
- func NatsContext(c string, opts ...nats.Option) ClientOpt
- func NoStorageInit() ClientOpt
- func PrometheusListenPort(port int) ClientOpt
- func RetryBackoffPolicy(p RetryPolicyProvider) ClientOpt
- func RetryBackoffPolicyName(name string) ClientOpt
- func StoreReplicas(r uint) ClientOpt
- func TaskRetention(r time.Duration) ClientOpt
- func TaskSignaturesOptional() ClientOpt
- func TaskSigningKey(pk ed25519.PrivateKey) ClientOpt
- func TaskSigningSeedFile(sf string) ClientOpt
- func TaskVerificationKey(pk ed25519.PublicKey) ClientOpt
- func TaskVerificationKeyFile(sf string) ClientOpt
- func TaskVerificationKeyHexEncoded(pks string) ClientOpt
- func WorkQueue(queue *Queue) ClientOpt
- type ClientOpts
- type HandlerFunc
- type ItemKind
- type LeaderElectedEvent
- type Logger
- type Middleware
- type Mux
- func (m *Mux) ExternalProcess(taskType string, command string) error
- func (m *Mux) HandleFunc(taskType string, h HandlerFunc, mws ...Middleware) error
- func (m *Mux) Handler(t *Task) HandlerFunc
- func (m *Mux) RequestReply(taskType string, client *Client) error
- func (m *Mux) Use(mws ...Middleware) error
- type ProcessItem
- type Queue
- type QueueInfo
- type RetryPolicy
- type RetryPolicyProvider
- type ScheduleWatchEntry
- type ScheduledTask
- type ScheduledTaskStorage
- type Storage
- type StorageAdmin
- type Task
- type TaskOpt
- type TaskPayloadEncoderFunc
- type TaskResult
- type TaskScheduler
- type TaskState
- type TaskStateChangeEvent
- type TasksInfo
Examples ¶
Constants ¶
const ( // ShortedScheduledDeadline is the shortest deadline a scheduled task may have ShortedScheduledDeadline = 30 * time.Second // DefaultJobRunTime when not configured for a queue this is the default run-time handlers will get DefaultJobRunTime = time.Hour // DefaultMaxTries when not configured for a task this is the default tries it will get DefaultMaxTries = 10 // DefaultQueueMaxConcurrent when not configured for a queue this is the default concurrency setting DefaultQueueMaxConcurrent = 100 )
const ( // TaskStateChangeEventType is the event type for TaskStateChangeEvent events TaskStateChangeEventType = "io.choria.asyncjobs.v1.task_state" // LeaderElectedEventType is the event type for LeaderElectedEvent events LeaderElectedEventType = "io.choria.asyncjobs.v1.leader_elected" )
const ( // RequestReplyContentTypeHeader is the header text sent to indicate the body encoding and type RequestReplyContentTypeHeader = "AJ-Content-Type" // RequestReplyDeadlineHeader is the header indicating the deadline for processing the item RequestReplyDeadlineHeader = "AJ-Handler-Deadline" // RequestReplyTerminateError is the header to send in a reply that the task should be terminated via ErrTerminateTask RequestReplyTerminateError = "AJ-Terminate" // RequestReplyError is the header indicating a generic failure in handling an item RequestReplyError = "AJ-Error" // RequestReplyTaskType is the content type indicating the payload is a Task in JSON format RequestReplyTaskType = "application/x-asyncjobs-task+json" )
const ( // TasksStreamName is the name of the JetStream Stream storing tasks TasksStreamName = "CHORIA_AJ_TASKS" // TasksStreamSubjects is a NATS wildcard matching all tasks TasksStreamSubjects = "CHORIA_AJ.T.*" // TasksStreamSubjectPattern is the printf pattern that can be used to find an individual task by its task ID TasksStreamSubjectPattern = "CHORIA_AJ.T.%s" // EventsSubjectWildcard is the NATS wildcard for receiving all events EventsSubjectWildcard = "CHORIA_AJ.E.>" // TaskStateChangeEventSubjectPattern is a printf pattern for determining the event publish subject TaskStateChangeEventSubjectPattern = "CHORIA_AJ.E.task_state.%s" // TaskStateChangeEventSubjectWildcard is a NATS wildcard for receiving all TaskStateChangeEvent messages TaskStateChangeEventSubjectWildcard = "CHORIA_AJ.E.task_state.*" // LeaderElectedEventSubjectPattern is the pattern for determining the event publish subject LeaderElectedEventSubjectPattern = "CHORIA_AJ.E.leader_election.%s" // LeaderElectedEventSubjectWildcard is the NATS wildcard for receiving all LeaderElectedEvent messages LeaderElectedEventSubjectWildcard = "CHORIA_AJ.E.leader_election.>" // WorkStreamNamePattern is the printf pattern for determining JetStream Stream names per queue WorkStreamNamePattern = "CHORIA_AJ_Q_%s" // WorkStreamSubjectPattern is the printf pattern individual items are placed in, placeholders for JobID and JobType WorkStreamSubjectPattern = "CHORIA_AJ.Q.%s.%s" // WorkStreamSubjectWildcard is a NATS filter matching all enqueued items for any task store WorkStreamSubjectWildcard = "CHORIA_AJ.Q.>" // WorkStreamNamePrefix is the prefix that, when removed, reveals the queue name WorkStreamNamePrefix = "CHORIA_AJ_Q_" // RequestReplyTaskHandlerPattern is the subject request reply task handlers should listen on by default RequestReplyTaskHandlerPattern = "CHORIA_AJ.H.T.%s" // ConfigBucketName is the KV bucket for configuration like scheduled tasks ConfigBucketName = "CHORIA_AJ_CONFIGURATION" // 
LeaderElectionBucketName is the KV bucket that will manage leader elections LeaderElectionBucketName = "CHORIA_AJ_ELECTIONS" )
Variables ¶
var ( // ErrTaskNotFound is the error indicating a task does not exist rather than a failure to load ErrTaskNotFound = errors.New("task not found") // ErrTerminateTask indicates that a task failed, and no further processing attempts should be made ErrTerminateTask = fmt.Errorf("terminate task") // ErrNoTasks indicates the task store is empty ErrNoTasks = fmt.Errorf("no tasks found") // ErrTaskPastDeadline indicates a task that was scheduled for handling is past its deadline ErrTaskPastDeadline = fmt.Errorf("past deadline") // ErrTaskExceedsMaxTries indicates a task exceeded its maximum attempts ErrTaskExceedsMaxTries = fmt.Errorf("exceeded maximum tries") // ErrTaskAlreadyActive indicates that a task is already in the active state ErrTaskAlreadyActive = fmt.Errorf("task already active") // ErrTaskTypeCannotEnqueue indicates that a task is in a state where it cannot be enqueued as new ErrTaskTypeCannotEnqueue = fmt.Errorf("cannot enqueue a task in state") // ErrTaskUpdateFailed indicates a task update failed ErrTaskUpdateFailed = fmt.Errorf("failed updating task state") // ErrTaskAlreadyInState indicates an update failed because a task was already in the desired state ErrTaskAlreadyInState = fmt.Errorf("%w, already in desired state", ErrTaskUpdateFailed) // ErrTaskLoadFailed indicates a task failed for an unknown reason ErrTaskLoadFailed = fmt.Errorf("loading task failed") // ErrTaskTypeRequired indicates an empty task type was given ErrTaskTypeRequired = fmt.Errorf("task type is required") // ErrTaskTypeInvalid indicates an invalid task type was given ErrTaskTypeInvalid = fmt.Errorf("task type is invalid") // ErrTaskIDInvalid indicates an invalid ID type was given ErrTaskIDInvalid = fmt.Errorf("task ID is invalid") // ErrTaskDependenciesFailed indicates that the task cannot be run as its dependencies failed ErrTaskDependenciesFailed = fmt.Errorf("task dependencies failed") // ErrTaskAlreadySigned indicates a task is already signed ErrTaskAlreadySigned = 
fmt.Errorf("task is already signed") // ErrTaskPayloadEncoderAlreadySet indicates that the task already has an encoder for the payload ErrTaskPayloadEncoderAlreadySet = fmt.Errorf("task payload encoder is already set") // ErrTaskSignatureRequiresQueue indicates a signature request was made without configuring the queue name for a task ErrTaskSignatureRequiresQueue = fmt.Errorf("signing a task requires the queue to be set") // ErrTaskNotSigned indicates a task was loaded that had no signature while signatures are required ErrTaskNotSigned = fmt.Errorf("task is not signed") // ErrTaskSignatureInvalid indicates a signature did not pass validation ErrTaskSignatureInvalid = fmt.Errorf("invalid task signature") // ErrNoHandlerForTaskType indicates that a task could not be handled by any known handlers ErrNoHandlerForTaskType = fmt.Errorf("no handler for task type") // ErrDuplicateHandlerForTaskType indicates a task handler for a specific type is already registered ErrDuplicateHandlerForTaskType = fmt.Errorf("duplicate handler for task type") // ErrInvalidMiddleware indicates a nil Middleware was passed to Use or HandleFunc ErrInvalidMiddleware = fmt.Errorf("invalid middleware") // ErrInvalidHeaders indicates that message headers from JetStream were not valid ErrInvalidHeaders = fmt.Errorf("coult not decode headers") // ErrContextWithoutDeadline indicates a context.Context was passed without deadline when it was expected ErrContextWithoutDeadline = fmt.Errorf("non deadline context given") // ErrInvalidStorageItem indicates a Work Queue item had no JetStream state associated with it ErrInvalidStorageItem = fmt.Errorf("invalid storage item") // ErrNoNatsConn indicates that a nil connection was supplied ErrNoNatsConn = fmt.Errorf("no NATS connection supplied") // ErrNoMux indicates that a processor was started with no routing mux configured ErrNoMux = fmt.Errorf("mux is required") // ErrStorageNotReady indicates the underlying storage is not ready ErrStorageNotReady = 
fmt.Errorf("storage not ready") // ErrQueueNotFound is the error indicating a queue does not exist rather than a failure to load ErrQueueNotFound = errors.New("queue not found") // ErrQueueConsumerNotFound indicates that the Work Queue store has no consumers defined ErrQueueConsumerNotFound = errors.New("queue consumer not found") // ErrQueueNameRequired indicates a queue has no name ErrQueueNameRequired = fmt.Errorf("queue name is required") // ErrQueueItemCorrupt indicates that an item received from the work queue was invalid - perhaps invalid JSON ErrQueueItemCorrupt = fmt.Errorf("corrupt queue item received") // ErrQueueItemInvalid is an item read from the queue with no data or obviously bad data ErrQueueItemInvalid = fmt.Errorf("invalid queue item received") // ErrInvalidQueueState indicates a queue was attempted to be used but no internal state is known of that queue ErrInvalidQueueState = fmt.Errorf("invalid queue storage state") // ErrDuplicateItem indicates that the Work Queue deduplication protection refused a message ErrDuplicateItem = fmt.Errorf("duplicate work queue item") // ErrExternalCommandNotFound indicates a command for an ExternalProcess handler was not found ErrExternalCommandNotFound = fmt.Errorf("command not found") // ErrExternalCommandFailed indicates a command for an ExternalProcess handler failed ErrExternalCommandFailed = fmt.Errorf("execution failed") // ErrUnknownEventType indicates that while parsing an event an unknown type of event was encountered ErrUnknownEventType = fmt.Errorf("unknown event type") // ErrUnknownRetryPolicy indicates the requested retry policy does not exist ErrUnknownRetryPolicy = fmt.Errorf("unknown retry policy") // ErrUnknownDiscardPolicy indicates a discard policy could not be found matching a name ErrUnknownDiscardPolicy = fmt.Errorf("unknown discard policy") // ErrInvalidPrivateKey indicates the private key is not valid ErrInvalidPrivateKey = fmt.Errorf("invalid private key length") // ErrRequestReplyFailed 
indicates a callout to a remote handler failed due to a timeout, lack of listeners or network error ErrRequestReplyFailed = fmt.Errorf("request-reply callout failed") // ErrRequestReplyNoDeadline indicates a request-reply handler was called without a deadline ErrRequestReplyNoDeadline = fmt.Errorf("request-reply requires deadline context") // ErrRequestReplyShortDeadline indicates a deadline context has a too short timeout ErrRequestReplyShortDeadline = fmt.Errorf("deadline too short") // ErrScheduleNameIsRequired indicates a schedule name is needed when creating new schedules ErrScheduleNameIsRequired = errors.New("name is required") // ErrScheduleNameInvalid indicates the name given to a task is invalid ErrScheduleNameInvalid = errors.New("name is invalid") // ErrScheduleIsRequired indicates a cron schedule must be supplied when creating new schedules ErrScheduleIsRequired = errors.New("schedule is required") // ErrScheduleInvalid indicates an invalid cron schedule was supplied ErrScheduleInvalid = errors.New("invalid cron schedule") // ErrScheduledTaskAlreadyExist indicates a scheduled task that was being created already existed ErrScheduledTaskAlreadyExist = errors.New("scheduled task already exist") // ErrScheduledTaskNotFound indicates the requested task does not exist ErrScheduledTaskNotFound = errors.New("scheduled task not found") // ErrScheduledTaskInvalid indicates a loaded task was invalid ErrScheduledTaskInvalid = errors.New("invalid scheduled task") // ErrScheduledTaskShortDeadline indicates the time allowed for task execution is too short ErrScheduledTaskShortDeadline = errors.New("deadline too short") )
var ( // RetryLinearTenMinutes is a 50-step policy between 1 and 10 minutes RetryLinearTenMinutes = linearPolicy(50, 0.90, time.Minute, 10*time.Minute) // RetryLinearOneHour is a 20-step policy between 10 minutes and 1 hour RetryLinearOneHour = linearPolicy(20, 0.90, 10*time.Minute, 60*time.Minute) // RetryLinearOneMinute is a 20-step policy between 1 second and 1 minute RetryLinearOneMinute = linearPolicy(20, 0.5, time.Second, time.Minute) // RetryDefault is the default retry policy RetryDefault = RetryLinearTenMinutes )
Functions ¶
func IsRetryPolicyKnown ¶ added in v0.0.5
IsRetryPolicyKnown determines if the named policy exists
func IsValidName ¶ added in v0.0.5
IsValidName is a generic, strict validator for user-supplied names - task names and other values that turn into subjects
func ParseEventJSON ¶ added in v0.0.2
ParseEventJSON parses event bytes returning the parsed Event and its event type
func RequestReplySubjectForTaskType ¶ added in v0.1.0
RequestReplySubjectForTaskType returns the subject a request-reply handler should listen on for a specified task type
func RetryPolicyNames ¶ added in v0.0.5
func RetryPolicyNames() []string
RetryPolicyNames returns a list of the names of the pre-generated retry policies
func RetrySleep ¶ added in v0.0.2
func RetrySleep(ctx context.Context, p RetryPolicyProvider, n int) error
RetrySleep sleeps for the duration for try n or until interrupted by ctx
Types ¶
type BaseEvent ¶ added in v0.0.2
type BaseEvent struct {
EventID string `json:"event_id"`
EventType string `json:"type"`
TimeStamp time.Time `json:"timestamp"`
}
BaseEvent is present in all event types and can be used to detect the type
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client connects Task producers and Task handlers to the backend
Example (Consumer) ¶
queue := &Queue{
Name: "P100",
MaxRunTime: 60 * time.Minute,
MaxConcurrent: 20,
MaxTries: 100,
}
// Uses the NATS CLI context WQ for connection details, will create the queue if
// it does not already exist
client, err := NewClient(NatsContext("WQ"), WorkQueue(queue), RetryBackoffPolicy(RetryLinearOneHour))
panicIfErr(err)
router := NewTaskRouter()
err = router.HandleFunc("email:send", func(_ context.Context, _ Logger, t *Task) (any, error) {
log.Printf("Processing task: %s", t.ID)
// handle task.Payload which is a JSON encoded email
// task record will be updated with this payload result
return "success", nil
})
panicIfErr(err)
// Starts handling registered tasks, blocks until canceled
err = client.Run(context.Background(), router)
panicIfErr(err)
Example (Producer) ¶
queue := &Queue{
Name: "P100",
MaxRunTime: 60 * time.Minute,
MaxConcurrent: 20,
MaxTries: 100,
}
email := newEmail("user@example.net", "Test Subject", "Test Body")
// Creates a new task that has a deadline for processing 1 hour from now
task, err := NewTask("email:send", email, TaskDeadline(time.Now().Add(time.Hour)))
panicIfErr(err)
// Uses the NATS CLI context WQ for connection details, will create the queue if
// it does not already exist
client, err := NewClient(NatsContext("WQ"), WorkQueue(queue))
panicIfErr(err)
// Adds the task to the queue called P100
err = client.EnqueueTask(context.Background(), task)
panicIfErr(err)
func NewClient ¶
NewClient creates a new client, one of NatsConn() or NatsContext() must be passed, other options are optional.
When no WorkQueue() is supplied a default queue called DEFAULT will be used
func (*Client) EnqueueTask ¶
EnqueueTask adds a task to the named queue which must already exist
func (*Client) LoadScheduledTaskByName ¶ added in v0.0.5
func (c *Client) LoadScheduledTaskByName(name string) (*ScheduledTask, error)
LoadScheduledTaskByName loads a scheduled task by name
func (*Client) LoadTaskByID ¶
LoadTaskByID loads a task from the backend using its ID
Example ¶
client, err := NewClient(NatsContext("WQ"))
panicIfErr(err)
task, err := client.LoadTaskByID("24ErgVol4ZjpoQ8FAima9R2jEHB")
panicIfErr(err)
fmt.Printf("Loaded task %s in state %s", task.ID, task.State)
func (*Client) NewScheduledTask ¶ added in v0.0.5
NewScheduledTask creates a new scheduled task, an existing schedule will result in failure
func (*Client) RemoveScheduledTask ¶ added in v0.0.5
RemoveScheduledTask removes a scheduled task
func (*Client) RetryTaskByID ¶ added in v0.0.2
RetryTaskByID will retry a task, first removing an entry from the Work Queue if already there
func (*Client) ScheduledTasksStorage ¶ added in v0.0.5
func (c *Client) ScheduledTasksStorage() ScheduledTaskStorage
ScheduledTasksStorage gives access to administrative functions for task maintenance
func (*Client) StorageAdmin ¶
func (c *Client) StorageAdmin() StorageAdmin
StorageAdmin gives access to the administrative features of the storage backend
type ClientOpt ¶
type ClientOpt func(opts *ClientOpts) error
ClientOpt configures the client
func BindWorkQueue ¶
BindWorkQueue binds the client to a work queue that should already exist
func ClientConcurrency ¶
ClientConcurrency sets the concurrency to use when executing tasks within this client for horizontal scaling. This is capped by the per-queue maximum concurrency set using the queue setting MaxConcurrent. Generally a queue would have a larger concurrency like 100 (DefaultQueueMaxConcurrent) and an individual task processor would be below that. This allows for horizontal and vertical scaling but without unbounded growth - the queue MaxConcurrent is the absolute upper limit for in-flight jobs for 1 specific queue.
func CustomLogger ¶
CustomLogger sets a custom logger to use for all logging
func DiscardTaskStates ¶ added in v0.0.2
DiscardTaskStates configures the client to discard Tasks that reach a final state in the list of supplied TaskState
func DiscardTaskStatesByName ¶ added in v0.1.0
DiscardTaskStatesByName configures the client to discard Tasks that reach a final state in the list of supplied TaskState names
func MemoryStorage ¶
func MemoryStorage() ClientOpt
MemoryStorage enables storing tasks and work queue in memory in JetStream
func NatsContext ¶
NatsContext attempts to connect to the NATS client context c
func NoStorageInit ¶ added in v0.0.2
func NoStorageInit() ClientOpt
NoStorageInit skips setting up any queues or task stores when creating a client
func PrometheusListenPort ¶
PrometheusListenPort enables prometheus listening on a specific port
func RetryBackoffPolicy ¶
func RetryBackoffPolicy(p RetryPolicyProvider) ClientOpt
RetryBackoffPolicy uses p to schedule job retries, defaults to a linear curve backoff with jitter between 1 and 10 minutes
func RetryBackoffPolicyName ¶ added in v0.0.5
RetryBackoffPolicyName uses the named policy, resolved using RetryPolicyLookup(name), to schedule job retries
func StoreReplicas ¶
StoreReplicas sets the replica level to keep for the tasks store and work queue
Used only when initially creating the underlying streams.
func TaskRetention ¶
TaskRetention is the time tasks will be kept for in the task storage
Used only when initially creating the underlying streams.
func TaskSignaturesOptional ¶ added in v0.2.0
func TaskSignaturesOptional() ClientOpt
TaskSignaturesOptional indicates that task signatures are optional, so tasks that are not signed can also be loaded
func TaskSigningKey ¶ added in v0.2.0
func TaskSigningKey(pk ed25519.PrivateKey) ClientOpt
TaskSigningKey sets a key used to sign tasks, will be kept in memory for the duration
func TaskSigningSeedFile ¶ added in v0.2.0
TaskSigningSeedFile sets the path to a file holding an ed25519 seed, will be used for signing and verification and wiped between uses
func TaskVerificationKey ¶ added in v0.2.0
TaskVerificationKey sets a public key used to verify tasks
func TaskVerificationKeyFile ¶ added in v0.2.0
TaskVerificationKeyFile sets the path to a file holding an ed25519 public key, will be used for verification of tasks
func TaskVerificationKeyHexEncoded ¶ added in v0.2.0
TaskVerificationKeyHexEncoded sets a public key used to verify tasks, hex encoded string
type ClientOpts ¶
type ClientOpts struct {
// contains filtered or unexported fields
}
ClientOpts configures the client
type HandlerFunc ¶
HandlerFunc handles a single task, the response bytes will be stored in the original task
type ItemKind ¶
type ItemKind int
ItemKind indicates the kind of job a work queue entry represents
var ( // TaskItem is a task as defined by Task TaskItem ItemKind = 0 )
type LeaderElectedEvent ¶ added in v0.0.5
type LeaderElectedEvent struct {
BaseEvent
// Name of the process that gained leadership
Name string `json:"name"`
// Component is the component that is reporting
Component string `json:"component"`
}
LeaderElectedEvent notifies that a leader election was won
func NewLeaderElectedEvent ¶ added in v0.0.5
func NewLeaderElectedEvent(name string, component string) (*LeaderElectedEvent, error)
NewLeaderElectedEvent creates a new event notifying of a leader election win
type Logger ¶
type Logger interface {
Debugf(format string, v ...any)
Infof(format string, v ...any)
Warnf(format string, v ...any)
Errorf(format string, v ...any)
}
Logger is a pluggable logger interface
type Middleware ¶ added in v0.3.0
type Middleware func(HandlerFunc) HandlerFunc
Middleware wraps a HandlerFunc to add cross-cutting behavior like logging, metrics, tracing, authentication, or panic recovery.
A middleware should normally invoke next(ctx, log, t) and return its result and error unchanged unless deliberately transforming them. To short-circuit (for example on an authentication failure) return without calling next.
Middleware must use the (ctx, log, t) arguments passed to the returned closure. Capturing values from the surrounding scope at construction time will leak them across dispatches.
A typical pass-through middleware looks like:
func Logging(next HandlerFunc) HandlerFunc {
return func(ctx context.Context, log Logger, t *Task) (any, error) {
start := time.Now()
res, err := next(ctx, log, t)
log.Infof("task %s %s took %s err=%v", t.Type, t.ID, time.Since(start), err)
return res, err
}
}
func Chain ¶ added in v0.3.0
func Chain(mws ...Middleware) Middleware
Chain composes middleware into a single Middleware preserving order, so Chain(a, b, c) wraps a handler such that a runs outermost, then b, then c. It is useful for building reusable bundles like auth+logging+metrics that can be passed to Use or HandleFunc as one value.
type Mux ¶
type Mux struct {
// contains filtered or unexported fields
}
Mux routes tasks to handlers and supports global and per-route middleware.
func (*Mux) ExternalProcess ¶ added in v0.0.7
ExternalProcess sets up a delegated handler that calls an external command to handle the task.
The task will be passed in JSON format on STDIN, any STDOUT/STDERR output will become the task result. Any non 0 exit code will be treated as a task failure.
func (*Mux) HandleFunc ¶
func (m *Mux) HandleFunc(taskType string, h HandlerFunc, mws ...Middleware) error
HandleFunc registers h for an exact taskType match. Optional per-route middleware in mws is applied inside any middleware registered via Use, in the order given (first-registered runs outermost). Returns ErrDuplicateHandlerForTaskType if taskType is already registered, or ErrInvalidMiddleware if any of mws is nil.
func (*Mux) Handler ¶
func (m *Mux) Handler(t *Task) HandlerFunc
Handler looks up the handler function for a task. The returned function has any registered global and per-route middleware already applied. Tasks with no matching handler resolve to a built-in handler that returns ErrNoHandlerForTaskType; that handler is intentionally not wrapped by middleware so unrouted tasks do not generate logging or metric noise.
func (*Mux) RequestReply ¶ added in v0.0.4
RequestReply sets up a delegated handler via NATS Request-Reply
func (*Mux) Use ¶ added in v0.3.0
func (m *Mux) Use(mws ...Middleware) error
Use appends middleware that will be applied to every registered handler. Middleware registered earlier runs outermost: Use(Recovery, Logging) yields a chain like Recovery(Logging(Per-route(handler))) on dispatch. Middleware registered via Use always wraps any per-route middleware passed to HandleFunc.
Use may be called before or after HandleFunc; existing handlers are rewrapped so subsequent dispatches see the new chain. Dispatches already in flight keep the chain they previously resolved.
Returns ErrInvalidMiddleware if any of mws is nil. The built-in handler returned for unrouted task types is not wrapped.
type ProcessItem ¶
type ProcessItem struct {
Kind ItemKind `json:"kind"`
JobID string `json:"job"`
// contains filtered or unexported fields
}
ProcessItem is an individual item stored in the work queue
type Queue ¶
type Queue struct {
// Name is a unique name for the work queue, should be in the character range a-zA-Z0-9
Name string `json:"name"`
// MaxAge is the absolute longest time an entry can stay in the queue. When not set items will not expire
MaxAge time.Duration `json:"max_age"`
// MaxEntries represents the maximum amount of entries that can be in the queue. When it's full new entries will be rejected. When unset no limit is applied.
MaxEntries int `json:"max_entries"`
// DiscardOld indicates that when MaxEntries are reached old entries will be discarded rather than new ones rejected
DiscardOld bool `json:"discard_old"`
// MaxTries is the maximum amount of times an entry can be tried, entries will be tried every MaxRunTime with some jitter applied. Defaults to DefaultMaxTries
MaxTries int `json:"max_tries"`
// MaxRunTime is the maximum time a task can be processed. Defaults to DefaultJobRunTime
MaxRunTime time.Duration `json:"max_runtime"`
// MaxConcurrent is the total number of in-flight tasks across all active task handlers combined. Defaults to DefaultQueueMaxConcurrent
MaxConcurrent int `json:"max_concurrent"`
// NoCreate will not try to create a queue, will bind to an existing one or fail
NoCreate bool
// contains filtered or unexported fields
}
Queue represents a work queue
type QueueInfo ¶
type QueueInfo struct {
// Name is the name of the queue
Name string `json:"name"`
// Time is when the information was gathered
Time time.Time `json:"time"`
// Stream is the active JetStream Stream Information
Stream *api.StreamInfo `json:"stream_info"`
// Consumer is the worker stream information
Consumer *api.ConsumerInfo `json:"consumer_info"`
}
QueueInfo holds information about a queue state
type RetryPolicy ¶
type RetryPolicy struct {
// Intervals is a range of time periods backoff will be based off
Intervals []time.Duration
// Jitter is a factor applied to the specific interval to avoid repeating the same backoff periods
Jitter float64
}
RetryPolicy defines a period that failed jobs will be retried against
type RetryPolicyProvider ¶ added in v0.0.2
RetryPolicyProvider is the interface that the RetryPolicy implements, use this to implement your own exponential backoff system or similar for task retries.
func RetryPolicyLookup ¶ added in v0.0.5
func RetryPolicyLookup(name string) (RetryPolicyProvider, error)
RetryPolicyLookup loads a policy by name
type ScheduleWatchEntry ¶ added in v0.0.5
type ScheduleWatchEntry struct {
Name string
Task *ScheduledTask
Delete bool
}
type ScheduledTask ¶ added in v0.0.5
type ScheduledTask struct {
// Name is a unique name for the scheduled task
Name string `json:"name"`
// Schedule is a cron specification for the schedule
Schedule string `json:"schedule"`
// Queue is the name of a queue to enqueue the task into
Queue string `json:"queue"`
// TaskType is the type of task to create
TaskType string `json:"task_type"`
// Payload is the task payload for the enqueued tasks
Payload []byte `json:"payload"`
// Deadline is the time after scheduling that the deadline would be
Deadline time.Duration `json:"deadline,omitempty"`
// MaxTries is how many times the created task could be tried
MaxTries int `json:"max_tries"`
// CreatedAt is when the schedule was created
CreatedAt time.Time `json:"created_at"`
}
ScheduledTask represents a cron-like schedule and the task properties that will result in new tasks being created regularly according to that schedule
type ScheduledTaskStorage ¶ added in v0.0.5
type ScheduledTaskStorage interface {
SaveScheduledTask(st *ScheduledTask, update bool) error
LoadScheduledTaskByName(name string) (*ScheduledTask, error)
DeleteScheduledTaskByName(name string) error
ScheduledTasks(ctx context.Context) ([]*ScheduledTask, error)
ScheduledTasksWatch(ctx context.Context) (chan *ScheduleWatchEntry, error)
EnqueueTask(ctx context.Context, queue *Queue, task *Task) error
ElectionStorage() (jetstream.KeyValue, error)
PublishLeaderElectedEvent(ctx context.Context, name string, component string) error
}
type Storage ¶
type Storage interface {
SaveTaskState(ctx context.Context, task *Task, notify bool) error
EnqueueTask(ctx context.Context, queue *Queue, task *Task) error
RetryTaskByID(ctx context.Context, queue *Queue, id string) error
LoadTaskByID(id string) (*Task, error)
DeleteTaskByID(id string) error
PublishTaskStateChangeEvent(ctx context.Context, task *Task) error
AckItem(ctx context.Context, item *ProcessItem) error
NakBlockedItem(ctx context.Context, item *ProcessItem) error
NakItem(ctx context.Context, item *ProcessItem) error
TerminateItem(ctx context.Context, item *ProcessItem) error
PollQueue(ctx context.Context, q *Queue) (*ProcessItem, error)
PrepareQueue(q *Queue, replicas int, memory bool) error
PrepareTasks(memory bool, replicas int, retention time.Duration) error
PrepareConfigurationStore(memory bool, replicas int) error
SaveScheduledTask(st *ScheduledTask, update bool) error
LoadScheduledTaskByName(name string) (*ScheduledTask, error)
DeleteScheduledTaskByName(name string) error
ScheduledTasks(ctx context.Context) ([]*ScheduledTask, error)
ScheduledTasksWatch(ctx context.Context) (chan *ScheduleWatchEntry, error)
}
Storage implements the backend access
type StorageAdmin ¶
type StorageAdmin interface {
Queues() ([]*QueueInfo, error)
QueueNames() ([]string, error)
QueueInfo(name string) (*QueueInfo, error)
PurgeQueue(name string) error
DeleteQueue(name string) error
PrepareQueue(q *Queue, replicas int, memory bool) error
ConfigurationInfo() (jetstream.KeyValueStatus, error)
PrepareConfigurationStore(memory bool, replicas int) error
PrepareTasks(memory bool, replicas int, retention time.Duration) error
DeleteTaskByID(id string) error
TasksInfo() (*TasksInfo, error)
Tasks(ctx context.Context, limit int32) (chan *Task, error)
TasksStore() (*jsm.Manager, *jsm.Stream, error)
ElectionStorage() (jetstream.KeyValue, error)
}
StorageAdmin provides helpers that mainly support the CLI. It leaks a number of details about JetStream, but that is acceptable since there is no real intention to change the storage backend or support additional ones
type Task ¶
type Task struct {
// ID is a k-sortable unique ID for the task
ID string `json:"id"`
// Type is a free form string that can later be used as a routing key to send tasks to handlers
Type string `json:"type"`
// Queue is the name of the queue the task was enqueued with, set only during the enqueue operation else empty
Queue string `json:"queue"`
// Dependencies are IDs of tasks that should complete before this one becomes unblocked
Dependencies []string `json:"dependencies,omitempty"`
// DependencyResults are results for dependent tasks
DependencyResults map[string]*TaskResult `json:"dependency_results,omitempty"`
// LoadDependencies indicates if this task should load dependency results before executing
LoadDependencies bool `json:"load_dependencies,omitempty"`
// Payload is a JSON representation of the associated work
Payload []byte `json:"payload"`
// Deadline is a cut-off time for the job to complete, should a job be scheduled after this time it will fail.
// In-Flight jobs are allowed to continue past this time. Only starting handlers are impacted by this deadline.
Deadline *time.Time `json:"deadline,omitempty"`
// MaxTries sets a per task maximum try limit. If this task is in a queue that allows fewer tries the queue max tries
// will override this setting. A task may not exceed the work queue max tries
MaxTries int `json:"max_tries"`
// Result is the outcome of the job, only set for successful jobs
Result *TaskResult `json:"result,omitempty"`
// State is the most recent recorded state the job is in
State TaskState `json:"state"`
// CreatedAt is the time the job was created in UTC timezone
CreatedAt time.Time `json:"created"`
// LastTriedAt is a time stamp for when the job was last handed to a handler
LastTriedAt *time.Time `json:"tried,omitempty"`
// Tries is how many times the job was handled
Tries int `json:"tries"`
// LastErr is the most recent handling error if any
LastErr string `json:"last_err,omitempty"`
// Signature is an ed25519 signature of key properties
Signature string `json:"signature,omitempty"`
// contains filtered or unexported fields
}
Task represents a job item that handlers will execute
func NewTask ¶
NewTask creates a new task of taskType that can later be used to route tasks to handlers. The task will carry a JSON encoded representation of payload.
Example (With_deadline) ¶
email := newEmail("user@example.net", "Test Subject", "Test Body")
// Creates a new task that has a deadline for processing 1 hour from now
task, err := NewTask("email:send", email, TaskDeadline(time.Now().Add(time.Hour)))
if err != nil {
panic(fmt.Sprintf("Could not create task: %v", err))
}
fmt.Printf("Task ID: %s\n", task.ID)
func (*Task) HasDependencies ¶ added in v0.1.0
HasDependencies determines if the task has any dependencies
func (*Task) IsPastDeadline ¶ added in v0.0.2
IsPastDeadline determines if the task is past its deadline
type TaskOpt ¶
TaskOpt configures Tasks made using NewTask()
func TaskDeadline ¶
TaskDeadline sets an absolute time after which the task should not be handled
func TaskDependsOn ¶ added in v0.1.0
TaskDependsOn are Tasks that this task is dependent on, can be called multiple times
func TaskDependsOnIDs ¶ added in v0.1.0
TaskDependsOnIDs are IDs that this task is dependent on, can be called multiple times
func TaskMaxTries ¶ added in v0.0.5
TaskMaxTries sets the maximum number of processing attempts a task will have; the queue max tries will override this
func TaskPayloadEncoder ¶ added in v0.2.0
func TaskPayloadEncoder(enc TaskPayloadEncoderFunc) TaskOpt
TaskPayloadEncoder uses the given encoder to encode the Task's Payload. If not provided, json.Marshal is used as the default encoder.
func TaskRequiresDependencyResults ¶ added in v0.1.0
func TaskRequiresDependencyResults() TaskOpt
TaskRequiresDependencyResults indicates that if a task has any dependencies their results should be loaded before execution
type TaskPayloadEncoderFunc ¶ added in v0.2.0
TaskPayloadEncoderFunc is the implementation of the encoder used to encode the Task's Payload
type TaskResult ¶
TaskResult is the result of task execution, this will only be set for successfully processed jobs
type TaskScheduler ¶ added in v0.0.5
type TaskScheduler struct {
// contains filtered or unexported fields
}
func NewTaskScheduler ¶ added in v0.0.5
func NewTaskScheduler(name string, c *Client) (*TaskScheduler, error)
NewTaskScheduler creates a new Task Scheduler service
func (*TaskScheduler) Count ¶ added in v0.0.5
func (s *TaskScheduler) Count() int
Count reports how many schedules are managed by this Scheduler
func (*TaskScheduler) Stop ¶ added in v0.0.5
func (s *TaskScheduler) Stop()
Stop stops the scheduler service
type TaskState ¶
type TaskState string
TaskState indicates the current state a task is in
const (
// TaskStateUnknown is for tasks that do not have a state set
TaskStateUnknown TaskState = ""
// TaskStateNew newly created tasks that have not been handled yet
TaskStateNew TaskState = "new"
// TaskStateActive tasks that are currently being handled
TaskStateActive TaskState = "active"
// TaskStateRetry tasks that previously failed and are waiting retry
TaskStateRetry TaskState = "retry"
// TaskStateExpired tasks that reached their deadline or maximum tries
TaskStateExpired TaskState = "expired"
// TaskStateTerminated indicates that the task was terminated via the ErrTerminateTask error
TaskStateTerminated TaskState = "terminated"
// TaskStateCompleted tasks that are completed
TaskStateCompleted TaskState = "complete"
// TaskStateQueueError tasks that could not have their associated Work Queue item created
TaskStateQueueError TaskState = "queue_error"
// TaskStateBlocked tasks that are waiting on dependencies
TaskStateBlocked TaskState = "blocked"
// TaskStateUnreachable tasks that could not be run due to dependency problems
TaskStateUnreachable TaskState = "unreachable"
)
type TaskStateChangeEvent ¶ added in v0.0.2
type TaskStateChangeEvent struct {
BaseEvent
// TaskID is the ID of the task, use with LoadTaskByID() to access the task
TaskID string `json:"task_id"`
// State is the new state of the Task
State TaskState `json:"state"`
// Tries is how many times the Task has been processed
Tries int `json:"tries"`
// Queue is the queue the task is in, can be empty
Queue string `json:"queue,omitempty"`
// TaskType is the task routing type
TaskType string `json:"task_type"`
// LastErr is the error that caused a task to change state for error state changes
LastErr string `json:"last_error,omitempty"`
// Age is the time since the task was created in milliseconds
Age time.Duration `json:"task_age,omitempty"`
}
TaskStateChangeEvent notifies that a significant change occurred in a Task
func NewTaskStateChangeEvent ¶ added in v0.0.2
func NewTaskStateChangeEvent(t *Task) (*TaskStateChangeEvent, error)
NewTaskStateChangeEvent creates a new event notifying of a change in task state
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
| election | Package election implements a simple leader election on top of a NATS JetStream Key-Value bucket. |
