Documentation ¶
Index ¶
- Constants
- Variables
- type Broker
- type Chain
- type ChainMessage
- type ChainMeta
- type ChainOpts
- type Group
- type GroupMessage
- type GroupMeta
- type GroupOpts
- type Job
- type JobCtx
- type JobMessage
- type JobOpts
- type Meta
- type Results
- type Server
- func (s *Server) DeleteJob(ctx context.Context, id string) error
- func (s *Server) Enqueue(ctx context.Context, t Job) (string, error)
- func (s *Server) EnqueueChain(ctx context.Context, c Chain) (string, error)
- func (s *Server) EnqueueGroup(ctx context.Context, t Group) (string, error)
- func (s *Server) GetChain(ctx context.Context, id string) (ChainMessage, error)
- func (s *Server) GetFailed(ctx context.Context) ([]string, error)
- func (s *Server) GetGroup(ctx context.Context, id string) (GroupMessage, error)
- func (s *Server) GetJob(ctx context.Context, id string) (JobMessage, error)
- func (s *Server) GetPending(ctx context.Context, queue string) ([]JobMessage, error)
- func (s *Server) GetResult(ctx context.Context, id string) ([]byte, error)
- func (s *Server) GetSuccess(ctx context.Context) ([]string, error)
- func (s *Server) GetTasks() []string
- func (s *Server) RegisterTask(name string, fn handler, opts TaskOpts) error
- func (s *Server) Start(ctx context.Context)
- type ServerOpts
- type Task
- type TaskOpts
Constants ¶
const (
	// This is the initial state when a job is pushed onto the broker.
	StatusStarted = "queued"
	// This is the state when a worker has received a job.
	StatusProcessing = "processing"
	// The state when a job completes, but returns an error (and all retries are over).
	StatusFailed = "failed"
	// The state when a job completes without any error.
	StatusDone = "successful"
	// The state when a job errors out and is queued again to be retried.
	// This state is analogous to StatusStarted.
	StatusRetrying = "retrying"
)
const (
DefaultQueue = "tasqueue:tasks"
)
Variables ¶
var ErrNotFound = errors.New("result not found")
Functions ¶
This section is empty.
Types ¶
type Broker ¶
type Broker interface {
	// Enqueue places a task in the queue
	Enqueue(ctx context.Context, msg []byte, queue string) error
	// EnqueueScheduled accepts a task (msg, queue) and also a timestamp
	// The job should be enqueued at the particular timestamp.
	EnqueueScheduled(ctx context.Context, msg []byte, queue string, ts time.Time) error
	// Consume listens for tasks on the queue and calls processor
	Consume(ctx context.Context, work chan []byte, queue string)
	// GetPending returns a list of stored job messages on the particular queue
	GetPending(ctx context.Context, queue string) ([]string, error)
}
type Chain ¶
type ChainMessage ¶
type ChainMessage struct {
ChainMeta
}
ChainMessage is a wrapper over Chain, containing meta info such as status and ID. A ChainMessage is stored in the results store.
type ChainMeta ¶
type ChainMeta struct {
	ID string
	// Status of the overall chain
	Status string
	// ID of the current job part of chain
	JobID string
	// List of IDs of completed jobs
	PrevJobs []string
}
ChainMeta contains fields related to a chain job.
type ChainOpts ¶
type ChainOpts struct {
	// Optional ID passed by client. If empty, Tasqueue generates it.
	ID string
}
type GroupMessage ¶
GroupMessage is a wrapper over Group, containing meta info such as status and ID. A GroupMessage is stored in the results store.
type GroupMeta ¶
type GroupMeta struct {
	ID     string
	Status string
	// JobStatus is a map of job id -> status
	JobStatus map[string]string
}
GroupMeta contains fields related to a group job. These are updated when a task is consumed.
type Job ¶
type Job struct {
	// If task is successful, the OnSuccess jobs are enqueued.
	OnSuccess []*Job
	Task      string
	Payload   []byte
	// If task fails, the OnError jobs are enqueued.
	OnError []*Job
	Opts    JobOpts
}
Job represents a unit of work pushed by producers. It is the responsibility of the task handler to unmarshal (if required) the payload and process it in any manner.
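Because the payload is an opaque byte slice, producer and handler must agree on an encoding. The sketch below uses JSON as one possible choice. `SumPayload`, `newSumJob`, and `sumHandler` are hypothetical names, the `Job` type is a trimmed local copy (Opts omitted), and the handler signature here is simplified rather than the package's own handler type.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Job is a trimmed local copy of the documented struct (Opts omitted).
type Job struct {
	OnSuccess []*Job
	Task      string
	Payload   []byte
	OnError   []*Job
}

// SumPayload is a hypothetical payload shape agreed on by producer and handler.
type SumPayload struct {
	A int `json:"a"`
	B int `json:"b"`
}

// newSumJob encodes the payload on the producer side.
func newSumJob(a, b int) (Job, error) {
	p, err := json.Marshal(SumPayload{A: a, B: b})
	if err != nil {
		return Job{}, err
	}
	return Job{Task: "sum", Payload: p}, nil
}

// sumHandler is a hypothetical handler: it decodes the payload and processes it.
func sumHandler(payload []byte) (int, error) {
	var p SumPayload
	if err := json.Unmarshal(payload, &p); err != nil {
		return 0, err
	}
	return p.A + p.B, nil
}

func main() {
	job, err := newSumJob(2, 3)
	if err != nil {
		panic(err)
	}
	res, err := sumHandler(job.Payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(job.Task, res) // prints "sum 5"
}
```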
type JobCtx ¶
JobCtx is passed to handler functions. It gives the handler access to a job's meta information.
type JobMessage ¶
JobMessage is a wrapper over Task, used to transport the task over a broker. It contains additional fields such as status and an ID.
type JobOpts ¶
type JobOpts struct {
	// Optional ID passed by client. If empty, Tasqueue generates it.
	ID         string
	ETA        time.Time
	Queue      string
	MaxRetries uint32
	Schedule   string
	Timeout    time.Duration
}
JobOpts holds the various options available to configure a job.
type Meta ¶
type Meta struct {
	ID           string
	OnSuccessIDs []string
	Status       string
	Queue        string
	Schedule     string
	MaxRetry     uint32
	Retried      uint32
	PrevErr      string
	ProcessedAt  time.Time
	// PrevJobResult contains any job result set by the previous job in a chain.
	// This will be nil if the previous job doesn't set the results on JobCtx.
	PrevJobResult []byte
}
Meta contains fields related to a job. These are updated when a task is consumed.
func DefaultMeta ¶
DefaultMeta returns Meta with an ID and other defaults filled in.
type Results ¶
type Results interface {
	Get(ctx context.Context, id string) ([]byte, error)
	// NilError is used to check internally if the "id" is missing
	NilError() error
	Set(ctx context.Context, id string, b []byte) error
	// DeleteJob removes the job's saved metadata from the store
	DeleteJob(ctx context.Context, id string) error
	GetFailed(ctx context.Context) ([]string, error)
	GetSuccess(ctx context.Context) ([]string, error)
	SetFailed(ctx context.Context, id string) error
	SetSuccess(ctx context.Context, id string) error
}
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is the main store that holds the broker and the results communication interfaces. It also stores the registered tasks.
func NewServer ¶
func NewServer(o ServerOpts) (*Server, error)
NewServer() returns a new instance of server, with sane defaults.
func (*Server) DeleteJob ¶
DeleteJob() removes the stored results of a particular job. It does not dequeue an unprocessed job. It is useful for clearing the status of old, finished jobs.
func (*Server) Enqueue ¶
Enqueue() accepts a job and returns the assigned ID. The following steps take place:
1. Converts it into a job message, which assigns an ID (among other meta info) to the job.
2. Sets the job status to StatusStarted ("queued") on the results store.
3. Enqueues the job (if the job is scheduled, pushes it onto the scheduler).
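The three steps above can be sketched with plain in-memory stand-ins. This is not the package's implementation: the `server`, `job`, and `jobMessage` types, the counter-based IDs, and treating a non-empty `Schedule` as "scheduled" are all assumptions made for illustration. The initial status uses the value of `StatusStarted` from the constants section.

```go
package main

import (
	"fmt"
	"strconv"
)

// statusStarted mirrors the value of the documented StatusStarted constant.
const statusStarted = "queued"

// job is a trimmed, hypothetical stand-in for Job plus its options.
type job struct {
	Task     string
	Payload  []byte
	ID       string // optional client-supplied ID
	Schedule string // assumption: non-empty means the job is scheduled
}

// jobMessage is a hypothetical stand-in for JobMessage.
type jobMessage struct {
	ID     string
	Status string
	Job    job
}

// server holds in-memory stand-ins for the results store, broker, and scheduler.
type server struct {
	nextID    int
	results   map[string]jobMessage
	queue     []jobMessage
	scheduled []jobMessage
}

func newServer() *server {
	return &server{results: make(map[string]jobMessage)}
}

func (s *server) enqueue(j job) string {
	// Step 1: convert the job into a message, assigning an ID if none was given.
	id := j.ID
	if id == "" {
		s.nextID++
		id = "job-" + strconv.Itoa(s.nextID)
	}
	msg := jobMessage{ID: id, Status: statusStarted, Job: j}

	// Step 2: record the initial status in the results store.
	s.results[id] = msg

	// Step 3: hand the message to the scheduler or push it onto the queue.
	if j.Schedule != "" {
		s.scheduled = append(s.scheduled, msg)
	} else {
		s.queue = append(s.queue, msg)
	}
	return id
}

func main() {
	s := newServer()
	id := s.enqueue(job{Task: "sum", Payload: []byte(`{"a":1,"b":2}`)})
	fmt.Println(id, s.results[id].Status) // prints "job-1 queued"

	s.enqueue(job{Task: "report", ID: "nightly", Schedule: "@daily"})
	fmt.Println(len(s.queue), len(s.scheduled)) // prints "1 1"
}
```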
func (*Server) EnqueueChain ¶
func (*Server) EnqueueGroup ¶
EnqueueGroup() accepts a group and returns the assigned ID. The following steps take place:
1. Converts it into a group message, which assigns an ID (among other meta info) to the group.
2. Sets the group status as "started" on the results store.
3. Loops over all jobs in the group and enqueues each job.
4. The job status map is updated with the IDs of each enqueued job.
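A rough sketch of the group flow, again with in-memory stand-ins rather than the package's own code. The `server`, `job`, and `groupMessage` types, the counter-based IDs, and the literal `"started"` used as the initial status (the description above gives no exact stored value) are assumptions for illustration.

```go
package main

import (
	"fmt"
	"strconv"
)

// initialStatus is an assumption: the description only says "started".
const initialStatus = "started"

// job is a hypothetical, trimmed stand-in for Job.
type job struct{ Task string }

// groupMessage is a hypothetical stand-in for GroupMessage.
type groupMessage struct {
	ID        string
	Status    string
	JobStatus map[string]string // job id -> status
}

// server holds in-memory stand-ins for the results store and the broker queue.
type server struct {
	nextID  int
	results map[string]groupMessage
	queue   []string // IDs of enqueued jobs
}

func newServer() *server {
	return &server{results: make(map[string]groupMessage)}
}

// newID returns a counter-based ID; the real ID scheme is not shown in the docs.
func (s *server) newID(prefix string) string {
	s.nextID++
	return prefix + "-" + strconv.Itoa(s.nextID)
}

// enqueueJob pushes a single job and returns its ID.
func (s *server) enqueueJob(_ job) string {
	id := s.newID("job")
	s.queue = append(s.queue, id)
	return id
}

func (s *server) enqueueGroup(jobs []job) string {
	// Step 1: build the group message with a fresh ID.
	msg := groupMessage{
		ID:        s.newID("group"),
		Status:    initialStatus,
		JobStatus: make(map[string]string),
	}
	// Step 2: record the group's initial status.
	s.results[msg.ID] = msg

	// Steps 3 and 4: enqueue each job and track its ID in the status map.
	for _, j := range jobs {
		msg.JobStatus[s.enqueueJob(j)] = initialStatus
	}
	s.results[msg.ID] = msg
	return msg.ID
}

func main() {
	s := newServer()
	id := s.enqueueGroup([]job{{Task: "resize"}, {Task: "upload"}})
	g := s.results[id]
	fmt.Println(g.ID, g.Status, len(g.JobStatus), len(s.queue))
}
```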
func (*Server) GetJob ¶
GetJob accepts an ID and returns the job message in the results store. This is useful to check the status of a job message.
func (*Server) GetPending ¶
GetPending() returns the pending job messages in the broker's queue.
func (*Server) GetResult ¶
GetResult() accepts an ID and returns the result of the job in the results store.
func (*Server) GetSuccess ¶
GetSuccess() returns the list of IDs of jobs that were successful.
func (*Server) RegisterTask ¶
RegisterTask maps a new task against the tasks map on the server. It accepts different options for the task (to set callbacks).