 
 
 
 
 

go-rabbitmq
Elegant RabbitMQ client package with Go idioms and configurable options.
CHINESE README
中文说明
Main Features
- Simple API Design: Clean and intuitive interfaces with options pattern configuration
- Connection Management: Automatic connection lifecycle handling with reconnection support
- Message Acknowledgment: Complete message acknowledgment with Ack/Reject operations
- Concurrent Processing: Built-in rate limiting with configurable concurrent goroutines
- Flexible Options: Chainable With methods allowing fine-grained publish and consume configuration
- Message Persistence: Message-level delivery mode settings supporting both transient and persistent messages
Installation
go get github.com/go-xlan/go-rabbitmq
Quick Start
This complete example demonstrates concurrent publishing and consuming with message acknowledgment:
package main
import (
	"context"
	"math/rand/v2"
	"sync"
	"time"
	"github.com/go-xlan/go-rabbitmq/rabbitmq"
	"github.com/google/uuid"
	"github.com/pkg/errors"
	amqp "github.com/rabbitmq/amqp091-go"
	"github.com/yyle88/must"
	"github.com/yyle88/neatjson/neatjsonm"
	"github.com/yyle88/neatjson/neatjsons"
	"github.com/yyle88/rese"
	"github.com/yyle88/zaplog"
)
type Message struct {
	Value string
}
func main() {
	var cfg = &rabbitmq.Config{
		Protocol:  "amqp",
		Host:      "127.0.0.1",
		Port:      5672,
		Username:  "guest",
		Password:  "guest",
		QueueName: "go-rabbitmq-demo1x",
	}
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		publishFunction(cfg)
	}()
	wg.Add(1)
	go func() {
		defer wg.Done()
		consumeFunction(cfg)
	}()
	wg.Wait()
}
func publishFunction(cfg *rabbitmq.Config) {
	ctx := context.Background()
	pub := rese.P1(rabbitmq.NewPublishClient(cfg))
	defer rese.F0(pub.Close)
	for idx := 0; idx < 10; idx++ {
		msg := &Message{Value: uuid.New().String()}
		data := neatjsonm.B(msg)
		if idx%2 == 0 {
			// Even indexes: publish as a persistent message.
			must.Done(pub.PublishMessage(ctx, data, pub.NewPublishOptions().WithDeliveryMode(amqp.Persistent)))
		} else {
			// Odd indexes: publish with the default options.
			must.Done(pub.PublishMessage(ctx, data, pub.NewPublishOptions()))
		}
		time.Sleep(time.Millisecond * 10)
	}
	// Pause briefly before the deferred Close runs.
	time.Sleep(time.Millisecond * 100)
}
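func consumeFunction(cfg *rabbitmq.Config) {
	ctx := context.Background()
	ctx, cancelCauseFunc := context.WithCancelCause(ctx)
	must.Done(rabbitmq.ConsumeMessage(ctx, cfg, func(name string, data []byte) error {
		zaplog.SUG.Infoln(name)
		zaplog.SUG.Infoln(neatjsons.SxB(data))
		// Returning nil acks the message (about 10% of deliveries here).
		if rand.IntN(100) < 10 {
			return nil
		}
		// Returning an error rejects the message and requeues it.
		return errors.Errorf("xxx")
	}, 10000))
	// Let the consumer run for a while, then cancel the context with a cause.
	time.Sleep(time.Millisecond * 500)
	cancelCauseFunc(errors.New("done"))
	// Give the consumer a moment to shut down.
	time.Sleep(time.Millisecond * 200)
}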
Advanced Usage
Custom Queue Options
Configure queue with custom options:
opts := rabbitmq.NewQueueOptions("my-queue").
    WithDurable(true).
    WithAutoDelete(false).
    WithExclusive(false)
Custom Consume Options
Configure consume behavior:
opts := rabbitmq.NewConsumeOptions("my-queue").
    WithAutoAck(false).
    WithConsumerTag("my-consumer").
    WithExclusive(false)
Custom Publish Options
Publish with custom options:
opts := pub.NewPublishOptions().
    WithDeliveryMode(amqp.Persistent).
    WithExchange("my-exchange").
    WithMandatory(true)
err := pub.PublishMessage(ctx, data, opts)
Secure Connection (AMQPS)
Use TLS for a secure connection:
cfg := &rabbitmq.Config{
    Protocol:  "amqps",
    Host:      "secure.rabbitmq.host",
    Port:      5671,
    Username:  "user",
    Password:  "pass",
    QueueName: "secure-queue",
}
Configuration Options
Connection Config
- Protocol: "amqp" (plain) / "amqps" (TLS-secured)
- Host: RabbitMQ hostname / IP address
- Port: RabbitMQ port (5672 standard, 5671 TLS)
- Username: Authentication username
- Password: Authentication password
- QueueName: Default queue name to use
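To make the connection fields concrete, here is a minimal sketch of how they could be combined into an AMQP connection URI using only the standard library. The Config struct and the buildURL helper below are local stand-ins written for illustration; they are assumptions, not the package's own type or function, and the package may assemble its URI differently.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strconv"
)

// Config is a hypothetical stand-in mirroring the connection fields
// listed above. It is not the package's own type.
type Config struct {
	Protocol string
	Host     string
	Port     int
	Username string
	Password string
}

// buildURL (hypothetical helper) assembles a URI of the form
// scheme://user:pass@host:port. url.UserPassword takes care of
// escaping special characters in the credentials.
func buildURL(cfg Config) string {
	u := url.URL{
		Scheme: cfg.Protocol,
		User:   url.UserPassword(cfg.Username, cfg.Password),
		Host:   net.JoinHostPort(cfg.Host, strconv.Itoa(cfg.Port)),
	}
	return u.String()
}

func main() {
	plain := Config{Protocol: "amqp", Host: "127.0.0.1", Port: 5672, Username: "guest", Password: "guest"}
	secure := Config{Protocol: "amqps", Host: "secure.rabbitmq.host", Port: 5671, Username: "user", Password: "pass"}
	fmt.Println(buildURL(plain))
	fmt.Println(buildURL(secure))
}
```

Switching between plain and TLS connections then only changes the scheme and the port.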
Queue Options
- Durable: Queue survives restarts (default: true)
- AutoDelete: Delete queue when no consumers (default: false)
- Exclusive: Queue accessible only to the declaring connection during its lifetime (default: false)
- NoWait: Don't wait to confirm queue declaration (default: false)
- Args: Extra arguments to pass to queue declaration
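The chainable With methods follow a common Go pattern: a constructor fills in defaults, and each With method sets one field and returns the same pointer. The sketch below shows that pattern with a local QueueOptions type that copies the documented field names and defaults. It is an illustration under those assumptions, not the package's actual implementation.

```go
package main

import "fmt"

// QueueOptions is a hypothetical stand-in holding the fields listed above.
type QueueOptions struct {
	Name       string
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	NoWait     bool
	Args       map[string]any
}

// NewQueueOptions returns options carrying the documented defaults:
// Durable is true, every other flag is false.
func NewQueueOptions(name string) *QueueOptions {
	return &QueueOptions{Name: name, Durable: true}
}

// Each With method mutates one field and returns the receiver,
// which is what makes the calls chainable.
func (o *QueueOptions) WithDurable(v bool) *QueueOptions {
	o.Durable = v
	return o
}

func (o *QueueOptions) WithAutoDelete(v bool) *QueueOptions {
	o.AutoDelete = v
	return o
}

func (o *QueueOptions) WithExclusive(v bool) *QueueOptions {
	o.Exclusive = v
	return o
}

func main() {
	opts := NewQueueOptions("my-queue").WithAutoDelete(true).WithExclusive(false)
	fmt.Printf("%+v\n", *opts)
}
```

Because every method returns the receiver, callers only name the settings that differ from the defaults.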
Consume Options
- AutoAck: Auto-acknowledge messages (default: false)
- ConsumerTag: Consumer tag name
- Exclusive: Exclusive consumer, no concurrent consumers (default: false)
- NoLocal: Don't get messages published on same connection (default: false)
- NoWait: Don't wait to confirm consume operation (default: false)
- Args: Extra arguments to pass to consume operation
Publish Options
- DeliveryMode: 0 (transient) / 2 (persistent)
- Exchange: Exchange name to publish message to
- RoutingKey: Routing key to route message
- Mandatory: Return message if unroutable
- Immediate: Return message if no immediate consumers
Error Handling
The package uses structured exception types to provide context about different conditions:
Exception types:
- MqCannotInit: Failed to initialize connection
- MqDisconnected: Lost connection to RabbitMQ
- ContextIsDone: Context cancelled
Automatic reconnection happens in the background except when the first connection fails.
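Since automatic reconnection does not cover a failed first connection, callers may want to retry the initial setup themselves. The following is a minimal sketch of one way to do that with exponential backoff. The dialWithRetry helper and its dial callback are hypothetical names; in real code the callback would wrap whatever constructor creates the client.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// dialWithRetry (hypothetical helper) calls dial until it succeeds,
// the attempt budget is used up, or ctx is done. The delay doubles
// after each failed attempt.
func dialWithRetry(ctx context.Context, attempts int, delay time.Duration, dial func() error) error {
	for i := 1; ; i++ {
		err := dial()
		if err == nil {
			return nil
		}
		if i >= attempts {
			return fmt.Errorf("giving up after %d attempts: %w", i, err)
		}
		select {
		case <-ctx.Done():
			return context.Cause(ctx)
		case <-time.After(delay):
			delay *= 2
		}
	}
}

// simulate runs dialWithRetry against a fake dial function that fails
// twice before succeeding, and reports the attempt count and final error.
func simulate() (int, error) {
	calls := 0
	err := dialWithRetry(context.Background(), 5, time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errors.New("connection refused")
		}
		return nil
	})
	return calls, err
}

func main() {
	calls, err := simulate()
	fmt.Println("attempts:", calls, "err:", err)
}
```

Once the first connection succeeds, the package's background reconnection takes over.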
Best Practices
- Close clients: Use defer pub.Close() to ensure resource cleanup
- Handle context cancellation: Pass appropriate context to enable smooth shutdown
- Configure concurrent processing: Set appropriate maxRoutine based on workload
- Use persistent messages with care: Persistence prevents message loss but has performance cost
- Message acknowledgment: Return nil to ack, return error to reject and requeue
- Test configuration: Test in development environment before production
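To illustrate what a concurrency limit such as maxRoutine means in practice, the sketch below bounds the number of handlers in flight with a buffered channel used as a semaphore, and counts a nil return as an ack and an error as a reject. The process and demo functions are hypothetical and use only the standard library. This shows one common technique and is not necessarily how the package implements its limit.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// process (hypothetical) runs handler over msgs with at most maxRoutine
// handlers in flight. It returns the number of acked messages, the number
// of rejected messages, and the peak concurrency observed.
func process(msgs [][]byte, maxRoutine int, handler func([]byte) error) (int64, int64, int64) {
	if maxRoutine < 1 {
		maxRoutine = 1
	}
	sem := make(chan struct{}, maxRoutine)
	var wg sync.WaitGroup
	var running, peak, acked, rejected atomic.Int64
	for _, msg := range msgs {
		sem <- struct{}{} // blocks while maxRoutine handlers are running
		wg.Add(1)
		go func(data []byte) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when the handler is done
			n := running.Add(1)
			for { // record the highest concurrency seen so far
				old := peak.Load()
				if n <= old || peak.CompareAndSwap(old, n) {
					break
				}
			}
			if err := handler(data); err != nil {
				rejected.Add(1) // an error stands for reject
			} else {
				acked.Add(1) // nil stands for ack
			}
			running.Add(-1)
		}(msg)
	}
	wg.Wait()
	return acked.Load(), rejected.Load(), peak.Load()
}

// demo feeds 20 one-byte messages through process with a limit of 3;
// the handler rejects odd payloads and acks even ones.
func demo() (int64, int64, int64) {
	msgs := make([][]byte, 20)
	for i := range msgs {
		msgs[i] = []byte{byte(i)}
	}
	return process(msgs, 3, func(data []byte) error {
		if data[0]%2 == 1 {
			return errors.New("odd payload")
		}
		return nil
	})
}

func main() {
	acked, rejected, peak := demo()
	fmt.Println("acked:", acked, "rejected:", rejected, "peak concurrency:", peak)
}
```

A small limit protects downstream resources, while a large one favors throughput. The right value depends on how long each handler runs.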
License
MIT License. See LICENSE.
Contributing
Contributions are welcome! Report bugs, suggest features, and contribute code:
- Found a mistake? Open an issue on GitHub with reproduction steps
- Have a feature idea? Create an issue to discuss the suggestion
- Documentation confusing? Report it so we can improve
- Need new features? Share the use cases to help us understand requirements
- Performance issue? Help us optimize by reporting slow operations
- Configuration problem? Ask questions about complex setups
- Follow project progress? Watch the repo to get new releases and features
- Success stories? Share how this package improved your workflow
- Feedback? We welcome suggestions and comments
Development
For new code contributions, follow this process:
- Fork: Fork the repo on GitHub (using the webpage UI).
- Clone: Clone the forked project (git clone https://github.com/yourname/repo-name.git).
- Navigate: Navigate to the cloned project (cd repo-name).
- Branch: Create a feature branch (git checkout -b feature/xxx).
- Code: Implement the changes with comprehensive tests.
- Testing: (Golang project) Ensure tests pass (go test ./...) and follow Go code style conventions.
- Documentation: Update documentation for user-facing changes and write meaningful commit messages.
- Stage: Stage changes (git add .).
- Commit: Commit changes (git commit -m "Add feature xxx"), keeping the code backward compatible.
- Push: Push to the branch (git push origin feature/xxx).
- PR: Open a pull request on GitHub (on the GitHub webpage) with a detailed description.
Please ensure tests pass and include relevant documentation updates.
Support
You are welcome to contribute to this project by submitting pull requests and reporting issues.
Project Support:
- Give the project a GitHub star if it helps you
- Share with teammates and (golang) programming friends
- Write tech blogs about development tools and workflows - we provide content writing support
- Join the ecosystem - committed to supporting open source and the (golang) development scene
Have fun coding with this package!
