 Documentation
      ¶
      Documentation
      ¶
    
    
  
    
  
    Overview ¶
Package msgqueue implements a task/job queue with in-memory, SQS, and IronMQ backends.
go-msgqueue is a thin wrapper for SQS and IronMQ clients that uses Redis to implement rate limiting and call-once semantics.
go-msgqueue consists of the following components:
- memqueue - in-memory queue that can be used for local unit testing.
- azsqs - Amazon SQS backend.
- ironmq - IronMQ backend.
- Manager - provides common interface for creating new queues.
- Processor - queue processor that works with memqueue, azsqs, and ironmq.
Rate limiting is implemented in the processor package using https://github.com/go-redis/redis_rate. Call once is implemented in clients by checking whether the message name exists in the Redis database.
Example (CustomRateLimit) ¶
package main
import (
	"fmt"
	"time"
	"github.com/go-msgqueue/msgqueue"
	"github.com/go-msgqueue/msgqueue/memqueue"
)
type RateLimitError string
func (e RateLimitError) Error() string {
	return string(e)
}
func (RateLimitError) Delay() time.Duration {
	return 3 * time.Second
}
func main() {
	start := time.Now()
	q := memqueue.NewQueue(&msgqueue.Options{
		Handler: func() error {
			fmt.Println("retried in", timeSince(start))
			return RateLimitError("calm down")
		},
		RetryLimit: 2,
		MinBackoff: time.Millisecond,
	})
	q.Call()
	// Wait for all messages to be processed.
	_ = q.Close()
}
Output: retried in 0s retried in 3s
Example (MaxWorkers) ¶
start := time.Now()
q := memqueue.NewQueue(&msgqueue.Options{
	Handler: func() {
		fmt.Println(timeSince(start))
		time.Sleep(time.Second)
	},
	Redis:       redisRing(),
	WorkerLimit: 1,
})
for i := 0; i < 3; i++ {
	q.Call()
}
// Wait for all messages to be processed.
_ = q.Close()
Output: 0s 1s 2s
Example (MessageDelay) ¶
package main
import (
	"fmt"
	"math"
	"time"
	"github.com/go-msgqueue/msgqueue"
	"github.com/go-msgqueue/msgqueue/memqueue"
)
func timeSince(start time.Time) time.Duration {
	secs := float64(time.Since(start)) / float64(time.Second)
	return time.Duration(math.Floor(secs)) * time.Second
}
func main() {
	start := time.Now()
	q := memqueue.NewQueue(&msgqueue.Options{
		Handler: func() {
			fmt.Println("processed with delay", timeSince(start))
		},
	})
	msg := msgqueue.NewMessage()
	msg.Delay = time.Second
	q.Add(msg)
	// Wait for all messages to be processed.
	_ = q.Close()
}
Output: processed with delay 1s
Example (Once) ¶
q := memqueue.NewQueue(&msgqueue.Options{
	Handler: func(name string) {
		fmt.Println("hello", name)
	},
	Redis:     redisRing(),
	RateLimit: rate.Every(time.Second),
})
for i := 0; i < 10; i++ {
	// Call once in a second.
	q.CallOnce(time.Second, "world")
}
// Wait for all messages to be processed.
_ = q.Close()
Output: hello world
Example (RateLimit) ¶
start := time.Now()
q := memqueue.NewQueue(&msgqueue.Options{
	Handler:   func() {},
	Redis:     redisRing(),
	RateLimit: rate.Every(time.Second),
})
const n = 5
for i := 0; i < n; i++ {
	q.Call()
}
// Wait for all messages to be processed.
_ = q.Close()
fmt.Printf("%d msg/s", timeSinceCeil(start)/time.Second/n)
Output: 1 msg/s
Example (RetryOnError) ¶
package main
import (
	"errors"
	"fmt"
	"math"
	"time"
	"github.com/go-msgqueue/msgqueue"
	"github.com/go-msgqueue/msgqueue/memqueue"
)
func timeSince(start time.Time) time.Duration {
	secs := float64(time.Since(start)) / float64(time.Second)
	return time.Duration(math.Floor(secs)) * time.Second
}
func main() {
	start := time.Now()
	q := memqueue.NewQueue(&msgqueue.Options{
		Handler: func() error {
			fmt.Println("retried in", timeSince(start))
			return errors.New("fake error")
		},
		RetryLimit: 3,
		MinBackoff: time.Second,
	})
	q.Call()
	// Wait for all messages to be processed.
	_ = q.Close()
}
Output: retried in 0s retried in 1s retried in 3s
Index ¶
- Variables
- func SetLogger(logger *log.Logger)
- type Batcher
- type BatcherOptions
- type Delayer
- type Handler
- type HandlerFunc
- type Manager
- type Message
- type Options
- type Processor
- func (p *Processor) Add(msg *Message) error
- func (p *Processor) Len() int
- func (p *Processor) Options() *Options
- func (p *Processor) Process(msg *Message) error
- func (p *Processor) ProcessAll() error
- func (p *Processor) ProcessOne() error
- func (p *Processor) Purge() error
- func (p *Processor) Put(msg *Message) error
- func (p *Processor) Queue() Queue
- func (p *Processor) Start() error
- func (p *Processor) Stats() *ProcessorStats
- func (p *Processor) Stop() error
- func (p *Processor) StopTimeout(timeout time.Duration) error
- func (p *Processor) String() string
 
- type ProcessorStats
- type Queue
- type RateLimiter
- type Redis
- type Storage
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrDuplicate = errors.New("queue: message with such name already exists")
    Functions ¶
Types ¶
type Batcher ¶ added in v1.3.0
type Batcher struct {
	// contains filtered or unexported fields
}
    Batcher collects messages for later batch processing.
func NewBatcher ¶ added in v1.3.0
func NewBatcher(p *Processor, opt *BatcherOptions) *Batcher
type BatcherOptions ¶ added in v1.3.0
type Handler ¶
Handler is an interface for processing messages.
func NewHandler ¶
func NewHandler(fn interface{}) Handler
    type HandlerFunc ¶
func (HandlerFunc) HandleMessage ¶
func (fn HandlerFunc) HandleMessage(msg *Message) error
type Manager ¶ added in v1.3.0
Manager is an interface that abstracts creation of new queues. It is implemented in subpackages memqueue, azsqs, and ironmq.
type Message ¶
type Message struct {
	// SQS/IronMQ message id.
	Id string
	// Optional name for the message. Messages with the same name
	// are processed only once.
	Name string
	// Delay specifies the duration the queue must wait
	// before executing the message.
	Delay time.Duration
	// Function args passed to the handler.
	Args []interface{}
	// Text representation of the Args.
	Body string
	// SQS/IronMQ reservation id that is used to release/delete the message.
	ReservationId string
	// The number of times the message has been reserved or released.
	ReservedCount int
	Err error
	// contains filtered or unexported fields
}
    Message is used to create and retrieve messages from a queue.
func NewMessage ¶
func NewMessage(args ...interface{}) *Message
    func (*Message) EncodeArgs ¶ added in v1.3.10
func (*Message) SetDelayName ¶
SetDelayName sets the delay and generates the message name from the args.
type Options ¶
type Options struct {
	// Queue name.
	Name string
	// Queue group name.
	GroupName string
	// Function called to process a message.
	Handler interface{}
	// Function called to process failed message.
	FallbackHandler interface{}
	// Maximum number of goroutines processing messages.
	// Default is 32 * number of CPUs.
	MaxWorkers int
	// Maximum number of goroutines fetching messages.
	// Default is 4 * number of CPUs.
	MaxFetchers int
	// Global limit of concurrently running workers. Overrides MaxWorkers.
	WorkerLimit int
	// Size of the buffer where reserved messages are stored.
	// Default is 10 messages.
	BufferSize int
	// Number of messages reserved in the queue in 1 request.
	// Default is 10.
	ReservationSize int
	// Time after which the reserved message is returned to the queue.
	ReservationTimeout time.Duration
	// Time that a long polling receive call waits for a message to become
	// available before returning an empty response.
	// Default is 10 seconds.
	WaitTimeout time.Duration
	// Number of tries/releases after which the message fails permanently
	// and is deleted.
	RetryLimit int
	// Minimum backoff time between retries.
	MinBackoff time.Duration
	// Maximum backoff time between retries.
	MaxBackoff time.Duration
	// Number of consecutive failures after which queue processing is paused.
	// Default is 100 failures.
	PauseErrorsThreshold int
	// Processing rate limit.
	RateLimit rate.Limit
	// Redis client that is used for storing metadata.
	Redis Redis
	// Optional storage interface. The default is to use Redis.
	Storage Storage
	// Optional rate limiter interface. The default is to use Redis.
	RateLimiter RateLimiter
	// contains filtered or unexported fields
}
    type Processor ¶ added in v1.3.0
type Processor struct {
	// contains filtered or unexported fields
}
    Processor reserves messages from the queue, processes them, and then either releases or deletes messages from the queue.
func NewProcessor ¶ added in v1.3.0
NewProcessor creates a new Processor for the queue using the provided processing options.
func StartProcessor ¶ added in v1.3.0
StartProcessor creates a new Processor and starts it.
func (*Processor) Process ¶ added in v1.3.0
Process is a low-level API to process a message, bypassing the internal queue.
func (*Processor) ProcessAll ¶ added in v1.3.0
ProcessAll starts workers to process messages in the queue and then stops them when all messages are processed.
func (*Processor) ProcessOne ¶ added in v1.3.0
ProcessOne processes at most one message in the queue.
func (*Processor) Stats ¶ added in v1.3.0
func (p *Processor) Stats() *ProcessorStats
Stats returns processor stats.
func (*Processor) StopTimeout ¶ added in v1.3.0
StopTimeout waits up to the timeout duration for workers to finish processing their current messages and then stops the workers.
type ProcessorStats ¶ added in v1.3.0
type Queue ¶ added in v1.3.0
type Queue interface {
	Name() string
	Processor() *Processor
	Add(msg *Message) error
	Call(args ...interface{}) error
	CallOnce(dur time.Duration, args ...interface{}) error
	Len() (int, error)
	ReserveN(n int) ([]*Message, error)
	Release(*Message) error
	Delete(msg *Message) error
	Purge() error
	Close() error
	CloseTimeout(timeout time.Duration) error
}
    type RateLimiter ¶
type Redis ¶
type Redis interface {
	Del(keys ...string) *redis.IntCmd
	SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
	SAdd(key string, members ...interface{}) *redis.IntCmd
	SMembers(key string) *redis.StringSliceCmd
	Pipelined(func(pipe redis.Pipeliner) error) ([]redis.Cmder, error)
	Eval(script string, keys []string, args ...interface{}) *redis.Cmd
	Publish(channel string, message interface{}) *redis.IntCmd
}