serverstream

package
v1.0.0

This package is not in the latest version of its module.

Published: Mar 7, 2026 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package serverstream provides an execution pipeline for gRPC server-streaming RPCs.

It wraps streaming handlers with admission control, panic recovery, supervision, metrics, and graceful shutdown. The pipeline matches the unary one but is adapted for server-side streaming, where the handler produces multiple responses via a stream.

Quick Start

engine := serverstream.New(
    serverstream.WithAdmissionLimit(100),
)
stream := serverstream.NewStream(ctx, grpcStream)
err := engine.Run(ctx, stream, core.AdmissionSoftAllow, handler)

Features

  • Admission control with configurable concurrency limits
  • Automatic panic recovery per stream
  • Supervisor retry policies
  • Prometheus and OpenTelemetry metrics
  • Graceful shutdown coordination

Constants

This section is empty.

Variables

This section is empty.

Functions

func StreamServerInterceptor

func StreamServerInterceptor(
	engine *Engine,
	cfg *Config,
) grpc.StreamServerInterceptor

StreamServerInterceptor adapts a gRPC server-stream handler into the Grip server-stream execution pipeline.

It does not run user code directly. It only wires gRPC to Engine.

Example

===========================================================

Example: gRPC StreamServerInterceptor

===========================================================

Most users integrate Grip via the gRPC interceptor. The engine handles admission, lifecycle, and safety.

cfg := BuildConfig(
	WithAdmissionLimiter(admission.NewLimiter(100)),
)

engine := NewEngine(cfg)

grpcServer := grpc.NewServer(
	grpc.StreamInterceptor(
		StreamServerInterceptor(engine, cfg),
	),
)

_ = grpcServer

Types

type Config

type Config struct {
	AdmissionLimiter *admission.Limiter
	AdmissionMode    core.AdmissionMode
	Shutdown         core.ShutdownCoordinator
	SupervisorPolicy supervisor.SupervisorPolicy
	MetricsSink      metrics.Sink
	Name             string
}

Config controls server-stream execution behavior. It mirrors unary configuration where possible.

func BuildConfig

func BuildConfig(opts ...Option) *Config

BuildConfig creates a validated Config from functional options. It starts from sensible defaults and applies options in order.
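
The "defaults first, then options in order" behavior described above is the usual functional-options pattern. The following is a minimal sketch of that pattern only; the Config fields, the default values, and the WithName, WithLimit, and Build names are assumptions made for illustration and are not the package's own.

```go
package main

// Config is a hypothetical stand-in for a configuration struct.
// The fields are illustrative, not the package's real fields.
type Config struct {
	Name  string
	Limit int
}

// Option mutates a Config, the same shape as `type Option func(*Config)`.
type Option func(*Config)

// WithName is a hypothetical option that sets Config.Name.
func WithName(name string) Option { return func(c *Config) { c.Name = name } }

// WithLimit is a hypothetical option that sets Config.Limit.
func WithLimit(n int) Option { return func(c *Config) { c.Limit = n } }

// Build starts from defaults and applies options in order, so a later
// option overrides an earlier one that sets the same field.
func Build(opts ...Option) *Config {
	cfg := &Config{Name: "default", Limit: -1} // assumed defaults
	for _, opt := range opts {
		if opt != nil { // tolerate nil options
			opt(cfg)
		}
	}
	return cfg
}
```

With this sketch, Build(WithLimit(10), WithLimit(20)) yields a Limit of 20, because the second option runs last.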

func (*Config) String

func (c *Config) String() string

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine wires admission, execution and supervision for server-side streaming handlers.

It mirrors the unary pipeline, except the handler produces multiple responses via a stream.

Example (AdmissionModes)

===========================================================

Example: Admission modes

===========================================================

Admission mode is chosen per execution. This allows different endpoints to use different semantics.

cfg := BuildConfig(
	WithAdmissionLimiter(admission.NewLimiter(0)),
)

engine := NewEngine(cfg)

stream := NewStream(
	context.Background(),
	&fakeServerStream{},
)

handler := func(ctx context.Context, s *Stream) error {
	// Under soft-allow, handler may still execute
	// even if capacity is exhausted.
	return s.Send("hello")
}

_ = engine.Run(
	context.Background(),
	stream,
	core.AdmissionSoftAllow,
	handler,
)

Example (BasicStream)

===========================================================

Example: Basic server-stream execution

===========================================================

This example shows the simplest usage of the server-stream engine. A handler sends a few messages and exits normally.

cfg := BuildConfig(
	WithAdmissionLimiter(admission.NewLimiter(10)),
)

engine := NewEngine(cfg)

stream := NewStream(
	context.Background(),
	&fakeServerStream{},
)

handler := func(ctx context.Context, s *Stream) error {
	for i := 0; i < 3; i++ {
		if err := s.Send(i); err != nil {
			return err
		}
	}
	return nil
}

_ = engine.Run(
	context.Background(),
	stream,
	core.AdmissionSoftAllow,
	handler,
)

Example (GracefulShutdown)

===========================================================

Example: Graceful shutdown

===========================================================

Once shutdown begins:

  • New executions observe shutdown
  • In-flight handlers receive cancellation
  • Streams close safely

shutdown := lifecycle.NewShutdown()

cfg := BuildConfig(
	WithShutdown(shutdown),
)

engine := NewEngine(cfg)

stream := NewStream(
	context.Background(),
	&fakeServerStream{},
)

handler := func(ctx context.Context, s *Stream) error {
	for {
		select {
		case <-ctx.Done():
			log.Println("stream canceled")
			return ctx.Err()
		default:
			// Stop ticking as soon as the stream refuses a send.
			if err := s.Send("tick"); err != nil {
				return err
			}
			time.Sleep(10 * time.Millisecond)
		}
	}
}

go func() {
	time.Sleep(50 * time.Millisecond)
	shutdown.BeginShutdown()
}()

_ = engine.Run(
	context.Background(),
	stream,
	core.AdmissionSoftAllow,
	handler,
)

Example (PanicSafety)

===========================================================

Example: Panic safety

===========================================================

Panics inside handlers are always recovered. They never escape the engine.

cfg := BuildConfig()

engine := NewEngine(cfg)

stream := NewStream(
	context.Background(),
	&fakeServerStream{},
)

handler := func(ctx context.Context, s *Stream) error {
	panic("unexpected bug")
}

_ = engine.Run(
	context.Background(),
	stream,
	core.AdmissionSoftAllow,
	handler,
)
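
One common way to keep a panic from escaping a runner is a deferred recover that turns the panic into an ordinary error. The sketch below shows that technique in isolation. SafeRun and its error text are hypothetical, the handler is reduced to func() error to stay self-contained, and nothing here is taken from the engine's implementation.

```go
package main

import "fmt"

// SafeRun is a hypothetical helper: it calls h and converts a panic
// into an ordinary error, so the panic never unwinds past SafeRun.
func SafeRun(h func() error) (err error) {
	defer func() {
		// recover returns non-nil only while a panic is in flight.
		if r := recover(); r != nil {
			err = fmt.Errorf("handler panicked: %v", r)
		}
	}()
	return h()
}
```

The named result err is what lets the deferred function replace the return value after the panic has been caught.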

func New

func New(opts ...Option) *Engine

New creates a new server-stream Engine from functional options. This is the recommended constructor for external consumers.

engine := serverstream.New(
    serverstream.WithSupervisorPolicy(policy),
)

func NewEngine

func NewEngine(cfg *Config) *Engine

NewEngine builds the server-stream execution pipeline.

func (*Engine) Run

func (e *Engine) Run(
	ctx context.Context,
	stream *Stream,
	mode core.AdmissionMode,
	handler func(context.Context, *Stream) error,
) error

Run executes a server-stream handler under supervision.

AdmissionMode is per-execution and supplied by the caller (typically the interceptor).
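
To make the per-execution idea concrete, here is a minimal sketch of an admission decision that takes the mode as an argument on each call. Only the soft-allow name comes from this documentation; the Mode type, the HardReject value, ErrRejected, and Admit are assumptions introduced for illustration and may not match core.AdmissionMode.

```go
package main

import "errors"

// Mode is a hypothetical stand-in for core.AdmissionMode.
type Mode int

const (
	HardReject Mode = iota // assumed: refuse when no capacity is left
	SoftAllow              // run anyway when no capacity is left
)

// ErrRejected is a hypothetical error returned when admission fails.
var ErrRejected = errors.New("admission rejected")

// Admit decides whether one execution may proceed, given whether the
// limiter had a free slot and the mode chosen for this particular call.
func Admit(hasCapacity bool, mode Mode) error {
	if hasCapacity || mode == SoftAllow {
		return nil
	}
	return ErrRejected
}
```

Because the mode is a parameter rather than engine state, two endpoints sharing one limiter can still differ in how they treat exhausted capacity.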

type Option

type Option func(*Config)

func WithAdmissionLimit

func WithAdmissionLimit(n int) Option

WithAdmissionLimit sets the maximum number of concurrent executions.

This is the recommended way to configure admission control. No internal imports are required.

serverstream.New(serverstream.WithAdmissionLimit(100))  // max 100 concurrent
serverstream.New(serverstream.WithAdmissionLimit(-1))   // unlimited
serverstream.New(serverstream.WithAdmissionLimit(0))    // reject all
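
The three cases above (positive limit, -1 for unlimited, 0 to reject everything) can be sketched with a buffered channel used as a counting semaphore. This Limiter is a hypothetical illustration of those semantics, not the admission.Limiter type, and the method names are assumptions.

```go
package main

// Limiter is a hypothetical counting limiter. A negative limit means
// unlimited, 0 rejects everything, and n > 0 allows n concurrent holders.
type Limiter struct {
	unlimited bool
	slots     chan struct{}
}

// NewLimiter builds a Limiter for the given limit.
func NewLimiter(limit int) *Limiter {
	if limit < 0 {
		return &Limiter{unlimited: true}
	}
	// A zero-capacity channel never accepts a non-blocking send,
	// which gives the "reject all" behavior for limit == 0.
	return &Limiter{slots: make(chan struct{}, limit)}
}

// TryAcquire takes a slot without blocking and reports success.
func (l *Limiter) TryAcquire() bool {
	if l.unlimited {
		return true
	}
	select {
	case l.slots <- struct{}{}:
		return true
	default:
		return false
	}
}

// Release returns a slot taken by a successful TryAcquire.
func (l *Limiter) Release() {
	if l.unlimited {
		return
	}
	select {
	case <-l.slots:
	default: // nothing held; ignore an unmatched Release
	}
}
```

A caller would pair each successful TryAcquire with a deferred Release around the handler invocation.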

func WithAdmissionLimiter

func WithAdmissionLimiter(l *admission.Limiter) Option

WithAdmissionLimiter sets a custom admission limiter.

Prefer WithAdmissionLimit(n) for simple capacity configuration. Use this for advanced cases requiring a pre-built limiter:

import "github.com/abhipray-cpu/grip/admission"

serverstream.WithAdmissionLimiter(admission.NewLimiter(100))

func WithAdmissionMode

func WithAdmissionMode(m core.AdmissionMode) Option

func WithMetricsSink

func WithMetricsSink(s metrics.Sink) Option

WithMetricsSink sets a custom metrics sink.

The sink must implement the metrics.Sink interface. Use the public packages for built-in implementations:

import "github.com/abhipray-cpu/grip/metrics"
import "github.com/abhipray-cpu/grip/metrics/prommetrics"

serverstream.WithMetricsSink(metrics.Noop())
serverstream.WithMetricsSink(prommetrics.New(prommetrics.DefaultOptions()))

func WithName

func WithName(name string) Option

func WithShutdown

func WithShutdown(s core.ShutdownCoordinator) Option
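
WithShutdown has no description on this page. As a rough illustration of how a coordinator can deliver cancellation to in-flight handlers, the sketch below ties a context to a shutdown signal. The Shutdown type and its Begin, Started, and Bind methods are hypothetical and are not the core.ShutdownCoordinator interface.

```go
package main

import (
	"context"
	"sync"
)

// Shutdown is a hypothetical coordinator used only in this sketch.
type Shutdown struct {
	once sync.Once
	done chan struct{}
}

// NewShutdown returns a coordinator that has not begun shutting down.
func NewShutdown() *Shutdown { return &Shutdown{done: make(chan struct{})} }

// Begin signals shutdown. It is safe to call more than once.
func (s *Shutdown) Begin() { s.once.Do(func() { close(s.done) }) }

// Started reports whether shutdown has begun.
func (s *Shutdown) Started() bool {
	select {
	case <-s.done:
		return true
	default:
		return false
	}
}

// Bind returns a context that is cancelled when the parent is cancelled
// or when shutdown begins. Call the returned function to release the
// watcher goroutine once the handler has finished.
func (s *Shutdown) Bind(parent context.Context) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(parent)
	go func() {
		select {
		case <-s.done:
			cancel()
		case <-ctx.Done():
		}
	}()
	return ctx, cancel
}

// DemoShutdown binds a context, begins shutdown, waits for the context
// to be cancelled, and reports whether both signals were observed.
func DemoShutdown() bool {
	s := NewShutdown()
	ctx, stop := s.Bind(context.Background())
	defer stop()
	s.Begin()
	<-ctx.Done() // unblocks once the watcher goroutine sees shutdown
	return s.Started() && ctx.Err() == context.Canceled
}
```

A handler that selects on ctx.Done(), as in the GracefulShutdown example, would return shortly after Begin is called.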

func WithSupervisorPolicy

func WithSupervisorPolicy(p supervisor.SupervisorPolicy) Option

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream wraps grpc.ServerStream and provides context-aware, shutdown-safe send semantics.

func NewStream

func NewStream(ctx context.Context, ss grpc.ServerStream) *Stream

func (*Stream) Close

func (s *Stream) Close()

Close marks the stream as closed. Safe to call multiple times.

func (*Stream) Context

func (s *Stream) Context() context.Context

Context returns the execution context associated with the stream.

func (*Stream) Send

func (s *Stream) Send(msg any) error

Send sends a message to the client. It respects context cancellation and server shutdown.
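
A send that respects cancellation can be sketched as a guard in front of the transport: check the closed flag, check the context, then forward the message. GuardedStream, ErrClosed, and the recorder fake below are hypothetical names for illustration; the only assumption about gRPC is that the transport exposes SendMsg(m any) error, as grpc.ServerStream does.

```go
package main

import (
	"context"
	"errors"
	"sync/atomic"
)

// Sender is the single transport method this sketch relies on.
type Sender interface {
	SendMsg(m any) error
}

// ErrClosed is a hypothetical error for sends after Close.
var ErrClosed = errors.New("stream closed")

// GuardedStream is a hypothetical wrapper, not the package's Stream.
type GuardedStream struct {
	ctx    context.Context
	inner  Sender
	closed atomic.Bool
}

// NewGuardedStream wraps inner with context and close checks.
func NewGuardedStream(ctx context.Context, inner Sender) *GuardedStream {
	return &GuardedStream{ctx: ctx, inner: inner}
}

// Close marks the stream closed; calling it again has no further effect.
func (g *GuardedStream) Close() { g.closed.Store(true) }

// Send refuses to write once the stream is closed or its context is
// done, and otherwise forwards the message to the transport.
func (g *GuardedStream) Send(msg any) error {
	if g.closed.Load() {
		return ErrClosed
	}
	if err := g.ctx.Err(); err != nil {
		return err
	}
	return g.inner.SendMsg(msg)
}

// recorder is a fake transport that remembers what was sent.
type recorder struct{ msgs []any }

func (r *recorder) SendMsg(m any) error {
	r.msgs = append(r.msgs, m)
	return nil
}

// DemoSend sends once on a live context, cancels and sends again, then
// closes twice and sends a third time.
func DemoSend() (sent int, afterCancel error, afterClose error) {
	ctx, cancel := context.WithCancel(context.Background())
	rec := &recorder{}
	s := NewGuardedStream(ctx, rec)
	_ = s.Send("a")
	cancel()
	afterCancel = s.Send("b")
	s.Close()
	s.Close()
	afterClose = s.Send("c")
	return len(rec.msgs), afterCancel, afterClose
}
```

In DemoSend only the first message reaches the transport; the second send returns the context's error and the third returns ErrClosed.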
