otelforwarding

package
v0.0.0-...-9ee283c
Warning

This package is not in the latest version of its module.

Published: May 16, 2026 | License: AGPL-3.0 | Imports: 36 | Imported by: 0

Documentation

Constants

const (
	// MaxForwardBodyBytes caps the body we are willing to buffer in memory
	// for forwarding. Requests larger than this are still processed by the
	// Goa handler (we let them through with the original body intact) but
	// are skipped for forwarding so we don't OOM on a runaway payload.
	MaxForwardBodyBytes = 4 * 1024 * 1024
)

Variables

var ErrQueueFull = errors.New("otel forward queue full")

ErrQueueFull is exported so tests can distinguish drop-due-to-backpressure from other failure modes.

Functions

func Attach

func Attach(mux goahttp.Muxer, service *Service)

func Middleware

func Middleware(logger *slog.Logger, client *Client, forwarder *Forwarder) func(http.Handler) http.Handler

Middleware returns an HTTP middleware that intercepts requests to the hooks.otel endpoints, buffers the body, and asynchronously forwards a copy to the customer's configured endpoint. The original request continues to the downstream Goa handler unchanged so we still process the payload ourselves.

Failures (queue full, customer endpoint 5xx, lookup errors) are logged only — they never affect the response returned to the caller.

Types

type CachedConfig

type CachedConfig struct {
	OrganizationID string            `json:"organization_id"`
	URL            string            `json:"url"`
	Headers        map[string]string `json:"headers"`
	Enabled        bool              `json:"enabled"`
}

CachedConfig holds the decrypted forwarding config for one org. An empty struct (URL == "") means "no config" — cached to avoid hitting the DB on every OTEL request for orgs that haven't configured forwarding.

func (CachedConfig) AdditionalCacheKeys

func (c CachedConfig) AdditionalCacheKeys() []string

func (CachedConfig) CacheKey

func (c CachedConfig) CacheKey() string

func (CachedConfig) IsConfigured

func (c CachedConfig) IsConfigured() bool

IsConfigured reports whether the org has a non-deleted forwarding config. Note: a configured-but-disabled config still returns true here; callers check Enabled separately.
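
Because IsConfigured and Enabled are checked separately, a caller deciding whether to forward needs both. The sketch below mirrors the documented fields in a local type; the isConfigured body follows the documented URL == "" convention, and shouldForward is a hypothetical caller-side helper, not part of this package.

```go
package main

import "fmt"

// cachedConfig mirrors the documented CachedConfig fields.
type cachedConfig struct {
	OrganizationID string
	URL            string
	Headers        map[string]string
	Enabled        bool
}

// isConfigured assumes the documented convention that an empty URL
// means "no config".
func (c cachedConfig) isConfigured() bool { return c.URL != "" }

// shouldForward is a hypothetical helper combining the two checks a
// caller has to make before forwarding.
func shouldForward(c cachedConfig) bool { return c.isConfigured() && c.Enabled }

func main() {
	disabled := cachedConfig{OrganizationID: "org_1", URL: "https://collector.example.com/v1/traces"}
	fmt.Println(disabled.isConfigured(), shouldForward(disabled)) // true false
	fmt.Println(shouldForward(cachedConfig{}))                    // false
}
```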

func (CachedConfig) TTL

func (c CachedConfig) TTL() time.Duration

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client owns DB + cache access for the per-org forwarding config. Both the body-tee middleware (read path) and the management API (write path) use this same client so cache invalidation stays consistent. Writes accept a repo.DBTX so callers can keep the row and any audit-log entry in the same transaction.

func NewClient

func NewClient(logger *slog.Logger, db *pgxpool.Pool, enc *encryption.Client, cacheImpl cache.Cache) *Client

func (*Client) GetForOrg

func (c *Client) GetForOrg(ctx context.Context, orgID string) (CachedConfig, error)

GetForOrg returns the cached, decoded config for an org. A returned CachedConfig with URL == "" means the org has no config; that result is cached too, so the hot OTEL ingest path does not hit the DB on every request.
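
Caching the empty result is a form of negative caching. A minimal sketch, assuming an in-memory map in place of the real cache and a load callback in place of the database read (both hypothetical, with TTL handling omitted):

```go
package main

import "fmt"

// cachedConfig keeps only the fields this sketch needs.
type cachedConfig struct {
	OrganizationID string
	URL            string
}

// configCache is a hypothetical in-memory stand-in for the cache + DB pair.
type configCache struct {
	entries map[string]cachedConfig
	load    func(orgID string) cachedConfig // simulated database read
	dbReads int
}

// getForOrg returns the cached entry if present, otherwise loads and stores
// the result, including the empty "no config" value.
func (c *configCache) getForOrg(orgID string) cachedConfig {
	if cfg, ok := c.entries[orgID]; ok {
		return cfg // hit, including cached "no config" entries
	}
	c.dbReads++
	cfg := c.load(orgID)
	cfg.OrganizationID = orgID
	c.entries[orgID] = cfg // stored even when URL == ""
	return cfg
}

func main() {
	c := &configCache{
		entries: map[string]cachedConfig{},
		load:    func(string) cachedConfig { return cachedConfig{} }, // org has no config
	}
	for i := 0; i < 3; i++ {
		c.getForOrg("org_1")
	}
	fmt.Println(c.dbReads) // 1
}
```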

func (*Client) InvalidateCache

func (c *Client) InvalidateCache(ctx context.Context, orgID string)

InvalidateCache removes the cached config for the org.

func (*Client) LoadForOrg

func (c *Client) LoadForOrg(ctx context.Context, orgID string) (CachedConfig, error)

LoadForOrg bypasses the cache and reads the row directly from the database. Returns an empty CachedConfig (with URL == "") if the org has no active config. Pairs with LoadForOrgRow when callers need the full repo row (e.g. for audit subject IDs).

func (*Client) LoadForOrgRow

func (c *Client) LoadForOrgRow(ctx context.Context, orgID string) (CachedConfig, *repo.OtelForwardingConfig, error)

LoadForOrgRow returns the full repo row alongside the decoded config. A nil row means the org has no active forwarding config. Callers that only need the config payload should use LoadForOrg.

func (*Client) RefreshCache

func (c *Client) RefreshCache(ctx context.Context, cfg CachedConfig)

RefreshCache stores the given config in the cache so subsequent reads don't wait for the TTL.

func (*Client) SoftDeleteWithTx

func (c *Client) SoftDeleteWithTx(ctx context.Context, dbtx repo.DBTX, orgID string) error

SoftDeleteWithTx soft-deletes the org's forwarding config via the given dbtx. The caller is responsible for committing the transaction and then calling InvalidateCache(ctx, orgID).

func (*Client) UpsertWithTx

func (c *Client) UpsertWithTx(ctx context.Context, dbtx repo.DBTX, orgID, url string, headers map[string]string, enabled bool) (CachedConfig, *repo.OtelForwardingConfig, error)

UpsertWithTx encrypts the headers and writes the row via the given dbtx. The caller is responsible for committing the transaction and then calling RefreshCache(ctx, cfg) with the returned CachedConfig so subsequent reads see the new value.
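
The ordering required of callers (write via the transaction, commit, then touch the cache) can be illustrated with in-memory fakes. Everything here is hypothetical: tx, its commit method, and upsertAndRefresh only model the sequence and do not use the package's real types.

```go
package main

import (
	"errors"
	"fmt"
)

// tx is a hypothetical transaction: writes are staged until commit.
type tx struct {
	staged map[string]string
	db     map[string]string
	fail   bool // simulate a failed commit
}

func (t *tx) commit() error {
	if t.fail {
		return errors.New("commit failed")
	}
	for k, v := range t.staged {
		t.db[k] = v
	}
	return nil
}

// upsertAndRefresh follows the documented order: write via the tx, commit,
// and only then refresh the cache.
func upsertAndRefresh(t *tx, cache map[string]string, orgID, url string) error {
	t.staged[orgID] = url // stands in for UpsertWithTx
	if err := t.commit(); err != nil {
		return err // cache untouched, so it never gets ahead of the database
	}
	cache[orgID] = url // stands in for RefreshCache
	return nil
}

func main() {
	db, cache := map[string]string{}, map[string]string{}
	err := upsertAndRefresh(&tx{staged: map[string]string{}, db: db}, cache, "org_1", "https://collector.example.com")
	fmt.Println(err, db["org_1"] == cache["org_1"]) // <nil> true

	err = upsertAndRefresh(&tx{staged: map[string]string{}, db: db, fail: true}, cache, "org_1", "https://other.example.com")
	fmt.Println(err, cache["org_1"]) // commit failed https://collector.example.com
}
```

Refreshing only after a successful commit keeps readers from seeing a value that was later rolled back. The same ordering applies to SoftDeleteWithTx followed by InvalidateCache.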

type Forwarder

type Forwarder struct {
	// contains filtered or unexported fields
}

Forwarder is a bounded async worker pool. Jobs that don't fit in the queue are dropped with a logged warning — the OTEL ingest path must not block because a customer's downstream endpoint is slow.

func NewForwarder

func NewForwarder(logger *slog.Logger, tracerProvider trace.TracerProvider, meterProvider metric.MeterProvider, policy *guardian.Policy) *Forwarder

func (*Forwarder) Enqueue

func (f *Forwarder) Enqueue(ctx context.Context, job Job)

Enqueue queues a forward job. If the queue is full, the job is dropped and a metric is incremented — we deliberately do not block the OTEL ingest path on slow customer endpoints.
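
Drop-on-full behaviour is commonly built from a buffered channel and a select with a default case. The sketch below assumes that approach; the forwarder and job types are local stand-ins, the atomic counter stands in for the drop metric, and the bool return exists only to make the demo observable (the documented Enqueue returns nothing).

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// job keeps only the fields this sketch needs.
type job struct {
	OrgID string
	Body  []byte
}

// forwarder is a hypothetical stand-in for the documented Forwarder.
type forwarder struct {
	queue   chan job
	dropped atomic.Int64 // stands in for the drop metric
}

// enqueue never blocks: if the buffered channel is full, the job is
// counted as dropped and discarded.
func (f *forwarder) enqueue(j job) bool {
	select {
	case f.queue <- j:
		return true
	default:
		f.dropped.Add(1)
		return false
	}
}

func main() {
	f := &forwarder{queue: make(chan job, 2)} // no workers draining the queue
	for i := 0; i < 3; i++ {
		fmt.Println(f.enqueue(job{OrgID: "org_1"}))
	}
	fmt.Println("dropped:", f.dropped.Load())
}
```

With a capacity of two and nothing consuming, the third call takes the default branch, so the caller returns immediately instead of waiting on a slow endpoint.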

func (*Forwarder) Shutdown

func (f *Forwarder) Shutdown(ctx context.Context)

Shutdown stops accepting new jobs and waits for in-flight workers to drain. Jobs already in the queue are sent best-effort before exit.

func (*Forwarder) Start

func (f *Forwarder) Start(ctx context.Context)

Start spawns the worker goroutines. It is meant to be called once.

type Job

type Job struct {
	OrgID       string
	URL         string
	ContentType string
	Headers     map[string]string
	Body        []byte
}

Job is a single OTEL payload queued for forwarding to a customer endpoint. Body is the raw bytes received on /rpc/hooks.otel/v1/*, forwarded verbatim (raw passthrough).

type Service

type Service struct {
	// contains filtered or unexported fields
}

func NewService

func NewService(
	logger *slog.Logger,
	tracerProvider trace.TracerProvider,
	db *pgxpool.Pool,
	sessions *sessions.Manager,
	authzEngine *authz.Engine,
	auditLogger *audit.Logger,
	client *Client,
) *Service

func (*Service) APIKeyAuth

func (s *Service) APIKeyAuth(ctx context.Context, key string, schema *security.APIKeyScheme) (context.Context, error)

func (*Service) DeleteConfig

func (s *Service) DeleteConfig(ctx context.Context, _ *gen.DeleteConfigPayload) error

func (*Service) GetConfig

func (*Service) UpsertConfig

func (s *Service) UpsertConfig(ctx context.Context, payload *gen.UpsertConfigPayload) (*gen.OtelForwardingConfig, error)
