exec

package
v0.0.11

This package is not in the latest version of its module.
Published: Oct 13, 2024 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Overview

Package exec defines the ppacer Executor and related functionality.

On start-up (the Start method), the Executor enters an endless loop in which it polls the Scheduler over HTTP to check whether there are new tasks to execute. Polling follows the provided strategy (pace.Strategy). New tasks are executed in separate goroutines. When an executing task hits a runtime error, the Executor recovers and marks the task execution as failed.
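The recover-and-mark-failed behaviour described above can be sketched with the standard library alone. This is a minimal illustration, not ppacer's implementation: `taskStatus`, its constants, and `runRecovered` are hypothetical names introduced only for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// taskStatus is a hypothetical status type used only in this sketch.
type taskStatus string

const (
	statusSuccess taskStatus = "SUCCESS"
	statusFailed  taskStatus = "FAILED"
)

// runRecovered runs task and turns a panic (for example a runtime error)
// into a failed status instead of crashing the whole program.
func runRecovered(task func()) (status taskStatus, err error) {
	defer func() {
		if r := recover(); r != nil {
			status = statusFailed
			err = fmt.Errorf("task panicked: %v", r)
		}
	}()
	task()
	return statusSuccess, nil
}

func main() {
	tasks := map[string]func(){
		"ok": func() {},
		"broken": func() {
			var m map[string]int
			m["x"] = 1 // runtime error: assignment to entry in nil map
		},
	}

	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		results = make(map[string]taskStatus)
	)
	for name, task := range tasks {
		wg.Add(1)
		// Each task runs in its own goroutine, as the overview describes.
		go func(name string, task func()) {
			defer wg.Done()
			status, _ := runRecovered(task)
			mu.Lock()
			results[name] = status
			mu.Unlock()
		}(name, task)
	}
	wg.Wait()

	fmt.Println(results["ok"], results["broken"]) // SUCCESS FAILED
}
```

The deferred function must live in the same goroutine as the task, because `recover` only stops a panic raised on its own goroutine's call stack.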

The Executor can run in the same program as the Scheduler (in a separate goroutine) or in a separate binary. There can also be many executors, potentially on multiple machines.
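To make the many-executors arrangement concrete, here is a rough sketch in which several pollers share one stand-in scheduler. Everything in it is an assumption for illustration: the stub server, its "200 with a task name, 204 when empty" protocol, and the helper names are hypothetical and do not reflect ppacer's real HTTP API. Unlike a real Executor, the pollers stop at the first empty response instead of looping forever.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"sort"
	"sync"
)

// newStubScheduler starts a stand-in HTTP server that hands out each queued
// task name exactly once and answers 204 No Content when the queue is empty.
func newStubScheduler(tasks []string) *httptest.Server {
	var mu sync.Mutex
	queue := append([]string(nil), tasks...)
	handler := func(w http.ResponseWriter, r *http.Request) {
		mu.Lock()
		defer mu.Unlock()
		if len(queue) == 0 {
			w.WriteHeader(http.StatusNoContent)
			return
		}
		fmt.Fprint(w, queue[0])
		queue = queue[1:]
	}
	return httptest.NewServer(http.HandlerFunc(handler))
}

// drain polls addr until the stub reports an empty queue and returns the
// task names it received.
func drain(addr string) ([]string, error) {
	var got []string
	for {
		resp, err := http.Get(addr)
		if err != nil {
			return got, err
		}
		body, err := io.ReadAll(resp.Body)
		resp.Body.Close()
		if err != nil {
			return got, err
		}
		if resp.StatusCode == http.StatusNoContent {
			return got, nil
		}
		got = append(got, string(body))
	}
}

// runExecutors starts n pollers in separate goroutines against the same
// scheduler address and returns every task received, sorted by name.
func runExecutors(addr string, n int) []string {
	var (
		mu  sync.Mutex
		wg  sync.WaitGroup
		all []string
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			got, _ := drain(addr) // errors are ignored in this sketch
			mu.Lock()
			all = append(all, got...)
			mu.Unlock()
		}()
	}
	wg.Wait()
	sort.Strings(all)
	return all
}

func main() {
	srv := newStubScheduler([]string{"a", "b", "c", "d"})
	defer srv.Close()
	fmt.Println(runExecutors(srv.URL, 3)) // [a b c d]
}
```

Because the scheduler side hands out each task once, adding more pollers changes who runs a task, not how many times it runs.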

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	HttpRequestTimeout time.Duration
	MaxGoroutineCount  int64
}

Executor configuration.

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

Executor executes DAG run tasks, each in a separate goroutine. When the Start method is called, the Executor starts polling the ppacer scheduler for new DAG run tasks to run.

func New

func New(
	schedAddr string,
	logDbClient *db.LogsClient,
	polling pace.Strategy,
	logger *slog.Logger,
	config *Config,
	notifier notify.Sender,
) *Executor

New creates a new Executor instance. When the polling strategy is nil, a linear backoff (min=1ms, max=1s, step=10ms, repeat=10) is used. When config is nil, default configuration values are used. When logger is nil, an slog logger writing to stdout at WARN severity level is used. When notifier is nil, notify.NewLogsErr (notifications as logs) is used.

func NewDefault

func NewDefault(schedulerUrl, taskLogsDbFile string) *Executor

NewDefault creates a new Executor using SQLite for task logs and the default Executor configuration. It is mainly meant to reduce boilerplate in simple examples and tests.

func (*Executor) Start

func (e *Executor) Start(dags dag.Registry)

Start starts the executor main loop. It polls the Scheduler, according to the polling strategy, for new tasks to execute. Tasks are executed in separate goroutines. There is a limit on the number of goroutines running at the same time (the default is 1000). If that limit is hit, the Executor waits before starting a new task until the number of currently running goroutines drops below the limit.
