mongobatch

v0.0.0-...-7bdccb9
Warning

This package is not in the latest version of its module.

Published: Nov 12, 2019 License: MIT Imports: 7 Imported by: 0

README

A reliable batch processor for Mongo

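This library helps process batches of MongoDB documents reliably. It fetches documents that match a query, marks them as "processing", and later marks them as "processed" or reverts them when processing stalls.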

Features

  • Fetch records matching a defined query
  • Update fetched records to the "processing" state
  • Option to update records to "processed" by batch, time interval or rolling
  • Buffered streaming of fetched records
  • Timeout that reverts records stuck in "processing" back to their initial state
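The features above describe a per-record lifecycle. As a minimal sketch, it can be modeled as a small state machine. The "processing" and "processed" names come from DefaultProcessingState and DefaultProcessedState; the "initial" state, the State type and Transition are hypothetical and not part of the package's API.

```go
package main

import "fmt"

// State is a hypothetical stand-in for the value stored in the state field
// (DefaultStateFld = "state").
type State string

const (
	Initial    State = "initial"    // assumption: the state a record has before it is fetched
	Processing State = "processing" // DefaultProcessingState
	Processed  State = "processed"  // DefaultProcessedState
)

// allowed lists the transitions implied by the feature list:
// fetch (initial -> processing), finish (processing -> processed)
// and revert on timeout (processing -> initial).
var allowed = map[State][]State{
	Initial:    {Processing},
	Processing: {Processed, Initial},
}

// Transition returns the new state, or the old state and an error
// if the move is not in the allowed table.
func Transition(from, to State) (State, error) {
	for _, s := range allowed[from] {
		if s == to {
			return to, nil
		}
	}
	return from, fmt.Errorf("invalid transition %q -> %q", from, to)
}
```

In this model "processed" is treated as terminal; the documentation does not say whether processed records can ever be fetched again.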

TODO

  • Option to opt in/out of storing time fields
  • Handle interrupted Mongo connections
  • Distributed locking to support multiple daemons
  • Option to opt in/out ME when fetching batches
  • Error channel

Documentation

Index

Constants

const (
	DefaultStateFld          = "state"
	DefaultProcessingTimeFld = "processing_at"
	DefaultProcessedTimeFld  = "processed_at"
	DefaultRevertedTimeFld   = "reverted_at"
	DefaultFetchOrder        = "updated_at"
	DefaultFetchLimit        = 10
	DefaultProcessingState   = "processing"
	DefaultProcessedState    = "processed"
	DefaultErrorSleep        = 1000 // milliseconds (1000 = 1 second)
	DefaultNoRecordSleep     = 5000 // milliseconds (5000 = 5 seconds)
	// update configs
	DefaultMaxInterval = 10000 // milliseconds (10000 = 10 seconds)
	DefaultMinRecords  = 30
	// reattempt times
	DefaultVisibilityTimeout = 3600 // seconds (3600 = 1 hour)
	DefaultCronInterval      = 600  // seconds (600 = 10 minutes)
)

Variables

This section is empty.

Functions

func BatchTimeout

func BatchTimeout(conf *Configuration) <-chan bool

BatchTimeout returns a close channel that can be used to cancel the process.

func BufferBatch

func BufferBatch(conf *Configuration, result interface{}, bufsize int) chan interface{}

BufferBatch returns a channel with buffer capacity bufsize that streams fetched objects, so at most bufsize objects wait in the buffer at a time. It is a convenience wrapper around FetchBatch.
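Buffered streaming of this kind follows a common Go producer pattern. The sketch below is illustrative only: fetch is a hypothetical stand-in for repeated FetchBatch calls, items are plain ints instead of decoded documents, and the producer stops at the first empty batch rather than sleeping and polling again.

```go
package main

// streamBatches starts a producer goroutine that repeatedly calls fetch and
// pushes each item onto a channel with capacity bufsize, so at most bufsize
// items wait unread. The channel is closed when fetch returns an empty batch.
func streamBatches(fetch func() []int, bufsize int) chan int {
	ch := make(chan int, bufsize)
	go func() {
		defer close(ch)
		for {
			batch := fetch()
			if len(batch) == 0 {
				return
			}
			for _, v := range batch {
				ch <- v // blocks while the buffer is full
			}
		}
	}()
	return ch
}
```

A consumer simply ranges over the returned channel; backpressure comes for free because the producer blocks once the buffer is full.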

func FetchBatch

func FetchBatch(conf *Configuration, results interface{}) error

FetchBatch fetches a batch of records into results. For instance:

    var results []struct{ Value int }
    config := NewConfiguration("localhost", 27017, "salaries", "batch")
    err := FetchBatch(config, &results)
    if err != nil {
        return err
    }

func UpdateBatch

func UpdateBatch(conf *Configuration, result interface{}) (chan interface{}, error)

UpdateBatch returns a channel for incoming objects (MongoDB documents); the state of each object sent on it is updated to processed according to the configured UpdateStrategy.
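The update strategy can be pictured as a batcher that collects incoming objects and flushes them when MinRecords objects have accumulated, when MaxInterval has elapsed, or both, depending on the UseMinRecords and UseTimeInterval flags of UpdateStrategy. This is a hypothetical sketch: strategy is a local copy of UpdateStrategy, flush stands in for the database update, and flushing the remainder when the input channel closes is an assumption.

```go
package main

import "time"

// strategy is a local copy of the fields of UpdateStrategy.
type strategy struct {
	UseTimeInterval bool
	UseMinRecords   bool
	MaxInterval     uint // milliseconds; must be > 0 when UseTimeInterval is set
	MinRecords      uint
}

// runBatcher reads from in until it is closed, calling flush with the
// accumulated items whenever a configured threshold is reached.
// Any remaining items are flushed when in closes.
func runBatcher(in <-chan interface{}, s strategy, flush func([]interface{})) {
	var pending []interface{}
	var tick <-chan time.Time // nil channel: never fires when the time interval is off
	if s.UseTimeInterval {
		t := time.NewTicker(time.Duration(s.MaxInterval) * time.Millisecond)
		defer t.Stop()
		tick = t.C
	}
	emit := func() {
		if len(pending) > 0 {
			flush(pending)
			pending = nil
		}
	}
	for {
		select {
		case v, ok := <-in:
			if !ok {
				emit()
				return
			}
			pending = append(pending, v)
			if s.UseMinRecords && uint(len(pending)) >= s.MinRecords {
				emit()
			}
		case <-tick:
			emit()
		}
	}
}
```

Using a nil ticker channel when the time interval is disabled keeps a single select loop for every combination of flags.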

Types

type Configuration

type Configuration struct {
	Host              string
	Port              uint
	Database          string
	Collection        string
	StateFld          string
	ProcessingTimeFld string
	ProcessedTimeFld  string
	RevertedTimeFld   string
	FetchOrder        string
	FetchLimit        int
	FetchQuery        bson.M
	ProcessingState   string
	ProcessedState    string
	ErrorSleep        uint
	NoRecordSleep     uint
	VisibilityTimeout uint
	CronInterval      uint
	UpdateStrategy    UpdateStrategy
}

func NewConfiguration

func NewConfiguration(host string, port uint, db string, col string) *Configuration

NewConfiguration creates a new Configuration object with default values.
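A constructor that fills in defaults typically copies the Default* constants into the new struct. The sketch below shows that pattern on a trimmed, hypothetical local copy of Configuration; that NewConfiguration assigns exactly these values to exactly these fields is an assumption based on the matching names, not something the documentation states.

```go
package main

// config is a trimmed, hypothetical local copy of Configuration.
type config struct {
	Host, Database, Collection string
	Port                       uint
	StateFld                   string
	FetchOrder                 string
	FetchLimit                 int
	ProcessingState            string
	ProcessedState             string
	VisibilityTimeout          uint // seconds
}

// newConfig sets the connection fields from its arguments and the rest from
// defaults whose values are copied from the constants above.
func newConfig(host string, port uint, db, col string) *config {
	return &config{
		Host:              host,
		Port:              port,
		Database:          db,
		Collection:        col,
		StateFld:          "state",
		FetchOrder:        "updated_at",
		FetchLimit:        10,
		ProcessingState:   "processing",
		ProcessedState:    "processed",
		VisibilityTimeout: 3600,
	}
}
```

Callers can then override individual fields, for example `c.FetchLimit = 50`, before passing the configuration on.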

type UpdateStrategy

type UpdateStrategy struct {
	UseTimeInterval bool
	UseMinRecords   bool
	MaxInterval     uint
	MinRecords      uint
}
