Documentation
¶
Index ¶
- Constants
- func QueryResultToRecords(qr *sqltypes.Result) []map[string]interface{}
- func TabletTypeToString(t psdbconnect.TabletType) string
- type AirbyteLogMessage
- type AirbyteLogger
- type AirbyteMessage
- type AirbyteRecord
- type AirbyteState
- type Catalog
- type ConfiguredCatalog
- type ConfiguredStream
- type ConnectionProperties
- type ConnectionProperty
- type ConnectionSpecification
- type ConnectionStatus
- type PlanetScaleDatabase
- type PlanetScaleEdgeDatabase
- func (p PlanetScaleEdgeDatabase) CanConnect(ctx context.Context, psc PlanetScaleSource) error
- func (p PlanetScaleEdgeDatabase) Close() error
- func (p PlanetScaleEdgeDatabase) DiscoverSchema(ctx context.Context, psc PlanetScaleSource) (Catalog, error)
- func (p PlanetScaleEdgeDatabase) ListShards(ctx context.Context, psc PlanetScaleSource) ([]string, error)
- func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps PlanetScaleSource, s ConfiguredStream, ...) (*SerializedCursor, error)
- type PlanetScaleEdgeMysqlAccess
- type PlanetScaleSource
- type PropertyType
- type SerializedCursor
- type ShardStates
- type Spec
- type SpecMessage
- type Stream
- type StreamSchema
- type SyncState
- type VitessTablet
Constants ¶
// String identifiers whose names mirror the payload fields of AirbyteMessage
// (Record, State, Log, ConnectionStatus, Catalog).
// NOTE(review): presumably these are the values written to AirbyteMessage.Type —
// confirm against the code that builds messages.
const (
	RECORD            = "RECORD"
	STATE             = "STATE"
	LOG               = "LOG"
	CONNECTION_STATUS = "CONNECTION_STATUS"
	CATALOG           = "CATALOG"
)
// Severity labels for log output.
// NOTE(review): presumably passed as the level argument of AirbyteLogger.Log —
// confirm against callers.
const (
	LOGLEVEL_ERROR = "ERROR"
	LOGLEVEL_WARN  = "WARN"
	LOGLEVEL_INFO  = "INFO"
)
// Names of the two sync modes known to this package.
// NOTE(review): presumably the values that appear in Stream.SupportedSyncModes
// and in a configured stream's requested mode — confirm against usage.
const (
	SYNC_MODE_FULL_REFRESH = "full_refresh"
	SYNC_MODE_INCREMENTAL  = "incremental"
)
// MaxBatchSize is the untyped integer constant ten thousand.
// NOTE(review): presumably an upper bound on how many records are buffered or
// emitted per batch — confirm against the code that reads it.
const MaxBatchSize = 10_000
Variables ¶
This section is empty.
Functions ¶
func QueryResultToRecords ¶
func TabletTypeToString ¶
func TabletTypeToString(t psdbconnect.TabletType) string
Types ¶
type AirbyteLogMessage ¶
type AirbyteLogger ¶
// AirbyteLogger declares the reporting operations of this package: log lines,
// catalogs, connection statuses, records, sync state, and errors.
// NOTE(review): implementations are presumably expected to serialise each call
// as an AirbyteMessage — confirm against the implementation returned by NewLogger.
type AirbyteLogger interface {
// Log reports message with the given level string.
Log(level, message string)
// Catalog reports a discovered Catalog.
Catalog(catalog Catalog)
// ConnectionStatus reports the outcome of a connection check.
ConnectionStatus(status ConnectionStatus)
// Record reports one row; data maps names to values for the table identified
// by tableNamespace and tableName.
Record(tableNamespace, tableName string, data map[string]interface{})
// Flush takes no arguments and returns nothing.
// NOTE(review): presumably forces out any buffered output — confirm.
Flush()
// State reports the given SyncState.
State(syncState SyncState)
// Error reports an error described by a plain string (not a Go error value).
Error(error string)
}
func NewLogger ¶
func NewLogger(w io.Writer) AirbyteLogger
type AirbyteMessage ¶
// AirbyteMessage is a JSON envelope: Type is always encoded, while every payload
// pointer is tagged omitempty and is therefore left out of the output when nil.
// NOTE(review): Type presumably holds one of RECORD, STATE, LOG,
// CONNECTION_STATUS or CATALOG, with the matching pointer set — confirm against
// the code that builds messages.
type AirbyteMessage struct {
// Type is encoded under the JSON key "type".
Type string `json:"type"`
// Log is the optional log payload ("log").
Log *AirbyteLogMessage `json:"log,omitempty"`
// ConnectionStatus is the optional connection-check payload ("connectionStatus").
ConnectionStatus *ConnectionStatus `json:"connectionStatus,omitempty"`
// Catalog is the optional catalog payload ("catalog").
Catalog *Catalog `json:"catalog,omitempty"`
// Record is the optional record payload ("record").
Record *AirbyteRecord `json:"record,omitempty"`
// State is the optional state payload ("state").
State *AirbyteState `json:"state,omitempty"`
}
type AirbyteRecord ¶
type AirbyteState ¶
// AirbyteState wraps a SyncState so that it is encoded under the JSON key "data".
type AirbyteState struct {
// Data is the wrapped sync state.
Data SyncState `json:"data"`
}
type ConfiguredCatalog ¶
// ConfiguredCatalog is a list of ConfiguredStream values encoded under the JSON
// key "streams".
type ConfiguredCatalog struct {
// Streams holds the configured streams in the order they were decoded.
Streams []ConfiguredStream `json:"streams"`
}
type ConfiguredStream ¶
func (ConfiguredStream) IncrementalSyncRequested ¶
func (cs ConfiguredStream) IncrementalSyncRequested() bool
func (ConfiguredStream) ResetRequested ¶
func (cs ConfiguredStream) ResetRequested() bool
type ConnectionProperties ¶
// ConnectionProperties carries one ConnectionProperty per connection setting.
// Its JSON keys (host, shards, database, username, password) are the same set
// of keys used by the JSON tags on PlanetScaleSource.
// NOTE(review): presumably this is the schema describing those settings rather
// than the settings themselves — confirm against ConnectionProperty's definition.
type ConnectionProperties struct {
// Host describes the "host" setting.
Host ConnectionProperty `json:"host"`
// Shards describes the "shards" setting.
Shards ConnectionProperty `json:"shards"`
// Database describes the "database" setting.
Database ConnectionProperty `json:"database"`
// Username describes the "username" setting.
Username ConnectionProperty `json:"username"`
// Password describes the "password" setting.
Password ConnectionProperty `json:"password"`
}
type ConnectionProperty ¶
type ConnectionSpecification ¶
type ConnectionStatus ¶
type PlanetScaleDatabase ¶
// PlanetScaleDatabase lists the data-access operations; every operation except
// Close takes a context and the PlanetScaleSource to act on.
type PlanetScaleDatabase interface {
// CanConnect returns a non-nil error when the connection check fails.
CanConnect(ctx context.Context, ps PlanetScaleSource) error
// DiscoverSchema returns a Catalog for the source, or an error.
DiscoverSchema(ctx context.Context, ps PlanetScaleSource) (Catalog, error)
// ListShards returns a slice of strings for the source, or an error.
// NOTE(review): presumably shard names — confirm.
ListShards(ctx context.Context, ps PlanetScaleSource) ([]string, error)
// Read processes stream s of source ps starting from table cursor tc, is
// given the writer w, and returns a *SerializedCursor or an error.
// NOTE(review): the returned cursor presumably marks where the next read
// should resume — confirm against the implementation.
Read(ctx context.Context, w io.Writer, ps PlanetScaleSource, s ConfiguredStream, tc *psdbconnect.TableCursor) (*SerializedCursor, error)
// Close takes no context and returns an error.
// NOTE(review): presumably releases underlying connections — confirm.
Close() error
}
PlanetScaleDatabase is a general-purpose interface that defines all the data access methods needed for the PlanetScale Airbyte source to function.
type PlanetScaleEdgeDatabase ¶
// PlanetScaleEdgeDatabase has two exported fields plus unexported state that
// this listing does not show. Its methods are declared on the value receiver.
type PlanetScaleEdgeDatabase struct {
// Logger is the AirbyteLogger held by this database.
Logger AirbyteLogger
// Mysql is the MySQL-protocol accessor; the type's documentation says it is
// used for schema, shard, and tablet discovery.
Mysql PlanetScaleEdgeMysqlAccess
// contains filtered or unexported fields
}
PlanetScaleEdgeDatabase is an implementation of the PlanetScaleDatabase interface defined above. It uses the MySQL interface provided by PlanetScale for all schema/shard/tablet discovery and the gRPC API for incrementally syncing rows from PlanetScale.
func (PlanetScaleEdgeDatabase) CanConnect ¶
func (p PlanetScaleEdgeDatabase) CanConnect(ctx context.Context, psc PlanetScaleSource) error
func (PlanetScaleEdgeDatabase) Close ¶
func (p PlanetScaleEdgeDatabase) Close() error
func (PlanetScaleEdgeDatabase) DiscoverSchema ¶
func (p PlanetScaleEdgeDatabase) DiscoverSchema(ctx context.Context, psc PlanetScaleSource) (Catalog, error)
func (PlanetScaleEdgeDatabase) ListShards ¶
func (p PlanetScaleEdgeDatabase) ListShards(ctx context.Context, psc PlanetScaleSource) ([]string, error)
func (PlanetScaleEdgeDatabase) Read ¶
func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps PlanetScaleSource, s ConfiguredStream, tc *psdbconnect.TableCursor) (*SerializedCursor, error)
type PlanetScaleEdgeMysqlAccess ¶
// PlanetScaleEdgeMysqlAccess lists the metadata queries available against a
// PlanetScaleSource. Every method except Close takes a context and the source.
type PlanetScaleEdgeMysqlAccess interface {
// PingContext returns a non-nil error when the check fails.
PingContext(context.Context, PlanetScaleSource) error
// GetTableNames returns a slice of strings, or an error.
// NOTE(review): presumably the table names of the source's database — confirm.
GetTableNames(context.Context, PlanetScaleSource) ([]string, error)
// GetTableSchema returns a map from string to PropertyType, or an error.
// NOTE(review): the string argument is presumably a table name and the map
// keys column names — confirm.
GetTableSchema(context.Context, PlanetScaleSource, string) (map[string]PropertyType, error)
// GetTablePrimaryKeys returns a slice of strings, or an error.
// NOTE(review): the string argument is presumably a table name and the
// result its primary-key column names — confirm.
GetTablePrimaryKeys(context.Context, PlanetScaleSource, string) ([]string, error)
// GetVitessShards returns a slice of strings for psc, or an error.
// NOTE(review): presumably shard names — confirm.
GetVitessShards(ctx context.Context, psc PlanetScaleSource) ([]string, error)
// GetVitessTablets returns the VitessTablet values for psc, or an error.
GetVitessTablets(ctx context.Context, psc PlanetScaleSource) ([]VitessTablet, error)
// Close takes no context and returns an error.
// NOTE(review): presumably closes the underlying connection — confirm.
Close() error
}
func NewMySQL ¶
func NewMySQL(psc *PlanetScaleSource) (PlanetScaleEdgeMysqlAccess, error)
type PlanetScaleSource ¶
// PlanetScaleSource holds five string settings, each decoded from the JSON key
// named in its tag.
type PlanetScaleSource struct {
// Host is read from "host".
Host string `json:"host"`
// Database is read from "database".
Database string `json:"database"`
// Username is read from "username".
Username string `json:"username"`
// Password is read from "password".
Password string `json:"password"`
// Shards is read from "shards". It is a single string, not a slice.
// NOTE(review): presumably a delimited list of shard names — confirm the
// delimiter and empty-value meaning against the code that parses it.
Shards string `json:"shards"`
}
PlanetScaleSource defines a configured Airbyte Source for a PlanetScale database.
func (PlanetScaleSource) DSN ¶
func (psc PlanetScaleSource) DSN(tt psdbconnect.TabletType) string
DSN returns a data source name that MySQL libraries can use to connect to a PlanetScale database.
func (PlanetScaleSource) GetInitialState ¶
func (psc PlanetScaleSource) GetInitialState(keyspaceOrDatabase string, shards []string) (ShardStates, error)
GetInitialState will return the initial/blank state for a given keyspace in all of its shards. This state can be round-tripped safely with Airbyte.
type PropertyType ¶
// PropertyType is a single type name encoded under the JSON key "type". It is
// the value type of StreamSchema.Properties.
type PropertyType struct {
// Type is the type name.
// NOTE(review): presumably a JSON Schema type such as "string" — confirm.
Type string `json:"type"`
}
type SerializedCursor ¶
// SerializedCursor carries a cursor as one string under the JSON key "cursor".
// TableCursorToSerializedCursor and SerializedCursorToTableCursor convert
// between this form and *psdbconnect.TableCursor.
type SerializedCursor struct {
// Cursor is the string form of the cursor.
// NOTE(review): the encoding is not visible here — confirm before parsing it
// by hand.
Cursor string `json:"cursor"`
}
func TableCursorToSerializedCursor ¶
func TableCursorToSerializedCursor(cursor *psdbconnect.TableCursor) (*SerializedCursor, error)
func (SerializedCursor) SerializedCursorToTableCursor ¶
func (s SerializedCursor) SerializedCursorToTableCursor(table ConfiguredStream) (*psdbconnect.TableCursor, error)
type ShardStates ¶
// ShardStates maps string keys to *SerializedCursor values under the JSON key
// "shards".
type ShardStates struct {
// Shards holds one cursor pointer per key.
// NOTE(review): presumably keyed by shard name; entries are pointers, so a
// nil value is possible — confirm how readers handle it.
Shards map[string]*SerializedCursor `json:"shards"`
}
type Spec ¶
// Spec describes the connector: a documentation URL, a connection
// specification, three capability flags, and a list of destination sync modes.
// The JSON keys mix camelCase and snake_case; they are reproduced exactly as
// tagged.
type Spec struct {
// DocumentationURL is encoded as "documentationUrl".
DocumentationURL string `json:"documentationUrl"`
// ConnectionSpecification is encoded as "connectionSpecification".
ConnectionSpecification ConnectionSpecification `json:"connectionSpecification"`
// SupportsIncremental is encoded as "supportsIncremental".
SupportsIncremental bool `json:"supportsIncremental"`
// SupportsNormalization is encoded as "supportsNormalization".
SupportsNormalization bool `json:"supportsNormalization"`
// SupportsDBT is encoded as "supportsDBT".
SupportsDBT bool `json:"supportsDBT"`
// SupportedDestinationSyncModes is encoded as
// "supported_destination_sync_modes"; a nil slice encodes as JSON null.
SupportedDestinationSyncModes []string `json:"supported_destination_sync_modes"`
}
type SpecMessage ¶
type Stream ¶
// Stream describes one stream: its name and namespace, its schema, the sync
// modes it supports, and its key and cursor metadata. No field is tagged
// omitempty, so every key is always present in the JSON output.
type Stream struct {
// Name is encoded as "name".
Name string `json:"name"`
// Schema is encoded as "json_schema".
Schema StreamSchema `json:"json_schema"`
// SupportedSyncModes is encoded as "supported_sync_modes".
// NOTE(review): presumably drawn from SYNC_MODE_FULL_REFRESH and
// SYNC_MODE_INCREMENTAL — confirm.
SupportedSyncModes []string `json:"supported_sync_modes"`
// Namespace is encoded as "namespace".
Namespace string `json:"namespace"`
// PrimaryKeys is a slice of string slices encoded as
// "source_defined_primary_key".
// NOTE(review): each inner slice is presumably the path to one key field —
// confirm.
PrimaryKeys [][]string `json:"source_defined_primary_key"`
// SourceDefinedCursor is encoded as "source_defined_cursor".
SourceDefinedCursor bool `json:"source_defined_cursor"`
// DefaultCursorFields is encoded as "default_cursor_field".
DefaultCursorFields []string `json:"default_cursor_field"`
}
type StreamSchema ¶
// StreamSchema pairs a type name with a map of named PropertyType entries. It
// is the type of Stream.Schema.
type StreamSchema struct {
// Type is encoded as "type".
// NOTE(review): presumably "object" for a table schema — confirm.
Type string `json:"type"`
// Properties maps a property name to its PropertyType and is encoded as
// "properties".
Properties map[string]PropertyType `json:"properties"`
}
type SyncState ¶
// SyncState maps string keys to ShardStates under the JSON key "streams". It is
// the payload wrapped by AirbyteState and the argument of AirbyteLogger.State.
type SyncState struct {
// Streams holds one ShardStates value per key.
// NOTE(review): presumably keyed by stream name — confirm the exact key
// format against the code that writes it.
Streams map[string]ShardStates `json:"streams"`
}