natty

package module
v0.0.18

This package is not in the latest version of its module.
Published: May 26, 2022 License: MIT Imports: 11 Imported by: 0

README

natty

An opinionated NATS JetStream client wrapper library for Go.

Used by plumber and other Batch applications.

Why

NATS allows you to tweak a lot of things: push or pull streams, durable or ephemeral consumers, and all kinds of other settings.

This library uses ONLY durable consumers and provides a two-method API to interact with your NATS deployment:

  • Consume(ctx context.Context, cfg *ConsumerConfig, cb func(ctx context.Context, msg *nats.Msg) error) error
  • Publish(ctx context.Context, subject string, data []byte)

Consume() blocks and has to be cancelled via the context. You can also set an optional error channel (ConsumerConfig.ErrorCh) that the lib will write to when the callback func runs into an error.

Publish() is nothing fancy.

New() will perform the connect, create the stream and consumer.

TLS NATS

The NATS server started via docker-compose is configured to use TLS (with keys and certs located in ./assets/*).

TLS is enabled purely to ensure that the library works with it.

Documentation

Constants

const (
	DefaultMaxMsgs           = 10_000
	DefaultFetchSize         = 100
	DefaultFetchTimeout      = time.Second * 1
	DefaultDeliverPolicy     = nats.DeliverLastPolicy
	DefaultSubBatchSize      = 256
	DefaultWorkerIdleTimeout = time.Minute
	DefaultPublishTimeout    = time.Second * 5 // TODO: figure out a good value for this
)

Variables

var (
	ErrEmptyStreamName   = errors.New("StreamName cannot be empty")
	ErrEmptyConsumerName = errors.New("ConsumerName cannot be empty")
	ErrEmptySubject      = errors.New("Subject cannot be empty")
)

Functions

func GenerateTLSConfig

func GenerateTLSConfig(caCertFile, clientKeyFile, clientCertFile string, tlsSkipVerify bool) (*tls.Config, error)
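
The package does not document what GenerateTLSConfig does internally. As a rough guide, a helper with the same parameter order is commonly built from `crypto/tls` and `crypto/x509` as sketched below. `buildTLSConfig` is a hypothetical name, the file paths are placeholders, and the `MinVersion` choice is an assumption rather than something the source states.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// buildTLSConfig is a hypothetical helper, not natty's GenerateTLSConfig.
// It loads a CA bundle and a client key pair from PEM files.
func buildTLSConfig(caCertFile, clientKeyFile, clientCertFile string, skipVerify bool) (*tls.Config, error) {
	caPEM, err := os.ReadFile(caCertFile)
	if err != nil {
		return nil, fmt.Errorf("read CA cert: %w", err)
	}

	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("no certificates found in CA file")
	}

	// Note the argument order: tls.LoadX509KeyPair takes the certificate
	// file first and the key file second.
	cert, err := tls.LoadX509KeyPair(clientCertFile, clientKeyFile)
	if err != nil {
		return nil, fmt.Errorf("load client key pair: %w", err)
	}

	return &tls.Config{
		RootCAs:            pool,
		Certificates:       []tls.Certificate{cert},
		InsecureSkipVerify: skipVerify,       // disables server certificate checks
		MinVersion:         tls.VersionTLS12, // assumption, not taken from the source
	}, nil
}

func main() {
	// Placeholder paths; with missing files the helper returns an error.
	_, err := buildTLSConfig("ca.pem", "client-key.pem", "client-cert.pem", false)
	fmt.Println("error:", err)
}
```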

Types

type Config

type Config struct {
	// NatsURL defines the NATS URLs the library will attempt to connect to. If
	// the first URL fails, we will try to connect to the next one. Only fail if
	// all URLs fail.
	NatsURL []string

	// MaxMsgs defines the maximum number of messages a stream will contain.
	MaxMsgs int64

	// FetchSize defines the number of messages to fetch from the stream during
	// a single Fetch() call.
	FetchSize int

	// FetchTimeout defines how long a Fetch() call will wait to attempt to reach
	// defined FetchSize before continuing.
	FetchTimeout time.Duration

	// DeliverPolicy defines the policy the library will use to deliver messages.
	// Default: DeliverLastPolicy which will deliver from the last message that
	// the consumer has seen.
	DeliverPolicy nats.DeliverPolicy

	// Logger allows you to inject a logger into the library. Optional.
	Logger Logger

	// Whether to use TLS
	UseTLS bool

	// TLS CA certificate file
	TLSCACertFile string

	// TLS client certificate file
	TLSClientCertFile string

	// TLS client key file
	TLSClientKeyFile string

	// Do not perform server certificate checks
	TLSSkipVerify bool

	// PublishBatchSize is how many messages to async publish at once
	// Default: 256
	PublishBatchSize int

	// ServiceShutdownContext is used by main() to shut down services before application termination
	ServiceShutdownContext context.Context

	// MainShutdownFunc is triggered by watchForShutdown() after all publisher queues are exhausted
	// and is used to trigger shutdown of APIs and then main()
	MainShutdownFunc context.CancelFunc

	// WorkerIdleTimeout determines how long to keep a publish worker alive if no activity
	WorkerIdleTimeout time.Duration

	// PublishTimeout is how long to wait for a batch of async publish calls to be ACK'd
	PublishTimeout time.Duration

	// PublishErrorCh will receive any errors that occur during async publishing
	PublishErrorCh chan *PublishError
}

type ConsumerConfig

type ConsumerConfig struct {
	// Subject is the subject to consume off of a stream
	Subject string

	// StreamName is the name of JS stream to consume from.
	// This should first be created with CreateStream()
	StreamName string

	// ConsumerName is the consumer that was made with CreateConsumer()
	ConsumerName string

	// Looper is optional, if none is provided, one will be created
	Looper director.Looper

	// ErrorCh is used to retrieve any errors returned while consuming
	// (e.g. when the callback func returns an error).
	// If nil, errors will only be logged
	ErrorCh chan error
}

ConsumerConfig is used to pass configuration options to Consume()

type INatty

type INatty interface {
	// Consume subscribes to given subject and executes callback every time a
	// message is received. Consumed messages must be explicitly ACK'd or NAK'd.
	//
	// This is a blocking call; cancellation should be performed via the context.
	Consume(ctx context.Context, cfg *ConsumerConfig, cb func(ctx context.Context, msg *nats.Msg) error) error

	// Publish publishes a single message with the given subject; this method
	// will perform automatic batching as configured during `natty.New(..)`
	Publish(ctx context.Context, subject string, data []byte)

	// DeletePublisher shuts down a publisher and deletes it from the internal publisherMap
	DeletePublisher(ctx context.Context, id string) bool

	// CreateStream creates a new stream if it does not exist
	CreateStream(ctx context.Context, name string, subjects []string) error

	// DeleteStream deletes an existing stream
	DeleteStream(ctx context.Context, name string) error

	// CreateConsumer creates a new consumer if it does not exist
	CreateConsumer(ctx context.Context, streamName, consumerName string, filterSubject ...string) error

	// DeleteConsumer deletes an existing consumer
	DeleteConsumer(ctx context.Context, consumerName, streamName string) error

	// Get will fetch the value for a given bucket and key. Will NOT auto-create
	// bucket if it does not exist.
	Get(ctx context.Context, bucket string, key string) ([]byte, error)

	// Put will put a new value for a given bucket and key. Will auto-create
	// the bucket if it does not already exist.
	Put(ctx context.Context, bucket string, key string, data []byte, ttl ...time.Duration) error

	// Delete will delete a key from a given bucket. Will no-op if the bucket
	// or key does not exist.
	Delete(ctx context.Context, bucket string, key string) error

	// DeleteBucket will delete the specified bucket
	DeleteBucket(ctx context.Context, bucket string) error
}

type KeyValueMap

type KeyValueMap struct {
	// contains filtered or unexported fields
}

func (*KeyValueMap) Delete

func (k *KeyValueMap) Delete(key string)

Delete functionality is not used because there is no way to list buckets in NATS

func (*KeyValueMap) Get

func (k *KeyValueMap) Get(key string) (nats.KeyValue, bool)

func (*KeyValueMap) Put

func (k *KeyValueMap) Put(key string, value nats.KeyValue)

type Logger

type Logger interface {
	// Debug sends out a debug message with the given arguments to the logger.
	Debug(args ...interface{})
	// Debugf formats a debug message using the given arguments and sends it to the logger.
	Debugf(format string, args ...interface{})
	// Info sends out an informational message with the given arguments to the logger.
	Info(args ...interface{})
	// Infof formats an informational message using the given arguments and sends it to the logger.
	Infof(format string, args ...interface{})
	// Warn sends out a warning message with the given arguments to the logger.
	Warn(args ...interface{})
	// Warnf formats a warning message using the given arguments and sends it to the logger.
	Warnf(format string, args ...interface{})
	// Error sends out an error message with the given arguments to the logger.
	Error(args ...interface{})
	// Errorf formats an error message using the given arguments and sends it to the logger.
	Errorf(format string, args ...interface{})
}

Logger is the common interface for user-provided loggers.

type Mode

type Mode int

type Natty

type Natty struct {
	*Config
	// contains filtered or unexported fields
}

func New

func New(cfg *Config) (*Natty, error)

func (*Natty) Consume

func (n *Natty) Consume(ctx context.Context, cfg *ConsumerConfig, f func(ctx context.Context, msg *nats.Msg) error) error

Consume will create a durable consumer and consume messages from the configured stream

func (*Natty) CreateConsumer

func (n *Natty) CreateConsumer(ctx context.Context, streamName, consumerName string, filterSubject ...string) error

func (*Natty) CreateStream

func (n *Natty) CreateStream(ctx context.Context, name string, subjects []string) error

func (*Natty) Delete

func (n *Natty) Delete(ctx context.Context, bucket string, key string) error

func (*Natty) DeleteBucket

func (n *Natty) DeleteBucket(_ context.Context, bucket string) error

func (*Natty) DeleteConsumer

func (n *Natty) DeleteConsumer(ctx context.Context, consumerName, streamName string) error

func (*Natty) DeletePublisher

func (n *Natty) DeletePublisher(ctx context.Context, topic string) bool

DeletePublisher will stop the batch publisher goroutine and remove the publisher from the shared publisher map.

It is safe to call this if a publisher for the topic does not exist.

Returns a bool which indicates whether the publisher existed.

func (*Natty) DeleteStream

func (n *Natty) DeleteStream(ctx context.Context, name string) error

func (*Natty) Get

func (n *Natty) Get(ctx context.Context, bucket string, key string) ([]byte, error)

func (*Natty) Publish

func (n *Natty) Publish(ctx context.Context, subject string, value []byte)

func (*Natty) Put

func (n *Natty) Put(ctx context.Context, bucket string, key string, data []byte, keyTTL ...time.Duration) error

Put puts a key/val into a bucket and will create the bucket if it doesn't already exist. TTL is optional; if provided, only the first value will be used.

type NoOpLogger

type NoOpLogger struct {
}

NoOpLogger is a do-nothing logger; it is used internally as the default Logger when none is provided in the Config.

func (*NoOpLogger) Debug

func (l *NoOpLogger) Debug(args ...interface{})

Debug is a no-op implementation of Logger's Debug.

func (*NoOpLogger) Debugf

func (l *NoOpLogger) Debugf(format string, args ...interface{})

Debugf is a no-op implementation of Logger's Debugf.

func (*NoOpLogger) Error

func (l *NoOpLogger) Error(args ...interface{})

Error is a no-op implementation of Logger's Error.

func (*NoOpLogger) Errorf

func (l *NoOpLogger) Errorf(format string, args ...interface{})

Errorf is a no-op implementation of Logger's Errorf.

func (*NoOpLogger) Info

func (l *NoOpLogger) Info(args ...interface{})

Info is a no-op implementation of Logger's Info.

func (*NoOpLogger) Infof

func (l *NoOpLogger) Infof(format string, args ...interface{})

Infof is a no-op implementation of Logger's Infof.

func (*NoOpLogger) Warn

func (l *NoOpLogger) Warn(args ...interface{})

Warn is a no-op implementation of Logger's Warn.

func (*NoOpLogger) Warnf

func (l *NoOpLogger) Warnf(format string, args ...interface{})

Warnf is a no-op implementation of Logger's Warnf.

type PublishError

type PublishError struct {
	Subject string
	Message error
}

PublishError is a wrapper struct used to return errors that occur during async batch publishes to the calling code.

type Publisher

type Publisher struct {
	Subject     string
	QueueMutex  *sync.RWMutex
	Queue       []*message
	Natty       *Natty
	IdleTimeout time.Duration

	// ErrorCh is optional. It will receive async publish errors if specified
	// Otherwise errors will only be logged
	ErrorCh chan *PublishError

	// PublisherContext is used to close a specific publisher
	PublisherContext context.Context

	// PublisherCancel is used to cancel a specific publisher's context
	PublisherCancel context.CancelFunc

	// ServiceShutdownContext is used by main() to shut down services before application termination
	ServiceShutdownContext context.Context
	// contains filtered or unexported fields
}
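
The Queue field, together with Config.PublishBatchSize, suggests that queued messages are drained in fixed-size batches. The worker itself is unexported, so the following is only a sketch of that idea. `nextBatch` is a hypothetical helper, and plain byte slices stand in for the unexported message type.

```go
package main

import "fmt"

// nextBatch is a hypothetical helper that splits off up to size items from
// the front of the queue and returns the batch plus the remaining queue.
// A non-positive size takes everything.
func nextBatch(queue [][]byte, size int) (batch, rest [][]byte) {
	if size <= 0 || size > len(queue) {
		size = len(queue)
	}
	return queue[:size], queue[size:]
}

func main() {
	queue := [][]byte{[]byte("a"), []byte("b"), []byte("c"), []byte("d"), []byte("e")}

	// Drain the queue two messages at a time.
	for len(queue) > 0 {
		var batch [][]byte
		batch, queue = nextBatch(queue, 2)
		fmt.Printf("publishing batch of %d, %d left\n", len(batch), len(queue))
	}
}
```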
