topic

package
v0.0.0-...-f80b70b
Warning

This package is not in the latest version of its module.

Published: Jun 28, 2022 License: MIT Imports: 14 Imported by: 0

Documentation

Overview

The publish logic is adapted from the Google Cloud Pub/Sub Go client (topic.go) and is distributed here under the MIT license. Source: https://github.com/googleapis/google-cloud-go/blob/master/pubsub/topic.go

Constants

const (
	// MaxPublishRequestCount is the maximum number of messages that can be in
	// a single publish request, as defined by the PubSub service.
	MaxPublishRequestCount = 1000

	// MaxPublishRequestBytes is the maximum size of a single publish request
	// in bytes, as defined by the PubSub service.
	MaxPublishRequestBytes = 1e7 // 10 MB

	// Topic-name prefixes used to tell this client's topics apart from
	// those of other clients that share the same MQ.
	AckTopicPrefix = "ack_"
	PulsePrefix    = "p_"
)

Variables

var DefaultPublishSettings = Settings{
	DelayThreshold: 10 * time.Millisecond,
	CountThreshold: 100,
	ByteThreshold:  1e6,
	NumGoroutines:  100 * runtime.GOMAXPROCS(0),
	Timeout:        60 * time.Second,

	BufferedByteLimit: 10 * MaxPublishRequestBytes,
}

DefaultPublishSettings holds the default values for topics' Settings.

Functions

This section is empty.

Types

type BundleTopic

type BundleTopic struct {
	*pubsub.PubSub
	visitor.Middleware

	// Settings for publishing messages. All changes must be made before the
	// first call to Publish. The default is DefaultPublishSettings.
	// Settings cannot be changed dynamically once publishing has started.
	Settings
	// contains filtered or unexported fields
}

func NewTopic

func NewTopic(driverMetadata protocol.Metadata, options ...Option) (*BundleTopic, error)

NewTopic creates a topic and initializes it with the given connection options.

func (*BundleTopic) Publish

func (t *BundleTopic) Publish(ctx context.Context, msg *protocol.Message) *aresult.Result

Publish publishes msg to the topic asynchronously. Messages are batched and sent according to the topic's Settings. Publish never blocks.

Publish returns a non-nil aresult.Result which will be ready when the message has been sent (or has failed to be sent) to the server.

Publish creates goroutines for batching and sending messages. These goroutines need to be stopped by calling t.Stop(). Once stopped, future calls to Publish will immediately return an aresult.Result with an error.

Advice: do not resume a paused ordering key too quickly. Wait at least one Bundler flush interval (10 milliseconds with the default DelayThreshold) so that every message already bundled under that key returns an error while the scheduler is paused.

Warning: do not reuse the incoming message pointer after calling Publish. Once the message has been successfully added to the scheduler, it is set to nil so that it can be garbage collected.

Warning: when using the ordering feature, it is recommended to limit the rate to 100 QPS, or to publish synchronously.

func (*BundleTopic) ResumePublish

func (t *BundleTopic) ResumePublish(orderingKey string)

ResumePublish resumes accepting messages for the provided ordering key. Publishing using an ordering key might be paused if an error is encountered while publishing, to prevent messages from being published out of order.

func (*BundleTopic) Stop

func (t *BundleTopic) Stop()

Stop sends all remaining published messages and stops the goroutines created for handling publishing. It returns once all outstanding messages have been sent or have failed to be sent.

type Option

type Option func(*BundleTopic) error

func WithMiddlewares

func WithMiddlewares(middlewares visitor.Middleware) Option

func WithOrdered

func WithOrdered() Option

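WithOrdered appears to turn on message ordering (see Settings.EnableMessageOrdering). The doc comment in the source reads "WithRequiredACK would turn on the ack function.", which seems to have been copied from another option.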

type Settings

type Settings struct {
	// EnableMessageOrdering enables delivery of ordered keys.
	EnableMessageOrdering bool

	// Publish a non-empty batch after this delay has passed.
	DelayThreshold time.Duration

	// Publish a batch when it has this many messages. The maximum is
	// MaxPublishRequestCount.
	CountThreshold int

	// Publish a batch when its size in bytes reaches this value.
	ByteThreshold int

	// The number of goroutines used in each of the data structures that are
	// involved along the Publish path. Adjusting this value adjusts
	// concurrency along the publish path.
	//
	// Defaults to a multiple of GOMAXPROCS.
	NumGoroutines int

	// The maximum time that the client will attempt to publish a bundle of messages.
	Timeout time.Duration

	// The maximum number of bytes that the Bundler will keep in memory before
	// returning ErrOverflow.
	//
	// Defaults to DefaultPublishSettings.BufferedByteLimit.
	BufferedByteLimit int
}

Settings control the bundling of published messages.
