Documentation ¶
Overview ¶
Package model is the core of Maestro. It interfaces with all the other packages, and a *Model (conventionally named m) is frequently passed between them.
The model also defines core structures, many of which are persisted in the Maestro database.
Index ¶
- Constants
- type BQConf
- type BQJob
- type Dataset
- type Db
- type Freq
- type GitConf
- type Group
- type GroupArray
- type Model
- func (m *Model) DeleteImportStatus(id int64)
- func (m *Model) DryRunTable(t *Table, userId *int64) error
- func (m *Model) ExportTable(t *Table) error
- func (m *Model) ExtractTableToGCS(t *Table, userId *int64, runId *int64) (string, error)
- func (m *Model) ExtractTableToSheet(t *Table) error
- func (m *Model) GetImportStatus(id int64) importStatus
- func (m *Model) ReimportTable(t *Table, userId *int64, runId *int64) (err error)
- func (m *Model) RunTable(t *Table, userId *int64) (jobId string, err error)
- func (m *Model) SetAllowedDomain(domain string)
- func (m *Model) SlackAlert(msg string) error
- func (m *Model) Stop()
- func (m *Model) TablesParents(children []*Table) (map[string][]*Table, error)
- func (m *Model) ValidUser(oAuthId, email string) int64
- type Notification
- type OAuthConf
- type Run
- type SlackConf
- type Table
- func (t *Table) ExternalLoad(m *Model, fname string, userId *int64) (err error)
- func (t *Table) GetBQInfo(m *Model) (*bigquery.Table, error)
- func (t *Table) GetName() string
- func (t *Table) GithubUrl(m *Model) string
- func (t *Table) ImportDataset(m *Model) (string, error)
- func (t *Table) IsExternal() bool
- func (t *Table) IsImport() bool
- func (t *Table) LastExtractJob(m *Model) (*BQJob, error)
- func (t *Table) QueueGitCommit(m *Model, email string) error
- func (t *Table) SignedUploadURL(m *Model) string
- type User
- type UserWithGroups
Constants ¶
const (
	ImpNone importStatus = iota // 0
	ImpQueued
	ImpRunning
	ImpDone
	ImpError
)
Import status - phases of the database import.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BQConf ¶
type BQConf struct {
	ProjectId string `db:"project_id"`     // Project Id
	Email     string `db:"email"`          // Email (authentication)
	KeyId     string `db:"private_key_id"` // Key (authentication)
	CryptKey  string `db:"key"`            // Encrypted Key
	GcsBucket string `db:"gcs_bucket"`     // GCS bucket to be used for exports
	PlainKey  string `db:"-"`              // Plain Key (not stored in db)
}
Configuration necessary to connect to BigQuery.
type BQJob ¶
type BQJob struct {
	Id                  int64      `db:"id"`
	CreatedAt           time.Time  `db:"created_at"`
	TableId             int64      `db:"table_id"`         // Corresponding table
	UserId              *int64     `db:"user_id"`          // User who triggered the job or nil (if it was system-triggered)
	RunId               *int64     `db:"run_id"`           // Run in which this job ran or nil
	Parents             *string    `db:"parents"`          // List of parents for this table
	BQJobId             string     `db:"bq_job_id"`        // Id returned by BigQuery
	Type                string     `db:"type"`             // Type of job: query, extract, load
	Configuration       *string    `db:"configuration"`    // BigQuery Job configuration (as a JSON string)
	Status              *string    `db:"status"`           // BigQuery Job status (as a JSON string)
	QueryStats          *string    `db:"query_stats"`      // Excerpt from Statistics returned by BQ (JSON)
	LoadStats           *string    `db:"load_stats"`       // Excerpt from Statistics returned by BQ (JSON)
	ExtractStats        *string    `db:"extract_stats"`    // Excerpt from Statistics returned by BQ (JSON)
	DestinationUrls     *string    `db:"destination_urls"` // For GCS extracts
	CreationTime        *time.Time `db:"creation_time"`    // BQ-specific
	StartTime           *time.Time `db:"start_time"`       // BQ-specific
	EndTime             *time.Time `db:"end_time"`         // BQ-specific
	TotalBytesProcessed int64      `db:"total_bytes_processed"`
	TotalBytesBilled    int64      `db:"total_bytes_billed"`
	ImportBegin         *time.Time `db:"import_begin"`
	ImportEnd           *time.Time `db:"import_end"`
	ImportBytes         int64      `db:"import_bytes"`
	ImportRows          int64      `db:"import_rows"`
	// contains filtered or unexported fields
}
Representation of a BigQuery job. BQJob may seem to replicate much of the data that exists in Table, but this is intentional: the table may change after the job, and it is important to preserve the table's information as it was at the moment of job creation.
A BQJob is created before submitting the job to BigQuery. Between the time it is created and submitted to BQ (which can be a while during a run, where all jobs are created upfront) it has no BQJobId. Occasionally BQ responds with a "Retrying may solve the problem" error; when this happens the submission is retried, which changes the BQJobId (the old one is forgotten, though it can be seen in the logs).
BQJobs are essential to Runs, because Runs are initially constructed as a collection of BQJobs, which are then submitted in the correct order based on dependencies and priorities.
func (*BQJob) SignedExtractURLs ¶
Generate GCS URLs that are signed (i.e. require no authentication).
func (*BQJob) TableReference ¶
func (j *BQJob) TableReference() (*bigquery.TableReference, error)
Return a BQ TableReference.
type Db ¶
type Db struct {
	Id          int64  `db:"id"`
	Name        string `db:"name"`        // Symbolic name
	DatasetId   int64  `db:"dataset_id"`  // Data will be imported into this dataset
	Dataset     string `db:"dataset"`     // (from datasets table)
	Driver      string `db:"driver"`      // postgres or mysql
	ConnectStr  string `db:"connect_str"` // Connect string
	CryptSecret string `db:"secret"`      // Encrypted secret
	Export      bool   `db:"export"`      // Is this an export database?
}
Configuration necessary to connect to import/export databases.
type Freq ¶
type Freq struct {
	Id     int64
	Name   string        // E.g. "daily", "hourly", "15-minute"
	Period time.Duration // Periods are aligned on zero time
	Offset time.Duration // Offset into the period
	Active bool          // If false, this frequency will not run
}
Frequency (cadence) of periodic runs.
type GitConf ¶
type GitConf struct {
	Url        string `db:"url"`   // Repo URL (https://github.com/org/repo)
	CryptToken string `db:"token"` // GitHub token (encrypted)
	PlainToken string `db:"-"`     // Decrypted token
}
Configuration necessary for Git access.
type Group ¶
type Group struct {
	Id          int64
	Name        string `db:"name"`          // Symbolic name
	AdminUserId int64  `db:"admin_user_id"` // Admin of this group
}
Group information.
type GroupArray ¶
type GroupArray []*Group
This named type allows us to use StructScan with User.
func (*GroupArray) Scan ¶
func (g *GroupArray) Scan(src interface{}) error
type Model ¶
type Model struct {
// contains filtered or unexported fields
}
Everything Maestro does is done via the Model. It contains references to all other components, such as the database, BigQuery, external databases, etc.
func New ¶
func New(db db, domain string, bq *bq.BigQuery, gcs *gcs.GCS, gsh *gsheets.GSheets, gt *git.Git, slk *slack.Config) *Model
Create a new Model. db is an implementation of the db interface, domain is the domain to which OAuth access will be restricted, and bq, gcs, gsh, gt and slk are instances of structs documented in the corresponding packages.
func (*Model) DeleteImportStatus ¶
Delete import status for table id.
func (*Model) DryRunTable ¶
DryRunTable is a good way to check syntax. This API call does not create a job and returns immediately.
func (*Model) ExportTable ¶
Export a BigQuery table to an external database. The export is done via GCS, which is much faster than the BigQuery paging API.
func (*Model) ExtractTableToGCS ¶
Extract table to GCS. The userId is that of the user initiating the action and can be nil if this is part of a run. The runId can also be nil if this is not part of a run. Returns the BigQuery job id.
func (*Model) ExtractTableToSheet ¶
Extract table to a Google Sheet. This extract uses the BigQuery paging API, so it only works well for small tables. If the spreadsheet already exists, the extract is prepended as the first sheet (tab), and trailing sheets beyond a certain number (configurable in the gsheets package) are deleted.
func (*Model) GetImportStatus ¶
Return import status for table id.
func (*Model) ReimportTable ¶
func (*Model) RunTable ¶
Run a table. userId identifies who initiated the run (not always the same as the table owner). userId may be nil if the action was triggered by Maestro, e.g. a periodic run.
func (*Model) SetAllowedDomain ¶
Set the domain to which OAuth is restricted.
func (*Model) SlackAlert ¶
Generate a Slack alert (provided Slack is configured).
func (*Model) Stop ¶
func (m *Model) Stop()
Gracefully stop Maestro, waiting for the things that can be waited on to finish. Database imports are not waited on; those transactions will be rolled back. We also do not wait for any BigQuery jobs; they carry on, and their status will be checked when Maestro is next started.
func (*Model) TablesParents ¶
Provide parents for every child in the children slice. This is more efficient when parents are needed for a whole list of tables.
func (*Model) ValidUser ¶
Check whether the user is valid based on email. If the user does not exist, it is created as disabled and a Slack notification is sent. Email can be blank, in which case only the oAuthId is checked and the user cannot be created.
The very first user ever created (id == 1) is automatically made admin and not disabled.
type Notification ¶
type Notification struct {
	Id             int64
	TableId        int64     `db:"table_id"`         // Table
	BqJobId        int64     `db:"bq_job_id"`        // BigQuery job id of the GCS extract
	CreatedAt      time.Time `db:"created_at"`       // Time of notification
	DurationMs     int64     `db:"duration_ms"`      // Duration of the HTTP request/response
	Error          *string   `db:"error"`            // HTTP error, if any
	Url            string    `db:"url"`              // Target URL
	Method         string    `db:"method"`           // HTTP method used (always POST)
	Body           string    `db:"body"`             // Request body
	RespStatusCode int       `db:"resp_status_code"` // Response status code
	RespStatus     string    `db:"resp_status"`      // Response status string
	RespHeaders    string    `db:"resp_headers"`     // Response headers
	RespBody       string    `db:"resp_body"`        // Response body
}
Record of the HTTP notification that can optionally be sent after a table's GCS extract is complete.
type OAuthConf ¶
type OAuthConf struct {
	ClientId          string `db:"client_id"`
	AllowedDomain     string `db:"allowed_domain"`
	Redirect          string `db:"redirect"`
	CryptSecret       string `db:"secret"`
	PlainSecret       string `db:"-"`
	CryptCookieSecret string `db:"cookie_secret"`
	PlainCookieSecret string `db:"-"`
}
Configuration necessary for Google OAuth.
type Run ¶
type Run struct {
	Id         int64
	UserId     *int64     `db:"user_id"`     // Non-nil only if this is a UI-triggered run
	CreatedAt  time.Time  `db:"created_at"`  // Time the run was created
	StartTime  *time.Time `db:"start_time"`  // Time the run started (as opposed to created)
	EndTime    *time.Time `db:"end_time"`    // Time the run ended (even if in failure)
	FreqId     int64      `db:"freq_id"`     // The frequency associated with the run
	FreqName   string     `db:"freq_name"`   // This comes from freqs, so don't count on it being there
	TotalBytes int64      `db:"total_bytes"` // This comes from SUM() on bq_jobs, same warning as above
	Error      *string    `db:"error"`       // Text of the error that failed the run (redundant - same error would exist in the table)
	// contains filtered or unexported fields
}
Run is a series of tables triggered by Maestro and associated with a Frequency.
A Run is created with NewRun() (which merely initializes the data structure), followed by Assemble(). Assemble selects all the tables associated with the Run's frequency, constructs BQJob configurations for each table, and inserts them into the database. This collection of BQJobs is the execution plan for the Run and is self-sufficient, reflecting the tables as they were at the time of Assemble. The tables can change afterwards without affecting an already assembled Run.
Run execution is commenced via Start(), which repeatedly calls processCycle. Each cycle constructs a DAG of the jobs, traverses it, and submits the BQ jobs that are ready to execute. At this point a BQJob will have a BQJobId, which is the indicator that it has been submitted. If a job encounters errors, processCycle fails, and with it the entire Run, requiring manual intervention.
A failed Run can be resumed with Resume(). All Resume does is continue the cycling process, but this time counting errors. If the error count exceeds the count at the beginning of the Run, the Run fails again.
func (*Run) Assemble ¶
Write the run to the database, create and save all the jobs that belong to it, and figure out the parent/child relationships in the process.
type SlackConf ¶
type SlackConf struct {
	Url       string `db:"url"`        // Slack hook URL
	UserName  string `db:"username"`   // Username that will show in slack, e.g. "maestro"
	Channel   string `db:"channel"`    // The channel to which to post
	IconEmoji string `db:"iconemoji"`  // An emoji that will show next to user name, e.g. ":violin:"
	UrlPrefix string `db:"url_prefix"` // URL to this instance of Maestro (to make messages clickable)
}
Configuration necessary for Slack posts.
type Table ¶
type Table struct {
	Id               int64
	UserId           int64                  // This user created the table
	GroupId          int64                  // Group to which this table belongs
	Email            string                 // from joined users table
	Name             string                 // Table name (without dataset)
	Dataset          string                 // from joined datasets table
	DatasetId        int64                  // The dataset
	Query            string                 // The SQL
	Disposition      string                 // e.g. WRITE_TRUNCATE or WRITE_APPEND
	Partitioned      bool                   // BigQuery DAY partitioning
	LegacySQL        bool                   // Standard SQL if false
	Description      string                 // Some descriptive text. TODO: make it searchable?
	Error            string                 // If the last run ended in error
	Running          bool                   // True if the table is currently running
	Extract          bool                   // Make a GCS extract
	NotifyExtractUrl string                 // The URL to notify upon extract completion
	SheetsExtract    bool                   // Make a sheets extract
	SheetId          string                 // The sheet id
	ExportDbId       int64                  // Export this table to this database
	ExportTableName  string                 // Use this as table name when exporting
	FreqId           int64                  // Run this table periodically
	Conditions       []*scheduler.Condition // Run only when this condition is satisfied
	ExternalTmout    int64                  // External table only: timeout
	ExternalFormat   string                 // CSV or NEWLINE_DELIMITED_JSON
	ImportDbId       int64                  // Import from this database
	ImportedAt       time.Time              // Last import time
	IdColumn         string                 // Column for incremental imports
	ReimportCond     []*scheduler.Condition // Reimport if this condition is satisfied and imported_at does not satisfy it
	LastId           string                 // Last value of IdColumn for incremental imports
	CreatedAt        time.Time
	DeletedAt        time.Time
	LastOkRunEndAt   time.Time              // Used by pythonlib
	// contains filtered or unexported fields
}
All the information about a Table. Note the absence of "db" tags; this struct is stored in the database by way of an intermediate internal struct, see the db package.
func (*Table) ExternalLoad ¶
Load table from external data. fname is the GCS file name (without bucket), userId is the user triggering this action, if any.
func (*Table) ImportDataset ¶
Return the dataset associated with the import database.
func (*Table) IsExternal ¶
func (*Table) LastExtractJob ¶
Return the latest extract BQJob. This is where the GCS URLs come from.
func (*Table) QueueGitCommit ¶
Queue up a git commit, which will eventually happen in a separate goroutine. Tables are stored in Git by marshaling the Table struct as YAML.
func (*Table) SignedUploadURL ¶
Create a signed PUT-only GCS URL (for external table).
type User ¶
type User struct {
	Id         int64
	OAuthId    string `db:"oauth_id"`  // Id issued by OAuth
	Email      string `db:"email"`     // User email (also retrieved from OAuth)
	Disabled   bool   `db:"disabled"`  // If true, the user can never log in
	Admin      bool   `db:"admin"`     // Admins can do anything
	CryptToken string `db:"api_token"` // The token for pythonlib (or other direct API access)
	PlainToken []byte `db:"-"`
}
User information.
type UserWithGroups ¶
type UserWithGroups struct {
	User
	Groups GroupArray `db:"groups"` // Groups this user is in (comes from user_groups table join)
}
User information with groups.
func (*UserWithGroups) GenerateAndSaveToken ¶
func (u *UserWithGroups) GenerateAndSaveToken(m *Model) error
Generate and save an API access token. Such a token allows access to some Maestro APIs without other authentication; the generated token should be kept secure.