Documentation
¶
Overview ¶
Package grpcserver provides the gRPC server and service handlers for metalog.
It exposes four services:
- Metadata ingestion (push-based record ingestion)
- Query splits (paginated metadata queries with filtering)
- Metadata service (table listing and schema introspection)
- Admin service (runtime table registration, Kafka source registration and deletion, and column alias and invalidation management)
The Server wraps a google.golang.org/grpc server with graceful shutdown support. Each handler adapts between protobuf request/response types and the corresponding internal service layer.
Index ¶
- type AdminHandler
- func (h *AdminHandler) DeleteKafkaSource(ctx context.Context, req *pb.DeleteKafkaSourceRequest) (*pb.DeleteKafkaSourceResponse, error)
- func (h *AdminHandler) InvalidateColumn(ctx context.Context, req *pb.InvalidateColumnRequest) (*pb.InvalidateColumnResponse, error)
- func (h *AdminHandler) RegisterKafkaSource(ctx context.Context, req *pb.RegisterKafkaSourceRequest) (*pb.RegisterKafkaSourceResponse, error)
- func (h *AdminHandler) RegisterTable(ctx context.Context, req *pb.RegisterTableRequest) (*pb.RegisterTableResponse, error)
- func (h *AdminHandler) SetColumnAlias(ctx context.Context, req *pb.SetColumnAliasRequest) (*pb.SetColumnAliasResponse, error)
- type IngestionHandler
- type MetadataHandler
- func (h *MetadataHandler) ListAggs(ctx context.Context, req *metapb.ListAggsRequest) (*metapb.ListAggsResponse, error)
- func (h *MetadataHandler) ListDimensions(ctx context.Context, req *metapb.ListDimensionsRequest) (*metapb.ListDimensionsResponse, error)
- func (h *MetadataHandler) ListSketches(ctx context.Context, req *metapb.ListSketchesRequest) (*metapb.ListSketchesResponse, error)
- func (h *MetadataHandler) ListTables(ctx context.Context, _ *metapb.ListTablesRequest) (*metapb.ListTablesResponse, error)
- type QueryHandler
- type RegistryLookup
- type Server
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AdminHandler ¶
type AdminHandler struct {
pb.UnimplementedAdminServiceServer
// contains filtered or unexported fields
}
AdminHandler implements the AdminService gRPC interface.
func NewAdminHandler ¶
func NewAdminHandler(reg *coordinator.TableRegistration, kafkaSources *metastore.KafkaSourceStore, log *zap.Logger) *AdminHandler
NewAdminHandler creates an AdminHandler.
func (*AdminHandler) DeleteKafkaSource ¶
func (h *AdminHandler) DeleteKafkaSource(ctx context.Context, req *pb.DeleteKafkaSourceRequest) (*pb.DeleteKafkaSourceResponse, error)
DeleteKafkaSource removes a Kafka source and its assignment.
func (*AdminHandler) InvalidateColumn ¶
func (h *AdminHandler) InvalidateColumn(ctx context.Context, req *pb.InvalidateColumnRequest) (*pb.InvalidateColumnResponse, error)
InvalidateColumn marks a dimension or aggregation column as INVALIDATED.
func (*AdminHandler) RegisterKafkaSource ¶
func (h *AdminHandler) RegisterKafkaSource(ctx context.Context, req *pb.RegisterKafkaSourceRequest) (*pb.RegisterKafkaSourceResponse, error)
RegisterKafkaSource registers a Kafka ingestion source for a table.
func (*AdminHandler) RegisterTable ¶
func (h *AdminHandler) RegisterTable(ctx context.Context, req *pb.RegisterTableRequest) (*pb.RegisterTableResponse, error)
RegisterTable handles runtime table registration requests.
func (*AdminHandler) SetColumnAlias ¶
func (h *AdminHandler) SetColumnAlias(ctx context.Context, req *pb.SetColumnAliasRequest) (*pb.SetColumnAliasResponse, error)
SetColumnAlias sets or clears the alias for a dimension or aggregation column.
type IngestionHandler ¶
type IngestionHandler struct {
pb.UnimplementedMetadataIngestionServiceServer
// contains filtered or unexported fields
}
IngestionHandler implements the MetadataIngestionService gRPC interface.
func NewIngestionHandler ¶
func NewIngestionHandler(svc *ingestion.Service, log *zap.Logger) *IngestionHandler
NewIngestionHandler creates an IngestionHandler.
func (*IngestionHandler) Ingest ¶
func (h *IngestionHandler) Ingest(ctx context.Context, req *pb.IngestRequest) (*pb.IngestResponse, error)
Ingest handles a single metadata record ingestion request. Backpressure, transport, and validation errors are returned as gRPC status codes:
- RESOURCE_EXHAUSTED: ingestion channel full (client should retry with backoff)
- DEADLINE_EXCEEDED: request context deadline passed
- CANCELLED: request context was cancelled
- INVALID_ARGUMENT: validation failure (missing fields, invalid state)
Application-level errors (internal processing) are returned via IngestResponse.
type MetadataHandler ¶
type MetadataHandler struct {
metapb.UnimplementedMetadataServiceServer
// contains filtered or unexported fields
}
MetadataHandler implements the MetadataService gRPC interface.
func NewMetadataHandler ¶
func NewMetadataHandler(querier *metastore.MetadataReader, log *zap.Logger) *MetadataHandler
NewMetadataHandler creates a MetadataHandler.
func (*MetadataHandler) ListAggs ¶
func (h *MetadataHandler) ListAggs(ctx context.Context, req *metapb.ListAggsRequest) (*metapb.ListAggsResponse, error)
ListAggs returns aggregation metadata for a table.
func (*MetadataHandler) ListDimensions ¶
func (h *MetadataHandler) ListDimensions(ctx context.Context, req *metapb.ListDimensionsRequest) (*metapb.ListDimensionsResponse, error)
ListDimensions returns dimension metadata for a table.
func (*MetadataHandler) ListSketches ¶
func (h *MetadataHandler) ListSketches(ctx context.Context, req *metapb.ListSketchesRequest) (*metapb.ListSketchesResponse, error)
ListSketches returns sketch metadata for a table.
func (*MetadataHandler) ListTables ¶
func (h *MetadataHandler) ListTables(ctx context.Context, _ *metapb.ListTablesRequest) (*metapb.ListTablesResponse, error)
ListTables returns all registered table names.
type QueryHandler ¶
type QueryHandler struct {
pb.UnimplementedSplitQueryServiceServer
// contains filtered or unexported fields
}
QueryHandler implements the SplitQueryService gRPC interface.
func NewQueryHandler ¶
func NewQueryHandler(engine *query.SplitQueryEngine, lookup RegistryLookup, log *zap.Logger) *QueryHandler
NewQueryHandler creates a QueryHandler.
func (*QueryHandler) SetMeter ¶
func (h *QueryHandler) SetMeter(m metric.Meter)
SetMeter configures OpenTelemetry metrics. Must be called before serving.
func (*QueryHandler) StreamSplits ¶
func (h *QueryHandler) StreamSplits(req *pb.StreamSplitsRequest, stream gogrpc.ServerStreamingServer[pb.StreamSplitsResponse]) error
StreamSplits handles server-streaming split queries. It fetches all matching splits across multiple internal pages using a background prefetch goroutine, streaming each result to the client as it becomes available.
type RegistryLookup ¶
type RegistryLookup func(tableName string) *schema.ColumnRegistry
RegistryLookup returns the ColumnRegistry for a table, or nil.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server wraps a gRPC server with lifecycle management.
func NewServer ¶
NewServer creates a gRPC server on the given port with the standard gRPC health checking service registered.
func (*Server) GRPCServer ¶
GRPCServer returns the underlying grpc.Server for service registration.