scheduler

package
v0.0.11
Published: Oct 13, 2024 License: Apache-2.0 Imports: 23 Imported by: 1

Documentation

Overview

Package scheduler provides functionality for creating and starting new Scheduler.

Introduction

The Scheduler type is the heart of ppacer. Once initialized and started, it performs the following steps:

  • Sets up internal caches and configuration (e.g. timezone).
  • Synchronizes metadata of the current dag.Registry with the database.
  • Sets up a new DagRunWatcher, which listens on schedules and starts new DAG runs.
  • Sets up a new TaskScheduler, which listens for new DAG runs and starts scheduling their tasks.
  • Registers Scheduler endpoints in the form of an http.Handler.

A program is meant to have a single instance of Scheduler. The DagRunWatcher and TaskScheduler types are public primarily to expose their documentation, and they usually shouldn't be accessed directly. Having an instance of Scheduler is enough.

Default Scheduler

The simplest way to start a new Scheduler is to use the DefaultStarted function, as in the following example:

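ctx := context.TODO()
dags := dag.Registry{} // add your dags or use existing Registry
// DefaultStarted takes two SQLite file paths (dbFile and dbLogsFile) followed
// by the port. The file names below are only examples.
schedulerHandler := DefaultStarted(ctx, dags, "scheduler.db", "scheduler_logs.db", 9191)
lasErr := schedulerHandler.ListenAndServe()
if lasErr != nil {
	log.Panicf("Cannot start the server: %s\n", lasErr.Error())
}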

DefaultStarted starts a Scheduler with the default configuration: a notifier that reports through log messages at ERR level, SQLite databases and the default slog.Logger.

For a full ppacer "hello world" example, please refer to: https://ppacer.org/start/intro/

Constants

const (
	// Timeout for HTTP request contexts.
	HTTPRequestContextTimeout = 30 * time.Second
)

const PPACER_ENV_LOG_LEVEL = "PPACER_LOG_LEVEL"

Name of the environment variable that sets the severity level of the default ppacer logger.
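As an illustration of how such a variable can drive logger setup, the sketch below reads PPACER_LOG_LEVEL and builds a slog.Logger from it. The helper parseLevel and the fallback to INFO for unset or unrecognized values are assumptions made for this example; the exact set of values accepted by ppacer is not listed here.

```go
package main

import (
	"fmt"
	"log/slog"
	"os"
)

// envLogLevel mirrors scheduler.PPACER_ENV_LOG_LEVEL.
const envLogLevel = "PPACER_LOG_LEVEL"

// parseLevel is a hypothetical helper. It relies on slog.Level.UnmarshalText,
// which accepts names such as "DEBUG" or "warn", and falls back to INFO for
// anything it cannot parse (including an empty string).
func parseLevel(raw string) slog.Level {
	var lvl slog.Level
	if err := lvl.UnmarshalText([]byte(raw)); err != nil {
		return slog.LevelInfo
	}
	return lvl
}

func main() {
	lvl := parseLevel(os.Getenv(envLogLevel))
	opts := &slog.HandlerOptions{Level: lvl}
	logger := slog.New(slog.NewTextHandler(os.Stderr, opts))
	logger.Info("logger configured")
	fmt.Println("effective level:", lvl)
}
```

Running the program with PPACER_LOG_LEVEL=DEBUG set in the environment makes the handler emit DEBUG records as well.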

Variables

var (
	ErrDagRunIdIncorrect = errors.New("incorrect DAG run ID")
)

Functions

func DefaultStarted

func DefaultStarted(
	ctx context.Context, dags dag.Registry, dbFile, dbLogsFile string, port int,
) *http.Server

DefaultStarted creates a default Scheduler using the default configuration and SQLite databases, starts it and returns an HTTP server with the Scheduler endpoints. It is mainly meant to reduce boilerplate in simple examples and tests.

Types

type API added in v0.0.3

type API interface {
	GetTask() (api.TaskToExec, error)
	UpsertTaskStatus(api.TaskToExec, dag.TaskStatus, error) error
	GetState() (State, error)
	TriggerDagRun(api.DagRunTriggerInput) error
	RestartDagRun(api.DagRunRestartInput) error

	UIDagrunStats() (api.UIDagrunStats, error)
	UIDagrunLatest(int) (api.UIDagrunList, error)
	UIDagrunDetails(int) (api.UIDagrunDetails, error)
	UIDagrunTaskDetails(int, string, int) (api.UIDagrunTask, error)
}

API defines ppacer Scheduler API. Client implements this interface.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client provides API for interacting with Scheduler.

func NewClient

func NewClient(
	url string, httpClient *http.Client, logger *slog.Logger,
	config ClientConfig,
) *Client

NewClient instantiates a new Client. When the HTTP client or the logger is nil, it is initialized with default parameters.
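The nil-handling described above can be pictured with the following self-contained sketch. The clientConfig type and the withDefaults helper are stand-ins written for this example, and the concrete defaults (an http.Client using HttpClientTimeout and a text-handler logger) are assumptions rather than the package's actual implementation.

```go
package main

import (
	"fmt"
	"log/slog"
	"net/http"
	"os"
	"time"
)

// clientConfig mirrors scheduler.ClientConfig.
type clientConfig struct {
	HttpClientTimeout time.Duration
}

// withDefaults is a hypothetical helper: nil arguments are replaced by
// defaults, non-nil arguments are returned unchanged.
func withDefaults(
	hc *http.Client, logger *slog.Logger, cfg clientConfig,
) (*http.Client, *slog.Logger) {
	if hc == nil {
		hc = &http.Client{Timeout: cfg.HttpClientTimeout}
	}
	if logger == nil {
		logger = slog.New(slog.NewTextHandler(os.Stderr, nil))
	}
	return hc, logger
}

func main() {
	cfg := clientConfig{HttpClientTimeout: 15 * time.Second}
	hc, logger := withDefaults(nil, nil, cfg)
	fmt.Println(hc.Timeout, logger != nil)
}
```

With the real package, passing nil for both the HTTP client and the logger together with DefaultClientConfig is the shortest way to obtain a working Client.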

func (*Client) GetState

func (c *Client) GetState() (State, error)

GetState gets the current Scheduler state.

func (*Client) GetTask

func (c *Client) GetTask() (api.TaskToExec, error)

GetTask gets a new task from the scheduler to be executed by an executor.

func (*Client) RestartDagRun added in v0.0.11

func (c *Client) RestartDagRun(in api.DagRunRestartInput) error

RestartDagRun restarts given DAG run.

func (*Client) Stop

func (c *Client) Stop() error

Stop stops the Scheduler.

func (*Client) TriggerDagRun added in v0.0.10

func (c *Client) TriggerDagRun(in api.DagRunTriggerInput) error

TriggerDagRun schedules new DAG run.

func (*Client) UIDagrunDetails added in v0.0.7

func (c *Client) UIDagrunDetails(runId int) (api.UIDagrunDetails, error)

UIDagrunDetails provides detailed information on given DAG run, including task logs and task configuration.

func (*Client) UIDagrunLatest added in v0.0.3

func (c *Client) UIDagrunLatest(n int) (api.UIDagrunList, error)

UIDagrunLatest returns information on the latest n DAG runs and the completion of their tasks.

func (*Client) UIDagrunStats added in v0.0.3

func (c *Client) UIDagrunStats() (api.UIDagrunStats, error)

UIDagrunStats returns the current statistics on DAG runs, their tasks and goroutines.

func (*Client) UIDagrunTaskDetails added in v0.0.9

func (c *Client) UIDagrunTaskDetails(runId int, taskId string, retry int) (api.UIDagrunTask, error)

UIDagrunTaskDetails returns all task logs for the given DAG run and task ID.

func (*Client) UpsertTaskStatus

func (c *Client) UpsertTaskStatus(
	tte api.TaskToExec, status dag.TaskStatus, taskErr error,
) error

UpsertTaskStatus either updates existing DAG run task status or inserts new one.

type ClientConfig

type ClientConfig struct {
	// HTTP client timeout value in case when http.Client needs to be
	// initialized within NewClient.
	HttpClientTimeout time.Duration
}
var DefaultClientConfig ClientConfig = ClientConfig{
	HttpClientTimeout: 15 * time.Second,
}

Default Client configuration.

type Config

type Config struct {
	// DAG runs queue capacity. DAG run queue is preallocated on Scheduler
	// startup.
	DagRunQueueLen int

	// DAG run tasks queue capacity. DAG run tasks queue is preallocated on
	// Scheduler startup.
	DagRunTaskQueueLen int

	// DAG run tasks cache capacity.
	DagRunTaskCacheLen int

	// Startup timeout duration. When the scheduler calls Start, it synchronizes
	// with the database and possibly other resources. This duration is set as
	// the timeout of the Start context.
	StartupContextTimeout time.Duration

	// Configuration for taskScheduler.
	TaskSchedulerConfig TaskSchedulerConfig

	// Configuration for dagRunWatcher
	DagRunWatcherConfig DagRunWatcherConfig

	// Timezone name consistent with IANA Time Zone Database (e.g.
	// "Europe/Warsaw" or "America/New_York").
	TimezoneName string
}

Config represents main configuration for the Scheduler.

var DefaultConfig Config = Config{
	DagRunQueueLen:        100,
	DagRunTaskQueueLen:    1000,
	DagRunTaskCacheLen:    1000,
	StartupContextTimeout: 30 * time.Second,
	TaskSchedulerConfig:   DefaultTaskSchedulerConfig,
	DagRunWatcherConfig:   DefaultDagRunWatcherConfig,
	TimezoneName:          "Local",
}

Default Scheduler configuration.
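A common way to customize the configuration is to copy the defaults and override only selected fields. The sketch below shows that pattern on a local stand-in type that mirrors a subset of Config. The validateTimezone helper is hypothetical: it checks TimezoneName with time.LoadLocation, which understands IANA names as well as "Local" and "UTC". Whether the Scheduler performs the same check is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// config mirrors a subset of scheduler.Config.
type config struct {
	DagRunQueueLen        int
	StartupContextTimeout time.Duration
	TimezoneName          string
}

// defaultConfig mirrors the matching fields of scheduler.DefaultConfig.
var defaultConfig = config{
	DagRunQueueLen:        100,
	StartupContextTimeout: 30 * time.Second,
	TimezoneName:          "Local",
}

// validateTimezone is a hypothetical helper which rejects timezone names
// that cannot be loaded.
func validateTimezone(cfg config) error {
	if _, err := time.LoadLocation(cfg.TimezoneName); err != nil {
		return fmt.Errorf("invalid TimezoneName %q: %w", cfg.TimezoneName, err)
	}
	return nil
}

func main() {
	cfg := defaultConfig // struct assignment copies; defaults stay untouched
	cfg.TimezoneName = "UTC"
	cfg.DagRunQueueLen = 10

	fmt.Println(validateTimezone(cfg), defaultConfig.TimezoneName)
}
```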

type DRTBase

type DRTBase struct {
	DagId  dag.Id
	AtTime string
	TaskId string
}

DRTBase holds the base information identifying a DAG run task. It is meant to be used in caching, where we care about the latest status of a DAG run task and don't need to store information for all retries.
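Because the key carries no retry number, all retries of one task share a single cache entry. The following sketch illustrates this with a plain map. The drtBase and statusUpdate types and the status strings are placeholders for this example (dag.Id is replaced by string), not the actual cache implementation.

```go
package main

import "fmt"

// drtBase mirrors scheduler.DRTBase; dag.Id is replaced by a plain string.
type drtBase struct {
	DagId  string
	AtTime string
	TaskId string
}

// statusUpdate is a hypothetical event: one status change of one retry.
type statusUpdate struct {
	Key    drtBase
	Retry  int
	Status string // placeholder values, not actual dag.TaskStatus names
}

// latestStatuses folds updates into a map keyed by drtBase, so later updates
// (including later retries) overwrite earlier ones.
func latestStatuses(updates []statusUpdate) map[drtBase]string {
	cache := make(map[drtBase]string)
	for _, u := range updates {
		cache[u.Key] = u.Status
	}
	return cache
}

func main() {
	key := drtBase{DagId: "sample_dag", AtTime: "2024-10-13T12:00:00Z", TaskId: "load"}
	cache := latestStatuses([]statusUpdate{
		{Key: key, Retry: 0, Status: "failed"},
		{Key: key, Retry: 1, Status: "success"},
	})
	fmt.Println(len(cache), cache[key])
}
```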

type DagRun

type DagRun struct {
	DagId       dag.Id
	AtTime      time.Time
	IsRestarted bool
}

DagRun represents a single DAG run. AtTime is the scheduled time rather than the time when the run was actually pushed onto the DAG run queue.

type DagRunTask

type DagRunTask struct {
	DagId  dag.Id    `json:"dagId"`
	AtTime time.Time `json:"execTs"`
	TaskId string    `json:"taskId"`
	Retry  int       `json:"retry"`
}

DagRunTask represents an identifier for a single DAG run task which shall be scheduled.

type DagRunTaskState

type DagRunTaskState struct {
	Status         dag.TaskStatus
	StatusUpdateTs time.Time
}

DagRunTaskState represents DagRunTask state.

type DagRunWatcher

type DagRunWatcher struct {
	// contains filtered or unexported fields
}

DagRunWatcher watches the given list of DAGs and sends information about new DAG runs onto the internal queue when it's time for a new DAG run to be scheduled. The time of the next DAG run for a given DAG is determined by its schedule. DagRunWatcher also synchronizes information about DAG runs with the database.

If you use Scheduler, you probably don't need to use this object directly.

func NewDagRunWatcher

func NewDagRunWatcher(
	queue ds.Queue[DagRun],
	dbClient *db.Client,
	logger *slog.Logger,
	stateFunc GetStateFunc,
	config DagRunWatcherConfig,
) *DagRunWatcher

NewDagRunWatcher creates a new instance of DagRunWatcher. The stateFunc argument is usually passed by the main Scheduler to give access to its state. When nil is provided as logger, a slog.Logger is instantiated with TextHandler and INFO severity level.

func (*DagRunWatcher) Watch

func (drw *DagRunWatcher) Watch(ctx context.Context, dags dag.Registry)

Watch watches the given list of DAGs and tries to schedule new DAG runs onto the internal queue. It does so every DagRunWatcherConfig.WatchInterval. When the DAG run queue is full and new DAG runs cannot be pushed there, Watch waits for DagRunWatcherConfig.QueueIsFullInterval before the next try. Even if the queue stays full for longer than the interval between two consecutive DAG runs, those DAG runs won't be skipped. They will be scheduled in the expected order, but possibly a bit later.
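The "retry instead of skip" behaviour can be sketched as follows. This is a single-threaded simulation under stated assumptions: boundedQueue is a hypothetical fixed-capacity FIFO (not ds.SimpleQueue), and the onFull callback stands in for a consumer that would normally drain the queue concurrently. The capacity must be at least 1, otherwise the retry loop never ends.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errQueueFull = errors.New("queue is full")

// boundedQueue is a hypothetical fixed-capacity FIFO, not safe for
// concurrent use.
type boundedQueue struct {
	items    []string
	capacity int
}

func (q *boundedQueue) Put(item string) error {
	if len(q.items) >= q.capacity {
		return errQueueFull
	}
	q.items = append(q.items, item)
	return nil
}

func (q *boundedQueue) Pop() (string, bool) {
	if len(q.items) == 0 {
		return "", false
	}
	item := q.items[0]
	q.items = q.items[1:]
	return item, true
}

// scheduleInOrder pushes runs one by one. When the queue is full it calls
// onFull, waits fullWait and retries the same run, so no run is dropped or
// reordered.
func scheduleInOrder(q *boundedQueue, runs []string, fullWait time.Duration, onFull func()) {
	for _, run := range runs {
		for q.Put(run) != nil {
			onFull()
			time.Sleep(fullWait)
		}
	}
}

// simulate returns the runs in the order in which they were consumed.
func simulate(runs []string, capacity int) []string {
	q := &boundedQueue{capacity: capacity}
	var consumed []string
	drainOne := func() {
		if item, ok := q.Pop(); ok {
			consumed = append(consumed, item)
		}
	}
	scheduleInOrder(q, runs, time.Millisecond, drainOne)
	for len(q.items) > 0 {
		drainOne()
	}
	return consumed
}

func main() {
	fmt.Println(simulate([]string{"run-1", "run-2", "run-3"}, 1))
}
```

Even with a queue of capacity 1, all three runs are consumed, in the order in which they were scheduled.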

type DagRunWatcherConfig

type DagRunWatcherConfig struct {
	// DagRunWatcher waits WatchInterval before another try of scheduling DAG
	// runs.
	WatchInterval time.Duration

	// DagRunWatcher would wait for QueueIsFullInterval in case when DAG run
	// queue is full, before it would try again.
	QueueIsFullInterval time.Duration

	// Duration for database context timeout for queries done by DagRunWatcher.
	DatabaseContextTimeout time.Duration
}

Configuration for DagRunWatcher which is responsible for scheduling new DAG runs based on their schedule.

var DefaultDagRunWatcherConfig DagRunWatcherConfig = DagRunWatcherConfig{
	WatchInterval:          100 * time.Millisecond,
	QueueIsFullInterval:    100 * time.Millisecond,
	DatabaseContextTimeout: 10 * time.Second,
}

Default DagRunWatcher configuration.

type GetStateFunc

type GetStateFunc func() State

Signature of a function which returns the current Scheduler state.

type Queues

type Queues struct {
	DagRuns     ds.Queue[DagRun]
	DagRunTasks ds.Queue[DagRunTask]
}

Queues contains queues internally needed by the Scheduler. It's exposed publicly, because those queues are of type ds.Queue which is a generic interface. This way one can link external queues like AWS SQS or others to be used internally by the Scheduler.

func DefaultQueues

func DefaultQueues(config Config) Queues

DefaultQueues returns the default instance of Queues, which uses ds.SimpleQueue: fixed-size buffer queues. Buffer sizes are based on Config.

type Scheduler

type Scheduler struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Scheduler is the central object of this package; it connects all other components together. There should be a single instance of Scheduler in a single project. Currently the model of 1 scheduler and N executors is supported.

func New

func New(
	dbClient *db.Client,
	taskLogs tasklog.Factory,
	queues Queues,
	config Config,
	logger *slog.Logger,
	notifier notify.Sender,
) *Scheduler

New returns a new instance of Scheduler. Scheduler needs a database client, queues for asynchronous communication and configuration. Database clients are available in the db package (e.g. db.NewSqliteClient). The default configuration is set in DefaultConfig and the default fixed-size buffer queues are provided by DefaultQueues. When nil is provided as logger, a slog.Logger is instantiated with TextHandler and INFO severity level (unless the PPACER_LOG_LEVEL environment variable is set).

func (*Scheduler) Goroutines

func (s *Scheduler) Goroutines() int

Goroutines returns the current number of goroutines spawned by (Task)Scheduler. It excludes long-running goroutines spawned by Scheduler.Start and focuses on goroutines from TaskScheduler, where almost all of the pressure is.

func (*Scheduler) Start

func (s *Scheduler) Start(ctx context.Context, dags dag.Registry) http.Handler

Start starts the Scheduler. It synchronizes internal queues with the database, fires up the DAG watcher and the task scheduler, and finally returns an HTTP ServeMux with the HTTP endpoints used for communication between the scheduler and executors. TODO(dskrzypiec): more docs

type State

type State int

State for representing current Scheduler state.

const (
	StateStarted State = iota
	StateSynchronizing
	StateRunning
	StateStopping
	StateStopped
)

func ParseState

func ParseState(s string) (State, error)

ParseState parses State based on given string. If that string does not match any State, then non-nil error is returned. State strings are case-sensitive.

func (State) String

func (s State) String() string

String serializes State to its upper case string.
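The round trip between a state and its string form can be sketched with a local stand-in type. The names in stateNames are assumptions derived from the constant names; the exact strings used by the package are not listed in this documentation.

```go
package main

import "fmt"

// state is a local stand-in for scheduler.State.
type state int

const (
	stateStarted state = iota
	stateSynchronizing
	stateRunning
	stateStopping
	stateStopped
)

// stateNames holds assumed upper-case names, indexed by state value.
var stateNames = [...]string{
	"STARTED", "SYNCHRONIZING", "RUNNING", "STOPPING", "STOPPED",
}

func (s state) String() string {
	if s < 0 || int(s) >= len(stateNames) {
		return "UNKNOWN"
	}
	return stateNames[s]
}

// parseState matches the input exactly, so parsing is case-sensitive.
func parseState(str string) (state, error) {
	for i, name := range stateNames {
		if name == str {
			return state(i), nil
		}
	}
	return 0, fmt.Errorf("unknown scheduler state: %q", str)
}

func main() {
	s, err := parseState(stateRunning.String())
	fmt.Println(s, err)

	_, err = parseState("running") // wrong case, so an error is returned
	fmt.Println(err)
}
```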

type TaskScheduler

type TaskScheduler struct {
	// contains filtered or unexported fields
}

TaskScheduler is responsible for scheduling tasks for a single DagRun. When a new DagRun is scheduled (by DagRunWatcher), it should appear on DagRunQueue; TaskScheduler then picks it up and starts scheduling DAG tasks for that DagRun in a separate goroutine.

If you use Scheduler, you probably don't need to use this object directly.

func NewTaskScheduler

func NewTaskScheduler(
	dags dag.Registry,
	db *db.Client,
	queues Queues,
	cache ds.Cache[DRTBase, DagRunTaskState],
	config TaskSchedulerConfig,
	logger *slog.Logger,
	notifier notify.Sender,
	goroutineCountInit int,
	schedulerStateFunc GetStateFunc,
) *TaskScheduler

NewTaskScheduler initializes a new instance of *TaskScheduler.

func (*TaskScheduler) Start

func (ts *TaskScheduler) Start(ctx context.Context, dags dag.Registry)

Start starts the TaskScheduler main loop. It checks whether there are new DagRuns on the DagRunQueue and, if so, spins up DAG task scheduling for that DagRun in a separate goroutine. If DagRunQueue is empty at the moment, the main loop waits for TaskSchedulerConfig.Heartbeat before the next try.
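The shape of such a loop can be sketched with standard library primitives only. This is an illustration, not the TaskScheduler implementation: a buffered channel plays the role of the DAG run queue, and runLoop and processAll are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runLoop takes runs from the queue and handles each one in its own
// goroutine. When the queue is empty it waits one heartbeat before trying
// again. It returns once ctx is done and all started handlers have finished.
func runLoop(ctx context.Context, queue <-chan string, heartbeat time.Duration, handle func(string)) {
	var wg sync.WaitGroup
	defer wg.Wait()
	for {
		select {
		case <-ctx.Done():
			return
		case run := <-queue:
			wg.Add(1)
			go func() {
				defer wg.Done()
				handle(run)
			}()
		default:
			// Queue is empty: wait one heartbeat, or stop early on cancel.
			select {
			case <-ctx.Done():
				return
			case <-time.After(heartbeat):
			}
		}
	}
}

// processAll feeds runs through runLoop and reports how many were handled.
func processAll(runs []string) int {
	queue := make(chan string, len(runs))
	for _, r := range runs {
		queue <- r
	}
	// The timeout is only a safety net; normally the last handler cancels.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	var handled atomic.Int64
	runLoop(ctx, queue, time.Millisecond, func(string) {
		if handled.Add(1) == int64(len(runs)) {
			cancel() // everything processed: stop the loop
		}
	})
	return int(handled.Load())
}

func main() {
	fmt.Println(processAll([]string{"run-1", "run-2", "run-3"}))
}
```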

func (*TaskScheduler) UpsertTaskStatus

func (ts *TaskScheduler) UpsertTaskStatus(
	ctx context.Context,
	drt DagRunTask,
	status dag.TaskStatus,
	taskErrStr *string,
) error

UpsertTaskStatus inserts or updates given DAG run task status. That includes caches, queues, database and every place that needs to be included regarding task status update.

type TaskSchedulerConfig

type TaskSchedulerConfig struct {
	// How long TaskScheduler should wait in case when DAG run queue is empty.
	Heartbeat time.Duration

	// How often TaskScheduler should check if all dependencies are met before
	// scheduling new task.
	CheckDependenciesStatusWait time.Duration

	// How long TaskScheduler should try to put new DAG run task onto the task
	// queue.
	PutOnTaskQueueTimeout time.Duration

	// Max number of goroutines spawned by Scheduler.
	MaxGoroutineCount int
}

Configuration for TaskScheduler which is responsible for scheduling tasks for particular DAG run.

var DefaultTaskSchedulerConfig TaskSchedulerConfig = TaskSchedulerConfig{
	Heartbeat:                   1 * time.Millisecond,
	CheckDependenciesStatusWait: 1 * time.Millisecond,
	PutOnTaskQueueTimeout:       30 * time.Second,
	MaxGoroutineCount:           10000,
}

Default TaskScheduler configuration.
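How the package enforces MaxGoroutineCount is not described here. As a general illustration of capping the number of concurrently running goroutines in Go, the sketch below uses a buffered channel as a counting semaphore. The peakConcurrency function is hypothetical, and the limit must be at least 1.

```go
package main

import (
	"fmt"
	"sync"
)

// peakConcurrency starts `tasks` short jobs, never running more than `limit`
// of them at once, and returns the highest number observed in flight.
func peakConcurrency(tasks, limit int) int {
	sem := make(chan struct{}, limit) // counting semaphore
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		inFlight int
		peak     int
	)
	for i := 0; i < tasks; i++ {
		sem <- struct{}{} // blocks while `limit` goroutines are running
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release the slot

			mu.Lock()
			inFlight++
			if inFlight > peak {
				peak = inFlight
			}
			mu.Unlock()

			// The actual work of the job would happen here.

			mu.Lock()
			inFlight--
			mu.Unlock()
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	peak := peakConcurrency(50, 4)
	fmt.Println("limit respected:", peak <= 4)
}
```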
