Documentation
¶
Index ¶
Constants ¶
View Source
const LevelTrace = slog.Level(-8)
View Source
const MAX_CONCURRENCY = uint64(math.MaxInt32)
Variables ¶
View Source
var ErrAgentStopTimeout = errors.New("timeout waiting for queue agent to stop")
View Source
var ErrAgentStopped = errors.New("queue agent stopped")
Functions ¶
func Int64InSlice ¶
Types ¶
type AgentConfig ¶
type AgentConfig struct { WorkRunner queue.WorkRunner Queue queue.Queue ConcurrencyEnforcer *ConcurrencyEnforcer SupportedTypes queue.QueueSupportedTypes NotificationsChan <-chan listener.Notification NotifyTypeWorkComplete uint8 JobLifecycleWrapper metrics.JobLifecycleWrapper }
type ConcurrencyEnforcer ¶
type ConcurrencyEnforcer struct {
// contains filtered or unexported fields
}
func Concurrencies ¶
func Concurrencies(defaults, priorityMap map[int64]int64, priorities []int64) (*ConcurrencyEnforcer, error)
Concurrencies returns a ConcurrencyEnforcer holding the concurrency limits, where the key is the priority and the value is the concurrency limit for that priority.
func (*ConcurrencyEnforcer) Check ¶
func (c *ConcurrencyEnforcer) Check(jobCount int64) (bool, uint64)
Check reports whether we have the capacity to run any jobs. Parameters:
- jobCount - the number of running jobs
Returns:
- bool - True if we have capacity to take a job
- uint64 - The maximum priority job we have capacity to run
func (*ConcurrencyEnforcer) SetConcurrencies ¶
func (c *ConcurrencyEnforcer) SetConcurrencies(defaults, priorityMap map[int64]int64, priorities []int64) error
func (*ConcurrencyEnforcer) Verify ¶
func (c *ConcurrencyEnforcer) Verify() error
Verify returns an error if any priority has a concurrency setting lower than that of a lower priority.
type DefaultAgent ¶
type DefaultAgent struct {
// contains filtered or unexported fields
}
func NewAgent ¶
func NewAgent(cfg AgentConfig) *DefaultAgent
func (*DefaultAgent) Run ¶
func (a *DefaultAgent) Run(notify agenttypes.Notify)
func (*DefaultAgent) Stop ¶
func (a *DefaultAgent) Stop(timeout time.Duration) error
Stop safely stops the agent.
func (*DefaultAgent) Wait ¶
func (a *DefaultAgent) Wait(runningJobs int64, jobDone chan int64) uint64
Wait blocks until there's capacity to run a new job. Parameters:
- runningJobs int64 - The initial number of running jobs
- jobDone chan int64 - Passes the new number of running jobs when any job completes
Returns:
- uint64 - The maximum job priority for which we have capacity
type Int64RevSlice ¶
type Int64RevSlice []int64
Int64RevSlice makes []int64 sortable with sort.Sort (reverse order)
func (Int64RevSlice) Len ¶
func (a Int64RevSlice) Len() int
func (Int64RevSlice) Less ¶
func (a Int64RevSlice) Less(i, j int) bool
func (Int64RevSlice) Sort ¶
func (p Int64RevSlice) Sort()
func (Int64RevSlice) Swap ¶
func (a Int64RevSlice) Swap(i, j int)
Click to show internal directories.
Click to hide internal directories.