listener

package
v2.4.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 5, 2024 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
const (
	Int2OID = 21
	Int4OID = 23
	Int8OID = 20

	TextOID    = 25
	VarcharOID = 1043

	TimestampOID   = 1114
	TimestamptzOID = 1184
	DateOID        = 1082
	TimeOID        = 1083

	JSONBOID = 3802
	UUIDOID  = 2950
	BoolOID  = 16
)

PostgreSQL OIDs https://github.com/postgres/postgres/blob/master/src/include/catalog/pg_type.dat

View Source
const (
	// CommitMsgType protocol commit message type.
	CommitMsgType byte = 'C'

	// BeginMsgType protocol begin message type.
	BeginMsgType byte = 'B'

	// OriginMsgType protocol original message type.
	OriginMsgType byte = 'O'

	// RelationMsgType protocol relation message type.
	RelationMsgType byte = 'R'

	// TypeMsgType protocol message type.
	TypeMsgType byte = 'Y'

	// InsertMsgType protocol insert message type.
	InsertMsgType byte = 'I'

	// UpdateMsgType protocol update message type.
	UpdateMsgType byte = 'U'

	// DeleteMsgType protocol delete message type.
	DeleteMsgType byte = 'D'

	// NewTupleDataType protocol new tuple data type.
	NewTupleDataType byte = 'N'

	// TextDataType protocol test data type.
	TextDataType byte = 't'

	// NullDataType protocol NULL data type.
	NullDataType byte = 'n'

	// ToastDataType protocol toast data type.
	ToastDataType byte = 'u'
)

Variables

This section is empty.

Functions

func SubjectName added in v2.4.0

func SubjectName(cfg *config.Config, event Event) string

SubjectName creates subject name from the prefix, schema and table name. Also using topic map from cfg.

Types

type ActionData

type ActionData struct {
	Schema  string
	Table   string
	Kind    ActionKind
	Columns []Column
}

ActionData kind of WAL message data.

type ActionKind

type ActionKind string

ActionKind kind of action on WAL message.

const (
	ActionKindInsert ActionKind = "INSERT"
	ActionKindUpdate ActionKind = "UPDATE"
	ActionKindDelete ActionKind = "DELETE"
)

kind of WAL message.

type Begin

type Begin struct {
	// Identifies the message as a begin message.
	LSN int64
	// Commit timestamp of the transaction.
	Timestamp time.Time
	// 	Xid of the transaction.
	XID int32
}

Begin message format.

type BinaryParser

type BinaryParser struct {
	// contains filtered or unexported fields
}

BinaryParser represent binary protocol parser.

func NewBinaryParser

func NewBinaryParser(byteOrder binary.ByteOrder) *BinaryParser

NewBinaryParser create instance of binary parser.

func (*BinaryParser) ParseWalMessage

func (p *BinaryParser) ParseWalMessage(msg []byte, tx *WalTransaction) error

ParseWalMessage parse postgres WAL message.

type Column

type Column struct {
	// contains filtered or unexported fields
}

Column of the table with which changes occur.

func (*Column) AssertValue

func (c *Column) AssertValue(src []byte)

AssertValue converts bytes to a specific type depending on the type of this data in the database table.

type Commit

type Commit struct {
	// Flags; currently unused (must be 0).
	Flags int8
	// The LSN of the commit.
	LSN int64
	// The end LSN of the transaction.
	TransactionLSN int64
	// Commit timestamp of the transaction.
	Timestamp time.Time
}

Commit message format.

type DataType

type DataType struct {
	// ID of the data type.
	ID int32
	// Namespace (empty string for pg_catalog).
	Namespace string
	// name of the data type.
	Name string
}

DataType path of WAL message data.

type Delete

type Delete struct {
	/// ID of the relation corresponding to the ID in the relation message.
	RelationID int32
	// Identifies the following TupleData submessage as a key.
	KeyTuple bool
	// Identifies the following TupleData message as a old tuple.
	OldTuple bool
	// TupleData message part representing the contents of new tuple.
	Row []TupleData
}

Delete message format.

type Event

type Event struct {
	ID        uuid.UUID              `json:"id"`
	Schema    string                 `json:"schema"`
	Table     string                 `json:"table"`
	Action    string                 `json:"action"`
	Data      map[string]interface{} `json:"data"`
	EventTime time.Time              `json:"commitTime"`
}

Event structure for publishing to the NATS server.

type Insert

type Insert struct {
	/// ID of the relation corresponding to the ID in the relation message.
	RelationID int32
	// Identifies the following TupleData message as a new tuple.
	NewTuple bool
	// TupleData message part representing the contents of new tuple.
	Row []TupleData
}

Insert message format.

type Listener

type Listener struct {
	// contains filtered or unexported fields
}

Listener main service struct.

func NewWalListener

func NewWalListener(
	cfg *config.Config,
	log *logrus.Entry,
	repo repository,
	repl replication,
	publ Publisher,
	parser parser,
	slotName string,
) *Listener

NewWalListener create and initialize new service instance.

func (*Listener) AckWalMessage

func (l *Listener) AckWalMessage(lsn uint64) error

AckWalMessage acknowledge received wal message.

func (*Listener) Process

func (l *Listener) Process(ctx context.Context) error

Process is main service entry point.

func (*Listener) SendPeriodicHeartbeats

func (l *Listener) SendPeriodicHeartbeats(ctx context.Context)

SendPeriodicHeartbeats send periodic keep alive heartbeats to the server.

func (*Listener) SendStandbyStatus

func (l *Listener) SendStandbyStatus() error

SendStandbyStatus sends a `StandbyStatus` object with the current RestartLSN value to the server.

func (*Listener) Stop

func (l *Listener) Stop() error

Stop is a finalizer function.

func (*Listener) Stream

func (l *Listener) Stream(ctx context.Context)

Stream receive event from PostgreSQL. Accept message, apply filter and publish it in NATS server.

type NatsPublisher

type NatsPublisher struct {
	// contains filtered or unexported fields
}

NatsPublisher represent event publisher.

func NewNatsPublisher

func NewNatsPublisher(js nats.JetStreamContext) *NatsPublisher

NewNatsPublisher return new NatsPublisher instance.

func (NatsPublisher) Publish

func (n NatsPublisher) Publish(ctx context.Context, cfg *config.Config, log *logrus.Entry, event Event) error

Publish serializes the event and publishes it on the bus.

type Origin

type Origin struct {
	// The LSN of the commit on the origin server.
	LSN int64
	// name of the origin.
	Name string
}

Origin message format.

type PubSubPublisher added in v2.4.0

type PubSubPublisher struct {
	// contains filtered or unexported fields
}

PubSubPublisher publishes to pub/sub.

func NewPubSubPublisher added in v2.4.0

func NewPubSubPublisher(topic *pubsub.Topic) *PubSubPublisher

NewPubSubPublisher constructs a new pubsub publisher.

func (*PubSubPublisher) Publish added in v2.4.0

func (p *PubSubPublisher) Publish(ctx context.Context, _ *config.Config, log *logrus.Entry, event Event) error

Publish publishes a message to the configured pubsub topic.

type Publisher added in v2.4.0

type Publisher interface {
	Publish(context.Context, *config.Config, *logrus.Entry, Event) error
}

type Relation

type Relation struct {
	// ID of the relation.
	ID int32
	// Namespace (empty string for pg_catalog).
	Namespace string
	// Relation name.
	Name string
	// Replica identity setting for the relation (same as relreplident in pg_class).
	Replica int8
	Columns []RelationColumn
}

Relation message format.

type RelationColumn

type RelationColumn struct {
	// Flags for the column which marks the column as part of the key.
	Key bool
	// name of the column.
	Name string
	// ID of the column's data type.
	TypeID int32
	// valueType modifier of the column (atttypmod).
	ModifierType int32
}

RelationColumn path of WAL message data.

type RelationData

type RelationData struct {
	Schema  string
	Table   string
	Columns []Column
}

RelationData kind of WAL message data.

type RepositoryImpl

type RepositoryImpl struct {
	// contains filtered or unexported fields
}

RepositoryImpl service repository.

func NewRepository

func NewRepository(conn *pgx.Conn) *RepositoryImpl

NewRepository returns a new instance of the repository.

func (RepositoryImpl) Close

func (r RepositoryImpl) Close() error

Close database connection.

func (RepositoryImpl) CreatePublication

func (r RepositoryImpl) CreatePublication(name string) error

CreatePublication create publication fo all.

func (RepositoryImpl) GetSlotLSN

func (r RepositoryImpl) GetSlotLSN(slotName string) (string, error)

GetSlotLSN returns the value of the last offset for a specific slot.

func (RepositoryImpl) IsAlive

func (r RepositoryImpl) IsAlive() bool

IsAlive check database connection problems.

type TupleData

type TupleData struct {
	Value []byte
}

TupleData path of WAL message data.

type Update

type Update struct {
	/// ID of the relation corresponding to the ID in the relation message.
	RelationID int32
	// Identifies the following TupleData submessage as a key.
	KeyTuple bool
	// Identifies the following TupleData message as a old tuple.
	OldTuple bool
	// Identifies the following TupleData message as a new tuple.
	NewTuple bool
	// TupleData message part representing the contents of new tuple.
	Row []TupleData
	// TupleData message part representing the contents of the old tuple or primary key.
	//Only present if the previous 'O' or 'K' part is present.
	OldRow []TupleData
}

Update message format.

type WalTransaction

type WalTransaction struct {
	LSN           int64
	BeginTime     *time.Time
	CommitTime    *time.Time
	RelationStore map[int32]RelationData
	Actions       []ActionData
}

WalTransaction transaction specified WAL message.

func NewWalTransaction

func NewWalTransaction() *WalTransaction

NewWalTransaction create and initialize new WAL transaction.

func (*WalTransaction) Clear

func (w *WalTransaction) Clear()

Clear transaction data.

func (*WalTransaction) CreateActionData

func (w *WalTransaction) CreateActionData(relationID int32, rows []TupleData, kind ActionKind) (a ActionData, err error)

CreateActionData create action from WAL message data.

func (*WalTransaction) CreateEventsWithFilter

func (w *WalTransaction) CreateEventsWithFilter(tableMap map[string][]string) []Event

CreateEventsWithFilter filter WAL message by table, action and create events for each value.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL