stanmsg

package
v0.11.4
Warning

This package is not in the latest version of its module.

Published: Jun 8, 2020 | License: MIT | Imports: 13 | Imported by: 0

Documentation

Overview

Package stanmsg contains DurConn, which implements nproto.MsgPublisher/nproto.MsgAsyncPublisher and nproto.MsgSubscriber.

DurConn is a "durable" connection to a NATS Streaming server that handles reconnection and resubscription automatically.

DurConns that share the same subject prefix within the same NATS Streaming cluster form a message delivery namespace.

Metadata (nproto.MD) attached on the publisher (outgoing) side is passed unmodified to the subscriber (incoming) side.

Constants

This section is empty.

Variables

var (
	// DefaultSubjectPrefix is the default value of OptSubjectPrefix.
	DefaultSubjectPrefix = "npmsg"
	// DefaultReconnectWait is the default value of OptReconnectWait.
	DefaultReconnectWait = 5 * time.Second
	// DefaultSubRetryWait is the default value of SubOptRetryWait.
	DefaultSubRetryWait = 5 * time.Second
	// DefaultPubAckWait is the default value of OptPubAckWait.
	DefaultPubAckWait = 2 * time.Second
)
var (
	// ErrNCMaxReconnect is returned if nc has MaxReconnects >= 0.
	ErrNCMaxReconnect = errors.New("nproto.stanmsg.NewDurConn: nats.Conn should have MaxReconnects < 0")
	// ErrClosed is returned when the DurConn has been closed.
	ErrClosed = errors.New("nproto.stanmsg.DurConn: Closed.")
	// ErrNotConnected is returned when the underlying connection has not been established.
	ErrNotConnected = errors.New("nproto.stanmsg.DurConn: Not connected.")
	// ErrBadSubscriptionOpt is returned if an option passed to DurConn.Subscribe is not a SubOption.
	ErrBadSubscriptionOpt = errors.New("nproto.stanmsg.DurConn: Expect stanmsg.SubOption.")
	// ErrDupSubscription is returned for a duplicated (subject, queue) pair.
	ErrDupSubscription = func(subject, queue string) error {
		return fmt.Errorf(
			"nproto.stanmsg.DurConn: Duplicated subscription (%q, %q)",
			subject,
			queue,
		)
	}
)
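
Unlike the other values in this block, ErrDupSubscription is a function that builds an error rather than an error value. The sketch below illustrates what that could mean for callers. It uses local stand-ins (`errClosed`, `errDupSubscription`, `describe`), which are hypothetical names and not the package's own variables: the fixed value can be matched with errors.Is, while the constructed error carries the subject and queue in its message.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins that mirror the shapes shown above.
// They are assumptions for illustration, not the package's variables.
var (
	errClosed          = errors.New("closed")
	errDupSubscription = func(subject, queue string) error {
		return fmt.Errorf("duplicated subscription (%q, %q)", subject, queue)
	}
)

// describe shows how a caller might branch on the two kinds of errors.
func describe(err error) string {
	if errors.Is(err, errClosed) {
		return "closed"
	}
	// A constructed error has no fixed value to compare against,
	// so this sketch falls back to its message.
	return "other: " + err.Error()
}

func main() {
	fmt.Println(describe(errClosed))
	fmt.Println(describe(errDupSubscription("orders", "workers")))
}
```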

Functions

This section is empty.

Types

type DurConn

type DurConn struct {
	// contains filtered or unexported fields
}

DurConn implements the nproto.MsgAsyncPublisher and nproto.MsgSubscriber interfaces.

func NewDurConn

func NewDurConn(nc *nats.Conn, clusterID string, opts ...Option) (*DurConn, error)

NewDurConn creates a new DurConn. `nc` must have MaxReconnect < 0 set (i.e. always reconnect); otherwise ErrNCMaxReconnect is returned. `clusterID` is the NATS Streaming server's cluster ID.

func (*DurConn) Close

func (dc *DurConn) Close() error

Close gracefully closes the connection.

func (*DurConn) Publish

func (dc *DurConn) Publish(ctx context.Context, subject string, msgData []byte) error

Publish implements the nproto.MsgPublisher interface.

func (*DurConn) PublishAsync

func (dc *DurConn) PublishAsync(ctx context.Context, subject string, msgData []byte, cb func(error)) error

PublishAsync implements the nproto.MsgAsyncPublisher interface.

func (*DurConn) Subscribe

func (dc *DurConn) Subscribe(subject, queue string, handler nproto.MsgHandler, opts ...interface{}) error

Subscribe implements the nproto.MsgSubscriber interface. Each element of `opts` must be a SubOption; otherwise ErrBadSubscriptionOpt is returned.

type Option

type Option func(*DurConn) error

Option is an option for creating a DurConn.

func OptConnectCb

func OptConnectCb(fn func(sc stan.Conn)) Option

OptConnectCb sets a callback invoked each time a new stan.Conn is established.

func OptContext

func OptContext(ctx context.Context) Option

OptContext sets the base context for handlers.

func OptDisconnectCb

func OptDisconnectCb(fn func(sc stan.Conn)) Option

OptDisconnectCb sets a callback invoked each time a stan.Conn is lost.

func OptLogger

func OptLogger(logger *zerolog.Logger) Option

OptLogger sets the structured logger.

func OptMaxConcurrentResubscription

func OptMaxConcurrentResubscription(n int) Option

OptMaxConcurrentResubscription sets the maximum number of concurrent resubscriptions.

func OptPings

func OptPings(interval, maxOut int) Option

OptPings sets stan.Pings.

func OptPubAckWait

func OptPubAckWait(t time.Duration) Option

OptPubAckWait sets stan.PubAckWait.

func OptReconnectWait

func OptReconnectWait(t time.Duration) Option

OptReconnectWait sets the reconnection wait, i.e. the time between reconnection attempts.

func OptSubjectPrefix

func OptSubjectPrefix(subjectPrefix string) Option

OptSubjectPrefix sets the message subject prefix. The default is "npmsg": if you publish a message with subject "xxx", the actual subject is "npmsg.xxx".
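
The mapping described here can be sketched as a one-line helper. `fullSubject` is a hypothetical name; the dot separator is taken from the "npmsg.xxx" example, and the second call uses a made-up prefix to show how different prefixes keep the same logical subject apart.

```go
package main

import "fmt"

// fullSubject joins a subject prefix and a logical subject with a dot,
// following the "npmsg" + "xxx" -> "npmsg.xxx" example.
func fullSubject(prefix, subject string) string {
	return prefix + "." + subject
}

func main() {
	fmt.Println(fullSubject("npmsg", "xxx"))   // prints "npmsg.xxx"
	fmt.Println(fullSubject("billing", "xxx")) // prints "billing.xxx"
}
```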

func OptTaskRunner

func OptTaskRunner(runner taskrunner.TaskRunner) Option

OptTaskRunner sets the task runner for handlers. The runner will be closed when closing the DurConn.

type SubOption

type SubOption func(*subscription) error

SubOption is an option for a subscription.

func SubOptAckWait

func SubOptAckWait(t time.Duration) SubOption

SubOptAckWait sets stan.AckWait.

func SubOptRetryWait

func SubOptRetryWait(t time.Duration) SubOption

SubOptRetryWait sets the wait time between (re)subscription attempts.

func SubOptSubscribeCb

func SubOptSubscribeCb(fn func(sc stan.Conn, subject, queue string)) SubOption

SubOptSubscribeCb sets a callback invoked each time a subscription is established. NOTE: The callback is also called on resubscription after a reconnection.
