README

lungo

A MongoDB compatible embeddable database and toolkit for Go.

Installation

To get started, install the package using the go tool:

$ go get -u github.com/256dpi/lungo

Example

This example shows a basic usage of the mongo compatible API.

Motivation

The document-oriented database MongoDB has become a widely used data store for many applications developed with the Go programming language. Both the deprecated mgo and the official mongo driver offer a sophisticated interface to connect to a deployment and ingest and extract data using various commands. While this is enough for most projects, there are situations in which one thinks: "It would be cool if I could just do that in memory without asking the server."

Lungo tries to address this need by re-implementing the data handling mechanics in Go to be used on the client-side. This allows developers to pre- or post-process data in the application, relieving the server. For example, applications may utilize this functionality to cache documents and query them quickly in memory.

But we do not need to stop there. Many developers coming from the SQL ecosystem enjoy working with SQLite as a simple alternative to bigger SQL databases. It allows running tests without setting up a database, and even running small production apps that write their data to a single backed-up file.

Lungo wants to offer a similar experience by implementing a full MongoDB compatible embeddable database that persists data in a single file. The project aims to provide drop-in compatibility with the API exported by the official Go driver. This way, applications may use lungo for running their tests or even low-write production deployments without big code changes.

However, one thing this project does not try to do is build another distributed database. MongoDB itself does a pretty good job at that already.

Architecture

The codebase is divided into the packages bsonkit, mongokit, dbkit and the main lungo package.

  • The bsonkit package provides building blocks that extend the ones found in the official bson package for handling BSON data. Its functions are mainly useful to applications that need to inspect, compare, convert, transform, clone, access, and manipulate BSON data directly in memory.

  • On top of that, the mongokit package provides the MongoDB data handling algorithms and structures. Specifically, it implements the MongoDB querying, update, and sort algorithms as well as a btree based index for documents. All of that is then bundled as a basic in-memory collection of documents that offers a familiar CRUD interface.

  • The dbkit package provides database-centric utilities, e.g. atomic file writes.

  • Finally, the lungo package implements the embeddable database and the mongo compatible driver. The heavy work is done by the engine and transaction types that manage access to the basic mongokit.Collection instances. While both can be used standalone, most users want to use the generic driver interface that can be used with MongoDB deployments and lungo engines.

Features

On a high level, lungo provides the following features (unchecked features are planned to be implemented):

  • CRUD, Index Management and Namespace Management
  • Single, Compound and Partial Indexes
  • Index Supported Sorting & Filtering
  • Sessions & Multi-Document Transactions
  • Oplog & Change Streams
  • Aggregation Pipeline
  • Memory & Single File Store
  • GridFS

While the goal is to implement all MongoDB features in a compatible way, the architectural difference has implications for some of the features. Furthermore, the goal is to build an open and accessible codebase that favors simplicity. Check out the following sections for details on the implementation.

CRUD, Index Management and Namespace Management

The driver supports all standard CRUD, index management and namespace management methods that are also exposed by the official driver. However, to date, the driver does not support any of the MongoDB commands that can be issued using the Database.RunCommand method. Most of the commands that are not exposed as driver methods are related to query planning, replication, sharding, and user and role management features that we do not plan to support. However, we will eventually support some of the administrative and diagnostics commands, e.g. renameCollection and explain.

Leveraging the mongokit.Match function, lungo supports the following query operators:

  • $and, $or, $nor, ($not)
  • $eq, $gt, $lt, $gte, $lte, $ne
  • ($in), ($nin), $exists, $type
  • $jsonSchema, $all, $size, $elemMatch
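
As a rough illustration of how such operators can be evaluated against a document held in memory, the following sketch handles a small subset ($eq, $gt, $lt, $in and $exists) on flat documents. It uses only the standard library; the M type and the match and matchField functions are hypothetical names for this example and are not the mongokit.Match implementation, which works on BSON values.

```go
package main

import "fmt"

// M is a hypothetical shorthand for a flat document or filter.
type M = map[string]interface{}

// matchField checks one operator document, e.g. {"$gt": 5}, against a field.
func matchField(value interface{}, present bool, expr M) bool {
	for op, arg := range expr {
		switch op {
		case "$eq":
			if !present || value != arg {
				return false
			}
		case "$gt", "$lt":
			v, ok1 := value.(int)
			a, ok2 := arg.(int)
			if !ok1 || !ok2 {
				return false // this sketch only orders ints
			}
			if (op == "$gt" && v <= a) || (op == "$lt" && v >= a) {
				return false
			}
		case "$in":
			list, _ := arg.([]interface{})
			found := false
			for _, item := range list {
				if present && item == value {
					found = true
				}
			}
			if !found {
				return false
			}
		case "$exists":
			if want, _ := arg.(bool); present != want {
				return false
			}
		default:
			return false // operator not covered by this sketch
		}
	}
	return true
}

// match reports whether doc satisfies every condition in filter.
func match(doc, filter M) bool {
	for field, cond := range filter {
		expr, ok := cond.(M)
		if !ok {
			expr = M{"$eq": cond} // a plain value is shorthand for $eq
		}
		value, present := doc[field]
		if !matchField(value, present, expr) {
			return false
		}
	}
	return true
}

func main() {
	post := M{"title": "Hello World!", "likes": 7}
	fmt.Println(match(post, M{"likes": M{"$gt": 5, "$lt": 10}})) // true
	fmt.Println(match(post, M{"title": "Other"}))                // false
}
```

The sketch only compares scalar values; nested documents, arrays and the full BSON comparison order are deliberately left out.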

And the mongokit.Apply function currently supports the following update operators:

  • $set, $setOnInsert, $unset, $rename
  • $inc, $mul, $max, $min, ($push)
  • $pop, $currentDate, $[], $[<identifier>]
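
To make the idea of update operators concrete, here is a minimal sketch that applies $set, $unset and $inc to a flat in-memory document. The M type and the apply function are hypothetical and only use the standard library; this is not how mongokit.Apply is implemented, and it skips details such as dotted paths and conflict detection between operators.

```go
package main

import "fmt"

// M is a hypothetical shorthand for a flat document or update document.
type M = map[string]interface{}

// apply mutates doc in place according to the update document.
// On error the document may already be partially modified.
func apply(doc, update M) error {
	for op, raw := range update {
		fields, ok := raw.(M)
		if !ok {
			return fmt.Errorf("%s: expected a document of fields", op)
		}
		for field, arg := range fields {
			switch op {
			case "$set":
				doc[field] = arg
			case "$unset":
				delete(doc, field)
			case "$inc":
				delta, ok := arg.(int)
				if !ok {
					return fmt.Errorf("$inc: %q needs an int amount", field)
				}
				current := 0 // a missing field starts at zero
				if existing, found := doc[field]; found {
					if current, ok = existing.(int); !ok {
						return fmt.Errorf("$inc: %q is not an int", field)
					}
				}
				doc[field] = current + delta
			default:
				return fmt.Errorf("unsupported operator %s", op)
			}
		}
	}
	return nil
}

func main() {
	doc := M{"title": "a", "likes": 1, "tmp": true}
	err := apply(doc, M{
		"$set":   M{"title": "b"},
		"$inc":   M{"likes": 2},
		"$unset": M{"tmp": ""},
	})
	fmt.Println(doc, err)
}
```

Because the operators in this example touch different fields, the random iteration order of Go maps does not affect the result.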

Finally, the mongokit.Project function currently supports the following projection operators:

  • $slice

Operators in parentheses are only partially supported; see the comments in the code.

Single, Compound and Partial Indexes

The mongokit.Index type supports single field and compound indexes that optionally enforce uniqueness or index a subset of documents using a partial filter expression. Single field indexes also support the automated expiry of documents (TTL indexes).

The more advanced multikey, geospatial, text, and hashed indexes are not yet supported and may be added later, while the deprecated sparse indexes will not. The recently introduced collation feature, as well as wildcard indexes, are also subject to future development.

Index Supported Sorting & Filtering

Indexes are currently only used to ensure uniqueness constraints and do not support filtering and sorting. This will be added in the future together with support for the explain command to debug the generated query plan.

Sessions & Multi-Document Transactions

Lungo supports multi-document transactions using a basic copy-on-write mechanism. Every transaction makes a copy of the catalog and clones namespaces before applying changes. After the new catalog has been written to disk, the transaction is considered successful and the catalog is replaced. Read-only transactions are allowed to run in parallel as they only serve as snapshots, but write transactions run sequentially. We assume write transactions to be fast and therefore try to prevent aborts due to conflicts (pessimistic concurrency control). The chosen approach might be changed in the future.
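
The copy-on-write idea can be sketched with the standard library alone. In the example below the catalog, engine and txn types are hypothetical simplifications (documents are plain strings and nothing is written to disk); it only demonstrates that a snapshot keeps seeing the old data while a single writer prepares and then publishes a new catalog.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// catalog maps namespace names to their documents.
type catalog map[string][]string

type engine struct {
	mu      sync.Mutex // guards current
	writer  sync.Mutex // serializes write transactions
	current catalog
}

type txn struct {
	engine  *engine
	catalog catalog
	write   bool
	done    bool
	cloned  map[string]bool
}

// begin copies the catalog; namespaces stay shared until they are modified.
func (e *engine) begin(write bool) *txn {
	if write {
		e.writer.Lock()
	}
	e.mu.Lock()
	defer e.mu.Unlock()
	snapshot := make(catalog, len(e.current))
	for ns, docs := range e.current {
		snapshot[ns] = docs
	}
	return &txn{engine: e, catalog: snapshot, write: write, cloned: map[string]bool{}}
}

// insert clones the namespace on first write and then appends the document.
func (t *txn) insert(ns, doc string) error {
	if !t.write {
		return errors.New("read-only transaction")
	}
	if !t.cloned[ns] {
		t.catalog[ns] = append([]string(nil), t.catalog[ns]...)
		t.cloned[ns] = true
	}
	t.catalog[ns] = append(t.catalog[ns], doc)
	return nil
}

// finish releases the writer lock once and optionally publishes the catalog.
func (t *txn) finish(publish bool) {
	if !t.write || t.done {
		return
	}
	t.done = true
	if publish {
		t.engine.mu.Lock()
		t.engine.current = t.catalog
		t.engine.mu.Unlock()
	}
	t.engine.writer.Unlock()
}

func (t *txn) commit() { t.finish(true) }
func (t *txn) abort()  { t.finish(false) }

func main() {
	e := &engine{current: catalog{"foo.bar": {"a"}}}
	snap := e.begin(false)
	w := e.begin(true)
	_ = w.insert("foo.bar", "b")
	w.commit()
	w.abort() // safe no-op after commit
	fmt.Println(len(snap.catalog["foo.bar"]), len(e.begin(false).catalog["foo.bar"])) // 1 2
}
```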

Oplog & Change Streams

Similar to MongoDB, every CRUD change is also logged to the local.oplog collection in the same format as consumed by change streams in MongoDB. Based on that, change streams can be used in the same way as with MongoDB replica sets.

Memory & Single File Store

The lungo.Store interface enables custom adapters that store the catalog to various media. The built-in MemoryStore keeps all data in memory while the FileStore writes all data atomically to a single BSON file. The interface may get more sophisticated in the future to allow more efficient storing methods.

GridFS

The lungo.Bucket, lungo.UploadStream and lungo.DownloadStream types provide a GridFS implementation similar to the one found in the gridfs package of the official Go driver. However, some improvements have been made while re-implementing the package:

  • Support for sessions via the context.Context parameter in all lungo.Bucket methods.
  • The lungo.DownloadStream implements the io.Seeker interface for convenient range queries on the file contents.
  • A non-standard "tracking" mode in which in-progress uploads and deletions are tracked by storing a document in an additional "markers" collection. If enabled, uploads can be suspended and resumed later and must be explicitly claimed. All unclaimed uploads and not fully deleted files can be cleaned up.

License

The MIT License (MIT)

Copyright (c) 2019 Joël Gähwiler

Documentation

Overview

Example
type post struct {
	Title string `bson:"title"`
}

// prepare options
opts := Options{
	Store: NewMemoryStore(),
}

// open database
client, engine, err := Open(nil, opts)
if err != nil {
	panic(err)
}

// ensure engine is closed
defer engine.Close()

// get db
foo := client.Database("foo")

// get collection
bar := foo.Collection("bar")

// insert post
_, err = bar.InsertOne(nil, &post{
	Title: "Hello World!",
})
if err != nil {
	panic(err)
}

// query posts
csr, err := bar.Find(nil, bson.M{})
if err != nil {
	panic(err)
}

// decode posts
var posts []post
err = csr.All(nil, &posts)
if err != nil {
	panic(err)
}

// print documents
fmt.Printf("%+v", posts)
Output:

[{Title:Hello World!}]

Constants

const (
	BucketMarkerStateUploading = "uploading"
	BucketMarkerStateUploaded  = "uploaded"
	BucketMarkerStateDeleted   = "deleted"
)

The bucket marker states.

const Local = "local"

Local is the local database.

Variables

var ErrEngineClosed = errors.New("engine closed")

ErrEngineClosed is returned if the engine has been closed.

var ErrFileNotFound = gridfs.ErrFileNotFound

ErrFileNotFound is returned if the specified file was not found in the bucket. The value is the same as gridfs.ErrFileNotFound and can be used interchangeably.

var ErrInvalidPosition = errors.New("invalid position")

ErrInvalidPosition is returned if the resulting position after a seek operation is invalid.

var ErrLostOplogPosition = errors.New("lost oplog position")

ErrLostOplogPosition may be returned by a stream when the oplog position has been lost. This can happen if a consumer is slower than the expiration of oplog entries.

var ErrNoDocuments = mongo.ErrNoDocuments

ErrNoDocuments is returned by SingleResult if no document has been found. The value is the same as mongo.ErrNoDocuments and can be used interchangeably.

var ErrSessionEnded = errors.New("session ended")

ErrSessionEnded is returned if the session has been ended.

var Oplog = Handle{Local, "oplog"}

Oplog is the handle for the local oplog namespace.

Functions

func IsUniquenessError

func IsUniquenessError(err error) bool

IsUniquenessError returns true if the provided error is generated due to a document failing a unique index constraint.

func Open

func Open(_ context.Context, opts Options) (IClient, *Engine, error)

Open will open a lungo database using the provided store.

func WithSession

func WithSession(ctx context.Context, session ISession, fn func(ISessionContext) error) error

WithSession will yield a session context to the provided callback that uses the specified session.

Types

type Bucket

type Bucket struct {
	// contains filtered or unexported fields
}

Bucket provides access to a GridFS bucket. The type is generally compatible with gridfs.Bucket from the official driver but allows passing in a context on all methods. This way the bucket theoretically supports multi-document transactions. However, it is not recommended to use transactions for large uploads; instead, enable the tracking mode and claim the uploads to ensure operational safety.

func NewBucket

func NewBucket(db IDatabase, opts ...*options.BucketOptions) *Bucket

NewBucket creates a bucket using the provided database and options.

func (*Bucket) ClaimUpload

func (b *Bucket) ClaimUpload(ctx context.Context, id interface{}) error

ClaimUpload will claim a tracked upload by creating the file and removing the marker.

func (*Bucket) Cleanup

func (b *Bucket) Cleanup(ctx context.Context, age time.Duration) error

Cleanup will remove unfinished uploads older than the specified age and all files marked for deletion.

func (*Bucket) Delete

func (b *Bucket) Delete(ctx context.Context, id interface{}) error

Delete will remove the specified file from the bucket. If the bucket is tracked, only a marker is inserted that will ensure the file and its chunks are deleted during the next cleanup.

func (*Bucket) DownloadToStream

func (b *Bucket) DownloadToStream(ctx context.Context, id interface{}, w io.Writer) (int64, error)

DownloadToStream will download the file with the specified id and write its contents to the provided writer.

func (*Bucket) DownloadToStreamByName

func (b *Bucket) DownloadToStreamByName(ctx context.Context, name string, w io.Writer, opts ...*options.NameOptions) (int64, error)

DownloadToStreamByName will download the file with the specified name and write its contents to the provided writer.

func (*Bucket) Drop

func (b *Bucket) Drop(ctx context.Context) error

Drop will drop the files and chunks collection. If the bucket is tracked, the markers collection is also dropped.

func (*Bucket) EnableTracking

func (b *Bucket) EnableTracking()

EnableTracking will enable a non-standard mode in which in-progress uploads and deletions are tracked by storing a document in an additional "markers" collection. If enabled, uploads can be suspended and resumed later and must be explicitly claimed. All unclaimed uploads and not fully deleted files can be cleaned up.

func (*Bucket) EnsureIndexes

func (b *Bucket) EnsureIndexes(ctx context.Context, force bool) error

EnsureIndexes will check if all required indexes exist and create them when needed. Usually, this is done automatically when uploading the first file using a bucket. However, when transactions are used to upload files, the indexes must be created before the first upload as index creation is prohibited during transactions.

func (*Bucket) Find

func (b *Bucket) Find(ctx context.Context, filter interface{}, opts ...*options.GridFSFindOptions) (ICursor, error)

Find will perform a query on the underlying files collection.

func (*Bucket) GetChunksCollection

func (b *Bucket) GetChunksCollection(_ context.Context) ICollection

GetChunksCollection returns the collection used for storing chunks.

func (*Bucket) GetFilesCollection

func (b *Bucket) GetFilesCollection(_ context.Context) ICollection

GetFilesCollection returns the collection used for storing files.

func (*Bucket) GetMarkersCollection

func (b *Bucket) GetMarkersCollection(_ context.Context) ICollection

GetMarkersCollection returns the collection used for storing markers.

func (*Bucket) OpenDownloadStream

func (b *Bucket) OpenDownloadStream(ctx context.Context, id interface{}) (*DownloadStream, error)

OpenDownloadStream will open a download stream for the file with the specified id.

func (*Bucket) OpenDownloadStreamByName

func (b *Bucket) OpenDownloadStreamByName(ctx context.Context, name string, opts ...*options.NameOptions) (*DownloadStream, error)

OpenDownloadStreamByName will open a download stream for the file with the specified name.

func (*Bucket) OpenUploadStream

func (b *Bucket) OpenUploadStream(ctx context.Context, name string, opts ...*options.UploadOptions) (*UploadStream, error)

OpenUploadStream will open an upload stream for a new file with the provided name.

func (*Bucket) OpenUploadStreamWithID

func (b *Bucket) OpenUploadStreamWithID(ctx context.Context, id interface{}, name string, opts ...*options.UploadOptions) (*UploadStream, error)

OpenUploadStreamWithID will open an upload stream for a new file with the provided id and name.

func (*Bucket) Rename

func (b *Bucket) Rename(ctx context.Context, id interface{}, name string) error

Rename will rename the file with the specified id to the provided name.

func (*Bucket) UploadFromStream

func (b *Bucket) UploadFromStream(ctx context.Context, name string, r io.Reader, opts ...*options.UploadOptions) (primitive.ObjectID, error)

UploadFromStream will upload a new file using the contents read from the provided reader.

func (*Bucket) UploadFromStreamWithID

func (b *Bucket) UploadFromStreamWithID(ctx context.Context, id interface{}, name string, r io.Reader, opts ...*options.UploadOptions) error

UploadFromStreamWithID will upload a new file with the provided id using the contents read from the provided reader.

type BucketChunk

type BucketChunk struct {
	ID   primitive.ObjectID `bson:"_id"`
	File interface{}        `bson:"files_id"`
	Num  int                `bson:"n"`
	Data []byte             `bson:"data"`
}

BucketChunk represents a document stored in the bucket "chunks" collection.

type BucketFile

type BucketFile struct {
	ID         interface{} `bson:"_id"`
	Length     int         `bson:"length"`
	ChunkSize  int         `bson:"chunkSize"`
	UploadDate time.Time   `bson:"uploadDate"`
	Filename   string      `bson:"filename"`
	Metadata   interface{} `bson:"metadata,omitempty"`
}

BucketFile represents a document stored in the bucket "files" collection.

type BucketMarker

type BucketMarker struct {
	ID        primitive.ObjectID `bson:"_id"`
	File      interface{}        `bson:"files_id"`
	State     string             `bson:"state"`
	Timestamp time.Time          `bson:"timestamp"`
	Length    int                `bson:"length"`
	ChunkSize int                `bson:"chunkSize"`
	Filename  string             `bson:"filename"`
	Metadata  interface{}        `bson:"metadata,omitempty"`
}

BucketMarker represents a document stored in the bucket "markers" collection.

type Catalog

type Catalog struct {
	Namespaces map[Handle]*mongokit.Collection
}

Catalog is the top level object per database that contains all data.

func NewCatalog

func NewCatalog() *Catalog

NewCatalog creates and returns a new catalog.

func (*Catalog) Clone

func (d *Catalog) Clone() *Catalog

Clone will clone the catalog. Namespaces need to be cloned separately.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client wraps an Engine to be mongo compatible.

func (*Client) Connect

func (c *Client) Connect(context.Context) error

Connect implements the IClient.Connect method.

func (*Client) Database

func (c *Client) Database(name string, opts ...*options.DatabaseOptions) IDatabase

Database implements the IClient.Database method.

func (*Client) Disconnect

func (c *Client) Disconnect(context.Context) error

Disconnect implements the IClient.Disconnect method.

func (*Client) ListDatabaseNames

func (c *Client) ListDatabaseNames(ctx context.Context, filter interface{}, opts ...*options.ListDatabasesOptions) ([]string, error)

ListDatabaseNames implements the IClient.ListDatabaseNames method.

func (*Client) ListDatabases

func (c *Client) ListDatabases(ctx context.Context, filter interface{}, opts ...*options.ListDatabasesOptions) (mongo.ListDatabasesResult, error)

ListDatabases implements the IClient.ListDatabases method.

func (*Client) NumberSessionsInProgress

func (c *Client) NumberSessionsInProgress() int

NumberSessionsInProgress implements the IClient.NumberSessionsInProgress method.

func (*Client) Ping

Ping implements the IClient.Ping method.

func (*Client) StartSession

func (c *Client) StartSession(opts ...*options.SessionOptions) (ISession, error)

StartSession implements the IClient.StartSession method.

func (*Client) UseSession

func (c *Client) UseSession(ctx context.Context, fn func(ISessionContext) error) error

UseSession implements the IClient.UseSession method.

func (*Client) UseSessionWithOptions

func (c *Client) UseSessionWithOptions(ctx context.Context, opt *options.SessionOptions, fn func(ISessionContext) error) error

UseSessionWithOptions implements the IClient.UseSessionWithOptions method.

func (*Client) Watch

func (c *Client) Watch(_ context.Context, pipeline interface{}, opts ...*options.ChangeStreamOptions) (IChangeStream, error)

Watch implements the IClient.Watch method.

type Collection

type Collection struct {
	// contains filtered or unexported fields
}

Collection wraps an Engine to be mongo compatible.

func (*Collection) Aggregate

func (c *Collection) Aggregate(context.Context, interface{}, ...*options.AggregateOptions) (ICursor, error)

Aggregate implements the ICollection.Aggregate method.

func (*Collection) BulkWrite

func (c *Collection) BulkWrite(ctx context.Context, models []mongo.WriteModel, opts ...*options.BulkWriteOptions) (*mongo.BulkWriteResult, error)

BulkWrite implements the ICollection.BulkWrite method.

func (*Collection) Clone

func (c *Collection) Clone(opts ...*options.CollectionOptions) (ICollection, error)

Clone implements the ICollection.Clone method.

func (*Collection) CountDocuments

func (c *Collection) CountDocuments(ctx context.Context, filter interface{}, opts ...*options.CountOptions) (int64, error)

CountDocuments implements the ICollection.CountDocuments method.

func (*Collection) Database

func (c *Collection) Database() IDatabase

Database implements the ICollection.Database method.

func (*Collection) DeleteMany

func (c *Collection) DeleteMany(ctx context.Context, filter interface{}, opts ...*options.DeleteOptions) (*mongo.DeleteResult, error)

DeleteMany implements the ICollection.DeleteMany method.

func (*Collection) DeleteOne

func (c *Collection) DeleteOne(ctx context.Context, filter interface{}, opts ...*options.DeleteOptions) (*mongo.DeleteResult, error)

DeleteOne implements the ICollection.DeleteOne method.

func (*Collection) Distinct

func (c *Collection) Distinct(ctx context.Context, field string, filter interface{}, opts ...*options.DistinctOptions) ([]interface{}, error)

Distinct implements the ICollection.Distinct method.

func (*Collection) Drop

func (c *Collection) Drop(ctx context.Context) error

Drop implements the ICollection.Drop method.

func (*Collection) EstimatedDocumentCount

func (c *Collection) EstimatedDocumentCount(ctx context.Context, opts ...*options.EstimatedDocumentCountOptions) (int64, error)

EstimatedDocumentCount implements the ICollection.EstimatedDocumentCount method.

func (*Collection) Find

func (c *Collection) Find(ctx context.Context, filter interface{}, opts ...*options.FindOptions) (ICursor, error)

Find implements the ICollection.Find method.

func (*Collection) FindOne

func (c *Collection) FindOne(ctx context.Context, filter interface{}, opts ...*options.FindOneOptions) ISingleResult

FindOne implements the ICollection.FindOne method.

func (*Collection) FindOneAndDelete

func (c *Collection) FindOneAndDelete(ctx context.Context, filter interface{}, opts ...*options.FindOneAndDeleteOptions) ISingleResult

FindOneAndDelete implements the ICollection.FindOneAndDelete method.

func (*Collection) FindOneAndReplace

func (c *Collection) FindOneAndReplace(ctx context.Context, filter, replacement interface{}, opts ...*options.FindOneAndReplaceOptions) ISingleResult

FindOneAndReplace implements the ICollection.FindOneAndReplace method.

func (*Collection) FindOneAndUpdate

func (c *Collection) FindOneAndUpdate(ctx context.Context, filter, update interface{}, opts ...*options.FindOneAndUpdateOptions) ISingleResult

FindOneAndUpdate implements the ICollection.FindOneAndUpdate method.

func (*Collection) Indexes

func (c *Collection) Indexes() IIndexView

Indexes implements the ICollection.Indexes method.

func (*Collection) InsertMany

func (c *Collection) InsertMany(ctx context.Context, documents []interface{}, opts ...*options.InsertManyOptions) (*mongo.InsertManyResult, error)

InsertMany implements the ICollection.InsertMany method.

func (*Collection) InsertOne

func (c *Collection) InsertOne(ctx context.Context, document interface{}, opts ...*options.InsertOneOptions) (*mongo.InsertOneResult, error)

InsertOne implements the ICollection.InsertOne method.

func (*Collection) Name

func (c *Collection) Name() string

Name implements the ICollection.Name method.

func (*Collection) ReplaceOne

func (c *Collection) ReplaceOne(ctx context.Context, filter, replacement interface{}, opts ...*options.ReplaceOptions) (*mongo.UpdateResult, error)

ReplaceOne implements the ICollection.ReplaceOne method.

func (*Collection) UpdateByID

func (c *Collection) UpdateByID(ctx context.Context, id interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error)

UpdateByID implements the ICollection.UpdateByID method.

func (*Collection) UpdateMany

func (c *Collection) UpdateMany(ctx context.Context, filter, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error)

UpdateMany implements the ICollection.UpdateMany method.

func (*Collection) UpdateOne

func (c *Collection) UpdateOne(ctx context.Context, filter, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error)

UpdateOne implements the ICollection.UpdateOne method.

func (*Collection) Watch

func (c *Collection) Watch(_ context.Context, pipeline interface{}, opts ...*options.ChangeStreamOptions) (IChangeStream, error)

Watch implements the ICollection.Watch method.

type Cursor

type Cursor struct {
	// contains filtered or unexported fields
}

Cursor wraps a list to be mongo compatible.

func (*Cursor) All

func (c *Cursor) All(_ context.Context, out interface{}) error

All implements the ICursor.All method.

func (*Cursor) Close

func (c *Cursor) Close(context.Context) error

Close implements the ICursor.Close method.

func (*Cursor) Decode

func (c *Cursor) Decode(out interface{}) error

Decode implements the ICursor.Decode method.

func (*Cursor) Err

func (c *Cursor) Err() error

Err implements the ICursor.Err method.

func (*Cursor) ID

func (c *Cursor) ID() int64

ID implements the ICursor.ID method.

func (*Cursor) Next

func (c *Cursor) Next(context.Context) bool

Next implements the ICursor.Next method.

func (*Cursor) RemainingBatchLength

func (c *Cursor) RemainingBatchLength() int

RemainingBatchLength implements the ICursor.RemainingBatchLength method.

func (*Cursor) TryNext

func (c *Cursor) TryNext(ctx context.Context) bool

TryNext implements the ICursor.TryNext method.

type Database

type Database struct {
	// contains filtered or unexported fields
}

Database wraps an Engine to be mongo compatible.

func (*Database) Aggregate

func (d *Database) Aggregate(context.Context, interface{}, ...*options.AggregateOptions) (ICursor, error)

Aggregate implements the IDatabase.Aggregate method.

func (*Database) Client

func (d *Database) Client() IClient

Client implements the IDatabase.Client method.

func (*Database) Collection

func (d *Database) Collection(name string, opts ...*options.CollectionOptions) ICollection

Collection implements the IDatabase.Collection method.

func (*Database) CreateCollection

func (d *Database) CreateCollection(ctx context.Context, name string, opts ...*options.CreateCollectionOptions) error

CreateCollection implements the IDatabase.CreateCollection method.

func (*Database) CreateView

func (d *Database) CreateView(_ context.Context, _, _ string, _ interface{}, _ ...*options.CreateViewOptions) error

CreateView implements the IDatabase.CreateView method.

func (*Database) Drop

func (d *Database) Drop(ctx context.Context) error

Drop implements the IDatabase.Drop method.

func (*Database) ListCollectionNames

func (d *Database) ListCollectionNames(ctx context.Context, filter interface{}, opts ...*options.ListCollectionsOptions) ([]string, error)

ListCollectionNames implements the IDatabase.ListCollectionNames method.

func (*Database) ListCollectionSpecifications

func (d *Database) ListCollectionSpecifications(context.Context, interface{}, ...*options.ListCollectionsOptions) ([]*mongo.CollectionSpecification, error)

ListCollectionSpecifications implements the IDatabase.ListCollectionSpecifications method.

func (*Database) ListCollections

func (d *Database) ListCollections(ctx context.Context, filter interface{}, opts ...*options.ListCollectionsOptions) (ICursor, error)

ListCollections implements the IDatabase.ListCollections method.

func (*Database) Name

func (d *Database) Name() string

Name implements the IDatabase.Name method.

func (*Database) ReadConcern

func (d *Database) ReadConcern() *readconcern.ReadConcern

ReadConcern implements the IDatabase.ReadConcern method.

func (*Database) ReadPreference

func (d *Database) ReadPreference() *readpref.ReadPref

ReadPreference implements the IDatabase.ReadPreference method.

func (*Database) RunCommand

func (d *Database) RunCommand(context.Context, interface{}, ...*options.RunCmdOptions) ISingleResult

RunCommand implements the IDatabase.RunCommand method.

func (*Database) RunCommandCursor

func (d *Database) RunCommandCursor(context.Context, interface{}, ...*options.RunCmdOptions) (ICursor, error)

RunCommandCursor implements the IDatabase.RunCommandCursor method.

func (*Database) Watch

func (d *Database) Watch(_ context.Context, pipeline interface{}, opts ...*options.ChangeStreamOptions) (IChangeStream, error)

Watch implements the IDatabase.Watch method.

func (*Database) WriteConcern

func (d *Database) WriteConcern() *writeconcern.WriteConcern

WriteConcern implements the IDatabase.WriteConcern method.

type DownloadStream

type DownloadStream struct {
	// contains filtered or unexported fields
}

DownloadStream is used to download a single file.

func (*DownloadStream) Close

func (s *DownloadStream) Close() error

Close will close the download stream.

func (*DownloadStream) GetFile

func (s *DownloadStream) GetFile() *BucketFile

GetFile will return the file that the stream is downloading from.

func (*DownloadStream) Read

func (s *DownloadStream) Read(buf []uint8) (int, error)

Read will read bytes into the specified buffer from the current position of the read head.

func (*DownloadStream) Seek

func (s *DownloadStream) Seek(offset int64, whence int) (int64, error)

Seek will reposition the read head using the specified values. A resulting position below zero will yield an error while a position beyond the file length will yield EOF on subsequent reads.

func (*DownloadStream) Skip

func (s *DownloadStream) Skip(skip int64) (int64, error)

Skip will advance the read head by the specified number of bytes.

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine manages the catalog loaded from a store and provides access to it through transactions. Additionally, it also manages streams that subscribe to changes to the catalog.

func CreateEngine

func CreateEngine(opts Options) (*Engine, error)

CreateEngine will create and return an engine with a loaded catalog from the store.

func (*Engine) Abort

func (e *Engine) Abort(txn *Transaction)

Abort will abort the specified transaction. To ensure a transaction is always released, Abort should be called after finishing any transaction.

func (*Engine) Begin

func (e *Engine) Begin(ctx context.Context, lock bool) (*Transaction, error)

Begin will create a new transaction from the current catalog. A locked transaction must be committed or aborted before another transaction can be started. Unlocked transactions serve as point-in-time snapshots and can just be discarded when no longer needed.

func (*Engine) Catalog

func (e *Engine) Catalog() *Catalog

Catalog will return the currently used catalog. Any modification to the returned catalog while using the engine results in undefined behaviour.

func (*Engine) Close

func (e *Engine) Close()

Close will close the engine.

func (*Engine) Commit

func (e *Engine) Commit(txn *Transaction) error

Commit will attempt to store the modified catalog and on success replace the current catalog. If an error is returned, the transaction has been aborted and has become invalid.

func (*Engine) Watch

func (e *Engine) Watch(handle Handle, pipeline bsonkit.List, resumeAfter, startAfter bsonkit.Doc, startAt *primitive.Timestamp) (*Stream, error)

Watch will return a stream that is able to consume events from the oplog.

type File

type File struct {
	Namespaces map[string]FileNamespace `bson:"namespaces"`
}

File is a format for storing catalogs in a single structure.

func BuildFile

func BuildFile(catalog *Catalog) *File

BuildFile will build a new file from the provided catalog.

func (*File) BuildCatalog

func (f *File) BuildCatalog() (*Catalog, error)

BuildCatalog will build a new catalog from the file.

type FileIndex

type FileIndex struct {
	Key     bsonkit.Doc   `bson:"key"`
	Unique  bool          `bson:"unique"`
	Partial bsonkit.Doc   `bson:"partial"`
	Expiry  time.Duration `bson:"expiry"`
}

FileIndex is a single index stored in a file.

type FileNamespace

type FileNamespace struct {
	Documents bsonkit.List         `bson:"documents"`
	Indexes   map[string]FileIndex `bson:"indexes"`
}

FileNamespace is a single namespace stored in a file.

type FileStore

type FileStore struct {
	// contains filtered or unexported fields
}

FileStore writes the catalog to a single file on disk.

func NewFileStore

func NewFileStore(path string, mode os.FileMode) *FileStore

NewFileStore creates and returns a new file store.

func (*FileStore) Load

func (s *FileStore) Load() (*Catalog, error)

Load will read the catalog from disk and return it. If no file exists at the specified location an empty catalog is returned.

func (*FileStore) Store

func (s *FileStore) Store(catalog *Catalog) error

Store will atomically write the catalog to disk.
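One common way to write a file atomically is to write a temporary file in the same directory and then rename it over the target. The sketch below illustrates that technique with the standard library only. Whether FileStore works exactly like this is an assumption, and writeFileAtomic and demo are hypothetical names.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// writeFileAtomic is a hypothetical helper that writes data to a temporary
// file next to path and then renames it over path.
func writeFileAtomic(path string, data []byte, mode os.FileMode) error {
	tmp, err := os.CreateTemp(filepath.Dir(path), ".tmp-*")
	if err != nil {
		return err
	}
	// Removes the temp file on failure; after a successful rename the name
	// no longer exists and the call fails harmlessly.
	defer os.Remove(tmp.Name())

	if _, err := tmp.Write(data); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Sync(); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	if err := os.Chmod(tmp.Name(), mode); err != nil {
		return err
	}
	return os.Rename(tmp.Name(), path)
}

// demo writes two versions to the same path and reads the result back.
func demo() (string, error) {
	dir, err := os.MkdirTemp("", "store-*")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, "catalog.bin")
	for _, v := range []string{"v1", "v2"} {
		if err := writeFileAtomic(path, []byte(v), 0600); err != nil {
			return "", err
		}
	}
	data, err := os.ReadFile(path)
	return string(data), err
}

func main() {
	fmt.Println(demo())
}
```

Keeping the temporary file in the target's directory matters because a rename is generally only atomic within one file system.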

type Handle

type Handle [2]string

Handle is a two component identifier for namespaces where the first part is the database and the second the collection.

func (Handle) String

func (h Handle) String() string

String will return the string form of the handle.

func (Handle) Validate

func (h Handle) Validate(needCollection bool) error

Validate will validate the handle.
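To make the two-component shape concrete, here is a minimal sketch of a handle type with String and validation methods. The lowercase handle type, the dot-separated string form and the exact validation rules are assumptions for illustration, not lungo's documented behaviour.

```go
package main

import (
	"errors"
	"fmt"
)

// handle mirrors the documented shape: [database, collection].
type handle [2]string

// String joins both parts with a dot (assumed format) and omits the dot
// when no collection is set.
func (h handle) String() string {
	if h[1] == "" {
		return h[0]
	}
	return h[0] + "." + h[1]
}

// validate applies assumed rules: the database must be set, and the
// collection must be set when needCollection is true.
func (h handle) validate(needCollection bool) error {
	if h[0] == "" {
		return errors.New("missing database")
	}
	if needCollection && h[1] == "" {
		return errors.New("missing collection")
	}
	return nil
}

func main() {
	h := handle{"app", "users"}
	fmt.Println(h.String(), h.validate(true))
	fmt.Println(handle{"app", ""}.validate(true))
}
```

Because the type is an array, handles are comparable and can be used directly as map keys.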

type IChangeStream

type IChangeStream interface {
	Close(context.Context) error
	Decode(interface{}) error
	Err() error
	ID() int64
	Next(context.Context) bool
	ResumeToken() bson.Raw
	TryNext(context.Context) bool
}

IChangeStream defines a generic change stream.

type IClient

type IClient interface {
	Connect(context.Context) error
	Database(string, ...*options.DatabaseOptions) IDatabase
	Disconnect(context.Context) error
	ListDatabaseNames(context.Context, interface{}, ...*options.ListDatabasesOptions) ([]string, error)
	ListDatabases(context.Context, interface{}, ...*options.ListDatabasesOptions) (mongo.ListDatabasesResult, error)
	NumberSessionsInProgress() int
	Ping(context.Context, *readpref.ReadPref) error
	StartSession(...*options.SessionOptions) (ISession, error)
	UseSession(context.Context, func(ISessionContext) error) error
	UseSessionWithOptions(context.Context, *options.SessionOptions, func(ISessionContext) error) error
	Watch(context.Context, interface{}, ...*options.ChangeStreamOptions) (IChangeStream, error)
}

IClient defines a generic client.

func Connect

func Connect(ctx context.Context, opts ...*options.ClientOptions) (IClient, error)

Connect will connect to a MongoDB database and return a lungo compatible client.

func NewClient

func NewClient(engine *Engine) IClient

NewClient will create and return a new client.

type ICollection

type ICollection interface {
	Aggregate(context.Context, interface{}, ...*options.AggregateOptions) (ICursor, error)
	BulkWrite(context.Context, []mongo.WriteModel, ...*options.BulkWriteOptions) (*mongo.BulkWriteResult, error)
	Clone(...*options.CollectionOptions) (ICollection, error)
	CountDocuments(context.Context, interface{}, ...*options.CountOptions) (int64, error)
	Database() IDatabase
	DeleteMany(context.Context, interface{}, ...*options.DeleteOptions) (*mongo.DeleteResult, error)
	DeleteOne(context.Context, interface{}, ...*options.DeleteOptions) (*mongo.DeleteResult, error)
	Distinct(context.Context, string, interface{}, ...*options.DistinctOptions) ([]interface{}, error)
	Drop(context.Context) error
	EstimatedDocumentCount(context.Context, ...*options.EstimatedDocumentCountOptions) (int64, error)
	Find(context.Context, interface{}, ...*options.FindOptions) (ICursor, error)
	FindOne(context.Context, interface{}, ...*options.FindOneOptions) ISingleResult
	FindOneAndDelete(context.Context, interface{}, ...*options.FindOneAndDeleteOptions) ISingleResult
	FindOneAndReplace(context.Context, interface{}, interface{}, ...*options.FindOneAndReplaceOptions) ISingleResult
	FindOneAndUpdate(context.Context, interface{}, interface{}, ...*options.FindOneAndUpdateOptions) ISingleResult
	Indexes() IIndexView
	InsertMany(context.Context, []interface{}, ...*options.InsertManyOptions) (*mongo.InsertManyResult, error)
	InsertOne(context.Context, interface{}, ...*options.InsertOneOptions) (*mongo.InsertOneResult, error)
	Name() string
	ReplaceOne(context.Context, interface{}, interface{}, ...*options.ReplaceOptions) (*mongo.UpdateResult, error)
	UpdateByID(context.Context, interface{}, interface{}, ...*options.UpdateOptions) (*mongo.UpdateResult, error)
	UpdateMany(context.Context, interface{}, interface{}, ...*options.UpdateOptions) (*mongo.UpdateResult, error)
	UpdateOne(context.Context, interface{}, interface{}, ...*options.UpdateOptions) (*mongo.UpdateResult, error)
	Watch(context.Context, interface{}, ...*options.ChangeStreamOptions) (IChangeStream, error)
}

ICollection defines a generic collection.

type ICursor

type ICursor interface {
	All(context.Context, interface{}) error
	Close(context.Context) error
	Decode(interface{}) error
	Err() error
	ID() int64
	Next(context.Context) bool
	RemainingBatchLength() int
	TryNext(context.Context) bool
}

ICursor defines a generic cursor.
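Cursors of this shape are usually consumed with a Next/Decode loop followed by an Err check and a Close. The sketch below shows that loop against a reduced interface. The cursor interface, sliceCursor and drain are hypothetical names, and the in-memory cursor only decodes into *string.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// cursor is a reduced, hypothetical subset of the ICursor method set.
type cursor interface {
	Next(context.Context) bool
	Decode(interface{}) error
	Err() error
	Close(context.Context) error
}

// sliceCursor is a hypothetical in-memory cursor over strings.
type sliceCursor struct {
	items  []string
	pos    int
	closed bool
}

// Next advances to the next item and reports whether one is available.
func (c *sliceCursor) Next(context.Context) bool {
	if c.closed || c.pos >= len(c.items) {
		return false
	}
	c.pos++
	return true
}

// Decode copies the current item into out, which must be a *string.
func (c *sliceCursor) Decode(out interface{}) error {
	ptr, ok := out.(*string)
	if !ok {
		return fmt.Errorf("unsupported target %T", out)
	}
	if c.pos == 0 {
		return errors.New("decode called before next")
	}
	*ptr = c.items[c.pos-1]
	return nil
}

func (c *sliceCursor) Err() error { return nil }

func (c *sliceCursor) Close(context.Context) error {
	c.closed = true
	return nil
}

// drain shows the usual loop: iterate, decode, then check Err and close.
func drain(c cursor) ([]string, error) {
	ctx := context.Background()
	defer c.Close(ctx)

	var out []string
	for c.Next(ctx) {
		var s string
		if err := c.Decode(&s); err != nil {
			return nil, err
		}
		out = append(out, s)
	}
	return out, c.Err()
}

func main() {
	fmt.Println(drain(&sliceCursor{items: []string{"a", "b"}}))
}
```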

type IDatabase

type IDatabase interface {
	Aggregate(context.Context, interface{}, ...*options.AggregateOptions) (ICursor, error)
	Client() IClient
	Collection(string, ...*options.CollectionOptions) ICollection
	CreateCollection(context.Context, string, ...*options.CreateCollectionOptions) error
	CreateView(context.Context, string, string, interface{}, ...*options.CreateViewOptions) error
	Drop(context.Context) error
	ListCollectionNames(context.Context, interface{}, ...*options.ListCollectionsOptions) ([]string, error)
	ListCollectionSpecifications(context.Context, interface{}, ...*options.ListCollectionsOptions) ([]*mongo.CollectionSpecification, error)
	ListCollections(context.Context, interface{}, ...*options.ListCollectionsOptions) (ICursor, error)
	Name() string
	ReadConcern() *readconcern.ReadConcern
	ReadPreference() *readpref.ReadPref
	RunCommand(context.Context, interface{}, ...*options.RunCmdOptions) ISingleResult
	RunCommandCursor(context.Context, interface{}, ...*options.RunCmdOptions) (ICursor, error)
	Watch(context.Context, interface{}, ...*options.ChangeStreamOptions) (IChangeStream, error)
	WriteConcern() *writeconcern.WriteConcern
}

IDatabase defines a generic database.

type IIndexView

IIndexView defines a generic index view.

type ISession

type ISession interface {
	ID() bson.Raw
	AbortTransaction(context.Context) error
	AdvanceClusterTime(bson.Raw) error
	AdvanceOperationTime(*primitive.Timestamp) error
	Client() IClient
	ClusterTime() bson.Raw
	CommitTransaction(context.Context) error
	EndSession(context.Context)
	OperationTime() *primitive.Timestamp
	StartTransaction(...*options.TransactionOptions) error
	WithTransaction(context.Context, func(ISessionContext) (interface{}, error), ...*options.TransactionOptions) (interface{}, error)
}

ISession defines a generic session.

type ISessionContext

type ISessionContext interface {
	context.Context
	ISession
}

ISessionContext defines a generic session context.

type ISingleResult

type ISingleResult interface {
	Decode(interface{}) error
	DecodeBytes() (bson.Raw, error)
	Err() error
}

ISingleResult defines a generic single result.

type IndexView

type IndexView struct {
	// contains filtered or unexported fields
}

IndexView wraps an Engine to be mongo compatible.

func (*IndexView) CreateMany

func (v *IndexView) CreateMany(ctx context.Context, indexes []mongo.IndexModel, opts ...*options.CreateIndexesOptions) ([]string, error)

CreateMany implements the IIndexView.CreateMany method.

func (*IndexView) CreateOne

func (v *IndexView) CreateOne(ctx context.Context, index mongo.IndexModel, opts ...*options.CreateIndexesOptions) (string, error)

CreateOne implements the IIndexView.CreateOne method.

func (*IndexView) DropAll

func (v *IndexView) DropAll(ctx context.Context, opts ...*options.DropIndexesOptions) (bson.Raw, error)

DropAll implements the IIndexView.DropAll method.

func (*IndexView) DropOne

func (v *IndexView) DropOne(ctx context.Context, name string, opts ...*options.DropIndexesOptions) (bson.Raw, error)

DropOne implements the IIndexView.DropOne method.

func (*IndexView) List

func (v *IndexView) List(ctx context.Context, opts ...*options.ListIndexesOptions) (ICursor, error)

List implements the IIndexView.List method.

func (*IndexView) ListSpecifications

ListSpecifications implements the IIndexView.ListSpecifications method.

type MemoryStore

type MemoryStore struct {
	// contains filtered or unexported fields
}

MemoryStore holds the catalog in memory.

func NewMemoryStore

func NewMemoryStore() *MemoryStore

NewMemoryStore creates and returns a new memory store.

func (MemoryStore) Load

func (m MemoryStore) Load() (*Catalog, error)

Load will return the catalog.

func (MemoryStore) Store

func (m MemoryStore) Store(data *Catalog) error

Store will store the catalog.

type MongoClient

type MongoClient struct {
	*mongo.Client
}

MongoClient wraps a mongo.Client to be lungo compatible.

func (*MongoClient) Database

func (c *MongoClient) Database(name string, opts ...*options.DatabaseOptions) IDatabase

Database implements the IClient.Database method.

func (*MongoClient) StartSession

func (c *MongoClient) StartSession(opts ...*options.SessionOptions) (ISession, error)

StartSession implements the IClient.StartSession method.

func (*MongoClient) UseSession

func (c *MongoClient) UseSession(ctx context.Context, fn func(ISessionContext) error) error

UseSession implements the IClient.UseSession method.

func (*MongoClient) UseSessionWithOptions

func (c *MongoClient) UseSessionWithOptions(ctx context.Context, opt *options.SessionOptions, fn func(ISessionContext) error) error

UseSessionWithOptions implements the IClient.UseSessionWithOptions method.

func (*MongoClient) Watch

func (c *MongoClient) Watch(ctx context.Context, pipeline interface{}, opts ...*options.ChangeStreamOptions) (IChangeStream, error)

Watch implements the IClient.Watch method.
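MongoClient embeds *mongo.Client and only redeclares a handful of methods. A plausible reading is that embedding promotes every method whose signature already matches the interface, while methods that must return the generic interface types are shadowed. The sketch below shows that pattern with hypothetical stand-in types. It is not the actual lungo or driver code.

```go
package main

import "fmt"

// driverDB and driverClient are hypothetical stand-ins for concrete
// driver types.
type driverDB struct{ name string }

func (d *driverDB) Name() string { return d.name }

type driverClient struct{}

func (c *driverClient) Database(name string) *driverDB { return &driverDB{name: name} }
func (c *driverClient) Ping() error                     { return nil }

// database and client are hypothetical generic interfaces.
type database interface{ Name() string }

type client interface {
	Database(string) database
	Ping() error
}

// wrappedDB gets Name promoted from the embedded type.
type wrappedDB struct{ *driverDB }

// wrappedClient gets Ping promoted from the embedded type.
type wrappedClient struct{ *driverClient }

// Database shadows the embedded method so that the return type is the
// interface instead of the concrete driver type.
func (c *wrappedClient) Database(name string) database {
	return &wrappedDB{c.driverClient.Database(name)}
}

// Compile-time check that the wrapper satisfies the interface.
var _ client = (*wrappedClient)(nil)

func main() {
	var c client = &wrappedClient{&driverClient{}}
	fmt.Println(c.Database("app").Name(), c.Ping())
}
```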

type MongoCollection

type MongoCollection struct {
	*mongo.Collection
	// contains filtered or unexported fields
}

MongoCollection wraps a mongo.Collection to be lungo compatible.

func (*MongoCollection) Aggregate

func (c *MongoCollection) Aggregate(ctx context.Context, pipeline interface{}, opts ...*options.AggregateOptions) (ICursor, error)

Aggregate implements the ICollection.Aggregate method.

func (*MongoCollection) Clone

Clone implements the ICollection.Clone method.

func (*MongoCollection) Database

func (c *MongoCollection) Database() IDatabase

Database implements the ICollection.Database method.

func (*MongoCollection) Find

func (c *MongoCollection) Find(ctx context.Context, filter interface{}, opts ...*options.FindOptions) (ICursor, error)

Find implements the ICollection.Find method.

func (*MongoCollection) FindOne

func (c *MongoCollection) FindOne(ctx context.Context, filter interface{}, opts ...*options.FindOneOptions) ISingleResult

FindOne implements the ICollection.FindOne method.

func (*MongoCollection) FindOneAndDelete

func (c *MongoCollection) FindOneAndDelete(ctx context.Context, filter interface{}, opts ...*options.FindOneAndDeleteOptions) ISingleResult

FindOneAndDelete implements the ICollection.FindOneAndDelete method.

func (*MongoCollection) FindOneAndReplace

func (c *MongoCollection) FindOneAndReplace(ctx context.Context, filter, replacement interface{}, opts ...*options.FindOneAndReplaceOptions) ISingleResult

FindOneAndReplace implements the ICollection.FindOneAndReplace method.

func (*MongoCollection) FindOneAndUpdate

func (c *MongoCollection) FindOneAndUpdate(ctx context.Context, filter, update interface{}, opts ...*options.FindOneAndUpdateOptions) ISingleResult

FindOneAndUpdate implements the ICollection.FindOneAndUpdate method.

func (*MongoCollection) Indexes

func (c *MongoCollection) Indexes() IIndexView

Indexes implements the ICollection.Indexes method.

func (*MongoCollection) Watch

func (c *MongoCollection) Watch(ctx context.Context, pipeline interface{}, opts ...*options.ChangeStreamOptions) (IChangeStream, error)

Watch implements the ICollection.Watch method.

type MongoDatabase

type MongoDatabase struct {
	*mongo.Database
	// contains filtered or unexported fields
}

MongoDatabase wraps a mongo.Database to be lungo compatible.

func (*MongoDatabase) Aggregate

func (d *MongoDatabase) Aggregate(ctx context.Context, pipeline interface{}, opts ...*options.AggregateOptions) (ICursor, error)

Aggregate implements the IDatabase.Aggregate method.

func (*MongoDatabase) Client

func (d *MongoDatabase) Client() IClient

Client implements the IDatabase.Client method.

func (*MongoDatabase) Collection

func (d *MongoDatabase) Collection(name string, opts ...*options.CollectionOptions) ICollection

Collection implements the IDatabase.Collection method.

func (*MongoDatabase) CreateCollection

func (d *MongoDatabase) CreateCollection(ctx context.Context, name string, opts ...*options.CreateCollectionOptions) error

CreateCollection implements the IDatabase.CreateCollection method.

func (*MongoDatabase) ListCollections

func (d *MongoDatabase) ListCollections(ctx context.Context, filter interface{}, opts ...*options.ListCollectionsOptions) (ICursor, error)

ListCollections implements the IDatabase.ListCollections method.

func (*MongoDatabase) RunCommand

func (d *MongoDatabase) RunCommand(ctx context.Context, runCommand interface{}, opts ...*options.RunCmdOptions) ISingleResult

RunCommand implements the IDatabase.RunCommand method.

func (*MongoDatabase) RunCommandCursor

func (d *MongoDatabase) RunCommandCursor(ctx context.Context, filter interface{}, opts ...*options.RunCmdOptions) (ICursor, error)

RunCommandCursor implements the IDatabase.RunCommandCursor method.

func (*MongoDatabase) Watch

func (d *MongoDatabase) Watch(ctx context.Context, pipeline interface{}, opts ...*options.ChangeStreamOptions) (IChangeStream, error)

Watch implements the IDatabase.Watch method.

type MongoIndexView

type MongoIndexView struct {
	*mongo.IndexView
}

MongoIndexView wraps a mongo.IndexView to be lungo compatible.

func (*MongoIndexView) CreateMany

func (m *MongoIndexView) CreateMany(ctx context.Context, models []mongo.IndexModel, opts ...*options.CreateIndexesOptions) ([]string, error)

CreateMany implements the IIndexView.CreateMany method.

func (*MongoIndexView) CreateOne

func (m *MongoIndexView) CreateOne(ctx context.Context, model mongo.IndexModel, opts ...*options.CreateIndexesOptions) (string, error)

CreateOne implements the IIndexView.CreateOne method.

func (*MongoIndexView) DropAll

func (m *MongoIndexView) DropAll(ctx context.Context, opts ...*options.DropIndexesOptions) (bson.Raw, error)

DropAll implements the IIndexView.DropAll method.

func (*MongoIndexView) DropOne

func (m *MongoIndexView) DropOne(ctx context.Context, name string, opts ...*options.DropIndexesOptions) (bson.Raw, error)

DropOne implements the IIndexView.DropOne method.

func (*MongoIndexView) List

List implements the IIndexView.List method.

type MongoSession

type MongoSession struct {
	mongo.Session
	// contains filtered or unexported fields
}

MongoSession wraps a mongo.Session to be lungo compatible.

func (*MongoSession) AbortTransaction

func (s *MongoSession) AbortTransaction(ctx context.Context) error

AbortTransaction implements the ISession.AbortTransaction method.

func (*MongoSession) Client

func (s *MongoSession) Client() IClient

Client implements the ISession.Client method.

func (*MongoSession) CommitTransaction

func (s *MongoSession) CommitTransaction(ctx context.Context) error

CommitTransaction implements the ISession.CommitTransaction method.

func (*MongoSession) EndSession

func (s *MongoSession) EndSession(ctx context.Context)

EndSession implements the ISession.EndSession method.

func (*MongoSession) WithTransaction

func (s *MongoSession) WithTransaction(ctx context.Context, fn func(ISessionContext) (interface{}, error), opts ...*options.TransactionOptions) (interface{}, error)

WithTransaction implements the ISession.WithTransaction method.

type MongoSessionContext

type MongoSessionContext struct {
	context.Context
	*MongoSession
}

MongoSessionContext wraps a mongo.SessionContext to be lungo compatible.

type Opcode

type Opcode int

Opcode defines the type of an operation.

const (
	Insert Opcode = iota
	Replace
	Update
	Delete
)

The available opcodes.

func (Opcode) String

func (c Opcode) String() string

String returns the opcode name.
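The opcode declaration follows the usual Go pattern of an iota-based enumeration with a String method. The sketch below reproduces the pattern with hypothetical lowercase names. The exact strings returned by lungo's Opcode.String are an assumption.

```go
package main

import "fmt"

// opcode is a hypothetical copy of the documented enumeration.
type opcode int

const (
	insertOp opcode = iota // 0
	replaceOp              // 1
	updateOp               // 2
	deleteOp               // 3
)

// String returns a lowercase name (assumed values) so that the type
// satisfies fmt.Stringer.
func (c opcode) String() string {
	switch c {
	case insertOp:
		return "insert"
	case replaceOp:
		return "replace"
	case updateOp:
		return "update"
	case deleteOp:
		return "delete"
	default:
		return "unknown"
	}
}

func main() {
	// fmt uses the String method when printing the values.
	fmt.Println(insertOp, deleteOp, opcode(42))
}
```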

type Operation

type Operation struct {
	// The opcode.
	Opcode Opcode

	// The filter document (replace, update, delete).
	Filter bsonkit.Doc

	// The insert, update or replacement document.
	Document bsonkit.Doc

	// The sorting to apply (replace, update, delete).
	Sort bsonkit.Doc

	// Whether an upsert should be performed (replace, update).
	Upsert bool

	// The documents to skip (update, delete).
	Skip int

	// The limit (update, delete).
	Limit int

	// The array filter conditions (update).
	ArrayFilters bsonkit.List
}

Operation defines a single operation.

type Options

type Options struct {
	// The store used by the engine to load and store the catalog.
	Store Store

	// The interval at which expired documents are removed.
	//
	// Default: 60s.
	ExpireInterval time.Duration

	// The function that is called with errors from the expiry goroutine.
	ExpireErrors func(error)

	// The minimum and maximum size of the oplog.
	//
	// Default: 100, 1000.
	MinOplogSize int
	MaxOplogSize int

	// The minimum and maximum age of oplog entries.
	//
	// Default: 5m, 1h.
	MinOplogAge time.Duration
	MaxOplogAge time.Duration
}

Options is used to configure an engine.
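The documented defaults can be summarised as a small defaulting function. This is only a sketch: engineOptions and withDefaults are hypothetical names, and treating a zero value as "use the default" is an assumption about how the engine interprets unset fields.

```go
package main

import (
	"fmt"
	"time"
)

// engineOptions is a hypothetical copy of the numeric option fields.
type engineOptions struct {
	ExpireInterval time.Duration
	MinOplogSize   int
	MaxOplogSize   int
	MinOplogAge    time.Duration
	MaxOplogAge    time.Duration
}

// withDefaults fills zero values with the defaults stated in the
// field comments (60s, 100/1000 entries, 5m/1h).
func withDefaults(o engineOptions) engineOptions {
	if o.ExpireInterval == 0 {
		o.ExpireInterval = 60 * time.Second
	}
	if o.MinOplogSize == 0 {
		o.MinOplogSize = 100
	}
	if o.MaxOplogSize == 0 {
		o.MaxOplogSize = 1000
	}
	if o.MinOplogAge == 0 {
		o.MinOplogAge = 5 * time.Minute
	}
	if o.MaxOplogAge == 0 {
		o.MaxOplogAge = time.Hour
	}
	return o
}

func main() {
	fmt.Printf("%+v\n", withDefaults(engineOptions{MinOplogSize: 10}))
}
```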

type Result

type Result struct {
	// The list of matched documents.
	Matched bsonkit.List

	// The list of inserted, replaced or updated documents.
	Modified bsonkit.List

	// The upserted document.
	Upserted bsonkit.Doc

	// The error that occurred during the operation.
	Error error
}

Result describes the outcome of an operation.

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session provides a mongo compatible way to handle transactions.

func (*Session) AbortTransaction

func (s *Session) AbortTransaction(context.Context) error

AbortTransaction implements the ISession.AbortTransaction method.

func (*Session) AdvanceClusterTime

func (s *Session) AdvanceClusterTime(bson.Raw) error

AdvanceClusterTime implements the ISession.AdvanceClusterTime method.

func (*Session) AdvanceOperationTime

func (s *Session) AdvanceOperationTime(*primitive.Timestamp) error

AdvanceOperationTime implements the ISession.AdvanceOperationTime method.

func (*Session) Client

func (s *Session) Client() IClient

Client implements the ISession.Client method.

func (*Session) ClusterTime

func (s *Session) ClusterTime() bson.Raw

ClusterTime implements the ISession.ClusterTime method.

func (*Session) CommitTransaction

func (s *Session) CommitTransaction(context.Context) error

CommitTransaction implements the ISession.CommitTransaction method.

func (*Session) EndSession

func (s *Session) EndSession(context.Context)

EndSession implements the ISession.EndSession method.

func (*Session) ID

func (s *Session) ID() bson.Raw

ID implements the ISession.ID method.

func (*Session) OperationTime

func (s *Session) OperationTime() *primitive.Timestamp

OperationTime implements the ISession.OperationTime method.

func (*Session) StartTransaction

func (s *Session) StartTransaction(opts ...*options.TransactionOptions) error

StartTransaction implements the ISession.StartTransaction method.

func (*Session) Transaction

func (s *Session) Transaction() *Transaction

Transaction will return the active transaction or nil if no transaction has been started.

func (*Session) WithTransaction

func (s *Session) WithTransaction(ctx context.Context, fn func(ISessionContext) (interface{}, error), opts ...*options.TransactionOptions) (interface{}, error)

WithTransaction implements the ISession.WithTransaction method.

type SessionContext

type SessionContext struct {
	context.Context
	*Session
}

SessionContext provides a mongo compatible session context.

type SingleResult

type SingleResult struct {
	// contains filtered or unexported fields
}

SingleResult wraps a result to be mongo compatible.

func (*SingleResult) Decode

func (r *SingleResult) Decode(out interface{}) error

Decode implements the ISingleResult.Decode method.

func (*SingleResult) DecodeBytes

func (r *SingleResult) DecodeBytes() (bson.Raw, error)

DecodeBytes implements the ISingleResult.DecodeBytes method.

func (*SingleResult) Err

func (r *SingleResult) Err() error

Err implements the ISingleResult.Err method.

type Store

type Store interface {
	Load() (*Catalog, error)
	Store(*Catalog) error
}

Store is the interface that describes storage adapters.
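Since a storage adapter only needs Load and Store, adapters can also be layered. The following sketch wraps one store with another that counts writes. All names (catalog, store, memStore, countingStore) are hypothetical stand-ins, and the catalog is reduced to a single field.

```go
package main

import "fmt"

// catalog is a hypothetical, heavily simplified catalog.
type catalog struct{ Version int }

// store mirrors the two-method adapter interface.
type store interface {
	Load() (*catalog, error)
	Store(*catalog) error
}

// memStore keeps the catalog in memory and starts out empty.
type memStore struct{ current *catalog }

func (m *memStore) Load() (*catalog, error) {
	if m.current == nil {
		return &catalog{}, nil
	}
	return m.current, nil
}

func (m *memStore) Store(c *catalog) error {
	m.current = c
	return nil
}

// countingStore decorates another store and counts writes. Load is
// promoted from the embedded interface unchanged.
type countingStore struct {
	store
	writes int
}

// Store records the write and delegates to the wrapped store.
func (s *countingStore) Store(c *catalog) error {
	s.writes++
	return s.store.Store(c)
}

func main() {
	s := &countingStore{store: &memStore{}}
	if err := s.Store(&catalog{Version: 2}); err != nil {
		panic(err)
	}
	c, _ := s.Load()
	fmt.Println(c.Version, s.writes)
}
```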

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream provides a mongo compatible way to read oplog events.

func (*Stream) Close

func (s *Stream) Close(context.Context) error

Close implements the IChangeStream.Close method.

func (*Stream) Decode

func (s *Stream) Decode(out interface{}) error

Decode implements the IChangeStream.Decode method.

func (*Stream) Err

func (s *Stream) Err() error

Err implements the IChangeStream.Err method.

func (*Stream) ID

func (s *Stream) ID() int64

ID implements the IChangeStream.ID method.

func (*Stream) Next

func (s *Stream) Next(ctx context.Context) bool

Next implements the IChangeStream.Next method.

func (*Stream) ResumeToken

func (s *Stream) ResumeToken() bson.Raw

ResumeToken implements the IChangeStream.ResumeToken method.

func (*Stream) TryNext

func (s *Stream) TryNext(ctx context.Context) bool

TryNext implements the IChangeStream.TryNext method.

type Transaction

type Transaction struct {
	// contains filtered or unexported fields
}

Transaction buffers multiple changes to a catalog.

func NewTransaction

func NewTransaction(catalog *Catalog) *Transaction

NewTransaction creates and returns a new transaction.

func (*Transaction) Bulk

func (t *Transaction) Bulk(handle Handle, ops []Operation, ordered bool) ([]Result, error)

Bulk performs the specified operations in one go. If ordered is true the process is aborted on the first error.
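The difference between ordered and unordered execution can be shown in a few lines. This is a simplified model with hypothetical names (op, runBulk, okOp, failOp). The real method works on Operation values and returns Result entries.

```go
package main

import (
	"errors"
	"fmt"
)

// op is a hypothetical operation that either succeeds or fails.
type op func() error

func okOp() error   { return nil }
func failOp() error { return errors.New("operation failed") }

// runBulk executes the operations in sequence and returns one error slot
// per executed operation. If ordered is true it stops after the first
// failure; otherwise it continues with the remaining operations.
func runBulk(ops []op, ordered bool) []error {
	results := make([]error, 0, len(ops))
	for _, o := range ops {
		err := o()
		results = append(results, err)
		if err != nil && ordered {
			break
		}
	}
	return results
}

func main() {
	ops := []op{okOp, failOp, okOp}
	fmt.Println(len(runBulk(ops, true)), len(runBulk(ops, false)))
}
```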

func (*Transaction) Catalog

func (t *Transaction) Catalog() *Catalog

Catalog will return the catalog as modified by the transaction.

func (*Transaction) Clean

func (t *Transaction) Clean(minSize, maxSize int, minAge, maxAge time.Duration)

Clean will clean the oplog, keeping only up to the specified number of events and deleting events that are older than the specified age.

func (*Transaction) CountDocuments

func (t *Transaction) CountDocuments(handle Handle) (int, error)

CountDocuments will return the number of documents in the specified namespace.

func (*Transaction) Create

func (t *Transaction) Create(handle Handle) error

Create will ensure that a namespace for the provided handle exists.

func (*Transaction) CreateIndex

func (t *Transaction) CreateIndex(handle Handle, name string, config mongokit.IndexConfig) (string, error)

CreateIndex will create the specified index in the specified namespace. It is a no-op if an index with the same name and configuration already exists.

func (*Transaction) Delete

func (t *Transaction) Delete(handle Handle, query, sort bsonkit.Doc, skip, limit int) (*Result, error)

Delete will remove all matching documents from the namespace. Sort, skip and limit may be supplied to modify the result. The returned result will contain the matched documents.

func (*Transaction) Dirty

func (t *Transaction) Dirty() bool

Dirty will return whether the transaction contains changes.

func (*Transaction) Drop

func (t *Transaction) Drop(handle Handle) error

Drop will remove the namespace with the specified handle from the catalog. If the second part of the handle is empty, it will drop all namespaces matching the first part.
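The database-wide behaviour of Drop for handles with an empty second part can be sketched over a plain map. The handle type and the drop function below are hypothetical, and error handling for missing namespaces is left out.

```go
package main

import "fmt"

// handle is a hypothetical copy of the [database, collection] identifier.
type handle [2]string

// drop removes a single namespace, or every namespace of the database
// when the collection part of the handle is empty.
func drop(namespaces map[handle][]string, h handle) {
	if h[1] != "" {
		delete(namespaces, h)
		return
	}
	for key := range namespaces {
		if key[0] == h[0] {
			delete(namespaces, key) // deleting during range is safe in Go
		}
	}
}

func main() {
	ns := map[handle][]string{
		{"app", "users"}:  nil,
		{"app", "orders"}: nil,
		{"logs", "http"}:  nil,
	}
	drop(ns, handle{"app", ""})
	fmt.Println(len(ns))
}
```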

func (*Transaction) DropIndex

func (t *Transaction) DropIndex(handle Handle, name string) error

DropIndex will drop the specified index in the specified namespace.

func (*Transaction) Expire

func (t *Transaction) Expire() error

Expire will remove documents that are expired due to a TTL index.

func (*Transaction) Find

func (t *Transaction) Find(handle Handle, query, sort bsonkit.Doc, skip, limit int) (*Result, error)

Find will query documents from a namespace. Sort, skip and limit may be supplied to modify the result. The returned results will contain the matched list of documents.
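Sort, skip and limit are typically applied in that order. The sketch below shows the windowing on a slice of integers. The function name window is hypothetical, and treating a limit of zero as "no limit" is an assumption.

```go
package main

import (
	"fmt"
	"sort"
)

// window sorts a copy of items, skips the first skip entries and then
// keeps at most limit entries. A limit of zero means no limit (assumed).
func window(items []int, skip, limit int) []int {
	out := append([]int(nil), items...) // do not modify the input
	sort.Ints(out)

	if skip < 0 {
		skip = 0
	}
	if skip > len(out) {
		skip = len(out)
	}
	out = out[skip:]

	if limit > 0 && limit < len(out) {
		out = out[:limit]
	}
	return out
}

func main() {
	fmt.Println(window([]int{5, 1, 4, 2, 3}, 1, 2))
}
```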

func (*Transaction) Insert

func (t *Transaction) Insert(handle Handle, list bsonkit.List, ordered bool) (*Result, error)

Insert will insert the specified documents into the namespace. The engine will automatically generate an object id per document if it is missing. If ordered is enabled the operation is aborted on the first error and the result returned. Otherwise, the engine will try to insert all documents. The returned results will contain the inserted documents and potential errors.

func (*Transaction) ListCollections

func (t *Transaction) ListCollections(handle Handle, query bsonkit.Doc) (bsonkit.List, error)

ListCollections will return a list of all collections in the specified db.

func (*Transaction) ListDatabases

func (t *Transaction) ListDatabases(query bsonkit.Doc) (bsonkit.List, error)

ListDatabases will return a list of all databases in the catalog.

func (*Transaction) ListIndexes

func (t *Transaction) ListIndexes(handle Handle) (bsonkit.List, error)

ListIndexes will return a list of indexes in the specified namespace.

func (*Transaction) Replace

func (t *Transaction) Replace(handle Handle, query, sort, repl bsonkit.Doc, upsert bool) (*Result, error)

Replace will replace the first matching document with the specified replacement document. If upsert is enabled, it will insert the replacement document if it is missing. The returned result will contain the matched and modified or upserted document.

func (*Transaction) Update

func (t *Transaction) Update(handle Handle, query, sort, update bsonkit.Doc, skip, limit int, upsert bool, arrayFilters bsonkit.List) (*Result, error)

Update will apply the update to all matching documents. Sort, skip and limit may be supplied to modify the result. If upsert is enabled, it will extract constant parts of the query, apply the update and insert the document if it is missing. The returned result will contain the matched and modified or upserted document.

type UploadStream

type UploadStream struct {
	// contains filtered or unexported fields
}

UploadStream is used to upload a single file.

func (*UploadStream) Abort

func (s *UploadStream) Abort() error

Abort will abort the upload and remove uploaded chunks. If the bucket is tracked it will also remove the potentially created marker. If the abort fails the upload may get cleaned up.

func (*UploadStream) Close

func (s *UploadStream) Close() error

Close will finish the upload and close the stream. If the bucket is tracked the method will not finalize the upload by creating a file. Instead the user should call ClaimUpload as part of a multi-document transaction to safely claim the upload. Until that happens the upload may be cleaned up.

func (*UploadStream) Resume

func (s *UploadStream) Resume() (int64, error)

Resume will try to resume a previous tracked upload that has been suspended. It will return the number of bytes that have already been written.

func (*UploadStream) Suspend

func (s *UploadStream) Suspend() (int64, error)

Suspend will upload fully buffered chunks and close the stream. The stream may be reopened and resumed later to finish the upload. Until that happens the upload may be cleaned up.

func (*UploadStream) Write

func (s *UploadStream) Write(data []uint8) (int, error)

Write will write the provided data to chunks in the background. If the bucket is tracked and an upload already exists, it must be resumed before writing more data.

Directories

Path Synopsis