database

package
v1.51.0
Published: Jan 8, 2026 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type JobDBHandler

type JobDBHandler struct {
	EncryptionKey string
	// contains filtered or unexported fields
}

JobDBHandler implements JobDBHandlerFunctions and holds the database connection.

func NewJobDBHandler

func NewJobDBHandler(dbConnection *helper.Database, withTableDrop bool, encryptionKey ...string) (*JobDBHandler, error)

NewJobDBHandler creates a new instance of JobDBHandler. It initializes the database connection and, if withTableDrop is true, drops the existing job tables before creating new ones.

func (JobDBHandler) AddRetentionArchive added in v1.6.0

func (r JobDBHandler) AddRetentionArchive(retention time.Duration) error

AddRetentionArchive updates the retention archive settings for the job archive.

func (JobDBHandler) BatchInsertJobs

func (r JobDBHandler) BatchInsertJobs(jobs []*model.Job) error

BatchInsertJobs inserts multiple job records into the database in a single transaction.

func (JobDBHandler) CheckTablesExistance added in v1.4.0

func (r JobDBHandler) CheckTablesExistance() (bool, error)

CheckTablesExistance checks if the 'job' and 'job_archive' tables exist in the database. It returns true if both tables exist, otherwise false.

func (JobDBHandler) CreateTable

func (r JobDBHandler) CreateTable() error

CreateTable creates the 'job' and 'job_archive' tables in the database. If the tables already exist, it does not create them again. It also creates a trigger for notifying events on the table and all necessary indexes.

func (JobDBHandler) DeleteJob

func (r JobDBHandler) DeleteJob(rid uuid.UUID) error

DeleteJob deletes a job record from the job archive based on its RID. We only delete jobs from the archive as queued and running jobs should be cancelled first. Cancelling a job will move it to the archive with CANCELLED status.

func (JobDBHandler) DropTables

func (r JobDBHandler) DropTables() error

DropTables drops the 'job' and 'job_archive' tables from the database.

func (JobDBHandler) InsertJob

func (r JobDBHandler) InsertJob(job *model.Job) (*model.Job, error)

InsertJob inserts a new job record into the database.

func (JobDBHandler) InsertJobTx

func (r JobDBHandler) InsertJobTx(tx *sql.Tx, job *model.Job) (*model.Job, error)

InsertJobTx inserts a new job record into the database within a transaction.

func (JobDBHandler) RemoveRetentionArchive added in v1.6.0

func (r JobDBHandler) RemoveRetentionArchive() error

RemoveRetentionArchive removes the retention archive settings for the job archive.

func (JobDBHandler) SelectAllJobs

func (r JobDBHandler) SelectAllJobs(lastID int, entries int) ([]*model.Job, error)

SelectAllJobs retrieves a paginated list of jobs for all workers. It returns jobs that were created before the specified lastID, or the newest jobs if lastID is 0.
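The lastID/entries pagination used by the Select* methods can be illustrated with a minimal in-memory sketch. It is not the package's query: the job type and the pageBefore helper are hypothetical, and it assumes that IDs grow with creation time, so "created before lastID" means "has a smaller ID".

```go
package main

import "fmt"

// job is a hypothetical stand-in for model.Job; only the ID matters here.
type job struct {
	ID int
}

// pageBefore returns up to entries jobs whose ID is below lastID, newest first.
// A lastID of 0 means "start from the newest job".
// jobs must already be sorted by ID in descending order.
func pageBefore(jobs []job, lastID, entries int) []job {
	page := make([]job, 0, entries)
	for _, j := range jobs {
		if lastID != 0 && j.ID >= lastID {
			continue // not older than the cursor, skip it
		}
		if len(page) == entries {
			break // page is full
		}
		page = append(page, j)
	}
	return page
}

func main() {
	jobs := []job{{5}, {4}, {3}, {2}, {1}}

	first := pageBefore(jobs, 0, 2)
	fmt.Println(first) // [{5} {4}]

	// The ID of the last job on a page becomes the cursor for the next page.
	next := pageBefore(jobs, first[len(first)-1].ID, 2)
	fmt.Println(next) // [{3} {2}]
}
```

A caller would pass the ID of the last entry it received as lastID on the following call, which avoids the row skipping that offset-based paging can show while new jobs are being inserted.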

func (JobDBHandler) SelectAllJobsBySearch

func (r JobDBHandler) SelectAllJobsBySearch(search string, lastID int, entries int) ([]*model.Job, error)

SelectAllJobsBySearch retrieves a paginated list of jobs filtered by a search string.

It searches across 'rid', 'worker_id', and 'status' fields. The search is case-insensitive and uses ILIKE for partial matches.

It returns jobs that were created before the specified lastID, or the newest jobs if lastID is 0.
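As a rough illustration of the matching behaviour, the sketch below approximates a case-insensitive partial match in plain Go. The matchesSearch helper is hypothetical and ignores the SQL wildcard characters % and _; the real filtering happens in the database via ILIKE.

```go
package main

import (
	"fmt"
	"strings"
)

// matchesSearch reports whether any field contains search, ignoring case.
// It approximates `field ILIKE '%' || search || '%'` for plain text.
func matchesSearch(search string, fields ...string) bool {
	needle := strings.ToLower(search)
	for _, f := range fields {
		if strings.Contains(strings.ToLower(f), needle) {
			return true
		}
	}
	return false
}

func main() {
	// The field values stand in for a job's rid, worker_id and status columns.
	fmt.Println(matchesSearch("run", "a1b2", "17", "RUNNING"))  // true
	fmt.Println(matchesSearch("fail", "a1b2", "17", "RUNNING")) // false
}
```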

func (JobDBHandler) SelectAllJobsByWorkerRID

func (r JobDBHandler) SelectAllJobsByWorkerRID(workerRid uuid.UUID, lastID int, entries int) ([]*model.Job, error)

SelectAllJobsByWorkerRID retrieves a paginated list of jobs for a specific worker, filtered by worker RID. It returns jobs that were created before the specified lastID, or the newest jobs if lastID is 0.

func (JobDBHandler) SelectAllJobsFromArchive

func (r JobDBHandler) SelectAllJobsFromArchive(lastID int, entries int) ([]*model.Job, error)

SelectAllJobsFromArchive retrieves a paginated list of archived jobs. It returns jobs that were created before the specified lastID, or the newest jobs if lastID is 0.

func (JobDBHandler) SelectAllJobsFromArchiveBySearch

func (r JobDBHandler) SelectAllJobsFromArchiveBySearch(search string, lastID int, entries int) ([]*model.Job, error)

SelectAllJobsFromArchiveBySearch retrieves a paginated list of archived jobs filtered by search string. It searches across 'rid', 'worker_id', 'task_name', and 'status' fields. It returns jobs that were created before the specified lastID, or the newest jobs if lastID is 0.

func (JobDBHandler) SelectJob

func (r JobDBHandler) SelectJob(rid uuid.UUID) (*model.Job, error)

SelectJob retrieves a single job record from the database based on its RID.

func (JobDBHandler) SelectJobFromArchive

func (r JobDBHandler) SelectJobFromArchive(rid uuid.UUID) (*model.Job, error)

SelectJobFromArchive retrieves a single archived job record from the database based on its RID.

func (JobDBHandler) UpdateJobFinal

func (r JobDBHandler) UpdateJobFinal(job *model.Job) (*model.Job, error)

UpdateJobFinal updates an existing job record in the database to state 'FAILED' or 'SUCCEEDED'.

It deletes the job from the 'job' table and inserts it into the 'job_archive' table. The archived job will have the status set to the provided status, and it will include results and error information.

It returns the archived job record.
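The move from the active table to the archive can be pictured with a small in-memory sketch. The store type, its maps, and the finalize method are hypothetical; in the database the delete and insert would have to happen atomically, which two map operations only imitate.

```go
package main

import (
	"errors"
	"fmt"
)

// job is a hypothetical stand-in for model.Job.
type job struct {
	RID    string
	Status string
	Error  string
}

// store imitates the two tables with maps keyed by RID.
type store struct {
	jobs    map[string]job // stands in for the 'job' table
	archive map[string]job // stands in for the 'job_archive' table
}

// finalize removes the job from the active set and stores it in the archive
// with its final status. It returns the archived record.
func (s *store) finalize(j job) (job, error) {
	if j.Status != "FAILED" && j.Status != "SUCCEEDED" {
		return job{}, errors.New("final status must be FAILED or SUCCEEDED")
	}
	if _, ok := s.jobs[j.RID]; !ok {
		return job{}, errors.New("job not found in active table")
	}
	delete(s.jobs, j.RID)
	s.archive[j.RID] = j
	return j, nil
}

func main() {
	s := &store{
		jobs:    map[string]job{"a": {RID: "a", Status: "RUNNING"}},
		archive: map[string]job{},
	}
	archived, err := s.finalize(job{RID: "a", Status: "FAILED", Error: "timeout"})
	fmt.Println(archived.Status, err)         // FAILED <nil>
	fmt.Println(len(s.jobs), len(s.archive)) // 0 1
}
```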

func (JobDBHandler) UpdateJobsInitial

func (r JobDBHandler) UpdateJobsInitial(worker *model.Worker) ([]*model.Job, error)

UpdateJobsInitial updates existing queued, non-locked job records in the database.

It checks that each job is in 'QUEUED' or 'FAILED' status and that the worker can handle the task. The worker must have the task in its available tasks, and the next interval function must be available if one is set. If the job is scheduled, it must be scheduled within the next 10 minutes. Matching jobs are set to 'RUNNING' status, their schedule count and attempts are incremented, and their started_at timestamp is set. The query uses the `FOR UPDATE SKIP LOCKED` clause so that concurrent workers skip rows another worker has already locked instead of waiting on them.

It returns the updated job records.

func (JobDBHandler) UpdateStaleJobs added in v1.9.0

func (r JobDBHandler) UpdateStaleJobs() (int, error)

UpdateStaleJobs resets jobs to QUEUED status when their assigned worker has STOPPED status, so that available workers can pick them up again. It returns the number of jobs that were updated.

type JobDBHandlerFunctions

type JobDBHandlerFunctions interface {
	CheckTablesExistance() (bool, error)
	CreateTable() error
	DropTables() error
	InsertJob(job *model.Job) (*model.Job, error)
	InsertJobTx(tx *sql.Tx, job *model.Job) (*model.Job, error)
	BatchInsertJobs(jobs []*model.Job) error
	UpdateJobsInitial(worker *model.Worker) ([]*model.Job, error)
	UpdateJobFinal(job *model.Job) (*model.Job, error)
	UpdateStaleJobs() (int, error)
	DeleteJob(rid uuid.UUID) error
	SelectJob(rid uuid.UUID) (*model.Job, error)
	SelectAllJobs(lastID int, entries int) ([]*model.Job, error)
	SelectAllJobsByWorkerRID(workerRid uuid.UUID, lastID int, entries int) ([]*model.Job, error)
	SelectAllJobsBySearch(search string, lastID int, entries int) ([]*model.Job, error)
	// Job Archive
	AddRetentionArchive(retention time.Duration) error
	RemoveRetentionArchive() error
	SelectJobFromArchive(rid uuid.UUID) (*model.Job, error)
	SelectAllJobsFromArchive(lastID int, entries int) ([]*model.Job, error)
	SelectAllJobsFromArchiveBySearch(search string, lastID int, entries int) ([]*model.Job, error)
}

JobDBHandlerFunctions defines the interface for Job database operations.

type MasterDBHandler added in v1.6.0

type MasterDBHandler struct {
	// contains filtered or unexported fields
}

MasterDBHandler implements MasterDBHandlerFunctions and holds the database connection.

func NewMasterDBHandler added in v1.6.0

func NewMasterDBHandler(dbConnection *helper.Database, withTableDrop bool) (*MasterDBHandler, error)

NewMasterDBHandler creates a new instance of MasterDBHandler. It initializes the database connection and creates the master table if it does not exist.

func (MasterDBHandler) CheckTableExistance added in v1.6.0

func (r MasterDBHandler) CheckTableExistance() (bool, error)

CheckTableExistance checks if the 'master' table exists in the database. It returns true if the table exists, otherwise false.

func (MasterDBHandler) CreateTable added in v1.6.0

func (r MasterDBHandler) CreateTable() error

CreateTable creates the 'master' and 'master_archive' tables in the database. If the tables already exist, it does not create them again. It also creates a trigger for notifying events on the table and all necessary indexes.

func (MasterDBHandler) DropTable added in v1.6.0

func (r MasterDBHandler) DropTable() error

DropTable drops the 'master' and 'master_archive' tables from the database.

func (MasterDBHandler) SelectMaster added in v1.6.0

func (r MasterDBHandler) SelectMaster() (*model.Master, error)

SelectMaster retrieves the current master entry from the database.

func (MasterDBHandler) UpdateMaster added in v1.6.0

func (r MasterDBHandler) UpdateMaster(worker *model.Worker, settings *model.MasterSettings) (*model.Master, error)

UpdateMaster updates the master entry with the given worker's ID and settings. It locks the row for update to ensure that only one worker can update the master at a time. It returns the old master entry if it was successfully updated, or nil if no update was done.

type MasterDBHandlerFunctions added in v1.6.0

type MasterDBHandlerFunctions interface {
	CheckTableExistance() (bool, error)
	CreateTable() error
	DropTable() error
	UpdateMaster(worker *model.Worker, settings *model.MasterSettings) (*model.Master, error)
	SelectMaster() (*model.Master, error)
}

MasterDBHandlerFunctions defines the interface for Master database operations.

type QueuerListener

type QueuerListener struct {
	Listener *pq.Listener
	Channel  string
}

func NewQueuerDBListener

func NewQueuerDBListener(dbConfig *helper.DatabaseConfiguration, channel string) (*QueuerListener, error)

NewQueuerDBListener creates a new QueuerListener instance. It initializes a PostgreSQL listener for the specified channel using the provided database configuration. The listener will automatically reconnect if the connection is lost, with a 10-second timeout and a 1-minute interval for reconnection attempts. If an error occurs during the creation of the listener, it returns an error. The listener will log any errors encountered during listening.

func (*QueuerListener) Listen

func (l *QueuerListener) Listen(ctx context.Context, cancel context.CancelFunc, notifyFunction func(data string))

Listen listens for events on the specified channel and processes them. It takes a context for cancellation, a cancel function to stop listening, and a notifyFunction that is called with the event data when an event is received. The listener pings the connection every 90 seconds and cancels the context if the ping fails; any ping error is logged. The notifyFunction is called in a separate goroutine to avoid blocking the listener. When the context is done, the listener stops listening and returns.

func (*QueuerListener) ListenWithTimeout added in v1.9.0

func (l *QueuerListener) ListenWithTimeout(ctx context.Context, cancel context.CancelFunc, notifyFunction func(data string), pingTimeout time.Duration)

ListenWithTimeout is similar to Listen but allows configuring the ping timeout interval. This is primarily used for testing to avoid waiting the full 90 seconds.

type WorkerDBHandler

type WorkerDBHandler struct {
	// contains filtered or unexported fields
}

WorkerDBHandler implements WorkerDBHandlerFunctions and holds the database connection.

func NewWorkerDBHandler

func NewWorkerDBHandler(dbConnection *helper.Database, withTableDrop bool) (*WorkerDBHandler, error)

NewWorkerDBHandler creates a new instance of WorkerDBHandler. It initializes the database connection and optionally drops the existing worker table. If withTableDrop is true, it will drop the existing worker table before creating a new one.

func (WorkerDBHandler) CheckTableExistance

func (r WorkerDBHandler) CheckTableExistance() (bool, error)

CheckTableExistance checks if the 'worker' table exists in the database. It returns true if the table exists, otherwise false.

func (WorkerDBHandler) CreateTable

func (r WorkerDBHandler) CreateTable() error

CreateTable creates the 'worker' table in the database if it doesn't already exist. It defines the structure of the table with appropriate columns and types. If the table already exists, it will not create it again. It also creates necessary indexes for efficient querying.

func (WorkerDBHandler) DeleteStaleWorkers added in v1.24.0

func (r WorkerDBHandler) DeleteStaleWorkers(deleteThreshold time.Duration) (int, error)

DeleteStaleWorkers deletes workers that have been in STOPPED status for longer than the deleteThreshold. It returns the number of workers that were deleted.

func (WorkerDBHandler) DeleteWorker

func (r WorkerDBHandler) DeleteWorker(rid uuid.UUID) error

DeleteWorker deletes a worker record from the database based on its RID. It removes the worker from the database and returns an error if the deletion fails.

func (WorkerDBHandler) DropTable

func (r WorkerDBHandler) DropTable() error

DropTable drops the 'worker' table from the database. It will remove the table and all its data. This operation is irreversible, so it should be used with caution. It is used during testing or when resetting the database schema. If the table does not exist, it will not return an error.

func (WorkerDBHandler) InsertWorker

func (r WorkerDBHandler) InsertWorker(worker *model.Worker) (*model.Worker, error)

InsertWorker inserts a new worker record with name, options and max concurrency into the database. It returns the newly created worker with an automatically generated RID. If the insertion fails, it returns an error.

func (WorkerDBHandler) SelectAllConnections added in v1.9.0

func (r WorkerDBHandler) SelectAllConnections() ([]*model.Connection, error)

SelectAllConnections retrieves all active connections from the database. It returns a slice of Connection records. If the query fails, it returns an error.

func (WorkerDBHandler) SelectAllWorkers

func (r WorkerDBHandler) SelectAllWorkers(lastID int, entries int) ([]*model.Worker, error)

SelectAllWorkers retrieves a paginated list of all workers. It returns a slice of worker records, ordered by creation date in descending order. It returns workers that were created before the specified lastID, or the newest workers if lastID is 0.

func (WorkerDBHandler) SelectAllWorkersBySearch

func (r WorkerDBHandler) SelectAllWorkersBySearch(search string, lastID int, entries int) ([]*model.Worker, error)

SelectAllWorkersBySearch retrieves a paginated list of workers, filtered by search string. It searches across 'queue_name', 'name', and 'status' fields. The search is case-insensitive and uses ILIKE for partial matches. It returns a slice of worker records, ordered by creation date in descending order. It returns workers that were created before the specified lastID, or the newest workers if lastID is 0.

func (WorkerDBHandler) SelectWorker

func (r WorkerDBHandler) SelectWorker(rid uuid.UUID) (*model.Worker, error)

SelectWorker retrieves a single worker record from the database based on its RID. It returns the worker record. If the worker is not found or an error occurs during the query, it returns an error.

func (WorkerDBHandler) UpdateStaleWorkers added in v1.9.0

func (r WorkerDBHandler) UpdateStaleWorkers(staleThreshold time.Duration) (int, error)

UpdateStaleWorkers updates all stale workers to STOPPED status based on the provided threshold. It returns the number of workers that were updated. Workers are considered stale if they have READY or RUNNING status and their updated_at timestamp is older than the threshold.

func (WorkerDBHandler) UpdateWorker

func (r WorkerDBHandler) UpdateWorker(worker *model.Worker) (*model.Worker, error)

UpdateWorker updates an existing worker record in the database based on its RID. It updates the worker's name, options, available tasks, next interval functions, max concurrency, and status. It returns the updated worker record with an automatically updated updated_at timestamp. If the update fails, it returns an error.

type WorkerDBHandlerFunctions

type WorkerDBHandlerFunctions interface {
	CheckTableExistance() (bool, error)
	CreateTable() error
	DropTable() error
	InsertWorker(worker *model.Worker) (*model.Worker, error)
	UpdateWorker(worker *model.Worker) (*model.Worker, error)
	UpdateStaleWorkers(staleThreshold time.Duration) (int, error)
	DeleteWorker(rid uuid.UUID) error
	DeleteStaleWorkers(deleteThreshold time.Duration) (int, error)
	SelectWorker(rid uuid.UUID) (*model.Worker, error)
	SelectAllWorkers(lastID int, entries int) ([]*model.Worker, error)
	SelectAllWorkersBySearch(search string, lastID int, entries int) ([]*model.Worker, error)
	// Connections
	SelectAllConnections() ([]*model.Connection, error)
}

WorkerDBHandlerFunctions defines the interface for Worker database operations.
