pubsub

package
v0.0.0-...-ef901e4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 7, 2022 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrClosed = fmt.Errorf("Redis PSUBSCRIBE stream closed due to storage shutdown: %w", context.Canceled)

ErrClosed represents dispatcher has been closed.

Functions

This section is empty.

Types

type AwaitCancelFunc

type AwaitCancelFunc func(err error)

AwaitCancelFunc is function to cancel awaiter.

type DispatcherParams

type DispatcherParams struct {
	ReconcileInterval        time.Duration
	ReconcileRetryInterval   time.Duration
	ReconcileMinimumInterval time.Duration
}

DispatcherParams tunes Dispatcher

type RedisChannelID

type RedisChannelID string

RedisChannelID represents channel ID of Redis Pub/Sub

type RedisPubSubAwaiter

type RedisPubSubAwaiter interface {
	// Chan returns channel that will be closed when new message received or error occurred (fulfilled).
	Chan() chan interface{}
	// After Chan() has been closed, can obtain error object if error occurred.
	Err() error
}

RedisPubSubAwaiter is Promise-like object repsresents Pub/Sub message await.

type RedisPubSubDispatcher

type RedisPubSubDispatcher interface {
	Await(ctx context.Context, channel RedisChannelID) (RedisPubSubAwaiter, AwaitCancelFunc)
	Shutdown(ctx context.Context)
}

RedisPubSubDispatcher subscribe Redis PubSub with PSUBSCRIBE (wildcard subscription), then broadcast message to redisPubsubAwaiter. Because go-redis open/close underlying TCP connection for each subscription, it cause massive TCP CLOSE_WAIT connections if Storage.FetchMessage make SUBSCRIBE for each call.

func NewDispatcher

func NewDispatcher(ctx context.Context, deps deps.StorageDeps, params DispatcherParams, psubscribe RedisSubscribeRawFunc, pattern RedisChannelID) RedisPubSubDispatcher

NewDispatcher creates instance

type RedisPubSubPromise

type RedisPubSubPromise interface {
	Resolve()
	Reject(err error)

	Chan() chan interface{}
	Err() error
}

RedisPubSubPromise supports resolve, reject operations in addition to RedisPubSubAwaiter

func NewPromise

func NewPromise() RedisPubSubPromise

NewPromise creates new RedisPubSubPromise

type RedisRawPubSub

type RedisRawPubSub interface {
	Receive(context.Context) (interface{}, error)
	Ping(context.Context, ...string) error
	ChannelSize(int) <-chan *redis.Message
	Close() error
}

RedisRawPubSub is subset of *redis.PubSub

type RedisSubscribeRawFunc

type RedisSubscribeRawFunc func(ctx context.Context, channel RedisChannelID) RedisRawPubSub

RedisSubscribeRawFunc represents (P)SUBSCRIBE command implementation.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL