Documentation ¶
Overview ¶
Package events provides methods to initializ§e and interface with an events broker.
Package events provides types and methods to interact with a messaging stream broker.
Index ¶
- Variables
- func AsNatsConnection(n *NatsJetstream) *nats.Conn
- func AsNatsJetStreamContext(n *NatsJetstream) nats.JetStreamContext
- func AsNatsMsg(m Message) (*nats.Msg, error)
- func MustNatsMsg(m Message) *nats.Msg
- type EventType
- type Message
- type MsgCh
- type NatsConsumerOptions
- type NatsJetstream
- func (n *NatsJetstream) Close() error
- func (n *NatsJetstream) Open() error
- func (n *NatsJetstream) Publish(ctx context.Context, subjectSuffix string, data []byte) error
- func (n *NatsJetstream) PullMsg(_ context.Context, batch int) ([]Message, error)
- func (n *NatsJetstream) Subscribe(ctx context.Context) (MsgCh, error)
- type NatsOptions
- type NatsStreamOptions
- type ResourceType
- type Stream
- type StreamParameters
Constants ¶
This section is empty.
Variables ¶
var ( // ErrNatsConfig is returned when the conf ErrNatsConfig = errors.New("error in NATs Jetstream configuration") // ErrNatsConn is returned when an error occurs in connecting to NATS. ErrNatsConn = errors.New("error opening nats connection") // ErrNatsJetstream is returned when an error occurs in setting up the NATS Jetstream context. ErrNatsJetstream = errors.New("error in NATS Jetstream") // ErrNatsJetstreamAddStream os returned when an attempt to add a NATS Jetstream fails. ErrNatsJetstreamAddStream = errors.New("error adding stream to NATS Jetstream") // ErrNatsJetstreamAddConsumer is returned when theres an error adding a consumer to the NATS Jetstream. ErrNatsJetstreamAddConsumer = errors.New("error adding consumer on NATS Jetstream") // ErrNatsJetstreamUpdateConsumer is returned when theres an error updating a consumer configuration on the NATS Jetstream. ErrNatsJetstreamUpdateConsumer = errors.New("error updating consumer configuration on NATS Jetstream") // ErrNatsMsgPull is returned when theres and error pulling a message from a NATS Jetstream. ErrNatsMsgPull = errors.New("error fetching message from NATS Jetstream") // ErrSubscription is returned when an error in the consumer subscription occurs. ErrSubscription = errors.New("error subscribing to stream") )
Functions ¶
func AsNatsConnection ¶
func AsNatsConnection(n *NatsJetstream) *nats.Conn
Add some conversions for functions/APIs that expect NATS primitive types. This allows consumers of NatsJetsream to convert easily to the types they need, without exporting the members or coercing and direct clients/holders of NatsJetstream to do this conversion. AsNatsConnection exposes the otherwise private NATS connection pointer
func AsNatsJetStreamContext ¶
func AsNatsJetStreamContext(n *NatsJetstream) nats.JetStreamContext
AsNatsJetstreamContext exposes the otherwise private NATS JetStreamContext
func MustNatsMsg ¶
func MustNatsMsg(m Message) *nats.Msg
MustNatsMsg will panic if the type assertion fails
Types ¶
type EventType ¶
type EventType string
EventType is a type identifying the Event kind that has occurred on an object.
type Message ¶
type Message interface { // Ack the message as processed on the stream. Ack() error // Nak the message as not processed on the stream. Nak() error // Term signals to the broker that the message processing has failed and the message // must not be redelivered. Term() error // InProgress resets the redelivery timer for the message on the stream // to indicate the message is being worked on. InProgress() error // Subject returns the message subject. Subject() string // Data returns the data contained in the message. Data() []byte // ExtractOtelTraceContext returns a context populated with the parent trace if any. ExtractOtelTraceContext(ctx context.Context) context.Context }
Message interface defines the methods available on the messages received on the stream.
These methods are to be implemented by the stream broker for its messages.
type NatsConsumerOptions ¶
type NatsConsumerOptions struct { // Pull indicates this is a pull based subscriber Pull bool `mapstructure:"pull"` // Sets the durable consumer name Name string `mapstructure:"name"` // Sets the queue group for this consumer QueueGroup string `mapstructure:"queue_group"` AckWait time.Duration `mapstructure:"ack_wait"` MaxAckPending int `mapstructure:"max_ack_pending"` // Setting the FilterSubject turns this consumer into a push based consumer, // With no filter subject, the consumer is a pull based consumer. // // Although if the stream is a WorkQueue stream, then this must be set // and should be unique between consumers on the stream. FilterSubject string `mapstructure:"filter_subject"` // Subscribe to these subjects through this consumer. SubscribeSubjects []string `mapstructure:"subscribe_subjects"` }
NatsConsumerOptions is the parameters for the NATS consumer configuration.
Note: Nats consumers are views into the stream, multiple subscribers may bind on a consumer.
type NatsJetstream ¶
type NatsJetstream struct {
// contains filtered or unexported fields
}
NatsJetstream wraps the NATs JetStream connector to implement the Stream interface.
func NewJetstreamFromConn ¶
func NewJetstreamFromConn(c *nats.Conn) *NatsJetstream
NewJetstreamFromConn takes an already established NATS connection pointer and returns a NatsJetstream pointer
func NewNatsBroker ¶
func NewNatsBroker(params StreamParameters) (*NatsJetstream, error)
NewNatsBroker validates the given stream broker parameters and returns a stream broker implementation.
func (*NatsJetstream) Close ¶
func (n *NatsJetstream) Close() error
Close drains any subscriptions and closes the NATS Jetstream connection.
func (*NatsJetstream) Open ¶
func (n *NatsJetstream) Open() error
Open connects to the NATS Jetstream.
func (*NatsJetstream) Publish ¶
Publish publishes an event onto the NATS Jetstream. The caller is responsible for message addressing and data serialization. NOTE: The subject passed here will be prepended with any configured PublisherSubjectPrefix.
type NatsOptions ¶
type NatsOptions struct { // URL is the NATS server URL URL string `mapstructure:"url"` // AppName is the name of the application connecting to the // NATS stream, this parameter is used to open the NATS connection // and bind as a durable consumer. AppName string `mapstructure:"app_name"` // NATS stream user, when no creds file is provided. StreamUser string `mapstructure:"stream_user"` // NATS stream pass, when no creds file is provided. StreamPass string `mapstructure:"stream_pass"` // NATS creds file CredsFile string `mapstructure:"creds_file"` // The subject prefix when publishing a message. PublisherSubjectPrefix string `mapstructure:"publisher_subject_prefix"` // URN Namespace to include in the published messages. StreamURNNamespace string `mapstructure:"stream_urn_ns"` // SubscribeSubjects when defined will result in the event broker subscribing to given streams. SubscribeSubjects []string `mapstructure:"subscribe_subjects"` // NATS connection timeout ConnectTimeout time.Duration `mapstructure:"connect_timeout"` // Setting Consumer parameters will cause a NATS consumer to be added. Consumer *NatsConsumerOptions `mapstructure:"consumer"` // Setting Stream parameters will cause a NATS stream to be added. Stream *NatsStreamOptions `mapstructure:"stream"` // KVReplicationFactor sets the number of copies in a NATS clustered environment KVReplicationFactor int `mapstructure:"kv_replication"` }
NatsOptions holds the configuration parameters to setup NATS Jetstream.
type NatsStreamOptions ¶
type NatsStreamOptions struct { // Name for the stream Name string `mapstructure:"name"` // Subjects allowed to publish on the stream Subjects []string `mapstructure:"subjects"` // Acknowledgements required for each msg // // https://docs.nats.io/using-nats/developer/develop_jetstream/model_deep_dive#acknowledgement-models Acknowledgements bool `mapstructure:"acknowledgements"` // DuplicateWindow, messages containing the same message ID will be // deduplicated in this time window. // // https://docs.nats.io/using-nats/developer/develop_jetstream/model_deep_dive#message-deduplication DuplicateWindow time.Duration `mapstructure:"duplicate_window"` // Retention is the message eviction criteria // // https://docs.nats.io/using-nats/developer/develop_jetstream/model_deep_dive#stream-limits-retention-and-policy Retention string `mapstructure:"retention"` }
NatsStreamOptions are parameters to setup a NATS stream.
type ResourceType ¶
type ResourceType string
ResourceType is the kind of the object included the message.
type Stream ¶
type Stream interface { // Open sets up the stream connection. Open() error // Publish publishes the message to the message broker. Publish(ctx context.Context, subject string, msg []byte) error // Subscribe subscribes to one or more subjects on the stream returning a message channel for subscribers to read from. Subscribe(ctx context.Context) (MsgCh, error) // PullMsg pulls upto batch count of messages from the stream through the pull based subscription. PullMsg(ctx context.Context, batch int) ([]Message, error) // Closes the connection to the stream, along with unsubscribing any subscriptions. Close() error }
Stream provides methods to interact with the event stream.
func NewStream ¶
func NewStream(parameters StreamParameters) (Stream, error)
NewStream returns a Stream implementation.
type StreamParameters ¶
type StreamParameters interface{}
StreamParameters is the configuration for the Stream broker, the interface is type asserted by the stream broker implementation.
Directories ¶
Path | Synopsis |
---|---|
internal
|
|
Package mock_events is a generated GoMock package.
|
Package mock_events is a generated GoMock package. |
pkg
|
|
The registry package builds functionality for tracking live controller processes in a NATS KV store.
|
The registry package builds functionality for tracking live controller processes in a NATS KV store. |