Package model

v0.0.0-...-dec25df
Published: Apr 6, 2020 License: Apache-2.0 Imports: 31 Imported by: 0

Documentation

Overview

Package model is the core of Maestro. Model interfaces with all the other packages, and a *Model (conventionally named m) is frequently passed around among them.

The model also defines core structures, many of which are persisted in the Maestro database.

Index

Constants

View Source
const (
	ImpNone importStatus = iota // 0
	ImpQueued
	ImpRunning
	ImpDone
	ImpError
)

Import status - phases of the database import.
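
These constants can be compared against the value returned by (*Model).GetImportStatus, documented below. A minimal polling sketch; the function name and the five-second interval are illustrative, and the model import path is elided:

	import (
		"fmt"
		"time"
	)

	// waitForImport polls a table's import status until the import
	// completes or fails.
	func waitForImport(m *model.Model, tableId int64) error {
		for {
			switch m.GetImportStatus(tableId) {
			case model.ImpDone:
				return nil
			case model.ImpError:
				return fmt.Errorf("import of table %d failed", tableId)
			default: // ImpNone, ImpQueued or ImpRunning
				time.Sleep(5 * time.Second)
			}
		}
	}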

Variables

This section is empty.

Functions

This section is empty.

Types

type BQConf

type BQConf struct {
	ProjectId string `db:"project_id"`     // Project Id
	Email     string `db:"email"`          // Email (authentication)
	KeyId     string `db:"private_key_id"` // Key (authentication)
	CryptKey  string `db:"key"`            // Encrypted Key
	GcsBucket string `db:"gcs_bucket"`     // GCS bucket to be used for exports
	PlainKey  string `db:"-"`              // Plain Key (not stored in db)
}

Configuration necessary to connect to BigQuery.

type BQJob

type BQJob struct {
	Id                  int64      `db:"id"`
	CreatedAt           time.Time  `db:"created_at"`
	TableId             int64      `db:"table_id"`         // Corresponding table
	UserId              *int64     `db:"user_id"`          // User who triggered the job or nil (if it was system-triggered)
	RunId               *int64     `db:"run_id"`           // Run in which this job ran or nil
	Parents             *string    `db:"parents"`          // List of parents for this table
	BQJobId             string     `db:"bq_job_id"`        // Id returned by BigQuery
	Type                string     `db:"type"`             // Type of job: query, extract, load
	Configuration       *string    `db:"configuration"`    // BigQuery Job configuration (as a JSON string)
	Status              *string    `db:"status"`           // BigQuery Job status (as a JSON string)
	QueryStats          *string    `db:"query_stats"`      // Excerpt from Statistics returned by BQ (JSON)
	LoadStats           *string    `db:"load_stats"`       // Excerpt from Statistics returned by BQ (JSON)
	ExtractStats        *string    `db:"extract_stats"`    // Excerpt from Statistics returned by BQ (JSON)
	DestinationUrls     *string    `db:"destination_urls"` // For GCS extracts
	CreationTime        *time.Time `db:"creation_time"`    // BQ-specific
	StartTime           *time.Time `db:"start_time"`       // BQ-specific
	EndTime             *time.Time `db:"end_time"`         // BQ-specific
	TotalBytesProcessed int64      `db:"total_bytes_processed"`
	TotalBytesBilled    int64      `db:"total_bytes_billed"`
	ImportBegin         *time.Time `db:"import_begin"`
	ImportEnd           *time.Time `db:"import_end"`
	ImportBytes         int64      `db:"import_bytes"`
	ImportRows          int64      `db:"import_rows"`
	// contains filtered or unexported fields
}

Representation of a BigQuery job. BQJob may seem to replicate much of the data that exists in Table, but this is intentional: the table may change after the job, and it is important to preserve all the information in the table as it was at the moment of job creation.

A BQJob is created before submitting the job to BigQuery. Between the time it is created and the time it is submitted to BQ (which can be a while during a run, where all jobs are created upfront) it has no BQJobId. In the rare but not unheard-of case where BQ responds with a "Retrying may solve the problem" error, the submission is retried, which alters the BQJobId (the old one is forgotten, though it is still visible in the logs).

BQJobs are essential to Runs, because Runs are initially constructed as a collection of BQJobs, which are then submitted in the correct order based on dependencies and priorities.
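
Since Configuration, Status and the various stats fields are stored as JSON strings, inspecting them amounts to unmarshaling. A hedged sketch; the exact shape of the excerpts is not documented here, so a generic map is used:

	import (
		"encoding/json"
		"fmt"
	)

	// printQueryStats prints whatever keys the stored excerpt contains.
	func printQueryStats(j *model.BQJob) error {
		if j.QueryStats == nil {
			return nil // no query stats recorded for this job
		}
		var stats map[string]interface{}
		if err := json.Unmarshal([]byte(*j.QueryStats), &stats); err != nil {
			return err
		}
		for k, v := range stats {
			fmt.Printf("%s: %v\n", k, v)
		}
		return nil
	}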

func (*BQJob) GetName

func (j *BQJob) GetName() string

Satisfies scheduler.Item.

func (*BQJob) GetStatus

func (j *BQJob) GetStatus() (status, error string, err error)

Return the status of the table.

func (*BQJob) SignedExtractURLs

func (j *BQJob) SignedExtractURLs(m *Model) ([]string, error)

Generate GCS URLs that are signed (i.e. require no authentication).

func (*BQJob) TableReference

func (j *BQJob) TableReference() (*bigquery.TableReference, error)

Return a BQ TableReference.

type Dataset

type Dataset struct {
	Id      int64
	Dataset string
}

BigQuery Dataset.

type Db

type Db struct {
	Id          int64  `db:"id"`
	Name        string `db:"name"`        // Symbolic name
	DatasetId   int64  `db:"dataset_id"`  // Data will be imported into this dataset
	Dataset     string `db:"dataset"`     // (from datasets table)
	Driver      string `db:"driver"`      // postgres or mysql
	ConnectStr  string `db:"connect_str"` // Connect string
	CryptSecret string `db:"secret"`      // Encrypted secret
	Export      bool   `db:"export"`      // Is this an export database?
}

Configuration necessary to connect to import/export databases.

type Freq

type Freq struct {
	Id     int64
	Name   string        // E.g. "daily", "hourly", "15-minute"
	Period time.Duration // Periods are aligned on zero time
	Offset time.Duration // Offset into the period
	Active bool          // If false, this frequency will not run
}

Frequency (cadence) of periodic runs.

type GitConf

type GitConf struct {
	Url        string `db:"url"`   // Repo URL (https://github.com/org/repo)
	CryptToken string `db:"token"` // GitHub token (encrypted)
	PlainToken string `db:"-"`     // Decrypted token
}

Configuration necessary for Git access.

type Group

type Group struct {
	Id          int64
	Name        string `db:"name"`          // Symbolic name
	AdminUserId int64  `db:"admin_user_id"` // Admin of this group
}

Group information.

type GroupArray

type GroupArray []*Group

This named type allows StructScan to be used with User; its Scan method implements sql.Scanner.

func (*GroupArray) Scan

func (g *GroupArray) Scan(src interface{}) error
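
A sketch of the intended use with sqlx, which provides StructScan; the SQL here is hypothetical:

	import "github.com/jmoiron/sqlx"

	// usersWithGroups scans rows that include an aggregated groups column;
	// GroupArray.Scan decodes that column into the Groups field.
	func usersWithGroups(db *sqlx.DB) ([]*model.UserWithGroups, error) {
		rows, err := db.Queryx(`SELECT u.*, ug.groups FROM users u ...`) // hypothetical query
		if err != nil {
			return nil, err
		}
		defer rows.Close()
		var out []*model.UserWithGroups
		for rows.Next() {
			u := &model.UserWithGroups{}
			if err := rows.StructScan(u); err != nil {
				return nil, err
			}
			out = append(out, u)
		}
		return out, rows.Err()
	}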

type Model

type Model struct {
	// contains filtered or unexported fields
}

Everything Maestro does is done via the Model. It contains references to everything else: the database, BigQuery, external databases, and so on.

func New

func New(db db, domain string, bq *bq.BigQuery, gcs *gcs.GCS, gsh *gsheets.GSheets, gt *git.Git, slk *slack.Config) *Model

Create a new Model. Db is an implementation of the db interface, domain is the domain to which OAuth access will be restricted, and bq, gcs, gsh, gt and slk are instances of structs documented in the corresponding packages.

func (*Model) DeleteImportStatus

func (m *Model) DeleteImportStatus(id int64)

Delete import status for table id.

func (*Model) DryRunTable

func (m *Model) DryRunTable(t *Table, userId *int64) error

DryRunTable is a good way to check syntax. This API call does not create a job and returns immediately.
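
A minimal sketch, passing nil for userId since no particular user triggered the check; the function name is illustrative:

	import "fmt"

	// checkSyntax validates a table's SQL cheaply: a dry run creates no
	// BigQuery job and returns any syntax error immediately.
	func checkSyntax(m *model.Model, t *model.Table) error {
		if err := m.DryRunTable(t, nil); err != nil {
			return fmt.Errorf("syntax check of %s failed: %w", t.Name, err)
		}
		return nil
	}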

func (*Model) ExportTable

func (m *Model) ExportTable(t *Table) error

Export a BigQuery table to an external database. The export is done via GCS, which is much faster than the BigQuery paging API.

func (*Model) ExtractTableToGCS

func (m *Model) ExtractTableToGCS(t *Table, userId *int64, runId *int64) (string, error)

Extract a table to GCS. The userId is that of the user initiating the action and can be nil if this is part of a run. The runId can likewise be nil if this is not part of a run. Returns the BigQuery job id.

func (*Model) ExtractTableToSheet

func (m *Model) ExtractTableToSheet(t *Table) error

Extract a table to a Google Sheet. This extract uses the BigQuery paging API, so it only works well for small tables. If the spreadsheet already exists, the extract is prepended as the first sheet (tab), and trailing sheets beyond a certain number (configurable in the gsheets package) are deleted.

func (*Model) GetImportStatus

func (m *Model) GetImportStatus(id int64) importStatus

Return import status for table id.

func (*Model) ReimportTable

func (m *Model) ReimportTable(t *Table, userId *int64, runId *int64) (err error)

func (*Model) RunTable

func (m *Model) RunTable(t *Table, userId *int64) (jobId string, err error)

Run a table. userId identifies who initiated the run (not always the same as the table owner) and may be nil if the action was triggered by Maestro itself, e.g. a periodic run.
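
A sketch that dry-runs first and then submits the real job; names are illustrative:

	// runManually validates and then runs a table on behalf of a user.
	func runManually(m *model.Model, t *model.Table, userId int64) (string, error) {
		// Catch syntax errors cheaply before creating a real job.
		if err := m.DryRunTable(t, &userId); err != nil {
			return "", err
		}
		// Submit the BigQuery job; the returned jobId identifies it in BQ.
		return m.RunTable(t, &userId)
	}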

func (*Model) SetAllowedDomain

func (m *Model) SetAllowedDomain(domain string)

Set the domain to which OAuth is restricted.

func (*Model) SlackAlert

func (m *Model) SlackAlert(msg string) error

Generate a Slack alert (provided Slack is configured).

func (*Model) Stop

func (m *Model) Stop()

Gracefully stop Maestro, waiting for the things that can be waited for to finish. Database imports are not waited on; those transactions will be rolled back. BigQuery jobs are not waited on either; they carry on, and their status will be checked the next time Maestro starts.

func (*Model) TablesParents

func (m *Model) TablesParents(children []*Table) (map[string][]*Table, error)

Provide parents for every table in children. This is more efficient than extracting parents one table at a time when a whole list of tables is involved.
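
A sketch of iterating the result; the assumption that the map keys are table names is mine, it is not stated here:

	import "fmt"

	// printDependencies lists each table's parents.
	func printDependencies(m *model.Model, tables []*model.Table) error {
		parents, err := m.TablesParents(tables)
		if err != nil {
			return err
		}
		for name, ps := range parents {
			for _, p := range ps {
				fmt.Printf("%s depends on %s\n", name, p.Name)
			}
		}
		return nil
	}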

func (*Model) ValidUser

func (m *Model) ValidUser(oAuthId, email string) int64

Check whether the user is valid based on email. If the user does not exist, it is created as disabled and a Slack notification is sent. Email can be blank, in which case only the oAuthId is checked and the user cannot be created.

The very first user ever created (id == 1) is automatically made admin and not disabled.
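
A hedged sketch of a login check; the assumption that a zero id means no valid user is mine, suggested but not stated by the signature:

	import "net/http"

	// authorize is a hypothetical HTTP handler fragment.
	func authorize(m *model.Model, w http.ResponseWriter, oAuthId, email string) (int64, bool) {
		userId := m.ValidUser(oAuthId, email)
		if userId == 0 { // assumption: zero means no valid user
			http.Error(w, "forbidden", http.StatusForbidden)
			return 0, false
		}
		return userId, true
	}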

type Notification

type Notification struct {
	Id             int64
	TableId        int64     `db:"table_id"`         // Table
	BqJobId        int64     `db:"bq_job_id"`        // BigQuery job id of the GCS extract
	CreatedAt      time.Time `db:"created_at"`       // Time of notification
	DurationMs     int64     `db:"duration_ms"`      // Duration of the HTTP request/response
	Error          *string   `db:"error"`            // HTTP error, if any
	Url            string    `db:"url"`              // Target URL
	Method         string    `db:"method"`           // HTTP method used (always POST)
	Body           string    `db:"body"`             // Request body
	RespStatusCode int       `db:"resp_status_code"` // Response status code
	RespStatus     string    `db:"resp_status"`      // Response status string
	RespHeaders    string    `db:"resp_headers"`     // Response headers
	RespBody       string    `db:"resp_body"`        // Response body
}

Record of an HTTP notification, which optionally happens after a table's GCS extract completes.

type OAuthConf

type OAuthConf struct {
	ClientId          string `db:"client_id"`
	AllowedDomain     string `db:"allowed_domain"`
	Redirect          string `db:"redirect"`
	CryptSecret       string `db:"secret"`
	PlainSecret       string `db:"-"`
	CryptCookieSecret string `db:"cookie_secret"`
	PlainCookieSecret string `db:"-"`
}

Configuration necessary for Google OAuth.

type Run

type Run struct {
	Id         int64
	UserId     *int64     `db:"user_id"`     // Non-nil only if this is a UI-triggered run
	CreatedAt  time.Time  `db:"created_at"`  // Time the run was created
	StartTime  *time.Time `db:"start_time"`  // Time the run started (as opposed to created)
	EndTime    *time.Time `db:"end_time"`    // Time the run ended (even if in failure)
	FreqId     int64      `db:"freq_id"`     // The frequency associated with the run
	FreqName   string     `db:"freq_name"`   // This comes from freqs, so don't count on it being there
	TotalBytes int64      `db:"total_bytes"` // This comes from SUM() on bq_jobs, same warning as ^
	Error      *string    `db:"error"`       // Text of the error that failed the run (redundant - same error would exist in the table)
	// contains filtered or unexported fields
}

Run is a series of tables triggered by Maestro and associated with a Frequency.

A Run is created with NewRun() (which merely initializes the data structure), followed by Assemble(). Assemble selects all the tables associated with the Run's frequency, constructs a BQJob configuration for each table and inserts those into the database. This collection of BQJobs is the execution plan for this Run and is self-sufficient, reflecting the tables as they were at the time of Assemble. The tables can change afterwards without affecting an already assembled Run.

Run execution commences via Start(), which repeatedly calls processCycle; each cycle constructs a DAG of the jobs, traverses it and submits the BQ jobs that are ready to execute. At this point a BQJob has a BQJobId, which is the indicator that it has been submitted. If a job encounters errors, processCycle fails, and with it the entire Run, requiring manual intervention afterwards.

A failed Run can be resumed with Resume(). All Resume does is continue the cycling process, this time counting errors. If the count of errors exceeds the count at the beginning of the Run, the Run fails again.
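
Putting the lifecycle together, a minimal sketch of triggering a run for a frequency (error handling abbreviated, names illustrative):

	import "time"

	// triggerRun assembles and starts a run for the given frequency.
	func triggerRun(m *model.Model, freqId int64) error {
		r := model.NewRun(freqId, nil) // nil userId: system-triggered

		// Assemble writes the run and its BQJobs to the database,
		// snapshotting the tables as they are right now.
		if err := r.Assemble(m, time.Now()); err != nil {
			return err
		}

		// Start cycles through the DAG, submitting jobs as their
		// dependencies complete; it returns when the run finishes or fails.
		if err := r.Start(m); err != nil {
			// After fixing the offending table, r.Resume(m) would continue.
			return err
		}
		return nil
	}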

func NewRun

func NewRun(freqId int64, userId *int64) *Run

Return a new Run. (Does not write to the database).

func (*Run) Assemble

func (r *Run) Assemble(m *Model, now time.Time) error

Write the run to the database, and create and save all the jobs that belong to it, figuring out the parent/child relationships in the process.

func (*Run) Graph

func (r *Run) Graph(m *Model) (scheduler.Graph, error)

Construct and return the graph (DAG) of this run (for UI).

func (*Run) Resume

func (r *Run) Resume(m *Model) error

Resume a failed run. The run will continue *after* the failed table; presumably that table has since been fixed and run manually.

func (*Run) Start

func (r *Run) Start(m *Model) error

type SlackConf

type SlackConf struct {
	Url       string `db:"url"`        // Slack hook URL
	UserName  string `db:"username"`   // Username that will show in slack, e.g. "maestro"
	Channel   string `db:"channel"`    // The channel to which to post
	IconEmoji string `db:"iconemoji"`  // An emoji that will show next to user name, e.g. ":violin:"
	UrlPrefix string `db:"url_prefix"` // URL to this instance of Maestro (to make messages clickable)
}

Configuration necessary for Slack posts.

type Table

type Table struct {
	Id               int64
	UserId           int64                  // This user created the table
	GroupId          int64                  // Group to which this table belongs
	Email            string                 // from joined users table
	Name             string                 // Table name (without dataset)
	Dataset          string                 // from joined datasets table
	DatasetId        int64                  // The dataset
	Query            string                 // The SQL
	Disposition      string                 // e.g. WRITE_TRUNCATE or WRITE_APPEND
	Partitioned      bool                   // BigQuery DAY partitioning
	LegacySQL        bool                   // Standard SQL if false
	Description      string                 // Some descriptive text. TODO: make it searchable?
	Error            string                 // If the last run ended in error
	Running          bool                   // True if the table is currently running
	Extract          bool                   // Make a GCS extract
	NotifyExtractUrl string                 // The URL to notify upon extract completion
	SheetsExtract    bool                   // Make a sheets extract
	SheetId          string                 // The sheet id
	ExportDbId       int64                  // Export this table to this database
	ExportTableName  string                 // Use this as table name when exporting
	FreqId           int64                  // Run this table periodically
	Conditions       []*scheduler.Condition // Run only when this condition is satisfied
	ExternalTmout    int64                  // External table only: timeout
	ExternalFormat   string                 // CSV or NEWLINE_DELIMITED_JSON
	ImportDbId       int64                  // Import from this database
	ImportedAt       time.Time              // Last import time
	IdColumn         string                 // Column for incremental imports
	ReimportCond     []*scheduler.Condition // Reimport if this condition satisfied and imported_at does not satisfy it
	LastId           string                 // Last value of IdColumn for incremental imports

	CreatedAt      time.Time
	DeletedAt      time.Time
	LastOkRunEndAt time.Time // Used by pythonlib
	// contains filtered or unexported fields
}

All the information about a Table. Note the absence of "db" tags: this struct is stored in the database by way of an intermediate internal struct; see the db package.

func (*Table) ExternalLoad

func (t *Table) ExternalLoad(m *Model, fname string, userId *int64) (err error)

Load table from external data. fname is the GCS file name (without bucket), userId is the user triggering this action, if any.

func (*Table) GetBQInfo

func (t *Table) GetBQInfo(m *Model) (*bigquery.Table, error)

Get table information from the BigQuery API.

func (*Table) GetName

func (t *Table) GetName() string

Satisfy scheduler.Item, used when building the DAG.

func (*Table) GithubUrl

func (t *Table) GithubUrl(m *Model) string

Construct a valid GitHub URL to this particular table.

func (*Table) ImportDataset

func (t *Table) ImportDataset(m *Model) (string, error)

Return the dataset associated with the import database.

func (*Table) IsExternal

func (t *Table) IsExternal() bool

func (*Table) IsImport

func (t *Table) IsImport() bool

func (*Table) LastExtractJob

func (t *Table) LastExtractJob(m *Model) (*BQJob, error)

Return the latest extract BQJob. This is where the GCS URLs come from.
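
A sketch of fetching shareable links for the most recent extract:

	// extractLinks returns signed, no-auth-required download URLs for
	// the table's latest GCS extract.
	func extractLinks(m *model.Model, t *model.Table) ([]string, error) {
		job, err := t.LastExtractJob(m)
		if err != nil {
			return nil, err
		}
		return job.SignedExtractURLs(m)
	}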

func (*Table) QueueGitCommit

func (t *Table) QueueGitCommit(m *Model, email string) error

Queue up a git commit, which will eventually happen in a separate goroutine. The tables are stored in Git by marshaling the Table struct as YAML.

func (*Table) SignedUploadURL

func (t *Table) SignedUploadURL(m *Model) string

Create a signed PUT-only GCS URL (for external table).
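
A sketch of uploading external data through the signed URL with the standard library; note that a signed URL may require request headers (e.g. Content-Type) matching what was signed, which is an assumption here:

	import (
		"bytes"
		"fmt"
		"net/http"
	)

	// uploadExternal PUTs raw data to the table's signed GCS URL.
	func uploadExternal(m *model.Model, t *model.Table, data []byte) error {
		req, err := http.NewRequest(http.MethodPut, t.SignedUploadURL(m), bytes.NewReader(data))
		if err != nil {
			return err
		}
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			return err
		}
		defer resp.Body.Close()
		if resp.StatusCode != http.StatusOK {
			return fmt.Errorf("upload failed: %s", resp.Status)
		}
		return nil
	}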

type User

type User struct {
	Id         int64
	OAuthId    string `db:"oauth_id"`  // Id issued by OAuth
	Email      string `db:"email"`     // User email (also retrieved from OAuth)
	Disabled   bool   `db:"disabled"`  // If true, the user can never log in
	Admin      bool   `db:"admin"`     // Admins can do anything
	CryptToken string `db:"api_token"` // The token for pythonlib (or other direct API access)
	PlainToken []byte `db:"-"`         //
}

User information.

type UserWithGroups

type UserWithGroups struct {
	User
	Groups GroupArray `db:"groups"` // Groups this user is in (comes from user_groups table join)
}

User information with groups.

func (*UserWithGroups) GenerateAndSaveToken

func (u *UserWithGroups) GenerateAndSaveToken(m *Model) error

Generate and save an API access token. Such a token allows access to some Maestro APIs without other authentication; the generated token should be kept secure.
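
A sketch; the assumption that the plaintext ends up in PlainToken (with only the encrypted form persisted in CryptToken) is mine, suggested by the User field comments:

	// issueToken generates a fresh API token and returns its plaintext.
	func issueToken(m *model.Model, u *model.UserWithGroups) (string, error) {
		if err := u.GenerateAndSaveToken(m); err != nil {
			return "", err
		}
		return string(u.PlainToken), nil // assumption: plaintext left here
	}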
