pump

package
v0.0.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 21, 2024 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
const (
	PropSyncServerId = "sync.server-id"
	PropSyncHost     = "sync.host"
	PropSyncPort     = "sync.port"
	PropSyncUser     = "sync.user"
	PropSyncPassword = "sync.password"
	PropSyncPosFile  = "sync.pos.file"

	TypeInsert = "INS"
	TypeUpdate = "UPD"
	TypeDelete = "DEL"
)

Variables

This section is empty.

Functions

func AttachPosFile added in v0.0.5

func AttachPosFile(rail miso.Rail) error

func BootstrapServer added in v0.0.7

func BootstrapServer(args []string)

func DetachPosFile added in v0.0.6

func DetachPosFile(rail miso.Rail)

func FlushPosFile added in v0.0.6

func FlushPosFile()

func HasAnyEventHandler

func HasAnyEventHandler() bool

func OnEventReceived

func OnEventReceived(handler EventHandler)

func PostServerBootstrap

func PostServerBootstrap(rail miso.Rail) error

func PreServerBootstrap

func PreServerBootstrap(rail miso.Rail) error

func PrepareSync

func PrepareSync(rail miso.Rail) (*replication.BinlogSyncer, error)

func PumpEvents

func PumpEvents(c miso.Rail, syncer *replication.BinlogSyncer, streamer *replication.BinlogStreamer) error

func ResetTableInfoCache

func ResetTableInfoCache(c miso.Rail, schema string, table string)

func SetGlobalExclude

func SetGlobalExclude(r *regexp.Regexp)

func SetGlobalInclude

func SetGlobalInclude(r *regexp.Regexp)

Types

type ColumnInfo

type ColumnInfo struct {
	ColumnName      string `gorm:"column:COLUMN_NAME"`
	DataType        string `gorm:"column:DATA_TYPE"`
	OrdinalPosition int    `gorm:"column:ORDINAL_POSITION"`
}

type Condition

type Condition struct {
	ColumnChanged []string `mapstructure:"column-changed"`
}

type DataChangeEvent

type DataChangeEvent struct {
	Timestamp uint32         `json:"timestamp"` // epoch time second
	Schema    string         `json:"schema"`
	Table     string         `json:"table"`
	Type      string         `json:"type"` // INS-INSERT, UPD-UPDATE, DEL-DELETE
	Records   []Record       `json:"records"`
	Columns   []RecordColumn `json:"columns"`
}

func (DataChangeEvent) PrintRecord

func (d DataChangeEvent) PrintRecord(r Record) string

func (DataChangeEvent) String

func (d DataChangeEvent) String() string

type EventHandler

type EventHandler func(c miso.Rail, dce DataChangeEvent) error

type EventMapping

type EventMapping struct {
	From string
	To   string
	Type string
}

type EventPumpConfig

type EventPumpConfig struct {
	Filter    GlobalFilter `mapstructure:"filter"`
	Pipelines []Pipeline   `mapstructure:"pipeline"`
}

func LoadConfig

func LoadConfig() EventPumpConfig

type Filter

type Filter interface {
	Include(rail miso.Rail, evt any) bool
}

func NewFilters

func NewFilters(p Pipeline) []Filter

type GlobalFilter

type GlobalFilter struct {
	Include string
	Exclude string
}

type Mapper

type Mapper interface {
	MapEvent(DataChangeEvent) ([]any, error)
}

func NewMapper

func NewMapper() Mapper

type Pipeline

type Pipeline struct {
	Schema    string
	Table     string
	Type      string
	Stream    string
	Enabled   bool
	Condition Condition `mapstructure:"condition"`
}

type Record

type Record struct {
	Before []interface{} `json:"before"`
	After  []interface{} `json:"after"`
}

type RecordColumn

type RecordColumn struct {
	Name     string `json:"name"`
	DataType string `json:"dataType"`
}

type StreamEvent

type StreamEvent struct {
	Timestamp uint32                       `json:"timestamp"` // epoc time second
	Schema    string                       `json:"schema"`
	Table     string                       `json:"table"`
	Type      string                       `json:"type"`    // INS-INSERT, UPD-UPDATE, DEL-DELETE
	Columns   map[string]StreamEventColumn `json:"columns"` // key is the column name
}

type StreamEventColumn

type StreamEventColumn struct {
	DataType string `json:"dataType"`
	Before   string `json:"before"`
	After    string `json:"after"`
}

type TableInfo

type TableInfo struct {
	Schema  string
	Table   string
	Columns []ColumnInfo
}

func CachedTableInfo

func CachedTableInfo(c miso.Rail, schema string, table string) (TableInfo, error)

func FetchTableInfo

func FetchTableInfo(c miso.Rail, schema string, table string) (TableInfo, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL