Documentation ¶
Index ¶
- type Config
- type MsgCon
- func (mc *MsgCon) Config() Config
- func (mc *MsgCon) Connect() error
- func (mc *MsgCon) Connected() bool
- func (mc MsgCon) DefaultDurablePubOptions() []nats.PubOpt
- func (mc MsgCon) DefaultDurableSubOptions() []nats.SubOpt
- func (mc *MsgCon) Disconnect() error
- func (mc *MsgCon) Disconnected() bool
- func (mc *MsgCon) Encoder() nats.Encoder
- func (mc *MsgCon) JetStreamContext() *nats.JetStreamContext
- func (mc *MsgCon) NatsContext() *nats.Conn
- func (mc *MsgCon) Options() options
- func (mc MsgCon) Publish(subject string, payload interface{}) error
- func (mc MsgCon) PublishDurable(subject string, payload interface{}) (*nats.PubAck, error)
- func (mc MsgCon) Request(subject string, payload interface{}, timeout time.Duration) (*nats.Msg, error)
- func (mc *MsgCon) SetOptions(opts ...Option) error
- func (mc MsgCon) SubscribeAsync(subject string, handler nats.MsgHandler) (*nats.Subscription, error)
- func (mc MsgCon) SubscribeDurableAsync(streamName string, consumerName string, handler nats.MsgHandler) (*nats.Subscription, error)
- func (mc MsgCon) SubscribeDurableSync(streamName string, consumerName string) (*nats.Subscription, error)
- func (mc MsgCon) SubscribeSync(subject string) (*nats.Subscription, error)
- type Option
- func WithAdditionalJetStreamOptions(additionalJetStreamOptions ...nats.JSOpt) Option
- func WithAdditionalNatsOptions(additionalNatsOptions ...nats.Option) Option
- func WithEncoder(encoder nats.Encoder) Option
- func WithRequiredConsumers(streamName string, requiredConsumers ...nats.ConsumerConfig) Option
- func WithRequiredStreams(requiredStreams ...nats.StreamConfig) Option
- type PersistentConfigurationAction
Types ¶
type Config ¶
type Config struct {
	ServerURL string `default:"nats://127.0.0.1:4222" split_words:"true" desc:"The url of the NATS server"`
	RequiredStreamsConfigurationAction PersistentConfigurationAction `` /* 147-byte string literal not displayed */
	RequiredConsumersConfigurationAction PersistentConfigurationAction `` /* 149-byte string literal not displayed */
}
Config is the configuration used by a messaging connector. These settings are intended to be set by the operators deploying the application. For options set by the calling application see Options. It is designed to be processed by https://github.com/kelseyhightower/envconfig but can also be filled in manually.
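As a rough illustration of filling the configuration by hand, the sketch below mirrors only the ServerURL field in a local stand-in type and reads it from the environment with the standard library. The variable name SERVER_URL is an assumption (the name envconfig would typically derive from split_words when no prefix is used), and the two action fields are left out because their tags are not displayed above.

```go
package main

import (
	"fmt"
	"os"
)

// config is a local stand-in for the package's Config type.
// Only ServerURL is mirrored here.
type config struct {
	ServerURL string
}

// defaultServerURL is the default taken from the struct tag shown above.
const defaultServerURL = "nats://127.0.0.1:4222"

// loadConfig fills config from a lookup function shaped like os.LookupEnv.
// The variable name SERVER_URL is an assumption, not confirmed by the source.
func loadConfig(lookup func(string) (string, bool)) config {
	cfg := config{ServerURL: defaultServerURL}
	if v, ok := lookup("SERVER_URL"); ok && v != "" {
		cfg.ServerURL = v
	}
	return cfg
}

func main() {
	fmt.Println(loadConfig(os.LookupEnv).ServerURL)
}
```

Passing the lookup function in, rather than calling os.LookupEnv directly, keeps the loader easy to exercise without touching the process environment.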
type MsgCon ¶
type MsgCon struct {
// contains filtered or unexported fields
}
MsgCon is a messaging connector able to connect to the messaging infrastructure
func NewMessagingConnector ¶
NewMessagingConnector creates a new messaging connector with the given config
It does not yet actually connect to the messaging infrastructure. Use Connect for that.
func (*MsgCon) Connect ¶
Connect actually connects this messaging connector instance to the messaging infrastructure.
To customize the connection please call SetOptions.
This method can only be called once per messaging connector. Returns an error when called a second time.
func (MsgCon) DefaultDurablePubOptions ¶
func (mc MsgCon) DefaultDurablePubOptions() []nats.PubOpt
DefaultDurablePubOptions returns some sensible default PubOpts for the JetStream Publish Functions
func (MsgCon) DefaultDurableSubOptions ¶
func (mc MsgCon) DefaultDurableSubOptions() []nats.SubOpt
DefaultDurableSubOptions returns some sensible default SubOpts for the JetStream Subscribe functions
func (*MsgCon) Disconnect ¶
Disconnect this messaging connector. After this is called, there is no way to reconnect with this messaging connector.
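The lifecycle described for Connect and Disconnect can be modelled as a small one-way state machine. The sketch below is a hypothetical stand-in that tracks state only; it is not the package's implementation. In particular, returning an error from Disconnect when the connector is not connected is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// state models the documented lifecycle: new -> connected -> disconnected.
type state int

const (
	stateNew state = iota
	stateConnected
	stateDisconnected
)

// connector is a hypothetical stand-in for MsgCon that only tracks lifecycle state.
type connector struct {
	st state
}

// Connect succeeds only on the first call.
func (c *connector) Connect() error {
	if c.st != stateNew {
		return errors.New("connect may only be called once")
	}
	c.st = stateConnected
	return nil
}

// Disconnect is final: the connector cannot be connected again afterwards.
// Rejecting the call when not connected is an assumption of this sketch.
func (c *connector) Disconnect() error {
	if c.st != stateConnected {
		return errors.New("not connected")
	}
	c.st = stateDisconnected
	return nil
}

func main() {
	c := &connector{}
	fmt.Println(c.Connect())    // first call succeeds
	fmt.Println(c.Connect())    // second call is rejected
	fmt.Println(c.Disconnect()) // ends the lifecycle
	fmt.Println(c.Connect())    // reconnecting is rejected
}
```

A caller that needs a fresh connection after Disconnect would create a new connector rather than reuse the old one.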
func (*MsgCon) Disconnected ¶
func (*MsgCon) JetStreamContext ¶
func (mc *MsgCon) JetStreamContext() *nats.JetStreamContext
func (*MsgCon) NatsContext ¶
func (mc *MsgCon) NatsContext() *nats.Conn
func (MsgCon) Publish ¶
Publish publishes a message with the given payload on the given subject.
The payload may be anything that can be encoded with the selected encoder.
This method does not use JetStream, so no QoS is guaranteed. For reliable messaging see PublishDurable.
Returns an error when encoding fails.
For more information on the behavior, parameters, and return value see nats.Conn.Publish
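To make the encoding-failure case concrete, here is a minimal sketch of an encode-then-send step. It assumes a JSON encoder (the encoder actually selected may differ), and publish and send are hypothetical helpers; only the shape of the Encode call follows the nats.Encoder interface.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// encodeFunc has the shape of the Encode method on nats.Encoder.
type encodeFunc func(subject string, v interface{}) ([]byte, error)

// jsonEncode is an assumed JSON encoder; the encoder actually selected may differ.
func jsonEncode(_ string, v interface{}) ([]byte, error) {
	return json.Marshal(v)
}

// publish encodes the payload and hands the bytes to send.
// When encoding fails it returns the error without sending anything.
func publish(encode encodeFunc, send func(subject string, data []byte) error, subject string, payload interface{}) error {
	data, err := encode(subject, payload)
	if err != nil {
		return fmt.Errorf("encode payload for %q: %w", subject, err)
	}
	return send(subject, data)
}

func main() {
	send := func(subject string, data []byte) error {
		fmt.Printf("%s <- %s\n", subject, data)
		return nil
	}
	// A map of ints is encodable as JSON.
	fmt.Println(publish(jsonEncode, send, "orders.created", map[string]int{"id": 7}))
	// A channel is not, so publish returns an error and send is never called.
	fmt.Println(publish(jsonEncode, send, "orders.created", make(chan int)))
}
```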
func (MsgCon) PublishDurable ¶
PublishDurable publishes a message with the given payload on the given subject.
The payload may be anything that can be encoded with the selected encoder.
This method uses JetStream, so the messages are durable. However, you must make sure the streams and consumers are set up correctly.
Returns an error when encoding fails.
For more information on the behavior, parameters, and return value see nats.JetStreamContext.Publish
func (MsgCon) Request ¶
func (mc MsgCon) Request(subject string, payload interface{}, timeout time.Duration) (*nats.Msg, error)
Request will do a request with the given payload to the given subject and wait the given amount of time for the response.
The payload may be anything that can be encoded with the selected encoder.
This method does not use JetStream, so no QoS is guaranteed. Request-reply is not supported by JetStream. Reliability in request-reply can be achieved by retrying the request on failure.
Returns an error when encoding fails.
For more information on the behavior, parameters, and return value see nats.Conn.Request
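The retry advice above can be sketched as a small helper. requestWithRetry, requestFunc and errTimeout are hypothetical names introduced for illustration; in real use the function passed in would wrap a call such as mc.Request(subject, payload, timeout).

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTimeout simulates a failed request in this sketch.
var errTimeout = errors.New("simulated timeout")

// requestFunc stands in for a call such as mc.Request(subject, payload, timeout).
type requestFunc func() ([]byte, error)

// requestWithRetry calls do up to attempts times and sleeps backoff between failures.
// It returns the first successful response, or the last error wrapped with context.
func requestWithRetry(do requestFunc, attempts int, backoff time.Duration) ([]byte, error) {
	if attempts < 1 {
		attempts = 1
	}
	var lastErr error
	for i := 0; i < attempts; i++ {
		resp, err := do()
		if err == nil {
			return resp, nil
		}
		lastErr = err
		if i < attempts-1 {
			time.Sleep(backoff)
		}
	}
	return nil, fmt.Errorf("request failed after %d attempts: %w", attempts, lastErr)
}

func main() {
	calls := 0
	// flaky fails twice before answering.
	flaky := func() ([]byte, error) {
		calls++
		if calls < 3 {
			return nil, errTimeout
		}
		return []byte("pong"), nil
	}
	resp, err := requestWithRetry(flaky, 5, 10*time.Millisecond)
	fmt.Println(string(resp), err, calls)
}
```

Because a retried request may reach the responder more than once, the responder's handling should be safe to repeat.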
func (*MsgCon) SetOptions ¶
SetOptions sets the given options on the messaging connector.
Options are meant to be set by the calling application. For configuration supplied during deployment see Config.
Options set will be overwritten, not merged.
This method must be called before calling Connect. Returns an error if the messaging connector is already connected.
Example setting some options:

	mc := NewMessagingConnector(...)
	mc.SetOptions(WithRequiredStreams(...), WithAdditionalNatsOptions(...), ...)
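The "overwritten, not merged" behavior can be illustrated with a stand-alone version of the functional options pattern. Everything here is hypothetical: the options fields, the withEncoderName and withStreams helpers, and the "json" default are assumptions, since the real option set is unexported.

```go
package main

import (
	"errors"
	"fmt"
)

// options is a hypothetical options bag; the real fields are unexported.
type options struct {
	encoderName string
	streams     []string
}

// Option mirrors the documented shape: an interface with an unexported method.
type Option interface {
	apply(*options)
}

type optionFunc func(*options)

func (f optionFunc) apply(o *options) { f(o) }

func withEncoderName(name string) Option {
	return optionFunc(func(o *options) { o.encoderName = name })
}

func withStreams(names ...string) Option {
	return optionFunc(func(o *options) { o.streams = names })
}

// connector is a stand-in holding only what this sketch needs.
type connector struct {
	opts      options
	connected bool
}

// SetOptions starts from defaults on every call, so a later call
// replaces everything an earlier call set instead of merging with it.
func (c *connector) SetOptions(opts ...Option) error {
	if c.connected {
		return errors.New("options must be set before Connect")
	}
	fresh := options{encoderName: "json"} // assumed default
	for _, opt := range opts {
		opt.apply(&fresh)
	}
	c.opts = fresh
	return nil
}

func main() {
	c := &connector{}
	_ = c.SetOptions(withStreams("ORDERS"))
	_ = c.SetOptions(withEncoderName("gob"))
	// The streams from the first call are gone after the second call.
	fmt.Printf("%+v\n", c.opts)
}
```

Under this model, all options an application needs should be passed in a single SetOptions call.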
func (MsgCon) SubscribeAsync ¶
func (mc MsgCon) SubscribeAsync(subject string, handler nats.MsgHandler) (*nats.Subscription, error)
SubscribeAsync subscribes to a subject, retrieving messages asynchronously.
To decode the payload use MsgCon.Encoder.Decode.
This method does not use JetStream, so no QoS is guaranteed. For reliable messaging see SubscribeDurableAsync.
For more information on the behavior, parameters, and return value see nats.Conn.Subscribe
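Decoding a received payload might look like the sketch below. The decoder interface copies the shape of the Decode method on nats.Encoder; jsonDecoder, orderCreated and handle are hypothetical names, and JSON is an assumed encoding. In a real handler the subject and bytes would come from the Subject and Data fields of the *nats.Msg.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// decoder has the shape of the Decode method on nats.Encoder.
type decoder interface {
	Decode(subject string, data []byte, vPtr interface{}) error
}

// jsonDecoder is an assumed JSON implementation used only for this sketch.
type jsonDecoder struct{}

func (jsonDecoder) Decode(_ string, data []byte, vPtr interface{}) error {
	return json.Unmarshal(data, vPtr)
}

// orderCreated is a hypothetical payload type.
type orderCreated struct {
	ID int `json:"id"`
}

// handle decodes raw message bytes the way a subscription handler might.
func handle(dec decoder, subject string, data []byte) (orderCreated, error) {
	var evt orderCreated
	if err := dec.Decode(subject, data, &evt); err != nil {
		return orderCreated{}, fmt.Errorf("decode %q: %w", subject, err)
	}
	return evt, nil
}

func main() {
	evt, err := handle(jsonDecoder{}, "orders.created", []byte(`{"id":7}`))
	fmt.Println(evt.ID, err)
}
```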
func (MsgCon) SubscribeDurableAsync ¶
func (mc MsgCon) SubscribeDurableAsync(streamName string, consumerName string, handler nats.MsgHandler) (*nats.Subscription, error)
SubscribeDurableAsync durably subscribes to a subject, retrieving messages asynchronously. It uses the preconfigured consumer with the given name on the stream with the given name. The subject is defined by the configuration of the stream.
To decode the payload use MsgCon.Encoder.Decode.
This method uses JetStream, so the messages are durable. However, you must make sure the streams and consumers are set up correctly.
For more information on the behavior, parameters, and return value see nats.JetStreamContext.Subscribe
func (MsgCon) SubscribeDurableSync ¶
func (mc MsgCon) SubscribeDurableSync(streamName string, consumerName string) (*nats.Subscription, error)
SubscribeDurableSync durably subscribes to a subject, retrieving messages synchronously. It uses the preconfigured consumer with the given name on the stream with the given name. The subject is defined by the configuration of the stream.
To decode the payload use MsgCon.Encoder.Decode.
This method uses JetStream, so the messages are durable. However, you must make sure the streams and consumers are set up correctly.
For more information on the behavior, parameters, and return value see nats.JetStreamContext.SubscribeSync
func (MsgCon) SubscribeSync ¶
SubscribeSync subscribes to a subject, retrieving messages synchronously.
To decode the payload use MsgCon.Encoder.Decode.
This method does not use JetStream, so no QoS is guaranteed. For reliable messaging see SubscribeDurableSync.
For more information on the behavior, parameters, and return value see nats.Conn.SubscribeSync
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
Option is one option that can be set for a messaging connector
func WithAdditionalJetStreamOptions ¶
func WithAdditionalJetStreamOptions(additionalJetStreamOptions ...nats.JSOpt) Option
WithAdditionalJetStreamOptions returns the corresponding Option for passing the given additional options to the nats.Conn.JetStream call
func WithAdditionalNatsOptions ¶
func WithAdditionalNatsOptions(additionalNatsOptions ...nats.Option) Option
WithAdditionalNatsOptions returns the corresponding Option for passing the given additional options to the nats.Connect call
func WithEncoder ¶
func WithEncoder(encoder nats.Encoder) Option
WithEncoder returns the corresponding Option for using the given encoder for encoding payloads
func WithRequiredConsumers ¶
WithRequiredConsumers returns the corresponding Option indicating that consumers with the given config should be configured for the given stream during Connect.
Consumers are part of JetStream, the protocol on top of NATS with durability guarantees. Consumers should be properly configured when using one of the durable subscribe functions.
This will only overwrite the list of required consumers for the given stream name.
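One way to picture the per-stream overwrite is a map keyed by stream name. The sketch below is an assumption about the bookkeeping, not the package's actual data structure, and it uses plain consumer names in place of nats.ConsumerConfig values.

```go
package main

import "fmt"

// requiredConsumers maps a stream name to the consumers required on it.
// Consumer names stand in for nats.ConsumerConfig values in this sketch.
type requiredConsumers map[string][]string

// set replaces the list for one stream and leaves other streams untouched.
func (rc requiredConsumers) set(stream string, consumers ...string) {
	rc[stream] = consumers
}

func main() {
	rc := requiredConsumers{}
	rc.set("ORDERS", "billing", "shipping")
	rc.set("AUDIT", "archiver")
	// Setting ORDERS again replaces only the ORDERS list.
	rc.set("ORDERS", "billing")
	fmt.Println(len(rc["ORDERS"]), len(rc["AUDIT"]))
}
```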
func WithRequiredStreams ¶
func WithRequiredStreams(requiredStreams ...nats.StreamConfig) Option
WithRequiredStreams returns the corresponding Option indicating that streams with the given config should be configured during Connect.
Streams are part of JetStream, the protocol on top of NATS with durability guarantees. Streams should be properly configured when using one of the durable publish or subscribe functions.
type PersistentConfigurationAction ¶
type PersistentConfigurationAction string
PersistentConfigurationAction is a type indicating what to do with configuration options persisted in the NATS infrastructure
const (
	// Do not configure instances of this option ever
	DoNotTouch PersistentConfigurationAction = "DoNotTouch"
	// Create required instances of this option when they are not present but do not update existing instances
	CreateIfMissing PersistentConfigurationAction = "CreateIfMissing"
	// Always update the configuration of the required instances of this option
	AlwaysUpdate PersistentConfigurationAction = "AlwaysUpdate"
)
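The three actions can be read as a decision over whether a required stream or consumer already exists. The sketch below uses a local copy of the type and a hypothetical decide helper. Creating a missing instance under AlwaysUpdate is an assumption; the comments above only say that the configuration is always updated.

```go
package main

import "fmt"

// action is a local copy of PersistentConfigurationAction for this sketch.
type action string

const (
	doNotTouch      action = "DoNotTouch"
	createIfMissing action = "CreateIfMissing"
	alwaysUpdate    action = "AlwaysUpdate"
)

// decide returns the operation to perform for one required stream or consumer.
func decide(a action, exists bool) string {
	switch a {
	case createIfMissing:
		if !exists {
			return "create"
		}
		return "skip"
	case alwaysUpdate:
		if exists {
			return "update"
		}
		return "create" // assumption: a missing instance is created
	default:
		// doNotTouch and unknown values never modify anything.
		return "skip"
	}
}

func main() {
	for _, a := range []action{doNotTouch, createIfMissing, alwaysUpdate} {
		fmt.Println(a, decide(a, false), decide(a, true))
	}
}
```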
func (*PersistentConfigurationAction) Decode ¶
func (action *PersistentConfigurationAction) Decode(value string) error
func (PersistentConfigurationAction) IsValid ¶
func (a PersistentConfigurationAction) IsValid() bool
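Decode has the signature that envconfig's Decoder interface expects, so a value can presumably be parsed straight from an environment variable. The sketch below shows one plausible pairing of IsValid and Decode on a local copy of the type. The case-sensitive matching and the error text are assumptions.

```go
package main

import "fmt"

// action is a local copy of PersistentConfigurationAction for this sketch.
type action string

const (
	doNotTouch      action = "DoNotTouch"
	createIfMissing action = "CreateIfMissing"
	alwaysUpdate    action = "AlwaysUpdate"
)

// IsValid reports whether a is one of the three known actions.
func (a action) IsValid() bool {
	switch a {
	case doNotTouch, createIfMissing, alwaysUpdate:
		return true
	}
	return false
}

// Decode parses value into a and rejects anything that is not a known action.
// On failure the receiver is left unchanged.
func (a *action) Decode(value string) error {
	candidate := action(value)
	if !candidate.IsValid() {
		return fmt.Errorf("invalid configuration action %q", value)
	}
	*a = candidate
	return nil
}

func main() {
	var a action
	fmt.Println(a.Decode("CreateIfMissing"), a)
	// Matching is case-sensitive in this sketch, so this call returns an error.
	fmt.Println(a.Decode("createifmissing"), a)
}
```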