Documentation ¶
Index ¶
Constants ¶
const ( // DefaultEtcdTimeout is the default timeout config for etcd. DefaultEtcdTimeout = 5 * time.Second // DefaultAllRetryTime is the default retry time for all pumps; it should be greater than RetryTime. DefaultAllRetryTime = 20 // RetryTime is the retry time for each pump. RetryTime = 5 // DefaultBinlogWriteTimeout is the default max time binlog can use to write to pump. DefaultBinlogWriteTimeout = 15 * time.Second // CheckInterval is the default interval for checking unavailable pumps. CheckInterval = 30 * time.Second // RetryInterval is the default interval of retrying to write binlog. RetryInterval = 100 * time.Millisecond )
const ( // Range means range algorithm. Range = "range" // Hash means hash algorithm. Hash = "hash" // Score means choosing a pump by its score. Score = "score" // LocalUnix means only the local pump is used, via unix socket. LocalUnix = "local unix" )
Variables ¶
var ( // Logger is ... Logger = log.New() // ErrNoAvaliablePump means there is no available pump to write binlog to. ErrNoAvaliablePump = errors.New("no avaliable pump to write binlog") )
Functions ¶
This section is empty.
Types ¶
type HashSelector ¶
type HashSelector struct { sync.RWMutex // TsMap maps start_ts to the pump used when sending the prepare binlog. // The Commit binlog should be sent to the same pump. TsMap map[int64]*PumpStatus // PumpMap maps a pump's node id to the pump. PumpMap map[string]*PumpStatus // Pumps are the pumps to be selected. Pumps []*PumpStatus }
HashSelector selects a pump by hash.
func (*HashSelector) Next ¶
func (h *HashSelector) Next(binlog *pb.Binlog, retryTime int) *PumpStatus
Next implements PumpSelector.Next. Only for Prewrite binlog.
func (*HashSelector) Select ¶
func (h *HashSelector) Select(binlog *pb.Binlog) *PumpStatus
Select implements PumpSelector.Select.
func (*HashSelector) SetPumps ¶
func (h *HashSelector) SetPumps(pumps []*PumpStatus)
SetPumps implements PumpSelector.SetPumps.
type LocalUnixSelector ¶
type LocalUnixSelector struct { sync.RWMutex // the pump to be selected. Pump *PumpStatus }
LocalUnixSelector always selects the local pump; it is used for compatibility with the Kafka version of tidb-binlog.
func (*LocalUnixSelector) Next ¶
func (u *LocalUnixSelector) Next(binlog *pb.Binlog, retryTime int) *PumpStatus
Next implements PumpSelector.Next. Only for Prewrite binlog.
func (*LocalUnixSelector) Select ¶
func (u *LocalUnixSelector) Select(binlog *pb.Binlog) *PumpStatus
Select implements PumpSelector.Select.
func (*LocalUnixSelector) SetPumps ¶
func (u *LocalUnixSelector) SetPumps(pumps []*PumpStatus)
SetPumps implements PumpSelector.SetPumps.
type PumpInfos ¶
type PumpInfos struct { sync.RWMutex // Pumps saves the map of pump's nodeID and pump status. Pumps map[string]*PumpStatus // AvaliablePumps saves the status of all available pumps. AvaliablePumps map[string]*PumpStatus // UnAvaliablePumps saves the unavailable pumps. // Only pumps with Online state in this map need to be checked for availability. UnAvaliablePumps map[string]*PumpStatus }
PumpInfos saves the pumps' information in the pumps client.
type PumpSelector ¶
type PumpSelector interface { // SetPumps sets the pumps to be selected. SetPumps([]*PumpStatus) // Select returns a suitable pump. Select(*pb.Binlog) *PumpStatus // Next returns the next pump. Next(*pb.Binlog, int) *PumpStatus }
PumpSelector selects pump for sending binlog.
func NewHashSelector ¶
func NewHashSelector() PumpSelector
NewHashSelector returns a new HashSelector.
func NewLocalUnixSelector ¶
func NewLocalUnixSelector() PumpSelector
NewLocalUnixSelector returns a new LocalUnixSelector.
func NewRangeSelector ¶
func NewRangeSelector() PumpSelector
NewRangeSelector returns a new RangeSelector.
func NewScoreSelector ¶
func NewScoreSelector() PumpSelector
NewScoreSelector returns a new ScoreSelector.
func NewSelector ¶
func NewSelector(algorithm string) PumpSelector
NewSelector returns a PumpSelector according to the algorithm.
type PumpStatus ¶
type PumpStatus struct { /* Pump has these states: Online: the pumps client can write binlog to a pump only when its state is Online. Pausing: this pump is pausing, and can't provide write binlog service. This state will turn into Paused when the pump quits. Paused: this pump is paused, and can't provide write binlog service. Closing: this pump is closing, and can't provide write binlog service. This state will turn into Offline when the pump quits. Offline: this pump is offline, and can no longer provide write binlog service. */ node.Status // whether the pump is available or not IsAvaliable bool // the client of this pump Client pb.PumpClient // contains filtered or unexported fields }
PumpStatus saves pump's status.
func NewPumpStatus ¶
func NewPumpStatus(status *node.Status, security *tls.Config) *PumpStatus
NewPumpStatus returns a new PumpStatus according to node's status.
type PumpsClient ¶
type PumpsClient struct { // ClusterID is the cluster ID of this tidb cluster. ClusterID uint64 // the registry of etcd. EtcdRegistry *node.EtcdRegistry // Pumps saves the pumps' information. Pumps *PumpInfos // Selector will select a suitable pump. Selector PumpSelector // the max retry time if write binlog failed. RetryTime int // BinlogWriteTimeout is the max time binlog can use to write to pump. BinlogWriteTimeout time.Duration // Security is the security config Security *tls.Config // contains filtered or unexported fields }
PumpsClient is the client of pumps.
func NewLocalPumpsClient ¶
func NewLocalPumpsClient(etcdURLs, binlogSocket string, timeout time.Duration, securityOpt pd.SecurityOption) (*PumpsClient, error)
NewLocalPumpsClient returns a PumpsClient that writes binlog through a socket file, for compatibility with the Kafka version of pump.
func NewPumpsClient ¶
func NewPumpsClient(etcdURLs string, timeout time.Duration, securityOpt pd.SecurityOption) (*PumpsClient, error)
NewPumpsClient returns a PumpsClient. TODO: get the strategy from etcd and support updating it in real time. Range is used as the default for now.
func (*PumpsClient) WriteBinlog ¶
func (c *PumpsClient) WriteBinlog(binlog *pb.Binlog) error
WriteBinlog writes binlog to a suitable pump.
type RangeSelector ¶
type RangeSelector struct { sync.RWMutex // Offset saves the offset in Pumps. Offset int // TsMap maps start_ts to the pump used when sending the prepare binlog. // The Commit binlog should be sent to the same pump. TsMap map[int64]*PumpStatus // PumpMap maps a pump's node id to the pump. PumpMap map[string]*PumpStatus // Pumps are the pumps to be selected. Pumps []*PumpStatus }
RangeSelector selects a pump by range.
func (*RangeSelector) Next ¶
func (r *RangeSelector) Next(binlog *pb.Binlog, retryTime int) *PumpStatus
Next implements PumpSelector.Next. Only for Prewrite binlog.
func (*RangeSelector) Select ¶
func (r *RangeSelector) Select(binlog *pb.Binlog) *PumpStatus
Select implements PumpSelector.Select.
func (*RangeSelector) SetPumps ¶
func (r *RangeSelector) SetPumps(pumps []*PumpStatus)
SetPumps implements PumpSelector.SetPumps.
type ScoreSelector ¶
type ScoreSelector struct{}
ScoreSelector selects a pump by the pump's score.
func (*ScoreSelector) Next ¶
func (s *ScoreSelector) Next(binlog *pb.Binlog, retryTime int) *PumpStatus
Next implements PumpSelector.Next. Only for Prewrite binlog.
func (*ScoreSelector) Select ¶
func (s *ScoreSelector) Select(binlog *pb.Binlog) *PumpStatus
Select implements PumpSelector.Select.
func (*ScoreSelector) SetPumps ¶
func (s *ScoreSelector) SetPumps(pumps []*PumpStatus)
SetPumps implements PumpSelector.SetPumps.