gochannel

package
v1.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 19, 2020 License: MIT Imports: 8 Imported by: 69

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// Output channel buffer size.
	OutputChannelBuffer int64

	// If persistent is set to true, when subscriber subscribes to the topic,
	// it will receive all previously produced messages.
	//
	// All messages are persisted to the memory (simple slice),
	// so be aware that with large amount of messages you can go out of the memory.
	Persistent bool

	// When true, Publish will block until subscriber Ack's the message.
	// If there are no subscribers, Publish will not block (also when Persistent is true).
	BlockPublishUntilSubscriberAck bool
}

type FanOut added in v1.1.0

type FanOut struct {
	// contains filtered or unexported fields
}

FanOut is a component that receives messages from the subscriber and passes them to all publishers. In effect, messages are "multiplied".

A typical use case for using FanOut is having one external subscription and multiple workers inside the process.

You need to call AddSubscription method for all topics that you want to listen to. This needs to be done *before* starting the FanOut.

FanOut exposes the standard Subscriber interface.

func NewFanOut added in v1.1.0

func NewFanOut(
	subscriber message.Subscriber,
	logger watermill.LoggerAdapter,
) (*FanOut, error)

NewFanOut creates a new FanOut.

func (*FanOut) AddSubscription added in v1.1.0

func (f *FanOut) AddSubscription(topic string)

AddSubscription add an internal subscription for the given topic. You need to call this method with all topics that you want to listen to, before the FanOut is started. AddSubscription is idempotent.

func (*FanOut) Close added in v1.1.0

func (f *FanOut) Close() error

func (*FanOut) Run added in v1.1.0

func (f *FanOut) Run(ctx context.Context) error

Run runs the FanOut.

func (*FanOut) Running added in v1.1.0

func (f *FanOut) Running() chan struct{}

Running is closed when FanOut is running.

func (*FanOut) Subscribe added in v1.1.0

func (f *FanOut) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)

type GoChannel

type GoChannel struct {
	// contains filtered or unexported fields
}

GoChannel is the simplest Pub/Sub implementation. It is based on Golang's channels which are sent within the process.

GoChannel has no global state, that means that you need to use the same instance for Publishing and Subscribing!

When GoChannel is persistent, messages order is not guaranteed.

func NewGoChannel

func NewGoChannel(config Config, logger watermill.LoggerAdapter) *GoChannel

NewGoChannel creates new GoChannel Pub/Sub.

This GoChannel is not persistent. That means if you send a message to a topic to which no subscriber is subscribed, that message will be discarded.

func (*GoChannel) Close

func (g *GoChannel) Close() error

func (*GoChannel) Publish

func (g *GoChannel) Publish(topic string, messages ...*message.Message) error

Publish in GoChannel is NOT blocking until all consumers consume. Messages will be send in background.

Messages may be persisted or not, depending of persistent attribute.

func (*GoChannel) Subscribe

func (g *GoChannel) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)

Subscribe returns channel to which all published messages are sent. Messages are not persisted. If there are no subscribers and message is produced it will be gone.

There are no consumer groups support etc. Every consumer will receive every produced message.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL