rabbit

package module
v0.12.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 6, 2022 License: Apache-2.0 Imports: 14 Imported by: 0

README

RABBITMQ

Install

go get github.com/aacfactory/fns-contrib/message-queues/rabbit

Usage

app.Deploy(rabbit.Service())

Config

rabbitmq:
  uri: "amqp://"
  producers:
    foo:
      exchange: "exchange"
      confirmMode: true
      key: ""
      mandatory: false
      immediate: false
      size: 8
  consumers:
    bar:
      handler: default
      queue: ""
      autoAck: false
      exclusive: false
      noLocal: false
      noWait: false

Use user consumer handler

consumers:
  bar:
    handler: user_consumer
    handlerOptions:
      userId: "userId"

As proxy

published, publishErr := rabbit.Publish(ctx, rabbit.PublishArgument{
	Name: "producer name", 
	Body: json.RawMessage([]byte("{}"))
})

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Publish

func Publish(ctx context.Context, argument PublishArgument) (ok bool, err errors.CodeError)

func RegisterConsumerHandler

func RegisterConsumerHandler(name string, builder ConsumerHandlerBuilder)

func Service

func Service() service.Service

Types

type ClientTLSConfig

type ClientTLSConfig struct {
	CA                 string `json:"ca"`
	Cert               string `json:"cert"`
	Key                string `json:"key"`
	InsecureSkipVerify bool   `json:"insecureSkipVerify"`
}

type Config

type Config struct {
	URI       string                     `json:"uri"`
	Options   *OptionsConfig             `json:"options"`
	Producers map[string]*ProducerConfig `json:"producers"`
	Consumers map[string]*ConsumerConfig `json:"consumers"`
}

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func (*Consumer) Close

func (consumer *Consumer) Close() (err error)

func (*Consumer) Consume

func (consumer *Consumer) Consume(ctx context.Context) (err error)

type ConsumerConfig

type ConsumerConfig struct {
	Handler        string                 `json:"handler"`
	HandlerOptions json.RawMessage        `json:"handlerOptions"`
	Queue          string                 `json:"queue"`
	AutoAck        bool                   `json:"autoAck"`
	Exclusive      bool                   `json:"exclusive"`
	NoLocal        bool                   `json:"noLocal"`
	NoWait         bool                   `json:"noWait"`
	Arguments      map[string]interface{} `json:"arguments"`
}

type ConsumerHandler

type ConsumerHandler interface {
	Handle(ctx context.Context, message ConsumerMessage)
}

type ConsumerHandlerBuilder

type ConsumerHandlerBuilder func(options ConsumerHandlerOptions) (handler ConsumerHandler, err error)

type ConsumerHandlerOptions

type ConsumerHandlerOptions struct {
	Log    logs.Logger
	Config configures.Config
}

type ConsumerMessage

type ConsumerMessage interface {
	Id() (id string)
	ContentType() (contentType string)
	Body() (body []byte)
	Type() (typ string)
	Ack() (err error)
	Reject() (err error)
	Raw() (raw *amqp.Delivery)
}

type DefaultConsumerMessage

type DefaultConsumerMessage struct {
	// contains filtered or unexported fields
}

func (*DefaultConsumerMessage) Ack

func (msg *DefaultConsumerMessage) Ack() (err error)

func (*DefaultConsumerMessage) Body

func (msg *DefaultConsumerMessage) Body() (body []byte)

func (*DefaultConsumerMessage) ContentType

func (msg *DefaultConsumerMessage) ContentType() (contentType string)

func (*DefaultConsumerMessage) Id

func (msg *DefaultConsumerMessage) Id() (id string)

func (*DefaultConsumerMessage) Raw

func (msg *DefaultConsumerMessage) Raw() (raw *amqp.Delivery)

func (*DefaultConsumerMessage) Reject

func (msg *DefaultConsumerMessage) Reject() (err error)

func (*DefaultConsumerMessage) Type

func (msg *DefaultConsumerMessage) Type() (typ string)

type Message

type Message struct {
	Service  string          `json:"service"`
	Fn       string          `json:"fn"`
	Argument json.RawMessage `json:"argument"`
}

type OptionsConfig

type OptionsConfig struct {
	AMQPlainAuth     *amqp.AMQPlainAuth `json:"amqPlainAuth"`
	Vhost            string             `json:"vhost"`
	ChannelMax       int                `json:"channelMax"`
	FrameSize        int                `json:"frameSize"`
	HeartbeatSeconds int                `json:"heartbeatSeconds"`
	Locale           string             `json:"locale"`
	ClientTLS        *ClientTLSConfig   `json:"clientTLS"`
}

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func (*Producer) Close

func (producer *Producer) Close() (err error)

func (*Producer) Publish

func (producer *Producer) Publish(_ context.Context, msg ProducerMessage) (ok bool, err errors.CodeError)

type ProducerConfig

type ProducerConfig struct {
	Exchange    string `json:"exchange"`
	ConfirmMode bool   `json:"confirmMode"`
	Key         string `json:"key"`
	Mandatory   bool   `json:"mandatory"`
	Immediate   bool   `json:"immediate"`
	Size        int    `json:"size"`
}

type ProducerMessage

type ProducerMessage interface {
	ContentType() (contentType string)
	Body() (body []byte)
}

type PublishArgument

type PublishArgument struct {
	Name string          `json:"name"`
	Body json.RawMessage `json:"body"`
}

type PublishResult

type PublishResult struct {
	Succeed bool `json:"succeed"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL