Documentation ¶
Index ¶
- Constants
- Variables
- func BackticksWrapper() func(text string) string
- func DoubleQuotesWrapper() func(text string) string
- func DumpTables(db *sql.DB, tables ...*Table) (string, error)
- func Identity(elem interface{}) (interface{}, error)
- func PostgresParameterPlaceholder(parameterIndex int) string
- func QuestionMarkPlaceholder(_ int) string
- func SingleQuotesWrapper() func(text string) string
- func ValidateMatchesExisting(existing *pf.MaterializationSpec_Binding, proposed *pf.CollectionSpec) map[string]*pm.Constraint
- func ValidateNewSQLProjections(proposed *pf.CollectionSpec, deltaUpdates bool) map[string]*pm.Constraint
- func ValidateSelectedFields(constraints map[string]*pm.Constraint, ...) error
- type Column
- type ColumnType
- type ColumnTypeMapper
- type CommentConfig
- type CommentRenderer
- type ConstColumnType
- type Driver
- func (d *Driver) ApplyDelete(ctx context.Context, req *pm.ApplyRequest) (*pm.ApplyResponse, error)
- func (d *Driver) ApplyUpsert(ctx context.Context, req *pm.ApplyRequest) (*pm.ApplyResponse, error)
- func (d *Driver) Spec(ctx context.Context, req *pm.SpecRequest) (*pm.SpecResponse, error)
- func (d *Driver) Transactions(stream pm.Driver_TransactionsServer) error
- func (d *Driver) Validate(ctx context.Context, req *pm.ValidateRequest) (*pm.ValidateResponse, error)
- type Endpoint
- type ExecFn
- type Fence
- type FlowTables
- type Generator
- func (gen *Generator) InsertStatement(table *Table) (string, ParametersConverter, error)
- func (gen *Generator) QueryOnPrimaryKey(table *Table, selectColumns ...string) (string, ParametersConverter, error)
- func (gen *Generator) UpdateStatement(table *Table, setColumns []string, whereColumns []string) (string, ParametersConverter, error)
- type LengthConstrainedColumnType
- type MaxLengthableColumnType
- type NullableTypeMapping
- type ParametersConverter
- type Renderer
- type ResolvedColumnType
- type Resource
- type ResourcePath
- type StdEndpoint
- func (e *StdEndpoint) Config() interface{}
- func (e *StdEndpoint) CreateTableStatement(table *Table) (string, error)
- func (e *StdEndpoint) DB() *sql.DB
- func (e *StdEndpoint) ExecuteStatements(ctx context.Context, statements []string) error
- func (e *StdEndpoint) FlowTables() *FlowTables
- func (e *StdEndpoint) Generator() *Generator
- func (e *StdEndpoint) LoadSpec(ctx context.Context, materialization pf.Materialization) (version string, _ *pf.MaterializationSpec, _ error)
- func (e *StdEndpoint) NewFence(ctx context.Context, materialization pf.Materialization, ...) (Fence, error)
- type StdFence
- type StringTypeInfo
- type StringTypeMapping
- type Table
- type TokenPair
- type TypeMapper
Constants ¶
const ( // DefaultFlowCheckpoints is the default table for checkpoints. DefaultFlowCheckpoints = "flow_checkpoints_v1" // DefaultFlowMaterializations is the default table for materialization specs. DefaultFlowMaterializations = "flow_materializations_v2" )
const TypeLengthPlaceholder = "?"
TypeLengthPlaceholder is the placeholder string that may appear in the SQL string, which will be replaced by the MaxLength of the string.
Variables ¶
var ( // DefaultUnwrappedIdentifiers is a SkipWrapper function that checks for identifiers that 1 // typically do not need wrapping. DefaultUnwrappedIdentifiers = regexp.MustCompile(`^[_\pL]+[_\pL\pN]*$`).MatchString // DefaultQuoteSanitizer used for sanitizing fields in SQL. DefaultQuoteSanitizer = strings.NewReplacer("'", "''").Replace )
Functions ¶
func BackticksWrapper ¶
Backticks returns a wrapper function with a single backtick character on the both the Left and the Right.
func DoubleQuotesWrapper ¶
DoubleQuotes returns a wrapper function with a single double quote character on the both the Left and the Right.
func DumpTables ¶
DumpTables is a convenience for testing which dumps the contents of the given tables into a debug string suitable for snapshotting.
func Identity ¶
func Identity(elem interface{}) (interface{}, error)
Identity is an identity function for no-op conversions of tuple elements to `interface{}` values that are suitable for use as sql parameters.
func PostgresParameterPlaceholder ¶
PostgresParameterPlaceholder returns $N style parameters where N is the parameter number starting at 1.
func QuestionMarkPlaceholder ¶
QuestionMarkPlaceholder returns the constant string "?".
func SingleQuotesWrapper ¶
SingleQuotesWrapper returns a wrapper function with one single quote character on the both the Left and the Right.
func ValidateMatchesExisting ¶
func ValidateMatchesExisting(existing *pf.MaterializationSpec_Binding, proposed *pf.CollectionSpec) map[string]*pm.Constraint
ValidateMatchesExisting returns a set of constraints to use when there is a new proposed CollectionSpec for a materialization that is already running, or has been Applied. The returned constraints will explicitly require all fields that are currently materialized, as long as they are not unsatisfiable, and forbid any fields that are not currently materialized.
func ValidateNewSQLProjections ¶
func ValidateNewSQLProjections(proposed *pf.CollectionSpec, deltaUpdates bool) map[string]*pm.Constraint
ValidateNewSQLProjections returns a set of constraints for a proposed flow collection for a **new** materialization (one that is not running and has never been Applied). Note that this will "recommend" all projections of single scalar types, which is what drives the default field selection in flowctl.
func ValidateSelectedFields ¶
func ValidateSelectedFields(constraints map[string]*pm.Constraint, proposed *pf.MaterializationSpec_Binding) error
ValidateSelectedFields validates a proposed MaterializationSpec against a set of constraints. If any constraints would be violated, then an error is returned.
Types ¶
type Column ¶
type Column struct { // The Name of the column, which is used when referencing columns by name when calling SQL // generation functions. This value should not include any quotes or escape characters. Name string // Identifier is the final form of the column name, exactly as it should be represented in SQL // statements. If quoting is necessary, then the quotes must be included here. Identifier string // Comment is optional text that will be used only on CREATE TABLE statements Comment string // PrimaryKey is true if this column is the primary key, or if it is part of a composite key. PrimaryKey bool // Type is the application type of the data. This corresponds closely to JSON types, but // includes "binary" and excludes "null". Unlike Flow Projections, a Column may only have a // single type, and nullability is represented as a separate boolean rather than a type itself. Type ColumnType // StringType is optional additional type information for strings. StringType *StringTypeInfo // NotNull is true if the database columns should disallow null values. NotNull bool }
Column describes a SQL table column that will hold JSON values
func ColumnForProjection ¶
func ColumnForProjection(projection *pf.Projection, identifierRenderer *Renderer) Column
ColumnForProjection returns a Column that is appropriate for storing values from the given Projection.
type ColumnType ¶
type ColumnType string
ColumnType represents a minimal set of database-agnostic types that we may try to store and query. This set of types is slightly different than the set of JSON types. This has a "binary" type for dealing with byte slices, and there is no "null" type, since nullability is modeled separately.
const ( STRING ColumnType = "string" BOOLEAN ColumnType = "boolean" INTEGER ColumnType = "integer" NUMBER ColumnType = "number" OBJECT ColumnType = "object" ARRAY ColumnType = "array" BINARY ColumnType = "binary" )
ColumnType constants that are used by ColumnTypeMapper
type ColumnTypeMapper ¶
type ColumnTypeMapper map[ColumnType]TypeMapper
ColumnTypeMapper selects a specific TypeMapper based on the type of the data that will be passed to as a parameter for inserts or updates to the column.
func (ColumnTypeMapper) GetColumnType ¶
func (amap ColumnTypeMapper) GetColumnType(col *Column) (*ResolvedColumnType, error)
GetColumnType implements the TypeMapper interface
type CommentConfig ¶
type CommentConfig struct { // Linewise determines whether to render line or block comments. If it is true, then each line // of comment text will be wrapped separately. If false, then the entire multi-line block of // comment text will be wrapped once. Linewise bool // Wrap holds the strings that will bound the beginning and end of the comment. Wrap TokenPair }
CommentConfig determines how SQL comments are rendered.
func LineComment ¶
func LineComment() CommentConfig
LineComment returns a CommentConfig configured for standard sql line comments that begins each line with a double dash ("-- ")
type CommentRenderer ¶
type CommentRenderer struct { // Linewise determines whether to render line or block comments. If it is true, then each line // of comment text will be wrapped separately. If false, then the entire multi-line block of // comment text will be wrapped once. Linewise bool // Wrap holds the strings that will bound the beginning and end of the comment. Wrap *TokenPair }
CommentRenderer is used to render comments in SQL.
func LineCommentRenderer ¶
func LineCommentRenderer() *CommentRenderer
LineCommentRenderer returns a per line comment valid for standard SQL.
func (*CommentRenderer) Render ¶
func (cr *CommentRenderer) Render(text string) string
Render takes a string and renders it as a comment based on it's configuration.
type ConstColumnType ¶
type ConstColumnType ResolvedColumnType
ConstColumnType is a ResolvedColumnType that is known statically at compile time.
func RawConstColumnType ¶
func RawConstColumnType(sql string) ConstColumnType
RawConstColumnType returns a ConstColumnType that always uses the given sql string as DDL and performs a no-op value conversion.
func (ConstColumnType) GetColumnType ¶
func (c ConstColumnType) GetColumnType(col *Column) (*ResolvedColumnType, error)
GetColumnType implements the TypeMapper interface
type Driver ¶
type Driver struct { // URL at which documentation for the driver may be found. DocumentationURL string // Instance of the type into which endpoint specifications are parsed. EndpointSpecType interface{} // Instance of the type into which resource specifications are parsed. ResourceSpecType Resource // NewEndpoint returns an Endpoint, which will be used to handle interactions with the database. NewEndpoint func(context.Context, json.RawMessage) (Endpoint, error) // NewResource returns an uninitialized Resource which may be parsed into. NewResource func(ep Endpoint) Resource // NewTransactor returns a Transactor ready for pm.RunTransactions. NewTransactor func(context.Context, Endpoint, *pf.MaterializationSpec, Fence, []Resource) (pm.Transactor, error) }
Driver implements the pm.DriverServer interface.
func (*Driver) ApplyDelete ¶
func (d *Driver) ApplyDelete(ctx context.Context, req *pm.ApplyRequest) (*pm.ApplyResponse, error)
ApplyDelete implements the DriverServer interface.
func (*Driver) ApplyUpsert ¶
func (d *Driver) ApplyUpsert(ctx context.Context, req *pm.ApplyRequest) (*pm.ApplyResponse, error)
ApplyUpsert implements the DriverServer interface.
func (*Driver) Spec ¶
func (d *Driver) Spec(ctx context.Context, req *pm.SpecRequest) (*pm.SpecResponse, error)
Spec implements the DriverServer interface.
func (*Driver) Transactions ¶
func (d *Driver) Transactions(stream pm.Driver_TransactionsServer) error
Transactions implements the DriverServer interface.
func (*Driver) Validate ¶
func (d *Driver) Validate(ctx context.Context, req *pm.ValidateRequest) (*pm.ValidateResponse, error)
Validate implements the DriverServer interface.
type Endpoint ¶
type Endpoint interface { // LoadSpec loads the named MaterializationSpec and its version that's stored within the Endpoint, if any. LoadSpec(ctx context.Context, materialization pf.Materialization) (string, *pf.MaterializationSpec, error) // CreateTableStatement returns the SQL statement to create the specified table in the correct dialect. CreateTableStatement(table *Table) (string, error) // ExecuteStatements takes a slice of SQL statements and executes them as a single transaction // (or as multiple transactions if it's not possible for the implementation) and rolls back // if there is a failure. ExecuteStatements(ctx context.Context, statements []string) error // NewFence installs and returns a new endpoint specific Fence implementation. On return, all // older endpoints with matching materialization name and overlapping key-range will be // blocked from further database operations. This prevents rogue endpoints from committing // further transactions. NewFence(ctx context.Context, materialization pf.Materialization, keyBegin, keyEnd uint32) (Fence, error) // Generator returns the dialect specific SQL generator for the endpoint. Generator() *Generator // FlowTables returns the FlowTables definitions for this endpoint. FlowTables() *FlowTables }
Endpoint is an sql compatible endpoint that allows dialect specific tasks and generators.
type ExecFn ¶
type ExecFn func(ctx context.Context, sql string, arguments ...interface{}) (rowsAffected int64, _ error)
ExecFn executes a |sql| statement with |arguments|, and returns the number of rows affected.
type Fence ¶
type Fence interface { // Fetch the current checkpoint. Checkpoint() []byte // SetCheckpoint sets the current checkpoint. SetCheckpoint(checkpoint []byte) // LogEntry returns a logger Entry with context of the current fence to differentiate // concurrent threads in the logs. LogEntry() *logrus.Entry }
Fence is an installed barrier in a shared checkpoints table which prevents other sessions from committing transactions under the fenced ID -- and prevents this Fence from committing where another session has in turn fenced this instance off.
type FlowTables ¶
type FlowTables struct { Checkpoints *Table // Table of Flow checkpoints. Specs *Table // Table of MaterializationSpecs. }
FlowTables is the table specifications for Flow.
func DefaultFlowTables ¶
func DefaultFlowTables(prefix string) FlowTables
DefaultFlowTables returns the default Flow *Table configurations and names with optional prefix. The prefix can be used to prepend pre-table identifiers such as schema names.
type Generator ¶
type Generator struct { Placeholder func(int) string CommentRenderer *CommentRenderer IdentifierRenderer *Renderer ValueRenderer *Renderer TypeMappings TypeMapper }
Generator generates SQL for a large variety of SQL dialects using various configuration parameters.
func PostgresSQLGenerator ¶
func PostgresSQLGenerator() Generator
PostgresSQLGenerator returns a SQLGenerator for the postgresql SQL dialect.
func SQLiteSQLGenerator ¶
func SQLiteSQLGenerator() Generator
SQLiteSQLGenerator returns a SQLGenerator for the sqlite SQL dialect.
func (*Generator) InsertStatement ¶
func (gen *Generator) InsertStatement(table *Table) (string, ParametersConverter, error)
InsertStatement returns an insert statement for the given table that includes all columns. The returned sql will have a parameter placeholder for every column in the order they appear in the Table. This should generate a plain insert statement, not an upsert, since we'll know in advance whether each document exists or not, and only use the InsertStatement when we know the document does not exist.
func (*Generator) QueryOnPrimaryKey ¶
func (gen *Generator) QueryOnPrimaryKey(table *Table, selectColumns ...string) (string, ParametersConverter, error)
QueryOnPrimaryKey generates a query that has a placeholder parameter for each primary key in the order given in the table. Only selectColumns will be selected in the same order as provided.
func (*Generator) UpdateStatement ¶
func (gen *Generator) UpdateStatement(table *Table, setColumns []string, whereColumns []string) (string, ParametersConverter, error)
UpdateStatement returns an update statement for the given table that sets the columns given in setColumns and matches based on the columns in whereColumns. The returned statement will have a placeholder parameter for each of the setColumns in the order given, followed by a parameter for each of the whereColumns in the order given.
type LengthConstrainedColumnType ¶
type LengthConstrainedColumnType ResolvedColumnType
LengthConstrainedColumnType is a TypeMapper that must always have a length argument, e.g. "VARCHAR(42)"
func (LengthConstrainedColumnType) GetColumnType ¶
func (c LengthConstrainedColumnType) GetColumnType(col *Column) (*ResolvedColumnType, error)
GetColumnType implements the TypeMapper interface
type MaxLengthableColumnType ¶
type MaxLengthableColumnType struct { WithoutLength *ConstColumnType WithLength *LengthConstrainedColumnType }
MaxLengthableColumnType is a TypeMapper that supports column types that may have a length argument (e.g. "VARCHAR(76)").
func (MaxLengthableColumnType) GetColumnType ¶
func (c MaxLengthableColumnType) GetColumnType(col *Column) (*ResolvedColumnType, error)
GetColumnType implements the TypeMapper interface
type NullableTypeMapping ¶
type NullableTypeMapping struct { NotNullText string NullableText string Inner TypeMapper }
NullableTypeMapping wraps a TypeMapper to add "NULL" and/or "NOT NULL" to the generated SQL type depending on the nullability of the column. Most databases will assume that a column may contain null as long as it isn't declared with a NOT NULL constraint, but some databases (e.g. ms sql server) make that behavior configurable, requiring the DDL to explicitly declare a column with NULL if it may contain null values. This wrapper will handle either or both cases.
func (NullableTypeMapping) GetColumnType ¶
func (mapper NullableTypeMapping) GetColumnType(col *Column) (*ResolvedColumnType, error)
GetColumnType implements the TypeMapper interface
type ParametersConverter ¶
type ParametersConverter []func(interface{}) (interface{}, error)
ParametersConverter is a slice of functions that can be used to convert a Tuple into an []interface{} that can be passed to the database driver. This conversion may be different depending on the specific driver, so instances of ParametersConverter should be obtained from the Generator when generating a sql statement.
type Renderer ¶
type Renderer struct {
// contains filtered or unexported fields
}
Renderer is used for naming things inside of SQL. It can be used for fields or values to handle sanitization and quoting.
func NewRenderer ¶
func NewRenderer(sanitizer func(string) string, wrapper func(string) string, skipWrapper func(string) bool) *Renderer
NewRenderer returns a configured renderer instance.
func (*Renderer) Sanitize ¶
Sanitize uses a SanitizerFunc or returns the original string if it's nil.
type ResolvedColumnType ¶
type ResolvedColumnType struct { SQLType string ValueConverter func(interface{}) (interface{}, error) }
ResolvedColumnType represents the result of successfully mapping a Column to SQL DDL and a function that can be used to convert a Tuple element into a type that is appropriate for the driver.
type Resource ¶
type Resource interface { // Validate returns an error if the Resource is malformed. Validate() error // Path returns the fully qualified name of the resource, as '.'-separated components. Path() ResourcePath // DeltaUpdates is true if the resource should be materialized using delta updates. DeltaUpdates() bool }
Resource is a driver-provided type which represents the SQL resource (for example, a table) bound to by a binding.
type ResourcePath ¶
type ResourcePath []string
ResourcePath is '.'-separated path components of a fully qualified database resource.
func (ResourcePath) Join ¶
func (p ResourcePath) Join() string
Join the ResourcePath into a '.'-separated string.
type StdEndpoint ¶
type StdEndpoint struct {
// contains filtered or unexported fields
}
StdEndpoint is the *database/sql.DB standard implementation of an endpoint.
func NewStdEndpoint ¶
func NewStdEndpoint(config interface{}, db *sql.DB, generator Generator, flowTables FlowTables) *StdEndpoint
NewStdEndpoint composes a new StdEndpoint suitable for sql.DB compatible databases.
func (*StdEndpoint) Config ¶
func (e *StdEndpoint) Config() interface{}
Config returns the endpoint's config value.
func (*StdEndpoint) CreateTableStatement ¶
func (e *StdEndpoint) CreateTableStatement(table *Table) (string, error)
CreateTableStatement generates a CREATE TABLE statement for the given table. The returned statement must not contain any parameter placeholders.
func (*StdEndpoint) ExecuteStatements ¶
func (e *StdEndpoint) ExecuteStatements(ctx context.Context, statements []string) error
ExecuteStatements executes all of the statements provided in a single transaction.
func (*StdEndpoint) FlowTables ¶
func (e *StdEndpoint) FlowTables() *FlowTables
FlowTables returns the Flow Tables configurations.
func (*StdEndpoint) Generator ¶
func (e *StdEndpoint) Generator() *Generator
Generator returns the SQL generator.
func (*StdEndpoint) LoadSpec ¶
func (e *StdEndpoint) LoadSpec(ctx context.Context, materialization pf.Materialization) (version string, _ *pf.MaterializationSpec, _ error)
LoadSpec loads the named MaterializationSpec and its version that's stored within the Endpoint, if any.
func (*StdEndpoint) NewFence ¶
func (e *StdEndpoint) NewFence(ctx context.Context, materialization pf.Materialization, keyBegin, keyEnd uint32) (Fence, error)
NewStdFence installs and returns a new *StdFence. On return, all older fences of this |shardFqn| have been fenced off from committing further transactions.
type StdFence ¶
type StdFence struct {
// contains filtered or unexported fields
}
StdFence is an installed barrier in a shared checkpoints table which prevents other sessions from committing transactions under the fenced ID -- and prevents this Fence from committing where another session has in turn fenced this instance off. This implementation of the Fence interface is for standard *sql.DB compatable databases.
func (*StdFence) Checkpoint ¶
Checkpoint returns the current checkpoint.
func (*StdFence) LogEntry ¶
LogEntry returns a log.Entry with pre-set fields that identify the Shard ID and Fence.
func (*StdFence) SetCheckpoint ¶
SetCheckpoint sets the current checkpoint.
type StringTypeInfo ¶
StringTypeInfo holds optional additional type information for string columns
type StringTypeMapping ¶
type StringTypeMapping struct { Default TypeMapper ByFormat map[string]TypeMapper ByContentType map[string]TypeMapper }
StringTypeMapping is a special TypeMapper for string type columns, which can take the format and/or content type into account when deciding what sql column type to generate.
func (StringTypeMapping) GetColumnType ¶
func (mapping StringTypeMapping) GetColumnType(col *Column) (*ResolvedColumnType, error)
GetColumnType implements the TypeMapper interface
type Table ¶
type Table struct { // The Name of the table before sanitization and quoting. Name string // Identifier is the final form of the table name, exactly as it should be represented in SQL // statements. If quoting is necessary, then the quotes must be included here. Identifier string // Optional Comment to add to create table statements. Comment string // The complete list of columns that should be created for the table and used in insert statements. This does // not need to include "automatic" columns (e.g. rowid), but only columns that should be // explicitly created and inserted into. Columns []Column // If IfNotExists is true then the create table statement will include an "IF NOT EXISTS" (or // equivalent). IfNotExists bool // Whether this is a temporary table. If true, then this will be created with the "TEMP(ORARY)" keyword. Temporary bool // If this is a temporary table, then this may optionally specify the ON COMMIT behavior. If // left blank, then no "ON COMMIT" clause will be added. TempOnCommit string }
Table describes a database table, which can be used to generate various types of SQL statements.
func FlowCheckpointsTable ¶
FlowCheckpointsTable returns the Table description for the table that holds the checkpoint and nonce values for each materialization shard.
func FlowMaterializationsTable ¶
FlowMaterializationsTable returns the Table description for the table that holds the MaterializationSpec that corresponds to each target table. This state is used both for sql generation and for validation.
func TableForMaterialization ¶
func TableForMaterialization(name string, comment string, identifierRenderer *Renderer, spec *pf.MaterializationSpec_Binding) *Table
TableForMaterialization converts a MaterializationSpec into the Table representation that's used by Generator. This assumes that the MaterializationSpec has already been validated to ensure that each projection has exactly one type besides "null".
type TokenPair ¶
TokenPair is a generic way of representing strings that can be used to surround some text for quoting and commenting.
func NewTokenPair ¶
NewTokenPair returns a TokenPair with the left and right tokens specified.
type TypeMapper ¶
type TypeMapper interface { // GetColumnType resolves a Column to a specific SQL type. For example, for all "string" // type Columns, it may return the "TEXT" sql type. An implementation may take into account as // much or as little information as it wants to about a particular column, and some may not // inspect the column at all. GetColumnType(column *Column) (*ResolvedColumnType, error) }
A TypeMapper resolves a Column to a specific base SQL type. For example, for all "string" type Columns, it may return the "TEXT" sql type. We use a decorator pattern to compose TypeMappers.