adapter

package
v0.0.0-...-a0fe4de Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 11, 2021 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// DefaultMaxPingsOutstanding is a package-level default with the value 3.
// NOTE(review): by its name this caps unanswered pings on a connection; the
// code that reads it is not visible here — confirm.
var DefaultMaxPingsOutstanding int = 3
View Source
// DefaultMaxReconnects is a package-level default with the value -1.
// NOTE(review): -1 presumably means "retry without limit" — confirm against
// the client that consumes this setting.
var DefaultMaxReconnects int = -1
View Source
// DefaultPingInterval is a package-level default with the value 10.
// NOTE(review): the unit is not stated here (presumably seconds) — confirm
// where the value is converted to a duration.
var DefaultPingInterval int64 = 10
View Source
// DefaultWorkerCount is a package-level default with the value 128.
// NOTE(review): by its name this sizes a worker pool; the pool itself is not
// visible here — confirm.
var DefaultWorkerCount int = 128

Default settings

Functions

func StrToBytes

func StrToBytes(s string) []byte

Types

type Adapter

// Adapter is the package's top-level object. It is created with NewAdapter
// from an app.App and prepared with Init; NewSource and NewSourceManager
// both take an *Adapter.
type Adapter struct {
	// contains filtered or unexported fields
}

func NewAdapter

func NewAdapter(a app.App) *Adapter

func (*Adapter) Init

func (adapter *Adapter) Init() error

type Consumer

// Consumer is created with NewConsumer for a given *Source and exposes the
// Setup, Cleanup and ConsumeClaim methods that a sarama consumer group
// invokes over the life of a session.
type Consumer struct {
	// contains filtered or unexported fields
}

Consumer represents a Sarama consumer group consumer.

func NewConsumer

func NewConsumer(source *Source) *Consumer

func (*Consumer) Cleanup

func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.

func (*Consumer) ConsumeClaim

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*Consumer) Setup

func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error

Setup is run at the beginning of a new session, before ConsumeClaim.

type ConsumerEvent

// ConsumerEvent bundles a sarama consumer message with a consumer group
// session.
//
// NOTE(review): Session is presumably the session in which Message was
// received, kept so the message can be marked later — confirm against
// Consumer.ConsumeClaim.
type ConsumerEvent struct {
	Session sarama.ConsumerGroupSession
	Message *sarama.ConsumerMessage
}

type Event

// Event is a JSON envelope whose single field, Payload, is read from the
// top-level "payload" key.
type Event struct {
	Payload EventPayload `json:"payload"`
}

type EventPayload

// EventPayload is the body of an Event, decoded from JSON.
//
//   - Before and After are free-form maps read from the "before" and "after"
//     keys.
//   - Source is read from "source" and carries the snapshot marker and table
//     name (see EventSource).
//   - Op is the raw string found under "op".
//
// NOTE(review): this layout resembles a Debezium change-event payload
// (Source has an InitDebezium method) — confirm, and document the accepted
// Op values where they are interpreted.
type EventPayload struct {
	Before map[string]interface{} `json:"before"`
	After  map[string]interface{} `json:"after"`
	Source EventSource            `json:"source"`
	Op     string                 `json:"op"`
}

type EventSource

// EventSource is the "source" section of an EventPayload.
//
// Snapshot is kept as the raw string read from the "snapshot" key; use
// IsSnapshot to interpret it (the accepted values are not visible here).
// Table is read from the "table" key.
type EventSource struct {
	Snapshot string `json:"snapshot"`
	Table    string `json:"table"`
}

func (*EventSource) IsSnapshot

func (es *EventSource) IsSnapshot() bool

type Packet

// Packet is the unit accepted by Source.HandleRequest: an event name plus an
// opaque byte payload.
//
// NOTE(review): the encoding of Payload is not visible here — confirm
// before documenting it.
type Packet struct {
	EventName string
	Payload   []byte
}

type Source

// Source is created with NewSource from an *Adapter, a name and a
// *SourceInfo. It exposes Init, InitDebezium and InitSubscription for
// set-up and HandleRequest for processing a *Packet.
type Source struct {
	// contains filtered or unexported fields
}

func NewSource

func NewSource(adapter *Adapter, name string, sourceInfo *SourceInfo) *Source

func (*Source) HandleRequest

func (source *Source) HandleRequest(request *Packet)

func (*Source) Init

func (source *Source) Init() error

func (*Source) InitDebezium

func (source *Source) InitDebezium() error

func (*Source) InitSubscription

func (source *Source) InitSubscription() error

type SourceConfig

// SourceConfig is the JSON document returned by
// SourceManager.LoadSourceConfig. Sources is read from the "sources" key and
// maps a string key to that source's SourceInfo.
//
// NOTE(review): the map key is presumably the source name passed to
// NewSource — confirm.
type SourceConfig struct {
	Sources map[string]SourceInfo `json:"sources"`
}

type SourceInfo

// SourceInfo is the JSON configuration of a single source.
//
//   - Host is read from "host".
//   - KafkaHosts is read from "kafka.hosts"; it is a single string.
//     NOTE(review): presumably a delimiter-separated broker list — confirm
//     the expected format where it is parsed.
//   - Tables is read from "tables" and maps a string key (presumably the
//     table name — confirm) to its SourceTable settings.
//   - Configs is read from "configs" and holds free-form values.
type SourceInfo struct {
	Host       string                 `json:"host"`
	KafkaHosts string                 `json:"kafka.hosts"`
	Tables     map[string]SourceTable `json:"tables"`
	Configs    map[string]interface{} `json:"configs"`
}

type SourceManager

// SourceManager is created with NewSourceManager for an *Adapter. It exposes
// Initialize and LoadSourceConfig, the latter reading a SourceConfig from
// the named file.
type SourceManager struct {
	// contains filtered or unexported fields
}

func NewSourceManager

func NewSourceManager(adapter *Adapter) *SourceManager

func (*SourceManager) Initialize

func (sm *SourceManager) Initialize() error

func (*SourceManager) LoadSourceConfig

func (sm *SourceManager) LoadSourceConfig(filename string) (*SourceConfig, error)

type SourceTable

// SourceTable is the per-table section of a SourceInfo. Its only field,
// Events, is read from the "events" key.
type SourceTable struct {
	Events SourceTableEvents `json:"events"`
}

type SourceTableEvents

// SourceTableEvents holds four strings read from the "snapshot", "create",
// "update" and "delete" keys of a table's "events" section.
//
// NOTE(review): each value is presumably the event name emitted (see
// Packet.EventName) for that kind of change — confirm where the mapping is
// applied, and what an empty string means.
type SourceTableEvents struct {
	Snapshot string `json:"snapshot"`
	Create   string `json:"create"`
	Update   string `json:"update"`
	Delete   string `json:"delete"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL