tempo

package module
v0.0.0-...-f4d2665
Warning

This package is not in the latest version of its module.

Published: May 4, 2026 License: MIT Imports: 4 Imported by: 2

README

tempo


Tempo is a thin in-process []byte batcher for high-frequency payloads. It collects incoming data and emits it in batches instead of processing each payload one at a time.

Think of Tempo as a waiting room for payloads: instead of sending every event the instant it arrives, it holds bytes until enough are ready or the next batch is scheduled.

This "batch with timeout" approach works best when events arrive quickly and processing each one individually has high fixed overhead, like opening a connection, making a system call, writing to a database, or calling an API.

Tempo is "thin" by design: a single dispatcher loop and Go channels drive the batching flow, with no heavy mutex locking or complex internal state machines. If all you need is local in-process buffering and batching, this avoids pulling in heavier tools like RabbitMQ or Kafka.
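The single-loop design described above can be sketched with only the standard library. This illustrates the general "batch with timeout" pattern and is not Tempo's actual source: the names dispatch and batchSizes are hypothetical, and the sketch flushes only when a ticker fires or the input channel closes.

```go
package main

import (
	"fmt"
	"time"
)

// dispatch is a hypothetical single-goroutine batching loop (not Tempo's code).
// It buffers payloads from in and emits the buffer on out whenever the ticker
// fires. When in is closed it flushes what is left and closes out.
func dispatch(in <-chan []byte, out chan<- [][]byte, interval time.Duration) {
	defer close(out)
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	var pending [][]byte
	flush := func() {
		if len(pending) == 0 {
			return
		}
		out <- pending
		pending = nil
	}

	for {
		select {
		case p, ok := <-in:
			if !ok {
				flush()
				return
			}
			pending = append(pending, p)
		case <-ticker.C:
			flush()
		}
	}
}

// batchSizes feeds payloads through dispatch and reports the size of each
// emitted batch. The one-hour interval keeps the ticker from firing, so the
// only flush is the final one triggered by closing the input channel.
func batchSizes(payloads ...string) []int {
	in := make(chan []byte)
	out := make(chan [][]byte)
	go dispatch(in, out, time.Hour)

	go func() {
		for _, p := range payloads {
			in <- []byte(p)
		}
		close(in)
	}()

	var sizes []int
	for b := range out {
		sizes = append(sizes, len(b))
	}
	return sizes
}

func main() {
	fmt.Println(batchSizes("click", "heartbeat", "error")) // prints [3]
}
```

Because only one goroutine touches the pending slice, the sketch needs no mutex, which is the property the paragraph above attributes to Tempo's design.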

Tempo is byte-oriented:

  • Enqueue accepts []byte
  • batches are emitted as [][]byte
  • MaxBufferedBytes bounds payload bytes owned by Tempo
  • Interval bounds latency for partial batches
  • MaxBatchBytes is an optional shaping lever for work per dispatch

If a single payload is larger than MaxBatchBytes but still fits within MaxBufferedBytes, Tempo accepts it and flushes it as a one-item batch. Only payloads that exceed MaxBufferedBytes are rejected.
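To make the one-item-batch behavior concrete, here is a small sketch of byte-target shaping. It assumes a greedy rule (close the current batch before a payload would push it past the target) that is consistent with the description above but is not taken from Tempo's source; shape and batchLens are hypothetical names.

```go
package main

import "fmt"

// shape is a hypothetical illustration of MaxBatchBytes-style shaping.
// It closes the current batch before a payload would push it past target,
// so a payload larger than target ends up alone in its own batch.
// target <= 0 means no shaping: everything lands in one batch.
func shape(payloads [][]byte, target int64) [][][]byte {
	var batches [][][]byte
	var current [][]byte
	var size int64

	flush := func() {
		if len(current) > 0 {
			batches = append(batches, current)
			current, size = nil, 0
		}
	}

	for _, p := range payloads {
		n := int64(len(p))
		if target > 0 && size+n > target {
			flush()
		}
		current = append(current, p)
		size += n
		if target > 0 && size >= target {
			flush()
		}
	}
	flush() // stands in for the Interval flush of a partial batch
	return batches
}

// batchLens returns the number of payloads in each batch for payloads
// of the given byte sizes.
func batchLens(target int64, sizes ...int) []int {
	payloads := make([][]byte, len(sizes))
	for i, s := range sizes {
		payloads[i] = make([]byte, s)
	}
	var lens []int
	for _, b := range shape(payloads, target) {
		lens = append(lens, len(b))
	}
	return lens
}

func main() {
	// Payloads of 4, 4, 12, and 3 bytes with a 10-byte target.
	fmt.Println(batchLens(10, 4, 4, 12, 3)) // prints [2 1 1]
}
```

The 12-byte payload exceeds the 10-byte target, so it is emitted alone, while the trailing 3-byte payload waits for the final flush.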

Admission is based on both total fit and current free space:

  • if a payload is larger than MaxBufferedBytes, it is rejected with ErrPayloadTooLarge
  • if a payload would fit within MaxBufferedBytes in principle, but there is not enough pending space available right now, it is rejected with ErrQueueFull

Use it for:

  • Analytics and telemetry ingestion - Batch clicks, heartbeats, and errors before inserting them into a database or warehouse.
  • External API batching - Group payloads into fewer requests to reduce overhead and help avoid rate limits.
  • Agent and LLM event collection - Batch streamed thoughts, traces, and tool calls instead of reacting to every intermediate update in real time.


Install

go get github.com/ef2k/tempo

Documentation

pkg.go.dev/github.com/ef2k/tempo

Configuration

d, err := tempo.NewDispatcher(&tempo.Config{
    Interval:         30 * time.Second,
    MaxBatchBytes:    10 * tempo.MiB,
    MaxBufferedBytes: 500 * tempo.MiB,
})
if err != nil {
    log.Fatal(err) // the configuration was rejected
}

This means:

  • flush whatever is buffered every 30 seconds
  • prefer batches up to 10 MiB
  • never let Tempo own more than 500 MiB of payload data

If you do not need batch-size shaping, you can leave MaxBatchBytes unset and let Tempo flush by Interval while MaxBufferedBytes remains the hard safety boundary.

Sample Usage

See examples/ for working code.

Contribute

Improvements, fixes, and feedback are welcome.

Documentation

Index

Constants

const (
	KiB int64 = 1024
	MiB       = 1024 * KiB
	GiB       = 1024 * MiB
)

Variables

var (
	ErrStopped          = errors.New("tempo: dispatcher stopped")
	ErrShuttingDown     = errors.New("tempo: dispatcher shutting down")
	ErrQueueFull        = errors.New("tempo: dispatcher queue full")
	ErrNilConfig        = errors.New("tempo: nil config")
	ErrBadInterval      = errors.New("tempo: interval must be greater than zero")
	ErrBadMaxBatchBytes = errors.New("tempo: max batch bytes must not be negative")
	ErrBadMaxBuffered   = errors.New("tempo: max buffered bytes must be greater than zero")
	ErrPayloadTooLarge  = errors.New("tempo: payload exceeds configured buffered byte limit")
)
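Callers will typically want to react differently to each sentinel error. The sketch below shows one possible policy using errors.Is; the policy itself and the action function are assumptions for illustration, and the error values are local stand-ins so the example runs without importing tempo.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the tempo sentinel errors listed above.
var (
	errStopped         = errors.New("tempo: dispatcher stopped")
	errShuttingDown    = errors.New("tempo: dispatcher shutting down")
	errQueueFull       = errors.New("tempo: dispatcher queue full")
	errPayloadTooLarge = errors.New("tempo: payload exceeds configured buffered byte limit")
)

// action is a hypothetical caller-side policy, not part of Tempo.
func action(err error) string {
	switch {
	case err == nil:
		return "accepted"
	case errors.Is(err, errQueueFull):
		return "retry later" // transient: space may free up after a flush
	case errors.Is(err, errPayloadTooLarge):
		return "drop or split" // permanent for this payload size
	case errors.Is(err, errStopped), errors.Is(err, errShuttingDown):
		return "stop producing"
	default:
		return "unexpected"
	}
}

func main() {
	wrapped := fmt.Errorf("enqueue event 42: %w", errQueueFull)
	fmt.Println(action(wrapped))            // prints retry later
	fmt.Println(action(errPayloadTooLarge)) // prints drop or split
}
```

Using errors.Is rather than == keeps the check working when the error has been wrapped with %w along the way.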

Functions

This section is empty.

Types

type Config

type Config struct {
	Interval         time.Duration
	MaxBatchBytes    int64 // optional
	MaxBufferedBytes int64
}

Config configures interval-based flushing and byte-oriented queue limits.
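The validation rules for Config can be read off the error messages in the Variables section. The sketch below restates them as code; it is inferred from those messages rather than copied from the library, the order of the checks is an assumption, and config and validate are hypothetical local names.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// config mirrors the fields of tempo.Config for illustration only.
type config struct {
	Interval         time.Duration
	MaxBatchBytes    int64 // optional; zero means no batch-size shaping
	MaxBufferedBytes int64
}

// Local stand-ins for the tempo configuration errors.
var (
	errNilConfig        = errors.New("nil config")
	errBadInterval      = errors.New("interval must be greater than zero")
	errBadMaxBatchBytes = errors.New("max batch bytes must not be negative")
	errBadMaxBuffered   = errors.New("max buffered bytes must be greater than zero")
)

// validate applies the constraints implied by the error messages.
func validate(c *config) error {
	switch {
	case c == nil:
		return errNilConfig
	case c.Interval <= 0:
		return errBadInterval
	case c.MaxBatchBytes < 0:
		return errBadMaxBatchBytes
	case c.MaxBufferedBytes <= 0:
		return errBadMaxBuffered
	}
	return nil
}

func main() {
	fmt.Println(validate(nil))                                    // nil config
	fmt.Println(validate(&config{MaxBufferedBytes: 1 << 20}))     // interval must be greater than zero
	fmt.Println(validate(&config{Interval: time.Second}))         // max buffered bytes must be greater than zero
	fmt.Println(validate(&config{time.Second, 0, 500 * 1 << 20})) // <nil>
}
```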

type Dispatcher

type Dispatcher struct {
	Q                chan []byte
	Batch            chan [][]byte
	Interval         time.Duration
	MaxBatchBytes    int64
	MaxBufferedBytes int64
	// contains filtered or unexported fields
}

Dispatcher flushes queued payloads on each time interval or, when MaxBatchBytes is configured, as soon as the preferred batch byte target is met.

func NewDispatcher

func NewDispatcher(c *Config) (*Dispatcher, error)

NewDispatcher returns an initialized instance of Dispatcher.

func (*Dispatcher) Batches

func (d *Dispatcher) Batches() <-chan [][]byte

Batches exposes the batch output stream as a read-only channel.

func (*Dispatcher) Enqueue

func (d *Dispatcher) Enqueue(v []byte) error

Enqueue submits a payload while the dispatcher is running.

func (*Dispatcher) Shutdown

func (d *Dispatcher) Shutdown(ctx context.Context) error

Shutdown gracefully drains buffered items or returns when the context expires.
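The "drain or give up when the context expires" contract can be illustrated with a select over the output channel and ctx.Done(). This is a generic sketch of that pattern under assumed names (drain, tryDrain); how Tempo implements Shutdown internally is not shown in this documentation.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// drain is a hypothetical context-bounded drain loop (not Tempo's code).
// It hands each pending batch to out and stops early with ctx.Err() if the
// context ends while a send is still blocked.
func drain(ctx context.Context, pending [][][]byte, out chan<- [][]byte) error {
	for _, batch := range pending {
		select {
		case out <- batch:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

// tryDrain drains two batches with a 50ms deadline against a consumer that
// either keeps reading or never reads.
func tryDrain(consumerReads bool) error {
	out := make(chan [][]byte)
	defer close(out)
	if consumerReads {
		go func() {
			for range out {
			}
		}()
	}
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	pending := [][][]byte{{[]byte("a")}, {[]byte("b")}}
	return drain(ctx, pending, out)
}

func main() {
	fmt.Println(tryDrain(true))  // prints <nil>
	fmt.Println(tryDrain(false)) // prints context deadline exceeded
}
```

The practical consequence for callers is that something should keep reading batches while a shutdown is in progress, otherwise the drain can only end by timing out.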

func (*Dispatcher) Start

func (d *Dispatcher) Start()

Start begins payload dispatching.

func (*Dispatcher) Stop

func (d *Dispatcher) Stop()

Stop stops the internal dispatch scheduler.

Directories

Path Synopsis
examples
cmd/tempo-tune command
