mq

package
v2.0.15 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 10, 2024 | License: MIT | Imports: 9 | Imported by: 0

Documentation

Index

Constants

View Source
// Named string constants for the two special position markers `0` and `$`.
// NOTE(review): these look like the values intended for the startId
// parameter of Group and Consume — confirm against the implementation.
const (
	// Earliest is the literal `0`; presumably "start from the first entry" — TODO confirm.
	Earliest = `0`
	// Latest is the literal `$`; presumably "only entries added from now on" — TODO confirm.
	Latest   = `$`
)

Variables

View Source
// Sentinel errors describing missing consumer configuration. Each is created
// with errors.New, so callers can compare against them with errors.Is.
// NOTE(review): presumably returned by NewConsumer when the matching Option
// field is empty — confirm against the implementation.
var (
	// ErrGroupEmpty reports that the group was empty.
	ErrGroupEmpty         = errors.New("group cannot be empty")
	// ErrConsumerEmpty reports that the consumer was empty.
	ErrConsumerEmpty      = errors.New("consumer cannot be empty")
	// ErrConsumerTopicEmpty reports that the consumer topic was empty.
	ErrConsumerTopicEmpty = errors.New("consumer topic cannot be empty")
)

Functions

func IsErrBusyGroup

func IsErrBusyGroup(err error) bool

Types

type Data

// Data is a map from string keys to string values.
type Data map[string]string

type Event

// Event is a named event whose three string fields are serialized to JSON
// under the short keys "n", "i" and "d". It can be converted to and from a
// redis.XMessage via AsMsg/ToMsg and Msg2Event.
type Event struct {
	// Name is encoded under the JSON key "n".
	Name string `json:"n"`
	// Id is encoded under the JSON key "i".
	Id   string `json:"i"`
	// Data is encoded under the JSON key "d"; it is a plain string here,
	// not the Data map type.
	Data string `json:"d"`
}

func Msg2Event

func Msg2Event(msg redis.XMessage) Event

func (Event) AsMsg

func (e Event) AsMsg() redis.XMessage

func (Event) ToMsg

func (e Event) ToMsg() redis.XMessage

type Mq

// Mq is the handle returned by NewConsumer and NewProducer. All of its state
// is unexported; interact with it through its methods.
type Mq struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(opt Option, pool *redis.Client) (*Mq, error)

func NewProducer

func NewProducer(opt Option, pool *redis.Client) *Mq

func (*Mq) Close

func (mq *Mq) Close(timeout time.Duration) error

func (*Mq) Commit

func (mq *Mq) Commit(ctx context.Context, topic string, idList ...string) (okCount int64, err error)

func (*Mq) Consume

func (mq *Mq) Consume(maxCount int64, blockTime time.Duration, startId string) (msgChan <-chan []Msg, err error)

func (*Mq) FullInfo

func (mq *Mq) FullInfo(ctx context.Context, topic string, count int) (info *redis.XInfoStreamFull, err error)

func (*Mq) Group

func (mq *Mq) Group(ctx context.Context, startId string) (err error)

func (*Mq) Info

func (mq *Mq) Info(ctx context.Context, topic string) (info *redis.XInfoStream, err error)

func (*Mq) Pending

func (mq *Mq) Pending(ctx context.Context, topic string, minIdleDuration time.Duration, count int64) (list []redis.XPendingExt, err error)

func (*Mq) PendingWithConsumer

func (mq *Mq) PendingWithConsumer(ctx context.Context, topic string, minIdleDuration time.Duration, count int64) (list []redis.XPendingExt, err error)

func (*Mq) PipeSend

func (mq *Mq) PipeSend(ctx context.Context, msgList ...Msg) (docIdList []string, err error)

func (*Mq) Send

func (mq *Mq) Send(ctx context.Context, msg Msg) (docId string, err error)

func (*Mq) Trigger

func (mq *Mq) Trigger(ctx context.Context, topic string, events ...Event) (docIdList []string, err error)

type Msg

// Msg pairs a topic name with a redis.XMessage. It is the element type
// accepted by Send/PipeSend and delivered in batches by Consume.
type Msg struct {
	// Topic is the topic name associated with the message.
	Topic string
	// XMsg is the underlying redis.XMessage.
	XMsg  redis.XMessage
}

type Option

// Option is the configuration passed to NewConsumer and NewProducer. Every
// field carries identical JSON and YAML keys, so it can be loaded from either
// format.
type Option struct {
	// Group is the consumer group name; see ErrGroupEmpty.
	Group               string `json:"group" yaml:"group"`
	// Consumer is the consumer name; see ErrConsumerEmpty.
	Consumer            string `json:"consumer" yaml:"consumer"`
	// ConsumerTopic is the topic to consume from; see ErrConsumerTopicEmpty.
	ConsumerTopic       string `json:"consumerTopic" yaml:"consumerTopic"`
	// ChanSize is presumably the buffer size of the channel returned by
	// Consume — TODO confirm.
	ChanSize            int64  `json:"chanSize" yaml:"chanSize"`
	// RetryIntervalSecond is a retry interval in seconds; presumably the
	// value RetryDuration converts to a time.Duration — TODO confirm.
	RetryIntervalSecond int64  `json:"retryIntervalSecond" yaml:"retryIntervalSecond"`
	// AutoCommit presumably makes consumed messages get committed
	// automatically instead of via Commit — TODO confirm.
	AutoCommit          bool   `json:"autoCommit" yaml:"autoCommit"`
	// MaxLength is presumably a cap on stream length applied when sending
	// — TODO confirm.
	MaxLength           int64  `json:"maxLength" yaml:"maxLength"`
	// MsgMinIdleSecond is a minimum idle time in seconds; presumably the
	// value MsgMinIdle converts to a time.Duration — TODO confirm.
	MsgMinIdleSecond    int64  `json:"msgMinIdleSecond" yaml:"msgMinIdleSecond"`
}

func (Option) MsgMinIdle

func (o Option) MsgMinIdle() time.Duration

func (Option) RetryDuration

func (o Option) RetryDuration() time.Duration

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL