nats

package
v1.0.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 9, 2021 License: MIT Imports: 10 Imported by: 14

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewStanConnection

func NewStanConnection(config *StanConnConfig) (stan.Conn, error)

Types

type GobMarshaler

// GobMarshaler is a marshaler that uses Gob to marshal Watermill messages.
// It provides both a Marshal and an Unmarshal method.
type GobMarshaler struct{}

GobMarshaler is a marshaler that uses Gob to marshal Watermill messages.

func (GobMarshaler) Marshal

func (GobMarshaler) Marshal(topic string, msg *message.Message) ([]byte, error)

func (GobMarshaler) Unmarshal

func (GobMarshaler) Unmarshal(stanMsg *stan.Msg) (*message.Message, error)

type Marshaler

// Marshaler marshals Watermill messages to the stan (NATS Streaming) format.
type Marshaler interface {
	// Marshal encodes msg for the given topic and returns the resulting
	// bytes, or an error if the message could not be encoded.
	Marshal(topic string, msg *message.Message) ([]byte, error)
}

type MarshalerUnmarshaler

// MarshalerUnmarshaler groups the Marshaler and Unmarshaler interfaces,
// for implementations that can convert messages in both directions.
type MarshalerUnmarshaler interface {
	Marshaler
	Unmarshaler
}

type StanConnConfig

// StanConnConfig holds the parameters used by NewStanConnection to open
// a NATS Streaming connection.
type StanConnConfig struct {
	// ClusterID is the NATS Streaming cluster ID.
	ClusterID string

	// ClientID is the NATS Streaming client ID to connect with.
	// ClientID can contain only alphanumeric and `-` or `_` characters.
	//
	// When a subscription uses a DurableName, the NATS Streaming server tracks
	// the last acknowledged message for that ClientID + DurableName.
	ClientID string

	// StanOptions are custom []stan.Option passed to the connection.
	// It is also used to provide connection parameters, for example:
	// 		stan.NatsURL("nats://localhost:4222")
	StanOptions []stan.Option
}

type StreamingPublisher

// StreamingPublisher publishes Watermill messages to NATS Streaming.
// Create one with NewStreamingPublisher or NewStreamingPublisherWithStanConn.
type StreamingPublisher struct {
	// contains filtered or unexported fields
}

func NewStreamingPublisher

func NewStreamingPublisher(config StreamingPublisherConfig, logger watermill.LoggerAdapter) (*StreamingPublisher, error)

NewStreamingPublisher creates a new StreamingPublisher.

When using a custom NATS hostname, pass it via the StreamingPublisherConfig.StanOptions field:

// ...
StanOptions: []stan.Option{
	stan.NatsURL("nats://your-nats-hostname:4222"),
}
// ...

func NewStreamingPublisherWithStanConn

func NewStreamingPublisherWithStanConn(conn stan.Conn, config StreamingPublisherPublishConfig, logger watermill.LoggerAdapter) (*StreamingPublisher, error)

func (StreamingPublisher) Close

func (p StreamingPublisher) Close() error

func (StreamingPublisher) Publish

func (p StreamingPublisher) Publish(topic string, messages ...*message.Message) error

Publish publishes messages to NATS.

Publish will not return until an ack has been received from NATS Streaming. When delivery of one of the messages fails, the function is interrupted.

type StreamingPublisherConfig

// StreamingPublisherConfig is the configuration accepted by NewStreamingPublisher.
type StreamingPublisherConfig struct {
	// ClusterID is the NATS Streaming cluster ID.
	ClusterID string

	// ClientID is the NATS Streaming client ID to connect with.
	// ClientID can contain only alphanumeric and `-` or `_` characters.
	ClientID string

	// StanOptions are custom []stan.Option passed to the connection.
	// It is also used to provide connection parameters, for example:
	// 		stan.NatsURL("nats://your-nats-hostname:4222")
	StanOptions []stan.Option

	// Marshaler is the marshaler used to marshal messages to the stan format.
	Marshaler Marshaler
}

func (StreamingPublisherConfig) GetStreamingPublisherPublishConfig

func (c StreamingPublisherConfig) GetStreamingPublisherPublishConfig() StreamingPublisherPublishConfig

func (StreamingPublisherConfig) Validate

func (c StreamingPublisherConfig) Validate() error

type StreamingPublisherPublishConfig

// StreamingPublisherPublishConfig is the publish-related configuration
// accepted by NewStreamingPublisherWithStanConn, for use with an existing
// stan.Conn.
type StreamingPublisherPublishConfig struct {
	// Marshaler is the marshaler used to marshal messages to the stan format.
	Marshaler Marshaler
}

type StreamingSubscriber

// StreamingSubscriber subscribes to messages from NATS Streaming.
// Create one with NewStreamingSubscriber or NewStreamingSubscriberWithStanConn.
type StreamingSubscriber struct {
	// contains filtered or unexported fields
}

func NewStreamingSubscriber

func NewStreamingSubscriber(config StreamingSubscriberConfig, logger watermill.LoggerAdapter) (*StreamingSubscriber, error)

NewStreamingSubscriber creates a new StreamingSubscriber.

When using a custom NATS hostname, pass it via the StreamingSubscriberConfig.StanOptions field:

// ...
StanOptions: []stan.Option{
	stan.NatsURL("nats://your-nats-hostname:4222"),
}
// ...

func NewStreamingSubscriberWithStanConn

func NewStreamingSubscriberWithStanConn(conn stan.Conn, config StreamingSubscriberSubscriptionConfig, logger watermill.LoggerAdapter) (*StreamingSubscriber, error)

func (*StreamingSubscriber) Close

func (s *StreamingSubscriber) Close() error

func (*StreamingSubscriber) Subscribe

func (s *StreamingSubscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)

Subscribe subscribes to messages from NATS Streaming.

Subscribe will spawn SubscribersCount goroutines, each making a subscription.

func (*StreamingSubscriber) SubscribeInitialize

func (s *StreamingSubscriber) SubscribeInitialize(topic string) (err error)

type StreamingSubscriberConfig

// StreamingSubscriberConfig is the configuration accepted by NewStreamingSubscriber.
// It carries both the connection parameters and the subscription parameters.
type StreamingSubscriberConfig struct {
	// ClusterID is the NATS Streaming cluster ID.
	ClusterID string

	// ClientID is the NATS Streaming client ID to connect with.
	// ClientID can contain only alphanumeric and `-` or `_` characters.
	//
	// Using DurableName causes the NATS Streaming server to track
	// the last acknowledged message for that ClientID + DurableName.
	ClientID string

	// QueueGroup is the NATS Streaming queue group.
	//
	// All subscriptions with the same queue name (regardless of the connection they originate from)
	// will form a queue group. Each message will be delivered to only one subscriber per queue group,
	// using queuing semantics.
	//
	// It is recommended to set it together with DurableName.
	// For non-durable queue subscribers, when the last member leaves the group,
	// that group is removed. A durable queue group (DurableName) allows you to have all members leave
	// but still maintain state. When a member re-joins, it starts at the last position in that group.
	//
	// When QueueGroup is empty, a subscription without a queue group will be used.
	QueueGroup string

	// DurableName is the NATS Streaming durable name.
	//
	// Subscriptions may also specify a "durable name" which will survive client restarts.
	// Durable subscriptions cause the server to track the last acknowledged message
	// sequence number for a client and durable name. When the client restarts/resubscribes,
	// and uses the same client ID and durable name, the server will resume delivery beginning
	// with the earliest unacknowledged message for this durable subscription.
	//
	// Doing this causes the NATS Streaming server to track
	// the last acknowledged message for that ClientID + DurableName.
	DurableName string

	// SubscribersCount determines how many concurrent subscribers should be started.
	SubscribersCount int

	// CloseTimeout determines how long the subscriber will wait for Ack/Nack on close.
	// When no Ack/Nack is received after CloseTimeout, the subscriber will be closed.
	CloseTimeout time.Duration

	// AckWaitTimeout determines how long the subscriber should wait for Ack/Nack.
	// When no Ack/Nack was received, the message will be redelivered.
	// It is mapped to the stan.AckWait option.
	AckWaitTimeout time.Duration

	// StanOptions are custom []stan.Option passed to the connection.
	// It is also used to provide connection parameters, for example:
	// 		stan.NatsURL("nats://localhost:4222")
	StanOptions []stan.Option

	// StanSubscriptionOptions are custom []stan.SubscriptionOption passed to the subscription.
	StanSubscriptionOptions []stan.SubscriptionOption

	// Unmarshaler is the unmarshaler used to unmarshal messages from the NATS format to the Watermill format.
	Unmarshaler Unmarshaler
}

func (*StreamingSubscriberConfig) GetStreamingSubscriberSubscriptionConfig

func (c *StreamingSubscriberConfig) GetStreamingSubscriberSubscriptionConfig() StreamingSubscriberSubscriptionConfig

type StreamingSubscriberSubscriptionConfig

// StreamingSubscriberSubscriptionConfig is the subscription-related configuration
// accepted by NewStreamingSubscriberWithStanConn, for use with an existing stan.Conn.
type StreamingSubscriberSubscriptionConfig struct {
	// StanSubscriptionOptions are custom []stan.SubscriptionOption passed to the subscription.
	StanSubscriptionOptions []stan.SubscriptionOption

	// Unmarshaler is the unmarshaler used to unmarshal messages from the NATS format to the Watermill format.
	Unmarshaler Unmarshaler
	// QueueGroup is the NATS Streaming queue group.
	//
	// All subscriptions with the same queue name (regardless of the connection they originate from)
	// will form a queue group. Each message will be delivered to only one subscriber per queue group,
	// using queuing semantics.
	//
	// It is recommended to set it together with DurableName.
	// For non-durable queue subscribers, when the last member leaves the group,
	// that group is removed. A durable queue group (DurableName) allows you to have all members leave
	// but still maintain state. When a member re-joins, it starts at the last position in that group.
	//
	// When QueueGroup is empty, a subscription without a queue group will be used.
	QueueGroup string

	// DurableName is the NATS Streaming durable name.
	//
	// Subscriptions may also specify a "durable name" which will survive client restarts.
	// Durable subscriptions cause the server to track the last acknowledged message
	// sequence number for a client and durable name. When the client restarts/resubscribes,
	// and uses the same client ID and durable name, the server will resume delivery beginning
	// with the earliest unacknowledged message for this durable subscription.
	//
	// Doing this causes the NATS Streaming server to track
	// the last acknowledged message for that ClientID + DurableName.
	DurableName string

	// SubscribersCount determines how many concurrent subscribers should be started.
	SubscribersCount int

	// AckWaitTimeout determines how long the subscriber should wait for Ack/Nack.
	// When no Ack/Nack was received, the message will be redelivered.
	// It is mapped to the stan.AckWait option.
	AckWaitTimeout time.Duration
	// CloseTimeout determines how long the subscriber will wait for Ack/Nack on close.
	// When no Ack/Nack is received after CloseTimeout, the subscriber will be closed.
	CloseTimeout time.Duration
}

func (*StreamingSubscriberSubscriptionConfig) Validate

type Unmarshaler

// Unmarshaler unmarshals messages from the NATS (stan) format to the Watermill format.
type Unmarshaler interface {
	// Unmarshal decodes the given stan message and returns the resulting
	// Watermill message, or an error if the message could not be decoded.
	Unmarshal(*stan.Msg) (*message.Message, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL