transport

package
v1.0.1
Warning

This package is not in the latest version of its module.

Published: Jan 11, 2022 License: MIT Imports: 6 Imported by: 0

Documentation

Overview

Package transport provides a Kafka transport.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func EncodeJSONRequest

func EncodeJSONRequest(_ context.Context, msg *kafka.Message, request interface{}) error

EncodeJSONRequest is an EncodeRequestFunc that serializes the request as a JSON object to the Message value. Many services can use it as a sensible default.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer wraps an endpoint and implements kafka.Handler.

func NewConsumer

func NewConsumer(
	e endpoint.Endpoint,
	dec DecodeRequestFunc,
	opts ...ConsumerOption,
) *Consumer

NewConsumer constructs a new consumer, which implements kafka.Handler and wraps the provided endpoint.

func (Consumer) Handle

func (c Consumer) Handle(ctx context.Context, msg *kafka.Message) (err error)

Handle implements kafka.Handler.

type ConsumerFinalizerFunc

type ConsumerFinalizerFunc func(ctx context.Context, msg *kafka.Message, err error)

ConsumerFinalizerFunc can be used to perform work at the end of message processing, after the response has been constructed. The principal intended use is for request logging.

type ConsumerOption

type ConsumerOption func(consumer *Consumer)

ConsumerOption sets an optional parameter for consumers.

func ConsumerAfter

func ConsumerAfter(after ...ConsumerResponseFunc) ConsumerOption

ConsumerAfter functions are executed on the consumer reply after the endpoint is invoked, but before anything is published to the reply.

func ConsumerBefore

func ConsumerBefore(before ...RequestFunc) ConsumerOption

ConsumerBefore functions are executed on the consumer message object before the request is decoded.

func ConsumerErrorHandler

func ConsumerErrorHandler(errorHandler transport.ErrorHandler) ConsumerOption

ConsumerErrorHandler is used to handle non-terminal errors. By default, non-terminal errors are ignored. This is intended as a diagnostic measure.

func ConsumerFinalizer

func ConsumerFinalizer(f ...ConsumerFinalizerFunc) ConsumerOption

ConsumerFinalizer is executed at the end of processing every message. By default, no finalizer is registered.
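The consumer options above imply an order of execution: before functions, decoding, the endpoint, after functions, and finally the finalizers. The sketch below is one way such a handler could be structured. It is not the package's actual code: `Message`, `Endpoint`, `consumer`, and `run` are local, hypothetical stand-ins, the error handler is omitted, and stopping at the first error is an assumption.

```go
package main

import (
	"context"
	"fmt"
)

// Message is a hypothetical stand-in for kafka.Message.
type Message struct {
	Value []byte
}

// Local copies of the documented function types, written against Message.
type (
	Endpoint          func(ctx context.Context, request interface{}) (interface{}, error)
	DecodeRequestFunc func(ctx context.Context, msg *Message) (interface{}, error)
	RequestFunc       func(ctx context.Context, msg *Message) context.Context
	ResponseFunc      func(ctx context.Context, response interface{}) context.Context
	FinalizerFunc     func(ctx context.Context, msg *Message, err error)
)

// consumer is an illustrative guess at what Consumer's unexported fields hold.
type consumer struct {
	e         Endpoint
	dec       DecodeRequestFunc
	before    []RequestFunc
	after     []ResponseFunc
	finalizer []FinalizerFunc
}

// Handle runs before funcs, decode, endpoint, after funcs, then finalizers.
func (c consumer) Handle(ctx context.Context, msg *Message) (err error) {
	// Finalizers run last and see the final error via the named result.
	defer func() {
		for _, f := range c.finalizer {
			f(ctx, msg, err)
		}
	}()
	for _, f := range c.before {
		ctx = f(ctx, msg)
	}
	request, err := c.dec(ctx, msg)
	if err != nil {
		return err
	}
	response, err := c.e(ctx, request)
	if err != nil {
		return err
	}
	for _, f := range c.after {
		ctx = f(ctx, response)
	}
	return nil
}

// run handles one message and returns the order in which each stage fired.
func run() []string {
	var steps []string
	c := consumer{
		e: func(_ context.Context, request interface{}) (interface{}, error) {
			steps = append(steps, "endpoint:"+request.(string))
			return "ok", nil
		},
		dec: func(_ context.Context, msg *Message) (interface{}, error) {
			steps = append(steps, "decode")
			return string(msg.Value), nil
		},
		before: []RequestFunc{func(ctx context.Context, _ *Message) context.Context {
			steps = append(steps, "before")
			return ctx
		}},
		after: []ResponseFunc{func(ctx context.Context, _ interface{}) context.Context {
			steps = append(steps, "after")
			return ctx
		}},
		finalizer: []FinalizerFunc{func(_ context.Context, _ *Message, err error) {
			steps = append(steps, fmt.Sprintf("finalizer:%v", err))
		}},
	}
	_ = c.Handle(context.Background(), &Message{Value: []byte("ping")})
	return steps
}

func main() {
	fmt.Println(run()) // [before decode endpoint:ping after finalizer:<nil>]
}
```

Running the finalizers in a deferred function means they still fire when decoding or the endpoint fails, which fits their stated purpose of request logging.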

type ConsumerResponseFunc

type ConsumerResponseFunc func(ctx context.Context, response interface{}) context.Context

ConsumerResponseFunc may take information from a request context and use it to manipulate a Producer. ConsumerResponseFuncs are only executed in consumers, after invoking the endpoint but prior to publishing a reply.

type DecodeRequestFunc

type DecodeRequestFunc func(ctx context.Context, msg *kafka.Message) (request interface{}, err error)

DecodeRequestFunc extracts a user-domain request object from a Kafka message. It is designed to be used in Kafka Consumers.

type EncodeRequestFunc

type EncodeRequestFunc func(context.Context, *kafka.Message, interface{}) error

EncodeRequestFunc encodes the passed request object into a Kafka message object. It is designed to be used in Kafka Producers.

type EncodeResponseFunc

type EncodeResponseFunc func(context.Context, interface{}) error

EncodeResponseFunc encodes the passed response object into a Kafka message object. It is designed to be used in Kafka Consumers.

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer wraps a single Kafka topic for producing messages and implements endpoint.Endpoint.

func NewProducer

func NewProducer(
	handler kafka.Handler,
	topic string,
	enc EncodeRequestFunc,
	options ...ProducerOption,
) *Producer

NewProducer constructs a new producer for a single Kafka topic, which implements endpoint.Endpoint.

func (Producer) Endpoint

func (p Producer) Endpoint() endpoint.Endpoint

Endpoint returns a usable endpoint that invokes message producing.

type ProducerFinalizerFunc

type ProducerFinalizerFunc func(ctx context.Context, err error)

ProducerFinalizerFunc can be used to perform work at the end of producing a Kafka message, after the response is returned. The principal intended use is for error logging.

type ProducerOption

type ProducerOption func(producer *Producer)

ProducerOption sets an optional parameter for producers.

func ProducerAfter

func ProducerAfter(after ...ProducerResponseFunc) ProducerOption

ProducerAfter adds one or more ProducerResponseFuncs, which are applied to the context after successful message producing. This is useful for context-manipulation operations.

func ProducerBefore

func ProducerBefore(before ...RequestFunc) ProducerOption

ProducerBefore sets the RequestFuncs that are applied to the outgoing producer request before it's invoked.

func ProducerFinalizer

func ProducerFinalizer(f ...ProducerFinalizerFunc) ProducerOption

ProducerFinalizer adds one or more ProducerFinalizerFuncs to be executed at the end of producing a Kafka message. Finalizers are executed in the order in which they were added. By default, no finalizer is registered.

func ProducerResponse

func ProducerResponse(response interface{}) ProducerOption

ProducerResponse sets the value that the producer endpoint returns as its response when a message is produced successfully.

type ProducerResponseFunc

type ProducerResponseFunc func(ctx context.Context) context.Context

ProducerResponseFunc may take information from a request context. ProducerResponseFuncs are only executed in producers, after a request has been produced.
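Taken together, the producer options suggest a flow of encode, before functions, handing the message to the handler, after functions, and finalizers, with the configured response returned on success. The sketch below shows one possible arrangement. All types here (`Message`, `Handler`, `HandlerFunc`, `producer`, `demo`) are local, hypothetical stand-ins, and running the encoder ahead of the before functions is an assumption rather than documented behavior.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Message and Handler are hypothetical stand-ins for the kafka package types.
type Message struct {
	Topic string
	Value []byte
}

type Handler interface {
	Handle(ctx context.Context, msg *Message) error
}

// HandlerFunc adapts a plain function to Handler (an illustrative helper).
type HandlerFunc func(ctx context.Context, msg *Message) error

func (f HandlerFunc) Handle(ctx context.Context, msg *Message) error { return f(ctx, msg) }

type (
	EncodeRequestFunc     func(context.Context, *Message, interface{}) error
	RequestFunc           func(ctx context.Context, msg *Message) context.Context
	ProducerResponseFunc  func(ctx context.Context) context.Context
	ProducerFinalizerFunc func(ctx context.Context, err error)
)

// producer is an illustrative guess at what Producer's unexported fields hold.
type producer struct {
	handler   Handler
	topic     string
	enc       EncodeRequestFunc
	before    []RequestFunc
	after     []ProducerResponseFunc
	finalizer []ProducerFinalizerFunc
	response  interface{}
}

// Endpoint returns a function with the shape of a Go kit endpoint.
func (p producer) Endpoint() func(ctx context.Context, request interface{}) (interface{}, error) {
	return func(ctx context.Context, request interface{}) (resp interface{}, err error) {
		// Finalizers run last, in registration order, and see the final error.
		defer func() {
			for _, f := range p.finalizer {
				f(ctx, err)
			}
		}()
		msg := &Message{Topic: p.topic}
		if err = p.enc(ctx, msg, request); err != nil {
			return nil, err
		}
		for _, f := range p.before {
			ctx = f(ctx, msg)
		}
		if err = p.handler.Handle(ctx, msg); err != nil {
			return nil, err
		}
		for _, f := range p.after {
			ctx = f(ctx)
		}
		return p.response, nil
	}
}

// demo produces one message and reports the response, the finalizer's log
// line, and the error. When fail is true the handler rejects the message.
func demo(fail bool) (resp interface{}, logged string, err error) {
	p := producer{
		topic: "orders",
		handler: HandlerFunc(func(_ context.Context, _ *Message) error {
			if fail {
				return errors.New("broker unavailable")
			}
			return nil
		}),
		enc: func(_ context.Context, m *Message, request interface{}) error {
			m.Value = []byte(fmt.Sprint(request))
			return nil
		},
		finalizer: []ProducerFinalizerFunc{func(_ context.Context, e error) {
			logged = fmt.Sprintf("finalizer saw: %v", e)
		}},
		response: "queued",
	}
	resp, err = p.Endpoint()(context.Background(), "order-7")
	return resp, logged, err
}

func main() {
	fmt.Println(demo(false))
	fmt.Println(demo(true))
}
```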

type RequestFunc

type RequestFunc func(ctx context.Context, msg *kafka.Message) context.Context

RequestFunc may take information from a Kafka message and put it into a request context. In Consumers, RequestFuncs are executed prior to invoking the endpoint.

type Router

type Router map[string][]kafka.Handler

Router maps each topic to a []kafka.Handler and implements kafka.Handler by routing every message to the handlers registered for its topic.

func (Router) AddHandler

func (r Router) AddHandler(topic string, handler kafka.Handler) Router

AddHandler appends a kafka.Handler for the specified topic.

func (Router) Handle

func (r Router) Handle(ctx context.Context, msg *kafka.Message) error

Handle implements kafka.Handler.
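The routing idea can be sketched with a map keyed by topic. The example below uses local stand-ins (`Message` with a `Topic` field, `Handler`, `HandlerFunc`, `route`), all of which are assumptions made for illustration. Stopping at the first handler error and ignoring messages for unregistered topics are also assumptions; this page does not say how the real Router treats either case.

```go
package main

import (
	"context"
	"fmt"
)

// Message and Handler are hypothetical stand-ins for the kafka package types.
type Message struct {
	Topic string
}

type Handler interface {
	Handle(ctx context.Context, msg *Message) error
}

// HandlerFunc adapts a plain function to Handler (an illustrative helper).
type HandlerFunc func(ctx context.Context, msg *Message) error

func (f HandlerFunc) Handle(ctx context.Context, msg *Message) error { return f(ctx, msg) }

// Router mirrors the documented type: topic -> handlers.
type Router map[string][]Handler

// AddHandler appends h to the handlers for topic and returns the router so
// that calls can be chained.
func (r Router) AddHandler(topic string, h Handler) Router {
	r[topic] = append(r[topic], h)
	return r
}

// Handle passes msg to every handler registered for msg.Topic, in order,
// stopping at the first error (an assumed policy).
func (r Router) Handle(ctx context.Context, msg *Message) error {
	for _, h := range r[msg.Topic] {
		if err := h.Handle(ctx, msg); err != nil {
			return err
		}
	}
	return nil
}

// route sends one message for topic through a small router and returns the
// names of the handlers that were called.
func route(topic string) []string {
	var calls []string
	record := func(name string) Handler {
		return HandlerFunc(func(context.Context, *Message) error {
			calls = append(calls, name)
			return nil
		})
	}
	r := Router{}
	r.AddHandler("orders", record("audit")).AddHandler("orders", record("billing"))
	r.AddHandler("users", record("welcome"))
	_ = r.Handle(context.Background(), &Message{Topic: topic})
	return calls
}

func main() {
	fmt.Println(route("orders"))  // [audit billing]
	fmt.Println(route("unknown")) // []
}
```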
