amqp_recipient

package module
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 2, 2023 License: MIT Imports: 3 Imported by: 0

README

Build Status

Useful go components for amqp consumers

This module based on popular https://github.com/streadway/amqp library.

Preface

Working on several microservices that communicate through rabbitmq I found there are common tasks not related to the domain I need to solve. Most often I need consumers to be idempotent. Logs are important too. Probably if message handling fails you want retry it later or even republish to another exchange/queue. So I made handlers to solve this tasks.

Install

go get https://github.com/elegant-bro/amqp-recipient@v{version}

Usage

Suppose you need consumer that able to listen events from queue user.registered and send Welcome email using some mail service's api.

Let's look at basic example.

Documentation

Index

Constants

View Source
const HandlerAck uint8 = 0
View Source
const HandlerDoNothing uint8 = 3
View Source
const HandlerReject uint8 = 2
View Source
const HandlerRequeue uint8 = 1

Variables

This section is empty.

Functions

This section is empty.

Types

type AmqpSender

type AmqpSender struct {
	Conn       *amqp.Connection
	Exchange   string
	RoutingKey string
}

func NewAmqpSender

func NewAmqpSender(conn *amqp.Connection, exchange string, routingKey string) *AmqpSender

func (AmqpSender) Send

func (a AmqpSender) Send(p amqp.Publishing) error

type HandledIds

type HandledIds interface {
	Has(key string) (bool, error)
	Save(key string, fn func() (uint8, error)) (uint8, error)
}

type Job

type Job interface {
	Run()
}

type JobHandler

type JobHandler interface {
	Handle(d amqp.Delivery) (uint8, error)
}

type MapOfHandlers added in v1.1.0

type MapOfHandlers map[string]JobHandler

type OnHandlerFails

type OnHandlerFails func(d amqp.Delivery, err error)

type Recipient

type Recipient interface {
	Subscribe() (Job, error)
}

type Run

type Run struct {
	// contains filtered or unexported fields
}

func NewRun

func NewRun(entries []RunEntry, onFail func(err error)) *Run

func (*Run) All

func (run *Run) All()

func (*Run) AllAsync added in v0.3.0

func (run *Run) AllAsync() <-chan error

type RunEntry

type RunEntry struct {
	// contains filtered or unexported fields
}

func NewRunEntry

func NewRunEntry(recipient Recipient, consumers int) RunEntry

func OneRunEntry

func OneRunEntry(recipient Recipient) RunEntry

type Sender

type Sender interface {
	Send(p amqp.Publishing) error
}

type StubJob

type StubJob struct {
	// contains filtered or unexported fields
}

func NewStubJob

func NewStubJob(fn func()) *StubJob

func (StubJob) Run

func (s StubJob) Run()

type StubJobHandler

type StubJobHandler struct {
	// contains filtered or unexported fields
}

func NewStubJobHandler

func NewStubJobHandler(fn func(d amqp.Delivery) (uint8, error)) *StubJobHandler

func (StubJobHandler) Handle

func (s StubJobHandler) Handle(d amqp.Delivery) (uint8, error)

type StubRecipient

type StubRecipient struct {
	// contains filtered or unexported fields
}

func NewStubRecipient

func NewStubRecipient(job Job, err error) *StubRecipient

func NoErrorStubRecipient

func NoErrorStubRecipient(job Job) *StubRecipient

func (StubRecipient) Subscribe

func (s StubRecipient) Subscribe() (Job, error)

type StubSender

type StubSender struct {
	// contains filtered or unexported fields
}

func NewStubSender

func NewStubSender(fn func(p amqp.Publishing) error) *StubSender

func (StubSender) Send

func (f StubSender) Send(p amqp.Publishing) error

type Wrapper added in v1.1.0

type Wrapper func(d amqp.Delivery, wrapped JobHandler) (uint8, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL