pulsar

package
v0.1.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 10, 2023 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewBroker

func NewBroker(config Config) (broker.IBroker, error)

Types

type Broker

// Broker is the broker type exposed by this pulsar package. It embeds
// *broker.Broker and additionally declares Close, Send, SendDelay and Worker
// methods. Its remaining fields are unexported and not shown here.
type Broker struct {
	*broker.Broker
	// contains filtered or unexported fields
}

func (*Broker) Close

func (b *Broker) Close() error

func (*Broker) Send

func (b *Broker) Send(ctx context.Context, name string, value interface{}) error

func (*Broker) SendDelay

func (b *Broker) SendDelay(ctx context.Context, name string, value interface{}, delay time.Duration) error

func (*Broker) Worker

func (b *Broker) Worker() error

type Config

// Config holds the settings passed to NewBroker. It combines client-level
// connection options, producer/consumer topic options, retry/dead-letter
// options and worker options.
type Config struct {
	// URL is the service URL of the Pulsar service.
	// This parameter is required.
	URL string
	// ConnectionTimeout is the timeout for establishing a TCP connection
	// (default: 5 seconds).
	ConnectionTimeout time.Duration
	// OperationTimeout is the operation timeout (default: 30 seconds).
	// Producer-create, subscribe and unsubscribe operations are retried until
	// this interval elapses, after which the operation is marked as failed.
	OperationTimeout time.Duration
	// MaxConnectionsPerBroker is the maximum number of connections to a single
	// broker that will be kept in the pool (default: 1 connection).
	MaxConnectionsPerBroker int
	// AuthToken is the auth token supplied to the authentication provider.
	AuthToken string

	// Topic is the topic the producer publishes on and the topic the consumer
	// subscribes to.
	// Either a topic, a list of topics or a topics pattern is required when
	// subscribing.
	Topic string
	// SubscriptionName is the subscription name for the consumer.
	// This argument is required when subscribing.
	SubscriptionName string
	// ReceiverQueueSize sets the size of the consumer receive queue.
	// The consumer receive queue controls how many messages can be accumulated
	// by the `Consumer` before reaching the application handler. Using a higher
	// value could potentially increase the consumer throughput at the expense
	// of bigger memory utilization.
	// The default value is `1000` messages and should be good for most use cases.
	ReceiverQueueSize int
	// NackRedeliveryDelay is the delay after which messages that failed to be
	// processed are redelivered. Default is 1 minute.
	NackRedeliveryDelay time.Duration
	// RetryEnable turns on automatic retry: messages are sent to the topics of
	// a DLQPolicy filled with default values.
	// Defaults: RetryTopic: SubscriptionName+"-RETRY",
	// DlqTopic: SubscriptionName+"-DLQ", MaxReconsumeTimes = 16.
	// NOTE(review): these names presumably correspond to the RetryLetterTopic,
	// DeadLetterTopic and MaxDeliveries fields of DLQPolicy; confirm.
	RetryEnable bool
	// DLQ supplies a custom retry topic, dead letter topic and maximum delivery
	// count instead of the defaults described on RetryEnable.
	DLQ *DLQPolicy
	// Concurrency is the number of concurrent worker processes
	// (default: runtime.NumCPU()*2).
	Concurrency int
	// Codec is a custom codec.
	Codec codec.Codec
	// Logger is the logging object used to generate lines of output to an
	// io.Writer.
	Logger log.ILogger
}

type DLQPolicy

// DLQPolicy is the configuration for the Dead Letter Queue consumer policy.
type DLQPolicy struct {
	// MaxDeliveries is the maximum number of times that a message will be
	// delivered before being sent to the dead letter queue.
	MaxDeliveries uint32
	// DeadLetterTopic is the name of the topic where the failing messages will
	// be sent.
	DeadLetterTopic string
	// RetryLetterTopic is the name of the topic where the retry messages will
	// be sent.
	RetryLetterTopic string
}

DLQPolicy is the configuration for the Dead Letter Queue consumer policy.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL