libmq

package
v0.2.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 16, 2024 License: MIT Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ProviderSet = wire.NewSet(NewMQ)

Functions

This section is empty.

Types

type MQ

type MQ struct {
	// contains filtered or unexported fields
}

func NewMQ

func NewMQ(c *conf.MQ, dbc *conf.Database, cachec *conf.Cache, app *libapp.Settings) (*MQ, func(), error)

func (*MQ) Publish

func (a *MQ) Publish(ctx context.Context, topic string, payload []byte) error

func (*MQ) RegisterTopic

func (a *MQ) RegisterTopic(topic TopicInterface) error

RegisterTopic register Topic to MQ If a message keep fail after retry, It will be sent to the PoisonQueue. PoisonQueue will retry messages in a very low rate.

func (*MQ) Start

func (a *MQ) Start(ctx context.Context) error

func (*MQ) Stop

func (a *MQ) Stop(ctx context.Context) error

type Option added in v0.1.0

type Option func(o *Options)

Option represents a store option function.

func WithConsumePoisoned added in v0.1.0

func WithConsumePoisoned() Option

type Options added in v0.1.0

type Options struct {
	ConsumePoisoned bool
}

type Topic

type Topic[T any] struct {
	// contains filtered or unexported fields
}

func NewTopic

func NewTopic[T any](topic string, consumerFunc func(context.Context, *T) error, opts ...Option) *Topic[T]

func (*Topic[T]) Consume

func (t *Topic[T]) Consume(ctx context.Context, i []byte) error

func (*Topic[T]) GetOptions added in v0.1.0

func (t *Topic[T]) GetOptions() *Options

func (*Topic[T]) LocalCall added in v0.2.1

func (t *Topic[T]) LocalCall(ctx context.Context, i T) error

func (*Topic[T]) Name

func (t *Topic[T]) Name() string

func (*Topic[T]) Publish added in v0.1.0

func (t *Topic[T]) Publish(ctx context.Context, i T) error

func (*Topic[T]) SetMQ

func (t *Topic[T]) SetMQ(mq *MQ)

type TopicInterface added in v0.1.0

type TopicInterface interface {
	Name() string
	Consume(context.Context, []byte) error
	SetMQ(*MQ)
	GetOptions() *Options
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL