agent

package
v2.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 9, 2024 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const LevelTrace = slog.Level(-8)
View Source
const MAX_CONCURRENCY = uint64(math.MaxInt32)

Variables

View Source
var ErrAgentStopTimeout = errors.New("timeout waiting for queue agent to stop")
View Source
var ErrAgentStopped = errors.New("queue agent stopped")

Functions

func Int64InSlice

func Int64InSlice(needle int64, haystack []int64) bool

Types

type AgentConfig

type AgentConfig struct {
	WorkRunner             queue.WorkRunner
	Queue                  queue.Queue
	ConcurrencyEnforcer    *ConcurrencyEnforcer
	SupportedTypes         queue.QueueSupportedTypes
	NotificationsChan      <-chan listener.Notification
	NotifyTypeWorkComplete uint8
	JobLifecycleWrapper    metrics.JobLifecycleWrapper
}

type ConcurrencyEnforcer

type ConcurrencyEnforcer struct {
	// contains filtered or unexported fields
}

func Concurrencies

func Concurrencies(defaults, priorityMap map[int64]int64, priorities []int64) (*ConcurrencyEnforcer, error)

Returns a map of concurrency limits, where the key is the priority and the value is the concurrency limit for that priority.

func (*ConcurrencyEnforcer) Check

func (c *ConcurrencyEnforcer) Check(jobCount int64) (bool, uint64)

Check if we have the capacity to run any jobs Parameters:

  • jobCount - the number of running jobs

Returns:

  • bool - True if we have capacity to take a job
  • int64 - The maximum priority job we have capacity to run

func (*ConcurrencyEnforcer) SetConcurrencies

func (c *ConcurrencyEnforcer) SetConcurrencies(defaults, priorityMap map[int64]int64, priorities []int64) error

func (*ConcurrencyEnforcer) Verify

func (c *ConcurrencyEnforcer) Verify() error

Returns an error if any priority has a concurrency setting lower than the a low priority.

type DefaultAgent

type DefaultAgent struct {
	// contains filtered or unexported fields
}

func NewAgent

func NewAgent(cfg AgentConfig) *DefaultAgent

func (*DefaultAgent) Run

func (a *DefaultAgent) Run(notify agenttypes.Notify)

func (*DefaultAgent) Stop

func (a *DefaultAgent) Stop(timeout time.Duration) error

Stop safely stops the agent.

func (*DefaultAgent) Wait

func (a *DefaultAgent) Wait(runningJobs int64, jobDone chan int64) uint64

Wait blocks until there's capacity to run a new job. Parameters:

  • runningJobs int64 - The initial number of running jobs
  • jobDone chan int64 - Passes the new number of running jobs when any job completes

Returns:

  • uint64 - The maximum job priority for which we have capacity

type FakeAgent

type FakeAgent struct {
	StopErr    error
	WaitResult uint64
}

func (*FakeAgent) Run

func (a *FakeAgent) Run(notify agenttypes.Notify)

func (*FakeAgent) Stop

func (a *FakeAgent) Stop(timeout time.Duration) error

func (*FakeAgent) Wait

func (a *FakeAgent) Wait(runningJobs int64, jobDone chan int64) uint64

type Int64RevSlice

type Int64RevSlice []int64

Int64RevSlice makes []int64 sortable with sort.Sort (reverse order)

func (Int64RevSlice) Len

func (a Int64RevSlice) Len() int

func (Int64RevSlice) Less

func (a Int64RevSlice) Less(i, j int) bool

func (Int64RevSlice) Sort

func (p Int64RevSlice) Sort()

func (Int64RevSlice) Swap

func (a Int64RevSlice) Swap(i, j int)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL