airflow2

package
v0.5.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 4, 2023 License: Apache-2.0 Imports: 22 Imported by: 0

README

Airflow v2.x

Currently, DAGs can be configured to be loaded from:

  • GCS bucket
  • Local filesystem
  • In-memory

The implementation already supports a variety of other storage systems; they just need to be configured.

To use a filesystem that requires authentication, create a project secret with STORAGE as the key and the base64-encoded service account/token as the value.

Optimus also provides an API to get the status of currently running jobs using the Airflow APIs. For this to work, register a secret with SCHEDULER_AUTH as the key and the base64-encoded username:password as the token. This assumes Airflow is configured to use basic auth on its API by default.

Documentation

Index

Constants

View Source
// Package-level constants: two string values describing job file layout and
// two integer values related to concurrency.
const (
	// JobsDir is the directory name "dags".
	// NOTE(review): presumably the folder, under a storage prefix, in which
	// job files are placed — confirm against PathForJobDirectory.
	JobsDir       = "dags"
	// JobsExtension is the file extension ".py".
	// NOTE(review): presumably the suffix given to job files — confirm
	// against PathFromJobName / JobNameFromPath callers.
	JobsExtension = ".py"

	// ConcurrentTicketPerSec is 40.
	// NOTE(review): the name suggests a per-second rate of concurrency
	// "tickets"; where and how it is enforced is not visible here — confirm.
	ConcurrentTicketPerSec = 40
	// ConcurrentLimit is 600.
	// NOTE(review): the name suggests an upper bound on concurrent
	// operations; the enforcing code is not visible here — confirm.
	ConcurrentLimit        = 600
)

Variables

View Source
// ErrEmptyJobName is a sentinel error stating that a job name cannot be an
// empty string. Compare against it with errors.Is.
var ErrEmptyJobName = errors.New("job name cannot be an empty string")
View Source
// SharedLib is an exported, package-level byte slice.
// NOTE(review): neither its contents nor how it is populated is visible
// here; presumably it holds a shared library file shipped alongside the
// jobs — confirm against the package source.
var SharedLib []byte

Functions

func JobNameFromPath added in v0.0.3

func JobNameFromPath(filePath, suffix string) string

func NewScheduler

func NewScheduler(bucketFac BucketFactory, httpClient HTTPClient, compiler models.JobCompiler) *scheduler

func PathForJobDirectory added in v0.0.3

func PathForJobDirectory(prefix, namespace string) string

func PathFromJobName added in v0.0.3

func PathFromJobName(prefix, namespace, jobName, suffix string) string

Types

type Bucket added in v0.0.3

// Bucket is the set of blob-storage operations this package depends on:
// write, read, list and delete by key, plus Close.
// NOTE(review): the option and iterator types come from a `blob` package
// (presumably gocloud.dev/blob, whose *blob.Bucket has methods with these
// signatures) — confirm.
type Bucket interface {
	// WriteAll takes a context, a key, the bytes p and writer options, and
	// returns an error.
	WriteAll(ctx context.Context, key string, p []byte, opts *blob.WriterOptions) error
	// ReadAll takes a context and a key, and returns bytes or an error.
	ReadAll(ctx context.Context, key string) ([]byte, error)
	// List takes list options and returns an iterator; it takes no context
	// and returns no error itself.
	List(opts *blob.ListOptions) *blob.ListIterator
	// Delete takes a context and a key, and returns an error.
	Delete(ctx context.Context, key string) error
	// Close returns an error.
	// NOTE(review): presumably releases resources held by the bucket — confirm.
	Close() error
}

type BucketFactory added in v0.0.3

// BucketFactory produces a Bucket for a given project.
type BucketFactory interface {
	// New returns a Bucket for the supplied project specification, or an
	// error.
	// NOTE(review): how the project maps to a storage location or
	// credentials is not visible here — confirm against implementations.
	New(ctx context.Context, project models.ProjectSpec) (Bucket, error)
}

type DagRun added in v0.2.0

// DagRun is the JSON representation of a single DAG run.
// NOTE(review): the JSON keys look like the DAGRun schema of Airflow's
// stable REST API — confirm against the Airflow version in use.
type DagRun struct {
	// DagRunID is encoded as "dag_run_id".
	DagRunID        string    `json:"dag_run_id"`
	// DagID is encoded as "dag_id".
	DagID           string    `json:"dag_id"`
	// LogicalDate is encoded as "logical_date".
	LogicalDate     time.Time `json:"logical_date"`
	// ExecutionDate is encoded as "execution_date".
	ExecutionDate   time.Time `json:"execution_date"`
	// StartDate is encoded as "start_date".
	StartDate       time.Time `json:"start_date"`
	// EndDate is encoded as "end_date".
	EndDate         time.Time `json:"end_date"`
	// State is encoded as "state"; it is a free-form string here (no enum is
	// declared in this type).
	State           string    `json:"state"`
	// ExternalTrigger is encoded as "external_trigger".
	ExternalTrigger bool      `json:"external_trigger"`
	// Conf is an empty struct, so any keys inside a decoded "conf" object are
	// discarded and it always encodes as {}.
	Conf            struct{}  `json:"conf"`
}

type DagRunListResponse added in v0.2.0

// DagRunListResponse is the JSON envelope for a list of DAG runs: the runs
// themselves under "dag_runs" and a count under "total_entries".
type DagRunListResponse struct {
	// DagRuns holds the DAG runs, encoded as "dag_runs".
	DagRuns      []DagRun `json:"dag_runs"`
	// TotalEntries is encoded as "total_entries".
	// NOTE(review): presumably the total number of matching runs, which may
	// exceed len(DagRuns) when results are paginated — confirm.
	TotalEntries int      `json:"total_entries"`
}

type DagRunRequest added in v0.2.0

// DagRunRequest is the JSON body describing a query for DAG runs: ordering,
// paging, the DAG IDs to match and an optional execution-date range.
type DagRunRequest struct {
	// OrderBy is encoded as "order_by" and is always emitted (no omitempty).
	OrderBy          string   `json:"order_by"`
	// PageOffset is encoded as "page_offset" and is always emitted.
	PageOffset       int      `json:"page_offset"`
	// PageLimit is encoded as "page_limit" and is always emitted.
	PageLimit        int      `json:"page_limit"`
	// DagIds is encoded as "dag_ids"; a nil slice encodes as JSON null.
	DagIds           []string `json:"dag_ids"`
	// ExecutionDateGte is encoded as "execution_date_gte" and omitted from
	// the JSON when empty.
	// NOTE(review): the timestamp format expected in this string is not
	// visible here — confirm against the caller.
	ExecutionDateGte string   `json:"execution_date_gte,omitempty"`
	// ExecutionDateLte is encoded as "execution_date_lte" and omitted from
	// the JSON when empty.
	ExecutionDateLte string   `json:"execution_date_lte,omitempty"`
}

type HTTPClient added in v0.1.1

// HTTPClient abstracts the sending of a single HTTP request. Its one method
// has the same signature as (*http.Client).Do, so a *http.Client satisfies
// this interface.
type HTTPClient interface {
	// Do takes an HTTP request and returns a response or an error.
	Do(req *http.Request) (*http.Response, error)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL