queue

package module
v0.1.3
Published: Sep 13, 2021 License: MIT Imports: 25 Imported by: 0

README

core-queue

A simple queue implementation for package Core.

Queues in Go are not as prominent as in some other languages, since Go excels at handling concurrency. However, a persistent queue can still offer benefits missing from native mechanisms such as Go channels. A queued job won't be lost even if the system shuts down, which means jobs can be retried until they succeed. It is also possible to defer the execution of a particular job for a lengthy period of time, which is useful when you need to implement a "send email after 30 days" type of job handler.

Example

package main

import (
	"context"
	"fmt"
	queue "github.com/DoNewsCode/core-queue"
	"time"
)

type ExampleJob string

func (e ExampleJob) Type() string {
	return "example"
}

func (e ExampleJob) Data() interface{} {
	return e
}

type ExampleListener struct {
	ch chan struct{}
}

func (e *ExampleListener) Listen() queue.Job {
	return ExampleJob("")
}

func (e *ExampleListener) Process(ctx context.Context, job queue.Job) error {
	fmt.Println(job.Data())
	e.ch <- struct{}{}
	return nil
}

func main() {
	queueDispatcher := queue.NewQueue(queue.NewInProcessDriver())
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var ch = make(chan struct{})
	go queueDispatcher.Consume(ctx)

	queueDispatcher.Subscribe(&ExampleListener{ch: ch})
	queueDispatcher.Dispatch(ctx, queue.Adjust(ExampleJob("foo"), queue.Defer(time.Second)))
	queueDispatcher.Dispatch(ctx, queue.Adjust(ExampleJob("bar"), queue.Defer(time.Hour)))

	<-ch

}

GoDoc

https://pkg.go.dev/github.com/DoNewsCode/core-queue

Documentation

Overview

Package queue provides a persistent queue implementation that supports job retries and deferred dispatch.

It is recommended to read documentation on the core package before getting started on the queue package.

Introduction

Queues in Go are not as prominent as in some other languages, since Go excels at handling concurrency. However, a persistent queue can still offer benefits missing from native mechanisms such as Go channels. A queued job won't be lost even if the system shuts down, which means jobs can be retried until they succeed. It is also possible to defer the execution of a particular job for a lengthy period of time, which is useful when you need to implement a "send email after 30 days" type of job handler.

Simple Usage

First and foremost, we should create a job and wait for the queue to dispatch it. A job can be any struct that implements the Job interface.

type Job interface {
	Type() string
	Data() interface{}
}

Although an object that implements the Job interface can be dispatched immediately, it only minimally describes the job's properties. We can tune the properties with the Adjust helper. For example, to run the job after 3 minutes with a maximum of 5 attempts:

newJob := queue.Adjust(job, queue.Defer(3 * time.Minute), queue.MaxAttempts(5))

You don't have to use this helper. You can manually create a queueable Job by implementing this interface on top of the normal Job interface:

type deferrableDecorator interface {
  Defer() time.Duration
  Decorate(s *PersistedJob)
}

The PersistedJob passed to the Decorate method contains the tunable configuration, such as the maximum number of attempts.

No matter how you create a persisted Job, to fire it, send it through a dispatcher. A plain synchronous dispatcher won't work, as a queue implementation is required. Luckily, it is very simple to create a queue-backed dispatcher with queue.NewQueue.

queueableDispatcher := queue.NewQueue(&queue.RedisDriver{})
queueableDispatcher.Dispatch(context.Background(), newJob)

As you can see, how the queue persists the Jobs depends on the underlying driver. The default driver bundled in this package is the redis driver.

Once persisted Jobs are stored in external storage, a goroutine should consume them and pipe the reconstructed Jobs to the listeners. This is done by calling the Consume method of the queue:

go queueableDispatcher.Consume(context.Background())

Note that if a Job is retryable, it is your responsibility to ensure idempotency. Also, be aware that if a persisted Job has many listeners, the Job is retried when any of the listeners fails.
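The idempotency requirement above can be illustrated with a small, standard-library-only sketch. It does not use this package: the mailer and flaky listeners and the deliver loop are hypothetical stand-ins, and the loop only mimics the documented behaviour that a failure in one listener causes the whole Job to be retried.

```go
package main

import (
	"errors"
	"fmt"
)

// mailer is a hypothetical listener whose side effect must happen once per job.
type mailer struct {
	handled map[string]bool // job IDs that have already been processed
	sent    int             // number of emails actually sent
}

// handle sends the email unless this job ID was already processed.
func (m *mailer) handle(jobID string) error {
	if m.handled[jobID] {
		return nil // duplicate delivery caused by a retry: skip the side effect
	}
	m.sent++
	m.handled[jobID] = true
	return nil
}

// flaky is a hypothetical listener that fails on its first call only.
type flaky struct{ calls int }

func (f *flaky) handle(jobID string) error {
	f.calls++
	if f.calls == 1 {
		return errors.New("temporary failure")
	}
	return nil
}

// deliver calls the listeners in order and stops an attempt at the first error.
// The whole job is then retried, up to maxAttempts. It returns the number of
// attempts used. This is an illustrative loop, not the package's consumer.
func deliver(jobID string, maxAttempts int, listeners ...func(string) error) int {
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		failed := false
		for _, listen := range listeners {
			if err := listen(jobID); err != nil {
				failed = true
				break
			}
		}
		if !failed {
			return attempt
		}
	}
	return maxAttempts
}

func main() {
	m := &mailer{handled: map[string]bool{}}
	f := &flaky{}
	attempts := deliver("job-1", 3, m.handle, f.handle)
	fmt.Println(attempts, m.sent) // prints "2 1"
}
```

The mailer is invoked on both attempts but sends only once, because it records the job ID after the first successful run. In a real deployment that record would probably need to live in durable storage shared by all consumers.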

Integrate

The queue package exports configuration in this format:

queue:
  default:
    redisName: default
    parallelism: 3
    checkQueueLengthIntervalSecond: 15

While manually constructing the queue.JobDispatcher is absolutely feasible, users can use the bundled dependency provider without breaking a sweat. Using this approach, the life cycle of consumer goroutine will be managed automatically by the core.

var c *core.C
c.Provide(otredis.Providers()) // to provide the redis driver
c.Provide(queue.Providers())

A module is also bundled, providing the queue command (for reloading and flushing).

c.AddModuleFunc(queue.New)

Sometimes there are valid reasons to use more than one queue. Each dispatcher, however, is bound to only one queue. To use multiple queues, multiple dispatchers are required. Inject queue.DispatcherMaker to create a dispatcher with a specific name.

c.Invoke(func(maker queue.DispatcherMaker) {
  dispatcher, err := maker.Make("default")
  // see examples for details
})

Event-based Jobs

When an attempt to execute the Job handler fails, one of two special events is fired through the event dispatcher. If the failed Job can be retried, the BeforeRetry event is fired. If not, the BeforeAbort event is fired.

Metrics

To gain visibility into the length of the queue, inject a gauge into the core and alias it to queue.Gauge. The length of all internal queues will be periodically reported to the metrics collector (presumably Prometheus).

c.Provide(di.Deps{func(appName contract.AppName, env contract.Env) queue.Gauge {
  return prometheus.NewGaugeFrom(
    stdprometheus.GaugeOpts{
      Namespace: appName.String(),
      Subsystem: env.String(),
      Name:      "queue_length",
      Help:      "The gauge of queue length",
    }, []string{"name", "channel"},
  )
}})

Example

package main

import (
	"context"
	"fmt"

	queue "github.com/DoNewsCode/core-queue"
)

func main() {
	dispatcher := &queue.SyncDispatcher{}
	// Subscribe to int Job.
	dispatcher.Subscribe(queue.Listen(queue.JobFrom(0), func(ctx context.Context, Job queue.Job) error {
		fmt.Println(Job.Data())
		return nil
	}))
	// Subscribe to string Job.
	dispatcher.Subscribe(queue.Listen(queue.JobFrom(""), func(ctx context.Context, Job queue.Job) error {
		fmt.Println(Job.Data())
		return nil
	}))
	dispatcher.Dispatch(context.Background(), queue.JobFrom(100))
	dispatcher.Dispatch(context.Background(), queue.JobFrom("Job"))
}

Output:

100
Job

Example (Defer)

package main

import (
	"context"
	"fmt"
	"time"

	queue "github.com/DoNewsCode/core-queue"

	"github.com/DoNewsCode/core"
	"github.com/DoNewsCode/core/otredis"
	"github.com/knadh/koanf/parsers/json"
	"github.com/knadh/koanf/providers/rawbytes"
	"github.com/oklog/run"
)

type DeferMockData struct {
	Value string
}

type DeferMockListener struct{}

func (m DeferMockListener) Listen() queue.Job {
	return queue.JobFrom(DeferMockData{})
}

func (m DeferMockListener) Process(_ context.Context, Job queue.Job) error {
	fmt.Println(Job.Data().(DeferMockData).Value)
	return nil
}

// bootstrapDefer is normally done when bootstrapping the framework. We mimic it here for demonstration.
func bootstrapDefer() *core.C {
	const sampleConfig = "{\"log\":{\"level\":\"error\"},\"queue\":{\"default\":{\"parallelism\":1}}}"
	// Make sure redis is running at localhost:6379
	c := core.New(
		core.WithConfigStack(rawbytes.Provider([]byte(sampleConfig)), json.Parser()),
	)

	// Add ConfProvider
	c.ProvideEssentials()
	c.Provide(otredis.Providers())
	c.Provide(queue.Providers())
	return c
}

// serveDefer normally lives at the serve command. We mimic it here for demonstration.
func serveDefer(c *core.C, duration time.Duration) {
	var g run.Group

	c.ApplyRunGroup(&g)

	// cancel the run group after some time, so that the program ends. In real project, this is not necessary.
	ctx, cancel := context.WithTimeout(context.Background(), duration)
	defer cancel()
	g.Add(func() error {
		<-ctx.Done()
		return nil
	}, func(err error) {
		cancel()
	})

	err := g.Run()
	if err != nil {
		panic(err)
	}
}

func main() {
	c := bootstrapDefer()

	c.Invoke(func(dispatcher *queue.Queue) {
		// Subscribe
		dispatcher.Subscribe(DeferMockListener{})

		// Trigger a Job
		evt := queue.JobFrom(DeferMockData{Value: "hello world"})
		_ = dispatcher.Dispatch(context.Background(), queue.Adjust(evt, queue.Defer(time.Second)))
		_ = dispatcher.Dispatch(context.Background(), queue.Adjust(evt, queue.Defer(time.Hour)))
	})

	serveDefer(c, 2*time.Second)

}

Output:

hello world

Example (Factory)

package main

import (
	"context"
	"fmt"
	"time"

	queue "github.com/DoNewsCode/core-queue"

	"github.com/DoNewsCode/core"
	"github.com/DoNewsCode/core/otredis"
	"github.com/knadh/koanf/parsers/json"
	"github.com/knadh/koanf/providers/rawbytes"
	"github.com/oklog/run"
)

type MockFactoryData struct {
	Value string
}

type MockFactoryListener struct{}

func (m MockFactoryListener) Listen() queue.Job {
	return queue.JobFrom(MockFactoryData{})
}

func (m MockFactoryListener) Process(_ context.Context, Job queue.Job) error {
	fmt.Println(Job.Data().(MockFactoryData).Value)
	return nil
}

// bootstrapFactories is normally done when bootstrapping the framework. We mimic it here for demonstration.
func bootstrapFactories() *core.C {
	const sampleConfig = "{\"log\":{\"level\":\"error\"},\"queue\":{\"default\":{\"parallelism\":2},\"myQueue\":{\"parallelism\":1}}}"

	// Make sure redis is running at localhost:6379
	c := core.New(
		core.WithConfigStack(rawbytes.Provider([]byte(sampleConfig)), json.Parser()),
	)

	// Add ConfProvider
	c.ProvideEssentials()
	c.Provide(otredis.Providers())
	c.Provide(queue.Providers())
	return c
}

// serveFactories normally lives at the serve command. We mimic it here for demonstration.
func serveFactories(c *core.C, duration time.Duration) {
	var g run.Group

	c.ApplyRunGroup(&g)

	// cancel the run group after some time, so that the program ends. In real project, this is not necessary.
	ctx, cancel := context.WithTimeout(context.Background(), duration)
	defer cancel()
	g.Add(func() error {
		<-ctx.Done()
		return nil
	}, func(err error) {
		cancel()
	})

	err := g.Run()
	if err != nil {
		panic(err)
	}
}

func main() {
	c := bootstrapFactories()

	c.Invoke(func(maker queue.DispatcherMaker) {
		dispatcher, err := maker.Make("myQueue")
		if err != nil {
			panic(err)
		}
		// Subscribe
		dispatcher.Subscribe(MockFactoryListener{})

		// Trigger a Job
		evt := queue.JobFrom(MockFactoryData{Value: "hello world"})
		_ = dispatcher.Dispatch(context.Background(), queue.Adjust(evt))
	})

	serveFactories(c, 1*time.Second)
}

Output:

hello world

Example (Faulty)

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	queue "github.com/DoNewsCode/core-queue"

	"github.com/DoNewsCode/core"
	"github.com/DoNewsCode/core/otredis"
	"github.com/knadh/koanf/parsers/json"
	"github.com/knadh/koanf/providers/rawbytes"
	"github.com/oklog/run"
)

type FaultyMockData struct {
	Value string
}

type FaultyMockListener struct {
	count int
}

func (m *FaultyMockListener) Listen() queue.Job {
	return queue.JobFrom(FaultyMockData{})
}

func (m *FaultyMockListener) Process(_ context.Context, Job queue.Job) error {
	if m.count < 2 {
		fmt.Println("faulty")
		m.count++
		return errors.New("faulty")
	}
	fmt.Println(Job.Data().(FaultyMockData).Value)
	return nil
}

// bootstrapRetry is normally done when bootstrapping the framework. We mimic it here for demonstration.
func bootstrapRetry() *core.C {
	const sampleConfig = `{"log":{"level":"error"},"queue":{"default":{"parallelism":1}}}`

	// Make sure redis is running at localhost:6379
	c := core.New(
		core.WithConfigStack(rawbytes.Provider([]byte(sampleConfig)), json.Parser()),
	)

	// Add ConfProvider
	c.ProvideEssentials()
	c.Provide(otredis.Providers())
	c.Provide(queue.Providers())
	return c
}

// serveRetry normally lives at the serve command. We mimic it here for demonstration.
func serveRetry(c *core.C, duration time.Duration) {
	var g run.Group

	c.ApplyRunGroup(&g)

	// cancel the run group after some time, so that the program ends. In real project, this is not necessary.
	ctx, cancel := context.WithTimeout(context.Background(), duration)
	defer cancel()
	g.Add(func() error {
		<-ctx.Done()
		return nil
	}, func(err error) {
		cancel()
	})

	err := g.Run()
	if err != nil {
		panic(err)
	}
}

func main() {
	c := bootstrapRetry()

	c.Invoke(func(dispatcher *queue.Queue) {
		// Subscribe
		dispatcher.Subscribe(&FaultyMockListener{})

		// Trigger a Job
		evt := queue.JobFrom(FaultyMockData{Value: "hello world"})
		_ = dispatcher.Dispatch(context.Background(), queue.Adjust(evt, queue.MaxAttempts(3)))
	})

	serveRetry(c, 10*time.Second) // retries are made after a random backoff. It may take longer.

}

Output:

faulty
faulty
hello world

Example (Metrics)

package main

import (
	"context"
	"fmt"
	"time"

	queue "github.com/DoNewsCode/core-queue"

	"github.com/DoNewsCode/core"
	"github.com/DoNewsCode/core/contract"
	"github.com/DoNewsCode/core/di"
	"github.com/DoNewsCode/core/otredis"
	"github.com/go-kit/kit/metrics/prometheus"
	"github.com/knadh/koanf/parsers/json"
	"github.com/knadh/koanf/providers/rawbytes"
	"github.com/oklog/run"
	stdprometheus "github.com/prometheus/client_golang/prometheus"
)

type MockMetricsData struct {
	Value string
}

type MockMetricsListener struct{}

func (m MockMetricsListener) Listen() queue.Job {
	return queue.JobFrom(MockMetricsData{})
}

func (m MockMetricsListener) Process(_ context.Context, Job queue.Job) error {
	fmt.Println(Job.Data().(MockMetricsData).Value)
	return nil
}

// bootstrapMetrics is normally done when bootstrapping the framework. We mimic it here for demonstration.
func bootstrapMetrics() *core.C {
	const sampleConfig = "{\"log\":{\"level\":\"error\"},\"queue\":{\"default\":{\"parallelism\":1}}}"

	// Make sure redis is running at localhost:6379
	c := core.New(
		core.WithConfigStack(rawbytes.Provider([]byte(sampleConfig)), json.Parser()),
	)

	// Add ConfProvider
	c.ProvideEssentials()
	c.Provide(otredis.Providers())
	c.Provide(queue.Providers())
	c.Provide(di.Deps{func(appName contract.AppName, env contract.Env) queue.Gauge {
		return prometheus.NewGaugeFrom(
			stdprometheus.GaugeOpts{
				Namespace: appName.String(),
				Subsystem: env.String(),
				Name:      "queue_length",
				Help:      "The gauge of queue length",
			}, []string{"name", "channel"},
		)
	}})
	return c
}

// serveMetrics normally lives at the serve command. We mimic it here for demonstration.
func serveMetrics(c *core.C, duration time.Duration) {
	var g run.Group

	c.ApplyRunGroup(&g)

	// cancel the run group after some time, so that the program ends. In real project, this is not necessary.
	ctx, cancel := context.WithTimeout(context.Background(), duration)
	defer cancel()
	g.Add(func() error {
		<-ctx.Done()
		return nil
	}, func(err error) {
		cancel()
	})

	err := g.Run()
	if err != nil {
		panic(err)
	}
}

func main() {
	c := bootstrapMetrics()

	c.Invoke(func(dispatcher *queue.Queue) {

		// Subscribe
		dispatcher.Subscribe(MockMetricsListener{})

		// Trigger a Job
		evt := queue.JobFrom(MockMetricsData{Value: "hello world"})
		_ = dispatcher.Dispatch(context.Background(), queue.Adjust(evt))
	})

	serveMetrics(c, time.Second)

}

Output:

hello world

Example (Minimum)

package main

import (
	"context"
	"fmt"
	"time"

	queue "github.com/DoNewsCode/core-queue"
)

type ExampleJob string

func (e ExampleJob) Type() string {
	return "example"
}

func (e ExampleJob) Data() interface{} {
	return e
}

type ExampleListener struct {
	ch chan struct{}
}

func (e *ExampleListener) Listen() queue.Job {
	return ExampleJob("")
}

func (e *ExampleListener) Process(ctx context.Context, job queue.Job) error {
	fmt.Println(job.Data())
	e.ch <- struct{}{}
	return nil
}

func main() {
	queueDispatcher := queue.NewQueue(queue.NewInProcessDriver())
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var ch = make(chan struct{})
	go queueDispatcher.Consume(ctx)

	queueDispatcher.Subscribe(&ExampleListener{ch: ch})
	queueDispatcher.Dispatch(ctx, queue.Adjust(ExampleJob("foo"), queue.Defer(time.Second)))
	queueDispatcher.Dispatch(ctx, queue.Adjust(ExampleJob("bar"), queue.Defer(time.Hour)))

	<-ch

}

Output:

foo

Constants

const (
	// BeforeRetry is an event that triggers when a previously failed job is about to be retried.
	// Note: if retry attempts are exhausted, this event won't be triggered.
	BeforeRetry event = "beforeRetry"
	// BeforeAbort is an event that triggers when a previously failed job is going
	// to be aborted. If the Job still has retry attempts remaining, this event won't
	// be triggered.
	BeforeAbort event = "beforeAbort"
)

Variables

var ErrEmpty = errors.New("no message available")

ErrEmpty means the queue is empty.

Functions

func Providers

func Providers(optionFunc ...ProvidersOptionFunc) di.Deps

Providers returns a set of dependencies related to queue. It includes the DispatcherMaker, the JobDispatcher and the exported configs.

Depends On:
	contract.ConfigAccessor
	contract.Dispatcher
	Driver        `optional:"true"`
	otredis.Maker `optional:"true"`
	log.Logger
	contract.AppName
	contract.Env
	Gauge `optional:"true"`
Provides:
	DispatcherMaker
	DispatcherFactory
	JobDispatcher
	*Queue

func UseCodec

func UseCodec(codec contract.Codec) func(*Queue)

UseCodec allows consumers to replace the default Packer with a custom one. UseCodec is an option for NewQueue.

func UseEventDispatcher added in v0.1.1

func UseEventDispatcher(dispatcher contract.Dispatcher) func(*Queue)

UseEventDispatcher is an option for NewQueue to receive events such as fail and retry.

func UseGauge

func UseGauge(gauge metrics.Gauge, interval time.Duration) func(*Queue)

UseGauge is an option for NewQueue that reports queue length to a gauge metric at the given interval.

func UseJobDispatcher added in v0.1.1

func UseJobDispatcher(dispatcher JobDispatcher) func(*Queue)

UseJobDispatcher is an option for NewQueue to swap the JobDispatcher implementation.

func UseLogger

func UseLogger(logger log.Logger) func(*Queue)

UseLogger is an option for NewQueue that feeds the queue with a Logger of choice.

func UseParallelism

func UseParallelism(parallelism int) func(*Queue)

UseParallelism is an option for NewQueue that sets the parallelism for queue consumption.

Types

type BeforeAbortPayload added in v0.1.1

type BeforeAbortPayload struct {
	Err error
	Job *PersistedJob
}

type BeforeRetryPayload added in v0.1.1

type BeforeRetryPayload struct {
	Err error
	Job *PersistedJob
}

type ChannelConfig

type ChannelConfig struct {
	Delayed  string
	Failed   string
	Reserved string
	Waiting  string
	Timeout  string
}

ChannelConfig describes the key name of each queue, also known as channel.

type ConsumableDispatcher

type ConsumableDispatcher interface {
	JobDispatcher
	Consume(ctx context.Context) error
}

ConsumableDispatcher is the key of *Queue in the dependencies graph. Used as a type hint for injection.

type DeferrablePersistentJob

type DeferrablePersistentJob struct {
	Job
	// contains filtered or unexported fields
}

DeferrablePersistentJob is a persisted Job.

func Adjust

func Adjust(job Job, opts ...PersistOption) DeferrablePersistentJob

Adjust converts any Job to a DeferrablePersistentJob, that is, a Job that will be stored in external storage when dispatched.

func (DeferrablePersistentJob) Decorate

func (d DeferrablePersistentJob) Decorate(s *PersistedJob)

Decorate decorates the PersistedJob of this Job by adding some meta info. It is called in the Queue, after the Packer compresses the Job.

func (DeferrablePersistentJob) Defer

Defer defers the execution of the job for the period of time returned.

type DispatcherFactory

type DispatcherFactory struct {
	*di.Factory
}

DispatcherFactory is a factory for *Queue. Note that DispatcherFactory doesn't contain the factory method itself, i.e. how to create a dispatcher is left for users to define. Users can then use this type to create their own dispatcher implementation.

Here is an example on how to create a custom DispatcherFactory with an InProcessDriver.

factory := di.NewFactory(func(name string) (di.Pair, error) {
	queuedDispatcher := queue.NewQueue(
		queue.NewInProcessDriver(),
		queue.UseJobDispatcher(&queue.SyncDispatcher{}),
	)
	return di.Pair{Conn: queuedDispatcher}, nil
})
dispatcherFactory := queue.DispatcherFactory{Factory: factory}

func (DispatcherFactory) Make

func (s DispatcherFactory) Make(name string) (*Queue, error)

Make returns a Queue by the given name. If one has already been created under the same name, that one will be returned.
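The "create once per name, then reuse" behaviour described for Make can be sketched with the standard library alone. The namedQueue and cache types below are hypothetical and are not how di.Factory is implemented; the sketch is also not safe for concurrent use.

```go
package main

import "fmt"

// namedQueue is a hypothetical stand-in for *Queue.
type namedQueue struct{ name string }

// cache hands out one instance per name, creating it on first request.
type cache struct {
	instances map[string]*namedQueue
	created   int // how many instances were constructed
}

// get returns the instance registered under name, constructing it if needed.
func (c *cache) get(name string) *namedQueue {
	if q, ok := c.instances[name]; ok {
		return q
	}
	if c.instances == nil {
		c.instances = make(map[string]*namedQueue)
	}
	q := &namedQueue{name: name}
	c.instances[name] = q
	c.created++
	return q
}

func main() {
	c := &cache{}
	a := c.get("default")
	b := c.get("default")
	other := c.get("myQueue")
	fmt.Println(a == b, a == other, c.created) // prints "true false 2"
}
```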

type DispatcherMaker

type DispatcherMaker interface {
	Make(string) (*Queue, error)
}

DispatcherMaker is the key of *DispatcherFactory in the dependencies graph. Used as a type hint for injection.

type Driver

type Driver interface {
	// Push pushes the message onto the queue. It is possible to specify a time delay. If so the message
	// will be read after the delay. Use zero value if a delay is not needed.
	Push(ctx context.Context, message *PersistedJob, delay time.Duration) error
	// Pop pops the message out of the queue. It blocks until a message is available or a timeout is reached.
	Pop(ctx context.Context) (*PersistedJob, error)
	// Ack acknowledges a message has been processed.
	Ack(ctx context.Context, message *PersistedJob) error
	// Fail marks a message as failed.
	Fail(ctx context.Context, message *PersistedJob) error
	// Reload puts failed/timeout messages back onto the Waiting queue. If the temporary outage has been cleared,
	// messages can be tried again via Reload. Reload is not a normal retry.
	// It similarly gives otherwise dead messages one more chance,
	// but this chance is not subject to the limit of MaxAttempts, nor does it reset the number of times attempted.
	Reload(ctx context.Context, channel string) (int64, error)
	// Flush empties the queue under channel
	Flush(ctx context.Context, channel string) error
	// Info lists QueueInfo by inspecting queues one by one. Useful for metrics and monitor.
	Info(ctx context.Context) (QueueInfo, error)
	// Retry put the message back onto the delayed queue.
	Retry(ctx context.Context, message *PersistedJob) error
}

Driver is the interface for queue engines. See RedisDriver for usage.
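To make the Push delay semantics concrete, here is a minimal in-memory sketch that uses only the standard library. It is not the InProcessDriver or the RedisDriver: memQueue and its two lists are hypothetical, and time is passed in explicitly so the behaviour is deterministic. It mirrors the minimum example above, where a job deferred by one second is delivered and a job deferred by one hour is not.

```go
package main

import (
	"fmt"
	"time"
)

// delayedItem is a message parked until readyAt.
type delayedItem struct {
	payload string
	readyAt time.Time
}

// memQueue is a hypothetical queue with a waiting list and a delayed list.
type memQueue struct {
	waiting []string
	delayed []delayedItem
}

// push enqueues payload. A positive delay parks it on the delayed list.
func (q *memQueue) push(now time.Time, payload string, delay time.Duration) {
	if delay <= 0 {
		q.waiting = append(q.waiting, payload)
		return
	}
	q.delayed = append(q.delayed, delayedItem{payload: payload, readyAt: now.Add(delay)})
}

// pop first moves due messages from delayed to waiting, then returns the
// oldest waiting message. The boolean is false when nothing is ready.
func (q *memQueue) pop(now time.Time) (string, bool) {
	var stillDelayed []delayedItem
	for _, it := range q.delayed {
		if it.readyAt.After(now) {
			stillDelayed = append(stillDelayed, it)
			continue
		}
		q.waiting = append(q.waiting, it.payload)
	}
	q.delayed = stillDelayed
	if len(q.waiting) == 0 {
		return "", false
	}
	head := q.waiting[0]
	q.waiting = q.waiting[1:]
	return head, true
}

// demo pushes two deferred messages and pops at three points in time.
func demo() []string {
	start := time.Unix(0, 0)
	q := &memQueue{}
	q.push(start, "foo", time.Second)
	q.push(start, "bar", time.Hour)

	var out []string
	for _, offset := range []time.Duration{0, 2 * time.Second, 3 * time.Second} {
		msg, ok := q.pop(start.Add(offset))
		if !ok {
			msg = "(empty)"
		}
		out = append(out, msg)
	}
	return out
}

func main() {
	fmt.Println(demo()) // prints "[(empty) foo (empty)]"
}
```

A real driver would additionally need Ack, Fail, Retry and the reserved/timeout bookkeeping listed in the interface; those are left out here on purpose.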

type DriverArgs added in v0.1.2

type DriverArgs struct {
	Name      string
	Populator contract.DIPopulator
}

DriverArgs are arguments to construct the driver. See WithDriverConstructor.

type Gauge

type Gauge metrics.Gauge

Gauge is an alias used for dependency injection

type Handler

type Handler interface {
	// Listen should return a Job instance with zero value. It tells the dispatcher what type of Job this handler is expecting.
	Listen() Job
	// Process will be called when a Job is ready from the queue.
	Process(ctx context.Context, Job Job) error
}

Handler is the handler for Job.

type InProcessDriver

type InProcessDriver struct {
	// contains filtered or unexported fields
}

InProcessDriver is a test replacement for the redis driver. It doesn't persist your Job in any way, so it is not suitable for production use.

func NewInProcessDriver

func NewInProcessDriver() *InProcessDriver

NewInProcessDriver creates an *InProcessDriver for testing.

func NewInProcessDriverWithPopInterval

func NewInProcessDriverWithPopInterval(duration time.Duration) *InProcessDriver

NewInProcessDriverWithPopInterval creates an *InProcessDriver with a pop interval.

func (*InProcessDriver) Ack

func (i *InProcessDriver) Ack(ctx context.Context, message *PersistedJob) error

func (*InProcessDriver) Fail

func (i *InProcessDriver) Fail(ctx context.Context, message *PersistedJob) error

func (*InProcessDriver) Flush

func (i *InProcessDriver) Flush(ctx context.Context, channel string) error

func (*InProcessDriver) Info

func (i *InProcessDriver) Info(ctx context.Context) (QueueInfo, error)

func (*InProcessDriver) Pop

func (*InProcessDriver) Push

func (i *InProcessDriver) Push(ctx context.Context, message *PersistedJob, delay time.Duration) error

func (*InProcessDriver) Reload

func (i *InProcessDriver) Reload(ctx context.Context, channel string) (int64, error)

func (*InProcessDriver) Retry

func (i *InProcessDriver) Retry(ctx context.Context, message *PersistedJob) error

type Job

type Job interface {
	Type() string
	Data() interface{}
}

func JobFrom

func JobFrom(Job interface{}) Job

JobFrom wraps any struct, making it a valid Job.

type JobDispatcher added in v0.1.1

type JobDispatcher interface {
	Dispatch(ctx context.Context, Job Job) error
	Subscribe(listener Handler)
}

JobDispatcher is the Job registry that is able to send a Job to each Handler.

type ListenFunc

type ListenFunc struct {
	Job Job
	// contains filtered or unexported fields
}

ListenFunc is a listener implemented with a callback.

func Listen

func Listen(job Job, callback func(ctx context.Context, job Job) error) ListenFunc

Listen creates a functional listener in one line.

func (ListenFunc) Listen

func (f ListenFunc) Listen() Job

Listen implements Handler

func (ListenFunc) Process

func (f ListenFunc) Process(ctx context.Context, Job Job) error

Process implements Handler

type Module

type Module struct {
	Factory *DispatcherFactory
}

Module exports queue commands, for example queue flush and queue reload.

func New

func New(factory *DispatcherFactory) Module

New creates a new module.

func (Module) ProvideCommand

func (m Module) ProvideCommand(command *cobra.Command)

ProvideCommand implements CommandProvider for the Module. It registers flush and reload command to the parent command.

type PersistOption

type PersistOption func(Job *DeferrablePersistentJob)

PersistOption defines some options for Adjust

func Defer

func Defer(duration time.Duration) PersistOption

Defer is a PersistOption that defers the execution of DeferrablePersistentJob for the period of time given.

func MaxAttempts

func MaxAttempts(attempts int) PersistOption

MaxAttempts is a PersistOption that defines how many times the Job handler can be retried.

func ScheduleAt

func ScheduleAt(t time.Time) PersistOption

ScheduleAt is a PersistOption that defers the execution of DeferrablePersistentJob until the time given.

func Timeout

func Timeout(timeout time.Duration) PersistOption

Timeout is a PersistOption that defines the maximum time the Job can be processed until timeout. Note: this timeout is shared among all listeners.

func UniqueId

func UniqueId(id string) PersistOption

UniqueId is a PersistOption that outsources the generation of uniqueId to the caller.
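The PersistOption functions above follow the functional options pattern. The sketch below reproduces that pattern with the standard library only. The settings type and the lower-case option names are hypothetical, and treating a scheduled time as "delay relative to now" is an assumption of this sketch rather than a statement about how ScheduleAt is implemented.

```go
package main

import (
	"fmt"
	"time"
)

// settings is a hypothetical stand-in for the tunable fields of a persisted job.
type settings struct {
	delay       time.Duration
	maxAttempts int
}

// option mutates settings, in the same spirit as PersistOption.
type option func(*settings)

// deferFor delays execution by d.
func deferFor(d time.Duration) option {
	return func(s *settings) { s.delay = d }
}

// maxAttempts caps how many times the handler may run.
func maxAttempts(n int) option {
	return func(s *settings) { s.maxAttempts = n }
}

// scheduleAt expresses an absolute time t as a delay relative to now.
func scheduleAt(now, t time.Time) option {
	return func(s *settings) { s.delay = t.Sub(now) }
}

// adjust starts from defaults and applies the options in order.
func adjust(opts ...option) settings {
	s := settings{maxAttempts: 1} // MaxAttempts is documented to default to 1
	for _, apply := range opts {
		apply(&s)
	}
	return s
}

func main() {
	now := time.Date(2021, 9, 13, 0, 0, 0, 0, time.UTC)

	s := adjust(deferFor(3*time.Minute), maxAttempts(5))
	fmt.Println(s.delay, s.maxAttempts)

	s = adjust(scheduleAt(now, now.Add(time.Hour)))
	fmt.Println(s.delay, s.maxAttempts)
}
```

Because each option is just a function, callers only mention the settings they care about, and later options override earlier ones.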

type PersistedJob

type PersistedJob struct {
	// The UniqueId identifies each individual message. Sometimes the message can have exact same content and even
	// exact same Key. UniqueId is used to differentiate them.
	UniqueId string
	// Key is the Message type. Usually it is the string name of the Job type before it is serialized.
	Key string
	// Value is the serialized bytes of the Job.
	Value []byte
	// HandleTimeout sets the upper time limit for each run of the handler. If HandleTimeout is exceeded, the Job will
	// be put onto the timeout queue. Note: the timeout is shared among all listeners.
	HandleTimeout time.Duration
	// Backoff sets the duration before next retry.
	Backoff time.Duration
	// Attempts denotes how many attempts have been made. It starts from 1.
	Attempts int
	// MaxAttempts denotes the maximum number of times the handler can be attempted before the Job is put onto
	// the failed queue.
	// By default, MaxAttempts is 1.
	MaxAttempts int
}

PersistedJob represents a persisted Job.
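The interplay of Attempts, MaxAttempts and Backoff can be sketched as a small decision function. This is only an illustration under stated assumptions: the job type mirrors three fields of PersistedJob, the comparison `Attempts < MaxAttempts` is inferred from the field comments and the faulty example above, and the queue names are taken from ChannelConfig. It is not the package's own code.

```go
package main

import (
	"fmt"
	"time"
)

// job mirrors only the retry-related fields of PersistedJob.
type job struct {
	Attempts    int
	MaxAttempts int
	Backoff     time.Duration
}

// afterFailure decides where a job goes once a handler has failed.
// While attempts remain it goes to the delayed queue for Backoff,
// otherwise it goes to the failed queue.
func afterFailure(j job) (queue string, delay time.Duration) {
	if j.Attempts < j.MaxAttempts {
		return "delayed", j.Backoff
	}
	return "failed", 0
}

func main() {
	j := job{Attempts: 1, MaxAttempts: 3, Backoff: time.Second}
	for {
		dest, delay := afterFailure(j)
		fmt.Println(j.Attempts, dest, delay)
		if dest == "failed" {
			break
		}
		j.Attempts++ // the next run counts as one more attempt
	}
}
```

With MaxAttempts left at its default of 1, the first failure already sends the job to the failed queue.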

func (*PersistedJob) Data

func (s *PersistedJob) Data() interface{}

Data implements Job. It returns the Value.

func (*PersistedJob) Type

func (s *PersistedJob) Type() string

Type implements Job. It returns the Key.

type ProvidersOptionFunc added in v0.1.1

type ProvidersOptionFunc func(options *providersOption)

ProvidersOptionFunc is the type of functional providersOption for Providers. Use this type to change how Providers work.

func WithDriver added in v0.1.1

func WithDriver(driver Driver) ProvidersOptionFunc

WithDriver instructs the Providers to accept a queue driver different from the default one. This option supersedes the WithDriverConstructor option.

func WithDriverConstructor added in v0.1.1

func WithDriverConstructor(f func(args DriverArgs) (Driver, error)) ProvidersOptionFunc

WithDriverConstructor instructs the Providers to accept an alternative constructor for the queue driver. If the WithDriver option is set, this option becomes a no-op.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue is an extension of the embedded dispatcher. It adds the persistent Job feature.

func NewQueue

func NewQueue(driver Driver, opts ...func(*Queue)) *Queue

NewQueue wraps a Driver and returns a Queue that can send and listen to "persisted" Jobs. Persisted Jobs are guaranteed to be executed at least once, as they are stored in external storage and won't be released until the Queue acknowledges the end of execution.

func (*Queue) Consume

func (d *Queue) Consume(ctx context.Context) error

Consume starts the runner and blocks until the context is canceled or an error occurs.

func (*Queue) Dispatch

func (d *Queue) Dispatch(ctx context.Context, e Job) error

Dispatch dispatches a Job. See contract.Dispatcher.

func (*Queue) Driver

func (d *Queue) Driver() Driver

func (*Queue) Subscribe

func (d *Queue) Subscribe(handler Handler)

Subscribe subscribes a Handler. See contract.Dispatcher.

type QueueInfo

type QueueInfo struct {
	// Waiting is the length of the Waiting queue.
	Waiting int64
	// Delayed is the length of the Delayed queue.
	Delayed int64
	// Timeout is the length of the Timeout queue.
	Timeout int64
	// Failed is the length of the Failed queue.
	Failed int64
}

QueueInfo describes the state of queues.

type RedisDriver

type RedisDriver struct {
	Logger        log.Logger            // Logger is an optional logger. By default a noop logger is used
	RedisClient   redis.UniversalClient // RedisClient is used to communicate with redis
	ChannelConfig ChannelConfig         // ChannelConfig holds the name of redis keys for all queues.
	PopTimeout    time.Duration         // PopTimeout is the BRPOP timeout. ie. How long the pop action will block at most.
	Packer        contract.Codec        // Packer describes how to save the message in wire format
	// contains filtered or unexported fields
}

RedisDriver is a queue driver backed by redis. It is easy to set up, and offers at-least-once semantics.

func (*RedisDriver) Ack

func (r *RedisDriver) Ack(ctx context.Context, message *PersistedJob) error

Ack acknowledges a message has been processed.

func (*RedisDriver) Fail

func (r *RedisDriver) Fail(ctx context.Context, message *PersistedJob) error

Fail marks a message as failed.

func (*RedisDriver) Flush

func (r *RedisDriver) Flush(ctx context.Context, channel string) error

Flush flushes a queue of choice by deleting all its data. Use with caution.

func (*RedisDriver) Info

func (r *RedisDriver) Info(ctx context.Context) (QueueInfo, error)

Info lists QueueInfo by inspecting queues one by one. Useful for metrics and monitor.

func (*RedisDriver) Pop

func (r *RedisDriver) Pop(ctx context.Context) (*PersistedJob, error)

Pop pops the message out of the queue. It uses BRPOP underneath, so effectively it blocks until a message is available or a timeout is reached.

func (*RedisDriver) Push

func (r *RedisDriver) Push(ctx context.Context, message *PersistedJob, delay time.Duration) error

Push pushes the message onto the queue. It is possible to specify a time delay. If so the message will be read after the delay. Use zero value if a delay is not needed.

func (*RedisDriver) Reload

func (r *RedisDriver) Reload(ctx context.Context, channel string) (int64, error)

Reload puts failed/timeout messages back onto the Waiting queue. If the temporary outage has been cleared, messages can be tried again via Reload. Reload is not a normal retry. It similarly gives otherwise dead messages one more chance, but this chance is not subject to the limit of MaxAttempts, nor does it reset the number of times attempted.

func (*RedisDriver) Retry

func (r *RedisDriver) Retry(ctx context.Context, message *PersistedJob) error

Retry puts the message back onto the delayed queue. The message will be tried after a period of time specified by Backoff. Note: if one listener fails, all listeners for this Job will have to be retried. Make sure your listeners are idempotent as always.

type SyncDispatcher

type SyncDispatcher struct {
	// contains filtered or unexported fields
}

SyncDispatcher is a contract.Dispatcher implementation that dispatches Jobs synchronously. SyncDispatcher is safe for concurrent use.

func (*SyncDispatcher) Dispatch

func (d *SyncDispatcher) Dispatch(ctx context.Context, Job Job) error

Dispatch dispatches Jobs synchronously. If any listener returns an error, abort the process immediately and return that error to caller.
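The "abort on the first error" behaviour described for Dispatch can be sketched without the package. The syncDispatcher below is a hypothetical, simplified dispatcher keyed by a type string; it is not the SyncDispatcher implementation and, unlike the real one, makes no attempt to be safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
)

// handler processes one payload.
type handler func(payload string) error

// syncDispatcher is a hypothetical dispatcher keyed by job type.
type syncDispatcher struct {
	handlers map[string][]handler
}

// subscribe registers h for jobs of the given type.
func (d *syncDispatcher) subscribe(jobType string, h handler) {
	if d.handlers == nil {
		d.handlers = make(map[string][]handler)
	}
	d.handlers[jobType] = append(d.handlers[jobType], h)
}

// dispatch calls the handlers in subscription order and returns the first
// error, skipping any handlers registered after the failing one.
func (d *syncDispatcher) dispatch(jobType, payload string) error {
	for _, h := range d.handlers[jobType] {
		if err := h(payload); err != nil {
			return err
		}
	}
	return nil
}

// run wires three handlers, the second of which fails, and records the calls.
func run() (calls []string, err error) {
	d := &syncDispatcher{}
	d.subscribe("example", func(string) error { calls = append(calls, "first"); return nil })
	d.subscribe("example", func(string) error { calls = append(calls, "second"); return errors.New("boom") })
	d.subscribe("example", func(string) error { calls = append(calls, "third"); return nil })
	err = d.dispatch("example", "payload")
	return calls, err
}

func main() {
	calls, err := run()
	fmt.Println(calls, err) // prints "[first second] boom"
}
```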

func (*SyncDispatcher) Subscribe

func (d *SyncDispatcher) Subscribe(listener Handler)

Subscribe subscribes the listener to the dispatcher.
