Documentation
¶
Overview ¶
Package ingest implements the gRPC streaming ingestion server and the pipeline that routes incoming ticks through micro-batching, storage writes, and the aggregation engine.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Batcher ¶
type Batcher struct {
// contains filtered or unexported fields
}
Batcher accumulates ticks into micro-batches for efficient writes.
func NewBatcher ¶
func NewBatcher(cfg BatcherConfig, flushFn func([]core.Tick)) *Batcher
NewBatcher creates a batcher that calls flushFn with each accumulated batch.
type BatcherConfig ¶
type BatcherConfig struct {
MaxSize int // max ticks per batch before flush
MaxLatency time.Duration // max time before flush
}
BatcherConfig controls batching behavior.
func DefaultBatcherConfig ¶
func DefaultBatcherConfig() BatcherConfig
DefaultBatcherConfig returns sensible defaults.
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline orchestrates ingestion across all configured series.
func NewPipeline ¶
func NewPipeline(cfg PipelineConfig) (*Pipeline, error)
NewPipeline creates a pipeline for all configured series.
func (*Pipeline) Close ¶
func (p *Pipeline) Close()
Close gracefully shuts down all series pipelines.
func (*Pipeline) GetSeriesPipeline ¶
func (p *Pipeline) GetSeriesPipeline(name string) (*SeriesPipeline, bool)
GetSeriesPipeline returns the pipeline for a given series name.
type PipelineConfig ¶
type PipelineConfig struct {
Specs []*core.SeriesSpec
Writer *storage.Writer
BatcherCfg BatcherConfig
// Hook is the primary bar side-effect handler (e.g., Kafka producer).
// It is called synchronously from the rollup engine for each flushed bar.
Hook agg.BarHook
// OnBarFlushed is an optional observability callback (e.g., logging, metrics).
// It is called from the consumeBars goroutine after the bar is written to storage.
// Do not use this for side-effects that duplicate the Hook path (e.g., Kafka).
OnBarFlushed func(*core.Bar)
// Metrics is optional OTel instrumentation. When set, the pipeline records
// BarsFlushedTotal in the consumeBars goroutine.
Metrics *telemetry.Metrics
}
PipelineConfig holds the configuration for creating a pipeline.
type SeriesPipeline ¶
type SeriesPipeline struct {
Spec *core.SeriesSpec
SeriesID uint16
Rollup *agg.RollupEngine
Batcher *Batcher
}
SeriesPipeline holds the processing components for a single series.
type Server ¶
type Server struct {
pb.UnimplementedTikrServer
// contains filtered or unexported fields
}
Server implements the gRPC Tikr ingest service.
func NewServer ¶
NewServer creates a new ingest gRPC server. Pass nil for metrics if OTel instrumentation is not needed.
func (*Server) IngestTicks ¶
func (s *Server) IngestTicks(stream pb.Tikr_IngestTicksServer) error
IngestTicks handles streaming tick ingestion.