rsmq

package
v0.6.1 Latest
Warning

This package is not in the latest version of its module.

Published: Oct 6, 2025 License: Apache-2.0 Imports: 11 Imported by: 0

README

RSMQ Fork

This is a fork of github.com/semihbkgr/go-rsmq v1.3.1 (commit d0f7bbc).

Changes from Original

The main change in this fork is support for custom message IDs. This allows for:

  • Deterministic message IDs based on your own identifiers
  • Message deduplication
  • Message overriding/rescheduling

Custom ID Notes

When using custom IDs, be aware that:

  • The msg.Sent timestamp will not reflect the actual send time for overridden messages
    • This field is derived from the message ID's timestamp part
    • For custom IDs, we use a fixed timestamp to avoid timing issues
    • Use Redis sorted set scores (via the delay parameter) for actual timing
  • Message timing is controlled by Redis sorted set scores, not by the ID's timestamp part
  • IDs must be exactly 32 characters long and contain only alphanumeric characters
  • Overriding a message with the same ID but different delay will correctly update the timing
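One way to satisfy the 32-character alphanumeric rule while keeping IDs deterministic is to hash your own identifier. The sketch below is a minimal illustration, not part of this package: customID and customIDPattern are hypothetical names, and the pattern only mirrors the rule as stated above (the package's own check may differ). It relies on the fact that an MD5 digest is 16 bytes, so its hex encoding is always 32 characters.

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"regexp"
)

// customIDPattern mirrors the documented rule: exactly 32 alphanumeric
// characters. This is an assumption about the rule, not the package's regex.
var customIDPattern = regexp.MustCompile(`^[a-zA-Z0-9]{32}$`)

// customID is a hypothetical helper that maps an application identifier to a
// deterministic 32-character ID. The hash is used only to get a fixed-length
// alphanumeric string, not for security.
func customID(key string) string {
	sum := md5.Sum([]byte(key))
	return hex.EncodeToString(sum[:])
}

func main() {
	id := customID("order-42")
	// The same key always yields the same ID, so a second send with this ID
	// would override the first message instead of adding a duplicate.
	fmt.Println(id, customIDPattern.MatchString(id))
}
```

The resulting string could then be passed to WithMessageID when calling SendMessage.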

Original License

MIT License - see original repository for details.

Documentation

Constants

const (
	UnsetVt      = ^uint(0)
	UnsetDelay   = ^uint(0)
	UnsetMaxsize = -(int(^uint(0)>>1) - 1)
)

The Unset values are sentinel values that select the default value of the corresponding attribute.
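The sentinels lie outside the valid range of their attributes (0 is a legal vt, so it cannot mean "unset"). The sketch below shows how such a sentinel can select a fallback value. The constants are copied from the block above so that the example compiles on its own; vtOrDefault is a hypothetical helper and not how the package necessarily resolves defaults internally.

```go
package main

import "fmt"

// Copied from the package's const block so the sketch is self-contained.
const (
	UnsetVt      = ^uint(0)
	UnsetDelay   = ^uint(0)
	UnsetMaxsize = -(int(^uint(0)>>1) - 1)
)

// vtOrDefault is a hypothetical helper: it returns the queue-level vt when
// the caller passed the sentinel, and the caller's value otherwise.
func vtOrDefault(vt, queueVt uint) uint {
	if vt == UnsetVt {
		return queueVt
	}
	return vt
}

func main() {
	fmt.Println(vtOrDefault(UnsetVt, 30)) // 30: sentinel selects the default
	fmt.Println(vtOrDefault(0, 30))       // 0: zero is a real value, not "unset"
}
```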

Variables

var (
	ErrQueueNotFound   = errors.New("queue not found")
	ErrQueueExists     = errors.New("queue exists")
	ErrMessageTooLong  = errors.New("message too long")
	ErrMessageNotFound = errors.New("message not found")
)

Errors returned by rsmq operations
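Because these are sentinel errors, callers can match them with errors.Is. The sketch below shows one common pattern, treating "queue exists" as success when creating a queue at startup. To compile without the module it uses local stand-ins: errQueueExists plays the role of ErrQueueExists, queueCreator is a hypothetical interface with the same method signature as (*RedisSMQ).CreateQueue, and fakeQueues is an in-memory test double.

```go
package main

import (
	"errors"
	"fmt"
)

// errQueueExists stands in for the package's ErrQueueExists; real code would
// compare against the exported variable instead.
var errQueueExists = errors.New("queue exists")

// queueCreator is a hypothetical interface matching the CreateQueue method.
type queueCreator interface {
	CreateQueue(qname string, vt uint, delay uint, maxsize int) error
}

// ensureQueue creates the queue and treats "already exists" as success.
func ensureQueue(c queueCreator, qname string, vt, delay uint, maxsize int) error {
	err := c.CreateQueue(qname, vt, delay, maxsize)
	if errors.Is(err, errQueueExists) {
		return nil
	}
	return err
}

// fakeQueues is an in-memory double used only to exercise ensureQueue.
type fakeQueues map[string]bool

func (f fakeQueues) CreateQueue(qname string, vt uint, delay uint, maxsize int) error {
	if f[qname] {
		return fmt.Errorf("create %q: %w", qname, errQueueExists)
	}
	f[qname] = true
	return nil
}

func main() {
	q := fakeQueues{}
	fmt.Println(ensureQueue(q, "jobs", 30, 0, 65536)) // <nil>
	fmt.Println(ensureQueue(q, "jobs", 30, 0, 65536)) // <nil>
}
```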

var (
	ErrInvalidQname   = errors.New("queue name is in wrong pattern")
	ErrInvalidVt      = errors.New("visibility timeout is out of range [0, 9999999]")
	ErrInvalidDelay   = errors.New("delay is out of range [0, 9999999]")
	ErrInvalidMaxsize = errors.New("max size is out of range [1024, 65536] and not -1")
	ErrInvalidID      = errors.New("id is in wrong pattern")
)

Validation errors
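The ranges quoted in these messages can also be checked on the caller's side before a request reaches Redis. The sketch below reads the bounds directly off the error strings above and assumes they are inclusive; checkQueueParams is a hypothetical helper, it returns its own errors rather than the package's, and it does not handle the Unset sentinels.

```go
package main

import (
	"errors"
	"fmt"
)

// maxTimeout is the upper bound quoted for both vt and delay.
const maxTimeout = 9999999

// checkQueueParams is a hypothetical pre-flight check that mirrors the
// documented ranges: vt and delay in [0, 9999999], maxsize in [1024, 65536]
// or exactly -1.
func checkQueueParams(vt, delay uint, maxsize int) error {
	if vt > maxTimeout {
		return errors.New("visibility timeout is out of range [0, 9999999]")
	}
	if delay > maxTimeout {
		return errors.New("delay is out of range [0, 9999999]")
	}
	if maxsize != -1 && (maxsize < 1024 || maxsize > 65536) {
		return errors.New("max size is out of range [1024, 65536] and not -1")
	}
	return nil
}

func main() {
	fmt.Println(checkQueueParams(30, 0, 65536)) // <nil>
	fmt.Println(checkQueueParams(30, 0, 512))   // max size is out of range [1024, 65536] and not -1
}
```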

Functions

This section is empty.

Types

type Pipeliner added in v0.6.0

type Pipeliner interface {
	HSetNX(key, field string, value interface{}) *redis.BoolCmd
	HMGet(key string, fields ...string) *redis.SliceCmd
	Time() *redis.TimeCmd
	SAdd(key string, members ...interface{}) *redis.IntCmd
	HSet(key string, values ...interface{}) *redis.IntCmd
	HIncrBy(key, field string, incr int64) *redis.IntCmd
	ZAdd(key string, members ...redis.Z) *redis.IntCmd
	ZCard(key string) *redis.IntCmd
	ZCount(key, min, max string) *redis.IntCmd
	ZRem(key string, members ...interface{}) *redis.IntCmd
	HDel(key string, fields ...string) *redis.IntCmd
	SRem(key string, members ...interface{}) *redis.IntCmd
	Del(keys ...string) *redis.IntCmd
	ZRangeByScoreWithScores(key string, opt *redis.ZRangeBy) *redis.ZSliceCmd
	Exec() ([]redis.Cmder, error)
}

Pipeliner interface defines the pipeline operations needed by RSMQ

type QueueAttributes

type QueueAttributes struct {
	Vt         uint
	Delay      uint
	Maxsize    int
	TotalRecv  uint64
	TotalSent  uint64
	Created    uint64
	Modified   uint64
	Msgs       uint64
	HiddenMsgs uint64
}

QueueAttributes contains the attributes and statistics of a queue

type QueueMessage

type QueueMessage struct {
	ID      string
	Message string
	Rc      uint64
	Fr      time.Time
	Sent    time.Time
}

QueueMessage contains the content and metadata of a message received from a queue

type RedisAdapter added in v0.6.0

type RedisAdapter struct {
	// contains filtered or unexported fields
}

RedisAdapter wraps a v9 Redis client to implement the RedisClient interface. It uses context.Background() for all operations to maintain backward compatibility.

func NewRedisAdapter added in v0.6.0

func NewRedisAdapter(client redis.Cmdable) *RedisAdapter

NewRedisAdapter creates a new adapter for the v9 Redis client

func NewRedisAdapterWithContext added in v0.6.0

func NewRedisAdapterWithContext(ctx context.Context, client redis.Cmdable) *RedisAdapter

NewRedisAdapterWithContext creates a new adapter with a specific context

func (*RedisAdapter) Close added in v0.6.0

func (r *RedisAdapter) Close() error

func (*RedisAdapter) Del added in v0.6.0

func (r *RedisAdapter) Del(keys ...string) *redis.IntCmd

func (*RedisAdapter) EvalSha added in v0.6.0

func (r *RedisAdapter) EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd

func (*RedisAdapter) Exists added in v0.6.0

func (r *RedisAdapter) Exists(keys ...string) *redis.IntCmd

func (*RedisAdapter) HDel added in v0.6.0

func (r *RedisAdapter) HDel(key string, fields ...string) *redis.IntCmd

func (*RedisAdapter) HIncrBy added in v0.6.0

func (r *RedisAdapter) HIncrBy(key, field string, incr int64) *redis.IntCmd

func (*RedisAdapter) HMGet added in v0.6.0

func (r *RedisAdapter) HMGet(key string, fields ...string) *redis.SliceCmd

func (*RedisAdapter) HSet added in v0.6.0

func (r *RedisAdapter) HSet(key, field string, value interface{}) *redis.BoolCmd

func (*RedisAdapter) HSetNX added in v0.6.0

func (r *RedisAdapter) HSetNX(key, field string, value interface{}) *redis.BoolCmd

func (*RedisAdapter) SAdd added in v0.6.0

func (r *RedisAdapter) SAdd(key string, members ...interface{}) *redis.IntCmd

func (*RedisAdapter) SMembers added in v0.6.0

func (r *RedisAdapter) SMembers(key string) *redis.StringSliceCmd

func (*RedisAdapter) SRem added in v0.6.0

func (r *RedisAdapter) SRem(key string, members ...interface{}) *redis.IntCmd

func (*RedisAdapter) ScriptLoad added in v0.6.0

func (r *RedisAdapter) ScriptLoad(script string) *redis.StringCmd

func (*RedisAdapter) Time added in v0.6.0

func (r *RedisAdapter) Time() *redis.TimeCmd

func (*RedisAdapter) TxPipeline added in v0.6.0

func (r *RedisAdapter) TxPipeline() Pipeliner

func (*RedisAdapter) Type added in v0.6.0

func (r *RedisAdapter) Type(key string) *redis.StatusCmd

func (*RedisAdapter) ZAdd added in v0.6.0

func (r *RedisAdapter) ZAdd(key string, members ...redis.Z) *redis.IntCmd

func (*RedisAdapter) ZCard added in v0.6.0

func (r *RedisAdapter) ZCard(key string) *redis.IntCmd

func (*RedisAdapter) ZCount added in v0.6.0

func (r *RedisAdapter) ZCount(key, min, max string) *redis.IntCmd

func (*RedisAdapter) ZRem added in v0.6.0

func (r *RedisAdapter) ZRem(key string, members ...interface{}) *redis.IntCmd

type RedisClient added in v0.6.0

type RedisClient interface {
	Time() *redis.TimeCmd
	HSetNX(key, field string, value interface{}) *redis.BoolCmd
	HMGet(key string, fields ...string) *redis.SliceCmd
	SMembers(key string) *redis.StringSliceCmd
	SAdd(key string, members ...interface{}) *redis.IntCmd
	ZCard(key string) *redis.IntCmd
	ZCount(key, min, max string) *redis.IntCmd
	ZAdd(key string, members ...redis.Z) *redis.IntCmd
	HSet(key, field string, value interface{}) *redis.BoolCmd
	HIncrBy(key, field string, incr int64) *redis.IntCmd
	Del(keys ...string) *redis.IntCmd
	HDel(key string, fields ...string) *redis.IntCmd
	ZRem(key string, members ...interface{}) *redis.IntCmd
	SRem(key string, members ...interface{}) *redis.IntCmd
	EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd
	ScriptLoad(script string) *redis.StringCmd
	TxPipeline() Pipeliner
	Exists(keys ...string) *redis.IntCmd
	Type(key string) *redis.StatusCmd
	Close() error
}

RedisClient interface defines the operations needed by RSMQ. Both redis.Client and redis.ClusterClient implement these methods.

type RedisSMQ

type RedisSMQ struct {
	// contains filtered or unexported fields
}

RedisSMQ is the rsmq client used to execute queue and message operations

func NewRedisSMQ

func NewRedisSMQ(client RedisClient, ns string, logger ...*logging.Logger) *RedisSMQ

NewRedisSMQ creates and returns a new rsmq client

func (*RedisSMQ) ChangeMessageVisibility

func (rsmq *RedisSMQ) ChangeMessageVisibility(qname string, id string, vt uint) error

ChangeMessageVisibility changes the visibility timeout of a message. To use the queue's vt, pass rsmq.UnsetVt:

err := redisRsmq.ChangeMessageVisibility(qname, id, rsmq.UnsetVt)

func (*RedisSMQ) CreateQueue

func (rsmq *RedisSMQ) CreateQueue(qname string, vt uint, delay uint, maxsize int) error

CreateQueue creates a new queue with the given attributes. To create a queue with the default attributes:

err := redisRsmq.CreateQueue(qname, rsmq.UnsetVt, rsmq.UnsetDelay, rsmq.UnsetMaxsize)

func (*RedisSMQ) DeleteMessage

func (rsmq *RedisSMQ) DeleteMessage(qname string, id string) error

DeleteMessage deletes a message from the queue

func (*RedisSMQ) DeleteQueue

func (rsmq *RedisSMQ) DeleteQueue(qname string) error

DeleteQueue deletes the queue

func (*RedisSMQ) GetQueueAttributes

func (rsmq *RedisSMQ) GetQueueAttributes(qname string) (*QueueAttributes, error)

GetQueueAttributes returns the attributes of the queue

func (*RedisSMQ) ListQueues

func (rsmq *RedisSMQ) ListQueues() ([]string, error)

ListQueues returns a slice containing the names of the existing queues

func (*RedisSMQ) PopMessage

func (rsmq *RedisSMQ) PopMessage(qname string) (*QueueMessage, error)

PopMessage pops the next message from the queue

func (*RedisSMQ) Quit

func (rsmq *RedisSMQ) Quit() error

Quit closes the Redis client

func (*RedisSMQ) ReceiveMessage

func (rsmq *RedisSMQ) ReceiveMessage(qname string, vt uint) (*QueueMessage, error)

ReceiveMessage receives a message from the queue

func (*RedisSMQ) SendMessage

func (rsmq *RedisSMQ) SendMessage(qname string, message string, delay uint, opts ...SendMessageOption) (string, error)

SendMessage sends a message to the queue. If a custom ID is provided via the WithMessageID option:

  • The message will use that ID instead of generating a new one
  • If a message with that ID already exists, it will be overridden
  • The msg.Sent timestamp will not reflect the actual send time for overridden messages
  • Message timing is controlled by the delay parameter, not by the ID's timestamp
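The override behaviour follows from how a sorted set works: each member has exactly one score, and adding an existing member replaces its score. The toy model below illustrates only that property. It is an in-memory stand-in, not the package's implementation; toyQueue and its methods are hypothetical, and it assumes the score is simply "current time plus delay" in seconds.

```go
package main

import "fmt"

// toyQueue models the one sorted-set property that matters here: a message ID
// maps to a single score, so re-adding an ID replaces the old score.
type toyQueue struct {
	visibleAt map[string]uint   // message ID -> second at which it becomes visible
	body      map[string]string // message ID -> payload
}

func newToyQueue() *toyQueue {
	return &toyQueue{visibleAt: map[string]uint{}, body: map[string]string{}}
}

// send stores the message under id. If id is already present, both the
// payload and the visibility time are overwritten.
func (q *toyQueue) send(id, body string, now, delay uint) {
	q.visibleAt[id] = now + delay
	q.body[id] = body
}

func main() {
	q := newToyQueue()
	id := "0123456789abcdef0123456789abcdef"

	q.send(id, "remind user", 1000, 60) // scheduled for t=1060
	q.send(id, "remind user", 1000, 5)  // same ID: rescheduled to t=1005

	fmt.Println(len(q.visibleAt), q.visibleAt[id]) // 1 1005
}
```

In this model the second send neither duplicates the message nor keeps the old schedule, which matches the deduplication and rescheduling use cases listed in the README.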

func (*RedisSMQ) SetQueueAttributes

func (rsmq *RedisSMQ) SetQueueAttributes(qname string, vt uint, delay uint, maxsize int) (*QueueAttributes, error)

SetQueueAttributes sets the attributes of the queue. To leave some attributes unchanged, pass the corresponding Unset values:

queAttrib, err := redisRsmq.SetQueueAttributes(qname, rsmq.UnsetVt, rsmq.UnsetDelay, newMaxsize)

type SendMessageOption

type SendMessageOption func(*sendMessageOptions)

SendMessageOption is a function that configures sendMessageOptions

func WithMessageID

func WithMessageID(id string) SendMessageOption

WithMessageID returns a SendMessageOption that sets a custom message ID
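SendMessageOption follows the functional-options pattern: each option is a function that mutates a private options struct, and the callee applies them in order. The sketch below re-creates the pattern locally to show the mechanics. The fields of the real sendMessageOptions are unexported and not documented here, so the id field and the applyOptions helper are assumptions made for illustration.

```go
package main

import "fmt"

// sendMessageOptions mirrors the unexported type named in the signature
// above. Its single field is an assumption.
type sendMessageOptions struct {
	id string
}

// SendMessageOption has the same shape as the package's type.
type SendMessageOption func(*sendMessageOptions)

// WithMessageID returns an option that records a custom ID.
func WithMessageID(id string) SendMessageOption {
	return func(o *sendMessageOptions) { o.id = id }
}

// applyOptions is a hypothetical helper showing how a function such as
// SendMessage could fold its variadic options into one struct.
func applyOptions(opts ...SendMessageOption) sendMessageOptions {
	var o sendMessageOptions
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	fmt.Printf("%q\n", applyOptions().id)                    // ""
	fmt.Printf("%q\n", applyOptions(WithMessageID("abc")).id) // "abc"
}
```

An empty ID after applying the options is a natural signal to fall back to a generated ID, which keeps existing SendMessage call sites working without changes.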
