Documentation ¶
Index ¶
- Constants
- Variables
- func InitWebsocketEnv(config *config.Config)
- type Broadcast
- type Client
- type ClientPool
- type Collector
- func (c *Collector) AddNode(addr string)
- func (c *Collector) DisableConnection()
- func (c *Collector) EnableConnection()
- func (c *Collector) MessageIn() chan<- *message.Message
- func (c *Collector) ProcessData(data []byte)
- func (c *Collector) Register() chan<- *Client
- func (c *Collector) RemoveNode(addr string)
- func (c *Collector) ReportIn() chan<- *message.Report
- func (c *Collector) Run()
- func (c *Collector) Stop()
- func (c *Collector) Unregister() chan<- *Client
- func (c *Collector) UpdateReportPeriod(reportPeriod time.Duration)
- type Dispatcher
- type DistributedSystem
- type GossipSystem
- func (d *GossipSystem) ConfigUpdate(key string, val string) error
- func (d *GossipSystem) GetBroadcasts(overhead, limit int) [][]byte
- func (d *GossipSystem) Join(addr string) error
- func (d *GossipSystem) Leave() error
- func (d *GossipSystem) ListNodes() ([]byte, error)
- func (d *GossipSystem) LocalState(join bool) []byte
- func (d *GossipSystem) MergeRemoteState(buf []byte, join bool)
- func (d *GossipSystem) NodeMeta(limit int) []byte
- func (d *GossipSystem) NotifyAlive(peer *memberlist.Node) error
- func (d *GossipSystem) NotifyJoin(node *memberlist.Node)
- func (d *GossipSystem) NotifyLeave(node *memberlist.Node)
- func (d *GossipSystem) NotifyMsg(b []byte)
- func (d *GossipSystem) NotifyUpdate(node *memberlist.Node)
- func (d *GossipSystem) OnRoleChanged(oldRole, newRole string)
- func (d *GossipSystem) Remove(addr string) error
- func (d *GossipSystem) Run()
- type Hub
- type MetaData
- type Node
- type Pusher
- type Server
- type Slot
- type StaticSystem
Constants ¶
const ( NodeRoleSlave = "slave" NodeRoleMaster = "master" NodeRoleStandby = "standby" )
Labels for node roles.
const (
SeedsFileName = "seeds.yaml"
)
seeds file to store and reload cluster nodes
Variables ¶
var ErrNotMaster = errors.New("not master")
ErrNotMaster is returned when the node is not the master.
Functions ¶
func InitWebsocketEnv ¶
InitWebsocketEnv sets the global config for the websocket client and server
Types ¶
type Broadcast ¶
type Broadcast struct {
// contains filtered or unexported fields
}
Broadcast is the implementation of memberlist's Broadcast interface
func (*Broadcast) Invalidates ¶
func (b *Broadcast) Invalidates(other memberlist.Broadcast) bool
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is an integration of network data used by the hub
type ClientPool ¶
type ClientPool struct {
// contains filtered or unexported fields
}
ClientPool keeps the connections for pushing data. TODO: preference support
type Collector ¶
Collector is responsible for assembling data: the master collector gathers info from slaves, and the slave collector gathers info from probes.
func NewCollector ¶
func NewCollector(report chan<- []byte, reportPeriod time.Duration, slowThreshold int64, disableConnection bool) *Collector
NewCollector creates a collector
func (*Collector) DisableConnection ¶
func (c *Collector) DisableConnection()
DisableConnection cleans all client connections
func (*Collector) EnableConnection ¶
func (c *Collector) EnableConnection()
EnableConnection enables and refreshes client connections
func (*Collector) ProcessData ¶
ProcessData decodes the report received from slaves
func (*Collector) RemoveNode ¶
RemoveNode deletes the cluster node specified by addr
func (*Collector) Run ¶
func (c *Collector) Run()
Run starts the main assembling process at the message and report level
func (*Collector) Unregister ¶
Unregister removes a client from the client pool
func (*Collector) UpdateReportPeriod ¶
UpdateReportPeriod reloads the reportPeriod
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher is a hub to serve websocket, it runs on every node in the cluster
func NewDispatcher ¶
func NewDispatcher(pusher *Pusher) *Dispatcher
NewDispatcher creates a new Dispatcher object
func (*Dispatcher) ProcessData ¶
func (d *Dispatcher) ProcessData(data []byte)
ProcessData of Dispatcher is empty
func (*Dispatcher) Register ¶
func (d *Dispatcher) Register() chan<- *Client
Register gets the register channel
func (*Dispatcher) Unregister ¶
func (d *Dispatcher) Unregister() chan<- *Client
Unregister gets the unregister channel
type DistributedSystem ¶
type DistributedSystem interface { Run() Join(addr string) error Remove(addr string) error Leave() error ListNodes() ([]byte, error) ConfigUpdate(key string, val string) error }
DistributedSystem decides how the cluster works
type GossipSystem ¶
GossipSystem is a distributed system with automatic failure detection
func NewGossipSystem ¶
func NewGossipSystem(server *Server, role string, group string, port uint16) *GossipSystem
NewGossipSystem creates a new gossip-based system
func (*GossipSystem) ConfigUpdate ¶
func (d *GossipSystem) ConfigUpdate(key string, val string) error
ConfigUpdate updates a key-value configuration
func (*GossipSystem) GetBroadcasts ¶
func (d *GossipSystem) GetBroadcasts(overhead, limit int) [][]byte
func (*GossipSystem) Join ¶
func (d *GossipSystem) Join(addr string) error
Join adds the node specified by 'addr' to the cluster
func (*GossipSystem) Leave ¶
func (d *GossipSystem) Leave() error
Leave removes current node from cluster
func (*GossipSystem) ListNodes ¶
func (d *GossipSystem) ListNodes() ([]byte, error)
ListNodes shows cluster nodes info
func (*GossipSystem) LocalState ¶
func (d *GossipSystem) LocalState(join bool) []byte
func (*GossipSystem) MergeRemoteState ¶
func (d *GossipSystem) MergeRemoteState(buf []byte, join bool)
func (*GossipSystem) NodeMeta ¶
func (d *GossipSystem) NodeMeta(limit int) []byte
NodeMeta returns the binary meta data
func (*GossipSystem) NotifyAlive ¶
func (d *GossipSystem) NotifyAlive(peer *memberlist.Node) error
NotifyAlive is an interface called by the heartbeat scheme of memberlist
func (*GossipSystem) NotifyJoin ¶
func (d *GossipSystem) NotifyJoin(node *memberlist.Node)
NotifyJoin is called when a node joins the cluster
func (*GossipSystem) NotifyLeave ¶
func (d *GossipSystem) NotifyLeave(node *memberlist.Node)
NotifyLeave is called when a node leaves the gossip cluster
func (*GossipSystem) NotifyMsg ¶
func (d *GossipSystem) NotifyMsg(b []byte)
func (*GossipSystem) NotifyUpdate ¶
func (d *GossipSystem) NotifyUpdate(node *memberlist.Node)
NotifyUpdate is called when a node changes its gossip message
func (*GossipSystem) OnRoleChanged ¶
func (d *GossipSystem) OnRoleChanged(oldRole, newRole string)
OnRoleChanged is a callback to process nodes' role changes
func (*GossipSystem) Remove ¶
func (d *GossipSystem) Remove(addr string) error
Remove is not supported in gossip system
type Hub ¶
type Hub interface { ProcessData(data []byte) Register() chan<- *Client Unregister() chan<- *Client }
Hub is the interface processing Client
type MetaData ¶
type MetaData struct { Role string `json:"role"` // master, standby, probe Epic uint64 `json:"epic"` // epic for message checking Group string `json:"group"` // group(cluster) name ServerPort uint16 `json:"server_port"` // dispatcher port }
MetaData keeps the basic information of a node
type Node ¶
type Node struct { Name string `json:"name"` IP string `json:"ip"` GossipPort uint16 `json:"gossip_port"` Meta *MetaData `json:"meta"` }
Node contains the unit's topology
type Pusher ¶
type Pusher struct {
// contains filtered or unexported fields
}
Pusher always pushes the message to one server in the pool. TODO: an instance of a special Client may be a cute implementation
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server manages all network input and output
func (*Server) Dispatcher ¶
func (s *Server) Dispatcher() *Dispatcher
Dispatcher gets the current instance of Dispatcher
type Slot ¶
type Slot struct {
// contains filtered or unexported fields
}
Slot keeps the connection for a server
type StaticSystem ¶
StaticSystem is a manually controlled distributed system
func NewStaticSystem ¶
func NewStaticSystem(server *Server, role string, group string) *StaticSystem
NewStaticSystem ... there is no standby in a static system; a slave can only be added by the master
func (*StaticSystem) ConfigUpdate ¶
func (d *StaticSystem) ConfigUpdate(key string, val string) error
ConfigUpdate reacts to dynamic config changes
func (*StaticSystem) Join ¶
func (d *StaticSystem) Join(addr string) error
Join adds a node specified by 'addr' to current cluster
func (*StaticSystem) Leave ¶
func (d *StaticSystem) Leave() error
Leave removes the current node from its cluster
func (*StaticSystem) ListNodes ¶
func (d *StaticSystem) ListNodes() ([]byte, error)
ListNodes shows the cluster topology
func (*StaticSystem) Remove ¶
func (d *StaticSystem) Remove(addr string) error
Remove deletes the node specified by 'addr' from the current cluster