events

package
v0.6.3
Warning

This package is not in the latest version of its module.

Published: Mar 27, 2024 License: Apache-2.0 Imports: 11 Imported by: 9

README

Events

Package events provides an interface and methods to interact with an events stream broker.

The package provides methods to serialize and deserialize data sent on the stream as a pubsubx.Message, along with methods to parse the message URN through urnx.

Connect to a NATS Jetstream to publish messages.

The example below sets up a NATS stream broker with the parameters provided. The stream, consumer and subscription(s) are initialized when defined, based on the configuration.

	options := events.NatsOptions{
		AppName:                "foo",
		URL:                    "nats://nats:4222",
		StreamUser:             viper.GetString("nats.stream.user"),
		StreamPass:             viper.GetString("nats.stream.pass"),
		CredsFile:              viper.GetString("nats.creds.file"),
		...

		// Defining a stream will result in the stream being added if not present.
		Stream: &events.NatsStreamOptions{
			// Name of the stream to be created.
			Name:     viper.GetString("nats.stream.name"),

			// Subjects associated with the stream.
			Subjects: viper.GetStringSlice("nats.stream.subjects"),
		},

		// Defining a consumer will result in the consumer being added if not present.
		Consumer: &events.NatsConsumerOptions{
			// Pull indicates this is a pull based stream, subscriptions to it will be pull based.
			Pull: viper.GetBool("nats.stream.consumer.pull"),

			// Sets the durable consumer name. With a durable consumer name set,
			// the consumer is not ephemeral and is not removed once there are no subscribers.
			Name: viper.GetString("nats.stream.consumer.name"),

			....
		},
	}

	// initialize broker - validates the configuration and returns a Stream
	stream, err := events.NewStream(options)
	if err != nil {
		panic(err)
	}

	// Open connection - sets up required streams, consumers.
	if err := stream.Open(); err != nil {
		panic(err)
	}


	// Publish to the stream. The subject suffix and payload here are placeholders;
	// the caller is responsible for message addressing and data serialization.
	if err := stream.Publish(ctx, "server.create", payload); err != nil {
		panic(err)
	}


	// subscribe to the configured subjects, this returns a single channel.
	eventsCh, err := stream.Subscribe(ctx)
	if err != nil {
		panic(err)
	}

	for msg := range eventsCh {
		// Data returns the raw payload; deserializing it is left to the caller.
		data := msg.Data()
		_ = data

		// Subject returns the subject the message arrived on.
		subject := msg.Subject()
		_ = subject

		// ack the message
		if err := msg.Ack(); err != nil {
			panic(err)
		}
	}

Implementations

TODO(joel) : Link to implementations of this library.

Documentation

Overview

Package events provides methods to initialize and interface with an events broker.

Package events provides types and methods to interact with a messaging stream broker.

Index

Constants

This section is empty.

Variables

var (
	// ErrNatsConfig is returned when there is an error in the NATS Jetstream configuration.
	ErrNatsConfig = errors.New("error in NATs Jetstream configuration")

	// ErrNatsConn is returned when an error occurs in connecting to NATS.
	ErrNatsConn = errors.New("error opening nats connection")

	// ErrNatsJetstream is returned when an error occurs in setting up the NATS Jetstream context.
	ErrNatsJetstream = errors.New("error in NATS Jetstream")

	// ErrNatsJetstreamAddStream is returned when an attempt to add a NATS Jetstream fails.
	ErrNatsJetstreamAddStream = errors.New("error adding stream to NATS Jetstream")

	// ErrNatsJetstreamAddConsumer is returned when there's an error adding a consumer to the NATS Jetstream.
	ErrNatsJetstreamAddConsumer = errors.New("error adding consumer on NATS Jetstream")

	// ErrNatsJetstreamUpdateConsumer is returned when there's an error updating a consumer configuration on the NATS Jetstream.
	ErrNatsJetstreamUpdateConsumer = errors.New("error updating consumer configuration on NATS Jetstream")

	// ErrNatsMsgPull is returned when there's an error pulling a message from a NATS Jetstream.
	ErrNatsMsgPull = errors.New("error fetching message from NATS Jetstream")

	// ErrSubscription is returned when an error in the consumer subscription occurs.
	ErrSubscription = errors.New("error subscribing to stream")
)
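Callers typically branch on these sentinel values with errors.Is. The sketch below is self-contained and uses a local stand-in for ErrNatsConn; the open helper is hypothetical, and it is an assumption that the package wraps its sentinels in a way errors.Is can unwrap.

```go
package main

import (
	"errors"
	"fmt"
)

// errNatsConn is a local stand-in for events.ErrNatsConn so the sketch compiles on its own.
var errNatsConn = errors.New("error opening nats connection")

// open is a hypothetical function that fails and wraps the sentinel with extra detail.
func open(url string) error {
	return fmt.Errorf("%w: dial %s refused", errNatsConn, url)
}

// isConnError reports whether err is, or wraps, the connection sentinel.
func isConnError(err error) bool {
	return errors.Is(err, errNatsConn)
}

func main() {
	err := open("nats://nats:4222")
	if isConnError(err) {
		fmt.Println("connection problem:", err)
	}
}
```

Matching on the sentinel rather than on the error text keeps the check stable if the message wording changes.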

Functions

func AsNatsConnection

func AsNatsConnection(n *NatsJetstream) *nats.Conn

AsNatsConnection exposes the otherwise private NATS connection pointer. It is one of several conversions for functions and APIs that expect NATS primitive types. These let consumers of NatsJetstream convert easily to the types they need, without the package exporting its members or forcing direct clients/holders of NatsJetstream to do the conversion themselves.

func AsNatsJetStreamContext

func AsNatsJetStreamContext(n *NatsJetstream) nats.JetStreamContext

AsNatsJetStreamContext exposes the otherwise private NATS JetStreamContext.

func AsNatsMsg

func AsNatsMsg(m Message) (*nats.Msg, error)

AsNatsMsg exposes the underlying nats.Msg to a sophisticated consumer.

func MustNatsMsg

func MustNatsMsg(m Message) *nats.Msg

MustNatsMsg returns the underlying nats.Msg and panics if the type assertion fails.
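The As/Must pair follows a common Go pattern: one function reports a failed type assertion as an error, the other panics. The sketch below illustrates the pattern with local stand-in types only (message, rawMsg, natsMessage and otherMessage are all hypothetical); it is not the package's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a cut-down stand-in for the package's Message interface.
type message interface {
	Subject() string
}

// rawMsg stands in for *nats.Msg, the broker-specific type.
type rawMsg struct{ subject string }

// natsMessage stands in for a NATS-backed Message implementation.
type natsMessage struct{ msg *rawMsg }

func (n *natsMessage) Subject() string { return n.msg.subject }

// otherMessage is a message that is not backed by NATS.
type otherMessage struct{}

func (otherMessage) Subject() string { return "other" }

var errNotNats = errors.New("message is not NATS-backed")

// asRaw mirrors the shape of AsNatsMsg: it returns an error when the assertion fails.
func asRaw(m message) (*rawMsg, error) {
	nm, ok := m.(*natsMessage)
	if !ok {
		return nil, errNotNats
	}
	return nm.msg, nil
}

// mustRaw mirrors the shape of MustNatsMsg: it panics when the assertion fails.
func mustRaw(m message) *rawMsg {
	raw, err := asRaw(m)
	if err != nil {
		panic(err)
	}
	return raw
}

func main() {
	m := &natsMessage{msg: &rawMsg{subject: "foo.server.create"}}
	fmt.Println(mustRaw(m).subject)

	if _, err := asRaw(otherMessage{}); err != nil {
		fmt.Println("fallback path:", err)
	}
}
```

The error-returning form suits code that may receive messages from more than one broker; the panicking form suits code that only ever handles NATS messages.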

Types

type EventType

type EventType string

EventType is a type identifying the Event kind that has occurred on an object.

const (
	// Create action kind identifies objects that were created.
	Create EventType = "create"

	// Update action kind identifies objects that were updated.
	Update EventType = "update"

	// Delete action kind identifies objects that were removed.
	Delete EventType = "delete"
)

type Message

type Message interface {
	// Ack the message as processed on the stream.
	Ack() error

	// Nak the message as not processed on the stream.
	Nak() error

	// Term signals to the broker that the message processing has failed and the message
	// must not be redelivered.
	Term() error

	// InProgress resets the redelivery timer for the message on the stream
	// to indicate the message is being worked on.
	InProgress() error

	// Subject returns the message subject.
	Subject() string

	// Data returns the data contained in the message.
	Data() []byte

	// ExtractOtelTraceContext returns a context populated with the parent trace if any.
	ExtractOtelTraceContext(ctx context.Context) context.Context
}

Message interface defines the methods available on the messages received on the stream.

These methods are to be implemented by the stream broker for its messages.
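One way a subscriber might choose between Ack, Nak and Term is sketched below. The acker interface is a local subset of Message, and settle, errPermanent, errTransient and fakeMsg are hypothetical names introduced for illustration; the policy shown is an assumption, not something the package prescribes.

```go
package main

import (
	"errors"
	"fmt"
)

// acker is the subset of the Message interface this sketch needs.
type acker interface {
	Ack() error
	Nak() error
	Term() error
	Data() []byte
}

// Hypothetical error classes used to pick an acknowledgement.
var (
	errPermanent = errors.New("permanent failure")
	errTransient = errors.New("transient failure")
)

// settle runs process on the payload and acknowledges the message according to the outcome.
func settle(m acker, process func([]byte) error) error {
	err := process(m.Data())
	switch {
	case err == nil:
		return m.Ack() // processed successfully
	case errors.Is(err, errPermanent):
		return m.Term() // failed for good: must not be redelivered
	default:
		return m.Nak() // not processed: the broker may redeliver
	}
}

// fakeMsg records which acknowledgement was sent last.
type fakeMsg struct {
	data []byte
	last string
}

func (f *fakeMsg) Ack() error   { f.last = "ack"; return nil }
func (f *fakeMsg) Nak() error   { f.last = "nak"; return nil }
func (f *fakeMsg) Term() error  { f.last = "term"; return nil }
func (f *fakeMsg) Data() []byte { return f.data }

func main() {
	m := &fakeMsg{data: []byte("payload")}

	_ = settle(m, func([]byte) error { return nil })
	fmt.Println(m.last)

	_ = settle(m, func([]byte) error { return errTransient })
	fmt.Println(m.last)
}
```

For long-running work, a handler could additionally call InProgress periodically so the redelivery timer is reset while processing continues.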

type MsgCh

type MsgCh chan Message

MsgCh is a channel over which messages arrive when subscribed.
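A consumer of such a channel usually wants to stop when its context is cancelled as well as when the channel is closed. The sketch below shows that loop with local stand-ins (message, msgCh, fakeMsg, consume and demo are hypothetical); whether the package closes the channel on Close is not stated here, so the loop handles both cases.

```go
package main

import (
	"context"
	"fmt"
)

// message is a cut-down stand-in for the package's Message interface.
type message interface {
	Subject() string
}

// msgCh mirrors the MsgCh type: a channel of messages.
type msgCh chan message

// fakeMsg is a message whose subject is its own string value.
type fakeMsg string

func (f fakeMsg) Subject() string { return string(f) }

// consume calls handle for each message until the channel is closed or ctx is
// cancelled, and returns the number of messages handled.
func consume(ctx context.Context, ch msgCh, handle func(message)) int {
	handled := 0
	for {
		select {
		case <-ctx.Done():
			return handled
		case m, ok := <-ch:
			if !ok {
				return handled
			}
			handle(m)
			handled++
		}
	}
}

// demo feeds two messages through consume and returns how many were handled.
func demo() int {
	ch := make(msgCh, 2)
	ch <- fakeMsg("foo.server.create")
	ch <- fakeMsg("foo.server.delete")
	close(ch)

	return consume(context.Background(), ch, func(m message) {
		fmt.Println("received", m.Subject())
	})
}

func main() {
	fmt.Println("handled", demo())
}
```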

type NatsConsumerOptions

type NatsConsumerOptions struct {
	// Pull indicates this is a pull based subscriber
	Pull bool `mapstructure:"pull"`

	// Sets the durable consumer name
	Name string `mapstructure:"name"`

	// Sets the queue group for this consumer
	QueueGroup string `mapstructure:"queue_group"`

	AckWait time.Duration `mapstructure:"ack_wait"`

	MaxAckPending int `mapstructure:"max_ack_pending"`

	// Setting the FilterSubject turns this consumer into a push based consumer.
	// With no filter subject, the consumer is a pull based consumer.
	//
	// If the stream is a WorkQueue stream, however, this must be set
	// and should be unique between consumers on the stream.
	FilterSubject string `mapstructure:"filter_subject"`

	// Subscribe to these subjects through this consumer.
	SubscribeSubjects []string `mapstructure:"subscribe_subjects"`
}

NatsConsumerOptions holds the parameters for the NATS consumer configuration.

Note: NATS consumers are views into the stream; multiple subscribers may bind on a consumer.

type NatsJetstream

type NatsJetstream struct {
	// contains filtered or unexported fields
}

NatsJetstream wraps the NATS JetStream connector to implement the Stream interface.

func NewJetstreamFromConn

func NewJetstreamFromConn(c *nats.Conn) *NatsJetstream

NewJetstreamFromConn takes an already established NATS connection pointer and returns a NatsJetstream pointer.

func NewNatsBroker

func NewNatsBroker(params StreamParameters) (*NatsJetstream, error)

NewNatsBroker validates the given stream broker parameters and returns a stream broker implementation.

func (*NatsJetstream) Close

func (n *NatsJetstream) Close() error

Close drains any subscriptions and closes the NATS Jetstream connection.

func (*NatsJetstream) Open

func (n *NatsJetstream) Open() error

Open connects to the NATS Jetstream.

func (*NatsJetstream) Publish

func (n *NatsJetstream) Publish(ctx context.Context, subjectSuffix string, data []byte) error

Publish publishes an event onto the NATS Jetstream. The caller is responsible for message addressing and data serialization. NOTE: The subject passed here will be prepended with any configured PublisherSubjectPrefix.
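To make the prefixing note concrete, the sketch below shows one plausible way a full subject could be composed from a configured prefix and the subject suffix. NATS subjects are dot-separated tokens, but the exact joining rule used by the package is an assumption here, and fullSubject is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strings"
)

// fullSubject joins a configured prefix and a subject suffix with a "." separator.
// The separator and the handling of an empty prefix are assumptions of this sketch.
func fullSubject(prefix, suffix string) string {
	if prefix == "" {
		return suffix
	}
	return strings.TrimSuffix(prefix, ".") + "." + suffix
}

func main() {
	fmt.Println(fullSubject("com.example.events", "server.create"))
}
```

Under this assumption, callers pass only the suffix to Publish and leave the prefix to configuration.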

func (*NatsJetstream) PullMsg

func (n *NatsJetstream) PullMsg(_ context.Context, batch int) ([]Message, error)

PullMsg pulls up to the batch count of messages from each pull-based subscription to subjects on the stream.

func (*NatsJetstream) Subscribe

func (n *NatsJetstream) Subscribe(ctx context.Context) (MsgCh, error)

Subscribe subscribes to all configured SubscribeSubjects.

type NatsOptions

type NatsOptions struct {
	// URL is the NATS server URL
	URL string `mapstructure:"url"`

	// AppName is the name of the application connecting to the
	// NATS stream, this parameter is used to open the NATS connection
	// and bind as a durable consumer.
	AppName string `mapstructure:"app_name"`

	// NATS stream user, when no creds file is provided.
	StreamUser string `mapstructure:"stream_user"`

	// NATS stream pass, when no creds file is provided.
	StreamPass string `mapstructure:"stream_pass"`

	// NATS creds file
	CredsFile string `mapstructure:"creds_file"`

	// The subject prefix when publishing a message.
	PublisherSubjectPrefix string `mapstructure:"publisher_subject_prefix"`

	// URN Namespace to include in the published messages.
	StreamURNNamespace string `mapstructure:"stream_urn_ns"`

	// SubscribeSubjects when defined will result in the event broker subscribing to given streams.
	SubscribeSubjects []string `mapstructure:"subscribe_subjects"`

	// NATS connection timeout
	ConnectTimeout time.Duration `mapstructure:"connect_timeout"`

	// Setting Consumer parameters will cause a NATS consumer to be added.
	Consumer *NatsConsumerOptions `mapstructure:"consumer"`

	// Setting Stream parameters will cause a NATS stream to be added.
	Stream *NatsStreamOptions `mapstructure:"stream"`

	// KVReplicationFactor sets the number of copies in a NATS clustered environment
	KVReplicationFactor int `mapstructure:"kv_replication"`
}

NatsOptions holds the configuration parameters to setup NATS Jetstream.

type NatsStreamOptions

type NatsStreamOptions struct {
	// Name for the stream
	Name string `mapstructure:"name"`

	// Subjects allowed to publish on the stream
	Subjects []string `mapstructure:"subjects"`

	// Acknowledgements required for each msg
	//
	// https://docs.nats.io/using-nats/developer/develop_jetstream/model_deep_dive#acknowledgement-models
	Acknowledgements bool `mapstructure:"acknowledgements"`

	// DuplicateWindow, messages containing the same message ID will be
	// deduplicated in this time window.
	//
	// https://docs.nats.io/using-nats/developer/develop_jetstream/model_deep_dive#message-deduplication
	DuplicateWindow time.Duration `mapstructure:"duplicate_window"`

	// Retention is the message eviction criteria
	//
	// https://docs.nats.io/using-nats/developer/develop_jetstream/model_deep_dive#stream-limits-retention-and-policy
	Retention string `mapstructure:"retention"`
}

NatsStreamOptions are parameters to setup a NATS stream.

type ResourceType

type ResourceType string

ResourceType is the kind of the object included in the message.

type Stream

type Stream interface {
	// Open sets up the stream connection.
	Open() error

	// Publish publishes the message to the message broker.
	Publish(ctx context.Context, subject string, msg []byte) error

	// Subscribe subscribes to one or more subjects on the stream returning a message channel for subscribers to read from.
	Subscribe(ctx context.Context) (MsgCh, error)

	// PullMsg pulls up to batch count of messages from the stream through the pull based subscription.
	PullMsg(ctx context.Context, batch int) ([]Message, error)

	// Close closes the connection to the stream and unsubscribes any subscriptions.
	Close() error
}

Stream provides methods to interact with the event stream.

func NewStream

func NewStream(parameters StreamParameters) (Stream, error)

NewStream returns a Stream implementation.

type StreamParameters

type StreamParameters interface{}

StreamParameters is the configuration for the Stream broker. The interface is type asserted by the stream broker implementation.
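Because the parameter is an empty interface, the concrete type passed in decides which broker is built. The sketch below illustrates that dispatch with local stand-ins (streamParameters, natsOptions and brokerFor are hypothetical); whether the package asserts on a value or a pointer is not stated here, and this sketch assumes a value.

```go
package main

import (
	"errors"
	"fmt"
)

// streamParameters mirrors StreamParameters: an empty interface.
type streamParameters interface{}

// natsOptions is a cut-down stand-in for NatsOptions.
type natsOptions struct {
	URL     string
	AppName string
}

var errUnsupported = errors.New("unsupported stream parameters")

// brokerFor picks a broker by the dynamic type of params.
func brokerFor(params streamParameters) (string, error) {
	switch p := params.(type) {
	case natsOptions:
		return "nats broker for " + p.AppName, nil
	default:
		return "", fmt.Errorf("%w: %T", errUnsupported, params)
	}
}

func main() {
	name, err := brokerFor(natsOptions{URL: "nats://nats:4222", AppName: "foo"})
	fmt.Println(name, err)

	// A pointer is a different dynamic type and does not match the value case.
	_, err = brokerFor(&natsOptions{})
	fmt.Println(err)
}
```

The trade-off of this design is that a mismatched parameter type is only caught at run time, so checking the returned error from the constructor matters.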

Directories

Path Synopsis
internal
Package mock_events is a generated GoMock package.
pkg
kv
The registry package builds functionality for tracking live controller processes in a NATS KV store.
