listener

package
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 8, 2021 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
// Error message texts, one per failure category named below.
//
// These are untyped string constants, not error values, so they cannot be
// compared with errors.Is; they only provide the message text.
const (
	// ErrPostgresConnection is the text for a database connection error.
	ErrPostgresConnection    = "db connection error"
	// ErrReplicationConnection is the text for a replication connection error.
	ErrReplicationConnection = "replication connection error"
	// ErrNatsConnection is the text for a NATS connection error.
	ErrNatsConnection        = "nats connection error"
	// ErrPublishEvent is the text for a message publishing error.
	ErrPublishEvent          = "publish message error"
	// ErrUnmarshalMsg is the text for a WAL message unmarshalling error.
	ErrUnmarshalMsg          = "unmarshal wal message error"
	// ErrAckWalMessage is the text for a WAL message acknowledgement error.
	ErrAckWalMessage         = "acknowledge wal message error"
	// ErrSendStandbyStatus is the text for a standby status sending error.
	ErrSendStandbyStatus     = "send standby status error"
)

Constants holding error message texts.

View Source
// Service lifecycle info messages.
const (
	// StartServiceMessage is the text reporting that the service was started.
	StartServiceMessage = "service was started"
	// StopServiceMessage is the text reporting that the service was stopped.
	StopServiceMessage  = "service was stopped"
)

Service info message.

View Source
// Single-byte tags of the replication protocol: message types first, then
// tuple and column data markers.
// NOTE(review): the values match the tags listed in the PostgreSQL logical
// replication message formats — confirm against the parser.
const (
	// CommitMsgType is the protocol tag of a commit message.
	CommitMsgType byte = 'C'

	// BeginMsgType is the protocol tag of a begin message.
	BeginMsgType byte = 'B'

	// OriginMsgType is the protocol tag of an origin message.
	OriginMsgType byte = 'O'

	// RelationMsgType is the protocol tag of a relation message.
	RelationMsgType byte = 'R'

	// TypeMsgType is the protocol tag of a type message.
	TypeMsgType byte = 'Y'

	// InsertMsgType is the protocol tag of an insert message.
	InsertMsgType byte = 'I'

	// UpdateMsgType is the protocol tag of an update message.
	UpdateMsgType byte = 'U'

	// DeleteMsgType is the protocol tag of a delete message.
	DeleteMsgType byte = 'D'

	// NewTupleDataType is the protocol tag marking new tuple data.
	NewTupleDataType byte = 'N'

	// TextDataType is the protocol tag of a text data value.
	TextDataType byte = 't'

	// NullDataType is the protocol tag of a NULL data value.
	NullDataType byte = 'n'

	// ToastDataType is the protocol tag of a TOAST data value.
	// NOTE(review): in the PostgreSQL protocol 'u' marks an unchanged TOASTed
	// value whose content is not sent — confirm how the parser treats it.
	ToastDataType byte = 'u'
)

Variables

This section is empty.

Functions

This section is empty.

Types

type ActionData added in v1.0.0

// ActionData describes a single action carried by WAL message data.
type ActionData struct {
	// Schema name — presumably the schema of the affected table; verify
	// against the code that fills it.
	Schema  string
	// Table name — presumably the table the action applies to.
	Table   string
	// Kind of the action (see ActionKind).
	Kind    ActionKind
	// Columns associated with the action.
	Columns []Column
}

ActionData kind of WAL message data.

type ActionKind added in v1.0.0

// ActionKind is the string-typed kind of action on a WAL message.
type ActionKind string

ActionKind kind of action on WAL message.

// Kinds of action on a WAL message.
const (
	// ActionKindInsert marks an INSERT action.
	ActionKindInsert ActionKind = "INSERT"
	// ActionKindUpdate marks an UPDATE action.
	ActionKindUpdate ActionKind = "UPDATE"
	// ActionKindDelete marks a DELETE action.
	ActionKindDelete ActionKind = "DELETE"
)

Kinds of action on a WAL message.

type Begin added in v1.0.0

// Begin holds the fields of a begin message.
type Begin struct {
	// LSN carried by the begin message.
	// NOTE(review): the previous comment here ("Identifies the message as a
	// begin message") described the message-type byte, not this field; in the
	// PostgreSQL Begin message layout this Int64 is the final LSN of the
	// transaction — confirm against the parser.
	LSN int64
	// Commit timestamp of the transaction.
	Timestamp time.Time
	// Xid of the transaction.
	XID int32
}

Begin message format.

type BinaryParser added in v1.0.0

type BinaryParser struct {
	// contains filtered or unexported fields
}

BinaryParser represents a binary protocol parser.

func NewBinaryParser added in v1.0.0

func NewBinaryParser(byteOrder binary.ByteOrder) *BinaryParser

NewBinaryParser creates an instance of the binary parser.

func (*BinaryParser) ParseWalMessage added in v1.0.0

func (p *BinaryParser) ParseWalMessage(msg []byte, tx *WalTransaction) error

ParseWalMessage parses a PostgreSQL WAL message.

type Column added in v1.0.0

type Column struct {
	// contains filtered or unexported fields
}

Column of the table with which changes occur.

func (*Column) AssertValue added in v1.0.0

func (c *Column) AssertValue(src []byte)

AssertValue converts bytes to a specific type depending on the type of this data in the database table.

type Commit added in v1.0.0

// Commit holds the fields of a commit message.
type Commit struct {
	// Flags; currently unused (must be 0).
	Flags int8
	// The LSN of the commit.
	LSN int64
	// The end LSN of the transaction.
	TransactionLSN int64
	// Commit timestamp of the transaction.
	Timestamp time.Time
}

Commit message format.

type DataType added in v1.0.0

// DataType holds the description of a data type carried in WAL message data.
type DataType struct {
	// ID of the data type.
	ID int32
	// Namespace (empty string for pg_catalog).
	Namespace string
	// Name of the data type.
	Name string
}

DataType is a part of the WAL message data.

type Delete added in v1.0.0

// Delete holds the fields of a delete message.
type Delete struct {
	// ID of the relation corresponding to the ID in the relation message.
	RelationID int32
	// Identifies the following TupleData submessage as a key.
	KeyTuple bool
	// Identifies the following TupleData message as an old tuple.
	OldTuple bool
	// TupleData message part carried by the delete message.
	// NOTE(review): the previous comment said "contents of new tuple"; the
	// KeyTuple/OldTuple flags above suggest this holds the key or the old
	// tuple instead — confirm against the parser.
	Row []TupleData
}

Delete message format.

type Event

// Event is the structure serialized to JSON for publishing; the struct tags
// below define the JSON field names.
type Event struct {
	// ID is the event identifier (JSON "id").
	ID        uuid.UUID              `json:"id"`
	// Schema name (JSON "schema").
	Schema    string                 `json:"schema"`
	// Table name (JSON "table").
	Table     string                 `json:"table"`
	// Action name (JSON "action") — presumably one of the ActionKind values;
	// verify against the code that builds events.
	Action    string                 `json:"action"`
	// Data is a free-form key/value payload (JSON "data") — presumably column
	// name to value; verify against the code that builds events.
	Data      map[string]interface{} `json:"data"`
	// EventTime is serialized under the JSON name "commitTime", which differs
	// from the Go field name.
	EventTime time.Time              `json:"commitTime"`
}

Event is the event structure published to the NATS server.

func (Event) GetSubjectName

func (e Event) GetSubjectName(prefix string) string

GetSubjectName creates subject name from the prefix, schema and table name.

func (Event) MarshalEasyJSON

func (v Event) MarshalEasyJSON(w *jwriter.Writer)

MarshalEasyJSON supports easyjson.Marshaler interface

func (Event) MarshalJSON

func (v Event) MarshalJSON() ([]byte, error)

MarshalJSON supports json.Marshaler interface

func (*Event) UnmarshalEasyJSON

func (v *Event) UnmarshalEasyJSON(l *jlexer.Lexer)

UnmarshalEasyJSON supports easyjson.Unmarshaler interface

func (*Event) UnmarshalJSON

func (v *Event) UnmarshalJSON(data []byte) error

UnmarshalJSON supports json.Unmarshaler interface

type Insert added in v1.0.0

// Insert holds the fields of an insert message.
type Insert struct {
	// ID of the relation corresponding to the ID in the relation message.
	RelationID int32
	// Identifies the following TupleData message as a new tuple.
	NewTuple bool
	// TupleData message part representing the contents of the new tuple.
	Row []TupleData
}

Insert message format.

type Listener

type Listener struct {
	// contains filtered or unexported fields
}

Listener main service struct.

func NewWalListener

func NewWalListener(
	cfg *config.Config,
	repo repository,
	repl replication,
	publ publisher,
	parser parser,
) *Listener

NewWalListener creates and initializes a new service instance.

func (*Listener) AckWalMessage

func (l *Listener) AckWalMessage(lsn uint64) error

AckWalMessage acknowledges a received WAL message.

func (*Listener) Process

func (l *Listener) Process() error

Process is main service entry point.

func (*Listener) SendPeriodicHeartbeats

func (l *Listener) SendPeriodicHeartbeats(ctx context.Context)

SendPeriodicHeartbeats send periodic keep alive heartbeats to the server.

func (*Listener) SendStandbyStatus

func (l *Listener) SendStandbyStatus() error

SendStandbyStatus sends a `StandbyStatus` object with the current RestartLSN value to the server.

func (*Listener) Stop

func (l *Listener) Stop() error

Stop is a finalizer function.

func (*Listener) Stream

func (l *Listener) Stream(ctx context.Context)

Stream receives events from PostgreSQL: it accepts each message, applies the filter and publishes the result to the NATS server.

type NatsPublisher

type NatsPublisher struct {
	// contains filtered or unexported fields
}

NatsPublisher represents an event publisher.

func NewNatsPublisher

func NewNatsPublisher(conn stan.Conn) *NatsPublisher

NewNatsPublisher return new NatsPublisher instance.

func (NatsPublisher) Close

func (n NatsPublisher) Close() error

Close NATS connection.

func (NatsPublisher) Publish

func (n NatsPublisher) Publish(subject string, event Event) error

Publish serializes the event and publishes it on the bus.

type Origin added in v1.0.0

// Origin holds the fields of an origin message.
type Origin struct {
	// The LSN of the commit on the origin server.
	LSN int64
	// Name of the origin.
	Name string
}

Origin message format.

type Relation added in v1.0.0

// Relation holds the fields of a relation message.
type Relation struct {
	// ID of the relation.
	ID int32
	// Namespace (empty string for pg_catalog).
	Namespace string
	// Relation name.
	Name string
	// Replica identity setting for the relation (same as relreplident in pg_class).
	Replica int8
	// Columns of the relation, one RelationColumn per column.
	Columns []RelationColumn
}

Relation message format.

type RelationColumn added in v1.0.0

// RelationColumn describes one column listed in a relation message.
type RelationColumn struct {
	// Flags for the column which marks the column as part of the key.
	Key bool
	// Name of the column.
	Name string
	// ID of the column's data type.
	TypeID int32
	// Type modifier of the column (atttypmod).
	ModifierType int32
}

RelationColumn is a part of the WAL message data.

type RelationData added in v1.0.0

// RelationData describes a relation by schema, table and columns; it is the
// value type of WalTransaction.RelationStore.
type RelationData struct {
	// Schema name of the relation.
	Schema  string
	// Table name of the relation.
	Table   string
	// Columns of the relation.
	Columns []Column
}

RelationData kind of WAL message data.

type RepositoryImpl added in v1.0.1

type RepositoryImpl struct {
	// contains filtered or unexported fields
}

RepositoryImpl service repository.

func NewRepository

func NewRepository(conn *pgx.Conn) *RepositoryImpl

NewRepository returns a new instance of the repository.

func (RepositoryImpl) Close added in v1.0.1

func (r RepositoryImpl) Close() error

Close database connection.

func (RepositoryImpl) CreatePublication added in v1.1.0

func (r RepositoryImpl) CreatePublication(name string) error

CreatePublication creates a publication for all.

func (RepositoryImpl) GetSlotLSN added in v1.0.1

func (r RepositoryImpl) GetSlotLSN(slotName string) (string, error)

GetSlotLSN returns the value of the last offset for a specific slot.

func (RepositoryImpl) IsAlive added in v1.0.1

func (r RepositoryImpl) IsAlive() bool

IsAlive checks for database connection problems.

type TupleData added in v1.0.0

// TupleData holds one value of a tuple carried in WAL message data.
type TupleData struct {
	// Value is the raw bytes of the value.
	// NOTE(review): presumably one column value per TupleData — confirm
	// against the parser.
	Value []byte
}

TupleData is a part of the WAL message data.

type Update added in v1.0.0

// Update holds the fields of an update message.
type Update struct {
	// ID of the relation corresponding to the ID in the relation message.
	RelationID int32
	// Identifies the following TupleData submessage as a key.
	KeyTuple bool
	// Identifies the following TupleData message as an old tuple.
	OldTuple bool
	// Identifies the following TupleData message as a new tuple.
	NewTuple bool
	// TupleData message part representing the contents of the new tuple.
	Row []TupleData
	// TupleData message part representing the contents of the old tuple or primary key.
	// Only present if the previous 'O' or 'K' part is present.
	OldRow []TupleData
}

Update message format.

type WalTransaction added in v1.0.0

// WalTransaction groups the state kept for one transaction described by WAL
// messages: its LSN, begin and commit times, known relations and actions.
type WalTransaction struct {
	// LSN of the transaction.
	LSN           int64
	// BeginTime of the transaction; a pointer, so it may be nil.
	BeginTime     *time.Time
	// CommitTime of the transaction; a pointer, so it may be nil.
	CommitTime    *time.Time
	// RelationStore maps an int32 key to relation data — presumably the
	// relation ID from the relation message; verify against the parser.
	RelationStore map[int32]RelationData
	// Actions recorded for the transaction.
	Actions       []ActionData
}

WalTransaction is the transaction described by WAL messages.

func NewWalTransaction added in v1.0.0

func NewWalTransaction() *WalTransaction

NewWalTransaction creates and initializes a new WAL transaction.

func (*WalTransaction) Clear added in v1.0.0

func (w *WalTransaction) Clear()

Clear transaction data.

func (WalTransaction) CreateActionData added in v1.0.0

func (w WalTransaction) CreateActionData(
	relationID int32,
	rows []TupleData,
	kind ActionKind,
) (a ActionData, err error)

CreateActionData creates an action from WAL message data.

func (*WalTransaction) CreateEventsWithFilter added in v1.0.0

func (w *WalTransaction) CreateEventsWithFilter(
	tableMap map[string][]string) []Event

CreateEventsWithFilter filters WAL messages by table and action, and creates an event for each value.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL