balancer

package
Version: v0.0.0-...-8b14664 (Latest)
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 17, 2026 | License: MIT | Imports: 22 | Imported by: 0

Documentation

Index

Constants

View Source
// DQN hyperparameters (user-facing defaults).
//
// Every constant in this group except DefaultDQNExpChannelSize has a
// same-named WithDQN* option in this package's listing (for example
// DefaultDQNBatchSize and WithDQNBatchSize). Presumably NewDQNBalancer uses
// these values when the matching option is not supplied — TODO confirm
// against the constructor, which is not shown here.
//
// NOTE(review): WithDQNHuberDelta is listed as an option but has no matching
// Default* constant in this group.
const (
	DefaultDQNHiddenSize     = 128
	DefaultDQNEpsilon        = 0.05
	DefaultDQNGamma          = 0.9
	DefaultDQNLearningRate   = 0.001
	// 50_000 uses Go's digit separator; the value is 50000.
	DefaultDQNReplayBufSize  = 50_000
	DefaultDQNBatchSize      = 32
	DefaultDQNMinReplay      = 64
	DefaultDQNTrainEvery     = 4
	DefaultDQNFallbackRatio  = 0.8
	DefaultDQNLoadThreshold  = 0.75
	// No WithDQN* option for this value appears in the package listing.
	DefaultDQNExpChannelSize = 4096
)

DQN hyperparameters (user-facing defaults).

View Source
// PSA constants.
const (
	// NOTE(review): presumably a load multiplier consulted by the PSA balancer
	// when deciding to rebalance — the use site is not shown; confirm against
	// the PSA implementation.
	PSARebalanceLoadFactor = 2.0
)

PSA constants.

Variables

This section is empty.

Functions

This section is empty.

Types

type Balancer

// Balancer is the partition-selection contract of this package.
//
// The listing shows four types declaring both methods with these exact
// signatures on pointer receivers: DQNBalancer, PSA, PowerOfTwoChoices and
// RoundRobin. PSA, PowerOfTwoChoices and RoundRobin name the topic parameter
// `_` (unused); PowerOfTwoChoices and RoundRobin also discard key.
type Balancer interface {
	// SelectPartition returns the partition index for a given topic/key pair.
	// NOTE(review): presumably the result lies in [0, numPartitions) — the
	// signature does not enforce it; confirm against the implementations.
	SelectPartition(topic string, key []byte, numPartitions int) int

	// OnMetrics is called periodically with fresh broker metrics.
	// broker.Metrics is declared in another package not shown here.
	OnMetrics(m broker.Metrics)
}

Balancer selects a target partition for an incoming message.

type DQNBalancer

type DQNBalancer struct {
	// contains filtered or unexported fields
}

func NewDQNBalancer

func NewDQNBalancer(numPartitions int, opts ...DQNOption) *DQNBalancer

func (*DQNBalancer) DroppedExperience

func (d *DQNBalancer) DroppedExperience() int64

func (*DQNBalancer) IsFallbackActive

func (d *DQNBalancer) IsFallbackActive() bool

func (*DQNBalancer) OnMetrics

func (d *DQNBalancer) OnMetrics(m broker.Metrics)

func (*DQNBalancer) OnPublishComplete

func (d *DQNBalancer) OnPublishComplete(partition int)

func (*DQNBalancer) SelectPartition

func (d *DQNBalancer) SelectPartition(topic string, key []byte, numPartitions int) int

func (*DQNBalancer) SetBaseThroughput

func (d *DQNBalancer) SetBaseThroughput(t float64)

func (*DQNBalancer) SetPredictedLoads

func (d *DQNBalancer) SetPredictedLoads(predicted []float64)

func (*DQNBalancer) Stop

func (d *DQNBalancer) Stop()

type DQNOption

type DQNOption func(*DQNBalancer)

DQNOption configures a DQNBalancer.

func WithDQNBatchSize

func WithDQNBatchSize(n int) DQNOption

func WithDQNEpsilon

func WithDQNEpsilon(e float64) DQNOption

func WithDQNFallbackRatio

func WithDQNFallbackRatio(r float64) DQNOption

func WithDQNGamma

func WithDQNGamma(g float64) DQNOption

func WithDQNHiddenSize

func WithDQNHiddenSize(n int) DQNOption

func WithDQNHuberDelta

func WithDQNHuberDelta(delta float64) DQNOption

func WithDQNLearningRate

func WithDQNLearningRate(lr float64) DQNOption

func WithDQNLoadThreshold

func WithDQNLoadThreshold(t float64) DQNOption

func WithDQNMinReplay

func WithDQNMinReplay(n int) DQNOption

func WithDQNReplayBufSize

func WithDQNReplayBufSize(n int) DQNOption

func WithDQNTrainEvery

func WithDQNTrainEvery(n int) DQNOption

type P2COption

type P2COption func(*PowerOfTwoChoices)

func WithP2CAlpha

func WithP2CAlpha(a float64) P2COption

func WithP2CDepthWeight

func WithP2CDepthWeight(w float64) P2COption

func WithP2CInflightWeight

func WithP2CInflightWeight(w float64) P2COption

func WithP2CLoadWeight

func WithP2CLoadWeight(w float64) P2COption

type PSA

type PSA struct {
	// contains filtered or unexported fields
}

func NewPSA

func NewPSA(numPartitions int) *PSA

func (*PSA) EvictionCount

func (p *PSA) EvictionCount() int64

func (*PSA) OnMetrics

func (p *PSA) OnMetrics(m broker.Metrics)

func (*PSA) SelectPartition

func (p *PSA) SelectPartition(_ string, key []byte, numPartitions int) int

type PowerOfTwoChoices

type PowerOfTwoChoices struct {
	// contains filtered or unexported fields
}

func NewPowerOfTwoChoices

func NewPowerOfTwoChoices(opts ...P2COption) *PowerOfTwoChoices

func (*PowerOfTwoChoices) OnMetrics

func (p *PowerOfTwoChoices) OnMetrics(m broker.Metrics)

func (*PowerOfTwoChoices) OnPublishComplete

func (p *PowerOfTwoChoices) OnPublishComplete(partition int)

func (*PowerOfTwoChoices) SelectPartition

func (p *PowerOfTwoChoices) SelectPartition(_ string, _ []byte, numPartitions int) int

type RROption

type RROption func(*RoundRobin)

func WithRRSkipBudget

func WithRRSkipBudget(b int) RROption

func WithRRSkipThreshold

func WithRRSkipThreshold(t float64) RROption

type RoundRobin

type RoundRobin struct {
	// contains filtered or unexported fields
}

func NewRoundRobin

func NewRoundRobin(opts ...RROption) *RoundRobin

func (*RoundRobin) OnMetrics

func (rr *RoundRobin) OnMetrics(m broker.Metrics)

func (*RoundRobin) SelectPartition

func (rr *RoundRobin) SelectPartition(_ string, _ []byte, numPartitions int) int

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL