request_log

package
v0.0.0-...-0898198 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 15, 2026 License: MIT Imports: 34 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrNotFound = errors.New("record not found")

ErrNotFound is returned when a log record is not found that was requested. This isn't necessarily a bad request as the record may have expired due to TTL.

Functions

func ApplyAttributionToLogRecord

func ApplyAttributionToLogRecord(er *LogRecord, ctx context.Context)

ApplyAttributionToLogRecord stamps the LogRecord with whatever the proxy stack recorded on the request context — the response source plus, when a rate-limit resource matched, the rule id / mode / bucket / full match set. When no attribution was installed (older code paths or non-proxy traffic), defaults the source to ResponseSourceUpstream so the column is always populated.

Designed as a separate call from SetLogRecordFieldsFromRequestInfo so the existing signature isn't disturbed; the request-log round-tripper invokes both in sequence.

func ContextWithAttribution

func ContextWithAttribution(ctx context.Context, attr *Attribution) context.Context

ContextWithAttribution returns a context that carries attr. The proxy bootstrap installs an empty Attribution on the request context before the round-tripper chain runs; middlewares mutate the struct in place.

func IsValidOrderByField

func IsValidOrderByField(field RequestOrderByField) bool

func IsValidResponseSource

func IsValidResponseSource(s ResponseSource) bool

IsValidResponseSource reports whether s is a recognised ResponseSource. Unknown values returned from old DB rows or from forward-compatible readers are not coerced — callers can choose to treat them as upstream or to log a warning.

func SetLogRecordFieldsFromRequestInfo

func SetLogRecordFieldsFromRequestInfo(er *LogRecord, ri httpf.RequestInfo)

Types

type Attribution

type Attribution struct {
	Source           ResponseSource
	RateLimitId      apid.ID
	RateLimitMode    string
	RateLimitBucket  map[string]string
	RateLimitMatched []RateLimitMatch
}

Attribution carries per-request log attribution that the proxy stack populates as the request flows through it. The rate-limit round-tripper writes Source when it short-circuits with a synthetic 429; the (future) rate-limit resource enforcement writes the RateLimit* fields when a rule fires. The request-log round-tripper reads the value on its way back to populate LogRecord.

Use a pointer in the context so middlewares writing to the same Attribution see each other's edits — the pointer doesn't change, the struct fields do.

func AttributionFromContext

func AttributionFromContext(ctx context.Context) *Attribution

AttributionFromContext returns the Attribution carried by ctx, or nil when no proxy middleware has installed one. Returning nil rather than a zero-value struct lets callers distinguish "nothing was stamped" from "stamped but blank" — the LogRecord-population path treats nil as ResponseSourceUpstream.

type Encryptor

type Encryptor interface {
	EncryptForNamespace(ctx context.Context, namespacePath string, data []byte) (encfield.EncryptedField, error)
	Decrypt(ctx context.Context, ef encfield.EncryptedField) ([]byte, error)
}

Encryptor provides namespace-scoped encryption/decryption for blob storage. Satisfied by encrypt.E via duck typing.

type FullLog

type FullLog struct {
	Id                  apid.ID             `json:"id"`
	Namespace           string              `json:"ns"`
	CorrelationID       string              `json:"cid"`
	Timestamp           time.Time           `json:"ts"`
	MillisecondDuration MillisecondDuration `json:"dur"`
	Full                bool                `json:"full,omitempty"`
	InternalTimeout     bool                `json:"to,omitempty"`
	RequestCancelled    bool                `json:"rc,omitempty"`
	Request             FullLogRequest      `json:"req"`
	Response            FullLogResponse     `json:"res"`
}

func NewFullLogFromRecord

func NewFullLogFromRecord(er *LogRecord) *FullLog

func (*FullLog) GetId

func (e *FullLog) GetId() apid.ID

func (*FullLog) GetNamespace

func (e *FullLog) GetNamespace() string

func (*FullLog) ToRecord

func (e *FullLog) ToRecord() *LogRecord

type FullLogRequest

type FullLogRequest struct {
	URL           string              `json:"u"`
	HttpVersion   string              `json:"v"`
	Method        string              `json:"m"`
	Headers       map[string][]string `json:"h"`
	ContentLength int64               `json:"cl,omitempty"`
	Body          []byte              `json:"b,omitempty"`
}

type FullLogResponse

type FullLogResponse struct {
	HttpVersion   string              `json:"v"`
	StatusCode    int                 `json:"sc"`
	Headers       map[string][]string `json:"h"`
	Body          []byte              `json:"b,omitempty"`
	ContentLength int64               `json:"cl,omitempty"`
	Err           string              `json:"err,omitempty"`
}

type FullStore

type FullStore interface {
	// Store persists a FullLog to the storage backend.
	Store(ctx context.Context, log *FullLog) error

	// GetFullLog retrieves a FullLog from the storage backend.
	GetFullLog(ctx context.Context, ns string, id apid.ID) (*FullLog, error)
}

func NewBlobStore

func NewBlobStore(client apblob.Client, encryptor Encryptor, logger *slog.Logger) FullStore

type ListFilters

type ListFilters struct {
	LimitVal        int32                `json:"limit"`
	Offset          int32                `json:"offset"`
	OrderByFieldVal *RequestOrderByField `json:"order_by_field"`
	OrderByVal      *pagination.OrderBy  `json:"order_by"`

	RequestType              *string           `json:"request_type,omitempty"`
	CorrelationId            *string           `json:"correlation_id,omitempty"`
	ConnectionId             *apid.ID          `json:"connection_id,omitempty"`
	ConnectorType            *string           `json:"connector_type,omitempty"`
	ConnectorId              *apid.ID          `json:"connector_id,omitempty"`
	ConnectorVersion         *uint64           `json:"connector_version,omitempty"`
	Method                   *string           `json:"method,omitempty"`
	StatusCodeRangeInclusive []int             `json:"status_code_range,omitempty"`
	TimestampRange           []time.Time       `json:"timestamp_range,omitempty"`
	Path                     *string           `json:"path,omitempty"`
	PathRegex                *string           `json:"path_regex,omitempty"`
	NamespaceMatchers        []string          `json:"namespace_matchers,omitempty"`
	LabelSelector            *string           `json:"label_selector,omitempty"`
	ResponseSource           *string           `json:"response_source,omitempty"`
	RateLimitId              *apid.ID          `json:"rate_limit_id,omitempty"`
	Errors                   *multierror.Error `json:"-"`
}

ListFilters holds the filter, pagination, and ordering data for list requests. This struct is provider-agnostic and is embedded by provider-specific list builders.

func (*ListFilters) AddError

func (l *ListFilters) AddError(e error)

func (*ListFilters) SetConnectionId

func (l *ListFilters) SetConnectionId(u apid.ID)

func (*ListFilters) SetConnectorId

func (l *ListFilters) SetConnectorId(u apid.ID)

func (*ListFilters) SetConnectorType

func (l *ListFilters) SetConnectorType(t string)

func (*ListFilters) SetConnectorVersion

func (l *ListFilters) SetConnectorVersion(v uint64)

func (*ListFilters) SetCorrelationId

func (l *ListFilters) SetCorrelationId(correlationId string)

func (*ListFilters) SetLabelSelector

func (l *ListFilters) SetLabelSelector(selector string)

func (*ListFilters) SetLimit

func (l *ListFilters) SetLimit(limit int32)

func (*ListFilters) SetMethod

func (l *ListFilters) SetMethod(method string)

func (*ListFilters) SetNamespaceMatcher

func (l *ListFilters) SetNamespaceMatcher(matcher string) error

func (*ListFilters) SetNamespaceMatchers

func (l *ListFilters) SetNamespaceMatchers(matchers []string) error

func (*ListFilters) SetOrderBy

func (l *ListFilters) SetOrderBy(field RequestOrderByField, by pagination.OrderBy)

func (*ListFilters) SetPath

func (l *ListFilters) SetPath(path string)

func (*ListFilters) SetPathRegex

func (l *ListFilters) SetPathRegex(r string) error

func (*ListFilters) SetRateLimitId

func (l *ListFilters) SetRateLimitId(id apid.ID)

func (*ListFilters) SetRequestType

func (l *ListFilters) SetRequestType(requestType httpf.RequestType)

func (*ListFilters) SetResponseSource

func (l *ListFilters) SetResponseSource(s ResponseSource)

func (*ListFilters) SetStatusCode

func (l *ListFilters) SetStatusCode(s int)

func (*ListFilters) SetStatusCodeRangeInclusive

func (l *ListFilters) SetStatusCodeRangeInclusive(start, end int)

func (*ListFilters) SetTimestampRange

func (l *ListFilters) SetTimestampRange(start, end time.Time)

func (*ListFilters) Validate

func (l *ListFilters) Validate() error

type ListRequestBuilder

type ListRequestBuilder interface {
	ListRequestExecutor
	Limit(int32) ListRequestBuilder
	OrderBy(RequestOrderByField, pagination.OrderBy) ListRequestBuilder

	WithNamespaceMatcher(matcher string) ListRequestBuilder
	WithNamespaceMatchers(matchers []string) ListRequestBuilder
	WithRequestType(requestType httpf.RequestType) ListRequestBuilder
	WithCorrelationId(correlationId string) ListRequestBuilder
	WithConnectionId(u apid.ID) ListRequestBuilder
	WithConnectorType(t string) ListRequestBuilder
	WithConnectorId(u apid.ID) ListRequestBuilder
	WithConnectorVersion(v uint64) ListRequestBuilder
	WithMethod(method string) ListRequestBuilder
	WithStatusCode(s int) ListRequestBuilder
	WithStatusCodeRangeInclusive(start, end int) ListRequestBuilder
	WithParsedStatusCodeRange(r string) (ListRequestBuilder, error)
	WithPath(path string) ListRequestBuilder
	WithPathRegex(r string) (ListRequestBuilder, error)
	WithTimestampRange(start, end time.Time) ListRequestBuilder
	WithParsedTimestampRange(r string) (ListRequestBuilder, error)
	WithLabelSelector(selector string) (ListRequestBuilder, error)
	WithResponseSource(s ResponseSource) ListRequestBuilder
	WithRateLimitId(id apid.ID) ListRequestBuilder
}

type ListRequestExecutor

type ListRequestExecutor interface {
	FetchPage(ctx context.Context) pagination.PageResult[*LogRecord]
	Enumerate(ctx context.Context, callback pagination.EnumerateCallback[*LogRecord]) error
}

type LogRecord

type LogRecord struct {
	Namespace           string              `json:"namespace"`
	Type                httpf.RequestType   `json:"type"`
	RequestId           apid.ID             `json:"request_id"`
	CorrelationId       string              `json:"correlation_id,omitempty"`
	Timestamp           time.Time           `json:"timestamp"`
	MillisecondDuration MillisecondDuration `json:"duration"`
	ConnectionId        apid.ID             `json:"connection_id,omitempty"`
	ConnectorId         apid.ID             `json:"connector_id,omitempty"`
	ConnectorVersion    uint64              `json:"connector_version,omitempty"`
	Method              string              `json:"method"`
	Host                string              `json:"host"`
	Scheme              string              `json:"scheme"`
	Path                string              `json:"path"`
	RequestHttpVersion  string              `json:"request_http_version,omitempty"`
	RequestSizeBytes    int64               `json:"request_size_bytes,omitempty"`
	RequestMimeType     string              `json:"request_mime_type,omitempty"`
	ResponseStatusCode  int                 `json:"response_status_code,omitempty"`
	ResponseError       string              `json:"response_error,omitempty"`
	ResponseHttpVersion string              `json:"response_http_version,omitempty"`
	ResponseSizeBytes   int64               `json:"response_size_bytes,omitempty"`
	ResponseMimeType    string              `json:"response_mime_type,omitempty"`
	InternalTimeout     bool                `json:"internal_timeout,omitempty"`
	RequestCancelled    bool                `json:"request_cancelled,omitempty"`
	FullRequestRecorded bool                `json:"full_request_recorded,omitempty"`
	Labels              database.Labels     `json:"labels,omitempty"`

	// ResponseSource identifies who produced the response. Defaults to
	// ResponseSourceUpstream so historical entries — and any non-429
	// response — keep the obvious meaning. See attribution.go.
	ResponseSource ResponseSource `json:"response_source,omitempty"`

	// RateLimitId, RateLimitMode, RateLimitBucket are populated when a
	// proxy-side RateLimit resource matched the request, regardless of
	// whether it was the firing rule or just a logged observation. The
	// connector-level reactive limiter does not populate these (it has
	// no rule id).
	RateLimitId     apid.ID           `json:"rate_limit_id,omitempty"`
	RateLimitMode   string            `json:"rate_limit_mode,omitempty"`
	RateLimitBucket map[string]string `json:"rate_limit_bucket,omitempty"`

	// RateLimitMatched is the full set of rate-limit rules that matched
	// this request — the firing rule plus any observe-mode matches. Lets
	// operators see *every* rule that contributed to the decision, not
	// just the one that ultimately rejected the request.
	RateLimitMatched []RateLimitMatch `json:"rate_limit_matched,omitempty"`
}

LogRecord represents a record of an HTTP request as is stored in the request log. This data is redacted to avoid containing sensitive information like information in headers. For a given record, the full request may be stored as well, which would correspond to the data in the Entry struct.

JSON tagging on this struct is used so the same data structure can be passed directly to endpoint responses. It is not use for internal storage.

func (*LogRecord) GetId

func (e *LogRecord) GetId() apid.ID

func (*LogRecord) GetNamespace

func (e *LogRecord) GetNamespace() string

type LogRetriever

type LogRetriever interface {
	GetFullLog(ctx context.Context, id apid.ID) (*FullLog, error)
	NewListRequestsBuilder() ListRequestBuilder
	ListRequestsFromCursor(ctx context.Context, cursor string) (ListRequestExecutor, error)
}

LogRetriever is an interface for retrieving logs. Used by the API to retrieve logs.

type MillisecondDuration

type MillisecondDuration time.Duration

MillisecondDuration is a wrapper around time.Duration for JSON marshaling as milliseconds

func (MillisecondDuration) Duration

func (d MillisecondDuration) Duration() time.Duration

func (MillisecondDuration) MarshalJSON

func (d MillisecondDuration) MarshalJSON() ([]byte, error)

MarshalJSON implements the json.Marshaler interface

func (MillisecondDuration) String

func (d MillisecondDuration) String() string

func (*MillisecondDuration) UnmarshalJSON

func (d *MillisecondDuration) UnmarshalJSON(b []byte) error

UnmarshalJSON implements the json.Unmarshaler interface

type RateLimitMatch

type RateLimitMatch struct {
	Id     apid.ID           `json:"id"`
	Mode   string            `json:"mode"`
	Bucket map[string]string `json:"bucket,omitempty"`
}

RateLimitMatch describes a single rate-limit rule that matched the request. The full set of matches is captured in LogRecord.RateLimitMatched so observers can see every rule that fired (in either enforce or observe mode), not just the most-restrictive one that ultimately decided the request's fate.

type RecordRetriever

type RecordRetriever interface {
	// GetRecord retrieves a single LogRecord by its request ID.
	GetRecord(ctx context.Context, id apid.ID) (*LogRecord, error)

	// NewListRequestsBuilder creates a new builder for listing entry records with filters.
	NewListRequestsBuilder() ListRequestBuilder

	// ListRequestsFromCursor resumes a paginated listing from a cursor string.
	ListRequestsFromCursor(ctx context.Context, cursor string) (ListRequestExecutor, error)
}

RecordRetriever handles querying LogRecord metadata from a storage backend.

func NewClickhouseRecordRetriever

func NewClickhouseRecordRetriever(cfg *config.Database, cursorEncryptor pagination.CursorEncryptor, logger *slog.Logger, dbOpts ...database.Option) RecordRetriever

func NewRecordRetriever

func NewRecordRetriever(cfg *config.Database, cursorEncryptor pagination.CursorEncryptor, logger *slog.Logger, dbOpts ...database.Option) RecordRetriever

NewRecordRetriever creates an RecordRetriever based on the HttpLogging configuration.

func NewSqlRecordRetriever

func NewSqlRecordRetriever(cfg *config.Database, cursorEncryptor pagination.CursorEncryptor, logger *slog.Logger, opts ...database.Option) RecordRetriever

type RecordStore

type RecordStore interface {
	// StoreRecord persists a LogRecord to the storage backend.
	StoreRecord(ctx context.Context, record *LogRecord) error

	// StoreRecords persists multiple LogRecord to the storage backend.
	StoreRecords(ctx context.Context, records []*LogRecord) error
}

RecordStore handles persisting LogRecord metadata to a storage backend.

func NewBufferedStore

func NewBufferedStore(inner RecordStore, flushBatchSize int, flushInterval time.Duration) RecordStore

func NewClickhouseRecordStore

func NewClickhouseRecordStore(cfg *config.Database, logger *slog.Logger, dbOpts ...database.Option) RecordStore

func NewRecordStore

func NewRecordStore(cfg *config.Database, logger *slog.Logger, dbOpts ...database.Option) RecordStore

NewRecordStore creates an RecordStore based on the HttpLogging configuration. dbOpts are forwarded to the underlying instrumented DB constructor — callers pass database.WithTelemetry(...) to enable spans + metrics on the request-log database tier.

func NewSqlRecordStore

func NewSqlRecordStore(cfg *config.Database, logger *slog.Logger, opts ...database.Option) RecordStore

type RequestOrderByField

type RequestOrderByField string
const (
	RequestOrderByTimestamp          RequestOrderByField = "timestamp"
	RequestOrderByType               RequestOrderByField = "type"
	RequestOrderByCorrelationId      RequestOrderByField = "correlation_id"
	RequestOrderByConnectionId       RequestOrderByField = "connection_id"
	RequestOrderByConnectorType      RequestOrderByField = "connector_type"
	RequestOrderByConnectorId        RequestOrderByField = "connector_id"
	RequestOrderByMethod             RequestOrderByField = "method"
	RequestOrderByPath               RequestOrderByField = "path"
	RequestOrderByResponseStatusCode RequestOrderByField = "response_status_code"
	RequestOrderByConnectorVersion   RequestOrderByField = "connector_version"
	RequestOrderByNamespace          RequestOrderByField = "namespace"
)

type ResponseSource

type ResponseSource string

ResponseSource identifies *who* produced the response that a LogRecord captures. Until this PR every 429 in the log looked the same — a 3rd party rate-limiting us was indistinguishable from the connector-level reactive backoff returning a synthetic 429. Stamping this field at the point of synthesis closes that gap and reserves a value for the upcoming proxy-side rate-limit resource enforcement (#223).

const (
	// ResponseSourceUpstream means the response (including any 429) came
	// from the 3rd-party service — the default and historically the only
	// possibility.
	ResponseSourceUpstream ResponseSource = "upstream"

	// ResponseSourceConnectorRateLimiter means the connector-level reactive
	// limiter short-circuited the request because the connection is in
	// cool-down from a prior real upstream 429. No upstream call was made
	// for this attempt.
	ResponseSourceConnectorRateLimiter ResponseSource = "connector_rate_limiter"

	// ResponseSourceRateLimit is reserved for the proxy-side rate-limit
	// resource enforcement (#223). Defining it here keeps the schema
	// stable across the two PRs; population happens in #223.
	ResponseSourceRateLimit ResponseSource = "rate_limit"
)

type RoundTripper

type RoundTripper struct {
	// contains filtered or unexported fields
}

func (*RoundTripper) RoundTrip

func (t *RoundTripper) RoundTrip(req *http.Request) (*http.Response, error)

RoundTrip implements the http.RoundTripper interface

type StorageService

type StorageService struct {
	// contains filtered or unexported fields
}

func NewStorageService

func NewStorageService(
	ctx context.Context,
	cfg *config.HttpLogging,
	cursorEncryptor pagination.CursorEncryptor,
	encryptor Encryptor,
	logger *slog.Logger,
	dbOpts ...database.Option,
) (*StorageService, error)

NewStorageService that will store the log records and the full request/response. dbOpts are forwarded to the underlying DB constructors — pass database.WithTelemetry(...) to instrument the request-log database tier.

func (*StorageService) GetFullLog

func (ss *StorageService) GetFullLog(ctx context.Context, id apid.ID) (*FullLog, error)

func (*StorageService) GetRecord

func (ss *StorageService) GetRecord(ctx context.Context, id apid.ID) (*LogRecord, error)

GetRecord retrieves a single LogRecord by its request ID.

func (*StorageService) ListRequestsFromCursor

func (ss *StorageService) ListRequestsFromCursor(ctx context.Context, cursor string) (ListRequestExecutor, error)

ListRequestsFromCursor resumes a paginated listing from a cursor string.

func (*StorageService) Migrate

func (ss *StorageService) Migrate(ctx context.Context) error

Migrate runs any necessary schema migrations for the storage backend.

func (*StorageService) NewListRequestsBuilder

func (ss *StorageService) NewListRequestsBuilder() ListRequestBuilder

NewListRequestsBuilder creates a new builder for listing entry records with filters.

func (*StorageService) NewRoundTripper

func (ss *StorageService) NewRoundTripper(ri httpf.RequestInfo, transport http.RoundTripper) http.RoundTripper

func (*StorageService) Ping

func (ss *StorageService) Ping(ctx context.Context) bool

Ping checks if the storage backends are reachable.

Directories

Path Synopsis
Package mock is a generated GoMock package.
Package mock is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL