Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CDCData ¶
type CDCData struct {
// contains filtered or unexported fields
}
CDCData collects changes
func NewCDCData ¶
func (*CDCData) Assert ¶
func (s *CDCData) Assert(rawEvent *ChangeEventRaw) (a *ChangedData, err error)
Assert add changes to store
type ChangeEventHandler ¶
type ChangeEventHandler func(ctx context.Context, event *ChangeEventRaw) error
type ChangeEventRaw ¶
type ChangedData ¶
type ChangedData struct { // ID document id ID string // Db name Db string // Coll collection Coll string // Kind operation type Kind OperationType // TODO: need somehow get it OldDocument bson.M // NewDocument updated data NewDocument bson.M }
ChangedData is kind of CDC message data.
type CreateCollectionOptions ¶
type DbEventData ¶
type DbEventData struct { CurrentResumeToken resumeTokenKey `bson:"_id"` FullDocument bson.D `bson:"fullDocument"` FullDocumentBeforeChange bson.D `bson:"fullDocumentBeforeChange"` DocumentKey documentKey `bson:"documentKey"` OperationType OperationType `bson:"operationType"` }
type ICDCRepository ¶
type ICDCRepository interface { CreateCollection(ctx context.Context, opts *CreateCollectionOptions) error GetResumeToken(db, coll string) string SaveResumeToken(token *models.ResumeTokenState) error GetWatchStream(watch *WatchCollectionOptions, opts *options.ChangeStreamOptions) (*mongo.ChangeStream, error) IsAlive() error Close() error }
type ITokenSaver ¶
type ITokenSaver interface { Run() Stop() GetResumeToken(db, coll string) string SaveResumeToken(token *models.ResumeTokenState) error }
type Listener ¶
type Listener struct {
// contains filtered or unexported fields
}
Listener wrapper for collections listen logic
func NewListener ¶
func NewListener(pub *publisher.EventPublisherAdapter, metrics monitor.IMonitor, repo ICDCRepository, cfg *configs.ListenerConfig, saver ITokenSaver) *Listener
NewListener create new listener
func (*Listener) WatchCollection ¶
func (l *Listener) WatchCollection(ctx context.Context, opts *WatchCollectionOptions) error
WatchCollection get updates from collections
type OperationType ¶
type OperationType string
func (OperationType) IsInsert ¶
func (k OperationType) IsInsert() bool
func (OperationType) IsInvalid ¶
func (k OperationType) IsInvalid() bool
func (OperationType) IsPublishable ¶
func (k OperationType) IsPublishable() bool
type ResumeTokenSaver ¶
type ResumeTokenSaver struct {
// contains filtered or unexported fields
}
TODO: make using adapters: switch store between fs and mongodb ResumeTokenSaver
func NewResumeStore ¶
func NewResumeStore() *ResumeTokenSaver
func (*ResumeTokenSaver) GetResumeToken ¶
func (rs *ResumeTokenSaver) GetResumeToken(db, coll string) string
func (*ResumeTokenSaver) Run ¶
func (rs *ResumeTokenSaver) Run()
func (*ResumeTokenSaver) SaveResumeToken ¶
func (rs *ResumeTokenSaver) SaveResumeToken(data *models.ResumeTokenState) error
func (*ResumeTokenSaver) Stop ¶
func (rs *ResumeTokenSaver) Stop()
type WatchCollectionOptions ¶
type WatchCollectionOptions struct { WatchedDb string WatchedColl string ChangeEventHandler ChangeEventHandler }
Click to show internal directories.
Click to hide internal directories.