Documentation
¶
Overview ¶
Package up provides the user-facing API for transit HTTP filter handlers. This file defines all types related to asynchronous HTTP callouts and the callback callout and goroutine callout modes:
HTTPCallout / HTTPCalloutAllSettled / HTTPCalloutSequence — callback form. The filter stops the request, Envoy sends outbound HTTP requests to named clusters, and the user-supplied callback runs when the response, response group, or sequence arrives. The callback may queue mutations (SetRequestHeader, etc.) or send a local response. Transit then applies those mutations and resumes (or terminates) the stream. This is the only path that supports SendLocalResponse reliably, because the callback runs from a filter callback, not from a scheduler, and Envoy only honours SendLocalResponse from filter callbacks.
Go + Do — goroutine form. The handler calls w.Go(fn); fn runs in a goroutine and may call w.Do(...) to issue callouts from that goroutine. After fn returns, Transit hops back to the Envoy worker thread via goScheduler.Schedule and applies queued mutations, then continues the request. SendLocalResponse from this path is NOT reliable — Envoy ignores it from scheduled callbacks. Use Go+Do only for work that forwards the request.
Both paths are mutex-free by design. See the calloutState comment in writer.go for the full concurrency model.
Package up — stream_object.go: Primitive A typed per-stream object handoff.
Why the drain lives in OnStreamComplete (for filters without WithOnStreamFinalized) / finalizedLogger.OnLog (for filters with it):
OnStreamComplete fires unconditionally for every stream Envoy terminates — normal end-of-stream, client disconnect, idle/request timeout, foreign local reply, stream reset. This is exactly the "teardown matrix" described in docs/orange-token-correlation-risks.md. For filters that also use WithOnStreamFinalized, the access logger's OnLog fires AFTER OnStreamComplete, so drain is deferred to OnLog so finalized cleanup runs before the SDK removes stream-scoped objects. Drain order:
user OnStreamComplete → user OnStreamFinalized → dropBag
Index ¶
- Constants
- func Register(name string, h HandlerFunc, opts ...FilterOption)
- func RegisterAccessLogger(name string, f AccessLoggerConfigFactory)
- func RegisterCluster(name string, f ClusterFactory)
- func RegisterLBPolicy(name string, f LBPolicyFactory)
- func ResponseFlagsString(mask uint64) string
- func RunRetry(ctx context.Context, label string, fn func(ctx context.Context) error)
- func StartFileWatch[T any](p *PipelineConfig[T], path string) func()
- func TruncateBody(data []byte, max int) []byte
- type AccessLogType
- type AccessLogger
- type AccessLoggerConfigFactory
- type AccessLoggerConfigHandle
- type AccessLoggerFactory
- type AccessLoggerHandle
- type AdminServer
- type AdminServerOptions
- type AsyncHostSelector
- type AttributeID
- type BaseCluster
- type BodyChunk
- type Buffer
- type BytesInfo
- type Cluster
- type ClusterConfigFactory
- type ClusterFactory
- type ClusterFactoryWithMetrics
- type ClusterGroup
- type ClusterHandle
- type ClusterLB
- type ClusterLBCompletion
- type ClusterLBContext
- type ClusterLBHandle
- type ClusterMetrics
- type ConfigDecoder
- type ConfigEvent
- type ConfigFunc
- type ConfigHandle
- type ConfigSource
- type EmptyAccessLogger
- type EmptyClusterLB
- type EmptyLBPolicy
- type ExchangeHooks
- type FilterOption
- func WithAdminServer(a *AdminServer) FilterOption
- func WithAttributes(kvs ...any) FilterOption
- func WithBody(rb RequestBodyHandlerFunc) FilterOption
- func WithConfig(fn ConfigFunc) FilterOption
- func WithExchangeObserver[T any](hooks ExchangeHooks[T]) []FilterOption
- func WithGroup(g *Group) FilterOption
- func WithLogMetadata(ns string) FilterOption
- func WithMutableBody(rb RequestBodyHandlerFunc) FilterOption
- func WithOnStreamComplete(fn OnStreamCompleteFunc) FilterOption
- func WithOnStreamFinalized(fn OnStreamFinalizedFunc) FilterOption
- func WithResponse(r ResponseHandlerFunc) FilterOption
- func WithSidecar(s *Sidecar) FilterOption
- func WithStreamingBody(rb RequestBodyHandlerFunc) FilterOption
- type FinalizedInfo
- type GRPCCalloutFunc
- type GRPCCalloutRequest
- type GRPCCalloutResponse
- type Group
- type HTTPCalloutAllSettledFunc
- type HTTPCalloutAllSettledResponse
- type HTTPCalloutFunc
- type HTTPCalloutInitResult
- type HTTPCalloutRequest
- type HTTPCalloutResponse
- type HTTPCalloutResult
- type HTTPCalloutSequenceDoneFunc
- type HTTPCalloutSequenceNextFunc
- type HandlerFunc
- type HostEntry
- type HostHealth
- type HostPtr
- type HostRefreshEvent
- type HostRefreshLoop
- type HostRefreshObserver
- type HostRefreshOptions
- type HostResult
- type HostSet
- type HostSnapshot
- type HostSnapshotFunc
- type HostSpec
- type HostStat
- type HttpHeaderType
- type LBContext
- type LBHandle
- type LBPolicy
- type LBPolicyConfigFactory
- type LBPolicyFactory
- type LogLevel
- type MetadataSource
- type MetricID
- type Middleware
- type OnStreamCompleteFunc
- type OnStreamFinalizedFunc
- type PipelineConfig
- type PollOptions
- type Request
- type RequestBodyHandlerFunc
- type ResponseChunk
- type ResponseHandlerFunc
- type Router
- func (r *Router) DELETE(path string, handler func(*Writer, *Request)) *Router
- func (r *Router) DELETEPrefix(prefix string, handler func(*Writer, *Request)) *Router
- func (r *Router) Dispatch(w *Writer, req *Request)
- func (r *Router) GET(path string, handler func(*Writer, *Request)) *Router
- func (r *Router) GETPrefix(prefix string, handler func(*Writer, *Request)) *Router
- func (r *Router) Handle(method, path string, handler func(*Writer, *Request)) *Router
- func (r *Router) HandlePrefix(method, pathPrefix string, handler func(*Writer, *Request)) *Router
- func (r *Router) POST(path string, handler func(*Writer, *Request)) *Router
- func (r *Router) POSTPrefix(prefix string, handler func(*Writer, *Request)) *Router
- type SelectorObserver
- type Sidecar
- type SidecarOptions
- type SidecarSessionEvent
- type StreamKey
- type StreamPromise
- type TimingInfo
- type Writer
- func (w *Writer) AddLogAttrs(kvs ...any)
- func (w *Writer) AddRequestHeader(name, value string)
- func (w *Writer) AddResponseHeader(name, value string)
- func (w *Writer) DecrementGauge(id MetricID, delta uint64)
- func (w *Writer) Do(ctx context.Context, req HTTPCalloutRequest) (*HTTPCalloutResponse, error)
- func (w *Writer) GRPCCallout(req GRPCCalloutRequest, fn GRPCCalloutFunc) (HTTPCalloutInitResult, error)
- func (w *Writer) GetActiveSpan() shared.Span
- func (w *Writer) GetAttributeBool(id AttributeID) (bool, bool)
- func (w *Writer) GetAttributeNumber(id AttributeID) (float64, bool)
- func (w *Writer) GetAttributeString(id AttributeID) (Buffer, bool)
- func (w *Writer) GetFilterState(key string) (string, bool)
- func (w *Writer) GetMetadataBool(source MetadataSource, ns, key string) (bool, bool)
- func (w *Writer) GetMetadataNumber(source MetadataSource, ns, key string) (float64, bool)
- func (w *Writer) GetMetadataString(source MetadataSource, ns, key string) (Buffer, bool)
- func (w *Writer) GetStreamObject(key string) (any, bool)
- func (w *Writer) Go(fn func(ctx context.Context))
- func (w *Writer) HTTPCallout(req HTTPCalloutRequest, fn HTTPCalloutFunc) (HTTPCalloutInitResult, error)
- func (w *Writer) HTTPCalloutAllSettled(reqs []HTTPCalloutRequest, fn HTTPCalloutAllSettledFunc) error
- func (w *Writer) HTTPCalloutSequence(next HTTPCalloutSequenceNextFunc, done HTTPCalloutSequenceDoneFunc) error
- func (w *Writer) IncrementCounter(id MetricID, delta uint64)
- func (w *Writer) IncrementCounterLabels(id MetricID, delta uint64, labelValues ...string)
- func (w *Writer) IncrementGauge(id MetricID, delta uint64)
- func (w *Writer) Log(level LogLevel, format string, args ...any)
- func (w *Writer) RecordHistogram(id MetricID, value uint64)
- func (w *Writer) RecordHistogramLabels(id MetricID, value uint64, labelValues ...string)
- func (w *Writer) RemoveRequestHeader(name string)
- func (w *Writer) RemoveResponseHeader(name string)
- func (w *Writer) RequestHeader(name string) string
- func (w *Writer) RequestHeaders() [][2]string
- func (w *Writer) SendLocalResponse(status int, body []byte, headers ...[2]string)
- func (w *Writer) SetFilterState(key, value string)
- func (w *Writer) SetGauge(id MetricID, value uint64)
- func (w *Writer) SetMetadata(ns, key string, value any)
- func (w *Writer) SetRequestBody(data []byte)
- func (w *Writer) SetRequestHeader(name, value string)
- func (w *Writer) SetResponseBody(data []byte)
- func (w *Writer) SetResponseHeader(name, value string)
- func (w *Writer) SetStreamObject(key string, v any)
- func (w *Writer) SetTypedMetadata(ns, key string, value any)
- func (w *Writer) SetUpstreamOverrideHost(host string, strict bool) bool
- func (w *Writer) Slog() *slog.Logger
Constants ¶
const ( AccessLogTypeNotSet = down.AccessLogTypeNotSet AccessLogTypeTcpUpstreamConnected = down.AccessLogTypeTcpUpstreamConnected AccessLogTypeTcpPeriodic = down.AccessLogTypeTcpPeriodic AccessLogTypeTcpConnectionEnd = down.AccessLogTypeTcpConnectionEnd AccessLogTypeDownstreamStart = down.AccessLogTypeDownstreamStart AccessLogTypeDownstreamPeriodic = down.AccessLogTypeDownstreamPeriodic AccessLogTypeDownstreamEnd = down.AccessLogTypeDownstreamEnd AccessLogTypeUpstreamPoolReady = down.AccessLogTypeUpstreamPoolReady AccessLogTypeUpstreamPeriodic = down.AccessLogTypeUpstreamPeriodic AccessLogTypeUpstreamEnd = down.AccessLogTypeUpstreamEnd AccessLogTypeDownstreamTunnelSuccessfullyEstablished = down.AccessLogTypeDownstreamTunnelSuccessfullyEstablished AccessLogTypeUdpTunnelUpstreamConnected = down.AccessLogTypeUdpTunnelUpstreamConnected AccessLogTypeUdpPeriodic = down.AccessLogTypeUdpPeriodic AccessLogTypeUdpSessionEnd = down.AccessLogTypeUdpSessionEnd )
AccessLogType constants.
const ( HttpHeaderTypeRequest = down.HttpHeaderTypeRequest HttpHeaderTypeRequestTrailer = down.HttpHeaderTypeRequestTrailer HttpHeaderTypeResponse = down.HttpHeaderTypeResponse HttpHeaderTypeResponseTrailer = down.HttpHeaderTypeResponseTrailer )
HttpHeaderType constants.
const ( HostUnhealthy = down.HostUnhealthy HostDegraded = down.HostDegraded HostHealthy = down.HostHealthy )
const ( HostStatCxConnectFail = down.HostStatCxConnectFail HostStatCxTotal = down.HostStatCxTotal HostStatRqError = down.HostStatRqError HostStatRqSuccess = down.HostStatRqSuccess HostStatRqTimeout = down.HostStatRqTimeout HostStatRqTotal = down.HostStatRqTotal HostStatCxActive = down.HostStatCxActive HostStatRqActive = down.HostStatRqActive )
const ( // DefaultPollInterval is used when PollOptions.Interval is zero. DefaultPollInterval = 30 * time.Second // DefaultPollTimeout is used when PollOptions.Timeout is zero. DefaultPollTimeout = 5 * time.Second )
const ( HeaderAuthority = ":authority" HeaderMethod = ":method" HeaderPath = ":path" HeaderScheme = ":scheme" )
HTTP/2 pseudo-header names used by Envoy. Prefer these constants over bare string literals when calling SetRequestHeader.
const ( ResponseFlagFailedLocalHealthCheck = "LH" ResponseFlagNoHealthyUpstream = "UH" ResponseFlagUpstreamRequestTimeout = "UT" ResponseFlagLocalReset = "LR" ResponseFlagUpstreamRemoteReset = "UR" ResponseFlagUpstreamConnectionFailure = "UF" ResponseFlagUpstreamConnectionTermination = "UC" ResponseFlagUpstreamOverflow = "UO" ResponseFlagNoRouteFound = "NR" ResponseFlagDelayInjected = "DI" ResponseFlagFaultInjected = "FI" ResponseFlagRateLimited = "RL" ResponseFlagRateLimitServiceError = "RLSE" ResponseFlagDownstreamConnectionTermination = "DC" ResponseFlagUpstreamRetryLimitExceeded = "URX" ResponseFlagStreamIdleTimeout = "SI" ResponseFlagInvalidRequestHeaders = "IH" ResponseFlagDownstreamProtocolError = "DPE" ResponseFlagUpstreamMaxStreamDuration = "UMSDR" ResponseFlagResponseFromCacheFilter = "RFCF" ResponseFlagNoFilterConfigFound = "NFCF" ResponseFlagDurationTimeout = "DT" ResponseFlagUpstreamProtocolError = "UPE" ResponseFlagNoClusterFound = "NC" ResponseFlagOverloadManager = "OM" )
Response flag constants match Envoy's %RESPONSE_FLAGS% access log format. ResponseFlagsString converts the bitmask from AccessLoggerHandle.GetResponseFlags() to a comma-separated string using these tokens (e.g. "UF,UT").
const DefaultRefreshInterval = 30 * time.Second
DefaultRefreshInterval is the interval used when HostRefreshOptions.Interval is zero.
const DefaultRefreshTimeout = time.Second
DefaultRefreshTimeout is the per-fetch context timeout used when HostRefreshOptions.Timeout is zero.
Variables ¶
This section is empty.
Functions ¶
func Register ¶
func Register(name string, h HandlerFunc, opts ...FilterOption)
Register registers a named HTTP filter handler and wires it into the Envoy SDK. Must be called from an init() function. Panics on duplicate names.
Optional features are configured via FilterOptions: WithConfig, WithResponse, WithStreamingBody, WithMutableBody, WithGroup, WithOnStreamComplete.
func RegisterAccessLogger ¶
func RegisterAccessLogger(name string, f AccessLoggerConfigFactory)
RegisterAccessLogger registers a named access logger factory. Must be called from an init() function. Panics on duplicate names.
func RegisterCluster ¶
func RegisterCluster(name string, f ClusterFactory)
RegisterCluster registers a named ClusterFactory. Must be called from an init() function. Panics on duplicate names.
func RegisterLBPolicy ¶
func RegisterLBPolicy(name string, f LBPolicyFactory)
RegisterLBPolicy registers a named LBPolicyFactory. Must be called from an init() function. Panics on duplicate names.
func ResponseFlagsString ¶
ResponseFlagsString converts the GetResponseFlags() bitmask to Envoy's human-readable flag string (e.g. "UF,UT"), matching %RESPONSE_FLAGS%.
func RunRetry ¶
RunRetry calls fn in a loop until ctx is cancelled. When fn returns a non-nil error and the context is still active, the error is logged with label as a prefix and fn is retried immediately. When fn returns nil the loop continues regardless — fn should only return nil on a clean shutdown.
Use this inside a ClusterGroup.Go goroutine to wrap a streaming RPC receive loop so that transient stream failures retry automatically:
cg.Go(func(ctx context.Context) {
up.RunRetry(ctx, "catalog-watch", func(ctx context.Context) error {
stream, err := client.Watch(ctx, connect.NewRequest(&req))
if err != nil {
return err
}
for stream.Receive() {
handle(stream.Msg())
}
return stream.Err()
})
})
func StartFileWatch ¶
func StartFileWatch[T any](p *PipelineConfig[T], path string) func()
StartFileWatch enables file system watching for the given config file path. When the file is modified, an immediate refresh is triggered. Returns a stop function that cancels watching; safe to call multiple times. If watching cannot be set up, returns a no-op function and logs the error via the config's observer.
func TruncateBody ¶
TruncateBody returns data[:max] if len(data) > max, otherwise data unchanged.
Types ¶
type AccessLogType ¶
type AccessLogType = down.AccessLogType
type AccessLogger ¶
type AccessLogger interface {
OnLog(handle AccessLoggerHandle, logType AccessLogType)
OnDestroy()
}
AccessLogger is the per-worker-thread logger instance.
type AccessLoggerConfigFactory ¶
type AccessLoggerConfigFactory interface {
Create(handle AccessLoggerConfigHandle, config []byte) (AccessLoggerFactory, error)
}
AccessLoggerConfigFactory parses the logger config and creates factories.
type AccessLoggerConfigHandle ¶
type AccessLoggerConfigHandle interface {
Log(level LogLevel, format string, args ...any)
DefineCounter(name string, tagKeys ...string) (MetricID, error)
DefineGauge(name string, tagKeys ...string) (MetricID, error)
DefineHistogram(name string, tagKeys ...string) (MetricID, error)
}
AccessLoggerConfigHandle is passed to AccessLoggerConfigFactory.Create on the main thread. Use it to define Envoy metrics during initialization.
type AccessLoggerFactory ¶
type AccessLoggerFactory interface {
NewLogger() AccessLogger
OnDestroy()
}
AccessLoggerFactory creates AccessLogger instances, one per worker thread.
type AccessLoggerHandle ¶
type AccessLoggerHandle interface {
GetTimingInfo() TimingInfo
GetBytesInfo() BytesInfo
GetResponseFlags() uint64
GetResponseCode() uint32
GetAttributeString(id AttributeID) (Buffer, bool)
GetAttributeInt(id AttributeID) (int64, bool)
GetAttributeBool(id AttributeID) (bool, bool)
GetHeader(headerType HttpHeaderType, key string) (Buffer, bool)
GetWorkerIndex() uint32
GetTraceID() (Buffer, bool)
GetSpanID() (Buffer, bool)
IsTraceSampled() bool
GetLocalReplyBody() (Buffer, bool)
GetUpstreamPoolReadyDurationNs() int64
GetUpstreamRequestAttemptCount() uint32
Log(level LogLevel, format string, args ...any)
}
AccessLoggerHandle provides access to finalized stream state during OnLog. Values backed by Envoy memory are returned as Buffer.
type AdminServer ¶
type AdminServer struct {
// contains filtered or unexported fields
}
AdminServer is a local-only HTTP server for operational debug endpoints. Register handlers before passing it to WithAdminServer.
Example:
admin := up.NewAdminServer(up.AdminServerOptions{})
admin.RegisterPprof()
up.Register("my-filter", handler, up.WithAdminServer(admin))
func NewAdminServer ¶
func NewAdminServer(opts AdminServerOptions) *AdminServer
NewAdminServer creates an AdminServer. Call RegisterPprof or Handle/HandleFunc to add endpoints, then pass to WithAdminServer.
func (*AdminServer) Handle ¶
func (a *AdminServer) Handle(pattern string, h http.Handler)
Handle registers a handler for the given pattern.
func (*AdminServer) HandleFunc ¶
func (a *AdminServer) HandleFunc(pattern string, fn http.HandlerFunc)
HandleFunc registers a handler function for the given pattern.
func (*AdminServer) ListenAddr ¶
func (a *AdminServer) ListenAddr() string
ListenAddr returns the resolved listen address. Valid after Ready() closes.
func (*AdminServer) Ready ¶
func (a *AdminServer) Ready() <-chan struct{}
Ready returns a channel closed after net.Listen succeeds.
func (*AdminServer) RegisterPprof ¶
func (a *AdminServer) RegisterPprof()
RegisterPprof adds the standard /debug/pprof/ suite to the admin mux. Explicit registration on the admin's own mux avoids polluting http.DefaultServeMux, which in an Envoy .so plugin risks colliding with Envoy's own Go runtime.
type AdminServerOptions ¶
type AdminServerOptions struct {
// ListenAddr defaults to "127.0.0.1:6060".
ListenAddr string
ShutdownTimeout time.Duration
}
AdminServerOptions configures the admin HTTP server.
type AsyncHostSelector ¶
type AsyncHostSelector[T any] struct { // contains filtered or unexported fields }
AsyncHostSelector[T] implements the ChooseHost + CancelHostSelection pair for body-driven async host selection.
Usage: embed or delegate from a ClusterLB implementation.
- ChooseHost: call s.ChooseHost(handle, ctx)
- CancelHostSelection: call s.Cancel(completion)
T is the decision type resolved by the body filter (e.g. a struct with provider name, model, error code). lookup maps a T to a HostResult.
The OnResolve callback may fire from a worker goroutine; AsyncHostSelector always marshals completion.Complete back to the main thread via handle.Schedule — the canonical example of the main-thread contract. See .agents/skills/transit-cluster-main-thread/SKILL.md.
func NewAsyncHostSelector ¶
func NewAsyncHostSelector[T any]( handle ClusterHandle, key StreamKey[*StreamPromise[T]], lookup func(T) HostResult, obs SelectorObserver, ) *AsyncHostSelector[T]
NewAsyncHostSelector creates an AsyncHostSelector that reads promises from key and resolves hosts via lookup. handle is the ClusterHandle used for Schedule. obs may be zero (all nil fields are no-ops).
func (*AsyncHostSelector[T]) Cancel ¶
func (s *AsyncHostSelector[T]) Cancel(completion *ClusterLBCompletion)
Cancel implements the CancelHostSelection half of ClusterLB. Prevents the pending OnResolve callback from calling Complete.
func (*AsyncHostSelector[T]) ChooseHost ¶
func (s *AsyncHostSelector[T]) ChooseHost(_ ClusterLBHandle, ctx ClusterLBContext) (HostPtr, *ClusterLBCompletion)
ChooseHost implements the ChooseHost half of ClusterLB. Returns (nil, completion) for async resolution, or (nil, nil) when no promise is found.
type AttributeID ¶
type AttributeID uint32
AttributeID identifies a stream attribute readable via Writer.GetAttributeString etc.
const ( AttributeIDRequestPath AttributeID = AttributeID(shared.AttributeIDRequestPath) AttributeIDRequestUrlPath AttributeID = AttributeID(shared.AttributeIDRequestUrlPath) AttributeIDRequestHost AttributeID = AttributeID(shared.AttributeIDRequestHost) AttributeIDRequestScheme AttributeID = AttributeID(shared.AttributeIDRequestScheme) AttributeIDRequestMethod AttributeID = AttributeID(shared.AttributeIDRequestMethod) AttributeIDRequestHeaders AttributeID = AttributeID(shared.AttributeIDRequestHeaders) AttributeIDRequestReferer AttributeID = AttributeID(shared.AttributeIDRequestReferer) AttributeIDRequestUserAgent AttributeID = AttributeID(shared.AttributeIDRequestUserAgent) AttributeIDRequestTime AttributeID = AttributeID(shared.AttributeIDRequestTime) AttributeIDRequestId AttributeID = AttributeID(shared.AttributeIDRequestId) AttributeIDRequestProtocol AttributeID = AttributeID(shared.AttributeIDRequestProtocol) AttributeIDRequestQuery AttributeID = AttributeID(shared.AttributeIDRequestQuery) AttributeIDRequestDuration AttributeID = AttributeID(shared.AttributeIDRequestDuration) AttributeIDRequestSize AttributeID = AttributeID(shared.AttributeIDRequestSize) AttributeIDRequestTotalSize AttributeID = AttributeID(shared.AttributeIDRequestTotalSize) )
Request attributes.
const ( AttributeIDResponseCode AttributeID = AttributeID(shared.AttributeIDResponseCode) AttributeIDResponseCodeDetails AttributeID = AttributeID(shared.AttributeIDResponseCodeDetails) AttributeIDResponseFlags AttributeID = AttributeID(shared.AttributeIDResponseFlags) AttributeIDResponseGrpcStatus AttributeID = AttributeID(shared.AttributeIDResponseGrpcStatus) AttributeIDResponseHeaders AttributeID = AttributeID(shared.AttributeIDResponseHeaders) AttributeIDResponseTrailers AttributeID = AttributeID(shared.AttributeIDResponseTrailers) AttributeIDResponseSize AttributeID = AttributeID(shared.AttributeIDResponseSize) AttributeIDResponseTotalSize AttributeID = AttributeID(shared.AttributeIDResponseTotalSize) AttributeIDResponseBackendLatency AttributeID = AttributeID(shared.AttributeIDResponseBackendLatency) )
Response attributes.
const ( AttributeIDSourceAddress AttributeID = AttributeID(shared.AttributeIDSourceAddress) AttributeIDSourcePort AttributeID = AttributeID(shared.AttributeIDSourcePort) AttributeIDDestinationAddress AttributeID = AttributeID(shared.AttributeIDDestinationAddress) AttributeIDDestinationPort AttributeID = AttributeID(shared.AttributeIDDestinationPort) AttributeIDConnectionId AttributeID = AttributeID(shared.AttributeIDConnectionId) // MTLS / TLS — acronyms use all-caps per Go conventions (changed in SDK 1.39-dev). AttributeIDConnectionMTLS AttributeID = AttributeID(shared.AttributeIDConnectionMTLS) AttributeIDConnectionTLSVersion AttributeID = AttributeID(shared.AttributeIDConnectionTLSVersion) AttributeIDConnectionRequestedServerName AttributeID = AttributeID(shared.AttributeIDConnectionRequestedServerName) AttributeIDConnectionSubjectLocalCertificate AttributeID = AttributeID(shared.AttributeIDConnectionSubjectLocalCertificate) AttributeIDConnectionSubjectPeerCertificate AttributeID = AttributeID(shared.AttributeIDConnectionSubjectPeerCertificate) AttributeIDConnectionDNSSanLocalCertificate AttributeID = AttributeID(shared.AttributeIDConnectionDNSSanLocalCertificate) AttributeIDConnectionDNSSanPeerCertificate AttributeID = AttributeID(shared.AttributeIDConnectionDNSSanPeerCertificate) AttributeIDConnectionURISanLocalCertificate AttributeID = AttributeID(shared.AttributeIDConnectionURISanLocalCertificate) AttributeIDConnectionURISanPeerCertificate AttributeID = AttributeID(shared.AttributeIDConnectionURISanPeerCertificate) AttributeIDConnectionSha256PeerCertificateDigest AttributeID = AttributeID(shared.AttributeIDConnectionSha256PeerCertificateDigest) AttributeIDConnectionTransportFailureReason AttributeID = AttributeID(shared.AttributeIDConnectionTransportFailureReason) AttributeIDConnectionTerminationDetails AttributeID = AttributeID(shared.AttributeIDConnectionTerminationDetails) )
Source / destination / connection attributes.
const ( AttributeIDUpstreamAddress AttributeID = AttributeID(shared.AttributeIDUpstreamAddress) AttributeIDUpstreamPort AttributeID = AttributeID(shared.AttributeIDUpstreamPort) AttributeIDUpstreamLocalAddress AttributeID = AttributeID(shared.AttributeIDUpstreamLocalAddress) AttributeIDUpstreamTransportFailureReason AttributeID = AttributeID(shared.AttributeIDUpstreamTransportFailureReason) AttributeIDUpstreamRequestAttemptCount AttributeID = AttributeID(shared.AttributeIDUpstreamRequestAttemptCount) AttributeIDUpstreamCxPoolReadyDuration AttributeID = AttributeID(shared.AttributeIDUpstreamCxPoolReadyDuration) AttributeIDUpstreamLocality AttributeID = AttributeID(shared.AttributeIDUpstreamLocality) // TLS / DNS / URI — acronyms use all-caps per Go conventions (changed in SDK 1.39-dev). AttributeIDUpstreamTLSVersion AttributeID = AttributeID(shared.AttributeIDUpstreamTLSVersion) AttributeIDUpstreamSubjectLocalCertificate AttributeID = AttributeID(shared.AttributeIDUpstreamSubjectLocalCertificate) AttributeIDUpstreamSubjectPeerCertificate AttributeID = AttributeID(shared.AttributeIDUpstreamSubjectPeerCertificate) AttributeIDUpstreamDNSSanLocalCertificate AttributeID = AttributeID(shared.AttributeIDUpstreamDNSSanLocalCertificate) AttributeIDUpstreamDNSSanPeerCertificate AttributeID = AttributeID(shared.AttributeIDUpstreamDNSSanPeerCertificate) AttributeIDUpstreamURISanLocalCertificate AttributeID = AttributeID(shared.AttributeIDUpstreamURISanLocalCertificate) AttributeIDUpstreamURISanPeerCertificate AttributeID = AttributeID(shared.AttributeIDUpstreamURISanPeerCertificate) AttributeIDUpstreamSha256PeerCertificateDigest AttributeID = AttributeID(shared.AttributeIDUpstreamSha256PeerCertificateDigest) )
Upstream attributes.
const ( AttributeIDXdsNode AttributeID = AttributeID(shared.AttributeIDXdsNode) AttributeIDXdsClusterName AttributeID = AttributeID(shared.AttributeIDXdsClusterName) AttributeIDXdsClusterMetadata AttributeID = AttributeID(shared.AttributeIDXdsClusterMetadata) AttributeIDXdsFilterChainName AttributeID = AttributeID(shared.AttributeIDXdsFilterChainName) AttributeIDXdsListenerDirection AttributeID = AttributeID(shared.AttributeIDXdsListenerDirection) AttributeIDXdsListenerMetadata AttributeID = AttributeID(shared.AttributeIDXdsListenerMetadata) AttributeIDXdsRouteMetadata AttributeID = AttributeID(shared.AttributeIDXdsRouteMetadata) AttributeIDXdsRouteName AttributeID = AttributeID(shared.AttributeIDXdsRouteName) AttributeIDXdsVirtualHostName AttributeID = AttributeID(shared.AttributeIDXdsVirtualHostName) AttributeIDXdsVirtualHostMetadata AttributeID = AttributeID(shared.AttributeIDXdsVirtualHostMetadata) AttributeIDXdsUpstreamHostMetadata AttributeID = AttributeID(shared.AttributeIDXdsUpstreamHostMetadata) )
XDS / metadata attributes.
const (
AttributeIDHealthCheck AttributeID = AttributeID(shared.AttributeIDHealthCheck)
)
Health check attribute.
type BaseCluster ¶
type BaseCluster struct{}
BaseCluster provides no-op implementations of all Cluster lifecycle methods except [Cluster.NewClusterLB], which must always be implemented by the embedder because [ClusterLB.ChooseHost] has no sensible default.
Embed BaseCluster to only override the methods your cluster actually needs:
type myCluster struct {
up.BaseCluster
bg up.ClusterGroup
// ...
}
func (c *myCluster) NewClusterLB() up.ClusterLB { return &myLB{} }
func (c *myCluster) Init(h up.ClusterHandle) {
// add initial hosts, then:
h.PreInitComplete()
}
func (c *myCluster) ServerInitialized(_ up.ClusterHandle) {
c.bg.Go(func(ctx context.Context) { c.watch(ctx) })
c.bg.Start()
}
func (c *myCluster) Shutdown(_ up.ClusterHandle, done func()) {
c.bg.Stop()
done()
}
If neither Init nor ServerInitialized is overridden, BaseCluster.Init calls PreInitComplete (the cluster starts with no hosts) and BaseCluster.Shutdown calls done() immediately.
func (BaseCluster) DrainStarted ¶
func (BaseCluster) DrainStarted(_ ClusterHandle)
DrainStarted is a no-op.
func (BaseCluster) Init ¶
func (BaseCluster) Init(h ClusterHandle)
Init calls PreInitComplete with no hosts. Override to add initial hosts before signalling Envoy that init is complete.
func (BaseCluster) ServerInitialized ¶
func (BaseCluster) ServerInitialized(_ ClusterHandle)
ServerInitialized is a no-op. Override to start background goroutines via ClusterGroup.Start or to run synchronous cold-start work before Envoy workers begin accepting traffic.
func (BaseCluster) Shutdown ¶
func (BaseCluster) Shutdown(_ ClusterHandle, done func())
Shutdown calls done immediately with no cleanup. Override to stop ClusterGroup goroutines (or other background work) before calling done.
type BodyChunk ¶
type BodyChunk struct {
Data []byte
EndStream bool
ContentEncoding string
ContentType string
Context *any // same per-stream slot as ResponseChunk.Context
}
BodyChunk is passed to a RequestBodyHandlerFunc on every request body event.
In streaming mode the handler is called once per chunk as data arrives. In buffered mode (WithMutableBody) the handler is called exactly once with the full accumulated body when EndStream is true.
The handler is also called synthetically with Data: nil, EndStream: true when the request has no body (GET, DELETE, HEAD, etc.) so body-dependent logic always has a single completion point.
ContentEncoding and ContentType are populated automatically from the request's Content-Encoding and Content-Type headers.
type Buffer ¶
type Buffer struct {
// Len is the number of bytes available in the buffer.
Len uint64
// contains filtered or unexported fields
}
Buffer is a borrowed buffer owned by Envoy.
Copy the data with Bytes or String before retaining it beyond the current callback. UnsafeBytes and UnsafeString avoid the copy and are only valid while Envoy keeps the underlying memory alive.
func (Buffer) UnsafeBytes ¶
UnsafeBytes returns a borrowed byte slice backed by Envoy memory.
func (Buffer) UnsafeString ¶
UnsafeString returns a borrowed string backed by Envoy memory.
type ClusterConfigFactory ¶
type ClusterConfigFactory = down.ClusterConfigFactory
type ClusterFactory ¶
type ClusterFactory = down.ClusterFactory
type ClusterFactoryWithMetrics ¶
type ClusterFactoryWithMetrics interface {
CreateWithMetrics(metrics ClusterMetrics, config []byte) (ClusterConfigFactory, error)
}
ClusterFactoryWithMetrics is an optional extension of ClusterFactory. When implemented, CreateWithMetrics is called instead of Create, providing a ClusterMetrics handle for defining Envoy metrics at config-load time.
type ClusterGroup ¶
type ClusterGroup struct {
// contains filtered or unexported fields
}
ClusterGroup manages background goroutines scoped to a cluster extension's lifecycle. Goroutines are registered with ClusterGroup.Go, launched at ClusterGroup.Start (called from [Cluster.ServerInitialized]), and stopped when ClusterGroup.Stop is called (called from [Cluster.Shutdown]).
This mirrors the Register + WithGroup pattern from the HTTP filter side: goroutines are declared alongside setup logic and lifecycle plumbing is handled for you.
type myCluster struct {
bg up.ClusterGroup
}
func (c *myCluster) ServerInitialized(_ up.ClusterHandle) {
c.bg.Go(func(ctx context.Context) {
up.RunRetry(ctx, "routes-watch", func(ctx context.Context) error {
return c.watchRoutes(ctx)
})
})
c.bg.Start()
}
func (c *myCluster) Shutdown(_ up.ClusterHandle, done func()) {
c.bg.Stop()
done()
}
func (*ClusterGroup) Go ¶
func (cg *ClusterGroup) Go(fn func(ctx context.Context))
Go registers a context-aware background goroutine. Must be called before ClusterGroup.Start. Panics if called after Start.
func (*ClusterGroup) Start ¶
func (cg *ClusterGroup) Start()
Start launches all registered goroutines. Call exactly once from [Cluster.ServerInitialized]. No-op when no goroutines have been registered.
func (*ClusterGroup) Stop ¶
func (cg *ClusterGroup) Stop()
Stop cancels all goroutines and waits for them to finish. Safe to call multiple times or before Start. Call from [Cluster.Shutdown] before invoking the done callback.
type ClusterHandle ¶
type ClusterHandle = down.ClusterHandle
ClusterHandle gives a Cluster access to Envoy cluster operations. All methods except Schedule must be called on the main thread — AddHosts, RemoveHosts, UpdateHostHealth, FindHostByAddress, and PreInitComplete will silently no-op (and log an envoy_bug error) if invoked from a background goroutine. Use Schedule to marshal mutations back to the main thread from refresh loops or async callbacks. See .agents/skills/transit-cluster-main-thread/SKILL.md for the pattern.
type ClusterLBCompletion ¶
type ClusterLBCompletion = down.ClusterLBCompletion
type ClusterLBContext ¶
type ClusterLBContext = down.ClusterLBContext
type ClusterLBHandle ¶
type ClusterLBHandle = down.ClusterLBHandle
type ClusterMetrics ¶
type ClusterMetrics interface {
DefineCounter(name string, tagKeys ...string) (MetricID, error)
DefineGauge(name string, tagKeys ...string) (MetricID, error)
DefineHistogram(name string, tagKeys ...string) (MetricID, error)
IncrementCounter(id MetricID, delta uint64, labelValues ...string) error
SetGauge(id MetricID, value uint64, labelValues ...string) error
IncrementGauge(id MetricID, delta uint64, labelValues ...string) error
DecrementGauge(id MetricID, delta uint64, labelValues ...string) error
RecordHistogram(id MetricID, value uint64, labelValues ...string) error
}
ClusterMetrics provides access to Envoy cluster metrics. Define metrics during ClusterFactoryWithMetrics.CreateWithMetrics on the main thread; record them at any time during the cluster's lifetime.
type ConfigDecoder ¶
ConfigDecoder[T] parses raw bytes into a typed snapshot. Called from the background polling goroutine; must be safe to call concurrently.
func JSONDecoder ¶
func JSONDecoder[T any]() ConfigDecoder[T]
JSONDecoder[T] decodes JSON bytes into T using encoding/json.
func YAMLDecoder ¶
func YAMLDecoder[T any]() ConfigDecoder[T]
YAMLDecoder[T] decodes YAML bytes into T using gopkg.in/yaml.v3.
type ConfigEvent ¶
type ConfigEvent struct {
Version string // opaque; hash or counter; empty on error
Duration time.Duration // fetch+decode wall time
Err error // non-nil means last-good snapshot was kept
}
ConfigEvent carries diagnostics from one refresh cycle.
type ConfigFunc ¶
type ConfigFunc func(h ConfigHandle) error
ConfigFunc is called once at filter config creation time on the main thread. Use it to define metrics via ConfigHandle.DefineCounter, etc.
type ConfigHandle ¶
type ConfigHandle interface {
// DefineCounter defines an Envoy counter metric. tagKeys are optional dimension names.
DefineCounter(name string, tagKeys ...string) (MetricID, error)
// DefineGauge defines an Envoy gauge metric. tagKeys are optional dimension names.
DefineGauge(name string, tagKeys ...string) (MetricID, error)
// DefineHistogram defines an Envoy histogram metric. tagKeys are optional dimension names.
DefineHistogram(name string, tagKeys ...string) (MetricID, error)
// FilterConfigBytes returns the raw bytes of the filter_config StringValue
// passed in the Envoy YAML. Returns nil when no config was provided.
FilterConfigBytes() []byte
}
ConfigHandle is passed to config callbacks at filter config creation time. Use it to define metrics once — before any requests arrive.
type ConfigSource ¶
ConfigSource fetches raw config bytes. May be called from a background goroutine. A non-nil error keeps the last-good snapshot.
type EmptyAccessLogger ¶
type EmptyAccessLogger struct{}
EmptyAccessLogger is a no-op base; embed it to skip unused methods.
func (*EmptyAccessLogger) OnDestroy ¶
func (e *EmptyAccessLogger) OnDestroy()
func (*EmptyAccessLogger) OnLog ¶
func (e *EmptyAccessLogger) OnLog(_ AccessLoggerHandle, _ AccessLogType)
type EmptyClusterLB ¶
type EmptyClusterLB = down.EmptyClusterLB
type EmptyLBPolicy ¶
type EmptyLBPolicy = down.EmptyLBPolicy
type ExchangeHooks ¶
type ExchangeHooks[T any] struct { // OnRequest initializes and returns the per-stream accumulator. // Called at request headers phase. OnRequest func(w *Writer, r *Request) T // OnResponse is called at response headers (chunk.StatusCode != 0, chunk.Data == nil) // and optionally response body (chunk.Data != nil). If nil, response phase is skipped. OnResponse func(st T, w *Writer, chunk *ResponseChunk) // OnFinalized receives the accumulated state and Envoy's FinalizedInfo. // Delivery semantics match WithOnStreamFinalized. OnFinalized func(st T, info FinalizedInfo) }
ExchangeHooks[T] carries typed callbacks for the three phases of an exchange.
type FilterOption ¶
type FilterOption func(*configFactory)
FilterOption configures filter registration. Pass options to Register.
func WithAdminServer ¶
func WithAdminServer(a *AdminServer) FilterOption
WithAdminServer returns a FilterOption that wires a into the filter's Group lifecycle. The server starts when Envoy loads the filter config and stops (with graceful shutdown) when Envoy destroys the factory.
func WithAttributes ¶
func WithAttributes(kvs ...any) FilterOption
WithAttributes pre-bakes key-value pairs into every log line emitted via Writer.Slog for this filter. kvs follows the same alternating key/value convention as log/slog.Logger.With: WithAttributes("scope", "match", "region", "us-west"). The filter name is always included automatically; only pass additional attributes here. Pairs with a non-string key are skipped.
func WithBody ¶
func WithBody(rb RequestBodyHandlerFunc) FilterOption
WithBody enables buffered, read-only request body handling. If rb is non-nil, it is called once with the full accumulated request body. Use this when a filter needs the finalized body for inspection or signing but must not replace the request body. Mutually exclusive with WithStreamingBody and WithMutableBody.
Unlike WithMutableBody, request content-length and transfer-encoding are preserved because the request body is not replaced.
func WithConfig ¶
func WithConfig(fn ConfigFunc) FilterOption
WithConfig attaches a config callback invoked once on the main thread when Envoy loads the filter config. Use it to define metrics via ConfigHandle.
func WithExchangeObserver ¶
func WithExchangeObserver[T any](hooks ExchangeHooks[T]) []FilterOption
WithExchangeObserver returns a set of FilterOptions that wire ExchangeHooks[T] into a filter registration. The returned options are passed directly to up.Register. The SDK owns the context slot and pool lifecycle; callers never touch *any.
func WithGroup ¶
func WithGroup(g *Group) FilterOption
WithGroup attaches a Group of background goroutines. The group is started when Envoy loads the filter config and stopped (via Group.Stop) when Envoy destroys the filter factory. The handler and the goroutines in g share state via closure — no package-level variables needed.
Note: if any goroutine in the group exits for any reason — including a normal return — the entire group is stopped via Group.Stop. Goroutines that must survive transient errors should loop internally or use RunRetry.
For background work in cluster extensions (which have no filter factory), use ClusterGroup with ClusterGroup.Start called from [Cluster.ServerInitialized] instead.
func WithLogMetadata ¶
func WithLogMetadata(ns string) FilterOption
WithLogMetadata sets the dynamic-metadata namespace that Writer.AddLogAttrs writes through to. When set, every w.AddLogAttrs("k", v) call also writes w.SetMetadata(ns, "k", v), making those attrs available to the Envoy access log via %DYNAMIC_METADATA(namespace:key)%.
Use a reverse-DNS namespace to avoid collisions:
up.WithLogMetadata("com.example.myfilter")
Unsupported value types (structs, slices, etc.) are serialised to their string representation rather than panicking — see Writer.AddLogAttrs.
func WithMutableBody ¶
func WithMutableBody(rb RequestBodyHandlerFunc) FilterOption
WithMutableBody enables buffered body handling. If rb is non-nil, it is called once with the full accumulated request body; pass nil to buffer the response body only (useful when WithResponse needs the full body but the request body is not of interest). Use Writer.SetRequestBody / SetResponseBody to replace body content. Mutually exclusive with WithStreamingBody and WithBody.
Upstream vs downstream body source ¶
In downstream (listener-side) filters, Envoy pre-fills BufferedRequestBody across StopAndBuffer calls so the handler always receives the complete body via BodyChunk.Data.
In upstream (cluster-side) filters, Envoy does NOT pre-fill BufferedRequestBody when endOfStream=true arrives on the very first body call — i.e. when no prior StopAndBuffer has occurred, which is the common case for small single-frame requests. The framework handles this automatically: BodyChunk.Data is always the full body regardless of which filter chain the filter runs in.
func WithOnStreamComplete ¶
func WithOnStreamComplete(fn OnStreamCompleteFunc) FilterOption
WithOnStreamComplete attaches a stream-termination callback to the filter. See OnStreamCompleteFunc for semantics.
func WithOnStreamFinalized ¶
func WithOnStreamFinalized(fn OnStreamFinalizedFunc) FilterOption
WithOnStreamFinalized attaches a callback that fires after Envoy finalizes the stream and delivers it through the access-logger path. See OnStreamFinalizedFunc for delivery semantics and the YAML requirements.
Coexists with WithOnStreamComplete: cleanup-only consumers should keep using OnStreamComplete; OnStreamFinalized is the right hook when the callback needs finalized durations, byte counts, or response flags.
func WithResponse ¶
func WithResponse(r ResponseHandlerFunc) FilterOption
WithResponse attaches a response observer.
func WithSidecar ¶
func WithSidecar(s *Sidecar) FilterOption
WithSidecar returns a FilterOption that wires s into a new Group and attaches it to the filter. The sidecar starts when Envoy loads the filter config and stops (with graceful shutdown) when Envoy destroys the factory.
func WithStreamingBody ¶
func WithStreamingBody(rb RequestBodyHandlerFunc) FilterOption
WithStreamingBody attaches a request body handler in streaming mode: the handler is called once per chunk as data arrives. For bodyless requests (GET etc.) the handler is called once with Data: nil. Mutually exclusive with WithBody and WithMutableBody.
type FinalizedInfo ¶
type FinalizedInfo struct {
Timing TimingInfo
Bytes BytesInfo
ResponseCode uint32
ResponseCodeDetails string
ResponseFlags uint64
UpstreamFailure string
UpstreamLocalAddress string
UpstreamAddress string
RequestProtocol string
UpstreamPoolReadyDurationNs int64
UpstreamRequestAttempts uint32
TraceID string
SpanID string
TraceSampled bool
LocalReplyBody string
}
FinalizedInfo holds finalized stream fields delivered to an OnStreamFinalizedFunc after Envoy completes the stream. The values mirror what an AccessLoggerHandle would expose at AccessLogTypeDownstreamEnd: durations, byte counts, response flags, upstream attempts, trace ids, and a local-reply body if one was sent.
All durations are in nanoseconds; -1 in a duration means the timing is unavailable. ResponseFlags is the raw bitmask — use ResponseFlagsString to render the human-readable form.
type GRPCCalloutFunc ¶
type GRPCCalloutFunc func(GRPCCalloutResponse)
GRPCCalloutFunc is invoked when a gRPC callout completes. The callback runs on the Envoy worker thread under the same constraints as HTTPCalloutFunc.
type GRPCCalloutRequest ¶
type GRPCCalloutRequest struct {
Cluster string
Authority string // host/:authority header; defaults to Cluster if empty
Method string // full gRPC path, e.g. "/envoy.service.ratelimit.v3.RateLimitService/ShouldRateLimit"
Message []byte // raw proto bytes, unframed
TimeoutMillis uint64
}
GRPCCalloutRequest carries parameters for an outbound gRPC unary callout. Message is the serialised proto body — no gRPC framing needed; GRPCCallout prepends the 5-byte length-prefix frame before sending.
type GRPCCalloutResponse ¶
type GRPCCalloutResponse struct {
Result HTTPCalloutResult
GRPCStatus uint32 // grpc-status value; 0 = OK
GRPCMessage string // grpc-message value
Body []byte // unframed proto bytes
}
GRPCCalloutResponse is delivered to GRPCCalloutFunc after the callout completes. Body is the unframed proto bytes (5-byte gRPC header stripped), ready to unmarshal. GRPCStatus and GRPCMessage are parsed from grpc-status / grpc-message response headers or trailers; successful responses may omit them, in which case status 0 is implied by Result==HTTPCalloutSuccess.
type Group ¶
type Group struct {
// contains filtered or unexported fields
}
Group manages a set of background goroutines sharing a common lifecycle. All goroutines start together and stop together when Group.Stop is called.
Register goroutines with Group.Add or Group.AddGoroutine, then call Group.Start exactly once. Call Group.Stop from your filter factory's OnDestroy (done automatically when registered via Register with WithGroup).
func (*Group) Add ¶
Add registers an actor with explicit execute and stop functions. execute blocks until the actor finishes; it runs in a background goroutine. stop must cause execute to return promptly — it is called from another goroutine. Panics inside execute are recovered rather than crashing the process.
func (*Group) AddGoroutine ¶
AddGoroutine registers a context-aware background function. The context is cancelled automatically when Group.Stop is called.
g.AddGoroutine(func(ctx context.Context) {
t := time.NewTicker(30 * time.Second)
defer t.Stop()
for {
select {
case <-ctx.Done(): return
case <-t.C: doWork()
}
}
})
func (*Group) Start ¶
func (g *Group) Start()
Start launches all registered goroutines. Call exactly once after all actors are registered. If any actor finishes for any reason (including normal return), Stop is called on the whole group. Actors that must survive transient errors should loop internally rather than returning.
type HTTPCalloutAllSettledFunc ¶
type HTTPCalloutAllSettledFunc func(responses []HTTPCalloutAllSettledResponse)
HTTPCalloutAllSettledFunc is invoked once after all accepted callouts in a group have completed and all init failures have been recorded.
The function runs from the final callout callback or synchronously when all callouts fail initialization. It may call Writer mutation methods, including SendLocalResponse.
type HTTPCalloutAllSettledResponse ¶
type HTTPCalloutAllSettledResponse struct {
Result HTTPCalloutResult
Init HTTPCalloutInitResult
Err error
Headers [][2]shared.UnsafeEnvoyBuffer
Body []shared.UnsafeEnvoyBuffer
}
HTTPCalloutAllSettledResponse is one response delivered to HTTPCalloutAllSettledFunc.
Headers and Body are Go-owned copies, because earlier callout callbacks may return before every callout has settled. Response slots preserve request order: responses[i] corresponds to the request at reqs[i].
Err is non-nil when Envoy rejected the callout at init time or when an accepted callout completed with a non-success result. Callers still receive Init and Result so they can distinguish init failures, resets, and buffer limit errors when choosing aggregate policy.
func (HTTPCalloutAllSettledResponse) Failed ¶
func (r HTTPCalloutAllSettledResponse) Failed() bool
Failed reports whether the response slot represents an init or callout error.
func (HTTPCalloutAllSettledResponse) OK ¶
func (r HTTPCalloutAllSettledResponse) OK() bool
OK reports whether the callout was accepted and completed successfully.
type HTTPCalloutFunc ¶
type HTTPCalloutFunc func(result HTTPCalloutResult, headers [][2]shared.UnsafeEnvoyBuffer, body []shared.UnsafeEnvoyBuffer)
HTTPCalloutFunc is the callback invoked when an Envoy HTTP callout completes.
LIFETIME: headers and body point into Envoy-owned memory that is only valid for the duration of this call. Copy any value that must outlive the callback. Specifically: do NOT send these slices to another goroutine or store them in a struct that outlives the function — Envoy may reuse or free that memory as soon as the callback returns.
The function runs on the Envoy worker thread (or the goroutine that called HttpCallout synchronously). It may call any Writer mutation method; those mutations are queued and applied by flush() after the callback returns.
type HTTPCalloutInitResult ¶
type HTTPCalloutInitResult uint32
HTTPCalloutInitResult reports whether Envoy accepted the callout request. A non-Success result means no callout was initiated and no callback will fire; the handler should treat it as an immediate error.
const ( HTTPCalloutInitSuccess HTTPCalloutInitResult = HTTPCalloutInitResult(shared.HttpCalloutInitSuccess) HTTPCalloutInitMissingRequiredHeaders HTTPCalloutInitResult = HTTPCalloutInitResult(shared.HttpCalloutInitMissingRequiredHeaders) HTTPCalloutInitClusterNotFound HTTPCalloutInitResult = HTTPCalloutInitResult(shared.HttpCalloutInitClusterNotFound) HTTPCalloutInitDuplicateCalloutID HTTPCalloutInitResult = HTTPCalloutInitResult(shared.HttpCalloutInitDuplicateCalloutId) HTTPCalloutInitCannotCreateRequest HTTPCalloutInitResult = HTTPCalloutInitResult(shared.HttpCalloutInitCannotCreateRequest) )
type HTTPCalloutRequest ¶
type HTTPCalloutRequest struct {
Cluster string
Headers [][2]string
Body []byte
TimeoutMillis uint64
}
HTTPCalloutRequest carries the parameters for an outbound Envoy HTTP callout. Cluster must name a cluster defined in the Envoy bootstrap config; if it does not exist, HTTPCallout returns HTTPCalloutInitClusterNotFound. Headers must include at minimum :method, :path, and host. Include :scheme when the upstream or route logic needs it. TimeoutMillis of 0 uses Envoy's default callout timeout.
type HTTPCalloutResponse ¶
type HTTPCalloutResponse struct {
Result HTTPCalloutResult
Headers [][2]shared.UnsafeEnvoyBuffer
Body []shared.UnsafeEnvoyBuffer
}
HTTPCalloutResponse is the value returned by Writer.Do.
Headers and Body are Go-owned copies made inside the callout callback before the response is sent across the goroutine channel. They are safe to read after Do returns, even though the underlying Envoy memory may have been freed.
For Writer.HTTPCallout (callback form), the buffers are NOT copied — they are passed directly to HTTPCalloutFunc and are only valid during that call.
type HTTPCalloutResult ¶
type HTTPCalloutResult uint32
HTTPCalloutResult is the terminal outcome of an accepted callout, delivered to HTTPCalloutFunc or via HTTPCalloutResponse.Result from Writer.Do.
const ( // HTTPCalloutSuccess means the upstream responded; headers and body are valid. HTTPCalloutSuccess HTTPCalloutResult = HTTPCalloutResult(shared.HttpCalloutSuccess) // HTTPCalloutReset means the connection was reset before a response arrived. // headers and body will be empty. HTTPCalloutReset HTTPCalloutResult = HTTPCalloutResult(shared.HttpCalloutReset) // HTTPCalloutExceedResponseBufferLimit means the response body was larger // than Envoy's configured callout buffer limit. headers may be present; // body will be truncated or empty. HTTPCalloutExceedResponseBufferLimit HTTPCalloutResult = HTTPCalloutResult(shared.HttpCalloutExceedResponseBufferLimit) )
type HTTPCalloutSequenceDoneFunc ¶
type HTTPCalloutSequenceDoneFunc func(responses []HTTPCalloutAllSettledResponse)
HTTPCalloutSequenceDoneFunc is invoked once when HTTPCalloutSequenceNextFunc stops the sequence. It runs from the callout callback path and may call Writer mutation methods, including SendLocalResponse.
type HTTPCalloutSequenceNextFunc ¶
type HTTPCalloutSequenceNextFunc func(attempt int, previous *HTTPCalloutAllSettledResponse) (req HTTPCalloutRequest, ok bool)
HTTPCalloutSequenceNextFunc decides the next request in a sequential callout chain. attempt is zero-based. previous is nil for attempt 0, then points to the response from the immediately preceding attempt. Return ok=false to stop the sequence and run HTTPCalloutSequenceDoneFunc with all collected responses.
type HandlerFunc ¶
HandlerFunc is called on every request.
func Chain ¶
func Chain(h HandlerFunc, mw ...Middleware) HandlerFunc
Chain wraps h with the given middleware in left-to-right order: the first middleware in the list is outermost (runs first).
type HostHealth ¶
type HostHealth = down.HostHealth
HostHealth and HostStat are shared with LB Policy; defined here for convenience.
type HostRefreshEvent ¶
type HostRefreshEvent struct {
// Duration covers the full fetch + apply round-trip.
Duration time.Duration
// Added, Removed, Unchanged count host changes for the cycle.
// All three are zero when Err is non-nil.
Added int
Removed int
Unchanged int
// Err is non-nil when the snapshot fetch returned an error.
Err error
}
HostRefreshEvent carries diagnostics from a single refresh cycle.
type HostRefreshLoop ¶
type HostRefreshLoop[K comparable] struct { // contains filtered or unexported fields }
HostRefreshLoop periodically refreshes a HostSet from a user-supplied snapshot function.
func NewHostRefreshLoop ¶
func NewHostRefreshLoop[K comparable]( handle ClusterHandle, set *HostSet[K], snapshot HostSnapshotFunc[K], opts HostRefreshOptions, ) *HostRefreshLoop[K]
NewHostRefreshLoop creates a HostRefreshLoop. Call Start to begin ticking.
func (*HostRefreshLoop[K]) RefreshOnce ¶
func (l *HostRefreshLoop[K]) RefreshOnce(ctx context.Context) error
RefreshOnce fetches a snapshot and schedules an apply via ClusterHandle.Schedule, blocking until the apply has run on the main thread (or until ctx is cancelled). Returns the snapshot fetch error, if any; on error the apply is not scheduled.
func (*HostRefreshLoop[K]) Start ¶
func (l *HostRefreshLoop[K]) Start()
Start launches the background ticker goroutine. It does not perform an immediate refresh — call RefreshOnce first if a warm host set is needed.
func (*HostRefreshLoop[K]) Stop ¶
func (l *HostRefreshLoop[K]) Stop()
Stop cancels the ticker, waits for any in-flight fetch to finish, and waits for any already-scheduled apply to complete before returning. After Stop returns no further Apply will run.
type HostRefreshObserver ¶
type HostRefreshObserver func(HostRefreshEvent)
HostRefreshObserver is called on the cluster main thread after each Apply (or on error).
type HostRefreshOptions ¶
type HostRefreshOptions struct {
// Interval between refreshes. Zero uses DefaultRefreshInterval.
Interval time.Duration
// Timeout for each snapshot fetch. Zero uses DefaultRefreshTimeout.
Timeout time.Duration
// Jitter adds a random ±Jitter offset to each tick interval.
Jitter time.Duration
// Observe is called on the cluster main thread after each Apply.
// Optional; nil disables the observer.
Observe HostRefreshObserver
}
HostRefreshOptions configures a HostRefreshLoop.
type HostResult ¶
type HostResult struct {
Host HostPtr // nil → use ErrDetail
ErrDetail string // non-empty → complete with error
}
HostResult is returned by the lookup function to resolve a host.
type HostSet ¶
type HostSet[K comparable] struct { // contains filtered or unexported fields }
HostSet manages a published, atomically-readable host map. Apply must be called on the cluster main thread. Get, Entry, and Current are safe from any goroutine.
func NewHostSet ¶
func NewHostSet[K comparable](handle ClusterHandle) *HostSet[K]
NewHostSet creates a HostSet backed by handle. The initial published map is empty.
func (*HostSet[K]) Apply ¶
func (s *HostSet[K]) Apply(snapshot HostSnapshot[K])
Apply replaces the desired host snapshot. It must be called on the cluster main thread — it calls AddHosts, UpdateHostHealth, and RemoveHosts, all of which are main-thread-only. From a background goroutine, build the snapshot off-thread and use ClusterHandle.Schedule to invoke Apply on the main thread.
Ordering guarantee: add → publish → remove so ChooseHost never sees a dangling pointer.
func (*HostSet[K]) Current ¶
Current returns a freshly-copied snapshot of the published map. Safe from any goroutine. Callers that only need one key should prefer Get or Entry.
type HostSnapshot ¶
type HostSnapshot[K comparable] map[K]HostSpec
HostSnapshot is the complete desired host set keyed by caller's key type. A missing key means "remove that host". Pass nil or an empty map to remove all.
type HostSnapshotFunc ¶
type HostSnapshotFunc[K comparable] func(ctx context.Context) (HostSnapshot[K], error)
HostSnapshotFunc fetches the current desired host snapshot. It may be called from a background goroutine. Return a non-nil error to keep the previous snapshot.
type HttpHeaderType ¶
type HttpHeaderType = down.HttpHeaderType
type LBPolicyConfigFactory ¶
type LBPolicyConfigFactory = down.LBPolicyConfigFactory
type LBPolicyFactory ¶
type LBPolicyFactory = down.LBPolicyFactory
type LogLevel ¶
type LogLevel uint32
LogLevel controls Envoy log severity for Writer.Log.
const ( LogTrace LogLevel = LogLevel(shared.LogLevelTrace) LogDebug LogLevel = LogLevel(shared.LogLevelDebug) LogInfo LogLevel = LogLevel(shared.LogLevelInfo) LogWarn LogLevel = LogLevel(shared.LogLevelWarn) LogError LogLevel = LogLevel(shared.LogLevelError) LogCritical LogLevel = LogLevel(shared.LogLevelCritical) )
LogLevel aliases.
type MetadataSource ¶
type MetadataSource uint32
MetadataSource identifies which metadata store to read from.
const ( MetadataSourceDynamic MetadataSource = MetadataSource(shared.MetadataSourceTypeDynamic) MetadataSourceRoute MetadataSource = MetadataSource(shared.MetadataSourceTypeRoute) MetadataSourceCluster MetadataSource = MetadataSource(shared.MetadataSourceTypeCluster) MetadataSourceHost MetadataSource = MetadataSource(shared.MetadataSourceTypeHost) MetadataSourceHostLocality MetadataSource = MetadataSource(shared.MetadataSourceTypeHostLocality) )
type MetricID ¶
type MetricID uint64
MetricID is an opaque handle to an Envoy metric defined at config time via ConfigHandle.
type Middleware ¶
type Middleware func(next HandlerFunc) HandlerFunc
Middleware wraps a HandlerFunc, enabling before/after logic around a handler.
type OnStreamCompleteFunc ¶
type OnStreamCompleteFunc func(ctx *any)
OnStreamCompleteFunc is called exactly once per stream after Envoy terminates it, regardless of how the stream ended (normal end-of-stream, reset, idle/request timeout, local reply from another filter). Use it for cleanup that must run even when the request/response handlers did not — e.g. removing entries from process-wide registries.
ctx is the per-stream context slot (same value Request.Context and BodyChunk.Context point at). Mutations are no-ops at this point: the stream is dead and any queued header/filter-state changes will not be applied. Do not call Writer methods from here.
type OnStreamFinalizedFunc ¶
type OnStreamFinalizedFunc func(ctx *any, info FinalizedInfo)
OnStreamFinalizedFunc fires on the worker thread after Envoy finalizes the stream, before the per-stream context is released. Like OnStreamCompleteFunc it carries no Writer and mutations are not possible — the stream is dead. The strict superset over OnStreamComplete is the finalized stream fields in FinalizedInfo, which Envoy only exposes through the access-logger path.
Delivery: when a filter is registered with WithOnStreamFinalized the SDK auto-registers an internal access logger under the same name and correlates filter ↔ logger via the request id (x-request-id), or an SDK fallback request header when the request id is empty. The listener Envoy YAML must include an access_log entry pointing at this dynamic module with logger_name equal to the filter name (see examples/request-ui/envoy.yaml). If the access-logger entry is missing from the YAML, the callback will not fire — Envoy delivers finalized fields only through that path.
type PipelineConfig ¶
type PipelineConfig[T any] struct { // contains filtered or unexported fields }
PipelineConfig[T] holds a decoded config snapshot refreshable from a source. Snapshot() is safe from any goroutine. The request path never calls the source.
func NewFileConfig ¶
func NewFileConfig[T any](path string, dec ConfigDecoder[T], opts PollOptions) *PipelineConfig[T]
NewFileConfig returns a PipelineConfig backed by a file path. The file is read fresh on every poll tick; path is evaluated at fetch time.
func NewPollingConfig ¶
func NewPollingConfig[T any](src ConfigSource, dec ConfigDecoder[T], opts PollOptions) *PipelineConfig[T]
NewPollingConfig returns a PipelineConfig backed by an arbitrary source. The first fetch fires immediately on Start; subsequent fetches tick at Interval ± Jitter.
func NewStaticConfig ¶
func NewStaticConfig[T any](v T) *PipelineConfig[T]
NewStaticConfig returns a PipelineConfig whose snapshot never changes. Start is a no-op. Snapshot() returns v immediately.
func (*PipelineConfig[T]) RefreshOnce ¶
func (p *PipelineConfig[T]) RefreshOnce(ctx context.Context) error
RefreshOnce fetches and decodes a single snapshot, blocking until done or ctx is cancelled. Updates Snapshot() on success; on error keeps last-good. Useful for warming the cache before Start() begins ticking.
func (*PipelineConfig[T]) SignalRefresh ¶
func (p *PipelineConfig[T]) SignalRefresh()
SignalRefresh requests an immediate refresh (called by file watchers, etc). Non-blocking; ignored if a refresh is already pending.
func (*PipelineConfig[T]) Snapshot ¶
func (p *PipelineConfig[T]) Snapshot() T
Snapshot returns the current decoded config. Returns the zero value if no successful fetch has completed yet. Never returns a partially-updated value.
func (*PipelineConfig[T]) Start ¶
func (p *PipelineConfig[T]) Start(ctx context.Context) (stop func())
Start launches the background polling goroutine. No-op for static configs. The first fetch fires immediately; subsequent fetches tick at Interval ± Jitter. The returned stop func cancels polling and waits for any in-flight fetch to finish. Wire through up.WithGroup or call from Cluster.ServerInitialized / OnDestroy.
type PollOptions ¶
type PollOptions struct {
Interval time.Duration // zero → DefaultPollInterval
Timeout time.Duration // zero → DefaultPollTimeout; applied per fetch attempt
Jitter time.Duration // random [0, Jitter) added to each interval
// Observe is called after every refresh cycle (success or failure).
// Called from the polling goroutine; must not block indefinitely.
Observe func(ConfigEvent)
}
PollOptions configures the polling behaviour for NewPollingConfig / NewFileConfig.
type Request ¶
type Request struct {
Method string
Path string
Host string
FilterName string
Context *any
// contains filtered or unexported fields
}
Request holds the per-request fields populated before the handler is called.
func NewRequest ¶
NewRequest constructs a Request from a HeaderMap for use in tests.
func (*Request) AllHeaders ¶
AllHeaders returns all request headers as copied Go strings.
type RequestBodyHandlerFunc ¶
RequestBodyHandlerFunc is called for each request body event.
type ResponseChunk ¶
type ResponseChunk struct {
StatusCode int
Headers shared.HeaderMap
Data []byte
EndStream bool
ContentEncoding string
ContentType string
Context *any
}
ResponseChunk is passed to a ResponseHandlerFunc on every response event.
On the response headers call, StatusCode is non-zero and Headers is set. On response body calls, StatusCode is 0 and Data holds the received bytes. A synthetic body call with Data: nil, EndStream: true is issued when the response has no body (204, HEAD, etc.) so body logic always fires once.
ContentEncoding and ContentType are populated automatically from the response's Content-Encoding and Content-Type headers. Context is a per-stream slot shared across all callbacks on one stream.
func (*ResponseChunk) AllHeaders ¶
func (c *ResponseChunk) AllHeaders() [][2]string
AllHeaders returns all response headers as copied Go strings. It is only meaningful when StatusCode is non-zero (the headers call).
type ResponseHandlerFunc ¶
type ResponseHandlerFunc func(w *Writer, chunk *ResponseChunk)
ResponseHandlerFunc is called for each response event on a stream.
type Router ¶
type Router struct {
// contains filtered or unexported fields
}
Router dispatches a request to a handler matched by HTTP method and exact path. Unmatched requests are forwarded to the notFound func; if notFound is nil, they are silently passed through.
func (*Router) DELETEPrefix ¶
DELETEPrefix registers handler for DELETE requests to any path with the given prefix.
func (*Router) Dispatch ¶
Dispatch is the request handler func for Register. Exact routes take priority over prefix routes; among each group, first registration wins.
func (*Router) GETPrefix ¶
GETPrefix registers handler for GET requests to any path with the given prefix.
func (*Router) Handle ¶
Handle registers handler for the given HTTP method and exact path. Returns r for chaining.
func (*Router) HandlePrefix ¶
HandlePrefix registers handler for the given HTTP method and any path that starts with the given prefix. Exact routes are checked first; prefix routes are checked in registration order among themselves.
type SelectorObserver ¶
type SelectorObserver struct {
// OnSelected is called (on cluster main thread) when a host is chosen.
OnSelected func(host HostPtr)
// OnFailed is called (on cluster main thread) when the lookup returns an error.
OnFailed func(errDetail string)
// OnCancelled is called (on cluster main thread) when CancelHostSelection fires.
OnCancelled func()
// OnMissingPromise is called (on cluster main thread) when no promise is
// found in the stream-object bag (key absent or wrong type).
OnMissingPromise func()
}
SelectorObserver receives events from AsyncHostSelector. All nil fields are treated as no-ops.
type Sidecar ¶
type Sidecar struct {
// contains filtered or unexported fields
}
Sidecar wraps an http.Handler with net.Listen, readiness, and graceful shutdown.
func NewSidecar ¶
func NewSidecar(handler http.Handler, opts SidecarOptions) *Sidecar
NewSidecar creates a new Sidecar wrapping the given handler.
func (*Sidecar) ListenAddr ¶
ListenAddr returns the resolved listen address (e.g. "127.0.0.1:43210"). Returns "" before Ready() closes.
type SidecarOptions ¶
type SidecarOptions struct {
ListenAddr string
ShutdownTimeout time.Duration // default 5s
EgressURL string // empty = break-glass direct-dial
Rationale string // required when EgressURL is empty
OnSession func(SidecarSessionEvent)
// StartupLogFile, when non-empty, causes the startup rationale to also be
// written to this file (one line). Used by e2e tests to assert the rationale
// without capturing raw stderr.
StartupLogFile string
}
SidecarOptions configures a Sidecar.
type SidecarSessionEvent ¶
SidecarSessionEvent is delivered to SidecarOptions.OnSession when a session ends.
type StreamKey ¶
type StreamKey[T any] struct { // contains filtered or unexported fields }
StreamKey[T] is a typed handle for one slot in the per-stream object bag (Primitive A). The zero value is unusable; construct with NewStreamKey.
StreamKey is a thin, allocation-free wrapper around a string key. All type safety is enforced at compile time via the generic parameter T; no reflect or interface{} type switches are needed at the call site.
func NewStreamKey ¶
NewStreamKey returns a StreamKey that uses key as its bag slot name. key must be unique within a pipeline; conventionally use a dotted reverse- domain form: "orange.decision", "mcp.session", etc.
func (StreamKey[T]) Get ¶
Get retrieves the value from the per-stream bag via w. Returns (zero, false) if the slot has not been set or the stored value cannot be asserted to T (which should not happen in correct usage). Must be called on the worker thread.
func (StreamKey[T]) GetFromCtx ¶
func (k StreamKey[T]) GetFromCtx(ctx ClusterLBContext) (T, bool)
GetFromCtx retrieves the value from the per-stream bag via a ClusterLBContext. Returns (zero, false) if the slot has not been set, the context has no bag, or the stored value cannot be asserted to T. Safe to call from the cluster main thread (same constraint as ClusterLBContext.GetStreamObject).
type StreamPromise ¶
type StreamPromise[T any] struct { // contains filtered or unexported fields }
StreamPromise[T] is a resolve-once promise. The zero value is unusable; construct with NewStreamPromise.
Concurrency model:
- mu guards resolved, value, and callbacks.
- done is a channel closed on first Resolve; callers may wait on it.
- resolvedOnce ensures close(done) happens exactly once even under concurrent Resolve calls racing with OnResolve.
func NewStreamPromise ¶
func NewStreamPromise[T any]() *StreamPromise[T]
NewStreamPromise returns a new, unresolved StreamPromise[T].
func (*StreamPromise[T]) Done ¶
func (p *StreamPromise[T]) Done() <-chan struct{}
Done returns a channel that is closed when the promise is resolved. Safe to use in a select.
func (*StreamPromise[T]) OnResolve ¶
func (p *StreamPromise[T]) OnResolve(cb func(T)) (cancel func())
OnResolve registers cb to fire when the promise is resolved. If the promise is already resolved, cb fires synchronously before OnResolve returns.
Returns a cancel func. Calling cancel prevents cb from firing if it has not fired yet; cancel is idempotent and safe to call multiple times.
func (*StreamPromise[T]) Resolve ¶
func (p *StreamPromise[T]) Resolve(v T) bool
Resolve publishes v. The first call wins and returns true; subsequent calls are no-ops and return false.
All registered OnResolve callbacks that have not been canceled are fired synchronously from the goroutine that calls Resolve, in registration order.
func (*StreamPromise[T]) Result ¶
func (p *StreamPromise[T]) Result() (T, bool)
Result returns (v, true) if the promise is resolved, or (zero, false) if not.
type TimingInfo ¶
type TimingInfo = down.TimingInfo
type Writer ¶
type Writer struct {
// contains filtered or unexported fields
}
Writer is a thin per-invocation view over filter. It carries only the state scoped to a single handler invocation: calloutCB, calloutStarted, and directWrite. All mutable stream state — mutation queues, callout fn/state, async state — lives on the backing filter, which is long-lived for the stream.
A Writer must not be retained beyond the handler call. On the HTTPCallout path the same Writer is reused across the initial handler call and the callout callback; both run sequentially within one stream operation so this is safe. See Do for goroutine-safety rules when using w.Go and fan-out.
directWrite mode (set only by NewWriter):
Filter-created Writers (all production paths) set directWrite=false; mutation methods queue unconditionally so flush() applies them on the worker thread at the right time. NewWriter sets directWrite=true so that mutation methods apply directly to the handle — this preserves the behaviour expected by tests and examples that call handler functions outside a filter lifecycle, where there is no enclosing flush() call.
func NewWriter ¶
func NewWriter(h shared.HttpFilterHandle) *Writer
NewWriter wraps handle in a Writer that applies mutations directly (no queue). Intended for use in tests and examples that invoke handler functions outside a filter lifecycle. Tests that need callout or async lifecycle behavior should instantiate filter directly.
func (*Writer) AddLogAttrs ¶
AddLogAttrs appends key-value pairs to the per-request log context.
When WithLogMetadata is configured, each attr is also written to dynamic metadata under the filter's namespace so it is accessible from the Envoy access log via %DYNAMIC_METADATA(ns:key)%. Value types that Envoy's metadata store does not natively support (structs, slices, etc.) are serialised to their string representation rather than panicking.
By default the attrs are NOT printed inline in the process-log message; they flow to the access log through metadata. Use [WithInlineLogAttrs] to also print them inline (useful for local debugging without a JSON access log).
For a one-off derived logger without persisting attrs on the writer, use w.Slog().With(kvs...) directly instead.
func (*Writer) AddRequestHeader ¶
AddRequestHeader queues (or immediately adds) a request header (multi-value).
func (*Writer) AddResponseHeader ¶
AddResponseHeader adds a response header inline. Response-phase only; not queued.
func (*Writer) DecrementGauge ¶
DecrementGauge queues (or immediately applies) a gauge decrement.
func (*Writer) Do ¶
func (w *Writer) Do(ctx context.Context, req HTTPCalloutRequest) (*HTTPCalloutResponse, error)
Do performs an Envoy HTTP callout from inside a Go goroutine and blocks until the callout completes or ctx is cancelled.
Multiple Do calls may be in flight concurrently. Writer mutation methods (SetRequestHeader, SetFilterState, etc.) are NOT goroutine-safe and must not be called while any Do goroutine is still running. Join all fan-out goroutines before issuing any mutations; the race detector will catch violations if this rule is broken.
Panics if called outside a Go goroutine.
Response buffers (Headers, Body) are Go-owned copies safe to use after Do returns.
func (*Writer) GRPCCallout ¶
func (w *Writer) GRPCCallout(req GRPCCalloutRequest, fn GRPCCalloutFunc) (HTTPCalloutInitResult, error)
GRPCCallout issues a gRPC unary callout over an Envoy HTTP/2 cluster, pausing the current request until the response arrives. fn is called with the decoded response; it may queue mutations or call SendLocalResponse.
Internally uses Envoy's streamable HTTP callout API so response trailers are observed separately; gRPC status normally lives in trailers.
Returns a non-nil error if Envoy rejected the callout; fn will not be called.
func (*Writer) GetActiveSpan ¶
GetActiveSpan returns the active tracing span for the current stream.
func (*Writer) GetAttributeBool ¶
func (w *Writer) GetAttributeBool(id AttributeID) (bool, bool)
GetAttributeBool returns the boolean stream attribute for the given ID.
func (*Writer) GetAttributeNumber ¶
func (w *Writer) GetAttributeNumber(id AttributeID) (float64, bool)
GetAttributeNumber returns the numeric stream attribute for the given ID.
func (*Writer) GetAttributeString ¶
func (w *Writer) GetAttributeString(id AttributeID) (Buffer, bool)
GetAttributeString returns the string stream attribute for the given ID.
func (*Writer) GetFilterState ¶
GetFilterState reads a string filter state value previously written by SetFilterState. Returns the empty string and false if the key is absent.
func (*Writer) GetMetadataBool ¶
func (w *Writer) GetMetadataBool(source MetadataSource, ns, key string) (bool, bool)
GetMetadataBool reads a boolean metadata value from the given source.
func (*Writer) GetMetadataNumber ¶
func (w *Writer) GetMetadataNumber(source MetadataSource, ns, key string) (float64, bool)
GetMetadataNumber reads a numeric metadata value from the given source.
func (*Writer) GetMetadataString ¶
func (w *Writer) GetMetadataString(source MetadataSource, ns, key string) (Buffer, bool)
GetMetadataString reads a string metadata value from the given source. Returns a Buffer (copy the data with buf.String() before the callback returns).
func (*Writer) GetStreamObject ¶
GetStreamObject returns the value stored under key for this stream, or (nil, false) if the key was never set. Must be called on the worker thread. If SetStreamObject was never called on this stream no allocation occurs and no nonce is minted.
func (*Writer) Go ¶
Go upgrades this request to asynchronous mode. fn runs in a new goroutine and may call w.Do to issue outbound HTTP callouts. After fn returns, Transit hops back to the Envoy worker thread, applies queued mutations, and resumes the stream — unless the stream was cancelled while fn was running.
goStarted is cleared inside the scheduled finish before flush(true) so that subsequent callbacks (e.g. OnRequestBody on a request that still has a body) do not see stale goroutine state and spuriously return StopAndBuffer.
Panics if called twice or after HTTPCallout.
SendLocalResponse from inside fn is NOT reliable — see type-level docs.
func (*Writer) HTTPCallout ¶
func (w *Writer) HTTPCallout(req HTTPCalloutRequest, fn HTTPCalloutFunc) (HTTPCalloutInitResult, error)
HTTPCallout initiates an outbound Envoy HTTP callout from a request callback and pauses the request until the callout completes. fn is invoked with the callout result; it may queue mutations or call SendLocalResponse.
Returns HTTPCalloutInitSuccess and nil error if Envoy accepted the callout. A non-nil error means fn will never be called.
Panics if called after Go or after a previous HTTPCallout.
func (*Writer) HTTPCalloutAllSettled ¶
func (w *Writer) HTTPCalloutAllSettled(reqs []HTTPCalloutRequest, fn HTTPCalloutAllSettledFunc) error
HTTPCalloutAllSettled initiates multiple outbound Envoy HTTP callouts from a request callback and pauses the request until all accepted callouts complete. fn is invoked exactly once with one response slot per request; init failures are recorded in their corresponding response slot.
Response headers and body buffers are Go-owned copies safe to read after individual callout callbacks return. fn may queue mutations or call SendLocalResponse.
Panics if called after Go or after another HTTPCallout/HTTPCalloutAllSettled.
func (*Writer) HTTPCalloutSequence ¶
func (w *Writer) HTTPCalloutSequence(next HTTPCalloutSequenceNextFunc, done HTTPCalloutSequenceDoneFunc) error
HTTPCalloutSequence initiates outbound Envoy HTTP callouts one at a time from a request callback and pauses the request until done runs. next receives the previous response and decides whether to issue another callout. done runs from the callback path, so SendLocalResponse is reliable there.
Panics if called after Go or after another HTTPCallout/HTTPCalloutAllSettled.
func (*Writer) IncrementCounter ¶
IncrementCounter queues (or immediately applies) a counter increment.
func (*Writer) IncrementCounterLabels ¶
IncrementCounterLabels queues (or immediately applies) a counter increment with label values matching the tag keys used when the counter was defined.
func (*Writer) IncrementGauge ¶
IncrementGauge queues (or immediately applies) a gauge increment.
func (*Writer) RecordHistogram ¶
RecordHistogram queues (or immediately applies) a histogram observation.
func (*Writer) RecordHistogramLabels ¶
RecordHistogramLabels queues (or immediately applies) a histogram observation with label values matching the tag keys used when the histogram was defined.
func (*Writer) RemoveRequestHeader ¶
RemoveRequestHeader queues (or immediately removes) all values for the named header.
func (*Writer) RemoveResponseHeader ¶
RemoveResponseHeader removes a response header inline. Response-phase only; not queued.
func (*Writer) RequestHeader ¶
RequestHeader returns the first current value of the named request header, or "" if absent. Header mutations queued earlier in the same callback are reflected in the returned value.
func (*Writer) RequestHeaders ¶
RequestHeaders returns all current request headers as copied Go strings. Header mutations queued earlier in the same callback are reflected in the returned view, even though they are applied to Envoy only when the callback flushes.
func (*Writer) SendLocalResponse ¶
SendLocalResponse queues (or immediately sends) a client response. Only the first call takes effect; additional calls are silently ignored.
NOTE: SendLocalResponse from inside w.Go is NOT reliable. Envoy only honours it from filter callbacks. Use HTTPCallout (callback form) if the filter needs to reject with a local response.
func (*Writer) SetFilterState ¶
SetFilterState queues (or immediately writes) a per-stream filter state value.
func (*Writer) SetMetadata ¶
SetMetadata writes a per-stream dynamic metadata value. value must be a string, bool, or numeric type (int, int64, float64, etc.). Panics for unsupported types rather than silently producing a no-op. On the request path (queued mode) this is batched with other mutations and applied in flush() before ContinueRequest. On the response path (directWrite=true) it applies immediately.
func (*Writer) SetRequestBody ¶
SetRequestBody marks data as the replacement for the request body buffer. Only effective in mutable buffered request mode (WithMutableBody).
func (*Writer) SetRequestHeader ¶
SetRequestHeader queues (or immediately sets) a request header.
func (*Writer) SetResponseBody ¶
SetResponseBody marks data as the replacement for the response body buffer. Only effective in buffered mode (WithMutableBody).
func (*Writer) SetResponseHeader ¶
SetResponseHeader sets a response header inline. Response-phase callbacks only. Not queued — response mutations are always applied immediately.
func (*Writer) SetStreamObject ¶
SetStreamObject stores v under key in the per-stream typed-value bag (Primitive A). Must be called on the worker thread — same constraint as SetFilterState.
On the first call for a stream, getOrCreateBag mints a short random nonce, queues a SetFilterState write of that nonce under the reserved key "up.stream_object_id" (via the same queued-mutation path as SetFilterState), and creates the bag. Subsequent calls for the same stream reuse the bag.
The nonce in filter state lets ClusterLBContext.GetStreamObject look up the bag. The bag is drained by OnStreamComplete (or finalizedLogger.OnLog for filters using WithOnStreamFinalized) after all user callbacks have run.
func (*Writer) SetTypedMetadata ¶
SetTypedMetadata is like SetMetadata but numeric values are automatically formatted as decimal strings before being written. This satisfies the Envoy dynamic modules ABI constraint (metadata values must be strings) while letting callers pass typed values without manual strconv calls.
func (*Writer) SetUpstreamOverrideHost ¶
SetUpstreamOverrideHost queues (or immediately sets) an upstream host override. Returns true when queued (optimistic). Returns the handle's result in direct-write mode.
func (*Writer) Slog ¶
Slog returns a *slog.Logger whose handler routes through Envoy's logging mechanism. The logger automatically prepends filter=<name> to every line, followed by any attributes registered via WithAttributes. Call once at the top of a handler and reuse the result rather than calling Slog repeatedly.
Source Files
¶
- access_logger.go
- admin.go
- async.go
- async_host_selector.go
- attributes.go
- body.go
- buffer.go
- cluster.go
- cluster_group.go
- exchange_hooks.go
- filter.go
- group.go
- grpc.go
- host_refresh_loop.go
- host_set.go
- lb.go
- metadata.go
- pipeline_config.go
- pipeline_config_file.go
- pipeline_config_poll.go
- pipeline_config_yaml.go
- request.go
- response.go
- response_flags.go
- router.go
- sidecar.go
- stream_finalized.go
- stream_key.go
- stream_object.go
- stream_promise.go
- up.go
- watch.go
- writer.go
Directories
¶
| Path | Synopsis |
|---|---|
|
Package buffer provides zero-allocation stream buffering for Envoy filters.
|
Package buffer provides zero-allocation stream buffering for Envoy filters. |
|
Package compress handles Content-Encoding compression for HTTP body inspection.
|
Package compress handles Content-Encoding compression for HTTP body inspection. |
|
Package testutil provides test helpers for up package filters.
|
Package testutil provides test helpers for up package filters. |