redisqueue

package module
v0.0.0-...-879ca03 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 9, 2023 License: MIT Imports: 17 Imported by: 0

README

Redis-queue Go library

Install
import:
- package: github.com/best-expendables-v2/redis-queue
  version: x.x.x
examples: Check examples folder

Documentation

Index

Constants

View Source
const (
	FieldJobPayload = "job"
)

Variables

View Source
var (
	ErrFailedWithUnknownData = errors.New("consume failed with unknown data")
	ErrJobExceedRetryTimes   = errors.New("job exceeds retry times")
	ErrorInValidJobModel     = errors.New("invalid job struct")
	ErrDbEmpty               = errors.New("db connection invalid")
	ErrRedisConnectionEmpty  = errors.New("Redis connection invalid")
)

Functions

func GetGormFromContext

func GetGormFromContext(ctx context.Context) *gorm.DB

func GetRedisClientFromContext

func GetRedisClientFromContext(ctx context.Context) *redis.Client

func LogFailedJob

func LogFailedJob(ctx context.Context, j Job, err error)

func MakeConsumer

func MakeConsumer(
	jobFactory JobFactory,
	handlerFactory HandlerFactory,
	publisher Publisher,
	failJobHandlers []FailHandler,
	middlewareList []HandlerMiddleWare,
) *defaultConsumer

func NewRelicGormWithTransaction

func NewRelicGormWithTransaction(dbConn *gorm.DB) func(next HandleFunc) HandleFunc

func NewRelicToGorm

func NewRelicToGorm(dbConn *gorm.DB) func(next HandleFunc) HandleFunc

func NewRelicToRedis

func NewRelicToRedis(c *redis.Client) func(next HandleFunc) HandleFunc

func NewRmqConn

func NewRmqConn(redisConn *redis.Client) (rmq.Connection, error)

NewRmqConn returns a connection to RedisQueue with redisClient

func NewRmqConnFromRedisConfig

func NewRmqConnFromRedisConfig(redisConfig *RedisConfig) (rmq.Connection, error)

NewRmqConnFromRedisConfig returns a connection to RedisQueue using redisConfig

func SetGormToContext

func SetGormToContext(ctx context.Context, dbConn *gorm.DB) context.Context

func SetRedisClientToContext

func SetRedisClientToContext(ctx context.Context, c *redis.Client) context.Context

func WithNewRelicForConsumer

func WithNewRelicForConsumer(nrApp newrelic.Application) func(next HandleFunc) HandleFunc

Types

type BaseHandler

type BaseHandler struct{}

BaseHandler is the base handler; all handlers should embed it.

func (*BaseHandler) Handle

func (handler *BaseHandler) Handle(_ context.Context, _ Job) error

Handle job

func (*BaseHandler) ShouldRejectOnFailure

func (handler *BaseHandler) ShouldRejectOnFailure(err error) bool

Depending on the error, the job is either moved to the rejected queue or simply skipped (ignored).

func (*BaseHandler) ShouldRetryOnError

func (handler *BaseHandler) ShouldRetryOnError(err error) bool

Determines whether a job that failed with this error should be retried or marked as failed.

type ConsumerManager

type ConsumerManager interface {
	Add(queueName string, consumer rmq.Consumer)
	StartConsuming(queueName string, replicas int, pollDuration time.Duration)
	StopConsuming(queueName string)
}

func NewConsumerManager

func NewConsumerManager() (ConsumerManager, error)

NewConsumerManager returns a ConsumerManager

func NewConsumerManagerFromConfig

func NewConsumerManagerFromConfig(conf *RedisConfig) (ConsumerManager, error)

func NewConsumerManagerWithConnection

func NewConsumerManagerWithConnection(conn rmq.Connection) ConsumerManager

type ConsumerManagerMock

type ConsumerManagerMock struct {
	AddFn            func(queueName string, consumer rmq.Consumer)
	StartConsumingFn func(queueName string, replicas int, pollDuration time.Duration)
	StopConsumingFn  func(queueName string)
}

func (ConsumerManagerMock) Add

func (p ConsumerManagerMock) Add(queueName string, consumer rmq.Consumer)

func (ConsumerManagerMock) StartConsuming

func (p ConsumerManagerMock) StartConsuming(queueName string, replicas int, pollDuration time.Duration)

func (ConsumerManagerMock) StopConsuming

func (p ConsumerManagerMock) StopConsuming(queueName string)

type FailHandler

type FailHandler func(ctx context.Context, j Job, err error)

type HandleFunc

type HandleFunc func(context.Context, Job) error

func LogJob

func LogJob(next HandleFunc) HandleFunc

Log message middleware

type Handler

type Handler interface {
	// Handle processes the job. A nil error means the job was processed successfully;
	// otherwise the job has failed and method FailJob will be called
	Handle(ctx context.Context, job Job) error
	// Should retry on error
	ShouldRetryOnError(err error) bool
	// Should be moved to the rejected queue in case of failure
	ShouldRejectOnFailure(err error) bool
}

type HandlerFactory

type HandlerFactory func(ctx context.Context) Handler

Makes a handler from the context. A brand-new handler is created for every request.

type HandlerMiddleWare

type HandlerMiddleWare func(next HandleFunc) HandleFunc

type HandlerMock

type HandlerMock struct {
	HandleFn                func(ctx context.Context, job Job) error
	ShouldRetryOnErrorFn    func(err error) bool
	ShouldRejectOnFailureFn func(err error) bool
}

func (HandlerMock) Handle

func (handler HandlerMock) Handle(ctx context.Context, job Job) error

func (HandlerMock) ShouldRejectOnFailure

func (handler HandlerMock) ShouldRejectOnFailure(err error) bool

func (HandlerMock) ShouldRetryOnError

func (handler HandlerMock) ShouldRetryOnError(err error) bool

type Job

type Job interface {
	// Get job ID
	GetID() string
	// Get user who triggered job
	GetUserID() string
	// Get trace id of request which triggered job
	GetTraceID() string
	// Set Queue
	OnQueue(queueName string)
	// Increase number of attempt times
	Attempt()
	// Get the number of job's attempt times
	GetAttempts() int
	// Mark job as failed
	Fail(err error)
	// Retry on error
	Retry(err error)
	// Determine if the job has been marked as a failure.
	HasFailed() bool
	// Get the number of times to attempt a job. Default is 1.
	GetMaxTries() int
	// Get job's Queue name
	GetQueue() string
	// Get the delay time in seconds before the job is retried again
	Delay() int
	// Return error string why job failed
	GetFailedError() string
}

type JobFactory

type JobFactory func() Job

Returns a new job instance into which the consumer decodes the JSON payload.

type Publisher

type Publisher interface {
	Publish(ctx context.Context, job Job) error
	PublishOnDelay(ctx context.Context, job Job) error
	PublishRejected(ctx context.Context, job Job) error
	UseMiddlewares(m ...PublisherHandlerMiddleWare)
}

func NewPublisher

func NewPublisher() (Publisher, error)

func NewPublisherFromConfig

func NewPublisherFromConfig(conf *RedisConfig) (Publisher, error)

func NewPublisherWithConnection

func NewPublisherWithConnection(conn rmq.Connection) (Publisher, error)

type PublisherHandlerFunc

type PublisherHandlerFunc func(ctx context.Context, job Job) error

type PublisherHandlerMiddleWare

type PublisherHandlerMiddleWare func(next PublisherHandlerFunc) PublisherHandlerFunc

func WithNewRelicTransaction

func WithNewRelicTransaction() PublisherHandlerMiddleWare

type PublisherMock

type PublisherMock struct {
	PublishFn         func(queue string, job Job) error
	PublishOnDelayFn  func(queue string, job Job, delayAt time.Time) error
	PublishRejectedFn func(job Job) error
}

func (PublisherMock) Publish

func (p PublisherMock) Publish(queue string, job Job) error

func (PublisherMock) PublishOnDelay

func (p PublisherMock) PublishOnDelay(queue string, job Job, delayAt time.Time) error

func (PublisherMock) PublishRejected

func (p PublisherMock) PublishRejected(job Job) error

type RedisConfig

type RedisConfig struct {
	RedisMaster              string `envconfig:"REDIS_MASTER" required:"true"`
	SentinelHost             string `envconfig:"SENTINEL_HOST" required:"true"`
	SentinelPort             string `envconfig:"SENTINEL_PORT" required:"true"`
	RedisMaxActiveConnection int    `envconfig:"REDIS_MAX_ACTIVE" required:"false"`
	MaxIdle                  int    `envconfig:"REDIS_MAX_IDLE" required:"false"`
}

func GetConfigFromEnv

func GetConfigFromEnv() RedisConfig

func (*RedisConfig) GetSentinelAddress

func (redisConfig *RedisConfig) GetSentinelAddress() string

type RedisJob

type RedisJob struct {
	ID       string
	UserID   string
	TraceID  string
	Queue    string
	Attempts int
	Failed   bool
	Error    string
}

func NewRedisJob

func NewRedisJob(userID, traceID string) *RedisJob

func (*RedisJob) Attempt

func (job *RedisJob) Attempt()

func (*RedisJob) Delay

func (job *RedisJob) Delay() int

func (*RedisJob) Fail

func (job *RedisJob) Fail(err error)

func (*RedisJob) GetAttempts

func (job *RedisJob) GetAttempts() int

func (*RedisJob) GetFailedError

func (job *RedisJob) GetFailedError() string

func (*RedisJob) GetID

func (job *RedisJob) GetID() string

func (*RedisJob) GetMaxTries

func (job *RedisJob) GetMaxTries() int

func (*RedisJob) GetQueue

func (job *RedisJob) GetQueue() string

func (*RedisJob) GetTraceID

func (job *RedisJob) GetTraceID() string

func (*RedisJob) GetUserID

func (job *RedisJob) GetUserID() string

func (*RedisJob) HasFailed

func (job *RedisJob) HasFailed() bool

func (*RedisJob) OnQueue

func (job *RedisJob) OnQueue(queue string)

func (*RedisJob) Retry

func (job *RedisJob) Retry(err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL