hedwig

Version: v1.0.0-rc6
Published: Sep 30, 2022 | License: MIT | Imports: 9 | Imported by: 0

README

Note:

Since the release of CloudEvents, hedwig no longer needs to be maintained. Please use the CloudEvents Go SDK instead of hedwig.

Hedwig Go

Golang emitter and consumer for Hedwig

About Hedwig

Hedwig is an attempt to standardize messaging and events between microservices.

It uses RabbitMQ as the transport and significantly simplifies the API.

Documentation

Index

Constants

const (
	PublishChannel        = "publish"
	DelayedPublishChannel = "publish_delay"
	SubscribeChannel      = "subscribe"
)
const (
	// For exchanges that use the RabbitMQ delayed message exchange plugin, use the following type as ExchangeType.
	ExchangeTypeDelayed = "x-delayed-message"
	// Delayed exchanges also need an extra argument with the following key, set to the actual ExchangeType,
	// such as `direct` or `topic`.
	DelayedExchangeArgKey = "x-delayed-type"
	// Use the following header when publishing to a delayed exchange. The header value is the
	// delay in milliseconds.
	DelayHeader      = "x-delay"
	MessageTTLArgKey = "x-message-ttl"
)

Variables

var (
	ErrNilHedwig          = errors.New("hedwig is nil, use `New` to create an instance")
	ErrAlreadyInitialized = errors.New("already initialized, disconnect before attempting again")
	ErrNoBindings         = errors.New("no bindings provided, need at least one")
	ErrNoConsumerSetting  = errors.New("consumer setting is nil")
)

Functions

This section is empty.

Types

type Callback

type Callback func(<-chan amqp.Delivery, *sync.WaitGroup)

type ConsumerSetting

type ConsumerSetting struct {
	Queues map[string]*QueueSetting
	// contains filtered or unexported fields
}

type Hedwig

type Hedwig struct {
	sync.Mutex

	Settings        *Settings
	DelayedSettings *Settings
	Error           error
	// contains filtered or unexported fields
}

func New

func New(exchangeSettings *Settings, delayedExchangeSettings *Settings) *Hedwig

func (*Hedwig) AddQueue

func (h *Hedwig) AddQueue(qSetting *QueueSetting, qName string) error

func (*Hedwig) Consume

func (h *Hedwig) Consume() error

Consume continues to read from the original hedwig exchange.

func (*Hedwig) DelayedPublishWithContext

func (h *Hedwig) DelayedPublishWithContext(ctx context.Context, key string, body []byte, delay time.Duration) (err error)

func (*Hedwig) Disconnect

func (h *Hedwig) Disconnect() error

Both the hedwig exchange and the delayed exchange use the same broker, so the same connection object serves both and nothing extra needs to be updated here.

func (*Hedwig) DoPublish

func (h *Hedwig) DoPublish(ctx context.Context, exchange, channel, key string, body []byte, headers map[string]interface{}) error

func (*Hedwig) Publish

func (h *Hedwig) Publish(key string, body []byte) (err error)

Deprecated: use PublishWithContext.

func (*Hedwig) PublishWithContext

func (h *Hedwig) PublishWithContext(ctx context.Context, key string, body []byte) (err error)

func (*Hedwig) PublishWithDelay

func (h *Hedwig) PublishWithDelay(key string, body []byte, delay time.Duration) (err error)

Deprecated: use DelayedPublishWithContext.

func (*Hedwig) PublishWithHeaders

func (h *Hedwig) PublishWithHeaders(key string, body []byte, headers map[string]interface{}) (err error)

Deprecated: use DoPublish.

type QueueSetting

type QueueSetting struct {
	Bindings   []string
	Callback   Callback
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	NoAck      bool
	QueueArgs  amqp.Table
}

func DefaultQueueSetting

func DefaultQueueSetting(callback Callback, bindings ...string) *QueueSetting

type Settings

type Settings struct {
	Exchange          string
	ExchangeType      string
	ExchangeArgs      amqp.Table
	HeartBeatInterval time.Duration
	SocketTimeout     time.Duration
	Host              string
	Port              int
	Vhost             string
	Username          string
	Password          string
	Consumer          *ConsumerSetting
}

func DefaultDelayedSettings

func DefaultDelayedSettings() *Settings

func DefaultSettings

func DefaultSettings() *Settings
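
The connection fields of `Settings` (`Host`, `Port`, `Vhost`, `Username`, `Password`) correspond to the parts of an AMQP URI. The sketch below shows how such fields are commonly assembled into one. The `connSettings` type and the `amqpURL` helper are hypothetical, and whether hedwig builds its connection string this way is not stated on this page.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strconv"
)

// connSettings is a stand-in that mirrors the connection fields of Settings.
type connSettings struct {
	Host     string
	Port     int
	Vhost    string
	Username string
	Password string
}

// amqpURL (hypothetical helper) assembles an amqp:// URI from the fields.
func amqpURL(s connSettings) string {
	u := url.URL{
		Scheme: "amqp",
		User:   url.UserPassword(s.Username, s.Password),
		Host:   net.JoinHostPort(s.Host, strconv.Itoa(s.Port)),
		Path:   "/" + s.Vhost,
	}
	return u.String()
}

func main() {
	fmt.Println(amqpURL(connSettings{
		Host:     "rabbit.internal",
		Port:     5672,
		Vhost:    "events",
		Username: "app",
		Password: "secret",
	}))
}
```

In an AMQP URI the default vhost `/` has to be percent-encoded as `%2F`. This sketch does not handle that case.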
