LRMF

Version: v0.0.0-...-3039a1d
Published: May 22, 2021 License: Apache-2.0 Imports: 18 Imported by: 0

README

LRMF (Limited Resource Management Framework)

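LRMF is integrated into a Go program as an SDK and uses an etcd service to distribute tasks among Go processes. Its role is similar to that of the placement driver in a storage system, except that what it maps is tasks to processes.
Limited: task assignment is currently computed on a single node, so the computation cannot scale horizontally. The mapping algorithm is customizable and can handle a fairly large number of tasks and process resources, but the mapping result has to be shared across the cluster through etcd to coordinate the instances, so both the computation and etcd itself become bottlenecks as the number of tasks grows.
Current main use case: tasks numbering in the thousands, processed by a worker cluster made up of instances of the current Go service. For example:

  • Splitting and assigning tasks along the topic/partition/consumer group dimension in Kafka MM2.
  • When Redis is used as a delay queue and the zsets are sharded to scale storage capacity or consumption rate, LRMF can assign the shards to the consuming processes.
  • When tasks are aggregated into groups along some dimension, LRMF can also manage the assignment of groups to processes.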

Getting Started

Installing

Install Go, then run: go get github.com/entertainment-venue/LRMF

Concept explanation

Task

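The task data structure. A task implements the following interface:

type Task interface {
	// Tasks are distributed by Key.
	Key(ctx context.Context) string

	// Value holds the actual task content.
	Value(ctx context.Context) string
}

LRMF uniformly uses its internally defined Task struct, KvTask, which has the following structure: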

type KvTask struct {
	K string `json:"k"`
	V string `json:"v"`
}

TaskProvider

Splitting work into tasks is the responsibility of the Go program that uses LRMF, which implements the following interface:

type TaskProvider interface {
	Tasks(ctx context.Context) ([]Task, error)
}

Assignor

Users can implement their own task-to-resource mapping algorithm. The built-in implementations are:

  • ConsistentHashingAssignor
  • StringOrderEvenlyAssignor

See assignor.go in the source for the implementations. The interface is:

type Assignor interface {
	PerformAssignment(ctx context.Context, tasks []Task, instanceIds []string) (map[string][]Task, error)
}

Worker

Users must implement the Worker interface to receive (Assign) or remove (Revoke) tasks:

type Worker interface {
	Revoke(ctx context.Context, tasks []Task) error
	Assign(ctx context.Context, tasks []Task) error
}
}

Example

Simple

https://github.com/entertainment-venue/LRMF/blob/main/decoupling_test.go#L32

Flexibility

type testTaskProvider struct{}

func (config *testTaskProvider) Tasks(ctx context.Context) ([]Task, error) {
	// KvTask is the built-in Task implementation described above.
	tasks := []Task{
		&KvTask{K: "key1", V: "value1"},
		&KvTask{K: "key2", V: "value2"},
		&KvTask{K: "key3", V: "value3"},
	}
	return tasks, nil
}

func (config *testTaskProvider) Tenancy() string {
	return "default"
}

type testWorker struct {
	// Distinguishes one instance from another.
	InstanceId string
}

func (w *testWorker) Revoke(ctx context.Context, tasks []Task) error {
	for _, task := range tasks {
		Logger.Printf("instance %s revoke task %s", w.InstanceId, task.Key(ctx))
	}
	return nil
}

func (w *testWorker) Assign(ctx context.Context, tasks []Task) error {
	for _, task := range tasks {
		Logger.Printf("instance %s assign task %s", w.InstanceId, task.Key(ctx))
	}
	return nil
}

func main() {
	taskProvider := &testTaskProvider{}
	assignor := &StringOrderEvenlyAssignor{}

	// i is a placeholder index for this instance; the instance ID must be
	// unique per process and defined before the worker is created.
	i := 0
	instanceId := fmt.Sprintf("testInstance_%d", i)

	worker := &testWorker{InstanceId: instanceId}
	taskHub := NewTaskHub(context.TODO(), worker)

	coordinator, err := StartCoordinator(
		context.TODO(),
		WithEtcdEndpoints([]string{"127.0.0.1:2379"}),
		WithProtocol("foo"),
		WithBiz("bar"),
		WithTenancy("tenancy"),
		WithInstanceId(instanceId),
		WithTaskHub(taskHub),
		WithTaskProvider(taskProvider),
		WithAssignor(assignor))
	if err != nil {
		panic(err)
	}
	defer coordinator.Close(context.TODO())

	// Block forever so the coordinator keeps running.
	ch := make(chan struct{})
	<-ch
}

Documentation

Constants

const (
	StateIdle instanceState = iota
	StateRevoke
	StateAssign
)

Variables

var (
	// exported
	ErrParam = errors.New("param err")
)

Functions

func NewEtcdWrapper

func NewEtcdWrapper(ctx context.Context, endpoints []string, coordinator *Coordinator) (*etcdWrapper, error)

func Smooth

func Smooth(ctx context.Context, tasks []*KvTask, callbackFunc smoothCallbackFunc, optFunc ...CoordinatorOptionsFunc) error

Types

type AssignmentParser

type AssignmentParser interface {
	Unmarshal(ctx context.Context, assignment string) ([]Task, error)
}

type Assignor

type Assignor interface {
	PerformAssignment(ctx context.Context, tasks []Task, instanceIds []string) (map[string][]Task, error)
}

func NewAssignor

func NewAssignor() Assignor

type ConsistentHashingAssignor

type ConsistentHashingAssignor struct{}

func (*ConsistentHashingAssignor) PerformAssignment

func (a *ConsistentHashingAssignor) PerformAssignment(ctx context.Context, tasks []Task, instanceIds []string) (map[string][]Task, error)

Tasks and instances have a many-to-one relationship: a task can be viewed as a key-value entry in consistent hashing, and an instance as a storage node.

References:
https://github.com/topics/consistent-hashing
https://github.com/golang/groupcache/blob/master/consistenthash/consistenthash_test.go
https://ai.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html
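
To make the task-as-key, instance-as-node analogy concrete, here is a minimal sketch of a generic consistent hashing ring with virtual nodes. It illustrates the general technique only and is not the code of ConsistentHashingAssignor. The ring type, newRing, locate, the CRC32 hash, and the replica count are all choices made for this example.

```go
package main

import (
	"fmt"
	"hash/crc32"
	"sort"
	"strconv"
)

// ring places several virtual points per instance on a hash circle.
type ring struct {
	hashes []uint32          // sorted positions of all virtual points
	owner  map[uint32]string // position -> instance ID
}

// newRing builds a ring with `replicas` virtual points per instance.
func newRing(instanceIds []string, replicas int) *ring {
	r := &ring{owner: make(map[uint32]string)}
	for _, id := range instanceIds {
		for i := 0; i < replicas; i++ {
			h := crc32.ChecksumIEEE([]byte(strconv.Itoa(i) + id))
			if _, taken := r.owner[h]; taken {
				continue // rare hash collision: the first instance keeps the point
			}
			r.owner[h] = id
			r.hashes = append(r.hashes, h)
		}
	}
	sort.Slice(r.hashes, func(i, j int) bool { return r.hashes[i] < r.hashes[j] })
	return r
}

// locate returns the instance owning the first point at or after the
// hash of taskKey, wrapping around to the start of the circle.
func (r *ring) locate(taskKey string) string {
	if len(r.hashes) == 0 {
		return ""
	}
	h := crc32.ChecksumIEEE([]byte(taskKey))
	i := sort.Search(len(r.hashes), func(i int) bool { return r.hashes[i] >= h })
	if i == len(r.hashes) {
		i = 0
	}
	return r.owner[r.hashes[i]]
}

func main() {
	r := newRing([]string{"instance-1", "instance-2", "instance-3"}, 50)
	for _, key := range []string{"key1", "key2", "key3"} {
		fmt.Printf("%s -> %s\n", key, r.locate(key))
	}
}
```

The property that matters for task assignment is that removing an instance only moves the tasks that instance owned; tasks on the remaining instances stay where they were.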

type Coordinator

type Coordinator struct {
	// contains filtered or unexported fields
}

func StartCoordinator

func StartCoordinator(ctx context.Context, optFunc ...CoordinatorOptionsFunc) (*Coordinator, error)

func (*Coordinator) Close

func (c *Coordinator) Close(ctx context.Context)

func (*Coordinator) JoinGroup

func (c *Coordinator) JoinGroup(ctx context.Context) error

func (*Coordinator) TriggerRb

func (c *Coordinator) TriggerRb(ctx context.Context) error

type CoordinatorOptionsFunc

type CoordinatorOptionsFunc func(options *coordinatorOptions)

func WithAssignor

func WithAssignor(v Assignor) CoordinatorOptionsFunc

func WithBiz

func WithBiz(v string) CoordinatorOptionsFunc

func WithEtcdEndpoints

func WithEtcdEndpoints(v []string) CoordinatorOptionsFunc

func WithInstanceId

func WithInstanceId(v string) CoordinatorOptionsFunc

func WithProtocol

func WithProtocol(v string) CoordinatorOptionsFunc

func WithTaskHub

func WithTaskHub(v TaskHub) CoordinatorOptionsFunc

func WithTaskProvider

func WithTaskProvider(v TaskProvider) CoordinatorOptionsFunc

func WithTenancy

func WithTenancy(v string) CoordinatorOptionsFunc

type DoFunc

type DoFunc func(ctx context.Context, task Task) error

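A goroutine pool is needed that can start and stop a specific goroutine. The function run by the goroutine is written by the user; the framework waits for the user function to finish and then decides whether to continue or destroy the goroutine. Unlike the dispatcher, this is no longer a producer/consumer model: every change is triggered through the API.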

type EventType

type EventType int
const (
	EventRevoke EventType = iota
	EventAssign
)

type G

type G struct {
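	// Id of the generation, kept distinct from instanceId.
	// It is an integer because operations on a generation must be sequential and can never roll back.
	Id int64 `json:"id"`

	// The instances allowed to take part in this rebalance.
	Participant []string `json:"participant"`

	// Start time, used to compute the rebalance timeout.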
	Timestamp int64 `json:"timestamp"`
}

func ParseG

func ParseG(ctx context.Context, val string) *G

func (*G) LeaseID

func (g *G) LeaseID() clientv3.LeaseID

func (*G) String

func (g *G) String() string

type JsonAssignmentParser

type JsonAssignmentParser struct{}

func (*JsonAssignmentParser) Unmarshal

func (p *JsonAssignmentParser) Unmarshal(_ context.Context, assignment string) ([]Task, error)

type KvTask

type KvTask struct {
	K string `json:"k"`
	V string `json:"v"`
}

func ParseKvTask

func ParseKvTask(tasks []Task) []*KvTask

func (*KvTask) Key

func (t *KvTask) Key(_ context.Context) string

func (*KvTask) Value

func (t *KvTask) Value(_ context.Context) string

type SmoothEvent

type SmoothEvent struct {
	Typ  EventType
	Task Task
}

type StdLogger

type StdLogger interface {
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}

StdLogger is used to log error messages.

var Logger StdLogger = log.New(os.Stdout, "[LRMF] ", log.LstdFlags|log.Lshortfile)

type StringOrderEvenlyAssignor

type StringOrderEvenlyAssignor struct{}

Intended for a specific message-queue scenario.

func (*StringOrderEvenlyAssignor) PerformAssignment

func (a *StringOrderEvenlyAssignor) PerformAssignment(ctx context.Context, tasks []Task, instanceIds []string) (map[string][]Task, error)

type Task

type Task interface {
	// Tasks are distributed by Key.
	Key(ctx context.Context) string

	// Value holds the actual task content.
	Value(ctx context.Context) string
}

type TaskHub

type TaskHub interface {
	// Provides the tasks to revoke.
	OnRevoked(ctx context.Context, revoke string) ([]Task, error)

	// Provides the assignment result.
	OnAssigned(ctx context.Context, assignment string) ([]Task, error)

	Assignment(ctx context.Context) string

	UnmarshalAssignment(ctx context.Context, assignment string) ([]Task, error)
}

TaskHub handles the abstraction layer: it connects to the coordinator and plays the role of an abstract class.

func NewTaskHub

func NewTaskHub(_ context.Context, workerHub Worker) TaskHub

Intended for use from outside the package.

type TaskProvider

type TaskProvider interface {
	Tasks(ctx context.Context) ([]Task, error)
}

type Worker

type Worker interface {
	Revoke(ctx context.Context, tasks []Task) error
	Assign(ctx context.Context, tasks []Task) error
}

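Implemented by the application for its specific business scenario; these are the methods that Worker requires the application to provide.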

func NewWorker

func NewWorker(wf WorkerFactory) Worker

Called by the application in its specific business scenario; the result is handed to the coordinator.

type WorkerFactory

type WorkerFactory interface {
	New(ctx context.Context, task Task) (WorkerStarter, error)
}

type WorkerStarter

type WorkerStarter interface {
	Start(ctx context.Context, task Task) error
}
