pool

package
v2.0.0-alpha27 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 22, 2020 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
const MB = 1024 * 1024
View Source
const StopRequest = "{\"stop\":true}"

StopRequest can be sent by worker to indicate that restart is required.

Variables

This section is empty.

Functions

func Initialize

func Initialize(ctx context.Context, cmd Command, factory worker.Factory, cfg Config, options ...Options) (pool.Pool, error)

Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.

Types

type Command

type Command func() *exec.Cmd

type Config

type Config struct {
	// Debug flag creates new fresh worker before every request.
	Debug bool

	// NumWorkers defines how many sub-processes can be run at once. This value
	// might be doubled by Swapper while hot-swap. Defaults to number of CPU cores.
	NumWorkers int64

	// MaxJobs defines how many executions is allowed for the worker until
	// it's destruction. set 1 to create new process for each new task, 0 to let
	// worker handle as many tasks as it can.
	MaxJobs int64

	// AllocateTimeout defines for how long pool will be waiting for a worker to
	// be freed to handle the task. Defaults to 60s.
	AllocateTimeout time.Duration

	// DestroyTimeout defines for how long pool should be waiting for worker to
	// properly destroy, if timeout reached worker will be killed. Defaults to 60s.
	DestroyTimeout time.Duration

	// Supervision config to limit worker and pool memory usage.
	Supervisor *SupervisorConfig
}

Configures the pool behaviour.

func (*Config) InitDefaults

func (cfg *Config) InitDefaults()

InitDefaults enables default config values.

type ErrorEncoder

type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error)

ErrorEncoder encode error or make a decision based on the error type

type Options

type Options func(p *StaticPool)

func AddListeners

func AddListeners(listeners ...events.EventListener) Options

type StaticPool

type StaticPool struct {
	// contains filtered or unexported fields
}

StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack.

func (*StaticPool) Destroy

func (sp *StaticPool) Destroy(ctx context.Context)

Destroy all underlying stack (but let them to complete the task).

func (*StaticPool) Exec

func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error)

func (*StaticPool) ExecWithContext

func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error)

func (*StaticPool) GetConfig

func (sp *StaticPool) GetConfig() interface{}

Config returns associated pool configuration. Immutable.

func (*StaticPool) RemoveWorker

func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error

func (*StaticPool) Workers

func (sp *StaticPool) Workers() (workers []worker.BaseProcess)

Workers returns worker list associated with the pool.

type Supervised

type Supervised interface {
	pool.Pool
	// Start used to start watching process for all pool workers
	Start()
}

type SupervisorConfig

type SupervisorConfig struct {
	// WatchTick defines how often to check the state of worker.
	WatchTick uint64

	// TTL defines maximum time worker is allowed to live.
	TTL uint64

	// IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0.
	IdleTTL uint64

	// ExecTTL defines maximum lifetime per job.
	ExecTTL uint64

	// MaxWorkerMemory limits memory per worker.
	MaxWorkerMemory uint64
}

func (*SupervisorConfig) InitDefaults

func (cfg *SupervisorConfig) InitDefaults()

InitDefaults enables default config values.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL