changestream

package
v0.0.0-...-ae3587a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 24, 2020 License: LGPL-3.0 Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ChangeMessage

type ChangeMessage struct {
	Schema struct {
		Type   string `json:"type"`
		Fields []struct {
			Type   string `json:"type"`
			Fields []struct {
				Type     string `json:"type"`
				Optional bool   `json:"optional"`
				Field    string `json:"field"`
			} `json:"fields,omitempty"`
			Optional bool   `json:"optional"`
			Name     string `json:"name,omitempty"`
			Field    string `json:"field"`
		} `json:"fields"`
		Optional bool   `json:"optional"`
		Name     string `json:"name"`
	} `json:"schema"`
	Payload struct {
		Before json.RawMessage `json:"before"`
		After  json.RawMessage `json:"after"`
		Source struct {
			Version   string      `json:"version"`
			Connector string      `json:"connector"`
			Name      string      `json:"name"`
			TsMs      int64       `json:"ts_ms"`
			Snapshot  string      `json:"snapshot"`
			Db        string      `json:"db"`
			Schema    string      `json:"schema"`
			Table     string      `json:"table"`
			TxID      int         `json:"txId"`
			Lsn       int         `json:"lsn"`
			Xmin      interface{} `json:"xmin"`
		} `json:"source"`
		Op   string `json:"op"`
		TsMs int64  `json:"ts_ms"`
	} `json:"payload"`
}

ChangeMessage ...

type Filter

type Filter struct {
	*worker.Worker
	// contains filtered or unexported fields
}

Filter is responsible for reading the change stream, filtering out the events that are not interesting to us and writing new messages based on the changes to the filtered topic

func NewFilter

func NewFilter(kafkaAddress, kafkaGroupID, changesTopic, filteredTopic string, filter FilterFunc) *Filter

NewFilter returns an initilized Filter

type FilterFunc

type FilterFunc func(*ChangeMessage) ([]kafka.Message, error)

FilterFunc given a ChangeMessage from the changesTopic returns zero, one or multiple kafka Messages that should be written to the filteredTopic

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL