Documentation
Index ¶
- type FlushFunc
- type FlushOp
- type Manager
- type ManagerOptions
- type Metrics
- type SignalBuffer
- func NewSignalBuffer(table string) *SignalBuffer
- func (b *SignalBuffer) Add(rec arrow.RecordBatch) error
- func (b *SignalBuffer) EstimatedSize() int64
- func (b *SignalBuffer) FlushVia(op FlushOp) (rows int64, err error)
- func (b *SignalBuffer) IsEmpty() bool
- func (b *SignalBuffer) Metrics() Metrics
- func (b *SignalBuffer) Rows() int64
- func (b *SignalBuffer) Table() string
- type StorageOptions
- type StorageType
- type StoreMetrics
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type FlushFunc ¶
type FlushFunc func(ctx context.Context, table string, records []arrow.RecordBatch, totalRows int64) (parquetBytes int64, err error)
FlushFunc is called when a buffer needs flushing. It receives a context, the table name, the accumulated records, and their total row count. It returns the number of Parquet bytes written (used for bytes-per-row calibration) or an error.
type FlushOp ¶
type FlushOp func(records []arrow.RecordBatch, rows int64) (parquetBytes int64, err error)
FlushOp is the caller-supplied function that FlushVia invokes with the drained records and their total row count. It returns the number of Parquet bytes actually written (used for bytes-per-row calibration) or an error.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
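Manager maintains one buffer per table and flushes each buffer under a hybrid policy: size-triggered, when adding a record would push the buffer past MaxSizeBytes, and time-triggered, every FlushInterval.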
func NewManager ¶
NewManager creates a buffer manager. It returns an error if the options are invalid (for example, disk storage selected without a Path).
func (*Manager) Add ¶
Add appends a record to the named table's buffer. If adding the record would push the buffer's estimated size past MaxSizeBytes, Add first triggers a synchronous flush. If that flush fails, the record is rejected (not buffered) and the error is returned, so that OTel can retry the whole batch without duplicating data that was already buffered.
Errors from the underlying store (e.g. disk write failures for disk-backed buffers) are also propagated.
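The admission rule for Manager.Add can be sketched as follows. This is a single-table reduction under stated assumptions, not the package's implementation: `manager`, `buffer`, `okFlush`, and `failingFlush` are hypothetical, and each record is represented only by its estimated size in bytes. A cap of zero skips the pre-flush entirely, matching the MaxSizeBytes description in ManagerOptions.

```go
package main

import "errors"

// buffer is a hypothetical stand-in for a per-table SignalBuffer that
// tracks only its estimated size in bytes.
type buffer struct {
	size int64
}

// manager is a hypothetical single-table reduction of Manager that keeps
// only the admission rule described for Add.
type manager struct {
	maxSizeBytes int64                 // zero disables the cap
	buf          *buffer
	flush        func(b *buffer) error // synchronous flush of buf
}

// add admits a record whose estimated size is recSize bytes.
func (m *manager) add(recSize int64) error {
	if m.maxSizeBytes > 0 && m.buf.size+recSize > m.maxSizeBytes {
		// Flush first so the existing contents are not pushed past the cap.
		if err := m.flush(m.buf); err != nil {
			return err // rejected: the record is not buffered
		}
	}
	m.buf.size += recSize
	return nil
}

// okFlush and failingFlush are hypothetical flushes covering both branches.
func okFlush(b *buffer) error {
	b.size = 0
	return nil
}

func failingFlush(*buffer) error {
	return errors.New("flush failed")
}
```

Rejecting the record when the pre-flush fails means a retried batch can never find part of itself already buffered.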
func (*Manager) Start ¶
Start registers telemetry callbacks and launches the background goroutine that performs time-based flushes. It is idempotent: subsequent calls return the first call's error (or nil) without registering callbacks or spawning the goroutine again.
func (*Manager) Stop ¶
Stop cancels the background flush goroutine and then drains all buffers. It is idempotent: subsequent calls return the first call's error without cancelling or draining again. Telemetry callbacks remain registered until the TelemetryBuilder is shut down by its owner (typically the exporter).
type ManagerOptions ¶
type ManagerOptions struct {
// MaxSizeBytes is the hard cap on a single buffer's estimated size. Add
// triggers a synchronous flush before crossing this threshold; if the
// flush fails the Add is rejected so the buffer cannot exceed the cap.
// Zero disables the cap (buffer grows freely; only the time-triggered
// flush bounds it).
MaxSizeBytes int64
// FlushInterval is the period between time-triggered flushes.
FlushInterval time.Duration
// Storage selects the per-table store backend.
Storage StorageOptions
// Allocator is the Arrow memory allocator used by disk-backed stores
// when reading records back from IPC streams. Optional; defaults to
// memory.DefaultAllocator.
Allocator memory.Allocator
// Telemetry is the optional generated telemetry builder used to emit
// component metrics. When nil, the Manager runs without metrics.
Telemetry *metadata.TelemetryBuilder
}
ManagerOptions configures a Manager.
type Metrics ¶
type Metrics struct {
// Rows is the current total row count (active + drained-but-not-committed).
Rows int64
// PendingFiles, PendingBytes, OldestPendingAgeSeconds are populated only
// for disk-backed buffers; in-memory buffers always report zeros.
PendingFiles int
PendingBytes int64
OldestPendingAgeSeconds float64
}
Metrics is a snapshot of per-buffer telemetry counters.
type SignalBuffer ¶
type SignalBuffer struct {
// contains filtered or unexported fields
}
SignalBuffer accumulates Arrow records for a single table and tracks an estimated byte size, computed as the store's row count × the calibrated bytes-per-row. Storage is delegated to a recordStore (memStore by default; diskStore for persistent buffers).
Concurrency:
- mu guards bytesPerRow and per-op store calls (Append, IsEmpty, Rows).
- flushMu serialises drain/commit cycles. FlushVia holds flushMu for the entire drain → op → commit sequence, so at most one drain is in flight. Add can proceed concurrently with the op (it acquires only mu, not flushMu).
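The two-lock layout can be illustrated with a reduced buffer in which each record is represented by its row count. This is a sketch under assumptions: `lockedBuffer` is hypothetical, FlushVia's signature is simplified, and a retry after a failed op is assumed to re-send the earlier drained records together with any newly added ones. The property it demonstrates is that Add takes only `mu`, so a slow op never blocks it, and commit discards only the records that op actually received.

```go
package main

import "sync"

// lockedBuffer is a hypothetical, heavily reduced SignalBuffer that keeps
// only the two-lock layout. Records are represented by their row counts.
type lockedBuffer struct {
	mu      sync.Mutex // guards active and drained
	flushMu sync.Mutex // serialises drain -> op -> commit cycles
	active  []int64    // records appended since the last drain
	drained []int64    // records handed to an in-flight (or failed) op
}

func (b *lockedBuffer) Add(rows int64) {
	b.mu.Lock() // never touches flushMu, so it cannot wait on a slow op
	defer b.mu.Unlock()
	b.active = append(b.active, rows)
}

// Rows counts active plus drained-but-not-committed rows.
func (b *lockedBuffer) Rows() int64 {
	b.mu.Lock()
	defer b.mu.Unlock()
	var n int64
	for _, r := range b.active {
		n += r
	}
	for _, r := range b.drained {
		n += r
	}
	return n
}

func (b *lockedBuffer) FlushVia(op func(recs []int64) error) error {
	b.flushMu.Lock()
	defer b.flushMu.Unlock()

	// Drain: move active records behind any left over from a failed op.
	b.mu.Lock()
	b.drained = append(b.drained, b.active...)
	b.active = nil
	recs := append([]int64(nil), b.drained...)
	b.mu.Unlock()

	if err := op(recs); err != nil { // mu is not held while op runs
		return err // drained records stay for the next attempt
	}

	// Commit: discard exactly what op saw; records added meanwhile survive.
	b.mu.Lock()
	b.drained = nil
	b.mu.Unlock()
	return nil
}
```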
func NewSignalBuffer ¶
func NewSignalBuffer(table string) *SignalBuffer
NewSignalBuffer creates a buffer for the given table name with an in-memory store.
func (*SignalBuffer) Add ¶
func (b *SignalBuffer) Add(rec arrow.RecordBatch) error
Add appends a record to the buffer. The record is retained by the store (in-memory) or serialised to disk (disk-backed) before the call returns.
func (*SignalBuffer) EstimatedSize ¶
func (b *SignalBuffer) EstimatedSize() int64
EstimatedSize returns the estimated total buffer size in bytes, covering both active and drained-but-not-committed records. Because it is computed as the store's row count times the calibrated bytes-per-row, records recovered from disk are included in the estimate.
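As a worked example of the estimate: if the last successful flush wrote 25,000 Parquet bytes for 1,000 rows, the calibrated ratio is 25 bytes per row, and 4,000 buffered rows are then estimated at 4,000 × 25 = 100,000 bytes. The sketch below encodes that arithmetic. `sizeEstimator` is hypothetical, and both the starting ratio and the choice to replace (rather than smooth) the ratio on each flush are assumptions.

```go
package main

// sizeEstimator is a hypothetical model of EstimatedSize: the buffered row
// count multiplied by a bytes-per-row ratio calibrated from flush results.
type sizeEstimator struct {
	rows        int64   // active + drained-but-not-committed rows
	bytesPerRow float64 // calibrated ratio; its initial value is an assumption
}

func (e *sizeEstimator) EstimatedSize() int64 {
	return int64(float64(e.rows) * e.bytesPerRow)
}

// calibrate replaces the ratio after a successful flush of flushedRows rows
// that produced parquetBytes bytes. Empty or zero-byte flushes are ignored.
func (e *sizeEstimator) calibrate(parquetBytes, flushedRows int64) {
	if flushedRows > 0 && parquetBytes > 0 {
		e.bytesPerRow = float64(parquetBytes) / float64(flushedRows)
	}
}
```

Because the estimate depends only on the row count, it applies equally to records that were appended in this process and to records recovered from disk.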
func (*SignalBuffer) FlushVia ¶
func (b *SignalBuffer) FlushVia(op FlushOp) (rows int64, err error)
FlushVia drains the buffer and runs op against the drained records. On op success the records are committed (discarded from the store, refs released) and the bytes-per-row calibration is updated. On op failure or drain failure, records remain drainable for retry. Concurrent FlushVia calls on the same buffer serialise via flushMu.
Returns the row count that was passed to op (zero for an empty drain) and any drain or op error. The row count lets callers skip telemetry emission when there was nothing to do.
FlushVia is panic-safe: if op panics, drained records are still released before the panic propagates.
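The drain, op, and commit sequence, including the panic-safety guarantee, can be sketched with a reference-counted stand-in for `arrow.RecordBatch`. The sketch assumes that drain hands op retained copies and that "released" refers to those extra references; `rec` and `memBuffer` are hypothetical, and bytes-per-row calibration is omitted. Because the release loop is deferred, it runs both on normal return and while a panic from op unwinds, and the store is modified only after op succeeds.

```go
package main

import (
	"sync"
	"sync/atomic"
)

// rec is a hypothetical stand-in for a reference-counted arrow.RecordBatch.
type rec struct {
	rows int64
	refs atomic.Int64
}

func (r *rec) Retain()  { r.refs.Add(1) }
func (r *rec) Release() { r.refs.Add(-1) }

// memBuffer is a hypothetical in-memory buffer; each stored record holds
// one reference owned by the buffer.
type memBuffer struct {
	flushMu sync.Mutex
	mu      sync.Mutex
	recs    []*rec
}

func (b *memBuffer) Add(r *rec) {
	b.mu.Lock()
	defer b.mu.Unlock()
	r.Retain()
	b.recs = append(b.recs, r)
}

func (b *memBuffer) FlushVia(op func([]*rec, int64) (int64, error)) (rows int64, err error) {
	b.flushMu.Lock()
	defer b.flushMu.Unlock()

	// Drain: snapshot the records and take an extra reference for op.
	b.mu.Lock()
	drained := append([]*rec(nil), b.recs...)
	for _, r := range drained {
		r.Retain()
		rows += r.rows
	}
	b.mu.Unlock()
	if len(drained) == 0 {
		return 0, nil // empty drain: op is not called
	}

	// Runs on normal return and during a panic, before it propagates.
	defer func() {
		for _, r := range drained {
			r.Release()
		}
	}()

	if _, err = op(drained, rows); err != nil {
		return rows, err // store untouched: records remain drainable
	}

	// Commit: drop the buffer's own references. Add only appends, so the
	// drained records are still the prefix of b.recs.
	b.mu.Lock()
	for _, r := range drained {
		r.Release()
	}
	b.recs = b.recs[len(drained):]
	b.mu.Unlock()
	return rows, nil
}
```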
func (*SignalBuffer) IsEmpty ¶
func (b *SignalBuffer) IsEmpty() bool
IsEmpty returns true if the buffer holds no records.
func (*SignalBuffer) Metrics ¶
func (b *SignalBuffer) Metrics() Metrics
Metrics returns a snapshot of the buffer's telemetry counters. Safe to call concurrently with Add/FlushVia.
func (*SignalBuffer) Rows ¶
func (b *SignalBuffer) Rows() int64
Rows returns the total number of buffered rows.
func (*SignalBuffer) Table ¶
func (b *SignalBuffer) Table() string
Table returns the table name for this buffer.
type StorageOptions ¶
type StorageOptions struct {
// Type is the backend: StorageMemory (default) or StorageDisk.
Type StorageType
// Path is the root directory for disk-backed buffers. Each table gets a
// subdirectory underneath. Required when Type is StorageDisk.
Path string
}
StorageOptions configures the per-table store implementation.
type StorageType ¶
type StorageType string
StorageType selects the backing store for per-table buffers.
const (
	StorageMemory StorageType = "memory"
	StorageDisk   StorageType = "disk"
)
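The validation NewManager performs is documented only for the missing-Path case. The sketch below mirrors StorageOptions and StorageType and applies the documented rules: an empty Type selects StorageMemory, and StorageDisk requires a Path. `validateStorage` is hypothetical, and rejecting unknown Type values is an added assumption.

```go
package main

import (
	"errors"
	"fmt"
)

type StorageType string

const (
	StorageMemory StorageType = "memory"
	StorageDisk   StorageType = "disk"
)

type StorageOptions struct {
	Type StorageType
	Path string
}

// validateStorage (hypothetical) applies defaults and checks the options,
// returning the possibly-defaulted copy.
func validateStorage(o StorageOptions) (StorageOptions, error) {
	switch o.Type {
	case "":
		o.Type = StorageMemory // documented default
	case StorageMemory:
		// Path is ignored for in-memory buffers.
	case StorageDisk:
		if o.Path == "" {
			return o, errors.New("storage: disk storage requires a Path")
		}
	default:
		return o, fmt.Errorf("storage: unknown type %q", o.Type)
	}
	return o, nil
}
```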
type StoreMetrics ¶
type StoreMetrics struct {
// PendingFiles is the number of disk-backed pending files awaiting commit.
PendingFiles int
// PendingBytes is the total disk bytes across pending files.
PendingBytes int64
// OldestPendingAgeSeconds is the age of the oldest pending file. Zero when
// no pending files exist or for in-memory stores.
OldestPendingAgeSeconds float64
}
StoreMetrics captures counters that the buffer exposes for telemetry. Only the disk-backed store reports non-zero values; in-memory stores have no concept of pending files.