Documentation
¶
Index ¶
- Variables
- func LogCatalog(streams []*Stream, oldCatalog *Catalog)
- func StreamsToMap(streams ...*Stream) map[string]*Stream
- type ActionRow
- type AdapterType
- type Catalog
- type Chunk
- type Condition
- type ConfiguredStream
- func (s *ConfiguredStream) Cursor() (string, string)
- func (s *ConfiguredStream) GetFilter() (Filter, error)
- func (s *ConfiguredStream) GetStream() *Stream
- func (s *ConfiguredStream) GetSyncMode() SyncMode
- func (s *ConfiguredStream) ID() string
- func (s *ConfiguredStream) Name() string
- func (s *ConfiguredStream) Namespace() string
- func (s *ConfiguredStream) NormalizationEnabled() bool
- func (s *ConfiguredStream) Schema() *TypeSchema
- func (s *ConfiguredStream) Self() *ConfiguredStream
- func (s *ConfiguredStream) SupportedSyncModes() *Set[SyncMode]
- func (s *ConfiguredStream) Validate(source *Stream) error
- type ConnectionStatus
- type DataType
- type Filter
- type GlobalState
- type Hashable
- type Identifier
- type Iterable
- type Log
- type Message
- type MessageType
- type Property
- type RawRecord
- type Record
- type Set
- func (st *Set[T]) Array() []T
- func (st *Set[T]) Difference(set *Set[T]) *Set[T]
- func (st *Set[T]) Exists(element T) bool
- func (st *Set[T]) Hash(elem T) string
- func (st *Set[T]) Insert(elements ...T)
- func (st *Set[T]) Intersection(set *Set[T]) *Set[T]
- func (st *Set[T]) Len() int
- func (st *Set[T]) MarshalJSON() ([]byte, error)
- func (st *Set[T]) ProperSubsetOf(set *Set[T]) bool
- func (st *Set[T]) Range(f func(T))
- func (st *Set[T]) Remove(element T)
- func (st *Set[T]) String() string
- func (st *Set[T]) SubsetOf(set *Set[T]) bool
- func (st *Set[T]) Union(set *Set[T]) *Set[T]
- func (st *Set[T]) UnmarshalJSON(data []byte) error
- func (st *Set[T]) WithHasher(f func(T) string) *Set[T]
- type State
- func (s *State) GetChunks(stream *ConfiguredStream) *Set[Chunk]
- func (s *State) GetCursor(stream *ConfiguredStream, key string) any
- func (s *State) GetGlobal() *GlobalState
- func (s *State) HasCompletedBackfill(stream *ConfiguredStream) bool
- func (s *State) LogState()
- func (s *State) LogWithLock()
- func (s *State) MarshalJSON() ([]byte, error)
- func (s *State) RemoveChunk(stream *ConfiguredStream, chunk Chunk) int
- func (s *State) ResetStreams()
- func (s *State) SetChunks(stream *ConfiguredStream, chunks *Set[Chunk])
- func (s *State) SetCursor(stream *ConfiguredStream, key string, value any)
- func (s *State) SetGlobal(state any, streams ...string)
- func (s *State) SetType(typ StateType)
- type StateInterface
- type StateType
- type StatusRow
- type Stream
- func (s *Stream) ID() string
- func (s *Stream) UnmarshalJSON(data []byte) error
- func (s *Stream) UpsertField(column string, typ DataType, nullable bool)
- func (s *Stream) WithCursorField(columns ...string) *Stream
- func (s *Stream) WithPrimaryKey(keys ...string) *Stream
- func (s *Stream) WithSchema(schema *TypeSchema) *Stream
- func (s *Stream) WithSyncMode(modes ...SyncMode) *Stream
- func (s *Stream) Wrap(_ int) *ConfiguredStream
- type StreamInterface
- type StreamMetadata
- type StreamState
- type SyncMode
- type TypeSchema
- func (t *TypeSchema) AddTypes(column string, types ...DataType)
- func (t *TypeSchema) GetProperty(column string) (bool, *Property)
- func (t *TypeSchema) GetType(column string) (DataType, error)
- func (t *TypeSchema) MarshalJSON() ([]byte, error)
- func (t *TypeSchema) Override(fields map[string]*Property)
- func (t *TypeSchema) ToParquet() *parquet.Schema
- func (t *TypeSchema) UnmarshalJSON(data []byte) error
- type WriterConfig
Constants ¶
This section is empty.
Variables ¶
var ( ErrStateMissing = errors.New("stream missing from state") ErrStateCursorMissing = errors.New("cursor field missing from state") )
var TypeWeights = map[DataType]int{ Bool: 0, Int32: 1, Int64: 2, Float64: 3, Float32: 4, String: 5, TimestampNano: 9, TimestampMicro: 8, TimestampMilli: 7, Timestamp: 6, }
Functions ¶
func LogCatalog ¶
func StreamsToMap ¶
Types ¶
type AdapterType ¶
type AdapterType string
const ( Parquet AdapterType = "PARQUET" Iceberg AdapterType = "ICEBERG" )
type Catalog ¶
type Catalog struct { SelectedStreams map[string][]StreamMetadata `json:"selected_streams,omitempty"` Streams []*ConfiguredStream `json:"streams,omitempty"` }
Catalog is a DTO for formatted Airbyte catalog serialization.
func GetWrappedCatalog ¶
type ConfiguredStream ¶
type ConfiguredStream struct { StreamMetadata StreamMetadata `json:"-"` Stream *Stream `json:"stream,omitempty"` }
ConfiguredStream is the input/processed object for a Stream.
func (*ConfiguredStream) Cursor ¶
func (s *ConfiguredStream) Cursor() (string, string)
Cursor returns the primary and secondary cursor.
func (*ConfiguredStream) GetFilter ¶ added in v0.1.6
func (s *ConfiguredStream) GetFilter() (Filter, error)
func (*ConfiguredStream) GetStream ¶
func (s *ConfiguredStream) GetStream() *Stream
func (*ConfiguredStream) GetSyncMode ¶
func (s *ConfiguredStream) GetSyncMode() SyncMode
func (*ConfiguredStream) ID ¶
func (s *ConfiguredStream) ID() string
func (*ConfiguredStream) Name ¶
func (s *ConfiguredStream) Name() string
func (*ConfiguredStream) Namespace ¶
func (s *ConfiguredStream) Namespace() string
func (*ConfiguredStream) NormalizationEnabled ¶
func (s *ConfiguredStream) NormalizationEnabled() bool
func (*ConfiguredStream) Schema ¶
func (s *ConfiguredStream) Schema() *TypeSchema
func (*ConfiguredStream) Self ¶
func (s *ConfiguredStream) Self() *ConfiguredStream
func (*ConfiguredStream) SupportedSyncModes ¶
func (s *ConfiguredStream) SupportedSyncModes() *Set[SyncMode]
func (*ConfiguredStream) Validate ¶
func (s *ConfiguredStream) Validate(source *Stream) error
Validate checks the configured stream against the source stream.
type ConnectionStatus ¶
type ConnectionStatus string
const ( ConnectionSucceed ConnectionStatus = "SUCCEEDED" ConnectionFailed ConnectionStatus = "FAILED" )
type DataType ¶
type DataType string
const ( Null DataType = "null" Int32 DataType = "integer_small" Int64 DataType = "integer" Float32 DataType = "number_small" Float64 DataType = "number" String DataType = "string" Bool DataType = "boolean" Object DataType = "object" Array DataType = "array" Unknown DataType = "unknown" Timestamp DataType = "timestamp" TimestampMilli DataType = "timestamp_milli" // storing datetime up to 3 precisions TimestampMicro DataType = "timestamp_micro" // storing datetime up to 6 precisions TimestampNano DataType = "timestamp_nano" // storing datetime up to 9 precisions )
func GetCommonAncestorType ¶ added in v0.1.7
GetCommonAncestorType returns lowest common ancestor type
func (DataType) ToNewParquet ¶
func (d DataType) ToNewParquet() parquet.Node
type Filter ¶ added in v0.1.6
type Filter struct { Conditions []Condition // a > b, a < b LogicalOperator string // condition[0] and/or condition[1], single and/or supported }
Filter represents the parsed filter
type GlobalState ¶
type GlobalState struct { // Global state shared by streams State any `json:"state"` // Attaching Streams to the global state helps identify the tables that the state belongs to. // // This helps the connector determine which streams were synced during the last sync in a // group read, and also helps connectors migrate from incremental to CDC reads without needing a // full load, by using the cursor value and field as the recovery cursor for CDC. Streams *Set[string] `json:"streams"` }
type Identifier ¶
type Identifier interface {
ID() string
}
type Message ¶
type Message struct { Type MessageType `json:"type"` Log *Log `json:"log,omitempty"` ConnectionStatus *StatusRow `json:"connectionStatus,omitempty"` State *State `json:"state,omitempty"` Catalog *Catalog `json:"catalog,omitempty"` Action *ActionRow `json:"action,omitempty"` Spec map[string]interface{} `json:"spec,omitempty"` }
Message is a DTO representing an OLake output row.
type MessageType ¶
type MessageType string
const ( LogMessage MessageType = "LOG" ConnectionStatusMessage MessageType = "CONNECTION_STATUS" StateMessage MessageType = "STATE" RecordMessage MessageType = "RECORD" CatalogMessage MessageType = "CATALOG" SpecMessage MessageType = "SPEC" ActionMessage MessageType = "ACTION" )
type Property ¶
Property is a dto for catalog properties representation
type RawRecord ¶
type RawRecord struct { Data map[string]any `parquet:"data,json"` OlakeID string `parquet:"_olake_id"` OlakeTimestamp time.Time `parquet:"_olake_timestamp"` OperationType string `parquet:"_op_type"` // "r" for read/backfill, "c" for create, "u" for update, "d" for delete CdcTimestamp time.Time `parquet:"_cdc_timestamp"` }
func CreateRawRecord ¶
type Set ¶
type Set[T comparable] struct { // contains filtered or unexported fields }
func (*Set[T]) Difference ¶
Difference finds the difference between two sets.
func (*Set[T]) Intersection ¶
Intersection finds the intersection of two sets.
func (*Set[T]) MarshalJSON ¶
func (*Set[T]) ProperSubsetOf ¶
ProperSubsetOf reports whether st is a proper subset of "set".
func (*Set[T]) UnmarshalJSON ¶
func (*Set[T]) WithHasher ¶
type State ¶
type State struct { *sync.RWMutex `json:"-"` Type StateType `json:"type"` Global *GlobalState `json:"global,omitempty"` Streams []*StreamState `json:"streams,omitempty"` // TODO: make it set }
TODO: Add validation tags; write a custom unmarshaler that triggers validation. State is a DTO for Airbyte state serialization.
func (*State) GetChunks ¶
func (s *State) GetChunks(stream *ConfiguredStream) *Set[Chunk]
GetChunks retrieves all chunks for the given stream from the state.
func (*State) GetGlobal ¶
func (s *State) GetGlobal() *GlobalState
func (*State) HasCompletedBackfill ¶
func (s *State) HasCompletedBackfill(stream *ConfiguredStream) bool
func (*State) LogWithLock ¶
func (s *State) LogWithLock()
func (*State) MarshalJSON ¶
func (*State) RemoveChunk ¶
func (s *State) RemoveChunk(stream *ConfiguredStream, chunk Chunk) int
RemoveChunk removes the chunk and returns the number of chunks remaining.
func (*State) ResetStreams ¶
func (s *State) ResetStreams()
func (*State) SetChunks ¶
func (s *State) SetChunks(stream *ConfiguredStream, chunks *Set[Chunk])
SetChunks sets the chunks for the given stream in the state.
func (*State) SetCursor ¶
func (s *State) SetCursor(stream *ConfiguredStream, key string, value any)
type StateInterface ¶
type StateInterface interface { ResetStreams() SetType(typ StateType) GetCursor(stream *ConfiguredStream, key string) any SetCursor(stream *ConfiguredStream, key, value any) GetChunks(stream *ConfiguredStream) *Set[Chunk] SetChunks(stream *ConfiguredStream, chunks *Set[Chunk]) RemoveChunk(stream *ConfiguredStream, chunk Chunk) SetGlobal(globalState any, streams ...string) }
type StateType ¶
type StateType string
const ( // Global Type indicates that the connector solely acts on Globally shared state across streams GlobalType StateType = "GLOBAL" // Stream Type indicates that the connector solely acts on individual stream state StreamType StateType = "STREAM" // Mixed type indicates that the connector works with a mix of Globally shared and // Individual stream state (Note: not being used yet but in plan) MixedType StateType = "MIXED" // constant key for chunks ChunksKey = "chunks" )
type StatusRow ¶
type StatusRow struct { Status ConnectionStatus `json:"status,omitempty"` Message string `json:"message,omitempty"` }
StatusRow is a DTO for Airbyte result status serialization.
type Stream ¶
type Stream struct { // Name of the Stream Name string `json:"name,omitempty"` // Namespace of the Stream, or Database it belongs to // helps in identifying collections with same name in different database Namespace string `json:"namespace,omitempty"` // Possible Schema of the Stream Schema *TypeSchema `json:"type_schema,omitempty"` // Supported sync modes from driver for the respective Stream SupportedSyncModes *Set[SyncMode] `json:"supported_sync_modes,omitempty"` // Primary key if available SourceDefinedPrimaryKey *Set[string] `json:"source_defined_primary_key,omitempty"` // Available cursor fields supported by driver AvailableCursorFields *Set[string] `json:"available_cursor_fields,omitempty"` // Input of JSON Schema from Client to be parsed by driver AdditionalProperties string `json:"additional_properties,omitempty"` // Renderable JSON Schema for additional properties supported by respective driver for individual stream AdditionalPropertiesSchema schema.JSONSchema `json:"additional_properties_schema,omitempty"` // Cursor field to be used for incremental sync CursorField string `json:"cursor_field,omitempty"` // Mode being used for syncing data SyncMode SyncMode `json:"sync_mode,omitempty"` }
Stream is the output stream object for dsynk.
func (*Stream) UnmarshalJSON ¶
func (*Stream) UpsertField ¶
UpsertField adds or updates a column in the stream's type schema.
func (*Stream) WithCursorField ¶
func (*Stream) WithPrimaryKey ¶
func (*Stream) WithSchema ¶
func (s *Stream) WithSchema(schema *TypeSchema) *Stream
func (*Stream) WithSyncMode ¶
func (*Stream) Wrap ¶
func (s *Stream) Wrap(_ int) *ConfiguredStream
type StreamInterface ¶
type StreamInterface interface { ID() string Self() *ConfiguredStream Name() string Namespace() string Schema() *TypeSchema GetStream() *Stream GetSyncMode() SyncMode GetFilter() (Filter, error) SupportedSyncModes() *Set[SyncMode] Cursor() (string, string) Validate(source *Stream) error NormalizationEnabled() bool }
type StreamMetadata ¶
type StreamMetadata struct { ChunkColumn string `json:"chunk_column,omitempty"` PartitionRegex string `json:"partition_regex"` StreamName string `json:"stream_name"` AppendMode bool `json:"append_mode,omitempty"` Normalization bool `json:"normalization" default:"false"` Filter string `json:"filter,omitempty"` }
type StreamState ¶
type StreamState struct { HoldsValue atomic.Bool `json:"-"` // If State holds some value and should not be excluded during unmarshaling then value true Stream string `json:"stream"` Namespace string `json:"namespace"` SyncMode string `json:"sync_mode"` State sync.Map `json:"state"` }
func (*StreamState) MarshalJSON ¶
func (s *StreamState) MarshalJSON() ([]byte, error)
MarshalJSON custom marshaller to handle sync.Map encoding
func (*StreamState) UnmarshalJSON ¶
func (s *StreamState) UnmarshalJSON(data []byte) error
UnmarshalJSON custom unmarshaller to handle sync.Map decoding
type TypeSchema ¶
func NewTypeSchema ¶
func NewTypeSchema() *TypeSchema
func (*TypeSchema) AddTypes ¶
func (t *TypeSchema) AddTypes(column string, types ...DataType)
func (*TypeSchema) GetProperty ¶
func (t *TypeSchema) GetProperty(column string) (bool, *Property)
func (*TypeSchema) MarshalJSON ¶
func (t *TypeSchema) MarshalJSON() ([]byte, error)
MarshalJSON custom marshaller to handle sync.Map encoding
func (*TypeSchema) Override ¶
func (t *TypeSchema) Override(fields map[string]*Property)
func (*TypeSchema) ToParquet ¶
func (t *TypeSchema) ToParquet() *parquet.Schema
func (*TypeSchema) UnmarshalJSON ¶
func (t *TypeSchema) UnmarshalJSON(data []byte) error
UnmarshalJSON custom unmarshaller to handle sync.Map decoding
type WriterConfig ¶
type WriterConfig struct { Type AdapterType `json:"type"` WriterConfig any `json:"writer"` }
TODO: Add validations