orm

package
v0.0.0-...-1aa08c1
Published: Apr 4, 2024 License: BSD-3-Clause Imports: 14 Imported by: 3

README

Package orm

Simple "Object-Relational Mapping" style library for MongoDB.

Operator

The main entry point to the package is an 'Operator' instance. An operator provides a handler for a MongoDB database and manages the required client connection(s) to it.

// MongoDB client settings
conf := options.Client()
conf.ApplyURI("mongodb://localhost:27017/?tls=false")
conf.SetDirect(true)
conf.SetMinPoolSize(2)
conf.SetReadPreference(readpref.Primary())
conf.SetAppName("super-cool-app")
conf.SetReplicaSet("rs1")

// Get a new operator instance
db, err := NewOperator("testing", conf)
if err != nil {
  panic(err)
}

// Use the operator instance. For example to check if the
// MongoDB server is reachable
if err = db.Ping(); err != nil {
  panic(err)
}

// Close the connection to the database when no longer needed
if err = db.Close(context.Background()); err != nil {
  panic(err)
}

Model

Operator instances are most commonly used to create models. A model instance serves as a "wrapper" around a MongoDB collection and provides an easy-to-use API on top of it that simplifies common operations. Models are typically used for CRUD operations, without the need for intermediary data structures.

The following encoding/decoding rules are used when storing or retrieving data through a model.

  1. Only exported fields on structs are included
  2. Default BSON encoder rules are applied
  3. If available, bson tags are honored
  4. If available, json tags are honored

This makes it easy to store complex types without modifying their code to add bson tags, for example when using Protobuf messages.

More information: https://pkg.go.dev/go.mongodb.org/mongo-driver/bson

Transactions

In MongoDB, an operation on a single document is atomic. Because you can use embedded documents and arrays to capture relationships between data in a single document structure instead of normalizing across multiple documents and collections, this single-document atomicity obviates the need for multi-document transactions for many practical use cases.

For situations that require atomicity of reads and writes to multiple documents (in a single or multiple collections), MongoDB supports multi-document transactions.

With this package, a transaction is executed by calling the Tx method on an operator instance. The method takes a TransactionBody function that atomically binds all operations performed inside of it, and returns the final result of committing or aborting the transaction. The active transaction must be set on every model used inside the transaction body.

c1 := db.Model("shelf")
c2 := db.Model("protos")

// Complex multi-collection operation
complexOperation := func(tx *Transaction) error {
  // Set the active transaction on all models used
  if err := c1.WithTransaction(tx); err != nil {
    return tx.Abort()
  }
  if err := c2.WithTransaction(tx); err != nil {
    return tx.Abort()
  }

  // Run tasks
  if _, err := c1.Insert(annotatedStruct()); err != nil {
    return tx.Abort()
  }
  if _, err := c2.Insert(notAnnotatedStruct()); err != nil {
    return tx.Abort()
  }

  // Commit transaction and return final result
  return tx.Commit()
}

// Execute transaction
if err := db.Tx(complexOperation, options.Transaction()); err != nil {
  panic(err)
}

More information: https://docs.mongodb.com/manual/core/transactions/

Change Streams

Change streams allow applications to access real-time data changes without the complexity and risk of tailing the oplog. Applications can use change streams to subscribe to relevant data changes and immediately react to them.

Change streams are provided at the collection level by using the Subscribe method on a model instance.

shelf := db.Model("shelf")

// Open a stream to detect all operations performed on the
// 'shelf' collection
sub, err := shelf.Subscribe(PipelineCollection(), options.ChangeStream())
if err != nil {
  panic(err)
}

// Handle subscription events
go func() {
  for {
    select {
    case <-sub.Done():
      return
    case e := <-sub.Event():
      fmt.Printf("event: %+v\n", e)
    }
  }
}()

// When no longer needed, close the subscription
rt, err := sub.Close()
if err != nil {
  panic(err)
}
fmt.Printf("you can resume the subscription with: %v\n", rt)

More information: https://docs.mongodb.com/manual/changeStreams/

Documentation

Functions

func Filter

func Filter() map[string]interface{}

Filter provides a simple shortcut for a commonly used filter value when no fields are specified.
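A minimal sketch of how such a filter might be built up, assuming Filter returns an empty, non-nil map. The `emptyFilter` helper is a hypothetical stand-in for it, and the keys are illustrative only.

```go
package main

import "fmt"

// emptyFilter is a hypothetical stand-in for Filter. It returns an
// empty (non-nil) map that is safe to add conditions to.
func emptyFilter() map[string]interface{} {
	return map[string]interface{}{}
}

func main() {
	filter := emptyFilter()

	// Keys refer to fields as they are stored in the database.
	filter["author_name"] = "John Dow"

	// MongoDB dot notation addresses a field in an embedded document.
	filter["ca_constraint.is_ca"] = false

	fmt.Println(len(filter)) // prints "2"
}
```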

func ParseID

func ParseID(id string) (primitive.ObjectID, error)

ParseID returns a MongoDB objectID instance from its hex-encoded representation.

func PipelineCollection

func PipelineCollection() mongo.Pipeline

PipelineCollection is a helper function to set up a pipeline that receives all change events for a specific MongoDB collection.

func PipelineUpdateDocument

func PipelineUpdateDocument(oid primitive.ObjectID) mongo.Pipeline

PipelineUpdateDocument returns a pipeline to receive update notifications for a specific document based on its '_id' field.

Types

type ChangeEvent

type ChangeEvent struct {
	// Metadata related to the operation. Acts as the resumeToken for
	// the resumeAfter parameter when resuming a change stream.
	ID bson.D `bson:"_id"`

	// The type of operation that occurred. Can be any of the following values:
	// insert, delete, replace, update, drop, rename, dropDatabase, invalidate
	Operation string `bson:"operationType"`

	// The namespace (database and/or collection) affected by the event.
	Namespace ns `bson:"ns"`

	// The timestamp from the oplog entry associated with the event.
	ClusterTime primitive.Timestamp `bson:"clusterTime"`

	// A document that contains the _id of the document created or modified
	// by the insert, replace, delete, update operations (i.e. CRUD operations).
	// For sharded collections, also displays the full shard key for the
	// document.
	DocumentKey bson.D `bson:"documentKey"`

	// The document created or modified by the insert, replace, delete,
	// update operations (i.e. CRUD operations). For delete operations, this
	// field is omitted as the document no longer exists.
	//
	// For update operations, this field only appears if you configured the
	// change stream with fullDocument set to updateLookup.
	FullDocument bson.M `bson:"fullDocument"`

	// A document describing the fields that were updated or removed by the
	// update operation.
	UpdateDescription bson.M `bson:"updateDescription,omitempty"`
}

ChangeEvent provides a notification for an operation that modified state on a MongoDB instance. More information: https://docs.mongodb.com/manual/reference/change-events/

func (*ChangeEvent) Decode

func (ce *ChangeEvent) Decode(target interface{}) error

Decode will load the 'fullDocument' contents of the event instance into the provided 'target' element.
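The idea behind Decode can be sketched without the driver: a generic document (bson.M is a map[string]interface{}) is re-encoded and then decoded into a typed value. The version below round-trips through encoding/json purely to stay dependency-free. The actual method presumably relies on the BSON codec instead, and both the `book` type and the `decodeDocument` helper are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// book is a hypothetical target type for this sketch.
type book struct {
	Title string `json:"title"`
	Pages int    `json:"pages"`
}

// decodeDocument copies a generic document into target, which must be
// a pointer, by marshaling the map and unmarshaling the result.
func decodeDocument(doc map[string]interface{}, target interface{}) error {
	raw, err := json.Marshal(doc)
	if err != nil {
		return err
	}
	return json.Unmarshal(raw, target)
}

func main() {
	// Stand-in for the 'fullDocument' field of a change event.
	doc := map[string]interface{}{"title": "Sample book", "pages": 137}

	var b book
	if err := decodeDocument(doc, &b); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", b) // prints "{Title:Sample book Pages:137}"
}
```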

type Model

type Model struct {
	// MongoDB's collection backing the model.
	Collection *mongo.Collection
	// contains filtered or unexported fields
}

Model instances serve as a "wrapper" to a MongoDB collection and provide an easy-to-use API on top of it to greatly simplify common tasks.

func (*Model) Batch

func (m *Model) Batch(item interface{}, opts ...*options.InsertManyOptions) (int64, error)

Batch executes an insert command to save multiple documents into the collection and returns the number of successfully stored items. The provided item must be a slice.

Example
shelf := db.Model("shelf")

// Get a list (slice) of items to store on the database.
var list []*book

// Batch will store all the items on a single operation. The
// input provided must be a slice.
n, err := shelf.Batch(list)
if err != nil {
	panic(err)
}
fmt.Printf("documents saved: %d", n)

func (*Model) Count

func (m *Model) Count(filter map[string]interface{}) (int64, error)

Count returns the number of documents in the collection that satisfy the provided filter. An empty filter counts all the documents in the collection by performing a full scan. This operation trades speed for accuracy. For more information: https://docs.mongodb.com/manual/reference/method/db.collection.countDocuments/

func (*Model) Delete

func (m *Model) Delete(filter map[string]interface{}) error

Delete will look for the first document that satisfies the provided 'filter' value and permanently remove it. The operation does not fail if no document satisfies the filter value.

func (*Model) DeleteAll

func (m *Model) DeleteAll(filter map[string]interface{}) (int64, error)

DeleteAll will remove any document that satisfies the provided 'filter' value and return the number of documents deleted by the operation.

Example
mod := db.Model("authorities")

// Filter is based on the encoded records as they appear on
// the database.
filter := Filter()
filter["ca_constraint.is_ca"] = false

// DeleteAll will remove all documents satisfying the filter.
total, err := mod.DeleteAll(filter)
if err != nil {
	panic(err)
}
fmt.Printf("documents deleted: %d", total)

func (*Model) Distinct

func (m *Model) Distinct(field string, filter map[string]interface{}, result interface{}) error

Distinct finds the unique values for a specified field in the collection. If no 'filter' is specified, a full scan of the collection is performed. See the MongoDB documentation for the 'distinct' command.

var list []string
err := mod.Distinct("user_type", Filter(), &list)

func (*Model) Estimate

func (m *Model) Estimate() (int64, error)

Estimate executes a count command and returns an estimate of the number of documents in the collection using available metadata. This operation trades accuracy for speed. For more information: https://docs.mongodb.com/manual/reference/method/db.collection.estimatedDocumentCount/

func (*Model) Find

func (m *Model) Find(filter map[string]interface{}, result interface{}, opts ...*options.FindOptions) error

Find all documents in the collection that satisfy the provided 'filter'. The returned documents are automatically decoded into 'result', which must be a pointer to a slice.

func (*Model) FindByID

func (m *Model) FindByID(id string, result interface{}) error

FindByID looks for a given document based on its '_id' field. The provided 'id' value must be a MongoDB objectID hex string. The returned document is automatically decoded into 'result', which must be a pointer to a struct.

Example
shelf := db.Model("shelf")

// ID must be a valid hex-encoded MongoDB ObjectID
id := "...hex id string..."

// The result will be automatically decoded to
// this instance
record := book{}

// Get the result from the collection using the model.
// Notice the result holder is passed by reference
// (i.e., is a pointer).
err := shelf.FindByID(id, &record)
if err != nil {
	panic(err)
}

func (*Model) First

func (m *Model) First(filter map[string]interface{}, result interface{}, opts ...*options.FindOneOptions) error

First looks for the first document in the collection that satisfies the specified 'filter'. The returned document is automatically decoded into 'result', which must be a pointer to a struct.

func (*Model) Insert

func (m *Model) Insert(item interface{}) (string, error)

Insert the item in the model's underlying collection.

Example
// Create a model for the 'shelf' collection. The model
// will encode/decode objects using the `bson` annotations
// on the structs.
shelf := db.Model("shelf")

// Create an object to store on the database
sample := &book{
	Title:  "Sample book",
	Author: "John Dow",
	Pages:  137,
}

// Store the object
id, err := shelf.Insert(sample)
if err != nil {
	panic(err)
}
fmt.Printf("book stored with id: %s", id)

func (*Model) Subscribe

func (m *Model) Subscribe(pipeline mongo.Pipeline, opts *options.ChangeStreamOptions) (*Stream, error)

Subscribe will set up and return a stream instance that can be used to receive change events based on the parameters provided.

Example
shelf := db.Model("shelf")

// Open a stream to detect all operations performed on the
// 'shelf' collection
sub, err := shelf.Subscribe(PipelineCollection(), options.ChangeStream())
if err != nil {
	panic(err)
}

// Handle subscription events
go func() {
	for {
		select {
		case <-sub.Done():
			return
		case e := <-sub.Event():
			fmt.Printf("event: %+v\n", e)
		}
	}
}()

// When no longer needed, close the subscription
rt, err := sub.Close()
if err != nil {
	panic(err)
}
fmt.Printf("you can resume the subscription with: %v\n", rt)

func (*Model) Update

func (m *Model) Update(filter map[string]interface{}, patch interface{}, upsert bool) error

Update will look for the first document that satisfies the 'filter' value and apply the 'patch' to it. If no such document currently exists, it will be automatically generated if 'upsert' is set to true.

func (*Model) UpdateAll

func (m *Model) UpdateAll(filter map[string]interface{}, patch interface{}) (int64, error)

UpdateAll will try to apply the provided 'patch' to all the documents satisfying the specified 'filter' and return the number of documents modified by the operation.

Example
shelf := db.Model("shelf")

// Filter and patch are based on the encoded records as
// they appear on the database.
filter := map[string]interface{}{
	"author_name": "John Dow",
}
patch := map[string]interface{}{
	"author_name": "Jane Dow",
}

// UpdateAll will apply the patch to all documents
// satisfying the filter.
total, err := shelf.UpdateAll(filter, patch)
if err != nil {
	panic(err)
}
fmt.Printf("documents updated: %d", total)

func (*Model) WithTransaction

func (m *Model) WithTransaction(tx *Transaction) error

WithTransaction sets the active transaction for the model. All CRUD operations are bound to the active transaction when executed. The active transaction will be removed automatically once it has been aborted or committed. This method will return an error if another transaction is currently active.

type Operator

type Operator struct {
	// contains filtered or unexported fields
}

Operator instances receive MongoDB client options and manage an underlying network connection. An operator can be used to instantiate any number of models.

func NewOperator

func NewOperator(db string, opts *options.ClientOptions) (*Operator, error)

NewOperator returns a new ORM operator instance for the specified MongoDB database. The operator will create and manage the required client connection(s) based on the provided configuration options. Remember to call the 'Close' method to free resources when the operator is no longer needed.

Example
// MongoDB client settings
conf := options.Client()
conf.ApplyURI("mongodb://localhost:27017/?tls=false")
conf.SetDirect(true)
conf.SetMinPoolSize(2)
conf.SetReadPreference(readpref.Primary())
conf.SetAppName("super-cool-app")
conf.SetReplicaSet("rs1")

// Get a new operator instance
db, err := NewOperator("testing", conf)
if err != nil {
	panic(err)
}

// Use the operator instance to create models. And the models
// to manage data on the database.
// ..

// Close the connection to the database when no longer needed
err = db.Close(context.Background())
if err != nil {
	panic(err)
}

func (*Operator) Close

func (op *Operator) Close(ctx context.Context) error

Close any existing MongoDB client connection.

func (*Operator) Model

func (op *Operator) Model(name string) *Model

Model returns a new model instance. The 'name' provided will also be used as the model's underlying MongoDB collection. The encoding rules for items stored and retrieved using the model are:

  1. Only exported fields on structs are included
  2. Default BSON encoder rules
  3. Use `bson` tags, if available
  4. Use `json` tags, if available
  5. Marshal `nil` maps as empty BSON documents
  6. Marshal `nil` slices as empty BSON arrays

More information: https://pkg.go.dev/go.mongodb.org/mongo-driver/bson
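Rules 5 and 6 matter because Go encoders usually distinguish a nil map or slice from an empty one. The sketch below shows that distinction with encoding/json, as an analogy only; it does not use the package's BSON codec, and the `record` type and `encode` helper are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// record is a hypothetical type with a slice field and a map field.
type record struct {
	Tags  []string          `json:"tags"`
	Attrs map[string]string `json:"attrs"`
}

// encode marshals r to JSON and returns the result as a string.
func encode(r record) string {
	out, err := json.Marshal(r)
	if err != nil {
		panic(err)
	}
	return string(out)
}

func main() {
	// Nil slice and nil map are encoded as null.
	fmt.Println(encode(record{})) // prints {"tags":null,"attrs":null}

	// Empty (non-nil) values are encoded as an empty array and object.
	fmt.Println(encode(record{Tags: []string{}, Attrs: map[string]string{}})) // prints {"tags":[],"attrs":{}}
}
```

With rules 5 and 6 in place, a model would store the second form in both cases, so readers of the collection never need to handle null where an array or document is expected.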

func (*Operator) Ping

func (op *Operator) Ping() error

Ping performs a reachability test to the MongoDB server used by the operator instance. A default timeout of 5 seconds is used.

func (*Operator) Tx

Tx can be used to enable causal consistency for a group of operations or to execute operations in an ACID transaction.

Example
c1 := db.Model("shelf")
c2 := db.Model("protos")

// Set transaction options
opts := options.Transaction()
opts.SetReadConcern(readconcern.Snapshot())
opts.SetWriteConcern(writeconcern.Majority())

// Complex multi-collection operation
complexOperation := func(tx *Transaction) error {
	// Set the active transaction on all models used
	if err := c1.WithTransaction(tx); err != nil {
		return tx.Abort()
	}
	if err := c2.WithTransaction(tx); err != nil {
		return tx.Abort()
	}

	// Run tasks
	if _, err := c1.Insert(annotatedStruct()); err != nil {
		return tx.Abort()
	}
	if _, err := c2.Insert(notAnnotatedStruct()); err != nil {
		return tx.Abort()
	}

	// Commit transaction and return final result
	return tx.Commit()
}

// Execute complex atomic operation
if err := db.Tx(complexOperation, opts); err != nil {
	panic(err)
}

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream provides a simple interface to listen for change events. More information:

https://docs.mongodb.com/manual/changeStreams/

func (*Stream) Close

func (s *Stream) Close() (bson.Raw, error)

Close the stream and free all its related resources. The method returns the stream's resume token (when available) and any error produced when closing the stream. The resume token can later be used when opening a new stream, through the 'SetResumeAfter' configuration option.

conf.SetResumeAfter(rt)

func (*Stream) Done

func (s *Stream) Done() <-chan struct{}

Done is used to signal when the stream processing has terminated.

func (*Stream) Event

func (s *Stream) Event() <-chan ChangeEvent

Event channel is used to deliver any changes detected for the stream parameters.
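The Done and Event channels are meant to be consumed together in a select loop. The sketch below reproduces that pattern with a hypothetical in-memory `stream` type that emits plain strings instead of ChangeEvent values, so it can run without a database. The `newStream` and `collect` helpers are illustrative only.

```go
package main

import "fmt"

// stream is a hypothetical in-memory stand-in for Stream.
type stream struct {
	events chan string
	done   chan struct{}
}

// newStream emits the given items in order and then signals completion.
func newStream(items ...string) *stream {
	s := &stream{events: make(chan string), done: make(chan struct{})}
	go func() {
		defer close(s.done)
		for _, it := range items {
			s.events <- it
		}
	}()
	return s
}

// Event delivers the emitted items.
func (s *stream) Event() <-chan string { return s.events }

// Done is closed once no more items will be delivered.
func (s *stream) Done() <-chan struct{} { return s.done }

// collect reads events until the stream reports completion.
func collect(s *stream) []string {
	var out []string
	for {
		select {
		case <-s.Done():
			return out
		case e := <-s.Event():
			out = append(out, e)
		}
	}
}

func main() {
	fmt.Println(collect(newStream("insert", "update", "delete")))
}
```

Because the events channel is unbuffered, every item has been received by the time the done channel is closed, so the loop can return as soon as Done fires.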

type Transaction

type Transaction struct {
	// contains filtered or unexported fields
}

Transaction instances allow several operations to be run atomically. If an operation fails, the whole transaction is reverted.

func (*Transaction) Abort

func (tx *Transaction) Abort() error

Abort the transaction and roll back all changes. This method will return an error if the transaction is no longer active, that is, if it has already been committed or aborted.

func (*Transaction) Commit

func (tx *Transaction) Commit() error

Commit the transaction. This method will return an error if the transaction is no longer active or has been aborted.

func (*Transaction) Done

func (tx *Transaction) Done() <-chan struct{}

Done triggers a notification when the transaction is completed, either by abort or by commit.

func (*Transaction) ID

func (tx *Transaction) ID() bson.Raw

ID returns the session identifier for the transaction.

type TransactionBody

type TransactionBody func(tx *Transaction) error

TransactionBody serves as a wrapper to group all the operations that make up a specific transaction.
