mongostore

package
v1.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 15, 2024 License: MIT Imports: 20 Imported by: 1

Documentation

Index

Constants

View Source
const (
	DefaultDatabaseName                         = "datastore"
	DefaultConnectTimeoutSeconds                = uint64(10)
	DefaultTimeoutSecondsShutdown               = uint64(10)
	DefaultTimeoutSecondsQuery                  = uint64(10)
	DefaultPingHeartbeatSeconds                 = uint64(10)
	DefaultMaxFailedEnsureIndexesBackoffSeconds = uint64(300)
	DefaultUsername                             = ""
	DefaultPassword                             = ""
	DefaultAuthMechanism                        = "PLAIN"
	DefaultMaxPoolSize                          = uint64(100)
	DefaultHost                                 = "localhost:27017"
)
View Source
const (
	ASC  = 1
	DESC = -1
)
View Source
const (
	MaxSliceSizePerMongoDocument = uint64(10 * 1024 * 1024)
)

Variables

View Source
var DirtyWriteError = errors.New("dirty write error")
View Source
var ErrorServiceNotStarted = errors.New("getting mongo client failed: service is not started or shutdown")

Functions

func CheckForDirtyWriteOnDeleteOne

func CheckForDirtyWriteOnDeleteOne(deleteResult *mongo.DeleteResult, inputErr error) (err error)

CheckForDirtyWriteOnDeleteOne is expected to be used like this: Add a field to your struct called "DirtyWriteGuard"

type Person struct {
  ...
  DirtyWriteGuard uint64  `bson:"dirtyWriteGuard"`
}

Then when you delete from mongo and want to ensure another thread has not already upserted a replacement document:

	filter := bson.D{
		{"_id", person.Id},	// Note: must use _id or some indexed field with a unique constraint.
     	// where person.DirtyWriteGuard is 0 on new or == to the dirtyWriteGuard field of the entity we expect in the collection
		{"dirtyWriteGuard", person.DirtyWriteGuard}, // this value should be unmodified by your code as it was loaded from mongo.
	}

	var deleteResult *mongo.DeleteResult
	deleteResult, err = collection.DeleteOne(ctx, filter)
	err = mongostore.CheckForDirtyWriteOnDeleteOne(deleteResult, err)
	if err != nil {
		if err != mongostore.DirtyWriteError {
			// only log or mess with err returned if not a DirtyWriteError
			logger.Instance().ErrorIgnoreCancel(ctx, "error on ReplaceOne for Person", logger.Error(err))
			err = errors.Wrap(err, "error on ReplaceOne for Person")
		}
		return
	}

In the expected dirty write case mongo will return deleteResult.DeletedCount == 0. Meaning that 0 documents matched the filter with the unique id and the dirtyWriteGuard equality.

In case of no dirty write and no error returned by the UpdateOne() we expect one document to be deleted (deleteResult.DeletedCount == 1).

func CheckForDirtyWriteOnUpsert

func CheckForDirtyWriteOnUpsert(updateResult *mongo.UpdateResult, inputErr error) (err error)

CheckForDirtyWriteOnUpsert is expected to be used like this: Add a field to your struct called "DirtyWriteGuard"

type Person struct {
  ...
  DirtyWriteGuard uint64  `bson:"dirtyWriteGuard"`
}

Then when you update mongo:

	filter := bson.D{
		{"_id", person.Id},	// Note: must use _id or some indexed field with a unique constraint.
     	// where person.DirtyWriteGuard is 0 on new or == to the dirtyWriteGuard field of the entity we expect in the collection
		{"dirtyWriteGuard", person.DirtyWriteGuard}, // this value should be unmodified by your code as it was loaded from mongo.
	}

	// increment the counter before update or insert
	person.DirtyWriteGuard++
	defer func() {
		if err != nil {
			// if upsert fails decrement the counter
			person.DirtyWriteGuard--
		}
	}()

	updateOptions := &options.ReplaceOptions{}
	updateOptions.SetUpsert(true)

	var updateResult *mongo.UpdateResult
	updateResult, err = collection.ReplaceOne(ctx, filter, person, updateOptions)
	err = mongostore.CheckForDirtyWriteOnUpsert(updateResult, err)
	if err != nil {
		if err != mongostore.DirtyWriteError {
			// only log or mess with err returned if not a DirtyWriteError
			logger.Instance().ErrorIgnoreCancel(ctx, "error on ReplaceOne for Person", logger.Error(err))
			err = errors.Wrap(err, "error on ReplaceOne for Person")
		}
		return
	}

In the expected dirty write case mongo will return updateResult.MatchedCount == 0 && updateResult.UpsertedID == nil. Meaning that 0 documents matched the filter with the unique id and the dirtyWriteGuard equality.

In case of no dirty write and no error returned by the UpdateOne() we expect either an insert (updateResult.UpsertedID has a value) or an updated existing document (updateResult.MatchedCount == 1).

In the the real-world and tested case mongo will return E11000 duplicate key error collection in case of dirty write. This is because no document will exist that matches _id and dirtyWriteGuard causing mongo to attempt to insert a new document which will return duplicate key error.

return E11000 duplicate key error collection in case of dirty write. This is because no document will exist that matches _id and dirtyWriteGuard causing mongo to attempt to insert a new document which will return duplicate key error. In case of no dirty write and no error returned by the UpdateOne() we expect either an insert (updateResult.UpsertedID has a value) or an updated existing document (updateResult.MatchedCount == 1).

func DuplicateKeyFiltersFromBulkWriteError

func DuplicateKeyFiltersFromBulkWriteError(err error) (containsDuplicateKeyError bool, filtersForDups []interface{})

DuplicateKeyFiltersFromBulkWriteError returns true if there are any E11000 duplicate key errors. Returns a slice of whatever you passed into the mongo command for your Filter or Filters if BulkWrite(). Should be a primitive.M or primitive.D.

func IsDuplicateKeyError

func IsDuplicateKeyError(err error) bool

IsDuplicateKeyError can help handle expected behavior for any mongo command that uses Upsert. If IsDuplicateKeyError returns true for the error returned by a mongo operation, and you are using Upsert then you are expected to retry.

Will return false if there are multiple nested mongo writeExceptions and one of the errors has a Code != 11000 (duplicate key) indicating there are other errors that should be handled and not ignored or handled the same.

func IsIndexNotFoundError

func IsIndexNotFoundError(err error) bool

func RetryDirtyWrite

func RetryDirtyWrite(dirtyWriteFunc RetryFunc) (err error)

RetryDirtyWrite is used by callers of functions that call CheckForDirtyWriteOnUpsert and can return DirtyWriteError. It will retry the anonymous function code up to 100 times before giving up if a dirty write error is detected. The caller of RetryDirtyWrite needs to ensure it has logic to refresh the copy of the object or objects its updating with a fresh copy from the collection.

 Example:
 // This code will be run repeatedly until there is no DirtyWriteError or the max retries is exceeded.
	err = mongostore.RetryDirtyWrite(func() error {
		var retryErr error

		// query an entity from the collection that has a dirtyWriteGuard uint64 field
		var existingPerson *Person
		existingPerson, retryErr = YourFunctionThatDoesMongoFind(ctx, personId)

		// ...logic that makes changes to existingPerson which could be now stale

		// YourFunctionThatDoesMongoUpsert can return DirtyWriteError
		if retryErr = YourFunctionThatDoesMongoUpsert(ctx, existingPerson); retryErr != nil {
			if retryErr != mongostore.DirtyWriteError {
				logger.Instance().ErrorIgnoreCancel(ctx, "error in YourFunctionThatDoesMongoUpsert", logger.Error(retryErr))
			}
			return retryErr
		}
		return nil
	})

func RetryDirtyWriteDebug added in v1.0.2

func RetryDirtyWriteDebug(dirtyWriteFunc RetryFunc) (err error)

RetryDirtyWriteDebug is the same as RetryDirtyWrite except it will log a warning every dirty write.

func RetryUpsertIfDuplicateKey

func RetryUpsertIfDuplicateKey(retryFunc RetryFunc) (err error)

RetryUpsertIfDuplicateKey can help handle expected behavior for any mongo command that uses Upsert. The retryFunc will be tried up to numRetries times before giving up.

BE WARNED: Mongo can at any time return E11000 duplicate key error for ANY command with Upsert enabled. Mongo expects the application to handle this error. This happens if: "During an update with upsert:true option, two (or more) threads may attempt an upsert operation using the same query predicate and, upon not finding a match, the threads will attempt to insert a new document. Both inserts will (and should) succeed, unless the second causes a unique constraint violation." See: https://jira.mongodb.org/browse/SERVER-14322

Note: its documented that mongo retries on its own under the usual cases. While that may be true im seeing an error being returned in the wild as of mongo 6.x.

 	Example:
	opt := &options.ReplaceOptions{}
	opt.SetUpsert(true)
 	// This code will be run repeatedly until there is no DuplicateKeyError or the max retries is exceeded.
	err = mongostore.RetryUpsertIfDuplicateKey(func() error {
		_, retryErr := yourCollection.ReplaceOne(ctx, yourFilter, yourDocument, opt)
		return retryErr
	})

func TruncateStringSliceForMongoDoc

func TruncateStringSliceForMongoDoc(slice []string) (newSlice []string)

TruncateStringSliceForMongoDoc ensures a string slice will fit in the mongodb doc size limit and truncates the slice if necessary logging a warning.

Types

type DataStore

type DataStore struct {
	// contains filtered or unexported fields
}

func New

func New(opts ...DataStoreOption) *DataStore

New returns a new instance of the data store. You should only create one instance in your program and re-use it. You must call Stop() when your program exits.

The instance returned is multithreading safe.

func NewWithManagedIndexes

func NewWithManagedIndexes(managedIndexes []Index, opts ...DataStoreOption) *DataStore

func (*DataStore) AddAndEnsureManagedIndexes

func (a *DataStore) AddAndEnsureManagedIndexes(groupName string, addManagedIndexes []Index)

AddAndEnsureManagedIndexes adds additional indexes to be managed after startup. groupName must be unique and each group must operate on a different set of Collections than another group. If groupName is already registered then this function does nothing and returns. If this group has Collections overlapping with another managed group then panics. AddAndEnsureManagedIndexes will do the work in a new go routine. If there are problems you will need to watch log messages. If it's unable to connect to mongo it will keep retrying using an exponential backoff with a default of DefaultMaxFailedEnsureIndexesBackoffSeconds configurable with WithMaxFailedEnsureIndexesBackoffSeconds().

func (*DataStore) Collection

func (a *DataStore) Collection(ctx context.Context, name string) (*mongo.Collection, error)

Collection calls CollectionLinearWriteRead()

func (*DataStore) CollectionForWatch

func (a *DataStore) CollectionForWatch(ctx context.Context, name string) (*mongo.Collection, error)

CollectionForWatch creates a connection with: - readconcern.Majority() - readpref.SecondaryPreferred() - writeconcern.J(true) - writeconcern.WMajority()

This is recommended for use with Change Streams (Watch()). The write concerns are just in case you use it for writes by accident.

func (*DataStore) CollectionLinearWriteRead

func (a *DataStore) CollectionLinearWriteRead(ctx context.Context, name string) (*mongo.Collection, error)

CollectionLinearWriteRead creates a connection with: - readconcern.Linearizable() - readpref.Primary() - writeconcern.J(true) - writeconcern.WMajority()

This connection supplies: "Casual Consistency" in a sharded cluster inside a single client thread. https://www.mongodb.com/docs/manual/core/read-isolation-consistency-recency/#std-label-sessions

Note: readpref.Primary() is critical for reads to consistently return results in the same go routine immediately after an insert. And perhaps not well documented.

func (*DataStore) CollectionReadNearest

func (a *DataStore) CollectionReadNearest(ctx context.Context, name string) (*mongo.Collection, error)

CollectionReadNearest creates a connection with: - readconcern.Majority() - readpref.Nearest() - writeconcern.J(true) - writeconcern.WMajority()

func (*DataStore) CollectionReadSecondaryPreferred

func (a *DataStore) CollectionReadSecondaryPreferred(ctx context.Context, name string) (*mongo.Collection, error)

CollectionReadSecondaryPreferred creates a connection with: - readconcern.Majority() - readpref.SecondaryPreferred() - writeconcern.J(true) - writeconcern.WMajority()

func (*DataStore) CollectionUnsafeFastWrites

func (a *DataStore) CollectionUnsafeFastWrites(ctx context.Context, name string) (*mongo.Collection, error)

CollectionUnsafeFastWrites creates a connection with: - readconcern.Local() - readpref.Primary() - writeconcern.J(false) - writeconcern.W(1)

func (*DataStore) ContextTimeout

func (a *DataStore) ContextTimeout(ctx context.Context) (clientCtx context.Context, cancelFunc context.CancelFunc)

ContextTimeout merges the ctx argument with a new context with a timeout using the WithTimeoutSecondsQuery option on the DataStore and returns a new context the client should use on queries to mon go. The cancelFunc returned MUST be called when the client is done using the context for queries in the span of the timeout. By merging the contexts (see onecontext.Merge) the client can distinguish between a failure from timeout on a query v.s. an interrupted client.

Example:

	queryCtx, cancel := mgocluster.Instance().Cluster(impl.clusterName).ContextTimeout(clientCtx)
	defer cancel()

 // Note use the clientCtx for obtaining the collection it uses a separate connect timeout.
	var collection *mongo.Collection
	if collection, err = mgocluster.Instance().Cluster(impl.clusterName).CollectionLinearWriteRead(clientCtx, impl.collectionName); err != nil {
		return
	}
	...
	if mongoCursor, err = collection.Find(queryCtx, filter);  err != nil {
    return
 }

func (*DataStore) ContextTimeoutWithDuration

func (a *DataStore) ContextTimeoutWithDuration(ctx context.Context, timeout time.Duration) (clientCtx context.Context, cancelFunc context.CancelFunc)

ContextTimeoutWithDuration is same as ContextTimeout except the duration is given as input instead of coming from the DataStore options.

func (*DataStore) Index

func (a *DataStore) Index(collectionName string, indexId IndexIdentifier) (idx Index, err error)

func (*DataStore) IndexOrPanic

func (a *DataStore) IndexOrPanic(collectionName string, indexId IndexIdentifier) (idx Index)

func (*DataStore) Ping

func (a *DataStore) Ping(clientCtx context.Context) error

func (*DataStore) Stop

func (a *DataStore) Stop()

Stop disconnects the mongo clients and stops the background routines. Call this once on exit of your main.go.

type DataStoreOption

type DataStoreOption func(o *Options)

func WithAuthMechanism

func WithAuthMechanism(authMechanism string) DataStoreOption

func WithConnectTimeoutSeconds

func WithConnectTimeoutSeconds(connectTimeoutSeconds uint64) DataStoreOption

func WithDatabaseName

func WithDatabaseName(databaseName string) DataStoreOption

func WithHosts

func WithHosts(hosts []string) DataStoreOption

func WithMaxFailedEnsureIndexesBackoffSeconds

func WithMaxFailedEnsureIndexesBackoffSeconds(maxFailedEnsureIndexesBackoffSeconds uint64) DataStoreOption

func WithMaxPoolSize

func WithMaxPoolSize(maxPoolSize uint64) DataStoreOption

func WithMonitor

func WithMonitor(monitor *event.CommandMonitor) DataStoreOption

func WithPassword

func WithPassword(password string) DataStoreOption

func WithPingHeartbeatSeconds

func WithPingHeartbeatSeconds(pingHeartbeatSeconds uint64) DataStoreOption

func WithTimeoutSecondsQuery

func WithTimeoutSecondsQuery(timeoutSecondsQuery uint64) DataStoreOption

func WithTimeoutSecondsShutdown

func WithTimeoutSecondsShutdown(timeoutSecondsShutdown uint64) DataStoreOption

func WithUri

func WithUri(uri string) DataStoreOption

func WithUsername

func WithUsername(username string) DataStoreOption

type Index

type Index struct {
	CollectionName string
	Id             IndexIdentifier
	Version        uint64 // increment any time the model or options changes - calling createIndex() with the same name but different \
	// options than an existing index will throw an error MongoError: \
	// Index with name: **MongoIndexName** already exists with different options
	Model       mongo.IndexModel
	SkipVersion bool
}

func (Index) MongoIndexName

func (idx Index) MongoIndexName() string

type IndexIdentifier

type IndexIdentifier string

func (IndexIdentifier) String

func (iid IndexIdentifier) String() string

type Options

type Options struct {
	// contains filtered or unexported fields
}

type RetryFunc

type RetryFunc func() error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL