nats

package
v1.2.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 21, 2023 License: MIT Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrCreatingSubscriber = errors.New("error creating subscriber")
	ErrCreatingPublisher  = errors.New("error creating publisher")
)

NATS Errors

Functions

func DisconnectErrorCallback

func DisconnectErrorCallback(logger log.Logger) natn.ConnErrHandler

DisconnectErrorCallback is called when the connection to nats server is lost

func EncodeJSONRequest

func EncodeJSONRequest(_ context.Context, msg *natn.Msg, request interface{}) error

EncodeJSONRequest is an Encoder that serializes the request as a JSON object to the Data of the Msg. Many JSON-over-NATS services can use it as a sensible default.

func NoOpErrorEncoder

func NoOpErrorEncoder(context.Context, error, string, *natn.Conn)

func NoOpResponseHandler

func NoOpResponseHandler(context.Context, string, *natn.Conn, interface{}) error

func ReconnectCallback

func ReconnectCallback(logger log.Logger) natn.ConnHandler

ReconnectCallback is called when the connection to nats server is re-established

Types

type AfterFunc

type AfterFunc func(context.Context, *natn.Conn) context.Context

type AfterPublish added in v1.1.0

type AfterPublish func(context.Context, *natn.Msg, error)

After is a function called after every message sent to NATS

type BeforeFunc

type BeforeFunc func(context.Context, *natn.Msg) context.Context

type BeforePublish added in v1.1.0

type BeforePublish func(context.Context, *natn.Msg) error

Before is a function that is called before every message sent to NATS

type ConnectionErrHandler added in v1.2.7

type ConnectionErrHandler func(t *Transport, e error)

type Decoder

type Decoder func(context.Context, *natn.Msg) (request interface{}, err error)

Decoder decodes the message received on NATS and converts into business entity

type ErrorEncoder

type ErrorEncoder kitn.ErrorEncoder

type ErrorHandler

type ErrorHandler interface{ transport.ErrorHandler }

type PublishErrorHandler added in v1.1.0

type PublishErrorHandler func(context.Context, error) error

PublishErrorHandler is a function that is called when an error occurs

type PublishMessageEncoder added in v1.1.0

type PublishMessageEncoder func(cx context.Context, sub string, data interface{}) (*natn.Msg, error)

PublishMessageEncoder encodes the value passed to it and converts to NATS message

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

publisher publishes message on NATS

func NewPublisher

func NewPublisher(connstr string, options ...PublisherOption) (*Publisher, error)

func (*Publisher) Endpoint

func (p *Publisher) Endpoint(sub string) endpoint.Endpoint

Endpoint returns a usable endpoint that invokes the remote endpoint.

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, sub string, data interface{}) error

Publish publishes the message on NATS

type PublisherOption

type PublisherOption func(*Publisher)

PublisherOption lets you modify properties for publisher

func WithAfterPublish added in v1.1.0

func WithAfterPublish(afters ...AfterPublish) PublisherOption

func WithBeforePublish added in v1.1.0

func WithBeforePublish(befores ...BeforePublish) PublisherOption

func WithCustomPublisherMaxReconnect added in v1.1.0

func WithCustomPublisherMaxReconnect(maxReconnect int) PublisherOption

func WithCustomPublisherPingInterval added in v1.1.0

func WithCustomPublisherPingInterval(pingInterval time.Duration) PublisherOption

func WithCustomPublisherTimeout added in v1.1.0

func WithCustomPublisherTimeout(timeout time.Duration) PublisherOption

func WithErrorHandler added in v1.1.0

func WithErrorHandler(handler PublishErrorHandler) PublisherOption

func WithPublishHeader added in v1.1.0

func WithPublishHeader(headers natn.Header) PublisherOption

func WithPublishMessageEncoder added in v1.1.0

func WithPublishMessageEncoder(encoder PublishMessageEncoder) PublisherOption

func WithPublisherName added in v1.1.0

func WithPublisherName(name string) PublisherOption

func WithPublisherSubjectPrefix added in v1.1.0

func WithPublisherSubjectPrefix(prefix string) PublisherOption

type ResponseHandler

type ResponseHandler func(context.Context, string, *natn.Conn, interface{}) error

ResponseHandler handles the endpoint response

type Subscriber

type Subscriber interface {
	Id() string
	Topic() string
	Group() string
	IsValid() bool
}

type SubscriberOption

type SubscriberOption func(*subscriber)

SubscriberOption provides set of options to modify a Subscriber

func WithAfterFuncsSubscriberOption

func WithAfterFuncsSubscriberOption(fns ...AfterFunc) SubscriberOption

func WithBeforeFuncsSubscriberOption

func WithBeforeFuncsSubscriberOption(fns ...BeforeFunc) SubscriberOption

func WithDecoderSubscriberOption

func WithDecoderSubscriberOption(fn Decoder) SubscriberOption

func WithEndpointMiddleware

func WithEndpointMiddleware(m endpoint.Middleware) SubscriberOption

HandlerWithEndpointMiddleware provides an ability to add a middleware of the base type

func WithEndpointSubscriberOption

func WithEndpointSubscriberOption(end endpoint.Endpoint) SubscriberOption

func WithErrorEncoderSubscriberOption

func WithErrorEncoderSubscriberOption(e ErrorEncoder) SubscriberOption

func WithErrorhandlerSubscriberOption

func WithErrorhandlerSubscriberOption(e ErrorHandler) SubscriberOption

func WithId

func WithId(id string) SubscriberOption

func WithQGroupSubscriberOption

func WithQGroupSubscriberOption(qGroup string) SubscriberOption

func WithSubjectSubscriberOption

func WithSubjectSubscriberOption(sub string) SubscriberOption

type Transport

type Transport struct {
	// contains filtered or unexported fields
}

Transport is transport server for natn.IO connection

func NewTransport

func NewTransport(
	closeCh chan struct{},
	options ...TransportOption,
) (*Transport, error)

NewTransport returns a new NATS transport

func (*Transport) Close

func (tr *Transport) Close() (err error)

Close shuts down Transport

func (*Transport) Open

func (tr *Transport) Open() error

Open starts the Transport

func (*Transport) Subscribe

func (tr *Transport) Subscribe(
	options ...SubscriberOption,
) (Subscriber, error)

func (*Transport) Subscribers

func (tr *Transport) Subscribers() []Subscriber

func (*Transport) Unsubscribe

func (tr *Transport) Unsubscribe(id string) error

type TransportOption

type TransportOption func(*Transport)

TransportOption is optional parameters for NATS Transport

func WithConnectionErrorHandler added in v1.2.7

func WithConnectionErrorHandler(h ConnectionErrHandler) TransportOption

WithConnectionErrorHandler sets a handler for connection errors

func WithDisconnectCallback

func WithDisconnectCallback(fn func(nc *natn.Conn, err error)) TransportOption

func WithFlushTimeout

func WithFlushTimeout(t time.Duration) TransportOption

WithFlushTimeout sets a timeout that we will wait for publisher to complete flushing the content on nats server before terminating connection

func WithLogging

func WithLogging(logger log.Logger) TransportOption

WithLogging sets logging for Transport, subscribers & publishers

func WithName

func WithName(n string) TransportOption

func WithNoRandomize

func WithNoRandomize(noRandomize bool) TransportOption

func WithReconnectCallback

func WithReconnectCallback(fn func(nc *natn.Conn)) TransportOption

func WithServers

func WithServers(servers []string) TransportOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL