betterwatcher

v0.0.0-...-3f71644
Published: Oct 24, 2019 License: MIT Imports: 11 Imported by: 0

README

While working with mongo watchers (change streams), I had to do a lot of work to scale them out and manage resume tokens.
I had to make sure that each message is processed only once: mongo watcher updates are broadcast in fan-out mode, so if you scale the watchers out to multiple servers, every message is processed multiple times. I also didn't want to maintain an additional server dedicated only to handling updates.

I decided to write a library that runs multiple watcher instances while still handling each message only once, and that manages the resume token internally, so that reading continues after the last successfully handled message.

Only one instance at a time runs the watcher on a collection; the other instances serve as failover instances. The moment the current watcher instance closes its stream due to a panic or an internal error, another instance automatically takes over.

Import

go get -u github.com/markshapiro/go-mongo-better-watcher

import (
	betterwatcher "github.com/markshapiro/go-mongo-better-watcher"
)

Usage

import (
    "log"
    "time"

    betterwatcher "github.com/markshapiro/go-mongo-better-watcher"
    "go.mongodb.org/mongo-driver/bson"
)

// mongoConn is assumed to be an already connected *mongo.Client
myCollection := mongoConn.Database("my_database").Collection("MyCollection")

config := betterwatcher.RedisConfig{Url: "127.0.0.1:6379"}

opts := betterwatcher.Options{
    AttachDocumentOnUpdate: true,
    WatcherSwitchAfter:     1 * time.Minute,
    MaxRetries:             3,
}

factory := betterwatcher.New(config, opts)

pipeline := []bson.M{
    {"$match": bson.M{
        "$or": []bson.M{
            {"operationType": "insert"}, // listen to inserts
            //{"operationType": "update"}, // listen to updates
        },
    }},
}

watchId := "watch_id"

go func() {
    err := factory.CreateWatcher(myCollection, pipeline, watchId, func(changeDoc *betterwatcher.ChangeDoc) error {
        // handler: returning a non-nil error makes the library retry this message (up to MaxRetries)
        return nil
    })
    if err != nil {
        log.Println("watcher error:", err)
    }
}()

Config

New() takes the Redis connection config (RedisConfig) as its first parameter: either the database endpoint in the Url field, or an existing client of the github.com/go-redis/redis library in the Client field.

Options

WatcherSwitchAfter

default: no switching

If you want your watcher instances to switch once in a while to balance the load, use this option to pass the time duration after which the instances should switch.

AttachDocumentOnUpdate

default: false

Passes the full document to the handler (in ChangeDoc.FullDoc) when a document is updated as well. By default, the full document is attached only for inserted documents.

MaxRetries

default: unlimited

How many times the library retries processing a message whose handler returned an error before discarding it.

NOTE

  1. When the message handler returns an error, the message is reprocessed an unlimited number of times, holding up the processing of the next messages, unless MaxRetries is specified.
  2. If your message handler runs for a long time, it delays the handling of the next message; in that case it is better to forward the messages to a queue.

TODO

There is an idea for a proper horizontal scaling solution described here: https://stackoverflow.com/questions/54295043/what-is-a-good-horizontal-scaling-strategy-for-a-mongodb-change-stream-reader but it would only work from mongo version 4 onward, because it requires converting the ObjectId to a string before applying a regex.

Documentation


Constants

const (
	LOCK_TTL_DEFAULT       = 1 * time.Minute
	LOCK_OBTAIN_RETRY_TIME = 10 * time.Second
	LOCK_REFRESH_TIME      = 10 * time.Second
	LOCK_PREFIX            = "betterwatcher::lock::"
)


Functions

func GetRedisInstance

func GetRedisInstance(url string) *redis.Client

Types

type ChangeDoc

type ChangeDoc struct {
	Id     *bson.Raw `bson:"_id"`
	DocKey struct {
		Id primitive.ObjectID `bson:"_id"`
	} `bson:"documentKey"`
	Operation         string              `bson:"operationType"`
	FullDoc           *bson.Raw           `bson:"fullDocument"`
	Timestamp         primitive.Timestamp `bson:"clusterTime"`
	UpdateDescription struct {
		UpdatedFields *bson.Raw `bson:"updatedFields"`
		RemovedFields []string  `bson:"removedFields"`
	} `bson:"updateDescription"`
}

type Options

type Options struct {
	Context                context.Context
	AttachDocumentOnUpdate bool
	WatcherSwitchAfter     time.Duration
	MaxRetries             int
}

type RedisConfig

type RedisConfig struct {
	Url    string
	Client *redis.Client
}

type WatcherFactory

type WatcherFactory struct {
	// contains filtered or unexported fields
}

func New

func New(redisConfig RedisConfig, opts ...Options) *WatcherFactory

func (WatcherFactory) CreateWatcher

func (factory WatcherFactory) CreateWatcher(collection *mongo.Collection, pipeline []bson.M, watcherId string, handler func(*ChangeDoc) error) error
