rediszset

package
v1.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 2, 2024 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// NoSleep can be set to SubscriberConfig.NackResendSleep
	NoSleep time.Duration = -1

	DefaultBlockTime = time.Millisecond * 100

	DefaultClaimInterval = time.Second * 5

	DefaultClaimBatchSize = int64(100)

	DefaultMaxIdleTime = time.Second * 60

	DefaultCheckConsumersInterval = time.Second * 300
	DefaultConsumerTimeout        = time.Second * 600
	DefaultConsumerLockExpiration = 300
	DefaultZRangeCount            = 100
)
View Source
const DelayKey = "_delay"
View Source
const UUIDHeaderKey = "_watermill_message_uuid"

Variables

This section is empty.

Functions

This section is empty.

Types

type DefaultMarshallerUnmarshaller

type DefaultMarshallerUnmarshaller struct{}

func (DefaultMarshallerUnmarshaller) Marshal

func (DefaultMarshallerUnmarshaller) Unmarshal

type Lock

type Lock struct {
	// contains filtered or unexported fields
}

func NewLock

func NewLock(ctx context.Context, client redis.UniversalClient, lockKey string, seconds int64) (*Lock, bool, error)

func (*Lock) Close

func (l *Lock) Close(ctx context.Context) error

type Marshaller

type Marshaller interface {
	Marshal(topic string, msg *message.Message) (string, error)
}

Marshaler marshals Watermill's message to Kafka message.

type MarshallerUnmarshaller

type MarshallerUnmarshaller interface {
	Marshaller
	Unmarshaller
}

type Member

type Member struct {
	Topic string  `json:"topic"`
	UUID  string  `json:"uuid"`
	Score float64 `json:"score"`

	// Metadata contains the message metadata.
	//
	// Can be used to store data which doesn't require unmarshalling the entire payload.
	// It is something similar to HTTP request's headers.
	//
	// Metadata is marshaled and will be saved to the PubSub.
	Metadata message.Metadata `json:"metadata"`

	// Payload is the message's payload.
	Payload message.Payload `json:"payload"`
}

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

func NewPublisher

func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error)

NewPublisher creates a new redis stream Publisher.

func (*Publisher) Close

func (p *Publisher) Close() error

func (*Publisher) Publish

func (p *Publisher) Publish(topic string, msgs ...*message.Message) error

Publish publishes message to redis stream

Publish is blocking and waits for redis response. When any of messages delivery fails - function is interrupted.

type PublisherConfig

type PublisherConfig struct {
	Client     redis.UniversalClient
	Marshaller Marshaller
	Maxlens    map[string]int64
}

func (*PublisherConfig) Validate

func (c *PublisherConfig) Validate() error

type ReadChannel

type ReadChannel struct {
	Z    *redis.Z
	Lock *Lock
}

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

func NewSubscriber

func NewSubscriber(config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error)

NewSubscriber creates a new redis stream Subscriber.

func (*Subscriber) Close

func (s *Subscriber) Close() error

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)

type SubscriberConfig

type SubscriberConfig struct {
	Client redis.UniversalClient

	Unmarshaller Unmarshaller

	// Redis stream consumer id, paired with ConsumerGroup.
	Consumer string

	// How long after Nack message should be redelivered.
	NackResendSleep time.Duration

	// Block to wait next redis stream message.
	BlockTime time.Duration

	// Claim idle pending message interval.
	ClaimInterval time.Duration

	// How many pending messages are claimed at most each claim interval.
	ClaimBatchSize int64

	// How long should we treat a pending message as claimable.
	MaxIdleTime time.Duration

	// Check consumer status interval.
	CheckConsumersInterval time.Duration

	// After this timeout an idle consumer with no pending messages will be removed from the consumer group.
	ConsumerTimeout time.Duration

	// Start consumption from the specified message ID.
	// When using "0", the consumer group will consume from the very first message.
	// When using "$", the consumer group will consume from the latest message.
	OldestId string

	// If consumer group in not set, for fanout start consumption from the specified message ID.
	// When using "0", the consumer will consume from the very first message.
	// When using "$", the consumer will consume from the latest message.
	FanOutOldestId string

	// If this is set, it will be called to decide whether a pending message that
	// has been idle for more than MaxIdleTime should actually be claimed.
	// If this is not set, then all pending messages that have been idle for more than MaxIdleTime will be claimed.
	// This can be useful e.g. for tasks where the processing time can be very variable -
	// so we can't just use a short MaxIdleTime; but at the same time dead
	// consumers should be spotted quickly - so we can't just use a long MaxIdleTime either.
	// In such cases, if we have another way for checking consumers' health, then we can
	// leverage that in this callback.
	ShouldClaimPendingMessage func(redis.XPendingExt) bool

	// If this is set, it will be called to decide whether a reading error
	// should return the read method and close the subscriber or just log the error
	// and continue.
	ShouldStopOnReadErrors func(error) bool

	LockExpiration int64

	ZRangeCount int64
}

func (*SubscriberConfig) Validate

func (sc *SubscriberConfig) Validate() error

type Unmarshaller

type Unmarshaller interface {
	Unmarshal(value string) (*message.Message, error)
}

Unmarshaler unmarshals Kafka's message to Watermill's message.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL