storage

package
v1.7.0-alpha.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 12, 2021 License: Apache-2.0 Imports: 23 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Map

func Map(vs []string, f func(string) string) []string

Map applies the function f to every string in the given string slice.

func NullInt64ToPointer

func NullInt64ToPointer(ni sql.NullInt64) *int64

func NullStringToPointer

func NullStringToPointer(ns sql.NullString) *string

func PointerToNullInt64

func PointerToNullInt64(ip *int64) sql.NullInt64

func PointerToNullString

func PointerToNullString(sp *string) sql.NullString

Types

type DB

type DB struct {
	*sql.DB
	SQLDialect
}

DB is a struct that wraps the plain sql library together with a SQL dialect, to bridge any feature differences between MySQL, which is used in production, and SQLite, which is used for unit testing.

func NewDB

func NewDB(db *sql.DB, dialect SQLDialect) *DB

NewDB creates a DB

func NewFakeDb

func NewFakeDb() (*DB, error)

func NewFakeDbOrFatal

func NewFakeDbOrFatal() *DB

type DBStatusStore

type DBStatusStore struct {
	// contains filtered or unexported fields
}

DBStatusStore is an implementation of DBStatusStoreInterface. This store reads and writes the state of the database. For now it stores status such as whether the samples have been loaded.

func NewDBStatusStore

func NewDBStatusStore(db *DB) *DBStatusStore

NewDBStatusStore is a factory function for the database status store.

func (*DBStatusStore) HaveSamplesLoaded

func (s *DBStatusStore) HaveSamplesLoaded() (bool, error)

func (*DBStatusStore) InitializeDBStatusTable

func (s *DBStatusStore) InitializeDBStatusTable() error

func (*DBStatusStore) MarkSampleLoaded

func (s *DBStatusStore) MarkSampleLoaded() error

type DBStatusStoreInterface

type DBStatusStoreInterface interface {
	HaveSamplesLoaded() (bool, error)
	MarkSampleLoaded() error
}

type DefaultExperimentStore

type DefaultExperimentStore struct {
	// contains filtered or unexported fields
}

DefaultExperimentStore is an implementation of DefaultExperimentStoreInterface. It stores the ID of the default experiment, which is created the first time the API server is initialized.

func NewDefaultExperimentStore

func NewDefaultExperimentStore(db *DB) *DefaultExperimentStore

NewDefaultExperimentStore is a factory function for creating the default experiment store.

func (*DefaultExperimentStore) GetDefaultExperimentId

func (s *DefaultExperimentStore) GetDefaultExperimentId() (string, error)

func (*DefaultExperimentStore) SetDefaultExperimentId

func (s *DefaultExperimentStore) SetDefaultExperimentId(id string) error

func (*DefaultExperimentStore) UnsetDefaultExperimentIdIfIdMatches

func (s *DefaultExperimentStore) UnsetDefaultExperimentIdIfIdMatches(tx *sql.Tx, id string) error

Sets the default experiment ID stored in the DB to the empty string. This needs to happen if the experiment is deleted via the normal delete experiment API so that the server knows to create a new default. This is always done alongside the deletion of the actual experiment itself, so a transaction is needed as input. Update is used instead of delete so that we don't need to first check that the experiment ID is there.

type DefaultExperimentStoreInterface

type DefaultExperimentStoreInterface interface {
	GetDefaultExperimentId() (string, error)
	SetDefaultExperimentId(id string) error
}

type ExperimentStore

type ExperimentStore struct {
	// contains filtered or unexported fields
}

func NewExperimentStore

func NewExperimentStore(db *DB, time util.TimeInterface, uuid util.UUIDGeneratorInterface) *ExperimentStore

NewExperimentStore is a factory function for the experiment store.

func (*ExperimentStore) ArchiveExperiment

func (s *ExperimentStore) ArchiveExperiment(expId string) error

func (*ExperimentStore) CreateExperiment

func (s *ExperimentStore) CreateExperiment(experiment *model.Experiment) (*model.Experiment, error)

func (*ExperimentStore) DeleteExperiment

func (s *ExperimentStore) DeleteExperiment(id string) error

func (*ExperimentStore) GetExperiment

func (s *ExperimentStore) GetExperiment(uuid string) (*model.Experiment, error)

func (*ExperimentStore) ListExperiments

func (s *ExperimentStore) ListExperiments(filterContext *common.FilterContext, opts *list.Options) ([]*model.Experiment, int, string, error)

Runs two SQL queries in a transaction to return a list of matching experiments, as well as their total_size. The total_size does not reflect the page size.

func (*ExperimentStore) UnarchiveExperiment

func (s *ExperimentStore) UnarchiveExperiment(expId string) error

type ExperimentStoreInterface

type ExperimentStoreInterface interface {
	ListExperiments(filterContext *common.FilterContext, opts *list.Options) ([]*model.Experiment, int, string, error)
	GetExperiment(uuid string) (*model.Experiment, error)
	CreateExperiment(*model.Experiment) (*model.Experiment, error)
	DeleteExperiment(uuid string) error
	ArchiveExperiment(expId string) error
	UnarchiveExperiment(expId string) error
}

type FakeMinioClient

type FakeMinioClient struct {
	// contains filtered or unexported fields
}

func NewFakeMinioClient

func NewFakeMinioClient() *FakeMinioClient

func (*FakeMinioClient) DeleteObject

func (c *FakeMinioClient) DeleteObject(bucketName, objectName string) error

func (*FakeMinioClient) ExistObject

func (c *FakeMinioClient) ExistObject(objectName string) bool

func (*FakeMinioClient) GetObject

func (c *FakeMinioClient) GetObject(bucketName, objectName string,
	opts minio.GetObjectOptions) (io.Reader, error)

func (*FakeMinioClient) GetObjectCount

func (c *FakeMinioClient) GetObjectCount() int

func (*FakeMinioClient) PutObject

func (c *FakeMinioClient) PutObject(bucketName, objectName string, reader io.Reader,
	objectSize int64, opts minio.PutObjectOptions) (n int64, err error)

type JobStore

type JobStore struct {
	// contains filtered or unexported fields
}

func NewJobStore

func NewJobStore(db *DB, time util.TimeInterface) *JobStore

NewJobStore is a factory function for the job store.

func (*JobStore) CreateJob

func (s *JobStore) CreateJob(j *model.Job) (*model.Job, error)

func (*JobStore) DeleteJob

func (s *JobStore) DeleteJob(id string) error

func (*JobStore) EnableJob

func (s *JobStore) EnableJob(id string, enabled bool) error

func (*JobStore) GetJob

func (s *JobStore) GetJob(id string) (*model.Job, error)

func (*JobStore) ListJobs

func (s *JobStore) ListJobs(
	filterContext *common.FilterContext, opts *list.Options) ([]*model.Job, int, string, error)

Runs two SQL queries in a transaction to return a list of matching jobs, as well as their total_size. The total_size does not reflect the page size, but it does reflect the number of jobs matching the supplied filters and resource references.

func (*JobStore) UpdateJob

func (s *JobStore) UpdateJob(swf *util.ScheduledWorkflow) error

type JobStoreInterface

type JobStoreInterface interface {
	ListJobs(filterContext *common.FilterContext, opts *list.Options) ([]*model.Job, int, string, error)
	GetJob(id string) (*model.Job, error)
	CreateJob(*model.Job) (*model.Job, error)
	DeleteJob(id string) error
	EnableJob(id string, enabled bool) error
	UpdateJob(swf *util.ScheduledWorkflow) error
}

type MinioClient

type MinioClient struct {
	Client *minio.Client
}

func (*MinioClient) DeleteObject

func (c *MinioClient) DeleteObject(bucketName, objectName string) error

func (*MinioClient) GetObject

func (c *MinioClient) GetObject(bucketName, objectName string, opts minio.GetObjectOptions) (io.Reader, error)

func (*MinioClient) PutObject

func (c *MinioClient) PutObject(bucketName, objectName string, reader io.Reader, objectSize int64, opts minio.PutObjectOptions) (n int64, err error)

type MinioClientInterface

type MinioClientInterface interface {
	PutObject(bucketName, objectName string, reader io.Reader, objectSize int64, opts minio.PutObjectOptions) (n int64, err error)
	GetObject(bucketName, objectName string, opts minio.GetObjectOptions) (io.Reader, error)
	DeleteObject(bucketName, objectName string) error
}

MinioClientInterface is an interface for the minio client struct, making it easier to unit test.

type MinioObjectStore

type MinioObjectStore struct {
	// contains filtered or unexported fields
}

MinioObjectStore manages pipelines using Minio.

func NewMinioObjectStore

func NewMinioObjectStore(minioClient MinioClientInterface, bucketName string, baseFolder string, disableMultipart bool) *MinioObjectStore

func (*MinioObjectStore) AddAsYamlFile

func (m *MinioObjectStore) AddAsYamlFile(o interface{}, filePath string) error

func (*MinioObjectStore) AddFile

func (m *MinioObjectStore) AddFile(file []byte, filePath string) error

func (*MinioObjectStore) DeleteFile

func (m *MinioObjectStore) DeleteFile(filePath string) error

func (*MinioObjectStore) GetFile

func (m *MinioObjectStore) GetFile(filePath string) ([]byte, error)

func (*MinioObjectStore) GetFromYamlFile

func (m *MinioObjectStore) GetFromYamlFile(o interface{}, filePath string) error

func (*MinioObjectStore) GetPipelineKey

func (m *MinioObjectStore) GetPipelineKey(pipelineID string) string

GetPipelineKey adds the configured base folder to the pipeline ID.

type MySQLDialect

type MySQLDialect struct{}

MySQLDialect implements SQLDialect for the MySQL dialect.

func NewMySQLDialect

func NewMySQLDialect() MySQLDialect

func (MySQLDialect) Concat

func (d MySQLDialect) Concat(exprs []string, separator string) string

func (MySQLDialect) GroupConcat

func (d MySQLDialect) GroupConcat(expr string, separator string) string

func (MySQLDialect) IsDuplicateError

func (d MySQLDialect) IsDuplicateError(err error) bool

func (MySQLDialect) SelectForUpdate

func (d MySQLDialect) SelectForUpdate(query string) string

type ObjectStoreInterface

type ObjectStoreInterface interface {
	AddFile(template []byte, filePath string) error
	DeleteFile(filePath string) error
	GetFile(filePath string) ([]byte, error)
	AddAsYamlFile(o interface{}, filePath string) error
	GetFromYamlFile(o interface{}, filePath string) error
	GetPipelineKey(pipelineId string) string
}

Interface for object store.

func NewFakeObjectStore

func NewFakeObjectStore() ObjectStoreInterface

NewFakeObjectStore returns an object store backed by a fake minio client.

type PipelineStore

type PipelineStore struct {
	// contains filtered or unexported fields
}

func NewPipelineStore

func NewPipelineStore(db *DB, time util.TimeInterface, uuid util.UUIDGeneratorInterface) *PipelineStore

NewPipelineStore is a factory function for the pipeline store.

func (*PipelineStore) CreatePipeline

func (s *PipelineStore) CreatePipeline(p *model.Pipeline) (*model.Pipeline, error)

func (*PipelineStore) CreatePipelineVersion

func (s *PipelineStore) CreatePipelineVersion(v *model.PipelineVersion, updatePipelineDefaultVersion bool) (*model.PipelineVersion, error)

func (*PipelineStore) DeletePipeline

func (s *PipelineStore) DeletePipeline(id string) error

func (*PipelineStore) DeletePipelineVersion

func (s *PipelineStore) DeletePipelineVersion(versionId string) error

func (*PipelineStore) GetPipeline

func (s *PipelineStore) GetPipeline(id string) (*model.Pipeline, error)

func (*PipelineStore) GetPipelineVersion

func (s *PipelineStore) GetPipelineVersion(versionId string) (*model.PipelineVersion, error)

func (*PipelineStore) GetPipelineVersionWithStatus

func (s *PipelineStore) GetPipelineVersionWithStatus(versionId string, status model.PipelineVersionStatus) (*model.PipelineVersion, error)

func (*PipelineStore) GetPipelineWithStatus

func (s *PipelineStore) GetPipelineWithStatus(id string, status model.PipelineStatus) (*model.Pipeline, error)

func (*PipelineStore) ListPipelineVersions

func (s *PipelineStore) ListPipelineVersions(pipelineId string, opts *list.Options) ([]*model.PipelineVersion, int, string, error)

func (*PipelineStore) ListPipelines

func (s *PipelineStore) ListPipelines(filterContext *common.FilterContext, opts *list.Options) ([]*model.Pipeline, int, string, error)

Runs two SQL queries in a transaction to return a list of matching pipelines, as well as their total_size. The total_size does not reflect the page size.

func (*PipelineStore) SetUUIDGenerator

func (s *PipelineStore) SetUUIDGenerator(new_uuid util.UUIDGeneratorInterface)

SetUUIDGenerator is for unit tests in other packages that need to set the uuid field, since uuid is not exported.

func (*PipelineStore) UpdatePipelineAndVersionsStatus

func (s *PipelineStore) UpdatePipelineAndVersionsStatus(id string, status model.PipelineStatus, pipelineVersionId string, pipelineVersionStatus model.PipelineVersionStatus) error

func (*PipelineStore) UpdatePipelineDefaultVersion

func (s *PipelineStore) UpdatePipelineDefaultVersion(pipelineId string, versionId string) error

func (*PipelineStore) UpdatePipelineStatus

func (s *PipelineStore) UpdatePipelineStatus(id string, status model.PipelineStatus) error

func (*PipelineStore) UpdatePipelineVersionStatus

func (s *PipelineStore) UpdatePipelineVersionStatus(id string, status model.PipelineVersionStatus) error

type PipelineStoreInterface

type PipelineStoreInterface interface {
	ListPipelines(filterContext *common.FilterContext, opts *list.Options) ([]*model.Pipeline, int, string, error)
	GetPipeline(pipelineId string) (*model.Pipeline, error)
	GetPipelineWithStatus(id string, status model.PipelineStatus) (*model.Pipeline, error)
	DeletePipeline(pipelineId string) error
	CreatePipeline(*model.Pipeline) (*model.Pipeline, error)
	UpdatePipelineStatus(string, model.PipelineStatus) error
	UpdatePipelineDefaultVersion(string, string) error

	CreatePipelineVersion(*model.PipelineVersion, bool) (*model.PipelineVersion, error)
	GetPipelineVersion(versionId string) (*model.PipelineVersion, error)
	GetPipelineVersionWithStatus(versionId string, status model.PipelineVersionStatus) (*model.PipelineVersion, error)
	ListPipelineVersions(pipelineId string, opts *list.Options) ([]*model.PipelineVersion, int, string, error)
	DeletePipelineVersion(pipelineVersionId string) error
	// Change status of a particular version.
	UpdatePipelineVersionStatus(pipelineVersionId string, status model.PipelineVersionStatus) error
	// TODO(jingzhang36): remove this temporary method after resource manager's
	// CreatePipeline stops using it.
	UpdatePipelineAndVersionsStatus(id string, status model.PipelineStatus, pipelineVersionId string, pipelineVersionStatus model.PipelineVersionStatus) error
}

type ResourceReferenceStore

type ResourceReferenceStore struct {
	// contains filtered or unexported fields
}

func NewResourceReferenceStore

func NewResourceReferenceStore(db *DB) *ResourceReferenceStore

func (*ResourceReferenceStore) CreateResourceReferences

func (s *ResourceReferenceStore) CreateResourceReferences(tx *sql.Tx, refs []*model.ResourceReference) error

Creates resource references. This always accompanies the creation of a parent resource, so a transaction is needed as input.

func (*ResourceReferenceStore) DeleteResourceReferences

func (s *ResourceReferenceStore) DeleteResourceReferences(tx *sql.Tx, id string, resourceType common.ResourceType) error

Deletes all resource references for a specific resource. This always accompanies the deletion of a parent resource, so a transaction is needed as input.

func (*ResourceReferenceStore) GetResourceReference

func (s *ResourceReferenceStore) GetResourceReference(resourceId string, resourceType common.ResourceType,
	referenceType common.ResourceType) (*model.ResourceReference, error)

type ResourceReferenceStoreInterface

type ResourceReferenceStoreInterface interface {
	// Retrieve the resource reference for a given resource id, type and a reference type.
	GetResourceReference(resourceId string, resourceType common.ResourceType,
		referenceType common.ResourceType) (*model.ResourceReference, error)
}

type RunStore

type RunStore struct {
	// contains filtered or unexported fields
}

func NewRunStore

func NewRunStore(db *DB, time util.TimeInterface) *RunStore

NewRunStore creates a new RunStore.

func (*RunStore) AddSortByRunMetricToSelect

func (s *RunStore) AddSortByRunMetricToSelect(sqlBuilder sq.SelectBuilder, opts *list.Options) sq.SelectBuilder

Adds a metric as a new field to the select clause by joining the passed-in SQL query with the run_metrics table. Having the metric as a field in the select clause enables sorting on this metric afterwards. TODO(jingzhang36): example of resulting SQL query and explanation for it.

func (*RunStore) ArchiveRun

func (s *RunStore) ArchiveRun(runId string) error

func (*RunStore) CreateOrUpdateRun

func (s *RunStore) CreateOrUpdateRun(runDetail *model.RunDetail) error

func (*RunStore) CreateRun

func (s *RunStore) CreateRun(r *model.RunDetail) (*model.RunDetail, error)

func (*RunStore) DeleteRun

func (s *RunStore) DeleteRun(id string) error

func (*RunStore) GetRun

func (s *RunStore) GetRun(runId string) (*model.RunDetail, error)

GetRun gets the run manifest from the Workflow CRD.

func (*RunStore) ListRuns

func (s *RunStore) ListRuns(
	filterContext *common.FilterContext, opts *list.Options) ([]*model.Run, int, string, error)

Runs two SQL queries in a transaction to return a list of matching runs, as well as their total_size. The total_size does not reflect the page size, but it does reflect the number of runs matching the supplied filters and resource references.

func (*RunStore) ReportMetric

func (s *RunStore) ReportMetric(metric *model.RunMetric) (err error)

ReportMetric inserts a new metric into the run_metrics table. Conflicting metrics are ignored.

func (*RunStore) TerminateRun

func (s *RunStore) TerminateRun(runId string) error

func (*RunStore) UnarchiveRun

func (s *RunStore) UnarchiveRun(runId string) error

func (*RunStore) UpdateRun

func (s *RunStore) UpdateRun(runID string, condition string, finishedAtInSec int64, workflowRuntimeManifest string) (err error)

type RunStoreInterface

type RunStoreInterface interface {
	GetRun(runId string) (*model.RunDetail, error)

	ListRuns(filterContext *common.FilterContext, opts *list.Options) ([]*model.Run, int, string, error)

	// Create a run entry in the database
	CreateRun(run *model.RunDetail) (*model.RunDetail, error)

	// Update run table. Only condition and runtime manifest are allowed to be updated.
	UpdateRun(id string, condition string, finishedAtInSec int64, workflowRuntimeManifest string) (err error)

	// Archive a run
	ArchiveRun(id string) error

	// Unarchive a run
	UnarchiveRun(id string) error

	// Delete a run entry from the database
	DeleteRun(id string) error

	// Update the run table or create one if the run doesn't exist
	CreateOrUpdateRun(run *model.RunDetail) error

	// Store a new metric entry to run_metrics table.
	ReportMetric(metric *model.RunMetric) (err error)

	// Terminate a run
	TerminateRun(runId string) error
}

type SQLDialect

type SQLDialect interface {
	// GroupConcat builds query to group concatenate `expr` in each row and use `separator`
	// to join rows in a group.
	GroupConcat(expr string, separator string) string

	// Concat builds query to concatenate a list of `exprs` into a single string with
	// a separator in between.
	Concat(exprs []string, separator string) string

	// Check whether the error is a SQL duplicate entry error or not
	IsDuplicateError(err error) bool

	// Modifies the SELECT clause in query to return one that locks the selected
	// row for update.
	SelectForUpdate(query string) string
}

SQLDialect abstracts common SQL queries that vary between dialects. It is used to bridge the differences between MySQL (production) and SQLite (test).

type SQLiteDialect

type SQLiteDialect struct{}

SQLiteDialect implements SQLDialect for the SQLite dialect.

func NewSQLiteDialect

func NewSQLiteDialect() SQLiteDialect

func (SQLiteDialect) Concat

func (d SQLiteDialect) Concat(exprs []string, separator string) string

func (SQLiteDialect) GroupConcat

func (d SQLiteDialect) GroupConcat(expr string, separator string) string

func (SQLiteDialect) IsDuplicateError

func (d SQLiteDialect) IsDuplicateError(err error) bool

func (SQLiteDialect) SelectForUpdate

func (d SQLiteDialect) SelectForUpdate(query string) string

type TaskStore

type TaskStore struct {
	// contains filtered or unexported fields
}

func NewTaskStore

func NewTaskStore(db *DB, time util.TimeInterface, uuid util.UUIDGeneratorInterface) *TaskStore

NewTaskStore creates a new TaskStore.

func (*TaskStore) CreateTask

func (s *TaskStore) CreateTask(task *model.Task) (*model.Task, error)

func (*TaskStore) GetTask

func (s *TaskStore) GetTask(id string) (*model.Task, error)

func (*TaskStore) ListTasks

func (s *TaskStore) ListTasks(filterContext *common.FilterContext, opts *list.Options) ([]*model.Task, int, string, error)

Runs two SQL queries in a transaction to return a list of matching tasks, as well as their total_size. The total_size does not reflect the page size.

type TaskStoreInterface

type TaskStoreInterface interface {
	// Create a task entry in the database
	CreateTask(task *model.Task) (*model.Task, error)

	ListTasks(filterContext *common.FilterContext, opts *list.Options) ([]*model.Task, int, string, error)

	GetTask(id string) (*model.Task, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL