Documentation ¶
Index ¶
- Constants
- Variables
- func NewEtcdWrapper(ctx context.Context, endpoints []string, coordinator *Coordinator) (*etcdWrapper, error)
- func Smooth(ctx context.Context, tasks []*KvTask, callbackFunc smoothCallbackFunc, ...) error
- type AssignmentParser
- type Assignor
- type ConsistentHashingAssignor
- type Coordinator
- type CoordinatorOptionsFunc
- func WithAssignor(v Assignor) CoordinatorOptionsFunc
- func WithBiz(v string) CoordinatorOptionsFunc
- func WithEtcdEndpoints(v []string) CoordinatorOptionsFunc
- func WithInstanceId(v string) CoordinatorOptionsFunc
- func WithProtocol(v string) CoordinatorOptionsFunc
- func WithTaskHub(v TaskHub) CoordinatorOptionsFunc
- func WithTaskProvider(v TaskProvider) CoordinatorOptionsFunc
- func WithTenancy(v string) CoordinatorOptionsFunc
- type DoFunc
- type EventType
- type G
- type JsonAssignmentParser
- type KvTask
- type SmoothEvent
- type StdLogger
- type StringOrderEvenlyAssignor
- type Task
- type TaskHub
- type TaskProvider
- type Worker
- type WorkerFactory
- type WorkerStarter
Constants ¶
View Source
const ( StateIdle instanceState = iota StateRevoke StateAssign )
Variables ¶
View Source
var ( // exported ErrParam = errors.New("param err") )
Functions ¶
func NewEtcdWrapper ¶
func NewEtcdWrapper(ctx context.Context, endpoints []string, coordinator *Coordinator) (*etcdWrapper, error)
Types ¶
type AssignmentParser ¶
type Assignor ¶
type Assignor interface {
PerformAssignment(ctx context.Context, tasks []Task, instanceIds []string) (map[string][]Task, error)
}
func NewAssignor ¶
func NewAssignor() Assignor
type ConsistentHashingAssignor ¶
type ConsistentHashingAssignor struct{}
func (*ConsistentHashingAssignor) PerformAssignment ¶
func (a *ConsistentHashingAssignor) PerformAssignment(ctx context.Context, tasks []Task, instanceIds []string) (map[string][]Task, error)
https://github.com/topics/consistent-hashing https://github.com/golang/groupcache/blob/master/consistenthash/consistenthash_test.go https://ai.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html task和instance是多对一关系,task可以看作consistent hashing中的kv,instance是存储节点
type Coordinator ¶
type Coordinator struct {
// contains filtered or unexported fields
}
func StartCoordinator ¶
func StartCoordinator(ctx context.Context, optFunc ...CoordinatorOptionsFunc) (*Coordinator, error)
func (*Coordinator) Close ¶
func (c *Coordinator) Close(ctx context.Context)
type CoordinatorOptionsFunc ¶
type CoordinatorOptionsFunc func(options *coordinatorOptions)
func WithAssignor ¶
func WithAssignor(v Assignor) CoordinatorOptionsFunc
func WithBiz ¶
func WithBiz(v string) CoordinatorOptionsFunc
func WithEtcdEndpoints ¶
func WithEtcdEndpoints(v []string) CoordinatorOptionsFunc
func WithInstanceId ¶
func WithInstanceId(v string) CoordinatorOptionsFunc
func WithProtocol ¶
func WithProtocol(v string) CoordinatorOptionsFunc
func WithTaskHub ¶
func WithTaskHub(v TaskHub) CoordinatorOptionsFunc
func WithTaskProvider ¶
func WithTaskProvider(v TaskProvider) CoordinatorOptionsFunc
func WithTenancy ¶
func WithTenancy(v string) CoordinatorOptionsFunc
type DoFunc ¶
需要goroutine pool,支持start 和 stop指定goroutine的能力, 但是这个goroutine的方法是用户编写,框架层能够等待用户执行完成, 然后识别是否继续或者销毁goroutine与dispatcher不同,不再是生产 消费的模式,所有变化都有api触发
type G ¶
type JsonAssignmentParser ¶
type JsonAssignmentParser struct{}
type KvTask ¶
func ParseKvTask ¶
type SmoothEvent ¶
type StdLogger ¶
type StdLogger interface { Print(v ...interface{}) Printf(format string, v ...interface{}) Println(v ...interface{}) }
StdLogger is used to log error messages.
type StringOrderEvenlyAssignor ¶
type StringOrderEvenlyAssignor struct{}
特定mq场景
func (*StringOrderEvenlyAssignor) PerformAssignment ¶
type TaskHub ¶
type TaskHub interface { // 提供剔除目标 OnRevoked(ctx context.Context, revoke string) ([]Task, error) // 提供分配结果 OnAssigned(ctx context.Context, assignment string) ([]Task, error) Assignment(ctx context.Context) string UnmarshalAssignment(ctx context.Context, assignment string) ([]Task, error) }
解决抽象层面的问题,对接coordinator,相当于抽象类
type Worker ¶
type Worker interface { Revoke(ctx context.Context, tasks []Task) error Assign(ctx context.Context, tasks []Task) error }
具体业务场景下,需要业务实现,是Worker要求业务实现的
type WorkerFactory ¶
type WorkerFactory interface {
New(ctx context.Context, task Task) (WorkerStarter, error)
}
Source Files ¶
Click to show internal directories.
Click to hide internal directories.