up

package
v0.0.0-...-0411a9e
Published: Jun 20, 2026 License: Apache-2.0 Imports: 29 Imported by: 0

README

up

Package up is the user-facing API for Transit HTTP filters. Import it from handler packages. The shared library entrypoint additionally blank-imports down/abi_impl to link the Envoy ABI exports — but that import belongs only there, not in reusable handler packages.

Registration

Register a filter from your shared library entrypoint:

package main

import (
	_ "github.com/dio/transit/down/abi_impl"
	_ "github.com/dio/transit/examples/hello"
)

func main() {}

And in your handler package:

func init() {
	up.Register("hello", Handler)
}

The registered name must match filter_name in the Envoy dynamic module filter config.

Registration forms

Use case                                     Register with
Request headers only                         up.Register(name, onReq)
Request and response headers                 up.RegisterWithResponse(name, onReq, onResp)
Streaming request body chunks                up.RegisterWithBody(name, onReq, onBody, onResp)
Buffered body reads or replacement           up.RegisterWithMutableBody(name, onReq, onBody, onResp)
Per-config metrics setup                     up.RegisterWithConfig(name, configure, onReq, onResp)
Background goroutines tied to filter config  up.RegisterWithGroup(name, group, onReq)
Access logs after the stream is complete     up.RegisterAccessLogger(name, factory)
Cluster Extension modules                    up.RegisterCluster(name, factory)
LB Policy modules                            up.RegisterLBPolicy(name, factory)

Start with up.Register. Move to a larger form only when the filter needs another stream phase or lifecycle hook.

Handler shape

Request handlers always have this signature:

func(w *up.Writer, r *up.Request)

r gives you the parsed request: Method, Path, Host, FilterName, and individual headers via r.Header(name) or all headers via r.AllHeaders().

w gives you the actions available from a filter.

Writer actions

  • w.Log(level, format, args...) — log through Envoy at a given severity
  • w.SendLocalResponse(status, body, headers...) — send an immediate response and stop the filter chain
  • w.SetRequestHeader(key, value) — add or replace a request header
  • w.RemoveRequestHeader(key) — remove a request header
  • w.SetResponseHeader(key, value) — add or replace a response header
  • w.SetFilterState(key, value) — write Envoy filter state
  • w.SetUpstreamOverrideHost(host) — override the upstream host for this request
  • w.AddSpanTag(key, value) — annotate the active tracing span
  • w.IncrementCounter(id, delta) — increment an Envoy counter defined at config time
  • w.IncrementCounterLabels(id, delta, labels...) — increment a labeled Envoy counter
  • w.IncrementGauge(id, delta) — increment an Envoy gauge defined at config time
  • w.DecrementGauge(id, delta) — decrement an Envoy gauge defined at config time
  • w.SetGauge(id, value) — set an Envoy gauge to an absolute value
  • w.RecordHistogram(id, value) — record a histogram observation
  • w.RecordHistogramLabels(id, value, labels...) — record a labeled histogram observation
  • w.GetBufferedBody() — read the buffered request body (requires RegisterWithMutableBody)
  • w.SetBufferedBody(body) — replace the buffered request body
  • w.HTTPCallout(req, callback) — make an async HTTP callout to an Envoy cluster

Async HTTP callouts

Use w.HTTPCallout when the handler needs to call an upstream before deciding the response. The callout pauses the request stream. The callback receives the callout result and response body as borrowed shared.UnsafeEnvoyBuffer values; copy them with ToString() or ToBytes() before the callback returns.

For fire-and-forget work that does not need to send a local response, use w.Go plus w.Do to queue mutations and return Continue directly. See docs/async-http-callouts.md for the detailed decision guide.

Middleware

up.Chain composes HandlerFunc values with middleware:

up.Register("myfilter", up.Chain(handler, authMiddleware, loggingMiddleware))

Middleware runs left-to-right; the first entry is outermost.
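
The ordering rule can be illustrated with a small self-contained sketch. The Writer, Request, HandlerFunc and Middleware types and the chain function below are hypothetical stand-ins written for this example, not the package's real definitions; only the call shape up.Chain(handler, middleware...) comes from the text above.

```go
package main

// Writer and Request are hypothetical stand-ins for up.Writer and up.Request.
type Writer struct{ Trace []string }
type Request struct{}

type HandlerFunc func(w *Writer, r *Request)

// Middleware is an assumed shape: it wraps one handler in another.
type Middleware func(next HandlerFunc) HandlerFunc

// chain wraps h so that the first middleware in mws ends up outermost.
// This is an assumed implementation of the documented ordering.
func chain(h HandlerFunc, mws ...Middleware) HandlerFunc {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// tag returns a middleware that records its name when it is entered.
func tag(name string) Middleware {
	return func(next HandlerFunc) HandlerFunc {
		return func(w *Writer, r *Request) {
			w.Trace = append(w.Trace, name)
			next(w, r)
		}
	}
}

// runChain executes a two-middleware chain and returns the visit order.
func runChain() []string {
	w := &Writer{}
	h := chain(
		func(w *Writer, r *Request) { w.Trace = append(w.Trace, "handler") },
		tag("auth"), tag("logging"),
	)
	h(w, &Request{})
	return w.Trace
}
```

Under these assumptions runChain visits auth first, then logging, then the handler, which is what "the first entry is outermost" means in practice.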

Upstream HTTP filters

Filters registered with up.Register run on the listener (downstream) side by default. To run a filter on the upstream side, configure the dynamic module filter under HttpProtocolOptions.http_filters on a cluster.

Body buffering difference vs downstream

Downstream filters using WithMutableBody (buffered mode) rely on BufferedRequestBody() to read the accumulated request body. Envoy pre-fills this buffer across StopAndBuffer calls, so by the time the handler sees endOfStream=true, BufferedRequestBody() holds all data.

Upstream filter chains behave differently: Envoy does not pre-fill the buffer when endOfStream=true arrives on the first body call (i.e. when no prior StopAndBuffer has occurred — common for small single-frame requests). In that case the body is only available in the body argument passed to OnRequestBody, not in BufferedRequestBody().

The up framework handles this transparently: when WithMutableBody is used, OnRequestBody falls back to the body argument when BufferedRequestBody() returns an empty buffer, so BodyChunk.Data always contains the correct body regardless of which filter chain the filter runs in.
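
The fallback described above can be pictured as a one-branch selection. The function name effectiveBody and its two parameters are hypothetical; the sketch models only the rule "use the buffered body unless it is empty" and does not call any Envoy or Transit API.

```go
package main

// effectiveBody returns the buffered body when Envoy has filled it and
// otherwise falls back to the chunk passed to the body callback.
// Both arguments are plain byte slices standing in for Envoy buffers.
func effectiveBody(buffered, chunk []byte) []byte {
	if len(buffered) == 0 {
		return chunk
	}
	return buffered
}
```

In the upstream single-frame case the buffered slice would be empty, so the chunk is returned; in the downstream case the accumulated buffer wins.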

Log levels

up.LogTrace, up.LogDebug, up.LogInfo, up.LogWarn, up.LogError, up.LogCritical map directly to Envoy log severity levels.

Documentation

Overview

Package up provides the user-facing API for transit HTTP filter handlers. This file defines the types for asynchronous HTTP callouts, which come in two modes, callback and goroutine:

  1. HTTPCallout / HTTPCalloutAllSettled / HTTPCalloutSequence — callback form. The filter stops the request, Envoy sends outbound HTTP requests to named clusters, and the user-supplied callback runs when the response, response group, or sequence arrives. The callback may queue mutations (SetRequestHeader, etc.) or send a local response. Transit then applies those mutations and resumes (or terminates) the stream. This is the only path that supports SendLocalResponse reliably, because the callback runs from a filter callback, not from a scheduler, and Envoy only honours SendLocalResponse from filter callbacks.

  2. Go + Do — goroutine form. The handler calls w.Go(fn); fn runs in a goroutine and may call w.Do(...) to issue callouts from that goroutine. After fn returns, Transit hops back to the Envoy worker thread via goScheduler.Schedule and applies queued mutations, then continues the request. SendLocalResponse from this path is NOT reliable — Envoy ignores it from scheduled callbacks. Use Go+Do only for work that forwards the request.

Both paths are mutex-free by design. See the calloutState comment in writer.go for the full concurrency model.

Package up, stream_object.go: Primitive A, a typed per-stream object handoff.

Why the drain lives in OnStreamComplete (for filters without WithOnStreamFinalized) / finalizedLogger.OnLog (for filters with it):

OnStreamComplete fires unconditionally for every stream Envoy terminates: normal end-of-stream, client disconnect, idle/request timeout, foreign local reply, stream reset. This is exactly the "teardown matrix" described in docs/orange-token-correlation-risks.md. For filters that also use WithOnStreamFinalized, the access logger's OnLog fires after OnStreamComplete, so the drain is deferred to OnLog, which lets finalized cleanup run before the SDK removes stream-scoped objects. Drain order:

user OnStreamComplete → user OnStreamFinalized → dropBag

Constants

const (
	AccessLogTypeNotSet                                  = down.AccessLogTypeNotSet
	AccessLogTypeTcpUpstreamConnected                    = down.AccessLogTypeTcpUpstreamConnected
	AccessLogTypeTcpPeriodic                             = down.AccessLogTypeTcpPeriodic
	AccessLogTypeTcpConnectionEnd                        = down.AccessLogTypeTcpConnectionEnd
	AccessLogTypeDownstreamStart                         = down.AccessLogTypeDownstreamStart
	AccessLogTypeDownstreamPeriodic                      = down.AccessLogTypeDownstreamPeriodic
	AccessLogTypeDownstreamEnd                           = down.AccessLogTypeDownstreamEnd
	AccessLogTypeUpstreamPoolReady                       = down.AccessLogTypeUpstreamPoolReady
	AccessLogTypeUpstreamPeriodic                        = down.AccessLogTypeUpstreamPeriodic
	AccessLogTypeUpstreamEnd                             = down.AccessLogTypeUpstreamEnd
	AccessLogTypeDownstreamTunnelSuccessfullyEstablished = down.AccessLogTypeDownstreamTunnelSuccessfullyEstablished
	AccessLogTypeUdpTunnelUpstreamConnected              = down.AccessLogTypeUdpTunnelUpstreamConnected
	AccessLogTypeUdpPeriodic                             = down.AccessLogTypeUdpPeriodic
	AccessLogTypeUdpSessionEnd                           = down.AccessLogTypeUdpSessionEnd
)

AccessLogType constants.

const (
	HttpHeaderTypeRequest         = down.HttpHeaderTypeRequest
	HttpHeaderTypeRequestTrailer  = down.HttpHeaderTypeRequestTrailer
	HttpHeaderTypeResponse        = down.HttpHeaderTypeResponse
	HttpHeaderTypeResponseTrailer = down.HttpHeaderTypeResponseTrailer
)

HttpHeaderType constants.

const (
	HostUnhealthy = down.HostUnhealthy
	HostDegraded  = down.HostDegraded
	HostHealthy   = down.HostHealthy
)

const (
	HostStatCxConnectFail = down.HostStatCxConnectFail
	HostStatCxTotal       = down.HostStatCxTotal
	HostStatRqError       = down.HostStatRqError
	HostStatRqSuccess     = down.HostStatRqSuccess
	HostStatRqTimeout     = down.HostStatRqTimeout
	HostStatRqTotal       = down.HostStatRqTotal
	HostStatCxActive      = down.HostStatCxActive
	HostStatRqActive      = down.HostStatRqActive
)

const (
	// DefaultPollInterval is used when PollOptions.Interval is zero.
	DefaultPollInterval = 30 * time.Second
	// DefaultPollTimeout is used when PollOptions.Timeout is zero.
	DefaultPollTimeout = 5 * time.Second
)

const (
	HeaderAuthority = ":authority"
	HeaderMethod    = ":method"
	HeaderPath      = ":path"
	HeaderScheme    = ":scheme"
)

HTTP/2 pseudo-header names used by Envoy. Prefer these constants over bare string literals when calling SetRequestHeader.

const (
	ResponseFlagFailedLocalHealthCheck          = "LH"
	ResponseFlagNoHealthyUpstream               = "UH"
	ResponseFlagUpstreamRequestTimeout          = "UT"
	ResponseFlagLocalReset                      = "LR"
	ResponseFlagUpstreamRemoteReset             = "UR"
	ResponseFlagUpstreamConnectionFailure       = "UF"
	ResponseFlagUpstreamConnectionTermination   = "UC"
	ResponseFlagUpstreamOverflow                = "UO"
	ResponseFlagNoRouteFound                    = "NR"
	ResponseFlagDelayInjected                   = "DI"
	ResponseFlagFaultInjected                   = "FI"
	ResponseFlagRateLimited                     = "RL"
	ResponseFlagUnauthorizedExternalService     = "UAEX"
	ResponseFlagRateLimitServiceError           = "RLSE"
	ResponseFlagDownstreamConnectionTermination = "DC"
	ResponseFlagUpstreamRetryLimitExceeded      = "URX"
	ResponseFlagStreamIdleTimeout               = "SI"
	ResponseFlagInvalidRequestHeaders           = "IH"
	ResponseFlagDownstreamProtocolError         = "DPE"
	ResponseFlagUpstreamMaxStreamDuration       = "UMSDR"
	ResponseFlagResponseFromCacheFilter         = "RFCF"
	ResponseFlagNoFilterConfigFound             = "NFCF"
	ResponseFlagDurationTimeout                 = "DT"
	ResponseFlagUpstreamProtocolError           = "UPE"
	ResponseFlagNoClusterFound                  = "NC"
	ResponseFlagOverloadManager                 = "OM"
)

Response flag constants match Envoy's %RESPONSE_FLAGS% access log format. ResponseFlagsString converts the bitmask from AccessLoggerHandle.GetResponseFlags() to a comma-separated string using these tokens (e.g. "UF,UT").

const DefaultRefreshInterval = 30 * time.Second

DefaultRefreshInterval is the interval used when HostRefreshOptions.Interval is zero.

const DefaultRefreshTimeout = time.Second

DefaultRefreshTimeout is the per-fetch context timeout used when HostRefreshOptions.Timeout is zero.
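
The zero-value fallback for these two constants might look like the sketch below. The refreshOptions struct is a hypothetical stand-in for HostRefreshOptions (only its Interval and Timeout fields are named in this documentation), and withDefaults is an invented helper, not a package function.

```go
package main

import "time"

// Values copied from the constants documented above.
const (
	defaultRefreshInterval = 30 * time.Second
	defaultRefreshTimeout  = time.Second
)

// refreshOptions is a hypothetical stand-in for HostRefreshOptions.
type refreshOptions struct {
	Interval time.Duration
	Timeout  time.Duration
}

// withDefaults replaces zero fields with the documented defaults and
// leaves explicitly set fields untouched.
func withDefaults(o refreshOptions) refreshOptions {
	if o.Interval == 0 {
		o.Interval = defaultRefreshInterval
	}
	if o.Timeout == 0 {
		o.Timeout = defaultRefreshTimeout
	}
	return o
}
```

Treating zero as "unset" keeps the options struct usable as a plain literal, at the cost of not being able to request a zero interval or timeout.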

Variables

This section is empty.

Functions

func Register

func Register(name string, h HandlerFunc, opts ...FilterOption)

Register registers a named HTTP filter handler and wires it into the Envoy SDK. Must be called from an init() function. Panics on duplicate names.

Optional features are configured via FilterOptions: WithConfig, WithResponse, WithStreamingBody, WithMutableBody, WithGroup, WithOnStreamComplete.
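
The duplicate-name rule can be modelled with a small registry. Everything in this sketch (the registry type, its register method, the panic message) is hypothetical and only mirrors the documented behavior that a second registration under the same name panics.

```go
package main

import "fmt"

// registry is a hypothetical stand-in for the package-level filter table.
type registry struct {
	handlers map[string]func()
}

// register stores h under name and panics when name is already taken.
func (r *registry) register(name string, h func()) {
	if r.handlers == nil {
		r.handlers = make(map[string]func())
	}
	if _, dup := r.handlers[name]; dup {
		panic(fmt.Sprintf("filter %q already registered", name))
	}
	r.handlers[name] = h
}

// registerTwice reports whether registering the same name twice panics.
func registerTwice() (panicked bool) {
	defer func() { panicked = recover() != nil }()
	var r registry
	r.register("hello", func() {})
	r.register("hello", func() {})
	return false
}
```

Panicking at init time surfaces a name clash as soon as the shared library loads rather than at the first request.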

func RegisterAccessLogger

func RegisterAccessLogger(name string, f AccessLoggerConfigFactory)

RegisterAccessLogger registers a named access logger factory. Must be called from an init() function. Panics on duplicate names.

func RegisterCluster

func RegisterCluster(name string, f ClusterFactory)

RegisterCluster registers a named ClusterFactory. Must be called from an init() function. Panics on duplicate names.

func RegisterLBPolicy

func RegisterLBPolicy(name string, f LBPolicyFactory)

RegisterLBPolicy registers a named LBPolicyFactory. Must be called from an init() function. Panics on duplicate names.

func ResponseFlagsString

func ResponseFlagsString(mask uint64) string

ResponseFlagsString converts the GetResponseFlags() bitmask to Envoy's human-readable flag string (e.g. "UF,UT"), matching %RESPONSE_FLAGS%.
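
A bitmask-to-token conversion of this kind could be sketched as follows. The bit positions in flagTokens are invented for illustration; the real assignments are not given in this document, so treat the table as a placeholder and only the join logic as the point of the example.

```go
package main

import "strings"

// flagTokens maps hypothetical bit positions to tokens. The real bit
// assignments are not shown in this document; these are illustrative only.
var flagTokens = []struct {
	bit   uint64
	token string
}{
	{1 << 0, "LH"},
	{1 << 1, "UH"},
	{1 << 2, "UT"},
}

// flagsString joins the tokens of all set bits with commas, in table order.
func flagsString(mask uint64) string {
	var parts []string
	for _, f := range flagTokens {
		if mask&f.bit != 0 {
			parts = append(parts, f.token)
		}
	}
	return strings.Join(parts, ",")
}
```

With this placeholder table, a mask with bits 0 and 2 set yields "LH,UT" and an empty mask yields an empty string.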

func RunRetry

func RunRetry(ctx context.Context, label string, fn func(ctx context.Context) error)

RunRetry calls fn in a loop until ctx is cancelled. When fn returns a non-nil error and the context is still active, the error is logged with label as a prefix and fn is retried immediately. When fn returns nil the loop continues regardless — fn should only return nil on a clean shutdown.

Use this inside a ClusterGroup.Go goroutine to wrap a streaming RPC receive loop so that transient stream failures retry automatically:

cg.Go(func(ctx context.Context) {
    up.RunRetry(ctx, "catalog-watch", func(ctx context.Context) error {
        stream, err := client.Watch(ctx, connect.NewRequest(&req))
        if err != nil {
            return err
        }
        for stream.Receive() {
            handle(stream.Msg())
        }
        return stream.Err()
    })
})
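
The loop semantics described for RunRetry can be read as the sketch below. runRetry is an assumed re-implementation based only on the prose above, not the package source, and countAttempts is a hypothetical driver that fails twice and then cancels the context.

```go
package main

import (
	"context"
	"errors"
	"log"
)

// runRetry calls fn until ctx is cancelled. Errors are logged with label
// as a prefix while the context is still active; retries are immediate.
func runRetry(ctx context.Context, label string, fn func(ctx context.Context) error) {
	for ctx.Err() == nil {
		if err := fn(ctx); err != nil && ctx.Err() == nil {
			log.Printf("%s: %v", label, err)
		}
	}
}

// countAttempts runs a function that fails twice and cancels on the third call.
func countAttempts() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	attempts := 0
	runRetry(ctx, "demo", func(ctx context.Context) error {
		attempts++
		if attempts == 3 {
			cancel() // simulate a clean shutdown
			return nil
		}
		return errors.New("transient failure")
	})
	return attempts
}
```

Because the retry is immediate, fn is expected to block for most of its lifetime (as a stream receive loop does); a function that fails instantly would spin, and this sketch adds no backoff.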

func StartFileWatch

func StartFileWatch[T any](p *PipelineConfig[T], path string) func()

StartFileWatch enables file system watching for the given config file path. When the file is modified, an immediate refresh is triggered. Returns a stop function that cancels watching; safe to call multiple times. If watching cannot be set up, returns a no-op function and logs the error via the config's observer.

func TruncateBody

func TruncateBody(data []byte, max int) []byte

TruncateBody returns data[:max] if len(data) > max, otherwise data unchanged.
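
Written out, the documented behavior amounts to a single slice expression. truncateBody below is a local re-creation for illustration and assumes max is not negative.

```go
package main

// truncateBody mirrors the documented behavior; it assumes max >= 0.
func truncateBody(data []byte, max int) []byte {
	if len(data) > max {
		// The result shares its backing array with data.
		return data[:max]
	}
	return data
}
```

Since the result is a sub-slice rather than a copy, writes through it are visible in the original slice; copy the result first if that matters.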

Types

type AccessLogType

type AccessLogType = down.AccessLogType

type AccessLogger

type AccessLogger interface {
	OnLog(handle AccessLoggerHandle, logType AccessLogType)
	OnDestroy()
}

AccessLogger is the per-worker-thread logger instance.

type AccessLoggerConfigFactory

type AccessLoggerConfigFactory interface {
	Create(handle AccessLoggerConfigHandle, config []byte) (AccessLoggerFactory, error)
}

AccessLoggerConfigFactory parses the logger config and creates factories.

type AccessLoggerConfigHandle

type AccessLoggerConfigHandle interface {
	Log(level LogLevel, format string, args ...any)
	DefineCounter(name string, tagKeys ...string) (MetricID, error)
	DefineGauge(name string, tagKeys ...string) (MetricID, error)
	DefineHistogram(name string, tagKeys ...string) (MetricID, error)
}

AccessLoggerConfigHandle is passed to AccessLoggerConfigFactory.Create on the main thread. Use it to define Envoy metrics during initialization.

type AccessLoggerFactory

type AccessLoggerFactory interface {
	NewLogger() AccessLogger
	OnDestroy()
}

AccessLoggerFactory creates AccessLogger instances, one per worker thread.

type AccessLoggerHandle

type AccessLoggerHandle interface {
	GetTimingInfo() TimingInfo
	GetBytesInfo() BytesInfo
	GetResponseFlags() uint64
	GetResponseCode() uint32
	GetAttributeString(id AttributeID) (Buffer, bool)
	GetAttributeInt(id AttributeID) (int64, bool)
	GetAttributeBool(id AttributeID) (bool, bool)
	GetHeader(headerType HttpHeaderType, key string) (Buffer, bool)
	GetWorkerIndex() uint32
	GetTraceID() (Buffer, bool)
	GetSpanID() (Buffer, bool)
	IsTraceSampled() bool
	GetLocalReplyBody() (Buffer, bool)
	GetUpstreamPoolReadyDurationNs() int64
	GetUpstreamRequestAttemptCount() uint32
	Log(level LogLevel, format string, args ...any)
}

AccessLoggerHandle provides access to finalized stream state during OnLog. Values backed by Envoy memory are returned as Buffer.

type AdminServer

type AdminServer struct {
	// contains filtered or unexported fields
}

AdminServer is a local-only HTTP server for operational debug endpoints. Register handlers before passing it to WithAdminServer.

Example:

admin := up.NewAdminServer(up.AdminServerOptions{})
admin.RegisterPprof()
up.Register("my-filter", handler, up.WithAdminServer(admin))

func NewAdminServer

func NewAdminServer(opts AdminServerOptions) *AdminServer

NewAdminServer creates an AdminServer. Call RegisterPprof or Handle/HandleFunc to add endpoints, then pass to WithAdminServer.

func (*AdminServer) Handle

func (a *AdminServer) Handle(pattern string, h http.Handler)

Handle registers a handler for the given pattern.

func (*AdminServer) HandleFunc

func (a *AdminServer) HandleFunc(pattern string, fn http.HandlerFunc)

HandleFunc registers a handler function for the given pattern.

func (*AdminServer) ListenAddr

func (a *AdminServer) ListenAddr() string

ListenAddr returns the resolved listen address. Valid after Ready() closes.

func (*AdminServer) Ready

func (a *AdminServer) Ready() <-chan struct{}

Ready returns a channel closed after net.Listen succeeds.

func (*AdminServer) RegisterPprof

func (a *AdminServer) RegisterPprof()

RegisterPprof adds the standard /debug/pprof/ suite to the admin mux. Explicit registration on the admin's own mux avoids polluting http.DefaultServeMux, which in an Envoy .so plugin risks colliding with Envoy's own Go runtime.

type AdminServerOptions

type AdminServerOptions struct {
	// ListenAddr defaults to "127.0.0.1:6060".
	ListenAddr      string
	ShutdownTimeout time.Duration
}

AdminServerOptions configures the admin HTTP server.

type AsyncHostSelector

type AsyncHostSelector[T any] struct {
	// contains filtered or unexported fields
}

AsyncHostSelector[T] implements the ChooseHost + CancelHostSelection pair for body-driven async host selection.

Usage: embed or delegate from a ClusterLB implementation.

  • ChooseHost: call s.ChooseHost(handle, ctx)
  • CancelHostSelection: call s.Cancel(completion)

T is the decision type resolved by the body filter (e.g. a struct with provider name, model, error code). lookup maps a T to a HostResult.

The OnResolve callback may fire from a worker goroutine; AsyncHostSelector always marshals completion.Complete back to the main thread via handle.Schedule — the canonical example of the main-thread contract. See .agents/skills/transit-cluster-main-thread/SKILL.md.

func NewAsyncHostSelector

func NewAsyncHostSelector[T any](
	handle ClusterHandle,
	key StreamKey[*StreamPromise[T]],
	lookup func(T) HostResult,
	obs SelectorObserver,
) *AsyncHostSelector[T]

NewAsyncHostSelector creates an AsyncHostSelector that reads promises from key and resolves hosts via lookup. handle is the ClusterHandle used for Schedule. obs may be zero (all nil fields are no-ops).

func (*AsyncHostSelector[T]) Cancel

func (s *AsyncHostSelector[T]) Cancel(completion *ClusterLBCompletion)

Cancel implements the CancelHostSelection half of ClusterLB. Prevents the pending OnResolve callback from calling Complete.

func (*AsyncHostSelector[T]) ChooseHost

ChooseHost implements the ChooseHost half of ClusterLB. Returns (nil, completion) for async resolution, or (nil, nil) when no promise is found.

type AttributeID

type AttributeID uint32

AttributeID identifies a stream attribute readable via Writer.GetAttributeString etc.

Response attributes.

const (
	AttributeIDSourceAddress      AttributeID = AttributeID(shared.AttributeIDSourceAddress)
	AttributeIDSourcePort         AttributeID = AttributeID(shared.AttributeIDSourcePort)
	AttributeIDDestinationAddress AttributeID = AttributeID(shared.AttributeIDDestinationAddress)
	AttributeIDDestinationPort    AttributeID = AttributeID(shared.AttributeIDDestinationPort)
	AttributeIDConnectionId       AttributeID = AttributeID(shared.AttributeIDConnectionId)

	// MTLS / TLS — acronyms use all-caps per Go conventions (changed in SDK 1.39-dev).
	AttributeIDConnectionMTLS       AttributeID = AttributeID(shared.AttributeIDConnectionMTLS)
	AttributeIDConnectionTLSVersion AttributeID = AttributeID(shared.AttributeIDConnectionTLSVersion)

	AttributeIDConnectionRequestedServerName         AttributeID = AttributeID(shared.AttributeIDConnectionRequestedServerName)
	AttributeIDConnectionSubjectLocalCertificate     AttributeID = AttributeID(shared.AttributeIDConnectionSubjectLocalCertificate)
	AttributeIDConnectionSubjectPeerCertificate      AttributeID = AttributeID(shared.AttributeIDConnectionSubjectPeerCertificate)
	AttributeIDConnectionDNSSanLocalCertificate      AttributeID = AttributeID(shared.AttributeIDConnectionDNSSanLocalCertificate)
	AttributeIDConnectionDNSSanPeerCertificate       AttributeID = AttributeID(shared.AttributeIDConnectionDNSSanPeerCertificate)
	AttributeIDConnectionURISanLocalCertificate      AttributeID = AttributeID(shared.AttributeIDConnectionURISanLocalCertificate)
	AttributeIDConnectionURISanPeerCertificate       AttributeID = AttributeID(shared.AttributeIDConnectionURISanPeerCertificate)
	AttributeIDConnectionSha256PeerCertificateDigest AttributeID = AttributeID(shared.AttributeIDConnectionSha256PeerCertificateDigest)
	AttributeIDConnectionTransportFailureReason      AttributeID = AttributeID(shared.AttributeIDConnectionTransportFailureReason)
	AttributeIDConnectionTerminationDetails          AttributeID = AttributeID(shared.AttributeIDConnectionTerminationDetails)
)

Source / destination / connection attributes.

const (
	AttributeIDUpstreamAddress                AttributeID = AttributeID(shared.AttributeIDUpstreamAddress)
	AttributeIDUpstreamPort                   AttributeID = AttributeID(shared.AttributeIDUpstreamPort)
	AttributeIDUpstreamLocalAddress           AttributeID = AttributeID(shared.AttributeIDUpstreamLocalAddress)
	AttributeIDUpstreamTransportFailureReason AttributeID = AttributeID(shared.AttributeIDUpstreamTransportFailureReason)
	AttributeIDUpstreamRequestAttemptCount    AttributeID = AttributeID(shared.AttributeIDUpstreamRequestAttemptCount)
	AttributeIDUpstreamCxPoolReadyDuration    AttributeID = AttributeID(shared.AttributeIDUpstreamCxPoolReadyDuration)
	AttributeIDUpstreamLocality               AttributeID = AttributeID(shared.AttributeIDUpstreamLocality)

	// TLS / DNS / URI — acronyms use all-caps per Go conventions (changed in SDK 1.39-dev).
	AttributeIDUpstreamTLSVersion                  AttributeID = AttributeID(shared.AttributeIDUpstreamTLSVersion)
	AttributeIDUpstreamSubjectLocalCertificate     AttributeID = AttributeID(shared.AttributeIDUpstreamSubjectLocalCertificate)
	AttributeIDUpstreamSubjectPeerCertificate      AttributeID = AttributeID(shared.AttributeIDUpstreamSubjectPeerCertificate)
	AttributeIDUpstreamDNSSanLocalCertificate      AttributeID = AttributeID(shared.AttributeIDUpstreamDNSSanLocalCertificate)
	AttributeIDUpstreamDNSSanPeerCertificate       AttributeID = AttributeID(shared.AttributeIDUpstreamDNSSanPeerCertificate)
	AttributeIDUpstreamURISanLocalCertificate      AttributeID = AttributeID(shared.AttributeIDUpstreamURISanLocalCertificate)
	AttributeIDUpstreamURISanPeerCertificate       AttributeID = AttributeID(shared.AttributeIDUpstreamURISanPeerCertificate)
	AttributeIDUpstreamSha256PeerCertificateDigest AttributeID = AttributeID(shared.AttributeIDUpstreamSha256PeerCertificateDigest)
)

Upstream attributes.

XDS / metadata attributes.

const (
	AttributeIDHealthCheck AttributeID = AttributeID(shared.AttributeIDHealthCheck)
)

Health check attribute.

type BaseCluster

type BaseCluster struct{}

BaseCluster provides no-op implementations of all Cluster lifecycle methods except [Cluster.NewClusterLB], which must always be implemented by the embedder because [ClusterLB.ChooseHost] has no sensible default.

Embed BaseCluster to only override the methods your cluster actually needs:

type myCluster struct {
    up.BaseCluster
    bg up.ClusterGroup
    // ...
}

func (c *myCluster) NewClusterLB() up.ClusterLB { return &myLB{} }

func (c *myCluster) Init(h up.ClusterHandle) {
    // add initial hosts, then:
    h.PreInitComplete()
}

func (c *myCluster) ServerInitialized(_ up.ClusterHandle) {
    c.bg.Go(func(ctx context.Context) { c.watch(ctx) })
    c.bg.Start()
}

func (c *myCluster) Shutdown(_ up.ClusterHandle, done func()) {
    c.bg.Stop()
    done()
}

Defaults apply per method: if Init is not overridden, BaseCluster.Init calls PreInitComplete (the cluster starts with no hosts); if Shutdown is not overridden, BaseCluster.Shutdown calls done() immediately.

func (BaseCluster) Close

func (BaseCluster) Close()

Close is a no-op.

func (BaseCluster) DrainStarted

func (BaseCluster) DrainStarted(_ ClusterHandle)

DrainStarted is a no-op.

func (BaseCluster) Init

func (BaseCluster) Init(h ClusterHandle)

Init calls PreInitComplete with no hosts. Override to add initial hosts before signalling Envoy that init is complete.

func (BaseCluster) ServerInitialized

func (BaseCluster) ServerInitialized(_ ClusterHandle)

ServerInitialized is a no-op. Override to start background goroutines via ClusterGroup.Start or to run synchronous cold-start work before Envoy workers begin accepting traffic.

func (BaseCluster) Shutdown

func (BaseCluster) Shutdown(_ ClusterHandle, done func())

Shutdown calls done immediately with no cleanup. Override to stop ClusterGroup goroutines (or other background work) before calling done.

type BodyChunk

type BodyChunk struct {
	Data            []byte
	EndStream       bool
	ContentEncoding string
	ContentType     string
	Context         *any // same per-stream slot as ResponseChunk.Context
}

BodyChunk is passed to a RequestBodyHandlerFunc on every request body event.

In streaming mode the handler is called once per chunk as data arrives. In buffered mode (WithMutableBody) the handler is called exactly once with the full accumulated body when EndStream is true.

The handler is also called synthetically with Data: nil, EndStream: true when the request has no body (GET, DELETE, HEAD, etc.) so body-dependent logic always has a single completion point.

ContentEncoding and ContentType are populated automatically from the request's Content-Encoding and Content-Type headers.

type Buffer

type Buffer struct {

	// Len is the number of bytes available in the buffer.
	Len uint64
	// contains filtered or unexported fields
}

Buffer is a borrowed buffer owned by Envoy.

Copy the data with Bytes or String before retaining it beyond the current callback. UnsafeBytes and UnsafeString avoid the copy and are only valid while Envoy keeps the underlying memory alive.

func (Buffer) Bytes

func (b Buffer) Bytes() []byte

Bytes returns a Go-owned copy of the buffer bytes.

func (Buffer) String

func (b Buffer) String() string

String returns a Go-owned copy of the buffer as a string.

func (Buffer) ToBytes

func (b Buffer) ToBytes() []byte

ToBytes returns a Go-owned copy of the buffer bytes.

func (Buffer) ToString

func (b Buffer) ToString() string

ToString returns a Go-owned copy of the buffer as a string.

func (Buffer) UnsafeBytes

func (b Buffer) UnsafeBytes() []byte

UnsafeBytes returns a borrowed byte slice backed by Envoy memory.

func (Buffer) UnsafeString

func (b Buffer) UnsafeString() string

UnsafeString returns a borrowed string backed by Envoy memory.
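
The difference between the copying and the Unsafe accessors can be simulated without Envoy. In the sketch below a plain byte slice named mem stands in for Envoy-owned memory; no real Buffer is involved, and borrowedVsCopied is a hypothetical helper.

```go
package main

// borrowedVsCopied simulates Envoy reusing memory after a callback returns
// and shows what a borrowed view and an owned copy each observe afterwards.
func borrowedVsCopied() (borrowed, copied string) {
	mem := []byte("first")

	// Like UnsafeBytes: view aliases the same backing array as mem.
	view := mem

	// Like Bytes: owned is an independent Go-owned copy.
	owned := append([]byte(nil), mem...)

	// Envoy overwrites the underlying memory.
	copy(mem, "XXXXX")

	return string(view), string(owned)
}
```

The borrowed view reflects the overwrite while the copy keeps the original bytes, which is why anything retained past the callback should be copied first.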

type BytesInfo

type BytesInfo = down.BytesInfo

type Cluster

type Cluster = down.Cluster

type ClusterConfigFactory

type ClusterConfigFactory = down.ClusterConfigFactory

type ClusterFactory

type ClusterFactory = down.ClusterFactory

type ClusterFactoryWithMetrics

type ClusterFactoryWithMetrics interface {
	CreateWithMetrics(metrics ClusterMetrics, config []byte) (ClusterConfigFactory, error)
}

ClusterFactoryWithMetrics is an optional extension of ClusterFactory. When implemented, CreateWithMetrics is called instead of Create, providing a ClusterMetrics handle for defining Envoy metrics at config-load time.

type ClusterGroup

type ClusterGroup struct {
	// contains filtered or unexported fields
}

ClusterGroup manages background goroutines scoped to a cluster extension's lifecycle. Goroutines are registered with ClusterGroup.Go, launched at ClusterGroup.Start (called from [Cluster.ServerInitialized]), and stopped when ClusterGroup.Stop is called (called from [Cluster.Shutdown]).

This mirrors the Register + WithGroup pattern from the HTTP filter side: goroutines are declared alongside setup logic and lifecycle plumbing is handled for you.

type myCluster struct {
    bg up.ClusterGroup
}

func (c *myCluster) ServerInitialized(_ up.ClusterHandle) {
    c.bg.Go(func(ctx context.Context) {
        up.RunRetry(ctx, "routes-watch", func(ctx context.Context) error {
            return c.watchRoutes(ctx)
        })
    })
    c.bg.Start()
}

func (c *myCluster) Shutdown(_ up.ClusterHandle, done func()) {
    c.bg.Stop()
    done()
}

func (*ClusterGroup) Go

func (cg *ClusterGroup) Go(fn func(ctx context.Context))

Go registers a context-aware background goroutine. Must be called before ClusterGroup.Start. Panics if called after Start.

func (*ClusterGroup) Start

func (cg *ClusterGroup) Start()

Start launches all registered goroutines. Call exactly once from [Cluster.ServerInitialized]. No-op when no goroutines have been registered.

func (*ClusterGroup) Stop

func (cg *ClusterGroup) Stop()

Stop cancels all goroutines and waits for them to finish. Safe to call multiple times or before Start. Call from [Cluster.Shutdown] before invoking the done callback.
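
One way to obtain the documented Go / Start / Stop contract is a cancellable context plus a sync.WaitGroup. The group type below is a hypothetical re-creation for illustration; the real ClusterGroup has unexported fields and may be built differently.

```go
package main

import (
	"context"
	"sync"
)

// group is a hypothetical re-creation of the documented lifecycle.
type group struct {
	mu      sync.Mutex
	fns     []func(ctx context.Context)
	started bool
	cancel  context.CancelFunc
	wg      sync.WaitGroup
}

// Go registers fn; it panics once Start has been called.
func (g *group) Go(fn func(ctx context.Context)) {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.started {
		panic("group: Go called after Start")
	}
	g.fns = append(g.fns, fn)
}

// Start launches every registered function with a shared cancellable context.
func (g *group) Start() {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.started = true
	if len(g.fns) == 0 {
		return
	}
	ctx, cancel := context.WithCancel(context.Background())
	g.cancel = cancel
	for _, fn := range g.fns {
		g.wg.Add(1)
		go func(fn func(ctx context.Context)) {
			defer g.wg.Done()
			fn(ctx)
		}(fn)
	}
}

// Stop cancels the context and waits for all goroutines to return.
// Calling it before Start or more than once does nothing further.
func (g *group) Stop() {
	g.mu.Lock()
	cancel := g.cancel
	g.cancel = nil
	g.mu.Unlock()
	if cancel != nil {
		cancel()
	}
	g.wg.Wait()
}

// lifecycle runs one goroutine until Stop and reports whether it
// observed cancellation before Stop returned.
func lifecycle() bool {
	var g group
	stopped := make(chan struct{})
	g.Go(func(ctx context.Context) {
		<-ctx.Done()
		close(stopped)
	})
	g.Stop() // before Start: nothing to cancel
	g.Start()
	g.Stop()
	g.Stop() // second call: nothing left to cancel
	select {
	case <-stopped:
		return true
	default:
		return false
	}
}
```

Waiting inside Stop is what makes it safe to invoke the Shutdown done callback right afterwards: no registered goroutine is still running at that point.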

type ClusterHandle

type ClusterHandle = down.ClusterHandle

ClusterHandle gives a Cluster access to Envoy cluster operations. All methods except Schedule must be called on the main thread — AddHosts, RemoveHosts, UpdateHostHealth, FindHostByAddress, and PreInitComplete will silently no-op (and log an envoy_bug error) if invoked from a background goroutine. Use Schedule to marshal mutations back to the main thread from refresh loops or async callbacks. See .agents/skills/transit-cluster-main-thread/SKILL.md for the pattern.

type ClusterLB

type ClusterLB = down.ClusterLB

type ClusterLBCompletion

type ClusterLBCompletion = down.ClusterLBCompletion

type ClusterLBContext

type ClusterLBContext = down.ClusterLBContext

type ClusterLBHandle

type ClusterLBHandle = down.ClusterLBHandle

type ClusterMetrics

type ClusterMetrics interface {
	DefineCounter(name string, tagKeys ...string) (MetricID, error)
	DefineGauge(name string, tagKeys ...string) (MetricID, error)
	DefineHistogram(name string, tagKeys ...string) (MetricID, error)

	IncrementCounter(id MetricID, delta uint64, labelValues ...string) error
	SetGauge(id MetricID, value uint64, labelValues ...string) error
	IncrementGauge(id MetricID, delta uint64, labelValues ...string) error
	DecrementGauge(id MetricID, delta uint64, labelValues ...string) error
	RecordHistogram(id MetricID, value uint64, labelValues ...string) error
}

ClusterMetrics provides access to Envoy cluster metrics. Define metrics during ClusterFactoryWithMetrics.CreateWithMetrics on the main thread; record them at any time during the cluster's lifetime.

type ConfigDecoder

type ConfigDecoder[T any] func(data []byte) (T, error)

ConfigDecoder[T] parses raw bytes into a typed snapshot. Called from the background polling goroutine; must be safe to call concurrently.

func JSONDecoder

func JSONDecoder[T any]() ConfigDecoder[T]

JSONDecoder[T] decodes JSON bytes into T using encoding/json.

func YAMLDecoder

func YAMLDecoder[T any]() ConfigDecoder[T]

YAMLDecoder[T] decodes YAML bytes into T using gopkg.in/yaml.v3.

type ConfigEvent

type ConfigEvent struct {
	Version  string        // opaque; hash or counter; empty on error
	Duration time.Duration // fetch+decode wall time
	Err      error         // non-nil means last-good snapshot was kept
}

ConfigEvent carries diagnostics from one refresh cycle.

type ConfigFunc

type ConfigFunc func(h ConfigHandle) error

ConfigFunc is called once at filter config creation time on the main thread. Use it to define metrics via ConfigHandle.DefineCounter, etc.

type ConfigHandle

type ConfigHandle interface {
	// DefineCounter defines an Envoy counter metric. tagKeys are optional dimension names.
	DefineCounter(name string, tagKeys ...string) (MetricID, error)
	// DefineGauge defines an Envoy gauge metric. tagKeys are optional dimension names.
	DefineGauge(name string, tagKeys ...string) (MetricID, error)
	// DefineHistogram defines an Envoy histogram metric. tagKeys are optional dimension names.
	DefineHistogram(name string, tagKeys ...string) (MetricID, error)
	// FilterConfigBytes returns the raw bytes of the filter_config StringValue
	// passed in the Envoy YAML. Returns nil when no config was provided.
	FilterConfigBytes() []byte
}

ConfigHandle is passed to config callbacks at filter config creation time. Use it to define metrics once — before any requests arrive.

type ConfigSource

type ConfigSource func(ctx context.Context) ([]byte, error)

ConfigSource fetches raw config bytes. May be called from a background goroutine. A non-nil error keeps the last-good snapshot.

type EmptyAccessLogger

type EmptyAccessLogger struct{}

EmptyAccessLogger is a no-op base; embed it to skip unused methods.

func (*EmptyAccessLogger) OnDestroy

func (e *EmptyAccessLogger) OnDestroy()

func (*EmptyAccessLogger) OnLog

type EmptyClusterLB

type EmptyClusterLB = down.EmptyClusterLB

type EmptyLBPolicy

type EmptyLBPolicy = down.EmptyLBPolicy

type ExchangeHooks

type ExchangeHooks[T any] struct {
	// OnRequest initializes and returns the per-stream accumulator.
	// Called at request headers phase.
	OnRequest func(w *Writer, r *Request) T

	// OnResponse is called at response headers (chunk.StatusCode != 0, chunk.Data == nil)
	// and optionally response body (chunk.Data != nil). If nil, response phase is skipped.
	OnResponse func(st T, w *Writer, chunk *ResponseChunk)

	// OnFinalized receives the accumulated state and Envoy's FinalizedInfo.
	// Delivery semantics match WithOnStreamFinalized.
	OnFinalized func(st T, info FinalizedInfo)
}

ExchangeHooks[T] carries typed callbacks for the three phases of an exchange.

type FilterOption

type FilterOption func(*configFactory)

FilterOption configures filter registration. Pass options to Register.

func WithAdminServer

func WithAdminServer(a *AdminServer) FilterOption

WithAdminServer returns a FilterOption that wires a into the filter's Group lifecycle. The server starts when Envoy loads the filter config and stops (with graceful shutdown) when Envoy destroys the factory.

func WithAttributes

func WithAttributes(kvs ...any) FilterOption

WithAttributes pre-bakes key-value pairs into every log line emitted via Writer.Slog for this filter. kvs follows the same alternating key/value convention as log/slog.Logger.With: WithAttributes("scope", "match", "region", "us-west"). The filter name is always included automatically; only pass additional attributes here. Pairs with a non-string key are skipped.
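The alternating key/value convention can be sketched as below. This is an illustration of the documented rule (pairs with a non-string key are skipped), not the package's code. The attr type and pairAttrs function are hypothetical, and dropping a trailing key that has no value is an assumption of this sketch.

```go
package main

import "fmt"

// attr is a hypothetical resolved key/value pair.
type attr struct {
	Key   string
	Value any
}

// pairAttrs walks kvs two items at a time. A pair whose key is not a
// string is skipped entirely. A trailing key without a value is dropped
// (an assumption of this sketch).
func pairAttrs(kvs ...any) []attr {
	out := make([]attr, 0, len(kvs)/2)
	for i := 0; i+1 < len(kvs); i += 2 {
		k, ok := kvs[i].(string)
		if !ok {
			continue // non-string key: skip the whole pair
		}
		out = append(out, attr{Key: k, Value: kvs[i+1]})
	}
	return out
}

func main() {
	attrs := pairAttrs("scope", "match", 42, "skipped", "region", "us-west")
	for _, a := range attrs {
		fmt.Printf("%s=%v\n", a.Key, a.Value)
	}
}
```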

func WithBody

WithBody enables buffered, read-only request body handling. If rb is non-nil, it is called once with the full accumulated request body. Use this when a filter needs the finalized body for inspection or signing but must not replace the request body. Mutually exclusive with WithStreamingBody and WithMutableBody.

Unlike WithMutableBody, request content-length and transfer-encoding are preserved because the request body is not replaced.

func WithConfig

func WithConfig(fn ConfigFunc) FilterOption

WithConfig attaches a config callback invoked once on the main thread when Envoy loads the filter config. Use it to define metrics via ConfigHandle.

func WithExchangeObserver

func WithExchangeObserver[T any](hooks ExchangeHooks[T]) []FilterOption

WithExchangeObserver returns a set of FilterOptions that wire ExchangeHooks[T] into a filter registration. The returned options are passed directly to up.Register. The SDK owns the context slot and pool lifecycle; callers never touch *any.

func WithGroup

func WithGroup(g *Group) FilterOption

WithGroup attaches a Group of background goroutines. The group is started when Envoy loads the filter config and stopped (via Group.Stop) when Envoy destroys the filter factory. The handler and the goroutines in g share state via closure — no package-level variables needed.

Note: if any goroutine in the group exits for any reason — including a normal return — the entire group is stopped via Group.Stop. Goroutines that must survive transient errors should loop internally or use RunRetry.

For background work in cluster extensions (which have no filter factory), use ClusterGroup with ClusterGroup.Start called from [Cluster.ServerInitialized] instead.

func WithLogMetadata

func WithLogMetadata(ns string) FilterOption

WithLogMetadata sets the dynamic-metadata namespace that Writer.AddLogAttrs writes through to. When set, every w.AddLogAttrs("k", v) call also writes w.SetMetadata(ns, "k", v), making those attrs available to the Envoy access log via %DYNAMIC_METADATA(namespace:key)%.

Use a reverse-DNS namespace to avoid collisions:

up.WithLogMetadata("com.example.myfilter")

Unsupported value types (structs, slices, etc.) are serialised to their string representation rather than panicking — see Writer.AddLogAttrs.

func WithMutableBody

func WithMutableBody(rb RequestBodyHandlerFunc) FilterOption

WithMutableBody enables buffered body handling. If rb is non-nil, it is called once with the full accumulated request body; pass nil to buffer the response body only (useful when WithResponse needs the full body but the request body is not of interest). Use Writer.SetRequestBody / SetResponseBody to replace body content. Mutually exclusive with WithStreamingBody and WithBody.

Upstream vs downstream body source

In downstream (listener-side) filters, Envoy pre-fills BufferedRequestBody across StopAndBuffer calls so the handler always receives the complete body via BodyChunk.Data.

In upstream (cluster-side) filters, Envoy does NOT pre-fill BufferedRequestBody when endOfStream=true arrives on the very first body call — i.e. when no prior StopAndBuffer has occurred, which is the common case for small single-frame requests. The framework handles this automatically: BodyChunk.Data is always the full body regardless of which filter chain the filter runs in.

func WithOnStreamComplete

func WithOnStreamComplete(fn OnStreamCompleteFunc) FilterOption

WithOnStreamComplete attaches a stream-termination callback to the filter. See OnStreamCompleteFunc for semantics.

func WithOnStreamFinalized

func WithOnStreamFinalized(fn OnStreamFinalizedFunc) FilterOption

WithOnStreamFinalized attaches a callback that fires after Envoy finalizes the stream and delivers it through the access-logger path. See OnStreamFinalizedFunc for delivery semantics and the YAML requirements.

Coexists with WithOnStreamComplete: cleanup-only consumers should keep using OnStreamComplete; OnStreamFinalized is the right hook when the callback needs finalized durations, byte counts, or response flags.

func WithResponse

func WithResponse(r ResponseHandlerFunc) FilterOption

WithResponse attaches a response observer.

func WithSidecar

func WithSidecar(s *Sidecar) FilterOption

WithSidecar returns a FilterOption that wires s into a new Group and attaches it to the filter. The sidecar starts when Envoy loads the filter config and stops (with graceful shutdown) when Envoy destroys the factory.

func WithStreamingBody

func WithStreamingBody(rb RequestBodyHandlerFunc) FilterOption

WithStreamingBody attaches a request body handler in streaming mode: the handler is called once per chunk as data arrives. For bodyless requests (GET etc.) the handler is called once with Data: nil. Mutually exclusive with WithBody and WithMutableBody.

type FinalizedInfo

type FinalizedInfo struct {
	Timing                      TimingInfo
	Bytes                       BytesInfo
	ResponseCode                uint32
	ResponseCodeDetails         string
	ResponseFlags               uint64
	UpstreamFailure             string
	UpstreamLocalAddress        string
	UpstreamAddress             string
	RequestProtocol             string
	UpstreamPoolReadyDurationNs int64
	UpstreamRequestAttempts     uint32
	TraceID                     string
	SpanID                      string
	TraceSampled                bool
	LocalReplyBody              string
}

FinalizedInfo holds finalized stream fields delivered to an OnStreamFinalizedFunc after Envoy completes the stream. The values mirror what an AccessLoggerHandle would expose at AccessLogTypeDownstreamEnd: durations, byte counts, response flags, upstream attempts, trace ids, and a local-reply body if one was sent.

All durations are in nanoseconds; -1 in a duration means the timing is unavailable. ResponseFlags is the raw bitmask — use ResponseFlagsString to render the human-readable form.
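A small helper makes the -1 sentinel hard to misuse. The sketch below converts a nanosecond field such as UpstreamPoolReadyDurationNs into a time.Duration. The name durationFromNs is hypothetical, and treating every negative value as unavailable is an assumption beyond the documented -1.

```go
package main

import (
	"fmt"
	"time"
)

// durationFromNs converts a nanosecond count into a time.Duration.
// The second result is false for the documented -1 sentinel and, as an
// assumption of this sketch, for any other negative value.
func durationFromNs(ns int64) (time.Duration, bool) {
	if ns < 0 {
		return 0, false
	}
	return time.Duration(ns), true
}

func main() {
	for _, ns := range []int64{1_500_000, -1} {
		if d, ok := durationFromNs(ns); ok {
			fmt.Println("pool ready after", d)
		} else {
			fmt.Println("pool ready timing unavailable")
		}
	}
}
```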

type GRPCCalloutFunc

type GRPCCalloutFunc func(GRPCCalloutResponse)

GRPCCalloutFunc is invoked when a gRPC callout completes. The callback runs on the Envoy worker thread under the same constraints as HTTPCalloutFunc.

type GRPCCalloutRequest

type GRPCCalloutRequest struct {
	Cluster       string
	Authority     string // host/:authority header; defaults to Cluster if empty
	Method        string // full gRPC path, e.g. "/envoy.service.ratelimit.v3.RateLimitService/ShouldRateLimit"
	Message       []byte // raw proto bytes, unframed
	TimeoutMillis uint64
}

GRPCCalloutRequest carries parameters for an outbound gRPC unary callout. Message is the serialised proto body — no gRPC framing needed; GRPCCallout prepends the 5-byte length-prefix frame before sending.
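Callers never build the frame themselves, but it helps to know what the 5-byte prefix contains: one compression-flag byte followed by a 4-byte big-endian message length. The sketch below shows that layout with standalone helpers. The names frame and unframe are hypothetical and are not part of this package.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// frame prepends the gRPC length-prefixed message header to msg:
// byte 0 is the compression flag (0 = uncompressed), bytes 1..4 hold
// the message length as a big-endian uint32.
func frame(msg []byte) []byte {
	out := make([]byte, 5+len(msg))
	out[0] = 0
	binary.BigEndian.PutUint32(out[1:5], uint32(len(msg)))
	copy(out[5:], msg)
	return out
}

// unframe strips the 5-byte header and returns the message bytes.
func unframe(b []byte) ([]byte, error) {
	if len(b) < 5 {
		return nil, errors.New("short gRPC frame")
	}
	n := binary.BigEndian.Uint32(b[1:5])
	if uint64(len(b)-5) < uint64(n) {
		return nil, errors.New("truncated gRPC message")
	}
	return b[5 : 5+int(n)], nil
}

func main() {
	f := frame([]byte("abc"))
	msg, err := unframe(f)
	fmt.Println(len(f), string(msg), err)
}
```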

type GRPCCalloutResponse

type GRPCCalloutResponse struct {
	Result      HTTPCalloutResult
	GRPCStatus  uint32 // grpc-status value; 0 = OK
	GRPCMessage string // grpc-message value
	Body        []byte // unframed proto bytes
}

GRPCCalloutResponse is delivered to GRPCCalloutFunc after the callout completes. Body is the unframed proto bytes (5-byte gRPC header stripped), ready to unmarshal. GRPCStatus and GRPCMessage are parsed from grpc-status / grpc-message response headers or trailers; successful responses may omit them, in which case status 0 is implied by Result==HTTPCalloutSuccess.

type Group

type Group struct {
	// contains filtered or unexported fields
}

Group manages a set of background goroutines sharing a common lifecycle. All goroutines start together and stop together when Group.Stop is called.

Register goroutines with Group.Add or Group.AddGoroutine, then call Group.Start exactly once. Call Group.Stop from your filter factory's OnDestroy (done automatically when registered via Register with WithGroup).

func NewGroup

func NewGroup() *Group

NewGroup creates a new Group ready for actor registration.

func (*Group) Add

func (g *Group) Add(execute func() error, stop func())

Add registers an actor with explicit execute and stop functions. execute blocks until the actor finishes; it runs in a background goroutine. stop must cause execute to return promptly — it is called from another goroutine. Panics inside execute are recovered rather than crashing the process.

func (*Group) AddGoroutine

func (g *Group) AddGoroutine(fn func(ctx context.Context))

AddGoroutine registers a context-aware background function. The context is cancelled automatically when Group.Stop is called.

g.AddGoroutine(func(ctx context.Context) {
	t := time.NewTicker(30 * time.Second)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			doWork()
		}
	}
})

func (*Group) Start

func (g *Group) Start()

Start launches all registered goroutines. Call exactly once after all actors are registered. If any actor finishes for any reason (including normal return), Stop is called on the whole group. Actors that must survive transient errors should loop internally rather than returning.
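The "one actor exits, everyone stops" rule is easy to overlook. The sketch below reproduces it with a shared context and a WaitGroup. It is an illustration under stated assumptions, not the package's Group: the group type and its methods are hypothetical, and panic recovery is left out.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// group is a hypothetical, simplified stand-in for Group.
type group struct {
	fns    []func(ctx context.Context)
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

func (g *group) add(fn func(ctx context.Context)) { g.fns = append(g.fns, fn) }

// start launches every actor on one shared context. When any actor
// returns, its deferred cancel stops all the others.
func (g *group) start() {
	ctx, cancel := context.WithCancel(context.Background())
	g.cancel = cancel
	for _, fn := range g.fns {
		g.wg.Add(1)
		go func(fn func(ctx context.Context)) {
			defer g.wg.Done()
			defer cancel()
			fn(ctx)
		}(fn)
	}
}

// wait blocks until every actor has returned.
func (g *group) wait() { g.wg.Wait() }

// stop cancels the shared context and waits. Repeated calls are harmless.
func (g *group) stop() {
	if g.cancel != nil {
		g.cancel()
	}
	g.wg.Wait()
}

// demo reports whether the long-running actor observed cancellation
// caused only by the short-lived actor returning.
func demo() bool {
	var g group
	var sawCancel atomic.Bool
	g.add(func(ctx context.Context) {}) // returns immediately
	g.add(func(ctx context.Context) {
		<-ctx.Done()
		sawCancel.Store(true)
	})
	g.start()
	g.wait() // returns without an explicit stop
	g.stop() // no further effect
	return sawCancel.Load()
}

func main() {
	fmt.Println("second actor cancelled:", demo())
}
```

An actor that should outlive transient failures therefore needs its own retry loop inside the function it registers.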

func (*Group) Stop

func (g *Group) Stop()

Stop cancels the shared context, interrupts all actors, and waits for them to finish. Safe to call multiple times; only the first call has effect.

type HTTPCalloutAllSettledFunc

type HTTPCalloutAllSettledFunc func(responses []HTTPCalloutAllSettledResponse)

HTTPCalloutAllSettledFunc is invoked once after all accepted callouts in a group have completed and all init failures have been recorded.

The function runs from the final callout callback or synchronously when all callouts fail initialization. It may call Writer mutation methods, including SendLocalResponse.

type HTTPCalloutAllSettledResponse

type HTTPCalloutAllSettledResponse struct {
	Result  HTTPCalloutResult
	Init    HTTPCalloutInitResult
	Err     error
	Headers [][2]shared.UnsafeEnvoyBuffer
	Body    []shared.UnsafeEnvoyBuffer
}

HTTPCalloutAllSettledResponse is one response delivered to HTTPCalloutAllSettledFunc.

Headers and Body are Go-owned copies, because earlier callout callbacks may return before every callout has settled. Response slots preserve request order: responses[i] corresponds to the request at reqs[i].

Err is non-nil when Envoy rejected the callout at init time or when an accepted callout completed with a non-success result. Callers still receive Init and Result so they can distinguish init failures, resets, and buffer limit errors when choosing aggregate policy.

func (HTTPCalloutAllSettledResponse) Failed

Failed reports whether the response slot represents an init or callout error.

func (HTTPCalloutAllSettledResponse) OK

OK reports whether the callout was accepted and completed successfully.

type HTTPCalloutFunc

type HTTPCalloutFunc func(result HTTPCalloutResult, headers [][2]shared.UnsafeEnvoyBuffer, body []shared.UnsafeEnvoyBuffer)

HTTPCalloutFunc is the callback invoked when an Envoy HTTP callout completes.

LIFETIME: headers and body point into Envoy-owned memory that is only valid for the duration of this call. Copy any value that must outlive the callback. Specifically: do NOT send these slices to another goroutine or store them in a struct that outlives the function — Envoy may reuse or free that memory as soon as the callback returns.

The function runs on the Envoy worker thread (or the goroutine that called HTTPCallout synchronously). It may call any Writer mutation method; those mutations are queued and applied by flush() after the callback returns.
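The lifetime rule is the usual "copy before you keep" pattern. The sketch below uses plain byte slices as a stand-in for shared.UnsafeEnvoyBuffer, whose methods are not shown in this reference. The names copyChunks and demo are hypothetical, and overwriting the backing array only simulates Envoy reusing its memory.

```go
package main

import "fmt"

// copyChunks concatenates body chunks into a fresh Go-owned slice.
func copyChunks(chunks [][]byte) []byte {
	n := 0
	for _, c := range chunks {
		n += len(c)
	}
	out := make([]byte, 0, n)
	for _, c := range chunks {
		out = append(out, c...)
	}
	return out
}

// demo returns the copied body and a retained alias after the
// simulated Envoy memory has been overwritten.
func demo() (kept, aliased string) {
	envoyMem := []byte("hello world") // stands in for Envoy-owned memory
	chunks := [][]byte{envoyMem[:6], envoyMem[6:]}

	alias := chunks[0]       // wrong: still points into envoyMem
	body := copyChunks(chunks) // right: owned by Go

	// Simulate Envoy reusing the memory after the callback returns.
	for i := range envoyMem {
		envoyMem[i] = 'x'
	}
	return string(body), string(alias)
}

func main() {
	kept, aliased := demo()
	fmt.Printf("copied=%q aliased=%q\n", kept, aliased)
}
```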

type HTTPCalloutInitResult

type HTTPCalloutInitResult uint32

HTTPCalloutInitResult reports whether Envoy accepted the callout request. A non-Success result means no callout was initiated and no callback will fire; the handler should treat it as an immediate error.

type HTTPCalloutRequest

type HTTPCalloutRequest struct {
	Cluster       string
	Headers       [][2]string
	Body          []byte
	TimeoutMillis uint64
}

HTTPCalloutRequest carries the parameters for an outbound Envoy HTTP callout. Cluster must name a cluster defined in the Envoy bootstrap config; if it does not exist, HTTPCallout returns HTTPCalloutInitClusterNotFound. Headers must include at minimum :method, :path, and host. Include :scheme when the upstream or route logic needs it. TimeoutMillis of 0 uses Envoy's default callout timeout.

type HTTPCalloutResponse

type HTTPCalloutResponse struct {
	Result  HTTPCalloutResult
	Headers [][2]shared.UnsafeEnvoyBuffer
	Body    []shared.UnsafeEnvoyBuffer
}

HTTPCalloutResponse is the value returned by Writer.Do.

Headers and Body are Go-owned copies made inside the callout callback before the response is sent across the goroutine channel. They are safe to read after Do returns, even though the underlying Envoy memory may have been freed.

For Writer.HTTPCallout (callback form), the buffers are NOT copied — they are passed directly to HTTPCalloutFunc and are only valid during that call.

type HTTPCalloutResult

type HTTPCalloutResult uint32

HTTPCalloutResult is the terminal outcome of an accepted callout, delivered to HTTPCalloutFunc or via HTTPCalloutResponse.Result from Writer.Do.

const (
	// HTTPCalloutSuccess means the upstream responded; headers and body are valid.
	HTTPCalloutSuccess HTTPCalloutResult = HTTPCalloutResult(shared.HttpCalloutSuccess)

	// HTTPCalloutReset means the connection was reset before a response arrived.
	// headers and body will be empty.
	HTTPCalloutReset HTTPCalloutResult = HTTPCalloutResult(shared.HttpCalloutReset)

	// HTTPCalloutExceedResponseBufferLimit means the response body was larger
	// than Envoy's configured callout buffer limit. headers may be present;
	// body will be truncated or empty.
	HTTPCalloutExceedResponseBufferLimit HTTPCalloutResult = HTTPCalloutResult(shared.HttpCalloutExceedResponseBufferLimit)
)

type HTTPCalloutSequenceDoneFunc

type HTTPCalloutSequenceDoneFunc func(responses []HTTPCalloutAllSettledResponse)

HTTPCalloutSequenceDoneFunc is invoked once when HTTPCalloutSequenceNextFunc stops the sequence. It runs from the callout callback path and may call Writer mutation methods, including SendLocalResponse.

type HTTPCalloutSequenceNextFunc

type HTTPCalloutSequenceNextFunc func(attempt int, previous *HTTPCalloutAllSettledResponse) (req HTTPCalloutRequest, ok bool)

HTTPCalloutSequenceNextFunc decides the next request in a sequential callout chain. attempt is zero-based. previous is nil for attempt 0, then points to the response from the immediately preceding attempt. Return ok=false to stop the sequence and run HTTPCalloutSequenceDoneFunc with all collected responses.

type HandlerFunc

type HandlerFunc func(w *Writer, r *Request)

HandlerFunc is called on every request.

func Chain

func Chain(h HandlerFunc, mw ...Middleware) HandlerFunc

Chain wraps h with the given middleware in left-to-right order: the first middleware in the list is outermost (runs first).
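The ordering can be checked with a small standalone sketch. The types below are local stand-ins with the documented shapes, and chain is a hypothetical re-implementation of the described behaviour rather than the package's source.

```go
package main

import (
	"fmt"
	"strings"
)

// Local stand-ins with the documented shapes.
type Writer struct{}
type Request struct{ Path string }
type HandlerFunc func(w *Writer, r *Request)
type Middleware func(next HandlerFunc) HandlerFunc

// chain wraps h so that mw[0] ends up outermost and runs first.
func chain(h HandlerFunc, mw ...Middleware) HandlerFunc {
	for i := len(mw) - 1; i >= 0; i-- {
		h = mw[i](h)
	}
	return h
}

// trace returns a middleware that records when it is entered and left.
func trace(name string, log *[]string) Middleware {
	return func(next HandlerFunc) HandlerFunc {
		return func(w *Writer, r *Request) {
			*log = append(*log, name+":before")
			next(w, r)
			*log = append(*log, name+":after")
		}
	}
}

// run executes a two-middleware chain and returns the call order.
func run() string {
	var log []string
	h := chain(
		func(w *Writer, r *Request) { log = append(log, "handler") },
		trace("auth", &log),
		trace("metrics", &log),
	)
	h(&Writer{}, &Request{Path: "/"})
	return strings.Join(log, " ")
}

func main() {
	fmt.Println(run())
}
```

Listing auth before metrics means auth sees the request first and the completed call last.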

type HostEntry

type HostEntry struct {
	Spec HostSpec
	Host HostPtr
}

HostEntry pairs a user key's resolved spec with its live Envoy HostPtr.

type HostHealth

type HostHealth = down.HostHealth

HostHealth and HostStat are shared with LB Policy; defined here for convenience.

type HostPtr

type HostPtr = down.HostPtr

type HostRefreshEvent

type HostRefreshEvent struct {
	// Duration covers the full fetch + apply round-trip.
	Duration time.Duration

	// Added, Removed, Unchanged count host changes for the cycle.
	// All three are zero when Err is non-nil.
	Added     int
	Removed   int
	Unchanged int

	// Err is non-nil when the snapshot fetch returned an error.
	Err error
}

HostRefreshEvent carries diagnostics from a single refresh cycle.

type HostRefreshLoop

type HostRefreshLoop[K comparable] struct {
	// contains filtered or unexported fields
}

HostRefreshLoop periodically refreshes a HostSet from a user-supplied snapshot function.

func NewHostRefreshLoop

func NewHostRefreshLoop[K comparable](
	handle ClusterHandle,
	set *HostSet[K],
	snapshot HostSnapshotFunc[K],
	opts HostRefreshOptions,
) *HostRefreshLoop[K]

NewHostRefreshLoop creates a HostRefreshLoop. Call Start to begin ticking.

func (*HostRefreshLoop[K]) RefreshOnce

func (l *HostRefreshLoop[K]) RefreshOnce(ctx context.Context) error

RefreshOnce fetches a snapshot and schedules an apply via ClusterHandle.Schedule, blocking until the apply has run on the main thread (or until ctx is cancelled). Returns the snapshot fetch error, if any; on error the apply is not scheduled.

func (*HostRefreshLoop[K]) Start

func (l *HostRefreshLoop[K]) Start()

Start launches the background ticker goroutine. It does not perform an immediate refresh — call RefreshOnce first if a warm host set is needed.

func (*HostRefreshLoop[K]) Stop

func (l *HostRefreshLoop[K]) Stop()

Stop cancels the ticker, waits for any in-flight fetch to finish, and waits for any already-scheduled apply to complete before returning. After Stop returns no further Apply will run.

type HostRefreshObserver

type HostRefreshObserver func(HostRefreshEvent)

HostRefreshObserver is called on the cluster main thread after each Apply (or on error).

type HostRefreshOptions

type HostRefreshOptions struct {
	// Interval between refreshes. Zero uses DefaultRefreshInterval.
	Interval time.Duration
	// Timeout for each snapshot fetch. Zero uses DefaultRefreshTimeout.
	Timeout time.Duration
	// Jitter adds a random ±Jitter offset to each tick interval.
	Jitter time.Duration
	// Observe is called on the cluster main thread after each Apply.
	// Optional; nil disables the observer.
	Observe HostRefreshObserver
}

HostRefreshOptions configures a HostRefreshLoop.

type HostResult

type HostResult struct {
	Host      HostPtr // nil → use ErrDetail
	ErrDetail string  // non-empty → complete with error
}

HostResult is returned by the lookup function to resolve a host.

type HostSet

type HostSet[K comparable] struct {
	// contains filtered or unexported fields
}

HostSet manages a published, atomically-readable host map. Apply must be called on the cluster main thread. Get, Entry, and Current are safe from any goroutine.

func NewHostSet

func NewHostSet[K comparable](handle ClusterHandle) *HostSet[K]

NewHostSet creates a HostSet backed by handle. The initial published map is empty.

func (*HostSet[K]) Apply

func (s *HostSet[K]) Apply(snapshot HostSnapshot[K])

Apply replaces the desired host snapshot. It must be called on the cluster main thread — it calls AddHosts, UpdateHostHealth, and RemoveHosts, all of which are main-thread-only. From a background goroutine, build the snapshot off-thread and use ClusterHandle.Schedule to invoke Apply on the main thread.

Ordering guarantee: add → publish → remove so ChooseHost never sees a dangling pointer.

func (*HostSet[K]) Current

func (s *HostSet[K]) Current() map[K]HostEntry

Current returns a freshly-copied snapshot of the published map. Safe from any goroutine. Callers that only need one key should prefer Get or Entry.

func (*HostSet[K]) Entry

func (s *HostSet[K]) Entry(key K) (HostEntry, bool)

Entry returns the full HostEntry for key. Safe from any goroutine.

func (*HostSet[K]) Get

func (s *HostSet[K]) Get(key K) (HostPtr, bool)

Get returns the live HostPtr for key. Safe from any goroutine.

type HostSnapshot

type HostSnapshot[K comparable] map[K]HostSpec

HostSnapshot is the complete desired host set keyed by caller's key type. A missing key means "remove that host". Pass nil or an empty map to remove all.
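Because a snapshot is the complete desired set, applying it amounts to a key diff against what is currently published. The sketch below counts added, removed, and unchanged keys. The hostSpec struct is a hypothetical stand-in for HostSpec, diffKeys is not part of the package, and changes to the spec of an existing key are outside this sketch.

```go
package main

import "fmt"

// hostSpec is a hypothetical stand-in; the real HostSpec fields are not
// shown in this reference.
type hostSpec struct{ Address string }

// hostSnapshot mirrors the documented map shape.
type hostSnapshot[K comparable] map[K]hostSpec

// diffKeys compares key sets only. Keys missing from desired count as
// removals, so a nil or empty desired map removes everything.
func diffKeys[K comparable](current, desired hostSnapshot[K]) (added, removed, unchanged int) {
	for k := range desired {
		if _, ok := current[k]; ok {
			unchanged++
		} else {
			added++
		}
	}
	for k := range current {
		if _, ok := desired[k]; !ok {
			removed++
		}
	}
	return added, removed, unchanged
}

func main() {
	current := hostSnapshot[string]{
		"a": {Address: "10.0.0.1:80"},
		"b": {Address: "10.0.0.2:80"},
	}
	desired := hostSnapshot[string]{
		"b": {Address: "10.0.0.2:80"},
		"c": {Address: "10.0.0.3:80"},
	}
	added, removed, unchanged := diffKeys[string](current, desired)
	fmt.Println(added, removed, unchanged)
}
```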

type HostSnapshotFunc

type HostSnapshotFunc[K comparable] func(ctx context.Context) (HostSnapshot[K], error)

HostSnapshotFunc fetches the current desired host snapshot. It may be called from a background goroutine. Return a non-nil error to keep the previous snapshot.

type HostSpec

type HostSpec = down.HostSpec

type HostStat

type HostStat = down.HostStat

HostHealth and HostStat are shared with LB Policy; defined here for convenience.

type HttpHeaderType

type HttpHeaderType = down.HttpHeaderType

type LBContext

type LBContext = down.LBContext

type LBHandle

type LBHandle = down.LBHandle

type LBPolicy

type LBPolicy = down.LBPolicy

type LBPolicyConfigFactory

type LBPolicyConfigFactory = down.LBPolicyConfigFactory

type LBPolicyFactory

type LBPolicyFactory = down.LBPolicyFactory

type LogLevel

type LogLevel uint32

LogLevel controls Envoy log severity for Writer.Log.

type MetadataSource

type MetadataSource uint32

MetadataSource identifies which metadata store to read from.

type MetricID

type MetricID uint64

MetricID is an opaque handle to an Envoy metric defined at config time via ConfigHandle.

type Middleware

type Middleware func(next HandlerFunc) HandlerFunc

Middleware wraps a HandlerFunc, enabling before/after logic around a handler.

type OnStreamCompleteFunc

type OnStreamCompleteFunc func(ctx *any)

OnStreamCompleteFunc is called exactly once per stream after Envoy terminates it, regardless of how the stream ended (normal end-of-stream, reset, idle/request timeout, local reply from another filter). Use it for cleanup that must run even when the request/response handlers did not — e.g. removing entries from process-wide registries.

ctx is the per-stream context slot (same value Request.Context and BodyChunk.Context point at). Mutations are no-ops at this point: the stream is dead and any queued header/filter-state changes will not be applied. Do not call Writer methods from here.
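A typical use is pairing a request handler that registers something with a completion callback that removes it. The sketch below shows that pairing with the documented func(ctx *any) shape. The registry, streamState, and onRequest names are hypothetical, and onRequest takes the slot directly instead of a Request so the example stays self-contained.

```go
package main

import (
	"fmt"
	"sync"
)

// streamState is what this sketch stores in the per-stream slot.
type streamState struct{ tenant string }

// registry is a hypothetical process-wide table of active streams.
type registry struct {
	mu     sync.Mutex
	active map[string]int
}

func (r *registry) add(tenant string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.active[tenant]++
}

func (r *registry) remove(tenant string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.active[tenant] <= 1 {
		delete(r.active, tenant)
		return
	}
	r.active[tenant]--
}

func (r *registry) count(tenant string) int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.active[tenant]
}

var streams = &registry{active: map[string]int{}}

// onRequest stands in for a request handler that fills the slot.
func onRequest(ctx *any, tenant string) {
	*ctx = &streamState{tenant: tenant}
	streams.add(tenant)
}

// onStreamComplete has the OnStreamCompleteFunc shape. It tolerates an
// empty slot, which happens when the request handler never ran.
func onStreamComplete(ctx *any) {
	if ctx == nil {
		return
	}
	st, ok := (*ctx).(*streamState)
	if !ok {
		return
	}
	streams.remove(st.tenant)
}

func main() {
	var slot any
	onRequest(&slot, "acme")
	fmt.Println("active:", streams.count("acme"))
	onStreamComplete(&slot)
	fmt.Println("active:", streams.count("acme"))
}
```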

type OnStreamFinalizedFunc

type OnStreamFinalizedFunc func(ctx *any, info FinalizedInfo)

OnStreamFinalizedFunc fires on the worker thread after Envoy finalizes the stream, before the per-stream context is released. Like OnStreamCompleteFunc it carries no Writer, and mutations are not possible because the stream is dead. What it adds over OnStreamCompleteFunc is the finalized stream fields in FinalizedInfo, which Envoy only exposes through the access-logger path.

Delivery: when a filter is registered with WithOnStreamFinalized the SDK auto-registers an internal access logger under the same name and correlates filter ↔ logger via the request id (x-request-id), or an SDK fallback request header when the request id is empty. The listener Envoy YAML must include an access_log entry pointing at this dynamic module with logger_name equal to the filter name (see examples/request-ui/envoy.yaml). If the access-logger entry is missing from the YAML, the callback will not fire — Envoy delivers finalized fields only through that path.

type PipelineConfig

type PipelineConfig[T any] struct {
	// contains filtered or unexported fields
}

PipelineConfig[T] holds a decoded config snapshot refreshable from a source. Snapshot() is safe from any goroutine. The request path never calls the source.

func NewFileConfig

func NewFileConfig[T any](path string, dec ConfigDecoder[T], opts PollOptions) *PipelineConfig[T]

NewFileConfig returns a PipelineConfig backed by a file path. The file is read fresh on every poll tick; path is evaluated at fetch time.

func NewPollingConfig

func NewPollingConfig[T any](src ConfigSource, dec ConfigDecoder[T], opts PollOptions) *PipelineConfig[T]

NewPollingConfig returns a PipelineConfig backed by an arbitrary source. The first fetch fires immediately on Start; subsequent fetches tick at Interval ± Jitter.

func NewStaticConfig

func NewStaticConfig[T any](v T) *PipelineConfig[T]

NewStaticConfig returns a PipelineConfig whose snapshot never changes. Start is a no-op. Snapshot() returns v immediately.

func (*PipelineConfig[T]) RefreshOnce

func (p *PipelineConfig[T]) RefreshOnce(ctx context.Context) error

RefreshOnce fetches and decodes a single snapshot, blocking until done or ctx is cancelled. Updates Snapshot() on success; on error keeps last-good. Useful for warming the cache before Start() begins ticking.

func (*PipelineConfig[T]) SignalRefresh

func (p *PipelineConfig[T]) SignalRefresh()

SignalRefresh requests an immediate refresh (called by file watchers, etc). Non-blocking; ignored if a refresh is already pending.

func (*PipelineConfig[T]) Snapshot

func (p *PipelineConfig[T]) Snapshot() T

Snapshot returns the current decoded config. Returns the zero value if no successful fetch has completed yet. Never returns a partially-updated value.
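One common way to get these guarantees is to publish each decoded value with a single atomic pointer store. The sketch below shows that technique together with the keep-last-good rule. It is an assumption about how such a holder can be built, not the package's implementation, and holder, refresh, and demo are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
)

// holder is a hypothetical stand-in for PipelineConfig[T].
type holder[T any] struct {
	cur atomic.Pointer[T]
}

// Snapshot returns the last published value, or the zero value before
// the first successful refresh.
func (h *holder[T]) Snapshot() T {
	if p := h.cur.Load(); p != nil {
		return *p
	}
	var zero T
	return zero
}

// refresh fetches and decodes once. The new value is published with one
// pointer store, so readers see either the old value or the new one.
func (h *holder[T]) refresh(
	ctx context.Context,
	src func(context.Context) ([]byte, error),
	dec func([]byte) (T, error),
) error {
	raw, err := src(ctx)
	if err != nil {
		return err // last-good value stays published
	}
	v, err := dec(raw)
	if err != nil {
		return err
	}
	h.cur.Store(&v)
	return nil
}

// demo returns the snapshot before any fetch, after a good fetch, and
// after a failed fetch.
func demo() (before, afterOK, afterErr string) {
	h := &holder[string]{}
	dec := func(b []byte) (string, error) { return string(b), nil }
	good := func(context.Context) ([]byte, error) { return []byte("v1"), nil }
	bad := func(context.Context) ([]byte, error) { return nil, errors.New("source down") }

	before = h.Snapshot()
	_ = h.refresh(context.Background(), good, dec)
	afterOK = h.Snapshot()
	_ = h.refresh(context.Background(), bad, dec)
	afterErr = h.Snapshot()
	return before, afterOK, afterErr
}

func main() {
	before, afterOK, afterErr := demo()
	fmt.Printf("%q %q %q\n", before, afterOK, afterErr)
}
```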

func (*PipelineConfig[T]) Start

func (p *PipelineConfig[T]) Start(ctx context.Context) (stop func())

Start launches the background polling goroutine. No-op for static configs. The first fetch fires immediately; subsequent fetches tick at Interval ± Jitter. The returned stop func cancels polling and waits for any in-flight fetch to finish. Wire through up.WithGroup or call from Cluster.ServerInitialized / OnDestroy.

type PollOptions

type PollOptions struct {
	Interval time.Duration // zero → DefaultPollInterval
	Timeout  time.Duration // zero → DefaultPollTimeout; applied per fetch attempt
	Jitter   time.Duration // random [0, Jitter) added to each interval

	// Observe is called after every refresh cycle (success or failure).
	// Called from the polling goroutine; must not block indefinitely.
	Observe func(ConfigEvent)
}

PollOptions configures the polling behaviour for NewPollingConfig / NewFileConfig.
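The delay before each poll can be sketched as follows. This follows the Jitter field comment above (a random value in [0, Jitter) added to the interval); the NewPollingConfig and Start descriptions word it as Interval ± Jitter, so treat the exact distribution as an assumption. The nextDelay function and the 30-second default are hypothetical.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// defaultPollInterval is a hypothetical value standing in for
// DefaultPollInterval, whose real value is not shown in this reference.
const defaultPollInterval = 30 * time.Second

// nextDelay returns interval plus a random offset in [0, jitter).
// A zero interval falls back to the default; a zero jitter adds nothing.
func nextDelay(interval, jitter time.Duration, rng *rand.Rand) time.Duration {
	if interval <= 0 {
		interval = defaultPollInterval
	}
	if jitter <= 0 {
		return interval
	}
	return interval + time.Duration(rng.Int63n(int64(jitter)))
}

// withinBounds draws n delays and reports whether every one falls in
// [interval, interval+jitter).
func withinBounds(n int) bool {
	rng := rand.New(rand.NewSource(1))
	interval, jitter := 10*time.Second, 2*time.Second
	for i := 0; i < n; i++ {
		d := nextDelay(interval, jitter, rng)
		if d < interval || d >= interval+jitter {
			return false
		}
	}
	return true
}

func main() {
	rng := rand.New(rand.NewSource(42))
	fmt.Println(nextDelay(10*time.Second, 2*time.Second, rng))
	fmt.Println("all samples in range:", withinBounds(1000))
}
```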

type Request

type Request struct {
	Method     string
	Path       string
	Host       string
	FilterName string
	Context    *any
	// contains filtered or unexported fields
}

Request holds the per-request fields populated before the handler is called.

func NewRequest

func NewRequest(headers shared.HeaderMap, name string) *Request

NewRequest constructs a Request from a HeaderMap for use in tests.

func (*Request) AllHeaders

func (r *Request) AllHeaders() [][2]string

AllHeaders returns all request headers as copied Go strings.

func (*Request) Header

func (r *Request) Header(name string) string

Header returns the first value of the named request header, or "" if absent.

type RequestBodyHandlerFunc

type RequestBodyHandlerFunc func(w *Writer, chunk *BodyChunk)

RequestBodyHandlerFunc is called for each request body event.

type ResponseChunk

type ResponseChunk struct {
	StatusCode      int
	Headers         shared.HeaderMap
	Data            []byte
	EndStream       bool
	ContentEncoding string
	ContentType     string
	Context         *any
}

ResponseChunk is passed to a ResponseHandlerFunc on every response event.

On the response headers call, StatusCode is non-zero and Headers is set. On response body calls, StatusCode is 0 and Data holds the received bytes. A synthetic body call with Data: nil, EndStream: true is issued when the response has no body (204, HEAD, etc.) so body logic always fires once.

ContentEncoding and ContentType are populated automatically from the response's Content-Encoding and Content-Type headers. Context is a per-stream slot shared across all callbacks on one stream.
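A response handler usually branches on StatusCode to tell the headers call from body calls. The sketch below shows that branching with a local struct that copies only three ResponseChunk fields. The responseStats type and the observe and replay functions are hypothetical, and the Writer argument is left out to keep the example self-contained.

```go
package main

import "fmt"

// responseChunk copies only the ResponseChunk fields this sketch needs.
type responseChunk struct {
	StatusCode int
	Data       []byte
	EndStream  bool
}

// responseStats accumulates what one stream's response looked like.
type responseStats struct {
	Status    int
	BodyBytes int
	Finished  bool
}

// observe handles one response event.
func (s *responseStats) observe(c *responseChunk) {
	if c.StatusCode != 0 {
		// Headers call: StatusCode is set, no body bytes yet.
		s.Status = c.StatusCode
		return
	}
	// Body call: Data may be nil on the synthetic end-of-stream event.
	s.BodyBytes += len(c.Data)
	if c.EndStream {
		s.Finished = true
	}
}

// replay feeds a sequence of events through a fresh responseStats.
func replay(chunks ...responseChunk) responseStats {
	var s responseStats
	for i := range chunks {
		s.observe(&chunks[i])
	}
	return s
}

func main() {
	withBody := replay(
		responseChunk{StatusCode: 200},
		responseChunk{Data: []byte("he")},
		responseChunk{Data: []byte("llo"), EndStream: true},
	)
	noBody := replay(
		responseChunk{StatusCode: 204},
		responseChunk{Data: nil, EndStream: true},
	)
	fmt.Printf("%+v\n%+v\n", withBody, noBody)
}
```

Thanks to the synthetic body call, the end-of-stream branch runs for bodyless responses too, so finalization logic can live in one place.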

func (*ResponseChunk) AllHeaders

func (c *ResponseChunk) AllHeaders() [][2]string

AllHeaders returns all response headers as copied Go strings. It is only meaningful when StatusCode is non-zero (the headers call).

type ResponseHandlerFunc

type ResponseHandlerFunc func(w *Writer, chunk *ResponseChunk)

ResponseHandlerFunc is called for each response event on a stream.

type Router

type Router struct {
	// contains filtered or unexported fields
}

Router dispatches a request to a handler matched by HTTP method and either an exact path or a path prefix. Unmatched requests are forwarded to the notFound func; if notFound is nil, they are silently passed through.

func NewRouter

func NewRouter(notFound func(*Writer, *Request)) *Router

NewRouter returns a Router that calls notFound for unmatched requests.

func (*Router) DELETE

func (r *Router) DELETE(path string, handler func(*Writer, *Request)) *Router

DELETE registers handler for DELETE requests to the exact path.

func (*Router) DELETEPrefix

func (r *Router) DELETEPrefix(prefix string, handler func(*Writer, *Request)) *Router

DELETEPrefix registers handler for DELETE requests to any path with the given prefix.

func (*Router) Dispatch

func (r *Router) Dispatch(w *Writer, req *Request)

Dispatch is the request handler func for Register. Exact routes take priority over prefix routes; among each group, first registration wins.
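The matching priority can be sketched with a small standalone matcher. It illustrates the documented rules (exact before prefix, first registration wins within each group) and is not the package's Router. The router, route, and match names are hypothetical, and string labels stand in for handlers.

```go
package main

import (
	"fmt"
	"strings"
)

// route pairs a method and pattern with a label standing in for a handler.
type route struct {
	method  string
	pattern string
	label   string
}

// router keeps exact and prefix routes in registration order.
type router struct {
	exact  []route
	prefix []route
}

func (r *router) handle(method, path, label string) *router {
	r.exact = append(r.exact, route{method: method, pattern: path, label: label})
	return r
}

func (r *router) handlePrefix(method, prefix, label string) *router {
	r.prefix = append(r.prefix, route{method: method, pattern: prefix, label: label})
	return r
}

// match checks exact routes first, then prefix routes, each in
// registration order. The second result is false when nothing matched.
func (r *router) match(method, path string) (string, bool) {
	for _, rt := range r.exact {
		if rt.method == method && rt.pattern == path {
			return rt.label, true
		}
	}
	for _, rt := range r.prefix {
		if rt.method == method && strings.HasPrefix(path, rt.pattern) {
			return rt.label, true
		}
	}
	return "", false
}

func main() {
	r := (&router{}).
		handlePrefix("GET", "/api/", "api-prefix").
		handle("GET", "/api/users", "users")

	for _, p := range []string{"/api/users", "/api/orders", "/health"} {
		label, ok := r.match("GET", p)
		fmt.Println(p, label, ok)
	}
}
```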

func (*Router) GET

func (r *Router) GET(path string, handler func(*Writer, *Request)) *Router

GET registers handler for GET requests to the exact path.

func (*Router) GETPrefix

func (r *Router) GETPrefix(prefix string, handler func(*Writer, *Request)) *Router

GETPrefix registers handler for GET requests to any path with the given prefix.

func (*Router) Handle

func (r *Router) Handle(method, path string, handler func(*Writer, *Request)) *Router

Handle registers handler for the given HTTP method and exact path. Returns r for chaining.

func (*Router) HandlePrefix

func (r *Router) HandlePrefix(method, pathPrefix string, handler func(*Writer, *Request)) *Router

HandlePrefix registers handler for the given HTTP method and any path that starts with the given prefix. Exact routes are checked first; prefix routes are checked in registration order among themselves.

func (*Router) POST

func (r *Router) POST(path string, handler func(*Writer, *Request)) *Router

POST registers handler for POST requests to the exact path.

func (*Router) POSTPrefix

func (r *Router) POSTPrefix(prefix string, handler func(*Writer, *Request)) *Router

POSTPrefix registers handler for POST requests to any path with the given prefix.

type SelectorObserver

type SelectorObserver struct {
	// OnSelected is called (on cluster main thread) when a host is chosen.
	OnSelected func(host HostPtr)
	// OnFailed is called (on cluster main thread) when the lookup returns an error.
	OnFailed func(errDetail string)
	// OnCancelled is called (on cluster main thread) when CancelHostSelection fires.
	OnCancelled func()
	// OnMissingPromise is called (on cluster main thread) when no promise is
	// found in the stream-object bag (key absent or wrong type).
	OnMissingPromise func()
}

SelectorObserver receives events from AsyncHostSelector. All nil fields are treated as no-ops.

type Sidecar

type Sidecar struct {
	// contains filtered or unexported fields
}

Sidecar wraps an http.Handler with net.Listen, readiness, and graceful shutdown.

func NewSidecar

func NewSidecar(handler http.Handler, opts SidecarOptions) *Sidecar

NewSidecar creates a new Sidecar wrapping the given handler.

func (*Sidecar) ListenAddr

func (s *Sidecar) ListenAddr() string

ListenAddr returns the resolved listen address (e.g. "127.0.0.1:43210"). Returns "" until the channel returned by Ready is closed.

func (*Sidecar) Ready

func (s *Sidecar) Ready() <-chan struct{}

Ready returns a channel that is closed after net.Listen succeeds and before srv.Serve is called. ListenAddr() returns a valid address once that channel is closed.

type SidecarOptions

type SidecarOptions struct {
	ListenAddr      string
	ShutdownTimeout time.Duration // default 5s
	EgressURL       string        // empty = break-glass direct-dial
	Rationale       string        // required when EgressURL is empty
	OnSession       func(SidecarSessionEvent)
	// StartupLogFile, when non-empty, causes the startup rationale to also be
	// written to this file (one line). Used by e2e tests to assert the rationale
	// without capturing raw stderr.
	StartupLogFile string
}

SidecarOptions configures a Sidecar.

type SidecarSessionEvent

type SidecarSessionEvent struct {
	Path     string
	Start    time.Time
	Duration time.Duration
	Err      error
}

SidecarSessionEvent is delivered to SidecarOptions.OnSession when a session ends.

type StreamKey

type StreamKey[T any] struct {
	// contains filtered or unexported fields
}

StreamKey[T] is a typed handle for one slot in the per-stream object bag (Primitive A). The zero value is unusable; construct with NewStreamKey.

StreamKey is a thin, allocation-free wrapper around a string key. All type safety is enforced at compile time via the generic parameter T; no reflect or interface{} type switches are needed at the call site.

func NewStreamKey

func NewStreamKey[T any](key string) StreamKey[T]

NewStreamKey returns a StreamKey that uses key as its bag slot name. key must be unique within a pipeline; conventionally use a dotted reverse-domain form: "orange.decision", "mcp.session", etc.

func (StreamKey[T]) Get

func (k StreamKey[T]) Get(w *Writer) (T, bool)

Get retrieves the value from the per-stream bag via w. Returns (zero, false) if the slot has not been set or the stored value cannot be asserted to T (which should not happen in correct usage). Must be called on the worker thread.

func (StreamKey[T]) GetFromCtx

func (k StreamKey[T]) GetFromCtx(ctx ClusterLBContext) (T, bool)

GetFromCtx retrieves the value from the per-stream bag via a ClusterLBContext. Returns (zero, false) if the slot has not been set, the context has no bag, or the stored value cannot be asserted to T. Safe to call from the cluster main thread (same constraint as ClusterLBContext.GetStreamObject).

func (StreamKey[T]) Key

func (k StreamKey[T]) Key() string

Key returns the underlying string key.

func (StreamKey[T]) Set

func (k StreamKey[T]) Set(w *Writer, v T)

Set stores v in the per-stream bag via w. Must be called on the worker thread (same constraint as Writer.SetStreamObject).
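
The idea behind a typed key can be shown with a standalone sketch. Here bag, typedKey, and decision are hypothetical stand-ins: the real StreamKey reads and writes through a Writer or ClusterLBContext, whereas this model uses a plain map to show how a generic parameter keeps the type assertion out of the call site.

```go
package main

import "fmt"

// bag is a hypothetical stand-in for the per-stream object bag.
type bag map[string]any

// typedKey is a toy analogue of StreamKey[T]: a string key plus a
// compile-time value type.
type typedKey[T any] struct{ name string }

func newTypedKey[T any](name string) typedKey[T] {
	return typedKey[T]{name: name}
}

// set stores v under the key's slot name.
func (k typedKey[T]) set(b bag, v T) { b[k.name] = v }

// get returns (zero, false) when the slot is empty or holds another type.
func (k typedKey[T]) get(b bag) (T, bool) {
	v, ok := b[k.name].(T)
	return v, ok
}

// decision is an illustrative payload type.
type decision struct{ Allow bool }

func main() {
	key := newTypedKey[decision]("orange.decision")
	b := bag{}

	_, ok := key.get(b)
	fmt.Println(ok) // false: slot not set yet

	key.set(b, decision{Allow: true})
	d, ok := key.get(b)
	fmt.Println(d.Allow, ok) // true true
}
```

Declaring the key once as a package-level variable and sharing it between the writer and the reader keeps both sides agreed on the slot name and the value type.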

type StreamPromise

type StreamPromise[T any] struct {
	// contains filtered or unexported fields
}

StreamPromise[T] is a resolve-once promise. The zero value is unusable; construct with NewStreamPromise.

Concurrency model:

  • mu guards resolved, value, and callbacks.
  • done is a channel closed on first Resolve; callers may wait on it.
  • resolvedOnce ensures close(done) happens exactly once even under concurrent Resolve calls racing with OnResolve.

func NewStreamPromise

func NewStreamPromise[T any]() *StreamPromise[T]

NewStreamPromise returns a new, unresolved StreamPromise[T].

func (*StreamPromise[T]) Done

func (p *StreamPromise[T]) Done() <-chan struct{}

Done returns a channel that is closed when the promise is resolved. Safe to use in a select.

func (*StreamPromise[T]) OnResolve

func (p *StreamPromise[T]) OnResolve(cb func(T)) (cancel func())

OnResolve registers cb to fire when the promise is resolved. If the promise is already resolved, cb fires synchronously before OnResolve returns.

Returns a cancel func. Calling cancel prevents cb from firing if it has not fired yet; cancel is idempotent and safe to call multiple times.

func (*StreamPromise[T]) Resolve

func (p *StreamPromise[T]) Resolve(v T) bool

Resolve publishes v. The first call wins and returns true; subsequent calls are no-ops and return false.

All registered OnResolve callbacks that have not been canceled are fired synchronously from the goroutine that calls Resolve, in registration order.

func (*StreamPromise[T]) Result

func (p *StreamPromise[T]) Result() (T, bool)

Result returns (v, true) if the promise is resolved, or (zero, false) if not.
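
The resolve-once behaviour described for StreamPromise can be modelled in a few dozen lines. The sketch below is an illustration, not the package's implementation: promise, entry, and the lowercase method names are hypothetical, and it uses a single mutex rather than the separate resolvedOnce guard mentioned above.

```go
package main

import (
	"fmt"
	"sync"
)

// entry is one registered callback with its cancellation flag.
type entry[T any] struct {
	cb        func(T)
	cancelled bool
}

// promise is a toy resolve-once promise.
type promise[T any] struct {
	mu       sync.Mutex
	resolved bool
	value    T
	entries  []*entry[T]
	done     chan struct{}
}

func newPromise[T any]() *promise[T] {
	return &promise[T]{done: make(chan struct{})}
}

// onResolve registers cb, or fires it synchronously if already resolved.
func (p *promise[T]) onResolve(cb func(T)) (cancel func()) {
	p.mu.Lock()
	if p.resolved {
		v := p.value
		p.mu.Unlock()
		cb(v)
		return func() {}
	}
	e := &entry[T]{cb: cb}
	p.entries = append(p.entries, e)
	p.mu.Unlock()
	return func() { // idempotent: setting the flag twice is harmless
		p.mu.Lock()
		e.cancelled = true
		p.mu.Unlock()
	}
}

// resolve publishes v. Only the first call wins and fires the callbacks.
func (p *promise[T]) resolve(v T) bool {
	p.mu.Lock()
	if p.resolved {
		p.mu.Unlock()
		return false
	}
	p.resolved, p.value = true, v
	var live []func(T)
	for _, e := range p.entries {
		if !e.cancelled {
			live = append(live, e.cb)
		}
	}
	p.entries = nil
	close(p.done) // reached at most once because resolved is checked under mu
	p.mu.Unlock()
	for _, cb := range live { // registration order, outside the lock
		cb(v)
	}
	return true
}

func main() {
	p := newPromise[string]()
	var got []string
	p.onResolve(func(v string) { got = append(got, "first:"+v) })
	cancel := p.onResolve(func(v string) { got = append(got, "second:"+v) })
	cancel()
	fmt.Println(p.resolve("host-a"), p.resolve("host-b")) // true false
	<-p.done
	fmt.Println(got) // [first:host-a]
}
```

Running callbacks outside the lock lets a callback call back into the promise without deadlocking, at the cost of a small window in which a late cancel cannot stop a callback that has already been selected to run.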

type TimingInfo

type TimingInfo = down.TimingInfo

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer is a thin per-invocation view over filter. It carries only the state scoped to a single handler invocation: calloutCB, calloutStarted, and directWrite. All mutable stream state — mutation queues, callout fn/state, async state — lives on the backing filter, which is long-lived for the stream.

A Writer must not be retained beyond the handler call. On the HTTPCallout path the same Writer is reused across the initial handler call and the callout callback; both run sequentially within one stream operation so this is safe. See Do for goroutine-safety rules when using w.Go and fan-out.

directWrite mode (set only by NewWriter):

Filter-created Writers (all production paths) set directWrite=false; mutation methods queue unconditionally so flush() applies them on the worker thread at the right time. NewWriter sets directWrite=true so that mutation methods apply directly to the handle — this preserves the behaviour expected by tests and examples that call handler functions outside a filter lifecycle, where there is no enclosing flush() call.

func NewWriter

func NewWriter(h shared.HttpFilterHandle) *Writer

NewWriter wraps handle in a Writer that applies mutations directly (no queue). Intended for use in tests and examples that invoke handler functions outside a filter lifecycle. Tests that need callout or async lifecycle behavior should instantiate filter directly.
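
The difference between queued and direct writes can be sketched with a standalone model. Everything here is hypothetical: miniWriter and headerSink stand in for the Writer and the filter handle, and the flush method only approximates the flush() step mentioned above.

```go
package main

import "fmt"

// headerSink is a hypothetical stand-in for the underlying filter handle.
type headerSink map[string]string

// miniWriter models the two modes: queue-and-flush versus direct application.
// It is not the package's Writer.
type miniWriter struct {
	sink        headerSink
	directWrite bool
	queue       []func(headerSink)
}

// setHeader applies immediately in direct mode and queues otherwise.
func (w *miniWriter) setHeader(name, value string) {
	apply := func(s headerSink) { s[name] = value }
	if w.directWrite {
		apply(w.sink)
		return
	}
	w.queue = append(w.queue, apply)
}

// flush applies queued mutations in order and clears the queue.
func (w *miniWriter) flush() {
	for _, m := range w.queue {
		m(w.sink)
	}
	w.queue = nil
}

func main() {
	queued := &miniWriter{sink: headerSink{}}
	queued.setHeader("x-user", "alice")
	fmt.Println(len(queued.sink)) // 0: nothing applied before flush
	queued.flush()
	fmt.Println(queued.sink["x-user"]) // alice

	direct := &miniWriter{sink: headerSink{}, directWrite: true}
	direct.setHeader("x-user", "bob")
	fmt.Println(direct.sink["x-user"]) // bob: applied without a flush
}
```

This is why a test that builds a Writer with NewWriter can assert on the handle right after calling a handler, with no flush step in between.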

func (*Writer) AddLogAttrs

func (w *Writer) AddLogAttrs(kvs ...any)

AddLogAttrs appends key-value pairs to the per-request log context.

When WithLogMetadata is configured, each attr is also written to dynamic metadata under the filter's namespace so it is accessible from the Envoy access log via %DYNAMIC_METADATA(ns:key)%. Value types that Envoy's metadata store does not natively support (structs, slices, etc.) are serialised to their string representation rather than panicking.

By default the attrs are NOT printed inline in the process-log message; they flow to the access log through metadata. Use WithInlineLogAttrs to also print them inline (useful for local debugging without a JSON access log).

For a one-off derived logger without persisting attrs on the writer, use w.Slog().With(kvs...) directly instead.

func (*Writer) AddRequestHeader

func (w *Writer) AddRequestHeader(name, value string)

AddRequestHeader queues (or immediately adds) a request header (multi-value).

func (*Writer) AddResponseHeader

func (w *Writer) AddResponseHeader(name, value string)

AddResponseHeader adds a response header inline. Response-phase only; not queued.

func (*Writer) DecrementGauge

func (w *Writer) DecrementGauge(id MetricID, delta uint64)

DecrementGauge queues (or immediately applies) a gauge decrement.

func (*Writer) Do

Do performs an Envoy HTTP callout from inside a Go goroutine and blocks until the callout completes or ctx is cancelled.

Multiple Do calls may be in flight concurrently. Writer mutation methods (SetRequestHeader, SetFilterState, etc.) are NOT goroutine-safe and must not be called while any Do goroutine is still running. Join all fan-out goroutines before issuing any mutations; the race detector can flag violations of this rule when they occur at run time.

Panics if called outside a Go goroutine.

Response buffers (Headers, Body) are Go-owned copies safe to use after Do returns.
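
The join-before-mutate rule can be illustrated without the package. In this sketch fetch is a hypothetical stand-in for a blocking callout such as w.Do, and the headers map stands in for Writer state that is not goroutine-safe; neither reflects the package's real signatures.

```go
package main

import (
	"fmt"
	"sync"
)

// fetch is a hypothetical stand-in for a blocking callout such as w.Do.
func fetch(target string) string { return "ok:" + target }

// fanOut runs one fetch per target concurrently and returns the results in
// input order. Each goroutine writes only to its own slot.
func fanOut(targets []string) []string {
	results := make([]string, len(targets))
	var wg sync.WaitGroup
	for i, t := range targets {
		wg.Add(1)
		go func(i int, t string) {
			defer wg.Done()
			results[i] = fetch(t)
		}(i, t)
	}
	wg.Wait() // join: no fan-out goroutine is running past this point
	return results
}

func main() {
	headers := map[string]string{} // stand-in for non-goroutine-safe Writer state
	res := fanOut([]string{"authz", "quota"})
	// Mutations happen only after the join, from a single goroutine.
	for i, r := range res {
		headers[fmt.Sprintf("x-check-%d", i)] = r
	}
	fmt.Println(headers["x-check-0"], headers["x-check-1"]) // ok:authz ok:quota
}
```

Collecting results into per-goroutine slots and applying them after the join keeps all mutation on one goroutine, which is the shape the rule above asks for.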

func (*Writer) GRPCCallout

GRPCCallout issues a gRPC unary callout over an Envoy HTTP/2 cluster, pausing the current request until the response arrives. fn is called with the decoded response; it may queue mutations or call SendLocalResponse.

Internally uses Envoy's streamable HTTP callout API so response trailers are observed separately; gRPC status normally lives in trailers.

Returns a non-nil error if Envoy rejected the callout; fn will not be called.

func (*Writer) GetActiveSpan

func (w *Writer) GetActiveSpan() shared.Span

GetActiveSpan returns the active tracing span for the current stream.

func (*Writer) GetAttributeBool

func (w *Writer) GetAttributeBool(id AttributeID) (bool, bool)

GetAttributeBool returns the boolean stream attribute for the given ID.

func (*Writer) GetAttributeNumber

func (w *Writer) GetAttributeNumber(id AttributeID) (float64, bool)

GetAttributeNumber returns the numeric stream attribute for the given ID.

func (*Writer) GetAttributeString

func (w *Writer) GetAttributeString(id AttributeID) (Buffer, bool)

GetAttributeString returns the string stream attribute for the given ID.

func (*Writer) GetFilterState

func (w *Writer) GetFilterState(key string) (string, bool)

GetFilterState reads a string filter state value previously written by SetFilterState. Returns the empty string and false if the key is absent.

func (*Writer) GetMetadataBool

func (w *Writer) GetMetadataBool(source MetadataSource, ns, key string) (bool, bool)

GetMetadataBool reads a boolean metadata value from the given source.

func (*Writer) GetMetadataNumber

func (w *Writer) GetMetadataNumber(source MetadataSource, ns, key string) (float64, bool)

GetMetadataNumber reads a numeric metadata value from the given source.

func (*Writer) GetMetadataString

func (w *Writer) GetMetadataString(source MetadataSource, ns, key string) (Buffer, bool)

GetMetadataString reads a string metadata value from the given source. Returns a Buffer (copy the data with buf.String() before the callback returns).

func (*Writer) GetStreamObject

func (w *Writer) GetStreamObject(key string) (any, bool)

GetStreamObject returns the value stored under key for this stream, or (nil, false) if the key was never set. Must be called on the worker thread. If SetStreamObject was never called on this stream no allocation occurs and no nonce is minted.

func (*Writer) Go

func (w *Writer) Go(fn func(ctx context.Context))

Go upgrades this request to asynchronous mode. fn runs in a new goroutine and may call w.Do to issue outbound HTTP callouts. After fn returns, Transit hops back to the Envoy worker thread, applies queued mutations, and resumes the stream — unless the stream was cancelled while fn was running.

goStarted is cleared inside the scheduled finish before flush(true) so that subsequent callbacks (e.g. OnRequestBody on a request that still has a body) do not see stale goroutine state and spuriously return StopAndBuffer.

Panics if called twice or after HTTPCallout.

SendLocalResponse from inside fn is NOT reliable — see type-level docs.

func (*Writer) HTTPCallout

HTTPCallout initiates an outbound Envoy HTTP callout from a request callback and pauses the request until the callout completes. fn is invoked with the callout result; it may queue mutations or call SendLocalResponse.

Returns HTTPCalloutInitSuccess and nil error if Envoy accepted the callout. A non-nil error means fn will never be called.

Panics if called after Go or after a previous HTTPCallout.

func (*Writer) HTTPCalloutAllSettled

func (w *Writer) HTTPCalloutAllSettled(reqs []HTTPCalloutRequest, fn HTTPCalloutAllSettledFunc) error

HTTPCalloutAllSettled initiates multiple outbound Envoy HTTP callouts from a request callback and pauses the request until all accepted callouts complete. fn is invoked exactly once with one response slot per request; init failures are recorded in their corresponding response slot.

Response headers and body buffers are Go-owned copies safe to read after individual callout callbacks return. fn may queue mutations or call SendLocalResponse.

Panics if called after Go or after another HTTPCallout/HTTPCalloutAllSettled.

func (*Writer) HTTPCalloutSequence

func (w *Writer) HTTPCalloutSequence(next HTTPCalloutSequenceNextFunc, done HTTPCalloutSequenceDoneFunc) error

HTTPCalloutSequence initiates outbound Envoy HTTP callouts one at a time from a request callback and pauses the request until done runs. next receives the previous response and decides whether to issue another callout. done runs from the callback path, so SendLocalResponse is reliable there.

Panics if called after Go or after another HTTPCallout/HTTPCalloutAllSettled.

func (*Writer) IncrementCounter

func (w *Writer) IncrementCounter(id MetricID, delta uint64)

IncrementCounter queues (or immediately applies) a counter increment.

func (*Writer) IncrementCounterLabels

func (w *Writer) IncrementCounterLabels(id MetricID, delta uint64, labelValues ...string)

IncrementCounterLabels queues (or immediately applies) a counter increment with label values matching the tag keys used when the counter was defined.

func (*Writer) IncrementGauge

func (w *Writer) IncrementGauge(id MetricID, delta uint64)

IncrementGauge queues (or immediately applies) a gauge increment.

func (*Writer) Log

func (w *Writer) Log(level LogLevel, format string, args ...any)

Log emits a message via Envoy's logging mechanism at the given severity.

func (*Writer) RecordHistogram

func (w *Writer) RecordHistogram(id MetricID, value uint64)

RecordHistogram queues (or immediately applies) a histogram observation.

func (*Writer) RecordHistogramLabels

func (w *Writer) RecordHistogramLabels(id MetricID, value uint64, labelValues ...string)

RecordHistogramLabels queues (or immediately applies) a histogram observation with label values matching the tag keys used when the histogram was defined.

func (*Writer) RemoveRequestHeader

func (w *Writer) RemoveRequestHeader(name string)

RemoveRequestHeader queues (or immediately removes) all values for the named header.

func (*Writer) RemoveResponseHeader

func (w *Writer) RemoveResponseHeader(name string)

RemoveResponseHeader removes a response header inline. Response-phase only; not queued.

func (*Writer) RequestHeader

func (w *Writer) RequestHeader(name string) string

RequestHeader returns the first current value of the named request header, or "" if absent. Header mutations queued earlier in the same callback are reflected in the returned value.

func (*Writer) RequestHeaders

func (w *Writer) RequestHeaders() [][2]string

RequestHeaders returns all current request headers as copied Go strings. Header mutations queued earlier in the same callback are reflected in the returned view, even though they are applied to Envoy only when the callback flushes.

func (*Writer) SendLocalResponse

func (w *Writer) SendLocalResponse(status int, body []byte, headers ...[2]string)

SendLocalResponse queues (or immediately sends) a client response. Only the first call takes effect; additional calls are silently ignored.

NOTE: SendLocalResponse from inside w.Go is NOT reliable. Envoy only honours it from filter callbacks. Use HTTPCallout (callback form) if the filter needs to reject with a local response.

func (*Writer) SetFilterState

func (w *Writer) SetFilterState(key, value string)

SetFilterState queues (or immediately writes) a per-stream filter state value.

func (*Writer) SetGauge

func (w *Writer) SetGauge(id MetricID, value uint64)

SetGauge queues (or immediately applies) a gauge absolute assignment.

func (*Writer) SetMetadata

func (w *Writer) SetMetadata(ns, key string, value any)

SetMetadata writes a per-stream dynamic metadata value. value must be a string, bool, or numeric type (int, int64, float64, etc.). Panics for unsupported types rather than silently producing a no-op. On the request path (queued mode) this is batched with other mutations and applied in flush() before ContinueRequest. On the response path (directWrite=true) it applies immediately.

func (*Writer) SetRequestBody

func (w *Writer) SetRequestBody(data []byte)

SetRequestBody marks data as the replacement for the request body buffer. Only effective in mutable buffered request mode (WithMutableBody).

func (*Writer) SetRequestHeader

func (w *Writer) SetRequestHeader(name, value string)

SetRequestHeader queues (or immediately sets) a request header.

func (*Writer) SetResponseBody

func (w *Writer) SetResponseBody(data []byte)

SetResponseBody marks data as the replacement for the response body buffer. Only effective in buffered mode (WithMutableBody).

func (*Writer) SetResponseHeader

func (w *Writer) SetResponseHeader(name, value string)

SetResponseHeader sets a response header inline. Response-phase callbacks only. Not queued — response mutations are always applied immediately.

func (*Writer) SetStreamObject

func (w *Writer) SetStreamObject(key string, v any)

SetStreamObject stores v under key in the per-stream typed-value bag (Primitive A). Must be called on the worker thread — same constraint as SetFilterState.

On the first call for a stream, getOrCreateBag mints a short random nonce, queues a SetFilterState write of that nonce under the reserved key "up.stream_object_id" (via the same queued-mutation path as SetFilterState), and creates the bag. Subsequent calls for the same stream reuse the bag.

The nonce in filter state lets ClusterLBContext.GetStreamObject look up the bag. The bag is drained by OnStreamComplete (or finalizedLogger.OnLog for filters using WithOnStreamFinalized) after all user callbacks have run.

func (*Writer) SetTypedMetadata

func (w *Writer) SetTypedMetadata(ns, key string, value any)

SetTypedMetadata is like SetMetadata but numeric values are automatically formatted as decimal strings before being written. This satisfies the Envoy dynamic modules ABI constraint (metadata values must be strings) while letting callers pass typed values without manual strconv calls.
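
As an illustration of the kind of conversion this saves the caller, the sketch below renders a few value types as strings with strconv. metadataString is a hypothetical helper, not part of the package: the exact formatting SetTypedMetadata uses, the full set of types it accepts, and its handling of bool values are not specified above, and this sketch returns an error where the package may behave differently.

```go
package main

import (
	"fmt"
	"strconv"
)

// metadataString is a hypothetical helper showing one way numeric values
// could be rendered as decimal strings.
func metadataString(v any) (string, error) {
	switch x := v.(type) {
	case string:
		return x, nil
	case int:
		return strconv.Itoa(x), nil
	case int64:
		return strconv.FormatInt(x, 10), nil
	case uint64:
		return strconv.FormatUint(x, 10), nil
	case float64:
		// 'f' with precision -1 uses the fewest digits that round-trip.
		return strconv.FormatFloat(x, 'f', -1, 64), nil
	default:
		return "", fmt.Errorf("unsupported metadata type %T", v)
	}
}

func main() {
	status, _ := metadataString(429)
	ratio, _ := metadataString(0.25)
	fmt.Println(status, ratio) // 429 0.25

	_, err := metadataString([]int{1})
	fmt.Println(err != nil) // true
}
```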

func (*Writer) SetUpstreamOverrideHost

func (w *Writer) SetUpstreamOverrideHost(host string, strict bool) bool

SetUpstreamOverrideHost queues (or immediately sets) an upstream host override. Returns true when queued (optimistic). Returns the handle's result in direct-write mode.

func (*Writer) Slog

func (w *Writer) Slog() *slog.Logger

Slog returns a *slog.Logger whose handler routes through Envoy's logging mechanism. The logger automatically prepends filter=<name> to every line, followed by any attributes registered via WithAttributes. Call once at the top of a handler and reuse the result rather than calling Slog repeatedly.

Directories

Path Synopsis
Package buffer provides zero-allocation stream buffering for Envoy filters.
Package compress handles Content-Encoding compression for HTTP body inspection.
Package testutil provides test helpers for up package filters.
