natstream

package module
v0.0.0-...-5cfc6db Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 23, 2025 License: GPL-3.0 Imports: 5 Imported by: 0

README

A NATS JetStream publisher/consumer utility for Go

getting started

go get github.com/briannbig/natstream

Example usage

import (
	"context"
	"log"
	"os"
	"time"

	"github.com/briannbig/natstream"
	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

const (
	consumerDurableName = "notifications.consumer"
)

// main connects to NATS, creates the "notification-stream" stream through
// natstream, and registers a durable consumer whose messages are handled by
// svc.Notify.
func main() {
	// The same 30-second context bounds both stream creation and consumer
	// registration below.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Use NATS_URL when it is set; otherwise fall back to the client default.
	url := os.Getenv("NATS_URL")
	if url == "" {
		url = nats.DefaultURL
	}

	nc, err := nats.Connect(url)
	if err != nil {
		log.Fatalf("could not connect to queue --- %s", err.Error())
	}

	// initialize jetstream
	q, err := natstream.New(ctx, nc, natstream.QueueConfig{
		StreamName: "notification-stream",
		Subjects:   []string{"subject.one", "subject.two", "subject.three"},
		Storage:    jetstream.MemoryStorage,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer q.Close()

	svc := NotificationService()

	// Create consumer. RegisterConsumer reports failures through its error
	// return, so check it instead of carrying on without a consumer.
	if err := q.RegisterConsumer(ctx, natstream.ConsumerConfig{
		DurableName: consumerDurableName,
		AckPolicy:   jetstream.AckAllPolicy,
	}, svc.Notify); err != nil {
		log.Fatalf("could not register consumer --- %s", err.Error())
	}

	//... rest of your code

}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConsumerConfig

type ConsumerConfig struct {
	// DurableName is the consumer's durable name
	// (the README example uses "notifications.consumer").
	DurableName string
	// AckPolicy is the jetstream acknowledgement policy for the consumer,
	// e.g. jetstream.AckAllPolicy.
	AckPolicy jetstream.AckPolicy
	// MaxDeliver is presumably the maximum number of delivery attempts per
	// message — TODO confirm against RegisterConsumer.
	MaxDeliver int
	// FilterSubject presumably restricts the consumer to a single subject of
	// the stream — TODO confirm against RegisterConsumer.
	FilterSubject string
}

ConsumerConfig holds the configuration for a jetstream consumer

type Queue

type Queue struct {
	// Stream is the jetstream stream handle; presumably the stream created by
	// New — TODO confirm.
	Stream jetstream.Stream
	// Js is the jetstream.JetStream handle; presumably obtained from the
	// *nats.Conn passed to New — TODO confirm.
	Js jetstream.JetStream
}

Queue represents a jetstream queue

func New

func New(ctx context.Context, nc *nats.Conn, cfg QueueConfig) (*Queue, error)

New connects to jetstream with the given *nats.Conn and creates a jetstream.Stream using the stream name and subjects from cfg

func (*Queue) Close

func (q *Queue) Close() error

Close closes the jetstream connection

func (*Queue) Publish

func (q *Queue) Publish(ctx context.Context, subject string, data []byte) error

Publish publishes a message to the stream

func (Queue) RegisterConsumer

func (q Queue) RegisterConsumer(ctx context.Context, cfg ConsumerConfig, handler func(jetstream.Msg)) error

RegisterConsumer creates a nats jetstream consumer

type QueueConfig

type QueueConfig struct {
	// StreamName is the name of the stream (e.g. "notification-stream").
	StreamName string
	// Subjects lists the subjects for the stream
	// (e.g. "subject.one", "subject.two").
	Subjects []string
	// Storage is the jetstream storage type, e.g. jetstream.MemoryStorage.
	Storage jetstream.StorageType
}

QueueConfig holds the configuration for a jetstream queue

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL