Documentation ¶
Overview ¶
Package stanmsg contains a msg implementation using nats-streaming-server as transport.
Index ¶
- Variables
- func NewPbJsonPublisher(dc *DurConn) MsgAsyncPublisherFunc
- func NewPbJsonSubscriber(dc *DurConn) MsgSubscriberFunc
- type DurConn
- type DurConnOption
- func DCOptConnectCb(cb func(stan.Conn)) DurConnOption
- func DCOptContext(ctx context.Context) DurConnOption
- func DCOptDisconnectCb(cb func(stan.Conn)) DurConnOption
- func DCOptLogger(logger logr.Logger) DurConnOption
- func DCOptReconnectWait(t time.Duration) DurConnOption
- func DCOptRunner(runner taskrunner.TaskRunner) DurConnOption
- func DCOptStanPingInterval(interval int) DurConnOption
- func DCOptStanPingMaxOut(maxOut int) DurConnOption
- func DCOptStanPubAckWait(t time.Duration) DurConnOption
- func DCOptSubRetryWait(t time.Duration) DurConnOption
- func DCOptSubjectPrefix(subjectPrefix string) DurConnOption
- func DCOptSubscribeCb(cb func(stan.Conn, MsgSpec)) DurConnOption
- type SubOption
Constants ¶
This section is empty.
Variables ¶
var ( // ErrNCMaxReconnect is returned if nc has MaxReconnects >= 0. ErrNCMaxReconnect = errors.New("stanmsg.DurConn nats.Conn should have MaxReconnects < 0") // ErrClosed is returned when DurConn is closed ErrClosed = errors.New("stanmsg.DurConn closed") // ErrDupSubscription is returned when (subjectName, queue) already subscribed. ErrDupSubscription = errors.New("stanmsg.DurConn duplicated subscription") // ErrNotConnected is returned when stan connection has not connected yet. ErrNotConnected = errors.New("stanmsg.DurConn not connected") )
var ( // DefaultSubjectPrefix is the default value of DCOptSubjectPrefix. DefaultSubjectPrefix = "stanmsg" // DefaultReconnectWait is the default value of DCOptReconnectWait. DefaultReconnectWait = 5 * time.Second // DefaultStanPingInterval is the default value of DCOptStanPingInterval. DefaultStanPingInterval = stan.DefaultPingInterval // DefaultStanPingMaxOut is the default value of DCOptStanPingMaxOut. DefaultStanPingMaxOut = stan.DefaultPingMaxOut // DefaultStanPubAckWait is the default value of DCOptStanPubAckWait. DefaultStanPubAckWait = 2 * time.Second // DefaultSubRetryWait is the default value of DCOptSubRetryWait. DefaultSubRetryWait = 5 * time.Second )
Functions ¶
func NewPbJsonPublisher ¶
func NewPbJsonPublisher(dc *DurConn) MsgAsyncPublisherFunc
NewPbJsonPublisher creates a msg publisher using protobuf or json for encoding:
- If msg is proto.Message, then use protobuf.
- Otherwise use json.
func NewPbJsonSubscriber ¶
func NewPbJsonSubscriber(dc *DurConn) MsgSubscriberFunc
NewPbJsonSubscriber creates a msg subscriber using protobuf or json for decoding.
Types ¶
type DurConn ¶
type DurConn struct {
// contains filtered or unexported fields
}
DurConn is a 'durable connection' to nats-streaming-server which handles reconnection and resubscription automatically.
func NewDurConn ¶
func NewDurConn(nc *nats.Conn, clusterID string, opts ...DurConnOption) (durConn *DurConn, err error)
NewDurConn creates a new DurConn. `nc` must have MaxReconnect < 0 (i.e. never give up trying to reconnect). `clusterID` is the nats-streaming-server's cluster id.
func (*DurConn) Close ¶
func (dc *DurConn) Close()
Close shuts down the DurConn: it closes the handler runner and disconnects from nats-streaming-server.
func (*DurConn) NewPublisher ¶
NewPublisher creates a publisher using the specified encoder.
func (*DurConn) NewSubscriber ¶
NewSubscriber creates a subscriber using the specified decoder.
type DurConnOption ¶
DurConnOption is an option for creating a DurConn.
func DCOptConnectCb ¶
func DCOptConnectCb(cb func(stan.Conn)) DurConnOption
DCOptConnectCb sets the callback invoked when the nats streaming connection is established.
func DCOptContext ¶
func DCOptContext(ctx context.Context) DurConnOption
DCOptContext sets base context for handlers.
func DCOptDisconnectCb ¶
func DCOptDisconnectCb(cb func(stan.Conn)) DurConnOption
DCOptDisconnectCb sets the callback invoked when the nats streaming connection is lost due to unexpected errors.
func DCOptLogger ¶
func DCOptLogger(logger logr.Logger) DurConnOption
DCOptLogger sets logger for DurConn.
func DCOptReconnectWait ¶
func DCOptReconnectWait(t time.Duration) DurConnOption
DCOptReconnectWait sets the interval between reconnections.
func DCOptRunner ¶
func DCOptRunner(runner taskrunner.TaskRunner) DurConnOption
DCOptRunner sets runner for handlers.
func DCOptStanPingInterval ¶
func DCOptStanPingInterval(interval int) DurConnOption
DCOptStanPingInterval sets the ping interval of stan::Pings; it must be >= 1 (seconds).
func DCOptStanPingMaxOut ¶
func DCOptStanPingMaxOut(maxOut int) DurConnOption
DCOptStanPingMaxOut sets the ping max-out of stan::Pings; it must be >= 2.
func DCOptStanPubAckWait ¶
func DCOptStanPubAckWait(t time.Duration) DurConnOption
DCOptStanPubAckWait sets stan::PubAckWait.
func DCOptSubRetryWait ¶
func DCOptSubRetryWait(t time.Duration) DurConnOption
DCOptSubRetryWait sets the interval between resubscriptions due to subscription error.
func DCOptSubjectPrefix ¶
func DCOptSubjectPrefix(subjectPrefix string) DurConnOption
DCOptSubjectPrefix sets subject prefix in nats streaming namespace.
func DCOptSubscribeCb ¶
func DCOptSubscribeCb(cb func(stan.Conn, MsgSpec)) DurConnOption
DCOptSubscribeCb sets the callback invoked when a subscription is established.