Documentation

Overview

    Package amqp implements an AMQP transport.

    Constants

    const (
    	// ContextKeyExchange is the value of the reply Exchange in
    	// amqp.Publish.
    	ContextKeyExchange contextKey = iota
    	// ContextKeyPublishKey is the value of the ReplyTo field in
    	// amqp.Publish.
    	ContextKeyPublishKey
    	// ContextKeyNackSleepDuration is the duration to sleep for if the
    	// service Nacks and requeues a message.
    	// This prevents a tight send-resend loop when a message is
    	// constantly Nack'd and requeued.
    	ContextKeyNackSleepDuration
    	// ContextKeyAutoAck is the value of autoAck field when calling
    	// amqp.Channel.Consume.
    	ContextKeyAutoAck
    	// ContextKeyConsumeArgs is the value of consumeArgs field when calling
    	// amqp.Channel.Consume.
    	ContextKeyConsumeArgs
    )
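
The keys above use an unexported integer type, which keeps them from colliding with context keys defined elsewhere. The following is a minimal stdlib-only sketch of that pattern; the names keyExchange and keyPublishKey are hypothetical stand-ins, not the package's own identifiers.

```go
package main

import (
	"context"
	"fmt"
)

// contextKey is unexported, so no other package can construct an equal key.
type contextKey int

const (
	keyExchange   contextKey = iota // hypothetical stand-in for ContextKeyExchange
	keyPublishKey                   // hypothetical stand-in for ContextKeyPublishKey
)

// demoExchange stores an exchange name under keyExchange and reads it back.
func demoExchange() string {
	ctx := context.WithValue(context.Background(), keyExchange, "replies")
	ex, _ := ctx.Value(keyExchange).(string)
	return ex
}

// demoNoCollision shows that a plain int 0 does not match keyExchange,
// even though both have the numeric value 0: the dynamic types differ.
func demoNoCollision() bool {
	ctx := context.WithValue(context.Background(), keyExchange, "replies")
	return ctx.Value(0) == nil
}

func main() {
	fmt.Println(demoExchange(), demoNoCollision())
}
```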

    Variables

    This section is empty.

    Functions

    func DefaultDeliverer

    func DefaultDeliverer(
    	ctx context.Context,
    	p Publisher,
    	pub *amqp.Publishing,
    ) (*amqp.Delivery, error)

      DefaultDeliverer is a deliverer that publishes the specified Publishing and returns the first Delivery object with the matching correlationId. If the context times out while waiting for a reply, an error will be returned.
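
The wait-for-matching-reply behaviour described above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the package's implementation: delivery is a hypothetical stand-in for amqp.Delivery, and awaitReply is a made-up helper name.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// delivery is a hypothetical stand-in for amqp.Delivery.
type delivery struct {
	CorrelationID string
	Body          string
}

// awaitReply returns the first delivery whose CorrelationID equals want,
// or an error if ctx is done or the channel is closed first.
func awaitReply(ctx context.Context, replies <-chan delivery, want string) (*delivery, error) {
	for {
		select {
		case d, ok := <-replies:
			if !ok {
				return nil, errors.New("reply channel closed")
			}
			if d.CorrelationID == want {
				return &d, nil
			}
			// Not the reply we are waiting for: keep listening.
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
}

// demoMatch feeds two replies and returns the body of the matching one.
func demoMatch() string {
	replies := make(chan delivery, 2)
	replies <- delivery{CorrelationID: "other", Body: "ignored"}
	replies <- delivery{CorrelationID: "req-1", Body: "pong"}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	d, err := awaitReply(ctx, replies, "req-1")
	if err != nil {
		return "error: " + err.Error()
	}
	return d.Body
}

// demoTimeout waits on an empty channel and reports whether the wait
// ended with a deadline error.
func demoTimeout() bool {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	_, err := awaitReply(ctx, make(chan delivery), "req-1")
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(demoMatch(), demoTimeout())
}
```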

      func DefaultErrorEncoder

      func DefaultErrorEncoder(ctx context.Context,
      	err error, deliv *amqp.Delivery, ch Channel, pub *amqp.Publishing)

        DefaultErrorEncoder simply ignores the message. It neither replies to the message nor Acks/Nacks it.

        func DefaultResponsePublisher

        func DefaultResponsePublisher(
        	ctx context.Context,
        	deliv *amqp.Delivery,
        	ch Channel,
        	pub *amqp.Publishing,
        ) error

          DefaultResponsePublisher extracts the reply exchange and reply key from the request, and sends the response object to that destination.

          func EncodeJSONResponse

          func EncodeJSONResponse(
          	ctx context.Context,
          	pub *amqp.Publishing,
          	response interface{},
          ) error

            EncodeJSONResponse marshals the response as JSON as part of the payload of the AMQP Publishing object.
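
As a rough illustration of a JSON response encoder with this signature shape, the sketch below marshals a response into the body of a message. The publishing type is a hypothetical stand-in for amqp.Publishing, and encodeJSON is an assumed name; the package's own encoder may set additional fields.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// publishing is a hypothetical stand-in for amqp.Publishing.
type publishing struct {
	Body []byte
}

// encodeJSON marshals response and stores the bytes as the message body.
func encodeJSON(_ context.Context, pub *publishing, response interface{}) error {
	b, err := json.Marshal(response)
	if err != nil {
		return err
	}
	pub.Body = b
	return nil
}

// sumResponse is an example user-domain response type.
type sumResponse struct {
	V int `json:"v"`
}

// demoEncode encodes a sumResponse and returns the resulting body as a string.
func demoEncode() string {
	var pub publishing
	if err := encodeJSON(context.Background(), &pub, sumResponse{V: 3}); err != nil {
		return "error: " + err.Error()
	}
	return string(pub.Body)
}

func main() {
	fmt.Println(demoEncode()) // prints {"v":3}
}
```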

            func EncodeNopResponse

            func EncodeNopResponse(
            	ctx context.Context,
            	pub *amqp.Publishing,
            	response interface{},
            ) error

              EncodeNopResponse is a response function that does nothing.

              func NopResponsePublisher

              func NopResponsePublisher(
              	ctx context.Context,
              	deliv *amqp.Delivery,
              	ch Channel,
              	pub *amqp.Publishing,
              ) error

                NopResponsePublisher does not deliver a response to the original sender. This response publisher is used when the user wants the subscriber to receive and forget.

                func ReplyAndAckErrorEncoder

                func ReplyAndAckErrorEncoder(ctx context.Context, err error, deliv *amqp.Delivery, ch Channel, pub *amqp.Publishing)

                  ReplyAndAckErrorEncoder serializes the error message as a DefaultErrorResponse JSON, sends the message to the ReplyTo address, and then Acks the original message.

                  func ReplyErrorEncoder

                  func ReplyErrorEncoder(
                  	ctx context.Context,
                  	err error,
                  	deliv *amqp.Delivery,
                  	ch Channel,
                  	pub *amqp.Publishing,
                  )

                    ReplyErrorEncoder serializes the error message as a DefaultErrorResponse JSON and sends the message to the ReplyTo address.

                    func SendAndForgetDeliverer

                    func SendAndForgetDeliverer(
                    	ctx context.Context,
                    	p Publisher,
                    	pub *amqp.Publishing,
                    ) (*amqp.Delivery, error)

                      SendAndForgetDeliverer delivers the supplied publishing and returns a nil response. When using this deliverer please ensure that the supplied DecodeResponseFunc and PublisherResponseFunc are able to handle nil-type responses.
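
A response decoder that tolerates the nil response mentioned above might look like the following sketch. The delivery type is a hypothetical stand-in for amqp.Delivery, and decodeResponse is an illustrative name.

```go
package main

import (
	"context"
	"fmt"
)

// delivery is a hypothetical stand-in for amqp.Delivery.
type delivery struct {
	Body []byte
}

// decodeResponse returns a nil response when there is no delivery,
// as happens with a send-and-forget deliverer, instead of dereferencing nil.
func decodeResponse(_ context.Context, d *delivery) (interface{}, error) {
	if d == nil {
		return nil, nil
	}
	return string(d.Body), nil
}

// demoNilSafe reports whether a nil delivery decodes to a nil response
// without an error.
func demoNilSafe() bool {
	resp, err := decodeResponse(context.Background(), nil)
	return resp == nil && err == nil
}

// demoDecode decodes a non-nil delivery and returns its body as a string.
func demoDecode() string {
	resp, _ := decodeResponse(context.Background(), &delivery{Body: []byte("ok")})
	s, _ := resp.(string)
	return s
}

func main() {
	fmt.Println(demoNilSafe(), demoDecode())
}
```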

                      func SingleNackRequeueErrorEncoder

                      func SingleNackRequeueErrorEncoder(ctx context.Context,
                      	err error, deliv *amqp.Delivery, ch Channel, pub *amqp.Publishing)

                        SingleNackRequeueErrorEncoder issues a Nack to the delivery with the multiple flag set to false and the requeue flag set to true. It does not reply to the message.

                        Types

                        type Channel

                        type Channel interface {
                        	Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
                        	Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)
                        }

                          Channel is a channel interface to make testing possible. It is highly recommended to use *amqp.Channel as the interface implementation.
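
Because Channel is an interface, tests can substitute an in-memory fake. The sketch below shows the idea with stdlib-only stand-ins: publishing, delivery, table, channel and fakeChannel are all hypothetical names that mirror the shapes above rather than the real amqp types.

```go
package main

import "fmt"

// Hypothetical stand-ins for amqp.Publishing, amqp.Delivery and amqp.Table.
type (
	publishing struct{ Body []byte }
	delivery   struct{ Body []byte }
	table      map[string]interface{}
)

// channel mirrors the shape of the Channel interface, using the stand-ins.
type channel interface {
	Publish(exchange, key string, mandatory, immediate bool, msg publishing) error
	Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args table) (<-chan delivery, error)
}

// fakeChannel records published messages and serves canned deliveries.
type fakeChannel struct {
	published  []publishing
	deliveries chan delivery
}

func (f *fakeChannel) Publish(exchange, key string, mandatory, immediate bool, msg publishing) error {
	f.published = append(f.published, msg)
	return nil
}

func (f *fakeChannel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args table) (<-chan delivery, error) {
	return f.deliveries, nil
}

// Compile-time check that fakeChannel satisfies the interface.
var _ channel = (*fakeChannel)(nil)

// demoFake publishes one message and consumes one canned delivery,
// returning the publish count and the consumed body.
func demoFake() string {
	f := &fakeChannel{deliveries: make(chan delivery, 1)}
	f.deliveries <- delivery{Body: []byte("hello")}

	var ch channel = f
	_ = ch.Publish("", "rpc", false, false, publishing{Body: []byte("req")})
	msgs, _ := ch.Consume("rpc", "", true, false, false, false, nil)
	d := <-msgs
	return fmt.Sprintf("%d %s", len(f.published), d.Body)
}

func main() {
	fmt.Println(demoFake())
}
```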

                          type DecodeRequestFunc

                          type DecodeRequestFunc func(context.Context, *amqp.Delivery) (request interface{}, err error)

                            DecodeRequestFunc extracts a user-domain request object from an AMQP Delivery object. It is designed to be used in AMQP Subscribers.

                            type DecodeResponseFunc

                            type DecodeResponseFunc func(context.Context, *amqp.Delivery) (response interface{}, err error)

                              DecodeResponseFunc extracts a user-domain response object from an AMQP Delivery object. It is designed to be used in AMQP Publishers.

                              type DefaultErrorResponse

                              type DefaultErrorResponse struct {
                              	Error string `json:"err"`
                              }

                                DefaultErrorResponse is the default structure of responses in the event of an error.
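
Given the struct tag above, an error is carried under the "err" key. The sketch below round-trips that JSON shape using only the standard library; errorResponse is a local copy of the struct for illustration, and encodeError and decodeError are hypothetical helper names.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// errorResponse is a local copy of the DefaultErrorResponse shape.
type errorResponse struct {
	Error string `json:"err"`
}

// encodeError renders err in the error-response JSON shape.
func encodeError(err error) (string, error) {
	b, mErr := json.Marshal(errorResponse{Error: err.Error()})
	return string(b), mErr
}

// decodeError recovers the error message on the receiving side.
func decodeError(body string) (string, error) {
	var r errorResponse
	if err := json.Unmarshal([]byte(body), &r); err != nil {
		return "", err
	}
	return r.Error, nil
}

// demoErrorJSON returns the encoded form of a sample error.
func demoErrorJSON() string {
	s, err := encodeError(errors.New("queue unavailable"))
	if err != nil {
		return "error: " + err.Error()
	}
	return s
}

// demoRoundTrip encodes and then decodes a sample error message.
func demoRoundTrip() string {
	msg, _ := decodeError(demoErrorJSON())
	return msg
}

func main() {
	fmt.Println(demoErrorJSON()) // prints {"err":"queue unavailable"}
	fmt.Println(demoRoundTrip())
}
```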

                                type Deliverer

                                type Deliverer func(
                                	context.Context,
                                	Publisher,
                                	*amqp.Publishing,
                                ) (*amqp.Delivery, error)

                                  Deliverer is invoked by the Publisher to publish the specified Publishing, and to retrieve the appropriate response Delivery object.

                                  type EncodeRequestFunc

                                  type EncodeRequestFunc func(context.Context, *amqp.Publishing, interface{}) error

                                    EncodeRequestFunc encodes the passed request object into an AMQP Publishing object. It is designed to be used in AMQP Publishers.

                                    type EncodeResponseFunc

                                    type EncodeResponseFunc func(context.Context, *amqp.Publishing, interface{}) error

                                      EncodeResponseFunc encodes the passed response object to an AMQP Publishing object. It is designed to be used in AMQP Subscribers.

                                      type ErrorEncoder

                                      type ErrorEncoder func(ctx context.Context,
                                      	err error, deliv *amqp.Delivery, ch Channel, pub *amqp.Publishing)

                                        ErrorEncoder is responsible for encoding an error to the subscriber reply. Users are encouraged to use custom ErrorEncoders to encode errors to their replies, and will likely want to pass and check for their own error types.

                                        type Publisher

                                        type Publisher struct {
                                        	// contains filtered or unexported fields
                                        }

                                          Publisher wraps an AMQP channel and queue, and provides a method that implements endpoint.Endpoint.

                                          func NewPublisher

                                          func NewPublisher(
                                          	ch Channel,
                                          	q *amqp.Queue,
                                          	enc EncodeRequestFunc,
                                          	dec DecodeResponseFunc,
                                          	options ...PublisherOption,
                                          ) *Publisher

                                            NewPublisher constructs a usable Publisher for a single remote method.

                                            func (Publisher) Endpoint

                                            func (p Publisher) Endpoint() endpoint.Endpoint

                                              Endpoint returns a usable endpoint that invokes the remote endpoint.

                                              type PublisherOption

                                              type PublisherOption func(*Publisher)

                                                PublisherOption sets an optional parameter for publishers.

                                                func PublisherAfter

                                                func PublisherAfter(after ...PublisherResponseFunc) PublisherOption

                                                  PublisherAfter sets the PublisherResponseFuncs applied to the incoming AMQP response prior to it being decoded. This is useful for obtaining anything off of the response and adding it onto the context prior to decoding.

                                                  func PublisherBefore

                                                  func PublisherBefore(before ...RequestFunc) PublisherOption

                                                    PublisherBefore sets the RequestFuncs that are applied to the outgoing AMQP request before it's invoked.

                                                    func PublisherDeliverer

                                                    func PublisherDeliverer(deliverer Deliverer) PublisherOption

                                                      PublisherDeliverer sets the deliverer function that the Publisher invokes.

                                                      func PublisherTimeout

                                                      func PublisherTimeout(timeout time.Duration) PublisherOption

                                                        PublisherTimeout sets the available timeout for an AMQP request.

                                                        type PublisherResponseFunc

                                                        type PublisherResponseFunc func(context.Context, *amqp.Delivery) context.Context

                                                          PublisherResponseFunc may take information from an AMQP request and make the response available for consumption. PublisherResponseFuncs are only executed in publishers, after a request has been made, but prior to it being decoded.

                                                          type RequestFunc

                                                          type RequestFunc func(context.Context, *amqp.Publishing, *amqp.Delivery) context.Context

                                                            RequestFunc may take information from a publisher request and put it into a request context. In Subscribers, RequestFuncs are executed prior to invoking the endpoint.
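
A RequestFunc receives the context together with the outgoing Publishing and the incoming Delivery, and returns a possibly enriched context. The sketch below chains two such functions. All types are hypothetical stand-ins, and carrying the publish key in the context is an assumption based on the context key constants listed earlier.

```go
package main

import (
	"context"
	"fmt"
)

type contextKey int

const keyPublishKey contextKey = iota // hypothetical stand-in for ContextKeyPublishKey

// publishing and delivery are hypothetical stand-ins for the amqp types.
type publishing struct{ ContentType string }
type delivery struct{}

// requestFunc mirrors the shape of RequestFunc.
type requestFunc func(context.Context, *publishing, *delivery) context.Context

// setContentType mutates the outgoing message and leaves the context unchanged.
func setContentType(ct string) requestFunc {
	return func(ctx context.Context, pub *publishing, _ *delivery) context.Context {
		pub.ContentType = ct
		return ctx
	}
}

// setPublishKey records the routing key in the context for a later publish step.
func setPublishKey(key string) requestFunc {
	return func(ctx context.Context, _ *publishing, _ *delivery) context.Context {
		return context.WithValue(ctx, keyPublishKey, key)
	}
}

// applyBefore runs each function in order, threading the context through.
func applyBefore(ctx context.Context, pub *publishing, d *delivery, fns ...requestFunc) context.Context {
	for _, f := range fns {
		ctx = f(ctx, pub, d)
	}
	return ctx
}

// demoBefore applies both functions and reports what they left behind.
func demoBefore() string {
	pub := &publishing{}
	ctx := applyBefore(context.Background(), pub, nil,
		setContentType("application/json"),
		setPublishKey("rpc.sum"),
	)
	key, _ := ctx.Value(keyPublishKey).(string)
	return pub.ContentType + " " + key
}

func main() {
	fmt.Println(demoBefore())
}
```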

                                                            func SetConsumeArgs

                                                            func SetConsumeArgs(args amqp.Table) RequestFunc

                                                              SetConsumeArgs returns a RequestFunc that sets the arguments for the amqp Consume function. It is designed to be used by Publishers.

                                                              func SetConsumeAutoAck

                                                              func SetConsumeAutoAck(autoAck bool) RequestFunc

                                                                SetConsumeAutoAck returns a RequestFunc that sets whether or not to autoAck messages when consuming. When set to false, the publisher will Ack the first message it receives with a matching correlationId. It is designed to be used by Publishers.

                                                                func SetContentEncoding

                                                                func SetContentEncoding(contentEncoding string) RequestFunc

                                                                  SetContentEncoding returns a RequestFunc that sets the ContentEncoding field of an AMQP Publishing.

                                                                  func SetContentType

                                                                  func SetContentType(contentType string) RequestFunc

                                                                    SetContentType returns a RequestFunc that sets the ContentType field of an AMQP Publishing.

                                                                    func SetCorrelationID

                                                                    func SetCorrelationID(cid string) RequestFunc

                                                                      SetCorrelationID returns a RequestFunc that sets the CorrelationId field of an AMQP Publishing.

                                                                      func SetNackSleepDuration

                                                                      func SetNackSleepDuration(duration time.Duration) RequestFunc

                                                                        SetNackSleepDuration returns a RequestFunc that sets the amount of time to sleep in the event of a Nack. This has to be used in conjunction with an error encoder that Nacks and sleeps. One example is the SingleNackRequeueErrorEncoder. It is designed to be used by Subscribers.
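
The interplay between a sleep duration stored in the context and an error encoder that Nacks and then sleeps can be sketched as follows. This is an illustration under assumptions: delivery is a hypothetical stand-in that only records the Nack flags, and keyNackSleep and nackRequeueThenSleep are made-up names.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type contextKey int

const keyNackSleep contextKey = iota // hypothetical stand-in for ContextKeyNackSleepDuration

// delivery is a hypothetical stand-in that records how it was Nack'd.
type delivery struct {
	nacked, multiple, requeued bool
}

// Nack records the flags instead of talking to a broker.
func (d *delivery) Nack(multiple, requeue bool) error {
	d.nacked, d.multiple, d.requeued = true, multiple, requeue
	return nil
}

// nackRequeueThenSleep Nacks a single delivery with requeue set, then
// sleeps for the duration found in ctx (if any) so that a constantly
// failing message is not redelivered in a tight loop.
func nackRequeueThenSleep(ctx context.Context, d *delivery) {
	_ = d.Nack(false, true)
	if sleep, ok := ctx.Value(keyNackSleep).(time.Duration); ok {
		time.Sleep(sleep)
	}
}

// demoNackSleep reports whether the delivery was requeued (not multiple)
// and whether the call blocked for at least the configured duration.
func demoNackSleep() bool {
	d := &delivery{}
	ctx := context.WithValue(context.Background(), keyNackSleep, 20*time.Millisecond)
	start := time.Now()
	nackRequeueThenSleep(ctx, d)
	return d.nacked && d.requeued && !d.multiple && time.Since(start) >= 20*time.Millisecond
}

func main() {
	fmt.Println(demoNackSleep())
}
```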

                                                                        func SetPublishDeliveryMode

                                                                        func SetPublishDeliveryMode(dmode uint8) RequestFunc

                                                                          SetPublishDeliveryMode sets the delivery mode of a Publishing. Please refer to AMQP delivery mode constants in the AMQP package.

                                                                          func SetPublishExchange

                                                                          func SetPublishExchange(publishExchange string) RequestFunc

                                                                            SetPublishExchange returns a RequestFunc that sets the Exchange field of an AMQP Publish call.

                                                                            func SetPublishKey

                                                                            func SetPublishKey(publishKey string) RequestFunc

                                                                              SetPublishKey returns a RequestFunc that sets the Key field of an AMQP Publish call.

                                                                              type ResponsePublisher

                                                                              type ResponsePublisher func(
                                                                              	context.Context,
                                                                              	*amqp.Delivery,
                                                                              	Channel,
                                                                              	*amqp.Publishing,
                                                                              ) error

                                                                                ResponsePublisher functions are executed by the subscriber to publish the response object to the original sender. Please note that the word "publisher" does not refer to the publisher of pub/sub. Rather, a publisher here is merely a function that publishes, or sends, responses.

                                                                                type Subscriber

                                                                                type Subscriber struct {
                                                                                	// contains filtered or unexported fields
                                                                                }

                                                                                  Subscriber wraps an endpoint and provides a handler for AMQP Delivery messages.

                                                                                  func NewSubscriber

                                                                                  func NewSubscriber(
                                                                                  	e endpoint.Endpoint,
                                                                                  	dec DecodeRequestFunc,
                                                                                  	enc EncodeResponseFunc,
                                                                                  	options ...SubscriberOption,
                                                                                  ) *Subscriber

                                                                                    NewSubscriber constructs a new subscriber, which provides a handler for AMQP Delivery messages.

                                                                                    func (Subscriber) ServeDelivery

                                                                                    func (s Subscriber) ServeDelivery(ch Channel) func(deliv *amqp.Delivery)

                                                                                      ServeDelivery handles AMQP Delivery messages. It is strongly recommended to use *amqp.Channel as the Channel interface implementation.
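
ServeDelivery returns a handler function rather than handling a message directly. The sketch below shows one plausible shape of such a handler: decode, invoke the endpoint, encode, publish. Everything here is a hypothetical stand-in (delivery, publishing, endpoint, subscriber, serve), error encoding is reduced to an early return, and copying the correlation ID onto the reply is an assumption.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Hypothetical stand-ins for amqp.Delivery and amqp.Publishing.
type delivery struct {
	Body                   []byte
	ReplyTo, CorrelationID string
}
type publishing struct {
	Body          []byte
	CorrelationID string
}

// endpoint mirrors the usual request/response endpoint shape.
type endpoint func(ctx context.Context, request interface{}) (interface{}, error)

type subscriber struct {
	e   endpoint
	dec func(context.Context, *delivery) (interface{}, error)
	enc func(context.Context, *publishing, interface{}) error
}

// serve returns a handler that runs decode, endpoint, encode and publish
// in that order, giving up at the first error.
func (s subscriber) serve(publish func(replyTo string, pub publishing)) func(*delivery) {
	return func(d *delivery) {
		ctx := context.Background()
		req, err := s.dec(ctx, d)
		if err != nil {
			return // a real subscriber would call its error encoder here
		}
		resp, err := s.e(ctx, req)
		if err != nil {
			return
		}
		pub := publishing{CorrelationID: d.CorrelationID} // assumption: reply echoes the ID
		if err := s.enc(ctx, &pub, resp); err != nil {
			return
		}
		publish(d.ReplyTo, pub)
	}
}

// demoServe wires an upper-casing endpoint and returns what was published.
func demoServe() string {
	var out string
	s := subscriber{
		e: func(_ context.Context, req interface{}) (interface{}, error) {
			return strings.ToUpper(req.(string)), nil
		},
		dec: func(_ context.Context, d *delivery) (interface{}, error) {
			return string(d.Body), nil
		},
		enc: func(_ context.Context, pub *publishing, resp interface{}) error {
			pub.Body = []byte(resp.(string))
			return nil
		},
	}
	handle := s.serve(func(replyTo string, pub publishing) {
		out = replyTo + ":" + string(pub.Body) + ":" + pub.CorrelationID
	})
	handle(&delivery{Body: []byte("hi"), ReplyTo: "q.reply", CorrelationID: "c1"})
	return out
}

func main() {
	fmt.Println(demoServe())
}
```

In real use the returned handler would typically be called for each message read from the channel returned by Consume.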

                                                                                      type SubscriberOption

                                                                                      type SubscriberOption func(*Subscriber)

                                                                                        SubscriberOption sets an optional parameter for subscribers.

                                                                                        func SubscriberAfter

                                                                                        func SubscriberAfter(after ...SubscriberResponseFunc) SubscriberOption

                                                                                          SubscriberAfter functions are executed on the subscriber reply after the endpoint is invoked, but before anything is published to the reply.

                                                                                          func SubscriberBefore

                                                                                          func SubscriberBefore(before ...RequestFunc) SubscriberOption

                                                                                            SubscriberBefore functions are executed on the publisher delivery object before the request is decoded.

                                                                                            func SubscriberErrorEncoder

                                                                                            func SubscriberErrorEncoder(ee ErrorEncoder) SubscriberOption

                                                                                              SubscriberErrorEncoder is used to encode errors to the subscriber reply whenever they're encountered in the processing of a request. Clients can use this to provide custom error formatting. By default, errors will be published with the DefaultErrorEncoder.

                                                                                              func SubscriberErrorHandler

                                                                                              func SubscriberErrorHandler(errorHandler transport.ErrorHandler) SubscriberOption

                                                                                                SubscriberErrorHandler is used to handle non-terminal errors. By default, non-terminal errors are ignored. This is intended as a diagnostic measure. Finer-grained control of error handling, including logging in more detail, should be performed in a custom SubscriberErrorEncoder which has access to the context.

                                                                                                func SubscriberErrorLogger

                                                                                                func SubscriberErrorLogger(logger log.Logger) SubscriberOption

                                                                                                  SubscriberErrorLogger is used to log non-terminal errors. By default, no errors are logged. This is intended as a diagnostic measure. Finer-grained control of error handling, including logging in more detail, should be performed in a custom SubscriberErrorEncoder which has access to the context. Deprecated: Use SubscriberErrorHandler instead.

                                                                                                  func SubscriberResponsePublisher

                                                                                                  func SubscriberResponsePublisher(rp ResponsePublisher) SubscriberOption

                                                                                                    SubscriberResponsePublisher is used by the subscriber to deliver response objects to the original sender. By default, the DefaultResponsePublisher is used.

                                                                                                    type SubscriberResponseFunc

                                                                                                    type SubscriberResponseFunc func(context.Context,
                                                                                                    	*amqp.Delivery,
                                                                                                    	Channel,
                                                                                                    	*amqp.Publishing,
                                                                                                    ) context.Context

                                                                                                      SubscriberResponseFunc may take information from a request context and use it to manipulate a Publishing. SubscriberResponseFuncs are only executed in subscribers, after invoking the endpoint but prior to publishing a reply.

                                                                                                      func SetAckAfterEndpoint

                                                                                                      func SetAckAfterEndpoint(multiple bool) SubscriberResponseFunc

                                                                                                        SetAckAfterEndpoint returns a SubscriberResponseFunc that prompts the service to Ack the Delivery object after successfully evaluating the endpoint, and before it encodes the response. It is designed to be used by Subscribers.