astiamqp

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 15, 2018 License: MIT Imports: 11 Imported by: 1

README

Astiamqp

Wrapper on top of amqp to provide proper configuration and error handling

Usage

// Create
a := astiamqp.New(c)
defer a.Close()

// Init
a.Init(context.Background())

// Add producer
p, _ := a.AddProducer(astiamqp.ConfigurationProducer{
    Exchange: astiamqp.ConfigurationExchange{
        Durable: true,
        Name:    "my-exchange",
        Type:    astiamqp.ExchangeTypeTopic,
    },
})

// Publish
p.Publish("my payload", "my.routing.key")

// Add consumer
a.AddConsumer(astiamqp.ConfigurationConsumer{
    AutoAck: false,
    Exchange: astiamqp.ConfigurationExchange{
        Durable: true,
        Name:    "my-exchange",
        Type:    astiamqp.ExchangeTypeTopic,
    },
    Handler: myHandler,
    Queue: astiamqp.ConfigurationQueue{
        Durable: true,
        Name:    "my-queue",
    },
    RoutingKey: "my.routing.key",
})

// Stop
a.Stop()

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	Addr     = flag.String("amqp-addr", "", "the amqp addr")
	Password = flag.String("amqp-password", "", "the amqp password")
	Username = flag.String("amqp-username", "", "the amqp username")
)

Flags

Functions

This section is empty.

Types

type AMQP

type AMQP struct {
	// contains filtered or unexported fields
}

AMQP represents a client capable of sending/listening to AMQP queues

func New

func New(c Configuration) (a *AMQP)

New creates a new AMQP instance based on a configuration

func (*AMQP) AddConsumer

func (a *AMQP) AddConsumer(c ConfigurationConsumer) (err error)

AddConsumer adds a consumer

func (*AMQP) AddProducer

func (a *AMQP) AddProducer(c ConfigurationProducer) (p *Producer, err error)

AddProducer adds a producer

func (*AMQP) Close

func (a *AMQP) Close() error

Close closes amqp properly

func (*AMQP) Init

func (a *AMQP) Init(ctx context.Context) (err error)

Init initializes amqp

func (*AMQP) Stop

func (a *AMQP) Stop()

Stop stops amqp It will wait for all consumers to stop handling deliveries

type Acknowledger

type Acknowledger interface {
	Ack(multiple bool) error
	Nack(multiple bool, requeue bool) error
	Reject(requeue bool) error
}

Acknowledger represents an acknowledger

type Configuration

type Configuration struct {
	Addr     string            `toml:"addr"`
	Password string            `toml:"password"`
	QOS      *ConfigurationQOS `toml:"qos"`
	Username string            `toml:"username"`
}

Configuration represents the AMQP configuration

func FlagConfig

func FlagConfig() Configuration

FlagConfig returns an AMQP config based on flags

type ConfigurationConsumer

type ConfigurationConsumer struct {
	Arguments  Table
	AutoAck    bool
	Exchange   ConfigurationExchange
	Exclusive  bool
	Handler    Handler
	NoLocal    bool
	NoWait     bool
	Queue      ConfigurationQueue
	RoutingKey string
}

ConfigurationConsumer represents a consumer configuration

type ConfigurationExchange

type ConfigurationExchange struct {
	Arguments   Table
	AutoDeleted bool
	Durable     bool
	Internal    bool
	Name        string
	NoWait      bool
	Type        ExchangeType
}

ConfigurationExchange represents an exchange configuration

type ConfigurationProducer

type ConfigurationProducer struct {
	Exchange ConfigurationExchange
}

ConfigurationProducer represents a producer configuration

type ConfigurationQOS

type ConfigurationQOS struct {
	Global        bool `toml:"global"`
	PrefetchCount int  `toml:"prefetch_count"`
	PrefetchSize  int  `toml:"prefetch_size"`
}

ConfigurationQOS represents the AMQP QOS configuration

type ConfigurationQueue

type ConfigurationQueue struct {
	Arguments   Table
	AutoDeleted bool
	Durable     bool
	Exclusive   bool
	Name        string
	NoWait      bool
}

ConfigurationQueue represents a queue configuration

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer represents a Consumer

type ExchangeType

type ExchangeType string

ExchangeType represents an exchange type

const (
	ExchangeTypeDirect  ExchangeType = "direct"
	ExchangeTypeFanout  ExchangeType = "fanout"
	ExchangeTypeHeaders ExchangeType = "headers"
	ExchangeTypeTopic   ExchangeType = "topic"
)

Constants

type Handler

type Handler func(msg []byte, routingKey string, a Acknowledger) error

Handler handles a message

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer represents a producer

func (*Producer) Produce

func (p *Producer) Produce(msg interface{}, routingKey string) (err error)

Produce produces a message on a routing key after json.Marshaling it

type Table

type Table amqp.Table

Table wraps amqp.Table

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL