broker

v1.5.0
Published: May 1, 2025 License: MIT Imports: 4 Imported by: 0

README

broker

A Go broker interface; you can use Kafka, Redis, Pulsar, etc.

pulsar in docker

https://pulsar.apache.org/docs/2.11.x/getting-started-docker/

# Remove any existing pulsar-server container.
docker rm -f `docker ps -a | grep pulsar-server | awk '{print $1}'`
# Start a standalone Pulsar broker: 6650 is the binary protocol port, 8080 the HTTP port.
docker run -idt \
--name pulsar-server \
-p 6650:6650 \
-p 8080:8080 \
--mount source=pulsardata,target=/pulsar/data \
--mount source=pulsarconf,target=/pulsar/conf \
apachepulsar/pulsar:2.9.5 \
bin/pulsar standalone

pulsar-go

https://pulsar.apache.org/docs/zh-CN/client-libraries-go/

usage

For specific usage, refer to the gpulsar and gredis tests.
Kafka consumer groups require Version to be >= V0_10_2_0.
If your Kafka version is lower than V0_10_2_0, use go-god/broker v1.1.2.

Documentation

Constants

This section is empty.

Variables

var DummyLogger = LoggerFunc(func(string, ...interface{}) {})

DummyLogger is a dummy logger that writes nothing.

Functions

func ParseMessage

func ParseMessage(msg interface{}) ([]byte, error)

ParseMessage parses msg into a byte slice.
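The page shows only the signature of ParseMessage, not its conversion rules. The sketch below is one plausible way to turn an interface{} message into bytes, under the assumption that byte slices and strings pass through unchanged and everything else is JSON-encoded. The name toBytes and these rules are hypothetical and are not taken from the package.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// toBytes is a hypothetical stand-in for a ParseMessage-style helper.
// Assumed rules: []byte and string are passed through, nil is rejected,
// and any other value is JSON-encoded.
func toBytes(msg interface{}) ([]byte, error) {
	switch v := msg.(type) {
	case nil:
		return nil, errors.New("toBytes: nil message")
	case []byte:
		return v, nil
	case string:
		return []byte(v), nil
	default:
		return json.Marshal(v)
	}
}

func main() {
	for _, m := range []interface{}{"hello", []byte("raw"), map[string]int{"n": 1}} {
		b, err := toBytes(m)
		fmt.Printf("%s %v\n", b, err)
	}
}
```

Check the package source before relying on any particular encoding for non-byte messages.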

func Recovery

func Recovery(logger Logger)

Recovery catches Go runtime panics.

Types

type Broker

type Broker interface {
	// Publish publishes msg to topic.
	Publish(ctx context.Context, topic string, msg interface{}, opts ...PubOption) error

	// Subscribe subscribes to messages from topic and channel.
	Subscribe(ctx context.Context, topic string, channel string, subHandler SubHandler, opts ...SubOption) error

	// Shutdown gracefully shuts down the broker.
	Shutdown(ctx context.Context) error
}

Broker is the broker interface.

type Logger

type Logger interface {
	Printf(string, ...interface{})
}

Logger is the logger interface.

type LoggerFunc

type LoggerFunc func(string, ...interface{})

LoggerFunc is a bridge between Logger and any third party logger.

func (LoggerFunc) Printf

func (f LoggerFunc) Printf(msg string, args ...interface{})

Printf implements the Logger interface.

type Option

type Option func(o *Options)

Option is a functional option for Options.

func WithAuthToken

func WithAuthToken(token string) Option

WithAuthToken sets the broker auth token, e.g. for a Pulsar broker.

func WithBrokerAddress

func WithBrokerAddress(address ...string) Option

WithBrokerAddress sets the broker addresses.

func WithBrokerPrefix

func WithBrokerPrefix(prefix string) Option

WithBrokerPrefix sets the broker prefix.

func WithConnectionTimeout

func WithConnectionTimeout(t time.Duration) Option

WithConnectionTimeout sets the broker connection timeout.

func WithConsumerAutoCommitInterval added in v1.3.8

func WithConsumerAutoCommitInterval(interval time.Duration) Option

WithConsumerAutoCommitInterval sets the consumer auto commit interval.

func WithGracefulWait

func WithGracefulWait(t time.Duration) Option

WithGracefulWait sets the graceful exit time for subscribers.

func WithListenerName

func WithListenerName(name string) Option

WithListenerName sets the broker listener name.

func WithLogger

func WithLogger(logger Logger) Option

WithLogger sets the broker logger.

func WithMaxConnectionsPerBroker

func WithMaxConnectionsPerBroker(num int) Option

WithMaxConnectionsPerBroker sets the maximum number of connections per broker.

func WithNoDataWaitSec

func WithNoDataWaitSec(sec int) Option

WithNoDataWaitSec sets the number of seconds to wait when there is no data.

func WithOperationTimeout

func WithOperationTimeout(t time.Duration) Option

WithOperationTimeout sets the broker operation timeout.

func WithPassword

func WithPassword(pwd string) Option

WithPassword sets the broker password.

func WithRedisConf

func WithRedisConf(conf *RedisConf) Option

WithRedisConf sets the Redis config.

func WithUser

func WithUser(user string) Option

WithUser sets the broker user.

type Options

type Options struct {
	Address  []string // client connection address list
	Prefix   string   // client mq prefix
	User     string   // user
	Password string   // password

	// ========pulsar mq===============
	// ListenerName configures the network model used by VPC users to connect to the Pulsar broker
	ListenerName string
	// AuthToken auth token
	AuthToken string
	// OperationTimeout operation timeout
	OperationTimeout time.Duration
	// ConnectionTimeout timeout for the establishment of a TCP connection (default: 10 seconds)
	ConnectionTimeout time.Duration

	// MaxConnectionsPerBroker is the maximum number of connections to a single broker
	// that will be kept in the pool (default: 1 connection).
	// This parameter applies to Pulsar connections per broker.
	MaxConnectionsPerBroker int

	// =======redis mq================
	RedisConf *RedisConf

	// GracefulWait is the graceful exit time
	GracefulWait time.Duration

	// NoDataWaitSec is the number of seconds to wait when there is no data
	NoDataWaitSec int

	// ConsumerAutoCommitInterval consumer auto commit interval (default: 1s)
	ConsumerAutoCommitInterval time.Duration

	// Logger logger
	Logger Logger
}

Options holds the broker options.

type PubOption

type PubOption func(p *PublishOptions)

PubOption is a functional option for PublishOptions.

func WithDisableBatching

func WithDisableBatching() PubOption

WithDisableBatching disables batch publishing.

func WithPublishDelay

func WithPublishDelay(t time.Duration) PubOption

WithPublishDelay sets the publish delay time.

func WithPublishName

func WithPublishName(name string) PubOption

WithPublishName sets the publish name (the Pulsar producer name, or the Kafka message key).

func WithSendTimeout

func WithSendTimeout(t time.Duration) PubOption

WithSendTimeout sets the send timeout for published messages.

type PublishOptions

type PublishOptions struct {
	// PublishDelay specifies the time period within which the messages sent will be batched (default: 10ms)
	// if batching is enabled. If set to a non-zero value, messages will be queued until this time
	// interval or until
	PublishDelay time.Duration

	// Name specifies a name for the producer.
	// For Pulsar, if it is not assigned, the system will generate
	// a globally unique name which can be accessed with
	// Producer.ProducerName().
	//
	// For Kafka, Name is used as the message key, that is,
	// the partitioning key for this message. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Name string

	// DisableBatching controls whether automatic batching of messages is enabled for the producer.
	// Batching is enabled by default.
	// When batching is enabled, multiple calls to Producer.sendAsync can result in a single batch to be sent to the
	// broker, leading to better throughput, especially when publishing small messages. If compression is enabled,
	// messages will be compressed at the batch level, leading to a much better compression ratio
	// for similar headers or contents.
	// When enabled, the default batch delay is 1 ms and the default batch size is 1000 messages.
	// Setting `DisableBatching: true` makes the producer send messages individually.
	DisableBatching bool

	// SendTimeout specifies the timeout for a message that has not been acknowledged by the server since sent.
	// Send and SendAsync return an error after the timeout.
	// Default is 30 seconds; use a negative value such as -1 to disable.
	SendTimeout time.Duration
}

PublishOptions holds the options for publishing a message.

type RedisConf

type RedisConf struct {
	// host:port address.
	Address string

	// Optional password. Must match the password specified in the
	// requirepass server configuration option.
	Password string

	// Database to be selected after connecting to the server.
	DB int

	// Maximum number of retries before giving up.
	// Default is to not retry failed commands.
	MaxRetries int

	// Dial timeout for establishing new connections.
	// Default is 5 seconds.
	DialTimeout time.Duration

	// Timeout for socket reads. If reached, commands will fail
	// with a timeout instead of blocking. Use value -1 for no timeout and 0 for default.
	// Default is 3 seconds.
	ReadTimeout time.Duration

	// Timeout for socket writes. If reached, commands will fail
	// with a timeout instead of blocking.
	// Default is ReadTimeout.
	WriteTimeout time.Duration

	// Maximum number of socket connections.
	// Default is 10 connections per every CPU as reported by runtime.NumCPU.
	PoolSize int

	// Amount of time client waits for connection if all connections
	// are busy before returning an error.
	// Default is ReadTimeout + 1 second.
	PoolTimeout time.Duration

	// Minimum number of idle connections which is useful when establishing
	// new connection is slow.
	MinIdleConns int

	// ConnMaxIdleTime is the maximum amount of time a connection may be idle.
	// Should be less than server's timeout.
	//
	// Expired connections may be closed lazily before reuse.
	// If d <= 0, connections are not closed due to a connection's idle time.
	//
	// Default is 30 minutes. -1 disables idle timeout check.
	ConnMaxIdleTime time.Duration

	// ConnMaxLifetime is the maximum amount of time a connection may be reused.
	//
	// Expired connections may be closed lazily before reuse.
	// If <= 0, connections are not closed due to a connection's age.
	//
	// Default is 1800s
	ConnMaxLifetime time.Duration
}

RedisConf is the Redis client config.

type SubHandler

type SubHandler func(ctx context.Context, value []byte) error

SubHandler is the handler function called with each subscribed message.

type SubOption

type SubOption func(s *SubscribeOptions)

SubOption is a functional option for SubscribeOptions.

func WithCommitOffsetBlock added in v1.3.7

func WithCommitOffsetBlock() SubOption

WithCommitOffsetBlock makes the consumer commit the offset with a blocking call when a message is consumed.

func WithSubBufferSize added in v1.4.0

func WithSubBufferSize(size int) SubOption

WithSubBufferSize sets the consume message buffer size.

func WithSubConsumeMsgGoroutines added in v1.4.0

func WithSubConsumeMsgGoroutines(size int) SubOption

WithSubConsumeMsgGoroutines sets the number of goroutines that consume messages from the buffer.

func WithSubEnableBuffer added in v1.4.0

func WithSubEnableBuffer() SubOption

WithSubEnableBuffer enables the consume message buffer.

func WithSubInterval

func WithSubInterval(t time.Duration) SubOption

WithSubInterval sets the subscribe interval.

func WithSubKeyHandlers added in v1.3.6

func WithSubKeyHandlers(keyHandlers map[string]SubHandler) SubOption

WithSubKeyHandlers sets the map from message key to SubHandler.

func WithSubMessageChannel added in v1.4.0

func WithSubMessageChannel() SubOption

WithSubMessageChannel enables receiving messages from the message channel.

func WithSubMessageChannelSize added in v1.4.0

func WithSubMessageChannelSize(size int) SubOption

WithSubMessageChannelSize sets the message channel size.

func WithSubName

func WithSubName(name string) SubOption

WithSubName sets the subscription name.

func WithSubOffset

func WithSubOffset(offset int64) SubOption

WithSubOffset sets the subscription offset.

func WithSubPullMsgGoroutines added in v1.4.0

func WithSubPullMsgGoroutines(size int) SubOption

WithSubPullMsgGoroutines sets the number of goroutines for pulling messages (default: 1).

func WithSubRetryEnable

func WithSubRetryEnable() SubOption

WithSubRetryEnable enables subscription retry.

func WithSubTopics added in v1.4.0

func WithSubTopics(topics []string) SubOption

WithSubTopics sets the topics to subscribe to.

func WithSubType

func WithSubType(t SubscriptionType) SubOption

WithSubType sets the subscription type.

type SubscribeOptions

type SubscribeOptions struct {
	// Name specifies the consumer name, like the Kafka consumer group ID
	Name string

	// KeyHandlers is the map of Kafka consumer message key handlers.
	// For Redis subscriptions, you can specify different message subscriber functions to handle messages.
	KeyHandlers map[string]SubHandler

	// consume msg from multiple topics
	Topics []string

	// PullMsgGoroutines is the number of goroutines that pull messages for a subscription (default: 1)
	PullMsgGoroutines int

	// For Kafka: whether to put a message in the buffer pool first when consuming it
	EnableBuffer         bool
	BufferSize           int
	ConsumeMsgGoroutines int

	// For Pulsar: receive messages from a channel.
	// The channel returns a struct which contains message and the consumer from where
	// the message was received. It's not necessary here since we have 1 single consumer, but the channel could be
	// shared across multiple consumers as well
	MessageChannel     bool // default:false
	MessageChannelSize int  // default:100

	Offset int64

	// Commit the offset to the backend for kafka
	// Note: calling Commit performs a blocking synchronous operation.
	CommitOffsetBlock bool

	// SubInterval is the subscribe interval (default: 0)
	SubInterval time.Duration

	// ===========pulsar mq=======
	// SubType specifies the subscription type to be used when subscribing to a topic.
	// Default is `Shared` (1:N).
	//
	// Exclusive: there can be only 1 consumer on the same topic with the same subscription name.
	//
	// Shared (1:N): multiple consumers will be able to use the same subscription name
	// and the messages will be dispatched according to
	// a round-robin rotation between the connected consumers.
	//
	// Failover: multiple consumers will be able to use the same subscription name
	// but only 1 consumer will receive the messages.
	// If that consumer disconnects, one of the other connected consumers will start receiving messages.
	SubType SubscriptionType

	// ReceiverQueueSize sets the size of the consumer receive queue.
	// The consumer receive queue controls how many messages can be accumulated by the `Consumer` before the
	// application calls `Consumer.receive()`. Using a higher value could potentially increase the consumer
	// throughput at the expense of bigger memory utilization.
	// Default value is `1000` messages and should be good for most use cases.
	ReceiverQueueSize int

	// RetryEnable enables retry for Pulsar subscriptions
	RetryEnable bool
}

SubscribeOptions holds the options for subscribing to messages.

type SubscriptionType

type SubscriptionType int

SubscriptionType is the type of subscription supported by Pulsar.

const (
	// Exclusive: there can be only 1 consumer on the same topic with the same subscription name
	Exclusive SubscriptionType = iota

	// Shared subscription mode: multiple consumers will be able to use the same subscription name
	// and the messages will be dispatched according to
	// a round-robin rotation between the connected consumers
	Shared

	// Failover subscription mode: multiple consumers will be able to use the same subscription name
	// but only 1 consumer will receive the messages.
	// If that consumer disconnects, one of the other connected consumers will start receiving messages.
	Failover

	// KeyShared subscription mode: multiple consumers will be able to use the same
	// subscription and all messages with the same key will be dispatched to only one consumer
	KeyShared
)
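Since the constants are declared with iota, their numeric values follow the declaration order. The sketch below repeats the declaration and adds a small naming helper; subTypeName is hypothetical and is not part of the package.

```go
package main

import "fmt"

// Local copy of the declaration shown above.
type SubscriptionType int

const (
	Exclusive SubscriptionType = iota // 0
	Shared                            // 1
	Failover                          // 2
	KeyShared                         // 3
)

// subTypeName is a hypothetical helper that returns a readable name.
func subTypeName(t SubscriptionType) string {
	switch t {
	case Exclusive:
		return "Exclusive"
	case Shared:
		return "Shared"
	case Failover:
		return "Failover"
	case KeyShared:
		return "KeyShared"
	default:
		return fmt.Sprintf("SubscriptionType(%d)", int(t))
	}
}

func main() {
	for _, t := range []SubscriptionType{Exclusive, Shared, Failover, KeyShared} {
		fmt.Printf("%d %s\n", int(t), subTypeName(t))
	}
}
```

Note that the zero value of SubscriptionType is Exclusive. How the documented `Shared` default of SubscribeOptions.SubType is applied is not shown on this page, so set the type explicitly with WithSubType when it matters.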

Directories

Path Synopsis
Package backoff provides backoff functionality
