ratelimitv2

package
v1.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 13, 2023 License: Apache-2.0, BSD-2-Clause, BSD-3-Clause, + 3 more Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
// Event type, action and trigger string constants.
const (
	// EventType* values name the kinds of events.
	EventTypeStreamUpdate        = "streamUpdate"
	EventTypeClientUpdate        = "clientUpdate"
	EventTypeClientStreamUpdate  = "clientStreamUpdate"
	EventTypeCounterClientUpdate = "counterClientUpdate"
	EventTypeCounterUpdate       = "counterUpdate"
	EventTypeQuotaChange         = "quotaChange"
	// Action* values name the action carried by an event.
	ActionAdd                    = "add"
	ActionReplace                = "replace"
	ActionDelete                 = "delete"
	// Trigger* values name triggers; NOTE(review): presumably the cause of a
	// quota adjustment — confirm against the code that emits them.
	TriggerShareEqually          = "shareEqually"
	TriggerShareGlobal           = "shareGlobal"
)
View Source
const (
	// MaxSlideCount is the maximum sliding-window count; it re-exports
	// config.MaxSlideCount.
	MaxSlideCount = config.MaxSlideCount
)

最大滑窗数量

Variables

This section is empty.

Functions

func CheckRateLimitBatchInitRequest

func CheckRateLimitBatchInitRequest(
	req *apiv2.RateLimitInitRequest, defaultSlideCount uint32) (*apiv2.RateLimitInitResponse, time.Duration)

CheckRateLimitBatchInitRequest 检查限流批量初始化请求参数

func CheckRateLimitInitRequest

func CheckRateLimitInitRequest(
	req *apiv2.RateLimitInitRequest, defaultSlideCount uint32) (*apiv2.RateLimitInitResponse, time.Duration)

CheckRateLimitInitRequest 检查限流初始化请求参数

func CheckRateLimitReportRequest

func CheckRateLimitReportRequest(req *apiv2.RateLimitReportRequest) *apiv2.TimedRateLimitReportResponse

CheckRateLimitReportRequest 检查限流上报请求参数

func Initialize

func Initialize(ctx context.Context, config *config.Config) error

Initialize 初始化函数

func SetStatics

func SetStatics(loadStatics plugin.Statis)

SetStatics 设置统计插件

Types

type Client

// Client is the interface through which the server addresses a connected client.
type Client interface {
	// ClientKey returns the client's identifying key.
	ClientKey() uint32
	// ClientIP returns the client's IP address.
	ClientIP() utils.IPAddress
	// ClientId returns the client's ID.
	ClientId() string
	// SendAndUpdate sends the given response.
	// NOTE(review): the meaning of the *ClientSendTime and int64 arguments and
	// of the bool result is not documented here — confirm against the implementation.
	SendAndUpdate(*apiv2.RateLimitResponse, *ClientSendTime, int64) (bool, error)
	// UpdateStreamContext updates the stream context and reports whether the
	// stream was updated successfully.
	UpdateStreamContext(streamCtx *StreamContext) bool
	// Cleanup clears all context information.
	Cleanup()
	// Detach atomically dereferences the client and reports whether the
	// dereference succeeded.
	Detach(clientId string, streamCtxId string) bool
	// IsDetached reports whether the client has already been dereferenced.
	IsDetached() bool
}

Client 客户端统计数据

func NewClient

func NewClient(
	clientKey uint32, clientIP *utils.IPAddress, clientId string, streamCtx *StreamContext) Client

NewClient 新建客户端

type ClientManager

type ClientManager struct {
	// contains filtered or unexported fields
}

ClientManager 客户端管理器

func NewClientManager

func NewClientManager(maxSize uint32) *ClientManager

NewClientManager 创建客户端管理器

func (*ClientManager) AddClient

func (c *ClientManager) AddClient(
	clientId string, clientIP *utils.IPAddress, streamContext *StreamContext) (apiv2.Code, Client)

AddClient 增加客户端

func (*ClientManager) DelClient

func (c *ClientManager) DelClient(client Client, streamCtxId string) bool

DelClient 删除客户端

type ClientSendTime

type ClientSendTime struct {
	// contains filtered or unexported fields
}

ClientSendTime 记录客户端上次发送时间

func (*ClientSendTime) UpdateLastSendTime

func (c *ClientSendTime) UpdateLastSendTime(value int64) bool

UpdateLastSendTime 更新最后一次发送时间

type ClientStreamUpdateEvent

// ClientStreamUpdateEvent is the JSON-tagged event recording a change of a
// client's stream.
type ClientStreamUpdateEvent struct {
	// TimeNumber is the point in time of the event.
	TimeNumber string `json:"time_number"`
	// EventType is the type of the event.
	EventType string `json:"event"`
	// LastStreamId is the last stream ID.
	LastStreamId string `json:"last_stream_id"`
	// StreamId is the stream ID.
	StreamId string `json:"stream_id"`
	// ClientId is the client ID.
	ClientId string `json:"client_id"`
	// IPAddress is the IP address.
	IPAddress string `json:"ip_address"`
	// Action is the action: going online or going offline.
	Action string `json:"action"`
}

ClientStreamUpdateEvent 客户端流更新事件

func NewClientStreamUpdateEvent

func NewClientStreamUpdateEvent(
	lastStreamId string, streamId string, client Client, action string) *ClientStreamUpdateEvent

NewClientStreamUpdateEvent 创建客户端流更新事件

func (*ClientStreamUpdateEvent) GetEventType

func (c *ClientStreamUpdateEvent) GetEventType() string

GetEventType 获取事件类型

func (*ClientStreamUpdateEvent) ToJson

func (c *ClientStreamUpdateEvent) ToJson() string

ToJson 变成Json输出

type ClientUpdateEvent

// ClientUpdateEvent is the JSON-tagged event recording a client update.
type ClientUpdateEvent struct {
	// TimeNumber is the point in time of the event.
	TimeNumber string `json:"time_number"`
	// EventType is the type of the event.
	EventType string `json:"event"`
	// ClientId is the client ID.
	ClientId string `json:"client_id"`
	// IPAddress is the IP address.
	IPAddress string `json:"ip_address"`
	// Action is the action: going online or going offline.
	Action string `json:"action"`
}

ClientUpdateEvent 客户端更新事件

func NewClientUpdateEvent

func NewClientUpdateEvent(clientId string, ipAddr *utils.IPAddress, action string) *ClientUpdateEvent

NewClientUpdateEvent 创建客户端更新事件

func (*ClientUpdateEvent) GetEventType

func (c *ClientUpdateEvent) GetEventType() string

GetEventType 获取事件类型

func (*ClientUpdateEvent) ToJson

func (c *ClientUpdateEvent) ToJson() string

ToJson 变成Json输出

type CounterClientUpdateEvent

// CounterClientUpdateEvent is the JSON-tagged event recording a client update
// on a counter.
type CounterClientUpdateEvent struct {
	// TimeNumber is the point in time of the event.
	TimeNumber string `json:"time_number"`
	// EventType is the type of the event.
	EventType string `json:"event"`
	// Namespace is the namespace.
	Namespace string `json:"namespace"`
	// Service is the service name.
	Service string `json:"service"`
	// Labels holds the labels.
	Labels string `json:"labels"`
	// Duration is the time period.
	Duration string `json:"duration"`
	// ClientId is the client ID.
	ClientId string `json:"client_id"`
	// IPAddress is the IP address.
	IPAddress string `json:"ip_address"`
	// Action is the action: going online or going offline.
	Action string `json:"action"`
	// ClientCount is the total number of clients.
	ClientCount int `json:"client_count"`
}

CounterClientUpdateEvent 计数器客户端更新事件

func (*CounterClientUpdateEvent) GetEventType

func (c *CounterClientUpdateEvent) GetEventType() string

GetEventType 获取事件类型

func (*CounterClientUpdateEvent) ToJson

func (c *CounterClientUpdateEvent) ToJson() string

ToJson 变成Json输出

type CounterClients

type CounterClients struct {
	// contains filtered or unexported fields
}

CounterClients 计数器管理的客户端数据结构

func (*CounterClients) AddSender

func (cc *CounterClients) AddSender(sender Client, counter *counterV2)

AddSender 增加调用者

func (*CounterClients) ClientCount

func (cc *CounterClients) ClientCount() uint32

ClientCount 获取客户端数量快照

func (*CounterClients) DelSender

func (cc *CounterClients) DelSender(sender Client, counter *counterV2, counterExpired bool)

DelSender 删除调用者

func (*CounterClients) Init

func (cc *CounterClients) Init()

Init 初始化

type CounterIdentifier

// CounterIdentifier identifies a counter by service, namespace, labels and
// duration.
type CounterIdentifier struct {
	// Service is the service name.
	Service   string
	// Namespace is the namespace.
	Namespace string
	// Labels holds the labels as a single string.
	Labels    string
	// Duration is the time period of the counter.
	Duration  time.Duration
}

CounterIdentifier 计数器标识

func NewCounterIdentifier

func NewCounterIdentifier(initReq *apiv2.RateLimitInitRequest, ruleIdx int) *CounterIdentifier

NewCounterIdentifier 创建限流计数器标识

type CounterManagerV2

type CounterManagerV2 struct {
	// contains filtered or unexported fields
}

CounterManagerV2 计数器管理类V2

func NewCounterManagerV2

func NewCounterManagerV2(maxSize uint32, cleanupInterval time.Duration, pushManager PushManager) *CounterManagerV2

NewCounterManagerV2 创建计数器管理类

func (*CounterManagerV2) AddCounter

func (cm *CounterManagerV2) AddCounter(initReq *apiv2.RateLimitInitRequest, ruleIdx int,
	sender Client, expireDuration time.Duration) (apiv2.Code, CounterV2)

AddCounter 增加计数器

func (*CounterManagerV2) GetCounter

func (cm *CounterManagerV2) GetCounter(counterKey uint32) (apiv2.Code, CounterV2)

GetCounter 获取计数器

func (*CounterManagerV2) Start

func (cm *CounterManagerV2) Start(ctx context.Context)

Start 启动过期清理

type CounterSendTime

type CounterSendTime struct {
	// contains filtered or unexported fields
}

CounterSendTime 记录计数器最后一次发送时间

func (*CounterSendTime) UpdateLastSendTime

func (c *CounterSendTime) UpdateLastSendTime(value int64) bool

UpdateLastSendTime 更新最后一次发送时间

type CounterUpdateEvent

// CounterUpdateEvent is the JSON-tagged event recording a counter update.
type CounterUpdateEvent struct {
	// TimeNumber is the point in time of the event.
	TimeNumber string `json:"time_number"`
	// EventType is the type of the event.
	EventType string `json:"event"`
	// Namespace is the namespace.
	Namespace string `json:"namespace"`
	// Service is the service name.
	Service string `json:"service"`
	// Labels holds the labels.
	Labels string `json:"labels"`
	// Duration is the time period.
	Duration string `json:"duration"`
	// Action is the action that triggered the quota adjustment.
	Action string `json:"action"`
	// Amount is the current quota value.
	Amount uint32 `json:"amount"`
}

CounterUpdateEvent 计数器更新事件

func (*CounterUpdateEvent) GetEventType

func (q *CounterUpdateEvent) GetEventType() string

GetEventType 获取事件类型

func (*CounterUpdateEvent) ToJson

func (q *CounterUpdateEvent) ToJson() string

ToJson 变成Json输出

type CounterV2

// CounterV2 is the counter interface.
type CounterV2 interface {
	// CounterKey returns the counter's key.
	CounterKey() uint32
	// Identifier returns the counter's identifier.
	Identifier() *CounterIdentifier
	// Reload refreshes the quota value from the given request.
	Reload(InitRequest)
	// AcquireQuota performs an atomic increment.
	// NOTE(review): the exact roles of nowMs, startMicro and collector are not
	// documented here — confirm against the implementation.
	AcquireQuota(client Client, quotaSum *apiv2.QuotaSum,
		nowMs int64, startMicro int64, collector *plugin.RateLimitStatCollectorV2) *apiv2.QuotaLeft
	// SumQuota returns the current quota total.
	SumQuota(client Client, timestampMs int64) *apiv2.QuotaLeft
	// IsExpired reports whether the counter has expired.
	IsExpired() bool
	// PushMessage pushes a message.
	PushMessage(*PushValue)
	// ExpireDuration returns the expiration period.
	ExpireDuration() time.Duration
	// LastUpdateTime returns the time of the last update.
	LastUpdateTime() int64
	// Update updates the timestamp.
	Update()
	// Mode returns the quota allocation mode.
	Mode() apiv2.Mode
	// MaxAmount returns the maximum quota amount.
	MaxAmount() uint32
	// ClientCount returns the number of clients.
	ClientCount() uint32
	// DelSender removes a client.
	DelSender(sender Client, expired bool)
	// CleanupSenders cleans up clients that are no longer in use.
	CleanupSenders(bool)
	// UpdateClientSendTime updates the most recent send time of the given client.
	UpdateClientSendTime(sender Client, sendTimeMicro int64)
}

CounterV2 计数器

func NewCounterV2

func NewCounterV2(counterKey uint32, identifier *CounterIdentifier, initRequest InitRequest) CounterV2

NewCounterV2 创建计数器

type InitRequest

// InitRequest is an initialization request; it is the argument type of
// NewCounterV2 and CounterV2.Reload.
type InitRequest struct {
	// MaxAmount is the maximum quota amount.
	MaxAmount      uint32
	// SlideCount is the sliding-window count.
	SlideCount     int
	// Sender is the client associated with the request.
	Sender         Client
	// Duration is the time period.
	Duration       time.Duration
	// ExpireDuration is the expiration period.
	ExpireDuration time.Duration
	// AmountMode is the quota mode.
	AmountMode     apiv2.QuotaMode
	// PushManager is the push manager to use.
	PushManager    PushManager
}

InitRequest 初始化请求

type OccupyAllocator

type OccupyAllocator struct {
	// contains filtered or unexported fields
}

OccupyAllocator 抢占式分配器

func (*OccupyAllocator) Allocate

func (o *OccupyAllocator) Allocate(
	client Client, quotaSum *apiv2.QuotaSum, timestampMs int64, startTimeMicro int64) *apiv2.QuotaLeft

Allocate 分配配额

func (*OccupyAllocator) Mode

func (o *OccupyAllocator) Mode() apiv2.Mode

Mode 返回分配器所属的模式

type PushManager

// PushManager is the push manager interface.
type PushManager interface {
	// Run starts the push thread.
	Run(ctx context.Context)
	// Schedule prepares a value for pushing.
	Schedule(value *PushValue)
}

PushManager 推送管理器

func NewPushManager

func NewPushManager(workerCount int, chanSize int) (PushManager, error)

NewPushManager 创建推送管理器

type PushValue

// PushValue is the value to be pushed.
type PushValue struct {
	// Counter is the counter the push relates to.
	Counter       CounterV2
	// Msg is the response message to push.
	Msg           *apiv2.RateLimitResponse
	// ExcludeClient names a client to exclude.
	// NOTE(review): presumably a client ID that should not receive the push — confirm.
	ExcludeClient string
	// StartTimeMicro is the time the request entered.
	StartTimeMicro int64
	// MsgTimeMicro is the time the response was generated.
	MsgTimeMicro int64
}

PushValue 推送的值

type QuotaAllocator

// QuotaAllocator is the request allocator interface.
type QuotaAllocator interface {
	// Mode returns the mode the allocator belongs to.
	Mode() apiv2.Mode
	// Allocate allocates quota.
	Allocate(client Client, quotaSum *apiv2.QuotaSum, clientTimeMs int64, serverTimeMicro int64) *apiv2.QuotaLeft
}

QuotaAllocator 请求分配器

func NewOccupyAllocator

func NewOccupyAllocator(slideCount int, intervalMs int, pushManager PushManager, counter CounterV2) QuotaAllocator

NewOccupyAllocator 创建抢占式分配器

type QuotaChangeEvent

// QuotaChangeEvent is the JSON-tagged event recording a quota change.
type QuotaChangeEvent struct {
	// TimeNumber is the point in time of the event.
	TimeNumber string `json:"time_number"`
	// EventType is the type of the event.
	EventType string `json:"event"`
	// Namespace is the namespace.
	Namespace string `json:"namespace"`
	// Service is the service name.
	Service string `json:"service"`
	// Labels holds the labels.
	Labels string `json:"labels"`
	// Duration is the time period.
	Duration string `json:"duration"`
	// Action is the action that triggered the quota adjustment.
	Action string `json:"action"`
	// ClientId is the client ID.
	ClientId string `json:"client_id"`
	// LatestAmount is the previous quota value.
	LatestAmount uint32 `json:"latest_amount"`
	// CurrentAmount is the current quota value.
	CurrentAmount uint32 `json:"current_amount"`
}

QuotaChangeEvent 配额变更事件

func (*QuotaChangeEvent) GetEventType

func (q *QuotaChangeEvent) GetEventType() string

GetEventType 获取事件类型

func (*QuotaChangeEvent) ToJson

func (q *QuotaChangeEvent) ToJson() string

ToJson 变成Json输出

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server v2版本的主server逻辑

func GetRateLimitServer

func GetRateLimitServer() (*Server, error)

GetRateLimitServer 获取已经初始化好的Server

func (*Server) AcquireQuota

func (s *Server) AcquireQuota(client Client, startTimeMicro int64, request *apiv2.RateLimitReportRequest,
	collector *plugin.RateLimitStatCollectorV2) (*apiv2.TimedRateLimitReportResponse, CounterV2)

AcquireQuota 获取限流配额

func (*Server) BatchInitializeQuota

func (s *Server) BatchInitializeQuota(ctx context.Context, client Client,
	request *apiv2.RateLimitBatchInitRequest) (*apiv2.RateLimitBatchInitResponse, CounterV2)

BatchInitializeQuota 限流KEY批量初始化

func (*Server) CleanupClient

func (s *Server) CleanupClient(client Client, streamCtxId string)

CleanupClient 清理客户端

func (*Server) CounterMng

func (s *Server) CounterMng() *CounterManagerV2

CounterMng 获取计数器管理类

func (*Server) InitializeClient

func (s *Server) InitializeClient(request *apiv2.RateLimitInitRequest,
	client Client, clientIP *utils.IPAddress, streamContext *StreamContext) (*apiv2.RateLimitInitResponse, Client)

InitializeClient 初始化客户端

func (*Server) InitializeClientBatch

func (s *Server) InitializeClientBatch(request *apiv2.RateLimitBatchInitRequest, client Client,
	clientIP *utils.IPAddress, streamContext *StreamContext) (*apiv2.RateLimitBatchInitResponse, Client)

InitializeClientBatch 批量初始化客户端

func (*Server) InitializeQuota

func (s *Server) InitializeQuota(ctx context.Context, client Client,
	request *apiv2.RateLimitInitRequest) (*apiv2.RateLimitInitResponse, CounterV2)

InitializeQuota 限流KEY初始化

type Stream

// Stream is the stream over which responses are sent.
type Stream interface {
	// Send pushes a response.
	Send(*apiv2.RateLimitResponse) error
}

Stream 应答发送Stream

type StreamContext

type StreamContext struct {
	// contains filtered or unexported fields
}

StreamContext 连接上下文

func NewStreamContext

func NewStreamContext(stream Stream) *StreamContext

NewStreamContext 创建连接上下文

func (*StreamContext) ContextId

func (s *StreamContext) ContextId() string

ContextId 上下文唯一标识

func (*StreamContext) Send

func (s *StreamContext) Send(resp *apiv2.RateLimitResponse) error

Send 发送消息

type StreamUpdateEvent

// StreamUpdateEvent is the JSON-tagged event recording a stream update.
type StreamUpdateEvent struct {
	// TimeNumber is the point in time of the event.
	TimeNumber string `json:"time_number"`
	// EventType is the type of the event.
	EventType string `json:"event"`
	// StreamId is the stream ID. (The original comment read "client ID",
	// which appears to be a copy-paste slip given the field name and JSON tag.)
	StreamId string `json:"stream_id"`
	// IPAddress is the IP address.
	IPAddress string `json:"ip_address"`
	// Action is the action: going online or going offline.
	Action string `json:"action"`
}

StreamUpdateEvent 流更新事件

func NewStreamUpdateEvent

func NewStreamUpdateEvent(streamId string, ipAddr *utils.IPAddress, action string) *StreamUpdateEvent

NewStreamUpdateEvent 创建流更新事件

func (*StreamUpdateEvent) GetEventType

func (c *StreamUpdateEvent) GetEventType() string

GetEventType 获取事件类型

func (*StreamUpdateEvent) ToJson

func (c *StreamUpdateEvent) ToJson() string

ToJson 变成Json输出

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL