grpcserver

package
v0.0.0-...-ae8c3b3
Warning

This package is not in the latest version of its module.

Published: Mar 27, 2026 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Overview

Package grpcserver provides the gRPC server and service handlers for metalog.

It exposes four services:

  • Metadata ingestion (push-based record ingestion)
  • Query splits (paginated metadata queries with filtering)
  • Metadata service (table listing and schema introspection)
  • Admin service (runtime table registration, Kafka source management, column aliasing and invalidation)

The Server wraps a google.golang.org/grpc server with graceful shutdown support. Each handler adapts between protobuf request/response types and the corresponding internal service layer.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AdminHandler

type AdminHandler struct {
	pb.UnimplementedAdminServiceServer
	// contains filtered or unexported fields
}

AdminHandler implements the AdminService gRPC interface.

func NewAdminHandler

func NewAdminHandler(reg *coordinator.TableRegistration, kafkaSources *metastore.KafkaSourceStore, log *zap.Logger) *AdminHandler

NewAdminHandler creates an AdminHandler.

func (*AdminHandler) DeleteKafkaSource

DeleteKafkaSource removes a Kafka source and its assignment.

func (*AdminHandler) InvalidateColumn

InvalidateColumn marks a dimension or aggregation column as INVALIDATED.

func (*AdminHandler) RegisterKafkaSource

RegisterKafkaSource registers a Kafka ingestion source for a table.

func (*AdminHandler) RegisterTable

RegisterTable handles runtime table registration requests.

func (*AdminHandler) SetColumnAlias

SetColumnAlias sets or clears the alias for a dimension or aggregation column.

type IngestionHandler

type IngestionHandler struct {
	pb.UnimplementedMetadataIngestionServiceServer
	// contains filtered or unexported fields
}

IngestionHandler implements the MetadataIngestionService gRPC interface.

func NewIngestionHandler

func NewIngestionHandler(svc *ingestion.Service, log *zap.Logger) *IngestionHandler

NewIngestionHandler creates an IngestionHandler.

func (*IngestionHandler) Ingest

Ingest handles a single metadata record ingestion request. Backpressure, context, and validation errors are returned as gRPC status codes:

  • RESOURCE_EXHAUSTED: ingestion channel full (client should retry with backoff)
  • DEADLINE_EXCEEDED: request context deadline passed
  • CANCELLED: request context was cancelled
  • INVALID_ARGUMENT: validation failure (missing fields, invalid state)

Application-level errors (failures during internal processing) are reported in the IngestResponse rather than as a gRPC status code.

type MetadataHandler

type MetadataHandler struct {
	metapb.UnimplementedMetadataServiceServer
	// contains filtered or unexported fields
}

MetadataHandler implements the MetadataService gRPC interface.

func NewMetadataHandler

func NewMetadataHandler(querier *metastore.MetadataReader, log *zap.Logger) *MetadataHandler

NewMetadataHandler creates a MetadataHandler.

func (*MetadataHandler) ListAggs

ListAggs returns aggregation metadata for a table.

func (*MetadataHandler) ListDimensions

ListDimensions returns dimension metadata for a table.

func (*MetadataHandler) ListSketches

ListSketches returns sketch metadata for a table.

func (*MetadataHandler) ListTables

ListTables returns all registered table names.

type QueryHandler

type QueryHandler struct {
	pb.UnimplementedSplitQueryServiceServer
	// contains filtered or unexported fields
}

QueryHandler implements the SplitQueryService gRPC interface.

func NewQueryHandler

func NewQueryHandler(engine *query.SplitQueryEngine, lookup RegistryLookup, log *zap.Logger) *QueryHandler

NewQueryHandler creates a QueryHandler.

func (*QueryHandler) SetMeter

func (h *QueryHandler) SetMeter(m metric.Meter)

SetMeter configures OpenTelemetry metrics. It must be called before the server starts serving.

func (*QueryHandler) StreamSplits

StreamSplits handles server-streaming split queries. It fetches all matching splits across multiple internal pages using a background prefetch goroutine, streaming each result to the client as it becomes available.

type RegistryLookup

type RegistryLookup func(tableName string) *schema.ColumnRegistry

RegistryLookup returns the ColumnRegistry for the named table, or nil if none is found.
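
Because the lookup can return nil, callers need a nil check before using the result. A minimal sketch, assuming a map-backed lookup: ColumnRegistry here is a hypothetical stand-in for schema.ColumnRegistry (whose fields are not shown in this documentation), and mapLookup and describe are illustrative helpers, not part of the package.

```go
package main

import "fmt"

// ColumnRegistry is a hypothetical stand-in for schema.ColumnRegistry.
type ColumnRegistry struct {
	Columns []string
}

// RegistryLookup mirrors the documented type, using the stand-in registry.
type RegistryLookup func(tableName string) *ColumnRegistry

// mapLookup builds a RegistryLookup from a fixed map. A missing key yields
// the zero value of the map's element type, which is a nil pointer.
func mapLookup(tables map[string]*ColumnRegistry) RegistryLookup {
	return func(tableName string) *ColumnRegistry {
		return tables[tableName]
	}
}

// describe shows the nil check a caller needs before using the registry.
func describe(lookup RegistryLookup, table string) string {
	reg := lookup(table)
	if reg == nil {
		return fmt.Sprintf("table %q is not registered", table)
	}
	return fmt.Sprintf("table %q has %d columns", table, len(reg.Columns))
}

func main() {
	lookup := mapLookup(map[string]*ColumnRegistry{
		"events": {Columns: []string{"host", "level"}},
	})
	fmt.Println(describe(lookup, "events"))
	fmt.Println(describe(lookup, "missing"))
}
```

Using a function type rather than an interface lets a caller pass a closure over whatever registry storage it already has.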

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server wraps a gRPC server with lifecycle management.

func NewServer

func NewServer(port int, log *zap.Logger) *Server

NewServer creates a gRPC server on the given port with the standard gRPC health checking service registered.

func (*Server) GRPCServer

func (s *Server) GRPCServer() *grpc.Server

GRPCServer returns the underlying grpc.Server for service registration.

func (*Server) Start

func (s *Server) Start() error

Start begins listening and blocks until the server is stopped.

func (*Server) Stop

func (s *Server) Stop()

Stop gracefully stops the gRPC server.
