clusterinfo

package
v1.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 26, 2023 License: MIT Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ChannelStats

// ChannelStats describes one channel of a topic: it names the node, host,
// topic and channel, and carries the depth, in-flight, deferred, requeue,
// timeout, message and client counters together with a paused flag.
//
// Each field is mapped to the JSON key given in its struct tag. Selected is
// tagged "-", so encoding/json neither reads nor writes it. NodeStats holds
// further *ChannelStats values under the "nodes" key and Clients holds the
// *ClientStats values under the "clients" key.
//
// NOTE(review): NodeStats presumably keeps the per-node entries that were
// merged into this value by Add — confirm against the Add implementation.
type ChannelStats struct {
	Node          string          `json:"node"`
	Hostname      string          `json:"hostname"`
	TopicName     string          `json:"topic_name"`
	ChannelName   string          `json:"channel_name"`
	Depth         int64           `json:"depth"`
	MemoryDepth   int64           `json:"memory_depth"`
	BackendDepth  int64           `json:"backend_depth"`
	InFlightCount int64           `json:"in_flight_count"`
	DeferredCount int64           `json:"deferred_count"`
	RequeueCount  int64           `json:"requeue_count"`
	TimeoutCount  int64           `json:"timeout_count"`
	MessageCount  int64           `json:"message_count"`
	ClientCount   int             `json:"client_count"`
	Selected      bool            `json:"-"`
	NodeStats     []*ChannelStats `json:"nodes"`
	Clients       []*ClientStats  `json:"clients"`
	Paused        bool            `json:"paused"`

	E2eProcessingLatency *quantile.E2eProcessingLatencyAggregate `json:"e2e_processing_latency"`
}

func (*ChannelStats) Add

func (c *ChannelStats) Add(a *ChannelStats)

type ChannelStatsByHost

// ChannelStatsByHost embeds ChannelStatsList, which promotes that type's Len
// and Swap methods, and adds a Less method of its own. The three methods
// together have the shape required by sort.Interface.
//
// NOTE(review): the name suggests Less orders entries by host name — confirm
// against the Less implementation.
type ChannelStatsByHost struct {
	ChannelStatsList
}

func (ChannelStatsByHost) Less

func (c ChannelStatsByHost) Less(i, j int) bool

type ChannelStatsList

// ChannelStatsList is a slice of *ChannelStats. It declares Len and Swap but
// no Less; an ordering is supplied by a wrapper type such as
// ChannelStatsByHost.
type ChannelStatsList []*ChannelStats

func (ChannelStatsList) Len

func (c ChannelStatsList) Len() int

func (ChannelStatsList) Swap

func (c ChannelStatsList) Swap(i, j int)

type ClientStats

// ClientStats describes one client connection: the node it is attached to,
// the client's address and identity strings, connection timing, message
// counters, sample rate, the deflate/snappy flags, authentication details and
// the TLS parameters of the connection.
//
// Each field is mapped to the JSON key given in its struct tag. The type has
// a custom UnmarshalJSON that postprocesses ConnectedDuration.
//
// NOTE(review): ConnectTs is presumably a Unix timestamp — confirm the unit
// against UnmarshalJSON.
type ClientStats struct {
	Node              string        `json:"node"`
	RemoteAddress     string        `json:"remote_address"`
	Version           string        `json:"version"`
	ClientID          string        `json:"client_id"`
	Hostname          string        `json:"hostname"`
	UserAgent         string        `json:"user_agent"`
	ConnectTs         int64         `json:"connect_ts"`
	ConnectedDuration time.Duration `json:"connected"`
	InFlightCount     int           `json:"in_flight_count"`
	ReadyCount        int           `json:"ready_count"`
	FinishCount       int64         `json:"finish_count"`
	RequeueCount      int64         `json:"requeue_count"`
	MessageCount      int64         `json:"message_count"`
	SampleRate        int32         `json:"sample_rate"`
	Deflate           bool          `json:"deflate"`
	Snappy            bool          `json:"snappy"`
	Authed            bool          `json:"authed"`
	AuthIdentity      string        `json:"auth_identity"`
	AuthIdentityURL   string        `json:"auth_identity_url"`

	TLS                           bool   `json:"tls"`
	CipherSuite                   string `json:"tls_cipher_suite"`
	TLSVersion                    string `json:"tls_version"`
	TLSNegotiatedProtocol         string `json:"tls_negotiated_protocol"`
	TLSNegotiatedProtocolIsMutual bool   `json:"tls_negotiated_protocol_is_mutual"`
}

func (*ClientStats) HasSampleRate

func (s *ClientStats) HasSampleRate() bool

func (*ClientStats) HasUserAgent

func (s *ClientStats) HasUserAgent() bool

func (*ClientStats) UnmarshalJSON

func (s *ClientStats) UnmarshalJSON(b []byte) error

UnmarshalJSON implements json.Unmarshaler and postprocesses ConnectedDuration

type ClientStatsList

// ClientStatsList is a slice of *ClientStats. It declares Len and Swap but no
// Less; an ordering is supplied by a wrapper type such as ClientsByHost.
type ClientStatsList []*ClientStats

func (ClientStatsList) Len

func (c ClientStatsList) Len() int

func (ClientStatsList) Swap

func (c ClientStatsList) Swap(i, j int)

type ClientsByHost

// ClientsByHost embeds ClientStatsList, which promotes that type's Len and
// Swap methods, and adds a Less method of its own. The three methods together
// have the shape required by sort.Interface.
//
// NOTE(review): the name suggests Less orders entries by host name — confirm
// against the Less implementation.
type ClientsByHost struct {
	ClientStatsList
}

func (ClientsByHost) Less

func (c ClientsByHost) Less(i, j int) bool

type ClusterInfo

// ClusterInfo is the receiver for this package's query methods (Get*) and
// administrative methods (create, delete, empty, pause, unpause, tombstone).
// All of its fields are unexported; New returns a *ClusterInfo built from a
// log function and an HTTP API client.
type ClusterInfo struct {
	// contains filtered or unexported fields
}

func New

func New(log lg.AppLogFunc, client *http_api.Client) *ClusterInfo

func (*ClusterInfo) CreateTopicChannel

func (c *ClusterInfo) CreateTopicChannel(topicName string, channelName string, lookupdHTTPAddrs []string) error

func (*ClusterInfo) DeleteChannel

func (c *ClusterInfo) DeleteChannel(topicName string, channelName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error

func (*ClusterInfo) DeleteTopic

func (c *ClusterInfo) DeleteTopic(topicName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error

func (*ClusterInfo) EmptyChannel

func (c *ClusterInfo) EmptyChannel(topicName string, channelName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error

func (*ClusterInfo) EmptyTopic

func (c *ClusterInfo) EmptyTopic(topicName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error

func (*ClusterInfo) GetLookupdProducers

func (c *ClusterInfo) GetLookupdProducers(lookupdHTTPAddrs []string) (Producers, error)

GetLookupdProducers returns Producers of all the nsqd connected to the given lookupds

func (*ClusterInfo) GetLookupdTopicChannels

func (c *ClusterInfo) GetLookupdTopicChannels(topic string, lookupdHTTPAddrs []string) ([]string, error)

GetLookupdTopicChannels returns a []string containing a union of all the channels from all the given lookupd for the given topic

func (*ClusterInfo) GetLookupdTopicProducers

func (c *ClusterInfo) GetLookupdTopicProducers(topic string, lookupdHTTPAddrs []string) (Producers, error)

GetLookupdTopicProducers returns Producers of all the nsqd for a given topic by unioning the nodes returned from the given lookupd

func (*ClusterInfo) GetLookupdTopics

func (c *ClusterInfo) GetLookupdTopics(lookupdHTTPAddrs []string) ([]string, error)

GetLookupdTopics returns a []string containing a union of all the topics from all the given nsqlookupd

func (*ClusterInfo) GetNSQDProducers

func (c *ClusterInfo) GetNSQDProducers(nsqdHTTPAddrs []string) (Producers, error)

GetNSQDProducers returns Producers of all the given nsqd

func (*ClusterInfo) GetNSQDStats

func (c *ClusterInfo) GetNSQDStats(producers Producers,
	selectedTopic string, selectedChannel string,
	includeClients bool) ([]*TopicStats, map[string]*ChannelStats, error)

GetNSQDStats returns aggregate topic and channel stats from the given Producers

If selectedChannel is empty, this will return stats for topic/channel. If selectedTopic is empty, this will return stats for *all* topic/channels. If includeClients is false, this will *not* return client stats for channels and the ChannelStats map will be keyed by topic + ':' + channel.

func (*ClusterInfo) GetNSQDTopicProducers

func (c *ClusterInfo) GetNSQDTopicProducers(topic string, nsqdHTTPAddrs []string) (Producers, error)

GetNSQDTopicProducers returns Producers containing the addresses of all the nsqd that produce the given topic

func (*ClusterInfo) GetNSQDTopics

func (c *ClusterInfo) GetNSQDTopics(nsqdHTTPAddrs []string) ([]string, error)

GetNSQDTopics returns a []string containing all the topics produced by the given nsqd

func (*ClusterInfo) GetProducers

func (c *ClusterInfo) GetProducers(lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) (Producers, error)

func (*ClusterInfo) GetTopicProducers

func (c *ClusterInfo) GetTopicProducers(topicName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) (Producers, error)

func (*ClusterInfo) GetVersion

func (c *ClusterInfo) GetVersion(addr string) (semver.Version, error)

GetVersion returns a semver.Version object by querying /info

func (*ClusterInfo) PauseChannel

func (c *ClusterInfo) PauseChannel(topicName string, channelName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error

func (*ClusterInfo) PauseTopic

func (c *ClusterInfo) PauseTopic(topicName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error

func (*ClusterInfo) TombstoneNodeForTopic

func (c *ClusterInfo) TombstoneNodeForTopic(topic string, node string, lookupdHTTPAddrs []string) error

TombstoneNodeForTopic tombstones the given node for the given topic on all the given nsqlookupd and deletes the topic from the node

func (*ClusterInfo) UnPauseChannel

func (c *ClusterInfo) UnPauseChannel(topicName string, channelName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error

func (*ClusterInfo) UnPauseTopic

func (c *ClusterInfo) UnPauseTopic(topicName string, lookupdHTTPAddrs []string, nsqdHTTPAddrs []string) error

type ErrList

// ErrList is a slice of errors. Its Error and Errors methods have value
// receivers, so an ErrList value has the method set required by both the
// built-in error interface and PartialErr.
type ErrList []error

func (ErrList) Error

func (l ErrList) Error() string

func (ErrList) Errors

func (l ErrList) Errors() []error

type PartialErr

// PartialErr is an error that additionally exposes a list of individual
// errors through Errors. ErrList is the implementation visible in this
// package.
//
// NOTE(review): the name suggests it signals that only part of an operation
// failed — confirm against the call sites that return it.
type PartialErr interface {
	error
	Errors() []error
}

type Producer

// Producer describes one nsqd node: its remote and broadcast addresses, host
// name, TCP and HTTP ports, version string, topics and an out-of-date flag.
//
// Each field is mapped to the JSON key given in its struct tag. VersionObj is
// tagged "-", so encoding/json neither reads nor writes it directly; the
// type's custom UnmarshalJSON postprocesses Topics and VersionObj.
type Producer struct {
	RemoteAddresses  []string       `json:"remote_addresses"`
	RemoteAddress    string         `json:"remote_address"`
	Hostname         string         `json:"hostname"`
	BroadcastAddress string         `json:"broadcast_address"`
	TCPPort          int            `json:"tcp_port"`
	HTTPPort         int            `json:"http_port"`
	Version          string         `json:"version"`
	VersionObj       semver.Version `json:"-"`
	Topics           ProducerTopics `json:"topics"`
	OutOfDate        bool           `json:"out_of_date"`
}

func (*Producer) Address

func (p *Producer) Address() string

func (*Producer) HTTPAddress

func (p *Producer) HTTPAddress() string

func (*Producer) IsInconsistent

func (p *Producer) IsInconsistent(numLookupd int) bool

IsInconsistent checks for cases where an unexpected number of nsqd connections are reporting the same information to nsqlookupd (i.e. multiple instances are using the same broadcast address), or cases where some nsqd are not reporting to all nsqlookupd.

func (*Producer) TCPAddress

func (p *Producer) TCPAddress() string

func (*Producer) UnmarshalJSON

func (p *Producer) UnmarshalJSON(b []byte) error

UnmarshalJSON implements json.Unmarshaler and postprocesses ProducerTopics and VersionObj

type ProducerTopic

// ProducerTopic pairs a topic name with a flag recording whether that topic
// is tombstoned. The fields are mapped to the JSON keys "topic" and
// "tombstoned".
type ProducerTopic struct {
	Topic      string `json:"topic"`
	Tombstoned bool   `json:"tombstoned"`
}

type ProducerTopics

// ProducerTopics is a slice of ProducerTopic values. It declares Len, Less
// and Swap, which together have the shape required by sort.Interface.
type ProducerTopics []ProducerTopic

func (ProducerTopics) Len

func (pt ProducerTopics) Len() int

func (ProducerTopics) Less

func (pt ProducerTopics) Less(i, j int) bool

func (ProducerTopics) Swap

func (pt ProducerTopics) Swap(i, j int)

type Producers

// Producers is a slice of *Producer. It declares HTTPAddrs, Search, Len and
// Swap but no Less; an ordering is supplied by a wrapper type such as
// ProducersByHost.
type Producers []*Producer

func (Producers) HTTPAddrs

func (t Producers) HTTPAddrs() []string

func (Producers) Len

func (t Producers) Len() int

func (Producers) Search

func (t Producers) Search(needle string) *Producer

func (Producers) Swap

func (t Producers) Swap(i, j int)

type ProducersByHost

// ProducersByHost embeds Producers, which promotes that type's Len and Swap
// methods, and adds a Less method of its own. The three methods together have
// the shape required by sort.Interface.
//
// NOTE(review): the name suggests Less orders entries by host name — confirm
// against the Less implementation.
type ProducersByHost struct {
	Producers
}

func (ProducersByHost) Less

func (c ProducersByHost) Less(i, j int) bool

type TopicStats

// TopicStats describes one topic: it names the node, host and topic, and
// carries the depth and message counters together with a paused flag.
//
// Each field is mapped to the JSON key given in its struct tag. NodeStats
// holds further *TopicStats values under the "nodes" key and Channels holds
// the topic's *ChannelStats values under the "channels" key.
//
// NOTE(review): NodeStats presumably keeps the per-node entries that were
// merged into this value by Add — confirm against the Add implementation.
type TopicStats struct {
	Node         string          `json:"node"`
	Hostname     string          `json:"hostname"`
	TopicName    string          `json:"topic_name"`
	Depth        int64           `json:"depth"`
	MemoryDepth  int64           `json:"memory_depth"`
	BackendDepth int64           `json:"backend_depth"`
	MessageCount int64           `json:"message_count"`
	NodeStats    []*TopicStats   `json:"nodes"`
	Channels     []*ChannelStats `json:"channels"`
	Paused       bool            `json:"paused"`

	E2eProcessingLatency *quantile.E2eProcessingLatencyAggregate `json:"e2e_processing_latency"`
}

func (*TopicStats) Add

func (t *TopicStats) Add(a *TopicStats)

type TopicStatsByHost

// TopicStatsByHost embeds TopicStatsList, which promotes that type's Len and
// Swap methods, and adds a Less method of its own. The three methods together
// have the shape required by sort.Interface.
//
// NOTE(review): the name suggests Less orders entries by host name — confirm
// against the Less implementation.
type TopicStatsByHost struct {
	TopicStatsList
}

func (TopicStatsByHost) Less

func (c TopicStatsByHost) Less(i, j int) bool

type TopicStatsList

// TopicStatsList is a slice of *TopicStats. It declares Len and Swap but no
// Less; an ordering is supplied by a wrapper type such as TopicStatsByHost.
type TopicStatsList []*TopicStats

func (TopicStatsList) Len

func (t TopicStatsList) Len() int

func (TopicStatsList) Swap

func (t TopicStatsList) Swap(i, j int)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL