

Nibbler
Nibbler is a resilient, minimal package that helps you implement micro-batch processing.
What is Micro-batch Processing?
Micro-batch processing is a way of handling data by breaking a big task into smaller pieces and processing them one by one. This approach is useful in real-time or streaming scenarios, where the incoming data is split into "micro-batches" and processed promptly, rather than waiting to collect all of the data at once.
The same concept can also be extended to event processing: instead of a queue subscriber handling each event individually, we collect events into micro-batches and process each batch as a unit.
Processing of a single micro-batch is triggered in one of two ways: by a time ticker or by the batch becoming full. That is, a non-empty batch is processed once duration X has passed, or as soon as the batch reaches its configured size, whichever happens first.
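To make the idea concrete, below is a minimal, self-contained sketch of the dual-trigger pattern in plain Go. It is not nibbler's implementation, just an illustration: items arrive on a channel, and the batch is flushed either when it reaches the configured size or when the ticker fires while the batch is non-empty.

package main

import (
	"fmt"
	"time"
)

// microBatch drains items from `in` and flushes a batch either when it holds
// `size` items or when the ticker fires with a non-empty batch.
func microBatch(in <-chan int, size int, tick time.Duration, process func([]int)) {
	batch := make([]int, 0, size)
	ticker := time.NewTicker(tick)
	defer ticker.Stop()

	flush := func() {
		if len(batch) == 0 {
			return
		}
		process(batch)
		batch = make([]int, 0, size)
	}

	for {
		select {
		case item, ok := <-in:
			if !ok {
				flush() // input closed: flush whatever is left and stop
				return
			}
			batch = append(batch, item)
			if len(batch) == size {
				flush() // trigger: batch is full
			}
		case <-ticker.C:
			flush() // trigger: ticker fired with a non-empty batch
		}
	}
}

func main() {
	in := make(chan int)
	done := make(chan struct{})
	go func() {
		microBatch(in, 5, 100*time.Millisecond, func(b []int) {
			fmt.Println("processed batch:", b)
		})
		close(done)
	}()

	for i := 0; i < 12; i++ {
		in <- i
	}
	close(in)
	<-done
}

Nibbler packages this pattern behind a small configuration, described in the next section, so you only provide the batch size, the ticker duration, and the processor function.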
Config
type BatchProcessor[T any] func(ctx context.Context, trigger Trigger, batch []T) error

type Config[T any] struct {
	// ProcessingTimeout is the context timeout for processing a single batch
	ProcessingTimeout time.Duration
	// TickerDuration is the ticker duration, after which a non-empty batch is processed even
	// if it has not reached Size
	TickerDuration time.Duration
	// Size is the micro-batch size
	Size uint
	// Processor is the function which processes a single batch
	Processor BatchProcessor[T]
	// ResumeAfterErr, if true, will continue listening and keep processing if the processor
	// returns an error or panics. In both cases, ProcessorErr is executed
	ResumeAfterErr bool
	// ProcessorErr is executed if the processor returns an error or panics
	ProcessorErr func(failedBatch []T, err error)
}
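The fields ResumeAfterErr and ProcessorErr control what happens when a batch fails. The snippet below is a sketch based only on the fields documented above: the processor always fails, ResumeAfterErr keeps nibbler listening, and ProcessorErr receives the failed batch along with the error (the same applies if the processor panics).

package main

import (
	"context"
	"errors"
	"log"
	"time"

	"github.com/naughtygopher/nibbler"
)

func main() {
	nib, err := nibbler.Start(&nibbler.Config[string]{
		Size:              25,
		TickerDuration:    time.Second,
		ProcessingTimeout: 5 * time.Second,
		Processor: func(ctx context.Context, trig nibbler.Trigger, batch []string) error {
			// returning an error (or panicking) hands the failed batch to ProcessorErr
			return errors.New("boom")
		},
		// keep listening and processing even if a batch fails
		ResumeAfterErr: true,
		ProcessorErr: func(failedBatch []string, err error) {
			log.Printf("batch of %d items failed: %v", len(failedBatch), err)
		},
	})
	if err != nil {
		panic(err)
	}

	receiver := nib.Receiver()
	for i := 0; i < 60; i++ {
		receiver <- "event"
	}
	// give the ticker a chance to flush the final partial batch
	time.Sleep(2 * time.Second)
}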
How to use nibbler?
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/naughtygopher/nibbler"
)

type db struct {
	data         sync.Map
	totalBalance int
}

func (d *db) BulkAddAccountsAndBalance(pp []AccStatement) error {
	// assume we are doing a bulk insert/update into the database instead of inserting one by one.
	// Bulk operations reduce the number of I/O round trips between the application and the
	// database, which makes them faster in most cases.
	for _, p := range pp {
		d.data.Store(p.AccountID, p.Balance)
		d.totalBalance += p.Balance
	}
	return nil
}

type Bank struct {
	db *db
}

func (bnk *Bank) ProcessAccountsBatch(
	ctx context.Context,
	trigger nibbler.Trigger,
	batch []AccStatement,
) error {
	err := bnk.db.BulkAddAccountsAndBalance(batch)
	if err != nil {
		return err
	}
	return nil
}

func (bnk *Bank) TotalBalance() int {
	return bnk.db.totalBalance
}

func (bnk *Bank) TotalAccounts() int {
	counter := 0
	bnk.db.data.Range(func(key, value any) bool {
		counter++
		return true
	})
	return counter
}

type AccStatement struct {
	AccountID string
	Balance   int
}

func main() {
	bnk := Bank{
		db: &db{
			data: sync.Map{},
		},
	}

	nib, err := nibbler.Start(&nibbler.Config[AccStatement]{
		Size:           10,
		TickerDuration: time.Second,
		Processor:      bnk.ProcessAccountsBatch,
	})
	if err != nil {
		panic(err)
	}

	// Receiver returns the channel into which individual events are pushed.
	receiver := nib.Receiver()
	for i := range 100 {
		accID := fmt.Sprintf("account_id_%d", i)
		receiver <- AccStatement{
			AccountID: accID,
			Balance:   50000 / (i + 1),
		}
	}

	// wait for the remaining batches to be processed. Ideally this wouldn't be required, as the
	// application would not exit and would instead keep listening to the events stream.
	time.Sleep(time.Second)

	fmt.Printf(
		"Number of accounts %d, total balance: %d\n",
		bnk.TotalAccounts(),
		bnk.TotalBalance(),
	)
}
You can find all usage details in the tests.
The gopher
The gopher used here was created using Gopherize.me. Nibbler is out there eating your events/streams
one bite at a time.