p2c

package
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 9, 2022 License: MIT Imports: 14 Imported by: 0

README

grpc p2c balancer

​ 最近上海疫情, 居家办公了快一个月了, 除了最开始两周早睡早起, 早上还起来跳跳绳运动运动, 后面基本上动一天停一天... 懒惰了不少, 除了看了一本书之外, 文章一篇没发, 之前计划一直想写的go ast规则引擎库也一直没动工, 果然坚持才是最困难的事儿, 下两周工作上又会是比较忙的两周, 今天先来聊下负载均衡里面的p2c算法, 希望上海疫情能够早日结束.

项目地址:

https://github.com/sado0823/go-kitx/blob/master/grpc/balancer/p2c/p2c.go

what?

P2C算法全称pick of 2 choices,相比WRR(加权轮询算法),P2C随机选择两个节点后在这俩节点里选择优胜者,并通过指数加权移动平均算法统计服务端的实时状态,从而做出最优选择。

  • p2c (Pick Of 2 Choices) 二选一: 在多个节点中随机选择两个节点。

选择两个符合健康度的节点进行计算负载, 优先选择出符合标准的节点, 这里计算负载有运用cpu的方式和计算延迟(lag)+节点拥塞度(inflight)的方式

  • EWMA (Exponentially Weighted Moving-Average) 指数加权平均算法公式: vt = vt-1 * β + vt * (1 - β)

这个算法之前我们在自适应限流bbr算法中也有用到, 主要用于计算cpu负载, 优势为能够减小尖刺(网络波动)的影响

vt:当前值

vt-1: 上一周期值

β:常量

  • 牛顿冷却定律衰减模型: β = 1/e^(k*△t)

运用这个模型就要用于计算EWMA中的β

why?

一般的使用场景是:

  • 负载均衡算法尽量选择负载最低的节点
  • 负载均衡算法尽量选择延迟最低的节点
  • 隔离故障节点, 自动摘流
  • 故障节点恢复后自动, 自动接流
  • 判断是否存在某行数据,用以减少对磁盘访问,提高服务的访问性能

how?

基本思想

EWMA

EWMA主要用于与一般的算数平均数作对比, 计算公式为vt = vt-1 * β + vt * (1 - β)

我们用一周的温度计算来做比较, ewma中β值一般在于(0-1)之间, 这里选取10% = 0.1

image-20220412003955372

一周的趋势图如下

image-20220412004052835

从上图可以看出, 使用ewma能够使得曲线非常的平滑, 从而减缓一些尖刺行为, β越大越接近平均值, β越小越接近原值.

有一个一般用的计算β公式为: 2/(n+1)

牛顿冷却定律衰减模型

公式: β = 1/e^(k*△t), e是数学常数, △t为第t次的请求延迟, k表示衰减系数 (代码中实现为10s)

image-20220412010038364

x = k * △t, 则公式为f(x) = 1/e^x, x越大, y越小, 也就是说

  • △t延迟越高, β越小, ewma中β越小, 则越接近原值, 延迟高的情况下凸显出原始延迟, 能够检测到网络毛刺
  • △t延迟越低, β越大, ewma中β越大, 则越接近平均值

基本流程

image-20220412101142089

源码分析

roundrobin

先看google grpc roundrobin内部实现 https://github.com/grpc/grpc-go/blob/master/balancer/roundrobin/roundrobin.go

package roundrobin

import (
	"sync"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/base"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal/grpcrand"
)

// Name is the name of round_robin balancer.
const Name = "round_robin"

var logger = grpclog.Component("roundrobin")

// newBuilder creates a new roundrobin balancer builder.
func newBuilder() balancer.Builder {
	return base.NewBalancerBuilder(Name, &rrPickerBuilder{}, base.Config{HealthCheck: true})
}

func init() {
	balancer.Register(newBuilder())
}

type rrPickerBuilder struct{}

func (*rrPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
	...
}

type rrPicker struct {
}

func (p *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
	...
}

以上代码省去了关键实现, 可以看出, 要实现一个负载均衡算法, 只要需要实现内部的两个interface

// 1. 有节点更新会调用build方法
type PickerBuilder interface {
	Build(info PickerBuildInfo) balancer.Picker
}
// 2. 挑选节点
type Picker interface {
	Pick(info PickInfo) (PickResult, error)
}
// 3. 注册负载均衡器
func init() {
	balancer.Register(newBuilder())
}
p2c

https://github.com/sado0823/go-kitx/blob/master/grpc/balancer/p2c/p2c.go

接口
// 实现 PickerBuilder 接口
type p2cPickBuilder struct {
}
// 实现 Picker 接口
type p2cPicker struct {
	conns []*subConn
	r     *rand.Rand
	stamp *atomicx.AtomicDuration
	lock  sync.Mutex
}
核心数据结构
type p2cSubConn struct {
	addr resolver.Address
	conn balancer.SubConn

	lagEWMA uint64	// 请求耗时, 计算后的ewma

	inFlight int64	// 节点拥塞度, 正在处理的请求
	successEWMA  uint64	// 请求成功数量
	requests int64	// 请求量

	lastLag int64	// 上一次请求耗时, 用于计算ewma
	pickTime int64	// 上一次选择的时间时间戳
}

在此基础上, 还可以再加上weight字段, 设置权重

load负载计算

有两种方式:

  • load = lagEWMA * inFlight: lagEWMA 相当于平均请求耗时,inFlight 是当前节点正在处理请求的数量,相乘大致计算出了当前节点的网络负载, 我们采用的就是这种方式
  • oad = cpuEWMA * lag * inFlight: cpuEWMA 利用服务端的cpu值计算负载, 可以从balancer.DoneInfo. Trailer中从服务端传递, 实际为metadata.MD类型, map[string][]string的别名
// load = lagEWMA * inFlight
func (s *p2cSubConn) load() int64 {
	lag := int64(math.Sqrt(float64(atomic.LoadUint64(&s.lagEWMA) + 1)))
	load := lag * (atomic.LoadInt64(&s.inFlight) + 1)
	if load == 0 {
		// penalty是初始化没有数据时的惩罚值, 在没有被选过的情况下, 会强制选择一次
		return penalty
	}

	return load
}
Pick过程
	var chosen *p2cSubConn
	switch len(p.conns) {
	case 0:	// 没有节点, 直接报错
		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
	case 1:	// 一个节点, 直接返回
		chosen = p.choose(p.conns[0], nil)
	case 2: // 两个节点, 返回负载最低的节点
		chosen = p.choose(p.conns[0], p.conns[1])
	default: // 多个节点, 最多经常三次计算, 选择合适的节点
		var node1, node2 *p2cSubConn
		// 三次随机选择节点
		for i := 0; i < pickTimes; i++ {
			a := p.r.Intn(len(p.conns))
			b := p.r.Intn(len(p.conns) - 1)
			if b >= a {
				// 防止出现相同节点
				b++
			}
			node1 = p.conns[a]
			node2 = p.conns[b]

			// 选出一次符合要求的节点则停止
			if node1.healthy() && node2.healthy() {
				break
			}
		}

		chosen = p.choose(node1, node2)
	}
请求结束, 更新节点信息
		// 正在处理的请求数-1
		atomic.AddInt64(&c.inFlight, -1)

		// 计算相对时间
		now := time.Since(initTime)
		last := atomic.SwapInt64(&c.lastLag, int64(now))
		td := int64(now) - last
		if td < 0 {
			td = 0
		}

		// 牛顿冷却定律的衰减模型, 确定ewma中的β值, β = 1/e^(k*△t)
		beta := math.Exp(float64(-td) / float64(decayTime))
		lag := int64(now) - start
		if lag < 0 {
			lag = 0
		}
		olag := atomic.LoadUint64(&c.lagEWMA)
		if olag == 0 {
			beta = 0
		}

		// 指数加权平均算法 vt = vt-1 * β + vt * (1 - β)
		// 存储当前lagEWMA
		atomic.StoreUint64(&c.lagEWMA, uint64(float64(olag)*beta+float64(lag)*(1-beta)))

		success := initSuccess
		if info.Err != nil {
			switch status.Code(info.Err) {
			case codes.DeadlineExceeded, codes.Internal, codes.Unavailable, codes.DataLoss:
				success = 0
			}
		}

		oldSuccess := atomic.LoadUint64(&c.successEWMA)
		// 指数加权平均算法 vt = vt-1 * β + vt * (1 - β)
		// 存储当前successEWMA
		atomic.StoreUint64(&c.successEWMA, uint64(float64(oldSuccess)*beta+float64(success)*(1-beta)))

example

func test() {
	cc, err := grpc.Dial(r.Scheme()+":///test.server",
		grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, p2c.Name)))
	if err != nil {
		t.Fatalf("failed to dial: %v", err)
	}
	defer cc.Close()
}

references

Documentation

Index

Constants

View Source
const (
	Name = "p2c_EWMA"
)

Variables

This section is empty.

Functions

This section is empty.

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL