eventqueue

package
v0.0.0-...-e758639 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 23, 2021 License: ISC Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ByteString

type ByteString []byte

ByteString is a special type of byte array with implemented interfaces to convert from and to JSON and SQL values.

func (*ByteString) MarshalJSON

func (b *ByteString) MarshalJSON() ([]byte, error)

MarshalJSON implements the json.Marshaler interface.

func (*ByteString) Scan

func (b *ByteString) Scan(val interface{}) error

Scan implements the sql.Scanner interface.

func (*ByteString) UnmarshalJSON

func (b *ByteString) UnmarshalJSON(d []byte) error

UnmarshalJSON implements the json.Unmarshaler interface.

func (*ByteString) Value

func (b *ByteString) Value() (driver.Value, error)

Value implements the driver.Valuer interface.

type Event

type Event struct {
	ID         int             `json:"-"`
	UUID       string          `json:"uuid"`
	ExternalID ByteString      `json:"external_id"`
	TableName  string          `json:"-"`
	Statement  string          `json:"statement"`
	Data       json.RawMessage `json:"data"`
	CreatedAt  time.Time       `json:"created_at"`
	Processed  bool            `json:"-"`
}

Event represents the queued event in the database

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue represents the queue of snapshot/create/update/delete events stored in the database.

func New

func New(conninfo string) (*Queue, error)

New creates a new Queue, connected to the given database URL.

func NewWithDB

func NewWithDB(db *sql.DB) *Queue

NewWithDB creates a new Queue with the given database.

func (*Queue) Close

func (eq *Queue) Close() error

Close closes the Queue's database connection.

func (*Queue) ConfigureOutboundEventQueueAndTriggers

func (eq *Queue) ConfigureOutboundEventQueueAndTriggers(path string) error

ConfigureOutboundEventQueueAndTriggers will set up a new schema 'pg2kafka', with an 'outbound_event_queue' table that is used to store events, and all the triggers necessary to snapshot and start tracking changes for a given table.

func (*Queue) FetchUnprocessedRecords

func (eq *Queue) FetchUnprocessedRecords() ([]*Event, error)

FetchUnprocessedRecords fetches a page (up to 1000) of events that have not been marked as processed yet.

func (*Queue) MarkEventAsProcessed

func (eq *Queue) MarkEventAsProcessed(eventID int) error

MarkEventAsProcessed marks an even as processed.

func (*Queue) UnprocessedEventPagesCount

func (eq *Queue) UnprocessedEventPagesCount() (int, error)

UnprocessedEventPagesCount returns how many "pages" of events there are queued in the database. Currently page-size is hard-coded to 1000 events per page.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL