synk

package module
v0.3.0 Latest
Published: Jan 25, 2026 License: Apache-2.0 Imports: 12 Imported by: 0

README

synk

Synk is a lightweight and efficient library for background jobs in Go and PostgreSQL.

Documentation

Overview

Package synk provides a distributed job queue system for processing tasks in a distributed environment.

Index

Constants

This section is empty.

Variables

var CleanerConfigDefault = &CleanerConfig{
	CleanInterval: time.Hour * 24,
	ByStatus: map[JobState]time.Duration{
		JobStateCompleted: time.Hour * 24 * 30,
		JobStateCancelled: time.Hour * 24 * 90,
	},
}

CleanerConfigDefault provides default settings for the job cleaner.

var QueueConfigDefault = &QueueConfig{
	MaxWorkers: 100,
	TimeFetch:  time.Millisecond * 200,
	JobTimeout: time.Minute,
}

QueueConfigDefault is the default configuration for the queue system. It sets the maximum number of workers to 100, the time interval to fetch jobs to 200 milliseconds, and the timeout for each job to 1 minute.

Functions

This section is empty.

Types

type AttemptError

type AttemptError struct {
	// At is the time at which the error occurred.
	At time.Time `json:"at"`

	// Attempt is the attempt number on which the error occurred (maps to
	// Attempt on a job row).
	Attempt int `json:"attempt"`

	// Error contains the stringified error of an error returned from a job or a
	// panic value in case of a panic.
	Error string `json:"error"`

	// Trace contains a stack trace from a job that panicked. The trace is
	// produced by invoking `debug.Trace()`.
	Trace string `json:"trace"`
}

AttemptError represents an error that occurred during a job attempt. It contains details about the time of the error, the attempt number, the error message, and a stack trace if the job panicked.
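
To illustrate how the JSON tags on AttemptError shape a stored error record, here is a minimal sketch. The struct is a local copy of the declaration above so the snippet compiles without importing synk; encodeAttempt and sampleAttempt are hypothetical helpers, and the error text is made up for illustration.

```go
package main

import (
	"encoding/json"
	"time"
)

// AttemptError is a local copy of the documented struct, reproduced here
// only so that the sketch is self-contained.
type AttemptError struct {
	At      time.Time `json:"at"`
	Attempt int       `json:"attempt"`
	Error   string    `json:"error"`
	Trace   string    `json:"trace"`
}

// encodeAttempt (hypothetical helper) serializes one attempt error using
// the documented JSON tags.
func encodeAttempt(e AttemptError) (string, error) {
	b, err := json.Marshal(e)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// sampleAttempt builds a fixed, made-up record: the second attempt of a job
// failed with an ordinary error, so Trace stays empty.
func sampleAttempt() AttemptError {
	return AttemptError{
		At:      time.Date(2026, time.January, 25, 12, 0, 0, 0, time.UTC),
		Attempt: 2,
		Error:   "connection refused",
	}
}
```

Calling encodeAttempt(sampleAttempt()) yields `{"at":"2026-01-25T12:00:00Z","attempt":2,"error":"connection refused","trace":""}`. Note that none of the tags use omitempty, so an empty Trace is still emitted.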

type CleanerConfig

type CleanerConfig struct {
	// CleanInterval is the time interval at which the cleaner will run to remove old jobs.
	CleanInterval time.Duration

	// ByStatus is a map that defines the retention duration for jobs based on their status.
	// The key is the JobState, and the value is the duration after which jobs in that state
	// should be cleaned up.
	ByStatus map[JobState]time.Duration
}

CleanerConfig represents the configuration settings for the job cleaner.
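
The retention rule described by ByStatus can be sketched as follows. This is not synk's implementation: expired is a hypothetical helper, the types are local copies of the declarations on this page, and the sketch assumes that states without a ByStatus entry are never cleaned.

```go
package main

import "time"

// day is a convenience constant for expressing retention periods.
const day = 24 * time.Hour

// JobState and CleanerConfig are local copies of the documented types so
// that the sketch compiles on its own.
type JobState string

const (
	JobStateCancelled JobState = "cancelled"
	JobStateCompleted JobState = "completed"
	JobStateRunning   JobState = "running"
)

type CleanerConfig struct {
	CleanInterval time.Duration
	ByStatus      map[JobState]time.Duration
}

// expired (hypothetical) reports whether a job in the given state that
// reached that state `age` ago has outlived its retention period.
func expired(cfg *CleanerConfig, state JobState, age time.Duration) bool {
	retention, ok := cfg.ByStatus[state]
	if !ok {
		// Assumption: states with no entry are kept indefinitely.
		return false
	}
	return age > retention
}

// shortRetention returns a custom configuration: run hourly, keep completed
// jobs for a week and cancelled jobs for two weeks.
func shortRetention() *CleanerConfig {
	return &CleanerConfig{
		CleanInterval: time.Hour,
		ByStatus: map[JobState]time.Duration{
			JobStateCompleted: 7 * day,
			JobStateCancelled: 14 * day,
		},
	}
}
```

With this configuration, a completed job that finished eight days ago would be eligible for removal, while a cancelled job of the same age would not.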

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client manages the configuration, context, and per-queue producers used to process jobs.

func NewClient

func NewClient(ctx context.Context, opts ...Option) *Client

NewClient creates a new Client with the provided context and options. It initializes the client's configuration, queues, and workers. If no queues or workers are configured, it panics. It also generates a unique client ID and sets up producers for each queue.

func (*Client) Cancel added in v0.3.0

func (c *Client) Cancel(ctx context.Context, jobID *int64) error

Cancel cancels a job by its ID.

func (*Client) Delete added in v0.3.0

func (c *Client) Delete(ctx context.Context, jobID *int64) error

Delete deletes a job by its ID.

func (*Client) Insert

func (c *Client) Insert(name string, params JobArgs, options ...*InsertOptions) (*int64, error)

Insert adds a job to the queue to be processed. If no options are provided, it uses the default options.

func (*Client) InsertTx

func (c *Client) InsertTx(tx *sql.Tx, name string, params JobArgs, options ...*InsertOptions) (id *int64, err error)

InsertTx adds a job into the specified queue within the context of the provided transaction, allowing the operation to be part of an atomic database transaction.

func (*Client) Retry added in v0.3.0

func (c *Client) Retry(ctx context.Context, jobID *int64) error

Retry retries a job by its ID.

func (*Client) Shutdown

func (c *Client) Shutdown()

Shutdown cancels the client's context and stops any ongoing work. It calls the cancel functions associated with the client to gracefully shut down any operations.

func (*Client) Start

func (c *Client) Start()

Start initializes the client's context and starts the producers for each queue. Each producer runs in a separate goroutine, fetching and processing jobs according to its configuration. The method waits for all producers to complete their work before returning. It also sets up a heartbeat mechanism to log the total number of completed jobs at regular intervals.

type InsertOptions

type InsertOptions struct {
	// ScheduledAt is the time at which the job should be scheduled to run.
	// If not specified, the current time is used.
	ScheduledAt time.Time

	// Priority is the priority of the job, which can be used to determine the order
	// in which jobs are processed. Use the Priority constants, which run from
	// PriorityCritical (1) to PriorityLow (4).
	Priority Priority

	// Pending indicates whether the job is pending execution.
	// If true, the job is considered pending and will not be executed until it is marked
	// as ready. If false, the job is ready to be executed.
	Pending bool

	// Queue is the name of the queue to which the job belongs.
	// If not specified, the default queue is used.
	Queue string

	// MaxRetries is the maximum number of times the job can be retried if it fails.
	// If not specified, the default value is used.
	MaxRetries int

	// DependsOn is a slice of job IDs that the current job depends on.
	// If not specified, the current job does not depend on any other jobs.
	// If any of the dependent jobs fail, the current job will not be executed.
	// This is useful for ensuring that jobs are executed in a specific order.
	// For example, if job A depends on job B, and job B fails, job A will not be executed.
	// If job B succeeds, job A will be executed.
	DependsOn []*int64
}

InsertOptions represents options for inserting a job into the queue.

type Job

type Job[T JobArgs] struct {
	// JobRow is the embedded job row, which contains metadata about the job.
	*JobRow

	// Args holds the arguments required to process the job, of type T.
	Args T
}

Job represents a job to be processed by a worker. It is a generic type that takes a type parameter T which must satisfy the JobArgs interface. The Job struct embeds a *JobRow and includes the arguments required to process the job.

type JobArgs

type JobArgs interface {
	Kind() string
}

JobArgs represents an interface that defines a method for retrieving the kind of job. Any type that implements this interface must provide a Kind method that returns a string indicating the type or category of the job.
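
A minimal sketch of a type that satisfies JobArgs follows. The interface is copied locally so the snippet stands alone; WelcomeEmailArgs, its fields, and the kind string "welcome_email" are hypothetical.

```go
package main

// JobArgs is a local copy of the documented interface.
type JobArgs interface {
	Kind() string
}

// WelcomeEmailArgs is a hypothetical payload for a job that sends a
// welcome email to a newly registered user.
type WelcomeEmailArgs struct {
	UserID int64
	Email  string
}

// Kind identifies the job category. A value receiver is used so that both
// WelcomeEmailArgs and *WelcomeEmailArgs satisfy JobArgs.
func (WelcomeEmailArgs) Kind() string { return "welcome_email" }

// Compile-time check that the type implements the interface.
var _ JobArgs = WelcomeEmailArgs{}
```

Keeping the kind string stable matters in practice, since a stored job presumably has to be matched back to a worker by that value.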

type JobRow

type JobRow struct {
	ID        int64          `json:"id,omitempty"`
	Name      string         `json:"name,omitempty"`
	Attempt   int            `json:"attempt,omitempty"`
	AttemptAt *time.Time     `json:"attempt_at,omitempty"`
	Kind      string         `json:"kind,omitempty"`
	Queue     string         `json:"queue,omitempty"`
	DependsOn []int64        `json:"depends_on,omitempty"`
	Args      []byte         `json:"args,omitempty"`
	State     JobState       `json:"state,omitempty"`
	Errors    []AttemptError `json:"errors,omitempty"`
	Options   *InsertOptions `json:"options,omitempty"`
}

JobRow represents a row in the job table, containing information about a specific job. It includes details such as the job ID, the number of attempts, the time of the last attempt, the type of job, the queue it belongs to, the encoded arguments, the current state of the job, and any errors that occurred during attempts.

type JobState

type JobState string

JobState represents the status of a job.

const (
	JobStateAvailable JobState = "available"
	JobStateCancelled JobState = "cancelled"
	JobStateCompleted JobState = "completed"
	JobStateRunning   JobState = "running"
	JobStateScheduled JobState = "scheduled"
	JobStatePending   JobState = "pending"
)

type Option

type Option func(*config)

Option is a function that modifies the client's internal configuration. Options are passed to NewClient to customize the client.

func WithCleaner

func WithCleaner(cleaner *CleanerConfig) Option

WithCleaner is an option function that sets the cleaner configuration for the synk client. It takes a *CleanerConfig and returns an Option that applies it to the client configuration.

func WithLogger

func WithLogger(logger *slog.Logger) Option

WithLogger sets the logger for the synk client.

func WithQueue

func WithQueue(name string, queueCfg ...*QueueConfig) Option

WithQueue is an option function that configures a queue with the given name and optional QueueConfig. If no QueueConfig is provided, a default configuration with MaxWorkers set to 100 is used. The function returns an Option that updates the Config with the specified queue configuration.
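
The default fallback described above follows the usual functional options pattern, which can be sketched like this. It is not synk's source: the real config type is unexported and its fields are not documented here, so the config struct, its queues map, and newConfig are assumptions made for illustration.

```go
package main

import "time"

// QueueConfig and QueueConfigDefault mirror the documented declarations.
type QueueConfig struct {
	MaxWorkers uint16
	TimeFetch  time.Duration
	JobTimeout time.Duration
}

var QueueConfigDefault = &QueueConfig{
	MaxWorkers: 100,
	TimeFetch:  time.Millisecond * 200,
	JobTimeout: time.Minute,
}

// config is a hypothetical stand-in for the library's unexported type.
type config struct {
	queues map[string]*QueueConfig
}

// Option mutates the configuration while the client is being built.
type Option func(*config)

// WithQueue registers a queue, falling back to QueueConfigDefault when the
// caller passes no configuration (or a nil one).
func WithQueue(name string, queueCfg ...*QueueConfig) Option {
	return func(c *config) {
		cfg := QueueConfigDefault
		if len(queueCfg) > 0 && queueCfg[0] != nil {
			cfg = queueCfg[0]
		}
		c.queues[name] = cfg
	}
}

// newConfig (hypothetical) applies the options in order, as a constructor
// such as NewClient presumably does.
func newConfig(opts ...Option) *config {
	c := &config{queues: make(map[string]*QueueConfig)}
	for _, opt := range opts {
		opt(c)
	}
	return c
}
```

In this sketch, newConfig(WithQueue("default"), WithQueue("emails", &QueueConfig{MaxWorkers: 5})) leaves "default" with 100 workers and "emails" with 5. The variadic parameter is what makes the configuration argument optional at the call site.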

func WithStorage

func WithStorage(storage Storage) Option

WithStorage sets the storage backend for the synk client, typically the PostgreSQL storage implementation. This option is essential for persisting job data and ensuring reliable job processing.

func WithWorker

func WithWorker[T JobArgs](w Worker[T]) Option

WithWorker is an option function that adds a worker to the client configuration. It takes a Worker[T], where T is the job argument type the worker handles, and returns an Option that applies it.

type Priority

type Priority int

Priority represents the priority of a job.

const (
	PriorityCritical Priority = 1
	PriorityHigh     Priority = 2
	PriorityMedium   Priority = 3
	PriorityLow      Priority = 4
)

type QueueConfig

type QueueConfig struct {
	MaxWorkers uint16
	TimeFetch  time.Duration
	JobTimeout time.Duration
}

QueueConfig holds the configuration settings for a job queue. It includes the maximum number of workers, the time interval for fetching jobs, and the timeout duration for each job.

type Storage

type Storage interface {
	// Ping checks the connection to the storage system.
	// It returns an error if the connection is not successful.
	Ping() error

	// GetJobAvailable retrieves a list of available jobs from the specified queue.
	// It takes the name of the queue, a limit on the number of jobs to retrieve,
	// and the ID of the requesting client.
	// It returns a slice of pointers to JobRow and an error if the operation fails.
	GetJobAvailable(queue string, limit int32, clientID *ulid.ULID) ([]*JobRow, error)

	// Insert adds a new job to the specified queue with the given kind and arguments
	// within the context of the provided transaction. This allows the operation to be
	// part of an atomic database transaction. It returns the ID of the inserted job
	// and an error if the operation fails.
	Insert(tx *sql.Tx, params *JobRow) (*int64, error)

	// Cancel cancels a job by its ID and returns an error if the operation fails.
	Cancel(jobID *int64) error

	// Retry retries a job by its ID and returns an error if the operation fails.
	Retry(jobID *int64) error

	// Delete deletes a job by its ID and returns an error if the operation fails.
	Delete(jobID *int64) error

	// UpdateJobState updates the state of a job identified by its ID.
	// It takes the job ID, the new state, an optional finalized time, and an
	// optional error message. It returns an error if the update fails.
	UpdateJobState(jobID *int64, newState JobState, finalizedAt time.Time, e *AttemptError) error

	// Cleaner cleans up expired jobs based on their state and age.
	// It takes a *CleanerConfig and returns an int64 result along with an error if the cleanup fails.
	Cleaner(*CleanerConfig) (int64, error)
}

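Storage is an interface that defines methods for interacting with job storage. It provides methods to check the connection, retrieve available jobs from a queue, insert, cancel, retry, and delete jobs, update a job's state, and clean up expired jobs.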

type Worker

type Worker[T JobArgs] interface {
	// Work method processes the given job within the provided context and returns an error if the job fails.
	Work(context.Context, *Job[T]) error
	// Timeout method returns the duration after which the job should be considered timed out.
	Timeout(*Job[T]) time.Duration
	// NextRetry method returns the time at which the job should be retried.
	NextRetry(*Job[T]) time.Time
}

Worker represents a generic worker interface that processes jobs of type T. T must satisfy the JobArgs constraint. The Worker interface defines methods for executing a job, determining the timeout for a job, and calculating the next retry time for a job.

type WorkerDefaults

type WorkerDefaults[T JobArgs] struct{}

WorkerDefaults is a generic struct that can be used to define default settings or configurations for a worker. The generic type parameter T represents the type of job arguments that the worker will handle.

func (WorkerDefaults[T]) NextRetry

func (w WorkerDefaults[T]) NextRetry(*Job[T]) time.Time

NextRetry calculates the next retry time for a given job. It returns a zero value of time.Time, indicating no retry is scheduled.

func (WorkerDefaults[T]) Timeout

func (w WorkerDefaults[T]) Timeout(*Job[T]) time.Duration

Timeout returns the duration for which the worker should wait before timing out a job. This method can be overridden to provide custom timeout logic for different jobs.

Directories

Path Synopsis
example
storage
