Documentation
¶
Index ¶
- Constants
- Variables
- func GetTimeUntilMidnight() time.Duration
- type BaseConfig
- type Consumer
- func (c *Consumer) AddConcurrentHandlers(handler MessageHandler, concurrency int32)
- func (c *Consumer) OnFinish(msg *Message)
- func (c *Consumer) OnRequeue(m *Message, delay time.Duration)
- func (c *Consumer) OnTouch(*Message)
- func (c *Consumer) SendMessage(msg *Message) error
- func (c *Consumer) StartDeferredTimeout(msg *Message, delay time.Duration)
- func (c *Consumer) Stop()
- func (c *Consumer) WaitMessage() <-chan struct{}
- type ConsumerHandlerFunc
- type DB
- func (d *DB) AutoMigrate() error
- func (d *DB) Count() (int, error)
- func (d *DB) Create(value *Message) error
- func (d *DB) DeleteCompletedMessagesOlderThan(days int) error
- func (d *DB) DeleteOldMessages(days int) error
- func (d *DB) DriverName() string
- func (d *DB) FetchPendingMessages(id int, msgs *[]Message) error
- type EventBus
- type EventHandler
- type EventMessage
- func (m *EventMessage[T]) Clone() cloner
- func (m *EventMessage[T]) DisableAutoResponse()
- func (m *EventMessage[T]) Finish()
- func (m *EventMessage[T]) HasResponded() bool
- func (m *EventMessage[T]) IsAutoResponseDisabled() bool
- func (m *EventMessage[T]) Requeue(delay time.Duration)
- func (m *EventMessage[T]) Touch()
- type EventMessageDelegate
- type Message
- type MessageDelegate
- type MessageHandler
- type NSQite
- func (n *NSQite) AddConsumer(c *Consumer)
- func (n *NSQite) Close() error
- func (n *NSQite) DelConsumer(topic, channel string)
- func (n *NSQite) Finish(msg *Message, channel string) error
- func (n *NSQite) Publish(topic string, msg *Message) error
- func (n *NSQite) PublishTx(tx SessionFunc, topic string, msg *Message) error
- type Option
- type Producer
- type Publisher
- type SessionFunc
- type Subscriber
- func (s *Subscriber[T]) AddConcurrentHandlers(handler EventHandler[T], concurrency int32)
- func (s *Subscriber[T]) GetChannel() string
- func (s *Subscriber[T]) GetTopic() string
- func (s *Subscriber[T]) OnFinish(_ *EventMessage[T])
- func (s *Subscriber[T]) OnRequeue(m *EventMessage[T], delay time.Duration)
- func (s *Subscriber[T]) OnTouch(*EventMessage[T])
- func (s *Subscriber[T]) SendMessage(ctx context.Context, msg interface{}) error
- func (s *Subscriber[T]) StartDeferredTimeout(msg *EventMessage[T], delay time.Duration)
- func (s *Subscriber[T]) Stop()
- func (s *Subscriber[T]) Wait()
- func (s *Subscriber[T]) WaitMessage()
- type SubscriberHandlerFunc
- type SubscriberInfo
Constants ¶
const ( DriverNameSQLite = "sqlite" DriverNamePostgres = "postgres" DriverNameOther = "other" )
const DefaultMaxMessageRows = 10000
DefaultMaxMessageRows 定义消息表的默认最大行数。当消息表行数超过此值时,将仅保留最近3天的消息。
const DefaultMaxRetentionDays = 7
DefaultMaxRetentionDays 定义消息的默认最大保留天数为7天。超过此天数的消息将被自动清理。
Variables ¶
var (
ErrNoConsumer = errors.New("need consumers")
)
Functions ¶
func GetTimeUntilMidnight ¶ added in v1.0.0
GetTimeUntilMidnight 返回距离下一个凌晨12点的时间间隔
Types ¶
type BaseConfig ¶ added in v1.0.1
type BaseConfig struct {
// contains filtered or unexported fields
}
BaseConfig 基础配置实现
type Consumer ¶
type Consumer struct {
*BaseConfig
// contains filtered or unexported fields
}
Consumer 表示一个消息消费者
func NewConsumer ¶
NewConsumer 创建一个新的Consumer
func (*Consumer) AddConcurrentHandlers ¶ added in v1.0.0
func (c *Consumer) AddConcurrentHandlers(handler MessageHandler, concurrency int32)
AddConcurrentHandlers 添加并发处理程序
func (*Consumer) SendMessage ¶ added in v1.0.0
func (*Consumer) StartDeferredTimeout ¶ added in v1.0.0
StartDeferredTimeout 开始延迟处理消息
func (*Consumer) Stop ¶
func (c *Consumer) Stop()
Stop stops the consumer so that it no longer receives messages: 1. Stop receiving new messages. 2. Clear the deferred retry queue data. 3. Gradually stop the goroutines started by the current consumer.
func (*Consumer) WaitMessage ¶ added in v1.0.0
func (c *Consumer) WaitMessage() <-chan struct{}
WaitMessage 等待所有消息处理完成,方便测试消息处理进度,不建议在生产环境中使用
type ConsumerHandlerFunc ¶ added in v1.0.0
ConsumerHandlerFunc 提供一个默认的 MessageHandler 实现,放入函数即可
func (ConsumerHandlerFunc) HandleMessage ¶ added in v1.0.0
func (h ConsumerHandlerFunc) HandleMessage(msg *Message) error
type DB ¶ added in v1.1.0
func (*DB) AutoMigrate ¶ added in v1.1.0
AutoMigrate initializes the database table: 1. Create the table nsqite_messages if it does not exist. 如果你使用 gorm,可使用 gorm.AutoMigrate(new(nsqite.Message)) 初始化。如果你使用 goddd,可使用以下方式:
if orm.EnabledAutoMigrate { if err := uc.DB.AutoMigrate(new(nsqite.Message)); err != nil { panic(err) } }
func (*DB) DeleteCompletedMessagesOlderThan ¶ added in v1.1.0
func (*DB) DeleteOldMessages ¶ added in v1.1.0
func (*DB) DriverName ¶ added in v1.1.0
type EventBus ¶
type EventBus struct {
// contains filtered or unexported fields
}
func (*EventBus) AddSubscriber ¶ added in v1.0.0
func (s *EventBus) AddSubscriber(c SubscriberInfo)
func (*EventBus) DelConsumer ¶
func (s *EventBus) DelConsumer(c SubscriberInfo)
func (*EventBus) GetConsumer ¶ added in v1.0.2
type EventHandler ¶
type EventHandler[T any] interface { HandleMessage(message *EventMessage[T]) error }
type EventMessage ¶
type EventMessage[T any] struct { ID uint64 Body T Timestamp int64 Attempts uint32 Delegate EventMessageDelegate[T] // contains filtered or unexported fields }
func (*EventMessage[T]) Clone ¶ added in v1.0.0
func (m *EventMessage[T]) Clone() cloner
func (*EventMessage[T]) DisableAutoResponse ¶
func (m *EventMessage[T]) DisableAutoResponse()
DisableAutoResponse 禁用自动完成
func (*EventMessage[T]) HasResponded ¶
func (m *EventMessage[T]) HasResponded() bool
HasResponded 消息是否已得到处理,true 表示已得到处理
func (*EventMessage[T]) IsAutoResponseDisabled ¶
func (m *EventMessage[T]) IsAutoResponseDisabled() bool
IsAutoResponseDisabled 是否禁止自动完成, true 表示禁止
func (*EventMessage[T]) Requeue ¶
func (m *EventMessage[T]) Requeue(delay time.Duration)
Requeue 重新入队
type EventMessageDelegate ¶
type EventMessageDelegate[T any] interface { // OnFinish is called when the Finish() method // is triggered on the Message OnFinish(*EventMessage[T]) // OnRequeue is called when the Requeue() method // is triggered on the Message OnRequeue(m *EventMessage[T], delay time.Duration) // OnTouch is called when the Touch() method // is triggered on the Message // 超时应该由消息订阅者来处理 OnTouch(*EventMessage[T]) }
EventMessageDelegate is an interface of methods that are used as callbacks in EventMessage
type Message ¶
type Message struct {
ID int `gorm:"primaryKey" json:"id"`
Timestamp time.Time `gorm:"column:timestamp;notNull;default:CURRENT_TIMESTAMP;index;comment:创建时间" json:"timestamp"` // 创建时间
Topic string `gorm:"notNull;index;default:''" json:"topic"` // 消息所属的 topic
Body []byte `gorm:"notNull" json:"body"` // 消息内容
// 订阅数和完成数
Consumers uint32 `gorm:"notNull;default:0;index:idx_messages_consumers_responded" json:"consumers"` // 消息订阅数
Responded uint32 `gorm:"notNull;default:0;index:idx_messages_consumers_responded" json:"responded"` // 消息响应完成数
Channels string `gorm:"notNull;default:''" json:"channels"` // 消息订阅通道
RespondedChannels string `gorm:"notNull;default:''" json:"responded_channels"` // 消息响应完成通道
Attempts uint32 `gorm:"notNull;default:0" json:"attempts"` // 消息重试次数
// runtime
Delegate MessageDelegate `gorm:"-" json:"-"`
// contains filtered or unexported fields
}
Message 表示一条消息
func (*Message) DisableAutoResponse ¶
func (m *Message) DisableAutoResponse()
DisableAutoResponse 禁止自动完成
func (*Message) EnableAutoResponse ¶
func (m *Message) EnableAutoResponse()
EnableAutoResponse 允许自动完成
func (*Message) IsAutoResponseDisabled ¶
IsAutoResponseDisabled 是否禁止自动完成
type MessageDelegate ¶
type MessageDelegate interface {
OnFinish(message *Message)
OnRequeue(message *Message, delay time.Duration)
OnTouch(message *Message)
}
MessageDelegate 消息代理接口
type MessageHandler ¶
MessageHandler 定义消息处理函数类型
type NSQite ¶
type NSQite struct {
// contains filtered or unexported fields
}
NSQite 是消息队列的核心结构
func TransactionMQ ¶ added in v1.0.0
func TransactionMQ() *NSQite
func (*NSQite) AddConsumer ¶ added in v1.0.0
func (*NSQite) DelConsumer ¶ added in v1.0.0
type Option ¶ added in v1.0.1
type Option func(*BaseConfig)
Option 统一的配置选项
func WithCheckTimeout ¶
WithCheckTimeout 设置消息检查超时时间
func WithDiscardOnBlocking ¶ added in v1.0.0
WithDiscardOnBlocking 设置当队列已满时是否丢弃消息
func WithMaxAttempts ¶
WithMaxAttempts 设置消息最大重试次数,其次数是最终回调函数执行次数
func WithQueueSize ¶
WithQueueSize 设置消息队列大小。WithQueueSize(0) 表示无缓冲队列。
type Producer ¶
type Producer struct{}
Producer 表示一个消息生产者
type Publisher ¶
type Publisher[T any] struct{}
Publisher 消息发布者
type SessionFunc ¶ added in v1.1.0
type Subscriber ¶
type Subscriber[T any] struct { *BaseConfig // contains filtered or unexported fields }
Subscriber 消息订阅者
func NewSubscriber ¶
func NewSubscriber[T any](topic, channel string, opts ...Option) *Subscriber[T]
NewSubscriber 创建消息订阅者
func (*Subscriber[T]) AddConcurrentHandlers ¶
func (s *Subscriber[T]) AddConcurrentHandlers(handler EventHandler[T], concurrency int32)
AddConcurrentHandlers 添加并发处理程序
func (*Subscriber[T]) GetChannel ¶
func (s *Subscriber[T]) GetChannel() string
GetChannel implements SubscriberInfo.
func (*Subscriber[T]) GetTopic ¶
func (s *Subscriber[T]) GetTopic() string
GetTopic implements SubscriberInfo.
func (*Subscriber[T]) OnFinish ¶
func (s *Subscriber[T]) OnFinish(_ *EventMessage[T])
OnFinish implements EventMessageDelegate.
func (*Subscriber[T]) OnRequeue ¶
func (s *Subscriber[T]) OnRequeue(m *EventMessage[T], delay time.Duration)
OnRequeue implements EventMessageDelegate.
func (*Subscriber[T]) OnTouch ¶
func (s *Subscriber[T]) OnTouch(*EventMessage[T])
OnTouch implements EventMessageDelegate.
func (*Subscriber[T]) SendMessage ¶
func (s *Subscriber[T]) SendMessage(ctx context.Context, msg interface{}) error
SendMessage implements SubscriberInfo.
func (*Subscriber[T]) StartDeferredTimeout ¶
func (s *Subscriber[T]) StartDeferredTimeout(msg *EventMessage[T], delay time.Duration)
StartDeferredTimeout 开始延迟处理消息
func (*Subscriber[T]) WaitMessage ¶
func (s *Subscriber[T]) WaitMessage()
WaitMessage waits for all messages to be processed. It is convenient for testing message-processing progress, but is not recommended for use in production environments. 等待所有消息处理完成,方便测试消息处理进度,不建议在生产环境中使用。
type SubscriberHandlerFunc ¶ added in v1.0.0
type SubscriberHandlerFunc[T any] func(message *EventMessage[T]) error
SubscriberHandlerFunc 是一个适配器,允许使用普通函数作为 EventHandler
func (SubscriberHandlerFunc[T]) HandleMessage ¶ added in v1.0.0
func (s SubscriberHandlerFunc[T]) HandleMessage(msg *EventMessage[T]) error



