queue

package
v0.4.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 6, 2021 License: MIT Imports: 23 Imported by: 0

Documentation

Overview

Package queue provides a persistent queue implementation that interplays with the Dispatcher in the contract package.

It is recommended to read documentation on the events package before getting started on the queue package.

Introduction

Queues in go is not as prominent as in some other languages, since go excels at concurrency. However, the persistent queue can still offer some benefit missing from the native mechanism, say go channels. The queued job won't be lost even if the system shutdown. In other word, it means jobs can be retried until success. Plus, it is also possible to queue the execution of a particular job until a lengthy period of time. Useful when you need to implement "send email after 30 days" type of event handler.

Simple Usage

To convert any valid event to a persisted event, use:

pevent := queue.Persist(event)

Like the event package, you don't have to use this helper. Manually create a queueable event by implementing this interface on top of the normal event interface:

type persistent interface {
  Defer() time.Duration
  Decorate(s *PersistedEvent)
}

The PersistentEvent passed to the Decorate method contains the tunable configuration such as maximum retries.

No matter how you create a persisted event, to fire it, send it though a dispatcher. The normal dispatcher in the events package won't work, as a queue implementation is required. Luckily, it is deadly simple to convert a standard dispatcher to a queue.Dispatcher.

queueableDispatcher := queue.WithQueue(&events.SyncDispatcher, &queue.RedisDriver{})
queueableDispatcher.dispatch(pevent)

As you see, how the queue persist the events is subject to the underlying driver. The default driver bundled in this package is the redis driver.

Once the persisted event are stored in the external storage, a goroutine should consume them and pipe the reconstructed event to the listeners. This is done by calling the Consume method of queue.Dispatcher

go dispatcher.Consume(context.Background())

There is no difference between listeners for normal event and listeners for persisted event. They can be used interchangeably. But note if a event is retryable, it is your responsibility to ensure the idempotency. Also, be aware if a persisted event have many listeners, the event is up to retry when any of the listeners fail.

Integrate

The queue package exports configuration in this format:

queue:
  default:
    redisName: default
    parallelism: 3
    checkQueueLengthIntervalSecond: 15

While manually constructing the queue.Dispatcher is absolutely feasible, users can use the bundled dependency provider without breaking a sweat. Using this approach, the life cycle of consumer goroutine will be managed automatically by the core.

var c *core.C
c.Provide(otredis.Providers()) // to provide the redis driver
c.Provide(queue.Providers())

A module is also bundled, providing the queue command (for reloading and flushing).

c.AddModuleFunc(queue.New)

Sometimes there are valid reasons to use more than one queue. Each dispatcher however is bounded to only one queue. To use multiple queues, multiple dispatchers are required. Inject queue.DispatcherMaker to factory a dispatcher with a specific name.

c.Invoke(function(maker queue.DispatcherMaker) {
  dispatcher, err := maker.Make("default")
  // see examples for details
})

Events

When an attempt to execute the event handler failed, two kinds of event will be fired. If the failed event can be retried, "queue.RetryingEvent" will be fired. If not, "queue.AbortedEvent" will be fired.

Metrics

To gain visibility on how the length of the queue, inject a gauge into the core and alias it to queue.Gauge. The queue length of the all internal queues will be periodically reported to metrics collector (Presumably Prometheus).

c.provideDispatcherFactory(di.Deps{func(appName contract.AppName, env contract.Env) queue.Gauge {
  return prometheus.NewGaugeFrom(
    stdprometheus.GaugeOpts{
      Namespace: appName.String(),
      Subsystem: env.String(),
      Owner:      "queue_length",
      Help:      "The gauge of queue length",
    }, []string{"name", "channel"},
  )
}})
Example (Minimum)
package main

import (
	"context"
	"fmt"
	"github.com/DoNewsCode/core/contract"
	"github.com/DoNewsCode/core/events"
	"github.com/DoNewsCode/core/queue"
	"time"
)

func main() {
	dispatcher := events.SyncDispatcher{}
	queueDispatcher := queue.WithQueue(&dispatcher, queue.NewInProcessDriver())
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var ch = make(chan struct{})
	go queueDispatcher.Consume(ctx)

	queueDispatcher.Subscribe(events.Listen(events.From(1), func(ctx context.Context, event contract.Event) error {
		fmt.Println(event.Data())
		ch <- struct{}{}
		return nil
	}))
	queueDispatcher.Dispatch(ctx, queue.Persist(events.Of(1), queue.Defer(time.Second)))
	queueDispatcher.Dispatch(ctx, queue.Persist(events.Of(2), queue.Defer(time.Hour)))

	<-ch

}
Output:

1

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrEmpty = errors.New("no message available")

ErrEmpty means the queue is empty.

Functions

func Providers added in v0.2.0

func Providers() di.Deps

Providers returns a set of dependencies related to queue. It includes the DispatcherMaker, the Dispatcher and the exported configs.

Depends On:
	contract.ConfigAccessor
	contract.Dispatcher
	Driver        `optional:"true"`
	otredis.Maker `optional:"true"`
	log.Logger
	contract.AppName
	contract.Env
	Gauge `optional:"true"`
Provides:
	DispatcherMaker
	DispatcherFactory
	Dispatcher
	*QueueableDispatcher

func UseGauge

func UseGauge(gauge metrics.Gauge, interval time.Duration) func(*QueueableDispatcher)

UseGauge is an option for WithQueue that collects a gauge metrics

func UseLogger

func UseLogger(logger log.Logger) func(*QueueableDispatcher)

UseLogger is an option for WithQueue that feeds the queue with a Logger of choice.

func UsePacker

func UsePacker(packer Packer) func(*QueueableDispatcher)

UsePacker allows consumer to replace the default Packer with a custom one. UsePacker is an option for WithQueue.

func UseParallelism

func UseParallelism(parallelism int) func(*QueueableDispatcher)

UseParallelism is an option for WithQueue that sets the parallelism for queue consumption

Types

type AbortedEvent

type AbortedEvent struct {
	Err error
	Msg *PersistedEvent
}

AbortedEvent is a contract.Event that triggers when a event is timeout or failed. If the event still has retry attempts remaining, this event won't be triggered.

type ChannelConfig

type ChannelConfig struct {
	Delayed  string
	Failed   string
	Reserved string
	Waiting  string
	Timeout  string
}

ChannelConfig describes the key name of each queue, also known as channel.

type DeferrablePersistentEvent

type DeferrablePersistentEvent struct {
	contract.Event
	// contains filtered or unexported fields
}

DeferrablePersistentEvent is a persisted event.

func Persist

func Persist(event contract.Event, opts ...PersistOption) DeferrablePersistentEvent

Persist converts any contract.Event to DeferrablePersistentEvent. Namely, store them in external storage.

func (DeferrablePersistentEvent) Decorate

Decorate decorates the PersistedEvent of this event by adding some meta info. it is called in the QueueableDispatcher, after the Packer compresses the event.

func (DeferrablePersistentEvent) Defer

Defer defers the execution of the job for the period of time returned.

type Dispatcher

type Dispatcher interface {
	contract.Dispatcher
	Consume(ctx context.Context) error
}

Dispatcher is the key of *QueueableDispatcher in the dependencies graph. Used as a type hint for injection.

type DispatcherFactory

type DispatcherFactory struct {
	*di.Factory
}

DispatcherFactory is a factory for *QueueableDispatcher. Note DispatcherFactory doesn't contain the factory method itself. ie. How to factory a dispatcher left there for users to define. Users then can use this type to create their own dispatcher implementation.

Here is an example on how to create a custom DispatcherFactory with an InProcessDriver.

factory := di.NewFactory(func(name string) (di.Pair, error) {
	queuedDispatcher := queue.WithQueue(
		&events.SyncDispatcher{},
		queue.NewInProcessDriver(),
	)
	return di.Pair{Conn: queuedDispatcher}, nil
})
dispatcherFactory := DispatcherFactory{Factory: factory}

func (DispatcherFactory) Make

Make returns a QueueableDispatcher by the given name. If it has already been created under the same name, the that one will be returned.

type DispatcherMaker

type DispatcherMaker interface {
	Make(string) (*QueueableDispatcher, error)
}

DispatcherMaker is the key of *DispatcherFactory in the dependencies graph. Used as a type hint for injection.

type Driver

type Driver interface {
	// Push pushes the message onto the queue. It is possible to specify a time delay. If so the message
	// will be read after the delay. Use zero value if a delay is not needed.
	Push(ctx context.Context, message *PersistedEvent, delay time.Duration) error
	// Pop pops the message out of the queue. It blocks until a message is available or a timeout is reached.
	Pop(ctx context.Context) (*PersistedEvent, error)
	// Ack acknowledges a message has been processed.
	Ack(ctx context.Context, message *PersistedEvent) error
	// \Fail marks a message has failed.
	Fail(ctx context.Context, message *PersistedEvent) error
	// Reload put failed/timeout message back to the Waiting queue. If the temporary outage have been cleared,
	// messages can be tried again via Reload. Reload is not a normal retry.
	// It similarly gives otherwise dead messages one more chance,
	// but this chance is not subject to the limit of MaxAttempts, nor does it reset the number of time attempted.
	Reload(ctx context.Context, channel string) (int64, error)
	// Flush empties the queue under channel
	Flush(ctx context.Context, channel string) error
	// Info lists QueueInfo by inspecting queues one by one. Useful for metrics and monitor.
	Info(ctx context.Context) (QueueInfo, error)
	// Retry put the message back onto the delayed queue.
	Retry(ctx context.Context, message *PersistedEvent) error
}

Driver is the interface for queue engines. See RedisDriver for usage.

type Gauge

type Gauge metrics.Gauge

Gauge is an alias used for dependency injection

type InProcessDriver

type InProcessDriver struct {
	// contains filtered or unexported fields
}

InProcessDriver is a test replacement for redis driver. It doesn't persist your event in any way, so not suitable for production use.

func NewInProcessDriver

func NewInProcessDriver() *InProcessDriver

NewInProcessDriverWithPopInterval creates an *InProcessDriver for testing

func NewInProcessDriverWithPopInterval

func NewInProcessDriverWithPopInterval(duration time.Duration) *InProcessDriver

NewInProcessDriverWithPopInterval creates an *InProcessDriver with an pop interval.

func (*InProcessDriver) Ack

func (i *InProcessDriver) Ack(ctx context.Context, message *PersistedEvent) error

func (*InProcessDriver) Fail

func (i *InProcessDriver) Fail(ctx context.Context, message *PersistedEvent) error

func (*InProcessDriver) Flush

func (i *InProcessDriver) Flush(ctx context.Context, channel string) error

func (*InProcessDriver) Info

func (i *InProcessDriver) Info(ctx context.Context) (QueueInfo, error)

func (*InProcessDriver) Pop

func (*InProcessDriver) Push

func (i *InProcessDriver) Push(ctx context.Context, message *PersistedEvent, delay time.Duration) error

func (*InProcessDriver) Reload

func (i *InProcessDriver) Reload(ctx context.Context, channel string) (int64, error)

func (*InProcessDriver) Retry

func (i *InProcessDriver) Retry(ctx context.Context, message *PersistedEvent) error

type Module

type Module struct {
	Factory *DispatcherFactory
}

Module exports queue commands, for example queue flush and queue reload.

func New

func New(factory *DispatcherFactory) Module

New creates a new module.

func (Module) ProvideCommand

func (m Module) ProvideCommand(command *cobra.Command)

ProvideCommand implements CommandProvider for the Module. It registers flush and reload command to the parent command.

type Packer

type Packer interface {
	// Compress serializes the message to bytes
	Marshal(message interface{}) ([]byte, error)
	// Decompress reverses the bytes to message
	Unmarshal(data []byte, message interface{}) error
}

The Packer interface describes how to save the message in wire format

type PersistOption

type PersistOption func(event *DeferrablePersistentEvent)

PersistOption defines some options for Persist

func Defer

func Defer(duration time.Duration) PersistOption

Defer is a PersistOption that defers the execution of DeferrablePersistentEvent for the period of time given.

func MaxAttempts

func MaxAttempts(attempts int) PersistOption

MaxAttempts is a PersistOption that defines how many times the event handler can be retried.

func ScheduleAt

func ScheduleAt(t time.Time) PersistOption

ScheduleAt is a PersistOption that defers the execution of DeferrablePersistentEvent until the time given.

func Timeout

func Timeout(timeout time.Duration) PersistOption

Timeout is a PersistOption that defines the maximum time the event can be processed until timeout. Note: this timeout is shared among all listeners.

func UniqueId

func UniqueId(id string) PersistOption

UniqueId is a PersistOption that outsources the generation of uniqueId to the caller.

type PersistedEvent

type PersistedEvent struct {
	// The UniqueId identifies each individual message. Sometimes the message can have exact same content and even
	// exact same Key. UniqueId is used to differentiate them.
	UniqueId string
	// Key is the Message type. Usually it is the string name of the event type before serialized.
	Key string
	// Value is the serialized bytes of the event.
	Value []byte
	// HandleTimeout sets the upper time limit for each run of the handler. If handleTimeout exceeds, the event will
	// be put onto the timeout queue. Note: the timeout is shared among all listeners.
	HandleTimeout time.Duration
	// Backoff sets the duration before next retry.
	Backoff time.Duration
	// Attempts denotes how many retry has been attempted. It starts from 1.
	Attempts int
	// MaxAttempts denotes the maximum number of time the handler can retry before the event is put onto
	// the failed queue.
	// By default, MaxAttempts is 1.
	MaxAttempts int
}

PersistedEvent represents a persisted event.

func (*PersistedEvent) Data

func (s *PersistedEvent) Data() interface{}

Data implements contract.event. It returns the Value.

func (*PersistedEvent) Type

func (s *PersistedEvent) Type() string

Type implements contract.event. It returns the Key.

type QueueInfo

type QueueInfo struct {
	// Waiting is the length of the Waiting queue.
	Waiting int64
	// Delayed is the length of the Delayed queue.
	Delayed int64
	//Timeout is the length of the Timeout queue.
	Timeout int64
	// Failed is the length of the Failed queue.
	Failed int64
}

QueueInfo describes the state of queues.

type QueueableDispatcher

type QueueableDispatcher struct {
	// contains filtered or unexported fields
}

QueueableDispatcher is an extension of the embed dispatcher. It adds the persistent event feature.

func WithQueue

func WithQueue(baseDispatcher contract.Dispatcher, driver Driver, opts ...func(*QueueableDispatcher)) *QueueableDispatcher

WithQueue wraps a QueueableDispatcher and returns a decorated QueueableDispatcher. The latter QueueableDispatcher now can send and listen to "persisted" events. Those persisted events will guarantee at least one execution, as they are stored in an external storage and won't be released until the QueueableDispatcher acknowledges the end of execution.

func (*QueueableDispatcher) Consume

func (d *QueueableDispatcher) Consume(ctx context.Context) error

Consume starts the runner and blocks until context canceled or error occurred.

func (*QueueableDispatcher) Dispatch

func (d *QueueableDispatcher) Dispatch(ctx context.Context, e contract.Event) error

Dispatch dispatches an event. See contract.Dispatcher.

func (*QueueableDispatcher) Driver

func (d *QueueableDispatcher) Driver() Driver

func (*QueueableDispatcher) Subscribe

func (d *QueueableDispatcher) Subscribe(listener contract.Listener)

Subscribe subscribes an event. See contract.Dispatcher.

type RedisDriver

type RedisDriver struct {
	Logger        log.Logger            // Logger is an optional logger. By default a noop logger is used
	RedisClient   redis.UniversalClient // RedisClient is used to communicate with redis
	ChannelConfig ChannelConfig         // ChannelConfig holds the name of redis keys for all queues.
	PopTimeout    time.Duration         // PopTimeout is the BRPOP timeout. ie. How long the pop action will block at most.
	Packer        Packer                // Packer describes how to save the message in wire format
	// contains filtered or unexported fields
}

RedisDriver is a queue driver backed by redis. It is easy to setup, and offers at least once semantic.

func (*RedisDriver) Ack

func (r *RedisDriver) Ack(ctx context.Context, message *PersistedEvent) error

Ack acknowledges a message has been processed.

func (*RedisDriver) Fail

func (r *RedisDriver) Fail(ctx context.Context, message *PersistedEvent) error

Fail marks a message has failed.

func (*RedisDriver) Flush

func (r *RedisDriver) Flush(ctx context.Context, channel string) error

Flush flushes a queue of choice by deleting all its data. Use with caution.

func (*RedisDriver) Info

func (r *RedisDriver) Info(ctx context.Context) (QueueInfo, error)

Info lists QueueInfo by inspecting queues one by one. Useful for metrics and monitor.

func (*RedisDriver) Pop

Pop pops the message out of the queue. It uses BRPOP underneath, so effectively it blocks until a message is available or a timeout is reached.

func (*RedisDriver) Push

func (r *RedisDriver) Push(ctx context.Context, message *PersistedEvent, delay time.Duration) error

Push pushes the message onto the queue. It is possible to specify a time delay. If so the message will be read after the delay. Use zero value if a delay is not needed.

func (*RedisDriver) Reload

func (r *RedisDriver) Reload(ctx context.Context, channel string) (int64, error)

Reload put failed/timeout message back to the Waiting queue. If the temporary outage have been cleared, messages can be tried again via Reload. Reload is not a normal retry. It similarly gives otherwise dead messages one more chance, but this chance is not subject to the limit of MaxAttempts, nor does it reset the number of time attempted.

func (*RedisDriver) Retry

func (r *RedisDriver) Retry(ctx context.Context, message *PersistedEvent) error

Retry put the message back onto the delayed queue. The message will be tried after a period of time specified by Backoff. Note: if one listener failed, all listeners for this event will have to be retried. Make sure your listeners are idempotent as always.

type RetryingEvent

type RetryingEvent struct {
	Err error
	Msg *PersistedEvent
}

RetryingEvent is a contract.Event that triggers when a certain event failed to be processed, and it is up for retry. Note: if retry attempts are exhausted, this event won't be triggered.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL