usagi

package module
v0.0.0-...-3a1d3b3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 14, 2024 License: MIT Imports: 7 Imported by: 0

README

Usagi Go

!This project is still in development and is not ready for production use (kinda)!


Usagi provides a simple and flexible interface for interacting with RabbitMQ, including message publishing, listening, and retry mechanisms.

Install

Via go get tool

go get github.com/1704mori/usagi-go

Usage

Create and connect instance

client := usagi.NewUsagi(amqp.ConnectionConfig{
    URI:          "amqp://user:passwd@localhost:5672/vhost,
    Exchange:     "my_exchange",
    ExchangeType: "topic",
})

err := client.Initialize("connection_name")
if err != nil {
    defer client.Close()
    log.Fatalf("Failed to initialize: %v", err)
}

Publish a message

publisher := usagi.NewPublisher(client.ConnectionManager, "my_queue", usagi.PublishOptions{})
sent, err := publisher.Send(payload interface{})

// or
sent, err := client.Publish("my_queue", payload interface{}, usagi.PublishOptions{})

Listen to a queue

listener := usagi.NewListener(client.ConnectionManager, "my_queue")

err := listener.Listen(func(message interface{}) bool {
    value, err := someFunctionCall()
    if err != nil {
        return false // returning false will nack the message, triggering the retry mechanism, which tries 3 times before removing it from the queue
    }

    return true // ack the message
})

if err != nil {
  log.Fatalf("Failed to start listening: %v", err)
}

// or
client.Queue("my_queue").Listen(func(message interface{}) bool {
  // return true or false
})

License

This project is licensed under the MIT License.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

func NewConnection

func NewConnection(config ConnectionConfig) *Connection

func (*Connection) Close

func (c *Connection) Close() error

func (*Connection) GetChannel

func (c *Connection) GetChannel() *amqp091.Channel

func (*Connection) GetExchange

func (c *Connection) GetExchange() string

func (*Connection) Initialize

func (c *Connection) Initialize(name string) error

type ConnectionConfig

type ConnectionConfig struct {
	URI          string
	Exchange     string
	ExchangeType string
}

type Listener

type Listener struct {
	RetryCount    int
	RetryTimeout  time.Duration
	DontSetupNack bool
	// contains filtered or unexported fields
}

func NewListener

func NewListener(conn *Connection, queueName string) *Listener

func (*Listener) Listen

func (l *Listener) Listen(callback func(message interface{}) bool) error

Listen starts listening to the queue and processes messages using the provided callback.

type PublishOptions

type PublishOptions struct {
	Persistent bool // Indicates if messages should be marked as persistent.
}

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

func NewPublisher

func NewPublisher(conn *Connection, queue string, options PublishOptions) *Publisher

NewPublisher creates a new instance of a Publisher. This function takes a connection manager, the name of the queue to publish messages to, and publishing options. It returns a pointer to a new Publisher instance.

func (*Publisher) Send

func (p *Publisher) Send(payload interface{}) (sent bool, err error)

Send marshals the payload into JSON and publishes it to the configured queue. It returns a boolean indicating whether the message was sent successfully and an error if something went wrong.

type Usagi

type Usagi struct {
	ConnectionManager *Connection
}

func NewUsagi

func NewUsagi(config ConnectionConfig) *Usagi

func (*Usagi) Close

func (u *Usagi) Close() error

func (*Usagi) Initialize

func (u *Usagi) Initialize(name string) error

func (*Usagi) Publish

func (u *Usagi) Publish(queue string, payload interface{}, options PublishOptions) (sent bool, err error)

func (*Usagi) Queue

func (u *Usagi) Queue(queueName string) *Listener

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL