listener

package
v1.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 12, 2022 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
// Error message texts. These are untyped string constants, not error values.
// NOTE(review): the Err prefix conventionally names error values in Go;
// here it names plain message strings.
const (
	ErrPostgresConnection    = "db connection error"
	ErrReplicationConnection = "replication connection error"
	ErrPublishEvent          = "publish message error"
	ErrUnmarshalMsg          = "unmarshal wal message error"
	ErrAckWalMessage         = "acknowledge wal message error"
	ErrSendStandbyStatus     = "send standby status error"
)

Constants holding error message text.

View Source
// Service info message texts for the start and stop of the service.
const (
	StartServiceMessage = "service was started"
	StopServiceMessage  = "service was stopped"
)

Service info message.

View Source
// Single-byte tags of the protocol: the *MsgType constants tag message
// types, the *DataType constants tag tuple data.
const (
	// CommitMsgType protocol commit message type.
	CommitMsgType byte = 'C'

	// BeginMsgType protocol begin message type.
	BeginMsgType byte = 'B'

	// OriginMsgType protocol origin message type.
	OriginMsgType byte = 'O'

	// RelationMsgType protocol relation message type.
	RelationMsgType byte = 'R'

	// TypeMsgType protocol type message type.
	TypeMsgType byte = 'Y'

	// InsertMsgType protocol insert message type.
	InsertMsgType byte = 'I'

	// UpdateMsgType protocol update message type.
	UpdateMsgType byte = 'U'

	// DeleteMsgType protocol delete message type.
	DeleteMsgType byte = 'D'

	// NewTupleDataType protocol new tuple data type.
	NewTupleDataType byte = 'N'

	// TextDataType protocol text data type.
	TextDataType byte = 't'

	// NullDataType protocol NULL data type.
	NullDataType byte = 'n'

	// ToastDataType protocol toast data type.
	ToastDataType byte = 'u'
)

Variables

This section is empty.

Functions

func NewMockPublisher

func NewMockPublisher() *publisherMock

Types

type ActionData

// ActionData describes one action taken from WAL message data.
type ActionData struct {
	// Schema and Table presumably name the relation the action applies to
	// — confirm against CreateActionData.
	Schema string
	Table  string
	// Kind is the kind of action (see ActionKind).
	Kind ActionKind
	// Columns of the table involved in the action.
	Columns []Column
}

ActionData kind of WAL message data.

type ActionKind

// ActionKind is the kind of action carried by a WAL message.
type ActionKind string

ActionKind kind of action on WAL message.

// Kinds of action on a WAL message.
const (
	// ActionKindInsert marks an insert action.
	ActionKindInsert ActionKind = "INSERT"
	// ActionKindUpdate marks an update action.
	ActionKindUpdate ActionKind = "UPDATE"
	// ActionKindDelete marks a delete action.
	ActionKindDelete ActionKind = "DELETE"
)

Kinds of action on a WAL message.

type Begin

// Begin holds the fields of a begin message.
type Begin struct {
	// The final LSN of the transaction.
	// NOTE(review): the earlier comment here ("Identifies the message as a
	// begin message.") described the message-type byte, not this field; the
	// wording above follows the PostgreSQL Begin message format — confirm.
	LSN int64
	// Commit timestamp of the transaction.
	Timestamp time.Time
	// Xid of the transaction.
	XID int32
}

Begin message format.

type BinaryParser

type BinaryParser struct {
	// contains filtered or unexported fields
}

BinaryParser represents a binary protocol parser.

func NewBinaryParser

func NewBinaryParser(byteOrder binary.ByteOrder) *BinaryParser

NewBinaryParser creates an instance of the binary parser.

func (*BinaryParser) ParseWalMessage

func (p *BinaryParser) ParseWalMessage(msg []byte, tx *WalTransaction) error

ParseWalMessage parses a PostgreSQL WAL message.

type Column

type Column struct {
	// contains filtered or unexported fields
}

Column of the table with which changes occur.

func (*Column) AssertValue

func (c *Column) AssertValue(src []byte)

AssertValue converts bytes to a specific type depending on the type of this data in the database table.

type Commit

// Commit holds the fields of a commit message.
type Commit struct {
	// Flags; currently unused (must be 0).
	Flags int8
	// The LSN of the commit.
	LSN int64
	// The end LSN of the transaction.
	TransactionLSN int64
	// Commit timestamp of the transaction.
	Timestamp time.Time
}

Commit message format.

type DataType

// DataType describes a data type referenced in WAL message data.
type DataType struct {
	// ID of the data type.
	ID int32
	// Namespace (empty string for pg_catalog).
	Namespace string
	// Name of the data type.
	Name string
}

DataType is part of the WAL message data.

type Delete

// Delete holds the fields of a delete message.
type Delete struct {
	// ID of the relation corresponding to the ID in the relation message.
	RelationID int32
	// Identifies the following TupleData submessage as a key.
	KeyTuple bool
	// Identifies the following TupleData message as an old tuple.
	OldTuple bool
	// TupleData message part holding the row contents.
	// NOTE(review): the earlier comment said "contents of new tuple", but a
	// delete message carries no new tuple; presumably this is the old tuple
	// or the key, depending on KeyTuple/OldTuple — confirm against the parser.
	Row []TupleData
}

Delete message format.

type Event

// Event is the event structure published to the NATS server.
type Event struct {
	// ID is the event identifier (JSON key "id").
	ID uuid.UUID `json:"id"`
	// Schema and Table are the schema and table names (JSON keys "schema"
	// and "table"); GetSubjectName builds the subject name from them.
	Schema string `json:"schema"`
	Table  string `json:"table"`
	// Action is the action name (JSON key "action"); presumably one of the
	// ActionKind values — confirm.
	Action string `json:"action"`
	// Data is a generic map of event data (JSON key "data"); presumably
	// column name to value — confirm.
	Data map[string]interface{} `json:"data"`
	// EventTime is tagged with the JSON key "commitTime", not "eventTime".
	EventTime time.Time `json:"commitTime"`
	// Topic is tagged `json:"-"`, i.e. marked to be left out of the JSON form.
	Topic string `json:"-"`
}

Event is the event structure published to the NATS server.

func (Event) GetSubjectName

func (e Event) GetSubjectName(prefix string) string

GetSubjectName creates subject name from the prefix, schema and table name.

func (Event) MarshalEasyJSON

func (v Event) MarshalEasyJSON(w *jwriter.Writer)

MarshalEasyJSON supports easyjson.Marshaler interface

func (Event) MarshalJSON

func (v Event) MarshalJSON() ([]byte, error)

MarshalJSON supports json.Marshaler interface

func (*Event) UnmarshalEasyJSON

func (v *Event) UnmarshalEasyJSON(l *jlexer.Lexer)

UnmarshalEasyJSON supports easyjson.Unmarshaler interface

func (*Event) UnmarshalJSON

func (v *Event) UnmarshalJSON(data []byte) error

UnmarshalJSON supports json.Unmarshaler interface

type Insert

// Insert holds the fields of an insert message.
type Insert struct {
	// ID of the relation corresponding to the ID in the relation message.
	RelationID int32
	// Identifies the following TupleData message as a new tuple.
	NewTuple bool
	// TupleData message part representing the contents of the new tuple.
	Row []TupleData
}

Insert message format.

type Listener

type Listener struct {
	// contains filtered or unexported fields
}

Listener main service struct.

func NewWalListener

func NewWalListener(
	cfg *config.Config,
	repo repository,
	repl replication,
	publ publisher,
	parser parser,
) *Listener

NewWalListener creates and initializes a new service instance.

func (*Listener) AckWalMessage

func (l *Listener) AckWalMessage(lsn uint64) error

AckWalMessage acknowledges a received WAL message.

func (*Listener) Process

func (l *Listener) Process(ctx context.Context) error

Process is the main service entry point.

func (*Listener) SendPeriodicHeartbeats

func (l *Listener) SendPeriodicHeartbeats(ctx context.Context)

SendPeriodicHeartbeats sends periodic keep-alive heartbeats to the server.

func (*Listener) SendStandbyStatus

func (l *Listener) SendStandbyStatus() error

SendStandbyStatus sends a `StandbyStatus` object with the current RestartLSN value to the server.

func (*Listener) Stop

func (l *Listener) Stop() error

Stop is a finalizer function.

func (*Listener) Stream

func (l *Listener) Stream(ctx context.Context)

Stream receives events from PostgreSQL: it accepts each message, applies the filter and publishes it to the NATS server.

type NatsPublisher

type NatsPublisher struct {
	// contains filtered or unexported fields
}

NatsPublisher represents the event publisher.

func (NatsPublisher) Close

func (n NatsPublisher) Close() error

Close NATS connection.

type Origin

// Origin holds the fields of an origin message.
type Origin struct {
	// The LSN of the commit on the origin server.
	LSN int64
	// Name of the origin.
	Name string
}

Origin message format.

type Relation

// Relation holds the fields of a relation message.
type Relation struct {
	// ID of the relation.
	ID int32
	// Namespace (empty string for pg_catalog).
	Namespace string
	// Relation name.
	Name string
	// Replica identity setting for the relation (same as relreplident in pg_class).
	Replica int8
	// Columns of the relation.
	Columns []RelationColumn
}

Relation message format.

type RelationColumn

// RelationColumn describes one column of a Relation.
type RelationColumn struct {
	// Flags for the column which marks the column as part of the key.
	Key bool
	// Name of the column.
	Name string
	// ID of the column's data type.
	TypeID int32
	// Type modifier of the column (atttypmod).
	ModifierType int32
}

RelationColumn is part of the WAL message data.

type RelationData

// RelationData describes a relation taken from WAL message data.
type RelationData struct {
	// Schema and Table presumably name the relation — confirm against the
	// code that fills WalTransaction.RelationStore.
	Schema string
	Table  string
	// Columns of the relation.
	Columns []Column
}

RelationData kind of WAL message data.

type RepositoryImpl

type RepositoryImpl struct {
	// contains filtered or unexported fields
}

RepositoryImpl service repository.

func NewRepository

func NewRepository(conn *pgx.Conn) *RepositoryImpl

NewRepository returns a new instance of the repository.

func (RepositoryImpl) Close

func (r RepositoryImpl) Close() error

Close database connection.

func (RepositoryImpl) CreatePublication

func (r RepositoryImpl) CreatePublication(name string) error

CreatePublication creates a publication for all tables.

func (RepositoryImpl) GetSlotLSN

func (r RepositoryImpl) GetSlotLSN(slotName string) (string, error)

GetSlotLSN returns the value of the last offset for a specific slot.

func (RepositoryImpl) IsAlive

func (r RepositoryImpl) IsAlive() bool

IsAlive check database connection problems.

type TupleData

// TupleData is one value within a tuple of WAL message data.
type TupleData struct {
	// Value holds the value as raw bytes.
	Value []byte
}

TupleData is part of the WAL message data.

type Update

// Update holds the fields of an update message.
type Update struct {
	// ID of the relation corresponding to the ID in the relation message.
	RelationID int32
	// Identifies the following TupleData submessage as a key.
	KeyTuple bool
	// Identifies the following TupleData message as an old tuple.
	OldTuple bool
	// Identifies the following TupleData message as a new tuple.
	NewTuple bool
	// TupleData message part representing the contents of the new tuple.
	Row []TupleData
	// TupleData message part representing the contents of the old tuple or primary key.
	// Only present if the previous 'O' or 'K' part is present.
	OldRow []TupleData
}

Update message format.

type WalTransaction

// WalTransaction holds the data of one transaction read from WAL messages.
type WalTransaction struct {
	// LSN of the transaction.
	LSN int64
	// BeginTime and CommitTime are pointers, so either may be nil;
	// presumably nil until the matching begin/commit message is parsed — confirm.
	BeginTime  *time.Time
	CommitTime *time.Time
	// RelationStore maps an int32 key to relation data; presumably keyed by
	// the relation ID that CreateActionData receives — confirm.
	RelationStore map[int32]RelationData
	// Actions collected for the transaction.
	Actions []ActionData
}

WalTransaction transaction specified WAL message.

func NewWalTransaction

func NewWalTransaction() *WalTransaction

NewWalTransaction creates and initializes a new WAL transaction.

func (*WalTransaction) Clear

func (w *WalTransaction) Clear()

Clear transaction data.

func (WalTransaction) CreateActionData

func (w WalTransaction) CreateActionData(
	relationID int32,
	rows []TupleData,
	kind ActionKind,
) (a ActionData, err error)

CreateActionData creates an action from WAL message data.

func (*WalTransaction) CreateEventsWithFilter

func (w *WalTransaction) CreateEventsWithFilter(
	tableMap map[string]config.Table) []Event

CreateEventsWithFilter filters WAL messages by table and action, and creates an event for each value.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL