fxgrpc

package
v0.0.0-...-edc4474
Published: May 5, 2026 | License: Apache-2.0 | Imports: 25 | Imported by: 2

README

gRPC Module

This module provides gRPC support.

This package provides 2 modules:

  • A gRPC server module
  • A gRPC client module

It also installs a custom codec that uses vtprotobuf-optimized (un)marshaling when possible.

Server

Components

The server module lazily provides the following components:

  • A grpc.ServiceRegistrar

The module adds the following features to the server:

  • It will use a CertReloader when the configuration specifies TLS options.
  • It will enable the reflection service on the server.
  • All the interceptors from the unary_server_interceptor and stream_server_interceptor value groups will be installed on the server. Several stelling modules already provide interceptors lazily. See the Interceptors section for more details.

The user must explicitly invoke StartGrpcServer (via fx.Invoke) in their system. This allows fine-grained control over the start and stop timing of components that do not share explicit dependencies.
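
To illustrate why the position of StartGrpcServer among the invoked functions matters, here is a simplified model of lifecycle ordering. It assumes the behaviour documented for fx, where OnStart hooks run in the order they were appended and OnStop hooks run in reverse. The lifecycle type and its methods are hypothetical and use only the standard library.

```go
package main

// lifecycle is a hypothetical, heavily simplified stand-in for fx.Lifecycle.
// It records only hook names so the resulting ordering is easy to inspect.
type lifecycle struct {
	names []string
}

// Append registers a hook. In this model the caller decides the order, which
// is the role played by the order of the functions passed to fx.Invoke.
func (l *lifecycle) Append(name string) {
	l.names = append(l.names, name)
}

// StartOrder returns the hooks in registration order.
func (l *lifecycle) StartOrder() []string {
	return append([]string(nil), l.names...)
}

// StopOrder returns the hooks in reverse registration order.
func (l *lifecycle) StopOrder() []string {
	out := make([]string, 0, len(l.names))
	for i := len(l.names) - 1; i >= 0; i-- {
		out = append(out, l.names[i])
	}
	return out
}
```

Registering "cache", "grpc-server" and "worker" in that order starts the server after the cache and stops it before the cache, even though the two share no dependency.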

The server can be customized further by providing grpc.ServerOption values in the grpc_server_options value group.

Configuration

The module provides the following configuration options:

  • Address: The address and port to which the gRPC server will bind
  • TLS: A boolean indicating that the server must be exposed over TLS
  • CertFile: Path to the PEM-encoded server TLS certificate
  • KeyFile: Path to the PEM-encoded private key of the server TLS certificate
  • ClientCAFile: Path to a PEM-encoded CA cert bundle used to validate clients. No client validation happens if unset.
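
The options above depend on each other: the validate tags on the Server type require CertFile and KeyFile when TLS is true ("required_if=TLS true") and reject ClientCAFile when TLS is off ("excluded_without=TLS"). The following is a minimal sketch of those two rules in plain Go. serverConfig and check are hypothetical names, the module itself relies on struct-tag validation, and the file-existence check is left out.

```go
package main

import "errors"

// serverConfig mirrors the documented options. It is a local, hypothetical
// type for illustration and not fxgrpc.Server.
type serverConfig struct {
	Address      string
	TLS          bool
	CertFile     string
	KeyFile      string
	ClientCAFile string
}

// check applies the cross-field rules expressed by the validate tags.
func (c serverConfig) check() error {
	if c.TLS && (c.CertFile == "" || c.KeyFile == "") {
		return errors.New("TLS requires both CertFile and KeyFile")
	}
	if !c.TLS && c.ClientCAFile != "" {
		return errors.New("ClientCAFile is only allowed when TLS is enabled")
	}
	return nil
}
```

A plaintext configuration with only an Address passes, while setting ClientCAFile without TLS is rejected.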

Client

Components

The module lazily provides the following components:

  • A grpc.ClientConnInterface

The module adds the following features to the client:

  • It will use a CertReloader when the configuration specifies TLS options.
  • All the interceptors from the unary_client_interceptor and stream_client_interceptor value groups will be installed on the client. Several stelling modules already provide interceptors lazily. See the Interceptors section for more details.

Configuration

The module provides the following configuration options:

  • InsecureConnection: Disables TLS when connecting to the server
  • CertFile: Path to a PEM-encoded client TLS certificate
  • KeyFile: Path to the PEM-encoded private key of the client TLS certificate
  • RootCAFile: Path to a PEM-encoded CA bundle to validate the server certificate
  • Endpoint: The address and port (without protocol) of the gRPC server

The client can further be customized by providing grpc.DialOption in the grpc_client_options value group.
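
As a rough picture of how the client options could translate into transport security, the sketch below builds a *tls.Config from them using only the standard library. It is an assumption about the general approach, not the module's code: clientConfig and transportSecurity are hypothetical names, and the key pair is loaded once here, whereas the module uses a certificate reloader.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// clientConfig mirrors the documented client options (hypothetical local type).
type clientConfig struct {
	InsecureConnection bool
	CertFile           string
	KeyFile            string
	RootCAFile         string
	Endpoint           string
}

// transportSecurity returns nil when TLS is disabled, otherwise a tls.Config
// with the optional root CA bundle and client certificate applied.
func transportSecurity(c clientConfig) (*tls.Config, error) {
	if c.InsecureConnection {
		return nil, nil
	}
	cfg := &tls.Config{MinVersion: tls.VersionTLS12}
	if c.RootCAFile != "" {
		pem, err := os.ReadFile(c.RootCAFile)
		if err != nil {
			return nil, fmt.Errorf("read root CA bundle: %w", err)
		}
		pool := x509.NewCertPool()
		if !pool.AppendCertsFromPEM(pem) {
			return nil, errors.New("no certificates found in root CA bundle")
		}
		cfg.RootCAs = pool
	}
	if c.CertFile != "" {
		cert, err := tls.LoadX509KeyPair(c.CertFile, c.KeyFile)
		if err != nil {
			return nil, fmt.Errorf("load client key pair: %w", err)
		}
		cfg.Certificates = []tls.Certificate{cert}
	}
	return cfg, nil
}
```

When RootCAFile is empty, RootCAs stays nil and the standard library falls back to the host's root CA set.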

ConnManager

Components

The module lazily provides the following components:

  • A *fxgrpc.ConnManager

This is a cache of *grpc.ClientConn values, keyed by endpoint. Each connection has the same features as a regular gRPC client produced by stelling (TLS management, interceptors, etc.).

The interface is very simple: manager.Get(endpoint) will return a *grpc.ClientConn to that endpoint. Since connections can be used by multiple clients, there's no reason to return them to the manager. gRPC will automatically close and recreate any underlying TCP connections depending on usage.
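
The caching behaviour described above can be sketched as a mutex-guarded map that dials on first use. This is only an illustration under stated assumptions: connCache and newConnCache are hypothetical names, the dial function is injected so the example needs no gRPC dependency, and the real ConnManager may differ in locking and shutdown handling.

```go
package main

import "sync"

// connCache is a hypothetical, generic stand-in for a connection manager.
// T would be *grpc.ClientConn in a real program.
type connCache[T any] struct {
	mu    sync.Mutex
	dial  func(endpoint string) (T, error)
	conns map[string]T
}

// newConnCache creates an empty cache that uses dial to open connections.
func newConnCache[T any](dial func(endpoint string) (T, error)) *connCache[T] {
	return &connCache[T]{dial: dial, conns: make(map[string]T)}
}

// Get returns the cached connection for endpoint, dialing it on first use.
// Callers share the returned connection and never hand it back.
func (c *connCache[T]) Get(endpoint string) (T, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if conn, ok := c.conns[endpoint]; ok {
		return conn, nil
	}
	conn, err := c.dial(endpoint)
	if err != nil {
		var zero T
		return zero, err
	}
	c.conns[endpoint] = conn
	return conn, nil
}
```

Two calls to Get with the same endpoint return the same connection and dial only once; a different endpoint gets its own connection.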

Configuration

The module provides the following configuration options:

  • InsecureConnection: Disables TLS when connecting to the server
  • CertFile: Path to a PEM-encoded client TLS certificate
  • KeyFile: Path to the PEM-encoded private key of the client TLS certificate
  • RootCAFile: Path to a PEM-encoded CA bundle to validate the server certificate

The connections can further be customized by providing grpc.DialOption in the grpc_client_options value group.

Interceptors

Because grpc interceptors form a chain, the order in which they are installed on the server or client is important.

Unfortunately, fx value groups do not make any guarantees about the order in which the elements will be provided to the component.

To give control over the place of an interceptor in the chain, this package exposes its own Interceptor types. In addition to the gRPC interceptor, they also include a Weight.

Before being installed on the gRPC server or client, the interceptors will be sorted by their Weight in ascending order. Furthermore, each package in this module that provides gRPC interceptors will contain a GrpcInterceptorWeight constant, containing the weight assigned to its interceptors. This allows you to place your own interceptors in the chain relative to these interceptors without having to hardcode any specific values.
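
The effect of weights on the chain can be sketched without gRPC. The example below assumes that the sorted list is installed in order with the first interceptor outermost, which is how grpc.ChainUnaryInterceptor treats its arguments. The names handler, weighted, chain and tag are hypothetical, and requests are plain strings so the call order is visible.

```go
package main

import "sort"

// handler is a simplified stand-in for a gRPC unary handler.
type handler func(req string) string

// weighted pairs an interceptor with a weight, mirroring the idea behind the
// package's Interceptor types (hypothetical local type).
type weighted struct {
	Weight      uint
	Interceptor func(req string, next handler) string
}

// tag returns an interceptor that appends its name to the request and then
// calls the next handler, leaving a trace of the call order.
func tag(name string) func(string, handler) string {
	return func(req string, next handler) string {
		return next(req + ">" + name)
	}
}

// chain sorts a copy of the list by ascending weight and wraps final so that
// the lowest weight runs first (outermost) and the highest runs last.
func chain(list []weighted, final handler) handler {
	sorted := append([]weighted(nil), list...)
	sort.SliceStable(sorted, func(i, j int) bool {
		return sorted[i].Weight < sorted[j].Weight
	})
	h := final
	for i := len(sorted) - 1; i >= 0; i-- {
		ic, next := sorted[i].Interceptor, h
		h = func(req string) string { return ic(req, next) }
	}
	return h
}
```

To run right after an interceptor provided by another package, an interceptor would use that package's GrpcInterceptorWeight constant plus one as its Weight instead of a hardcoded number.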

Documentation

Overview

Package fxgrpc provides a convenient way to create well-behaved gRPC servers and clients.

Example
package main

import (
	"context"
	"fmt"

	sconfig "github.com/exoscale/stelling/config"
	"github.com/exoscale/stelling/fxgrpc"
	"go.uber.org/fx"
	"go.uber.org/fx/fxevent"
	"go.uber.org/zap"
	pb "google.golang.org/grpc/examples/route_guide/routeguide"
	"google.golang.org/grpc/status"
)

type Config struct {
	fxgrpc.Server
	fxgrpc.Client
}

type RouteGuideServer struct {
	pb.UnimplementedRouteGuideServer
}

func NewRouteGuideServer() pb.RouteGuideServer {
	return &RouteGuideServer{}
}

func main() {
	conf := &Config{}
	args := []string{"grpc-test", "--server.address", "localhost:8080", "--client.endpoint", "localhost:8080", "--client.insecure-connection"}
	if err := sconfig.Load(conf, args); err != nil {
		panic(err)
	}
	opts := fx.Options(
		// Suppressing fx logs to ensure deterministic output
		fx.WithLogger(func() fxevent.Logger { return fxevent.NopLogger }),
		fxgrpc.NewServerModule(conf),
		fxgrpc.NewClientModule(conf),
		fx.Provide(
			// supplying a NopLogger to make output deterministic
			// In practice you'd use fxlogging.NewModule to get a zap logger
			// and replace the fxevent logger
			zap.NewNop,
			NewRouteGuideServer,
			pb.NewRouteGuideClient,
		),
		fx.Invoke(
			pb.RegisterRouteGuideServer,
			// We explicitly need to invoke this, because ordering matters
			fxgrpc.StartGrpcServer,
			run,
		),
	)
	if err := fx.ValidateApp(opts); err != nil {
		panic(err)
	}

	fx.New(opts).Run()
}

func run(lc fx.Lifecycle, sd fx.Shutdowner, client pb.RouteGuideClient) {
	lc.Append(fx.Hook{
		OnStart: func(ctx context.Context) error {
			go func() {
				defer sd.Shutdown() //nolint:errcheck
				_, err := client.GetFeature(
					context.Background(),
					&pb.Point{},
				)
				res, ok := status.FromError(err)
				if !ok {
					panic(fmt.Sprintln("could not extract grpc status code:", err))
				}
				fmt.Println("Endpoint returned status", res.String())
			}()
			return nil
		},
	})
}
Output:
Endpoint returned status rpc error: code = Unimplemented desc = method GetFeature not implemented

Constants

const Name = "proto"

Name is the name registered for the proto compressor.

Functions

func GetCertReloaderConfig

func GetCertReloaderConfig(conf Config) *reloader.CertReloaderConfig

func NewClientModule

func NewClientModule(conf ClientConfig) fx.Option

NewClientModule provides a gRPC client.

func NewConnManagerModule

func NewConnManagerModule(conf ConnManagerConfig) fx.Option

func NewGrpcClient

func NewGrpcClient(conf ClientConfig, logger *zap.Logger, ui []*UnaryClientInterceptor, si []*StreamClientInterceptor, handlers []stats.Handler, dOpts ...grpc.DialOption) (*grpc.ClientConn, error)

NewGrpcClient returns a gRPC client connection that is configured with the same conventions as the fx module. It is intended for dynamically created, short-lived clients where using fx causes more trouble than benefit. Because the client is assumed to be short-lived, it will not reload TLS certificates.

func NewGrpcServer

func NewGrpcServer(p GrpcServerParams) (*grpc.Server, error)

func NewNamedClientModule

func NewNamedClientModule(name string, conf ClientConfig) fx.Option

NewNamedClientModule provides a named gRPC client.

func NewServerModule

func NewServerModule(conf Config, sOpts ...serverModuleOption) fx.Option

func SortInterceptors

func SortInterceptors[T WeightedInterceptor](list []T) []T

SortInterceptors orders the given slice of interceptors and returns the ordered slice. Items are sorted by ascending weight. Any nil items are removed. It is some sugar around sort.Sort to help with the type system checks. The interceptor list should never be so large that the performance of this function matters.
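
The behaviour described for SortInterceptors can be approximated as follows. This is a sketch under assumptions, not the package's source: weightedInterceptor, unary and sortByWeight are hypothetical names, IsNil here only checks for a nil pointer, and sort.SliceStable is used in place of the sort.Sort wrapper mentioned above.

```go
package main

import "sort"

// weightedInterceptor mirrors the method set of WeightedInterceptor.
type weightedInterceptor interface {
	IsNil() bool
	GetWeight() uint
}

// unary is a hypothetical interceptor wrapper. Name stands in for the wrapped
// gRPC interceptor.
type unary struct {
	Weight uint
	Name   string
}

// IsNil is safe to call on a nil *unary because it never dereferences u.
func (u *unary) IsNil() bool { return u == nil }

// GetWeight returns the weight used for ordering.
func (u *unary) GetWeight() uint { return u.Weight }

// sortByWeight drops nil items and returns the rest in ascending weight
// order. The input slice is left untouched.
func sortByWeight[T weightedInterceptor](list []T) []T {
	out := make([]T, 0, len(list))
	for _, item := range list {
		if !item.IsNil() {
			out = append(out, item)
		}
	}
	sort.SliceStable(out, func(i, j int) bool {
		return out[i].GetWeight() < out[j].GetWeight()
	})
	return out
}
```

Filtering nil entries matters with fx value groups, where a provider that decides not to contribute an interceptor may still supply a nil pointer to the group.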

func StartGrpcServer

func StartGrpcServer(lc fx.Lifecycle, logger *zap.Logger, s *server)

func StreamServerInterceptors

func StreamServerInterceptors(si []*StreamServerInterceptor) grpc.ServerOption

func UnaryServerInterceptors

func UnaryServerInterceptors(ui []*UnaryServerInterceptor) grpc.ServerOption

func WithServerModuleName

func WithServerModuleName(name string) serverModuleOption

WithServerModuleName will annotate the outputs with the given name.

func WithStreamClientInterceptors

func WithStreamClientInterceptors(si []*StreamClientInterceptor) grpc.DialOption

func WithUnaryClientInterceptors

func WithUnaryClientInterceptors(ui []*UnaryClientInterceptor) grpc.DialOption

Types

type Client

type Client struct {
	// InsecureConnection indicates whether TLS needs to be disabled when connecting to the grpc server
	InsecureConnection bool
	// CertFile is the path to the pem encoded TLS certificate
	CertFile string `validate:"omitempty,file"`
	// KeyFile is the path to the pem encoded private key of the TLS certificate
	KeyFile string `validate:"required_with=CertFile,omitempty,file"`
	// RootCAFile is the path to a pem encoded CA bundle used to validate server connections
	RootCAFile string `validate:"omitempty,file"`
	// Endpoint is IP or hostname or scheme for the target gRPC server
	Endpoint string `validate:"required"`
}

func (*Client) GrpcClientConfig

func (c *Client) GrpcClientConfig() *Client

func (*Client) MarshalLogObject

func (c *Client) MarshalLogObject(enc zapcore.ObjectEncoder) error

type ClientConfig

type ClientConfig interface {
	GrpcClientConfig() *Client
}

type Config

type Config interface {
	GrpcServerConfig() *Server
	AsHttpConfig() *fxhttp.Server
}

type ConnManager

type ConnManager struct {
	// contains filtered or unexported fields
}

ConnManager is a cache of grpc.ClientConn values. Users of the manager should leave the lifecycle of the underlying gRPC connections entirely up to the manager.

func NewConnManager

func NewConnManager(opts []grpc.DialOption) *ConnManager

func ProvideConnManager

func ProvideConnManager(p ConnManagerParams) *ConnManager

func (*ConnManager) Get

func (m *ConnManager) Get(address string) (*grpc.ClientConn, error)

func (*ConnManager) Stop

func (m *ConnManager) Stop(ctx context.Context) error

type ConnManagerConfig

type ConnManagerConfig interface {
	ConnManagerConfig() *ConnManagerOpts
}

type ConnManagerOpts

type ConnManagerOpts struct {
	// InsecureConnection indicates whether TLS needs to be disabled when connecting to the grpc server
	InsecureConnection bool
	// CertFile is the path to the pem encoded TLS certificate
	CertFile string `validate:"omitempty,file"`
	// KeyFile is the path to the pem encoded private key of the TLS certificate
	KeyFile string `validate:"required_with=CertFile,omitempty,file"`
	// RootCAFile is the path to a pem encoded CA bundle used to validate server connections
	RootCAFile string `validate:"omitempty,file"`
}

func (*ConnManagerOpts) ConnManagerConfig

func (c *ConnManagerOpts) ConnManagerConfig() *ConnManagerOpts

type ConnManagerParams

type ConnManagerParams struct {
	fx.In

	Lc                 fx.Lifecycle
	Opts               []grpc.DialOption             `group:"grpc_client_options"`
	Reloader           *fxcert_reloader.CertReloader `optional:"true" name:"grpc_conn_manager"`
	UnaryInterceptors  []*UnaryClientInterceptor     `group:"unary_client_interceptor"`
	StreamInterceptors []*StreamClientInterceptor    `group:"stream_client_interceptor"`
}

type GrpcClientParams

type GrpcClientParams struct {
	fx.In

	Lc                 fx.Lifecycle
	Conf               ClientConfig
	Logger             *zap.Logger
	StatsHandlers      []stats.Handler            `group:"client_stats_handler"`
	UnaryInterceptors  []*UnaryClientInterceptor  `group:"unary_client_interceptor"`
	StreamInterceptors []*StreamClientInterceptor `group:"stream_client_interceptor"`
	ClientOpts         []grpc.DialOption          `group:"grpc_client_options"`
}

type GrpcServerParams

type GrpcServerParams struct {
	fx.In

	Conf               Config
	StatsHandlers      []stats.Handler            `group:"server_stats_handler"`
	UnaryInterceptors  []*UnaryServerInterceptor  `group:"unary_server_interceptor"`
	StreamInterceptors []*StreamServerInterceptor `group:"stream_server_interceptor"`
	Reloader           *reloader.CertReloader     `name:"grpc_server" optional:"true"`
	ServerOpts         []grpc.ServerOption        `group:"grpc_server_options"`
}

type Server

type Server struct {

	// SocketName is a systemd socket name. It takes precedence over Address.
	// To keep things simple, only named systemd-activated sockets are allowed, even if there is
	// just one socket
	SocketName string
	// Address is the address+port the server will bind to, as passed to net.Listen
	// If the Address starts with a / we will create a unix domain socket listener
	Address string `default:"localhost:8080" validate:"tcp_addr|unix_addr"`
	// TLS indicates whether the server is exposed over TLS
	TLS bool
	// CertFile is the path to the pem encoded TLS certificate
	CertFile string `validate:"required_if=TLS true,omitempty,file"`
	// KeyFile is the path to the pem encoded private key of the TLS certificate
	KeyFile string `validate:"required_if=TLS true,omitempty,file"`
	// ClientCAFile is the path to a pem encoded CA cert bundle used to validate clients
	ClientCAFile string `validate:"excluded_without=TLS,omitempty,file"`
}
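
The field comments above imply a small decision procedure for choosing the listener: SocketName wins over Address, and an Address starting with / selects a unix domain socket. The sketch below spells that out. listenPlan and planListener are hypothetical names, and actually obtaining a systemd-activated socket is outside its scope.

```go
package main

import "strings"

// listenPlan describes where the server listener should come from.
type listenPlan struct {
	// Systemd is true when the listener is taken from a systemd-activated
	// socket named Target rather than created with net.Listen.
	Systemd bool
	// Network is the net.Listen network ("tcp" or "unix") when Systemd is false.
	Network string
	// Target is the socket name, file path or host:port.
	Target string
}

// planListener applies the precedence described in the Server field comments.
func planListener(socketName, address string) listenPlan {
	if socketName != "" {
		return listenPlan{Systemd: true, Target: socketName}
	}
	if strings.HasPrefix(address, "/") {
		return listenPlan{Network: "unix", Target: address}
	}
	return listenPlan{Network: "tcp", Target: address}
}
```

With the default Address of localhost:8080 and no SocketName, the plan is a TCP listener; setting SocketName overrides whatever Address contains.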

func (*Server) AsHttpConfig

func (s *Server) AsHttpConfig() *fxhttp.Server

func (*Server) GrpcServerConfig

func (s *Server) GrpcServerConfig() *Server

func (*Server) MarshalLogObject

func (s *Server) MarshalLogObject(enc zapcore.ObjectEncoder) error

type StreamClientInterceptor

type StreamClientInterceptor struct {
	Weight      uint
	Interceptor grpc.StreamClientInterceptor
}

StreamClientInterceptor wraps a grpc.StreamClientInterceptor with a weight that determines its position in the interceptor chain.

func (*StreamClientInterceptor) GetWeight

func (i *StreamClientInterceptor) GetWeight() uint

func (*StreamClientInterceptor) IsNil

func (i *StreamClientInterceptor) IsNil() bool

type StreamServerInterceptor

type StreamServerInterceptor struct {
	Weight      uint
	Interceptor grpc.StreamServerInterceptor
}

StreamServerInterceptor wraps a grpc.StreamServerInterceptor with a weight that determines its position in the interceptor chain.

func (*StreamServerInterceptor) GetWeight

func (i *StreamServerInterceptor) GetWeight() uint

func (*StreamServerInterceptor) IsNil

func (i *StreamServerInterceptor) IsNil() bool

type UnaryClientInterceptor

type UnaryClientInterceptor struct {
	Weight      uint
	Interceptor grpc.UnaryClientInterceptor
}

UnaryClientInterceptor wraps a grpc.UnaryClientInterceptor with a weight that determines its position in the interceptor chain.

func (*UnaryClientInterceptor) GetWeight

func (i *UnaryClientInterceptor) GetWeight() uint

func (*UnaryClientInterceptor) IsNil

func (i *UnaryClientInterceptor) IsNil() bool

type UnaryServerInterceptor

type UnaryServerInterceptor struct {
	Weight      uint
	Interceptor grpc.UnaryServerInterceptor
}

UnaryServerInterceptor wraps a grpc.UnaryServerInterceptor with a weight that determines its position in the interceptor chain.

func (*UnaryServerInterceptor) GetWeight

func (i *UnaryServerInterceptor) GetWeight() uint

func (*UnaryServerInterceptor) IsNil

func (i *UnaryServerInterceptor) IsNil() bool

type WeightedInterceptor

type WeightedInterceptor interface {
	IsNil() bool
	GetWeight() uint
}

type WeightedInterceptors

type WeightedInterceptors []WeightedInterceptor

func (WeightedInterceptors) Len

func (w WeightedInterceptors) Len() int

func (WeightedInterceptors) Less

func (w WeightedInterceptors) Less(i, j int) bool

func (WeightedInterceptors) Swap

func (w WeightedInterceptors) Swap(i, j int)

Directories

Path        Synopsis
health      Package health provides client-side health check capabilities for grpc servers.
reflection  Package reflection provides reflection capabilities for grpc servers.
