rmqcli

package module
v0.0.0-...-fa9e10c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 17, 2025 License: MIT Imports: 6 Imported by: 0

README

rmqcli

Go RabbitMQ client

A wrapper abstraction built on top of amqp091-go

Use cases for the RabbitMQ broker

  • Bind a queue to an exchange with a routing key
  • Register a handler function for the consumer
  • Simple consumer and publisher usage
  • Graceful shutdown

Install

$ go get "github.com/nergilz/rmqcli"
Examples
create publisher
// runPublisher publishes a single plain-text test message through
// cli.Publisher, using queue as the last argument to Publish, and closes the
// publisher channel when it returns.
//
// A publish failure is logged rather than returned, so the caller continues.
func runPublisher(cli *rmqcli.RmqCli, queue string) {
	// Register the cleanup first so the channel is closed on every exit path,
	// including a panic inside Publish.
	defer cli.Publisher.CloseChannel()

	pub := &amqp.Publishing{ContentType: "text/plain", Body: []byte("test rmq msg by cli v9.7")}

	log.Println("run publisher...")

	// NOTE(review): the empty second argument is presumably the exchange name
	// (i.e. the default exchange) — confirm against Publisher.Publish.
	if err := cli.Publisher.Publish(pub, "", queue); err != nil {
		log.Println("error run publish:", err.Error())
	}
}
create consumer
// runConsumer starts consuming from queue through cli.Consumer, waits one
// second, and closes the consumer channel when it returns.
//
// A consume failure is logged rather than returned; in that case the function
// returns immediately instead of waiting.
func runConsumer(cli *rmqcli.RmqCli, queue string) {
	// Register the cleanup first so the channel is closed on every exit path.
	defer cli.Consumer.CloseChannel()

	log.Println("run consumer...")

	if err := cli.Consumer.Consume(queue); err != nil {
		log.Println("error run consume:", err.Error())
		return
	}

	// NOTE(review): presumably Consume returns without blocking and this pause
	// gives the handler time to receive deliveries before the channel is
	// closed — confirm against Consumer.Consume.
	time.Sleep(time.Second)
}
create handler function
// handlerFoo is the example delivery handler passed to InitRmqCli; it logs the
// body of the delivery it is given.
func handlerFoo(delivery *amqp.Delivery) {
	// Application-specific processing of the delivery would go here.
	payload := delivery.Body
	log.Printf("Received msg: %s", payload)
}
main.go
// main is the example entry point: it initialises the client, publishes one
// message, consumes from the same queue, and closes the connection.
func main() {
	cfg := &rmqcli.RmqConfig{
		Url:   "amqp://guest:guest@localhost:5672/",
		Queue: "hello_rmqcli",
	}

	// The context handed to InitRmqCli expires after three seconds.
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	cli, err := rmqcli.InitRmqCli(ctx, cfg.Url, handlerFoo)
	if err != nil {
		log.Println("error init cli:", err.Error())
		// The returned client must not be used when initialisation failed;
		// continuing would dereference it in runPublisher.
		return
	}

	runPublisher(cli, cfg.Queue)

	runConsumer(cli, cfg.Queue)

	if err := cli.CloseConnection(); err != nil {
		log.Println("error close connection:", err.Error())
	}
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type RmqCli

// RmqCli groups the consumer, publisher and declorator handles of the client.
// InitRmqCli returns a pointer to it, and CloseConnection is defined on it.
type RmqCli struct {
	// Consumer is used in the README example to call Consume and CloseChannel.
	Consumer   *consumer.Consumer
	// Publisher is used in the README example to call Publish and CloseChannel.
	Publisher  *publisher.Publisher
	// NOTE(review): presumably declares queues/exchanges/bindings — its usage
	// is not shown here; confirm against the declorator package.
	Declorator *declorator.Declorator
	// contains filtered or unexported fields
}

func InitRmqCli

func InitRmqCli(ctx context.Context, url string, h consumer.HandlerFoo) (*RmqCli, error)

func (*RmqCli) CloseConnection

func (rmq *RmqCli) CloseConnection() error

type RmqConfig

// RmqConfig holds the settings used by the README example to set up the
// client.
type RmqConfig struct {
	// Url is the broker address passed to InitRmqCli, e.g.
	// "amqp://guest:guest@localhost:5672/".
	Url         string
	// Queue is the queue name the example passes to its publish and consume
	// helpers.
	Queue       string
	// NOTE(review): presumably the number of concurrent consumers — not used
	// in the example; confirm.
	Concurrency int
	// NOTE(review): presumably the exchange name used for bindings — not used
	// in the example; confirm.
	Exchange    string
	// NOTE(review): presumably the routing key used for bindings — not used
	// in the example; confirm.
	RoutingKey  string
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL