Documentation ¶
Overview ¶
Package rebouncer is a generic library that takes a noisy source of events and produces a calmer, fitter, and happier source.
It has a well-defined set of lifecycle events that the user hooks into to get desired functionality.
The canonical example is a file-watcher that discards events involving temp files that IDEs might create. A file-watcher will also typically want a "rebounce" feature that prevents premature firing. Rebouncer provides a generic framework that can solve these problems by allowing the user to inject three types of user-defined functions: Ingester, Reducer, and Quantizer.
Components ¶
These architectural components are involved in making Rebouncer work:
- The NiceEvent is the atomic unit. It is a user-defined type. It is whatever you need it to be for your use case.
- The Ingester produces events. When its work is done, Rebouncer enters the Draining lifecycle state.
- The Reducer runs each time the Ingester pushes an event to the Queue. It operates on every record in the Queue and may rewrite the Queue as a whole.
- The Queue is a memory-safe slice of events waiting to be flushed to the consumer.
- The Quantizer runs at intervals of its choosing, deciding whether or not to flush to the consumer. The Quantizer and the Reducer take turns locking the Queue, which keeps access to it safe.
These mechanical components exist to enable the above:
- An incomingEvents channel of type Event.
- A lifeCycle channel to keep track of lifecycle state.
- A mutex lock to enable memory-safe operations against the Queue.
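As a rough illustration of how a mutex can let the Reducer and the Quantizer take turns on the Queue, here is a minimal sketch. The `safeQueue` type and its `pushAndReduce` and `flushIf` methods are hypothetical names invented for this example; they are not part of Rebouncer's API, and the library's actual locking strategy may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// safeQueue is a hypothetical stand-in for Rebouncer's internal Queue.
type safeQueue[NICE any] struct {
	mu    sync.Mutex
	items []NICE
}

// pushAndReduce appends one event, then lets reduce rewrite the whole
// queue. Both steps happen while the lock is held.
func (q *safeQueue[NICE]) pushAndReduce(ev NICE, reduce func([]NICE) []NICE) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = reduce(append(q.items, ev))
}

// flushIf shows the queue to quantize while holding the lock. If quantize
// returns true, the queue is emptied and its former contents are returned.
func (q *safeQueue[NICE]) flushIf(quantize func([]NICE) bool) []NICE {
	q.mu.Lock()
	defer q.mu.Unlock()
	if !quantize(q.items) {
		return nil
	}
	flushed := q.items
	q.items = nil
	return flushed
}

func main() {
	q := &safeQueue[int]{}

	// A toy reducer (assumption): keep only even numbers.
	dropOdd := func(in []int) []int {
		out := make([]int, 0, len(in))
		for _, n := range in {
			if n%2 == 0 {
				out = append(out, n)
			}
		}
		return out
	}

	// Push concurrently; the mutex serialises access to the queue.
	var wg sync.WaitGroup
	for n := 1; n <= 6; n++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			q.pushAndReduce(n, dropOdd)
		}(n)
	}
	wg.Wait()

	flushed := q.flushIf(func(items []int) bool { return len(items) > 0 })
	fmt.Println(len(flushed)) // prints 3 (the values 2, 4 and 6, in some order)
}
```

Because every read and write of `items` happens under the same lock, a reducer pass and a flush can never interleave, whichever goroutine gets there first.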
Behaviour ¶
When Ingester completes, Rebouncer enters the Draining state.
You can receive events with rebouncer.Subscribe, which returns a channel.
You can trigger the Draining state with rebouncer.Interrupt.
Constants ¶
const (
	StartingUp lifeCycleState = iota
	Running
	Ingesting
	Reducing
	Quantizing
	Emiting
	Draining
	Drained
	ShuttingDown
)
const DefaultBufferSize = 1024
The incomingEvents channel will have this capacity.
Types ¶
type Ingester ¶ added in v1.0.0
type Ingester[NICE any] func(chan<- NICE)
Ingester eats up dirty events and produces NiceEvents. It can decide to simply convert all dirty events to their clean equivalents, or drop some on the floor.
An Ingester can only push new NiceEvents to the queue. It doesn't know what's already there. Ingest is the first lifecycle event. It is followed by Reducer. When Ingester finishes its work, Rebouncer transitions to the Draining state.
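To make the Ingester contract concrete, here is a minimal sketch of one that drops temp-file events, in the spirit of the file-watcher example from the overview. The `Ingester` type is copied from the signature above; `fsEvent`, `isTempFile`, `newIngester`, and `collect` are hypothetical names introduced only for this illustration, and the suffixes treated as temp files are an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// Ingester mirrors the signature documented above.
type Ingester[NICE any] func(chan<- NICE)

// fsEvent is a hypothetical NiceEvent type for a file-watcher.
type fsEvent struct {
	File      string
	Operation string
}

// isTempFile is a hypothetical filter for editor backup and swap files.
func isTempFile(name string) bool {
	return strings.HasSuffix(name, "~") || strings.HasSuffix(name, ".swp")
}

// newIngester returns an Ingester that forwards raw events and drops
// temp files. It returns once raw is exhausted.
func newIngester(raw []fsEvent) Ingester[fsEvent] {
	return func(out chan<- fsEvent) {
		for _, ev := range raw {
			if isTempFile(ev.File) {
				continue // dropped on the floor
			}
			out <- ev
		}
	}
}

// collect runs an Ingester to completion and gathers what it pushed.
// This test harness, not the Ingester, closes the channel.
func collect(ingest Ingester[fsEvent], capacity int) []fsEvent {
	ch := make(chan fsEvent, capacity)
	go func() {
		ingest(ch)
		close(ch)
	}()
	var got []fsEvent
	for ev := range ch {
		got = append(got, ev)
	}
	return got
}

func main() {
	raw := []fsEvent{
		{"main.go", "WRITE"},
		{"main.go~", "CREATE"},
		{".main.go.swp", "WRITE"},
		{"go.mod", "WRITE"},
	}
	for _, ev := range collect(newIngester(raw), 8) {
		fmt.Println(ev.File, ev.Operation)
	}
	// Prints:
	// main.go WRITE
	// go.mod WRITE
}
```

The Ingester only ever writes to the channel it is given, which matches the description above: it cannot see what is already in the queue.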
type Quantizer ¶
type Quantizer[NICE any] func([]NICE) bool
Quantizer reads from the Queue, deciding when to emit() and when to call itself again. It runs whenever `false` is written to readyChannel, and it achieves periodicity by writing to readyChannel itself.
A value of `true` sent to readyChannel triggers emit().
Example ¶
package main

import (
	"fmt"
	"time"
)

type fsEvent struct {
	File          string
	Operation     string
	TransactionId uint64
}

func main() {

	// type Quantizer[NICE any] func([]NICE) bool
	quantFunc := func(queue []fsEvent) bool {
		// one second between runs
		time.Sleep(time.Second)
		// return true if there is anything at all in the queue
		ok2flush := (len(queue) > 0)
		return ok2flush
	}

	fmt.Println(quantFunc([]fsEvent{}))
}
Output: false
type Rebouncer ¶ added in v1.0.0
type Rebouncer[NICE any] interface {
	Subscribe() <-chan NICE // the channel a consumer can subscribe to
	Interrupt()             // call this to initiate the "Draining" state
	// contains filtered or unexported methods
}
Example ¶
// This example ingests a source of randomly shuffled cards,
// excludes the jokers,
// batches them into hands of 5,
// and emits those hands which beat 🂷🃇🂧🃞🂣 (three sevens).

// Consume a stream of cards. Reject jokers. Make piles of 5. Send them to incomingEvents
ingestFunc := func(incoming chan<- PokerInfo) {
	//done := make(chan bool)
	randy := rand.NewSource(time.Now().UnixNano())
	cardsChan, done := frenchDeck.StreamCards(randy)
	piles := make(chan easypoker.Card, 5)
	i := 0
	for card := range cardsChan {
		i++
		goodCard, err := easypoker.CardFromFrench(card)
		if err == nil {
			piles <- goodCard
			if len(piles) == 5 {
				fiveCards := []easypoker.Card{
					<-piles, <-piles, <-piles, <-piles, <-piles,
				}
				pi := PokerInfo{
					RowId: i,
					Cards: fiveCards,
				}
				incoming <- pi
				if i > 10_000 {
					done <- true // signal to StreamCards
				}
			}
		}
	}
}

// reducer. Omit any hand that doesn't beat 3 sevens
reduceFunc := func(oldcards []PokerInfo) []PokerInfo {
	goodHands := make([]PokerInfo, 0, len(oldcards))
	lowHand, _ := easypoker.HandFromString("🂷🃇🂧🃞🂣")
	for _, thisHand := range oldcards {
		if thisHand.Cards.Beats(lowHand) {
			thisHand.Hand = easypoker.HighestPokerHand(thisHand.Cards)
			goodHands = append(goodHands, thisHand)
		}
	}
	return goodHands
}

// quantize. Wait before flushing
quantizeFunc := func(stuff []PokerInfo) bool {
	time.Sleep(time.Millisecond * 100)
	return (len(stuff) > 0)
}

// invoke rebouncer
streamOfPokerHands := NewRebouncer[PokerInfo](
	ingestFunc,
	reduceFunc,
	quantizeFunc,
	1024,
)

// subscribe to rebouncer's OutgoingEvents channel
for pokerHand := range streamOfPokerHands.Subscribe() {
	fmt.Println(pokerHand)
	fmt.Println("------------------")
}
Output:
type Reducer ¶
type Reducer[NICE any] func([]NICE) []NICE
Reducer modifies the Queue. It takes a slice of events, cleans them, and returns a new slice. Reducing is the second lifecycle event, after Ingesting and "before" Quantizing.
Quantizer actually runs in its own loop, separate from ingest=>reduce, but it's helpful to think of it as coming after Reduce.
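As one possible Reducer for the file-watcher use case, the sketch below collapses the queue to a single event per file, keeping the most recent one. The `Reducer` type is copied from the signature above; `fsEvent` and `keepLatestPerFile` are hypothetical names for this illustration, and "latest event wins" is only one of many policies a Reducer could implement.

```go
package main

import "fmt"

// Reducer mirrors the signature documented above.
type Reducer[NICE any] func([]NICE) []NICE

// fsEvent is a hypothetical NiceEvent type for a file-watcher.
type fsEvent struct {
	File      string
	Operation string
}

// keepLatestPerFile returns a new slice with one event per file.
// A later event for the same file overwrites the earlier one, and
// files keep the order in which they first appeared.
func keepLatestPerFile(queue []fsEvent) []fsEvent {
	index := make(map[string]int, len(queue)) // file name -> position in out
	out := make([]fsEvent, 0, len(queue))
	for _, ev := range queue {
		if i, seen := index[ev.File]; seen {
			out[i] = ev
			continue
		}
		index[ev.File] = len(out)
		out = append(out, ev)
	}
	return out
}

// Compile-time check that the function satisfies the Reducer type.
var _ Reducer[fsEvent] = keepLatestPerFile

func main() {
	queue := []fsEvent{
		{"a.go", "CREATE"},
		{"b.go", "WRITE"},
		{"a.go", "WRITE"},
	}
	fmt.Println(keepLatestPerFile(queue))
	// Prints: [{a.go WRITE} {b.go WRITE}]
}
```

Since the Reducer sees the whole queue on every run, a policy like this stays correct no matter how many events arrived since the last flush.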