services

package
v0.0.0-...-7ed8402 Latest
Warning

This package is not in the latest version of its module.

Published: Nov 7, 2023 License: MIT Imports: 19 Imported by: 3

Documentation

Overview

Mediation layer between the server and database queries.

Logic that's not related to validating request input/turning errors into HTTP responses should go here.

Constants

This section is empty.

Variables

var DefaultTimeout = 5 * time.Minute

DefaultTimeout is the default amount of time a JobProcessor should wait for a job to complete, once it's been sent to the downstream server.

var ErrFailedDecrement = fmt.Errorf("could not decrement queued job counter; job may have been archived or attempt number may not match the database")
var UnavailableSleepFactor = 500

UnavailableSleepFactor determines how long the application should sleep between 503 Service Unavailable downstream responses.
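
The source does not state the unit or the growth curve behind this factor, so the following is only a sketch of how such a value could drive a backoff between 503 responses. It assumes the factor is a number of milliseconds and that the wait doubles with each failed attempt; `sleepFactor`, `backoffMillis`, and the cap of 10 doublings are hypothetical names and choices, not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// sleepFactor mirrors the documented value of UnavailableSleepFactor.
// Treating it as milliseconds is an assumption.
const sleepFactor = 500

// backoffMillis is a hypothetical helper: it doubles the base wait for each
// failed attempt and caps the exponent so the result stays bounded.
func backoffMillis(failedAttempts int32) int64 {
	if failedAttempts < 0 {
		failedAttempts = 0
	}
	if failedAttempts > 10 {
		failedAttempts = 10
	}
	return int64(sleepFactor) << uint(failedAttempts)
}

func main() {
	for attempts := int32(0); attempts < 4; attempts++ {
		d := time.Duration(backoffMillis(attempts)) * time.Millisecond
		fmt.Printf("after %d failed attempts: sleep %v\n", attempts, d)
	}
}
```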

Functions

func ArchiveStuckJobs

func ArchiveStuckJobs(ctx context.Context, logger log.Logger, olderThan time.Duration) error

ArchiveStuckJobs marks as failed any queued jobs with an updated_at timestamp older than the olderThan value.
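
The cutoff comparison described above can be sketched in memory. This is an illustration of the "older than `olderThan`" rule only, not the package's database query; the `job` type, `stuckIDs`, and `demoStuck` are hypothetical stand-ins.

```go
package main

import (
	"fmt"
	"time"
)

// job is a hypothetical stand-in for a row in queued_jobs.
type job struct {
	ID        string
	UpdatedAt time.Time
}

// stuckIDs returns the IDs of jobs whose UpdatedAt is earlier than
// now minus olderThan.
func stuckIDs(jobs []job, now time.Time, olderThan time.Duration) []string {
	cutoff := now.Add(-olderThan)
	var ids []string
	for _, j := range jobs {
		if j.UpdatedAt.Before(cutoff) {
			ids = append(ids, j.ID)
		}
	}
	return ids
}

// demoStuck runs stuckIDs against two fixed rows with a 7 minute threshold.
func demoStuck() []string {
	now := time.Date(2023, time.November, 7, 12, 0, 0, 0, time.UTC)
	jobs := []job{
		{ID: "job_a", UpdatedAt: now.Add(-1 * time.Minute)},
		{ID: "job_b", UpdatedAt: now.Add(-10 * time.Minute)},
	}
	return stuckIDs(jobs, now, 7*time.Minute)
}

func main() {
	fmt.Println(demoStuck())
}
```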

func Enqueue

Enqueue creates a new queued job with the given ID and fields. It returns sql.ErrNoRows if the `name` does not exist in the jobs table; otherwise it returns the new QueuedJob.
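
A caller will usually want to tell "unknown job name" apart from other failures. The sketch below shows that check with `errors.Is` and `sql.ErrNoRows` from the standard library. Because the signature of Enqueue is not shown here, `enqueue`, `knownJobs`, and `describeEnqueue` are hypothetical stand-ins rather than the package's API.

```go
package main

import (
	"database/sql"
	"errors"
	"fmt"
)

// knownJobs is a hypothetical stand-in for the jobs table.
var knownJobs = map[string]bool{"send-email": true}

// enqueue is a hypothetical stand-in for Enqueue: it fails with a wrapped
// sql.ErrNoRows when the job name is not registered.
func enqueue(name string) error {
	if !knownJobs[name] {
		return fmt.Errorf("enqueue %q: %w", name, sql.ErrNoRows)
	}
	return nil
}

// describeEnqueue maps the error from enqueue to a caller-facing outcome.
func describeEnqueue(name string) string {
	err := enqueue(name)
	switch {
	case err == nil:
		return "queued"
	case errors.Is(err, sql.ErrNoRows):
		return "unknown job name"
	default:
		return "internal error"
	}
}

func main() {
	fmt.Println(describeEnqueue("send-email"))
	fmt.Println(describeEnqueue("no-such-job"))
}
```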

func HandleStatusCallback

func HandleStatusCallback(ctx context.Context, logger log.Logger, id types.PrefixUUID, name string, status newmodels.ArchivedJobStatus, attempt int16, retryable bool) error

HandleStatusCallback updates a queued job with the provided status and the attempts remaining. In most cases the job is either inserted into archived_jobs and removed from queued_jobs, or has its attempts counter decremented in queued_jobs.

This can return an error if any of the following happens: the archived_job already exists, the queued job no longer exists by the time you attempt to delete it, or the number of attempts for the queued job doesn't match the passed-in value (slow).
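
The two outcomes described above (archive the job, or decrement its attempts counter) can be pictured as a small decision function. The exact rule is not given in this documentation, so the sketch assumes that a success always archives, and that a failure is retried only when it is retryable and more than one attempt remains. `nextStep` and the status strings are hypothetical.

```go
package main

import "fmt"

// nextStep is a hypothetical decision helper. It assumes attempt counts the
// attempts remaining including the one that just ran.
func nextStep(status string, attempt int16, retryable bool) string {
	if status == "succeeded" {
		return "archive" // move to archived_jobs, delete from queued_jobs
	}
	if !retryable || attempt <= 1 {
		return "archive" // no retries left, archive as failed
	}
	return "decrement" // keep in queued_jobs with one fewer attempt
}

func main() {
	fmt.Println(nextStep("succeeded", 3, true))
	fmt.Println(nextStep("failed", 3, true))
	fmt.Println(nextStep("failed", 1, true))
	fmt.Println(nextStep("failed", 3, false))
}
```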

func WatchStuckJobs

func WatchStuckJobs(ctx context.Context, logger log.Logger, interval time.Duration, olderThan time.Duration)

WatchStuckJobs polls the queued_jobs table for stuck jobs (defined as in-progress jobs that haven't been updated in olderThan time), and marks them as failed.
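
A polling loop of this kind is commonly built on `time.Ticker` and a cancellable context. The sketch below shows that pattern under the assumption that the watcher runs a check once per interval until the context is done; `watch` and `demoWatch` are hypothetical, and the check is a counter rather than a database query.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// watch calls check once per interval until ctx is cancelled.
func watch(ctx context.Context, interval time.Duration, check func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			check()
		}
	}
}

// demoWatch runs the loop for a short, fixed window and reports how many
// times the check ran.
func demoWatch() int {
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
	runs := 0
	watch(ctx, 10*time.Millisecond, func() { runs++ })
	return runs
}

func main() {
	fmt.Println("checks run:", demoWatch())
}
```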

Types

type DownstreamHandler

type DownstreamHandler struct {
	log.Logger
	// contains filtered or unexported fields
}

func NewDownstreamHandler

func NewDownstreamHandler(logger log.Logger, downstreamUrl string, downstreamPassword string) *DownstreamHandler

func (*DownstreamHandler) Handle

type Handler

type Handler interface {
	log.Logger
	// Handle is responsible for notifying some downstream thing that there is
	// work to be done.
	Handle(context.Context, *newmodels.QueuedJob) error
}
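
To show the shape of an implementation, here is a minimal sketch of a type that satisfies a Handle method like the one above. It cannot import the real package, so `QueuedJob` and `Handler` are simplified local stand-ins (the embedded log.Logger is omitted), and `recordingHandler` is a hypothetical handler that records jobs instead of notifying a downstream server.

```go
package main

import (
	"context"
	"fmt"
)

// QueuedJob is a simplified stand-in for newmodels.QueuedJob.
type QueuedJob struct {
	ID   string
	Name string
}

// Handler is a local stand-in for the interface above, without log.Logger.
type Handler interface {
	Handle(context.Context, *QueuedJob) error
}

// recordingHandler is a hypothetical Handler that remembers what it was
// asked to do, which is handy in tests.
type recordingHandler struct {
	seen []string
}

// Handle records the job unless the context has already been cancelled.
func (h *recordingHandler) Handle(ctx context.Context, qj *QueuedJob) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	h.seen = append(h.seen, qj.Name+"/"+qj.ID)
	return nil
}

// demoHandler sends one job through the interface and returns what was seen.
func demoHandler() []string {
	h := &recordingHandler{}
	var handler Handler = h
	job := &QueuedJob{ID: "job_123", Name: "send-email"}
	if err := handler.Handle(context.Background(), job); err != nil {
		return nil
	}
	return h.seen
}

func main() {
	fmt.Println(demoHandler())
}
```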

type JobProcessor

type JobProcessor struct {
	log.Logger
	// Amount of time we should wait for the Handler to perform the work before
	// marking the job as failed.
	Timeout time.Duration

	Handler Handler
}

JobProcessor is the default implementation of the Worker interface.

func NewJobProcessor

func NewJobProcessor(h Handler) *JobProcessor

NewJobProcessor creates a services.JobProcessor that makes requests to the downstream URL.

By default the Client uses Basic Auth with "jobs" as the username, and the configured password as the password.

If the downstream server does not hit the callback, jobs sent to the downstream server are timed out and marked as failed after DefaultTimeout has elapsed.
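
The Basic Auth convention described above ("jobs" as the username, the configured password as the password) can be sketched with the standard net/http package. This is not the package's client; `newDownstreamRequest`, `demoAuth`, the example URL, the JSON content type, and the empty JSON body are all assumptions made for illustration.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"net/http"
)

// newDownstreamRequest builds a POST request that authenticates with the
// fixed username "jobs" and the supplied password.
func newDownstreamRequest(ctx context.Context, url, password string, body []byte) (*http.Request, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.SetBasicAuth("jobs", password)
	req.Header.Set("Content-Type", "application/json") // assumed content type
	return req, nil
}

// demoAuth builds a request against a placeholder URL and reads the
// credentials back out of its Authorization header.
func demoAuth(password string) (user, pass string, ok bool) {
	req, err := newDownstreamRequest(
		context.Background(),
		"http://downstream.example/jobs/job_123", // hypothetical URL
		password,
		[]byte(`{}`),
	)
	if err != nil {
		return "", "", false
	}
	return req.BasicAuth()
}

func main() {
	user, _, ok := demoAuth("s3cret")
	fmt.Println(user, ok)
}
```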

func (*JobProcessor) DoWork

func (jp *JobProcessor) DoWork(ctx context.Context, qj *newmodels.QueuedJob) error

DoWork sends the given queued job to the downstream service, then waits for it to complete.
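
The "send, then wait for completion" step can be sketched as a select over a completion channel, a timer, and the context. How the package learns that a job finished is not shown here, so the `done` channel, `waitForJob`, `errTimeout`, and `demoWait` are hypothetical; the sketch only illustrates waiting with a timeout.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errTimeout is a hypothetical sentinel for a job that never reported back.
var errTimeout = errors.New("job timed out")

// waitForJob blocks until the job reports a result on done, the timeout
// elapses, or ctx is cancelled, whichever happens first.
func waitForJob(ctx context.Context, done <-chan error, timeout time.Duration) error {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case err := <-done:
		return err
	case <-timer.C:
		return errTimeout
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demoWait simulates a job whose completion signal either arrives
// immediately or never arrives.
func demoWait(callbackArrives bool) string {
	done := make(chan error, 1)
	if callbackArrives {
		done <- nil
	}
	err := waitForJob(context.Background(), done, 20*time.Millisecond)
	switch {
	case err == nil:
		return "completed"
	case errors.Is(err, errTimeout):
		return "timed out"
	default:
		return err.Error()
	}
}

func main() {
	fmt.Println(demoWait(true))
	fmt.Println(demoWait(false))
}
```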

func (*JobProcessor) Sleep

func (jp *JobProcessor) Sleep(failedAttempts int32) time.Duration
