redis

package
v0.0.0-...-50e45a6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 11, 2025 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

View Source
const DEFAULT_PING_ATTEMPTS = 10
View Source
const DEFAULT_PING_BACKOFF_LIMIT = 60
View Source
const STREAM_CLEANUP_BUCKET_ARCHIVE = "stream_cleanup_bucket:archive"
View Source
const STREAM_CLEANUP_BUCKET_DELETE = "stream_cleanup_bucket:delete"
View Source
const STREAM_CLEANUP_BUCKET_DELETE_ARCHIVE = "stream_cleanup_bucket:delete_archive"
View Source
const STREAM_META_DATA_PREFIX = "{streamweaver_stream_metadata}:"

The curly braces form a Redis Cluster hash tag: keys that share the same tag are forced into the same cluster slot, which is useful for sharding.

View Source
const STREAM_REGISTRY_KEY = "stream_registry"

Variables

This section is empty.

Functions

func ByteSliceToRedisMessageMapSlice

func ByteSliceToRedisMessageMapSlice(values [][]byte) []map[string]interface{}

Converts a slice of byte slices into a slice of maps that can be used with Redis.

func NewClusterClient

func NewClusterClient(opts *ClusterClientOptions, logger logging.LoggerContract) (*redis.ClusterClient, error)

Types

type ClusterClientOptions

type ClusterClientOptions struct {
	Ctx              context.Context
	Nodes            []string
	Password         string
	DB               int
	MaxPingRetries   int
	PingBackoffLimit int
}

type ClusterInfo

type ClusterInfo struct {
	Nodes []*ClusterNodeInfo
}

func GetClusterInfo

func GetClusterInfo(context context.Context, client *redis.ClusterClient) (*ClusterInfo, error)

type ClusterNodeInfo

type ClusterNodeInfo struct {
	ID          string   `json:"id"`
	Address     string   `json:"address"`
	Hostname    string   `json:"hostname"`
	Flags       []string `json:"flags"`
	Master      string   `json:"master"`
	PingSent    int64    `json:"ping_sent"`
	PingRecv    int64    `json:"ping_recv"`
	ConfigEpoch int64    `json:"config_epoch"`
	LinkState   string   `json:"link_state"`
	Slot        string   `json:"slot"`
}

type CreateStreamParameters

type CreateStreamParameters struct {
	Name          string
	CleanupPolicy string
	MaxAge        int64
}

func (*CreateStreamParameters) Validate

func (p *CreateStreamParameters) Validate() error

type MockRedisClient

type MockRedisClient struct {
	mock.Mock
}

func (*MockRedisClient) HGetAll

func (*MockRedisClient) HSet

func (m *MockRedisClient) HSet(ctx context.Context, key string, values ...interface{}) *rdb.IntCmd

func (*MockRedisClient) HSetNX

func (m *MockRedisClient) HSetNX(ctx context.Context, key, field string, value interface{}) *rdb.BoolCmd

func (*MockRedisClient) SAdd

func (m *MockRedisClient) SAdd(ctx context.Context, key string, members ...interface{}) *rdb.IntCmd

func (*MockRedisClient) SMembers

func (m *MockRedisClient) SMembers(ctx context.Context, key string) *rdb.StringSliceCmd

func (*MockRedisClient) XAdd

func (m *MockRedisClient) XAdd(ctx context.Context, params *rdb.XAddArgs) *rdb.StringCmd

func (*MockRedisClient) XDel

func (m *MockRedisClient) XDel(ctx context.Context, stream string, ids ...string) *rdb.IntCmd

func (*MockRedisClient) XInfoStream

func (m *MockRedisClient) XInfoStream(ctx context.Context, stream string) *rdb.XInfoStreamCmd

func (*MockRedisClient) XRange

func (m *MockRedisClient) XRange(ctx context.Context, stream, start, stop string) *rdb.XMessageSliceCmd

func (*MockRedisClient) XRangeN

func (m *MockRedisClient) XRangeN(ctx context.Context, stream, start, stop string, count int64) *rdb.XMessageSliceCmd

func (*MockRedisClient) XTrimMinID

func (m *MockRedisClient) XTrimMinID(ctx context.Context, stream string, minID string) *rdb.IntCmd

type RedisNotEnoughNodesError

type RedisNotEnoughNodesError struct{}

func NotEnoughNodesError

func NotEnoughNodesError() *RedisNotEnoughNodesError

func (*RedisNotEnoughNodesError) Error

func (e *RedisNotEnoughNodesError) Error() string

type RedisStreamClient

type RedisStreamClient interface {
	XAdd(ctx context.Context, args *rdb.XAddArgs) *rdb.StringCmd
	XDel(ctx context.Context, stream string, ids ...string) *rdb.IntCmd
	XInfoStream(ctx context.Context, stream string) *rdb.XInfoStreamCmd
	XTrimMinID(ctx context.Context, stream string, minID string) *rdb.IntCmd
	XRange(ctx context.Context, stream, start, stop string) *rdb.XMessageSliceCmd
	XRangeN(ctx context.Context, stream, start, stop string, count int64) *rdb.XMessageSliceCmd
	HSet(ctx context.Context, key string, values ...interface{}) *rdb.IntCmd
	HSetNX(ctx context.Context, key, field string, value interface{}) *rdb.BoolCmd
	HGetAll(ctx context.Context, key string) *rdb.MapStringStringCmd
	SAdd(ctx context.Context, key string, members ...interface{}) *rdb.IntCmd
	SMembers(ctx context.Context, key string) *rdb.StringSliceCmd
}

type RedisStreamNotFoundError

type RedisStreamNotFoundError struct {
	Name string
}

func StreamNotFoundError

func StreamNotFoundError(name string) *RedisStreamNotFoundError

func (*RedisStreamNotFoundError) Error

func (e *RedisStreamNotFoundError) Error() string

type RedisStreamPublishError

type RedisStreamPublishError struct {
	Err error
}

func StreamPublishError

func StreamPublishError(err error) *RedisStreamPublishError

func (*RedisStreamPublishError) Error

func (e *RedisStreamPublishError) Error() string

type RedisStreamService

type RedisStreamService interface {
	// Create a new Redis stream for producing and consuming messages
	CreateStream(params *CreateStreamParameters) error
	// Count messages older than a given ID in a stream
	CountMessagesOlderThan(streamName string, minId string, batchSize int64) (int64, error)
	// Delete messages older than a given ID from a stream
	DeleteMessagesOlderThan(streamName string, minId string) error
	// Get messages older than a given ID from a stream
	GetMessagesOlderThan(streamName string, minId string, count int64) ([]redis.XMessage, error)
	// Publish messages to a stream
	PublishMessages(streamName string, messages [][]byte) (*StreamPublishResult, error)
}

type RedisStreamServiceImpl

type RedisStreamServiceImpl struct {
	Ctx                    context.Context
	Client                 RedisStreamClient
	StreamMetadataService  StreamMetadataService
	Logger                 logging.LoggerContract
	GlobalRetentionOptions *config.RetentionConfig
}

Implements RedisStreamService.

func (*RedisStreamServiceImpl) CountMessagesOlderThan

func (s *RedisStreamServiceImpl) CountMessagesOlderThan(streamName string, minId string, batchSize int64) (int64, error)

func (*RedisStreamServiceImpl) CreateStream

func (s *RedisStreamServiceImpl) CreateStream(params *CreateStreamParameters) error

func (*RedisStreamServiceImpl) DeleteMessagesOlderThan

func (s *RedisStreamServiceImpl) DeleteMessagesOlderThan(streamName string, minId string) error

func (*RedisStreamServiceImpl) GetMessagesOlderThan

func (s *RedisStreamServiceImpl) GetMessagesOlderThan(streamName string, minId string, count int64) ([]redis.XMessage, error)

func (*RedisStreamServiceImpl) PublishMessages

func (s *RedisStreamServiceImpl) PublishMessages(streamName string, messages [][]byte) (*StreamPublishResult, error)

Publish messages to a stream

func (*RedisStreamServiceImpl) StreamExists

func (s *RedisStreamServiceImpl) StreamExists(streamName string) (bool, error)

type RedisStreamServiceMock

type RedisStreamServiceMock struct {
	mock.Mock
}

func NewRedisStreamServiceMock

func NewRedisStreamServiceMock() *RedisStreamServiceMock

func (*RedisStreamServiceMock) CountMessagesOlderThan

func (m *RedisStreamServiceMock) CountMessagesOlderThan(streamName string, minId string, batchSize int64) (int64, error)

func (*RedisStreamServiceMock) CreateStream

func (m *RedisStreamServiceMock) CreateStream(params *CreateStreamParameters) error

func (*RedisStreamServiceMock) DeleteMessagesOlderThan

func (m *RedisStreamServiceMock) DeleteMessagesOlderThan(streamName string, minId string) error

func (*RedisStreamServiceMock) GetMessagesOlderThan

func (m *RedisStreamServiceMock) GetMessagesOlderThan(streamName string, minId string, count int64) ([]redis.XMessage, error)

func (*RedisStreamServiceMock) PublishMessages

func (m *RedisStreamServiceMock) PublishMessages(streamName string, messages [][]byte) (*StreamPublishResult, error)

func (*RedisStreamServiceMock) StreamExists

func (m *RedisStreamServiceMock) StreamExists(streamName string) (bool, error)

type RedisStreamServiceOptions

type RedisStreamServiceOptions struct {
	Ctx                    context.Context
	MetadataService        StreamMetadataService
	RedisClient            RedisStreamClient
	GlobalRetentionOptions *config.RetentionConfig
}

type StreamMetadata

type StreamMetadata struct {
	Name          string
	MaxAge        int64
	CleanupPolicy string
	CreatedAt     int64
	UpdatedAt     int64
}

type StreamMetadataService

type StreamMetadataService interface {
	AddToRegistry(streamName string) error
	AddToCleanupBucket(streamName string, bucketKey string) error
	GetStreamMetadata(streamHash string) (*StreamMetadata, error)
	ListStreams() ([]string, error)
	WriteStreamMetadata(value *StreamMetadata) error
}

type StreamMetadataServiceImpl

type StreamMetadataServiceImpl struct {
	Ctx    context.Context
	Logger logging.LoggerContract
	Client RedisStreamClient
}

func (*StreamMetadataServiceImpl) AddToCleanupBucket

func (s *StreamMetadataServiceImpl) AddToCleanupBucket(streamName string, bucketKey string) error

Adds a stream to the bucket for the cleanup policy

func (*StreamMetadataServiceImpl) AddToRegistry

func (s *StreamMetadataServiceImpl) AddToRegistry(streamName string) error

Adds a stream to the registry

func (*StreamMetadataServiceImpl) GetStreamMetadata

func (s *StreamMetadataServiceImpl) GetStreamMetadata(hash string) (*StreamMetadata, error)

Gets the metadata for a stream

func (*StreamMetadataServiceImpl) ListStreams

func (s *StreamMetadataServiceImpl) ListStreams() ([]string, error)

Lists all streams in the registry

func (*StreamMetadataServiceImpl) WriteStreamMetadata

func (s *StreamMetadataServiceImpl) WriteStreamMetadata(value *StreamMetadata) error

type StreamMetadataServiceMock

type StreamMetadataServiceMock struct {
	mock.Mock
}

func NewStreamMetadataServiceMock

func NewStreamMetadataServiceMock() *StreamMetadataServiceMock

func (*StreamMetadataServiceMock) AddToCleanupBucket

func (m *StreamMetadataServiceMock) AddToCleanupBucket(streamName string, bucketKey string) error

func (*StreamMetadataServiceMock) AddToRegistry

func (m *StreamMetadataServiceMock) AddToRegistry(streamName string) error

func (*StreamMetadataServiceMock) GetStreamMetadata

func (m *StreamMetadataServiceMock) GetStreamMetadata(streamName string) (*StreamMetadata, error)

func (*StreamMetadataServiceMock) ListStreams

func (m *StreamMetadataServiceMock) ListStreams() ([]string, error)

func (*StreamMetadataServiceMock) ReadStreamMetadata

func (m *StreamMetadataServiceMock) ReadStreamMetadata(streamName string) (*StreamMetadata, error)

func (*StreamMetadataServiceMock) WriteStreamMetadata

func (m *StreamMetadataServiceMock) WriteStreamMetadata(value *StreamMetadata) error

type StreamPublishResult

type StreamPublishResult struct {
	MessageIds []string
	Published  int
	Failed     int
	Errors     []error
}

func (*StreamPublishResult) AddError

func (r *StreamPublishResult) AddError(err error)

func (*StreamPublishResult) AddMessageId

func (r *StreamPublishResult) AddMessageId(id string)

func (*StreamPublishResult) IncrementFailed

func (r *StreamPublishResult) IncrementFailed()

func (*StreamPublishResult) IncrementPublished

func (r *StreamPublishResult) IncrementPublished()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL