package app

v0.0.0-...-5d2d8d3
Published: Jul 9, 2021 License: MIT Imports: 23 Imported by: 0

Documentation

Constants

const AnnotateDatasetQuery = "" /* 149-byte string literal not displayed */
const DatasetQuery = "SELECT id, name, type, data_type, metadata, hash, done FROM datasets"
const DbDebug bool = false
const ExecNodeQuery = "SELECT id, name, op, params, parents, workspace FROM exec_nodes"
const ItemQuery = "SELECT k, ext, format, metadata, provider, provider_info FROM items"
const JobQuery = "SELECT id, name, type, op, metadata, start_time, done, error FROM jobs"
const PytorchArchQuery = "SELECT id, params FROM pytorch_archs"
const PytorchComponentQuery = "SELECT id, params FROM pytorch_components"

Variables

var Config struct {
	// URL where main program can be reached.
	// This is used when telling workers which coordinator a request came from,
	// so that we can get back responses or serve any API calls worker needs to make.
	CoordinatorURL string
	// URL where the worker is, which runs as a separate program.
	// Can also point to a worker pool.
	WorkerURL string
	// Optional instance ID.
	// If set, the worker should launch container in a subdirectory with this name.
	InstanceID string
}

Global config object, set by main.go.
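
A hedged sketch of how main.go might populate this at startup (the URLs and the InitDB call are illustrative assumptions):

	// Hedged sketch: configure endpoints and initialize the database.
	Config.CoordinatorURL = "http://localhost:8080"
	Config.WorkerURL = "http://localhost:8081"
	InitDB(true) // init=true also creates the schema on first run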

var Router = mux.NewRouter()

var SetupFuncs []func(*socketio.Server)
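
A hedged sketch of registering an HTTP route on Router and appending a SetupFunc (the route, handler body, and hook body are illustrative assumptions):

	// Hedged sketch: expose dataset listing and hook socket.io setup.
	Router.HandleFunc("/datasets", func(w http.ResponseWriter, r *http.Request) {
		json.NewEncoder(w).Encode(ListDatasets())
	})
	SetupFuncs = append(SetupFuncs, func(server *socketio.Server) {
		// register socket.io event handlers on server here
	})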

Functions

func AcquireWorker

func AcquireWorker(jobOp *AppJobOp) error

Acquires a worker and returns nil, or returns an error if interrupted (i.e., the job was terminated by the user).
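
A hedged usage sketch (jobOp is assumed to be an *AppJobOp created elsewhere; every successful acquire should be paired with ReleaseWorker):

	// Hedged sketch: hold the worker for the duration of some work.
	if err := AcquireWorker(jobOp); err != nil {
		return err // the job was stopped while waiting
	}
	defer ReleaseWorker()
	// ... do work that requires the worker ...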

func DeleteBrokenReferences

func DeleteBrokenReferences(isDeleted func(parent skyhook.ExecParent) bool)

Delete ExecParent references that match the given isDeleted function.

func DeleteReferencesToDataset

func DeleteReferencesToDataset(dataset *DBDataset)

Delete broken ExecParent references to a dataset that has been deleted.

func DeleteReferencesToNode

func DeleteReferencesToNode(node *DBExecNode, newOutputs []skyhook.ExecOutput)

Delete broken ExecParent references to a node when the node is deleted or its outputs have changed.

func GetKeyFromFilename

func GetKeyFromFilename(fname string) string

func GetNodeByGraphID

func GetNodeByGraphID(id skyhook.GraphID) skyhook.Node

Retrieves the Node (VirtualNode or Dataset) based on GraphID.

func HandleUpload

func HandleUpload(w http.ResponseWriter, r *http.Request, f func(fname string, cleanupFunc func()) error)

Handles the common parts of a standard upload: we save the upload to a temporary file with the same extension as the uploaded file, then call f with the filename and a cleanup function.
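
A hedged sketch of an upload endpoint built on HandleUpload (the route and the unzip-then-import flow are illustrative assumptions):

	// Hedged sketch: accept a .zip upload and import it as a dataset.
	Router.HandleFunc("/import-upload", func(w http.ResponseWriter, r *http.Request) {
		err := HandleUpload(w, r, func(fname string, cleanupFunc func()) error {
			defer cleanupFunc() // removes the temporary file
			return UnzipThen(fname, func(path string) error {
				return ImportDataset(path, ImportOptions{})
			})
		})
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
		}
	})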

func ImportDataset

func ImportDataset(path string, opts ImportOptions) error

Import a local path that contains a Skyhook dataset.

If opts.Symlink is true, we copy the db.sqlite3 but symlink all the other files in the dataset. We currently do not symlink the whole directory because we don't want to modify it. If opts.Symlink is false, we just copy all the files.
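
For example (the path is an illustrative assumption):

	// Hedged sketch: import a dataset directory, symlinking item files.
	if err := ImportDataset("/data/exported-dataset", ImportOptions{Symlink: true}); err != nil {
		return err
	}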

func ImportURL

func ImportURL(url string, opts ImportOptions, f func(path string) error) error

Import from a URL. Calls the handler function f after the URL has been downloaded and unzipped. Updates opts with progress.

func IncorporateIntoGraph

func IncorporateIntoGraph(graph skyhook.ExecutionGraph, subgraph skyhook.ExecutionGraph)

Incorporate a subgraph (a graph of new nodes) into an existing execution graph. The subgraph may introduce new dependencies that don't yet exist in the graph, so we need to search for and incorporate those as well.

func InitDB

func InitDB(init bool)

Initialize the database on startup, performing cleanup operations. If init is true, we also first initialize the schema and populate certain tables.

func ReleaseWorker

func ReleaseWorker()

func RunNode

func RunNode(targetNode *DBExecNode, opts RunNodeOptions) error

func ToSkyhookInputDatasets

func ToSkyhookInputDatasets(datasets map[string][]*DBDataset) map[string][]skyhook.Dataset

func ToSkyhookOutputDatasets

func ToSkyhookOutputDatasets(datasets map[string]*DBDataset) map[string]skyhook.Dataset

func UncacheDB

func UncacheDB(fname string)

func UnzipThen

func UnzipThen(fname string, f func(path string) error) error

Unzips the named file to a temporary directory, then calls f with the directory path. Afterwards, we clear the temporary directory.
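
A hedged sketch (the archive name and handler body are illustrative assumptions):

	// Hedged sketch: list the contents of a zip; the extracted files
	// are removed automatically after f returns.
	err := UnzipThen("archive.zip", func(path string) error {
		entries, err := os.ReadDir(path)
		if err != nil {
			return err
		}
		for _, e := range entries {
			fmt.Println(e.Name())
		}
		return nil
	})
	if err != nil {
		return err
	}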

Types

type AnnotateDatasetUpdate

type AnnotateDatasetUpdate struct {
	Tool   *string
	Params *string
}

type AnnotateResponse

type AnnotateResponse struct {
	// The key that we're labeling.
	// May be an existing key in the destination dataset, or a new key.
	Key string

	IsExisting bool
}

Info needed to annotate one item, which may or may not already be present in the destination dataset.

type AppJobOp

type AppJobOp struct {
	Job    *DBJob
	TailOp *skyhook.TailJobOp

	WrappedJobOps    map[string]skyhook.JobOp
	LastWrappedDatas map[string]string

	// stopping support
	Stopping bool
	Stopped  bool

	// function to call when stopping a job
	// we also call this on SetDone
	CleanupFunc func()
	// contains filtered or unexported fields
}

A JobOp that wraps a TailJobOp for console output, plus an arbitrary number of other JobOps. It also provides functionality for stopping via a mutex/condition variable.
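
A hedged sketch of wiring an AppJobOp to a new job (the job name/type strings are illustrative, and we assume a zero-value TailJobOp is usable):

	// Hedged sketch: create a job, attach an AppJobOp, and finish it.
	job := NewJob("my import", "import", "import", "")
	jobOp := &AppJobOp{
		Job:    job,
		TailOp: &skyhook.TailJobOp{}, // assumption: zero value is usable
	}
	job.AttachOp(jobOp)
	jobOp.Update([]string{"starting work"}) // append console output
	// ... do the work ...
	jobOp.SetDone(nil) // ends the job and invokes CleanupFunc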

func (*AppJobOp) Cleanup

func (op *AppJobOp) Cleanup()

func (*AppJobOp) Encode

func (op *AppJobOp) Encode() string

func (*AppJobOp) IsStopping

func (op *AppJobOp) IsStopping() bool

func (*AppJobOp) ReadFrom

func (op *AppJobOp) ReadFrom(r io.Reader)

func (*AppJobOp) SetCleanupFunc

func (op *AppJobOp) SetCleanupFunc(f func())

func (*AppJobOp) SetDone

func (op *AppJobOp) SetDone(err error)

Handles ending the job so that the caller doesn't need to call Job.SetDone directly.

func (*AppJobOp) Stop

func (op *AppJobOp) Stop() error

func (*AppJobOp) Update

func (op *AppJobOp) Update(lines []string)

type AppJobState

type AppJobState struct {
	Lines []string
	Datas map[string]string
}

type ContainerInfo

type ContainerInfo struct {
	UUID        string
	BaseURL     string
	Parallelism int
}

Allocate a container on the worker. The caller is responsible for acquiring the worker first.

func AcquireContainer

func AcquireContainer(node skyhook.Runnable, jobOp *AppJobOp) (ContainerInfo, error)
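
A hedged sketch (node and jobOp are assumed to exist already):

	// Hedged sketch: acquire the worker, then allocate a container.
	if err := AcquireWorker(jobOp); err != nil {
		return err
	}
	defer ReleaseWorker()
	info, err := AcquireContainer(node, jobOp)
	if err != nil {
		return err
	}
	fmt.Println("container", info.UUID, "at", info.BaseURL)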

type DBAnnotateDataset

type DBAnnotateDataset struct {
	skyhook.AnnotateDataset

	InputDatasets []skyhook.Dataset
	// contains filtered or unexported fields
}

func GetAnnotateDataset

func GetAnnotateDataset(id int) *DBAnnotateDataset

func ListAnnotateDatasets

func ListAnnotateDatasets() []*DBAnnotateDataset

func NewAnnotateDataset

func NewAnnotateDataset(dataset skyhook.Dataset, inputs []skyhook.ExecParent, tool string, params string) (*DBAnnotateDataset, error)

func (*DBAnnotateDataset) Delete

func (s *DBAnnotateDataset) Delete()

func (*DBAnnotateDataset) Load

func (s *DBAnnotateDataset) Load()

func (*DBAnnotateDataset) SampleMissingKey

func (s *DBAnnotateDataset) SampleMissingKey() string

Samples a key that is present in all input datasets but not yet labeled in this annotate dataset. TODO: have a sampler object so that hash tables can be kept in memory instead of loaded from the db each time.

func (*DBAnnotateDataset) Update

type DBDataset

type DBDataset struct {
	skyhook.Dataset
	Done bool
}

func ExecParentToDataset

func ExecParentToDataset(parent skyhook.ExecParent) (*DBDataset, error)

Resolves an ExecParent to a dataset. If the dataset is unavailable, returns an error.

func FindDataset

func FindDataset(hash string) *DBDataset

func GetDataset

func GetDataset(id int) *DBDataset

func ListDatasets

func ListDatasets() []*DBDataset

func NewDataset

func NewDataset(name string, t string, dataType skyhook.DataType, hash *string) *DBDataset

func (*DBDataset) AddExecRef

func (ds *DBDataset) AddExecRef(nodeID int)

func (*DBDataset) AddItem

func (ds *DBDataset) AddItem(item skyhook.Item) (*DBItem, error)

func (*DBDataset) Clear

func (ds *DBDataset) Clear()

Clear the dataset without deleting it.

func (*DBDataset) Delete

func (ds *DBDataset) Delete()

func (*DBDataset) DeleteExecRef

func (ds *DBDataset) DeleteExecRef(nodeID int)

func (*DBDataset) Export

func (ds *DBDataset) Export(outFname string, opts ImportOptions) error

Export a dataset into the Skyhook .zip format, writing the archive to outFname.
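
For example (the dataset ID and output path are illustrative assumptions):

	// Hedged sketch: export an existing dataset to a zip file.
	ds := GetDataset(3)
	if ds == nil {
		return fmt.Errorf("no such dataset")
	}
	if err := ds.Export("/tmp/dataset3.zip", ImportOptions{}); err != nil {
		return err
	}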

func (*DBDataset) ExportFiles

func (ds *DBDataset) ExportFiles(outFname string, opts ImportOptions) error

Export files in a file dataset. Unlike Export, this produces a .zip file where items in the dataset are named based on their filenames specified in FileMetadata.

func (*DBDataset) GetItem

func (ds *DBDataset) GetItem(key string) *DBItem

func (*DBDataset) ImportDir

func (ds *DBDataset) ImportDir(path string, opts ImportOptions) error

func (*DBDataset) ImportFiles

func (ds *DBDataset) ImportFiles(fnames []string, opts ImportOptions) error

func (*DBDataset) ImportIntoFileDataset

func (ds *DBDataset) ImportIntoFileDataset(fnames []string, opts ImportOptions) error

Specialized function for importing files when the dataset is File type. In this case, the metadata specifies the original filename. Also in this case, we want to recursively scan, since filenames in subdirectories should be imported too.

func (*DBDataset) ListItems

func (ds *DBDataset) ListItems() []*DBItem

func (*DBDataset) SetDone

func (ds *DBDataset) SetDone(done bool)

func (*DBDataset) Update

func (ds *DBDataset) Update(req DatasetUpdate)

func (*DBDataset) WriteItem

func (ds *DBDataset) WriteItem(key string, data interface{}, metadata skyhook.DataMetadata) (*DBItem, error)

type DBExecNode

type DBExecNode struct {
	skyhook.ExecNode
	Workspace string
}

func GetExecNode

func GetExecNode(id int) *DBExecNode

func ListExecNodes

func ListExecNodes() []*DBExecNode

func NewExecNode

func NewExecNode(name string, op string, params string, parents map[string][]skyhook.ExecParent, workspace string) *DBExecNode

func (*DBExecNode) DatasetRefs

func (node *DBExecNode) DatasetRefs() []int

func (*DBExecNode) Delete

func (node *DBExecNode) Delete()

func (*DBExecNode) GetComputedKeys

func (node *DBExecNode) GetComputedKeys() map[string]bool

Helper function to compute the keys already computed at a node. This only works for incremental nodes, which must produce the same keys across all output datasets.

func (*DBExecNode) GetDatasets

func (node *DBExecNode) GetDatasets(create bool) (map[string]*DBDataset, bool)

Get datasets for each output of this node. If create is true, we create new datasets to cover missing ones. Also returns a bool that is true if all datasets exist.

func (*DBExecNode) GetGraph

func (node *DBExecNode) GetGraph() skyhook.ExecutionGraph

Build the execution graph rooted at this node. Execution graph maps from GraphID to Node.

func (*DBExecNode) GetGraphID

func (node *DBExecNode) GetGraphID() skyhook.GraphID

func (*DBExecNode) GetVirtualDatasets

func (node *DBExecNode) GetVirtualDatasets(vnode *skyhook.VirtualNode) map[string]*DBDataset

Get the datasets for a virtual node that comes from this node. If the datasets don't already exist, we create them.

func (*DBExecNode) Hash

func (node *DBExecNode) Hash() string

func (*DBExecNode) Incremental

func (node *DBExecNode) Incremental(opts IncrementalOptions) error

func (*DBExecNode) IsDone

func (node *DBExecNode) IsDone() bool

Returns true if all the output datasets are done.

func (*DBExecNode) PrepareRun

func (node *DBExecNode) PrepareRun(opts ExecRunOptions) (*RunData, error)

Prepare to run this node. Returns a RunData on success, or an error if preparation fails. If the node is already done, returns a nil RunData (and nil error).
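
A hedged sketch of the prepare/run cycle (node is assumed to be a *DBExecNode, and the job name is illustrative):

	// Hedged sketch: prepare, run, and finalize one node.
	rd, err := node.PrepareRun(ExecRunOptions{})
	if err != nil {
		return err
	}
	if rd == nil {
		return nil // the node is already done
	}
	rd.SetJob("run my node", "") // create a Job and populate the JobOps
	err = rd.Run()
	rd.SetDone() // report the saved error through the AppJobOp
	return err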

func (*DBExecNode) Update

func (node *DBExecNode) Update(req ExecNodeUpdate)

type DBItem

type DBItem struct {
	skyhook.Item
	// contains filtered or unexported fields
}

func (*DBItem) Delete

func (item *DBItem) Delete()

func (*DBItem) Handle

func (item *DBItem) Handle(format string, w http.ResponseWriter, r *http.Request)

func (*DBItem) Load

func (item *DBItem) Load()

func (*DBItem) SetMetadata

func (item *DBItem) SetMetadata(format string, metadata skyhook.DataMetadata)

func (*DBItem) SetMetadataFromFile

func (item *DBItem) SetMetadataFromFile() error

Set metadata based on the file.

type DBJob

type DBJob struct{ skyhook.Job }

func GetJob

func GetJob(id int) *DBJob

func ListJobs

func ListJobs() []*DBJob

func NewJob

func NewJob(name string, t string, op string, metadata string) *DBJob

func (*DBJob) AttachOp

func (j *DBJob) AttachOp(op skyhook.JobOp)

func (*DBJob) GetState

func (j *DBJob) GetState() string

func (*DBJob) SetDone

func (j *DBJob) SetDone(error string)

func (*DBJob) UpdateMetadata

func (j *DBJob) UpdateMetadata(metadata string)

func (*DBJob) UpdateState

func (j *DBJob) UpdateState(state string)

type DBPytorchArch

type DBPytorchArch struct{ skyhook.PytorchArch }

func GetPytorchArch

func GetPytorchArch(id string) *DBPytorchArch

func GetPytorchArchByName

func GetPytorchArchByName(id string) *DBPytorchArch

func ListPytorchArchs

func ListPytorchArchs() []*DBPytorchArch

func NewPytorchArch

func NewPytorchArch(id string) *DBPytorchArch

func (*DBPytorchArch) Delete

func (arch *DBPytorchArch) Delete()

func (*DBPytorchArch) Update

func (arch *DBPytorchArch) Update(req PytorchArchUpdate)

type DBPytorchComponent

type DBPytorchComponent struct{ skyhook.PytorchComponent }

func GetPytorchComponent

func GetPytorchComponent(id string) *DBPytorchComponent

func ListPytorchComponents

func ListPytorchComponents() []*DBPytorchComponent

func NewPytorchComponent

func NewPytorchComponent(id string) *DBPytorchComponent

func (*DBPytorchComponent) Delete

func (c *DBPytorchComponent) Delete()

func (*DBPytorchComponent) Update

type DBWorkspace

type DBWorkspace string

func GetWorkspace

func GetWorkspace(wsName string) *DBWorkspace

func (DBWorkspace) Delete

func (ws DBWorkspace) Delete()

func (DBWorkspace) ListExecNodes

func (ws DBWorkspace) ListExecNodes() []*DBExecNode

type Database

type Database struct {
	// contains filtered or unexported fields
}

func GetCachedDB

func GetCachedDB(fname string, initFunc func(*Database)) *Database

Get a cached database connection to the specified sqlite3 file.

If initFunc is set, we create the sqlite3 file if it doesn't already exist, and call initFunc each time a new connection is opened. Otherwise, we do not create new files, and instead return nil.
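
A hedged sketch (the filename and schema are illustrative assumptions):

	// Hedged sketch: open an auxiliary sqlite3 file, creating it and its
	// schema if needed; initFunc runs on each new connection.
	db := GetCachedDB("aux.sqlite3", func(db *Database) {
		db.Exec("CREATE TABLE IF NOT EXISTS kv (k TEXT PRIMARY KEY, v TEXT)")
	})
	db.Exec("INSERT OR REPLACE INTO kv (k, v) VALUES (?, ?)", "key1", "value1")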

func GetDB

func GetDB() *Database

func (*Database) Close

func (this *Database) Close()

func (*Database) Exec

func (this *Database) Exec(q string, args ...interface{}) Result

func (*Database) Query

func (this *Database) Query(q string, args ...interface{}) *Rows

func (*Database) QueryRow

func (this *Database) QueryRow(q string, args ...interface{}) *Row

func (*Database) Transaction

func (this *Database) Transaction(f func(tx Tx))
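
A hedged sketch of an atomic multi-statement update (the IDs are illustrative; the tables and columns come from the queries above):

	// Hedged sketch: update two rows atomically.
	db := GetDB()
	db.Transaction(func(tx Tx) {
		tx.Exec("UPDATE datasets SET done = ? WHERE id = ?", true, 5)
		tx.Exec("UPDATE jobs SET done = ? WHERE id = ?", true, 12)
	})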

type DatasetUpdate

type DatasetUpdate struct {
	Name     *string
	Metadata *string
}

type ExecNodeUpdate

type ExecNodeUpdate struct {
	Name    *string
	Op      *string
	Params  *string
	Parents *map[string][]skyhook.ExecParent
}

type ExecRunOptions

type ExecRunOptions struct {
	// If force, we run even if outputs were already available.
	Force bool

	// Whether to try incremental execution at this node.
	// If false, we return an error if parent datasets are not done.
	Incremental bool

	// If set, limit execution to these keys.
	// Only supported by incremental ops.
	LimitOutputKeys map[string]bool
}

type ImportOptions

type ImportOptions struct {
	Symlink bool

	// import will call Update and IsStopping on this AppJobOp if set
	AppJobOp *AppJobOp

	// import will call Increment if set
	ProgressJobOp *ProgressJobOp
}

func (ImportOptions) CompletedTask

func (opts ImportOptions) CompletedTask(line string, increment int) bool

Increments the ProgressJobOp, writes a line to the AppJobOp, and checks IsStopping.

func (ImportOptions) SetTasks

func (opts ImportOptions) SetTasks(total int)

type IncrementalOptions

type IncrementalOptions struct {
	// Number of random outputs to compute at this node.
	// Only one of Count or Keys should be specified.
	Count int
	// Compute outputs matching these keys.
	Keys []string
	// MultiExecJob to update during incremental execution.
	// For non-incremental ancestors, we pass this JobOp to RunNode.
	JobOp *MultiExecJobOp
}

Get some number of incremental outputs from this node.
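
A hedged sketch (node is assumed to be a *DBExecNode; the count and keys are illustrative):

	// Hedged sketch: compute a few incremental outputs at this node.
	if err := node.Incremental(IncrementalOptions{Count: 8}); err != nil {
		return err
	}
	// Or compute specific output keys instead:
	if err := node.Incremental(IncrementalOptions{Keys: []string{"0001", "0002"}}); err != nil {
		return err
	}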

type MultiExecJobOp

type MultiExecJobOp struct {
	Job *DBJob

	// current wrapped job (current ExecJob)
	CurJob *skyhook.Job

	// current execution plan
	// the field can change but the slice itself must not
	Plan []*skyhook.VirtualNode

	// which index in the plan are we executing next (or right now)?
	PlanIndex int
	// contains filtered or unexported fields
}

A JobOp for running multiple ExecNodes.

func (*MultiExecJobOp) ChangeJob

func (op *MultiExecJobOp) ChangeJob(job skyhook.Job)

func (*MultiExecJobOp) ChangePlan

func (op *MultiExecJobOp) ChangePlan(plan []*skyhook.VirtualNode, planIndex int)

Set the plan. The plan must be immutable.

func (*MultiExecJobOp) Encode

func (op *MultiExecJobOp) Encode() string

func (*MultiExecJobOp) SetPlanFromGraph

func (op *MultiExecJobOp) SetPlanFromGraph(graph skyhook.ExecutionGraph, ready map[skyhook.GraphID]map[string]*DBDataset, needed map[skyhook.GraphID]skyhook.Node, cur *skyhook.VirtualNode)

Sets the plan to a []*skyhook.VirtualNode derived from the current execution graph and related state.

func (*MultiExecJobOp) SetPlanFromMap

func (op *MultiExecJobOp) SetPlanFromMap(nodes map[int]*DBExecNode, done map[int]bool, curID int)

Used in DBExecNode.Incremental.

func (*MultiExecJobOp) Stop

func (op *MultiExecJobOp) Stop() error

func (*MultiExecJobOp) Update

func (op *MultiExecJobOp) Update(lines []string)

type MultiExecJobState

type MultiExecJobState struct {
	CurJob    *skyhook.Job
	Plan      []*skyhook.VirtualNode
	PlanIndex int
}

type ProgressJobOp

type ProgressJobOp struct {
	Completed int
	Total     int
	// contains filtered or unexported fields
}

func (*ProgressJobOp) Encode

func (op *ProgressJobOp) Encode() string

func (*ProgressJobOp) Increment

func (op *ProgressJobOp) Increment()

func (*ProgressJobOp) SetProgressPercent

func (op *ProgressJobOp) SetProgressPercent(percent int) bool

Set the progress to the specified percentage. Returns true if the percent was updated.

func (*ProgressJobOp) SetTotal

func (op *ProgressJobOp) SetTotal(total int)

func (*ProgressJobOp) Stop

func (op *ProgressJobOp) Stop() error

func (*ProgressJobOp) Update

func (op *ProgressJobOp) Update(lines []string)

type PytorchArchUpdate

type PytorchArchUpdate struct {
	Params *skyhook.PytorchArchParams
}

type PytorchComponentUpdate

type PytorchComponentUpdate struct {
	Params *skyhook.PytorchComponentParams
}

type Result

type Result struct {
	// contains filtered or unexported fields
}

func (Result) LastInsertId

func (r Result) LastInsertId() int

func (Result) RowsAffected

func (r Result) RowsAffected() int

type Row

type Row struct {
	// contains filtered or unexported fields
}

func (Row) Scan

func (r Row) Scan(dest ...interface{})

type Rows

type Rows struct {
	// contains filtered or unexported fields
}

func (*Rows) Close

func (r *Rows) Close()

func (*Rows) Next

func (r *Rows) Next() bool

func (*Rows) Scan

func (r *Rows) Scan(dest ...interface{})

type RunData

type RunData struct {
	Name  string
	Node  skyhook.Runnable
	Tasks []skyhook.ExecTask

	// whether we'll be done with the node after running Tasks
	// i.e., whether Tasks contains all pending tasks at this node
	WillBeDone bool

	// job-related things to update
	JobOp         *AppJobOp
	ProgressJobOp *ProgressJobOp

	// Saved error if any
	Error error
}

A RunData provides a Run function that executes a Runnable over the specified tasks.

func (*RunData) Run

func (rd *RunData) Run() error

func (*RunData) SetDone

func (rd *RunData) SetDone()

Update the AppJobOp with the saved error. We don't call this in RunData.Run by default because it's possible that the specified RunData.JobOp is shared across multiple Runs and shouldn't be marked as completed.

func (*RunData) SetJob

func (rd *RunData) SetJob(name string, metadata string)

Create a Job for this RunData and populate JobOp/ProgressJobOp.

type RunNodeOptions

type RunNodeOptions struct {
	// If force, we run even if outputs were already available.
	Force bool
	// If NoRunTree, we do not run if the parents of targetNode are not available.
	NoRunTree bool
	// MultiExecJobOp to update with jobs for each ExecNode run.
	JobOp *MultiExecJobOp
}

Run the specified node, while running ancestors first if needed.
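
A hedged sketch (the node ID is an illustrative assumption):

	// Hedged sketch: run a node and any needed ancestors, forcing
	// re-execution even if outputs already exist.
	node := GetExecNode(7)
	if node == nil {
		return fmt.Errorf("no such node")
	}
	return RunNode(node, RunNodeOptions{Force: true})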

type Tx

type Tx struct {
	// contains filtered or unexported fields
}

func (Tx) Exec

func (tx Tx) Exec(q string, args ...interface{}) Result

func (Tx) Query

func (tx Tx) Query(q string, args ...interface{}) Rows

func (Tx) QueryRow

func (tx Tx) QueryRow(q string, args ...interface{}) Row
