Documentation ¶
Index ¶
- func ConsumeChangeStream(ctx *OpCtx, client *mongo.Client, ns string, o *Options) (err error)
- func DirectReadPaged(ctx *OpCtx, client *mongo.Client, ns string, o *Options) (err error)
- func DirectReadSegment(ctx *OpCtx, client *mongo.Client, ns string, o *Options, ...) (err error)
- func FetchDocuments(ctx *OpCtx, client *mongo.Client, filter OpFilter, buf *OpBuf, inOp OpChan, ...) error
- func FirstOpTimestamp(client *mongo.Client, o *Options) (primitive.Timestamp, error)
- func GetOpLogCursor(client *mongo.Client, after primitive.Timestamp, o *Options) (*mongo.Cursor, error)
- func LastOpTimestamp(client *mongo.Client, o *Options) (primitive.Timestamp, error)
- func OpLogCollection(client *mongo.Client, o *Options) *mongo.Collection
- func OpLogCollectionName(client *mongo.Client, o *Options) string
- func ParseTimestamp(timestamp primitive.Timestamp) (uint32, uint32)
- func ProcessDirectReads(ctx *OpCtx, client *mongo.Client, o *Options) (err error)
- func TailOps(ctx *OpCtx, client *mongo.Client, channels []OpChan, o *Options) error
- func UpdateIsReplace(entry map[string]interface{}) bool
- type ChangeDoc
- type ChangeDocNs
- type CollectionInfo
- type CollectionSegment
- type CollectionStats
- type DataUnmarshaller
- type Doc
- type N
- type Op
- func (this *Op) GetCollection() string
- func (this *Op) GetDatabase() string
- func (this *Op) IsCommand() bool
- func (this *Op) IsDelete() bool
- func (this *Op) IsDrop() bool
- func (this *Op) IsDropCollection() (string, bool)
- func (this *Op) IsDropDatabase() (string, bool)
- func (this *Op) IsInsert() bool
- func (this *Op) IsSourceDirect() bool
- func (this *Op) IsSourceOplog() bool
- func (this *Op) IsUpdate() bool
- func (this *Op) ParseLogEntry(entry *OpLog, o *Options) (include bool, err error)
- func (this *Op) ParseNamespace() []string
- type OpBuf
- type OpChan
- type OpCtx
- type OpCtxMulti
- type OpFilter
- type OpLog
- type OpLogEntry
- type OpResumeToken
- type Options
- type OrderingGuarantee
- type PipelineBuilder
- type QuerySource
- type ReplStatus
- type ResumeTokenGenenerator
- type ShardInfo
- type ShardInsertHandler
- type TimestampGenerator
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ConsumeChangeStream ¶
func DirectReadPaged ¶
func DirectReadSegment ¶
func DirectReadSegment(ctx *OpCtx, client *mongo.Client, ns string, o *Options, seg *CollectionSegment, stats *CollectionStats) (err error)
func FetchDocuments ¶
func FirstOpTimestamp ¶
func GetOpLogCursor ¶
func LastOpTimestamp ¶
func OpLogCollection ¶
func OpLogCollection(client *mongo.Client, o *Options) *mongo.Collection
func ProcessDirectReads ¶
func UpdateIsReplace ¶
Types ¶
type ChangeDoc ¶
type ChangeDoc struct { DocKey map[string]interface{} "documentKey" Id interface{} "_id" Operation string "operationType" FullDoc map[string]interface{} "fullDocument" Namespace ChangeDocNs "ns" Timestamp primitive.Timestamp "clusterTime" UpdateDescription map[string]interface{} "updateDescription" }
type ChangeDocNs ¶
type CollectionInfo ¶
func GetCollectionInfo ¶
type CollectionSegment ¶
type CollectionSegment struct {
// contains filtered or unexported fields
}
type CollectionStats ¶
func GetCollectionStats ¶
type DataUnmarshaller ¶
type Op ¶
type Op struct { Id interface{} `json:"_id"` Operation string `json:"operation"` Namespace string `json:"namespace"` Data map[string]interface{} `json:"data,omitempty"` Timestamp primitive.Timestamp `json:"timestamp"` Source QuerySource `json:"source"` Doc interface{} `json:"doc,omitempty"` UpdateDescription map[string]interface{} `json:"updateDescription,omitempty"` ResumeToken OpResumeToken `json:"-"` }
func (*Op) GetCollection ¶
func (*Op) GetDatabase ¶
func (*Op) IsDropCollection ¶
func (*Op) IsDropDatabase ¶
func (*Op) IsSourceDirect ¶
func (*Op) IsSourceOplog ¶
func (*Op) ParseLogEntry ¶
func (*Op) ParseNamespace ¶
type OpCtx ¶
type OpCtxMulti ¶
type OpCtxMulti struct { OpC OpChan ErrC chan error DirectReadWg *sync.WaitGroup // contains filtered or unexported fields }
func StartMulti ¶
func StartMulti(clients []*mongo.Client, o *Options) *OpCtxMulti
func (*OpCtxMulti) AddShardListener ¶
func (ctx *OpCtxMulti) AddShardListener( configSession *mongo.Client, shardOptions *Options, handler ShardInsertHandler)
func (*OpCtxMulti) Pause ¶
func (ctx *OpCtxMulti) Pause()
func (*OpCtxMulti) Resume ¶
func (ctx *OpCtxMulti) Resume()
func (*OpCtxMulti) Since ¶
func (ctx *OpCtxMulti) Since(ts primitive.Timestamp)
func (*OpCtxMulti) Stop ¶
func (ctx *OpCtxMulti) Stop()
type OpFilter ¶
func ChainOpFilters ¶
func OpFilterForOrdering ¶
func OpFilterForOrdering(ordering OrderingGuarantee, workers []string, worker string) OpFilter
type OpLogEntry ¶
type OpLogEntry map[string]interface{}
type OpResumeToken ¶
type OpResumeToken struct { StreamID string ResumeToken interface{} }
type Options ¶
type Options struct { After TimestampGenerator Token ResumeTokenGenenerator Filter OpFilter NamespaceFilter OpFilter OpLogDisabled bool OpLogDatabaseName string OpLogCollectionName string ChannelSize int BufferSize int BufferDuration time.Duration Ordering OrderingGuarantee WorkerCount int MaxAwaitTime time.Duration UpdateDataAsDelta bool ChangeStreamNs []string DirectReadNs []string DirectReadFilter OpFilter DirectReadSplitMax int32 DirectReadConcur int DirectReadNoTimeout bool DirectReadBounded bool Unmarshal DataUnmarshaller Pipe PipelineBuilder PipeAllowDisk bool Log *log.Logger }
func DefaultOptions ¶
func DefaultOptions() *Options
func (*Options) SetDefaults ¶
func (this *Options) SetDefaults()
type OrderingGuarantee ¶
type OrderingGuarantee int
const ( Oplog OrderingGuarantee = iota // ops sent in oplog order (strong ordering) Namespace // ops sent in oplog order within a namespace Document // ops sent in oplog order for a single document AnyOrder // ops sent as they become available )
type PipelineBuilder ¶
type QuerySource ¶
type QuerySource int
const ( OplogQuerySource QuerySource = iota DirectQuerySource )
type ReplStatus ¶
func GetReplStatus ¶
func GetReplStatus(client *mongo.Client) (rs *ReplStatus, err error)
func (*ReplStatus) GetLastCommitted ¶
func (rs *ReplStatus) GetLastCommitted() (ts primitive.Timestamp, err error)
type ResumeTokenGenenerator ¶
Click to show internal directories.
Click to hide internal directories.