scheduler

package
v0.0.0-...-56fe054 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 12, 2026 License: AGPL-3.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultMaxBackoff is the default maximum backoff duration for workers.
	DefaultMaxBackoff = 1 * time.Second
	// DefaultMinBackoff is the default minimum backoff duration for workers.
	DefaultMinBackoff = 100 * time.Millisecond
	// DefaultNumWorkers is the default number of workers in the scheduler.
	DefaultNumWorkers = 4
	// DefaultMaxRetries is the default maximum number of retries for dequeue operations.
	DefaultMaxRetries = 5
)
View Source
const (
	// DefaultMaxSizePerTenant is the default maximum number of items per tenant in the queue.
	DefaultMaxSizePerTenant = 100
)

Variables

View Source
var ErrMissingTenantID = errors.New("item requires TenantID")
View Source
var ErrNilRunnable = errors.New("cannot enqueue nil runnable")
View Source
var ErrQueueClosed = errors.New("queue closed")
View Source
var ErrTenantQueueFull = errors.New("tenant queue full")

Functions

This section is empty.

Types

type Config

type Config struct {
	NumWorkers int
	MaxBackoff time.Duration
	MaxRetries int
	Logger     log.Logger
}

Config holds configuration for the Scheduler.

type NoopQueue

type NoopQueue struct{}

func NewNoopQueue

func NewNoopQueue() *NoopQueue

func (*NoopQueue) Enqueue

func (*NoopQueue) Enqueue(ctx context.Context, _ string, runnable func()) error

type Queue

type Queue struct {
	services.Service
	// contains filtered or unexported fields
}

Queue implements a multi-tenant qos with round-robin fairness using a dispatcher goroutine.

func NewQueue

func NewQueue(opts *QueueOptions) *Queue

NewQueue creates a new Queue and starts its dispatcher goroutine.

func (*Queue) ActiveTenantsLen

func (q *Queue) ActiveTenantsLen() int

ActiveTenantsLen returns the number of tenants with items currently in the queue.

func (*Queue) Dequeue

func (q *Queue) Dequeue(ctx context.Context) (func(), error)

Dequeue removes and returns a work item from the qos using linked-list round-robin. It blocks until an item is available for any tenant, the queue is closed, or the context is cancelled.

func (*Queue) Enqueue

func (q *Queue) Enqueue(ctx context.Context, tenantID string, runnable func()) error

Enqueue adds a work item to the appropriate tenant's qos. It blocks only if the dispatcher is busy or the tenant queue is full.

func (*Queue) Len

func (q *Queue) Len() int

Len returns the total number of items across all tenants in the queue.

type QueueOptions

type QueueOptions struct {
	MaxSizePerTenant int
	Registerer       prometheus.Registerer
	Logger           log.Logger
}

type Scheduler

type Scheduler struct {
	services.Service
	// contains filtered or unexported fields
}

Scheduler manages a pool of Workers consuming from a Queue.

func NewScheduler

func NewScheduler(queue WorkQueue, config *Config) (*Scheduler, error)

NewScheduler creates a new scheduler instance.

type WorkQueue

type WorkQueue interface {
	services.Service

	Dequeue(ctx context.Context) (runnable func(), err error)
}

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker processes items from the QoS request queue

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL