redisstream

package module
v0.0.0-...-38e17da Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2025 License: Apache-2.0 Imports: 9 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type RedisStream

type RedisStream struct {
	// contains filtered or unexported fields
}

RedisStream wraps a Redis client.

func Init

func Init() *RedisStream

Init loads configuration and sets up the default event bus.

func NewRedisStream

func NewRedisStream(cfg *RedisStreamConfig) *RedisStream

NewRedisStream creates a new RedisStream and verifies the connection.

func (*RedisStream) Close

func (r *RedisStream) Close() error

Close terminates the Redis connection.

func (*RedisStream) Publish

func (r *RedisStream) Publish(ctx context.Context, topic string, event cloudevents.Event) error

Publish serializes the event as JSON and adds it to the specified stream.

func (*RedisStream) Subscribe

func (r *RedisStream) Subscribe(topic string, handler func(ctx context.Context, event cloudevents.Event), opts ...messaging.SubscriptionOption) error

Subscribe subscribes to a Redis stream.

  • If a consumer group is provided (SubscriptionOptions.Group is non-empty), it uses consumer group semantics (with XREADGroup and offset ">").
  • Otherwise, it uses a plain XREAD subscription with the fixed offset "$" for new messages.

The method accepts a context for cancellation.

type RedisStreamConfig

type RedisStreamConfig struct {
	URL               string `mapstructure:"url"`
	Username          string `mapstructure:"username"`
	Password          string `mapstructure:"password"`
	TLS               bool   `mapstructure:"tls"`               // Secure connection flag
	MaxRetries        int    `mapstructure:"maxRetries"`        // For retry logic
	MaxConnections    int    `mapstructure:"maxConnections"`    // Connection pooling config
	ConnectionTimeout int    `mapstructure:"connectionTimeout"` // Timeout for connections
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL