Documentation
¶
Index ¶
- Constants
- Variables
- func CreateGroupIfNotExists(client redis.Cmdable, stream, group string) error
- func ParseDurationOrDefault(s *string, def time.Duration) time.Duration
- func UniqueConsumerName(base string) string
- func ValidateDuration(fl validator.FieldLevel) bool
- type Config
- type DLQHandlerFn
- type IRedStream
- type LogFn
- type RedisStream
- func (r *RedisStream) HealthCheck(ctx context.Context) error
- func (r *RedisStream) Publish(ctx context.Context, data any) (string, error)
- func (r *RedisStream) RegisterHandler(handler func(ctx context.Context, msg map[string]string) error)
- func (r *RedisStream) StartConsumer(ctx context.Context) error
- func (r *RedisStream) UseDebug(fn LogFn)
- func (r *RedisStream) UseError(fn LogFn)
- func (r *RedisStream) UseInfo(fn LogFn)
Constants ¶
const (
RedisKeysPrefix = "redstream"
)
Variables ¶
var (
Validate *validator.Validate
)
Functions ¶
func CreateGroupIfNotExists ¶
CreateGroupIfNotExists attempts to create a consumer group for a Redis stream if it doesn't already exist. It uses the XGroupCreateMkStream command, which creates both the stream and the group if they don't exist.
Parameters:
- client: A pointer to a redis.Client instance used to execute Redis commands.
- stream: The name of the Redis stream to create or use.
- group: The name of the consumer group to create.
Returns:
- error: nil if the group was created successfully or already exists, otherwise returns an error.
func ParseDurationOrDefault ¶
ParseDurationOrDefault attempts to parse a string into a time.Duration. If parsing fails or results in a non-positive duration, it returns the default value.
Parameters:
- s: A string representation of a duration (e.g., "5s", "1m30s").
- def: The default duration to return if parsing fails or results in a non-positive value.
Returns:
A time.Duration parsed from the input string, or the default value if parsing fails.
func UniqueConsumerName ¶
UniqueConsumerName generates a unique consumer name for use in Redis streams. It combines a base name with the hostname, process ID, and a random suffix to ensure uniqueness across different instances and executions.
Parameters:
- base: A string that serves as the prefix for the generated consumer name.
Returns:
A string representing a unique consumer name in the format: "<base>-<hostname>-<pid>-<random_suffix>"
func ValidateDuration ¶
func ValidateDuration(fl validator.FieldLevel) bool
ValidateDuration checks if a given field can be parsed as a valid duration.
It takes a validator.FieldLevel as an argument, which provides access to the field being validated and its metadata.
Parameters:
- fl: A validator.FieldLevel interface that allows access to the field being validated, as well as the struct it belongs to.
Returns:
- bool: true if the field can be parsed as a valid duration, false otherwise.
Types ¶
type Config ¶
type Config struct { StreamName string `validate:"required"` GroupName string `validate:"required"` ConsumerName string `validate:"required"` MaxConcurrency int UseDistributedLock bool // if false => skip the redsnyc-based lock NoProgressThreshold int // after how many consecutive zero-reclaims do we call readPendingMessagesOnce LockExpiryStr string `validate:"duration"` LockExtendStr string `validate:"duration"` BlockDurationStr string `validate:"duration"` EnableReclaim bool ReclaimStr string `validate:"duration"` CleanOldReclaimDuration string `validate:"duration"` MaxReclaimAttempts int ReclaimCount int64 ReclaimMaxExponentialFactor int DLQHandler DLQHandlerFn IgnoreDLQHandlerErrors bool DropConcurrentDuplicates bool ProcessedIdsMaxAgeStr string `validate:"duration"` UseRedisIdAsUniqueID bool StaleConsumerIdleThresholdStr string `validate:"duration"` EnableStaleConsumerCleanup bool EnableAutoRejoinOnRemoved bool AutoRejoinCheckIntervalStr string `validate:"duration"` }
Config represents the configuration for the RedStream.
type DLQHandlerFn ¶
type IRedStream ¶
type IRedStream interface { StartConsumer(ctx context.Context) error RegisterHandler(handler func(ctx context.Context, msg map[string]string) error) Publish(ctx context.Context, data any) (string, error) HealthCheck(ctx context.Context) error UseDebug(fn LogFn) UseInfo(fn LogFn) UseError(fn LogFn) }
func New ¶
func New(redisOptions *redis.UniversalOptions, cfg Config) IRedStream
New creates and initializes a new IRedStream instance.
It sets up the Redis client, configures distributed locking if enabled, parses and validates configuration values, creates the consumer group if it doesn't exist, and registers Lua scripts.
Parameters:
- redisOptions: A pointer to redis.UniversalOptions containing Redis connection settings.
- cfg: A Config struct containing the configuration for the RedStream.
Returns:
- An IRedStream interface implementation. If there's an error during initialization, it returns nil and logs a fatal error.
type RedisStream ¶
type RedisStream struct { Client redis.UniversalClient Rs *redsync.Redsync Cfg Config LockExpiry time.Duration LockExtend time.Duration BlockDuration time.Duration ReclaimInterval time.Duration CleanOldReclaimDuration time.Duration ProcessedIdsMaxAge time.Duration // contains filtered or unexported fields }
RedisStream is the main struct implementing IRedStream.
func (*RedisStream) HealthCheck ¶
func (r *RedisStream) HealthCheck(ctx context.Context) error
HealthCheck performs a health check on the Redis connection by sending a PING command. It verifies if the Redis server is responsive and the connection is active.
Parameters:
- ctx: A context.Context for handling cancellation and timeouts.
Returns:
- error: An error if the PING command fails or the connection is not healthy, nil if the health check is successful.
func (*RedisStream) Publish ¶
Publish adds a new message to the Redis stream. It optionally checks for and drops concurrent duplicates.
The function marshals the provided data into JSON, optionally acquires a lock to prevent concurrent duplicates, and then adds the message to the stream using Redis XADD command.
Parameters:
- ctx: A context.Context for handling cancellation and timeouts.
- data: Any data type that can be marshaled into JSON to be published as a message.
Returns:
- string: The ID of the published message in the Redis stream. Empty if skipped due to duplication.
- error: An error if any step in the publishing process fails, nil otherwise.
func (*RedisStream) RegisterHandler ¶
func (r *RedisStream) RegisterHandler(handler func(ctx context.Context, msg map[string]string) error)
RegisterHandler sets the handler for new messages.
func (*RedisStream) StartConsumer ¶
func (r *RedisStream) StartConsumer(ctx context.Context) error
StartConsumer initiates the consumer process for the Redis stream. It starts goroutines for listening to new messages, reclaiming pending messages (if enabled), and performing periodic cleanup of processed message IDs.
This function sets up the necessary channels and goroutines to handle message processing concurrency, new message consumption, and maintenance tasks.
Parameters:
- ctx: A context.Context for handling cancellation and timeouts.
Returns:
- error: An error if the handler function is not set, nil otherwise.
func (*RedisStream) UseDebug ¶
func (r *RedisStream) UseDebug(fn LogFn)
func (*RedisStream) UseError ¶
func (r *RedisStream) UseError(fn LogFn)
func (*RedisStream) UseInfo ¶
func (r *RedisStream) UseInfo(fn LogFn)