Documentation ¶
Overview ¶
Package scheduler provides functionality for creating and starting a new Scheduler.
Introduction ¶
The Scheduler type is the heart of ppacer. Once initialized and started, it performs the following steps:
- Set up internal caches and configuration (e.g. timezone).
- Synchronize metadata of the current dag.Registry with the database.
- Set up a new DagRunWatcher to listen on schedules and start new DAG runs.
- Set up a new TaskScheduler to listen for new DAG runs and start scheduling their tasks.
- Register Scheduler endpoints in the form of an http.Handler.
A program is meant to have a single instance of Scheduler. The DagRunWatcher and TaskScheduler types are public primarily to expose their documentation, and they usually shouldn't be accessed directly. An instance of Scheduler is enough.
Default Scheduler ¶
The simplest way to start a new Scheduler is to use the DefaultStarted function. Let's take a look at the following example:
ctx := context.TODO()
dags := dag.Registry{} // add your DAGs or use an existing Registry
// DefaultStarted takes two SQLite files: the main database and the task logs database.
schedulerHandler := DefaultStarted(ctx, dags, "scheduler.db", "logs.db", 9191)
lasErr := schedulerHandler.ListenAndServe()
if lasErr != nil {
	log.Panicf("Cannot start the server: %s\n", lasErr.Error())
}
DefaultStarted starts a Scheduler with the default configuration. This includes a notifier that emits ERR log messages, SQLite databases and the default slog.Logger.
For a full ppacer "hello world" example, refer to: https://ppacer.org/start/intro/
Index ¶
- Constants
- Variables
- func DefaultStarted(ctx context.Context, dags dag.Registry, dbFile, dbLogsFile string, port int) *http.Server
- type API
- type Client
- func (c *Client) GetState() (State, error)
- func (c *Client) GetTask() (api.TaskToExec, error)
- func (c *Client) RestartDagRun(in api.DagRunRestartInput) error
- func (c *Client) Stop() error
- func (c *Client) TriggerDagRun(in api.DagRunTriggerInput) error
- func (c *Client) UIDagrunDetails(runId int) (api.UIDagrunDetails, error)
- func (c *Client) UIDagrunLatest(n int) (api.UIDagrunList, error)
- func (c *Client) UIDagrunStats() (api.UIDagrunStats, error)
- func (c *Client) UIDagrunTaskDetails(runId int, taskId string, retry int) (api.UIDagrunTask, error)
- func (c *Client) UpsertTaskStatus(tte api.TaskToExec, status dag.TaskStatus, taskErr error) error
- type ClientConfig
- type Config
- type DRTBase
- type DagRun
- type DagRunTask
- type DagRunTaskState
- type DagRunWatcher
- type DagRunWatcherConfig
- type GetStateFunc
- type Queues
- type Scheduler
- type State
- type TaskScheduler
- type TaskSchedulerConfig
Constants ¶
const (
	// Timeout for HTTP request contexts.
	HTTPRequestContextTimeout = 30 * time.Second
)
const PPACER_ENV_LOG_LEVEL = "PPACER_LOG_LEVEL"
Name of the environment variable that sets the severity level of ppacer's default logger.
Variables ¶
var (
ErrDagRunIdIncorrect = errors.New("incorrect DAG run ID")
)
Functions ¶
func DefaultStarted ¶
func DefaultStarted(
	ctx context.Context,
	dags dag.Registry,
	dbFile, dbLogsFile string,
	port int,
) *http.Server
DefaultStarted creates a Scheduler using the default configuration and SQLite databases. It then starts that Scheduler and returns an HTTP server exposing the Scheduler endpoints. It's mainly meant to reduce boilerplate in simple examples and tests.
Types ¶
type API ¶ added in v0.0.3
type API interface {
	GetTask() (api.TaskToExec, error)
	UpsertTaskStatus(api.TaskToExec, dag.TaskStatus, error) error
	GetState() (State, error)
	TriggerDagRun(api.DagRunTriggerInput) error
	RestartDagRun(api.DagRunRestartInput) error
	UIDagrunStats() (api.UIDagrunStats, error)
	UIDagrunLatest(int) (api.UIDagrunList, error)
	UIDagrunDetails(int) (api.UIDagrunDetails, error)
	UIDagrunTaskDetails(int, string, int) (api.UIDagrunTask, error)
}
API defines the ppacer Scheduler API. Client implements this interface.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client provides an API for interacting with the Scheduler.
func NewClient ¶
func NewClient(
	url string,
	httpClient *http.Client,
	logger *slog.Logger,
	config ClientConfig,
) *Client
NewClient instantiates a new Client. If the HTTP client or the logger is nil, it is initialized with default parameters.
func (*Client) GetTask ¶
func (c *Client) GetTask() (api.TaskToExec, error)
GetTask gets a new task from the scheduler to be executed by an executor.
func (*Client) RestartDagRun ¶ added in v0.0.11
func (c *Client) RestartDagRun(in api.DagRunRestartInput) error
RestartDagRun restarts the given DAG run.
func (*Client) TriggerDagRun ¶ added in v0.0.10
func (c *Client) TriggerDagRun(in api.DagRunTriggerInput) error
TriggerDagRun schedules a new DAG run.
func (*Client) UIDagrunDetails ¶ added in v0.0.7
func (c *Client) UIDagrunDetails(runId int) (api.UIDagrunDetails, error)
UIDagrunDetails provides detailed information on the given DAG run, including task logs and task configuration.
func (*Client) UIDagrunLatest ¶ added in v0.0.3
func (c *Client) UIDagrunLatest(n int) (api.UIDagrunList, error)
UIDagrunLatest returns information on the latest n DAG runs and the completion of their tasks.
func (*Client) UIDagrunStats ¶ added in v0.0.3
func (c *Client) UIDagrunStats() (api.UIDagrunStats, error)
UIDagrunStats returns the current statistics on DAG runs, their tasks and goroutines.
func (*Client) UIDagrunTaskDetails ¶ added in v0.0.9
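func (c *Client) UIDagrunTaskDetails(runId int, taskId string, retry int) (api.UIDagrunTask, error)
UIDagrunTaskDetails returns details on the given DAG run task, including its task logs. The task is identified by DAG run ID, task ID and retry number.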
func (*Client) UpsertTaskStatus ¶
func (c *Client) UpsertTaskStatus(
	tte api.TaskToExec,
	status dag.TaskStatus,
	taskErr error,
) error
UpsertTaskStatus either updates an existing DAG run task status or inserts a new one.
type ClientConfig ¶
type ClientConfig struct {
	// HTTP client timeout, used when an http.Client needs to be
	// initialized within NewClient.
	HttpClientTimeout time.Duration
}
var DefaultClientConfig ClientConfig = ClientConfig{
	HttpClientTimeout: 15 * time.Second,
}
Default Client configuration.
type Config ¶
type Config struct {
	// DAG run queue capacity. The DAG run queue is preallocated on Scheduler
	// startup.
	DagRunQueueLen int
	// DAG run task queue capacity. The DAG run task queue is preallocated on
	// Scheduler startup.
	DagRunTaskQueueLen int
	// DAG run task cache capacity.
	DagRunTaskCacheLen int
	// Startup timeout duration. When Scheduler.Start is called, it
	// synchronizes with the database and possibly other resources. This
	// duration is used as the timeout of the Start context.
	StartupContextTimeout time.Duration
	// Configuration for TaskScheduler.
	TaskSchedulerConfig TaskSchedulerConfig
	// Configuration for DagRunWatcher.
	DagRunWatcherConfig DagRunWatcherConfig
	// Timezone name consistent with the IANA Time Zone Database (e.g.
	// "Europe/Warsaw" or "America/New_York").
	TimezoneName string
}
Config represents main configuration for the Scheduler.
var DefaultConfig Config = Config{
	DagRunQueueLen:        100,
	DagRunTaskQueueLen:    1000,
	DagRunTaskCacheLen:    1000,
	StartupContextTimeout: 30 * time.Second,
	TaskSchedulerConfig:   DefaultTaskSchedulerConfig,
	DagRunWatcherConfig:   DefaultDagRunWatcherConfig,
	TimezoneName:          "Local",
}
Default Scheduler configuration.
type DRTBase ¶
DRTBase represents the base information identifying a DAG run task. It's meant to be used in caching, where we care about the latest status of the DAG run task and don't need to store information for all retries.
type DagRun ¶
DagRun represents a single DAG run. AtTime is the scheduled time of the DAG run, not the time it was actually pushed onto the DAG run queue.
type DagRunTask ¶
type DagRunTask struct {
	DagId  dag.Id    `json:"dagId"`
	AtTime time.Time `json:"execTs"`
	TaskId string    `json:"taskId"`
	Retry  int       `json:"retry"`
}
DagRunTask represents an identifier for a single DAG run task which shall be scheduled.
type DagRunTaskState ¶
type DagRunTaskState struct {
	Status         dag.TaskStatus
	StatusUpdateTs time.Time
}
DagRunTaskState represents DagRunTask state.
type DagRunWatcher ¶
type DagRunWatcher struct {
// contains filtered or unexported fields
}
DagRunWatcher watches the given list of DAGs and sends information about new DAG runs onto the internal queue when it's time for a new DAG run to be scheduled. The next DAG run of a given DAG is determined by its schedule. DagRunWatcher also synchronizes information about DAG runs with the database.
If you use Scheduler, you probably don't need to use this object directly.
func NewDagRunWatcher ¶
func NewDagRunWatcher(
	queue ds.Queue[DagRun],
	dbClient *db.Client,
	logger *slog.Logger,
	stateFunc GetStateFunc,
	config DagRunWatcherConfig,
) *DagRunWatcher
NewDagRunWatcher creates a new instance of DagRunWatcher. The stateFunc argument is usually passed by the main Scheduler to give access to its state. If nil is provided as the logger, a slog.Logger is instantiated with a TextHandler and INFO severity level.
func (*DagRunWatcher) Watch ¶
func (drw *DagRunWatcher) Watch(ctx context.Context, dags dag.Registry)
Watch watches the given list of DAGs and tries to schedule new DAG runs onto the internal queue every WatchInterval. When the DAG run queue is full and new DAG runs cannot be pushed onto it, Watch waits for DagRunWatcherConfig.QueueIsFullInterval before the next try. Even if the queue stays full for longer than the interval between two consecutive DAG runs, those DAG runs won't be skipped. They will be scheduled in the expected order, but possibly a bit later.
type DagRunWatcherConfig ¶
type DagRunWatcherConfig struct {
	// DagRunWatcher waits WatchInterval before the next attempt at
	// scheduling DAG runs.
	WatchInterval time.Duration
	// When the DAG run queue is full, DagRunWatcher waits
	// QueueIsFullInterval before it tries again.
	QueueIsFullInterval time.Duration
	// Database context timeout for queries done by DagRunWatcher.
	DatabaseContextTimeout time.Duration
}
Configuration for DagRunWatcher which is responsible for scheduling new DAG runs based on their schedule.
var DefaultDagRunWatcherConfig DagRunWatcherConfig = DagRunWatcherConfig{
	WatchInterval:          100 * time.Millisecond,
	QueueIsFullInterval:    100 * time.Millisecond,
	DatabaseContextTimeout: 10 * time.Second,
}
Default DagRunWatcher configuration.
type GetStateFunc ¶
type GetStateFunc func() State
Signature of a function that returns the current Scheduler state.
type Queues ¶
Queues contains the queues needed internally by the Scheduler. It's exposed publicly because those queues are of type ds.Queue, which is a generic interface. This way, external queues such as AWS SQS can be plugged in and used internally by the Scheduler.
func DefaultQueues ¶
DefaultQueues returns a default instance of Queues that uses ds.SimpleQueue, a fixed-size buffer queue. Buffer sizes are based on Config.
type Scheduler ¶
Scheduler is the main object of this package; it connects all other components together. There should be a single Scheduler instance per project. Currently, a model of 1 scheduler and N executors is supported.
func New ¶
func New(
	dbClient *db.Client,
	taskLogs tasklog.Factory,
	queues Queues,
	config Config,
	logger *slog.Logger,
	notifier notify.Sender,
) *Scheduler
New returns a new instance of Scheduler. The Scheduler needs a database client, queues for asynchronous communication, and configuration. Database clients are available in the db package (e.g. db.NewSqliteClient). The default configuration is set in DefaultConfig, and default fixed-size buffer queues are provided by DefaultQueues. If nil is provided as the logger, a slog.Logger is instantiated with a TextHandler and INFO severity level, unless the PPACER_LOG_LEVEL environment variable is set.
func (*Scheduler) Goroutines ¶
Goroutines returns the current number of goroutines spawned by the (Task)Scheduler. It excludes long-running goroutines spawned by Scheduler.Start and focuses on goroutines from TaskScheduler, where almost all of the load is.
type State ¶
type State int
State represents the current Scheduler state.
func ParseState ¶
ParseState parses a State from the given string. If the string does not match any State, a non-nil error is returned. State strings are case-sensitive.
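State is an int-based enum with a textual form. The sketch below pairs a case-sensitive parser with a String method. The type, constants and state names are hypothetical placeholders, since the actual State values are not listed here.

```go
package main

import "fmt"

// state is a hypothetical stand-in for State; the names are illustrative.
type state int

const (
	stateStarted state = iota
	stateRunning
	stateStopping
	stateStopped
)

var stateNames = [...]string{"Started", "Running", "Stopping", "Stopped"}

func (s state) String() string {
	if s < 0 || int(s) >= len(stateNames) {
		return fmt.Sprintf("state(%d)", int(s))
	}
	return stateNames[s]
}

// parseState is case-sensitive: "Running" parses, "running" does not.
func parseState(s string) (state, error) {
	for i, name := range stateNames {
		if name == s {
			return state(i), nil
		}
	}
	return 0, fmt.Errorf("cannot parse %q as a Scheduler state", s)
}

func main() {
	s, err := parseState("Running")
	fmt.Println(s, err) // Running <nil>
	_, err = parseState("running")
	fmt.Println(err) // cannot parse "running" as a Scheduler state
}
```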
type TaskScheduler ¶
type TaskScheduler struct {
// contains filtered or unexported fields
}
TaskScheduler is responsible for scheduling the tasks of each DagRun. When a new DagRun is scheduled by DagRunWatcher, it appears on the DagRunQueue. TaskScheduler then picks it up and starts scheduling the DAG tasks for that DagRun in a separate goroutine.
If you use Scheduler, you probably don't need to use this object directly.
func NewTaskScheduler ¶
func NewTaskScheduler(
	dags dag.Registry,
	db *db.Client,
	queues Queues,
	cache ds.Cache[DRTBase, DagRunTaskState],
	config TaskSchedulerConfig,
	logger *slog.Logger,
	notifier notify.Sender,
	goroutineCountInit int,
	schedulerStateFunc GetStateFunc,
) *TaskScheduler
NewTaskScheduler initializes a new instance of *TaskScheduler.
func (*TaskScheduler) Start ¶
func (ts *TaskScheduler) Start(ctx context.Context, dags dag.Registry)
Start starts the TaskScheduler main loop. It checks whether there are new DagRuns on the DagRunQueue. If so, it spins up DAG task scheduling for each of them in a separate goroutine. If the DagRunQueue is empty at the moment, the main loop waits for TaskSchedulerConfig.Heartbeat before the next try.
func (*TaskScheduler) UpsertTaskStatus ¶
func (ts *TaskScheduler) UpsertTaskStatus(
	ctx context.Context,
	drt DagRunTask,
	status dag.TaskStatus,
	taskErrStr *string,
) error
UpsertTaskStatus inserts or updates the status of the given DAG run task. The update covers caches, queues, the database and every other place that needs to reflect the new task status.
type TaskSchedulerConfig ¶
type TaskSchedulerConfig struct {
	// How long TaskScheduler should wait when the DAG run queue is empty.
	Heartbeat time.Duration
	// How often TaskScheduler should check whether all dependencies are met
	// before scheduling a new task.
	CheckDependenciesStatusWait time.Duration
	// How long TaskScheduler should try to put a new DAG run task onto the
	// task queue.
	PutOnTaskQueueTimeout time.Duration
	// Max number of goroutines spawned by Scheduler.
	MaxGoroutineCount int
}
Configuration for TaskScheduler which is responsible for scheduling tasks for particular DAG run.
var DefaultTaskSchedulerConfig TaskSchedulerConfig = TaskSchedulerConfig{
	Heartbeat:                   1 * time.Millisecond,
	CheckDependenciesStatusWait: 1 * time.Millisecond,
	PutOnTaskQueueTimeout:       30 * time.Second,
	MaxGoroutineCount:           10000,
}
Default TaskScheduler configuration.