redis

package module
v0.0.0-...-192d75f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 3, 2024 License: MIT Imports: 14 Imported by: 0

README

Overview

Go Reference Go codecov Go Report Card Mit License

Package redis is a redis backed implementation of the queue interface

Installation

go get -u -v github.com/gopi-frame/queue/driver/redis

Import

import "github.com/gopi-frame/queue/driver/redis"

Usage

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/gopi-frame/queue"
    "github.com/gopi-frame/queue/driver/redis"
    redislib "github.com/redis/go-redis/v9"
)

// CustomJob is the example job type enqueued and processed in main.
// It embeds queue.Job; the `json:"-"` tag keeps the embedded value out of
// encoding/json output.
// NOTE(review): presumably the queue serializes jobs as JSON — confirm.
type CustomJob struct {
    queue.Job `json:"-"`
}

// Handle runs the job. The body here is a placeholder that always reports
// success; a non-nil error returned from Handle is what sends the job down
// the Release/Failed branch of the polling loop in main.
func (c *CustomJob) Handle() error {
    // do something
    return nil
}

// Failed receives the error returned by Handle. In the polling loop in main
// it is called when Handle fails and the job is not released back to the
// queue for another attempt.
func (c *CustomJob) Failed(err error) {
    // handle failed job
}

// main connects to a local redis server, builds a redis-backed queue for
// CustomJob, enqueues three jobs, prints the queue size, and then polls the
// queue forever, handling each job it receives.
func main() {
    // Options belongs to the go-redis client package (imported as redislib),
    // not to the queue driver package (imported as redis).
    db := redislib.NewClient(&redislib.Options{
        Addr: "localhost:6379",
    })
    // Fail fast if the server is unreachable.
    if err := db.Ping(context.Background()).Err(); err != nil {
        panic(err)
    }
    q := redis.NewQueue(&redis.Config{
        DB:   db,
        Name: "queue",
        Job:  new(CustomJob),
    })
    q.Enqueue(new(CustomJob))
    q.Enqueue(new(CustomJob))
    q.Enqueue(new(CustomJob))
    fmt.Println("count:", q.Count()) // Output: count: 3
    for {
        job, ok := q.Dequeue()
        if !ok {
            // Nothing was dequeued; back off briefly before polling again.
            time.Sleep(100 * time.Millisecond)
            continue
        }
        if err := job.Handle(); err != nil {
            // GetAttempts is a method, so it must be called before comparing.
            // Retry while attempts remain; otherwise report the failure.
            if job.GetQueueable().GetAttempts() < job.GetMaxAttempts() {
                q.Release(job)
            } else {
                job.Failed(err)
            }
            continue
        }
        // The job succeeded; acknowledge it.
        q.Ack(job)
    }
}

Documentation

Overview

Package redis is a redis queue driver for queue package.

Index

Constants

View Source
const QueueJobIDKeyFormat = "GOPI:QUEUE:INDEX:%s"

QueueJobIDKeyFormat is the redis key format for storing job IDs in a sorted set

View Source
const QueueJobItemKeyFormat = "GOPI:QUEUE:ITEMS:%s"

QueueJobItemKeyFormat is the redis key format for storing job hash in a hash table

Variables

This section is empty.

Functions

func Open

func Open(options map[string]any) (queuecontract.Queue, error)

Open is a convenience function that calls Driver.Open.

Types

type Config

// Config holds the settings for a redis queue.
type Config struct {
	// DB is the redis client used by the queue (set by WithDB).
	DB   rediscontract.Client
	// Name is the queue name (set by WithName).
	Name string
	// Job is the job type for the queue (set by WithJob).
	Job  queuecontract.Job
}

Config for redis queue

func NewConfig

func NewConfig(db rediscontract.Client, name string, job queuecontract.Job) *Config

NewConfig creates a new redis queue config

func (*Config) Apply

func (c *Config) Apply(opts ...Option) error

func (*Config) Valid

func (c *Config) Valid() error

Valid validates the config

type Driver

// Driver is the redis queue driver; its Open method opens a redis queue
// from an options map.
type Driver struct {
}

Driver is the redis queue driver

func (Driver) Open

func (Driver) Open(options map[string]any) (queuecontract.Queue, error)

Open opens the redis queue

type Job

// Job is the redis queueable wrapper around a queue.Job payload.
type Job struct {
	// ID identifies the job (see GetID).
	ID          uuid.UUID `json:"id"`
	// Queue is the queue the job belongs to (see GetQueue).
	Queue       string    `json:"queue"`
	// Payload is the wrapped job (see GetPayload).
	Payload     queue.Job `json:"payload"`
	// Attempts is how many times the job has been attempted (see GetAttempts).
	Attempts    int       `json:"attempts"`
	// NOTE(review): presumably the earliest time the job may be dequeued — confirm.
	AvailableAt time.Time `json:"available_at"`
	// NOTE(review): presumably the time the job was created — confirm.
	CreatedAt   time.Time `json:"created_at"`
}

Job is redis queueable job wrapper

func NewJob

func NewJob(queue string, payload queue.Job) *Job

NewJob creates new redis queueable job

func (*Job) GetAttempts

func (j *Job) GetAttempts() int

GetAttempts returns how many times the job has been attempted

func (*Job) GetID

func (j *Job) GetID() string

GetID returns the job ID

func (*Job) GetPayload

func (j *Job) GetPayload() queue.Job

GetPayload returns the job payload

func (*Job) GetQueue

func (j *Job) GetQueue() string

GetQueue returns the job queue

type LuaScript

// LuaScript is the set of Lua scripts used by the redis queue; its Enqueue,
// Dequeue and Remove methods each return a script as a string.
type LuaScript struct{}

LuaScript is a Lua script set for redis queue

func (LuaScript) Dequeue

func (LuaScript) Dequeue() string

Dequeue returns the Lua script for popping a job

Parameters:

KEYS[1] = index sorted set key
KEYS[2] = hash table key
ARGV[1] = number of jobs to pop
ARGV[2] = current time

func (LuaScript) Enqueue

func (LuaScript) Enqueue() string

Enqueue returns the Lua script for pushing a job

Parameters:

KEYS[1] = index sorted set key
KEYS[2] = hash table key
ARGV[1] = score
ARGV[2] = job id
ARGV[3] = job hash

func (LuaScript) Remove

func (LuaScript) Remove() string

Remove returns the Lua script for removing a job

Parameters:

KEYS[1] = index sorted set key
KEYS[2] = hash table key
ARGV[1] = job id

type Option

// Option is a functional option that configures a Config and may return an
// error (see WithDB, WithJob and WithName).
type Option func(cfg *Config) error

func WithDB

func WithDB(db rediscontract.Client) Option

WithDB sets the redis client. If db is nil, then db is not changed.

func WithJob

func WithJob(job queuecontract.Job) Option

WithJob sets the job type. If job is nil, then an error is returned.

func WithName

func WithName(name string) Option

WithName sets the queue name. If name is empty, then name is not changed.

type Queue

// Queue is a redis queue implementation.
type Queue struct {
	// contains filtered or unexported fields
}

Queue is a redis queue implementation

func NewQueue

func NewQueue(cfg *Config, opts ...Option) *Queue

NewQueue creates a new redis queue

func (*Queue) Ack

func (q *Queue) Ack(_ queuecontract.Job)

Ack acknowledges a job

func (*Queue) Count

func (q *Queue) Count() int64

Count returns the number of jobs in the queue

func (*Queue) Dequeue

func (q *Queue) Dequeue() (queuecontract.Job, bool)

Dequeue removes and returns a job from the queue

func (*Queue) Empty

func (q *Queue) Empty() bool

Empty returns true if queue is empty

func (*Queue) Enqueue

func (q *Queue) Enqueue(job queuecontract.Job) (queuecontract.Job, bool)

Enqueue adds a job to the queue

func (*Queue) Name

func (q *Queue) Name() string

Name returns the queue name

func (*Queue) Release

func (q *Queue) Release(job queuecontract.Job)

Release releases a job back to the queue

func (*Queue) Remove

func (q *Queue) Remove(job queuecontract.Job)

Remove removes a job from the queue

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL