Documentation
¶
Index ¶
- Constants
- func KeepRegister(ctx context.Context, cli *clientv3.Client, key, value string) (*etcdRegistry, error)
- func NewEtcdRegistry(cli *clientv3.Client) *etcdRegistry
- func NewLeaderDAO(client *clientv3.Client) *leaderDAO
- func NewLeaderDataDAO(client *clientv3.Client) *leaderDataDAO
- func NewNodeDAO(client *clientv3.Client) *nodeDAO
- func NewRunningTaskDAO(client *clientv3.Client) *runningTaskDAO
- func NewShardDAO(client *clientv3.Client) *shardDAO
- func NewSystemConfigDAO(client *clientv3.Client) *systemConfigDAO
- func NewTaskConfigDAO(client *clientv3.Client) *taskConfigDAO
- type ILeaderDAO
- type ILeaderDataDAO
- type INodeDAO
- type IRunningTaskDAO
- type IShardDAO
- type ITaskConfigDAO
- type InternalTaskType
- type NodeWithShards
- type Nodes
- type Shards
- type SystemConfig
- type TaskConfig
- type TaskConfigStore
- type TaskNodeShardStore
- type TaskWithNodesWithShards
Constants ¶
View Source
const EtcdCampaignTimeout = 30 // time.Second
View Source
const EtcdKeepAliveRetryInterval = 5 * time.Second
View Source
const EtcdKeyTTL = 10
View Source
const EtcdQueryTimeout = 3 * time.Second
Variables ¶
This section is empty.
Functions ¶
func KeepRegister ¶
func NewEtcdRegistry ¶
func NewLeaderDAO ¶
func NewLeaderDataDAO ¶
func NewNodeDAO ¶
func NewRunningTaskDAO ¶
func NewShardDAO ¶
func NewSystemConfigDAO ¶
func NewTaskConfigDAO ¶
Types ¶
type ILeaderDAO ¶
type ILeaderDataDAO ¶
type IRunningTaskDAO ¶
type IRunningTaskDAO interface {
Put(ctx context.Context, taskName, node, shard string) error
Get(ctx context.Context, taskName, node, shard string) (string, error)
Delete(ctx context.Context, taskName, node, shard string) error
DeleteTaskNode(ctx context.Context, taskName, node string) error
DeleteTask(ctx context.Context, taskName string) error
DeleteAll(ctx context.Context) error
ListByTaskNode(ctx context.Context, taskName, node string) (Shards, error)
ListByTask(ctx context.Context, task string) ([]*NodeWithShards, error)
List(ctx context.Context) ([]*TaskWithNodesWithShards, error)
}
type IShardDAO ¶
type IShardDAO interface {
Put(ctx context.Context, taskName, node string, shards Shards) error
Get(ctx context.Context, taskName, node string) (Shards, error)
GetByTask(ctx context.Context, taskName string) ([]*NodeWithShards, error)
GetAll(ctx context.Context) ([]*TaskWithNodesWithShards, error)
Delete(ctx context.Context, taskName, node string) error
DeleteTask(ctx context.Context, taskName string) error
DeleteAll(ctx context.Context) error
WatchShards(ctx context.Context, task, node string) (<-chan Shards, error)
WatchForNewTask(ctx context.Context) (<-chan TaskNodeShardStore, error)
}
type ITaskConfigDAO ¶
type ITaskConfigDAO interface {
Put(ctx context.Context, value *TaskConfig) error
Get(ctx context.Context, taskName string) (*TaskConfig, error)
List(ctx context.Context) ([]*TaskConfig, error)
Watch(ctx context.Context) (<-chan []*TaskConfig, error)
WatchTaskConfig(ctx context.Context, taskName string) (<-chan *TaskConfig, error)
}
type InternalTaskType ¶
type InternalTaskType uint32
const ( InternalTaskType_Task InternalTaskType = 1 //task InternalTaskType_Schedule InternalTaskType = 2 //timer task InternalTaskType_SummaryMapTask InternalTaskType = 3 //summary task - MapTask InternalTaskType_SummaryReduce1Task InternalTaskType = 4 //summary task - Reduce1Task InternalTaskType_SummaryReduce2Task InternalTaskType = 5 //summary task - Reduce2Task InternalTaskType_SummarySaveReduceResultTask InternalTaskType = 6 //summary task - SaveReduceResultTask InternalTaskType_SummaryCleanTask InternalTaskType = 7 //summary task - CleanTask )
type NodeWithShards ¶
type SystemConfig ¶
type SystemConfig struct {
// Check leader changing every 10 seconds
LeaderChangeCheckInterval int64 `json:"leader_change_check_interval"`
// Check node changing every 30 seconds
NodeChangeCheckInterval int64 `json:"node_change_check_interval"`
}
func (*SystemConfig) Decode ¶
func (c *SystemConfig) Decode(value []byte) error
func (*SystemConfig) Encode ¶
func (c *SystemConfig) Encode() string
type TaskConfig ¶
type TaskConfig struct {
// unique name
Name string `json:"name"`
// only for timer task - Full crontab specs, e.g. "* * * * * ?"
Spec string `json:"spec"`
// total shards among all nodes
ShardNumber uint32 `json:"shard_number"`
// when fetch data,take limit size
TakeLimit uint32 `json:"take_limit"`
// if no data, pause milliseconds
NoDataPause uint64 `json:"no_data_pause"`
// custom parameter
Params string `json:"params"`
// turn on or off
Alive bool `json:"alive"`
// task type
TaskType InternalTaskType `json:"task_type"`
}
func (*TaskConfig) Decode ¶
func (c *TaskConfig) Decode(value []byte) error
func (*TaskConfig) Encode ¶
func (c *TaskConfig) Encode() string
type TaskConfigStore ¶
type TaskConfigStore map[string]*TaskConfig
type TaskNodeShardStore ¶
type TaskWithNodesWithShards ¶
type TaskWithNodesWithShards struct {
TaskName string
NodeWithShards []*NodeWithShards
}
Click to show internal directories.
Click to hide internal directories.