Version: v0.5.28 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Mar 4, 2021 License: Apache-2.0 Imports: 39 Imported by: 0



This package deals with the communication with AWS Batch and adapting its APIs to the flyte-plugin model.



View Source
const (
	// Keep these in-sync with flyteAdmin @
	PrimaryTaskQueueKey = "primary_queue"
	DynamicTaskQueueKey = "dynamic_queue"
	ChildTaskQueueKey   = "child_queue"
View Source
const (
	LogStreamFormatter = ";stream=%v"
	ArrayJobFormatter  = ""
	JobFormatter       = ""
View Source
const (


This section is empty.


func FlyteTaskToBatchInput

func FlyteTaskToBatchInput(ctx context.Context, tCtx pluginCore.TaskExecutionContext, jobDefinition string, cfg *config2.Config) (
	batchInput *batch.SubmitJobInput, err error)

Note that Name is not set on the result object. It's up to the caller to set the Name before creating the object in K8s.

func GetJobTaskLog

func GetJobTaskLog(jobSize int, accountID, region, queue, jobID string) *idlCore.TaskLog

func GetJobURI added in v0.3.11

func GetJobURI(jobSize int, accountID, region, queue, jobID string) string
func GetTaskLinks(ctx context.Context, taskMeta pluginCore.TaskExecutionMetadata, jobStore *JobStore, state *State) (
	[]*idlCore.TaskLog, error)

func TerminateSubTasks

func TerminateSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, batchClient Client, reason string, metrics ExecutorMetrics) error

Attempts to terminate the AWS Job if one is recorded in the pluginState. This API is idempotent and should be safe to call multiple times on the same job. Note, however, that calling it repeatedly will result in multiple calls to AWS Batch.

func UpdateBatchInputForArray

func UpdateBatchInputForArray(_ context.Context, batchInput *batch.SubmitJobInput, arraySize int64) *batch.SubmitJobInput


type ArrayJobSummary

type ArrayJobSummary map[JobPhaseType]int64

type Attempt

type Attempt struct {
	LogStream string    `json:"logStream,omitempty"`
	StartedAt time.Time `json:"startedAt,omitempty"`
	StoppedAt time.Time `json:"stoppedAt,omitempty"`

type BatchServiceClient

type BatchServiceClient interface {
	SubmitJobWithContext(ctx a.Context, input *batch.SubmitJobInput, opts ...request.Option) (*batch.SubmitJobOutput, error)
	TerminateJobWithContext(ctx a.Context, input *batch.TerminateJobInput, opts ...request.Option) (*batch.TerminateJobOutput, error)
	DescribeJobsWithContext(ctx a.Context, input *batch.DescribeJobsInput, opts ...request.Option) (*batch.DescribeJobsOutput, error)
	RegisterJobDefinitionWithContext(ctx a.Context, input *batch.RegisterJobDefinitionInput, opts ...request.Option) (*batch.RegisterJobDefinitionOutput, error)

BatchServiceClient is an interface on top of the native AWS Batch client to allow for mocking and alternative implementations.

type Client

type Client interface {
	// Submits a new job to AWS Batch and retrieves job info. Note that submitted jobs will not have status populated.
	SubmitJob(ctx context.Context, input *batch.SubmitJobInput) (jobID string, err error)

	// Attempts to terminate a job. If the job hasn't started yet, it'll just get deleted.
	TerminateJob(ctx context.Context, jobID JobID, reason string) error

	// Retrieves jobs' details from AWS Batch.
	GetJobDetailsBatch(ctx context.Context, ids []JobID) ([]*batch.JobDetail, error)

	// Registers a new Job Definition with AWS Batch provided a name, image and role.
	RegisterJobDefinition(ctx context.Context, name, image, role string) (arn string, err error)

	// Gets the single region this client interacts with.
	GetRegion() string

	GetAccountID() string

AWS Batch Client interface.

func NewBatchClient

func NewBatchClient(awsClient aws.Client,
	getRateLimiter utils.RateLimiter,
	defaultRateLimiter utils.RateLimiter) Client

Initializes a new Batch Client that can be used to interact with AWS Batch.

func NewCustomBatchClient

func NewCustomBatchClient(batchClient BatchServiceClient, accountID, region string,
	getRateLimiter utils.RateLimiter,
	defaultRateLimiter utils.RateLimiter) Client

type Event

type Event struct {
	OldJob *Job
	NewJob *Job

type EventHandler

type EventHandler struct {
	Updated func(ctx context.Context, event Event)

type Executor

type Executor struct {
	// contains filtered or unexported fields

func NewExecutor

func NewExecutor(ctx context.Context, awsClient aws.Client, cfg *batchConfig.Config,
	enqueueOwner core.EnqueueOwner, scope promutils.Scope) (Executor, error)

func (Executor) Abort

func (Executor) Finalize

func (e Executor) Finalize(ctx context.Context, tCtx core.TaskExecutionContext) error

func (Executor) GetID

func (e Executor) GetID() string

func (Executor) GetProperties

func (e Executor) GetProperties() core.PluginProperties

func (Executor) Handle

func (Executor) Start

func (e Executor) Start(ctx context.Context) error

type ExecutorMetrics added in v0.3.22

type ExecutorMetrics struct {
	Scope              promutils.Scope
	SubTasksSubmitted  labeled.Counter
	SubTasksSucceeded  labeled.Counter
	SubTasksFailed     labeled.Counter
	SubTasksQueued     labeled.Counter
	BatchJobTerminated labeled.Counter

type Job

type Job struct {
	ID             JobID                `json:"id,omitempty"`
	OwnerReference types.NamespacedName `json:"owner.omitempty"`
	Attempts       []Attempt            `json:"attempts,omitempty"`
	Status         JobStatus            `json:"status,omitempty"`
	SubJobs        []*Job               `json:"array,omitempty"`

func (Job) String

func (j Job) String() string

type JobConfig

type JobConfig struct {
	PrimaryTaskQueue string `json:"primary_queue"`
	DynamicTaskQueue string `json:"dynamic_queue"`

func (*JobConfig) MergeFromConfigMap

func (j *JobConfig) MergeFromConfigMap(configMap *v1.ConfigMap) *JobConfig

func (*JobConfig) MergeFromKeyValuePairs

func (j *JobConfig) MergeFromKeyValuePairs(pairs []*core.KeyValuePair) *JobConfig

type JobID

type JobID = string

func GetJobID

func GetJobID(id JobID, index int) JobID

type JobName

type JobName = string

type JobPhaseType

type JobPhaseType = core.Phase

type JobStatus

type JobStatus struct {
	Phase   JobPhaseType `json:"phase,omitempty"`
	Message string       `json:"msg,omitempty"`

type JobStore

type JobStore struct {
	// contains filtered or unexported fields

func NewJobStore

func NewJobStore(ctx context.Context, batchClient Client, cfg config.JobStoreConfig,
	handler EventHandler, scope promutils.Scope) (JobStore, error)

Constructs a new in-memory store.

func (JobStore) Get

func (s JobStore) Get(jobName string) *Job

func (JobStore) GetOrCreate

func (s JobStore) GetOrCreate(jobName string, job *Job) (*Job, error)

func (JobStore) IsStarted

func (s JobStore) IsStarted() bool

func (*JobStore) Start

func (s *JobStore) Start(ctx context.Context) error

func (JobStore) SubmitJob

func (s JobStore) SubmitJob(ctx context.Context, input *batch.SubmitJobInput) (jobID string, err error)

Submits a new job to AWS Batch and retrieves job info. Note that submitted jobs will not have status populated.

type State

type State struct {

	ExternalJobID    *string `json:"externalJobID"`
	JobDefinitionArn definition.JobDefinitionArn

func CheckSubTasksState

func CheckSubTasksState(ctx context.Context, taskMeta core.TaskExecutionMetadata, outputPrefix, baseOutputSandbox storage.DataReference, jobStore *JobStore,
	dataStore *storage.DataStore, cfg *config.Config, currentState *State, metrics ExecutorMetrics) (newState *State, err error)

func EnsureJobDefinition

func EnsureJobDefinition(ctx context.Context, tCtx pluginCore.TaskExecutionContext, cfg *config.Config, client Client,
	definitionCache definition.Cache, currentState *State) (nextState *State, err error)

func LaunchSubTasks

func LaunchSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, batchClient Client, pluginConfig *config.Config,
	currentState *State, metrics ExecutorMetrics) (nextState *State, err error)

func (State) GetExternalJobID

func (s State) GetExternalJobID() *string

func (State) GetJobDefinitionArn

func (s State) GetJobDefinitionArn() definition.JobDefinitionArn

func (*State) SetExternalJobID

func (s *State) SetExternalJobID(jobID string) *State

func (*State) SetJobDefinitionArn

func (s *State) SetJobDefinitionArn(arn definition.JobDefinitionArn) *State


Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL