Documentation
¶
Index ¶
- Constants
- Variables
- func Attach(mux goahttp.Muxer, service *Service)
- func Middleware(logger *slog.Logger, client *Client, forwarder *Forwarder) func(http.Handler) http.Handler
- type CachedConfig
- type Client
- func (c *Client) GetForOrg(ctx context.Context, orgID string) (CachedConfig, error)
- func (c *Client) InvalidateCache(ctx context.Context, orgID string)
- func (c *Client) LoadForOrg(ctx context.Context, orgID string) (CachedConfig, error)
- func (c *Client) LoadForOrgRow(ctx context.Context, orgID string) (CachedConfig, *repo.OtelForwardingConfig, error)
- func (c *Client) RefreshCache(ctx context.Context, cfg CachedConfig)
- func (c *Client) SoftDeleteWithTx(ctx context.Context, dbtx repo.DBTX, orgID string) error
- func (c *Client) UpsertWithTx(ctx context.Context, dbtx repo.DBTX, orgID, url string, ...) (CachedConfig, *repo.OtelForwardingConfig, error)
- type Forwarder
- type Job
- type Service
- func (s *Service) APIKeyAuth(ctx context.Context, key string, schema *security.APIKeyScheme) (context.Context, error)
- func (s *Service) DeleteConfig(ctx context.Context, _ *gen.DeleteConfigPayload) error
- func (s *Service) GetConfig(ctx context.Context, _ *gen.GetConfigPayload) (*gen.OtelForwardingConfig, error)
- func (s *Service) UpsertConfig(ctx context.Context, payload *gen.UpsertConfigPayload) (*gen.OtelForwardingConfig, error)
Constants ¶
const ( // MaxForwardBodyBytes caps the body we are willing to buffer in memory // for forwarding. Requests larger than this are still processed by the // Goa handler (we let them through with the original body intact) but // are skipped for forwarding so we don't OOM on a runaway payload. MaxForwardBodyBytes = 4 * 1024 * 1024 )
Variables ¶
var ErrQueueFull = errors.New("otel forward queue full")
ErrQueueFull is exported so tests can distinguish drop-due-to-backpressure from other failure modes.
Functions ¶
func Middleware ¶
func Middleware(logger *slog.Logger, client *Client, forwarder *Forwarder) func(http.Handler) http.Handler
Middleware returns an HTTP middleware that intercepts requests to the hooks.otel endpoints, buffers the body, and asynchronously forwards a copy to the customer's configured endpoint. The original request continues to the downstream Goa handler unchanged so we still process the payload ourselves.
Failures (queue full, customer endpoint 5xx, lookup errors) are logged only — they never affect the response returned to the caller.
Types ¶
type CachedConfig ¶
type CachedConfig struct {
OrganizationID string `json:"organization_id"`
URL string `json:"url"`
Headers map[string]string `json:"headers"`
Enabled bool `json:"enabled"`
}
CachedConfig holds the decrypted forwarding config for one org. An empty struct (URL == "") means "no config" — cached to avoid hitting the DB on every OTEL request for orgs that haven't configured forwarding.
func (CachedConfig) AdditionalCacheKeys ¶
func (c CachedConfig) AdditionalCacheKeys() []string
func (CachedConfig) CacheKey ¶
func (c CachedConfig) CacheKey() string
func (CachedConfig) IsConfigured ¶
func (c CachedConfig) IsConfigured() bool
IsConfigured reports whether the org has a non-deleted forwarding config. Note: a configured-but-disabled config still returns true here; callers check Enabled separately.
func (CachedConfig) TTL ¶
func (c CachedConfig) TTL() time.Duration
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client owns DB + cache access for the per-org forwarding config. Both the body-tee middleware (read path) and the management API (write path) use this same client so cache invalidation stays consistent. Writes accept a repo.DBTX so callers can keep the row and any audit-log entry in the same transaction.
func (*Client) GetForOrg ¶
GetForOrg returns the cached/decoded config for an org. A returned CachedConfig with URL == "" means "no config configured" — this is also cached to avoid hammering the DB on the hot OTEL ingest path.
func (*Client) InvalidateCache ¶
InvalidateCache removes the cached config for the org.
func (*Client) LoadForOrg ¶
LoadForOrg bypasses the cache and reads the row directly from the database. Returns an empty CachedConfig (with URL == "") if the org has no active config. Pairs with LoadForOrgRow when callers need the full repo row (e.g. for audit subject IDs).
func (*Client) LoadForOrgRow ¶
func (c *Client) LoadForOrgRow(ctx context.Context, orgID string) (CachedConfig, *repo.OtelForwardingConfig, error)
LoadForOrgRow returns the full repo row alongside the decoded config. A nil row means the org has no active forwarding config. Callers that only need the config payload should use LoadForOrg.
func (*Client) RefreshCache ¶
func (c *Client) RefreshCache(ctx context.Context, cfg CachedConfig)
RefreshCache stores the given config in the cache so subsequent reads don't wait for the TTL.
func (*Client) SoftDeleteWithTx ¶
SoftDeleteWithTx soft-deletes the org's forwarding config via the given dbtx. Caller is responsible for committing and then calling InvalidateCache(ctx, orgID).
func (*Client) UpsertWithTx ¶
func (c *Client) UpsertWithTx(ctx context.Context, dbtx repo.DBTX, orgID, url string, headers map[string]string, enabled bool) (CachedConfig, *repo.OtelForwardingConfig, error)
UpsertWithTx encrypts the headers and writes the row via the given dbtx. Caller is responsible for committing the transaction and then calling RefreshCache(ctx, returned) so subsequent reads see the new value.
type Forwarder ¶
type Forwarder struct {
// contains filtered or unexported fields
}
Forwarder is a bounded async worker pool. Jobs that don't fit in the queue are dropped with a logged warning — the OTEL ingest path must not block because a customer's downstream endpoint is slow.
func NewForwarder ¶
func NewForwarder(logger *slog.Logger, tracerProvider trace.TracerProvider, meterProvider metric.MeterProvider, policy *guardian.Policy) *Forwarder
func (*Forwarder) Enqueue ¶
Enqueue queues a forward job. If the queue is full, the job is dropped and a metric is incremented — we deliberately do not block the OTEL ingest path on slow customer endpoints.
type Job ¶
type Job struct {
OrgID string
URL string
ContentType string
Headers map[string]string
Body []byte
}
Job is a single OTEL payload queued for forwarding to a customer endpoint. Body is the raw bytes received on /rpc/hooks.otel/v1/*, forwarded verbatim (raw passthrough).
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
func NewService ¶
func (*Service) APIKeyAuth ¶
func (*Service) DeleteConfig ¶
func (*Service) GetConfig ¶
func (s *Service) GetConfig(ctx context.Context, _ *gen.GetConfigPayload) (*gen.OtelForwardingConfig, error)
func (*Service) UpsertConfig ¶
func (s *Service) UpsertConfig(ctx context.Context, payload *gen.UpsertConfigPayload) (*gen.OtelForwardingConfig, error)