Version: v0.9.6 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Dec 15, 2016 License: MIT Imports: 30 Imported by: 0



Package sublist is a routing mechanism to handle subject distribution and provides a facility to match subjects from published messages to interested subscribers. Subscribers can have wildcard subjects to match multiple published subjects.



View Source
const (
	// CLIENT is an end user.
	CLIENT = iota
	// ROUTER is another router in the cluster.

Type of client connection.

View Source
const (
	// Original Client protocol from 2009.
	ClientProtoZero = iota
	// This signals a client can receive more then the original INFO block.
	// This can be used to update clients on other cluster members, etc.
View Source
const (
	// VERSION is the current version for the server.
	VERSION = "0.9.6"

	// DEFAULT_PORT is the default port for client connections.

	// RANDOM_PORT is the value for port that, when supplied, will cause the
	// server to listen on a randomly-chosen available port. The resolved port
	// is available via the Addr() method.

	// DEFAULT_HOST defaults to all interfaces.

	// MAX_CONTROL_LINE_SIZE is the maximum allowed protocol control line size.
	// 1k should be plenty since payloads sans connect string are separate

	// MAX_PAYLOAD_SIZE is the maximum allowed payload size. Should be using
	// something different if > 1MB payloads are needed.
	MAX_PAYLOAD_SIZE = (1024 * 1024)

	// DEFAULT_MAX_CONNECTIONS is the default maximum connections allowed.

	// TLS_TIMEOUT is the TLS wait time.
	TLS_TIMEOUT = 500 * time.Millisecond

	// AUTH_TIMEOUT is the authorization wait time.

	// DEFAULT_PING_INTERVAL is how often pings are sent to clients and routes.
	DEFAULT_PING_INTERVAL = 2 * time.Minute

	// DEFAULT_PING_MAX_OUT is maximum allowed pings outstanding before disconnect.

	// CR_LF string
	CR_LF = "\r\n"

	// LEN_CR_LF hold onto the computed size.
	LEN_CR_LF = len(CR_LF)

	// DEFAULT_FLUSH_DEADLINE is the write/flush deadlines.

	// DEFAULT_HTTP_PORT is the default monitoring port.

	// ACCEPT_MIN_SLEEP is the minimum acceptable sleep times on temporary errors.
	ACCEPT_MIN_SLEEP = 10 * time.Millisecond

	// ACCEPT_MAX_SLEEP is the maximum acceptable sleep times on temporary errors
	ACCEPT_MAX_SLEEP = 1 * time.Second

	// DEFAULT_ROUTE_CONNECT Route solicitation intervals.
	DEFAULT_ROUTE_CONNECT = 1 * time.Second

	// DEFAULT_ROUTE_RECONNECT Route reconnect intervals.

	// DEFAULT_ROUTE_DIAL Route dial timeout.
	DEFAULT_ROUTE_DIAL = 1 * time.Second

	// PROTO_SNIPPET_SIZE is the default size of proto to print on parse errors.

	// MAX_MSG_ARGS Maximum possible number of arguments from MSG proto.

	// MAX_PUB_ARGS Maximum possible number of arguments from PUB proto.
View Source
const (
	OP_START = iota

Parser constants

View Source
const (
	ConProto  = "CONNECT %s" + _CRLF_
	InfoProto = "INFO %s" + _CRLF_

Route protocol constants

View Source
const (


FIXME(dlc) - Make these reserved and reject if they come in as a sid from a client connection. Route constants

View Source
const (
	RootPath    = "/"
	VarzPath    = "/varz"
	ConnzPath   = "/connz"
	RoutezPath  = "/routez"
	SubszPath   = "/subsz"
	StackszPath = "/stacksz"

HTTP endpoints

View Source
const DefaultConnListSize = 1024

DefaultConnListSize is the default size of the connection list.


View Source
var (
	// ErrConnectionClosed represents an error condition on a closed connection.
	ErrConnectionClosed = errors.New("Connection Closed")

	// ErrAuthorization represents an error condition on failed authorization.
	ErrAuthorization = errors.New("Authorization Error")

	// ErrAuthTimeout represents an error condition on failed authorization due to timeout.
	ErrAuthTimeout = errors.New("Authorization Timeout")

	// ErrMaxPayload represents an error condition when the payload is too big.
	ErrMaxPayload = errors.New("Maximum Payload Exceeded")

	// ErrMaxControlLine represents an error condition when the control line is too big.
	ErrMaxControlLine = errors.New("Maximum Control Line Exceeded")

	// ErrReservedPublishSubject represents an error condition when sending to a reserved subject, e.g. _SYS.>
	ErrReservedPublishSubject = errors.New("Reserved Internal Subject")

	// ErrBadClientProtocol signals a client requested an invalud client protocol.
	ErrBadClientProtocol = errors.New("Invalid Client Protocol")

	// ErrTooManyConnections signals a client that the maximum number of connections supported by the
	// server has been reached.
	ErrTooManyConnections = errors.New("Maximum Connections Exceeded")
View Source
var (
	ErrInvalidSubject = errors.New("sublist: Invalid Subject")
	ErrNotFound       = errors.New("sublist: No Matches Found")

Sublist related errors


func Debugf

func Debugf(format string, v ...interface{})

Debugf logs a debug statement

func Errorf added in v0.6.0

func Errorf(format string, v ...interface{})

Errorf logs an error

func Fatalf

func Fatalf(format string, v ...interface{})

Fatalf logs a fatal error

func GenTLSConfig added in v0.7.0

func GenTLSConfig(tc *TLSConfigOpts) (*tls.Config, error)

GenTLSConfig loads TLS related configuration parameters.

func IsValidLiteralSubject added in v0.8.0

func IsValidLiteralSubject(subject string) bool

IsValidLiteralSubject returns true if a subject is valid and literal (no wildcards), false otherwise

func IsValidSubject added in v0.9.2

func IsValidSubject(subject string) bool

IsValidSubject returns true if a subject is valid, false otherwise

func Noticef added in v0.6.0

func Noticef(format string, v ...interface{})

Noticef logs a notice statement

func PrintAndDie

func PrintAndDie(msg string)

PrintAndDie is exported for access in other packages.

func PrintServerAndExit

func PrintServerAndExit()

PrintServerAndExit will print our version and exit.

func PrintTLSHelpAndDie added in v0.8.0

func PrintTLSHelpAndDie()

PrintTLSHelpAndDie prints TLS usage and exits.

func ProcessCommandLineArgs added in v0.9.6

func ProcessCommandLineArgs(cmd *flag.FlagSet) (showVersion bool, showHelp bool, err error)

ProcessCommandLineArgs takes the command line arguments validating and setting flags for handling in case any sub command was present.

func RemoveSelfReference added in v0.5.6

func RemoveSelfReference(clusterPort int, routes []*url.URL) ([]*url.URL, error)

RemoveSelfReference removes this server from an array of routes

func ResponseHandler added in v0.6.6

func ResponseHandler(w http.ResponseWriter, r *http.Request, data []byte)

ResponseHandler handles responses for monitoring routes

func RoutesFromStr added in v0.6.2

func RoutesFromStr(routesStr string) []*url.URL

RoutesFromStr parses route URLs from a string

func Tracef

func Tracef(format string, v ...interface{})

Tracef logs a trace statement


type Auth added in v0.6.0

type Auth interface {
	// Check if a client is authorized to connect
	Check(c ClientAuth) bool

Auth is an interface for implementing authentication

type ClientAuth added in v0.6.0

type ClientAuth interface {
	// Get options associated with a client
	GetOpts() *clientOpts
	// Optionally map a user after auth.

ClientAuth is an interface for client authentication

type ClusterOpts added in v0.9.6

type ClusterOpts struct {
	Host        string      `json:"addr"`
	Port        int         `json:"cluster_port"`
	Username    string      `json:"-"`
	Password    string      `json:"-"`
	AuthTimeout float64     `json:"auth_timeout"`
	TLSTimeout  float64     `json:"-"`
	TLSConfig   *tls.Config `json:"-"`
	ListenStr   string      `json:"-"`
	NoAdvertise bool        `json:"-"`

Options for clusters.

type ConnInfo

type ConnInfo struct {
	Cid            uint64    `json:"cid"`
	IP             string    `json:"ip"`
	Port           int       `json:"port"`
	Start          time.Time `json:"start"`
	LastActivity   time.Time `json:"last_activity"`
	Uptime         string    `json:"uptime"`
	Idle           string    `json:"idle"`
	Pending        int       `json:"pending_bytes"`
	InMsgs         int64     `json:"in_msgs"`
	OutMsgs        int64     `json:"out_msgs"`
	InBytes        int64     `json:"in_bytes"`
	OutBytes       int64     `json:"out_bytes"`
	NumSubs        uint32    `json:"subscriptions"`
	Name           string    `json:"name,omitempty"`
	Lang           string    `json:"lang,omitempty"`
	Version        string    `json:"version,omitempty"`
	TLSVersion     string    `json:"tls_version,omitempty"`
	TLSCipher      string    `json:"tls_cipher_suite,omitempty"`
	AuthorizedUser string    `json:"authorized_user,omitempty"`
	Subs           []string  `json:"subscriptions_list,omitempty"`

ConnInfo has detailed information on a per connection basis.

type Connz

type Connz struct {
	Now      time.Time  `json:"now"`
	NumConns int        `json:"num_connections"`
	Total    int        `json:"total"`
	Offset   int        `json:"offset"`
	Limit    int        `json:"limit"`
	Conns    []ConnInfo `json:"connections"`

Connz represents detailed information on current client connections.

type Info

type Info struct {
	ID                string   `json:"server_id"`
	Version           string   `json:"version"`
	GoVersion         string   `json:"go"`
	Host              string   `json:"host"`
	Port              int      `json:"port"`
	AuthRequired      bool     `json:"auth_required"`
	SSLRequired       bool     `json:"ssl_required"` // DEPRECATED: ssl json used for older clients
	TLSRequired       bool     `json:"tls_required"`
	TLSVerify         bool     `json:"tls_verify"`
	MaxPayload        int      `json:"max_payload"`
	IP                string   `json:"ip,omitempty"`
	ClientConnectURLs []string `json:"connect_urls,omitempty"` // Contains URLs a client can connect to.
	// contains filtered or unexported fields

Info is the information sent to clients to help them understand information about this server.

type Logger added in v0.6.0

type Logger interface {

	// Log a notice statement
	Noticef(format string, v ...interface{})

	// Log a fatal error
	Fatalf(format string, v ...interface{})

	// Log an error
	Errorf(format string, v ...interface{})

	// Log a debug statement
	Debugf(format string, v ...interface{})

	// Log a trace statement
	Tracef(format string, v ...interface{})

Logger interface of the NATS Server

type Options

type Options struct {
	Host           string        `json:"addr"`
	Port           int           `json:"port"`
	Trace          bool          `json:"-"`
	Debug          bool          `json:"-"`
	NoLog          bool          `json:"-"`
	NoSigs         bool          `json:"-"`
	Logtime        bool          `json:"-"`
	MaxConn        int           `json:"max_connections"`
	Users          []*User       `json:"-"`
	Username       string        `json:"-"`
	Password       string        `json:"-"`
	Authorization  string        `json:"-"`
	PingInterval   time.Duration `json:"ping_interval"`
	MaxPingsOut    int           `json:"ping_max"`
	HTTPHost       string        `json:"http_host"`
	HTTPPort       int           `json:"http_port"`
	HTTPSPort      int           `json:"https_port"`
	AuthTimeout    float64       `json:"auth_timeout"`
	MaxControlLine int           `json:"max_control_line"`
	MaxPayload     int           `json:"max_payload"`
	Cluster        ClusterOpts   `json:"cluster"`
	ProfPort       int           `json:"-"`
	PidFile        string        `json:"-"`
	LogFile        string        `json:"-"`
	Syslog         bool          `json:"-"`
	RemoteSyslog   string        `json:"-"`
	Routes         []*url.URL    `json:"-"`
	RoutesStr      string        `json:"-"`
	TLSTimeout     float64       `json:"tls_timeout"`
	TLS            bool          `json:"-"`
	TLSVerify      bool          `json:"-"`
	TLSCert        string        `json:"-"`
	TLSKey         string        `json:"-"`
	TLSCaCert      string        `json:"-"`
	TLSConfig      *tls.Config   `json:"-"`

Options block for gnatsd server.

func MergeOptions

func MergeOptions(fileOpts, flagOpts *Options) *Options

MergeOptions will merge two options giving preference to the flagOpts if the item is present.

func ProcessConfigFile

func ProcessConfigFile(configFile string) (*Options, error)

ProcessConfigFile processes a configuration file. FIXME(dlc): Hacky

type Pair added in v0.6.2

type Pair struct {
	Key *client
	Val int64

Pair type is internally used.

type Pairs added in v0.8.0

type Pairs []Pair

Pairs type is internally used.

func (Pairs) Len added in v0.8.0

func (d Pairs) Len() int

func (Pairs) Less added in v0.8.0

func (d Pairs) Less(i, j int) bool

func (Pairs) Swap added in v0.8.0

func (d Pairs) Swap(i, j int)

type Permissions added in v0.9.2

type Permissions struct {
	Publish   []string `json:"publish"`
	Subscribe []string `json:"subscribe"`

Authorization are the allowed subjects on a per publish or subscribe basis.

type RouteInfo added in v0.6.0

type RouteInfo struct {
	Rid          uint64   `json:"rid"`
	RemoteID     string   `json:"remote_id"`
	DidSolicit   bool     `json:"did_solicit"`
	IsConfigured bool     `json:"is_configured"`
	IP           string   `json:"ip"`
	Port         int      `json:"port"`
	Pending      int      `json:"pending_size"`
	InMsgs       int64    `json:"in_msgs"`
	OutMsgs      int64    `json:"out_msgs"`
	InBytes      int64    `json:"in_bytes"`
	OutBytes     int64    `json:"out_bytes"`
	NumSubs      uint32   `json:"subscriptions"`
	Subs         []string `json:"subscriptions_list,omitempty"`

RouteInfo has detailed information on a per connection basis.

type RouteType added in v0.8.0

type RouteType int

RouteType designates the router type

const (
	// This route we learned from speaking to other routes.
	Implicit RouteType = iota
	// This route was explicitly configured.

Type of Route

type Routez added in v0.6.0

type Routez struct {
	Now       time.Time    `json:"now"`
	NumRoutes int          `json:"num_routes"`
	Routes    []*RouteInfo `json:"routes"`

Routez represents detailed information on current client connections.

type Server

type Server struct {
	// contains filtered or unexported fields

Server is our main struct.

func New

func New(opts *Options) *Server

New will setup a new server struct after parsing the options.

func (*Server) AcceptLoop

func (s *Server) AcceptLoop(clr chan struct{})

AcceptLoop is exported for easier testing.

func (*Server) Addr added in v0.5.4

func (s *Server) Addr() net.Addr

Addr will return the net.Addr object for the current listener.

func (*Server) HandleConnz

func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request)

HandleConnz process HTTP requests for connection information.

func (*Server) HandleRoot added in v0.6.2

func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request)

HandleRoot will show basic info and links to others handlers.

func (*Server) HandleRoutez added in v0.6.0

func (s *Server) HandleRoutez(w http.ResponseWriter, r *http.Request)

HandleRoutez process HTTP requests for route information.

func (*Server) HandleStacksz added in v0.8.1

func (s *Server) HandleStacksz(w http.ResponseWriter, r *http.Request)

HandleStacksz processes HTTP requests for getting stacks

func (*Server) HandleSubsz added in v0.6.0

func (s *Server) HandleSubsz(w http.ResponseWriter, r *http.Request)

HandleSubsz processes HTTP requests for subjects stats.

func (*Server) HandleVarz

func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request)

HandleVarz will process HTTP requests for server information.

func (*Server) ID added in v0.8.0

func (s *Server) ID() string

ID returns the server's ID

func (*Server) NumClients

func (s *Server) NumClients() int

NumClients will report the number of registered clients.

func (*Server) NumRemotes

func (s *Server) NumRemotes() int

NumRemotes will report number of registered remotes.

func (*Server) NumRoutes

func (s *Server) NumRoutes() int

NumRoutes will report the number of registered routes.

func (*Server) NumSubscriptions

func (s *Server) NumSubscriptions() uint32

NumSubscriptions will report how many subscriptions are active.

func (*Server) ReOpenLogFile added in v0.9.6

func (s *Server) ReOpenLogFile()

If the logger is a file based logger, close and re-open the file. This allows for file rotation by 'mv'ing the file then signalling the process to trigger this function.

func (*Server) ReadyForConnections added in v0.9.6

func (s *Server) ReadyForConnections(dur time.Duration) bool

ReadyForConnections returns `true` if the server is ready to accept client and, if routing is enabled, route connections. If after the duration `dur` the server is still not ready, returns `false`.

func (*Server) SetClientAuthMethod added in v0.8.0

func (s *Server) SetClientAuthMethod(authMethod Auth)

SetClientAuthMethod sets the authentication method for clients.

func (*Server) SetLogger added in v0.6.0

func (s *Server) SetLogger(logger Logger, debugFlag, traceFlag bool)

SetLogger sets the logger of the server

func (*Server) SetRouteAuthMethod added in v0.8.0

func (s *Server) SetRouteAuthMethod(authMethod Auth)

SetRouteAuthMethod sets the authentication method for routes.

func (*Server) Shutdown

func (s *Server) Shutdown()

Shutdown will shutdown the server instance by kicking out the AcceptLoop and closing all associated clients.

func (*Server) Start

func (s *Server) Start()

Start up the server, this will block. Start via a Go routine if needed.

func (*Server) StartHTTPMonitoring

func (s *Server) StartHTTPMonitoring()

StartHTTPMonitoring will enable the HTTP monitoring port.

func (*Server) StartHTTPSMonitoring added in v0.7.0

func (s *Server) StartHTTPSMonitoring()

StartHTTPSMonitoring will enable the HTTPS monitoring port.

func (*Server) StartProfiler

func (s *Server) StartProfiler()

StartProfiler is called to enable dynamic profiling.

func (*Server) StartRouting

func (s *Server) StartRouting(clientListenReady chan struct{})

StartRouting will start the accept loop on the cluster host:port and will actively try to connect to listed routes.

type SortOpt added in v0.6.2

type SortOpt string

SortOpt is a helper type to sort by ConnInfo values

func (SortOpt) IsValid added in v0.8.0

func (s SortOpt) IsValid() bool

IsValid determines if a sort option is valid

type Sublist added in v0.8.0

type Sublist struct {
	// contains filtered or unexported fields

A Sublist stores and efficiently retrieves subscriptions.

func NewSublist added in v0.8.0

func NewSublist() *Sublist

New will create a default sublist

func (*Sublist) CacheCount added in v0.8.0

func (s *Sublist) CacheCount() int

CacheCount returns the number of result sets in the cache.

func (*Sublist) Count added in v0.8.0

func (s *Sublist) Count() uint32

Count returns the number of subscriptions.

func (*Sublist) Insert added in v0.8.0

func (s *Sublist) Insert(sub *subscription) error

Insert adds a subscription into the sublist

func (*Sublist) Match added in v0.8.0

func (s *Sublist) Match(subject string) *SublistResult

Match will match all entries to the literal subject. It will return a set of results for both normal and queue subscribers.

func (*Sublist) Remove added in v0.8.0

func (s *Sublist) Remove(sub *subscription) error

Remove will remove a subscription.

func (*Sublist) Stats added in v0.8.0

func (s *Sublist) Stats() *SublistStats

Stats will return a stats structure for the current state.

type SublistResult added in v0.8.0

type SublistResult struct {
	// contains filtered or unexported fields

A result structure better optimized for queue subs.

type SublistStats added in v0.8.0

type SublistStats struct {
	NumSubs      uint32  `json:"num_subscriptions"`
	NumCache     uint32  `json:"num_cache"`
	NumInserts   uint64  `json:"num_inserts"`
	NumRemoves   uint64  `json:"num_removes"`
	NumMatches   uint64  `json:"num_matches"`
	CacheHitRate float64 `json:"cache_hit_rate"`
	MaxFanout    uint32  `json:"max_fanout"`
	AvgFanout    float64 `json:"avg_fanout"`

Public stats for the sublist

type Subsz added in v0.6.0

type Subsz struct {

Subsz represents detail information on current connections.

type TLSConfigOpts added in v0.7.0

type TLSConfigOpts struct {
	CertFile string
	KeyFile  string
	CaFile   string
	Verify   bool
	Timeout  float64
	Ciphers  []uint16

TLSConfigOpts holds the parsed tls config information, used with flag parsing

type User added in v0.8.1

type User struct {
	Username    string       `json:"user"`
	Password    string       `json:"password"`
	Permissions *Permissions `json:"permissions"`

For multiple accounts/users.

type Varz

type Varz struct {
	Port             int               `json:"port"`
	MaxPayload       int               `json:"max_payload"`
	Start            time.Time         `json:"start"`
	Now              time.Time         `json:"now"`
	Uptime           string            `json:"uptime"`
	Mem              int64             `json:"mem"`
	Cores            int               `json:"cores"`
	CPU              float64           `json:"cpu"`
	Connections      int               `json:"connections"`
	TotalConnections uint64            `json:"total_connections"`
	Routes           int               `json:"routes"`
	Remotes          int               `json:"remotes"`
	InMsgs           int64             `json:"in_msgs"`
	OutMsgs          int64             `json:"out_msgs"`
	InBytes          int64             `json:"in_bytes"`
	OutBytes         int64             `json:"out_bytes"`
	SlowConsumers    int64             `json:"slow_consumers"`
	Subscriptions    uint32            `json:"subscriptions"`
	HTTPReqStats     map[string]uint64 `json:"http_req_stats"`

Varz will output server information on the monitoring port at /varz.


Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL