airflow

package
v0.21.15 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 9, 2025 License: Apache-2.0 Imports: 28 Imported by: 0

README

Airflow v2.x

Currently, DAGs can be configured to be loaded from:

  • GCS bucket
  • Local filesystem
  • In-memory

The implementation already supports a variety of other systems; they just need to be configured.

To use a filesystem that requires authentication, create a project secret with STORAGE as the key and the base64-encoded service account/token as the value.

Optimus also provides an API to get the status of currently running jobs using the Airflow APIs. For this to work, register a secret with SCHEDULER_AUTH as the key and the base64-encoded username:password as the token. This assumes Airflow is configured to use basic auth on its API by default.

Documentation

Index

Constants

View Source
const (
	// EntityAirflow is the string constant "Airflow".
	// NOTE(review): presumably used as the entity label when reporting
	// errors or logs from this package — confirm against its usages.
	EntityAirflow = "Airflow"
)

Variables

View Source
// SharedLib is an exported package-level byte slice.
// NOTE(review): its initializer is not shown here; presumably it holds the
// contents of a shared library file shipped alongside compiled DAGs — confirm.
var SharedLib []byte

Functions

This section is empty.

Types

type Action added in v0.19.0

// Action pairs an ActionDetail with the ResourceDetail it refers to.
// The two parts are serialized under the JSON keys "action" and "resource".
type Action struct {
	Action   ActionDetail   `json:"action"`
	Resource ResourceDetail `json:"resource"`
}

func (Action) String added in v0.19.0

func (a Action) String() string

type ActionDetail added in v0.19.0

// ActionDetail carries the name of an action, serialized under the JSON
// key "name". It is used as the Action field of Action.
type ActionDetail struct {
	Name string `json:"name"`
}

type Bucket

// Bucket is the set of blob-storage operations this package depends on:
// writing a whole object by key, listing, deleting by key, and closing.
// NOTE(review): the blob.WriterOptions / blob.ListOptions / blob.ListIterator
// types suggest this mirrors a gocloud.dev/blob bucket — confirm against the
// file's imports.
type Bucket interface {
	// WriteAll takes a context, an object key, the bytes to write, and
	// optional writer options; it reports failure via the returned error.
	WriteAll(ctx context.Context, key string, p []byte, opts *blob.WriterOptions) error
	// List returns an iterator configured by opts. Note that, unlike the
	// other methods, it takes no context and returns no error directly.
	List(opts *blob.ListOptions) *blob.ListIterator
	// Delete takes a context and an object key and reports failure via the
	// returned error.
	Delete(ctx context.Context, key string) error
	// Close reports failure via the returned error.
	Close() error
}

type BucketFactory

// BucketFactory creates a Bucket for a given tenant.
type BucketFactory interface {
	// New returns a Bucket for the supplied tenant, or an error.
	// NOTE(review): the caller presumably owns the returned Bucket and must
	// Close it — confirm against implementations.
	New(ctx context.Context, tenant tenant.Tenant) (Bucket, error)
}

type Client

// Client issues a request to the scheduler using the supplied credentials.
// The request type (airflowRequest) is unexported, so although the interface
// is exported, callers outside this package cannot construct a request value.
type Client interface {
	// Invoke takes a context, a request, and auth details, and returns the
	// raw response bytes or an error.
	Invoke(ctx context.Context, r airflowRequest, auth SchedulerAuth) ([]byte, error)
}

type ClientAirflow

type ClientAirflow struct {
	// contains filtered or unexported fields
}

func NewAirflowClient

func NewAirflowClient() *ClientAirflow

func (ClientAirflow) Invoke

func (ac ClientAirflow) Invoke(ctx context.Context, r airflowRequest, auth SchedulerAuth) ([]byte, error)

type DAGObj added in v0.19.0

// DAGObj is the JSON representation of a single DAG; each field maps to the
// snake_case key given in its struct tag.
//
// Fields typed *string (Description, LastExpired, LastPickled, PickleID,
// RootDagID, SchedulerLock) decode to nil when the JSON value is null or the
// key is absent; plain string fields decode to "" in those cases.
//
// NOTE(review): the field set appears to mirror the DAG object returned by
// the Airflow REST API — confirm against the Airflow version in use. The
// timestamp-like fields (LastParsedTime, NextDagRun*, ...) are kept as raw
// strings here rather than parsed into time.Time.
type DAGObj struct {
	DAGDisplayName              string   `json:"dag_display_name"`
	DAGID                       string   `json:"dag_id"`
	DefaultView                 string   `json:"default_view"`
	Description                 *string  `json:"description"`
	FileToken                   string   `json:"file_token"`
	Fileloc                     string   `json:"fileloc"`
	HasImportErrors             bool     `json:"has_import_errors"`
	HasTaskConcurrencyLimits    bool     `json:"has_task_concurrency_limits"`
	IsActive                    bool     `json:"is_active"`
	IsPaused                    bool     `json:"is_paused"`
	IsSubdag                    bool     `json:"is_subdag"`
	LastExpired                 *string  `json:"last_expired"`
	LastParsedTime              string   `json:"last_parsed_time"`
	LastPickled                 *string  `json:"last_pickled"`
	MaxActiveRuns               int      `json:"max_active_runs"`
	MaxActiveTasks              int      `json:"max_active_tasks"`
	MaxConsecutiveFailedDAGRuns int      `json:"max_consecutive_failed_dag_runs"`
	NextDagRun                  string   `json:"next_dagrun"`
	NextDagRunCreateAfter       string   `json:"next_dagrun_create_after"`
	NextDagRunDataIntervalEnd   string   `json:"next_dagrun_data_interval_end"`
	NextDagRunDataIntervalStart string   `json:"next_dagrun_data_interval_start"`
	Owners                      []string `json:"owners"`
	PickleID                    *string  `json:"pickle_id"`
	RootDagID                   *string  `json:"root_dag_id"`
	ScheduleInterval            Schedule `json:"schedule_interval"`
	SchedulerLock               *string  `json:"scheduler_lock"`
	Tags                        []Tag    `json:"tags"`
	TimetableDescription        string   `json:"timetable_description"`
}

type DAGs added in v0.19.0

// DAGs is a list of DAGObj values together with a total-entries count,
// serialized under the JSON keys "dags" and "total_entries".
// NOTE(review): TotalEntries presumably reflects the server-side total, which
// may exceed len(DAGS) when results are paginated — confirm.
type DAGs struct {
	DAGS         []DAGObj `json:"dags"`
	TotalEntries int      `json:"total_entries"`
}

type DagCompiler

// DagCompiler turns a job, in the context of its project, into bytes.
// NOTE(review): the output is presumably the DAG file content to be written
// to a Bucket — confirm against implementations.
type DagCompiler interface {
	// Compile takes the project and the job with its details and returns the
	// compiled bytes or an error.
	Compile(project *tenant.Project, job *scheduler.JobWithDetails) ([]byte, error)
}

type DagRun

// DagRun is the JSON representation of a single DAG run; each field maps to
// the snake_case key given in its struct tag. All date fields are decoded
// into time.Time values.
// NOTE(review): the field set appears to mirror the DAG run object of the
// Airflow REST API — confirm against the Airflow version in use.
type DagRun struct {
	ExecutionDate          time.Time `json:"execution_date"`
	State                  string    `json:"state"`
	ExternalTrigger        bool      `json:"external_trigger"`
	DagRunID               string    `json:"dag_run_id"`
	DagID                  string    `json:"dag_id"`
	LogicalDate            time.Time `json:"logical_date"`
	StartDate              time.Time `json:"start_date"`
	EndDate                time.Time `json:"end_date"`
	DataIntervalStart      time.Time `json:"data_interval_start"`
	DataIntervalEnd        time.Time `json:"data_interval_end"`
	LastSchedulingDecision time.Time `json:"last_scheduling_decision"`
	RunType                string    `json:"run_type"`
}

type DagRunListResponse

// DagRunListResponse is a list of DagRun values together with a
// total-entries count, serialized under the JSON keys "dag_runs" and
// "total_entries".
type DagRunListResponse struct {
	DagRuns      []DagRun `json:"dag_runs"`
	TotalEntries int      `json:"total_entries"`
}

type DagRunRequest

// DagRunRequest is the JSON payload describing a DAG-run query: ordering,
// paging (offset and limit), the DAG IDs to include, and optional
// execution-date bounds.
//
// ExecutionDateGte and ExecutionDateLte are tagged omitempty, so they are
// left out of the encoded JSON when empty; every other field is always
// emitted, including zero values.
type DagRunRequest struct {
	OrderBy          string   `json:"order_by"`
	PageOffset       int      `json:"page_offset"`
	PageLimit        int      `json:"page_limit"`
	DagIds           []string `json:"dag_ids"` // nolint: revive
	ExecutionDateGte string   `json:"execution_date_gte,omitempty"`
	ExecutionDateLte string   `json:"execution_date_lte,omitempty"`
}

type PermissionSet added in v0.19.0

// PermissionSet is a named collection of Action values, serialized under the
// JSON keys "name" and "actions".
// NOTE(review): presumably represents a scheduler role and its permissions
// (see Scheduler.AddRole / GetRolePermissions) — confirm.
type PermissionSet struct {
	Name    string   `json:"name"`
	Actions []Action `json:"actions"`
}

type ProjectGetter

// ProjectGetter looks up a project by name.
type ProjectGetter interface {
	// Get takes a context and a project name and returns the matching
	// project or an error.
	Get(context.Context, tenant.ProjectName) (*tenant.Project, error)
}

type ResourceDetail added in v0.19.0

// ResourceDetail carries the name of a resource, serialized under the JSON
// key "name". It is used as the Resource field of Action.
type ResourceDetail struct {
	Name string `json:"name"`
}

type Schedule added in v0.19.0

// Schedule is a type/value pair serialized under the JSON keys "__type" and
// "value". It is the type of DAGObj.ScheduleInterval.
type Schedule struct {
	Type  string `json:"__type"`
	Value string `json:"value"`
}

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(l log.Logger, bucketFac BucketFactory, client Client, compiler DagCompiler, projectGetter ProjectGetter, secretGetter SecretGetter) *Scheduler

func (*Scheduler) AddRole added in v0.19.0

func (s *Scheduler) AddRole(ctx context.Context, tnnt tenant.Tenant, roleName string, ifNotExist bool) error

func (*Scheduler) CancelRun added in v0.17.0

func (s *Scheduler) CancelRun(ctx context.Context, tnnt tenant.Tenant, jobName scheduler.JobName, dagRunID string) error

func (*Scheduler) Clear added in v0.7.0

func (s *Scheduler) Clear(ctx context.Context, t tenant.Tenant, jobName scheduler.JobName, executionTime time.Time) error

func (*Scheduler) ClearBatch added in v0.7.0

func (s *Scheduler) ClearBatch(ctx context.Context, tnnt tenant.Tenant, jobName scheduler.JobName, startExecutionTime, endExecutionTime time.Time) error

func (*Scheduler) CreateRun added in v0.8.0

func (s *Scheduler) CreateRun(ctx context.Context, tnnt tenant.Tenant, jobName scheduler.JobName, executionTime time.Time, dagRunIDPrefix string) error

func (*Scheduler) DeleteJobs

func (s *Scheduler) DeleteJobs(ctx context.Context, t tenant.Tenant, jobNames []string) error

func (*Scheduler) DeployJobs

func (s *Scheduler) DeployJobs(ctx context.Context, tenant tenant.Tenant, jobs []*scheduler.JobWithDetails) error

func (*Scheduler) GetJobRuns

func (s *Scheduler) GetJobRuns(ctx context.Context, tnnt tenant.Tenant, jobQuery *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunStatus, error)

func (*Scheduler) GetJobRunsWithDetails added in v0.17.0

func (s *Scheduler) GetJobRunsWithDetails(ctx context.Context, tnnt tenant.Tenant, jobQuery *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunWithDetails, error)

func (*Scheduler) GetJobState added in v0.19.0

func (s *Scheduler) GetJobState(ctx context.Context, projectName tenant.ProjectName) (map[string]bool, error)

GetJobState gets, for each job on the scheduler, whether it is disabled

func (*Scheduler) GetRolePermissions added in v0.19.0

func (s *Scheduler) GetRolePermissions(ctx context.Context, t tenant.Tenant, roleName string) ([]string, error)

func (*Scheduler) ListJobs

func (s *Scheduler) ListJobs(ctx context.Context, t tenant.Tenant) ([]string, error)

TODO: ListJobs should not read from the scheduler; it should list jobs from the DB, since listing has nothing to do with the scheduler.

func (*Scheduler) UpdateJobState added in v0.9.0

func (s *Scheduler) UpdateJobState(ctx context.Context, project tenant.ProjectName, jobNames []job.Name, state string) error

UpdateJobState sets the state of jobs to enabled or disabled on the scheduler

type SchedulerAuth

type SchedulerAuth struct {
	// contains filtered or unexported fields
}

type SecretGetter

// SecretGetter looks up a plain-text secret by project, namespace, and
// secret name.
type SecretGetter interface {
	// Get takes a context, the project name, the namespace name, and the
	// secret name, and returns the secret or an error.
	Get(ctx context.Context, projName tenant.ProjectName, namespaceName, name string) (*tenant.PlainTextSecret, error)
}

type Tag added in v0.19.0

// Tag carries the name of a tag, serialized under the JSON key "name".
// It is the element type of DAGObj.Tags.
type Tag struct {
	Name string `json:"name"`
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL