Documentation ¶
Overview ¶
Package queue provides a persistent queue implementation that interplays with the Dispatcher in the contract package.
It is recommended to read documentation on the events package before getting started on the queue package.
Introduction ¶
Queues in Go are not as prominent as in some other languages, since Go excels at concurrency. However, a persistent queue can still offer benefits missing from native mechanisms such as Go channels. Queued jobs won't be lost even if the system shuts down, which means jobs can be retried until they succeed. It is also possible to defer the execution of a particular job for a lengthy period of time, which is useful when you need to implement a "send email after 30 days" type of event handler.
Simple Usage ¶
To convert any valid event to a persisted event, use:
pevent := queue.Persist(event)
As with the events package, you don't have to use this helper. You can manually create a queueable event by implementing this interface on top of the normal event interface:
type persistent interface {
	Defer() time.Duration
	Decorate(s *PersistedEvent)
}
The PersistedEvent passed to the Decorate method contains the tunable configuration, such as the maximum number of retries.
No matter how you create a persisted event, to fire it, send it through a dispatcher. The normal dispatcher in the events package won't work, as a queue implementation is required. Luckily, it is dead simple to convert a standard dispatcher to a queue.Dispatcher.
queueableDispatcher := queue.WithQueue(&events.SyncDispatcher{}, &queue.RedisDriver{})
queueableDispatcher.Dispatch(context.Background(), pevent)
As you can see, how the queue persists the events depends on the underlying driver. The default driver bundled in this package is the redis driver.
Once the persisted events are stored in the external storage, a goroutine should consume them and pipe the reconstructed events to the listeners. This is done by calling the Consume method of queue.Dispatcher.
go dispatcher.Consume(context.Background())
There is no difference between listeners for normal events and listeners for persisted events. They can be used interchangeably. Note, however, that if an event is retryable, it is your responsibility to ensure idempotency. Also, be aware that if a persisted event has many listeners, the event is up for retry when any of the listeners fails.
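One common way to meet the idempotency requirement is to remember which event IDs have already been handled. The sketch below is an assumption-laden illustration, not part of this package: IdempotentHandler is a hypothetical helper, and the in-memory map stands in for whatever durable store a real system would use.

```go
package main

import (
	"fmt"
	"sync"
)

// IdempotentHandler wraps a side effect so that replays of the same event ID
// are ignored. The in-memory map is an assumption made for brevity.
type IdempotentHandler struct {
	mu   sync.Mutex
	done map[string]bool
	do   func(id string) error
}

// NewIdempotentHandler builds a handler around the given side effect.
func NewIdempotentHandler(do func(id string) error) *IdempotentHandler {
	return &IdempotentHandler{done: make(map[string]bool), do: do}
}

// Handle runs the side effect at most once per successfully processed ID.
// A failed run is not recorded, so a later retry can try again.
func (h *IdempotentHandler) Handle(id string) error {
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.done[id] {
		return nil
	}
	if err := h.do(id); err != nil {
		return err
	}
	h.done[id] = true
	return nil
}

func main() {
	sent := 0
	h := NewIdempotentHandler(func(id string) error {
		sent++
		return nil
	})
	_ = h.Handle("order-1")
	_ = h.Handle("order-1") // simulated retry caused by another listener failing
	fmt.Println("emails sent:", sent)
}
```

A listener could call such a helper with the event's unique ID, so that a retry triggered by a different listener does not repeat work that already succeeded.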
Integrate ¶
The queue package exports configuration in this format:
queue:
  default:
    redisName: default
    parallelism: 3
    checkQueueLengthIntervalSecond: 15
While manually constructing the queue.Dispatcher is absolutely feasible, users can use the bundled dependency provider without breaking a sweat. Using this approach, the life cycle of consumer goroutine will be managed automatically by the core.
var c *core.C
c.Provide(otredis.Providers()) // to provide the redis driver
c.Provide(queue.Providers())
A module is also bundled, providing the queue command (for reloading and flushing).
c.AddModuleFunc(queue.New)
Sometimes there are valid reasons to use more than one queue. Each dispatcher, however, is bound to only one queue. To use multiple queues, multiple dispatchers are required. Inject queue.DispatcherMaker to create a dispatcher with a specific name.
c.Invoke(func(maker queue.DispatcherMaker) {
	dispatcher, err := maker.Make("default")
	// see examples for details
})
Events ¶
When an attempt to execute the event handler fails, one of two kinds of event will be fired. If the failed event can be retried, "queue.RetryingEvent" will be fired. If not, "queue.AbortedEvent" will be fired.
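The choice between the two events can be pictured with the following sketch. It assumes that a failure is retryable while the attempt count (starting from 1) is still below MaxAttempts; the classify function and the outcome type are hypothetical names for illustration, and the package's actual decision logic may differ.

```go
package main

import "fmt"

// outcome names the event that would be fired after a failed attempt.
type outcome string

const (
	retrying outcome = "queue.RetryingEvent"
	aborted  outcome = "queue.AbortedEvent"
)

// classify decides which event follows a failure. Assumption: attempts
// starts from 1 and the event is retried while attempts < maxAttempts.
func classify(attempts, maxAttempts int) outcome {
	if attempts < maxAttempts {
		return retrying
	}
	return aborted
}

func main() {
	for attempt := 1; attempt <= 3; attempt++ {
		fmt.Printf("attempt %d of 3 failed -> %s\n", attempt, classify(attempt, 3))
	}
}
```

Under this assumption, an event with the default MaxAttempts of 1 is aborted on its first failure.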
Metrics ¶
To gain visibility into the length of the queue, inject a gauge into the core and alias it to queue.Gauge. The length of all internal queues will be periodically reported to the metrics collector (presumably Prometheus).
c.Provide(di.Deps{func(appName contract.AppName, env contract.Env) queue.Gauge {
	return prometheus.NewGaugeFrom(
		stdprometheus.GaugeOpts{
			Namespace: appName.String(),
			Subsystem: env.String(),
			Name:      "queue_length",
			Help:      "The gauge of queue length",
		},
		[]string{"name", "channel"},
	)
}})
Example (Minimum) ¶
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/DoNewsCode/core/contract"
	"github.com/DoNewsCode/core/events"
	"github.com/DoNewsCode/core/queue"
)

func main() {
	dispatcher := events.SyncDispatcher{}
	queueDispatcher := queue.WithQueue(&dispatcher, queue.NewInProcessDriver())
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var ch = make(chan struct{})
	go queueDispatcher.Consume(ctx)

	queueDispatcher.Subscribe(events.Listen(events.From(1), func(ctx context.Context, event contract.Event) error {
		fmt.Println(event.Data())
		ch <- struct{}{}
		return nil
	}))
	queueDispatcher.Dispatch(ctx, queue.Persist(events.Of(1), queue.Defer(time.Second)))
	queueDispatcher.Dispatch(ctx, queue.Persist(events.Of(2), queue.Defer(time.Hour)))

	<-ch
}
Output: 1
Index ¶
- Variables
- func Providers() di.Deps
- func UseGauge(gauge metrics.Gauge, interval time.Duration) func(*QueueableDispatcher)
- func UseLogger(logger log.Logger) func(*QueueableDispatcher)
- func UsePacker(packer Packer) func(*QueueableDispatcher)
- func UseParallelism(parallelism int) func(*QueueableDispatcher)
- type AbortedEvent
- type ChannelConfig
- type DeferrablePersistentEvent
- type Dispatcher
- type DispatcherFactory
- type DispatcherMaker
- type Driver
- type Gauge
- type InProcessDriver
- func (i *InProcessDriver) Ack(ctx context.Context, message *PersistedEvent) error
- func (i *InProcessDriver) Fail(ctx context.Context, message *PersistedEvent) error
- func (i *InProcessDriver) Flush(ctx context.Context, channel string) error
- func (i *InProcessDriver) Info(ctx context.Context) (QueueInfo, error)
- func (i *InProcessDriver) Pop(ctx context.Context) (*PersistedEvent, error)
- func (i *InProcessDriver) Push(ctx context.Context, message *PersistedEvent, delay time.Duration) error
- func (i *InProcessDriver) Reload(ctx context.Context, channel string) (int64, error)
- func (i *InProcessDriver) Retry(ctx context.Context, message *PersistedEvent) error
- type Module
- type Packer
- type PersistOption
- type PersistedEvent
- type QueueInfo
- type QueueableDispatcher
- type RedisDriver
- func (r *RedisDriver) Ack(ctx context.Context, message *PersistedEvent) error
- func (r *RedisDriver) Fail(ctx context.Context, message *PersistedEvent) error
- func (r *RedisDriver) Flush(ctx context.Context, channel string) error
- func (r *RedisDriver) Info(ctx context.Context) (QueueInfo, error)
- func (r *RedisDriver) Pop(ctx context.Context) (*PersistedEvent, error)
- func (r *RedisDriver) Push(ctx context.Context, message *PersistedEvent, delay time.Duration) error
- func (r *RedisDriver) Reload(ctx context.Context, channel string) (int64, error)
- func (r *RedisDriver) Retry(ctx context.Context, message *PersistedEvent) error
- type RetryingEvent
Constants ¶
This section is empty.
Variables ¶
var ErrEmpty = errors.New("no message available")
ErrEmpty means the queue is empty.
Functions ¶
func Providers ¶ added in v0.2.0
Providers returns a set of dependencies related to queue. It includes the DispatcherMaker, the Dispatcher and the exported configs.
Depends On:
	contract.ConfigAccessor
	contract.Dispatcher
	Driver        `optional:"true"`
	otredis.Maker `optional:"true"`
	log.Logger
	contract.AppName
	contract.Env
	Gauge `optional:"true"`
Provides:
	DispatcherMaker
	DispatcherFactory
	Dispatcher
	*QueueableDispatcher
func UseGauge ¶
func UseGauge(gauge metrics.Gauge, interval time.Duration) func(*QueueableDispatcher)
UseGauge is an option for WithQueue that reports queue metrics to the given gauge at the given interval.
func UseLogger ¶
func UseLogger(logger log.Logger) func(*QueueableDispatcher)
UseLogger is an option for WithQueue that feeds the queue with a Logger of choice.
func UsePacker ¶
func UsePacker(packer Packer) func(*QueueableDispatcher)
UsePacker allows consumer to replace the default Packer with a custom one. UsePacker is an option for WithQueue.
func UseParallelism ¶
func UseParallelism(parallelism int) func(*QueueableDispatcher)
UseParallelism is an option for WithQueue that sets the parallelism for queue consumption.
Types ¶
type AbortedEvent ¶
type AbortedEvent struct { Err error Msg *PersistedEvent }
AbortedEvent is a contract.Event that triggers when an event times out or fails. If the event still has retry attempts remaining, this event won't be triggered.
type ChannelConfig ¶
type ChannelConfig struct {
	Delayed  string
	Failed   string
	Reserved string
	Waiting  string
	Timeout  string
}
ChannelConfig describes the key name of each queue, also known as channel.
type DeferrablePersistentEvent ¶
DeferrablePersistentEvent is a persisted event.
func Persist ¶
func Persist(event contract.Event, opts ...PersistOption) DeferrablePersistentEvent
Persist converts any contract.Event to a DeferrablePersistentEvent, that is, an event to be stored in external storage.
func (DeferrablePersistentEvent) Decorate ¶
func (d DeferrablePersistentEvent) Decorate(s *PersistedEvent)
Decorate decorates the PersistedEvent of this event by adding some meta info. It is called in the QueueableDispatcher, after the Packer serializes the event.
func (DeferrablePersistentEvent) Defer ¶
func (d DeferrablePersistentEvent) Defer() time.Duration
Defer defers the execution of the job for the period of time returned.
type Dispatcher ¶
type Dispatcher interface {
	contract.Dispatcher
	Consume(ctx context.Context) error
}
Dispatcher is the key of *QueueableDispatcher in the dependencies graph. Used as a type hint for injection.
type DispatcherFactory ¶
DispatcherFactory is a factory for *QueueableDispatcher. Note that DispatcherFactory doesn't contain the factory method itself, i.e. how to create a dispatcher is left for users to define. Users can then use this type to create their own dispatcher implementation.
Here is an example on how to create a custom DispatcherFactory with an InProcessDriver.
factory := di.NewFactory(func(name string) (di.Pair, error) {
	queuedDispatcher := queue.WithQueue(
		&events.SyncDispatcher{},
		queue.NewInProcessDriver(),
	)
	return di.Pair{Conn: queuedDispatcher}, nil
})
dispatcherFactory := queue.DispatcherFactory{Factory: factory}
func (DispatcherFactory) Make ¶
func (s DispatcherFactory) Make(name string) (*QueueableDispatcher, error)
Make returns a QueueableDispatcher by the given name. If one has already been created under the same name, that one will be returned.
type DispatcherMaker ¶
type DispatcherMaker interface {
Make(string) (*QueueableDispatcher, error)
}
DispatcherMaker is the key of *DispatcherFactory in the dependencies graph. Used as a type hint for injection.
type Driver ¶
type Driver interface {
	// Push pushes the message onto the queue. It is possible to specify a time delay. If so, the message
	// will be read after the delay. Use the zero value if a delay is not needed.
	Push(ctx context.Context, message *PersistedEvent, delay time.Duration) error
	// Pop pops the message out of the queue. It blocks until a message is available or a timeout is reached.
	Pop(ctx context.Context) (*PersistedEvent, error)
	// Ack acknowledges that a message has been processed.
	Ack(ctx context.Context, message *PersistedEvent) error
	// Fail marks a message as failed.
	Fail(ctx context.Context, message *PersistedEvent) error
	// Reload puts failed or timed-out messages back onto the Waiting queue. If the temporary outage has been cleared,
	// messages can be tried again via Reload. Reload is not a normal retry.
	// It similarly gives otherwise dead messages one more chance,
	// but this chance is not subject to the limit of MaxAttempts, nor does it reset the number of times attempted.
	Reload(ctx context.Context, channel string) (int64, error)
	// Flush empties the queue under channel.
	Flush(ctx context.Context, channel string) error
	// Info lists QueueInfo by inspecting queues one by one. Useful for metrics and monitoring.
	Info(ctx context.Context) (QueueInfo, error)
	// Retry puts the message back onto the delayed queue.
	Retry(ctx context.Context, message *PersistedEvent) error
}
Driver is the interface for queue engines. See RedisDriver for usage.
type InProcessDriver ¶
type InProcessDriver struct {
// contains filtered or unexported fields
}
InProcessDriver is a test replacement for the redis driver. It doesn't persist your events in any way, so it is not suitable for production use.
func NewInProcessDriver ¶
func NewInProcessDriver() *InProcessDriver
NewInProcessDriver creates an *InProcessDriver for testing.
func NewInProcessDriverWithPopInterval ¶
func NewInProcessDriverWithPopInterval(duration time.Duration) *InProcessDriver
NewInProcessDriverWithPopInterval creates an *InProcessDriver with a pop interval.
func (*InProcessDriver) Ack ¶
func (i *InProcessDriver) Ack(ctx context.Context, message *PersistedEvent) error
func (*InProcessDriver) Fail ¶
func (i *InProcessDriver) Fail(ctx context.Context, message *PersistedEvent) error
func (*InProcessDriver) Flush ¶
func (i *InProcessDriver) Flush(ctx context.Context, channel string) error
func (*InProcessDriver) Info ¶
func (i *InProcessDriver) Info(ctx context.Context) (QueueInfo, error)
func (*InProcessDriver) Pop ¶
func (i *InProcessDriver) Pop(ctx context.Context) (*PersistedEvent, error)
func (*InProcessDriver) Push ¶
func (i *InProcessDriver) Push(ctx context.Context, message *PersistedEvent, delay time.Duration) error
func (*InProcessDriver) Retry ¶
func (i *InProcessDriver) Retry(ctx context.Context, message *PersistedEvent) error
type Module ¶
type Module struct {
Factory *DispatcherFactory
}
Module exports queue commands, for example queue flush and queue reload.
func (Module) ProvideCommand ¶
ProvideCommand implements CommandProvider for the Module. It registers flush and reload command to the parent command.
type Packer ¶
type Packer interface {
	// Marshal serializes the message to bytes.
	Marshal(message interface{}) ([]byte, error)
	// Unmarshal reverses the bytes to a message.
	Unmarshal(data []byte, message interface{}) error
}
The Packer interface describes how to save the message in wire format.
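As a sketch of what a custom Packer might look like, the example below implements the two methods with encoding/json from the standard library. The Packer interface is redeclared locally so the example compiles on its own, and JSONPacker and OrderPlaced are hypothetical names; which format the package's default Packer uses is not stated here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Packer is redeclared locally to keep the sketch self-contained.
type Packer interface {
	Marshal(message interface{}) ([]byte, error)
	Unmarshal(data []byte, message interface{}) error
}

// JSONPacker is a hypothetical Packer that stores messages as JSON.
type JSONPacker struct{}

// Marshal serializes the message to JSON bytes.
func (JSONPacker) Marshal(message interface{}) ([]byte, error) {
	return json.Marshal(message)
}

// Unmarshal decodes JSON bytes into message, which must be a pointer.
func (JSONPacker) Unmarshal(data []byte, message interface{}) error {
	return json.Unmarshal(data, message)
}

// Compile-time check that JSONPacker satisfies Packer.
var _ Packer = JSONPacker{}

// OrderPlaced is a hypothetical event payload.
type OrderPlaced struct {
	ID int `json:"id"`
}

func main() {
	var p Packer = JSONPacker{}
	data, err := p.Marshal(OrderPlaced{ID: 7})
	if err != nil {
		panic(err)
	}
	var out OrderPlaced
	if err := p.Unmarshal(data, &out); err != nil {
		panic(err)
	}
	fmt.Println(string(data), out.ID)
}
```

A type like this could then be handed to WithQueue through the UsePacker option.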
type PersistOption ¶
type PersistOption func(event *DeferrablePersistentEvent)
PersistOption defines some options for Persist
func Defer ¶
func Defer(duration time.Duration) PersistOption
Defer is a PersistOption that defers the execution of DeferrablePersistentEvent for the period of time given.
func MaxAttempts ¶
func MaxAttempts(attempts int) PersistOption
MaxAttempts is a PersistOption that defines how many times the event handler can be retried.
func ScheduleAt ¶
func ScheduleAt(t time.Time) PersistOption
ScheduleAt is a PersistOption that defers the execution of DeferrablePersistentEvent until the time given.
func Timeout ¶
func Timeout(timeout time.Duration) PersistOption
Timeout is a PersistOption that defines the maximum time the event can be processed until timeout. Note: this timeout is shared among all listeners.
func UniqueId ¶
func UniqueId(id string) PersistOption
UniqueId is a PersistOption that outsources the generation of uniqueId to the caller.
type PersistedEvent ¶
type PersistedEvent struct {
	// UniqueId identifies each individual message. Sometimes messages can have the exact same content and even
	// the exact same Key. UniqueId is used to differentiate them.
	UniqueId string
	// Key is the message type. Usually it is the string name of the event type before it is serialized.
	Key string
	// Value is the serialized bytes of the event.
	Value []byte
	// HandleTimeout sets the upper time limit for each run of the handler. If HandleTimeout is exceeded, the event will
	// be put onto the timeout queue. Note: the timeout is shared among all listeners.
	HandleTimeout time.Duration
	// Backoff sets the duration before the next retry.
	Backoff time.Duration
	// Attempts denotes how many times the handler has been attempted. It starts from 1.
	Attempts int
	// MaxAttempts denotes the maximum number of times the handler can retry before the event is put onto
	// the failed queue.
	// By default, MaxAttempts is 1.
	MaxAttempts int
}
PersistedEvent represents a persisted event.
func (*PersistedEvent) Data ¶
func (s *PersistedEvent) Data() interface{}
Data implements contract.Event. It returns the Value.
func (*PersistedEvent) Type ¶
func (s *PersistedEvent) Type() string
Type implements contract.Event. It returns the Key.
type QueueInfo ¶
type QueueInfo struct {
	// Waiting is the length of the Waiting queue.
	Waiting int64
	// Delayed is the length of the Delayed queue.
	Delayed int64
	// Timeout is the length of the Timeout queue.
	Timeout int64
	// Failed is the length of the Failed queue.
	Failed int64
}
QueueInfo describes the state of queues.
type QueueableDispatcher ¶
type QueueableDispatcher struct {
// contains filtered or unexported fields
}
QueueableDispatcher is an extension of the embedded dispatcher. It adds the persistent event feature.
func WithQueue ¶
func WithQueue(baseDispatcher contract.Dispatcher, driver Driver, opts ...func(*QueueableDispatcher)) *QueueableDispatcher
WithQueue wraps a contract.Dispatcher and returns a decorated QueueableDispatcher. The returned QueueableDispatcher can send and listen to "persisted" events. Those persisted events are guaranteed at least one execution, as they are stored in an external storage and won't be released until the QueueableDispatcher acknowledges the end of execution.
func (*QueueableDispatcher) Consume ¶
func (d *QueueableDispatcher) Consume(ctx context.Context) error
Consume starts the runner and blocks until the context is canceled or an error occurs.
func (*QueueableDispatcher) Driver ¶
func (d *QueueableDispatcher) Driver() Driver
func (*QueueableDispatcher) Subscribe ¶
func (d *QueueableDispatcher) Subscribe(listener contract.Listener)
Subscribe subscribes a listener to events. See contract.Dispatcher.
type RedisDriver ¶
type RedisDriver struct {
	Logger        log.Logger            // Logger is an optional logger. By default a noop logger is used
	RedisClient   redis.UniversalClient // RedisClient is used to communicate with redis
	ChannelConfig ChannelConfig         // ChannelConfig holds the name of redis keys for all queues.
	PopTimeout    time.Duration         // PopTimeout is the BRPOP timeout. ie. How long the pop action will block at most.
	Packer        Packer                // Packer describes how to save the message in wire format
	// contains filtered or unexported fields
}
RedisDriver is a queue driver backed by redis. It is easy to set up, and offers at-least-once semantics.
func (*RedisDriver) Ack ¶
func (r *RedisDriver) Ack(ctx context.Context, message *PersistedEvent) error
Ack acknowledges a message has been processed.
func (*RedisDriver) Fail ¶
func (r *RedisDriver) Fail(ctx context.Context, message *PersistedEvent) error
Fail marks a message has failed.
func (*RedisDriver) Flush ¶
func (r *RedisDriver) Flush(ctx context.Context, channel string) error
Flush flushes a queue of choice by deleting all its data. Use with caution.
func (*RedisDriver) Info ¶
func (r *RedisDriver) Info(ctx context.Context) (QueueInfo, error)
Info lists QueueInfo by inspecting queues one by one. Useful for metrics and monitor.
func (*RedisDriver) Pop ¶
func (r *RedisDriver) Pop(ctx context.Context) (*PersistedEvent, error)
Pop pops the message out of the queue. It uses BRPOP underneath, so effectively it blocks until a message is available or a timeout is reached.
func (*RedisDriver) Push ¶
func (r *RedisDriver) Push(ctx context.Context, message *PersistedEvent, delay time.Duration) error
Push pushes the message onto the queue. It is possible to specify a time delay. If so the message will be read after the delay. Use zero value if a delay is not needed.
func (*RedisDriver) Reload ¶
func (r *RedisDriver) Reload(ctx context.Context, channel string) (int64, error)
Reload puts failed or timed-out messages back onto the Waiting queue. If the temporary outage has been cleared, messages can be tried again via Reload. Reload is not a normal retry. It similarly gives otherwise dead messages one more chance, but this chance is not subject to the limit of MaxAttempts, nor does it reset the number of times attempted.
func (*RedisDriver) Retry ¶
func (r *RedisDriver) Retry(ctx context.Context, message *PersistedEvent) error
Retry puts the message back onto the delayed queue. The message will be tried again after the period of time specified by Backoff. Note: if one listener fails, all listeners for this event will have to be retried. Make sure your listeners are idempotent, as always.
type RetryingEvent ¶
type RetryingEvent struct { Err error Msg *PersistedEvent }
RetryingEvent is a contract.Event that triggers when a certain event failed to be processed, and it is up for retry. Note: if retry attempts are exhausted, this event won't be triggered.