Documentation
¶
Index ¶
- Constants
- Variables
- func CheckForDirtyWriteOnUpsert(updateResult *mongo.UpdateResult, inputErr error) (err error)
- func DuplicateKeyFiltersFromBulkWriteError(err error) (containsDuplicateKeyError bool, filtersForDups []interface{})
- func IsDuplicateKeyError(err error) bool
- func IsIndexNotFoundError(err error) bool
- func RetryDirtyWrite(dirtyWriteFunc RetryFunc) (err error)
- func RetryUpsertIfDuplicateKey(retryFunc RetryFunc) (err error)
- func TruncateStringSliceForMongoDoc(slice []string) (newSlice []string)
- func TruncateUUIDSliceForMongoDoc(slice []mongouuid.UUID) (newSlice []mongouuid.UUID)
- type DataStore
- func (a *DataStore) AddAndEnsureManagedIndexes(groupName string, addManagedIndexes []Index)
- func (a *DataStore) Collection(ctx context.Context, name string) (*mongo.Collection, error)
- func (a *DataStore) CollectionForWatch(ctx context.Context, name string) (*mongo.Collection, error)
- func (a *DataStore) CollectionLinearWriteRead(ctx context.Context, name string) (*mongo.Collection, error)
- func (a *DataStore) CollectionReadNearest(ctx context.Context, name string) (*mongo.Collection, error)
- func (a *DataStore) CollectionReadSecondaryPreferred(ctx context.Context, name string) (*mongo.Collection, error)
- func (a *DataStore) CollectionUnsafeFastWrites(ctx context.Context, name string) (*mongo.Collection, error)
- func (a *DataStore) ContextTimeout(ctx context.Context) (context.Context, context.CancelFunc)
- func (a *DataStore) Index(collectionName string, indexId IndexIdentifier) (idx Index, err error)
- func (a *DataStore) IndexOrPanic(collectionName string, indexId IndexIdentifier) (idx Index)
- func (a *DataStore) Ping(ctx context.Context) error
- func (a *DataStore) StartTask(managedIndexes []Index, opts ...DataStoreOption)
- func (a *DataStore) StopTask()
- type DataStoreOption
- func WithAuthMechanism(authMechanism string) DataStoreOption
- func WithConnectTimeoutSeconds(connectTimeoutSeconds uint64) DataStoreOption
- func WithDatabaseName(databaseName string) DataStoreOption
- func WithHosts(hosts []string) DataStoreOption
- func WithMaxFailedEnsureIndexesBackoffSeconds(maxFailedEnsureIndexesBackoffSeconds uint64) DataStoreOption
- func WithMaxPoolSize(maxPoolSize uint64) DataStoreOption
- func WithPassword(password string) DataStoreOption
- func WithPingHeartbeatSeconds(pingHeartbeatSeconds uint64) DataStoreOption
- func WithTimeoutSecondsQuery(timeoutSecondsQuery uint64) DataStoreOption
- func WithTimeoutSecondsShutdown(timeoutSecondsShutdown uint64) DataStoreOption
- func WithUri(uri string) DataStoreOption
- func WithUsername(username string) DataStoreOption
- type Index
- type IndexIdentifier
- type Options
- type RetryFunc
Constants ¶
const ( DefaultDatabaseName = "datastore" DefaultConnectTimeoutSeconds = uint64(10) DefaultTimeoutSecondsShutdown = uint64(10) DefaultTimeoutSecondsQuery = uint64(10) DefaultPingHeartbeatSeconds = uint64(10) DefaultMaxFailedEnsureIndexesBackoffSeconds = uint64(300) DefaultUsername = "" DefaultPassword = "" DefaultAuthMechanism = "PLAIN" DefaultMaxPoolSize = uint64(100) DefaultHost = "localhost:27017" )
const ( ASC = 1 DESC = -1 )
const (
MaxSliceSizePerMongoDocument = uint64(10 * 1024 * 1024)
)
Variables ¶
var DirtyWriteError = errors.New("dirty write error")
var ErrorServiceNotStarted = errors.New("getting mongo client failed: service is not started or shutdown")
Functions ¶
func CheckForDirtyWriteOnUpsert ¶
func CheckForDirtyWriteOnUpsert(updateResult *mongo.UpdateResult, inputErr error) (err error)
CheckForDirtyWriteOnUpsert is expected to be used like this: Add a field to your struct called "DirtyWriteGuard"
type Person struct { ... DirtyWriteGuard uint64 `bson:"dirtyWriteGuard"` }
Then when you update mongo:
filter := bson.D{
	{"_id", person.Id}, // Note: must use _id or some indexed field with a unique constraint.
	// where person.DirtyWriteGuard is 0 on new or == to the dirtyWriteGuard field of the entity we expect in the collection
	{"dirtyWriteGuard", person.DirtyWriteGuard}, // this value should be unmodified by your code as it was loaded from mongo.
}
// increment the counter before update or insert
person.DirtyWriteGuard++
defer func() {
	if err != nil {
		// if upsert fails decrement the counter
		person.DirtyWriteGuard--
	}
}()
updateOptions := &options.ReplaceOptions{}
updateOptions.SetUpsert(true)
var updateResult *mongo.UpdateResult
updateResult, err = collection.ReplaceOne(ctx, filter, person, updateOptions)
err = mongostore.CheckForDirtyWriteOnUpsert(updateResult, err)
if err != nil {
	if err != mongostore.DirtyWriteError {
		// only log or mess with err returned if not a DirtyWriteError
		logger.Instance().ErrorIgnoreCancel(ctx, "error on ReplaceOne for Person", logger.Error(err))
		err = errors.Wrap(err, "error on ReplaceOne for Person")
	}
	return
}
In the expected dirty write case mongo will return updateResult.MatchedCount == 0 && updateResult.UpsertedID == nil. Meaning that 0 documents matched the filter with the unique id and the dirtyWriteGuard equality.
In case of no dirty write and no error returned by the upsert call (ReplaceOne() in the example above) we expect either an insert (updateResult.UpsertedID has a value) or an updated existing document (updateResult.MatchedCount == 1).
In the real-world and tested case mongo will return an "E11000 duplicate key error collection" error in case of a dirty write. This is because no document will exist that matches both _id and dirtyWriteGuard, causing mongo to attempt to insert a new document, which will return a duplicate key error.
func DuplicateKeyFiltersFromBulkWriteError ¶
func DuplicateKeyFiltersFromBulkWriteError(err error) (containsDuplicateKeyError bool, filtersForDups []interface{})
DuplicateKeyFiltersFromBulkWriteError returns true if there are any E11000 duplicate key errors. Returns a slice of whatever you passed into the mongo command for your Filter or Filters if BulkWrite(). Should be a primitive.M or primitive.D.
func IsDuplicateKeyError ¶
IsDuplicateKeyError can help handle expected behavior for any mongo command that uses Upsert. If IsDuplicateKeyError returns true for the error returned by a mongo operation, and you are using Upsert then you are expected to retry.
Will return false if there are multiple nested mongo writeExceptions and one of the errors has a Code != 11000 (duplicate key) indicating there are other errors that should be handled and not ignored or handled the same.
func IsIndexNotFoundError ¶
func RetryDirtyWrite ¶
RetryDirtyWrite is used by callers of functions that call CheckForDirtyWriteOnUpsert and can return DirtyWriteError. It will retry the anonymous function code up to 100 times before giving up if a dirty write error is detected. The caller of RetryDirtyWrite needs to ensure it has logic to refresh the copy of the object or objects it is updating with a fresh copy from the collection.
Example:
// This code will be run repeatedly until there is no DirtyWriteError or the max retries is exceeded.
err = mongostore.RetryDirtyWrite(func() error {
	var retryErr error
	// query an entity from the collection that has a dirtyWriteGuard uint64 field
	var existingPerson *Person
	existingPerson, retryErr = YourFunctionThatDoesMongoFind(ctx, personId)
	// ...logic that makes changes to existingPerson which could be now stale
	// YourFunctionThatDoesMongoUpsert can return DirtyWriteError
	if retryErr = YourFunctionThatDoesMongoUpsert(ctx, existingPerson); retryErr != nil {
		if retryErr != mongostore.DirtyWriteError {
			logger.Instance().ErrorIgnoreCancel(ctx, "error in YourFunctionThatDoesMongoUpsert", logger.Error(retryErr))
		}
		return retryErr
	}
	return nil
})
func RetryUpsertIfDuplicateKey ¶
RetryUpsertIfDuplicateKey can help handle expected behavior for any mongo command that uses Upsert. The retryFunc will be tried up to numRetries times before giving up.
BE WARNED: Mongo can at any time return E11000 duplicate key error for ANY command with Upsert enabled. Mongo expects the application to handle this error. This happens if: "During an update with upsert:true option, two (or more) threads may attempt an upsert operation using the same query predicate and, upon not finding a match, the threads will attempt to insert a new document. Both inserts will (and should) succeed, unless the second causes a unique constraint violation." See: https://jira.mongodb.org/browse/SERVER-14322
Note: it is documented that mongo retries on its own under the usual cases. While that may be true, I am still seeing this error being returned in the wild as of mongo 6.x.
Example:
opt := &options.ReplaceOptions{}
opt.SetUpsert(true)
// This code will be run repeatedly until there is no DuplicateKeyError or the max retries is exceeded.
err = mongostore.RetryUpsertIfDuplicateKey(func() error {
	_, retryErr := yourCollection.ReplaceOne(ctx, yourFilter, yourDocument, opt)
	return retryErr
})
func TruncateStringSliceForMongoDoc ¶
TruncateStringSliceForMongoDoc ensures a string slice will fit in the mongodb doc size limit and truncates the slice if necessary, logging a warning.
Types ¶
type DataStore ¶
func Instance ¶
func Instance() *DataStore
Instance returns an instance of the data store singleton. This ensures you only have one instance of this per program. All connections will be pooled automatically and you have no work for startup or cleaning up connections other than running Instance().StartTask() and Instance().StopTask().
The singleton is multithreading safe. Reference anywhere you need a connection to mongo or want to add Indexes after startup: e.g. mongostore.Instance().CollectionLinearWriteRead(...) mongostore.Instance().AddAndEnsureManagedIndexes(...)
func (*DataStore) AddAndEnsureManagedIndexes ¶
AddAndEnsureManagedIndexes adds additional indexes to be managed after startup. groupName must be unique and each group must operate on a different set of Collections than another group. If groupName is already registered then this function does nothing and returns. If this group has Collections overlapping with another managed group then this function panics. AddAndEnsureManagedIndexes will do the work in a new goroutine. If there are problems you will need to watch log messages. If it's unable to connect to mongo it will keep retrying using an exponential backoff with a default of DefaultMaxFailedEnsureIndexesBackoffSeconds, configurable with WithMaxFailedEnsureIndexesBackoffSeconds().
func (*DataStore) Collection ¶
Collection calls CollectionLinearWriteRead()
func (*DataStore) CollectionForWatch ¶
CollectionForWatch creates a connection with: - readconcern.Majority() - readpref.SecondaryPreferred() - writeconcern.J(true) - writeconcern.WMajority()
This is recommended for use with Change Streams (Watch()). The write concerns are just in case you use it for writes by accident.
func (*DataStore) CollectionLinearWriteRead ¶
func (a *DataStore) CollectionLinearWriteRead(ctx context.Context, name string) (*mongo.Collection, error)
CollectionLinearWriteRead creates a connection with: - readconcern.Linearizable() - readpref.Primary() - writeconcern.J(true) - writeconcern.WMajority()
This connection supplies: "Causal Consistency" in a sharded cluster inside a single client thread. https://www.mongodb.com/docs/manual/core/read-isolation-consistency-recency/#std-label-sessions
Note: readpref.Primary() is critical for reads to consistently return results in the same goroutine immediately after an insert, and this is perhaps not well documented.
func (*DataStore) CollectionReadNearest ¶
func (a *DataStore) CollectionReadNearest(ctx context.Context, name string) (*mongo.Collection, error)
CollectionReadNearest creates a connection with: - readconcern.Majority() - readpref.Nearest() - writeconcern.J(true) - writeconcern.WMajority()
func (*DataStore) CollectionReadSecondaryPreferred ¶
func (a *DataStore) CollectionReadSecondaryPreferred(ctx context.Context, name string) (*mongo.Collection, error)
CollectionReadSecondaryPreferred creates a connection with: - readconcern.Majority() - readpref.SecondaryPreferred() - writeconcern.J(true) - writeconcern.WMajority()
func (*DataStore) CollectionUnsafeFastWrites ¶
func (a *DataStore) CollectionUnsafeFastWrites(ctx context.Context, name string) (*mongo.Collection, error)
CollectionUnsafeFastWrites creates a connection with: - readconcern.Local() - readpref.Primary() - writeconcern.J(false) - writeconcern.W(1)
func (*DataStore) ContextTimeout ¶
func (*DataStore) Index ¶
func (a *DataStore) Index(collectionName string, indexId IndexIdentifier) (idx Index, err error)
func (*DataStore) IndexOrPanic ¶
func (a *DataStore) IndexOrPanic(collectionName string, indexId IndexIdentifier) (idx Index)
func (*DataStore) StartTask ¶
func (a *DataStore) StartTask(managedIndexes []Index, opts ...DataStoreOption)
StartTask starts the background routines. Call this once on startup from your main.go. Call StopTask() on exit. Indexes supplied here will be managed in a separate goroutine after startup.
type DataStoreOption ¶
type DataStoreOption func(o *Options)
func WithAuthMechanism ¶
func WithAuthMechanism(authMechanism string) DataStoreOption
func WithConnectTimeoutSeconds ¶
func WithConnectTimeoutSeconds(connectTimeoutSeconds uint64) DataStoreOption
func WithDatabaseName ¶
func WithDatabaseName(databaseName string) DataStoreOption
func WithHosts ¶
func WithHosts(hosts []string) DataStoreOption
func WithMaxFailedEnsureIndexesBackoffSeconds ¶
func WithMaxFailedEnsureIndexesBackoffSeconds(maxFailedEnsureIndexesBackoffSeconds uint64) DataStoreOption
func WithMaxPoolSize ¶
func WithMaxPoolSize(maxPoolSize uint64) DataStoreOption
func WithPassword ¶
func WithPassword(password string) DataStoreOption
func WithPingHeartbeatSeconds ¶
func WithPingHeartbeatSeconds(pingHeartbeatSeconds uint64) DataStoreOption
func WithTimeoutSecondsQuery ¶
func WithTimeoutSecondsQuery(timeoutSecondsQuery uint64) DataStoreOption
func WithTimeoutSecondsShutdown ¶
func WithTimeoutSecondsShutdown(timeoutSecondsShutdown uint64) DataStoreOption
func WithUri ¶
func WithUri(uri string) DataStoreOption
func WithUsername ¶
func WithUsername(username string) DataStoreOption
type Index ¶
type Index struct {
	CollectionName string
	Id             IndexIdentifier
	Version        uint64 // increment any time the model or options changes - calling createIndex() with the same name but different \
	// options than an existing index will throw an error MongoError: \
	// Index with name: **MongoIndexName** already exists with different options
	Model mongo.IndexModel
}
func (Index) MongoIndexName ¶
type IndexIdentifier ¶
type IndexIdentifier string
func (IndexIdentifier) String ¶
func (iid IndexIdentifier) String() string