listener

package
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 10, 2024 License: MIT Imports: 22 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CDCData

type CDCData struct {
	// contains filtered or unexported fields
}

CDCData collects changes

func NewCDCData

func NewCDCData(pool *sync.Pool) *CDCData

func (*CDCData) Assert

func (s *CDCData) Assert(rawEvent *ChangeEventRaw) (a *ChangedData, err error)

Assert add changes to store

func (*CDCData) FilterEvent

func (s *CDCData) FilterEvent(ctx context.Context, tableMap map[string][]string) *publisher.Event

FilterEvent filter db events

type ChangeEventHandler

type ChangeEventHandler func(ctx context.Context, event *ChangeEventRaw) error

type ChangeEventRaw

type ChangeEventRaw struct {
	ID                        string
	Db                        string
	Coll                      string
	Kind                      OperationType
	FullDocumentBeforeChanges bson.D
	FullDocument              bson.D
}

type ChangedData

type ChangedData struct {
	// ID document id
	ID string
	// Db name
	Db string
	// Coll collection
	Coll string
	// Kind operation type
	Kind OperationType
	// TODO: need somehow get it
	OldDocument bson.M
	// NewDocument updated data
	NewDocument bson.M
}

ChangedData is kind of CDC message data.

type CreateCollectionOptions

type CreateCollectionOptions struct {
	DbName                       string
	CollName                     string
	Capped                       bool
	SizeInBytes                  int64
	ChangeStreamPreAndPostImages bool
}

type DbEventData

type DbEventData struct {
	CurrentResumeToken       resumeTokenKey `bson:"_id"`
	FullDocument             bson.D         `bson:"fullDocument"`
	FullDocumentBeforeChange bson.D         `bson:"fullDocumentBeforeChange"`
	DocumentKey              documentKey    `bson:"documentKey"`
	OperationType            OperationType  `bson:"operationType"`
}

type ICDCRepository

type ICDCRepository interface {
	CreateCollection(ctx context.Context, opts *CreateCollectionOptions) error
	GetResumeToken(db, coll string) string
	SaveResumeToken(token *models.ResumeTokenState) error
	GetWatchStream(watch *WatchCollectionOptions, opts *options.ChangeStreamOptions) (*mongo.ChangeStream, error)
	IsAlive() error
	Close() error
}

type IListener

type IListener interface {
	WatchCollection(ctx context.Context, opts *WatchCollectionOptions) error
	Run(ctx context.Context) error
	Stop() error
}

type ITokenSaver

type ITokenSaver interface {
	Run()
	Stop()
	GetResumeToken(db, coll string) string
	SaveResumeToken(token *models.ResumeTokenState) error
}

type Listener

type Listener struct {
	// contains filtered or unexported fields
}

Listener wrapper for collections listen logic

func NewListener

NewListener create new listener

func (*Listener) Run

func (l *Listener) Run(ctx context.Context) error

Run start listen collections from db

func (*Listener) Stop

func (l *Listener) Stop() error

Stop stop listener and disconnect db

func (*Listener) WatchCollection

func (l *Listener) WatchCollection(ctx context.Context, opts *WatchCollectionOptions) error

WatchCollection get updates from collections

type OperationType

type OperationType string

func (OperationType) IsInsert

func (k OperationType) IsInsert() bool

func (OperationType) IsInvalid

func (k OperationType) IsInvalid() bool

func (OperationType) IsPublishable

func (k OperationType) IsPublishable() bool

type ResumeTokenSaver

type ResumeTokenSaver struct {
	// contains filtered or unexported fields
}

TODO: make using adapters: switch store between fs and mongodb ResumeTokenSaver

func NewResumeStore

func NewResumeStore() *ResumeTokenSaver

func (*ResumeTokenSaver) GetResumeToken

func (rs *ResumeTokenSaver) GetResumeToken(db, coll string) string

func (*ResumeTokenSaver) Run

func (rs *ResumeTokenSaver) Run()

func (*ResumeTokenSaver) SaveResumeToken

func (rs *ResumeTokenSaver) SaveResumeToken(data *models.ResumeTokenState) error

func (*ResumeTokenSaver) Stop

func (rs *ResumeTokenSaver) Stop()

type WatchCollectionOptions

type WatchCollectionOptions struct {
	WatchedDb          string
	WatchedColl        string
	ChangeEventHandler ChangeEventHandler
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL