redis

package
v1.0.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 31, 2021 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	PoolPrefix       = "j"
	QueuePrefix      = "q"
	DeadLetterPrefix = "d"
	MetaPrefix       = "m"

	BatchSize = int64(100)
)
View Source
const (
	Namespace = "infra"
	Subsystem = "lmstfy_redis"
)
View Source
const MaxRedisConnections = 5000

Variables

This section is empty.

Functions

func NewEngine

func NewEngine(redisName string, conn *go_redis.Client) (engine.Engine, error)

func PoolJobKey

func PoolJobKey(j engine.Job) string

func PoolJobKey2

func PoolJobKey2(namespace, queue, jobID string) string

func PoolJobKeyPrefix

func PoolJobKeyPrefix(namespace, queue string) string

func PreloadDeadLetterLuaScript

func PreloadDeadLetterLuaScript(redis *RedisInstance) error

Unlike Timer, which is a singleton, DeadLetters are transient objects like Queue, so their Lua scripts have to be preloaded separately.

func PreloadQueueLuaScript added in v1.0.5

func PreloadQueueLuaScript(redis *RedisInstance) error

func RedisInstanceMonitor

func RedisInstanceMonitor(redis *RedisInstance)

func Setup

func Setup(conf *config.Config, l *logrus.Logger) error

Setup sets the essential configuration of the Redis engine.

Types

type DeadLetter

type DeadLetter struct {
	// contains filtered or unexported fields
}

DeadLetter is where dead jobs are buried; a buried job can be respawned into the ready queue.

func NewDeadLetter

func NewDeadLetter(namespace, queue string, redis *RedisInstance) (*DeadLetter, error)

NewDeadLetter returns an instance of DeadLetter storage.

func (*DeadLetter) Add

func (dl *DeadLetter) Add(jobID string) error

Add adds a job to the dead letter. NOTE: the data format is the same as that of the ready queue (Lua struct `HHc0`), so a dead job can be popped directly back into the ready queue.

NOTE: this method is not called anywhere except in tests; the same logic is implemented in the timer's pump script. Please refer to that.

func (*DeadLetter) Delete

func (dl *DeadLetter) Delete(limit int64) (count int64, err error)

func (*DeadLetter) Name

func (dl *DeadLetter) Name() string

func (*DeadLetter) Peek

func (dl *DeadLetter) Peek() (size int64, jobID string, err error)

func (*DeadLetter) Respawn

func (dl *DeadLetter) Respawn(limit, ttlSecond int64) (count int64, err error)

func (*DeadLetter) Size

func (dl *DeadLetter) Size() (size int64, err error)

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine connects all the dots, including: storing jobs in the timer set or the ready queue, delivering jobs to clients, and managing dead letters.

func (*Engine) BatchConsume

func (e *Engine) BatchConsume(namespace string, queues []string, count, ttrSecond, timeoutSecond uint32) (jobs []engine.Job, err error)

BatchConsume consumes some jobs of a queue.

func (*Engine) Consume

func (e *Engine) Consume(namespace string, queues []string, ttrSecond, timeoutSecond uint32) (job engine.Job, err error)

Consume consumes from multiple queues under the same namespace. The queue order implies priority: the first queue in the list has the highest priority when that queue has a job ready to be consumed. If none of the queues has any job, Consume waits for whichever queue gets a job first.

func (*Engine) Delete

func (e *Engine) Delete(namespace, queue, jobID string) error

func (*Engine) DeleteDeadLetter

func (e *Engine) DeleteDeadLetter(namespace, queue string, limit int64) (count int64, err error)

func (*Engine) Destroy

func (e *Engine) Destroy(namespace, queue string) (count int64, err error)

func (*Engine) DumpInfo

func (e *Engine) DumpInfo(out io.Writer) error

func (*Engine) Peek

func (e *Engine) Peek(namespace, queue, optionalJobID string) (job engine.Job, err error)

func (*Engine) PeekDeadLetter

func (e *Engine) PeekDeadLetter(namespace, queue string) (size int64, jobID string, err error)

func (*Engine) Publish

func (e *Engine) Publish(namespace, queue string, body []byte, ttlSecond, delaySecond uint32, tries uint16) (jobID string, err error)

func (*Engine) RespawnDeadLetter

func (e *Engine) RespawnDeadLetter(namespace, queue string, limit, ttlSecond int64) (count int64, err error)

func (*Engine) Shutdown

func (e *Engine) Shutdown()

func (*Engine) Size

func (e *Engine) Size(namespace, queue string) (size int64, err error)

func (*Engine) SizeOfDeadLetter

func (e *Engine) SizeOfDeadLetter(namespace, queue string) (size int64, err error)

SizeOfDeadLetter returns the queue size of the dead letter.

type MetaManager

type MetaManager struct {
	// contains filtered or unexported fields
}

func NewMetaManager

func NewMetaManager(redis *RedisInstance) *MetaManager

func (*MetaManager) Dump

func (m *MetaManager) Dump() (map[string][]string, error)

func (*MetaManager) ListNamespaces

func (m *MetaManager) ListNamespaces() (namespaces []string, err error)

func (*MetaManager) ListQueues

func (m *MetaManager) ListQueues(namespace string) (queues []string, err error)

func (*MetaManager) RecordIfNotExist

func (m *MetaManager) RecordIfNotExist(namespace, queue string)

func (*MetaManager) Remove

func (m *MetaManager) Remove(namespace, queue string)

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool stores all the jobs' data. It is a global singleton per engine. Note: this `Pool` is NOT the same concept as the EnginePool.

func NewPool

func NewPool(redis *RedisInstance) *Pool

func (*Pool) Add

func (p *Pool) Add(j engine.Job) error

func (*Pool) Delete

func (p *Pool) Delete(namespace, queue, jobID string) error

func (*Pool) Get

func (p *Pool) Get(namespace, queue, jobID string) (body []byte, ttlSecond uint32, err error)

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue is the "ready queue" that has all the jobs that can be consumed right now

func NewQueue

func NewQueue(namespace, queue string, redis *RedisInstance, timer *Timer) *Queue

func (*Queue) Destroy

func (q *Queue) Destroy() (count int64, err error)

func (*Queue) Name

func (q *Queue) Name() string

func (*Queue) Peek

func (q *Queue) Peek() (jobID string, tries uint16, err error)

Peek returns the right-most element in the list without popping it.

func (*Queue) Poll

func (q *Queue) Poll(timeoutSecond, ttrSecond uint32) (jobID string, tries uint16, err error)

Poll pops a job. If tries > 0, the job is added to the "in-flight" timer with its timestamp set to `TTR + now()`; otherwise the job may be moved to the "dead-letter" instead.

func (*Queue) Push

func (q *Queue) Push(j engine.Job, tries uint16) error

Push pushes a job into the queue. The job data format is: {tries}{job id}

func (*Queue) Size

func (q *Queue) Size() (size int64, err error)

Size returns the number of jobs currently in the queue.

type QueueName

type QueueName struct {
	Namespace string
	Queue     string
}

func PollQueues

func PollQueues(redis *RedisInstance, timer *Timer, queueNames []QueueName, timeoutSecond, ttrSecond uint32) (queueName *QueueName, jobID string, retries uint16, err error)

PollQueues polls from multiple queues using the blocking method, or pops a job from a single queue using the non-blocking method.

func (*QueueName) Decode

func (k *QueueName) Decode(str string) error

func (*QueueName) String

func (k *QueueName) String() string

type RedisInfo

type RedisInfo struct {
	MemUsed   int64 // used_memory
	MemMax    int64 // maxmemory
	NKeys     int64 // total keys
	NExpires  int64 // keys with TTL
	NClients  int64 // connected_clients
	NBlocking int64 // blocked_clients
}

func GetRedisInfo

func GetRedisInfo(redis *RedisInstance) *RedisInfo

type RedisInstance

type RedisInstance struct {
	Name string
	Conn *go_redis.Client
}

type SizeMonitor

type SizeMonitor struct {
	// contains filtered or unexported fields
}

func NewSizeMonitor

func NewSizeMonitor(redis *RedisInstance, timer *Timer, preloadData map[string][]string) *SizeMonitor

func (*SizeMonitor) Loop

func (m *SizeMonitor) Loop()

func (*SizeMonitor) MonitorIfNotExist

func (m *SizeMonitor) MonitorIfNotExist(namespace, queue string)

func (*SizeMonitor) Remove

func (m *SizeMonitor) Remove(namespace, queue string)

type SizeProvider

type SizeProvider interface {
	Size() (size int64, err error)
}

type Timer

type Timer struct {
	// contains filtered or unexported fields
}

Timer is another way of saying "delay queue". The timer kicks jobs into the ready queue when they are ready.

func NewTimer

func NewTimer(name string, redis *RedisInstance, interval time.Duration) (*Timer, error)

NewTimer returns an instance of the delay queue.

func (*Timer) Add

func (t *Timer) Add(namespace, queue, jobID string, delaySecond uint32, tries uint16) error

func (*Timer) Name

func (t *Timer) Name() string

func (*Timer) Shutdown

func (t *Timer) Shutdown()

func (*Timer) Size

func (t *Timer) Size() (size int64, err error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL