Version: v0.0.0-...-0ae59ff Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Feb 20, 2020 License: MIT Imports: 21 Imported by: 0



Package ctl provides influxd-ctl service



View Source
const (
	// RequestTruncateShard represents a request for truncating shard.
	RequestTruncateShard   RequestType = 1
	RequestCopyShard                   = 2
	RequestCopyShardStatus             = 3
	RequestKillCopyShard               = 4
	RequestRemoveShard                 = 5
	RequestRemoveDataNode              = 6
	RequestShowDataNodes               = 7
View Source
const (
	ResponseTruncateShard   ResponseType = 1
	ResponseCopyShard                    = 2
	ResponseCopyShardStatus              = 3
	ResponseKillCopyShard                = 4
	ResponseRemoveShard                  = 5
	ResponseRemoveDataNode               = 6
	ResponseShowDataNodes                = 7
View Source
const (
	// MuxHeader is the header byte used for the TCP muxer.
	MuxHeader = 4


This section is empty.


This section is empty.


type CommonResp

type CommonResp struct {
	Code int    `json:"code"`
	Msg  string `json:"msg"`

type Config

type Config struct {
	Enabled           bool `toml:"enabled"`
	MaxShardCopyTasks int  `toml:"max_shard_copy_tasks"`

func NewConfig

func NewConfig() Config

type CopyShardRequest

type CopyShardRequest struct {
	SourceNodeAddr string `json:"source_node_address"`
	DestNodeAddr   string `json:"dest_node_address"`
	ShardID        uint64 `json:"shard_id"`

type CopyShardResponse

type CopyShardResponse struct {

type CopyShardStatusResponse

type CopyShardStatusResponse struct {
	Tasks []CopyShardTask `json:"tasks"`

type CopyShardTask

type CopyShardTask struct {
	Database    string `json:"database"`
	Rp          string `json:"retention_policy"`
	ShardID     uint64 `json:"shard_id"`
	TotalSize   uint64 `json:"total_size"`
	CurrentSize uint64 `json:"current_size"`
	Source      string `json:"source"`
	Destination string `json:"destination"`

type DataNode

type DataNode struct {
	ID       uint64 `json:"id"`
	TcpAddr  string `json:"tcp_addr"`
	HttpAddr string `json:"http_addr"`

type KillCopyShardRequest

type KillCopyShardRequest struct {
	SourceNodeAddr string `json:"source_node_address"`
	DestNodeAddr   string `json:"dest_node_address"`
	ShardID        uint64 `json:"shard_id"`

type KillCopyShardResponse

type KillCopyShardResponse struct {

type RemoveDataNodeRequest

type RemoveDataNodeRequest struct {
	DataNodeAddr string `json:"data_node_addr"`

type RemoveDataNodeResponse

type RemoveDataNodeResponse struct {

type RemoveShardRequest

type RemoveShardRequest struct {
	DataNodeAddr string `json:"data_node_addr"`
	ShardID      uint64 `json:"shard_id"`

type RemoveShardResponse

type RemoveShardResponse struct {

type RequestType

type RequestType byte

RequestType indicates the typeof ctl request.

type ResponseType

type ResponseType byte

type Service

type Service struct {
	Node *influxdb.Node

	MetaClient interface {
		TruncateShardGroups(t time.Time) error
		DeleteDataNode(id uint64) error
		DataNodeByTCPHost(addr string) (*meta.NodeInfo, error)
		RemoveShardOwner(shardID, nodeID uint64) error
		DataNodes() ([]meta.NodeInfo, error)

	TSDBStore interface {
		DeleteShard(id uint64) error

	Listener net.Listener
	Logger   *zap.Logger

	ShardCopier interface {
		CopyShard(sourceAddr string, shardId uint64) error
		Query() []CopyShardTask
		Kill(shardId uint64, source, destination string)
	// contains filtered or unexported fields

func NewService

func NewService(c Config) *Service

NewService returns a new instance of Service.

func (*Service) Close

func (s *Service) Close() error

Close implements the Service interface.

func (*Service) Open

func (s *Service) Open() error

Open starts the service.

func (*Service) WithLogger

func (s *Service) WithLogger(log *zap.Logger)

WithLogger sets the logger on the service.

type ShardCopier

type ShardCopier struct {
	Node    *influxdb.Node
	Logger  *zap.Logger
	Manager interface {
		Add(task *ShardCopyTask) error
		Remove(id uint64)
		Tasks() []*ShardCopyTask
		Kill(id uint64, source, dest string)

	MetaClient interface {
		ShardOwner(shardID uint64) (database, policy string, sgi *meta.ShardGroupInfo)
		AddShardOwner(shardID, nodeID uint64) error
	TSDBStore interface {
		Path() string
		ShardRelativePath(id uint64) (string, error)
		CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error
		Shard(id uint64) *tsdb.Shard

func (*ShardCopier) CopyShard

func (s *ShardCopier) CopyShard(sourceAddr string, shardId uint64) error

func (*ShardCopier) Kill

func (s *ShardCopier) Kill(shardId uint64, source, destination string)

func (*ShardCopier) Query

func (s *ShardCopier) Query() []CopyShardTask

func (*ShardCopier) WithLogger

func (s *ShardCopier) WithLogger(log *zap.Logger)

type ShardCopyManager

type ShardCopyManager struct {
	// contains filtered or unexported fields

func NewCopyManager

func NewCopyManager(max int) *ShardCopyManager

func (*ShardCopyManager) Add

func (c *ShardCopyManager) Add(task *ShardCopyTask) error

func (*ShardCopyManager) Kill

func (c *ShardCopyManager) Kill(id uint64, source, dest string)

func (*ShardCopyManager) Remove

func (c *ShardCopyManager) Remove(id uint64)

func (*ShardCopyManager) Tasks

func (c *ShardCopyManager) Tasks() []*ShardCopyTask

type ShardCopyTask

type ShardCopyTask struct {
	// contains filtered or unexported fields

type ShowDataNodesResponse

type ShowDataNodesResponse struct {
	DataNodes []DataNode `json:"data_nodes"`

type TruncateShardRequest

type TruncateShardRequest struct {
	DelaySec int64 `json:"delay_sec"`

type TruncateShardResponse

type TruncateShardResponse struct {

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL