channels

package
v0.0.0-...-b400eb5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 20, 2021 License: AGPL-3.0 Imports: 9 Imported by: 42

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func MockChannel

func MockChannel() (Publisher, ConsumerChannel)

func MockFinish

func MockFinish(channel ConsumerChannel, count uint) error

Types

type Consumer

type Consumer interface {
	Consume(Delivery)
}

type ConsumerChannel

type ConsumerChannel interface {
	AddConsumer(Consumer) bool
	StartConsuming() bool
	StopConsuming() bool
	ReturnAllUnacked() int
	PurgeRejected() int
	Publisher() Publisher
}

func ConsumerFromURI

func ConsumerFromURI(uri string, redisClient *redis.Client) (ConsumerChannel, error)

func NewQueueConsumerChannel

func NewQueueConsumerChannel(channelName string, redisClient *redis.Client) ConsumerChannel

NewQueueConsumerChannel returns a ConsumerChannel that uses Redis queues for communication. Each message delivered through this ConsumerChannel will be delivered to only one consumer, assuming the consumer Acks the message.

func NewTopicConsumerChannel

func NewTopicConsumerChannel(channelName string, redisClient *redis.Client) ConsumerChannel

NewTopicConsumerChannel returns a ConsumerChannel that uses Redis PubSub for communication. Each message delivered through this consumer channel will be delivered once to each consumer. Note, however, that network issues that prevent delivery of a message may lead to messages going completely undelivered. Consumers may Ack or Reject the messages, but this is a no-op.

type DelayRelay

type DelayRelay struct {
	*Relay
	// contains filtered or unexported fields
}

func NewDelayRelay

func NewDelayRelay(sourcePublisher Publisher, channel ConsumerChannel, publisher Publisher, sentinel string) DelayRelay

NewDelayRelay creates a DelayRelay that consumes from `channel` and relays messages to `publisher`. It also requires `sourcePublisher`, which must be able to publish messages to `channel`, and `sentinel` which should be a string that would never be a normal message from the consumer channel. Messages will pile up on the DelayRelay until DelayRelay.Flush() is called, at which point any queued messages will flush to `publisher`.

Note: `channel` should be a queue based channel, not a topic based channel. Topic based channels process messages in parallel, and thus won't block until Flush() is called

func (*DelayRelay) Flush

func (relay *DelayRelay) Flush()

type DelayRelayFilter

type DelayRelayFilter struct {
	// contains filtered or unexported fields
}

func (*DelayRelayFilter) Filter

func (filter *DelayRelayFilter) Filter(delivery Delivery) bool

type Delivery

type Delivery interface {
	Payload() string
	Ack() bool
	Reject() bool
	Return() bool
}

type IncludeAll

type IncludeAll struct {
	// contains filtered or unexported fields
}

func (*IncludeAll) Filter

func (filter *IncludeAll) Filter(delivery Delivery) bool

type InvertFilter

type InvertFilter struct {
	Subfilter RelayFilter
}

func (*InvertFilter) Filter

func (filter *InvertFilter) Filter(delivery Delivery) bool

type MockURITranslator

type MockURITranslator struct {
	// contains filtered or unexported fields
}

func (*MockURITranslator) ConsumerFromURI

func (rut *MockURITranslator) ConsumerFromURI(uri string) (ConsumerChannel, error)

func (*MockURITranslator) PublisherFromURI

func (rut *MockURITranslator) PublisherFromURI(uri string) (Publisher, error)

type MultiPublisher

type MultiPublisher []Publisher

func (MultiPublisher) Publish

func (mp MultiPublisher) Publish(payload string) bool

type Publisher

type Publisher interface {
	Publish(payload string) bool
}

func MockPublisher

func MockPublisher() (Publisher, chan Delivery)

func NewRedisQueuePublisher

func NewRedisQueuePublisher(key string, client *redis.Client) Publisher

func NewRedisTopicPublisher

func NewRedisTopicPublisher(key string, client *redis.Client) Publisher

func PublisherFromURI

func PublisherFromURI(uri string, redisClient *redis.Client) (Publisher, error)

type RedisURITranslator

type RedisURITranslator struct {
	// contains filtered or unexported fields
}

func (*RedisURITranslator) ConsumerFromURI

func (rut *RedisURITranslator) ConsumerFromURI(uri string) (ConsumerChannel, error)

func (*RedisURITranslator) PublisherFromURI

func (rut *RedisURITranslator) PublisherFromURI(uri string) (Publisher, error)

type Relay

type Relay struct {
	// contains filtered or unexported fields
}

func NewRelay

func NewRelay(channel ConsumerChannel, publishers []Publisher, filter RelayFilter, concurrency int) Relay

func (*Relay) Start

func (relay *Relay) Start() bool

func (*Relay) Stop

func (relay *Relay) Stop() bool

type RelayConsumer

type RelayConsumer struct {
	// contains filtered or unexported fields
}

func (*RelayConsumer) Consume

func (consumer *RelayConsumer) Consume(delivery Delivery)

type RelayFilter

type RelayFilter interface {
	Filter(Delivery) bool
}

RelayFilter objects provide a predicate function to determine whether a message should be passed to the next stage

type URITranslator

type URITranslator interface {
	ConsumerFromURI(string) (ConsumerChannel, error)
	PublisherFromURI(string) (Publisher, error)
}

func NewMockURITranslator

func NewMockURITranslator(redisClient *redis.Client) URITranslator

func NewRedisURITranslator

func NewRedisURITranslator(redisClient *redis.Client) URITranslator

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL