kafka

package
v0.11.0
Warning

This package is not in the latest version of its module.
Published: May 1, 2021 License: MIT Imports: 8 Imported by: 0

Documentation

Overview

Package kafka implements a Kafka transport.

Constants

const (
	// ContextKeyExchange is the value of the reply Exchange in
	// amqp.Publish.
	ContextKeyExchange contextKey = iota
	// ContextKeyPublishKey is the value of the ReplyTo field in
	// amqp.Publish.
	ContextKeyPublishKey
	// ContextKeyNackSleepDuration is the duration to sleep for if the
	// service Nacks and requeues a message.
	// This prevents a rapid send-resend loop when a message is
	// constantly Nack'd and requeued.
	ContextKeyNackSleepDuration
	// ContextKeyAutoAck is the value of autoAck field when calling
	// amqp.Channel.Consume.
	ContextKeyAutoAck
	// ContextKeyConsumeArgs is the value of consumeArgs field when calling
	// amqp.Channel.Consume.
	ContextKeyConsumeArgs
)

Variables

This section is empty.

Functions

func DefaultErrorEncoder

func DefaultErrorEncoder(ctx context.Context,
	err error, deliv *sarama.ConsumerMessage, ch Channel, pub *sarama.ProducerMessage)

DefaultErrorEncoder simply ignores the message. It neither replies to the message nor Acks/Nacks it.

func DefaultResponsePublisher

func DefaultResponsePublisher(
	ctx context.Context,
	deliv *sarama.ConsumerMessage,
	ch Channel,
	pub *sarama.ProducerMessage,
) error

DefaultResponsePublisher extracts the reply exchange and reply key from the request, and sends the response object to that destination.

func EncodeJSONResponse

func EncodeJSONResponse(
	ctx context.Context,
	pub *amqp.Publishing,
	response interface{},
) error

EncodeJSONResponse marshals the response as JSON as part of the payload of the AMQP Publishing object.

func EncodeNopResponse

func EncodeNopResponse(
	ctx context.Context,
	pub *amqp.Publishing,
	response interface{},
) error

EncodeNopResponse is a response function that does nothing.

func NopResponsePublisher

func NopResponsePublisher(
	ctx context.Context,
	deliv *amqp.Delivery,
	ch Channel,
	pub *amqp.Publishing,
) error

NopResponsePublisher does not deliver a response to the original sender. This response publisher is used when the user wants the subscriber to receive and forget.

func ReplyAndAckErrorEncoder

func ReplyAndAckErrorEncoder(ctx context.Context, err error, deliv *amqp.Delivery, ch Channel, pub *amqp.Publishing)

ReplyAndAckErrorEncoder serializes the error message as a DefaultErrorResponse JSON, sends it to the ReplyTo address, and then Acks the original message.

func ReplyErrorEncoder

func ReplyErrorEncoder(
	ctx context.Context,
	err error,
	deliv *sarama.ConsumerMessage,
	ch Channel,
	pub *sarama.ProducerMessage,
)

ReplyErrorEncoder serializes the error message as a DefaultErrorResponse JSON and sends the message to the ReplyTo address.

func SingleNackRequeueErrorEncoder

func SingleNackRequeueErrorEncoder(ctx context.Context,
	err error, deliv *amqp.Delivery, ch Channel, pub *amqp.Publishing)

SingleNackRequeueErrorEncoder issues a Nack to the delivery with the multiple flag set to false and the requeue flag set to true. It does not reply to the message.

Types

type Channel

type Channel interface {
	Publish(exchange, key string, mandatory, immediate bool, msg sarama.ProducerMessage) error
	Consume(queue, consumer string, autoAck, exclusive, noLocal, noWail bool, args amqp.Table) (<-chan amqp.Delivery, error)
}

Channel is a channel interface to make testing possible. It is highly recommended to use *amqp.Channel as the interface implementation.

type DecodeRequestFunc

type DecodeRequestFunc func(context.Context, *sarama.ConsumerMessage) (request interface{}, err error)

DecodeRequestFunc extracts a user-domain request object from a consumed message (*sarama.ConsumerMessage). It is designed to be used in Subscribers.

type DecodeResponseFunc

type DecodeResponseFunc func(context.Context, *sarama.ConsumerMessage) (response interface{}, err error)

DecodeResponseFunc extracts a user-domain response object from a consumed message (*sarama.ConsumerMessage). It is designed to be used in Publishers.

type DefaultErrorResponse

type DefaultErrorResponse struct {
	Error string `json:"err"`
}

DefaultErrorResponse is the default structure of responses in the event of an error.

type EncodeRequestFunc

type EncodeRequestFunc func(context.Context, *sarama.ProducerMessage, interface{}) error

EncodeRequestFunc encodes the passed request object into a producer message (*sarama.ProducerMessage). It is designed to be used in Publishers.

type EncodeResponseFunc

type EncodeResponseFunc func(context.Context, *sarama.ProducerMessage, interface{}) error

EncodeResponseFunc encodes the passed response object into a producer message (*sarama.ProducerMessage). It is designed to be used in Subscribers.

type ErrorEncoder

type ErrorEncoder func(ctx context.Context,
	err error, deliv *sarama.ConsumerMessage, ch Channel, pub *sarama.ProducerMessage)

ErrorEncoder is responsible for encoding an error to the subscriber reply. Users are encouraged to use custom ErrorEncoders to encode errors to their replies, and will likely want to pass and check for their own error types.

type PublisherResponseFunc

type PublisherResponseFunc func(context.Context, *amqp.Delivery) context.Context

PublisherResponseFunc may take information from an AMQP request and make the response available for consumption. PublisherResponseFuncs are only executed in publishers, after a request has been made, but prior to it being decoded.

type RequestFunc

RequestFunc may take information from a publisher request and put it into a request context. In Subscribers, RequestFuncs are executed prior to invoking the endpoint.

type ResponsePublisher

type ResponsePublisher func(
	context.Context,
	*sarama.ConsumerMessage,
	Channel,
	*sarama.ProducerMessage,
) error

ResponsePublisher functions are executed by the subscriber to publish the response object to the original sender. Note that the word "publisher" here does not refer to the publisher in pub/sub. It simply means a function that publishes, or sends, responses.

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber wraps an endpoint and provides a handler for consumed messages (*sarama.ConsumerMessage).

func NewSubscriber

func NewSubscriber(
	e endpoint.Endpoint,
	dec DecodeRequestFunc,
	enc EncodeResponseFunc,
	options ...SubscriberOption,
) *Subscriber

NewSubscriber constructs a new subscriber, which provides a handler for consumed messages (*sarama.ConsumerMessage).

func (Subscriber) ServeDelivery

func (s Subscriber) ServeDelivery(ch Channel) func(deliv *sarama.ConsumerMessage)

ServeDelivery handles consumed messages (*sarama.ConsumerMessage). It is strongly recommended to use *amqp.Channel as the Channel interface implementation.

type SubscriberOption

type SubscriberOption func(*Subscriber)

SubscriberOption sets an optional parameter for subscribers.
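SubscriberOption follows the functional-options pattern: each option is a function that mutates the subscriber during construction. The sketch below illustrates the pattern with simplified, hypothetical types (subscriber, subscriberOption, withErrorEncoder, withResponsePublisher). It is not the package's implementation, and the string fields stand in for the real function-valued fields.

```go
package main

import "fmt"

// subscriber is a simplified, hypothetical stand-in for Subscriber.
type subscriber struct {
	errorEncoder      string
	responsePublisher string
}

// subscriberOption mirrors the shape of SubscriberOption.
type subscriberOption func(*subscriber)

// withErrorEncoder is a hypothetical option that overrides the error encoder.
func withErrorEncoder(name string) subscriberOption {
	return func(s *subscriber) { s.errorEncoder = name }
}

// withResponsePublisher is a hypothetical option that overrides the publisher.
func withResponsePublisher(name string) subscriberOption {
	return func(s *subscriber) { s.responsePublisher = name }
}

// newSubscriber sets defaults first, then applies options in order, so a
// later option overrides an earlier one that touches the same field.
func newSubscriber(options ...subscriberOption) *subscriber {
	s := &subscriber{errorEncoder: "default", responsePublisher: "default"}
	for _, option := range options {
		option(s)
	}
	return s
}

func main() {
	s := newSubscriber(withErrorEncoder("reply"))
	fmt.Println(s.errorEncoder, s.responsePublisher)
}
```

The pattern lets a constructor such as NewSubscriber keep a short required-argument list while still allowing new optional settings to be added without breaking callers.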

func SubscriberAfter

func SubscriberAfter(after ...SubscriberResponseFunc) SubscriberOption

SubscriberAfter functions are executed on the subscriber reply after the endpoint is invoked, but before anything is published to the reply.

func SubscriberBefore

func SubscriberBefore(before ...RequestFunc) SubscriberOption

SubscriberBefore functions are executed on the publisher delivery object before the request is decoded.

func SubscriberErrorEncoder

func SubscriberErrorEncoder(ee ErrorEncoder) SubscriberOption

SubscriberErrorEncoder is used to encode errors to the subscriber reply whenever they're encountered in the processing of a request. Clients can use this to provide custom error formatting. By default, errors will be published with the DefaultErrorEncoder.

func SubscriberErrorHandler

func SubscriberErrorHandler(errorHandler transport.ErrorHandler) SubscriberOption

SubscriberErrorHandler is used to handle non-terminal errors. By default, non-terminal errors are ignored. This is intended as a diagnostic measure. Finer-grained control of error handling, including logging in more detail, should be performed in a custom SubscriberErrorEncoder which has access to the context.

func SubscriberErrorLogger

func SubscriberErrorLogger(logger log.Logger) SubscriberOption

SubscriberErrorLogger is used to log non-terminal errors. By default, no errors are logged. This is intended as a diagnostic measure. Finer-grained control of error handling, including logging in more detail, should be performed in a custom SubscriberErrorEncoder which has access to the context.

Deprecated: Use SubscriberErrorHandler instead.

func SubscriberResponsePublisher

func SubscriberResponsePublisher(rp ResponsePublisher) SubscriberOption

SubscriberResponsePublisher is used by the subscriber to deliver response objects to the original sender. By default, the DefaultResponsePublisher is used.

type SubscriberResponseFunc

SubscriberResponseFunc may take information from a request context and use it to manipulate a Publisher. SubscriberResponseFuncs are only executed in subscribers, after invoking the endpoint but prior to publishing a reply.

func SetAckAfterEndpoint

func SetAckAfterEndpoint(multiple bool) SubscriberResponseFunc

SetAckAfterEndpoint returns a SubscriberResponseFunc that prompts the service to Ack the Delivery object after successfully evaluating the endpoint, and before it encodes the response. It is designed to be used by Subscribers.
