storage

package
v0.1.1
Published: Jan 26, 2021 License: GPL-3.0 Imports: 9 Imported by: 0

Documentation

Overview

Package storage is an abstraction/utility layer over Redis.

Constants

const (
	// Each aggregation has a corresponding Redis Hash named in the form
	// "<AggrKeyPrefix><aggregation-id>" and containing various information
	// about the aggregation itself (e.g. its limit).
	AggrKeyPrefix = "aggr:"

	// Job IDs of an individual aggregation exist in a Redis List named
	// in the form "<JobsKeyPrefix><aggregation-id>".
	JobsKeyPrefix = "jobs:"

	// Each Job has a corresponding Redis Hash named in the form
	// "<JobKeyPrefix><job-id>"
	JobKeyPrefix = "job:"

	// CallbackQueue contains the IDs of completed jobs whose callback
	// is yet to be executed.
	// TODO: this introduces coupling with the notifier. See how we can
	// separate it.
	CallbackQueue = "CallbackQueue"

	// RIPQueue contains the IDs of jobs to be deleted.
	RIPQueue = "JobDeletionQueue"
)
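The key layout described in the comments above can be illustrated with a minimal sketch. The constants are copied from this package; the helpers `aggrKey`, `jobsKey` and `jobKey` are hypothetical names introduced only for illustration and are not part of the package's API.

```go
package main

import "fmt"

// Prefixes copied from the package's constants block.
const (
	AggrKeyPrefix = "aggr:"
	JobsKeyPrefix = "jobs:"
	JobKeyPrefix  = "job:"
)

// aggrKey is a hypothetical helper: name of the Redis Hash
// holding an aggregation's metadata.
func aggrKey(aggrID string) string { return AggrKeyPrefix + aggrID }

// jobsKey is a hypothetical helper: name of the Redis key
// holding the job IDs of an aggregation.
func jobsKey(aggrID string) string { return JobsKeyPrefix + aggrID }

// jobKey is a hypothetical helper: name of the Redis Hash
// holding a single job.
func jobKey(jobID string) string { return JobKeyPrefix + jobID }

func main() {
	fmt.Println(aggrKey("videos"), jobsKey("videos"), jobKey("42"))
}
```

Note that `JobsKeyPrefix` (plural) names the per-aggregation collection of job IDs, while `JobKeyPrefix` (singular) names an individual job's Hash.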

Variables

var (

	// ErrEmptyQueue is returned by ZPOP when there is no job in the queue
	ErrEmptyQueue = errors.New("Queue is empty")
	// ErrRetryLater is returned by ZPOP when there are only future jobs in the queue
	ErrRetryLater = errors.New("Retry again later")
	// ErrNotFound is returned by GetJob and GetAggregation when the requested
	// job or aggregation, respectively, is not found in Redis.
	ErrNotFound = errors.New("Not Found")
)
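Since these are sentinel errors, callers would typically branch on them with `errors.Is`. The following is a sketch only: the errors are redeclared locally so the example is self-contained, and `classify` is a hypothetical helper, not something this package provides.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the package's sentinel errors, redeclared here
// only to keep the sketch self-contained.
var (
	ErrEmptyQueue = errors.New("Queue is empty")
	ErrRetryLater = errors.New("Retry again later")
)

// classify is a hypothetical helper that maps the result of a pop
// attempt to the action a worker loop might take.
func classify(err error) string {
	switch {
	case err == nil:
		return "process" // a job was popped
	case errors.Is(err, ErrEmptyQueue):
		return "idle" // nothing queued at all
	case errors.Is(err, ErrRetryLater):
		return "wait" // only future jobs are queued
	default:
		return "fail" // unexpected error, e.g. a connection problem
	}
}

func main() {
	// errors.Is also matches sentinels that were wrapped with %w.
	wrapped := fmt.Errorf("pop: %w", ErrRetryLater)
	fmt.Println(classify(nil), classify(ErrEmptyQueue), classify(wrapped))
}
```

Using `errors.Is` rather than `==` keeps the check working if an intermediate layer wraps the error.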

Functions

func AggregationFromMap added in v0.1.0

func AggregationFromMap(m map[string]string) (job.Aggregation, error)

Types

type Storage

type Storage struct {
	Redis *redis.Client
}

Storage wraps a redis.Client instance.

func New

func New(r *redis.Client) (*Storage, error)

New returns a new Storage that can communicate with Redis. If Redis is unreachable, an error is returned.

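Callers should set the AggrKeyPrefix, JobKeyPrefix and CallbackQueue fields on the returned storage right after creating it. Note that in this version Storage exposes only the Redis field, and AggrKeyPrefix, JobKeyPrefix and CallbackQueue are package-level constants, so this instruction appears to be outdated.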

func (*Storage) AggregationExists

func (s *Storage) AggregationExists(a *job.Aggregation) (bool, error)

AggregationExists checks if the given aggregation exists in Redis. If a non-nil error is returned, the first returned value should be ignored.

func (*Storage) GetAggregation

func (s *Storage) GetAggregation(id string) (*job.Aggregation, error)

GetAggregation fetches the aggregation denoted by id from Redis. In the case of ErrNotFound, the returned aggregation has a valid ID and the default limit.

func (*Storage) GetJob

func (s *Storage) GetJob(id string) (job.Job, error)

GetJob fetches the job with the given id from Redis. In the case of ErrNotFound, the returned job has a valid ID and can be used further.
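The "usable value on ErrNotFound" contract can be sketched with an in-memory stand-in. Everything below is an assumption made for illustration: `Job` is a simplified placeholder for `job.Job`, `store` replaces Redis with a map, and `getOrInit` is a hypothetical caller-side helper.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNotFound is a local copy of the package's sentinel error.
var ErrNotFound = errors.New("Not Found")

// Job is a simplified placeholder for job.Job (fields are assumptions).
type Job struct {
	ID  string
	URL string
}

// store is an in-memory stand-in for the Redis-backed Storage.
type store map[string]Job

// getJob mimics the documented contract: on a miss it returns
// ErrNotFound together with a job whose ID is already set.
func (s store) getJob(id string) (Job, error) {
	if j, ok := s[id]; ok {
		return j, nil
	}
	return Job{ID: id}, ErrNotFound
}

// getOrInit is a hypothetical helper: it reuses the job returned
// alongside ErrNotFound instead of building a new one.
func getOrInit(s store, id, url string) (Job, error) {
	j, err := s.getJob(id)
	if errors.Is(err, ErrNotFound) {
		j.URL = url
		s[id] = j
		return j, nil
	}
	return j, err
}

func main() {
	s := store{}
	j, err := getOrInit(s, "a1", "http://example.com/file")
	fmt.Println(j.ID, j.URL, err)
}
```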

func (*Storage) GetStats

func (s *Storage) GetStats(id string) ([]byte, error)

GetStats fetches stats-prefixed entries from Redis.

func (*Storage) JobExists

func (s *Storage) JobExists(j *job.Job) (bool, error)

JobExists checks if the given job exists in Redis. If a non-nil error is returned, the first returned value should be ignored.

func (*Storage) PopCallback

func (s *Storage) PopCallback() (job.Job, error)

PopCallback attempts to pop a Job from the callback queue. If it succeeds, the job with the popped ID is returned.

func (*Storage) PopJob

func (s *Storage) PopJob(a *job.Aggregation) (job.Job, error)

PopJob attempts to pop a Job for the given aggregation. If it succeeds, the job with the popped ID is returned.

func (*Storage) PopRip

func (s *Storage) PopRip() (job.Job, error)

PopRip fetches a job from the RIPQueue (if any) and reports any errors. If the queue is empty, ErrEmptyQueue is returned. Note: due to the nature of job deletion, the returned job is not guaranteed to be available in Redis.

func (*Storage) QueueJobForDeletion

func (s *Storage) QueueJobForDeletion(id string, delay time.Duration) error

QueueJobForDeletion pushes the provided job ID to RIPQueue and returns any errors. The deletion can be postponed by the given delay.

func (*Storage) QueuePendingCallback

func (s *Storage) QueuePendingCallback(j *job.Job, delay time.Duration) error

QueuePendingCallback sets the state of a job to "Pending", saves it and adds it to its aggregation queue. If a delay > 0 is given, the job is queued with a higher score, so it becomes due later in time.

func (*Storage) QueuePendingDownload

func (s *Storage) QueuePendingDownload(j *job.Job, delay time.Duration) error

QueuePendingDownload sets the state of a job to "Pending", saves it and adds it to its aggregation queue. If a delay > 0 is given, the job is queued with a higher score, so it becomes due later in time.

TODO: should we check that job already exists in redis? maybe do HSET instead?
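The delayed-queue behaviour described above, together with the ErrEmptyQueue and ErrRetryLater errors, can be sketched in memory. This is not the package's implementation: it assumes the score is the Unix timestamp at which a job becomes due, and `delayedQueue`, `push`, `pop` and `demo` are hypothetical names standing in for a Redis sorted set.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"time"
)

// Local copies of the package's sentinel errors.
var (
	ErrEmptyQueue = errors.New("Queue is empty")
	ErrRetryLater = errors.New("Retry again later")
)

// entry pairs a job ID with its score (assumed: due time in Unix seconds).
type entry struct {
	id    string
	score int64
}

// delayedQueue is an in-memory stand-in for a score-ordered Redis queue.
type delayedQueue struct{ entries []entry }

// push queues id with score now+delay; a larger delay gives a higher score.
func (q *delayedQueue) push(id string, now time.Time, delay time.Duration) {
	q.entries = append(q.entries, entry{id, now.Add(delay).Unix()})
	sort.SliceStable(q.entries, func(i, j int) bool {
		return q.entries[i].score < q.entries[j].score
	})
}

// pop returns the lowest-scored ID if it is due, ErrRetryLater if only
// future jobs are queued, and ErrEmptyQueue if nothing is queued.
func (q *delayedQueue) pop(now time.Time) (string, error) {
	if len(q.entries) == 0 {
		return "", ErrEmptyQueue
	}
	head := q.entries[0]
	if head.score > now.Unix() {
		return "", ErrRetryLater
	}
	q.entries = q.entries[1:]
	return head.id, nil
}

// demo pops four times and records either the ID or the error text.
func demo() []string {
	now := time.Unix(1000, 0)
	q := &delayedQueue{}
	q.push("later", now, 30*time.Second)
	q.push("now", now, 0)
	var out []string
	for _, t := range []time.Time{now, now, now.Add(time.Minute), now.Add(time.Minute)} {
		id, err := q.pop(t)
		if err != nil {
			out = append(out, err.Error())
			continue
		}
		out = append(out, id)
	}
	return out
}

func main() {
	for _, r := range demo() {
		fmt.Println(r)
	}
}
```

In this sketch the undelayed job is popped first, the delayed one yields ErrRetryLater until its due time has passed, and a drained queue yields ErrEmptyQueue.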

func (*Storage) RemoveAggregation

func (s *Storage) RemoveAggregation(id string) error

RemoveAggregation deletes the aggregation key from Redis.

func (*Storage) RemoveJob

func (s *Storage) RemoveJob(id string) error

RemoveJob removes the job key from Redis.

func (*Storage) RetryCallback

func (s *Storage) RetryCallback(j *job.Job) error

RetryCallback resets a job's callback state and puts it back into the callback queue. If the job is not found, an error is returned.

func (*Storage) SaveAggregation

func (s *Storage) SaveAggregation(a *job.Aggregation) error

SaveAggregation updates or creates the given aggregation in Redis.

func (*Storage) SaveJob

func (s *Storage) SaveJob(j *job.Job) error

SaveJob updates or creates j in Redis.

TODO: should we check that the corresponding aggregation exists in redis?

func (*Storage) SetStats

func (s *Storage) SetStats(id, stats string, expiration time.Duration) error

SetStats saves stats in Redis.
