Documentation ¶
Overview ¶
Mediation layer between the server and database queries.
Logic that is not related to validating request input or turning errors into HTTP responses should go here.
Index ¶
- Variables
- func ArchiveStuckJobs(ctx context.Context, logger log.Logger, olderThan time.Duration) error
- func Enqueue(ctx context.Context, db *newmodels.Queries, params newmodels.EnqueueJobParams) (newmodels.QueuedJob, error)
- func HandleStatusCallback(ctx context.Context, logger log.Logger, id types.PrefixUUID, name string, ...) error
- func WatchStuckJobs(ctx context.Context, logger log.Logger, interval time.Duration, ...)
- type DownstreamHandler
- type Handler
- type JobProcessor
Constants ¶
This section is empty.
Variables ¶
var DefaultTimeout = 5 * time.Minute
DefaultTimeout is the default amount of time a JobProcessor should wait for a job to complete, once it's been sent to the downstream server.
var ErrFailedDecrement = fmt.Errorf("could not decrement queued job counter; job may have been archived or attempt number may not match the database")
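ErrFailedDecrement is returned when the queued job counter could not be decremented: the job may have been archived, or the attempt number may not match the database.
UnavailableSleepFactor determines how long the application should sleep between 503 Service Unavailable downstream responses.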
Functions ¶
func ArchiveStuckJobs ¶
func ArchiveStuckJobs(ctx context.Context, logger log.Logger, olderThan time.Duration) error
ArchiveStuckJobs marks as failed any queued jobs with an updated_at timestamp older than the olderThan value.
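The cutoff rule described above can be illustrated with a minimal in-memory sketch. It is not the package's implementation (which presumably runs a database query): the queuedJob type, the stuckJobIDs helper, and the sample data are all hypothetical names introduced only for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// queuedJob is a hypothetical stand-in for a row in queued_jobs.
type queuedJob struct {
	ID        string
	UpdatedAt time.Time
}

// stuckJobIDs returns the IDs of jobs whose UpdatedAt is more than
// olderThan before now. This mirrors the rule in the text; the real
// function is assumed to apply it in SQL rather than in memory.
func stuckJobIDs(jobs []queuedJob, now time.Time, olderThan time.Duration) []string {
	cutoff := now.Add(-olderThan)
	var ids []string
	for _, j := range jobs {
		if j.UpdatedAt.Before(cutoff) {
			ids = append(ids, j.ID)
		}
	}
	return ids
}

// exampleStuckIDs applies the rule to fixed sample data with a
// 5 minute threshold.
func exampleStuckIDs() []string {
	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	jobs := []queuedJob{
		{ID: "job_a", UpdatedAt: now.Add(-10 * time.Minute)}, // stale
		{ID: "job_b", UpdatedAt: now.Add(-1 * time.Minute)},  // recent
	}
	return stuckJobIDs(jobs, now, 5*time.Minute)
}

func main() {
	fmt.Println(exampleStuckIDs())
}
```

Passing the current time in as a parameter keeps the sketch deterministic and easy to test.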
func Enqueue ¶
func Enqueue(ctx context.Context, db *newmodels.Queries, params newmodels.EnqueueJobParams) (newmodels.QueuedJob, error)
Enqueue creates a new queued job with the given ID and fields. If the `name` does not exist in the jobs table, Enqueue returns sql.ErrNoRows; otherwise it returns the new QueuedJob.
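A caller might distinguish the sql.ErrNoRows case from other failures roughly as follows. This is a sketch only: enqueueOutcome and wrappedNoRows are hypothetical helpers, and the error passed in stands for whatever an Enqueue call returned. errors.Is is used so the check also works if the error has been wrapped, which is an assumption rather than documented behavior.

```go
package main

import (
	"database/sql"
	"errors"
	"fmt"
)

// enqueueOutcome classifies the error returned by an Enqueue-style call.
// The returned labels are illustrative, not part of the package.
func enqueueOutcome(err error) string {
	switch {
	case err == nil:
		return "queued"
	case errors.Is(err, sql.ErrNoRows):
		// Per the docs above: the job name is not in the jobs table.
		return "unknown job name"
	default:
		return "internal error"
	}
}

// wrappedNoRows simulates sql.ErrNoRows wrapped with extra context.
func wrappedNoRows() error {
	return fmt.Errorf("enqueue: %w", sql.ErrNoRows)
}

func main() {
	fmt.Println(enqueueOutcome(nil))
	fmt.Println(enqueueOutcome(wrappedNoRows()))
	fmt.Println(enqueueOutcome(errors.New("connection refused")))
}
```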
func HandleStatusCallback ¶
func HandleStatusCallback(ctx context.Context, logger log.Logger, id types.PrefixUUID, name string, status newmodels.ArchivedJobStatus, attempt int16, retryable bool) error
HandleStatusCallback updates a queued job with the provided status and the attempts remaining. Typically the job is either inserted into archived_jobs and removed from queued_jobs, or its attempts counter is decremented in queued_jobs.
HandleStatusCallback can return an error if any of the following happens:
- the archived_job already exists
- the queued job no longer exists by the time you attempt to delete it
- the number of attempts for the queued job doesn't match the passed-in value (slow)
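The choice between archiving a job and decrementing its attempts counter could look something like the sketch below. The exact rule is an assumption, not taken from the package: here a failed, retryable job with more than one attempt left is decremented, and everything else is archived. The status strings "succeeded" and "failed" and the nextAction helper are hypothetical.

```go
package main

import "fmt"

// action names the two outcomes described in the text.
type action string

const (
	archive   action = "archive"   // insert into archived_jobs, delete from queued_jobs
	decrement action = "decrement" // keep in queued_jobs with one fewer attempt
)

// nextAction is an assumed decision rule: only a failed, retryable job
// with attempts remaining stays in the queue. The status values are
// placeholders for newmodels.ArchivedJobStatus.
func nextAction(status string, attempt int16, retryable bool) action {
	if status == "failed" && retryable && attempt > 1 {
		return decrement
	}
	return archive
}

func main() {
	fmt.Println(nextAction("succeeded", 3, true))
	fmt.Println(nextAction("failed", 3, true))
	fmt.Println(nextAction("failed", 1, true))
	fmt.Println(nextAction("failed", 3, false))
}
```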
Types ¶
type DownstreamHandler ¶
func NewDownstreamHandler ¶
func NewDownstreamHandler(logger log.Logger, downstreamUrl string, downstreamPassword string) *DownstreamHandler
type JobProcessor ¶
type JobProcessor struct {
	log.Logger

	// Amount of time we should wait for the Handler to perform the work before
	// marking the job as failed.
	Timeout time.Duration

	Handler Handler
}
JobProcessor is the default implementation of the Worker interface.
func NewJobProcessor ¶
func NewJobProcessor(h Handler) *JobProcessor
NewJobProcessor creates a services.JobProcessor that makes requests to the downstream URL.
By default the Client uses Basic Auth with "jobs" as the username, and the configured password as the password.
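The Basic Auth default described above can be reproduced with the standard library. This is a minimal sketch, not the package's client: newDownstreamRequest, the POST method, the example URL, and the password are assumptions chosen for illustration. Only the "jobs" username comes from the documentation.

```go
package main

import (
	"fmt"
	"net/http"
)

// newDownstreamRequest builds a request carrying Basic Auth credentials
// with "jobs" as the username. The HTTP method is an assumption.
func newDownstreamRequest(url, password string) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, url, nil)
	if err != nil {
		return nil, err
	}
	req.SetBasicAuth("jobs", password)
	return req, nil
}

// exampleCredentials builds a request for a made-up URL and reads the
// credentials back out of its Authorization header.
func exampleCredentials() (string, string, bool) {
	req, err := newDownstreamRequest("http://downstream.example/v1/jobs/echo/job_123", "hunter2")
	if err != nil {
		return "", "", false
	}
	return req.BasicAuth()
}

func main() {
	user, pass, ok := exampleCredentials()
	fmt.Println(user, pass, ok)
}
```

No request is sent here; the sketch only shows how the header is attached and parsed.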
If the downstream server does not hit the callback, jobs sent to the downstream server are timed out and marked as failed after DefaultTimeout has elapsed.
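The timeout behavior described above can be sketched with a context deadline and a select. This is an illustration under assumptions, not the JobProcessor's code: waitForCallback, the done channel, and the returned labels are hypothetical, and a short timeout stands in for DefaultTimeout.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitForCallback blocks until done is closed (the downstream server hit
// the callback) or the timeout elapses, whichever comes first.
func waitForCallback(ctx context.Context, done <-chan struct{}, timeout time.Duration) string {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	select {
	case <-done:
		return "completed"
	case <-ctx.Done():
		// No callback arrived in time: the job would be marked as failed.
		return "failed"
	}
}

// exampleOutcomes runs one wait that is already finished and one that
// never receives a callback.
func exampleOutcomes() (string, string) {
	finished := make(chan struct{})
	close(finished)
	never := make(chan struct{})
	return waitForCallback(context.Background(), finished, time.Second),
		waitForCallback(context.Background(), never, 10*time.Millisecond)
}

func main() {
	first, second := exampleOutcomes()
	fmt.Println(first, second)
}
```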