mongo

package
v0.3.1 (Latest)
Warning: This package is not in the latest version of its module.

Go to latest
Published: Aug 15, 2023 | License: MIT | Imports: 22 | Imported by: 0

Documentation

Overview

This is third-party library code, taken from the gtm project.

Index

Constants

View Source
const (
	LT  = "$lt"
	LTE = "$lte"
	GT  = "$gt"
	GTE = "$gte"
	ALL = "$all"
	OR  = "$or"
	Ne  = "$ne"
	IN  = "$in"
)

Variables

This section is empty.

Functions

func ConsumeChangeStream

func ConsumeChangeStream(ctx *OpCtx, client *mongo.Client, ns string, o *Options) (err error)

func DirectReadPaged

func DirectReadPaged(ctx *OpCtx, client *mongo.Client, ns string, o *Options) (err error)

func DirectReadSegment

func DirectReadSegment(ctx *OpCtx, client *mongo.Client, ns string, o *Options, seg *CollectionSegment, stats *CollectionStats) (err error)

func FetchDocuments

func FetchDocuments(ctx *OpCtx, client *mongo.Client, filter OpFilter, buf *OpBuf, inOp OpChan, o *Options) error

func FirstOpTimestamp

func FirstOpTimestamp(client *mongo.Client, o *Options) (primitive.Timestamp, error)

func GetOpLogCursor

func GetOpLogCursor(client *mongo.Client, after primitive.Timestamp, o *Options) (*mongo.Cursor, error)

func InitMongoCli added in v0.1.1

func InitMongoCli(ctx context.Context, uri string) error

func LastOpTimestamp

func LastOpTimestamp(client *mongo.Client, o *Options) (primitive.Timestamp, error)

func OpLogCollection

func OpLogCollection(client *mongo.Client, o *Options) *mongo.Collection

func OpLogCollectionName

func OpLogCollectionName(client *mongo.Client, o *Options) string

func ParseTimestamp

func ParseTimestamp(timestamp primitive.Timestamp) (uint32, uint32)

func ProcessDirectReads

func ProcessDirectReads(ctx *OpCtx, client *mongo.Client, o *Options) (err error)

func TailOps

func TailOps(ctx *OpCtx, client *mongo.Client, channels []OpChan, o *Options) error

func UpdateIsReplace

func UpdateIsReplace(entry map[string]interface{}) bool

Types

type BSONMarshaler

type BSONMarshaler struct{}

func (BSONMarshaler) Marshal

func (b BSONMarshaler) Marshal(v any) ([]byte, error)

func (BSONMarshaler) Unmarshal

func (b BSONMarshaler) Unmarshal(data []byte, v any) error

type ChangeDoc

type ChangeDoc struct {
	DocKey            map[string]interface{} "documentKey"
	Id                interface{}            "_id"
	Operation         string                 "operationType"
	FullDoc           map[string]interface{} "fullDocument"
	Namespace         ChangeDocNs            "ns"
	Timestamp         primitive.Timestamp    "clusterTime"
	UpdateDescription map[string]interface{} "updateDescription"
}

type ChangeDocNs

type ChangeDocNs struct {
	Database   string "db"
	Collection string "coll"
}

type CollectionInfo

type CollectionInfo struct {
	Name string "name"
	Type string "type"
}

func GetCollectionInfo

func GetCollectionInfo(ctx *OpCtx, client *mongo.Client, ns string) (info *CollectionInfo, err error)

type CollectionSegment

type CollectionSegment struct {
	// contains filtered or unexported fields
}

type CollectionStats

type CollectionStats struct {
	Count         int32 "count"
	AvgObjectSize int32 "avgObjSize"
}

func GetCollectionStats

func GetCollectionStats(ctx *OpCtx, client *mongo.Client, ns string) (stats *CollectionStats, err error)

type DataUnmarshaller

type DataUnmarshaller func(namespace string, data []byte) (interface{}, error)

type Doc

type Doc struct {
	Id interface{} "_id"
}

type MongoCli

type MongoCli[R core.IObject] struct {
	// contains filtered or unexported fields
}

func NewMongoCli

func NewMongoCli[R core.IObject]() *MongoCli[R]

func (*MongoCli[R]) Close

func (m *MongoCli[R]) Close() error

func (*MongoCli[R]) Count

func (m *MongoCli[R]) Count(ctx context.Context, q map[string]any) (int64, error)

func (*MongoCli[R]) Create

func (m *MongoCli[R]) Create(ctx context.Context, r R, q map[string]any) error

func (*MongoCli[R]) Delete

func (m *MongoCli[R]) Delete(ctx context.Context, q map[string]any) error

func (*MongoCli[R]) Get

func (m *MongoCli[R]) Get(ctx context.Context, q map[string]any) (R, error)

func (*MongoCli[R]) GetByQuery added in v0.2.0

func (m *MongoCli[R]) GetByQuery(ctx context.Context, query *query) (R, error)

func (*MongoCli[R]) List

func (m *MongoCli[R]) List(ctx context.Context, q map[string]any) ([]R, error)

func (*MongoCli[R]) Update

func (m *MongoCli[R]) Update(ctx context.Context, new R, q map[string]any) (R, error)

func (*MongoCli[R]) Watch

func (m *MongoCli[R]) Watch(ctx context.Context, kind string, q map[string]any) (<-chan core.Event, <-chan error)

type N

type N struct {
	// contains filtered or unexported fields
}

type Op

type Op struct {
	Id                interface{}            `json:"_id"`
	Operation         string                 `json:"operation"`
	Namespace         string                 `json:"namespace"`
	Data              map[string]interface{} `json:"data,omitempty"`
	Timestamp         primitive.Timestamp    `json:"timestamp"`
	Source            QuerySource            `json:"source"`
	Doc               interface{}            `json:"doc,omitempty"`
	UpdateDescription map[string]interface{} `json:"updateDescription,omitempty"`
	ResumeToken       OpResumeToken          `json:"-"`
}

func (*Op) GetCollection

func (this *Op) GetCollection() string

func (*Op) GetDatabase

func (this *Op) GetDatabase() string

func (*Op) IsCommand

func (this *Op) IsCommand() bool

func (*Op) IsDelete

func (this *Op) IsDelete() bool

func (*Op) IsDrop

func (this *Op) IsDrop() bool

func (*Op) IsDropCollection

func (this *Op) IsDropCollection() (string, bool)

func (*Op) IsDropDatabase

func (this *Op) IsDropDatabase() (string, bool)

func (*Op) IsInsert

func (this *Op) IsInsert() bool

func (*Op) IsSourceDirect

func (this *Op) IsSourceDirect() bool

func (*Op) IsSourceOplog

func (this *Op) IsSourceOplog() bool

func (*Op) IsUpdate

func (this *Op) IsUpdate() bool

func (*Op) ParseLogEntry

func (this *Op) ParseLogEntry(entry *OpLog, o *Options) (include bool, err error)

func (*Op) ParseNamespace

func (this *Op) ParseNamespace() []string

type OpBuf

type OpBuf struct {
	Entries        []*Op
	BufferSize     int
	BufferDuration time.Duration
}

func (*OpBuf) Append

func (this *OpBuf) Append(op *Op)

func (*OpBuf) Flush

func (this *OpBuf) Flush(client *mongo.Client, ctx *OpCtx, o *Options)

func (*OpBuf) HasOne

func (this *OpBuf) HasOne() bool

func (*OpBuf) IsFull

func (this *OpBuf) IsFull() bool

type OpChan

type OpChan chan *Op

func Tail

func Tail(client *mongo.Client, o *Options) (OpChan, chan error)

type OpCtx

type OpCtx struct {
	OpC          OpChan
	ErrC         chan error
	DirectReadWg *sync.WaitGroup
	// contains filtered or unexported fields
}

func Start

func Start(client *mongo.Client, o *Options) *OpCtx

func (*OpCtx) Pause

func (ctx *OpCtx) Pause()

func (*OpCtx) Resume

func (ctx *OpCtx) Resume()

func (*OpCtx) Since

func (ctx *OpCtx) Since(ts primitive.Timestamp)

func (*OpCtx) Stop

func (ctx *OpCtx) Stop()

type OpCtxMulti

type OpCtxMulti struct {
	OpC          OpChan
	ErrC         chan error
	DirectReadWg *sync.WaitGroup
	// contains filtered or unexported fields
}

func StartMulti

func StartMulti(clients []*mongo.Client, o *Options) *OpCtxMulti

func (*OpCtxMulti) AddShardListener

func (ctx *OpCtxMulti) AddShardListener(
	configSession *mongo.Client, shardOptions *Options, handler ShardInsertHandler)

func (*OpCtxMulti) Pause

func (ctx *OpCtxMulti) Pause()

func (*OpCtxMulti) Resume

func (ctx *OpCtxMulti) Resume()

func (*OpCtxMulti) Since

func (ctx *OpCtxMulti) Since(ts primitive.Timestamp)

func (*OpCtxMulti) Stop

func (ctx *OpCtxMulti) Stop()

type OpFilter

type OpFilter func(*Op) bool

func ChainOpFilters

func ChainOpFilters(filters ...OpFilter) OpFilter

func OpFilterForOrdering

func OpFilterForOrdering(ordering OrderingGuarantee, workers []string, worker string) OpFilter

type OpLog

type OpLog struct {
	Timestamp    primitive.Timestamp    "ts"
	HistoryID    int64                  "h"
	MongoVersion int                    "v"
	Operation    string                 "op"
	Namespace    string                 "ns"
	Doc          map[string]interface{} "o"
	Update       map[string]interface{} "o2"
}

type OpLogEntry

type OpLogEntry map[string]interface{}

type OpResumeToken

type OpResumeToken struct {
	StreamID    string
	ResumeToken interface{}
}

type Options

type Options struct {
	After               TimestampGenerator
	Token               ResumeTokenGenenerator
	Filter              OpFilter
	NamespaceFilter     OpFilter
	OpLogDisabled       bool
	OpLogDatabaseName   string
	OpLogCollectionName string
	ChannelSize         int
	BufferSize          int
	BufferDuration      time.Duration
	Ordering            OrderingGuarantee
	WorkerCount         int
	MaxAwaitTime        time.Duration
	UpdateDataAsDelta   bool
	ChangeStreamNs      []string
	DirectReadNs        []string
	DirectReadFilter    OpFilter
	DirectReadSplitMax  int32
	DirectReadConcur    int
	DirectReadNoTimeout bool
	DirectReadBounded   bool
	Unmarshal           DataUnmarshaller
	Pipe                PipelineBuilder
	PipeAllowDisk       bool
	Log                 *log.Logger
}

func DefaultOptions

func DefaultOptions() *Options

func (*Options) SetDefaults

func (this *Options) SetDefaults()

type OrderingGuarantee

type OrderingGuarantee int
const (
	Oplog     OrderingGuarantee = iota // ops sent in oplog order (strong ordering)
	Namespace                          // ops sent in oplog order within a namespace
	Document                           // ops sent in oplog order for a single document
	AnyOrder                           // ops sent as they become available
)

type PipelineBuilder

type PipelineBuilder func(namespace string, changeStream bool) ([]interface{}, error)

type QuerySource

type QuerySource int
const (
	OplogQuerySource QuerySource = iota
	DirectQuerySource
)

type ResumeTokenGenenerator

type ResumeTokenGenenerator func(*mongo.Client, string, *Options) (interface{}, error)

type ShardInfo

type ShardInfo struct {
	// contains filtered or unexported fields
}

func GetShards

func GetShards(client *mongo.Client) (shardInfos []*ShardInfo)

func (*ShardInfo) GetURL

func (shard *ShardInfo) GetURL() string

type ShardInsertHandler

type ShardInsertHandler func(*ShardInfo) (*mongo.Client, error)

type TimestampGenerator

type TimestampGenerator func(*mongo.Client, *Options) (primitive.Timestamp, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL