micro

package
v1.8.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 10, 2024 License: Apache-2.0, Apache-2.0 Imports: 8 Imported by: 0

Documentation

Overview

Example
s := RunServerOnPort(-1)
defer s.Shutdown()

nc, err := nats.Connect(s.ClientURL())
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

// Service handler is a function which takes Service.Request as argument.
// req.Respond or req.Error should be used to respond to the request.
incrementHandler := func(req *Request) {
	val, err := strconv.Atoi(string(req.Data()))
	if err != nil {
		req.Error("400", "request data should be a number", nil)
		return
	}

	responseData := val + 1
	req.Respond([]byte(strconv.Itoa(responseData)))
}

config := Config{
	Name:        "IncrementService",
	Version:     "0.1.0",
	Description: "Increment numbers",
	Endpoint: Endpoint{
		// service handler
		Handler: incrementHandler,
		// a unique subject serving as a service endpoint
		Subject: "numbers.increment",
	},
}
// Multiple instances of the servcice with the same name can be created.
// Requests to a service with the same name will be load-balanced.
for i := 0; i < 5; i++ {
	svc, err := AddService(nc, config)
	if err != nil {
		log.Fatal(err)
	}
	defer svc.Stop()
}

// send a request to a service
resp, err := nc.Request("numbers.increment", []byte("3"), 1*time.Second)
if err != nil {
	log.Fatal(err)
}
responseVal, err := strconv.Atoi(string(resp.Data))
if err != nil {
	log.Fatal(err)
}
fmt.Println(responseVal)

//
Output:

4

Index

Examples

Constants

View Source
const (
	// Queue Group name used across all services
	QG = "q"

	// APIPrefix is the root of all control subjects
	APIPrefix = "$SRV"
)
View Source
const (
	ErrorHeader     = "Nats-Service-Error"
	ErrorCodeHeader = "Nats-Service-Error-Code"
)

Service Error headers

Variables

View Source
var (
	ErrRespond         = errors.New("NATS error when sending response")
	ErrMarshalResponse = errors.New("marshaling response")
	ErrArgRequired     = errors.New("argument required")
)
View Source
var (
	// ErrConfigValidation is returned when service configuration is invalid
	ErrConfigValidation = errors.New("validation")

	// ErrVerbNotSupported is returned when invalid [Verb] is used (PING, SCHEMA, INFO, STATS)
	ErrVerbNotSupported = errors.New("unsupported verb")

	// ErrServiceNameRequired is returned when attempting to generate control subject with ID but empty name
	ErrServiceNameRequired = errors.New("service name is required to generate ID control subject")
)

Common errors returned by the Service framework.

Functions

func ControlSubject

func ControlSubject(verb Verb, name, id string) (string, error)

ControlSubject returns monitoring subjects used by the Service. Providing a verb is mandatory (it should be one of Ping, Schema, Info or Stats). Depending on whether kind and id are provided, ControlSubject will return one of the following:

  • verb only: subject used to monitor all available services
  • verb and kind: subject used to monitor services with the provided name
  • verb, name and id: subject used to monitor an instance of a service with the provided ID
Example
// subject used to get PING from all services
subjectPINGAll, _ := ControlSubject(PingVerb, "", "")
fmt.Println(subjectPINGAll)

// subject used to get PING from services with provided name
subjectPINGName, _ := ControlSubject(PingVerb, "CoolService", "")
fmt.Println(subjectPINGName)

// subject used to get PING from a service with provided name and ID
subjectPINGInstance, _ := ControlSubject(PingVerb, "CoolService", "123")
fmt.Println(subjectPINGInstance)
Output:

$SRV.PING
$SRV.PING.CoolService
$SRV.PING.CoolService.123

Types

type Config

type Config struct {
	Name         string   `json:"name"`
	Version      string   `json:"version"`
	Description  string   `json:"description"`
	Schema       Schema   `json:"schema"`
	Endpoint     Endpoint `json:"endpoint"`
	StatsHandler StatsHandler
	DoneHandler  DoneHandler
	ErrorHandler ErrHandler
}

Config is a configuration of a service.

type DoneHandler

type DoneHandler func(Service)

DoneHandler is a function used to configure a custom done handler for a service.

type Endpoint

type Endpoint struct {
	Subject string `json:"subject"`
	Handler RequestHandler
}

Endpoint is used to configure a subject and handler for a service.

type ErrHandler

type ErrHandler func(Service, *NATSError)

ErrHandler is a function used to configure a custom error handler for a service,

type Headers

type Headers nats.Header

Headers is a wrapper around *nats.Header

func (Headers) Get

func (h Headers) Get(key string) string

Get gets the first value associated with the given key. It is case-sensitive.

func (Headers) Values

func (h Headers) Values(key string) []string

Values returns all values associated with the given key. It is case-sensitive.

type Info

type Info struct {
	ServiceIdentity
	Description string `json:"description"`
	Subject     string `json:"subject"`
}

Info is the basic information about a service type.

type NATSError

type NATSError struct {
	Subject     string
	Description string
}

NATSError represents an error returned by a NATS Subscription. It contains a subject on which the subscription failed, so that it can be linked with a specific service endpoint.

func (*NATSError) Error

func (e *NATSError) Error() string

type Ping

type Ping ServiceIdentity

Ping is the response type for PING monitoring endpoint.

type Request

type Request struct {
	// contains filtered or unexported fields
}

Request represents service request available in the service handler. It exposes methods to respond to the request, as well as getting the request data and headers.

func (*Request) Data

func (r *Request) Data() []byte

func (*Request) Error

func (r *Request) Error(code, description string, data []byte, opts ...RespondOpt) error

Error prepares and publishes error response from a handler. A response error should be set containing an error code and description. Optionally, data can be set as response payload.

Example
handler := func(req *Request) {
	// respond with an error
	// Error sets Nats-Service-Error and Nats-Service-Error-Code headers in the response
	if err := req.Error("400", "bad request", []byte(`{"error": "value should be a number"}`)); err != nil {
		log.Fatal(err)
	}
}

fmt.Printf("%T", handler)
Output:

func (*Request) Headers

func (r *Request) Headers() Headers

func (*Request) Respond

func (r *Request) Respond(response []byte, opts ...RespondOpt) error
Example
handler := func(req *Request) {
	// respond to the request
	if err := req.Respond(req.Data()); err != nil {
		log.Fatal(err)
	}
}

fmt.Printf("%T", handler)
Output:

func (*Request) RespondJSON

func (r *Request) RespondJSON(response interface{}, opts ...RespondOpt) error
Example
type Point struct {
	X int `json:"x"`
	Y int `json:"y"`
}

handler := func(req *Request) {
	resp := Point{5, 10}
	// respond to the request
	// response will be serialized to {"x":5,"y":10}
	if err := req.RespondJSON(resp); err != nil {
		log.Fatal(err)
	}
}

fmt.Printf("%T", handler)
Output:

type RequestHandler

type RequestHandler func(*Request)

RequestHandler is a function used as a Handler for a service.

type RespondOpt

type RespondOpt func(*nats.Msg)

RespondOpt is a

func WithHeaders

func WithHeaders(headers Headers) RespondOpt

type Schema

type Schema struct {
	Request  string `json:"request"`
	Response string `json:"response"`
}

Schema can be used to configure a schema for a service. It is olso returned by the SCHEMA monitoring service (if set).

type SchemaResp

type SchemaResp struct {
	ServiceIdentity
	Schema Schema `json:"schema"`
}

SchemaResp is the response value for SCHEMA requests.

type Service

type Service interface {
	// Info returns the service info.
	Info() Info

	// Stats returns statisctics for the service endpoint and all monitoring endpoints.
	Stats() Stats

	// Reset resets all statistics on a service instance.
	Reset()

	// Stop drains the endpoint subscriptions and marks the service as stopped.
	Stop() error

	// Stopped informs whether [Stop] was executed on the service.
	Stopped() bool
}

func AddService

func AddService(nc *nats.Conn, config Config) (Service, error)

AddService adds a microservice. It will enable internal common services (PING, STATS, INFO and SCHEMA) as well as the actual service handler on the subject provided in config.Endpoint A service name, version and Endpoint configuration are required to add a service. AddService returns a Service interface, allowing service menagement. Each service is assigned a unique ID.

Example
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

echoHandler := func(req *Request) {
	req.Respond(req.Data())
}

config := Config{
	Name:        "EchoService",
	Version:     "v1.0.0",
	Description: "Send back what you receive",
	Endpoint: Endpoint{
		Subject: "echo",
		Handler: echoHandler,
	},

	// DoneHandler can be set to customize behavior on stopping a service.
	DoneHandler: func(srv Service) {
		info := srv.Info()
		fmt.Printf("stopped service %q with ID %q\n", info.Name, info.ID)
	},

	// ErrorHandler can be used to customize behavior on service execution error.
	ErrorHandler: func(srv Service, err *NATSError) {
		info := srv.Info()
		fmt.Printf("Service %q returned an error on subject %q: %s", info.Name, err.Subject, err.Description)
	},
}

srv, err := AddService(nc, config)
if err != nil {
	log.Fatal(err)
}
defer srv.Stop()
Output:

type ServiceIdentity

type ServiceIdentity struct {
	Name    string `json:"name"`
	ID      string `json:"id"`
	Version string `json:"version"`
}

ServiceIdentity contains fields helping to identidy a service instance.

type Stats

type Stats struct {
	ServiceIdentity
	NumRequests           int             `json:"num_requests"`
	NumErrors             int             `json:"num_errors"`
	LastError             string          `json:"last_error"`
	ProcessingTime        time.Duration   `json:"processing_time"`
	AverageProcessingTime time.Duration   `json:"average_processing_time"`
	Started               string          `json:"started"`
	Data                  json.RawMessage `json:"data,omitempty"`
}

Stats is the type returned by STATS monitoring endpoint. It contains stats for a specific endpoint (either request handler or monitoring enpoints).

type StatsHandler

type StatsHandler func(Endpoint) interface{}

StatsHandleris a function used to configure a custom STATS endpoint. It should return a value which can be serialized to JSON.

type Verb

type Verb int64

Verb represents a name of the monitoring service.

const (
	PingVerb Verb = iota
	StatsVerb
	InfoVerb
	SchemaVerb
)

Verbs being used to set up a specific control subject.

func (Verb) String

func (s Verb) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL