topic

package
v0.0.0-...-53bb68d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 30, 2020 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// MaxPublishRequestCount is the maximum number of messages that can be in
	// a single publish request, as defined by the PubSub service.
	MaxPublishRequestCount = 1000

	// MaxPublishRequestBytes is the maximum size of a single publish request
	// in bytes, as defined by the PubSub service.
	MaxPublishRequestBytes = 1e7 // 10m

	// prefix use to specific to other client use the same MQ.
	AckTopicPrefix = "ack_"
	WhisperPrefix  = "w_"
)

Variables

View Source
var DefaultPublishSettings = PublishSettings{
	DelayThreshold: 10 * time.Millisecond,
	CountThreshold: 100,
	ByteThreshold:  1e6,
	NumGoroutines:  100 * runtime.GOMAXPROCS(0),
	Timeout:        60 * time.Second,
	AckMapTicker:   5 * time.Second,
	MaxRetryTimes:  3,

	BufferedByteLimit: 10 * MaxPublishRequestBytes,

	RetryParams: &retry.DefaultRetryParams,

	DeadLetterPolicy: nil,
}

DefaultPublishSettings holds the default values for topics' PublishSettings.

Functions

This section is empty.

Types

type Option

type Option func(*Topic) error

func WithCount

func WithCount() Option

func WithOrdered

func WithOrdered() Option

WithRequiredACK would turn on the ack function.

func WithRequiredACK

func WithRequiredACK() Option

WithRequiredACK would turn on the ack function.

type PublishResult

type PublishResult struct {
	// contains filtered or unexported fields
}

PublishResult help to know error because of sending goroutine is another goroutine.

func (*PublishResult) Get

func (r *PublishResult) Get(ctx context.Context) (serverID string, err error)

Get returns the server-generated message ID and/or error result of a Publish call. Get blocks until the Publish call completes or the context is done.

func (*PublishResult) Ready

func (r *PublishResult) Ready() <-chan struct{}

Ready returns a channel that is closed when the result is ready. When the Ready channel is closed, Get is guaranteed not to block.

type PublishSettings

type PublishSettings struct {
	// EnableMessageOrdering enables delivery of ordered keys.
	EnableMessageOrdering bool
	EnableAck             bool

	// Publish a non-empty batch after this delay has passed.
	DelayThreshold time.Duration

	// Publish a batch when it has this many messages. The maximum is
	// MaxPublishRequestCount.
	CountThreshold int

	// Publish a batch when its size in bytes reaches this value.
	ByteThreshold int

	// The number of goroutines used in each of the data structures that are
	// involved along the the Publish path. Adjusting this value adjusts
	// concurrency along the publish path.
	//
	// Defaults to a multiple of GOMAXPROCS.
	NumGoroutines int

	// The maximum time that the client will attempt to publish a bundle of messages.
	Timeout time.Duration

	AckMapTicker  time.Duration
	MaxRetryTimes int
	// The maximum number of bytes that the Bundler will keep in memory before
	// returning ErrOverflow.
	//
	// Defaults to DefaultPublishSettings.BufferedByteLimit.
	BufferedByteLimit int

	// if nil, no retry if no ack.
	RetryParams *retry.Params
	// if nil, drop deadletter.
	DeadLetterPolicy *deadpolicy.DeadLetterPolicy
}

PublishSettings control the bundling of published messages.

type Topic

type Topic struct {

	// Settings for publishing messages. All changes must be made before the
	// first call to Publish. The default is DefaultPublishSettings.
	// it means could not dynamically change and hot start.
	PublishSettings
	// contains filtered or unexported fields
}

func NewTopic

func NewTopic(topicName string, driverMetadata mq.Metadata, options ...Option) (*Topic, error)

new a topic and init it with the connection options

func (*Topic) Publish

func (t *Topic) Publish(ctx context.Context, msg *message.Message) *PublishResult

Publish publishes msg to the topic asynchronously. Messages are batched and sent according to the topic's PublishSettings. Publish never blocks.

Publish returns a non-nil PublishResult which will be ready when the message has been sent (or has failed to be sent) to the server.

Publish creates goroutines for batching and sending messages. These goroutines need to be stopped by calling t.Stop(). Once stopped, future calls to Publish will immediately return a PublishResult with an error. advice: don't resume so quickly like less than 10* millisecond of Bundler ticker flush setting. that's ensure all the message in bundle with the key return error when scheduler paused.

Warning: do not use incoming message pointer again that if message had been successfully add in the scheduler, message would be equal nil to gc.

Warning: when use ordering feature, recommend to limit the QPS to 100, or use synchronous

func (*Topic) ResumePublish

func (t *Topic) ResumePublish(orderingKey string)

ResumePublish resumes accepting messages for the provided ordering key. Publishing using an ordering key might be paused if an error is encountered while publishing, to prevent messages from being published out of order.

func (*Topic) Stop

func (t *Topic) Stop()

Stop sends all remaining published messages and stop goroutines created for handling publishing. Returns once all outstanding messages have been sent or have failed to be sent.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL