types

package
v0.1.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 30, 2025 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors for values missing from the state, as their messages
// describe: ErrStateMissing for a stream, ErrStateCursorMissing for a cursor
// field.
var (
	ErrStateMissing       = errors.New("stream missing from state")
	ErrStateCursorMissing = errors.New("cursor field missing from state")
)
View Source
// TypeWeights assigns an integer weight to each DataType listed below.
//
// NOTE(review): Float64 (3) is weighted below Float32 (4), whereas the wider
// type carries the larger weight in the integer (Int32 < Int64) and timestamp
// (Timestamp < TimestampMilli < TimestampMicro < TimestampNano) families.
// Confirm this ordering is intended.
//
// NOTE(review): Null, Object, Array and Unknown have no entry, so indexing the
// map with them yields 0, the same weight as Bool. Use the two-value lookup
// form (w, ok := TypeWeights[t]) where that distinction matters.
var TypeWeights = map[DataType]int{
	Bool:           0,
	Int32:          1,
	Int64:          2,
	Float64:        3,
	Float32:        4,
	String:         5,
	TimestampNano:  9,
	TimestampMicro: 8,
	TimestampMilli: 7,
	Timestamp:      6,
}

Functions

func LogCatalog

func LogCatalog(streams []*Stream, oldCatalog *Catalog)

func StreamsToMap

func StreamsToMap(streams ...*Stream) map[string]*Stream

Types

type ActionRow

// ActionRow currently declares no fields. It is the type of the Action field
// of Message.
type ActionRow struct {
}

type AdapterType

// AdapterType is a string-backed enumeration and the type of the
// WriterConfig.Type field. The declared values are Parquet and Iceberg.
type AdapterType string
const (
	Parquet AdapterType = "PARQUET"
	Iceberg AdapterType = "ICEBERG"
)

type Catalog

// Catalog groups a list of configured streams with a map of StreamMetadata
// entries. Both fields are omitted from the JSON output when empty.
//
// NOTE(review): the meaning of the SelectedStreams map key is not visible
// here (presumably the stream namespace) — confirm against the callers.
type Catalog struct {
	SelectedStreams map[string][]StreamMetadata `json:"selected_streams,omitempty"`
	Streams         []*ConfiguredStream         `json:"streams,omitempty"`
}

Catalog is a dto for formatted airbyte catalog serialization

func GetWrappedCatalog

func GetWrappedCatalog(streams []*Stream) *Catalog

type Chunk

// Chunk is a pair of dynamically typed Min and Max values, serialized under
// the JSON keys "min" and "max". Neither field is omitted when empty.
type Chunk struct {
	Min any `json:"min"`
	Max any `json:"max"`
}

Chunk is a struct that holds min and max values

type Condition added in v0.1.6

// Condition is one Column / Operator / Value triple. All three parts are kept
// as plain strings, and the struct carries no JSON tags.
type Condition struct {
	Column   string
	Operator string
	Value    string
}

Condition represents a single condition in a filter

type ConfiguredStream

// ConfiguredStream pairs a Stream with its StreamMetadata.
//
// StreamMetadata is tagged `json:"-"` and is therefore never part of the JSON
// form. Only Stream is serialized, under the key "stream", and it is omitted
// when nil.
type ConfiguredStream struct {
	StreamMetadata StreamMetadata `json:"-"`
	Stream         *Stream        `json:"stream,omitempty"`
}

Input/Processed object for Stream

func (*ConfiguredStream) Cursor

func (s *ConfiguredStream) Cursor() (string, string)

Cursor returns the primary and secondary cursor.

func (*ConfiguredStream) GetFilter added in v0.1.6

func (s *ConfiguredStream) GetFilter() (Filter, error)

func (*ConfiguredStream) GetStream

func (s *ConfiguredStream) GetStream() *Stream

func (*ConfiguredStream) GetSyncMode

func (s *ConfiguredStream) GetSyncMode() SyncMode

func (*ConfiguredStream) ID

func (s *ConfiguredStream) ID() string

func (*ConfiguredStream) Name

func (s *ConfiguredStream) Name() string

func (*ConfiguredStream) Namespace

func (s *ConfiguredStream) Namespace() string

func (*ConfiguredStream) NormalizationEnabled

func (s *ConfiguredStream) NormalizationEnabled() bool

func (*ConfiguredStream) Schema

func (s *ConfiguredStream) Schema() *TypeSchema

func (*ConfiguredStream) Self

func (*ConfiguredStream) SupportedSyncModes

func (s *ConfiguredStream) SupportedSyncModes() *Set[SyncMode]

func (*ConfiguredStream) Validate

func (s *ConfiguredStream) Validate(source *Stream) error

Validate Configured Stream with Source Stream

type ConnectionStatus

// ConnectionStatus is the string-backed status carried in StatusRow.Status.
// The declared values are ConnectionSucceed and ConnectionFailed.
type ConnectionStatus string
const (
	ConnectionSucceed ConnectionStatus = "SUCCEEDED"
	ConnectionFailed  ConnectionStatus = "FAILED"
)

type DataType

// DataType is the string name of a column type. The constants below are the
// declared values. Note that the 32-bit variants use the "_small" suffix in
// their string form ("integer_small", "number_small"), while the 64-bit
// variants use the bare names ("integer", "number").
type DataType string
const (
	Null           DataType = "null"
	Int32          DataType = "integer_small"
	Int64          DataType = "integer"
	Float32        DataType = "number_small"
	Float64        DataType = "number"
	String         DataType = "string"
	Bool           DataType = "boolean"
	Object         DataType = "object"
	Array          DataType = "array"
	Unknown        DataType = "unknown"
	Timestamp      DataType = "timestamp"
	TimestampMilli DataType = "timestamp_milli" // storing datetime up to 3 precisions
	TimestampMicro DataType = "timestamp_micro" // storing datetime up to 6 precisions
	TimestampNano  DataType = "timestamp_nano"  // storing datetime up to 9 precisions
)

func GetCommonAncestorType added in v0.1.7

func GetCommonAncestorType(t1, t2 DataType) DataType

GetCommonAncestorType returns lowest common ancestor type

func (DataType) ToNewParquet

func (d DataType) ToNewParquet() parquet.Node

type Filter added in v0.1.6

// Filter is a list of Conditions together with the LogicalOperator that joins
// them. It is the value returned by (*ConfiguredStream).GetFilter.
type Filter struct {
	Conditions      []Condition // a > b, a < b
	LogicalOperator string      // condition[0] and/or condition[1], single and/or supported
}

Filter represents the parsed filter

type GlobalState

// GlobalState is the state shared across streams, plus the set of stream
// names that state belongs to. Both fields are always present in the JSON
// output (no omitempty).
type GlobalState struct {
	// Global State shared by streams
	State any `json:"state"`
	// Attaching Streams to Global State helps in recognizing the tables that the state belongs to.
	//
	// This helps the connector determine which streams were synced during the last sync in
	// Group read, and also helps connectors migrate from incremental to CDC read without the need
	// for a full load, by using the cursor value and field as the recovery cursor for CDC
	Streams *Set[string] `json:"streams"`
}

type Hashable

// Hashable is implemented by any type that exposes a string Hash.
type Hashable interface {
	Hash() string
}

type Identifier

// Identifier is implemented by any type that exposes a string ID. Both
// *ConfiguredStream and *Stream declare an ID() string method.
type Identifier interface {
	ID() string
}

type Iterable

// Iterable exposes a Next/Err pair.
//
// NOTE(review): presumably used in the bufio.Scanner style — call Next until
// it returns false, then inspect Err — confirm against the implementations.
type Iterable interface {
	Next() bool
	Err() error
}

type Log

// Log carries a level and a message as plain strings. Both fields are omitted
// from the JSON output when empty.
type Log struct {
	Level   string `json:"level,omitempty"`
	Message string `json:"message,omitempty"`
}

Log is a dto for airbyte logs serialization

type Message

// Message is the envelope for one output row. Type is always serialized; every
// other field is a payload that is omitted from the JSON output when nil or
// empty.
//
// Spec is declared as map[string]any, which is the same type as
// map[string]interface{} (any is an alias), spelled consistently with the rest
// of the package.
type Message struct {
	Type             MessageType    `json:"type"`
	Log              *Log           `json:"log,omitempty"`
	ConnectionStatus *StatusRow     `json:"connectionStatus,omitempty"`
	State            *State         `json:"state,omitempty"`
	Catalog          *Catalog       `json:"catalog,omitempty"`
	Action           *ActionRow     `json:"action,omitempty"`
	Spec             map[string]any `json:"spec,omitempty"`
}

Message is a dto for olake output row representation

type MessageType

// MessageType is the string-backed discriminator stored in Message.Type. The
// constants below are the declared values.
type MessageType string
const (
	LogMessage              MessageType = "LOG"
	ConnectionStatusMessage MessageType = "CONNECTION_STATUS"
	StateMessage            MessageType = "STATE"
	RecordMessage           MessageType = "RECORD"
	CatalogMessage          MessageType = "CATALOG"
	SpecMessage             MessageType = "SPEC"
	ActionMessage           MessageType = "ACTION"
)

type Property

// Property holds the set of DataTypes recorded for one column. The set is
// serialized under the JSON key "type" and omitted when nil.
type Property struct {
	Type *Set[DataType] `json:"type,omitempty"`
}

Property is a dto for catalog properties representation

func (*Property) DataType

func (p *Property) DataType() DataType

DataType returns the data type according to the typecast tree if multiple types are present.

func (*Property) Nullable

func (p *Property) Nullable() bool

type RawRecord

// RawRecord is one record plus its bookkeeping columns. The struct tags give
// the parquet column names; Data is additionally tagged with the "json"
// option.
type RawRecord struct {
	Data           map[string]any `parquet:"data,json"`
	OlakeID        string         `parquet:"_olake_id"`
	OlakeTimestamp time.Time      `parquet:"_olake_timestamp"`
	OperationType  string         `parquet:"_op_type"` // "r" for read/backfill, "c" for create, "u" for update, "d" for delete
	CdcTimestamp   time.Time      `parquet:"_cdc_timestamp"`
}

func CreateRawRecord

func CreateRawRecord(olakeID string, data map[string]any, operationType string, cdcTimestamp time.Time) RawRecord

func (*RawRecord) ToDebeziumFormat

func (r *RawRecord) ToDebeziumFormat(db string, stream string, normalization bool, threadID string) (string, error)

type Record

// Record is a map from string keys to dynamically typed values.
type Record map[string]any

type Set

// Set is a generic set whose element type must be comparable. Its storage is
// unexported; NewSet constructs one.
type Set[T comparable] struct {
	// contains filtered or unexported fields
}

func NewSet

func NewSet[T comparable](initial ...T) *Set[T]

Create a new set

func (*Set[T]) Array

func (st *Set[T]) Array() []T

func (*Set[T]) Difference

func (st *Set[T]) Difference(set *Set[T]) *Set[T]

Find the difference between two sets

func (*Set[T]) Exists

func (st *Set[T]) Exists(element T) bool

Test to see whether or not the element is in the set

func (*Set[T]) Hash

func (st *Set[T]) Hash(elem T) string

func (*Set[T]) Insert

func (st *Set[T]) Insert(elements ...T)

Add an element to the set

func (*Set[T]) Intersection

func (st *Set[T]) Intersection(set *Set[T]) *Set[T]

Find the intersection of two sets

func (*Set[T]) Len

func (st *Set[T]) Len() int

Return the number of items in the set

func (*Set[T]) MarshalJSON

func (st *Set[T]) MarshalJSON() ([]byte, error)

func (*Set[T]) ProperSubsetOf

func (st *Set[T]) ProperSubsetOf(set *Set[T]) bool

Test whether or not st set is a proper subset of "set"

func (*Set[T]) Range

func (st *Set[T]) Range(f func(T))

Call f for each item in the set

func (*Set[T]) Remove

func (st *Set[T]) Remove(element T)

Remove an element from the set

func (*Set[T]) String

func (st *Set[T]) String() string

func (*Set[T]) SubsetOf

func (st *Set[T]) SubsetOf(set *Set[T]) bool

Test whether or not st set is a subset of "set"

func (*Set[T]) Union

func (st *Set[T]) Union(set *Set[T]) *Set[T]

Find the union of two sets

func (*Set[T]) UnmarshalJSON

func (st *Set[T]) UnmarshalJSON(data []byte) error

func (*Set[T]) WithHasher

func (st *Set[T]) WithHasher(f func(T) string) *Set[T]

type State

// State is the JSON-serializable sync state: a Type, an optional Global
// section and a list of per-stream states.
//
// The embedded *sync.RWMutex is excluded from JSON (`json:"-"`). Because it is
// embedded as a pointer, a zero-value State carries a nil mutex, and calling
// the promoted Lock/RLock methods on it panics.
// NOTE(review): confirm that every construction path, including JSON decoding,
// assigns the mutex before it is used.
type State struct {
	*sync.RWMutex `json:"-"`
	Type          StateType      `json:"type"`
	Global        *GlobalState   `json:"global,omitempty"`
	Streams       []*StreamState `json:"streams,omitempty"` // TODO: make it set
}

TODO: Add validation tags; write a custom unmarshal that triggers validation.

State is a dto for airbyte state serialization.

func (*State) GetChunks

func (s *State) GetChunks(stream *ConfiguredStream) *Set[Chunk]

GetChunks retrieves all chunks from the state.

func (*State) GetCursor

func (s *State) GetCursor(stream *ConfiguredStream, key string) any

func (*State) GetGlobal

func (s *State) GetGlobal() *GlobalState

func (*State) HasCompletedBackfill

func (s *State) HasCompletedBackfill(stream *ConfiguredStream) bool

func (*State) LogState

func (s *State) LogState()

func (*State) LogWithLock

func (s *State) LogWithLock()

func (*State) MarshalJSON

func (s *State) MarshalJSON() ([]byte, error)

func (*State) RemoveChunk

func (s *State) RemoveChunk(stream *ConfiguredStream, chunk Chunk) int

RemoveChunk returns the remaining chunk count after removing the chunk.

func (*State) ResetStreams

func (s *State) ResetStreams()

func (*State) SetChunks

func (s *State) SetChunks(stream *ConfiguredStream, chunks *Set[Chunk])

SetChunks sets the chunks for the stream.

func (*State) SetCursor

func (s *State) SetCursor(stream *ConfiguredStream, key string, value any)

func (*State) SetGlobal

func (s *State) SetGlobal(state any, streams ...string)

SetGlobal sets the global state if state is not nil and streams are not empty.

func (*State) SetType

func (s *State) SetType(typ StateType)

type StateInterface

// StateInterface lists the state operations: resetting streams, setting the
// type, and getting/setting cursors, chunks and global state.
//
// NOTE(review): two signatures here differ from the methods declared on *State
// in this package, so *State would not satisfy this interface as written:
//   - SetCursor takes `key, value any` here, while (*State).SetCursor takes
//     `key string, value any`.
//   - RemoveChunk returns nothing here, while (*State).RemoveChunk returns int.
//
// Confirm which side is intended before relying on the interface.
type StateInterface interface {
	ResetStreams()
	SetType(typ StateType)
	GetCursor(stream *ConfiguredStream, key string) any
	SetCursor(stream *ConfiguredStream, key, value any)
	GetChunks(stream *ConfiguredStream) *Set[Chunk]
	SetChunks(stream *ConfiguredStream, chunks *Set[Chunk])
	RemoveChunk(stream *ConfiguredStream, chunk Chunk)
	SetGlobal(globalState any, streams ...string)
}

type StateType

// StateType is the string-backed kind of a State and the type of State.Type.
type StateType string
const (
	// Global Type indicates that the connector solely acts on Globally shared state across streams
	GlobalType StateType = "GLOBAL"
	// Stream Type indicates that the connector solely acts on individual stream state
	StreamType StateType = "STREAM"
	// Mixed type indicates that the connector works with a mix of Globally shared and
	// Individual stream state (Note: not being used yet but in plan)
	MixedType StateType = "MIXED"
	// constant key for chunks.
	// ChunksKey has no explicit type, so it is an untyped string constant rather than a StateType.
	ChunksKey = "chunks"
)

type StatusRow

// StatusRow pairs a ConnectionStatus with a free-form message. Both fields are
// omitted from the JSON output when empty. It is the type of
// Message.ConnectionStatus.
type StatusRow struct {
	Status  ConnectionStatus `json:"status,omitempty"`
	Message string           `json:"message,omitempty"`
}

StatusRow is a dto for airbyte result status serialization

type Stream

// Stream describes one stream: its name and namespace, its schema, the sync
// modes and cursor fields it supports, and the sync mode and cursor field
// chosen for it. Every field is tagged omitempty.
type Stream struct {
	// Name of the Stream
	Name string `json:"name,omitempty"`
	// Namespace of the Stream, or Database it belongs to
	// helps in identifying collections with same name in different database
	Namespace string `json:"namespace,omitempty"`
	// Possible Schema of the Stream
	Schema *TypeSchema `json:"type_schema,omitempty"`
	// Supported sync modes from driver for the respective Stream
	SupportedSyncModes *Set[SyncMode] `json:"supported_sync_modes,omitempty"`
	// Primary key if available
	SourceDefinedPrimaryKey *Set[string] `json:"source_defined_primary_key,omitempty"`
	// Available cursor fields supported by driver
	AvailableCursorFields *Set[string] `json:"available_cursor_fields,omitempty"`
	// Input of JSON Schema from Client to be parsed by driver
	AdditionalProperties string `json:"additional_properties,omitempty"`
	// Renderable JSON Schema for additional properties supported by respective driver for individual stream
	//
	// NOTE(review): encoding/json's omitempty has no effect on struct-typed
	// values; if schema.JSONSchema is a struct, this field is always emitted — confirm.
	AdditionalPropertiesSchema schema.JSONSchema `json:"additional_properties_schema,omitempty"`
	// Cursor field to be used for incremental sync
	CursorField string `json:"cursor_field,omitempty"`
	// Mode being used for syncing data
	SyncMode SyncMode `json:"sync_mode,omitempty"`
}

Output Stream Object for dsynk

func NewStream

func NewStream(name, namespace string) *Stream

func (*Stream) ID

func (s *Stream) ID() string

func (*Stream) UnmarshalJSON

func (s *Stream) UnmarshalJSON(data []byte) error

func (*Stream) UpsertField

func (s *Stream) UpsertField(column string, typ DataType, nullable bool)

Add or Update Column in Stream Type Schema

func (*Stream) WithCursorField

func (s *Stream) WithCursorField(columns ...string) *Stream

func (*Stream) WithPrimaryKey

func (s *Stream) WithPrimaryKey(keys ...string) *Stream

func (*Stream) WithSchema

func (s *Stream) WithSchema(schema *TypeSchema) *Stream

func (*Stream) WithSyncMode

func (s *Stream) WithSyncMode(modes ...SyncMode) *Stream

func (*Stream) Wrap

func (s *Stream) Wrap(_ int) *ConfiguredStream

type StreamInterface

// StreamInterface is the read-side view of a configured stream: identity (ID,
// Name, Namespace), schema, sync-mode and cursor accessors, filter parsing and
// validation against a source Stream.
//
// Every method name listed here also appears in the method set documented for
// *ConfiguredStream in this package.
type StreamInterface interface {
	ID() string
	Self() *ConfiguredStream
	Name() string
	Namespace() string
	Schema() *TypeSchema
	GetStream() *Stream
	GetSyncMode() SyncMode
	GetFilter() (Filter, error)
	SupportedSyncModes() *Set[SyncMode]
	Cursor() (string, string)
	Validate(source *Stream) error
	NormalizationEnabled() bool
}

type StreamMetadata

// StreamMetadata carries per-stream settings, with JSON keys as tagged below.
//
// PartitionRegex, StreamName and Normalization have no omitempty option and
// are therefore always present in the JSON output. The other fields are
// dropped when empty.
// NOTE(review): the `default:"false"` tag on Normalization is not read by
// encoding/json. It only has an effect if some other library in the project
// consumes it — confirm.
type StreamMetadata struct {
	ChunkColumn    string `json:"chunk_column,omitempty"`
	PartitionRegex string `json:"partition_regex"`
	StreamName     string `json:"stream_name"`
	AppendMode     bool   `json:"append_mode,omitempty"`
	Normalization  bool   `json:"normalization" default:"false"`
	Filter         string `json:"filter,omitempty"`
}

type StreamState

// StreamState is the state of a single stream, identified by Stream and
// Namespace, together with its SyncMode and a concurrent key/value State.
//
// It contains an atomic.Bool and a sync.Map, neither of which may be copied
// after first use, so values should be handled through *StreamState.
// The `json:"state"` tag alone would not encode the contents of a sync.Map,
// which has no exported fields. The type's custom MarshalJSON and
// UnmarshalJSON methods are what handle that field.
type StreamState struct {
	HoldsValue atomic.Bool `json:"-"` // If State holds some value and should not be excluded during unmarshaling then value true
	Stream     string      `json:"stream"`
	Namespace  string      `json:"namespace"`
	SyncMode   string      `json:"sync_mode"`
	State      sync.Map    `json:"state"`
}

func (*StreamState) MarshalJSON

func (s *StreamState) MarshalJSON() ([]byte, error)

MarshalJSON custom marshaller to handle sync.Map encoding

func (*StreamState) UnmarshalJSON

func (s *StreamState) UnmarshalJSON(data []byte) error

UnmarshalJSON custom unmarshaller to handle sync.Map decoding

type SyncMode

// SyncMode is the string-backed sync mode of a stream. It is the type of
// Stream.SyncMode and the element type of Stream.SupportedSyncModes. The
// constants below are the declared values.
type SyncMode string
const (
	FULLREFRESH SyncMode = "full_refresh"
	INCREMENTAL SyncMode = "incremental"
	CDC         SyncMode = "cdc"
	STRICTCDC   SyncMode = "strict_cdc"
)

type TypeSchema

// TypeSchema holds the properties of a stream's columns in a sync.Map.
//
// Properties is tagged `json:"-"`, so the default encoder skips it. The type's
// custom MarshalJSON and UnmarshalJSON methods handle the sync.Map instead.
// Because it contains a sync.Map, a TypeSchema must not be copied after first
// use; NewTypeSchema returns a pointer.
type TypeSchema struct {
	Properties sync.Map `json:"-"`
	// contains filtered or unexported fields
}

func NewTypeSchema

func NewTypeSchema() *TypeSchema

func (*TypeSchema) AddTypes

func (t *TypeSchema) AddTypes(column string, types ...DataType)

func (*TypeSchema) GetProperty

func (t *TypeSchema) GetProperty(column string) (bool, *Property)

func (*TypeSchema) GetType

func (t *TypeSchema) GetType(column string) (DataType, error)

func (*TypeSchema) MarshalJSON

func (t *TypeSchema) MarshalJSON() ([]byte, error)

MarshalJSON custom marshaller to handle sync.Map encoding

func (*TypeSchema) Override

func (t *TypeSchema) Override(fields map[string]*Property)

func (*TypeSchema) ToParquet

func (t *TypeSchema) ToParquet() *parquet.Schema

func (*TypeSchema) UnmarshalJSON

func (t *TypeSchema) UnmarshalJSON(data []byte) error

UnmarshalJSON custom unmarshaller to handle sync.Map decoding

type WriterConfig

// WriterConfig pairs an AdapterType (JSON key "type") with a dynamically typed
// configuration value (JSON key "writer"). Both keys are always emitted.
type WriterConfig struct {
	Type         AdapterType `json:"type"`
	WriterConfig any         `json:"writer"`
}

TODO: Add validations

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL