yagnats

package module
v0.0.0-...-c3d200e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 3, 2021 License: Apache-2.0, BSD-3-Clause Imports: 13 Imported by: 233

README

Build Status

Yet Another Go NATS Client

A simple client for NATS written in Go.

Basic usage:

client := yagnats.NewClient()

err := client.Connect(&yagnats.ConnectionInfo{
		Addr:     "127.0.0.1:4222",
		Username: "user",
		Password: "pass",
})
if err != nil {
  panic("Wrong auth or something.")
}

client.Subscribe("some.subject", func(msg *yagnats.Message) {
  fmt.Printf("Got message: %s\n", msg.Payload)
})

client.Publish("some.subject", []byte("Sup son?"))

TLS: Set TLSInfo with a cert pool on the ConnectionInfo to enable a TLS connection

roots := x509.NewCertPool()
ok := roots.AppendCertsFromPEM([]byte("some-ca-cert-string"))

err := client.Connect(&yagnats.ConnectionInfo{
        Addr:     "127.0.0.1:4222",
        Username: "nats",
        Password: "nats",
        TLSInfo:  &yagnats.ConnectionTLSInfo{CertPool: roots},
})

Note: The INFO message is NOT currently available for consumption from the client.

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Patterns for the argument portion of the protocol lines that carry data.
// They are compiled once at package initialization instead of on every
// parsed packet.
var (
	errArgsPattern  = regexp.MustCompile(`\s*'(.*)'\s*\r\n`)
	infoArgsPattern = regexp.MustCompile(`\s*([^\s]+)\s*\r\n`)
	msgArgsPattern  = regexp.MustCompile(`\s*([^\s]+)\s+(\d+)\s+(([^\s]+)\s+)?(\d+)\s*\r\n`)
)

// PARSERS maps a protocol operation name ("PING", "PONG", "+OK", "-ERR",
// "INFO", "MSG") to the Parser that consumes the remainder of that
// operation's line from the reader and builds the corresponding Packet.
//
// Every parser reads up to and including the next '\n'. If that read fails
// (for example because the stream ended mid-line) the read error is returned
// and no packet is produced. Lines that were read but do not match the
// expected shape yield a "Malformed ..." error.
var PARSERS = map[string]Parser{

	// PING carries no arguments; the rest of the line is discarded.
	"PING": func(r *bufio.Reader) (Packet, error) {
		if _, err := r.ReadBytes('\n'); err != nil {
			return nil, err
		}
		return &PingPacket{}, nil
	},

	// PONG carries no arguments; the rest of the line is discarded.
	"PONG": func(r *bufio.Reader) (Packet, error) {
		if _, err := r.ReadBytes('\n'); err != nil {
			return nil, err
		}
		return &PongPacket{}, nil
	},

	// +OK carries no arguments; the rest of the line is discarded.
	"+OK": func(r *bufio.Reader) (Packet, error) {
		if _, err := r.ReadBytes('\n'); err != nil {
			return nil, err
		}
		return &OKPacket{}, nil
	},

	// -ERR carries a single-quoted message terminated by CRLF; the text
	// between the quotes becomes ERRPacket.Message.
	"-ERR": func(r *bufio.Reader) (Packet, error) {
		line, err := r.ReadBytes('\n')
		if err != nil {
			return nil, err
		}

		loc := errArgsPattern.FindSubmatchIndex(line)
		if loc == nil {
			return nil, errors.New("Malformed -ERR message")
		}

		return &ERRPacket{Message: string(line[loc[2]:loc[3]])}, nil
	},

	// INFO carries one whitespace-free token terminated by CRLF; that token
	// becomes InfoPacket.Payload.
	"INFO": func(r *bufio.Reader) (Packet, error) {
		line, err := r.ReadBytes('\n')
		if err != nil {
			return nil, err
		}

		loc := infoArgsPattern.FindSubmatchIndex(line)
		if loc == nil {
			return nil, errors.New("Malformed INFO message")
		}

		return &InfoPacket{Payload: string(line[loc[2]:loc[3]])}, nil
	},

	// MSG carries "<subject> <sid> [reply-to] <#bytes>\r\n" followed by
	// #bytes of payload and a trailing line terminator.
	"MSG": func(r *bufio.Reader) (Packet, error) {
		line, err := r.ReadBytes('\n')
		if err != nil {
			return nil, err
		}

		fields := msgArgsPattern.FindStringSubmatch(string(line))
		if fields == nil {
			return nil, errors.New("Malformed MSG message")
		}

		// The pattern guarantees digits only, so the conversions can fail
		// only on overflow. Reject such headers instead of continuing with
		// a clamped value (which would otherwise be used as the payload
		// length).
		subID, err := strconv.ParseInt(fields[2], 10, 64)
		if err != nil {
			return nil, errors.New("Malformed MSG message")
		}
		payloadLen, err := strconv.Atoi(fields[5])
		if err != nil {
			return nil, errors.New("Malformed MSG message")
		}

		payload, err := readNBytes(payloadLen, r)
		if err != nil {
			return nil, err
		}

		// Best effort: consume the terminator that follows the payload. The
		// payload itself has already been read in full, so a failure here is
		// deliberately not reported for this packet.
		_, _ = r.ReadBytes('\n')

		return &MsgPacket{
			Subject: fields[1],
			SubID:   subID,
			ReplyTo: fields[4], // empty when the optional reply-to is absent
			Payload: payload,
		}, nil
	},
}

Functions

func DefaultOptions

func DefaultOptions() nats.Options

Types

type Callback

// Callback is the handler type accepted by Client.Subscribe and
// Client.SubscribeWithQueue. It receives a *Message.
type Callback func(*Message)

type Client

// Client is the type returned by NewClient. Its exported methods (Ping,
// Connect, Disconnect, Publish, PublishWithReplyTo, Subscribe,
// SubscribeWithQueue, Unsubscribe, UnsubscribeAll, BeforeConnectCallback)
// have the signatures listed in the NATSClient interface.
type Client struct {
	// ConnectedCallback is a hook taking no arguments.
	// NOTE(review): presumably invoked once a connection has been
	// established — confirm against the implementation.
	ConnectedCallback func()
	// contains filtered or unexported fields
}

func NewClient

func NewClient() *Client

func (*Client) BeforeConnectCallback

func (c *Client) BeforeConnectCallback(callback func())

func (*Client) Connect

func (c *Client) Connect(cp ConnectionProvider) error

func (*Client) Disconnect

func (c *Client) Disconnect()

func (*Client) Logger

func (c *Client) Logger() Logger

func (*Client) Ping

func (c *Client) Ping() bool

func (*Client) Publish

func (c *Client) Publish(subject string, payload []byte) error

func (*Client) PublishWithReplyTo

func (c *Client) PublishWithReplyTo(subject, reply string, payload []byte) error

func (*Client) SetLogger

func (c *Client) SetLogger(logger Logger)

func (*Client) Subscribe

func (c *Client) Subscribe(subject string, callback Callback) (int64, error)

func (*Client) SubscribeWithQueue

func (c *Client) SubscribeWithQueue(subject, queue string, callback Callback) (int64, error)

func (*Client) Unsubscribe

func (c *Client) Unsubscribe(sid int64) error

func (*Client) UnsubscribeAll

func (c *Client) UnsubscribeAll(subject string)

type ConnectPacket

// ConnectPacket is a Packet (it has an Encode method) holding a user name
// and password.
// NOTE(review): presumably the credentials sent during the connection
// handshake — confirm against Encode and Connection.Handshake.
type ConnectPacket struct {
	User string
	Pass string
}

func (*ConnectPacket) Encode

func (p *ConnectPacket) Encode() []byte

type Connection

// Connection is the type created by NewConnection and NewTLSConnection and
// handed out by ConnectionProvider.ProvideConnection.
type Connection struct {
	// Disconnected is a channel of bool.
	// NOTE(review): presumably signalled when the connection is lost —
	// confirm who sends on it and what the value means.
	Disconnected chan bool
	// contains filtered or unexported fields
}

func NewConnection

func NewConnection(addr, user, pass string) *Connection

func NewTLSConnection

func NewTLSConnection(addr, user, pass string, certPool *x509.CertPool, clientCert *tls.Certificate, verifyPeerCertificate func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error) *Connection

func (*Connection) Dial

func (c *Connection) Dial() error

func (*Connection) Disconnect

func (c *Connection) Disconnect()

func (*Connection) ErrOrOK

func (c *Connection) ErrOrOK() error

func (*Connection) Handshake

func (c *Connection) Handshake() error

func (*Connection) Logger

func (c *Connection) Logger() Logger

func (*Connection) OnMessage

func (c *Connection) OnMessage(callback func(*MsgPacket))

func (*Connection) Ping

func (c *Connection) Ping() bool

func (*Connection) Send

func (c *Connection) Send(packet Packet)

func (*Connection) SetLogger

func (c *Connection) SetLogger(logger Logger)

type ConnectionCluster

// ConnectionCluster groups several ConnectionProviders. It has its own
// ProvideConnection method, so it can be used wherever a ConnectionProvider
// is expected (for example Client.Connect).
// NOTE(review): how a member is selected is not visible here — confirm.
type ConnectionCluster struct {
	// Members are the providers this cluster draws connections from.
	Members []ConnectionProvider
}

func (*ConnectionCluster) ProvideConnection

func (c *ConnectionCluster) ProvideConnection() (conn *Connection, err error)

type ConnectionInfo

// ConnectionInfo describes a single server to connect to. It has a
// ProvideConnection method, so a *ConnectionInfo can be passed to
// Client.Connect.
type ConnectionInfo struct {
	// Addr is the server address, e.g. "127.0.0.1:4222".
	Addr string
	// Username and Password are the credentials for the server.
	Username string
	Password string
	// Dial is a dial function taking a network and an address.
	// NOTE(review): presumably overrides the default dialer when non-nil —
	// confirm.
	Dial func(network, address string) (net.Conn, error)
	// TLSInfo holds the TLS settings; see ConnectionTLSInfo.
	// NOTE(review): presumably nil means a plain (non-TLS) connection —
	// confirm.
	TLSInfo *ConnectionTLSInfo
}

func (*ConnectionInfo) ProvideConnection

func (c *ConnectionInfo) ProvideConnection() (*Connection, error)

type ConnectionProvider

// ConnectionProvider is the argument type of Client.Connect. Both
// *ConnectionInfo and *ConnectionCluster have a matching ProvideConnection
// method.
type ConnectionProvider interface {
	// ProvideConnection returns a *Connection or an error.
	ProvideConnection() (*Connection, error)
}

type ConnectionTLSInfo

// ConnectionTLSInfo bundles the TLS-related settings referenced by
// ConnectionInfo.TLSInfo. Its fields have the same types as the certPool,
// clientCert and verifyPeerCertificate parameters of NewTLSConnection.
type ConnectionTLSInfo struct {
	// CertPool is a set of certificates.
	// NOTE(review): presumably the root CAs used to verify the server —
	// confirm.
	CertPool *x509.CertPool
	// ClientCert is a certificate.
	// NOTE(review): presumably presented to the server for client
	// authentication — confirm.
	ClientCert *tls.Certificate
	// VerifyPeerCertificate has the same signature as the
	// VerifyPeerCertificate hook of crypto/tls.Config.
	// NOTE(review): presumably forwarded to the tls.Config — confirm.
	VerifyPeerCertificate func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error
}

type DefaultLogger

// DefaultLogger is a field-less type whose pointer methods (Fatal, Error,
// Warn, Info, Debug and their *d variants) match the Logger interface.
// NOTE(review): the method bodies are not shown here; what they do with the
// messages is unconfirmed.
type DefaultLogger struct{}

func (*DefaultLogger) Debug

func (dl *DefaultLogger) Debug(string)

func (*DefaultLogger) Debugd

func (dl *DefaultLogger) Debugd(map[string]interface{}, string)

func (*DefaultLogger) Error

func (dl *DefaultLogger) Error(string)

func (*DefaultLogger) Errord

func (dl *DefaultLogger) Errord(map[string]interface{}, string)

func (*DefaultLogger) Fatal

func (dl *DefaultLogger) Fatal(string)

func (*DefaultLogger) Fatald

func (dl *DefaultLogger) Fatald(map[string]interface{}, string)

func (*DefaultLogger) Info

func (dl *DefaultLogger) Info(string)

func (*DefaultLogger) Infod

func (dl *DefaultLogger) Infod(map[string]interface{}, string)

func (*DefaultLogger) Warn

func (dl *DefaultLogger) Warn(string)

func (*DefaultLogger) Warnd

func (dl *DefaultLogger) Warnd(map[string]interface{}, string)

type ERRPacket

// ERRPacket is the Packet produced by the "-ERR" entry of PARSERS.
type ERRPacket struct {
	// Message is the text found between the single quotes on the -ERR line.
	Message string
}

func (*ERRPacket) Encode

func (p *ERRPacket) Encode() []byte

type InfoPacket

// InfoPacket is the Packet produced by the "INFO" entry of PARSERS.
type InfoPacket struct {
	// Payload is the whitespace-free token captured from the INFO line.
	Payload string
}

func (*InfoPacket) Encode

func (p *InfoPacket) Encode() []byte

type Logger

// Logger is the logging interface accepted by Client.SetLogger and
// Connection.SetLogger and returned by their Logger methods. It offers one
// method per level (Fatal, Error, Warn, Info, Debug) in two forms.
type Logger interface {
	// Plain form: a single message string.
	Fatal(string)
	Error(string)
	Warn(string)
	Info(string)
	Debug(string)

	// "d" form: a map of values followed by a message string.
	// NOTE(review): presumably the map carries structured data attached to
	// the message — confirm.
	Fatald(map[string]interface{}, string)
	Errord(map[string]interface{}, string)
	Warnd(map[string]interface{}, string)
	Infod(map[string]interface{}, string)
	Debugd(map[string]interface{}, string)
}

type Message

// Message is the value passed to a subscription Callback.
type Message struct {
	// Subject is the subject of the message.
	Subject string
	// ReplyTo is a reply subject.
	// NOTE(review): presumably empty when the publisher gave none — confirm.
	ReplyTo string
	// Payload is the raw message body.
	Payload []byte
}

type MsgPacket

// MsgPacket is the Packet produced by the "MSG" entry of PARSERS and the
// argument type of the Connection.OnMessage callback.
type MsgPacket struct {
	// Subject is the first token of the MSG line.
	Subject string
	// SubID is the numeric subscription id that follows the subject.
	SubID int64
	// ReplyTo is the optional token between the subscription id and the
	// byte count; it is empty when that token is absent.
	ReplyTo string
	// Payload holds the bytes read after the MSG line; its length is the
	// byte count given on that line.
	Payload []byte
}

func (*MsgPacket) Encode

func (p *MsgPacket) Encode() []byte

type NATSClient

// NATSClient lists the client operations of this package as an interface.
// The exported methods of *Client have these same signatures.
type NATSClient interface {
	// Ping reports a boolean result.
	// NOTE(review): presumably true when the server answered — confirm.
	Ping() bool
	// Connect takes the provider used to obtain a connection.
	Connect(connectionProvider ConnectionProvider) error
	Disconnect()
	// Publish sends payload on subject.
	Publish(subject string, payload []byte) error
	// PublishWithReplyTo is Publish with an additional reply subject.
	PublishWithReplyTo(subject, reply string, payload []byte) error
	// Subscribe registers callback for subject and returns an int64 id, the
	// same type Unsubscribe accepts.
	Subscribe(subject string, callback Callback) (int64, error)
	// SubscribeWithQueue is Subscribe with an additional queue name.
	SubscribeWithQueue(subject, queue string, callback Callback) (int64, error)
	// Unsubscribe takes a subscription id.
	Unsubscribe(subscription int64) error
	// UnsubscribeAll takes a subject rather than a subscription id.
	UnsubscribeAll(subject string)
	// BeforeConnectCallback registers a function taking no arguments.
	// NOTE(review): presumably run before each connection attempt — confirm.
	BeforeConnectCallback(callback func())
}

type NATSConn

// NATSConn is the interface returned by Connect and ConnectWithOptions. Its
// methods are expressed in terms of types from the nats package
// (nats.MsgHandler, *nats.Subscription, *nats.Conn, nats.Options).
// NOTE(review): this looks like a wrapper around a *nats.Conn — confirm
// against the implementation.
type NATSConn interface {
	Close()
	Publish(subject string, data []byte) error
	PublishRequest(subj, reply string, data []byte) error
	Subscribe(subject string, handler nats.MsgHandler) (*nats.Subscription, error)
	QueueSubscribe(subject, queue string, handler nats.MsgHandler) (*nats.Subscription, error)
	Unsubscribe(sub *nats.Subscription) error
	Ping() bool
	// The Add*CB methods each register a function receiving a *nats.Conn.
	AddReconnectedCB(func(*nats.Conn))
	AddClosedCB(func(*nats.Conn))
	AddDisconnectedCB(func(*nats.Conn))
	Options() nats.Options
}

func Connect

func Connect(urls []string) (NATSConn, error)

func ConnectWithOptions

func ConnectWithOptions(urls []string, opts nats.Options) (NATSConn, error)

type OKPacket

// OKPacket is the field-less Packet produced by the "+OK" entry of PARSERS.
type OKPacket struct{}

func (*OKPacket) Encode

func (p *OKPacket) Encode() []byte

type Packet

// Packet is the common interface of all protocol packets. It is the result
// type of Parse and of every Parser, and the argument type of
// Connection.Send.
type Packet interface {
	// Encode returns the packet as a byte slice.
	Encode() []byte
}

func Parse

func Parse(io *bufio.Reader) (val Packet, err error)

type Parser

// Parser is the function type stored in PARSERS: it reads from a
// *bufio.Reader and returns the decoded Packet or an error.
type Parser func(*bufio.Reader) (Packet, error)

type PingPacket

// PingPacket is the field-less Packet produced by the "PING" entry of
// PARSERS.
type PingPacket struct{}

func (*PingPacket) Encode

func (p *PingPacket) Encode() []byte

type PongPacket

// PongPacket is the field-less Packet produced by the "PONG" entry of
// PARSERS.
type PongPacket struct{}

func (*PongPacket) Encode

func (p *PongPacket) Encode() []byte

type PubPacket

// PubPacket is a Packet (it has an Encode method) whose fields mirror the
// arguments of Client.PublishWithReplyTo.
// NOTE(review): presumably what Publish/PublishWithReplyTo send — confirm.
type PubPacket struct {
	// Subject is the subject to publish on.
	Subject string
	// ReplyTo is the reply subject.
	// NOTE(review): presumably empty for a plain Publish — confirm.
	ReplyTo string
	// Payload is the raw message body.
	Payload []byte
}

func (*PubPacket) Encode

func (p *PubPacket) Encode() []byte

type SubPacket

// SubPacket is a Packet (it has an Encode method) whose fields mirror the
// arguments of Client.SubscribeWithQueue plus a numeric id.
// NOTE(review): presumably sent to register a subscription — confirm.
type SubPacket struct {
	// Subject is the subject being subscribed to.
	Subject string
	// Queue is the queue name.
	// NOTE(review): presumably empty for a non-queue subscription — confirm.
	Queue string
	// ID is an int64 identifier, the same type as the subscription ids
	// returned by Client.Subscribe.
	ID int64
}

func (*SubPacket) Encode

func (p *SubPacket) Encode() []byte

type Subscription

// Subscription holds a subject, a queue name, a Callback and a numeric id.
// NOTE(review): presumably the client's record of one active subscription —
// confirm where it is created and stored.
type Subscription struct {
	Subject string
	Queue   string
	// Callback is the handler associated with this subscription.
	Callback Callback
	// ID is an int64 identifier, the same type as the subscription ids
	// returned by Client.Subscribe and accepted by Client.Unsubscribe.
	ID int64
}

type UnsubPacket

// UnsubPacket is a Packet (it has an Encode method) carrying a single
// numeric id.
// NOTE(review): presumably the id of the subscription to cancel — confirm.
type UnsubPacket struct {
	ID int64
}

func (*UnsubPacket) Encode

func (p *UnsubPacket) Encode() []byte

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL