Documentation ¶
Overview ¶
Package stanmsg contains DurConn which implements nproto.MsgPublisher/nproto.MsgAsyncPublisher and nproto.MsgSubscriber.
DurConn is a "durable" connection to a NATS Streaming server: it handles reconnection and resubscription automatically.
DurConns that share the same subject prefix within the same NATS Streaming cluster form a message delivery namespace.
Metadata (nproto.MD) attached on the publisher (outgoing) side is passed unmodified to the subscriber (incoming) side.
Index ¶
- Variables
- type DurConn
- func (dc *DurConn) Close() error
- func (dc *DurConn) Publish(ctx context.Context, subject string, msgData []byte) error
- func (dc *DurConn) PublishAsync(ctx context.Context, subject string, msgData []byte, cb func(error)) error
- func (dc *DurConn) Subscribe(subject, queue string, handler nproto.MsgHandler, opts ...interface{}) error
- type Option
- func OptConnectCb(fn func(sc stan.Conn)) Option
- func OptContext(ctx context.Context) Option
- func OptDisconnectCb(fn func(sc stan.Conn)) Option
- func OptLogger(logger *zerolog.Logger) Option
- func OptMaxConcurrentResubscription(n int) Option
- func OptPings(interval, maxOut int) Option
- func OptPubAckWait(t time.Duration) Option
- func OptReconnectWait(t time.Duration) Option
- func OptSubjectPrefix(subjectPrefix string) Option
- func OptTaskRunner(runner taskrunner.TaskRunner) Option
- type SubOption
Constants ¶
This section is empty.
Variables ¶
var (
	// DefaultSubjectPrefix is the default value of OptSubjectPrefix.
	DefaultSubjectPrefix = "npmsg"

	// DefaultReconnectWait is the default value of OptReconnectWait.
	DefaultReconnectWait = 5 * time.Second

	// DefaultSubRetryWait is the default value of SubOptRetryWait.
	DefaultSubRetryWait = 5 * time.Second

	// DefaultPubAckWait is the default value of OptPubAckWait.
	DefaultPubAckWait = 2 * time.Second
)
var (
	// ErrNCMaxReconnect is returned if nc has MaxReconnects >= 0.
	ErrNCMaxReconnect = errors.New("nproto.stanmsg.NewDurConn: nats.Conn should have MaxReconnects < 0")

	// ErrClosed is returned when the DurConn has been closed.
	ErrClosed = errors.New("nproto.stanmsg.DurConn: Closed.")

	// ErrNotConnected is returned when the underlying connection has not been established.
	ErrNotConnected = errors.New("nproto.stanmsg.DurConn: Not connected.")

	// ErrBadSubscriptionOpt is returned if an option passed to DurConn.Subscribe is not a SubOption.
	ErrBadSubscriptionOpt = errors.New("nproto.stanmsg.DurConn: Expect stanmsg.SubOption.")

	// ErrDupSubscription is returned when a (subject, queue) pair is subscribed more than once.
	ErrDupSubscription = func(subject, queue string) error {
		return fmt.Errorf(
			"nproto.stanmsg.DurConn: Duplicated subscription (%q, %q)",
			subject, queue,
		)
	}
)
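The error variables come in two shapes: fixed sentinel values and, for ErrDupSubscription, a function that builds a new error on each call. The following is a minimal sketch of how a caller might tell them apart. It copies two of the declarations locally so that it compiles on its own, and `publish` is a hypothetical stand-in, not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of two declarations from the Variables section, so that this
// sketch compiles without importing the package.
var (
	ErrClosed = errors.New("nproto.stanmsg.DurConn: Closed.")

	ErrDupSubscription = func(subject, queue string) error {
		return fmt.Errorf(
			"nproto.stanmsg.DurConn: Duplicated subscription (%q, %q)",
			subject, queue,
		)
	}
)

// publish is a hypothetical stand-in for a call that fails once the
// connection has been closed. It wraps the sentinel to add context.
func publish(closed bool) error {
	if closed {
		return fmt.Errorf("publish failed: %w", ErrClosed)
	}
	return nil
}

func main() {
	// A sentinel value can be matched with errors.Is, even when wrapped.
	if err := publish(true); errors.Is(err, ErrClosed) {
		fmt.Println("connection closed:", err)
	}

	// ErrDupSubscription returns a distinct error value on every call, so
	// comparing two results with == is not a reliable check.
	fmt.Println(ErrDupSubscription("orders", "workers"))
}
```

Because ErrDupSubscription is a function rather than a value, it cannot be used as the target of errors.Is in the way the sentinel values can.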
Functions ¶
This section is empty.
Types ¶
type DurConn ¶
type DurConn struct {
// contains filtered or unexported fields
}
DurConn implements nproto.MsgAsyncPublisher and nproto.MsgSubscriber interfaces.
func NewDurConn ¶
NewDurConn creates a new DurConn. `nc` must have MaxReconnect < 0 set (i.e. always reconnect). `clusterID` is the NATS Streaming server's cluster ID.
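The reconnect requirement can be pictured as a simple precondition check. This is only a sketch of the rule as documented: `checkMaxReconnect` is a hypothetical helper that takes the setting as a plain integer, and the error value is a local copy of ErrNCMaxReconnect. How NewDurConn actually reads the setting from `nc` is not shown here.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copy of the documented error value.
var ErrNCMaxReconnect = errors.New("nproto.stanmsg.NewDurConn: nats.Conn should have MaxReconnects < 0")

// checkMaxReconnect is a hypothetical helper: it accepts only a negative
// setting, which the documentation describes as "always reconnect".
func checkMaxReconnect(maxReconnect int) error {
	if maxReconnect >= 0 {
		return ErrNCMaxReconnect
	}
	return nil
}

func main() {
	for _, n := range []int{-1, 0, 60} {
		fmt.Printf("MaxReconnect=%d: %v\n", n, checkMaxReconnect(n))
	}
}
```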
type Option ¶
Option is an option for creating a DurConn.
func OptConnectCb ¶
func OptConnectCb(fn func(sc stan.Conn)) Option
OptConnectCb sets a callback invoked each time a new stan.Conn is established.
func OptContext ¶
OptContext sets the base context for handlers.
func OptDisconnectCb ¶
func OptDisconnectCb(fn func(sc stan.Conn)) Option
OptDisconnectCb sets a callback invoked each time a stan.Conn is lost.
func OptMaxConcurrentResubscription ¶
OptMaxConcurrentResubscription sets the maximum number of concurrent resubscriptions.
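One common way to cap concurrent work in Go is a buffered channel used as a semaphore. The sketch below illustrates that general technique only. It is not taken from this package's implementation, and `runLimited` is a hypothetical name.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runLimited runs every task with at most n running at the same time and
// returns the highest number of tasks observed running concurrently.
// n must be at least 1.
func runLimited(n int, tasks []func()) int {
	sem := make(chan struct{}, n)
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		running int
		peak    int
	)
	for _, task := range tasks {
		wg.Add(1)
		sem <- struct{}{} // blocks while n tasks are in flight
		go func(task func()) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot after the task finishes

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			task()

			mu.Lock()
			running--
			mu.Unlock()
		}(task)
	}
	wg.Wait()
	return peak
}

func main() {
	tasks := make([]func(), 5)
	for i := range tasks {
		tasks[i] = func() { time.Sleep(10 * time.Millisecond) }
	}
	fmt.Println("peak concurrency:", runLimited(2, tasks))
}
```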
func OptReconnectWait ¶
OptReconnectWait sets the reconnection wait, i.e. the time between reconnection attempts.
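A fixed wait between attempts can be sketched as a loop that retries until it succeeds or is told to stop. The names `connectWithRetry`, `errStopped`, and the `connect` callback are hypothetical and chosen for illustration; the package's own reconnection logic may differ.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errStopped = errors.New("stopped before a connection was established")

// connectWithRetry calls connect until it succeeds, waiting for the given
// duration after each failure. It gives up when done is closed and returns
// the number of attempts made.
func connectWithRetry(done <-chan struct{}, wait time.Duration, connect func() error) (int, error) {
	attempts := 0
	for {
		attempts++
		if err := connect(); err == nil {
			return attempts, nil
		}
		select {
		case <-done:
			return attempts, errStopped
		case <-time.After(wait):
			// The wait has elapsed; try again.
		}
	}
}

func main() {
	calls := 0
	// flaky is a hypothetical dial function that fails twice, then succeeds.
	flaky := func() error {
		calls++
		if calls < 3 {
			return errors.New("server unavailable")
		}
		return nil
	}

	done := make(chan struct{}) // never closed in this example
	attempts, err := connectWithRetry(done, 10*time.Millisecond, flaky)
	fmt.Println("attempts:", attempts, "err:", err)
}
```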
func OptSubjectPrefix ¶
OptSubjectPrefix sets the message subject prefix. The default is "npmsg": if you publish a message with subject "xxx", the actual subject is "npmsg.xxx".
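The mapping from a subject to the actual subject can be sketched in one line. `fullSubject` is a hypothetical helper written for this example, and the "orders" prefix is made up; the only behavior taken from the documentation is that prefix and subject are joined with a dot.

```go
package main

import "fmt"

// fullSubject is a hypothetical helper that joins a prefix and a subject
// with ".", as in the "npmsg.xxx" example.
func fullSubject(prefix, subject string) string {
	return prefix + "." + subject
}

func main() {
	fmt.Println(fullSubject("npmsg", "xxx")) // prints "npmsg.xxx"

	// With a different prefix the same subject maps to a different actual
	// subject, which is what keeps two namespaces apart.
	fmt.Println(fullSubject("orders", "xxx")) // prints "orders.xxx"
}
```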
func OptTaskRunner ¶
func OptTaskRunner(runner taskrunner.TaskRunner) Option
OptTaskRunner sets the task runner for handlers. The runner will be closed when closing the DurConn.
type SubOption ¶
type SubOption func(*subscription) error
SubOption is an option for a subscription.
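SubOption follows the functional options pattern: each option is a function that modifies a subscription and may return an error. Since Subscribe accepts `opts ...interface{}`, each value also has to be checked for the right type. The sketch below shows the pattern under stated assumptions: the `subscription` struct, `withRetryWait`, `newSubscription`, and `errBadOpt` are hypothetical stand-ins, not the package's unexported code.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// subscription is a hypothetical stand-in for the package's unexported type.
type subscription struct {
	retryWait time.Duration
}

// SubOption mirrors the documented type.
type SubOption func(*subscription) error

// errBadOpt plays the role of ErrBadSubscriptionOpt in this sketch.
var errBadOpt = errors.New("expect SubOption")

// withRetryWait is a hypothetical option in the style of SubOptRetryWait.
func withRetryWait(d time.Duration) SubOption {
	return func(s *subscription) error {
		if d <= 0 {
			return errors.New("retry wait must be positive")
		}
		s.retryWait = d
		return nil
	}
}

// newSubscription starts from a default and applies the options in order,
// rejecting any value that is not a SubOption.
func newSubscription(opts ...interface{}) (*subscription, error) {
	s := &subscription{retryWait: 5 * time.Second}
	for _, o := range opts {
		opt, ok := o.(SubOption)
		if !ok {
			return nil, errBadOpt
		}
		if err := opt(s); err != nil {
			return nil, err
		}
	}
	return s, nil
}

func main() {
	s, err := newSubscription(withRetryWait(2 * time.Second))
	fmt.Println(s.retryWait, err)

	_, err = newSubscription("not an option")
	fmt.Println(err)
}
```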
func SubOptRetryWait ¶
SubOptRetryWait sets the wait time between (re)subscription attempts.
func SubOptSubscribeCb ¶
SubOptSubscribeCb sets a callback invoked each time a subscription is established. NOTE: The callback is also called on resubscription after a reconnection.