buffer

package
v0.0.0-...-a76d271
Warning

This package is not in the latest version of its module.

Published: Apr 30, 2026 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type FlushFunc

type FlushFunc func(ctx context.Context, table string, records []arrow.RecordBatch, totalRows int64) (parquetBytes int64, err error)

FlushFunc is called when a buffer needs flushing. It receives the table name, the accumulated records, and their total row count. It returns the number of Parquet bytes written (used for bytes-per-row calibration) or an error.

type FlushOp

type FlushOp func(records []arrow.RecordBatch, rows int64) (parquetBytes int64, err error)

FlushOp is the caller-supplied function invoked by FlushVia with the records to be flushed. It returns the actual Parquet bytes written (used for bytes-per-row calibration) or an error.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager implements a size+time hybrid buffer manager with per-table buffers.

func NewManager

func NewManager(opts ManagerOptions, flushFn FlushFunc, logger *zap.Logger) (*Manager, error)

NewManager creates a buffer manager. Returns an error if the options are invalid (e.g. disk storage selected without a Path).

func (*Manager) Add

func (m *Manager) Add(ctx context.Context, table string, rec arrow.RecordBatch) error

Add adds a record to the named table's buffer. If the post-Add size would exceed MaxSizeBytes, a synchronous flush is triggered first; if that flush fails, the record is rejected (not buffered) and the error is returned so OTel can retry the whole batch without duplicating already-buffered data.

Errors from the underlying store (e.g. disk write failures for disk-backed buffers) are also propagated.

func (*Manager) Start

func (m *Manager) Start() error

Start registers telemetry callbacks and begins the background time-based flush goroutine. Start is idempotent: subsequent calls return the first call's error (or nil) without re-registering callbacks or spawning another goroutine.

func (*Manager) Stop

func (m *Manager) Stop(ctx context.Context) error

Stop cancels the background flush goroutine, then drains all buffers. Stop is idempotent: subsequent calls return the first call's error without re-cancelling or re-draining. Telemetry callbacks remain registered until the TelemetryBuilder is shut down by its owner (typically the exporter).

type ManagerOptions

type ManagerOptions struct {
	// MaxSizeBytes is the hard cap on a single buffer's estimated size. Add
	// triggers a synchronous flush before crossing this threshold; if the
	// flush fails the Add is rejected so the buffer cannot exceed the cap.
	// Zero disables the cap (buffer grows freely; only the time-triggered
	// flush bounds it).
	MaxSizeBytes int64

	// FlushInterval is the period between time-triggered flushes.
	FlushInterval time.Duration

	// Storage selects the per-table store backend.
	Storage StorageOptions

	// Allocator is the Arrow memory allocator used by disk-backed stores
	// when reading records back from IPC streams. Optional; defaults to
	// memory.DefaultAllocator.
	Allocator memory.Allocator

	// Telemetry is the optional generated telemetry builder used to emit
	// component metrics. When nil, the Manager runs without metrics.
	Telemetry *metadata.TelemetryBuilder
}

ManagerOptions configures a Manager.

type Metrics

type Metrics struct {
	// Rows is the current total row count (active + drained-but-not-committed).
	Rows int64
	// PendingFiles, PendingBytes, OldestPendingAgeSeconds are populated only
	// for disk-backed buffers; in-memory buffers always report zeros.
	PendingFiles            int
	PendingBytes            int64
	OldestPendingAgeSeconds float64
}

Metrics is a snapshot of per-buffer telemetry counters.

type SignalBuffer

type SignalBuffer struct {
	// contains filtered or unexported fields
}

SignalBuffer accumulates Arrow records for a single table and tracks an estimated byte size derived from store row count × calibrated bytes-per-row. Storage is delegated to a recordStore (memStore by default; diskStore for persistent buffers).

Concurrency:

  • mu guards bytesPerRow and per-op store calls (Append, IsEmpty, Rows).
  • flushMu serialises drain/commit cycles. FlushVia holds flushMu for the entire drain → op → commit sequence, so at most one drain is in flight. Add can proceed concurrently with the op (it acquires only mu, not flushMu).

func NewSignalBuffer

func NewSignalBuffer(table string) *SignalBuffer

NewSignalBuffer creates a buffer for the given table name with an in-memory store.

func (*SignalBuffer) Add

func (b *SignalBuffer) Add(rec arrow.RecordBatch) error

Add appends a record to the buffer. The record is retained by the store (in-memory) or serialised to disk (disk-backed) before the call returns.

func (*SignalBuffer) EstimatedSize

func (b *SignalBuffer) EstimatedSize() int64

EstimatedSize returns the estimated total buffer size in bytes (active + drained-but-not-committed records). It is computed as the store's row count times the calibrated bytes-per-row, so records recovered from disk are included in the estimate.
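As a worked example of the arithmetic, the sketch below multiplies a row count by a bytes-per-row figure and recalibrates from a flush result. The estimator type, the initial guess of 256 bytes per row, and the choice to replace the figure outright (rather than smooth it) are all assumptions made for illustration.

```go
package main

import "fmt"

// estimator is a hypothetical holder for the calibrated bytes-per-row.
type estimator struct {
	bytesPerRow float64
}

// calibrate replaces bytesPerRow with the ratio observed in a successful
// flush. Replacing rather than averaging is an assumption of this sketch.
func (e *estimator) calibrate(parquetBytes, rows int64) {
	if rows > 0 && parquetBytes > 0 {
		e.bytesPerRow = float64(parquetBytes) / float64(rows)
	}
}

// estimatedSize is rows times bytes-per-row.
func (e *estimator) estimatedSize(rows int64) int64 {
	return int64(float64(rows) * e.bytesPerRow)
}

func main() {
	e := &estimator{bytesPerRow: 256}  // hypothetical initial guess
	fmt.Println(e.estimatedSize(1000)) // 256000

	e.calibrate(48000, 1000)           // a flush wrote 48000 bytes for 1000 rows
	fmt.Println(e.estimatedSize(1000)) // 48000
}
```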

func (*SignalBuffer) FlushVia

func (b *SignalBuffer) FlushVia(op FlushOp) (rows int64, err error)

FlushVia drains the buffer and runs op against the drained records. On op success the records are committed (discarded from the store, refs released) and the bytes-per-row calibration is updated. On op failure or drain failure, records remain drainable for retry. Concurrent FlushVia calls on the same buffer serialise via flushMu.

Returns the row count that was passed to op (zero for an empty drain) and any drain or op error. The row count lets callers skip telemetry emission when there was nothing to do.

FlushVia is panic-safe: if op panics, drained records are still released before the panic propagates.
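Panic safety of this kind is usually achieved by releasing in a deferred call, which runs whether op returns or panics. The sketch below shows only that mechanism; the batch type and flushVia function are hypothetical, and the real method also handles drain, commit, and calibration.

```go
package main

import "fmt"

// batch is a hypothetical stand-in for a retained Arrow record batch.
type batch struct{ released bool }

func (b *batch) Release() { b.released = true }

// flushVia releases the drained references in a deferred call, so they are
// freed before a panic from op continues up the stack.
func flushVia(drained []*batch, op func([]*batch) error) error {
	defer func() {
		for _, b := range drained {
			b.Release()
		}
	}()
	return op(drained)
}

func main() {
	drained := []*batch{{}, {}}
	func() {
		// Recover here only so the example can print the outcome.
		defer func() { fmt.Println("recovered:", recover()) }()
		_ = flushVia(drained, func([]*batch) error { panic("op exploded") })
	}()
	fmt.Println(drained[0].released, drained[1].released) // true true
}
```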

func (*SignalBuffer) IsEmpty

func (b *SignalBuffer) IsEmpty() bool

IsEmpty returns true if the buffer holds no records.

func (*SignalBuffer) Metrics

func (b *SignalBuffer) Metrics() Metrics

Metrics returns a snapshot of the buffer's telemetry counters. Safe to call concurrently with Add/FlushVia.

func (*SignalBuffer) Rows

func (b *SignalBuffer) Rows() int64

Rows returns the total number of buffered rows.

func (*SignalBuffer) Table

func (b *SignalBuffer) Table() string

Table returns the table name for this buffer.

type StorageOptions

type StorageOptions struct {
	// Type is the backend: StorageMemory (default) or StorageDisk.
	Type StorageType

	// Path is the root directory for disk-backed buffers. Each table gets a
	// subdirectory underneath. Required when Type is StorageDisk.
	Path string
}

StorageOptions configures the per-table store implementation.

type StorageType

type StorageType string

StorageType selects the backing store for per-table buffers.

const (
	StorageMemory StorageType = "memory"
	StorageDisk   StorageType = "disk"
)
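NewManager is documented to reject disk storage without a Path. A check along those lines might look like the sketch below, which redeclares the types locally so it compiles on its own. The validate method is hypothetical, and treating an empty Type as memory is an assumption based on StorageMemory being described as the default.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented types, so the sketch is self-contained.
type StorageType string

const (
	StorageMemory StorageType = "memory"
	StorageDisk   StorageType = "disk"
)

type StorageOptions struct {
	Type StorageType
	Path string
}

// validate is a hypothetical check; the package's real validation may differ.
func (o StorageOptions) validate() error {
	switch o.Type {
	case "", StorageMemory: // assumption: empty means the default, memory
		return nil
	case StorageDisk:
		if o.Path == "" {
			return errors.New("storage: path is required when type is \"disk\"")
		}
		return nil
	default:
		return fmt.Errorf("storage: unknown type %q", o.Type)
	}
}

func main() {
	fmt.Println(StorageOptions{}.validate())
	fmt.Println(StorageOptions{Type: StorageDisk}.validate())
	fmt.Println(StorageOptions{Type: StorageDisk, Path: "/var/lib/buffer"}.validate())
}
```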

type StoreMetrics

type StoreMetrics struct {
	// PendingFiles is the number of disk-backed pending files awaiting commit.
	PendingFiles int
	// PendingBytes is the total disk bytes across pending files.
	PendingBytes int64
	// OldestPendingAgeSeconds is the age of the oldest pending file. Zero when
	// no pending files exist or for in-memory stores.
	OldestPendingAgeSeconds float64
}

StoreMetrics captures counters that the buffer exposes for telemetry. Only the disk-backed store reports non-zero values; in-memory stores have no concept of pending files.
