Documentation ¶
Index ¶
- Constants
- Variables
- func BinaryStringToBytes(s string) (bs []byte)
- func DateTimeValue(dateTime string, loc *time.Location) int64
- func DateValue(date string) int64
- func DecimalValueFromStringMysql(value string) string
- func NewBeforeAfter(tableIdent string, fields []*Schema) (*Schema, *Schema)
- func TimeStamp(timestamp string) string
- func TimeValue(value string) int64
- func YearValue(year string) int
- type ColDefs
- type DDLPayload
- type DDLPosition
- type DDLSource
- type DDLTableChange
- type DbzOutput
- type KafkaManager
- type KafkaRunner
- type KafkaTableItem
- type Row
- type Schema
- func NewBitsField(optional bool, field string, length string, defaultValue interface{}) *Schema
- func NewColDefSchema(tableIdent string, field string) *Schema
- func NewDateField(theType SchemaType, optional bool, field string, defaultValue interface{}) *Schema
- func NewDateTimeField(optional bool, field string, defaultValue interface{}, loc *time.Location) *Schema
- func NewDecimalField(precision int, scale int, optional bool, field string, ...) *Schema
- func NewEnumField(theType SchemaType, optional bool, field string, allowed string, ...) *Schema
- func NewEnvelopeSchema(tableIdent string, colDefs ColDefs) *Schema
- func NewJsonField(optional bool, field string) *Schema
- func NewKeySchema(tableIdent string, fields ColDefs) *Schema
- func NewSetField(theType SchemaType, optional bool, field string, allowed string, ...) *Schema
- func NewSimpleSchemaField(theType SchemaType, optional bool, field string) *Schema
- func NewSimpleSchemaWithDefaultField(theType SchemaType, optional bool, field string, defaultValue interface{}) *Schema
- func NewTimeField(optional bool, field string, defaultValue interface{}) *Schema
- func NewTimeStampField(optional bool, field string, defaultValue interface{}) *Schema
- func NewYearField(theType SchemaType, optional bool, field string, defaultValue interface{}) *Schema
- type SchemaJson
- type SchemaType
- type SourcePayload
- type ValuePayload
Constants ¶
View Source
// String constants used when building schemas and change records.
const (
	// Converter names.
	CONVERTER_JSON = "json"
	CONVERTER_AVRO = "avro"

	// Schema type names. SCHEMA_TYPE_DOUBLE and SCHEMA_TYPE_FLOAT64 share
	// the same value, "float64".
	SCHEMA_TYPE_STRUCT    = "struct"
	SCHEMA_TYPE_STRING    = "string"
	SCHEMA_TYPE_INT64     = "int64"
	SCHEMA_TYPE_INT32     = "int32"
	SCHEMA_TYPE_INT16     = "int16"
	SCHEMA_TYPE_INT8      = "int8"
	SCHEMA_TYPE_BYTES     = "bytes"
	SCHEMA_TYPE_FLOAT64   = "float64"
	SCHEMA_TYPE_TIMESTAMP = "timestamp"
	SCHEMA_TYPE_DOUBLE    = "float64"
	SCHEMA_TYPE_FLOAT32   = "float32"
	SCHEMA_TYPE_BOOLEAN   = "boolean"

	// Record operation codes: insert, update, delete and read.
	// NOTE(review): presumably the values carried in ValuePayload.Op — confirm.
	RECORD_OP_INSERT = "c"
	RECORD_OP_UPDATE = "u"
	RECORD_OP_DELETE = "d"
	RECORD_OP_READ   = "r"
)
View Source
// Layout strings written in Go's reference-time notation.
// NOTE(review): presumably used to parse MySQL DATETIME and DATE text
// values (see DateTimeValue / DateValue) — confirm against the callers.
const (
	MySQLDateTimeFormat = "2006-01-02 15:04:05"
	MySQLDateFormat     = "2006-01-02"
)
Variables ¶
View Source
var (
	// SourceSchema is the non-optional struct schema named
	// "io.debezium.connector.mysql.Source" that is placed in the "source"
	// field. The second argument of each field constructor is the field's
	// optional flag; "snapshot" additionally carries a default of false.
	SourceSchema = &Schema{
		Fields: []*Schema{
			NewSimpleSchemaField(SCHEMA_TYPE_STRING, true, "version"),
			NewSimpleSchemaField(SCHEMA_TYPE_STRING, false, "name"),
			NewSimpleSchemaField(SCHEMA_TYPE_INT64, false, "server_id"),
			NewSimpleSchemaField(SCHEMA_TYPE_INT64, false, "ts_sec"),
			NewSimpleSchemaField(SCHEMA_TYPE_STRING, true, "gtid"),
			NewSimpleSchemaField(SCHEMA_TYPE_STRING, false, "file"),
			NewSimpleSchemaField(SCHEMA_TYPE_INT64, false, "pos"),
			NewSimpleSchemaField(SCHEMA_TYPE_INT32, false, "row"),
			NewSimpleSchemaWithDefaultField(SCHEMA_TYPE_BOOLEAN, true, "snapshot", false),
			NewSimpleSchemaField(SCHEMA_TYPE_INT64, true, "thread"),
			NewSimpleSchemaField(SCHEMA_TYPE_STRING, true, "db"),
			NewSimpleSchemaField(SCHEMA_TYPE_STRING, true, "table"),
			NewSimpleSchemaField(SCHEMA_TYPE_STRING, true, "query"),
		},
		Optional: false,
		Name:     "io.debezium.connector.mysql.Source",
		Field:    "source",
		Type:     SCHEMA_TYPE_STRUCT,
	}
)
Functions ¶
func BinaryStringToBytes ¶
func DecimalValueFromStringMysql ¶
value: the decimal literal as text. For example, for a decimal(11,5) column the value 123.45 becomes 123.45000.
Types ¶
type DDLPayload ¶
type DDLPayload struct { // See dbz `class HistoryRecord`. Source DDLSource `json:"source,omitempty"` Position DDLPosition `json:"position,omitempty"` DatabaseName string `json:"databaseName,omitempty"` // unused for MySQL SchemaName string `json:"schemaName,omitempty"` DDL string `json:"ddl"` TableChanges []DDLTableChange `json:"tableChanges"` }
type DDLPosition ¶
type DDLTableChange ¶
// DDLTableChange is the JSON shape of one table change: the kind of change,
// the table identifier, the table definition (charset, primary-key column
// names, columns) and a comment.
type DDLTableChange struct {
	// create/alter/drop
	Type  string `json:"type"`
	ID    string `json:"id"`
	Table struct {
		DefaultCharsetName    string   `json:"defaultCharsetName"`
		PrimaryKeyColumnNames []string `json:"primaryKeyColumnNames"`
		Columns               []struct {
			// See dbz `JsonTableChangeSerializer.toDocument(Column column)`.
			Name           string `json:"name"`
			JdbcType       int    `json:"jdbcType"`
			NativeType     *int   `json:"nativeType,omitempty"`
			TypeName       string `json:"typeName"`
			TypeExpression string `json:"typeExpression"`
			// No omitempty here: a nil CharsetName is emitted as JSON null.
			CharsetName     *string `json:"charsetName"`
			Length          *int    `json:"length,omitempty"`
			Scale           *int    `json:"scale,omitempty"`
			Position        int     `json:"position"`
			Optional        bool    `json:"optional"`
			AutoIncremented bool    `json:"autoIncremented"`
			Generated       bool    `json:"generated"`
		} `json:"columns"`
	} `json:"table"`
	Comment string `json:"comment"`
}
type DbzOutput ¶
type DbzOutput struct { Schema *SchemaJson `json:"schema"` // ValuePayload or Row Payload interface{} `json:"payload"` }
type KafkaManager ¶
type KafkaManager struct { Cfg *common.KafkaConfig // contains filtered or unexported fields }
func NewKafkaManager ¶
func NewKafkaManager(kcfg *common.KafkaConfig) (*KafkaManager, error)
func (*KafkaManager) SendMessages ¶
func (k *KafkaManager) SendMessages(logger g.LoggerType, topics []string, keys [][]byte, values [][]byte) error
type KafkaRunner ¶
type KafkaRunner struct { Gtid string // TODO remove? BinlogFile string BinlogPos int64 // contains filtered or unexported fields }
func NewKafkaRunner ¶
func NewKafkaRunner(execCtx *common.ExecContext, logger g.LoggerType, storeManager *common.StoreManager, natsAddr string, waitCh chan *drivers.ExitResult, ctx context.Context) (kr *KafkaRunner, err error)
func (*KafkaRunner) Finish1 ¶
func (kr *KafkaRunner) Finish1() error
func (*KafkaRunner) Run ¶
func (kr *KafkaRunner) Run()
func (*KafkaRunner) Shutdown ¶
func (kr *KafkaRunner) Shutdown() error
func (*KafkaRunner) Stats ¶
func (kr *KafkaRunner) Stats() (*common.TaskStatistics, error)
type KafkaTableItem ¶
type KafkaTableItem struct {
// contains filtered or unexported fields
}
type Schema ¶
type Schema struct { Type SchemaType `json:"type"` Optional bool `json:"optional"` Default interface{} `json:"default,omitempty"` Field string `json:"field,omitempty"` // field name in outer struct Fields []*Schema `json:"fields,omitempty"` Name string `json:"name,omitempty"` Version int `json:"version,omitempty"` Parameters map[string]interface{} `json:"parameters,omitempty"` }
func NewBitsField ¶
func NewColDefSchema ¶
func NewDateField ¶
func NewDateField(theType SchemaType, optional bool, field string, defaultValue interface{}) *Schema
func NewDateTimeField ¶
func NewDecimalField ¶
func NewEnumField ¶
func NewEnumField(theType SchemaType, optional bool, field string, allowed string, defaultValue interface{}) *Schema
func NewEnvelopeSchema ¶
func NewJsonField ¶
func NewKeySchema ¶
func NewSetField ¶
func NewSetField(theType SchemaType, optional bool, field string, allowed string, defaultValue interface{}) *Schema
func NewSimpleSchemaField ¶
func NewSimpleSchemaField(theType SchemaType, optional bool, field string) *Schema
func NewSimpleSchemaWithDefaultField ¶
func NewSimpleSchemaWithDefaultField(theType SchemaType, optional bool, field string, defaultValue interface{}) *Schema
func NewTimeField ¶
func NewTimeStampField ¶
func NewYearField ¶
func NewYearField(theType SchemaType, optional bool, field string, defaultValue interface{}) *Schema
type SchemaJson ¶
type SchemaJson struct {
// contains filtered or unexported fields
}
func (*SchemaJson) MarshalJSON ¶
func (sj *SchemaJson) MarshalJSON() ([]byte, error)
type SchemaType ¶
// SchemaType is the string-based type of Schema.Type.
// NOTE(review): presumably holds one of the SCHEMA_TYPE_* values — confirm.
type SchemaType string
type SourcePayload ¶
// SourcePayload is the JSON shape of the "source" member of a ValuePayload.
//
// NOTE(review): SourceSchema declares server_id as int64 and row as int32,
// while ServerID and Row are plain Go ints here — confirm the widths agree
// on every target platform.
type SourcePayload struct {
	// we use 'interface{}' to represent an optional field
	Version  string      `json:"version"`
	Name     string      `json:"name"`
	ServerID int         `json:"server_id"`
	TsSec    int64       `json:"ts_sec"`
	Gtid     interface{} `json:"gtid"` // real type: optional<string>
	File     string      `json:"file"`
	Pos      int64       `json:"pos"`
	Query    interface{} `json:"query"`
	Row      int         `json:"row"`
	Snapshot bool        `json:"snapshot"`
	Thread   interface{} `json:"thread"` // real type: optional<int64>
	Db       string      `json:"db"`
	Table    string      `json:"table"`
}
type ValuePayload ¶
type ValuePayload struct { Before *Row `json:"before"` After *Row `json:"after"` Source *SourcePayload `json:"source"` Op string `json:"op"` TsMs int64 `json:"ts_ms"` }
func NewValuePayload ¶
func NewValuePayload() *ValuePayload
Click to show internal directories.
Click to hide internal directories.