natsembed

v0.0.0-...-3ec94f2
Published: Jul 25, 2025 License: BSD-3-Clause Imports: 10 Imported by: 0

README

natsembed

Helper module for embedding a NATS server inside your Go process. Start and stop it from code, apply new settings at runtime, and create in‑process client connections without opening a TCP socket.


Features

  • Start with Start(...) or Run(ctx, ...).
  • Run respects context cancelation.
  • Reconfigure(...) restarts the server with new options and closes tracked in‑process connections.
  • InProcessConnection() returns an in-process *nats.Conn.
  • Functional server options.

Installation

go get github.com/tmacro/natsembed

Quick Start

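package main

import (
    "context"
    "log"
    "time"

    "github.com/nats-io/nats.go"
    "github.com/tmacro/natsembed"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Run blocks until the context is canceled, so start it in a goroutine.
    go func() {
        if err := natsembed.Run(ctx,
            natsembed.ServerName("demo"),
            natsembed.JetstreamEnabled(),
        ); err != nil {
            log.Fatal(err)
        }
    }()

    // The server starts asynchronously in the goroutine above, so retry
    // briefly instead of failing on the first attempt.
    var nc *nats.Conn
    var err error
    for i := 0; i < 50; i++ {
        if nc, err = natsembed.InProcessConnection(); err == nil {
            break
        }
        time.Sleep(100 * time.Millisecond)
    }
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Drain()

    if _, err := nc.Subscribe("greet", func(m *nats.Msg) {
        log.Printf("received: %s", string(m.Data))
        cancel() // server stops when ctx is done
    }); err != nil {
        log.Fatal(err)
    }

    if err := nc.Publish("greet", []byte("hello from inside")); err != nil {
        log.Fatal(err)
    }

    <-ctx.Done()
}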

API Overview

Lifecycle
  • Run(ctx context.Context, opts ...ServerOption) error: Blocks until ctx.Done(), then shuts the server down.
  • Start(opts ...ServerOption) error: Alias for Reconfigure.
  • Stop() error: Stops the server and closes tracked in-process connections.
  • Reconfigure(opts ...ServerOption) error: Stops the current instance, closes tracked in-process connections, and starts again with the new options.
Connections
  • InProcessConnection(srv ...nats.InProcessConnProvider) (*nats.Conn, error): Returns an in-process client connection. The client is set to reconnect automatically to allow for runtime server restarts.
Options

All configuration is done via functional ServerOptions.

Option                                            Description
ServerName(name string)                           Sets the server name.
Host(host string) / Port(port int)                Client listener address and port.
ClusterName(name string)                          Sets the cluster name.
ClusterHost(host string) / ClusterPort(port int)  Cluster listen address and port.
WithPeer(host string, port int)                   Adds a single route peer.
WithPeers(peers []Peer)                           Adds multiple route peers.
JetstreamEnabled()                                Enables JetStream.
DebugEnabled()                                    Turns on debug logging.
TraceEnabled()                                    Turns on trace logging.
DontListen()                                      Does not open a TCP listener (useful for tests).
Nkeys(users []*natsserver.NkeyUser)               Configures NKeys auth.
Users(users []*natsserver.User)                   Configures username/password auth.
TLS(cfg *tls.Config)                              Enables TLS.
StoreDir(dir string)                              Persistent store directory (JetStream).
WithOptions(serverOpts natsserver.Options)        Provides your own NATS server config.
WithLogger(logger natsserver.Logger)              Provides your own logger.
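
To show how functional options of this kind compose, here is a minimal sketch of the pattern. It uses local stand-in types so it runs on its own: Options is a hypothetical three-field reduction of natsserver.Options, the Logger field is omitted, and apply is an illustrative helper, not a function of this package. The rule that a later option overwrites an earlier one is a property of this sketch and is assumed, not confirmed, for natsembed itself.

```go
package main

import "fmt"

// Options is a hypothetical stand-in for natsserver.Options, reduced to
// three fields for illustration.
type Options struct {
	ServerName string
	Port       int
	JetStream  bool
}

// ServerOptions mirrors the documented shape: it embeds the server options.
type ServerOptions struct {
	Options
}

// ServerOption matches the documented type: a function that mutates
// a ServerOptions value.
type ServerOption func(*ServerOptions)

// ServerName returns an option that sets the server name.
func ServerName(name string) ServerOption {
	return func(o *ServerOptions) { o.ServerName = name }
}

// Port returns an option that sets the client port.
func Port(port int) ServerOption {
	return func(o *ServerOptions) { o.Port = port }
}

// JetstreamEnabled returns an option that switches JetStream on.
func JetstreamEnabled() ServerOption {
	return func(o *ServerOptions) { o.JetStream = true }
}

// apply runs each option in order against a zero-value config, so a later
// option overwrites an earlier one that touches the same field.
func apply(opts ...ServerOption) ServerOptions {
	var cfg ServerOptions
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	cfg := apply(ServerName("demo"), Port(4222), Port(4333), JetstreamEnabled())
	fmt.Println(cfg.ServerName, cfg.Port, cfg.JetStream) // prints: demo 4333 true
}
```

Since the documented ServerOption type is func(*ServerOptions) and ServerOptions embeds natsserver.Options, an option of your own can take the same form: a function that sets a field on the embedded options.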

Shutdown & Reconnects

InProcessConnection() enables automatic reconnects with no maximum number of attempts and a 2s wait between attempts. When Stop() or Reconfigure() is called, tracked in-process connections are closed.


License

BSD 3-Clause

Documentation

Constants

This section is empty.

Variables

var (
	ErrServerNotRunning = errors.New("server is not running")
)

Functions

func InProcessConnection

func InProcessConnection(srv ...nats.InProcessConnProvider) (*nats.Conn, error)

func Reconfigure

func Reconfigure(options ...ServerOption) error

func Run

func Run(ctx context.Context, options ...ServerOption) error

func Start

func Start(options ...ServerOption) error

func Stop

func Stop() error

Types

type Peer

type Peer struct {
	Host string
	Port int
}

func (Peer) String

func (p Peer) String() string

func (Peer) URL

func (p Peer) URL() string
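
Peer lists often arrive as "host:port" strings from flags or environment variables. The sketch below converts such strings into Peer values. The local Peer type copies the documented fields so the example compiles on its own, and parsePeers is a hypothetical helper, not part of this package.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// Peer mirrors the documented natsembed.Peer fields so this sketch
// compiles without the package.
type Peer struct {
	Host string
	Port int
}

// parsePeers (hypothetical) converts "host:port" strings into Peer values
// and reports the first malformed entry.
func parsePeers(addrs []string) ([]Peer, error) {
	peers := make([]Peer, 0, len(addrs))
	for _, addr := range addrs {
		host, portStr, err := net.SplitHostPort(addr)
		if err != nil {
			return nil, fmt.Errorf("peer %q: %w", addr, err)
		}
		port, err := strconv.Atoi(portStr)
		if err != nil {
			return nil, fmt.Errorf("peer %q: invalid port: %w", addr, err)
		}
		peers = append(peers, Peer{Host: host, Port: port})
	}
	return peers, nil
}

func main() {
	peers, err := parsePeers([]string{"10.0.0.1:6222", "node-b:6222"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(peers) // prints: [{10.0.0.1 6222} {node-b 6222}]
}
```

With the real package, a slice built this way from natsembed.Peer values has the type that WithPeers(peers []Peer) accepts.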

type ServerOption

type ServerOption func(*ServerOptions)

func ClusterHost

func ClusterHost(host string) ServerOption

func ClusterName

func ClusterName(name string) ServerOption

func ClusterPort

func ClusterPort(port int) ServerOption

func DebugEnabled

func DebugEnabled() ServerOption

func DontListen

func DontListen() ServerOption

func Host

func Host(host string) ServerOption

func JetstreamEnabled

func JetstreamEnabled() ServerOption

func Nkeys

func Nkeys(users []*natsserver.NkeyUser) ServerOption

func Port

func Port(port int) ServerOption

func ServerName

func ServerName(name string) ServerOption

func StoreDir

func StoreDir(dir string) ServerOption

func TLS

func TLS(cfg *tls.Config) ServerOption

func TraceEnabled

func TraceEnabled() ServerOption

func Users

func Users(users []*natsserver.User) ServerOption

func WithLogger

func WithLogger(logger natsserver.Logger) ServerOption

func WithOptions

func WithOptions(serverOpts natsserver.Options) ServerOption

func WithPeer

func WithPeer(host string, port int) ServerOption

func WithPeers

func WithPeers(peers []Peer) ServerOption

type ServerOptions

type ServerOptions struct {
	natsserver.Options
	Logger natsserver.Logger
}
