nats

package module
v0.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 28, 2025 License: MIT Imports: 10 Imported by: 0

README

NATS Bus

Go Reference Go Report Card License

A wrapper around NATS JetStream providing simple helpers for publishing, subscribing, and stream management.

Features

  • Publish data as []byte, string, or any Go struct (auto JSON marshal).
  • EnsureStream to create streams on the fly if they don’t exist.
  • Subscribe with durable consumers, manual ack, and error handling.
  • Graceful shutdown with connection draining and close.

Installation

go get github.com/your-org/your-nats-bus

Quick Start

Setup
package main

import (
	"context"
	"time"
	"log"

	"github.com/ose-micro/core/logger"
	"github.com/ose-micro/core/tracing"
)

func main() {
	logs, err := logger.NewZap(logger.Config{})
	if err != nil {
		log.Fatal(err)
	}

	tracer, err := tracing.NewOtel(tracing.Config{
		Endpoint:    "nats://localhost:4222",
		ServiceName: "Nats",
		SampleRatio: 1.0,
	}, logs)
	if err != nil {
		log.Fatal(err)
	}

	bus, err := New(Config{
		Url:          "nats://turntable.proxy.rlwy.net:58598",
		Name:         "Ose Nats",
		User:         "nats",
		Password:     "supersecret",
		Timeout:      2 * time.Second,
		MaxReconnect: 5,
	}, log, tracer)
	if err != nil {
		log.Fatal(err)
	}

	events := []string{"events.*"}

	if err = bus.EnsureStream("EVENTS", events...); err != nil {
		log.Fatal(err)
	}

	if err := bus.Publish("events.created", map[string]any{
		"id":   "123",
		"name": "New Event",
	}); err != nil {
		log.Fatal(err)
	}

	if err := bus.Subscribe("events.created", "ose", func(ctx context.Context, data any) error {
		log.Printf("📩 received event: %#v", data)
		return nil
	}); err != nil {
		log.Fatal(err)
	}

	if err = bus.Close(); err != nil {
		log.Fatal(err)
	}
}

API

Publish(subject string, data any) error

Publish a message to a subject.
Supports []byte, string, or any struct (auto-marshaled to JSON).


EnsureStream(name string, subjects ...string) error

EnsureStream(name string, subjects ...string) error
Ensure a JetStream stream exists with given name and subjects. If the stream already exists, it does nothing.

Subscribe(subject string, durable string, handler func(ctx context.Context, data any) error) error

Subscribe to a subject with a durable consumer.

  • Messages are JSON-unmarshaled if possible, otherwise passed as string.
  • Successful handler → message is Ack()ed.
  • Handler error → message is Nak()ed for retry.

Close() error

Gracefully drain and close the NATS connection.


Example Output

✅ Created stream EVENTS with subjects [events.*]
INFO  Subscribed to subject subject=events.created durable=worker-1
📩 received event: map[string]interface {}{"id":"123", "name":"New Event"}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func New

func New(conf Config, log logger.Logger, tracer tracing.Tracer) (domain.Bus, error)

Types

type Config

type Config struct {
	Url          string        `mapstructure:"url"`
	Name         string        `mapstructure:"name"`
	User         string        `mapstructure:"user"`
	Password     string        `mapstructure:"password"`
	Timeout      time.Duration `mapstructure:"timeout"`
	MaxReconnect int           `mapstructure:"max_reconnect"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL