nsqite

Version: v1.1.2
Published: Dec 24, 2025 License: MIT Imports: 12 Imported by: 0

README


A lightweight message queue implemented in Go, supporting SQLite, PostgreSQL, and ORM as persistent storage.

Introduction

In the early stages of a project, you might not need a large message queue system such as NSQ, NATS, or Pulsar. NSQite provides a simple and reliable solution for basic message queue requirements.

NSQite supports multiple storage methods:

  • SQLite as message queue persistence
  • PostgreSQL as message queue persistence

NSQite's API design is similar to go-nsq, making it easy to upgrade to NSQ in the future for higher concurrency support.

Note: NSQite guarantees at-least-once message delivery, which means duplicate messages may occur. Consumers need to implement deduplication or idempotent operations.
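One possible way to make a consumer tolerate duplicates is to remember which message IDs it has already handled. The sketch below uses only the standard library; the dedup type and its firstTime method are hypothetical helpers for illustration and are not part of NSQite.

```go
package main

import (
	"fmt"
	"sync"
)

// dedup remembers message IDs that were already processed.
// Hypothetical helper for illustration; it is not part of NSQite.
type dedup struct {
	mu   sync.Mutex
	seen map[uint64]struct{}
}

func newDedup() *dedup {
	return &dedup{seen: make(map[uint64]struct{})}
}

// firstTime records id and reports whether it had not been seen before.
func (d *dedup) firstTime(id uint64) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, ok := d.seen[id]; ok {
		return false
	}
	d.seen[id] = struct{}{}
	return true
}

func main() {
	d := newDedup()
	// ID 1 is delivered twice, as can happen with at-least-once delivery.
	for _, id := range []uint64{1, 2, 1} {
		if d.firstTime(id) {
			fmt.Println("processing message", id)
		} else {
			fmt.Println("skipping duplicate", id)
		}
	}
}
```

An in-memory set like this is lost on restart and grows without bound, so a real consumer would more likely rely on a unique key in its own database or on operations that are naturally idempotent.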

Quick Start

Event Bus

An in-memory event bus suited to business scenarios in monolithic architectures. It supports 1:N publisher-subscriber relationships and retries failed tasks after a delay.

Use cases:

  • Monolithic architecture
  • Real-time notification to subscribers
  • Message compensation after service restart
  • Support for generic message bodies

Example scenario: when a system alert occurs, it must be recorded in the database and pushed to clients via WebSocket.

  1. Database logging module subscribes to the alert topic
  2. WebSocket notification module subscribes to the alert topic
  3. When an alert occurs, the publisher sends the alert message
  4. Each subscriber processes the message independently

The event bus decouples modules, transforming imperative programming into an event-driven architecture.
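The alert scenario can be pictured with a very small fan-out sketch. This is a simplified, synchronous stand-in written with the standard library only; the bus type and its methods are hypothetical and do not reflect how NSQite is implemented internally.

```go
package main

import "fmt"

// bus is a hypothetical, synchronous stand-in for an event bus.
type bus struct {
	handlers map[string][]func(string)
}

func newBus() *bus {
	return &bus{handlers: make(map[string][]func(string))}
}

// subscribe registers h for topic.
func (b *bus) subscribe(topic string, h func(string)) {
	b.handlers[topic] = append(b.handlers[topic], h)
}

// publish delivers msg to every handler of topic and returns how many received it.
func (b *bus) publish(topic, msg string) int {
	for _, h := range b.handlers[topic] {
		h(msg)
	}
	return len(b.handlers[topic])
}

func main() {
	b := newBus()
	// Two independent modules subscribe to the same topic.
	b.subscribe("alert", func(m string) { fmt.Println("db log:", m) })
	b.subscribe("alert", func(m string) { fmt.Println("websocket push:", m) })
	// The publisher does not know who is listening.
	b.publish("alert", "disk usage above 90%")
}
```

The publisher only names a topic, so adding a third module later means adding a subscriber rather than changing the code that raises the alert.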

About message ordering:

  • When the subscriber goroutine count is 1 and the handler always returns nil, NSQite guarantees message ordering
  • In other cases (concurrent processing or message retry), NSQite cannot guarantee message order

type Reader1 struct{}

// HandleMessage implements EventHandler[string].
func (r *Reader1) HandleMessage(message *EventMessage[string]) error {
	time.Sleep(time.Second)
	fmt.Println("reader one :", message.Body)
	return nil
}

// Simulate an author writing books frantically, with 5 editors processing one book per second
func main() {
	const topic = "a-book"
	p := NewPublisher[string]()
	// Limit task failure retry attempts to 10 times.
	// The type parameter must be given explicitly because it cannot be inferred from the arguments.
	c := NewSubscriber[string](topic, "consumer1", WithMaxAttempts(10))
	c.AddConcurrentHandlers(&Reader1{}, 5)

	for i := 0; i < 5; i++ {
		// This function returns an error, but in normal pub/sub usage, errors are rare and can be ignored
		p.Publish(topic, fmt.Sprintf("a >> hello %d", i))
	}

	time.Sleep(2 * time.Second)
}

Manual completion

type Reader3 struct {
	receivedMessages sync.Map
	attemptCount     int32
}

// HandleMessage implements Handler.
func (r *Reader3) HandleMessage(message *EventMessage[string]) error {
	// Disable auto-completion
	message.DisableAutoResponse()
	if message.Body == "hello" || message.Attempts > 3 {
		// Manual completion
		r.receivedMessages.Store(message.Body, true)
		message.Finish()
		return nil
	}
	// Manual retry after 1 second delay
	atomic.AddInt32(&r.attemptCount, 1)
	message.Requeue(time.Second)
	return nil
}

Transactional Messages

A database-backed implementation that works with GORM, PGX, and similar libraries. Messages are published as part of a database transaction, and the queue consists of producers and consumers.

Use cases:

  • Monolithic or distributed architecture
  • Messages bound to database transactions, revoked when the transaction rolls back
  • Fast message processing in monolithic architecture
  • Message delay of 100 to 5000 ms in distributed architecture

Example scenario: When deleting a user, related data needs to be deleted simultaneously

  1. User profile module subscribes to user deletion topic
  2. Publish transactional message within the user deletion transaction
  3. After transaction commit, consumers receive and process the message
  4. If the server crashes during processing, consumers receive and process the message again after restart

Note: Messages may be delivered more than once, so consumers need to implement idempotent processing.
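The idea of binding a message to a transaction can be sketched without a real database. In the example below, the store type, its tx method, and deleteUser are all hypothetical stand-ins: the business change and the message row are written together, and both disappear if the transaction fails. It is not NSQite's implementation, only an illustration of the rollback behaviour described above.

```go
package main

import (
	"errors"
	"fmt"
)

// store is a hypothetical in-memory stand-in for a database with one
// business table (users) and one message table (outbox).
type store struct {
	users  map[int]string
	outbox []string
}

// tx runs fn against a copy of the data and keeps the copy only if fn
// succeeds, imitating commit and rollback.
func (s *store) tx(fn func(w *store) error) error {
	w := &store{
		users:  make(map[int]string, len(s.users)),
		outbox: append([]string(nil), s.outbox...),
	}
	for k, v := range s.users {
		w.users[k] = v
	}
	if err := fn(w); err != nil {
		return err // rollback: the working copy is discarded
	}
	s.users, s.outbox = w.users, w.outbox // commit
	return nil
}

// deleteUser removes the user and records the message in the same transaction.
func deleteUser(s *store, id int, fail bool) error {
	return s.tx(func(w *store) error {
		delete(w.users, id)
		w.outbox = append(w.outbox, fmt.Sprintf("user-deleted:%d", id))
		if fail {
			return errors.New("simulated failure before commit")
		}
		return nil
	})
}

func main() {
	s := &store{users: map[int]string{1: "alice", 2: "bob"}}

	err := deleteUser(s, 1, true)
	fmt.Println("rolled back:", err, "users:", len(s.users), "messages:", len(s.outbox))

	err = deleteUser(s, 1, false)
	fmt.Println("committed:", err, "users:", len(s.users), "messages:", len(s.outbox))
}
```

Because the message only exists once the transaction commits, consumers never see an event for a deletion that did not happen.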

Code Examples

Basic Usage

type Reader1 struct{}

// HandleMessage implements nsqite.MessageHandler.
func (r *Reader1) HandleMessage(message *nsqite.Message) error {
	time.Sleep(time.Second)
	// Body is a []byte, so convert it before printing.
	fmt.Println("reader one :", string(message.Body))
	return nil
}

// Simulate an author writing books frantically, with 5 editors processing one book per second
func main() {
	// 1. SetDB. db is an already opened *sql.DB; opening it is omitted here.
	if err := nsqite.SetDB(nsqite.DriverNameSQLite, db).AutoMigrate(); err != nil {
		panic(err)
	}

	const topic = "a-book"
	p := nsqite.NewProducer()
	// Limit task failure retry attempts to 10 times
	c := nsqite.NewConsumer(topic, "consumer1", nsqite.WithMaxAttempts(10))
	c.AddConcurrentHandlers(&Reader1{}, 5)
	for i := 0; i < 5; i++ {
		// fmt.Appendf takes the destination slice as its first argument.
		p.Publish(topic, fmt.Appendf(nil, "a >> hello %d", i))
	}
	time.Sleep(2 * time.Second)
}

Maintenance and Optimization

NSQite uses slog for logging. If you see the following warning logs, you need to optimize parameters promptly:

  • [NSQite] publish message timeout: Indicates publishing is too fast for consumers to handle. Optimize by:
    • Increasing cache queue length
    • Increasing concurrent processing goroutines
    • Optimizing consumer handler performance

Default timeout is 3 seconds. If timeouts occur frequently, adjust the timeout using WithCheckTimeout(10*time.Second).

Benchmark

Event Bus

One publisher, one subscriber, 3 million concurrent messages per second

Transactional Message Queue

One producer and one consumer on a SQLite database: performance is barely satisfactory. PostgreSQL is expected to perform better.

Next Development Tasks

  • Event Bus support for Redis as persistent storage, enabling distributed deployment
  • Transactional Message Queue support for distributed deployment, where consumers update the database after receiving messages

QA

With subscribers a, b, and c, what happens when subscriber b blocks?

  • a receives messages normally
  • b blocks, causing c to not receive messages
  • b blocks, causing the publisher to block

Solutions:

  • Use WithDiscardOnBlocking(true) to discard messages
  • Use PublishWithContext(ctx, topic, message) to limit publishing timeout
  • Use WithQueueSize(1024) to set cache queue length
  • Optimize callbacks to make consumers process tasks faster
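The first three options correspond to common Go channel idioms. The sketch below shows them with a plain buffered channel; trySend and sendWithTimeout are hypothetical helpers, and the code only assumes that the queue behaves like a bounded channel, which may differ from NSQite's internals.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// trySend drops the message instead of blocking when the queue is full.
func trySend(queue chan<- string, msg string) bool {
	select {
	case queue <- msg:
		return true
	default:
		return false
	}
}

// sendWithTimeout blocks until the message is queued or the timeout expires.
func sendWithTimeout(queue chan<- string, msg string, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	select {
	case queue <- msg:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	// A queue of length 1 that nobody is consuming from.
	queue := make(chan string, 1)

	fmt.Println("queued:", trySend(queue, "first"))
	fmt.Println("queued:", trySend(queue, "second")) // queue is full, message is discarded

	// With a deadline the publisher gives up instead of blocking forever.
	err := sendWithTimeout(queue, "third", 50*time.Millisecond)
	fmt.Println("publish error:", err)
}
```

Discarding keeps the publisher fast at the cost of losing messages, while a timeout keeps every message the subscriber can accept in time and reports the rest as errors. A larger queue only delays the point at which one of the two becomes necessary.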

When using transactional messages, if a message is published and a and c have completed their tasks, what happens when the service restarts before b has completed?

  • After service restart, b will receive the message again and continue processing
  • a and c won't receive the message again as they have already completed

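Can I customize the delay time when a task fails?

  • Yes. As in the manual completion example, call message.DisableAutoResponse() in the handler and then message.Requeue(delay) with the delay you want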

What happens when a task keeps failing and reaches the maximum retry count?

A task ends under two conditions:

  • Task execution succeeds
  • Task reaches maximum execution count

For unlimited retries, use WithMaxAttempts(0). By default, it retries 10 times, but you can increase it with WithMaxAttempts(100).

If WithMaxAttempts(10) means 10 retries, how many times will the callback be executed if it keeps failing?

  • 10 times
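The counting rule can be illustrated with a short loop. In this sketch, runWithMaxAttempts and errFail are hypothetical names; the loop treats the limit as the total number of callback executions and treats 0 as unlimited, matching the answers above, but it is not NSQite's actual retry code.

```go
package main

import (
	"errors"
	"fmt"
)

// errFail is a placeholder error used by the examples.
var errFail = errors.New("handler failed")

// runWithMaxAttempts calls handler until it succeeds or has been executed
// maxAttempts times. maxAttempts == 0 means no limit.
// It returns how many times handler was executed.
func runWithMaxAttempts(maxAttempts uint32, handler func(attempt uint32) error) uint32 {
	var calls uint32
	for maxAttempts == 0 || calls < maxAttempts {
		calls++
		if err := handler(calls); err == nil {
			break
		}
	}
	return calls
}

func main() {
	alwaysFail := func(uint32) error { return errFail }
	fmt.Println("executions with a limit of 10:", runWithMaxAttempts(10, alwaysFail))

	// With no limit, the task ends only when the handler succeeds.
	succeedOnThird := func(attempt uint32) error {
		if attempt < 3 {
			return errFail
		}
		return nil
	}
	fmt.Println("executions without a limit:", runWithMaxAttempts(0, succeedOnThird))
}
```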

How long will transactional messages be stored in the database?

  • Automatically deletes all messages older than 15 days
  • Automatically deletes completed messages older than 7 days
  • When table data exceeds 10,000 rows, automatically deletes completed messages older than 3 days
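Read together, the three rules amount to a single predicate per message. The function below is a hypothetical restatement of them, assuming "older than" is a strict comparison; the real cleanup presumably runs as SQL deletes rather than row by row.

```go
package main

import "fmt"

// shouldDelete restates the three cleanup rules for a single message.
// ageDays is the message age, completed tells whether every consumer has
// finished it, and tableRows is the current size of the message table.
func shouldDelete(ageDays int, completed bool, tableRows int) bool {
	switch {
	case ageDays > 15:
		return true // any message older than 15 days
	case completed && ageDays > 7:
		return true // completed messages older than 7 days
	case completed && tableRows > 10000 && ageDays > 3:
		return true // large table: completed messages older than 3 days
	}
	return false
}

func main() {
	fmt.Println(shouldDelete(16, false, 100))  // old and never completed
	fmt.Println(shouldDelete(5, true, 100))    // completed but recent, small table
	fmt.Println(shouldDelete(5, true, 20000))  // completed, large table
	fmt.Println(shouldDelete(5, false, 20000)) // pending messages are kept
}
```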

Need to customize these times? Please submit a PR or issue.

In the event bus, will continuous callback failures block the queue?

  • No, failed tasks will enter a priority queue for delayed processing
  • Large numbers of failed tasks cause messages to accumulate in memory; they are released once they reach the maximum number of retry attempts
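A delayed-retry priority queue is commonly built on a min-heap ordered by the time each message becomes ready again. The sketch below uses container/heap from the standard library; the retry and retryQueue types and the popOrder helper are hypothetical and only illustrate the idea.

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

// retry is a failed message waiting for its next attempt.
type retry struct {
	body    string
	readyAt time.Time
}

// retryQueue is a min-heap ordered by readyAt.
type retryQueue []retry

func (q retryQueue) Len() int           { return len(q) }
func (q retryQueue) Less(i, j int) bool { return q[i].readyAt.Before(q[j].readyAt) }
func (q retryQueue) Swap(i, j int)      { q[i], q[j] = q[j], q[i] }
func (q *retryQueue) Push(x any)        { *q = append(*q, x.(retry)) }
func (q *retryQueue) Pop() any {
	old := *q
	last := old[len(old)-1]
	*q = old[:len(old)-1]
	return last
}

// popOrder requeues each body with its delay in milliseconds and returns
// the bodies in the order in which their delays expire.
func popOrder(delaysMs map[string]int) []string {
	now := time.Now()
	q := &retryQueue{}
	for body, ms := range delaysMs {
		heap.Push(q, retry{body: body, readyAt: now.Add(time.Duration(ms) * time.Millisecond)})
	}
	order := make([]string, 0, q.Len())
	for q.Len() > 0 {
		order = append(order, heap.Pop(q).(retry).body)
	}
	return order
}

func main() {
	// New messages are not stuck behind "slow": it simply sits in the heap
	// until its delay has passed.
	fmt.Println(popOrder(map[string]int{"slow": 3000, "fast": 1000, "medium": 2000}))
}
```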

In the event bus, if publishing to one topic is blocked, will it affect publishing to other topics?

  • No, topics are independent of each other

Documentation


Constants

const (
	DriverNameSQLite   = "sqlite"
	DriverNamePostgres = "postgres"
	DriverNameOther    = "other"
)
const DefaultMaxMessageRows = 10000

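DefaultMaxMessageRows defines the default maximum number of rows in the message table. When the table exceeds this value, only messages from the last 3 days are kept.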

const DefaultMaxRetentionDays = 7

DefaultMaxRetentionDays defines the default maximum message retention period as 7 days. Messages older than this are cleaned up automatically.

Variables

var (
	ErrNoConsumer = errors.New("need consumers")
)

Functions

func GetTimeUntilMidnight added in v1.0.0

func GetTimeUntilMidnight() time.Duration

GetTimeUntilMidnight returns the duration until the next midnight (12:00 AM).
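A duration like this can be computed with the standard time package. The sketch below is an assumption about one reasonable way to do it, not the package's source; unlike GetTimeUntilMidnight it takes the current time as a parameter so that the result is reproducible, and timeUntilMidnight and sampleNow are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// sampleNow is a fixed instant so that the example is reproducible.
var sampleNow = time.Date(2025, time.December, 24, 23, 0, 0, 0, time.UTC)

// timeUntilMidnight returns the duration from now to the next 00:00
// in the location of now.
func timeUntilMidnight(now time.Time) time.Duration {
	y, m, d := now.Date()
	// time.Date normalizes d+1, so month and year boundaries are handled.
	next := time.Date(y, m, d+1, 0, 0, 0, 0, now.Location())
	return next.Sub(now)
}

func main() {
	fmt.Println(timeUntilMidnight(sampleNow))
}
```

A value like this is typically used to schedule a daily job, for example a timer that fires at midnight and then runs cleanup.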

Types

type BaseConfig added in v1.0.1

type BaseConfig struct {
	// contains filtered or unexported fields
}

BaseConfig is the base configuration implementation.

type Consumer

type Consumer struct {
	*BaseConfig
	// contains filtered or unexported fields
}

Consumer represents a message consumer.

func NewConsumer

func NewConsumer(topic, channel string, opts ...Option) *Consumer

NewConsumer creates a new Consumer.

func (*Consumer) AddConcurrentHandlers added in v1.0.0

func (c *Consumer) AddConcurrentHandlers(handler MessageHandler, concurrency int32)

AddConcurrentHandlers adds concurrent handlers.

func (*Consumer) OnFinish added in v1.0.0

func (c *Consumer) OnFinish(msg *Message)

OnFinish implements MessageDelegate.

func (*Consumer) OnRequeue added in v1.0.0

func (c *Consumer) OnRequeue(m *Message, delay time.Duration)

OnRequeue implements MessageDelegate.

func (*Consumer) OnTouch added in v1.0.0

func (c *Consumer) OnTouch(*Message)

OnTouch implements MessageDelegate.

func (*Consumer) SendMessage added in v1.0.0

func (c *Consumer) SendMessage(msg *Message) error

func (*Consumer) StartDeferredTimeout added in v1.0.0

func (c *Consumer) StartDeferredTimeout(msg *Message, delay time.Duration)

StartDeferredTimeout starts delayed processing of a message.

func (*Consumer) Stop

func (c *Consumer) Stop()

Stop stops the consumer; it will not receive messages again.

  1. Stop receiving new messages
  2. Clear the deferred retry queue data
  3. Gradually stop the goroutines started by the current consumer

func (*Consumer) WaitMessage added in v1.0.0

func (c *Consumer) WaitMessage() <-chan struct{}

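WaitMessage waits for all messages to be processed. It is convenient for testing message processing progress and is not recommended in production.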

type ConsumerHandlerFunc added in v1.0.0

type ConsumerHandlerFunc func(msg *Message) error

ConsumerHandlerFunc provides a default MessageHandler implementation; just pass in a function.

func (ConsumerHandlerFunc) HandleMessage added in v1.0.0

func (h ConsumerHandlerFunc) HandleMessage(msg *Message) error
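This is the same adapter idea as http.HandlerFunc in the standard library: a function type gets a method so that it satisfies an interface. The sketch below reproduces the pattern with local stand-in types (message, handler, handlerFunc, dispatch are hypothetical names), so it runs without the package.

```go
package main

import "fmt"

// message and handler are simplified local stand-ins for Message and MessageHandler.
type message struct {
	Body []byte
}

type handler interface {
	HandleMessage(msg *message) error
}

// handlerFunc adapts a plain function to the handler interface.
type handlerFunc func(msg *message) error

// HandleMessage calls the underlying function.
func (f handlerFunc) HandleMessage(msg *message) error { return f(msg) }

// dispatch only knows about the interface, not about the function type.
func dispatch(h handler, body string) error {
	return h.HandleMessage(&message{Body: []byte(body)})
}

func main() {
	h := handlerFunc(func(msg *message) error {
		fmt.Println("got:", string(msg.Body))
		return nil
	})
	if err := dispatch(h, "hello"); err != nil {
		fmt.Println("handler error:", err)
	}
}
```

With the real package, the equivalent would presumably be passing a ConsumerHandlerFunc value to AddConcurrentHandlers instead of declaring a struct type for each handler.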

type DB added in v1.1.0

type DB struct {
	*sql.DB
	// contains filtered or unexported fields
}

func SetDB added in v1.1.0

func SetDB(driverName string, g *sql.DB) *DB

SetDB initializes the database. Alternatively, use SetSQLite or SetPostgres.

func SetPostgres added in v1.1.0

func SetPostgres(g *sql.DB) *DB

SetPostgres initializes the database for PostgreSQL. For SQLite, use SetSQLite.

func SetSQLite added in v1.1.0

func SetSQLite(g *sql.DB) *DB

SetSQLite initializes the database for SQLite. For PostgreSQL, use SetPostgres.

func (*DB) AutoMigrate added in v1.1.0

func (d *DB) AutoMigrate() error

AutoMigrate initializes the database table: it creates the nsqite_messages table if it does not exist. If you use gorm, you can initialize with gorm.AutoMigrate(new(nsqite.Message)). If you use goddd, you can use the following:

	if orm.EnabledAutoMigrate {
		if err := uc.DB.AutoMigrate(new(nsqite.Message)); err != nil {
			panic(err)
		}
	}

func (*DB) Count added in v1.1.1

func (d *DB) Count() (int, error)

func (*DB) Create added in v1.1.0

func (d *DB) Create(value *Message) error

func (*DB) DeleteCompletedMessagesOlderThan added in v1.1.0

func (d *DB) DeleteCompletedMessagesOlderThan(days int) error

func (*DB) DeleteOldMessages added in v1.1.0

func (d *DB) DeleteOldMessages(days int) error

func (*DB) DriverName added in v1.1.0

func (d *DB) DriverName() string

func (*DB) FetchPendingMessages added in v1.1.0

func (d *DB) FetchPendingMessages(id int, msgs *[]Message) error

type EventBus

type EventBus struct {
	// contains filtered or unexported fields
}

func (*EventBus) AddSubscriber added in v1.0.0

func (s *EventBus) AddSubscriber(c SubscriberInfo)

func (*EventBus) DelConsumer

func (s *EventBus) DelConsumer(c SubscriberInfo)

func (*EventBus) GetConsumer added in v1.0.2

func (s *EventBus) GetConsumer(topic string) (*subscriberMap, bool)

func (*EventBus) Publish

func (s *EventBus) Publish(ctx context.Context, topic string, msg cloner) error

type EventHandler

type EventHandler[T any] interface {
	HandleMessage(message *EventMessage[T]) error
}

type EventMessage

type EventMessage[T any] struct {
	ID        uint64
	Body      T
	Timestamp int64
	Attempts  uint32

	Delegate EventMessageDelegate[T]
	// contains filtered or unexported fields
}

func (*EventMessage[T]) Clone added in v1.0.0

func (m *EventMessage[T]) Clone() cloner

func (*EventMessage[T]) DisableAutoResponse

func (m *EventMessage[T]) DisableAutoResponse()

DisableAutoResponse disables auto-completion.

func (*EventMessage[T]) Finish

func (m *EventMessage[T]) Finish()

Finish marks message processing as complete.

func (*EventMessage[T]) HasResponded

func (m *EventMessage[T]) HasResponded() bool

HasResponded reports whether the message has been handled; true means it has.

func (*EventMessage[T]) IsAutoResponseDisabled

func (m *EventMessage[T]) IsAutoResponseDisabled() bool

IsAutoResponseDisabled reports whether auto-completion is disabled; true means disabled.

func (*EventMessage[T]) Requeue

func (m *EventMessage[T]) Requeue(delay time.Duration)

Requeue puts the message back in the queue.

func (*EventMessage[T]) Touch

func (m *EventMessage[T]) Touch()

Touch prevents the message from timing out during processing.

type EventMessageDelegate

type EventMessageDelegate[T any] interface {
	// OnFinish is called when the Finish() method
	// is triggered on the Message
	OnFinish(*EventMessage[T])

	// OnRequeue is called when the Requeue() method
	// is triggered on the Message
	OnRequeue(m *EventMessage[T], delay time.Duration)

	// OnTouch is called when the Touch() method
	// is triggered on the Message
	// Timeouts should be handled by the message subscriber
	OnTouch(*EventMessage[T])
}

MessageDelegate is an interface of methods that are used as callbacks in Message

type Message

type Message struct {
	ID        int       `gorm:"primaryKey" json:"id"`
	Timestamp time.Time `gorm:"column:timestamp;notNull;default:CURRENT_TIMESTAMP;index;comment:创建时间" json:"timestamp"` // creation time
	Topic     string    `gorm:"notNull;index;default:''" json:"topic"`                                                  // topic the message belongs to
	Body      []byte    `gorm:"notNull" json:"body"`                                                                    // message content
	// subscriber count and completion count
	Consumers         uint32 `gorm:"notNull;default:0;index:idx_messages_consumers_responded" json:"consumers"` // number of subscribers to the message
	Responded         uint32 `gorm:"notNull;default:0;index:idx_messages_consumers_responded" json:"responded"` // number of completed responses
	Channels          string `gorm:"notNull;default:''" json:"channels"`                                        // channels subscribed to the message
	RespondedChannels string `gorm:"notNull;default:''" json:"responded_channels"`                              // channels that have completed the message
	Attempts          uint32 `gorm:"notNull;default:0" json:"attempts"`                                         // number of retry attempts

	// runtime
	Delegate MessageDelegate `gorm:"-" json:"-"`
	// contains filtered or unexported fields
}

Message represents a single message.

func (*Message) DisableAutoResponse

func (m *Message) DisableAutoResponse()

DisableAutoResponse disables auto-completion.

func (*Message) EnableAutoResponse

func (m *Message) EnableAutoResponse()

EnableAutoResponse enables auto-completion.

func (*Message) Finish

func (m *Message) Finish()

Finish marks message processing as complete.

func (*Message) HasResponded

func (m *Message) HasResponded() bool

HasResponded reports whether the message has been handled.

func (*Message) IsAutoResponseDisabled

func (m *Message) IsAutoResponseDisabled() bool

IsAutoResponseDisabled reports whether auto-completion is disabled.

func (*Message) Requeue

func (m *Message) Requeue(delay time.Duration)

Requeue puts the message back in the queue.

func (*Message) TableName added in v1.0.0

func (*Message) TableName() string

func (*Message) Touch

func (m *Message) Touch()

Touch marks the message as still being processed.

type MessageDelegate

type MessageDelegate interface {
	OnFinish(message *Message)
	OnRequeue(message *Message, delay time.Duration)
	OnTouch(message *Message)
}

MessageDelegate is the message delegate interface.

type MessageHandler

type MessageHandler interface {
	HandleMessage(msg *Message) error
}

MessageHandler defines the message handler type.

type NSQite

type NSQite struct {
	// contains filtered or unexported fields
}

NSQite is the core structure of the message queue.

func TransactionMQ added in v1.0.0

func TransactionMQ() *NSQite

func (*NSQite) AddConsumer added in v1.0.0

func (n *NSQite) AddConsumer(c *Consumer)

func (*NSQite) Close

func (n *NSQite) Close() error

Close closes the NSQite instance.

func (*NSQite) DelConsumer added in v1.0.0

func (n *NSQite) DelConsumer(topic, channel string)

func (*NSQite) Finish added in v1.0.0

func (n *NSQite) Finish(msg *Message, channel string) error

func (*NSQite) Publish

func (n *NSQite) Publish(topic string, msg *Message) error

Publish publishes a message to a topic.

func (*NSQite) PublishTx

func (n *NSQite) PublishTx(tx SessionFunc, topic string, msg *Message) error

type Option added in v1.0.1

type Option func(*BaseConfig)

Option is the unified configuration option type.
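Option follows the functional options pattern: each With... function returns a closure that mutates the configuration. The sketch below shows the pattern with local stand-in names (config, option, withMaxAttempts, withQueueSize, newConfig are hypothetical). The defaults of 10 attempts and a 3 second timeout come from the README; the default queue size of 128 is a made-up placeholder.

```go
package main

import (
	"fmt"
	"time"
)

// config is a local stand-in for BaseConfig.
type config struct {
	maxAttempts  uint32
	queueSize    uint16
	checkTimeout time.Duration
}

// option mutates a config, in the same shape as Option.
type option func(*config)

func withMaxAttempts(n uint32) option {
	return func(c *config) { c.maxAttempts = n }
}

func withQueueSize(n uint16) option {
	return func(c *config) { c.queueSize = n }
}

// newConfig starts from defaults and applies the options in order.
func newConfig(opts ...option) *config {
	c := &config{
		maxAttempts:  10,              // default stated in the README
		queueSize:    128,             // placeholder, the real default is not documented here
		checkTimeout: 3 * time.Second, // default stated in the README
	}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newConfig(withMaxAttempts(100), withQueueSize(1024))
	fmt.Println(c.maxAttempts, c.queueSize, c.checkTimeout)
}
```

The pattern keeps constructors such as NewConsumer and NewSubscriber stable: new settings can be added as new With... functions without changing existing call sites.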

func WithCheckTimeout

func WithCheckTimeout(checkTimeout time.Duration) Option

WithCheckTimeout sets the message check timeout.

func WithDiscardOnBlocking added in v1.0.0

func WithDiscardOnBlocking(v bool) Option

WithDiscardOnBlocking sets whether to discard messages when the queue is full.

func WithMaxAttempts

func WithMaxAttempts(maxAttempts uint32) Option

WithMaxAttempts sets the maximum number of attempts for a message. This number is the total number of times the callback is executed.

func WithQueueSize

func WithQueueSize(size uint16) Option

WithQueueSize sets the message queue size. WithQueueSize(0) means an unbuffered queue.

type Producer

type Producer struct{}

Producer represents a message producer.

func NewProducer

func NewProducer() *Producer

NewProducer creates a new Producer.

func (*Producer) Publish

func (p *Producer) Publish(topic string, body []byte) error

Publish publishes a message. This function returns an error, but in normal pub/sub usage errors do not occur and the error can be discarded.

func (*Producer) PublishTx

func (p *Producer) PublishTx(createFn SessionFunc, topic string, body []byte) error

type Publisher

type Publisher[T any] struct{}

Publisher is a message publisher.

func NewPublisher

func NewPublisher[T any]() *Publisher[T]

NewPublisher creates a message publisher.

func (*Publisher[T]) Publish

func (p *Publisher[T]) Publish(topic string, msg T) error

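Publish publishes a message. This function returns an error, but in normal pub/sub usage errors do not occur and the error can be discarded. If you use PublishWithContext, you need to handle the error. An error is returned in these cases:

  1. Publishing when there are no subscribers
  2. Publishing with PublishWithContext times out because subscribers cannot process tasks fast enough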

func (*Publisher[T]) PublishWithContext

func (p *Publisher[T]) PublishWithContext(ctx context.Context, topic string, msg T) error

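PublishWithContext publishes a message with a time limit. Use this function to limit the publish timeout; the returned error tells you whether publishing timed out.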

type SessionFunc added in v1.1.0

type SessionFunc func(*Message) error

type Subscriber

type Subscriber[T any] struct {
	*BaseConfig
	// contains filtered or unexported fields
}

Subscriber is a message subscriber.

func NewSubscriber

func NewSubscriber[T any](topic, channel string, opts ...Option) *Subscriber[T]

NewSubscriber creates a message subscriber.

func (*Subscriber[T]) AddConcurrentHandlers

func (s *Subscriber[T]) AddConcurrentHandlers(handler EventHandler[T], concurrency int32)

AddConcurrentHandlers adds concurrent handlers.

func (*Subscriber[T]) GetChannel

func (s *Subscriber[T]) GetChannel() string

GetChannel implements SubscriberInfo.

func (*Subscriber[T]) GetTopic

func (s *Subscriber[T]) GetTopic() string

GetTopic implements SubscriberInfo.

func (*Subscriber[T]) OnFinish

func (s *Subscriber[T]) OnFinish(_ *EventMessage[T])

OnFinish implements MessageDelegate.

func (*Subscriber[T]) OnRequeue

func (s *Subscriber[T]) OnRequeue(m *EventMessage[T], delay time.Duration)

OnRequeue implements MessageDelegate.

func (*Subscriber[T]) OnTouch

func (s *Subscriber[T]) OnTouch(*EventMessage[T])

OnTouch implements MessageDelegate.

func (*Subscriber[T]) SendMessage

func (s *Subscriber[T]) SendMessage(ctx context.Context, msg interface{}) error

SendMessage implements SubscriberInfo.

func (*Subscriber[T]) StartDeferredTimeout

func (s *Subscriber[T]) StartDeferredTimeout(msg *EventMessage[T], delay time.Duration)

StartDeferredTimeout starts delayed processing of a message.

func (*Subscriber[T]) Stop

func (s *Subscriber[T]) Stop()

Stop stops the subscriber from receiving messages.

func (*Subscriber[T]) Wait

func (s *Subscriber[T]) Wait()

Wait waits for the subscriber to execute the Stop method.

func (*Subscriber[T]) WaitMessage

func (s *Subscriber[T]) WaitMessage()

WaitMessage waits for all messages to be processed. It is convenient for testing message processing progress and is not recommended for use in production environments.

type SubscriberHandlerFunc added in v1.0.0

type SubscriberHandlerFunc[T any] func(message *EventMessage[T]) error

SubscriberHandlerFunc is an adapter that allows an ordinary function to be used as an EventHandler.

func (SubscriberHandlerFunc[T]) HandleMessage added in v1.0.0

func (s SubscriberHandlerFunc[T]) HandleMessage(msg *EventMessage[T]) error

type SubscriberInfo

type SubscriberInfo interface {
	GetTopic() string
	GetChannel() string
	SendMessage(ctx context.Context, msg interface{}) error
}

SubscriberInfo defines the interface for obtaining subscriber information.
