natshttp

package module
v0.0.0-...-06d80d2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 3, 2023 License: MIT Imports: 15 Imported by: 4

README


nats-http

Build Coverage Status License

Status: experimental

Usage

The project is currently in a state of rapid iteration. I will update this section and add more documentation as features mature and reach a steady state.

Client

// connect to NATS
conn, err := nats.Connect(nats.DefaultURL)
if err != nil {
    panic(err)
}

// create a client using the nats transport
client := http.Client{
    Transport: &Transport{
        Conn: conn,
    },
}

// perform a GET request against a NATS HTTP server configured to listen on the 'foo.bar.>' subject hierarchy
// it's important to use the 'nats+http' URL scheme

resp, err := client.Get("nats+http://foo.bar/hello/world")
if err != nil {
    panic(err)
}

body, err := io.ReadAll(resp.Body)
if err != nil {
    panic(err)
}

println(string(body))

Server

// connect to NATS
conn, err := nats.Connect(nats.DefaultURL)
if err != nil {
    panic(err)
}

// create a router
router := chi.NewRouter()
router.Head("/hello", func(w http.ResponseWriter, r *http.Request) {
    _, _ = io.WriteString(w, "world")
})

// create a server
srv := Server{
    Conn:    conn,
    Subject: "foo.bar",   // it will listen for requests on the 'foo.bar.>' subject hierarchy
    Group:   "my-server", // name of the queue group when subscribing, used for load balancing
    Handler: router,
}

// create a context and an error group for running processes
ctx, cancel := context.WithCancel(context.Background())
eg := errgroup.Group{}

// start listening
eg.Go(func() error {
    return srv.Listen(ctx)
})

// wait 10 seconds then cancel the context
eg.Go(func() error {
    <-time.After(10 * time.Second)
    cancel()
    return nil
})

// wait for the listener to complete
if err = eg.Wait(); err != nil {
    panic(err)
}

Proxy

// connect to NATS
conn, err := nats.Connect(nats.DefaultURL)
if err != nil {
    panic(err)
}

// create a TCP listener
listener, err := net.Listen("tcp", "localhost:8080")
if err != nil {
    panic(err)
}

// create a proxy which forwards requests to the 'test.service.>' subject hierarchy
proxy := Proxy{
    Subject: "test.service",
    Listener: listener,
    Transport: &Transport{
        Conn: conn,
    },
}

// create a context and an error group for running processes
ctx, cancel := context.WithCancel(context.Background())
eg := errgroup.Group{}

// start listening
eg.Go(func () error {
    return proxy.Listen(ctx)
})

// wait 10 seconds then cancel the context
eg.Go(func () error {
    <-time.After(10 * time.Second)
    cancel()
    return nil
})

// wait for the listener to complete
if err = eg.Wait(); err != nil {
    panic(err)
}

Documentation

Index

Examples

Constants

View Source
const (
	SmallBodySize     = 4 * 1024  // 4 Kb
	DefaultHeaderSize = 10 * 1024 // 10 Kb
)
View Source
const (
	HeaderPath       = "X-Path"
	HeaderQuery      = "X-Query"
	HeaderFragment   = "X-Fragment"
	HeaderStatus     = "X-Status"
	HeaderStatusCode = "X-Status-Code"
	UrlScheme        = "nats+http"
)
View Source
const (
	ErrInvalidUrl = errors.ConstError("natshttp: urls must of be of the form 'nats+http://a.valid.nats.subject/foo/bar?query=baz")
)

Variables

View Source
var NoOpErrorHandler = func(_ error) {
}

Functions

func IsChunkedRequest

func IsChunkedRequest(msg *nats.Msg, msgSize int) (bool, error)

func MsgToRequest

func MsgToRequest(prefix string, msg *nats.Msg, req *http.Request) error

func ReqToMsg

func ReqToMsg(req *http.Request, msg *nats.Msg) error

Types

type ChunkReader

type ChunkReader struct {
	// contains filtered or unexported fields
}

func NewChunkReader

func NewChunkReader(
	firstMsg *nats.Msg,
	sub *nats.Subscription,
	ctx context.Context,
) (*ChunkReader, error)

func (*ChunkReader) Close

func (c *ChunkReader) Close() error

func (*ChunkReader) Read

func (c *ChunkReader) Read(p []byte) (n int, err error)

type Proxy

type Proxy struct {
	Subject   string
	Transport *Transport
	Listener  net.Listener
}
Example (Basic)
// connect to NATS
conn, err := nats.Connect(nats.DefaultURL)
if err != nil {
	panic(err)
}

// create a TCP listener
listener, err := net.Listen("tcp", "localhost:8080")
if err != nil {
	panic(err)
}

// create a proxy which forwards requests to the 'test.service.>' subject hierarchy
proxy := Proxy{
	Subject:  "test.service",
	Listener: listener,
	Transport: &Transport{
		Conn: conn,
	},
}

// create a context and an error group for running processes
ctx, cancel := context.WithCancel(context.Background())
eg := errgroup.Group{}

// start listening
eg.Go(func() error {
	return proxy.Listen(ctx)
})

// wait 10 seconds then cancel the context
eg.Go(func() error {
	<-time.After(10 * time.Second)
	cancel()
	return nil
})

// wait for the listener to complete
if err = eg.Wait(); err != nil {
	panic(err)
}
Output:

func (*Proxy) Listen

func (p *Proxy) Listen(ctx context.Context) error

func (*Proxy) ServeHTTP

func (p *Proxy) ServeHTTP(w http.ResponseWriter, req *http.Request)

type ResponseWriter

type ResponseWriter struct {
	// contains filtered or unexported fields
}

func NewResponseWriter

func NewResponseWriter(conn *nats.Conn, subject string) (*ResponseWriter, error)

func (*ResponseWriter) Close

func (r *ResponseWriter) Close() error

func (*ResponseWriter) Header

func (r *ResponseWriter) Header() http.Header

func (*ResponseWriter) Write

func (r *ResponseWriter) Write(b []byte) (n int, err error)

func (*ResponseWriter) WriteHeader

func (r *ResponseWriter) WriteHeader(statusCode int)

type Result

type Result[T any] struct {
	Value T
	Error error
}

type Server

type Server struct {
	Conn    *nats.Conn
	Subject string
	Group   string

	Handler      http.Handler
	ErrorHandler func(error)

	PendingMsgsLimit  int
	PendingBytesLimit int
	// contains filtered or unexported fields
}
Example (Basic)
// connect to NATS
conn, err := nats.Connect(nats.DefaultURL)
if err != nil {
	panic(err)
}

// create a router
router := chi.NewRouter()
router.Head("/hello", func(w http.ResponseWriter, r *http.Request) {
	_, _ = io.WriteString(w, "world")
})

// create a server
srv := Server{
	Conn:    conn,
	Subject: "foo.bar",   // it will listen for requests on the 'foo.bar.>' subject hierarchy
	Group:   "my-server", // name of the queue group when subscribing, used for load balancing
	Handler: router,
}

// create a context and an error group for running processes
ctx, cancel := context.WithCancel(context.Background())
eg := errgroup.Group{}

// start listening
eg.Go(func() error {
	return srv.Listen(ctx)
})

// wait 10 seconds then cancel the context
eg.Go(func() error {
	<-time.After(10 * time.Second)
	cancel()
	return nil
})

// wait for the listener to complete
if err = eg.Wait(); err != nil {
	panic(err)
}
Output:

func (*Server) Listen

func (s *Server) Listen(ctx context.Context) error

type Transport

type Transport struct {
	Conn *nats.Conn

	PendingMsgsLimit  int
	PendingBytesLimit int
	// contains filtered or unexported fields
}
Example (Basic)
// connect to NATS
conn, err := nats.Connect(nats.DefaultURL)
if err != nil {
	panic(err)
}

// create a client using the nats transport
client := http.Client{
	Transport: &Transport{
		Conn: conn,
	},
}

// perform a GET request against a NATS HTTP server configured to listen on the 'foo.bar.>' subject hierarchy
// it's important to use the 'nats+http' URL scheme

resp, err := client.Get("nats+http://foo.bar/hello/world")
if err != nil {
	panic(err)
}

body, err := io.ReadAll(resp.Body)
if err != nil {
	panic(err)
}

println(string(body))
Output:

func (*Transport) RoundTrip

func (t *Transport) RoundTrip(req *http.Request) (resp *http.Response, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL