Documentation
¶
Index ¶
- Constants
- func ByteSliceToRedisMessageMapSlice(values [][]byte) []map[string]interface{}
- func NewClusterClient(opts *ClusterClientOptions, logger logging.LoggerContract) (*redis.ClusterClient, error)
- type ClusterClientOptions
- type ClusterInfo
- type ClusterNodeInfo
- type CreateStreamParameters
- type MockRedisClient
- func (m *MockRedisClient) HGetAll(ctx context.Context, key string) *rdb.MapStringStringCmd
- func (m *MockRedisClient) HSet(ctx context.Context, key string, values ...interface{}) *rdb.IntCmd
- func (m *MockRedisClient) HSetNX(ctx context.Context, key, field string, value interface{}) *rdb.BoolCmd
- func (m *MockRedisClient) SAdd(ctx context.Context, key string, members ...interface{}) *rdb.IntCmd
- func (m *MockRedisClient) SMembers(ctx context.Context, key string) *rdb.StringSliceCmd
- func (m *MockRedisClient) XAdd(ctx context.Context, params *rdb.XAddArgs) *rdb.StringCmd
- func (m *MockRedisClient) XDel(ctx context.Context, stream string, ids ...string) *rdb.IntCmd
- func (m *MockRedisClient) XInfoStream(ctx context.Context, stream string) *rdb.XInfoStreamCmd
- func (m *MockRedisClient) XRange(ctx context.Context, stream, start, stop string) *rdb.XMessageSliceCmd
- func (m *MockRedisClient) XRangeN(ctx context.Context, stream, start, stop string, count int64) *rdb.XMessageSliceCmd
- func (m *MockRedisClient) XTrimMinID(ctx context.Context, stream string, minID string) *rdb.IntCmd
- type RedisNotEnoughNodesError
- type RedisStreamClient
- type RedisStreamNotFoundError
- type RedisStreamPublishError
- type RedisStreamService
- type RedisStreamServiceImpl
- func (s *RedisStreamServiceImpl) CountMessagesOlderThan(streamName string, minId string, batchSize int64) (int64, error)
- func (s *RedisStreamServiceImpl) CreateStream(params *CreateStreamParameters) error
- func (s *RedisStreamServiceImpl) DeleteMessagesOlderThan(streamName string, minId string) error
- func (s *RedisStreamServiceImpl) GetMessagesOlderThan(streamName string, minId string, count int64) ([]redis.XMessage, error)
- func (s *RedisStreamServiceImpl) PublishMessages(streamName string, messages [][]byte) (*StreamPublishResult, error)
- func (s *RedisStreamServiceImpl) StreamExists(streamName string) (bool, error)
- type RedisStreamServiceMock
- func (m *RedisStreamServiceMock) CountMessagesOlderThan(streamName string, minId string, batchSize int64) (int64, error)
- func (m *RedisStreamServiceMock) CreateStream(params *CreateStreamParameters) error
- func (m *RedisStreamServiceMock) DeleteMessagesOlderThan(streamName string, minId string) error
- func (m *RedisStreamServiceMock) GetMessagesOlderThan(streamName string, minId string, count int64) ([]redis.XMessage, error)
- func (m *RedisStreamServiceMock) PublishMessages(streamName string, messages [][]byte) (*StreamPublishResult, error)
- func (m *RedisStreamServiceMock) StreamExists(streamName string) (bool, error)
- type RedisStreamServiceOptions
- type StreamMetadata
- type StreamMetadataService
- type StreamMetadataServiceImpl
- func (s *StreamMetadataServiceImpl) AddToCleanupBucket(streamName string, bucketKey string) error
- func (s *StreamMetadataServiceImpl) AddToRegistry(streamName string) error
- func (s *StreamMetadataServiceImpl) GetStreamMetadata(hash string) (*StreamMetadata, error)
- func (s *StreamMetadataServiceImpl) ListStreams() ([]string, error)
- func (s *StreamMetadataServiceImpl) WriteStreamMetadata(value *StreamMetadata) error
- type StreamMetadataServiceMock
- func (m *StreamMetadataServiceMock) AddToCleanupBucket(streamName string, bucketKey string) error
- func (m *StreamMetadataServiceMock) AddToRegistry(streamName string) error
- func (m *StreamMetadataServiceMock) GetStreamMetadata(streamName string) (*StreamMetadata, error)
- func (m *StreamMetadataServiceMock) ListStreams() ([]string, error)
- func (m *StreamMetadataServiceMock) ReadStreamMetadata(streamName string) (*StreamMetadata, error)
- func (m *StreamMetadataServiceMock) WriteStreamMetadata(value *StreamMetadata) error
- type StreamPublishResult
Constants ¶
View Source
const DEFAULT_PING_ATTEMPTS = 10
View Source
const DEFAULT_PING_BACKOFF_LIMIT = 60
View Source
const STREAM_CLEANUP_BUCKET_ARCHIVE = "stream_cleanup_bucket:archive"
View Source
const STREAM_CLEANUP_BUCKET_DELETE = "stream_cleanup_bucket:delete"
View Source
const STREAM_CLEANUP_BUCKET_DELETE_ARCHIVE = "stream_cleanup_bucket:delete_archive"
View Source
const STREAM_META_DATA_PREFIX = "{streamweaver_stream_metadata}:"
The curly braces are used to force keys with similar tags to go to the same cluster slot, which is useful for sharding.
View Source
const STREAM_REGISTRY_KEY = "stream_registry"
Variables ¶
This section is empty.
Functions ¶
func ByteSliceToRedisMessageMapSlice ¶
Converts a slice of byte slices into a slice of maps that can be used with Redis.
func NewClusterClient ¶
func NewClusterClient(opts *ClusterClientOptions, logger logging.LoggerContract) (*redis.ClusterClient, error)
Types ¶
type ClusterClientOptions ¶
type ClusterInfo ¶
type ClusterInfo struct {
Nodes []*ClusterNodeInfo
}
func GetClusterInfo ¶
func GetClusterInfo(context context.Context, client *redis.ClusterClient) (*ClusterInfo, error)
type ClusterNodeInfo ¶
type ClusterNodeInfo struct { ID string `json:"id"` Address string `json:"address"` Hostname string `json:"hostname"` Flags []string `json:"flags"` Master string `json:"master"` PingSent int64 `json:"ping_sent"` PingRecv int64 `json:"ping_recv"` ConfigEpoch int64 `json:"config_epoch"` LinkState string `json:"link_state"` Slot string `json:"slot"` }
type CreateStreamParameters ¶
func (*CreateStreamParameters) Validate ¶
func (p *CreateStreamParameters) Validate() error
type MockRedisClient ¶
func (*MockRedisClient) HGetAll ¶
func (m *MockRedisClient) HGetAll(ctx context.Context, key string) *rdb.MapStringStringCmd
func (*MockRedisClient) SMembers ¶
func (m *MockRedisClient) SMembers(ctx context.Context, key string) *rdb.StringSliceCmd
func (*MockRedisClient) XInfoStream ¶
func (m *MockRedisClient) XInfoStream(ctx context.Context, stream string) *rdb.XInfoStreamCmd
func (*MockRedisClient) XRange ¶
func (m *MockRedisClient) XRange(ctx context.Context, stream, start, stop string) *rdb.XMessageSliceCmd
func (*MockRedisClient) XRangeN ¶
func (m *MockRedisClient) XRangeN(ctx context.Context, stream, start, stop string, count int64) *rdb.XMessageSliceCmd
func (*MockRedisClient) XTrimMinID ¶
type RedisNotEnoughNodesError ¶
type RedisNotEnoughNodesError struct{}
func NotEnoughNodesError ¶
func NotEnoughNodesError() *RedisNotEnoughNodesError
func (*RedisNotEnoughNodesError) Error ¶
func (e *RedisNotEnoughNodesError) Error() string
type RedisStreamClient ¶
type RedisStreamClient interface { XAdd(ctx context.Context, args *rdb.XAddArgs) *rdb.StringCmd XDel(ctx context.Context, stream string, ids ...string) *rdb.IntCmd XInfoStream(ctx context.Context, stream string) *rdb.XInfoStreamCmd XTrimMinID(ctx context.Context, stream string, minID string) *rdb.IntCmd XRange(ctx context.Context, stream, start, stop string) *rdb.XMessageSliceCmd XRangeN(ctx context.Context, stream, start, stop string, count int64) *rdb.XMessageSliceCmd HSet(ctx context.Context, key string, values ...interface{}) *rdb.IntCmd HSetNX(ctx context.Context, key, field string, value interface{}) *rdb.BoolCmd HGetAll(ctx context.Context, key string) *rdb.MapStringStringCmd SAdd(ctx context.Context, key string, members ...interface{}) *rdb.IntCmd SMembers(ctx context.Context, key string) *rdb.StringSliceCmd }
type RedisStreamNotFoundError ¶
type RedisStreamNotFoundError struct {
Name string
}
func StreamNotFoundError ¶
func StreamNotFoundError(name string) *RedisStreamNotFoundError
func (*RedisStreamNotFoundError) Error ¶
func (e *RedisStreamNotFoundError) Error() string
type RedisStreamPublishError ¶
type RedisStreamPublishError struct {
Err error
}
func StreamPublishError ¶
func StreamPublishError(err error) *RedisStreamPublishError
func (*RedisStreamPublishError) Error ¶
func (e *RedisStreamPublishError) Error() string
type RedisStreamService ¶
type RedisStreamService interface { // Create a new Redis stream for producing and consuming messages CreateStream(params *CreateStreamParameters) error // Count messages older than a given ID in a stream CountMessagesOlderThan(streamName string, minId string, batchSize int64) (int64, error) // Delete messages older than a given ID from a stream DeleteMessagesOlderThan(streamName string, minId string) error // Get messages older than a given ID from a stream GetMessagesOlderThan(streamName string, minId string, count int64) ([]redis.XMessage, error) // Publish messages to a stream PublishMessages(streamName string, messages [][]byte) (*StreamPublishResult, error) }
func NewRedisStreamService ¶
func NewRedisStreamService(opts *RedisStreamServiceOptions, logger logging.LoggerContract) RedisStreamService
type RedisStreamServiceImpl ¶
type RedisStreamServiceImpl struct { Ctx context.Context Client RedisStreamClient StreamMetadataService StreamMetadataService Logger logging.LoggerContract GlobalRetentionOptions *config.RetentionConfig }
Implements RedisStreamService
func (*RedisStreamServiceImpl) CountMessagesOlderThan ¶
func (*RedisStreamServiceImpl) CreateStream ¶
func (s *RedisStreamServiceImpl) CreateStream(params *CreateStreamParameters) error
func (*RedisStreamServiceImpl) DeleteMessagesOlderThan ¶
func (s *RedisStreamServiceImpl) DeleteMessagesOlderThan(streamName string, minId string) error
func (*RedisStreamServiceImpl) GetMessagesOlderThan ¶
func (s *RedisStreamServiceImpl) GetMessagesOlderThan(streamName string, minId string, count int64) ([]redis.XMessage, error)
func (*RedisStreamServiceImpl) PublishMessages ¶
func (s *RedisStreamServiceImpl) PublishMessages(streamName string, messages [][]byte) (*StreamPublishResult, error)
Publish messages to a stream
func (*RedisStreamServiceImpl) StreamExists ¶
func (s *RedisStreamServiceImpl) StreamExists(streamName string) (bool, error)
type RedisStreamServiceMock ¶
func NewRedisStreamServiceMock ¶
func NewRedisStreamServiceMock() *RedisStreamServiceMock
func (*RedisStreamServiceMock) CountMessagesOlderThan ¶
func (*RedisStreamServiceMock) CreateStream ¶
func (m *RedisStreamServiceMock) CreateStream(params *CreateStreamParameters) error
func (*RedisStreamServiceMock) DeleteMessagesOlderThan ¶
func (m *RedisStreamServiceMock) DeleteMessagesOlderThan(streamName string, minId string) error
func (*RedisStreamServiceMock) GetMessagesOlderThan ¶
func (m *RedisStreamServiceMock) GetMessagesOlderThan(streamName string, minId string, count int64) ([]redis.XMessage, error)
func (*RedisStreamServiceMock) PublishMessages ¶
func (m *RedisStreamServiceMock) PublishMessages(streamName string, messages [][]byte) (*StreamPublishResult, error)
func (*RedisStreamServiceMock) StreamExists ¶
func (m *RedisStreamServiceMock) StreamExists(streamName string) (bool, error)
type RedisStreamServiceOptions ¶
type RedisStreamServiceOptions struct { Ctx context.Context MetadataService StreamMetadataService RedisClient RedisStreamClient GlobalRetentionOptions *config.RetentionConfig }
type StreamMetadata ¶
type StreamMetadataService ¶
type StreamMetadataService interface { AddToRegistry(streamName string) error AddToCleanupBucket(streamName string, bucketKey string) error GetStreamMetadata(streamHash string) (*StreamMetadata, error) ListStreams() ([]string, error) WriteStreamMetadata(value *StreamMetadata) error }
func NewStreamMetadataService ¶
func NewStreamMetadataService(ctx context.Context, client RedisStreamClient, logger logging.LoggerContract) StreamMetadataService
type StreamMetadataServiceImpl ¶
type StreamMetadataServiceImpl struct { Ctx context.Context Logger logging.LoggerContract Client RedisStreamClient }
func (*StreamMetadataServiceImpl) AddToCleanupBucket ¶
func (s *StreamMetadataServiceImpl) AddToCleanupBucket(streamName string, bucketKey string) error
Adds a stream to the bucket for the cleanup policy
func (*StreamMetadataServiceImpl) AddToRegistry ¶
func (s *StreamMetadataServiceImpl) AddToRegistry(streamName string) error
Adds a stream to the registry
func (*StreamMetadataServiceImpl) GetStreamMetadata ¶
func (s *StreamMetadataServiceImpl) GetStreamMetadata(hash string) (*StreamMetadata, error)
Gets the metadata for a stream
func (*StreamMetadataServiceImpl) ListStreams ¶
func (s *StreamMetadataServiceImpl) ListStreams() ([]string, error)
Lists all streams in the registry
func (*StreamMetadataServiceImpl) WriteStreamMetadata ¶
func (s *StreamMetadataServiceImpl) WriteStreamMetadata(value *StreamMetadata) error
type StreamMetadataServiceMock ¶
func NewStreamMetadataServiceMock ¶
func NewStreamMetadataServiceMock() *StreamMetadataServiceMock
func (*StreamMetadataServiceMock) AddToCleanupBucket ¶
func (m *StreamMetadataServiceMock) AddToCleanupBucket(streamName string, bucketKey string) error
func (*StreamMetadataServiceMock) AddToRegistry ¶
func (m *StreamMetadataServiceMock) AddToRegistry(streamName string) error
func (*StreamMetadataServiceMock) GetStreamMetadata ¶
func (m *StreamMetadataServiceMock) GetStreamMetadata(streamName string) (*StreamMetadata, error)
func (*StreamMetadataServiceMock) ListStreams ¶
func (m *StreamMetadataServiceMock) ListStreams() ([]string, error)
func (*StreamMetadataServiceMock) ReadStreamMetadata ¶
func (m *StreamMetadataServiceMock) ReadStreamMetadata(streamName string) (*StreamMetadata, error)
func (*StreamMetadataServiceMock) WriteStreamMetadata ¶
func (m *StreamMetadataServiceMock) WriteStreamMetadata(value *StreamMetadata) error
type StreamPublishResult ¶
func (*StreamPublishResult) AddError ¶
func (r *StreamPublishResult) AddError(err error)
func (*StreamPublishResult) AddMessageId ¶
func (r *StreamPublishResult) AddMessageId(id string)
func (*StreamPublishResult) IncrementFailed ¶
func (r *StreamPublishResult) IncrementFailed()
func (*StreamPublishResult) IncrementPublished ¶
func (r *StreamPublishResult) IncrementPublished()
Click to show internal directories.
Click to hide internal directories.