kafka

package
v0.2.1-0...-d023f04 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 17, 2023 License: MPL-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
const (
	CONVERTER_JSON = "json"
	CONVERTER_AVRO = "avro"

	SCHEMA_TYPE_STRUCT    = "struct"
	SCHEMA_TYPE_STRING    = "string"
	SCHEMA_TYPE_INT64     = "int64"
	SCHEMA_TYPE_INT32     = "int32"
	SCHEMA_TYPE_INT16     = "int16"
	SCHEMA_TYPE_INT8      = "int8"
	SCHEMA_TYPE_BYTES     = "bytes"
	SCHEMA_TYPE_FLOAT64   = "float64"
	SCHEMA_TYPE_TIMESTAMP = "timestamp"
	SCHEMA_TYPE_DOUBLE    = "float64"
	SCHEMA_TYPE_FLOAT32   = "float32"
	SCHEMA_TYPE_BOOLEAN   = "boolean"

	RECORD_OP_INSERT = "c"
	RECORD_OP_UPDATE = "u"
	RECORD_OP_DELETE = "d"
	RECORD_OP_READ   = "r"
)
View Source
const (
	MySQLDateTimeFormat = "2006-01-02 15:04:05"
	MySQLDateFormat     = "2006-01-02"
)

Variables

Functions

func BinaryStringToBytes

func BinaryStringToBytes(s string) (bs []byte)

func DateTimeValue

func DateTimeValue(dateTime string, loc *time.Location) int64

func DateValue

func DateValue(date string) int64

func DecimalValueFromStringMysql

func DecimalValueFromStringMysql(value string) string

value: e.g. decimal(11,5), 123.45 will be 123.45000

func NewBeforeAfter

func NewBeforeAfter(tableIdent string, fields []*Schema) (*Schema, *Schema)

func TimeStamp

func TimeStamp(timestamp string) string

func TimeValue

func TimeValue(value string) int64

precision make no difference

func YearValue

func YearValue(year string) int

Types

type ColDefs

type ColDefs []*Schema

type DDLPayload

type DDLPayload struct {
	// See dbz `class HistoryRecord`.
	Source       DDLSource   `json:"source,omitempty"`
	Position     DDLPosition `json:"position,omitempty"`
	DatabaseName string      `json:"databaseName,omitempty"`
	// unused for MySQL
	SchemaName   string           `json:"schemaName,omitempty"`
	DDL          string           `json:"ddl"`
	TableChanges []DDLTableChange `json:"tableChanges"`
}

type DDLPosition

type DDLPosition struct {
	// See dbz `offsetUsingPosition`.
	TsSec    int64  `json:"ts_sec"`
	File     string `json:"file"`
	Pos      int64  `json:"pos"`
	Gtids    string `json:"gtids"`
	Snapshot bool   `json:"snapshot,omitempty"`
}

type DDLSource

type DDLSource struct {
	Server string `json:"server"`
}

type DDLTableChange

type DDLTableChange struct {
	// create/alter/drop
	Type  string `json:"type"`
	ID    string `json:"id"`
	Table struct {
		DefaultCharsetName    string   `json:"defaultCharsetName"`
		PrimaryKeyColumnNames []string `json:"primaryKeyColumnNames"`
		Columns               []struct {
			// See dbz `JsonTableChangeSerializer.toDocument(Column column)`.
			Name            string  `json:"name"`
			JdbcType        int     `json:"jdbcType"`
			NativeType      *int    `json:"nativeType,omitempty"`
			TypeName        string  `json:"typeName"`
			TypeExpression  string  `json:"typeExpression"`
			CharsetName     *string `json:"charsetName"`
			Length          *int    `json:"length,omitempty"`
			Scale           *int    `json:"scale,omitempty"`
			Position        int     `json:"position"`
			Optional        bool    `json:"optional"`
			AutoIncremented bool    `json:"autoIncremented"`
			Generated       bool    `json:"generated"`
		} `json:"columns"`
	} `json:"table"`
	Comment string `json:"comment"`
}

type DbzOutput

type DbzOutput struct {
	Schema *SchemaJson `json:"schema"`
	// ValuePayload or Row
	Payload interface{} `json:"payload"`
}

type KafkaManager

type KafkaManager struct {
	Cfg *common.KafkaConfig
	// contains filtered or unexported fields
}

func NewKafkaManager

func NewKafkaManager(kcfg *common.KafkaConfig) (*KafkaManager, error)

func (*KafkaManager) SendMessages

func (k *KafkaManager) SendMessages(logger g.LoggerType, topics []string, keys [][]byte, values [][]byte) error

type KafkaRunner

type KafkaRunner struct {
	Gtid       string // TODO remove?
	BinlogFile string
	BinlogPos  int64
	// contains filtered or unexported fields
}

func NewKafkaRunner

func NewKafkaRunner(execCtx *common.ExecContext, logger g.LoggerType, storeManager *common.StoreManager,
	natsAddr string, waitCh chan *drivers.ExitResult, ctx context.Context) (kr *KafkaRunner, err error)

func (*KafkaRunner) Finish1

func (kr *KafkaRunner) Finish1() error

func (*KafkaRunner) Run

func (kr *KafkaRunner) Run()

func (*KafkaRunner) Shutdown

func (kr *KafkaRunner) Shutdown() error

func (*KafkaRunner) Stats

func (kr *KafkaRunner) Stats() (*common.TaskStatistics, error)

type KafkaTableItem

type KafkaTableItem struct {
	// contains filtered or unexported fields
}

type Row

type Row struct {
	ColNames []string
	Values   []interface{}
}

func NewRow

func NewRow() *Row

func (*Row) AddField

func (r *Row) AddField(key string, value interface{})

func (*Row) MarshalJSON

func (r *Row) MarshalJSON() ([]byte, error)

type Schema

type Schema struct {
	Type       SchemaType             `json:"type"`
	Optional   bool                   `json:"optional"`
	Default    interface{}            `json:"default,omitempty"`
	Field      string                 `json:"field,omitempty"` // field name in outer struct
	Fields     []*Schema              `json:"fields,omitempty"`
	Name       string                 `json:"name,omitempty"`
	Version    int                    `json:"version,omitempty"`
	Parameters map[string]interface{} `json:"parameters,omitempty"`
}

func NewBitsField

func NewBitsField(optional bool, field string, length string, defaultValue interface{}) *Schema

func NewColDefSchema

func NewColDefSchema(tableIdent string, field string) *Schema

func NewDateField

func NewDateField(theType SchemaType, optional bool, field string, defaultValue interface{}) *Schema

func NewDateTimeField

func NewDateTimeField(optional bool, field string, defaultValue interface{}, loc *time.Location) *Schema

func NewDecimalField

func NewDecimalField(precision int, scale int, optional bool, field string, defaultValue interface{}) *Schema

func NewEnumField

func NewEnumField(theType SchemaType, optional bool, field string, allowed string, defaultValue interface{}) *Schema

func NewEnvelopeSchema

func NewEnvelopeSchema(tableIdent string, colDefs ColDefs) *Schema

func NewJsonField

func NewJsonField(optional bool, field string) *Schema

func NewKeySchema

func NewKeySchema(tableIdent string, fields ColDefs) *Schema

func NewSetField

func NewSetField(theType SchemaType, optional bool, field string, allowed string, defaultValue interface{}) *Schema

func NewSimpleSchemaField

func NewSimpleSchemaField(theType SchemaType, optional bool, field string) *Schema

func NewSimpleSchemaWithDefaultField

func NewSimpleSchemaWithDefaultField(theType SchemaType, optional bool, field string, defaultValue interface{}) *Schema

func NewTimeField

func NewTimeField(optional bool, field string, defaultValue interface{}) *Schema

func NewTimeStampField

func NewTimeStampField(optional bool, field string, defaultValue interface{}) *Schema

func NewYearField

func NewYearField(theType SchemaType, optional bool, field string, defaultValue interface{}) *Schema

type SchemaJson

type SchemaJson struct {
	// contains filtered or unexported fields
}

func (*SchemaJson) MarshalJSON

func (sj *SchemaJson) MarshalJSON() ([]byte, error)

type SchemaType

type SchemaType string

type SourcePayload

type SourcePayload struct {
	// we use 'interface{}' to represent an optional field
	Version  string      `json:"version"`
	Name     string      `json:"name"`
	ServerID int         `json:"server_id"`
	TsSec    int64       `json:"ts_sec"`
	Gtid     interface{} `json:"gtid"` // real type: optional<string>
	File     string      `json:"file"`
	Pos      int64       `json:"pos"`
	Query    interface{} `json:"query"`
	Row      int         `json:"row"`
	Snapshot bool        `json:"snapshot"`
	Thread   interface{} `json:"thread"` // real type: optional<int64>
	Db       string      `json:"db"`
	Table    string      `json:"table"`
}

type ValuePayload

type ValuePayload struct {
	Before *Row           `json:"before"`
	After  *Row           `json:"after"`
	Source *SourcePayload `json:"source"`
	Op     string         `json:"op"`
	TsMs   int64          `json:"ts_ms"`
}

func NewValuePayload

func NewValuePayload() *ValuePayload

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL