dynamodbtask

package
Version: v0.0.0-...-90deddd
Published: Oct 18, 2023 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Overview

Package dynamodbtask implements the taskdb.TaskDB interface for an AWS DynamoDB backend. Every run or task is stored in a row together with its attributes, which include labels, user, keepalive and start times. Tasks have a RunID column that identifies the run they belong to. Tasks also store the FlowID of the reflow flow, the ResultID, the exec URI, and the stdout, stderr and inspect log ids.

To support common queries such as recent runs or tasks, each run and task row stores a day time bucket. The "Date-Keepalive-index" index allows querying runs and tasks by time bucket. dynamodbtask also uses several secondary indexes to help with run and task querying.

Schema:

	run:   {ID, ID4, Type="run", Labels, Bundle, Args, Date, Keepalive, StartTime, EndTime, User}
	task:  {ID, ID4, Type="task", Labels, Date, Attempt, Keepalive, StartTime, EndTime, FlowID, Inspect, Error, ResultID, RunID, RunID4, AllocID, ImgCmdID, Ident, Stderr, Stdout, URI}
	alloc: {ID, ID4, Type="alloc", PoolID, AllocID, Resources, URI, Keepalive, StartTime, EndTime}
	pool:  {ID, ID4, Type="pool", PoolID, PoolType, ClusterID.*, Resources, URI, Keepalive, StartTime, EndTime}

Note:

PoolID: rows of type "pool" are expected to store the implementation-specific identifier of a pool, whereas rows of type "alloc" store the digest of the PoolID of the pool they belong to.

AllocID: similarly, rows of type "alloc" are expected to store the value Alloc.ID(), whereas rows of type "task" store the digest of the Alloc.ID() of the alloc where they are attempted.

Indexes:

	1. Date-Keepalive-index - for time-based queries.
	2. RunID-index - for finding all tasks that belong to a run.
	3. ID-index and ID4-ID-index - for queries looking for specific runs or tasks.
	4. ImgCmdID-index and Ident-index - for queries looking for specific execs.
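The day time buckets described in the overview can be illustrated with a small sketch. It is not the package's implementation: the bucket format (dateLayout) and the helper names dayBuckets and exampleBuckets are assumptions made for illustration. The idea is that a time-range query is broken into one lookup per calendar day, each keyed by that day's bucket.

```go
package main

import (
	"fmt"
	"time"
)

// dateLayout is a hypothetical bucket format chosen for this sketch;
// the format actually stored in the Date column is not documented here.
const dateLayout = "2006-01-02"

// dayBuckets returns one bucket key per UTC calendar day that overlaps
// the interval [since, until].
func dayBuckets(since, until time.Time) []string {
	var buckets []string
	// Truncating a UTC time to 24h yields midnight UTC of the same day.
	day := since.UTC().Truncate(24 * time.Hour)
	for ; !day.After(until); day = day.AddDate(0, 0, 1) {
		buckets = append(buckets, day.Format(dateLayout))
	}
	return buckets
}

// exampleBuckets covers a window that touches three calendar days.
func exampleBuckets() []string {
	since := time.Date(2023, 10, 16, 22, 30, 0, 0, time.UTC)
	until := time.Date(2023, 10, 18, 1, 0, 0, 0, time.UTC)
	return dayBuckets(since, until)
}

func main() {
	for _, b := range exampleBuckets() {
		// In a real query, each bucket would be looked up separately
		// through an index such as Date-Keepalive-index.
		fmt.Println(b)
	}
}
```

Bucketing by day keeps each index lookup bounded to a single partition of recent rows, at the cost of issuing one query per day in the requested window.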

Constants

const (
	ID taskdb.Kind = iota
	ID4
	RunID
	RunID4
	FlowID
	AllocID
	PoolID
	ResultID
	ImgCmdID
	Ident
	Attempt
	KeepAlive
	StartTime
	Stdout
	Stderr
	ExecInspect
	Error
	URI
	Labels
	User
	Type
	Date
	Bundle
	Args
	EndTime
	RunLog
	ExecLog
	SysLog
	EvalGraph
	Trace
	Resources
	PoolType
	ClusterName
	ReflowVersion
)
const (
	// ProviderName is the name of this TaskDB's infra config provider.
	ProviderName = "dynamodbtask"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Items

type Items []map[string]*dynamodb.AttributeValue

Items is the response from a DynamoDB scan.

type ItemsHandler

type ItemsHandler interface {
	// HandleItems handles a set of scanned items.
	HandleItems(items Items) error
}

ItemsHandler is an interface for handling Items from a segment scan.

type ItemsHandlerFunc

type ItemsHandlerFunc func(items Items) error

ItemsHandlerFunc is a convenience type to avoid having to declare a struct to implement the ItemsHandler interface.

func (ItemsHandlerFunc) HandleItems

func (h ItemsHandlerFunc) HandleItems(items Items) error

HandleItems implements the ItemsHandler interface.
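The relationship between ItemsHandler and ItemsHandlerFunc follows the same adapter pattern as http.Handler and http.HandlerFunc. The sketch below mirrors the two declarations with local stand-in types so that it needs only the standard library: Item replaces map[string]*dynamodb.AttributeValue, the method body is an assumed implementation, and scanSegments, countByType and exampleCounts are hypothetical helpers, not part of this package.

```go
package main

import "fmt"

// Item is a stand-in for map[string]*dynamodb.AttributeValue.
type Item map[string]string

// Items mirrors the shape of the package's Items type.
type Items []Item

// ItemsHandler mirrors the package's interface.
type ItemsHandler interface {
	HandleItems(items Items) error
}

// ItemsHandlerFunc lets a plain function satisfy ItemsHandler.
type ItemsHandlerFunc func(items Items) error

// HandleItems calls the function itself (assumed implementation).
func (h ItemsHandlerFunc) HandleItems(items Items) error { return h(items) }

// scanSegments is a hypothetical driver that hands each scanned
// segment to the handler and stops at the first error.
func scanSegments(segments []Items, h ItemsHandler) error {
	for _, seg := range segments {
		if err := h.HandleItems(seg); err != nil {
			return err
		}
	}
	return nil
}

// countByType tallies rows per Type value using a closure as the handler,
// so no dedicated struct has to be declared.
func countByType(segments []Items) (map[string]int, error) {
	counts := make(map[string]int)
	err := scanSegments(segments, ItemsHandlerFunc(func(items Items) error {
		for _, it := range items {
			counts[it["Type"]]++
		}
		return nil
	}))
	return counts, err
}

// exampleCounts runs countByType over two small segments.
func exampleCounts() (map[string]int, error) {
	return countByType([]Items{
		{{"Type": "run"}, {"Type": "task"}},
		{{"Type": "task"}},
	})
}

func main() {
	counts, err := exampleCounts()
	fmt.Println(counts["run"], counts["task"], err)
}
```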

type TaskDB

type TaskDB struct {
	infra2.TableNameFlagsTrait
	infra2.BucketNameFlagsTrait
	// DB is the dynamodb.
	DB dynamodbiface.DynamoDBAPI
	// Labels on the run.
	Labels []string
	// User who initiated this run.
	User string

	// Repo is the repository to store large objects referenced from this TaskDB.
	Repo *blobrepo.Repository
	// contains filtered or unexported fields
}

TaskDB implements the DynamoDB-backed taskdb.TaskDB interface to store run/task state and metadata. Each association is either:

	a) a RunID and its associated metadata (run labels, user info, and leases), or
	b) a TaskID and its associated metadata (the RunID that spawned the task, the FlowID of the node, and leases).

func (*TaskDB) Allocs

func (t *TaskDB) Allocs(ctx context.Context, q taskdb.AllocQuery) ([]taskdb.Alloc, error)

Allocs returns allocs (with their pools) that match the query.

func (*TaskDB) CreateRun

func (t *TaskDB) CreateRun(ctx context.Context, id taskdb.RunID, user string) error

CreateRun creates a new run in the taskdb with the given id, labels and user.

func (*TaskDB) CreateTask

func (t *TaskDB) CreateTask(ctx context.Context, task taskdb.Task) error

CreateTask creates a new task in the taskdb with the provided task.

func (*TaskDB) Flags

func (t *TaskDB) Flags(flags *flag.FlagSet)

Flags implements infra.Provider.

func (TaskDB) Help

func (TaskDB) Help() string

Help implements infra.Provider.

func (*TaskDB) Init

func (t *TaskDB) Init(sess *session.Session, user *infra2.User, labels pool.Labels) (err error)

Init implements infra.Provider.

func (*TaskDB) KeepIDAlive

func (t *TaskDB) KeepIDAlive(ctx context.Context, id digest.Digest, keepalive time.Time) error

KeepIDAlive sets the keepalive for the specified id to keepalive.

func (*TaskDB) KeepRunAlive

func (t *TaskDB) KeepRunAlive(ctx context.Context, id taskdb.RunID, keepalive time.Time) error

KeepRunAlive sets the keepalive for run id to keepalive.

func (*TaskDB) KeepTaskAlive

func (t *TaskDB) KeepTaskAlive(ctx context.Context, id taskdb.TaskID, keepalive time.Time) error

KeepTaskAlive sets the keepalive for task id to keepalive.
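A caller typically refreshes a keepalive periodically while a run or task is in progress. The sketch below shows one way such a loop might look. It is an illustration under stated assumptions: keepAliver is a local stand-in interface whose method takes a string id (the real KeepRunAlive takes a taskdb.RunID), and maintainKeepalive, fakeDB, exampleKeepalive, the interval and the lease duration are all hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// keepAliver is a stand-in for the part of TaskDB used by this sketch.
type keepAliver interface {
	KeepRunAlive(ctx context.Context, id string, keepalive time.Time) error
}

// maintainKeepalive pushes the keepalive to now+lease immediately and then
// once per interval, until ctx is done or an update fails.
func maintainKeepalive(ctx context.Context, db keepAliver, id string, interval, lease time.Duration) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		if err := db.KeepRunAlive(ctx, id, time.Now().Add(lease)); err != nil {
			return err
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}

// fakeDB counts keepalive updates instead of writing to DynamoDB.
type fakeDB struct{ calls int }

func (f *fakeDB) KeepRunAlive(ctx context.Context, id string, keepalive time.Time) error {
	f.calls++
	return nil
}

// exampleKeepalive runs the loop briefly and reports the number of updates.
func exampleKeepalive() int {
	ctx, cancel := context.WithTimeout(context.Background(), 55*time.Millisecond)
	defer cancel()
	db := &fakeDB{}
	// The returned error is the context's deadline error, which is expected here.
	_ = maintainKeepalive(ctx, db, "run-1", 10*time.Millisecond, time.Minute)
	return db.calls
}

func main() {
	fmt.Println("keepalive updates:", exampleKeepalive())
}
```

Choosing a lease noticeably longer than the refresh interval means that a single missed update does not make the row look abandoned.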

func (*TaskDB) Pools

func (t *TaskDB) Pools(ctx context.Context, q taskdb.PoolQuery) ([]taskdb.PoolRow, error)

Pools returns pools that match the query.

func (*TaskDB) Repository

func (t *TaskDB) Repository() reflow.Repository

Repository implements taskdb.TaskDB.

func (*TaskDB) Runs

func (t *TaskDB) Runs(ctx context.Context, runQuery taskdb.RunQuery) ([]taskdb.Run, error)

Runs returns runs that match the query.

func (*TaskDB) Scan

func (t *TaskDB) Scan(ctx context.Context, kind taskdb.Kind, mappingHandler taskdb.MappingHandler) error

Scan calls the handler function for every association in the mapping. Note that the handler function may be called concurrently from multiple goroutines, so it must be safe for concurrent use.
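Because the handler passed to Scan may run on several goroutines at once, any state it shares must be synchronized. The sketch below illustrates this with a mutex. scanConcurrently is a hypothetical stand-in for Scan that works on plain strings, and collectIDs is an illustrative helper; neither reflects the package's actual signatures.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// scanConcurrently is a stand-in for Scan: it invokes handler for every id,
// using one goroutine per segment, and returns when all segments are done.
func scanConcurrently(segments [][]string, handler func(id string)) {
	var wg sync.WaitGroup
	for _, seg := range segments {
		wg.Add(1)
		go func(seg []string) {
			defer wg.Done()
			for _, id := range seg {
				handler(id)
			}
		}(seg)
	}
	wg.Wait()
}

// collectIDs gathers every id seen by the handler. The mutex guards the
// shared slice because the handler is called from multiple goroutines.
func collectIDs(segments [][]string) []string {
	var (
		mu  sync.Mutex
		ids []string
	)
	scanConcurrently(segments, func(id string) {
		mu.Lock()
		defer mu.Unlock()
		ids = append(ids, id)
	})
	// Arrival order is nondeterministic, so sort for a stable result.
	sort.Strings(ids)
	return ids
}

func main() {
	fmt.Println(collectIDs([][]string{{"a", "b"}, {"c"}, {"d"}}))
}
```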

func (*TaskDB) SetEndTime

func (t *TaskDB) SetEndTime(ctx context.Context, id digest.Digest, end time.Time) error

SetEndTime sets the end time for the given id.

func (*TaskDB) SetResources

func (t *TaskDB) SetResources(ctx context.Context, id digest.Digest, resources reflow.Resources) error

SetResources sets the resources field in the taskdb for the row with the given id.

func (*TaskDB) SetRunAttrs

func (t *TaskDB) SetRunAttrs(ctx context.Context, id taskdb.RunID, bundle digest.Digest, args []string) error

SetRunAttrs sets the reflow bundle and corresponding args for this run.

func (*TaskDB) SetRunComplete

func (t *TaskDB) SetRunComplete(ctx context.Context, id taskdb.RunID, runlog, evalGraph, trace digest.Digest, end time.Time) error

SetRunComplete sets the result of the run post completion.

func (*TaskDB) SetTaskAttrs

func (t *TaskDB) SetTaskAttrs(ctx context.Context, id taskdb.TaskID, stdout, stderr, inspect digest.Digest) error

SetTaskAttrs sets the stdout, stderr and inspect ids for the task.

func (*TaskDB) SetTaskComplete

func (t *TaskDB) SetTaskComplete(ctx context.Context, id taskdb.TaskID, err error, end time.Time) error

SetTaskComplete marks the task as completed as of the given end time.

func (*TaskDB) SetTaskResult

func (t *TaskDB) SetTaskResult(ctx context.Context, id taskdb.TaskID, result digest.Digest) error

SetTaskResult sets the task result id.

func (*TaskDB) SetTaskUri

func (t *TaskDB) SetTaskUri(ctx context.Context, id taskdb.TaskID, uri string) error

SetTaskUri updates the task URI.

func (*TaskDB) Setup

func (t *TaskDB) Setup(sess *session.Session, log *log.Logger) error

Setup implements infra.Provider.

func (*TaskDB) StartAlloc

func (t *TaskDB) StartAlloc(ctx context.Context, allocID reflow.StringDigest, poolID digest.Digest, resources reflow.Resources, start time.Time) error

StartAlloc creates a new alloc in the taskdb with the provided parameters.

func (*TaskDB) StartPool

func (t *TaskDB) StartPool(ctx context.Context, pool taskdb.Pool) error

StartPool creates a new pool in the taskdb with the provided parameters.

func (*TaskDB) String

func (t *TaskDB) String() string

func (*TaskDB) Tasks

func (t *TaskDB) Tasks(ctx context.Context, q taskdb.TaskQuery) ([]taskdb.Task, error)

Tasks returns tasks that match the query.

func (*TaskDB) Version

func (t *TaskDB) Version() int

Version implements infra.Provider.
