disq

package module
v0.1.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 30, 2022 License: MIT Imports: 16 Imported by: 0

README

disq

A custom job-queue library for convoy. Provides in-memory (localstorage) and redis (stream and list) backends only for now.

Features

  • Redis (Stream and List), and in-memory backends.
  • Message Delay
  • Automatic retries

Usage

Create a broker

You can create a single broker for publishing and consuming messages from a queue. You'll need to first create a task and a message though.

import (
    "github.com/frain-dev/disq"
    redisBroker "github.com/frain-dev/disq/brokers/redis"
) 

//Create a Task
var CountHandler, _ = disq.RegisterTask(&disq.TaskOptions{
	Name: "CountHandler",
	Handler: func(name string) error {
		time.Sleep(time.Duration(10) * time.Second)
		fmt.Println("Hello", name)
		return nil
	},
	RetryLimit: 3,
})

//Create a Message
var value = fmt.Sprint("message_", uuid.NewString())
var ctx = context.Background()

var msg := &disq.Message{
    Ctx:      ctx,
    TaskName: CountHandler.Name(),
    Args:     []interface{}{value},
}

// Create a (redis stream) Broker
cfg := redisBroker.RedisConfig{
		Redis:       c, //redis client
		Name:        name, //name of queue
		Concurency:  int32(concurency),
		StreamGroup: "disq:",
	}

var broker = redisBroker.NewStream(&cfg)

// Publish and Consume with the broker
broker.publish(msg)
broker.Consume(ctx)
broker.Stats() //View consumer stats

Create multiple brokers and assign them to a worker

You can create multiple brokers, create a worker and manage those brokers with the worker.

import (
    "github.com/frain-dev/disq"
) 
//Create a worker
var brokers = []disq.Broker{broker1, broker2, broker3}
var w = disq.NewWorker(brokers)

//start processing messages
var err = w.StartAll(ctx)
if err != nil {
    log.Fatal(err)
}

//Get stats from all brokers
for name, broker := range w.GetAllBrokers() {
    var len, _ = broker.Len()
    log.Printf("Broker_%d Queue Size: %+v", name, len)
    log.Printf("Broker_%d Stats: %+v\n\n", i, broker.Stats())
}

Full example

For a full working example see the example folder. To run it;

go run example/publisher/publisher.go 
go run example/consumer/consumer.go

Contributing

Please see CONTRIBUTING for details.

Credits

License

The MIT License (MIT). Please see License File for more information.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var Logger *log.Logger

Functions

func BytesToString added in v0.1.4

func BytesToString(b []byte) string

BytesToString converts byte slice to string.

func ConsumerName

func ConsumerName() string

func DurEqual

func DurEqual(d1, d2 time.Duration, threshold int) bool

func ErrorHandler added in v0.1.6

func ErrorHandler(msg *Message, msgErr error, retries *uint32) error

func FormatHandlerError added in v0.1.6

func FormatHandlerError(msg *Message, retrylimit int) error

func Scheduler

func Scheduler(name string, c *redis.Client,
	fn func(ctx context.Context) (int, error))

func SetLogger

func SetLogger(logger *log.Logger)

func StringToBytes added in v0.1.4

func StringToBytes(s string) []byte

StringToBytes converts string to byte slice.

func UnixMs

func UnixMs(tm time.Time) int64

func Version

func Version() string

func WithRedisLock

func WithRedisLock(
	ctx context.Context, name string, redis Redis, fn func(ctx context.Context) error,
) error

Types

type Broker

type Broker interface {
	Consume(context.Context)
	Publish(*Message) error
	Process(*Message) error
	FetchN(context.Context, int, time.Duration) ([]Message, error)
	Delete(*Message) error
	Name() string
	Config() Config
	Stats() *Stats
	Status() bool
	Len() (int, error)
	Stop() error
}

type Config

type Config interface {
	Init() error
}

type DisqError added in v0.1.6

type DisqError interface {
	GetDelay() time.Duration
	GetRateLimit() bool
}

type Error added in v0.1.6

type Error struct {
	RateLimit bool
	Delay     time.Duration
	Err       error
}

func (*Error) Error added in v0.1.6

func (e *Error) Error() string

func (*Error) GetDelay added in v0.1.6

func (e *Error) GetDelay() time.Duration

func (*Error) GetRateLimit added in v0.1.6

func (e *Error) GetRateLimit() bool

type Handler

type Handler interface {
	HandleMessage(msg *Message) error
}

Handler is an interface for processing messages.

func NewHandler

func NewHandler(fn interface{}) Handler

type HandlerFunc

type HandlerFunc func(*Message) error

func (HandlerFunc) HandleMessage

func (fn HandlerFunc) HandleMessage(msg *Message) error

type Message

type Message struct {
	Ctx context.Context `msgpack:"-"`

	ID string `msgpack:"ID"`

	Name string `msgpack:"Name"`

	// Delay specifies the duration the queue must wait
	// before executing the message.
	Delay time.Duration `msgpack:"Delay"`

	Args []interface{} `msgpack:"Args"`

	ArgsBin []byte `msgpack:"ArgsBin"`

	TaskName string `msgpack:"TaskName"`

	RetryCount int `msgpack:"RetryCount"`

	//Execution time need for localstorage delays
	ExecutionTime time.Time `msgpack:"ExecutionTime"`

	Err error `msgpack:"Err"`
}

Message is used as a uniform object for publishing and consuming messages from a queue.

func NewMessage

func NewMessage(ctx context.Context, args ...interface{}) *Message

func (*Message) MarshalArgs added in v0.1.5

func (m *Message) MarshalArgs() ([]byte, error)

func (*Message) MarshalBinary added in v0.1.1

func (m *Message) MarshalBinary() ([]byte, error)

func (*Message) SetDelay

func (m *Message) SetDelay(delay time.Duration)

func (*Message) UnmarshalBinary added in v0.1.1

func (m *Message) UnmarshalBinary(b []byte) error

type MessageRaw

type MessageRaw Message

type Redis

type Redis interface {
	Del(ctx context.Context, keys ...string) *redis.IntCmd
	SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.BoolCmd
	Pipelined(ctx context.Context, fn func(pipe redis.Pipeliner) error) ([]redis.Cmder, error)

	Eval(ctx context.Context, script string, keys []string, args ...interface{}) *redis.Cmd
	EvalSha(ctx context.Context, sha1 string, keys []string, args ...interface{}) *redis.Cmd
	ScriptExists(ctx context.Context, scripts ...string) *redis.BoolSliceCmd
	ScriptLoad(ctx context.Context, script string) *redis.StringCmd

	//Stream and ZSET methods
	TxPipeline() redis.Pipeliner
	XAdd(ctx context.Context, a *redis.XAddArgs) *redis.StringCmd
	XDel(ctx context.Context, stream string, ids ...string) *redis.IntCmd
	XLen(ctx context.Context, stream string) *redis.IntCmd
	XRangeN(ctx context.Context, stream, start, stop string, count int64) *redis.XMessageSliceCmd
	XRange(ctx context.Context, stream, start, stop string) *redis.XMessageSliceCmd
	XGroupCreateMkStream(ctx context.Context, stream, group, start string) *redis.StatusCmd
	XReadGroup(ctx context.Context, a *redis.XReadGroupArgs) *redis.XStreamSliceCmd
	XAck(ctx context.Context, stream, group string, ids ...string) *redis.IntCmd
	XInfoStream(ctx context.Context, key string) *redis.XInfoStreamCmd
	XPendingExt(ctx context.Context, a *redis.XPendingExtArgs) *redis.XPendingExtCmd
	XPending(ctx context.Context, stream, group string) *redis.XPendingCmd
	XTrim(ctx context.Context, key string, maxLen int64) *redis.IntCmd
	XGroupDelConsumer(ctx context.Context, stream, group, consumer string) *redis.IntCmd
	ZAdd(ctx context.Context, key string, members ...*redis.Z) *redis.IntCmd
	ZRangeByScore(ctx context.Context, key string, opt *redis.ZRangeBy) *redis.StringSliceCmd
	ZRem(ctx context.Context, key string, members ...interface{}) *redis.IntCmd
	XInfoConsumers(ctx context.Context, key string, group string) *redis.XInfoConsumersCmd
	//List methods
	LIndex(ctx context.Context, key string, index int64) *redis.StringCmd
	LLen(ctx context.Context, key string) *redis.IntCmd
	LPop(ctx context.Context, key string) *redis.StringCmd
	LPopCount(ctx context.Context, key string, count int) *redis.StringSliceCmd
	LPush(ctx context.Context, key string, values ...interface{}) *redis.IntCmd
	LRange(ctx context.Context, key string, start, stop int64) *redis.StringSliceCmd
	LRem(ctx context.Context, key string, count int64, value interface{}) *redis.IntCmd
	LTrim(ctx context.Context, key string, start, stop int64) *redis.StatusCmd
}

type Stats

type Stats struct {
	Name      string
	Processed uint32
	Retries   uint32
	Fails     uint32
}

type Task

type Task struct {
	// contains filtered or unexported fields
}

func NewTask

func NewTask(opt *TaskOptions) *Task

func RegisterTask

func RegisterTask(opt *TaskOptions) (*Task, error)

func (*Task) HandleMessage

func (t *Task) HandleMessage(msg *Message) error

func (*Task) Name

func (t *Task) Name() string

func (*Task) RetryLimit

func (t *Task) RetryLimit() int

type TaskMap

type TaskMap struct {
	// contains filtered or unexported fields
}
var Tasks TaskMap

func (*TaskMap) LoadTask

func (s *TaskMap) LoadTask(name string) (*Task, error)

func (*TaskMap) RegisterTasks

func (s *TaskMap) RegisterTasks(opts *TaskOptions) (*Task, error)

func (*TaskMap) UpdateTask added in v0.1.8

func (s *TaskMap) UpdateTask(name string, task *Task) error

type TaskOptions

type TaskOptions struct {
	Name string

	Handler interface{}

	RetryLimit int
}

Directories

Path Synopsis
brokers

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL