Documentation ¶
Overview ¶
Package synk provides a job queue for processing tasks in a distributed environment.
Index ¶
- Variables
- type AttemptError
- type CleanerConfig
- type Client
- func (c *Client) Cancel(ctx context.Context, jobID *int64) error
- func (c *Client) Delete(ctx context.Context, jobID *int64) error
- func (c *Client) Insert(name string, params JobArgs, options ...*InsertOptions) (*int64, error)
- func (c *Client) InsertTx(tx *sql.Tx, name string, params JobArgs, options ...*InsertOptions) (id *int64, err error)
- func (c *Client) Retry(ctx context.Context, jobID *int64) error
- func (c *Client) Shutdown()
- func (c *Client) Start()
- type InsertOptions
- type Job
- type JobArgs
- type JobRow
- type JobState
- type Option
- type Priority
- type QueueConfig
- type Storage
- type Worker
- type WorkerDefaults
Constants ¶
This section is empty.
Variables ¶
var CleanerConfigDefault = &CleanerConfig{
CleanInterval: time.Hour * 24,
ByStatus: map[JobState]time.Duration{
JobStateCompleted: time.Hour * 24 * 30,
JobStateCancelled: time.Hour * 24 * 90,
},
}
CleanerConfigDefault provides default settings for the job cleaner.
var QueueConfigDefault = &QueueConfig{
MaxWorkers: 100,
TimeFetch: time.Millisecond * 200,
JobTimeout: time.Minute,
}
QueueConfigDefault is the default configuration for the queue system. It sets the maximum number of workers to 100, the time interval to fetch jobs to 200 milliseconds, and the timeout for each job to 1 minute.
Functions ¶
This section is empty.
Types ¶
type AttemptError ¶
type AttemptError struct {
// At is the time at which the error occurred.
At time.Time `json:"at"`
// Attempt is the attempt number on which the error occurred (maps to
// Attempt on a job row).
Attempt int `json:"attempt"`
// Error contains the stringified error of an error returned from a job or a
// panic value in case of a panic.
Error string `json:"error"`
// Trace contains a stack trace from a job that panicked. The trace is
// produced by invoking `debug.Stack()`.
Trace string `json:"trace"`
}
AttemptError represents an error that occurred during a job attempt. It contains details about the time of the error, the attempt number, the error message, and a stack trace if the job panicked.
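As an illustration of how such a record could be filled in, the sketch below converts either a returned error or a panic into an AttemptError. It is not synk's implementation: `runAttempt` is a hypothetical helper, the struct is a local copy of the documented one, and the stack trace comes from the standard library's `runtime/debug.Stack`.

```go
package main

import (
	"encoding/json"
	"fmt"
	"runtime/debug"
	"time"
)

// AttemptError is a local copy of the documented struct so the sketch
// compiles on its own.
type AttemptError struct {
	At      time.Time `json:"at"`
	Attempt int       `json:"attempt"`
	Error   string    `json:"error"`
	Trace   string    `json:"trace"`
}

// runAttempt is a hypothetical helper (not part of synk). It runs fn and
// turns a returned error or a panic into an AttemptError.
func runAttempt(attempt int, fn func() error) (attemptErr *AttemptError) {
	defer func() {
		if r := recover(); r != nil {
			// A panic was raised: record its value and the stack trace.
			attemptErr = &AttemptError{
				At:      time.Now(),
				Attempt: attempt,
				Error:   fmt.Sprint(r),
				Trace:   string(debug.Stack()),
			}
		}
	}()
	if err := fn(); err != nil {
		// An ordinary error: no stack trace is recorded.
		return &AttemptError{At: time.Now(), Attempt: attempt, Error: err.Error()}
	}
	return nil
}

func main() {
	e := runAttempt(2, func() error { panic("boom") })
	out, err := json.Marshal(e)
	if err != nil {
		panic(err)
	}
	fmt.Println("attempt:", e.Attempt, "error:", e.Error, "has trace:", e.Trace != "")
	fmt.Println("encoded bytes:", len(out))
}
```

Using a named return value lets the deferred function replace the result after a panic has been recovered.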
type CleanerConfig ¶
type CleanerConfig struct {
// CleanInterval is the time interval at which the cleaner will run to remove old jobs.
CleanInterval time.Duration
// ByStatus is a map that defines the retention duration for jobs based on their status.
// The key is the JobState, and the value is the duration after which jobs in that state
// should be cleaned up.
ByStatus map[JobState]time.Duration
}
CleanerConfig represents the configuration settings for the job cleaner.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client manages the configuration, context, and producers used to process jobs.
func NewClient ¶
NewClient creates a new Client with the provided context and options. It initializes the client's configuration, queues, and workers, and panics if no queues or workers are configured. It also generates a unique client ID and sets up a producer for each queue.
func (*Client) Insert ¶
func (c *Client) Insert(name string, params JobArgs, options ...*InsertOptions) (*int64, error)
Insert adds a job to the queue to be processed. If no options are provided, the default options are used.
func (*Client) InsertTx ¶
func (c *Client) InsertTx(tx *sql.Tx, name string, params JobArgs, options ...*InsertOptions) (id *int64, err error)
InsertTx adds a job into the specified queue within the context of the provided transaction, allowing the operation to be part of an atomic database transaction.
func (*Client) Shutdown ¶
func (c *Client) Shutdown()
Shutdown cancels the client's context and stops any ongoing work. It calls the cancel functions associated with the client to gracefully shut down any operations.
func (*Client) Start ¶
func (c *Client) Start()
Start initializes the client's context and starts the producers for each queue. Each producer runs in its own goroutine, fetching and processing jobs according to its configuration. Start blocks until all producers have finished. It also sets up a heartbeat that logs the total number of completed jobs at regular intervals.
type InsertOptions ¶
type InsertOptions struct {
// ScheduledAt is the time at which the job should be scheduled to run.
// If not specified, the current time is used.
ScheduledAt time.Time
// Priority is the priority of the job, which can be used to determine the order
// in which jobs are processed. Higher values indicate higher priority.
Priority Priority
// Pending indicates whether the job is pending execution.
// If true, the job is considered pending and will not be executed until it is marked
// as ready. If false, the job is ready to be executed.
Pending bool
// Queue is the name of the queue to which the job belongs.
// If not specified, the default queue is used.
Queue string
// MaxRetries is the maximum number of times the job can be retried if it fails.
// If not specified, the default value is used.
MaxRetries int
// DependsOn is a slice of job IDs that the current job depends on.
// If not specified, the current job does not depend on any other jobs.
// If any of the dependent jobs fail, the current job will not be executed.
// This is useful for ensuring that jobs are executed in a specific order.
// For example, if job A depends on job B, and job B fails, job A will not be executed.
// If job B succeeds, job A will be executed.
DependsOn []*int64
}
InsertOptions represents options for inserting a job into the queue.
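The DependsOn rule in the field comments can be expressed as a small decision function: run when every dependency has completed, never run when one has failed, otherwise keep waiting. The sketch below is only an illustration of that rule. `decide`, the decision values, and the `failed` and `pending` state names are hypothetical, and JobState is assumed to be a string type.

```go
package main

import "fmt"

// JobState is assumed to be a string type for this sketch.
type JobState string

// Assumed state values; "failed" and "pending" are hypothetical names.
const (
	JobStateCompleted JobState = "completed"
	JobStateFailed    JobState = "failed"
	JobStatePending   JobState = "pending"
)

// decision is a hypothetical result type for the dependency check.
type decision string

const (
	decisionRun   decision = "run"
	decisionWait  decision = "wait"
	decisionBlock decision = "block"
)

// decide applies the documented rule to a job's dependencies, given the
// current state of each dependency keyed by job ID.
func decide(dependsOn []*int64, states map[int64]JobState) decision {
	result := decisionRun
	for _, id := range dependsOn {
		if id == nil {
			continue
		}
		switch states[*id] {
		case JobStateCompleted:
			// This dependency is satisfied.
		case JobStateFailed:
			return decisionBlock // one failure blocks the job for good
		default:
			result = decisionWait // unfinished or unknown dependency
		}
	}
	return result
}

func main() {
	a, b := int64(1), int64(2)
	deps := []*int64{&a, &b}
	fmt.Println(decide(deps, map[int64]JobState{1: JobStateCompleted, 2: JobStateCompleted}))
	fmt.Println(decide(deps, map[int64]JobState{1: JobStateCompleted, 2: JobStatePending}))
	fmt.Println(decide(deps, map[int64]JobState{1: JobStatePending, 2: JobStateFailed}))
}
```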
type Job ¶
type Job[T JobArgs] struct {
// A pointer to a JobRow struct from the types package,
// which contains metadata about the job.
*JobRow
// Args arguments required to process the job, of type T.
Args T
}
Job represents a job to be processed by a worker. It is a generic type that takes a type parameter T which must satisfy the JobArgs interface. The Job struct embeds a JobRow from the types package and includes the arguments required to process the job.
type JobArgs ¶
type JobArgs interface {
Kind() string
}
JobArgs represents an interface that defines a method for retrieving the kind of job. Any type that implements this interface must provide a Kind method that returns a string indicating the type or category of the job.
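A minimal sketch of a type that satisfies JobArgs is shown below. `SendEmailArgs` and `encode` are hypothetical names. Encoding the arguments as JSON is also an assumption, suggested by the `[]byte` Args field on JobRow but not confirmed by the documentation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// JobArgs is a local copy of the documented interface.
type JobArgs interface {
	Kind() string
}

// SendEmailArgs is a hypothetical argument type for an email job.
type SendEmailArgs struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
}

// Kind identifies the job type; the value is chosen by the application.
func (SendEmailArgs) Kind() string { return "send_email" }

// encode is a hypothetical helper that returns the kind and a JSON
// payload, roughly what could end up in JobRow.Kind and JobRow.Args.
func encode(args JobArgs) (string, []byte, error) {
	payload, err := json.Marshal(args)
	return args.Kind(), payload, err
}

func main() {
	kind, payload, err := encode(SendEmailArgs{To: "a@example.com", Subject: "hi"})
	if err != nil {
		panic(err)
	}
	fmt.Println(kind, string(payload))
}
```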
type JobRow ¶
type JobRow struct {
ID int64 `json:"id,omitempty"`
Name string `json:"name,omitempty"`
Attempt int `json:"attempt,omitempty"`
AttemptAt *time.Time `json:"attempt_at,omitempty"`
Kind string `json:"kind,omitempty"`
Queue string `json:"queue,omitempty"`
DependsOn []int64 `json:"depends_on,omitempty"`
Args []byte `json:"args,omitempty"`
State JobState `json:"state,omitempty"`
Errors []AttemptError `json:"errors,omitempty"`
Options *InsertOptions `json:"options,omitempty"`
}
JobRow represents a row in the job table, containing information about a specific job. It includes details such as the job ID, the number of attempts, the time of the last attempt, the type of job, the queue it belongs to, the encoded arguments, the current state of the job, and any errors that occurred during attempts.
type Option ¶
type Option func(*config)
Option is a function that modifies the client's configuration. Options are passed to NewClient to customize the client.
func WithCleaner ¶
func WithCleaner(cleaner *CleanerConfig) Option
WithCleaner is an option function that sets the cleaner configuration for the synk client. It takes a CleanerConfig struct as an argument and returns an Option function that updates the Config with the provided cleaner configuration.
func WithLogger ¶
WithLogger sets the logger for the synk client.
func WithQueue ¶
func WithQueue(name string, queueCfg ...*QueueConfig) Option
WithQueue is an option function that configures a queue with the given name and optional QueueConfig. If no QueueConfig is provided, a default configuration with MaxWorkers set to 100 is used. The function returns an Option that updates the Config with the specified queue configuration.
func WithStorage ¶
WithStorage sets the storage backend for the synk client. In this case, it uses a PostgreSQL storage implementation. This option is essential for persisting job data and ensuring reliable job processing.
func WithWorker ¶
WithWorker is an option function that sets the Workers field in the Config struct. It takes a pointer to a Workers struct and returns an Option function that assigns the provided Workers to the Config.
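The option functions above follow the functional-options pattern. The sketch below imitates it with local stand-ins to show how a variadic QueueConfig can fall back to a default. The `config` fields, `newConfig`, and the lowercase `withQueue` are hypothetical, and the type of MaxWorkers is assumed.

```go
package main

import (
	"fmt"
	"time"
)

// QueueConfig mirrors the documented field names; the type of MaxWorkers
// is an assumption.
type QueueConfig struct {
	MaxWorkers int
	TimeFetch  time.Duration
	JobTimeout time.Duration
}

// QueueConfigDefault repeats the documented default values.
var QueueConfigDefault = &QueueConfig{
	MaxWorkers: 100,
	TimeFetch:  200 * time.Millisecond,
	JobTimeout: time.Minute,
}

// config is a hypothetical stand-in for the unexported configuration.
type config struct {
	queues map[string]*QueueConfig
}

// Option has the same shape as the documented type.
type Option func(*config)

// withQueue imitates WithQueue: use the first supplied QueueConfig, or
// the default when none is given.
func withQueue(name string, queueCfg ...*QueueConfig) Option {
	return func(c *config) {
		cfg := QueueConfigDefault
		if len(queueCfg) > 0 && queueCfg[0] != nil {
			cfg = queueCfg[0]
		}
		c.queues[name] = cfg
	}
}

// newConfig applies each option in turn to an empty configuration.
func newConfig(opts ...Option) *config {
	c := &config{queues: map[string]*QueueConfig{}}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newConfig(
		withQueue("default"),
		withQueue("reports", &QueueConfig{MaxWorkers: 5, TimeFetch: time.Second, JobTimeout: 10 * time.Minute}),
	)
	fmt.Println(c.queues["default"].MaxWorkers, c.queues["reports"].MaxWorkers)
}
```

The variadic parameter keeps the common case short while still allowing a per-queue override.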
type QueueConfig ¶
QueueConfig holds the configuration settings for a job queue. It includes the maximum number of workers, the time interval for fetching jobs, and the timeout duration for each job.
type Storage ¶
type Storage interface {
// Ping checks the connection to the storage system.
// It returns an error if the connection is not successful.
Ping() error
// GetJobAvailable retrieves a list of available jobs from the specified queue.
// It takes the name of the queue, a limit on the number of jobs to retrieve, and a client ID.
// It returns a slice of pointers to JobRow and an error if the operation fails.
GetJobAvailable(queue string, limit int32, clientID *ulid.ULID) ([]*JobRow, error)
// Insert adds a new job to the specified queue with the given kind and arguments
// within the context of the provided transaction. This allows the operation to be
// part of an atomic database transaction. It returns the ID of the inserted job
// and an error if the operation fails.
Insert(tx *sql.Tx, params *JobRow) (*int64, error)
// Cancel cancels a job by its ID and returns an error if the operation fails.
Cancel(jobID *int64) error
// Retry retries a job by its ID and returns an error if the operation fails.
Retry(jobID *int64) error
// Delete deletes a job by its ID and returns an error if the operation fails.
Delete(jobID *int64) error
// UpdateJobState updates the state of a job identified by its ID.
// It takes the job ID, the new state, an optional finalized time, and an
// optional error message. It returns an error if the update fails.
UpdateJobState(jobID *int64, newState JobState, finalizedAt time.Time, e *AttemptError) error
// Cleaner is a method for cleaning up expired jobs based on their state and age.
// It takes a CleanerConfig struct as input and returns an error if the cleanup fails.
Cleaner(*CleanerConfig) (int64, error)
}
Storage defines the methods synk uses to interact with job storage: checking connectivity, fetching available jobs, inserting, cancelling, retrying, and deleting jobs, updating job state, and cleaning up expired jobs.
type Worker ¶
type Worker[T JobArgs] interface {
// Work method processes the given job within the provided context and returns an error if the job fails.
Work(context.Context, *Job[T]) error
// Timeout method returns the duration after which the job should be considered timed out.
Timeout(*Job[T]) time.Duration
// NextRetry method returns the time at which the job should be retried.
NextRetry(*Job[T]) time.Time
}
Worker represents a generic worker interface that processes jobs of type T. T must satisfy the JobArgs constraint. The Worker interface defines methods for executing a job, determining the timeout for a job, and calculating the next retry time for a job.
type WorkerDefaults ¶
type WorkerDefaults[T JobArgs] struct{}
WorkerDefaults is a generic struct that can be used to define default settings or configurations for a worker. The generic type parameter T represents the type of job arguments that the worker will handle.