Documentation ¶
Overview ¶
Package orm provides a simple "Object-Relational Mapping" style library for MongoDB.
Operator ¶
The main entry point to the package is an 'Operator' instance. An operator provides a handler for a MongoDB database and manages the required client connection to it.
    // MongoDB client settings
    conf := options.Client()
    conf.ApplyURI("mongodb://localhost:27017/?tls=false")
    conf.SetDirect(true)
    conf.SetMinPoolSize(2)
    conf.SetReadPreference(readpref.Primary())
    conf.SetAppName("super-cool-app")
    conf.SetReplicaSet("rs1")

    // Get a new operator instance
    db, err := NewOperator("testing", conf)
    if err != nil {
        panic(err)
    }

    // Use the operator instance. For example to check if the
    // MongoDB server is reachable
    if err = db.Ping(); err != nil {
        panic(err)
    }

    // Close the connection to the database when no longer needed
    if err = db.Close(context.Background()); err != nil {
        panic(err)
    }
Model ¶
The most common use of an operator instance is to create Models. A model instance serves as a "wrapper" to a MongoDB collection and provides an easy-to-use API on top of it to greatly simplify common operations. The most common usage of a model instance is to provide CRUD operations without the need to use intermediary data structures.
The following encoding/decoding rules are used when storing or retrieving data with a model.
1. Only exported fields on structs are included
2. Default BSON encoder rules are applied
3. If available, `bson` tags are honored
4. If available, `json` tags are honored
https://pkg.go.dev/go.mongodb.org/mongo-driver/bson
This makes it easy to store complex types without modifying the code to include `bson` tags, for example, when using Protobuf messages.
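The tag rules listed above can be pictured with a small standard-library sketch. This is not the package's encoder: `fieldNames`, `annotated` and `generated` are hypothetical names used only to show the lookup order implied by the list, namely skip unexported fields, prefer a `bson` tag, fall back to a `json` tag, and otherwise use the lowercased field name (the default of the BSON encoder).

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// annotated carries explicit `bson` tags.
type annotated struct {
	Title  string `bson:"title"`
	Author string `bson:"author_name,omitempty"`
	secret string // unexported, never encoded
}

// generated only carries `json` tags, as generated code often does.
type generated struct {
	UserType string `json:"user_type,omitempty"`
	Pages    int
}

// fieldNames is a hypothetical helper (not part of the package). It returns
// the document keys a struct would map to under the rules listed above.
func fieldNames(v interface{}) []string {
	t := reflect.TypeOf(v)
	if t != nil && t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	if t == nil || t.Kind() != reflect.Struct {
		return nil
	}
	var names []string
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		if f.PkgPath != "" { // rule 1: unexported fields are skipped
			continue
		}
		name := strings.ToLower(f.Name) // rule 2: encoder default
		if tag, ok := f.Tag.Lookup("bson"); ok {
			name = tagName(tag, name) // rule 3: `bson` tag wins
		} else if tag, ok := f.Tag.Lookup("json"); ok {
			name = tagName(tag, name) // rule 4: `json` tag as fallback
		}
		if name == "-" { // a "-" tag excludes the field
			continue
		}
		names = append(names, name)
	}
	return names
}

// tagName extracts the key from a tag value such as "name,omitempty".
func tagName(tag, fallback string) string {
	name := strings.Split(tag, ",")[0]
	if name == "" {
		return fallback
	}
	return name
}

func main() {
	fmt.Println(fieldNames(annotated{}))
	fmt.Println(fieldNames(&generated{}))
}
```

The real encoder handles many more cases (embedded structs, inline fields, custom codecs); the sketch only covers how a key name is picked.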
Transactions ¶
In MongoDB, an operation on a single document is atomic. Because you can use embedded documents and arrays to capture relationships between data in a single document structure instead of normalizing across multiple documents and collections, this single-document atomicity obviates the need for multi-document transactions for many practical use cases.
For situations that require atomicity of reads and writes to multiple documents (in a single or multiple collections), MongoDB supports multi-document transactions.
Using this package, a transaction is executed with the 'Tx' method on an operator instance. The method takes a 'TransactionBody' function that atomically binds all operations performed inside of it, and returns the final result of committing or aborting the transaction. The active transaction must be set on every model used inside the transaction body.
    c1 := db.Model("shelf")
    c2 := db.Model("protos")

    // Complex multi-collection operation
    complexOperation := func(tx *Transaction) error {
        // Set the active transaction on all models used
        if err := c1.WithTransaction(tx); err != nil {
            return tx.Abort()
        }
        if err := c2.WithTransaction(tx); err != nil {
            return tx.Abort()
        }

        // Run tasks
        if _, err := c1.Insert(annotatedStruct()); err != nil {
            return tx.Abort()
        }
        if _, err := c2.Insert(notAnnotatedStruct()); err != nil {
            return tx.Abort()
        }

        // Commit transaction and return final result
        return tx.Commit()
    }

    // Execute transaction
    if err := db.Tx(complexOperation, options.Transaction()); err != nil {
        panic(err)
    }
More information: https://docs.mongodb.com/manual/core/transactions/
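To make the commit/abort semantics concrete, here is a minimal in-memory sketch. It does not use MongoDB or this package: `memStore` and `memTx` are hypothetical stand-ins that stage writes and only apply them on commit, which is the all-or-nothing behavior a transaction body relies on. Like the 'Abort' and 'Commit' methods described later, the stand-in methods return an error once the transaction is no longer active.

```go
package main

import (
	"errors"
	"fmt"
)

var errInactive = errors.New("transaction is no longer active")

// memStore is a hypothetical in-memory stand-in for a database.
type memStore struct{ docs map[string]string }

// memTx stages writes and applies them to the store only on commit.
type memTx struct {
	store  *memStore
	staged map[string]string
	closed bool
}

func (s *memStore) begin() *memTx {
	return &memTx{store: s, staged: map[string]string{}}
}

// insert records a write without making it visible in the store yet.
func (t *memTx) insert(id, doc string) error {
	if t.closed {
		return errInactive
	}
	t.staged[id] = doc
	return nil
}

// commit applies every staged write at once.
func (t *memTx) commit() error {
	if t.closed {
		return errInactive
	}
	for id, doc := range t.staged {
		t.store.docs[id] = doc
	}
	t.closed = true
	return nil
}

// abort discards every staged write.
func (t *memTx) abort() error {
	if t.closed {
		return errInactive
	}
	t.staged = nil
	t.closed = true
	return nil
}

func main() {
	s := &memStore{docs: map[string]string{}}

	ok := s.begin()
	_ = ok.insert("a", "first")
	_ = ok.insert("b", "second")
	fmt.Println(ok.commit(), len(s.docs))

	bad := s.begin()
	_ = bad.insert("c", "third")
	fmt.Println(bad.abort(), len(s.docs))
}
```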
Streams ¶
Change streams allow applications to access real-time data changes without the complexity and risk of tailing the oplog. Applications can use change streams to subscribe to relevant data changes and immediately react to them.
Change streams are provided at the collection level by using the 'Subscribe' method on a model instance.
    shelf := db.Model("shelf")

    // Open a stream to detect all operations performed on the
    // 'shelf' collection
    sub, err := shelf.Subscribe(PipelineCollection(), options.ChangeStream())
    if err != nil {
        panic(err)
    }

    // Handle subscription events
    go func() {
        for {
            select {
            case <-sub.Done():
                return
            case e := <-sub.Event():
                fmt.Printf("event: %+v\n", e)
            }
        }
    }()

    // When no longer needed, close the subscription
    rt, err := sub.Close()
    if err != nil {
        panic(err)
    }
    fmt.Printf("you can resume the subscription with: %v\n", rt)
More information: https://docs.mongodb.com/manual/changeStreams/
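The consumption pattern used with a subscription (select on a done channel and an event channel) can be tried without a database. The sketch below uses a hypothetical `fakeStream` type with plain string events; only the shape of the `Done` and `Event` accessors is borrowed from the 'Stream' type documented on this page.

```go
package main

import "fmt"

// fakeStream is a hypothetical stand-in exposing the same two channels
// a subscription offers: one for events, one to signal termination.
type fakeStream struct {
	events chan string
	done   chan struct{}
}

func (s *fakeStream) Event() <-chan string  { return s.events }
func (s *fakeStream) Done() <-chan struct{} { return s.done }

// emit delivers the items in order and then signals termination.
// The events channel is unbuffered, so done is closed only after
// every item has been received by the consumer.
func (s *fakeStream) emit(items ...string) {
	for _, it := range items {
		s.events <- it
	}
	close(s.done)
}

// collect handles events until the stream reports it is done.
func collect(s *fakeStream) []string {
	var seen []string
	for {
		select {
		case <-s.Done():
			return seen
		case e := <-s.Event():
			seen = append(seen, e)
		}
	}
}

func main() {
	s := &fakeStream{events: make(chan string), done: make(chan struct{})}
	go s.emit("insert", "update", "delete")
	fmt.Println(collect(s))
}
```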
Index ¶
- func Filter() map[string]interface{}
- func ParseID(id string) (primitive.ObjectID, error)
- func PipelineCollection() mongo.Pipeline
- func PipelineUpdateDocument(oid primitive.ObjectID) mongo.Pipeline
- type ChangeEvent
- type Model
- func (m *Model) Batch(item interface{}, opts ...*options.InsertManyOptions) (int64, error)
- func (m *Model) Count(filter map[string]interface{}) (int64, error)
- func (m *Model) Delete(filter map[string]interface{}) error
- func (m *Model) DeleteAll(filter map[string]interface{}) (int64, error)
- func (m *Model) Distinct(field string, filter map[string]interface{}, result interface{}) error
- func (m *Model) Estimate() (int64, error)
- func (m *Model) Find(filter map[string]interface{}, result interface{}, ...) error
- func (m *Model) FindByID(id string, result interface{}) error
- func (m *Model) First(filter map[string]interface{}, result interface{}, ...) error
- func (m *Model) Insert(item interface{}) (string, error)
- func (m *Model) Subscribe(pipeline mongo.Pipeline, opts *options.ChangeStreamOptions) (*Stream, error)
- func (m *Model) Update(filter map[string]interface{}, patch interface{}, upsert bool) error
- func (m *Model) UpdateAll(filter map[string]interface{}, patch interface{}) (int64, error)
- func (m *Model) WithTransaction(tx *Transaction) error
- type Operator
- type Stream
- type Transaction
- type TransactionBody
Functions ¶
func Filter ¶
func Filter() map[string]interface{}
Filter provides a simple shortcut for a commonly used filter value when no fields are specified.
func PipelineCollection ¶
func PipelineCollection() mongo.Pipeline
PipelineCollection is a helper function to set up a pipeline that receives all change events for a specific MongoDB collection.
Types ¶
type ChangeEvent ¶
    type ChangeEvent struct {
        // Metadata related to the operation. Acts as the resumeToken for
        // the resumeAfter parameter when resuming a change stream.
        ID bson.D `bson:"_id"`

        // The type of operation that occurred. Can be any of the following values:
        // insert, delete, replace, update, drop, rename, dropDatabase, invalidate
        Operation string `bson:"operationType"`

        // The namespace (database and or collection) affected by the event.
        Namespace ns `bson:"ns"`

        // The timestamp from the oplog entry associated with the event.
        ClusterTime primitive.Timestamp `bson:"clusterTime"`

        // A document that contains the _id of the document created or modified
        // by the insert, replace, delete, update operations (i.e. CRUD operations).
        // For sharded collections, also displays the full shard key for the
        // document.
        DocumentKey bson.D `bson:"documentKey"`

        // The document created or modified by the insert, replace, delete,
        // update operations (i.e. CRUD operations). For delete operations, this
        // field is omitted as the document no longer exists.
        //
        // For update operations, this field only appears if you configured the
        // change stream with fullDocument set to updateLookup.
        FullDocument bson.M `bson:"fullDocument"`

        // A document describing the fields that were updated or removed by the
        // update operation.
        UpdateDescription bson.M `bson:"updateDescription,omitempty"`
    }
ChangeEvent provides a notification for an operation that modified state on a MongoDB instance. https://docs.mongodb.com/manual/reference/change-events/
func (*ChangeEvent) Decode ¶
func (ce *ChangeEvent) Decode(target interface{}) error
Decode will load the 'fullDocument' contents of the event instance into the provided 'target' element.
type Model ¶
    type Model struct {
        // MongoDB's collection backing the model.
        Collection *mongo.Collection
        // contains filtered or unexported fields
    }
Model instances serve as a "wrapper" to a MongoDB collection and provide an easy-to-use API on top of it to greatly simplify common tasks.
func (*Model) Batch ¶
func (m *Model) Batch(item interface{}, opts ...*options.InsertManyOptions) (int64, error)
Batch executes an insert command to save multiple documents into the collection and returns the number of successfully stored items. The provided item must be a slice.
Example ¶
    shelf := db.Model("shelf")

    // Get a list (slice) of items to store on the database.
    var list []*book

    // Batch will store all the items in a single operation. The
    // input provided must be a slice.
    n, err := shelf.Batch(list)
    if err != nil {
        panic(err)
    }
    fmt.Printf("documents saved: %d", n)
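Since 'Batch' accepts an `interface{}` but requires a slice, the input has to be checked at run time. How the package does this is not shown here; the sketch below is one possible approach using `reflect`. `toDocuments` is a hypothetical helper that converts any slice into the `[]interface{}` form expected by the driver's `InsertMany`.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// toDocuments is a hypothetical helper (not part of the package). It rejects
// anything that is not a slice and otherwise returns the elements as a
// []interface{} value.
func toDocuments(item interface{}) ([]interface{}, error) {
	v := reflect.ValueOf(item)
	if v.Kind() != reflect.Slice {
		return nil, errors.New("item must be a slice")
	}
	docs := make([]interface{}, v.Len())
	for i := range docs {
		docs[i] = v.Index(i).Interface()
	}
	return docs, nil
}

func main() {
	docs, err := toDocuments([]string{"a", "b", "c"})
	fmt.Println(len(docs), err)

	_, err = toDocuments("not a slice")
	fmt.Println(err)
}
```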
func (*Model) Count ¶
func (m *Model) Count(filter map[string]interface{}) (int64, error)
Count returns the number of documents in the collection that satisfy the provided filter. An empty filter counts all the documents in the collection by performing a full scan. This operation trades speed for accuracy. For more information: https://docs.mongodb.com/manual/reference/method/db.collection.countDocuments/
func (*Model) Delete ¶
func (m *Model) Delete(filter map[string]interface{}) error
Delete will look for the first document that satisfies the provided 'filter' value and permanently remove it. The operation does not fail if no document satisfies the filter value.
func (*Model) DeleteAll ¶
func (m *Model) DeleteAll(filter map[string]interface{}) (int64, error)
DeleteAll will remove every document that satisfies the provided 'filter' value and return the number of documents deleted by the operation.
Example ¶
    mod := db.Model("authorities")

    // Filter is based on the encoded records as they appear on
    // the database.
    filter := Filter()
    filter["ca_constraint.is_ca"] = false

    // DeleteAll will remove all documents satisfying the filter.
    total, err := mod.DeleteAll(filter)
    if err != nil {
        panic(err)
    }
    fmt.Printf("documents deleted: %d", total)
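The filter key `ca_constraint.is_ca` in the example uses MongoDB dot notation to reach a field inside an embedded document. The following sketch mimics that lookup on plain Go maps. `lookup` and `matches` are hypothetical helpers, and the matching is deliberately simplified (exact equality only, no arrays or query operators), so it illustrates the idea rather than the server's full semantics.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// lookup resolves a dotted path such as "a.b.c" inside nested maps.
func lookup(doc map[string]interface{}, path string) (interface{}, bool) {
	var cur interface{} = doc
	for _, key := range strings.Split(path, ".") {
		m, ok := cur.(map[string]interface{})
		if !ok {
			return nil, false
		}
		cur, ok = m[key]
		if !ok {
			return nil, false
		}
	}
	return cur, true
}

// matches reports whether every filter entry equals the value found at its
// path. An empty filter matches every document.
func matches(doc, filter map[string]interface{}) bool {
	for path, want := range filter {
		got, ok := lookup(doc, path)
		if !ok || !reflect.DeepEqual(got, want) {
			return false
		}
	}
	return true
}

func main() {
	doc := map[string]interface{}{
		"name":          "root",
		"ca_constraint": map[string]interface{}{"is_ca": false},
	}
	filter := map[string]interface{}{"ca_constraint.is_ca": false}
	fmt.Println(matches(doc, filter))
	fmt.Println(matches(doc, map[string]interface{}{}))
}
```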
func (*Model) Distinct ¶
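func (m *Model) Distinct(field string, filter map[string]interface{}, result interface{}) error
Distinct finds the unique values for a specified field in the collection. If no 'filter' is specified, a full scan of the collection is performed. See the MongoDB documentation for the 'distinct' command.
    var list []string
    err := mod.Distinct("user_type", Filter(), &list)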
func (*Model) Estimate ¶
func (m *Model) Estimate() (int64, error)
Estimate executes a count command and returns an estimate of the number of documents in the collection using available metadata. This operation trades accuracy for speed. For more information: https://docs.mongodb.com/manual/reference/method/db.collection.estimatedDocumentCount/
func (*Model) Find ¶
func (m *Model) Find(filter map[string]interface{}, result interface{}, opts ...*options.FindOptions) error
Find all documents in the collection that satisfy the provided 'filter'. The returned documents will be automatically decoded into 'result', which must be a pointer to a slice.
func (*Model) FindByID ¶
func (m *Model) FindByID(id string, result interface{}) error
FindByID looks for a given document based on its '_id' field. The provided 'id' value must be a MongoDB ObjectID hex string. The returned document is automatically decoded into 'result', which must be a pointer to a struct.
Example ¶
    shelf := db.Model("shelf")

    // ID must be a valid hex-encoded MongoDB ObjectID
    id := "...hex id string..."

    // The result will be automatically decoded to
    // this instance
    record := book{}

    // Get the result from the collection using the model.
    // Notice the result holder is passed by reference
    // (i.e., is a pointer).
    err := shelf.FindByID(id, &record)
    if err != nil {
        panic(err)
    }
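A MongoDB ObjectID is 12 bytes long, so its hex form is always 24 hexadecimal characters. The sketch below shows a standard-library pre-check along those lines. `checkObjectIDHex` is a hypothetical helper; within this package, the 'ParseID' function listed in the index is presumably the intended way to parse such strings.

```go
package main

import (
	"encoding/hex"
	"errors"
	"fmt"
)

// checkObjectIDHex is a hypothetical helper (not part of the package). It
// verifies that 'id' is valid hex and decodes to exactly 12 bytes.
func checkObjectIDHex(id string) error {
	raw, err := hex.DecodeString(id)
	if err != nil {
		return fmt.Errorf("not a hex string: %w", err)
	}
	if len(raw) != 12 {
		return errors.New("an ObjectID must be 12 bytes (24 hex characters)")
	}
	return nil
}

func main() {
	fmt.Println(checkObjectIDHex("5f1d7f3e2b4c9a0012345678"))
	fmt.Println(checkObjectIDHex("abcd"))
	fmt.Println(checkObjectIDHex("not-hex"))
}
```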
func (*Model) First ¶
func (m *Model) First(filter map[string]interface{}, result interface{}, opts ...*options.FindOneOptions) error
First looks for the first document in the collection that satisfies the specified 'filter'. The returned document is automatically decoded into 'result', which must be a pointer to a struct.
func (*Model) Insert ¶
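func (m *Model) Insert(item interface{}) (string, error)
Insert stores the item in the model's underlying collection and returns the ID of the stored document.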
Example ¶
    // Create a model for the 'shelf' collection. The model
    // will encode/decode objects using the `bson` annotations
    // on the structs.
    shelf := db.Model("shelf")

    // Create an object to store on the database
    sample := &book{
        Title:  "Sample book",
        Author: "John Dow",
        Pages:  137,
    }

    // Store the object
    id, err := shelf.Insert(sample)
    if err != nil {
        panic(err)
    }
    fmt.Printf("book stored with id: %s", id)
func (*Model) Subscribe ¶
func (m *Model) Subscribe(pipeline mongo.Pipeline, opts *options.ChangeStreamOptions) (*Stream, error)
Subscribe will set up and return a stream instance that can be used to receive change events based on the parameters provided.
Example ¶
    shelf := db.Model("shelf")

    // Open a stream to detect all operations performed on the
    // 'shelf' collection
    sub, err := shelf.Subscribe(PipelineCollection(), options.ChangeStream())
    if err != nil {
        panic(err)
    }

    // Handle subscription events
    go func() {
        for {
            select {
            case <-sub.Done():
                return
            case e := <-sub.Event():
                fmt.Printf("event: %+v\n", e)
            }
        }
    }()

    // When no longer needed, close the subscription
    rt, err := sub.Close()
    if err != nil {
        panic(err)
    }
    fmt.Printf("you can resume the subscription with: %v\n", rt)
func (*Model) Update ¶
func (m *Model) Update(filter map[string]interface{}, patch interface{}, upsert bool) error
Update will look for the first document that satisfies the 'filter' value and apply the 'patch' to it. If no such document exists and 'upsert' is set to true, a new document is created automatically.
func (*Model) UpdateAll ¶
func (m *Model) UpdateAll(filter map[string]interface{}, patch interface{}) (int64, error)
UpdateAll will try to apply the provided 'patch' to all the documents satisfying the specified 'filter' and return the number of documents modified by the operation.
Example ¶
    shelf := db.Model("shelf")

    // Filter and patch are based on the encoded records as
    // they appear on the database.
    filter := map[string]interface{}{
        "author_name": "John Dow",
    }
    patch := map[string]interface{}{
        "author_name": "Jane Dow",
    }

    // UpdateAll will apply the patch to all documents
    // satisfying the filter.
    total, err := shelf.UpdateAll(filter, patch)
    if err != nil {
        panic(err)
    }
    fmt.Printf("documents updated: %d", total)
func (*Model) WithTransaction ¶
func (m *Model) WithTransaction(tx *Transaction) error
WithTransaction sets the active transaction for the model. All CRUD operations are bound to the active transaction when executed. The active transaction will be removed automatically once it has been aborted or committed. This method will return an error if another transaction is currently active.
type Operator ¶
type Operator struct {
// contains filtered or unexported fields
}
Operator instances receive MongoDB client options and manage an underlying network connection. An operator can be used to instantiate any number of models.
func NewOperator ¶
func NewOperator(db string, opts *options.ClientOptions) (*Operator, error)
NewOperator returns a new ORM operator instance for the specified MongoDB database. The operator will create and manage the required client connection(s) based on the provided configuration options. Remember to call the 'Close' method to free resources when the operator is no longer needed.
Example ¶
    // MongoDB client settings
    conf := options.Client()
    conf.ApplyURI("mongodb://localhost:27017/?tls=false")
    conf.SetDirect(true)
    conf.SetMinPoolSize(2)
    conf.SetReadPreference(readpref.Primary())
    conf.SetAppName("super-cool-app")
    conf.SetReplicaSet("rs1")

    // Get a new operator instance
    db, err := NewOperator("testing", conf)
    if err != nil {
        panic(err)
    }

    // Use the operator instance to create models. And the models
    // to manage data on the database.
    // ..

    // Close the connection to the database when no longer needed
    err = db.Close(context.Background())
    if err != nil {
        panic(err)
    }
func (*Operator) Model ¶
Model returns a new model instance. The 'name' provided will also be used as the model's underlying MongoDB collection. The encoding rules for items stored and retrieved using the model are:
1. Only exported fields on structs are included
2. Default BSON encoder rules
3. Use `bson` tags, if available
4. Use `json` tags, if available
5. Marshal `nil` maps as empty BSON documents
6. Marshal `nil` slices as empty BSON arrays
https://pkg.go.dev/go.mongodb.org/mongo-driver/bson
func (*Operator) Ping ¶
Ping performs a reachability test to the MongoDB server used by the operator instance. A default timeout of 5 seconds is used.
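A timeout-bounded reachability check like the one described can be written with `context.WithTimeout`. The sketch below is only an illustration of that pattern: `pingWithTimeout` and `fakePing` are hypothetical names, the fake ping simulates server latency with a timer, and the named durations exist purely for the demonstration.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Durations used by the demonstration below.
const (
	quick = 10 * time.Millisecond
	slow  = time.Second
)

// pingWithTimeout is a hypothetical helper. It runs 'ping' with a context
// that is cancelled once 'd' has elapsed.
func pingWithTimeout(d time.Duration, ping func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	return ping(ctx)
}

// fakePing stands in for a server round trip that takes 'latency' to answer
// and gives up as soon as the context is done.
func fakePing(latency time.Duration) func(context.Context) error {
	return func(ctx context.Context) error {
		select {
		case <-time.After(latency):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	// Server answers well within the 5 second budget.
	fmt.Println(pingWithTimeout(5*time.Second, fakePing(quick)))
	// Server is slower than the allowed budget.
	fmt.Println(pingWithTimeout(quick, fakePing(slow)))
}
```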
func (*Operator) Tx ¶
func (op *Operator) Tx(body TransactionBody, opts *options.TransactionOptions) error
Tx can be used to enable causal consistency for a group of operations or to execute operations in an ACID transaction.
Example ¶
    c1 := db.Model("shelf")
    c2 := db.Model("protos")

    // Set transaction options
    opts := options.Transaction()
    opts.SetReadConcern(readconcern.Snapshot())
    opts.SetWriteConcern(writeconcern.Majority())

    // Complex multi-collection operation
    complexOperation := func(tx *Transaction) error {
        // Set the active transaction on all models used
        if err := c1.WithTransaction(tx); err != nil {
            return tx.Abort()
        }
        if err := c2.WithTransaction(tx); err != nil {
            return tx.Abort()
        }

        // Run tasks
        if _, err := c1.Insert(annotatedStruct()); err != nil {
            return tx.Abort()
        }
        if _, err := c2.Insert(notAnnotatedStruct()); err != nil {
            return tx.Abort()
        }

        // Commit transaction and return final result
        return tx.Commit()
    }

    // Execute complex atomic operation
    if err := db.Tx(complexOperation, opts); err != nil {
        panic(err)
    }
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream provides a simple interface to listen for change events. More information:
https://docs.mongodb.com/manual/changeStreams/
func (*Stream) Close ¶
Close the stream and free all its related resources. The method returns the stream's resume token (when available) and any error produced when closing the stream. The resume token can later be used when opening a new stream using the 'SetResumeAfter' configuration option.
    conf.SetResumeAfter(rt)
func (*Stream) Done ¶
func (s *Stream) Done() <-chan struct{}
Done is used to signal when the stream processing has terminated.
func (*Stream) Event ¶
func (s *Stream) Event() <-chan ChangeEvent
Event channel is used to deliver any changes detected for the stream parameters.
type Transaction ¶
type Transaction struct {
// contains filtered or unexported fields
}
Transaction instances allow several operations to run atomically. If an operation fails, the whole transaction is reverted.
func (*Transaction) Abort ¶
func (tx *Transaction) Abort() error
Abort the transaction and roll back all changes. This method will return an error if the transaction is no longer active, that is, if it has already been committed or aborted.
func (*Transaction) Commit ¶
func (tx *Transaction) Commit() error
Commit the transaction. This method will return an error if the transaction is no longer active or has been aborted.
func (*Transaction) Done ¶
func (tx *Transaction) Done() <-chan struct{}
Done triggers a notification when the transaction is completed, either by abort or commit.
func (*Transaction) ID ¶
func (tx *Transaction) ID() bson.Raw
ID returns the session identifier for the transaction.
type TransactionBody ¶
type TransactionBody func(tx *Transaction) error
TransactionBody serves as a wrapper to group all the operations that make up a specific transaction.