server

package
v0.0.0-...-f468990 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 2, 2019 License: Apache-2.0 Imports: 37 Imported by: 0

Documentation

Index

Constants

View Source
const (

	// HealthInterval is the minimum time the cluster should be healthy
	// before accepting add member requests.
	HealthInterval = 5 * time.Second
	// DialTimeout is the timeout applied when dialing.
	DialTimeout = 5 * time.Second
	// ProposeMaxRetryCount is the maximum number of times a propose is retried.
	ProposeMaxRetryCount = 1000
)

Variables

View Source
var (
	// ErrStopped is returned when the kingbus server has been stopped.
	ErrStopped = errors.New("kingbus: server stopped")
	// ErrStarted is returned when the kingbus server has already been started.
	ErrStarted = errors.New("kingbus: server already started")
	// ErrCanceled is returned when a request has been cancelled.
	ErrCanceled = errors.New("kingbus: request cancelled")
	// ErrTimeout is returned when a request has timed out.
	ErrTimeout = errors.New("kingbus: request timed out")
	// ErrTimeoutDueToLeaderFail is returned when a request has timed out,
	// possibly due to a previous leader failure.
	ErrTimeoutDueToLeaderFail = errors.New("kingbus: request timed out, possibly due to previous leader failure")
	// ErrTimeoutDueToConnectionLost is returned when a request has timed out,
	// possibly due to a lost connection.
	ErrTimeoutDueToConnectionLost = errors.New("kingbus: request timed out, possibly due to connection lost")
	// ErrNotEnoughStartedMembers is returned when a re-configuration failed
	// because not enough members are started.
	ErrNotEnoughStartedMembers = errors.New("kingbus: re-configuration failed due to not enough started members")
	// ErrRequestTooLarge is returned when a request is too large.
	ErrRequestTooLarge = errors.New("kingbus: request is too large")
	// ErrNoSpace is returned when there is no space left.
	ErrNoSpace = errors.New("kingbus: no space")
	// ErrTooManyRequests is returned when there are too many requests.
	ErrTooManyRequests = errors.New("kingbus: too many requests")
	// ErrUnhealthy is returned when the cluster is unhealthy.
	ErrUnhealthy = errors.New("kingbus: unhealthy cluster")
	// ErrKeyNotFound is returned when a key is not found.
	ErrKeyNotFound = errors.New("kingbus: key not found")
	// ErrCorrupt is returned when the cluster is corrupt.
	ErrCorrupt = errors.New("kingbus: corrupt cluster")
	// ErrUnsupport is returned when applying an unsupported event type.
	ErrUnsupport = errors.New("apply unsupport event type")
	// ErrArgs is returned when the args are not available.
	ErrArgs = errors.New("binlog_server:args are not available")
	// ErrNotContain is returned when the slave's gtid set is not contained
	// in the master's.
	ErrNotContain = errors.New("binlog_server:slave gtidset not contain in master")
	// ErrUUIDIsNull is returned when the uuid of the server is null.
	ErrUUIDIsNull = errors.New("binlog_server:the uuid of server is null")
)

Functions

This section is empty.

Types

type BinlogProgress

type BinlogProgress struct {
	// contains filtered or unexported fields
}

BinlogProgress is the progress of receiving binlog

func (*BinlogProgress) CurrentGtidStr

func (s *BinlogProgress) CurrentGtidStr() string

CurrentGtidStr get the current gtid

func (*BinlogProgress) ExecutedGtidSetClone

func (s *BinlogProgress) ExecutedGtidSetClone() gomysql.GTIDSet

ExecutedGtidSetClone gets a clone (deep copy) of the executed gtid set

func (*BinlogProgress) ExecutedGtidSetStr

func (s *BinlogProgress) ExecutedGtidSetStr() string

ExecutedGtidSetStr get the executed gtid set

func (*BinlogProgress) LastBinlogFile

func (s *BinlogProgress) LastBinlogFile() string

LastBinlogFile get the last file of binlog event

func (*BinlogProgress) LastFilePosition

func (s *BinlogProgress) LastFilePosition() uint32

LastFilePosition get the last file position of binlog event

type BinlogServer

type BinlogServer struct {
	// contains filtered or unexported fields
}

BinlogServer is a binlog server that sends binlog events to slaves. The generic process: 1. authentication (SHOW GLOBAL VARIABLES LIKE 'BINLOG_CHECKSUM'; SET @master_binlog_checksum='NONE'; SET @master_heartbeat_period=%d); 2. COM_REGISTER_SLAVE; 3. semi-sync (SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'; SET @rpl_semi_sync_slave = 1); 4. COM_BINLOG_DUMP_GTID.

func NewBinlogServer

func NewBinlogServer(cfg *config.BinlogServerConfig, ki KingbusInfo, store storage.Storage, broadcast *utils.Broadcast) (*BinlogServer, error)

NewBinlogServer create a binlog server

func (*BinlogServer) CheckGtidSet

func (s *BinlogServer) CheckGtidSet(flavor string, slaveExecutedGtidSet gomysql.GTIDSet) error

CheckGtidSet checks the legality of the slave's executed gtid set

func (*BinlogServer) DumpBinlogAt

func (s *BinlogServer) DumpBinlogAt(ctx context.Context,
	startRaftIndex uint64, slaveGtids *gomysql.MysqlGTIDSet,
	eventC chan<- *storagepb.BinlogEvent, errorC chan<- error) error

DumpBinlogAt implements dump binlog event by slave executed gtid set

func (*BinlogServer) GetFde

func (s *BinlogServer) GetFde(preGtidEventIndex uint64) ([]byte, error)

GetFde get the FORMAT_DESCRIPTION_EVENT by previous gtids log event raft index

func (*BinlogServer) GetGtidSet

func (s *BinlogServer) GetGtidSet(flavor string, key string) (gomysql.GTIDSet, error)

GetGtidSet get gtid set

func (*BinlogServer) GetMasterInfo

func (s *BinlogServer) GetMasterInfo() (*mysql.MasterInfo, error)

GetMasterInfo get the master information connected by syncer

func (*BinlogServer) GetMySQLDumpAt

func (s *BinlogServer) GetMySQLDumpAt(slaveExecutedGtids *gomysql.MysqlGTIDSet) (uint64, error)

GetMySQLDumpAt returns the raft index from which binlog events are dumped. TODO: if gtids is an empty set and previous_gtids is also empty, ok!

func (*BinlogServer) GetNextBinlogFile

func (s *BinlogServer) GetNextBinlogFile(startRaftIndex uint64) (string, error)

GetNextBinlogFile get next binlog file by raft index

func (*BinlogServer) GetSlaves

func (s *BinlogServer) GetSlaves() map[string]*mysql.Slave

GetSlaves gets all slaves connected to the binlog server

func (*BinlogServer) LastBinlogFile

func (s *BinlogServer) LastBinlogFile() string

LastBinlogFile return the last binlog file

func (*BinlogServer) LastFilePosition

func (s *BinlogServer) LastFilePosition() uint32

LastFilePosition return the last binlog file position

func (*BinlogServer) RegisterSlave

func (s *BinlogServer) RegisterSlave(slave *mysql.Slave) error

RegisterSlave implements register slave into binlog server

func (*BinlogServer) Start

func (s *BinlogServer) Start()

Start implements binlog server start

func (*BinlogServer) Stop

func (s *BinlogServer) Stop()

Stop implements binlog server stop

func (*BinlogServer) UnregisterSlave

func (s *BinlogServer) UnregisterSlave(uuid string)

UnregisterSlave unregister slave by uuid

type KingbusInfo

// KingbusInfo exposes the kingbus server information.
type KingbusInfo interface {
	// AppliedIndex returns the applied index.
	AppliedIndex() uint64
	// LastBinlogFile returns the last binlog file.
	LastBinlogFile() string
	// LastFilePosition returns the last binlog file position.
	LastFilePosition() uint32
	// ExecutedGtidSetStr returns the executed gtid set as a string.
	ExecutedGtidSetStr() string
}

KingbusInfo get the kingbus server information

type KingbusServer

// KingbusServer is an instance that runs all sub servers.
type KingbusServer struct {
	// Cfg holds the kingbus server configuration.
	Cfg *config.KingbusServerConfig

	// Mu is the lock for leadElectedTime.
	Mu sync.RWMutex
	// contains filtered or unexported fields
}

KingbusServer is an instance that runs all sub servers

func NewKingbusServer

func NewKingbusServer(cfg *config.KingbusServerConfig) (*KingbusServer, error)

NewKingbusServer create a kingbus server

func (*KingbusServer) AddMember

func (s *KingbusServer) AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error)

AddMember adds a member into the raft cluster; only executed on the leader node

func (*KingbusServer) AppliedIndex

func (s *KingbusServer) AppliedIndex() uint64

AppliedIndex get applied index

func (*KingbusServer) CommittedIndex

func (s *KingbusServer) CommittedIndex() uint64

CommittedIndex get committed index

func (*KingbusServer) CurrentGtidStr

func (s *KingbusServer) CurrentGtidStr() string

CurrentGtidStr return current gtid

func (*KingbusServer) ExecutedGtidSetStr

func (s *KingbusServer) ExecutedGtidSetStr() string

ExecutedGtidSetStr return executed gtid

func (*KingbusServer) GetIP

func (s *KingbusServer) GetIP() string

GetIP get ip of kingbus server

func (*KingbusServer) GetServerStatus

func (s *KingbusServer) GetServerStatus(svrType config.SubServerType) interface{}

GetServerStatus get the sub server status

func (*KingbusServer) ID

func (s *KingbusServer) ID() types.ID

ID get kingbus server id

func (*KingbusServer) IsBinlogServerStarted

func (s *KingbusServer) IsBinlogServerStarted() bool

IsBinlogServerStarted return if binlog server started

func (*KingbusServer) IsIDRemoved

func (s *KingbusServer) IsIDRemoved(id uint64) bool

IsIDRemoved return if the kingbus has been removed

func (*KingbusServer) IsLeader

func (s *KingbusServer) IsLeader() bool

IsLeader returns whether the node is the leader

func (*KingbusServer) IsSyncerStarted

func (s *KingbusServer) IsSyncerStarted() bool

IsSyncerStarted return if syncer started

func (*KingbusServer) LastBinlogFile

func (s *KingbusServer) LastBinlogFile() string

LastBinlogFile return last binlog file

func (*KingbusServer) LastFilePosition

func (s *KingbusServer) LastFilePosition() uint32

LastFilePosition return last binlog file position

func (*KingbusServer) Leader

func (s *KingbusServer) Leader() types.ID

Leader get raft cluster leader

func (*KingbusServer) Process

func (s *KingbusServer) Process(ctx context.Context, m raftpb.Message) error

Process takes a raft message and applies it to the server's raft state machine, respecting any timeout of the given context.

func (*KingbusServer) Propose

func (s *KingbusServer) Propose(data []byte) error

Propose proposes data; only executed on the leader node, follower nodes are forbidden

func (*KingbusServer) ProposeWithRetry

func (s *KingbusServer) ProposeWithRetry(ctx context.Context, data []byte)

ProposeWithRetry implements propose data with retry

func (*KingbusServer) RemoveMember

func (s *KingbusServer) RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error)

RemoveMember removes a member from the raft cluster; only executed on the leader node

func (*KingbusServer) ReportSnapshot

func (s *KingbusServer) ReportSnapshot(id uint64, status etcdraft.SnapshotStatus)

ReportSnapshot reports snapshot sent status to the raft state machine, and clears the used snapshot from the snapshot store.

func (*KingbusServer) ReportUnreachable

func (s *KingbusServer) ReportUnreachable(id uint64)

ReportUnreachable report unreachable

func (*KingbusServer) Run

func (s *KingbusServer) Run()

Run kingbus server

func (*KingbusServer) StartProposeBinlog

func (s *KingbusServer) StartProposeBinlog(ctx context.Context)

StartProposeBinlog start propose binlog event

func (*KingbusServer) StartServer

func (s *KingbusServer) StartServer(svrType config.SubServerType, args interface{}) error

StartServer start sub servers:syncer server or binlog master server

func (*KingbusServer) Stop

func (s *KingbusServer) Stop()

Stop kingbus server

func (*KingbusServer) StopServer

func (s *KingbusServer) StopServer(svrType config.SubServerType)

StopServer stop sub server

func (*KingbusServer) SyncAdminURL

func (s *KingbusServer) SyncAdminURL()

SyncAdminURL syncs the admin url across the raft cluster. Since data is only proposed on the leader node, followers should wait until the leader's admin url has been applied, then send http api requests to the leader's admin url.

func (*KingbusServer) Term

func (s *KingbusServer) Term() uint64

Term get raft term

func (*KingbusServer) UpdateMember

func (s *KingbusServer) UpdateMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error)

UpdateMember updates a member in the raft cluster; only executed on the leader node

type PrometheusServer

type PrometheusServer struct {
	// contains filtered or unexported fields
}

PrometheusServer provides a container with config parameters for the Prometheus Exporter

func NewPrometheusServer

func NewPrometheusServer(addr string, r metrics.Registry,
	promRegistry prometheus.Registerer, FlushInterval time.Duration) *PrometheusServer

NewPrometheusServer returns a Provider that produces Prometheus metrics. Namespace and subsystem are applied to all produced metrics.

func (*PrometheusServer) Run

func (c *PrometheusServer) Run()

Run prometheus server

func (*PrometheusServer) Stop

func (c *PrometheusServer) Stop()

Stop prometheus server

type Syncer

type Syncer struct {
	// contains filtered or unexported fields
}

Syncer is a mock mysql slave that receives binlog from the master and proposes events into the raft cluster

func NewSyncer

func NewSyncer(cfg *config.SyncerConfig, store storage.Storage) (*Syncer, error)

NewSyncer create syncer in lead node, and init its gtid set

func (*Syncer) BinlogEventC

func (s *Syncer) BinlogEventC() chan *storagepb.BinlogEvent

BinlogEventC get binlogEvent channel

func (*Syncer) DataC

func (s *Syncer) DataC() chan []byte

DataC get data channel

func (*Syncer) Execute

func (s *Syncer) Execute(cmd string, args ...interface{}) (rr *gomysql.Result, err error)

Execute a SQL in Master

func (*Syncer) GetMasterInfo

func (s *Syncer) GetMasterInfo() error

GetMasterInfo get master info

func (*Syncer) Start

func (s *Syncer) Start(gset gomysql.GTIDSet) error

Start syncer

func (*Syncer) Stop

func (s *Syncer) Stop()

Stop syncer

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL