redisstream

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 25, 2023 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
// Sentinel and default time.Duration values declared by the package.
const (
	// NoSleep can be set to SubscriberConfig.NackResendSleep.
	// NOTE(review): presumably this disables the delay before a nacked
	// message is resent — confirm against the subscriber implementation.
	NoSleep time.Duration = -1

	// DefaultBlockTime is 100 milliseconds.
	// NOTE(review): presumably the fallback for SubscriberConfig.BlockTime — confirm.
	DefaultBlockTime time.Duration = time.Millisecond * 100

	// DefaultClaimInterval is 5 seconds.
	// NOTE(review): presumably the fallback for SubscriberConfig.ClaimInterval — confirm.
	DefaultClaimInterval time.Duration = time.Second * 5

	// DefaultMaxIdleTime is the default maximum idle time for a pending message (60 seconds).
	// After this timeout, the message will be claimed and its idle consumer
	// will be removed from the consumer group.
	DefaultMaxIdleTime time.Duration = time.Second * 60
)
View Source
// UUIDHeaderKey is the string key "_watermill_message_uuid".
// NOTE(review): presumably the field under which the message UUID is stored
// in the marshalled values map — confirm against DefaultMarshallerUnmarshaller.
const UUIDHeaderKey = "_watermill_message_uuid"

Variables

This section is empty.

Functions

This section is empty.

Types

type DefaultMarshallerUnmarshaller

// DefaultMarshallerUnmarshaller is a field-less type whose Marshal and
// Unmarshal methods have the signatures required by the Marshaller and
// Unmarshaller interfaces.
type DefaultMarshallerUnmarshaller struct{}

func (DefaultMarshallerUnmarshaller) Marshal

func (DefaultMarshallerUnmarshaller) Marshal(_ string, msg *message.Message) (map[string]interface{}, error)

func (DefaultMarshallerUnmarshaller) Unmarshal

func (DefaultMarshallerUnmarshaller) Unmarshal(values map[string]interface{}) (msg *message.Message, err error)

type Marshaller

// Marshaller converts a message into a map of string keys to values.
type Marshaller interface {
	// Marshal converts msg, given the topic it is associated with, into a
	// map of values, or returns an error.
	// NOTE(review): presumably the map holds the field/value pairs written
	// to the Redis stream entry — confirm against Publisher.Publish.
	Marshal(topic string, msg *message.Message) (map[string]interface{}, error)
}

type MarshallerUnmarshaller

// MarshallerUnmarshaller combines the Marshaller and Unmarshaller interfaces.
type MarshallerUnmarshaller interface {
	Marshaller
	Unmarshaller
}

type Publisher

// Publisher publishes messages to a redis stream.
// Create one with NewPublisher.
type Publisher struct {
	// contains filtered or unexported fields
}

func NewPublisher

func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error)

NewPublisher creates a new redis stream Publisher.

func (*Publisher) Close

func (p *Publisher) Close() error

func (*Publisher) Publish

func (p *Publisher) Publish(topic string, msgs ...*message.Message) error

Publish publishes messages to a Redis stream.

Publish blocks and waits for the Redis response. When delivery of one of the messages fails, the function is interrupted.

type PublisherConfig

// PublisherConfig is the configuration passed to NewPublisher.
type PublisherConfig struct {
	// Client is the redis client.
	// NOTE(review): presumably used to issue the stream write commands — confirm.
	Client     redis.UniversalClient
	// Marshaller converts a message into a map of values.
	// NOTE(review): presumably applied to each message before it is written — confirm.
	Marshaller Marshaller
	// Maxlens maps a string key to an int64.
	// NOTE(review): presumably topic name -> maximum stream length — confirm.
	Maxlens    map[string]int64
}

func (*PublisherConfig) Validate

func (c *PublisherConfig) Validate() error

type Subscriber

// Subscriber is a redis stream subscriber; its Subscribe method returns a
// channel of messages for a topic.
// Create one with NewSubscriber.
type Subscriber struct {
	// contains filtered or unexported fields
}

func NewSubscriber

func NewSubscriber(config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error)

NewSubscriber creates a new redis stream Subscriber

func (*Subscriber) Close

func (s *Subscriber) Close() error

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)

type SubscriberConfig

// SubscriberConfig is the configuration passed to NewSubscriber.
type SubscriberConfig struct {
	// Client is the redis client.
	// NOTE(review): presumably used to issue the stream read commands — confirm.
	Client redis.UniversalClient

	// Unmarshaller converts a map of values into a message.
	Unmarshaller Unmarshaller

	// Consumer is the Redis stream consumer ID; it is paired with ConsumerGroup.
	Consumer string
	// ConsumerGroup is the consumer group name.
	// When it is empty, fan-out mode is used.
	ConsumerGroup string

	// NackResendSleep is how long to wait after a Nack before the message
	// is redelivered. NoSleep can be set here.
	NackResendSleep time.Duration

	// BlockTime is how long to block while waiting for the next Redis
	// stream message.
	BlockTime time.Duration

	// ClaimInterval is the interval at which idle pending messages are claimed.
	ClaimInterval time.Duration

	// MaxIdleTime is the idle time after which a consumer is treated as offline.
	MaxIdleTime time.Duration
}

func (*SubscriberConfig) Validate

func (sc *SubscriberConfig) Validate() error

type Unmarshaller

// Unmarshaller converts a map of string keys to values into a message.
type Unmarshaller interface {
	// Unmarshal builds a message from values, or returns an error.
	// NOTE(review): presumably values are the field/value pairs read from a
	// Redis stream entry — confirm against the subscriber implementation.
	Unmarshal(values map[string]interface{}) (msg *message.Message, err error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL