ingest

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 4, 2026 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Package ingest implements the gRPC streaming ingestion server and the pipeline that routes incoming ticks through micro-batching, storage writes, and the aggregation engine.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Batcher

type Batcher struct {
	// contains filtered or unexported fields
}

Batcher accumulates ticks into micro-batches for efficient writes.

func NewBatcher

func NewBatcher(cfg BatcherConfig, flushFn func([]core.Tick)) *Batcher

NewBatcher creates a batcher that calls flushFn with each accumulated batch.

func (*Batcher) Add

func (b *Batcher) Add(tick core.Tick)

Add adds a tick to the current batch.

func (*Batcher) Close

func (b *Batcher) Close()

Close flushes remaining ticks and marks the batcher as closed. Sets closed before flushing so no new ticks are accepted during the unlock window inside flushLocked.

func (*Batcher) Flush

func (b *Batcher) Flush()

Flush forces a flush of the current batch.

type BatcherConfig

type BatcherConfig struct {
	MaxSize    int           // max ticks per batch before flush
	MaxLatency time.Duration // max time before flush
}

BatcherConfig controls batching behavior.

func DefaultBatcherConfig

func DefaultBatcherConfig() BatcherConfig

DefaultBatcherConfig returns sensible defaults.

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline orchestrates ingestion across all configured series.

func NewPipeline

func NewPipeline(cfg PipelineConfig) (*Pipeline, error)

NewPipeline creates a pipeline for all configured series.

func (*Pipeline) Close

func (p *Pipeline) Close()

Close gracefully shuts down all series pipelines.

func (*Pipeline) GetSeriesPipeline

func (p *Pipeline) GetSeriesPipeline(name string) (*SeriesPipeline, bool)

GetSeriesPipeline returns the pipeline for a given series name.

func (*Pipeline) Ingest

func (p *Pipeline) Ingest(tick core.Tick) error

Ingest routes a tick to the appropriate series pipeline.

func (*Pipeline) SeriesIDs

func (p *Pipeline) SeriesIDs() map[string]uint16

SeriesIDs returns a map of series name → series ID.

type PipelineConfig

type PipelineConfig struct {
	Specs      []*core.SeriesSpec
	Writer     *storage.Writer
	BatcherCfg BatcherConfig

	// Hook is the primary bar side-effect handler (e.g., Kafka producer).
	// It is called synchronously from the rollup engine for each flushed bar.
	Hook agg.BarHook

	// OnBarFlushed is an optional observability callback (e.g., logging, metrics).
	// It is called from the consumeBars goroutine after the bar is written to storage.
	// Do not use this for side-effects that duplicate the Hook path (e.g., Kafka).
	OnBarFlushed func(*core.Bar)

	// Metrics is optional OTel instrumentation. When set, the pipeline records
	// BarsFlushedTotal in the consumeBars goroutine.
	Metrics *telemetry.Metrics
}

PipelineConfig holds the configuration for creating a pipeline.

type SeriesPipeline

type SeriesPipeline struct {
	Spec     *core.SeriesSpec
	SeriesID uint16
	Rollup   *agg.RollupEngine
	Batcher  *Batcher
}

SeriesPipeline holds the processing components for a single series.

type Server

type Server struct {
	pb.UnimplementedTikrServer
	// contains filtered or unexported fields
}

Server implements the gRPC Tikr ingest service.

func NewServer

func NewServer(pipeline *Pipeline, m *telemetry.Metrics) *Server

NewServer creates a new ingest gRPC server. Pass nil for metrics if OTel instrumentation is not needed.

func (*Server) IngestTicks

func (s *Server) IngestTicks(stream pb.Tikr_IngestTicksServer) error

IngestTicks handles streaming tick ingestion.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL