redisstream

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 17, 2024 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
// Sentinel and default values. Each Default* name mirrors a SubscriberConfig
// field of the same name (presumably the value used when that field is left
// unset — confirm in SubscriberConfig.Validate / its defaults logic).
const (
	// NoSleep can be assigned to SubscriberConfig.NackResendSleep.
	NoSleep time.Duration = -1

	// DefaultBlockTime is 100 milliseconds.
	DefaultBlockTime = 100 * time.Millisecond

	// DefaultClaimInterval is 5 seconds.
	DefaultClaimInterval = 5 * time.Second

	// DefaultClaimBatchSize is 100, typed as int64 to match
	// SubscriberConfig.ClaimBatchSize.
	DefaultClaimBatchSize int64 = 100

	// DefaultMaxIdleTime is 1 minute.
	DefaultMaxIdleTime = 60 * time.Second

	// DefaultCheckConsumersInterval is 5 minutes.
	DefaultCheckConsumersInterval = 5 * time.Minute

	// DefaultConsumerTimeout is 10 minutes.
	DefaultConsumerTimeout = 10 * time.Minute
)
View Source
// UUIDHeaderKey is the string key "_watermill_message_uuid".
// NOTE(review): presumably the field name under which a message's UUID is
// carried in the marshalled values — confirm against the Marshal/Unmarshal
// implementation.
const UUIDHeaderKey = "_watermill_message_uuid"

Variables

This section is empty.

Functions

This section is empty.

Types

type DefaultMarshallerUnmarshaller

// DefaultMarshallerUnmarshaller is a field-less type whose Marshal and
// Unmarshal methods have the signatures required by the Marshaller and
// Unmarshaller interfaces.
type DefaultMarshallerUnmarshaller struct{}

func (DefaultMarshallerUnmarshaller) Marshal

func (DefaultMarshallerUnmarshaller) Marshal(_ string, msg *message.Message) (map[string]interface{}, error)

func (DefaultMarshallerUnmarshaller) Unmarshal

func (DefaultMarshallerUnmarshaller) Unmarshal(values map[string]interface{}) (msg *message.Message, err error)

type Marshaller

// Marshaller converts a message destined for the given topic into a map of
// string keys to values, or reports an error.
type Marshaller interface {
	Marshal(topic string, msg *message.Message) (map[string]interface{}, error)
}

type MarshallerUnmarshaller

// MarshallerUnmarshaller combines the Marshaller and Unmarshaller interfaces.
type MarshallerUnmarshaller interface {
	Marshaller
	Unmarshaller
}

type Publisher

// Publisher publishes messages to Redis streams. Create one with NewPublisher.
type Publisher struct {
	// contains filtered or unexported fields
}

func NewPublisher

func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error)

NewPublisher creates a new redis stream Publisher.

func (*Publisher) Close

func (p *Publisher) Close() error

func (*Publisher) Publish

func (p *Publisher) Publish(topic string, msgs ...*message.Message) error

Publish publishes messages to a Redis stream.

Publish is blocking: it waits for the Redis response. If delivery of any message fails, the function is interrupted.

type PublisherConfig

// PublisherConfig holds the settings passed to NewPublisher.
//
// Client is the Redis client handle. Marshaller converts an outgoing message
// (together with its topic) into a map of values.
// NOTE(review): Maxlens presumably maps a topic name to the maximum length its
// stream is trimmed to — confirm against the Publish implementation.
type PublisherConfig struct {
	Client     redis.UniversalClient
	Marshaller Marshaller
	Maxlens    map[string]int64
}

func (*PublisherConfig) Validate

func (c *PublisherConfig) Validate() error

type Subscriber

// Subscriber is the Redis stream subscriber. Create one with NewSubscriber;
// its Subscribe method returns a receive-only channel of messages for a topic.
type Subscriber struct {
	// contains filtered or unexported fields
}

func NewSubscriber

func NewSubscriber(config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error)

NewSubscriber creates a new redis stream Subscriber.

func (*Subscriber) Close

func (s *Subscriber) Close() error

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)

type SubscriberConfig

// SubscriberConfig holds the settings passed to NewSubscriber.
type SubscriberConfig struct {
	// Client is the Redis client handle.
	Client redis.UniversalClient

	// Unmarshaller converts a map of values into a message.
	Unmarshaller Unmarshaller

	// Consumer is the Redis stream consumer ID; it is paired with ConsumerGroup.
	Consumer string
	// ConsumerGroup is the consumer group name.
	// When empty, fan-out mode will be used.
	ConsumerGroup string

	// NackResendSleep is how long to wait after a Nack before the message is
	// redelivered. The NoSleep constant may be assigned here.
	NackResendSleep time.Duration

	// BlockTime is how long to block while waiting for the next Redis stream message.
	BlockTime time.Duration

	// ClaimInterval is the interval at which idle pending messages are claimed.
	ClaimInterval time.Duration

	// ClaimBatchSize is the maximum number of pending messages claimed per claim interval.
	ClaimBatchSize int64

	// MaxIdleTime is how long a pending message must have been idle before it
	// is treated as claimable.
	MaxIdleTime time.Duration

	// CheckConsumersInterval is the interval at which consumer status is checked.
	CheckConsumersInterval time.Duration

	// ConsumerTimeout is the timeout after which an idle consumer with no
	// pending messages will be removed from the consumer group.
	ConsumerTimeout time.Duration

	// OldestId is the message ID from which consumption starts.
	// When using "0", the consumer group will consume from the very first message.
	// When using "$", the consumer group will consume from the latest message.
	OldestId string

	// ShouldClaimPendingMessage, if set, is called to decide whether a pending
	// message that has been idle for more than MaxIdleTime should actually be claimed.
	// If it is not set, all pending messages that have been idle for more than
	// MaxIdleTime will be claimed.
	// This is useful e.g. for tasks whose processing time varies widely: a short
	// MaxIdleTime is not an option, yet dead consumers should still be spotted
	// quickly, so a long MaxIdleTime is not an option either.
	// In such cases, if there is another way to check consumers' health, it can
	// be leveraged in this callback.
	ShouldClaimPendingMessage func(redis.XPendingExt) bool
}

func (*SubscriberConfig) Validate

func (sc *SubscriberConfig) Validate() error

type Unmarshaller

// Unmarshaller converts a map of string keys to values into a message, or
// reports an error.
type Unmarshaller interface {
	Unmarshal(values map[string]interface{}) (msg *message.Message, err error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL