xstreamencoding

package module
v0.150.0 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 13, 2026 License: Apache-2.0 Imports: 7 Imported by: 4

README

Stream Helpers

[!NOTE] These helpers are experimental and may change in future releases.

This package provides reusable helpers for stream-based unmarshaling of OpenTelemetry signals. It is designed to support efficient processing of newline-delimited streams with configurable batching and flushing behavior.

Overview

When processing large data streams, it's often necessary to batch records and flush them periodically based on size or count thresholds. This package provides the building blocks to implement such stream processing logic consistently across different data formats.

Components

ScannerHelper

A helper that wraps an io.Reader to scan newline-delimited records. Callers may pass a bufio.Reader with a preconfigured buffer size to optimize stream reading. It tracks batch metrics and signals when to flush, based on thresholds configured through encoding.DecoderOption functional options. It also tracks the current byte offset read from the stream, available via the Offset() method. Use Options() to access the configured decoder options.

Note: Not safe for concurrent use.
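
To make the scanning and offset tracking described above concrete, here is a minimal standard-library sketch. The lineScanner type and the scanAll function are hypothetical stand-ins written for illustration; they are not the package's ScannerHelper and may differ from its actual behavior (for example, in how empty lines are treated).

```go
package main

import (
	"bufio"
	"bytes"
	"errors"
	"fmt"
	"io"
	"strings"
)

// lineScanner is a hypothetical stand-in, not the package's ScannerHelper.
// It reads newline-delimited records and counts the bytes consumed so far.
type lineScanner struct {
	r      *bufio.Reader
	offset int64
}

func newLineScanner(r io.Reader) *lineScanner {
	return &lineScanner{r: bufio.NewReader(r)}
}

// next returns the next record without its trailing newline.
// A final record that lacks a trailing newline is returned together with io.EOF.
func (s *lineScanner) next() ([]byte, error) {
	line, err := s.r.ReadBytes('\n')
	s.offset += int64(len(line)) // the delimiter counts toward the offset
	return bytes.TrimSuffix(line, []byte("\n")), err
}

// scanAll collects every non-empty record and reports the final byte offset.
func scanAll(input string) ([]string, int64, error) {
	s := newLineScanner(strings.NewReader(input))
	var records []string
	for {
		line, err := s.next()
		if len(line) > 0 {
			records = append(records, string(line))
		}
		if errors.Is(err, io.EOF) {
			return records, s.offset, nil
		}
		if err != nil {
			return records, s.offset, err
		}
	}
}

func main() {
	records, offset, err := scanAll("a\nbb\nccc")
	fmt.Println(records, offset, err)
}
```

Handling the line before inspecting the error is the design choice worth noting: it keeps a trailing record that has no final newline from being dropped.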

BatchHelper

A standalone helper for tracking batch metrics (bytes and items) and determining flush conditions. Useful when you need custom scanning logic but still want batch tracking. Use Options() to access the configured decoder options.

Note: Not safe for concurrent use.
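
The batch tracking described above can be pictured with a small standard-library sketch. The batchTracker type below is a hypothetical stand-in, not the package's BatchHelper. Two details are assumptions made for illustration: a threshold of zero disables that condition, and a flush triggers once a count reaches its threshold.

```go
package main

import "fmt"

// batchTracker is a hypothetical stand-in, not the package's BatchHelper.
type batchTracker struct {
	flushItems, flushBytes int64 // assumption: 0 disables the threshold
	items, bytes           int64
}

// add records one item of the given length.
func (b *batchTracker) add(recordLen int) {
	b.items++
	b.bytes += int64(recordLen)
}

// shouldFlush reports whether either enabled threshold has been reached.
// Using >= here is an assumption made for this sketch.
func (b *batchTracker) shouldFlush() bool {
	if b.flushItems > 0 && b.items >= b.flushItems {
		return true
	}
	return b.flushBytes > 0 && b.bytes >= b.flushBytes
}

// reset starts tracking a new batch.
func (b *batchTracker) reset() { b.items, b.bytes = 0, 0 }

// countFlushes feeds records through a tracker and counts the triggered flushes.
func countFlushes(records []string, flushItems, flushBytes int64) int {
	t := &batchTracker{flushItems: flushItems, flushBytes: flushBytes}
	flushes := 0
	for _, rec := range records {
		t.add(len(rec))
		if t.shouldFlush() {
			flushes++
			t.reset()
		}
	}
	return flushes
}

func main() {
	fmt.Println(countFlushes([]string{"a", "b", "c", "d", "e"}, 2, 0))
	fmt.Println(countFlushes([]string{"aaaa", "bb", "c"}, 0, 5))
}
```

Records left over after the last triggered flush are not counted here; a real caller would typically flush that remainder when the stream ends.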

Decoder Adapters
  • LogsDecoderAdapter - A struct that implements the encoding.LogsDecoder interface by wrapping decode and offset functions
  • MetricsDecoderAdapter - A struct that implements the encoding.MetricsDecoder interface by wrapping decode and offset functions

Use NewLogsDecoderAdapter and NewMetricsDecoderAdapter to create instances.

Usage

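Flush batch by Item Count

// Flush after every 100 items
helper, err := xstreamencoding.NewScannerHelper(reader,
    encoding.WithFlushItems(100),
)
if err != nil {
    return err
}

var batch []string
for {
    line, flush, err := helper.ScanString()
    if err != nil && err != io.EOF {
        return err
    }

    // Handle the line before checking for EOF, as the decoder adapter
    // example below does, so a record returned with io.EOF is not dropped
    if line != "" {
        batch = append(batch, line)
    }

    // Send on a flush signal, and send the remainder when the stream ends
    if (flush || err == io.EOF) && len(batch) > 0 {
        sendBatch(batch)
        batch = batch[:0] // Reset batch; sendBatch must not retain the slice
    }

    if err == io.EOF {
        break
    }
}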

Flush by Byte Size

// Flush after accumulating ~1MB of data
helper, err := xstreamencoding.NewScannerHelper(reader,
    encoding.WithFlushBytes(1024 * 1024),
)

Combined Flush Conditions

// Flush after 1000 items OR 1MB, whichever comes first
helper, err := xstreamencoding.NewScannerHelper(reader,
    encoding.WithFlushItems(1000),
    encoding.WithFlushBytes(1024 * 1024),
)

Using BatchHelper Standalone

For custom scanning logic where you need full control over reading records, you can use BatchHelper directly:

batchHelper := xstreamencoding.NewBatchHelper(
    encoding.WithFlushItems(100),
    encoding.WithFlushBytes(1024 * 1024),
)

scanner := bufio.NewScanner(reader)
for scanner.Scan() {
    // record is only valid until the next Scan call; copy it if it must be retained
    record := scanner.Bytes()

    // Track each record
    batchHelper.IncrementItems(1)
    batchHelper.IncrementBytes(int64(len(record)))

    // Check if batch should be flushed
    if batchHelper.ShouldFlush() {
        flushBatch()
        batchHelper.Reset()
    }
}
if err := scanner.Err(); err != nil {
    return err
}

// Flush any records accumulated since the last flush
flushBatch()

Using Decoder Adapters

To implement encoding.LogsDecoder or encoding.MetricsDecoder, you may use the decoder adapter structs:

func (c *myCodec) NewLogsDecoder(reader io.Reader, options ...encoding.DecoderOption) (encoding.LogsDecoder, error) {
    scanner, err := xstreamencoding.NewScannerHelper(reader, options...)
    if err != nil {
        return nil, err
    }
	
    logs := plog.NewLogs()

    decodeFunc := func() (plog.Logs, error) {
        for {
            line, flush, err := scanner.ScanBytes()

            if len(line) > 0 {
                // Parse line and add to logs
                lr := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
                lr.Body().SetStr(string(line))
            }

            if flush || err == io.EOF {
                result := logs
                logs = plog.NewLogs() // reset for next batch
                if err == io.EOF {
                    return result, io.EOF
                }
                return result, nil
            }

            if err != nil {
                return plog.Logs{}, err
            }
        }
    }

    offsetFunc := func() int64 {
        return scanner.Offset()
    }

    return xstreamencoding.NewLogsDecoderAdapter(decodeFunc, offsetFunc), nil
}

Documentation


Constants

This section is empty.

Variables

This section is empty.

Functions

func NewLogsUnmarshalerDecoderFactory added in v0.148.0

func NewLogsUnmarshalerDecoderFactory(unmarshaler plog.Unmarshaler) encoding.LogsDecoderFactory

NewLogsUnmarshalerDecoderFactory returns an encoding.LogsDecoderFactory that reads the full stream into memory and delegates to the provided plog.Unmarshaler.

func NewMetricsUnmarshalerDecoderFactory added in v0.148.0

func NewMetricsUnmarshalerDecoderFactory(unmarshaler pmetric.Unmarshaler) encoding.MetricsDecoderFactory

NewMetricsUnmarshalerDecoderFactory returns an encoding.MetricsDecoderFactory that reads the full stream into memory and delegates to the provided pmetric.Unmarshaler.

Types

type BatchHelper

type BatchHelper struct {
	// contains filtered or unexported fields
}

BatchHelper is a helper to determine when to flush based on configured options. It tracks the current byte and item counts and compares them against configured thresholds. Not safe for concurrent use.

func NewBatchHelper

func NewBatchHelper(opts ...encoding.DecoderOption) *BatchHelper

NewBatchHelper creates a new BatchHelper with the provided options.

func (*BatchHelper) IncrementBytes

func (sh *BatchHelper) IncrementBytes(n int64)

IncrementBytes adds n to the current byte count.

func (*BatchHelper) IncrementItems

func (sh *BatchHelper) IncrementItems(n int64)

IncrementItems adds n to the current item count.

func (*BatchHelper) Options

func (sh *BatchHelper) Options() encoding.DecoderOptions

Options returns the DecoderOptions used by the BatchHelper.

func (*BatchHelper) Reset

func (sh *BatchHelper) Reset()

Reset resets the current byte and item counts to zero. Should be called after flushing a batch to start tracking the next batch.

func (*BatchHelper) ShouldFlush

func (sh *BatchHelper) ShouldFlush() bool

ShouldFlush returns true if the current counts exceed configured thresholds. Make sure to call Reset after flushing to start tracking the next batch.

type LogsDecoderAdapter

type LogsDecoderAdapter struct {
	// contains filtered or unexported fields
}

LogsDecoderAdapter adapts decode and offset functions to implement encoding.LogsDecoder.

func NewLogsDecoderAdapter

func NewLogsDecoderAdapter(decode func() (plog.Logs, error), offset func() int64) LogsDecoderAdapter

NewLogsDecoderAdapter creates a new LogsDecoderAdapter with the provided decode and offset functions.

func (LogsDecoderAdapter) DecodeLogs

func (a LogsDecoderAdapter) DecodeLogs() (plog.Logs, error)

func (LogsDecoderAdapter) Offset

func (a LogsDecoderAdapter) Offset() int64

type MetricsDecoderAdapter

type MetricsDecoderAdapter struct {
	// contains filtered or unexported fields
}

MetricsDecoderAdapter adapts decode and offset functions to implement encoding.MetricsDecoder.

func NewMetricsDecoderAdapter

func NewMetricsDecoderAdapter(decode func() (pmetric.Metrics, error), offset func() int64) MetricsDecoderAdapter

NewMetricsDecoderAdapter creates a new MetricsDecoderAdapter with the provided decode and offset functions.

func (MetricsDecoderAdapter) DecodeMetrics

func (a MetricsDecoderAdapter) DecodeMetrics() (pmetric.Metrics, error)

func (MetricsDecoderAdapter) Offset

func (a MetricsDecoderAdapter) Offset() int64

type ScannerHelper

type ScannerHelper struct {
	// contains filtered or unexported fields
}

ScannerHelper is a helper that scans newline-delimited records from an io.Reader and determines when to flush. It splits records on newline delimiters and counts bytes for batching. Not safe for concurrent use.

func NewScannerHelper

func NewScannerHelper(reader io.Reader, opts ...encoding.DecoderOption) (*ScannerHelper, error)

NewScannerHelper creates a new ScannerHelper that reads from the provided io.Reader. It accepts optional encoding.DecoderOption values to configure batch flushing behavior. If the reader is already a bufio.Reader, it is used as-is; otherwise, one is created with the default buffer size.

func (*ScannerHelper) Offset

func (h *ScannerHelper) Offset() int64

Offset returns the current byte offset read from the stream.

func (*ScannerHelper) Options

func (h *ScannerHelper) Options() encoding.DecoderOptions

Options returns the DecoderOptions used by the ScannerHelper's BatchHelper.

func (*ScannerHelper) ScanBytes

func (h *ScannerHelper) ScanBytes() (bytes []byte, flush bool, err error)

ScanBytes scans the next line from the stream and returns it as a byte slice, excluding the newline delimiter. flush indicates whether the batch should be flushed after processing these bytes. err is non-nil if an error occurred during scanning. If the end of the stream is reached, err will be io.EOF.

func (*ScannerHelper) ScanString

func (h *ScannerHelper) ScanString() (line string, flush bool, err error)

ScanString scans the next line from the stream and returns it as a string, excluding the newline delimiter. flush indicates whether the batch should be flushed after processing this string. err is non-nil if an error occurred during scanning. If the end of the stream is reached, err will be io.EOF.
