redis_queue

Version: v0.0.0-...-b0a5b4a
Published: May 4, 2024 License: MIT Imports: 3 Imported by: 3

README

redis-queue

A reliable, lightweight message queue for Go, backed by Redis.

Features

  1. Queue interaction: Push messages onto the queue and pop them off it.
  2. Atomicity: Push and pop operations are atomic, which guards against race conditions between concurrent producers and consumers.
  3. Persistence: A message popped with SafePop stays recoverable until it is acked, so it is not lost if the program terminates unexpectedly while processing it.

Usage

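go get -u github.com/asheswook/redis-queue

package main

import (
  "fmt"

  "github.com/redis/go-redis/v9"
  redisqueue "github.com/asheswook/redis-queue"
)

// config stands in for your application's own settings.
// The values are placeholders, not recommended defaults; check the
// package source for the exact meaning and units of Retry and TTL.
var config = struct {
  Addrs []string
  Retry int
  TTL   int
}{
  Addrs: []string{"localhost:7000"},
  Retry: 3,
  TTL:   30,
}

func main() {
  // Using a go-redis client.
  // A normal (non-cluster) client can be used instead of a cluster client.
  client := redis.NewClusterClient(
    &redis.ClusterOptions{
      Addrs: config.Addrs,
    },
  )

  queue := redisqueue.NewSafeQueue(
    &redisqueue.Config{
      Redis: client,
      Queue: struct {
        Name  string
        Retry int
      }{
        Name:  fmt.Sprintf("{%s}", "YOUR_QUEUE_NAME"),
        Retry: config.Retry,
      },
      Safe: struct {
        AckZSetName string
        TTL         int
      }{
        AckZSetName: fmt.Sprintf("{%s}:ack", "YOUR_ACK_ZSET_NAME"),
        TTL:         config.TTL,
      },
    },
  )

  if err := queue.Push("testPayload"); err != nil {
    // Push failed
    return
  }

  msg, err := queue.SafePop()
  if err != nil {
    // Pop failed
    return
  }
  if msg == nil {
    // No message available
    return
  }

  // Signal that the message has been processed successfully.
  if err := msg.Ack(); err != nil {
    // Ack failed
  }
}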

Documentation

Constants

This section is empty.

Variables

var ErrAckFailed = errors.New("ack failed")
var ErrAckNotAvailable = errors.New("ack is not available for this message, only safe popped message can be acked")
var ErrAckPopEOF = errors.New("EOF while ackPop")
var ErrAckPopFailed = errors.New("redis queue safePop failed")
var ErrPopFailed = errors.New("redis queue pop failed")
var ErrPushFailed = errors.New("redis queue push failed")
var ErrTimestampUpdateFailed = errors.New("timestamp update failed")
var ErrUnexpectedType = errors.New("redis queue message is unexpected type")

Functions

This section is empty.

Types

type CommonMessage

type CommonMessage struct {
	// contains filtered or unexported fields
}

CommonMessage is a simple message that carries a payload.

func NewCommonMessage

func NewCommonMessage(payload string) *CommonMessage

func (*CommonMessage) Payload

func (msg *CommonMessage) Payload() string

type CommonQueue

type CommonQueue struct {
	Name string
	// contains filtered or unexported fields
}

CommonQueue is a simple queue that supports Push and Pop.

func NewCommonQueue

func NewCommonQueue(cfg *Config) *CommonQueue

func (*CommonQueue) Pop

func (q *CommonQueue) Pop() (msg Message, err error)

func (*CommonQueue) Push

func (q *CommonQueue) Push(payload string) (err error)

type Config

type Config struct {
	Redis redisClient

	Queue struct {
		Name  string
		Retry int
	}

	Safe struct {
		AckZSetName string
		TTL         int
	}
}
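
The Usage example wraps both key names in braces. In Redis Cluster, when a key contains a non-empty {...} section, only the text inside the first such section is hashed to pick a slot, so keys that share the same tag are stored on the same slot. The sketch below derives Queue.Name and Safe.AckZSetName from one tag; queueKeys is a hypothetical helper, and whether this package needs both keys on one slot is an assumption, not something stated in its documentation.

```go
package main

import "fmt"

// queueKeys is a hypothetical helper that builds the queue key and the
// ack sorted-set key from a single hash tag, so that Redis Cluster
// places both keys on the same slot.
func queueKeys(tag string) (queue, ack string) {
	queue = fmt.Sprintf("{%s}", tag)
	ack = fmt.Sprintf("{%s}:ack", tag)
	return queue, ack
}

func main() {
	queue, ack := queueKeys("orders")
	fmt.Println(queue) // {orders}
	fmt.Println(ack)   // {orders}:ack
}
```

With a non-cluster client the braces have no special effect and are simply part of the key name.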

type Message

type Message interface {
	Payload() string
}

type Queue

type Queue interface {
	Pop() (Message, error)
	Push(payload string) error
}
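
Because Queue and Message are interfaces, code that depends on them can be exercised without a Redis server. The following is a minimal in-memory sketch, not part of the package: memQueue and memMessage are hypothetical names, the two interfaces are copied from the definitions above, and both the FIFO order and the (nil, nil) result for an empty queue are assumptions modelled on the Usage example.

```go
package main

import (
	"fmt"
	"sync"
)

// Message and Queue are copied from the package's interface definitions.
type Message interface {
	Payload() string
}

type Queue interface {
	Pop() (Message, error)
	Push(payload string) error
}

// memMessage is a hypothetical Message holding a payload in memory.
type memMessage struct{ payload string }

func (m memMessage) Payload() string { return m.payload }

// memQueue is a hypothetical in-memory Queue for tests.
type memQueue struct {
	mu    sync.Mutex
	items []string
}

// Push appends the payload to the tail of the queue.
func (q *memQueue) Push(payload string) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, payload)
	return nil
}

// Pop removes and returns the head of the queue.
// It returns (nil, nil) when the queue is empty (an assumption).
func (q *memQueue) Pop() (Message, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return nil, nil
	}
	head := q.items[0]
	q.items = q.items[1:]
	return memMessage{payload: head}, nil
}

// Compile-time check that memQueue satisfies Queue.
var _ Queue = (*memQueue)(nil)

func main() {
	var q Queue = &memQueue{}
	_ = q.Push("a")
	_ = q.Push("b")
	for {
		msg, err := q.Pop()
		if err != nil || msg == nil {
			break
		}
		fmt.Println(msg.Payload())
	}
}
```

A function that accepts a Queue can then be handed a memQueue in unit tests and a CommonQueue or SafeQueue in production.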

type SafeMessage

type SafeMessage struct {
	// contains filtered or unexported fields
}

SafeMessage is a message that can be acked. A message returned by SafePop should be acked with Ack once it has been processed. A message that is never acked becomes available to pop again after a while.

func NewSafeMessage

func NewSafeMessage(payload string, queue *SafeQueue) *SafeMessage

func (*SafeMessage) Ack

func (msg *SafeMessage) Ack() error

func (*SafeMessage) Payload

func (msg *SafeMessage) Payload() string

type SafeQueue

type SafeQueue struct {
	Name    string
	AckName string
	// contains filtered or unexported fields
}

func NewSafeQueue

func NewSafeQueue(cfg *Config) *SafeQueue

func (*SafeQueue) Pop

func (q *SafeQueue) Pop() (msg Message, err error)

func (*SafeQueue) Push

func (q *SafeQueue) Push(payload string) (err error)

func (*SafeQueue) SafePop

func (q *SafeQueue) SafePop() (msg *SafeMessage, err error)
