rebouncer

v1.0.2
Published: Sep 10, 2023 License: MIT Imports: 1 Imported by: 1

README

Rebouncer

A Powerful Debouncer for your Conjuring Needs


Rebouncer is a package that takes a noisy source of events and produces a cleaner, fitter, happier (not drinking too much) source. It is useful in scenarios where you want debounce-like functionality, and full control over how events are consumed, filtered, queued and flushed to the consuming process.

Concepts

The NICE Event

NiceEvent is simply the type of event you pass in to Rebouncer. It exists only as a concept so we can have something to refer to:

type Rebouncer[NICE any] interface {
	Subscribe() <-chan NICE     // the channel a consumer can subscribe to
	emit()                      // flushes the Queue
	readQueue() []NICE          // gets the Queue, with safety and locking
	writeQueue([]NICE)          // sets the Queue, handling safety and locking
	ingest(Ingester[NICE])      // runs the Ingester
	quantize(Quantizer[NICE])   // decides whether to flush the Queue
	reduce(Reducer[NICE], NICE) // removes unwanted NiceEvents from the Queue
	Interrupt()                 // call this to initiate the "Draining" state
}

type myType struct {
	...
}

bufferSize := 1024 // how much buffer space do we want for incoming events?

//	myRebouncer is a Rebouncer of type myType
myRebouncer := rebouncer.NewRebouncer[myType](ingest, reduce, quantize, bufferSize)

Rebouncer has two run-loops:

Ingest ☞ Reduce

The Ingester runs in its own loop, pushing events to a channel in Rebouncer. Every time an event is pushed, Reducer runs. Reducer operates on the entire queue of events, filtering out unwanted events or modifying them to taste. Here are the definitions of these functions. NICE is a type parameter; internally, your custom event type is known as a "Nice Event".

type Ingester[NICE any] func(chan<- NICE)
type Reducer[NICE any] func([]NICE) []NICE

Quantize ☞ Emit

Quantizer returns true or false: true when we want to flush the queue to the consumer, and false when we don't. As soon as Quantizer returns, it is run again, so to throttle it, call time.Sleep() inside it.

When the program enters the Draining state, it shuts down after the last emit(). Otherwise it keeps looping.

type Quantizer[NICE any] func([]NICE) bool

Ensure that your Ingester, Reducer, and Quantizer all operate on the same type:

//	Example

type myEvent struct {
	id int
	name string
	timestamp time.Time
}

//	ingest events
ingest := func(incoming chan<- myEvent) {
	for ev := range mySourceOfEvents() {
		incoming <- ev
	}
}

//	we're not interested in any event involving .DS_Store
reduce := func(inEvents []myEvent) []myEvent {
	outEvents := []myEvent{}
	for _, ev := range inEvents {
		if ev.name != ".DS_Store" {
			outEvents = append(outEvents, ev)
		}
	}
	return outEvents
}

//	flush the queue every second, if it holds anything
quantize := func(queue []myEvent) bool {
	time.Sleep(time.Second)
	return len(queue) > 0
}

re := rebouncer.NewRebouncer[myEvent](ingest, reduce, quantize, 1024)

for ev := range re.Subscribe() {
	fmt.Println(ev)
}

Documentation

Overview

Package rebouncer is a generic library that takes a noisy source of events and produces a calmer, fitter, and happier source.

It has a well-defined set of lifecycle events that the user hooks into to get desired functionality.

The canonical example is a file-watcher that discards events involving temp files that IDEs might create. A file-watcher will also typically want a "rebounce" feature that prevents premature firing. Rebouncer provides a generic framework that can solve these problems by allowing the user to inject three types of user-defined functions: Ingester, Reducer, and Quantizer.

Components

These architectural components are involved in making Rebouncer work:

  • The NiceEvent is the atomic unit. It is a user-defined type. It is whatever you need it to be for your use case.
  • The Ingester produces events. When its work is done, Rebouncer enters the Draining lifecycle state.
  • The Reducer is run every time after Ingester pushes an event to the Queue. It operates on all records in the queue and modifies the queue in its totality.
  • The Queue is a memory-safe slice of NiceEvents, waiting to be flushed to the consumer.
  • The Quantizer runs at intervals of its choosing, deciding whether or not to flush to the consumer. It and Reducer take turns locking the Queue, ensuring safety.

These mechanical components exist to enable the above:

  • an incomingEvents channel of type Event
  • a lifeCycle channel to keep track of lifecycle state.
  • a mutex lock to enable memory-safe operations against the Queue.
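
To make the interplay of these components concrete, here is a toy model of the two run-loops. It is a sketch under stated assumptions, not Rebouncer's actual implementation: the function run and all of its internals are hypothetical, closing the incoming channel stands in for the lifeCycle channel, and the Quantizer is handed a snapshot of the queue so that it can sleep without holding the lock.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// run is a hypothetical stand-in for the real constructor. It wires an
// ingest->reduce loop and a quantize->emit loop around a mutex-guarded queue.
func run[NICE any](
	ingest func(chan<- NICE),
	reduce func([]NICE) []NICE,
	quantize func([]NICE) bool,
	bufferSize int,
) <-chan NICE {
	var (
		mu       sync.Mutex
		queue    []NICE
		incoming = make(chan NICE, bufferSize)
		outgoing = make(chan NICE, bufferSize)
		draining = make(chan struct{})
	)

	// The ingester runs on its own; when it returns, no more events arrive.
	go func() {
		ingest(incoming)
		close(incoming)
	}()

	// Loop 1: every ingested event triggers a reduce over the whole queue.
	go func() {
		for ev := range incoming {
			mu.Lock()
			queue = reduce(append(queue, ev))
			mu.Unlock()
		}
		close(draining) // assumption: ingester done means "Draining"
	}()

	// Loop 2: quantize decides when to flush; one last flush while draining.
	go func() {
		defer close(outgoing)
		for {
			last := false
			select {
			case <-draining:
				last = true
			default:
			}
			mu.Lock()
			snapshot := append([]NICE(nil), queue...)
			mu.Unlock()
			if last || quantize(snapshot) {
				mu.Lock()
				batch := queue
				queue = nil
				mu.Unlock()
				for _, ev := range batch {
					outgoing <- ev
				}
			}
			if last {
				return
			}
		}
	}()
	return outgoing
}

func main() {
	ingest := func(in chan<- int) {
		for i := 1; i <= 6; i++ {
			in <- i
		}
	}
	keepEven := func(q []int) []int {
		out := make([]int, 0, len(q))
		for _, n := range q {
			if n%2 == 0 {
				out = append(out, n)
			}
		}
		return out
	}
	flushIfAny := func(q []int) bool {
		time.Sleep(5 * time.Millisecond)
		return len(q) > 0
	}
	var got []int
	for n := range run(ingest, keepEven, flushIfAny, 16) {
		got = append(got, n)
	}
	fmt.Println(got) // [2 4 6]
}
```

The order of events is preserved because a single goroutine appends to the queue and a single goroutine emits from it.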

Behaviour

When Ingester completes, Rebouncer enters the Draining state.

You can receive events with rebouncer.Subscribe, which returns a channel.

You can trigger the Draining state with rebouncer.Interrupt.

Constants

const (
	StartingUp lifeCycleState = iota
	Running
	Ingesting
	Reducing
	Quantizing
	Emiting
	Draining
	Drained
	ShuttingDown
)
const DefaultBufferSize = 1024

incomingEvents will have this capacity

Types

type Ingester added in v1.0.0

type Ingester[NICE any] func(chan<- NICE)

Ingester eats up dirty events and produces NiceEvents. It can decide to simply convert all dirty events to their clean equivalents, or drop some on the floor.

An Ingester can only push new NiceEvents to the queue. It doesn't know what's already there. Ingest is the first lifecycle event. It will be followed by Reducer. When Ingester finishes its work, Rebouncer transitions to the Draining state.
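
A minimal sketch of such an Ingester follows. It assumes a hypothetical source of raw "OPERATION path" lines; the helper newIngester and the trimmed-down fsEvent type are illustrative only, and the Ingester type is redeclared locally so the snippet stands alone. Malformed lines are dropped, and returning from the function is what signals that the work is done.

```go
package main

import (
	"fmt"
	"strings"
)

// Ingester mirrors the signature documented above.
type Ingester[NICE any] func(chan<- NICE)

// fsEvent is a reduced, illustrative event type.
type fsEvent struct {
	File      string
	Operation string
}

// newIngester is a hypothetical helper that adapts a channel of raw
// "OPERATION path" lines into an Ingester of fsEvent.
func newIngester(raw <-chan string) Ingester[fsEvent] {
	return func(incoming chan<- fsEvent) {
		for line := range raw {
			op, file, ok := strings.Cut(line, " ")
			if !ok || file == "" {
				continue // dirty event with no clean equivalent: drop it
			}
			incoming <- fsEvent{File: file, Operation: op}
		}
		// Returning here means the Ingester has finished its work.
	}
}

func main() {
	raw := make(chan string, 3)
	raw <- "WRITE main.go"
	raw <- "garbage"
	raw <- "REMOVE tmp.txt"
	close(raw)

	incoming := make(chan fsEvent, 3)
	newIngester(raw)(incoming)
	close(incoming)

	for ev := range incoming {
		fmt.Printf("%s %s\n", ev.Operation, ev.File)
	}
}
```

Run as written, this prints the two well-formed events and silently skips "garbage".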

type Quantizer

type Quantizer[NICE any] func([]NICE) bool

Quantizer reads from the Queue, deciding when to emit() and when to call itself again. Quantizer is run any time `false` is written to readyChannel. Periodicity is achieved when Quantizer itself writes to readyChannel.

A value of `false` sent to readyChannel triggers another run of Quantizer. A value of `true` triggers emit()

Example
package main

import (
	"fmt"
	"time"
)

type fsEvent struct {
	File          string
	Operation     string
	TransactionId uint64
}

func main() {

	// type Quantizer[NICE any] func([]NICE) bool

	quantFunc := func(queue []fsEvent) bool {

		//	one second between runs
		time.Sleep(time.Second)

		//	return true if there is anything at all in the queue
		ok2flush := (len(queue) > 0)
		return ok2flush

	}

	fmt.Println(quantFunc([]fsEvent{}))
}
Output:

false

type Queue added in v1.0.0

type Queue[NICE any] []NICE

type Rebouncer added in v1.0.0

type Rebouncer[NICE any] interface {
	Subscribe() <-chan NICE // the channel a consumer can subscribe to

	Interrupt() //	call this to initiate the "Draining" state
	// contains filtered or unexported methods
}
Example
//	This example ingests a source of randomly shuffled cards,
//	excludes the jokers,
//	batches them into hands of 5,
//	and emits those hands which beat 🂷🃇🂧🃞🂣 (three sevens).

//	Consume a stream of cards. Reject jokers. Make piles of 5. Send them to incomingEvents
ingestFunc := func(incoming chan<- PokerInfo) {

	//done := make(chan bool)
	randy := rand.NewSource(time.Now().UnixNano())
	cardsChan, done := frenchDeck.StreamCards(randy)

	piles := make(chan easypoker.Card, 5)
	i := 0
	for card := range cardsChan {
		i++
		goodCard, err := easypoker.CardFromFrench(card)
		if err == nil {
			piles <- goodCard
			if len(piles) == 5 {
				fiveCards := []easypoker.Card{
					<-piles, <-piles, <-piles, <-piles, <-piles,
				}
				pi := PokerInfo{
					RowId: i,
					Cards: fiveCards,
				}
				incoming <- pi
				if i > 10_000 {
					done <- true // signal to StreamCards
				}
			}
		}
	}
}

//	reducer. Omit any hand that doesn't beat 3 sevens
reduceFunc := func(oldcards []PokerInfo) []PokerInfo {
	goodHands := make([]PokerInfo, 0, len(oldcards))
	lowHand, _ := easypoker.HandFromString("🂷🃇🂧🃞🂣")
	for _, thisHand := range oldcards {
		if thisHand.Cards.Beats(lowHand) {
			thisHand.Hand = easypoker.HighestPokerHand(thisHand.Cards)
			goodHands = append(goodHands, thisHand)
		}
	}
	return goodHands
}

//	quantize. Wait before flushing
quantizeFunc := func(stuff []PokerInfo) bool {
	time.Sleep(time.Millisecond * 100)
	return (len(stuff) > 0)
}

//	invoke rebouncer
streamOfPokerHands := NewRebouncer[PokerInfo](
	ingestFunc,
	reduceFunc,
	quantizeFunc,
	1024,
)

//	subscribe to rebouncer's OutgoingEvents channel
for pokerHand := range streamOfPokerHands.Subscribe() {
	fmt.Println(pokerHand)
	fmt.Println("------------------")
}
Output:

func NewRebouncer added in v1.0.0

func NewRebouncer[NICE any](
	ingestFunc Ingester[NICE],
	reduceFunc Reducer[NICE],
	quantizeFunc Quantizer[NICE],
	bufferSize int,
) Rebouncer[NICE]

NewRebouncer is the best way to create a new Rebouncer.

type Reducer

type Reducer[NICE any] func([]NICE) []NICE

Reducer modifies the Queue. It takes a slice of events, cleans them, and returns a new slice. Reducing is the 2nd lifecycle event, after Ingesting and "before" Quantizing.

Quantizer actually runs in its own loop, separate from ingest=>reduce, but it's helpful to think of it as coming after Reduce.
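
Because a Reducer sees the whole Queue, it can coalesce events as well as filter them. The sketch below keeps only the most recent event per file and discards editor temp files. The function latestPerFile, the reduced fsEvent type, and the "~" suffix convention are assumptions made for illustration; the Reducer type is redeclared locally so the snippet stands alone.

```go
package main

import (
	"fmt"
	"strings"
)

// Reducer mirrors the signature documented above.
type Reducer[NICE any] func([]NICE) []NICE

// fsEvent is a reduced, illustrative event type.
type fsEvent struct {
	File      string
	Operation string
}

// latestPerFile is a hypothetical Reducer. It keeps the last event seen
// for each file, drops files whose name ends in "~", and returns a new
// slice ordered by the position of each surviving event.
func latestPerFile(queue []fsEvent) []fsEvent {
	lastIndex := make(map[string]int, len(queue))
	for i, ev := range queue {
		lastIndex[ev.File] = i
	}
	out := make([]fsEvent, 0, len(lastIndex))
	for i, ev := range queue {
		if lastIndex[ev.File] != i || strings.HasSuffix(ev.File, "~") {
			continue
		}
		out = append(out, ev)
	}
	return out
}

// Compile-time check that latestPerFile has the Reducer shape.
var _ Reducer[fsEvent] = latestPerFile

func main() {
	queue := []fsEvent{
		{File: "a.go", Operation: "CREATE"},
		{File: "a.go", Operation: "WRITE"},
		{File: "b.go~", Operation: "WRITE"},
		{File: "c.go", Operation: "WRITE"},
		{File: "a.go", Operation: "CHMOD"},
	}
	fmt.Println(latestPerFile(queue)) // [{c.go WRITE} {a.go CHMOD}]
}
```

Since Reducer runs after every ingested event, keeping it to two linear passes over the Queue avoids making ingestion quadratic in practice.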
