server

package
v0.18.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 17, 2020 License: Apache-2.0 Imports: 42 Imported by: 0

Documentation

Index

Constants

View Source
// Routes for the monitoring pages. Every route other than RootPath is built
// by appending a suffix to RootPath, so all of them live under "/streaming".
const (
	RootPath       = "/streaming"
	ServerPath     = RootPath + "/serverz"
	StorePath      = RootPath + "/storez"
	ClientsPath    = RootPath + "/clientsz"
	ChannelsPath   = RootPath + "/channelsz"
	IsFTActivePath = RootPath + "/isFTActive"
)

Routes for the monitoring pages

View Source
// Server defaults.
const (
	// VERSION is the current version for the NATS Streaming server.
	VERSION = "0.18.0"

	// Default cluster ID, followed by the default prefixes, all of which
	// start with "_STAN.".
	DefaultClusterID      = "test-cluster"
	DefaultDiscoverPrefix = "_STAN.discover"
	DefaultPubPrefix      = "_STAN.pub"
	DefaultSubPrefix      = "_STAN.sub"
	DefaultSubClosePrefix = "_STAN.subclose"
	DefaultUnSubPrefix    = "_STAN.unsub"
	DefaultClosePrefix    = "_STAN.close"

	// DefaultRequestTopic also starts with "_STAN."; DefaultStoreType is the
	// stores package's TypeMemory.
	// NOTE(review): DefaultGroupLimit is presumably the default value for
	// Options.GroupLimit — confirm against the code that reads it.
	DefaultRequestTopic = "_STAN.client_join_topic"
	DefaultStoreType    = stores.TypeMemory
	DefaultGroupLimit   = 5

	// DefaultHeartBeatInterval is the interval at which server sends heartbeat to a client
	DefaultHeartBeatInterval = 30 * time.Second
	// DefaultClientHBTimeout is how long server waits for a heartbeat response
	DefaultClientHBTimeout = 10 * time.Second
	// DefaultMaxFailedHeartBeats is the number of failed heartbeats before server closes
	// the client connection (total = (heartbeat interval + heartbeat timeout) * (fail count + 1)).
	// With the 30 second DefaultHeartBeatInterval this expression evaluates to 10.
	DefaultMaxFailedHeartBeats = int((5 * time.Minute) / DefaultHeartBeatInterval)

	// DefaultIOBatchSize is the maximum number of messages to accumulate before flushing a store.
	DefaultIOBatchSize = 1024

	// DefaultIOSleepTime is the duration (in micro-seconds) the server waits for more messages
	// before starting processing. Set to 0 (or negative) to disable the wait.
	DefaultIOSleepTime = int64(0)

	// DefaultLogCacheSize is the number of Raft log entries to cache in memory
	// to reduce disk IO.
	DefaultLogCacheSize = 512

	// DefaultLogSnapshots is the number of Raft log snapshots to retain.
	DefaultLogSnapshots = 2

	// DefaultTrailingLogs is the number of log entries to leave after a
	// snapshot and compaction.
	DefaultTrailingLogs = 10240
)

Server defaults.

Variables

View Source
// Errors. Each value is a sentinel created with errors.New, and every message
// carries the "stan: " prefix.
var (
	ErrInvalidSubject     = errors.New("stan: invalid subject")
	ErrInvalidStart       = errors.New("stan: invalid start position")
	ErrInvalidSub         = errors.New("stan: invalid subscription")
	ErrInvalidClient      = errors.New("stan: clientID already registered")
	ErrMissingClient      = errors.New("stan: clientID missing")
	ErrInvalidClientID    = errors.New("stan: invalid clientID: only alphanumeric and `-` or `_` characters allowed")
	ErrInvalidAckWait     = errors.New("stan: invalid ack wait time, should be >= 1s")
	ErrInvalidMaxInflight = errors.New("stan: invalid MaxInflight, should be >= 1")
	ErrInvalidConnReq     = errors.New("stan: invalid connection request")
	ErrInvalidPubReq      = errors.New("stan: invalid publish request")
	ErrInvalidSubReq      = errors.New("stan: invalid subscription request")
	ErrInvalidUnsubReq    = errors.New("stan: invalid unsubscribe request")
	ErrInvalidCloseReq    = errors.New("stan: invalid close request")
	ErrDupDurable         = errors.New("stan: duplicate durable registration")
	ErrInvalidDurName     = errors.New("stan: durable name of a durable queue subscriber can't contain the character ':'")
	ErrUnknownClient      = errors.New("stan: unknown clientID")
	ErrNoChannel          = errors.New("stan: no configured channel")
	ErrClusteredRestart   = errors.New("stan: cannot restart server in clustered mode if it was not previously clustered")
	ErrChanDelInProgress  = errors.New("stan: channel is being deleted")
)

Errors.

View Source
// DefaultNatsServerOptions are default options for the NATS server: it uses
// host 127.0.0.1 and port 4222, and sets both NoLog and NoSigs to true.
var DefaultNatsServerOptions = server.Options{
	Host:   "127.0.0.1",
	Port:   4222,
	NoLog:  true,
	NoSigs: true,
}

DefaultNatsServerOptions are default options for the NATS server

View Source
// ErrTimeout reports a read timeout error. Its message uses the "natslog: "
// prefix rather than the "stan: " prefix of the other package errors.
var ErrTimeout = errors.New("natslog: read timeout")

ErrTimeout reports a read timeout error

Functions

func IsArray

func IsArray(arr []*SyncClient, cli *SyncClient) (int, bool)

IsArray checks whether the client cli is in the array arr.

func NewNATSOptions

func NewNATSOptions() *server.Options

NewNATSOptions returns a new instance of (NATS) Options. This is needed if one wants to configure specific NATS options before starting a NATS Streaming Server (with RunServerWithOpts()).

func ProcessConfigFile

func ProcessConfigFile(configFile string, opts *Options) error

ProcessConfigFile parses the configuration file `configFile` and updates the given Streaming options `opts`.

Types

type Channelsz

// Channelsz lists the names of all NATS Streaming channels.
//
// Names and Channels are tagged omitempty, so each is left out of the JSON
// output when empty.
// NOTE(review): Offset, Limit, Count and Total look like paging information
// for the returned list — confirm against the monitoring handler.
type Channelsz struct {
	ClusterID string      `json:"cluster_id"`
	ServerID  string      `json:"server_id"`
	Now       time.Time   `json:"now"`
	Offset    int         `json:"offset"`
	Limit     int         `json:"limit"`
	Count     int         `json:"count"`
	Total     int         `json:"total"`
	Names     []string    `json:"names,omitempty"`
	Channels  []*Channelz `json:"channels,omitempty"`
}

Channelsz lists the names of all NATS Streaming channels.

type Channelz

// Channelz describes a NATS Streaming Channel.
//
// Subscriptions is tagged omitempty, so it is left out of the JSON output
// when empty.
type Channelz struct {
	Name          string           `json:"name"`
	Msgs          int              `json:"msgs"`
	Bytes         uint64           `json:"bytes"`
	FirstSeq      uint64           `json:"first_seq"`
	LastSeq       uint64           `json:"last_seq"`
	Subscriptions []*Subscriptionz `json:"subscriptions,omitempty"`
}

Channelz describes a NATS Streaming Channel

type Clientsz

// Clientsz lists the client connections.
//
// Unlike Channelsz.Channels, the Clients field has no omitempty tag, so it is
// always present in the JSON output.
// NOTE(review): Offset, Limit, Count and Total look like paging information
// for the returned list — confirm against the monitoring handler.
type Clientsz struct {
	ClusterID string     `json:"cluster_id"`
	ServerID  string     `json:"server_id"`
	Now       time.Time  `json:"now"`
	Offset    int        `json:"offset"`
	Limit     int        `json:"limit"`
	Count     int        `json:"count"`
	Total     int        `json:"total"`
	Clients   []*Clientz `json:"clients"`
}

Clientsz lists the client connections

type Clientz

// Clientz describes a NATS Streaming Client connection.
//
// Subscriptions maps a string key to a list of Subscriptionz and is left out
// of the JSON output when empty.
// NOTE(review): the map key is presumably the channel name — confirm.
type Clientz struct {
	ID            string                      `json:"id"`
	HBInbox       string                      `json:"hb_inbox"`
	Subscriptions map[string][]*Subscriptionz `json:"subscriptions,omitempty"`
}

Clientz describes a NATS Streaming Client connection

type ClusteringOptions

// ClusteringOptions contains STAN Server options related to clustering.
type ClusteringOptions struct {
	Clustered    bool     // Run the server in a clustered configuration.
	NodeID       string   // ID of the node within the cluster.
	Bootstrap    bool     // Bootstrap the cluster as a seed node if there is no existing state.
	Peers        []string // List of cluster peer node IDs to bootstrap cluster state.
	RaftLogPath  string   // Path to Raft log store directory.
	LogCacheSize int      // Number of Raft log entries to cache in memory to reduce disk IO.
	LogSnapshots int      // Number of Raft log snapshots to retain.
	TrailingLogs int64    // Number of logs left after a snapshot.
	Sync         bool     // Do a file sync after every write to the Raft log and message store.
	RaftLogging  bool     // Enable logging of Raft library (disabled by default since really verbose).

	// When a node processes a snapshot (either on startup or if falling behind) and it is
	// not in phase with the message store's state, it is required to reconcile its state
	// with the current leader. If it is unable, the node will fail to start or exit.
	// If all nodes are starting and there is no way to have a leader at this point,
	// then if this boolean is set to true, then the node will attempt to reconcile but
	// if it can't it will still proceed.
	ProceedOnRestoreFailure bool

	// These will be set to some sane defaults. Change only if experiencing raft issues.
	RaftHeartbeatTimeout time.Duration
	RaftElectionTimeout  time.Duration
	RaftLeaseTimeout     time.Duration
	RaftCommitTimeout    time.Duration
}

ClusteringOptions contains STAN Server options related to clustering.

type Options

// Options for NATS Streaming Server.
//
// stores.StoreLimits is embedded, so its fields are promoted onto Options.
type Options struct {
	ID                 string
	DiscoverPrefix     string
	StoreType          string
	FilestoreDir       string
	FileStoreOpts      stores.FileStoreOptions
	SQLStoreOpts       stores.SQLStoreOptions
	stores.StoreLimits               // Store limits (MaxChannels, etc..)
	EnableLogging      bool          // Enables logging
	CustomLogger       logger.Logger // Server will start with the provided logger
	Trace              bool          // Verbose trace
	Debug              bool          // Debug trace
	HandleSignals      bool          // Should the server setup a signal handler (for Ctrl+C, etc...)
	Secure             bool          // Create a TLS enabled connection
	ClientCert         string        // Client Certificate for TLS
	ClientKey          string        // Client Key for TLS
	ClientCA           string        // Client CAs for TLS
	TLSSkipVerify      bool          // Skips the server's certificate chain and host name verification (Insecure!)
	TLSServerName      string        // Used to verify the hostname returned in the server certificate
	IOBatchSize        int           // Maximum number of messages collected from clients before starting their processing.
	IOSleepTime        int64         // Duration (in micro-seconds) the server waits for more messages to fill up a batch.
	NATSServerURL      string        // URL for external NATS Server to connect to. If empty, NATS Server is embedded.
	NATSCredentials    string        // Credentials file for connecting to external NATS Server.
	ClientHBInterval   time.Duration // Interval at which server sends heartbeat to a client.
	ClientHBTimeout    time.Duration // How long server waits for a heartbeat response.
	ClientHBFailCount  int           // Number of failed heartbeats before server closes client connection.
	FTGroupName        string        // Name of the FT Group. A group can be 2 or more servers with a single active server and all sharing the same datastore.
	Partitioning       bool          // Specify if server only accepts messages/subscriptions on channels defined in StoreLimits.
	SyslogName         string        // Optional name for the syslog (useful on Windows when running several servers as a service)
	Encrypt            bool          // Specify if server should encrypt messages payload when storing them
	EncryptionCipher   string        // Cipher used for encryption. Supported are "AES" and "CHACHA". If none is specified, defaults to AES on platforms with Intel processors, CHACHA otherwise.
	EncryptionKey      []byte        // Encryption key. The environment NATS_STREAMING_ENCRYPTION_KEY takes precedence and is the preferred way to provide the key.
	Clustering         ClusteringOptions
	NATSClientOpts     []nats.Option

	// NOTE(review): the two fields below are undocumented here. GroupLimit
	// presumably pairs with DefaultGroupLimit and TopicGroup.Limit, and Sctp
	// presumably toggles an SCTP transport — confirm both against their users.
	GroupLimit int
	Sctp       bool
}

Options for NATS Streaming Server

func ConfigureOptions

func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp, printTLSHelp func()) (*Options, *natsd.Options, error)

ConfigureOptions accepts a flag set and augments it with NATS Streaming Server specific flags. It then invokes the corresponding function from NATS Server. On success, Streaming and NATS options structures are returned configured based on the selected flags and/or configuration files. The command line options take precedence over the ones in the configuration files.

func GetDefaultOptions

func GetDefaultOptions() (o *Options)

GetDefaultOptions returns default options for the NATS Streaming Server

func (*Options) Clone

func (o *Options) Clone() *Options

Clone returns a deep copy of the Options object.

type Serverz

// Serverz describes the NATS Streaming Server.
//
// Role, OpenFDs and MaxFDs are tagged omitempty, so each is left out of the
// JSON output when it holds its zero value. Uptime is carried as a string
// rather than as a time.Duration.
type Serverz struct {
	ClusterID     string    `json:"cluster_id"`
	ServerID      string    `json:"server_id"`
	Version       string    `json:"version"`
	GoVersion     string    `json:"go"`
	State         string    `json:"state"`
	Role          string    `json:"role,omitempty"`
	Now           time.Time `json:"now"`
	Start         time.Time `json:"start_time"`
	Uptime        string    `json:"uptime"`
	Clients       int       `json:"clients"`
	Subscriptions int       `json:"subscriptions"`
	Channels      int       `json:"channels"`
	TotalMsgs     int       `json:"total_msgs"`
	TotalBytes    uint64    `json:"total_bytes"`
	InMsgs        int64     `json:"in_msgs"`
	InBytes       int64     `json:"in_bytes"`
	OutMsgs       int64     `json:"out_msgs"`
	OutBytes      int64     `json:"out_bytes"`
	OpenFDs       int       `json:"open_fds,omitempty"`
	MaxFDs        int       `json:"max_fds,omitempty"`
}

Serverz describes the NATS Streaming Server

type StanServer

// StanServer structure represents the NATS Streaming Server.
type StanServer struct {

	// TopicGroupSnapshot points to the TopicGroup exposed by this server.
	// NOTE(review): the original comment only says "Add Topic Group Map";
	// who populates this field is not visible here.
	TopicGroupSnapshot *TopicGroup
	// contains filtered or unexported fields
}

StanServer structure represents the NATS Streaming Server

func Run

func Run(sOpts *Options, nOpts *natsd.Options) (*StanServer, error)

Run starts the NATS Streaming server. This wrapper function allows Windows to add a hook for running NATS Streaming as a service.

func RunServer

func RunServer(ID string) (*StanServer, error)

RunServer will start up an embedded NATS Streaming Server and a nats-server to support it.

func RunServerWithOpts

func RunServerWithOpts(stanOpts *Options, natsOpts *server.Options) (newServer *StanServer, returnedError error)

RunServerWithOpts allows you to run a NATS Streaming Server with full control on the Streaming and NATS Server configuration.

func (*StanServer) ClusterID

func (s *StanServer) ClusterID() string

ClusterID returns the NATS Streaming Server's ID.

func (*StanServer) LastError

func (s *StanServer) LastError() error

LastError returns the last fatal error the server experienced.

func (*StanServer) Shutdown

func (s *StanServer) Shutdown()

Shutdown will close our NATS connection and shut down any embedded NATS server.

func (*StanServer) State

func (s *StanServer) State() State

State returns the state of this server.

type State

// State represents the possible server states.
type State int8

State represents the possible server states

// Possible server states. The values are assigned with iota, so they run
// from Standalone = 0 through Clustered = 5 in declaration order; reordering
// the list would change the numeric value of every state after the move.
const (
	Standalone State = iota
	FTActive
	FTStandby
	Failed
	Shutdown
	Clustered
)

Possible server states

func (State) String

func (state State) String() string

type Storez

// Storez describes the NATS Streaming Store.
//
// Limits embeds the full stores.StoreLimits value under the "limits" JSON key.
type Storez struct {
	ClusterID  string             `json:"cluster_id"`
	ServerID   string             `json:"server_id"`
	Now        time.Time          `json:"now"`
	Type       string             `json:"type"`
	Limits     stores.StoreLimits `json:"limits"`
	TotalMsgs  int                `json:"total_msgs"`
	TotalBytes uint64             `json:"total_bytes"`
}

Storez describes the NATS Streaming Store

type Subscriptionz

// Subscriptionz describes a NATS Streaming Subscription.
//
// DurableName and QueueName are tagged omitempty, so each is left out of the
// JSON output when empty.
// NOTE(review): AckWait is a plain int; its unit is not visible here
// (presumably seconds) — confirm before relying on it.
type Subscriptionz struct {
	ClientID     string `json:"client_id"`
	Inbox        string `json:"inbox"`
	AckInbox     string `json:"ack_inbox"`
	DurableName  string `json:"durable_name,omitempty"`
	QueueName    string `json:"queue_name,omitempty"`
	IsDurable    bool   `json:"is_durable"`
	IsOffline    bool   `json:"is_offline"`
	MaxInflight  int    `json:"max_inflight"`
	AckWait      int    `json:"ack_wait"`
	LastSent     uint64 `json:"last_sent"`
	PendingCount int    `json:"pending_count"`
	IsStalled    bool   `json:"is_stalled"`
}

Subscriptionz describes a NATS Streaming Subscription

type SyncClient

// SyncClient holds a client ID together with a topic, a group, and a host
// and port. Every field is tagged omitempty, so a field holding its zero
// value is left out of the JSON output.
type SyncClient struct {
	ClientID string `json:"client_id,omitempty"`
	Topic    string `json:"topic,omitempty"`
	Group    string `json:"group,omitempty"`
	Host     string `json:"host,omitempty"`
	Port     int    `json:"port,omitempty"`
}

type TopicGroup

// TopicGroup holds a limit, a map from a string key to a list of SyncClient
// lists, and a map from a string key to a list of ints.
// NOTE(review): the map keys are presumably topic names and each inner
// []*SyncClient is presumably one group of clients; the meaning of the
// Snapshot ints is not visible here — confirm against the methods.
type TopicGroup struct {
	Limit    int                        `json:"limit"`
	Clients  map[string][][]*SyncClient `json:"clients"`
	Snapshot map[string][]int           `json:"snapshot"`
}

func (*TopicGroup) Add

func (t *TopicGroup) Add(client *SyncClient, isAuto bool)

Add adds the client to the array.

func (*TopicGroup) AddClient

func (t *TopicGroup) AddClient(clientId, topic, group, host string, port int, isAuto bool)

AddClient adds the client info to the array.

func (*TopicGroup) DelClient

func (t *TopicGroup) DelClient(clientId, topic string, isAuto bool)

DelClient deletes the client info from the array.

func (*TopicGroup) Delete

func (t *TopicGroup) Delete(client *SyncClient, isAuto bool)

Delete deletes the client from the array.

func (*TopicGroup) GetClient

func (t *TopicGroup) GetClient(clientID string, subject string) *SyncClient

func (*TopicGroup) GetClientGroup

func (t *TopicGroup) GetClientGroup(client *SyncClient) []*SyncClient

GetClientGroup gets the group's clients for a single client.

func (*TopicGroup) GetGroupClientsByGroup

func (t *TopicGroup) GetGroupClientsByGroup(group, subject string) []*SyncClient

GetGroupClientsByGroup gets the group's clients by group name and topic.

func (*TopicGroup) ToJson

func (t *TopicGroup) ToJson() string

ToJson returns the TopicGroup as a JSON string.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL