nibbler

package module
v0.2.0
Published: Nov 5, 2024 License: MIT Imports: 4 Imported by: 0

README


Nibbler

Nibbler is a resilient, minimal package that helps you implement micro-batch processing.

What is Micro-batch Processing?

Micro-batch processing is a way to handle data by breaking a big task into smaller pieces and processing them one by one. This method is useful in real-time or streaming situations, where the incoming data is split into "micro-batches" and processed promptly, rather than waiting to collect all the data at once.

The same concept can be extended to event processing: instead of a queue subscriber processing events individually, we group them into micro-batches and process each batch as a whole.

The processing of a single micro batch can be triggered in two ways: by a time ticker, or when the micro batch is full. That is, a non-empty batch is processed either after a configured duration has passed, or as soon as the batch reaches its configured size.

Config
type BatchProcessor[T any] func(ctx context.Context, trigger Trigger, batch []T) error

type Config[T any] struct {
	// ProcessingTimeout is the context timeout for processing a single batch
	ProcessingTimeout time.Duration
	// TickerDuration is the ticker duration, after which a non-empty batch is processed
	TickerDuration time.Duration
	// Size is the micro-batch size
	Size uint

	// Processor is the function which processes a single batch
	Processor BatchProcessor[T]

	// ResumeAfterErr, if true, will continue listening and keep processing if the processor
	// returns an error or panics. In both cases, ProcessorErr is executed
	ResumeAfterErr bool
	// ProcessorErr is executed if the processor returns an error or panics
	ProcessorErr func(failedBatch []T, err error)
}

How to use nibbler?

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/naughtygopher/nibbler"
)

type db struct {
	data         sync.Map
	totalBalance int
}

func (d *db) BulkAddAccountsAndBalance(pp []AccStatement) error {
	// Assume we are doing a bulk insert/update into the database instead of inserting one by one.
	// Bulk operations reduce the number of I/O round trips between your application and the
	// database, which generally makes them more efficient.
	for _, p := range pp {
		d.data.Store(p.AccountID, p.Balance)
		d.totalBalance += p.Balance
	}
	return nil
}

type Bank struct {
	db *db
}

func (bnk *Bank) ProcessAccountsBatch(
	ctx context.Context,
	trigger nibbler.Trigger,
	batch []AccStatement,
) error {
	return bnk.db.BulkAddAccountsAndBalance(batch)
}

func (bnk *Bank) TotalBalance() int {
	return bnk.db.totalBalance
}

func (bnk *Bank) TotalAccounts() int {
	counter := 0
	bnk.db.data.Range(func(key, value any) bool {
		counter++
		return true
	})
	return counter
}

type AccStatement struct {
	AccountID string
	Balance   int
}

func main() {
	bnk := Bank{
		db: &db{
			data: sync.Map{},
		},
	}

	nib, err := nibbler.Start(&nibbler.Config[AccStatement]{
		Size:           10,
		TickerDuration: time.Second,
		Processor:      bnk.ProcessAccountsBatch,
	})
	if err != nil {
		panic(err)
	}

	receiver := nib.Receiver()
	for i := range 100 {
		accID := fmt.Sprintf("account_id_%d", i)
		receiver <- AccStatement{
			AccountID: accID,
			Balance:   50000 / (i + 1),
		}
	}

	// Wait for batches to be processed. Ideally this wouldn't be required, as a real
	// application would not exit and would instead keep listening to the event stream.
	time.Sleep(time.Second)

	fmt.Printf(
		"Number of accounts %d, total balance: %d\n",
		bnk.TotalAccounts(),
		bnk.TotalBalance(),
	)
}

You can find all usage details in the tests.

The gopher

The gopher used here was created using Gopherize.me. Nibbler is out there eating your events/streams one bite at a time.

Documentation

Overview

Package nibbler provides a simple interface for micro-batch processing

Each micro batch can start processing when either of these conditions is fulfilled:

1. When the ticker ticks
2. When the batch is "full"

Index

Constants

This section is empty.

Variables

var ErrValidation = errors.New("validation failed")

Functions

This section is empty.

Types

type BatchProcessor

type BatchProcessor[T any] func(ctx context.Context, trigger Trigger, batch []T) error

type Config

type Config[T any] struct {
	// ProcessingTimeout is context timeout for processing a single batch
	ProcessingTimeout time.Duration
	TickerDuration    time.Duration
	// Size is the micro batch size
	Size uint

	Processor BatchProcessor[T]
	// ResumeAfterErr if true will continue listening and keep processing if the processor returns
	// an error, or if processor panics. In both cases, ProcessorErr would be executed
	ResumeAfterErr bool
	ProcessorErr   func(failedBatch []T, err error)
}

func (*Config[T]) Sanitize

func (cfg *Config[T]) Sanitize()

func (*Config[T]) SanitizeValidate

func (cfg *Config[T]) SanitizeValidate() error

func (*Config[T]) Validate

func (cfg *Config[T]) Validate() error

type Nibbler

type Nibbler[T any] struct {
	// contains filtered or unexported fields
}

func New

func New[T any](cfg *Config[T]) (*Nibbler[T], error)

func Start

func Start[T any](cfg *Config[T]) (*Nibbler[T], error)

func (*Nibbler[T]) Listen

func (bat *Nibbler[T]) Listen()

Listen listens to the receiver channel for processing the micro batches

func (*Nibbler[T]) Receiver

func (bat *Nibbler[T]) Receiver() chan<- T

Receiver returns a write only channel for pushing items to the batch processor

type Trigger

type Trigger string
const (
	TriggerTicker Trigger = "TICKER"
	TriggerFull   Trigger = "BATCH_FULL"
)
