Documentation ¶
Index ¶
- Constants
- func BatchTimeout(conf *Configuration) <-chan bool
- func BufferBatch(conf *Configuration, result interface{}, bufsize int) chan interface{}
- func FetchBatch(conf *Configuration, results interface{}) error
- func UpdateBatch(conf *Configuration, result interface{}) (chan interface{}, error)
- type Configuration
- type UpdateStrategy
Constants ¶
const (
	DefaultStateFld          = "state"
	DefaultProcessingTimeFld = "processing_at"
	DefaultProcessedTimeFld  = "processed_at"
	DefaultRevertedTimeFld   = "reverted_at"
	DefaultFetchOrder        = "updated_at"
	DefaultFetchLimit        = 10
	DefaultProcessingState   = "processing"
	DefaultProcessedState    = "processed"
	DefaultErrorSleep        = 1000 // milliseconds (1000 = 1 second)
	DefaultNoRecordSleep     = 5000 // milliseconds (5000 = 5 seconds)

	// update configs
	DefaultMaxInterval = 10000 // milliseconds (10000 = 10 seconds)
	DefaultMinRecords  = 30

	// reattempt times
	DefaultVisibilityTimeout = 3600 // seconds (3600 = 1 hour)
	DefaultCronInterval      = 600  // seconds (600 = 10 mins)
)
Variables ¶
This section is empty.
Functions ¶
func BatchTimeout ¶
func BatchTimeout(conf *Configuration) <-chan bool
BatchTimeout returns a receive-only close channel that can be used to cancel the process.
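As a rough illustration of how a close channel of type <-chan bool is typically consumed, the sketch below runs a worker loop that stops as soon as the channel fires. It does not call this package: closeAfter is a hypothetical stand-in for BatchTimeout, and the assumption that the channel delivers a single value after a fixed delay is ours, not something the documentation states.

```go
package main

import (
	"fmt"
	"time"
)

// closeAfter is a hypothetical stand-in for BatchTimeout. It returns a
// receive-only channel that delivers one value after d has elapsed.
// (Assumption: the real function derives its timing from the Configuration.)
func closeAfter(d time.Duration) <-chan bool {
	ch := make(chan bool, 1)
	go func() {
		time.Sleep(d)
		ch <- true
	}()
	return ch
}

// drain counts the items received from items and stops when done fires
// or when items is closed, whichever happens first.
func drain(items <-chan int, done <-chan bool) int {
	n := 0
	for {
		select {
		case <-done:
			return n
		case _, ok := <-items:
			if !ok {
				return n
			}
			n++
		}
	}
}

func main() {
	// A producer that never stops on its own.
	items := make(chan int)
	go func() {
		for i := 0; ; i++ {
			items <- i
			time.Sleep(10 * time.Millisecond)
		}
	}()

	// The worker is cancelled by the close channel after 100ms.
	n := drain(items, closeAfter(100*time.Millisecond))
	fmt.Println("items handled before cancel:", n)
}
```

Selecting on the close channel alongside the work channel lets the loop exit promptly without the producer having to cooperate.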
func BufferBatch ¶
func BufferBatch(conf *Configuration, result interface{}, bufsize int) chan interface{}
BufferBatch returns a buffered channel of capacity bufsize that streams fetched objects, so at most bufsize objects are queued at a time. It is a convenience wrapper around FetchBatch.
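The buffering behaviour described above can be sketched without a database. In the example below, streamBuffered is a hypothetical stand-in for BufferBatch that feeds an in-memory slice through a channel of capacity bufsize. Closing the channel once the slice is exhausted is an assumption of this sketch; the documentation does not say whether BufferBatch ever closes its channel.

```go
package main

import "fmt"

// streamBuffered is a hypothetical stand-in for BufferBatch. It streams docs
// through a buffered channel whose capacity is bufsize.
func streamBuffered(docs []interface{}, bufsize int) chan interface{} {
	out := make(chan interface{}, bufsize)
	go func() {
		// Assumption of this sketch: the channel is closed when the
		// source is exhausted so that consumers can range over it.
		defer close(out)
		for _, d := range docs {
			out <- d // blocks while bufsize items are already queued
		}
	}()
	return out
}

func main() {
	docs := []interface{}{"a", "b", "c", "d", "e"}
	ch := streamBuffered(docs, 2)
	fmt.Println("capacity:", cap(ch))
	for d := range ch {
		fmt.Println("got:", d)
	}
}
```

A small capacity applies back-pressure: the producer cannot run more than bufsize items ahead of the consumer.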
func FetchBatch ¶
func FetchBatch(conf *Configuration, results interface{}) error
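FetchBatch fetches a batch of records into results, which is passed as a pointer to a slice. For instance:
var results []struct{ Value int }
config := NewConfiguration("localhost", 27017, "salaries", "batch")
// Fetch into results; the slice is passed by pointer.
err := FetchBatch(config, &results)
if err != nil {
return err
}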
func UpdateBatch ¶
func UpdateBatch(conf *Configuration, result interface{}) (chan interface{}, error)
UpdateBatch returns a channel of incoming objects (MongoDB documents) whose state will be updated to processed using the configured update strategy.
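The producer side of such a channel might look like the sketch below. It uses an in-memory map instead of MongoDB, and every name in it (store, startUpdater, the done channel) is hypothetical. The real UpdateBatch applies an UpdateStrategy whose behaviour is not documented here, so this only illustrates the general pattern of sending objects into a channel and having a background goroutine mark them as processed.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical in-memory replacement for a MongoDB collection,
// mapping a document ID to its state.
type store struct {
	mu    sync.Mutex
	state map[string]string
}

func (s *store) set(id, st string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.state[id] = st
}

func (s *store) get(id string) string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.state[id]
}

// startUpdater is a hypothetical stand-in for UpdateBatch. It returns the
// channel to send document IDs into, plus a done channel that is closed
// once the input channel has been closed and every ID has been updated.
func startUpdater(s *store, processed string) (chan interface{}, <-chan struct{}) {
	in := make(chan interface{})
	done := make(chan struct{})
	go func() {
		defer close(done)
		for v := range in {
			if id, ok := v.(string); ok {
				s.set(id, processed)
			}
		}
	}()
	return in, done
}

func main() {
	s := &store{state: map[string]string{"doc1": "processing", "doc2": "processing"}}
	in, done := startUpdater(s, "processed")
	in <- "doc1"
	in <- "doc2"
	close(in)
	<-done
	fmt.Println(s.get("doc1"), s.get("doc2"))
}
```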
Types ¶
type Configuration ¶
type Configuration struct {
Host string
Port uint
Database string
Collection string
StateFld string
ProcessingTimeFld string
ProcessedTimeFld string
RevertedTimeFld string
FetchOrder string
FetchLimit int
FetchQuery bson.M
ProcessingState string
ProcessedState string
ErrorSleep uint
NoRecordSleep uint
VisibilityTimeout uint
CronInterval uint
UpdateStrategy UpdateStrategy
}
func NewConfiguration ¶
func NewConfiguration(host string, port uint, db string, col string) *Configuration
NewConfiguration creates a new Configuration object with default values.
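A constructor of this kind usually copies the connection arguments and fills the remaining fields from the package constants. The sketch below shows that pattern on a trimmed-down struct. The type config, the function newConfig, and the pairing of each constant with its field are assumptions based on the matching names, not a copy of the package source.

```go
package main

import "fmt"

// A subset of the documented defaults, repeated here so the sketch is
// self-contained.
const (
	DefaultStateFld        = "state"
	DefaultFetchOrder      = "updated_at"
	DefaultFetchLimit      = 10
	DefaultProcessingState = "processing"
	DefaultProcessedState  = "processed"
)

// config is a hypothetical, trimmed-down version of Configuration.
type config struct {
	Host            string
	Port            uint
	Database        string
	Collection      string
	StateFld        string
	FetchOrder      string
	FetchLimit      int
	ProcessingState string
	ProcessedState  string
}

// newConfig mirrors the shape of NewConfiguration: the caller supplies the
// connection details and everything else starts at its default value.
func newConfig(host string, port uint, db string, col string) *config {
	return &config{
		Host:            host,
		Port:            port,
		Database:        db,
		Collection:      col,
		StateFld:        DefaultStateFld,
		FetchOrder:      DefaultFetchOrder,
		FetchLimit:      DefaultFetchLimit,
		ProcessingState: DefaultProcessingState,
		ProcessedState:  DefaultProcessedState,
	}
}

func main() {
	c := newConfig("localhost", 27017, "salaries", "batch")
	c.FetchLimit = 50 // override a default after construction
	fmt.Printf("%+v\n", *c)
}
```

Because the constructor returns a pointer to a struct with exported fields, any default can be overridden by assigning to the field before the configuration is used.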